1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
7use tokio::sync::RwLock;
8
9#[derive(Debug, Clone)]
11pub struct MetricsCollector {
12 counters: Arc<RwLock<HashMap<String, AtomicU64>>>,
13 gauges: Arc<RwLock<HashMap<String, AtomicU64>>>,
14 histograms: Arc<RwLock<HashMap<String, TaskHistogram>>>,
15 start_time: Instant,
16 memory_tracker: Arc<MemoryTracker>,
17 performance_tracker: Arc<PerformanceTracker>,
18 alert_manager: Arc<AlertManager>,
19}
20
/// Lock-free accounting of tracked memory and in-flight tasks.
///
/// All fields are atomics, so the tracker can be updated through a shared reference.
#[derive(Debug)]
pub struct MemoryTracker {
    /// Bytes currently accounted as allocated.
    allocated_bytes: AtomicUsize,
    /// Highest value `allocated_bytes` has reached.
    peak_memory: AtomicUsize,
    /// Number of tasks started but not yet ended.
    active_tasks: AtomicUsize,
    /// Number of `track_allocation` calls recorded.
    total_allocations: AtomicU64,
}
29
/// Duration samples for one metric, plus running totals.
///
/// `samples` is a bounded recent window (trimmed by `TaskHistogram::record`), while the
/// two totals cover every sample ever recorded.
#[derive(Debug)]
pub struct TaskHistogram {
    /// Retained samples in insertion order; percentiles are computed from these.
    samples: Vec<Duration>,
    /// Number of samples recorded over the histogram's lifetime.
    total_count: AtomicU64,
    /// Running sum of recorded durations as an integer; the unit is whatever
    /// `TaskHistogram::record` stores and `TaskHistogram::average` reads back.
    total_duration: AtomicU64,
}
37
38#[derive(Debug, Serialize, Deserialize, Clone)]
40pub struct SystemMetrics {
41 pub timestamp: DateTime<Utc>,
42 pub uptime_seconds: u64,
43 pub memory: MemoryMetrics,
44 pub performance: PerformanceMetrics,
45 pub tasks: TaskMetrics,
46 pub queues: Vec<QueueDetailedMetrics>,
47 pub workers: WorkerMetrics,
48}
49
50#[derive(Debug, Serialize, Deserialize, Clone)]
52pub struct MemoryMetrics {
53 pub current_bytes: usize,
54 pub peak_bytes: usize,
55 pub total_allocations: u64,
56 pub active_tasks: usize,
57 pub memory_efficiency: f64, }
59
60#[derive(Debug, Serialize, Deserialize, Clone)]
62pub struct PerformanceMetrics {
63 pub tasks_per_second: f64,
64 pub average_execution_time_ms: f64,
65 pub p95_execution_time_ms: f64,
66 pub p99_execution_time_ms: f64,
67 pub success_rate: f64,
68 pub error_rate: f64,
69}
70
71#[derive(Debug, Serialize, Deserialize, Clone)]
73pub struct TaskMetrics {
74 pub total_executed: u64,
75 pub total_succeeded: u64,
76 pub total_failed: u64,
77 pub total_retried: u64,
78 pub total_timed_out: u64,
79 pub active_tasks: u64,
80}
81
82#[derive(Debug, Serialize, Deserialize, Clone)]
84pub struct QueueDetailedMetrics {
85 pub queue_name: String,
86 pub pending_tasks: i64,
87 pub processed_tasks: i64,
88 pub failed_tasks: i64,
89 pub average_wait_time_ms: f64,
90}
91
92#[derive(Debug, Serialize, Deserialize, Clone)]
94pub struct WorkerMetrics {
95 pub active_workers: u64,
96 pub idle_workers: u64,
97 pub busy_workers: u64,
98 pub worker_utilization: f64,
99 pub tasks_per_worker: f64,
100}
101
102#[derive(Debug)]
104pub struct PerformanceTracker {
105 task_execution_times: Arc<RwLock<HashMap<String, Vec<Duration>>>>,
106 #[allow(dead_code)] queue_latencies: Arc<RwLock<HashMap<String, Vec<Duration>>>>,
108 error_rates: Arc<RwLock<HashMap<String, ErrorRateTracker>>>,
109 sla_violations: Arc<RwLock<Vec<SLAViolation>>>,
110}
111
112#[derive(Debug)]
114pub struct AlertManager {
115 active_alerts: Arc<RwLock<HashMap<String, Alert>>>,
116 alert_thresholds: Arc<RwLock<AlertThresholds>>,
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct AlertThresholds {
122 pub max_queue_size: u64,
123 pub max_error_rate: f64,
124 pub max_task_duration_ms: u64,
125 pub max_memory_usage_mb: u64,
126 pub max_worker_idle_time_sec: u64,
127}
128
129impl Default for AlertThresholds {
130 fn default() -> Self {
131 Self {
132 max_queue_size: 10000,
133 max_error_rate: 0.05, max_task_duration_ms: 300000, max_memory_usage_mb: 1024, max_worker_idle_time_sec: 300, }
138 }
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct Alert {
144 pub id: String,
145 pub severity: AlertSeverity,
146 pub message: String,
147 pub timestamp: SystemTime,
148 pub metric_name: String,
149 pub current_value: f64,
150 pub threshold: f64,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub enum AlertSeverity {
155 Info,
156 Warning,
157 Critical,
158 Emergency,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct SLAViolation {
164 pub violation_type: SLAViolationType,
165 pub timestamp: SystemTime,
166 pub details: String,
167 pub metric_value: f64,
168 pub threshold: f64,
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
172pub enum SLAViolationType {
173 TaskTimeoutExceeded,
174 QueueBacklogTooHigh,
175 ErrorRateTooHigh,
176 MemoryUsageTooHigh,
177 WorkerUtilizationTooLow,
178}
179
/// Sliding-window error-rate tracker.
///
/// Both the error count and the execution count are restricted to the last
/// `window_size`, so the reported rate is "errors in window / executions in window".
/// Timestamps use the monotonic [`Instant`] clock, which keeps the window immune to
/// wall-clock adjustments and avoids any fallible time arithmetic.
///
/// Memory use is proportional to the number of executions inside the window.
#[derive(Debug)]
pub struct ErrorRateTracker {
    /// `(recorded_at, was_error)` for every execution still inside the window,
    /// oldest first.
    executions: std::collections::VecDeque<(Instant, bool)>,
    /// Number of entries in `executions` whose flag is `true`.
    error_count: u64,
    /// Length of the sliding window.
    window_size: Duration,
}

impl ErrorRateTracker {
    /// Creates an empty tracker that considers executions from the last `window_size`.
    pub fn new(window_size: Duration) -> Self {
        Self {
            executions: std::collections::VecDeque::new(),
            error_count: 0,
            window_size,
        }
    }

    /// Records one execution, flagged as failed when `is_error` is `true`, and drops
    /// entries that have left the window.
    pub fn record_execution(&mut self, is_error: bool) {
        self.executions.push_back((Instant::now(), is_error));
        if is_error {
            self.error_count += 1;
        }
        self.cleanup_old_entries();
    }

    /// Returns the fraction of executions inside the window that failed, in `0.0..=1.0`.
    ///
    /// Returns `0.0` when the window contains no executions. Takes `&mut self` because
    /// expired entries are evicted first.
    pub fn error_rate(&mut self) -> f64 {
        self.cleanup_old_entries();
        if self.executions.is_empty() {
            return 0.0;
        }
        self.error_count as f64 / self.executions.len() as f64
    }

    /// Evicts entries whose age has reached `window_size`.
    ///
    /// Entries are pushed in monotonic time order, so eviction only ever needs to look
    /// at the front of the deque.
    fn cleanup_old_entries(&mut self) {
        while let Some(&(recorded_at, was_error)) = self.executions.front() {
            // `elapsed` never underflows, unlike `now - window_size`, which panics
            // for very large windows.
            if recorded_at.elapsed() < self.window_size {
                break;
            }
            self.executions.pop_front();
            if was_error {
                self.error_count -= 1;
            }
        }
    }
}
218
219impl MetricsCollector {
220 pub fn new() -> Self {
221 Self {
222 counters: Arc::new(RwLock::new(HashMap::new())),
223 gauges: Arc::new(RwLock::new(HashMap::new())),
224 histograms: Arc::new(RwLock::new(HashMap::new())),
225 start_time: Instant::now(),
226 memory_tracker: Arc::new(MemoryTracker::new()),
227 performance_tracker: Arc::new(PerformanceTracker::new()),
228 alert_manager: Arc::new(AlertManager::new()),
229 }
230 }
231
232 pub async fn increment_counter(&self, name: &str, value: u64) {
234 let counters = self.counters.read().await;
235 if let Some(counter) = counters.get(name) {
236 counter.fetch_add(value, Ordering::Relaxed);
237 } else {
238 drop(counters);
239 let mut counters = self.counters.write().await;
240 counters
241 .entry(name.to_string())
242 .or_insert_with(|| AtomicU64::new(0))
243 .fetch_add(value, Ordering::Relaxed);
244 }
245 }
246
247 pub async fn set_gauge(&self, name: &str, value: u64) {
249 let gauges = self.gauges.read().await;
250 if let Some(gauge) = gauges.get(name) {
251 gauge.store(value, Ordering::Relaxed);
252 } else {
253 drop(gauges);
254 let mut gauges = self.gauges.write().await;
255 gauges
256 .entry(name.to_string())
257 .or_insert_with(|| AtomicU64::new(0))
258 .store(value, Ordering::Relaxed);
259 }
260 }
261
262 pub async fn record_timing(&self, name: &str, duration: Duration) {
264 let mut histograms = self.histograms.write().await;
265 let histogram = histograms
266 .entry(name.to_string())
267 .or_insert_with(TaskHistogram::new);
268 histogram.record(duration);
269 }
270
271 pub fn track_allocation(&self, bytes: usize) {
273 self.memory_tracker.track_allocation(bytes);
274 }
275
276 pub fn track_deallocation(&self, bytes: usize) {
278 self.memory_tracker.track_deallocation(bytes);
279 }
280
281 pub fn track_task_start(&self) {
283 self.memory_tracker.track_task_start();
284 }
285
286 pub fn track_task_end(&self) {
288 self.memory_tracker.track_task_end();
289 }
290
291 pub async fn record_task_execution(&self, task_name: &str, duration: Duration, success: bool) {
293 let mut histograms = self.histograms.write().await;
295 let histogram = histograms
296 .entry(format!("task_execution_time_{}", task_name))
297 .or_insert_with(TaskHistogram::new);
298 histogram.record(duration);
299
300 self.performance_tracker
302 .record_execution(task_name, duration, success)
303 .await;
304
305 self.increment_counter(&format!("tasks_executed_{}", task_name), 1)
307 .await;
308 if success {
309 self.increment_counter(&format!("tasks_succeeded_{}", task_name), 1)
310 .await;
311 } else {
312 self.increment_counter(&format!("tasks_failed_{}", task_name), 1)
313 .await;
314 }
315
316 self.alert_manager
318 .check_task_performance_alerts(task_name, duration, success)
319 .await;
320 }
321
322 pub async fn get_system_metrics(&self) -> SystemMetrics {
324 let uptime = self.start_time.elapsed().as_secs();
325
326 let counters = self.counters.read().await;
328 let total_executed = counters
329 .get("tasks_executed")
330 .map(|c| c.load(Ordering::Relaxed))
331 .unwrap_or(0);
332 let total_succeeded = counters
333 .get("tasks_succeeded")
334 .map(|c| c.load(Ordering::Relaxed))
335 .unwrap_or(0);
336 let total_failed = counters
337 .get("tasks_failed")
338 .map(|c| c.load(Ordering::Relaxed))
339 .unwrap_or(0);
340 let total_retried = counters
341 .get("tasks_retried")
342 .map(|c| c.load(Ordering::Relaxed))
343 .unwrap_or(0);
344 let total_timed_out = counters
345 .get("tasks_timed_out")
346 .map(|c| c.load(Ordering::Relaxed))
347 .unwrap_or(0);
348
349 let gauges = self.gauges.read().await;
351 let active_tasks = gauges
352 .get("active_tasks")
353 .map(|g| g.load(Ordering::Relaxed))
354 .unwrap_or(0);
355 let active_workers = gauges
356 .get("active_workers")
357 .map(|g| g.load(Ordering::Relaxed))
358 .unwrap_or(0);
359
360 let histograms = self.histograms.read().await;
362 let execution_histogram = histograms.get("task_execution_time");
363
364 let (avg_execution_ms, p95_ms, p99_ms) = if let Some(hist) = execution_histogram {
365 (
366 hist.average().as_millis() as f64,
367 hist.percentile(0.95).as_millis() as f64,
368 hist.percentile(0.99).as_millis() as f64,
369 )
370 } else {
371 (0.0, 0.0, 0.0)
372 };
373
374 let tasks_per_second = if uptime > 0 {
376 total_executed as f64 / uptime as f64
377 } else {
378 0.0
379 };
380
381 let success_rate = if total_executed > 0 {
382 total_succeeded as f64 / total_executed as f64
383 } else {
384 0.0
385 };
386
387 let error_rate = if total_executed > 0 {
388 total_failed as f64 / total_executed as f64
389 } else {
390 0.0
391 };
392
393 let memory_metrics = self.memory_tracker.get_metrics();
395
396 SystemMetrics {
397 timestamp: Utc::now(),
398 uptime_seconds: uptime,
399 memory: memory_metrics,
400 performance: PerformanceMetrics {
401 tasks_per_second,
402 average_execution_time_ms: avg_execution_ms,
403 p95_execution_time_ms: p95_ms,
404 p99_execution_time_ms: p99_ms,
405 success_rate,
406 error_rate,
407 },
408 tasks: TaskMetrics {
409 total_executed,
410 total_succeeded,
411 total_failed,
412 total_retried,
413 total_timed_out,
414 active_tasks,
415 },
416 queues: Vec::new(), workers: WorkerMetrics {
418 active_workers,
419 idle_workers: 0, busy_workers: 0, worker_utilization: 0.0,
422 tasks_per_worker: if active_workers > 0 {
423 total_executed as f64 / active_workers as f64
424 } else {
425 0.0
426 },
427 },
428 }
429 }
430
431 pub async fn get_metrics_summary(&self) -> String {
433 let metrics = self.get_system_metrics().await;
434 format!(
435 "TaskQueue Metrics Summary:\n\
436 - Uptime: {}s\n\
437 - Tasks: {} executed, {} succeeded, {} failed\n\
438 - Memory: {} bytes current, {} bytes peak\n\
439 - Performance: {:.2} tasks/sec, {:.2}ms avg execution\n\
440 - Workers: {} active\n\
441 - Success Rate: {:.1}%",
442 metrics.uptime_seconds,
443 metrics.tasks.total_executed,
444 metrics.tasks.total_succeeded,
445 metrics.tasks.total_failed,
446 metrics.memory.current_bytes,
447 metrics.memory.peak_bytes,
448 metrics.performance.tasks_per_second,
449 metrics.performance.average_execution_time_ms,
450 metrics.workers.active_workers,
451 metrics.performance.success_rate * 100.0
452 )
453 }
454
455 pub async fn get_performance_report(&self) -> PerformanceReport {
457 let histograms = self.histograms.read().await;
458 let mut task_performance = HashMap::new();
459
460 for (name, histogram) in histograms.iter() {
461 if name.starts_with("task_execution_time_") {
462 let task_name = name.strip_prefix("task_execution_time_").unwrap();
463 task_performance.insert(
464 task_name.to_string(),
465 TaskPerformanceMetrics {
466 avg_duration_ms: histogram.average().as_millis() as f64,
467 p50_duration_ms: histogram.percentile(0.50).as_millis() as u64,
468 p95_duration_ms: histogram.percentile(0.95).as_millis() as u64,
469 p99_duration_ms: histogram.percentile(0.99).as_millis() as u64,
470 total_executions: histogram.count(),
471 },
472 );
473 }
474 }
475
476 PerformanceReport {
477 uptime_seconds: self.start_time.elapsed().as_secs(),
478 task_performance,
479 active_alerts: self.alert_manager.get_active_alerts().await,
480 sla_violations: self.performance_tracker.get_recent_violations().await,
481 }
482 }
483
484 pub async fn get_health_status(&self) -> SystemHealthStatus {
486 let memory_metrics = self.memory_tracker.get_metrics();
487 let active_alerts = self.alert_manager.get_active_alerts().await;
488
489 let status = if active_alerts.iter().any(|a| {
490 matches!(
491 a.severity,
492 AlertSeverity::Critical | AlertSeverity::Emergency
493 )
494 }) {
495 HealthStatus::Critical
496 } else if !active_alerts.is_empty() {
497 HealthStatus::Warning
498 } else {
499 HealthStatus::Healthy
500 };
501
502 SystemHealthStatus {
503 status,
504 memory_usage_mb: (memory_metrics.current_bytes / (1024 * 1024)) as u64,
505 uptime_seconds: self.start_time.elapsed().as_secs(),
506 active_alert_count: active_alerts.len() as u32,
507 critical_alert_count: active_alerts
508 .iter()
509 .filter(|a| {
510 matches!(
511 a.severity,
512 AlertSeverity::Critical | AlertSeverity::Emergency
513 )
514 })
515 .count() as u32,
516 }
517 }
518}
519
520impl MemoryTracker {
521 pub fn new() -> Self {
522 Self {
523 allocated_bytes: AtomicUsize::new(0),
524 peak_memory: AtomicUsize::new(0),
525 active_tasks: AtomicUsize::new(0),
526 total_allocations: AtomicU64::new(0),
527 }
528 }
529
530 pub fn track_allocation(&self, bytes: usize) {
531 let current = self.allocated_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
532
533 let mut peak = self.peak_memory.load(Ordering::Relaxed);
535 while current > peak {
536 match self.peak_memory.compare_exchange_weak(
537 peak,
538 current,
539 Ordering::Relaxed,
540 Ordering::Relaxed,
541 ) {
542 Ok(_) => break,
543 Err(new_peak) => peak = new_peak,
544 }
545 }
546
547 self.total_allocations.fetch_add(1, Ordering::Relaxed);
548 }
549
550 pub fn track_deallocation(&self, bytes: usize) {
551 self.allocated_bytes.fetch_sub(bytes, Ordering::Relaxed);
552 }
553
554 pub fn track_task_start(&self) {
555 self.active_tasks.fetch_add(1, Ordering::Relaxed);
556 }
557
558 pub fn track_task_end(&self) {
559 self.active_tasks.fetch_sub(1, Ordering::Relaxed);
560 }
561
562 pub fn get_metrics(&self) -> MemoryMetrics {
563 let current = self.allocated_bytes.load(Ordering::Relaxed);
564 let peak = self.peak_memory.load(Ordering::Relaxed);
565 let active = self.active_tasks.load(Ordering::Relaxed);
566 let total_allocs = self.total_allocations.load(Ordering::Relaxed);
567
568 let efficiency = if active > 0 {
569 current as f64 / active as f64
570 } else {
571 0.0
572 };
573
574 MemoryMetrics {
575 current_bytes: current,
576 peak_bytes: peak,
577 total_allocations: total_allocs,
578 active_tasks: active,
579 memory_efficiency: efficiency,
580 }
581 }
582}
583
584impl TaskHistogram {
585 pub fn new() -> Self {
586 Self {
587 samples: Vec::new(),
588 total_count: AtomicU64::new(0),
589 total_duration: AtomicU64::new(0),
590 }
591 }
592
593 pub fn record(&mut self, duration: Duration) {
594 self.samples.push(duration);
595 self.total_count.fetch_add(1, Ordering::Relaxed);
596 self.total_duration
597 .fetch_add(duration.as_millis() as u64, Ordering::Relaxed);
598
599 if self.samples.len() > 10000 {
601 self.samples.drain(..5000);
602 }
603 }
604
605 pub fn average(&self) -> Duration {
606 let count = self.total_count.load(Ordering::Relaxed);
607 if count == 0 {
608 return Duration::from_millis(0);
609 }
610
611 let total_ms = self.total_duration.load(Ordering::Relaxed);
612 Duration::from_millis(total_ms / count)
613 }
614
615 pub fn percentile(&self, p: f64) -> Duration {
616 if self.samples.is_empty() {
617 return Duration::from_millis(0);
618 }
619
620 let mut sorted_samples = self.samples.clone();
621 sorted_samples.sort();
622
623 let index = (sorted_samples.len() as f64 * p).ceil() as usize - 1;
624 sorted_samples[index.min(sorted_samples.len() - 1)]
625 }
626
627 pub fn count(&self) -> u64 {
628 self.total_count.load(Ordering::Relaxed)
629 }
630}
631
632impl PerformanceTracker {
633 pub fn new() -> Self {
634 Self {
635 task_execution_times: Arc::new(RwLock::new(HashMap::new())),
636 queue_latencies: Arc::new(RwLock::new(HashMap::new())),
637 error_rates: Arc::new(RwLock::new(HashMap::new())),
638 sla_violations: Arc::new(RwLock::new(Vec::new())),
639 }
640 }
641
642 pub async fn record_execution(&self, task_name: &str, duration: Duration, success: bool) {
643 let mut times = self.task_execution_times.write().await;
645 times
646 .entry(task_name.to_string())
647 .or_insert_with(Vec::new)
648 .push(duration);
649
650 let mut error_rates = self.error_rates.write().await;
652 error_rates
653 .entry(task_name.to_string())
654 .or_insert_with(|| ErrorRateTracker::new(Duration::from_secs(300))) .record_execution(!success);
656 }
657
658 pub async fn get_recent_violations(&self) -> Vec<SLAViolation> {
659 let violations = self.sla_violations.read().await;
660 violations.clone()
661 }
662}
663
664impl AlertManager {
665 pub fn new() -> Self {
666 Self {
667 active_alerts: Arc::new(RwLock::new(HashMap::new())),
668 alert_thresholds: Arc::new(RwLock::new(AlertThresholds::default())),
669 }
670 }
671
672 pub async fn check_task_performance_alerts(
673 &self,
674 task_name: &str,
675 duration: Duration,
676 _success: bool,
677 ) {
678 let thresholds = self.alert_thresholds.read().await;
679
680 if duration.as_millis() > thresholds.max_task_duration_ms as u128 {
682 let alert = Alert {
683 id: format!(
684 "task_duration_{}_{}",
685 task_name,
686 SystemTime::now()
687 .duration_since(UNIX_EPOCH)
688 .unwrap()
689 .as_secs()
690 ),
691 severity: AlertSeverity::Warning,
692 message: format!(
693 "Task {} took {}ms (threshold: {}ms)",
694 task_name,
695 duration.as_millis(),
696 thresholds.max_task_duration_ms
697 ),
698 timestamp: SystemTime::now(),
699 metric_name: "task_duration".to_string(),
700 current_value: duration.as_millis() as f64,
701 threshold: thresholds.max_task_duration_ms as f64,
702 };
703
704 let mut alerts = self.active_alerts.write().await;
705 alerts.insert(alert.id.clone(), alert);
706 }
707 }
708
709 pub async fn get_active_alerts(&self) -> Vec<Alert> {
710 let alerts = self.active_alerts.read().await;
711 alerts.values().cloned().collect()
712 }
713}
714
715#[derive(Debug, Serialize, Deserialize)]
717pub struct PerformanceReport {
718 pub uptime_seconds: u64,
719 pub task_performance: HashMap<String, TaskPerformanceMetrics>,
720 pub active_alerts: Vec<Alert>,
721 pub sla_violations: Vec<SLAViolation>,
722}
723
724#[derive(Debug, Serialize, Deserialize)]
725pub struct TaskPerformanceMetrics {
726 pub avg_duration_ms: f64,
727 pub p50_duration_ms: u64,
728 pub p95_duration_ms: u64,
729 pub p99_duration_ms: u64,
730 pub total_executions: u64,
731}
732
733#[derive(Debug, Serialize, Deserialize)]
734pub struct SystemHealthStatus {
735 pub status: HealthStatus,
736 pub memory_usage_mb: u64,
737 pub uptime_seconds: u64,
738 pub active_alert_count: u32,
739 pub critical_alert_count: u32,
740}
741
742#[derive(Debug, Serialize, Deserialize)]
743pub enum HealthStatus {
744 Healthy,
745 Warning,
746 Critical,
747}
748
749impl Default for MetricsCollector {
750 fn default() -> Self {
751 Self::new()
752 }
753}
754
755impl Default for MemoryTracker {
756 fn default() -> Self {
757 Self::new()
758 }
759}
760
761impl Default for TaskHistogram {
762 fn default() -> Self {
763 Self::new()
764 }
765}
766
767impl Default for PerformanceTracker {
768 fn default() -> Self {
769 Self::new()
770 }
771}
772
773impl Default for AlertManager {
774 fn default() -> Self {
775 Self::new()
776 }
777}
778
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A fresh collector reports no executed tasks and no tracked memory.
    #[tokio::test]
    async fn test_metrics_collector_creation() {
        let snapshot = MetricsCollector::new().get_system_metrics().await;

        assert_eq!(snapshot.tasks.total_executed, 0);
        assert_eq!(snapshot.memory.current_bytes, 0);
    }

    /// Successive increments of the same counter accumulate.
    #[tokio::test]
    async fn test_counter_increment() {
        let collector = MetricsCollector::new();

        collector.increment_counter("test_counter", 5).await;
        collector.increment_counter("test_counter", 3).await;

        let counters = collector.counters.read().await;
        let stored = counters
            .get("test_counter")
            .map(|counter| counter.load(Ordering::Relaxed))
            .unwrap();
        assert_eq!(stored, 8);
    }

    /// Setting a gauge twice keeps only the latest value.
    #[tokio::test]
    async fn test_gauge_setting() {
        let collector = MetricsCollector::new();

        collector.set_gauge("test_gauge", 42).await;
        collector.set_gauge("test_gauge", 100).await;

        let gauges = collector.gauges.read().await;
        let stored = gauges
            .get("test_gauge")
            .map(|gauge| gauge.load(Ordering::Relaxed))
            .unwrap();
        assert_eq!(stored, 100);
    }

    /// Two timings recorded under one name are averaged by the histogram.
    #[tokio::test]
    async fn test_timing_recording() {
        let collector = MetricsCollector::new();

        let first = Duration::from_millis(100);
        let second = Duration::from_millis(200);
        collector.record_timing("test_timing", first).await;
        collector.record_timing("test_timing", second).await;

        let histograms = collector.histograms.read().await;
        let mean = histograms.get("test_timing").unwrap().average();

        assert_eq!(mean, Duration::from_millis(150));
    }

    /// Allocation, deallocation and task start/end are reflected in the metrics.
    #[test]
    fn test_memory_tracker() {
        let tracker = MemoryTracker::new();

        tracker.track_allocation(1000);
        tracker.track_allocation(500);
        tracker.track_task_start();
        tracker.track_task_start();

        let before = tracker.get_metrics();
        assert_eq!(before.current_bytes, 1500);
        assert_eq!(before.peak_bytes, 1500);
        assert_eq!(before.active_tasks, 2);
        assert_eq!(before.memory_efficiency, 750.0);

        tracker.track_deallocation(300);
        tracker.track_task_end();

        let after = tracker.get_metrics();
        assert_eq!(after.current_bytes, 1200);
        assert_eq!(after.active_tasks, 1);
        assert_eq!(after.memory_efficiency, 1200.0);
    }

    /// Ten samples of 10 ms..=100 ms give the expected mean and upper percentiles.
    #[test]
    fn test_histogram_percentiles() {
        let mut histogram = TaskHistogram::new();

        (1..=10u64)
            .map(|step| Duration::from_millis(step * 10))
            .for_each(|sample| histogram.record(sample));

        assert_eq!(histogram.average(), Duration::from_millis(55));
        assert_eq!(histogram.percentile(0.9), Duration::from_millis(90));
        assert_eq!(histogram.percentile(0.95), Duration::from_millis(100));
    }

    /// The text summary reflects the aggregate counters and the worker gauge.
    #[tokio::test]
    async fn test_metrics_summary() {
        let collector = MetricsCollector::new();

        collector.increment_counter("tasks_executed", 100).await;
        collector.increment_counter("tasks_succeeded", 95).await;
        collector.increment_counter("tasks_failed", 5).await;
        collector.set_gauge("active_workers", 3).await;

        let summary = collector.get_metrics_summary().await;

        let expected_fragments = [
            "100 executed",
            "95 succeeded",
            "5 failed",
            "3 active",
            "95.0%",
        ];
        for fragment in expected_fragments.iter() {
            assert!(summary.contains(fragment));
        }
    }
}