ts_opentelemetry_sdk/metrics/
instrument.rs

1use std::{any::Any, borrow::Cow, fmt, hash::Hash, marker, sync::Arc};
2
3use ts_opentelemetry_api::{
4    metrics::{
5        AsyncInstrument, MetricsError, Result, SyncCounter, SyncHistogram, SyncUpDownCounter, Unit,
6    },
7    Context, KeyValue,
8};
9
10use crate::{
11    attributes::AttributeSet,
12    instrumentation::Scope,
13    metrics::data::Temporality,
14    metrics::{aggregation::Aggregation, internal::Aggregator},
15};
16
/// Error message used when an observable instrument has no aggregators
/// (returned by `Observable::registerable` in this module).
pub(crate) const EMPTY_AGG_MSG: &str = "no aggregators for observable instrument";
18
/// The identifier of a group of instruments that all perform the same function.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum InstrumentKind {
    /// A group of instruments that record increasing values synchronously
    /// with the code path they are measuring.
    Counter,
    /// A group of instruments that record increasing and decreasing values
    /// synchronously with the code path they are measuring.
    UpDownCounter,
    /// A group of instruments that record a distribution of values synchronously with
    /// the code path they are measuring.
    Histogram,
    /// A group of instruments that record increasing values in an asynchronous
    /// callback.
    ObservableCounter,
    /// A group of instruments that record increasing and decreasing values in an
    /// asynchronous callback.
    ObservableUpDownCounter,
    /// A group of instruments that record current values in an asynchronous callback.
    ObservableGauge,
}
40
/// Describes properties an instrument is created with, also used for filtering
/// in [View](crate::metrics::View)s.
///
/// When used as matching criteria, a field left at its empty value (empty
/// string, `None`) is not compared by the crate-internal `matches*` helpers.
///
/// # Example
///
/// Instruments can be used as criteria for views.
///
/// ```
/// use ts_opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
///
/// let criteria = Instrument::new().name("counter_*");
/// let mask = Stream::new().aggregation(Aggregation::Sum);
///
/// let view = new_view(criteria, mask);
/// # drop(view);
/// ```
#[derive(Clone, Default, Debug, PartialEq)]
#[non_exhaustive]
pub struct Instrument {
    /// The human-readable identifier of the instrument.
    pub name: Cow<'static, str>,
    /// Describes the purpose of the instrument.
    pub description: Cow<'static, str>,
    /// The functional group of the instrument, if one has been set.
    pub kind: Option<InstrumentKind>,
    /// The unit of measurement recorded by the instrument.
    pub unit: Unit,
    /// The instrumentation that created the instrument.
    pub scope: Scope,
}
71
72impl Instrument {
73    /// Create a new instrument with default values
74    pub fn new() -> Self {
75        Instrument::default()
76    }
77
78    /// Set the instrument name.
79    pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
80        self.name = name.into();
81        self
82    }
83
84    /// Set the instrument description.
85    pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
86        self.description = description.into();
87        self
88    }
89
90    /// Set the instrument unit.
91    pub fn unit(mut self, unit: Unit) -> Self {
92        self.unit = unit;
93        self
94    }
95
96    /// Set the instrument scope.
97    pub fn scope(mut self, scope: Scope) -> Self {
98        self.scope = scope;
99        self
100    }
101
102    /// empty returns if all fields of i are their default-value.
103    pub(crate) fn is_empty(&self) -> bool {
104        self.name == ""
105            && self.description == ""
106            && self.kind.is_none()
107            && self.unit.as_str() == ""
108            && self.scope == Scope::default()
109    }
110
111    pub(crate) fn matches(&self, other: &Instrument) -> bool {
112        self.matches_name(other)
113            && self.matches_description(other)
114            && self.matches_kind(other)
115            && self.matches_unit(other)
116            && self.matches_scope(other)
117    }
118
119    pub(crate) fn matches_name(&self, other: &Instrument) -> bool {
120        self.name.is_empty() || self.name.as_ref() == other.name.as_ref()
121    }
122
123    pub(crate) fn matches_description(&self, other: &Instrument) -> bool {
124        self.description.is_empty() || self.description.as_ref() == other.description.as_ref()
125    }
126
127    pub(crate) fn matches_kind(&self, other: &Instrument) -> bool {
128        self.kind.is_none() || self.kind == other.kind
129    }
130
131    pub(crate) fn matches_unit(&self, other: &Instrument) -> bool {
132        self.unit.as_str() == "" || self.unit == other.unit
133    }
134
135    pub(crate) fn matches_scope(&self, other: &Instrument) -> bool {
136        (self.scope.name.is_empty() || self.scope.name.as_ref() == other.scope.name.as_ref())
137            && (self.scope.version.is_none()
138                || self.scope.version.as_ref().map(AsRef::as_ref)
139                    == other.scope.version.as_ref().map(AsRef::as_ref))
140            && (self.scope.schema_url.is_none()
141                || self.scope.schema_url.as_ref().map(AsRef::as_ref)
142                    == other.scope.schema_url.as_ref().map(AsRef::as_ref))
143    }
144}
145
/// Describes the stream of data an instrument produces.
///
/// # Example
///
/// Streams can be used as masks in views.
///
/// ```
/// use ts_opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
///
/// let criteria = Instrument::new().name("counter_*");
/// let mask = Stream::new().aggregation(Aggregation::Sum);
///
/// let view = new_view(criteria, mask);
/// # drop(view);
/// ```
#[derive(Default)]
#[non_exhaustive]
pub struct Stream {
    /// The human-readable identifier of the stream.
    pub name: Cow<'static, str>,
    /// Describes the purpose of the data.
    pub description: Cow<'static, str>,
    /// The unit of measurement recorded.
    pub unit: Unit,
    /// The aggregation the stream uses for an instrument, if one has been set.
    pub aggregation: Option<Aggregation>,
    /// An optional filter applied to all attributes recorded for an instrument.
    pub attribute_filter: Option<Filter>,
}
175
/// Shared, thread-safe predicate over a single `KeyValue`, stored in
/// `Stream::attribute_filter`.
// NOTE(review): whether `true` means "keep" or "drop" is decided by the code
// that invokes the filter, which is not in this file — confirm there.
type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
177
178impl Stream {
179    /// Create a new stream with empty values.
180    pub fn new() -> Self {
181        Stream::default()
182    }
183
184    /// Set the stream name.
185    pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
186        self.name = name.into();
187        self
188    }
189
190    /// Set the stream description.
191    pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
192        self.description = description.into();
193        self
194    }
195
196    /// Set the stream unit.
197    pub fn unit(mut self, unit: Unit) -> Self {
198        self.unit = unit;
199        self
200    }
201
202    /// Set the stream aggregation.
203    pub fn aggregation(mut self, aggregation: Aggregation) -> Self {
204        self.aggregation = Some(aggregation);
205        self
206    }
207
208    /// Set the stream attribute filter.
209    pub fn attribute_filter(
210        mut self,
211        filter: impl Fn(&KeyValue) -> bool + Send + Sync + 'static,
212    ) -> Self {
213        self.attribute_filter = Some(Arc::new(filter));
214        self
215    }
216}
217
218impl fmt::Debug for Stream {
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220        f.debug_struct("Stream")
221            .field("name", &self.name)
222            .field("description", &self.description)
223            .field("unit", &self.unit)
224            .field("aggregation", &self.aggregation)
225            .field("attribute_filter", &self.attribute_filter.is_some())
226            .finish()
227    }
228}
229
/// The identifying properties of a stream.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct StreamId {
    /// The human-readable identifier of the stream.
    pub(crate) name: Cow<'static, str>,
    /// Describes the purpose of the data.
    pub(crate) description: Cow<'static, str>,
    /// The unit of measurement recorded.
    pub(crate) unit: Unit,
    /// The aggregation the stream uses for an instrument, stored as a string.
    pub(crate) aggregation: String,
    /// The monotonicity of an instrument's data type. This field is not used
    /// for all data types, so a `false` value needs to be understood in the
    /// context of the aggregation.
    pub(crate) monotonic: bool,
    /// The temporality of a stream's data type. This field is not used by
    /// some data types.
    pub(crate) temporality: Option<Temporality>,
    /// The number type of the stream.
    pub(crate) number: Cow<'static, str>,
}
251
/// Backing implementation of the synchronous instruments in this module
/// (`SyncCounter`, `SyncUpDownCounter` and `SyncHistogram`).
pub(crate) struct InstrumentImpl<T> {
    /// Aggregators that each receive every measurement recorded through the
    /// instrument.
    pub(crate) aggregators: Vec<Arc<dyn Aggregator<T>>>,
}
255
256impl<T: Copy> SyncCounter<T> for InstrumentImpl<T> {
257    fn add(&self, _cx: &Context, val: T, attrs: &[KeyValue]) {
258        for agg in &self.aggregators {
259            agg.aggregate(val, AttributeSet::from(attrs))
260        }
261    }
262}
263
264impl<T: Copy> SyncUpDownCounter<T> for InstrumentImpl<T> {
265    fn add(&self, _cx: &Context, val: T, attrs: &[KeyValue]) {
266        for agg in &self.aggregators {
267            agg.aggregate(val, AttributeSet::from(attrs))
268        }
269    }
270}
271
272impl<T: Copy> SyncHistogram<T> for InstrumentImpl<T> {
273    fn record(&self, _cx: &Context, val: T, attrs: &[KeyValue]) {
274        for agg in &self.aggregators {
275            agg.aggregate(val, AttributeSet::from(attrs))
276        }
277    }
278}
279
/// A comparable unique identifier of an observable.
///
/// `Hash`, `PartialEq` and `Eq` are implemented by hand further down in this
/// module and look at `inner` only, so they place no bounds on `T`.
#[derive(Clone, Debug)]
pub(crate) struct ObservableId<T> {
    /// The identifying properties of the observable.
    pub(crate) inner: IdInner,
    /// Zero-sized marker tying the id to the measurement type `T`.
    _marker: marker::PhantomData<T>,
}
286
/// The identifying properties wrapped by `ObservableId`.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub(crate) struct IdInner {
    /// The human-readable identifier of the instrument.
    pub(crate) name: Cow<'static, str>,
    /// Describes the purpose of the instrument.
    pub(crate) description: Cow<'static, str>,
    /// The functional group of the instrument.
    kind: InstrumentKind,
    /// The unit of measurement recorded by the instrument.
    pub(crate) unit: Unit,
    /// The instrumentation that created the instrument.
    scope: Scope,
}
300
301impl<T> Hash for ObservableId<T> {
302    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
303        self.inner.hash(state)
304    }
305}
306
307impl<T> PartialEq for ObservableId<T> {
308    fn eq(&self, other: &Self) -> bool {
309        self.inner == other.inner
310    }
311}
312
313impl<T> Eq for ObservableId<T> {}
314
/// An observable instrument: its identifier together with the aggregators
/// that receive its observations.
#[derive(Clone)]
pub(crate) struct Observable<T> {
    /// The identifier of this observable.
    pub(crate) id: ObservableId<T>,
    /// Aggregators that each receive every observed measurement.
    aggregators: Vec<Arc<dyn Aggregator<T>>>,
}
320
321impl<T> Observable<T> {
322    pub(crate) fn new(
323        scope: Scope,
324        kind: InstrumentKind,
325        name: Cow<'static, str>,
326        description: Cow<'static, str>,
327        unit: Unit,
328        aggregators: Vec<Arc<dyn Aggregator<T>>>,
329    ) -> Self {
330        Self {
331            id: ObservableId {
332                inner: IdInner {
333                    name,
334                    description,
335                    kind,
336                    unit,
337                    scope,
338                },
339                _marker: marker::PhantomData,
340            },
341            aggregators,
342        }
343    }
344
345    /// Returns `Err` if the observable should not be registered, and `Ok` if it
346    /// should.
347    ///
348    /// An error is returned if this observable is effectively a no-op because it does not have
349    /// any aggregators. Also, an error is returned if scope defines a Meter other
350    /// than the observable it was created by.
351    pub(crate) fn registerable(&self, scope: &Scope) -> Result<()> {
352        if self.aggregators.is_empty() {
353            return Err(MetricsError::Other(EMPTY_AGG_MSG.into()));
354        }
355        if &self.id.inner.scope != scope {
356            return Err(MetricsError::Other(format!(
357                "invalid registration: observable {} from Meter {:?}, registered with Meter {}",
358                self.id.inner.name, self.id.inner.scope, scope.name,
359            )));
360        }
361
362        Ok(())
363    }
364}
365
366impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
367    fn observe(&self, measurement: T, attrs: &[KeyValue]) {
368        for agg in &self.aggregators {
369            agg.aggregate(measurement, AttributeSet::from(attrs))
370        }
371    }
372
373    fn as_any(&self) -> Arc<dyn Any> {
374        Arc::new(self.clone())
375    }
376}