// cortexai_data/metrics.rs
//! Data matching metrics and observability
//!
//! Comprehensive metrics for monitoring data matching performance.
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;
/// Lock-free counters describing data matching activity.
///
/// Every field is an atomic, so a single instance can be shared across
/// threads (e.g. behind an `Arc`) and updated without any locking.
#[derive(Debug, Default)]
pub struct DataMatchingMetrics {
    /// Count of all queries seen, successful or not.
    total_queries: AtomicU64,
    /// Queries that produced at least one match.
    successful_matches: AtomicU64,
    /// Queries that produced no match (or failed).
    failed_matches: AtomicU64,
    /// Positive cache hits (a cached result was returned).
    cache_hits: AtomicU64,
    /// Lookups that missed the cache entirely.
    cache_misses: AtomicU64,
    /// Entries evicted from the cache.
    cache_evictions: AtomicU64,
    /// Hits on cached "not found" entries.
    negative_cache_hits: AtomicU64,
    /// Exponential moving average of processing time, in microseconds.
    processing_time_ema: AtomicU64,
    /// Worst-case processing time observed, in microseconds.
    peak_processing_time: AtomicU64,
    /// Running total of records scanned across all queries.
    records_scanned: AtomicU64,
    /// Most recent memory usage estimate, in bytes.
    memory_usage: AtomicUsize,
    /// Matches found via CPF (Brazilian individual taxpayer id).
    cpf_matches: AtomicU64,
    /// Matches found via CNPJ (Brazilian company id).
    cnpj_matches: AtomicU64,
    /// Matches found via fuzzy name comparison.
    fuzzy_matches: AtomicU64,
    /// Entities matched across more than one data source.
    cross_source_matches: AtomicU64,
}
43impl DataMatchingMetrics {
44    /// Create new metrics instance
45    pub fn new() -> Self {
46        Self::default()
47    }
48
49    /// Record a query result
50    pub fn record_query(&self, success: bool, duration: Duration) {
51        self.total_queries.fetch_add(1, Ordering::Relaxed);
52
53        if success {
54            self.successful_matches.fetch_add(1, Ordering::Relaxed);
55        } else {
56            self.failed_matches.fetch_add(1, Ordering::Relaxed);
57        }
58
59        // Update exponential moving average (EMA) for processing time
60        // α = 0.3 gives more weight to recent values
61        let duration_us = duration.as_micros() as u64;
62        let prev = self.processing_time_ema.load(Ordering::Relaxed);
63        let new_ema = if prev == 0 {
64            duration_us
65        } else {
66            // EMA = α * current + (1 - α) * previous
67            // Using integer math: (3 * current + 7 * previous) / 10
68            (duration_us * 3 + prev * 7) / 10
69        };
70        self.processing_time_ema.store(new_ema, Ordering::Relaxed);
71
72        // Update peak if this is the highest
73        self.peak_processing_time
74            .fetch_max(duration_us, Ordering::Relaxed);
75    }
76
77    /// Record cache access
78    pub fn record_cache(&self, hit: bool, negative: bool) {
79        if hit {
80            if negative {
81                self.negative_cache_hits.fetch_add(1, Ordering::Relaxed);
82            } else {
83                self.cache_hits.fetch_add(1, Ordering::Relaxed);
84            }
85        } else {
86            self.cache_misses.fetch_add(1, Ordering::Relaxed);
87        }
88    }
89
90    /// Record cache eviction
91    pub fn record_eviction(&self) {
92        self.cache_evictions.fetch_add(1, Ordering::Relaxed);
93    }
94
95    /// Record records scanned
96    pub fn record_scan(&self, count: u64) {
97        self.records_scanned.fetch_add(count, Ordering::Relaxed);
98    }
99
100    /// Record match type
101    pub fn record_match_type(&self, cpf: bool, cnpj: bool, fuzzy: bool, cross_source: bool) {
102        if cpf {
103            self.cpf_matches.fetch_add(1, Ordering::Relaxed);
104        }
105        if cnpj {
106            self.cnpj_matches.fetch_add(1, Ordering::Relaxed);
107        }
108        if fuzzy {
109            self.fuzzy_matches.fetch_add(1, Ordering::Relaxed);
110        }
111        if cross_source {
112            self.cross_source_matches.fetch_add(1, Ordering::Relaxed);
113        }
114    }
115
116    /// Update memory usage estimate
117    pub fn set_memory_usage(&self, bytes: usize) {
118        self.memory_usage.store(bytes, Ordering::Relaxed);
119    }
120
121    /// Get cache hit rate (0.0 - 1.0)
122    pub fn cache_hit_rate(&self) -> f64 {
123        let hits = self.cache_hits.load(Ordering::Relaxed) as f64;
124        let negative_hits = self.negative_cache_hits.load(Ordering::Relaxed) as f64;
125        let misses = self.cache_misses.load(Ordering::Relaxed) as f64;
126        let total = hits + negative_hits + misses;
127
128        if total == 0.0 {
129            0.0
130        } else {
131            (hits + negative_hits) / total
132        }
133    }
134
135    /// Get success rate (0.0 - 1.0)
136    pub fn success_rate(&self) -> f64 {
137        let successful = self.successful_matches.load(Ordering::Relaxed) as f64;
138        let total = self.total_queries.load(Ordering::Relaxed) as f64;
139
140        if total == 0.0 {
141            0.0
142        } else {
143            successful / total
144        }
145    }
146
147    /// Get average processing time in milliseconds
148    pub fn avg_processing_time_ms(&self) -> f64 {
149        self.processing_time_ema.load(Ordering::Relaxed) as f64 / 1000.0
150    }
151
152    /// Get peak processing time in milliseconds
153    pub fn peak_processing_time_ms(&self) -> f64 {
154        self.peak_processing_time.load(Ordering::Relaxed) as f64 / 1000.0
155    }
156
157    /// Get throughput (records per second based on EMA)
158    pub fn throughput_rps(&self) -> f64 {
159        let avg_time_us = self.processing_time_ema.load(Ordering::Relaxed) as f64;
160        if avg_time_us == 0.0 {
161            0.0
162        } else {
163            1_000_000.0 / avg_time_us
164        }
165    }
166
167    /// Get a snapshot of all metrics
168    pub fn snapshot(&self) -> MetricsSnapshot {
169        MetricsSnapshot {
170            total_queries: self.total_queries.load(Ordering::Relaxed),
171            successful_matches: self.successful_matches.load(Ordering::Relaxed),
172            failed_matches: self.failed_matches.load(Ordering::Relaxed),
173            cache_hits: self.cache_hits.load(Ordering::Relaxed),
174            cache_misses: self.cache_misses.load(Ordering::Relaxed),
175            cache_evictions: self.cache_evictions.load(Ordering::Relaxed),
176            negative_cache_hits: self.negative_cache_hits.load(Ordering::Relaxed),
177            cache_hit_rate: self.cache_hit_rate(),
178            success_rate: self.success_rate(),
179            avg_processing_time_ms: self.avg_processing_time_ms(),
180            peak_processing_time_ms: self.peak_processing_time_ms(),
181            throughput_rps: self.throughput_rps(),
182            records_scanned: self.records_scanned.load(Ordering::Relaxed),
183            memory_usage_bytes: self.memory_usage.load(Ordering::Relaxed),
184            cpf_matches: self.cpf_matches.load(Ordering::Relaxed),
185            cnpj_matches: self.cnpj_matches.load(Ordering::Relaxed),
186            fuzzy_matches: self.fuzzy_matches.load(Ordering::Relaxed),
187            cross_source_matches: self.cross_source_matches.load(Ordering::Relaxed),
188        }
189    }
190
191    /// Reset all metrics
192    pub fn reset(&self) {
193        self.total_queries.store(0, Ordering::Relaxed);
194        self.successful_matches.store(0, Ordering::Relaxed);
195        self.failed_matches.store(0, Ordering::Relaxed);
196        self.cache_hits.store(0, Ordering::Relaxed);
197        self.cache_misses.store(0, Ordering::Relaxed);
198        self.cache_evictions.store(0, Ordering::Relaxed);
199        self.negative_cache_hits.store(0, Ordering::Relaxed);
200        self.processing_time_ema.store(0, Ordering::Relaxed);
201        self.peak_processing_time.store(0, Ordering::Relaxed);
202        self.records_scanned.store(0, Ordering::Relaxed);
203        self.cpf_matches.store(0, Ordering::Relaxed);
204        self.cnpj_matches.store(0, Ordering::Relaxed);
205        self.fuzzy_matches.store(0, Ordering::Relaxed);
206        self.cross_source_matches.store(0, Ordering::Relaxed);
207    }
208}
209
210/// Snapshot of metrics at a point in time
211#[derive(Debug, Clone, serde::Serialize)]
212pub struct MetricsSnapshot {
213    pub total_queries: u64,
214    pub successful_matches: u64,
215    pub failed_matches: u64,
216    pub cache_hits: u64,
217    pub cache_misses: u64,
218    pub cache_evictions: u64,
219    pub negative_cache_hits: u64,
220    pub cache_hit_rate: f64,
221    pub success_rate: f64,
222    pub avg_processing_time_ms: f64,
223    pub peak_processing_time_ms: f64,
224    pub throughput_rps: f64,
225    pub records_scanned: u64,
226    pub memory_usage_bytes: usize,
227    pub cpf_matches: u64,
228    pub cnpj_matches: u64,
229    pub fuzzy_matches: u64,
230    pub cross_source_matches: u64,
231}
232
233impl std::fmt::Display for MetricsSnapshot {
234    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235        writeln!(f, "Data Matching Metrics:")?;
236        writeln!(
237            f,
238            "  Queries: {} total, {} successful ({:.1}%)",
239            self.total_queries,
240            self.successful_matches,
241            self.success_rate * 100.0
242        )?;
243        writeln!(
244            f,
245            "  Cache: {:.1}% hit rate ({} hits, {} misses, {} negative)",
246            self.cache_hit_rate * 100.0,
247            self.cache_hits,
248            self.cache_misses,
249            self.negative_cache_hits
250        )?;
251        writeln!(
252            f,
253            "  Performance: {:.2}ms avg, {:.2}ms peak, {:.1} queries/sec",
254            self.avg_processing_time_ms, self.peak_processing_time_ms, self.throughput_rps
255        )?;
256        writeln!(
257            f,
258            "  Matches: {} CPF, {} CNPJ, {} fuzzy, {} cross-source",
259            self.cpf_matches, self.cnpj_matches, self.fuzzy_matches, self.cross_source_matches
260        )?;
261        writeln!(
262            f,
263            "  Records scanned: {}, Memory: {} bytes",
264            self.records_scanned, self.memory_usage_bytes
265        )?;
266        Ok(())
267    }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_basic() {
        let m = DataMatchingMetrics::new();

        m.record_query(true, Duration::from_micros(1000));
        m.record_query(true, Duration::from_micros(2000));
        m.record_query(false, Duration::from_micros(500));

        let snap = m.snapshot();
        assert_eq!(snap.total_queries, 3);
        assert_eq!(snap.successful_matches, 2);
        assert_eq!(snap.failed_matches, 1);
    }

    #[test]
    fn test_cache_hit_rate() {
        let m = DataMatchingMetrics::new();

        // Two hits, one miss -> rate should be ~2/3.
        m.record_cache(true, false);
        m.record_cache(true, false);
        m.record_cache(false, false);

        assert!((m.cache_hit_rate() - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_negative_cache() {
        let m = DataMatchingMetrics::new();

        m.record_cache(true, true); // cached "not found"
        m.record_cache(true, false); // regular hit
        m.record_cache(false, false); // miss

        let snap = m.snapshot();
        assert_eq!(snap.negative_cache_hits, 1);
        assert_eq!(snap.cache_hits, 1);
        assert_eq!(snap.cache_misses, 1);
        // Negative hits count toward the hit rate just like regular ones.
        assert!((snap.cache_hit_rate - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_ema_processing_time() {
        let m = DataMatchingMetrics::new();

        // The first sample seeds the average directly.
        m.record_query(true, Duration::from_micros(1000));
        assert_eq!(m.processing_time_ema.load(Ordering::Relaxed), 1000);

        // A larger second sample pulls the EMA toward it, without reaching it.
        m.record_query(true, Duration::from_micros(2000));
        let ema = m.processing_time_ema.load(Ordering::Relaxed);
        assert!(ema > 1000 && ema < 2000);
    }

    #[test]
    fn test_peak_tracking() {
        let m = DataMatchingMetrics::new();

        for us in [100, 500, 200] {
            m.record_query(true, Duration::from_micros(us));
        }

        // Peak keeps the maximum, not the latest.
        assert_eq!(m.peak_processing_time.load(Ordering::Relaxed), 500);
    }

    #[test]
    fn test_match_types() {
        let m = DataMatchingMetrics::new();

        m.record_match_type(true, false, false, false);
        m.record_match_type(false, true, false, true);
        m.record_match_type(false, false, true, true);

        let snap = m.snapshot();
        assert_eq!(snap.cpf_matches, 1);
        assert_eq!(snap.cnpj_matches, 1);
        assert_eq!(snap.fuzzy_matches, 1);
        assert_eq!(snap.cross_source_matches, 2);
    }

    #[test]
    fn test_reset() {
        let m = DataMatchingMetrics::new();

        m.record_query(true, Duration::from_micros(1000));
        m.record_cache(true, false);

        m.reset();

        let snap = m.snapshot();
        assert_eq!(snap.total_queries, 0);
        assert_eq!(snap.cache_hits, 0);
    }
}