Skip to main content

dbsp/operator/
inspect.rs

//! Defines a pass-through operator that inspects every element of its input
//! stream by applying a user-provided callback to it.
3
4use crate::circuit::{
5    Circuit, Scope, Stream,
6    operator_traits::{Operator, UnaryOperator},
7};
8use std::{borrow::Cow, marker::PhantomData};
9
10impl<C, D> Stream<C, D>
11where
12    D: Clone + 'static,
13    C: Circuit,
14{
15    /// Apply [`Inspect`] operator to `self`.
16    ///
17    /// # Examples
18    ///
19    /// ```
20    /// # use dbsp::{
21    /// #     operator::Generator,
22    /// #     Circuit, RootCircuit,
23    /// # };
24    /// let circuit = RootCircuit::build(move |circuit| {
25    ///     let mut n = 1;
26    ///     let stream = circuit.add_source(Generator::new(move || {
27    ///         let res = n;
28    ///         n += 1;
29    ///         res
30    ///     }));
31    ///     // Print all values in `stream`.
32    ///     stream.inspect(|n| println!("inspect: {}", n));
33    ///     Ok(())
34    /// })
35    /// .unwrap();
36    /// ```
37    #[track_caller]
38    pub fn inspect<F>(&self, callback: F) -> Self
39    where
40        F: FnMut(&D) + 'static,
41    {
42        let inspected = self
43            .circuit()
44            .add_unary_operator(Inspect::new(callback), &self.try_sharded_version());
45        inspected.mark_sharded_if(self);
46        inspected
47    }
48}
49
/// Operator that observes a stream of values of type `T`, handing a
/// reference to each one to a user-provided callback.
pub struct Inspect<T, F> {
    /// User-supplied closure invoked once per input value.
    callback: F,
    /// Marker tying the otherwise unused type parameter `T` to the struct.
    phantom: PhantomData<T>,
}

impl<T, F> Inspect<T, F>
where
    F: FnMut(&T),
{
    /// Build an `Inspect` operator that will call `callback` on every value
    /// of the input stream.
    pub fn new(callback: F) -> Self {
        let phantom = PhantomData;
        Inspect { callback, phantom }
    }
}
70
71impl<T, F> Operator for Inspect<T, F>
72where
73    T: 'static,
74    F: FnMut(&T) + 'static,
75{
76    fn name(&self) -> Cow<'static, str> {
77        Cow::from("Inspect")
78    }
79
80    fn fixedpoint(&self, _scope: Scope) -> bool {
81        true
82    }
83}
84
85impl<T, F> UnaryOperator<T, T> for Inspect<T, F>
86where
87    T: Clone + 'static,
88    F: FnMut(&T) + 'static,
89{
90    async fn eval(&mut self, i: &T) -> T {
91        (self.callback)(i);
92        i.clone()
93    }
94    async fn eval_owned(&mut self, i: T) -> T {
95        (self.callback)(&i);
96        i
97    }
98}