1use crate::StreamEvent;
14use anyhow::{anyhow, Result};
15use chrono::{DateTime, Duration as ChronoDuration, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, VecDeque};
18use uuid::Uuid;
19
20use scirs2_core::ndarray_ext::{s, Array1};
22
23#[derive(Debug, Clone)]
25struct RepeatMatchParams<'a> {
26 pattern: &'a Pattern,
27 min_count: usize,
28 max_count: Option<usize>,
29 time_window: &'a ChronoDuration,
30}
31
32#[derive(Debug, Clone)]
34struct StatisticalMatchParams<'a> {
35 name: &'a str,
36 stat_type: &'a StatisticalPatternType,
37 threshold: f64,
38 time_window: &'a ChronoDuration,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
43pub enum PatternMatchStrategy {
44 Any,
46 All,
48 First,
50 Last,
52 BestMatch,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub enum Pattern {
59 Simple { name: String, predicate: String },
61 Sequence {
63 patterns: Vec<Pattern>,
64 max_distance: Option<ChronoDuration>,
65 },
66 And {
68 patterns: Vec<Pattern>,
69 time_window: ChronoDuration,
70 },
71 Or { patterns: Vec<Pattern> },
73 Not {
75 positive: Box<Pattern>,
76 negative: Box<Pattern>,
77 time_window: ChronoDuration,
78 },
79 Repeat {
81 pattern: Box<Pattern>,
82 min_count: usize,
83 max_count: Option<usize>,
84 time_window: ChronoDuration,
85 },
86 Statistical {
88 name: String,
89 stat_type: StatisticalPatternType,
90 threshold: f64,
91 time_window: ChronoDuration,
92 },
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub enum StatisticalPatternType {
98 Frequency,
100 Correlation { field_a: String, field_b: String },
102 MovingAverage { field: String, window_size: usize },
104 StdDev { field: String },
106 Anomaly { field: String, sensitivity: f64 },
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct PatternMatch {
113 pub pattern_id: String,
114 pub pattern_name: String,
115 pub events: Vec<StreamEvent>,
116 pub start_time: DateTime<Utc>,
117 pub end_time: DateTime<Utc>,
118 pub confidence: f64,
119 pub metadata: HashMap<String, serde_json::Value>,
120}
121
122pub struct PatternMatcher {
124 patterns: HashMap<String, Pattern>,
125 active_matches: HashMap<String, Vec<PartialMatch>>,
126 completed_matches: VecDeque<PatternMatch>,
127 event_buffer: VecDeque<(StreamEvent, DateTime<Utc>)>,
128 buffer_size: usize,
129 stats: PatternMatcherStats,
130}
131
132#[derive(Debug, Clone)]
133struct PartialMatch {
134 pattern_id: String,
135 matched_events: Vec<StreamEvent>,
136 start_time: DateTime<Utc>,
137 current_state: usize,
138}
139
140#[derive(Debug, Clone, Default)]
141pub struct PatternMatcherStats {
142 pub events_processed: u64,
143 pub patterns_matched: u64,
144 pub partial_matches: u64,
145 pub timeouts: u64,
146 pub processing_time_ms: f64,
147}
148
149impl PatternMatcher {
150 pub fn new(buffer_size: usize) -> Self {
152 Self {
153 patterns: HashMap::new(),
154 active_matches: HashMap::new(),
155 completed_matches: VecDeque::new(),
156 event_buffer: VecDeque::new(),
157 buffer_size,
158 stats: PatternMatcherStats::default(),
159 }
160 }
161
162 pub fn register_pattern(&mut self, pattern: Pattern) -> String {
164 let pattern_id = Uuid::new_v4().to_string();
165 self.patterns.insert(pattern_id.clone(), pattern);
166 pattern_id
167 }
168
169 pub fn process_event(&mut self, event: StreamEvent) -> Result<Vec<PatternMatch>> {
171 let start = std::time::Instant::now();
172 let now = Utc::now();
173
174 self.stats.events_processed += 1;
175
176 self.event_buffer.push_back((event.clone(), now));
178 if self.event_buffer.len() > self.buffer_size {
179 self.event_buffer.pop_front();
180 }
181
182 let mut new_matches = Vec::new();
184
185 let patterns: Vec<(String, Pattern)> = self
187 .patterns
188 .iter()
189 .map(|(k, v)| (k.clone(), v.clone()))
190 .collect();
191
192 for (pattern_id, pattern) in patterns {
193 match self.match_pattern(&pattern_id, &pattern, &event, now) {
194 Ok(matches) => new_matches.extend(matches),
195 Err(e) => tracing::warn!("Pattern matching error for {}: {}", pattern_id, e),
196 }
197 }
198
199 self.cleanup_expired_matches(now);
201
202 self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
204 self.stats.patterns_matched += new_matches.len() as u64;
205
206 Ok(new_matches)
207 }
208
209 fn match_pattern(
211 &mut self,
212 pattern_id: &str,
213 pattern: &Pattern,
214 event: &StreamEvent,
215 now: DateTime<Utc>,
216 ) -> Result<Vec<PatternMatch>> {
217 match pattern {
218 Pattern::Simple { name, predicate } => {
219 self.match_simple_pattern(pattern_id, name, predicate, event, now)
220 }
221 Pattern::Sequence {
222 patterns,
223 max_distance,
224 } => self.match_sequence_pattern(pattern_id, patterns, max_distance, event, now),
225 Pattern::And {
226 patterns,
227 time_window,
228 } => self.match_and_pattern(pattern_id, patterns, time_window, event, now),
229 Pattern::Or { patterns } => self.match_or_pattern(pattern_id, patterns, event, now),
230 Pattern::Not {
231 positive,
232 negative,
233 time_window,
234 } => self.match_not_pattern(pattern_id, positive, negative, time_window, event, now),
235 Pattern::Repeat {
236 pattern,
237 min_count,
238 max_count,
239 time_window,
240 } => self.match_repeat_pattern(
241 pattern_id,
242 RepeatMatchParams {
243 pattern,
244 min_count: *min_count,
245 max_count: *max_count,
246 time_window,
247 },
248 event,
249 now,
250 ),
251 Pattern::Statistical {
252 name,
253 stat_type,
254 threshold,
255 time_window,
256 } => self.match_statistical_pattern(
257 pattern_id,
258 StatisticalMatchParams {
259 name,
260 stat_type,
261 threshold: *threshold,
262 time_window,
263 },
264 event,
265 now,
266 ),
267 }
268 }
269
270 fn match_simple_pattern(
272 &mut self,
273 pattern_id: &str,
274 name: &str,
275 predicate: &str,
276 event: &StreamEvent,
277 now: DateTime<Utc>,
278 ) -> Result<Vec<PatternMatch>> {
279 if self.evaluate_predicate(predicate, event)? {
280 Ok(vec![PatternMatch {
281 pattern_id: pattern_id.to_string(),
282 pattern_name: name.to_string(),
283 events: vec![event.clone()],
284 start_time: now,
285 end_time: now,
286 confidence: 1.0,
287 metadata: HashMap::new(),
288 }])
289 } else {
290 Ok(vec![])
291 }
292 }
293
294 fn match_sequence_pattern(
296 &mut self,
297 pattern_id: &str,
298 patterns: &[Pattern],
299 max_distance: &Option<ChronoDuration>,
300 event: &StreamEvent,
301 now: DateTime<Utc>,
302 ) -> Result<Vec<PatternMatch>> {
303 let mut matches = Vec::new();
304
305 let existing_partials = self
307 .active_matches
308 .get(pattern_id)
309 .cloned()
310 .unwrap_or_default();
311
312 let mut new_partial_matches = Vec::new();
314
315 for partial in existing_partials.iter() {
316 if partial.current_state < patterns.len() {
317 let next_pattern = &patterns[partial.current_state];
318
319 let matches_next = self.evaluate_pattern_simple(next_pattern, event)?;
321
322 if matches_next {
323 if let Some(max_dist) = max_distance {
325 if now - partial.start_time > *max_dist {
326 continue;
327 }
328 }
329
330 let mut new_events = partial.matched_events.clone();
331 new_events.push(event.clone());
332
333 if partial.current_state + 1 == patterns.len() {
334 matches.push(PatternMatch {
336 pattern_id: pattern_id.to_string(),
337 pattern_name: "Sequence".to_string(),
338 events: new_events,
339 start_time: partial.start_time,
340 end_time: now,
341 confidence: 1.0,
342 metadata: HashMap::new(),
343 });
344 } else {
345 new_partial_matches.push(PartialMatch {
347 pattern_id: pattern_id.to_string(),
348 matched_events: new_events,
349 start_time: partial.start_time,
350 current_state: partial.current_state + 1,
351 });
352 }
353 }
354 }
355 }
356
357 if !patterns.is_empty() {
359 let first_pattern = &patterns[0];
360 let matches_first = self.evaluate_pattern_simple(first_pattern, event)?;
361
362 if matches_first {
363 if patterns.len() == 1 {
364 matches.push(PatternMatch {
366 pattern_id: pattern_id.to_string(),
367 pattern_name: "Sequence".to_string(),
368 events: vec![event.clone()],
369 start_time: now,
370 end_time: now,
371 confidence: 1.0,
372 metadata: HashMap::new(),
373 });
374 } else {
375 new_partial_matches.push(PartialMatch {
376 pattern_id: pattern_id.to_string(),
377 matched_events: vec![event.clone()],
378 start_time: now,
379 current_state: 1,
380 });
381 }
382 }
383 }
384
385 self.active_matches
387 .insert(pattern_id.to_string(), new_partial_matches.clone());
388 self.stats.partial_matches = new_partial_matches.len() as u64;
389
390 Ok(matches)
391 }
392
393 fn match_and_pattern(
395 &mut self,
396 pattern_id: &str,
397 patterns: &[Pattern],
398 time_window: &ChronoDuration,
399 _event: &StreamEvent,
400 now: DateTime<Utc>,
401 ) -> Result<Vec<PatternMatch>> {
402 let window_start = now - *time_window;
404 let recent_events: Vec<_> = self
405 .event_buffer
406 .iter()
407 .filter(|(_, timestamp)| *timestamp >= window_start)
408 .cloned()
409 .collect();
410
411 let mut all_matched = true;
413 let mut matched_events = Vec::new();
414
415 for pattern in patterns {
416 let mut pattern_matched = false;
417
418 for (evt, evt_time) in &recent_events {
419 let sub_matches = self.match_pattern(pattern_id, pattern, evt, *evt_time)?;
420
421 if !sub_matches.is_empty() {
422 pattern_matched = true;
423 matched_events.push(evt.clone());
424 break;
425 }
426 }
427
428 if !pattern_matched {
429 all_matched = false;
430 break;
431 }
432 }
433
434 if all_matched && !matched_events.is_empty() {
435 Ok(vec![PatternMatch {
436 pattern_id: pattern_id.to_string(),
437 pattern_name: "And".to_string(),
438 events: matched_events,
439 start_time: window_start,
440 end_time: now,
441 confidence: 1.0,
442 metadata: HashMap::new(),
443 }])
444 } else {
445 Ok(vec![])
446 }
447 }
448
449 fn match_or_pattern(
451 &mut self,
452 pattern_id: &str,
453 patterns: &[Pattern],
454 event: &StreamEvent,
455 now: DateTime<Utc>,
456 ) -> Result<Vec<PatternMatch>> {
457 for pattern in patterns {
458 let matches = self.match_pattern(pattern_id, pattern, event, now)?;
459 if !matches.is_empty() {
460 return Ok(matches);
461 }
462 }
463
464 Ok(vec![])
465 }
466
467 fn match_not_pattern(
469 &mut self,
470 pattern_id: &str,
471 positive: &Pattern,
472 negative: &Pattern,
473 time_window: &ChronoDuration,
474 event: &StreamEvent,
475 now: DateTime<Utc>,
476 ) -> Result<Vec<PatternMatch>> {
477 let positive_matches = self.match_pattern(pattern_id, positive, event, now)?;
479
480 if positive_matches.is_empty() {
481 return Ok(vec![]);
482 }
483
484 let window_start = now - *time_window;
486 let recent_events: Vec<_> = self
487 .event_buffer
488 .iter()
489 .filter(|(_, timestamp)| *timestamp >= window_start)
490 .cloned()
491 .collect();
492
493 for (evt, evt_time) in recent_events {
494 let negative_matches = self.match_pattern(pattern_id, negative, &evt, evt_time)?;
495
496 if !negative_matches.is_empty() {
497 return Ok(vec![]);
499 }
500 }
501
502 Ok(positive_matches)
504 }
505
506 fn match_repeat_pattern(
508 &mut self,
509 pattern_id: &str,
510 params: RepeatMatchParams,
511 _event: &StreamEvent,
512 now: DateTime<Utc>,
513 ) -> Result<Vec<PatternMatch>> {
514 let window_start = now - *params.time_window;
516 let mut matched_events = Vec::new();
517
518 let buffer_clone: Vec<(StreamEvent, DateTime<Utc>)> =
520 self.event_buffer.iter().cloned().collect();
521
522 for (evt, evt_time) in buffer_clone {
523 if evt_time >= window_start {
524 let matches = self.evaluate_pattern_simple(params.pattern, &evt)?;
525
526 if matches {
527 matched_events.push(evt.clone());
528 }
529 }
530 }
531
532 let match_count = matched_events.len();
533
534 if match_count >= params.min_count
535 && params.max_count.map_or(true, |max| match_count <= max)
536 {
537 Ok(vec![PatternMatch {
538 pattern_id: pattern_id.to_string(),
539 pattern_name: "Repeat".to_string(),
540 events: matched_events,
541 start_time: window_start,
542 end_time: now,
543 confidence: 1.0,
544 metadata: {
545 let mut meta = HashMap::new();
546 meta.insert(
547 "repeat_count".to_string(),
548 serde_json::Value::Number(match_count.into()),
549 );
550 meta
551 },
552 }])
553 } else {
554 Ok(vec![])
555 }
556 }
557
558 fn match_statistical_pattern(
560 &mut self,
561 pattern_id: &str,
562 params: StatisticalMatchParams,
563 _event: &StreamEvent,
564 now: DateTime<Utc>,
565 ) -> Result<Vec<PatternMatch>> {
566 let window_start = now - *params.time_window;
567 let recent_events: Vec<_> = self
568 .event_buffer
569 .iter()
570 .filter(|(_, timestamp)| *timestamp >= window_start)
571 .map(|(evt, _)| evt)
572 .cloned()
573 .collect();
574
575 if recent_events.is_empty() {
576 return Ok(vec![]);
577 }
578
579 match params.stat_type {
580 StatisticalPatternType::Frequency => {
581 let frequency =
582 recent_events.len() as f64 / params.time_window.num_seconds() as f64;
583
584 if frequency >= params.threshold {
585 Ok(vec![PatternMatch {
586 pattern_id: pattern_id.to_string(),
587 pattern_name: params.name.to_string(),
588 events: recent_events,
589 start_time: window_start,
590 end_time: now,
591 confidence: frequency / params.threshold,
592 metadata: {
593 let mut meta = HashMap::new();
594 meta.insert(
595 "frequency".to_string(),
596 serde_json::Value::Number(
597 serde_json::Number::from_f64(frequency).unwrap_or(0.into()),
598 ),
599 );
600 meta
601 },
602 }])
603 } else {
604 Ok(vec![])
605 }
606 }
607 StatisticalPatternType::Correlation { field_a, field_b } => {
608 let values_a: Vec<f64> = recent_events
610 .iter()
611 .filter_map(|evt| self.extract_numeric_value(evt, field_a))
612 .collect();
613
614 let values_b: Vec<f64> = recent_events
615 .iter()
616 .filter_map(|evt| self.extract_numeric_value(evt, field_b))
617 .collect();
618
619 if values_a.len() < 2 || values_b.len() < 2 {
620 return Ok(vec![]);
621 }
622
623 let min_len = values_a.len().min(values_b.len());
625 let arr_a = Array1::from_vec(values_a[..min_len].to_vec());
626 let arr_b = Array1::from_vec(values_b[..min_len].to_vec());
627
628 let correlation = compute_correlation(&arr_a, &arr_b)?;
630
631 if correlation.abs() >= params.threshold {
632 Ok(vec![PatternMatch {
633 pattern_id: pattern_id.to_string(),
634 pattern_name: params.name.to_string(),
635 events: recent_events,
636 start_time: window_start,
637 end_time: now,
638 confidence: correlation.abs(),
639 metadata: {
640 let mut meta = HashMap::new();
641 meta.insert(
642 "correlation".to_string(),
643 serde_json::Value::Number(
644 serde_json::Number::from_f64(correlation).unwrap_or(0.into()),
645 ),
646 );
647 meta
648 },
649 }])
650 } else {
651 Ok(vec![])
652 }
653 }
654 StatisticalPatternType::MovingAverage { field, window_size } => {
655 let values: Vec<f64> = recent_events
656 .iter()
657 .filter_map(|evt| self.extract_numeric_value(evt, field))
658 .collect();
659
660 if values.len() < *window_size {
661 return Ok(vec![]);
662 }
663
664 let arr = Array1::from_vec(values);
666 let ma = arr
667 .slice(s![arr.len() - window_size..])
668 .mean()
669 .unwrap_or(0.0);
670
671 if ma >= params.threshold {
672 Ok(vec![PatternMatch {
673 pattern_id: pattern_id.to_string(),
674 pattern_name: params.name.to_string(),
675 events: recent_events,
676 start_time: window_start,
677 end_time: now,
678 confidence: ma / params.threshold,
679 metadata: {
680 let mut meta = HashMap::new();
681 meta.insert(
682 "moving_average".to_string(),
683 serde_json::Value::Number(
684 serde_json::Number::from_f64(ma).unwrap_or(0.into()),
685 ),
686 );
687 meta
688 },
689 }])
690 } else {
691 Ok(vec![])
692 }
693 }
694 StatisticalPatternType::StdDev { field } => {
695 let values: Vec<f64> = recent_events
696 .iter()
697 .filter_map(|evt| self.extract_numeric_value(evt, field))
698 .collect();
699
700 if values.len() < 2 {
701 return Ok(vec![]);
702 }
703
704 let arr = Array1::from_vec(values);
706 let std_dev = arr.std(0.0);
707
708 if std_dev >= params.threshold {
709 Ok(vec![PatternMatch {
710 pattern_id: pattern_id.to_string(),
711 pattern_name: params.name.to_string(),
712 events: recent_events,
713 start_time: window_start,
714 end_time: now,
715 confidence: std_dev / params.threshold,
716 metadata: {
717 let mut meta = HashMap::new();
718 meta.insert(
719 "std_dev".to_string(),
720 serde_json::Value::Number(
721 serde_json::Number::from_f64(std_dev).unwrap_or(0.into()),
722 ),
723 );
724 meta
725 },
726 }])
727 } else {
728 Ok(vec![])
729 }
730 }
731 StatisticalPatternType::Anomaly { field, sensitivity } => {
732 let values: Vec<f64> = recent_events
733 .iter()
734 .filter_map(|evt| self.extract_numeric_value(evt, field))
735 .collect();
736
737 if values.len() < 3 {
738 return Ok(vec![]);
739 }
740
741 let arr = Array1::from_vec(values.clone());
743 let mean = arr.mean().unwrap_or(0.0);
744 let std_dev = arr.std(0.0);
745
746 let last_value = values.last().unwrap();
747 let z_score = if std_dev > 0.0 {
748 (last_value - mean).abs() / std_dev
749 } else {
750 0.0
751 };
752
753 if z_score >= params.threshold * sensitivity {
754 Ok(vec![PatternMatch {
755 pattern_id: pattern_id.to_string(),
756 pattern_name: params.name.to_string(),
757 events: recent_events,
758 start_time: window_start,
759 end_time: now,
760 confidence: z_score / (params.threshold * sensitivity),
761 metadata: {
762 let mut meta = HashMap::new();
763 meta.insert(
764 "z_score".to_string(),
765 serde_json::Value::Number(
766 serde_json::Number::from_f64(z_score).unwrap_or(0.into()),
767 ),
768 );
769 meta
770 },
771 }])
772 } else {
773 Ok(vec![])
774 }
775 }
776 }
777 }
778
779 fn evaluate_pattern_simple(&self, pattern: &Pattern, event: &StreamEvent) -> Result<bool> {
781 match pattern {
782 Pattern::Simple { predicate, .. } => self.evaluate_predicate(predicate, event),
783 _ => Ok(false), }
785 }
786
787 fn evaluate_predicate(&self, predicate: &str, event: &StreamEvent) -> Result<bool> {
789 match predicate {
792 "always" => Ok(true),
793 "never" => Ok(false),
794 pred if pred.starts_with("type:") => {
795 let expected_type = pred.strip_prefix("type:").unwrap();
796 Ok(self.get_event_type(event) == expected_type)
797 }
798 pred if pred.starts_with("subject:") => {
799 let expected_subject = pred.strip_prefix("subject:").unwrap();
800 Ok(self.get_event_subject(event) == Some(expected_subject.to_string()))
801 }
802 _ => Ok(false),
803 }
804 }
805
806 fn extract_numeric_value(&self, _event: &StreamEvent, _field: &str) -> Option<f64> {
808 Some(1.0)
810 }
811
812 fn get_event_type(&self, event: &StreamEvent) -> &str {
814 match event {
815 StreamEvent::TripleAdded { .. } => "triple_added",
816 StreamEvent::TripleRemoved { .. } => "triple_removed",
817 StreamEvent::QuadAdded { .. } => "quad_added",
818 StreamEvent::QuadRemoved { .. } => "quad_removed",
819 StreamEvent::GraphCreated { .. } => "graph_created",
820 StreamEvent::GraphCleared { .. } => "graph_cleared",
821 StreamEvent::GraphDeleted { .. } => "graph_deleted",
822 StreamEvent::TransactionBegin { .. } => "transaction_begin",
823 StreamEvent::TransactionCommit { .. } => "transaction_commit",
824 StreamEvent::TransactionAbort { .. } => "transaction_abort",
825 _ => "unknown",
826 }
827 }
828
829 fn get_event_subject(&self, event: &StreamEvent) -> Option<String> {
831 match event {
832 StreamEvent::TripleAdded { subject, .. } => Some(subject.clone()),
833 StreamEvent::TripleRemoved { subject, .. } => Some(subject.clone()),
834 StreamEvent::QuadAdded { subject, .. } => Some(subject.clone()),
835 StreamEvent::QuadRemoved { subject, .. } => Some(subject.clone()),
836 _ => None,
837 }
838 }
839
840 fn cleanup_expired_matches(&mut self, now: DateTime<Utc>) {
842 let timeout = ChronoDuration::minutes(5);
843
844 for (_, matches) in self.active_matches.iter_mut() {
845 matches.retain(|m| now - m.start_time < timeout);
846 }
847
848 self.active_matches.retain(|_, matches| !matches.is_empty());
849 }
850
851 pub fn completed_matches(&self) -> &VecDeque<PatternMatch> {
853 &self.completed_matches
854 }
855
856 pub fn stats(&self) -> &PatternMatcherStats {
858 &self.stats
859 }
860
861 pub fn reset(&mut self) {
863 self.active_matches.clear();
864 self.completed_matches.clear();
865 self.event_buffer.clear();
866 self.stats = PatternMatcherStats::default();
867 }
868}
869
870fn compute_correlation(a: &Array1<f64>, b: &Array1<f64>) -> Result<f64> {
872 if a.len() != b.len() || a.len() < 2 {
873 return Err(anyhow!(
874 "Arrays must have same length and at least 2 elements"
875 ));
876 }
877
878 let mean_a = a.mean().unwrap_or(0.0);
879 let mean_b = b.mean().unwrap_or(0.0);
880
881 let mut sum_product = 0.0;
882 let mut sum_sq_a = 0.0;
883 let mut sum_sq_b = 0.0;
884
885 for i in 0..a.len() {
886 let diff_a = a[i] - mean_a;
887 let diff_b = b[i] - mean_b;
888 sum_product += diff_a * diff_b;
889 sum_sq_a += diff_a * diff_a;
890 sum_sq_b += diff_b * diff_b;
891 }
892
893 let denominator = (sum_sq_a * sum_sq_b).sqrt();
894 if denominator == 0.0 {
895 Ok(0.0)
896 } else {
897 Ok(sum_product / denominator)
898 }
899}
900
901#[cfg(test)]
902mod tests {
903 use super::*;
904 use crate::event::EventMetadata;
905
906 fn create_test_event(subject: &str) -> StreamEvent {
907 StreamEvent::TripleAdded {
908 subject: subject.to_string(),
909 predicate: "test".to_string(),
910 object: "value".to_string(),
911 graph: None,
912 metadata: EventMetadata::default(),
913 }
914 }
915
916 #[tokio::test]
917 async fn test_simple_pattern() {
918 let mut matcher = PatternMatcher::new(100);
919
920 let pattern = Pattern::Simple {
921 name: "test_pattern".to_string(),
922 predicate: "type:triple_added".to_string(),
923 };
924
925 let pattern_id = matcher.register_pattern(pattern);
926
927 let event = create_test_event("test_subject");
928 let matches = matcher.process_event(event).unwrap();
929
930 assert_eq!(matches.len(), 1);
931 assert_eq!(matches[0].pattern_id, pattern_id);
932 }
933
934 #[tokio::test]
935 async fn test_sequence_pattern() {
936 let mut matcher = PatternMatcher::new(100);
937
938 let pattern = Pattern::Sequence {
939 patterns: vec![
940 Pattern::Simple {
941 name: "first".to_string(),
942 predicate: "type:triple_added".to_string(),
943 },
944 Pattern::Simple {
945 name: "second".to_string(),
946 predicate: "type:triple_added".to_string(),
947 },
948 ],
949 max_distance: Some(ChronoDuration::seconds(10)),
950 };
951
952 let _pattern_id = matcher.register_pattern(pattern);
953
954 let event1 = create_test_event("subject1");
955 let event2 = create_test_event("subject2");
956
957 let matches1 = matcher.process_event(event1).unwrap();
958 assert_eq!(matches1.len(), 0); let matches2 = matcher.process_event(event2).unwrap();
961 assert_eq!(matches2.len(), 1); }
963}