use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;
use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::progress::operate::PortConnectivity;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::channels::Message;
use crate::communication::Pull;
use crate::{Container, ContainerBuilder, Accountable};
use crate::container::{CapacityContainerBuilder, PushInto};
use crate::dataflow::operators::InputCapability;
use crate::dataflow::operators::capability::CapabilityTrait;
/// Handle through which an operator pulls input containers of type `C`
/// timestamped by `T`, using a puller of type `P`.
pub struct InputHandleCore<T, C, P>
where
    T: Timestamp,
    P: Pull<Message<T, C>>,
{
    /// Source of incoming messages; drained through `next_guarded`.
    pull_counter: PullCounter<T, C, P>,
    /// Shared change batches; the `Rc` is cloned into every `InputCapability` handed out.
    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
    /// Shared port connectivity; the `Rc` is cloned into every `InputCapability` handed out.
    summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
    /// Scratch queue of `(capability, container)` pairs used by `for_each_time`.
    staging: VecDeque<(InputCapability<T>, C)>,
    /// Scratch buffer holding the containers of a single timestamp in `for_each_time`.
    staged: Vec<C>,
}
impl<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>> InputHandleCore<T, C, P> {
    /// Pulls the next available message and returns its container together with an
    /// `InputCapability` built from the guard that accompanies the message.
    ///
    /// Returns `None` when the puller has nothing further to offer.
    #[inline]
    fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
        let internal = &self.internal;
        let summaries = &self.summaries;
        self.pull_counter.next_guarded().map(|(guard, bundle)| {
            (InputCapability::new(Rc::clone(internal), Rc::clone(summaries), guard), &mut bundle.data)
        })
    }

    /// Calls `logic` once per available input container, in the order the
    /// containers are pulled, until the input is exhausted.
    ///
    /// `logic` receives the capability and a mutable reference to the container.
    pub fn for_each<F>(&mut self, mut logic: F) where F: FnMut(InputCapability<T>, &mut C) {
        while let Some((cap, data)) = self.next() {
            logic(cap, data);
        }
    }

    /// Drains all available input, then calls `logic` once per distinct timestamp
    /// in ascending timestamp order.
    ///
    /// Each call receives one capability for that timestamp (the first one staged
    /// for it) and an iterator over every container staged at that timestamp.
    /// Containers sharing a timestamp are presented in the order they were pulled.
    pub fn for_each_time<F>(&mut self, mut logic: F) where F: FnMut(InputCapability<T>, std::slice::IterMut::<C>), C: Default {
        // Move every pending container out of the puller, leaving `C::default()` behind.
        while let Some((cap, data)) = self.next() {
            let data = std::mem::take(data);
            self.staging.push_back((cap, data));
        }
        // A stable sort is required here: an unstable sort may reorder containers
        // that share a timestamp, making their delivery order arbitrary.
        self.staging.make_contiguous().sort_by(|x,y| x.0.time().cmp(&y.0.time()));

        while let Some((cap, data)) = self.staging.pop_front() {
            self.staged.push(data);
            // After sorting, all entries with the same time as `cap` form a prefix.
            let more = self.staging.iter().take_while(|(c,_)| c.time() == cap.time()).count();
            self.staged.extend(self.staging.drain(..more).map(|(_,d)| d));
            logic(cap, self.staged.iter_mut());
            // Keep the allocation for the next timestamp.
            self.staged.clear();
        }
    }
}
pub fn new_input_handle<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>>(
pull_counter: PullCounter<T, C, P>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
) -> InputHandleCore<T, C, P> {
InputHandleCore {
pull_counter,
internal,
summaries,
staging: Default::default(),
staged: Default::default(),
}
}
/// Couples an output for containers of type `CB::Container` with a
/// container builder `CB` that assembles those containers.
pub struct OutputBuilder<T, CB>
where
    T: Timestamp,
    CB: ContainerBuilder,
{
    /// The wrapped output; activated to obtain a session.
    output: crate::dataflow::channels::pushers::Output<T, CB::Container>,
    /// Builder that accumulates pushed items into containers.
    builder: CB,
}
impl<T, CB> OutputBuilder<T, CB>
where
    T: Timestamp,
    CB: ContainerBuilder,
{
    /// Wraps `output`, pairing it with a default-constructed container builder.
    pub fn from(output: crate::dataflow::channels::pushers::Output<T, CB::Container>) -> Self {
        let builder = CB::default();
        Self { output, builder }
    }

    /// Activates the wrapped output and returns a session that borrows both the
    /// activated output session and this handle's builder.
    pub fn activate<'a>(&'a mut self) -> OutputBuilderSession<'a, T, CB> {
        let session = self.output.activate();
        let builder = &mut self.builder;
        OutputBuilderSession { session, builder }
    }
}
/// An activated output together with a borrowed container builder, as
/// produced by `OutputBuilder::activate`.
pub struct OutputBuilderSession<'a, T, CB>
where
    T: Timestamp,
    CB: ContainerBuilder,
{
    /// The activated output session that containers are given to.
    session: crate::dataflow::channels::pushers::OutputSession<'a, T, CB::Container>,
    /// Borrowed builder that accumulates items into containers.
    builder: &'a mut CB,
}
impl<'a, T, CB> OutputBuilderSession<'a, T, CB>
where
    T: Timestamp,
    CB: ContainerBuilder,
{
    /// Reports the output index of the underlying output session.
    #[inline]
    pub fn output_index(&self) -> usize {
        self.session.output_index()
    }

    /// Opens a `Session` that sends with `capability`, using this handle's builder.
    ///
    /// In debug builds, asserts that the underlying session considers
    /// `capability` valid.
    pub fn session_with_builder<'b, CT>(&'b mut self, capability: &'b CT) -> Session<'a, 'b, T, CB, CT>
    where
        'a: 'b,
        CT: CapabilityTrait<T>,
    {
        debug_assert!(self.session.valid(capability));
        Session { buffer: self, capability }
    }
}
impl<'a, T, C> OutputBuilderSession<'a, T, CapacityContainerBuilder<C>>
where
    T: Timestamp,
    C: Container,
{
    /// Opens a `Session` that sends with `capability`; available when the
    /// builder is a `CapacityContainerBuilder<C>`.
    ///
    /// In debug builds, asserts that the underlying session considers
    /// `capability` valid.
    pub fn session<'b, CT>(&'b mut self, capability: &'b CT) -> Session<'a, 'b, T, CapacityContainerBuilder<C>, CT>
    where
        'a: 'b,
        CT: CapabilityTrait<T>,
    {
        debug_assert!(self.session.valid(capability));
        Session { buffer: self, capability }
    }
}
/// A borrowed output session bound to one capability; items and containers
/// given through it are sent with that capability.
pub struct Session<'a, 'b, T, CB, CT>
where
    'a: 'b,
    T: Timestamp,
    CB: ContainerBuilder,
    CT: CapabilityTrait<T>,
{
    /// The output session and builder this session writes through.
    buffer: &'b mut OutputBuilderSession<'a, T, CB>,
    /// Capability passed along with every container given to the output.
    capability: &'b CT,
}
impl<'a, 'b, T, CB, CT> Session<'a, 'b, T, CB, CT>
where
    'a: 'b,
    T: Timestamp,
    CB: ContainerBuilder,
    CT: CapabilityTrait<T>,
{
    /// Grants mutable access to the underlying container builder.
    pub fn builder(&mut self) -> &mut CB {
        &mut self.buffer.builder
    }

    /// Pushes a single item into the builder, then sends every container the
    /// builder reports via `extract`.
    #[inline]
    pub fn give<D>(&mut self, data: D) where CB: PushInto<D> {
        self.builder().push_into(data);
        self.extract_and_send();
    }

    /// Pushes every item of `iter` into the builder, then sends every container
    /// the builder reports via `extract`.
    #[inline]
    pub fn give_iterator<I>(&mut self, iter: I) where I: Iterator, CB: PushInto<I::Item> {
        iter.for_each(|item| self.buffer.builder.push_into(item));
        self.extract_and_send();
    }

    /// Hands `container` directly to the output session, without going through
    /// the builder.
    #[inline]
    pub fn give_container(&mut self, container: &mut CB::Container) {
        self.buffer.session.give(&self.capability, container);
    }

    /// Hands each of `containers` directly to the output session, in iteration order.
    #[inline]
    pub fn give_containers<'c>(&mut self, containers: impl Iterator<Item = &'c mut CB::Container>) {
        for container in containers {
            self.give_container(container);
        }
    }

    /// Sends containers for as long as the builder's `extract` yields one.
    pub fn extract_and_send(&mut self) {
        loop {
            match self.buffer.builder.extract() {
                Some(container) => self.buffer.session.give(&self.capability, container),
                None => break,
            }
        }
    }

    /// Sends containers for as long as the builder's `finish` yields one.
    pub fn flush(&mut self) {
        loop {
            match self.buffer.builder.finish() {
                Some(container) => self.buffer.session.give(&self.capability, container),
                None => break,
            }
        }
    }
}
impl<'a, 'b, T, CB, CT> Drop for Session<'a, 'b, T, CB, CT>
where
    'a: 'b,
    T: Timestamp,
    CB: ContainerBuilder,
    CT: CapabilityTrait<T>,
{
    /// Flushes the session when it goes out of scope, so whatever the builder
    /// still yields from `finish` is sent.
    fn drop(&mut self) {
        self.flush();
    }
}