1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::time::Duration;
7
/// Lock-free counters and gauges describing data-matching activity.
///
/// Every field is an atomic, so a shared reference is enough to record
/// events. `Default` yields an instance with all values at zero.
#[derive(Debug, Default)]
pub struct DataMatchingMetrics {
    // --- query outcome counters ---
    /// Number of `record_query` calls.
    total_queries: AtomicU64,
    /// Number of `record_query` calls with `success == true`.
    successful_matches: AtomicU64,
    /// Number of `record_query` calls with `success == false`.
    failed_matches: AtomicU64,

    // --- cache counters ---
    /// Cache hits that were not flagged as negative.
    cache_hits: AtomicU64,
    /// Cache lookups recorded as misses.
    cache_misses: AtomicU64,
    /// Number of `record_eviction` calls.
    cache_evictions: AtomicU64,
    /// Cache hits flagged as negative.
    negative_cache_hits: AtomicU64,

    // --- latency, stored in microseconds ---
    /// Exponential moving average of query duration; `0` doubles as the
    /// "no sample yet" marker.
    processing_time_ema: AtomicU64,
    /// Largest single query duration recorded so far.
    peak_processing_time: AtomicU64,

    // --- volume gauges ---
    /// Running sum of the counts passed to `record_scan`.
    records_scanned: AtomicU64,
    /// Most recent value passed to `set_memory_usage`, in bytes.
    memory_usage: AtomicUsize,

    // --- per-kind match counters (see `record_match_type`) ---
    /// Matches flagged `cpf`.
    cpf_matches: AtomicU64,
    /// Matches flagged `cnpj`.
    cnpj_matches: AtomicU64,
    /// Matches flagged `fuzzy`.
    fuzzy_matches: AtomicU64,
    /// Matches flagged `cross_source`.
    cross_source_matches: AtomicU64,
}
42
43impl DataMatchingMetrics {
44 pub fn new() -> Self {
46 Self::default()
47 }
48
49 pub fn record_query(&self, success: bool, duration: Duration) {
51 self.total_queries.fetch_add(1, Ordering::Relaxed);
52
53 if success {
54 self.successful_matches.fetch_add(1, Ordering::Relaxed);
55 } else {
56 self.failed_matches.fetch_add(1, Ordering::Relaxed);
57 }
58
59 let duration_us = duration.as_micros() as u64;
62 let prev = self.processing_time_ema.load(Ordering::Relaxed);
63 let new_ema = if prev == 0 {
64 duration_us
65 } else {
66 (duration_us * 3 + prev * 7) / 10
69 };
70 self.processing_time_ema.store(new_ema, Ordering::Relaxed);
71
72 self.peak_processing_time
74 .fetch_max(duration_us, Ordering::Relaxed);
75 }
76
77 pub fn record_cache(&self, hit: bool, negative: bool) {
79 if hit {
80 if negative {
81 self.negative_cache_hits.fetch_add(1, Ordering::Relaxed);
82 } else {
83 self.cache_hits.fetch_add(1, Ordering::Relaxed);
84 }
85 } else {
86 self.cache_misses.fetch_add(1, Ordering::Relaxed);
87 }
88 }
89
90 pub fn record_eviction(&self) {
92 self.cache_evictions.fetch_add(1, Ordering::Relaxed);
93 }
94
95 pub fn record_scan(&self, count: u64) {
97 self.records_scanned.fetch_add(count, Ordering::Relaxed);
98 }
99
100 pub fn record_match_type(&self, cpf: bool, cnpj: bool, fuzzy: bool, cross_source: bool) {
102 if cpf {
103 self.cpf_matches.fetch_add(1, Ordering::Relaxed);
104 }
105 if cnpj {
106 self.cnpj_matches.fetch_add(1, Ordering::Relaxed);
107 }
108 if fuzzy {
109 self.fuzzy_matches.fetch_add(1, Ordering::Relaxed);
110 }
111 if cross_source {
112 self.cross_source_matches.fetch_add(1, Ordering::Relaxed);
113 }
114 }
115
116 pub fn set_memory_usage(&self, bytes: usize) {
118 self.memory_usage.store(bytes, Ordering::Relaxed);
119 }
120
121 pub fn cache_hit_rate(&self) -> f64 {
123 let hits = self.cache_hits.load(Ordering::Relaxed) as f64;
124 let negative_hits = self.negative_cache_hits.load(Ordering::Relaxed) as f64;
125 let misses = self.cache_misses.load(Ordering::Relaxed) as f64;
126 let total = hits + negative_hits + misses;
127
128 if total == 0.0 {
129 0.0
130 } else {
131 (hits + negative_hits) / total
132 }
133 }
134
135 pub fn success_rate(&self) -> f64 {
137 let successful = self.successful_matches.load(Ordering::Relaxed) as f64;
138 let total = self.total_queries.load(Ordering::Relaxed) as f64;
139
140 if total == 0.0 {
141 0.0
142 } else {
143 successful / total
144 }
145 }
146
147 pub fn avg_processing_time_ms(&self) -> f64 {
149 self.processing_time_ema.load(Ordering::Relaxed) as f64 / 1000.0
150 }
151
152 pub fn peak_processing_time_ms(&self) -> f64 {
154 self.peak_processing_time.load(Ordering::Relaxed) as f64 / 1000.0
155 }
156
157 pub fn throughput_rps(&self) -> f64 {
159 let avg_time_us = self.processing_time_ema.load(Ordering::Relaxed) as f64;
160 if avg_time_us == 0.0 {
161 0.0
162 } else {
163 1_000_000.0 / avg_time_us
164 }
165 }
166
167 pub fn snapshot(&self) -> MetricsSnapshot {
169 MetricsSnapshot {
170 total_queries: self.total_queries.load(Ordering::Relaxed),
171 successful_matches: self.successful_matches.load(Ordering::Relaxed),
172 failed_matches: self.failed_matches.load(Ordering::Relaxed),
173 cache_hits: self.cache_hits.load(Ordering::Relaxed),
174 cache_misses: self.cache_misses.load(Ordering::Relaxed),
175 cache_evictions: self.cache_evictions.load(Ordering::Relaxed),
176 negative_cache_hits: self.negative_cache_hits.load(Ordering::Relaxed),
177 cache_hit_rate: self.cache_hit_rate(),
178 success_rate: self.success_rate(),
179 avg_processing_time_ms: self.avg_processing_time_ms(),
180 peak_processing_time_ms: self.peak_processing_time_ms(),
181 throughput_rps: self.throughput_rps(),
182 records_scanned: self.records_scanned.load(Ordering::Relaxed),
183 memory_usage_bytes: self.memory_usage.load(Ordering::Relaxed),
184 cpf_matches: self.cpf_matches.load(Ordering::Relaxed),
185 cnpj_matches: self.cnpj_matches.load(Ordering::Relaxed),
186 fuzzy_matches: self.fuzzy_matches.load(Ordering::Relaxed),
187 cross_source_matches: self.cross_source_matches.load(Ordering::Relaxed),
188 }
189 }
190
191 pub fn reset(&self) {
193 self.total_queries.store(0, Ordering::Relaxed);
194 self.successful_matches.store(0, Ordering::Relaxed);
195 self.failed_matches.store(0, Ordering::Relaxed);
196 self.cache_hits.store(0, Ordering::Relaxed);
197 self.cache_misses.store(0, Ordering::Relaxed);
198 self.cache_evictions.store(0, Ordering::Relaxed);
199 self.negative_cache_hits.store(0, Ordering::Relaxed);
200 self.processing_time_ema.store(0, Ordering::Relaxed);
201 self.peak_processing_time.store(0, Ordering::Relaxed);
202 self.records_scanned.store(0, Ordering::Relaxed);
203 self.cpf_matches.store(0, Ordering::Relaxed);
204 self.cnpj_matches.store(0, Ordering::Relaxed);
205 self.fuzzy_matches.store(0, Ordering::Relaxed);
206 self.cross_source_matches.store(0, Ordering::Relaxed);
207 }
208}
209
210#[derive(Debug, Clone, serde::Serialize)]
212pub struct MetricsSnapshot {
213 pub total_queries: u64,
214 pub successful_matches: u64,
215 pub failed_matches: u64,
216 pub cache_hits: u64,
217 pub cache_misses: u64,
218 pub cache_evictions: u64,
219 pub negative_cache_hits: u64,
220 pub cache_hit_rate: f64,
221 pub success_rate: f64,
222 pub avg_processing_time_ms: f64,
223 pub peak_processing_time_ms: f64,
224 pub throughput_rps: f64,
225 pub records_scanned: u64,
226 pub memory_usage_bytes: usize,
227 pub cpf_matches: u64,
228 pub cnpj_matches: u64,
229 pub fuzzy_matches: u64,
230 pub cross_source_matches: u64,
231}
232
233impl std::fmt::Display for MetricsSnapshot {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 writeln!(f, "Data Matching Metrics:")?;
236 writeln!(
237 f,
238 " Queries: {} total, {} successful ({:.1}%)",
239 self.total_queries,
240 self.successful_matches,
241 self.success_rate * 100.0
242 )?;
243 writeln!(
244 f,
245 " Cache: {:.1}% hit rate ({} hits, {} misses, {} negative)",
246 self.cache_hit_rate * 100.0,
247 self.cache_hits,
248 self.cache_misses,
249 self.negative_cache_hits
250 )?;
251 writeln!(
252 f,
253 " Performance: {:.2}ms avg, {:.2}ms peak, {:.1} queries/sec",
254 self.avg_processing_time_ms, self.peak_processing_time_ms, self.throughput_rps
255 )?;
256 writeln!(
257 f,
258 " Matches: {} CPF, {} CNPJ, {} fuzzy, {} cross-source",
259 self.cpf_matches, self.cnpj_matches, self.fuzzy_matches, self.cross_source_matches
260 )?;
261 writeln!(
262 f,
263 " Records scanned: {}, Memory: {} bytes",
264 self.records_scanned, self.memory_usage_bytes
265 )?;
266 Ok(())
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a microsecond duration.
    fn micros(us: u64) -> Duration {
        Duration::from_micros(us)
    }

    // Outcome counters add up across mixed successes and failures.
    #[test]
    fn test_metrics_basic() {
        let m = DataMatchingMetrics::new();

        for &(ok, us) in [(true, 1000), (true, 2000), (false, 500)].iter() {
            m.record_query(ok, micros(us));
        }

        let snap = m.snapshot();
        assert_eq!(snap.total_queries, 3);
        assert_eq!(snap.successful_matches, 2);
        assert_eq!(snap.failed_matches, 1);
    }

    // Two hits and one miss give a hit rate of roughly two thirds.
    #[test]
    fn test_cache_hit_rate() {
        let m = DataMatchingMetrics::new();

        m.record_cache(true, false);
        m.record_cache(true, false);
        m.record_cache(false, false);

        let rate = m.cache_hit_rate();
        assert!((rate - 0.666).abs() < 0.01);
    }

    // Negative hits get their own counter but still count toward the rate.
    #[test]
    fn test_negative_cache() {
        let m = DataMatchingMetrics::new();

        for &(hit, negative) in [(true, true), (true, false), (false, false)].iter() {
            m.record_cache(hit, negative);
        }

        let snap = m.snapshot();
        assert_eq!(snap.negative_cache_hits, 1);
        assert_eq!(snap.cache_hits, 1);
        assert_eq!(snap.cache_misses, 1);
        assert!((snap.cache_hit_rate - 0.666).abs() < 0.01);
    }

    // The first sample seeds the average; the next one lands strictly between
    // the old average and the new sample.
    #[test]
    fn test_ema_processing_time() {
        let m = DataMatchingMetrics::new();

        m.record_query(true, micros(1000));
        let seeded = m.processing_time_ema.load(Ordering::Relaxed);
        assert_eq!(seeded, 1000);

        m.record_query(true, micros(2000));
        let blended = m.processing_time_ema.load(Ordering::Relaxed);
        assert!(1000 < blended && blended < 2000);
    }

    // The peak keeps the largest sample even when a smaller one follows.
    #[test]
    fn test_peak_tracking() {
        let m = DataMatchingMetrics::new();

        for &us in [100, 500, 200].iter() {
            m.record_query(true, micros(us));
        }

        let peak = m.peak_processing_time.load(Ordering::Relaxed);
        assert_eq!(peak, 500);
    }

    // Each flag bumps only its own counter; several flags may be set at once.
    #[test]
    fn test_match_types() {
        let m = DataMatchingMetrics::new();

        m.record_match_type(true, false, false, false);
        m.record_match_type(false, true, false, true);
        m.record_match_type(false, false, true, true);

        let snap = m.snapshot();
        assert_eq!(snap.cpf_matches, 1);
        assert_eq!(snap.cnpj_matches, 1);
        assert_eq!(snap.fuzzy_matches, 1);
        assert_eq!(snap.cross_source_matches, 2);
    }

    // `reset` clears previously recorded counters.
    #[test]
    fn test_reset() {
        let m = DataMatchingMetrics::new();

        m.record_query(true, micros(1000));
        m.record_cache(true, false);
        m.reset();

        let snap = m.snapshot();
        assert_eq!(snap.total_queries, 0);
        assert_eq!(snap.cache_hits, 0);
    }
}