use std::hash::Hash;
use std::default::Default;
use std::rc::Rc;
use std::cell::RefCell;
use std::marker::PhantomData;
use progress::{Timestamp, CountMap};
use progress::nested::subgraph::{Source, Target};
use progress::nested::product::Product;
use {Data, Push};
use dataflow::channels::pushers::{Counter, Tee};
use dataflow::channels::Content;
use dataflow::{Stream, Scope, ScopeParent};
use dataflow::scopes::Child;
use dataflow::operators::delay::*;
/// Extension trait for bringing a stream into a nested child scope.
///
/// `G` is the outer scope, `T` the additional timestamp coordinate carried by
/// the child scope, and `D` the type of the records in the stream.
pub trait Enter<G: Scope, T: Timestamp, D: Data> {
    /// Returns a stream in the child scope `scope` carrying the records of `self`.
    ///
    /// The parameter is named explicitly: anonymous trait-method parameters are
    /// deprecated and rejected by newer editions of the language.
    fn enter<'a>(&self, scope: &Child<'a, G, T>) -> Stream<Child<'a, G, T>, D>;
}
/// Extension trait for bringing a stream into a nested child scope while
/// choosing the inner timestamp coordinate of each record.
pub trait EnterAt<G: Scope, T: Timestamp, D: Data>
where G::Timestamp: Hash, T: Hash {
    /// Returns a stream in the child scope `scope`.
    ///
    /// `initial` is given a reference to a record and returns a value of the
    /// inner timestamp type `T` for it.
    fn enter_at<'a, F>(&self, scope: &Child<'a, G, T>, initial: F) -> Stream<Child<'a, G, T>, D>
    where F: Fn(&D) -> T + 'static;
}
/// Blanket implementation: anything that can `enter` a child scope can also
/// `enter_at`, by entering first and then delaying each record.
impl<G: Scope, T: Timestamp, D: Data, E: Enter<G, T, D>> EnterAt<G, T, D> for E
where G::Timestamp: Hash, T: Hash {
    fn enter_at<'a, F:Fn(&D)->T+'static>(&self, scope: &Child<'a, G, T>, initial: F) ->
    Stream<Child<'a, G, T>, D> {
        // Bring the stream into the child scope first ...
        let entered = self.enter(scope);
        // ... then move every record to a timestamp that keeps the outer
        // coordinate and takes its inner coordinate from `initial`.
        entered.delay(move |datum, time| {
            let outer = time.outer.clone();
            Product::new(outer, initial(datum))
        })
    }
}
/// Implements `enter` for streams of the outer scope `G`.
impl<T: Timestamp, G: Scope, D: Data> Enter<G, T, D> for Stream<G, D> {
    /// Registers a new input on the child scope's subgraph, connects `self`
    /// to it through an `IngressNub`, and returns the stream that the nub
    /// feeds inside the child scope.
    fn enter<'a>(&self, scope: &Child<'a, G, T>) -> Stream<Child<'a, G, T>, D> {
        // Fan-out point for the inner stream, plus the handle used to attach
        // consumers to it.
        let (tee, registrar) = Tee::<Product<G::Timestamp, T>, D>::new();

        // Count map shared between the counting pusher and the subgraph input.
        let counts = Rc::new(RefCell::new(CountMap::new()));
        let counter = Counter::new(tee, counts.clone());
        let nub = IngressNub { targets: counter };

        // The subgraph's own index is read before the new input port is
        // allocated (same order as always; the two borrows never overlap).
        let subgraph_index = scope.subgraph.borrow().index;
        let port = scope.subgraph.borrow_mut().new_input(counts);

        let channel = scope.clone().new_identifier();

        // Outer side: target the subgraph's freshly allocated input port.
        let outer_target = Target { index: subgraph_index, port: port };
        self.connect_to(outer_target, nub, channel);

        // Inner side: the source uses index 0 with the same port number
        // (presumably index 0 denotes the subgraph boundary as seen from
        // inside the child scope — confirm against the subgraph code).
        let inner_source = Source { index: 0, port: port };
        Stream::new(inner_source, registrar, scope.clone())
    }
}
/// Extension trait for moving a stream out of a nested scope and back into
/// the scope `G`.
pub trait Leave<G: Scope, D: Data> {
    /// Returns a stream in scope `G` carrying records of type `D`.
    fn leave(&self) -> Stream<G, D>;
}
/// Implements `leave` for streams that live in a child scope of `G`.
impl<'a, G: Scope, D: Data, T: Timestamp> Leave<G, D> for Stream<Child<'a, G, T>, D> {
    /// Registers a new output on the child scope's subgraph, connects `self`
    /// to it through an `EgressNub`, and returns the stream that the nub
    /// feeds in the parent scope.
    fn leave(&self) -> Stream<G, D> {
        let child = self.scope();

        // Allocate the output port on the enclosing subgraph.
        let port = child.subgraph.borrow_mut().new_output();

        // Fan-out point for the outer stream, plus the handle used to attach
        // consumers to it.
        let (tee, registrar) = Tee::<G::Timestamp, D>::new();

        let channel = child.clone().new_identifier();

        // Inner side: the target uses index 0 with the new port number
        // (presumably index 0 denotes the subgraph boundary as seen from
        // inside the child scope — confirm against the subgraph code).
        let inner_target = Target { index: 0, port: port };
        let nub = EgressNub { targets: tee, phantom: PhantomData };
        self.connect_to(inner_target, nub, channel);

        // Outer side: the subgraph itself is the source, on the same port.
        let subgraph_index = child.subgraph.borrow().index;
        let outer_source = Source { index: subgraph_index, port: port };
        Stream::new(outer_source, registrar, child.parent.clone())
    }
}
/// Pusher placed at the entrance of a child scope.
///
/// It accepts messages stamped with the outer timestamp `TOuter` and forwards
/// them stamped with `Product<TOuter, TInner>`.
struct IngressNub<TOuter, TInner, TData>
where
    TOuter: Timestamp,
    TInner: Timestamp,
    TData: Data,
{
    /// Counting wrapper around the tee that receives the re-stamped messages.
    targets: Counter<Product<TOuter, TInner>, TData, Tee<Product<TOuter, TInner>, TData>>,
}
/// Re-stamps incoming outer-scope messages with a product timestamp and
/// forwards them to `targets`.
impl<TOuter, TInner, TData> Push<(TOuter, Content<TData>)> for IngressNub<TOuter, TInner, TData>
where TOuter: Timestamp, TInner: Timestamp, TData: Data {
    /// Forwards `message` downstream; a `None` message is relayed by calling
    /// `done` on the targets.
    fn push(&mut self, message: &mut Option<(TOuter, Content<TData>)>) {
        match *message {
            Some((ref outer_time, ref mut payload)) => {
                // Take the payload out of the caller's message, leaving an
                // empty typed buffer behind as a placeholder.
                let taken = ::std::mem::replace(payload, Content::Typed(Vec::new()));

                // Keep the outer time; the inner coordinate is its default value.
                let stamped = Product::new(outer_time.clone(), Default::default());
                let mut forwarded = Some((stamped, taken));
                self.targets.push(&mut forwarded);

                // Whatever content is still in the forwarded message after the
                // push is handed back to the caller's slot.
                if let Some((_, returned)) = forwarded {
                    *payload = returned;
                }
            }
            None => {
                self.targets.done();
            }
        }
    }
}
/// Pusher placed at the exit of a child scope.
///
/// It accepts messages stamped with `Product<TOuter, TInner>` and forwards
/// them stamped with the outer timestamp `TOuter` only.
struct EgressNub<TOuter, TInner, TData>
where
    TOuter: Timestamp,
    TInner: Timestamp,
    TData: Data,
{
    /// Tee that receives the re-stamped messages.
    targets: Tee<TOuter, TData>,
    /// Marker for `TInner`, which no field stores.
    phantom: PhantomData<TInner>,
}
/// Strips the inner timestamp coordinate from incoming messages and forwards
/// them to `targets`.
impl<TOuter: Timestamp, TInner: Timestamp, TData: Data> Push<(Product<TOuter, TInner>, Content<TData>)> for EgressNub<TOuter, TInner, TData> {
    /// Forwards `message` downstream; a `None` message is relayed by calling
    /// `done` on the targets.
    fn push(&mut self, message: &mut Option<(Product<TOuter, TInner>, Content<TData>)>) {
        match *message {
            Some((ref inner_time, ref mut payload)) => {
                // Take the payload out of the caller's message, leaving an
                // empty typed buffer behind as a placeholder.
                let taken = ::std::mem::replace(payload, Content::Typed(Vec::new()));

                // Only the outer coordinate of the timestamp is kept.
                let mut forwarded = Some((inner_time.outer.clone(), taken));
                self.targets.push(&mut forwarded);

                // Whatever content is still in the forwarded message after the
                // push is handed back to the caller's slot.
                if let Some((_, returned)) = forwarded {
                    *payload = returned;
                }
            }
            None => {
                self.targets.done();
            }
        }
    }
}