1#![deny(missing_docs)]
2#![deny(warnings)]
3#![deny(unsafe_code)]
4use influxive_core::*;
41use opentelemetry_api::metrics::*;
42use std::collections::HashMap;
43use std::sync::Arc;
44use std::sync::Mutex;
45
/// A type-erased callback as accepted by `ErasedMap::push`.
type Erased = Box<dyn Fn() + 'static + Send + Sync>;

/// Shareable form of an `Erased` callback, so `ErasedMap::invoke` can run
/// callbacks from a snapshot without holding the registry lock.
type SharedErased = Arc<dyn Fn() + 'static + Send + Sync>;

/// Registry of callbacks keyed by a process-unique id.
struct ErasedMap(Mutex<HashMap<u64, SharedErased>>);

impl ErasedMap {
    /// Create an empty registry behind an `Arc`.
    pub fn new() -> Arc<Self> {
        Arc::new(Self(Mutex::new(HashMap::new())))
    }

    /// Lock the registry.
    ///
    /// Callbacks are never run while this guard is held, so a poisoned lock
    /// can only come from a panic inside a plain map operation; the map is
    /// still usable in that case, so poisoning is ignored.
    fn entries(
        &self,
    ) -> std::sync::MutexGuard<'_, HashMap<u64, SharedErased>> {
        self.0
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Register `erased` and return the id that `remove` accepts.
    ///
    /// Ids come from a single process-wide counter starting at 1, so they
    /// are unique across every `ErasedMap` instance.
    pub fn push(&self, erased: Erased) -> u64 {
        static ID: std::sync::atomic::AtomicU64 =
            std::sync::atomic::AtomicU64::new(1);
        let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.entries().insert(id, Arc::from(erased));
        id
    }

    /// Unregister the callback stored under `id`; unknown ids are ignored.
    ///
    /// The removal takes effect immediately, even when called from inside a
    /// callback or while another thread is in `invoke`. An `invoke` that has
    /// already taken its snapshot may still run the callback one last time.
    pub fn remove(&self, id: u64) {
        // Bind first so the lock guard is released before the callback (and
        // whatever it captured) is dropped.
        let removed = self.entries().remove(&id);
        drop(removed);
    }

    /// Run every currently registered callback once, in unspecified order.
    ///
    /// The lock is held only long enough to snapshot the callbacks, so a
    /// callback may call `push` or `remove` on this registry without
    /// deadlocking, and a panicking callback cannot lose the other entries.
    pub fn invoke(&self) {
        let snapshot: Vec<SharedErased> =
            self.entries().values().cloned().collect();
        for cb in snapshot {
            cb();
        }
    }
}
78
79struct InfluxiveUniMetric<
80 T: 'static + std::fmt::Display + Into<DataType> + Send + Sync,
81> {
82 this: std::sync::Weak<Self>,
83 influxive: Arc<dyn MetricWriter + 'static + Send + Sync>,
84 name: std::borrow::Cow<'static, str>,
85 unit: Option<opentelemetry_api::metrics::Unit>,
86 attributes: Option<Arc<[opentelemetry_api::KeyValue]>>,
87 _p: std::marker::PhantomData<T>,
88}
89
90impl<T: 'static + std::fmt::Display + Into<DataType> + Send + Sync>
91 InfluxiveUniMetric<T>
92{
93 pub fn new(
94 influxive: Arc<dyn MetricWriter + 'static + Send + Sync>,
95 name: std::borrow::Cow<'static, str>,
96 _description: Option<std::borrow::Cow<'static, str>>,
99 unit: Option<opentelemetry_api::metrics::Unit>,
100 attributes: Option<Arc<[opentelemetry_api::KeyValue]>>,
101 ) -> Arc<Self> {
102 Arc::new_cyclic(|this| Self {
103 this: this.clone(),
104 influxive,
105 name,
106 unit,
107 attributes,
108 _p: std::marker::PhantomData,
109 })
110 }
111
112 fn report(&self, value: T, attributes: &[opentelemetry_api::KeyValue]) {
113 let name = if let Some(unit) = &self.unit {
114 format!("{}.{}", &self.name, unit.as_str())
115 } else {
116 self.name.to_string()
117 };
118
119 let mut metric = Metric::new(std::time::SystemTime::now(), name)
122 .with_field("value", value);
123
124 for kv in attributes {
127 metric = metric.with_tag(kv.key.to_string(), kv.value.to_string());
128 }
129
130 if let Some(attributes) = &self.attributes {
131 for kv in attributes.iter() {
132 metric =
133 metric.with_tag(kv.key.to_string(), kv.value.to_string());
134 }
135 }
136
137 self.influxive.write_metric(metric);
138 }
139}
140
141impl<T: 'static + std::fmt::Display + Into<DataType> + Send + Sync>
142 opentelemetry_api::metrics::SyncCounter<T> for InfluxiveUniMetric<T>
143{
144 fn add(&self, value: T, attributes: &[opentelemetry_api::KeyValue]) {
145 self.report(value, attributes)
146 }
147}
148
149impl<T: 'static + std::fmt::Display + Into<DataType> + Send + Sync>
150 opentelemetry_api::metrics::SyncUpDownCounter<T> for InfluxiveUniMetric<T>
151{
152 fn add(&self, value: T, attributes: &[opentelemetry_api::KeyValue]) {
153 self.report(value, attributes)
154 }
155}
156
157impl<T: 'static + std::fmt::Display + Into<DataType> + Send + Sync>
158 opentelemetry_api::metrics::SyncHistogram<T> for InfluxiveUniMetric<T>
159{
160 fn record(&self, value: T, attributes: &[opentelemetry_api::KeyValue]) {
161 self.report(value, attributes)
162 }
163}
164
165impl<T: 'static + std::fmt::Display + Into<DataType> + Send + Sync>
166 opentelemetry_api::metrics::AsyncInstrument<T> for InfluxiveUniMetric<T>
167{
168 fn observe(
169 &self,
170 measurement: T,
171 attributes: &[opentelemetry_api::KeyValue],
172 ) {
173 self.report(measurement, attributes)
174 }
175
176 fn as_any(&self) -> Arc<dyn std::any::Any> {
177 self.this.upgrade().unwrap()
181 }
182}
183
184struct InfluxiveInstrumentProvider(
185 Arc<dyn MetricWriter + 'static + Send + Sync>,
186 Option<Arc<[opentelemetry_api::KeyValue]>>,
187 Arc<ErasedMap>,
188);
189
/// Shared body of the `*_observable_*` constructors.
///
/// Arguments: the provider (`self`), the observable wrapper type to build,
/// then the name, description, unit and callback-list bindings. Builds the
/// instrument, registers one closure in the provider's `ErasedMap` that runs
/// every callback in the list against a clone of it, and evaluates to
/// `Ok(instrument)`.
macro_rules! obs_body {
    ($s:ident, $t:ident, $n:ident, $d:ident, $u:ident, $c:ident,) => {{
        let instrument = $t::new(InfluxiveUniMetric::new(
            $s.0.clone(),
            $n,
            $d,
            $u,
            $s.1.clone(),
        ));

        // The closure owns its own handle to the instrument; the id returned
        // by `push` is intentionally not kept.
        let observed = instrument.clone();
        let run_callbacks: Erased = Box::new(move || {
            for callback in $c.iter() {
                callback(&observed);
            }
        });
        $s.2.push(run_callbacks);

        Ok(instrument)
    }};
}
210
211impl opentelemetry_api::metrics::InstrumentProvider
212 for InfluxiveInstrumentProvider
213{
214 fn u64_counter(
215 &self,
216 name: std::borrow::Cow<'static, str>,
217 description: Option<std::borrow::Cow<'static, str>>,
218 unit: Option<opentelemetry_api::metrics::Unit>,
219 ) -> opentelemetry_api::metrics::Result<
220 opentelemetry_api::metrics::Counter<u64>,
221 > {
222 Ok(opentelemetry_api::metrics::Counter::new(
223 InfluxiveUniMetric::new(
224 self.0.clone(),
225 name,
226 description,
227 unit,
228 self.1.clone(),
229 ),
230 ))
231 }
232
233 fn f64_counter(
234 &self,
235 name: std::borrow::Cow<'static, str>,
236 description: Option<std::borrow::Cow<'static, str>>,
237 unit: Option<opentelemetry_api::metrics::Unit>,
238 ) -> opentelemetry_api::metrics::Result<
239 opentelemetry_api::metrics::Counter<f64>,
240 > {
241 Ok(opentelemetry_api::metrics::Counter::new(
242 InfluxiveUniMetric::new(
243 self.0.clone(),
244 name,
245 description,
246 unit,
247 self.1.clone(),
248 ),
249 ))
250 }
251
252 fn u64_observable_counter(
253 &self,
254 name: std::borrow::Cow<'static, str>,
255 description: Option<std::borrow::Cow<'static, str>>,
256 unit: Option<opentelemetry_api::metrics::Unit>,
257 callback_list: Vec<opentelemetry_api::metrics::Callback<u64>>,
258 ) -> opentelemetry_api::metrics::Result<
259 opentelemetry_api::metrics::ObservableCounter<u64>,
260 > {
261 obs_body!(
262 self,
263 ObservableCounter,
264 name,
265 description,
266 unit,
267 callback_list,
268 )
269 }
270
271 fn f64_observable_counter(
272 &self,
273 name: std::borrow::Cow<'static, str>,
274 description: Option<std::borrow::Cow<'static, str>>,
275 unit: Option<opentelemetry_api::metrics::Unit>,
276 callback_list: Vec<opentelemetry_api::metrics::Callback<f64>>,
277 ) -> opentelemetry_api::metrics::Result<
278 opentelemetry_api::metrics::ObservableCounter<f64>,
279 > {
280 obs_body!(
281 self,
282 ObservableCounter,
283 name,
284 description,
285 unit,
286 callback_list,
287 )
288 }
289
290 fn i64_up_down_counter(
291 &self,
292 name: std::borrow::Cow<'static, str>,
293 description: Option<std::borrow::Cow<'static, str>>,
294 unit: Option<opentelemetry_api::metrics::Unit>,
295 ) -> opentelemetry_api::metrics::Result<
296 opentelemetry_api::metrics::UpDownCounter<i64>,
297 > {
298 Ok(opentelemetry_api::metrics::UpDownCounter::new(
299 InfluxiveUniMetric::new(
300 self.0.clone(),
301 name,
302 description,
303 unit,
304 self.1.clone(),
305 ),
306 ))
307 }
308
309 fn f64_up_down_counter(
310 &self,
311 name: std::borrow::Cow<'static, str>,
312 description: Option<std::borrow::Cow<'static, str>>,
313 unit: Option<opentelemetry_api::metrics::Unit>,
314 ) -> opentelemetry_api::metrics::Result<
315 opentelemetry_api::metrics::UpDownCounter<f64>,
316 > {
317 Ok(opentelemetry_api::metrics::UpDownCounter::new(
318 InfluxiveUniMetric::new(
319 self.0.clone(),
320 name,
321 description,
322 unit,
323 self.1.clone(),
324 ),
325 ))
326 }
327
328 fn i64_observable_up_down_counter(
329 &self,
330 name: std::borrow::Cow<'static, str>,
331 description: Option<std::borrow::Cow<'static, str>>,
332 unit: Option<opentelemetry_api::metrics::Unit>,
333 callback_list: Vec<opentelemetry_api::metrics::Callback<i64>>,
334 ) -> opentelemetry_api::metrics::Result<
335 opentelemetry_api::metrics::ObservableUpDownCounter<i64>,
336 > {
337 obs_body!(
338 self,
339 ObservableUpDownCounter,
340 name,
341 description,
342 unit,
343 callback_list,
344 )
345 }
346
347 fn f64_observable_up_down_counter(
348 &self,
349 name: std::borrow::Cow<'static, str>,
350 description: Option<std::borrow::Cow<'static, str>>,
351 unit: Option<opentelemetry_api::metrics::Unit>,
352 callback_list: Vec<opentelemetry_api::metrics::Callback<f64>>,
353 ) -> opentelemetry_api::metrics::Result<
354 opentelemetry_api::metrics::ObservableUpDownCounter<f64>,
355 > {
356 obs_body!(
357 self,
358 ObservableUpDownCounter,
359 name,
360 description,
361 unit,
362 callback_list,
363 )
364 }
365
366 fn u64_observable_gauge(
367 &self,
368 name: std::borrow::Cow<'static, str>,
369 description: Option<std::borrow::Cow<'static, str>>,
370 unit: Option<opentelemetry_api::metrics::Unit>,
371 callback_list: Vec<opentelemetry_api::metrics::Callback<u64>>,
372 ) -> opentelemetry_api::metrics::Result<
373 opentelemetry_api::metrics::ObservableGauge<u64>,
374 > {
375 obs_body!(
376 self,
377 ObservableGauge,
378 name,
379 description,
380 unit,
381 callback_list,
382 )
383 }
384
385 fn i64_observable_gauge(
386 &self,
387 name: std::borrow::Cow<'static, str>,
388 description: Option<std::borrow::Cow<'static, str>>,
389 unit: Option<opentelemetry_api::metrics::Unit>,
390 callback_list: Vec<opentelemetry_api::metrics::Callback<i64>>,
391 ) -> opentelemetry_api::metrics::Result<
392 opentelemetry_api::metrics::ObservableGauge<i64>,
393 > {
394 obs_body!(
395 self,
396 ObservableGauge,
397 name,
398 description,
399 unit,
400 callback_list,
401 )
402 }
403
404 fn f64_observable_gauge(
405 &self,
406 name: std::borrow::Cow<'static, str>,
407 description: Option<std::borrow::Cow<'static, str>>,
408 unit: Option<opentelemetry_api::metrics::Unit>,
409 callback_list: Vec<opentelemetry_api::metrics::Callback<f64>>,
410 ) -> opentelemetry_api::metrics::Result<
411 opentelemetry_api::metrics::ObservableGauge<f64>,
412 > {
413 obs_body!(
414 self,
415 ObservableGauge,
416 name,
417 description,
418 unit,
419 callback_list,
420 )
421 }
422
423 fn f64_histogram(
424 &self,
425 name: std::borrow::Cow<'static, str>,
426 description: Option<std::borrow::Cow<'static, str>>,
427 unit: Option<opentelemetry_api::metrics::Unit>,
428 ) -> opentelemetry_api::metrics::Result<
429 opentelemetry_api::metrics::Histogram<f64>,
430 > {
431 Ok(opentelemetry_api::metrics::Histogram::new(
432 InfluxiveUniMetric::new(
433 self.0.clone(),
434 name,
435 description,
436 unit,
437 self.1.clone(),
438 ),
439 ))
440 }
441
442 fn u64_histogram(
443 &self,
444 name: std::borrow::Cow<'static, str>,
445 description: Option<std::borrow::Cow<'static, str>>,
446 unit: Option<opentelemetry_api::metrics::Unit>,
447 ) -> opentelemetry_api::metrics::Result<
448 opentelemetry_api::metrics::Histogram<u64>,
449 > {
450 Ok(opentelemetry_api::metrics::Histogram::new(
451 InfluxiveUniMetric::new(
452 self.0.clone(),
453 name,
454 description,
455 unit,
456 self.1.clone(),
457 ),
458 ))
459 }
460
461 fn i64_histogram(
462 &self,
463 name: std::borrow::Cow<'static, str>,
464 description: Option<std::borrow::Cow<'static, str>>,
465 unit: Option<opentelemetry_api::metrics::Unit>,
466 ) -> opentelemetry_api::metrics::Result<
467 opentelemetry_api::metrics::Histogram<i64>,
468 > {
469 Ok(opentelemetry_api::metrics::Histogram::new(
470 InfluxiveUniMetric::new(
471 self.0.clone(),
472 name,
473 description,
474 unit,
475 self.1.clone(),
476 ),
477 ))
478 }
479
480 fn register_callback(
481 &self,
482 _instruments: &[Arc<dyn std::any::Any>],
483 callback: Box<
484 dyn Fn(&dyn opentelemetry_api::metrics::Observer) + Send + Sync,
485 >,
486 ) -> opentelemetry_api::metrics::Result<
487 Box<dyn opentelemetry_api::metrics::CallbackRegistration>,
488 > {
489 struct O;
490 impl opentelemetry_api::metrics::Observer for O {
491 fn observe_f64(
492 &self,
493 inst: &dyn opentelemetry_api::metrics::AsyncInstrument<f64>,
494 measurement: f64,
495 attrs: &[opentelemetry_api::KeyValue],
496 ) {
497 inst.observe(measurement, attrs);
498 }
499
500 fn observe_u64(
501 &self,
502 inst: &dyn opentelemetry_api::metrics::AsyncInstrument<u64>,
503 measurement: u64,
504 attrs: &[opentelemetry_api::KeyValue],
505 ) {
506 inst.observe(measurement, attrs);
507 }
508
509 fn observe_i64(
510 &self,
511 inst: &dyn opentelemetry_api::metrics::AsyncInstrument<i64>,
512 measurement: i64,
513 attrs: &[opentelemetry_api::KeyValue],
514 ) {
515 inst.observe(measurement, attrs);
516 }
517 }
518
519 let id = self.2.push(Box::new(move || callback(&O)));
520
521 struct Unregister(u64, Arc<ErasedMap>);
522
523 impl opentelemetry_api::metrics::CallbackRegistration for Unregister {
524 fn unregister(&mut self) -> opentelemetry_api::metrics::Result<()> {
525 self.1.remove(self.0);
526 Ok(())
527 }
528 }
529
530 Ok(Box::new(Unregister(id, self.2.clone())))
531 }
532}
533
/// Configuration for `InfluxiveMeterProvider::new`.
///
/// Construct it with `Default::default()` and adjust it through the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct InfluxiveMeterProviderConfig {
    /// Interval at which `InfluxiveMeterProvider::new` schedules a run of the
    /// registered observable callbacks.
    ///
    /// With `None` no periodic task is spawned and the callbacks only run
    /// when `InfluxiveMeterProvider::report` is called. Defaults to 30
    /// seconds.
    pub observable_report_interval: Option<std::time::Duration>,
}

impl Default for InfluxiveMeterProviderConfig {
    /// Periodic observable reporting every 30 seconds.
    fn default() -> Self {
        Self {
            observable_report_interval: Some(std::time::Duration::from_secs(
                30,
            )),
        }
    }
}

impl InfluxiveMeterProviderConfig {
    /// Replace `observable_report_interval` and return the updated config
    /// (builder style).
    pub fn with_observable_report_interval(
        mut self,
        observable_report_interval: Option<std::time::Duration>,
    ) -> Self {
        self.observable_report_interval = observable_report_interval;
        self
    }
}
564
565#[derive(Clone)]
567pub struct InfluxiveMeterProvider(
568 Arc<dyn MetricWriter + 'static + Send + Sync>,
569 Arc<ErasedMap>,
570);
571
572impl InfluxiveMeterProvider {
573 pub fn new(
576 config: InfluxiveMeterProviderConfig,
577 influxive: Arc<dyn MetricWriter + 'static + Send + Sync>,
578 ) -> Self {
579 let strong = ErasedMap::new();
580
581 if let Some(interval) = config.observable_report_interval {
582 let weak = Arc::downgrade(&strong);
583 tokio::task::spawn(async move {
584 let mut interval = tokio::time::interval(interval);
585 loop {
586 interval.tick().await;
587 if let Some(strong) = weak.upgrade() {
588 strong.invoke();
589 } else {
590 break;
591 }
592 }
593 });
594 }
595
596 Self(influxive, strong)
597 }
598
599 pub fn report(&self) {
601 self.1.invoke();
602 }
603}
604
605impl opentelemetry_api::metrics::MeterProvider for InfluxiveMeterProvider {
606 fn versioned_meter(
607 &self,
608 _name: impl Into<std::borrow::Cow<'static, str>>,
609 _version: Option<impl Into<std::borrow::Cow<'static, str>>>,
610 _schema_url: Option<impl Into<std::borrow::Cow<'static, str>>>,
611 attributes: Option<Vec<opentelemetry_api::KeyValue>>,
612 ) -> opentelemetry_api::metrics::Meter {
613 let attributes: Option<Arc<[opentelemetry_api::KeyValue]>> =
614 attributes.map(|a| a.into_boxed_slice().into());
615 opentelemetry_api::metrics::Meter::new(Arc::new(
616 InfluxiveInstrumentProvider(
617 self.0.clone(),
618 attributes,
619 self.1.clone(),
620 ),
621 ))
622 }
623}
624
625#[cfg(test)]
626mod test;