rust_rule_engine/streaming/
engine.rs1use crate::engine::facts::Facts;
6use crate::engine::knowledge_base::KnowledgeBase;
7use crate::engine::RustRuleEngine;
8use crate::parser::grl_parser::GRLParser;
9use crate::streaming::aggregator::StreamAnalytics;
10use crate::streaming::event::StreamEvent;
11use crate::streaming::window::{TimeWindow, WindowManager, WindowType};
12use crate::types::Value;
13use crate::{Result, RuleEngineError};
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18use tokio::sync::{mpsc, RwLock};
19use tokio::time::interval;
20
21#[derive(Debug, Clone)]
23pub struct StreamConfig {
24 pub buffer_size: usize,
26 pub window_duration: Duration,
28 pub max_events_per_window: usize,
30 pub max_windows: usize,
32 pub window_type: WindowType,
34 pub analytics_cache_ttl_ms: u64,
36 pub processing_interval: Duration,
38}
39
40impl Default for StreamConfig {
41 fn default() -> Self {
42 Self {
43 buffer_size: 10000,
44 window_duration: Duration::from_secs(60),
45 max_events_per_window: 1000,
46 max_windows: 100,
47 window_type: WindowType::Sliding,
48 analytics_cache_ttl_ms: 30000,
49 processing_interval: Duration::from_millis(100),
50 }
51 }
52}
53
54#[derive(Debug, Clone)]
56pub struct StreamExecutionResult {
57 pub rules_fired: usize,
59 pub events_processed: usize,
61 pub processing_time_ms: u64,
63 pub actions: Vec<StreamAction>,
65 pub analytics: HashMap<String, Value>,
67}
68
69#[derive(Debug, Clone)]
71pub struct StreamAction {
72 pub action_type: String,
74 pub parameters: HashMap<String, Value>,
76 pub timestamp: u64,
78 pub rule_name: String,
80}
81
82pub struct StreamRuleEngine {
84 config: StreamConfig,
86 rule_engine: RustRuleEngine,
88 window_manager: Arc<RwLock<WindowManager>>,
90 analytics: Arc<RwLock<StreamAnalytics>>,
92 event_sender: Option<mpsc::Sender<StreamEvent>>,
94 action_handlers: Arc<RwLock<HashMap<String, Box<dyn Fn(&StreamAction) + Send + Sync>>>>,
96 is_running: Arc<RwLock<bool>>,
98}
99
100impl StreamRuleEngine {
101 pub fn new() -> Self {
103 let config = StreamConfig::default();
104 let kb = KnowledgeBase::new("StreamKB");
105 let rule_engine = RustRuleEngine::new(kb);
106
107 let window_manager = Arc::new(RwLock::new(WindowManager::new(
108 config.window_type.clone(),
109 config.window_duration,
110 config.max_events_per_window,
111 config.max_windows,
112 )));
113
114 let analytics = Arc::new(RwLock::new(StreamAnalytics::new(
115 config.analytics_cache_ttl_ms,
116 )));
117
118 Self {
119 config,
120 rule_engine,
121 window_manager,
122 analytics,
123 event_sender: None,
124 action_handlers: Arc::new(RwLock::new(HashMap::new())),
125 is_running: Arc::new(RwLock::new(false)),
126 }
127 }
128
129 pub fn with_config(config: StreamConfig) -> Self {
131 let kb = KnowledgeBase::new("StreamKB");
132 let rule_engine = RustRuleEngine::new(kb);
133
134 let window_manager = Arc::new(RwLock::new(WindowManager::new(
135 config.window_type.clone(),
136 config.window_duration,
137 config.max_events_per_window,
138 config.max_windows,
139 )));
140
141 let analytics = Arc::new(RwLock::new(StreamAnalytics::new(
142 config.analytics_cache_ttl_ms,
143 )));
144
145 Self {
146 config,
147 rule_engine,
148 window_manager,
149 analytics,
150 event_sender: None,
151 action_handlers: Arc::new(RwLock::new(HashMap::new())),
152 is_running: Arc::new(RwLock::new(false)),
153 }
154 }
155
156 pub async fn add_rule(&mut self, grl_rule: &str) -> Result<()> {
158 let rules = GRLParser::parse_rules(grl_rule)?;
159
160 for rule in rules {
161 self.rule_engine.knowledge_base_mut().add_rule(rule)?;
162 }
163
164 Ok(())
165 }
166
167 pub async fn add_rule_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<()> {
169 let content = std::fs::read_to_string(path)?;
170 self.add_rule(&content).await
171 }
172
173 pub async fn register_action_handler<F>(&self, action_type: &str, handler: F)
175 where
176 F: Fn(&StreamAction) + Send + Sync + 'static,
177 {
178 let mut handlers = self.action_handlers.write().await;
179 handlers.insert(action_type.to_string(), Box::new(handler));
180 }
181
182 pub async fn start(&mut self) -> Result<()> {
184 let (tx, mut rx) = mpsc::channel::<StreamEvent>(self.config.buffer_size);
185 self.event_sender = Some(tx);
186
187 {
189 let mut running = self.is_running.write().await;
190 *running = true;
191 }
192
193 let window_manager = Arc::clone(&self.window_manager);
195 let _analytics = Arc::clone(&self.analytics);
196 let _action_handlers = Arc::clone(&self.action_handlers);
197 let is_running = Arc::clone(&self.is_running);
198 let processing_interval = self.config.processing_interval;
199
200 let _processing_task = tokio::spawn(async move {
202 let mut interval_timer = interval(processing_interval);
203 let mut event_batch = Vec::new();
204
205 loop {
206 tokio::select! {
207 event = rx.recv() => {
209 match event {
210 Some(event) => {
211 event_batch.push(event);
212
213 if event_batch.len() >= 100 {
215 Self::process_event_batch(&window_manager, &event_batch).await;
216 event_batch.clear();
217 }
218 }
219 None => break, }
221 }
222
223 _ = interval_timer.tick() => {
225 if !event_batch.is_empty() {
226 Self::process_event_batch(&window_manager, &event_batch).await;
227 event_batch.clear();
228 }
229
230 let running = is_running.read().await;
232 if !*running {
233 break;
234 }
235 }
236 }
237 }
238 });
239
240 Ok(())
241 }
242
243 pub async fn stop(&self) {
245 let mut running = self.is_running.write().await;
246 *running = false;
247 }
248
249 pub async fn send_event(&self, event: StreamEvent) -> Result<()> {
251 if let Some(ref sender) = self.event_sender {
252 sender.send(event).await.map_err(|_| {
253 RuleEngineError::ExecutionError("Failed to send event to stream".to_string())
254 })?;
255 }
256 Ok(())
257 }
258
259 async fn process_event_batch(
261 window_manager: &Arc<RwLock<WindowManager>>,
262 events: &[StreamEvent],
263 ) {
264 let mut manager = window_manager.write().await;
265 for event in events {
266 manager.process_event(event.clone());
267 }
268 }
269
270 pub async fn execute_rules(&self) -> Result<StreamExecutionResult> {
272 let start_time = SystemTime::now()
273 .duration_since(UNIX_EPOCH)
274 .unwrap()
275 .as_millis() as u64;
276
277 let window_manager = self.window_manager.read().await;
278 let _analytics = self.analytics.read().await;
279
280 let windows = window_manager.active_windows();
282 let mut total_events_processed = 0;
283 let mut rules_fired = 0;
284 let actions = Vec::new();
285 let mut analytics_results = HashMap::new();
286
287 for window in windows {
289 total_events_processed += window.count();
290
291 let facts = Facts::new();
293
294 self.add_window_aggregations_to_facts(&facts, window)
296 .await?;
297
298 let result = self.rule_engine.execute(&facts)?;
300 rules_fired += result.rules_fired;
301
302 }
306
307 if !windows.is_empty() {
309 let latest_window = windows.last().unwrap();
310 analytics_results.insert(
311 "total_events".to_string(),
312 Value::Number(total_events_processed as f64),
313 );
314 analytics_results.insert(
315 "window_count".to_string(),
316 Value::Number(windows.len() as f64),
317 );
318 analytics_results.insert(
319 "latest_window_events".to_string(),
320 Value::Number(latest_window.count() as f64),
321 );
322 }
323
324 let end_time = SystemTime::now()
325 .duration_since(UNIX_EPOCH)
326 .unwrap()
327 .as_millis() as u64;
328
329 Ok(StreamExecutionResult {
330 rules_fired,
331 events_processed: total_events_processed,
332 processing_time_ms: end_time - start_time,
333 actions,
334 analytics: analytics_results,
335 })
336 }
337
338 async fn add_window_aggregations_to_facts(
340 &self,
341 facts: &Facts,
342 window: &TimeWindow,
343 ) -> Result<()> {
344 facts.add_value("WindowEventCount", Value::Number(window.count() as f64))?;
346 facts.add_value("WindowStartTime", Value::Number(window.start_time as f64))?;
347 facts.add_value("WindowEndTime", Value::Number(window.end_time as f64))?;
348 facts.add_value(
349 "WindowDurationMs",
350 Value::Number(window.duration_ms() as f64),
351 )?;
352
353 let numeric_fields = self.detect_numeric_fields(window);
355 for field in numeric_fields {
356 if let Some(sum) = window
357 .events()
358 .iter()
359 .filter_map(|e| e.get_numeric(&field))
360 .reduce(|a, b| a + b)
361 {
362 facts.add_value(&format!("{}Sum", field), Value::Number(sum))?;
363 }
364
365 if let Some(avg) = window.average(&field) {
366 facts.add_value(&format!("{}Average", field), Value::Number(avg))?;
367 }
368
369 if let Some(min) = window.min(&field) {
370 facts.add_value(&format!("{}Min", field), Value::Number(min))?;
371 }
372
373 if let Some(max) = window.max(&field) {
374 facts.add_value(&format!("{}Max", field), Value::Number(max))?;
375 }
376 }
377
378 Ok(())
379 }
380
381 fn detect_numeric_fields(&self, window: &TimeWindow) -> Vec<String> {
383 let mut fields = std::collections::HashSet::new();
384
385 for event in window.events() {
386 for (key, value) in &event.data {
387 match value {
388 Value::Number(_) | Value::Integer(_) => {
389 fields.insert(key.clone());
390 }
391 _ => {}
392 }
393 }
394 }
395
396 fields.into_iter().collect()
397 }
398
399 pub async fn get_window_statistics(&self) -> crate::streaming::window::WindowStatistics {
401 let window_manager = self.window_manager.read().await;
402 window_manager.get_statistics()
403 }
404
405 pub async fn get_field_analytics(&self, field: &str) -> HashMap<String, Value> {
407 let window_manager = self.window_manager.read().await;
408 let mut results = HashMap::new();
409
410 let windows = window_manager.active_windows();
411 if windows.is_empty() {
412 return results;
413 }
414
415 let total_sum: f64 = windows.iter().map(|w| w.sum(field)).sum();
417 let total_count: usize = windows.iter().map(|w| w.count()).sum();
418
419 results.insert("total_sum".to_string(), Value::Number(total_sum));
420 results.insert("total_count".to_string(), Value::Number(total_count as f64));
421
422 if total_count > 0 {
423 results.insert(
424 "overall_average".to_string(),
425 Value::Number(total_sum / total_count as f64),
426 );
427 }
428
429 let all_values: Vec<f64> = windows
431 .iter()
432 .flat_map(|w| w.events().iter().filter_map(|e| e.get_numeric(field)))
433 .collect();
434
435 if !all_values.is_empty() {
436 let min = all_values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
437 let max = all_values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
438
439 results.insert("global_min".to_string(), Value::Number(min));
440 results.insert("global_max".to_string(), Value::Number(max));
441 }
442
443 results
444 }
445
446 pub async fn is_running(&self) -> bool {
448 let running = self.is_running.read().await;
449 *running
450 }
451}
452
453impl Default for StreamRuleEngine {
454 fn default() -> Self {
455 Self::new()
456 }
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    /// A freshly constructed engine must not report itself as running.
    #[tokio::test]
    async fn test_stream_engine_creation() {
        let engine = StreamRuleEngine::new();
        let running = engine.is_running().await;
        assert!(!running);
    }

    /// A well-formed GRL rule is accepted by `add_rule`.
    #[tokio::test]
    async fn test_add_streaming_rule() {
        let mut engine = StreamRuleEngine::new();

        let rule = r#"
        rule "TestStreamRule" salience 10 {
            when
                WindowEventCount > 5
            then
                log("High event count detected");
        }
        "#;

        let outcome = engine.add_rule(rule).await;
        assert!(outcome.is_ok());
    }

    /// A started engine accepts an event, and can then be stopped.
    #[tokio::test]
    async fn test_event_processing() {
        let mut engine = StreamRuleEngine::new();
        engine.start().await.unwrap();

        let data: HashMap<String, Value> =
            std::iter::once(("value".to_string(), Value::Number(10.0))).collect();

        let event = StreamEvent::new("TestEvent", data, "test_source");
        let sent = engine.send_event(event).await;
        assert!(sent.is_ok());

        engine.stop().await;
    }
}