Skip to main content

rust_rule_engine/streaming/
mod.rs

1//! Rule Streaming Engine - Real-time Event Processing
2//!
3//! This module provides real-time rule execution capabilities for streaming data,
4//! including time-based windows, event aggregation, and continuous rule evaluation.
5//!
6//! ## Features
7//!
8//! - **🔄 Continuous Processing**: Non-stop rule evaluation on incoming events
9//! - **⏰ Time Windows**: Sliding and tumbling window aggregations
10//! - **📊 Stream Analytics**: Count, sum, average, min/max over time windows
11//! - **🎯 Event Filtering**: Pattern matching and event correlation
12//! - **⚡ High Throughput**: Async processing with backpressure handling
13//!
14//! ## Example
15//!
16
17#![allow(missing_docs)]
18//! ```rust,ignore
19//! use rust_rule_engine::streaming::*;
20//!
21//! let mut stream_engine = StreamRuleEngine::new()
22//!     .with_window_size(Duration::from_secs(60))
23//!     .with_buffer_size(1000);
24//!
25//! // Define streaming rule
26//! let rule = r#"
27//! rule "HighFrequencyTrading" {
28//!     when
29//!         stream(TradeEvent, 5s).count() > 100
30//!     then
31//!         AlertService.trigger("High frequency detected");
32//! }
33//! "#;
34//!
35//! stream_engine.add_rule(rule).await?;
36//! stream_engine.start().await?;
37//! ```
38
39#[cfg(feature = "streaming")]
40pub mod aggregator;
41#[cfg(feature = "streaming")]
42pub mod engine;
43#[cfg(feature = "streaming")]
44pub mod event;
45#[cfg(feature = "streaming")]
46pub mod join_manager;
47#[cfg(feature = "streaming")]
48pub mod join_optimizer;
49#[cfg(feature = "streaming")]
50pub mod operators;
51#[cfg(feature = "streaming")]
52pub mod state;
53#[cfg(feature = "streaming")]
54pub mod watermark;
55#[cfg(feature = "streaming")]
56pub mod window;
57
58#[cfg(feature = "streaming")]
59pub use aggregator::{AggregationType, Aggregator};
60#[cfg(feature = "streaming")]
61pub use engine::StreamRuleEngine;
62#[cfg(feature = "streaming")]
63pub use event::{EventMetadata, StreamEvent};
64#[cfg(feature = "streaming")]
65pub use join_manager::StreamJoinManager;
66#[cfg(feature = "streaming")]
67pub use join_optimizer::{JoinOptimization, JoinOptimizer, OptimizedJoinPlan, StreamStats};
68#[cfg(feature = "streaming")]
69pub use operators::{
70    AggregateResult, Aggregation, Average, Count, CustomAggregator, DataStream, GroupedStream,
71    KeyedStream, Max, Min, Sum, WindowConfig, WindowedStream,
72};
73#[cfg(feature = "streaming")]
74pub use state::{
75    CheckpointMetadata, StateBackend, StateConfig, StateStatistics, StateStore, StatefulOperator,
76};
77#[cfg(feature = "streaming")]
78pub use watermark::{
79    LateDataHandler, LateDataStats, LateDataStrategy, LateEventDecision, Watermark,
80    WatermarkGenerator, WatermarkStrategy, WatermarkedStream,
81};
82#[cfg(feature = "streaming")]
83pub use window::{TimeWindow, WindowManager, WindowType};
84
85/// Re-export for non-streaming builds
86#[cfg(not(feature = "streaming"))]
87pub struct StreamRuleEngine;
88
89#[cfg(not(feature = "streaming"))]
90impl StreamRuleEngine {
91    /// Create a new stream rule engine (non-streaming placeholder)
92    pub fn new() -> Self {
93        StreamRuleEngine
94    }
95
96    /// Create with custom configuration (requires streaming feature)
97    pub fn with_config(_config: StreamConfig) -> Self {
98        panic!("StreamRuleEngine configuration methods require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
99    }
100
101    /// Add streaming rule from GRL string (requires streaming feature)
102    pub async fn add_rule(&mut self, _grl_rule: &str) -> Result<()> {
103        Err(crate::RuleEngineError::FeatureNotEnabled {
104            feature: "streaming".to_string(),
105            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
106                .to_string(),
107        })
108    }
109
110    /// Add streaming rule from file (requires streaming feature)
111    pub async fn add_rule_file<P: AsRef<std::path::Path>>(&mut self, _path: P) -> Result<()> {
112        Err(crate::RuleEngineError::FeatureNotEnabled {
113            feature: "streaming".to_string(),
114            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
115                .to_string(),
116        })
117    }
118
119    /// Register action handler (requires streaming feature)
120    pub async fn register_action_handler<F>(&self, _action_type: &str, _handler: F)
121    where
122        F: Fn(&StreamAction) + Send + Sync + 'static,
123    {
124        panic!("StreamRuleEngine action handlers require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
125    }
126
127    /// Start the streaming engine (requires streaming feature)
128    pub async fn start(&mut self) -> Result<()> {
129        Err(crate::RuleEngineError::FeatureNotEnabled {
130            feature: "streaming".to_string(),
131            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
132                .to_string(),
133        })
134    }
135
136    /// Stop the streaming engine (requires streaming feature)
137    pub async fn stop(&self) {
138        panic!("StreamRuleEngine stop method requires the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
139    }
140
141    /// Send event to streaming engine (requires streaming feature)
142    pub async fn send_event(&self, _event: StreamEvent) -> Result<()> {
143        Err(crate::RuleEngineError::FeatureNotEnabled {
144            feature: "streaming".to_string(),
145            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
146                .to_string(),
147        })
148    }
149
150    /// Execute rules manually (requires streaming feature)
151    pub async fn execute_rules(&mut self) -> Result<StreamExecutionResult> {
152        Err(crate::RuleEngineError::FeatureNotEnabled {
153            feature: "streaming".to_string(),
154            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
155                .to_string(),
156        })
157    }
158
159    /// Get window statistics (requires streaming feature)
160    pub async fn get_window_statistics(&self) -> crate::streaming::window::WindowStatistics {
161        panic!("StreamRuleEngine window statistics require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
162    }
163
164    /// Get field analytics (requires streaming feature)
165    pub async fn get_field_analytics(
166        &self,
167        _field: &str,
168    ) -> std::collections::HashMap<String, crate::types::Value> {
169        panic!("StreamRuleEngine field analytics require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
170    }
171
172    /// Check if engine is running (requires streaming feature)
173    pub async fn is_running(&self) -> bool {
174        panic!("StreamRuleEngine running status requires the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
175    }
176}