Skip to main content

renoir/operator/
rich_map_custom.rs

1use std::fmt::Display;
2use std::marker::PhantomData;
3
4use crate::block::{BlockStructure, OperatorStructure};
5use crate::operator::{Operator, StreamElement};
6use crate::scheduler::ExecutionMetadata;
7
8pub struct ElementGenerator<'a, Op> {
9    inner: &'a mut Op,
10}
11
12impl<'a, Op: Operator> ElementGenerator<'a, Op> {
13    pub fn new(inner: &'a mut Op) -> Self {
14        Self { inner }
15    }
16
17    #[allow(clippy::should_implement_trait)]
18    pub fn next(&mut self) -> StreamElement<Op::Out> {
19        self.inner.next()
20    }
21}
22
23#[derive(Debug)]
24pub struct RichMapCustom<O: Send, F, Op>
25where
26    F: FnMut(ElementGenerator<Op>) -> StreamElement<O> + Clone + Send,
27    Op: Operator,
28{
29    prev: Op,
30    map_fn: F,
31    _new_out: PhantomData<O>,
32}
33
34impl<O: Send, F: Clone, Op: Clone> Clone for RichMapCustom<O, F, Op>
35where
36    F: FnMut(ElementGenerator<Op>) -> StreamElement<O> + Clone + Send,
37    Op: Operator,
38{
39    fn clone(&self) -> Self {
40        Self {
41            prev: self.prev.clone(),
42            map_fn: self.map_fn.clone(),
43            _new_out: PhantomData,
44        }
45    }
46}
47
48impl<O: Send, F, Op> Display for RichMapCustom<O, F, Op>
49where
50    F: FnMut(ElementGenerator<Op>) -> StreamElement<O> + Clone + Send,
51    Op: Operator,
52{
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        write!(
55            f,
56            "{} -> RichMapCustom<{} -> {}>",
57            self.prev,
58            std::any::type_name::<Op::Out>(),
59            std::any::type_name::<O>()
60        )
61    }
62}
63
64impl<O: Send, F, Op> RichMapCustom<O, F, Op>
65where
66    F: FnMut(ElementGenerator<Op>) -> StreamElement<O> + Clone + Send,
67    Op: Operator,
68{
69    pub(super) fn new(prev: Op, f: F) -> Self {
70        Self {
71            prev,
72            map_fn: f,
73            _new_out: Default::default(),
74        }
75    }
76}
77
78impl<O: Send, F, Op> Operator for RichMapCustom<O, F, Op>
79where
80    F: FnMut(ElementGenerator<Op>) -> StreamElement<O> + Clone + Send,
81    Op: Operator,
82{
83    type Out = O;
84
85    fn setup(&mut self, metadata: &mut ExecutionMetadata) {
86        self.prev.setup(metadata);
87    }
88
89    fn next(&mut self) -> StreamElement<O> {
90        let eg = ElementGenerator::new(&mut self.prev);
91        (self.map_fn)(eg)
92    }
93
94    fn structure(&self) -> BlockStructure {
95        self.prev
96            .structure()
97            .add_operator(OperatorStructure::new::<O, _>("RichMapCustom"))
98    }
99}