use crate::dataflow::{Scope, Stream};
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
use crate::progress::Timestamp;
use super::Event;
use super::event::EventIterator;
use crate::Container;
use crate::dataflow::channels::Message;
pub trait Replay<T: Timestamp, C> : Sized {
fn replay_into<'scope>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
}
fn replay_core<'scope>(self, scope: Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>;
}
impl<T: Timestamp, C: Container+Clone, I> Replay<T, C> for I
where
I : IntoIterator,
<I as IntoIterator>::Item: EventIterator<T, C>+'static,
{
fn replay_core<'scope>(self, scope: Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>{
let mut builder = OperatorBuilder::new("Replay".to_owned(), scope);
let address = builder.operator_info().address;
let activator = scope.activator_for(address);
let (targets, stream) = builder.new_output();
let mut output = PushCounter::new(targets);
let mut event_streams = self.into_iter().collect::<Vec<_>>();
let mut started = false;
let mut allocation: C = Default::default();
builder.build(
move |progress| {
if !started {
progress.internals[0].update(T::minimum(), (event_streams.len() as i64) - 1);
started = true;
}
for event_stream in event_streams.iter_mut() {
while let Some(event) = event_stream.next() {
use std::borrow::Cow::*;
match event {
Owned(Event::Progress(vec)) => {
progress.internals[0].extend(vec.into_iter());
},
Owned(Event::Messages(time, data)) => {
output.push(&mut Some(Message::new(time, data)));
}
Borrowed(Event::Progress(vec)) => {
progress.internals[0].extend(vec.iter().cloned());
},
Borrowed(Event::Messages(time, data)) => {
allocation.clone_from(data);
Message::push_at(&mut allocation, time.clone(), &mut output);
}
}
}
}
if let Some(delay) = period {
activator.activate_after(delay);
}
use timely_communication::Push;
output.done();
output.produced().borrow_mut().drain_into(&mut progress.produceds[0]);
false
}
);
stream
}
}