1use crate::streaming::event::StreamEvent;
26use crate::streaming::window::{TimeWindow, WindowType};
27use crate::types::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Duration;
31
32#[derive(Clone)]
34pub struct DataStream {
35 events: Vec<StreamEvent>,
36}
37
38impl DataStream {
39 pub fn from_events(events: Vec<StreamEvent>) -> Self {
41 Self { events }
42 }
43
44 pub fn new() -> Self {
46 Self { events: Vec::new() }
47 }
48
49 pub fn push(&mut self, event: StreamEvent) {
51 self.events.push(event);
52 }
53
54 pub fn len(&self) -> usize {
56 self.events.len()
57 }
58
59 pub fn is_empty(&self) -> bool {
61 self.events.is_empty()
62 }
63
64 pub fn filter<F>(self, predicate: F) -> Self
71 where
72 F: Fn(&StreamEvent) -> bool,
73 {
74 let filtered_events = self.events.into_iter().filter(predicate).collect();
75 Self {
76 events: filtered_events,
77 }
78 }
79
80 pub fn map<F>(self, mapper: F) -> Self
90 where
91 F: Fn(StreamEvent) -> StreamEvent,
92 {
93 let mapped_events = self.events.into_iter().map(mapper).collect();
94 Self {
95 events: mapped_events,
96 }
97 }
98
99 pub fn flat_map<F>(self, mapper: F) -> Self
109 where
110 F: Fn(StreamEvent) -> Vec<StreamEvent>,
111 {
112 let flat_mapped_events = self
113 .events
114 .into_iter()
115 .flat_map(mapper)
116 .collect();
117 Self {
118 events: flat_mapped_events,
119 }
120 }
121
122 pub fn key_by<F, K>(self, key_selector: F) -> KeyedStream<K>
129 where
130 F: Fn(&StreamEvent) -> K,
131 K: std::hash::Hash + Eq + Clone,
132 {
133 let mut keyed_events: HashMap<K, Vec<StreamEvent>> = HashMap::new();
134
135 for event in self.events {
136 let key = key_selector(&event);
137 keyed_events.entry(key).or_insert_with(Vec::new).push(event);
138 }
139
140 KeyedStream { keyed_events }
141 }
142
143 pub fn window(self, config: WindowConfig) -> WindowedStream {
150 WindowedStream::new(self.events, config)
151 }
152
153 pub fn reduce<F>(self, reducer: F) -> Option<StreamEvent>
163 where
164 F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
165 {
166 self.events.into_iter().reduce(reducer)
167 }
168
169 pub fn count(self) -> usize {
171 self.events.len()
172 }
173
174 pub fn collect(self) -> Vec<StreamEvent> {
176 self.events
177 }
178
179 pub fn take(self, n: usize) -> Self {
181 Self {
182 events: self.events.into_iter().take(n).collect(),
183 }
184 }
185
186 pub fn skip(self, n: usize) -> Self {
188 Self {
189 events: self.events.into_iter().skip(n).collect(),
190 }
191 }
192
193 pub fn for_each<F>(self, action: F) -> Self
202 where
203 F: Fn(&StreamEvent),
204 {
205 for event in &self.events {
206 action(event);
207 }
208 self
209 }
210
211 pub fn union(mut self, other: DataStream) -> Self {
213 self.events.extend(other.events);
214 Self {
215 events: self.events,
216 }
217 }
218
219 pub fn find<F>(self, predicate: F) -> Option<StreamEvent>
221 where
222 F: Fn(&StreamEvent) -> bool,
223 {
224 self.events.into_iter().find(predicate)
225 }
226
227 pub fn any<F>(&self, predicate: F) -> bool
229 where
230 F: Fn(&StreamEvent) -> bool,
231 {
232 self.events.iter().any(predicate)
233 }
234
235 pub fn all<F>(&self, predicate: F) -> bool
237 where
238 F: Fn(&StreamEvent) -> bool,
239 {
240 self.events.iter().all(predicate)
241 }
242
243 pub fn sort_by<F, K>(mut self, key_fn: F) -> Self
245 where
246 F: Fn(&StreamEvent) -> K,
247 K: Ord,
248 {
249 self.events.sort_by_key(key_fn);
250 Self {
251 events: self.events,
252 }
253 }
254
255 pub fn group_by<F, K>(self, key_selector: F) -> GroupedStream<K>
257 where
258 F: Fn(&StreamEvent) -> K,
259 K: std::hash::Hash + Eq + Clone,
260 {
261 let mut grouped: HashMap<K, Vec<StreamEvent>> = HashMap::new();
262
263 for event in self.events {
264 let key = key_selector(&event);
265 grouped.entry(key).or_insert_with(Vec::new).push(event);
266 }
267
268 GroupedStream { groups: grouped }
269 }
270
271 pub fn aggregate<A>(self, aggregator: A) -> AggregateResult
273 where
274 A: Aggregation,
275 {
276 aggregator.aggregate(&self.events)
277 }
278}
279
280impl Default for DataStream {
281 fn default() -> Self {
282 Self::new()
283 }
284}
285
286pub struct KeyedStream<K>
288where
289 K: std::hash::Hash + Eq,
290{
291 keyed_events: HashMap<K, Vec<StreamEvent>>,
292}
293
294impl<K> KeyedStream<K>
295where
296 K: std::hash::Hash + Eq + Clone,
297{
298 pub fn reduce<F>(self, reducer: F) -> HashMap<K, StreamEvent>
300 where
301 F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
302 {
303 self.keyed_events
304 .into_iter()
305 .filter_map(|(key, events)| {
306 events
307 .into_iter()
308 .reduce(|acc, e| reducer(acc, e))
309 .map(|result| (key, result))
310 })
311 .collect()
312 }
313
314 pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
316 where
317 A: Aggregation + Clone,
318 {
319 self.keyed_events
320 .into_iter()
321 .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
322 .collect()
323 }
324
325 pub fn window(self, config: WindowConfig) -> KeyedWindowedStream<K> {
327 KeyedWindowedStream {
328 keyed_events: self.keyed_events,
329 config,
330 }
331 }
332
333 pub fn count(self) -> HashMap<K, usize> {
335 self.keyed_events
336 .into_iter()
337 .map(|(key, events)| (key, events.len()))
338 .collect()
339 }
340
341 pub fn keys(&self) -> Vec<K> {
343 self.keyed_events.keys().cloned().collect()
344 }
345
346 pub fn flatten(self) -> DataStream {
348 let events: Vec<StreamEvent> = self
349 .keyed_events
350 .into_iter()
351 .flat_map(|(_, events)| events)
352 .collect();
353
354 DataStream { events }
355 }
356}
357
358#[derive(Debug, Clone)]
360pub struct WindowConfig {
361 pub window_type: WindowType,
362 pub duration: Duration,
363 pub max_events: usize,
364}
365
366impl WindowConfig {
367 pub fn sliding(duration: Duration) -> Self {
369 Self {
370 window_type: WindowType::Sliding,
371 duration,
372 max_events: 10000,
373 }
374 }
375
376 pub fn tumbling(duration: Duration) -> Self {
378 Self {
379 window_type: WindowType::Tumbling,
380 duration,
381 max_events: 10000,
382 }
383 }
384
385 pub fn session(timeout: Duration) -> Self {
387 Self {
388 window_type: WindowType::Session { timeout },
389 duration: timeout,
390 max_events: 10000,
391 }
392 }
393
394 pub fn with_max_events(mut self, max_events: usize) -> Self {
396 self.max_events = max_events;
397 self
398 }
399}
400
401pub struct WindowedStream {
403 windows: Vec<TimeWindow>,
404}
405
406impl WindowedStream {
407 pub fn new(events: Vec<StreamEvent>, config: WindowConfig) -> Self {
409 let mut windows = Vec::new();
410
411 if events.is_empty() {
412 return Self { windows };
413 }
414
415 match config.window_type {
417 WindowType::Tumbling => {
418 let window_ms = config.duration.as_millis() as u64;
420 let mut window_map: HashMap<u64, Vec<StreamEvent>> = HashMap::new();
421
422 for event in events {
423 let window_start = (event.metadata.timestamp / window_ms) * window_ms;
424 window_map
425 .entry(window_start)
426 .or_insert_with(Vec::new)
427 .push(event);
428 }
429
430 for (start_time, mut window_events) in window_map {
432 let mut window = TimeWindow::new(
433 config.window_type.clone(),
434 config.duration,
435 start_time,
436 config.max_events,
437 );
438
439 for event in window_events.drain(..) {
440 window.add_event(event);
441 }
442
443 windows.push(window);
444 }
445 }
446 WindowType::Sliding | WindowType::Session { .. } => {
447 let window_ms = config.duration.as_millis() as u64;
450
451 if !events.is_empty() {
452 let min_time = events.iter().map(|e| e.metadata.timestamp).min().unwrap();
453 let max_time = events.iter().map(|e| e.metadata.timestamp).max().unwrap();
454
455 let mut current_start = min_time;
456
457 while current_start <= max_time {
458 let mut window = TimeWindow::new(
459 config.window_type.clone(),
460 config.duration,
461 current_start,
462 config.max_events,
463 );
464
465 for event in &events {
466 if event.metadata.timestamp >= current_start
467 && event.metadata.timestamp < current_start + window_ms
468 {
469 window.add_event(event.clone());
470 }
471 }
472
473 if window.count() > 0 {
474 windows.push(window);
475 }
476
477 current_start += window_ms / 2;
479 }
480 }
481 }
482 }
483
484 Self { windows }
485 }
486
487 pub fn aggregate<A>(self, aggregator: A) -> Vec<AggregateResult>
489 where
490 A: Aggregation,
491 {
492 self.windows
493 .iter()
494 .map(|window| {
495 let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
496 aggregator.aggregate(&events)
497 })
498 .collect()
499 }
500
501 pub fn reduce<F>(self, reducer: F) -> Vec<StreamEvent>
503 where
504 F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
505 {
506 self.windows
507 .into_iter()
508 .filter_map(|window| {
509 let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
510 events.into_iter().reduce(|acc, e| reducer(acc, e))
511 })
512 .collect()
513 }
514
515 pub fn windows(&self) -> &[TimeWindow] {
517 &self.windows
518 }
519
520 pub fn counts(self) -> Vec<usize> {
522 self.windows.iter().map(|w| w.count()).collect()
523 }
524
525 pub fn flatten(self) -> DataStream {
527 let events: Vec<StreamEvent> = self
528 .windows
529 .into_iter()
530 .flat_map(|w| w.events().iter().cloned().collect::<Vec<_>>())
531 .collect();
532
533 DataStream { events }
534 }
535}
536
537pub struct KeyedWindowedStream<K>
539where
540 K: std::hash::Hash + Eq,
541{
542 keyed_events: HashMap<K, Vec<StreamEvent>>,
543 config: WindowConfig,
544}
545
546impl<K> KeyedWindowedStream<K>
547where
548 K: std::hash::Hash + Eq + Clone,
549{
550 pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, Vec<AggregateResult>>
552 where
553 A: Aggregation + Clone,
554 {
555 self.keyed_events
556 .into_iter()
557 .map(|(key, events)| {
558 let windowed = WindowedStream::new(events, self.config.clone());
559 let results = windowed.aggregate(aggregator.clone());
560 (key, results)
561 })
562 .collect()
563 }
564
565 pub fn reduce<F>(self, reducer: F) -> HashMap<K, Vec<StreamEvent>>
567 where
568 F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
569 {
570 self.keyed_events
571 .into_iter()
572 .map(|(key, events)| {
573 let windowed = WindowedStream::new(events, self.config.clone());
574 let results = windowed.reduce(reducer.clone());
575 (key, results)
576 })
577 .collect()
578 }
579}
580
581pub struct GroupedStream<K>
583where
584 K: std::hash::Hash + Eq,
585{
586 groups: HashMap<K, Vec<StreamEvent>>,
587}
588
589impl<K> GroupedStream<K>
590where
591 K: std::hash::Hash + Eq + Clone,
592{
593 pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
595 where
596 A: Aggregation + Clone,
597 {
598 self.groups
599 .into_iter()
600 .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
601 .collect()
602 }
603
604 pub fn count(self) -> HashMap<K, usize> {
606 self.groups
607 .into_iter()
608 .map(|(key, events)| (key, events.len()))
609 .collect()
610 }
611
612 pub fn first(self) -> HashMap<K, StreamEvent> {
614 self.groups
615 .into_iter()
616 .filter_map(|(key, mut events)| {
617 if !events.is_empty() {
618 Some((key, events.remove(0)))
619 } else {
620 None
621 }
622 })
623 .collect()
624 }
625
626 pub fn last(self) -> HashMap<K, StreamEvent> {
628 self.groups
629 .into_iter()
630 .filter_map(|(key, mut events)| events.pop().map(|e| (key, e)))
631 .collect()
632 }
633}
634
635pub trait Aggregation: Send + Sync {
637 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult;
638}
639
640#[derive(Debug, Clone)]
642pub enum AggregateResult {
643 Number(f64),
644 String(String),
645 Map(HashMap<String, Value>),
646 List(Vec<Value>),
647 None,
648}
649
650impl AggregateResult {
651 pub fn as_number(&self) -> Option<f64> {
652 match self {
653 AggregateResult::Number(n) => Some(*n),
654 _ => None,
655 }
656 }
657
658 pub fn as_string(&self) -> Option<&str> {
659 match self {
660 AggregateResult::String(s) => Some(s),
661 _ => None,
662 }
663 }
664
665 pub fn as_map(&self) -> Option<&HashMap<String, Value>> {
666 match self {
667 AggregateResult::Map(m) => Some(m),
668 _ => None,
669 }
670 }
671}
672
673#[derive(Clone)]
677pub struct Count;
678
679impl Aggregation for Count {
680 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
681 AggregateResult::Number(events.len() as f64)
682 }
683}
684
685#[derive(Clone)]
687pub struct Sum {
688 pub field: String,
689}
690
691impl Sum {
692 pub fn new(field: impl Into<String>) -> Self {
693 Self {
694 field: field.into(),
695 }
696 }
697}
698
699impl Aggregation for Sum {
700 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
701 let sum: f64 = events
702 .iter()
703 .filter_map(|e| e.get_numeric(&self.field))
704 .sum();
705 AggregateResult::Number(sum)
706 }
707}
708
709#[derive(Clone)]
711pub struct Average {
712 pub field: String,
713}
714
715impl Average {
716 pub fn new(field: impl Into<String>) -> Self {
717 Self {
718 field: field.into(),
719 }
720 }
721}
722
723impl Aggregation for Average {
724 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
725 let values: Vec<f64> = events
726 .iter()
727 .filter_map(|e| e.get_numeric(&self.field))
728 .collect();
729
730 if values.is_empty() {
731 AggregateResult::None
732 } else {
733 let avg = values.iter().sum::<f64>() / values.len() as f64;
734 AggregateResult::Number(avg)
735 }
736 }
737}
738
739#[derive(Clone)]
741pub struct Min {
742 pub field: String,
743}
744
745impl Min {
746 pub fn new(field: impl Into<String>) -> Self {
747 Self {
748 field: field.into(),
749 }
750 }
751}
752
753impl Aggregation for Min {
754 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
755 events
756 .iter()
757 .filter_map(|e| e.get_numeric(&self.field))
758 .min_by(|a, b| a.partial_cmp(b).unwrap())
759 .map(AggregateResult::Number)
760 .unwrap_or(AggregateResult::None)
761 }
762}
763
764#[derive(Clone)]
766pub struct Max {
767 pub field: String,
768}
769
770impl Max {
771 pub fn new(field: impl Into<String>) -> Self {
772 Self {
773 field: field.into(),
774 }
775 }
776}
777
778impl Aggregation for Max {
779 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
780 events
781 .iter()
782 .filter_map(|e| e.get_numeric(&self.field))
783 .max_by(|a, b| a.partial_cmp(b).unwrap())
784 .map(AggregateResult::Number)
785 .unwrap_or(AggregateResult::None)
786 }
787}
788
789pub struct CustomAggregator<F>
791where
792 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
793{
794 func: Arc<F>,
795}
796
797impl<F> CustomAggregator<F>
798where
799 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
800{
801 pub fn new(func: F) -> Self {
802 Self {
803 func: Arc::new(func),
804 }
805 }
806}
807
808impl<F> Clone for CustomAggregator<F>
809where
810 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
811{
812 fn clone(&self) -> Self {
813 Self {
814 func: Arc::clone(&self.func),
815 }
816 }
817}
818
819impl<F> Aggregation for CustomAggregator<F>
820where
821 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
822{
823 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
824 (self.func)(events)
825 }
826}
827
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    /// Builds `count` events where `value` is the event index and `user_id` cycles
    /// through `user_0`, `user_1`, `user_2`.
    fn sample_events(count: usize) -> Vec<StreamEvent> {
        let mut events = Vec::with_capacity(count);
        for idx in 0..count {
            let mut payload = HashMap::new();
            payload.insert("value".to_string(), Value::Number(idx as f64));
            payload.insert(
                "user_id".to_string(),
                Value::String(format!("user_{}", idx % 3)),
            );
            events.push(StreamEvent::new("TestEvent", payload, "test"));
        }
        events
    }

    #[test]
    fn test_filter_operator() {
        let above_five = DataStream::from_events(sample_events(10))
            .filter(|e| e.get_numeric("value").unwrap_or(0.0) > 5.0);

        // Values 6, 7, 8 and 9 survive.
        assert_eq!(above_five.len(), 4);
    }

    #[test]
    fn test_map_operator() {
        let doubled = DataStream::from_events(sample_events(5))
            .map(|mut e| {
                if let Some(v) = e.get_numeric("value") {
                    e.data.insert("doubled".to_string(), Value::Number(v * 2.0));
                }
                e
            })
            .collect();

        assert_eq!(doubled[0].get_numeric("doubled"), Some(0.0));
        assert_eq!(doubled[1].get_numeric("doubled"), Some(2.0));
    }

    #[test]
    fn test_key_by_operator() {
        let per_user = DataStream::from_events(sample_events(9))
            .key_by(|e| e.get_string("user_id").unwrap_or("").to_string())
            .count();

        // Nine events spread evenly over three users.
        assert_eq!(per_user.len(), 3);
        for user in &["user_0", "user_1", "user_2"] {
            assert_eq!(per_user.get(*user).copied(), Some(3));
        }
    }

    #[test]
    fn test_reduce_operator() {
        let total = DataStream::from_events(sample_events(5)).reduce(|mut acc, e| {
            let running = acc.get_numeric("value").unwrap_or(0.0);
            let next = e.get_numeric("value").unwrap_or(0.0);
            acc.data
                .insert("value".to_string(), Value::Number(running + next));
            acc
        });

        assert!(total.is_some());
        // 0 + 1 + 2 + 3 + 4
        assert_eq!(total.unwrap().get_numeric("value"), Some(10.0));
    }

    #[test]
    fn test_count_aggregator() {
        assert_eq!(Count.aggregate(&sample_events(10)).as_number(), Some(10.0));
    }

    #[test]
    fn test_sum_aggregator() {
        // 0 + 1 + 2 + 3 + 4
        let total = Sum::new("value").aggregate(&sample_events(5));
        assert_eq!(total.as_number(), Some(10.0));
    }

    #[test]
    fn test_average_aggregator() {
        // (0 + 1 + 2 + 3 + 4) / 5
        let mean = Average::new("value").aggregate(&sample_events(5));
        assert_eq!(mean.as_number(), Some(2.0));
    }

    #[test]
    fn test_group_by() {
        let group_sizes = DataStream::from_events(sample_events(9))
            .group_by(|e| e.get_string("user_id").unwrap_or("").to_string())
            .count();

        assert_eq!(group_sizes.len(), 3);
    }

    #[test]
    fn test_chaining_operators() {
        let firsts = DataStream::from_events(sample_events(20))
            .filter(|e| e.get_numeric("value").unwrap_or(0.0) >= 5.0)
            .map(|mut e| {
                if let Some(v) = e.get_numeric("value") {
                    e.data.insert("doubled".to_string(), Value::Number(v * 2.0));
                }
                e
            })
            .take(5)
            .collect();

        assert_eq!(firsts.len(), 5);
        // The first surviving value is 5, so its doubled field is 10.
        assert_eq!(firsts[0].get_numeric("doubled"), Some(10.0));
    }

    #[test]
    fn test_windowed_stream() {
        let windowed = DataStream::from_events(sample_events(10))
            .window(WindowConfig::tumbling(Duration::from_secs(60)));

        assert!(!windowed.windows().is_empty());
    }
}