noir_compute/operator/
filter_map.rs

1use std::fmt::Display;
2use std::marker::PhantomData;
3
4use crate::block::{BlockStructure, OperatorStructure};
5use crate::operator::{Data, Operator};
6
7use crate::ExecutionMetadata;
8
9use super::StreamElement;
10
11#[derive(Clone)]
12pub struct FilterMap<Out: Data, PreviousOperator, Predicate>
13where
14    Predicate: Fn(PreviousOperator::Out) -> Option<Out> + Send + Clone + 'static,
15    PreviousOperator: Operator + 'static,
16{
17    prev: PreviousOperator,
18    predicate: Predicate,
19    _out: PhantomData<Out>,
20}
21
22impl<Out: Data, PreviousOperator, Predicate> Display for FilterMap<Out, PreviousOperator, Predicate>
23where
24    Predicate: Fn(PreviousOperator::Out) -> Option<Out> + Send + Clone + 'static,
25    PreviousOperator: Operator + 'static,
26{
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        write!(
29            f,
30            "{} -> FilterMap<{}>",
31            self.prev,
32            std::any::type_name::<Out>()
33        )
34    }
35}
36
37impl<Out: Data, PreviousOperator, Predicate> FilterMap<Out, PreviousOperator, Predicate>
38where
39    Predicate: Fn(PreviousOperator::Out) -> Option<Out> + Send + Clone + 'static,
40    PreviousOperator: Operator + 'static,
41{
42    pub(super) fn new(prev: PreviousOperator, predicate: Predicate) -> Self {
43        Self {
44            prev,
45            predicate,
46            _out: Default::default(),
47        }
48    }
49}
50
51impl<Out: Data, PreviousOperator, Predicate> Operator
52    for FilterMap<Out, PreviousOperator, Predicate>
53where
54    Predicate: Fn(PreviousOperator::Out) -> Option<Out> + Send + Clone + 'static,
55    PreviousOperator: Operator + 'static,
56{
57    type Out = Out;
58
59    fn setup(&mut self, metadata: &mut ExecutionMetadata) {
60        self.prev.setup(metadata);
61    }
62
63    #[inline]
64    fn next(&mut self) -> StreamElement<Out> {
65        loop {
66            match self.prev.next() {
67                StreamElement::Item(item) => {
68                    if let Some(el) = (self.predicate)(item) {
69                        return StreamElement::Item(el);
70                    }
71                }
72                StreamElement::Timestamped(item, ts) => {
73                    if let Some(el) = (self.predicate)(item) {
74                        return StreamElement::Timestamped(el, ts);
75                    }
76                }
77                StreamElement::Watermark(w) => return StreamElement::Watermark(w),
78                StreamElement::Terminate => return StreamElement::Terminate,
79                StreamElement::FlushAndRestart => return StreamElement::FlushAndRestart,
80                StreamElement::FlushBatch => return StreamElement::FlushBatch,
81            }
82        }
83    }
84
85    fn structure(&self) -> BlockStructure {
86        self.prev
87            .structure()
88            .add_operator(OperatorStructure::new::<Out, _>("FilterMap"))
89    }
90}