1use crate::streaming::event::StreamEvent;
26use crate::streaming::window::{TimeWindow, WindowType};
27use crate::types::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Duration;
31
32#[derive(Clone)]
34pub struct DataStream {
35 events: Vec<StreamEvent>,
36}
37
38impl DataStream {
39 pub fn from_events(events: Vec<StreamEvent>) -> Self {
41 Self { events }
42 }
43
44 pub fn new() -> Self {
46 Self { events: Vec::new() }
47 }
48
49 pub fn push(&mut self, event: StreamEvent) {
51 self.events.push(event);
52 }
53
54 pub fn len(&self) -> usize {
56 self.events.len()
57 }
58
59 pub fn is_empty(&self) -> bool {
61 self.events.is_empty()
62 }
63
64 pub fn filter<F>(self, predicate: F) -> Self
71 where
72 F: Fn(&StreamEvent) -> bool,
73 {
74 let filtered_events = self.events.into_iter().filter(predicate).collect();
75 Self {
76 events: filtered_events,
77 }
78 }
79
80 pub fn map<F>(self, mapper: F) -> Self
90 where
91 F: Fn(StreamEvent) -> StreamEvent,
92 {
93 let mapped_events = self.events.into_iter().map(mapper).collect();
94 Self {
95 events: mapped_events,
96 }
97 }
98
99 pub fn flat_map<F>(self, mapper: F) -> Self
109 where
110 F: Fn(StreamEvent) -> Vec<StreamEvent>,
111 {
112 let flat_mapped_events = self.events.into_iter().flat_map(mapper).collect();
113 Self {
114 events: flat_mapped_events,
115 }
116 }
117
118 pub fn key_by<F, K>(self, key_selector: F) -> KeyedStream<K>
125 where
126 F: Fn(&StreamEvent) -> K,
127 K: std::hash::Hash + Eq + Clone,
128 {
129 let mut keyed_events: HashMap<K, Vec<StreamEvent>> = HashMap::new();
130
131 for event in self.events {
132 let key = key_selector(&event);
133 keyed_events.entry(key).or_default().push(event);
134 }
135
136 KeyedStream { keyed_events }
137 }
138
139 pub fn window(self, config: WindowConfig) -> WindowedStream {
146 WindowedStream::new(self.events, config)
147 }
148
149 pub fn reduce<F>(self, reducer: F) -> Option<StreamEvent>
159 where
160 F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
161 {
162 self.events.into_iter().reduce(reducer)
163 }
164
165 pub fn count(self) -> usize {
167 self.events.len()
168 }
169
170 pub fn collect(self) -> Vec<StreamEvent> {
172 self.events
173 }
174
175 pub fn take(self, n: usize) -> Self {
177 Self {
178 events: self.events.into_iter().take(n).collect(),
179 }
180 }
181
182 pub fn skip(self, n: usize) -> Self {
184 Self {
185 events: self.events.into_iter().skip(n).collect(),
186 }
187 }
188
189 pub fn for_each<F>(self, action: F) -> Self
198 where
199 F: Fn(&StreamEvent),
200 {
201 for event in &self.events {
202 action(event);
203 }
204 self
205 }
206
207 pub fn union(mut self, other: DataStream) -> Self {
209 self.events.extend(other.events);
210 Self {
211 events: self.events,
212 }
213 }
214
215 pub fn find<F>(self, predicate: F) -> Option<StreamEvent>
217 where
218 F: Fn(&StreamEvent) -> bool,
219 {
220 self.events.into_iter().find(predicate)
221 }
222
223 pub fn any<F>(&self, predicate: F) -> bool
225 where
226 F: Fn(&StreamEvent) -> bool,
227 {
228 self.events.iter().any(predicate)
229 }
230
231 pub fn all<F>(&self, predicate: F) -> bool
233 where
234 F: Fn(&StreamEvent) -> bool,
235 {
236 self.events.iter().all(predicate)
237 }
238
239 pub fn sort_by<F, K>(mut self, key_fn: F) -> Self
241 where
242 F: Fn(&StreamEvent) -> K,
243 K: Ord,
244 {
245 self.events.sort_by_key(key_fn);
246 Self {
247 events: self.events,
248 }
249 }
250
251 pub fn group_by<F, K>(self, key_selector: F) -> GroupedStream<K>
253 where
254 F: Fn(&StreamEvent) -> K,
255 K: std::hash::Hash + Eq + Clone,
256 {
257 let mut grouped: HashMap<K, Vec<StreamEvent>> = HashMap::new();
258
259 for event in self.events {
260 let key = key_selector(&event);
261 grouped.entry(key).or_default().push(event);
262 }
263
264 GroupedStream { groups: grouped }
265 }
266
267 pub fn aggregate<A>(self, aggregator: A) -> AggregateResult
269 where
270 A: Aggregation,
271 {
272 aggregator.aggregate(&self.events)
273 }
274}
275
276impl Default for DataStream {
277 fn default() -> Self {
278 Self::new()
279 }
280}
281
282pub struct KeyedStream<K>
284where
285 K: std::hash::Hash + Eq,
286{
287 keyed_events: HashMap<K, Vec<StreamEvent>>,
288}
289
290impl<K> KeyedStream<K>
291where
292 K: std::hash::Hash + Eq + Clone,
293{
294 pub fn reduce<F>(self, reducer: F) -> HashMap<K, StreamEvent>
296 where
297 F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
298 {
299 self.keyed_events
300 .into_iter()
301 .filter_map(|(key, events)| {
302 events
303 .into_iter()
304 .reduce(&reducer)
305 .map(|result| (key, result))
306 })
307 .collect()
308 }
309
310 pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
312 where
313 A: Aggregation + Clone,
314 {
315 self.keyed_events
316 .into_iter()
317 .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
318 .collect()
319 }
320
321 pub fn window(self, config: WindowConfig) -> KeyedWindowedStream<K> {
323 KeyedWindowedStream {
324 keyed_events: self.keyed_events,
325 config,
326 }
327 }
328
329 pub fn count(self) -> HashMap<K, usize> {
331 self.keyed_events
332 .into_iter()
333 .map(|(key, events)| (key, events.len()))
334 .collect()
335 }
336
337 pub fn keys(&self) -> Vec<K> {
339 self.keyed_events.keys().cloned().collect()
340 }
341
342 pub fn flatten(self) -> DataStream {
344 let events: Vec<StreamEvent> = self
345 .keyed_events
346 .into_iter()
347 .flat_map(|(_, events)| events)
348 .collect();
349
350 DataStream { events }
351 }
352}
353
354#[derive(Debug, Clone)]
356pub struct WindowConfig {
357 pub window_type: WindowType,
358 pub duration: Duration,
359 pub max_events: usize,
360}
361
362impl WindowConfig {
363 pub fn sliding(duration: Duration) -> Self {
365 Self {
366 window_type: WindowType::Sliding,
367 duration,
368 max_events: 10000,
369 }
370 }
371
372 pub fn tumbling(duration: Duration) -> Self {
374 Self {
375 window_type: WindowType::Tumbling,
376 duration,
377 max_events: 10000,
378 }
379 }
380
381 pub fn session(timeout: Duration) -> Self {
383 Self {
384 window_type: WindowType::Session { timeout },
385 duration: timeout,
386 max_events: 10000,
387 }
388 }
389
390 pub fn with_max_events(mut self, max_events: usize) -> Self {
392 self.max_events = max_events;
393 self
394 }
395}
396
397pub struct WindowedStream {
399 windows: Vec<TimeWindow>,
400}
401
402impl WindowedStream {
403 pub fn new(events: Vec<StreamEvent>, config: WindowConfig) -> Self {
405 let mut windows = Vec::new();
406
407 if events.is_empty() {
408 return Self { windows };
409 }
410
411 match config.window_type {
413 WindowType::Tumbling => {
414 let window_ms = config.duration.as_millis() as u64;
416 let mut window_map: HashMap<u64, Vec<StreamEvent>> = HashMap::new();
417
418 for event in events {
419 let window_start = (event.metadata.timestamp / window_ms) * window_ms;
420 window_map.entry(window_start).or_default().push(event);
421 }
422
423 for (start_time, mut window_events) in window_map {
425 let mut window = TimeWindow::new(
426 config.window_type.clone(),
427 config.duration,
428 start_time,
429 config.max_events,
430 );
431
432 for event in window_events.drain(..) {
433 window.add_event(event);
434 }
435
436 windows.push(window);
437 }
438 }
439 WindowType::Sliding | WindowType::Session { .. } => {
440 let window_ms = config.duration.as_millis() as u64;
443
444 if !events.is_empty() {
445 let min_time = events.iter().map(|e| e.metadata.timestamp).min().unwrap();
446 let max_time = events.iter().map(|e| e.metadata.timestamp).max().unwrap();
447
448 let mut current_start = min_time;
449
450 while current_start <= max_time {
451 let mut window = TimeWindow::new(
452 config.window_type.clone(),
453 config.duration,
454 current_start,
455 config.max_events,
456 );
457
458 for event in &events {
459 if event.metadata.timestamp >= current_start
460 && event.metadata.timestamp < current_start + window_ms
461 {
462 window.add_event(event.clone());
463 }
464 }
465
466 if window.count() > 0 {
467 windows.push(window);
468 }
469
470 current_start += window_ms / 2;
472 }
473 }
474 }
475 }
476
477 Self { windows }
478 }
479
480 pub fn aggregate<A>(self, aggregator: A) -> Vec<AggregateResult>
482 where
483 A: Aggregation,
484 {
485 self.windows
486 .iter()
487 .map(|window| {
488 let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
489 aggregator.aggregate(&events)
490 })
491 .collect()
492 }
493
494 pub fn reduce<F>(self, reducer: F) -> Vec<StreamEvent>
496 where
497 F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
498 {
499 self.windows
500 .into_iter()
501 .filter_map(|window| {
502 let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
503 events.into_iter().reduce(&reducer)
504 })
505 .collect()
506 }
507
508 pub fn windows(&self) -> &[TimeWindow] {
510 &self.windows
511 }
512
513 pub fn counts(self) -> Vec<usize> {
515 self.windows.iter().map(|w| w.count()).collect()
516 }
517
518 pub fn flatten(self) -> DataStream {
520 let events: Vec<StreamEvent> = self
521 .windows
522 .into_iter()
523 .flat_map(|w| w.events().iter().cloned().collect::<Vec<_>>())
524 .collect();
525
526 DataStream { events }
527 }
528}
529
530pub struct KeyedWindowedStream<K>
532where
533 K: std::hash::Hash + Eq,
534{
535 keyed_events: HashMap<K, Vec<StreamEvent>>,
536 config: WindowConfig,
537}
538
539impl<K> KeyedWindowedStream<K>
540where
541 K: std::hash::Hash + Eq + Clone,
542{
543 pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, Vec<AggregateResult>>
545 where
546 A: Aggregation + Clone,
547 {
548 self.keyed_events
549 .into_iter()
550 .map(|(key, events)| {
551 let windowed = WindowedStream::new(events, self.config.clone());
552 let results = windowed.aggregate(aggregator.clone());
553 (key, results)
554 })
555 .collect()
556 }
557
558 pub fn reduce<F>(self, reducer: F) -> HashMap<K, Vec<StreamEvent>>
560 where
561 F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
562 {
563 self.keyed_events
564 .into_iter()
565 .map(|(key, events)| {
566 let windowed = WindowedStream::new(events, self.config.clone());
567 let results = windowed.reduce(reducer.clone());
568 (key, results)
569 })
570 .collect()
571 }
572}
573
574pub struct GroupedStream<K>
576where
577 K: std::hash::Hash + Eq,
578{
579 groups: HashMap<K, Vec<StreamEvent>>,
580}
581
582impl<K> GroupedStream<K>
583where
584 K: std::hash::Hash + Eq + Clone,
585{
586 pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
588 where
589 A: Aggregation + Clone,
590 {
591 self.groups
592 .into_iter()
593 .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
594 .collect()
595 }
596
597 pub fn count(self) -> HashMap<K, usize> {
599 self.groups
600 .into_iter()
601 .map(|(key, events)| (key, events.len()))
602 .collect()
603 }
604
605 pub fn first(self) -> HashMap<K, StreamEvent> {
607 self.groups
608 .into_iter()
609 .filter_map(|(key, mut events)| {
610 if !events.is_empty() {
611 Some((key, events.remove(0)))
612 } else {
613 None
614 }
615 })
616 .collect()
617 }
618
619 pub fn last(self) -> HashMap<K, StreamEvent> {
621 self.groups
622 .into_iter()
623 .filter_map(|(key, mut events)| events.pop().map(|e| (key, e)))
624 .collect()
625 }
626}
627
628pub trait Aggregation: Send + Sync {
630 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult;
631}
632
633#[derive(Debug, Clone)]
635pub enum AggregateResult {
636 Number(f64),
637 String(String),
638 Map(HashMap<String, Value>),
639 List(Vec<Value>),
640 None,
641}
642
643impl AggregateResult {
644 pub fn as_number(&self) -> Option<f64> {
645 match self {
646 AggregateResult::Number(n) => Some(*n),
647 _ => None,
648 }
649 }
650
651 pub fn as_string(&self) -> Option<&str> {
652 match self {
653 AggregateResult::String(s) => Some(s),
654 _ => None,
655 }
656 }
657
658 pub fn as_map(&self) -> Option<&HashMap<String, Value>> {
659 match self {
660 AggregateResult::Map(m) => Some(m),
661 _ => None,
662 }
663 }
664}
665
666#[derive(Clone)]
670pub struct Count;
671
672impl Aggregation for Count {
673 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
674 AggregateResult::Number(events.len() as f64)
675 }
676}
677
/// Aggregation that adds up a numeric field.
#[derive(Clone)]
pub struct Sum {
    /// Name of the field whose values are summed.
    pub field: String,
}

impl Sum {
    /// Creates a sum over the field named `field`.
    pub fn new(field: impl Into<String>) -> Self {
        let field = field.into();
        Self { field }
    }
}
691
692impl Aggregation for Sum {
693 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
694 let sum: f64 = events
695 .iter()
696 .filter_map(|e| e.get_numeric(&self.field))
697 .sum();
698 AggregateResult::Number(sum)
699 }
700}
701
/// Aggregation that computes the arithmetic mean of a numeric field.
#[derive(Clone)]
pub struct Average {
    /// Name of the field whose values are averaged.
    pub field: String,
}

impl Average {
    /// Creates an average over the field named `field`.
    pub fn new(field: impl Into<String>) -> Self {
        let field = field.into();
        Self { field }
    }
}
715
716impl Aggregation for Average {
717 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
718 let values: Vec<f64> = events
719 .iter()
720 .filter_map(|e| e.get_numeric(&self.field))
721 .collect();
722
723 if values.is_empty() {
724 AggregateResult::None
725 } else {
726 let avg = values.iter().sum::<f64>() / values.len() as f64;
727 AggregateResult::Number(avg)
728 }
729 }
730}
731
/// Aggregation that finds the smallest value of a numeric field.
#[derive(Clone)]
pub struct Min {
    /// Name of the field whose minimum is computed.
    pub field: String,
}

impl Min {
    /// Creates a minimum over the field named `field`.
    pub fn new(field: impl Into<String>) -> Self {
        let field = field.into();
        Self { field }
    }
}
745
746impl Aggregation for Min {
747 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
748 events
749 .iter()
750 .filter_map(|e| e.get_numeric(&self.field))
751 .min_by(|a, b| a.partial_cmp(b).unwrap())
752 .map(AggregateResult::Number)
753 .unwrap_or(AggregateResult::None)
754 }
755}
756
/// Aggregation that finds the largest value of a numeric field.
#[derive(Clone)]
pub struct Max {
    /// Name of the field whose maximum is computed.
    pub field: String,
}

impl Max {
    /// Creates a maximum over the field named `field`.
    pub fn new(field: impl Into<String>) -> Self {
        let field = field.into();
        Self { field }
    }
}
770
771impl Aggregation for Max {
772 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
773 events
774 .iter()
775 .filter_map(|e| e.get_numeric(&self.field))
776 .max_by(|a, b| a.partial_cmp(b).unwrap())
777 .map(AggregateResult::Number)
778 .unwrap_or(AggregateResult::None)
779 }
780}
781
782pub struct CustomAggregator<F>
784where
785 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
786{
787 func: Arc<F>,
788}
789
790impl<F> CustomAggregator<F>
791where
792 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
793{
794 pub fn new(func: F) -> Self {
795 Self {
796 func: Arc::new(func),
797 }
798 }
799}
800
801impl<F> Clone for CustomAggregator<F>
802where
803 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
804{
805 fn clone(&self) -> Self {
806 Self {
807 func: Arc::clone(&self.func),
808 }
809 }
810}
811
812impl<F> Aggregation for CustomAggregator<F>
813where
814 F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
815{
816 fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
817 (self.func)(events)
818 }
819}
820
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    /// Builds `count` events whose `value` field equals the event index and
    /// whose `user_id` field cycles through `user_0`, `user_1`, `user_2`.
    fn create_test_events(count: usize) -> Vec<StreamEvent> {
        let mut events = Vec::with_capacity(count);
        for index in 0..count {
            let mut data = HashMap::new();
            data.insert("value".to_string(), Value::Number(index as f64));
            data.insert(
                "user_id".to_string(),
                Value::String(format!("user_{}", index % 3)),
            );
            events.push(StreamEvent::new("TestEvent", data, "test"));
        }
        events
    }

    #[test]
    fn test_filter_operator() {
        let stream = DataStream::from_events(create_test_events(10));

        let above_five = stream.filter(|e| e.get_numeric("value").unwrap_or(0.0) > 5.0);

        // Values 6, 7, 8 and 9 pass the predicate.
        assert_eq!(above_five.len(), 4);
    }

    #[test]
    fn test_map_operator() {
        let stream = DataStream::from_events(create_test_events(5));

        let with_doubled = stream
            .map(|mut e| {
                if let Some(value) = e.get_numeric("value") {
                    e.data
                        .insert("doubled".to_string(), Value::Number(value * 2.0));
                }
                e
            })
            .collect();

        assert_eq!(with_doubled[0].get_numeric("doubled"), Some(0.0));
        assert_eq!(with_doubled[1].get_numeric("doubled"), Some(2.0));
    }

    #[test]
    fn test_key_by_operator() {
        let stream = DataStream::from_events(create_test_events(9));

        let counts = stream
            .key_by(|e| e.get_string("user_id").unwrap_or("").to_string())
            .count();

        // Nine events spread evenly over three users.
        assert_eq!(counts.len(), 3);
        for user in &["user_0", "user_1", "user_2"] {
            assert_eq!(counts.get(*user), Some(&3));
        }
    }

    #[test]
    fn test_reduce_operator() {
        let stream = DataStream::from_events(create_test_events(5));

        let summed = stream.reduce(|mut acc, e| {
            let running = acc.get_numeric("value").unwrap_or(0.0);
            let addend = e.get_numeric("value").unwrap_or(0.0);
            acc.data
                .insert("value".to_string(), Value::Number(running + addend));
            acc
        });

        assert!(summed.is_some());
        // 0 + 1 + 2 + 3 + 4
        assert_eq!(summed.unwrap().get_numeric("value"), Some(10.0));
    }

    #[test]
    fn test_count_aggregator() {
        let events = create_test_events(10);

        let outcome = Count.aggregate(&events);

        assert_eq!(outcome.as_number(), Some(10.0));
    }

    #[test]
    fn test_sum_aggregator() {
        let events = create_test_events(5);

        let outcome = Sum::new("value").aggregate(&events);

        // 0 + 1 + 2 + 3 + 4
        assert_eq!(outcome.as_number(), Some(10.0));
    }

    #[test]
    fn test_average_aggregator() {
        let events = create_test_events(5);

        let outcome = Average::new("value").aggregate(&events);

        // (0 + 1 + 2 + 3 + 4) / 5
        assert_eq!(outcome.as_number(), Some(2.0));
    }

    #[test]
    fn test_group_by() {
        let stream = DataStream::from_events(create_test_events(9));

        let counts = stream
            .group_by(|e| e.get_string("user_id").unwrap_or("").to_string())
            .count();

        assert_eq!(counts.len(), 3);
    }

    #[test]
    fn test_chaining_operators() {
        let stream = DataStream::from_events(create_test_events(20));

        let first_five = stream
            .filter(|e| e.get_numeric("value").unwrap_or(0.0) >= 5.0)
            .map(|mut e| {
                if let Some(v) = e.get_numeric("value") {
                    e.data.insert("doubled".to_string(), Value::Number(v * 2.0));
                }
                e
            })
            .take(5)
            .collect();

        assert_eq!(first_five.len(), 5);
        // The first surviving event has value 5, so its double is 10.
        assert_eq!(first_five[0].get_numeric("doubled"), Some(10.0));
    }

    #[test]
    fn test_windowed_stream() {
        let stream = DataStream::from_events(create_test_events(10));

        let windowed = stream.window(WindowConfig::tumbling(Duration::from_secs(60)));

        assert!(!windowed.windows().is_empty());
    }
}