oxigdal_streaming/transformations/
transform.rs1use crate::core::stream::{StreamElement, StreamMessage};
4use std::sync::Arc;
5
/// Transformation that rewrites the payload bytes of a stream element.
///
/// The mapper takes the payload as an owned `Vec<u8>` and returns the
/// replacement bytes. It must be `Send + Sync` and is held behind an
/// [`Arc`].
pub struct MapTransform<F: Fn(Vec<u8>) -> Vec<u8> + Send + Sync> {
    /// User-supplied payload mapper.
    mapper: Arc<F>,
}
13
14impl<F> MapTransform<F>
15where
16 F: Fn(Vec<u8>) -> Vec<u8> + Send + Sync,
17{
18 pub fn new(mapper: F) -> Self {
20 Self {
21 mapper: Arc::new(mapper),
22 }
23 }
24
25 pub fn apply(&self, element: StreamElement) -> StreamElement {
27 let new_data = (self.mapper)(element.data);
28 StreamElement {
29 data: new_data,
30 event_time: element.event_time,
31 processing_time: element.processing_time,
32 key: element.key,
33 metadata: element.metadata,
34 }
35 }
36
37 pub fn apply_message(&self, message: StreamMessage) -> StreamMessage {
39 match message {
40 StreamMessage::Data(elem) => StreamMessage::Data(self.apply(elem)),
41 other => other,
42 }
43 }
44}
45
46pub struct FilterTransform<F>
48where
49 F: Fn(&StreamElement) -> bool + Send + Sync,
50{
51 predicate: Arc<F>,
52}
53
54impl<F> FilterTransform<F>
55where
56 F: Fn(&StreamElement) -> bool + Send + Sync,
57{
58 pub fn new(predicate: F) -> Self {
60 Self {
61 predicate: Arc::new(predicate),
62 }
63 }
64
65 pub fn test(&self, element: &StreamElement) -> bool {
67 (self.predicate)(element)
68 }
69
70 pub fn apply_message(&self, message: StreamMessage) -> Option<StreamMessage> {
72 match message {
73 StreamMessage::Data(elem) => {
74 if self.test(&elem) {
75 Some(StreamMessage::Data(elem))
76 } else {
77 None
78 }
79 }
80 other => Some(other),
81 }
82 }
83}
84
/// Transformation that expands one payload into zero or more payloads.
///
/// The mapper takes the payload as an owned `Vec<u8>` and returns a list of
/// replacement payloads. It must be `Send + Sync` and is held behind an
/// [`Arc`].
pub struct FlatMapTransform<F: Fn(Vec<u8>) -> Vec<Vec<u8>> + Send + Sync> {
    /// User-supplied one-to-many payload mapper.
    mapper: Arc<F>,
}
92
93impl<F> FlatMapTransform<F>
94where
95 F: Fn(Vec<u8>) -> Vec<Vec<u8>> + Send + Sync,
96{
97 pub fn new(mapper: F) -> Self {
99 Self {
100 mapper: Arc::new(mapper),
101 }
102 }
103
104 pub fn apply(&self, element: StreamElement) -> Vec<StreamElement> {
106 let new_data_vec = (self.mapper)(element.data);
107 new_data_vec
108 .into_iter()
109 .map(|data| StreamElement {
110 data,
111 event_time: element.event_time,
112 processing_time: element.processing_time,
113 key: element.key.clone(),
114 metadata: element.metadata.clone(),
115 })
116 .collect()
117 }
118
119 pub fn apply_message(&self, message: StreamMessage) -> Vec<StreamMessage> {
121 match message {
122 StreamMessage::Data(elem) => self
123 .apply(elem)
124 .into_iter()
125 .map(StreamMessage::Data)
126 .collect(),
127 other => vec![other],
128 }
129 }
130}
131
/// Transformation that assigns a key to a stream element.
///
/// The selector borrows the payload bytes and returns the key bytes. It must
/// be `Send + Sync` and is held behind an [`Arc`].
pub struct KeyByTransform<F: Fn(&Vec<u8>) -> Vec<u8> + Send + Sync> {
    /// User-supplied function deriving a key from the payload.
    key_selector: Arc<F>,
}
139
140impl<F> KeyByTransform<F>
141where
142 F: Fn(&Vec<u8>) -> Vec<u8> + Send + Sync,
143{
144 pub fn new(key_selector: F) -> Self {
146 Self {
147 key_selector: Arc::new(key_selector),
148 }
149 }
150
151 pub fn apply(&self, element: StreamElement) -> StreamElement {
153 let key = (self.key_selector)(&element.data);
154 StreamElement {
155 data: element.data,
156 event_time: element.event_time,
157 processing_time: element.processing_time,
158 key: Some(key),
159 metadata: element.metadata,
160 }
161 }
162
163 pub fn apply_message(&self, message: StreamMessage) -> StreamMessage {
165 match message {
166 StreamMessage::Data(elem) => StreamMessage::Data(self.apply(elem)),
167 other => other,
168 }
169 }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Builds an element carrying `data`, passing `Utc::now()` as the
    /// timestamp argument of `StreamElement::new`.
    fn element(data: Vec<u8>) -> StreamElement {
        StreamElement::new(data, Utc::now())
    }

    /// Unwraps the element of a data message, failing the test for any other
    /// message variant.
    fn expect_data(message: StreamMessage) -> StreamElement {
        match message {
            StreamMessage::Data(element) => element,
            _ => panic!("expected StreamMessage::Data"),
        }
    }

    #[test]
    fn test_map_transform() {
        let transform = MapTransform::new(|mut data: Vec<u8>| {
            data.push(99);
            data
        });

        let result = transform.apply(element(vec![1, 2, 3]));

        assert_eq!(result.data, vec![1, 2, 3, 99]);
    }

    #[test]
    fn test_map_apply_message() {
        let transform = MapTransform::new(|mut data: Vec<u8>| {
            data.push(99);
            data
        });

        let result = expect_data(transform.apply_message(StreamMessage::Data(element(vec![1]))));

        assert_eq!(result.data, vec![1, 99]);
    }

    #[test]
    fn test_map_preserves_key() {
        let key_by = KeyByTransform::new(|data: &Vec<u8>| vec![data.len() as u8]);
        let map = MapTransform::new(|mut data: Vec<u8>| {
            data.push(99);
            data
        });

        // The key is assigned before mapping, so it must reflect the original
        // payload length even though the mapper grows the payload.
        let result = map.apply(key_by.apply(element(vec![1, 2, 3])));

        assert_eq!(result.data, vec![1, 2, 3, 99]);
        assert_eq!(result.key, Some(vec![3]));
    }

    #[test]
    fn test_filter_transform() {
        let transform = FilterTransform::new(|elem: &StreamElement| elem.data.len() > 2);

        let elem1 = element(vec![1, 2, 3]);
        let elem2 = element(vec![1]);

        assert!(transform.test(&elem1));
        assert!(!transform.test(&elem2));
    }

    #[test]
    fn test_filter_apply_message() {
        let transform = FilterTransform::new(|elem: &StreamElement| elem.data.len() > 2);

        let kept = transform.apply_message(StreamMessage::Data(element(vec![1, 2, 3])));
        let dropped = transform.apply_message(StreamMessage::Data(element(vec![1])));

        match kept {
            Some(message) => assert_eq!(expect_data(message).data, vec![1, 2, 3]),
            None => panic!("element satisfying the predicate was dropped"),
        }
        assert!(dropped.is_none());
    }

    #[test]
    fn test_flatmap_transform() {
        let transform = FlatMapTransform::new(|data: Vec<u8>| vec![data.clone(), data]);

        let result = transform.apply(element(vec![1, 2, 3]));

        assert_eq!(result.len(), 2);
        assert_eq!(result[0].data, vec![1, 2, 3]);
        assert_eq!(result[1].data, vec![1, 2, 3]);
    }

    #[test]
    fn test_flatmap_empty_output() {
        let transform = FlatMapTransform::new(|_data: Vec<u8>| Vec::<Vec<u8>>::new());

        assert!(transform.apply(element(vec![1, 2, 3])).is_empty());
        assert!(transform
            .apply_message(StreamMessage::Data(element(vec![1, 2, 3])))
            .is_empty());
    }

    #[test]
    fn test_flatmap_propagates_key() {
        let key_by = KeyByTransform::new(|data: &Vec<u8>| vec![data.len() as u8]);
        let flat_map = FlatMapTransform::new(|data: Vec<u8>| vec![data.clone(), data]);

        let result = flat_map.apply(key_by.apply(element(vec![1, 2, 3])));

        assert_eq!(result.len(), 2);
        for output in &result {
            assert_eq!(output.key, Some(vec![3]));
        }
    }

    #[test]
    fn test_flatmap_apply_message() {
        let transform = FlatMapTransform::new(|data: Vec<u8>| vec![data.clone(), data]);

        let messages = transform.apply_message(StreamMessage::Data(element(vec![4, 5])));

        assert_eq!(messages.len(), 2);
        for message in messages {
            assert_eq!(expect_data(message).data, vec![4, 5]);
        }
    }

    #[test]
    fn test_keyby_transform() {
        let transform = KeyByTransform::new(|data: &Vec<u8>| vec![data.len() as u8]);

        let result = transform.apply(element(vec![1, 2, 3]));

        assert_eq!(result.key, Some(vec![3]));
    }

    #[test]
    fn test_keyby_apply_message() {
        let transform = KeyByTransform::new(|data: &Vec<u8>| vec![data.len() as u8]);

        let result =
            expect_data(transform.apply_message(StreamMessage::Data(element(vec![1, 2]))));

        assert_eq!(result.key, Some(vec![2]));
        assert_eq!(result.data, vec![1, 2]);
    }
}