influxive_otel_atomic_obs/
lib.rs

1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3//! Opentelemetry observable metric implementations based on std::sync::atomic
4//! types.
5//! Opentelemetry has a concept of "observable" metrics that are not reported
6//! as they are updated, but rather, when an update happens, they are polled.
7//! For ease-of-use in code, it is often desirable to have these metrics be
8//! backed by [std::sync::atomic] types, so that they can be easily updated
9//! throughout the code, and fetched whenever a metric reporting poll occurs.
10//! This crate provides the [MeterExt] trait and associated types to make
11//! it easy to use [std::sync::atomic] backed metrics with opentelemetry.
12//!
13//! ## Example
14//!
15//! ```
16//! use influxive_otel_atomic_obs::MeterExt;
17//!
18//! let (my_metric, _) = opentelemetry_api::global::meter("my_meter")
19//!     .u64_observable_gauge_atomic("my_metric", 0)
20//!     .init();
21//!
22//! my_metric.set(66); // probably will not be reported
23//! my_metric.set(99); // probably will not be reported
24//! my_metric.set(42); // will be reported next time reporting runs
25//! ```
26
27use opentelemetry_api::metrics::*;
28use std::borrow::Cow;
29use std::sync::atomic::*;
30use std::sync::Arc;
31
32#[inline(always)]
33fn f64_to_u64(v: f64) -> u64 {
34    u64::from_le_bytes(v.to_le_bytes())
35}
36
37#[inline(always)]
38fn u64_to_f64(v: u64) -> f64 {
39    f64::from_le_bytes(v.to_le_bytes())
40}
41
42/// Metric builder.
43pub struct AtomicObservableInstrumentBuilder<'a, C, I, M>
44where
45    I: AsyncInstrument<M>,
46{
47    meter: &'a Meter,
48    builder: AsyncInstrumentBuilder<'a, I, M>,
49    rcbi: Rcbi<C, I>,
50}
51
52impl<'a, C, I, M> AtomicObservableInstrumentBuilder<'a, C, I, M>
53where
54    I: TryFrom<AsyncInstrumentBuilder<'a, I, M>, Error = MetricsError>,
55    I: AsyncInstrument<M>,
56    I: Clone,
57{
58    /// Set a description.
59    pub fn with_description(
60        self,
61        description: impl Into<Cow<'static, str>>,
62    ) -> Self {
63        let Self {
64            meter,
65            builder,
66            rcbi,
67        } = self;
68        Self {
69            meter,
70            builder: builder.with_description(description),
71            rcbi,
72        }
73    }
74
75    /// Set a unit.
76    pub fn with_unit(self, unit: Unit) -> Self {
77        let Self {
78            meter,
79            builder,
80            rcbi,
81        } = self;
82        Self {
83            meter,
84            builder: builder.with_unit(unit),
85            rcbi,
86        }
87    }
88
89    /// Initialize the metric.
90    pub fn try_init(self) -> Result<(C, I)> {
91        let Self {
92            meter,
93            builder,
94            rcbi,
95        } = self;
96        let instrument = builder.try_init()?;
97        let core = rcbi(instrument.clone(), meter)?;
98        Ok((core, instrument))
99    }
100
101    /// Initialize the metric.
102    pub fn init(self) -> (C, I) {
103        let Self {
104            meter,
105            builder,
106            rcbi,
107        } = self;
108        let instrument = builder.init();
109        let core = rcbi(instrument.clone(), meter)
110            .expect("failed to register callback");
111        (core, instrument)
112    }
113}
114
115type Rcbi<C, I> =
116    Box<dyn FnOnce(I, &Meter) -> Result<C> + 'static + Send + Sync>;
117
118struct Unreg(Option<Box<dyn CallbackRegistration>>);
119
120impl std::fmt::Debug for Unreg {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        f.debug_tuple("Unreg").finish()
123    }
124}
125
126impl Drop for Unreg {
127    fn drop(&mut self) {
128        if let Some(mut r) = self.0.take() {
129            let _ = r.unregister();
130        }
131    }
132}
133
134/// Observable counter based on std::sync::atomic::AtomicU64
135/// (but storing f64 bits).
136#[derive(Debug, Clone)]
137pub struct AtomicObservableCounterF64(
138    Arc<AtomicU64>,
139    #[allow(unused)] Arc<Unreg>,
140);
141
142impl AtomicObservableCounterF64 {
143    /// Construct a new AtomicObservableCounterF64, and associated
144    /// opentelemetry metric.
145    /// Note: If you would like any attributes applied to the metric reporting,
146    /// please set them with the versioned_meter api before passing the meter
147    /// into this constructor.
148    pub fn new(
149        meter: &Meter,
150        name: impl Into<std::borrow::Cow<'static, str>>,
151        initial_value: f64,
152    ) -> AtomicObservableInstrumentBuilder<
153        '_,
154        AtomicObservableCounterF64,
155        ObservableCounter<f64>,
156        f64,
157    > {
158        let data = Arc::new(AtomicU64::new(f64_to_u64(initial_value)));
159        let weak = Arc::downgrade(&data);
160        let rcbi = Box::new(
161            move |instrument: ObservableCounter<f64>, meter: &Meter| {
162                let unreg = meter.register_callback(
163                    &[instrument.as_any()],
164                    move |obs| {
165                        if let Some(data) = weak.upgrade() {
166                            obs.observe_f64(
167                                &instrument,
168                                u64_to_f64(data.load(Ordering::SeqCst)),
169                                &[],
170                            );
171                        }
172                    },
173                )?;
174                Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
175            },
176        );
177
178        AtomicObservableInstrumentBuilder {
179            meter,
180            builder: meter.f64_observable_counter(name),
181            rcbi,
182        }
183    }
184
185    /// Add to the current value of the up down counter.
186    /// a negative value is a no-op.
187    pub fn add(&self, value: f64) {
188        if value <= 0.0 {
189            return;
190        }
191
192        // note: we don't care about the ABA problem,
193        // because it will still end up with the same correct value.
194        let _ = self
195            .0
196            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
197                Some(f64_to_u64(u64_to_f64(v) + value))
198            });
199    }
200
201    /// Get the current value of the up down counter.
202    pub fn get(&self) -> f64 {
203        u64_to_f64(self.0.load(Ordering::SeqCst))
204    }
205}
206
207/// Observable up down counter based on std::sync::atomic::AtomicU64
208/// (but storing f64 bits).
209#[derive(Debug, Clone)]
210pub struct AtomicObservableUpDownCounterF64(
211    Arc<AtomicU64>,
212    #[allow(unused)] Arc<Unreg>,
213);
214
215impl AtomicObservableUpDownCounterF64 {
216    /// Construct a new AtomicObservableUpDownCounterF64,
217    /// and associated opentelemetry metric.
218    /// Note: If you would like any attributes applied to the metric reporting,
219    /// please set them with the versioned_meter api before passing the meter
220    /// into this constructor.
221    pub fn new(
222        meter: &Meter,
223        name: impl Into<std::borrow::Cow<'static, str>>,
224        initial_value: f64,
225    ) -> AtomicObservableInstrumentBuilder<
226        '_,
227        AtomicObservableUpDownCounterF64,
228        ObservableUpDownCounter<f64>,
229        f64,
230    > {
231        let data = Arc::new(AtomicU64::new(f64_to_u64(initial_value)));
232        let weak = Arc::downgrade(&data);
233        let rcbi = Box::new(
234            move |instrument: ObservableUpDownCounter<f64>, meter: &Meter| {
235                let unreg = meter.register_callback(
236                    &[instrument.as_any()],
237                    move |obs| {
238                        if let Some(data) = weak.upgrade() {
239                            obs.observe_f64(
240                                &instrument,
241                                u64_to_f64(data.load(Ordering::SeqCst)),
242                                &[],
243                            );
244                        }
245                    },
246                )?;
247                Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
248            },
249        );
250
251        AtomicObservableInstrumentBuilder {
252            meter,
253            builder: meter.f64_observable_up_down_counter(name),
254            rcbi,
255        }
256    }
257
258    /// Add to (or subtract from) the current value of the up down counter.
259    pub fn add(&self, value: f64) {
260        // note: we don't care about the ABA problem,
261        // because it will still end up with the same correct value.
262        let _ = self
263            .0
264            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
265                Some(f64_to_u64(u64_to_f64(v) + value))
266            });
267    }
268
269    /// Get the current value of the up down counter.
270    pub fn get(&self) -> f64 {
271        u64_to_f64(self.0.load(Ordering::SeqCst))
272    }
273}
274
275/// Observable gauge based on std::sync::atomic::AtomicU64
276/// (but storing f64 bits).
277#[derive(Debug, Clone)]
278pub struct AtomicObservableGaugeF64(
279    Arc<AtomicU64>,
280    #[allow(unused)] Arc<Unreg>,
281);
282
283impl AtomicObservableGaugeF64 {
284    /// Construct a new AtomicObservableGaugeF64, and associated opentelemetry metric.
285    /// Note: If you would like any attributes applied to the metric reporting,
286    /// please set them with the versioned_meter api before passing the meter
287    /// into this constructor.
288    pub fn new(
289        meter: &Meter,
290        name: impl Into<std::borrow::Cow<'static, str>>,
291        initial_value: f64,
292    ) -> AtomicObservableInstrumentBuilder<
293        '_,
294        AtomicObservableGaugeF64,
295        ObservableGauge<f64>,
296        f64,
297    > {
298        let data = Arc::new(AtomicU64::new(f64_to_u64(initial_value)));
299        let weak = Arc::downgrade(&data);
300        let rcbi =
301            Box::new(move |instrument: ObservableGauge<f64>, meter: &Meter| {
302                let unreg = meter.register_callback(
303                    &[instrument.as_any()],
304                    move |obs| {
305                        if let Some(data) = weak.upgrade() {
306                            obs.observe_f64(
307                                &instrument,
308                                u64_to_f64(data.load(Ordering::SeqCst)),
309                                &[],
310                            );
311                        }
312                    },
313                )?;
314                Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
315            });
316
317        AtomicObservableInstrumentBuilder {
318            meter,
319            builder: meter.f64_observable_gauge(name),
320            rcbi,
321        }
322    }
323
324    /// Set the current value of the gauge.
325    pub fn set(&self, value: f64) {
326        self.0.store(f64_to_u64(value), Ordering::SeqCst);
327    }
328
329    /// Get the current value of the gauge.
330    pub fn get(&self) -> f64 {
331        u64_to_f64(self.0.load(Ordering::SeqCst))
332    }
333}
334
335/// Observable gauge based on std::sync::atomic::AtomicI64.
336#[derive(Debug, Clone)]
337pub struct AtomicObservableGaugeI64(
338    Arc<AtomicI64>,
339    #[allow(unused)] Arc<Unreg>,
340);
341
342impl AtomicObservableGaugeI64 {
343    /// Construct a new ObsGaugeAtomicI64, and associated opentelemetry metric.
344    /// Note: If you would like any attributes applied to the metric reporting,
345    /// please set them with the versioned_meter api before passing the meter
346    /// into this constructor.
347    pub fn new(
348        meter: &Meter,
349        name: impl Into<std::borrow::Cow<'static, str>>,
350        initial_value: i64,
351    ) -> AtomicObservableInstrumentBuilder<
352        '_,
353        AtomicObservableGaugeI64,
354        ObservableGauge<i64>,
355        i64,
356    > {
357        let data = Arc::new(AtomicI64::new(initial_value));
358        let weak = Arc::downgrade(&data);
359        let rcbi =
360            Box::new(move |instrument: ObservableGauge<i64>, meter: &Meter| {
361                let unreg = meter.register_callback(
362                    &[instrument.as_any()],
363                    move |obs| {
364                        if let Some(data) = weak.upgrade() {
365                            obs.observe_i64(
366                                &instrument,
367                                data.load(Ordering::SeqCst),
368                                &[],
369                            );
370                        }
371                    },
372                )?;
373                Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
374            });
375
376        AtomicObservableInstrumentBuilder {
377            meter,
378            builder: meter.i64_observable_gauge(name),
379            rcbi,
380        }
381    }
382
383    /// Set the current value of the gauge.
384    pub fn set(&self, value: i64) {
385        self.0.store(value, Ordering::SeqCst);
386    }
387
388    /// Get the current value of the gauge.
389    pub fn get(&self) -> i64 {
390        self.0.load(Ordering::SeqCst)
391    }
392}
393
394/// Observable up down counter based on std::sync::atomic::AtomicI64.
395#[derive(Debug, Clone)]
396pub struct AtomicObservableUpDownCounterI64(
397    Arc<AtomicI64>,
398    #[allow(unused)] Arc<Unreg>,
399);
400
401impl AtomicObservableUpDownCounterI64 {
402    /// Construct a new AtomicObservableUpDownCounterI64,
403    /// and associated opentelemetry metric.
404    /// Note: If you would like any attributes applied to the metric reporting,
405    /// please set them with the versioned_meter api before passing the meter
406    /// into this constructor.
407    pub fn new(
408        meter: &Meter,
409        name: impl Into<std::borrow::Cow<'static, str>>,
410        initial_value: i64,
411    ) -> AtomicObservableInstrumentBuilder<
412        '_,
413        AtomicObservableUpDownCounterI64,
414        ObservableUpDownCounter<i64>,
415        i64,
416    > {
417        let data = Arc::new(AtomicI64::new(initial_value));
418        let weak = Arc::downgrade(&data);
419        let rcbi = Box::new(
420            move |instrument: ObservableUpDownCounter<i64>, meter: &Meter| {
421                let unreg = meter.register_callback(
422                    &[instrument.as_any()],
423                    move |obs| {
424                        if let Some(data) = weak.upgrade() {
425                            obs.observe_i64(
426                                &instrument,
427                                data.load(Ordering::SeqCst),
428                                &[],
429                            );
430                        }
431                    },
432                )?;
433                Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
434            },
435        );
436
437        AtomicObservableInstrumentBuilder {
438            meter,
439            builder: meter.i64_observable_up_down_counter(name),
440            rcbi,
441        }
442    }
443
444    /// Add to (or subtract from) the current value of the gauge.
445    pub fn add(&self, value: i64) {
446        self.0.fetch_add(value, Ordering::SeqCst);
447    }
448
449    /// Get the current value of the gauge.
450    pub fn get(&self) -> i64 {
451        self.0.load(Ordering::SeqCst)
452    }
453}
454
455/// Observable counter based on std::sync::atomic::AtomicU64.
456#[derive(Debug, Clone)]
457pub struct AtomicObservableCounterU64(
458    Arc<AtomicU64>,
459    #[allow(unused)] Arc<Unreg>,
460);
461
462impl AtomicObservableCounterU64 {
463    /// Construct a new AtomicObservableCounterU64,
464    /// and associated opentelemetry metric.
465    /// Note: If you would like any attributes applied to the metric reporting,
466    /// please set them with the versioned_meter api before passing the meter
467    /// into this constructor.
468    pub fn new(
469        meter: &Meter,
470        name: impl Into<std::borrow::Cow<'static, str>>,
471        initial_value: u64,
472    ) -> AtomicObservableInstrumentBuilder<
473        '_,
474        AtomicObservableCounterU64,
475        ObservableCounter<u64>,
476        u64,
477    > {
478        let data = Arc::new(AtomicU64::new(initial_value));
479        let weak = Arc::downgrade(&data);
480        let rcbi = Box::new(
481            move |instrument: ObservableCounter<u64>, meter: &Meter| {
482                let unreg = meter.register_callback(
483                    &[instrument.as_any()],
484                    move |obs| {
485                        if let Some(data) = weak.upgrade() {
486                            obs.observe_u64(
487                                &instrument,
488                                data.load(Ordering::SeqCst),
489                                &[],
490                            );
491                        }
492                    },
493                )?;
494                Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
495            },
496        );
497
498        AtomicObservableInstrumentBuilder {
499            meter,
500            builder: meter.u64_observable_counter(name),
501            rcbi,
502        }
503    }
504
505    /// Add to the current value of the gauge.
506    pub fn add(&self, value: u64) {
507        self.0.fetch_add(value, Ordering::SeqCst);
508    }
509
510    /// Get the current value of the gauge.
511    pub fn get(&self) -> u64 {
512        self.0.load(Ordering::SeqCst)
513    }
514}
515
516/// Observable gauge based on std::sync::atomic::AtomicU64.
517#[derive(Debug, Clone)]
518pub struct AtomicObservableGaugeU64(
519    Arc<AtomicU64>,
520    #[allow(unused)] Arc<Unreg>,
521);
522
523impl AtomicObservableGaugeU64 {
524    /// Construct a new AtomicObservableGaugeU64,
525    /// and associated opentelemetry metric.
526    /// Note: If you would like any attributes applied to the metric reporting,
527    /// please set them with the versioned_meter api before passing the meter
528    /// into this constructor.
529    pub fn new(
530        meter: &Meter,
531        name: impl Into<std::borrow::Cow<'static, str>>,
532        initial_value: u64,
533    ) -> AtomicObservableInstrumentBuilder<
534        '_,
535        AtomicObservableGaugeU64,
536        ObservableGauge<u64>,
537        u64,
538    > {
539        let data = Arc::new(AtomicU64::new(initial_value));
540        let weak = Arc::downgrade(&data);
541        let rcbi =
542            Box::new(move |instrument: ObservableGauge<u64>, meter: &Meter| {
543                let unreg = meter.register_callback(
544                    &[instrument.as_any()],
545                    move |obs| {
546                        if let Some(data) = weak.upgrade() {
547                            obs.observe_u64(
548                                &instrument,
549                                data.load(Ordering::SeqCst),
550                                &[],
551                            );
552                        }
553                    },
554                )?;
555                Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
556            });
557
558        AtomicObservableInstrumentBuilder {
559            meter,
560            builder: meter.u64_observable_gauge(name),
561            rcbi,
562        }
563    }
564
565    /// Set the current value of the gauge.
566    pub fn set(&self, value: u64) {
567        self.0.store(value, Ordering::SeqCst);
568    }
569
570    /// Get the current value of the gauge.
571    pub fn get(&self) -> u64 {
572        self.0.load(Ordering::SeqCst)
573    }
574}
575
576/// Extension trait for Meter
577pub trait MeterExt {
578    /// Get an observable f64 up down counter backed by a
579    /// std::atomic::AtomicU64.
580    fn f64_observable_counter_atomic(
581        &self,
582        name: impl Into<Cow<'static, str>>,
583        initial_value: f64,
584    ) -> AtomicObservableInstrumentBuilder<
585        '_,
586        AtomicObservableCounterF64,
587        ObservableCounter<f64>,
588        f64,
589    >;
590
591    /// Get an observable f64 gauge backed by a std::atomic::AtomicU64.
592    fn f64_observable_gauge_atomic(
593        &self,
594        name: impl Into<Cow<'static, str>>,
595        initial_value: f64,
596    ) -> AtomicObservableInstrumentBuilder<
597        '_,
598        AtomicObservableGaugeF64,
599        ObservableGauge<f64>,
600        f64,
601    >;
602
603    /// Get an observable f64 up down counter backed by a
604    /// std::atomic::AtomicU64.
605    fn f64_observable_up_down_counter_atomic(
606        &self,
607        name: impl Into<Cow<'static, str>>,
608        initial_value: f64,
609    ) -> AtomicObservableInstrumentBuilder<
610        '_,
611        AtomicObservableUpDownCounterF64,
612        ObservableUpDownCounter<f64>,
613        f64,
614    >;
615
616    /// Get an observable i64 gauge backed by a std::atomic::AtomicI64.
617    fn i64_observable_gauge_atomic(
618        &self,
619        name: impl Into<Cow<'static, str>>,
620        initial_value: i64,
621    ) -> AtomicObservableInstrumentBuilder<
622        '_,
623        AtomicObservableGaugeI64,
624        ObservableGauge<i64>,
625        i64,
626    >;
627
628    /// Get an observable i64 up down counter backed by a std::atomic::AtomicI64.
629    fn i64_observable_up_down_counter_atomic(
630        &self,
631        name: impl Into<Cow<'static, str>>,
632        initial_value: i64,
633    ) -> AtomicObservableInstrumentBuilder<
634        '_,
635        AtomicObservableUpDownCounterI64,
636        ObservableUpDownCounter<i64>,
637        i64,
638    >;
639
640    /// Get an observable u64 counter backed by a std::atomic::AtomicU64.
641    fn u64_observable_counter_atomic(
642        &self,
643        name: impl Into<Cow<'static, str>>,
644        initial_value: u64,
645    ) -> AtomicObservableInstrumentBuilder<
646        '_,
647        AtomicObservableCounterU64,
648        ObservableCounter<u64>,
649        u64,
650    >;
651
652    /// Get an observable u64 gauge backed by a std::atomic::AtomicU64.
653    fn u64_observable_gauge_atomic(
654        &self,
655        name: impl Into<Cow<'static, str>>,
656        initial_value: u64,
657    ) -> AtomicObservableInstrumentBuilder<
658        '_,
659        AtomicObservableGaugeU64,
660        ObservableGauge<u64>,
661        u64,
662    >;
663}
664
665impl MeterExt for Meter {
666    fn f64_observable_counter_atomic(
667        &self,
668        name: impl Into<Cow<'static, str>>,
669        initial_value: f64,
670    ) -> AtomicObservableInstrumentBuilder<
671        '_,
672        AtomicObservableCounterF64,
673        ObservableCounter<f64>,
674        f64,
675    > {
676        AtomicObservableCounterF64::new(self, name, initial_value)
677    }
678
679    fn f64_observable_gauge_atomic(
680        &self,
681        name: impl Into<Cow<'static, str>>,
682        initial_value: f64,
683    ) -> AtomicObservableInstrumentBuilder<
684        '_,
685        AtomicObservableGaugeF64,
686        ObservableGauge<f64>,
687        f64,
688    > {
689        AtomicObservableGaugeF64::new(self, name, initial_value)
690    }
691
692    fn f64_observable_up_down_counter_atomic(
693        &self,
694        name: impl Into<Cow<'static, str>>,
695        initial_value: f64,
696    ) -> AtomicObservableInstrumentBuilder<
697        '_,
698        AtomicObservableUpDownCounterF64,
699        ObservableUpDownCounter<f64>,
700        f64,
701    > {
702        AtomicObservableUpDownCounterF64::new(self, name, initial_value)
703    }
704
705    fn i64_observable_gauge_atomic(
706        &self,
707        name: impl Into<Cow<'static, str>>,
708        initial_value: i64,
709    ) -> AtomicObservableInstrumentBuilder<
710        '_,
711        AtomicObservableGaugeI64,
712        ObservableGauge<i64>,
713        i64,
714    > {
715        AtomicObservableGaugeI64::new(self, name, initial_value)
716    }
717
718    fn i64_observable_up_down_counter_atomic(
719        &self,
720        name: impl Into<Cow<'static, str>>,
721        initial_value: i64,
722    ) -> AtomicObservableInstrumentBuilder<
723        '_,
724        AtomicObservableUpDownCounterI64,
725        ObservableUpDownCounter<i64>,
726        i64,
727    > {
728        AtomicObservableUpDownCounterI64::new(self, name, initial_value)
729    }
730
731    fn u64_observable_counter_atomic(
732        &self,
733        name: impl Into<Cow<'static, str>>,
734        initial_value: u64,
735    ) -> AtomicObservableInstrumentBuilder<
736        '_,
737        AtomicObservableCounterU64,
738        ObservableCounter<u64>,
739        u64,
740    > {
741        AtomicObservableCounterU64::new(self, name, initial_value)
742    }
743
744    fn u64_observable_gauge_atomic(
745        &self,
746        name: impl Into<Cow<'static, str>>,
747        initial_value: u64,
748    ) -> AtomicObservableInstrumentBuilder<
749        '_,
750        AtomicObservableGaugeU64,
751        ObservableGauge<u64>,
752        u64,
753    > {
754        AtomicObservableGaugeU64::new(self, name, initial_value)
755    }
756}