use timely::progress::Timestamp;
use timely::dataflow::operators::Input as TimelyInput;
use timely::dataflow::operators::input::Handle;
use timely::dataflow::scopes::ScopeParent;
use ::{Data, Diff};
use collection::{Collection, AsCollection};
pub trait Input : TimelyInput {
fn new_collection<D, R>(&mut self) -> (InputSession<<Self as ScopeParent>::Timestamp, D, R>, Collection<Self, D, R>)
where D: Data, R: Diff;
fn new_collection_from<I>(&mut self, data: I) -> (InputSession<<Self as ScopeParent>::Timestamp, I::Item, isize>, Collection<Self, I::Item, isize>)
where I: IntoIterator+'static, I::Item: Data;
fn new_collection_from_raw<D, R, I>(&mut self, data: I) -> (InputSession<<Self as ScopeParent>::Timestamp, D, R>, Collection<Self, D, R>)
where I: IntoIterator<Item=(D,<Self as ScopeParent>::Timestamp,R)>+'static, D: Data, R: Diff+Data;
}
use lattice::Lattice;
impl<G: TimelyInput> Input for G where <G as ScopeParent>::Timestamp: Lattice {
fn new_collection<D, R>(&mut self) -> (InputSession<<G as ScopeParent>::Timestamp, D, R>, Collection<G, D, R>)
where D: Data, R: Diff{
let (handle, stream) = self.new_input();
(InputSession::from(handle), stream.as_collection())
}
fn new_collection_from<I>(&mut self, data: I) -> (InputSession<<G as ScopeParent>::Timestamp, I::Item, isize>, Collection<G, I::Item, isize>)
where I: IntoIterator+'static, I::Item: Data {
self.new_collection_from_raw(data.into_iter().map(|d| (d, Default::default(), 1)))
}
fn new_collection_from_raw<D,R,I>(&mut self, data: I) -> (InputSession<<G as ScopeParent>::Timestamp, D, R>, Collection<G, D, R>)
where
D: Data,
R: Diff+Data,
I: IntoIterator<Item=(D,<Self as ScopeParent>::Timestamp,R)>+'static,
{
use timely::dataflow::operators::ToStream;
let (handle, stream) = self.new_input();
let source = data.to_stream(self).as_collection();
(InputSession::from(handle), stream.as_collection().concat(&source))
}}
pub struct InputSession<T: Timestamp+Clone, D: Data, R: Diff> {
time: T,
buffer: Vec<(D, T, R)>,
handle: Handle<T,(D,T,R)>,
}
impl<T: Timestamp+Clone, D: Data> InputSession<T, D, isize> {
pub fn insert(&mut self, element: D) { self.update(element, 1); }
pub fn remove(&mut self, element: D) { self.update(element,-1); }
}
impl<T: Timestamp+Clone, D: Data, R: Diff> InputSession<T, D, R> {
pub fn to_collection<G: TimelyInput>(&mut self, scope: &mut G) -> Collection<G, D, R>
where
G: ScopeParent<Timestamp=T>,
{
scope
.input_from(&mut self.handle)
.as_collection()
}
pub fn new() -> Self {
let handle: Handle<T,_> = Handle::new();
InputSession {
time: handle.time().clone(),
buffer: Vec::new(),
handle,
}
}
pub fn from(handle: Handle<T,(D,T,R)>) -> Self {
InputSession {
time: handle.time().clone(),
buffer: Vec::new(),
handle,
}
}
pub fn update(&mut self, element: D, change: R) {
if self.buffer.len() == self.buffer.capacity() {
if self.buffer.len() > 0 {
self.handle.send_batch(&mut self.buffer);
}
self.buffer.reserve(1024);
}
self.buffer.push((element, self.time.clone(), change));
}
pub fn update_at(&mut self, element: D, time: T, change: R) {
assert!(self.time.less_equal(&time));
if self.buffer.len() == self.buffer.capacity() {
if self.buffer.len() > 0 {
self.handle.send_batch(&mut self.buffer);
}
self.buffer.reserve(1024);
}
self.buffer.push((element, time, change));
}
pub fn flush(&mut self) {
self.handle.send_batch(&mut self.buffer);
if self.handle.epoch().less_than(&self.time) {
self.handle.advance_to(self.time.clone());
}
}
pub fn advance_to(&mut self, time: T) {
assert!(self.handle.epoch().less_equal(&time));
assert!(&self.time.less_equal(&time));
self.time = time;
}
pub fn epoch(&self) -> &T { &self.time }
pub fn time(&self) -> &T { &self.time }
pub fn close(self) { }
}
impl<T: Timestamp+Clone, D: Data, R: Diff> Drop for InputSession<T, D, R> {
fn drop(&mut self) {
self.flush();
}
}