opentelemetry_spanprocessor_any/sdk/export/metrics/
stdout.rs

1//! Stdout Metrics Exporter
2use crate::global;
3use crate::sdk::{
4    export::metrics::{
5        CheckpointSet, Count, ExportKind, ExportKindFor, ExportKindSelector, Exporter, LastValue,
6        Max, Min, Sum,
7    },
8    metrics::{
9        aggregators::{
10            ArrayAggregator, HistogramAggregator, LastValueAggregator, MinMaxSumCountAggregator,
11            SumAggregator,
12        },
13        controllers::{self, PushController, PushControllerWorker},
14        selectors::simple,
15    },
16};
17use crate::{
18    attributes::{default_encoder, AttributeSet, Encoder},
19    metrics::{Descriptor, MetricsError, Result},
20    KeyValue,
21};
22use futures_util::stream::Stream;
23#[cfg(feature = "serialize")]
24use serde::{Serialize, Serializer};
25use std::fmt;
26use std::io;
27use std::iter;
28use std::sync::Mutex;
29use std::time::{Duration, SystemTime};
30
31/// Create a new stdout exporter builder with the configuration for a stdout exporter.
32pub fn stdout<S, SO, I, IS, ISI>(spawn: S, interval: I) -> StdoutExporterBuilder<io::Stdout, S, I>
33where
34    S: Fn(PushControllerWorker) -> SO,
35    I: Fn(Duration) -> IS,
36    IS: Stream<Item = ISI> + Send + 'static,
37{
38    StdoutExporterBuilder::<io::Stdout, S, I>::builder(spawn, interval)
39}
40
41///
42#[derive(Debug)]
43pub struct StdoutExporter<W> {
44    /// Writer is the destination. If not set, `Stdout` is used.
45    writer: Mutex<W>,
46    /// Suppresses timestamp printing. This is useful to create deterministic test
47    /// conditions.
48    do_not_print_time: bool,
49    /// Encodes the attributes.
50    attribute_encoder: Box<dyn Encoder + Send + Sync>,
51    /// An optional user-defined function to format a given export batch.
52    formatter: Option<Formatter>,
53}
54
55/// A collection of exported lines
56#[cfg_attr(feature = "serialize", derive(Serialize))]
57#[derive(Default, Debug)]
58pub struct ExportBatch {
59    #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
60    timestamp: Option<SystemTime>,
61    lines: Vec<ExportLine>,
62}
63
64#[cfg_attr(feature = "serialize", derive(Serialize))]
65#[derive(Default, Debug)]
66struct ExportLine {
67    name: String,
68    #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
69    min: Option<ExportNumeric>,
70    #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
71    max: Option<ExportNumeric>,
72    #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
73    sum: Option<ExportNumeric>,
74    count: u64,
75    #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
76    last_value: Option<ExportNumeric>,
77
78    #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
79    timestamp: Option<SystemTime>,
80}
81
/// A number exported as debug for serialization
///
/// The wrapped value is type-erased; only its `Debug` rendering is ever used.
pub struct ExportNumeric(Box<dyn fmt::Debug>);

impl fmt::Debug for ExportNumeric {
    /// Forwards to the wrapped value so the wrapper itself never shows up in
    /// the output and formatter flags (width, alternate, ...) are honoured.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ExportNumeric(inner) = self;
        fmt::Debug::fmt(inner, f)
    }
}
90
#[cfg(feature = "serialize")]
impl Serialize for ExportNumeric {
    /// Serializes the number as a string holding its `Debug` representation.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{:?}", self))
    }
}
101
102impl<W> Exporter for StdoutExporter<W>
103where
104    W: fmt::Debug + io::Write,
105{
106    fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> Result<()> {
107        let mut batch = ExportBatch::default();
108        if !self.do_not_print_time {
109            batch.timestamp = Some(crate::time::now());
110        }
111        checkpoint_set.try_for_each(self, &mut |record| {
112            let agg = record.aggregator().ok_or(MetricsError::NoDataCollected)?;
113            let desc = record.descriptor();
114            let kind = desc.number_kind();
115            let encoded_resource = record.resource().encoded(self.attribute_encoder.as_ref());
116            let encoded_inst_attributes = if !desc.instrumentation_name().is_empty() {
117                let inst_attributes = AttributeSet::from_attributes(iter::once(KeyValue::new(
118                    "instrumentation.name",
119                    desc.instrumentation_name().to_owned(),
120                )));
121                inst_attributes.encoded(Some(self.attribute_encoder.as_ref()))
122            } else {
123                String::new()
124            };
125
126            let mut expose = ExportLine::default();
127
128            if let Some(array) = agg.as_any().downcast_ref::<ArrayAggregator>() {
129                expose.count = array.count()?;
130            }
131
132            if let Some(last_value) = agg.as_any().downcast_ref::<LastValueAggregator>() {
133                let (value, timestamp) = last_value.last_value()?;
134                expose.last_value = Some(ExportNumeric(value.to_debug(kind)));
135
136                if !self.do_not_print_time {
137                    expose.timestamp = Some(timestamp);
138                }
139            }
140
141            if let Some(histogram) = agg.as_any().downcast_ref::<HistogramAggregator>() {
142                expose.sum = Some(ExportNumeric(histogram.sum()?.to_debug(kind)));
143                expose.count = histogram.count()?;
144                // TODO expose buckets
145            }
146
147            if let Some(mmsc) = agg.as_any().downcast_ref::<MinMaxSumCountAggregator>() {
148                expose.min = Some(ExportNumeric(mmsc.min()?.to_debug(kind)));
149                expose.max = Some(ExportNumeric(mmsc.max()?.to_debug(kind)));
150                expose.sum = Some(ExportNumeric(mmsc.sum()?.to_debug(kind)));
151                expose.count = mmsc.count()?;
152            }
153
154            if let Some(sum) = agg.as_any().downcast_ref::<SumAggregator>() {
155                expose.sum = Some(ExportNumeric(sum.sum()?.to_debug(kind)));
156            }
157
158            let mut encoded_attributes = String::new();
159            let iter = record.attributes().iter();
160            if let (0, _) = iter.size_hint() {
161                encoded_attributes = record
162                    .attributes()
163                    .encoded(Some(self.attribute_encoder.as_ref()));
164            }
165
166            let mut sb = String::new();
167
168            sb.push_str(desc.name());
169
170            if !encoded_attributes.is_empty()
171                || !encoded_resource.is_empty()
172                || !encoded_inst_attributes.is_empty()
173            {
174                sb.push('{');
175                sb.push_str(&encoded_resource);
176                if !encoded_inst_attributes.is_empty() && !encoded_resource.is_empty() {
177                    sb.push(',');
178                }
179                sb.push_str(&encoded_inst_attributes);
180                if !encoded_attributes.is_empty()
181                    && (!encoded_inst_attributes.is_empty() || !encoded_resource.is_empty())
182                {
183                    sb.push(',');
184                }
185                sb.push_str(&encoded_attributes);
186                sb.push('}');
187            }
188
189            expose.name = sb;
190
191            batch.lines.push(expose);
192            Ok(())
193        })?;
194
195        self.writer.lock().map_err(From::from).and_then(|mut w| {
196            let formatted = match &self.formatter {
197                Some(formatter) => formatter.0(batch)?,
198                None => format!("{:?}\n", batch),
199            };
200            w.write_all(formatted.as_bytes())
201                .map_err(|e| MetricsError::Other(e.to_string()))
202        })
203    }
204}
205
206impl<W> ExportKindFor for StdoutExporter<W>
207where
208    W: fmt::Debug + io::Write,
209{
210    fn export_kind_for(&self, descriptor: &Descriptor) -> ExportKind {
211        ExportKindSelector::Stateless.export_kind_for(descriptor)
212    }
213}
214
215/// A formatter for user-defined batch serialization.
216pub struct Formatter(Box<dyn Fn(ExportBatch) -> Result<String> + Send + Sync>);
217impl fmt::Debug for Formatter {
218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219        write!(f, "Formatter(closure)")
220    }
221}
222
223/// Configuration for a given stdout exporter.
224#[derive(Debug)]
225pub struct StdoutExporterBuilder<W, S, I> {
226    spawn: S,
227    interval: I,
228    writer: Mutex<W>,
229    do_not_print_time: bool,
230    quantiles: Option<Vec<f64>>,
231    attribute_encoder: Option<Box<dyn Encoder + Send + Sync>>,
232    period: Option<Duration>,
233    formatter: Option<Formatter>,
234}
235
236impl<W, S, SO, I, IS, ISI> StdoutExporterBuilder<W, S, I>
237where
238    W: io::Write + fmt::Debug + Send + Sync + 'static,
239    S: Fn(PushControllerWorker) -> SO,
240    I: Fn(Duration) -> IS,
241    IS: Stream<Item = ISI> + Send + 'static,
242{
243    fn builder(spawn: S, interval: I) -> StdoutExporterBuilder<io::Stdout, S, I> {
244        StdoutExporterBuilder {
245            spawn,
246            interval,
247            writer: Mutex::new(io::stdout()),
248            do_not_print_time: false,
249            quantiles: None,
250            attribute_encoder: None,
251            period: None,
252            formatter: None,
253        }
254    }
255    /// Set the writer that this exporter will use.
256    pub fn with_writer<W2: io::Write>(self, writer: W2) -> StdoutExporterBuilder<W2, S, I> {
257        StdoutExporterBuilder {
258            spawn: self.spawn,
259            interval: self.interval,
260            writer: Mutex::new(writer),
261            do_not_print_time: self.do_not_print_time,
262            quantiles: self.quantiles,
263            attribute_encoder: self.attribute_encoder,
264            period: self.period,
265            formatter: self.formatter,
266        }
267    }
268
269    /// Hide the timestamps from exported results
270    pub fn with_do_not_print_time(self, do_not_print_time: bool) -> Self {
271        StdoutExporterBuilder {
272            do_not_print_time,
273            ..self
274        }
275    }
276
277    /// Set the attribute encoder that this exporter will use.
278    pub fn with_attribute_encoder<E>(self, attribute_encoder: E) -> Self
279    where
280        E: Encoder + Send + Sync + 'static,
281    {
282        StdoutExporterBuilder {
283            attribute_encoder: Some(Box::new(attribute_encoder)),
284            ..self
285        }
286    }
287
288    /// Set the frequency in which metrics are exported.
289    pub fn with_period(self, period: Duration) -> Self {
290        StdoutExporterBuilder {
291            period: Some(period),
292            ..self
293        }
294    }
295
296    /// Set a formatter for serializing export batch data
297    pub fn with_formatter<T>(self, formatter: T) -> Self
298    where
299        T: Fn(ExportBatch) -> Result<String> + Send + Sync + 'static,
300    {
301        StdoutExporterBuilder {
302            formatter: Some(Formatter(Box::new(formatter))),
303            ..self
304        }
305    }
306
307    /// Build a new push controller, returning errors if they arise.
308    pub fn init(mut self) -> PushController {
309        let period = self.period.take();
310        let exporter = StdoutExporter {
311            writer: self.writer,
312            do_not_print_time: self.do_not_print_time,
313            attribute_encoder: self.attribute_encoder.unwrap_or_else(default_encoder),
314            formatter: self.formatter,
315        };
316        let mut push_builder = controllers::push(
317            simple::Selector::Exact,
318            ExportKindSelector::Stateless,
319            exporter,
320            self.spawn,
321            self.interval,
322        );
323        if let Some(period) = period {
324            push_builder = push_builder.with_period(period);
325        }
326
327        let controller = push_builder.build();
328        global::set_meter_provider(controller.provider());
329        controller
330    }
331}