tracing_modality/blocking/
layer.rs

1use crate::common::options::Options;
2use crate::InitError;
3
4use crate::common::layer::{LayerHandler, LocalMetadata};
5use crate::ingest;
6use crate::ingest::{ModalityIngest, ModalityIngestThreadHandle, WrappedMessage};
7
8use anyhow::Context as _;
9use once_cell::sync::Lazy;
10use std::{cell::Cell, thread::LocalKey, thread_local};
11use tokio::sync::mpsc::{self, UnboundedSender};
12use tracing_core::Subscriber;
13use tracing_subscriber::{layer::SubscriberExt, Registry};
14use uuid::Uuid;
15
16/// A `tracing` `Layer` that can be used to record trace events and stream them to modality in real
17/// time.
18///
19/// Can be transformed into a `Subscriber` with [`ModalityLayer::into_subscriber()`].
20pub struct ModalityLayer {
21    sender: UnboundedSender<WrappedMessage>,
22}
23
24impl ModalityLayer {
25    thread_local! {
26        static LOCAL_METADATA: Lazy<LocalMetadata> = Lazy::new(|| {
27            LocalMetadata {
28                thread_timeline: ingest::current_timeline(),
29            }
30        });
31        static THREAD_TIMELINE_INITIALIZED: Cell<bool> = const { Cell::new(false) };
32    }
33
34    /// Initialize a new `ModalityLayer`, with default options.
35    pub fn init() -> Result<(Self, ModalityIngestThreadHandle), InitError> {
36        Self::init_with_options(Default::default())
37    }
38
39    /// Initialize a new `ModalityLayer`, with specified options.
40    pub fn init_with_options(
41        mut opts: Options,
42    ) -> Result<(Self, ModalityIngestThreadHandle), InitError> {
43        let run_id = Uuid::new_v4();
44        opts.add_metadata("run_id", run_id.to_string());
45
46        let ingest = ModalityIngest::connect(opts).context("connect to modality")?;
47        let ingest_handle = ingest.spawn_thread();
48        let sender = ingest_handle.ingest_sender.clone();
49
50        Ok((ModalityLayer { sender }, ingest_handle))
51    }
52
53    /// Convert this `Layer` into a `Subscriber`by by layering it on a new instace of `tracing`'s
54    /// `Registry`.
55    pub fn into_subscriber(self) -> impl Subscriber {
56        Registry::default().with(self)
57    }
58}
59
60impl LayerHandler for ModalityLayer {
61    fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>> {
62        self.sender.send(msg)
63    }
64
65    fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>> {
66        &Self::LOCAL_METADATA
67    }
68
69    fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>> {
70        &Self::THREAD_TIMELINE_INITIALIZED
71    }
72}