oxigdal_streaming/metrics/
collector.rs1use crate::error::Result;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
12pub enum MetricType {
13 Counter,
15
16 Gauge,
18
19 Histogram,
21
22 Summary,
24
25 Timer,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub enum MetricValue {
32 Integer(i64),
34
35 Float(f64),
37
38 Histogram {
40 buckets: Vec<f64>,
42 counts: Vec<u64>,
44 },
45
46 Summary {
48 count: u64,
50 sum: f64,
52 quantiles: Vec<(f64, f64)>,
54 },
55}
56
57impl MetricValue {
58 pub fn as_i64(&self) -> Option<i64> {
60 match self {
61 MetricValue::Integer(v) => Some(*v),
62 _ => None,
63 }
64 }
65
66 pub fn as_f64(&self) -> Option<f64> {
68 match self {
69 MetricValue::Float(v) => Some(*v),
70 MetricValue::Integer(v) => Some(*v as f64),
71 _ => None,
72 }
73 }
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct Metric {
79 pub name: String,
81
82 pub metric_type: MetricType,
84
85 pub value: MetricValue,
87
88 pub timestamp: DateTime<Utc>,
90
91 pub tags: HashMap<String, String>,
93
94 pub help: Option<String>,
96}
97
98impl Metric {
99 pub fn new(name: String, metric_type: MetricType, value: MetricValue) -> Self {
101 Self {
102 name,
103 metric_type,
104 value,
105 timestamp: Utc::now(),
106 tags: HashMap::new(),
107 help: None,
108 }
109 }
110
111 pub fn with_tag(mut self, key: String, value: String) -> Self {
113 self.tags.insert(key, value);
114 self
115 }
116
117 pub fn with_help(mut self, help: String) -> Self {
119 self.help = Some(help);
120 self
121 }
122}
123
124pub struct MetricsCollector {
126 metrics: Arc<RwLock<HashMap<String, Metric>>>,
127 enabled: Arc<RwLock<bool>>,
128}
129
130impl MetricsCollector {
131 pub fn new() -> Self {
133 Self {
134 metrics: Arc::new(RwLock::new(HashMap::new())),
135 enabled: Arc::new(RwLock::new(true)),
136 }
137 }
138
139 pub async fn enable(&self) {
141 *self.enabled.write().await = true;
142 }
143
144 pub async fn disable(&self) {
146 *self.enabled.write().await = false;
147 }
148
149 pub async fn is_enabled(&self) -> bool {
151 *self.enabled.read().await
152 }
153
154 pub async fn record(&self, metric: Metric) -> Result<()> {
156 if !self.is_enabled().await {
157 return Ok(());
158 }
159
160 let mut metrics = self.metrics.write().await;
161 metrics.insert(metric.name.clone(), metric);
162
163 Ok(())
164 }
165
166 pub async fn increment_counter(&self, name: &str, value: i64) -> Result<()> {
168 if !self.is_enabled().await {
169 return Ok(());
170 }
171
172 let mut metrics = self.metrics.write().await;
173
174 let metric = metrics.entry(name.to_string()).or_insert_with(|| {
175 Metric::new(
176 name.to_string(),
177 MetricType::Counter,
178 MetricValue::Integer(0),
179 )
180 });
181
182 if let MetricValue::Integer(current) = metric.value {
183 metric.value = MetricValue::Integer(current + value);
184 metric.timestamp = Utc::now();
185 }
186
187 Ok(())
188 }
189
190 pub async fn set_gauge(&self, name: &str, value: f64) -> Result<()> {
192 if !self.is_enabled().await {
193 return Ok(());
194 }
195
196 let mut metrics = self.metrics.write().await;
197
198 let metric = metrics.entry(name.to_string()).or_insert_with(|| {
199 Metric::new(name.to_string(), MetricType::Gauge, MetricValue::Float(0.0))
200 });
201
202 metric.value = MetricValue::Float(value);
203 metric.timestamp = Utc::now();
204
205 Ok(())
206 }
207
208 pub async fn record_histogram(&self, name: &str, value: f64, buckets: Vec<f64>) -> Result<()> {
210 if !self.is_enabled().await {
211 return Ok(());
212 }
213
214 let mut metrics = self.metrics.write().await;
215
216 let metric = metrics.entry(name.to_string()).or_insert_with(|| {
217 let counts = vec![0; buckets.len()];
218 Metric::new(
219 name.to_string(),
220 MetricType::Histogram,
221 MetricValue::Histogram {
222 buckets: buckets.clone(),
223 counts,
224 },
225 )
226 });
227
228 if let MetricValue::Histogram { buckets, counts } = &mut metric.value {
229 for (i, &bucket) in buckets.iter().enumerate() {
230 if value <= bucket {
231 counts[i] += 1;
232 }
233 }
234 metric.timestamp = Utc::now();
235 }
236
237 Ok(())
238 }
239
240 pub async fn get_metric(&self, name: &str) -> Option<Metric> {
242 self.metrics.read().await.get(name).cloned()
243 }
244
245 pub async fn get_all_metrics(&self) -> Vec<Metric> {
247 self.metrics.read().await.values().cloned().collect()
248 }
249
250 pub async fn clear(&self) -> Result<()> {
252 self.metrics.write().await.clear();
253 Ok(())
254 }
255
256 pub async fn metric_count(&self) -> usize {
258 self.metrics.read().await.len()
259 }
260}
261
262impl Default for MetricsCollector {
263 fn default() -> Self {
264 Self::new()
265 }
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh collector is enabled and empty.
    #[tokio::test]
    async fn test_metrics_collector() {
        let collector = MetricsCollector::new();

        assert!(collector.is_enabled().await);
        assert_eq!(collector.metric_count().await, 0);
    }

    /// Successive increments accumulate in the same counter.
    #[tokio::test]
    async fn test_counter_metric() {
        let collector = MetricsCollector::new();

        collector
            .increment_counter("test_counter", 5)
            .await
            .expect("Failed to increment counter by 5 in test");
        collector
            .increment_counter("test_counter", 3)
            .await
            .expect("Failed to increment counter by 3 in test");

        let metric = collector
            .get_metric("test_counter")
            .await
            .expect("Failed to get test_counter metric");
        assert_eq!(metric.value.as_i64(), Some(8));
    }

    /// Setting a gauge replaces its previous value.
    #[tokio::test]
    async fn test_gauge_metric() {
        let collector = MetricsCollector::new();

        collector
            .set_gauge("test_gauge", 42.5)
            .await
            .expect("Failed to set gauge to 42.5 in test");

        let metric = collector
            .get_metric("test_gauge")
            .await
            .expect("Failed to get test_gauge metric after first set");
        assert_eq!(metric.value.as_f64(), Some(42.5));

        collector
            .set_gauge("test_gauge", 100.0)
            .await
            .expect("Failed to set gauge to 100.0 in test");

        let metric = collector
            .get_metric("test_gauge")
            .await
            .expect("Failed to get test_gauge metric after second set");
        assert_eq!(metric.value.as_f64(), Some(100.0));
    }

    /// Histogram observations are counted cumulatively per bucket bound.
    #[tokio::test]
    async fn test_histogram_metric() {
        let collector = MetricsCollector::new();
        let buckets = vec![1.0, 5.0, 10.0, 50.0, 100.0];

        collector
            .record_histogram("test_histogram", 3.0, buckets.clone())
            .await
            .expect("Failed to record histogram value 3.0 in test");

        collector
            .record_histogram("test_histogram", 7.0, buckets.clone())
            .await
            .expect("Failed to record histogram value 7.0 in test");

        let metric = collector
            .get_metric("test_histogram")
            .await
            .expect("Failed to get test_histogram metric");
        assert_eq!(metric.metric_type, MetricType::Histogram);

        match metric.value {
            MetricValue::Histogram {
                buckets: recorded,
                counts,
            } => {
                assert_eq!(recorded, buckets);
                // 3.0 falls in every bucket from 5.0 up, 7.0 in every bucket from 10.0 up.
                assert_eq!(counts, vec![0, 1, 2, 2, 2]);
            }
            other => panic!("expected a histogram value, got {:?}", other),
        }
    }

    /// Recording is skipped while disabled and resumes after re-enabling.
    #[tokio::test]
    async fn test_enable_disable() {
        let collector = MetricsCollector::new();

        assert!(collector.is_enabled().await);

        collector.disable().await;
        assert!(!collector.is_enabled().await);

        collector
            .increment_counter("test", 1)
            .await
            .expect("Failed to increment counter while disabled in test");
        assert_eq!(collector.metric_count().await, 0);

        collector.enable().await;
        collector
            .increment_counter("test", 1)
            .await
            .expect("Failed to increment counter after enable in test");
        assert_eq!(collector.metric_count().await, 1);
    }
}