opentelemetry_stdout/metrics/
exporter.rs

1use chrono::{DateTime, Utc};
2use core::{f64, fmt};
3use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
4use opentelemetry_sdk::metrics::Temporality;
5use opentelemetry_sdk::{
6    error::OTelSdkResult,
7    metrics::{
8        data::{
9            Gauge, GaugeDataPoint, Histogram, HistogramDataPoint, ResourceMetrics, ScopeMetrics,
10            Sum, SumDataPoint,
11        },
12        exporter::PushMetricExporter,
13    },
14};
15use std::fmt::Debug;
16use std::sync::atomic;
17use std::time::Duration;
18
19/// An OpenTelemetry exporter that writes to stdout on export.
20pub struct MetricExporter {
21    is_shutdown: atomic::AtomicBool,
22    temporality: Temporality,
23}
24
25impl MetricExporter {
26    /// Create a builder to configure this exporter.
27    pub fn builder() -> MetricExporterBuilder {
28        MetricExporterBuilder::default()
29    }
30}
31impl Default for MetricExporter {
32    fn default() -> Self {
33        MetricExporterBuilder::default().build()
34    }
35}
36
37impl fmt::Debug for MetricExporter {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.write_str("MetricExporter")
40    }
41}
42
43impl PushMetricExporter for MetricExporter {
44    /// Write Metrics to stdout
45    async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
46        if self.is_shutdown.load(atomic::Ordering::SeqCst) {
47            Err(opentelemetry_sdk::error::OTelSdkError::AlreadyShutdown)
48        } else {
49            println!("Metrics");
50            println!("Resource");
51            if let Some(schema_url) = metrics.resource().schema_url() {
52                println!("\tResource SchemaUrl: {schema_url:?}");
53            }
54
55            metrics.resource().iter().for_each(|(k, v)| {
56                println!("\t ->  {k}={v:?}");
57            });
58            print_metrics(metrics.scope_metrics());
59            Ok(())
60        }
61    }
62
63    fn force_flush(&self) -> OTelSdkResult {
64        // exporter holds no state, nothing to flush
65        Ok(())
66    }
67
68    fn shutdown(&self) -> OTelSdkResult {
69        self.shutdown_with_timeout(Duration::from_secs(5))
70    }
71
72    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
73        self.is_shutdown.store(true, atomic::Ordering::SeqCst);
74        Ok(())
75    }
76
77    fn temporality(&self) -> Temporality {
78        self.temporality
79    }
80}
81
82fn print_metrics<'a>(metrics: impl Iterator<Item = &'a ScopeMetrics>) {
83    for (i, metric) in metrics.enumerate() {
84        println!("\tInstrumentation Scope #{i}");
85        let scope = metric.scope();
86        println!("\t\tName         : {}", scope.name());
87        if let Some(version) = scope.version() {
88            println!("\t\tVersion  : {version:?}");
89        }
90        if let Some(schema_url) = scope.schema_url() {
91            println!("\t\tSchemaUrl: {schema_url:?}");
92        }
93        scope.attributes().enumerate().for_each(|(index, kv)| {
94            if index == 0 {
95                println!("\t\tScope Attributes:");
96            }
97            println!("\t\t\t ->  {}: {}", kv.key, kv.value);
98        });
99
100        metric.metrics().enumerate().for_each(|(i, metric)| {
101            println!("Metric #{i}");
102            println!("\t\tName         : {}", metric.name());
103            println!("\t\tDescription  : {}", metric.description());
104            println!("\t\tUnit         : {}", metric.unit());
105
106            fn print_info<T>(data: &MetricData<T>)
107            where
108                T: Debug + Copy,
109            {
110                match data {
111                    MetricData::Gauge(gauge) => {
112                        println!("\t\tType         : Gauge");
113                        print_gauge(gauge);
114                    }
115                    MetricData::Sum(sum) => {
116                        println!("\t\tType         : Sum");
117                        print_sum(sum);
118                    }
119                    MetricData::Histogram(hist) => {
120                        println!("\t\tType         : Histogram");
121                        print_histogram(hist);
122                    }
123                    MetricData::ExponentialHistogram(_) => {
124                        println!("\t\tType         : Exponential Histogram");
125                        // TODO: add support for ExponentialHistogram
126                    }
127                }
128            }
129            match metric.data() {
130                AggregatedMetrics::F64(data) => print_info(data),
131                AggregatedMetrics::U64(data) => print_info(data),
132                AggregatedMetrics::I64(data) => print_info(data),
133            }
134        });
135    }
136}
137
138fn print_sum<T: Debug + Copy>(sum: &Sum<T>) {
139    println!("\t\tSum DataPoints");
140    println!("\t\tMonotonic    : {}", sum.is_monotonic());
141    if sum.temporality() == Temporality::Cumulative {
142        println!("\t\tTemporality  : Cumulative");
143    } else {
144        println!("\t\tTemporality  : Delta");
145    }
146    let datetime: DateTime<Utc> = sum.start_time().into();
147    println!(
148        "\t\tStartTime    : {}",
149        datetime.format("%Y-%m-%d %H:%M:%S%.6f")
150    );
151    let datetime: DateTime<Utc> = sum.time().into();
152    println!(
153        "\t\tEndTime      : {}",
154        datetime.format("%Y-%m-%d %H:%M:%S%.6f")
155    );
156    print_sum_data_points(sum.data_points());
157}
158
159fn print_gauge<T: Debug + Copy>(gauge: &Gauge<T>) {
160    println!("\t\tGauge DataPoints");
161    if let Some(start_time) = gauge.start_time() {
162        let datetime: DateTime<Utc> = start_time.into();
163        println!(
164            "\t\tStartTime    : {}",
165            datetime.format("%Y-%m-%d %H:%M:%S%.6f")
166        );
167    }
168    let datetime: DateTime<Utc> = gauge.time().into();
169    println!(
170        "\t\tEndTime      : {}",
171        datetime.format("%Y-%m-%d %H:%M:%S%.6f")
172    );
173    print_gauge_data_points(gauge.data_points());
174}
175
176fn print_histogram<T: Debug + Copy>(histogram: &Histogram<T>) {
177    if histogram.temporality() == Temporality::Cumulative {
178        println!("\t\tTemporality  : Cumulative");
179    } else {
180        println!("\t\tTemporality  : Delta");
181    }
182    let datetime: DateTime<Utc> = histogram.start_time().into();
183    println!(
184        "\t\tStartTime    : {}",
185        datetime.format("%Y-%m-%d %H:%M:%S%.6f")
186    );
187    let datetime: DateTime<Utc> = histogram.time().into();
188    println!(
189        "\t\tEndTime      : {}",
190        datetime.format("%Y-%m-%d %H:%M:%S%.6f")
191    );
192    println!("\t\tHistogram DataPoints");
193    print_hist_data_points(histogram.data_points());
194}
195
196fn print_sum_data_points<'a, T: Debug + Copy + 'a>(
197    data_points: impl Iterator<Item = &'a SumDataPoint<T>>,
198) {
199    for (i, data_point) in data_points.enumerate() {
200        println!("\t\tDataPoint #{i}");
201        println!("\t\t\tValue        : {:#?}", data_point.value());
202        println!("\t\t\tAttributes   :");
203        for kv in data_point.attributes() {
204            println!("\t\t\t\t ->  {}: {}", kv.key, kv.value.as_str());
205        }
206    }
207}
208
209fn print_gauge_data_points<'a, T: Debug + Copy + 'a>(
210    data_points: impl Iterator<Item = &'a GaugeDataPoint<T>>,
211) {
212    for (i, data_point) in data_points.enumerate() {
213        println!("\t\tDataPoint #{i}");
214        println!("\t\t\tValue        : {:#?}", data_point.value());
215        println!("\t\t\tAttributes   :");
216        for kv in data_point.attributes() {
217            println!("\t\t\t\t ->  {}: {}", kv.key, kv.value.as_str());
218        }
219    }
220}
221
222fn print_hist_data_points<'a, T: Debug + Copy + 'a>(
223    data_points: impl Iterator<Item = &'a HistogramDataPoint<T>>,
224) {
225    for (i, data_point) in data_points.enumerate() {
226        println!("\t\tDataPoint #{i}");
227        println!("\t\t\tCount        : {}", data_point.count());
228        println!("\t\t\tSum          : {:?}", data_point.sum());
229        if let Some(min) = &data_point.min() {
230            println!("\t\t\tMin          : {min:?}");
231        }
232
233        if let Some(max) = &data_point.max() {
234            println!("\t\t\tMax          : {max:?}");
235        }
236
237        println!("\t\t\tAttributes   :");
238        for kv in data_point.attributes() {
239            println!("\t\t\t\t ->  {}: {}", kv.key, kv.value.as_str());
240        }
241
242        let mut lower_bound = f64::NEG_INFINITY;
243        let bounds_iter = data_point.bounds();
244        let mut bucket_counts_iter = data_point.bucket_counts();
245        let mut header_printed = false;
246
247        // Process all the regular buckets
248        for upper_bound in bounds_iter {
249            // Print header only once before the first item
250            if !header_printed {
251                println!("\t\t\tBuckets");
252                header_printed = true;
253            }
254
255            // Get the count for this bucket, or 0 if not available
256            let count = bucket_counts_iter.next().unwrap_or(0);
257            println!("\t\t\t\t {lower_bound} to {upper_bound} : {count}");
258            lower_bound = upper_bound;
259        }
260
261        // Handle the final +Infinity bucket if we processed any buckets
262        if header_printed {
263            let last_count = bucket_counts_iter.next().unwrap_or(0);
264            println!("\t\t\t\t{lower_bound} to +Infinity : {last_count}");
265        }
266    }
267}
268
269/// Configuration for the stdout metrics exporter
270#[derive(Default)]
271pub struct MetricExporterBuilder {
272    temporality: Option<Temporality>,
273}
274
275impl MetricExporterBuilder {
276    /// Set the [Temporality] of the exporter.
277    pub fn with_temporality(mut self, temporality: Temporality) -> Self {
278        self.temporality = Some(temporality);
279        self
280    }
281
282    /// Create a metrics exporter with the current configuration
283    pub fn build(self) -> MetricExporter {
284        MetricExporter {
285            temporality: self.temporality.unwrap_or_default(),
286            is_shutdown: atomic::AtomicBool::new(false),
287        }
288    }
289}
290
291impl fmt::Debug for MetricExporterBuilder {
292    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
293        f.write_str("MetricExporterBuilder")
294    }
295}