use std::rc::Rc;
use std::cell::RefCell;
use std::time::{Instant, Duration};
use std::collections::VecDeque;
use crate::{ExchangeData, PartialOrder};
use crate::worker::Worker;
use crate::dataflow::channels::pact::Exchange;
use crate::dataflow::operators::generic::operator::source;
use crate::dataflow::operators::generic::operator::Operator;
use crate::scheduling::activate::Activator;
struct CatchupActivator {
pub catchup_until: Option<Duration>,
activator: Activator,
}
impl CatchupActivator {
pub fn activate(&self) {
self.activator.activate();
}
}
pub struct Sequencer<T> {
activator: Rc<RefCell<Option<CatchupActivator>>>,
send: Rc<RefCell<VecDeque<T>>>, recv: Rc<RefCell<VecDeque<T>>>, }
impl<T: ExchangeData+Clone> Sequencer<T> {
pub fn new(worker: &mut Worker, timer: Instant) -> Self {
Sequencer::preloaded(worker, timer, VecDeque::new())
}
pub fn preloaded(worker: &mut Worker, timer: Instant, preload: VecDeque<T>) -> Self {
let send: Rc<RefCell<VecDeque<T>>> = Rc::new(RefCell::new(VecDeque::new()));
let recv = Rc::new(RefCell::new(preload));
let send_weak = Rc::downgrade(&send);
let recv_weak = Rc::downgrade(&recv);
let activator = Rc::new(RefCell::new(None));
let activator_source = Rc::clone(&activator);
let activator_sink = Rc::clone(&activator);
worker.dataflow::<Duration,_,_>(move |scope| {
let peers = scope.peers();
let mut recvd = Vec::new();
let mut counter = 0;
source(scope, "SequenceInput", move |capability, info| {
activator_source
.borrow_mut()
.replace(CatchupActivator {
activator: scope.activator_for(info.address),
catchup_until: None,
});
let mut capability = Some(capability);
move |output| {
if let Some(send_queue) = send_weak.upgrade() {
let capability = capability.as_mut().expect("Capability unavailable");
capability.downgrade(&timer.elapsed());
let mut session = output.session(&capability);
let mut borrow = send_queue.borrow_mut();
for element in borrow.drain(..) {
for worker_index in 0 .. peers {
session.give((worker_index, counter, element.clone()));
}
counter += 1;
}
let mut activator_borrow = activator_source.borrow_mut();
let activator = activator_borrow.as_mut().unwrap();
if let Some(t) = activator.catchup_until {
if capability.time().less_than(&t) {
activator.activate();
} else {
activator.catchup_until = None;
}
}
} else {
capability = None;
}
}
})
.sink(
Exchange::new(|x: &(usize, usize, T)| x.0 as u64),
"SequenceOutput",
move |(input, frontier)| {
input.for_each_time(|time, data| {
for (worker, counter, element) in data.flat_map(|d| d.drain(..)) {
recvd.push(((*time.time(), worker, counter), element));
}
});
recvd.sort_unstable_by(|x,y| x.0.cmp(&y.0));
if let Some(last) = recvd.last() {
let mut activator_borrow = activator_sink.borrow_mut();
let activator = activator_borrow.as_mut().unwrap();
activator.catchup_until = Some((last.0).0);
activator.activate();
}
let count = recvd.iter().filter(|&((ref time, _, _), _)| !frontier.less_equal(time)).count();
let iter = recvd.drain(..count);
if let Some(recv_queue) = recv_weak.upgrade() {
recv_queue.borrow_mut().extend(iter.map(|(_,elem)| elem));
}
}
);
});
Sequencer { activator, send, recv, }
}
pub fn push(&mut self, element: T) {
self.send.borrow_mut().push_back(element);
self.activator.borrow_mut().as_mut().unwrap().activate();
}
}
impl<T> Iterator for Sequencer<T> {
type Item = T;
fn next(&mut self) -> Option<T> {
self.recv.borrow_mut().pop_front()
}
}
impl<T> Drop for Sequencer<T> {
fn drop(&mut self) {
self.activator
.borrow()
.as_ref()
.expect("Sequencer.activator unavailable")
.activate()
}
}