oxirs_stream/processing/
pattern.rs

1//! Complex Event Pattern Matching
2//!
3//! This module provides sophisticated pattern matching capabilities for stream processing:
4//! - Sequence patterns (A followed by B)
5//! - Conjunction patterns (A and B)
6//! - Disjunction patterns (A or B)
7//! - Negation patterns (A not followed by B)
8//! - Temporal constraints (within time window)
9//! - Statistical patterns (frequency, correlation)
10//!
11//! Uses SciRS2 for statistical analysis and pattern detection
12
13use crate::StreamEvent;
14use anyhow::{anyhow, Result};
15use chrono::{DateTime, Duration as ChronoDuration, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, VecDeque};
18use uuid::Uuid;
19
20// Use scirs2-core for statistical operations
21use scirs2_core::ndarray_ext::{s, Array1};
22
/// Parameters for repeat pattern matching.
///
/// Borrowed view of the fields of a [`Pattern::Repeat`], bundled so they can be
/// passed to `match_repeat_pattern` as a single argument.
#[derive(Debug, Clone)]
struct RepeatMatchParams<'a> {
    /// Sub-pattern whose matching events are counted.
    pattern: &'a Pattern,
    /// Minimum number of matching events required (inclusive).
    min_count: usize,
    /// Maximum number of matching events allowed (inclusive); `None` means no upper bound.
    max_count: Option<usize>,
    /// Length of the look-back window, measured back from the current event's timestamp.
    time_window: &'a ChronoDuration,
}
31
/// Parameters for statistical pattern matching.
///
/// Borrowed view of the fields of a [`Pattern::Statistical`], bundled so they
/// can be passed to `match_statistical_pattern` as a single argument.
#[derive(Debug, Clone)]
struct StatisticalMatchParams<'a> {
    /// Name reported as `pattern_name` on the resulting matches.
    name: &'a str,
    /// Which statistic is computed over the windowed events.
    stat_type: &'a StatisticalPatternType,
    /// Value the computed statistic is compared against.
    threshold: f64,
    /// Length of the look-back window, measured back from the current event's timestamp.
    time_window: &'a ChronoDuration,
}
40
/// Pattern matching strategy.
///
/// NOTE(review): nothing in the `PatternMatcher` code visible in this file reads
/// this enum; confirm where (or whether) the strategy is applied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatternMatchStrategy {
    /// Match any occurrence
    Any,
    /// Match all occurrences
    All,
    /// Match first occurrence only
    First,
    /// Match last occurrence only
    Last,
    /// Match with maximum score
    BestMatch,
}
55
/// Pattern definition for complex event processing.
///
/// Patterns form a tree: composite variants hold child patterns. Note that the
/// `Sequence` and `Repeat` matchers evaluate their children through
/// `evaluate_pattern_simple`, which only understands `Simple` children; any
/// other child variant never matches there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Pattern {
    /// Single event pattern with predicate.
    ///
    /// `predicate` is one of `"always"`, `"never"`, `"type:<event type>"` or
    /// `"subject:<subject>"`; any other string evaluates to `false`.
    /// `name` is reported as `pattern_name` on matches.
    Simple { name: String, predicate: String },
    /// Sequence pattern (A followed by B)
    Sequence {
        /// Steps that must match in order.
        patterns: Vec<Pattern>,
        /// Optional upper bound on the time elapsed since the first matched event.
        max_distance: Option<ChronoDuration>,
    },
    /// Conjunction pattern (A and B occur together)
    And {
        /// Sub-patterns that must each be matched by some buffered event.
        patterns: Vec<Pattern>,
        /// Look-back window in which the matching events are searched.
        time_window: ChronoDuration,
    },
    /// Disjunction pattern (A or B occurs); the first matching alternative wins.
    Or { patterns: Vec<Pattern> },
    /// Negation pattern (A occurs without B)
    Not {
        /// Pattern the current event must match.
        positive: Box<Pattern>,
        /// Pattern that must not match any buffered event inside `time_window`.
        negative: Box<Pattern>,
        /// Look-back window checked for the negative pattern.
        time_window: ChronoDuration,
    },
    /// Repetition pattern (A occurs N times)
    Repeat {
        /// Pattern whose occurrences are counted.
        pattern: Box<Pattern>,
        /// Minimum number of occurrences (inclusive).
        min_count: usize,
        /// Maximum number of occurrences (inclusive); `None` means unbounded.
        max_count: Option<usize>,
        /// Look-back window in which occurrences are counted.
        time_window: ChronoDuration,
    },
    /// Statistical pattern (correlation, frequency)
    Statistical {
        /// Name reported as `pattern_name` on matches.
        name: String,
        /// Statistic to compute over the window.
        stat_type: StatisticalPatternType,
        /// Value the statistic is compared against.
        threshold: f64,
        /// Look-back window supplying the events.
        time_window: ChronoDuration,
    },
}
94
/// Statistical pattern types using SciRS2.
///
/// Each variant names the statistic that `match_statistical_pattern` computes
/// over the events inside the pattern's time window and compares with the
/// pattern's `threshold`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StatisticalPatternType {
    /// Frequency above threshold (events in the window divided by the window length in seconds).
    Frequency,
    /// Correlation between events: Pearson coefficient of `field_a` and `field_b`,
    /// compared by absolute value.
    Correlation { field_a: String, field_b: String },
    /// Moving average: mean of the last `window_size` values of `field`.
    MovingAverage { field: String, window_size: usize },
    /// Standard deviation of the values of `field`.
    StdDev { field: String },
    /// Anomaly detection: z-score of the most recent value of `field`, compared
    /// against `threshold * sensitivity`.
    Anomaly { field: String, sensitivity: f64 },
}
109
/// Pattern match result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternMatch {
    /// Id returned by `PatternMatcher::register_pattern` for the pattern that fired.
    pub pattern_id: String,
    /// Human-readable label: the pattern's own name for `Simple`/`Statistical`
    /// patterns, otherwise a fixed label such as `"Sequence"`, `"And"` or `"Repeat"`.
    pub pattern_name: String,
    /// Events that contributed to the match.
    pub events: Vec<StreamEvent>,
    /// Start of the matched span (first event time or window start, depending on the pattern kind).
    pub start_time: DateTime<Utc>,
    /// End of the matched span (timestamp of the event that completed the match).
    pub end_time: DateTime<Utc>,
    /// `1.0` for structural patterns; for statistical patterns a value derived from
    /// the statistic (a ratio to the threshold, which may exceed `1.0`).
    pub confidence: f64,
    /// Extra values attached by the matcher, e.g. `"repeat_count"` or `"frequency"`.
    pub metadata: HashMap<String, serde_json::Value>,
}
121
/// Pattern matcher state.
///
/// Holds the registered patterns, the in-flight partial sequence matches and a
/// bounded buffer of recently processed events.
pub struct PatternMatcher {
    /// Registered patterns keyed by the id handed out by `register_pattern`.
    patterns: HashMap<String, Pattern>,
    /// In-flight partial sequence matches, keyed by pattern id.
    active_matches: HashMap<String, Vec<PartialMatch>>,
    /// Exposed through `completed_matches()` and cleared by `reset()`.
    /// NOTE(review): no method visible in this file appends to it — confirm intent.
    completed_matches: VecDeque<PatternMatch>,
    /// Most recent events with the timestamp assigned when they were processed.
    event_buffer: VecDeque<(StreamEvent, DateTime<Utc>)>,
    /// Maximum number of entries kept in `event_buffer`.
    buffer_size: usize,
    /// Running counters, see [`PatternMatcherStats`].
    stats: PatternMatcherStats,
}
131
/// A sequence match that has consumed some, but not all, of its steps.
#[derive(Debug, Clone)]
struct PartialMatch {
    /// Id of the pattern this partial match belongs to.
    pattern_id: String,
    /// Events consumed so far, in matching order.
    matched_events: Vec<StreamEvent>,
    /// Timestamp of the first matched event.
    start_time: DateTime<Utc>,
    /// Index of the next sequence step that still has to match.
    current_state: usize,
}
139
/// Counters describing the work done by a [`PatternMatcher`].
#[derive(Debug, Clone, Default)]
pub struct PatternMatcherStats {
    /// Number of events passed to `process_event`.
    pub events_processed: u64,
    /// Total number of matches returned by `process_event`.
    pub patterns_matched: u64,
    /// Number of in-flight partial sequence matches; assigned (not accumulated)
    /// by the sequence matcher.
    pub partial_matches: u64,
    /// Counter reserved for partial matches that expired.
    pub timeouts: u64,
    /// Cumulative wall-clock time spent inside `process_event`, in milliseconds.
    pub processing_time_ms: f64,
}
148
149impl PatternMatcher {
150    /// Create a new pattern matcher
151    pub fn new(buffer_size: usize) -> Self {
152        Self {
153            patterns: HashMap::new(),
154            active_matches: HashMap::new(),
155            completed_matches: VecDeque::new(),
156            event_buffer: VecDeque::new(),
157            buffer_size,
158            stats: PatternMatcherStats::default(),
159        }
160    }
161
162    /// Register a pattern
163    pub fn register_pattern(&mut self, pattern: Pattern) -> String {
164        let pattern_id = Uuid::new_v4().to_string();
165        self.patterns.insert(pattern_id.clone(), pattern);
166        pattern_id
167    }
168
169    /// Process an event through all patterns
170    pub fn process_event(&mut self, event: StreamEvent) -> Result<Vec<PatternMatch>> {
171        let start = std::time::Instant::now();
172        let now = Utc::now();
173
174        self.stats.events_processed += 1;
175
176        // Add to event buffer
177        self.event_buffer.push_back((event.clone(), now));
178        if self.event_buffer.len() > self.buffer_size {
179            self.event_buffer.pop_front();
180        }
181
182        // Check all registered patterns
183        let mut new_matches = Vec::new();
184
185        // Clone pattern list to avoid borrowing issues
186        let patterns: Vec<(String, Pattern)> = self
187            .patterns
188            .iter()
189            .map(|(k, v)| (k.clone(), v.clone()))
190            .collect();
191
192        for (pattern_id, pattern) in patterns {
193            match self.match_pattern(&pattern_id, &pattern, &event, now) {
194                Ok(matches) => new_matches.extend(matches),
195                Err(e) => tracing::warn!("Pattern matching error for {}: {}", pattern_id, e),
196            }
197        }
198
199        // Clean up expired partial matches
200        self.cleanup_expired_matches(now);
201
202        // Update statistics
203        self.stats.processing_time_ms += start.elapsed().as_secs_f64() * 1000.0;
204        self.stats.patterns_matched += new_matches.len() as u64;
205
206        Ok(new_matches)
207    }
208
209    /// Match a pattern against the event
210    fn match_pattern(
211        &mut self,
212        pattern_id: &str,
213        pattern: &Pattern,
214        event: &StreamEvent,
215        now: DateTime<Utc>,
216    ) -> Result<Vec<PatternMatch>> {
217        match pattern {
218            Pattern::Simple { name, predicate } => {
219                self.match_simple_pattern(pattern_id, name, predicate, event, now)
220            }
221            Pattern::Sequence {
222                patterns,
223                max_distance,
224            } => self.match_sequence_pattern(pattern_id, patterns, max_distance, event, now),
225            Pattern::And {
226                patterns,
227                time_window,
228            } => self.match_and_pattern(pattern_id, patterns, time_window, event, now),
229            Pattern::Or { patterns } => self.match_or_pattern(pattern_id, patterns, event, now),
230            Pattern::Not {
231                positive,
232                negative,
233                time_window,
234            } => self.match_not_pattern(pattern_id, positive, negative, time_window, event, now),
235            Pattern::Repeat {
236                pattern,
237                min_count,
238                max_count,
239                time_window,
240            } => self.match_repeat_pattern(
241                pattern_id,
242                RepeatMatchParams {
243                    pattern,
244                    min_count: *min_count,
245                    max_count: *max_count,
246                    time_window,
247                },
248                event,
249                now,
250            ),
251            Pattern::Statistical {
252                name,
253                stat_type,
254                threshold,
255                time_window,
256            } => self.match_statistical_pattern(
257                pattern_id,
258                StatisticalMatchParams {
259                    name,
260                    stat_type,
261                    threshold: *threshold,
262                    time_window,
263                },
264                event,
265                now,
266            ),
267        }
268    }
269
270    /// Match simple pattern
271    fn match_simple_pattern(
272        &mut self,
273        pattern_id: &str,
274        name: &str,
275        predicate: &str,
276        event: &StreamEvent,
277        now: DateTime<Utc>,
278    ) -> Result<Vec<PatternMatch>> {
279        if self.evaluate_predicate(predicate, event)? {
280            Ok(vec![PatternMatch {
281                pattern_id: pattern_id.to_string(),
282                pattern_name: name.to_string(),
283                events: vec![event.clone()],
284                start_time: now,
285                end_time: now,
286                confidence: 1.0,
287                metadata: HashMap::new(),
288            }])
289        } else {
290            Ok(vec![])
291        }
292    }
293
294    /// Match sequence pattern
295    fn match_sequence_pattern(
296        &mut self,
297        pattern_id: &str,
298        patterns: &[Pattern],
299        max_distance: &Option<ChronoDuration>,
300        event: &StreamEvent,
301        now: DateTime<Utc>,
302    ) -> Result<Vec<PatternMatch>> {
303        let mut matches = Vec::new();
304
305        // Clone existing partial matches to avoid borrowing issues
306        let existing_partials = self
307            .active_matches
308            .get(pattern_id)
309            .cloned()
310            .unwrap_or_default();
311
312        // Try to advance existing partial matches
313        let mut new_partial_matches = Vec::new();
314
315        for partial in existing_partials.iter() {
316            if partial.current_state < patterns.len() {
317                let next_pattern = &patterns[partial.current_state];
318
319                // Check if event matches next pattern using simple predicate evaluation
320                let matches_next = self.evaluate_pattern_simple(next_pattern, event)?;
321
322                if matches_next {
323                    // Check time distance constraint
324                    if let Some(max_dist) = max_distance {
325                        if now - partial.start_time > *max_dist {
326                            continue;
327                        }
328                    }
329
330                    let mut new_events = partial.matched_events.clone();
331                    new_events.push(event.clone());
332
333                    if partial.current_state + 1 == patterns.len() {
334                        // Complete match
335                        matches.push(PatternMatch {
336                            pattern_id: pattern_id.to_string(),
337                            pattern_name: "Sequence".to_string(),
338                            events: new_events,
339                            start_time: partial.start_time,
340                            end_time: now,
341                            confidence: 1.0,
342                            metadata: HashMap::new(),
343                        });
344                    } else {
345                        // Continue matching
346                        new_partial_matches.push(PartialMatch {
347                            pattern_id: pattern_id.to_string(),
348                            matched_events: new_events,
349                            start_time: partial.start_time,
350                            current_state: partial.current_state + 1,
351                        });
352                    }
353                }
354            }
355        }
356
357        // Start new partial matches
358        if !patterns.is_empty() {
359            let first_pattern = &patterns[0];
360            let matches_first = self.evaluate_pattern_simple(first_pattern, event)?;
361
362            if matches_first {
363                if patterns.len() == 1 {
364                    // Single pattern sequence - immediate match
365                    matches.push(PatternMatch {
366                        pattern_id: pattern_id.to_string(),
367                        pattern_name: "Sequence".to_string(),
368                        events: vec![event.clone()],
369                        start_time: now,
370                        end_time: now,
371                        confidence: 1.0,
372                        metadata: HashMap::new(),
373                    });
374                } else {
375                    new_partial_matches.push(PartialMatch {
376                        pattern_id: pattern_id.to_string(),
377                        matched_events: vec![event.clone()],
378                        start_time: now,
379                        current_state: 1,
380                    });
381                }
382            }
383        }
384
385        // Update active matches
386        self.active_matches
387            .insert(pattern_id.to_string(), new_partial_matches.clone());
388        self.stats.partial_matches = new_partial_matches.len() as u64;
389
390        Ok(matches)
391    }
392
393    /// Match AND pattern (all patterns must match within time window)
394    fn match_and_pattern(
395        &mut self,
396        pattern_id: &str,
397        patterns: &[Pattern],
398        time_window: &ChronoDuration,
399        _event: &StreamEvent,
400        now: DateTime<Utc>,
401    ) -> Result<Vec<PatternMatch>> {
402        // Collect events within time window
403        let window_start = now - *time_window;
404        let recent_events: Vec<_> = self
405            .event_buffer
406            .iter()
407            .filter(|(_, timestamp)| *timestamp >= window_start)
408            .cloned()
409            .collect();
410
411        // Check if all patterns match within the window
412        let mut all_matched = true;
413        let mut matched_events = Vec::new();
414
415        for pattern in patterns {
416            let mut pattern_matched = false;
417
418            for (evt, evt_time) in &recent_events {
419                let sub_matches = self.match_pattern(pattern_id, pattern, evt, *evt_time)?;
420
421                if !sub_matches.is_empty() {
422                    pattern_matched = true;
423                    matched_events.push(evt.clone());
424                    break;
425                }
426            }
427
428            if !pattern_matched {
429                all_matched = false;
430                break;
431            }
432        }
433
434        if all_matched && !matched_events.is_empty() {
435            Ok(vec![PatternMatch {
436                pattern_id: pattern_id.to_string(),
437                pattern_name: "And".to_string(),
438                events: matched_events,
439                start_time: window_start,
440                end_time: now,
441                confidence: 1.0,
442                metadata: HashMap::new(),
443            }])
444        } else {
445            Ok(vec![])
446        }
447    }
448
449    /// Match OR pattern (any pattern matches)
450    fn match_or_pattern(
451        &mut self,
452        pattern_id: &str,
453        patterns: &[Pattern],
454        event: &StreamEvent,
455        now: DateTime<Utc>,
456    ) -> Result<Vec<PatternMatch>> {
457        for pattern in patterns {
458            let matches = self.match_pattern(pattern_id, pattern, event, now)?;
459            if !matches.is_empty() {
460                return Ok(matches);
461            }
462        }
463
464        Ok(vec![])
465    }
466
467    /// Match NOT pattern (positive without negative)
468    fn match_not_pattern(
469        &mut self,
470        pattern_id: &str,
471        positive: &Pattern,
472        negative: &Pattern,
473        time_window: &ChronoDuration,
474        event: &StreamEvent,
475        now: DateTime<Utc>,
476    ) -> Result<Vec<PatternMatch>> {
477        // Check if positive pattern matches
478        let positive_matches = self.match_pattern(pattern_id, positive, event, now)?;
479
480        if positive_matches.is_empty() {
481            return Ok(vec![]);
482        }
483
484        // Check if negative pattern matches within time window
485        let window_start = now - *time_window;
486        let recent_events: Vec<_> = self
487            .event_buffer
488            .iter()
489            .filter(|(_, timestamp)| *timestamp >= window_start)
490            .cloned()
491            .collect();
492
493        for (evt, evt_time) in recent_events {
494            let negative_matches = self.match_pattern(pattern_id, negative, &evt, evt_time)?;
495
496            if !negative_matches.is_empty() {
497                // Negative pattern matched - no match
498                return Ok(vec![]);
499            }
500        }
501
502        // Positive matched, negative didn't - success
503        Ok(positive_matches)
504    }
505
506    /// Match repeat pattern
507    fn match_repeat_pattern(
508        &mut self,
509        pattern_id: &str,
510        params: RepeatMatchParams,
511        _event: &StreamEvent,
512        now: DateTime<Utc>,
513    ) -> Result<Vec<PatternMatch>> {
514        // Collect matching events within time window
515        let window_start = now - *params.time_window;
516        let mut matched_events = Vec::new();
517
518        // Clone event buffer to avoid borrowing issues
519        let buffer_clone: Vec<(StreamEvent, DateTime<Utc>)> =
520            self.event_buffer.iter().cloned().collect();
521
522        for (evt, evt_time) in buffer_clone {
523            if evt_time >= window_start {
524                let matches = self.evaluate_pattern_simple(params.pattern, &evt)?;
525
526                if matches {
527                    matched_events.push(evt.clone());
528                }
529            }
530        }
531
532        let match_count = matched_events.len();
533
534        if match_count >= params.min_count
535            && params.max_count.map_or(true, |max| match_count <= max)
536        {
537            Ok(vec![PatternMatch {
538                pattern_id: pattern_id.to_string(),
539                pattern_name: "Repeat".to_string(),
540                events: matched_events,
541                start_time: window_start,
542                end_time: now,
543                confidence: 1.0,
544                metadata: {
545                    let mut meta = HashMap::new();
546                    meta.insert(
547                        "repeat_count".to_string(),
548                        serde_json::Value::Number(match_count.into()),
549                    );
550                    meta
551                },
552            }])
553        } else {
554            Ok(vec![])
555        }
556    }
557
558    /// Match statistical pattern using SciRS2
559    fn match_statistical_pattern(
560        &mut self,
561        pattern_id: &str,
562        params: StatisticalMatchParams,
563        _event: &StreamEvent,
564        now: DateTime<Utc>,
565    ) -> Result<Vec<PatternMatch>> {
566        let window_start = now - *params.time_window;
567        let recent_events: Vec<_> = self
568            .event_buffer
569            .iter()
570            .filter(|(_, timestamp)| *timestamp >= window_start)
571            .map(|(evt, _)| evt)
572            .cloned()
573            .collect();
574
575        if recent_events.is_empty() {
576            return Ok(vec![]);
577        }
578
579        match params.stat_type {
580            StatisticalPatternType::Frequency => {
581                let frequency =
582                    recent_events.len() as f64 / params.time_window.num_seconds() as f64;
583
584                if frequency >= params.threshold {
585                    Ok(vec![PatternMatch {
586                        pattern_id: pattern_id.to_string(),
587                        pattern_name: params.name.to_string(),
588                        events: recent_events,
589                        start_time: window_start,
590                        end_time: now,
591                        confidence: frequency / params.threshold,
592                        metadata: {
593                            let mut meta = HashMap::new();
594                            meta.insert(
595                                "frequency".to_string(),
596                                serde_json::Value::Number(
597                                    serde_json::Number::from_f64(frequency).unwrap_or(0.into()),
598                                ),
599                            );
600                            meta
601                        },
602                    }])
603                } else {
604                    Ok(vec![])
605                }
606            }
607            StatisticalPatternType::Correlation { field_a, field_b } => {
608                // Extract field values and compute correlation using SciRS2
609                let values_a: Vec<f64> = recent_events
610                    .iter()
611                    .filter_map(|evt| self.extract_numeric_value(evt, field_a))
612                    .collect();
613
614                let values_b: Vec<f64> = recent_events
615                    .iter()
616                    .filter_map(|evt| self.extract_numeric_value(evt, field_b))
617                    .collect();
618
619                if values_a.len() < 2 || values_b.len() < 2 {
620                    return Ok(vec![]);
621                }
622
623                // Use scirs2-core for correlation computation
624                let min_len = values_a.len().min(values_b.len());
625                let arr_a = Array1::from_vec(values_a[..min_len].to_vec());
626                let arr_b = Array1::from_vec(values_b[..min_len].to_vec());
627
628                // Compute Pearson correlation coefficient manually
629                let correlation = compute_correlation(&arr_a, &arr_b)?;
630
631                if correlation.abs() >= params.threshold {
632                    Ok(vec![PatternMatch {
633                        pattern_id: pattern_id.to_string(),
634                        pattern_name: params.name.to_string(),
635                        events: recent_events,
636                        start_time: window_start,
637                        end_time: now,
638                        confidence: correlation.abs(),
639                        metadata: {
640                            let mut meta = HashMap::new();
641                            meta.insert(
642                                "correlation".to_string(),
643                                serde_json::Value::Number(
644                                    serde_json::Number::from_f64(correlation).unwrap_or(0.into()),
645                                ),
646                            );
647                            meta
648                        },
649                    }])
650                } else {
651                    Ok(vec![])
652                }
653            }
654            StatisticalPatternType::MovingAverage { field, window_size } => {
655                let values: Vec<f64> = recent_events
656                    .iter()
657                    .filter_map(|evt| self.extract_numeric_value(evt, field))
658                    .collect();
659
660                if values.len() < *window_size {
661                    return Ok(vec![]);
662                }
663
664                // Compute moving average using scirs2-core
665                let arr = Array1::from_vec(values);
666                let ma = arr
667                    .slice(s![arr.len() - window_size..])
668                    .mean()
669                    .unwrap_or(0.0);
670
671                if ma >= params.threshold {
672                    Ok(vec![PatternMatch {
673                        pattern_id: pattern_id.to_string(),
674                        pattern_name: params.name.to_string(),
675                        events: recent_events,
676                        start_time: window_start,
677                        end_time: now,
678                        confidence: ma / params.threshold,
679                        metadata: {
680                            let mut meta = HashMap::new();
681                            meta.insert(
682                                "moving_average".to_string(),
683                                serde_json::Value::Number(
684                                    serde_json::Number::from_f64(ma).unwrap_or(0.into()),
685                                ),
686                            );
687                            meta
688                        },
689                    }])
690                } else {
691                    Ok(vec![])
692                }
693            }
694            StatisticalPatternType::StdDev { field } => {
695                let values: Vec<f64> = recent_events
696                    .iter()
697                    .filter_map(|evt| self.extract_numeric_value(evt, field))
698                    .collect();
699
700                if values.len() < 2 {
701                    return Ok(vec![]);
702                }
703
704                // Compute standard deviation using scirs2-core
705                let arr = Array1::from_vec(values);
706                let std_dev = arr.std(0.0);
707
708                if std_dev >= params.threshold {
709                    Ok(vec![PatternMatch {
710                        pattern_id: pattern_id.to_string(),
711                        pattern_name: params.name.to_string(),
712                        events: recent_events,
713                        start_time: window_start,
714                        end_time: now,
715                        confidence: std_dev / params.threshold,
716                        metadata: {
717                            let mut meta = HashMap::new();
718                            meta.insert(
719                                "std_dev".to_string(),
720                                serde_json::Value::Number(
721                                    serde_json::Number::from_f64(std_dev).unwrap_or(0.into()),
722                                ),
723                            );
724                            meta
725                        },
726                    }])
727                } else {
728                    Ok(vec![])
729                }
730            }
731            StatisticalPatternType::Anomaly { field, sensitivity } => {
732                let values: Vec<f64> = recent_events
733                    .iter()
734                    .filter_map(|evt| self.extract_numeric_value(evt, field))
735                    .collect();
736
737                if values.len() < 3 {
738                    return Ok(vec![]);
739                }
740
741                // Simple anomaly detection using Z-score with scirs2-core
742                let arr = Array1::from_vec(values.clone());
743                let mean = arr.mean().unwrap_or(0.0);
744                let std_dev = arr.std(0.0);
745
746                let last_value = values.last().unwrap();
747                let z_score = if std_dev > 0.0 {
748                    (last_value - mean).abs() / std_dev
749                } else {
750                    0.0
751                };
752
753                if z_score >= params.threshold * sensitivity {
754                    Ok(vec![PatternMatch {
755                        pattern_id: pattern_id.to_string(),
756                        pattern_name: params.name.to_string(),
757                        events: recent_events,
758                        start_time: window_start,
759                        end_time: now,
760                        confidence: z_score / (params.threshold * sensitivity),
761                        metadata: {
762                            let mut meta = HashMap::new();
763                            meta.insert(
764                                "z_score".to_string(),
765                                serde_json::Value::Number(
766                                    serde_json::Number::from_f64(z_score).unwrap_or(0.into()),
767                                ),
768                            );
769                            meta
770                        },
771                    }])
772                } else {
773                    Ok(vec![])
774                }
775            }
776        }
777    }
778
779    /// Evaluate a pattern against an event (simple version without recursion)
780    fn evaluate_pattern_simple(&self, pattern: &Pattern, event: &StreamEvent) -> Result<bool> {
781        match pattern {
782            Pattern::Simple { predicate, .. } => self.evaluate_predicate(predicate, event),
783            _ => Ok(false), // Complex patterns not supported in simple evaluation
784        }
785    }
786
787    /// Evaluate a predicate against an event
788    fn evaluate_predicate(&self, predicate: &str, event: &StreamEvent) -> Result<bool> {
789        // Simple predicate evaluation
790        // In a real implementation, this would parse and evaluate expressions
791        match predicate {
792            "always" => Ok(true),
793            "never" => Ok(false),
794            pred if pred.starts_with("type:") => {
795                let expected_type = pred.strip_prefix("type:").unwrap();
796                Ok(self.get_event_type(event) == expected_type)
797            }
798            pred if pred.starts_with("subject:") => {
799                let expected_subject = pred.strip_prefix("subject:").unwrap();
800                Ok(self.get_event_subject(event) == Some(expected_subject.to_string()))
801            }
802            _ => Ok(false),
803        }
804    }
805
    /// Extract numeric value from event.
    ///
    /// Placeholder: both arguments are ignored and `Some(1.0)` is returned for
    /// every event and field, so statistics built on this value are constant
    /// until a real extraction is implemented.
    fn extract_numeric_value(&self, _event: &StreamEvent, _field: &str) -> Option<f64> {
        // Simplified extraction - would need proper implementation
        Some(1.0)
    }
811
812    /// Get event type as string
813    fn get_event_type(&self, event: &StreamEvent) -> &str {
814        match event {
815            StreamEvent::TripleAdded { .. } => "triple_added",
816            StreamEvent::TripleRemoved { .. } => "triple_removed",
817            StreamEvent::QuadAdded { .. } => "quad_added",
818            StreamEvent::QuadRemoved { .. } => "quad_removed",
819            StreamEvent::GraphCreated { .. } => "graph_created",
820            StreamEvent::GraphCleared { .. } => "graph_cleared",
821            StreamEvent::GraphDeleted { .. } => "graph_deleted",
822            StreamEvent::TransactionBegin { .. } => "transaction_begin",
823            StreamEvent::TransactionCommit { .. } => "transaction_commit",
824            StreamEvent::TransactionAbort { .. } => "transaction_abort",
825            _ => "unknown",
826        }
827    }
828
829    /// Get event subject if available
830    fn get_event_subject(&self, event: &StreamEvent) -> Option<String> {
831        match event {
832            StreamEvent::TripleAdded { subject, .. } => Some(subject.clone()),
833            StreamEvent::TripleRemoved { subject, .. } => Some(subject.clone()),
834            StreamEvent::QuadAdded { subject, .. } => Some(subject.clone()),
835            StreamEvent::QuadRemoved { subject, .. } => Some(subject.clone()),
836            _ => None,
837        }
838    }
839
840    /// Clean up expired partial matches
841    fn cleanup_expired_matches(&mut self, now: DateTime<Utc>) {
842        let timeout = ChronoDuration::minutes(5);
843
844        for (_, matches) in self.active_matches.iter_mut() {
845            matches.retain(|m| now - m.start_time < timeout);
846        }
847
848        self.active_matches.retain(|_, matches| !matches.is_empty());
849    }
850
    /// Get completed matches.
    ///
    /// Returns a reference to the internal queue of completed matches.
    /// NOTE(review): no method visible in this file appends to that queue, so
    /// it appears to stay empty — confirm the intended producer.
    pub fn completed_matches(&self) -> &VecDeque<PatternMatch> {
        &self.completed_matches
    }
855
    /// Get statistics.
    ///
    /// Returns a reference to the counters accumulated since construction or
    /// the last `reset()`.
    pub fn stats(&self) -> &PatternMatcherStats {
        &self.stats
    }
860
861    /// Reset matcher state
862    pub fn reset(&mut self) {
863        self.active_matches.clear();
864        self.completed_matches.clear();
865        self.event_buffer.clear();
866        self.stats = PatternMatcherStats::default();
867    }
868}
869
870/// Compute Pearson correlation coefficient
871fn compute_correlation(a: &Array1<f64>, b: &Array1<f64>) -> Result<f64> {
872    if a.len() != b.len() || a.len() < 2 {
873        return Err(anyhow!(
874            "Arrays must have same length and at least 2 elements"
875        ));
876    }
877
878    let mean_a = a.mean().unwrap_or(0.0);
879    let mean_b = b.mean().unwrap_or(0.0);
880
881    let mut sum_product = 0.0;
882    let mut sum_sq_a = 0.0;
883    let mut sum_sq_b = 0.0;
884
885    for i in 0..a.len() {
886        let diff_a = a[i] - mean_a;
887        let diff_b = b[i] - mean_b;
888        sum_product += diff_a * diff_b;
889        sum_sq_a += diff_a * diff_a;
890        sum_sq_b += diff_b * diff_b;
891    }
892
893    let denominator = (sum_sq_a * sum_sq_b).sqrt();
894    if denominator == 0.0 {
895        Ok(0.0)
896    } else {
897        Ok(sum_product / denominator)
898    }
899}
900
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// Build a `TripleAdded` event with the given subject and fixed filler fields.
    fn create_test_event(subject: &str) -> StreamEvent {
        let subject = subject.to_string();
        StreamEvent::TripleAdded {
            subject,
            predicate: "test".to_string(),
            object: "value".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        }
    }

    /// Simple pattern that accepts any `TripleAdded` event.
    fn triple_added(name: &str) -> Pattern {
        Pattern::Simple {
            name: name.to_string(),
            predicate: "type:triple_added".to_string(),
        }
    }

    /// A single matching event yields exactly one match carrying the registered id.
    #[tokio::test]
    async fn test_simple_pattern() {
        let mut matcher = PatternMatcher::new(100);
        let registered_id = matcher.register_pattern(triple_added("test_pattern"));

        let found = matcher
            .process_event(create_test_event("test_subject"))
            .unwrap();

        assert_eq!(found.len(), 1);
        assert_eq!(found[0].pattern_id, registered_id);
    }

    /// A two-step sequence completes on the second matching event, not the first.
    #[tokio::test]
    async fn test_sequence_pattern() {
        let mut matcher = PatternMatcher::new(100);

        let sequence = Pattern::Sequence {
            patterns: vec![triple_added("first"), triple_added("second")],
            max_distance: Some(ChronoDuration::seconds(10)),
        };
        let _pattern_id = matcher.register_pattern(sequence);

        // First event, no complete match yet
        let after_first = matcher
            .process_event(create_test_event("subject1"))
            .unwrap();
        assert_eq!(after_first.len(), 0);

        // Second event completes the sequence
        let after_second = matcher
            .process_event(create_test_event("subject2"))
            .unwrap();
        assert_eq!(after_second.len(), 1);
    }
}