adaptive_pipeline/infrastructure/metrics/
concurrency_metrics.rs1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
42use std::sync::{Arc, Mutex};
43use std::time::Duration;
44
/// Fixed-bucket latency histogram for millisecond wait times.
///
/// Holds one lock-free counter per boundary bucket plus a final overflow
/// bucket, covering `<1`, `<5`, `<10`, `<50`, `<100`, and `>=100` ms.
/// Recording is wait-free (`fetch_add` with relaxed ordering); percentile
/// queries are approximate, reporting the upper boundary of the bucket
/// containing the target rank.
#[derive(Debug)]
pub struct Histogram {
    // One counter per boundary, plus a trailing overflow bucket
    // (len == bucket_boundaries.len() + 1).
    buckets: Vec<AtomicU64>,
    // Exclusive upper bounds, in milliseconds, for each non-overflow bucket.
    bucket_boundaries: Vec<u64>,
}

impl Default for Histogram {
    fn default() -> Self {
        Self::new()
    }
}

impl Histogram {
    /// Creates a histogram with boundaries 1, 5, 10, 50, 100 ms and an
    /// overflow bucket for samples of 100 ms or more.
    pub fn new() -> Self {
        let bucket_boundaries = vec![1, 5, 10, 50, 100];
        // One zeroed counter per boundary, chained with one extra counter
        // for the overflow bucket.
        let buckets = bucket_boundaries
            .iter()
            .map(|_| AtomicU64::new(0))
            .chain(std::iter::once(AtomicU64::new(0)))
            .collect();

        Self {
            buckets,
            bucket_boundaries,
        }
    }

    /// Records one sample (in milliseconds) into the first bucket whose
    /// boundary strictly exceeds it; samples at or above the last boundary
    /// land in the overflow bucket.
    pub fn record(&self, value_ms: u64) {
        let bucket_idx = self
            .bucket_boundaries
            .iter()
            .position(|&boundary| value_ms < boundary)
            // No boundary exceeds the value -> overflow bucket index.
            .unwrap_or(self.bucket_boundaries.len());

        self.buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
    }

    /// Total number of samples recorded across all buckets.
    pub fn count(&self) -> u64 {
        self.buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum()
    }

    /// Approximate `p`-th percentile (p in 0..=100) in milliseconds.
    ///
    /// Returns the upper boundary of the bucket containing the target rank,
    /// the last boundary for overflow-bucket samples, and 0 when the
    /// histogram is empty.
    pub fn percentile(&self, p: f64) -> u64 {
        let total = self.count();
        if total == 0 {
            return 0;
        }

        // 1-based rank of the target sample. Round up and clamp to >= 1:
        // a plain truncating cast produces rank 0 for small totals, which
        // would always match the first bucket — even an empty one.
        let target = (((total as f64) * p / 100.0).ceil() as u64).max(1);
        let mut cumulative = 0u64;

        for (i, bucket) in self.buckets.iter().enumerate() {
            cumulative += bucket.load(Ordering::Relaxed);
            if cumulative >= target {
                return if i < self.bucket_boundaries.len() {
                    self.bucket_boundaries[i]
                } else {
                    // Overflow bucket: report the largest tracked boundary.
                    self.bucket_boundaries.last().copied().unwrap_or(0)
                };
            }
        }

        // Unreachable when total > 0, but kept as a safe fallback.
        0
    }

    /// Zeroes every bucket counter.
    pub fn reset(&self) {
        for bucket in &self.buckets {
            bucket.store(0, Ordering::Relaxed);
        }
    }
}
138
139#[derive(Debug)]
151pub struct ConcurrencyMetrics {
152 cpu_tokens_available: AtomicUsize,
155
156 cpu_tokens_total: usize,
158
159 cpu_wait_total_ms: AtomicU64,
161
162 cpu_wait_histogram: Mutex<Histogram>,
164
165 io_tokens_available: AtomicUsize,
168
169 io_tokens_total: usize,
171
172 io_wait_total_ms: AtomicU64,
174
175 io_wait_histogram: Mutex<Histogram>,
177
178 memory_used_bytes: AtomicUsize,
181
182 memory_capacity_bytes: usize,
184
185 active_workers: AtomicUsize,
188
189 tasks_spawned: AtomicU64,
191
192 tasks_completed: AtomicU64,
194
195 cpu_queue_depth: AtomicUsize,
200
201 cpu_queue_depth_max: AtomicUsize,
204
205 cpu_queue_wait_histogram: Mutex<Histogram>,
208}
209
210impl ConcurrencyMetrics {
211 pub fn new(cpu_tokens: usize, io_tokens: usize, memory_capacity: usize) -> Self {
212 Self {
213 cpu_tokens_available: AtomicUsize::new(cpu_tokens),
214 cpu_tokens_total: cpu_tokens,
215 cpu_wait_total_ms: AtomicU64::new(0),
216 cpu_wait_histogram: Mutex::new(Histogram::new()),
217
218 io_tokens_available: AtomicUsize::new(io_tokens),
219 io_tokens_total: io_tokens,
220 io_wait_total_ms: AtomicU64::new(0),
221 io_wait_histogram: Mutex::new(Histogram::new()),
222
223 memory_used_bytes: AtomicUsize::new(0),
224 memory_capacity_bytes: memory_capacity,
225
226 active_workers: AtomicUsize::new(0),
227 tasks_spawned: AtomicU64::new(0),
228 tasks_completed: AtomicU64::new(0),
229
230 cpu_queue_depth: AtomicUsize::new(0),
232 cpu_queue_depth_max: AtomicUsize::new(0),
233 cpu_queue_wait_histogram: Mutex::new(Histogram::new()),
234 }
235 }
236
237 pub fn update_cpu_tokens_available(&self, available: usize) {
241 self.cpu_tokens_available.store(available, Ordering::Relaxed);
242 }
243
244 pub fn cpu_tokens_available(&self) -> usize {
246 self.cpu_tokens_available.load(Ordering::Relaxed)
247 }
248
249 pub fn cpu_saturation_percent(&self) -> f64 {
259 let available = self.cpu_tokens_available.load(Ordering::Relaxed);
260 let in_use = self.cpu_tokens_total.saturating_sub(available);
261 ((in_use as f64) / (self.cpu_tokens_total as f64)) * 100.0
262 }
263
264 pub fn record_cpu_wait(&self, duration: Duration) {
266 let ms = duration.as_millis() as u64;
267 self.cpu_wait_total_ms.fetch_add(ms, Ordering::Relaxed);
268
269 if let Ok(hist) = self.cpu_wait_histogram.lock() {
270 hist.record(ms);
271 }
272 }
273
274 pub fn cpu_wait_p50(&self) -> u64 {
276 self.cpu_wait_histogram.lock().map(|h| h.percentile(50.0)).unwrap_or(0)
277 }
278
279 pub fn cpu_wait_p95(&self) -> u64 {
280 self.cpu_wait_histogram.lock().map(|h| h.percentile(95.0)).unwrap_or(0)
281 }
282
283 pub fn cpu_wait_p99(&self) -> u64 {
284 self.cpu_wait_histogram.lock().map(|h| h.percentile(99.0)).unwrap_or(0)
285 }
286
287 pub fn update_io_tokens_available(&self, available: usize) {
291 self.io_tokens_available.store(available, Ordering::Relaxed);
292 }
293
294 pub fn io_tokens_available(&self) -> usize {
295 self.io_tokens_available.load(Ordering::Relaxed)
296 }
297
298 pub fn io_saturation_percent(&self) -> f64 {
299 let available = self.io_tokens_available.load(Ordering::Relaxed);
300 let in_use = self.io_tokens_total.saturating_sub(available);
301 ((in_use as f64) / (self.io_tokens_total as f64)) * 100.0
302 }
303
304 pub fn record_io_wait(&self, duration: Duration) {
306 let ms = duration.as_millis() as u64;
307 self.io_wait_total_ms.fetch_add(ms, Ordering::Relaxed);
308
309 if let Ok(hist) = self.io_wait_histogram.lock() {
310 hist.record(ms);
311 }
312 }
313
314 pub fn io_wait_p50(&self) -> u64 {
315 self.io_wait_histogram.lock().map(|h| h.percentile(50.0)).unwrap_or(0)
316 }
317
318 pub fn io_wait_p95(&self) -> u64 {
319 self.io_wait_histogram.lock().map(|h| h.percentile(95.0)).unwrap_or(0)
320 }
321
322 pub fn io_wait_p99(&self) -> u64 {
323 self.io_wait_histogram.lock().map(|h| h.percentile(99.0)).unwrap_or(0)
324 }
325
326 pub fn update_memory_used(&self, bytes: usize) {
329 self.memory_used_bytes.store(bytes, Ordering::Relaxed);
330 }
331
332 pub fn memory_used_bytes(&self) -> usize {
333 self.memory_used_bytes.load(Ordering::Relaxed)
334 }
335
336 pub fn memory_used_mb(&self) -> f64 {
337 (self.memory_used_bytes() as f64) / (1024.0 * 1024.0)
338 }
339
340 pub fn memory_capacity_bytes(&self) -> usize {
341 self.memory_capacity_bytes
342 }
343
344 pub fn memory_utilization_percent(&self) -> f64 {
345 ((self.memory_used_bytes() as f64) / (self.memory_capacity_bytes as f64)) * 100.0
346 }
347
348 pub fn worker_started(&self) {
351 self.active_workers.fetch_add(1, Ordering::Relaxed);
352 self.tasks_spawned.fetch_add(1, Ordering::Relaxed);
353 }
354
355 pub fn worker_completed(&self) {
356 self.active_workers.fetch_sub(1, Ordering::Relaxed);
357 self.tasks_completed.fetch_add(1, Ordering::Relaxed);
358 }
359
360 pub fn active_workers(&self) -> usize {
361 self.active_workers.load(Ordering::Relaxed)
362 }
363
364 pub fn tasks_spawned(&self) -> u64 {
365 self.tasks_spawned.load(Ordering::Relaxed)
366 }
367
368 pub fn tasks_completed(&self) -> u64 {
369 self.tasks_completed.load(Ordering::Relaxed)
370 }
371
372 pub fn update_cpu_queue_depth(&self, depth: usize) {
384 self.cpu_queue_depth.store(depth, Ordering::Relaxed);
385
386 let mut current_max = self.cpu_queue_depth_max.load(Ordering::Relaxed);
388 while depth > current_max {
389 match self.cpu_queue_depth_max.compare_exchange_weak(
390 current_max,
391 depth,
392 Ordering::Relaxed,
393 Ordering::Relaxed,
394 ) {
395 Ok(_) => {
396 break;
397 }
398 Err(x) => {
399 current_max = x;
400 }
401 }
402 }
403 }
404
405 pub fn cpu_queue_depth(&self) -> usize {
407 self.cpu_queue_depth.load(Ordering::Relaxed)
408 }
409
410 pub fn cpu_queue_depth_max(&self) -> usize {
412 self.cpu_queue_depth_max.load(Ordering::Relaxed)
413 }
414
415 pub fn record_cpu_queue_wait(&self, duration: Duration) {
417 let ms = duration.as_millis() as u64;
418 if let Ok(hist) = self.cpu_queue_wait_histogram.lock() {
419 hist.record(ms);
420 }
421 }
422
423 pub fn cpu_queue_wait_p50(&self) -> u64 {
425 self.cpu_queue_wait_histogram
426 .lock()
427 .map(|h| h.percentile(50.0))
428 .unwrap_or(0)
429 }
430
431 pub fn cpu_queue_wait_p95(&self) -> u64 {
433 self.cpu_queue_wait_histogram
434 .lock()
435 .map(|h| h.percentile(95.0))
436 .unwrap_or(0)
437 }
438
439 pub fn cpu_queue_wait_p99(&self) -> u64 {
441 self.cpu_queue_wait_histogram
442 .lock()
443 .map(|h| h.percentile(99.0))
444 .unwrap_or(0)
445 }
446
447 pub fn reset(&self) {
449 self.cpu_wait_total_ms.store(0, Ordering::Relaxed);
450 self.io_wait_total_ms.store(0, Ordering::Relaxed);
451 self.tasks_spawned.store(0, Ordering::Relaxed);
452 self.tasks_completed.store(0, Ordering::Relaxed);
453
454 self.cpu_queue_depth.store(0, Ordering::Relaxed);
456 self.cpu_queue_depth_max.store(0, Ordering::Relaxed);
457
458 if let Ok(hist) = self.cpu_wait_histogram.lock() {
459 hist.reset();
460 }
461 if let Ok(hist) = self.io_wait_histogram.lock() {
462 hist.reset();
463 }
464 if let Ok(hist) = self.cpu_queue_wait_histogram.lock() {
465 hist.reset();
466 }
467 }
468}
469
470pub static CONCURRENCY_METRICS: std::sync::LazyLock<Arc<ConcurrencyMetrics>> = std::sync::LazyLock::new(|| {
477 use crate::infrastructure::runtime::RESOURCE_MANAGER;
478
479 Arc::new(ConcurrencyMetrics::new(
480 RESOURCE_MANAGER.cpu_tokens_total(),
481 RESOURCE_MANAGER.io_tokens_total(),
482 RESOURCE_MANAGER.memory_capacity(),
483 ))
484});
485
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_histogram_basic() {
        let hist = Histogram::new();

        // One sample in each of three distinct buckets.
        for ms in [2, 7, 25] {
            hist.record(ms);
        }

        assert_eq!(hist.count(), 3);
    }

    #[test]
    fn test_concurrency_metrics_creation() {
        let metrics = ConcurrencyMetrics::new(8, 24, 1024 * 1024 * 1024);

        assert_eq!(metrics.cpu_tokens_available(), 8);
        assert_eq!(metrics.io_tokens_available(), 24);
        assert_eq!(metrics.cpu_saturation_percent(), 0.0);
    }

    #[test]
    fn test_saturation_calculation() {
        let metrics = ConcurrencyMetrics::new(10, 20, 1024);

        // All tokens available -> 0% saturated.
        assert_eq!(metrics.cpu_saturation_percent(), 0.0);

        // Half the tokens handed out -> 50%.
        metrics.update_cpu_tokens_available(5);
        assert_eq!(metrics.cpu_saturation_percent(), 50.0);

        // None left -> fully saturated.
        metrics.update_cpu_tokens_available(0);
        assert_eq!(metrics.cpu_saturation_percent(), 100.0);
    }

    #[test]
    fn test_wait_time_recording() {
        let metrics = ConcurrencyMetrics::new(8, 24, 1024);

        for ms in [10, 20] {
            metrics.record_cpu_wait(Duration::from_millis(ms));
        }

        assert!(metrics.cpu_wait_p50() > 0);
    }

    #[test]
    fn test_worker_tracking() {
        let metrics = ConcurrencyMetrics::new(8, 24, 1024);

        assert_eq!(metrics.active_workers(), 0);

        metrics.worker_started();
        assert_eq!(metrics.active_workers(), 1);
        assert_eq!(metrics.tasks_spawned(), 1);

        metrics.worker_completed();
        assert_eq!(metrics.active_workers(), 0);
        assert_eq!(metrics.tasks_completed(), 1);
    }
}