use std::rc::Rc;
use std::cell::RefCell;
use std::time::{Instant, Duration};
use std::collections::VecDeque;
use ::{communication::Allocate, ExchangeData};
use worker::Worker;
use dataflow::channels::pact::Exchange;
use dataflow::operators::generic::operator::source;
use dataflow::operators::generic::operator::Operator;
/// Handle for contributing elements to, and reading elements from, a sequence
/// assembled by a dataflow.
///
/// Elements passed to `push` wait in `send` until the dataflow built by `new`
/// drains them; elements the dataflow has released are appended to `recv`,
/// where `next` picks them up. Both queues are reference-counted and
/// interior-mutable so the dataflow operators can reach them through weak
/// references while this handle owns them.
pub struct Sequencer<T> {
    /// Elements proposed locally that the dataflow has not yet picked up.
    send: Rc<RefCell<VecDeque<T>>>,
    /// Elements released by the dataflow, waiting to be returned by `next`.
    recv: Rc<RefCell<VecDeque<T>>>,
}
impl<T: Ord+ExchangeData> Sequencer<T> {
    /// Creates a sequencer and installs its supporting dataflow on `worker`.
    ///
    /// The dataflow consists of two operators:
    ///
    /// * a source named "SequenceInput" that, each time it runs, downgrades
    ///   its capability to `timer.elapsed()`, drains everything queued by
    ///   `push`, and emits one `(worker_index, element)` record for every
    ///   index in `0 .. peers`;
    /// * a sink named "SequenceOutput" that receives records exchanged on
    ///   their `worker_index`, buffers them together with their timestamp,
    ///   keeps the buffer sorted by `(timestamp, element)`, and moves every
    ///   entry whose timestamp is no longer covered by the input frontier
    ///   into the queue read by `next`.
    ///
    /// The operators hold only weak references to the two queues. Once the
    /// returned `Sequencer` is dropped, the source discards its capability
    /// and the sink discards whatever it would otherwise have released.
    pub fn new<A: Allocate>(worker: &mut Worker<A>, timer: Instant) -> Self {
        let outgoing: Rc<RefCell<VecDeque<T>>> = Rc::new(RefCell::new(VecDeque::new()));
        let incoming = Rc::new(RefCell::new(VecDeque::new()));

        // Weak handles, so that the dataflow does not keep the queues alive.
        let send_weak = Rc::downgrade(&outgoing);
        let recv_weak = Rc::downgrade(&incoming);

        worker.dataflow::<Duration,_,_>(move |dataflow| {
            let peers = dataflow.peers();

            // State owned by the sink: timestamped entries not yet released,
            // and a scratch buffer for swapping out incoming message contents.
            let mut pending = Vec::new();
            let mut scratch = Vec::new();

            source(dataflow, "SequenceInput", move |capability| {
                // Held in an `Option` so it can be dropped when the send queue goes away.
                let mut held = Some(capability);

                move |output| {
                    match send_weak.upgrade() {
                        Some(send_queue) => {
                            // The capability is only cleared after the queue has
                            // vanished, so it is expected to be present here.
                            let cap = held.as_mut().expect("Capability unavailable");
                            cap.downgrade(&timer.elapsed());

                            let mut session = output.session(&cap);
                            let mut queued = send_queue.borrow_mut();
                            for element in queued.drain(..) {
                                // One copy per worker, tagged with its destination index.
                                for target in 0 .. peers {
                                    session.give((target, element.clone()));
                                }
                            }
                        }
                        None => {
                            // Nobody can push any more; release the capability.
                            held = None;
                        }
                    }
                }
            })
            .sink(
                // Route each record to the worker named by its first component.
                Exchange::new(|record: &(usize, T)| record.0 as u64),
                "SequenceOutput",
                move |input| {
                    // Buffer everything that arrived, remembering its timestamp.
                    input.for_each(|stamp, data| {
                        data.swap(&mut scratch);
                        for (_target, element) in scratch.drain(..) {
                            pending.push((stamp.time().clone(), element));
                        }
                    });

                    pending.sort();

                    // Count the entries whose timestamp the frontier has moved past;
                    // that many entries are taken from the front of the sorted buffer.
                    let ready = pending
                        .iter()
                        .filter(|entry| !input.frontier().less_equal(&entry.0))
                        .count();
                    let released = pending.drain(..ready);

                    // If the receive queue is gone, `released` is simply dropped,
                    // which still removes the entries from `pending`.
                    if let Some(recv_queue) = recv_weak.upgrade() {
                        recv_queue
                            .borrow_mut()
                            .extend(released.map(|(_stamp, element)| element));
                    }
                }
            );
        });

        Sequencer { send: outgoing, recv: incoming }
    }

    /// Queues `element` for insertion into the sequence.
    ///
    /// The element stays in the local send queue until the "SequenceInput"
    /// source next runs and drains it.
    pub fn push(&mut self, element: T) {
        let mut outgoing = self.send.borrow_mut();
        outgoing.push_back(element);
    }

    /// Removes and returns the oldest element released by the dataflow, or
    /// `None` if no released element is currently waiting.
    pub fn next(&mut self) -> Option<T> {
        let mut sequenced = self.recv.borrow_mut();
        sequenced.pop_front()
    }
}