tracing_modality/async/
layer.rs

1use crate::common::options::Options;
2use crate::InitError;
3
4use crate::common::layer::{LayerHandler, LocalMetadata};
5use crate::ingest;
6use crate::ingest::{ModalityIngest, ModalityIngestTaskHandle, WrappedMessage};
7
8use anyhow::Context as _;
9use once_cell::sync::Lazy;
10use std::{cell::Cell, thread::LocalKey, thread_local};
11use tokio::sync::mpsc::{self, UnboundedSender};
12use tracing_core::Subscriber;
13use tracing_subscriber::{layer::SubscriberExt, Registry};
14use uuid::Uuid;
15
16/// A `tracing` `Layer` that can be used to record trace events and stream them to modality in real
17/// time.
18///
19/// Can be transformed into a `Subscriber` with [`ModalityLayer::into_subscriber()`].
20pub struct ModalityLayer {
21    sender: UnboundedSender<WrappedMessage>,
22}
23
24impl ModalityLayer {
25    thread_local! {
26        static LOCAL_METADATA: Lazy<LocalMetadata> = Lazy::new(|| {
27            LocalMetadata {
28                thread_timeline: ingest::current_timeline(),
29            }
30        });
31        static THREAD_TIMELINE_INITIALIZED: Cell<bool> =const {  Cell::new(false) };
32    }
33
34    /// Initialize a new `ModalityLayer`, with default options.
35    pub async fn init() -> Result<(Self, ModalityIngestTaskHandle), InitError> {
36        Self::init_with_options(Default::default()).await
37    }
38
39    /// Initialize a new `ModalityLayer`, with specified options.
40    pub async fn init_with_options(
41        mut opts: Options,
42    ) -> Result<(Self, ModalityIngestTaskHandle), InitError> {
43        let run_id = Uuid::new_v4();
44        opts.add_metadata("run_id", run_id.to_string());
45
46        let ingest = ModalityIngest::async_connect(opts)
47            .await
48            .context("connect to modality")?;
49        let ingest_handle = ingest.spawn_task().await;
50        let sender = ingest_handle.ingest_sender.clone();
51
52        Ok((ModalityLayer { sender }, ingest_handle))
53    }
54
55    /// Convert this `Layer` into a `Subscriber`by by layering it on a new instace of `tracing`'s
56    /// `Registry`.
57    pub fn into_subscriber(self) -> impl Subscriber {
58        Registry::default().with(self)
59    }
60}
61
62impl LayerHandler for ModalityLayer {
63    fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>> {
64        self.sender.send(msg)
65    }
66
67    fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>> {
68        &Self::LOCAL_METADATA
69    }
70
71    fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>> {
72        &Self::THREAD_TIMELINE_INITIALIZED
73    }
74}