Skip to main content

Inspect

Trait Inspect 

Source
pub trait Inspect<G: Scope, C>: InspectCore<G, C> + Sized
where for<'a> &'a C: IntoIterator,
{
    // Required method
    fn inspect_core<F>(self, func: F) -> Self
    where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static;

    // Provided methods
    fn inspect<F>(self, func: F) -> Self
    where F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static { ... }
    fn inspect_time<F>(self, func: F) -> Self
    where F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static { ... }
    fn inspect_batch(
        self,
        func: impl FnMut(&G::Timestamp, &C) + 'static,
    ) -> Self { ... }
}
Expand description

Methods to inspect records and batches of records on a stream.

Required Methods§

Source

fn inspect_core<F>(self, func: F) -> Self
where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static,

Runs a supplied closure on each observed data batch, and each frontier advancement.

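Rust’s Result type is used to distinguish the two kinds of event: Ok((time, data)) carries a timestamp together with the batch of data observed at that time, and Err(frontier) carries the new frontier as a slice of timestamps. Frontiers are only presented when they change.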

§Examples
use timely::dataflow::operators::{ToStream, Inspect};

timely::example(|scope| {
    (0..10).to_stream(scope)
           .container::<Vec<_>>()
           .inspect_core(|event| {
               match event {
                   Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
                   Err(frontier) => println!("frontier advanced to {:?}", frontier),
               }
            });
});

Provided Methods§

Source

fn inspect<F>(self, func: F) -> Self
where F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,

Runs a supplied closure on each observed data element.

§Examples
use timely::dataflow::operators::{ToStream, Inspect};

timely::example(|scope| {
    (0..10).to_stream(scope)
           .container::<Vec<_>>()
           .inspect(|x| println!("seen: {:?}", x));
});
Examples found in repository?
examples/simple.rs (line 6)
3fn main() {
4    timely::example(|scope| {
5        (0..10).to_stream(scope)
6               .inspect(|x| println!("seen: {:?}", x));
7    });
8}
More examples
Hide additional examples
examples/threadless.rs (line 20)
5fn main() {
6
7    // create a naked single-threaded worker.
8    let allocator = timely::communication::allocator::Thread::default();
9    let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11    // create input and probe handles.
12    let mut input = InputHandle::new();
13    let probe = ProbeHandle::new();
14
15    // directly build a dataflow.
16    worker.dataflow(|scope| {
17        input
18            .to_stream(scope)
19            .container::<Vec<_>>()
20            .inspect(|x| println!("{:?}", x))
21            .probe_with(&probe);
22    });
23
24    // manage inputs.
25    for i in 0 .. 10 {
26        input.send(i);
27        input.advance_to(i);
28        while probe.less_than(input.time()) {
29            worker.step();
30        }
31    }
32}
examples/capture_recv.rs (line 24)
5fn main() {
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9
10        // create replayers from disjoint partition of source worker identifiers.
11        let replayers =
12        (0 .. source_peers)
13            .filter(|i| i % worker.peers() == worker.index())
14            .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
15            .collect::<Vec<_>>()
16            .into_iter()
17            .map(|l| l.incoming().next().unwrap().unwrap())
18            .map(EventReader::<_,Vec<u64>,_>::new)
19            .collect::<Vec<_>>();
20
21        worker.dataflow::<u64,_,_>(|scope| {
22            replayers
23                .replay_into(scope)
24                .inspect(|x| println!("replayed: {:?}", x));
25        })
26    }).unwrap(); // asserts error-free execution
27}
examples/rc.rs (line 21)
10fn main() {
11    // initializes and runs a timely dataflow.
12    timely::execute_from_args(std::env::args(), |worker| {
13        // create a new input, exchange data, and inspect its output
14        let index = worker.index();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17        worker.dataflow(|scope| {
18            scope.input_from(&mut input)
19                 .container::<Vec<_>>()
20                 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21                 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22                 .probe_with(&probe);
23        });
24
25        // introduce data and watch!
26        for round in 0..10 {
27            input.send(Test { _field: Rc::new(round) } );
28            input.advance_to(round + 1);
29            worker.step_while(|| probe.less_than(input.time()));
30        }
31    }).unwrap();
32}
examples/hello.rs (line 17)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let index = worker.index();
9        let mut input = InputHandle::new();
10        let probe = ProbeHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        worker.dataflow(|scope| {
14            scope.input_from(&mut input)
15                 .container::<Vec<_>>()
16                 .exchange(|x| *x)
17                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18                 .probe_with(&probe);
19        });
20
21        // introduce data and watch!
22        for round in 0..10 {
23            if index == 0 {
24                input.send(round);
25            }
26            input.advance_to(round + 1);
27            while probe.less_than(input.time()) {
28                worker.step();
29            }
30        }
31    }).unwrap();
32}
examples/logging-recv.rs (line 27)
8fn main() {
9    timely::execute_from_args(std::env::args(), |worker| {
10
11        let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
12
13        // create replayers from disjoint partition of source worker identifiers.
14        let replayers =
15        (0 .. source_peers)
16            .filter(|i| i % worker.peers() == worker.index())
17            .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
18            .collect::<Vec<_>>()
19            .into_iter()
20            .map(|l| l.incoming().next().unwrap().unwrap())
21            .map(EventReader::<Duration,Vec<(Duration,TimelySetup,TimelyEvent)>,_>::new)
22            .collect::<Vec<_>>();
23
24        worker.dataflow(|scope| {
25            replayers
26                .replay_into(scope)
27                .inspect(|x| println!("replayed: {:?}", x));
28        })
29    }).unwrap(); // asserts error-free execution
30}
Source

fn inspect_time<F>(self, func: F) -> Self
where F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static,

Runs a supplied closure on each observed data element and associated time.

§Examples
use timely::dataflow::operators::{ToStream, Inspect};

timely::example(|scope| {
    (0..10).to_stream(scope)
           .container::<Vec<_>>()
           .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
});
Examples found in repository?
examples/flow_controlled.rs (line 29)
4fn main() {
5    timely::execute_from_args(std::env::args(), |worker| {
6        let mut input = (0u64..100000).peekable();
7        worker.dataflow(|scope| {
8            let probe_handle = probe::Handle::new();
9            let probe_handle_2 = probe_handle.clone();
10
11            iterator_source(
12                scope,
13                "Source",
14                move |prev_t| {
15                    if let Some(first_x) = input.peek().cloned() {
16                        let next_t = first_x / 100 * 100;
17                        Some(IteratorSourceInput {
18                            lower_bound: Default::default(),
19                            data: vec![
20                                (next_t,
21                                 input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
22                            target: *prev_t,
23                        })
24                    } else {
25                        None
26                    }
27                },
28                probe_handle_2)
29            .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
30            .probe_with(&probe_handle);
31        });
32    }).unwrap();
33}
Source

fn inspect_batch(self, func: impl FnMut(&G::Timestamp, &C) + 'static) -> Self

Runs a supplied closure on each observed data batch, passing the batch’s timestamp and a reference to the container holding its records.

§Examples
use timely::dataflow::operators::{ToStream, Inspect};

timely::example(|scope| {
    (0..10).to_stream(scope)
           .container::<Vec<_>>()
           .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
});
Examples found in repository?
examples/unordered_input.rs (line 8)
4fn main() {
5    timely::execute(Config::thread(), |worker| {
6        let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7            let (input, stream) = scope.new_unordered_input();
8            stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9            input
10        });
11
12        for round in 0..10 {
13            input.activate().session(&cap).give(round);
14            cap = cap.delayed(&(round + 1));
15            worker.step();
16        }
17    }).unwrap();
18}

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<G: Scope, C: Container> Inspect<G, C> for Stream<G, C>
where for<'a> &'a C: IntoIterator,