1use crate::streaming::event::StreamEvent;
26use crate::streaming::window::{TimeWindow, WindowType};
27use crate::types::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Duration;
31
32#[derive(Clone)]
34pub struct DataStream {
35 events: Vec<StreamEvent>,
36}
37
38impl DataStream {
39 pub fn from_events(events: Vec<StreamEvent>) -> Self {
41 Self { events }
42 }
43
44 pub fn new() -> Self {
46 Self { events: Vec::new() }
47 }
48
49 pub fn push(&mut self, event: StreamEvent) {
51 self.events.push(event);
52 }
53
54 pub fn len(&self) -> usize {
56 self.events.len()
57 }
58
59 pub fn is_empty(&self) -> bool {
61 self.events.is_empty()
62 }
63
64 pub fn filter<F>(self, predicate: F) -> Self
71 where
72 F: Fn(&StreamEvent) -> bool,
73 {
74 let filtered_events = self.events.into_iter().filter(predicate).collect();
75 Self {
76 events: filtered_events,
77 }
78 }
79
80 pub fn map<F>(self, mapper: F) -> Self
90 where
91 F: Fn(StreamEvent) -> StreamEvent,
92 {
93 let mapped_events = self.events.into_iter().map(mapper).collect();
94 Self {
95 events: mapped_events,
96 }
97 }
98
99 pub fn flat_map<F>(self, mapper: F) -> Self
109 where
110 F: Fn(StreamEvent) -> Vec<StreamEvent>,
111 {
112 let flat_mapped_events = self.events.into_iter().flat_map(mapper).collect();
113 Self {
114 events: flat_mapped_events,
115 }
116 }
117
118 pub fn key_by<F, K>(self, key_selector: F) -> KeyedStream<K>
125 where
126 F: Fn(&StreamEvent) -> K,
127 K: std::hash::Hash + Eq + Clone,
128 {
129 let mut keyed_events: HashMap<K, Vec<StreamEvent>> = HashMap::new();
130
131 for event in self.events {
132 let key = key_selector(&event);
133 keyed_events.entry(key).or_default().push(event);
134 }
135
136 KeyedStream { keyed_events }
137 }
138
139 pub fn window(self, config: WindowConfig) -> WindowedStream {
146 WindowedStream::new(self.events, config)
147 }
148
149 pub fn reduce<F>(self, reducer: F) -> Option<StreamEvent>
159 where
160 F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
161 {
162 self.events.into_iter().reduce(reducer)
163 }
164
165 pub fn count(self) -> usize {
167 self.events.len()
168 }
169
170 pub fn collect(self) -> Vec<StreamEvent> {
172 self.events
173 }
174
175 pub fn take(self, n: usize) -> Self {
177 Self {
178 events: self.events.into_iter().take(n).collect(),
179 }
180 }
181
182 pub fn skip(self, n: usize) -> Self {
184 Self {
185 events: self.events.into_iter().skip(n).collect(),
186 }
187 }
188
189 pub fn for_each<F>(self, action: F) -> Self
198 where
199 F: Fn(&StreamEvent),
200 {
201 for event in &self.events {
202 action(event);
203 }
204 self
205 }
206
207 pub fn union(mut self, other: DataStream) -> Self {
209 self.events.extend(other.events);
210 Self {
211 events: self.events,
212 }
213 }
214
215 pub fn find<F>(self, predicate: F) -> Option<StreamEvent>
217 where
218 F: Fn(&StreamEvent) -> bool,
219 {
220 self.events.into_iter().find(predicate)
221 }
222
223 pub fn any<F>(&self, predicate: F) -> bool
225 where
226 F: Fn(&StreamEvent) -> bool,
227 {
228 self.events.iter().any(predicate)
229 }
230
231 pub fn all<F>(&self, predicate: F) -> bool
233 where
234 F: Fn(&StreamEvent) -> bool,
235 {
236 self.events.iter().all(predicate)
237 }
238
239 pub fn sort_by<F, K>(mut self, key_fn: F) -> Self
241 where
242 F: Fn(&StreamEvent) -> K,
243 K: Ord,
244 {
245 self.events.sort_by_key(key_fn);
246 Self {
247 events: self.events,
248 }
249 }
250
251 pub fn group_by<F, K>(self, key_selector: F) -> GroupedStream<K>
253 where
254 F: Fn(&StreamEvent) -> K,
255 K: std::hash::Hash + Eq + Clone,
256 {
257 let mut grouped: HashMap<K, Vec<StreamEvent>> = HashMap::new();
258
259 for event in self.events {
260 let key = key_selector(&event);
261 grouped.entry(key).or_default().push(event);
262 }
263
264 GroupedStream { groups: grouped }
265 }
266
267 pub fn aggregate<A>(self, aggregator: A) -> AggregateResult
269 where
270 A: Aggregation,
271 {
272 aggregator.aggregate(&self.events)
273 }
274}
275
276impl Default for DataStream {
277 fn default() -> Self {
278 Self::new()
279 }
280}
281
282pub struct KeyedStream<K>
284where
285 K: std::hash::Hash + Eq,
286{
287 keyed_events: HashMap<K, Vec<StreamEvent>>,
288}
289
290impl<K> KeyedStream<K>
291where
292 K: std::hash::Hash + Eq + Clone,
293{
294 pub fn reduce<F>(self, reducer: F) -> HashMap<K, StreamEvent>
296 where
297 F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
298 {
299 self.keyed_events
300 .into_iter()
301 .filter_map(|(key, events)| {
302 events
303 .into_iter()
304 .reduce(&reducer)
305 .map(|result| (key, result))
306 })
307 .collect()
308 }
309
310 pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
312 where
313 A: Aggregation + Clone,
314 {
315 self.keyed_events
316 .into_iter()
317 .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
318 .collect()
319 }
320
321 pub fn window(self, config: WindowConfig) -> KeyedWindowedStream<K> {
323 KeyedWindowedStream {
324 keyed_events: self.keyed_events,
325 config,
326 }
327 }
328
329 pub fn count(self) -> HashMap<K, usize> {
331 self.keyed_events
332 .into_iter()
333 .map(|(key, events)| (key, events.len()))
334 .collect()
335 }
336
337 pub fn keys(&self) -> Vec<K> {
339 self.keyed_events.keys().cloned().collect()
340 }
341
342 pub fn flatten(self) -> DataStream {
344 let events: Vec<StreamEvent> = self.keyed_events.into_values().flatten().collect();
347
348 DataStream { events }
349 }
350}
351
352#[derive(Debug, Clone)]
354pub struct WindowConfig {
355 pub window_type: WindowType,
356 pub duration: Duration,
357 pub max_events: usize,
358}
359
360impl WindowConfig {
361 pub fn sliding(duration: Duration) -> Self {
363 Self {
364 window_type: WindowType::Sliding,
365 duration,
366 max_events: 10000,
367 }
368 }
369
370 pub fn tumbling(duration: Duration) -> Self {
372 Self {
373 window_type: WindowType::Tumbling,
374 duration,
375 max_events: 10000,
376 }
377 }
378
379 pub fn session(timeout: Duration) -> Self {
381 Self {
382 window_type: WindowType::Session { timeout },
383 duration: timeout,
384 max_events: 10000,
385 }
386 }
387
388 pub fn with_max_events(mut self, max_events: usize) -> Self {
390 self.max_events = max_events;
391 self
392 }
393}
394
395pub struct WindowedStream {
397 windows: Vec<TimeWindow>,
398}
399
400impl WindowedStream {
401 pub fn new(events: Vec<StreamEvent>, config: WindowConfig) -> Self {
403 let mut windows = Vec::new();
404
405 if events.is_empty() {
406 return Self { windows };
407 }
408
409 match config.window_type {
411 WindowType::Tumbling => {
412 let window_ms = config.duration.as_millis() as u64;
414 let mut window_map: HashMap<u64, Vec<StreamEvent>> = HashMap::new();
415
416 for event in events {
417 let window_start = (event.metadata.timestamp / window_ms) * window_ms;
418 window_map.entry(window_start).or_default().push(event);
419 }
420
421 for (start_time, mut window_events) in window_map {
423 let mut window = TimeWindow::new(
424 config.window_type.clone(),
425 config.duration,
426 start_time,
427 config.max_events,
428 );
429
430 for event in window_events.drain(..) {
431 window.add_event(event);
432 }
433
434 windows.push(window);
435 }
436 }
437 WindowType::Sliding | WindowType::Session { .. } => {
438 let window_ms = config.duration.as_millis() as u64;
441
442 if !events.is_empty() {
443 let min_time = events.iter().map(|e| e.metadata.timestamp).min().unwrap();
444 let max_time = events.iter().map(|e| e.metadata.timestamp).max().unwrap();
445
446 let mut current_start = min_time;
447
448 while current_start <= max_time {
449 let mut window = TimeWindow::new(
450 config.window_type.clone(),
451 config.duration,
452 current_start,
453 config.max_events,
454 );
455
456 for event in &events {
457 if event.metadata.timestamp >= current_start
458 && event.metadata.timestamp < current_start + window_ms
459 {
460 window.add_event(event.clone());
461 }
462 }
463
464 if window.count() > 0 {
465 windows.push(window);
466 }
467
468 current_start += window_ms / 2;
470 }
471 }
472 }
473 }
474
475 Self { windows }
476 }
477
478 pub fn aggregate<A>(self, aggregator: A) -> Vec<AggregateResult>
480 where
481 A: Aggregation,
482 {
483 self.windows
484 .iter()
485 .map(|window| {
486 let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
487 aggregator.aggregate(&events)
488 })
489 .collect()
490 }
491
492 pub fn reduce<F>(self, reducer: F) -> Vec<StreamEvent>
494 where
495 F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
496 {
497 self.windows
498 .into_iter()
499 .filter_map(|window| {
500 let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
501 events.into_iter().reduce(&reducer)
502 })
503 .collect()
504 }
505
506 pub fn windows(&self) -> &[TimeWindow] {
508 &self.windows
509 }
510
511 pub fn counts(self) -> Vec<usize> {
513 self.windows.iter().map(|w| w.count()).collect()
514 }
515
516 pub fn flatten(self) -> DataStream {
518 let events: Vec<StreamEvent> = self
519 .windows
520 .into_iter()
521 .flat_map(|w| w.events().iter().cloned().collect::<Vec<_>>())
522 .collect();
523
524 DataStream { events }
525 }
526}
527
528pub struct KeyedWindowedStream<K>
530where
531 K: std::hash::Hash + Eq,
532{
533 keyed_events: HashMap<K, Vec<StreamEvent>>,
534 config: WindowConfig,
535}
536
537impl<K> KeyedWindowedStream<K>
538where
539 K: std::hash::Hash + Eq + Clone,
540{
541 pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, Vec<AggregateResult>>
543 where
544 A: Aggregation + Clone,
545 {
546 self.keyed_events
547 .into_iter()
548 .map(|(key, events)| {
549 let windowed = WindowedStream::new(events, self.config.clone());
550 let results = windowed.aggregate(aggregator.clone());
551 (key, results)
552 })
553 .collect()
554 }
555
556 pub fn reduce<F>(self, reducer: F) -> HashMap<K, Vec<StreamEvent>>
558 where
559 F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
560 {
561 self.keyed_events
562 .into_iter()
563 .map(|(key, events)| {
564 let windowed = WindowedStream::new(events, self.config.clone());
565 let results = windowed.reduce(reducer.clone());
566 (key, results)
567 })
568 .collect()
569 }
570}
571
572pub struct GroupedStream<K>
574where
575 K: std::hash::Hash + Eq,
576{
577 groups: HashMap<K, Vec<StreamEvent>>,
578}
579
580impl<K> GroupedStream<K>
581where
582 K: std::hash::Hash + Eq + Clone,
583{
584 pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
586 where
587 A: Aggregation + Clone,
588 {
589 self.groups
590 .into_iter()
591 .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
592 .collect()
593 }
594
595 pub fn count(self) -> HashMap<K, usize> {
597 self.groups
598 .into_iter()
599 .map(|(key, events)| (key, events.len()))
600 .collect()
601 }
602
603 pub fn first(self) -> HashMap<K, StreamEvent> {
605 self.groups
606 .into_iter()
607 .filter_map(|(key, mut events)| {
608 if !events.is_empty() {
609 Some((key, events.remove(0)))
610 } else {
611 None
612 }
613 })
614 .collect()
615 }
616
617 pub fn last(self) -> HashMap<K, StreamEvent> {
619 self.groups
620 .into_iter()
621 .filter_map(|(key, mut events)| events.pop().map(|e| (key, e)))
622 .collect()
623 }
624}
625
626pub trait Aggregation: Send + Sync {
628 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult;
629}
630
631#[derive(Debug, Clone)]
633pub enum AggregateResult {
634 Number(f64),
635 String(String),
636 Map(HashMap<String, Value>),
637 List(Vec<Value>),
638 None,
639}
640
641impl AggregateResult {
642 pub fn as_number(&self) -> Option<f64> {
643 match self {
644 AggregateResult::Number(n) => Some(*n),
645 _ => None,
646 }
647 }
648
649 pub fn as_string(&self) -> Option<&str> {
650 match self {
651 AggregateResult::String(s) => Some(s),
652 _ => None,
653 }
654 }
655
656 pub fn as_map(&self) -> Option<&HashMap<String, Value>> {
657 match self {
658 AggregateResult::Map(m) => Some(m),
659 _ => None,
660 }
661 }
662}
663
664#[derive(Clone)]
668pub struct Count;
669
670impl Aggregation for Count {
671 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
672 AggregateResult::Number(events.len() as f64)
673 }
674}
675
676#[derive(Clone)]
678pub struct Sum {
679 pub field: String,
680}
681
682impl Sum {
683 pub fn new(field: impl Into<String>) -> Self {
684 Self {
685 field: field.into(),
686 }
687 }
688}
689
690impl Aggregation for Sum {
691 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
692 let sum: f64 = events
693 .iter()
694 .filter_map(|e| e.get_numeric(&self.field))
695 .sum();
696 AggregateResult::Number(sum)
697 }
698}
699
700#[derive(Clone)]
702pub struct Average {
703 pub field: String,
704}
705
706impl Average {
707 pub fn new(field: impl Into<String>) -> Self {
708 Self {
709 field: field.into(),
710 }
711 }
712}
713
714impl Aggregation for Average {
715 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
716 let values: Vec<f64> = events
717 .iter()
718 .filter_map(|e| e.get_numeric(&self.field))
719 .collect();
720
721 if values.is_empty() {
722 AggregateResult::None
723 } else {
724 let avg = values.iter().sum::<f64>() / values.len() as f64;
725 AggregateResult::Number(avg)
726 }
727 }
728}
729
730#[derive(Clone)]
732pub struct Min {
733 pub field: String,
734}
735
736impl Min {
737 pub fn new(field: impl Into<String>) -> Self {
738 Self {
739 field: field.into(),
740 }
741 }
742}
743
744impl Aggregation for Min {
745 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
746 events
747 .iter()
748 .filter_map(|e| e.get_numeric(&self.field))
749 .min_by(|a, b| a.partial_cmp(b).unwrap())
750 .map(AggregateResult::Number)
751 .unwrap_or(AggregateResult::None)
752 }
753}
754
755#[derive(Clone)]
757pub struct Max {
758 pub field: String,
759}
760
761impl Max {
762 pub fn new(field: impl Into<String>) -> Self {
763 Self {
764 field: field.into(),
765 }
766 }
767}
768
769impl Aggregation for Max {
770 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
771 events
772 .iter()
773 .filter_map(|e| e.get_numeric(&self.field))
774 .max_by(|a, b| a.partial_cmp(b).unwrap())
775 .map(AggregateResult::Number)
776 .unwrap_or(AggregateResult::None)
777 }
778}
779
780pub struct CustomAggregator<F>
782where
783 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
784{
785 func: Arc<F>,
786}
787
788impl<F> CustomAggregator<F>
789where
790 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
791{
792 pub fn new(func: F) -> Self {
793 Self {
794 func: Arc::new(func),
795 }
796 }
797}
798
799impl<F> Clone for CustomAggregator<F>
800where
801 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
802{
803 fn clone(&self) -> Self {
804 Self {
805 func: Arc::clone(&self.func),
806 }
807 }
808}
809
810impl<F> Aggregation for CustomAggregator<F>
811where
812 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
813{
814 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
815 (self.func)(events)
816 }
817}
818
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    /// Builds `count` events. The `value` field runs `0..count` and `user_id`
    /// cycles through `user_0`, `user_1`, `user_2`.
    fn create_test_events(count: usize) -> Vec<StreamEvent> {
        (0..count)
            .map(|i| {
                let mut data = HashMap::new();
                data.insert("value".to_string(), Value::Number(i as f64));
                data.insert(
                    "user_id".to_string(),
                    Value::String(format!("user_{}", i % 3)),
                );
                StreamEvent::new("TestEvent", data, "test")
            })
            .collect()
    }

    #[test]
    fn test_filter_operator() {
        let events = create_test_events(10);
        let stream = DataStream::from_events(events);

        let filtered = stream.filter(|e| e.get_numeric("value").unwrap_or(0.0) > 5.0);

        // Values 6, 7, 8 and 9 pass the filter.
        assert_eq!(filtered.len(), 4);
    }

    #[test]
    fn test_map_operator() {
        let events = create_test_events(5);
        let stream = DataStream::from_events(events);

        let mapped = stream.map(|mut e| {
            if let Some(value) = e.get_numeric("value") {
                e.data
                    .insert("doubled".to_string(), Value::Number(value * 2.0));
            }
            e
        });

        let collected = mapped.collect();
        assert_eq!(collected[0].get_numeric("doubled"), Some(0.0));
        assert_eq!(collected[1].get_numeric("doubled"), Some(2.0));
    }

    #[test]
    fn test_key_by_operator() {
        let events = create_test_events(9);
        let stream = DataStream::from_events(events);

        let keyed = stream.key_by(|e| e.get_string("user_id").unwrap_or("").to_string());

        let counts = keyed.count();
        // Nine events spread evenly over three users.
        assert_eq!(counts.len(), 3);
        assert_eq!(*counts.get("user_0").unwrap(), 3);
        assert_eq!(*counts.get("user_1").unwrap(), 3);
        assert_eq!(*counts.get("user_2").unwrap(), 3);
    }

    #[test]
    fn test_reduce_operator() {
        let events = create_test_events(5);
        let stream = DataStream::from_events(events);

        let result = stream.reduce(|mut acc, e| {
            let acc_val = acc.get_numeric("value").unwrap_or(0.0);
            let e_val = e.get_numeric("value").unwrap_or(0.0);
            acc.data
                .insert("value".to_string(), Value::Number(acc_val + e_val));
            acc
        });

        assert!(result.is_some());
        // 0 + 1 + 2 + 3 + 4
        assert_eq!(result.unwrap().get_numeric("value"), Some(10.0));
    }

    #[test]
    fn test_count_aggregator() {
        let events = create_test_events(10);
        let result = Count.aggregate(&events);

        assert_eq!(result.as_number(), Some(10.0));
    }

    #[test]
    fn test_sum_aggregator() {
        let events = create_test_events(5);
        let result = Sum::new("value").aggregate(&events);

        // 0 + 1 + 2 + 3 + 4
        assert_eq!(result.as_number(), Some(10.0));
    }

    #[test]
    fn test_average_aggregator() {
        let events = create_test_events(5);
        let result = Average::new("value").aggregate(&events);

        // 10 / 5
        assert_eq!(result.as_number(), Some(2.0));
    }

    #[test]
    fn test_min_max_aggregators() {
        let events = create_test_events(5);

        assert_eq!(Min::new("value").aggregate(&events).as_number(), Some(0.0));
        assert_eq!(Max::new("value").aggregate(&events).as_number(), Some(4.0));
        // No event carries the field, so there is no extremum.
        assert!(Min::new("missing").aggregate(&events).as_number().is_none());
        assert!(Max::new("missing").aggregate(&events).as_number().is_none());
    }

    #[test]
    fn test_group_by() {
        let events = create_test_events(9);
        let stream = DataStream::from_events(events);

        let grouped = stream.group_by(|e| e.get_string("user_id").unwrap_or("").to_string());

        let counts = grouped.count();
        assert_eq!(counts.len(), 3);
    }

    #[test]
    fn test_grouped_first_last() {
        // user_0 owns values 0, 3 and 6, in that order.
        let firsts = DataStream::from_events(create_test_events(9))
            .group_by(|e| e.get_string("user_id").unwrap_or("").to_string())
            .first();
        assert_eq!(firsts.get("user_0").unwrap().get_numeric("value"), Some(0.0));

        let lasts = DataStream::from_events(create_test_events(9))
            .group_by(|e| e.get_string("user_id").unwrap_or("").to_string())
            .last();
        assert_eq!(lasts.get("user_0").unwrap().get_numeric("value"), Some(6.0));
    }

    #[test]
    fn test_take_skip_union() {
        let head = DataStream::from_events(create_test_events(10)).take(3);
        assert_eq!(head.len(), 3);

        let tail = DataStream::from_events(create_test_events(10)).skip(8);
        assert_eq!(tail.len(), 2);

        // Skipping past the end yields an empty stream rather than panicking.
        assert!(DataStream::from_events(create_test_events(2)).skip(5).is_empty());

        let merged = head.union(tail).collect();
        let values: Vec<_> = merged.iter().map(|e| e.get_numeric("value")).collect();
        assert_eq!(
            values,
            vec![Some(0.0), Some(1.0), Some(2.0), Some(8.0), Some(9.0)]
        );
    }

    #[test]
    fn test_chaining_operators() {
        let events = create_test_events(20);
        let stream = DataStream::from_events(events);

        let result = stream
            .filter(|e| e.get_numeric("value").unwrap_or(0.0) >= 5.0)
            .map(|mut e| {
                if let Some(v) = e.get_numeric("value") {
                    e.data.insert("doubled".to_string(), Value::Number(v * 2.0));
                }
                e
            })
            .take(5)
            .collect();

        assert_eq!(result.len(), 5);
        // The first surviving event has value 5, doubled to 10.
        assert_eq!(result[0].get_numeric("doubled"), Some(10.0));
    }

    #[test]
    fn test_windowed_stream() {
        let events = create_test_events(10);
        let stream = DataStream::from_events(events);

        let windowed = stream.window(WindowConfig::tumbling(Duration::from_secs(60)));

        assert!(!windowed.windows().is_empty());
    }
}