opentelemetry_stdout/metrics/
exporter.rs1use chrono::{DateTime, Utc};
2use core::{f64, fmt};
3use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
4use opentelemetry_sdk::metrics::Temporality;
5use opentelemetry_sdk::{
6 error::OTelSdkResult,
7 metrics::{
8 data::{
9 Gauge, GaugeDataPoint, Histogram, HistogramDataPoint, ResourceMetrics, ScopeMetrics,
10 Sum, SumDataPoint,
11 },
12 exporter::PushMetricExporter,
13 },
14};
15use std::fmt::Debug;
16use std::sync::atomic;
17use std::time::Duration;
18
19pub struct MetricExporter {
21 is_shutdown: atomic::AtomicBool,
22 temporality: Temporality,
23}
24
25impl MetricExporter {
26 pub fn builder() -> MetricExporterBuilder {
28 MetricExporterBuilder::default()
29 }
30}
31impl Default for MetricExporter {
32 fn default() -> Self {
33 MetricExporterBuilder::default().build()
34 }
35}
36
37impl fmt::Debug for MetricExporter {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.write_str("MetricExporter")
40 }
41}
42
43impl PushMetricExporter for MetricExporter {
44 async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
46 if self.is_shutdown.load(atomic::Ordering::SeqCst) {
47 Err(opentelemetry_sdk::error::OTelSdkError::AlreadyShutdown)
48 } else {
49 println!("Metrics");
50 println!("Resource");
51 if let Some(schema_url) = metrics.resource().schema_url() {
52 println!("\tResource SchemaUrl: {schema_url:?}");
53 }
54
55 metrics.resource().iter().for_each(|(k, v)| {
56 println!("\t -> {k}={v:?}");
57 });
58 print_metrics(metrics.scope_metrics());
59 Ok(())
60 }
61 }
62
63 fn force_flush(&self) -> OTelSdkResult {
64 Ok(())
66 }
67
68 fn shutdown(&self) -> OTelSdkResult {
69 self.shutdown_with_timeout(Duration::from_secs(5))
70 }
71
72 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
73 self.is_shutdown.store(true, atomic::Ordering::SeqCst);
74 Ok(())
75 }
76
77 fn temporality(&self) -> Temporality {
78 self.temporality
79 }
80}
81
82fn print_metrics<'a>(metrics: impl Iterator<Item = &'a ScopeMetrics>) {
83 for (i, metric) in metrics.enumerate() {
84 println!("\tInstrumentation Scope #{i}");
85 let scope = metric.scope();
86 println!("\t\tName : {}", scope.name());
87 if let Some(version) = scope.version() {
88 println!("\t\tVersion : {version:?}");
89 }
90 if let Some(schema_url) = scope.schema_url() {
91 println!("\t\tSchemaUrl: {schema_url:?}");
92 }
93 scope.attributes().enumerate().for_each(|(index, kv)| {
94 if index == 0 {
95 println!("\t\tScope Attributes:");
96 }
97 println!("\t\t\t -> {}: {}", kv.key, kv.value);
98 });
99
100 metric.metrics().enumerate().for_each(|(i, metric)| {
101 println!("Metric #{i}");
102 println!("\t\tName : {}", metric.name());
103 println!("\t\tDescription : {}", metric.description());
104 println!("\t\tUnit : {}", metric.unit());
105
106 fn print_info<T>(data: &MetricData<T>)
107 where
108 T: Debug + Copy,
109 {
110 match data {
111 MetricData::Gauge(gauge) => {
112 println!("\t\tType : Gauge");
113 print_gauge(gauge);
114 }
115 MetricData::Sum(sum) => {
116 println!("\t\tType : Sum");
117 print_sum(sum);
118 }
119 MetricData::Histogram(hist) => {
120 println!("\t\tType : Histogram");
121 print_histogram(hist);
122 }
123 MetricData::ExponentialHistogram(_) => {
124 println!("\t\tType : Exponential Histogram");
125 }
127 }
128 }
129 match metric.data() {
130 AggregatedMetrics::F64(data) => print_info(data),
131 AggregatedMetrics::U64(data) => print_info(data),
132 AggregatedMetrics::I64(data) => print_info(data),
133 }
134 });
135 }
136}
137
138fn print_sum<T: Debug + Copy>(sum: &Sum<T>) {
139 println!("\t\tSum DataPoints");
140 println!("\t\tMonotonic : {}", sum.is_monotonic());
141 if sum.temporality() == Temporality::Cumulative {
142 println!("\t\tTemporality : Cumulative");
143 } else {
144 println!("\t\tTemporality : Delta");
145 }
146 let datetime: DateTime<Utc> = sum.start_time().into();
147 println!(
148 "\t\tStartTime : {}",
149 datetime.format("%Y-%m-%d %H:%M:%S%.6f")
150 );
151 let datetime: DateTime<Utc> = sum.time().into();
152 println!(
153 "\t\tEndTime : {}",
154 datetime.format("%Y-%m-%d %H:%M:%S%.6f")
155 );
156 print_sum_data_points(sum.data_points());
157}
158
159fn print_gauge<T: Debug + Copy>(gauge: &Gauge<T>) {
160 println!("\t\tGauge DataPoints");
161 if let Some(start_time) = gauge.start_time() {
162 let datetime: DateTime<Utc> = start_time.into();
163 println!(
164 "\t\tStartTime : {}",
165 datetime.format("%Y-%m-%d %H:%M:%S%.6f")
166 );
167 }
168 let datetime: DateTime<Utc> = gauge.time().into();
169 println!(
170 "\t\tEndTime : {}",
171 datetime.format("%Y-%m-%d %H:%M:%S%.6f")
172 );
173 print_gauge_data_points(gauge.data_points());
174}
175
176fn print_histogram<T: Debug + Copy>(histogram: &Histogram<T>) {
177 if histogram.temporality() == Temporality::Cumulative {
178 println!("\t\tTemporality : Cumulative");
179 } else {
180 println!("\t\tTemporality : Delta");
181 }
182 let datetime: DateTime<Utc> = histogram.start_time().into();
183 println!(
184 "\t\tStartTime : {}",
185 datetime.format("%Y-%m-%d %H:%M:%S%.6f")
186 );
187 let datetime: DateTime<Utc> = histogram.time().into();
188 println!(
189 "\t\tEndTime : {}",
190 datetime.format("%Y-%m-%d %H:%M:%S%.6f")
191 );
192 println!("\t\tHistogram DataPoints");
193 print_hist_data_points(histogram.data_points());
194}
195
196fn print_sum_data_points<'a, T: Debug + Copy + 'a>(
197 data_points: impl Iterator<Item = &'a SumDataPoint<T>>,
198) {
199 for (i, data_point) in data_points.enumerate() {
200 println!("\t\tDataPoint #{i}");
201 println!("\t\t\tValue : {:#?}", data_point.value());
202 println!("\t\t\tAttributes :");
203 for kv in data_point.attributes() {
204 println!("\t\t\t\t -> {}: {}", kv.key, kv.value.as_str());
205 }
206 }
207}
208
209fn print_gauge_data_points<'a, T: Debug + Copy + 'a>(
210 data_points: impl Iterator<Item = &'a GaugeDataPoint<T>>,
211) {
212 for (i, data_point) in data_points.enumerate() {
213 println!("\t\tDataPoint #{i}");
214 println!("\t\t\tValue : {:#?}", data_point.value());
215 println!("\t\t\tAttributes :");
216 for kv in data_point.attributes() {
217 println!("\t\t\t\t -> {}: {}", kv.key, kv.value.as_str());
218 }
219 }
220}
221
222fn print_hist_data_points<'a, T: Debug + Copy + 'a>(
223 data_points: impl Iterator<Item = &'a HistogramDataPoint<T>>,
224) {
225 for (i, data_point) in data_points.enumerate() {
226 println!("\t\tDataPoint #{i}");
227 println!("\t\t\tCount : {}", data_point.count());
228 println!("\t\t\tSum : {:?}", data_point.sum());
229 if let Some(min) = &data_point.min() {
230 println!("\t\t\tMin : {min:?}");
231 }
232
233 if let Some(max) = &data_point.max() {
234 println!("\t\t\tMax : {max:?}");
235 }
236
237 println!("\t\t\tAttributes :");
238 for kv in data_point.attributes() {
239 println!("\t\t\t\t -> {}: {}", kv.key, kv.value.as_str());
240 }
241
242 let mut lower_bound = f64::NEG_INFINITY;
243 let bounds_iter = data_point.bounds();
244 let mut bucket_counts_iter = data_point.bucket_counts();
245 let mut header_printed = false;
246
247 for upper_bound in bounds_iter {
249 if !header_printed {
251 println!("\t\t\tBuckets");
252 header_printed = true;
253 }
254
255 let count = bucket_counts_iter.next().unwrap_or(0);
257 println!("\t\t\t\t {lower_bound} to {upper_bound} : {count}");
258 lower_bound = upper_bound;
259 }
260
261 if header_printed {
263 let last_count = bucket_counts_iter.next().unwrap_or(0);
264 println!("\t\t\t\t{lower_bound} to +Infinity : {last_count}");
265 }
266 }
267}
268
269#[derive(Default)]
271pub struct MetricExporterBuilder {
272 temporality: Option<Temporality>,
273}
274
275impl MetricExporterBuilder {
276 pub fn with_temporality(mut self, temporality: Temporality) -> Self {
278 self.temporality = Some(temporality);
279 self
280 }
281
282 pub fn build(self) -> MetricExporter {
284 MetricExporter {
285 temporality: self.temporality.unwrap_or_default(),
286 is_shutdown: atomic::AtomicBool::new(false),
287 }
288 }
289}
290
291impl fmt::Debug for MetricExporterBuilder {
292 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
293 f.write_str("MetricExporterBuilder")
294 }
295}