Skip to main content

thread_flow/monitoring/
performance.rs

1// SPDX-FileCopyrightText: 2026 Knitli Inc.
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! Performance monitoring and metrics collection
5//!
6//! Integrates with Prometheus to track:
7//! - Fingerprint computation latency
8//! - Cache hit/miss rates
9//! - Query execution times
10//! - Memory usage
11//! - Throughput metrics
12
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{Duration, Instant};
16
/// Collector for performance counters.
///
/// Every counter is an `AtomicU64` held behind its own `Arc`, so cloning the
/// collector produces another handle onto the *same* counters rather than an
/// independent copy: updates made through any clone are visible through all
/// of them.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    // Fingerprint metrics
    /// Number of fingerprint computations recorded.
    fingerprint_total: Arc<AtomicU64>,
    /// Accumulated fingerprint computation time, in nanoseconds.
    fingerprint_duration_ns: Arc<AtomicU64>,

    // Cache metrics
    /// Number of cache hits recorded.
    cache_hits: Arc<AtomicU64>,
    /// Number of cache misses recorded.
    cache_misses: Arc<AtomicU64>,
    /// Number of cache evictions recorded.
    cache_evictions: Arc<AtomicU64>,

    // Query metrics
    /// Number of queries recorded (successful and failed).
    query_count: Arc<AtomicU64>,
    /// Accumulated query execution time, in nanoseconds.
    query_duration_ns: Arc<AtomicU64>,
    /// Number of queries recorded as failed.
    query_errors: Arc<AtomicU64>,

    // Memory metrics
    /// Accumulated number of bytes reported as processed.
    bytes_processed: Arc<AtomicU64>,
    /// Number of allocations recorded.
    allocations: Arc<AtomicU64>,

    // Throughput metrics
    /// Number of files processed, whether reported one by one or per batch.
    files_processed: Arc<AtomicU64>,
    /// Number of batches recorded.
    batch_count: Arc<AtomicU64>,
}
42
43impl Default for PerformanceMetrics {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49impl PerformanceMetrics {
50    /// Create new performance metrics collector
51    pub fn new() -> Self {
52        Self {
53            fingerprint_total: Arc::new(AtomicU64::new(0)),
54            fingerprint_duration_ns: Arc::new(AtomicU64::new(0)),
55            cache_hits: Arc::new(AtomicU64::new(0)),
56            cache_misses: Arc::new(AtomicU64::new(0)),
57            cache_evictions: Arc::new(AtomicU64::new(0)),
58            query_count: Arc::new(AtomicU64::new(0)),
59            query_duration_ns: Arc::new(AtomicU64::new(0)),
60            query_errors: Arc::new(AtomicU64::new(0)),
61            bytes_processed: Arc::new(AtomicU64::new(0)),
62            allocations: Arc::new(AtomicU64::new(0)),
63            files_processed: Arc::new(AtomicU64::new(0)),
64            batch_count: Arc::new(AtomicU64::new(0)),
65        }
66    }
67
68    /// Record fingerprint computation
69    pub fn record_fingerprint(&self, duration: Duration) {
70        self.fingerprint_total.fetch_add(1, Ordering::Relaxed);
71        self.fingerprint_duration_ns
72            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
73    }
74
75    /// Record cache hit
76    pub fn record_cache_hit(&self) {
77        self.cache_hits.fetch_add(1, Ordering::Relaxed);
78    }
79
80    /// Record cache miss
81    pub fn record_cache_miss(&self) {
82        self.cache_misses.fetch_add(1, Ordering::Relaxed);
83    }
84
85    /// Record cache eviction
86    pub fn record_cache_eviction(&self) {
87        self.cache_evictions.fetch_add(1, Ordering::Relaxed);
88    }
89
90    /// Record query execution
91    pub fn record_query(&self, duration: Duration, success: bool) {
92        self.query_count.fetch_add(1, Ordering::Relaxed);
93        self.query_duration_ns
94            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
95        if !success {
96            self.query_errors.fetch_add(1, Ordering::Relaxed);
97        }
98    }
99
100    /// Record bytes processed
101    pub fn record_bytes(&self, bytes: u64) {
102        self.bytes_processed.fetch_add(bytes, Ordering::Relaxed);
103    }
104
105    /// Record memory allocation
106    pub fn record_allocation(&self) {
107        self.allocations.fetch_add(1, Ordering::Relaxed);
108    }
109
110    /// Record file processed
111    pub fn record_file_processed(&self) {
112        self.files_processed.fetch_add(1, Ordering::Relaxed);
113    }
114
115    /// Record batch processing
116    pub fn record_batch(&self, file_count: u64) {
117        self.batch_count.fetch_add(1, Ordering::Relaxed);
118        self.files_processed
119            .fetch_add(file_count, Ordering::Relaxed);
120    }
121
122    /// Get fingerprint statistics
123    pub fn fingerprint_stats(&self) -> FingerprintStats {
124        let total = self.fingerprint_total.load(Ordering::Relaxed);
125        let duration_ns = self.fingerprint_duration_ns.load(Ordering::Relaxed);
126
127        let avg_ns = duration_ns.checked_div(total).unwrap_or(0);
128
129        FingerprintStats {
130            total_count: total,
131            total_duration_ns: duration_ns,
132            avg_duration_ns: avg_ns,
133        }
134    }
135
136    /// Get cache statistics
137    pub fn cache_stats(&self) -> CacheStats {
138        let hits = self.cache_hits.load(Ordering::Relaxed);
139        let misses = self.cache_misses.load(Ordering::Relaxed);
140        let total = hits + misses;
141
142        let hit_rate = if total > 0 {
143            (hits as f64 / total as f64) * 100.0
144        } else {
145            0.0
146        };
147
148        CacheStats {
149            hits,
150            misses,
151            evictions: self.cache_evictions.load(Ordering::Relaxed),
152            hit_rate_percent: hit_rate,
153        }
154    }
155
156    /// Get query statistics
157    pub fn query_stats(&self) -> QueryStats {
158        let count = self.query_count.load(Ordering::Relaxed);
159        let duration_ns = self.query_duration_ns.load(Ordering::Relaxed);
160        let errors = self.query_errors.load(Ordering::Relaxed);
161
162        let avg_ns = duration_ns.checked_div(count).unwrap_or(0);
163        let error_rate = if count > 0 {
164            (errors as f64 / count as f64) * 100.0
165        } else {
166            0.0
167        };
168
169        QueryStats {
170            total_count: count,
171            total_duration_ns: duration_ns,
172            avg_duration_ns: avg_ns,
173            errors,
174            error_rate_percent: error_rate,
175        }
176    }
177
178    /// Get throughput statistics
179    pub fn throughput_stats(&self) -> ThroughputStats {
180        ThroughputStats {
181            bytes_processed: self.bytes_processed.load(Ordering::Relaxed),
182            files_processed: self.files_processed.load(Ordering::Relaxed),
183            batches_processed: self.batch_count.load(Ordering::Relaxed),
184        }
185    }
186
187    /// Reset all metrics
188    pub fn reset(&self) {
189        self.fingerprint_total.store(0, Ordering::Relaxed);
190        self.fingerprint_duration_ns.store(0, Ordering::Relaxed);
191        self.cache_hits.store(0, Ordering::Relaxed);
192        self.cache_misses.store(0, Ordering::Relaxed);
193        self.cache_evictions.store(0, Ordering::Relaxed);
194        self.query_count.store(0, Ordering::Relaxed);
195        self.query_duration_ns.store(0, Ordering::Relaxed);
196        self.query_errors.store(0, Ordering::Relaxed);
197        self.bytes_processed.store(0, Ordering::Relaxed);
198        self.allocations.store(0, Ordering::Relaxed);
199        self.files_processed.store(0, Ordering::Relaxed);
200        self.batch_count.store(0, Ordering::Relaxed);
201    }
202
203    /// Export metrics in Prometheus format
204    pub fn export_prometheus(&self) -> String {
205        let fingerprint = self.fingerprint_stats();
206        let cache = self.cache_stats();
207        let query = self.query_stats();
208        let throughput = self.throughput_stats();
209
210        format!(
211            r#"# HELP thread_fingerprint_total Total fingerprint computations
212# TYPE thread_fingerprint_total counter
213thread_fingerprint_total {}
214
215# HELP thread_fingerprint_duration_seconds Total fingerprint computation time
216# TYPE thread_fingerprint_duration_seconds counter
217thread_fingerprint_duration_seconds {}
218
219# HELP thread_fingerprint_avg_duration_seconds Average fingerprint computation time
220# TYPE thread_fingerprint_avg_duration_seconds gauge
221thread_fingerprint_avg_duration_seconds {}
222
223# HELP thread_cache_hits_total Total cache hits
224# TYPE thread_cache_hits_total counter
225thread_cache_hits_total {}
226
227# HELP thread_cache_misses_total Total cache misses
228# TYPE thread_cache_misses_total counter
229thread_cache_misses_total {}
230
231# HELP thread_cache_evictions_total Total cache evictions
232# TYPE thread_cache_evictions_total counter
233thread_cache_evictions_total {}
234
235# HELP thread_cache_hit_rate_percent Cache hit rate percentage
236# TYPE thread_cache_hit_rate_percent gauge
237thread_cache_hit_rate_percent {}
238
239# HELP thread_query_total Total queries executed
240# TYPE thread_query_total counter
241thread_query_total {}
242
243# HELP thread_query_duration_seconds Total query execution time
244# TYPE thread_query_duration_seconds counter
245thread_query_duration_seconds {}
246
247# HELP thread_query_avg_duration_seconds Average query execution time
248# TYPE thread_query_avg_duration_seconds gauge
249thread_query_avg_duration_seconds {}
250
251# HELP thread_query_errors_total Total query errors
252# TYPE thread_query_errors_total counter
253thread_query_errors_total {}
254
255# HELP thread_query_error_rate_percent Query error rate percentage
256# TYPE thread_query_error_rate_percent gauge
257thread_query_error_rate_percent {}
258
259# HELP thread_bytes_processed_total Total bytes processed
260# TYPE thread_bytes_processed_total counter
261thread_bytes_processed_total {}
262
263# HELP thread_files_processed_total Total files processed
264# TYPE thread_files_processed_total counter
265thread_files_processed_total {}
266
267# HELP thread_batches_processed_total Total batches processed
268# TYPE thread_batches_processed_total counter
269thread_batches_processed_total {}
270"#,
271            fingerprint.total_count,
272            fingerprint.total_duration_ns as f64 / 1_000_000_000.0,
273            fingerprint.avg_duration_ns as f64 / 1_000_000_000.0,
274            cache.hits,
275            cache.misses,
276            cache.evictions,
277            cache.hit_rate_percent,
278            query.total_count,
279            query.total_duration_ns as f64 / 1_000_000_000.0,
280            query.avg_duration_ns as f64 / 1_000_000_000.0,
281            query.errors,
282            query.error_rate_percent,
283            throughput.bytes_processed,
284            throughput.files_processed,
285            throughput.batches_processed,
286        )
287    }
288}
289
/// Point-in-time snapshot of the fingerprint counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FingerprintStats {
    /// Number of fingerprint computations recorded.
    pub total_count: u64,
    /// Accumulated computation time, in nanoseconds.
    pub total_duration_ns: u64,
    /// Mean computation time, in nanoseconds.
    pub avg_duration_ns: u64,
}
297
/// Point-in-time snapshot of the cache counters.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CacheStats {
    /// Number of cache hits recorded.
    pub hits: u64,
    /// Number of cache misses recorded.
    pub misses: u64,
    /// Number of cache evictions recorded.
    pub evictions: u64,
    /// Share of lookups that were hits, as a percentage.
    pub hit_rate_percent: f64,
}
306
/// Point-in-time snapshot of the query counters.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct QueryStats {
    /// Number of queries recorded, successful or not.
    pub total_count: u64,
    /// Accumulated execution time, in nanoseconds.
    pub total_duration_ns: u64,
    /// Mean execution time, in nanoseconds.
    pub avg_duration_ns: u64,
    /// Number of queries recorded as failed.
    pub errors: u64,
    /// Share of queries that failed, as a percentage.
    pub error_rate_percent: f64,
}
316
/// Point-in-time snapshot of the throughput counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ThroughputStats {
    /// Accumulated number of bytes processed.
    pub bytes_processed: u64,
    /// Number of files processed.
    pub files_processed: u64,
    /// Number of batches processed.
    pub batches_processed: u64,
}
324
325/// Performance timer for automatic metric recording
326pub struct PerformanceTimer<'a> {
327    metrics: &'a PerformanceMetrics,
328    metric_type: MetricType,
329    start: Instant,
330}
331
332/// Type of metric being timed
333pub enum MetricType {
334    Fingerprint,
335    Query,
336}
337
338impl<'a> PerformanceTimer<'a> {
339    /// Start a new performance timer
340    pub fn start(metrics: &'a PerformanceMetrics, metric_type: MetricType) -> Self {
341        Self {
342            metrics,
343            metric_type,
344            start: Instant::now(),
345        }
346    }
347
348    /// Stop the timer and record the duration (success)
349    pub fn stop_success(self) {
350        let duration = self.start.elapsed();
351        match self.metric_type {
352            MetricType::Fingerprint => self.metrics.record_fingerprint(duration),
353            MetricType::Query => self.metrics.record_query(duration, true),
354        }
355    }
356
357    /// Stop the timer and record the duration (error)
358    pub fn stop_error(self) {
359        let duration = self.start.elapsed();
360        if let MetricType::Query = self.metric_type {
361            self.metrics.record_query(duration, false)
362        }
363    }
364}
365
366impl<'a> Drop for PerformanceTimer<'a> {
367    fn drop(&mut self) {
368        // Auto-record on drop (assumes success)
369        let duration = self.start.elapsed();
370        match self.metric_type {
371            MetricType::Fingerprint => self.metrics.record_fingerprint(duration),
372            MetricType::Query => self.metrics.record_query(duration, true),
373        }
374    }
375}
376
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Tolerance for percentages that are exactly representable in `f64`.
    const EXACT: f64 = f64::EPSILON;

    /// A fresh collector must report zeroes without dividing by zero.
    #[test]
    fn test_fresh_metrics_report_zeroes() {
        let metrics = PerformanceMetrics::default();

        let fp_stats = metrics.fingerprint_stats();
        assert_eq!(fp_stats.total_count, 0);
        assert_eq!(fp_stats.avg_duration_ns, 0);

        let cache_stats = metrics.cache_stats();
        assert_eq!(cache_stats.hits + cache_stats.misses, 0);
        assert!(cache_stats.hit_rate_percent.abs() < EXACT);

        let query_stats = metrics.query_stats();
        assert_eq!(query_stats.avg_duration_ns, 0);
        assert!(query_stats.error_rate_percent.abs() < EXACT);
    }

    #[test]
    fn test_fingerprint_metrics() {
        let metrics = PerformanceMetrics::new();

        // Record some fingerprints
        for nanos in [500, 1000, 1500].iter() {
            metrics.record_fingerprint(Duration::from_nanos(*nanos));
        }

        let stats = metrics.fingerprint_stats();
        assert_eq!(stats.total_count, 3);
        assert_eq!(stats.total_duration_ns, 3000);
        assert_eq!(stats.avg_duration_ns, 1000);
    }

    #[test]
    fn test_cache_metrics() {
        let metrics = PerformanceMetrics::new();

        // Record cache activity: three hits, one miss, one eviction
        for _ in 0..3 {
            metrics.record_cache_hit();
        }
        metrics.record_cache_miss();
        metrics.record_cache_eviction();

        let stats = metrics.cache_stats();
        assert_eq!(stats.hits, 3);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.evictions, 1);
        assert!((stats.hit_rate_percent - 75.0).abs() < EXACT);
    }

    #[test]
    fn test_query_metrics() {
        let metrics = PerformanceMetrics::new();

        // Record queries: two successes and one failure
        metrics.record_query(Duration::from_millis(10), true);
        metrics.record_query(Duration::from_millis(20), true);
        metrics.record_query(Duration::from_millis(15), false);

        let stats = metrics.query_stats();
        assert_eq!(stats.total_count, 3);
        assert_eq!(stats.total_duration_ns, 45_000_000);
        assert_eq!(stats.avg_duration_ns, 15_000_000);
        assert_eq!(stats.errors, 1);
        assert!((stats.error_rate_percent - 33.33).abs() < 0.1);
    }

    #[test]
    fn test_throughput_metrics() {
        let metrics = PerformanceMetrics::new();

        metrics.record_bytes(1024);
        metrics.record_file_processed();
        metrics.record_batch(10);

        let stats = metrics.throughput_stats();
        assert_eq!(stats.bytes_processed, 1024);
        assert_eq!(stats.files_processed, 11); // 1 + 10 from batch
        assert_eq!(stats.batches_processed, 1);
    }

    /// Clones are handles onto the same counters, not independent copies.
    #[test]
    fn test_clone_shares_counters() {
        let metrics = PerformanceMetrics::new();
        let handle = metrics.clone();

        handle.record_cache_hit();
        handle.record_bytes(8);

        assert_eq!(metrics.cache_stats().hits, 1);
        assert_eq!(metrics.throughput_stats().bytes_processed, 8);
    }

    #[test]
    fn test_performance_timer() {
        let metrics = PerformanceMetrics::new();

        {
            let _timer = PerformanceTimer::start(&metrics, MetricType::Fingerprint);
            thread::sleep(Duration::from_millis(1));
        }

        let stats = metrics.fingerprint_stats();
        assert_eq!(stats.total_count, 1);
        assert!(stats.avg_duration_ns >= 1_000_000); // At least 1ms
    }

    /// A query timer that is simply dropped counts as one successful query.
    #[test]
    fn test_performance_timer_query_drop() {
        let metrics = PerformanceMetrics::new();

        {
            let _timer = PerformanceTimer::start(&metrics, MetricType::Query);
        }

        let stats = metrics.query_stats();
        assert_eq!(stats.total_count, 1);
        assert_eq!(stats.errors, 0);
        assert_eq!(metrics.fingerprint_stats().total_count, 0);
    }

    #[test]
    fn test_metrics_reset() {
        let metrics = PerformanceMetrics::new();

        // Touch every family of counters before resetting.
        metrics.record_fingerprint(Duration::from_nanos(500));
        metrics.record_cache_hit();
        metrics.record_cache_miss();
        metrics.record_cache_eviction();
        metrics.record_query(Duration::from_millis(10), false);
        metrics.record_bytes(64);
        metrics.record_allocation();
        metrics.record_batch(2);

        metrics.reset();

        let fp_stats = metrics.fingerprint_stats();
        assert_eq!(fp_stats.total_count, 0);
        assert_eq!(fp_stats.total_duration_ns, 0);

        let cache_stats = metrics.cache_stats();
        assert_eq!(cache_stats.hits, 0);
        assert_eq!(cache_stats.misses, 0);
        assert_eq!(cache_stats.evictions, 0);

        let query_stats = metrics.query_stats();
        assert_eq!(query_stats.total_count, 0);
        assert_eq!(query_stats.total_duration_ns, 0);
        assert_eq!(query_stats.errors, 0);

        let throughput = metrics.throughput_stats();
        assert_eq!(throughput.bytes_processed, 0);
        assert_eq!(throughput.files_processed, 0);
        assert_eq!(throughput.batches_processed, 0);
    }

    #[test]
    fn test_prometheus_export() {
        let metrics = PerformanceMetrics::new();

        metrics.record_fingerprint(Duration::from_nanos(500));
        metrics.record_cache_hit();

        let export = metrics.export_prometheus();

        assert!(export.contains("thread_fingerprint_total 1"));
        assert!(export.contains("thread_cache_hits_total 1"));
        assert!(export.contains("thread_cache_misses_total 0"));
        assert!(export.contains("thread_query_total 0"));
        assert!(export.contains("# HELP"));
        assert!(export.contains("# TYPE"));

        // Every exported sample is preceded by its HELP and TYPE lines.
        let help_lines = export.lines().filter(|l| l.starts_with("# HELP ")).count();
        let type_lines = export.lines().filter(|l| l.starts_with("# TYPE ")).count();
        assert_eq!(help_lines, 15);
        assert_eq!(type_lines, 15);
    }
}