1#![allow(missing_docs)]
7
8use crate::streaming::event::StreamEvent;
9use crate::streaming::window::WindowType;
10use std::collections::VecDeque;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13#[derive(Debug, Clone)]
21pub struct StreamAlphaNode {
22 pub stream_name: String,
24
25 pub event_type: Option<String>,
27
28 pub window: Option<WindowSpec>,
30
31 events: VecDeque<StreamEvent>,
33
34 max_events: usize,
36
37 last_window_start: u64,
39
40 last_session_event_timestamp: Option<u64>,
42}
43
44#[derive(Debug, Clone, PartialEq)]
46pub struct WindowSpec {
47 pub duration: Duration,
48 pub window_type: WindowType,
49}
50
51impl StreamAlphaNode {
52 pub fn new(
76 stream_name: impl Into<String>,
77 event_type: Option<String>,
78 window: Option<WindowSpec>,
79 ) -> Self {
80 Self {
81 stream_name: stream_name.into(),
82 event_type,
83 window,
84 events: VecDeque::new(),
85 max_events: 10_000, last_window_start: 0,
87 last_session_event_timestamp: None,
88 }
89 }
90
91 pub fn with_max_events(mut self, max_events: usize) -> Self {
93 self.max_events = max_events;
94 self
95 }
96
97 pub fn process_event(&mut self, event: &StreamEvent) -> bool {
106 if event.metadata.source != self.stream_name {
108 return false;
109 }
110
111 if let Some(ref expected_type) = self.event_type {
113 if &event.event_type != expected_type {
114 return false;
115 }
116 }
117
118 let matches = if self.window.is_none() {
120 true
121 } else {
122 self.is_in_window(event.metadata.timestamp)
124 };
125
126 if matches {
127 if let Some(WindowSpec {
129 window_type: WindowType::Session { timeout },
130 ..
131 }) = &self.window
132 {
133 if let Some(last_time) = self.last_session_event_timestamp {
134 let gap = event.metadata.timestamp.saturating_sub(last_time);
135 let timeout_ms = timeout.as_millis() as u64;
136
137 if gap > timeout_ms {
138 self.events.clear();
140 self.last_session_event_timestamp = None;
141 }
142 }
143 }
144
145 self.add_event(event.clone());
147 self.evict_expired_events();
148 }
149
150 matches
151 }
152
153 fn add_event(&mut self, event: StreamEvent) {
155 let event_timestamp = event.metadata.timestamp;
156 self.events.push_back(event);
157
158 if let Some(WindowSpec {
160 window_type: WindowType::Session { .. },
161 ..
162 }) = &self.window
163 {
164 self.last_session_event_timestamp = Some(event_timestamp);
165 }
166
167 while self.events.len() > self.max_events {
169 self.events.pop_front();
170 }
171 }
172
173 fn is_in_window(&self, timestamp: u64) -> bool {
175 match &self.window {
176 None => true,
177 Some(spec) => {
178 let current_time = Self::current_time_ms();
179 let window_duration_ms = spec.duration.as_millis() as u64;
180
181 match spec.window_type {
182 WindowType::Sliding => {
183 timestamp >= current_time.saturating_sub(window_duration_ms)
185 && timestamp <= current_time
186 }
187 WindowType::Tumbling => {
188 let window_start = (current_time / window_duration_ms) * window_duration_ms;
190 let window_end = window_start + window_duration_ms;
191
192 timestamp >= window_start && timestamp < window_end
193 }
194 WindowType::Session { timeout } => {
195 let timeout_ms = timeout.as_millis() as u64;
196
197 match self.last_session_event_timestamp {
198 None => {
199 true
201 }
202 Some(last_event_time) => {
203 let gap = timestamp.saturating_sub(last_event_time);
205
206 if gap > timeout_ms {
207 true
210 } else {
211 true
213 }
214 }
215 }
216 }
217 }
218 }
219 }
220 }
221
222 fn evict_expired_events(&mut self) {
224 if let Some(spec) = &self.window {
225 let current_time = Self::current_time_ms();
226 let window_duration_ms = spec.duration.as_millis() as u64;
227
228 match spec.window_type {
229 WindowType::Sliding => {
230 let cutoff_time = current_time.saturating_sub(window_duration_ms);
231
232 while let Some(event) = self.events.front() {
234 if event.metadata.timestamp < cutoff_time {
235 self.events.pop_front();
236 } else {
237 break;
238 }
239 }
240 }
241 WindowType::Tumbling => {
242 let window_start = (current_time / window_duration_ms) * window_duration_ms;
243
244 if self.last_window_start != 0 && window_start != self.last_window_start {
246 self.events.clear();
247 self.last_window_start = window_start;
248 } else if self.last_window_start == 0 {
249 self.last_window_start = window_start;
250 }
251
252 while let Some(event) = self.events.front() {
254 if event.metadata.timestamp < window_start {
255 self.events.pop_front();
256 } else {
257 break;
258 }
259 }
260 }
261 WindowType::Session { timeout } => {
262 let timeout_ms = timeout.as_millis() as u64;
263
264 if let Some(last_event_time) = self.last_session_event_timestamp {
266 let gap_since_last = current_time.saturating_sub(last_event_time);
267
268 if gap_since_last > timeout_ms {
269 self.events.clear();
271 self.last_session_event_timestamp = None;
272 }
273 }
274
275 }
278 }
279 }
280 }
281
282 pub fn get_events(&self) -> &VecDeque<StreamEvent> {
287 &self.events
288 }
289
290 pub fn event_count(&self) -> usize {
292 self.events.len()
293 }
294
295 fn current_time_ms() -> u64 {
297 SystemTime::now()
298 .duration_since(UNIX_EPOCH)
299 .unwrap()
300 .as_millis() as u64
301 }
302
303 pub fn clear(&mut self) {
305 self.events.clear();
306 self.last_window_start = 0;
307 self.last_session_event_timestamp = None;
308 }
309
310 pub fn window_stats(&self) -> WindowStats {
312 WindowStats {
313 event_count: self.events.len(),
314 oldest_event_timestamp: self.events.front().map(|e| e.metadata.timestamp),
315 newest_event_timestamp: self.events.back().map(|e| e.metadata.timestamp),
316 window_duration_ms: self.window.as_ref().map(|w| w.duration.as_millis() as u64),
317 }
318 }
319}
320
/// Point-in-time summary of a [`StreamAlphaNode`]'s event buffer.
#[derive(Clone, Debug)]
pub struct WindowStats {
    /// Number of events currently buffered.
    pub event_count: usize,
    /// Timestamp of the event at the front of the buffer, if any.
    pub oldest_event_timestamp: Option<u64>,
    /// Timestamp of the event at the back of the buffer, if any.
    pub newest_event_timestamp: Option<u64>,
    /// Configured window duration in milliseconds; `None` when the node has
    /// no window.
    pub window_duration_ms: Option<u64>,
}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::streaming::event::StreamEvent;
    use crate::types::Value;
    use std::collections::HashMap;

    /// Builds an event of `event_type` on `stream_name` stamped `timestamp`
    /// (ms since the Unix epoch) carrying a single dummy field.
    fn create_test_event(stream_name: &str, event_type: &str, timestamp: u64) -> StreamEvent {
        let mut data = HashMap::new();
        data.insert(
            "test_field".to_string(),
            Value::String("test_value".to_string()),
        );

        StreamEvent::with_timestamp(event_type, data, stream_name, timestamp)
    }

    /// Builds a session-window spec with the given inactivity `timeout`.
    fn session_window(timeout: Duration) -> WindowSpec {
        WindowSpec {
            duration: Duration::from_secs(60),
            window_type: WindowType::Session { timeout },
        }
    }

    #[test]
    fn test_stream_alpha_node_basic() {
        let mut node = StreamAlphaNode::new("user-events", None, None);
        let event = create_test_event("user-events", "LoginEvent", 1000);

        assert!(node.process_event(&event));
        assert_eq!(node.event_count(), 1);
    }

    #[test]
    fn test_stream_name_filtering() {
        let mut node = StreamAlphaNode::new("user-events", None, None);

        let matching_event = create_test_event("user-events", "LoginEvent", 1000);
        let non_matching_event = create_test_event("other-stream", "LoginEvent", 1000);

        assert!(node.process_event(&matching_event));
        assert!(!node.process_event(&non_matching_event));
        assert_eq!(node.event_count(), 1);
    }

    #[test]
    fn test_event_type_filtering() {
        let mut node = StreamAlphaNode::new("user-events", Some("LoginEvent".to_string()), None);

        let matching_event = create_test_event("user-events", "LoginEvent", 1000);
        let non_matching_event = create_test_event("user-events", "LogoutEvent", 1000);

        assert!(node.process_event(&matching_event));
        assert!(!node.process_event(&non_matching_event));
        assert_eq!(node.event_count(), 1);
    }

    #[test]
    fn test_sliding_window() {
        let window = WindowSpec {
            duration: Duration::from_secs(5),
            window_type: WindowType::Sliding,
        };

        let mut node = StreamAlphaNode::new("sensors", None, Some(window));

        let current_time = StreamAlphaNode::current_time_ms();

        // Two seconds old: inside the five-second window.
        let recent_event = create_test_event("sensors", "TempReading", current_time - 2000);
        assert!(node.process_event(&recent_event));

        // Six seconds old: already outside the window.
        let old_event = create_test_event("sensors", "TempReading", current_time - 6000);
        assert!(!node.process_event(&old_event));

        assert_eq!(node.event_count(), 1);
    }

    #[test]
    fn test_tumbling_window() {
        let window = WindowSpec {
            duration: Duration::from_secs(10),
            window_type: WindowType::Tumbling,
        };

        let mut node = StreamAlphaNode::new("sensors", None, Some(window));

        let current_time = StreamAlphaNode::current_time_ms();
        let window_start = (current_time / 10_000) * 10_000;

        // Both events fall inside the current ten-second window.
        let event1 = create_test_event("sensors", "TempReading", window_start + 1000);
        assert!(node.process_event(&event1));

        let event2 = create_test_event("sensors", "TempReading", window_start + 5000);
        assert!(node.process_event(&event2));

        // Belongs to the previous window and must be rejected.
        let old_event = create_test_event("sensors", "TempReading", window_start - 5000);
        assert!(!node.process_event(&old_event));

        assert_eq!(node.event_count(), 2);
    }

    #[test]
    fn test_eviction() {
        let window = WindowSpec {
            duration: Duration::from_millis(100),
            window_type: WindowType::Sliding,
        };

        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));

        let current_time = StreamAlphaNode::current_time_ms();

        let event1 = create_test_event("test-stream", "TestEvent", current_time - 50);
        node.process_event(&event1);

        assert_eq!(node.event_count(), 1);

        // Let the first event age out of the 100 ms window.
        std::thread::sleep(Duration::from_millis(150));

        let fresh_timestamp = StreamAlphaNode::current_time_ms();
        let event2 = create_test_event("test-stream", "TestEvent", fresh_timestamp);
        node.process_event(&event2);

        // Only the fresh event may remain.
        assert_eq!(node.event_count(), 1);
        assert_eq!(node.get_events()[0].metadata.timestamp, fresh_timestamp);
    }

    #[test]
    fn test_max_events_limit() {
        let mut node = StreamAlphaNode::new("test-stream", None, None).with_max_events(5);

        let current_time = StreamAlphaNode::current_time_ms();

        for i in 0..10 {
            let event = create_test_event("test-stream", "TestEvent", current_time + i);
            node.process_event(&event);
        }

        // The five newest events (offsets 5..=9) are the ones kept.
        assert_eq!(node.event_count(), 5);
        let stats = node.window_stats();
        assert_eq!(stats.oldest_event_timestamp, Some(current_time + 5));
        assert_eq!(stats.newest_event_timestamp, Some(current_time + 9));
    }

    #[test]
    fn test_clear() {
        let mut node = StreamAlphaNode::new("test-stream", None, None);

        let event = create_test_event("test-stream", "TestEvent", 1000);
        node.process_event(&event);

        assert_eq!(node.event_count(), 1);

        node.clear();
        assert_eq!(node.event_count(), 0);
    }

    #[test]
    fn test_window_stats() {
        let window = WindowSpec {
            duration: Duration::from_secs(60),
            window_type: WindowType::Sliding,
        };

        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));

        let current_time = StreamAlphaNode::current_time_ms();
        let event1 = create_test_event("test-stream", "TestEvent", current_time - 10_000);
        let event2 = create_test_event("test-stream", "TestEvent", current_time - 5_000);

        node.process_event(&event1);
        node.process_event(&event2);

        let stats = node.window_stats();
        assert_eq!(stats.event_count, 2);
        assert_eq!(stats.oldest_event_timestamp, Some(current_time - 10_000));
        assert_eq!(stats.newest_event_timestamp, Some(current_time - 5_000));
        assert_eq!(stats.window_duration_ms, Some(60_000));
    }

    #[test]
    fn test_get_events() {
        let mut node = StreamAlphaNode::new("test-stream", None, None);

        let event1 = create_test_event("test-stream", "Event1", 1000);
        let event2 = create_test_event("test-stream", "Event2", 2000);

        node.process_event(&event1);
        node.process_event(&event2);

        let events = node.get_events();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].event_type, "Event1");
        assert_eq!(events[1].event_type, "Event2");
    }

    #[test]
    fn test_session_window_basic() {
        let window = session_window(Duration::from_secs(5));

        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));

        let current_time = StreamAlphaNode::current_time_ms();

        // All gaps are below the five-second timeout: one growing session.
        let event1 = create_test_event("test-stream", "Event1", current_time);
        assert!(node.process_event(&event1));
        assert_eq!(node.event_count(), 1);

        let event2 = create_test_event("test-stream", "Event2", current_time + 2000);
        assert!(node.process_event(&event2));
        assert_eq!(node.event_count(), 2);

        let event3 = create_test_event("test-stream", "Event3", current_time + 3000);
        assert!(node.process_event(&event3));
        assert_eq!(node.event_count(), 3);
    }

    #[test]
    fn test_session_window_timeout_new_session() {
        let window = session_window(Duration::from_millis(100));

        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));

        let current_time = StreamAlphaNode::current_time_ms();

        let event1 = create_test_event("test-stream", "Event1", current_time);
        node.process_event(&event1);
        assert_eq!(node.event_count(), 1);

        // Exceed the 100 ms session timeout.
        std::thread::sleep(Duration::from_millis(150));

        let event2 = create_test_event("test-stream", "Event2", StreamAlphaNode::current_time_ms());
        node.process_event(&event2);

        // The old session is gone; only the new event remains.
        assert_eq!(node.event_count(), 1);
        assert_eq!(node.get_events()[0].event_type, "Event2");
    }

    #[test]
    fn test_session_window_gap_detection() {
        let window = session_window(Duration::from_secs(2));

        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));

        let base_time = StreamAlphaNode::current_time_ms();

        let event1 = create_test_event("test-stream", "S1_Event1", base_time);
        let event2 = create_test_event("test-stream", "S1_Event2", base_time + 1000);

        node.process_event(&event1);
        node.process_event(&event2);
        assert_eq!(node.event_count(), 2);

        // Four-second gap in event time exceeds the two-second timeout.
        let event3 = create_test_event("test-stream", "S2_Event1", base_time + 5000);
        assert!(node.process_event(&event3));

        // The first session must have been discarded entirely.
        assert_eq!(node.event_count(), 1);
        assert_eq!(node.get_events()[0].event_type, "S2_Event1");
    }

    #[test]
    fn test_session_window_clear_resets_state() {
        let window = session_window(Duration::from_secs(5));

        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));

        let current_time = StreamAlphaNode::current_time_ms();
        let event = create_test_event("test-stream", "Event1", current_time);

        node.process_event(&event);
        assert_eq!(node.event_count(), 1);
        assert!(node.last_session_event_timestamp.is_some());

        node.clear();
        assert_eq!(node.event_count(), 0);
        assert!(node.last_session_event_timestamp.is_none());
    }

    #[test]
    fn test_session_window_continuous_activity() {
        let window = session_window(Duration::from_secs(1));

        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));

        let base_time = StreamAlphaNode::current_time_ms();

        // Events 500 ms apart never exceed the one-second timeout.
        for i in 0..5 {
            let event =
                create_test_event("test-stream", &format!("Event{}", i), base_time + (i * 500));
            node.process_event(&event);
        }

        assert_eq!(node.event_count(), 5);
    }

    #[test]
    fn test_session_window_multiple_sessions() {
        let window = session_window(Duration::from_millis(500));

        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));

        let base_time = StreamAlphaNode::current_time_ms();

        // Session 1: two events 200 ms apart.
        node.process_event(&create_test_event("test-stream", "S1_E1", base_time));
        node.process_event(&create_test_event("test-stream", "S1_E2", base_time + 200));
        assert_eq!(node.event_count(), 2);

        // Session 2: 800 ms gap exceeds the 500 ms timeout.
        node.process_event(&create_test_event("test-stream", "S2_E1", base_time + 1000));
        assert_eq!(node.event_count(), 1);
        assert_eq!(node.get_events()[0].event_type, "S2_E1");

        // Session 3: another one-second gap.
        node.process_event(&create_test_event("test-stream", "S3_E1", base_time + 2000));
        assert_eq!(node.event_count(), 1);
        assert_eq!(node.get_events()[0].event_type, "S3_E1");
    }
}