use crate::Data;
use crate::dataflow::{Scope, Stream};
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
use crate::progress::Timestamp;
use super::Event;
use super::event::EventIterator;
pub trait Replay<T: Timestamp, D: Data> : Sized {
fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, D> {
self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
}
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> Stream<S, D>;
}
impl<T: Timestamp, D: Data, I> Replay<T, D> for I
where I : IntoIterator,
<I as IntoIterator>::Item: EventIterator<T, D>+'static {
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> Stream<S, D>{
let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
let address = builder.operator_info().address;
let activator = scope.activator_for(&address[..]);
let (targets, stream) = builder.new_output();
let mut output = PushBuffer::new(PushCounter::new(targets));
let mut event_streams = self.into_iter().collect::<Vec<_>>();
let mut started = false;
builder.build(
move |progress| {
if !started {
progress.internals[0].update(S::Timestamp::minimum(), (event_streams.len() as i64) - 1);
started = true;
}
for event_stream in event_streams.iter_mut() {
while let Some(event) = event_stream.next() {
match *event {
Event::Progress(ref vec) => {
progress.internals[0].extend(vec.iter().cloned());
},
Event::Messages(ref time, ref data) => {
output.session(time).give_iterator(data.iter().cloned());
}
}
}
}
if let Some(delay) = period {
activator.activate_after(delay);
}
output.cease();
output.inner().produced().borrow_mut().drain_into(&mut progress.produceds[0]);
false
}
);
stream
}
}