opentelemetry_spanprocessor_any/sdk/export/metrics/
mod.rs

1//! Metrics Export
2use crate::sdk::resource::Resource;
3use crate::{
4    attributes,
5    metrics::{Descriptor, InstrumentKind, Number, Result},
6};
7use std::any::Any;
8use std::fmt;
9use std::sync::Arc;
10use std::time::SystemTime;
11
12mod aggregation;
13pub mod stdout;
14
15pub use aggregation::{
16    Buckets, Count, Histogram, LastValue, Max, Min, MinMaxSumCount, Points, Sum,
17};
18pub use stdout::stdout;
19
20/// Processor is responsible for deciding which kind of aggregation to use (via
21/// `aggregation_selector`), gathering exported results from the SDK during
22/// collection, and deciding over which dimensions to group the exported data.
23///
24/// The SDK supports binding only one of these interfaces, as it has the sole
25/// responsibility of determining which Aggregator to use for each record.
26///
27/// The embedded AggregatorSelector interface is called (concurrently) in
28/// instrumentation context to select the appropriate Aggregator for an
29/// instrument.
30pub trait Processor: fmt::Debug {
31    /// AggregatorSelector is responsible for selecting the
32    /// concrete type of Aggregator used for a metric in the SDK.
33    ///
34    /// This may be a static decision based on fields of the
35    /// Descriptor, or it could use an external configuration
36    /// source to customize the treatment of each metric
37    /// instrument.
38    ///
39    /// The result from AggregatorSelector.AggregatorFor should be
40    /// the same type for a given Descriptor or else nil.  The same
41    /// type should be returned for a given descriptor, because
42    /// Aggregators only know how to Merge with their own type.  If
43    /// the result is nil, the metric instrument will be disabled.
44    ///
45    /// Note that the SDK only calls AggregatorFor when new records
46    /// require an Aggregator. This does not provide a way to
47    /// disable metrics with active records.
48    fn aggregation_selector(&self) -> &dyn AggregatorSelector;
49}
50
51/// A locked processor.
52///
53/// The `Process` method is called during collection in a single-threaded
54/// context from the SDK, after the aggregator is checkpointed, allowing the
55/// processor to build the set of metrics currently being exported.
56pub trait LockedProcessor {
57    /// Process is called by the SDK once per internal record, passing the export
58    /// Accumulation (a Descriptor, the corresponding Attributes, and the checkpointed
59    /// Aggregator).
60    ///
61    /// The Context argument originates from the controller that orchestrates
62    /// collection.
63    fn process(&mut self, accumulation: Accumulation<'_>) -> Result<()>;
64}
65
66/// AggregatorSelector supports selecting the kind of `Aggregator` to use at
67/// runtime for a specific metric instrument.
68pub trait AggregatorSelector: fmt::Debug {
69    /// This allocates a variable number of aggregators of a kind suitable for
70    /// the requested export.
71    ///
72    /// When the call returns `None`, the metric instrument is explicitly disabled.
73    ///
74    /// This must return a consistent type to avoid confusion in later stages of
75    /// the metrics export process, e.g., when merging or checkpointing
76    /// aggregators for a specific instrument.
77    ///
78    /// This call should not block.
79    fn aggregator_for(&self, descriptor: &Descriptor) -> Option<Arc<dyn Aggregator + Send + Sync>>;
80}
81
82/// The interface used by a `Controller` to coordinate the `Processor` with
83/// `Accumulator`(s) and `Exporter`(s). The `start_collection` and
84/// `finish_collection` methods start and finish a collection interval.
85/// `Controller`s call the `Accumulator`(s) during collection to process
86/// `Accumulation`s.
87pub trait Checkpointer: LockedProcessor {
88    /// A checkpoint of the current data set. This may be called before and after
89    /// collection. The implementation is required to return the same value
90    /// throughout its lifetime.
91    fn checkpoint_set(&mut self) -> &mut dyn CheckpointSet;
92
93    /// Logic to be run at the start of a collection interval.
94    fn start_collection(&mut self);
95
96    /// Cleanup logic or other behavior that needs to be run after a collection
97    /// interval is complete.
98    fn finish_collection(&mut self) -> Result<()>;
99}
100
101/// Aggregator implements a specific aggregation behavior, i.e., a behavior to
102/// track a sequence of updates to an instrument. Sum-only instruments commonly
103/// use a simple Sum aggregator, but for the distribution instruments
104/// (ValueRecorder, ValueObserver) there are a number of possible aggregators
105/// with different cost and accuracy tradeoffs.
106///
107/// Note that any Aggregator may be attached to any instrument--this is the
108/// result of the OpenTelemetry API/SDK separation. It is possible to attach a
109/// Sum aggregator to a ValueRecorder instrument or a MinMaxSumCount aggregator
110/// to a Counter instrument.
111pub trait Aggregator: fmt::Debug {
112    /// Update receives a new measured value and incorporates it into the
113    /// aggregation. Update calls may be called concurrently.
114    ///
115    /// `Descriptor::number_kind` should be consulted to determine whether the
116    /// provided number is an `i64`, `u64` or `f64`.
117    ///
118    /// The current Context could be inspected for a `Baggage` or
119    /// `SpanContext`.
120    fn update(&self, number: &Number, descriptor: &Descriptor) -> Result<()>;
121
122    /// This method is called during collection to finish one period of aggregation
123    /// by atomically saving the currently-updating state into the argument
124    /// Aggregator.
125    ///
126    /// `synchronized_move` is called concurrently with `update`. These two methods
127    /// must be synchronized with respect to each other, for correctness.
128    ///
129    /// This method will return an `InconsistentAggregator` error if this
130    /// `Aggregator` cannot be copied into the destination due to an incompatible
131    /// type.
132    ///
133    /// This call has no `Context` argument because it is expected to perform only
134    /// computation.
135    fn synchronized_move(
136        &self,
137        destination: &Arc<dyn Aggregator + Send + Sync>,
138        descriptor: &Descriptor,
139    ) -> Result<()>;
140
141    /// This combines the checkpointed state from the argument `Aggregator` into this
142    /// `Aggregator`. `merge` is not synchronized with respect to `update` or
143    /// `synchronized_move`.
144    ///
145    /// The owner of an `Aggregator` being merged is responsible for synchronization
146    /// of both `Aggregator` states.
147    fn merge(&self, other: &(dyn Aggregator + Send + Sync), descriptor: &Descriptor) -> Result<()>;
148
149    /// Returns the implementing aggregator as `Any` for downcasting.
150    fn as_any(&self) -> &dyn Any;
151}
152
153/// An optional interface implemented by some Aggregators. An Aggregator must
154/// support `subtract()` in order to be configured for a Precomputed-Sum
155/// instrument (SumObserver, UpDownSumObserver) using a DeltaExporter.
156pub trait Subtractor {
157    /// Subtract subtracts the `operand` from this Aggregator and outputs the value
158    /// in `result`.
159    fn subtract(
160        &self,
161        operand: &(dyn Aggregator + Send + Sync),
162        result: &(dyn Aggregator + Send + Sync),
163        descriptor: &Descriptor,
164    ) -> Result<()>;
165}
166
167/// Exporter handles presentation of the checkpoint of aggregate metrics. This
168/// is the final stage of a metrics export pipeline, where metric data are
169/// formatted for a specific system.
170pub trait Exporter: ExportKindFor {
171    /// Export is called immediately after completing a collection pass in the SDK.
172    ///
173    /// The CheckpointSet interface refers to the Processor that just completed
174    /// collection.
175    fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> Result<()>;
176}
177
178/// ExportKindSelector is a sub-interface of Exporter used to indicate
179/// whether the Processor should compute Delta or Cumulative
180/// Aggregations.
181pub trait ExportKindFor: fmt::Debug {
182    /// Determines the correct `ExportKind` that should be used when exporting data
183    /// for the given metric instrument.
184    fn export_kind_for(&self, descriptor: &Descriptor) -> ExportKind;
185}
186
187/// CheckpointSet allows a controller to access a complete checkpoint of
188/// aggregated metrics from the Processor. This is passed to the `Exporter`
189/// which may then use `try_for_each` to iterate over the collection of
190/// aggregated metrics.
191pub trait CheckpointSet: fmt::Debug {
192    /// This iterates over aggregated checkpoints for all metrics that were updated
193    /// during the last collection period. Each aggregated checkpoint returned by
194    /// the function parameter may return an error.
195    ///
196    /// The `ExportKindSelector` argument is used to determine whether the `Record`
197    /// is computed using delta or cumulative aggregation.
198    ///
199    /// ForEach tolerates `MetricsError::NoData` silently, as this is expected from
200    /// the Meter implementation. Any other kind of error will immediately halt and
201    /// return the error to the caller.
202    fn try_for_each(
203        &mut self,
204        export_selector: &dyn ExportKindFor,
205        f: &mut dyn FnMut(&Record<'_>) -> Result<()>,
206    ) -> Result<()>;
207}
208
209/// Allows `Accumulator` implementations to construct new `Accumulation`s to
210/// send to `Processor`s. The `Descriptor`, `Attributes`, `Resource`, and
211/// `Aggregator` represent aggregate metric events received over a single
212/// collection period.
213pub fn accumulation<'a>(
214    descriptor: &'a Descriptor,
215    attributes: &'a attributes::AttributeSet,
216    resource: &'a Resource,
217    aggregator: &'a Arc<dyn Aggregator + Send + Sync>,
218) -> Accumulation<'a> {
219    Accumulation::new(descriptor, attributes, resource, aggregator)
220}
221
222/// Allows `Processor` implementations to construct export records. The
223/// `Descriptor`, `Attributes`, and `Aggregator` represent aggregate metric events
224/// received over a single collection period.
225pub fn record<'a>(
226    descriptor: &'a Descriptor,
227    attributes: &'a attributes::AttributeSet,
228    resource: &'a Resource,
229    aggregator: Option<&'a Arc<dyn Aggregator + Send + Sync>>,
230    start: SystemTime,
231    end: SystemTime,
232) -> Record<'a> {
233    Record {
234        metadata: Metadata::new(descriptor, attributes, resource),
235        aggregator,
236        start,
237        end,
238    }
239}
240
241impl Record<'_> {
242    /// The aggregator for this metric
243    pub fn aggregator(&self) -> Option<&Arc<dyn Aggregator + Send + Sync>> {
244        self.aggregator
245    }
246}
247
248/// A container for the common elements for exported metric data that are shared
249/// by the `Accumulator`->`Processor` and `Processor`->`Exporter` steps.
250#[derive(Debug)]
251pub struct Metadata<'a> {
252    descriptor: &'a Descriptor,
253    attributes: &'a attributes::AttributeSet,
254    resource: &'a Resource,
255}
256
257impl<'a> Metadata<'a> {
258    /// Create a new `Metadata` instance.
259    pub fn new(
260        descriptor: &'a Descriptor,
261        attributes: &'a attributes::AttributeSet,
262        resource: &'a Resource,
263    ) -> Self {
264        {
265            Metadata {
266                descriptor,
267                attributes,
268                resource,
269            }
270        }
271    }
272
273    /// A description of the metric instrument being exported.
274    pub fn descriptor(&self) -> &Descriptor {
275        self.descriptor
276    }
277
278    /// The attributes associated with the instrument and the aggregated data.
279    pub fn attributes(&self) -> &attributes::AttributeSet {
280        self.attributes
281    }
282
283    /// Common attributes that apply to this metric event.
284    pub fn resource(&self) -> &Resource {
285        self.resource
286    }
287}
288
289/// A container for the exported data for a single metric instrument and attribute
290/// set, as prepared by the `Processor` for the `Exporter`. This includes the
291/// effective start and end time for the aggregation.
292#[derive(Debug)]
293pub struct Record<'a> {
294    metadata: Metadata<'a>,
295    aggregator: Option<&'a Arc<dyn Aggregator + Send + Sync>>,
296    start: SystemTime,
297    end: SystemTime,
298}
299
300impl Record<'_> {
301    /// A description of the metric instrument being exported.
302    pub fn descriptor(&self) -> &Descriptor {
303        self.metadata.descriptor
304    }
305
306    /// The attributes associated with the instrument and the aggregated data.
307    pub fn attributes(&self) -> &attributes::AttributeSet {
308        self.metadata.attributes
309    }
310
311    /// Common attributes that apply to this metric event.
312    pub fn resource(&self) -> &Resource {
313        self.metadata.resource
314    }
315
316    /// The start time of the interval covered by this aggregation.
317    pub fn start_time(&self) -> &SystemTime {
318        &self.start
319    }
320
321    /// The end time of the interval covered by this aggregation.
322    pub fn end_time(&self) -> &SystemTime {
323        &self.end
324    }
325}
326
327/// A container for the exported data for a single metric instrument and attribute
328/// set, as prepared by an `Accumulator` for the `Processor`.
329#[derive(Debug)]
330pub struct Accumulation<'a> {
331    metadata: Metadata<'a>,
332    aggregator: &'a Arc<dyn Aggregator + Send + Sync>,
333}
334
335impl<'a> Accumulation<'a> {
336    /// Create a new `Record` instance.
337    pub fn new(
338        descriptor: &'a Descriptor,
339        attributes: &'a attributes::AttributeSet,
340        resource: &'a Resource,
341        aggregator: &'a Arc<dyn Aggregator + Send + Sync>,
342    ) -> Self {
343        Accumulation {
344            metadata: Metadata::new(descriptor, attributes, resource),
345            aggregator,
346        }
347    }
348
349    /// A description of the metric instrument being exported.
350    pub fn descriptor(&self) -> &Descriptor {
351        self.metadata.descriptor
352    }
353
354    /// The attributes associated with the instrument and the aggregated data.
355    pub fn attributes(&self) -> &attributes::AttributeSet {
356        self.metadata.attributes
357    }
358
359    /// Common attributes that apply to this metric event.
360    pub fn resource(&self) -> &Resource {
361        self.metadata.resource
362    }
363
364    /// The checkpointed aggregator for this metric.
365    pub fn aggregator(&self) -> &Arc<dyn Aggregator + Send + Sync> {
366        self.aggregator
367    }
368}
369
370/// Indicates the kind of data exported by an exporter.
371/// These bits may be OR-d together when multiple exporters are in use.
372#[derive(Clone, Debug)]
373pub enum ExportKind {
374    /// Indicates that the `Exporter` expects a cumulative `Aggregation`.
375    Cumulative = 1,
376
377    /// Indicates that the `Exporter` expects a delta `Aggregation`.
378    Delta = 2,
379}
380
381/// Strategies for selecting which export kind is used for an instrument.
382#[derive(Debug, Clone)]
383pub enum ExportKindSelector {
384    /// A selector that always returns [`ExportKind::Cumulative`].
385    Cumulative,
386    /// A selector that always returns [`ExportKind::Delta`].
387    Delta,
388    /// A selector that returns cumulative or delta based on a given instrument
389    /// kind.
390    Stateless,
391}
392
393impl ExportKind {
394    /// Tests whether `kind` includes a specific kind of exporter.
395    pub fn includes(&self, has: &ExportKind) -> bool {
396        (self.clone() as u32) & (has.clone() as u32) != 0
397    }
398
399    /// Returns whether an exporter of this kind requires memory to export correctly.
400    pub fn memory_required(&self, kind: &InstrumentKind) -> bool {
401        match kind {
402            InstrumentKind::ValueRecorder
403            | InstrumentKind::ValueObserver
404            | InstrumentKind::Counter
405            | InstrumentKind::UpDownCounter => {
406                // Delta-oriented instruments:
407                self.includes(&ExportKind::Cumulative)
408            }
409
410            InstrumentKind::SumObserver | InstrumentKind::UpDownSumObserver => {
411                // Cumulative-oriented instruments:
412                self.includes(&ExportKind::Delta)
413            }
414        }
415    }
416}
417
418impl ExportKindFor for ExportKindSelector {
419    fn export_kind_for(&self, descriptor: &Descriptor) -> ExportKind {
420        match self {
421            ExportKindSelector::Cumulative => ExportKind::Cumulative,
422            ExportKindSelector::Delta => ExportKind::Delta,
423            ExportKindSelector::Stateless => {
424                if descriptor.instrument_kind().precomputed_sum() {
425                    ExportKind::Cumulative
426                } else {
427                    ExportKind::Delta
428                }
429            }
430        }
431    }
432}