1use std::fmt::Write as FmtWrite;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use opentelemetry_sdk::error::OTelSdkError;
6use opentelemetry_sdk::metrics::Temporality;
7use opentelemetry_sdk::metrics::data::{
8 AggregatedMetrics, Gauge, Histogram, MetricData, ResourceMetrics, Sum,
9};
10use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
11use tracing::debug;
12
13#[derive(Clone)]
15pub struct PrometheusExporter {
16 data: Arc<Mutex<Option<String>>>,
17}
18
19impl PrometheusExporter {
20 pub fn new() -> Self {
21 debug!("Creating new Prometheus exporter");
22 Self {
23 data: Arc::new(Mutex::new(None)),
24 }
25 }
26
27 pub fn get_metrics(&self) -> Option<String> {
29 self.data.lock().unwrap().clone()
30 }
31
32 fn format_metrics(metrics: &ResourceMetrics) -> String {
34 let mut output = String::new();
35
36 for scope_metrics in metrics.scope_metrics() {
37 let scope_name = scope_metrics.scope().name();
38 for metric in scope_metrics.metrics() {
39 let metric_name = format!("{}_{}", scope_name, metric.name());
40 let description = metric.description();
41
42 writeln!(output, "# HELP {} {}", metric_name, description).ok();
44
45 match metric.data() {
47 AggregatedMetrics::F64(data) => {
48 Self::format_metric_data(&metric_name, data, &mut output, |v| v);
49 }
50 AggregatedMetrics::U64(data) => {
51 Self::format_metric_data(&metric_name, data, &mut output, |v| v as f64);
52 }
53 AggregatedMetrics::I64(data) => {
54 Self::format_metric_data(&metric_name, data, &mut output, |v| v as f64);
55 }
56 }
57 }
58 }
59
60 writeln!(output, "# EOF").ok();
61 output
62 }
63
64 fn format_metric_data<T>(
65 metric_name: &str,
66 data: &MetricData<T>,
67 output: &mut String,
68 to_f64: impl Fn(T) -> f64,
69 ) where
70 T: Copy,
71 {
72 match data {
73 MetricData::Sum(sum) => {
74 writeln!(output, "# TYPE {} counter", metric_name).ok();
75 Self::format_sum(metric_name, sum, output, to_f64);
76 }
77 MetricData::Gauge(gauge) => {
78 writeln!(output, "# TYPE {} gauge", metric_name).ok();
79 Self::format_gauge(metric_name, gauge, output, to_f64);
80 }
81 MetricData::Histogram(histogram) => {
82 writeln!(output, "# TYPE {} histogram", metric_name).ok();
83 Self::format_histogram(metric_name, histogram, output, to_f64);
84 }
85 MetricData::ExponentialHistogram(_) => {
86 }
88 }
89 }
90
91 fn format_sum<T>(
92 metric_name: &str,
93 sum: &Sum<T>,
94 output: &mut String,
95 to_f64: impl Fn(T) -> f64,
96 ) where
97 T: Copy,
98 {
99 for data_point in sum.data_points() {
100 let labels = format_attributes(data_point.attributes());
101 let value = to_f64(data_point.value());
102 writeln!(output, "{}_total{} {}", metric_name, labels, value).ok();
103 }
104 }
105
106 fn format_gauge<T>(
107 metric_name: &str,
108 gauge: &Gauge<T>,
109 output: &mut String,
110 to_f64: impl Fn(T) -> f64,
111 ) where
112 T: Copy,
113 {
114 for data_point in gauge.data_points() {
115 let labels = format_attributes(data_point.attributes());
116 let value = to_f64(data_point.value());
117 writeln!(output, "{}{} {}", metric_name, labels, value).ok();
118 }
119 }
120
121 fn format_histogram<T>(
122 metric_name: &str,
123 histogram: &Histogram<T>,
124 output: &mut String,
125 to_f64: impl Fn(T) -> f64,
126 ) where
127 T: Copy,
128 {
129 for data_point in histogram.data_points() {
130 let labels_base = format_attributes(data_point.attributes());
131
132 let bounds_vec: Vec<_> = data_point.bounds().collect();
134 let mut cumulative = 0u64;
135 for (i, count) in data_point.bucket_counts().enumerate() {
136 cumulative += count;
137 let bucket_label = if i < bounds_vec.len() {
138 format!("le=\"{}\"", bounds_vec[i])
139 } else {
140 "le=\"+Inf\"".to_string()
141 };
142
143 let labels = if labels_base.is_empty() {
144 format!("{{{}}}", bucket_label)
145 } else {
146 format!(
147 "{},{}}}",
148 &labels_base[..labels_base.len() - 1],
149 bucket_label
150 )
151 };
152
153 writeln!(output, "{}_bucket{} {}", metric_name, labels, cumulative).ok();
154 }
155
156 writeln!(
158 output,
159 "{}_sum{} {}",
160 metric_name,
161 labels_base,
162 to_f64(data_point.sum())
163 )
164 .ok();
165 writeln!(
166 output,
167 "{}_count{} {}",
168 metric_name,
169 labels_base,
170 data_point.count()
171 )
172 .ok();
173 }
174 }
175}
176
177impl Default for PrometheusExporter {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183fn format_attributes<'a>(attrs: impl Iterator<Item = &'a opentelemetry::KeyValue>) -> String {
185 let collected: Vec<_> = attrs.collect();
186 if collected.is_empty() {
187 return String::new();
188 }
189
190 let mut labels = String::from("{");
191 for (i, kv) in collected.iter().enumerate() {
192 if i > 0 {
193 labels.push(',');
194 }
195 write!(labels, "{}=\"{}\"", kv.key.as_str(), kv.value).ok();
196 }
197 labels.push('}');
198 labels
199}
200
201impl PushMetricExporter for PrometheusExporter {
202 async fn export(&self, metrics: &ResourceMetrics) -> Result<(), OTelSdkError> {
203 let formatted = Self::format_metrics(metrics);
204 *self.data.lock().unwrap() = Some(formatted);
205 Ok(())
206 }
207
208 fn force_flush(&self) -> Result<(), OTelSdkError> {
209 Ok(())
210 }
211
212 fn shutdown(&self) -> Result<(), OTelSdkError> {
213 Ok(())
214 }
215
216 fn shutdown_with_timeout(&self, _timeout: Duration) -> Result<(), OTelSdkError> {
217 Ok(())
218 }
219
220 fn temporality(&self) -> Temporality {
221 Temporality::Cumulative
222 }
223}
224
225static PROMETHEUS_EXPORTER: Mutex<Option<PrometheusExporter>> = Mutex::new(None);
227
228pub fn set_global_exporter(exporter: PrometheusExporter) {
230 debug!("Setting global Prometheus exporter");
231 *PROMETHEUS_EXPORTER.lock().unwrap() = Some(exporter);
232}
233
234pub fn format_prometheus_metrics(_service_name: &str) -> Result<String, String> {
236 let exporter_guard = PROMETHEUS_EXPORTER.lock().unwrap();
237
238 if let Some(exporter) = exporter_guard.as_ref() {
239 exporter
240 .get_metrics()
241 .ok_or_else(|| "No metrics available yet".to_string())
242 } else {
243 Err("Prometheus exporter not initialized".to_string())
244 }
245}
246
247#[cfg(test)]
248mod tests {
249 use super::*;
250 use opentelemetry::KeyValue;
251 use opentelemetry::metrics::{Meter, MeterProvider};
252
253 fn create_test_provider_and_exporter() -> (
254 opentelemetry_sdk::metrics::SdkMeterProvider,
255 PrometheusExporter,
256 Meter,
257 ) {
258 let exporter = PrometheusExporter::new();
259 let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter.clone())
260 .with_interval(std::time::Duration::from_millis(50))
261 .build();
262 let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
263 .with_reader(reader)
264 .build();
265 let meter = provider.meter("test");
266 (provider, exporter, meter)
267 }
268
269 async fn wait_for_export() {
270 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
271 }
272
273 #[tokio::test]
274 async fn test_counter_export() {
275 let (_provider, exporter, meter) = create_test_provider_and_exporter();
276
277 let counter = meter
278 .u64_counter("test_counter")
279 .with_description("A test counter")
280 .build();
281
282 counter.add(5, &[KeyValue::new("label1", "value1")]);
283 counter.add(3, &[KeyValue::new("label1", "value2")]);
284
285 wait_for_export().await;
286
287 let metrics = exporter.get_metrics();
288 assert!(metrics.is_some(), "Metrics should be available");
289
290 let metrics_text = metrics.unwrap();
291 assert!(
292 metrics_text.contains("# HELP test_test_counter"),
293 "Should contain HELP line"
294 );
295 assert!(
296 metrics_text.contains("# TYPE test_test_counter counter"),
297 "Should contain TYPE line"
298 );
299 assert!(
300 metrics_text.contains("test_test_counter"),
301 "Should contain metric name"
302 );
303 }
304
305 #[tokio::test]
306 async fn test_gauge_export() {
307 let (_provider, exporter, meter) = create_test_provider_and_exporter();
308
309 let gauge = meter
310 .i64_gauge("test_gauge")
311 .with_description("A test gauge")
312 .build();
313
314 gauge.record(42, &[KeyValue::new("status", "active")]);
315
316 wait_for_export().await;
317
318 let metrics = exporter.get_metrics();
319 assert!(metrics.is_some(), "Metrics should be available");
320
321 let metrics_text = metrics.unwrap();
322 assert!(
323 metrics_text.contains("# HELP test_test_gauge"),
324 "Should contain HELP line"
325 );
326 assert!(
327 metrics_text.contains("# TYPE test_test_gauge gauge"),
328 "Should contain TYPE line"
329 );
330 }
331
332 #[tokio::test]
333 async fn test_histogram_export() {
334 let (_provider, exporter, meter) = create_test_provider_and_exporter();
335
336 let histogram = meter
337 .f64_histogram("test_histogram")
338 .with_description("A test histogram")
339 .build();
340
341 histogram.record(0.5, &[KeyValue::new("method", "GET")]);
342 histogram.record(1.5, &[KeyValue::new("method", "GET")]);
343
344 wait_for_export().await;
345
346 let metrics = exporter.get_metrics();
347 assert!(metrics.is_some(), "Metrics should be available");
348
349 let metrics_text = metrics.unwrap();
350 assert!(
351 metrics_text.contains("# HELP test_test_histogram"),
352 "Should contain HELP line"
353 );
354 assert!(
355 metrics_text.contains("# TYPE test_test_histogram histogram"),
356 "Should contain TYPE line"
357 );
358 assert!(
359 metrics_text.contains("test_test_histogram_bucket"),
360 "Should contain bucket metrics"
361 );
362 assert!(
363 metrics_text.contains("test_test_histogram_sum"),
364 "Should contain sum metric"
365 );
366 assert!(
367 metrics_text.contains("test_test_histogram_count"),
368 "Should contain count metric"
369 );
370 }
371
372 #[tokio::test]
373 async fn test_multiple_labels() {
374 let (_provider, exporter, meter) = create_test_provider_and_exporter();
375
376 let counter = meter.u64_counter("multi_label_counter").build();
377
378 counter.add(
379 1,
380 &[
381 KeyValue::new("controller", "kanidm"),
382 KeyValue::new("namespace", "default"),
383 KeyValue::new("action", "reconcile"),
384 ],
385 );
386
387 wait_for_export().await;
388
389 let metrics = exporter.get_metrics();
390 assert!(metrics.is_some(), "Metrics should be available");
391
392 let metrics_text = metrics.unwrap();
393 assert!(
394 metrics_text.contains("controller="),
395 "Should contain controller label"
396 );
397 assert!(
398 metrics_text.contains("namespace="),
399 "Should contain namespace label"
400 );
401 assert!(
402 metrics_text.contains("action="),
403 "Should contain action label"
404 );
405 }
406
407 #[test]
408 fn test_format_attributes_empty() {
409 let attrs: [KeyValue; 0] = [];
410 let result = format_attributes(attrs.iter());
411 assert_eq!(result, "", "Empty attributes should return empty string");
412 }
413
414 #[test]
415 fn test_format_attributes_single() {
416 let attrs = [KeyValue::new("key", "value")];
417 let result = format_attributes(attrs.iter());
418 assert_eq!(result, r#"{key="value"}"#);
419 }
420
421 #[test]
422 fn test_format_attributes_multiple() {
423 let attrs = [
424 KeyValue::new("key1", "value1"),
425 KeyValue::new("key2", "value2"),
426 ];
427 let result = format_attributes(attrs.iter());
428 assert_eq!(result, r#"{key1="value1",key2="value2"}"#);
429 }
430
431 #[tokio::test]
432 async fn test_global_exporter() {
433 let exporter = PrometheusExporter::new();
434 set_global_exporter(exporter.clone());
435
436 let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter)
437 .with_interval(std::time::Duration::from_millis(50))
438 .build();
439 let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
440 .with_reader(reader)
441 .build();
442 let meter = provider.meter("global_test");
443
444 let counter = meter.u64_counter("global_counter").build();
445 counter.add(1, &[]);
446
447 wait_for_export().await;
448
449 let result = format_prometheus_metrics("test");
450 assert!(result.is_ok(), "Should get metrics from global exporter");
451 }
452}