1use crate::StreamEvent;
15use anyhow::Result;
16use chrono::{DateTime, Duration as ChronoDuration, Utc};
17use std::collections::{HashSet, VecDeque};
18use std::sync::Arc;
19
20#[async_trait::async_trait]
22pub trait StreamOperator: Send + Sync {
23 async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>>;
25
26 fn stats(&self) -> OperatorStats;
28
29 fn reset(&mut self);
31}
32
/// Counters maintained by a [`StreamOperator`] while it processes events.
///
/// `PartialEq` is derived so snapshots can be compared directly, for
/// example in tests.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct OperatorStats {
    /// Number of events handed to the operator.
    pub events_processed: u64,
    /// Number of events the operator sent downstream.
    pub events_emitted: u64,
    /// Number of events the operator reported as dropped.
    pub events_filtered: u64,
    /// Accumulated processing time in milliseconds, as measured by the operator.
    pub processing_time_ms: f64,
    /// Number of processing attempts that ended in an error.
    pub errors: u64,
}
42
43pub struct MapOperator<F>
45where
46 F: Fn(StreamEvent) -> Result<StreamEvent> + Send + Sync,
47{
48 transform: Arc<F>,
49 stats: OperatorStats,
50}
51
52impl<F> MapOperator<F>
53where
54 F: Fn(StreamEvent) -> Result<StreamEvent> + Send + Sync,
55{
56 pub fn new(transform: F) -> Self {
57 Self {
58 transform: Arc::new(transform),
59 stats: OperatorStats::default(),
60 }
61 }
62}
63
64#[async_trait::async_trait]
65impl<F> StreamOperator for MapOperator<F>
66where
67 F: Fn(StreamEvent) -> Result<StreamEvent> + Send + Sync,
68{
69 async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
70 let start = std::time::Instant::now();
71
72 self.stats.events_processed += 1;
73
74 match (self.transform)(event) {
75 Ok(transformed) => {
76 self.stats.events_emitted += 1;
77 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
78 Ok(vec![transformed])
79 }
80 Err(e) => {
81 self.stats.errors += 1;
82 Err(e)
83 }
84 }
85 }
86
87 fn stats(&self) -> OperatorStats {
88 self.stats.clone()
89 }
90
91 fn reset(&mut self) {
92 self.stats = OperatorStats::default();
93 }
94}
95
96pub struct FilterOperator<F>
98where
99 F: Fn(&StreamEvent) -> bool + Send + Sync,
100{
101 predicate: Arc<F>,
102 stats: OperatorStats,
103}
104
105impl<F> FilterOperator<F>
106where
107 F: Fn(&StreamEvent) -> bool + Send + Sync,
108{
109 pub fn new(predicate: F) -> Self {
110 Self {
111 predicate: Arc::new(predicate),
112 stats: OperatorStats::default(),
113 }
114 }
115}
116
117#[async_trait::async_trait]
118impl<F> StreamOperator for FilterOperator<F>
119where
120 F: Fn(&StreamEvent) -> bool + Send + Sync,
121{
122 async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
123 let start = std::time::Instant::now();
124
125 self.stats.events_processed += 1;
126
127 if (self.predicate)(&event) {
128 self.stats.events_emitted += 1;
129 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
130 Ok(vec![event])
131 } else {
132 self.stats.events_filtered += 1;
133 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
134 Ok(vec![])
135 }
136 }
137
138 fn stats(&self) -> OperatorStats {
139 self.stats.clone()
140 }
141
142 fn reset(&mut self) {
143 self.stats = OperatorStats::default();
144 }
145}
146
147pub struct FlatMapOperator<F>
149where
150 F: Fn(StreamEvent) -> Result<Vec<StreamEvent>> + Send + Sync,
151{
152 transform: Arc<F>,
153 stats: OperatorStats,
154}
155
156impl<F> FlatMapOperator<F>
157where
158 F: Fn(StreamEvent) -> Result<Vec<StreamEvent>> + Send + Sync,
159{
160 pub fn new(transform: F) -> Self {
161 Self {
162 transform: Arc::new(transform),
163 stats: OperatorStats::default(),
164 }
165 }
166}
167
168#[async_trait::async_trait]
169impl<F> StreamOperator for FlatMapOperator<F>
170where
171 F: Fn(StreamEvent) -> Result<Vec<StreamEvent>> + Send + Sync,
172{
173 async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
174 let start = std::time::Instant::now();
175
176 self.stats.events_processed += 1;
177
178 match (self.transform)(event) {
179 Ok(events) => {
180 self.stats.events_emitted += events.len() as u64;
181 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
182 Ok(events)
183 }
184 Err(e) => {
185 self.stats.errors += 1;
186 Err(e)
187 }
188 }
189 }
190
191 fn stats(&self) -> OperatorStats {
192 self.stats.clone()
193 }
194
195 fn reset(&mut self) {
196 self.stats = OperatorStats::default();
197 }
198}
199
200pub struct PartitionOperator<F>
202where
203 F: Fn(&StreamEvent) -> usize + Send + Sync,
204{
205 partition_fn: Arc<F>,
206 num_partitions: usize,
207 partition_buffers: Vec<VecDeque<StreamEvent>>,
208 stats: OperatorStats,
209}
210
211impl<F> PartitionOperator<F>
212where
213 F: Fn(&StreamEvent) -> usize + Send + Sync,
214{
215 pub fn new(partition_fn: F, num_partitions: usize) -> Self {
216 Self {
217 partition_fn: Arc::new(partition_fn),
218 num_partitions,
219 partition_buffers: vec![VecDeque::new(); num_partitions],
220 stats: OperatorStats::default(),
221 }
222 }
223
224 pub fn get_partition(&mut self, partition_id: usize) -> Option<Vec<StreamEvent>> {
225 if partition_id >= self.num_partitions {
226 return None;
227 }
228
229 let events: Vec<_> = self.partition_buffers[partition_id].drain(..).collect();
230 Some(events)
231 }
232}
233
234#[async_trait::async_trait]
235impl<F> StreamOperator for PartitionOperator<F>
236where
237 F: Fn(&StreamEvent) -> usize + Send + Sync,
238{
239 async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
240 let start = std::time::Instant::now();
241
242 self.stats.events_processed += 1;
243
244 let partition_id = (self.partition_fn)(&event) % self.num_partitions;
245 self.partition_buffers[partition_id].push_back(event.clone());
246
247 self.stats.events_emitted += 1;
248 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
249
250 Ok(vec![event])
251 }
252
253 fn stats(&self) -> OperatorStats {
254 self.stats.clone()
255 }
256
257 fn reset(&mut self) {
258 self.stats = OperatorStats::default();
259 for buffer in &mut self.partition_buffers {
260 buffer.clear();
261 }
262 }
263}
264
265pub struct DistinctOperator {
267 seen: HashSet<String>,
268 key_extractor: Arc<dyn Fn(&StreamEvent) -> String + Send + Sync>,
269 stats: OperatorStats,
270}
271
272impl DistinctOperator {
273 pub fn new<F>(key_extractor: F) -> Self
274 where
275 F: Fn(&StreamEvent) -> String + Send + Sync + 'static,
276 {
277 Self {
278 seen: HashSet::new(),
279 key_extractor: Arc::new(key_extractor),
280 stats: OperatorStats::default(),
281 }
282 }
283}
284
285#[async_trait::async_trait]
286impl StreamOperator for DistinctOperator {
287 async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
288 let start = std::time::Instant::now();
289
290 self.stats.events_processed += 1;
291
292 let key = (self.key_extractor)(&event);
293
294 if self.seen.insert(key) {
295 self.stats.events_emitted += 1;
296 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
297 Ok(vec![event])
298 } else {
299 self.stats.events_filtered += 1;
300 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
301 Ok(vec![])
302 }
303 }
304
305 fn stats(&self) -> OperatorStats {
306 self.stats.clone()
307 }
308
309 fn reset(&mut self) {
310 self.stats = OperatorStats::default();
311 self.seen.clear();
312 }
313}
314
315pub struct ThrottleOperator {
317 interval: ChronoDuration,
318 last_emit: Option<DateTime<Utc>>,
319 stats: OperatorStats,
320}
321
322impl ThrottleOperator {
323 pub fn new(interval: ChronoDuration) -> Self {
324 Self {
325 interval,
326 last_emit: None,
327 stats: OperatorStats::default(),
328 }
329 }
330}
331
332#[async_trait::async_trait]
333impl StreamOperator for ThrottleOperator {
334 async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
335 let start = std::time::Instant::now();
336
337 self.stats.events_processed += 1;
338
339 let now = Utc::now();
340
341 let should_emit = match self.last_emit {
342 None => true,
343 Some(last) => now - last >= self.interval,
344 };
345
346 if should_emit {
347 self.last_emit = Some(now);
348 self.stats.events_emitted += 1;
349 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
350 Ok(vec![event])
351 } else {
352 self.stats.events_filtered += 1;
353 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
354 Ok(vec![])
355 }
356 }
357
358 fn stats(&self) -> OperatorStats {
359 self.stats.clone()
360 }
361
362 fn reset(&mut self) {
363 self.stats = OperatorStats::default();
364 self.last_emit = None;
365 }
366}
367
368pub struct DebounceOperator {
370 delay: ChronoDuration,
371 pending: Option<(StreamEvent, DateTime<Utc>)>,
372 stats: OperatorStats,
373}
374
375impl DebounceOperator {
376 pub fn new(delay: ChronoDuration) -> Self {
377 Self {
378 delay,
379 pending: None,
380 stats: OperatorStats::default(),
381 }
382 }
383
384 pub async fn flush(&mut self) -> Result<Vec<StreamEvent>> {
385 if let Some((event, _)) = self.pending.take() {
386 self.stats.events_emitted += 1;
387 Ok(vec![event])
388 } else {
389 Ok(vec![])
390 }
391 }
392}
393
394#[async_trait::async_trait]
395impl StreamOperator for DebounceOperator {
396 async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
397 let start = std::time::Instant::now();
398
399 self.stats.events_processed += 1;
400
401 let now = Utc::now();
402
403 let mut to_emit = vec![];
405 if let Some((pending_event, pending_time)) = &self.pending {
406 if now - *pending_time >= self.delay {
407 to_emit.push(pending_event.clone());
408 self.stats.events_emitted += 1;
409 }
410 }
411
412 self.pending = Some((event, now));
414
415 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
416
417 Ok(to_emit)
418 }
419
420 fn stats(&self) -> OperatorStats {
421 self.stats.clone()
422 }
423
424 fn reset(&mut self) {
425 self.stats = OperatorStats::default();
426 self.pending = None;
427 }
428}
429
430pub struct ReduceOperator<F, S>
432where
433 F: Fn(&mut S, StreamEvent) -> Result<()> + Send + Sync,
434 S: Clone + Send + Sync,
435{
436 reducer: Arc<F>,
437 state: S,
438 stats: OperatorStats,
439}
440
441impl<F, S> ReduceOperator<F, S>
442where
443 F: Fn(&mut S, StreamEvent) -> Result<()> + Send + Sync,
444 S: Clone + Send + Sync,
445{
446 pub fn new(initial_state: S, reducer: F) -> Self {
447 Self {
448 reducer: Arc::new(reducer),
449 state: initial_state,
450 stats: OperatorStats::default(),
451 }
452 }
453
454 pub fn get_state(&self) -> &S {
455 &self.state
456 }
457}
458
459#[async_trait::async_trait]
460impl<F, S> StreamOperator for ReduceOperator<F, S>
461where
462 F: Fn(&mut S, StreamEvent) -> Result<()> + Send + Sync,
463 S: Clone + Send + Sync,
464{
465 async fn apply(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
466 let start = std::time::Instant::now();
467
468 self.stats.events_processed += 1;
469
470 match (self.reducer)(&mut self.state, event.clone()) {
471 Ok(_) => {
472 self.stats.events_emitted += 1;
473 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
474 Ok(vec![event])
475 }
476 Err(e) => {
477 self.stats.errors += 1;
478 Err(e)
479 }
480 }
481 }
482
483 fn stats(&self) -> OperatorStats {
484 self.stats.clone()
485 }
486
487 fn reset(&mut self) {
488 self.stats = OperatorStats::default();
489 }
490}
491
492pub struct OperatorPipeline {
494 operators: Vec<Box<dyn StreamOperator>>,
495 stats: PipelineStats,
496}
497
498#[derive(Debug, Clone, Default)]
499pub struct PipelineStats {
500 pub total_events_in: u64,
501 pub total_events_out: u64,
502 pub total_processing_time_ms: f64,
503 pub operator_stats: Vec<OperatorStats>,
504}
505
506impl OperatorPipeline {
507 pub fn new() -> Self {
508 Self {
509 operators: Vec::new(),
510 stats: PipelineStats::default(),
511 }
512 }
513
514 pub fn add_operator(&mut self, operator: Box<dyn StreamOperator>) {
515 self.operators.push(operator);
516 }
517
518 pub async fn process(&mut self, event: StreamEvent) -> Result<Vec<StreamEvent>> {
519 let start = std::time::Instant::now();
520
521 self.stats.total_events_in += 1;
522
523 let mut current_events = vec![event];
524
525 for operator in &mut self.operators {
526 let mut next_events = Vec::new();
527 for evt in current_events {
528 match operator.apply(evt).await {
529 Ok(mut events) => next_events.append(&mut events),
530 Err(e) => return Err(e),
531 }
532 }
533 current_events = next_events;
534 }
535
536 self.stats.total_events_out += current_events.len() as u64;
537 self.stats.total_processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
538
539 Ok(current_events)
540 }
541
542 pub fn stats(&self) -> PipelineStats {
543 let mut stats = self.stats.clone();
544 stats.operator_stats = self.operators.iter().map(|op| op.stats()).collect();
545 stats
546 }
547
548 pub fn reset(&mut self) {
549 self.stats = PipelineStats::default();
550 for operator in &mut self.operators {
551 operator.reset();
552 }
553 }
554}
555
556impl Default for OperatorPipeline {
557 fn default() -> Self {
558 Self::new()
559 }
560}
561
562pub struct PipelineBuilder {
564 pipeline: OperatorPipeline,
565}
566
567impl PipelineBuilder {
568 pub fn new() -> Self {
569 Self {
570 pipeline: OperatorPipeline::new(),
571 }
572 }
573
574 pub fn map<F>(mut self, transform: F) -> Self
575 where
576 F: Fn(StreamEvent) -> Result<StreamEvent> + Send + Sync + 'static,
577 {
578 self.pipeline
579 .add_operator(Box::new(MapOperator::new(transform)));
580 self
581 }
582
583 pub fn filter<F>(mut self, predicate: F) -> Self
584 where
585 F: Fn(&StreamEvent) -> bool + Send + Sync + 'static,
586 {
587 self.pipeline
588 .add_operator(Box::new(FilterOperator::new(predicate)));
589 self
590 }
591
592 pub fn flat_map<F>(mut self, transform: F) -> Self
593 where
594 F: Fn(StreamEvent) -> Result<Vec<StreamEvent>> + Send + Sync + 'static,
595 {
596 self.pipeline
597 .add_operator(Box::new(FlatMapOperator::new(transform)));
598 self
599 }
600
601 pub fn distinct<F>(mut self, key_extractor: F) -> Self
602 where
603 F: Fn(&StreamEvent) -> String + Send + Sync + 'static,
604 {
605 self.pipeline
606 .add_operator(Box::new(DistinctOperator::new(key_extractor)));
607 self
608 }
609
610 pub fn throttle(mut self, interval: ChronoDuration) -> Self {
611 self.pipeline
612 .add_operator(Box::new(ThrottleOperator::new(interval)));
613 self
614 }
615
616 pub fn debounce(mut self, delay: ChronoDuration) -> Self {
617 self.pipeline
618 .add_operator(Box::new(DebounceOperator::new(delay)));
619 self
620 }
621
622 pub fn build(self) -> OperatorPipeline {
623 self.pipeline
624 }
625}
626
627impl Default for PipelineBuilder {
628 fn default() -> Self {
629 Self::new()
630 }
631}
632
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    fn create_test_event(subject: &str) -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: subject.to_string(),
            predicate: "test".to_string(),
            object: "value".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        }
    }

    /// Returns the `object` of a `TripleAdded` event. Any other variant fails
    /// the test, so assertions cannot pass vacuously.
    fn object_of(event: &StreamEvent) -> &str {
        match event {
            StreamEvent::TripleAdded { object, .. } => object.as_str(),
            _ => panic!("expected a TripleAdded event"),
        }
    }

    /// Returns the `subject` of a `TripleAdded` event; panics on other variants.
    fn subject_of(event: &StreamEvent) -> &str {
        match event {
            StreamEvent::TripleAdded { subject, .. } => subject.as_str(),
            _ => panic!("expected a TripleAdded event"),
        }
    }

    #[tokio::test]
    async fn test_map_operator() {
        let mut operator = MapOperator::new(|mut event| {
            if let StreamEvent::TripleAdded { ref mut object, .. } = event {
                *object = "transformed".to_string();
            }
            Ok(event)
        });

        let results = operator.apply(create_test_event("test")).await.unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(object_of(&results[0]), "transformed");
        let stats = operator.stats();
        assert_eq!(stats.events_processed, 1);
        assert_eq!(stats.events_emitted, 1);
    }

    #[tokio::test]
    async fn test_filter_operator() {
        let mut operator = FilterOperator::new(|event| {
            if let StreamEvent::TripleAdded { subject, .. } = event {
                subject == "keep"
            } else {
                false
            }
        });

        assert_eq!(operator.apply(create_test_event("keep")).await.unwrap().len(), 1);
        assert_eq!(operator.apply(create_test_event("drop")).await.unwrap().len(), 0);
        assert_eq!(operator.stats().events_filtered, 1);
    }

    #[tokio::test]
    async fn test_flat_map_operator() {
        let mut operator =
            FlatMapOperator::new(|event: StreamEvent| Ok(vec![event.clone(), event]));

        let results = operator.apply(create_test_event("dup")).await.unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|event| subject_of(event) == "dup"));
        assert_eq!(operator.stats().events_emitted, 2);
    }

    #[tokio::test]
    async fn test_distinct_operator() {
        let mut operator = DistinctOperator::new(|event| subject_of(event).to_string());

        assert_eq!(operator.apply(create_test_event("a")).await.unwrap().len(), 1);
        assert_eq!(operator.apply(create_test_event("a")).await.unwrap().len(), 0);
        assert_eq!(operator.apply(create_test_event("b")).await.unwrap().len(), 1);

        // After a reset, previously seen keys are accepted again.
        operator.reset();
        assert_eq!(operator.apply(create_test_event("a")).await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_throttle_operator() {
        let mut operator = ThrottleOperator::new(ChronoDuration::hours(1));

        assert_eq!(operator.apply(create_test_event("first")).await.unwrap().len(), 1);
        assert_eq!(operator.apply(create_test_event("second")).await.unwrap().len(), 0);
        assert_eq!(operator.stats().events_filtered, 1);
    }

    #[tokio::test]
    async fn test_pipeline() {
        let mut pipeline = PipelineBuilder::new()
            .filter(|event| {
                if let StreamEvent::TripleAdded { subject, .. } = event {
                    subject.starts_with("test")
                } else {
                    false
                }
            })
            .map(|mut event| {
                if let StreamEvent::TripleAdded { ref mut object, .. } = event {
                    *object = format!("{}_transformed", object);
                }
                Ok(event)
            })
            .build();

        let results = pipeline.process(create_test_event("test_subject")).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(object_of(&results[0]), "value_transformed");

        let dropped = pipeline.process(create_test_event("other")).await.unwrap();
        assert!(dropped.is_empty());

        let stats = pipeline.stats();
        assert_eq!(stats.total_events_in, 2);
        assert_eq!(stats.total_events_out, 1);
        assert_eq!(stats.operator_stats.len(), 2);
    }
}
709}