tracing_opentelemetry/layer/
filtered.rs

1use std::any::TypeId;
2
3use opentelemetry::{trace::TraceContextExt as _, Key, KeyValue, Value};
4use tracing::{span, Event, Subscriber};
5use tracing_subscriber::{
6    layer::{Context, Filter},
7    registry::LookupSpan,
8    Layer,
9};
10
11use crate::{OtelData, OtelDataState};
12
13use super::{OpenTelemetryLayer, SPAN_EVENT_COUNT_FIELD};
14
15/// A layer wrapping a [`OpenTelemetryLayer`], discarding all events filtered out by a given
16/// [`Filter`]. This can be built by calling [`OpenTelemetryLayer::with_counting_event_filter`].
17///
18/// Only events that are not filtered out will be saved as events on the span. All events, including
19/// those filtered out, will be counted and the total will be provided in the
20/// `otel.tracing_event_count` field of the exported span.
21///
22/// This is useful when there is large volume of logs outputted by the application and it would be
23/// too expensive to export all of them as span events, but it is still desirable to have
24/// information whether there is more information in logs for the given span.
25pub struct FilteredOpenTelemetryLayer<S, T, F> {
26    inner: OpenTelemetryLayer<S, T>,
27    filter: F,
28}
29
30impl<S, T, F> FilteredOpenTelemetryLayer<S, T, F> {
31    pub fn map_inner<Mapper, S2, T2>(self, mapper: Mapper) -> FilteredOpenTelemetryLayer<S2, T2, F>
32    where
33        Mapper: FnOnce(OpenTelemetryLayer<S, T>) -> OpenTelemetryLayer<S2, T2>,
34        F: Filter<S>,
35    {
36        FilteredOpenTelemetryLayer {
37            inner: mapper(self.inner),
38            filter: self.filter,
39        }
40    }
41
42    pub fn with_counting_event_filter<F2>(self, filter: F2) -> FilteredOpenTelemetryLayer<S, T, F2>
43    where
44        F2: Filter<S>,
45    {
46        FilteredOpenTelemetryLayer {
47            inner: self.inner,
48            filter,
49        }
50    }
51
52    pub(crate) fn new(inner: OpenTelemetryLayer<S, T>, filter: F) -> Self
53    where
54        S: Subscriber + for<'span> LookupSpan<'span>,
55        F: Filter<S>,
56    {
57        Self { inner, filter }
58    }
59}
60
61struct EventCount(u32);
62
63impl<S, T, F> Layer<S> for FilteredOpenTelemetryLayer<S, T, F>
64where
65    S: Subscriber + for<'lookup> LookupSpan<'lookup>,
66    OpenTelemetryLayer<S, T>: Layer<S>,
67    F: Filter<S> + 'static,
68{
69    fn on_layer(&mut self, subscriber: &mut S) {
70        self.inner.on_layer(subscriber);
71    }
72
73    fn register_callsite(
74        &self,
75        metadata: &'static tracing::Metadata<'static>,
76    ) -> tracing_core::Interest {
77        self.inner.register_callsite(metadata)
78    }
79
80    fn enabled(&self, metadata: &tracing::Metadata<'_>, ctx: Context<'_, S>) -> bool {
81        self.inner.enabled(metadata, ctx)
82    }
83
84    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
85        self.inner.on_new_span(attrs, id, ctx);
86    }
87
88    fn on_record(&self, span: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
89        self.inner.on_record(span, values, ctx);
90    }
91
92    fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
93        self.inner.on_follows_from(span, follows, ctx);
94    }
95
96    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
97        let Some(span) = event.parent().and_then(|id| ctx.span(id)).or_else(|| {
98            event
99                .is_contextual()
100                .then(|| ctx.lookup_current())
101                .flatten()
102        }) else {
103            return;
104        };
105
106        {
107            let mut extensions = span.extensions_mut();
108
109            if let Some(count) = extensions.get_mut::<EventCount>() {
110                count.0 += 1;
111            } else {
112                extensions.insert(EventCount(1));
113            }
114        }
115
116        drop(span);
117
118        println!("evaluating event with level {}", event.metadata().level());
119        if self.filter.enabled(event.metadata(), &ctx) {
120            println!("processing event with level {}", event.metadata().level());
121            self.inner.on_event(event, ctx);
122        }
123    }
124
125    fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
126        self.inner.on_enter(id, ctx);
127    }
128
129    fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
130        self.inner.on_exit(id, ctx);
131    }
132
133    fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
134        let span = ctx.span(&id).expect("Span not found, this is a bug");
135        let mut extensions = span.extensions_mut();
136
137        let count = extensions.remove::<EventCount>().map_or(0, |count| count.0);
138        if let Some(OtelData { state, end_time: _ }) = extensions.get_mut::<OtelData>() {
139            let key_value = KeyValue::new(
140                Key::from_static_str(SPAN_EVENT_COUNT_FIELD),
141                Value::I64(i64::from(count)),
142            );
143            match state {
144                OtelDataState::Builder {
145                    builder,
146                    parent_cx: _,
147                    status: _,
148                } => {
149                    builder.attributes.get_or_insert(Vec::new()).push(key_value);
150                }
151                OtelDataState::Context { current_cx } => {
152                    let span = current_cx.span();
153                    span.set_attribute(key_value);
154                }
155            }
156        }
157
158        drop(extensions);
159        drop(span);
160
161        self.inner.on_close(id, ctx);
162    }
163
164    fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
165        self.inner.on_id_change(old, new, ctx);
166    }
167
168    /// SAFETY: this is sound as long as the inner implementation is sound.
169    unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
170        if id == TypeId::of::<Self>() {
171            Some(self as *const _ as *const ())
172        } else {
173            unsafe { self.inner.downcast_raw(id) }
174        }
175    }
176
177    // `and_then`, `with_subscriber`, and `with_filter` are not implemented on purpose. Other
178    // methods should probably be implemented manually if there are new provided methods.
179}