opentelemetry_spanprocessor_any/sdk/export/metrics/mod.rs
1//! Metrics Export
2use crate::sdk::resource::Resource;
3use crate::{
4 attributes,
5 metrics::{Descriptor, InstrumentKind, Number, Result},
6};
7use std::any::Any;
8use std::fmt;
9use std::sync::Arc;
10use std::time::SystemTime;
11
12mod aggregation;
13pub mod stdout;
14
15pub use aggregation::{
16 Buckets, Count, Histogram, LastValue, Max, Min, MinMaxSumCount, Points, Sum,
17};
18pub use stdout::stdout;
19
20/// Processor is responsible for deciding which kind of aggregation to use (via
21/// `aggregation_selector`), gathering exported results from the SDK during
22/// collection, and deciding over which dimensions to group the exported data.
23///
24/// The SDK supports binding only one of these interfaces, as it has the sole
25/// responsibility of determining which Aggregator to use for each record.
26///
27/// The embedded AggregatorSelector interface is called (concurrently) in
28/// instrumentation context to select the appropriate Aggregator for an
29/// instrument.
30pub trait Processor: fmt::Debug {
31 /// AggregatorSelector is responsible for selecting the
32 /// concrete type of Aggregator used for a metric in the SDK.
33 ///
34 /// This may be a static decision based on fields of the
35 /// Descriptor, or it could use an external configuration
36 /// source to customize the treatment of each metric
37 /// instrument.
38 ///
39 /// The result from AggregatorSelector.AggregatorFor should be
40 /// the same type for a given Descriptor or else nil. The same
41 /// type should be returned for a given descriptor, because
42 /// Aggregators only know how to Merge with their own type. If
43 /// the result is nil, the metric instrument will be disabled.
44 ///
45 /// Note that the SDK only calls AggregatorFor when new records
46 /// require an Aggregator. This does not provide a way to
47 /// disable metrics with active records.
48 fn aggregation_selector(&self) -> &dyn AggregatorSelector;
49}
50
51/// A locked processor.
52///
53/// The `Process` method is called during collection in a single-threaded
54/// context from the SDK, after the aggregator is checkpointed, allowing the
55/// processor to build the set of metrics currently being exported.
56pub trait LockedProcessor {
57 /// Process is called by the SDK once per internal record, passing the export
58 /// Accumulation (a Descriptor, the corresponding Attributes, and the checkpointed
59 /// Aggregator).
60 ///
61 /// The Context argument originates from the controller that orchestrates
62 /// collection.
63 fn process(&mut self, accumulation: Accumulation<'_>) -> Result<()>;
64}
65
66/// AggregatorSelector supports selecting the kind of `Aggregator` to use at
67/// runtime for a specific metric instrument.
68pub trait AggregatorSelector: fmt::Debug {
69 /// This allocates a variable number of aggregators of a kind suitable for
70 /// the requested export.
71 ///
72 /// When the call returns `None`, the metric instrument is explicitly disabled.
73 ///
74 /// This must return a consistent type to avoid confusion in later stages of
75 /// the metrics export process, e.g., when merging or checkpointing
76 /// aggregators for a specific instrument.
77 ///
78 /// This call should not block.
79 fn aggregator_for(&self, descriptor: &Descriptor) -> Option<Arc<dyn Aggregator + Send + Sync>>;
80}
81
82/// The interface used by a `Controller` to coordinate the `Processor` with
83/// `Accumulator`(s) and `Exporter`(s). The `start_collection` and
84/// `finish_collection` methods start and finish a collection interval.
85/// `Controller`s call the `Accumulator`(s) during collection to process
86/// `Accumulation`s.
87pub trait Checkpointer: LockedProcessor {
88 /// A checkpoint of the current data set. This may be called before and after
89 /// collection. The implementation is required to return the same value
90 /// throughout its lifetime.
91 fn checkpoint_set(&mut self) -> &mut dyn CheckpointSet;
92
93 /// Logic to be run at the start of a collection interval.
94 fn start_collection(&mut self);
95
96 /// Cleanup logic or other behavior that needs to be run after a collection
97 /// interval is complete.
98 fn finish_collection(&mut self) -> Result<()>;
99}
100
101/// Aggregator implements a specific aggregation behavior, i.e., a behavior to
102/// track a sequence of updates to an instrument. Sum-only instruments commonly
103/// use a simple Sum aggregator, but for the distribution instruments
104/// (ValueRecorder, ValueObserver) there are a number of possible aggregators
105/// with different cost and accuracy tradeoffs.
106///
107/// Note that any Aggregator may be attached to any instrument--this is the
108/// result of the OpenTelemetry API/SDK separation. It is possible to attach a
109/// Sum aggregator to a ValueRecorder instrument or a MinMaxSumCount aggregator
110/// to a Counter instrument.
111pub trait Aggregator: fmt::Debug {
112 /// Update receives a new measured value and incorporates it into the
113 /// aggregation. Update calls may be called concurrently.
114 ///
115 /// `Descriptor::number_kind` should be consulted to determine whether the
116 /// provided number is an `i64`, `u64` or `f64`.
117 ///
118 /// The current Context could be inspected for a `Baggage` or
119 /// `SpanContext`.
120 fn update(&self, number: &Number, descriptor: &Descriptor) -> Result<()>;
121
122 /// This method is called during collection to finish one period of aggregation
123 /// by atomically saving the currently-updating state into the argument
124 /// Aggregator.
125 ///
126 /// `synchronized_move` is called concurrently with `update`. These two methods
127 /// must be synchronized with respect to each other, for correctness.
128 ///
129 /// This method will return an `InconsistentAggregator` error if this
130 /// `Aggregator` cannot be copied into the destination due to an incompatible
131 /// type.
132 ///
133 /// This call has no `Context` argument because it is expected to perform only
134 /// computation.
135 fn synchronized_move(
136 &self,
137 destination: &Arc<dyn Aggregator + Send + Sync>,
138 descriptor: &Descriptor,
139 ) -> Result<()>;
140
141 /// This combines the checkpointed state from the argument `Aggregator` into this
142 /// `Aggregator`. `merge` is not synchronized with respect to `update` or
143 /// `synchronized_move`.
144 ///
145 /// The owner of an `Aggregator` being merged is responsible for synchronization
146 /// of both `Aggregator` states.
147 fn merge(&self, other: &(dyn Aggregator + Send + Sync), descriptor: &Descriptor) -> Result<()>;
148
149 /// Returns the implementing aggregator as `Any` for downcasting.
150 fn as_any(&self) -> &dyn Any;
151}
152
153/// An optional interface implemented by some Aggregators. An Aggregator must
154/// support `subtract()` in order to be configured for a Precomputed-Sum
155/// instrument (SumObserver, UpDownSumObserver) using a DeltaExporter.
156pub trait Subtractor {
157 /// Subtract subtracts the `operand` from this Aggregator and outputs the value
158 /// in `result`.
159 fn subtract(
160 &self,
161 operand: &(dyn Aggregator + Send + Sync),
162 result: &(dyn Aggregator + Send + Sync),
163 descriptor: &Descriptor,
164 ) -> Result<()>;
165}
166
167/// Exporter handles presentation of the checkpoint of aggregate metrics. This
168/// is the final stage of a metrics export pipeline, where metric data are
169/// formatted for a specific system.
170pub trait Exporter: ExportKindFor {
171 /// Export is called immediately after completing a collection pass in the SDK.
172 ///
173 /// The CheckpointSet interface refers to the Processor that just completed
174 /// collection.
175 fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> Result<()>;
176}
177
178/// ExportKindSelector is a sub-interface of Exporter used to indicate
179/// whether the Processor should compute Delta or Cumulative
180/// Aggregations.
181pub trait ExportKindFor: fmt::Debug {
182 /// Determines the correct `ExportKind` that should be used when exporting data
183 /// for the given metric instrument.
184 fn export_kind_for(&self, descriptor: &Descriptor) -> ExportKind;
185}
186
187/// CheckpointSet allows a controller to access a complete checkpoint of
188/// aggregated metrics from the Processor. This is passed to the `Exporter`
189/// which may then use `try_for_each` to iterate over the collection of
190/// aggregated metrics.
191pub trait CheckpointSet: fmt::Debug {
192 /// This iterates over aggregated checkpoints for all metrics that were updated
193 /// during the last collection period. Each aggregated checkpoint returned by
194 /// the function parameter may return an error.
195 ///
196 /// The `ExportKindSelector` argument is used to determine whether the `Record`
197 /// is computed using delta or cumulative aggregation.
198 ///
199 /// ForEach tolerates `MetricsError::NoData` silently, as this is expected from
200 /// the Meter implementation. Any other kind of error will immediately halt and
201 /// return the error to the caller.
202 fn try_for_each(
203 &mut self,
204 export_selector: &dyn ExportKindFor,
205 f: &mut dyn FnMut(&Record<'_>) -> Result<()>,
206 ) -> Result<()>;
207}
208
209/// Allows `Accumulator` implementations to construct new `Accumulation`s to
210/// send to `Processor`s. The `Descriptor`, `Attributes`, `Resource`, and
211/// `Aggregator` represent aggregate metric events received over a single
212/// collection period.
213pub fn accumulation<'a>(
214 descriptor: &'a Descriptor,
215 attributes: &'a attributes::AttributeSet,
216 resource: &'a Resource,
217 aggregator: &'a Arc<dyn Aggregator + Send + Sync>,
218) -> Accumulation<'a> {
219 Accumulation::new(descriptor, attributes, resource, aggregator)
220}
221
222/// Allows `Processor` implementations to construct export records. The
223/// `Descriptor`, `Attributes`, and `Aggregator` represent aggregate metric events
224/// received over a single collection period.
225pub fn record<'a>(
226 descriptor: &'a Descriptor,
227 attributes: &'a attributes::AttributeSet,
228 resource: &'a Resource,
229 aggregator: Option<&'a Arc<dyn Aggregator + Send + Sync>>,
230 start: SystemTime,
231 end: SystemTime,
232) -> Record<'a> {
233 Record {
234 metadata: Metadata::new(descriptor, attributes, resource),
235 aggregator,
236 start,
237 end,
238 }
239}
240
241impl Record<'_> {
242 /// The aggregator for this metric
243 pub fn aggregator(&self) -> Option<&Arc<dyn Aggregator + Send + Sync>> {
244 self.aggregator
245 }
246}
247
248/// A container for the common elements for exported metric data that are shared
249/// by the `Accumulator`->`Processor` and `Processor`->`Exporter` steps.
250#[derive(Debug)]
251pub struct Metadata<'a> {
252 descriptor: &'a Descriptor,
253 attributes: &'a attributes::AttributeSet,
254 resource: &'a Resource,
255}
256
257impl<'a> Metadata<'a> {
258 /// Create a new `Metadata` instance.
259 pub fn new(
260 descriptor: &'a Descriptor,
261 attributes: &'a attributes::AttributeSet,
262 resource: &'a Resource,
263 ) -> Self {
264 {
265 Metadata {
266 descriptor,
267 attributes,
268 resource,
269 }
270 }
271 }
272
273 /// A description of the metric instrument being exported.
274 pub fn descriptor(&self) -> &Descriptor {
275 self.descriptor
276 }
277
278 /// The attributes associated with the instrument and the aggregated data.
279 pub fn attributes(&self) -> &attributes::AttributeSet {
280 self.attributes
281 }
282
283 /// Common attributes that apply to this metric event.
284 pub fn resource(&self) -> &Resource {
285 self.resource
286 }
287}
288
289/// A container for the exported data for a single metric instrument and attribute
290/// set, as prepared by the `Processor` for the `Exporter`. This includes the
291/// effective start and end time for the aggregation.
292#[derive(Debug)]
293pub struct Record<'a> {
294 metadata: Metadata<'a>,
295 aggregator: Option<&'a Arc<dyn Aggregator + Send + Sync>>,
296 start: SystemTime,
297 end: SystemTime,
298}
299
300impl Record<'_> {
301 /// A description of the metric instrument being exported.
302 pub fn descriptor(&self) -> &Descriptor {
303 self.metadata.descriptor
304 }
305
306 /// The attributes associated with the instrument and the aggregated data.
307 pub fn attributes(&self) -> &attributes::AttributeSet {
308 self.metadata.attributes
309 }
310
311 /// Common attributes that apply to this metric event.
312 pub fn resource(&self) -> &Resource {
313 self.metadata.resource
314 }
315
316 /// The start time of the interval covered by this aggregation.
317 pub fn start_time(&self) -> &SystemTime {
318 &self.start
319 }
320
321 /// The end time of the interval covered by this aggregation.
322 pub fn end_time(&self) -> &SystemTime {
323 &self.end
324 }
325}
326
327/// A container for the exported data for a single metric instrument and attribute
328/// set, as prepared by an `Accumulator` for the `Processor`.
329#[derive(Debug)]
330pub struct Accumulation<'a> {
331 metadata: Metadata<'a>,
332 aggregator: &'a Arc<dyn Aggregator + Send + Sync>,
333}
334
335impl<'a> Accumulation<'a> {
336 /// Create a new `Record` instance.
337 pub fn new(
338 descriptor: &'a Descriptor,
339 attributes: &'a attributes::AttributeSet,
340 resource: &'a Resource,
341 aggregator: &'a Arc<dyn Aggregator + Send + Sync>,
342 ) -> Self {
343 Accumulation {
344 metadata: Metadata::new(descriptor, attributes, resource),
345 aggregator,
346 }
347 }
348
349 /// A description of the metric instrument being exported.
350 pub fn descriptor(&self) -> &Descriptor {
351 self.metadata.descriptor
352 }
353
354 /// The attributes associated with the instrument and the aggregated data.
355 pub fn attributes(&self) -> &attributes::AttributeSet {
356 self.metadata.attributes
357 }
358
359 /// Common attributes that apply to this metric event.
360 pub fn resource(&self) -> &Resource {
361 self.metadata.resource
362 }
363
364 /// The checkpointed aggregator for this metric.
365 pub fn aggregator(&self) -> &Arc<dyn Aggregator + Send + Sync> {
366 self.aggregator
367 }
368}
369
/// Indicates the kind of data exported by an exporter.
///
/// Each variant has a distinct bit as its discriminant, so the bits may be
/// OR-d together when multiple exporters are in use. Note that the OR of two
/// kinds is not itself an `ExportKind` value; combine the `as u32` casts
/// instead.
///
/// The type is a plain fieldless enum, so it is `Copy` and can be compared
/// with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExportKind {
    /// Indicates that the `Exporter` expects a cumulative `Aggregation`.
    Cumulative = 1,

    /// Indicates that the `Exporter` expects a delta `Aggregation`.
    Delta = 2,
}
380
/// The available strategies for choosing the export kind used for an
/// instrument.
#[derive(Clone, Debug)]
pub enum ExportKindSelector {
    /// Always selects [`ExportKind::Cumulative`].
    Cumulative,
    /// Always selects [`ExportKind::Delta`].
    Delta,
    /// Selects cumulative or delta depending on the kind of the given
    /// instrument.
    Stateless,
}
392
393impl ExportKind {
394 /// Tests whether `kind` includes a specific kind of exporter.
395 pub fn includes(&self, has: &ExportKind) -> bool {
396 (self.clone() as u32) & (has.clone() as u32) != 0
397 }
398
399 /// Returns whether an exporter of this kind requires memory to export correctly.
400 pub fn memory_required(&self, kind: &InstrumentKind) -> bool {
401 match kind {
402 InstrumentKind::ValueRecorder
403 | InstrumentKind::ValueObserver
404 | InstrumentKind::Counter
405 | InstrumentKind::UpDownCounter => {
406 // Delta-oriented instruments:
407 self.includes(&ExportKind::Cumulative)
408 }
409
410 InstrumentKind::SumObserver | InstrumentKind::UpDownSumObserver => {
411 // Cumulative-oriented instruments:
412 self.includes(&ExportKind::Delta)
413 }
414 }
415 }
416}
417
418impl ExportKindFor for ExportKindSelector {
419 fn export_kind_for(&self, descriptor: &Descriptor) -> ExportKind {
420 match self {
421 ExportKindSelector::Cumulative => ExportKind::Cumulative,
422 ExportKindSelector::Delta => ExportKind::Delta,
423 ExportKindSelector::Stateless => {
424 if descriptor.instrument_kind().precomputed_sum() {
425 ExportKind::Cumulative
426 } else {
427 ExportKind::Delta
428 }
429 }
430 }
431 }
432}