tracing_fluentd/
lib.rs

1//! [tracing](https://github.com/tokio-rs/tracing) for [fluentd](https://www.fluentd.org/).
2//!
3//!## Features
4//!
5//!- `event_time` - Specifies to encode timestamp as EventTime instead of default unix timestamp
6//!
7//!## Example
8//!
9//!```rust
10//!use tracing_subscriber::layer::SubscriberExt;
11//!
12//!let layer = tracing_fluentd::Builder::new("rust").flatten().layer().expect("Create layer");
13//!let sub = tracing_subscriber::Registry::default().with(layer);
14//!let guard = tracing::subscriber::set_default(sub);
15//!```
16
17#![warn(missing_docs)]
18#![cfg_attr(feature = "cargo-clippy", allow(clippy::style))]
19
20use std::net::{TcpStream, SocketAddrV4, SocketAddr, Ipv4Addr};
21use std::io::Write;
22use core::num;
23
24mod tracing;
25pub mod fluent;
26mod worker;
27mod default_writers;
28
29pub use self::tracing::FieldFormatter;
30
31///Policy to insert span data as object.
32///
33///Specifically, any span's or event metadata's attributes are associated with its name inside
34///record.
35///For example having span `lolka` would add key `lolka` to the record, with span's attributes as
36///value.
37///
38///Special case is event metadata which is always inserted with key `metadata` and contains
39///information such location in code and event level.
40pub struct NestedFmt;
41///Policy to insert span data as flattent object.
42///
43///Specifically, any span's or event metadata's attributes are inserted at the root of event
44///record.
45///For example, having span `lolka` with attribute `arg: 1` would result in `arg: 1` to be inserted
46///alongside `message` and other attributes of the event.
47pub struct FlattenFmt;
48
49///Describers creation of sink for `tracing` record.
50pub trait MakeWriter: 'static + Send {
51    ///Writer type
52    type Writer: Write;
53
54    ///Creates instance of `Writer`.
55    ///
56    ///It should be noted that it is ok to cache `Writer`.
57    ///
58    ///In case of failure working with writer, subscriber shall retry at least once
59    fn make(&self) -> std::io::Result<Self::Writer>;
60}
61
62impl<W: Write, T: 'static + Send + Fn() -> std::io::Result<W>> MakeWriter for T {
63    type Writer = W;
64    #[inline(always)]
65    fn make(&self) -> std::io::Result<Self::Writer> {
66        (self)()
67    }
68}
69
70fn default() -> std::io::Result<TcpStream> {
71    use core::time::Duration;
72
73    let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 24224));
74    TcpStream::connect_timeout(&addr, Duration::from_secs(1))
75}
76
77///`tracing`'s Layer
78pub struct Layer<F, C> {
79    consumer: C,
80    fmt: F,
81}
82
83///Builder to enable forwarding `tracing` events towards the `fluentd` server.
84///
85///## Type params
86///
87///- `F` - Attributes formatter, determines how to compose `fluent::Record`.
88///- `A` - function that returns `Fluentd` wrter. Default is to create tcp socket towards `127.0.0.1:24224` with timeout of 1s.
89pub struct Builder<F=NestedFmt, A=fn() -> std::io::Result<TcpStream>> {
90    tag: &'static str,
91    writer: A,
92    fmt: F,
93    max_msg_record: usize,
94}
95
96impl Builder {
97    #[inline(always)]
98    ///Creates default configuration.
99    ///
100    ///## Params:
101    ///
102    ///`tag` - Event category to send for each record.
103    pub fn new(tag: &'static str) -> Self {
104        const DEFAULT_MAX_MSG_RECORD: usize = 10;
105        Self {
106            tag,
107            writer: default,
108            fmt: NestedFmt,
109            max_msg_record: DEFAULT_MAX_MSG_RECORD,
110        }
111    }
112
113    #[inline(always)]
114    ///Provides max message record to fetch up.
115    pub fn with_max_msg_record(self, max_msg_record: num::NonZeroUsize) -> Self {
116        Self {
117            tag: self.tag,
118            writer: self.writer,
119            fmt: self.fmt,
120            max_msg_record: max_msg_record.get()
121        }
122    }
123}
124
125impl<A: MakeWriter> Builder<NestedFmt, A> {
126    #[inline(always)]
127    ///Configures to flatten span/metadata attributes within record.
128    ///Instead of the default nesting behavior.
129    pub fn flatten(self) -> Builder<FlattenFmt, A> {
130        Builder {
131            tag: self.tag,
132            writer: self.writer,
133            fmt: FlattenFmt,
134            max_msg_record: self.max_msg_record,
135        }
136    }
137}
138
139impl<F: FieldFormatter, A: MakeWriter> Builder<F, A> {
140    #[inline(always)]
141    ///Provides formatter.
142    pub fn with_formatter<NF: FieldFormatter>(self, fmt: NF) -> Builder<NF, A> {
143        Builder {
144            tag: self.tag,
145            writer: self.writer,
146            fmt,
147            max_msg_record: self.max_msg_record,
148        }
149    }
150
151    #[inline(always)]
152    ///Provides callback to get writer where to write records.
153    ///
154    ///Normally fluentd server expects connection to be closed immediately upon sending records.
155    ///hence created writer is dropped immediately upon writing being finished.
156    pub fn with_writer<MW: MakeWriter>(self, writer: MW) -> Builder<F, MW> {
157        Builder {
158            tag: self.tag,
159            writer,
160            fmt: self.fmt,
161            max_msg_record: self.max_msg_record,
162        }
163    }
164
165    #[inline(always)]
166    ///Creates `tracing` layer.
167    ///
168    ///If you do not want to create multiple threads, consider using
169    ///`layer_guarded`/`layer_from_guard`.
170    ///
171    ///`Error` can happen during creation of worker thread.
172    pub fn layer(self) -> Result<Layer<F, worker::ThreadWorker>, std::io::Error> {
173        let consumer = worker::thread(self.tag, self.writer, self.max_msg_record)?;
174
175        Ok(Layer {
176            consumer,
177            fmt: self.fmt,
178        })
179    }
180
181    #[inline]
182    ///Creates `tracing` layer, returning guard that allows to stop `fluentd` worker on `Drop`.
183    ///
184    ///This may be necessary due to bad API that `tracing` provides to control lifetime of global
185    ///logger. As underlying implementations employs caching, it needs to perform flush once logger
186    ///is no longer necessary hence this API is provided.
187    ///
188    ///`Error` can happen during creation of worker thread.
189    pub fn layer_guarded(self) -> Result<(Layer<F, worker::WorkerChannel>, FlushingGuard), std::io::Error> {
190        let consumer = worker::thread(self.tag, self.writer, self.max_msg_record)?;
191        let guard = FlushingGuard(consumer);
192        let layer = Layer {
193            consumer: worker::WorkerChannel(guard.0.sender()),
194            fmt: self.fmt,
195        };
196
197        Ok((layer, guard))
198    }
199
200    #[inline(always)]
201    ///Creates `tracing` layer, using guard returned by  `layer_guarded`.
202    ///
203    ///Specifically, it will use the same worker thread as first instance of `layer_guarded`,
204    ///without affecting lifetime of `guard`.
205    ///Hence once `guard` is dropped, worker for all connected layers will stop sending logs.
206    ///
207    ///`Error` can happen during creation of worker thread.
208    pub fn layer_from_guard(self, guard: &FlushingGuard) -> Layer<F, worker::WorkerChannel> {
209        Layer {
210            consumer: worker::WorkerChannel(guard.0.sender()),
211            fmt: self.fmt,
212        }
213    }
214}
215
216#[repr(transparent)]
217///Guard that flushes and terminates `fluentd` worker.
218///
219///Droping this guard should be done only when `Layer` is no longer needed.
220///
221///As part of destructor, it awaits to finish flushing `fluentd` records.
222pub struct FlushingGuard(worker::ThreadWorker);
223
224impl Drop for FlushingGuard {
225    fn drop(&mut self) {
226        self.0.stop();
227    }
228}