Skip to main content

dbsp/operator/
apply.rs

1//! Operator that applies an arbitrary function to its input.
2
3use crate::circuit::{
4    Circuit, OwnershipPreference, Scope, Stream,
5    metadata::OperatorLocation,
6    operator_traits::{Data, Operator, UnaryOperator},
7};
8use std::{borrow::Cow, panic::Location};
9
10impl<C, T1> Stream<C, T1>
11where
12    C: Circuit,
13    T1: Clone + 'static,
14{
15    /// Returns a stream that contains `func(&x)` for each `x` in `self`.
16    /// `func` cannot mutate captured state.
17    ///
18    /// The operator will have a generic name for debugging and profiling
19    /// purposes.  Use [`Stream::apply_named`], instead, to give it a
20    /// specific name.
21    #[track_caller]
22    pub fn apply<F, T2>(&self, func: F) -> Stream<C, T2>
23    where
24        F: Fn(&T1) -> T2 + 'static,
25        T2: Clone + 'static,
26    {
27        self.circuit().add_unary_operator(
28            Apply::<_, true>::new(func, Cow::Borrowed("Apply"), Location::caller()),
29            self,
30        )
31    }
32
33    /// Returns a stream that contains `func(&x)` for each `x` in `self`.
34    /// `func` can access and mutate captured state.
35    ///
36    /// The operator will have a generic name for debugging and profiling
37    /// purposes.  Use [`Stream::apply_mut_named`], instead, to give it a
38    /// specific name.
39    #[track_caller]
40    pub fn apply_mut<F, T2>(&self, func: F) -> Stream<C, T2>
41    where
42        F: FnMut(&T1) -> T2 + 'static,
43        T2: Clone + 'static,
44    {
45        self.circuit().add_unary_operator(
46            Apply::<_, false>::new(func, Cow::Borrowed("ApplyMut"), Location::caller()),
47            self,
48        )
49    }
50
51    /// Returns a stream that contains `func(&x)` for each `x` in `self`, giving
52    /// the operator the given `name` for debugging and profiling purposes.
53    /// `func` cannot mutate captured state.
54    #[track_caller]
55    pub fn apply_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
56    where
57        N: Into<Cow<'static, str>>,
58        F: Fn(&T1) -> T2 + 'static,
59        T2: Clone + 'static,
60    {
61        self.circuit().add_unary_operator(
62            Apply::<_, true>::new(func, name.into(), Location::caller()),
63            self,
64        )
65    }
66
67    /// Returns a stream that contains `func(&x)` for each `x` in `self`, giving
68    /// the operator the given `name` for debugging and profiling purposes.
69    /// `func` can access and mutate captured state.
70    #[track_caller]
71    pub fn apply_mut_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
72    where
73        N: Into<Cow<'static, str>>,
74        F: Fn(&T1) -> T2 + 'static,
75        T2: Clone + 'static,
76    {
77        self.circuit().add_unary_operator(
78            Apply::<_, false>::new(func, name.into(), Location::caller()),
79            self,
80        )
81    }
82
83    /// Returns a stream that contains `func(x)` for each `x` in `self`.
84    /// `func` cannot mutate captured state.
85    ///
86    /// The operator will have a generic name for debugging and profiling
87    /// purposes.  Use [`Stream::apply_owned_named`], instead, to give it a
88    /// specific name.
89    #[track_caller]
90    pub fn apply_owned<F, T2>(&self, func: F) -> Stream<C, T2>
91    where
92        F: Fn(T1) -> T2 + 'static,
93        T2: Clone + 'static,
94    {
95        self.circuit().add_unary_operator(
96            ApplyOwned::<_, true>::new(func, Cow::Borrowed("ApplyOwned"), Location::caller()),
97            self,
98        )
99    }
100
101    /// Returns a stream that contains `func(x)` for each `x` in `self`.
102    /// `func` can access and mutate captured state.
103    ///
104    /// The operator will have a generic name for debugging and profiling
105    /// purposes.  Use [`Stream::apply_mut_owned_named`], instead, to give it a
106    /// specific name.
107    #[track_caller]
108    pub fn apply_mut_owned<F, T2>(&self, func: F) -> Stream<C, T2>
109    where
110        F: FnMut(T1) -> T2 + 'static,
111        T2: Clone + 'static,
112    {
113        self.circuit().add_unary_operator(
114            ApplyOwned::<_, false>::new(func, Cow::Borrowed("ApplyOwned"), Location::caller()),
115            self,
116        )
117    }
118
119    /// Returns a stream that contains `func(x)` for each `x` in `self`, giving
120    /// the operator the given `name` for debugging and profiling purposes.
121    /// `func` cannot mutate captured state.
122    #[track_caller]
123    pub fn apply_owned_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
124    where
125        N: Into<Cow<'static, str>>,
126        F: Fn(T1) -> T2 + 'static,
127        T2: Data,
128    {
129        self.circuit().add_unary_operator(
130            ApplyOwned::<_, true>::new(func, name.into(), Location::caller()),
131            self,
132        )
133    }
134
135    /// Returns a stream that contains `func(x)` for each `x` in `self`, giving
136    /// the operator the given `name` for debugging and profiling purposes.
137    /// `func` can access and mutate captured state.
138    #[track_caller]
139    pub fn apply_mut_owned_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
140    where
141        N: Into<Cow<'static, str>>,
142        F: FnMut(T1) -> T2 + 'static,
143        T2: Data,
144    {
145        self.circuit().add_unary_operator(
146            ApplyOwned::<_, false>::new(func, name.into(), Location::caller()),
147            self,
148        )
149    }
150
151    /// Apply the `ApplyCore` operator to `self` with a custom name
152    #[track_caller]
153    pub fn apply_core<N, T2, O, B, F>(
154        &self,
155        name: N,
156        owned: O,
157        borrowed: B,
158        fixpoint: F,
159    ) -> Stream<C, T2>
160    where
161        N: Into<Cow<'static, str>>,
162        T2: Data,
163        O: FnMut(T1) -> T2 + 'static,
164        B: FnMut(&T1) -> T2 + 'static,
165        F: Fn(Scope) -> bool + 'static,
166    {
167        self.circuit().add_unary_operator(
168            ApplyCore::new(owned, borrowed, fixpoint, name.into(), Location::caller()),
169            self,
170        )
171    }
172}
173
/// Unary operator that produces its output by calling a user-provided
/// function on a reference to its input.
///
/// The `FP` const parameter controls the fixed-point check: the check
/// asserts that `FP` is `true`.
pub struct Apply<F, const FP: bool> {
    /// The user-provided function invoked on every input.
    func: F,
    /// Operator name reported for debugging and profiling.
    name: Cow<'static, str>,
    /// Source location reported as the operator's location.
    location: &'static Location<'static>,
}
181
182impl<F, const FP: bool> Apply<F, FP> {
183    pub const fn new(
184        func: F,
185        name: Cow<'static, str>,
186        location: &'static Location<'static>,
187    ) -> Self {
188        Self {
189            func,
190            name,
191            location,
192        }
193    }
194}
195
196impl<F, const FP: bool> Operator for Apply<F, FP>
197where
198    F: 'static,
199{
200    fn name(&self) -> Cow<'static, str> {
201        self.name.clone()
202    }
203
204    fn location(&self) -> OperatorLocation {
205        Some(self.location)
206    }
207
208    fn fixedpoint(&self, _scope: Scope) -> bool {
209        assert!(FP);
210        true
211    }
212}
213
214impl<T1, T2, F, const FP: bool> UnaryOperator<T1, T2> for Apply<F, FP>
215where
216    F: FnMut(&T1) -> T2 + 'static,
217{
218    async fn eval(&mut self, i1: &T1) -> T2 {
219        (self.func)(i1)
220    }
221}
222
/// Unary operator that produces its output by calling a user-provided
/// function on its input taken by value.
///
/// The `FP` const parameter controls the fixed-point check: the check
/// asserts that `FP` is `true`.
pub struct ApplyOwned<F, const FP: bool> {
    /// The user-provided function invoked on every owned input.
    apply: F,
    /// Operator name reported for debugging and profiling.
    name: Cow<'static, str>,
    /// Source location reported as the operator's location.
    location: &'static Location<'static>,
}
228
229impl<F, const FP: bool> ApplyOwned<F, FP> {
230    pub const fn new(
231        apply: F,
232        name: Cow<'static, str>,
233        location: &'static Location<'static>,
234    ) -> Self {
235        Self {
236            apply,
237            name,
238            location,
239        }
240    }
241}
242
243impl<F, const FP: bool> Operator for ApplyOwned<F, FP>
244where
245    F: 'static,
246{
247    fn name(&self) -> Cow<'static, str> {
248        self.name.clone()
249    }
250
251    fn location(&self) -> OperatorLocation {
252        Some(self.location)
253    }
254
255    fn fixedpoint(&self, _scope: Scope) -> bool {
256        assert!(FP);
257        true
258    }
259}
260
261impl<T1, T2, F, const FP: bool> UnaryOperator<T1, T2> for ApplyOwned<F, FP>
262where
263    F: FnMut(T1) -> T2 + 'static,
264{
265    async fn eval(&mut self, _input: &T1) -> T2 {
266        unreachable!("cannot use ApplyOwned with reference inputs")
267    }
268
269    #[inline]
270    async fn eval_owned(&mut self, input: T1) -> T2 {
271        (self.apply)(input)
272    }
273
274    #[inline]
275    fn input_preference(&self) -> OwnershipPreference {
276        OwnershipPreference::STRONGLY_PREFER_OWNED
277    }
278}
279
/// Unary operator assembled from three user-provided functions: one for
/// owned inputs, one for borrowed inputs, and one fixed-point check.
pub struct ApplyCore<O, B, F> {
    /// Function invoked when the input is received by value.
    owned: O,
    /// Function invoked when the input is received by reference.
    borrowed: B,
    /// Function that answers the operator's fixed-point check for a scope.
    fixpoint: F,
    /// Operator name reported for debugging and profiling.
    name: Cow<'static, str>,
    /// Source location reported as the operator's location.
    location: &'static Location<'static>,
}
287
288impl<O, B, F> ApplyCore<O, B, F> {
289    pub const fn new(
290        owned: O,
291        borrowed: B,
292        fixpoint: F,
293        name: Cow<'static, str>,
294        location: &'static Location<'static>,
295    ) -> Self {
296        Self {
297            owned,
298            borrowed,
299            fixpoint,
300            name,
301            location,
302        }
303    }
304}
305
306impl<O, B, F> Operator for ApplyCore<O, B, F>
307where
308    O: 'static,
309    B: 'static,
310    F: Fn(Scope) -> bool + 'static,
311{
312    fn name(&self) -> Cow<'static, str> {
313        self.name.clone()
314    }
315
316    fn location(&self) -> OperatorLocation {
317        Some(self.location)
318    }
319
320    fn fixedpoint(&self, scope: Scope) -> bool {
321        (self.fixpoint)(scope)
322    }
323}
324
325impl<O, B, F, T1, T2> UnaryOperator<T1, T2> for ApplyCore<O, B, F>
326where
327    O: FnMut(T1) -> T2 + 'static,
328    B: FnMut(&T1) -> T2 + 'static,
329    F: Fn(Scope) -> bool + 'static,
330{
331    async fn eval(&mut self, input: &T1) -> T2 {
332        (self.borrowed)(input)
333    }
334
335    #[inline]
336    async fn eval_owned(&mut self, input: T1) -> T2 {
337        (self.owned)(input)
338    }
339
340    #[inline]
341    fn input_preference(&self) -> OwnershipPreference {
342        OwnershipPreference::STRONGLY_PREFER_OWNED
343    }
344}