influxive_otel/
lib.rs

1#![deny(missing_docs)]
2#![deny(warnings)]
3#![deny(unsafe_code)]
4//! Opentelemetry metrics bindings for influxive-child-svc.
5//!
6//! ## Example
7//!
8//! ```
9//! # #[tokio::main(flavor = "multi_thread")]
10//! # async fn main() {
11//! #     use std::sync::Arc;
12//! use influxive_writer::*;
13//!
14//! // create an influxive writer
15//! let writer = InfluxiveWriter::with_token_auth(
16//!     InfluxiveWriterConfig::default(),
17//!     "http://127.0.0.1:8086",
18//!     "my.bucket",
19//!     "my.token",
20//! );
21//!
22//! // register the meter provider
23//! opentelemetry_api::global::set_meter_provider(
24//!     influxive_otel::InfluxiveMeterProvider::new(
25//!         Default::default(),
26//!         Arc::new(writer),
27//!     )
28//! );
29//!
30//! // create a metric
31//! let m = opentelemetry_api::global::meter("my.meter")
32//!     .f64_histogram("my.metric")
33//!     .init();
34//!
35//! // make a recording
36//! m.record(3.14, &[]);
37//! # }
38//! ```
39
40use influxive_core::*;
41use opentelemetry_api::metrics::*;
42use std::collections::HashMap;
43use std::sync::Arc;
44use std::sync::Mutex;
45
/// A type-erased, thread-safe callback as handed to [`ErasedMap::push`].
type Erased = Box<dyn Fn() + 'static + Send + Sync>;

/// Internal shared form of a registered callback.
///
/// Callbacks are stored behind an `Arc` so that `invoke` can cheaply
/// snapshot them and run them *without* holding the map lock.
type SharedErased = Arc<dyn Fn() + 'static + Send + Sync>;

/// Registry of callbacks keyed by a process-unique id.
struct ErasedMap(Mutex<HashMap<u64, SharedErased>>);

impl ErasedMap {
    /// Create an empty, shareable registry.
    pub fn new() -> Arc<Self> {
        Arc::new(Self(Mutex::new(HashMap::new())))
    }

    /// Register `erased` and return the id that can later be passed
    /// to [`ErasedMap::remove`].
    ///
    /// Ids come from a single process-wide counter, so they are unique
    /// across every `ErasedMap` instance.
    pub fn push(&self, erased: Erased) -> u64 {
        static ID: std::sync::atomic::AtomicU64 =
            std::sync::atomic::AtomicU64::new(1);
        let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.0.lock().unwrap().insert(id, Arc::from(erased));
        id
    }

    /// Unregister the callback with the given id (no-op if unknown).
    ///
    /// This takes effect even when called while `invoke` is running
    /// (including from inside a callback): the callback will not be run
    /// by any later `invoke`, although an already-started `invoke` may
    /// still call it once more from its snapshot.
    pub fn remove(&self, id: u64) {
        self.0.lock().unwrap().remove(&id);
    }

    /// Run every currently registered callback once.
    ///
    /// The callbacks are snapshotted under the lock and executed after the
    /// lock has been released, so callbacks are free to call `push` or
    /// `remove` on this same map without deadlocking. Because the live map
    /// is never emptied, a concurrent `remove` is never lost and a
    /// panicking callback cannot discard the other registrations.
    pub fn invoke(&self) {
        // The guard is a temporary of this statement and is dropped
        // before any callback runs.
        let snapshot: Vec<SharedErased> =
            self.0.lock().unwrap().values().cloned().collect();
        for cb in snapshot {
            cb();
        }
    }
}
78
79struct InfluxiveUniMetric<
80    T: 'static + std::fmt::Display + Into<DataType> + Send + Sync,
81> {
82    this: std::sync::Weak<Self>,
83    influxive: Arc<dyn MetricWriter + 'static + Send + Sync>,
84    name: std::borrow::Cow<'static, str>,
85    unit: Option<opentelemetry_api::metrics::Unit>,
86    attributes: Option<Arc<[opentelemetry_api::KeyValue]>>,
87    _p: std::marker::PhantomData<T>,
88}
89
90impl<T: 'static + std::fmt::Display + Into<DataType> + Send + Sync>
91    InfluxiveUniMetric<T>
92{
93    pub fn new(
94        influxive: Arc<dyn MetricWriter + 'static + Send + Sync>,
95        name: std::borrow::Cow<'static, str>,
96        // description over and over takes up too much space in the
97        // influx database, just ignore it for this application.
98        _description: Option<std::borrow::Cow<'static, str>>,
99        unit: Option<opentelemetry_api::metrics::Unit>,
100        attributes: Option<Arc<[opentelemetry_api::KeyValue]>>,
101    ) -> Arc<Self> {
102        Arc::new_cyclic(|this| Self {
103            this: this.clone(),
104            influxive,
105            name,
106            unit,
107            attributes,
108            _p: std::marker::PhantomData,
109        })
110    }
111
112    fn report(&self, value: T, attributes: &[opentelemetry_api::KeyValue]) {
113        let name = if let Some(unit) = &self.unit {
114            format!("{}.{}", &self.name, unit.as_str())
115        } else {
116            self.name.to_string()
117        };
118
119        // otel metrics are largely a single measurement... so
120        // just applying them to the generic "value" name in influx.
121        let mut metric = Metric::new(std::time::SystemTime::now(), name)
122            .with_field("value", value);
123
124        // everything else is a tag? would these be better as fields?
125        // some kind of naming convention to pick between the two??
126        for kv in attributes {
127            metric = metric.with_tag(kv.key.to_string(), kv.value.to_string());
128        }
129
130        if let Some(attributes) = &self.attributes {
131            for kv in attributes.iter() {
132                metric =
133                    metric.with_tag(kv.key.to_string(), kv.value.to_string());
134            }
135        }
136
137        self.influxive.write_metric(metric);
138    }
139}
140
141impl<T: 'static + std::fmt::Display + Into<DataType> + Send + Sync>
142    opentelemetry_api::metrics::SyncCounter<T> for InfluxiveUniMetric<T>
143{
144    fn add(&self, value: T, attributes: &[opentelemetry_api::KeyValue]) {
145        self.report(value, attributes)
146    }
147}
148
149impl<T: 'static + std::fmt::Display + Into<DataType> + Send + Sync>
150    opentelemetry_api::metrics::SyncUpDownCounter<T> for InfluxiveUniMetric<T>
151{
152    fn add(&self, value: T, attributes: &[opentelemetry_api::KeyValue]) {
153        self.report(value, attributes)
154    }
155}
156
157impl<T: 'static + std::fmt::Display + Into<DataType> + Send + Sync>
158    opentelemetry_api::metrics::SyncHistogram<T> for InfluxiveUniMetric<T>
159{
160    fn record(&self, value: T, attributes: &[opentelemetry_api::KeyValue]) {
161        self.report(value, attributes)
162    }
163}
164
165impl<T: 'static + std::fmt::Display + Into<DataType> + Send + Sync>
166    opentelemetry_api::metrics::AsyncInstrument<T> for InfluxiveUniMetric<T>
167{
168    fn observe(
169        &self,
170        measurement: T,
171        attributes: &[opentelemetry_api::KeyValue],
172    ) {
173        self.report(measurement, attributes)
174    }
175
176    fn as_any(&self) -> Arc<dyn std::any::Any> {
177        // this unwrap *should* be safe... so long as no one calls
178        // Arc::into_inner() ever, which shouldn't be possible
179        // because we're using trait objects everywhere??
180        self.this.upgrade().unwrap()
181    }
182}
183
/// Instrument factory backing a single otel `Meter`.
///
/// Fields are positional:
/// - `.0`: the metric writer passed to every instrument created here,
/// - `.1`: optional meter-level attributes passed to every instrument,
/// - `.2`: the callback registry that observable instruments and
///   `register_callback` push their callbacks into.
struct InfluxiveInstrumentProvider(
    Arc<dyn MetricWriter + 'static + Send + Sync>,
    Option<Arc<[opentelemetry_api::KeyValue]>>,
    Arc<ErasedMap>,
);
189
// Shared body for the `*_observable_*` constructors.
//
// Arguments (all identifiers, trailing comma required):
//   $s - the provider (`self`)
//   $t - the observable wrapper type to construct via `$t::new`
//   $n - instrument name, $d - description, $u - unit
//   $c - the list of callbacks supplied for this instrument
//
// Evaluates to `Ok(<instrument>)`.
macro_rules! obs_body {
    ($s:ident, $t:ident, $n:ident, $d:ident, $u:ident, $c:ident,) => {{
        // Build the instrument around a fresh InfluxiveUniMetric that
        // shares the provider's writer (.0) and meter attributes (.1).
        let g = $t::new(InfluxiveUniMetric::new(
            $s.0.clone(),
            $n,
            $d,
            $u,
            $s.1.clone(),
        ));

        // Register one closure in the provider's callback registry (.2)
        // that runs every supplied callback against a clone of the
        // instrument. The id returned by `push` is discarded here, so
        // this macro keeps no handle for removing the closure later.
        let g2 = g.clone();
        $s.2.push(Box::new(move || {
            for cb in $c.iter() {
                cb(&g2);
            }
        }));

        Ok(g)
    }};
}
210
211impl opentelemetry_api::metrics::InstrumentProvider
212    for InfluxiveInstrumentProvider
213{
214    fn u64_counter(
215        &self,
216        name: std::borrow::Cow<'static, str>,
217        description: Option<std::borrow::Cow<'static, str>>,
218        unit: Option<opentelemetry_api::metrics::Unit>,
219    ) -> opentelemetry_api::metrics::Result<
220        opentelemetry_api::metrics::Counter<u64>,
221    > {
222        Ok(opentelemetry_api::metrics::Counter::new(
223            InfluxiveUniMetric::new(
224                self.0.clone(),
225                name,
226                description,
227                unit,
228                self.1.clone(),
229            ),
230        ))
231    }
232
233    fn f64_counter(
234        &self,
235        name: std::borrow::Cow<'static, str>,
236        description: Option<std::borrow::Cow<'static, str>>,
237        unit: Option<opentelemetry_api::metrics::Unit>,
238    ) -> opentelemetry_api::metrics::Result<
239        opentelemetry_api::metrics::Counter<f64>,
240    > {
241        Ok(opentelemetry_api::metrics::Counter::new(
242            InfluxiveUniMetric::new(
243                self.0.clone(),
244                name,
245                description,
246                unit,
247                self.1.clone(),
248            ),
249        ))
250    }
251
252    fn u64_observable_counter(
253        &self,
254        name: std::borrow::Cow<'static, str>,
255        description: Option<std::borrow::Cow<'static, str>>,
256        unit: Option<opentelemetry_api::metrics::Unit>,
257        callback_list: Vec<opentelemetry_api::metrics::Callback<u64>>,
258    ) -> opentelemetry_api::metrics::Result<
259        opentelemetry_api::metrics::ObservableCounter<u64>,
260    > {
261        obs_body!(
262            self,
263            ObservableCounter,
264            name,
265            description,
266            unit,
267            callback_list,
268        )
269    }
270
271    fn f64_observable_counter(
272        &self,
273        name: std::borrow::Cow<'static, str>,
274        description: Option<std::borrow::Cow<'static, str>>,
275        unit: Option<opentelemetry_api::metrics::Unit>,
276        callback_list: Vec<opentelemetry_api::metrics::Callback<f64>>,
277    ) -> opentelemetry_api::metrics::Result<
278        opentelemetry_api::metrics::ObservableCounter<f64>,
279    > {
280        obs_body!(
281            self,
282            ObservableCounter,
283            name,
284            description,
285            unit,
286            callback_list,
287        )
288    }
289
290    fn i64_up_down_counter(
291        &self,
292        name: std::borrow::Cow<'static, str>,
293        description: Option<std::borrow::Cow<'static, str>>,
294        unit: Option<opentelemetry_api::metrics::Unit>,
295    ) -> opentelemetry_api::metrics::Result<
296        opentelemetry_api::metrics::UpDownCounter<i64>,
297    > {
298        Ok(opentelemetry_api::metrics::UpDownCounter::new(
299            InfluxiveUniMetric::new(
300                self.0.clone(),
301                name,
302                description,
303                unit,
304                self.1.clone(),
305            ),
306        ))
307    }
308
309    fn f64_up_down_counter(
310        &self,
311        name: std::borrow::Cow<'static, str>,
312        description: Option<std::borrow::Cow<'static, str>>,
313        unit: Option<opentelemetry_api::metrics::Unit>,
314    ) -> opentelemetry_api::metrics::Result<
315        opentelemetry_api::metrics::UpDownCounter<f64>,
316    > {
317        Ok(opentelemetry_api::metrics::UpDownCounter::new(
318            InfluxiveUniMetric::new(
319                self.0.clone(),
320                name,
321                description,
322                unit,
323                self.1.clone(),
324            ),
325        ))
326    }
327
328    fn i64_observable_up_down_counter(
329        &self,
330        name: std::borrow::Cow<'static, str>,
331        description: Option<std::borrow::Cow<'static, str>>,
332        unit: Option<opentelemetry_api::metrics::Unit>,
333        callback_list: Vec<opentelemetry_api::metrics::Callback<i64>>,
334    ) -> opentelemetry_api::metrics::Result<
335        opentelemetry_api::metrics::ObservableUpDownCounter<i64>,
336    > {
337        obs_body!(
338            self,
339            ObservableUpDownCounter,
340            name,
341            description,
342            unit,
343            callback_list,
344        )
345    }
346
347    fn f64_observable_up_down_counter(
348        &self,
349        name: std::borrow::Cow<'static, str>,
350        description: Option<std::borrow::Cow<'static, str>>,
351        unit: Option<opentelemetry_api::metrics::Unit>,
352        callback_list: Vec<opentelemetry_api::metrics::Callback<f64>>,
353    ) -> opentelemetry_api::metrics::Result<
354        opentelemetry_api::metrics::ObservableUpDownCounter<f64>,
355    > {
356        obs_body!(
357            self,
358            ObservableUpDownCounter,
359            name,
360            description,
361            unit,
362            callback_list,
363        )
364    }
365
366    fn u64_observable_gauge(
367        &self,
368        name: std::borrow::Cow<'static, str>,
369        description: Option<std::borrow::Cow<'static, str>>,
370        unit: Option<opentelemetry_api::metrics::Unit>,
371        callback_list: Vec<opentelemetry_api::metrics::Callback<u64>>,
372    ) -> opentelemetry_api::metrics::Result<
373        opentelemetry_api::metrics::ObservableGauge<u64>,
374    > {
375        obs_body!(
376            self,
377            ObservableGauge,
378            name,
379            description,
380            unit,
381            callback_list,
382        )
383    }
384
385    fn i64_observable_gauge(
386        &self,
387        name: std::borrow::Cow<'static, str>,
388        description: Option<std::borrow::Cow<'static, str>>,
389        unit: Option<opentelemetry_api::metrics::Unit>,
390        callback_list: Vec<opentelemetry_api::metrics::Callback<i64>>,
391    ) -> opentelemetry_api::metrics::Result<
392        opentelemetry_api::metrics::ObservableGauge<i64>,
393    > {
394        obs_body!(
395            self,
396            ObservableGauge,
397            name,
398            description,
399            unit,
400            callback_list,
401        )
402    }
403
404    fn f64_observable_gauge(
405        &self,
406        name: std::borrow::Cow<'static, str>,
407        description: Option<std::borrow::Cow<'static, str>>,
408        unit: Option<opentelemetry_api::metrics::Unit>,
409        callback_list: Vec<opentelemetry_api::metrics::Callback<f64>>,
410    ) -> opentelemetry_api::metrics::Result<
411        opentelemetry_api::metrics::ObservableGauge<f64>,
412    > {
413        obs_body!(
414            self,
415            ObservableGauge,
416            name,
417            description,
418            unit,
419            callback_list,
420        )
421    }
422
423    fn f64_histogram(
424        &self,
425        name: std::borrow::Cow<'static, str>,
426        description: Option<std::borrow::Cow<'static, str>>,
427        unit: Option<opentelemetry_api::metrics::Unit>,
428    ) -> opentelemetry_api::metrics::Result<
429        opentelemetry_api::metrics::Histogram<f64>,
430    > {
431        Ok(opentelemetry_api::metrics::Histogram::new(
432            InfluxiveUniMetric::new(
433                self.0.clone(),
434                name,
435                description,
436                unit,
437                self.1.clone(),
438            ),
439        ))
440    }
441
442    fn u64_histogram(
443        &self,
444        name: std::borrow::Cow<'static, str>,
445        description: Option<std::borrow::Cow<'static, str>>,
446        unit: Option<opentelemetry_api::metrics::Unit>,
447    ) -> opentelemetry_api::metrics::Result<
448        opentelemetry_api::metrics::Histogram<u64>,
449    > {
450        Ok(opentelemetry_api::metrics::Histogram::new(
451            InfluxiveUniMetric::new(
452                self.0.clone(),
453                name,
454                description,
455                unit,
456                self.1.clone(),
457            ),
458        ))
459    }
460
461    fn i64_histogram(
462        &self,
463        name: std::borrow::Cow<'static, str>,
464        description: Option<std::borrow::Cow<'static, str>>,
465        unit: Option<opentelemetry_api::metrics::Unit>,
466    ) -> opentelemetry_api::metrics::Result<
467        opentelemetry_api::metrics::Histogram<i64>,
468    > {
469        Ok(opentelemetry_api::metrics::Histogram::new(
470            InfluxiveUniMetric::new(
471                self.0.clone(),
472                name,
473                description,
474                unit,
475                self.1.clone(),
476            ),
477        ))
478    }
479
480    fn register_callback(
481        &self,
482        _instruments: &[Arc<dyn std::any::Any>],
483        callback: Box<
484            dyn Fn(&dyn opentelemetry_api::metrics::Observer) + Send + Sync,
485        >,
486    ) -> opentelemetry_api::metrics::Result<
487        Box<dyn opentelemetry_api::metrics::CallbackRegistration>,
488    > {
489        struct O;
490        impl opentelemetry_api::metrics::Observer for O {
491            fn observe_f64(
492                &self,
493                inst: &dyn opentelemetry_api::metrics::AsyncInstrument<f64>,
494                measurement: f64,
495                attrs: &[opentelemetry_api::KeyValue],
496            ) {
497                inst.observe(measurement, attrs);
498            }
499
500            fn observe_u64(
501                &self,
502                inst: &dyn opentelemetry_api::metrics::AsyncInstrument<u64>,
503                measurement: u64,
504                attrs: &[opentelemetry_api::KeyValue],
505            ) {
506                inst.observe(measurement, attrs);
507            }
508
509            fn observe_i64(
510                &self,
511                inst: &dyn opentelemetry_api::metrics::AsyncInstrument<i64>,
512                measurement: i64,
513                attrs: &[opentelemetry_api::KeyValue],
514            ) {
515                inst.observe(measurement, attrs);
516            }
517        }
518
519        let id = self.2.push(Box::new(move || callback(&O)));
520
521        struct Unregister(u64, Arc<ErasedMap>);
522
523        impl opentelemetry_api::metrics::CallbackRegistration for Unregister {
524            fn unregister(&mut self) -> opentelemetry_api::metrics::Result<()> {
525                self.1.remove(self.0);
526                Ok(())
527            }
528        }
529
530        Ok(Box::new(Unregister(id, self.2.clone())))
531    }
532}
533
/// Influxive InfluxDB Meter Provider Configuration.
///
/// Construct with [`Default::default`] and adjust using the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct InfluxiveMeterProviderConfig {
    /// Reporting interval for observable metrics.
    /// Set to `None` to disable periodic reporting
    /// (you'll need to call [InfluxiveMeterProvider::report] manually).
    /// Defaults to 30 seconds.
    pub observable_report_interval: Option<std::time::Duration>,
}

impl Default for InfluxiveMeterProviderConfig {
    /// Periodic reporting enabled, once every 30 seconds.
    fn default() -> Self {
        Self {
            observable_report_interval: Some(std::time::Duration::from_secs(
                30,
            )),
        }
    }
}

impl InfluxiveMeterProviderConfig {
    /// Apply [InfluxiveMeterProviderConfig::observable_report_interval].
    ///
    /// Consumes the config and returns it with the new interval, so calls
    /// can be chained.
    pub fn with_observable_report_interval(
        mut self,
        observable_report_interval: Option<std::time::Duration>,
    ) -> Self {
        self.observable_report_interval = observable_report_interval;
        self
    }
}
564
/// Influxive InfluxDB Opentelemetry Meter Provider.
///
/// Cloning is cheap: both fields are `Arc`s, so clones share the same
/// metric writer and the same set of registered observable callbacks.
#[derive(Clone)]
pub struct InfluxiveMeterProvider(
    // .0: metric writer handed to every meter created by this provider
    Arc<dyn MetricWriter + 'static + Send + Sync>,
    // .1: callback registry run by `report` (and by the periodic task,
    // when one was started in `new`)
    Arc<ErasedMap>,
);
571
572impl InfluxiveMeterProvider {
573    /// Construct a new InfluxiveMeterProvider instance with a given
574    /// "Influxive" InfluxiveDB child process connector.
575    pub fn new(
576        config: InfluxiveMeterProviderConfig,
577        influxive: Arc<dyn MetricWriter + 'static + Send + Sync>,
578    ) -> Self {
579        let strong = ErasedMap::new();
580
581        if let Some(interval) = config.observable_report_interval {
582            let weak = Arc::downgrade(&strong);
583            tokio::task::spawn(async move {
584                let mut interval = tokio::time::interval(interval);
585                loop {
586                    interval.tick().await;
587                    if let Some(strong) = weak.upgrade() {
588                        strong.invoke();
589                    } else {
590                        break;
591                    }
592                }
593            });
594        }
595
596        Self(influxive, strong)
597    }
598
599    /// Manually report all observable metrics.
600    pub fn report(&self) {
601        self.1.invoke();
602    }
603}
604
605impl opentelemetry_api::metrics::MeterProvider for InfluxiveMeterProvider {
606    fn versioned_meter(
607        &self,
608        _name: impl Into<std::borrow::Cow<'static, str>>,
609        _version: Option<impl Into<std::borrow::Cow<'static, str>>>,
610        _schema_url: Option<impl Into<std::borrow::Cow<'static, str>>>,
611        attributes: Option<Vec<opentelemetry_api::KeyValue>>,
612    ) -> opentelemetry_api::metrics::Meter {
613        let attributes: Option<Arc<[opentelemetry_api::KeyValue]>> =
614            attributes.map(|a| a.into_boxed_slice().into());
615        opentelemetry_api::metrics::Meter::new(Arc::new(
616            InfluxiveInstrumentProvider(
617                self.0.clone(),
618                attributes,
619                self.1.clone(),
620            ),
621        ))
622    }
623}
624
625#[cfg(test)]
626mod test;