oxirs_stream/
cep_engine.rs

1//! # Complex Event Processing (CEP) Engine
2//!
3//! Production-grade CEP engine for detecting complex patterns across multiple event streams,
4//! featuring composite event detection, event correlation, state machines, and rule-based
5//! event processing with real-time pattern matching.
6//!
7//! ## Features
8//!
9//! - **Composite Event Detection**: Detect complex patterns from multiple simple events
10//! - **Event Correlation**: Correlate events across streams using time windows and predicates
11//! - **State Machine Processing**: Define complex event sequences with state transitions
12//! - **Rule-Based Engine**: Define processing rules with conditions and actions
13//! - **Temporal Operators**: Before, After, During, Overlaps, Meets, Starts, Finishes
14//! - **Event Aggregation**: Aggregate events over time windows with custom functions
15//! - **Event Enrichment**: Enrich events with contextual data from external sources
16//! - **Pattern Library**: Pre-defined patterns for common scenarios
17//! - **Real-time Processing**: Sub-millisecond pattern detection latency
18//! - **Distributed Support**: Partition-aware processing for horizontal scaling
19//!
20//! ## Example
21//!
22//! ```no_run
23//! use oxirs_stream::cep_engine::{CepEngine, CepConfig, EventPattern, TemporalOperator};
24//! use oxirs_stream::event::StreamEvent;
25//!
26//! # async fn example() -> anyhow::Result<()> {
27//! let config = CepConfig::default();
28//! let mut engine = CepEngine::new(config)?;
29//!
30//! // Define a pattern: "A followed by B within 10 seconds"
31//! let pattern = EventPattern::sequence(vec![
32//!     EventPattern::simple("event_type", "A"),
33//!     EventPattern::simple("event_type", "B"),
34//! ]).with_time_window(std::time::Duration::from_secs(10));
35//!
36//! engine.register_pattern("a_then_b", pattern).await?;
37//!
38//! // Process events
39//! # let event = StreamEvent::Heartbeat {
40//! #     timestamp: chrono::Utc::now(),
41//! #     source: "test".to_string(),
42//! #     metadata: Default::default(),
43//! # };
44//! let detected_patterns = engine.process_event(event).await?;
45//! # Ok(())
46//! # }
47//! ```
48
49use crate::event::StreamEvent;
50use anyhow::Result;
51use chrono::{DateTime, Duration as ChronoDuration, Utc};
52use serde::{Deserialize, Serialize};
53use std::collections::{HashMap, VecDeque};
54use std::sync::Arc;
55use std::time::{Duration, Instant};
56use tokio::sync::RwLock;
57use tracing::{debug, info};
58use uuid::Uuid;
59
60/// Configuration for CEP engine
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct CepConfig {
63    /// Maximum number of events to keep in memory per partition
64    pub max_events_in_memory: usize,
65
66    /// Maximum time window for pattern detection
67    pub max_time_window: Duration,
68
69    /// Enable event correlation
70    pub enable_correlation: bool,
71
72    /// Enable state machine processing
73    pub enable_state_machines: bool,
74
75    /// Enable rule-based processing
76    pub enable_rules: bool,
77
78    /// Enable event enrichment
79    pub enable_enrichment: bool,
80
81    /// Maximum pattern complexity (nested depth)
82    pub max_pattern_depth: usize,
83
84    /// Pattern matching timeout
85    pub pattern_matching_timeout: Duration,
86
87    /// Event buffer size per stream
88    pub event_buffer_size: usize,
89
90    /// Enable metrics collection
91    pub collect_metrics: bool,
92
93    /// Garbage collection interval for expired events
94    pub gc_interval: Duration,
95
96    /// Enable distributed processing
97    pub enable_distributed: bool,
98
99    /// Number of partitions for distributed processing
100    pub num_partitions: usize,
101}
102
103impl Default for CepConfig {
104    fn default() -> Self {
105        Self {
106            max_events_in_memory: 100000,
107            max_time_window: Duration::from_secs(3600),
108            enable_correlation: true,
109            enable_state_machines: true,
110            enable_rules: true,
111            enable_enrichment: true,
112            max_pattern_depth: 10,
113            pattern_matching_timeout: Duration::from_millis(100),
114            event_buffer_size: 10000,
115            collect_metrics: true,
116            gc_interval: Duration::from_secs(60),
117            enable_distributed: false,
118            num_partitions: 8,
119        }
120    }
121}
122
123/// Complex Event Processing engine
124pub struct CepEngine {
125    /// Registered patterns
126    patterns: Arc<RwLock<HashMap<String, EventPattern>>>,
127
128    /// Event buffers per stream
129    event_buffers: Arc<RwLock<HashMap<String, EventBuffer>>>,
130
131    /// State machines for pattern tracking
132    state_machines: Arc<RwLock<HashMap<String, StateMachine>>>,
133
134    /// Rule engine
135    rule_engine: Arc<RwLock<RuleEngine>>,
136
137    /// Event correlator
138    correlator: Arc<RwLock<EventCorrelator>>,
139
140    /// Event enrichment service
141    enrichment_service: Arc<RwLock<EnrichmentService>>,
142
143    /// Pattern detector
144    pattern_detector: Arc<RwLock<PatternDetector>>,
145
146    /// Metrics collector
147    metrics: Arc<RwLock<CepMetrics>>,
148
149    /// Configuration
150    config: CepConfig,
151
152    /// Last garbage collection time
153    last_gc: Arc<RwLock<Instant>>,
154}
155
156/// Event pattern definition
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub enum EventPattern {
159    /// Simple event pattern with field predicates
160    Simple {
161        /// Pattern name
162        name: String,
163        /// Field predicates
164        predicates: Vec<FieldPredicate>,
165    },
166
167    /// Sequence pattern (events in order)
168    Sequence {
169        /// Pattern name
170        name: String,
171        /// Sub-patterns
172        patterns: Vec<EventPattern>,
173        /// Time window
174        time_window: Option<Duration>,
175        /// Strict ordering (no interleaving)
176        strict: bool,
177    },
178
179    /// Conjunction pattern (all events must occur)
180    And {
181        /// Pattern name
182        name: String,
183        /// Sub-patterns
184        patterns: Vec<EventPattern>,
185        /// Time window
186        time_window: Option<Duration>,
187    },
188
189    /// Disjunction pattern (any event occurs)
190    Or {
191        /// Pattern name
192        name: String,
193        /// Sub-patterns
194        patterns: Vec<EventPattern>,
195    },
196
197    /// Negation pattern (event must not occur)
198    Not {
199        /// Pattern name
200        name: String,
201        /// Pattern to negate
202        pattern: Box<EventPattern>,
203        /// Time window for negation
204        time_window: Duration,
205    },
206
207    /// Repeat pattern (event occurs N times)
208    Repeat {
209        /// Pattern name
210        name: String,
211        /// Pattern to repeat
212        pattern: Box<EventPattern>,
213        /// Minimum occurrences
214        min_count: usize,
215        /// Maximum occurrences
216        max_count: Option<usize>,
217        /// Time window
218        time_window: Option<Duration>,
219    },
220
221    /// Temporal pattern with Allen's interval algebra
222    Temporal {
223        /// Pattern name
224        name: String,
225        /// First event pattern
226        first: Box<EventPattern>,
227        /// Temporal operator
228        operator: TemporalOperator,
229        /// Second event pattern
230        second: Box<EventPattern>,
231        /// Time tolerance
232        tolerance: Option<Duration>,
233    },
234
235    /// Aggregation pattern (aggregate events over window)
236    Aggregation {
237        /// Pattern name
238        name: String,
239        /// Pattern to aggregate
240        pattern: Box<EventPattern>,
241        /// Aggregation function
242        aggregation: CepAggregationFunction,
243        /// Time window
244        window: Duration,
245        /// Threshold for triggering
246        threshold: f64,
247    },
248}
249
250impl EventPattern {
251    /// Create a simple pattern
252    pub fn simple(field: &str, value: &str) -> Self {
253        EventPattern::Simple {
254            name: format!("{}={}", field, value),
255            predicates: vec![FieldPredicate::Equals {
256                field: field.to_string(),
257                value: value.to_string(),
258            }],
259        }
260    }
261
262    /// Create a sequence pattern
263    pub fn sequence(patterns: Vec<EventPattern>) -> Self {
264        EventPattern::Sequence {
265            name: "sequence".to_string(),
266            patterns,
267            time_window: None,
268            strict: false,
269        }
270    }
271
272    /// Add time window to pattern
273    pub fn with_time_window(mut self, window: Duration) -> Self {
274        match &mut self {
275            EventPattern::Sequence { time_window, .. } => *time_window = Some(window),
276            EventPattern::And { time_window, .. } => *time_window = Some(window),
277            EventPattern::Repeat { time_window, .. } => *time_window = Some(window),
278            _ => {}
279        }
280        self
281    }
282
283    /// Get pattern name
284    pub fn name(&self) -> &str {
285        match self {
286            EventPattern::Simple { name, .. } => name,
287            EventPattern::Sequence { name, .. } => name,
288            EventPattern::And { name, .. } => name,
289            EventPattern::Or { name, .. } => name,
290            EventPattern::Not { name, .. } => name,
291            EventPattern::Repeat { name, .. } => name,
292            EventPattern::Temporal { name, .. } => name,
293            EventPattern::Aggregation { name, .. } => name,
294        }
295    }
296}
297
298/// Field predicate for event matching
299#[derive(Debug, Clone, Serialize, Deserialize)]
300pub enum FieldPredicate {
301    /// Field equals value
302    Equals { field: String, value: String },
303    /// Field not equals value
304    NotEquals { field: String, value: String },
305    /// Field contains substring
306    Contains { field: String, substring: String },
307    /// Field matches regex
308    Regex { field: String, pattern: String },
309    /// Field greater than value
310    GreaterThan { field: String, value: f64 },
311    /// Field less than value
312    LessThan { field: String, value: f64 },
313    /// Field in range
314    InRange { field: String, min: f64, max: f64 },
315    /// Field exists
316    Exists { field: String },
317    /// Custom predicate function (serialized as name)
318    Custom { name: String },
319}
320
321/// Temporal operators (Allen's interval algebra)
322#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
323pub enum TemporalOperator {
324    /// First event before second event
325    Before,
326    /// First event after second event
327    After,
328    /// First event meets second event (end == start)
329    Meets,
330    /// First event during second event
331    During,
332    /// First event overlaps second event
333    Overlaps,
334    /// First event starts second event (same start)
335    Starts,
336    /// First event finishes second event (same end)
337    Finishes,
338    /// First event equals second event (same start and end)
339    Equals,
340}
341
342/// Aggregation function for CEP event aggregation
343#[derive(Debug, Clone, Serialize, Deserialize)]
344pub enum CepAggregationFunction {
345    /// Count events
346    Count,
347    /// Sum field values
348    Sum { field: String },
349    /// Average field values
350    Average { field: String },
351    /// Minimum field value
352    Min { field: String },
353    /// Maximum field value
354    Max { field: String },
355    /// Standard deviation of field values
356    StdDev { field: String },
357    /// Percentile of field values
358    Percentile { field: String, percentile: f64 },
359    /// Custom aggregation function
360    Custom { name: String },
361}
362
363/// Event buffer for storing recent events
364#[derive(Debug, Clone)]
365pub struct EventBuffer {
366    /// Stream name
367    pub stream_name: String,
368    /// Buffered events with timestamps
369    pub events: VecDeque<TimestampedEvent>,
370    /// Maximum buffer size
371    pub max_size: usize,
372    /// Oldest event timestamp
373    pub oldest_timestamp: Option<DateTime<Utc>>,
374    /// Newest event timestamp
375    pub newest_timestamp: Option<DateTime<Utc>>,
376}
377
378/// Timestamped event
379#[derive(Debug, Clone)]
380pub struct TimestampedEvent {
381    /// Event
382    pub event: StreamEvent,
383    /// Processing timestamp
384    pub timestamp: DateTime<Utc>,
385    /// Event ID
386    pub id: Uuid,
387}
388
389/// State machine for pattern tracking
390#[derive(Debug, Clone)]
391pub struct StateMachine {
392    /// Pattern being tracked
393    pub pattern: EventPattern,
394    /// Current state
395    pub state: State,
396    /// Partial matches
397    pub partial_matches: Vec<PartialMatch>,
398    /// Completed matches
399    pub completed_matches: Vec<CompleteMatch>,
400    /// State transition count
401    pub transition_count: usize,
402}
403
404/// State in state machine
405#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
406pub enum State {
407    /// Initial state
408    Initial,
409    /// Intermediate state
410    Intermediate { stage: usize },
411    /// Final state (pattern matched)
412    Final,
413    /// Error state (pattern violated)
414    Error,
415}
416
417/// Partial match in progress
418#[derive(Debug, Clone)]
419pub struct PartialMatch {
420    /// Match ID
421    pub id: Uuid,
422    /// Matched events so far
423    pub events: Vec<TimestampedEvent>,
424    /// Current stage in pattern
425    pub stage: usize,
426    /// Start time
427    pub start_time: DateTime<Utc>,
428    /// Last update time
429    pub last_update: DateTime<Utc>,
430    /// Match state
431    pub state: HashMap<String, String>,
432}
433
434/// Complete pattern match
435#[derive(Debug, Clone, Serialize, Deserialize)]
436pub struct CompleteMatch {
437    /// Match ID
438    pub id: Uuid,
439    /// Pattern name
440    pub pattern_name: String,
441    /// Matched events
442    pub event_ids: Vec<Uuid>,
443    /// Start time
444    pub start_time: DateTime<Utc>,
445    /// End time
446    pub end_time: DateTime<Utc>,
447    /// Match duration
448    pub duration: Duration,
449    /// Confidence score (0.0-1.0)
450    pub confidence: f64,
451    /// Additional metadata
452    pub metadata: HashMap<String, String>,
453}
454
455/// Rule engine for event processing
456#[derive(Debug, Clone)]
457pub struct RuleEngine {
458    /// Registered rules
459    pub rules: HashMap<String, ProcessingRule>,
460    /// Rule execution statistics
461    pub stats: RuleExecutionStats,
462}
463
464/// Processing rule
465#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct ProcessingRule {
467    /// Rule name
468    pub name: String,
469    /// Condition to trigger rule
470    pub condition: RuleCondition,
471    /// Actions to execute
472    pub actions: Vec<RuleAction>,
473    /// Priority (higher = executed first)
474    pub priority: i32,
475    /// Enabled flag
476    pub enabled: bool,
477}
478
479/// Rule condition
480#[derive(Debug, Clone, Serialize, Deserialize)]
481pub enum RuleCondition {
482    /// Pattern matched
483    PatternMatched { pattern: String },
484    /// Event field condition
485    FieldCondition { predicate: FieldPredicate },
486    /// Threshold exceeded
487    ThresholdExceeded { metric: String, threshold: f64 },
488    /// Complex condition (AND/OR)
489    Complex {
490        operator: String,
491        conditions: Vec<RuleCondition>,
492    },
493}
494
495/// Rule action
496#[derive(Debug, Clone, Serialize, Deserialize)]
497pub enum RuleAction {
498    /// Emit new event
499    EmitEvent {
500        event_type: String,
501        data: HashMap<String, String>,
502    },
503    /// Send alert
504    SendAlert { severity: String, message: String },
505    /// Update state
506    UpdateState { key: String, value: String },
507    /// Execute external webhook
508    Webhook { url: String, method: String },
509    /// Custom action
510    Custom {
511        name: String,
512        params: HashMap<String, String>,
513    },
514}
515
516/// Rule execution statistics
517#[derive(Debug, Clone, Default, Serialize, Deserialize)]
518pub struct RuleExecutionStats {
519    /// Total rules executed
520    pub total_executions: u64,
521    /// Successful executions
522    pub successful_executions: u64,
523    /// Failed executions
524    pub failed_executions: u64,
525    /// Total execution time
526    pub total_execution_time: Duration,
527    /// Average execution time
528    pub avg_execution_time: Duration,
529}
530
531/// Event correlator for finding related events
532#[derive(Debug, Clone)]
533pub struct EventCorrelator {
534    /// Correlation functions
535    pub correlation_functions: HashMap<String, CorrelationFunction>,
536    /// Correlation results cache
537    pub correlation_cache: HashMap<CorrelationKey, CorrelationResult>,
538    /// Statistics
539    pub stats: CorrelationStats,
540}
541
/// Definition of how two events are correlated.
#[derive(Debug, Clone)]
pub struct CorrelationFunction {
    /// Name of the function.
    pub name: String,
    /// Window within which events may be correlated.
    pub time_window: Duration,
    /// Event fields compared by the function.
    pub fields: Vec<String>,
    /// Score threshold for considering events correlated.
    pub threshold: f64,
}
554
555/// Correlation key for caching
556#[derive(Debug, Clone, Hash, PartialEq, Eq)]
557pub struct CorrelationKey {
558    /// Event ID 1
559    pub event1: Uuid,
560    /// Event ID 2
561    pub event2: Uuid,
562    /// Function name
563    pub function: String,
564}
565
566/// Correlation result
567#[derive(Debug, Clone, Serialize, Deserialize)]
568pub struct CorrelationResult {
569    /// Correlation score (0.0-1.0)
570    pub score: f64,
571    /// Correlated fields
572    pub correlated_fields: Vec<String>,
573    /// Timestamp
574    pub timestamp: DateTime<Utc>,
575}
576
577/// Correlation statistics
578#[derive(Debug, Clone, Default, Serialize, Deserialize)]
579pub struct CorrelationStats {
580    /// Total correlations computed
581    pub total_correlations: u64,
582    /// Cache hits
583    pub cache_hits: u64,
584    /// Cache misses
585    pub cache_misses: u64,
586    /// Average correlation score
587    pub avg_correlation_score: f64,
588}
589
590/// Event enrichment service
591#[derive(Debug, Clone)]
592pub struct EnrichmentService {
593    /// Enrichment sources
594    pub sources: HashMap<String, EnrichmentSource>,
595    /// Enrichment cache
596    pub cache: HashMap<String, EnrichmentData>,
597    /// Statistics
598    pub stats: EnrichmentStats,
599}
600
601/// Enrichment source
602#[derive(Debug, Clone, Serialize, Deserialize)]
603pub struct EnrichmentSource {
604    /// Source name
605    pub name: String,
606    /// Source type
607    pub source_type: EnrichmentSourceType,
608    /// Lookup key field
609    pub key_field: String,
610    /// Cache TTL
611    pub cache_ttl: Duration,
612}
613
614/// Enrichment source type
615#[derive(Debug, Clone, Serialize, Deserialize)]
616pub enum EnrichmentSourceType {
617    /// External API
618    ExternalApi { url: String, auth: Option<String> },
619    /// Database
620    Database {
621        connection_string: String,
622        query: String,
623    },
624    /// In-memory cache
625    InMemory {
626        data: HashMap<String, HashMap<String, String>>,
627    },
628    /// Custom source
629    Custom { name: String },
630}
631
632/// Enrichment data
633#[derive(Debug, Clone, Serialize, Deserialize)]
634pub struct EnrichmentData {
635    /// Enriched fields
636    pub fields: HashMap<String, String>,
637    /// Source name
638    pub source: String,
639    /// Timestamp
640    pub timestamp: DateTime<Utc>,
641    /// TTL
642    pub ttl: Duration,
643}
644
645/// Enrichment statistics
646#[derive(Debug, Clone, Default, Serialize, Deserialize)]
647pub struct EnrichmentStats {
648    /// Total enrichments
649    pub total_enrichments: u64,
650    /// Cache hits
651    pub cache_hits: u64,
652    /// Cache misses
653    pub cache_misses: u64,
654    /// Failed enrichments
655    pub failed_enrichments: u64,
656}
657
658/// Pattern detector
659#[derive(Debug, Clone)]
660pub struct PatternDetector {
661    /// Registered patterns
662    pub patterns: HashMap<String, EventPattern>,
663    /// Detection algorithms
664    pub algorithms: HashMap<String, DetectionAlgorithm>,
665    /// Detection statistics
666    pub stats: DetectionStats,
667}
668
669/// Detection algorithm
670#[derive(Debug, Clone, Serialize, Deserialize)]
671pub enum DetectionAlgorithm {
672    /// Naive sequential matching
673    Sequential,
674    /// Automaton-based matching
675    Automaton,
676    /// Tree-based matching
677    Tree,
678    /// Graph-based matching
679    Graph,
680    /// Machine learning based
681    MachineLearning { model_name: String },
682}
683
684/// Detection statistics
685#[derive(Debug, Clone, Default, Serialize, Deserialize)]
686pub struct DetectionStats {
687    /// Total events processed
688    pub total_events_processed: u64,
689    /// Patterns detected
690    pub patterns_detected: u64,
691    /// False positives
692    pub false_positives: u64,
693    /// False negatives
694    pub false_negatives: u64,
695    /// Average detection latency
696    pub avg_detection_latency: Duration,
697    /// Total detection time
698    pub total_detection_time: Duration,
699}
700
701/// CEP metrics
702#[derive(Debug, Clone, Default, Serialize, Deserialize)]
703pub struct CepMetrics {
704    /// Total events processed
705    pub total_events_processed: u64,
706    /// Total patterns detected
707    pub total_patterns_detected: u64,
708    /// Events per second
709    pub events_per_second: f64,
710    /// Patterns per second
711    pub patterns_per_second: f64,
712    /// Average event processing latency
713    pub avg_event_processing_latency: Duration,
714    /// Average pattern matching latency
715    pub avg_pattern_matching_latency: Duration,
716    /// Memory usage (bytes)
717    pub memory_usage_bytes: usize,
718    /// Active partial matches
719    pub active_partial_matches: usize,
720    /// Completed matches in window
721    pub completed_matches: usize,
722    /// Garbage collections performed
723    pub gc_count: u64,
724    /// Last update time
725    pub last_update: DateTime<Utc>,
726}
727
728/// Detected pattern result
729#[derive(Debug, Clone, Serialize, Deserialize)]
730pub struct DetectedPattern {
731    /// Pattern match
732    pub pattern_match: CompleteMatch,
733    /// Triggered rules
734    pub triggered_rules: Vec<String>,
735    /// Correlation results
736    pub correlations: Vec<CorrelationResult>,
737    /// Enriched data
738    pub enrichments: HashMap<String, EnrichmentData>,
739}
740
741impl CepEngine {
742    /// Create a new CEP engine
743    pub fn new(config: CepConfig) -> Result<Self> {
744        Ok(Self {
745            patterns: Arc::new(RwLock::new(HashMap::new())),
746            event_buffers: Arc::new(RwLock::new(HashMap::new())),
747            state_machines: Arc::new(RwLock::new(HashMap::new())),
748            rule_engine: Arc::new(RwLock::new(RuleEngine {
749                rules: HashMap::new(),
750                stats: RuleExecutionStats::default(),
751            })),
752            correlator: Arc::new(RwLock::new(EventCorrelator {
753                correlation_functions: HashMap::new(),
754                correlation_cache: HashMap::new(),
755                stats: CorrelationStats::default(),
756            })),
757            enrichment_service: Arc::new(RwLock::new(EnrichmentService {
758                sources: HashMap::new(),
759                cache: HashMap::new(),
760                stats: EnrichmentStats::default(),
761            })),
762            pattern_detector: Arc::new(RwLock::new(PatternDetector {
763                patterns: HashMap::new(),
764                algorithms: HashMap::new(),
765                stats: DetectionStats::default(),
766            })),
767            metrics: Arc::new(RwLock::new(CepMetrics {
768                last_update: Utc::now(),
769                ..Default::default()
770            })),
771            config,
772            last_gc: Arc::new(RwLock::new(Instant::now())),
773        })
774    }
775
776    /// Register an event pattern
777    pub async fn register_pattern(&mut self, name: &str, pattern: EventPattern) -> Result<()> {
778        let mut patterns = self.patterns.write().await;
779        patterns.insert(name.to_string(), pattern.clone());
780
781        // Initialize state machine for pattern
782        let mut state_machines = self.state_machines.write().await;
783        state_machines.insert(
784            name.to_string(),
785            StateMachine {
786                pattern,
787                state: State::Initial,
788                partial_matches: Vec::new(),
789                completed_matches: Vec::new(),
790                transition_count: 0,
791            },
792        );
793
794        info!("Registered CEP pattern: {}", name);
795        Ok(())
796    }
797
798    /// Register a processing rule
799    pub async fn register_rule(&mut self, rule: ProcessingRule) -> Result<()> {
800        let mut rule_engine = self.rule_engine.write().await;
801        rule_engine.rules.insert(rule.name.clone(), rule.clone());
802        info!("Registered CEP rule: {}", rule.name);
803        Ok(())
804    }
805
806    /// Process an event through the CEP engine
807    pub async fn process_event(&mut self, event: StreamEvent) -> Result<Vec<DetectedPattern>> {
808        let start_time = Instant::now();
809        let event_timestamp = Utc::now();
810
811        // Create timestamped event
812        let timestamped_event = TimestampedEvent {
813            event: event.clone(),
814            timestamp: event_timestamp,
815            id: Uuid::new_v4(),
816        };
817
818        // Add to event buffer
819        self.add_to_buffer("default", timestamped_event.clone())
820            .await?;
821
822        // Run garbage collection if needed
823        self.maybe_run_gc().await?;
824
825        // Detect patterns
826        let detected_patterns = self.detect_patterns(&timestamped_event).await?;
827
828        // Execute rules for detected patterns
829        let mut results = Vec::new();
830        for pattern_match in detected_patterns {
831            let triggered_rules = self.execute_rules(&pattern_match).await?;
832
833            // Correlate events if enabled
834            let correlations = if self.config.enable_correlation {
835                self.correlate_events(&pattern_match).await?
836            } else {
837                Vec::new()
838            };
839
840            // Enrich events if enabled
841            let enrichments = if self.config.enable_enrichment {
842                self.enrich_events(&pattern_match).await?
843            } else {
844                HashMap::new()
845            };
846
847            results.push(DetectedPattern {
848                pattern_match,
849                triggered_rules,
850                correlations,
851                enrichments,
852            });
853        }
854
855        // Update metrics
856        let processing_latency = start_time.elapsed();
857        self.update_metrics(processing_latency, results.len()).await;
858
859        Ok(results)
860    }
861
862    /// Add event to buffer
863    async fn add_to_buffer(&self, stream: &str, event: TimestampedEvent) -> Result<()> {
864        let mut buffers = self.event_buffers.write().await;
865        let buffer = buffers
866            .entry(stream.to_string())
867            .or_insert_with(|| EventBuffer {
868                stream_name: stream.to_string(),
869                events: VecDeque::new(),
870                max_size: self.config.event_buffer_size,
871                oldest_timestamp: None,
872                newest_timestamp: None,
873            });
874
875        // Update timestamps
876        if buffer.oldest_timestamp.is_none() {
877            buffer.oldest_timestamp = Some(event.timestamp);
878        }
879        buffer.newest_timestamp = Some(event.timestamp);
880
881        // Add event
882        buffer.events.push_back(event);
883
884        // Trim buffer if too large
885        while buffer.events.len() > buffer.max_size {
886            buffer.events.pop_front();
887            if let Some(first_event) = buffer.events.front() {
888                buffer.oldest_timestamp = Some(first_event.timestamp);
889            }
890        }
891
892        Ok(())
893    }
894
895    /// Detect patterns in recent events
896    async fn detect_patterns(&self, new_event: &TimestampedEvent) -> Result<Vec<CompleteMatch>> {
897        let mut detected = Vec::new();
898        let mut state_machines = self.state_machines.write().await;
899
900        for (pattern_name, state_machine) in state_machines.iter_mut() {
901            // Try to match pattern
902            if let Some(complete_match) = self.try_match_pattern(state_machine, new_event).await? {
903                detected.push(complete_match);
904                debug!("Pattern detected: {}", pattern_name);
905            }
906        }
907
908        Ok(detected)
909    }
910
911    /// Try to match a pattern
912    async fn try_match_pattern(
913        &self,
914        state_machine: &mut StateMachine,
915        event: &TimestampedEvent,
916    ) -> Result<Option<CompleteMatch>> {
917        // Clone pattern to avoid borrow issues
918        let pattern = state_machine.pattern.clone();
919
920        match &pattern {
921            EventPattern::Simple { predicates, .. } => {
922                if self.evaluate_predicates(predicates, &event.event).await? {
923                    Ok(Some(CompleteMatch {
924                        id: Uuid::new_v4(),
925                        pattern_name: pattern.name().to_string(),
926                        event_ids: vec![event.id],
927                        start_time: event.timestamp,
928                        end_time: event.timestamp,
929                        duration: Duration::from_secs(0),
930                        confidence: 1.0,
931                        metadata: HashMap::new(),
932                    }))
933                } else {
934                    Ok(None)
935                }
936            }
937            EventPattern::Sequence {
938                patterns,
939                time_window,
940                strict,
941                ..
942            } => {
943                self.match_sequence(state_machine, event, patterns, *time_window, *strict)
944                    .await
945            }
946            EventPattern::And {
947                patterns,
948                time_window,
949                ..
950            } => {
951                self.match_conjunction(state_machine, event, patterns, *time_window)
952                    .await
953            }
954            _ => {
955                // Other pattern types (to be implemented)
956                Ok(None)
957            }
958        }
959    }
960
961    /// Match sequence pattern
962    async fn match_sequence(
963        &self,
964        state_machine: &mut StateMachine,
965        event: &TimestampedEvent,
966        patterns: &[EventPattern],
967        time_window: Option<Duration>,
968        _strict: bool,
969    ) -> Result<Option<CompleteMatch>> {
970        // Update partial matches
971        let mut new_partial_matches = Vec::new();
972
973        for partial_match in &mut state_machine.partial_matches {
974            let next_stage = partial_match.stage;
975            if next_stage < patterns.len() {
976                if let EventPattern::Simple { predicates, .. } = &patterns[next_stage] {
977                    if self.evaluate_predicates(predicates, &event.event).await? {
978                        // Check time window
979                        if let Some(window) = time_window {
980                            let elapsed = event
981                                .timestamp
982                                .signed_duration_since(partial_match.start_time);
983                            if elapsed.num_seconds() > window.as_secs() as i64 {
984                                continue; // Expired
985                            }
986                        }
987
988                        // Advance match
989                        let mut new_match = partial_match.clone();
990                        new_match.events.push(event.clone());
991                        new_match.stage += 1;
992                        new_match.last_update = event.timestamp;
993
994                        if new_match.stage == patterns.len() {
995                            // Complete match!
996                            let event_ids: Vec<Uuid> =
997                                new_match.events.iter().map(|e| e.id).collect();
998                            let duration = event
999                                .timestamp
1000                                .signed_duration_since(new_match.start_time)
1001                                .to_std()
1002                                .unwrap_or(Duration::from_secs(0));
1003
1004                            return Ok(Some(CompleteMatch {
1005                                id: Uuid::new_v4(),
1006                                pattern_name: state_machine.pattern.name().to_string(),
1007                                event_ids,
1008                                start_time: new_match.start_time,
1009                                end_time: event.timestamp,
1010                                duration,
1011                                confidence: 1.0,
1012                                metadata: HashMap::new(),
1013                            }));
1014                        } else {
1015                            new_partial_matches.push(new_match);
1016                        }
1017                    }
1018                }
1019            }
1020        }
1021
1022        // Check if this event starts a new partial match
1023        if let EventPattern::Simple { predicates, .. } = &patterns[0] {
1024            if self.evaluate_predicates(predicates, &event.event).await? {
1025                new_partial_matches.push(PartialMatch {
1026                    id: Uuid::new_v4(),
1027                    events: vec![event.clone()],
1028                    stage: 1,
1029                    start_time: event.timestamp,
1030                    last_update: event.timestamp,
1031                    state: HashMap::new(),
1032                });
1033            }
1034        }
1035
1036        state_machine.partial_matches = new_partial_matches;
1037        Ok(None)
1038    }
1039
1040    /// Match conjunction pattern
1041    async fn match_conjunction(
1042        &self,
1043        _state_machine: &mut StateMachine,
1044        _event: &TimestampedEvent,
1045        _patterns: &[EventPattern],
1046        _time_window: Option<Duration>,
1047    ) -> Result<Option<CompleteMatch>> {
1048        // Simplified implementation
1049        Ok(None)
1050    }
1051
1052    /// Evaluate predicates against an event
1053    async fn evaluate_predicates(
1054        &self,
1055        predicates: &[FieldPredicate],
1056        event: &StreamEvent,
1057    ) -> Result<bool> {
1058        for predicate in predicates {
1059            match predicate {
1060                FieldPredicate::Equals { field, value } => {
1061                    // Extract field from event (simplified)
1062                    if field == "event_type" {
1063                        let event_type = match event {
1064                            StreamEvent::TripleAdded { .. } => "TripleAdded",
1065                            StreamEvent::TripleRemoved { .. } => "TripleRemoved",
1066                            StreamEvent::QuadAdded { .. } => "QuadAdded",
1067                            StreamEvent::QuadRemoved { .. } => "QuadRemoved",
1068                            StreamEvent::GraphCreated { .. } => "GraphCreated",
1069                            StreamEvent::GraphCleared { .. } => "GraphCleared",
1070                            StreamEvent::GraphDeleted { .. } => "GraphDeleted",
1071                            StreamEvent::SparqlUpdate { .. } => "SparqlUpdate",
1072                            StreamEvent::TransactionBegin { .. } => "TransactionBegin",
1073                            StreamEvent::TransactionCommit { .. } => "TransactionCommit",
1074                            StreamEvent::TransactionAbort { .. } => "TransactionAbort",
1075                            StreamEvent::SchemaChanged { .. } => "SchemaChanged",
1076                            StreamEvent::Heartbeat { .. } => "Heartbeat",
1077                            _ => "Other", // Catch-all for other event types
1078                        };
1079                        if event_type != value {
1080                            return Ok(false);
1081                        }
1082                    }
1083                }
1084                FieldPredicate::Contains { field, substring } => {
1085                    // Simplified implementation
1086                    if field == "source" {
1087                        let source = match event {
1088                            StreamEvent::Heartbeat { source, .. } => source,
1089                            _ => return Ok(false),
1090                        };
1091                        if !source.contains(substring) {
1092                            return Ok(false);
1093                        }
1094                    }
1095                }
1096                _ => {
1097                    // Other predicates (simplified)
1098                }
1099            }
1100        }
1101        Ok(true)
1102    }
1103
1104    /// Execute rules for a pattern match
1105    async fn execute_rules(&self, pattern_match: &CompleteMatch) -> Result<Vec<String>> {
1106        let mut triggered = Vec::new();
1107
1108        // Clone rules to avoid borrow issues
1109        let rules = {
1110            let rule_engine = self.rule_engine.read().await;
1111            rule_engine.rules.clone()
1112        };
1113
1114        for (rule_name, rule) in &rules {
1115            if !rule.enabled {
1116                continue;
1117            }
1118
1119            // Check if rule condition matches
1120            if self
1121                .evaluate_rule_condition(&rule.condition, pattern_match)
1122                .await?
1123            {
1124                // Execute actions
1125                for action in &rule.actions {
1126                    self.execute_rule_action(action).await?;
1127                }
1128                triggered.push(rule_name.clone());
1129
1130                // Update stats
1131                let mut rule_engine = self.rule_engine.write().await;
1132                rule_engine.stats.successful_executions += 1;
1133            }
1134        }
1135
1136        Ok(triggered)
1137    }
1138
1139    /// Evaluate rule condition
1140    async fn evaluate_rule_condition(
1141        &self,
1142        condition: &RuleCondition,
1143        pattern_match: &CompleteMatch,
1144    ) -> Result<bool> {
1145        match condition {
1146            RuleCondition::PatternMatched { pattern } => Ok(&pattern_match.pattern_name == pattern),
1147            _ => {
1148                // Other conditions (simplified)
1149                Ok(false)
1150            }
1151        }
1152    }
1153
1154    /// Execute rule action
1155    async fn execute_rule_action(&self, action: &RuleAction) -> Result<()> {
1156        match action {
1157            RuleAction::SendAlert { severity, message } => {
1158                info!("CEP Alert [{}]: {}", severity, message);
1159            }
1160            RuleAction::EmitEvent { event_type, data } => {
1161                debug!("CEP Emit Event: {} with data: {:?}", event_type, data);
1162            }
1163            _ => {
1164                // Other actions (simplified)
1165            }
1166        }
1167        Ok(())
1168    }
1169
1170    /// Correlate events
1171    async fn correlate_events(
1172        &self,
1173        _pattern_match: &CompleteMatch,
1174    ) -> Result<Vec<CorrelationResult>> {
1175        // Simplified implementation
1176        Ok(Vec::new())
1177    }
1178
1179    /// Enrich events
1180    async fn enrich_events(
1181        &self,
1182        _pattern_match: &CompleteMatch,
1183    ) -> Result<HashMap<String, EnrichmentData>> {
1184        // Simplified implementation
1185        Ok(HashMap::new())
1186    }
1187
1188    /// Run garbage collection if needed
1189    async fn maybe_run_gc(&self) -> Result<()> {
1190        let mut last_gc = self.last_gc.write().await;
1191        if last_gc.elapsed() >= self.config.gc_interval {
1192            self.run_gc().await?;
1193            *last_gc = Instant::now();
1194
1195            let mut metrics = self.metrics.write().await;
1196            metrics.gc_count += 1;
1197        }
1198        Ok(())
1199    }
1200
1201    /// Run garbage collection
1202    async fn run_gc(&self) -> Result<()> {
1203        let cutoff_time =
1204            Utc::now() - ChronoDuration::seconds(self.config.max_time_window.as_secs() as i64);
1205
1206        // Clean event buffers
1207        let mut buffers = self.event_buffers.write().await;
1208        for buffer in buffers.values_mut() {
1209            buffer.events.retain(|e| e.timestamp > cutoff_time);
1210            if let Some(first_event) = buffer.events.front() {
1211                buffer.oldest_timestamp = Some(first_event.timestamp);
1212            }
1213        }
1214
1215        // Clean partial matches
1216        let mut state_machines = self.state_machines.write().await;
1217        for state_machine in state_machines.values_mut() {
1218            state_machine
1219                .partial_matches
1220                .retain(|m| m.last_update > cutoff_time);
1221        }
1222
1223        debug!("CEP garbage collection completed");
1224        Ok(())
1225    }
1226
1227    /// Update metrics
1228    async fn update_metrics(&self, processing_latency: Duration, patterns_detected: usize) {
1229        let mut metrics = self.metrics.write().await;
1230        metrics.total_events_processed += 1;
1231        metrics.total_patterns_detected += patterns_detected as u64;
1232
1233        let now = Utc::now();
1234        let elapsed_duration = now.signed_duration_since(metrics.last_update);
1235        let elapsed_secs = elapsed_duration.num_seconds() as f64;
1236
1237        if elapsed_secs > 0.0 {
1238            metrics.events_per_second = metrics.total_events_processed as f64 / elapsed_secs;
1239            metrics.patterns_per_second = metrics.total_patterns_detected as f64 / elapsed_secs;
1240        }
1241
1242        // Update average latency
1243        let total_latency = metrics.avg_event_processing_latency.as_micros()
1244            * (metrics.total_events_processed - 1) as u128
1245            + processing_latency.as_micros();
1246        metrics.avg_event_processing_latency =
1247            Duration::from_micros((total_latency / metrics.total_events_processed as u128) as u64);
1248
1249        // Count active partial matches
1250        let state_machines = self.state_machines.read().await;
1251        metrics.active_partial_matches = state_machines
1252            .values()
1253            .map(|sm| sm.partial_matches.len())
1254            .sum();
1255    }
1256
1257    /// Get current metrics
1258    pub async fn get_metrics(&self) -> CepMetrics {
1259        self.metrics.read().await.clone()
1260    }
1261
1262    /// Get statistics
1263    pub async fn get_statistics(&self) -> CepStatistics {
1264        let metrics = self.metrics.read().await;
1265        let rule_engine = self.rule_engine.read().await;
1266        let correlator = self.correlator.read().await;
1267        let enrichment = self.enrichment_service.read().await;
1268        let detector = self.pattern_detector.read().await;
1269
1270        CepStatistics {
1271            metrics: metrics.clone(),
1272            rule_stats: rule_engine.stats.clone(),
1273            correlation_stats: correlator.stats.clone(),
1274            enrichment_stats: enrichment.stats.clone(),
1275            detection_stats: detector.stats.clone(),
1276        }
1277    }
1278}
1279
1280/// CEP statistics
1281#[derive(Debug, Clone, Serialize, Deserialize)]
1282pub struct CepStatistics {
1283    /// CEP metrics
1284    pub metrics: CepMetrics,
1285    /// Rule execution statistics
1286    pub rule_stats: RuleExecutionStats,
1287    /// Correlation statistics
1288    pub correlation_stats: CorrelationStats,
1289    /// Enrichment statistics
1290    pub enrichment_stats: EnrichmentStats,
1291    /// Detection statistics
1292    pub detection_stats: DetectionStats,
1293}
1294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// Build a `Heartbeat` event from the `"test"` source, stamped with the current time.
    fn heartbeat() -> StreamEvent {
        StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        }
    }

    #[tokio::test]
    async fn test_cep_engine_creation() {
        let config = CepConfig::default();
        let engine = CepEngine::new(config);
        assert!(engine.is_ok());
    }

    #[tokio::test]
    async fn test_pattern_registration() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();

        let pattern = EventPattern::simple("event_type", "test");
        let result = engine.register_pattern("test_pattern", pattern).await;
        assert!(result.is_ok());

        let patterns = engine.patterns.read().await;
        assert!(patterns.contains_key("test_pattern"));
    }

    #[tokio::test]
    async fn test_simple_pattern_matching() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();

        let pattern = EventPattern::simple("event_type", "Heartbeat");
        engine.register_pattern("heartbeat", pattern).await.unwrap();

        let detected = engine.process_event(heartbeat()).await.unwrap();
        assert!(!detected.is_empty());
    }

    #[tokio::test]
    async fn test_simple_pattern_non_matching() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();

        let pattern = EventPattern::simple("event_type", "TripleAdded");
        engine
            .register_pattern("triple_added", pattern)
            .await
            .unwrap();

        let detected = engine.process_event(heartbeat()).await.unwrap();
        assert!(detected.is_empty()); // A heartbeat is not a TripleAdded event
    }

    #[tokio::test]
    async fn test_sequence_pattern() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();

        let pattern = EventPattern::sequence(vec![
            EventPattern::simple("event_type", "Heartbeat"),
            EventPattern::simple("event_type", "Heartbeat"),
        ])
        .with_time_window(Duration::from_secs(10));

        engine
            .register_pattern("double_heartbeat", pattern)
            .await
            .unwrap();

        let detected1 = engine.process_event(heartbeat()).await.unwrap();
        assert!(detected1.is_empty()); // First event, no match yet

        let detected2 = engine.process_event(heartbeat()).await.unwrap();
        assert!(!detected2.is_empty()); // Second event, pattern matched
    }

    #[tokio::test]
    async fn test_rule_registration() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();

        let rule = ProcessingRule {
            name: "test_rule".to_string(),
            condition: RuleCondition::PatternMatched {
                pattern: "heartbeat".to_string(),
            },
            actions: vec![RuleAction::SendAlert {
                severity: "info".to_string(),
                message: "Heartbeat detected".to_string(),
            }],
            priority: 1,
            enabled: true,
        };

        let result = engine.register_rule(rule).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_event_buffer() {
        let config = CepConfig::default();
        let engine = CepEngine::new(config).unwrap();

        let timestamped = TimestampedEvent {
            event: heartbeat(),
            timestamp: Utc::now(),
            id: Uuid::new_v4(),
        };

        engine
            .add_to_buffer("test_stream", timestamped)
            .await
            .unwrap();

        let buffers = engine.event_buffers.read().await;
        assert!(buffers.contains_key("test_stream"));
        assert_eq!(buffers.get("test_stream").unwrap().events.len(), 1);
    }

    #[tokio::test]
    async fn test_predicate_evaluation() {
        let config = CepConfig::default();
        let engine = CepEngine::new(config).unwrap();
        let event = heartbeat();

        let matching_type = vec![FieldPredicate::Equals {
            field: "event_type".to_string(),
            value: "Heartbeat".to_string(),
        }];
        assert!(engine
            .evaluate_predicates(&matching_type, &event)
            .await
            .unwrap());

        let other_type = vec![FieldPredicate::Equals {
            field: "event_type".to_string(),
            value: "TripleAdded".to_string(),
        }];
        assert!(!engine
            .evaluate_predicates(&other_type, &event)
            .await
            .unwrap());

        let source_contains = vec![FieldPredicate::Contains {
            field: "source".to_string(),
            substring: "es".to_string(),
        }];
        assert!(engine
            .evaluate_predicates(&source_contains, &event)
            .await
            .unwrap());

        let source_missing = vec![FieldPredicate::Contains {
            field: "source".to_string(),
            substring: "xyz".to_string(),
        }];
        assert!(!engine
            .evaluate_predicates(&source_missing, &event)
            .await
            .unwrap());
    }

    #[tokio::test]
    async fn test_metrics_collection() {
        let config = CepConfig::default();
        let mut engine = CepEngine::new(config).unwrap();

        engine.process_event(heartbeat()).await.unwrap();

        let metrics = engine.get_metrics().await;
        assert_eq!(metrics.total_events_processed, 1);
    }

    #[tokio::test]
    async fn test_garbage_collection() {
        let config = CepConfig {
            gc_interval: Duration::from_millis(10),
            ..Default::default()
        };
        let engine = CepEngine::new(config).unwrap();

        // Buffer an event stamped two hours in the past
        // (assumed to be older than the default max_time_window).
        let old_event = TimestampedEvent {
            event: heartbeat(),
            timestamp: Utc::now() - ChronoDuration::hours(2),
            id: Uuid::new_v4(),
        };
        engine.add_to_buffer("test", old_event).await.unwrap();

        // Let the GC interval elapse, then go through the interval-gated entry point.
        tokio::time::sleep(Duration::from_millis(20)).await;
        engine.maybe_run_gc().await.unwrap();

        {
            let buffers = engine.event_buffers.read().await;
            assert!(buffers.get("test").unwrap().events.is_empty());
        }
        assert!(engine.get_metrics().await.gc_count >= 1);
    }

    #[tokio::test]
    async fn test_pattern_with_time_window() {
        let pattern = EventPattern::sequence(vec![
            EventPattern::simple("type", "A"),
            EventPattern::simple("type", "B"),
        ])
        .with_time_window(Duration::from_secs(5));

        match pattern {
            EventPattern::Sequence { time_window, .. } => {
                assert_eq!(time_window, Some(Duration::from_secs(5)));
            }
            _ => panic!("Expected sequence pattern"),
        }
    }

    #[tokio::test]
    async fn test_statistics() {
        let config = CepConfig::default();
        let engine = CepEngine::new(config).unwrap();

        let stats = engine.get_statistics().await;
        assert_eq!(stats.metrics.total_events_processed, 0);
    }
}