rust_rule_engine/streaming/
engine.rs

1//! Streaming Rule Engine
2//!
3//! Core engine for processing real-time event streams with rule evaluation.
4
5use crate::engine::facts::Facts;
6use crate::engine::knowledge_base::KnowledgeBase;
7use crate::engine::RustRuleEngine;
8use crate::parser::grl_parser::GRLParser;
9use crate::streaming::aggregator::StreamAnalytics;
10use crate::streaming::event::StreamEvent;
11use crate::streaming::window::{TimeWindow, WindowManager, WindowType};
12use crate::types::Value;
13use crate::{Result, RuleEngineError};
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18use tokio::sync::{mpsc, RwLock};
19use tokio::time::interval;
20
21/// Configuration for stream rule engine
22#[derive(Debug, Clone)]
23pub struct StreamConfig {
24    /// Buffer size for incoming events
25    pub buffer_size: usize,
26    /// Window duration for aggregations
27    pub window_duration: Duration,
28    /// Maximum events per window
29    pub max_events_per_window: usize,
30    /// Maximum number of windows to keep
31    pub max_windows: usize,
32    /// Window type (sliding, tumbling, etc.)
33    pub window_type: WindowType,
34    /// Cache TTL for analytics
35    pub analytics_cache_ttl_ms: u64,
36    /// Processing interval for rule evaluation
37    pub processing_interval: Duration,
38}
39
40impl Default for StreamConfig {
41    fn default() -> Self {
42        Self {
43            buffer_size: 10000,
44            window_duration: Duration::from_secs(60),
45            max_events_per_window: 1000,
46            max_windows: 100,
47            window_type: WindowType::Sliding,
48            analytics_cache_ttl_ms: 30000,
49            processing_interval: Duration::from_millis(100),
50        }
51    }
52}
53
54/// Result of stream rule execution
55#[derive(Debug, Clone)]
56pub struct StreamExecutionResult {
57    /// Number of rules that fired
58    pub rules_fired: usize,
59    /// Number of events processed
60    pub events_processed: usize,
61    /// Processing duration
62    pub processing_time_ms: u64,
63    /// Triggered actions
64    pub actions: Vec<StreamAction>,
65    /// Analytics results
66    pub analytics: HashMap<String, Value>,
67}
68
69/// Action triggered by stream rules
70#[derive(Debug, Clone)]
71pub struct StreamAction {
72    /// Action type identifier
73    pub action_type: String,
74    /// Action parameters
75    pub parameters: HashMap<String, Value>,
76    /// Timestamp when action was triggered
77    pub timestamp: u64,
78    /// Rule that triggered this action
79    pub rule_name: String,
80}
81
82/// Main streaming rule engine
83pub struct StreamRuleEngine {
84    /// Configuration
85    config: StreamConfig,
86    /// Regular rule engine for rule evaluation
87    rule_engine: RustRuleEngine,
88    /// Window manager for time-based processing
89    window_manager: Arc<RwLock<WindowManager>>,
90    /// Stream analytics
91    analytics: Arc<RwLock<StreamAnalytics>>,
92    /// Event sender
93    event_sender: Option<mpsc::Sender<StreamEvent>>,
94    /// Action callbacks
95    action_handlers: Arc<RwLock<HashMap<String, Box<dyn Fn(&StreamAction) + Send + Sync>>>>,
96    /// Running state
97    is_running: Arc<RwLock<bool>>,
98}
99
100impl StreamRuleEngine {
101    /// Create a new stream rule engine
102    pub fn new() -> Self {
103        let config = StreamConfig::default();
104        let kb = KnowledgeBase::new("StreamKB");
105        let rule_engine = RustRuleEngine::new(kb);
106
107        let window_manager = Arc::new(RwLock::new(WindowManager::new(
108            config.window_type.clone(),
109            config.window_duration,
110            config.max_events_per_window,
111            config.max_windows,
112        )));
113
114        let analytics = Arc::new(RwLock::new(StreamAnalytics::new(
115            config.analytics_cache_ttl_ms,
116        )));
117
118        Self {
119            config,
120            rule_engine,
121            window_manager,
122            analytics,
123            event_sender: None,
124            action_handlers: Arc::new(RwLock::new(HashMap::new())),
125            is_running: Arc::new(RwLock::new(false)),
126        }
127    }
128
129    /// Create with custom configuration
130    pub fn with_config(config: StreamConfig) -> Self {
131        let kb = KnowledgeBase::new("StreamKB");
132        let rule_engine = RustRuleEngine::new(kb);
133
134        let window_manager = Arc::new(RwLock::new(WindowManager::new(
135            config.window_type.clone(),
136            config.window_duration,
137            config.max_events_per_window,
138            config.max_windows,
139        )));
140
141        let analytics = Arc::new(RwLock::new(StreamAnalytics::new(
142            config.analytics_cache_ttl_ms,
143        )));
144
145        Self {
146            config,
147            rule_engine,
148            window_manager,
149            analytics,
150            event_sender: None,
151            action_handlers: Arc::new(RwLock::new(HashMap::new())),
152            is_running: Arc::new(RwLock::new(false)),
153        }
154    }
155
156    /// Add streaming rule from GRL string
157    pub async fn add_rule(&mut self, grl_rule: &str) -> Result<()> {
158        let rules = GRLParser::parse_rules(grl_rule)?;
159
160        for rule in rules {
161            self.rule_engine.knowledge_base_mut().add_rule(rule)?;
162        }
163
164        Ok(())
165    }
166
167    /// Add streaming rule from file
168    pub async fn add_rule_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<()> {
169        let content = std::fs::read_to_string(path)?;
170        self.add_rule(&content).await
171    }
172
173    /// Register action handler
174    pub async fn register_action_handler<F>(&self, action_type: &str, handler: F)
175    where
176        F: Fn(&StreamAction) + Send + Sync + 'static,
177    {
178        let mut handlers = self.action_handlers.write().await;
179        handlers.insert(action_type.to_string(), Box::new(handler));
180    }
181
182    /// Start the streaming engine
183    pub async fn start(&mut self) -> Result<()> {
184        let (tx, mut rx) = mpsc::channel::<StreamEvent>(self.config.buffer_size);
185        self.event_sender = Some(tx);
186
187        // Set running state
188        {
189            let mut running = self.is_running.write().await;
190            *running = true;
191        }
192
193        // Clone shared components for the processing task
194        let window_manager = Arc::clone(&self.window_manager);
195        let _analytics = Arc::clone(&self.analytics);
196        let _action_handlers = Arc::clone(&self.action_handlers);
197        let is_running = Arc::clone(&self.is_running);
198        let processing_interval = self.config.processing_interval;
199
200        // Start event processing task
201        let _processing_task = tokio::spawn(async move {
202            let mut interval_timer = interval(processing_interval);
203            let mut event_batch = Vec::new();
204
205            loop {
206                tokio::select! {
207                    // Process incoming events
208                    event = rx.recv() => {
209                        match event {
210                            Some(event) => {
211                                event_batch.push(event);
212
213                                // Process batch when full or on timer
214                                if event_batch.len() >= 100 {
215                                    Self::process_event_batch(&window_manager, &event_batch).await;
216                                    event_batch.clear();
217                                }
218                            }
219                            None => break, // Channel closed
220                        }
221                    }
222
223                    // Timer tick for processing
224                    _ = interval_timer.tick() => {
225                        if !event_batch.is_empty() {
226                            Self::process_event_batch(&window_manager, &event_batch).await;
227                            event_batch.clear();
228                        }
229
230                        // Check if still running
231                        let running = is_running.read().await;
232                        if !*running {
233                            break;
234                        }
235                    }
236                }
237            }
238        });
239
240        Ok(())
241    }
242
243    /// Stop the streaming engine
244    pub async fn stop(&self) {
245        let mut running = self.is_running.write().await;
246        *running = false;
247    }
248
249    /// Send event to stream for processing
250    pub async fn send_event(&self, event: StreamEvent) -> Result<()> {
251        if let Some(ref sender) = self.event_sender {
252            sender.send(event).await.map_err(|_| {
253                RuleEngineError::ExecutionError("Failed to send event to stream".to_string())
254            })?;
255        }
256        Ok(())
257    }
258
259    /// Process a batch of events
260    async fn process_event_batch(
261        window_manager: &Arc<RwLock<WindowManager>>,
262        events: &[StreamEvent],
263    ) {
264        let mut manager = window_manager.write().await;
265        for event in events {
266            manager.process_event(event.clone());
267        }
268    }
269
270    /// Execute rules against current window state
271    pub async fn execute_rules(&self) -> Result<StreamExecutionResult> {
272        let start_time = SystemTime::now()
273            .duration_since(UNIX_EPOCH)
274            .unwrap()
275            .as_millis() as u64;
276
277        let window_manager = self.window_manager.read().await;
278        let _analytics = self.analytics.read().await;
279
280        // Get current windows
281        let windows = window_manager.active_windows();
282        let mut total_events_processed = 0;
283        let mut rules_fired = 0;
284        let actions = Vec::new();
285        let mut analytics_results = HashMap::new();
286
287        // Process each window
288        for window in windows {
289            total_events_processed += window.count();
290
291            // Create facts from window data
292            let facts = Facts::new();
293
294            // Add window aggregations to facts
295            self.add_window_aggregations_to_facts(&facts, window)
296                .await?;
297
298            // Execute rules on this window
299            let result = self.rule_engine.execute(&facts)?;
300            rules_fired += result.rules_fired;
301
302            // Note: Traditional rule engine doesn't return actions,
303            // we'd need to extend it for streaming action capture
304            // For now, we create empty actions list
305        }
306
307        // Calculate analytics
308        if !windows.is_empty() {
309            let latest_window = windows.last().unwrap();
310            analytics_results.insert(
311                "total_events".to_string(),
312                Value::Number(total_events_processed as f64),
313            );
314            analytics_results.insert(
315                "window_count".to_string(),
316                Value::Number(windows.len() as f64),
317            );
318            analytics_results.insert(
319                "latest_window_events".to_string(),
320                Value::Number(latest_window.count() as f64),
321            );
322        }
323
324        let end_time = SystemTime::now()
325            .duration_since(UNIX_EPOCH)
326            .unwrap()
327            .as_millis() as u64;
328
329        Ok(StreamExecutionResult {
330            rules_fired,
331            events_processed: total_events_processed,
332            processing_time_ms: end_time - start_time,
333            actions,
334            analytics: analytics_results,
335        })
336    }
337
338    /// Add window aggregations to facts
339    async fn add_window_aggregations_to_facts(
340        &self,
341        facts: &Facts,
342        window: &TimeWindow,
343    ) -> Result<()> {
344        // Add basic window stats
345        facts.add_value("WindowEventCount", Value::Number(window.count() as f64))?;
346        facts.add_value("WindowStartTime", Value::Number(window.start_time as f64))?;
347        facts.add_value("WindowEndTime", Value::Number(window.end_time as f64))?;
348        facts.add_value(
349            "WindowDurationMs",
350            Value::Number(window.duration_ms() as f64),
351        )?;
352
353        // Add common aggregations for numeric fields
354        let numeric_fields = self.detect_numeric_fields(window);
355        for field in numeric_fields {
356            if let Some(sum) = window
357                .events()
358                .iter()
359                .filter_map(|e| e.get_numeric(&field))
360                .reduce(|a, b| a + b)
361            {
362                facts.add_value(&format!("{}Sum", field), Value::Number(sum))?;
363            }
364
365            if let Some(avg) = window.average(&field) {
366                facts.add_value(&format!("{}Average", field), Value::Number(avg))?;
367            }
368
369            if let Some(min) = window.min(&field) {
370                facts.add_value(&format!("{}Min", field), Value::Number(min))?;
371            }
372
373            if let Some(max) = window.max(&field) {
374                facts.add_value(&format!("{}Max", field), Value::Number(max))?;
375            }
376        }
377
378        Ok(())
379    }
380
381    /// Detect numeric fields in window events
382    fn detect_numeric_fields(&self, window: &TimeWindow) -> Vec<String> {
383        let mut fields = std::collections::HashSet::new();
384
385        for event in window.events() {
386            for (key, value) in &event.data {
387                match value {
388                    Value::Number(_) | Value::Integer(_) => {
389                        fields.insert(key.clone());
390                    }
391                    _ => {}
392                }
393            }
394        }
395
396        fields.into_iter().collect()
397    }
398
399    /// Get current window statistics
400    pub async fn get_window_statistics(&self) -> crate::streaming::window::WindowStatistics {
401        let window_manager = self.window_manager.read().await;
402        window_manager.get_statistics()
403    }
404
405    /// Get analytics for a specific field
406    pub async fn get_field_analytics(&self, field: &str) -> HashMap<String, Value> {
407        let window_manager = self.window_manager.read().await;
408        let mut results = HashMap::new();
409
410        let windows = window_manager.active_windows();
411        if windows.is_empty() {
412            return results;
413        }
414
415        // Calculate aggregations across all windows
416        let total_sum: f64 = windows.iter().map(|w| w.sum(field)).sum();
417        let total_count: usize = windows.iter().map(|w| w.count()).sum();
418
419        results.insert("total_sum".to_string(), Value::Number(total_sum));
420        results.insert("total_count".to_string(), Value::Number(total_count as f64));
421
422        if total_count > 0 {
423            results.insert(
424                "overall_average".to_string(),
425                Value::Number(total_sum / total_count as f64),
426            );
427        }
428
429        // Get min/max across all windows
430        let all_values: Vec<f64> = windows
431            .iter()
432            .flat_map(|w| w.events().iter().filter_map(|e| e.get_numeric(field)))
433            .collect();
434
435        if !all_values.is_empty() {
436            let min = all_values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
437            let max = all_values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
438
439            results.insert("global_min".to_string(), Value::Number(min));
440            results.insert("global_max".to_string(), Value::Number(max));
441        }
442
443        results
444    }
445
446    /// Check if engine is running
447    pub async fn is_running(&self) -> bool {
448        let running = self.is_running.read().await;
449        *running
450    }
451}
452
453impl Default for StreamRuleEngine {
454    fn default() -> Self {
455        Self::new()
456    }
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    /// A freshly constructed engine must not report itself as running.
    #[tokio::test]
    async fn test_stream_engine_creation() {
        let engine = StreamRuleEngine::new();
        let running = engine.is_running().await;
        assert!(!running);
    }

    /// A well-formed GRL rule is accepted by the engine.
    #[tokio::test]
    async fn test_add_streaming_rule() {
        let mut engine = StreamRuleEngine::new();

        let rule = r#"
        rule "TestStreamRule" salience 10 {
            when
                WindowEventCount > 5
            then
                log("High event count detected");
        }
        "#;

        let outcome = engine.add_rule(rule).await;
        assert!(outcome.is_ok());
    }

    /// Once started, the engine accepts an event and can then be stopped.
    #[tokio::test]
    async fn test_event_processing() {
        let mut engine = StreamRuleEngine::new();
        engine.start().await.unwrap();

        let mut payload = HashMap::new();
        payload.insert("value".to_string(), Value::Number(10.0));

        let event = StreamEvent::new("TestEvent", payload, "test_source");
        let sent = engine.send_event(event).await;
        assert!(sent.is_ok());

        engine.stop().await;
    }
}