opentelemetry_spanprocessor_any/sdk/export/metrics/
stdout.rs1use crate::global;
3use crate::sdk::{
4 export::metrics::{
5 CheckpointSet, Count, ExportKind, ExportKindFor, ExportKindSelector, Exporter, LastValue,
6 Max, Min, Sum,
7 },
8 metrics::{
9 aggregators::{
10 ArrayAggregator, HistogramAggregator, LastValueAggregator, MinMaxSumCountAggregator,
11 SumAggregator,
12 },
13 controllers::{self, PushController, PushControllerWorker},
14 selectors::simple,
15 },
16};
17use crate::{
18 attributes::{default_encoder, AttributeSet, Encoder},
19 metrics::{Descriptor, MetricsError, Result},
20 KeyValue,
21};
22use futures_util::stream::Stream;
23#[cfg(feature = "serialize")]
24use serde::{Serialize, Serializer};
25use std::fmt;
26use std::io;
27use std::iter;
28use std::sync::Mutex;
29use std::time::{Duration, SystemTime};
30
31pub fn stdout<S, SO, I, IS, ISI>(spawn: S, interval: I) -> StdoutExporterBuilder<io::Stdout, S, I>
33where
34 S: Fn(PushControllerWorker) -> SO,
35 I: Fn(Duration) -> IS,
36 IS: Stream<Item = ISI> + Send + 'static,
37{
38 StdoutExporterBuilder::<io::Stdout, S, I>::builder(spawn, interval)
39}
40
41#[derive(Debug)]
43pub struct StdoutExporter<W> {
44 writer: Mutex<W>,
46 do_not_print_time: bool,
49 attribute_encoder: Box<dyn Encoder + Send + Sync>,
51 formatter: Option<Formatter>,
53}
54
55#[cfg_attr(feature = "serialize", derive(Serialize))]
57#[derive(Default, Debug)]
58pub struct ExportBatch {
59 #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
60 timestamp: Option<SystemTime>,
61 lines: Vec<ExportLine>,
62}
63
64#[cfg_attr(feature = "serialize", derive(Serialize))]
65#[derive(Default, Debug)]
66struct ExportLine {
67 name: String,
68 #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
69 min: Option<ExportNumeric>,
70 #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
71 max: Option<ExportNumeric>,
72 #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
73 sum: Option<ExportNumeric>,
74 count: u64,
75 #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
76 last_value: Option<ExportNumeric>,
77
78 #[cfg_attr(feature = "serialize", serde(skip_serializing_if = "Option::is_none"))]
79 timestamp: Option<SystemTime>,
80}
81
/// Type-erased numeric value carried by an export line.
///
/// The wrapped value is only ever observed through its `Debug` rendering.
pub struct ExportNumeric(Box<dyn fmt::Debug>);
84
85impl fmt::Debug for ExportNumeric {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 self.0.fmt(f)
88 }
89}
90
#[cfg(feature = "serialize")]
impl Serialize for ExportNumeric {
    /// Serializes the value as a string containing its `Debug` rendering.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{:?}", self))
    }
}
101
102impl<W> Exporter for StdoutExporter<W>
103where
104 W: fmt::Debug + io::Write,
105{
106 fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> Result<()> {
107 let mut batch = ExportBatch::default();
108 if !self.do_not_print_time {
109 batch.timestamp = Some(crate::time::now());
110 }
111 checkpoint_set.try_for_each(self, &mut |record| {
112 let agg = record.aggregator().ok_or(MetricsError::NoDataCollected)?;
113 let desc = record.descriptor();
114 let kind = desc.number_kind();
115 let encoded_resource = record.resource().encoded(self.attribute_encoder.as_ref());
116 let encoded_inst_attributes = if !desc.instrumentation_name().is_empty() {
117 let inst_attributes = AttributeSet::from_attributes(iter::once(KeyValue::new(
118 "instrumentation.name",
119 desc.instrumentation_name().to_owned(),
120 )));
121 inst_attributes.encoded(Some(self.attribute_encoder.as_ref()))
122 } else {
123 String::new()
124 };
125
126 let mut expose = ExportLine::default();
127
128 if let Some(array) = agg.as_any().downcast_ref::<ArrayAggregator>() {
129 expose.count = array.count()?;
130 }
131
132 if let Some(last_value) = agg.as_any().downcast_ref::<LastValueAggregator>() {
133 let (value, timestamp) = last_value.last_value()?;
134 expose.last_value = Some(ExportNumeric(value.to_debug(kind)));
135
136 if !self.do_not_print_time {
137 expose.timestamp = Some(timestamp);
138 }
139 }
140
141 if let Some(histogram) = agg.as_any().downcast_ref::<HistogramAggregator>() {
142 expose.sum = Some(ExportNumeric(histogram.sum()?.to_debug(kind)));
143 expose.count = histogram.count()?;
144 }
146
147 if let Some(mmsc) = agg.as_any().downcast_ref::<MinMaxSumCountAggregator>() {
148 expose.min = Some(ExportNumeric(mmsc.min()?.to_debug(kind)));
149 expose.max = Some(ExportNumeric(mmsc.max()?.to_debug(kind)));
150 expose.sum = Some(ExportNumeric(mmsc.sum()?.to_debug(kind)));
151 expose.count = mmsc.count()?;
152 }
153
154 if let Some(sum) = agg.as_any().downcast_ref::<SumAggregator>() {
155 expose.sum = Some(ExportNumeric(sum.sum()?.to_debug(kind)));
156 }
157
158 let mut encoded_attributes = String::new();
159 let iter = record.attributes().iter();
160 if let (0, _) = iter.size_hint() {
161 encoded_attributes = record
162 .attributes()
163 .encoded(Some(self.attribute_encoder.as_ref()));
164 }
165
166 let mut sb = String::new();
167
168 sb.push_str(desc.name());
169
170 if !encoded_attributes.is_empty()
171 || !encoded_resource.is_empty()
172 || !encoded_inst_attributes.is_empty()
173 {
174 sb.push('{');
175 sb.push_str(&encoded_resource);
176 if !encoded_inst_attributes.is_empty() && !encoded_resource.is_empty() {
177 sb.push(',');
178 }
179 sb.push_str(&encoded_inst_attributes);
180 if !encoded_attributes.is_empty()
181 && (!encoded_inst_attributes.is_empty() || !encoded_resource.is_empty())
182 {
183 sb.push(',');
184 }
185 sb.push_str(&encoded_attributes);
186 sb.push('}');
187 }
188
189 expose.name = sb;
190
191 batch.lines.push(expose);
192 Ok(())
193 })?;
194
195 self.writer.lock().map_err(From::from).and_then(|mut w| {
196 let formatted = match &self.formatter {
197 Some(formatter) => formatter.0(batch)?,
198 None => format!("{:?}\n", batch),
199 };
200 w.write_all(formatted.as_bytes())
201 .map_err(|e| MetricsError::Other(e.to_string()))
202 })
203 }
204}
205
206impl<W> ExportKindFor for StdoutExporter<W>
207where
208 W: fmt::Debug + io::Write,
209{
210 fn export_kind_for(&self, descriptor: &Descriptor) -> ExportKind {
211 ExportKindSelector::Stateless.export_kind_for(descriptor)
212 }
213}
214
215pub struct Formatter(Box<dyn Fn(ExportBatch) -> Result<String> + Send + Sync>);
217impl fmt::Debug for Formatter {
218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 write!(f, "Formatter(closure)")
220 }
221}
222
223#[derive(Debug)]
225pub struct StdoutExporterBuilder<W, S, I> {
226 spawn: S,
227 interval: I,
228 writer: Mutex<W>,
229 do_not_print_time: bool,
230 quantiles: Option<Vec<f64>>,
231 attribute_encoder: Option<Box<dyn Encoder + Send + Sync>>,
232 period: Option<Duration>,
233 formatter: Option<Formatter>,
234}
235
236impl<W, S, SO, I, IS, ISI> StdoutExporterBuilder<W, S, I>
237where
238 W: io::Write + fmt::Debug + Send + Sync + 'static,
239 S: Fn(PushControllerWorker) -> SO,
240 I: Fn(Duration) -> IS,
241 IS: Stream<Item = ISI> + Send + 'static,
242{
243 fn builder(spawn: S, interval: I) -> StdoutExporterBuilder<io::Stdout, S, I> {
244 StdoutExporterBuilder {
245 spawn,
246 interval,
247 writer: Mutex::new(io::stdout()),
248 do_not_print_time: false,
249 quantiles: None,
250 attribute_encoder: None,
251 period: None,
252 formatter: None,
253 }
254 }
255 pub fn with_writer<W2: io::Write>(self, writer: W2) -> StdoutExporterBuilder<W2, S, I> {
257 StdoutExporterBuilder {
258 spawn: self.spawn,
259 interval: self.interval,
260 writer: Mutex::new(writer),
261 do_not_print_time: self.do_not_print_time,
262 quantiles: self.quantiles,
263 attribute_encoder: self.attribute_encoder,
264 period: self.period,
265 formatter: self.formatter,
266 }
267 }
268
269 pub fn with_do_not_print_time(self, do_not_print_time: bool) -> Self {
271 StdoutExporterBuilder {
272 do_not_print_time,
273 ..self
274 }
275 }
276
277 pub fn with_attribute_encoder<E>(self, attribute_encoder: E) -> Self
279 where
280 E: Encoder + Send + Sync + 'static,
281 {
282 StdoutExporterBuilder {
283 attribute_encoder: Some(Box::new(attribute_encoder)),
284 ..self
285 }
286 }
287
288 pub fn with_period(self, period: Duration) -> Self {
290 StdoutExporterBuilder {
291 period: Some(period),
292 ..self
293 }
294 }
295
296 pub fn with_formatter<T>(self, formatter: T) -> Self
298 where
299 T: Fn(ExportBatch) -> Result<String> + Send + Sync + 'static,
300 {
301 StdoutExporterBuilder {
302 formatter: Some(Formatter(Box::new(formatter))),
303 ..self
304 }
305 }
306
307 pub fn init(mut self) -> PushController {
309 let period = self.period.take();
310 let exporter = StdoutExporter {
311 writer: self.writer,
312 do_not_print_time: self.do_not_print_time,
313 attribute_encoder: self.attribute_encoder.unwrap_or_else(default_encoder),
314 formatter: self.formatter,
315 };
316 let mut push_builder = controllers::push(
317 simple::Selector::Exact,
318 ExportKindSelector::Stateless,
319 exporter,
320 self.spawn,
321 self.interval,
322 );
323 if let Some(period) = period {
324 push_builder = push_builder.with_period(period);
325 }
326
327 let controller = push_builder.build();
328 global::set_meter_provider(controller.provider());
329 controller
330 }
331}