1use crate::event::StreamEvent;
50use anyhow::Result;
51use chrono::{DateTime, Duration as ChronoDuration, Utc};
52use serde::{Deserialize, Serialize};
53use std::collections::{HashMap, VecDeque};
54use std::sync::Arc;
55use std::time::{Duration, Instant};
56use tokio::sync::RwLock;
57use tracing::{debug, info};
58use uuid::Uuid;
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct CepConfig {
63 pub max_events_in_memory: usize,
65
66 pub max_time_window: Duration,
68
69 pub enable_correlation: bool,
71
72 pub enable_state_machines: bool,
74
75 pub enable_rules: bool,
77
78 pub enable_enrichment: bool,
80
81 pub max_pattern_depth: usize,
83
84 pub pattern_matching_timeout: Duration,
86
87 pub event_buffer_size: usize,
89
90 pub collect_metrics: bool,
92
93 pub gc_interval: Duration,
95
96 pub enable_distributed: bool,
98
99 pub num_partitions: usize,
101}
102
103impl Default for CepConfig {
104 fn default() -> Self {
105 Self {
106 max_events_in_memory: 100000,
107 max_time_window: Duration::from_secs(3600),
108 enable_correlation: true,
109 enable_state_machines: true,
110 enable_rules: true,
111 enable_enrichment: true,
112 max_pattern_depth: 10,
113 pattern_matching_timeout: Duration::from_millis(100),
114 event_buffer_size: 10000,
115 collect_metrics: true,
116 gc_interval: Duration::from_secs(60),
117 enable_distributed: false,
118 num_partitions: 8,
119 }
120 }
121}
122
123pub struct CepEngine {
125 patterns: Arc<RwLock<HashMap<String, EventPattern>>>,
127
128 event_buffers: Arc<RwLock<HashMap<String, EventBuffer>>>,
130
131 state_machines: Arc<RwLock<HashMap<String, StateMachine>>>,
133
134 rule_engine: Arc<RwLock<RuleEngine>>,
136
137 correlator: Arc<RwLock<EventCorrelator>>,
139
140 enrichment_service: Arc<RwLock<EnrichmentService>>,
142
143 pattern_detector: Arc<RwLock<PatternDetector>>,
145
146 metrics: Arc<RwLock<CepMetrics>>,
148
149 config: CepConfig,
151
152 last_gc: Arc<RwLock<Instant>>,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
158pub enum EventPattern {
159 Simple {
161 name: String,
163 predicates: Vec<FieldPredicate>,
165 },
166
167 Sequence {
169 name: String,
171 patterns: Vec<EventPattern>,
173 time_window: Option<Duration>,
175 strict: bool,
177 },
178
179 And {
181 name: String,
183 patterns: Vec<EventPattern>,
185 time_window: Option<Duration>,
187 },
188
189 Or {
191 name: String,
193 patterns: Vec<EventPattern>,
195 },
196
197 Not {
199 name: String,
201 pattern: Box<EventPattern>,
203 time_window: Duration,
205 },
206
207 Repeat {
209 name: String,
211 pattern: Box<EventPattern>,
213 min_count: usize,
215 max_count: Option<usize>,
217 time_window: Option<Duration>,
219 },
220
221 Temporal {
223 name: String,
225 first: Box<EventPattern>,
227 operator: TemporalOperator,
229 second: Box<EventPattern>,
231 tolerance: Option<Duration>,
233 },
234
235 Aggregation {
237 name: String,
239 pattern: Box<EventPattern>,
241 aggregation: CepAggregationFunction,
243 window: Duration,
245 threshold: f64,
247 },
248}
249
250impl EventPattern {
251 pub fn simple(field: &str, value: &str) -> Self {
253 EventPattern::Simple {
254 name: format!("{}={}", field, value),
255 predicates: vec![FieldPredicate::Equals {
256 field: field.to_string(),
257 value: value.to_string(),
258 }],
259 }
260 }
261
262 pub fn sequence(patterns: Vec<EventPattern>) -> Self {
264 EventPattern::Sequence {
265 name: "sequence".to_string(),
266 patterns,
267 time_window: None,
268 strict: false,
269 }
270 }
271
272 pub fn with_time_window(mut self, window: Duration) -> Self {
274 match &mut self {
275 EventPattern::Sequence { time_window, .. } => *time_window = Some(window),
276 EventPattern::And { time_window, .. } => *time_window = Some(window),
277 EventPattern::Repeat { time_window, .. } => *time_window = Some(window),
278 _ => {}
279 }
280 self
281 }
282
283 pub fn name(&self) -> &str {
285 match self {
286 EventPattern::Simple { name, .. } => name,
287 EventPattern::Sequence { name, .. } => name,
288 EventPattern::And { name, .. } => name,
289 EventPattern::Or { name, .. } => name,
290 EventPattern::Not { name, .. } => name,
291 EventPattern::Repeat { name, .. } => name,
292 EventPattern::Temporal { name, .. } => name,
293 EventPattern::Aggregation { name, .. } => name,
294 }
295 }
296}
297
298#[derive(Debug, Clone, Serialize, Deserialize)]
300pub enum FieldPredicate {
301 Equals { field: String, value: String },
303 NotEquals { field: String, value: String },
305 Contains { field: String, substring: String },
307 Regex { field: String, pattern: String },
309 GreaterThan { field: String, value: f64 },
311 LessThan { field: String, value: f64 },
313 InRange { field: String, min: f64, max: f64 },
315 Exists { field: String },
317 Custom { name: String },
319}
320
321#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
323pub enum TemporalOperator {
324 Before,
326 After,
328 Meets,
330 During,
332 Overlaps,
334 Starts,
336 Finishes,
338 Equals,
340}
341
342#[derive(Debug, Clone, Serialize, Deserialize)]
344pub enum CepAggregationFunction {
345 Count,
347 Sum { field: String },
349 Average { field: String },
351 Min { field: String },
353 Max { field: String },
355 StdDev { field: String },
357 Percentile { field: String, percentile: f64 },
359 Custom { name: String },
361}
362
363#[derive(Debug, Clone)]
365pub struct EventBuffer {
366 pub stream_name: String,
368 pub events: VecDeque<TimestampedEvent>,
370 pub max_size: usize,
372 pub oldest_timestamp: Option<DateTime<Utc>>,
374 pub newest_timestamp: Option<DateTime<Utc>>,
376}
377
378#[derive(Debug, Clone)]
380pub struct TimestampedEvent {
381 pub event: StreamEvent,
383 pub timestamp: DateTime<Utc>,
385 pub id: Uuid,
387}
388
389#[derive(Debug, Clone)]
391pub struct StateMachine {
392 pub pattern: EventPattern,
394 pub state: State,
396 pub partial_matches: Vec<PartialMatch>,
398 pub completed_matches: Vec<CompleteMatch>,
400 pub transition_count: usize,
402}
403
404#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
406pub enum State {
407 Initial,
409 Intermediate { stage: usize },
411 Final,
413 Error,
415}
416
417#[derive(Debug, Clone)]
419pub struct PartialMatch {
420 pub id: Uuid,
422 pub events: Vec<TimestampedEvent>,
424 pub stage: usize,
426 pub start_time: DateTime<Utc>,
428 pub last_update: DateTime<Utc>,
430 pub state: HashMap<String, String>,
432}
433
434#[derive(Debug, Clone, Serialize, Deserialize)]
436pub struct CompleteMatch {
437 pub id: Uuid,
439 pub pattern_name: String,
441 pub event_ids: Vec<Uuid>,
443 pub start_time: DateTime<Utc>,
445 pub end_time: DateTime<Utc>,
447 pub duration: Duration,
449 pub confidence: f64,
451 pub metadata: HashMap<String, String>,
453}
454
455#[derive(Debug, Clone)]
457pub struct RuleEngine {
458 pub rules: HashMap<String, ProcessingRule>,
460 pub stats: RuleExecutionStats,
462}
463
464#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct ProcessingRule {
467 pub name: String,
469 pub condition: RuleCondition,
471 pub actions: Vec<RuleAction>,
473 pub priority: i32,
475 pub enabled: bool,
477}
478
479#[derive(Debug, Clone, Serialize, Deserialize)]
481pub enum RuleCondition {
482 PatternMatched { pattern: String },
484 FieldCondition { predicate: FieldPredicate },
486 ThresholdExceeded { metric: String, threshold: f64 },
488 Complex {
490 operator: String,
491 conditions: Vec<RuleCondition>,
492 },
493}
494
495#[derive(Debug, Clone, Serialize, Deserialize)]
497pub enum RuleAction {
498 EmitEvent {
500 event_type: String,
501 data: HashMap<String, String>,
502 },
503 SendAlert { severity: String, message: String },
505 UpdateState { key: String, value: String },
507 Webhook { url: String, method: String },
509 Custom {
511 name: String,
512 params: HashMap<String, String>,
513 },
514}
515
516#[derive(Debug, Clone, Default, Serialize, Deserialize)]
518pub struct RuleExecutionStats {
519 pub total_executions: u64,
521 pub successful_executions: u64,
523 pub failed_executions: u64,
525 pub total_execution_time: Duration,
527 pub avg_execution_time: Duration,
529}
530
531#[derive(Debug, Clone)]
533pub struct EventCorrelator {
534 pub correlation_functions: HashMap<String, CorrelationFunction>,
536 pub correlation_cache: HashMap<CorrelationKey, CorrelationResult>,
538 pub stats: CorrelationStats,
540}
541
/// Description of one correlation function.
#[derive(Debug, Clone)]
pub struct CorrelationFunction {
    /// Name of the function.
    pub name: String,
    /// Time window within which events are considered.
    pub time_window: Duration,
    /// Event fields taken into account.
    pub fields: Vec<String>,
    /// Threshold applied to the correlation score.
    pub threshold: f64,
}
554
555#[derive(Debug, Clone, Hash, PartialEq, Eq)]
557pub struct CorrelationKey {
558 pub event1: Uuid,
560 pub event2: Uuid,
562 pub function: String,
564}
565
566#[derive(Debug, Clone, Serialize, Deserialize)]
568pub struct CorrelationResult {
569 pub score: f64,
571 pub correlated_fields: Vec<String>,
573 pub timestamp: DateTime<Utc>,
575}
576
577#[derive(Debug, Clone, Default, Serialize, Deserialize)]
579pub struct CorrelationStats {
580 pub total_correlations: u64,
582 pub cache_hits: u64,
584 pub cache_misses: u64,
586 pub avg_correlation_score: f64,
588}
589
590#[derive(Debug, Clone)]
592pub struct EnrichmentService {
593 pub sources: HashMap<String, EnrichmentSource>,
595 pub cache: HashMap<String, EnrichmentData>,
597 pub stats: EnrichmentStats,
599}
600
601#[derive(Debug, Clone, Serialize, Deserialize)]
603pub struct EnrichmentSource {
604 pub name: String,
606 pub source_type: EnrichmentSourceType,
608 pub key_field: String,
610 pub cache_ttl: Duration,
612}
613
614#[derive(Debug, Clone, Serialize, Deserialize)]
616pub enum EnrichmentSourceType {
617 ExternalApi { url: String, auth: Option<String> },
619 Database {
621 connection_string: String,
622 query: String,
623 },
624 InMemory {
626 data: HashMap<String, HashMap<String, String>>,
627 },
628 Custom { name: String },
630}
631
632#[derive(Debug, Clone, Serialize, Deserialize)]
634pub struct EnrichmentData {
635 pub fields: HashMap<String, String>,
637 pub source: String,
639 pub timestamp: DateTime<Utc>,
641 pub ttl: Duration,
643}
644
645#[derive(Debug, Clone, Default, Serialize, Deserialize)]
647pub struct EnrichmentStats {
648 pub total_enrichments: u64,
650 pub cache_hits: u64,
652 pub cache_misses: u64,
654 pub failed_enrichments: u64,
656}
657
658#[derive(Debug, Clone)]
660pub struct PatternDetector {
661 pub patterns: HashMap<String, EventPattern>,
663 pub algorithms: HashMap<String, DetectionAlgorithm>,
665 pub stats: DetectionStats,
667}
668
669#[derive(Debug, Clone, Serialize, Deserialize)]
671pub enum DetectionAlgorithm {
672 Sequential,
674 Automaton,
676 Tree,
678 Graph,
680 MachineLearning { model_name: String },
682}
683
684#[derive(Debug, Clone, Default, Serialize, Deserialize)]
686pub struct DetectionStats {
687 pub total_events_processed: u64,
689 pub patterns_detected: u64,
691 pub false_positives: u64,
693 pub false_negatives: u64,
695 pub avg_detection_latency: Duration,
697 pub total_detection_time: Duration,
699}
700
701#[derive(Debug, Clone, Default, Serialize, Deserialize)]
703pub struct CepMetrics {
704 pub total_events_processed: u64,
706 pub total_patterns_detected: u64,
708 pub events_per_second: f64,
710 pub patterns_per_second: f64,
712 pub avg_event_processing_latency: Duration,
714 pub avg_pattern_matching_latency: Duration,
716 pub memory_usage_bytes: usize,
718 pub active_partial_matches: usize,
720 pub completed_matches: usize,
722 pub gc_count: u64,
724 pub last_update: DateTime<Utc>,
726}
727
728#[derive(Debug, Clone, Serialize, Deserialize)]
730pub struct DetectedPattern {
731 pub pattern_match: CompleteMatch,
733 pub triggered_rules: Vec<String>,
735 pub correlations: Vec<CorrelationResult>,
737 pub enrichments: HashMap<String, EnrichmentData>,
739}
740
741impl CepEngine {
742 pub fn new(config: CepConfig) -> Result<Self> {
744 Ok(Self {
745 patterns: Arc::new(RwLock::new(HashMap::new())),
746 event_buffers: Arc::new(RwLock::new(HashMap::new())),
747 state_machines: Arc::new(RwLock::new(HashMap::new())),
748 rule_engine: Arc::new(RwLock::new(RuleEngine {
749 rules: HashMap::new(),
750 stats: RuleExecutionStats::default(),
751 })),
752 correlator: Arc::new(RwLock::new(EventCorrelator {
753 correlation_functions: HashMap::new(),
754 correlation_cache: HashMap::new(),
755 stats: CorrelationStats::default(),
756 })),
757 enrichment_service: Arc::new(RwLock::new(EnrichmentService {
758 sources: HashMap::new(),
759 cache: HashMap::new(),
760 stats: EnrichmentStats::default(),
761 })),
762 pattern_detector: Arc::new(RwLock::new(PatternDetector {
763 patterns: HashMap::new(),
764 algorithms: HashMap::new(),
765 stats: DetectionStats::default(),
766 })),
767 metrics: Arc::new(RwLock::new(CepMetrics {
768 last_update: Utc::now(),
769 ..Default::default()
770 })),
771 config,
772 last_gc: Arc::new(RwLock::new(Instant::now())),
773 })
774 }
775
776 pub async fn register_pattern(&mut self, name: &str, pattern: EventPattern) -> Result<()> {
778 let mut patterns = self.patterns.write().await;
779 patterns.insert(name.to_string(), pattern.clone());
780
781 let mut state_machines = self.state_machines.write().await;
783 state_machines.insert(
784 name.to_string(),
785 StateMachine {
786 pattern,
787 state: State::Initial,
788 partial_matches: Vec::new(),
789 completed_matches: Vec::new(),
790 transition_count: 0,
791 },
792 );
793
794 info!("Registered CEP pattern: {}", name);
795 Ok(())
796 }
797
798 pub async fn register_rule(&mut self, rule: ProcessingRule) -> Result<()> {
800 let mut rule_engine = self.rule_engine.write().await;
801 rule_engine.rules.insert(rule.name.clone(), rule.clone());
802 info!("Registered CEP rule: {}", rule.name);
803 Ok(())
804 }
805
806 pub async fn process_event(&mut self, event: StreamEvent) -> Result<Vec<DetectedPattern>> {
808 let start_time = Instant::now();
809 let event_timestamp = Utc::now();
810
811 let timestamped_event = TimestampedEvent {
813 event: event.clone(),
814 timestamp: event_timestamp,
815 id: Uuid::new_v4(),
816 };
817
818 self.add_to_buffer("default", timestamped_event.clone())
820 .await?;
821
822 self.maybe_run_gc().await?;
824
825 let detected_patterns = self.detect_patterns(×tamped_event).await?;
827
828 let mut results = Vec::new();
830 for pattern_match in detected_patterns {
831 let triggered_rules = self.execute_rules(&pattern_match).await?;
832
833 let correlations = if self.config.enable_correlation {
835 self.correlate_events(&pattern_match).await?
836 } else {
837 Vec::new()
838 };
839
840 let enrichments = if self.config.enable_enrichment {
842 self.enrich_events(&pattern_match).await?
843 } else {
844 HashMap::new()
845 };
846
847 results.push(DetectedPattern {
848 pattern_match,
849 triggered_rules,
850 correlations,
851 enrichments,
852 });
853 }
854
855 let processing_latency = start_time.elapsed();
857 self.update_metrics(processing_latency, results.len()).await;
858
859 Ok(results)
860 }
861
862 async fn add_to_buffer(&self, stream: &str, event: TimestampedEvent) -> Result<()> {
864 let mut buffers = self.event_buffers.write().await;
865 let buffer = buffers
866 .entry(stream.to_string())
867 .or_insert_with(|| EventBuffer {
868 stream_name: stream.to_string(),
869 events: VecDeque::new(),
870 max_size: self.config.event_buffer_size,
871 oldest_timestamp: None,
872 newest_timestamp: None,
873 });
874
875 if buffer.oldest_timestamp.is_none() {
877 buffer.oldest_timestamp = Some(event.timestamp);
878 }
879 buffer.newest_timestamp = Some(event.timestamp);
880
881 buffer.events.push_back(event);
883
884 while buffer.events.len() > buffer.max_size {
886 buffer.events.pop_front();
887 if let Some(first_event) = buffer.events.front() {
888 buffer.oldest_timestamp = Some(first_event.timestamp);
889 }
890 }
891
892 Ok(())
893 }
894
895 async fn detect_patterns(&self, new_event: &TimestampedEvent) -> Result<Vec<CompleteMatch>> {
897 let mut detected = Vec::new();
898 let mut state_machines = self.state_machines.write().await;
899
900 for (pattern_name, state_machine) in state_machines.iter_mut() {
901 if let Some(complete_match) = self.try_match_pattern(state_machine, new_event).await? {
903 detected.push(complete_match);
904 debug!("Pattern detected: {}", pattern_name);
905 }
906 }
907
908 Ok(detected)
909 }
910
911 async fn try_match_pattern(
913 &self,
914 state_machine: &mut StateMachine,
915 event: &TimestampedEvent,
916 ) -> Result<Option<CompleteMatch>> {
917 let pattern = state_machine.pattern.clone();
919
920 match &pattern {
921 EventPattern::Simple { predicates, .. } => {
922 if self.evaluate_predicates(predicates, &event.event).await? {
923 Ok(Some(CompleteMatch {
924 id: Uuid::new_v4(),
925 pattern_name: pattern.name().to_string(),
926 event_ids: vec![event.id],
927 start_time: event.timestamp,
928 end_time: event.timestamp,
929 duration: Duration::from_secs(0),
930 confidence: 1.0,
931 metadata: HashMap::new(),
932 }))
933 } else {
934 Ok(None)
935 }
936 }
937 EventPattern::Sequence {
938 patterns,
939 time_window,
940 strict,
941 ..
942 } => {
943 self.match_sequence(state_machine, event, patterns, *time_window, *strict)
944 .await
945 }
946 EventPattern::And {
947 patterns,
948 time_window,
949 ..
950 } => {
951 self.match_conjunction(state_machine, event, patterns, *time_window)
952 .await
953 }
954 _ => {
955 Ok(None)
957 }
958 }
959 }
960
961 async fn match_sequence(
963 &self,
964 state_machine: &mut StateMachine,
965 event: &TimestampedEvent,
966 patterns: &[EventPattern],
967 time_window: Option<Duration>,
968 _strict: bool,
969 ) -> Result<Option<CompleteMatch>> {
970 let mut new_partial_matches = Vec::new();
972
973 for partial_match in &mut state_machine.partial_matches {
974 let next_stage = partial_match.stage;
975 if next_stage < patterns.len() {
976 if let EventPattern::Simple { predicates, .. } = &patterns[next_stage] {
977 if self.evaluate_predicates(predicates, &event.event).await? {
978 if let Some(window) = time_window {
980 let elapsed = event
981 .timestamp
982 .signed_duration_since(partial_match.start_time);
983 if elapsed.num_seconds() > window.as_secs() as i64 {
984 continue; }
986 }
987
988 let mut new_match = partial_match.clone();
990 new_match.events.push(event.clone());
991 new_match.stage += 1;
992 new_match.last_update = event.timestamp;
993
994 if new_match.stage == patterns.len() {
995 let event_ids: Vec<Uuid> =
997 new_match.events.iter().map(|e| e.id).collect();
998 let duration = event
999 .timestamp
1000 .signed_duration_since(new_match.start_time)
1001 .to_std()
1002 .unwrap_or(Duration::from_secs(0));
1003
1004 return Ok(Some(CompleteMatch {
1005 id: Uuid::new_v4(),
1006 pattern_name: state_machine.pattern.name().to_string(),
1007 event_ids,
1008 start_time: new_match.start_time,
1009 end_time: event.timestamp,
1010 duration,
1011 confidence: 1.0,
1012 metadata: HashMap::new(),
1013 }));
1014 } else {
1015 new_partial_matches.push(new_match);
1016 }
1017 }
1018 }
1019 }
1020 }
1021
1022 if let EventPattern::Simple { predicates, .. } = &patterns[0] {
1024 if self.evaluate_predicates(predicates, &event.event).await? {
1025 new_partial_matches.push(PartialMatch {
1026 id: Uuid::new_v4(),
1027 events: vec![event.clone()],
1028 stage: 1,
1029 start_time: event.timestamp,
1030 last_update: event.timestamp,
1031 state: HashMap::new(),
1032 });
1033 }
1034 }
1035
1036 state_machine.partial_matches = new_partial_matches;
1037 Ok(None)
1038 }
1039
1040 async fn match_conjunction(
1042 &self,
1043 _state_machine: &mut StateMachine,
1044 _event: &TimestampedEvent,
1045 _patterns: &[EventPattern],
1046 _time_window: Option<Duration>,
1047 ) -> Result<Option<CompleteMatch>> {
1048 Ok(None)
1050 }
1051
1052 async fn evaluate_predicates(
1054 &self,
1055 predicates: &[FieldPredicate],
1056 event: &StreamEvent,
1057 ) -> Result<bool> {
1058 for predicate in predicates {
1059 match predicate {
1060 FieldPredicate::Equals { field, value } if field == "event_type" => {
1061 let event_type = match event {
1063 StreamEvent::TripleAdded { .. } => "TripleAdded",
1064 StreamEvent::TripleRemoved { .. } => "TripleRemoved",
1065 StreamEvent::QuadAdded { .. } => "QuadAdded",
1066 StreamEvent::QuadRemoved { .. } => "QuadRemoved",
1067 StreamEvent::GraphCreated { .. } => "GraphCreated",
1068 StreamEvent::GraphCleared { .. } => "GraphCleared",
1069 StreamEvent::GraphDeleted { .. } => "GraphDeleted",
1070 StreamEvent::SparqlUpdate { .. } => "SparqlUpdate",
1071 StreamEvent::TransactionBegin { .. } => "TransactionBegin",
1072 StreamEvent::TransactionCommit { .. } => "TransactionCommit",
1073 StreamEvent::TransactionAbort { .. } => "TransactionAbort",
1074 StreamEvent::SchemaChanged { .. } => "SchemaChanged",
1075 StreamEvent::Heartbeat { .. } => "Heartbeat",
1076 _ => "Other", };
1078 if event_type != value {
1079 return Ok(false);
1080 }
1081 }
1082 FieldPredicate::Contains { field, substring } if field == "source" => {
1083 let source = match event {
1085 StreamEvent::Heartbeat { source, .. } => source,
1086 _ => return Ok(false),
1087 };
1088 if !source.contains(substring) {
1089 return Ok(false);
1090 }
1091 }
1092 _ => {
1093 }
1095 }
1096 }
1097 Ok(true)
1098 }
1099
1100 async fn execute_rules(&self, pattern_match: &CompleteMatch) -> Result<Vec<String>> {
1102 let mut triggered = Vec::new();
1103
1104 let rules = {
1106 let rule_engine = self.rule_engine.read().await;
1107 rule_engine.rules.clone()
1108 };
1109
1110 for (rule_name, rule) in &rules {
1111 if !rule.enabled {
1112 continue;
1113 }
1114
1115 if self
1117 .evaluate_rule_condition(&rule.condition, pattern_match)
1118 .await?
1119 {
1120 for action in &rule.actions {
1122 self.execute_rule_action(action).await?;
1123 }
1124 triggered.push(rule_name.clone());
1125
1126 let mut rule_engine = self.rule_engine.write().await;
1128 rule_engine.stats.successful_executions += 1;
1129 }
1130 }
1131
1132 Ok(triggered)
1133 }
1134
1135 async fn evaluate_rule_condition(
1137 &self,
1138 condition: &RuleCondition,
1139 pattern_match: &CompleteMatch,
1140 ) -> Result<bool> {
1141 match condition {
1142 RuleCondition::PatternMatched { pattern } => Ok(&pattern_match.pattern_name == pattern),
1143 _ => {
1144 Ok(false)
1146 }
1147 }
1148 }
1149
1150 async fn execute_rule_action(&self, action: &RuleAction) -> Result<()> {
1152 match action {
1153 RuleAction::SendAlert { severity, message } => {
1154 info!("CEP Alert [{}]: {}", severity, message);
1155 }
1156 RuleAction::EmitEvent { event_type, data } => {
1157 debug!("CEP Emit Event: {} with data: {:?}", event_type, data);
1158 }
1159 _ => {
1160 }
1162 }
1163 Ok(())
1164 }
1165
1166 async fn correlate_events(
1168 &self,
1169 _pattern_match: &CompleteMatch,
1170 ) -> Result<Vec<CorrelationResult>> {
1171 Ok(Vec::new())
1173 }
1174
1175 async fn enrich_events(
1177 &self,
1178 _pattern_match: &CompleteMatch,
1179 ) -> Result<HashMap<String, EnrichmentData>> {
1180 Ok(HashMap::new())
1182 }
1183
1184 async fn maybe_run_gc(&self) -> Result<()> {
1186 let mut last_gc = self.last_gc.write().await;
1187 if last_gc.elapsed() >= self.config.gc_interval {
1188 self.run_gc().await?;
1189 *last_gc = Instant::now();
1190
1191 let mut metrics = self.metrics.write().await;
1192 metrics.gc_count += 1;
1193 }
1194 Ok(())
1195 }
1196
1197 async fn run_gc(&self) -> Result<()> {
1199 let cutoff_time =
1200 Utc::now() - ChronoDuration::seconds(self.config.max_time_window.as_secs() as i64);
1201
1202 let mut buffers = self.event_buffers.write().await;
1204 for buffer in buffers.values_mut() {
1205 buffer.events.retain(|e| e.timestamp > cutoff_time);
1206 if let Some(first_event) = buffer.events.front() {
1207 buffer.oldest_timestamp = Some(first_event.timestamp);
1208 }
1209 }
1210
1211 let mut state_machines = self.state_machines.write().await;
1213 for state_machine in state_machines.values_mut() {
1214 state_machine
1215 .partial_matches
1216 .retain(|m| m.last_update > cutoff_time);
1217 }
1218
1219 debug!("CEP garbage collection completed");
1220 Ok(())
1221 }
1222
1223 async fn update_metrics(&self, processing_latency: Duration, patterns_detected: usize) {
1225 let mut metrics = self.metrics.write().await;
1226 metrics.total_events_processed += 1;
1227 metrics.total_patterns_detected += patterns_detected as u64;
1228
1229 let now = Utc::now();
1230 let elapsed_duration = now.signed_duration_since(metrics.last_update);
1231 let elapsed_secs = elapsed_duration.num_seconds() as f64;
1232
1233 if elapsed_secs > 0.0 {
1234 metrics.events_per_second = metrics.total_events_processed as f64 / elapsed_secs;
1235 metrics.patterns_per_second = metrics.total_patterns_detected as f64 / elapsed_secs;
1236 }
1237
1238 let total_latency = metrics.avg_event_processing_latency.as_micros()
1240 * (metrics.total_events_processed - 1) as u128
1241 + processing_latency.as_micros();
1242 metrics.avg_event_processing_latency =
1243 Duration::from_micros((total_latency / metrics.total_events_processed as u128) as u64);
1244
1245 let state_machines = self.state_machines.read().await;
1247 metrics.active_partial_matches = state_machines
1248 .values()
1249 .map(|sm| sm.partial_matches.len())
1250 .sum();
1251 }
1252
1253 pub async fn get_metrics(&self) -> CepMetrics {
1255 self.metrics.read().await.clone()
1256 }
1257
1258 pub async fn get_statistics(&self) -> CepStatistics {
1260 let metrics = self.metrics.read().await;
1261 let rule_engine = self.rule_engine.read().await;
1262 let correlator = self.correlator.read().await;
1263 let enrichment = self.enrichment_service.read().await;
1264 let detector = self.pattern_detector.read().await;
1265
1266 CepStatistics {
1267 metrics: metrics.clone(),
1268 rule_stats: rule_engine.stats.clone(),
1269 correlation_stats: correlator.stats.clone(),
1270 enrichment_stats: enrichment.stats.clone(),
1271 detection_stats: detector.stats.clone(),
1272 }
1273 }
1274}
1275
1276#[derive(Debug, Clone, Serialize, Deserialize)]
1278pub struct CepStatistics {
1279 pub metrics: CepMetrics,
1281 pub rule_stats: RuleExecutionStats,
1283 pub correlation_stats: CorrelationStats,
1285 pub enrichment_stats: EnrichmentStats,
1287 pub detection_stats: DetectionStats,
1289}
1290
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// Builds the heartbeat event shared by the tests.
    fn heartbeat() -> StreamEvent {
        StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: EventMetadata::default(),
        }
    }

    /// Builds an engine with the default configuration.
    fn default_engine() -> CepEngine {
        CepEngine::new(CepConfig::default()).unwrap()
    }

    /// Creating an engine with the default configuration succeeds.
    #[tokio::test]
    async fn test_cep_engine_creation() {
        let engine = CepEngine::new(CepConfig::default());
        assert!(engine.is_ok());
    }

    /// A registered pattern is stored under its name.
    #[tokio::test]
    async fn test_pattern_registration() {
        let mut engine = default_engine();

        let outcome = engine
            .register_pattern("test_pattern", EventPattern::simple("event_type", "test"))
            .await;
        assert!(outcome.is_ok());

        let registered = engine.patterns.read().await;
        assert!(registered.contains_key("test_pattern"));
    }

    /// A simple event-type pattern fires on a matching event.
    #[tokio::test]
    async fn test_simple_pattern_matching() {
        let mut engine = default_engine();
        engine
            .register_pattern("heartbeat", EventPattern::simple("event_type", "Heartbeat"))
            .await
            .unwrap();

        let detected = engine.process_event(heartbeat()).await.unwrap();
        assert!(!detected.is_empty());
    }

    /// A two-stage sequence fires on the second matching event, not the first.
    #[tokio::test]
    async fn test_sequence_pattern() {
        let mut engine = default_engine();

        let stages = vec![
            EventPattern::simple("event_type", "Heartbeat"),
            EventPattern::simple("event_type", "Heartbeat"),
        ];
        let pattern = EventPattern::sequence(stages).with_time_window(Duration::from_secs(10));
        engine
            .register_pattern("double_heartbeat", pattern)
            .await
            .unwrap();

        let after_first = engine.process_event(heartbeat()).await.unwrap();
        assert!(after_first.is_empty());

        let after_second = engine.process_event(heartbeat()).await.unwrap();
        assert!(!after_second.is_empty());
    }

    /// Registering a rule succeeds.
    #[tokio::test]
    async fn test_rule_registration() {
        let mut engine = default_engine();

        let alert = RuleAction::SendAlert {
            severity: "info".to_string(),
            message: "Heartbeat detected".to_string(),
        };
        let rule = ProcessingRule {
            name: "test_rule".to_string(),
            condition: RuleCondition::PatternMatched {
                pattern: "heartbeat".to_string(),
            },
            actions: vec![alert],
            priority: 1,
            enabled: true,
        };

        assert!(engine.register_rule(rule).await.is_ok());
    }

    /// Buffering an event creates the stream buffer and stores the event.
    #[tokio::test]
    async fn test_event_buffer() {
        let engine = default_engine();

        let timestamped = TimestampedEvent {
            event: heartbeat(),
            timestamp: Utc::now(),
            id: Uuid::new_v4(),
        };
        engine
            .add_to_buffer("test_stream", timestamped)
            .await
            .unwrap();

        let buffers = engine.event_buffers.read().await;
        assert!(buffers.contains_key("test_stream"));
        assert_eq!(buffers.get("test_stream").unwrap().events.len(), 1);
    }

    /// An event-type equality predicate holds for a matching event.
    #[tokio::test]
    async fn test_predicate_evaluation() {
        let engine = default_engine();

        let predicates = vec![FieldPredicate::Equals {
            field: "event_type".to_string(),
            value: "Heartbeat".to_string(),
        }];
        let event = heartbeat();

        let holds = engine
            .evaluate_predicates(&predicates, &event)
            .await
            .unwrap();
        assert!(holds);
    }

    /// Processing one event is reflected in the metrics.
    #[tokio::test]
    async fn test_metrics_collection() {
        let mut engine = default_engine();

        engine.process_event(heartbeat()).await.unwrap();

        let metrics = engine.get_metrics().await;
        assert_eq!(metrics.total_events_processed, 1);
    }

    /// Garbage collection removes events older than the retention window.
    #[tokio::test]
    async fn test_garbage_collection() {
        let config = CepConfig {
            gc_interval: Duration::from_millis(10),
            ..Default::default()
        };
        let engine = CepEngine::new(config).unwrap();

        // Two hours old, i.e. past the default one-hour retention window.
        let stale = TimestampedEvent {
            event: heartbeat(),
            timestamp: Utc::now() - ChronoDuration::hours(2),
            id: Uuid::new_v4(),
        };
        engine.add_to_buffer("test", stale).await.unwrap();

        tokio::time::sleep(Duration::from_millis(20)).await;

        engine.run_gc().await.unwrap();

        let buffers = engine.event_buffers.read().await;
        assert!(buffers.get("test").unwrap().events.is_empty());
    }

    /// `with_time_window` stores the window on a sequence pattern.
    #[tokio::test]
    async fn test_pattern_with_time_window() {
        let stages = vec![
            EventPattern::simple("type", "A"),
            EventPattern::simple("type", "B"),
        ];
        let pattern = EventPattern::sequence(stages).with_time_window(Duration::from_secs(5));

        if let EventPattern::Sequence { time_window, .. } = pattern {
            assert_eq!(time_window, Some(Duration::from_secs(5)));
        } else {
            panic!("Expected sequence pattern");
        }
    }

    /// A fresh engine reports zero processed events.
    #[tokio::test]
    async fn test_statistics() {
        let engine = default_engine();

        let stats = engine.get_statistics().await;
        assert_eq!(stats.metrics.total_events_processed, 0);
    }
}