// rust_rule_engine/streaming/engine.rs
#![allow(clippy::type_complexity)]

7use crate::engine::facts::Facts;
8use crate::engine::knowledge_base::KnowledgeBase;
9use crate::engine::RustRuleEngine;
10use crate::parser::grl::GRLParser;
11use crate::streaming::aggregator::StreamAnalytics;
12use crate::streaming::event::StreamEvent;
13use crate::streaming::window::{TimeWindow, WindowManager, WindowType};
14use crate::types::Value;
15use crate::{Result, RuleEngineError};
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20use tokio::sync::{mpsc, RwLock};
21use tokio::time::interval;
22
/// Tunable settings for a [`StreamRuleEngine`].
///
/// The values are consumed when the engine is constructed (window manager and
/// analytics setup) and when `start` spawns the background processing task.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Capacity of the bounded channel that `start` creates for incoming events.
    pub buffer_size: usize,
    /// Window duration handed to `WindowManager::new`.
    pub window_duration: Duration,
    /// Per-window event limit handed to `WindowManager::new`.
    pub max_events_per_window: usize,
    /// Window-count limit handed to `WindowManager::new`.
    pub max_windows: usize,
    /// Windowing strategy handed to `WindowManager::new`.
    pub window_type: WindowType,
    /// Value handed to `StreamAnalytics::new`; presumably a cache time-to-live
    /// in milliseconds (inferred from the name; confirm against `StreamAnalytics`).
    pub analytics_cache_ttl_ms: u64,
    /// Period of the timer that makes the background task flush buffered
    /// events and re-check the running flag.
    pub processing_interval: Duration,
}
41
42impl Default for StreamConfig {
43 fn default() -> Self {
44 Self {
45 buffer_size: 10000,
46 window_duration: Duration::from_secs(60),
47 max_events_per_window: 1000,
48 max_windows: 100,
49 window_type: WindowType::Sliding,
50 analytics_cache_ttl_ms: 30000,
51 processing_interval: Duration::from_millis(100),
52 }
53 }
54}
55
/// Summary returned by `StreamRuleEngine::execute_rules`.
#[derive(Debug, Clone)]
pub struct StreamExecutionResult {
    /// Sum of the `rules_fired` counts reported by the rule engine, one
    /// execution per active window.
    pub rules_fired: usize,
    /// Sum of the event counts of all active windows that were evaluated.
    pub events_processed: usize,
    /// Time spent inside `execute_rules`, in milliseconds.
    pub processing_time_ms: u64,
    /// Actions produced during execution.
    /// NOTE(review): `execute_rules` in this file never adds to this list, so
    /// it is currently always empty.
    pub actions: Vec<StreamAction>,
    /// Summary values keyed by name. `execute_rules` fills in `total_events`,
    /// `window_count` and `latest_window_events` when at least one window is
    /// active, and leaves the map empty otherwise.
    pub analytics: HashMap<String, Value>,
}
70
/// Description of an action attributed to a rule.
///
/// Handlers registered through `StreamRuleEngine::register_action_handler`
/// receive a reference to a value of this type.
#[derive(Debug, Clone)]
pub struct StreamAction {
    /// Kind of action. Presumably matched against the key a handler was
    /// registered under; nothing in this file performs that dispatch, so
    /// confirm against the caller.
    pub action_type: String,
    /// Named arguments carried by the action.
    pub parameters: HashMap<String, Value>,
    /// Time associated with the action.
    /// NOTE(review): unit and epoch are not established in this file; confirm.
    pub timestamp: u64,
    /// Name of the rule the action belongs to.
    pub rule_name: String,
}
83
/// Rule engine that evaluates GRL rules against windows of streamed events.
///
/// Events are pushed in through `send_event`, batched by a background task
/// spawned in `start`, and stored in a shared `WindowManager`. `execute_rules`
/// then runs the wrapped `RustRuleEngine` once per active window.
pub struct StreamRuleEngine {
    /// Settings the engine was built with.
    config: StreamConfig,
    /// Underlying rule engine holding the knowledge base.
    rule_engine: RustRuleEngine,
    /// Window storage shared between this handle and the background task.
    window_manager: Arc<RwLock<WindowManager>>,
    /// Stream analytics state, created from `analytics_cache_ttl_ms`.
    analytics: Arc<RwLock<StreamAnalytics>>,
    /// Sending half of the event channel; `None` until `start` has been called.
    event_sender: Option<mpsc::Sender<StreamEvent>>,
    /// Callbacks keyed by the action type they were registered for.
    action_handlers: Arc<RwLock<HashMap<String, Box<dyn Fn(&StreamAction) + Send + Sync>>>>,
    /// Flag set by `start` and cleared by `stop`; the background task polls
    /// it on every timer tick.
    is_running: Arc<RwLock<bool>>,
}
101
102impl StreamRuleEngine {
103 pub fn new() -> Self {
105 let config = StreamConfig::default();
106 let kb = KnowledgeBase::new("StreamKB");
107 let rule_engine = RustRuleEngine::new(kb);
108
109 let window_manager = Arc::new(RwLock::new(WindowManager::new(
110 config.window_type.clone(),
111 config.window_duration,
112 config.max_events_per_window,
113 config.max_windows,
114 )));
115
116 let analytics = Arc::new(RwLock::new(StreamAnalytics::new(
117 config.analytics_cache_ttl_ms,
118 )));
119
120 Self {
121 config,
122 rule_engine,
123 window_manager,
124 analytics,
125 event_sender: None,
126 action_handlers: Arc::new(RwLock::new(HashMap::new())),
127 is_running: Arc::new(RwLock::new(false)),
128 }
129 }
130
131 pub fn with_config(config: StreamConfig) -> Self {
133 let kb = KnowledgeBase::new("StreamKB");
134 let rule_engine = RustRuleEngine::new(kb);
135
136 let window_manager = Arc::new(RwLock::new(WindowManager::new(
137 config.window_type.clone(),
138 config.window_duration,
139 config.max_events_per_window,
140 config.max_windows,
141 )));
142
143 let analytics = Arc::new(RwLock::new(StreamAnalytics::new(
144 config.analytics_cache_ttl_ms,
145 )));
146
147 Self {
148 config,
149 rule_engine,
150 window_manager,
151 analytics,
152 event_sender: None,
153 action_handlers: Arc::new(RwLock::new(HashMap::new())),
154 is_running: Arc::new(RwLock::new(false)),
155 }
156 }
157
158 pub async fn add_rule(&mut self, grl_rule: &str) -> Result<()> {
160 let rules = GRLParser::parse_rules(grl_rule)?;
161
162 for rule in rules {
163 self.rule_engine.knowledge_base_mut().add_rule(rule)?;
164 }
165
166 Ok(())
167 }
168
169 pub async fn add_rule_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<()> {
171 let content = std::fs::read_to_string(path)?;
172 self.add_rule(&content).await
173 }
174
175 pub async fn register_action_handler<F>(&self, action_type: &str, handler: F)
177 where
178 F: Fn(&StreamAction) + Send + Sync + 'static,
179 {
180 let mut handlers = self.action_handlers.write().await;
181 handlers.insert(action_type.to_string(), Box::new(handler));
182 }
183
184 pub async fn start(&mut self) -> Result<()> {
186 let (tx, mut rx) = mpsc::channel::<StreamEvent>(self.config.buffer_size);
187 self.event_sender = Some(tx);
188
189 {
191 let mut running = self.is_running.write().await;
192 *running = true;
193 }
194
195 let window_manager = Arc::clone(&self.window_manager);
197 let _analytics = Arc::clone(&self.analytics);
198 let _action_handlers = Arc::clone(&self.action_handlers);
199 let is_running = Arc::clone(&self.is_running);
200 let processing_interval = self.config.processing_interval;
201
202 let _processing_task = tokio::spawn(async move {
204 let mut interval_timer = interval(processing_interval);
205 let mut event_batch = Vec::new();
206
207 loop {
208 tokio::select! {
209 event = rx.recv() => {
211 match event {
212 Some(event) => {
213 event_batch.push(event);
214
215 if event_batch.len() >= 100 {
217 Self::process_event_batch(&window_manager, &event_batch).await;
218 event_batch.clear();
219 }
220 }
221 None => break, }
223 }
224
225 _ = interval_timer.tick() => {
227 if !event_batch.is_empty() {
228 Self::process_event_batch(&window_manager, &event_batch).await;
229 event_batch.clear();
230 }
231
232 let running = is_running.read().await;
234 if !*running {
235 break;
236 }
237 }
238 }
239 }
240 });
241
242 Ok(())
243 }
244
245 pub async fn stop(&self) {
247 let mut running = self.is_running.write().await;
248 *running = false;
249 }
250
251 pub async fn send_event(&self, event: StreamEvent) -> Result<()> {
253 if let Some(ref sender) = self.event_sender {
254 sender.send(event).await.map_err(|_| {
255 RuleEngineError::ExecutionError("Failed to send event to stream".to_string())
256 })?;
257 }
258 Ok(())
259 }
260
261 async fn process_event_batch(
263 window_manager: &Arc<RwLock<WindowManager>>,
264 events: &[StreamEvent],
265 ) {
266 let mut manager = window_manager.write().await;
267 for event in events {
268 manager.process_event(event.clone());
269 }
270 }
271
272 pub async fn execute_rules(&mut self) -> Result<StreamExecutionResult> {
274 let start_time = SystemTime::now()
275 .duration_since(UNIX_EPOCH)
276 .unwrap()
277 .as_millis() as u64;
278
279 let window_manager = self.window_manager.read().await;
280 let _analytics = self.analytics.read().await;
281
282 let windows = window_manager.active_windows();
284 let mut total_events_processed = 0;
285 let mut rules_fired = 0;
286 let actions = Vec::new();
287 let mut analytics_results = HashMap::new();
288
289 for window in windows {
291 total_events_processed += window.count();
292
293 let facts = Facts::new();
295
296 self.add_window_aggregations_to_facts(&facts, window)
298 .await?;
299
300 let result = self.rule_engine.execute(&facts)?;
302 rules_fired += result.rules_fired;
303
304 }
308
309 if !windows.is_empty() {
311 let latest_window = windows.last().unwrap();
312 analytics_results.insert(
313 "total_events".to_string(),
314 Value::Number(total_events_processed as f64),
315 );
316 analytics_results.insert(
317 "window_count".to_string(),
318 Value::Number(windows.len() as f64),
319 );
320 analytics_results.insert(
321 "latest_window_events".to_string(),
322 Value::Number(latest_window.count() as f64),
323 );
324 }
325
326 let end_time = SystemTime::now()
327 .duration_since(UNIX_EPOCH)
328 .unwrap()
329 .as_millis() as u64;
330
331 Ok(StreamExecutionResult {
332 rules_fired,
333 events_processed: total_events_processed,
334 processing_time_ms: end_time - start_time,
335 actions,
336 analytics: analytics_results,
337 })
338 }
339
340 async fn add_window_aggregations_to_facts(
342 &self,
343 facts: &Facts,
344 window: &TimeWindow,
345 ) -> Result<()> {
346 facts.add_value("WindowEventCount", Value::Number(window.count() as f64))?;
348 facts.add_value("WindowStartTime", Value::Number(window.start_time as f64))?;
349 facts.add_value("WindowEndTime", Value::Number(window.end_time as f64))?;
350 facts.add_value(
351 "WindowDurationMs",
352 Value::Number(window.duration_ms() as f64),
353 )?;
354
355 let numeric_fields = self.detect_numeric_fields(window);
357 for field in numeric_fields {
358 if let Some(sum) = window
359 .events()
360 .iter()
361 .filter_map(|e| e.get_numeric(&field))
362 .reduce(|a, b| a + b)
363 {
364 facts.add_value(&format!("{}Sum", field), Value::Number(sum))?;
365 }
366
367 if let Some(avg) = window.average(&field) {
368 facts.add_value(&format!("{}Average", field), Value::Number(avg))?;
369 }
370
371 if let Some(min) = window.min(&field) {
372 facts.add_value(&format!("{}Min", field), Value::Number(min))?;
373 }
374
375 if let Some(max) = window.max(&field) {
376 facts.add_value(&format!("{}Max", field), Value::Number(max))?;
377 }
378 }
379
380 Ok(())
381 }
382
383 fn detect_numeric_fields(&self, window: &TimeWindow) -> Vec<String> {
385 let mut fields = std::collections::HashSet::new();
386
387 for event in window.events() {
388 for (key, value) in &event.data {
389 match value {
390 Value::Number(_) | Value::Integer(_) => {
391 fields.insert(key.clone());
392 }
393 _ => {}
394 }
395 }
396 }
397
398 fields.into_iter().collect()
399 }
400
401 pub async fn get_window_statistics(&self) -> crate::streaming::window::WindowStatistics {
403 let window_manager = self.window_manager.read().await;
404 window_manager.get_statistics()
405 }
406
407 pub async fn get_field_analytics(&self, field: &str) -> HashMap<String, Value> {
409 let window_manager = self.window_manager.read().await;
410 let mut results = HashMap::new();
411
412 let windows = window_manager.active_windows();
413 if windows.is_empty() {
414 return results;
415 }
416
417 let total_sum: f64 = windows.iter().map(|w| w.sum(field)).sum();
419 let total_count: usize = windows.iter().map(|w| w.count()).sum();
420
421 results.insert("total_sum".to_string(), Value::Number(total_sum));
422 results.insert("total_count".to_string(), Value::Number(total_count as f64));
423
424 if total_count > 0 {
425 results.insert(
426 "overall_average".to_string(),
427 Value::Number(total_sum / total_count as f64),
428 );
429 }
430
431 let all_values: Vec<f64> = windows
433 .iter()
434 .flat_map(|w| w.events().iter().filter_map(|e| e.get_numeric(field)))
435 .collect();
436
437 if !all_values.is_empty() {
438 let min = all_values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
439 let max = all_values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
440
441 results.insert("global_min".to_string(), Value::Number(min));
442 results.insert("global_max".to_string(), Value::Number(max));
443 }
444
445 results
446 }
447
448 pub async fn is_running(&self) -> bool {
450 let running = self.is_running.read().await;
451 *running
452 }
453}
454
455impl Default for StreamRuleEngine {
456 fn default() -> Self {
457 Self::new()
458 }
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    /// A freshly constructed engine must not report itself as running.
    #[tokio::test]
    async fn test_stream_engine_creation() {
        let engine = StreamRuleEngine::new();
        let running = engine.is_running().await;
        assert!(!running);
    }

    /// A well-formed GRL rule is accepted by `add_rule`.
    #[tokio::test]
    async fn test_add_streaming_rule() {
        let rule = r#"
        rule "TestStreamRule" salience 10 {
            when
                WindowEventCount > 5
            then
                log("High event count detected");
        }
        "#;

        let mut engine = StreamRuleEngine::new();
        let outcome = engine.add_rule(rule).await;
        assert!(outcome.is_ok());
    }

    /// A started engine accepts an event and can then be stopped.
    #[tokio::test]
    async fn test_event_processing() {
        let mut engine = StreamRuleEngine::new();
        engine.start().await.unwrap();

        let data: HashMap<String, Value> =
            std::iter::once(("value".to_string(), Value::Number(10.0))).collect();
        let event = StreamEvent::new("TestEvent", data, "test_source");

        let sent = engine.send_event(event).await;
        assert!(sent.is_ok());

        engine.stop().await;
    }
}