tracing_async2/
lib.rs

1//!
2//! This crate makes it easy to create your own custom tracing layers using a
3//! simple callback mechanism. One abvious use is to store tracing events into
4//! a database but you could just as well send them to a downstream service
5//! using gRPC or http. Or, for testing purposes, to accumulate them into a
6//! vector.
7//!
8//! This crate provides a set of tracing layers that can be used to process
9//! [`tracing::Event`]s using simple callbacks or even in an asynchronous
10//! context. This is done using variations the [`CallbackLayer`].
11//!
12//!
13//! # Using the `tracing_layer_async_within` macro
14//!
15//! This macro simplifies some async scenarios where the provided callback was
16//! not `Sync` or `Send`. Here is an example of how you could use this macro to
17//! create a layer that saves tracing events into a database using `tokio_postgres`:
18//!
19//! ```no_run
20//! #[tracing_layer_async_within]
21//! pub fn pg_tracing_layer(client: &PGClient, event: OwnedEvent) -> Result<(), tokio_postgres::Error> {
22//!     // Do what needs to be done!
23//! }
24//! ```
25//!
26//! The above code gets expanded into the code below:
27//!
28//! ```no_run
29//! pub fn pg_tracing_layer(client: PGClient, buffer_size: usize) -> CallbackLayer {
30//!
31//!     let (tx, mut rx) = mpsc::channel::<OwnedEvent>(buffer_size);
32//!
33//!     tokio::spawn(async move {
34//!         let client = Arc::new(client);
35//!         while let Some(event) = rx.recv().await {
36//!             if let Err(e) = save(&client, event).await {
37//!                 eprintln!("{} error: {}", "pg_tracing_layer", e);
38//!             }
39//!         }
40//!     });
41//!
42//!     pub async fn save(client: &PGClient, event: OwnedEvent) -> Result<(), tokio_postgres::Error> {
43//!         // Do what needs to be done!
44//!     }
45//!
46//!     channel_layer(tx)
47//! }
48//! ```
49//!
50//! Of note are the following:
51//! - The `PGClient` was declared as a reference but the generated returned function requires it to be owned.
52//! - `buffer_size` is an additional parameter to the generated function.
53//!
54//!
55//!
56//! # Using `callback_layer`
57//!
58//! If your needs are really simple, like accumulating traces in a vector.
59//! Then you can use the [`callback_layer`]:
60//!
61//! ```rust
62//! use tracing_async2::{callback_layer, OwnedEvent};
63//! use tracing_subscriber::{EnvFilter, prelude::*};
64//! use std::sync::{Arc, RwLock};
65//!
66//! let log = Arc::new(RwLock::new(Vec::new()));
67//! let cb_log = log.clone();
68//!
69//! tracing_subscriber::registry()
70//!     .with(EnvFilter::new("tracing_async2=trace"))
71//!     .with(callback_layer(move |event| {
72//!         if let Ok(mut log) = cb_log.write() {
73//!             log.push(OwnedEvent::from(event));
74//!         }
75//!     }))
76//!     .init();
77//! ```
78//!
79//!
80//! # Using `channel_layer`
81//!
82//! If you ever had the need to record traces in a database or do something
83//! asynchronously with [`tracing::Event`], then you can use the
84//! [`channel_layer`]:
85//!
86//! ```rust
87//! use tracing_async2::channel_layer;
88//! use tracing_subscriber::{EnvFilter, prelude::*};
89//! use tokio::{sync::mpsc, runtime, task};
90//!
91//! let rt = tokio::runtime::Builder::new_current_thread()
92//!     .build()
93//!     .expect("could not start tokio runtime");
94//!
95//! rt.block_on(async move {
96//!
97//!     let (tx, mut rx) = mpsc::channel(100);
98//!     tracing_subscriber::registry()
99//!         .with(EnvFilter::new("tracing_async2=trace"))
100//!         .with(channel_layer(tx)) // <--- use the channel
101//!         .init();
102//!
103//!     tokio::spawn(async move {
104//!         while let Some(event) = rx.recv().await {
105//!             // Do something with the event like saving it to the database.
106//!         }
107//!     });
108//! });
109//! ```
110//!
111//! # Using `async_layer`
112//!
113//! If you don't want to handle the channel yourself, then you might consider
114//! the use of [`async_layer`] instead:
115//!
116//! ```rust
117//! use tracing_async2::async_layer;
118//! use tracing_subscriber::{EnvFilter, prelude::*};
119//! use tokio::{sync::mpsc, runtime, task};
120//!
121//! let rt = tokio::runtime::Builder::new_current_thread()
122//!     .build()
123//!     .expect("could not start tokio runtime");
124//!
125//! rt.block_on(async move {
126//!
127//!     tracing_subscriber::registry()
128//!         .with(EnvFilter::new("tracing_async2=trace"))
129//!         .with(async_layer(16, move |event| {
130//!             async move {
131//!                 // Do something with the event like saving it to the database.
132//!             }
133//!         }))
134//!         .init();
135//! });
136//! ```
137//!
138#[cfg(feature = "async")]
139use tokio::sync::mpsc::{self, error::TrySendError};
140
141#[cfg(feature = "accumulator")]
142use std::sync::{Arc, RwLock};
143
144use {
145    std::fmt::{self, Display},
146    tracing::{
147        Event, Subscriber,
148        field::{Field, Visit},
149        span,
150    },
151    tracing_subscriber::{Layer, layer::Context, registry::LookupSpan},
152};
153
154#[cfg(feature = "macros")]
155pub use tracing_async2_macros::*;
156
157#[cfg(feature = "span")]
158type JsonMap = serde_json::Map<String, serde_json::Value>;
159
160///
161/// Returns a simple callback layer that will call the provided callback
162/// on each relevant event.
163///
164pub fn callback_layer<F>(callback: F) -> CallbackLayer
165where
166    F: Fn(&Event<'_>) + Send + Sync + 'static,
167{
168    CallbackLayer::new(callback)
169}
170
171///
172/// Returns a simple callback layer that will call the provided callback
173/// on each relevant event and will include parent spans.
174///
175#[cfg(feature = "span")]
176pub fn callback_layer_with_spans<F>(callback: F) -> CallbackLayerWithSpan
177where
178    F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static,
179{
180    CallbackLayerWithSpan::new(callback)
181}
182
183///
184/// Returns a layer that will send an [`OwnedEvent`] on the provided channel
185/// on each relevant event.
186///
187#[cfg(feature = "async")]
188pub fn channel_layer(tx: mpsc::Sender<OwnedEvent>) -> CallbackLayer {
189    CallbackLayer::new(move |event: &Event<'_>| {
190        tx.try_send(event.into()).ok();
191    })
192}
193
194///
195/// Returns a layer that will send an [`OwnedEvent`] on the provided channel
196/// on each relevant event along with a vector of parent spans.
197///
198#[cfg(all(feature = "async", feature = "span"))]
199pub fn channel_layer_with_spans(tx: mpsc::Sender<OwnedEventWithSpans>) -> CallbackLayerWithSpan {
200    CallbackLayerWithSpan::new(move |event, spans| {
201        if let Err(e) = tx.try_send(OwnedEventWithSpans::new(event, spans)) {
202            match e {
203                TrySendError::Full(o) => {
204                    eprintln!("dropping tracing event: {:?}", o);
205                }
206                TrySendError::Closed(o) => {
207                    eprintln!("channel closed for tracing event: {:?}", o);
208                }
209            }
210        }
211    })
212}
213
214///
215/// Returns a layer that will call an async callback on each relevant event.
216/// This type of layer can be used, for example, to save tracing events into
217/// a database.
218///
219/// Note that this is NOT an async closure but a normal function that
220/// returns a future. In practice that will often be a function whose return
221/// value is an async block that is not awaited.
222///
223#[cfg(feature = "async")]
224pub fn async_layer<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayer
225where
226    F: Fn(OwnedEvent) -> Fut + Send + Sync + 'static,
227    Fut: Future<Output = ()> + Send + Sync + 'static,
228{
229    let (tx, mut rx) = mpsc::channel(buffer_size);
230    tokio::spawn(async move {
231        while let Some(event) = rx.recv().await {
232            callback(event).await;
233        }
234    });
235    channel_layer(tx)
236}
237
238///
239/// Returns a layer that will call an async callback on each relevant event
240/// along with parent spans. This type of layer can be used, for example,
241/// to save tracing events into a database.
242///
243/// Note that this is NOT an async closure but a normal function that
244/// returns a future. In practice that will often be a function whose return
245/// value is an async block that is not awaited.
246///
247#[cfg(all(feature = "async", feature = "span"))]
248pub fn async_layer_with_spans<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayerWithSpan
249where
250    F: Fn(OwnedEventWithSpans) -> Fut + Send + Sync + 'static,
251    Fut: Future<Output = ()> + Send + 'static,
252{
253    let (tx, mut rx) = mpsc::channel(buffer_size);
254    tokio::spawn(async move {
255        while let Some(owned_with_span) = rx.recv().await {
256            callback(owned_with_span).await;
257        }
258    });
259    channel_layer_with_spans(tx)
260}
261
262#[cfg(feature = "accumulator")]
263pub type AccumulatingLog = Arc<RwLock<Vec<OwnedEvent>>>;
264
265///
266/// Returns a layer that accumulates [`OwnedEvent`] into a shared vector.
267/// This can be useful for testing and achives something similar to what the
268/// tracing_test crate does but without the extra function injections.
269///
270#[cfg(feature = "accumulator")]
271pub fn accumulating_layer(log: AccumulatingLog) -> CallbackLayer {
272    CallbackLayer::new(move |event: &Event<'_>| {
273        if let Ok(mut log) = log.write() {
274            log.push(event.into());
275        }
276    })
277}
278
279///
280/// Returns a layer that accumulates [`OwnedEvent`] and parent spans into a
281/// shared vector. This can be useful for testing and achives something similar
282/// to what the tracing_test crate does but without the extra function injections.
283///
284#[cfg(all(feature = "accumulator", feature = "span"))]
285pub fn accumulating_layer_with_spans(
286    log: Arc<RwLock<Vec<OwnedEventWithSpans>>>,
287) -> CallbackLayerWithSpan {
288    CallbackLayerWithSpan::new(move |event: &Event<'_>, spans| {
289        if let Ok(mut log) = log.write() {
290            log.push(OwnedEventWithSpans::new(event, spans));
291        }
292    })
293}
294
295///
296/// [`CallbackLayer`] is the simple but is the basis for all of the functional
297/// variations available in this crate. All it does is execute a callback for
298/// every tracing event after filtering. If you require [`tracing::span::Span`]
299/// information then use the [`CallbackLayerWithSpan`] instead.
300///
301pub struct CallbackLayer {
302    callback: Box<dyn Fn(&Event<'_>) + Send + Sync + 'static>,
303}
304
305impl CallbackLayer {
306    pub fn new<F: Fn(&Event<'_>) + Send + Sync + 'static>(callback: F) -> Self {
307        let callback = Box::new(callback);
308        Self { callback }
309    }
310}
311
312impl<S> Layer<S> for CallbackLayer
313where
314    S: Subscriber,
315    S: for<'lookup> LookupSpan<'lookup>,
316{
317    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
318        (self.callback)(event);
319    }
320}
321
322///
323/// [`CallbackLayerWithSpan`] is similar to [`CallbackLayer`] but it also includes parent spans.
324/// Therefor the provided callback must accept two parameters. The callback is executed for
325/// every tracing event after filtering. If you don't require [`span::Span`] information
326/// then use the [`CallbackLayer`] instead.
327///
328#[cfg(feature = "span")]
329pub struct CallbackLayerWithSpan {
330    #[allow(clippy::type_complexity)]
331    callback: Box<dyn Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>,
332}
333
334#[cfg(feature = "span")]
335impl CallbackLayerWithSpan {
336    pub fn new<F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>(
337        callback: F,
338    ) -> Self {
339        let callback = Box::new(callback);
340        Self { callback }
341    }
342}
343
344#[cfg(feature = "span")]
345impl<S> Layer<S> for CallbackLayerWithSpan
346where
347    S: Subscriber,
348    S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
349{
350    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &tracing::Id, ctx: Context<'_, S>) {
351        let mut visitor = FieldVisitor::default();
352        attrs.record(&mut visitor);
353        if let Some(span) = ctx.span(id) {
354            span.extensions_mut().insert(visitor);
355        }
356    }
357    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
358        let spans: Option<Vec<_>> = ctx.event_scope(event).map(|scope| {
359            scope
360                .from_root()
361                .map(|span| {
362                    let fields: Option<JsonMap> = span
363                        .extensions()
364                        .get::<FieldVisitor>()
365                        .cloned()
366                        .map(|FieldVisitor(json_map)| json_map);
367
368                    let meta = span.metadata();
369                    let mut map = JsonMap::default();
370                    map.insert("level".into(), format!("{}", meta.level()).into());
371                    map.insert("file".into(), meta.file().into());
372                    map.insert("target".into(), meta.target().into());
373                    map.insert("line".into(), meta.line().into());
374                    map.insert("name".into(), span.name().into());
375                    map.insert("fields".into(), fields.into());
376                    map
377                })
378                .collect()
379        });
380        (self.callback)(event, spans);
381    }
382}
383
384///
385/// # OwnedEvent
386///
387/// [`OwnedEvent`] is the an owned version of the [`Event`], which
388/// is not `Send`. Our event must be `Send` to be sent on a channel.
389///
390#[derive(Debug, Clone, serde::Serialize)]
391pub struct OwnedEvent {
392    pub level: TracingLevel,
393    pub file: Option<String>,
394    pub target: String,
395    pub line: Option<u32>,
396    pub name: &'static str,
397    pub message: Option<String>,
398    pub fields: serde_json::Map<String, serde_json::Value>,
399}
400
401///
402/// # OwnedEvent
403///
404/// [`OwnedEventWithSpan`] is the an owned version of the [`Event`],
405/// which is not `Send` with their accompanying parent spans.
406///
407#[derive(Debug, Clone, serde::Serialize)]
408pub struct OwnedEventWithSpans {
409    pub event: OwnedEvent,
410    pub spans: Option<Vec<JsonMap>>,
411}
412
413impl OwnedEventWithSpans {
414    pub fn new(event: &Event<'_>, spans: Option<Vec<JsonMap>>) -> Self {
415        OwnedEventWithSpans {
416            event: event.into(),
417            spans,
418        }
419    }
420}
421
422///
423/// Same as [`tracing::Level`] but serializable.
424///
425#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, serde::Serialize)]
426pub enum TracingLevel {
427    Trace = 0,
428    Debug = 1,
429    Info = 2,
430    Warn = 3,
431    Error = 4,
432}
433
434impl From<&tracing::Level> for TracingLevel {
435    fn from(value: &tracing::Level) -> Self {
436        match *value {
437            tracing::Level::TRACE => TracingLevel::Trace,
438            tracing::Level::DEBUG => TracingLevel::Debug,
439            tracing::Level::INFO => TracingLevel::Info,
440            tracing::Level::WARN => TracingLevel::Warn,
441            tracing::Level::ERROR => TracingLevel::Error,
442        }
443    }
444}
445
446impl AsRef<str> for TracingLevel {
447    fn as_ref(&self) -> &str {
448        use TracingLevel::*;
449        match self {
450            Trace => "TRACE",
451            Debug => "DEBUG",
452            Info => "INFO",
453            Warn => "WARN",
454            Error => "ERROR",
455        }
456    }
457}
458
459impl Display for TracingLevel {
460    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461        write!(f, "{}", self.as_ref())
462    }
463}
464
465///
466/// Converts a [`tracing::Event`] to a [`OwnedEvent`].
467///
468/// The conversion essentially makes the `Event` an owned value,
469/// which is necessary for it to be `Send` and hence to be sent on a channel.
470///
471impl From<&Event<'_>> for OwnedEvent {
472    fn from(event: &Event<'_>) -> Self {
473        let mut visitor = FieldVisitor::default();
474        event.record(&mut visitor);
475
476        // We make an exception for the message field because it's usually
477        // the one we are always interested in.
478        let message = visitor.0.remove("message").and_then(|v| {
479            if let serde_json::Value::String(s) = v {
480                Some(s)
481            } else {
482                None
483            }
484        });
485
486        let meta = event.metadata();
487        Self {
488            name: meta.name(),
489            target: meta.target().into(),
490            level: meta.level().into(),
491            file: meta.file().map(String::from),
492            line: meta.line(),
493            message,
494            fields: visitor.0,
495        }
496    }
497}
498
499///
500/// # FieldVisitor
501///
502/// Private structure that allows us to collect field values into as Json map.
503///
504#[cfg(feature = "span")]
505#[derive(Default, Clone)]
506struct FieldVisitor(serde_json::Map<String, serde_json::Value>);
507
508impl Visit for FieldVisitor {
509    fn record_bool(&mut self, field: &Field, value: bool) {
510        self.0.insert(field.name().into(), value.into());
511    }
512
513    fn record_f64(&mut self, field: &Field, value: f64) {
514        self.0.insert(field.name().into(), value.into());
515    }
516    fn record_i64(&mut self, field: &Field, value: i64) {
517        self.0.insert(field.name().into(), value.into());
518    }
519    fn record_u64(&mut self, field: &Field, value: u64) {
520        self.0.insert(field.name().into(), value.into());
521    }
522    fn record_str(&mut self, field: &Field, value: &str) {
523        self.0.insert(field.name().into(), value.into());
524    }
525    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
526        let text = format!("{:?}", value);
527        self.0.insert(field.name().into(), text.into());
528    }
529}
530
531// ----------------------------------------------------------------------------
532//
533// Unit Testing
534//
535// ----------------------------------------------------------------------------
536
537#[cfg(test)]
538mod tests {
539
540    #[cfg(feature = "async")]
541    use {
542        std::sync::{Arc, RwLock},
543        tokio::sync::mpsc,
544    };
545
546    use {
547        super::*,
548        insta::{
549            internals::{Content, ContentPath},
550            *,
551        },
552        tracing_subscriber::{EnvFilter, prelude::*},
553    };
554
555    // ----------------------------------------------------------------------------
556    // Helper Functions
557    // ----------------------------------------------------------------------------
558
559    fn run_trace_events() {
560        let span = tracing::info_span!("root-info", recurse = 0);
561        span.in_scope(|| {
562            tracing::trace!(foo = 1, bar = 2, "this is a trace message");
563            tracing::debug!(pi = 3.14159265, "this is a debug message");
564            tracing::info!(job = "foo", "this is an info message");
565            tracing::warn!(job = "foo", "this is a warning message");
566            tracing::error!(job = "foo", "this is an error message");
567        });
568    }
569
570    fn extract_events<T: Clone>(logs: &Arc<RwLock<Vec<T>>>) -> Vec<T> {
571        let events = logs.read().expect("could not read events");
572        events.clone()
573    }
574
575    fn redact_name(value: Content, _path: ContentPath) -> String {
576        let s = value.as_str().unwrap_or_default();
577        if s.contains(":") {
578            s.split_once(":")
579                .map(|p| format!("{}:<line>", p.0))
580                .unwrap_or_default()
581        } else {
582            s.to_string()
583        }
584    }
585
586    // ----------------------------------------------------------------------------
587    // Tests
588    // ----------------------------------------------------------------------------
589
590    #[test]
591    fn test_callback_layer() {
592        //
593        // Collect logs into a vector
594        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
595        let cb_events = events.clone();
596
597        let _guard = tracing_subscriber::registry()
598            .with(EnvFilter::new("tracing_async2=trace"))
599            .with(callback_layer(move |event| {
600                if let Ok(mut events) = cb_events.write() {
601                    events.push(event.into());
602                }
603            }))
604            .set_default();
605
606        run_trace_events();
607
608        assert_json_snapshot!("callback-layer", extract_events(&events), {
609            "[].line" => "<line>",
610            "[].name" => dynamic_redaction(redact_name),
611        });
612    }
613
614    #[cfg(feature = "span")]
615    #[test]
616    fn test_callback_layer_with_spans() {
617        //
618        // Collect logs into a vector
619        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
620        let cb_events = events.clone();
621
622        let _guard = tracing_subscriber::registry()
623            .with(EnvFilter::new("tracing_async2=trace"))
624            .with(callback_layer_with_spans(move |event, spans| {
625                if let Ok(mut events) = cb_events.write() {
626                    events.push(OwnedEventWithSpans::new(event, spans));
627                }
628            }))
629            .set_default();
630
631        run_trace_events();
632
633        assert_json_snapshot!("callback-layer-with-spans", extract_events(&events), {
634            "[].event.line" => "<line>",
635            "[].event.name" => dynamic_redaction(redact_name),
636            "[].spans[].line" => "<line>",
637            "[].spans[].name" => dynamic_redaction(redact_name),
638        });
639    }
640
641    #[cfg(feature = "async")]
642    #[tokio::test]
643    async fn test_channel_layer() {
644        //
645        use std::time::Duration;
646        use tokio::time::sleep;
647
648        //
649        // Collect logs into a vector
650        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
651        let cb_events = events.clone();
652
653        let (tx, mut rx) = mpsc::channel(100);
654
655        let _guard = tracing_subscriber::registry()
656            .with(EnvFilter::new("tracing_async2=trace"))
657            .with(channel_layer(tx))
658            .set_default();
659
660        let handle = tokio::spawn(async move {
661            while let Some(event) = rx.recv().await {
662                if let Ok(mut events) = cb_events.write() {
663                    events.push(event);
664                }
665            }
666        });
667
668        run_trace_events();
669        sleep(Duration::from_millis(100)).await;
670        handle.abort();
671
672        assert_json_snapshot!("channel-layer", extract_events(&events), {
673            "[].line" => "<line>",
674            "[].name" => dynamic_redaction(redact_name),
675        });
676    }
677
678    #[cfg(all(feature = "async", feature = "span"))]
679    #[tokio::test]
680    async fn test_channel_layer_with_spans() {
681        //
682        use std::time::Duration;
683        use tokio::time::sleep;
684
685        //
686        // Collect logs into a vector
687        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
688        let cb_events = events.clone();
689
690        let (tx, mut rx) = mpsc::channel(100);
691
692        let _guard = tracing_subscriber::registry()
693            .with(EnvFilter::new("tracing_async2=trace"))
694            .with(channel_layer_with_spans(tx))
695            .set_default();
696
697        let handle = tokio::spawn(async move {
698            while let Some(event) = rx.recv().await {
699                if let Ok(mut events) = cb_events.write() {
700                    events.push(event);
701                }
702            }
703        });
704
705        run_trace_events();
706        sleep(Duration::from_millis(100)).await;
707        handle.abort();
708
709        assert_json_snapshot!("channel-layer-with-spans", extract_events(&events), {
710            "[].event.line" => "<line>",
711            "[].event.name" => dynamic_redaction(redact_name),
712            "[].spans[].line" => "<line>",
713            "[].spans[].name" => dynamic_redaction(redact_name),
714        });
715    }
716
717    #[cfg(feature = "async")]
718    #[tokio::test]
719    async fn test_async_layer() {
720        //
721        use std::time::Duration;
722        use tokio::time::sleep;
723
724        //
725        // Collect logs into a vector
726        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
727        let cb_events = events.clone();
728
729        let _guard = tracing_subscriber::registry()
730            .with(EnvFilter::new("tracing_async2=trace"))
731            .with(async_layer(16, move |event| {
732                let f_events = cb_events.clone();
733                async move {
734                    if let Ok(mut events) = f_events.write() {
735                        events.push(event);
736                    }
737                }
738            }))
739            .set_default();
740
741        run_trace_events();
742        sleep(Duration::from_millis(100)).await;
743
744        assert_json_snapshot!("async-layer", extract_events(&events), {
745            "[].line" => "<line>",
746            "[].name" => dynamic_redaction(redact_name),
747        });
748    }
749
750    #[cfg(all(feature = "async", feature = "span"))]
751    #[tokio::test]
752    async fn test_async_layer_with_spans() {
753        //
754        use std::time::Duration;
755        use tokio::time::sleep;
756
757        //
758        // Collect logs into a vector
759        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
760        let cb_events = events.clone();
761
762        let _guard = tracing_subscriber::registry()
763            .with(EnvFilter::new("tracing_async2=trace"))
764            .with(async_layer_with_spans(16, move |event_with_span| {
765                let f_events = cb_events.clone();
766                async move {
767                    if let Ok(mut events) = f_events.write() {
768                        events.push(event_with_span);
769                    }
770                }
771            }))
772            .set_default();
773
774        run_trace_events();
775        sleep(Duration::from_millis(100)).await;
776
777        assert_json_snapshot!("async-layer-with-spans", extract_events(&events), {
778            "[].event.line" => "<line>",
779            "[].event.name" => dynamic_redaction(redact_name),
780            "[].spans[].line" => "<line>",
781            "[].spans[].name" => dynamic_redaction(redact_name),
782        });
783    }
784
785    #[cfg(feature = "accumulator")]
786    #[test]
787    fn test_accumulating_layer() {
788        //
789        // Collect logs into a vector
790        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
791
792        let _guard = tracing_subscriber::registry()
793            .with(EnvFilter::new("tracing_async2=trace"))
794            .with(accumulating_layer(events.clone()))
795            .set_default();
796
797        run_trace_events();
798
799        assert_json_snapshot!("accumulating-layer", extract_events(&events), {
800            "[].line" => "<line>",
801            "[].name" => dynamic_redaction(redact_name),
802        });
803    }
804
805    #[cfg(all(feature = "span", feature = "accumulator"))]
806    #[test]
807    fn test_accumulating_layer_with_spans() {
808        //
809        // Collect logs into a vector
810        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
811
812        let _guard = tracing_subscriber::registry()
813            .with(EnvFilter::new("tracing_async2=trace"))
814            .with(accumulating_layer_with_spans(events.clone()))
815            .set_default();
816
817        run_trace_events();
818
819        assert_json_snapshot!("accumulating-layer-with-spans", extract_events(&events), {
820            "[].event.line" => "<line>",
821            "[].event.name" => dynamic_redaction(redact_name),
822            "[].spans[].line" => "<line>",
823            "[].spans[].name" => dynamic_redaction(redact_name),
824        });
825    }
826}