timely/dataflow/operators/inspect.rs
//! Extension trait and implementation for observing and acting on streamed data.
2
3use std::rc::Rc;
4use timely_container::columnation::{Columnation, TimelyStack};
5use crate::Container;
6use crate::Data;
7use crate::dataflow::channels::pact::Pipeline;
8use crate::dataflow::{Scope, StreamCore};
9use crate::dataflow::operators::generic::Operator;
10
11/// Methods to inspect records and batches of records on a stream.
12pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
13 /// Runs a supplied closure on each observed data element.
14 ///
15 /// # Examples
16 /// ```
17 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
18 ///
19 /// timely::example(|scope| {
20 /// (0..10).to_stream(scope)
21 /// .inspect(|x| println!("seen: {:?}", x));
22 /// });
23 /// ```
24 fn inspect(&self, mut func: impl FnMut(&C::Item)+'static) -> Self {
25 self.inspect_batch(move |_, data| {
26 for datum in data.iter() { func(datum); }
27 })
28 }
29
30 /// Runs a supplied closure on each observed data element and associated time.
31 ///
32 /// # Examples
33 /// ```
34 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
35 ///
36 /// timely::example(|scope| {
37 /// (0..10).to_stream(scope)
38 /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
39 /// });
40 /// ```
41 fn inspect_time(&self, mut func: impl FnMut(&G::Timestamp, &C::Item)+'static) -> Self {
42 self.inspect_batch(move |time, data| {
43 for datum in data.iter() {
44 func(&time, &datum);
45 }
46 })
47 }
48
49 /// Runs a supplied closure on each observed data batch (time and data slice).
50 ///
51 /// # Examples
52 /// ```
53 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
54 ///
55 /// timely::example(|scope| {
56 /// (0..10).to_stream(scope)
57 /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
58 /// });
59 /// ```
60 fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &[C::Item])+'static) -> Self {
61 self.inspect_core(move |event| {
62 if let Ok((time, data)) = event {
63 func(time, data);
64 }
65 })
66 }
67
68 /// Runs a supplied closure on each observed data batch, and each frontier advancement.
69 ///
70 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
71 /// and `Err` for frontiers. Frontiers are only presented when they change.
72 ///
73 /// # Examples
74 /// ```
75 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
76 ///
77 /// timely::example(|scope| {
78 /// (0..10).to_stream(scope)
79 /// .inspect_core(|event| {
80 /// match event {
81 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
82 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
83 /// }
84 /// });
85 /// });
86 /// ```
87 fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>)+'static;
88}
89
90impl<G: Scope, D: Data> Inspect<G, Vec<D>> for StreamCore<G, Vec<D>> {
91 fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static {
92 self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..]))))
93 }
94}
95
96impl<G: Scope, D: Data+Columnation> Inspect<G, TimelyStack<D>> for StreamCore<G, TimelyStack<D>> {
97 fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static {
98 self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..]))))
99 }
100}
101
102impl<G: Scope, C: Container> Inspect<G, Rc<C>> for StreamCore<G, Rc<C>>
103 where C: AsRef<[C::Item]>
104{
105 fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>) + 'static {
106 self.inspect_container(move |r| func(r.map(|(t, c)| (t, c.as_ref().as_ref()))))
107 }
108}
109
110/// Inspect containers
111pub trait InspectCore<G: Scope, C: Container> {
112 /// Runs a supplied closure on each observed container, and each frontier advancement.
113 ///
114 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
115 /// and `Err` for frontiers. Frontiers are only presented when they change.
116 ///
117 /// # Examples
118 /// ```
119 /// use timely::dataflow::operators::{ToStream, Map, InspectCore};
120 ///
121 /// timely::example(|scope| {
122 /// (0..10).to_stream(scope)
123 /// .inspect_container(|event| {
124 /// match event {
125 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
126 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
127 /// }
128 /// });
129 /// });
130 /// ```
131 fn inspect_container<F>(&self, func: F) -> StreamCore<G, C> where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
132}
133
134impl<G: Scope, C: Container> InspectCore<G, C> for StreamCore<G, C> {
135
136 fn inspect_container<F>(&self, mut func: F) -> StreamCore<G, C>
137 where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
138 {
139 use crate::progress::timestamp::Timestamp;
140 let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
141 let mut vector = Default::default();
142 self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| {
143 if input.frontier.frontier() != frontier.borrow() {
144 frontier.clear();
145 frontier.extend(input.frontier.frontier().iter().cloned());
146 func(Err(frontier.elements()));
147 }
148 input.for_each(|time, data| {
149 data.swap(&mut vector);
150 func(Ok((&time, &vector)));
151 output.session(&time).give_container(&mut vector);
152 });
153 })
154 }
155}