fluxus_core/
metrics.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
4use std::time::{Duration, Instant};
5
6/// Counter for accumulating values
7#[derive(Debug, Default)]
8pub struct Counter {
9    value: AtomicU64,
10}
11
12impl Counter {
13    pub fn new() -> Self {
14        Self {
15            value: AtomicU64::new(0),
16        }
17    }
18
19    pub fn increment(&self) {
20        self.value.fetch_add(1, Ordering::Relaxed);
21    }
22
23    pub fn add(&self, value: u64) {
24        self.value.fetch_add(value, Ordering::Relaxed);
25    }
26
27    pub fn value(&self) -> u64 {
28        self.value.load(Ordering::Relaxed)
29    }
30}
31
32/// Gauge for tracking current value
33#[derive(Debug, Default)]
34pub struct Gauge {
35    value: AtomicI64,
36}
37
38impl Gauge {
39    pub fn new() -> Self {
40        Self {
41            value: AtomicI64::new(0),
42        }
43    }
44
45    pub fn set(&self, value: i64) {
46        self.value.store(value, Ordering::Relaxed);
47    }
48
49    pub fn value(&self) -> i64 {
50        self.value.load(Ordering::Relaxed)
51    }
52}
53
54/// Timer for measuring durations
55#[derive(Debug)]
56pub struct Timer {
57    start: Instant,
58    duration_counter: Counter,
59    count_counter: Counter,
60}
61
62impl Default for Timer {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68impl Timer {
69    pub fn new() -> Self {
70        Self {
71            start: Instant::now(),
72            duration_counter: Counter::new(),
73            count_counter: Counter::new(),
74        }
75    }
76
77    pub fn start(&mut self) {
78        self.start = Instant::now();
79    }
80
81    pub fn stop(&mut self) {
82        let duration = self.start.elapsed();
83        self.duration_counter.add(duration.as_micros() as u64);
84        self.count_counter.increment();
85    }
86
87    /// Record a duration directly
88    pub fn record(&mut self, duration: Duration) {
89        self.duration_counter.add(duration.as_micros() as u64);
90        self.count_counter.increment();
91    }
92
93    pub fn average_duration_micros(&self) -> u64 {
94        let total = self.duration_counter.value();
95        let count = self.count_counter.value();
96        if count == 0 { 0 } else { total / count }
97    }
98}
99
100/// Metrics collection for pipeline monitoring
101#[derive(Debug, Default)]
102pub struct Metrics {
103    counters: HashMap<String, Arc<Counter>>,
104    gauges: HashMap<String, Arc<Gauge>>,
105    timers: HashMap<String, Arc<Timer>>,
106}
107
108impl Metrics {
109    pub fn new() -> Self {
110        Self::default()
111    }
112
113    pub fn counter(&mut self, name: &str) -> Arc<Counter> {
114        self.counters
115            .entry(name.to_string())
116            .or_insert_with(|| Arc::new(Counter::new()))
117            .clone()
118    }
119
120    pub fn gauge(&mut self, name: &str) -> Arc<Gauge> {
121        self.gauges
122            .entry(name.to_string())
123            .or_insert_with(|| Arc::new(Gauge::new()))
124            .clone()
125    }
126
127    pub fn timer(&mut self, name: &str) -> Arc<Timer> {
128        self.timers
129            .entry(name.to_string())
130            .or_insert_with(|| Arc::new(Timer::new()))
131            .clone()
132    }
133
134    pub fn snapshot(&self) -> HashMap<String, MetricValue> {
135        let mut snapshot = HashMap::new();
136
137        for (name, counter) in &self.counters {
138            snapshot.insert(name.clone(), MetricValue::Counter(counter.value()));
139        }
140
141        for (name, gauge) in &self.gauges {
142            snapshot.insert(name.clone(), MetricValue::Gauge(gauge.value()));
143        }
144
145        for (name, timer) in &self.timers {
146            snapshot.insert(
147                name.clone(),
148                MetricValue::Timer {
149                    avg_micros: timer.average_duration_micros(),
150                    count: timer.count_counter.value(),
151                },
152            );
153        }
154
155        snapshot
156    }
157}
158
159#[derive(Debug, Clone)]
160pub enum MetricValue {
161    Counter(u64),
162    Gauge(i64),
163    Timer { avg_micros: u64, count: u64 },
164}