Skip to main content

kaniop_operator/
prometheus_exporter.rs

1use std::fmt::Write as FmtWrite;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use opentelemetry_sdk::error::OTelSdkError;
6use opentelemetry_sdk::metrics::Temporality;
7use opentelemetry_sdk::metrics::data::{
8    AggregatedMetrics, Gauge, Histogram, MetricData, ResourceMetrics, Sum,
9};
10use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
11use tracing::debug;
12
/// In-memory Prometheus text format exporter.
///
/// The exporter keeps only the most recently rendered snapshot. Cloning the
/// exporter clones the `Arc`, so every clone reads and writes the same
/// snapshot.
#[derive(Clone, Debug)]
pub struct PrometheusExporter {
    /// Latest rendered metrics text; `None` until a snapshot has been stored.
    data: Arc<Mutex<Option<String>>>,
}
18
19impl PrometheusExporter {
20    pub fn new() -> Self {
21        debug!("Creating new Prometheus exporter");
22        Self {
23            data: Arc::new(Mutex::new(None)),
24        }
25    }
26
27    /// Get the latest metrics in Prometheus text format
28    pub fn get_metrics(&self) -> Option<String> {
29        self.data.lock().unwrap().clone()
30    }
31
32    /// Convert OpenTelemetry metrics to Prometheus text format
33    fn format_metrics(metrics: &ResourceMetrics) -> String {
34        let mut output = String::new();
35
36        for scope_metrics in metrics.scope_metrics() {
37            let scope_name = scope_metrics.scope().name();
38            for metric in scope_metrics.metrics() {
39                let metric_name = format!("{}_{}", scope_name, metric.name());
40                let description = metric.description();
41
42                // Write HELP line
43                writeln!(output, "# HELP {} {}", metric_name, description).ok();
44
45                // Write TYPE line and data based on metric type
46                match metric.data() {
47                    AggregatedMetrics::F64(data) => {
48                        Self::format_metric_data(&metric_name, data, &mut output, |v| v);
49                    }
50                    AggregatedMetrics::U64(data) => {
51                        Self::format_metric_data(&metric_name, data, &mut output, |v| v as f64);
52                    }
53                    AggregatedMetrics::I64(data) => {
54                        Self::format_metric_data(&metric_name, data, &mut output, |v| v as f64);
55                    }
56                }
57            }
58        }
59
60        writeln!(output, "# EOF").ok();
61        output
62    }
63
64    fn format_metric_data<T>(
65        metric_name: &str,
66        data: &MetricData<T>,
67        output: &mut String,
68        to_f64: impl Fn(T) -> f64,
69    ) where
70        T: Copy,
71    {
72        match data {
73            MetricData::Sum(sum) => {
74                writeln!(output, "# TYPE {} counter", metric_name).ok();
75                Self::format_sum(metric_name, sum, output, to_f64);
76            }
77            MetricData::Gauge(gauge) => {
78                writeln!(output, "# TYPE {} gauge", metric_name).ok();
79                Self::format_gauge(metric_name, gauge, output, to_f64);
80            }
81            MetricData::Histogram(histogram) => {
82                writeln!(output, "# TYPE {} histogram", metric_name).ok();
83                Self::format_histogram(metric_name, histogram, output, to_f64);
84            }
85            MetricData::ExponentialHistogram(_) => {
86                // Skip exponential histograms for now
87            }
88        }
89    }
90
91    fn format_sum<T>(
92        metric_name: &str,
93        sum: &Sum<T>,
94        output: &mut String,
95        to_f64: impl Fn(T) -> f64,
96    ) where
97        T: Copy,
98    {
99        for data_point in sum.data_points() {
100            let labels = format_attributes(data_point.attributes());
101            let value = to_f64(data_point.value());
102            writeln!(output, "{}_total{} {}", metric_name, labels, value).ok();
103        }
104    }
105
106    fn format_gauge<T>(
107        metric_name: &str,
108        gauge: &Gauge<T>,
109        output: &mut String,
110        to_f64: impl Fn(T) -> f64,
111    ) where
112        T: Copy,
113    {
114        for data_point in gauge.data_points() {
115            let labels = format_attributes(data_point.attributes());
116            let value = to_f64(data_point.value());
117            writeln!(output, "{}{} {}", metric_name, labels, value).ok();
118        }
119    }
120
121    fn format_histogram<T>(
122        metric_name: &str,
123        histogram: &Histogram<T>,
124        output: &mut String,
125        to_f64: impl Fn(T) -> f64,
126    ) where
127        T: Copy,
128    {
129        for data_point in histogram.data_points() {
130            let labels_base = format_attributes(data_point.attributes());
131
132            // Write bucket counts
133            let bounds_vec: Vec<_> = data_point.bounds().collect();
134            let mut cumulative = 0u64;
135            for (i, count) in data_point.bucket_counts().enumerate() {
136                cumulative += count;
137                let bucket_label = if i < bounds_vec.len() {
138                    format!("le=\"{}\"", bounds_vec[i])
139                } else {
140                    "le=\"+Inf\"".to_string()
141                };
142
143                let labels = if labels_base.is_empty() {
144                    format!("{{{}}}", bucket_label)
145                } else {
146                    format!(
147                        "{},{}}}",
148                        &labels_base[..labels_base.len() - 1],
149                        bucket_label
150                    )
151                };
152
153                writeln!(output, "{}_bucket{} {}", metric_name, labels, cumulative).ok();
154            }
155
156            // Write sum and count
157            writeln!(
158                output,
159                "{}_sum{} {}",
160                metric_name,
161                labels_base,
162                to_f64(data_point.sum())
163            )
164            .ok();
165            writeln!(
166                output,
167                "{}_count{} {}",
168                metric_name,
169                labels_base,
170                data_point.count()
171            )
172            .ok();
173        }
174    }
175}
176
177impl Default for PrometheusExporter {
178    fn default() -> Self {
179        Self::new()
180    }
181}
182
183/// Format OpenTelemetry attributes as Prometheus labels
184fn format_attributes<'a>(attrs: impl Iterator<Item = &'a opentelemetry::KeyValue>) -> String {
185    let collected: Vec<_> = attrs.collect();
186    if collected.is_empty() {
187        return String::new();
188    }
189
190    let mut labels = String::from("{");
191    for (i, kv) in collected.iter().enumerate() {
192        if i > 0 {
193            labels.push(',');
194        }
195        write!(labels, "{}=\"{}\"", kv.key.as_str(), kv.value).ok();
196    }
197    labels.push('}');
198    labels
199}
200
201impl PushMetricExporter for PrometheusExporter {
202    async fn export(&self, metrics: &ResourceMetrics) -> Result<(), OTelSdkError> {
203        let formatted = Self::format_metrics(metrics);
204        *self.data.lock().unwrap() = Some(formatted);
205        Ok(())
206    }
207
208    fn force_flush(&self) -> Result<(), OTelSdkError> {
209        Ok(())
210    }
211
212    fn shutdown(&self) -> Result<(), OTelSdkError> {
213        Ok(())
214    }
215
216    fn shutdown_with_timeout(&self, _timeout: Duration) -> Result<(), OTelSdkError> {
217        Ok(())
218    }
219
220    fn temporality(&self) -> Temporality {
221        Temporality::Cumulative
222    }
223}
224
225/// Global Prometheus exporter instance
226static PROMETHEUS_EXPORTER: Mutex<Option<PrometheusExporter>> = Mutex::new(None);
227
228/// Set the global Prometheus exporter instance
229pub fn set_global_exporter(exporter: PrometheusExporter) {
230    debug!("Setting global Prometheus exporter");
231    *PROMETHEUS_EXPORTER.lock().unwrap() = Some(exporter);
232}
233
234/// Format Prometheus metrics from the global exporter
235pub fn format_prometheus_metrics(_service_name: &str) -> Result<String, String> {
236    let exporter_guard = PROMETHEUS_EXPORTER.lock().unwrap();
237
238    if let Some(exporter) = exporter_guard.as_ref() {
239        exporter
240            .get_metrics()
241            .ok_or_else(|| "No metrics available yet".to_string())
242    } else {
243        Err("Prometheus exporter not initialized".to_string())
244    }
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::KeyValue;
    use opentelemetry::metrics::{Meter, MeterProvider};

    /// Builds a meter provider whose periodic reader pushes into a fresh
    /// exporter every 50 ms, and returns the provider (which must be kept
    /// alive by the caller), the exporter and a meter named `test`.
    fn create_test_provider_and_exporter() -> (
        opentelemetry_sdk::metrics::SdkMeterProvider,
        PrometheusExporter,
        Meter,
    ) {
        let exporter = PrometheusExporter::new();
        let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter.clone())
            .with_interval(std::time::Duration::from_millis(50))
            .build();
        let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
            .with_reader(reader)
            .build();
        let meter = provider.meter("test");
        (provider, exporter, meter)
    }

    /// Sleeps for several reader intervals so at least one export has run.
    async fn wait_for_export() {
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
    }

    /// Waits for an export and returns the snapshot held by `exporter`.
    async fn exported_text(exporter: &PrometheusExporter) -> String {
        wait_for_export().await;
        exporter
            .get_metrics()
            .expect("Metrics should be available")
    }

    /// Asserts that `text` contains `needle`, printing the full text on failure.
    fn assert_contains(text: &str, needle: &str) {
        assert!(
            text.contains(needle),
            "expected {needle:?} in exported text:\n{text}"
        );
    }

    #[tokio::test]
    async fn test_counter_export() {
        let (_provider, exporter, meter) = create_test_provider_and_exporter();

        let counter = meter
            .u64_counter("test_counter")
            .with_description("A test counter")
            .build();

        counter.add(5, &[KeyValue::new("label1", "value1")]);
        counter.add(3, &[KeyValue::new("label1", "value2")]);

        let metrics_text = exported_text(&exporter).await;
        assert_contains(&metrics_text, "# HELP test_test_counter A test counter");
        assert_contains(&metrics_text, "# TYPE test_test_counter counter");
        assert_contains(
            &metrics_text,
            "test_test_counter_total{label1=\"value1\"} 5",
        );
        assert_contains(
            &metrics_text,
            "test_test_counter_total{label1=\"value2\"} 3",
        );
    }

    #[tokio::test]
    async fn test_gauge_export() {
        let (_provider, exporter, meter) = create_test_provider_and_exporter();

        let gauge = meter
            .i64_gauge("test_gauge")
            .with_description("A test gauge")
            .build();

        gauge.record(42, &[KeyValue::new("status", "active")]);

        let metrics_text = exported_text(&exporter).await;
        assert_contains(&metrics_text, "# HELP test_test_gauge A test gauge");
        assert_contains(&metrics_text, "# TYPE test_test_gauge gauge");
        assert_contains(&metrics_text, "test_test_gauge{status=\"active\"} 42");
    }

    #[tokio::test]
    async fn test_histogram_export() {
        let (_provider, exporter, meter) = create_test_provider_and_exporter();

        let histogram = meter
            .f64_histogram("test_histogram")
            .with_description("A test histogram")
            .build();

        histogram.record(0.5, &[KeyValue::new("method", "GET")]);
        histogram.record(1.5, &[KeyValue::new("method", "GET")]);

        let metrics_text = exported_text(&exporter).await;
        assert_contains(
            &metrics_text,
            "# HELP test_test_histogram A test histogram",
        );
        assert_contains(&metrics_text, "# TYPE test_test_histogram histogram");
        // The +Inf bucket is cumulative, so it holds both observations.
        assert_contains(
            &metrics_text,
            "test_test_histogram_bucket{method=\"GET\",le=\"+Inf\"} 2",
        );
        assert_contains(&metrics_text, "test_test_histogram_sum{method=\"GET\"} 2");
        assert_contains(
            &metrics_text,
            "test_test_histogram_count{method=\"GET\"} 2",
        );
    }

    #[tokio::test]
    async fn test_multiple_labels() {
        let (_provider, exporter, meter) = create_test_provider_and_exporter();

        let counter = meter.u64_counter("multi_label_counter").build();

        counter.add(
            1,
            &[
                KeyValue::new("controller", "kanidm"),
                KeyValue::new("namespace", "default"),
                KeyValue::new("action", "reconcile"),
            ],
        );

        let metrics_text = exported_text(&exporter).await;
        // Label order is not asserted; only that every pair is present.
        assert_contains(&metrics_text, "controller=\"kanidm\"");
        assert_contains(&metrics_text, "namespace=\"default\"");
        assert_contains(&metrics_text, "action=\"reconcile\"");
    }

    #[test]
    fn test_format_attributes_empty() {
        let attrs: [KeyValue; 0] = [];
        let result = format_attributes(attrs.iter());
        assert_eq!(result, "", "Empty attributes should return empty string");
    }

    #[test]
    fn test_format_attributes_single() {
        let attrs = [KeyValue::new("key", "value")];
        let result = format_attributes(attrs.iter());
        assert_eq!(result, r#"{key="value"}"#);
    }

    #[test]
    fn test_format_attributes_multiple() {
        let attrs = [
            KeyValue::new("key1", "value1"),
            KeyValue::new("key2", "value2"),
        ];
        let result = format_attributes(attrs.iter());
        assert_eq!(result, r#"{key1="value1",key2="value2"}"#);
    }

    #[tokio::test]
    async fn test_global_exporter() {
        let exporter = PrometheusExporter::new();
        set_global_exporter(exporter.clone());

        let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter)
            .with_interval(std::time::Duration::from_millis(50))
            .build();
        let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
            .with_reader(reader)
            .build();
        let meter = provider.meter("global_test");

        let counter = meter.u64_counter("global_counter").build();
        counter.add(1, &[]);

        wait_for_export().await;

        let metrics_text = format_prometheus_metrics("test")
            .expect("Should get metrics from global exporter");
        assert_contains(&metrics_text, "global_test_global_counter_total 1");
    }
}