Trait timely::dataflow::operators::capture::Capture
[−]
[src]
pub trait Capture<T: Timestamp, D: Data> { fn capture_into<P: EventPusher<T, D> + 'static>(&self, pusher: P); fn capture(&self) -> Receiver<Event<T, D>> { ... } }
Capture a stream of timestamped data for later replay.
Required Methods
fn capture_into<P: EventPusher<T, D> + 'static>(&self, pusher: P)
Captures a stream of timestamped data for later replay.
Examples
The type Rc<EventLink<T,D>>
implements a typed linked list,
and can be captured into and replayed from.
use std::rc::Rc; use std::sync::{Arc, Mutex}; use timely::dataflow::Scope; use timely::dataflow::operators::{Capture, ToStream, Inspect}; use timely::dataflow::operators::capture::{EventLink, Replay, Extract}; // get send and recv endpoints, wrap send to share let (send, recv) = ::std::sync::mpsc::channel(); let send = Arc::new(Mutex::new(send)); timely::execute(timely::Configuration::Thread, move |worker| { // this is only to validate the output. let send = send.lock().unwrap().clone(); // these are to capture/replay the stream. let handle1 = Rc::new(EventLink::new()); let handle2 = handle1.clone(); worker.dataflow::<u64,_,_>(|scope1| (0..10).to_stream(scope1) .capture_into(handle1) ); worker.dataflow(|scope2| { handle2.replay_into(scope2) .capture_into(send) }); }).unwrap(); assert_eq!(recv.extract()[0].1, (0..10).collect::<Vec<_>>());
The types EventWriter<T, D, W>
and EventReader<T, D, R>
can be
captured into and replayed from, respectively. They use binary writers
and readers respectively, and can be backed by files, network sockets,
etc.
use std::rc::Rc; use std::net::{TcpListener, TcpStream}; use std::sync::{Arc, Mutex}; use timely::dataflow::Scope; use timely::dataflow::operators::{Capture, ToStream, Inspect}; use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract}; // get send and recv endpoints, wrap send to share let (send0, recv0) = ::std::sync::mpsc::channel(); let send0 = Arc::new(Mutex::new(send0)); timely::execute(timely::Configuration::Thread, move |worker| { // this is only to validate the output. let send0 = send0.lock().unwrap().clone(); // these allow us to capture / replay a timely stream. let list = TcpListener::bind("127.0.0.1:8000").unwrap(); let send = TcpStream::connect("127.0.0.1:8000").unwrap(); let recv = list.incoming().next().unwrap().unwrap(); worker.dataflow::<u64,_,_>(|scope1| (0..10u64) .to_stream(scope1) .capture_into(EventWriter::new(send)) ); worker.dataflow::<u64,_,_>(|scope2| { EventReader::<_,u64,_>::new(recv) .replay_into(scope2) .capture_into(send0) }); }).unwrap(); assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());