log_tracing_layer/
layer.rs1use crate::log_ingestor::Log;
2use crate::log_ingestor::LogIngestor;
3use crate::visitor::JsonVisitor;
4use serde_json::json;
5use serde_json::Map;
6use serde_json::Value;
7use tokio::sync::mpsc::unbounded_channel;
8use tracing::span;
9use tracing::Subscriber;
10use tracing_subscriber::registry::LookupSpan;
11use tracing_subscriber::Layer;
12
13#[allow(clippy::module_name_repetitions)]
14#[derive(Debug)]
15pub struct LogLayer {
16 tx: Option<tokio::sync::mpsc::UnboundedSender<Log>>,
17 handle: Option<std::thread::JoinHandle<()>>,
18}
19
20impl LogLayer {
21 pub fn new<I>(mut ingestor: I) -> Self
22 where
23 I: LogIngestor + 'static,
24 {
25 let (tx, mut rx) = unbounded_channel::<Log>();
26 let handle = std::thread::Builder::new()
28 .name(ingestor.name().into())
29 .spawn(move || {
30 let rt = match tokio::runtime::Builder::new_current_thread()
31 .enable_all()
32 .build()
33 {
34 Err(e) => {
35 eprintln!("Runtime creation failure: {e:?}");
36 return;
37 }
38 Ok(r) => r,
39 };
40
41 rt.block_on(async move {
42 ingestor.start();
43 while let Some(log) = rx.recv().await {
44 ingestor.ingest(log).await;
45 }
46 ingestor.flush().await;
47 });
48 drop(rt);
49 })
50 .expect("Something went wrong spawning the thread");
51
52 Self {
53 tx: Some(tx),
54 handle: Some(handle),
55 }
56 }
57
58 fn create_log<S: Subscriber + for<'a> LookupSpan<'a>>(
59 event: &tracing::Event<'_>,
60 ctx: &tracing_subscriber::layer::Context<'_, S>,
61 ) -> Map<String, Value> {
62 let mut log: Map<String, Value> = Map::new();
63 let mut spans: Vec<Map<String, Value>> = vec![];
64
65 if let Some(scope) = ctx.event_scope(event) {
66 for span in scope.from_root() {
67 let mut new_span: Map<String, Value> = Map::new();
68 new_span.insert("name".to_string(), json!(span.name()));
69 if let Some(fields) = span.extensions_mut().get_mut::<Map<String, Value>>() {
70 new_span.append(fields);
71 }
72 spans.push(new_span);
73 }
74 }
75
76 if let Some(last) = spans.last() {
78 log.insert("span".to_string(), json!(last));
79 log.insert("spans".to_string(), json!(spans));
80 }
81
82 log.insert(
83 "level".to_string(),
84 json!(event.metadata().level().as_str()),
85 );
86 log.insert("target".to_string(), json!(event.metadata().target()));
87
88 if let Some(file) = event.metadata().file() {
89 log.insert("file".to_string(), json!(file));
90 }
91 if let Some(line) = event.metadata().line() {
92 log.insert("line".to_string(), json!(line));
93 }
94
95 let mut visitor = JsonVisitor::default();
96 event.record(&mut visitor);
97
98 visitor.fields.iter().for_each(|(k, v)| {
99 log.insert(k.clone(), v.clone());
100 });
101
102 log.insert(
103 "timestamp".to_string(),
104 json!(chrono::Utc::now().to_rfc3339()),
105 );
106
107 log
108 }
109}
110
111impl Drop for LogLayer {
112 fn drop(&mut self) {
113 if let Some(tx) = self.tx.take() {
115 drop(tx);
116 }
117 if let Some(handle) = self.handle.take() {
119 let _result = handle.join();
120 }
121 }
122}
123
124impl<S> Layer<S> for LogLayer
125where
126 S: Subscriber + for<'a> LookupSpan<'a>,
127{
128 fn on_new_span(
129 &self,
130 attrs: &span::Attributes<'_>,
131 id: &span::Id,
132 ctx: tracing_subscriber::layer::Context<'_, S>,
133 ) {
134 let span = ctx.span(id).expect("Span not found, this is a bug");
135 let mut extensions = span.extensions_mut();
136 let mut visitor = JsonVisitor::default();
139 attrs.record(&mut visitor);
140 extensions.insert(visitor.fields);
141 }
142
143 fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
144 if let Some(tx) = &self.tx {
146 let log = Self::create_log(event, &ctx);
147 if let Err(e) = tx.send(log) {
148 eprintln!("LAYER: Error sending log to ingestor: {e:?}");
149 }
150 }
151 }
152}