use timely::dataflow::operators::unordered_input::{ActivateCapability, UnorderedHandle};
use timely::progress::Timestamp;
use differential_dataflow::difference::Monoid;
use differential_dataflow::Data;
pub struct UnorderedSession<T: Timestamp + Clone, D: Data, R: Monoid> {
time: T,
cap: ActivateCapability<T>,
buffer: Vec<(D, T, R)>,
handle: UnorderedHandle<T, (D, T, R)>,
}
impl<T: Timestamp + Clone, D: Data, R: Monoid> UnorderedSession<T, D, R> {
pub fn from(handle: UnorderedHandle<T, (D, T, R)>, cap: ActivateCapability<T>) -> Self {
UnorderedSession {
time: cap.time().clone(),
cap,
buffer: Vec::new(),
handle,
}
}
pub fn update(&mut self, element: D, change: R) {
if self.buffer.len() == self.buffer.capacity() {
if !self.buffer.is_empty() {
self.handle
.session(self.cap.clone())
.give_iterator(self.buffer.drain(..));
}
self.buffer.reserve(1024);
}
self.buffer.push((element, self.cap.time().clone(), change));
}
pub fn update_at(&mut self, element: D, time: T, change: R) {
assert!(self.cap.time().less_equal(&time));
if self.buffer.len() == self.buffer.capacity() {
if !self.buffer.is_empty() {
self.handle
.session(self.cap.clone())
.give_iterator(self.buffer.drain(..));
}
self.buffer.reserve(1024);
}
self.buffer.push((element, time, change));
}
pub fn flush(&mut self) {
self.handle
.session(self.cap.clone())
.give_iterator(self.buffer.drain(..));
if self.cap.time().less_than(&self.time) {
self.cap = self.cap.delayed(&self.time);
}
}
#[inline]
pub fn advance_to(&mut self, time: T) {
assert!(self.epoch().less_equal(&time));
assert!(&self.time.less_equal(&time));
self.time = time;
}
pub fn epoch(&self) -> &T {
&self.time
}
pub fn close(self) {}
}
impl<T: Timestamp + Clone, D: Data, R: Monoid> Drop for UnorderedSession<T, D, R> {
fn drop(&mut self) {
self.flush();
}
}