Skip to main content

oxirs_stream/processing/
operators.rs

//! Advanced Stream Operators
//!
//! This module provides functional stream processing operators:
//! - map: Transform events
//! - filter: Select events based on a predicate
//! - flat_map: Transform and flatten events
//! - partition: Split streams based on criteria
//! - reduce: Aggregate events
//! - scan: Stateful transformation
//! - distinct: Remove duplicates
//! - throttle: Rate limiting
//! - debounce: Event coalescing
13
14use crate::StreamEvent;
15use anyhow::Result;
16use chrono::{DateTime, Duration as ChronoDuration, Utc};
17use std::collections::{HashSet, VecDeque};
18use std::sync::Arc;
19
20/// Stream operator trait for composable transformations
21#[async_trait::async_trait]
22pub trait StreamOperator: Send + Sync {
23    /// Apply operator to an event
24    async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>>;
25
26    /// Get operator statistics
27    fn stats(&self) -> OperatorStats;
28
29    /// Reset operator state
30    fn reset(&mut self);
31}
32
/// Activity counters that each stream operator maintains about itself.
#[derive(Debug, Clone, Default)]
pub struct OperatorStats {
    /// Number of events handed to the operator.
    pub events_processed: u64,
    /// Number of events the operator emitted downstream.
    pub events_emitted: u64,
    /// Number of input events dropped without producing output.
    pub events_filtered: u64,
    /// Accumulated time spent processing events, in milliseconds.
    pub processing_time_ms: f64,
    /// Number of processing attempts that ended in an error.
    pub errors: u64,
}
42
43/// Map operator - transforms each event
44pub struct MapOperator<F>
45where
46    F: Fn(StreamEvent) -> Result<StreamEvent> + Send + Sync,
47{
48    transform: Arc<F>,
49    stats: OperatorStats,
50}
51
52impl<F> MapOperator<F>
53where
54    F: Fn(StreamEvent) -> Result<StreamEvent> + Send + Sync,
55{
56    pub fn new(transform: F) -> Self {
57        Self {
58            transform: Arc::new(transform),
59            stats: OperatorStats::default(),
60        }
61    }
62}
63
64#[async_trait::async_trait]
65impl<F> StreamOperator for MapOperator<F>
66where
67    F: Fn(StreamEvent) -> Result<StreamEvent> + Send + Sync,
68{
69    async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
70        let start = std::time::Instant::now();
71
72        self.stats.events_processed += 1;
73
74        match (self.transform)(event) {
75            Ok(transformed) => {
76                self.stats.events_emitted += 1;
77                self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
78                Ok(vec![transformed])
79            }
80            Err(e) => {
81                self.stats.errors += 1;
82                Err(e)
83            }
84        }
85    }
86
87    fn stats(&self) -> OperatorStats {
88        self.stats.clone()
89    }
90
91    fn reset(&mut self) {
92        self.stats = OperatorStats::default();
93    }
94}
95
96/// Filter operator - selects events based on predicate
97pub struct FilterOperator<F>
98where
99    F: Fn(&StreamEvent) -> bool + Send + Sync,
100{
101    predicate: Arc<F>,
102    stats: OperatorStats,
103}
104
105impl<F> FilterOperator<F>
106where
107    F: Fn(&StreamEvent) -> bool + Send + Sync,
108{
109    pub fn new(predicate: F) -> Self {
110        Self {
111            predicate: Arc::new(predicate),
112            stats: OperatorStats::default(),
113        }
114    }
115}
116
117#[async_trait::async_trait]
118impl<F> StreamOperator for FilterOperator<F>
119where
120    F: Fn(&StreamEvent) -> bool + Send + Sync,
121{
122    async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
123        let start = std::time::Instant::now();
124
125        self.stats.events_processed += 1;
126
127        if (self.predicate)(&event) {
128            self.stats.events_emitted += 1;
129            self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
130            Ok(vec![event])
131        } else {
132            self.stats.events_filtered += 1;
133            self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
134            Ok(vec![])
135        }
136    }
137
138    fn stats(&self) -> OperatorStats {
139        self.stats.clone()
140    }
141
142    fn reset(&mut self) {
143        self.stats = OperatorStats::default();
144    }
145}
146
147/// FlatMap operator - transforms and flattens events
148pub struct FlatMapOperator<F>
149where
150    F: Fn(StreamEvent) -> Result<Vec<StreamEvent>> + Send + Sync,
151{
152    transform: Arc<F>,
153    stats: OperatorStats,
154}
155
156impl<F> FlatMapOperator<F>
157where
158    F: Fn(StreamEvent) -> Result<Vec<StreamEvent>> + Send + Sync,
159{
160    pub fn new(transform: F) -> Self {
161        Self {
162            transform: Arc::new(transform),
163            stats: OperatorStats::default(),
164        }
165    }
166}
167
168#[async_trait::async_trait]
169impl<F> StreamOperator for FlatMapOperator<F>
170where
171    F: Fn(StreamEvent) -> Result<Vec<StreamEvent>> + Send + Sync,
172{
173    async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
174        let start = std::time::Instant::now();
175
176        self.stats.events_processed += 1;
177
178        match (self.transform)(event) {
179            Ok(events) => {
180                self.stats.events_emitted += events.len() as u64;
181                self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
182                Ok(events)
183            }
184            Err(e) => {
185                self.stats.errors += 1;
186                Err(e)
187            }
188        }
189    }
190
191    fn stats(&self) -> OperatorStats {
192        self.stats.clone()
193    }
194
195    fn reset(&mut self) {
196        self.stats = OperatorStats::default();
197    }
198}
199
200/// Partition operator - splits stream into multiple partitions
201pub struct PartitionOperator<F>
202where
203    F: Fn(&StreamEvent) -> usize + Send + Sync,
204{
205    partition_fn: Arc<F>,
206    num_partitions: usize,
207    partition_buffers: Vec<VecDeque<StreamEvent>>,
208    stats: OperatorStats,
209}
210
211impl<F> PartitionOperator<F>
212where
213    F: Fn(&StreamEvent) -> usize + Send + Sync,
214{
215    pub fn new(partition_fn: F, num_partitions: usize) -> Self {
216        Self {
217            partition_fn: Arc::new(partition_fn),
218            num_partitions,
219            partition_buffers: vec![VecDeque::new(); num_partitions],
220            stats: OperatorStats::default(),
221        }
222    }
223
224    pub fn get_partition(&mut self, partition_id: usize) -> Option<Vec<StreamEvent>> {
225        if partition_id >= self.num_partitions {
226            return None;
227        }
228
229        let events: Vec<_> = self.partition_buffers[partition_id].drain(..).collect();
230        Some(events)
231    }
232}
233
234#[async_trait::async_trait]
235impl<F> StreamOperator for PartitionOperator<F>
236where
237    F: Fn(&StreamEvent) -> usize + Send + Sync,
238{
239    async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
240        let start = std::time::Instant::now();
241
242        self.stats.events_processed += 1;
243
244        let partition_id = (self.partition_fn)(&event) % self.num_partitions;
245        self.partition_buffers[partition_id].push_back(event.clone());
246
247        self.stats.events_emitted += 1;
248        self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
249
250        Ok(vec![event])
251    }
252
253    fn stats(&self) -> OperatorStats {
254        self.stats.clone()
255    }
256
257    fn reset(&mut self) {
258        self.stats = OperatorStats::default();
259        for buffer in &mut self.partition_buffers {
260            buffer.clear();
261        }
262    }
263}
264
265/// Distinct operator - removes duplicate events
266pub struct DistinctOperator {
267    seen: HashSet<String>,
268    key_extractor: Arc<dyn Fn(&StreamEvent) -> String + Send + Sync>,
269    stats: OperatorStats,
270}
271
272impl DistinctOperator {
273    pub fn new<F>(key_extractor: F) -> Self
274    where
275        F: Fn(&StreamEvent) -> String + Send + Sync + 'static,
276    {
277        Self {
278            seen: HashSet::new(),
279            key_extractor: Arc::new(key_extractor),
280            stats: OperatorStats::default(),
281        }
282    }
283}
284
285#[async_trait::async_trait]
286impl StreamOperator for DistinctOperator {
287    async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
288        let start = std::time::Instant::now();
289
290        self.stats.events_processed += 1;
291
292        let key = (self.key_extractor)(&event);
293
294        if self.seen.insert(key) {
295            self.stats.events_emitted += 1;
296            self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
297            Ok(vec![event])
298        } else {
299            self.stats.events_filtered += 1;
300            self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
301            Ok(vec![])
302        }
303    }
304
305    fn stats(&self) -> OperatorStats {
306        self.stats.clone()
307    }
308
309    fn reset(&mut self) {
310        self.stats = OperatorStats::default();
311        self.seen.clear();
312    }
313}
314
315/// Throttle operator - rate limiting
316pub struct ThrottleOperator {
317    interval: ChronoDuration,
318    last_emit: Option<DateTime<Utc>>,
319    stats: OperatorStats,
320}
321
322impl ThrottleOperator {
323    pub fn new(interval: ChronoDuration) -> Self {
324        Self {
325            interval,
326            last_emit: None,
327            stats: OperatorStats::default(),
328        }
329    }
330}
331
332#[async_trait::async_trait]
333impl StreamOperator for ThrottleOperator {
334    async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
335        let start = std::time::Instant::now();
336
337        self.stats.events_processed += 1;
338
339        let now = Utc::now();
340
341        let should_emit = match self.last_emit {
342            None => true,
343            Some(last) => now - last >= self.interval,
344        };
345
346        if should_emit {
347            self.last_emit = Some(now);
348            self.stats.events_emitted += 1;
349            self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
350            Ok(vec![event])
351        } else {
352            self.stats.events_filtered += 1;
353            self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
354            Ok(vec![])
355        }
356    }
357
358    fn stats(&self) -> OperatorStats {
359        self.stats.clone()
360    }
361
362    fn reset(&mut self) {
363        self.stats = OperatorStats::default();
364        self.last_emit = None;
365    }
366}
367
368/// Debounce operator - event coalescing
369pub struct DebounceOperator {
370    delay: ChronoDuration,
371    pending: Option<(StreamEvent, DateTime<Utc>)>,
372    stats: OperatorStats,
373}
374
375impl DebounceOperator {
376    pub fn new(delay: ChronoDuration) -> Self {
377        Self {
378            delay,
379            pending: None,
380            stats: OperatorStats::default(),
381        }
382    }
383
384    pub async fn flush(&mut self) -> Result<Vec<StreamEvent>> {
385        if let Some((event, _)) = self.pending.take() {
386            self.stats.events_emitted += 1;
387            Ok(vec![event])
388        } else {
389            Ok(vec![])
390        }
391    }
392}
393
394#[async_trait::async_trait]
395impl StreamOperator for DebounceOperator {
396    async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
397        let start = std::time::Instant::now();
398
399        self.stats.events_processed += 1;
400
401        let now = Utc::now();
402
403        // Check if we should emit the pending event
404        let mut to_emit = vec![];
405        if let Some((pending_event, pending_time)) = &self.pending {
406            if now - *pending_time >= self.delay {
407                to_emit.push(pending_event.clone());
408                self.stats.events_emitted += 1;
409            }
410        }
411
412        // Update pending event
413        self.pending = Some((event, now));
414
415        self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
416
417        Ok(to_emit)
418    }
419
420    fn stats(&self) -> OperatorStats {
421        self.stats.clone()
422    }
423
424    fn reset(&mut self) {
425        self.stats = OperatorStats::default();
426        self.pending = None;
427    }
428}
429
430/// Reduce operator - stateful aggregation
431pub struct ReduceOperator<F, S>
432where
433    F: Fn(&mut S, StreamEvent) -> Result<()> + Send + Sync,
434    S: Clone + Send + Sync,
435{
436    reducer: Arc<F>,
437    state: S,
438    stats: OperatorStats,
439}
440
441impl<F, S> ReduceOperator<F, S>
442where
443    F: Fn(&mut S, StreamEvent) -> Result<()> + Send + Sync,
444    S: Clone + Send + Sync,
445{
446    pub fn new(initial_state: S, reducer: F) -> Self {
447        Self {
448            reducer: Arc::new(reducer),
449            state: initial_state,
450            stats: OperatorStats::default(),
451        }
452    }
453
454    pub fn get_state(&self) -> &S {
455        &self.state
456    }
457}
458
459#[async_trait::async_trait]
460impl<F, S> StreamOperator for ReduceOperator<F, S>
461where
462    F: Fn(&mut S, StreamEvent) -> Result<()> + Send + Sync,
463    S: Clone + Send + Sync,
464{
465    async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
466        let start = std::time::Instant::now();
467
468        self.stats.events_processed += 1;
469
470        match (self.reducer)(&mut self.state, event.clone()) {
471            Ok(_) => {
472                self.stats.events_emitted += 1;
473                self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
474                Ok(vec![event])
475            }
476            Err(e) => {
477                self.stats.errors += 1;
478                Err(e)
479            }
480        }
481    }
482
483    fn stats(&self) -> OperatorStats {
484        self.stats.clone()
485    }
486
487    fn reset(&mut self) {
488        self.stats = OperatorStats::default();
489    }
490}
491
492/// Stream operator pipeline for chaining operations
493pub struct OperatorPipeline {
494    operators: Vec<Box<dyn StreamOperator>>,
495    stats: PipelineStats,
496}
497
498#[derive(Debug, Clone, Default)]
499pub struct PipelineStats {
500    pub total_events_in: u64,
501    pub total_events_out: u64,
502    pub total_processing_time_ms: f64,
503    pub operator_stats: Vec<OperatorStats>,
504}
505
506impl OperatorPipeline {
507    pub fn new() -> Self {
508        Self {
509            operators: Vec::new(),
510            stats: PipelineStats::default(),
511        }
512    }
513
514    pub fn add_operator(&mut self, operator: Box<dyn StreamOperator>) {
515        self.operators.push(operator);
516    }
517
518    pub async fn process(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
519        let start = std::time::Instant::now();
520
521        self.stats.total_events_in += 1;
522
523        let mut current_events = vec![event];
524
525        for operator in &mut self.operators {
526            let mut next_events = Vec::new();
527            for evt in current_events {
528                match operator.apply(evt).await {
529                    Ok(mut events) => next_events.append(&mut events),
530                    Err(e) => return Err(e),
531                }
532            }
533            current_events = next_events;
534        }
535
536        self.stats.total_events_out += current_events.len() as u64;
537        self.stats.total_processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
538
539        Ok(current_events)
540    }
541
542    pub fn stats(&self) -> PipelineStats {
543        let mut stats = self.stats.clone();
544        stats.operator_stats = self.operators.iter().map(|op| op.stats()).collect();
545        stats
546    }
547
548    pub fn reset(&mut self) {
549        self.stats = PipelineStats::default();
550        for operator in &mut self.operators {
551            operator.reset();
552        }
553    }
554}
555
556impl Default for OperatorPipeline {
557    fn default() -> Self {
558        Self::new()
559    }
560}
561
562/// Builder for creating operator pipelines fluently
563pub struct PipelineBuilder {
564    pipeline: OperatorPipeline,
565}
566
567impl PipelineBuilder {
568    pub fn new() -> Self {
569        Self {
570            pipeline: OperatorPipeline::new(),
571        }
572    }
573
574    pub fn map<F>(mut self, transform: F) -> Self
575    where
576        F: Fn(StreamEvent) -> Result<StreamEvent> + Send + Sync + 'static,
577    {
578        self.pipeline
579            .add_operator(Box::new(MapOperator::new(transform)));
580        self
581    }
582
583    pub fn filter<F>(mut self, predicate: F) -> Self
584    where
585        F: Fn(&StreamEvent) -> bool + Send + Sync + 'static,
586    {
587        self.pipeline
588            .add_operator(Box::new(FilterOperator::new(predicate)));
589        self
590    }
591
592    pub fn flat_map<F>(mut self, transform: F) -> Self
593    where
594        F: Fn(StreamEvent) -> Result<Vec<StreamEvent>> + Send + Sync + 'static,
595    {
596        self.pipeline
597            .add_operator(Box::new(FlatMapOperator::new(transform)));
598        self
599    }
600
601    pub fn distinct<F>(mut self, key_extractor: F) -> Self
602    where
603        F: Fn(&StreamEvent) -> String + Send + Sync + 'static,
604    {
605        self.pipeline
606            .add_operator(Box::new(DistinctOperator::new(key_extractor)));
607        self
608    }
609
610    pub fn throttle(mut self, interval: ChronoDuration) -> Self {
611        self.pipeline
612            .add_operator(Box::new(ThrottleOperator::new(interval)));
613        self
614    }
615
616    pub fn debounce(mut self, delay: ChronoDuration) -> Self {
617        self.pipeline
618            .add_operator(Box::new(DebounceOperator::new(delay)));
619        self
620    }
621
622    pub fn build(self) -> OperatorPipeline {
623        self.pipeline
624    }
625}
626
627impl Default for PipelineBuilder {
628    fn default() -> Self {
629        Self::new()
630    }
631}
632
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    fn create_test_event(subject: &str) -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: subject.to_string(),
            predicate: "test".to_string(),
            object: "value".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        }
    }

    /// Returns the `object` of a `TripleAdded` event and fails the test for any
    /// other variant. A plain `if let` would let the assertion be skipped silently.
    fn object_of(event: &StreamEvent) -> &str {
        match event {
            StreamEvent::TripleAdded { object, .. } => object.as_str(),
            _ => panic!("expected a TripleAdded event"),
        }
    }

    fn subject_of(event: &StreamEvent) -> String {
        match event {
            StreamEvent::TripleAdded { subject, .. } => subject.clone(),
            _ => String::new(),
        }
    }

    #[tokio::test]
    async fn test_map_operator() {
        let mut operator = MapOperator::new(|mut event| {
            if let StreamEvent::TripleAdded { ref mut object, .. } = event {
                *object = "transformed".to_string();
            }
            Ok(event)
        });

        let event = create_test_event("test");
        let results = operator.apply(event).await.unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(object_of(&results[0]), "transformed");
        assert_eq!(operator.stats().events_emitted, 1);
    }

    #[tokio::test]
    async fn test_filter_operator() {
        let mut operator = FilterOperator::new(|event| {
            if let StreamEvent::TripleAdded { subject, .. } = event {
                subject == "keep"
            } else {
                false
            }
        });

        let event1 = create_test_event("keep");
        let event2 = create_test_event("drop");

        assert_eq!(operator.apply(event1).await.unwrap().len(), 1);
        assert_eq!(operator.apply(event2).await.unwrap().len(), 0);

        let stats = operator.stats();
        assert_eq!(stats.events_processed, 2);
        assert_eq!(stats.events_emitted, 1);
        assert_eq!(stats.events_filtered, 1);
    }

    #[tokio::test]
    async fn test_distinct_operator() {
        let mut operator = DistinctOperator::new(|event: &StreamEvent| subject_of(event));

        assert_eq!(operator.apply(create_test_event("a")).await.unwrap().len(), 1);
        assert_eq!(operator.apply(create_test_event("a")).await.unwrap().len(), 0);
        assert_eq!(operator.apply(create_test_event("b")).await.unwrap().len(), 1);
        assert_eq!(operator.stats().events_filtered, 1);

        // After a reset previously seen keys are accepted again.
        operator.reset();
        assert_eq!(operator.apply(create_test_event("a")).await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_partition_operator_buffers_and_drains() {
        let mut operator =
            PartitionOperator::new(|event: &StreamEvent| subject_of(event).len(), 2);

        // Events pass through unchanged while a copy is buffered.
        assert_eq!(operator.apply(create_test_event("a")).await.unwrap().len(), 1);
        assert_eq!(operator.apply(create_test_event("bb")).await.unwrap().len(), 1);

        assert_eq!(operator.get_partition(1).unwrap().len(), 1);
        assert_eq!(operator.get_partition(1).unwrap().len(), 0);
        assert_eq!(operator.get_partition(0).unwrap().len(), 1);
        assert!(operator.get_partition(2).is_none());
    }

    #[tokio::test]
    async fn test_pipeline() {
        let mut pipeline = PipelineBuilder::new()
            .filter(|event| {
                if let StreamEvent::TripleAdded { subject, .. } = event {
                    subject.starts_with("test")
                } else {
                    false
                }
            })
            .map(|mut event| {
                if let StreamEvent::TripleAdded { ref mut object, .. } = event {
                    *object = format!("{}_transformed", object);
                }
                Ok(event)
            })
            .build();

        let event = create_test_event("test_subject");
        let results = pipeline.process(event).await.unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(object_of(&results[0]), "value_transformed");

        let stats = pipeline.stats();
        assert_eq!(stats.total_events_in, 1);
        assert_eq!(stats.total_events_out, 1);
        assert_eq!(stats.operator_stats.len(), 2);
    }
}