oxigdal_streaming/core/
operators.rs1use crate::core::stream::{StreamElement, StreamMessage};
4use crate::error::Result;
5use async_trait::async_trait;
6use std::sync::Arc;
7
8#[async_trait]
10pub trait StreamOperator: Send + Sync {
11 async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>>;
13
14 fn name(&self) -> &str;
16
17 async fn initialize(&mut self) -> Result<()> {
19 Ok(())
20 }
21
22 async fn finalize(&mut self) -> Result<()> {
24 Ok(())
25 }
26}
27
28#[async_trait]
30pub trait SourceOperator: Send + Sync {
31 async fn produce(&mut self) -> Result<Vec<StreamMessage>>;
33
34 async fn has_more(&self) -> bool;
36
37 fn name(&self) -> &str;
39}
40
41#[async_trait]
43pub trait SinkOperator: Send + Sync {
44 async fn consume(&mut self, messages: Vec<StreamMessage>) -> Result<()>;
46
47 async fn flush(&mut self) -> Result<()>;
49
50 fn name(&self) -> &str;
52}
53
54#[async_trait]
56pub trait TransformOperator: StreamOperator {
57 async fn transform(&mut self, element: StreamElement) -> Result<Vec<StreamElement>>;
59}
60
/// Operator that forwards only the data elements accepted by a predicate.
///
/// The `Fn(&StreamElement) -> bool + Send + Sync` bound on `F` is required by
/// the `impl` blocks that construct and run the operator. It is deliberately
/// not repeated on the struct itself, so the type can be named (in fields,
/// aliases, other generic code) without restating it.
pub struct FilterOperator<F> {
    /// Predicate deciding whether a data element is forwarded.
    predicate: Arc<F>,
    /// Name returned by the operator's `name()` method.
    name: String,
}
69
70impl<F> FilterOperator<F>
71where
72 F: Fn(&StreamElement) -> bool + Send + Sync,
73{
74 pub fn new(predicate: F, name: String) -> Self {
76 Self {
77 predicate: Arc::new(predicate),
78 name,
79 }
80 }
81}
82
83#[async_trait]
84impl<F> StreamOperator for FilterOperator<F>
85where
86 F: Fn(&StreamElement) -> bool + Send + Sync,
87{
88 async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>> {
89 match message {
90 StreamMessage::Data(elem) => {
91 if (self.predicate)(&elem) {
92 Ok(vec![StreamMessage::Data(elem)])
93 } else {
94 Ok(vec![])
95 }
96 }
97 other => Ok(vec![other]),
98 }
99 }
100
101 fn name(&self) -> &str {
102 &self.name
103 }
104}
105
/// Operator that applies a one-to-one function to every data element.
///
/// The `Fn(StreamElement) -> StreamElement + Send + Sync` bound on `F` is
/// required by the `impl` blocks that construct and run the operator. It is
/// deliberately not repeated on the struct itself, so the type can be named
/// without restating it.
pub struct MapOperator<F> {
    /// Function applied to each data element.
    mapper: Arc<F>,
    /// Name returned by the operator's `name()` method.
    name: String,
}
114
115impl<F> MapOperator<F>
116where
117 F: Fn(StreamElement) -> StreamElement + Send + Sync,
118{
119 pub fn new(mapper: F, name: String) -> Self {
121 Self {
122 mapper: Arc::new(mapper),
123 name,
124 }
125 }
126}
127
128#[async_trait]
129impl<F> StreamOperator for MapOperator<F>
130where
131 F: Fn(StreamElement) -> StreamElement + Send + Sync,
132{
133 async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>> {
134 match message {
135 StreamMessage::Data(elem) => {
136 let transformed = (self.mapper)(elem);
137 Ok(vec![StreamMessage::Data(transformed)])
138 }
139 other => Ok(vec![other]),
140 }
141 }
142
143 fn name(&self) -> &str {
144 &self.name
145 }
146}
147
/// Operator that expands every data element into zero or more elements.
///
/// The `Fn(StreamElement) -> Vec<StreamElement> + Send + Sync` bound on `F`
/// is required by the `impl` blocks that construct and run the operator. It
/// is deliberately not repeated on the struct itself, so the type can be
/// named without restating it.
pub struct FlatMapOperator<F> {
    /// Function mapping one data element to its replacement elements.
    mapper: Arc<F>,
    /// Name returned by the operator's `name()` method.
    name: String,
}
156
157impl<F> FlatMapOperator<F>
158where
159 F: Fn(StreamElement) -> Vec<StreamElement> + Send + Sync,
160{
161 pub fn new(mapper: F, name: String) -> Self {
163 Self {
164 mapper: Arc::new(mapper),
165 name,
166 }
167 }
168}
169
170#[async_trait]
171impl<F> StreamOperator for FlatMapOperator<F>
172where
173 F: Fn(StreamElement) -> Vec<StreamElement> + Send + Sync,
174{
175 async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>> {
176 match message {
177 StreamMessage::Data(elem) => {
178 let elements = (self.mapper)(elem);
179 Ok(elements.into_iter().map(StreamMessage::Data).collect())
180 }
181 other => Ok(vec![other]),
182 }
183 }
184
185 fn name(&self) -> &str {
186 &self.name
187 }
188}
189
/// Sink that logs every received message through `tracing` and counts the
/// data elements it has consumed.
///
/// Derives `Debug` so the sink can appear in diagnostics and in `{:?}`
/// output of types that contain it.
#[derive(Debug)]
pub struct LoggingSink {
    /// Name attached to every log record and returned by `name()`.
    name: String,
    /// Number of `StreamMessage::Data` messages consumed so far.
    count: u64,
}
195
196impl LoggingSink {
197 pub fn new(name: String) -> Self {
199 Self { name, count: 0 }
200 }
201
202 pub fn count(&self) -> u64 {
204 self.count
205 }
206}
207
208#[async_trait]
209impl SinkOperator for LoggingSink {
210 async fn consume(&mut self, messages: Vec<StreamMessage>) -> Result<()> {
211 for msg in messages {
212 match msg {
213 StreamMessage::Data(elem) => {
214 tracing::debug!(
215 sink = %self.name,
216 event_time = %elem.event_time,
217 size = elem.size_bytes(),
218 "Received element"
219 );
220 self.count += 1;
221 }
222 StreamMessage::Watermark(wm) => {
223 tracing::debug!(sink = %self.name, watermark = %wm, "Received watermark");
224 }
225 StreamMessage::Checkpoint(id) => {
226 tracing::debug!(sink = %self.name, checkpoint = id, "Received checkpoint");
227 }
228 StreamMessage::EndOfStream => {
229 tracing::info!(sink = %self.name, "Received end of stream");
230 }
231 }
232 }
233 Ok(())
234 }
235
236 async fn flush(&mut self) -> Result<()> {
237 tracing::debug!(sink = %self.name, "Flushing sink");
238 Ok(())
239 }
240
241 fn name(&self) -> &str {
242 &self.name
243 }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    #[tokio::test]
    async fn test_filter_operator() {
        let mut filter = FilterOperator::new(
            |elem: &StreamElement| elem.data.len() > 2,
            "test_filter".to_string(),
        );

        let elem1 = StreamElement::new(vec![1, 2, 3], Utc::now());
        let elem2 = StreamElement::new(vec![1], Utc::now());

        let result1 = filter
            .process(StreamMessage::Data(elem1))
            .await
            .expect("filter should process element");
        assert_eq!(result1.len(), 1);

        let result2 = filter
            .process(StreamMessage::Data(elem2))
            .await
            .expect("filter should process element");
        assert_eq!(result2.len(), 0);
    }

    /// Control messages must bypass the predicate, even one that rejects everything.
    #[tokio::test]
    async fn test_filter_passes_control_messages() {
        let mut filter =
            FilterOperator::new(|_elem: &StreamElement| false, "reject_all".to_string());

        let result = filter
            .process(StreamMessage::EndOfStream)
            .await
            .expect("filter should forward control message");
        assert_eq!(result.len(), 1);
        assert!(matches!(result[0], StreamMessage::EndOfStream));
    }

    #[tokio::test]
    async fn test_map_operator() {
        let mut mapper = MapOperator::new(
            |mut elem: StreamElement| {
                elem.data.push(99);
                elem
            },
            "test_map".to_string(),
        );

        let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
        let result = mapper
            .process(StreamMessage::Data(elem))
            .await
            .expect("map should transform element");

        assert_eq!(result.len(), 1);
        if let StreamMessage::Data(transformed) = &result[0] {
            assert_eq!(transformed.data.len(), 4);
            assert_eq!(transformed.data[3], 99);
        } else {
            panic!("Expected data message");
        }
    }

    /// Non-data messages are forwarded unchanged by the map operator.
    #[tokio::test]
    async fn test_map_passes_control_messages() {
        let mut mapper = MapOperator::new(|elem: StreamElement| elem, "identity".to_string());

        let result = mapper
            .process(StreamMessage::EndOfStream)
            .await
            .expect("map should forward control message");
        assert_eq!(result.len(), 1);
        assert!(matches!(result[0], StreamMessage::EndOfStream));
    }

    #[tokio::test]
    async fn test_flat_map_operator() {
        let mut flat_mapper = FlatMapOperator::new(
            |elem: StreamElement| {
                vec![
                    StreamElement::new(elem.data.clone(), elem.event_time),
                    StreamElement::new(elem.data.clone(), elem.event_time),
                ]
            },
            "test_flatmap".to_string(),
        );

        let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
        let result = flat_mapper
            .process(StreamMessage::Data(elem))
            .await
            .expect("flat_map should process element");

        assert_eq!(result.len(), 2);
    }

    /// A mapper that yields nothing drops the element entirely.
    #[tokio::test]
    async fn test_flat_map_empty_output() {
        let mut flat_mapper = FlatMapOperator::new(
            |_elem: StreamElement| Vec::<StreamElement>::new(),
            "drop_all".to_string(),
        );

        let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
        let result = flat_mapper
            .process(StreamMessage::Data(elem))
            .await
            .expect("flat_map should process element");
        assert!(result.is_empty());

        let control = flat_mapper
            .process(StreamMessage::EndOfStream)
            .await
            .expect("flat_map should forward control message");
        assert_eq!(control.len(), 1);
        assert!(matches!(control[0], StreamMessage::EndOfStream));
    }

    #[tokio::test]
    async fn test_logging_sink() {
        let mut sink = LoggingSink::new("test_sink".to_string());

        let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
        sink.consume(vec![StreamMessage::Data(elem)])
            .await
            .expect("sink should consume element");

        assert_eq!(sink.count(), 1);
    }

    /// Only data messages contribute to the sink's element count.
    #[tokio::test]
    async fn test_logging_sink_ignores_control_messages_in_count() {
        let mut sink = LoggingSink::new("test_sink".to_string());

        sink.consume(vec![StreamMessage::EndOfStream])
            .await
            .expect("sink should consume control message");
        sink.flush().await.expect("sink should flush");

        assert_eq!(sink.count(), 0);
    }
}