tracing_better_stack/
layer.rs1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
4use tracing::{Event, Level, Subscriber};
5use tracing_subscriber::Layer;
6use tracing_subscriber::layer::Context;
7use tracing_subscriber::registry::LookupSpan;
8
9use crate::batch::BatchProcessor;
10use crate::config::BetterStackConfig;
11use crate::log_event::{LogEvent, LogValue};
12
13pub struct BetterStackLayer {
39 sender: Arc<Mutex<Option<UnboundedSender<LogEvent>>>>,
41 config: Arc<BetterStackConfig>,
43}
44
45impl BetterStackLayer {
46 pub fn new(config: BetterStackConfig) -> Self {
66 Self {
67 sender: Arc::new(Mutex::new(None)),
68 config: Arc::new(config),
69 }
70 }
71
72 pub fn builder(
101 ingesting_host: impl Into<String>,
102 source_token: impl Into<String>,
103 ) -> crate::config::BetterStackConfigBuilder {
104 BetterStackConfig::builder(ingesting_host, source_token)
105 }
106
107 fn ensure_initialized(&self) {
112 let mut sender = self.sender.lock().unwrap();
113 if sender.is_none()
114 && let Ok(handle) = tokio::runtime::Handle::try_current()
115 {
116 let (tx, rx) = unbounded_channel();
117 let config = Arc::clone(&self.config);
118
119 handle.spawn(async move {
120 let processor = BatchProcessor::new(rx, (*config).clone());
121 processor.run().await;
122 });
123
124 *sender = Some(tx);
125 }
126 }
127
128 fn send_event(&self, event: LogEvent) {
134 self.ensure_initialized();
135
136 if let Some(tx) = self.sender.lock().unwrap().as_ref()
137 && tx.send(event).is_err()
138 {
139 eprintln!("Failed to send event to Better Stack: channel closed");
140 }
141 }
142}
143
144impl<S> Layer<S> for BetterStackLayer
145where
146 S: Subscriber + for<'a> LookupSpan<'a>,
147{
148 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
149 let mut fields = HashMap::new();
150 let mut visitor = FieldVisitor::new(&mut fields);
151 event.record(&mut visitor);
152
153 let level = match *event.metadata().level() {
154 Level::ERROR => "error",
155 Level::WARN => "warn",
156 Level::INFO => "info",
157 Level::DEBUG => "debug",
158 Level::TRACE => "trace",
159 };
160
161 let message = fields
163 .remove("message")
164 .and_then(|v| match v {
165 LogValue::String(s) => Some(s),
166 _ => None,
167 })
168 .unwrap_or_default();
169
170 let mut log_event = LogEvent::new(level, message).with_target(event.metadata().target());
172
173 if self.config.include_location {
175 let file = event.metadata().file().map(|s| s.to_string());
176 let line = event.metadata().line();
177 log_event = log_event.with_location(file, line);
178 }
179
180 if self.config.include_spans
182 && let Some(scope) = ctx.event_scope(event)
183 {
184 let mut span_fields = HashMap::new();
185 let mut span_name = String::new();
186
187 for span in scope.from_root() {
188 if span_name.is_empty() {
189 span_name = span.name().to_string();
190 }
191
192 let extensions = span.extensions();
193 if let Some(visitor) = extensions.get::<SpanFieldStorage>() {
195 span_fields.extend(visitor.fields.clone());
196 }
197 }
198
199 if !span_name.is_empty() && !span_fields.is_empty() {
200 log_event = log_event.with_span(span_name, span_fields);
201 }
202 }
203
204 for (key, value) in fields {
206 log_event.add_field(key, value);
207 }
208
209 self.send_event(log_event);
210 }
211
212 fn on_new_span(
213 &self,
214 attrs: &tracing::span::Attributes<'_>,
215 id: &tracing::span::Id,
216 ctx: Context<'_, S>,
217 ) {
218 if !self.config.include_spans {
219 return;
220 }
221
222 let span = ctx.span(id).unwrap();
223 let mut fields = HashMap::new();
224 let mut visitor = FieldVisitor::new(&mut fields);
225 attrs.record(&mut visitor);
226
227 let mut extensions = span.extensions_mut();
228 extensions.insert(SpanFieldStorage { fields });
229 }
230}
231
232struct SpanFieldStorage {
233 fields: HashMap<String, LogValue>,
234}
235
236struct FieldVisitor<'a> {
237 fields: &'a mut HashMap<String, LogValue>,
238}
239
240impl<'a> FieldVisitor<'a> {
241 fn new(fields: &'a mut HashMap<String, LogValue>) -> Self {
242 Self { fields }
243 }
244}
245
246impl<'a> tracing::field::Visit for FieldVisitor<'a> {
247 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
248 self.fields.insert(
249 field.name().to_string(),
250 LogValue::String(value.to_string()),
251 );
252 }
253
254 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
255 self.fields
256 .insert(field.name().to_string(), LogValue::Number(value));
257 }
258
259 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
260 self.fields
261 .insert(field.name().to_string(), LogValue::Number(value as i64));
262 }
263
264 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
265 self.fields
266 .insert(field.name().to_string(), LogValue::Bool(value));
267 }
268
269 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
270 self.fields
271 .insert(field.name().to_string(), LogValue::Float(value));
272 }
273
274 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
275 self.fields.insert(
276 field.name().to_string(),
277 LogValue::String(format!("{:?}", value)),
278 );
279 }
280}