// rust_rule_engine/rete/stream_alpha_node.rs

//! StreamAlphaNode: RETE node for filtering stream events
//!
//! This module implements a specialized alpha node for stream sources in the RETE network.
//! It connects stream sources to the rule engine, managing windows and filtering events.

#![allow(missing_docs)]

use crate::streaming::event::StreamEvent;
use crate::streaming::window::WindowType;
use std::collections::VecDeque;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

13/// StreamAlphaNode filters events from a named stream
14///
15/// This node:
16/// - Filters events by stream name
17/// - Optionally filters by event type
18/// - Manages time-based windows (sliding, tumbling)
19/// - Evicts expired events automatically
20#[derive(Debug, Clone)]
21pub struct StreamAlphaNode {
22    /// Name of the stream to filter
23    pub stream_name: String,
24
25    /// Optional event type filter
26    pub event_type: Option<String>,
27
28    /// Window specification (if any)
29    pub window: Option<WindowSpec>,
30
31    /// Events currently in the window
32    events: VecDeque<StreamEvent>,
33
34    /// Maximum events to retain in memory
35    max_events: usize,
36
37    /// Last window start time (for tumbling windows)
38    last_window_start: u64,
39}
40
41/// Window specification for StreamAlphaNode
42#[derive(Debug, Clone, PartialEq)]
43pub struct WindowSpec {
44    pub duration: Duration,
45    pub window_type: WindowType,
46}
47
48impl StreamAlphaNode {
49    /// Create a new StreamAlphaNode
50    ///
51    /// # Arguments
52    /// * `stream_name` - Name of the stream to filter
53    /// * `event_type` - Optional event type filter
54    /// * `window` - Optional window specification
55    ///
56    /// # Example
57    /// ```rust
58    /// use rust_rule_engine::rete::stream_alpha_node::{StreamAlphaNode, WindowSpec};
59    /// use rust_rule_engine::streaming::window::WindowType;
60    /// use std::time::Duration;
61    ///
62    /// // Node without window
63    /// let node = StreamAlphaNode::new("user-events", None, None);
64    ///
65    /// // Node with sliding window
66    /// let window = WindowSpec {
67    ///     duration: Duration::from_secs(300),
68    ///     window_type: WindowType::Sliding,
69    /// };
70    /// let node = StreamAlphaNode::new("logins", Some("LoginEvent".to_string()), Some(window));
71    /// ```
72    pub fn new(
73        stream_name: impl Into<String>,
74        event_type: Option<String>,
75        window: Option<WindowSpec>,
76    ) -> Self {
77        Self {
78            stream_name: stream_name.into(),
79            event_type,
80            window,
81            events: VecDeque::new(),
82            max_events: 10_000, // Default: keep 10k events max
83            last_window_start: 0,
84        }
85    }
86
87    /// Create with custom max events
88    pub fn with_max_events(mut self, max_events: usize) -> Self {
89        self.max_events = max_events;
90        self
91    }
92
93    /// Process incoming event and return whether it matches
94    ///
95    /// # Arguments
96    /// * `event` - The stream event to process
97    ///
98    /// # Returns
99    /// * `true` if event matches stream name, event type, and window criteria
100    /// * `false` otherwise
101    pub fn process_event(&mut self, event: &StreamEvent) -> bool {
102        // Check stream name matches
103        if event.metadata.source != self.stream_name {
104            return false;
105        }
106
107        // Check event type matches (if specified)
108        if let Some(ref expected_type) = self.event_type {
109            if &event.event_type != expected_type {
110                return false;
111            }
112        }
113
114        // If no window, event matches
115        let matches = if self.window.is_none() {
116            true
117        } else {
118            // With window, check if event is within window
119            self.is_in_window(event.metadata.timestamp)
120        };
121
122        if matches {
123            // Add to buffer and evict old events
124            self.add_event(event.clone());
125            self.evict_expired_events();
126        }
127
128        matches
129    }
130
131    /// Add event to internal buffer
132    fn add_event(&mut self, event: StreamEvent) {
133        self.events.push_back(event);
134
135        // Keep buffer size under limit
136        while self.events.len() > self.max_events {
137            self.events.pop_front();
138        }
139    }
140
141    /// Check if timestamp falls within current window
142    fn is_in_window(&self, timestamp: u64) -> bool {
143        match &self.window {
144            None => true,
145            Some(spec) => {
146                let current_time = Self::current_time_ms();
147                let window_duration_ms = spec.duration.as_millis() as u64;
148
149                match spec.window_type {
150                    WindowType::Sliding => {
151                        // Event must be within duration from now
152                        timestamp >= current_time.saturating_sub(window_duration_ms)
153                            && timestamp <= current_time
154                    }
155                    WindowType::Tumbling => {
156                        // Calculate window boundaries
157                        let window_start = (current_time / window_duration_ms) * window_duration_ms;
158                        let window_end = window_start + window_duration_ms;
159
160                        timestamp >= window_start && timestamp < window_end
161                    }
162                    WindowType::Session { .. } => {
163                        // Session windows not yet implemented
164                        // For now, treat as sliding window
165                        timestamp >= current_time.saturating_sub(window_duration_ms)
166                            && timestamp <= current_time
167                    }
168                }
169            }
170        }
171    }
172
173    /// Evict events that are outside the window
174    fn evict_expired_events(&mut self) {
175        if let Some(spec) = &self.window {
176            let current_time = Self::current_time_ms();
177            let window_duration_ms = spec.duration.as_millis() as u64;
178
179            match spec.window_type {
180                WindowType::Sliding => {
181                    let cutoff_time = current_time.saturating_sub(window_duration_ms);
182
183                    // Remove events older than cutoff
184                    while let Some(event) = self.events.front() {
185                        if event.metadata.timestamp < cutoff_time {
186                            self.events.pop_front();
187                        } else {
188                            break;
189                        }
190                    }
191                }
192                WindowType::Tumbling => {
193                    let window_start = (current_time / window_duration_ms) * window_duration_ms;
194
195                    // If we've moved to a new window, clear old events
196                    if self.last_window_start != 0 && window_start != self.last_window_start {
197                        self.events.clear();
198                        self.last_window_start = window_start;
199                    } else if self.last_window_start == 0 {
200                        self.last_window_start = window_start;
201                    }
202
203                    // Remove events from previous windows
204                    while let Some(event) = self.events.front() {
205                        if event.metadata.timestamp < window_start {
206                            self.events.pop_front();
207                        } else {
208                            break;
209                        }
210                    }
211                }
212                WindowType::Session { .. } => {
213                    // Session window eviction not yet implemented
214                    // For now, use same logic as sliding
215                    let cutoff_time = current_time.saturating_sub(window_duration_ms);
216
217                    while let Some(event) = self.events.front() {
218                        if event.metadata.timestamp < cutoff_time {
219                            self.events.pop_front();
220                        } else {
221                            break;
222                        }
223                    }
224                }
225            }
226        }
227    }
228
229    /// Get all events currently in the window
230    ///
231    /// # Returns
232    /// A slice of events that are within the current window
233    pub fn get_events(&self) -> &VecDeque<StreamEvent> {
234        &self.events
235    }
236
237    /// Get count of events in window
238    pub fn event_count(&self) -> usize {
239        self.events.len()
240    }
241
242    /// Get current time in milliseconds since epoch
243    fn current_time_ms() -> u64 {
244        SystemTime::now()
245            .duration_since(UNIX_EPOCH)
246            .unwrap()
247            .as_millis() as u64
248    }
249
250    /// Clear all events from buffer
251    pub fn clear(&mut self) {
252        self.events.clear();
253        self.last_window_start = 0;
254    }
255
256    /// Get window statistics
257    pub fn window_stats(&self) -> WindowStats {
258        WindowStats {
259            event_count: self.events.len(),
260            oldest_event_timestamp: self.events.front().map(|e| e.metadata.timestamp),
261            newest_event_timestamp: self.events.back().map(|e| e.metadata.timestamp),
262            window_duration_ms: self.window.as_ref().map(|w| w.duration.as_millis() as u64),
263        }
264    }
265}
266
/// Window statistics for monitoring
///
/// A point-in-time snapshot of a node's event buffer.
#[derive(Debug, Clone)]
pub struct WindowStats {
    /// Number of events currently buffered.
    pub event_count: usize,
    /// Timestamp of the event at the front of the buffer, if any.
    pub oldest_event_timestamp: Option<u64>,
    /// Timestamp of the event at the back of the buffer, if any.
    pub newest_event_timestamp: Option<u64>,
    /// Configured window length in milliseconds; `None` when no window is set.
    pub window_duration_ms: Option<u64>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::streaming::event::StreamEvent;
    use crate::types::Value;
    use std::collections::HashMap;

    /// Build an event on `stream_name` with a single dummy payload field and
    /// an explicit timestamp.
    fn create_test_event(stream_name: &str, event_type: &str, timestamp: u64) -> StreamEvent {
        let mut data = HashMap::new();
        data.insert(
            "test_field".to_string(),
            Value::String("test_value".to_string()),
        );

        StreamEvent::with_timestamp(event_type, data, stream_name, timestamp)
    }

    #[test]
    fn test_stream_alpha_node_basic() {
        let mut node = StreamAlphaNode::new("user-events", None, None);
        let event = create_test_event("user-events", "LoginEvent", 1000);

        assert!(node.process_event(&event));
        assert_eq!(node.event_count(), 1);
    }

    #[test]
    fn test_stream_name_filtering() {
        let mut node = StreamAlphaNode::new("user-events", None, None);

        let matching_event = create_test_event("user-events", "LoginEvent", 1000);
        let non_matching_event = create_test_event("other-stream", "LoginEvent", 1000);

        assert!(node.process_event(&matching_event));
        assert!(!node.process_event(&non_matching_event));
        assert_eq!(node.event_count(), 1);
    }

    #[test]
    fn test_event_type_filtering() {
        let mut node = StreamAlphaNode::new("user-events", Some("LoginEvent".to_string()), None);

        let matching_event = create_test_event("user-events", "LoginEvent", 1000);
        let non_matching_event = create_test_event("user-events", "LogoutEvent", 1000);

        assert!(node.process_event(&matching_event));
        assert!(!node.process_event(&non_matching_event));
        assert_eq!(node.event_count(), 1);
    }

    #[test]
    fn test_sliding_window() {
        let window = WindowSpec {
            duration: Duration::from_secs(5),
            window_type: WindowType::Sliding,
        };

        let mut node = StreamAlphaNode::new("sensors", None, Some(window));

        let current_time = StreamAlphaNode::current_time_ms();

        // Event within window
        let recent_event = create_test_event("sensors", "TempReading", current_time - 2000);
        assert!(node.process_event(&recent_event));

        // Event outside window (6 seconds ago)
        let old_event = create_test_event("sensors", "TempReading", current_time - 6000);
        assert!(!node.process_event(&old_event));

        assert_eq!(node.event_count(), 1);
    }

    #[test]
    fn test_tumbling_window() {
        let window = WindowSpec {
            duration: Duration::from_secs(10),
            window_type: WindowType::Tumbling,
        };

        let mut node = StreamAlphaNode::new("sensors", None, Some(window));

        // Window boundaries are derived from the wall clock, aligned to
        // multiples of the window length.
        let current_time = StreamAlphaNode::current_time_ms();
        let window_start = (current_time / 10_000) * 10_000;

        // Event in current window
        let event1 = create_test_event("sensors", "TempReading", window_start + 1000);
        assert!(node.process_event(&event1));

        // Event in current window
        let event2 = create_test_event("sensors", "TempReading", window_start + 5000);
        assert!(node.process_event(&event2));

        // Event from previous window
        let old_event = create_test_event("sensors", "TempReading", window_start - 5000);
        assert!(!node.process_event(&old_event));

        assert_eq!(node.event_count(), 2);
    }

    #[test]
    fn test_eviction() {
        let window = WindowSpec {
            duration: Duration::from_millis(100),
            window_type: WindowType::Sliding,
        };

        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));

        let current_time = StreamAlphaNode::current_time_ms();

        // Add event within window
        let event1 = create_test_event("test-stream", "TestEvent", current_time - 50);
        node.process_event(&event1);

        assert_eq!(node.event_count(), 1);

        // Wait to ensure event becomes old
        std::thread::sleep(Duration::from_millis(150));

        // Process new event, which should trigger eviction
        let event2 = create_test_event(
            "test-stream",
            "TestEvent",
            StreamAlphaNode::current_time_ms(),
        );
        node.process_event(&event2);

        // Old event should be evicted
        assert_eq!(node.event_count(), 1);
    }

    #[test]
    fn test_max_events_limit() {
        let mut node = StreamAlphaNode::new("test-stream", None, None).with_max_events(5);

        let current_time = StreamAlphaNode::current_time_ms();

        // Add 10 events
        for i in 0..10 {
            let event = create_test_event("test-stream", "TestEvent", current_time + i);
            node.process_event(&event);
        }

        // Should only keep 5 events
        assert_eq!(node.event_count(), 5);

        // The cap drops the oldest arrivals, so the first survivor is event #5
        assert_eq!(
            node.window_stats().oldest_event_timestamp,
            Some(current_time + 5)
        );
    }

    #[test]
    fn test_clear() {
        let mut node = StreamAlphaNode::new("test-stream", None, None);

        let event = create_test_event("test-stream", "TestEvent", 1000);
        node.process_event(&event);

        assert_eq!(node.event_count(), 1);

        node.clear();
        assert_eq!(node.event_count(), 0);
    }

    #[test]
    fn test_window_stats() {
        let window = WindowSpec {
            duration: Duration::from_secs(60),
            window_type: WindowType::Sliding,
        };

        let mut node = StreamAlphaNode::new("test-stream", None, Some(window));

        let current_time = StreamAlphaNode::current_time_ms();
        let event1 = create_test_event("test-stream", "TestEvent", current_time - 10_000);
        let event2 = create_test_event("test-stream", "TestEvent", current_time - 5_000);

        node.process_event(&event1);
        node.process_event(&event2);

        let stats = node.window_stats();
        assert_eq!(stats.event_count, 2);
        assert_eq!(stats.oldest_event_timestamp, Some(current_time - 10_000));
        assert_eq!(stats.newest_event_timestamp, Some(current_time - 5_000));
        assert_eq!(stats.window_duration_ms, Some(60_000));
    }

    #[test]
    fn test_get_events() {
        let mut node = StreamAlphaNode::new("test-stream", None, None);

        let event1 = create_test_event("test-stream", "Event1", 1000);
        let event2 = create_test_event("test-stream", "Event2", 2000);

        node.process_event(&event1);
        node.process_event(&event2);

        // Events are exposed in arrival order
        let events = node.get_events();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].event_type, "Event1");
        assert_eq!(events[1].event_type, "Event2");
    }
}