tracing_async2/
lib.rs

1//!
2//! This crate makes it easy to create your own custom tracing layers using a
3//! simple callback mechanism. One abvious use is to store tracing events into
4//! a database but you could just as well send them to a downstream service
5//! using gRPC or http. Or, for testing purposes, to accumulate them into a
6//! vector.
7//!
8//! This crate provides a set of tracing layers that can be used to process
9//! [`tracing::Event`]s using simple callbacks or even in an asynchronous
10//! context. This is done using variations the [`CallbackLayer`].
11//!
12//! # Using `callback_layer`
13//!
14//! If your needs are really simple, like accumulating traces in a vector.
15//! Then you can use the [`callback_layer`]:
16//!
17//! ```rust
18//! use tracing_async2::{callback_layer, OwnedEvent};
19//! use tracing_subscriber::{EnvFilter, prelude::*};
20//! use std::sync::{Arc, RwLock};
21//!
22//! let log = Arc::new(RwLock::new(Vec::new()));
23//! let cb_log = log.clone();
24//!
25//! tracing_subscriber::registry()
26//!     .with(EnvFilter::new("tracing_async2=trace"))
27//!     .with(callback_layer(move |event| {
28//!         if let Ok(mut log) = cb_log.write() {
29//!             log.push(OwnedEvent::from(event));
30//!         }
31//!     }))
32//!     .init();
33//! ```
34//!
35//!
36//! # Using `channel_layer`
37//!
38//! If you ever had the need to record traces in a database or do something
39//! asynchronously with [`tracing::Event`], then you can use the
40//! [`channel_layer`]:
41//!
42//! ```rust
43//! use tracing_async2::channel_layer;
44//! use tracing_subscriber::{EnvFilter, prelude::*};
45//! use tokio::{sync::mpsc, runtime, task};
46//!
47//! let rt = tokio::runtime::Builder::new_current_thread()
48//!     .build()
49//!     .expect("could not start tokio runtime");
50//!
51//! rt.block_on(async move {
52//!
53//!     let (tx, mut rx) = mpsc::channel(100);
54//!     tracing_subscriber::registry()
55//!         .with(EnvFilter::new("tracing_async2=trace"))
56//!         .with(channel_layer(tx)) // <--- use the channel
57//!         .init();
58//!
59//!     tokio::spawn(async move {
60//!         while let Some(event) = rx.recv().await {
61//!             // Do something with the event like saving it to the database.
62//!         }
63//!     });
64//! });
65//! ```
66//!
67//! # Using `async_layer`
68//!
69//! If you don't want to handle the channel yourself, then you might consider
70//! the use of [`async_layer`] instead:
71//!
72//! ```rust
73//! use tracing_async2::async_layer;
74//! use tracing_subscriber::{EnvFilter, prelude::*};
75//! use tokio::{sync::mpsc, runtime, task};
76//!
77//! let rt = tokio::runtime::Builder::new_current_thread()
78//!     .build()
79//!     .expect("could not start tokio runtime");
80//!
81//! rt.block_on(async move {
82//!
83//!     tracing_subscriber::registry()
84//!         .with(EnvFilter::new("tracing_async2=trace"))
85//!         .with(async_layer(16, move |event| {
86//!             async move {
87//!                 // Do something with the event like saving it to the database.
88//!             }
89//!         }))
90//!         .init();
91//! });
92//! ```
93//!
94#[cfg(feature = "async")]
95use tokio::sync::mpsc::{self, error::TrySendError};
96
97#[cfg(feature = "accumulator")]
98use std::sync::{Arc, RwLock};
99
100use {
101    std::fmt,
102    tracing::{
103        Event, Subscriber,
104        field::{Field, Visit},
105        span,
106    },
107    tracing_subscriber::{Layer, layer::Context, registry::LookupSpan},
108};
109
110#[cfg(feature = "span")]
111type JsonMap = serde_json::Map<String, serde_json::Value>;
112
113///
114/// Returns a simple callback layer that will call the provided callback
115/// on each relevant event.
116///
117pub fn callback_layer<F>(callback: F) -> CallbackLayer
118where
119    F: Fn(&Event<'_>) + Send + Sync + 'static,
120{
121    CallbackLayer::new(callback)
122}
123
124///
125/// Returns a simple callback layer that will call the provided callback
126/// on each relevant event and will include parent spans.
127///
128#[cfg(feature = "span")]
129pub fn callback_layer_with_spans<F>(callback: F) -> CallbackLayerWithSpan
130where
131    F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static,
132{
133    CallbackLayerWithSpan::new(callback)
134}
135
136///
137/// Returns a layer that will send an [`OwnedEvent`] on the provided channel
138/// on each relevant event.
139///
140#[cfg(feature = "async")]
141pub fn channel_layer(tx: mpsc::Sender<OwnedEvent>) -> CallbackLayer {
142    CallbackLayer::new(move |event: &Event<'_>| {
143        tx.try_send(event.into()).ok();
144    })
145}
146
147///
148/// Returns a layer that will send an [`OwnedEvent`] on the provided channel
149/// on each relevant event along with a vector of parent spans.
150///
151#[cfg(all(feature = "async", feature = "span"))]
152pub fn channel_layer_with_spans(tx: mpsc::Sender<OwnedEventWithSpans>) -> CallbackLayerWithSpan {
153    CallbackLayerWithSpan::new(move |event, spans| {
154        if let Err(e) = tx.try_send(OwnedEventWithSpans::new(event, spans)) {
155            match e {
156                TrySendError::Full(o) => {
157                    eprintln!("dropping tracing event: {:?}", o);
158                }
159                TrySendError::Closed(o) => {
160                    eprintln!("channel closed for tracing event: {:?}", o);
161                }
162            }
163        }
164    })
165}
166
167///
168/// Returns a layer that will call an async callback on each relevant event.
169/// This type of layer can be used, for example, to save tracing events into
170/// a database.
171///
172/// Note that this is NOT an async closure but a normal function that
173/// returns a future. In practice that will often be a function whose return
174/// value is an async block that is not awaited.
175///
176#[cfg(feature = "async")]
177pub fn async_layer<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayer
178where
179    F: Fn(OwnedEvent) -> Fut + Send + Sync + 'static,
180    Fut: Future<Output = ()> + Send + Sync + 'static,
181{
182    let (tx, mut rx) = mpsc::channel(buffer_size);
183    tokio::spawn(async move {
184        while let Some(event) = rx.recv().await {
185            callback(OwnedEvent::from(event)).await;
186        }
187    });
188    channel_layer(tx)
189}
190
191///
192/// Returns a layer that will call an async callback on each relevant event
193/// along with parent spans. This type of layer can be used, for example,
194/// to save tracing events into a database.
195///
196/// Note that this is NOT an async closure but a normal function that
197/// returns a future. In practice that will often be a function whose return
198/// value is an async block that is not awaited.
199///
200#[cfg(all(feature = "async", feature = "span"))]
201pub fn async_layer_with_spans<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayerWithSpan
202where
203    F: Fn(OwnedEventWithSpans) -> Fut + Send + Sync + 'static,
204    Fut: Future<Output = ()> + Send + 'static,
205{
206    let (tx, mut rx) = mpsc::channel(buffer_size);
207    tokio::spawn(async move {
208        while let Some(owned_with_span) = rx.recv().await {
209            callback(owned_with_span).await;
210        }
211    });
212    channel_layer_with_spans(tx)
213}
214
215#[cfg(feature = "accumulator")]
216pub type AccumulatingLog = Arc<RwLock<Vec<OwnedEvent>>>;
217
218///
219/// Returns a layer that accumulates [`OwnedEvent`] into a shared vector.
220/// This can be useful for testing and achives something similar to what the
221/// tracing_test crate does but without the extra function injections.
222///
223#[cfg(feature = "accumulator")]
224pub fn accumulating_layer(log: AccumulatingLog) -> CallbackLayer {
225    CallbackLayer::new(move |event: &Event<'_>| {
226        if let Ok(mut log) = log.write() {
227            log.push(event.into());
228        }
229    })
230}
231
232///
233/// Returns a layer that accumulates [`OwnedEvent`] and parent spans into a
234/// shared vector. This can be useful for testing and achives something similar
235/// to what the tracing_test crate does but without the extra function injections.
236///
237#[cfg(all(feature = "accumulator", feature = "span"))]
238pub fn accumulating_layer_with_spans(
239    log: Arc<RwLock<Vec<OwnedEventWithSpans>>>,
240) -> CallbackLayerWithSpan {
241    CallbackLayerWithSpan::new(move |event: &Event<'_>, spans| {
242        if let Ok(mut log) = log.write() {
243            log.push(OwnedEventWithSpans::new(event, spans));
244        }
245    })
246}
247
248///
249/// [`CallbackLayer`] is the simple but is the basis for all of the functional
250/// variations available in this crate. All it does is execute a callback for
251/// every tracing event after filtering. If you require [`tracing::span::Span`]
252/// information then use the [`CallbackLayerWithSpan`] instead.
253///
254pub struct CallbackLayer {
255    callback: Box<dyn Fn(&Event<'_>) + Send + Sync + 'static>,
256}
257
258impl CallbackLayer {
259    pub fn new<F: Fn(&Event<'_>) + Send + Sync + 'static>(callback: F) -> Self {
260        let callback = Box::new(callback);
261        Self { callback }
262    }
263}
264
265impl<S> Layer<S> for CallbackLayer
266where
267    S: Subscriber,
268    S: for<'lookup> LookupSpan<'lookup>,
269{
270    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
271        (self.callback)(event);
272    }
273}
274
275///
276/// [`CallbackLayerWithSpan`] is similar to [`CallbackLayer`] but it also includes parent spans.
277/// Therefor the provided callback must accept two parameters. The callback is executed for
278/// every tracing event after filtering. If you don't require [`span::Span`] information
279/// then use the [`CallbackLayer`] instead.
280///
281#[cfg(feature = "span")]
282pub struct CallbackLayerWithSpan {
283    callback: Box<dyn Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>,
284}
285
286#[cfg(feature = "span")]
287impl CallbackLayerWithSpan {
288    pub fn new<F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>(
289        callback: F,
290    ) -> Self {
291        let callback = Box::new(callback);
292        Self { callback }
293    }
294}
295
296#[cfg(feature = "span")]
297impl<S> Layer<S> for CallbackLayerWithSpan
298where
299    S: Subscriber,
300    S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
301{
302    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &tracing::Id, ctx: Context<'_, S>) {
303        let mut visitor = FieldVisitor::default();
304        attrs.record(&mut visitor);
305        if let Some(span) = ctx.span(id) {
306            span.extensions_mut().insert(visitor);
307        }
308    }
309    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
310        let spans: Option<Vec<_>> = ctx.event_scope(event).map(|scope| {
311            scope
312                .from_root()
313                .map(|span| {
314                    let fields: Option<JsonMap> = span
315                        .extensions()
316                        .get::<FieldVisitor>()
317                        .cloned()
318                        .map(|FieldVisitor(json_map)| json_map);
319
320                    let meta = span.metadata();
321                    let mut map = JsonMap::default();
322                    map.insert("level".into(), format!("{}", meta.level()).into());
323                    map.insert("file".into(), meta.file().into());
324                    map.insert("target".into(), meta.target().into());
325                    map.insert("line".into(), meta.line().into());
326                    map.insert("name".into(), span.name().into());
327                    map.insert("fields".into(), fields.into());
328                    map
329                })
330                .collect()
331        });
332        (self.callback)(event, spans);
333    }
334}
335
336///
337/// # OwnedEvent
338///
339/// [`OwnedEvent`] is the an owned version of the [`Event`], which
340/// is not `Send`. Our event must be `Send` to be sent on a channel.
341///
342#[derive(Debug, Clone, serde::Serialize)]
343pub struct OwnedEvent {
344    pub level: TracingLevel,
345    pub file: Option<String>,
346    pub target: String,
347    pub line: Option<u32>,
348    pub name: &'static str,
349    pub message: Option<String>,
350    pub fields: serde_json::Map<String, serde_json::Value>,
351}
352
353///
354/// # OwnedEvent
355///
356/// [`OwnedEventWithSpan`] is the an owned version of the [`Event`],
357/// which is not `Send` with their accompanying parent spans.
358///
359#[derive(Debug, Clone, serde::Serialize)]
360pub struct OwnedEventWithSpans {
361    pub event: OwnedEvent,
362    pub spans: Option<Vec<JsonMap>>,
363}
364
365impl OwnedEventWithSpans {
366    pub fn new(event: &Event<'_>, spans: Option<Vec<JsonMap>>) -> Self {
367        OwnedEventWithSpans {
368            event: event.into(),
369            spans,
370        }
371    }
372}
373
374///
375/// Same as [`tracing::Level`] but serializable.
376///
377#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, serde::Serialize)]
378pub enum TracingLevel {
379    Trace = 0,
380    Debug = 1,
381    Info = 2,
382    Warn = 3,
383    Error = 4,
384}
385
386impl From<&tracing::Level> for TracingLevel {
387    fn from(value: &tracing::Level) -> Self {
388        match *value {
389            tracing::Level::TRACE => TracingLevel::Trace,
390            tracing::Level::DEBUG => TracingLevel::Debug,
391            tracing::Level::INFO => TracingLevel::Info,
392            tracing::Level::WARN => TracingLevel::Warn,
393            tracing::Level::ERROR => TracingLevel::Error,
394        }
395    }
396}
397
398///
399/// Converts a [`tracing::Event`] to a [`OwnedEvent`].
400///
401/// The conversion essentially makes the `Event` an owned value,
402/// which is necessary for it to be `Send` and hence to be sent on a channel.
403///
404impl From<&Event<'_>> for OwnedEvent {
405    fn from(event: &Event<'_>) -> Self {
406        let mut visitor = FieldVisitor::default();
407        event.record(&mut visitor);
408
409        // We make an exception for the message field because it's usually
410        // the one we are always interested in.
411        let message = visitor.0.remove("message").and_then(|v| {
412            if let serde_json::Value::String(s) = v {
413                Some(s)
414            } else {
415                None
416            }
417        });
418
419        let meta = event.metadata();
420        Self {
421            name: meta.name(),
422            target: meta.target().into(),
423            level: meta.level().into(),
424            file: meta.file().map(String::from),
425            line: meta.line().clone(),
426            message,
427            fields: visitor.0,
428        }
429    }
430}
431
432///
433/// # FieldVisitor
434///
435/// Private structure that allows us to collect field values into as Json map.
436///
437#[cfg(feature = "span")]
438#[derive(Default, Clone)]
439struct FieldVisitor(serde_json::Map<String, serde_json::Value>);
440
441impl Visit for FieldVisitor {
442    fn record_bool(&mut self, field: &Field, value: bool) {
443        self.0.insert(field.name().into(), value.into());
444    }
445
446    fn record_f64(&mut self, field: &Field, value: f64) {
447        self.0.insert(field.name().into(), value.into());
448    }
449    fn record_i64(&mut self, field: &Field, value: i64) {
450        self.0.insert(field.name().into(), value.into());
451    }
452    fn record_u64(&mut self, field: &Field, value: u64) {
453        self.0.insert(field.name().into(), value.into());
454    }
455    fn record_str(&mut self, field: &Field, value: &str) {
456        self.0.insert(field.name().into(), value.into());
457    }
458    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
459        let text = format!("{:?}", value);
460        self.0.insert(field.name().into(), text.into());
461    }
462}
463
464// ----------------------------------------------------------------------------
465//
466// Unit Testing
467//
468// ----------------------------------------------------------------------------
469
470#[cfg(test)]
471mod tests {
472
473    use {
474        super::*,
475        insta::{
476            internals::{Content, ContentPath},
477            *,
478        },
479        std::sync::{Arc, RwLock},
480        tokio::sync::mpsc,
481        tracing_subscriber::{EnvFilter, prelude::*},
482    };
483
484    // ----------------------------------------------------------------------------
485    // Helper Functions
486    // ----------------------------------------------------------------------------
487
488    fn run_trace_events() {
489        let span = tracing::info_span!("root-info", recurse = 0);
490        span.in_scope(|| {
491            tracing::trace!(foo = 1, bar = 2, "this is a trace message");
492            tracing::debug!(pi = 3.14159265, "this is a debug message");
493            tracing::info!(job = "foo", "this is an info message");
494            tracing::warn!(job = "foo", "this is a warning message");
495            tracing::error!(job = "foo", "this is an error message");
496        });
497    }
498
499    fn extract_events<T: Clone>(logs: &Arc<RwLock<Vec<T>>>) -> Vec<T> {
500        let events = logs.read().expect("could not read events");
501        events.clone()
502    }
503
504    fn redact_name(value: Content, _path: ContentPath) -> String {
505        let s = value.as_str().unwrap_or_default();
506        if s.contains(":") {
507            s.split_once(":")
508                .map(|p| format!("{}:<line>", p.0))
509                .unwrap_or_default()
510        } else {
511            s.to_string()
512        }
513    }
514
515    // ----------------------------------------------------------------------------
516    // Tests
517    // ----------------------------------------------------------------------------
518
519    #[tokio::test]
520    async fn test_callback_layer() {
521        //
522        // Collect logs into a vector
523        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
524        let cb_events = events.clone();
525
526        let _guard = tracing_subscriber::registry()
527            .with(EnvFilter::new("tracing_async2=trace"))
528            .with(callback_layer(move |event| {
529                if let Ok(mut events) = cb_events.write() {
530                    events.push(event.into());
531                }
532            }))
533            .set_default();
534
535        run_trace_events();
536
537        assert_json_snapshot!("callback-layer", extract_events(&events), {
538            "[].line" => "<line>",
539            "[].name" => dynamic_redaction(redact_name),
540        });
541    }
542
543    #[cfg(feature = "span")]
544    #[tokio::test]
545    async fn test_callback_layer_with_spans() {
546        //
547        // Collect logs into a vector
548        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
549        let cb_events = events.clone();
550
551        let _guard = tracing_subscriber::registry()
552            .with(EnvFilter::new("tracing_async2=trace"))
553            .with(callback_layer_with_spans(move |event, spans| {
554                if let Ok(mut events) = cb_events.write() {
555                    events.push(OwnedEventWithSpans::new(event, spans));
556                }
557            }))
558            .set_default();
559
560        run_trace_events();
561
562        assert_json_snapshot!("callback-layer-with-spans", extract_events(&events), {
563            "[].event.line" => "<line>",
564            "[].event.name" => dynamic_redaction(redact_name),
565            "[].spans[].line" => "<line>",
566            "[].spans[].name" => dynamic_redaction(redact_name),
567        });
568    }
569
570    #[tokio::test]
571    async fn test_channel_layer() {
572        //
573        use std::time::Duration;
574        use tokio::time::sleep;
575
576        //
577        // Collect logs into a vector
578        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
579        let cb_events = events.clone();
580
581        let (tx, mut rx) = mpsc::channel(100);
582
583        let _guard = tracing_subscriber::registry()
584            .with(EnvFilter::new("tracing_async2=trace"))
585            .with(channel_layer(tx))
586            .set_default();
587
588        let handle = tokio::spawn(async move {
589            while let Some(event) = rx.recv().await {
590                if let Ok(mut events) = cb_events.write() {
591                    events.push(event);
592                }
593            }
594        });
595
596        run_trace_events();
597        sleep(Duration::from_millis(100)).await;
598        handle.abort();
599
600        assert_json_snapshot!("channel-layer", extract_events(&events), {
601            "[].line" => "<line>",
602            "[].name" => dynamic_redaction(redact_name),
603        });
604    }
605
606    #[cfg(feature = "span")]
607    #[tokio::test]
608    async fn test_channel_layer_with_spans() {
609        //
610        use std::time::Duration;
611        use tokio::time::sleep;
612
613        //
614        // Collect logs into a vector
615        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
616        let cb_events = events.clone();
617
618        let (tx, mut rx) = mpsc::channel(100);
619
620        let _guard = tracing_subscriber::registry()
621            .with(EnvFilter::new("tracing_async2=trace"))
622            .with(channel_layer_with_spans(tx))
623            .set_default();
624
625        let handle = tokio::spawn(async move {
626            while let Some(event) = rx.recv().await {
627                if let Ok(mut events) = cb_events.write() {
628                    events.push(event);
629                }
630            }
631        });
632
633        run_trace_events();
634        sleep(Duration::from_millis(100)).await;
635        handle.abort();
636
637        assert_json_snapshot!("channel-layer-with-spans", extract_events(&events), {
638            "[].event.line" => "<line>",
639            "[].event.name" => dynamic_redaction(redact_name),
640            "[].spans[].line" => "<line>",
641            "[].spans[].name" => dynamic_redaction(redact_name),
642        });
643    }
644
645    #[tokio::test]
646    async fn test_async_layer() {
647        //
648        use std::time::Duration;
649        use tokio::time::sleep;
650
651        //
652        // Collect logs into a vector
653        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
654        let cb_events = events.clone();
655
656        let _guard = tracing_subscriber::registry()
657            .with(EnvFilter::new("tracing_async2=trace"))
658            .with(async_layer(16, move |event| {
659                let f_events = cb_events.clone();
660                async move {
661                    if let Ok(mut events) = f_events.write() {
662                        events.push(event);
663                    }
664                }
665            }))
666            .set_default();
667
668        run_trace_events();
669        sleep(Duration::from_millis(100)).await;
670
671        assert_json_snapshot!("async-layer", extract_events(&events), {
672            "[].line" => "<line>",
673            "[].name" => dynamic_redaction(redact_name),
674        });
675    }
676
677    #[cfg(feature = "span")]
678    #[tokio::test]
679    async fn test_async_layer_with_spans() {
680        //
681        use std::time::Duration;
682        use tokio::time::sleep;
683
684        //
685        // Collect logs into a vector
686        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
687        let cb_events = events.clone();
688
689        let _guard = tracing_subscriber::registry()
690            .with(EnvFilter::new("tracing_async2=trace"))
691            .with(async_layer_with_spans(16, move |event_with_span| {
692                let f_events = cb_events.clone();
693                async move {
694                    if let Ok(mut events) = f_events.write() {
695                        events.push(event_with_span);
696                    }
697                }
698            }))
699            .set_default();
700
701        run_trace_events();
702        sleep(Duration::from_millis(100)).await;
703
704        assert_json_snapshot!("async-layer-with-spans", extract_events(&events), {
705            "[].event.line" => "<line>",
706            "[].event.name" => dynamic_redaction(redact_name),
707            "[].spans[].line" => "<line>",
708            "[].spans[].name" => dynamic_redaction(redact_name),
709        });
710    }
711
712    #[tokio::test]
713    async fn test_accumulating_layer() {
714        //
715        use std::time::Duration;
716        use tokio::time::sleep;
717
718        //
719        // Collect logs into a vector
720        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
721
722        let _guard = tracing_subscriber::registry()
723            .with(EnvFilter::new("tracing_async2=trace"))
724            .with(accumulating_layer(events.clone()))
725            .set_default();
726
727        run_trace_events();
728        sleep(Duration::from_millis(100)).await;
729
730        assert_json_snapshot!("accumulating-layer", extract_events(&events), {
731            "[].line" => "<line>",
732            "[].name" => dynamic_redaction(redact_name),
733        });
734    }
735
736    #[cfg(feature = "span")]
737    #[tokio::test]
738    async fn test_accumulating_layer_with_spans() {
739        //
740        use std::time::Duration;
741        use tokio::time::sleep;
742
743        //
744        // Collect logs into a vector
745        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
746
747        let _guard = tracing_subscriber::registry()
748            .with(EnvFilter::new("tracing_async2=trace"))
749            .with(accumulating_layer_with_spans(events.clone()))
750            .set_default();
751
752        run_trace_events();
753        sleep(Duration::from_millis(100)).await;
754
755        assert_json_snapshot!("accumulating-layer-with-spans", extract_events(&events), {
756            "[].event.line" => "<line>",
757            "[].event.name" => dynamic_redaction(redact_name),
758            "[].spans[].line" => "<line>",
759            "[].spans[].name" => dynamic_redaction(redact_name),
760        });
761    }
762}