use std::default::Default;
use std::rc::Rc;
use std::cell::RefCell;
use crate::scheduling::{Schedule, Activations};
use crate::progress::{Source, Target};
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
use crate::Container;
use crate::dataflow::{Stream, Scope, OperatorSlot};
use crate::dataflow::channels::pushers::Tee;
use crate::dataflow::channels::pact::ParallelizationContract;
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
/// Records the externally visible shape of an operator under construction:
/// its name, the peer count, and how many input and output ports it has.
#[derive(Debug)]
pub struct OperatorShape {
    /// Name given to the operator when the shape was created.
    name: String,
    /// Frontier interest for each input, indexed by input port.
    notify: Vec<FrontierInterest>,
    /// Peer count supplied when the shape was created.
    peers: usize,
    /// Number of input ports added so far.
    inputs: usize,
    /// Number of output ports added so far.
    outputs: usize,
}
impl OperatorShape {
fn new(name: String, peers: usize) -> Self {
OperatorShape {
name,
notify: Vec::new(),
peers,
inputs: 0,
outputs: 0,
}
}
pub fn inputs(&self) -> usize { self.inputs }
pub fn outputs(&self) -> usize { self.outputs }
}
/// Incrementally assembles an operator: ports are added one at a time, and the
/// finished operator is installed into the slot reserved at construction.
#[derive(Debug)]
pub struct OperatorBuilder<'scope, T: Timestamp> {
    /// The scope the operator is being built in.
    scope: Scope<'scope, T>,
    /// The operator slot reserved from `scope`; the built operator is installed here.
    slot: OperatorSlot<'scope, T>,
    /// The address reported by the reserved slot.
    address: Rc<[usize]>,
    /// Name, peer count, port counts, and per-input frontier interest.
    shape: OperatorShape,
    /// One connectivity entry per input, describing the outputs it connects to.
    summary: Connectivity<T::Summary>,
}
impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
    /// Allocates a new operator builder named `name` inside `scope`.
    ///
    /// Reserves an operator slot from the scope, and records the slot's
    /// address and the scope's peer count. The builder starts with no inputs,
    /// no outputs, and an empty connectivity summary.
    pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
        let slot = scope.reserve_operator();
        let address = slot.addr();
        let peers = scope.peers();
        OperatorBuilder {
            scope,
            slot,
            address,
            shape: OperatorShape::new(name, peers),
            summary: vec![],
        }
    }

    /// The index reported by the reserved operator slot.
    pub fn index(&self) -> usize { self.slot.index() }

    /// The identifier reported by the reserved operator slot.
    pub fn global(&self) -> usize { self.slot.identifier() }

    /// A reference to the operator's shape as built so far.
    pub fn shape(&self) -> &OperatorShape { &self.shape }

    /// Sets the frontier interest for the input port `input`.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not name an input that has already been added.
    pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
        // Check explicitly so that a bad port index produces an actionable
        // message rather than a bare slice-index panic.
        assert!(
            input < self.shape.notify.len(),
            "set_notify_for: input {} does not exist (operator has {} inputs)",
            input,
            self.shape.notify.len(),
        );
        self.shape.notify[input] = notify;
    }

    /// Adds a new input, connected to every output that exists at the time of
    /// the call, each with the default path summary.
    ///
    /// Returns the puller produced by `pact` for the new input.
    pub fn new_input<C: Container, P>(&mut self, stream: Stream<'scope, T, C>, pact: P) -> P::Puller
    where
        P: ParallelizationContract<T, C>
    {
        let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default())));
        self.new_input_connection(stream, pact, connection)
    }

    /// Adds a new input with an explicitly specified connection to outputs.
    ///
    /// `connection` yields `(output, summaries)` pairs naming the outputs the
    /// new input connects to, and the path summaries along each connection.
    /// The new input starts with `FrontierInterest::Always`.
    ///
    /// Returns the puller produced by `pact` for the new input.
    ///
    /// # Panics
    ///
    /// Panics if `connection` names an output port that has not been added yet.
    pub fn new_input_connection<C: Container, P, I>(&mut self, stream: Stream<'scope, T, C>, pact: P, connection: I) -> P::Puller
    where
        P: ParallelizationContract<T, C>,
        I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)>,
    {
        // Allocate a channel and wire `stream` into the next input port.
        let channel_id = self.scope.worker().new_identifier();
        let logging = self.scope.worker().logging();
        let (sender, receiver) = pact.connect(self.scope.worker(), channel_id, Rc::clone(&self.address), logging);
        let target = Target::new(self.slot.index(), self.shape.inputs);
        stream.connect_to(target, sender, channel_id);

        // Record the new input: its count, its frontier interest, and its connectivity.
        self.shape.inputs += 1;
        self.shape.notify.push(FrontierInterest::Always);
        let connectivity: PortConnectivity<_> = connection.into_iter().collect();
        assert!(
            connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs),
            "new_input_connection: connection names an output that does not exist (operator has {} outputs)",
            self.shape.outputs,
        );
        self.summary.push(connectivity);

        receiver
    }

    /// Adds a new output, connected from every input that exists at the time
    /// of the call, each with the default path summary.
    ///
    /// Returns the pusher for the output and the stream that reads from it.
    pub fn new_output<C: Container>(&mut self) -> (Tee<T, C>, Stream<'scope, T, C>) {
        let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
        self.new_output_connection(connection)
    }

    /// Adds a new output with an explicitly specified connection from inputs.
    ///
    /// `connection` yields `(input, summaries)` pairs naming the inputs that
    /// connect to the new output, and the path summaries along each connection.
    ///
    /// Returns the pusher for the output and the stream that reads from it.
    ///
    /// # Panics
    ///
    /// Panics if `connection` names an input port that has not been added yet.
    pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (Tee<T, C>, Stream<'scope, T, C>)
    where
        I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)>,
    {
        let new_output = self.shape.outputs;
        self.shape.outputs += 1;
        let (target, registrar) = Tee::new();
        let source = Source::new(self.slot.index(), new_output);
        let stream = Stream::new(source, registrar, self.scope);

        for (input, entry) in connection {
            // Mirror the validation done for inputs, with a message that
            // identifies the offending port instead of a bare index panic.
            assert!(
                input < self.summary.len(),
                "new_output_connection: input {} does not exist (operator has {} inputs)",
                input,
                self.summary.len(),
            );
            self.summary[input].add_port(new_output, entry);
        }

        (target, stream)
    }

    /// Builds the operator from the supplied scheduling logic and installs it.
    ///
    /// The logic is boxed and handed to [`Self::build_boxed`].
    pub fn build<L>(self, logic: L)
    where
        L: FnMut(&mut SharedProgress<T>)->bool+'static
    {
        self.build_boxed(Box::new(logic));
    }

    /// Like [`Self::build`], but accepts logic that is already boxed.
    pub fn build_boxed(self, logic: Box<dyn FnMut(&mut SharedProgress<T>)->bool>) {
        self.build_typed(logic);
    }

    /// Like [`Self::build`], but keeps the concrete type of `logic` rather
    /// than boxing it.
    ///
    /// Assembles the operator from the builder's state, with fresh shared
    /// progress sized to the current input and output counts, and installs it
    /// into the reserved slot.
    pub fn build_typed<L>(self, logic: L)
    where
        L: FnMut(&mut SharedProgress<T>)->bool+'static
    {
        // Read the counts before `self.shape` is moved into the operator.
        let inputs = self.shape.inputs;
        let outputs = self.shape.outputs;

        let operator = OperatorCore {
            shape: self.shape,
            address: self.address,
            activations: self.scope.activations(),
            logic,
            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
            summary: self.summary,
        };

        self.slot.install(Box::new(operator));
    }

    /// Information describing the operator: its index, identifier, and address.
    pub fn operator_info(&self) -> OperatorInfo {
        OperatorInfo::new(self.index(), self.global(), Rc::clone(&self.address))
    }
}
/// The operator produced by the builder: its recorded state plus the
/// user-supplied scheduling logic.
struct OperatorCore<T, L>
where
    T: Timestamp,
    L: FnMut(&mut SharedProgress<T>) -> bool + 'static,
{
    /// Name, peer count, port counts, and per-input frontier interest.
    shape: OperatorShape,
    /// The operator's address; reported as its path and used to activate it.
    address: Rc<[usize]>,
    /// Logic invoked with the shared progress each time the operator is scheduled.
    logic: L,
    /// Progress information handed to `logic` and returned from `initialize`.
    shared_progress: Rc<RefCell<SharedProgress<T>>>,
    /// Activation handle used to request scheduling of this operator.
    activations: Rc<RefCell<Activations>>,
    /// One connectivity entry per input, describing the outputs it connects to.
    summary: Connectivity<T::Summary>,
}
impl<T, L> Schedule for OperatorCore<T, L>
where
    T: Timestamp,
    L: FnMut(&mut SharedProgress<T>) -> bool + 'static,
{
    /// The operator's name, as recorded in its shape.
    fn name(&self) -> &str {
        self.shape.name.as_str()
    }

    /// The operator's address.
    fn path(&self) -> &[usize] {
        &*self.address
    }

    /// Runs the operator logic once against the shared progress state and
    /// returns whatever the logic returns.
    fn schedule(&mut self) -> bool {
        // The mutable borrow is held for the duration of the logic call.
        let mut progress = self.shared_progress.borrow_mut();
        (self.logic)(&mut *progress)
    }
}
impl<T, L> Operate<T> for OperatorCore<T, L>
where
    T: Timestamp,
    L: FnMut(&mut SharedProgress<T>) -> bool + 'static,
{
    /// The number of inputs recorded in the operator's shape.
    fn inputs(&self) -> usize {
        self.shape.inputs
    }

    /// The number of outputs recorded in the operator's shape.
    fn outputs(&self) -> usize {
        self.shape.outputs
    }

    /// Prepares the operator for execution.
    ///
    /// Requests an activation for the operator's address, records `peers`
    /// counts at `T::minimum()` for every entry of the shared progress
    /// `internals`, and returns a copy of the connectivity summary, a handle
    /// to the shared progress, and the operator itself as a schedulable object.
    fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
        // Ask for the operator to be activated at its address.
        self.activations.borrow_mut().activate(&self.address[..]);

        // Seed each output's internal counts at the minimum timestamp.
        let peer_count = self.shape.peers as i64;
        {
            let mut progress = self.shared_progress.borrow_mut();
            for output in progress.internals.iter_mut() {
                output.update(T::minimum(), peer_count);
            }
        }

        let connectivity = self.summary.clone();
        let progress_handle = Rc::clone(&self.shared_progress);
        (connectivity, progress_handle, self)
    }

    /// The frontier interest for each input, indexed by input port.
    fn notify_me(&self) -> &[FrontierInterest] {
        self.shape.notify.as_slice()
    }
}