Skip to main content

oxigdal_streaming/metrics/
collector.rs

1//! Metrics collection for streaming operations.
2
3use crate::error::Result;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10/// Type of metric.
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
12pub enum MetricType {
13    /// Counter metric (monotonically increasing)
14    Counter,
15
16    /// Gauge metric (can increase or decrease)
17    Gauge,
18
19    /// Histogram metric
20    Histogram,
21
22    /// Summary metric
23    Summary,
24
25    /// Timer metric
26    Timer,
27}
28
29/// Value of a metric.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub enum MetricValue {
32    /// Integer value
33    Integer(i64),
34
35    /// Floating point value
36    Float(f64),
37
38    /// Histogram values
39    Histogram {
40        /// Bucket boundaries
41        buckets: Vec<f64>,
42        /// Counts per bucket
43        counts: Vec<u64>,
44    },
45
46    /// Summary values
47    Summary {
48        /// Count
49        count: u64,
50        /// Sum
51        sum: f64,
52        /// Quantiles (as sorted vec of (quantile, value) pairs)
53        quantiles: Vec<(f64, f64)>,
54    },
55}
56
57impl MetricValue {
58    /// Get as integer.
59    pub fn as_i64(&self) -> Option<i64> {
60        match self {
61            MetricValue::Integer(v) => Some(*v),
62            _ => None,
63        }
64    }
65
66    /// Get as float.
67    pub fn as_f64(&self) -> Option<f64> {
68        match self {
69            MetricValue::Float(v) => Some(*v),
70            MetricValue::Integer(v) => Some(*v as f64),
71            _ => None,
72        }
73    }
74}
75
76/// A single metric.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct Metric {
79    /// Metric name
80    pub name: String,
81
82    /// Metric type
83    pub metric_type: MetricType,
84
85    /// Metric value
86    pub value: MetricValue,
87
88    /// Timestamp
89    pub timestamp: DateTime<Utc>,
90
91    /// Tags
92    pub tags: HashMap<String, String>,
93
94    /// Help text
95    pub help: Option<String>,
96}
97
98impl Metric {
99    /// Create a new metric.
100    pub fn new(name: String, metric_type: MetricType, value: MetricValue) -> Self {
101        Self {
102            name,
103            metric_type,
104            value,
105            timestamp: Utc::now(),
106            tags: HashMap::new(),
107            help: None,
108        }
109    }
110
111    /// Add a tag.
112    pub fn with_tag(mut self, key: String, value: String) -> Self {
113        self.tags.insert(key, value);
114        self
115    }
116
117    /// Add help text.
118    pub fn with_help(mut self, help: String) -> Self {
119        self.help = Some(help);
120        self
121    }
122}
123
124/// Collector for streaming metrics.
125pub struct MetricsCollector {
126    metrics: Arc<RwLock<HashMap<String, Metric>>>,
127    enabled: Arc<RwLock<bool>>,
128}
129
130impl MetricsCollector {
131    /// Create a new metrics collector.
132    pub fn new() -> Self {
133        Self {
134            metrics: Arc::new(RwLock::new(HashMap::new())),
135            enabled: Arc::new(RwLock::new(true)),
136        }
137    }
138
139    /// Enable metrics collection.
140    pub async fn enable(&self) {
141        *self.enabled.write().await = true;
142    }
143
144    /// Disable metrics collection.
145    pub async fn disable(&self) {
146        *self.enabled.write().await = false;
147    }
148
149    /// Check if enabled.
150    pub async fn is_enabled(&self) -> bool {
151        *self.enabled.read().await
152    }
153
154    /// Record a metric.
155    pub async fn record(&self, metric: Metric) -> Result<()> {
156        if !self.is_enabled().await {
157            return Ok(());
158        }
159
160        let mut metrics = self.metrics.write().await;
161        metrics.insert(metric.name.clone(), metric);
162
163        Ok(())
164    }
165
166    /// Increment a counter.
167    pub async fn increment_counter(&self, name: &str, value: i64) -> Result<()> {
168        if !self.is_enabled().await {
169            return Ok(());
170        }
171
172        let mut metrics = self.metrics.write().await;
173
174        let metric = metrics.entry(name.to_string()).or_insert_with(|| {
175            Metric::new(
176                name.to_string(),
177                MetricType::Counter,
178                MetricValue::Integer(0),
179            )
180        });
181
182        if let MetricValue::Integer(current) = metric.value {
183            metric.value = MetricValue::Integer(current + value);
184            metric.timestamp = Utc::now();
185        }
186
187        Ok(())
188    }
189
190    /// Set a gauge value.
191    pub async fn set_gauge(&self, name: &str, value: f64) -> Result<()> {
192        if !self.is_enabled().await {
193            return Ok(());
194        }
195
196        let mut metrics = self.metrics.write().await;
197
198        let metric = metrics.entry(name.to_string()).or_insert_with(|| {
199            Metric::new(name.to_string(), MetricType::Gauge, MetricValue::Float(0.0))
200        });
201
202        metric.value = MetricValue::Float(value);
203        metric.timestamp = Utc::now();
204
205        Ok(())
206    }
207
208    /// Record a histogram value.
209    pub async fn record_histogram(&self, name: &str, value: f64, buckets: Vec<f64>) -> Result<()> {
210        if !self.is_enabled().await {
211            return Ok(());
212        }
213
214        let mut metrics = self.metrics.write().await;
215
216        let metric = metrics.entry(name.to_string()).or_insert_with(|| {
217            let counts = vec![0; buckets.len()];
218            Metric::new(
219                name.to_string(),
220                MetricType::Histogram,
221                MetricValue::Histogram {
222                    buckets: buckets.clone(),
223                    counts,
224                },
225            )
226        });
227
228        if let MetricValue::Histogram { buckets, counts } = &mut metric.value {
229            for (i, &bucket) in buckets.iter().enumerate() {
230                if value <= bucket {
231                    counts[i] += 1;
232                }
233            }
234            metric.timestamp = Utc::now();
235        }
236
237        Ok(())
238    }
239
240    /// Get a metric by name.
241    pub async fn get_metric(&self, name: &str) -> Option<Metric> {
242        self.metrics.read().await.get(name).cloned()
243    }
244
245    /// Get all metrics.
246    pub async fn get_all_metrics(&self) -> Vec<Metric> {
247        self.metrics.read().await.values().cloned().collect()
248    }
249
250    /// Clear all metrics.
251    pub async fn clear(&self) -> Result<()> {
252        self.metrics.write().await.clear();
253        Ok(())
254    }
255
256    /// Get metric count.
257    pub async fn metric_count(&self) -> usize {
258        self.metrics.read().await.len()
259    }
260}
261
262impl Default for MetricsCollector {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh collector is enabled and holds no metrics.
    #[tokio::test]
    async fn test_metrics_collector() {
        let collector = MetricsCollector::new();

        assert!(collector.is_enabled().await);
        assert_eq!(collector.metric_count().await, 0);
    }

    /// Repeated increments accumulate under the same counter name.
    #[tokio::test]
    async fn test_counter_metric() {
        let collector = MetricsCollector::new();

        collector
            .increment_counter("test_counter", 5)
            .await
            .expect("Failed to increment counter by 5 in test");
        collector
            .increment_counter("test_counter", 3)
            .await
            .expect("Failed to increment counter by 3 in test");

        let metric = collector
            .get_metric("test_counter")
            .await
            .expect("Failed to get test_counter metric");
        assert_eq!(metric.value.as_i64(), Some(8));
    }

    /// Setting a gauge replaces the previous value.
    #[tokio::test]
    async fn test_gauge_metric() {
        let collector = MetricsCollector::new();

        collector
            .set_gauge("test_gauge", 42.5)
            .await
            .expect("Failed to set gauge to 42.5 in test");

        let metric = collector
            .get_metric("test_gauge")
            .await
            .expect("Failed to get test_gauge metric after first set");
        assert_eq!(metric.value.as_f64(), Some(42.5));

        collector
            .set_gauge("test_gauge", 100.0)
            .await
            .expect("Failed to set gauge to 100.0 in test");

        let metric = collector
            .get_metric("test_gauge")
            .await
            .expect("Failed to get test_gauge metric after second set");
        assert_eq!(metric.value.as_f64(), Some(100.0));
    }

    /// Histogram observations are counted cumulatively per bucket boundary.
    #[tokio::test]
    async fn test_histogram_metric() {
        let collector = MetricsCollector::new();
        let buckets = vec![1.0, 5.0, 10.0, 50.0, 100.0];

        collector
            .record_histogram("test_histogram", 3.0, buckets.clone())
            .await
            .expect("Failed to record histogram value 3.0 in test");

        collector
            .record_histogram("test_histogram", 7.0, buckets.clone())
            .await
            .expect("Failed to record histogram value 7.0 in test");

        let metric = collector
            .get_metric("test_histogram")
            .await
            .expect("Failed to get test_histogram metric");
        assert_eq!(metric.metric_type, MetricType::Histogram);

        // 3.0 falls under the boundaries 5, 10, 50 and 100; 7.0 under 10, 50 and 100.
        match metric.value {
            MetricValue::Histogram {
                buckets: stored,
                counts,
            } => {
                assert_eq!(stored, buckets);
                assert_eq!(counts, vec![0, 1, 2, 2, 2]);
            }
            other => panic!("expected a histogram value, got {:?}", other),
        }
    }

    /// Recording is a no-op while disabled and resumes after re-enabling.
    #[tokio::test]
    async fn test_enable_disable() {
        let collector = MetricsCollector::new();

        assert!(collector.is_enabled().await);

        collector.disable().await;
        assert!(!collector.is_enabled().await);

        collector
            .increment_counter("test", 1)
            .await
            .expect("Failed to increment counter while disabled in test");
        assert_eq!(collector.metric_count().await, 0);

        collector.enable().await;
        collector
            .increment_counter("test", 1)
            .await
            .expect("Failed to increment counter after enable in test");
        assert_eq!(collector.metric_count().await, 1);
    }

    /// `record` stores a metric by name and `clear` removes everything.
    #[tokio::test]
    async fn test_record_and_clear() {
        let collector = MetricsCollector::new();

        let metric = Metric::new(
            "recorded".to_string(),
            MetricType::Gauge,
            MetricValue::Float(1.5),
        )
        .with_tag("source".to_string(), "test".to_string());

        collector
            .record(metric)
            .await
            .expect("Failed to record metric in test");
        assert_eq!(collector.metric_count().await, 1);
        assert_eq!(collector.get_all_metrics().await.len(), 1);

        let stored = collector
            .get_metric("recorded")
            .await
            .expect("Failed to get recorded metric");
        assert_eq!(stored.value.as_f64(), Some(1.5));
        assert_eq!(stored.tags.get("source").map(String::as_str), Some("test"));

        collector
            .clear()
            .await
            .expect("Failed to clear metrics in test");
        assert_eq!(collector.metric_count().await, 0);
    }

    /// `record` stores nothing while the collector is disabled.
    #[tokio::test]
    async fn test_record_while_disabled() {
        let collector = MetricsCollector::new();
        collector.disable().await;

        collector
            .record(Metric::new(
                "ignored".to_string(),
                MetricType::Counter,
                MetricValue::Integer(1),
            ))
            .await
            .expect("Failed to call record while disabled in test");
        assert!(collector.get_metric("ignored").await.is_none());
    }
}