// thread_flow/monitoring/mod.rs
// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
// SPDX-License-Identifier: AGPL-3.0-or-later

//! # Thread Flow Monitoring
//!
//! Production-ready monitoring and observability infrastructure for Thread Flow.
//!
//! ## Features
//!
//! - **Metrics Collection**: Prometheus-compatible metrics for cache, latency, throughput
//! - **Structured Logging**: JSON and human-readable logging with tracing
//! - **Performance Tracking**: Real-time performance metrics and alerts
//! - **Error Tracking**: Error rates and error type categorization
//!
//! ## Usage
//!
//! ```rust,ignore
//! use thread_flow::monitoring::{Metrics, init_logging};
//!
//! // Initialize logging
//! init_logging(LogLevel::Info, LogFormat::Json)?;
//!
//! // Create metrics collector
//! let metrics = Metrics::new();
//!
//! // Track operations
//! metrics.record_cache_hit();
//! metrics.record_query_latency(15);  // 15ms
//! metrics.record_fingerprint_time(425);  // 425ns
//!
//! // Get statistics
//! let stats = metrics.snapshot();
//! println!("Cache hit rate: {:.2}%", stats.cache_hit_rate);
//! ```
//!
//! ## Metrics Tracked
//!
//! ### Cache Metrics
//! - `cache_hits` - Total cache hits
//! - `cache_misses` - Total cache misses
//! - `cache_hit_rate` - Hit rate percentage (target: >90%)
//!
//! ### Latency Metrics (in milliseconds)
//! - `query_latency_p50` - Median query latency
//! - `query_latency_p95` - 95th percentile query latency
//! - `query_latency_p99` - 99th percentile query latency
//!
//! ### Performance Metrics
//! - `fingerprint_time_ns` - Blake3 fingerprinting time in nanoseconds
//! - `parse_time_us` - Tree-sitter parsing time in microseconds
//! - `extract_time_us` - Symbol extraction time in microseconds
//!
//! ### Throughput Metrics
//! - `files_processed_total` - Total files processed
//! - `symbols_extracted_total` - Total symbols extracted
//! - `throughput_files_per_sec` - Files processed per second
//!
//! ### Error Metrics
//! - `errors_total` - Total errors by type
//! - `error_rate` - Error rate percentage
pub mod logging;
pub mod performance;

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use thread_utilities::RapidMap;
69
/// Metrics collector for Thread Flow operations.
///
/// Cloning is cheap: all state lives behind an [`Arc`], so every clone
/// shares the same underlying counters and sample buffers and can be
/// handed freely to other threads.
#[derive(Clone)]
pub struct Metrics {
    // Shared interior state; Arc makes Clone O(1) (refcount bump).
    inner: Arc<MetricsInner>,
}
75
/// Interior state shared by all clones of [`Metrics`].
///
/// Monotonic counters use lock-free atomics; the sample buffers and the
/// error map need compound updates and therefore sit behind `RwLock`s.
struct MetricsInner {
    // Cache metrics (monotonic counters)
    cache_hits: AtomicU64,
    cache_misses: AtomicU64,

    // Latency sample buffers, capped at 10,000 entries by the recorders.
    // Units differ per buffer (see the corresponding `record_*` methods):
    query_latencies: RwLock<Vec<u64>>,   // milliseconds
    fingerprint_times: RwLock<Vec<u64>>, // nanoseconds
    parse_times: RwLock<Vec<u64>>,       // microseconds

    // Throughput tracking
    files_processed: AtomicU64,
    symbols_extracted: AtomicU64,
    // Set once at construction; basis for uptime and files/sec.
    start_time: Instant,

    // Error tracking: error-type name -> occurrence count
    errors_by_type: RwLock<RapidMap<String, u64>>,
}
94
95impl Metrics {
96    /// Create a new metrics collector
97    pub fn new() -> Self {
98        Self {
99            inner: Arc::new(MetricsInner {
100                cache_hits: AtomicU64::new(0),
101                cache_misses: AtomicU64::new(0),
102                query_latencies: RwLock::new(Vec::new()),
103                fingerprint_times: RwLock::new(Vec::new()),
104                parse_times: RwLock::new(Vec::new()),
105                files_processed: AtomicU64::new(0),
106                symbols_extracted: AtomicU64::new(0),
107                start_time: Instant::now(),
108                errors_by_type: RwLock::new(thread_utilities::get_map()),
109            }),
110        }
111    }
112
113    /// Record a cache hit
114    pub fn record_cache_hit(&self) {
115        self.inner.cache_hits.fetch_add(1, Ordering::Relaxed);
116    }
117
118    /// Record a cache miss
119    pub fn record_cache_miss(&self) {
120        self.inner.cache_misses.fetch_add(1, Ordering::Relaxed);
121    }
122
123    /// Record query latency in milliseconds
124    pub fn record_query_latency(&self, latency_ms: u64) {
125        if let Ok(mut latencies) = self.inner.query_latencies.write() {
126            latencies.push(latency_ms);
127            // Keep only last 10,000 samples to prevent unbounded growth
128            if latencies.len() > 10_000 {
129                latencies.drain(0..5_000);
130            }
131        }
132    }
133
134    /// Record fingerprint computation time in nanoseconds
135    pub fn record_fingerprint_time(&self, time_ns: u64) {
136        if let Ok(mut times) = self.inner.fingerprint_times.write() {
137            times.push(time_ns);
138            if times.len() > 10_000 {
139                times.drain(0..5_000);
140            }
141        }
142    }
143
144    /// Record parse time in microseconds
145    pub fn record_parse_time(&self, time_us: u64) {
146        if let Ok(mut times) = self.inner.parse_times.write() {
147            times.push(time_us);
148            if times.len() > 10_000 {
149                times.drain(0..5_000);
150            }
151        }
152    }
153
154    /// Record files processed
155    pub fn record_files_processed(&self, count: u64) {
156        self.inner
157            .files_processed
158            .fetch_add(count, Ordering::Relaxed);
159    }
160
161    /// Record symbols extracted
162    pub fn record_symbols_extracted(&self, count: u64) {
163        self.inner
164            .symbols_extracted
165            .fetch_add(count, Ordering::Relaxed);
166    }
167
168    /// Record an error by type
169    pub fn record_error(&self, error_type: impl Into<String>) {
170        if let Ok(mut errors) = self.inner.errors_by_type.write() {
171            *errors.entry(error_type.into()).or_insert(0) += 1;
172        }
173    }
174
175    /// Get a snapshot of current metrics
176    pub fn snapshot(&self) -> MetricsSnapshot {
177        let cache_hits = self.inner.cache_hits.load(Ordering::Relaxed);
178        let cache_misses = self.inner.cache_misses.load(Ordering::Relaxed);
179        let total_cache_lookups = cache_hits + cache_misses;
180
181        let cache_hit_rate = if total_cache_lookups > 0 {
182            (cache_hits as f64 / total_cache_lookups as f64) * 100.0
183        } else {
184            0.0
185        };
186
187        // Calculate percentiles
188        let query_latencies = self
189            .inner
190            .query_latencies
191            .read()
192            .ok()
193            .map(|l| calculate_percentiles(&l))
194            .unwrap_or_default();
195
196        let fingerprint_times = self
197            .inner
198            .fingerprint_times
199            .read()
200            .ok()
201            .map(|t| calculate_percentiles(&t))
202            .unwrap_or_default();
203
204        let parse_times = self
205            .inner
206            .parse_times
207            .read()
208            .ok()
209            .map(|t| calculate_percentiles(&t))
210            .unwrap_or_default();
211
212        let files_processed = self.inner.files_processed.load(Ordering::Relaxed);
213        let symbols_extracted = self.inner.symbols_extracted.load(Ordering::Relaxed);
214        let elapsed = self.inner.start_time.elapsed();
215
216        let throughput_files_per_sec = if elapsed.as_secs() > 0 {
217            files_processed as f64 / elapsed.as_secs_f64()
218        } else {
219            0.0
220        };
221
222        let errors_by_type = self
223            .inner
224            .errors_by_type
225            .read()
226            .ok()
227            .map(|e| e.clone())
228            .unwrap_or_default();
229
230        let total_errors: u64 = errors_by_type.values().sum();
231        let error_rate = if files_processed > 0 {
232            (total_errors as f64 / files_processed as f64) * 100.0
233        } else {
234            0.0
235        };
236
237        MetricsSnapshot {
238            cache_hits,
239            cache_misses,
240            cache_hit_rate,
241            query_latency_p50: query_latencies.p50,
242            query_latency_p95: query_latencies.p95,
243            query_latency_p99: query_latencies.p99,
244            fingerprint_time_p50: fingerprint_times.p50,
245            fingerprint_time_p95: fingerprint_times.p95,
246            parse_time_p50: parse_times.p50,
247            parse_time_p95: parse_times.p95,
248            files_processed,
249            symbols_extracted,
250            throughput_files_per_sec,
251            errors_by_type,
252            error_rate,
253            uptime: elapsed,
254        }
255    }
256
257    /// Export metrics in Prometheus format
258    pub fn export_prometheus(&self) -> String {
259        let snapshot = self.snapshot();
260        format!(
261            r#"# HELP thread_cache_hits_total Total number of cache hits
262# TYPE thread_cache_hits_total counter
263thread_cache_hits_total {}
264
265# HELP thread_cache_misses_total Total number of cache misses
266# TYPE thread_cache_misses_total counter
267thread_cache_misses_total {}
268
269# HELP thread_cache_hit_rate Cache hit rate percentage
270# TYPE thread_cache_hit_rate gauge
271thread_cache_hit_rate {:.2}
272
273# HELP thread_query_latency_milliseconds Query latency in milliseconds
274# TYPE thread_query_latency_milliseconds summary
275thread_query_latency_milliseconds{{quantile="0.5"}} {}
276thread_query_latency_milliseconds{{quantile="0.95"}} {}
277thread_query_latency_milliseconds{{quantile="0.99"}} {}
278
279# HELP thread_fingerprint_time_nanoseconds Fingerprint computation time in nanoseconds
280# TYPE thread_fingerprint_time_nanoseconds summary
281thread_fingerprint_time_nanoseconds{{quantile="0.5"}} {}
282thread_fingerprint_time_nanoseconds{{quantile="0.95"}} {}
283
284# HELP thread_parse_time_microseconds Parse time in microseconds
285# TYPE thread_parse_time_microseconds summary
286thread_parse_time_microseconds{{quantile="0.5"}} {}
287thread_parse_time_microseconds{{quantile="0.95"}} {}
288
289# HELP thread_files_processed_total Total files processed
290# TYPE thread_files_processed_total counter
291thread_files_processed_total {}
292
293# HELP thread_symbols_extracted_total Total symbols extracted
294# TYPE thread_symbols_extracted_total counter
295thread_symbols_extracted_total {}
296
297# HELP thread_throughput_files_per_second Files processed per second
298# TYPE thread_throughput_files_per_second gauge
299thread_throughput_files_per_second {:.2}
300
301# HELP thread_error_rate Error rate percentage
302# TYPE thread_error_rate gauge
303thread_error_rate {:.2}
304"#,
305            snapshot.cache_hits,
306            snapshot.cache_misses,
307            snapshot.cache_hit_rate,
308            snapshot.query_latency_p50,
309            snapshot.query_latency_p95,
310            snapshot.query_latency_p99,
311            snapshot.fingerprint_time_p50,
312            snapshot.fingerprint_time_p95,
313            snapshot.parse_time_p50,
314            snapshot.parse_time_p95,
315            snapshot.files_processed,
316            snapshot.symbols_extracted,
317            snapshot.throughput_files_per_sec,
318            snapshot.error_rate,
319        )
320    }
321
322    /// Reset all metrics
323    pub fn reset(&self) {
324        self.inner.cache_hits.store(0, Ordering::Relaxed);
325        self.inner.cache_misses.store(0, Ordering::Relaxed);
326        self.inner.files_processed.store(0, Ordering::Relaxed);
327        self.inner.symbols_extracted.store(0, Ordering::Relaxed);
328
329        if let Ok(mut latencies) = self.inner.query_latencies.write() {
330            latencies.clear();
331        }
332        if let Ok(mut times) = self.inner.fingerprint_times.write() {
333            times.clear();
334        }
335        if let Ok(mut times) = self.inner.parse_times.write() {
336            times.clear();
337        }
338        if let Ok(mut errors) = self.inner.errors_by_type.write() {
339            errors.clear();
340        }
341    }
342}
343
344impl Default for Metrics {
345    fn default() -> Self {
346        Self::new()
347    }
348}
349
/// Immutable snapshot of metrics at a point in time.
///
/// Produced by [`Metrics::snapshot`]; all fields are plain values, so a
/// snapshot can be inspected or formatted without further locking.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    // Cache metrics
    pub cache_hits: u64,
    pub cache_misses: u64,
    // Hit rate percentage in [0, 100]; 0.0 when no lookups were recorded.
    pub cache_hit_rate: f64,

    // Query latency percentiles (milliseconds)
    pub query_latency_p50: u64,
    pub query_latency_p95: u64,
    pub query_latency_p99: u64,

    // Performance percentiles
    pub fingerprint_time_p50: u64, // nanoseconds
    pub fingerprint_time_p95: u64, // nanoseconds
    pub parse_time_p50: u64,       // microseconds
    pub parse_time_p95: u64,       // microseconds

    // Throughput metrics
    pub files_processed: u64,
    pub symbols_extracted: u64,
    pub throughput_files_per_sec: f64,

    // Error metrics: per-type counts and errors-per-file percentage
    pub errors_by_type: RapidMap<String, u64>,
    pub error_rate: f64,

    // Time elapsed since the collector was constructed
    pub uptime: Duration,
}
381
impl MetricsSnapshot {
    /// Check if metrics meet production SLOs.
    ///
    /// Checks three SLOs: cache hit rate >90%, query p95 latency <=50ms,
    /// error rate <=1%. Returns [`SLOStatus::Healthy`] when all pass, or
    /// [`SLOStatus::Violated`] with one human-readable message per
    /// violated SLO.
    pub fn meets_slo(&self) -> SLOStatus {
        let mut violations = Vec::new();

        // Cache hit rate SLO: >90%
        if self.cache_hit_rate < 90.0 {
            violations.push(format!(
                "Cache hit rate {:.2}% below SLO (90%)",
                self.cache_hit_rate
            ));
        }

        // Query latency SLO targets: p95 <10ms (CLI), <50ms (Edge).
        // This check uses the more permissive Edge threshold (50ms);
        // could be made configurable to also enforce the CLI target.
        if self.query_latency_p95 > 50 {
            violations.push(format!(
                "Query p95 latency {}ms above SLO (50ms)",
                self.query_latency_p95
            ));
        }

        // Error rate SLO: <1%
        if self.error_rate > 1.0 {
            violations.push(format!("Error rate {:.2}% above SLO (1%)", self.error_rate));
        }

        if violations.is_empty() {
            SLOStatus::Healthy
        } else {
            SLOStatus::Violated(violations)
        }
    }

    /// Format the snapshot as a human-readable, multi-line report
    /// (suitable for logs or terminal output).
    pub fn format_text(&self) -> String {
        format!(
            r#"Thread Flow Metrics
==================

Cache Performance:
  Hits: {} | Misses: {} | Hit Rate: {:.2}%

Query Latency (ms):
  p50: {} | p95: {} | p99: {}

Performance (Blake3 fingerprint in ns, parse in µs):
  Fingerprint p50: {}ns | p95: {}ns
  Parse p50: {}µs | p95: {}µs

Throughput:
  Files Processed: {}
  Symbols Extracted: {}
  Files/sec: {:.2}

Errors:
  Total Errors: {} ({:.2}% rate)
  By Type: {:?}

Uptime: {:.2}s
"#,
            self.cache_hits,
            self.cache_misses,
            self.cache_hit_rate,
            self.query_latency_p50,
            self.query_latency_p95,
            self.query_latency_p99,
            self.fingerprint_time_p50,
            self.fingerprint_time_p95,
            self.parse_time_p50,
            self.parse_time_p95,
            self.files_processed,
            self.symbols_extracted,
            self.throughput_files_per_sec,
            self.errors_by_type.values().sum::<u64>(),
            self.error_rate,
            self.errors_by_type,
            self.uptime.as_secs_f64(),
        )
    }
}
463
/// SLO compliance status, as reported by [`MetricsSnapshot::meets_slo`].
#[derive(Debug, Clone, PartialEq)]
pub enum SLOStatus {
    /// All SLOs are met.
    Healthy,
    /// One or more SLOs are violated; each entry is a human-readable
    /// description of one violation.
    Violated(Vec<String>),
}
472
/// Helper struct bundling the p50/p95/p99 percentile values of one
/// sample buffer. `Default` is all zeros (used for empty buffers and
/// poisoned locks).
#[derive(Debug, Default)]
struct Percentiles {
    p50: u64,
    p95: u64,
    p99: u64,
}

/// Calculate p50/p95/p99 from a slice of samples.
///
/// The input does NOT need to be pre-sorted; a sorted copy is made
/// internally. Uses a `floor(len * q)` rank, so for small sample counts
/// the estimate leans toward the higher neighbor (e.g. the p50 of 10
/// values is the 6th smallest). Returns all zeros for an empty slice.
fn calculate_percentiles(values: &[u64]) -> Percentiles {
    if values.is_empty() {
        return Percentiles::default();
    }

    let mut sorted = values.to_vec();
    sorted.sort_unstable();

    // floor(len * q) < len for every q < 1.0, so the index is always
    // in bounds and direct indexing cannot panic here.
    let at = |q: f64| sorted[(sorted.len() as f64 * q) as usize];

    Percentiles {
        p50: at(0.50),
        p95: at(0.95),
        p99: at(0.99),
    }
}
500
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_cache_tracking() {
        let metrics = Metrics::new();

        metrics.record_cache_hit();
        metrics.record_cache_hit();
        metrics.record_cache_miss();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.cache_hits, 2);
        assert_eq!(snapshot.cache_misses, 1);
        // Hit rate is 2/3 as a percentage. Compare with a tolerance
        // rather than exact f64 equality (brittle; clippy::float_cmp).
        assert!((snapshot.cache_hit_rate - 200.0 / 3.0).abs() < 1e-9);
    }

    #[test]
    fn test_metrics_latency_percentiles() {
        let metrics = Metrics::new();

        // Record latencies: 10, 20, 30, 40, 50, 60, 70, 80, 90, 100
        for i in 1..=10 {
            metrics.record_query_latency(i * 10);
        }

        let snapshot = metrics.snapshot();
        // With 10 values, p50_idx = (10 * 0.50) as usize = 5, sorted[5] = 60
        assert_eq!(snapshot.query_latency_p50, 60);
        assert_eq!(snapshot.query_latency_p95, 100);
        assert_eq!(snapshot.query_latency_p99, 100);
    }

    #[test]
    fn test_metrics_slo_compliance() {
        let metrics = Metrics::new();

        // Good metrics (meet SLO): 95% hit rate, low latency, no errors.
        for _ in 0..95 {
            metrics.record_cache_hit();
        }
        for _ in 0..5 {
            metrics.record_cache_miss();
        }
        metrics.record_query_latency(5);
        metrics.record_files_processed(100);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.meets_slo(), SLOStatus::Healthy);

        // Bad metrics (violate SLO): 50% hit rate is below the 90% target.
        metrics.reset();
        for _ in 0..50 {
            metrics.record_cache_hit();
        }
        for _ in 0..50 {
            metrics.record_cache_miss();
        }

        let snapshot = metrics.snapshot();
        assert!(matches!(snapshot.meets_slo(), SLOStatus::Violated(_)));
    }

    #[test]
    fn test_prometheus_export() {
        let metrics = Metrics::new();
        metrics.record_cache_hit();
        metrics.record_files_processed(10);

        let prometheus = metrics.export_prometheus();
        assert!(prometheus.contains("thread_cache_hits_total 1"));
        assert!(prometheus.contains("thread_files_processed_total 10"));
    }

    #[test]
    fn test_metrics_reset() {
        let metrics = Metrics::new();
        metrics.record_cache_hit();
        metrics.record_files_processed(10);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.cache_hits, 1);
        assert_eq!(snapshot.files_processed, 10);

        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.cache_hits, 0);
        assert_eq!(snapshot.files_processed, 0);
    }
}