use std::rc::Rc;
use std::cell::RefCell;
use std::default::Default;
use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::operate::SharedProgress;
use crate::progress::frontier::{Antichain, MutableAntichain};
use crate::Container;
use crate::dataflow::{Scope, Stream};
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers;
use crate::dataflow::channels::pact::ParallelizationContract;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::operators::capability::Capability;
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle};
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
use crate::progress::operate::{FrontierInterest, PortConnectivity};
use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
/// Builder for an operator that layers shared, reference-counted progress
/// bookkeeping on top of `OperatorBuilderRaw`.
#[derive(Debug)]
pub struct OperatorBuilder<'scope, T: Timestamp> {
    /// The raw builder that port registration and the final build are delegated to.
    builder: OperatorBuilderRaw<'scope, T>,
    /// One `MutableAntichain` per input, pushed in the order inputs are added.
    frontier: Vec<MutableAntichain<T>>,
    /// Per input, the change batch shared with that input's pull counter.
    consumed: Vec<Rc<RefCell<ChangeBatch<T>>>>,
    /// Per output, a shared change batch; the outer `Rc` is also handed to every input handle.
    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
    /// Per input, the shared connectivity to outputs; extended when an output is added later.
    summaries: Vec<Rc<RefCell<PortConnectivity<T::Summary>>>>,
    /// Per output, the change batch shared with that output's push counter.
    produced: Vec<Rc<RefCell<ChangeBatch<T>>>>,
}
impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
OperatorBuilder {
builder: OperatorBuilderRaw::new(name, scope),
frontier: Vec::new(),
consumed: Vec::new(),
internal: Rc::new(RefCell::new(Vec::new())),
summaries: Vec::new(),
produced: Vec::new(),
}
}
/// Forwards the frontier interest `notify` for input `input` to the wrapped raw builder.
pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
    let raw = &mut self.builder;
    raw.set_notify_for(input, notify);
}
/// Adds `stream` as a new input under `pact`, connected to every output the
/// builder's shape currently reports.
///
/// Each such output is paired with an antichain holding only the default
/// summary, and the work is then delegated to `new_input_connection`.
pub fn new_input<C: Container, P>(&mut self, stream: Stream<'scope, T, C>, pact: P) -> InputHandleCore<T, C, P::Puller>
where
    P: ParallelizationContract<T, C>,
{
    let output_count = self.builder.shape().outputs();
    let default_connection = (0..output_count)
        .map(|output| (output, Antichain::from_elem(Default::default())));
    self.new_input_connection(stream, pact, default_connection)
}
/// Adds `stream` as a new input under `pact` with explicit connectivity,
/// returning the handle used to read from it.
///
/// `connection` yields `(output, summaries)` pairs. One copy is given to the
/// raw builder; the other is collected into a shared `PortConnectivity` that
/// is both retained by this builder and passed to the returned handle.
pub fn new_input_connection<C: Container, P, I>(&mut self, stream: Stream<'scope, T, C>, pact: P, connection: I) -> InputHandleCore<T, C, P::Puller>
where
    P: ParallelizationContract<T, C>,
    I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)> + Clone,
{
    // Register with the raw builder and wrap its puller in a counter.
    let raw_connection = connection.clone();
    let puller = self.builder.new_input_connection(stream, pact, raw_connection);
    let counted = PullCounter::new(puller);

    // Per-input state: an empty frontier and the counter's shared `consumed` batch.
    let consumed = Rc::clone(counted.consumed());
    self.frontier.push(MutableAntichain::new());
    self.consumed.push(consumed);

    // Keep one reference to the connectivity so later outputs can extend it.
    let summary = Rc::new(RefCell::new(connection.into_iter().collect()));
    self.summaries.push(Rc::clone(&summary));

    new_input_handle(counted, Rc::clone(&self.internal), summary)
}
/// Adds a new output connected to every input the builder's shape currently reports.
///
/// Each such input is paired with an antichain holding only the default
/// summary, and the work is then delegated to `new_output_connection`.
pub fn new_output<C: Container>(&mut self) -> (pushers::Output<T, C>, Stream<'scope, T, C>) {
    let input_count = self.builder.shape().inputs();
    let default_connection = (0..input_count)
        .map(|input| (input, Antichain::from_elem(Default::default())));
    self.new_output_connection(default_connection)
}
/// Adds a new output with explicit connectivity, returning the output pusher
/// and the stream that carries its data.
///
/// `connection` yields `(input, summaries)` pairs. One copy is given to the
/// raw builder; the other is used to add the new output port to the shared
/// connectivity of each named input.
///
/// # Panics
///
/// Panics if `connection` names an input index for which no input has been added.
pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (pushers::Output<T, C>, Stream<'scope, T, C>)
where
    I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)> + Clone,
{
    // Read the output count before the raw builder registers the new output;
    // this value is used as the new output's port number below.
    let port = self.shape().outputs();
    let (tee, stream) = self.builder.new_output_connection(connection.clone());

    // Shared change batch for this output, also reachable through `self.internal`.
    let changes = Rc::new(RefCell::new(ChangeBatch::new()));
    self.internal.borrow_mut().push(Rc::clone(&changes));

    // Wrap the tee in a counter and keep a reference to its `produced` batch.
    let counter = PushCounter::new(tee);
    self.produced.push(Rc::clone(counter.produced()));

    // Record the new port in the connectivity of every input that connects to it.
    connection.into_iter().for_each(|(input, summary)| {
        self.summaries[input].borrow_mut().add_port(port, summary);
    });

    (pushers::Output::new(counter, changes, port), stream)
}
/// Builds the operator from `constructor`, whose returned logic produces no value.
///
/// `constructor` receives the initial capabilities and returns the logic that
/// is invoked with the input frontiers. The logic is wrapped so that it always
/// reports `false` to `build_reschedule`.
pub fn build<B, L>(self, constructor: B)
where
    B: FnOnce(Vec<Capability<T>>) -> L,
    L: FnMut(&[MutableAntichain<T>]) + 'static,
{
    self.build_reschedule(move |capabilities| {
        let mut logic = constructor(capabilities);
        move |frontiers: &[MutableAntichain<T>]| {
            logic(frontiers);
            false
        }
    })
}
/// Builds the operator from `constructor`, whose returned logic yields a `bool`.
///
/// Both the constructor and the logic it returns are boxed as trait objects
/// and then passed to `build_reschedule_boxed`.
pub fn build_reschedule<B, L>(self, constructor: B)
where
    B: FnOnce(Vec<Capability<T>>) -> L,
    L: FnMut(&[MutableAntichain<T>]) -> bool + 'static,
{
    // Erase the concrete logic type `L` behind a boxed trait object.
    let erased = move |capabilities: Vec<Capability<T>>| -> Box<dyn FnMut(&[MutableAntichain<T>]) -> bool> {
        Box::new(constructor(capabilities))
    };
    self.build_reschedule_boxed(Box::new(erased));
}
pub fn build_reschedule_boxed<'a>(self, constructor: Box<dyn FnOnce(Vec<Capability<T>>) -> Box<dyn FnMut(&[MutableAntichain<T>])->bool> + 'a>) {
self.build_reschedule_typed(constructor);
}
pub fn build_reschedule_typed<B, L>(self, constructor: B)
where
B: FnOnce(Vec<Capability<T>>) -> L,
L: FnMut(&[MutableAntichain<T>])->bool+'static
{
let mut logic = constructor(self.mint_capabilities());
let mut bookkeeping = ProgressBookkeeping {
frontier: self.frontier,
consumed: self.consumed,
internal: self.internal,
produced: self.produced,
};
let raw_logic =
move |progress: &mut SharedProgress<T>| {
bookkeeping.drain_frontiers(progress);
let result = logic(bookkeeping.frontiers());
bookkeeping.publish_progress(progress);
result
};
self.builder.build_typed(raw_logic);
}
fn mint_capabilities(&self) -> Vec<Capability<T>> {
let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
for batch in self.internal.borrow().iter() {
capabilities.push(Capability::new(T::minimum(), Rc::clone(batch)));
batch.borrow_mut().clear();
}
capabilities
}
/// Returns `index()` of the wrapped raw builder.
// NOTE(review): presumably the operator's index within its scope — confirm against the raw builder.
pub fn index(&self) -> usize { self.builder.index() }
/// Returns `global()` of the wrapped raw builder.
// NOTE(review): presumably a dataflow-wide operator identifier — confirm against the raw builder.
pub fn global(&self) -> usize { self.builder.global() }
/// Returns a reference to the `OperatorShape` held by the wrapped raw builder.
pub fn shape(&self) -> &OperatorShape { self.builder.shape() }
/// Returns the `OperatorInfo` produced by the wrapped raw builder.
pub fn operator_info(&self) -> OperatorInfo { self.builder.operator_info() }
}
/// Progress state captured by the closure that `build_reschedule_typed`
/// installs in the raw builder.
struct ProgressBookkeeping<T: Timestamp> {
    /// Frontiers that are updated from the shared progress and shown to the user logic.
    frontier: Vec<MutableAntichain<T>>,
    /// Shared batches drained into `SharedProgress::consumeds`.
    consumed: Vec<Rc<RefCell<ChangeBatch<T>>>>,
    /// Shared batches drained into `SharedProgress::internals`.
    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
    /// Shared batches drained into `SharedProgress::produceds`.
    produced: Vec<Rc<RefCell<ChangeBatch<T>>>>,
}

impl<T: Timestamp> ProgressBookkeeping<T> {
    /// Returns the tracked frontiers as a slice.
    #[inline(always)]
    fn frontiers(&self) -> &[MutableAntichain<T>] {
        self.frontier.as_slice()
    }

    /// Drains the frontier changes in `progress` into the tracked frontiers, pairwise.
    fn drain_frontiers(&mut self, progress: &mut SharedProgress<T>) {
        let pairs = self.frontier.iter_mut().zip(progress.frontiers.iter_mut());
        for (frontier, changes) in pairs {
            frontier.update_iter(changes.drain());
        }
    }

    /// Moves the accumulated consumed, internal and produced changes into `progress`.
    ///
    /// # Panics
    ///
    /// Panics if `progress.internals` has fewer entries than there are internal batches.
    fn publish_progress(&self, progress: &mut SharedProgress<T>) {
        for (consumed, target) in self.consumed.iter().zip(progress.consumeds.iter_mut()) {
            consumed.borrow_mut().drain_into(target);
        }

        // Internal batches are matched to `progress.internals` by position.
        let internals = self.internal.borrow_mut();
        for (index, batch) in internals.iter().enumerate() {
            let mut changes = batch.borrow_mut();
            progress.internals[index].extend(changes.drain());
        }

        for (produced, target) in self.produced.iter().zip(progress.produceds.iter_mut()) {
            produced.borrow_mut().drain_into(target);
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
    use crate::dataflow::operators::generic::OutputBuilder;

    /// Opens a session on each output using the capability at the other
    /// output's position; the test expects this to panic.
    #[test]
    #[should_panic]
    fn incorrect_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());
            let (raw_first, _stream1) = builder.new_output::<Vec<()>>();
            let (raw_second, _stream2) = builder.new_output::<Vec<()>>();
            let mut first = OutputBuilder::from(raw_first);
            let mut second = OutputBuilder::from(raw_second);

            builder.build(move |capabilities| {
                move |_frontiers| {
                    let mut first_handle = first.activate();
                    let mut second_handle = second.activate();
                    // Deliberately swapped: capability 0 with the second output, 1 with the first.
                    second_handle.session(&capabilities[0]);
                    first_handle.session(&capabilities[1]);
                }
            });
        })
    }

    /// Opens a session on each output using the capability at the same
    /// position, then drops all capabilities; the test expects no panic.
    #[test]
    fn correct_capabilities() {
        crate::example(|scope| {
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());
            let (raw_first, _stream1) = builder.new_output::<Vec<()>>();
            let (raw_second, _stream2) = builder.new_output::<Vec<()>>();
            let mut first = OutputBuilder::from(raw_first);
            let mut second = OutputBuilder::from(raw_second);

            builder.build(move |mut capabilities| {
                move |_frontiers| {
                    let mut first_handle = first.activate();
                    let mut second_handle = second.activate();
                    // Nothing left to do once the capabilities have been dropped.
                    if capabilities.is_empty() {
                        return;
                    }
                    first_handle.session(&capabilities[0]);
                    second_handle.session(&capabilities[1]);
                    capabilities.clear();
                }
            });

            "Hello".to_owned()
        });
    }
}