rust_rule_engine/streaming/
engine.rs

1//! Streaming Rule Engine
2//!
3//! Core engine for processing real-time event streams with rule evaluation.
4
5#![allow(clippy::type_complexity)]
6
7use crate::engine::facts::Facts;
8use crate::engine::knowledge_base::KnowledgeBase;
9use crate::engine::RustRuleEngine;
10use crate::parser::grl::GRLParser;
11use crate::streaming::aggregator::StreamAnalytics;
12use crate::streaming::event::StreamEvent;
13use crate::streaming::window::{TimeWindow, WindowManager, WindowType};
14use crate::types::Value;
15use crate::{Result, RuleEngineError};
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20use tokio::sync::{mpsc, RwLock};
21use tokio::time::interval;
22
23/// Configuration for stream rule engine
24#[derive(Debug, Clone)]
25pub struct StreamConfig {
26    /// Buffer size for incoming events
27    pub buffer_size: usize,
28    /// Window duration for aggregations
29    pub window_duration: Duration,
30    /// Maximum events per window
31    pub max_events_per_window: usize,
32    /// Maximum number of windows to keep
33    pub max_windows: usize,
34    /// Window type (sliding, tumbling, etc.)
35    pub window_type: WindowType,
36    /// Cache TTL for analytics
37    pub analytics_cache_ttl_ms: u64,
38    /// Processing interval for rule evaluation
39    pub processing_interval: Duration,
40}
41
42impl Default for StreamConfig {
43    fn default() -> Self {
44        Self {
45            buffer_size: 10000,
46            window_duration: Duration::from_secs(60),
47            max_events_per_window: 1000,
48            max_windows: 100,
49            window_type: WindowType::Sliding,
50            analytics_cache_ttl_ms: 30000,
51            processing_interval: Duration::from_millis(100),
52        }
53    }
54}
55
56/// Result of stream rule execution
57#[derive(Debug, Clone)]
58pub struct StreamExecutionResult {
59    /// Number of rules that fired
60    pub rules_fired: usize,
61    /// Number of events processed
62    pub events_processed: usize,
63    /// Processing duration
64    pub processing_time_ms: u64,
65    /// Triggered actions
66    pub actions: Vec<StreamAction>,
67    /// Analytics results
68    pub analytics: HashMap<String, Value>,
69}
70
71/// Action triggered by stream rules
72#[derive(Debug, Clone)]
73pub struct StreamAction {
74    /// Action type identifier
75    pub action_type: String,
76    /// Action parameters
77    pub parameters: HashMap<String, Value>,
78    /// Timestamp when action was triggered
79    pub timestamp: u64,
80    /// Rule that triggered this action
81    pub rule_name: String,
82}
83
84/// Main streaming rule engine
85pub struct StreamRuleEngine {
86    /// Configuration
87    config: StreamConfig,
88    /// Regular rule engine for rule evaluation
89    rule_engine: RustRuleEngine,
90    /// Window manager for time-based processing
91    window_manager: Arc<RwLock<WindowManager>>,
92    /// Stream analytics
93    analytics: Arc<RwLock<StreamAnalytics>>,
94    /// Event sender
95    event_sender: Option<mpsc::Sender<StreamEvent>>,
96    /// Action callbacks
97    action_handlers: Arc<RwLock<HashMap<String, Box<dyn Fn(&StreamAction) + Send + Sync>>>>,
98    /// Running state
99    is_running: Arc<RwLock<bool>>,
100}
101
102impl StreamRuleEngine {
103    /// Create a new stream rule engine
104    pub fn new() -> Self {
105        let config = StreamConfig::default();
106        let kb = KnowledgeBase::new("StreamKB");
107        let rule_engine = RustRuleEngine::new(kb);
108
109        let window_manager = Arc::new(RwLock::new(WindowManager::new(
110            config.window_type.clone(),
111            config.window_duration,
112            config.max_events_per_window,
113            config.max_windows,
114        )));
115
116        let analytics = Arc::new(RwLock::new(StreamAnalytics::new(
117            config.analytics_cache_ttl_ms,
118        )));
119
120        Self {
121            config,
122            rule_engine,
123            window_manager,
124            analytics,
125            event_sender: None,
126            action_handlers: Arc::new(RwLock::new(HashMap::new())),
127            is_running: Arc::new(RwLock::new(false)),
128        }
129    }
130
131    /// Create with custom configuration
132    pub fn with_config(config: StreamConfig) -> Self {
133        let kb = KnowledgeBase::new("StreamKB");
134        let rule_engine = RustRuleEngine::new(kb);
135
136        let window_manager = Arc::new(RwLock::new(WindowManager::new(
137            config.window_type.clone(),
138            config.window_duration,
139            config.max_events_per_window,
140            config.max_windows,
141        )));
142
143        let analytics = Arc::new(RwLock::new(StreamAnalytics::new(
144            config.analytics_cache_ttl_ms,
145        )));
146
147        Self {
148            config,
149            rule_engine,
150            window_manager,
151            analytics,
152            event_sender: None,
153            action_handlers: Arc::new(RwLock::new(HashMap::new())),
154            is_running: Arc::new(RwLock::new(false)),
155        }
156    }
157
158    /// Add streaming rule from GRL string
159    pub async fn add_rule(&mut self, grl_rule: &str) -> Result<()> {
160        let rules = GRLParser::parse_rules(grl_rule)?;
161
162        for rule in rules {
163            self.rule_engine.knowledge_base_mut().add_rule(rule)?;
164        }
165
166        Ok(())
167    }
168
169    /// Add streaming rule from file
170    pub async fn add_rule_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<()> {
171        let content = std::fs::read_to_string(path)?;
172        self.add_rule(&content).await
173    }
174
175    /// Register action handler
176    pub async fn register_action_handler<F>(&self, action_type: &str, handler: F)
177    where
178        F: Fn(&StreamAction) + Send + Sync + 'static,
179    {
180        let mut handlers = self.action_handlers.write().await;
181        handlers.insert(action_type.to_string(), Box::new(handler));
182    }
183
184    /// Start the streaming engine
185    pub async fn start(&mut self) -> Result<()> {
186        let (tx, mut rx) = mpsc::channel::<StreamEvent>(self.config.buffer_size);
187        self.event_sender = Some(tx);
188
189        // Set running state
190        {
191            let mut running = self.is_running.write().await;
192            *running = true;
193        }
194
195        // Clone shared components for the processing task
196        let window_manager = Arc::clone(&self.window_manager);
197        let _analytics = Arc::clone(&self.analytics);
198        let _action_handlers = Arc::clone(&self.action_handlers);
199        let is_running = Arc::clone(&self.is_running);
200        let processing_interval = self.config.processing_interval;
201
202        // Start event processing task
203        let _processing_task = tokio::spawn(async move {
204            let mut interval_timer = interval(processing_interval);
205            let mut event_batch = Vec::new();
206
207            loop {
208                tokio::select! {
209                    // Process incoming events
210                    event = rx.recv() => {
211                        match event {
212                            Some(event) => {
213                                event_batch.push(event);
214
215                                // Process batch when full or on timer
216                                if event_batch.len() >= 100 {
217                                    Self::process_event_batch(&window_manager, &event_batch).await;
218                                    event_batch.clear();
219                                }
220                            }
221                            None => break, // Channel closed
222                        }
223                    }
224
225                    // Timer tick for processing
226                    _ = interval_timer.tick() => {
227                        if !event_batch.is_empty() {
228                            Self::process_event_batch(&window_manager, &event_batch).await;
229                            event_batch.clear();
230                        }
231
232                        // Check if still running
233                        let running = is_running.read().await;
234                        if !*running {
235                            break;
236                        }
237                    }
238                }
239            }
240        });
241
242        Ok(())
243    }
244
245    /// Stop the streaming engine
246    pub async fn stop(&self) {
247        let mut running = self.is_running.write().await;
248        *running = false;
249    }
250
251    /// Send event to stream for processing
252    pub async fn send_event(&self, event: StreamEvent) -> Result<()> {
253        if let Some(ref sender) = self.event_sender {
254            sender.send(event).await.map_err(|_| {
255                RuleEngineError::ExecutionError("Failed to send event to stream".to_string())
256            })?;
257        }
258        Ok(())
259    }
260
261    /// Process a batch of events
262    async fn process_event_batch(
263        window_manager: &Arc<RwLock<WindowManager>>,
264        events: &[StreamEvent],
265    ) {
266        let mut manager = window_manager.write().await;
267        for event in events {
268            manager.process_event(event.clone());
269        }
270    }
271
272    /// Execute rules against current window state
273    pub async fn execute_rules(&mut self) -> Result<StreamExecutionResult> {
274        let start_time = SystemTime::now()
275            .duration_since(UNIX_EPOCH)
276            .unwrap()
277            .as_millis() as u64;
278
279        let window_manager = self.window_manager.read().await;
280        let _analytics = self.analytics.read().await;
281
282        // Get current windows
283        let windows = window_manager.active_windows();
284        let mut total_events_processed = 0;
285        let mut rules_fired = 0;
286        let actions = Vec::new();
287        let mut analytics_results = HashMap::new();
288
289        // Process each window
290        for window in windows {
291            total_events_processed += window.count();
292
293            // Create facts from window data
294            let facts = Facts::new();
295
296            // Add window aggregations to facts
297            self.add_window_aggregations_to_facts(&facts, window)
298                .await?;
299
300            // Execute rules on this window
301            let result = self.rule_engine.execute(&facts)?;
302            rules_fired += result.rules_fired;
303
304            // Note: Traditional rule engine doesn't return actions,
305            // we'd need to extend it for streaming action capture
306            // For now, we create empty actions list
307        }
308
309        // Calculate analytics
310        if !windows.is_empty() {
311            let latest_window = windows.last().unwrap();
312            analytics_results.insert(
313                "total_events".to_string(),
314                Value::Number(total_events_processed as f64),
315            );
316            analytics_results.insert(
317                "window_count".to_string(),
318                Value::Number(windows.len() as f64),
319            );
320            analytics_results.insert(
321                "latest_window_events".to_string(),
322                Value::Number(latest_window.count() as f64),
323            );
324        }
325
326        let end_time = SystemTime::now()
327            .duration_since(UNIX_EPOCH)
328            .unwrap()
329            .as_millis() as u64;
330
331        Ok(StreamExecutionResult {
332            rules_fired,
333            events_processed: total_events_processed,
334            processing_time_ms: end_time - start_time,
335            actions,
336            analytics: analytics_results,
337        })
338    }
339
340    /// Add window aggregations to facts
341    async fn add_window_aggregations_to_facts(
342        &self,
343        facts: &Facts,
344        window: &TimeWindow,
345    ) -> Result<()> {
346        // Add basic window stats
347        facts.add_value("WindowEventCount", Value::Number(window.count() as f64))?;
348        facts.add_value("WindowStartTime", Value::Number(window.start_time as f64))?;
349        facts.add_value("WindowEndTime", Value::Number(window.end_time as f64))?;
350        facts.add_value(
351            "WindowDurationMs",
352            Value::Number(window.duration_ms() as f64),
353        )?;
354
355        // Add common aggregations for numeric fields
356        let numeric_fields = self.detect_numeric_fields(window);
357        for field in numeric_fields {
358            if let Some(sum) = window
359                .events()
360                .iter()
361                .filter_map(|e| e.get_numeric(&field))
362                .reduce(|a, b| a + b)
363            {
364                facts.add_value(&format!("{}Sum", field), Value::Number(sum))?;
365            }
366
367            if let Some(avg) = window.average(&field) {
368                facts.add_value(&format!("{}Average", field), Value::Number(avg))?;
369            }
370
371            if let Some(min) = window.min(&field) {
372                facts.add_value(&format!("{}Min", field), Value::Number(min))?;
373            }
374
375            if let Some(max) = window.max(&field) {
376                facts.add_value(&format!("{}Max", field), Value::Number(max))?;
377            }
378        }
379
380        Ok(())
381    }
382
383    /// Detect numeric fields in window events
384    fn detect_numeric_fields(&self, window: &TimeWindow) -> Vec<String> {
385        let mut fields = std::collections::HashSet::new();
386
387        for event in window.events() {
388            for (key, value) in &event.data {
389                match value {
390                    Value::Number(_) | Value::Integer(_) => {
391                        fields.insert(key.clone());
392                    }
393                    _ => {}
394                }
395            }
396        }
397
398        fields.into_iter().collect()
399    }
400
401    /// Get current window statistics
402    pub async fn get_window_statistics(&self) -> crate::streaming::window::WindowStatistics {
403        let window_manager = self.window_manager.read().await;
404        window_manager.get_statistics()
405    }
406
407    /// Get analytics for a specific field
408    pub async fn get_field_analytics(&self, field: &str) -> HashMap<String, Value> {
409        let window_manager = self.window_manager.read().await;
410        let mut results = HashMap::new();
411
412        let windows = window_manager.active_windows();
413        if windows.is_empty() {
414            return results;
415        }
416
417        // Calculate aggregations across all windows
418        let total_sum: f64 = windows.iter().map(|w| w.sum(field)).sum();
419        let total_count: usize = windows.iter().map(|w| w.count()).sum();
420
421        results.insert("total_sum".to_string(), Value::Number(total_sum));
422        results.insert("total_count".to_string(), Value::Number(total_count as f64));
423
424        if total_count > 0 {
425            results.insert(
426                "overall_average".to_string(),
427                Value::Number(total_sum / total_count as f64),
428            );
429        }
430
431        // Get min/max across all windows
432        let all_values: Vec<f64> = windows
433            .iter()
434            .flat_map(|w| w.events().iter().filter_map(|e| e.get_numeric(field)))
435            .collect();
436
437        if !all_values.is_empty() {
438            let min = all_values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
439            let max = all_values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
440
441            results.insert("global_min".to_string(), Value::Number(min));
442            results.insert("global_max".to_string(), Value::Number(max));
443        }
444
445        results
446    }
447
448    /// Check if engine is running
449    pub async fn is_running(&self) -> bool {
450        let running = self.is_running.read().await;
451        *running
452    }
453}
454
455impl Default for StreamRuleEngine {
456    fn default() -> Self {
457        Self::new()
458    }
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    /// A freshly built engine must not report itself as running.
    #[tokio::test]
    async fn test_stream_engine_creation() {
        let engine = StreamRuleEngine::new();
        let running = engine.is_running().await;
        assert!(!running);
    }

    /// A well-formed GRL rule is accepted by the engine.
    #[tokio::test]
    async fn test_add_streaming_rule() {
        let rule = r#"
        rule "TestStreamRule" salience 10 {
            when
                WindowEventCount > 5
            then
                log("High event count detected");
        }
        "#;

        let mut engine = StreamRuleEngine::new();
        let outcome = engine.add_rule(rule).await;
        assert!(outcome.is_ok());
    }

    /// Once started, the engine accepts an event on its channel.
    #[tokio::test]
    async fn test_event_processing() {
        let mut engine = StreamRuleEngine::new();
        engine.start().await.unwrap();

        let data: HashMap<String, Value> =
            std::iter::once(("value".to_string(), Value::Number(10.0))).collect();
        let event = StreamEvent::new("TestEvent", data, "test_source");

        let sent = engine.send_event(event).await;
        assert!(sent.is_ok());

        engine.stop().await;
    }
}