tracing_modality/blocking/
layer.rs1use crate::common::options::Options;
2use crate::InitError;
3
4use crate::common::layer::{LayerHandler, LocalMetadata};
5use crate::ingest;
6use crate::ingest::{ModalityIngest, ModalityIngestThreadHandle, WrappedMessage};
7
8use anyhow::Context as _;
9use once_cell::sync::Lazy;
10use std::{cell::Cell, thread::LocalKey, thread_local};
11use tokio::sync::mpsc::{self, UnboundedSender};
12use tracing_core::Subscriber;
13use tracing_subscriber::{layer::SubscriberExt, Registry};
14use uuid::Uuid;
15
16pub struct ModalityLayer {
21 sender: UnboundedSender<WrappedMessage>,
22}
23
24impl ModalityLayer {
25 thread_local! {
26 static LOCAL_METADATA: Lazy<LocalMetadata> = Lazy::new(|| {
27 LocalMetadata {
28 thread_timeline: ingest::current_timeline(),
29 }
30 });
31 static THREAD_TIMELINE_INITIALIZED: Cell<bool> = const { Cell::new(false) };
32 }
33
34 pub fn init() -> Result<(Self, ModalityIngestThreadHandle), InitError> {
36 Self::init_with_options(Default::default())
37 }
38
39 pub fn init_with_options(
41 mut opts: Options,
42 ) -> Result<(Self, ModalityIngestThreadHandle), InitError> {
43 let run_id = Uuid::new_v4();
44 opts.add_metadata("run_id", run_id.to_string());
45
46 let ingest = ModalityIngest::connect(opts).context("connect to modality")?;
47 let ingest_handle = ingest.spawn_thread();
48 let sender = ingest_handle.ingest_sender.clone();
49
50 Ok((ModalityLayer { sender }, ingest_handle))
51 }
52
53 pub fn into_subscriber(self) -> impl Subscriber {
56 Registry::default().with(self)
57 }
58}
59
60impl LayerHandler for ModalityLayer {
61 fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>> {
62 self.sender.send(msg)
63 }
64
65 fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>> {
66 &Self::LOCAL_METADATA
67 }
68
69 fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>> {
70 &Self::THREAD_TIMELINE_INITIALIZED
71 }
72}