// oxigdal_streaming/core/operators.rs

1//! Stream operators for processing data.
2
3use crate::core::stream::{StreamElement, StreamMessage};
4use crate::error::Result;
5use async_trait::async_trait;
6use std::sync::Arc;
7
8/// Base trait for stream operators.
9#[async_trait]
10pub trait StreamOperator: Send + Sync {
11    /// Process a stream message.
12    async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>>;
13
14    /// Get the operator name.
15    fn name(&self) -> &str;
16
17    /// Initialize the operator.
18    async fn initialize(&mut self) -> Result<()> {
19        Ok(())
20    }
21
22    /// Finalize the operator.
23    async fn finalize(&mut self) -> Result<()> {
24        Ok(())
25    }
26}
27
28/// A source operator that produces stream elements.
29#[async_trait]
30pub trait SourceOperator: Send + Sync {
31    /// Produce the next batch of elements.
32    async fn produce(&mut self) -> Result<Vec<StreamMessage>>;
33
34    /// Check if the source has more data.
35    async fn has_more(&self) -> bool;
36
37    /// Get the source name.
38    fn name(&self) -> &str;
39}
40
41/// A sink operator that consumes stream elements.
42#[async_trait]
43pub trait SinkOperator: Send + Sync {
44    /// Consume a batch of elements.
45    async fn consume(&mut self, messages: Vec<StreamMessage>) -> Result<()>;
46
47    /// Flush any buffered data.
48    async fn flush(&mut self) -> Result<()>;
49
50    /// Get the sink name.
51    fn name(&self) -> &str;
52}
53
54/// A transform operator that modifies stream elements.
55#[async_trait]
56pub trait TransformOperator: StreamOperator {
57    /// Transform a single element.
58    async fn transform(&mut self, element: StreamElement) -> Result<Vec<StreamElement>>;
59}
60
61/// A filter operator.
62pub struct FilterOperator<F>
63where
64    F: Fn(&StreamElement) -> bool + Send + Sync,
65{
66    predicate: Arc<F>,
67    name: String,
68}
69
70impl<F> FilterOperator<F>
71where
72    F: Fn(&StreamElement) -> bool + Send + Sync,
73{
74    /// Create a new filter operator.
75    pub fn new(predicate: F, name: String) -> Self {
76        Self {
77            predicate: Arc::new(predicate),
78            name,
79        }
80    }
81}
82
83#[async_trait]
84impl<F> StreamOperator for FilterOperator<F>
85where
86    F: Fn(&StreamElement) -> bool + Send + Sync,
87{
88    async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>> {
89        match message {
90            StreamMessage::Data(elem) => {
91                if (self.predicate)(&elem) {
92                    Ok(vec![StreamMessage::Data(elem)])
93                } else {
94                    Ok(vec![])
95                }
96            }
97            other => Ok(vec![other]),
98        }
99    }
100
101    fn name(&self) -> &str {
102        &self.name
103    }
104}
105
106/// A map operator.
107pub struct MapOperator<F>
108where
109    F: Fn(StreamElement) -> StreamElement + Send + Sync,
110{
111    mapper: Arc<F>,
112    name: String,
113}
114
115impl<F> MapOperator<F>
116where
117    F: Fn(StreamElement) -> StreamElement + Send + Sync,
118{
119    /// Create a new map operator.
120    pub fn new(mapper: F, name: String) -> Self {
121        Self {
122            mapper: Arc::new(mapper),
123            name,
124        }
125    }
126}
127
128#[async_trait]
129impl<F> StreamOperator for MapOperator<F>
130where
131    F: Fn(StreamElement) -> StreamElement + Send + Sync,
132{
133    async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>> {
134        match message {
135            StreamMessage::Data(elem) => {
136                let transformed = (self.mapper)(elem);
137                Ok(vec![StreamMessage::Data(transformed)])
138            }
139            other => Ok(vec![other]),
140        }
141    }
142
143    fn name(&self) -> &str {
144        &self.name
145    }
146}
147
148/// A flat map operator.
149pub struct FlatMapOperator<F>
150where
151    F: Fn(StreamElement) -> Vec<StreamElement> + Send + Sync,
152{
153    mapper: Arc<F>,
154    name: String,
155}
156
157impl<F> FlatMapOperator<F>
158where
159    F: Fn(StreamElement) -> Vec<StreamElement> + Send + Sync,
160{
161    /// Create a new flat map operator.
162    pub fn new(mapper: F, name: String) -> Self {
163        Self {
164            mapper: Arc::new(mapper),
165            name,
166        }
167    }
168}
169
170#[async_trait]
171impl<F> StreamOperator for FlatMapOperator<F>
172where
173    F: Fn(StreamElement) -> Vec<StreamElement> + Send + Sync,
174{
175    async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>> {
176        match message {
177            StreamMessage::Data(elem) => {
178                let elements = (self.mapper)(elem);
179                Ok(elements.into_iter().map(StreamMessage::Data).collect())
180            }
181            other => Ok(vec![other]),
182        }
183    }
184
185    fn name(&self) -> &str {
186        &self.name
187    }
188}
189
/// A logging sink operator for debugging.
///
/// Keeps a running count that starts at zero when the sink is created.
#[derive(Debug)]
pub struct LoggingSink {
    /// Name reported by this sink and attached to its log events.
    name: String,
    /// Number of data elements consumed so far.
    count: u64,
}

impl LoggingSink {
    /// Create a new logging sink with the given `name` and a count of zero.
    pub fn new(name: String) -> Self {
        Self { name, count: 0 }
    }

    /// Get the number of data elements logged.
    ///
    /// Only `StreamMessage::Data` messages are counted; watermarks,
    /// checkpoints and end-of-stream markers do not change this value.
    pub fn count(&self) -> u64 {
        self.count
    }
}
207
208#[async_trait]
209impl SinkOperator for LoggingSink {
210    async fn consume(&mut self, messages: Vec<StreamMessage>) -> Result<()> {
211        for msg in messages {
212            match msg {
213                StreamMessage::Data(elem) => {
214                    tracing::debug!(
215                        sink = %self.name,
216                        event_time = %elem.event_time,
217                        size = elem.size_bytes(),
218                        "Received element"
219                    );
220                    self.count += 1;
221                }
222                StreamMessage::Watermark(wm) => {
223                    tracing::debug!(sink = %self.name, watermark = %wm, "Received watermark");
224                }
225                StreamMessage::Checkpoint(id) => {
226                    tracing::debug!(sink = %self.name, checkpoint = id, "Received checkpoint");
227                }
228                StreamMessage::EndOfStream => {
229                    tracing::info!(sink = %self.name, "Received end of stream");
230                }
231            }
232        }
233        Ok(())
234    }
235
236    async fn flush(&mut self) -> Result<()> {
237        tracing::debug!(sink = %self.name, "Flushing sink");
238        Ok(())
239    }
240
241    fn name(&self) -> &str {
242        &self.name
243    }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// An element longer than two bytes passes the filter; a shorter one is dropped.
    #[tokio::test]
    async fn test_filter_operator() {
        let mut filter = FilterOperator::new(
            |elem: &StreamElement| elem.data.len() > 2,
            "test_filter".to_string(),
        );

        let long_msg = StreamMessage::Data(StreamElement::new(vec![1, 2, 3], Utc::now()));
        let short_msg = StreamMessage::Data(StreamElement::new(vec![1], Utc::now()));

        let kept = filter
            .process(long_msg)
            .await
            .expect("filter should process element");
        assert_eq!(kept.len(), 1);

        let dropped = filter
            .process(short_msg)
            .await
            .expect("filter should process element");
        assert_eq!(dropped.len(), 0);
    }

    /// The mapper's appended byte shows up in the single output element.
    #[tokio::test]
    async fn test_map_operator() {
        let mut mapper = MapOperator::new(
            |mut elem: StreamElement| {
                elem.data.push(99);
                elem
            },
            "test_map".to_string(),
        );

        let input = StreamMessage::Data(StreamElement::new(vec![1, 2, 3], Utc::now()));
        let output = mapper
            .process(input)
            .await
            .expect("map should transform element");

        assert_eq!(output.len(), 1);
        match &output[0] {
            StreamMessage::Data(transformed) => {
                assert_eq!(transformed.data.len(), 4);
                assert_eq!(transformed.data[3], 99);
            }
            _ => panic!("Expected data message"),
        }
    }

    /// A mapper that duplicates its input yields two output messages.
    #[tokio::test]
    async fn test_flat_map_operator() {
        let mut flat_mapper = FlatMapOperator::new(
            |elem: StreamElement| {
                vec![
                    StreamElement::new(elem.data.clone(), elem.event_time),
                    StreamElement::new(elem.data.clone(), elem.event_time),
                ]
            },
            "test_flatmap".to_string(),
        );

        let input = StreamMessage::Data(StreamElement::new(vec![1, 2, 3], Utc::now()));
        let output = flat_mapper
            .process(input)
            .await
            .expect("flat_map should process element");

        assert_eq!(output.len(), 2);
    }

    /// Consuming one data element raises the sink's count to one.
    #[tokio::test]
    async fn test_logging_sink() {
        let mut sink = LoggingSink::new("test_sink".to_string());

        let batch = vec![StreamMessage::Data(StreamElement::new(
            vec![1, 2, 3],
            Utc::now(),
        ))];
        sink.consume(batch)
            .await
            .expect("sink should consume element");

        assert_eq!(sink.count(), 1);
    }
}
331}