1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//! Defines an operator that inspects every element of its input stream by
//! applying a user-provided callback to it, and then forwards the element
//! unchanged to its output stream.
use crate::circuit::{
Circuit, Scope, Stream,
operator_traits::{Operator, UnaryOperator},
};
use std::{borrow::Cow, marker::PhantomData};
impl<C, D> Stream<C, D>
where
    D: Clone + 'static,
    C: Circuit,
{
    /// Attaches an [`Inspect`] operator to `self` and returns the operator's
    /// output stream.
    ///
    /// `callback` receives a shared reference to every value the operator
    /// evaluates. The operator is fed from `self.try_sharded_version()`, and
    /// the resulting stream is passed through `mark_sharded_if(self)` before
    /// it is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// # use dbsp::{
    /// #     operator::Generator,
    /// #     Circuit, RootCircuit,
    /// # };
    /// let circuit = RootCircuit::build(move |circuit| {
    ///     let mut n = 1;
    ///     let stream = circuit.add_source(Generator::new(move || {
    ///         let res = n;
    ///         n += 1;
    ///         res
    ///     }));
    ///     // Print all values in `stream`.
    ///     stream.inspect(|n| println!("inspect: {}", n));
    ///     Ok(())
    /// })
    /// .unwrap();
    /// ```
    #[track_caller]
    pub fn inspect<F>(&self, callback: F) -> Self
    where
        F: FnMut(&D) + 'static,
    {
        // Gather the three ingredients in the same order the one-expression
        // form would evaluate them: circuit, operator, input stream.
        let circuit = self.circuit();
        let operator = Inspect::new(callback);
        let input = self.try_sharded_version();

        // Called directly (not from a closure) so `#[track_caller]` on this
        // method still applies to the call.
        let output = circuit.add_unary_operator(operator, &input);
        output.mark_sharded_if(self);
        output
    }
}
/// Operator that applies a user-provided callback of type `F` to each value
/// of type `T` it receives.
///
/// The `UnaryOperator` implementation for this type hands every input to the
/// callback by reference and then returns that same value as its output.
pub struct Inspect<T, F> {
/// Closure invoked with a reference to each input value.
callback: F,
/// Zero-sized marker that ties the type parameter `T` to the struct, since
/// no field stores a `T` directly.
phantom: PhantomData<T>,
}
impl<T, F> Inspect<T, F>
where
F: FnMut(&T),
{
/// Create a new instance of the `Inspect` operator that will apply
/// `callback` to each value in the input stream.
pub fn new(callback: F) -> Self {
Self {
callback,
phantom: PhantomData,
}
}
}
impl<T, F> Operator for Inspect<T, F>
where
    T: 'static,
    F: FnMut(&T) + 'static,
{
    /// Returns the fixed operator name `"Inspect"` as a borrowed string.
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed("Inspect")
    }

    /// Unconditionally answers `true`; the `_scope` argument is ignored.
    ///
    /// NOTE(review): presumably correct because the operator keeps no state
    /// of its own between evaluations -- confirm against the contract of
    /// `Operator::fixedpoint`.
    fn fixedpoint(&self, _scope: Scope) -> bool {
        true
    }
}
impl<T, F> UnaryOperator<T, T> for Inspect<T, F>
where
    T: Clone + 'static,
    F: FnMut(&T) + 'static,
{
    /// Borrowed-input evaluation: shows `input` to the callback, then
    /// returns a clone of it as the output value.
    async fn eval(&mut self, input: &T) -> T {
        let observe = &mut self.callback;
        observe(input);
        T::clone(input)
    }

    /// Owned-input evaluation: shows a reference to `input` to the callback,
    /// then moves the very same value out as the output, so no clone is made.
    async fn eval_owned(&mut self, input: T) -> T {
        let observe = &mut self.callback;
        observe(&input);
        input
    }
}