Skip to main content

rust_rule_engine/streaming/
mod.rs

1//! Rule Streaming Engine - Real-time Event Processing
2//!
3//! This module provides real-time rule execution capabilities for streaming data,
4//! including time-based windows, event aggregation, and continuous rule evaluation.
5//!
6//! ## Features
7//!
8//! - **🔄 Continuous Processing**: Non-stop rule evaluation on incoming events
9//! - **⏰ Time Windows**: Sliding and tumbling window aggregations
10//! - **📊 Stream Analytics**: Count, sum, average, min/max over time windows
11//! - **🎯 Event Filtering**: Pattern matching and event correlation
12//! - **⚡ High Throughput**: Async processing with backpressure handling
13//!
14//! ## Example
15//!
16//! ```rust,ignore
17//! use rust_rule_engine::streaming::*;
18//!
19//! let mut stream_engine = StreamRuleEngine::new()
20//!     .with_window_size(Duration::from_secs(60))
21//!     .with_buffer_size(1000);
22//!
23//! // Define streaming rule
24//! let rule = r#"
25//! rule "HighFrequencyTrading" {
26//!     when
27//!         stream(TradeEvent, 5s).count() > 100
28//!     then
29//!         AlertService.trigger("High frequency detected");
30//! }
31//! "#;
32//!
33//! stream_engine.add_rule(rule).await?;
34//! stream_engine.start().await?;
35//! ```
36
37#[cfg(feature = "streaming")]
38pub mod aggregator;
39#[cfg(feature = "streaming")]
40pub mod engine;
41#[cfg(feature = "streaming")]
42pub mod event;
43#[cfg(feature = "streaming")]
44pub mod join_manager;
45#[cfg(feature = "streaming")]
46pub mod join_optimizer;
47#[cfg(feature = "streaming")]
48pub mod operators;
49#[cfg(feature = "streaming")]
50pub mod state;
51#[cfg(feature = "streaming")]
52pub mod watermark;
53#[cfg(feature = "streaming")]
54pub mod window;
55
56#[cfg(feature = "streaming")]
57pub use aggregator::{AggregationType, Aggregator};
58#[cfg(feature = "streaming")]
59pub use engine::StreamRuleEngine;
60#[cfg(feature = "streaming")]
61pub use event::{EventMetadata, StreamEvent};
62#[cfg(feature = "streaming")]
63pub use join_manager::StreamJoinManager;
64#[cfg(feature = "streaming")]
65pub use join_optimizer::{JoinOptimization, JoinOptimizer, OptimizedJoinPlan, StreamStats};
66#[cfg(feature = "streaming")]
67pub use operators::{
68    AggregateResult, Aggregation, Average, Count, CustomAggregator, DataStream, GroupedStream,
69    KeyedStream, Max, Min, Sum, WindowConfig, WindowedStream,
70};
71#[cfg(feature = "streaming")]
72pub use state::{
73    CheckpointMetadata, StateBackend, StateConfig, StateStatistics, StateStore, StatefulOperator,
74};
75#[cfg(feature = "streaming")]
76pub use watermark::{
77    LateDataHandler, LateDataStats, LateDataStrategy, LateEventDecision, Watermark,
78    WatermarkGenerator, WatermarkStrategy, WatermarkedStream,
79};
80#[cfg(feature = "streaming")]
81pub use window::{TimeWindow, WindowManager, WindowType};
82
/// Placeholder `StreamRuleEngine` compiled when the `streaming` feature is disabled.
///
/// This is a stateless unit struct declared in this module, not a re-export of
/// the real engine (that one is only re-exported when `streaming` is enabled).
/// It exists so the type name still resolves in non-streaming builds.
///
/// `Default` is derived to match the zero-argument `new()` constructor, and
/// `Debug` so the public type can be printed in diagnostics.
#[cfg(not(feature = "streaming"))]
#[derive(Debug, Default)]
pub struct StreamRuleEngine;
86
87#[cfg(not(feature = "streaming"))]
88impl StreamRuleEngine {
89    /// Create a new stream rule engine (non-streaming placeholder)
90    pub fn new() -> Self {
91        StreamRuleEngine
92    }
93
94    /// Create with custom configuration (requires streaming feature)
95    pub fn with_config(_config: StreamConfig) -> Self {
96        panic!("StreamRuleEngine configuration methods require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
97    }
98
99    /// Add streaming rule from GRL string (requires streaming feature)
100    pub async fn add_rule(&mut self, _grl_rule: &str) -> Result<()> {
101        Err(crate::RuleEngineError::FeatureNotEnabled {
102            feature: "streaming".to_string(),
103            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
104                .to_string(),
105        })
106    }
107
108    /// Add streaming rule from file (requires streaming feature)
109    pub async fn add_rule_file<P: AsRef<std::path::Path>>(&mut self, _path: P) -> Result<()> {
110        Err(crate::RuleEngineError::FeatureNotEnabled {
111            feature: "streaming".to_string(),
112            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
113                .to_string(),
114        })
115    }
116
117    /// Register action handler (requires streaming feature)
118    pub async fn register_action_handler<F>(&self, _action_type: &str, _handler: F)
119    where
120        F: Fn(&StreamAction) + Send + Sync + 'static,
121    {
122        panic!("StreamRuleEngine action handlers require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
123    }
124
125    /// Start the streaming engine (requires streaming feature)
126    pub async fn start(&mut self) -> Result<()> {
127        Err(crate::RuleEngineError::FeatureNotEnabled {
128            feature: "streaming".to_string(),
129            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
130                .to_string(),
131        })
132    }
133
134    /// Stop the streaming engine (requires streaming feature)
135    pub async fn stop(&self) {
136        panic!("StreamRuleEngine stop method requires the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
137    }
138
139    /// Send event to streaming engine (requires streaming feature)
140    pub async fn send_event(&self, _event: StreamEvent) -> Result<()> {
141        Err(crate::RuleEngineError::FeatureNotEnabled {
142            feature: "streaming".to_string(),
143            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
144                .to_string(),
145        })
146    }
147
148    /// Execute rules manually (requires streaming feature)
149    pub async fn execute_rules(&mut self) -> Result<StreamExecutionResult> {
150        Err(crate::RuleEngineError::FeatureNotEnabled {
151            feature: "streaming".to_string(),
152            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
153                .to_string(),
154        })
155    }
156
157    /// Get window statistics (requires streaming feature)
158    pub async fn get_window_statistics(&self) -> crate::streaming::window::WindowStatistics {
159        panic!("StreamRuleEngine window statistics require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
160    }
161
162    /// Get field analytics (requires streaming feature)
163    pub async fn get_field_analytics(
164        &self,
165        _field: &str,
166    ) -> std::collections::HashMap<String, crate::types::Value> {
167        panic!("StreamRuleEngine field analytics require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
168    }
169
170    /// Check if engine is running (requires streaming feature)
171    pub async fn is_running(&self) -> bool {
172        panic!("StreamRuleEngine running status requires the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
173    }
174}