rust_rule_engine/rete/
stream_alpha_node.rs

1//! StreamAlphaNode: RETE node for filtering stream events
2//!
3//! This module implements a specialized alpha node for stream sources in the RETE network.
4//! It connects stream sources to the rule engine, managing windows and filtering events.
5
6#![allow(missing_docs)]
7
8use crate::streaming::event::StreamEvent;
9use crate::streaming::window::WindowType;
10use std::collections::VecDeque;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13/// StreamAlphaNode filters events from a named stream
14///
15/// This node:
16/// - Filters events by stream name
17/// - Optionally filters by event type
18/// - Manages time-based windows (sliding, tumbling)
19/// - Evicts expired events automatically
20#[derive(Debug, Clone)]
21pub struct StreamAlphaNode {
22    /// Name of the stream to filter
23    pub stream_name: String,
24
25    /// Optional event type filter
26    pub event_type: Option<String>,
27
28    /// Window specification (if any)
29    pub window: Option<WindowSpec>,
30
31    /// Events currently in the window
32    events: VecDeque<StreamEvent>,
33
34    /// Maximum events to retain in memory
35    max_events: usize,
36
37    /// Last window start time (for tumbling windows)
38    last_window_start: u64,
39
40    /// Last event timestamp in session (for session windows)
41    last_session_event_timestamp: Option<u64>,
42}
43
44/// Window specification for StreamAlphaNode
45#[derive(Debug, Clone, PartialEq)]
46pub struct WindowSpec {
47    pub duration: Duration,
48    pub window_type: WindowType,
49}
50
51impl StreamAlphaNode {
52    /// Create a new StreamAlphaNode
53    ///
54    /// # Arguments
55    /// * `stream_name` - Name of the stream to filter
56    /// * `event_type` - Optional event type filter
57    /// * `window` - Optional window specification
58    ///
59    /// # Example
60    /// ```rust
61    /// use rust_rule_engine::rete::stream_alpha_node::{StreamAlphaNode, WindowSpec};
62    /// use rust_rule_engine::streaming::window::WindowType;
63    /// use std::time::Duration;
64    ///
65    /// // Node without window
66    /// let node = StreamAlphaNode::new("user-events", None, None);
67    ///
68    /// // Node with sliding window
69    /// let window = WindowSpec {
70    ///     duration: Duration::from_secs(300),
71    ///     window_type: WindowType::Sliding,
72    /// };
73    /// let node = StreamAlphaNode::new("logins", Some("LoginEvent".to_string()), Some(window));
74    /// ```
75    pub fn new(
76        stream_name: impl Into<String>,
77        event_type: Option<String>,
78        window: Option<WindowSpec>,
79    ) -> Self {
80        Self {
81            stream_name: stream_name.into(),
82            event_type,
83            window,
84            events: VecDeque::new(),
85            max_events: 10_000, // Default: keep 10k events max
86            last_window_start: 0,
87            last_session_event_timestamp: None,
88        }
89    }
90
91    /// Create with custom max events
92    pub fn with_max_events(mut self, max_events: usize) -> Self {
93        self.max_events = max_events;
94        self
95    }
96
97    /// Process incoming event and return whether it matches
98    ///
99    /// # Arguments
100    /// * `event` - The stream event to process
101    ///
102    /// # Returns
103    /// * `true` if event matches stream name, event type, and window criteria
104    /// * `false` otherwise
105    pub fn process_event(&mut self, event: &StreamEvent) -> bool {
106        // Check stream name matches
107        if event.metadata.source != self.stream_name {
108            return false;
109        }
110
111        // Check event type matches (if specified)
112        if let Some(ref expected_type) = self.event_type {
113            if &event.event_type != expected_type {
114                return false;
115            }
116        }
117
118        // If no window, event matches
119        let matches = if self.window.is_none() {
120            true
121        } else {
122            // With window, check if event is within window
123            self.is_in_window(event.metadata.timestamp)
124        };
125
126        if matches {
127            // For session windows, check if we need to start a new session BEFORE adding
128            if let Some(WindowSpec {
129                window_type: WindowType::Session { timeout },
130                ..
131            }) = &self.window
132            {
133                if let Some(last_time) = self.last_session_event_timestamp {
134                    let gap = event.metadata.timestamp.saturating_sub(last_time);
135                    let timeout_ms = timeout.as_millis() as u64;
136
137                    if gap > timeout_ms {
138                        // Session expired - clear old events before adding new one
139                        self.events.clear();
140                        self.last_session_event_timestamp = None;
141                    }
142                }
143            }
144
145            // Add to buffer and evict old events
146            self.add_event(event.clone());
147            self.evict_expired_events();
148        }
149
150        matches
151    }
152
153    /// Add event to internal buffer
154    fn add_event(&mut self, event: StreamEvent) {
155        let event_timestamp = event.metadata.timestamp;
156        self.events.push_back(event);
157
158        // Update session tracking
159        if let Some(WindowSpec {
160            window_type: WindowType::Session { .. },
161            ..
162        }) = &self.window
163        {
164            self.last_session_event_timestamp = Some(event_timestamp);
165        }
166
167        // Keep buffer size under limit
168        while self.events.len() > self.max_events {
169            self.events.pop_front();
170        }
171    }
172
173    /// Check if timestamp falls within current window
174    fn is_in_window(&self, timestamp: u64) -> bool {
175        match &self.window {
176            None => true,
177            Some(spec) => {
178                let current_time = Self::current_time_ms();
179                let window_duration_ms = spec.duration.as_millis() as u64;
180
181                match spec.window_type {
182                    WindowType::Sliding => {
183                        // Event must be within duration from now
184                        timestamp >= current_time.saturating_sub(window_duration_ms)
185                            && timestamp <= current_time
186                    }
187                    WindowType::Tumbling => {
188                        // Calculate window boundaries
189                        let window_start = (current_time / window_duration_ms) * window_duration_ms;
190                        let window_end = window_start + window_duration_ms;
191
192                        timestamp >= window_start && timestamp < window_end
193                    }
194                    WindowType::Session { timeout } => {
195                        let timeout_ms = timeout.as_millis() as u64;
196
197                        match self.last_session_event_timestamp {
198                            None => {
199                                // First event in session - always accept
200                                true
201                            }
202                            Some(last_event_time) => {
203                                // Check if gap from last event exceeds timeout
204                                let gap = timestamp.saturating_sub(last_event_time);
205
206                                if gap > timeout_ms {
207                                    // Gap too large - this starts a new session
208                                    // But we still accept this event (it starts new session)
209                                    true
210                                } else {
211                                    // Within timeout - part of current session
212                                    true
213                                }
214                            }
215                        }
216                    }
217                }
218            }
219        }
220    }
221
222    /// Evict events that are outside the window
223    fn evict_expired_events(&mut self) {
224        if let Some(spec) = &self.window {
225            let current_time = Self::current_time_ms();
226            let window_duration_ms = spec.duration.as_millis() as u64;
227
228            match spec.window_type {
229                WindowType::Sliding => {
230                    let cutoff_time = current_time.saturating_sub(window_duration_ms);
231
232                    // Remove events older than cutoff
233                    while let Some(event) = self.events.front() {
234                        if event.metadata.timestamp < cutoff_time {
235                            self.events.pop_front();
236                        } else {
237                            break;
238                        }
239                    }
240                }
241                WindowType::Tumbling => {
242                    let window_start = (current_time / window_duration_ms) * window_duration_ms;
243
244                    // If we've moved to a new window, clear old events
245                    if self.last_window_start != 0 && window_start != self.last_window_start {
246                        self.events.clear();
247                        self.last_window_start = window_start;
248                    } else if self.last_window_start == 0 {
249                        self.last_window_start = window_start;
250                    }
251
252                    // Remove events from previous windows
253                    while let Some(event) = self.events.front() {
254                        if event.metadata.timestamp < window_start {
255                            self.events.pop_front();
256                        } else {
257                            break;
258                        }
259                    }
260                }
261                WindowType::Session { timeout } => {
262                    let timeout_ms = timeout.as_millis() as u64;
263
264                    // Check if current session has expired
265                    if let Some(last_event_time) = self.last_session_event_timestamp {
266                        let gap_since_last = current_time.saturating_sub(last_event_time);
267
268                        if gap_since_last > timeout_ms {
269                            // Session expired - clear all events and reset
270                            self.events.clear();
271                            self.last_session_event_timestamp = None;
272                        }
273                    }
274
275                    // Note: Unlike sliding/tumbling, session windows don't evict individual events
276                    // They either keep the entire session or clear it when timeout expires
277                }
278            }
279        }
280    }
281
282    /// Get all events currently in the window
283    ///
284    /// # Returns
285    /// A slice of events that are within the current window
286    pub fn get_events(&self) -> &VecDeque<StreamEvent> {
287        &self.events
288    }
289
290    /// Get count of events in window
291    pub fn event_count(&self) -> usize {
292        self.events.len()
293    }
294
295    /// Get current time in milliseconds since epoch
296    fn current_time_ms() -> u64 {
297        SystemTime::now()
298            .duration_since(UNIX_EPOCH)
299            .unwrap()
300            .as_millis() as u64
301    }
302
303    /// Clear all events from buffer
304    pub fn clear(&mut self) {
305        self.events.clear();
306        self.last_window_start = 0;
307        self.last_session_event_timestamp = None;
308    }
309
310    /// Get window statistics
311    pub fn window_stats(&self) -> WindowStats {
312        WindowStats {
313            event_count: self.events.len(),
314            oldest_event_timestamp: self.events.front().map(|e| e.metadata.timestamp),
315            newest_event_timestamp: self.events.back().map(|e| e.metadata.timestamp),
316            window_duration_ms: self.window.as_ref().map(|w| w.duration.as_millis() as u64),
317        }
318    }
319}
320
321/// Window statistics for monitoring
322#[derive(Debug, Clone)]
323pub struct WindowStats {
324    pub event_count: usize,
325    pub oldest_event_timestamp: Option<u64>,
326    pub newest_event_timestamp: Option<u64>,
327    pub window_duration_ms: Option<u64>,
328}
329
330#[cfg(test)]
331mod tests {
332    use super::*;
333    use crate::streaming::event::StreamEvent;
334    use crate::types::Value;
335    use std::collections::HashMap;
336
337    fn create_test_event(stream_name: &str, event_type: &str, timestamp: u64) -> StreamEvent {
338        let mut data = HashMap::new();
339        data.insert(
340            "test_field".to_string(),
341            Value::String("test_value".to_string()),
342        );
343
344        StreamEvent::with_timestamp(event_type, data, stream_name, timestamp)
345    }
346
347    #[test]
348    fn test_stream_alpha_node_basic() {
349        let mut node = StreamAlphaNode::new("user-events", None, None);
350        let event = create_test_event("user-events", "LoginEvent", 1000);
351
352        assert!(node.process_event(&event));
353        assert_eq!(node.event_count(), 1);
354    }
355
356    #[test]
357    fn test_stream_name_filtering() {
358        let mut node = StreamAlphaNode::new("user-events", None, None);
359
360        let matching_event = create_test_event("user-events", "LoginEvent", 1000);
361        let non_matching_event = create_test_event("other-stream", "LoginEvent", 1000);
362
363        assert!(node.process_event(&matching_event));
364        assert!(!node.process_event(&non_matching_event));
365        assert_eq!(node.event_count(), 1);
366    }
367
368    #[test]
369    fn test_event_type_filtering() {
370        let mut node = StreamAlphaNode::new("user-events", Some("LoginEvent".to_string()), None);
371
372        let matching_event = create_test_event("user-events", "LoginEvent", 1000);
373        let non_matching_event = create_test_event("user-events", "LogoutEvent", 1000);
374
375        assert!(node.process_event(&matching_event));
376        assert!(!node.process_event(&non_matching_event));
377        assert_eq!(node.event_count(), 1);
378    }
379
380    #[test]
381    fn test_sliding_window() {
382        let window = WindowSpec {
383            duration: Duration::from_secs(5),
384            window_type: WindowType::Sliding,
385        };
386
387        let mut node = StreamAlphaNode::new("sensors", None, Some(window));
388
389        let current_time = StreamAlphaNode::current_time_ms();
390
391        // Event within window
392        let recent_event = create_test_event("sensors", "TempReading", current_time - 2000);
393        assert!(node.process_event(&recent_event));
394
395        // Event outside window (6 seconds ago)
396        let old_event = create_test_event("sensors", "TempReading", current_time - 6000);
397        assert!(!node.process_event(&old_event));
398
399        assert_eq!(node.event_count(), 1);
400    }
401
402    #[test]
403    fn test_tumbling_window() {
404        let window = WindowSpec {
405            duration: Duration::from_secs(10),
406            window_type: WindowType::Tumbling,
407        };
408
409        let mut node = StreamAlphaNode::new("sensors", None, Some(window));
410
411        let current_time = StreamAlphaNode::current_time_ms();
412        let window_start = (current_time / 10_000) * 10_000;
413
414        // Event in current window
415        let event1 = create_test_event("sensors", "TempReading", window_start + 1000);
416        assert!(node.process_event(&event1));
417
418        // Event in current window
419        let event2 = create_test_event("sensors", "TempReading", window_start + 5000);
420        assert!(node.process_event(&event2));
421
422        // Event from previous window
423        let old_event = create_test_event("sensors", "TempReading", window_start - 5000);
424        assert!(!node.process_event(&old_event));
425
426        assert_eq!(node.event_count(), 2);
427    }
428
429    #[test]
430    fn test_eviction() {
431        let window = WindowSpec {
432            duration: Duration::from_millis(100),
433            window_type: WindowType::Sliding,
434        };
435
436        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
437
438        let current_time = StreamAlphaNode::current_time_ms();
439
440        // Add event within window
441        let event1 = create_test_event("test-stream", "TestEvent", current_time - 50);
442        node.process_event(&event1);
443
444        assert_eq!(node.event_count(), 1);
445
446        // Wait to ensure event becomes old
447        std::thread::sleep(Duration::from_millis(150));
448
449        // Process new event, which should trigger eviction
450        let event2 = create_test_event(
451            "test-stream",
452            "TestEvent",
453            StreamAlphaNode::current_time_ms(),
454        );
455        node.process_event(&event2);
456
457        // Old event should be evicted
458        assert_eq!(node.event_count(), 1);
459    }
460
461    #[test]
462    fn test_max_events_limit() {
463        let mut node = StreamAlphaNode::new("test-stream", None, None).with_max_events(5);
464
465        let current_time = StreamAlphaNode::current_time_ms();
466
467        // Add 10 events
468        for i in 0..10 {
469            let event = create_test_event("test-stream", "TestEvent", current_time + i);
470            node.process_event(&event);
471        }
472
473        // Should only keep 5 events
474        assert_eq!(node.event_count(), 5);
475    }
476
477    #[test]
478    fn test_clear() {
479        let mut node = StreamAlphaNode::new("test-stream", None, None);
480
481        let event = create_test_event("test-stream", "TestEvent", 1000);
482        node.process_event(&event);
483
484        assert_eq!(node.event_count(), 1);
485
486        node.clear();
487        assert_eq!(node.event_count(), 0);
488    }
489
490    #[test]
491    fn test_window_stats() {
492        let window = WindowSpec {
493            duration: Duration::from_secs(60),
494            window_type: WindowType::Sliding,
495        };
496
497        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
498
499        let current_time = StreamAlphaNode::current_time_ms();
500        let event1 = create_test_event("test-stream", "TestEvent", current_time - 10_000);
501        let event2 = create_test_event("test-stream", "TestEvent", current_time - 5_000);
502
503        node.process_event(&event1);
504        node.process_event(&event2);
505
506        let stats = node.window_stats();
507        assert_eq!(stats.event_count, 2);
508        assert_eq!(stats.oldest_event_timestamp, Some(current_time - 10_000));
509        assert_eq!(stats.newest_event_timestamp, Some(current_time - 5_000));
510        assert_eq!(stats.window_duration_ms, Some(60_000));
511    }
512
513    #[test]
514    fn test_get_events() {
515        let mut node = StreamAlphaNode::new("test-stream", None, None);
516
517        let event1 = create_test_event("test-stream", "Event1", 1000);
518        let event2 = create_test_event("test-stream", "Event2", 2000);
519
520        node.process_event(&event1);
521        node.process_event(&event2);
522
523        let events = node.get_events();
524        assert_eq!(events.len(), 2);
525        assert_eq!(events[0].event_type, "Event1");
526        assert_eq!(events[1].event_type, "Event2");
527    }
528
529    #[test]
530    fn test_session_window_basic() {
531        let window = WindowSpec {
532            duration: Duration::from_secs(60), // Not used for session windows (timeout is in WindowType)
533            window_type: WindowType::Session {
534                timeout: Duration::from_secs(5),
535            },
536        };
537
538        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
539
540        let current_time = StreamAlphaNode::current_time_ms();
541
542        // First event - starts session
543        let event1 = create_test_event("test-stream", "Event1", current_time);
544        assert!(node.process_event(&event1));
545        assert_eq!(node.event_count(), 1);
546
547        // Second event within timeout (2 seconds later)
548        let event2 = create_test_event("test-stream", "Event2", current_time + 2000);
549        assert!(node.process_event(&event2));
550        assert_eq!(node.event_count(), 2);
551
552        // Third event within timeout (1 second after event2)
553        let event3 = create_test_event("test-stream", "Event3", current_time + 3000);
554        assert!(node.process_event(&event3));
555        assert_eq!(node.event_count(), 3);
556    }
557
558    #[test]
559    fn test_session_window_timeout_new_session() {
560        let window = WindowSpec {
561            duration: Duration::from_secs(60),
562            window_type: WindowType::Session {
563                timeout: Duration::from_millis(100),
564            },
565        };
566
567        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
568
569        let current_time = StreamAlphaNode::current_time_ms();
570
571        // First event - Session 1
572        let event1 = create_test_event("test-stream", "Event1", current_time);
573        node.process_event(&event1);
574        assert_eq!(node.event_count(), 1);
575
576        // Wait 150ms - exceed timeout
577        std::thread::sleep(Duration::from_millis(150));
578
579        // This should trigger eviction of old session
580        let event2 = create_test_event("test-stream", "Event2", StreamAlphaNode::current_time_ms());
581        node.process_event(&event2);
582
583        // Old session should be cleared, only new event remains
584        assert_eq!(node.event_count(), 1);
585        assert_eq!(node.get_events()[0].event_type, "Event2");
586    }
587
588    #[test]
589    fn test_session_window_gap_detection() {
590        let window = WindowSpec {
591            duration: Duration::from_secs(60),
592            window_type: WindowType::Session {
593                timeout: Duration::from_secs(2),
594            },
595        };
596
597        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
598
599        let base_time = StreamAlphaNode::current_time_ms();
600
601        // Session 1: Events at t=0, t=1
602        let event1 = create_test_event("test-stream", "S1_Event1", base_time);
603        let event2 = create_test_event("test-stream", "S1_Event2", base_time + 1000);
604
605        node.process_event(&event1);
606        node.process_event(&event2);
607        assert_eq!(node.event_count(), 2);
608
609        // Gap of 3 seconds (exceeds 2-second timeout)
610        // Session 2: Event at t=5
611        let event3 = create_test_event("test-stream", "S2_Event1", base_time + 5000);
612        node.process_event(&event3);
613
614        // New event is accepted (starts new session)
615        assert!(node
616            .get_events()
617            .iter()
618            .any(|e| e.event_type == "S2_Event1"));
619    }
620
621    #[test]
622    fn test_session_window_clear_resets_state() {
623        let window = WindowSpec {
624            duration: Duration::from_secs(60),
625            window_type: WindowType::Session {
626                timeout: Duration::from_secs(5),
627            },
628        };
629
630        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
631
632        let current_time = StreamAlphaNode::current_time_ms();
633        let event = create_test_event("test-stream", "Event1", current_time);
634
635        node.process_event(&event);
636        assert_eq!(node.event_count(), 1);
637        assert!(node.last_session_event_timestamp.is_some());
638
639        node.clear();
640        assert_eq!(node.event_count(), 0);
641        assert!(node.last_session_event_timestamp.is_none());
642    }
643
644    #[test]
645    fn test_session_window_continuous_activity() {
646        let window = WindowSpec {
647            duration: Duration::from_secs(60),
648            window_type: WindowType::Session {
649                timeout: Duration::from_secs(1),
650            },
651        };
652
653        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
654
655        let base_time = StreamAlphaNode::current_time_ms();
656
657        // Add events every 500ms (within 1-second timeout)
658        for i in 0..5 {
659            let event =
660                create_test_event("test-stream", &format!("Event{}", i), base_time + (i * 500));
661            node.process_event(&event);
662        }
663
664        // All events should be in the same session
665        assert_eq!(node.event_count(), 5);
666    }
667
668    #[test]
669    fn test_session_window_multiple_sessions() {
670        let window = WindowSpec {
671            duration: Duration::from_secs(60),
672            window_type: WindowType::Session {
673                timeout: Duration::from_millis(500),
674            },
675        };
676
677        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
678
679        let base_time = StreamAlphaNode::current_time_ms();
680
681        // Session 1: Events at t=0, t=200
682        node.process_event(&create_test_event("test-stream", "S1_E1", base_time));
683        node.process_event(&create_test_event("test-stream", "S1_E2", base_time + 200));
684
685        // Gap of 600ms - new session
686        // Session 2: Event at t=1000
687        node.process_event(&create_test_event("test-stream", "S2_E1", base_time + 1000));
688
689        // Gap of 700ms - new session
690        // Session 3: Event at t=2000
691        node.process_event(&create_test_event("test-stream", "S3_E1", base_time + 2000));
692
693        // Node should contain events from the latest session only
694        // (previous sessions should be evicted when gap exceeded)
695        assert!(node.event_count() > 0);
696    }
697}