// midstreamer_temporal_compare/lib.rs

//! # Temporal-Compare
//!
//! Advanced temporal sequence comparison and pattern matching.
//!
//! ## Features
//! - Dynamic Time Warping (DTW)
//! - Longest Common Subsequence (LCS)
//! - Edit Distance (Levenshtein)
//! - Position-wise mismatch ("Euclidean") distance
//! - Pattern matching and detection
//! - Efficient caching
11
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::fmt;
15use std::hash::Hash;
16use thiserror::Error;
17use dashmap::DashMap;
18use lru::LruCache;
19use std::sync::{Arc, Mutex};
20use std::num::NonZeroUsize;
21
22/// Errors that can occur during temporal comparison
23#[derive(Debug, Error)]
24pub enum TemporalError {
25    #[error("Sequence too long: {0}")]
26    SequenceTooLong(usize),
27
28    #[error("Invalid algorithm: {0}")]
29    InvalidAlgorithm(String),
30
31    #[error("Cache error: {0}")]
32    CacheError(String),
33
34    #[error("Invalid pattern length: min={0}, max={1}")]
35    InvalidPatternLength(usize, usize),
36
37    #[error("Pattern not found")]
38    PatternNotFound,
39}
40
41/// A temporal sequence element
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
43pub struct TemporalElement<T> {
44    pub value: T,
45    pub timestamp: u64,
46}
47
48/// A temporal sequence
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct Sequence<T> {
51    pub elements: Vec<TemporalElement<T>>,
52}
53
54impl<T> Sequence<T> {
55    pub fn new() -> Self {
56        Self { elements: Vec::new() }
57    }
58
59    pub fn push(&mut self, value: T, timestamp: u64) {
60        self.elements.push(TemporalElement { value, timestamp });
61    }
62
63    pub fn len(&self) -> usize {
64        self.elements.len()
65    }
66
67    pub fn is_empty(&self) -> bool {
68        self.elements.is_empty()
69    }
70}
71
72impl<T> Default for Sequence<T> {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78/// Comparison algorithm types
79#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
80pub enum ComparisonAlgorithm {
81    /// Dynamic Time Warping
82    DTW,
83    /// Longest Common Subsequence
84    LCS,
85    /// Edit Distance (Levenshtein)
86    EditDistance,
87    /// Euclidean distance
88    Euclidean,
89}
90
91/// Result of a temporal comparison
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct ComparisonResult {
94    pub distance: f64,
95    pub algorithm: ComparisonAlgorithm,
96    pub alignment: Option<Vec<(usize, usize)>>,
97}
98
99/// Statistics about cache performance
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct CacheStats {
102    pub hits: u64,
103    pub misses: u64,
104    pub size: usize,
105    pub capacity: usize,
106}
107
108impl CacheStats {
109    pub fn hit_rate(&self) -> f64 {
110        if self.hits + self.misses == 0 {
111            0.0
112        } else {
113            self.hits as f64 / (self.hits + self.misses) as f64
114        }
115    }
116}
117
118/// A detected pattern in a sequence
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct Pattern<T> {
121    /// The pattern sequence
122    pub sequence: Vec<T>,
123    /// Starting indices of all occurrences
124    pub occurrences: Vec<usize>,
125    /// Confidence score (0.0 to 1.0)
126    pub confidence: f64,
127}
128
129impl<T> Pattern<T> {
130    /// Create a new pattern
131    pub fn new(sequence: Vec<T>, occurrences: Vec<usize>, confidence: f64) -> Self {
132        Self {
133            sequence,
134            occurrences,
135            confidence,
136        }
137    }
138
139    /// Get the number of times this pattern occurs
140    pub fn frequency(&self) -> usize {
141        self.occurrences.len()
142    }
143
144    /// Get the length of the pattern
145    pub fn length(&self) -> usize {
146        self.sequence.len()
147    }
148}
149
/// One window of a haystack that resembled the searched-for needle.
#[derive(Debug, Clone, PartialEq)]
pub struct SimilarityMatch {
    /// Index in the haystack where the matching window begins.
    pub start_index: usize,
    /// Similarity score in `(0.0, 1.0]`; higher means more similar.
    pub similarity: f64,
    /// Raw DTW distance of the window; lower is better.
    pub distance: f64,
}
160
161impl SimilarityMatch {
162    pub fn new(start_index: usize, distance: f64) -> Self {
163        // Convert distance to similarity score (inverse exponential decay)
164        let similarity = (-distance / 10.0).exp();
165        Self {
166            start_index,
167            similarity,
168            distance,
169        }
170    }
171}
172
173/// Temporal comparator with caching
174pub struct TemporalComparator<T> {
175    cache: Arc<Mutex<LruCache<String, ComparisonResult>>>,
176    pattern_cache: Arc<Mutex<LruCache<String, Vec<Pattern<T>>>>>,
177    similarity_cache: Arc<Mutex<LruCache<String, Vec<SimilarityMatch>>>>,
178    cache_hits: Arc<DashMap<String, u64>>,
179    cache_misses: Arc<DashMap<String, u64>>,
180    max_sequence_length: usize,
181}
182
183impl<T> TemporalComparator<T>
184where
185    T: Clone + PartialEq + fmt::Debug + Serialize + Hash + Eq,
186{
187    /// Create a new temporal comparator
188    pub fn new(cache_size: usize, max_sequence_length: usize) -> Self {
189        Self {
190            cache: Arc::new(Mutex::new(LruCache::new(
191                NonZeroUsize::new(cache_size).unwrap()
192            ))),
193            pattern_cache: Arc::new(Mutex::new(LruCache::new(
194                NonZeroUsize::new(cache_size).unwrap()
195            ))),
196            similarity_cache: Arc::new(Mutex::new(LruCache::new(
197                NonZeroUsize::new(cache_size).unwrap()
198            ))),
199            cache_hits: Arc::new(DashMap::new()),
200            cache_misses: Arc::new(DashMap::new()),
201            max_sequence_length,
202        }
203    }
204
205    /// Compare two sequences using the specified algorithm
206    pub fn compare(
207        &self,
208        seq1: &Sequence<T>,
209        seq2: &Sequence<T>,
210        algorithm: ComparisonAlgorithm,
211    ) -> Result<ComparisonResult, TemporalError> {
212        // Check sequence length
213        if seq1.len() > self.max_sequence_length || seq2.len() > self.max_sequence_length {
214            return Err(TemporalError::SequenceTooLong(
215                seq1.len().max(seq2.len())
216            ));
217        }
218
219        // Generate cache key
220        let cache_key = self.cache_key(seq1, seq2, algorithm);
221
222        // Check cache
223        if let Ok(mut cache) = self.cache.lock() {
224            if let Some(result) = cache.get(&cache_key) {
225                self.record_cache_hit(&cache_key);
226                return Ok(result.clone());
227            }
228        }
229
230        self.record_cache_miss(&cache_key);
231
232        // Compute comparison
233        let result = match algorithm {
234            ComparisonAlgorithm::DTW => self.dtw(seq1, seq2),
235            ComparisonAlgorithm::LCS => self.lcs(seq1, seq2),
236            ComparisonAlgorithm::EditDistance => self.edit_distance(seq1, seq2),
237            ComparisonAlgorithm::Euclidean => self.euclidean(seq1, seq2),
238        }?;
239
240        // Store in cache
241        if let Ok(mut cache) = self.cache.lock() {
242            cache.put(cache_key, result.clone());
243        }
244
245        Ok(result)
246    }
247
248    /// Dynamic Time Warping implementation
249    fn dtw(&self, seq1: &Sequence<T>, seq2: &Sequence<T>) -> Result<ComparisonResult, TemporalError> {
250        let n = seq1.len();
251        let m = seq2.len();
252
253        if n == 0 || m == 0 {
254            return Ok(ComparisonResult {
255                distance: (n + m) as f64,
256                algorithm: ComparisonAlgorithm::DTW,
257                alignment: None,
258            });
259        }
260
261        // Initialize DTW matrix
262        let mut dtw = vec![vec![f64::INFINITY; m + 1]; n + 1];
263        dtw[0][0] = 0.0;
264
265        // Fill DTW matrix
266        for i in 1..=n {
267            for j in 1..=m {
268                let cost = if seq1.elements[i-1].value == seq2.elements[j-1].value {
269                    0.0
270                } else {
271                    1.0
272                };
273
274                dtw[i][j] = cost + dtw[i-1][j-1].min(dtw[i-1][j]).min(dtw[i][j-1]);
275            }
276        }
277
278        // Backtrack for alignment
279        let mut alignment = Vec::new();
280        let (mut i, mut j) = (n, m);
281
282        while i > 0 && j > 0 {
283            alignment.push((i - 1, j - 1));
284
285            let min_val = dtw[i-1][j-1].min(dtw[i-1][j]).min(dtw[i][j-1]);
286
287            if dtw[i-1][j-1] == min_val {
288                i -= 1;
289                j -= 1;
290            } else if dtw[i-1][j] == min_val {
291                i -= 1;
292            } else {
293                j -= 1;
294            }
295        }
296
297        alignment.reverse();
298
299        Ok(ComparisonResult {
300            distance: dtw[n][m],
301            algorithm: ComparisonAlgorithm::DTW,
302            alignment: Some(alignment),
303        })
304    }
305
306    /// Longest Common Subsequence implementation
307    fn lcs(&self, seq1: &Sequence<T>, seq2: &Sequence<T>) -> Result<ComparisonResult, TemporalError> {
308        let n = seq1.len();
309        let m = seq2.len();
310
311        let mut dp = vec![vec![0; m + 1]; n + 1];
312
313        for i in 1..=n {
314            for j in 1..=m {
315                if seq1.elements[i-1].value == seq2.elements[j-1].value {
316                    dp[i][j] = dp[i-1][j-1] + 1;
317                } else {
318                    dp[i][j] = dp[i-1][j].max(dp[i][j-1]);
319                }
320            }
321        }
322
323        let lcs_length = dp[n][m];
324        let distance = (n + m - 2 * lcs_length) as f64;
325
326        Ok(ComparisonResult {
327            distance,
328            algorithm: ComparisonAlgorithm::LCS,
329            alignment: None,
330        })
331    }
332
333    /// Edit Distance (Levenshtein) implementation
334    fn edit_distance(&self, seq1: &Sequence<T>, seq2: &Sequence<T>) -> Result<ComparisonResult, TemporalError> {
335        let n = seq1.len();
336        let m = seq2.len();
337
338        let mut dp = vec![vec![0; m + 1]; n + 1];
339
340        for i in 0..=n {
341            dp[i][0] = i;
342        }
343        for j in 0..=m {
344            dp[0][j] = j;
345        }
346
347        for i in 1..=n {
348            for j in 1..=m {
349                let cost = if seq1.elements[i-1].value == seq2.elements[j-1].value {
350                    0
351                } else {
352                    1
353                };
354
355                dp[i][j] = (dp[i-1][j] + 1)
356                    .min(dp[i][j-1] + 1)
357                    .min(dp[i-1][j-1] + cost);
358            }
359        }
360
361        Ok(ComparisonResult {
362            distance: dp[n][m] as f64,
363            algorithm: ComparisonAlgorithm::EditDistance,
364            alignment: None,
365        })
366    }
367
368    /// Euclidean distance (for numeric sequences)
369    fn euclidean(&self, seq1: &Sequence<T>, seq2: &Sequence<T>) -> Result<ComparisonResult, TemporalError> {
370        let n = seq1.len().min(seq2.len());
371        let mut sum: f64 = 0.0;
372
373        for i in 0..n {
374            // Simplified: just count mismatches
375            if seq1.elements[i].value != seq2.elements[i].value {
376                sum += 1.0;
377            }
378        }
379
380        Ok(ComparisonResult {
381            distance: sum.sqrt(), // f64 type is now explicit from declaration
382            algorithm: ComparisonAlgorithm::Euclidean,
383            alignment: None,
384        })
385    }
386
387    /// Generate cache key for a comparison
388    fn cache_key(&self, seq1: &Sequence<T>, seq2: &Sequence<T>, algorithm: ComparisonAlgorithm) -> String {
389        format!(
390            "{:?}:{:?}:{:?}",
391            seq1.elements.len(),
392            seq2.elements.len(),
393            algorithm
394        )
395    }
396
397    fn record_cache_hit(&self, key: &str) {
398        self.cache_hits.entry(key.to_string())
399            .and_modify(|v| *v += 1)
400            .or_insert(1);
401    }
402
403    fn record_cache_miss(&self, key: &str) {
404        self.cache_misses.entry(key.to_string())
405            .and_modify(|v| *v += 1)
406            .or_insert(1);
407    }
408
409    /// Get cache statistics
410    pub fn cache_stats(&self) -> CacheStats {
411        let hits: u64 = self.cache_hits.iter().map(|r| *r.value()).sum();
412        let misses: u64 = self.cache_misses.iter().map(|r| *r.value()).sum();
413
414        let (size, capacity) = if let Ok(cache) = self.cache.lock() {
415            (cache.len(), cache.cap().get())
416        } else {
417            (0, 0)
418        };
419
420        CacheStats {
421            hits,
422            misses,
423            size,
424            capacity,
425        }
426    }
427
428    /// Clear the cache
429    pub fn clear_cache(&self) {
430        if let Ok(mut cache) = self.cache.lock() {
431            cache.clear();
432        }
433        if let Ok(mut cache) = self.pattern_cache.lock() {
434            cache.clear();
435        }
436        if let Ok(mut cache) = self.similarity_cache.lock() {
437            cache.clear();
438        }
439        self.cache_hits.clear();
440        self.cache_misses.clear();
441    }
442
443    /// Find similar sequences within a haystack using generic types
444    pub fn find_similar_generic(
445        &self,
446        haystack: &[T],
447        needle: &[T],
448        threshold: f64,
449    ) -> Result<Vec<SimilarityMatch>, TemporalError> {
450        if needle.is_empty() || haystack.len() < needle.len() {
451            return Ok(Vec::new());
452        }
453
454        // Generate cache key
455        let cache_key = format!(
456            "similar:{:?}:{:?}:{}",
457            haystack.len(),
458            needle.len(),
459            threshold
460        );
461
462        // Check cache
463        if let Ok(mut cache) = self.similarity_cache.lock() {
464            if let Some(results) = cache.get(&cache_key) {
465                self.record_cache_hit(&cache_key);
466                return Ok(results.clone());
467            }
468        }
469
470        self.record_cache_miss(&cache_key);
471
472        let needle_len = needle.len();
473        let mut matches = Vec::new();
474
475        // Sliding window approach
476        for start_idx in 0..=(haystack.len() - needle_len) {
477            let window = &haystack[start_idx..start_idx + needle_len];
478
479            // Convert to Sequence for comparison
480            let mut seq1 = Sequence::new();
481            for (i, item) in window.iter().enumerate() {
482                seq1.push(item.clone(), i as u64);
483            }
484
485            let mut seq2 = Sequence::new();
486            for (i, item) in needle.iter().enumerate() {
487                seq2.push(item.clone(), i as u64);
488            }
489
490            // Compute DTW distance
491            if let Ok(result) = self.dtw(&seq1, &seq2) {
492                // Normalize distance by pattern length
493                let normalized_distance = result.distance / needle_len as f64;
494
495                if normalized_distance <= threshold {
496                    matches.push(SimilarityMatch::new(start_idx, result.distance));
497                }
498            }
499        }
500
501        // Sort by distance (best matches first)
502        matches.sort_by(|a, b| {
503            a.distance
504                .partial_cmp(&b.distance)
505                .unwrap_or(std::cmp::Ordering::Equal)
506        });
507
508        // Store in cache
509        if let Ok(mut cache) = self.similarity_cache.lock() {
510            cache.put(cache_key, matches.clone());
511        }
512
513        Ok(matches)
514    }
515
516    /// Detect recurring patterns in a sequence
517    pub fn detect_recurring_patterns(
518        &self,
519        sequence: &[T],
520        min_length: usize,
521        max_length: usize,
522    ) -> Result<Vec<Pattern<T>>, TemporalError> {
523        if min_length > max_length {
524            return Err(TemporalError::InvalidPatternLength(min_length, max_length));
525        }
526
527        if sequence.len() < min_length {
528            return Ok(Vec::new());
529        }
530
531        // Generate cache key
532        let cache_key = format!(
533            "patterns:{:?}:{}:{}",
534            sequence.len(),
535            min_length,
536            max_length
537        );
538
539        // Check cache
540        if let Ok(mut cache) = self.pattern_cache.lock() {
541            if let Some(patterns) = cache.get(&cache_key) {
542                self.record_cache_hit(&cache_key);
543                return Ok(patterns.clone());
544            }
545        }
546
547        self.record_cache_miss(&cache_key);
548
549        let mut pattern_map: HashMap<Vec<T>, Vec<usize>> = HashMap::new();
550
551        // Search for patterns of each length
552        for pattern_len in min_length..=max_length.min(sequence.len()) {
553            for start_idx in 0..=(sequence.len() - pattern_len) {
554                let pattern_seq = sequence[start_idx..start_idx + pattern_len].to_vec();
555
556                pattern_map
557                    .entry(pattern_seq)
558                    .or_insert_with(Vec::new)
559                    .push(start_idx);
560            }
561        }
562
563        // Filter patterns that occur at least twice
564        let mut patterns: Vec<Pattern<T>> = pattern_map
565            .into_iter()
566            .filter(|(_, occurrences)| occurrences.len() >= 2)
567            .map(|(seq, occurrences)| {
568                // Calculate confidence based on frequency and pattern length
569                let frequency = occurrences.len() as f64;
570                let pattern_len = seq.len() as f64;
571                let total_possible = (sequence.len() - seq.len() + 1) as f64;
572
573                // Confidence is weighted by frequency and pattern length
574                let confidence = ((frequency / total_possible) * (pattern_len / max_length as f64))
575                    .min(1.0);
576
577                Pattern::new(seq, occurrences, confidence)
578            })
579            .collect();
580
581        // Sort by frequency (most common first), then by confidence
582        patterns.sort_by(|a, b| {
583            b.frequency()
584                .cmp(&a.frequency())
585                .then_with(|| {
586                    b.confidence
587                        .partial_cmp(&a.confidence)
588                        .unwrap_or(std::cmp::Ordering::Equal)
589                })
590        });
591
592        // Store in cache
593        if let Ok(mut cache) = self.pattern_cache.lock() {
594            cache.put(cache_key, patterns.clone());
595        }
596
597        Ok(patterns)
598    }
599}
600
601impl<T> Default for TemporalComparator<T>
602where
603    T: Clone + PartialEq + fmt::Debug + Serialize + Hash + Eq,
604{
605    fn default() -> Self {
606        Self::new(1000, 10000)
607    }
608}
609
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a sequence from `values`, using each element's index as its timestamp.
    fn seq_of<T: Clone>(values: &[T]) -> Sequence<T> {
        let mut seq = Sequence::new();
        for (i, value) in values.iter().enumerate() {
            seq.push(value.clone(), i as u64);
        }
        seq
    }

    #[test]
    fn test_sequence_creation() {
        let mut seq: Sequence<i32> = Sequence::new();
        seq.push(1, 100);
        seq.push(2, 200);

        assert_eq!(seq.len(), 2);
        assert!(!seq.is_empty());
    }

    #[test]
    fn test_dtw() {
        let comparator = TemporalComparator::new(100, 1000);

        let mut seq1: Sequence<i32> = Sequence::new();
        seq1.push(1, 100);
        seq1.push(2, 200);
        seq1.push(3, 300);

        let mut seq2: Sequence<i32> = Sequence::new();
        seq2.push(1, 100);
        seq2.push(2, 200);
        seq2.push(3, 300);

        let result = comparator.compare(&seq1, &seq2, ComparisonAlgorithm::DTW).unwrap();
        assert_eq!(result.distance, 0.0);
        assert_eq!(result.alignment, Some(vec![(0, 0), (1, 1), (2, 2)]));
    }

    #[test]
    fn test_dtw_warps_repeated_elements() {
        let comparator: TemporalComparator<i32> = TemporalComparator::new(10, 100);
        let result = comparator
            .compare(&seq_of(&[1, 2, 3]), &seq_of(&[1, 2, 2, 3]), ComparisonAlgorithm::DTW)
            .unwrap();
        assert_eq!(result.distance, 0.0);
        assert_eq!(result.alignment, Some(vec![(0, 0), (1, 1), (1, 2), (2, 3)]));
    }

    #[test]
    fn test_lcs_distance() {
        let comparator: TemporalComparator<char> = TemporalComparator::new(10, 100);
        let a: Vec<char> = "ABCBDAB".chars().collect();
        let b: Vec<char> = "BDCABA".chars().collect();
        let result = comparator
            .compare(&seq_of(&a), &seq_of(&b), ComparisonAlgorithm::LCS)
            .unwrap();
        // The LCS has length 4, so distance = 7 + 6 - 2 * 4.
        assert_eq!(result.distance, 5.0);
        assert!(result.alignment.is_none());
    }

    #[test]
    fn test_edit_distance() {
        let comparator: TemporalComparator<char> = TemporalComparator::new(10, 100);
        let kitten: Vec<char> = "kitten".chars().collect();
        let sitting: Vec<char> = "sitting".chars().collect();
        let result = comparator
            .compare(&seq_of(&kitten), &seq_of(&sitting), ComparisonAlgorithm::EditDistance)
            .unwrap();
        assert_eq!(result.distance, 3.0);

        let abc: Vec<char> = "abc".chars().collect();
        let result = comparator
            .compare(&seq_of(&[]), &seq_of(&abc), ComparisonAlgorithm::EditDistance)
            .unwrap();
        assert_eq!(result.distance, 3.0);
    }

    #[test]
    fn test_sequence_too_long_is_rejected() {
        let comparator: TemporalComparator<i32> = TemporalComparator::new(10, 2);
        let err = comparator
            .compare(&seq_of(&[1, 2, 3]), &seq_of(&[1]), ComparisonAlgorithm::DTW)
            .unwrap_err();
        assert!(matches!(err, TemporalError::SequenceTooLong(3)));
    }

    #[test]
    fn test_cache() {
        let comparator = TemporalComparator::new(100, 1000);

        let mut seq1: Sequence<i32> = Sequence::new();
        seq1.push(1, 1);
        seq1.push(2, 2);

        let mut seq2: Sequence<i32> = Sequence::new();
        seq2.push(1, 1);
        seq2.push(2, 2);

        // First comparison - cache miss
        comparator.compare(&seq1, &seq2, ComparisonAlgorithm::DTW).unwrap();

        // Second comparison - cache hit
        comparator.compare(&seq1, &seq2, ComparisonAlgorithm::DTW).unwrap();

        let stats = comparator.cache_stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.size, 1);
        assert_eq!(stats.hit_rate(), 0.5);
    }

    #[test]
    fn test_clear_cache_resets_stats() {
        let comparator: TemporalComparator<i32> = TemporalComparator::new(10, 100);
        let seq = seq_of(&[1, 2, 3]);
        comparator.compare(&seq, &seq, ComparisonAlgorithm::EditDistance).unwrap();
        comparator.clear_cache();

        let stats = comparator.cache_stats();
        assert_eq!((stats.hits, stats.misses, stats.size), (0, 0, 0));
        assert_eq!(stats.capacity, 10);
    }

    #[test]
    fn test_find_similar_generic_integers() {
        let comparator: TemporalComparator<i32> = TemporalComparator::new(100, 1000);

        let haystack = vec![1, 2, 3, 4, 5, 3, 4, 5];
        let needle = vec![3, 4, 5];

        let matches = comparator.find_similar_generic(&haystack, &needle, 0.1).unwrap();

        assert_eq!(matches.len(), 2);
        assert_eq!(matches[0].start_index, 2);
        assert_eq!(matches[1].start_index, 5);
        assert!(matches[0].similarity > 0.9); // High similarity for exact match
    }

    #[test]
    fn test_find_similar_degenerate_inputs() {
        let comparator: TemporalComparator<i32> = TemporalComparator::new(10, 100);
        assert!(comparator
            .find_similar_generic(&[1, 2], &[1, 2, 3], 1.0)
            .unwrap()
            .is_empty());
        assert!(comparator.find_similar_generic(&[1, 2], &[], 1.0).unwrap().is_empty());
    }

    #[test]
    fn test_detect_recurring_patterns_simple() {
        let comparator: TemporalComparator<char> = TemporalComparator::new(100, 1000);

        let sequence = vec!['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'];

        let patterns = comparator.detect_recurring_patterns(&sequence, 2, 4).unwrap();

        assert!(!patterns.is_empty());
        // Should find 'abc' pattern recurring
        let abc_pattern = patterns.iter().find(|p| p.sequence == vec!['a', 'b', 'c']);
        assert!(abc_pattern.is_some());

        let pattern = abc_pattern.unwrap();
        assert_eq!(pattern.frequency(), 3);
        assert!(pattern.confidence > 0.0);
    }

    #[test]
    fn test_invalid_pattern_length_is_rejected() {
        let comparator: TemporalComparator<char> = TemporalComparator::new(10, 100);
        let err = comparator
            .detect_recurring_patterns(&['a', 'b'], 3, 2)
            .unwrap_err();
        assert!(matches!(err, TemporalError::InvalidPatternLength(3, 2)));
    }
}