1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use crate::common::options::Options;
use crate::InitError;
use crate::common::layer::{LayerHandler, LocalMetadata};
use crate::ingest;
use crate::ingest::{ModalityIngest, ModalityIngestTaskHandle, WrappedMessage};
use anyhow::Context as _;
use once_cell::sync::Lazy;
use std::{cell::Cell, thread::LocalKey, thread_local};
use tokio::sync::mpsc::{self, UnboundedSender};
use tracing_core::Subscriber;
use tracing_subscriber::{layer::SubscriberExt, Registry};
use uuid::Uuid;
pub struct ModalityLayer {
sender: UnboundedSender<WrappedMessage>,
}
impl ModalityLayer {
thread_local! {
static LOCAL_METADATA: Lazy<LocalMetadata> = Lazy::new(|| {
LocalMetadata {
thread_timeline: ingest::current_timeline(),
}
});
static THREAD_TIMELINE_INITIALIZED: Cell<bool> = Cell::new(false);
}
pub async fn init() -> Result<(Self, ModalityIngestTaskHandle), InitError> {
Self::init_with_options(Default::default()).await
}
pub async fn init_with_options(
mut opts: Options,
) -> Result<(Self, ModalityIngestTaskHandle), InitError> {
let run_id = Uuid::new_v4();
opts.add_metadata("run_id", run_id.to_string());
let ingest = ModalityIngest::async_connect(opts)
.await
.context("connect to modality")?;
let ingest_handle = ingest.spawn_task().await;
let sender = ingest_handle.ingest_sender.clone();
Ok((ModalityLayer { sender }, ingest_handle))
}
pub fn into_subscriber(self) -> impl Subscriber {
Registry::default().with(self)
}
}
impl LayerHandler for ModalityLayer {
fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>> {
self.sender.send(msg)
}
fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>> {
&Self::LOCAL_METADATA
}
fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>> {
&Self::THREAD_TIMELINE_INITIALIZED
}
}