log_tracing_layer/
layer.rs

1use crate::log_ingestor::Log;
2use crate::log_ingestor::LogIngestor;
3use crate::visitor::JsonVisitor;
4use serde_json::json;
5use serde_json::Map;
6use serde_json::Value;
7use tokio::sync::mpsc::unbounded_channel;
8use tracing::span;
9use tracing::Subscriber;
10use tracing_subscriber::registry::LookupSpan;
11use tracing_subscriber::Layer;
12
/// A tracing layer that converts events into JSON logs and forwards them to a
/// [`LogIngestor`] running on a dedicated background thread.
///
/// Both fields are wrapped in `Option` so that the `Drop` implementation can take
/// ownership of them: the sender is dropped first to close the channel, then the
/// ingestion thread is joined.
// NOTE(review): the lint is presumably silenced because the type name repeats the
// name of its module (`layer`) — confirm and keep this comment in sync.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug)]
pub struct LogLayer {
    /// Sending half of the unbounded channel feeding the ingestion thread.
    /// `None` once the layer has started shutting down.
    tx: Option<tokio::sync::mpsc::UnboundedSender<Log>>,
    /// Join handle of the ingestion thread. `None` once the thread has been joined.
    handle: Option<std::thread::JoinHandle<()>>,
}
19
20impl LogLayer {
21    /// Creates a new `LogLayer` with the provided log ingestor.
22    ///
23    /// This method spawns a dedicated thread running a Tokio runtime to handle log ingestion asynchronously.
24    /// The ingestor will receive logs sent through an unbounded channel, process them, and flush on shutdown.
25    ///
26    /// # Parameters
27    ///
28    /// - `ingestor`: An instance implementing the [`LogIngestor`] trait, responsible for handling log events.
29    ///
30    /// # Returns
31    ///
32    /// A new `LogLayer` instance with the ingestion thread running.
33    ///
34    /// # Panics
35    ///
36    /// Panics if the ingestion thread cannot be spawned.
37    pub fn new<I>(mut ingestor: I) -> Self
38    where
39        I: LogIngestor + 'static,
40    {
41        let (tx, mut rx) = unbounded_channel::<Log>();
42        // create a separate thread to manage log ingestion
43        let handle = std::thread::Builder::new()
44            .name(ingestor.name().into())
45            .spawn(move || {
46                let rt = match tokio::runtime::Builder::new_current_thread()
47                    .enable_all()
48                    .build()
49                {
50                    Err(e) => {
51                        eprintln!("Runtime creation failure: {e:?}");
52                        return;
53                    }
54                    Ok(r) => r,
55                };
56
57                rt.block_on(async move {
58                    ingestor.start();
59                    while let Some(log) = rx.recv().await {
60                        ingestor.ingest(log).await;
61                    }
62                    ingestor.flush().await;
63                });
64                drop(rt);
65            })
66            .expect("Something went wrong spawning the thread");
67
68        Self {
69            tx: Some(tx),
70            handle: Some(handle),
71        }
72    }
73
74    fn create_log<S: Subscriber + for<'a> LookupSpan<'a>>(
75        event: &tracing::Event<'_>,
76        ctx: &tracing_subscriber::layer::Context<'_, S>,
77    ) -> Map<String, Value> {
78        let mut log: Map<String, Value> = Map::new();
79        let mut spans: Vec<Map<String, Value>> = vec![];
80
81        if let Some(scope) = ctx.event_scope(event) {
82            for span in scope.from_root() {
83                let mut new_span: Map<String, Value> = Map::new();
84                new_span.insert("name".to_string(), json!(span.name()));
85
86                if let Some(fields) = span.extensions().get::<Map<String, Value>>() {
87                    fields.iter().for_each(|(k, v)| {
88                        new_span.insert(k.clone(), v.clone());
89                    });
90                }
91
92                spans.push(new_span);
93            }
94        }
95
96        // if no last span, it means there are no spans at all
97        if let Some(last) = spans.last() {
98            log.insert("span".to_string(), json!(last));
99            log.insert("spans".to_string(), json!(spans));
100        }
101
102        log.insert(
103            "level".to_string(),
104            json!(event.metadata().level().as_str()),
105        );
106        log.insert("target".to_string(), json!(event.metadata().target()));
107
108        if let Some(file) = event.metadata().file() {
109            log.insert("file".to_string(), json!(file));
110        }
111        if let Some(line) = event.metadata().line() {
112            log.insert("line".to_string(), json!(line));
113        }
114
115        let mut visitor = JsonVisitor::default();
116        event.record(&mut visitor);
117
118        visitor.fields.iter().for_each(|(k, v)| {
119            log.insert(k.clone(), v.clone());
120        });
121
122        log.insert(
123            "timestamp".to_string(),
124            json!(chrono::Utc::now().to_rfc3339()),
125        );
126
127        log
128    }
129}
130
131impl Drop for LogLayer {
132    fn drop(&mut self) {
133        // closing the channel
134        if let Some(tx) = self.tx.take() {
135            drop(tx);
136        }
137        // waiting for the thread to finish
138        if let Some(handle) = self.handle.take() {
139            let _result = handle.join();
140        }
141    }
142}
143
144impl<S> Layer<S> for LogLayer
145where
146    S: Subscriber + for<'a> LookupSpan<'a>,
147{
148    fn on_new_span(
149        &self,
150        attrs: &span::Attributes<'_>,
151        id: &span::Id,
152        ctx: tracing_subscriber::layer::Context<'_, S>,
153    ) {
154        let span = ctx.span(id).expect("Span not found, this is a bug");
155        let mut extensions = span.extensions_mut();
156        // visit values and insert them into extensions as serde_json::Map<String, serde_json::Value>
157        // this way, we will be able to access them later
158        let mut visitor = JsonVisitor::default();
159        attrs.record(&mut visitor);
160        extensions.insert(visitor.fields);
161    }
162
163    fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
164        // send to the channel
165        if let Some(tx) = &self.tx {
166            let log = Self::create_log(event, &ctx);
167            if let Err(e) = tx.send(log) {
168                eprintln!("LAYER: Error sending log to ingestor: {e:?}");
169            }
170        }
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use std::sync::RwLock;
    use tracing::info;
    use tracing_subscriber::layer::SubscriberExt;

    /// Ingestor used in tests: it stores the `span` entry of every log it receives in a
    /// vector shared with the test body.
    pub struct TestLogIngestor {
        pub vec: Arc<RwLock<Vec<Value>>>,
    }

    impl TestLogIngestor {
        pub fn new(vec: Arc<RwLock<Vec<Value>>>) -> Self {
            Self { vec }
        }
    }

    #[async_trait]
    impl LogIngestor for TestLogIngestor {
        fn name(&self) -> &'static str {
            "TestLogIngestor"
        }
        fn start(&self) {}
        async fn ingest(&mut self, log: Log) {
            self.vec
                .write()
                .unwrap()
                .push(log.get("span").unwrap().clone());
        }
        async fn flush(&mut self) {}
    }

    // Credits to https://github.com/DirkRusche
    // See https://github.com/robertohuertasm/log-tracing-layer/issues/7#issuecomment-3161295281
    #[test]
    fn should_have_span_in_second_log() {
        // given
        let vec = Arc::new(RwLock::new(vec![]));
        let test_log_ingestor = TestLogIngestor::new(vec.clone());
        let log = LogLayer::new(test_log_ingestor);

        let subscriber = tracing_subscriber::registry()
            .with(tracing_subscriber::fmt::Layer::new())
            .with(log);

        // when
        // The subscriber is only installed for the duration of the closure, so this test
        // neither depends on nor alters the process-wide default subscriber.
        tracing::subscriber::with_default(subscriber, || {
            let span = span!(tracing::Level::INFO, "test", "foo" = "bar");
            span.in_scope(|| {
                info!("log1");
                info!("log2");
            });
        });
        // The subscriber (and with it the `LogLayer`) has been dropped at this point:
        // `LogLayer`'s `Drop` closes the channel and joins the ingestion thread, so every
        // log has been ingested and no sleep is needed.

        // then
        let read_lock = vec.read().unwrap();

        assert_eq!(read_lock.len(), 2, "unexpected logs: {:?}", *read_lock);

        assert_eq!(read_lock[0]["name"].as_str().unwrap(), "test");
        assert_eq!(read_lock[0]["foo"].as_str().unwrap(), "bar");

        assert_eq!(read_lock[1]["name"].as_str().unwrap(), "test");
        assert_eq!(read_lock[1]["foo"].as_str().unwrap(), "bar");
    }
}