tracing_modality/async/
layer.rs1use crate::common::options::Options;
2use crate::InitError;
3
4use crate::common::layer::{LayerHandler, LocalMetadata};
5use crate::ingest;
6use crate::ingest::{ModalityIngest, ModalityIngestTaskHandle, WrappedMessage};
7
8use anyhow::Context as _;
9use once_cell::sync::Lazy;
10use std::{cell::Cell, thread::LocalKey, thread_local};
11use tokio::sync::mpsc::{self, UnboundedSender};
12use tracing_core::Subscriber;
13use tracing_subscriber::{layer::SubscriberExt, Registry};
14use uuid::Uuid;
15
16pub struct ModalityLayer {
21 sender: UnboundedSender<WrappedMessage>,
22}
23
24impl ModalityLayer {
25 thread_local! {
26 static LOCAL_METADATA: Lazy<LocalMetadata> = Lazy::new(|| {
27 LocalMetadata {
28 thread_timeline: ingest::current_timeline(),
29 }
30 });
31 static THREAD_TIMELINE_INITIALIZED: Cell<bool> =const { Cell::new(false) };
32 }
33
34 pub async fn init() -> Result<(Self, ModalityIngestTaskHandle), InitError> {
36 Self::init_with_options(Default::default()).await
37 }
38
39 pub async fn init_with_options(
41 mut opts: Options,
42 ) -> Result<(Self, ModalityIngestTaskHandle), InitError> {
43 let run_id = Uuid::new_v4();
44 opts.add_metadata("run_id", run_id.to_string());
45
46 let ingest = ModalityIngest::async_connect(opts)
47 .await
48 .context("connect to modality")?;
49 let ingest_handle = ingest.spawn_task().await;
50 let sender = ingest_handle.ingest_sender.clone();
51
52 Ok((ModalityLayer { sender }, ingest_handle))
53 }
54
55 pub fn into_subscriber(self) -> impl Subscriber {
58 Registry::default().with(self)
59 }
60}
61
62impl LayerHandler for ModalityLayer {
63 fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>> {
64 self.sender.send(msg)
65 }
66
67 fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>> {
68 &Self::LOCAL_METADATA
69 }
70
71 fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>> {
72 &Self::THREAD_TIMELINE_INITIALIZED
73 }
74}