dbsp/operator/inspect.rs
1//! Defines a sink operator that inspects every element of its input stream by
2//! applying a user-provided callback to it.
3
4use crate::circuit::{
5 Circuit, Scope, Stream,
6 operator_traits::{Operator, UnaryOperator},
7};
8use std::{borrow::Cow, marker::PhantomData};
9
10impl<C, D> Stream<C, D>
11where
12 D: Clone + 'static,
13 C: Circuit,
14{
15 /// Apply [`Inspect`] operator to `self`.
16 ///
17 /// # Examples
18 ///
19 /// ```
20 /// # use dbsp::{
21 /// # operator::Generator,
22 /// # Circuit, RootCircuit,
23 /// # };
24 /// let circuit = RootCircuit::build(move |circuit| {
25 /// let mut n = 1;
26 /// let stream = circuit.add_source(Generator::new(move || {
27 /// let res = n;
28 /// n += 1;
29 /// res
30 /// }));
31 /// // Print all values in `stream`.
32 /// stream.inspect(|n| println!("inspect: {}", n));
33 /// Ok(())
34 /// })
35 /// .unwrap();
36 /// ```
37 #[track_caller]
38 pub fn inspect<F>(&self, callback: F) -> Self
39 where
40 F: FnMut(&D) + 'static,
41 {
42 let inspected = self
43 .circuit()
44 .add_unary_operator(Inspect::new(callback), &self.try_sharded_version());
45 inspected.mark_sharded_if(self);
46 inspected
47 }
48}
49
50/// Sink operator that consumes a stream of values of type `T` and
51/// applies a user-provided callback to each input.
52pub struct Inspect<T, F> {
53 callback: F,
54 phantom: PhantomData<T>,
55}
56
57impl<T, F> Inspect<T, F>
58where
59 F: FnMut(&T),
60{
61 /// Create a new instance of the `Inspect` operator that will apply
62 /// `callback` to each value in the input stream.
63 pub fn new(callback: F) -> Self {
64 Self {
65 callback,
66 phantom: PhantomData,
67 }
68 }
69}
70
71impl<T, F> Operator for Inspect<T, F>
72where
73 T: 'static,
74 F: FnMut(&T) + 'static,
75{
76 fn name(&self) -> Cow<'static, str> {
77 Cow::from("Inspect")
78 }
79
80 fn fixedpoint(&self, _scope: Scope) -> bool {
81 true
82 }
83}
84
85impl<T, F> UnaryOperator<T, T> for Inspect<T, F>
86where
87 T: Clone + 'static,
88 F: FnMut(&T) + 'static,
89{
90 async fn eval(&mut self, i: &T) -> T {
91 (self.callback)(i);
92 i.clone()
93 }
94 async fn eval_owned(&mut self, i: T) -> T {
95 (self.callback)(&i);
96 i
97 }
98}