1use std::{collections::HashMap, fmt, sync::RwLock};
2use tracing::{field::Visit, Subscriber};
3use tracing_core::{Field, Interest, Metadata};
4
5#[cfg(feature = "metrics_gauge_unstable")]
6use opentelemetry::metrics::Gauge;
7use opentelemetry::{
8 metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter},
9 InstrumentationScope, KeyValue, Value,
10};
11use tracing_subscriber::{
12 filter::Filtered,
13 layer::{Context, Filter},
14 registry::LookupSpan,
15 Layer,
16};
17
18use smallvec::SmallVec;
19
20const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
21const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry";
22
// Field-name prefixes that mark an event field as a metric. The part of the
// field name after the prefix is used as the metric name.

/// Prefix for fields recorded on a monotonic `Counter` instrument.
const METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter.";
/// Prefix for fields recorded on an `UpDownCounter` instrument.
const METRIC_PREFIX_COUNTER: &str = "counter.";
/// Prefix for fields recorded on a `Histogram` instrument.
const METRIC_PREFIX_HISTOGRAM: &str = "histogram.";
/// Prefix for fields recorded on a `Gauge` instrument.
#[cfg(feature = "metrics_gauge_unstable")]
const METRIC_PREFIX_GAUGE: &str = "gauge.";

/// `i64::MAX` expressed as a `u64`: the largest unsigned value that can be
/// converted to `i64` without changing its meaning.
const I64_MAX: u64 = i64::MAX.unsigned_abs();
30
31#[derive(Default)]
32pub(crate) struct Instruments {
33 u64_counter: MetricsMap<Counter<u64>>,
34 f64_counter: MetricsMap<Counter<f64>>,
35 i64_up_down_counter: MetricsMap<UpDownCounter<i64>>,
36 f64_up_down_counter: MetricsMap<UpDownCounter<f64>>,
37 u64_histogram: MetricsMap<Histogram<u64>>,
38 f64_histogram: MetricsMap<Histogram<f64>>,
39 #[cfg(feature = "metrics_gauge_unstable")]
40 u64_gauge: MetricsMap<Gauge<u64>>,
41 #[cfg(feature = "metrics_gauge_unstable")]
42 i64_gauge: MetricsMap<Gauge<i64>>,
43 #[cfg(feature = "metrics_gauge_unstable")]
44 f64_gauge: MetricsMap<Gauge<f64>>,
45}
46
47type MetricsMap<T> = RwLock<HashMap<&'static str, T>>;
48
/// A single metric measurement: the variant selects the instrument kind and
/// value type, the payload is the value to apply to that instrument.
#[derive(Copy, Clone, Debug)]
pub(crate) enum InstrumentType {
    /// Amount to add to a `u64` monotonic counter.
    CounterU64(u64),
    /// Amount to add to an `f64` monotonic counter.
    CounterF64(f64),
    /// Amount to add to an `i64` up/down counter.
    UpDownCounterI64(i64),
    /// Amount to add to an `f64` up/down counter.
    UpDownCounterF64(f64),
    /// Value to record on a `u64` histogram.
    HistogramU64(u64),
    /// Value to record on an `f64` histogram.
    HistogramF64(f64),
    /// Value to record on a `u64` gauge.
    #[cfg(feature = "metrics_gauge_unstable")]
    GaugeU64(u64),
    /// Value to record on an `i64` gauge.
    #[cfg(feature = "metrics_gauge_unstable")]
    GaugeI64(i64),
    /// Value to record on an `f64` gauge.
    #[cfg(feature = "metrics_gauge_unstable")]
    GaugeF64(f64),
}
64
65impl Instruments {
66 pub(crate) fn update_metric(
67 &self,
68 meter: &Meter,
69 instrument_type: InstrumentType,
70 metric_name: &'static str,
71 attributes: &[KeyValue],
72 ) {
73 fn update_or_insert<T>(
74 map: &MetricsMap<T>,
75 name: &'static str,
76 insert: impl FnOnce() -> T,
77 update: impl FnOnce(&T),
78 ) {
79 {
80 let lock = map.read().unwrap();
81 if let Some(metric) = lock.get(name) {
82 update(metric);
83 return;
84 }
85 }
86
87 let mut lock = map.write().unwrap();
90 let metric = lock.entry(name).or_insert_with(insert);
93 update(metric)
94 }
95
96 match instrument_type {
97 InstrumentType::CounterU64(value) => {
98 update_or_insert(
99 &self.u64_counter,
100 metric_name,
101 || meter.u64_counter(metric_name).build(),
102 |ctr| ctr.add(value, attributes),
103 );
104 }
105 InstrumentType::CounterF64(value) => {
106 update_or_insert(
107 &self.f64_counter,
108 metric_name,
109 || meter.f64_counter(metric_name).build(),
110 |ctr| ctr.add(value, attributes),
111 );
112 }
113 InstrumentType::UpDownCounterI64(value) => {
114 update_or_insert(
115 &self.i64_up_down_counter,
116 metric_name,
117 || meter.i64_up_down_counter(metric_name).build(),
118 |ctr| ctr.add(value, attributes),
119 );
120 }
121 InstrumentType::UpDownCounterF64(value) => {
122 update_or_insert(
123 &self.f64_up_down_counter,
124 metric_name,
125 || meter.f64_up_down_counter(metric_name).build(),
126 |ctr| ctr.add(value, attributes),
127 );
128 }
129 InstrumentType::HistogramU64(value) => {
130 update_or_insert(
131 &self.u64_histogram,
132 metric_name,
133 || meter.u64_histogram(metric_name).build(),
134 |rec| rec.record(value, attributes),
135 );
136 }
137 InstrumentType::HistogramF64(value) => {
138 update_or_insert(
139 &self.f64_histogram,
140 metric_name,
141 || meter.f64_histogram(metric_name).build(),
142 |rec| rec.record(value, attributes),
143 );
144 }
145 #[cfg(feature = "metrics_gauge_unstable")]
146 InstrumentType::GaugeU64(value) => {
147 update_or_insert(
148 &self.u64_gauge,
149 metric_name,
150 || meter.u64_gauge(metric_name).build(),
151 |rec| rec.record(value, attributes),
152 );
153 }
154 #[cfg(feature = "metrics_gauge_unstable")]
155 InstrumentType::GaugeI64(value) => {
156 update_or_insert(
157 &self.i64_gauge,
158 metric_name,
159 || meter.i64_gauge(metric_name).build(),
160 |rec| rec.record(value, attributes),
161 );
162 }
163 #[cfg(feature = "metrics_gauge_unstable")]
164 InstrumentType::GaugeF64(value) => {
165 update_or_insert(
166 &self.f64_gauge,
167 metric_name,
168 || meter.f64_gauge(metric_name).build(),
169 |rec| rec.record(value, attributes),
170 );
171 }
172 };
173 }
174}
175
176pub(crate) struct MetricVisitor<'a> {
177 attributes: &'a mut SmallVec<[KeyValue; 8]>,
178 visited_metrics: &'a mut SmallVec<[(&'static str, InstrumentType); 2]>,
179}
180
181impl Visit for MetricVisitor<'_> {
182 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
183 self.attributes
184 .push(KeyValue::new(field.name(), format!("{value:?}")));
185 }
186
187 fn record_u64(&mut self, field: &Field, value: u64) {
188 #[cfg(feature = "metrics_gauge_unstable")]
189 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
190 self.visited_metrics
191 .push((metric_name, InstrumentType::GaugeU64(value)));
192 return;
193 }
194 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
195 self.visited_metrics
196 .push((metric_name, InstrumentType::CounterU64(value)));
197 } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
198 if value <= I64_MAX {
199 self.visited_metrics
200 .push((metric_name, InstrumentType::UpDownCounterI64(value as i64)));
201 } else {
202 eprintln!(
203 "[tracing-opentelemetry]: Received Counter metric, but \
204 provided u64: {} is greater than i64::MAX. Ignoring \
205 this metric.",
206 value
207 );
208 }
209 } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
210 self.visited_metrics
211 .push((metric_name, InstrumentType::HistogramU64(value)));
212 } else if value <= I64_MAX {
213 self.attributes
214 .push(KeyValue::new(field.name(), Value::I64(value as i64)));
215 }
216 }
217
218 fn record_f64(&mut self, field: &Field, value: f64) {
219 #[cfg(feature = "metrics_gauge_unstable")]
220 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
221 self.visited_metrics
222 .push((metric_name, InstrumentType::GaugeF64(value)));
223 return;
224 }
225 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
226 self.visited_metrics
227 .push((metric_name, InstrumentType::CounterF64(value)));
228 } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
229 self.visited_metrics
230 .push((metric_name, InstrumentType::UpDownCounterF64(value)));
231 } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
232 self.visited_metrics
233 .push((metric_name, InstrumentType::HistogramF64(value)));
234 } else {
235 self.attributes
236 .push(KeyValue::new(field.name(), Value::F64(value)));
237 }
238 }
239
240 fn record_i64(&mut self, field: &Field, value: i64) {
241 #[cfg(feature = "metrics_gauge_unstable")]
242 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
243 self.visited_metrics
244 .push((metric_name, InstrumentType::GaugeI64(value)));
245 return;
246 }
247 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
248 self.visited_metrics
249 .push((metric_name, InstrumentType::CounterU64(value as u64)));
250 } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
251 self.visited_metrics
252 .push((metric_name, InstrumentType::UpDownCounterI64(value)));
253 } else {
254 self.attributes.push(KeyValue::new(field.name(), value));
255 }
256 }
257
258 fn record_str(&mut self, field: &Field, value: &str) {
259 self.attributes
260 .push(KeyValue::new(field.name(), value.to_owned()));
261 }
262
263 fn record_bool(&mut self, field: &Field, value: bool) {
264 self.attributes.push(KeyValue::new(field.name(), value));
265 }
266}
267
268#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
385pub struct MetricsLayer<S> {
386 inner: Filtered<InstrumentLayer, MetricsFilter, S>,
387}
388
389impl<S> MetricsLayer<S>
390where
391 S: Subscriber + for<'span> LookupSpan<'span>,
392{
393 pub fn new<M>(meter_provider: M) -> MetricsLayer<S>
395 where
396 M: MeterProvider,
397 {
398 let meter = meter_provider.meter_with_scope(
399 InstrumentationScope::builder(INSTRUMENTATION_LIBRARY_NAME)
400 .with_version(CARGO_PKG_VERSION)
401 .build(),
402 );
403
404 let layer = InstrumentLayer {
405 meter,
406 instruments: Default::default(),
407 };
408
409 MetricsLayer {
410 inner: layer.with_filter(MetricsFilter),
411 }
412 }
413}
414
415struct MetricsFilter;
416
417impl MetricsFilter {
418 fn is_metrics_event(&self, meta: &Metadata<'_>) -> bool {
419 meta.is_event()
420 && meta.fields().iter().any(|field| {
421 let name = field.name();
422
423 if name.starts_with(METRIC_PREFIX_COUNTER)
424 || name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER)
425 || name.starts_with(METRIC_PREFIX_HISTOGRAM)
426 {
427 return true;
428 }
429
430 #[cfg(feature = "metrics_gauge_unstable")]
431 if name.starts_with(METRIC_PREFIX_GAUGE) {
432 return true;
433 }
434
435 false
436 })
437 }
438}
439
440impl<S> Filter<S> for MetricsFilter {
441 fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool {
442 self.is_metrics_event(meta)
443 }
444
445 fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest {
446 if self.is_metrics_event(meta) {
447 Interest::always()
448 } else {
449 Interest::never()
450 }
451 }
452}
453
454struct InstrumentLayer {
455 meter: Meter,
456 instruments: Instruments,
457}
458
459impl<S> Layer<S> for InstrumentLayer
460where
461 S: Subscriber + for<'span> LookupSpan<'span>,
462{
463 fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
464 let mut attributes = SmallVec::new();
465 let mut visited_metrics = SmallVec::new();
466 let mut metric_visitor = MetricVisitor {
467 attributes: &mut attributes,
468 visited_metrics: &mut visited_metrics,
469 };
470 event.record(&mut metric_visitor);
471
472 visited_metrics
474 .into_iter()
475 .for_each(|(metric_name, value)| {
476 self.instruments.update_metric(
477 &self.meter,
478 value,
479 metric_name,
480 attributes.as_slice(),
481 );
482 })
483 }
484}
485
486impl<S> Layer<S> for MetricsLayer<S>
487where
488 S: Subscriber + for<'span> LookupSpan<'span>,
489{
490 fn on_layer(&mut self, subscriber: &mut S) {
491 self.inner.on_layer(subscriber)
492 }
493
494 fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
495 self.inner.register_callsite(metadata)
496 }
497
498 fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
499 self.inner.enabled(metadata, ctx)
500 }
501
502 fn on_new_span(
503 &self,
504 attrs: &tracing_core::span::Attributes<'_>,
505 id: &tracing_core::span::Id,
506 ctx: Context<'_, S>,
507 ) {
508 self.inner.on_new_span(attrs, id, ctx)
509 }
510
511 fn max_level_hint(&self) -> Option<tracing_core::LevelFilter> {
512 self.inner.max_level_hint()
513 }
514
515 fn on_record(
516 &self,
517 span: &tracing_core::span::Id,
518 values: &tracing_core::span::Record<'_>,
519 ctx: Context<'_, S>,
520 ) {
521 self.inner.on_record(span, values, ctx)
522 }
523
524 fn on_follows_from(
525 &self,
526 span: &tracing_core::span::Id,
527 follows: &tracing_core::span::Id,
528 ctx: Context<'_, S>,
529 ) {
530 self.inner.on_follows_from(span, follows, ctx)
531 }
532
533 fn on_event(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) {
534 self.inner.on_event(event, ctx)
535 }
536
537 fn on_enter(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) {
538 self.inner.on_enter(id, ctx)
539 }
540
541 fn on_exit(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) {
542 self.inner.on_exit(id, ctx)
543 }
544
545 fn on_close(&self, id: tracing_core::span::Id, ctx: Context<'_, S>) {
546 self.inner.on_close(id, ctx)
547 }
548
549 fn on_id_change(
550 &self,
551 old: &tracing_core::span::Id,
552 new: &tracing_core::span::Id,
553 ctx: Context<'_, S>,
554 ) {
555 self.inner.on_id_change(old, new, ctx)
556 }
557}
558
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_subscriber::layer::SubscriberExt;

    /// Layer that panics as soon as any event reaches it.
    struct PanicLayer;

    impl<S> Layer<S> for PanicLayer
    where
        S: Subscriber + for<'span> LookupSpan<'span>,
    {
        fn on_event(&self, _event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
            panic!("panic");
        }
    }

    #[test]
    fn filter_layer_should_filter_non_metrics_event() {
        let subscriber =
            tracing_subscriber::registry().with(PanicLayer.with_filter(MetricsFilter));

        // The event has no metric-prefixed field, so the filter must keep it
        // away from `PanicLayer`; reaching the layer would panic the test.
        tracing::subscriber::with_default(subscriber, || {
            tracing::info!(key = "val", "foo");
        });
    }
}