use std::rc::Rc;
use std::cell::RefCell;
use std::default::Default;
use crate::scheduling::{Schedule, Activations, ActivateOnDrop};
use crate::progress::frontier::Antichain;
use crate::progress::{Operate, operate::SharedProgress, Timestamp};
use crate::progress::Source;
use crate::progress::ChangeBatch;
use crate::Data;
use crate::dataflow::channels::pushers::{Tee, Counter as PushCounter};
use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession};
use crate::dataflow::operators::Capability;
use crate::dataflow::operators::capability::mint as mint_capability;
use crate::dataflow::{Stream, Scope};
/// Extension trait for creating an input whose data is sent under explicitly
/// supplied capabilities rather than through a single advancing epoch.
pub trait UnorderedInput<G>
where
    G: Scope,
{
    /// Creates a new unordered input.
    ///
    /// The return value has the shape `((handle, capability), stream)`:
    ///
    /// * `handle` opens sessions through which records are sent
    ///   (see [`UnorderedHandle::session`]),
    /// * `capability` is the initial [`ActivateCapability`] for the input,
    /// * `stream` carries the records sent through `handle`.
    fn new_unordered_input<D>(
        &mut self,
    ) -> (
        (UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>),
        Stream<G, D>,
    )
    where
        D: Data;
}
/// Every scope can host an unordered input.
impl<G> UnorderedInput<G> for G
where
    G: Scope,
{
    /// Builds the output channel, mints the initial capability at the default
    /// timestamp, registers an `UnorderedOperator` with the scope, and returns
    /// `((handle, capability), stream)`.
    fn new_unordered_input<D>(
        &mut self,
    ) -> (
        (UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>),
        Stream<G, D>,
    )
    where
        D: Data,
    {
        // Output side: a tee wrapped in a counter. The counter's `produced`
        // batch is shared with the operator so it can be reported on schedule.
        let (tee, registrar) = Tee::<G::Timestamp, D>::new();
        let counted = PushCounter::new(tee);
        let produced = counted.produced().clone();

        // Capability side: changes made through capabilities are recorded in
        // `internal`, which the operator also holds.
        let internal = Rc::new(RefCell::new(ChangeBatch::new()));
        let initial = mint_capability(Default::default(), internal.clone());

        // Reserve the operator's index and derive its full address within the
        // scope; the address is what gets activated later.
        let peers = self.peers();
        let index = self.allocate_operator_index();
        let mut address = self.addr();
        address.push(index);

        let cap = ActivateCapability::new(initial, &address[..], self.activations().clone());
        let helper = UnorderedHandle::new(counted);

        let operator = UnorderedOperator {
            name: "UnorderedInput".to_owned(),
            address,
            shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
            internal,
            produced,
            peers,
        };
        self.add_operator_with_index(Box::new(operator), index);

        let stream = Stream::new(Source { index, port: 0 }, registrar, self.clone());
        ((helper, cap), stream)
    }
}
/// The operator registered with the scope on behalf of an unordered input.
///
/// It has no inputs and one output; its only job is to forward capability and
/// message-count changes into the shared progress state.
struct UnorderedOperator<T>
where
    T: Timestamp,
{
    /// Operator name, as reported by `Schedule::name`.
    name: String,
    /// Scope address followed by this operator's index, as reported by `Schedule::path`.
    address: Vec<usize>,
    /// Progress state handed out by `get_internal_summary` and filled in by `schedule`.
    shared_progress: Rc<RefCell<SharedProgress<T>>>,
    /// Change batch shared with the capability minted for this input.
    internal: Rc<RefCell<ChangeBatch<T>>>,
    /// Change batch shared with the output counter (`PushCounter::produced`).
    produced: Rc<RefCell<ChangeBatch<T>>>,
    /// Number of peers reported by the scope; scales the initial capability counts.
    peers: usize,
}
impl<T> Schedule for UnorderedOperator<T>
where
    T: Timestamp,
{
    /// The operator's name.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// The operator's address.
    fn path(&self) -> &[usize] {
        self.address.as_slice()
    }

    /// Moves all pending capability changes and produced-message counts into
    /// output 0 of the shared progress state.
    ///
    /// Always returns `false`.
    fn schedule(&mut self) -> bool {
        let mut progress = self.shared_progress.borrow_mut();
        self.internal.borrow_mut().drain_into(&mut progress.internals[0]);
        self.produced.borrow_mut().drain_into(&mut progress.produceds[0]);
        false
    }
}
impl<T> Operate<T> for UnorderedOperator<T>
where
    T: Timestamp,
{
    /// This operator has no inputs.
    fn inputs(&self) -> usize {
        0
    }

    /// This operator has a single output.
    fn outputs(&self) -> usize {
        1
    }

    /// Returns an empty summary together with the shared progress state.
    ///
    /// Before returning, every change currently pending in `internal` is
    /// drained and recorded against output 0 with its count multiplied by the
    /// number of peers.
    fn get_internal_summary(
        &mut self,
    ) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
        let scale = self.peers as i64;
        let progress = &self.shared_progress;
        // The progress state is borrowed per update, so it is only touched
        // when there is actually something to record.
        self.internal.borrow_mut().drain().for_each(|(time, count)| {
            progress.borrow_mut().internals[0].update(time, count * scale);
        });
        (Vec::new(), self.shared_progress.clone())
    }

    /// This operator does not ask to be notified.
    fn notify_me(&self) -> bool {
        false
    }
}
/// A handle through which records are sent into an unordered input.
pub struct UnorderedHandle<T, D>
where
    T: Timestamp,
    D: Data,
{
    /// Buffer in front of the counted tee that feeds the input's output stream.
    buffer: PushBuffer<T, D, PushCounter<T, D, Tee<T, D>>>,
}
impl<T, D> UnorderedHandle<T, D>
where
    T: Timestamp,
    D: Data,
{
    /// Wraps `pusher` in a fresh buffer.
    fn new(pusher: PushCounter<T, D, Tee<T, D>>) -> Self {
        Self {
            buffer: PushBuffer::new(pusher),
        }
    }

    /// Opens an auto-flushing session using a clone of the capability held by `cap`.
    ///
    /// The session is wrapped in `ActivateOnDrop`, constructed with the
    /// address and activations carried by `cap`. `cap` itself is consumed and
    /// dropped when this method returns.
    pub fn session<'b>(
        &'b mut self,
        cap: ActivateCapability<T>,
    ) -> ActivateOnDrop<AutoflushSession<'b, T, D, PushCounter<T, D, Tee<T, D>>>> {
        let session = self.buffer.autoflush_session(cap.capability.clone());
        let address = cap.address.clone();
        let activations = cap.activations.clone();
        ActivateOnDrop::new(session, address, activations)
    }
}
/// A capability bundled with the address and activation handle of the
/// operator it belongs to, so that the operator can be activated when the
/// capability is downgraded or dropped.
#[derive(Clone)]
pub struct ActivateCapability<T>
where
    T: Timestamp,
{
    /// The wrapped capability.
    capability: Capability<T>,
    /// Address passed to `Activations::activate`.
    address: Rc<Vec<usize>>,
    /// Shared activation tracker.
    activations: Rc<RefCell<Activations>>,
}
impl<T> ActivateCapability<T>
where
    T: Timestamp,
{
    /// Bundles `capability` with a copy of `address` and the shared `activations`.
    pub fn new(
        capability: Capability<T>,
        address: &[usize],
        activations: Rc<RefCell<Activations>>,
    ) -> Self {
        let address = Rc::new(address.to_vec());
        Self {
            capability,
            address,
            activations,
        }
    }

    /// The timestamp of the wrapped capability.
    pub fn time(&self) -> &T {
        self.capability.time()
    }

    /// Returns a new `ActivateCapability` whose capability is this one
    /// delayed to `time`, sharing the same address and activations.
    pub fn delayed(&self, time: &T) -> Self {
        let capability = self.capability.delayed(time);
        Self {
            capability,
            address: Rc::clone(&self.address),
            activations: Rc::clone(&self.activations),
        }
    }

    /// Downgrades the wrapped capability to `time`, then activates the
    /// address held by this capability.
    pub fn downgrade(&mut self, time: &T) {
        self.capability.downgrade(time);
        let mut activations = self.activations.borrow_mut();
        activations.activate(self.address.as_slice());
    }
}
impl<T> Drop for ActivateCapability<T>
where
    T: Timestamp,
{
    /// Activates the address held by this capability. This runs before the
    /// fields (including the wrapped capability) are themselves dropped.
    fn drop(&mut self) {
        let mut activations = self.activations.borrow_mut();
        activations.activate(self.address.as_slice());
    }
}