use timely_communication::Allocate;
use timely::progress::Timestamp;
use timely::progress::timestamp::RootTimestamp;
use timely::progress::nested::product::Product;
use timely::dataflow::operators::Input as TimelyInput;
use timely::dataflow::operators::input::Handle;
use timely::dataflow::scopes::{Child, Root};
use ::{Data, Diff};
use collection::{Collection, AsCollection};
pub trait Input<'a, A: Allocate, T: Timestamp+Ord> {
fn new_collection<D, R>(&mut self) -> (InputSession<T, D, R>, Collection<Child<'a, Root<A>, T>, D, R>)
where D: Data, R: Diff;
fn new_collection_from<I>(&mut self, data: I) -> (InputSession<T, I::Item, isize>, Collection<Child<'a, Root<A>, T>, I::Item, isize>)
where I: IntoIterator+'static, I::Item: Data;
}
impl<'a, A: Allocate, T: Timestamp+Ord> Input<'a, A, T> for Child<'a, Root<A>, T> {
fn new_collection<D, R>(&mut self) -> (InputSession<T, D, R>, Collection<Child<'a, Root<A>, T>, D, R>)
where D: Data, R: Diff{
let (handle, stream) = self.new_input();
(InputSession::from(handle), stream.as_collection())
}
fn new_collection_from<I>(&mut self, data: I) -> (InputSession<T, I::Item, isize>, Collection<Child<'a, Root<A>, T>, I::Item, isize>)
where I: IntoIterator+'static, I::Item: Data {
use timely::dataflow::operators::ToStream;
let (handle, stream) = self.new_input();
let source = data.into_iter().map(|d| (d, Default::default(), 1)).to_stream(self).as_collection();
(InputSession::from(handle), stream.as_collection().concat(&source))
}
}
pub struct InputSession<T: Timestamp+Clone, D: Data, R: Diff> {
time: Product<RootTimestamp, T>,
buffer: Vec<(D, Product<RootTimestamp, T>, R)>,
handle: Handle<T,(D,Product<RootTimestamp, T>,R)>,
}
impl<T: Timestamp+Clone, D: Data> InputSession<T, D, isize> {
pub fn insert(&mut self, element: D) { self.update(element, 1); }
pub fn remove(&mut self, element: D) { self.update(element,-1); }
}
impl<'a, T: Timestamp+Clone, D: Data, R: Diff> InputSession<T, D, R> {
pub fn from(handle: Handle<T,(D,Product<RootTimestamp, T>,R)>) -> Self {
InputSession {
time: handle.time().clone(),
buffer: Vec::new(),
handle: handle,
}
}
pub fn update(&mut self, element: D, change: R) {
if self.buffer.len() == self.buffer.capacity() {
if self.buffer.len() > 0 {
self.handle.send_batch(&mut self.buffer);
}
self.buffer.reserve(1024);
}
self.buffer.push((element, self.time.clone(), change));
}
pub fn flush(&mut self) {
self.handle.send_batch(&mut self.buffer);
if self.handle.epoch().less_than(&self.time.inner) {
self.handle.advance_to(self.time.inner.clone());
}
}
pub fn advance_to(&mut self, time: T) {
assert!(self.handle.epoch().less_equal(&time));
assert!(&self.time.inner.less_equal(&time));
self.time = Product::new(RootTimestamp, time);
}
pub fn epoch(&self) -> &T { &self.time.inner }
pub fn time(&self) -> &Product<RootTimestamp, T> { &self.time }
pub fn close(self) { }
}
impl<T: Timestamp+Clone, D: Data, R: Diff> Drop for InputSession<T, D, R> {
fn drop(&mut self) {
self.flush();
}
}