log_tracing_layer/
layer.rs1use crate::log_ingestor::Log;
2use crate::log_ingestor::LogIngestor;
3use crate::visitor::JsonVisitor;
4use serde_json::json;
5use serde_json::Map;
6use serde_json::Value;
7use tokio::sync::mpsc::unbounded_channel;
8use tracing::span;
9use tracing::Subscriber;
10use tracing_subscriber::registry::LookupSpan;
11use tracing_subscriber::Layer;
12
13#[allow(clippy::module_name_repetitions)]
14#[derive(Debug)]
15pub struct LogLayer {
16 tx: Option<tokio::sync::mpsc::UnboundedSender<Log>>,
17 handle: Option<std::thread::JoinHandle<()>>,
18}
19
20impl LogLayer {
21 pub fn new<I>(mut ingestor: I) -> Self
38 where
39 I: LogIngestor + 'static,
40 {
41 let (tx, mut rx) = unbounded_channel::<Log>();
42 let handle = std::thread::Builder::new()
44 .name(ingestor.name().into())
45 .spawn(move || {
46 let rt = match tokio::runtime::Builder::new_current_thread()
47 .enable_all()
48 .build()
49 {
50 Err(e) => {
51 eprintln!("Runtime creation failure: {e:?}");
52 return;
53 }
54 Ok(r) => r,
55 };
56
57 rt.block_on(async move {
58 ingestor.start();
59 while let Some(log) = rx.recv().await {
60 ingestor.ingest(log).await;
61 }
62 ingestor.flush().await;
63 });
64 drop(rt);
65 })
66 .expect("Something went wrong spawning the thread");
67
68 Self {
69 tx: Some(tx),
70 handle: Some(handle),
71 }
72 }
73
74 fn create_log<S: Subscriber + for<'a> LookupSpan<'a>>(
75 event: &tracing::Event<'_>,
76 ctx: &tracing_subscriber::layer::Context<'_, S>,
77 ) -> Map<String, Value> {
78 let mut log: Map<String, Value> = Map::new();
79 let mut spans: Vec<Map<String, Value>> = vec![];
80
81 if let Some(scope) = ctx.event_scope(event) {
82 for span in scope.from_root() {
83 let mut new_span: Map<String, Value> = Map::new();
84 new_span.insert("name".to_string(), json!(span.name()));
85
86 if let Some(fields) = span.extensions().get::<Map<String, Value>>() {
87 fields.iter().for_each(|(k, v)| {
88 new_span.insert(k.clone(), v.clone());
89 });
90 }
91
92 spans.push(new_span);
93 }
94 }
95
96 if let Some(last) = spans.last() {
98 log.insert("span".to_string(), json!(last));
99 log.insert("spans".to_string(), json!(spans));
100 }
101
102 log.insert(
103 "level".to_string(),
104 json!(event.metadata().level().as_str()),
105 );
106 log.insert("target".to_string(), json!(event.metadata().target()));
107
108 if let Some(file) = event.metadata().file() {
109 log.insert("file".to_string(), json!(file));
110 }
111 if let Some(line) = event.metadata().line() {
112 log.insert("line".to_string(), json!(line));
113 }
114
115 let mut visitor = JsonVisitor::default();
116 event.record(&mut visitor);
117
118 visitor.fields.iter().for_each(|(k, v)| {
119 log.insert(k.clone(), v.clone());
120 });
121
122 log.insert(
123 "timestamp".to_string(),
124 json!(chrono::Utc::now().to_rfc3339()),
125 );
126
127 log
128 }
129}
130
131impl Drop for LogLayer {
132 fn drop(&mut self) {
133 if let Some(tx) = self.tx.take() {
135 drop(tx);
136 }
137 if let Some(handle) = self.handle.take() {
139 let _result = handle.join();
140 }
141 }
142}
143
144impl<S> Layer<S> for LogLayer
145where
146 S: Subscriber + for<'a> LookupSpan<'a>,
147{
148 fn on_new_span(
149 &self,
150 attrs: &span::Attributes<'_>,
151 id: &span::Id,
152 ctx: tracing_subscriber::layer::Context<'_, S>,
153 ) {
154 let span = ctx.span(id).expect("Span not found, this is a bug");
155 let mut extensions = span.extensions_mut();
156 let mut visitor = JsonVisitor::default();
159 attrs.record(&mut visitor);
160 extensions.insert(visitor.fields);
161 }
162
163 fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
164 if let Some(tx) = &self.tx {
166 let log = Self::create_log(event, &ctx);
167 if let Err(e) = tx.send(log) {
168 eprintln!("LAYER: Error sending log to ingestor: {e:?}");
169 }
170 }
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use std::sync::RwLock;
    use tracing::info;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;

    /// Ingestor that keeps the `span` entry of every log it receives in a
    /// vector shared with the test.
    pub struct TestLogIngestor {
        pub vec: Arc<RwLock<Vec<Value>>>,
    }

    impl TestLogIngestor {
        pub fn new(vec: Arc<RwLock<Vec<Value>>>) -> Self {
            Self { vec }
        }
    }

    #[async_trait]
    impl LogIngestor for TestLogIngestor {
        fn name(&self) -> &'static str {
            "TestLogIngestor"
        }

        fn start(&self) {}

        async fn ingest(&mut self, log: Log) {
            // Panics (failing the test) when a log arrives without a `span` entry.
            self.vec
                .write()
                .unwrap()
                .push(log.get("span").unwrap().clone());
        }

        async fn flush(&mut self) {}
    }

    /// Both events emitted inside the same span must carry that span's name
    /// and fields.
    #[test]
    fn should_have_span_in_second_log() {
        let captured = Arc::new(RwLock::new(vec![]));
        let layer = LogLayer::new(TestLogIngestor::new(Arc::clone(&captured)));

        tracing_subscriber::registry()
            .with(tracing_subscriber::fmt::Layer::new())
            .with(layer)
            .init();

        let span = span!(tracing::Level::INFO, "test", "foo" = "bar");
        span.in_scope(|| {
            info!("log1");
            info!("log2");
        });

        // Give the worker thread time to ingest both logs.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let recorded = captured.read().unwrap();

        println!("READ: {:?}", recorded);

        assert_eq!(recorded.len(), 2);

        for index in 0..2 {
            assert_eq!(recorded[index]["name"].as_str().unwrap(), "test");
            assert_eq!(recorded[index]["foo"].as_str().unwrap(), "bar");
        }
    }
}