tracing_opentelemetry/layer/
filtered.rs1use std::any::TypeId;
2
3use opentelemetry::{trace::TraceContextExt as _, Key, KeyValue, Value};
4use tracing::{span, Event, Subscriber};
5use tracing_subscriber::{
6 layer::{Context, Filter},
7 registry::LookupSpan,
8 Layer,
9};
10
11use crate::{OtelData, OtelDataState};
12
13use super::{OpenTelemetryLayer, SPAN_EVENT_COUNT_FIELD};
14
15pub struct FilteredOpenTelemetryLayer<S, T, F> {
26 inner: OpenTelemetryLayer<S, T>,
27 filter: F,
28}
29
30impl<S, T, F> FilteredOpenTelemetryLayer<S, T, F> {
31 pub fn map_inner<Mapper, S2, T2>(self, mapper: Mapper) -> FilteredOpenTelemetryLayer<S2, T2, F>
32 where
33 Mapper: FnOnce(OpenTelemetryLayer<S, T>) -> OpenTelemetryLayer<S2, T2>,
34 F: Filter<S>,
35 {
36 FilteredOpenTelemetryLayer {
37 inner: mapper(self.inner),
38 filter: self.filter,
39 }
40 }
41
42 pub fn with_counting_event_filter<F2>(self, filter: F2) -> FilteredOpenTelemetryLayer<S, T, F2>
43 where
44 F2: Filter<S>,
45 {
46 FilteredOpenTelemetryLayer {
47 inner: self.inner,
48 filter,
49 }
50 }
51
52 pub(crate) fn new(inner: OpenTelemetryLayer<S, T>, filter: F) -> Self
53 where
54 S: Subscriber + for<'span> LookupSpan<'span>,
55 F: Filter<S>,
56 {
57 Self { inner, filter }
58 }
59}
60
/// Per-span counter stored in the span's extensions.
///
/// Holds the number of events recorded against the span; it is created with a
/// value of one on the first event and removed again when the span closes.
struct EventCount(u32);
62
63impl<S, T, F> Layer<S> for FilteredOpenTelemetryLayer<S, T, F>
64where
65 S: Subscriber + for<'lookup> LookupSpan<'lookup>,
66 OpenTelemetryLayer<S, T>: Layer<S>,
67 F: Filter<S> + 'static,
68{
69 fn on_layer(&mut self, subscriber: &mut S) {
70 self.inner.on_layer(subscriber);
71 }
72
73 fn register_callsite(
74 &self,
75 metadata: &'static tracing::Metadata<'static>,
76 ) -> tracing_core::Interest {
77 self.inner.register_callsite(metadata)
78 }
79
80 fn enabled(&self, metadata: &tracing::Metadata<'_>, ctx: Context<'_, S>) -> bool {
81 self.inner.enabled(metadata, ctx)
82 }
83
84 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
85 self.inner.on_new_span(attrs, id, ctx);
86 }
87
88 fn on_record(&self, span: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
89 self.inner.on_record(span, values, ctx);
90 }
91
92 fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
93 self.inner.on_follows_from(span, follows, ctx);
94 }
95
96 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
97 let Some(span) = event.parent().and_then(|id| ctx.span(id)).or_else(|| {
98 event
99 .is_contextual()
100 .then(|| ctx.lookup_current())
101 .flatten()
102 }) else {
103 return;
104 };
105
106 {
107 let mut extensions = span.extensions_mut();
108
109 if let Some(count) = extensions.get_mut::<EventCount>() {
110 count.0 += 1;
111 } else {
112 extensions.insert(EventCount(1));
113 }
114 }
115
116 drop(span);
117
118 println!("evaluating event with level {}", event.metadata().level());
119 if self.filter.enabled(event.metadata(), &ctx) {
120 println!("processing event with level {}", event.metadata().level());
121 self.inner.on_event(event, ctx);
122 }
123 }
124
125 fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
126 self.inner.on_enter(id, ctx);
127 }
128
129 fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
130 self.inner.on_exit(id, ctx);
131 }
132
133 fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
134 let span = ctx.span(&id).expect("Span not found, this is a bug");
135 let mut extensions = span.extensions_mut();
136
137 let count = extensions.remove::<EventCount>().map_or(0, |count| count.0);
138 if let Some(OtelData { state, end_time: _ }) = extensions.get_mut::<OtelData>() {
139 let key_value = KeyValue::new(
140 Key::from_static_str(SPAN_EVENT_COUNT_FIELD),
141 Value::I64(i64::from(count)),
142 );
143 match state {
144 OtelDataState::Builder {
145 builder,
146 parent_cx: _,
147 status: _,
148 } => {
149 builder.attributes.get_or_insert(Vec::new()).push(key_value);
150 }
151 OtelDataState::Context { current_cx } => {
152 let span = current_cx.span();
153 span.set_attribute(key_value);
154 }
155 }
156 }
157
158 drop(extensions);
159 drop(span);
160
161 self.inner.on_close(id, ctx);
162 }
163
164 fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
165 self.inner.on_id_change(old, new, ctx);
166 }
167
168 unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
170 if id == TypeId::of::<Self>() {
171 Some(self as *const _ as *const ())
172 } else {
173 unsafe { self.inner.downcast_raw(id) }
174 }
175 }
176
177 }