Skip to main content

oxigdal_streaming/metrics/
reporter.rs

1//! Metrics reporting for streaming operations.
2
3use super::collector::{Metric, MetricType, MetricValue};
4use crate::error::Result;
5use async_trait::async_trait;
6use serde_json;
7
8/// Trait for metrics reporters.
9#[async_trait]
10pub trait MetricsReporter: Send + Sync {
11    /// Report a single metric.
12    async fn report_metric(&self, metric: &Metric) -> Result<()>;
13
14    /// Report multiple metrics.
15    async fn report_metrics(&self, metrics: &[Metric]) -> Result<()>;
16
17    /// Flush any buffered metrics.
18    async fn flush(&self) -> Result<()>;
19}
20
21/// Console reporter that prints metrics to stdout.
22pub struct ConsoleReporter {
23    format: ReportFormat,
24}
25
26/// Format for console output.
27#[derive(Debug, Clone, Copy)]
28pub enum ReportFormat {
29    /// Human-readable format
30    Human,
31
32    /// JSON format
33    Json,
34
35    /// Prometheus format
36    Prometheus,
37}
38
39impl ConsoleReporter {
40    /// Create a new console reporter.
41    pub fn new() -> Self {
42        Self {
43            format: ReportFormat::Human,
44        }
45    }
46
47    /// Create a console reporter with specified format.
48    pub fn with_format(format: ReportFormat) -> Self {
49        Self { format }
50    }
51
52    /// Format a metric for human-readable output.
53    fn format_human(&self, metric: &Metric) -> String {
54        let value_str = match &metric.value {
55            MetricValue::Integer(v) => format!("{}", v),
56            MetricValue::Float(v) => format!("{:.2}", v),
57            MetricValue::Histogram { buckets, counts } => {
58                let mut s = String::from("[");
59                for (bucket, count) in buckets.iter().zip(counts.iter()) {
60                    s.push_str(&format!("{}:{}, ", bucket, count));
61                }
62                s.push(']');
63                s
64            }
65            MetricValue::Summary {
66                count,
67                sum,
68                quantiles,
69            } => {
70                format!("count={}, sum={:.2}, quantiles={:?}", count, sum, quantiles)
71            }
72        };
73
74        let tags_str = if metric.tags.is_empty() {
75            String::new()
76        } else {
77            let tags: Vec<String> = metric
78                .tags
79                .iter()
80                .map(|(k, v)| format!("{}={}", k, v))
81                .collect();
82            format!(" {{{}}}", tags.join(", "))
83        };
84
85        format!(
86            "{} [{:?}] = {}{}",
87            metric.name, metric.metric_type, value_str, tags_str
88        )
89    }
90
91    /// Format a metric for Prometheus output.
92    fn format_prometheus(&self, metric: &Metric) -> String {
93        let help = if let Some(h) = &metric.help {
94            format!("# HELP {} {}\n", metric.name, h)
95        } else {
96            String::new()
97        };
98
99        let type_str = match metric.metric_type {
100            MetricType::Counter => "counter",
101            MetricType::Gauge => "gauge",
102            MetricType::Histogram => "histogram",
103            MetricType::Summary => "summary",
104            MetricType::Timer => "gauge",
105        };
106
107        let type_line = format!("# TYPE {} {}\n", metric.name, type_str);
108
109        let value_line = match &metric.value {
110            MetricValue::Integer(v) => {
111                format!("{} {}", metric.name, v)
112            }
113            MetricValue::Float(v) => {
114                format!("{} {}", metric.name, v)
115            }
116            MetricValue::Histogram { buckets, counts } => {
117                let mut lines = Vec::new();
118                let mut cumulative = 0;
119
120                for (bucket, count) in buckets.iter().zip(counts.iter()) {
121                    cumulative += count;
122                    lines.push(format!(
123                        "{}_bucket{{le=\"{}\"}} {}",
124                        metric.name, bucket, cumulative
125                    ));
126                }
127
128                lines.push(format!(
129                    "{}_bucket{{le=\"+Inf\"}} {}",
130                    metric.name, cumulative
131                ));
132                lines.push(format!("{}_count {}", metric.name, cumulative));
133
134                lines.join("\n")
135            }
136            MetricValue::Summary {
137                count,
138                sum,
139                quantiles,
140            } => {
141                let mut lines = Vec::new();
142
143                for (quantile, value) in quantiles {
144                    lines.push(format!(
145                        "{}{{quantile=\"{}\"}} {}",
146                        metric.name, quantile, value
147                    ));
148                }
149
150                lines.push(format!("{}_sum {}", metric.name, sum));
151                lines.push(format!("{}_count {}", metric.name, count));
152
153                lines.join("\n")
154            }
155        };
156
157        format!("{}{}{}", help, type_line, value_line)
158    }
159}
160
161impl Default for ConsoleReporter {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167#[async_trait]
168impl MetricsReporter for ConsoleReporter {
169    async fn report_metric(&self, metric: &Metric) -> Result<()> {
170        match self.format {
171            ReportFormat::Human => {
172                println!("{}", self.format_human(metric));
173            }
174            ReportFormat::Json => {
175                let json = serde_json::to_string_pretty(metric)?;
176                println!("{}", json);
177            }
178            ReportFormat::Prometheus => {
179                println!("{}", self.format_prometheus(metric));
180            }
181        }
182
183        Ok(())
184    }
185
186    async fn report_metrics(&self, metrics: &[Metric]) -> Result<()> {
187        for metric in metrics {
188            self.report_metric(metric).await?;
189        }
190
191        Ok(())
192    }
193
194    async fn flush(&self) -> Result<()> {
195        Ok(())
196    }
197}
198
199/// JSON reporter that outputs metrics as JSON.
200pub struct JsonReporter {
201    pretty: bool,
202}
203
204impl JsonReporter {
205    /// Create a new JSON reporter.
206    pub fn new() -> Self {
207        Self { pretty: true }
208    }
209
210    /// Create a JSON reporter with compact output.
211    pub fn compact() -> Self {
212        Self { pretty: false }
213    }
214}
215
216impl Default for JsonReporter {
217    fn default() -> Self {
218        Self::new()
219    }
220}
221
222#[async_trait]
223impl MetricsReporter for JsonReporter {
224    async fn report_metric(&self, metric: &Metric) -> Result<()> {
225        let json = if self.pretty {
226            serde_json::to_string_pretty(metric)?
227        } else {
228            serde_json::to_string(metric)?
229        };
230
231        println!("{}", json);
232
233        Ok(())
234    }
235
236    async fn report_metrics(&self, metrics: &[Metric]) -> Result<()> {
237        let json = if self.pretty {
238            serde_json::to_string_pretty(metrics)?
239        } else {
240            serde_json::to_string(metrics)?
241        };
242
243        println!("{}", json);
244
245        Ok(())
246    }
247
248    async fn flush(&self) -> Result<()> {
249        Ok(())
250    }
251}
252
253#[cfg(test)]
254mod tests {
255    use super::*;
256
257    #[tokio::test]
258    async fn test_console_reporter() {
259        let reporter = ConsoleReporter::new();
260
261        let metric = Metric::new(
262            "test_metric".to_string(),
263            MetricType::Counter,
264            MetricValue::Integer(42),
265        );
266
267        reporter
268            .report_metric(&metric)
269            .await
270            .expect("metric reporting should succeed");
271    }
272
273    #[tokio::test]
274    async fn test_json_reporter() {
275        let reporter = JsonReporter::new();
276
277        let metric = Metric::new(
278            "test_metric".to_string(),
279            MetricType::Gauge,
280            MetricValue::Float(std::f64::consts::E),
281        );
282
283        reporter
284            .report_metric(&metric)
285            .await
286            .expect("metric reporting should succeed");
287    }
288
289    #[test]
290    fn test_format_human() {
291        let reporter = ConsoleReporter::new();
292
293        let metric = Metric::new(
294            "test".to_string(),
295            MetricType::Counter,
296            MetricValue::Integer(100),
297        );
298
299        let formatted = reporter.format_human(&metric);
300        assert!(formatted.contains("test"));
301        assert!(formatted.contains("100"));
302    }
303
304    #[test]
305    fn test_format_prometheus() {
306        let reporter = ConsoleReporter::with_format(ReportFormat::Prometheus);
307
308        let metric = Metric::new(
309            "test_counter".to_string(),
310            MetricType::Counter,
311            MetricValue::Integer(42),
312        )
313        .with_help("Test counter metric".to_string());
314
315        let formatted = reporter.format_prometheus(&metric);
316        assert!(formatted.contains("# HELP"));
317        assert!(formatted.contains("# TYPE"));
318    }
319}