pub trait Inspect<G: Scope, C>: InspectCore<G, C> + Sized
where
    for<'a> &'a C: IntoIterator,
{
// Required method
fn inspect_core<F>(self, func: F) -> Self
where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static;
// Provided methods
fn inspect<F>(self, func: F) -> Self
where F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static { ... }
fn inspect_time<F>(self, func: F) -> Self
where F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static { ... }
fn inspect_batch(
self,
func: impl FnMut(&G::Timestamp, &C) + 'static,
) -> Self { ... }
}
Expand description
Methods to inspect records and batches of records on a stream.
Required Methods§
Source
fn inspect_core<F>(self, func: F) -> Self
Runs a supplied closure on each observed data batch, and on each frontier advancement.
Rust’s Result type is used to distinguish the two kinds of event: Ok((time, data)) for a
timestamped batch of data, and Err(frontier) for a frontier change. Frontiers are only presented when they change.
§Examples
use timely::dataflow::operators::{ToStream, Inspect};
timely::example(|scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect_core(|event| {
match event {
Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
Err(frontier) => println!("frontier advanced to {:?}", frontier),
}
});
});
Provided Methods§
Source
fn inspect<F>(self, func: F) -> Self
Runs a supplied closure on each observed data element.
§Examples
use timely::dataflow::operators::{ToStream, Inspect};
timely::example(|scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x));
});
Examples found in repository?
More examples
examples/threadless.rs (line 20)
5 fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32 }
examples/capture_recv.rs (line 24)
5 fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9
10 // create replayers from disjoint partition of source worker identifiers.
11 let replayers =
12 (0 .. source_peers)
13 .filter(|i| i % worker.peers() == worker.index())
14 .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
15 .collect::<Vec<_>>()
16 .into_iter()
17 .map(|l| l.incoming().next().unwrap().unwrap())
18 .map(EventReader::<_,Vec<u64>,_>::new)
19 .collect::<Vec<_>>();
20
21 worker.dataflow::<u64,_,_>(|scope| {
22 replayers
23 .replay_into(scope)
24 .inspect(|x| println!("replayed: {:?}", x));
25 })
26 }).unwrap(); // asserts error-free execution
27 }
examples/rc.rs (line 21)
10 fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32 }
examples/hello.rs (line 17)
4 fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32 }
examples/logging-recv.rs (line 27)
8 fn main() {
9 timely::execute_from_args(std::env::args(), |worker| {
10
11 let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
12
13 // create replayers from disjoint partition of source worker identifiers.
14 let replayers =
15 (0 .. source_peers)
16 .filter(|i| i % worker.peers() == worker.index())
17 .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
18 .collect::<Vec<_>>()
19 .into_iter()
20 .map(|l| l.incoming().next().unwrap().unwrap())
21 .map(EventReader::<Duration,Vec<(Duration,TimelySetup,TimelyEvent)>,_>::new)
22 .collect::<Vec<_>>();
23
24 worker.dataflow(|scope| {
25 replayers
26 .replay_into(scope)
27 .inspect(|x| println!("replayed: {:?}", x));
28 })
29 }).unwrap(); // asserts error-free execution
30 }
Additional examples can be found in:
Source
fn inspect_time<F>(self, func: F) -> Self
Runs a supplied closure on each observed data element and associated time.
§Examples
use timely::dataflow::operators::{ToStream, Inspect};
timely::example(|scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
});
Examples found in repository?
examples/flow_controlled.rs (line 29)
4 fn main() {
5 timely::execute_from_args(std::env::args(), |worker| {
6 let mut input = (0u64..100000).peekable();
7 worker.dataflow(|scope| {
8 let probe_handle = probe::Handle::new();
9 let probe_handle_2 = probe_handle.clone();
10
11 iterator_source(
12 scope,
13 "Source",
14 move |prev_t| {
15 if let Some(first_x) = input.peek().cloned() {
16 let next_t = first_x / 100 * 100;
17 Some(IteratorSourceInput {
18 lower_bound: Default::default(),
19 data: vec![
20 (next_t,
21 input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
22 target: *prev_t,
23 })
24 } else {
25 None
26 }
27 },
28 probe_handle_2)
29 .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
30 .probe_with(&probe_handle);
31 });
32 }).unwrap();
33 }
Source
fn inspect_batch(self, func: impl FnMut(&G::Timestamp, &C) + 'static) -> Self
Runs a supplied closure on each observed data batch (time and data container).
§Examples
use timely::dataflow::operators::{ToStream, Inspect};
timely::example(|scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
});
Examples found in repository?
examples/unordered_input.rs (line 8)
4 fn main() {
5 timely::execute(Config::thread(), |worker| {
6 let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7 let (input, stream) = scope.new_unordered_input();
8 stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9 input
10 });
11
12 for round in 0..10 {
13 input.activate().session(&cap).give(round);
14 cap = cap.delayed(&(round + 1));
15 worker.step();
16 }
17 }).unwrap();
18 }
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.