1#![warn(missing_debug_implementations, missing_docs)]
2
3mod connection;
57mod visitor;
58
59use std::{borrow::Cow, collections::HashMap, fmt::Display};
60
61use bytes::Bytes;
62use serde_json::{map::Map, Value};
63use tokio::net::ToSocketAddrs;
64use tokio::sync::mpsc;
65use tokio_stream::wrappers::ReceiverStream;
66use tracing_core::{
67 dispatcher::SetGlobalDefaultError,
68 span::{Attributes, Id, Record},
69 Event, Subscriber,
70};
71use tracing_subscriber::{
72 layer::{Context, Layer},
73 registry::LookupSpan,
74 Registry,
75};
76
77pub use connection::*;
78
79const DEFAULT_BUFFER: usize = 512;
80const DEFAULT_VERSION: &str = "1.1";
81const DEFAULT_SHORT_MESSAGE: &str = "null";
82
83#[derive(Debug)]
85pub struct Logger {
86 base_object: HashMap<Cow<'static, str>, Value>,
87 line_numbers: bool,
88 file_names: bool,
89 module_paths: bool,
90 spans: bool,
91 sender: mpsc::Sender<Bytes>,
92}
93
94impl Logger {
95 pub fn builder() -> Builder {
97 Builder::default()
98 }
99}
100
101#[derive(Debug, thiserror::Error)]
103#[non_exhaustive]
104pub enum BuilderError {
105 #[error("hostname resolution failed")]
107 HostnameResolution(#[source] std::io::Error),
108 #[error("hostname could not be parsed as an OsString: {}", .0.to_string_lossy().as_ref())]
110 OsString(std::ffi::OsString),
111 #[error("global dispatcher failed to initialize")]
113 Global(#[source] SetGlobalDefaultError),
114}
115
116#[derive(Debug)]
118pub struct Builder {
119 additional_fields: HashMap<Cow<'static, str>, Value>,
120 version: Option<String>,
121 host: Option<String>,
122 file_names: bool,
123 line_numbers: bool,
124 module_paths: bool,
125 spans: bool,
126 buffer: Option<usize>,
127}
128
129impl Default for Builder {
130 fn default() -> Self {
131 Builder {
132 additional_fields: HashMap::with_capacity(32),
133 version: None,
134 host: None,
135 file_names: true,
136 line_numbers: true,
137 module_paths: true,
138 spans: true,
139 buffer: None,
140 }
141 }
142}
143
144impl Builder {
145 pub fn additional_field<K, V>(mut self, key: K, value: V) -> Self
147 where
148 K: Display,
149 V: Into<Value>,
150 {
151 let coerced_value: Value = match value.into() {
152 Value::Number(n) => Value::Number(n),
153 Value::String(x) => Value::String(x),
154 x => Value::String(x.to_string()),
155 };
156 self.additional_fields
157 .insert(format!("_{}", key).into(), coerced_value);
158 self
159 }
160
161 pub fn default_short_message<V: ToString>(mut self, short_message: V) -> Self {
166 self.additional_fields
167 .insert("short_message".into(), short_message.to_string().into());
168 self
169 }
170
171 pub fn version<V>(mut self, version: V) -> Self
173 where
174 V: ToString,
175 {
176 self.version = Some(version.to_string());
177 self
178 }
179
180 pub fn host<V>(mut self, host: V) -> Self
182 where
183 V: ToString,
184 {
185 self.host = Some(host.to_string());
186 self
187 }
188
189 pub fn line_numbers(mut self, value: bool) -> Self {
191 self.line_numbers = value;
192 self
193 }
194
195 pub fn file_names(mut self, value: bool) -> Self {
197 self.file_names = value;
198 self
199 }
200
201 pub fn module_paths(mut self, value: bool) -> Self {
203 self.module_paths = value;
204 self
205 }
206
207 pub fn buffer(mut self, length: usize) -> Self {
209 self.buffer = Some(length);
210 self
211 }
212
213 fn connect<A, Conn>(
214 self,
215 addr: A,
216 conn: Conn,
217 ) -> Result<(Logger, ConnectionHandle<A, Conn>), BuilderError>
218 where
219 A: ToSocketAddrs,
220 A: Send + Sync + 'static,
221 {
222 let mut base_object = self.additional_fields;
224
225 let hostname = if let Some(host) = self.host {
227 host
228 } else {
229 hostname::get()
230 .map_err(BuilderError::HostnameResolution)?
231 .into_string()
232 .map_err(BuilderError::OsString)?
233 };
234 base_object.insert("host".into(), hostname.into());
235
236 let version = self.version.unwrap_or_else(|| DEFAULT_VERSION.to_string());
238 base_object.insert("version".into(), version.into());
239
240 if !base_object.contains_key("short_message") {
242 base_object.insert("short_message".into(), DEFAULT_SHORT_MESSAGE.into());
243 }
244
245 let buffer = self.buffer.unwrap_or(DEFAULT_BUFFER);
247
248 let (sender, receiver) = mpsc::channel::<Bytes>(buffer);
250 let handle = ConnectionHandle {
251 addr,
252 receiver: ReceiverStream::new(receiver),
253 conn,
254 };
255 let logger = Logger {
256 base_object,
257 file_names: self.file_names,
258 line_numbers: self.line_numbers,
259 module_paths: self.module_paths,
260 spans: self.spans,
261 sender,
262 };
263
264 Ok((logger, handle))
265 }
266
267 pub fn connect_udp<A>(
269 self,
270 addr: A,
271 ) -> Result<(Logger, ConnectionHandle<A, UdpConnection>), BuilderError>
272 where
273 A: ToSocketAddrs,
274 A: Send + Sync + 'static,
275 {
276 self.connect(addr, UdpConnection)
277 }
278
279 pub fn connect_tcp<A>(
281 self,
282 addr: A,
283 ) -> Result<(Logger, ConnectionHandle<A, TcpConnection>), BuilderError>
284 where
285 A: ToSocketAddrs,
286 A: Send + Sync + 'static,
287 {
288 self.connect(addr, TcpConnection)
289 }
290
291 #[cfg(feature = "rustls-tls")]
293 pub fn connect_tls<A>(
294 self,
295 addr: A,
296 server_name: rustls_pki_types::ServerName<'static>,
297 client_config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
298 ) -> Result<(Logger, ConnectionHandle<A, TlsConnection>), BuilderError>
299 where
300 A: ToSocketAddrs,
301 A: Send + Sync + 'static,
302 {
303 self.connect(
304 addr,
305 TlsConnection {
306 server_name,
307 client_config,
308 },
309 )
310 }
311
312 pub fn init_udp_with_subscriber<S, A>(
314 self,
315 addr: A,
316 subscriber: S,
317 ) -> Result<ConnectionHandle<A, UdpConnection>, BuilderError>
318 where
319 S: Subscriber + for<'a> LookupSpan<'a>,
320 S: Send + Sync + 'static,
321 A: ToSocketAddrs,
322 A: Send + Sync + 'static,
323 {
324 let (logger, bg_task) = self.connect_udp(addr)?;
325 let subscriber = Layer::with_subscriber(logger, subscriber);
326 tracing_core::dispatcher::set_global_default(tracing_core::dispatcher::Dispatch::new(
327 subscriber,
328 ))
329 .map_err(BuilderError::Global)?;
330
331 Ok(bg_task)
332 }
333
334 pub fn init_tcp_with_subscriber<A, S>(
336 self,
337 addr: A,
338 subscriber: S,
339 ) -> Result<ConnectionHandle<A, TcpConnection>, BuilderError>
340 where
341 A: ToSocketAddrs,
342 A: Send + Sync + 'static,
343
344 S: Subscriber + for<'a> LookupSpan<'a>,
345 S: Send + Sync + 'static,
346 {
347 let (logger, bg_task) = self.connect_tcp(addr)?;
348
349 let subscriber = Layer::with_subscriber(logger, subscriber);
351 tracing_core::dispatcher::set_global_default(tracing_core::dispatcher::Dispatch::new(
352 subscriber,
353 ))
354 .map_err(BuilderError::Global)?;
355
356 Ok(bg_task)
357 }
358
359 #[cfg(feature = "rustls-tls")]
361 pub fn init_tls_with_subscriber<A, S>(
362 self,
363 addr: A,
364 server_name: rustls_pki_types::ServerName<'static>,
365 client_config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
366 subscriber: S,
367 ) -> Result<ConnectionHandle<A, TlsConnection>, BuilderError>
368 where
369 A: ToSocketAddrs + Send + Sync + 'static,
370 S: Subscriber + for<'a> LookupSpan<'a>,
371 S: Send + Sync + 'static,
372 {
373 let (logger, bg_task) = self.connect_tls(addr, server_name, client_config)?;
374
375 let subscriber = Layer::with_subscriber(logger, subscriber);
377 tracing_core::dispatcher::set_global_default(tracing_core::dispatcher::Dispatch::new(
378 subscriber,
379 ))
380 .map_err(BuilderError::Global)?;
381
382 Ok(bg_task)
383 }
384
385 pub fn init_tcp<A>(self, addr: A) -> Result<ConnectionHandle<A, TcpConnection>, BuilderError>
387 where
388 A: ToSocketAddrs,
389 A: Send + Sync + 'static,
390 {
391 self.init_tcp_with_subscriber(addr, Registry::default())
392 }
393
394 #[cfg(feature = "rustls-tls")]
396 pub fn init_tls<A>(
397 self,
398 addr: A,
399 server_name: rustls_pki_types::ServerName<'static>,
400 client_config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
401 ) -> Result<ConnectionHandle<A, TlsConnection>, BuilderError>
402 where
403 A: ToSocketAddrs,
404 A: Send + Sync + 'static,
405 {
406 self.init_tls_with_subscriber(addr, server_name, client_config, Registry::default())
407 }
408
409 pub fn init_udp<A>(self, addr: A) -> Result<ConnectionHandle<A, UdpConnection>, BuilderError>
411 where
412 A: ToSocketAddrs,
413 A: Send + Sync + 'static,
414 {
415 self.init_udp_with_subscriber(addr, Registry::default())
416 }
417}
418
419impl<S> Layer<S> for Logger
420where
421 S: Subscriber + for<'a> LookupSpan<'a>,
422{
423 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
424 let span = ctx.span(id).expect("span not found, this is a bug");
425
426 let mut extensions = span.extensions_mut();
427
428 if extensions.get_mut::<Map<String, Value>>().is_none() {
429 let mut object = HashMap::with_capacity(16);
430 let mut visitor = visitor::AdditionalFieldVisitor::new(&mut object);
431 attrs.record(&mut visitor);
432 extensions.insert(object);
433 }
434 }
435
436 fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
437 let span = ctx.span(id).expect("span not found, this is a bug");
438 let mut extensions = span.extensions_mut();
439 if let Some(object) = extensions.get_mut::<HashMap<Cow<'static, str>, Value>>() {
440 let mut add_field_visitor = visitor::AdditionalFieldVisitor::new(object);
441 values.record(&mut add_field_visitor);
442 } else {
443 let mut object = HashMap::with_capacity(16);
444 let mut add_field_visitor = visitor::AdditionalFieldVisitor::new(&mut object);
445 values.record(&mut add_field_visitor);
446 extensions.insert(object)
447 }
448 }
449
450 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
451 let mut object = self.base_object.clone();
453
454 if self.spans {
456 let span = ctx.current_span().id().and_then(|id| {
457 ctx.span_scope(id).map(|scope| {
458 scope.from_root().fold(String::new(), |mut spans, span| {
459 if let Some(span_object) =
461 span.extensions().get::<HashMap<Cow<'static, str>, Value>>()
462 {
463 object.extend(span_object.clone());
464 }
465 if !spans.is_empty() {
466 spans = format!("{}:{}", spans, span.name());
467 } else {
468 spans = span.name().to_string();
469 }
470
471 spans
472 })
473 })
474 });
475
476 if let Some(span) = span {
477 object.insert("_span".into(), span.into());
478 }
479 }
480
481 let metadata = event.metadata();
484 let level_num = match *metadata.level() {
485 tracing_core::Level::ERROR => 3,
486 tracing_core::Level::WARN => 4,
487 tracing_core::Level::INFO => 5,
488 tracing_core::Level::DEBUG => 6,
489 tracing_core::Level::TRACE => 7,
490 };
491 object.insert("level".into(), level_num.into());
492
493 if self.file_names {
495 if let Some(file) = metadata.file() {
496 object.insert("_file".into(), file.into());
497 }
498 }
499
500 if self.line_numbers {
502 if let Some(line) = metadata.line() {
503 object.insert("_line".into(), line.into());
504 }
505 }
506
507 if self.module_paths {
509 if let Some(module_path) = metadata.module_path() {
510 object.insert("_module_path".into(), module_path.into());
511 }
512 }
513
514 let mut add_field_visitor = visitor::AdditionalFieldVisitor::new(&mut object);
516 event.record(&mut add_field_visitor);
517
518 let object = object
520 .into_iter()
521 .map(|(key, value)| (key.to_string(), value))
522 .collect();
523 let final_object = Value::Object(object);
524 let mut raw = serde_json::to_vec(&final_object).unwrap(); raw.push(0);
526
527 if let Err(_err) = self.sender.clone().try_send(Bytes::from(raw)) {
529 };
531 }
532}