oxigdal_streaming/metrics/
reporter.rs1use super::collector::{Metric, MetricType, MetricValue};
4use crate::error::Result;
5use async_trait::async_trait;
6use serde_json;
7
8#[async_trait]
10pub trait MetricsReporter: Send + Sync {
11 async fn report_metric(&self, metric: &Metric) -> Result<()>;
13
14 async fn report_metrics(&self, metrics: &[Metric]) -> Result<()>;
16
17 async fn flush(&self) -> Result<()>;
19}
20
21pub struct ConsoleReporter {
23 format: ReportFormat,
24}
25
/// Output formats supported by the console reporter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReportFormat {
    /// One human-readable line per metric.
    Human,

    /// Pretty-printed JSON serialization of the metric.
    Json,

    /// Prometheus text exposition format (`# HELP` / `# TYPE` / samples).
    Prometheus,
}
38
39impl ConsoleReporter {
40 pub fn new() -> Self {
42 Self {
43 format: ReportFormat::Human,
44 }
45 }
46
47 pub fn with_format(format: ReportFormat) -> Self {
49 Self { format }
50 }
51
52 fn format_human(&self, metric: &Metric) -> String {
54 let value_str = match &metric.value {
55 MetricValue::Integer(v) => format!("{}", v),
56 MetricValue::Float(v) => format!("{:.2}", v),
57 MetricValue::Histogram { buckets, counts } => {
58 let mut s = String::from("[");
59 for (bucket, count) in buckets.iter().zip(counts.iter()) {
60 s.push_str(&format!("{}:{}, ", bucket, count));
61 }
62 s.push(']');
63 s
64 }
65 MetricValue::Summary {
66 count,
67 sum,
68 quantiles,
69 } => {
70 format!("count={}, sum={:.2}, quantiles={:?}", count, sum, quantiles)
71 }
72 };
73
74 let tags_str = if metric.tags.is_empty() {
75 String::new()
76 } else {
77 let tags: Vec<String> = metric
78 .tags
79 .iter()
80 .map(|(k, v)| format!("{}={}", k, v))
81 .collect();
82 format!(" {{{}}}", tags.join(", "))
83 };
84
85 format!(
86 "{} [{:?}] = {}{}",
87 metric.name, metric.metric_type, value_str, tags_str
88 )
89 }
90
91 fn format_prometheus(&self, metric: &Metric) -> String {
93 let help = if let Some(h) = &metric.help {
94 format!("# HELP {} {}\n", metric.name, h)
95 } else {
96 String::new()
97 };
98
99 let type_str = match metric.metric_type {
100 MetricType::Counter => "counter",
101 MetricType::Gauge => "gauge",
102 MetricType::Histogram => "histogram",
103 MetricType::Summary => "summary",
104 MetricType::Timer => "gauge",
105 };
106
107 let type_line = format!("# TYPE {} {}\n", metric.name, type_str);
108
109 let value_line = match &metric.value {
110 MetricValue::Integer(v) => {
111 format!("{} {}", metric.name, v)
112 }
113 MetricValue::Float(v) => {
114 format!("{} {}", metric.name, v)
115 }
116 MetricValue::Histogram { buckets, counts } => {
117 let mut lines = Vec::new();
118 let mut cumulative = 0;
119
120 for (bucket, count) in buckets.iter().zip(counts.iter()) {
121 cumulative += count;
122 lines.push(format!(
123 "{}_bucket{{le=\"{}\"}} {}",
124 metric.name, bucket, cumulative
125 ));
126 }
127
128 lines.push(format!(
129 "{}_bucket{{le=\"+Inf\"}} {}",
130 metric.name, cumulative
131 ));
132 lines.push(format!("{}_count {}", metric.name, cumulative));
133
134 lines.join("\n")
135 }
136 MetricValue::Summary {
137 count,
138 sum,
139 quantiles,
140 } => {
141 let mut lines = Vec::new();
142
143 for (quantile, value) in quantiles {
144 lines.push(format!(
145 "{}{{quantile=\"{}\"}} {}",
146 metric.name, quantile, value
147 ));
148 }
149
150 lines.push(format!("{}_sum {}", metric.name, sum));
151 lines.push(format!("{}_count {}", metric.name, count));
152
153 lines.join("\n")
154 }
155 };
156
157 format!("{}{}{}", help, type_line, value_line)
158 }
159}
160
161impl Default for ConsoleReporter {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167#[async_trait]
168impl MetricsReporter for ConsoleReporter {
169 async fn report_metric(&self, metric: &Metric) -> Result<()> {
170 match self.format {
171 ReportFormat::Human => {
172 println!("{}", self.format_human(metric));
173 }
174 ReportFormat::Json => {
175 let json = serde_json::to_string_pretty(metric)?;
176 println!("{}", json);
177 }
178 ReportFormat::Prometheus => {
179 println!("{}", self.format_prometheus(metric));
180 }
181 }
182
183 Ok(())
184 }
185
186 async fn report_metrics(&self, metrics: &[Metric]) -> Result<()> {
187 for metric in metrics {
188 self.report_metric(metric).await?;
189 }
190
191 Ok(())
192 }
193
194 async fn flush(&self) -> Result<()> {
195 Ok(())
196 }
197}
198
/// Reporter that emits metrics as JSON.
#[derive(Debug, Clone)]
pub struct JsonReporter {
    /// When `true` the JSON is pretty-printed; otherwise it is compact.
    pretty: bool,
}
203
204impl JsonReporter {
205 pub fn new() -> Self {
207 Self { pretty: true }
208 }
209
210 pub fn compact() -> Self {
212 Self { pretty: false }
213 }
214}
215
216impl Default for JsonReporter {
217 fn default() -> Self {
218 Self::new()
219 }
220}
221
222#[async_trait]
223impl MetricsReporter for JsonReporter {
224 async fn report_metric(&self, metric: &Metric) -> Result<()> {
225 let json = if self.pretty {
226 serde_json::to_string_pretty(metric)?
227 } else {
228 serde_json::to_string(metric)?
229 };
230
231 println!("{}", json);
232
233 Ok(())
234 }
235
236 async fn report_metrics(&self, metrics: &[Metric]) -> Result<()> {
237 let json = if self.pretty {
238 serde_json::to_string_pretty(metrics)?
239 } else {
240 serde_json::to_string(metrics)?
241 };
242
243 println!("{}", json);
244
245 Ok(())
246 }
247
248 async fn flush(&self) -> Result<()> {
249 Ok(())
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Reporting an integer counter through the default console reporter
    /// completes without error.
    #[tokio::test]
    async fn test_console_reporter() {
        let counter = Metric::new(
            "test_metric".to_string(),
            MetricType::Counter,
            MetricValue::Integer(42),
        );
        let console = ConsoleReporter::new();

        let outcome = console.report_metric(&counter).await;
        outcome.expect("metric reporting should succeed");
    }

    /// Reporting a float gauge through the pretty JSON reporter completes
    /// without error.
    #[tokio::test]
    async fn test_json_reporter() {
        let gauge = Metric::new(
            "test_metric".to_string(),
            MetricType::Gauge,
            MetricValue::Float(std::f64::consts::E),
        );
        let json = JsonReporter::new();

        let outcome = json.report_metric(&gauge).await;
        outcome.expect("metric reporting should succeed");
    }

    /// The human-readable line mentions both the metric name and its value.
    #[test]
    fn test_format_human() {
        let counter = Metric::new(
            "test".to_string(),
            MetricType::Counter,
            MetricValue::Integer(100),
        );

        let line = ConsoleReporter::new().format_human(&counter);

        for expected in ["test", "100"] {
            assert!(line.contains(expected));
        }
    }

    /// The Prometheus rendering of a metric with help text carries both the
    /// `# HELP` and `# TYPE` header lines.
    #[test]
    fn test_format_prometheus() {
        let counter = Metric::new(
            "test_counter".to_string(),
            MetricType::Counter,
            MetricValue::Integer(42),
        )
        .with_help("Test counter metric".to_string());

        let text = ConsoleReporter::with_format(ReportFormat::Prometheus)
            .format_prometheus(&counter);

        for header in ["# HELP", "# TYPE"] {
            assert!(text.contains(header));
        }
    }
}