Skip to main content

oxigdal_streaming/transformations/
transform.rs

1//! Basic transformation operations.
2
3use crate::core::stream::{StreamElement, StreamMessage};
4use std::sync::Arc;
5
6/// Map transformation.
7pub struct MapTransform<F>
8where
9    F: Fn(Vec<u8>) -> Vec<u8> + Send + Sync,
10{
11    mapper: Arc<F>,
12}
13
14impl<F> MapTransform<F>
15where
16    F: Fn(Vec<u8>) -> Vec<u8> + Send + Sync,
17{
18    /// Create a new map transformation.
19    pub fn new(mapper: F) -> Self {
20        Self {
21            mapper: Arc::new(mapper),
22        }
23    }
24
25    /// Apply the transformation to an element.
26    pub fn apply(&self, element: StreamElement) -> StreamElement {
27        let new_data = (self.mapper)(element.data);
28        StreamElement {
29            data: new_data,
30            event_time: element.event_time,
31            processing_time: element.processing_time,
32            key: element.key,
33            metadata: element.metadata,
34        }
35    }
36
37    /// Apply the transformation to a message.
38    pub fn apply_message(&self, message: StreamMessage) -> StreamMessage {
39        match message {
40            StreamMessage::Data(elem) => StreamMessage::Data(self.apply(elem)),
41            other => other,
42        }
43    }
44}
45
46/// Filter transformation.
47pub struct FilterTransform<F>
48where
49    F: Fn(&StreamElement) -> bool + Send + Sync,
50{
51    predicate: Arc<F>,
52}
53
54impl<F> FilterTransform<F>
55where
56    F: Fn(&StreamElement) -> bool + Send + Sync,
57{
58    /// Create a new filter transformation.
59    pub fn new(predicate: F) -> Self {
60        Self {
61            predicate: Arc::new(predicate),
62        }
63    }
64
65    /// Check if an element passes the filter.
66    pub fn test(&self, element: &StreamElement) -> bool {
67        (self.predicate)(element)
68    }
69
70    /// Apply the filter to a message.
71    pub fn apply_message(&self, message: StreamMessage) -> Option<StreamMessage> {
72        match message {
73            StreamMessage::Data(elem) => {
74                if self.test(&elem) {
75                    Some(StreamMessage::Data(elem))
76                } else {
77                    None
78                }
79            }
80            other => Some(other),
81        }
82    }
83}
84
85/// FlatMap transformation.
86pub struct FlatMapTransform<F>
87where
88    F: Fn(Vec<u8>) -> Vec<Vec<u8>> + Send + Sync,
89{
90    mapper: Arc<F>,
91}
92
93impl<F> FlatMapTransform<F>
94where
95    F: Fn(Vec<u8>) -> Vec<Vec<u8>> + Send + Sync,
96{
97    /// Create a new flat map transformation.
98    pub fn new(mapper: F) -> Self {
99        Self {
100            mapper: Arc::new(mapper),
101        }
102    }
103
104    /// Apply the transformation to an element.
105    pub fn apply(&self, element: StreamElement) -> Vec<StreamElement> {
106        let new_data_vec = (self.mapper)(element.data);
107        new_data_vec
108            .into_iter()
109            .map(|data| StreamElement {
110                data,
111                event_time: element.event_time,
112                processing_time: element.processing_time,
113                key: element.key.clone(),
114                metadata: element.metadata.clone(),
115            })
116            .collect()
117    }
118
119    /// Apply the transformation to a message.
120    pub fn apply_message(&self, message: StreamMessage) -> Vec<StreamMessage> {
121        match message {
122            StreamMessage::Data(elem) => self
123                .apply(elem)
124                .into_iter()
125                .map(StreamMessage::Data)
126                .collect(),
127            other => vec![other],
128        }
129    }
130}
131
132/// KeyBy transformation.
133pub struct KeyByTransform<F>
134where
135    F: Fn(&Vec<u8>) -> Vec<u8> + Send + Sync,
136{
137    key_selector: Arc<F>,
138}
139
140impl<F> KeyByTransform<F>
141where
142    F: Fn(&Vec<u8>) -> Vec<u8> + Send + Sync,
143{
144    /// Create a new keyBy transformation.
145    pub fn new(key_selector: F) -> Self {
146        Self {
147            key_selector: Arc::new(key_selector),
148        }
149    }
150
151    /// Apply the transformation to an element.
152    pub fn apply(&self, element: StreamElement) -> StreamElement {
153        let key = (self.key_selector)(&element.data);
154        StreamElement {
155            data: element.data,
156            event_time: element.event_time,
157            processing_time: element.processing_time,
158            key: Some(key),
159            metadata: element.metadata,
160        }
161    }
162
163    /// Apply the transformation to a message.
164    pub fn apply_message(&self, message: StreamMessage) -> StreamMessage {
165        match message {
166            StreamMessage::Data(elem) => StreamMessage::Data(self.apply(elem)),
167            other => other,
168        }
169    }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Build an element carrying `data`, passing the current time to
    /// `StreamElement::new`.
    fn element(data: Vec<u8>) -> StreamElement {
        StreamElement::new(data, Utc::now())
    }

    /// Unwrap a data message, failing the test on any other message kind.
    fn expect_data(message: StreamMessage) -> StreamElement {
        match message {
            StreamMessage::Data(elem) => elem,
            _ => panic!("expected a data message"),
        }
    }

    #[test]
    fn test_map_transform() {
        let transform = MapTransform::new(|mut data: Vec<u8>| {
            data.push(99);
            data
        });

        let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
        let result = transform.apply(elem);

        assert_eq!(result.data, vec![1, 2, 3, 99]);
    }

    #[test]
    fn test_map_preserves_key() {
        let key_by = KeyByTransform::new(|data: &Vec<u8>| data.clone());
        let double = MapTransform::new(|data: Vec<u8>| {
            data.into_iter().map(|b| b * 2).collect::<Vec<u8>>()
        });

        let result = double.apply(key_by.apply(element(vec![1, 2])));

        assert_eq!(result.data, vec![2, 4]);
        assert_eq!(result.key, Some(vec![1, 2]));
    }

    #[test]
    fn test_map_apply_message() {
        let reverse = MapTransform::new(|mut data: Vec<u8>| {
            data.reverse();
            data
        });

        let result = expect_data(reverse.apply_message(StreamMessage::Data(element(vec![1, 2, 3]))));

        assert_eq!(result.data, vec![3, 2, 1]);
    }

    #[test]
    fn test_filter_transform() {
        let transform = FilterTransform::new(|elem: &StreamElement| elem.data.len() > 2);

        let elem1 = StreamElement::new(vec![1, 2, 3], Utc::now());
        let elem2 = StreamElement::new(vec![1], Utc::now());

        assert!(transform.test(&elem1));
        assert!(!transform.test(&elem2));
    }

    #[test]
    fn test_filter_apply_message() {
        let filter = FilterTransform::new(|elem: &StreamElement| elem.data.len() > 2);

        let kept = filter.apply_message(StreamMessage::Data(element(vec![1, 2, 3])));
        assert_eq!(kept.map(expect_data).map(|e| e.data), Some(vec![1, 2, 3]));

        let dropped = filter.apply_message(StreamMessage::Data(element(vec![1])));
        assert!(dropped.is_none());
    }

    #[test]
    fn test_flatmap_transform() {
        let transform = FlatMapTransform::new(|data: Vec<u8>| vec![data.clone(), data]);

        let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
        let result = transform.apply(elem);

        assert_eq!(result.len(), 2);
        assert_eq!(result[0].data, vec![1, 2, 3]);
        assert_eq!(result[1].data, vec![1, 2, 3]);
    }

    #[test]
    fn test_flatmap_preserves_key_and_order() {
        let key_by = KeyByTransform::new(|_: &Vec<u8>| vec![42]);
        let split = FlatMapTransform::new(|data: Vec<u8>| {
            data.into_iter().map(|b| vec![b]).collect::<Vec<Vec<u8>>>()
        });

        let result = split.apply(key_by.apply(element(vec![1, 2, 3])));

        let payloads: Vec<Vec<u8>> = result.iter().map(|e| e.data.clone()).collect();
        assert_eq!(payloads, vec![vec![1], vec![2], vec![3]]);
        assert!(result.iter().all(|e| e.key == Some(vec![42])));
    }

    #[test]
    fn test_flatmap_empty_output_drops_element() {
        let drop_all = FlatMapTransform::new(|_: Vec<u8>| Vec::<Vec<u8>>::new());

        assert!(drop_all.apply(element(vec![1, 2, 3])).is_empty());
        assert!(drop_all
            .apply_message(StreamMessage::Data(element(vec![1])))
            .is_empty());
    }

    #[test]
    fn test_flatmap_apply_message() {
        let dup = FlatMapTransform::new(|data: Vec<u8>| vec![data.clone(), data]);

        let messages = dup.apply_message(StreamMessage::Data(element(vec![5])));

        assert_eq!(messages.len(), 2);
        for message in messages {
            assert_eq!(expect_data(message).data, vec![5]);
        }
    }

    #[test]
    fn test_keyby_transform() {
        let transform = KeyByTransform::new(|data: &Vec<u8>| vec![data.len() as u8]);

        let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
        let result = transform.apply(elem);

        assert_eq!(result.key, Some(vec![3]));
        assert_eq!(result.data, vec![1, 2, 3]);
    }

    #[test]
    fn test_keyby_apply_message() {
        let key_by = KeyByTransform::new(|data: &Vec<u8>| data.iter().rev().copied().collect());

        let result = expect_data(key_by.apply_message(StreamMessage::Data(element(vec![1, 2]))));

        assert_eq!(result.key, Some(vec![2, 1]));
        assert_eq!(result.data, vec![1, 2]);
    }
}