use crate::dataflow::channels::{Bundle, Message};
use crate::progress::Timestamp;
use crate::dataflow::operators::Capability;
use crate::communication::Push;
/// Accumulates records of type `D` for one time `T` and hands them to the
/// wrapped pusher `P` in batches.
pub struct Buffer<T, D, P>
where
    P: Push<Bundle<T, D>>,
{
    /// Time the buffered records belong to; `None` until a session has been opened.
    time: Option<T>,
    /// Records accepted but not yet handed to `pusher`.
    buffer: Vec<D>,
    /// Destination that receives the batched records.
    pusher: P,
}
impl<T, D, P> Buffer<T, D, P>
where
    T: Eq + Clone,
    P: Push<Bundle<T, D>>,
{
    /// Creates a buffer around `pusher`.
    ///
    /// The buffer starts with no current time and an empty record vector whose
    /// capacity is `Message::<T, D>::default_length()`.
    pub fn new(pusher: P) -> Buffer<T, D, P> {
        let buffer = Vec::with_capacity(Message::<T, D>::default_length());
        Buffer {
            time: None,
            buffer,
            pusher,
        }
    }

    /// Opens a session for giving records at `time`.
    ///
    /// Records still buffered under a different time are flushed before the
    /// current time is replaced by a clone of `time`.
    pub fn session(&mut self, time: &T) -> Session<T, D, P> {
        self.switch_time(time);
        Session { buffer: self }
    }

    /// Opens a session at the time of `cap`.
    ///
    /// The returned session stores `cap` for as long as it exists and calls
    /// `cease` on this buffer when it is dropped.
    pub fn autoflush_session(&mut self, cap: Capability<T>) -> AutoflushSession<T, D, P>
    where
        T: Timestamp,
    {
        self.switch_time(cap.time());
        AutoflushSession {
            buffer: self,
            _capability: cap,
        }
    }

    /// Grants mutable access to the wrapped pusher.
    pub fn inner(&mut self) -> &mut P {
        &mut self.pusher
    }

    /// Flushes any buffered records and then pushes `None` into the pusher.
    ///
    /// NOTE(review): presumably `None` tells the pusher that nothing further is
    /// pending for now; confirm against the `Push` contract.
    pub fn cease(&mut self) {
        self.flush();
        self.pusher.push(&mut None);
    }

    /// Makes `time` the current time, first flushing records that were buffered
    /// under a different time.
    fn switch_time(&mut self, time: &T) {
        let differs = match self.time {
            Some(ref current) => current != time,
            // No time has been set yet; `flush` is not called.
            None => false,
        };
        if differs {
            self.flush();
        }
        self.time = Some(time.clone());
    }

    /// Hands the buffered records to the pusher via `Message::push_at`, tagged
    /// with the current time. Does nothing when the buffer is empty.
    ///
    /// # Panics
    ///
    /// Panics if records are buffered while no time has been set.
    fn flush(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        let time = self.time.clone().unwrap();
        Message::push_at(&mut self.buffer, time, &mut self.pusher);
    }

    /// Appends `data` to the buffer and flushes once the buffer's length equals
    /// its capacity.
    fn give(&mut self, data: D) {
        self.buffer.push(data);
        let full = self.buffer.len() == self.buffer.capacity();
        if full {
            self.flush();
        }
    }

    /// Sends `vector` as its own batch at the current time, after flushing any
    /// records that were already buffered.
    ///
    /// # Panics
    ///
    /// Panics if no time has been set.
    fn give_vec(&mut self, vector: &mut Vec<D>) {
        // `flush` is a no-op on an empty buffer, so no separate emptiness test is needed.
        self.flush();
        let time = self
            .time
            .as_ref()
            .expect("Buffer::give_vec(): time is None.")
            .clone();
        Message::push_at(vector, time, &mut self.pusher);
    }
}
/// Handle for giving records to a `Buffer` at the time chosen when the session
/// was opened with `Buffer::session`.
pub struct Session<'a, T, D, P>
where
    T: Eq + Clone + 'a,
    D: 'a,
    P: Push<Bundle<T, D>> + 'a,
{
    /// Buffer that receives every record given through this session.
    buffer: &'a mut Buffer<T, D, P>,
}
impl<'a, T, D, P> Session<'a, T, D, P>
where
    T: Eq + Clone + 'a,
    D: 'a,
    P: Push<Bundle<T, D>> + 'a,
{
    /// Gives a single record to the underlying buffer.
    #[inline(always)]
    pub fn give(&mut self, data: D) {
        self.buffer.give(data);
    }

    /// Gives every item produced by `iter`, one record at a time, in iteration order.
    #[inline(always)]
    pub fn give_iterator<I: Iterator<Item=D>>(&mut self, iter: I) {
        iter.for_each(|record| self.give(record));
    }

    /// Gives the contents of `message` as a whole batch.
    ///
    /// An empty `message` is ignored and the buffer is left untouched.
    #[inline(always)]
    pub fn give_vec(&mut self, message: &mut Vec<D>) {
        if message.is_empty() {
            return;
        }
        self.buffer.give_vec(message);
    }
}
/// Session that stores a `Capability` and calls `cease` on its `Buffer` when
/// dropped.
pub struct AutoflushSession<'a, T, D, P>
where
    T: Timestamp + Eq + Clone + 'a,
    D: 'a,
    P: Push<Bundle<T, D>> + 'a,
{
    /// Buffer that receives every record given through this session.
    buffer: &'a mut Buffer<T, D, P>,
    /// Capability the session was opened with. It is only stored, never read.
    /// NOTE(review): presumably held so the time stays valid for sending while
    /// the session is alive; confirm against `Capability` semantics.
    _capability: Capability<T>,
}
impl<'a, T, D, P> AutoflushSession<'a, T, D, P>
where
    T: Timestamp + Eq + Clone + 'a,
    D: 'a,
    P: Push<Bundle<T, D>> + 'a,
{
    /// Gives a single record to the underlying buffer.
    #[inline(always)]
    pub fn give(&mut self, data: D) {
        self.buffer.give(data);
    }

    /// Gives every item produced by `iter`, one record at a time, in iteration order.
    #[inline(always)]
    pub fn give_iterator<I: Iterator<Item=D>>(&mut self, iter: I) {
        iter.for_each(|record| self.give(record));
    }

    /// Gives the contents of `message` as a whole batch.
    ///
    /// An empty `message` is ignored and the buffer is left untouched.
    #[inline(always)]
    pub fn give_content(&mut self, message: &mut Vec<D>) {
        if message.is_empty() {
            return;
        }
        self.buffer.give_vec(message);
    }
}
impl<'a, T, D, P> Drop for AutoflushSession<'a, T, D, P>
where
    T: Timestamp + Eq + Clone + 'a,
    D: 'a,
    P: Push<Bundle<T, D>> + 'a,
{
    /// Calls `Buffer::cease` when the session goes out of scope, which flushes
    /// any buffered records and pushes `None` into the pusher.
    fn drop(&mut self) {
        self.buffer.cease();
    }
}