noir_compute/operator/
filter_map.rs1use std::fmt::Display;
2use std::marker::PhantomData;
3
4use crate::block::{BlockStructure, OperatorStructure};
5use crate::operator::{Data, Operator};
6
7use crate::ExecutionMetadata;
8
9use super::StreamElement;
10
11#[derive(Clone)]
12pub struct FilterMap<Out: Data, PreviousOperator, Predicate>
13where
14 Predicate: Fn(PreviousOperator::Out) -> Option<Out> + Send + Clone + 'static,
15 PreviousOperator: Operator + 'static,
16{
17 prev: PreviousOperator,
18 predicate: Predicate,
19 _out: PhantomData<Out>,
20}
21
22impl<Out: Data, PreviousOperator, Predicate> Display for FilterMap<Out, PreviousOperator, Predicate>
23where
24 Predicate: Fn(PreviousOperator::Out) -> Option<Out> + Send + Clone + 'static,
25 PreviousOperator: Operator + 'static,
26{
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 write!(
29 f,
30 "{} -> FilterMap<{}>",
31 self.prev,
32 std::any::type_name::<Out>()
33 )
34 }
35}
36
37impl<Out: Data, PreviousOperator, Predicate> FilterMap<Out, PreviousOperator, Predicate>
38where
39 Predicate: Fn(PreviousOperator::Out) -> Option<Out> + Send + Clone + 'static,
40 PreviousOperator: Operator + 'static,
41{
42 pub(super) fn new(prev: PreviousOperator, predicate: Predicate) -> Self {
43 Self {
44 prev,
45 predicate,
46 _out: Default::default(),
47 }
48 }
49}
50
51impl<Out: Data, PreviousOperator, Predicate> Operator
52 for FilterMap<Out, PreviousOperator, Predicate>
53where
54 Predicate: Fn(PreviousOperator::Out) -> Option<Out> + Send + Clone + 'static,
55 PreviousOperator: Operator + 'static,
56{
57 type Out = Out;
58
59 fn setup(&mut self, metadata: &mut ExecutionMetadata) {
60 self.prev.setup(metadata);
61 }
62
63 #[inline]
64 fn next(&mut self) -> StreamElement<Out> {
65 loop {
66 match self.prev.next() {
67 StreamElement::Item(item) => {
68 if let Some(el) = (self.predicate)(item) {
69 return StreamElement::Item(el);
70 }
71 }
72 StreamElement::Timestamped(item, ts) => {
73 if let Some(el) = (self.predicate)(item) {
74 return StreamElement::Timestamped(el, ts);
75 }
76 }
77 StreamElement::Watermark(w) => return StreamElement::Watermark(w),
78 StreamElement::Terminate => return StreamElement::Terminate,
79 StreamElement::FlushAndRestart => return StreamElement::FlushAndRestart,
80 StreamElement::FlushBatch => return StreamElement::FlushBatch,
81 }
82 }
83 }
84
85 fn structure(&self) -> BlockStructure {
86 self.prev
87 .structure()
88 .add_operator(OperatorStructure::new::<Out, _>("FilterMap"))
89 }
90}