tracing_forest/runtime.rs
1//! Run asynchronous code in the context of a `tracing-forest` subscriber.
2//!
3//! This module provides useful abstractions for executing async code:
4//! [`worker_task`] for `main` functions, and [`capture`] for unit tests,
5//! both of which return a configurable [`Builder`] object.
6//!
7//! # Nonblocking log processing with `worker_task`
8//!
9//! `tracing-forest` collects trace data into trees, and can sometimes
10//! produce large trees that need to be processed. To avoid blocking the main
11//! task in these cases, a common strategy is to send this data to a worker
12//! task for formatting and writing.
13//!
14//! The [`worker_task`] function provides this behavior as a first-class feature of this
15//! crate, and handles the configuration, initialization, and graceful shutdown
16//! of a subscriber with an associated worker task for formatting and writing.
17//!
18//! Unlike [`tracing-appender`] which uses a writer thread for formatted logs,
19//! this module allows for log trees to be sent to a worker task before formatting,
20//! allowing more log-related work to be offloaded to the worker task.
21//!
22//! [`tracing-appender`]: https://crates.io/crates/tracing-appender
23//!
24//! ## Examples
25//!
26//! ```
27//! use tracing::{info, info_span};
28//!
29//! #[tokio::main]
30//! async fn main() {
31//! tracing_forest::worker_task()
32//! .build()
33//! .on(async {
34//! info!("Hello, world!");
35//!
36//! info_span!("my_span").in_scope(|| {
37//! info!("Relevant information");
38//! })
39//! })
40//! .await;
41//! }
42//! ```
43//! Produces the output:
44//! ```log
45//! INFO i [info]: Hello, world!
46//! INFO my_span [ 26.0µs | 100.000% ]
47//! INFO ┕━ i [info]: Relevant information
48//! ```
49//!
50//! For full configuration options, see the [`Builder`] documentation.
51//!
52//! # Inspecting trace data in unit tests with `capture`
53//!
54//! The [`capture`] function offers the ability to programmatically inspect log
55//! trees generated by `tracing-forest`. It is the unit testing analog of
56//! [`worker_task`], except it returns `Vec<Tree>` after the future is completed,
57//! which can then be inspected.
58//!
59//! ## Examples
60//!
61//! ```
62//! use tracing_forest::tree::{Tree, Event, Span};
63//! use tracing::{info, info_span};
64//!
65//! #[tokio::main]
66//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
67//! let logs: Vec<Tree> = tracing_forest::capture()
68//! .build()
69//! .on(async {
70//! info!("Hello, world!");
71//!
72//! info_span!("my_span").in_scope(|| {
73//! info!("Relevant information");
74//! })
75//! })
76//! .await;
77//!
78//! // There is one event and one span at the root level
79//! assert!(logs.len() == 2);
80//!
81//! // Inspect the first event
82//! let hello_world: &Event = logs[0].event()?;
83//! assert!(hello_world.message() == Some("Hello, world!"));
84//!
85//! // Inspect the span
86//! let my_span: &Span = logs[1].span()?;
87//! assert!(my_span.name() == "my_span");
88//!
89//! // Only the `info` event is recorded
90//! assert!(my_span.nodes().len() == 1);
91//!
92//! let relevant_info: &Event = my_span.nodes()[0].event()?;
93//!
94//! assert!(relevant_info.message() == Some("Relevant information"));
95//!
96//! Ok(())
97//! }
98//! ```
99//!
100//! Additional options for tree inspection can be found in the
101//! [`tree` module-level documentation](crate::tree)
102//!
103//! For full configuration options, see the [`Builder`] documentation.
104use crate::fail;
105use crate::layer::ForestLayer;
106use crate::printer::PrettyPrinter;
107use crate::processor::{self, Processor, WithFallback};
108use crate::tag::{NoTag, TagParser};
109use crate::tree::Tree;
110use std::future::Future;
111use std::iter;
112use tokio::sync::mpsc::{self, UnboundedReceiver};
113use tokio::sync::oneshot;
114use tracing::Subscriber;
115use tracing_subscriber::layer::{Layered, SubscriberExt as _};
116use tracing_subscriber::Registry;
117
118/// Begins the configuration of a `ForestLayer` subscriber that sends log trees
119/// to a processing task for formatting and writing.
120///
121/// For full configuration options, see [`Builder`].
122///
123/// For a high-level overview on usage, see the [module-level documentation][nonblocking-processing]
124/// for more details.
125///
126/// # Note
127///
128/// The [`worker_task`] function defaults to setting the global subscriber, which is required
129/// to detect logs in multithreading scenarios, but prevents setting other [`Subscriber`]s
130/// globally afterwards. This can be disabled via the [`set_global`] method.
131///
132/// [nonblocking-processing]: crate::runtime#nonblocking-log-processing-with-worker_task
133/// [`set_global`]: Builder::set_global
134pub fn worker_task() -> Builder<InnerSender<impl Processor>, WorkerTask<PrettyPrinter>, NoTag> {
135 worker_task_inner(WorkerTask(PrettyPrinter::new()), true)
136}
137
138/// Begins the configuration of a `ForestLayer` subscriber that sends log trees
139/// to a buffer that can later be inspected programatically.
140///
141/// For full configuration options, see [`Builder`].
142///
143/// For a high-level overview on usage, see the [module-level documentation][inspecting-trace-data]
144/// for more details.
145///
146/// # Note
147///
148/// The [`capture`] function defaults to not setting the global subscriber, which
149/// allows multiple unit tests in the same file, but prevents trace data from other
150/// threads to be collected. This can be enabled via the [`set_global`] method.
151///
152/// [inspecting-trace-data]: crate::runtime#inspecting-trace-data-in-unit-tests-with-capture
153/// [`set_global`]: Builder::set_global
154pub fn capture() -> Builder<InnerSender<impl Processor>, Capture, NoTag> {
155 worker_task_inner(Capture(()), false)
156}
157
158fn worker_task_inner<P>(
159 worker_processor: P,
160 is_global: bool,
161) -> Builder<InnerSender<impl Processor>, P, NoTag> {
162 let (tx, rx) = mpsc::unbounded_channel();
163
164 let sender_processor = processor::from_fn(move |tree| {
165 tx.send(tree).map_err(|err| {
166 let msg = err.to_string().into();
167 processor::error(err.0, msg)
168 })
169 });
170
171 Builder {
172 sender_processor: InnerSender(sender_processor),
173 worker_processor,
174 receiver: rx,
175 tag: NoTag,
176 is_global,
177 }
178}
179
180/// Return type of [`worker_task`] and [`capture`].
181///
182/// # Configuring a `Runtime`
183///
184/// `Builder` follows the [builder pattern][builder] to configure a [`Runtime`].
185///
186/// Configuration options include:
187/// * Setting the [tag][set_tag].
188/// * Installing [globally][set_global].
189/// * Configuring the [internal sender][map_sender] with fallbacks.
190/// * Configuring the [processor][map_receiver] in the worker task.
191///
192/// To finish the `Runtime`, call the [`build`] method to compose the configured
193/// `ForestLayer` onto a [`Registry`]. Alternatively, the [`build_on`] method
194/// can be used construct arbitrary `Subscriber`s from the configured `ForestLayer`,
195/// which is used in the returned `Runtime`.
196///
197/// [builder]: https://rust-lang.github.io/api-guidelines/type-safety.html#builders-enable-construction-of-complex-values-c-builder
198/// [set_tag]: Builder::set_tag
199/// [set_global]: Builder::set_global
200/// [map_sender]: Builder::map_sender
201/// [map_receiver]: Builder::map_receiver
202/// [`build`]: Builder::build
203/// [`build_on`]: Builder::build_on
204pub struct Builder<Tx, Rx, T> {
205 sender_processor: Tx,
206 worker_processor: Rx,
207 receiver: UnboundedReceiver<Tree>,
208 tag: T,
209 is_global: bool,
210}
211
/// A marker type indicating that trace data should be captured for later use.
///
/// The private unit field keeps this type from being constructed outside of
/// this module.
#[derive(Debug)]
pub struct Capture(());
214
/// A marker type indicating that trace data should be processed.
///
/// Wraps the processor `P` that the worker task runs on every received tree.
#[derive(Debug)]
pub struct WorkerTask<P>(P);
217
/// The [`Processor`] that a `tracing-forest` subscriber uses to hand logs over
/// to a processing task.
///
/// Downstream users cannot construct this type: its only field is private.
#[derive(Debug)]
pub struct InnerSender<P>(P);
224
225impl<P: Processor> Processor for InnerSender<P> {
226 fn process(&self, tree: Tree) -> processor::Result {
227 self.0.process(tree)
228 }
229}
230
231mod sealed {
232 pub trait Sealed {}
233}
234
235impl<P> sealed::Sealed for InnerSender<P> {}
236
237impl<P: sealed::Sealed, F> sealed::Sealed for WithFallback<P, F> {}
238
239impl<Tx, P, T> Builder<Tx, WorkerTask<P>, T>
240where
241 P: Processor,
242{
243 /// Configure the processor on the receiving end of the log channel.
244 /// This is particularly useful for adding fallbacks.
245 ///
246 /// This method accepts a closure that accepts the current [`Processor`] on the
247 /// worker task, and maps it to another [`Processor`].
248 ///
249 /// # Note
250 ///
251 /// This method is only available if called after [`worker_task`].
252 ///
253 /// # Examples
254 ///
255 /// Configuring the writing task to write to a file, or else fall back to stderr.
256 /// ```no_run
257 /// # #[tokio::main]
258 /// # async fn main() {
259 /// use tracing_forest::traits::*;
260 /// use std::fs::File;
261 ///
262 /// let out = File::create("out.log").unwrap();
263 ///
264 /// tracing_forest::worker_task()
265 /// .map_receiver(|printer| printer
266 /// .writer(out)
267 /// .or_stderr()
268 /// )
269 /// .build()
270 /// .on(async {
271 /// // ...
272 /// })
273 /// .await;
274 /// # }
275 /// ```
276 pub fn map_receiver<F, P2>(self, f: F) -> Builder<Tx, WorkerTask<P2>, T>
277 where
278 F: FnOnce(P) -> P2,
279 P2: Processor,
280 {
281 Builder {
282 sender_processor: self.sender_processor,
283 worker_processor: WorkerTask(f(self.worker_processor.0)),
284 receiver: self.receiver,
285 tag: self.tag,
286 is_global: self.is_global,
287 }
288 }
289}
290
291impl<Tx, Rx, T> Builder<Tx, Rx, T>
292where
293 Tx: Processor + sealed::Sealed,
294 T: TagParser,
295{
296 /// Configure the processer within the subscriber that sends log trees to
297 /// a processing task. This allows for dangling tasks to still generate trace
298 /// data, even after the worker task closes.
299 ///
300 /// # Examples
301 ///
302 /// Allowing the subscriber to defer to stderr if the worker task finished.
303 /// ```no_run
304 /// # #[tokio::main]
305 /// # async fn main() {
306 /// use tracing_forest::traits::*;
307 ///
308 /// tracing_forest::worker_task()
309 /// .map_sender(|sender| sender.or_stderr())
310 /// .build()
311 /// .on(async {
312 /// # mod tokio {
313 /// # pub async fn spawn<T>(_: T) {}
314 /// # pub mod signal {
315 /// # pub async fn ctrl_c() -> Result<(), ()> { Ok(()) }
316 /// # }
317 /// # }
318 /// // The handle is immediately dropped, leaving the task dangling
319 /// tokio::spawn(async {
320 /// // Some unending task
321 /// });
322 ///
323 /// // Wait until the user stops the application
324 /// tokio::signal::ctrl_c().await.expect("Failed to listen for CTRL-C");
325 /// })
326 /// .await;
327 /// // The worker task is completed and the channel is closed at this point.
328 /// // Any new trace data generated by the dangling task at this point
329 /// // is deferred to stderr because of the added fallback.
330 /// # }
331 /// ```
332 ///
333 /// Since dropping the sender half would make the receiver task useless, this
334 /// method uses traits to enforce at compile time that the function returns
335 /// some derivation of the sender. Currently, the only accepted wrapping is
336 /// through adding a fallback.
337 /// ```compile_fail
338 /// use tracing_forest::PrettyPrinter;
339 ///
340 /// # #[tokio::main]
341 /// # async fn main() {
342 /// tracing_forest::worker_task()
343 /// .map_sender(|_sender| {
344 /// // Some variation of the sender isn't returned, so this won't compile.
345 /// PrettyPrinter::new()
346 /// })
347 /// .build()
348 /// .on(async {
349 /// // ...
350 /// })
351 /// .await;
352 /// # }
353 /// ```
354 pub fn map_sender<F, Tx2>(self, f: F) -> Builder<Tx2, Rx, T>
355 where
356 F: FnOnce(Tx) -> Tx2,
357 Tx2: Processor + sealed::Sealed,
358 {
359 Builder {
360 sender_processor: f(self.sender_processor),
361 worker_processor: self.worker_processor,
362 receiver: self.receiver,
363 tag: self.tag,
364 is_global: self.is_global,
365 }
366 }
367
368 /// Set the [`TagParser`].
369 ///
370 /// # Examples
371 ///
372 /// ```
373 /// use tracing_forest::{util::*, Tag};
374 ///
375 /// fn simple_tag(event: &Event) -> Option<Tag> {
376 /// // -- snip --
377 /// # None
378 /// }
379 ///
380 /// #[tokio::main]
381 /// async fn main() {
382 /// tracing_forest::worker_task()
383 /// .set_tag(simple_tag)
384 /// .build()
385 /// .on(async {
386 /// // ...
387 /// })
388 /// .await;
389 /// }
390 /// ```
391 pub fn set_tag<T2>(self, tag: T2) -> Builder<Tx, Rx, T2>
392 where
393 T2: TagParser,
394 {
395 Builder {
396 sender_processor: self.sender_processor,
397 worker_processor: self.worker_processor,
398 receiver: self.receiver,
399 tag,
400 is_global: self.is_global,
401 }
402 }
403
404 /// Set whether or not the subscriber should be set globally.
405 ///
406 /// Setting the subscriber globally is intended for `main` functions, since
407 /// it allows logs to be be collected across multithreaded environments. Not
408 /// setting globally is intended for test functions, which need to set a new
409 /// subscriber multiple times in the same program.
410 ///
411 /// # Examples
412 ///
413 /// For multithreaded tests, `set_global` can be used so that the subscriber
414 /// applies to all the threads. However, each function that sets a global
415 /// subscriber must be in its own compilation unit, like an integration test,
416 /// otherwise the global subscriber will carry over across tests.
417 /// ```
418 /// #[tokio::test(flavor = "multi_thread")]
419 /// async fn test_multithreading() {
420 /// let logs = tracing_forest::capture()
421 /// .set_global(true)
422 /// .build()
423 /// .on(async {
424 /// // spawn some tasks
425 /// })
426 /// .await;
427 ///
428 /// // inspect logs...
429 /// }
430 /// ```
431 pub fn set_global(mut self, is_global: bool) -> Self {
432 self.is_global = is_global;
433 self
434 }
435
436 /// Finishes the `ForestLayer` by composing it into a [`Registry`], and
437 /// returns it as a [`Runtime`].
438 ///
439 /// This method is useful for a basic configuration of a `Subscriber`. For
440 /// a more advanced configuration, see the [`build_on`] and [`build_with`]
441 /// methods.
442 ///
443 /// [`build_on`]: Builder::build_on
444 /// [`build_with`]: Builder::build_with
445 ///
446 /// # Examples
447 ///
448 /// ```
449 /// #[tokio::main]
450 /// async fn main() {
451 /// tracing_forest::worker_task()
452 /// .build()
453 /// .on(async {
454 /// // ...
455 /// })
456 /// .await;
457 /// }
458 /// ```
459 pub fn build(self) -> Runtime<Layered<ForestLayer<Tx, T>, Registry>, Rx> {
460 self.build_on(|x| x)
461 }
462
463 /// Finishes the `ForestLayer` by calling a function to build a `Subscriber`,
464 /// and returns in as a [`Runtime`].
465 ///
466 /// Unlike [`build_with`], this method composes the layer onto a [`Registry`]
467 /// prior to passing it into the function. This makes it more convenient for
468 /// the majority of use cases.
469 ///
470 /// This method is useful for advanced configuration of `Subscriber`s as
471 /// defined in [`tracing-subscriber`s documentation]. For a basic configuration,
472 /// see the [`build`] method.
473 ///
474 /// [`build_with`]: Builder::build_with
475 /// [`tracing-subscriber`s documentation]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/layer/index.html#composing-layers
476 /// [`build`]: Builder::build
477 ///
478 /// # Examples
479 ///
480 /// Composing a `Subscriber` with multiple layers:
481 /// ```
482 /// use tracing_forest::{traits::*, util::*};
483 ///
484 /// #[tokio::main]
485 /// async fn main() {
486 /// tracing_forest::worker_task()
487 /// .build_on(|subscriber| subscriber.with(LevelFilter::INFO))
488 /// .on(async {
489 /// // ...
490 /// })
491 /// .await;
492 /// }
493 /// ```
494 pub fn build_on<F, S>(self, f: F) -> Runtime<S, Rx>
495 where
496 F: FnOnce(Layered<ForestLayer<Tx, T>, Registry>) -> S,
497 S: Subscriber,
498 {
499 self.build_with(|layer| f(Registry::default().with(layer)))
500 }
501
502 /// Finishes the `ForestLayer` by calling a function to build a `Subscriber`,
503 /// and returns it as a [`Runtime`].
504 ///
505 /// Unlike [`build_on`], this method passes the `ForestLayer` to the function
506 /// without presupposing a [`Registry`] base. This makes it the most flexible
507 /// option for construction.
508 ///
509 /// This method is useful for advanced configuration of `Subscriber`s as
510 /// defined in [`tracing-subscriber`s documentation]. For a basic configuration,
511 /// see the [`build`] method.
512 ///
513 /// [`build_on`]: Builder::build_on
514 /// [`tracing-subscriber`s documentation]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/layer/index.html#composing-layers
515 /// [`build`]: Builder::build
516 ///
517 /// # Examples
518 ///
519 /// Composing a `Subscriber` with multiple layers:
520 /// ```
521 /// use tracing_subscriber::Registry;
522 /// use tracing_forest::{traits::*, util::*};
523 ///
524 /// #[tokio::main]
525 /// async fn main() {
526 /// tracing_forest::worker_task()
527 /// .build_with(|layer: ForestLayer<_, _>| {
528 /// Registry::default()
529 /// .with(layer)
530 /// .with(LevelFilter::INFO)
531 /// })
532 /// .on(async {
533 /// // ...
534 /// })
535 /// .await;
536 /// }
537 /// ```
538 pub fn build_with<F, S>(self, f: F) -> Runtime<S, Rx>
539 where
540 F: FnOnce(ForestLayer<Tx, T>) -> S,
541 S: Subscriber,
542 {
543 let layer = ForestLayer::new(self.sender_processor, self.tag);
544 let subscriber = f(layer);
545
546 Runtime {
547 subscriber,
548 worker_processor: self.worker_processor,
549 receiver: self.receiver,
550 is_global: self.is_global,
551 }
552 }
553}
554
555/// Execute a `Future` in the context of a subscriber with a `ForestLayer`.
556///
557/// This type is returned by [`Builder::build`] and [`Builder::build_with`].
558pub struct Runtime<S, P> {
559 subscriber: S,
560 worker_processor: P, // either `Process<_>` or `Capture`
561 receiver: UnboundedReceiver<Tree>,
562 is_global: bool,
563}
564
565impl<S, P> Runtime<S, WorkerTask<P>>
566where
567 S: Subscriber + Send + Sync,
568 P: Processor + Send,
569{
570 /// Execute a future in the context of the configured subscriber.
571 pub async fn on<F: Future>(self, f: F) -> F::Output {
572 let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
573 let processor = self.worker_processor.0;
574 let mut receiver = self.receiver;
575
576 let handle = tokio::spawn(async move {
577 loop {
578 tokio::select! {
579 Some(tree) = receiver.recv() => processor.process(tree).expect(fail::PROCESSING_ERROR),
580 Ok(()) = &mut shutdown_rx => break,
581 else => break,
582 }
583 }
584
585 receiver.close();
586
587 // Drain any remaining logs in the channel buffer.
588 while let Ok(tree) = receiver.try_recv() {
589 processor.process(tree).expect(fail::PROCESSING_ERROR);
590 }
591 });
592
593 let output = {
594 let _guard = if self.is_global {
595 tracing::subscriber::set_global_default(self.subscriber)
596 .expect("global default already set");
597 None
598 } else {
599 Some(tracing::subscriber::set_default(self.subscriber))
600 };
601
602 f.await
603 };
604
605 shutdown_tx
606 .send(())
607 .expect("Shutdown signal couldn't send, this is a bug");
608
609 handle
610 .await
611 .expect("Failed to join the writing task, this is a bug");
612
613 output
614 }
615}
616
617impl<S> Runtime<S, Capture>
618where
619 S: Subscriber + Send + Sync,
620{
621 /// Execute a future in the context of the configured subscriber, and return
622 /// a `Vec<Tree>` of generated logs.
623 pub async fn on(self, f: impl Future<Output = ()>) -> Vec<Tree> {
624 {
625 let _guard = if self.is_global {
626 tracing::subscriber::set_global_default(self.subscriber)
627 .expect("global default already set");
628 None
629 } else {
630 Some(tracing::subscriber::set_default(self.subscriber))
631 };
632
633 f.await;
634 }
635
636 let mut receiver = self.receiver;
637
638 receiver.close();
639
640 iter::from_fn(|| receiver.try_recv().ok()).collect()
641 }
642}