use crate::Data;
use crate::dataflow::{Scope, Stream};
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
use crate::progress::Timestamp;
use super::Event;
use super::event::EventIterator;
/// Replay a source of captured events into a scope with a matching timestamp type.
///
/// `T` is the timestamp type and `D` the record type of the stream that is produced.
pub trait Replay<T: Timestamp, D: Data> {
    /// Consumes `self` and replays it into `scope`, returning the resulting `Stream<S, D>`.
    ///
    /// The scope's timestamp type must be exactly `T`.
    fn replay_into<S>(self, scope: &mut S) -> Stream<S, D>
    where
        S: Scope<Timestamp=T>;
}
/// Blanket implementation of `Replay` for any collection of event iterators.
///
/// Every item yielded by `self.into_iter()` is treated as one source of events;
/// all sources are drained by a single operator named "Replay" with one output.
impl<T: Timestamp, D: Data, I> Replay<T, D> for I
where
    I: IntoIterator,
    <I as IntoIterator>::Item: EventIterator<T, D> + 'static,
{
    /// Builds the "Replay" operator in `scope` and returns its output stream.
    ///
    /// Each time the operator runs it drains every event currently available from
    /// every source, forwarding `Event::Messages` payloads to the output and
    /// `Event::Progress` updates to the operator's internal change batch, and then
    /// asks to be activated again.
    fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, D> {
        let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());

        // An activator tied to this operator's address lets the operator reschedule itself.
        let address = builder.operator_info().address;
        let activator = scope.activator_for(&address[..]);

        let (targets, stream) = builder.new_output();
        let mut output = PushBuffer::new(PushCounter::new(targets));

        let mut sources: Vec<_> = self.into_iter().collect();
        let mut initialized = false;

        builder.build(
            move |_frontier| {},
            move |_consumed, internal, produced| {
                if !initialized {
                    // One-time adjustment at the default timestamp by (number of sources - 1).
                    // NOTE(review): presumably the operator starts out holding a single
                    // capability and each source needs one of its own -- confirm against
                    // the builder's initial-capability behaviour.
                    let delta = (sources.len() as i64) - 1;
                    internal[0].update(Default::default(), delta);
                    initialized = true;
                }

                // Drain whatever each source can provide right now.
                for source in &mut sources {
                    while let Some(event) = source.next() {
                        match &*event {
                            Event::Progress(updates) => {
                                internal[0].extend(updates.iter().cloned());
                            }
                            Event::Messages(time, data) => {
                                output.session(time).give_iterator(data.iter().cloned());
                            }
                        }
                    }
                }

                // Always request another activation; this happens on every invocation,
                // whether or not any events were read.
                activator.activate();

                // Finish the output buffer, then move the produced counts recorded by
                // the counter into `produced[0]`.
                output.cease();
                output.inner().produced().borrow_mut().drain_into(&mut produced[0]);

                // NOTE(review): `false` presumably tells the builder there is no
                // outstanding work; rescheduling relies on the activation above -- confirm.
                false
            },
        );

        stream
    }
}