log_tracing_layer/
layer.rs

1use crate::log_ingestor::Log;
2use crate::log_ingestor::LogIngestor;
3use crate::visitor::JsonVisitor;
4use serde_json::json;
5use serde_json::Map;
6use serde_json::Value;
7use tokio::sync::mpsc::unbounded_channel;
8use tracing::span;
9use tracing::Subscriber;
10use tracing_subscriber::registry::LookupSpan;
11use tracing_subscriber::Layer;
12
13#[allow(clippy::module_name_repetitions)]
14#[derive(Debug)]
15pub struct LogLayer {
16    tx: Option<tokio::sync::mpsc::UnboundedSender<Log>>,
17    handle: Option<std::thread::JoinHandle<()>>,
18}
19
20impl LogLayer {
21    pub fn new<I>(mut ingestor: I) -> Self
22    where
23        I: LogIngestor + 'static,
24    {
25        let (tx, mut rx) = unbounded_channel::<Log>();
26        // create a separate thread to manage log ingestion
27        let handle = std::thread::Builder::new()
28            .name(ingestor.name().into())
29            .spawn(move || {
30                let rt = match tokio::runtime::Builder::new_current_thread()
31                    .enable_all()
32                    .build()
33                {
34                    Err(e) => {
35                        eprintln!("Runtime creation failure: {e:?}");
36                        return;
37                    }
38                    Ok(r) => r,
39                };
40
41                rt.block_on(async move {
42                    ingestor.start();
43                    while let Some(log) = rx.recv().await {
44                        ingestor.ingest(log).await;
45                    }
46                    ingestor.flush().await;
47                });
48                drop(rt);
49            })
50            .expect("Something went wrong spawning the thread");
51
52        Self {
53            tx: Some(tx),
54            handle: Some(handle),
55        }
56    }
57
58    fn create_log<S: Subscriber + for<'a> LookupSpan<'a>>(
59        event: &tracing::Event<'_>,
60        ctx: &tracing_subscriber::layer::Context<'_, S>,
61    ) -> Map<String, Value> {
62        let mut log: Map<String, Value> = Map::new();
63        let mut spans: Vec<Map<String, Value>> = vec![];
64
65        if let Some(scope) = ctx.event_scope(event) {
66            for span in scope.from_root() {
67                let mut new_span: Map<String, Value> = Map::new();
68                new_span.insert("name".to_string(), json!(span.name()));
69                if let Some(fields) = span.extensions_mut().get_mut::<Map<String, Value>>() {
70                    new_span.append(fields);
71                }
72                spans.push(new_span);
73            }
74        }
75
76        // if no last span, it means there are no spans at all
77        if let Some(last) = spans.last() {
78            log.insert("span".to_string(), json!(last));
79            log.insert("spans".to_string(), json!(spans));
80        }
81
82        log.insert(
83            "level".to_string(),
84            json!(event.metadata().level().as_str()),
85        );
86        log.insert("target".to_string(), json!(event.metadata().target()));
87
88        if let Some(file) = event.metadata().file() {
89            log.insert("file".to_string(), json!(file));
90        }
91        if let Some(line) = event.metadata().line() {
92            log.insert("line".to_string(), json!(line));
93        }
94
95        let mut visitor = JsonVisitor::default();
96        event.record(&mut visitor);
97
98        visitor.fields.iter().for_each(|(k, v)| {
99            log.insert(k.clone(), v.clone());
100        });
101
102        log.insert(
103            "timestamp".to_string(),
104            json!(chrono::Utc::now().to_rfc3339()),
105        );
106
107        log
108    }
109}
110
111impl Drop for LogLayer {
112    fn drop(&mut self) {
113        // closing the channel
114        if let Some(tx) = self.tx.take() {
115            drop(tx);
116        }
117        // waiting for the thread to finish
118        if let Some(handle) = self.handle.take() {
119            let _result = handle.join();
120        }
121    }
122}
123
124impl<S> Layer<S> for LogLayer
125where
126    S: Subscriber + for<'a> LookupSpan<'a>,
127{
128    fn on_new_span(
129        &self,
130        attrs: &span::Attributes<'_>,
131        id: &span::Id,
132        ctx: tracing_subscriber::layer::Context<'_, S>,
133    ) {
134        let span = ctx.span(id).expect("Span not found, this is a bug");
135        let mut extensions = span.extensions_mut();
136        // visit values and insert them into extensions as serde_json::Map<String, serde_json::Value>
137        // this way, we will be able to access them later
138        let mut visitor = JsonVisitor::default();
139        attrs.record(&mut visitor);
140        extensions.insert(visitor.fields);
141    }
142
143    fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
144        // send to the channel
145        if let Some(tx) = &self.tx {
146            let log = Self::create_log(event, &ctx);
147            if let Err(e) = tx.send(log) {
148                eprintln!("LAYER: Error sending log to ingestor: {e:?}");
149            }
150        }
151    }
152}