tracing_forest/
runtime.rs

1//! Run asynchronous code in the context of a `tracing-forest` subscriber.
2//!
3//! This module provides useful abstractions for executing async code:
4//! [`worker_task`] for `main` functions, and [`capture`] for unit tests,
5//! both of which return a configurable [`Builder`] object.
6//!
7//! # Nonblocking log processing with `worker_task`
8//!
9//! `tracing-forest` collects trace data into trees, and can sometimes
10//! produce large trees that need to be processed. To avoid blocking the main
11//! task in these cases, a common strategy is to send this data to a worker
12//! task for formatting and writing.
13//!
14//! The [`worker_task`] function provides this behavior as a first-class feature of this
15//! crate, and handles the configuration, initialization, and graceful shutdown
16//! of a subscriber with an associated worker task for formatting and writing.
17//!
18//! Unlike [`tracing-appender`] which uses a writer thread for formatted logs,
19//! this module allows for log trees to be sent to a worker task before formatting,
20//! allowing more log-related work to be offloaded to the worker task.
21//!
22//! [`tracing-appender`]: https://crates.io/crates/tracing-appender
23//!
24//! ## Examples
25//!
26//! ```
27//! use tracing::{info, info_span};
28//!
29//! #[tokio::main]
30//! async fn main() {
31//!     tracing_forest::worker_task()
32//!         .build()
33//!         .on(async {
34//!             info!("Hello, world!");
35//!
36//!             info_span!("my_span").in_scope(|| {
37//!                 info!("Relevant information");
38//!             })
39//!         })
40//!         .await;
41//! }
42//! ```
43//! Produces the output:
44//! ```log
45//! INFO     i [info]: Hello, world!
46//! INFO     my_span [ 26.0µs | 100.000% ]
47//! INFO     ┕━ i [info]: Relevant information
48//! ```
49//!
50//! For full configuration options, see the [`Builder`] documentation.
51//!
52//! # Inspecting trace data in unit tests with `capture`
53//!
54//! The [`capture`] function offers the ability to programmatically inspect log
55//! trees generated by `tracing-forest`. It is the unit testing analog of
56//! [`worker_task`], except it returns `Vec<Tree>` after the future is completed,
57//! which can then be inspected.
58//!
59//! ## Examples
60//!
61//! ```
62//! use tracing_forest::tree::{Tree, Event, Span};
63//! use tracing::{info, info_span};
64//!
65//! #[tokio::main]
66//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
67//!     let logs: Vec<Tree> = tracing_forest::capture()
68//!         .build()
69//!         .on(async {
70//!             info!("Hello, world!");
71//!
72//!             info_span!("my_span").in_scope(|| {
73//!                 info!("Relevant information");
74//!             })
75//!         })
76//!         .await;
77//!
78//!     // There is one event and one span at the root level
79//!     assert!(logs.len() == 2);
80//!
81//!     // Inspect the first event
82//!     let hello_world: &Event = logs[0].event()?;
83//!     assert!(hello_world.message() == Some("Hello, world!"));
84//!
85//!     // Inspect the span
86//!     let my_span: &Span = logs[1].span()?;
87//!     assert!(my_span.name() == "my_span");
88//!
89//!     // Only the `info` event is recorded
90//!     assert!(my_span.nodes().len() == 1);
91//!
92//!     let relevant_info: &Event = my_span.nodes()[0].event()?;
93//!
94//!     assert!(relevant_info.message() == Some("Relevant information"));
95//!
96//!     Ok(())
97//! }
98//! ```
99//!
100//! Additional options for tree inspection can be found in the
101//! [`tree` module-level documentation](crate::tree)
102//!
103//! For full configuration options, see the [`Builder`] documentation.
104use crate::fail;
105use crate::layer::ForestLayer;
106use crate::printer::PrettyPrinter;
107use crate::processor::{self, Processor, WithFallback};
108use crate::tag::{NoTag, TagParser};
109use crate::tree::Tree;
110use std::future::Future;
111use std::iter;
112use tokio::sync::mpsc::{self, UnboundedReceiver};
113use tokio::sync::oneshot;
114use tracing::Subscriber;
115use tracing_subscriber::layer::{Layered, SubscriberExt as _};
116use tracing_subscriber::Registry;
117
118/// Begins the configuration of a `ForestLayer` subscriber that sends log trees
119/// to a processing task for formatting and writing.
120///
121/// For full configuration options, see [`Builder`].
122///
123/// For a high-level overview on usage, see the [module-level documentation][nonblocking-processing]
124/// for more details.
125///
126/// # Note
127///
128/// The [`worker_task`] function defaults to setting the global subscriber, which is required
129/// to detect logs in multithreading scenarios, but prevents setting other [`Subscriber`]s
130/// globally afterwards. This can be disabled via the [`set_global`] method.
131///
132/// [nonblocking-processing]: crate::runtime#nonblocking-log-processing-with-worker_task
133/// [`set_global`]: Builder::set_global
134pub fn worker_task() -> Builder<InnerSender<impl Processor>, WorkerTask<PrettyPrinter>, NoTag> {
135    worker_task_inner(WorkerTask(PrettyPrinter::new()), true)
136}
137
138/// Begins the configuration of a `ForestLayer` subscriber that sends log trees
139/// to a buffer that can later be inspected programatically.
140///
141/// For full configuration options, see [`Builder`].
142///
143/// For a high-level overview on usage, see the [module-level documentation][inspecting-trace-data]
144/// for more details.
145///
146/// # Note
147///
148/// The [`capture`] function defaults to not setting the global subscriber, which
149/// allows multiple unit tests in the same file, but prevents trace data from other
150/// threads to be collected. This can be enabled via the [`set_global`] method.
151///
152/// [inspecting-trace-data]: crate::runtime#inspecting-trace-data-in-unit-tests-with-capture
153/// [`set_global`]: Builder::set_global
154pub fn capture() -> Builder<InnerSender<impl Processor>, Capture, NoTag> {
155    worker_task_inner(Capture(()), false)
156}
157
158fn worker_task_inner<P>(
159    worker_processor: P,
160    is_global: bool,
161) -> Builder<InnerSender<impl Processor>, P, NoTag> {
162    let (tx, rx) = mpsc::unbounded_channel();
163
164    let sender_processor = processor::from_fn(move |tree| {
165        tx.send(tree).map_err(|err| {
166            let msg = err.to_string().into();
167            processor::error(err.0, msg)
168        })
169    });
170
171    Builder {
172        sender_processor: InnerSender(sender_processor),
173        worker_processor,
174        receiver: rx,
175        tag: NoTag,
176        is_global,
177    }
178}
179
180/// Return type of [`worker_task`] and [`capture`].
181///
182/// # Configuring a `Runtime`
183///
184/// `Builder` follows the [builder pattern][builder] to configure a [`Runtime`].
185///
186/// Configuration options include:
187/// * Setting the [tag][set_tag].
188/// * Installing [globally][set_global].
189/// * Configuring the [internal sender][map_sender] with fallbacks.
190/// * Configuring the [processor][map_receiver] in the worker task.
191///
192/// To finish the `Runtime`, call the [`build`] method to compose the configured
193/// `ForestLayer` onto a [`Registry`]. Alternatively, the [`build_on`] method
194/// can be used construct arbitrary `Subscriber`s from the configured `ForestLayer`,
195/// which is used in the returned `Runtime`.
196///
197/// [builder]: https://rust-lang.github.io/api-guidelines/type-safety.html#builders-enable-construction-of-complex-values-c-builder
198/// [set_tag]: Builder::set_tag
199/// [set_global]: Builder::set_global
200/// [map_sender]: Builder::map_sender
201/// [map_receiver]: Builder::map_receiver
202/// [`build`]: Builder::build
203/// [`build_on`]: Builder::build_on
204pub struct Builder<Tx, Rx, T> {
205    sender_processor: Tx,
206    worker_processor: Rx,
207    receiver: UnboundedReceiver<Tree>,
208    tag: T,
209    is_global: bool,
210}
211
/// A marker type indicating that trace data should be captured for later use.
///
/// The private unit field keeps this type from being constructed outside of
/// this module.
#[derive(Debug)]
pub struct Capture(());
214
/// A marker type indicating that trace data should be processed.
///
/// Wraps the processor that the worker task runs on every received tree.
#[derive(Debug)]
pub struct WorkerTask<P>(P);
217
218/// The [`Processor`] used within a `tracing-forest` subscriber for sending logs
219/// to a processing task.
220///
221/// This type cannot be constructed by downstream users.
222#[derive(Debug)]
223pub struct InnerSender<P>(P);
224
225impl<P: Processor> Processor for InnerSender<P> {
226    fn process(&self, tree: Tree) -> processor::Result {
227        self.0.process(tree)
228    }
229}
230
231mod sealed {
232    pub trait Sealed {}
233}
234
235impl<P> sealed::Sealed for InnerSender<P> {}
236
237impl<P: sealed::Sealed, F> sealed::Sealed for WithFallback<P, F> {}
238
239impl<Tx, P, T> Builder<Tx, WorkerTask<P>, T>
240where
241    P: Processor,
242{
243    /// Configure the processor on the receiving end of the log channel.
244    /// This is particularly useful for adding fallbacks.
245    ///
246    /// This method accepts a closure that accepts the current [`Processor`] on the
247    /// worker task, and maps it to another [`Processor`].
248    ///
249    /// # Note
250    ///
251    /// This method is only available if called after [`worker_task`].
252    ///
253    /// # Examples
254    ///
255    /// Configuring the writing task to write to a file, or else fall back to stderr.
256    /// ```no_run
257    /// # #[tokio::main]
258    /// # async fn main() {
259    /// use tracing_forest::traits::*;
260    /// use std::fs::File;
261    ///
262    /// let out = File::create("out.log").unwrap();
263    ///
264    /// tracing_forest::worker_task()
265    ///     .map_receiver(|printer| printer
266    ///         .writer(out)
267    ///         .or_stderr()
268    ///     )
269    ///     .build()
270    ///     .on(async {
271    ///         // ...
272    ///     })
273    ///     .await;
274    /// # }
275    /// ```
276    pub fn map_receiver<F, P2>(self, f: F) -> Builder<Tx, WorkerTask<P2>, T>
277    where
278        F: FnOnce(P) -> P2,
279        P2: Processor,
280    {
281        Builder {
282            sender_processor: self.sender_processor,
283            worker_processor: WorkerTask(f(self.worker_processor.0)),
284            receiver: self.receiver,
285            tag: self.tag,
286            is_global: self.is_global,
287        }
288    }
289}
290
291impl<Tx, Rx, T> Builder<Tx, Rx, T>
292where
293    Tx: Processor + sealed::Sealed,
294    T: TagParser,
295{
296    /// Configure the processer within the subscriber that sends log trees to
297    /// a processing task. This allows for dangling tasks to still generate trace
298    /// data, even after the worker task closes.
299    ///
300    /// # Examples
301    ///
302    /// Allowing the subscriber to defer to stderr if the worker task finished.
303    /// ```no_run
304    /// # #[tokio::main]
305    /// # async fn main() {
306    /// use tracing_forest::traits::*;
307    ///
308    /// tracing_forest::worker_task()
309    ///     .map_sender(|sender| sender.or_stderr())
310    ///     .build()
311    ///     .on(async {
312    /// #       mod tokio {
313    /// #          pub async fn spawn<T>(_: T) {}
314    /// #          pub mod signal {
315    /// #              pub async fn ctrl_c() -> Result<(), ()> { Ok(()) }
316    /// #          }
317    /// #       }
318    ///         // The handle is immediately dropped, leaving the task dangling
319    ///         tokio::spawn(async {
320    ///             // Some unending task
321    ///         });
322    ///
323    ///         // Wait until the user stops the application
324    ///         tokio::signal::ctrl_c().await.expect("Failed to listen for CTRL-C");
325    ///     })
326    ///     .await;
327    ///     // The worker task is completed and the channel is closed at this point.
328    ///     // Any new trace data generated by the dangling task at this point
329    ///     // is deferred to stderr because of the added fallback.
330    /// # }
331    /// ```
332    ///
333    /// Since dropping the sender half would make the receiver task useless, this
334    /// method uses traits to enforce at compile time that the function returns
335    /// some derivation of the sender. Currently, the only accepted wrapping is
336    /// through adding a fallback.
337    /// ```compile_fail
338    /// use tracing_forest::PrettyPrinter;
339    ///
340    /// # #[tokio::main]
341    /// # async fn main() {
342    /// tracing_forest::worker_task()
343    ///     .map_sender(|_sender| {
344    ///         // Some variation of the sender isn't returned, so this won't compile.
345    ///         PrettyPrinter::new()
346    ///     })
347    ///     .build()
348    ///     .on(async {
349    ///         // ...
350    ///     })
351    ///     .await;
352    /// # }
353    /// ```
354    pub fn map_sender<F, Tx2>(self, f: F) -> Builder<Tx2, Rx, T>
355    where
356        F: FnOnce(Tx) -> Tx2,
357        Tx2: Processor + sealed::Sealed,
358    {
359        Builder {
360            sender_processor: f(self.sender_processor),
361            worker_processor: self.worker_processor,
362            receiver: self.receiver,
363            tag: self.tag,
364            is_global: self.is_global,
365        }
366    }
367
368    /// Set the [`TagParser`].
369    ///
370    /// # Examples
371    ///
372    /// ```
373    /// use tracing_forest::{util::*, Tag};
374    ///
375    /// fn simple_tag(event: &Event) -> Option<Tag> {
376    ///     // -- snip --
377    ///     # None
378    /// }
379    ///
380    /// #[tokio::main]
381    /// async fn main() {
382    ///     tracing_forest::worker_task()
383    ///         .set_tag(simple_tag)
384    ///         .build()
385    ///         .on(async {
386    ///             // ...
387    ///         })
388    ///         .await;
389    /// }
390    /// ```
391    pub fn set_tag<T2>(self, tag: T2) -> Builder<Tx, Rx, T2>
392    where
393        T2: TagParser,
394    {
395        Builder {
396            sender_processor: self.sender_processor,
397            worker_processor: self.worker_processor,
398            receiver: self.receiver,
399            tag,
400            is_global: self.is_global,
401        }
402    }
403
404    /// Set whether or not the subscriber should be set globally.
405    ///
406    /// Setting the subscriber globally is intended for `main` functions, since
407    /// it allows logs to be be collected across multithreaded environments. Not
408    /// setting globally is intended for test functions, which need to set a new
409    /// subscriber multiple times in the same program.
410    ///
411    /// # Examples
412    ///
413    /// For multithreaded tests, `set_global` can be used so that the subscriber
414    /// applies to all the threads. However, each function that sets a global
415    /// subscriber must be in its own compilation unit, like an integration test,
416    /// otherwise the global subscriber will carry over across tests.
417    /// ```
418    /// #[tokio::test(flavor = "multi_thread")]
419    /// async fn test_multithreading() {
420    ///     let logs = tracing_forest::capture()
421    ///         .set_global(true)
422    ///         .build()
423    ///         .on(async {
424    ///             // spawn some tasks
425    ///         })
426    ///         .await;
427    ///
428    ///     // inspect logs...
429    /// }
430    /// ```
431    pub fn set_global(mut self, is_global: bool) -> Self {
432        self.is_global = is_global;
433        self
434    }
435
436    /// Finishes the `ForestLayer` by composing it into a [`Registry`], and
437    /// returns it as a [`Runtime`].
438    ///
439    /// This method is useful for a basic configuration of a `Subscriber`. For
440    /// a more advanced configuration, see the [`build_on`] and [`build_with`]
441    /// methods.
442    ///
443    /// [`build_on`]: Builder::build_on
444    /// [`build_with`]: Builder::build_with
445    ///
446    /// # Examples
447    ///
448    /// ```
449    /// #[tokio::main]
450    /// async fn main() {
451    ///     tracing_forest::worker_task()
452    ///         .build()
453    ///         .on(async {
454    ///             // ...
455    ///         })
456    ///         .await;
457    /// }
458    /// ```
459    pub fn build(self) -> Runtime<Layered<ForestLayer<Tx, T>, Registry>, Rx> {
460        self.build_on(|x| x)
461    }
462
463    /// Finishes the `ForestLayer` by calling a function to build a `Subscriber`,
464    /// and returns in as a [`Runtime`].
465    ///
466    /// Unlike [`build_with`], this method composes the layer onto a [`Registry`]
467    /// prior to passing it into the function. This makes it more convenient for
468    /// the majority of use cases.
469    ///
470    /// This method is useful for advanced configuration of `Subscriber`s as
471    /// defined in [`tracing-subscriber`s documentation]. For a basic configuration,
472    /// see the [`build`] method.
473    ///
474    /// [`build_with`]: Builder::build_with
475    /// [`tracing-subscriber`s documentation]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/layer/index.html#composing-layers
476    /// [`build`]: Builder::build
477    ///
478    /// # Examples
479    ///
480    /// Composing a `Subscriber` with multiple layers:
481    /// ```
482    /// use tracing_forest::{traits::*, util::*};
483    ///
484    /// #[tokio::main]
485    /// async fn main() {
486    ///     tracing_forest::worker_task()
487    ///         .build_on(|subscriber| subscriber.with(LevelFilter::INFO))
488    ///         .on(async {
489    ///             // ...
490    ///         })
491    ///         .await;
492    /// }
493    /// ```
494    pub fn build_on<F, S>(self, f: F) -> Runtime<S, Rx>
495    where
496        F: FnOnce(Layered<ForestLayer<Tx, T>, Registry>) -> S,
497        S: Subscriber,
498    {
499        self.build_with(|layer| f(Registry::default().with(layer)))
500    }
501
502    /// Finishes the `ForestLayer` by calling a function to build a `Subscriber`,
503    /// and returns it as a [`Runtime`].
504    ///
505    /// Unlike [`build_on`], this method passes the `ForestLayer` to the function
506    /// without presupposing a [`Registry`] base. This makes it the most flexible
507    /// option for construction.
508    ///
509    /// This method is useful for advanced configuration of `Subscriber`s as
510    /// defined in [`tracing-subscriber`s documentation]. For a basic configuration,
511    /// see the [`build`] method.
512    ///
513    /// [`build_on`]: Builder::build_on
514    /// [`tracing-subscriber`s documentation]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/layer/index.html#composing-layers
515    /// [`build`]: Builder::build
516    ///
517    /// # Examples
518    ///
519    /// Composing a `Subscriber` with multiple layers:
520    /// ```
521    /// use tracing_subscriber::Registry;
522    /// use tracing_forest::{traits::*, util::*};
523    ///
524    /// #[tokio::main]
525    /// async fn main() {
526    ///     tracing_forest::worker_task()
527    ///         .build_with(|layer: ForestLayer<_, _>| {
528    ///             Registry::default()
529    ///                 .with(layer)
530    ///                 .with(LevelFilter::INFO)
531    ///         })
532    ///         .on(async {
533    ///             // ...
534    ///         })
535    ///         .await;
536    /// }
537    /// ```
538    pub fn build_with<F, S>(self, f: F) -> Runtime<S, Rx>
539    where
540        F: FnOnce(ForestLayer<Tx, T>) -> S,
541        S: Subscriber,
542    {
543        let layer = ForestLayer::new(self.sender_processor, self.tag);
544        let subscriber = f(layer);
545
546        Runtime {
547            subscriber,
548            worker_processor: self.worker_processor,
549            receiver: self.receiver,
550            is_global: self.is_global,
551        }
552    }
553}
554
555/// Execute a `Future` in the context of a subscriber with a `ForestLayer`.
556///
557/// This type is returned by [`Builder::build`] and [`Builder::build_with`].
558pub struct Runtime<S, P> {
559    subscriber: S,
560    worker_processor: P, // either `Process<_>` or `Capture`
561    receiver: UnboundedReceiver<Tree>,
562    is_global: bool,
563}
564
565impl<S, P> Runtime<S, WorkerTask<P>>
566where
567    S: Subscriber + Send + Sync,
568    P: Processor + Send,
569{
570    /// Execute a future in the context of the configured subscriber.
571    pub async fn on<F: Future>(self, f: F) -> F::Output {
572        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
573        let processor = self.worker_processor.0;
574        let mut receiver = self.receiver;
575
576        let handle = tokio::spawn(async move {
577            loop {
578                tokio::select! {
579                    Some(tree) = receiver.recv() => processor.process(tree).expect(fail::PROCESSING_ERROR),
580                    Ok(()) = &mut shutdown_rx => break,
581                    else => break,
582                }
583            }
584
585            receiver.close();
586
587            // Drain any remaining logs in the channel buffer.
588            while let Ok(tree) = receiver.try_recv() {
589                processor.process(tree).expect(fail::PROCESSING_ERROR);
590            }
591        });
592
593        let output = {
594            let _guard = if self.is_global {
595                tracing::subscriber::set_global_default(self.subscriber)
596                    .expect("global default already set");
597                None
598            } else {
599                Some(tracing::subscriber::set_default(self.subscriber))
600            };
601
602            f.await
603        };
604
605        shutdown_tx
606            .send(())
607            .expect("Shutdown signal couldn't send, this is a bug");
608
609        handle
610            .await
611            .expect("Failed to join the writing task, this is a bug");
612
613        output
614    }
615}
616
617impl<S> Runtime<S, Capture>
618where
619    S: Subscriber + Send + Sync,
620{
621    /// Execute a future in the context of the configured subscriber, and return
622    /// a `Vec<Tree>` of generated logs.
623    pub async fn on(self, f: impl Future<Output = ()>) -> Vec<Tree> {
624        {
625            let _guard = if self.is_global {
626                tracing::subscriber::set_global_default(self.subscriber)
627                    .expect("global default already set");
628                None
629            } else {
630                Some(tracing::subscriber::set_default(self.subscriber))
631            };
632
633            f.await;
634        }
635
636        let mut receiver = self.receiver;
637
638        receiver.close();
639
640        iter::from_fn(|| receiver.try_recv().ok()).collect()
641    }
642}