1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3use opentelemetry_api::metrics::*;
28use std::borrow::Cow;
29use std::sync::atomic::*;
30use std::sync::Arc;
31
32#[inline(always)]
33fn f64_to_u64(v: f64) -> u64 {
34 u64::from_le_bytes(v.to_le_bytes())
35}
36
37#[inline(always)]
38fn u64_to_f64(v: u64) -> f64 {
39 f64::from_le_bytes(v.to_le_bytes())
40}
41
42pub struct AtomicObservableInstrumentBuilder<'a, C, I, M>
44where
45 I: AsyncInstrument<M>,
46{
47 meter: &'a Meter,
48 builder: AsyncInstrumentBuilder<'a, I, M>,
49 rcbi: Rcbi<C, I>,
50}
51
52impl<'a, C, I, M> AtomicObservableInstrumentBuilder<'a, C, I, M>
53where
54 I: TryFrom<AsyncInstrumentBuilder<'a, I, M>, Error = MetricsError>,
55 I: AsyncInstrument<M>,
56 I: Clone,
57{
58 pub fn with_description(
60 self,
61 description: impl Into<Cow<'static, str>>,
62 ) -> Self {
63 let Self {
64 meter,
65 builder,
66 rcbi,
67 } = self;
68 Self {
69 meter,
70 builder: builder.with_description(description),
71 rcbi,
72 }
73 }
74
75 pub fn with_unit(self, unit: Unit) -> Self {
77 let Self {
78 meter,
79 builder,
80 rcbi,
81 } = self;
82 Self {
83 meter,
84 builder: builder.with_unit(unit),
85 rcbi,
86 }
87 }
88
89 pub fn try_init(self) -> Result<(C, I)> {
91 let Self {
92 meter,
93 builder,
94 rcbi,
95 } = self;
96 let instrument = builder.try_init()?;
97 let core = rcbi(instrument.clone(), meter)?;
98 Ok((core, instrument))
99 }
100
101 pub fn init(self) -> (C, I) {
103 let Self {
104 meter,
105 builder,
106 rcbi,
107 } = self;
108 let instrument = builder.init();
109 let core = rcbi(instrument.clone(), meter)
110 .expect("failed to register callback");
111 (core, instrument)
112 }
113}
114
115type Rcbi<C, I> =
116 Box<dyn FnOnce(I, &Meter) -> Result<C> + 'static + Send + Sync>;
117
118struct Unreg(Option<Box<dyn CallbackRegistration>>);
119
120impl std::fmt::Debug for Unreg {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 f.debug_tuple("Unreg").finish()
123 }
124}
125
126impl Drop for Unreg {
127 fn drop(&mut self) {
128 if let Some(mut r) = self.0.take() {
129 let _ = r.unregister();
130 }
131 }
132}
133
134#[derive(Debug, Clone)]
137pub struct AtomicObservableCounterF64(
138 Arc<AtomicU64>,
139 #[allow(unused)] Arc<Unreg>,
140);
141
142impl AtomicObservableCounterF64 {
143 pub fn new(
149 meter: &Meter,
150 name: impl Into<std::borrow::Cow<'static, str>>,
151 initial_value: f64,
152 ) -> AtomicObservableInstrumentBuilder<
153 '_,
154 AtomicObservableCounterF64,
155 ObservableCounter<f64>,
156 f64,
157 > {
158 let data = Arc::new(AtomicU64::new(f64_to_u64(initial_value)));
159 let weak = Arc::downgrade(&data);
160 let rcbi = Box::new(
161 move |instrument: ObservableCounter<f64>, meter: &Meter| {
162 let unreg = meter.register_callback(
163 &[instrument.as_any()],
164 move |obs| {
165 if let Some(data) = weak.upgrade() {
166 obs.observe_f64(
167 &instrument,
168 u64_to_f64(data.load(Ordering::SeqCst)),
169 &[],
170 );
171 }
172 },
173 )?;
174 Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
175 },
176 );
177
178 AtomicObservableInstrumentBuilder {
179 meter,
180 builder: meter.f64_observable_counter(name),
181 rcbi,
182 }
183 }
184
185 pub fn add(&self, value: f64) {
188 if value <= 0.0 {
189 return;
190 }
191
192 let _ = self
195 .0
196 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
197 Some(f64_to_u64(u64_to_f64(v) + value))
198 });
199 }
200
201 pub fn get(&self) -> f64 {
203 u64_to_f64(self.0.load(Ordering::SeqCst))
204 }
205}
206
207#[derive(Debug, Clone)]
210pub struct AtomicObservableUpDownCounterF64(
211 Arc<AtomicU64>,
212 #[allow(unused)] Arc<Unreg>,
213);
214
215impl AtomicObservableUpDownCounterF64 {
216 pub fn new(
222 meter: &Meter,
223 name: impl Into<std::borrow::Cow<'static, str>>,
224 initial_value: f64,
225 ) -> AtomicObservableInstrumentBuilder<
226 '_,
227 AtomicObservableUpDownCounterF64,
228 ObservableUpDownCounter<f64>,
229 f64,
230 > {
231 let data = Arc::new(AtomicU64::new(f64_to_u64(initial_value)));
232 let weak = Arc::downgrade(&data);
233 let rcbi = Box::new(
234 move |instrument: ObservableUpDownCounter<f64>, meter: &Meter| {
235 let unreg = meter.register_callback(
236 &[instrument.as_any()],
237 move |obs| {
238 if let Some(data) = weak.upgrade() {
239 obs.observe_f64(
240 &instrument,
241 u64_to_f64(data.load(Ordering::SeqCst)),
242 &[],
243 );
244 }
245 },
246 )?;
247 Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
248 },
249 );
250
251 AtomicObservableInstrumentBuilder {
252 meter,
253 builder: meter.f64_observable_up_down_counter(name),
254 rcbi,
255 }
256 }
257
258 pub fn add(&self, value: f64) {
260 let _ = self
263 .0
264 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
265 Some(f64_to_u64(u64_to_f64(v) + value))
266 });
267 }
268
269 pub fn get(&self) -> f64 {
271 u64_to_f64(self.0.load(Ordering::SeqCst))
272 }
273}
274
275#[derive(Debug, Clone)]
278pub struct AtomicObservableGaugeF64(
279 Arc<AtomicU64>,
280 #[allow(unused)] Arc<Unreg>,
281);
282
283impl AtomicObservableGaugeF64 {
284 pub fn new(
289 meter: &Meter,
290 name: impl Into<std::borrow::Cow<'static, str>>,
291 initial_value: f64,
292 ) -> AtomicObservableInstrumentBuilder<
293 '_,
294 AtomicObservableGaugeF64,
295 ObservableGauge<f64>,
296 f64,
297 > {
298 let data = Arc::new(AtomicU64::new(f64_to_u64(initial_value)));
299 let weak = Arc::downgrade(&data);
300 let rcbi =
301 Box::new(move |instrument: ObservableGauge<f64>, meter: &Meter| {
302 let unreg = meter.register_callback(
303 &[instrument.as_any()],
304 move |obs| {
305 if let Some(data) = weak.upgrade() {
306 obs.observe_f64(
307 &instrument,
308 u64_to_f64(data.load(Ordering::SeqCst)),
309 &[],
310 );
311 }
312 },
313 )?;
314 Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
315 });
316
317 AtomicObservableInstrumentBuilder {
318 meter,
319 builder: meter.f64_observable_gauge(name),
320 rcbi,
321 }
322 }
323
324 pub fn set(&self, value: f64) {
326 self.0.store(f64_to_u64(value), Ordering::SeqCst);
327 }
328
329 pub fn get(&self) -> f64 {
331 u64_to_f64(self.0.load(Ordering::SeqCst))
332 }
333}
334
335#[derive(Debug, Clone)]
337pub struct AtomicObservableGaugeI64(
338 Arc<AtomicI64>,
339 #[allow(unused)] Arc<Unreg>,
340);
341
342impl AtomicObservableGaugeI64 {
343 pub fn new(
348 meter: &Meter,
349 name: impl Into<std::borrow::Cow<'static, str>>,
350 initial_value: i64,
351 ) -> AtomicObservableInstrumentBuilder<
352 '_,
353 AtomicObservableGaugeI64,
354 ObservableGauge<i64>,
355 i64,
356 > {
357 let data = Arc::new(AtomicI64::new(initial_value));
358 let weak = Arc::downgrade(&data);
359 let rcbi =
360 Box::new(move |instrument: ObservableGauge<i64>, meter: &Meter| {
361 let unreg = meter.register_callback(
362 &[instrument.as_any()],
363 move |obs| {
364 if let Some(data) = weak.upgrade() {
365 obs.observe_i64(
366 &instrument,
367 data.load(Ordering::SeqCst),
368 &[],
369 );
370 }
371 },
372 )?;
373 Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
374 });
375
376 AtomicObservableInstrumentBuilder {
377 meter,
378 builder: meter.i64_observable_gauge(name),
379 rcbi,
380 }
381 }
382
383 pub fn set(&self, value: i64) {
385 self.0.store(value, Ordering::SeqCst);
386 }
387
388 pub fn get(&self) -> i64 {
390 self.0.load(Ordering::SeqCst)
391 }
392}
393
394#[derive(Debug, Clone)]
396pub struct AtomicObservableUpDownCounterI64(
397 Arc<AtomicI64>,
398 #[allow(unused)] Arc<Unreg>,
399);
400
401impl AtomicObservableUpDownCounterI64 {
402 pub fn new(
408 meter: &Meter,
409 name: impl Into<std::borrow::Cow<'static, str>>,
410 initial_value: i64,
411 ) -> AtomicObservableInstrumentBuilder<
412 '_,
413 AtomicObservableUpDownCounterI64,
414 ObservableUpDownCounter<i64>,
415 i64,
416 > {
417 let data = Arc::new(AtomicI64::new(initial_value));
418 let weak = Arc::downgrade(&data);
419 let rcbi = Box::new(
420 move |instrument: ObservableUpDownCounter<i64>, meter: &Meter| {
421 let unreg = meter.register_callback(
422 &[instrument.as_any()],
423 move |obs| {
424 if let Some(data) = weak.upgrade() {
425 obs.observe_i64(
426 &instrument,
427 data.load(Ordering::SeqCst),
428 &[],
429 );
430 }
431 },
432 )?;
433 Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
434 },
435 );
436
437 AtomicObservableInstrumentBuilder {
438 meter,
439 builder: meter.i64_observable_up_down_counter(name),
440 rcbi,
441 }
442 }
443
444 pub fn add(&self, value: i64) {
446 self.0.fetch_add(value, Ordering::SeqCst);
447 }
448
449 pub fn get(&self) -> i64 {
451 self.0.load(Ordering::SeqCst)
452 }
453}
454
455#[derive(Debug, Clone)]
457pub struct AtomicObservableCounterU64(
458 Arc<AtomicU64>,
459 #[allow(unused)] Arc<Unreg>,
460);
461
462impl AtomicObservableCounterU64 {
463 pub fn new(
469 meter: &Meter,
470 name: impl Into<std::borrow::Cow<'static, str>>,
471 initial_value: u64,
472 ) -> AtomicObservableInstrumentBuilder<
473 '_,
474 AtomicObservableCounterU64,
475 ObservableCounter<u64>,
476 u64,
477 > {
478 let data = Arc::new(AtomicU64::new(initial_value));
479 let weak = Arc::downgrade(&data);
480 let rcbi = Box::new(
481 move |instrument: ObservableCounter<u64>, meter: &Meter| {
482 let unreg = meter.register_callback(
483 &[instrument.as_any()],
484 move |obs| {
485 if let Some(data) = weak.upgrade() {
486 obs.observe_u64(
487 &instrument,
488 data.load(Ordering::SeqCst),
489 &[],
490 );
491 }
492 },
493 )?;
494 Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
495 },
496 );
497
498 AtomicObservableInstrumentBuilder {
499 meter,
500 builder: meter.u64_observable_counter(name),
501 rcbi,
502 }
503 }
504
505 pub fn add(&self, value: u64) {
507 self.0.fetch_add(value, Ordering::SeqCst);
508 }
509
510 pub fn get(&self) -> u64 {
512 self.0.load(Ordering::SeqCst)
513 }
514}
515
516#[derive(Debug, Clone)]
518pub struct AtomicObservableGaugeU64(
519 Arc<AtomicU64>,
520 #[allow(unused)] Arc<Unreg>,
521);
522
523impl AtomicObservableGaugeU64 {
524 pub fn new(
530 meter: &Meter,
531 name: impl Into<std::borrow::Cow<'static, str>>,
532 initial_value: u64,
533 ) -> AtomicObservableInstrumentBuilder<
534 '_,
535 AtomicObservableGaugeU64,
536 ObservableGauge<u64>,
537 u64,
538 > {
539 let data = Arc::new(AtomicU64::new(initial_value));
540 let weak = Arc::downgrade(&data);
541 let rcbi =
542 Box::new(move |instrument: ObservableGauge<u64>, meter: &Meter| {
543 let unreg = meter.register_callback(
544 &[instrument.as_any()],
545 move |obs| {
546 if let Some(data) = weak.upgrade() {
547 obs.observe_u64(
548 &instrument,
549 data.load(Ordering::SeqCst),
550 &[],
551 );
552 }
553 },
554 )?;
555 Ok(Self(data, Arc::new(Unreg(Some(unreg)))))
556 });
557
558 AtomicObservableInstrumentBuilder {
559 meter,
560 builder: meter.u64_observable_gauge(name),
561 rcbi,
562 }
563 }
564
565 pub fn set(&self, value: u64) {
567 self.0.store(value, Ordering::SeqCst);
568 }
569
570 pub fn get(&self) -> u64 {
572 self.0.load(Ordering::SeqCst)
573 }
574}
575
576pub trait MeterExt {
578 fn f64_observable_counter_atomic(
581 &self,
582 name: impl Into<Cow<'static, str>>,
583 initial_value: f64,
584 ) -> AtomicObservableInstrumentBuilder<
585 '_,
586 AtomicObservableCounterF64,
587 ObservableCounter<f64>,
588 f64,
589 >;
590
591 fn f64_observable_gauge_atomic(
593 &self,
594 name: impl Into<Cow<'static, str>>,
595 initial_value: f64,
596 ) -> AtomicObservableInstrumentBuilder<
597 '_,
598 AtomicObservableGaugeF64,
599 ObservableGauge<f64>,
600 f64,
601 >;
602
603 fn f64_observable_up_down_counter_atomic(
606 &self,
607 name: impl Into<Cow<'static, str>>,
608 initial_value: f64,
609 ) -> AtomicObservableInstrumentBuilder<
610 '_,
611 AtomicObservableUpDownCounterF64,
612 ObservableUpDownCounter<f64>,
613 f64,
614 >;
615
616 fn i64_observable_gauge_atomic(
618 &self,
619 name: impl Into<Cow<'static, str>>,
620 initial_value: i64,
621 ) -> AtomicObservableInstrumentBuilder<
622 '_,
623 AtomicObservableGaugeI64,
624 ObservableGauge<i64>,
625 i64,
626 >;
627
628 fn i64_observable_up_down_counter_atomic(
630 &self,
631 name: impl Into<Cow<'static, str>>,
632 initial_value: i64,
633 ) -> AtomicObservableInstrumentBuilder<
634 '_,
635 AtomicObservableUpDownCounterI64,
636 ObservableUpDownCounter<i64>,
637 i64,
638 >;
639
640 fn u64_observable_counter_atomic(
642 &self,
643 name: impl Into<Cow<'static, str>>,
644 initial_value: u64,
645 ) -> AtomicObservableInstrumentBuilder<
646 '_,
647 AtomicObservableCounterU64,
648 ObservableCounter<u64>,
649 u64,
650 >;
651
652 fn u64_observable_gauge_atomic(
654 &self,
655 name: impl Into<Cow<'static, str>>,
656 initial_value: u64,
657 ) -> AtomicObservableInstrumentBuilder<
658 '_,
659 AtomicObservableGaugeU64,
660 ObservableGauge<u64>,
661 u64,
662 >;
663}
664
665impl MeterExt for Meter {
666 fn f64_observable_counter_atomic(
667 &self,
668 name: impl Into<Cow<'static, str>>,
669 initial_value: f64,
670 ) -> AtomicObservableInstrumentBuilder<
671 '_,
672 AtomicObservableCounterF64,
673 ObservableCounter<f64>,
674 f64,
675 > {
676 AtomicObservableCounterF64::new(self, name, initial_value)
677 }
678
679 fn f64_observable_gauge_atomic(
680 &self,
681 name: impl Into<Cow<'static, str>>,
682 initial_value: f64,
683 ) -> AtomicObservableInstrumentBuilder<
684 '_,
685 AtomicObservableGaugeF64,
686 ObservableGauge<f64>,
687 f64,
688 > {
689 AtomicObservableGaugeF64::new(self, name, initial_value)
690 }
691
692 fn f64_observable_up_down_counter_atomic(
693 &self,
694 name: impl Into<Cow<'static, str>>,
695 initial_value: f64,
696 ) -> AtomicObservableInstrumentBuilder<
697 '_,
698 AtomicObservableUpDownCounterF64,
699 ObservableUpDownCounter<f64>,
700 f64,
701 > {
702 AtomicObservableUpDownCounterF64::new(self, name, initial_value)
703 }
704
705 fn i64_observable_gauge_atomic(
706 &self,
707 name: impl Into<Cow<'static, str>>,
708 initial_value: i64,
709 ) -> AtomicObservableInstrumentBuilder<
710 '_,
711 AtomicObservableGaugeI64,
712 ObservableGauge<i64>,
713 i64,
714 > {
715 AtomicObservableGaugeI64::new(self, name, initial_value)
716 }
717
718 fn i64_observable_up_down_counter_atomic(
719 &self,
720 name: impl Into<Cow<'static, str>>,
721 initial_value: i64,
722 ) -> AtomicObservableInstrumentBuilder<
723 '_,
724 AtomicObservableUpDownCounterI64,
725 ObservableUpDownCounter<i64>,
726 i64,
727 > {
728 AtomicObservableUpDownCounterI64::new(self, name, initial_value)
729 }
730
731 fn u64_observable_counter_atomic(
732 &self,
733 name: impl Into<Cow<'static, str>>,
734 initial_value: u64,
735 ) -> AtomicObservableInstrumentBuilder<
736 '_,
737 AtomicObservableCounterU64,
738 ObservableCounter<u64>,
739 u64,
740 > {
741 AtomicObservableCounterU64::new(self, name, initial_value)
742 }
743
744 fn u64_observable_gauge_atomic(
745 &self,
746 name: impl Into<Cow<'static, str>>,
747 initial_value: u64,
748 ) -> AtomicObservableInstrumentBuilder<
749 '_,
750 AtomicObservableGaugeU64,
751 ObservableGauge<u64>,
752 u64,
753 > {
754 AtomicObservableGaugeU64::new(self, name, initial_value)
755 }
756}