rust_rule_engine/streaming/
mod.rs

1//! Rule Streaming Engine - Real-time Event Processing
2//!
3//! This module provides real-time rule execution capabilities for streaming data,
4//! including time-based windows, event aggregation, and continuous rule evaluation.
5//!
6//! ## Features
7//!
8//! - **🔄 Continuous Processing**: Non-stop rule evaluation on incoming events
9//! - **⏰ Time Windows**: Sliding and tumbling window aggregations
10//! - **📊 Stream Analytics**: Count, sum, average, min/max over time windows
11//! - **🎯 Event Filtering**: Pattern matching and event correlation
12//! - **⚡ High Throughput**: Async processing with backpressure handling
13//!
14//! ## Example
15//!
16
17#![allow(missing_docs)]
18//! ```rust,ignore
19//! use rust_rule_engine::streaming::*;
20//!
21//! let mut stream_engine = StreamRuleEngine::new()
22//!     .with_window_size(Duration::from_secs(60))
23//!     .with_buffer_size(1000);
24//!
25//! // Define streaming rule
26//! let rule = r#"
27//! rule "HighFrequencyTrading" {
28//!     when
29//!         stream(TradeEvent, 5s).count() > 100
30//!     then
31//!         AlertService.trigger("High frequency detected");
32//! }
33//! "#;
34//!
35//! stream_engine.add_rule(rule).await?;
36//! stream_engine.start().await?;
37//! ```
38
39#[cfg(feature = "streaming")]
40pub mod aggregator;
41#[cfg(feature = "streaming")]
42pub mod engine;
43#[cfg(feature = "streaming")]
44pub mod event;
45#[cfg(feature = "streaming")]
46pub mod operators;
47#[cfg(feature = "streaming")]
48pub mod state;
49#[cfg(feature = "streaming")]
50pub mod watermark;
51#[cfg(feature = "streaming")]
52pub mod window;
53
54#[cfg(feature = "streaming")]
55pub use aggregator::{AggregationType, Aggregator};
56#[cfg(feature = "streaming")]
57pub use engine::StreamRuleEngine;
58#[cfg(feature = "streaming")]
59pub use event::{EventMetadata, StreamEvent};
60#[cfg(feature = "streaming")]
61pub use operators::{
62    AggregateResult, Aggregation, Average, Count, CustomAggregator, DataStream, GroupedStream,
63    KeyedStream, Max, Min, Sum, WindowConfig, WindowedStream,
64};
65#[cfg(feature = "streaming")]
66pub use state::{
67    CheckpointMetadata, StateBackend, StateConfig, StateStatistics, StateStore, StatefulOperator,
68};
69#[cfg(feature = "streaming")]
70pub use watermark::{
71    LateDataHandler, LateDataStats, LateDataStrategy, LateEventDecision, Watermark,
72    WatermarkGenerator, WatermarkStrategy, WatermarkedStream,
73};
74#[cfg(feature = "streaming")]
75pub use window::{TimeWindow, WindowManager, WindowType};
76
/// Placeholder `StreamRuleEngine` for builds without the `streaming` feature.
///
/// This is a zero-sized stand-in, not a re-export: it is compiled only when the
/// `streaming` feature is disabled, so the type name still exists in that
/// configuration.
#[cfg(not(feature = "streaming"))]
#[derive(Debug, Default)]
pub struct StreamRuleEngine;
80
81#[cfg(not(feature = "streaming"))]
82impl StreamRuleEngine {
83    /// Create a new stream rule engine (non-streaming placeholder)
84    pub fn new() -> Self {
85        StreamRuleEngine
86    }
87
88    /// Create with custom configuration (requires streaming feature)
89    pub fn with_config(_config: StreamConfig) -> Self {
90        panic!("StreamRuleEngine configuration methods require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
91    }
92
93    /// Add streaming rule from GRL string (requires streaming feature)
94    pub async fn add_rule(&mut self, _grl_rule: &str) -> Result<()> {
95        Err(crate::RuleEngineError::FeatureNotEnabled {
96            feature: "streaming".to_string(),
97            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
98                .to_string(),
99        })
100    }
101
102    /// Add streaming rule from file (requires streaming feature)
103    pub async fn add_rule_file<P: AsRef<std::path::Path>>(&mut self, _path: P) -> Result<()> {
104        Err(crate::RuleEngineError::FeatureNotEnabled {
105            feature: "streaming".to_string(),
106            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
107                .to_string(),
108        })
109    }
110
111    /// Register action handler (requires streaming feature)
112    pub async fn register_action_handler<F>(&self, _action_type: &str, _handler: F)
113    where
114        F: Fn(&StreamAction) + Send + Sync + 'static,
115    {
116        panic!("StreamRuleEngine action handlers require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
117    }
118
119    /// Start the streaming engine (requires streaming feature)
120    pub async fn start(&mut self) -> Result<()> {
121        Err(crate::RuleEngineError::FeatureNotEnabled {
122            feature: "streaming".to_string(),
123            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
124                .to_string(),
125        })
126    }
127
128    /// Stop the streaming engine (requires streaming feature)
129    pub async fn stop(&self) {
130        panic!("StreamRuleEngine stop method requires the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
131    }
132
133    /// Send event to streaming engine (requires streaming feature)
134    pub async fn send_event(&self, _event: StreamEvent) -> Result<()> {
135        Err(crate::RuleEngineError::FeatureNotEnabled {
136            feature: "streaming".to_string(),
137            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
138                .to_string(),
139        })
140    }
141
142    /// Execute rules manually (requires streaming feature)
143    pub async fn execute_rules(&mut self) -> Result<StreamExecutionResult> {
144        Err(crate::RuleEngineError::FeatureNotEnabled {
145            feature: "streaming".to_string(),
146            message: "Streaming rule engine requires the 'streaming' feature to be enabled"
147                .to_string(),
148        })
149    }
150
151    /// Get window statistics (requires streaming feature)
152    pub async fn get_window_statistics(&self) -> crate::streaming::window::WindowStatistics {
153        panic!("StreamRuleEngine window statistics require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
154    }
155
156    /// Get field analytics (requires streaming feature)
157    pub async fn get_field_analytics(
158        &self,
159        _field: &str,
160    ) -> std::collections::HashMap<String, crate::types::Value> {
161        panic!("StreamRuleEngine field analytics require the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
162    }
163
164    /// Check if engine is running (requires streaming feature)
165    pub async fn is_running(&self) -> bool {
166        panic!("StreamRuleEngine running status requires the 'streaming' feature to be enabled. Enable it in Cargo.toml: features = [\"streaming\"]");
167    }
168}