use crate::dataflow::Stream;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
use crate::Container;
use crate::progress::ChangeBatch;
use crate::progress::Timestamp;
use super::{Event, EventPusher};
/// Capture a stream of timestamped containers as a sequence of [`Event`]s.
///
/// Implementors consume themselves and forward what they observe to an
/// [`EventPusher`].
pub trait Capture<T: Timestamp, C: Container> : Sized {
    /// Consumes `self` and delivers its events to `pusher`.
    ///
    /// The pusher must be `'static` because implementations may move it into
    /// long-lived operator logic.
    fn capture_into<P: EventPusher<T, C>+'static>(self, pusher: P);

    /// Consumes `self` and returns the receiving end of a freshly created
    /// `std::sync::mpsc` channel.
    ///
    /// The sending end of the channel is handed to [`Capture::capture_into`],
    /// so every event pushed by the implementation becomes available on the
    /// returned receiver.
    fn capture(self) -> std::sync::mpsc::Receiver<Event<T, C>> {
        let (sender, receiver) = std::sync::mpsc::channel();
        self.capture_into(sender);
        receiver
    }
}
impl<T: Timestamp, C: Container> Capture<T, C> for Stream<'_, T, C> {
    /// Builds an operator named `"Capture"` with a single `Pipeline` input fed by
    /// this stream, and forwards everything it observes to `event_pusher`.
    ///
    /// On each invocation of the operator logic:
    /// 1. pending changes to the first input's frontier (if any) are pushed as
    ///    one `Event::Progress`;
    /// 2. every available input message is pushed as an `Event::Messages`
    ///    carrying a clone of its time and the contents taken out of its data;
    /// 3. the input's consumed counts are drained into `progress.consumeds[0]`.
    fn capture_into<P: EventPusher<T, C>+'static>(self, mut event_pusher: P) {
        let mut op_builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
        let mut puller = PullCounter::new(op_builder.new_input(self, Pipeline));
        // Flipped to `false` the first time the operator logic runs.
        let mut first_call = true;

        op_builder.build(move |progress| {
            let frontier = &mut progress.frontiers[0];

            if first_call {
                first_call = false;
                // Retract one count at the minimum timestamp, exactly once.
                // NOTE(review): presumably this cancels the operator's initial
                // capability -- confirm against the progress protocol.
                frontier.update(Timestamp::minimum(), -1);
            }

            // Report frontier changes before any data seen in this invocation.
            if !frontier.is_empty() {
                let changes = std::mem::replace(frontier, ChangeBatch::new());
                event_pusher.push(Event::Progress(changes.into_inner().to_vec()));
            }

            // Forward each received message, leaving a default-valued container
            // behind in the message.
            while let Some(message) = puller.next() {
                let payload = std::mem::take(&mut message.data);
                event_pusher.push(Event::Messages(message.time.clone(), payload));
            }

            // Hand the accumulated consumed counts over to the progress record.
            puller.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]);

            // NOTE(review): the logic always returns `false`; presumably this
            // signals "no outstanding work" to the scheduler -- confirm.
            false
        });
    }
}