use crate::event::StreamEvent;
use anyhow::Result;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info};
use uuid::Uuid;
/// Tunable configuration for the [`CepEngine`].
///
/// NOTE(review): several fields (`max_events_in_memory`, `enable_state_machines`,
/// `max_pattern_depth`, `pattern_matching_timeout`, `collect_metrics`,
/// `enable_distributed`, `num_partitions`) are not read by the engine code in
/// this module — presumably reserved for future use; confirm before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CepConfig {
    /// Upper bound on events kept in memory (not enforced in this module).
    pub max_events_in_memory: usize,
    /// Events older than this are discarded by garbage collection (`run_gc`).
    pub max_time_window: Duration,
    /// When false, `process_event` skips the correlation step.
    pub enable_correlation: bool,
    /// Not consulted by the visible engine code.
    pub enable_state_machines: bool,
    pub enable_rules: bool,
    /// When false, `process_event` skips the enrichment step.
    pub enable_enrichment: bool,
    /// Not consulted by the visible engine code.
    pub max_pattern_depth: usize,
    /// Not consulted by the visible engine code.
    pub pattern_matching_timeout: Duration,
    /// Per-stream cap on buffered events (`EventBuffer::max_size`).
    pub event_buffer_size: usize,
    pub collect_metrics: bool,
    /// Minimum interval between garbage-collection passes (`maybe_run_gc`).
    pub gc_interval: Duration,
    pub enable_distributed: bool,
    pub num_partitions: usize,
}
impl Default for CepConfig {
    /// Defaults: generous in-memory limits, every optional processing stage
    /// enabled, and single-node (non-distributed) operation.
    fn default() -> Self {
        Self {
            // Memory and buffering limits.
            max_events_in_memory: 100_000,
            event_buffer_size: 10_000,
            max_time_window: Duration::from_secs(3600),
            // Optional processing stages — all on by default.
            enable_correlation: true,
            enable_state_machines: true,
            enable_rules: true,
            enable_enrichment: true,
            // Pattern-matching knobs.
            max_pattern_depth: 10,
            pattern_matching_timeout: Duration::from_millis(100),
            // Housekeeping.
            collect_metrics: true,
            gc_interval: Duration::from_secs(60),
            // Distribution is opt-in.
            enable_distributed: false,
            num_partitions: 8,
        }
    }
}
/// Complex-event-processing engine.
///
/// All mutable state lives behind `Arc<RwLock<…>>` so the engine's async
/// methods can share it across tasks; the config itself is immutable.
pub struct CepEngine {
    /// Registered patterns, keyed by registration name.
    patterns: Arc<RwLock<HashMap<String, EventPattern>>>,
    /// Per-stream ring buffers of recent events.
    event_buffers: Arc<RwLock<HashMap<String, EventBuffer>>>,
    /// One state machine per registered pattern (same keys as `patterns`).
    state_machines: Arc<RwLock<HashMap<String, StateMachine>>>,
    rule_engine: Arc<RwLock<RuleEngine>>,
    correlator: Arc<RwLock<EventCorrelator>>,
    enrichment_service: Arc<RwLock<EnrichmentService>>,
    pattern_detector: Arc<RwLock<PatternDetector>>,
    metrics: Arc<RwLock<CepMetrics>>,
    config: CepConfig,
    /// Wall-clock instant of the last garbage-collection pass.
    last_gc: Arc<RwLock<Instant>>,
}
/// Declarative description of an event pattern to detect.
///
/// NOTE(review): only `Simple`, `Sequence`, and `And` are handled by
/// `CepEngine::try_match_pattern` in this module (`And` is a stub); the other
/// variants are accepted but never matched — confirm before registering them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventPattern {
    /// A single event satisfying all `predicates` (conjunction).
    Simple {
        name: String,
        predicates: Vec<FieldPredicate>,
    },
    /// Sub-patterns occurring in order, optionally within `time_window`.
    /// `strict` is intended to forbid interleaved events between stages.
    Sequence {
        name: String,
        patterns: Vec<EventPattern>,
        time_window: Option<Duration>,
        strict: bool,
    },
    /// All sub-patterns occur (any order), optionally within `time_window`.
    And {
        name: String,
        patterns: Vec<EventPattern>,
        time_window: Option<Duration>,
    },
    /// Any one of the sub-patterns occurs.
    Or {
        name: String,
        patterns: Vec<EventPattern>,
    },
    /// The sub-pattern does NOT occur within `time_window`.
    Not {
        name: String,
        pattern: Box<EventPattern>,
        time_window: Duration,
    },
    /// The sub-pattern occurs between `min_count` and `max_count` times.
    Repeat {
        name: String,
        pattern: Box<EventPattern>,
        min_count: usize,
        max_count: Option<usize>,
        time_window: Option<Duration>,
    },
    /// Two sub-patterns related by a temporal operator (e.g. Before/After).
    Temporal {
        name: String,
        first: Box<EventPattern>,
        operator: TemporalOperator,
        second: Box<EventPattern>,
        tolerance: Option<Duration>,
    },
    /// An aggregate over matches of `pattern` within `window` crossing `threshold`.
    Aggregation {
        name: String,
        pattern: Box<EventPattern>,
        aggregation: CepAggregationFunction,
        window: Duration,
        threshold: f64,
    },
}
impl EventPattern {
    /// Convenience constructor: a single equality predicate, with the pattern
    /// named `"<field>=<value>"`.
    pub fn simple(field: &str, value: &str) -> Self {
        let predicate = FieldPredicate::Equals {
            field: field.to_string(),
            value: value.to_string(),
        };
        EventPattern::Simple {
            name: format!("{}={}", field, value),
            predicates: vec![predicate],
        }
    }

    /// Convenience constructor: a non-strict sequence named `"sequence"` with
    /// no time window (attach one via [`with_time_window`]).
    pub fn sequence(patterns: Vec<EventPattern>) -> Self {
        EventPattern::Sequence {
            name: String::from("sequence"),
            patterns,
            time_window: None,
            strict: false,
        }
    }

    /// Builder-style setter for the time window.
    ///
    /// Only `Sequence`, `And`, and `Repeat` carry a window; for every other
    /// variant this is a silent no-op.
    pub fn with_time_window(mut self, window: Duration) -> Self {
        match &mut self {
            EventPattern::Sequence { time_window, .. }
            | EventPattern::And { time_window, .. }
            | EventPattern::Repeat { time_window, .. } => *time_window = Some(window),
            _ => {}
        }
        self
    }

    /// The pattern's human-readable name.
    pub fn name(&self) -> &str {
        // Every variant carries a `name` field, so a single or-pattern arm
        // covers them all.
        match self {
            EventPattern::Simple { name, .. }
            | EventPattern::Sequence { name, .. }
            | EventPattern::And { name, .. }
            | EventPattern::Or { name, .. }
            | EventPattern::Not { name, .. }
            | EventPattern::Repeat { name, .. }
            | EventPattern::Temporal { name, .. }
            | EventPattern::Aggregation { name, .. } => name,
        }
    }
}
/// Predicate over a single named event field.
///
/// NOTE(review): `CepEngine::evaluate_predicates` only implements `Equals`
/// (for `event_type`) and `Contains` (for `source`); every other variant is
/// currently treated as satisfied — confirm before depending on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FieldPredicate {
    Equals { field: String, value: String },
    NotEquals { field: String, value: String },
    Contains { field: String, substring: String },
    Regex { field: String, pattern: String },
    GreaterThan { field: String, value: f64 },
    LessThan { field: String, value: f64 },
    InRange { field: String, min: f64, max: f64 },
    /// The field is present, regardless of value.
    Exists { field: String },
    /// Named user-defined predicate resolved elsewhere.
    Custom { name: String },
}
/// Temporal relation between two patterns (the names mirror Allen's interval
/// algebra relations). Not yet evaluated by the engine code in this module.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum TemporalOperator {
    Before,
    After,
    Meets,
    During,
    Overlaps,
    Starts,
    Finishes,
    Equals,
}
/// Aggregate applied over matched events in an `EventPattern::Aggregation`.
/// Not yet evaluated by the engine code in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CepAggregationFunction {
    Count,
    Sum { field: String },
    Average { field: String },
    Min { field: String },
    Max { field: String },
    StdDev { field: String },
    Percentile { field: String, percentile: f64 },
    Custom { name: String },
}
/// Bounded FIFO buffer of recent events for one stream.
#[derive(Debug, Clone)]
pub struct EventBuffer {
    pub stream_name: String,
    /// Oldest event at the front, newest at the back.
    pub events: VecDeque<TimestampedEvent>,
    /// Eviction threshold (taken from `CepConfig::event_buffer_size`).
    pub max_size: usize,
    /// Timestamp of the front event, `None` until the first insert.
    pub oldest_timestamp: Option<DateTime<Utc>>,
    /// Timestamp of the most recently inserted event.
    pub newest_timestamp: Option<DateTime<Utc>>,
}
/// A stream event stamped with its engine arrival time and a unique id.
#[derive(Debug, Clone)]
pub struct TimestampedEvent {
    pub event: StreamEvent,
    /// Arrival time assigned by `process_event` (not the event's own time).
    pub timestamp: DateTime<Utc>,
    pub id: Uuid,
}
/// Matching state for one registered pattern.
#[derive(Debug, Clone)]
pub struct StateMachine {
    pub pattern: EventPattern,
    pub state: State,
    /// In-flight (incomplete) sequence matches.
    pub partial_matches: Vec<PartialMatch>,
    pub completed_matches: Vec<CompleteMatch>,
    pub transition_count: usize,
}
/// Coarse lifecycle state of a pattern's state machine.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum State {
    Initial,
    /// Part-way through a multi-stage pattern.
    Intermediate { stage: usize },
    Final,
    Error,
}
/// An in-progress match of a multi-stage pattern.
#[derive(Debug, Clone)]
pub struct PartialMatch {
    pub id: Uuid,
    /// Events consumed so far, in arrival order.
    pub events: Vec<TimestampedEvent>,
    /// Index of the NEXT stage to satisfy (events matched so far == stage).
    pub stage: usize,
    /// Timestamp of the first matched event; used for window expiry.
    pub start_time: DateTime<Utc>,
    /// Timestamp of the most recent matched event; used by GC pruning.
    pub last_update: DateTime<Utc>,
    /// Free-form per-match scratch state.
    pub state: HashMap<String, String>,
}
/// A fully matched pattern occurrence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompleteMatch {
    pub id: Uuid,
    /// Name of the pattern that matched (see `EventPattern::name`).
    pub pattern_name: String,
    /// Ids of the contributing events, in match order.
    pub event_ids: Vec<Uuid>,
    pub start_time: DateTime<Utc>,
    pub end_time: DateTime<Utc>,
    /// `end_time - start_time` (zero for single-event matches).
    pub duration: Duration,
    /// Match confidence in [0, 1]; currently always 1.0.
    pub confidence: f64,
    pub metadata: HashMap<String, String>,
}
/// Holds registered processing rules and their execution statistics.
#[derive(Debug, Clone)]
pub struct RuleEngine {
    /// Rules keyed by rule name.
    pub rules: HashMap<String, ProcessingRule>,
    pub stats: RuleExecutionStats,
}
/// A condition → actions rule evaluated against each detected pattern match.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingRule {
    pub name: String,
    pub condition: RuleCondition,
    /// Executed in order when `condition` evaluates to true.
    pub actions: Vec<RuleAction>,
    /// Higher values run first.
    pub priority: i32,
    /// Disabled rules are skipped entirely.
    pub enabled: bool,
}
/// Trigger condition for a [`ProcessingRule`].
///
/// NOTE(review): only `PatternMatched` is evaluated by
/// `CepEngine::evaluate_rule_condition`; the other variants currently
/// evaluate to false.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RuleCondition {
    /// Fires when the detected pattern's name equals `pattern`.
    PatternMatched { pattern: String },
    FieldCondition { predicate: FieldPredicate },
    ThresholdExceeded { metric: String, threshold: f64 },
    /// Boolean combination of sub-conditions (`operator` e.g. "and"/"or" —
    /// semantics not implemented here).
    Complex {
        operator: String,
        conditions: Vec<RuleCondition>,
    },
}
/// Side effect executed when a rule fires.
///
/// NOTE(review): `CepEngine::execute_rule_action` only logs `SendAlert` and
/// `EmitEvent`; the remaining variants are accepted but currently no-ops.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RuleAction {
    EmitEvent {
        event_type: String,
        data: HashMap<String, String>,
    },
    SendAlert { severity: String, message: String },
    UpdateState { key: String, value: String },
    Webhook { url: String, method: String },
    Custom {
        name: String,
        params: HashMap<String, String>,
    },
}
/// Counters for rule evaluation/execution.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RuleExecutionStats {
    /// Rules evaluated (enabled rules checked against a match).
    pub total_executions: u64,
    /// Rules whose condition held and whose actions all ran.
    pub successful_executions: u64,
    pub failed_executions: u64,
    pub total_execution_time: Duration,
    pub avg_execution_time: Duration,
}
/// Correlates pairs of events via pluggable correlation functions.
/// Currently a data holder — `CepEngine::correlate_events` is a stub.
#[derive(Debug, Clone)]
pub struct EventCorrelator {
    pub correlation_functions: HashMap<String, CorrelationFunction>,
    /// Memoized results keyed by (event pair, function).
    pub correlation_cache: HashMap<CorrelationKey, CorrelationResult>,
    pub stats: CorrelationStats,
}
/// Definition of one correlation function: which fields to compare, within
/// what window, and the score threshold to consider events correlated.
#[derive(Debug, Clone)]
pub struct CorrelationFunction {
    pub name: String,
    pub time_window: Duration,
    pub fields: Vec<String>,
    pub threshold: f64,
}
/// Cache key for a correlation computation: an (ordered) event pair plus the
/// correlation function name.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CorrelationKey {
    pub event1: Uuid,
    pub event2: Uuid,
    pub function: String,
}
/// Outcome of correlating two events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CorrelationResult {
    /// Correlation score (compared against `CorrelationFunction::threshold`).
    pub score: f64,
    pub correlated_fields: Vec<String>,
    pub timestamp: DateTime<Utc>,
}
/// Counters for the event correlator and its cache.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CorrelationStats {
    pub total_correlations: u64,
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub avg_correlation_score: f64,
}
/// Looks up supplementary data for events from configured sources.
/// Currently a data holder — `CepEngine::enrich_events` is a stub.
#[derive(Debug, Clone)]
pub struct EnrichmentService {
    pub sources: HashMap<String, EnrichmentSource>,
    /// Cached enrichment results, keyed by lookup key.
    pub cache: HashMap<String, EnrichmentData>,
    pub stats: EnrichmentStats,
}
/// One configured source of enrichment data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnrichmentSource {
    pub name: String,
    pub source_type: EnrichmentSourceType,
    /// Event field whose value is used as the lookup key.
    pub key_field: String,
    /// How long fetched data may be served from cache.
    pub cache_ttl: Duration,
}
/// Where enrichment data comes from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EnrichmentSourceType {
    /// HTTP endpoint, with optional auth token/header value.
    ExternalApi { url: String, auth: Option<String> },
    Database {
        connection_string: String,
        query: String,
    },
    /// Static lookup table: key → field map.
    InMemory {
        data: HashMap<String, HashMap<String, String>>,
    },
    Custom { name: String },
}
/// Enrichment payload attached to a detected pattern.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnrichmentData {
    pub fields: HashMap<String, String>,
    /// Name of the `EnrichmentSource` that produced this data.
    pub source: String,
    /// When the data was fetched (for TTL-based cache expiry).
    pub timestamp: DateTime<Utc>,
    pub ttl: Duration,
}
/// Counters for the enrichment service and its cache.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EnrichmentStats {
    pub total_enrichments: u64,
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub failed_enrichments: u64,
}
/// Registry of patterns and detection algorithms plus detection statistics.
/// Currently a data holder — detection runs through `CepEngine` directly.
#[derive(Debug, Clone)]
pub struct PatternDetector {
    pub patterns: HashMap<String, EventPattern>,
    /// Per-pattern choice of matching algorithm.
    pub algorithms: HashMap<String, DetectionAlgorithm>,
    pub stats: DetectionStats,
}
/// Strategy used to match a pattern. Not yet consulted by the engine code in
/// this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DetectionAlgorithm {
    Sequential,
    Automaton,
    Tree,
    Graph,
    MachineLearning { model_name: String },
}
/// Counters for pattern detection quality and latency.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DetectionStats {
    pub total_events_processed: u64,
    pub patterns_detected: u64,
    pub false_positives: u64,
    pub false_negatives: u64,
    pub avg_detection_latency: Duration,
    pub total_detection_time: Duration,
}
/// Engine-wide runtime metrics, updated on every processed event.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CepMetrics {
    pub total_events_processed: u64,
    pub total_patterns_detected: u64,
    /// Cumulative average rate since the `last_update` baseline.
    pub events_per_second: f64,
    pub patterns_per_second: f64,
    /// Running average over all processed events.
    pub avg_event_processing_latency: Duration,
    pub avg_pattern_matching_latency: Duration,
    pub memory_usage_bytes: usize,
    /// Sum of in-flight partial matches across all state machines.
    pub active_partial_matches: usize,
    pub completed_matches: usize,
    /// Number of garbage-collection passes performed.
    pub gc_count: u64,
    /// Baseline timestamp for the rate calculations; set at engine creation.
    pub last_update: DateTime<Utc>,
}
/// Result returned by `CepEngine::process_event` for each detected pattern:
/// the match itself plus the downstream rule/correlation/enrichment output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectedPattern {
    pub pattern_match: CompleteMatch,
    /// Names of rules whose condition held for this match.
    pub triggered_rules: Vec<String>,
    pub correlations: Vec<CorrelationResult>,
    /// Enrichment data keyed by source name.
    pub enrichments: HashMap<String, EnrichmentData>,
}
impl CepEngine {
    /// Create a new engine with empty pattern/rule/correlation/enrichment
    /// registries. Returns `Result` for interface stability; construction
    /// itself cannot currently fail.
    pub fn new(config: CepConfig) -> Result<Self> {
        Ok(Self {
            patterns: Arc::new(RwLock::new(HashMap::new())),
            event_buffers: Arc::new(RwLock::new(HashMap::new())),
            state_machines: Arc::new(RwLock::new(HashMap::new())),
            rule_engine: Arc::new(RwLock::new(RuleEngine {
                rules: HashMap::new(),
                stats: RuleExecutionStats::default(),
            })),
            correlator: Arc::new(RwLock::new(EventCorrelator {
                correlation_functions: HashMap::new(),
                correlation_cache: HashMap::new(),
                stats: CorrelationStats::default(),
            })),
            enrichment_service: Arc::new(RwLock::new(EnrichmentService {
                sources: HashMap::new(),
                cache: HashMap::new(),
                stats: EnrichmentStats::default(),
            })),
            pattern_detector: Arc::new(RwLock::new(PatternDetector {
                patterns: HashMap::new(),
                algorithms: HashMap::new(),
                stats: DetectionStats::default(),
            })),
            metrics: Arc::new(RwLock::new(CepMetrics {
                // Rate calculations in `update_metrics` use this as their
                // fixed baseline.
                last_update: Utc::now(),
                ..Default::default()
            })),
            config,
            last_gc: Arc::new(RwLock::new(Instant::now())),
        })
    }

    /// Register (or replace) a pattern under `name` and create a fresh state
    /// machine for it. Re-registering an existing name resets its state.
    pub async fn register_pattern(&mut self, name: &str, pattern: EventPattern) -> Result<()> {
        let mut patterns = self.patterns.write().await;
        patterns.insert(name.to_string(), pattern.clone());
        let mut state_machines = self.state_machines.write().await;
        state_machines.insert(
            name.to_string(),
            StateMachine {
                pattern,
                state: State::Initial,
                partial_matches: Vec::new(),
                completed_matches: Vec::new(),
                transition_count: 0,
            },
        );
        info!("Registered CEP pattern: {}", name);
        Ok(())
    }

    /// Register (or replace) a processing rule, keyed by `rule.name`.
    pub async fn register_rule(&mut self, rule: ProcessingRule) -> Result<()> {
        let mut rule_engine = self.rule_engine.write().await;
        rule_engine.rules.insert(rule.name.clone(), rule.clone());
        info!("Registered CEP rule: {}", rule.name);
        Ok(())
    }

    /// Process one incoming event end-to-end: buffer it, run pattern
    /// detection, and for each detected pattern execute rules and (when
    /// enabled) correlation and enrichment. Returns all patterns detected by
    /// this event.
    pub async fn process_event(&mut self, event: StreamEvent) -> Result<Vec<DetectedPattern>> {
        let start_time = Instant::now();
        // Arrival time, assigned here (the event's own timestamp, if any, is
        // not consulted).
        let event_timestamp = Utc::now();
        let timestamped_event = TimestampedEvent {
            event: event.clone(),
            timestamp: event_timestamp,
            id: Uuid::new_v4(),
        };
        self.add_to_buffer("default", timestamped_event.clone())
            .await?;
        self.maybe_run_gc().await?;
        let detected_patterns = self.detect_patterns(&timestamped_event).await?;
        let mut results = Vec::new();
        for pattern_match in detected_patterns {
            let triggered_rules = self.execute_rules(&pattern_match).await?;
            let correlations = if self.config.enable_correlation {
                self.correlate_events(&pattern_match).await?
            } else {
                Vec::new()
            };
            let enrichments = if self.config.enable_enrichment {
                self.enrich_events(&pattern_match).await?
            } else {
                HashMap::new()
            };
            results.push(DetectedPattern {
                pattern_match,
                triggered_rules,
                correlations,
                enrichments,
            });
        }
        let processing_latency = start_time.elapsed();
        self.update_metrics(processing_latency, results.len()).await;
        Ok(results)
    }

    /// Append `event` to the named stream buffer, creating the buffer on
    /// first use, and evict from the front once the size cap is exceeded.
    async fn add_to_buffer(&self, stream: &str, event: TimestampedEvent) -> Result<()> {
        let mut buffers = self.event_buffers.write().await;
        let buffer = buffers
            .entry(stream.to_string())
            .or_insert_with(|| EventBuffer {
                stream_name: stream.to_string(),
                events: VecDeque::new(),
                max_size: self.config.event_buffer_size,
                oldest_timestamp: None,
                newest_timestamp: None,
            });
        if buffer.oldest_timestamp.is_none() {
            buffer.oldest_timestamp = Some(event.timestamp);
        }
        buffer.newest_timestamp = Some(event.timestamp);
        buffer.events.push_back(event);
        while buffer.events.len() > buffer.max_size {
            buffer.events.pop_front();
            if let Some(first_event) = buffer.events.front() {
                buffer.oldest_timestamp = Some(first_event.timestamp);
            }
        }
        Ok(())
    }

    /// Run `new_event` through every registered state machine, collecting any
    /// completed matches (at most one per pattern per event).
    async fn detect_patterns(&self, new_event: &TimestampedEvent) -> Result<Vec<CompleteMatch>> {
        let mut detected = Vec::new();
        let mut state_machines = self.state_machines.write().await;
        for (pattern_name, state_machine) in state_machines.iter_mut() {
            if let Some(complete_match) = self.try_match_pattern(state_machine, new_event).await? {
                detected.push(complete_match);
                debug!("Pattern detected: {}", pattern_name);
            }
        }
        Ok(detected)
    }

    /// Advance a single state machine with `event`.
    ///
    /// Only `Simple`, `Sequence`, and `And` are handled; all other pattern
    /// variants currently never match.
    async fn try_match_pattern(
        &self,
        state_machine: &mut StateMachine,
        event: &TimestampedEvent,
    ) -> Result<Option<CompleteMatch>> {
        // Clone the pattern so the match below does not hold a borrow of
        // `state_machine` while we also pass it mutably to the helpers.
        let pattern = state_machine.pattern.clone();
        match &pattern {
            EventPattern::Simple { predicates, .. } => {
                if self.evaluate_predicates(predicates, &event.event).await? {
                    Ok(Some(CompleteMatch {
                        id: Uuid::new_v4(),
                        pattern_name: pattern.name().to_string(),
                        event_ids: vec![event.id],
                        start_time: event.timestamp,
                        end_time: event.timestamp,
                        duration: Duration::from_secs(0),
                        confidence: 1.0,
                        metadata: HashMap::new(),
                    }))
                } else {
                    Ok(None)
                }
            }
            EventPattern::Sequence {
                patterns,
                time_window,
                strict,
                ..
            } => {
                self.match_sequence(state_machine, event, patterns, *time_window, *strict)
                    .await
            }
            EventPattern::And {
                patterns,
                time_window,
                ..
            } => {
                self.match_conjunction(state_machine, event, patterns, *time_window)
                    .await
            }
            // Or / Not / Repeat / Temporal / Aggregation: not implemented yet.
            _ => Ok(None),
        }
    }

    /// Sequence matching: advance, expire, or retain each partial match, then
    /// open a new partial if `event` satisfies the first stage.
    ///
    /// Fixes over the previous version:
    /// - empty `patterns` no longer panics (`patterns[0]`);
    /// - window expiry compares milliseconds, so sub-second windows work
    ///   (previously whole-second comparison);
    /// - a completed partial is removed and survivors are committed
    ///   (previously the stale list was left intact on completion, causing
    ///   duplicate re-fires);
    /// - the `strict` flag is honoured: non-strict partials survive events
    ///   that do not advance them, strict partials are dropped.
    async fn match_sequence(
        &self,
        state_machine: &mut StateMachine,
        event: &TimestampedEvent,
        patterns: &[EventPattern],
        time_window: Option<Duration>,
        strict: bool,
    ) -> Result<Option<CompleteMatch>> {
        // An empty sequence can never match.
        let Some(first_stage) = patterns.first() else {
            return Ok(None);
        };
        let mut surviving = Vec::new();
        let mut completed: Option<CompleteMatch> = None;
        for partial_match in &state_machine.partial_matches {
            // Window expiry, measured from the partial's first event.
            if let Some(window) = time_window {
                let elapsed = event
                    .timestamp
                    .signed_duration_since(partial_match.start_time);
                if elapsed.num_milliseconds() > window.as_millis() as i64 {
                    continue; // expired — drop the partial
                }
            }
            // Only Simple sub-patterns are supported as sequence stages.
            let advances = match patterns.get(partial_match.stage) {
                Some(EventPattern::Simple { predicates, .. }) => {
                    self.evaluate_predicates(predicates, &event.event).await?
                }
                _ => false,
            };
            if advances && completed.is_none() {
                let mut next = partial_match.clone();
                next.events.push(event.clone());
                next.stage += 1;
                next.last_update = event.timestamp;
                if next.stage == patterns.len() {
                    let duration = event
                        .timestamp
                        .signed_duration_since(next.start_time)
                        .to_std()
                        .unwrap_or(Duration::from_secs(0));
                    completed = Some(CompleteMatch {
                        id: Uuid::new_v4(),
                        pattern_name: state_machine.pattern.name().to_string(),
                        event_ids: next.events.iter().map(|e| e.id).collect(),
                        start_time: next.start_time,
                        end_time: event.timestamp,
                        duration,
                        confidence: 1.0,
                        metadata: HashMap::new(),
                    });
                    // Consumed partial is intentionally not retained.
                } else {
                    surviving.push(next);
                }
            } else if !strict {
                // Non-strict: tolerate interleaved events; keep the partial.
                surviving.push(partial_match.clone());
            }
            // Strict + no advance: partial is dropped.
        }
        // Every event satisfying the first stage opens a fresh partial.
        if let EventPattern::Simple { predicates, .. } = first_stage {
            if self.evaluate_predicates(predicates, &event.event).await? {
                surviving.push(PartialMatch {
                    id: Uuid::new_v4(),
                    events: vec![event.clone()],
                    stage: 1,
                    start_time: event.timestamp,
                    last_update: event.timestamp,
                    state: HashMap::new(),
                });
            }
        }
        state_machine.partial_matches = surviving;
        if completed.is_some() {
            state_machine.transition_count += 1;
        }
        Ok(completed)
    }

    /// Conjunction (`And`) matching — not yet implemented; always `None`.
    async fn match_conjunction(
        &self,
        _state_machine: &mut StateMachine,
        _event: &TimestampedEvent,
        _patterns: &[EventPattern],
        _time_window: Option<Duration>,
    ) -> Result<Option<CompleteMatch>> {
        Ok(None)
    }

    /// Evaluate predicates as a conjunction: false as soon as one fails.
    ///
    /// Only `Equals` on `event_type` and `Contains` on `source` are
    /// implemented; every other predicate is currently treated as satisfied
    /// (deliberately permissive — tightening would change match semantics).
    async fn evaluate_predicates(
        &self,
        predicates: &[FieldPredicate],
        event: &StreamEvent,
    ) -> Result<bool> {
        for predicate in predicates {
            match predicate {
                FieldPredicate::Equals { field, value } => {
                    if field == "event_type" {
                        let event_type = match event {
                            StreamEvent::TripleAdded { .. } => "TripleAdded",
                            StreamEvent::TripleRemoved { .. } => "TripleRemoved",
                            StreamEvent::QuadAdded { .. } => "QuadAdded",
                            StreamEvent::QuadRemoved { .. } => "QuadRemoved",
                            StreamEvent::GraphCreated { .. } => "GraphCreated",
                            StreamEvent::GraphCleared { .. } => "GraphCleared",
                            StreamEvent::GraphDeleted { .. } => "GraphDeleted",
                            StreamEvent::SparqlUpdate { .. } => "SparqlUpdate",
                            StreamEvent::TransactionBegin { .. } => "TransactionBegin",
                            StreamEvent::TransactionCommit { .. } => "TransactionCommit",
                            StreamEvent::TransactionAbort { .. } => "TransactionAbort",
                            StreamEvent::SchemaChanged { .. } => "SchemaChanged",
                            StreamEvent::Heartbeat { .. } => "Heartbeat",
                            _ => "Other",
                        };
                        if event_type != value {
                            return Ok(false);
                        }
                    }
                }
                FieldPredicate::Contains { field, substring } => {
                    if field == "source" {
                        let source = match event {
                            StreamEvent::Heartbeat { source, .. } => source,
                            // Only Heartbeat carries a `source` we can read.
                            _ => return Ok(false),
                        };
                        if !source.contains(substring) {
                            return Ok(false);
                        }
                    }
                }
                // Unimplemented predicate kinds pass by default.
                _ => {}
            }
        }
        Ok(true)
    }

    /// Evaluate all enabled rules against `pattern_match`, executing the
    /// actions of those whose condition holds. Returns the triggered rule
    /// names.
    ///
    /// Fixes over the previous version: rules run in deterministic order
    /// (highest `priority` first — the field was previously ignored and
    /// HashMap order is random), `total_executions` is now counted, and the
    /// stats lock is taken once instead of per triggered rule.
    async fn execute_rules(&self, pattern_match: &CompleteMatch) -> Result<Vec<String>> {
        let mut triggered = Vec::new();
        // Snapshot the rules so no lock is held while actions run.
        let mut rules: Vec<(String, ProcessingRule)> = {
            let rule_engine = self.rule_engine.read().await;
            rule_engine
                .rules
                .iter()
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect()
        };
        rules.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
        let mut evaluated: u64 = 0;
        for (rule_name, rule) in &rules {
            if !rule.enabled {
                continue;
            }
            evaluated += 1;
            if self
                .evaluate_rule_condition(&rule.condition, pattern_match)
                .await?
            {
                for action in &rule.actions {
                    self.execute_rule_action(action).await?;
                }
                triggered.push(rule_name.clone());
            }
        }
        let mut rule_engine = self.rule_engine.write().await;
        rule_engine.stats.total_executions += evaluated;
        rule_engine.stats.successful_executions += triggered.len() as u64;
        Ok(triggered)
    }

    /// Condition evaluation. Only `PatternMatched` is implemented; all other
    /// conditions evaluate to false (rule does not fire).
    async fn evaluate_rule_condition(
        &self,
        condition: &RuleCondition,
        pattern_match: &CompleteMatch,
    ) -> Result<bool> {
        match condition {
            RuleCondition::PatternMatched { pattern } => Ok(&pattern_match.pattern_name == pattern),
            _ => Ok(false),
        }
    }

    /// Execute one rule action. `SendAlert` and `EmitEvent` are logged;
    /// the remaining action kinds are currently no-ops.
    async fn execute_rule_action(&self, action: &RuleAction) -> Result<()> {
        match action {
            RuleAction::SendAlert { severity, message } => {
                info!("CEP Alert [{}]: {}", severity, message);
            }
            RuleAction::EmitEvent { event_type, data } => {
                debug!("CEP Emit Event: {} with data: {:?}", event_type, data);
            }
            _ => {}
        }
        Ok(())
    }

    /// Correlation step — placeholder; returns no correlations.
    async fn correlate_events(
        &self,
        _pattern_match: &CompleteMatch,
    ) -> Result<Vec<CorrelationResult>> {
        Ok(Vec::new())
    }

    /// Enrichment step — placeholder; returns no enrichment data.
    async fn enrich_events(
        &self,
        _pattern_match: &CompleteMatch,
    ) -> Result<HashMap<String, EnrichmentData>> {
        Ok(HashMap::new())
    }

    /// Run garbage collection if at least `gc_interval` has elapsed since the
    /// previous pass; also bumps the `gc_count` metric.
    async fn maybe_run_gc(&self) -> Result<()> {
        let mut last_gc = self.last_gc.write().await;
        if last_gc.elapsed() >= self.config.gc_interval {
            self.run_gc().await?;
            *last_gc = Instant::now();
            let mut metrics = self.metrics.write().await;
            metrics.gc_count += 1;
        }
        Ok(())
    }

    /// Drop buffered events and partial matches older than
    /// `max_time_window`, and refresh each buffer's timestamp bounds
    /// (previously these went stale when a buffer emptied).
    async fn run_gc(&self) -> Result<()> {
        let cutoff_time =
            Utc::now() - ChronoDuration::seconds(self.config.max_time_window.as_secs() as i64);
        let mut buffers = self.event_buffers.write().await;
        for buffer in buffers.values_mut() {
            buffer.events.retain(|e| e.timestamp > cutoff_time);
            buffer.oldest_timestamp = buffer.events.front().map(|e| e.timestamp);
            buffer.newest_timestamp = buffer.events.back().map(|e| e.timestamp);
        }
        let mut state_machines = self.state_machines.write().await;
        for state_machine in state_machines.values_mut() {
            state_machine
                .partial_matches
                .retain(|m| m.last_update > cutoff_time);
        }
        debug!("CEP garbage collection completed");
        Ok(())
    }

    /// Update running metrics after processing one event.
    ///
    /// NOTE: `metrics.last_update` is intentionally NOT refreshed here — it
    /// serves as the fixed baseline for the cumulative events/patterns-per-
    /// second rates; refreshing it per event would require storing
    /// per-interval counters to keep the rates meaningful.
    async fn update_metrics(&self, processing_latency: Duration, patterns_detected: usize) {
        let mut metrics = self.metrics.write().await;
        metrics.total_events_processed += 1;
        metrics.total_patterns_detected += patterns_detected as u64;
        let now = Utc::now();
        let elapsed_duration = now.signed_duration_since(metrics.last_update);
        let elapsed_secs = elapsed_duration.num_seconds() as f64;
        if elapsed_secs > 0.0 {
            metrics.events_per_second = metrics.total_events_processed as f64 / elapsed_secs;
            metrics.patterns_per_second = metrics.total_patterns_detected as f64 / elapsed_secs;
        }
        // Incremental running average in microseconds; `total_events_processed`
        // was incremented above, so it is always >= 1 here.
        let total_latency = metrics.avg_event_processing_latency.as_micros()
            * (metrics.total_events_processed - 1) as u128
            + processing_latency.as_micros();
        metrics.avg_event_processing_latency =
            Duration::from_micros((total_latency / metrics.total_events_processed as u128) as u64);
        let state_machines = self.state_machines.read().await;
        metrics.active_partial_matches = state_machines
            .values()
            .map(|sm| sm.partial_matches.len())
            .sum();
    }

    /// Snapshot of the current engine metrics.
    pub async fn get_metrics(&self) -> CepMetrics {
        self.metrics.read().await.clone()
    }

    /// Snapshot of all component statistics in one struct.
    pub async fn get_statistics(&self) -> CepStatistics {
        let metrics = self.metrics.read().await;
        let rule_engine = self.rule_engine.read().await;
        let correlator = self.correlator.read().await;
        let enrichment = self.enrichment_service.read().await;
        let detector = self.pattern_detector.read().await;
        CepStatistics {
            metrics: metrics.clone(),
            rule_stats: rule_engine.stats.clone(),
            correlation_stats: correlator.stats.clone(),
            enrichment_stats: enrichment.stats.clone(),
            detection_stats: detector.stats.clone(),
        }
    }
}
/// Aggregated statistics snapshot returned by `CepEngine::get_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CepStatistics {
    pub metrics: CepMetrics,
    pub rule_stats: RuleExecutionStats,
    pub correlation_stats: CorrelationStats,
    pub enrichment_stats: EnrichmentStats,
    pub detection_stats: DetectionStats,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// Engine construction with default config succeeds.
    #[tokio::test]
    async fn test_cep_engine_creation() {
        let config = CepConfig::default();
        let engine = CepEngine::new(config);
        assert!(engine.is_ok());
    }

    /// Registering a pattern stores it under its registration name.
    #[tokio::test]
    async fn test_pattern_registration() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();
        let pattern = EventPattern::simple("event_type", "test");
        let result = engine.register_pattern("test_pattern", pattern).await;
        assert!(result.is_ok());
        let patterns = engine.patterns.read().await;
        assert!(patterns.contains_key("test_pattern"));
    }

    /// A simple single-event pattern fires on the first matching event.
    #[tokio::test]
    async fn test_simple_pattern_matching() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();
        let pattern = EventPattern::simple("event_type", "Heartbeat");
        engine.register_pattern("heartbeat", pattern).await.unwrap();
        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        };
        let detected = engine.process_event(event).await.unwrap();
        assert!(!detected.is_empty());
    }

    /// A two-stage sequence fires on the second event, not the first.
    #[tokio::test]
    async fn test_sequence_pattern() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();
        let pattern = EventPattern::sequence(vec![
            EventPattern::simple("event_type", "Heartbeat"),
            EventPattern::simple("event_type", "Heartbeat"),
        ])
        .with_time_window(Duration::from_secs(10));
        engine
            .register_pattern("double_heartbeat", pattern)
            .await
            .unwrap();
        let event1 = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        };
        // First event only opens a partial match.
        let detected1 = engine.process_event(event1).await.unwrap();
        assert!(detected1.is_empty());
        let event2 = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        };
        // Second event completes the sequence.
        let detected2 = engine.process_event(event2).await.unwrap();
        assert!(!detected2.is_empty());
    }

    /// Rule registration succeeds for a pattern-matched condition.
    #[tokio::test]
    async fn test_rule_registration() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();
        let rule = ProcessingRule {
            name: "test_rule".to_string(),
            condition: RuleCondition::PatternMatched {
                pattern: "heartbeat".to_string(),
            },
            actions: vec![RuleAction::SendAlert {
                severity: "info".to_string(),
                message: "Heartbeat detected".to_string(),
            }],
            priority: 1,
            enabled: true,
        };
        let result = engine.register_rule(rule).await;
        assert!(result.is_ok());
    }

    /// Buffering creates the stream's buffer on first use and stores the event.
    #[tokio::test]
    async fn test_event_buffer() {
        let config = CepConfig::default();
        let engine = CepEngine::new(config).unwrap();
        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        };
        let timestamped = TimestampedEvent {
            event,
            timestamp: Utc::now(),
            id: Uuid::new_v4(),
        };
        engine
            .add_to_buffer("test_stream", timestamped)
            .await
            .unwrap();
        let buffers = engine.event_buffers.read().await;
        assert!(buffers.contains_key("test_stream"));
        assert_eq!(buffers.get("test_stream").unwrap().events.len(), 1);
    }

    /// An `Equals` predicate on `event_type` matches a Heartbeat event.
    #[tokio::test]
    async fn test_predicate_evaluation() {
        let config = CepConfig::default();
        let engine = CepEngine::new(config).unwrap();
        let predicates = vec![FieldPredicate::Equals {
            field: "event_type".to_string(),
            value: "Heartbeat".to_string(),
        }];
        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        };
        let result = engine
            .evaluate_predicates(&predicates, &event)
            .await
            .unwrap();
        assert!(result);
    }

    /// Processing one event increments the processed-events counter.
    #[tokio::test]
    async fn test_metrics_collection() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();
        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        };
        engine.process_event(event).await.unwrap();
        let metrics = engine.get_metrics().await;
        assert_eq!(metrics.total_events_processed, 1);
    }

    /// GC removes events older than the configured max time window.
    #[tokio::test]
    async fn test_garbage_collection() {
        let config = CepConfig {
            gc_interval: Duration::from_millis(10),
            ..Default::default()
        };
        let engine = CepEngine::new(config).unwrap();
        // Event is timestamped 2 hours in the past — beyond the 1-hour
        // default `max_time_window`.
        let old_event = TimestampedEvent {
            event: StreamEvent::Heartbeat {
                timestamp: Utc::now(),
                source: "test".to_string(),
                metadata: EventMetadata::default(),
            },
            timestamp: Utc::now() - ChronoDuration::hours(2),
            id: Uuid::new_v4(),
        };
        engine.add_to_buffer("test", old_event).await.unwrap();
        tokio::time::sleep(Duration::from_millis(20)).await;
        engine.run_gc().await.unwrap();
        let buffers = engine.event_buffers.read().await;
        assert!(buffers.get("test").unwrap().events.is_empty());
    }

    /// `with_time_window` sets the window on a Sequence pattern.
    #[tokio::test]
    async fn test_pattern_with_time_window() {
        let pattern = EventPattern::sequence(vec![
            EventPattern::simple("type", "A"),
            EventPattern::simple("type", "B"),
        ])
        .with_time_window(Duration::from_secs(5));
        match pattern {
            EventPattern::Sequence { time_window, .. } => {
                assert_eq!(time_window, Some(Duration::from_secs(5)));
            }
            _ => panic!("Expected sequence pattern"),
        }
    }

    /// A fresh engine reports zeroed statistics.
    #[tokio::test]
    async fn test_statistics() {
        let config = CepConfig::default();
        let engine = CepEngine::new(config).unwrap();
        let stats = engine.get_statistics().await;
        assert_eq!(stats.metrics.total_events_processed, 0);
    }
}