tracing_gelf/
lib.rs

1#![warn(missing_debug_implementations, missing_docs)]
2
3//! Provides a [`tracing`](https://docs.rs/tracing) [`Layer`] for Graylog structured logging.
4//!
5//! # Usage
6//!
7//! ```rust
8//! use std::net::SocketAddr;
9//! use tracing_gelf::Logger;
10//!
11//! #[tokio::main]
12//! async fn main() {
13//!    // Graylog address
14//!    let address = "127.0.0.1:12201";
15//!
16//!    // Initialize subscriber
17//!    let mut conn_handle = Logger::builder().init_tcp(address).unwrap();
18//!
19//!    // Spawn background task
20//!    // Any futures executor can be used
21//!    tokio::spawn(async move { conn_handle.connect().await });
22//!
23//!    // Send a log to Graylog
24//!    tracing::info!(message = "oooh, what's in here?");
25//!
26//!    // Create a span
27//!    let span = tracing::info_span!("cave");
28//!    span.in_scope(|| {
29//!        let test = tracing::info_span!("deeper in cave", smell = "damp");
30//!        test.in_scope(|| {
31//!            // Send a log to Graylog, inside a nested span
32//!            tracing::warn!(message = "oh god, it's dark in here");
33//!        })
34//!    });
35//!
36//!    // Send a log to Graylog
37//!    tracing::error!(message = "i'm glad to be out", spook_lvl = 3, ruck_sack = ?["glasses", "inhaler", "large bat"]);
38//! }
39//! ```
40//!
41//! # GELF Encoding
42//!
43//! [`Events`](tracing_core::Event) are encoded into [GELF format] as follows:
44//! * [Event] fields are inserted as [GELF] additional fields, `_field_name`.
45//! * [Event] field named `message` is renamed to `short_message`.
46//! * If `short_message` (or `message`) [Event] field is missing then `short_message` is
47//!      set to the empty string.
48//! * [Event] fields whose names collide with [GELF] required fields are coerced
49//!     into the required types and overrides defaults given in the builder.
50//! * The hierarchy of spans is concatenated and inserted as `span_a:span_b:span_c` and
51//!     inserted as an additional field `_span`.
52//!
53//! [GELF]: https://docs.graylog.org/en/3.1/pages/gelf.html
54//! [GELF format]: https://docs.graylog.org/en/3.1/pages/gelf.html
55
56mod connection;
57mod visitor;
58
59use std::{borrow::Cow, collections::HashMap, fmt::Display};
60
61use bytes::Bytes;
62use serde_json::{map::Map, Value};
63use tokio::net::ToSocketAddrs;
64use tokio::sync::mpsc;
65use tokio_stream::wrappers::ReceiverStream;
66use tracing_core::{
67    dispatcher::SetGlobalDefaultError,
68    span::{Attributes, Id, Record},
69    Event, Subscriber,
70};
71use tracing_subscriber::{
72    layer::{Context, Layer},
73    registry::LookupSpan,
74    Registry,
75};
76
77pub use connection::*;
78
79const DEFAULT_BUFFER: usize = 512;
80const DEFAULT_VERSION: &str = "1.1";
81const DEFAULT_SHORT_MESSAGE: &str = "null";
82
83/// A [`Layer`] responsible for sending structured logs to Graylog.
84#[derive(Debug)]
85pub struct Logger {
86    base_object: HashMap<Cow<'static, str>, Value>,
87    line_numbers: bool,
88    file_names: bool,
89    module_paths: bool,
90    spans: bool,
91    sender: mpsc::Sender<Bytes>,
92}
93
94impl Logger {
95    /// Creates a default [`Logger`] configuration, which can then be customized.
96    pub fn builder() -> Builder {
97        Builder::default()
98    }
99}
100
101/// The error type for [`Logger`] building.
102#[derive(Debug, thiserror::Error)]
103#[non_exhaustive]
104pub enum BuilderError {
105    /// Could not resolve the hostname.
106    #[error("hostname resolution failed")]
107    HostnameResolution(#[source] std::io::Error),
108    /// Could not coerce the [`OsString`](std::ffi::OsString) into a string.
109    #[error("hostname could not be parsed as an OsString: {}", .0.to_string_lossy().as_ref())]
110    OsString(std::ffi::OsString),
111    /// Global dispatcher failed.
112    #[error("global dispatcher failed to initialize")]
113    Global(#[source] SetGlobalDefaultError),
114}
115
116/// A builder for [`Logger`].
117#[derive(Debug)]
118pub struct Builder {
119    additional_fields: HashMap<Cow<'static, str>, Value>,
120    version: Option<String>,
121    host: Option<String>,
122    file_names: bool,
123    line_numbers: bool,
124    module_paths: bool,
125    spans: bool,
126    buffer: Option<usize>,
127}
128
129impl Default for Builder {
130    fn default() -> Self {
131        Builder {
132            additional_fields: HashMap::with_capacity(32),
133            version: None,
134            host: None,
135            file_names: true,
136            line_numbers: true,
137            module_paths: true,
138            spans: true,
139            buffer: None,
140        }
141    }
142}
143
144impl Builder {
145    /// Adds a persistent additional field to the GELF messages.
146    pub fn additional_field<K, V>(mut self, key: K, value: V) -> Self
147    where
148        K: Display,
149        V: Into<Value>,
150    {
151        let coerced_value: Value = match value.into() {
152            Value::Number(n) => Value::Number(n),
153            Value::String(x) => Value::String(x),
154            x => Value::String(x.to_string()),
155        };
156        self.additional_fields
157            .insert(format!("_{}", key).into(), coerced_value);
158        self
159    }
160
161    /// Sets the 'short_message' field's default value. Defaults to "null".
162    ///
163    /// [`Logger`] uses the default value for `short_message` when an event does not specify
164    /// `message` or `short_message` in its fields.
165    pub fn default_short_message<V: ToString>(mut self, short_message: V) -> Self {
166        self.additional_fields
167            .insert("short_message".into(), short_message.to_string().into());
168        self
169    }
170
171    /// Sets the GELF version number. Defaults to "1.1".
172    pub fn version<V>(mut self, version: V) -> Self
173    where
174        V: ToString,
175    {
176        self.version = Some(version.to_string());
177        self
178    }
179
180    /// Sets the `host` field. Defaults to the system's host name.
181    pub fn host<V>(mut self, host: V) -> Self
182    where
183        V: ToString,
184    {
185        self.host = Some(host.to_string());
186        self
187    }
188
189    /// Sets whether line numbers should be logged. Defaults to true.
190    pub fn line_numbers(mut self, value: bool) -> Self {
191        self.line_numbers = value;
192        self
193    }
194
195    /// Sets whether file names should be logged. Defaults to true.
196    pub fn file_names(mut self, value: bool) -> Self {
197        self.file_names = value;
198        self
199    }
200
201    /// Sets whether module paths should be logged. Defaults to true.
202    pub fn module_paths(mut self, value: bool) -> Self {
203        self.module_paths = value;
204        self
205    }
206
207    /// Sets the buffer length. Defaults to 512.
208    pub fn buffer(mut self, length: usize) -> Self {
209        self.buffer = Some(length);
210        self
211    }
212
213    fn connect<A, Conn>(
214        self,
215        addr: A,
216        conn: Conn,
217    ) -> Result<(Logger, ConnectionHandle<A, Conn>), BuilderError>
218    where
219        A: ToSocketAddrs,
220        A: Send + Sync + 'static,
221    {
222        // Persistent fields
223        let mut base_object = self.additional_fields;
224
225        // Get hostname
226        let hostname = if let Some(host) = self.host {
227            host
228        } else {
229            hostname::get()
230                .map_err(BuilderError::HostnameResolution)?
231                .into_string()
232                .map_err(BuilderError::OsString)?
233        };
234        base_object.insert("host".into(), hostname.into());
235
236        // Add version
237        let version = self.version.unwrap_or_else(|| DEFAULT_VERSION.to_string());
238        base_object.insert("version".into(), version.into());
239
240        // Set default short_message if not specified
241        if !base_object.contains_key("short_message") {
242            base_object.insert("short_message".into(), DEFAULT_SHORT_MESSAGE.into());
243        }
244
245        // Set buffer
246        let buffer = self.buffer.unwrap_or(DEFAULT_BUFFER);
247
248        // Construct background task
249        let (sender, receiver) = mpsc::channel::<Bytes>(buffer);
250        let handle = ConnectionHandle {
251            addr,
252            receiver: ReceiverStream::new(receiver),
253            conn,
254        };
255        let logger = Logger {
256            base_object,
257            file_names: self.file_names,
258            line_numbers: self.line_numbers,
259            module_paths: self.module_paths,
260            spans: self.spans,
261            sender,
262        };
263
264        Ok((logger, handle))
265    }
266
267    /// Returns a [`Logger`] and its UDP [`ConnectionHandle`].
268    pub fn connect_udp<A>(
269        self,
270        addr: A,
271    ) -> Result<(Logger, ConnectionHandle<A, UdpConnection>), BuilderError>
272    where
273        A: ToSocketAddrs,
274        A: Send + Sync + 'static,
275    {
276        self.connect(addr, UdpConnection)
277    }
278
279    /// Returns a [`Logger`] and its TCP [`ConnectionHandle`].
280    pub fn connect_tcp<A>(
281        self,
282        addr: A,
283    ) -> Result<(Logger, ConnectionHandle<A, TcpConnection>), BuilderError>
284    where
285        A: ToSocketAddrs,
286        A: Send + Sync + 'static,
287    {
288        self.connect(addr, TcpConnection)
289    }
290
291    /// Returns a [`Logger`] and its TLS [`ConnectionHandle`].
292    #[cfg(feature = "rustls-tls")]
293    pub fn connect_tls<A>(
294        self,
295        addr: A,
296        server_name: rustls_pki_types::ServerName<'static>,
297        client_config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
298    ) -> Result<(Logger, ConnectionHandle<A, TlsConnection>), BuilderError>
299    where
300        A: ToSocketAddrs,
301        A: Send + Sync + 'static,
302    {
303        self.connect(
304            addr,
305            TlsConnection {
306                server_name,
307                client_config,
308            },
309        )
310    }
311
312    /// Initialize logging with a given [`Subscriber`] and returns its UDP [`ConnectionHandle`].
313    pub fn init_udp_with_subscriber<S, A>(
314        self,
315        addr: A,
316        subscriber: S,
317    ) -> Result<ConnectionHandle<A, UdpConnection>, BuilderError>
318    where
319        S: Subscriber + for<'a> LookupSpan<'a>,
320        S: Send + Sync + 'static,
321        A: ToSocketAddrs,
322        A: Send + Sync + 'static,
323    {
324        let (logger, bg_task) = self.connect_udp(addr)?;
325        let subscriber = Layer::with_subscriber(logger, subscriber);
326        tracing_core::dispatcher::set_global_default(tracing_core::dispatcher::Dispatch::new(
327            subscriber,
328        ))
329        .map_err(BuilderError::Global)?;
330
331        Ok(bg_task)
332    }
333
334    /// Initializes logging with a given [`Subscriber`] and returns its TCP [`ConnectionHandle`].
335    pub fn init_tcp_with_subscriber<A, S>(
336        self,
337        addr: A,
338        subscriber: S,
339    ) -> Result<ConnectionHandle<A, TcpConnection>, BuilderError>
340    where
341        A: ToSocketAddrs,
342        A: Send + Sync + 'static,
343
344        S: Subscriber + for<'a> LookupSpan<'a>,
345        S: Send + Sync + 'static,
346    {
347        let (logger, bg_task) = self.connect_tcp(addr)?;
348
349        // If a subscriber was set then use it as the inner subscriber.
350        let subscriber = Layer::with_subscriber(logger, subscriber);
351        tracing_core::dispatcher::set_global_default(tracing_core::dispatcher::Dispatch::new(
352            subscriber,
353        ))
354        .map_err(BuilderError::Global)?;
355
356        Ok(bg_task)
357    }
358
359    /// Initialize logging with a given [`Subscriber`] and returns its [`ConnectionHandle`].
360    #[cfg(feature = "rustls-tls")]
361    pub fn init_tls_with_subscriber<A, S>(
362        self,
363        addr: A,
364        server_name: rustls_pki_types::ServerName<'static>,
365        client_config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
366        subscriber: S,
367    ) -> Result<ConnectionHandle<A, TlsConnection>, BuilderError>
368    where
369        A: ToSocketAddrs + Send + Sync + 'static,
370        S: Subscriber + for<'a> LookupSpan<'a>,
371        S: Send + Sync + 'static,
372    {
373        let (logger, bg_task) = self.connect_tls(addr, server_name, client_config)?;
374
375        // If a subscriber was set then use it as the inner subscriber.
376        let subscriber = Layer::with_subscriber(logger, subscriber);
377        tracing_core::dispatcher::set_global_default(tracing_core::dispatcher::Dispatch::new(
378            subscriber,
379        ))
380        .map_err(BuilderError::Global)?;
381
382        Ok(bg_task)
383    }
384
385    /// Initializes TCP logging and returns its [`ConnectionHandle`].
386    pub fn init_tcp<A>(self, addr: A) -> Result<ConnectionHandle<A, TcpConnection>, BuilderError>
387    where
388        A: ToSocketAddrs,
389        A: Send + Sync + 'static,
390    {
391        self.init_tcp_with_subscriber(addr, Registry::default())
392    }
393
394    /// Initializes TLS logging and returns its [`ConnectionHandle`].
395    #[cfg(feature = "rustls-tls")]
396    pub fn init_tls<A>(
397        self,
398        addr: A,
399        server_name: rustls_pki_types::ServerName<'static>,
400        client_config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
401    ) -> Result<ConnectionHandle<A, TlsConnection>, BuilderError>
402    where
403        A: ToSocketAddrs,
404        A: Send + Sync + 'static,
405    {
406        self.init_tls_with_subscriber(addr, server_name, client_config, Registry::default())
407    }
408
409    /// Initialize UDP logging and returns its [`ConnectionHandle`].
410    pub fn init_udp<A>(self, addr: A) -> Result<ConnectionHandle<A, UdpConnection>, BuilderError>
411    where
412        A: ToSocketAddrs,
413        A: Send + Sync + 'static,
414    {
415        self.init_udp_with_subscriber(addr, Registry::default())
416    }
417}
418
419impl<S> Layer<S> for Logger
420where
421    S: Subscriber + for<'a> LookupSpan<'a>,
422{
423    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
424        let span = ctx.span(id).expect("span not found, this is a bug");
425
426        let mut extensions = span.extensions_mut();
427
428        if extensions.get_mut::<Map<String, Value>>().is_none() {
429            let mut object = HashMap::with_capacity(16);
430            let mut visitor = visitor::AdditionalFieldVisitor::new(&mut object);
431            attrs.record(&mut visitor);
432            extensions.insert(object);
433        }
434    }
435
436    fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
437        let span = ctx.span(id).expect("span not found, this is a bug");
438        let mut extensions = span.extensions_mut();
439        if let Some(object) = extensions.get_mut::<HashMap<Cow<'static, str>, Value>>() {
440            let mut add_field_visitor = visitor::AdditionalFieldVisitor::new(object);
441            values.record(&mut add_field_visitor);
442        } else {
443            let mut object = HashMap::with_capacity(16);
444            let mut add_field_visitor = visitor::AdditionalFieldVisitor::new(&mut object);
445            values.record(&mut add_field_visitor);
446            extensions.insert(object)
447        }
448    }
449
450    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
451        // GELF object
452        let mut object = self.base_object.clone();
453
454        // Get span name
455        if self.spans {
456            let span = ctx.current_span().id().and_then(|id| {
457                ctx.span_scope(id).map(|scope| {
458                    scope.from_root().fold(String::new(), |mut spans, span| {
459                        // Add span fields to the base object
460                        if let Some(span_object) =
461                            span.extensions().get::<HashMap<Cow<'static, str>, Value>>()
462                        {
463                            object.extend(span_object.clone());
464                        }
465                        if !spans.is_empty() {
466                            spans = format!("{}:{}", spans, span.name());
467                        } else {
468                            spans = span.name().to_string();
469                        }
470
471                        spans
472                    })
473                })
474            });
475
476            if let Some(span) = span {
477                object.insert("_span".into(), span.into());
478            }
479        }
480
481        // Extract metadata
482        // Insert level
483        let metadata = event.metadata();
484        let level_num = match *metadata.level() {
485            tracing_core::Level::ERROR => 3,
486            tracing_core::Level::WARN => 4,
487            tracing_core::Level::INFO => 5,
488            tracing_core::Level::DEBUG => 6,
489            tracing_core::Level::TRACE => 7,
490        };
491        object.insert("level".into(), level_num.into());
492
493        // Insert file
494        if self.file_names {
495            if let Some(file) = metadata.file() {
496                object.insert("_file".into(), file.into());
497            }
498        }
499
500        // Insert line
501        if self.line_numbers {
502            if let Some(line) = metadata.line() {
503                object.insert("_line".into(), line.into());
504            }
505        }
506
507        // Insert module path
508        if self.module_paths {
509            if let Some(module_path) = metadata.module_path() {
510                object.insert("_module_path".into(), module_path.into());
511            }
512        }
513
514        // Append additional fields
515        let mut add_field_visitor = visitor::AdditionalFieldVisitor::new(&mut object);
516        event.record(&mut add_field_visitor);
517
518        // Serialize
519        let object = object
520            .into_iter()
521            .map(|(key, value)| (key.to_string(), value))
522            .collect();
523        let final_object = Value::Object(object);
524        let mut raw = serde_json::to_vec(&final_object).unwrap(); // This is safe
525        raw.push(0);
526
527        // Send
528        if let Err(_err) = self.sender.clone().try_send(Bytes::from(raw)) {
529            // TODO: Add handler
530        };
531    }
532}