Trait timely::dataflow::operators::capture::Capture [] [src]

pub trait Capture<T: Timestamp, D: Data> {
    fn capture_into<P: EventPusher<T, D> + 'static>(&self, pusher: P);

    fn capture(&self) -> Receiver<Event<T, D>> { ... }

Capture a stream of timestamped data for later replay.

Required Methods

Captures a stream of timestamped data for later replay.


The type Rc<EventLink<T,D>> implements a typed linked list, and can be captured into and replayed from.

use std::rc::Rc;
use std::sync::{Arc, Mutex};
use timely::dataflow::Scope;
use timely::dataflow::operators::{Capture, ToStream, Inspect};
use timely::dataflow::operators::capture::{EventLink, Replay, Extract};

// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));

timely::execute(timely::Configuration::Thread, move |worker| {

    // this is only to validate the output.
    let send = send.lock().unwrap().clone();

    // these are to capture/replay the stream.
    let handle1 = Rc::new(EventLink::new());
    let handle2 = handle1.clone();


    worker.dataflow(|scope2| {

assert_eq!(recv.extract()[0].1, (0..10).collect::<Vec<_>>());

The types EventWriter<T, D, W> and EventReader<T, D, R> can be captured into and replayed from, respectively. They use binary writers and readers respectively, and can be backed by files, network sockets, etc.

use std::rc::Rc;
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use timely::dataflow::Scope;
use timely::dataflow::operators::{Capture, ToStream, Inspect};
use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract};

// get send and recv endpoints, wrap send to share
let (send0, recv0) = ::std::sync::mpsc::channel();
let send0 = Arc::new(Mutex::new(send0));

timely::execute(timely::Configuration::Thread, move |worker| {
    // this is only to validate the output.
    let send0 = send0.lock().unwrap().clone();
    // these allow us to capture / replay a timely stream.
    let list = TcpListener::bind("").unwrap();
    let send = TcpStream::connect("").unwrap();
    let recv = list.incoming().next().unwrap().unwrap();


    worker.dataflow::<u64,_,_>(|scope2| {

assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());

Provided Methods

Captures a stream using Rust's MPSC channels.
