1use crate::event::StreamEvent;
50use anyhow::Result;
51use chrono::{DateTime, Duration as ChronoDuration, Utc};
52use serde::{Deserialize, Serialize};
53use std::collections::{HashMap, VecDeque};
54use std::sync::Arc;
55use std::time::{Duration, Instant};
56use tokio::sync::RwLock;
57use tracing::{debug, info};
58use uuid::Uuid;
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct CepConfig {
63 pub max_events_in_memory: usize,
65
66 pub max_time_window: Duration,
68
69 pub enable_correlation: bool,
71
72 pub enable_state_machines: bool,
74
75 pub enable_rules: bool,
77
78 pub enable_enrichment: bool,
80
81 pub max_pattern_depth: usize,
83
84 pub pattern_matching_timeout: Duration,
86
87 pub event_buffer_size: usize,
89
90 pub collect_metrics: bool,
92
93 pub gc_interval: Duration,
95
96 pub enable_distributed: bool,
98
99 pub num_partitions: usize,
101}
102
103impl Default for CepConfig {
104 fn default() -> Self {
105 Self {
106 max_events_in_memory: 100000,
107 max_time_window: Duration::from_secs(3600),
108 enable_correlation: true,
109 enable_state_machines: true,
110 enable_rules: true,
111 enable_enrichment: true,
112 max_pattern_depth: 10,
113 pattern_matching_timeout: Duration::from_millis(100),
114 event_buffer_size: 10000,
115 collect_metrics: true,
116 gc_interval: Duration::from_secs(60),
117 enable_distributed: false,
118 num_partitions: 8,
119 }
120 }
121}
122
123pub struct CepEngine {
125 patterns: Arc<RwLock<HashMap<String, EventPattern>>>,
127
128 event_buffers: Arc<RwLock<HashMap<String, EventBuffer>>>,
130
131 state_machines: Arc<RwLock<HashMap<String, StateMachine>>>,
133
134 rule_engine: Arc<RwLock<RuleEngine>>,
136
137 correlator: Arc<RwLock<EventCorrelator>>,
139
140 enrichment_service: Arc<RwLock<EnrichmentService>>,
142
143 pattern_detector: Arc<RwLock<PatternDetector>>,
145
146 metrics: Arc<RwLock<CepMetrics>>,
148
149 config: CepConfig,
151
152 last_gc: Arc<RwLock<Instant>>,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
158pub enum EventPattern {
159 Simple {
161 name: String,
163 predicates: Vec<FieldPredicate>,
165 },
166
167 Sequence {
169 name: String,
171 patterns: Vec<EventPattern>,
173 time_window: Option<Duration>,
175 strict: bool,
177 },
178
179 And {
181 name: String,
183 patterns: Vec<EventPattern>,
185 time_window: Option<Duration>,
187 },
188
189 Or {
191 name: String,
193 patterns: Vec<EventPattern>,
195 },
196
197 Not {
199 name: String,
201 pattern: Box<EventPattern>,
203 time_window: Duration,
205 },
206
207 Repeat {
209 name: String,
211 pattern: Box<EventPattern>,
213 min_count: usize,
215 max_count: Option<usize>,
217 time_window: Option<Duration>,
219 },
220
221 Temporal {
223 name: String,
225 first: Box<EventPattern>,
227 operator: TemporalOperator,
229 second: Box<EventPattern>,
231 tolerance: Option<Duration>,
233 },
234
235 Aggregation {
237 name: String,
239 pattern: Box<EventPattern>,
241 aggregation: CepAggregationFunction,
243 window: Duration,
245 threshold: f64,
247 },
248}
249
250impl EventPattern {
251 pub fn simple(field: &str, value: &str) -> Self {
253 EventPattern::Simple {
254 name: format!("{}={}", field, value),
255 predicates: vec![FieldPredicate::Equals {
256 field: field.to_string(),
257 value: value.to_string(),
258 }],
259 }
260 }
261
262 pub fn sequence(patterns: Vec<EventPattern>) -> Self {
264 EventPattern::Sequence {
265 name: "sequence".to_string(),
266 patterns,
267 time_window: None,
268 strict: false,
269 }
270 }
271
272 pub fn with_time_window(mut self, window: Duration) -> Self {
274 match &mut self {
275 EventPattern::Sequence { time_window, .. } => *time_window = Some(window),
276 EventPattern::And { time_window, .. } => *time_window = Some(window),
277 EventPattern::Repeat { time_window, .. } => *time_window = Some(window),
278 _ => {}
279 }
280 self
281 }
282
283 pub fn name(&self) -> &str {
285 match self {
286 EventPattern::Simple { name, .. } => name,
287 EventPattern::Sequence { name, .. } => name,
288 EventPattern::And { name, .. } => name,
289 EventPattern::Or { name, .. } => name,
290 EventPattern::Not { name, .. } => name,
291 EventPattern::Repeat { name, .. } => name,
292 EventPattern::Temporal { name, .. } => name,
293 EventPattern::Aggregation { name, .. } => name,
294 }
295 }
296}
297
298#[derive(Debug, Clone, Serialize, Deserialize)]
300pub enum FieldPredicate {
301 Equals { field: String, value: String },
303 NotEquals { field: String, value: String },
305 Contains { field: String, substring: String },
307 Regex { field: String, pattern: String },
309 GreaterThan { field: String, value: f64 },
311 LessThan { field: String, value: f64 },
313 InRange { field: String, min: f64, max: f64 },
315 Exists { field: String },
317 Custom { name: String },
319}
320
321#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
323pub enum TemporalOperator {
324 Before,
326 After,
328 Meets,
330 During,
332 Overlaps,
334 Starts,
336 Finishes,
338 Equals,
340}
341
342#[derive(Debug, Clone, Serialize, Deserialize)]
344pub enum CepAggregationFunction {
345 Count,
347 Sum { field: String },
349 Average { field: String },
351 Min { field: String },
353 Max { field: String },
355 StdDev { field: String },
357 Percentile { field: String, percentile: f64 },
359 Custom { name: String },
361}
362
363#[derive(Debug, Clone)]
365pub struct EventBuffer {
366 pub stream_name: String,
368 pub events: VecDeque<TimestampedEvent>,
370 pub max_size: usize,
372 pub oldest_timestamp: Option<DateTime<Utc>>,
374 pub newest_timestamp: Option<DateTime<Utc>>,
376}
377
378#[derive(Debug, Clone)]
380pub struct TimestampedEvent {
381 pub event: StreamEvent,
383 pub timestamp: DateTime<Utc>,
385 pub id: Uuid,
387}
388
389#[derive(Debug, Clone)]
391pub struct StateMachine {
392 pub pattern: EventPattern,
394 pub state: State,
396 pub partial_matches: Vec<PartialMatch>,
398 pub completed_matches: Vec<CompleteMatch>,
400 pub transition_count: usize,
402}
403
404#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
406pub enum State {
407 Initial,
409 Intermediate { stage: usize },
411 Final,
413 Error,
415}
416
417#[derive(Debug, Clone)]
419pub struct PartialMatch {
420 pub id: Uuid,
422 pub events: Vec<TimestampedEvent>,
424 pub stage: usize,
426 pub start_time: DateTime<Utc>,
428 pub last_update: DateTime<Utc>,
430 pub state: HashMap<String, String>,
432}
433
434#[derive(Debug, Clone, Serialize, Deserialize)]
436pub struct CompleteMatch {
437 pub id: Uuid,
439 pub pattern_name: String,
441 pub event_ids: Vec<Uuid>,
443 pub start_time: DateTime<Utc>,
445 pub end_time: DateTime<Utc>,
447 pub duration: Duration,
449 pub confidence: f64,
451 pub metadata: HashMap<String, String>,
453}
454
455#[derive(Debug, Clone)]
457pub struct RuleEngine {
458 pub rules: HashMap<String, ProcessingRule>,
460 pub stats: RuleExecutionStats,
462}
463
464#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct ProcessingRule {
467 pub name: String,
469 pub condition: RuleCondition,
471 pub actions: Vec<RuleAction>,
473 pub priority: i32,
475 pub enabled: bool,
477}
478
479#[derive(Debug, Clone, Serialize, Deserialize)]
481pub enum RuleCondition {
482 PatternMatched { pattern: String },
484 FieldCondition { predicate: FieldPredicate },
486 ThresholdExceeded { metric: String, threshold: f64 },
488 Complex {
490 operator: String,
491 conditions: Vec<RuleCondition>,
492 },
493}
494
495#[derive(Debug, Clone, Serialize, Deserialize)]
497pub enum RuleAction {
498 EmitEvent {
500 event_type: String,
501 data: HashMap<String, String>,
502 },
503 SendAlert { severity: String, message: String },
505 UpdateState { key: String, value: String },
507 Webhook { url: String, method: String },
509 Custom {
511 name: String,
512 params: HashMap<String, String>,
513 },
514}
515
516#[derive(Debug, Clone, Default, Serialize, Deserialize)]
518pub struct RuleExecutionStats {
519 pub total_executions: u64,
521 pub successful_executions: u64,
523 pub failed_executions: u64,
525 pub total_execution_time: Duration,
527 pub avg_execution_time: Duration,
529}
530
531#[derive(Debug, Clone)]
533pub struct EventCorrelator {
534 pub correlation_functions: HashMap<String, CorrelationFunction>,
536 pub correlation_cache: HashMap<CorrelationKey, CorrelationResult>,
538 pub stats: CorrelationStats,
540}
541
542#[derive(Debug, Clone)]
544pub struct CorrelationFunction {
545 pub name: String,
547 pub time_window: Duration,
549 pub fields: Vec<String>,
551 pub threshold: f64,
553}
554
555#[derive(Debug, Clone, Hash, PartialEq, Eq)]
557pub struct CorrelationKey {
558 pub event1: Uuid,
560 pub event2: Uuid,
562 pub function: String,
564}
565
566#[derive(Debug, Clone, Serialize, Deserialize)]
568pub struct CorrelationResult {
569 pub score: f64,
571 pub correlated_fields: Vec<String>,
573 pub timestamp: DateTime<Utc>,
575}
576
577#[derive(Debug, Clone, Default, Serialize, Deserialize)]
579pub struct CorrelationStats {
580 pub total_correlations: u64,
582 pub cache_hits: u64,
584 pub cache_misses: u64,
586 pub avg_correlation_score: f64,
588}
589
590#[derive(Debug, Clone)]
592pub struct EnrichmentService {
593 pub sources: HashMap<String, EnrichmentSource>,
595 pub cache: HashMap<String, EnrichmentData>,
597 pub stats: EnrichmentStats,
599}
600
601#[derive(Debug, Clone, Serialize, Deserialize)]
603pub struct EnrichmentSource {
604 pub name: String,
606 pub source_type: EnrichmentSourceType,
608 pub key_field: String,
610 pub cache_ttl: Duration,
612}
613
614#[derive(Debug, Clone, Serialize, Deserialize)]
616pub enum EnrichmentSourceType {
617 ExternalApi { url: String, auth: Option<String> },
619 Database {
621 connection_string: String,
622 query: String,
623 },
624 InMemory {
626 data: HashMap<String, HashMap<String, String>>,
627 },
628 Custom { name: String },
630}
631
632#[derive(Debug, Clone, Serialize, Deserialize)]
634pub struct EnrichmentData {
635 pub fields: HashMap<String, String>,
637 pub source: String,
639 pub timestamp: DateTime<Utc>,
641 pub ttl: Duration,
643}
644
645#[derive(Debug, Clone, Default, Serialize, Deserialize)]
647pub struct EnrichmentStats {
648 pub total_enrichments: u64,
650 pub cache_hits: u64,
652 pub cache_misses: u64,
654 pub failed_enrichments: u64,
656}
657
658#[derive(Debug, Clone)]
660pub struct PatternDetector {
661 pub patterns: HashMap<String, EventPattern>,
663 pub algorithms: HashMap<String, DetectionAlgorithm>,
665 pub stats: DetectionStats,
667}
668
669#[derive(Debug, Clone, Serialize, Deserialize)]
671pub enum DetectionAlgorithm {
672 Sequential,
674 Automaton,
676 Tree,
678 Graph,
680 MachineLearning { model_name: String },
682}
683
684#[derive(Debug, Clone, Default, Serialize, Deserialize)]
686pub struct DetectionStats {
687 pub total_events_processed: u64,
689 pub patterns_detected: u64,
691 pub false_positives: u64,
693 pub false_negatives: u64,
695 pub avg_detection_latency: Duration,
697 pub total_detection_time: Duration,
699}
700
701#[derive(Debug, Clone, Default, Serialize, Deserialize)]
703pub struct CepMetrics {
704 pub total_events_processed: u64,
706 pub total_patterns_detected: u64,
708 pub events_per_second: f64,
710 pub patterns_per_second: f64,
712 pub avg_event_processing_latency: Duration,
714 pub avg_pattern_matching_latency: Duration,
716 pub memory_usage_bytes: usize,
718 pub active_partial_matches: usize,
720 pub completed_matches: usize,
722 pub gc_count: u64,
724 pub last_update: DateTime<Utc>,
726}
727
728#[derive(Debug, Clone, Serialize, Deserialize)]
730pub struct DetectedPattern {
731 pub pattern_match: CompleteMatch,
733 pub triggered_rules: Vec<String>,
735 pub correlations: Vec<CorrelationResult>,
737 pub enrichments: HashMap<String, EnrichmentData>,
739}
740
741impl CepEngine {
742 pub fn new(config: CepConfig) -> Result<Self> {
744 Ok(Self {
745 patterns: Arc::new(RwLock::new(HashMap::new())),
746 event_buffers: Arc::new(RwLock::new(HashMap::new())),
747 state_machines: Arc::new(RwLock::new(HashMap::new())),
748 rule_engine: Arc::new(RwLock::new(RuleEngine {
749 rules: HashMap::new(),
750 stats: RuleExecutionStats::default(),
751 })),
752 correlator: Arc::new(RwLock::new(EventCorrelator {
753 correlation_functions: HashMap::new(),
754 correlation_cache: HashMap::new(),
755 stats: CorrelationStats::default(),
756 })),
757 enrichment_service: Arc::new(RwLock::new(EnrichmentService {
758 sources: HashMap::new(),
759 cache: HashMap::new(),
760 stats: EnrichmentStats::default(),
761 })),
762 pattern_detector: Arc::new(RwLock::new(PatternDetector {
763 patterns: HashMap::new(),
764 algorithms: HashMap::new(),
765 stats: DetectionStats::default(),
766 })),
767 metrics: Arc::new(RwLock::new(CepMetrics {
768 last_update: Utc::now(),
769 ..Default::default()
770 })),
771 config,
772 last_gc: Arc::new(RwLock::new(Instant::now())),
773 })
774 }
775
776 pub async fn register_pattern(&mut self, name: &str, pattern: EventPattern) -> Result<()> {
778 let mut patterns = self.patterns.write().await;
779 patterns.insert(name.to_string(), pattern.clone());
780
781 let mut state_machines = self.state_machines.write().await;
783 state_machines.insert(
784 name.to_string(),
785 StateMachine {
786 pattern,
787 state: State::Initial,
788 partial_matches: Vec::new(),
789 completed_matches: Vec::new(),
790 transition_count: 0,
791 },
792 );
793
794 info!("Registered CEP pattern: {}", name);
795 Ok(())
796 }
797
798 pub async fn register_rule(&mut self, rule: ProcessingRule) -> Result<()> {
800 let mut rule_engine = self.rule_engine.write().await;
801 rule_engine.rules.insert(rule.name.clone(), rule.clone());
802 info!("Registered CEP rule: {}", rule.name);
803 Ok(())
804 }
805
806 pub async fn process_event(&mut self, event: StreamEvent) -> Result<Vec<DetectedPattern>> {
808 let start_time = Instant::now();
809 let event_timestamp = Utc::now();
810
811 let timestamped_event = TimestampedEvent {
813 event: event.clone(),
814 timestamp: event_timestamp,
815 id: Uuid::new_v4(),
816 };
817
818 self.add_to_buffer("default", timestamped_event.clone())
820 .await?;
821
822 self.maybe_run_gc().await?;
824
825 let detected_patterns = self.detect_patterns(×tamped_event).await?;
827
828 let mut results = Vec::new();
830 for pattern_match in detected_patterns {
831 let triggered_rules = self.execute_rules(&pattern_match).await?;
832
833 let correlations = if self.config.enable_correlation {
835 self.correlate_events(&pattern_match).await?
836 } else {
837 Vec::new()
838 };
839
840 let enrichments = if self.config.enable_enrichment {
842 self.enrich_events(&pattern_match).await?
843 } else {
844 HashMap::new()
845 };
846
847 results.push(DetectedPattern {
848 pattern_match,
849 triggered_rules,
850 correlations,
851 enrichments,
852 });
853 }
854
855 let processing_latency = start_time.elapsed();
857 self.update_metrics(processing_latency, results.len()).await;
858
859 Ok(results)
860 }
861
862 async fn add_to_buffer(&self, stream: &str, event: TimestampedEvent) -> Result<()> {
864 let mut buffers = self.event_buffers.write().await;
865 let buffer = buffers
866 .entry(stream.to_string())
867 .or_insert_with(|| EventBuffer {
868 stream_name: stream.to_string(),
869 events: VecDeque::new(),
870 max_size: self.config.event_buffer_size,
871 oldest_timestamp: None,
872 newest_timestamp: None,
873 });
874
875 if buffer.oldest_timestamp.is_none() {
877 buffer.oldest_timestamp = Some(event.timestamp);
878 }
879 buffer.newest_timestamp = Some(event.timestamp);
880
881 buffer.events.push_back(event);
883
884 while buffer.events.len() > buffer.max_size {
886 buffer.events.pop_front();
887 if let Some(first_event) = buffer.events.front() {
888 buffer.oldest_timestamp = Some(first_event.timestamp);
889 }
890 }
891
892 Ok(())
893 }
894
895 async fn detect_patterns(&self, new_event: &TimestampedEvent) -> Result<Vec<CompleteMatch>> {
897 let mut detected = Vec::new();
898 let mut state_machines = self.state_machines.write().await;
899
900 for (pattern_name, state_machine) in state_machines.iter_mut() {
901 if let Some(complete_match) = self.try_match_pattern(state_machine, new_event).await? {
903 detected.push(complete_match);
904 debug!("Pattern detected: {}", pattern_name);
905 }
906 }
907
908 Ok(detected)
909 }
910
911 async fn try_match_pattern(
913 &self,
914 state_machine: &mut StateMachine,
915 event: &TimestampedEvent,
916 ) -> Result<Option<CompleteMatch>> {
917 let pattern = state_machine.pattern.clone();
919
920 match &pattern {
921 EventPattern::Simple { predicates, .. } => {
922 if self.evaluate_predicates(predicates, &event.event).await? {
923 Ok(Some(CompleteMatch {
924 id: Uuid::new_v4(),
925 pattern_name: pattern.name().to_string(),
926 event_ids: vec![event.id],
927 start_time: event.timestamp,
928 end_time: event.timestamp,
929 duration: Duration::from_secs(0),
930 confidence: 1.0,
931 metadata: HashMap::new(),
932 }))
933 } else {
934 Ok(None)
935 }
936 }
937 EventPattern::Sequence {
938 patterns,
939 time_window,
940 strict,
941 ..
942 } => {
943 self.match_sequence(state_machine, event, patterns, *time_window, *strict)
944 .await
945 }
946 EventPattern::And {
947 patterns,
948 time_window,
949 ..
950 } => {
951 self.match_conjunction(state_machine, event, patterns, *time_window)
952 .await
953 }
954 _ => {
955 Ok(None)
957 }
958 }
959 }
960
961 async fn match_sequence(
963 &self,
964 state_machine: &mut StateMachine,
965 event: &TimestampedEvent,
966 patterns: &[EventPattern],
967 time_window: Option<Duration>,
968 _strict: bool,
969 ) -> Result<Option<CompleteMatch>> {
970 let mut new_partial_matches = Vec::new();
972
973 for partial_match in &mut state_machine.partial_matches {
974 let next_stage = partial_match.stage;
975 if next_stage < patterns.len() {
976 if let EventPattern::Simple { predicates, .. } = &patterns[next_stage] {
977 if self.evaluate_predicates(predicates, &event.event).await? {
978 if let Some(window) = time_window {
980 let elapsed = event
981 .timestamp
982 .signed_duration_since(partial_match.start_time);
983 if elapsed.num_seconds() > window.as_secs() as i64 {
984 continue; }
986 }
987
988 let mut new_match = partial_match.clone();
990 new_match.events.push(event.clone());
991 new_match.stage += 1;
992 new_match.last_update = event.timestamp;
993
994 if new_match.stage == patterns.len() {
995 let event_ids: Vec<Uuid> =
997 new_match.events.iter().map(|e| e.id).collect();
998 let duration = event
999 .timestamp
1000 .signed_duration_since(new_match.start_time)
1001 .to_std()
1002 .unwrap_or(Duration::from_secs(0));
1003
1004 return Ok(Some(CompleteMatch {
1005 id: Uuid::new_v4(),
1006 pattern_name: state_machine.pattern.name().to_string(),
1007 event_ids,
1008 start_time: new_match.start_time,
1009 end_time: event.timestamp,
1010 duration,
1011 confidence: 1.0,
1012 metadata: HashMap::new(),
1013 }));
1014 } else {
1015 new_partial_matches.push(new_match);
1016 }
1017 }
1018 }
1019 }
1020 }
1021
1022 if let EventPattern::Simple { predicates, .. } = &patterns[0] {
1024 if self.evaluate_predicates(predicates, &event.event).await? {
1025 new_partial_matches.push(PartialMatch {
1026 id: Uuid::new_v4(),
1027 events: vec![event.clone()],
1028 stage: 1,
1029 start_time: event.timestamp,
1030 last_update: event.timestamp,
1031 state: HashMap::new(),
1032 });
1033 }
1034 }
1035
1036 state_machine.partial_matches = new_partial_matches;
1037 Ok(None)
1038 }
1039
1040 async fn match_conjunction(
1042 &self,
1043 _state_machine: &mut StateMachine,
1044 _event: &TimestampedEvent,
1045 _patterns: &[EventPattern],
1046 _time_window: Option<Duration>,
1047 ) -> Result<Option<CompleteMatch>> {
1048 Ok(None)
1050 }
1051
1052 async fn evaluate_predicates(
1054 &self,
1055 predicates: &[FieldPredicate],
1056 event: &StreamEvent,
1057 ) -> Result<bool> {
1058 for predicate in predicates {
1059 match predicate {
1060 FieldPredicate::Equals { field, value } => {
1061 if field == "event_type" {
1063 let event_type = match event {
1064 StreamEvent::TripleAdded { .. } => "TripleAdded",
1065 StreamEvent::TripleRemoved { .. } => "TripleRemoved",
1066 StreamEvent::QuadAdded { .. } => "QuadAdded",
1067 StreamEvent::QuadRemoved { .. } => "QuadRemoved",
1068 StreamEvent::GraphCreated { .. } => "GraphCreated",
1069 StreamEvent::GraphCleared { .. } => "GraphCleared",
1070 StreamEvent::GraphDeleted { .. } => "GraphDeleted",
1071 StreamEvent::SparqlUpdate { .. } => "SparqlUpdate",
1072 StreamEvent::TransactionBegin { .. } => "TransactionBegin",
1073 StreamEvent::TransactionCommit { .. } => "TransactionCommit",
1074 StreamEvent::TransactionAbort { .. } => "TransactionAbort",
1075 StreamEvent::SchemaChanged { .. } => "SchemaChanged",
1076 StreamEvent::Heartbeat { .. } => "Heartbeat",
1077 _ => "Other", };
1079 if event_type != value {
1080 return Ok(false);
1081 }
1082 }
1083 }
1084 FieldPredicate::Contains { field, substring } => {
1085 if field == "source" {
1087 let source = match event {
1088 StreamEvent::Heartbeat { source, .. } => source,
1089 _ => return Ok(false),
1090 };
1091 if !source.contains(substring) {
1092 return Ok(false);
1093 }
1094 }
1095 }
1096 _ => {
1097 }
1099 }
1100 }
1101 Ok(true)
1102 }
1103
1104 async fn execute_rules(&self, pattern_match: &CompleteMatch) -> Result<Vec<String>> {
1106 let mut triggered = Vec::new();
1107
1108 let rules = {
1110 let rule_engine = self.rule_engine.read().await;
1111 rule_engine.rules.clone()
1112 };
1113
1114 for (rule_name, rule) in &rules {
1115 if !rule.enabled {
1116 continue;
1117 }
1118
1119 if self
1121 .evaluate_rule_condition(&rule.condition, pattern_match)
1122 .await?
1123 {
1124 for action in &rule.actions {
1126 self.execute_rule_action(action).await?;
1127 }
1128 triggered.push(rule_name.clone());
1129
1130 let mut rule_engine = self.rule_engine.write().await;
1132 rule_engine.stats.successful_executions += 1;
1133 }
1134 }
1135
1136 Ok(triggered)
1137 }
1138
1139 async fn evaluate_rule_condition(
1141 &self,
1142 condition: &RuleCondition,
1143 pattern_match: &CompleteMatch,
1144 ) -> Result<bool> {
1145 match condition {
1146 RuleCondition::PatternMatched { pattern } => Ok(&pattern_match.pattern_name == pattern),
1147 _ => {
1148 Ok(false)
1150 }
1151 }
1152 }
1153
1154 async fn execute_rule_action(&self, action: &RuleAction) -> Result<()> {
1156 match action {
1157 RuleAction::SendAlert { severity, message } => {
1158 info!("CEP Alert [{}]: {}", severity, message);
1159 }
1160 RuleAction::EmitEvent { event_type, data } => {
1161 debug!("CEP Emit Event: {} with data: {:?}", event_type, data);
1162 }
1163 _ => {
1164 }
1166 }
1167 Ok(())
1168 }
1169
1170 async fn correlate_events(
1172 &self,
1173 _pattern_match: &CompleteMatch,
1174 ) -> Result<Vec<CorrelationResult>> {
1175 Ok(Vec::new())
1177 }
1178
1179 async fn enrich_events(
1181 &self,
1182 _pattern_match: &CompleteMatch,
1183 ) -> Result<HashMap<String, EnrichmentData>> {
1184 Ok(HashMap::new())
1186 }
1187
1188 async fn maybe_run_gc(&self) -> Result<()> {
1190 let mut last_gc = self.last_gc.write().await;
1191 if last_gc.elapsed() >= self.config.gc_interval {
1192 self.run_gc().await?;
1193 *last_gc = Instant::now();
1194
1195 let mut metrics = self.metrics.write().await;
1196 metrics.gc_count += 1;
1197 }
1198 Ok(())
1199 }
1200
1201 async fn run_gc(&self) -> Result<()> {
1203 let cutoff_time =
1204 Utc::now() - ChronoDuration::seconds(self.config.max_time_window.as_secs() as i64);
1205
1206 let mut buffers = self.event_buffers.write().await;
1208 for buffer in buffers.values_mut() {
1209 buffer.events.retain(|e| e.timestamp > cutoff_time);
1210 if let Some(first_event) = buffer.events.front() {
1211 buffer.oldest_timestamp = Some(first_event.timestamp);
1212 }
1213 }
1214
1215 let mut state_machines = self.state_machines.write().await;
1217 for state_machine in state_machines.values_mut() {
1218 state_machine
1219 .partial_matches
1220 .retain(|m| m.last_update > cutoff_time);
1221 }
1222
1223 debug!("CEP garbage collection completed");
1224 Ok(())
1225 }
1226
1227 async fn update_metrics(&self, processing_latency: Duration, patterns_detected: usize) {
1229 let mut metrics = self.metrics.write().await;
1230 metrics.total_events_processed += 1;
1231 metrics.total_patterns_detected += patterns_detected as u64;
1232
1233 let now = Utc::now();
1234 let elapsed_duration = now.signed_duration_since(metrics.last_update);
1235 let elapsed_secs = elapsed_duration.num_seconds() as f64;
1236
1237 if elapsed_secs > 0.0 {
1238 metrics.events_per_second = metrics.total_events_processed as f64 / elapsed_secs;
1239 metrics.patterns_per_second = metrics.total_patterns_detected as f64 / elapsed_secs;
1240 }
1241
1242 let total_latency = metrics.avg_event_processing_latency.as_micros()
1244 * (metrics.total_events_processed - 1) as u128
1245 + processing_latency.as_micros();
1246 metrics.avg_event_processing_latency =
1247 Duration::from_micros((total_latency / metrics.total_events_processed as u128) as u64);
1248
1249 let state_machines = self.state_machines.read().await;
1251 metrics.active_partial_matches = state_machines
1252 .values()
1253 .map(|sm| sm.partial_matches.len())
1254 .sum();
1255 }
1256
1257 pub async fn get_metrics(&self) -> CepMetrics {
1259 self.metrics.read().await.clone()
1260 }
1261
1262 pub async fn get_statistics(&self) -> CepStatistics {
1264 let metrics = self.metrics.read().await;
1265 let rule_engine = self.rule_engine.read().await;
1266 let correlator = self.correlator.read().await;
1267 let enrichment = self.enrichment_service.read().await;
1268 let detector = self.pattern_detector.read().await;
1269
1270 CepStatistics {
1271 metrics: metrics.clone(),
1272 rule_stats: rule_engine.stats.clone(),
1273 correlation_stats: correlator.stats.clone(),
1274 enrichment_stats: enrichment.stats.clone(),
1275 detection_stats: detector.stats.clone(),
1276 }
1277 }
1278}
1279
1280#[derive(Debug, Clone, Serialize, Deserialize)]
1282pub struct CepStatistics {
1283 pub metrics: CepMetrics,
1285 pub rule_stats: RuleExecutionStats,
1287 pub correlation_stats: CorrelationStats,
1289 pub enrichment_stats: EnrichmentStats,
1291 pub detection_stats: DetectionStats,
1293}
1294
1295#[cfg(test)]
1296mod tests {
1297 use super::*;
1298 use crate::event::EventMetadata;
1299
1300 #[tokio::test]
1301 async fn test_cep_engine_creation() {
1302 let config = CepConfig::default();
1303 let engine = CepEngine::new(config);
1304 assert!(engine.is_ok());
1305 }
1306
1307 #[tokio::test]
1308 async fn test_pattern_registration() {
1309 let config = CepConfig::default();
1310 let mut engine = CepEngine::new(config).unwrap();
1311
1312 let pattern = EventPattern::simple("event_type", "test");
1313 let result = engine.register_pattern("test_pattern", pattern).await;
1314 assert!(result.is_ok());
1315
1316 let patterns = engine.patterns.read().await;
1317 assert!(patterns.contains_key("test_pattern"));
1318 }
1319
1320 #[tokio::test]
1321 async fn test_simple_pattern_matching() {
1322 let config = CepConfig::default();
1323 let mut engine = CepEngine::new(config).unwrap();
1324
1325 let pattern = EventPattern::simple("event_type", "Heartbeat");
1326 engine.register_pattern("heartbeat", pattern).await.unwrap();
1327
1328 let event = StreamEvent::Heartbeat {
1329 timestamp: Utc::now(),
1330 source: "test".to_string(),
1331 metadata: EventMetadata::default(),
1332 };
1333
1334 let detected = engine.process_event(event).await.unwrap();
1335 assert!(!detected.is_empty());
1336 }
1337
1338 #[tokio::test]
1339 async fn test_sequence_pattern() {
1340 let config = CepConfig::default();
1341 let mut engine = CepEngine::new(config).unwrap();
1342
1343 let pattern = EventPattern::sequence(vec![
1344 EventPattern::simple("event_type", "Heartbeat"),
1345 EventPattern::simple("event_type", "Heartbeat"),
1346 ])
1347 .with_time_window(Duration::from_secs(10));
1348
1349 engine
1350 .register_pattern("double_heartbeat", pattern)
1351 .await
1352 .unwrap();
1353
1354 let event1 = StreamEvent::Heartbeat {
1355 timestamp: Utc::now(),
1356 source: "test".to_string(),
1357 metadata: EventMetadata::default(),
1358 };
1359
1360 let detected1 = engine.process_event(event1).await.unwrap();
1361 assert!(detected1.is_empty()); let event2 = StreamEvent::Heartbeat {
1364 timestamp: Utc::now(),
1365 source: "test".to_string(),
1366 metadata: EventMetadata::default(),
1367 };
1368
1369 let detected2 = engine.process_event(event2).await.unwrap();
1370 assert!(!detected2.is_empty()); }
1372
1373 #[tokio::test]
1374 async fn test_rule_registration() {
1375 let config = CepConfig::default();
1376 let mut engine = CepEngine::new(config).unwrap();
1377
1378 let rule = ProcessingRule {
1379 name: "test_rule".to_string(),
1380 condition: RuleCondition::PatternMatched {
1381 pattern: "heartbeat".to_string(),
1382 },
1383 actions: vec![RuleAction::SendAlert {
1384 severity: "info".to_string(),
1385 message: "Heartbeat detected".to_string(),
1386 }],
1387 priority: 1,
1388 enabled: true,
1389 };
1390
1391 let result = engine.register_rule(rule).await;
1392 assert!(result.is_ok());
1393 }
1394
1395 #[tokio::test]
1396 async fn test_event_buffer() {
1397 let config = CepConfig::default();
1398 let engine = CepEngine::new(config).unwrap();
1399
1400 let event = StreamEvent::Heartbeat {
1401 timestamp: Utc::now(),
1402 source: "test".to_string(),
1403 metadata: EventMetadata::default(),
1404 };
1405
1406 let timestamped = TimestampedEvent {
1407 event,
1408 timestamp: Utc::now(),
1409 id: Uuid::new_v4(),
1410 };
1411
1412 engine
1413 .add_to_buffer("test_stream", timestamped)
1414 .await
1415 .unwrap();
1416
1417 let buffers = engine.event_buffers.read().await;
1418 assert!(buffers.contains_key("test_stream"));
1419 assert_eq!(buffers.get("test_stream").unwrap().events.len(), 1);
1420 }
1421
1422 #[tokio::test]
1423 async fn test_predicate_evaluation() {
1424 let config = CepConfig::default();
1425 let engine = CepEngine::new(config).unwrap();
1426
1427 let predicates = vec![FieldPredicate::Equals {
1428 field: "event_type".to_string(),
1429 value: "Heartbeat".to_string(),
1430 }];
1431
1432 let event = StreamEvent::Heartbeat {
1433 timestamp: Utc::now(),
1434 source: "test".to_string(),
1435 metadata: EventMetadata::default(),
1436 };
1437
1438 let result = engine
1439 .evaluate_predicates(&predicates, &event)
1440 .await
1441 .unwrap();
1442 assert!(result);
1443 }
1444
1445 #[tokio::test]
1446 async fn test_metrics_collection() {
1447 let config = CepConfig::default();
1448 let mut engine = CepEngine::new(config).unwrap();
1449
1450 let event = StreamEvent::Heartbeat {
1451 timestamp: Utc::now(),
1452 source: "test".to_string(),
1453 metadata: EventMetadata::default(),
1454 };
1455
1456 engine.process_event(event).await.unwrap();
1457
1458 let metrics = engine.get_metrics().await;
1459 assert_eq!(metrics.total_events_processed, 1);
1460 }
1461
1462 #[tokio::test]
1463 async fn test_garbage_collection() {
1464 let config = CepConfig {
1465 gc_interval: Duration::from_millis(10),
1466 ..Default::default()
1467 };
1468 let engine = CepEngine::new(config).unwrap();
1469
1470 let old_event = TimestampedEvent {
1472 event: StreamEvent::Heartbeat {
1473 timestamp: Utc::now(),
1474 source: "test".to_string(),
1475 metadata: EventMetadata::default(),
1476 },
1477 timestamp: Utc::now() - ChronoDuration::hours(2),
1478 id: Uuid::new_v4(),
1479 };
1480
1481 engine.add_to_buffer("test", old_event).await.unwrap();
1482
1483 tokio::time::sleep(Duration::from_millis(20)).await;
1485
1486 engine.run_gc().await.unwrap();
1488
1489 let buffers = engine.event_buffers.read().await;
1490 assert!(buffers.get("test").unwrap().events.is_empty());
1491 }
1492
1493 #[tokio::test]
1494 async fn test_pattern_with_time_window() {
1495 let pattern = EventPattern::sequence(vec![
1496 EventPattern::simple("type", "A"),
1497 EventPattern::simple("type", "B"),
1498 ])
1499 .with_time_window(Duration::from_secs(5));
1500
1501 match pattern {
1502 EventPattern::Sequence { time_window, .. } => {
1503 assert_eq!(time_window, Some(Duration::from_secs(5)));
1504 }
1505 _ => panic!("Expected sequence pattern"),
1506 }
1507 }
1508
1509 #[tokio::test]
1510 async fn test_statistics() {
1511 let config = CepConfig::default();
1512 let engine = CepEngine::new(config).unwrap();
1513
1514 let stats = engine.get_statistics().await;
1515 assert_eq!(stats.metrics.total_events_processed, 0);
1516 }
1517}