Skip to main content

oxirs_stream/
cep_engine.rs

1//! # Complex Event Processing (CEP) Engine
2//!
3//! Production-grade CEP engine for detecting complex patterns across multiple event streams,
4//! featuring composite event detection, event correlation, state machines, and rule-based
5//! event processing with real-time pattern matching.
6//!
7//! ## Features
8//!
9//! - **Composite Event Detection**: Detect complex patterns from multiple simple events
10//! - **Event Correlation**: Correlate events across streams using time windows and predicates
11//! - **State Machine Processing**: Define complex event sequences with state transitions
12//! - **Rule-Based Engine**: Define processing rules with conditions and actions
13//! - **Temporal Operators**: Before, After, During, Overlaps, Meets, Starts, Finishes
14//! - **Event Aggregation**: Aggregate events over time windows with custom functions
15//! - **Event Enrichment**: Enrich events with contextual data from external sources
16//! - **Pattern Library**: Pre-defined patterns for common scenarios
17//! - **Real-time Processing**: Sub-millisecond pattern detection latency
18//! - **Distributed Support**: Partition-aware processing for horizontal scaling
19//!
20//! ## Example
21//!
22//! ```no_run
23//! use oxirs_stream::cep_engine::{CepEngine, CepConfig, EventPattern, TemporalOperator};
24//! use oxirs_stream::event::StreamEvent;
25//!
26//! # async fn example() -> anyhow::Result<()> {
27//! let config = CepConfig::default();
28//! let mut engine = CepEngine::new(config)?;
29//!
30//! // Define a pattern: "A followed by B within 10 seconds"
31//! let pattern = EventPattern::sequence(vec![
32//!     EventPattern::simple("event_type", "A"),
33//!     EventPattern::simple("event_type", "B"),
34//! ]).with_time_window(std::time::Duration::from_secs(10));
35//!
36//! engine.register_pattern("a_then_b", pattern).await?;
37//!
38//! // Process events
39//! # let event = StreamEvent::Heartbeat {
40//! #     timestamp: chrono::Utc::now(),
41//! #     source: "test".to_string(),
42//! #     metadata: Default::default(),
43//! # };
44//! let detected_patterns = engine.process_event(event).await?;
45//! # Ok(())
46//! # }
47//! ```
48
49use crate::event::StreamEvent;
50use anyhow::Result;
51use chrono::{DateTime, Duration as ChronoDuration, Utc};
52use serde::{Deserialize, Serialize};
53use std::collections::{HashMap, VecDeque};
54use std::sync::Arc;
55use std::time::{Duration, Instant};
56use tokio::sync::RwLock;
57use tracing::{debug, info};
58use uuid::Uuid;
59
/// Configuration for CEP engine
///
/// All fields are public and the struct derives serde's `Serialize` and
/// `Deserialize`. The `Default` implementation supplies the values quoted
/// on each field below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CepConfig {
    /// Maximum number of events to keep in memory per partition (default: 100000)
    pub max_events_in_memory: usize,

    /// Maximum time window for pattern detection (default: 3600 seconds)
    pub max_time_window: Duration,

    /// Enable event correlation (default: true).
    /// When false, `process_event` reports an empty correlation list.
    pub enable_correlation: bool,

    /// Enable state machine processing (default: true)
    pub enable_state_machines: bool,

    /// Enable rule-based processing (default: true)
    pub enable_rules: bool,

    /// Enable event enrichment (default: true).
    /// When false, `process_event` reports an empty enrichment map.
    pub enable_enrichment: bool,

    /// Maximum pattern complexity (nested depth) (default: 10)
    pub max_pattern_depth: usize,

    /// Pattern matching timeout (default: 100 milliseconds)
    pub pattern_matching_timeout: Duration,

    /// Event buffer size per stream (default: 10000).
    /// Copied into `EventBuffer::max_size` when a stream's buffer is first created.
    pub event_buffer_size: usize,

    /// Enable metrics collection (default: true)
    pub collect_metrics: bool,

    /// Garbage collection interval for expired events (default: 60 seconds)
    pub gc_interval: Duration,

    /// Enable distributed processing (default: false)
    pub enable_distributed: bool,

    /// Number of partitions for distributed processing (default: 8)
    pub num_partitions: usize,
}
102
103impl Default for CepConfig {
104    fn default() -> Self {
105        Self {
106            max_events_in_memory: 100000,
107            max_time_window: Duration::from_secs(3600),
108            enable_correlation: true,
109            enable_state_machines: true,
110            enable_rules: true,
111            enable_enrichment: true,
112            max_pattern_depth: 10,
113            pattern_matching_timeout: Duration::from_millis(100),
114            event_buffer_size: 10000,
115            collect_metrics: true,
116            gc_interval: Duration::from_secs(60),
117            enable_distributed: false,
118            num_partitions: 8,
119        }
120    }
121}
122
/// Complex Event Processing engine
///
/// Each mutable component is stored behind `Arc<RwLock<..>>`, using the
/// async `tokio::sync::RwLock` imported by this module. `CepEngine::new`
/// creates every component empty.
pub struct CepEngine {
    /// Registered patterns, keyed by the name passed to `register_pattern`
    patterns: Arc<RwLock<HashMap<String, EventPattern>>>,

    /// Event buffers per stream, keyed by stream name
    /// (`process_event` currently buffers everything under `"default"`)
    event_buffers: Arc<RwLock<HashMap<String, EventBuffer>>>,

    /// State machines for pattern tracking, keyed by pattern name
    state_machines: Arc<RwLock<HashMap<String, StateMachine>>>,

    /// Rule engine
    rule_engine: Arc<RwLock<RuleEngine>>,

    /// Event correlator
    correlator: Arc<RwLock<EventCorrelator>>,

    /// Event enrichment service
    enrichment_service: Arc<RwLock<EnrichmentService>>,

    /// Pattern detector
    pattern_detector: Arc<RwLock<PatternDetector>>,

    /// Metrics collector
    metrics: Arc<RwLock<CepMetrics>>,

    /// Configuration
    config: CepConfig,

    /// Last garbage collection time (initialised to construction time)
    last_gc: Arc<RwLock<Instant>>,
}
155
/// Event pattern definition
///
/// Patterns form a tree: composite variants hold their sub-patterns either
/// in a `Vec<EventPattern>` or in a `Box<EventPattern>`.
///
/// Note: `CepEngine::try_match_pattern` currently evaluates only `Simple`,
/// `Sequence` and `And`; every other variant yields no match.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventPattern {
    /// Simple event pattern with field predicates
    Simple {
        /// Pattern name
        name: String,
        /// Field predicates
        predicates: Vec<FieldPredicate>,
    },

    /// Sequence pattern (events in order)
    Sequence {
        /// Pattern name
        name: String,
        /// Sub-patterns, in the order they must be matched
        patterns: Vec<EventPattern>,
        /// Time window (`None` means no window is enforced)
        time_window: Option<Duration>,
        /// Strict ordering (no interleaving)
        strict: bool,
    },

    /// Conjunction pattern (all events must occur)
    And {
        /// Pattern name
        name: String,
        /// Sub-patterns
        patterns: Vec<EventPattern>,
        /// Time window
        time_window: Option<Duration>,
    },

    /// Disjunction pattern (any event occurs)
    Or {
        /// Pattern name
        name: String,
        /// Sub-patterns
        patterns: Vec<EventPattern>,
    },

    /// Negation pattern (event must not occur)
    Not {
        /// Pattern name
        name: String,
        /// Pattern to negate
        pattern: Box<EventPattern>,
        /// Time window for negation (mandatory, unlike the other variants)
        time_window: Duration,
    },

    /// Repeat pattern (event occurs N times)
    Repeat {
        /// Pattern name
        name: String,
        /// Pattern to repeat
        pattern: Box<EventPattern>,
        /// Minimum occurrences
        min_count: usize,
        /// Maximum occurrences (`None` leaves the count unbounded above)
        max_count: Option<usize>,
        /// Time window
        time_window: Option<Duration>,
    },

    /// Temporal pattern with Allen's interval algebra
    Temporal {
        /// Pattern name
        name: String,
        /// First event pattern
        first: Box<EventPattern>,
        /// Temporal operator relating `first` to `second`
        operator: TemporalOperator,
        /// Second event pattern
        second: Box<EventPattern>,
        /// Time tolerance
        tolerance: Option<Duration>,
    },

    /// Aggregation pattern (aggregate events over window)
    Aggregation {
        /// Pattern name
        name: String,
        /// Pattern to aggregate
        pattern: Box<EventPattern>,
        /// Aggregation function
        aggregation: CepAggregationFunction,
        /// Time window
        window: Duration,
        /// Threshold for triggering
        threshold: f64,
    },
}
249
250impl EventPattern {
251    /// Create a simple pattern
252    pub fn simple(field: &str, value: &str) -> Self {
253        EventPattern::Simple {
254            name: format!("{}={}", field, value),
255            predicates: vec![FieldPredicate::Equals {
256                field: field.to_string(),
257                value: value.to_string(),
258            }],
259        }
260    }
261
262    /// Create a sequence pattern
263    pub fn sequence(patterns: Vec<EventPattern>) -> Self {
264        EventPattern::Sequence {
265            name: "sequence".to_string(),
266            patterns,
267            time_window: None,
268            strict: false,
269        }
270    }
271
272    /// Add time window to pattern
273    pub fn with_time_window(mut self, window: Duration) -> Self {
274        match &mut self {
275            EventPattern::Sequence { time_window, .. } => *time_window = Some(window),
276            EventPattern::And { time_window, .. } => *time_window = Some(window),
277            EventPattern::Repeat { time_window, .. } => *time_window = Some(window),
278            _ => {}
279        }
280        self
281    }
282
283    /// Get pattern name
284    pub fn name(&self) -> &str {
285        match self {
286            EventPattern::Simple { name, .. } => name,
287            EventPattern::Sequence { name, .. } => name,
288            EventPattern::And { name, .. } => name,
289            EventPattern::Or { name, .. } => name,
290            EventPattern::Not { name, .. } => name,
291            EventPattern::Repeat { name, .. } => name,
292            EventPattern::Temporal { name, .. } => name,
293            EventPattern::Aggregation { name, .. } => name,
294        }
295    }
296}
297
/// Field predicate for event matching
///
/// Each variant names the event `field` it inspects. String comparisons
/// carry `String` operands and numeric comparisons carry `f64` operands.
/// `EventPattern::simple` builds a single `Equals` predicate.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FieldPredicate {
    /// Field equals value
    Equals { field: String, value: String },
    /// Field not equals value
    NotEquals { field: String, value: String },
    /// Field contains substring
    Contains { field: String, substring: String },
    /// Field matches regex (the pattern is stored as source text)
    Regex { field: String, pattern: String },
    /// Field greater than value
    GreaterThan { field: String, value: f64 },
    /// Field less than value
    LessThan { field: String, value: f64 },
    /// Field in range
    // NOTE(review): whether `min`/`max` are inclusive is decided by the
    // evaluator, which is not visible here — confirm.
    InRange { field: String, min: f64, max: f64 },
    /// Field exists
    Exists { field: String },
    /// Custom predicate function (serialized as name)
    Custom { name: String },
}
320
/// Temporal operators (Allen's interval algebra)
///
/// Used as the `operator` of `EventPattern::Temporal`. The type is `Copy`
/// and `Eq`, so it can be passed by value and compared directly.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum TemporalOperator {
    /// First event before second event
    Before,
    /// First event after second event
    After,
    /// First event meets second event (end == start)
    Meets,
    /// First event during second event
    During,
    /// First event overlaps second event
    Overlaps,
    /// First event starts second event (same start)
    Starts,
    /// First event finishes second event (same end)
    Finishes,
    /// First event equals second event (same start and end)
    Equals,
}
341
/// Aggregation function for CEP event aggregation
///
/// Used as the `aggregation` of `EventPattern::Aggregation`. Every variant
/// except `Count` and `Custom` names the event `field` it aggregates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CepAggregationFunction {
    /// Count events
    Count,
    /// Sum field values
    Sum { field: String },
    /// Average field values
    Average { field: String },
    /// Minimum field value
    Min { field: String },
    /// Maximum field value
    Max { field: String },
    /// Standard deviation of field values
    StdDev { field: String },
    /// Percentile of field values
    // NOTE(review): scale of `percentile` (0-1 vs 0-100) is not established
    // by the visible code — confirm against the evaluator.
    Percentile { field: String, percentile: f64 },
    /// Custom aggregation function (identified by name)
    Custom { name: String },
}
362
/// Event buffer for storing recent events
///
/// `CepEngine::add_to_buffer` appends new events at the back and evicts from
/// the front once `events.len()` exceeds `max_size`.
#[derive(Debug, Clone)]
pub struct EventBuffer {
    /// Stream name
    pub stream_name: String,
    /// Buffered events with timestamps (front = oldest retained, back = newest)
    pub events: VecDeque<TimestampedEvent>,
    /// Maximum buffer size (taken from `CepConfig::event_buffer_size` on creation)
    pub max_size: usize,
    /// Oldest event timestamp (`None` until the first event is buffered)
    pub oldest_timestamp: Option<DateTime<Utc>>,
    /// Newest event timestamp (`None` until the first event is buffered)
    pub newest_timestamp: Option<DateTime<Utc>>,
}
377
/// Timestamped event
///
/// Wrapper created by `CepEngine::process_event` for every incoming event.
#[derive(Debug, Clone)]
pub struct TimestampedEvent {
    /// Event
    pub event: StreamEvent,
    /// Processing timestamp (`Utc::now()` at the moment the engine received the event)
    pub timestamp: DateTime<Utc>,
    /// Event ID (a fresh random v4 UUID assigned by the engine)
    pub id: Uuid,
}
388
/// State machine for pattern tracking
///
/// `CepEngine::register_pattern` creates one per registered pattern, starting
/// in `State::Initial` with no partial or completed matches and a transition
/// count of zero.
#[derive(Debug, Clone)]
pub struct StateMachine {
    /// Pattern being tracked
    pub pattern: EventPattern,
    /// Current state
    pub state: State,
    /// Partial matches (in-progress candidates, scanned by the sequence matcher)
    pub partial_matches: Vec<PartialMatch>,
    /// Completed matches
    pub completed_matches: Vec<CompleteMatch>,
    /// State transition count
    pub transition_count: usize,
}
403
/// State in state machine
///
/// Newly registered state machines start in `Initial`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum State {
    /// Initial state
    Initial,
    /// Intermediate state
    // NOTE(review): `stage` presumably mirrors `PartialMatch::stage` — confirm.
    Intermediate { stage: usize },
    /// Final state (pattern matched)
    Final,
    /// Error state (pattern violated)
    Error,
}
416
/// Partial match in progress
///
/// Held in `StateMachine::partial_matches` while a multi-event pattern is
/// only partly satisfied.
#[derive(Debug, Clone)]
pub struct PartialMatch {
    /// Match ID
    pub id: Uuid,
    /// Matched events so far
    pub events: Vec<TimestampedEvent>,
    /// Current stage in pattern: the sequence matcher uses it as the index of
    /// the next sub-pattern to test and increments it on each advance
    pub stage: usize,
    /// Start time (the sequence matcher measures the time window from here)
    pub start_time: DateTime<Utc>,
    /// Last update time (set to the advancing event's timestamp)
    pub last_update: DateTime<Utc>,
    /// Match state (free-form string key/value pairs)
    pub state: HashMap<String, String>,
}
433
/// Complete pattern match
///
/// Returned by the pattern matchers and embedded in `DetectedPattern`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompleteMatch {
    /// Match ID (fresh v4 UUID per match)
    pub id: Uuid,
    /// Pattern name (from `EventPattern::name`)
    pub pattern_name: String,
    /// Matched events (IDs of the contributing `TimestampedEvent`s, in match order)
    pub event_ids: Vec<Uuid>,
    /// Start time
    pub start_time: DateTime<Utc>,
    /// End time
    pub end_time: DateTime<Utc>,
    /// Match duration (zero for a `Simple` pattern, which matches a single event)
    pub duration: Duration,
    /// Confidence score (0.0-1.0); simple and sequence matches are emitted with 1.0
    pub confidence: f64,
    /// Additional metadata
    pub metadata: HashMap<String, String>,
}
454
/// Rule engine for event processing
///
/// Starts with no rules and default statistics (see `CepEngine::new`).
#[derive(Debug, Clone)]
pub struct RuleEngine {
    /// Registered rules, keyed by `ProcessingRule::name`
    /// (registering a second rule with the same name replaces the first)
    pub rules: HashMap<String, ProcessingRule>,
    /// Rule execution statistics
    pub stats: RuleExecutionStats,
}
463
/// Processing rule
///
/// Stored in `RuleEngine::rules` under its `name`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingRule {
    /// Rule name (also the key in the rule registry)
    pub name: String,
    /// Condition to trigger rule
    pub condition: RuleCondition,
    /// Actions to execute
    pub actions: Vec<RuleAction>,
    /// Priority (higher = executed first)
    pub priority: i32,
    /// Enabled flag
    pub enabled: bool,
}
478
/// Rule condition
///
/// Recursive: `Complex` combines further `RuleCondition`s.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RuleCondition {
    /// Pattern matched (`pattern` is a pattern name)
    PatternMatched { pattern: String },
    /// Event field condition
    FieldCondition { predicate: FieldPredicate },
    /// Threshold exceeded
    ThresholdExceeded { metric: String, threshold: f64 },
    /// Complex condition (AND/OR)
    // NOTE(review): `operator` is a free-form string; the exact spellings the
    // evaluator accepts are not visible here — confirm.
    Complex {
        operator: String,
        conditions: Vec<RuleCondition>,
    },
}
494
/// Rule action
///
/// Serializable description of what a `ProcessingRule` does when triggered.
/// All payloads are plain strings or string maps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RuleAction {
    /// Emit new event
    EmitEvent {
        event_type: String,
        data: HashMap<String, String>,
    },
    /// Send alert
    SendAlert { severity: String, message: String },
    /// Update state
    UpdateState { key: String, value: String },
    /// Execute external webhook
    Webhook { url: String, method: String },
    /// Custom action (identified by name, with string parameters)
    Custom {
        name: String,
        params: HashMap<String, String>,
    },
}
515
/// Rule execution statistics
///
/// Derives `Default`, so a fresh engine starts with all counters at zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RuleExecutionStats {
    /// Total rules executed
    pub total_executions: u64,
    /// Successful executions
    pub successful_executions: u64,
    /// Failed executions
    pub failed_executions: u64,
    /// Total execution time
    pub total_execution_time: Duration,
    /// Average execution time
    pub avg_execution_time: Duration,
}
530
/// Event correlator for finding related events
///
/// Created empty by `CepEngine::new`.
#[derive(Debug, Clone)]
pub struct EventCorrelator {
    /// Correlation functions, keyed by name
    pub correlation_functions: HashMap<String, CorrelationFunction>,
    /// Correlation results cache, keyed by (event pair, function name)
    pub correlation_cache: HashMap<CorrelationKey, CorrelationResult>,
    /// Statistics
    pub stats: CorrelationStats,
}
541
/// Correlation function
///
/// Declarative description stored in `EventCorrelator::correlation_functions`.
#[derive(Debug, Clone)]
pub struct CorrelationFunction {
    /// Function name
    pub name: String,
    /// Time window
    pub time_window: Duration,
    /// Fields to correlate
    pub fields: Vec<String>,
    /// Correlation threshold
    // NOTE(review): presumably compared against `CorrelationResult::score`
    // (0.0-1.0) — confirm against the correlator implementation.
    pub threshold: f64,
}
554
/// Correlation key for caching
///
/// Derives `Hash` and `Eq` so it can key `EventCorrelator::correlation_cache`.
/// Equality is field-wise, so swapping `event1` and `event2` gives a different key.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CorrelationKey {
    /// Event ID 1
    pub event1: Uuid,
    /// Event ID 2
    pub event2: Uuid,
    /// Function name
    pub function: String,
}
565
/// Correlation result
///
/// Cached in `EventCorrelator::correlation_cache` and reported to callers
/// through `DetectedPattern::correlations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CorrelationResult {
    /// Correlation score (0.0-1.0)
    pub score: f64,
    /// Correlated fields
    pub correlated_fields: Vec<String>,
    /// Timestamp
    pub timestamp: DateTime<Utc>,
}
576
/// Correlation statistics
///
/// Derives `Default`, so a fresh correlator starts with all values at zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CorrelationStats {
    /// Total correlations computed
    pub total_correlations: u64,
    /// Cache hits
    pub cache_hits: u64,
    /// Cache misses
    pub cache_misses: u64,
    /// Average correlation score
    pub avg_correlation_score: f64,
}
589
/// Event enrichment service
///
/// Created empty by `CepEngine::new`.
#[derive(Debug, Clone)]
pub struct EnrichmentService {
    /// Enrichment sources, keyed by name
    pub sources: HashMap<String, EnrichmentSource>,
    /// Enrichment cache
    // NOTE(review): the cache key format is chosen by the enrichment code,
    // which is not visible here — confirm.
    pub cache: HashMap<String, EnrichmentData>,
    /// Statistics
    pub stats: EnrichmentStats,
}
600
/// Enrichment source
///
/// Stored in `EnrichmentService::sources`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnrichmentSource {
    /// Source name
    pub name: String,
    /// Source type (where the enrichment data comes from)
    pub source_type: EnrichmentSourceType,
    /// Lookup key field
    pub key_field: String,
    /// Cache TTL
    pub cache_ttl: Duration,
}
613
/// Enrichment source type
///
/// Selects the backend of an `EnrichmentSource`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EnrichmentSourceType {
    /// External API (`auth` is optional)
    // NOTE(review): `auth` is serialized together with the rest of the
    // config; if it holds a secret, check where this struct gets written.
    ExternalApi { url: String, auth: Option<String> },
    /// Database
    Database {
        connection_string: String,
        query: String,
    },
    /// In-memory cache (a map from key to a map of field name/value strings)
    InMemory {
        data: HashMap<String, HashMap<String, String>>,
    },
    /// Custom source (identified by name)
    Custom { name: String },
}
631
/// Enrichment data
///
/// Value type of `EnrichmentService::cache` and of
/// `DetectedPattern::enrichments`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnrichmentData {
    /// Enriched fields
    pub fields: HashMap<String, String>,
    /// Source name
    pub source: String,
    /// Timestamp
    pub timestamp: DateTime<Utc>,
    /// TTL
    pub ttl: Duration,
}
644
/// Enrichment statistics
///
/// Derives `Default`, so a fresh service starts with all counters at zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EnrichmentStats {
    /// Total enrichments
    pub total_enrichments: u64,
    /// Cache hits
    pub cache_hits: u64,
    /// Cache misses
    pub cache_misses: u64,
    /// Failed enrichments
    pub failed_enrichments: u64,
}
657
/// Pattern detector
///
/// Created empty by `CepEngine::new`.
// NOTE(review): `CepEngine::register_pattern` does not add patterns to this
// registry; confirm whether `patterns` here is populated elsewhere.
#[derive(Debug, Clone)]
pub struct PatternDetector {
    /// Registered patterns
    pub patterns: HashMap<String, EventPattern>,
    /// Detection algorithms
    pub algorithms: HashMap<String, DetectionAlgorithm>,
    /// Detection statistics
    pub stats: DetectionStats,
}
668
/// Detection algorithm
///
/// Value type of `PatternDetector::algorithms`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DetectionAlgorithm {
    /// Naive sequential matching
    Sequential,
    /// Automaton-based matching
    Automaton,
    /// Tree-based matching
    Tree,
    /// Graph-based matching
    Graph,
    /// Machine learning based (the model is referenced by name)
    MachineLearning { model_name: String },
}
683
/// Detection statistics
///
/// Derives `Default`, so a fresh detector starts with all values at zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DetectionStats {
    /// Total events processed
    pub total_events_processed: u64,
    /// Patterns detected
    pub patterns_detected: u64,
    /// False positives
    pub false_positives: u64,
    /// False negatives
    pub false_negatives: u64,
    /// Average detection latency
    pub avg_detection_latency: Duration,
    /// Total detection time
    pub total_detection_time: Duration,
}
700
/// CEP metrics
///
/// `CepEngine::new` starts from `Default::default()` and overrides
/// `last_update` with the construction time.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CepMetrics {
    /// Total events processed
    pub total_events_processed: u64,
    /// Total patterns detected
    pub total_patterns_detected: u64,
    /// Events per second
    pub events_per_second: f64,
    /// Patterns per second
    pub patterns_per_second: f64,
    /// Average event processing latency
    pub avg_event_processing_latency: Duration,
    /// Average pattern matching latency
    pub avg_pattern_matching_latency: Duration,
    /// Memory usage (bytes)
    pub memory_usage_bytes: usize,
    /// Active partial matches
    pub active_partial_matches: usize,
    /// Completed matches in window
    pub completed_matches: usize,
    /// Garbage collections performed
    pub gc_count: u64,
    /// Last update time
    pub last_update: DateTime<Utc>,
}
727
/// Detected pattern result
///
/// One is produced by `CepEngine::process_event` for every complete match
/// triggered by the processed event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectedPattern {
    /// Pattern match
    pub pattern_match: CompleteMatch,
    /// Triggered rules (as returned by rule execution for this match)
    pub triggered_rules: Vec<String>,
    /// Correlation results (empty when `CepConfig::enable_correlation` is false)
    pub correlations: Vec<CorrelationResult>,
    /// Enriched data (empty when `CepConfig::enable_enrichment` is false)
    pub enrichments: HashMap<String, EnrichmentData>,
}
740
741impl CepEngine {
742    /// Create a new CEP engine
743    pub fn new(config: CepConfig) -> Result<Self> {
744        Ok(Self {
745            patterns: Arc::new(RwLock::new(HashMap::new())),
746            event_buffers: Arc::new(RwLock::new(HashMap::new())),
747            state_machines: Arc::new(RwLock::new(HashMap::new())),
748            rule_engine: Arc::new(RwLock::new(RuleEngine {
749                rules: HashMap::new(),
750                stats: RuleExecutionStats::default(),
751            })),
752            correlator: Arc::new(RwLock::new(EventCorrelator {
753                correlation_functions: HashMap::new(),
754                correlation_cache: HashMap::new(),
755                stats: CorrelationStats::default(),
756            })),
757            enrichment_service: Arc::new(RwLock::new(EnrichmentService {
758                sources: HashMap::new(),
759                cache: HashMap::new(),
760                stats: EnrichmentStats::default(),
761            })),
762            pattern_detector: Arc::new(RwLock::new(PatternDetector {
763                patterns: HashMap::new(),
764                algorithms: HashMap::new(),
765                stats: DetectionStats::default(),
766            })),
767            metrics: Arc::new(RwLock::new(CepMetrics {
768                last_update: Utc::now(),
769                ..Default::default()
770            })),
771            config,
772            last_gc: Arc::new(RwLock::new(Instant::now())),
773        })
774    }
775
776    /// Register an event pattern
777    pub async fn register_pattern(&mut self, name: &str, pattern: EventPattern) -> Result<()> {
778        let mut patterns = self.patterns.write().await;
779        patterns.insert(name.to_string(), pattern.clone());
780
781        // Initialize state machine for pattern
782        let mut state_machines = self.state_machines.write().await;
783        state_machines.insert(
784            name.to_string(),
785            StateMachine {
786                pattern,
787                state: State::Initial,
788                partial_matches: Vec::new(),
789                completed_matches: Vec::new(),
790                transition_count: 0,
791            },
792        );
793
794        info!("Registered CEP pattern: {}", name);
795        Ok(())
796    }
797
798    /// Register a processing rule
799    pub async fn register_rule(&mut self, rule: ProcessingRule) -> Result<()> {
800        let mut rule_engine = self.rule_engine.write().await;
801        rule_engine.rules.insert(rule.name.clone(), rule.clone());
802        info!("Registered CEP rule: {}", rule.name);
803        Ok(())
804    }
805
806    /// Process an event through the CEP engine
807    pub async fn process_event(&mut self, event: StreamEvent) -> Result<Vec<DetectedPattern>> {
808        let start_time = Instant::now();
809        let event_timestamp = Utc::now();
810
811        // Create timestamped event
812        let timestamped_event = TimestampedEvent {
813            event: event.clone(),
814            timestamp: event_timestamp,
815            id: Uuid::new_v4(),
816        };
817
818        // Add to event buffer
819        self.add_to_buffer("default", timestamped_event.clone())
820            .await?;
821
822        // Run garbage collection if needed
823        self.maybe_run_gc().await?;
824
825        // Detect patterns
826        let detected_patterns = self.detect_patterns(&timestamped_event).await?;
827
828        // Execute rules for detected patterns
829        let mut results = Vec::new();
830        for pattern_match in detected_patterns {
831            let triggered_rules = self.execute_rules(&pattern_match).await?;
832
833            // Correlate events if enabled
834            let correlations = if self.config.enable_correlation {
835                self.correlate_events(&pattern_match).await?
836            } else {
837                Vec::new()
838            };
839
840            // Enrich events if enabled
841            let enrichments = if self.config.enable_enrichment {
842                self.enrich_events(&pattern_match).await?
843            } else {
844                HashMap::new()
845            };
846
847            results.push(DetectedPattern {
848                pattern_match,
849                triggered_rules,
850                correlations,
851                enrichments,
852            });
853        }
854
855        // Update metrics
856        let processing_latency = start_time.elapsed();
857        self.update_metrics(processing_latency, results.len()).await;
858
859        Ok(results)
860    }
861
862    /// Add event to buffer
863    async fn add_to_buffer(&self, stream: &str, event: TimestampedEvent) -> Result<()> {
864        let mut buffers = self.event_buffers.write().await;
865        let buffer = buffers
866            .entry(stream.to_string())
867            .or_insert_with(|| EventBuffer {
868                stream_name: stream.to_string(),
869                events: VecDeque::new(),
870                max_size: self.config.event_buffer_size,
871                oldest_timestamp: None,
872                newest_timestamp: None,
873            });
874
875        // Update timestamps
876        if buffer.oldest_timestamp.is_none() {
877            buffer.oldest_timestamp = Some(event.timestamp);
878        }
879        buffer.newest_timestamp = Some(event.timestamp);
880
881        // Add event
882        buffer.events.push_back(event);
883
884        // Trim buffer if too large
885        while buffer.events.len() > buffer.max_size {
886            buffer.events.pop_front();
887            if let Some(first_event) = buffer.events.front() {
888                buffer.oldest_timestamp = Some(first_event.timestamp);
889            }
890        }
891
892        Ok(())
893    }
894
895    /// Detect patterns in recent events
896    async fn detect_patterns(&self, new_event: &TimestampedEvent) -> Result<Vec<CompleteMatch>> {
897        let mut detected = Vec::new();
898        let mut state_machines = self.state_machines.write().await;
899
900        for (pattern_name, state_machine) in state_machines.iter_mut() {
901            // Try to match pattern
902            if let Some(complete_match) = self.try_match_pattern(state_machine, new_event).await? {
903                detected.push(complete_match);
904                debug!("Pattern detected: {}", pattern_name);
905            }
906        }
907
908        Ok(detected)
909    }
910
911    /// Try to match a pattern
912    async fn try_match_pattern(
913        &self,
914        state_machine: &mut StateMachine,
915        event: &TimestampedEvent,
916    ) -> Result<Option<CompleteMatch>> {
917        // Clone pattern to avoid borrow issues
918        let pattern = state_machine.pattern.clone();
919
920        match &pattern {
921            EventPattern::Simple { predicates, .. } => {
922                if self.evaluate_predicates(predicates, &event.event).await? {
923                    Ok(Some(CompleteMatch {
924                        id: Uuid::new_v4(),
925                        pattern_name: pattern.name().to_string(),
926                        event_ids: vec![event.id],
927                        start_time: event.timestamp,
928                        end_time: event.timestamp,
929                        duration: Duration::from_secs(0),
930                        confidence: 1.0,
931                        metadata: HashMap::new(),
932                    }))
933                } else {
934                    Ok(None)
935                }
936            }
937            EventPattern::Sequence {
938                patterns,
939                time_window,
940                strict,
941                ..
942            } => {
943                self.match_sequence(state_machine, event, patterns, *time_window, *strict)
944                    .await
945            }
946            EventPattern::And {
947                patterns,
948                time_window,
949                ..
950            } => {
951                self.match_conjunction(state_machine, event, patterns, *time_window)
952                    .await
953            }
954            _ => {
955                // Other pattern types (to be implemented)
956                Ok(None)
957            }
958        }
959    }
960
961    /// Match sequence pattern
962    async fn match_sequence(
963        &self,
964        state_machine: &mut StateMachine,
965        event: &TimestampedEvent,
966        patterns: &[EventPattern],
967        time_window: Option<Duration>,
968        _strict: bool,
969    ) -> Result<Option<CompleteMatch>> {
970        // Update partial matches
971        let mut new_partial_matches = Vec::new();
972
973        for partial_match in &mut state_machine.partial_matches {
974            let next_stage = partial_match.stage;
975            if next_stage < patterns.len() {
976                if let EventPattern::Simple { predicates, .. } = &patterns[next_stage] {
977                    if self.evaluate_predicates(predicates, &event.event).await? {
978                        // Check time window
979                        if let Some(window) = time_window {
980                            let elapsed = event
981                                .timestamp
982                                .signed_duration_since(partial_match.start_time);
983                            if elapsed.num_seconds() > window.as_secs() as i64 {
984                                continue; // Expired
985                            }
986                        }
987
988                        // Advance match
989                        let mut new_match = partial_match.clone();
990                        new_match.events.push(event.clone());
991                        new_match.stage += 1;
992                        new_match.last_update = event.timestamp;
993
994                        if new_match.stage == patterns.len() {
995                            // Complete match!
996                            let event_ids: Vec<Uuid> =
997                                new_match.events.iter().map(|e| e.id).collect();
998                            let duration = event
999                                .timestamp
1000                                .signed_duration_since(new_match.start_time)
1001                                .to_std()
1002                                .unwrap_or(Duration::from_secs(0));
1003
1004                            return Ok(Some(CompleteMatch {
1005                                id: Uuid::new_v4(),
1006                                pattern_name: state_machine.pattern.name().to_string(),
1007                                event_ids,
1008                                start_time: new_match.start_time,
1009                                end_time: event.timestamp,
1010                                duration,
1011                                confidence: 1.0,
1012                                metadata: HashMap::new(),
1013                            }));
1014                        } else {
1015                            new_partial_matches.push(new_match);
1016                        }
1017                    }
1018                }
1019            }
1020        }
1021
1022        // Check if this event starts a new partial match
1023        if let EventPattern::Simple { predicates, .. } = &patterns[0] {
1024            if self.evaluate_predicates(predicates, &event.event).await? {
1025                new_partial_matches.push(PartialMatch {
1026                    id: Uuid::new_v4(),
1027                    events: vec![event.clone()],
1028                    stage: 1,
1029                    start_time: event.timestamp,
1030                    last_update: event.timestamp,
1031                    state: HashMap::new(),
1032                });
1033            }
1034        }
1035
1036        state_machine.partial_matches = new_partial_matches;
1037        Ok(None)
1038    }
1039
1040    /// Match conjunction pattern
1041    async fn match_conjunction(
1042        &self,
1043        _state_machine: &mut StateMachine,
1044        _event: &TimestampedEvent,
1045        _patterns: &[EventPattern],
1046        _time_window: Option<Duration>,
1047    ) -> Result<Option<CompleteMatch>> {
1048        // Simplified implementation
1049        Ok(None)
1050    }
1051
1052    /// Evaluate predicates against an event
1053    async fn evaluate_predicates(
1054        &self,
1055        predicates: &[FieldPredicate],
1056        event: &StreamEvent,
1057    ) -> Result<bool> {
1058        for predicate in predicates {
1059            match predicate {
1060                FieldPredicate::Equals { field, value } if field == "event_type" => {
1061                    // Extract field from event (simplified)
1062                    let event_type = match event {
1063                        StreamEvent::TripleAdded { .. } => "TripleAdded",
1064                        StreamEvent::TripleRemoved { .. } => "TripleRemoved",
1065                        StreamEvent::QuadAdded { .. } => "QuadAdded",
1066                        StreamEvent::QuadRemoved { .. } => "QuadRemoved",
1067                        StreamEvent::GraphCreated { .. } => "GraphCreated",
1068                        StreamEvent::GraphCleared { .. } => "GraphCleared",
1069                        StreamEvent::GraphDeleted { .. } => "GraphDeleted",
1070                        StreamEvent::SparqlUpdate { .. } => "SparqlUpdate",
1071                        StreamEvent::TransactionBegin { .. } => "TransactionBegin",
1072                        StreamEvent::TransactionCommit { .. } => "TransactionCommit",
1073                        StreamEvent::TransactionAbort { .. } => "TransactionAbort",
1074                        StreamEvent::SchemaChanged { .. } => "SchemaChanged",
1075                        StreamEvent::Heartbeat { .. } => "Heartbeat",
1076                        _ => "Other", // Catch-all for other event types
1077                    };
1078                    if event_type != value {
1079                        return Ok(false);
1080                    }
1081                }
1082                FieldPredicate::Contains { field, substring } if field == "source" => {
1083                    // Simplified implementation
1084                    let source = match event {
1085                        StreamEvent::Heartbeat { source, .. } => source,
1086                        _ => return Ok(false),
1087                    };
1088                    if !source.contains(substring) {
1089                        return Ok(false);
1090                    }
1091                }
1092                _ => {
1093                    // Other predicates (simplified)
1094                }
1095            }
1096        }
1097        Ok(true)
1098    }
1099
1100    /// Execute rules for a pattern match
1101    async fn execute_rules(&self, pattern_match: &CompleteMatch) -> Result<Vec<String>> {
1102        let mut triggered = Vec::new();
1103
1104        // Clone rules to avoid borrow issues
1105        let rules = {
1106            let rule_engine = self.rule_engine.read().await;
1107            rule_engine.rules.clone()
1108        };
1109
1110        for (rule_name, rule) in &rules {
1111            if !rule.enabled {
1112                continue;
1113            }
1114
1115            // Check if rule condition matches
1116            if self
1117                .evaluate_rule_condition(&rule.condition, pattern_match)
1118                .await?
1119            {
1120                // Execute actions
1121                for action in &rule.actions {
1122                    self.execute_rule_action(action).await?;
1123                }
1124                triggered.push(rule_name.clone());
1125
1126                // Update stats
1127                let mut rule_engine = self.rule_engine.write().await;
1128                rule_engine.stats.successful_executions += 1;
1129            }
1130        }
1131
1132        Ok(triggered)
1133    }
1134
1135    /// Evaluate rule condition
1136    async fn evaluate_rule_condition(
1137        &self,
1138        condition: &RuleCondition,
1139        pattern_match: &CompleteMatch,
1140    ) -> Result<bool> {
1141        match condition {
1142            RuleCondition::PatternMatched { pattern } => Ok(&pattern_match.pattern_name == pattern),
1143            _ => {
1144                // Other conditions (simplified)
1145                Ok(false)
1146            }
1147        }
1148    }
1149
1150    /// Execute rule action
1151    async fn execute_rule_action(&self, action: &RuleAction) -> Result<()> {
1152        match action {
1153            RuleAction::SendAlert { severity, message } => {
1154                info!("CEP Alert [{}]: {}", severity, message);
1155            }
1156            RuleAction::EmitEvent { event_type, data } => {
1157                debug!("CEP Emit Event: {} with data: {:?}", event_type, data);
1158            }
1159            _ => {
1160                // Other actions (simplified)
1161            }
1162        }
1163        Ok(())
1164    }
1165
1166    /// Correlate events
1167    async fn correlate_events(
1168        &self,
1169        _pattern_match: &CompleteMatch,
1170    ) -> Result<Vec<CorrelationResult>> {
1171        // Simplified implementation
1172        Ok(Vec::new())
1173    }
1174
1175    /// Enrich events
1176    async fn enrich_events(
1177        &self,
1178        _pattern_match: &CompleteMatch,
1179    ) -> Result<HashMap<String, EnrichmentData>> {
1180        // Simplified implementation
1181        Ok(HashMap::new())
1182    }
1183
1184    /// Run garbage collection if needed
1185    async fn maybe_run_gc(&self) -> Result<()> {
1186        let mut last_gc = self.last_gc.write().await;
1187        if last_gc.elapsed() >= self.config.gc_interval {
1188            self.run_gc().await?;
1189            *last_gc = Instant::now();
1190
1191            let mut metrics = self.metrics.write().await;
1192            metrics.gc_count += 1;
1193        }
1194        Ok(())
1195    }
1196
1197    /// Run garbage collection
1198    async fn run_gc(&self) -> Result<()> {
1199        let cutoff_time =
1200            Utc::now() - ChronoDuration::seconds(self.config.max_time_window.as_secs() as i64);
1201
1202        // Clean event buffers
1203        let mut buffers = self.event_buffers.write().await;
1204        for buffer in buffers.values_mut() {
1205            buffer.events.retain(|e| e.timestamp > cutoff_time);
1206            if let Some(first_event) = buffer.events.front() {
1207                buffer.oldest_timestamp = Some(first_event.timestamp);
1208            }
1209        }
1210
1211        // Clean partial matches
1212        let mut state_machines = self.state_machines.write().await;
1213        for state_machine in state_machines.values_mut() {
1214            state_machine
1215                .partial_matches
1216                .retain(|m| m.last_update > cutoff_time);
1217        }
1218
1219        debug!("CEP garbage collection completed");
1220        Ok(())
1221    }
1222
1223    /// Update metrics
1224    async fn update_metrics(&self, processing_latency: Duration, patterns_detected: usize) {
1225        let mut metrics = self.metrics.write().await;
1226        metrics.total_events_processed += 1;
1227        metrics.total_patterns_detected += patterns_detected as u64;
1228
1229        let now = Utc::now();
1230        let elapsed_duration = now.signed_duration_since(metrics.last_update);
1231        let elapsed_secs = elapsed_duration.num_seconds() as f64;
1232
1233        if elapsed_secs > 0.0 {
1234            metrics.events_per_second = metrics.total_events_processed as f64 / elapsed_secs;
1235            metrics.patterns_per_second = metrics.total_patterns_detected as f64 / elapsed_secs;
1236        }
1237
1238        // Update average latency
1239        let total_latency = metrics.avg_event_processing_latency.as_micros()
1240            * (metrics.total_events_processed - 1) as u128
1241            + processing_latency.as_micros();
1242        metrics.avg_event_processing_latency =
1243            Duration::from_micros((total_latency / metrics.total_events_processed as u128) as u64);
1244
1245        // Count active partial matches
1246        let state_machines = self.state_machines.read().await;
1247        metrics.active_partial_matches = state_machines
1248            .values()
1249            .map(|sm| sm.partial_matches.len())
1250            .sum();
1251    }
1252
1253    /// Get current metrics
1254    pub async fn get_metrics(&self) -> CepMetrics {
1255        self.metrics.read().await.clone()
1256    }
1257
1258    /// Get statistics
1259    pub async fn get_statistics(&self) -> CepStatistics {
1260        let metrics = self.metrics.read().await;
1261        let rule_engine = self.rule_engine.read().await;
1262        let correlator = self.correlator.read().await;
1263        let enrichment = self.enrichment_service.read().await;
1264        let detector = self.pattern_detector.read().await;
1265
1266        CepStatistics {
1267            metrics: metrics.clone(),
1268            rule_stats: rule_engine.stats.clone(),
1269            correlation_stats: correlator.stats.clone(),
1270            enrichment_stats: enrichment.stats.clone(),
1271            detection_stats: detector.stats.clone(),
1272        }
1273    }
1274}
1275
1276/// CEP statistics
1277#[derive(Debug, Clone, Serialize, Deserialize)]
1278pub struct CepStatistics {
1279    /// CEP metrics
1280    pub metrics: CepMetrics,
1281    /// Rule execution statistics
1282    pub rule_stats: RuleExecutionStats,
1283    /// Correlation statistics
1284    pub correlation_stats: CorrelationStats,
1285    /// Enrichment statistics
1286    pub enrichment_stats: EnrichmentStats,
1287    /// Detection statistics
1288    pub detection_stats: DetectionStats,
1289}
1290
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// Builds a heartbeat event from source `"test"`, stamped with the current time.
    fn heartbeat() -> StreamEvent {
        StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        }
    }

    /// Creates an engine with the default configuration.
    fn default_engine() -> CepEngine {
        CepEngine::new(CepConfig::default()).unwrap()
    }

    #[tokio::test]
    async fn test_cep_engine_creation() {
        let engine = CepEngine::new(CepConfig::default());
        assert!(engine.is_ok());
    }

    #[tokio::test]
    async fn test_pattern_registration() {
        let mut engine = default_engine();

        let pattern = EventPattern::simple("event_type", "test");
        let result = engine.register_pattern("test_pattern", pattern).await;
        assert!(result.is_ok());

        let patterns = engine.patterns.read().await;
        assert!(patterns.contains_key("test_pattern"));
    }

    #[tokio::test]
    async fn test_simple_pattern_matching() {
        let mut engine = default_engine();

        let pattern = EventPattern::simple("event_type", "Heartbeat");
        engine.register_pattern("heartbeat", pattern).await.unwrap();

        let detected = engine.process_event(heartbeat()).await.unwrap();
        assert!(!detected.is_empty());
    }

    #[tokio::test]
    async fn test_sequence_pattern() {
        let mut engine = default_engine();

        let pattern = EventPattern::sequence(vec![
            EventPattern::simple("event_type", "Heartbeat"),
            EventPattern::simple("event_type", "Heartbeat"),
        ])
        .with_time_window(Duration::from_secs(10));

        engine
            .register_pattern("double_heartbeat", pattern)
            .await
            .unwrap();

        // First event only opens a partial match.
        let detected1 = engine.process_event(heartbeat()).await.unwrap();
        assert!(detected1.is_empty());

        // Second event completes the sequence.
        let detected2 = engine.process_event(heartbeat()).await.unwrap();
        assert!(!detected2.is_empty());
    }

    #[tokio::test]
    async fn test_rule_registration() {
        let mut engine = default_engine();

        let rule = ProcessingRule {
            name: "test_rule".to_string(),
            condition: RuleCondition::PatternMatched {
                pattern: "heartbeat".to_string(),
            },
            actions: vec![RuleAction::SendAlert {
                severity: "info".to_string(),
                message: "Heartbeat detected".to_string(),
            }],
            priority: 1,
            enabled: true,
        };

        let result = engine.register_rule(rule).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_event_buffer() {
        let engine = default_engine();

        let timestamped = TimestampedEvent {
            event: heartbeat(),
            timestamp: Utc::now(),
            id: Uuid::new_v4(),
        };

        engine
            .add_to_buffer("test_stream", timestamped)
            .await
            .unwrap();

        let buffers = engine.event_buffers.read().await;
        assert!(buffers.contains_key("test_stream"));
        assert_eq!(buffers.get("test_stream").unwrap().events.len(), 1);
    }

    #[tokio::test]
    async fn test_predicate_evaluation() {
        let engine = default_engine();

        let predicates = vec![FieldPredicate::Equals {
            field: "event_type".to_string(),
            value: "Heartbeat".to_string(),
        }];

        let result = engine
            .evaluate_predicates(&predicates, &heartbeat())
            .await
            .unwrap();
        assert!(result);
    }

    #[tokio::test]
    async fn test_metrics_collection() {
        let mut engine = default_engine();

        engine.process_event(heartbeat()).await.unwrap();

        let metrics = engine.get_metrics().await;
        assert_eq!(metrics.total_events_processed, 1);
    }

    #[tokio::test]
    async fn test_garbage_collection() {
        // Pin the retention window so the test does not depend on the default
        // configuration. `run_gc` is invoked directly, so no GC interval or
        // waiting is involved.
        let config = CepConfig {
            max_time_window: Duration::from_secs(3600),
            ..Default::default()
        };
        let engine = CepEngine::new(config).unwrap();

        // Buffer an event that is twice as old as the retention window.
        let old_event = TimestampedEvent {
            event: heartbeat(),
            timestamp: Utc::now() - ChronoDuration::hours(2),
            id: Uuid::new_v4(),
        };

        engine.add_to_buffer("test", old_event).await.unwrap();

        engine.run_gc().await.unwrap();

        let buffers = engine.event_buffers.read().await;
        assert!(buffers.get("test").unwrap().events.is_empty());
    }

    // Purely synchronous: no runtime is needed to build a pattern.
    #[test]
    fn test_pattern_with_time_window() {
        let pattern = EventPattern::sequence(vec![
            EventPattern::simple("type", "A"),
            EventPattern::simple("type", "B"),
        ])
        .with_time_window(Duration::from_secs(5));

        match pattern {
            EventPattern::Sequence { time_window, .. } => {
                assert_eq!(time_window, Some(Duration::from_secs(5)));
            }
            _ => panic!("Expected sequence pattern"),
        }
    }

    #[tokio::test]
    async fn test_statistics() {
        let engine = default_engine();

        let stats = engine.get_statistics().await;
        assert_eq!(stats.metrics.total_events_processed, 0);
    }
}