adaptive_pipeline/infrastructure/metrics/concurrency_metrics.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Concurrency Metrics
//!
//! This module provides metrics for monitoring and tuning concurrency and
//! resource utilization in the pipeline system.
//!
//! ## Educational Purpose
//!
//! These metrics demonstrate:
//! - How to observe resource saturation
//! - When to tune resource limits
//! - Impact of concurrency on throughput and latency
//!
//! ## Metric Types
//!
//! **Gauges** - Point-in-time values (e.g., tokens available right now)
//! **Counters** - Cumulative values (e.g., total wait time)
//! **Histograms** - Distribution of values (e.g., P50/P95/P99 wait times)
//!
//! ## Usage
//!
//! ```rust,ignore
//! use adaptive_pipeline::infrastructure::metrics::CONCURRENCY_METRICS;
//!
//! // Record resource acquisition
//! CONCURRENCY_METRICS.record_cpu_wait(wait_duration);
//!
//! // Check saturation
//! let saturation = CONCURRENCY_METRICS.cpu_saturation_percent();
//! if saturation > 80.0 {
//!     println!("CPU-saturated: consider increasing workers");
//! }
//! ```

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Simple histogram for latency distribution tracking
///
/// ## Educational: Why a histogram?
///
/// Averages hide problems! Consider:
/// - Average wait: 10ms (looks fine)
/// - But P99 wait: 500ms (users experience terrible latency!)
///
/// Histograms show the full distribution, revealing tail latencies.
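///
/// ## Example
///
/// A minimal sketch of recording latencies and reading the bucketed
/// distribution (values are illustrative):
///
/// ```rust,ignore
/// let hist = Histogram::new();
/// hist.record(3);   // falls in the 1-5ms bucket
/// hist.record(8);   // falls in the 5-10ms bucket
/// hist.record(120); // falls in the 100+ms overflow bucket
///
/// assert_eq!(hist.count(), 3);
/// // Percentiles resolve to bucket boundaries, not exact values
/// println!("p95 ~ {}ms", hist.percentile(95.0));
/// ```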
#[derive(Debug)]
pub struct Histogram {
    /// Bucket counters, one per latency range:
    /// [0-1ms, 1-5ms, 5-10ms, 10-50ms, 50-100ms, 100+ms]
    buckets: Vec<AtomicU64>,
    /// Upper boundaries (in milliseconds) of the finite buckets
    bucket_boundaries: Vec<u64>,
}

impl Default for Histogram {
    fn default() -> Self {
        Self::new()
    }
}

impl Histogram {
    pub fn new() -> Self {
        // Educational: These boundaries capture common latency ranges.
        // Adjust based on your system's characteristics.
        let bucket_boundaries = vec![1, 5, 10, 50, 100];
        let buckets = bucket_boundaries
            .iter()
            .map(|_| AtomicU64::new(0))
            .chain(std::iter::once(AtomicU64::new(0))) // +inf bucket
            .collect();

        Self {
            buckets,
            bucket_boundaries,
        }
    }

    /// Record a value in milliseconds
    pub fn record(&self, value_ms: u64) {
        // Find the first bucket whose upper boundary exceeds the value;
        // fall through to the +inf bucket if none does
        let bucket_idx = self
            .bucket_boundaries
            .iter()
            .position(|&boundary| value_ms < boundary)
            .unwrap_or(self.bucket_boundaries.len());

        self.buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
    }

    /// Get total count across all buckets
    pub fn count(&self) -> u64 {
        self.buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum()
    }

    /// Get a rough percentile estimate in milliseconds
    ///
    /// ## Educational: Why "rough"?
    ///
    /// True percentiles require sorting all values. Histograms trade
    /// precision for memory efficiency by bucketing values.
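    ///
    /// Illustrative sketch: if every recorded value is ~2ms, the reported
    /// median is the bucket's upper boundary rather than the exact value:
    ///
    /// ```rust,ignore
    /// let hist = Histogram::new();
    /// for _ in 0..10 {
    ///     hist.record(2); // all samples land in the 1-5ms bucket
    /// }
    /// assert_eq!(hist.percentile(50.0), 5); // upper boundary of that bucket
    /// ```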
    pub fn percentile(&self, p: f64) -> u64 {
        let total = self.count();
        if total == 0 {
            return 0;
        }

        // Walk the buckets until the cumulative count reaches the target rank
        let target = (((total as f64) * p) / 100.0) as u64;
        let mut cumulative = 0u64;

        for (i, bucket) in self.buckets.iter().enumerate() {
            cumulative += bucket.load(Ordering::Relaxed);
            if cumulative >= target {
                return if i < self.bucket_boundaries.len() {
                    self.bucket_boundaries[i]
                } else {
                    100 // value fell in the 100+ms overflow bucket; report the last finite boundary
                };
            }
        }

        0
    }

    /// Reset all bucket counts to zero
    pub fn reset(&self) {
        for bucket in &self.buckets {
            bucket.store(0, Ordering::Relaxed);
        }
    }
}

/// Concurrency metrics for resource manager monitoring
///
/// ## Educational: Observability-Driven Tuning
///
/// These metrics answer key questions:
/// - "Are we CPU-saturated?" → cpu_saturation_percent()
/// - "Are we I/O-saturated?" → io_saturation_percent()
/// - "What's causing latency?" → wait time histograms
/// - "How much memory are we using?" → memory_used_bytes()
///
/// Use these to guide tuning decisions!
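///
/// ## Example
///
/// A rough tuning-loop sketch (the thresholds are illustrative, not
/// prescriptive):
///
/// ```rust,ignore
/// let metrics = ConcurrencyMetrics::new(8, 24, 1024 * 1024 * 1024);
///
/// // ... pipeline runs, updating gauges and histograms ...
///
/// if metrics.cpu_saturation_percent() > 90.0 && metrics.cpu_wait_p99() > 50 {
///     println!("CPU tokens are the bottleneck; consider raising the limit");
/// }
/// if metrics.memory_utilization_percent() > 80.0 {
///     println!("Memory pressure; consider smaller chunks or fewer workers");
/// }
/// ```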
#[derive(Debug)]
pub struct ConcurrencyMetrics {
    // === CPU Metrics ===
    /// Current number of available CPU tokens (gauge)
    cpu_tokens_available: AtomicUsize,

    /// Total CPU tokens configured (static)
    cpu_tokens_total: usize,

    /// Total time spent waiting for CPU tokens (counter, milliseconds)
    cpu_wait_total_ms: AtomicU64,

    /// Histogram of CPU token wait times
    cpu_wait_histogram: Mutex<Histogram>,

    // === I/O Metrics ===
    /// Current number of available I/O tokens (gauge)
    io_tokens_available: AtomicUsize,

    /// Total I/O tokens configured (static)
    io_tokens_total: usize,

    /// Total time spent waiting for I/O tokens (counter, milliseconds)
    io_wait_total_ms: AtomicU64,

    /// Histogram of I/O token wait times
    io_wait_histogram: Mutex<Histogram>,

    // === Memory Metrics ===
    /// Current memory usage in bytes (gauge)
    memory_used_bytes: AtomicUsize,

    /// Memory capacity in bytes (static)
    memory_capacity_bytes: usize,

    // === Worker Metrics ===
    /// Number of currently active workers (gauge)
    active_workers: AtomicUsize,

    /// Total number of tasks spawned (counter)
    tasks_spawned: AtomicU64,

    /// Total number of tasks completed (counter)
    tasks_completed: AtomicU64,

    // === Channel Queue Metrics ===
    /// Current depth of CPU worker channel (gauge)
    /// Educational: Reveals backpressure - high depth means workers can't keep up
    cpu_queue_depth: AtomicUsize,

    /// Maximum CPU queue depth observed (gauge)
    /// Educational: Shows peak backpressure during processing
    cpu_queue_depth_max: AtomicUsize,

    /// Histogram of time chunks wait in CPU queue
    /// Educational: Queue wait time indicates worker saturation
    cpu_queue_wait_histogram: Mutex<Histogram>,
}

impl ConcurrencyMetrics {
    pub fn new(cpu_tokens: usize, io_tokens: usize, memory_capacity: usize) -> Self {
        Self {
            cpu_tokens_available: AtomicUsize::new(cpu_tokens),
            cpu_tokens_total: cpu_tokens,
            cpu_wait_total_ms: AtomicU64::new(0),
            cpu_wait_histogram: Mutex::new(Histogram::new()),

            io_tokens_available: AtomicUsize::new(io_tokens),
            io_tokens_total: io_tokens,
            io_wait_total_ms: AtomicU64::new(0),
            io_wait_histogram: Mutex::new(Histogram::new()),

            memory_used_bytes: AtomicUsize::new(0),
            memory_capacity_bytes: memory_capacity,

            active_workers: AtomicUsize::new(0),
            tasks_spawned: AtomicU64::new(0),
            tasks_completed: AtomicU64::new(0),

            // Queue metrics
            cpu_queue_depth: AtomicUsize::new(0),
            cpu_queue_depth_max: AtomicUsize::new(0),
            cpu_queue_wait_histogram: Mutex::new(Histogram::new()),
        }
    }

    // === CPU Metrics ===

    /// Update CPU tokens available (from ResourceManager)
    pub fn update_cpu_tokens_available(&self, available: usize) {
        self.cpu_tokens_available.store(available, Ordering::Relaxed);
    }

    /// Get CPU tokens available
    pub fn cpu_tokens_available(&self) -> usize {
        self.cpu_tokens_available.load(Ordering::Relaxed)
    }

    /// Get CPU saturation percentage
    ///
    /// ## Educational: What is saturation?
    ///
    /// Saturation = (tokens_in_use / total_tokens) × 100
    /// - 0%: Idle, not utilizing resources
    /// - 50%: Good utilization
    /// - 80-90%: High utilization, approaching saturation
    /// - 100%: Fully saturated, tasks waiting
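    ///
    /// For example, with 8 total tokens and 2 available:
    /// saturation = ((8 - 2) / 8) × 100 = 75%.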
    pub fn cpu_saturation_percent(&self) -> f64 {
        let available = self.cpu_tokens_available.load(Ordering::Relaxed);
        let in_use = self.cpu_tokens_total.saturating_sub(available);
        ((in_use as f64) / (self.cpu_tokens_total as f64)) * 100.0
    }

    /// Record CPU token wait time
    pub fn record_cpu_wait(&self, duration: Duration) {
        let ms = duration.as_millis() as u64;
        self.cpu_wait_total_ms.fetch_add(ms, Ordering::Relaxed);

        if let Ok(hist) = self.cpu_wait_histogram.lock() {
            hist.record(ms);
        }
    }

    /// Get P50 (median) CPU token wait time in milliseconds
    pub fn cpu_wait_p50(&self) -> u64 {
        self.cpu_wait_histogram.lock().map(|h| h.percentile(50.0)).unwrap_or(0)
    }

    /// Get P95 CPU token wait time in milliseconds
    pub fn cpu_wait_p95(&self) -> u64 {
        self.cpu_wait_histogram.lock().map(|h| h.percentile(95.0)).unwrap_or(0)
    }

    /// Get P99 CPU token wait time in milliseconds
    pub fn cpu_wait_p99(&self) -> u64 {
        self.cpu_wait_histogram.lock().map(|h| h.percentile(99.0)).unwrap_or(0)
    }

    // === I/O Metrics ===

    /// Update I/O tokens available (from ResourceManager)
    pub fn update_io_tokens_available(&self, available: usize) {
        self.io_tokens_available.store(available, Ordering::Relaxed);
    }

    /// Get I/O tokens available
    pub fn io_tokens_available(&self) -> usize {
        self.io_tokens_available.load(Ordering::Relaxed)
    }

    /// Get I/O saturation percentage (same formula as cpu_saturation_percent)
    pub fn io_saturation_percent(&self) -> f64 {
        let available = self.io_tokens_available.load(Ordering::Relaxed);
        let in_use = self.io_tokens_total.saturating_sub(available);
        ((in_use as f64) / (self.io_tokens_total as f64)) * 100.0
    }

    /// Record I/O token wait time
    pub fn record_io_wait(&self, duration: Duration) {
        let ms = duration.as_millis() as u64;
        self.io_wait_total_ms.fetch_add(ms, Ordering::Relaxed);

        if let Ok(hist) = self.io_wait_histogram.lock() {
            hist.record(ms);
        }
    }

    /// Get P50 (median) I/O token wait time in milliseconds
    pub fn io_wait_p50(&self) -> u64 {
        self.io_wait_histogram.lock().map(|h| h.percentile(50.0)).unwrap_or(0)
    }

    /// Get P95 I/O token wait time in milliseconds
    pub fn io_wait_p95(&self) -> u64 {
        self.io_wait_histogram.lock().map(|h| h.percentile(95.0)).unwrap_or(0)
    }

    /// Get P99 I/O token wait time in milliseconds
    pub fn io_wait_p99(&self) -> u64 {
        self.io_wait_histogram.lock().map(|h| h.percentile(99.0)).unwrap_or(0)
    }

    // === Memory Metrics ===

    /// Update current memory usage in bytes
    pub fn update_memory_used(&self, bytes: usize) {
        self.memory_used_bytes.store(bytes, Ordering::Relaxed);
    }

    /// Get current memory usage in bytes
    pub fn memory_used_bytes(&self) -> usize {
        self.memory_used_bytes.load(Ordering::Relaxed)
    }

    /// Get current memory usage in megabytes
    pub fn memory_used_mb(&self) -> f64 {
        (self.memory_used_bytes() as f64) / (1024.0 * 1024.0)
    }

    /// Get configured memory capacity in bytes
    pub fn memory_capacity_bytes(&self) -> usize {
        self.memory_capacity_bytes
    }

    /// Get memory usage as a percentage of capacity
    pub fn memory_utilization_percent(&self) -> f64 {
        ((self.memory_used_bytes() as f64) / (self.memory_capacity_bytes as f64)) * 100.0
    }

    // === Worker Metrics ===

    /// Record that a worker task has started
    pub fn worker_started(&self) {
        self.active_workers.fetch_add(1, Ordering::Relaxed);
        self.tasks_spawned.fetch_add(1, Ordering::Relaxed);
    }

    /// Record that a worker task has finished
    pub fn worker_completed(&self) {
        self.active_workers.fetch_sub(1, Ordering::Relaxed);
        self.tasks_completed.fetch_add(1, Ordering::Relaxed);
    }

    /// Get the number of currently active workers
    pub fn active_workers(&self) -> usize {
        self.active_workers.load(Ordering::Relaxed)
    }

    /// Get the total number of tasks spawned
    pub fn tasks_spawned(&self) -> u64 {
        self.tasks_spawned.load(Ordering::Relaxed)
    }

    /// Get the total number of tasks completed
    pub fn tasks_completed(&self) -> u64 {
        self.tasks_completed.load(Ordering::Relaxed)
    }

    // === Channel Queue Metrics ===

    /// Update CPU queue depth
    ///
    /// ## Educational: Observing Backpressure
    ///
    /// Queue depth reveals whether workers can keep up with the reader:
    /// - Depth near 0: Workers are faster than the reader (good!)
    /// - Depth near capacity: Workers are the bottleneck (increase workers or
    ///   optimize stages)
    /// - Depth at capacity: Reader is blocked (severe backpressure)
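    ///
    /// Illustrative sketch (`queue_len` and `QUEUE_CAPACITY` are placeholders
    /// for however the caller tracks its bounded channel; they are not part of
    /// this module):
    ///
    /// ```rust,ignore
    /// CONCURRENCY_METRICS.update_cpu_queue_depth(queue_len);
    ///
    /// if CONCURRENCY_METRICS.cpu_queue_depth() as f64 > 0.8 * QUEUE_CAPACITY as f64 {
    ///     println!("CPU queue nearly full: workers are the bottleneck");
    /// }
    /// ```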
    pub fn update_cpu_queue_depth(&self, depth: usize) {
        self.cpu_queue_depth.store(depth, Ordering::Relaxed);

        // Track the maximum depth observed using a compare-and-swap loop:
        // retry if another thread raised the max between our load and the swap
        let mut current_max = self.cpu_queue_depth_max.load(Ordering::Relaxed);
        while depth > current_max {
            match self.cpu_queue_depth_max.compare_exchange_weak(
                current_max,
                depth,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    break; // our value became the new maximum
                }
                Err(x) => {
                    current_max = x; // another thread won the race; retry against its value
                }
            }
        }
    }

    /// Get current CPU queue depth
    pub fn cpu_queue_depth(&self) -> usize {
        self.cpu_queue_depth.load(Ordering::Relaxed)
    }

    /// Get maximum CPU queue depth observed
    pub fn cpu_queue_depth_max(&self) -> usize {
        self.cpu_queue_depth_max.load(Ordering::Relaxed)
    }

    /// Record time a chunk waited in the CPU queue
    pub fn record_cpu_queue_wait(&self, duration: Duration) {
        let ms = duration.as_millis() as u64;
        if let Ok(hist) = self.cpu_queue_wait_histogram.lock() {
            hist.record(ms);
        }
    }

    /// Get P50 (median) CPU queue wait time in milliseconds
    pub fn cpu_queue_wait_p50(&self) -> u64 {
        self.cpu_queue_wait_histogram
            .lock()
            .map(|h| h.percentile(50.0))
            .unwrap_or(0)
    }

    /// Get P95 CPU queue wait time in milliseconds
    pub fn cpu_queue_wait_p95(&self) -> u64 {
        self.cpu_queue_wait_histogram
            .lock()
            .map(|h| h.percentile(95.0))
            .unwrap_or(0)
    }

    /// Get P99 CPU queue wait time in milliseconds
    pub fn cpu_queue_wait_p99(&self) -> u64 {
        self.cpu_queue_wait_histogram
            .lock()
            .map(|h| h.percentile(99.0))
            .unwrap_or(0)
    }

    /// Reset counters, histograms, and queue gauges (for testing/benchmarking)
    pub fn reset(&self) {
        self.cpu_wait_total_ms.store(0, Ordering::Relaxed);
        self.io_wait_total_ms.store(0, Ordering::Relaxed);
        self.tasks_spawned.store(0, Ordering::Relaxed);
        self.tasks_completed.store(0, Ordering::Relaxed);

        // Reset queue metrics
        self.cpu_queue_depth.store(0, Ordering::Relaxed);
        self.cpu_queue_depth_max.store(0, Ordering::Relaxed);

        if let Ok(hist) = self.cpu_wait_histogram.lock() {
            hist.reset();
        }
        if let Ok(hist) = self.io_wait_histogram.lock() {
            hist.reset();
        }
        if let Ok(hist) = self.cpu_queue_wait_histogram.lock() {
            hist.reset();
        }
    }
}

/// Global concurrency metrics instance
///
/// ## Educational: Lazy Initialization
///
/// Initialized from RESOURCE_MANAGER values on first access.
/// This ensures metrics match actual resource configuration.
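///
/// A small usage sketch (the first access forces initialization):
///
/// ```rust,ignore
/// // Clone the Arc if a task wants its own handle to the shared metrics.
/// let metrics = Arc::clone(&*CONCURRENCY_METRICS);
/// metrics.worker_started();
/// ```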
pub static CONCURRENCY_METRICS: std::sync::LazyLock<Arc<ConcurrencyMetrics>> = std::sync::LazyLock::new(|| {
    use crate::infrastructure::runtime::RESOURCE_MANAGER;

    Arc::new(ConcurrencyMetrics::new(
        RESOURCE_MANAGER.cpu_tokens_total(),
        RESOURCE_MANAGER.io_tokens_total(),
        RESOURCE_MANAGER.memory_capacity(),
    ))
});

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_histogram_basic() {
        let hist = Histogram::new();

        hist.record(2); // 1-5ms bucket
        hist.record(7); // 5-10ms bucket
        hist.record(25); // 10-50ms bucket

        assert_eq!(hist.count(), 3);
    }

    #[test]
    fn test_concurrency_metrics_creation() {
        let metrics = ConcurrencyMetrics::new(8, 24, 1024 * 1024 * 1024);

        assert_eq!(metrics.cpu_tokens_available(), 8);
        assert_eq!(metrics.io_tokens_available(), 24);
        assert_eq!(metrics.cpu_saturation_percent(), 0.0);
    }

    #[test]
    fn test_saturation_calculation() {
        let metrics = ConcurrencyMetrics::new(10, 20, 1024);

        // Initially no saturation
        assert_eq!(metrics.cpu_saturation_percent(), 0.0);

        // Simulate 5 tokens in use
        metrics.update_cpu_tokens_available(5);
        assert_eq!(metrics.cpu_saturation_percent(), 50.0);

        // Fully saturated
        metrics.update_cpu_tokens_available(0);
        assert_eq!(metrics.cpu_saturation_percent(), 100.0);
    }

    #[test]
    fn test_wait_time_recording() {
        let metrics = ConcurrencyMetrics::new(8, 24, 1024);

        metrics.record_cpu_wait(Duration::from_millis(10));
        metrics.record_cpu_wait(Duration::from_millis(20));

        // Recorded in histogram
        assert!(metrics.cpu_wait_p50() > 0);
    }

    #[test]
    fn test_worker_tracking() {
        let metrics = ConcurrencyMetrics::new(8, 24, 1024);

        assert_eq!(metrics.active_workers(), 0);

        metrics.worker_started();
        assert_eq!(metrics.active_workers(), 1);
        assert_eq!(metrics.tasks_spawned(), 1);

        metrics.worker_completed();
        assert_eq!(metrics.active_workers(), 0);
        assert_eq!(metrics.tasks_completed(), 1);
    }
}