use std::rc::Rc;
use std::cell::RefCell;
use crate::Data;
use crate::dataflow::{Scope, Stream};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
use crate::progress::ChangeBatch;
use crate::progress::Timestamp;
use super::{Event, EventPusher};
/// Capture of a stream's activity as a sequence of `Event<T, D>` values.
pub trait Capture<T: Timestamp, D: Data> {
    /// Captures events into the supplied `pusher`.
    ///
    /// The pusher is taken by value and must be `'static`; how and when events
    /// are pushed into it is up to the implementor.
    fn capture_into<P: EventPusher<T, D>+'static>(&self, pusher: P);

    /// Captures events into a freshly created `std::sync::mpsc` channel.
    ///
    /// The sending half of the channel is handed to [`Capture::capture_into`]
    /// as the event pusher, and the receiving half is returned to the caller.
    fn capture(&self) -> ::std::sync::mpsc::Receiver<Event<T, D>> {
        let (sender, receiver) = ::std::sync::mpsc::channel();
        self.capture_into(sender);
        receiver
    }
}
impl<S: Scope, D: Data> Capture<S::Timestamp, D> for Stream<S, D> {
    /// Builds a raw operator named "Capture" that consumes this stream and
    /// forwards what it observes to `event_pusher`.
    ///
    /// Two kinds of events are pushed:
    /// * `Event::Progress`, carrying the drained contents of the frontier
    ///   change batch for input 0 whenever that batch is non-empty;
    /// * `Event::Messages`, carrying the time and data of each message pulled
    ///   from the input.
    ///
    /// The pusher is shared between the two operator closures through an
    /// `Rc<RefCell<_>>`.
    fn capture_into<P: EventPusher<S::Timestamp, D>+'static>(&self, event_pusher: P) {
        use crate::communication::message::RefOrMut;

        let mut builder = OperatorBuilder::new(String::from("Capture"), self.scope());
        let mut input = PullCounter::new(builder.new_input(self, Pipeline));

        // Both closures below need mutable access to the same pusher.
        let progress_pusher = Rc::new(RefCell::new(event_pusher));
        let message_pusher = Rc::clone(&progress_pusher);

        // Flipped on the first invocation of the frontier closure.
        let mut started = false;

        builder.build(
            // Frontier changes: forward them as `Event::Progress`.
            move |frontier| {
                if !started {
                    // One-time `-1` update at the default timestamp.
                    // NOTE(review): presumably this retracts an initial
                    // capability held at that time — confirm against the
                    // progress-tracking protocol.
                    frontier[0].update(Default::default(), -1);
                    started = true;
                }

                // Nothing to report for input 0.
                if frontier[0].is_empty() {
                    return;
                }

                // Take the accumulated changes, leaving a fresh batch behind.
                let changes = ::std::mem::replace(&mut frontier[0], ChangeBatch::new());
                progress_pusher
                    .borrow_mut()
                    .push(Event::Progress(changes.into_inner()));
            },
            // Input messages: forward them as `Event::Messages`.
            move |consumed, _internal, _external| {
                // Held for the duration of the whole drain loop.
                let mut pusher = message_pusher.borrow_mut();

                while let Some(message) = input.next() {
                    let (time, contents) = match message.as_ref_or_mut() {
                        RefOrMut::Ref(bundle) => (&bundle.time, RefOrMut::Ref(&bundle.data)),
                        RefOrMut::Mut(bundle) => (&bundle.time, RefOrMut::Mut(&mut bundle.data)),
                    };

                    // Obtain an owned vector of the message's data, offering an
                    // empty vector in exchange.
                    let payload = contents.replace(Vec::new());
                    let event = Event::Messages(time.clone(), payload);
                    pusher.push(event);
                }

                // Move the puller's consumed counts into the slot for input 0.
                input.consumed().borrow_mut().drain_into(&mut consumed[0]);

                // NOTE(review): presumably `false` signals that the operator has
                // no outstanding internal work — confirm against
                // `OperatorBuilder::build`.
                false
            }
        );
    }
}