timely/dataflow/operators/
inspect.rs

//! Extension trait and implementation for observing and acting on streamed data.
2
3use std::rc::Rc;
4use timely_container::columnation::{Columnation, TimelyStack};
5use crate::Container;
6use crate::Data;
7use crate::dataflow::channels::pact::Pipeline;
8use crate::dataflow::{Scope, StreamCore};
9use crate::dataflow::operators::generic::Operator;
10
11/// Methods to inspect records and batches of records on a stream.
12pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
13    /// Runs a supplied closure on each observed data element.
14    ///
15    /// # Examples
16    /// ```
17    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
18    ///
19    /// timely::example(|scope| {
20    ///     (0..10).to_stream(scope)
21    ///            .inspect(|x| println!("seen: {:?}", x));
22    /// });
23    /// ```
24    fn inspect(&self, mut func: impl FnMut(&C::Item)+'static) -> Self {
25        self.inspect_batch(move |_, data| {
26            for datum in data.iter() { func(datum); }
27        })
28    }
29
30    /// Runs a supplied closure on each observed data element and associated time.
31    ///
32    /// # Examples
33    /// ```
34    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
35    ///
36    /// timely::example(|scope| {
37    ///     (0..10).to_stream(scope)
38    ///            .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
39    /// });
40    /// ```
41    fn inspect_time(&self, mut func: impl FnMut(&G::Timestamp, &C::Item)+'static) -> Self {
42        self.inspect_batch(move |time, data| {
43            for datum in data.iter() {
44                func(&time, &datum);
45            }
46        })
47    }
48
49    /// Runs a supplied closure on each observed data batch (time and data slice).
50    ///
51    /// # Examples
52    /// ```
53    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
54    ///
55    /// timely::example(|scope| {
56    ///     (0..10).to_stream(scope)
57    ///            .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
58    /// });
59    /// ```
60    fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &[C::Item])+'static) -> Self {
61        self.inspect_core(move |event| {
62            if let Ok((time, data)) = event {
63                func(time, data);
64            }
65        })
66    }
67
68    /// Runs a supplied closure on each observed data batch, and each frontier advancement.
69    ///
70    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
71    /// and `Err` for frontiers. Frontiers are only presented when they change.
72    ///
73    /// # Examples
74    /// ```
75    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
76    ///
77    /// timely::example(|scope| {
78    ///     (0..10).to_stream(scope)
79    ///            .inspect_core(|event| {
80    ///                match event {
81    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
82    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
83    ///                }
84    ///             });
85    /// });
86    /// ```
87    fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>)+'static;
88}
89
90impl<G: Scope, D: Data> Inspect<G, Vec<D>> for StreamCore<G, Vec<D>> {
91    fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static {
92        self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..]))))
93    }
94}
95
96impl<G: Scope, D: Data+Columnation> Inspect<G, TimelyStack<D>> for StreamCore<G, TimelyStack<D>> {
97    fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static {
98        self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..]))))
99    }
100}
101
102impl<G: Scope, C: Container> Inspect<G, Rc<C>> for StreamCore<G, Rc<C>>
103    where C: AsRef<[C::Item]>
104{
105    fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>) + 'static {
106        self.inspect_container(move |r| func(r.map(|(t, c)| (t, c.as_ref().as_ref()))))
107    }
108}
109
110/// Inspect containers
111pub trait InspectCore<G: Scope, C: Container> {
112    /// Runs a supplied closure on each observed container, and each frontier advancement.
113    ///
114    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
115    /// and `Err` for frontiers. Frontiers are only presented when they change.
116    ///
117    /// # Examples
118    /// ```
119    /// use timely::dataflow::operators::{ToStream, Map, InspectCore};
120    ///
121    /// timely::example(|scope| {
122    ///     (0..10).to_stream(scope)
123    ///            .inspect_container(|event| {
124    ///                match event {
125    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
126    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
127    ///                }
128    ///             });
129    /// });
130    /// ```
131    fn inspect_container<F>(&self, func: F) -> StreamCore<G, C> where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
132}
133
134impl<G: Scope, C: Container> InspectCore<G, C> for StreamCore<G, C> {
135
136    fn inspect_container<F>(&self, mut func: F) -> StreamCore<G, C>
137        where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
138    {
139        use crate::progress::timestamp::Timestamp;
140        let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
141        let mut vector = Default::default();
142        self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| {
143            if input.frontier.frontier() != frontier.borrow() {
144                frontier.clear();
145                frontier.extend(input.frontier.frontier().iter().cloned());
146                func(Err(frontier.elements()));
147            }
148            input.for_each(|time, data| {
149                data.swap(&mut vector);
150                func(Ok((&time, &vector)));
151                output.session(&time).give_container(&mut vector);
152            });
153        })
154    }
155}