tracing_fluentd/lib.rs
1//! [tracing](https://github.com/tokio-rs/tracing) for [fluentd](https://www.fluentd.org/).
2//!
3//!## Features
4//!
5//!- `event_time` - Specifies to encode timestamp as EventTime instead of default unix timestamp
6//!
7//!## Example
8//!
9//!```rust
10//!use tracing_subscriber::layer::SubscriberExt;
11//!
12//!let layer = tracing_fluentd::Builder::new("rust").flatten().layer().expect("Create layer");
13//!let sub = tracing_subscriber::Registry::default().with(layer);
14//!let guard = tracing::subscriber::set_default(sub);
15//!```
16
17#![warn(missing_docs)]
18#![cfg_attr(feature = "cargo-clippy", allow(clippy::style))]
19
20use std::net::{TcpStream, SocketAddrV4, SocketAddr, Ipv4Addr};
21use std::io::Write;
22use core::num;
23
24mod tracing;
25pub mod fluent;
26mod worker;
27mod default_writers;
28
29pub use self::tracing::FieldFormatter;
30
31///Policy to insert span data as object.
32///
33///Specifically, any span's or event metadata's attributes are associated with its name inside
34///record.
35///For example having span `lolka` would add key `lolka` to the record, with span's attributes as
36///value.
37///
38///Special case is event metadata which is always inserted with key `metadata` and contains
39///information such location in code and event level.
40pub struct NestedFmt;
41///Policy to insert span data as flattent object.
42///
43///Specifically, any span's or event metadata's attributes are inserted at the root of event
44///record.
45///For example, having span `lolka` with attribute `arg: 1` would result in `arg: 1` to be inserted
46///alongside `message` and other attributes of the event.
47pub struct FlattenFmt;
48
49///Describers creation of sink for `tracing` record.
50pub trait MakeWriter: 'static + Send {
51 ///Writer type
52 type Writer: Write;
53
54 ///Creates instance of `Writer`.
55 ///
56 ///It should be noted that it is ok to cache `Writer`.
57 ///
58 ///In case of failure working with writer, subscriber shall retry at least once
59 fn make(&self) -> std::io::Result<Self::Writer>;
60}
61
62impl<W: Write, T: 'static + Send + Fn() -> std::io::Result<W>> MakeWriter for T {
63 type Writer = W;
64 #[inline(always)]
65 fn make(&self) -> std::io::Result<Self::Writer> {
66 (self)()
67 }
68}
69
70fn default() -> std::io::Result<TcpStream> {
71 use core::time::Duration;
72
73 let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 24224));
74 TcpStream::connect_timeout(&addr, Duration::from_secs(1))
75}
76
77///`tracing`'s Layer
78pub struct Layer<F, C> {
79 consumer: C,
80 fmt: F,
81}
82
83///Builder to enable forwarding `tracing` events towards the `fluentd` server.
84///
85///## Type params
86///
87///- `F` - Attributes formatter, determines how to compose `fluent::Record`.
88///- `A` - function that returns `Fluentd` wrter. Default is to create tcp socket towards `127.0.0.1:24224` with timeout of 1s.
89pub struct Builder<F=NestedFmt, A=fn() -> std::io::Result<TcpStream>> {
90 tag: &'static str,
91 writer: A,
92 fmt: F,
93 max_msg_record: usize,
94}
95
96impl Builder {
97 #[inline(always)]
98 ///Creates default configuration.
99 ///
100 ///## Params:
101 ///
102 ///`tag` - Event category to send for each record.
103 pub fn new(tag: &'static str) -> Self {
104 const DEFAULT_MAX_MSG_RECORD: usize = 10;
105 Self {
106 tag,
107 writer: default,
108 fmt: NestedFmt,
109 max_msg_record: DEFAULT_MAX_MSG_RECORD,
110 }
111 }
112
113 #[inline(always)]
114 ///Provides max message record to fetch up.
115 pub fn with_max_msg_record(self, max_msg_record: num::NonZeroUsize) -> Self {
116 Self {
117 tag: self.tag,
118 writer: self.writer,
119 fmt: self.fmt,
120 max_msg_record: max_msg_record.get()
121 }
122 }
123}
124
125impl<A: MakeWriter> Builder<NestedFmt, A> {
126 #[inline(always)]
127 ///Configures to flatten span/metadata attributes within record.
128 ///Instead of the default nesting behavior.
129 pub fn flatten(self) -> Builder<FlattenFmt, A> {
130 Builder {
131 tag: self.tag,
132 writer: self.writer,
133 fmt: FlattenFmt,
134 max_msg_record: self.max_msg_record,
135 }
136 }
137}
138
139impl<F: FieldFormatter, A: MakeWriter> Builder<F, A> {
140 #[inline(always)]
141 ///Provides formatter.
142 pub fn with_formatter<NF: FieldFormatter>(self, fmt: NF) -> Builder<NF, A> {
143 Builder {
144 tag: self.tag,
145 writer: self.writer,
146 fmt,
147 max_msg_record: self.max_msg_record,
148 }
149 }
150
151 #[inline(always)]
152 ///Provides callback to get writer where to write records.
153 ///
154 ///Normally fluentd server expects connection to be closed immediately upon sending records.
155 ///hence created writer is dropped immediately upon writing being finished.
156 pub fn with_writer<MW: MakeWriter>(self, writer: MW) -> Builder<F, MW> {
157 Builder {
158 tag: self.tag,
159 writer,
160 fmt: self.fmt,
161 max_msg_record: self.max_msg_record,
162 }
163 }
164
165 #[inline(always)]
166 ///Creates `tracing` layer.
167 ///
168 ///If you do not want to create multiple threads, consider using
169 ///`layer_guarded`/`layer_from_guard`.
170 ///
171 ///`Error` can happen during creation of worker thread.
172 pub fn layer(self) -> Result<Layer<F, worker::ThreadWorker>, std::io::Error> {
173 let consumer = worker::thread(self.tag, self.writer, self.max_msg_record)?;
174
175 Ok(Layer {
176 consumer,
177 fmt: self.fmt,
178 })
179 }
180
181 #[inline]
182 ///Creates `tracing` layer, returning guard that allows to stop `fluentd` worker on `Drop`.
183 ///
184 ///This may be necessary due to bad API that `tracing` provides to control lifetime of global
185 ///logger. As underlying implementations employs caching, it needs to perform flush once logger
186 ///is no longer necessary hence this API is provided.
187 ///
188 ///`Error` can happen during creation of worker thread.
189 pub fn layer_guarded(self) -> Result<(Layer<F, worker::WorkerChannel>, FlushingGuard), std::io::Error> {
190 let consumer = worker::thread(self.tag, self.writer, self.max_msg_record)?;
191 let guard = FlushingGuard(consumer);
192 let layer = Layer {
193 consumer: worker::WorkerChannel(guard.0.sender()),
194 fmt: self.fmt,
195 };
196
197 Ok((layer, guard))
198 }
199
200 #[inline(always)]
201 ///Creates `tracing` layer, using guard returned by `layer_guarded`.
202 ///
203 ///Specifically, it will use the same worker thread as first instance of `layer_guarded`,
204 ///without affecting lifetime of `guard`.
205 ///Hence once `guard` is dropped, worker for all connected layers will stop sending logs.
206 ///
207 ///`Error` can happen during creation of worker thread.
208 pub fn layer_from_guard(self, guard: &FlushingGuard) -> Layer<F, worker::WorkerChannel> {
209 Layer {
210 consumer: worker::WorkerChannel(guard.0.sender()),
211 fmt: self.fmt,
212 }
213 }
214}
215
216#[repr(transparent)]
217///Guard that flushes and terminates `fluentd` worker.
218///
219///Droping this guard should be done only when `Layer` is no longer needed.
220///
221///As part of destructor, it awaits to finish flushing `fluentd` records.
222pub struct FlushingGuard(worker::ThreadWorker);
223
224impl Drop for FlushingGuard {
225 fn drop(&mut self) {
226 self.0.stop();
227 }
228}