ts_opentelemetry_sdk/metrics/
instrument.rs1use std::{any::Any, borrow::Cow, fmt, hash::Hash, marker, sync::Arc};
2
3use ts_opentelemetry_api::{
4 metrics::{
5 AsyncInstrument, MetricsError, Result, SyncCounter, SyncHistogram, SyncUpDownCounter, Unit,
6 },
7 Context, KeyValue,
8};
9
10use crate::{
11 attributes::AttributeSet,
12 instrumentation::Scope,
13 metrics::data::Temporality,
14 metrics::{aggregation::Aggregation, internal::Aggregator},
15};
16
17pub(crate) const EMPTY_AGG_MSG: &str = "no aggregators for observable instrument";
18
19#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
21pub enum InstrumentKind {
22 Counter,
25 UpDownCounter,
28 Histogram,
31 ObservableCounter,
34 ObservableUpDownCounter,
37 ObservableGauge,
39}
40
41#[derive(Clone, Default, Debug, PartialEq)]
58#[non_exhaustive]
59pub struct Instrument {
60 pub name: Cow<'static, str>,
62 pub description: Cow<'static, str>,
64 pub kind: Option<InstrumentKind>,
66 pub unit: Unit,
68 pub scope: Scope,
70}
71
72impl Instrument {
73 pub fn new() -> Self {
75 Instrument::default()
76 }
77
78 pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
80 self.name = name.into();
81 self
82 }
83
84 pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
86 self.description = description.into();
87 self
88 }
89
90 pub fn unit(mut self, unit: Unit) -> Self {
92 self.unit = unit;
93 self
94 }
95
96 pub fn scope(mut self, scope: Scope) -> Self {
98 self.scope = scope;
99 self
100 }
101
102 pub(crate) fn is_empty(&self) -> bool {
104 self.name == ""
105 && self.description == ""
106 && self.kind.is_none()
107 && self.unit.as_str() == ""
108 && self.scope == Scope::default()
109 }
110
111 pub(crate) fn matches(&self, other: &Instrument) -> bool {
112 self.matches_name(other)
113 && self.matches_description(other)
114 && self.matches_kind(other)
115 && self.matches_unit(other)
116 && self.matches_scope(other)
117 }
118
119 pub(crate) fn matches_name(&self, other: &Instrument) -> bool {
120 self.name.is_empty() || self.name.as_ref() == other.name.as_ref()
121 }
122
123 pub(crate) fn matches_description(&self, other: &Instrument) -> bool {
124 self.description.is_empty() || self.description.as_ref() == other.description.as_ref()
125 }
126
127 pub(crate) fn matches_kind(&self, other: &Instrument) -> bool {
128 self.kind.is_none() || self.kind == other.kind
129 }
130
131 pub(crate) fn matches_unit(&self, other: &Instrument) -> bool {
132 self.unit.as_str() == "" || self.unit == other.unit
133 }
134
135 pub(crate) fn matches_scope(&self, other: &Instrument) -> bool {
136 (self.scope.name.is_empty() || self.scope.name.as_ref() == other.scope.name.as_ref())
137 && (self.scope.version.is_none()
138 || self.scope.version.as_ref().map(AsRef::as_ref)
139 == other.scope.version.as_ref().map(AsRef::as_ref))
140 && (self.scope.schema_url.is_none()
141 || self.scope.schema_url.as_ref().map(AsRef::as_ref)
142 == other.scope.schema_url.as_ref().map(AsRef::as_ref))
143 }
144}
145
146#[derive(Default)]
162#[non_exhaustive]
163pub struct Stream {
164 pub name: Cow<'static, str>,
166 pub description: Cow<'static, str>,
168 pub unit: Unit,
170 pub aggregation: Option<Aggregation>,
172 pub attribute_filter: Option<Filter>,
174}
175
176type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
177
178impl Stream {
179 pub fn new() -> Self {
181 Stream::default()
182 }
183
184 pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
186 self.name = name.into();
187 self
188 }
189
190 pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
192 self.description = description.into();
193 self
194 }
195
196 pub fn unit(mut self, unit: Unit) -> Self {
198 self.unit = unit;
199 self
200 }
201
202 pub fn aggregation(mut self, aggregation: Aggregation) -> Self {
204 self.aggregation = Some(aggregation);
205 self
206 }
207
208 pub fn attribute_filter(
210 mut self,
211 filter: impl Fn(&KeyValue) -> bool + Send + Sync + 'static,
212 ) -> Self {
213 self.attribute_filter = Some(Arc::new(filter));
214 self
215 }
216}
217
218impl fmt::Debug for Stream {
219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220 f.debug_struct("Stream")
221 .field("name", &self.name)
222 .field("description", &self.description)
223 .field("unit", &self.unit)
224 .field("aggregation", &self.aggregation)
225 .field("attribute_filter", &self.attribute_filter.is_some())
226 .finish()
227 }
228}
229
230#[derive(Debug, PartialEq, Eq, Hash)]
232pub(crate) struct StreamId {
233 pub(crate) name: Cow<'static, str>,
235 pub(crate) description: Cow<'static, str>,
237 pub(crate) unit: Unit,
239 pub(crate) aggregation: String,
241 pub(crate) monotonic: bool,
245 pub(crate) temporality: Option<Temporality>,
248 pub(crate) number: Cow<'static, str>,
250}
251
252pub(crate) struct InstrumentImpl<T> {
253 pub(crate) aggregators: Vec<Arc<dyn Aggregator<T>>>,
254}
255
256impl<T: Copy> SyncCounter<T> for InstrumentImpl<T> {
257 fn add(&self, _cx: &Context, val: T, attrs: &[KeyValue]) {
258 for agg in &self.aggregators {
259 agg.aggregate(val, AttributeSet::from(attrs))
260 }
261 }
262}
263
264impl<T: Copy> SyncUpDownCounter<T> for InstrumentImpl<T> {
265 fn add(&self, _cx: &Context, val: T, attrs: &[KeyValue]) {
266 for agg in &self.aggregators {
267 agg.aggregate(val, AttributeSet::from(attrs))
268 }
269 }
270}
271
272impl<T: Copy> SyncHistogram<T> for InstrumentImpl<T> {
273 fn record(&self, _cx: &Context, val: T, attrs: &[KeyValue]) {
274 for agg in &self.aggregators {
275 agg.aggregate(val, AttributeSet::from(attrs))
276 }
277 }
278}
279
280#[derive(Clone, Debug)]
282pub(crate) struct ObservableId<T> {
283 pub(crate) inner: IdInner,
284 _marker: marker::PhantomData<T>,
285}
286
287#[derive(Clone, Debug, Hash, PartialEq, Eq)]
288pub(crate) struct IdInner {
289 pub(crate) name: Cow<'static, str>,
291 pub(crate) description: Cow<'static, str>,
293 kind: InstrumentKind,
295 pub(crate) unit: Unit,
297 scope: Scope,
299}
300
301impl<T> Hash for ObservableId<T> {
302 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
303 self.inner.hash(state)
304 }
305}
306
307impl<T> PartialEq for ObservableId<T> {
308 fn eq(&self, other: &Self) -> bool {
309 self.inner == other.inner
310 }
311}
312
313impl<T> Eq for ObservableId<T> {}
314
315#[derive(Clone)]
316pub(crate) struct Observable<T> {
317 pub(crate) id: ObservableId<T>,
318 aggregators: Vec<Arc<dyn Aggregator<T>>>,
319}
320
321impl<T> Observable<T> {
322 pub(crate) fn new(
323 scope: Scope,
324 kind: InstrumentKind,
325 name: Cow<'static, str>,
326 description: Cow<'static, str>,
327 unit: Unit,
328 aggregators: Vec<Arc<dyn Aggregator<T>>>,
329 ) -> Self {
330 Self {
331 id: ObservableId {
332 inner: IdInner {
333 name,
334 description,
335 kind,
336 unit,
337 scope,
338 },
339 _marker: marker::PhantomData,
340 },
341 aggregators,
342 }
343 }
344
345 pub(crate) fn registerable(&self, scope: &Scope) -> Result<()> {
352 if self.aggregators.is_empty() {
353 return Err(MetricsError::Other(EMPTY_AGG_MSG.into()));
354 }
355 if &self.id.inner.scope != scope {
356 return Err(MetricsError::Other(format!(
357 "invalid registration: observable {} from Meter {:?}, registered with Meter {}",
358 self.id.inner.name, self.id.inner.scope, scope.name,
359 )));
360 }
361
362 Ok(())
363 }
364}
365
366impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
367 fn observe(&self, measurement: T, attrs: &[KeyValue]) {
368 for agg in &self.aggregators {
369 agg.aggregate(measurement, AttributeSet::from(attrs))
370 }
371 }
372
373 fn as_any(&self) -> Arc<dyn Any> {
374 Arc::new(self.clone())
375 }
376}