tracing_async2/
lib.rs

1//!
2//! This crate makes it easy to create your own custom tracing layers using a
3//! simple callback mechanism. One abvious use is to store tracing events into
4//! a database but you could just as well send them to a downstream service
5//! using gRPC or http. Or, for testing purposes, to accumulate them into a
6//! vector.
7//!
8//! This crate provides a set of tracing layers that can be used to process
9//! [`tracing::Event`]s using simple callbacks or even in an asynchronous
10//! context. This is done using variations the [`CallbackLayer`].
11//!
12//!
13//! # Using the `tracing_layer_async_within` macro
14//!
15//! This macro simplifies some async scenarios where the provided callback was
16//! not `Sync` or `Send`. Here is an example of how you could use this macro to
17//! create a layer that saves tracing events into a database using `tokio_postgres`:
18//!
19//! ```no_run
20//! #[tracing_layer_async_within]
21//! pub fn pg_tracing_layer(client: &PGClient, event: OwnedEvent) -> Result<(), tokio_postgres::Error> {
22//!     // Do what needs to be done!
23//! }
24//! ```
25//!
26//! The above code gets expanded into the code below:
27//!
28//! ```no_run
29//! pub fn pg_tracing_layer(client: PGClient, buffer_size: usize) -> CallbackLayer {
30//!
31//!     let (tx, mut rx) = mpsc::channel::<OwnedEvent>(buffer_size);
32//!
33//!     tokio::spawn(async move {
34//!         let client = Arc::new(client);
35//!         while let Some(event) = rx.recv().await {
36//!             if let Err(e) = save_tracing_event_to_database(&client, event).await {
37//!                 eprintln!("{} error: {}", "pg_tracing_layer", e);
38//!             }
39//!         }
40//!     });
41//!
42//!     pub async fn save(
43//!        client: &Arc<tokio_postgres::Client>,
44//!        event: OwnedEvent,
45//!     ) -> Result<(), tokio_postgres::Error> {
46//!         // Do what needs to be done!
47//!     }
48//!
49//!     channel_layer(tx)
50//! }
51//! ```
52//!
53//! Of note are the following:
54//! - The `PGClient` was declared as a reference but the generated returned function requires it to be owned.
55//! - `buffer_size` is an additional parameter to the generated function.
56//!
57//!
58//!
59//! # Using `callback_layer`
60//!
61//! If your needs are really simple, like accumulating traces in a vector.
62//! Then you can use the [`callback_layer`]:
63//!
64//! ```rust
65//! use tracing_async2::{callback_layer, OwnedEvent};
66//! use tracing_subscriber::{EnvFilter, prelude::*};
67//! use std::sync::{Arc, RwLock};
68//!
69//! let log = Arc::new(RwLock::new(Vec::new()));
70//! let cb_log = log.clone();
71//!
72//! tracing_subscriber::registry()
73//!     .with(EnvFilter::new("tracing_async2=trace"))
74//!     .with(callback_layer(move |event| {
75//!         if let Ok(mut log) = cb_log.write() {
76//!             log.push(OwnedEvent::from(event));
77//!         }
78//!     }))
79//!     .init();
80//! ```
81//!
82//!
83//! # Using `channel_layer`
84//!
85//! If you ever had the need to record traces in a database or do something
86//! asynchronously with [`tracing::Event`], then you can use the
87//! [`channel_layer`]:
88//!
89//! ```rust
90//! use tracing_async2::channel_layer;
91//! use tracing_subscriber::{EnvFilter, prelude::*};
92//! use tokio::{sync::mpsc, runtime, task};
93//!
94//! let rt = tokio::runtime::Builder::new_current_thread()
95//!     .build()
96//!     .expect("could not start tokio runtime");
97//!
98//! rt.block_on(async move {
99//!
100//!     let (tx, mut rx) = mpsc::channel(100);
101//!     tracing_subscriber::registry()
102//!         .with(EnvFilter::new("tracing_async2=trace"))
103//!         .with(channel_layer(tx)) // <--- use the channel
104//!         .init();
105//!
106//!     tokio::spawn(async move {
107//!         while let Some(event) = rx.recv().await {
108//!             // Do something with the event like saving it to the database.
109//!         }
110//!     });
111//! });
112//! ```
113//!
114//! # Using `async_layer`
115//!
116//! If you don't want to handle the channel yourself, then you might consider
117//! the use of [`async_layer`] instead:
118//!
119//! ```rust
120//! use tracing_async2::async_layer;
121//! use tracing_subscriber::{EnvFilter, prelude::*};
122//! use tokio::{sync::mpsc, runtime, task};
123//!
124//! let rt = tokio::runtime::Builder::new_current_thread()
125//!     .build()
126//!     .expect("could not start tokio runtime");
127//!
128//! rt.block_on(async move {
129//!
130//!     tracing_subscriber::registry()
131//!         .with(EnvFilter::new("tracing_async2=trace"))
132//!         .with(async_layer(16, move |event| {
133//!             async move {
134//!                 // Do something with the event like saving it to the database.
135//!             }
136//!         }))
137//!         .init();
138//! });
139//! ```
140//!
141#[cfg(feature = "async")]
142use tokio::sync::mpsc::{self, error::TrySendError};
143
144#[cfg(feature = "accumulator")]
145use std::sync::{Arc, RwLock};
146
147use {
148    std::fmt::{self, Display},
149    tracing::{
150        Event, Subscriber,
151        field::{Field, Visit},
152        span,
153    },
154    tracing_subscriber::{Layer, layer::Context, registry::LookupSpan},
155};
156
157#[cfg(feature = "macros")]
158pub use tracing_async2_macros::*;
159
160#[cfg(feature = "span")]
161type JsonMap = serde_json::Map<String, serde_json::Value>;
162
163///
164/// Returns a simple callback layer that will call the provided callback
165/// on each relevant event.
166///
167pub fn callback_layer<F>(callback: F) -> CallbackLayer
168where
169    F: Fn(&Event<'_>) + Send + Sync + 'static,
170{
171    CallbackLayer::new(callback)
172}
173
174///
175/// Returns a simple callback layer that will call the provided callback
176/// on each relevant event and will include parent spans.
177///
178#[cfg(feature = "span")]
179pub fn callback_layer_with_spans<F>(callback: F) -> CallbackLayerWithSpan
180where
181    F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static,
182{
183    CallbackLayerWithSpan::new(callback)
184}
185
186///
187/// Returns a layer that will send an [`OwnedEvent`] on the provided channel
188/// on each relevant event.
189///
190#[cfg(feature = "async")]
191pub fn channel_layer(tx: mpsc::Sender<OwnedEvent>) -> CallbackLayer {
192    CallbackLayer::new(move |event: &Event<'_>| {
193        tx.try_send(event.into()).ok();
194    })
195}
196
197///
198/// Returns a layer that will send an [`OwnedEvent`] on the provided channel
199/// on each relevant event along with a vector of parent spans.
200///
201#[cfg(all(feature = "async", feature = "span"))]
202pub fn channel_layer_with_spans(tx: mpsc::Sender<OwnedEventWithSpans>) -> CallbackLayerWithSpan {
203    CallbackLayerWithSpan::new(move |event, spans| {
204        if let Err(e) = tx.try_send(OwnedEventWithSpans::new(event, spans)) {
205            match e {
206                TrySendError::Full(o) => {
207                    eprintln!("dropping tracing event: {:?}", o);
208                }
209                TrySendError::Closed(o) => {
210                    eprintln!("channel closed for tracing event: {:?}", o);
211                }
212            }
213        }
214    })
215}
216
217///
218/// Returns a layer that will call an async callback on each relevant event.
219/// This type of layer can be used, for example, to save tracing events into
220/// a database.
221///
222/// Note that this is NOT an async closure but a normal function that
223/// returns a future. In practice that will often be a function whose return
224/// value is an async block that is not awaited.
225///
226#[cfg(feature = "async")]
227pub fn async_layer<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayer
228where
229    F: Fn(OwnedEvent) -> Fut + Send + Sync + 'static,
230    Fut: Future<Output = ()> + Send + Sync + 'static,
231{
232    let (tx, mut rx) = mpsc::channel(buffer_size);
233    tokio::spawn(async move {
234        while let Some(event) = rx.recv().await {
235            callback(event).await;
236        }
237    });
238    channel_layer(tx)
239}
240
241///
242/// Returns a layer that will call an async callback on each relevant event
243/// along with parent spans. This type of layer can be used, for example,
244/// to save tracing events into a database.
245///
246/// Note that this is NOT an async closure but a normal function that
247/// returns a future. In practice that will often be a function whose return
248/// value is an async block that is not awaited.
249///
250#[cfg(all(feature = "async", feature = "span"))]
251pub fn async_layer_with_spans<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayerWithSpan
252where
253    F: Fn(OwnedEventWithSpans) -> Fut + Send + Sync + 'static,
254    Fut: Future<Output = ()> + Send + 'static,
255{
256    let (tx, mut rx) = mpsc::channel(buffer_size);
257    tokio::spawn(async move {
258        while let Some(owned_with_span) = rx.recv().await {
259            callback(owned_with_span).await;
260        }
261    });
262    channel_layer_with_spans(tx)
263}
264
265#[cfg(feature = "accumulator")]
266pub type AccumulatingLog = Arc<RwLock<Vec<OwnedEvent>>>;
267
268///
269/// Returns a layer that accumulates [`OwnedEvent`] into a shared vector.
270/// This can be useful for testing and achives something similar to what the
271/// tracing_test crate does but without the extra function injections.
272///
273#[cfg(feature = "accumulator")]
274pub fn accumulating_layer(log: AccumulatingLog) -> CallbackLayer {
275    CallbackLayer::new(move |event: &Event<'_>| {
276        if let Ok(mut log) = log.write() {
277            log.push(event.into());
278        }
279    })
280}
281
282///
283/// Returns a layer that accumulates [`OwnedEvent`] and parent spans into a
284/// shared vector. This can be useful for testing and achives something similar
285/// to what the tracing_test crate does but without the extra function injections.
286///
287#[cfg(all(feature = "accumulator", feature = "span"))]
288pub fn accumulating_layer_with_spans(
289    log: Arc<RwLock<Vec<OwnedEventWithSpans>>>,
290) -> CallbackLayerWithSpan {
291    CallbackLayerWithSpan::new(move |event: &Event<'_>, spans| {
292        if let Ok(mut log) = log.write() {
293            log.push(OwnedEventWithSpans::new(event, spans));
294        }
295    })
296}
297
298///
299/// [`CallbackLayer`] is the simple but is the basis for all of the functional
300/// variations available in this crate. All it does is execute a callback for
301/// every tracing event after filtering. If you require [`tracing::span::Span`]
302/// information then use the [`CallbackLayerWithSpan`] instead.
303///
304pub struct CallbackLayer {
305    callback: Box<dyn Fn(&Event<'_>) + Send + Sync + 'static>,
306}
307
308impl CallbackLayer {
309    pub fn new<F: Fn(&Event<'_>) + Send + Sync + 'static>(callback: F) -> Self {
310        let callback = Box::new(callback);
311        Self { callback }
312    }
313}
314
315impl<S> Layer<S> for CallbackLayer
316where
317    S: Subscriber,
318    S: for<'lookup> LookupSpan<'lookup>,
319{
320    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
321        (self.callback)(event);
322    }
323}
324
325///
326/// [`CallbackLayerWithSpan`] is similar to [`CallbackLayer`] but it also includes parent spans.
327/// Therefor the provided callback must accept two parameters. The callback is executed for
328/// every tracing event after filtering. If you don't require [`span::Span`] information
329/// then use the [`CallbackLayer`] instead.
330///
331#[cfg(feature = "span")]
332pub struct CallbackLayerWithSpan {
333    #[allow(clippy::type_complexity)]
334    callback: Box<dyn Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>,
335}
336
337#[cfg(feature = "span")]
338impl CallbackLayerWithSpan {
339    pub fn new<F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>(
340        callback: F,
341    ) -> Self {
342        let callback = Box::new(callback);
343        Self { callback }
344    }
345}
346
347#[cfg(feature = "span")]
348impl<S> Layer<S> for CallbackLayerWithSpan
349where
350    S: Subscriber,
351    S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
352{
353    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &tracing::Id, ctx: Context<'_, S>) {
354        let mut visitor = FieldVisitor::default();
355        attrs.record(&mut visitor);
356        if let Some(span) = ctx.span(id) {
357            span.extensions_mut().insert(visitor);
358        }
359    }
360    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
361        let spans: Option<Vec<_>> = ctx.event_scope(event).map(|scope| {
362            scope
363                .from_root()
364                .map(|span| {
365                    let fields: Option<JsonMap> = span
366                        .extensions()
367                        .get::<FieldVisitor>()
368                        .cloned()
369                        .map(|FieldVisitor(json_map)| json_map);
370
371                    let meta = span.metadata();
372                    let mut map = JsonMap::default();
373                    map.insert("level".into(), format!("{}", meta.level()).into());
374                    map.insert("file".into(), meta.file().into());
375                    map.insert("target".into(), meta.target().into());
376                    map.insert("line".into(), meta.line().into());
377                    map.insert("name".into(), span.name().into());
378                    map.insert("fields".into(), fields.into());
379                    map
380                })
381                .collect()
382        });
383        (self.callback)(event, spans);
384    }
385}
386
387///
388/// # OwnedEvent
389///
390/// [`OwnedEvent`] is the an owned version of the [`Event`], which
391/// is not `Send`. Our event must be `Send` to be sent on a channel.
392///
393#[derive(Debug, Clone, serde::Serialize)]
394pub struct OwnedEvent {
395    pub level: TracingLevel,
396    pub file: Option<String>,
397    pub target: String,
398    pub line: Option<u32>,
399    pub name: &'static str,
400    pub message: Option<String>,
401    pub fields: serde_json::Map<String, serde_json::Value>,
402}
403
404///
405/// # OwnedEvent
406///
407/// [`OwnedEventWithSpan`] is the an owned version of the [`Event`],
408/// which is not `Send` with their accompanying parent spans.
409///
410#[derive(Debug, Clone, serde::Serialize)]
411pub struct OwnedEventWithSpans {
412    pub event: OwnedEvent,
413    pub spans: Option<Vec<JsonMap>>,
414}
415
416impl OwnedEventWithSpans {
417    pub fn new(event: &Event<'_>, spans: Option<Vec<JsonMap>>) -> Self {
418        OwnedEventWithSpans {
419            event: event.into(),
420            spans,
421        }
422    }
423}
424
425///
426/// Same as [`tracing::Level`] but serializable.
427///
428#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, serde::Serialize)]
429pub enum TracingLevel {
430    Trace = 0,
431    Debug = 1,
432    Info = 2,
433    Warn = 3,
434    Error = 4,
435}
436
437impl From<&tracing::Level> for TracingLevel {
438    fn from(value: &tracing::Level) -> Self {
439        match *value {
440            tracing::Level::TRACE => TracingLevel::Trace,
441            tracing::Level::DEBUG => TracingLevel::Debug,
442            tracing::Level::INFO => TracingLevel::Info,
443            tracing::Level::WARN => TracingLevel::Warn,
444            tracing::Level::ERROR => TracingLevel::Error,
445        }
446    }
447}
448
449impl AsRef<str> for TracingLevel {
450    fn as_ref(&self) -> &str {
451        use TracingLevel::*;
452        match self {
453            Trace => "TRACE",
454            Debug => "DEBUG",
455            Info => "INFO",
456            Warn => "WARN",
457            Error => "ERROR",
458        }
459    }
460}
461
462impl Display for TracingLevel {
463    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
464        write!(f, "{}", self.as_ref())
465    }
466}
467
468///
469/// Converts a [`tracing::Event`] to a [`OwnedEvent`].
470///
471/// The conversion essentially makes the `Event` an owned value,
472/// which is necessary for it to be `Send` and hence to be sent on a channel.
473///
474impl From<&Event<'_>> for OwnedEvent {
475    fn from(event: &Event<'_>) -> Self {
476        let mut visitor = FieldVisitor::default();
477        event.record(&mut visitor);
478
479        // We make an exception for the message field because it's usually
480        // the one we are always interested in.
481        let message = visitor.0.remove("message").and_then(|v| {
482            if let serde_json::Value::String(s) = v {
483                Some(s)
484            } else {
485                None
486            }
487        });
488
489        let meta = event.metadata();
490        Self {
491            name: meta.name(),
492            target: meta.target().into(),
493            level: meta.level().into(),
494            file: meta.file().map(String::from),
495            line: meta.line(),
496            message,
497            fields: visitor.0,
498        }
499    }
500}
501
502///
503/// # FieldVisitor
504///
505/// Private structure that allows us to collect field values into as Json map.
506///
507#[cfg(feature = "span")]
508#[derive(Default, Clone)]
509struct FieldVisitor(serde_json::Map<String, serde_json::Value>);
510
511impl Visit for FieldVisitor {
512    fn record_bool(&mut self, field: &Field, value: bool) {
513        self.0.insert(field.name().into(), value.into());
514    }
515
516    fn record_f64(&mut self, field: &Field, value: f64) {
517        self.0.insert(field.name().into(), value.into());
518    }
519    fn record_i64(&mut self, field: &Field, value: i64) {
520        self.0.insert(field.name().into(), value.into());
521    }
522    fn record_u64(&mut self, field: &Field, value: u64) {
523        self.0.insert(field.name().into(), value.into());
524    }
525    fn record_str(&mut self, field: &Field, value: &str) {
526        self.0.insert(field.name().into(), value.into());
527    }
528    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
529        let text = format!("{:?}", value);
530        self.0.insert(field.name().into(), text.into());
531    }
532}
533
534// ----------------------------------------------------------------------------
535//
536// Unit Testing
537//
538// ----------------------------------------------------------------------------
539
540#[cfg(test)]
541mod tests {
542
543    #[cfg(feature = "async")]
544    use {
545        std::sync::{Arc, RwLock},
546        tokio::sync::mpsc,
547    };
548
549    use {
550        super::*,
551        insta::{
552            internals::{Content, ContentPath},
553            *,
554        },
555        tracing_subscriber::{EnvFilter, prelude::*},
556    };
557
558    // ----------------------------------------------------------------------------
559    // Helper Functions
560    // ----------------------------------------------------------------------------
561
562    fn run_trace_events() {
563        let span = tracing::info_span!("root-info", recurse = 0);
564        span.in_scope(|| {
565            tracing::trace!(foo = 1, bar = 2, "this is a trace message");
566            tracing::debug!(pi = 3.14159265, "this is a debug message");
567            tracing::info!(job = "foo", "this is an info message");
568            tracing::warn!(job = "foo", "this is a warning message");
569            tracing::error!(job = "foo", "this is an error message");
570        });
571    }
572
573    fn extract_events<T: Clone>(logs: &Arc<RwLock<Vec<T>>>) -> Vec<T> {
574        let events = logs.read().expect("could not read events");
575        events.clone()
576    }
577
578    fn redact_name(value: Content, _path: ContentPath) -> String {
579        let s = value.as_str().unwrap_or_default();
580        if s.contains(":") {
581            s.split_once(":")
582                .map(|p| format!("{}:<line>", p.0))
583                .unwrap_or_default()
584        } else {
585            s.to_string()
586        }
587    }
588
589    // ----------------------------------------------------------------------------
590    // Tests
591    // ----------------------------------------------------------------------------
592
593    #[test]
594    fn test_callback_layer() {
595        //
596        // Collect logs into a vector
597        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
598        let cb_events = events.clone();
599
600        let _guard = tracing_subscriber::registry()
601            .with(EnvFilter::new("tracing_async2=trace"))
602            .with(callback_layer(move |event| {
603                if let Ok(mut events) = cb_events.write() {
604                    events.push(event.into());
605                }
606            }))
607            .set_default();
608
609        run_trace_events();
610
611        assert_json_snapshot!("callback-layer", extract_events(&events), {
612            "[].line" => "<line>",
613            "[].name" => dynamic_redaction(redact_name),
614        });
615    }
616
617    #[cfg(feature = "span")]
618    #[test]
619    fn test_callback_layer_with_spans() {
620        //
621        // Collect logs into a vector
622        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
623        let cb_events = events.clone();
624
625        let _guard = tracing_subscriber::registry()
626            .with(EnvFilter::new("tracing_async2=trace"))
627            .with(callback_layer_with_spans(move |event, spans| {
628                if let Ok(mut events) = cb_events.write() {
629                    events.push(OwnedEventWithSpans::new(event, spans));
630                }
631            }))
632            .set_default();
633
634        run_trace_events();
635
636        assert_json_snapshot!("callback-layer-with-spans", extract_events(&events), {
637            "[].event.line" => "<line>",
638            "[].event.name" => dynamic_redaction(redact_name),
639            "[].spans[].line" => "<line>",
640            "[].spans[].name" => dynamic_redaction(redact_name),
641        });
642    }
643
644    #[cfg(feature = "async")]
645    #[tokio::test]
646    async fn test_channel_layer() {
647        //
648        use std::time::Duration;
649        use tokio::time::sleep;
650
651        //
652        // Collect logs into a vector
653        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
654        let cb_events = events.clone();
655
656        let (tx, mut rx) = mpsc::channel(100);
657
658        let _guard = tracing_subscriber::registry()
659            .with(EnvFilter::new("tracing_async2=trace"))
660            .with(channel_layer(tx))
661            .set_default();
662
663        let handle = tokio::spawn(async move {
664            while let Some(event) = rx.recv().await {
665                if let Ok(mut events) = cb_events.write() {
666                    events.push(event);
667                }
668            }
669        });
670
671        run_trace_events();
672        sleep(Duration::from_millis(100)).await;
673        handle.abort();
674
675        assert_json_snapshot!("channel-layer", extract_events(&events), {
676            "[].line" => "<line>",
677            "[].name" => dynamic_redaction(redact_name),
678        });
679    }
680
681    #[cfg(all(feature = "async", feature = "span"))]
682    #[tokio::test]
683    async fn test_channel_layer_with_spans() {
684        //
685        use std::time::Duration;
686        use tokio::time::sleep;
687
688        //
689        // Collect logs into a vector
690        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
691        let cb_events = events.clone();
692
693        let (tx, mut rx) = mpsc::channel(100);
694
695        let _guard = tracing_subscriber::registry()
696            .with(EnvFilter::new("tracing_async2=trace"))
697            .with(channel_layer_with_spans(tx))
698            .set_default();
699
700        let handle = tokio::spawn(async move {
701            while let Some(event) = rx.recv().await {
702                if let Ok(mut events) = cb_events.write() {
703                    events.push(event);
704                }
705            }
706        });
707
708        run_trace_events();
709        sleep(Duration::from_millis(100)).await;
710        handle.abort();
711
712        assert_json_snapshot!("channel-layer-with-spans", extract_events(&events), {
713            "[].event.line" => "<line>",
714            "[].event.name" => dynamic_redaction(redact_name),
715            "[].spans[].line" => "<line>",
716            "[].spans[].name" => dynamic_redaction(redact_name),
717        });
718    }
719
720    #[cfg(feature = "async")]
721    #[tokio::test]
722    async fn test_async_layer() {
723        //
724        use std::time::Duration;
725        use tokio::time::sleep;
726
727        //
728        // Collect logs into a vector
729        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
730        let cb_events = events.clone();
731
732        let _guard = tracing_subscriber::registry()
733            .with(EnvFilter::new("tracing_async2=trace"))
734            .with(async_layer(16, move |event| {
735                let f_events = cb_events.clone();
736                async move {
737                    if let Ok(mut events) = f_events.write() {
738                        events.push(event);
739                    }
740                }
741            }))
742            .set_default();
743
744        run_trace_events();
745        sleep(Duration::from_millis(100)).await;
746
747        assert_json_snapshot!("async-layer", extract_events(&events), {
748            "[].line" => "<line>",
749            "[].name" => dynamic_redaction(redact_name),
750        });
751    }
752
753    #[cfg(all(feature = "async", feature = "span"))]
754    #[tokio::test]
755    async fn test_async_layer_with_spans() {
756        //
757        use std::time::Duration;
758        use tokio::time::sleep;
759
760        //
761        // Collect logs into a vector
762        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
763        let cb_events = events.clone();
764
765        let _guard = tracing_subscriber::registry()
766            .with(EnvFilter::new("tracing_async2=trace"))
767            .with(async_layer_with_spans(16, move |event_with_span| {
768                let f_events = cb_events.clone();
769                async move {
770                    if let Ok(mut events) = f_events.write() {
771                        events.push(event_with_span);
772                    }
773                }
774            }))
775            .set_default();
776
777        run_trace_events();
778        sleep(Duration::from_millis(100)).await;
779
780        assert_json_snapshot!("async-layer-with-spans", extract_events(&events), {
781            "[].event.line" => "<line>",
782            "[].event.name" => dynamic_redaction(redact_name),
783            "[].spans[].line" => "<line>",
784            "[].spans[].name" => dynamic_redaction(redact_name),
785        });
786    }
787
788    #[cfg(feature = "accumulator")]
789    #[test]
790    fn test_accumulating_layer() {
791        //
792        // Collect logs into a vector
793        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
794
795        let _guard = tracing_subscriber::registry()
796            .with(EnvFilter::new("tracing_async2=trace"))
797            .with(accumulating_layer(events.clone()))
798            .set_default();
799
800        run_trace_events();
801
802        assert_json_snapshot!("accumulating-layer", extract_events(&events), {
803            "[].line" => "<line>",
804            "[].name" => dynamic_redaction(redact_name),
805        });
806    }
807
808    #[cfg(all(feature = "span", feature = "accumulator"))]
809    #[test]
810    fn test_accumulating_layer_with_spans() {
811        //
812        // Collect logs into a vector
813        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
814
815        let _guard = tracing_subscriber::registry()
816            .with(EnvFilter::new("tracing_async2=trace"))
817            .with(accumulating_layer_with_spans(events.clone()))
818            .set_default();
819
820        run_trace_events();
821
822        assert_json_snapshot!("accumulating-layer-with-spans", extract_events(&events), {
823            "[].event.line" => "<line>",
824            "[].event.name" => dynamic_redaction(redact_name),
825            "[].spans[].line" => "<line>",
826            "[].spans[].name" => dynamic_redaction(redact_name),
827        });
828    }
829}