tracing_loki/
lib.rs

1//! A [`tracing`] layer for shipping logs to [Grafana
2//! Loki](https://grafana.com/oss/loki/).
3//!
4//! Usage
5//! =====
6//!
7//! ```rust
8//! use tracing_subscriber::layer::SubscriberExt;
9//! use tracing_subscriber::util::SubscriberInitExt;
10//! use std::process;
11//! use url::Url;
12//!
13//! #[tokio::main]
14//! async fn main() -> Result<(), tracing_loki::Error> {
15//!     let (layer, task) = tracing_loki::builder()
16//!         .label("host", "mine")?
17//!         .extra_field("pid", format!("{}", process::id()))?
18//!         .build_url(Url::parse("http://127.0.0.1:3100").unwrap())?;
19//!
20//!     // We need to register our layer with `tracing`.
21//!     tracing_subscriber::registry()
22//!         .with(layer)
23//!         // One could add more layers here, for example logging to stdout:
24//!         // .with(tracing_subscriber::fmt::Layer::new())
25//!         .init();
26//!
27//!     // The background task needs to be spawned so the logs actually get
28//!     // delivered.
29//!     tokio::spawn(task);
30//!
31//!     tracing::info!(
32//!         task = "tracing_setup",
33//!         result = "success",
34//!         "tracing successfully set up",
35//!     );
36//!
37//!     Ok(())
38//! }
39//! ```
40
// Triggered by calls such as `.unwrap_or(Vec::new())` in `Layer::on_event`.
#![allow(clippy::or_fun_call)]
// NOTE(review): presumably needed for the boxed future type stored in
// `BackgroundTask::send_task` -- confirm before removing.
#![allow(clippy::type_complexity)]
// Every public item of this crate must be documented.
#![deny(missing_docs)]

// Fail the build with an explanatory message unless the `compat-0-2-1`
// feature is enabled.
#[cfg(not(feature = "compat-0-2-1"))]
compile_error!(
    "The feature `compat-0-2-1` must be enabled to ensure \
    forward compatibility with future versions of this crate"
);
50
51/// The re-exported `url` dependency of this crate.
52///
53/// Use this to avoid depending on a potentially-incompatible `url` version yourself.
54pub extern crate url;
55
56use loki_api::logproto as loki;
57use loki_api::prost;
58use serde::Serialize;
59use std::cmp;
60use std::collections::HashMap;
61use std::error;
62use std::fmt;
63use std::future::Future;
64use std::mem;
65use std::pin::Pin;
66use std::task::Context;
67use std::task::Poll;
68use std::time::Duration;
69use std::time::SystemTime;
70use tokio::sync::mpsc;
71use tokio_stream::wrappers::ReceiverStream;
72use tokio_stream::Stream;
73use tracing::instrument::WithSubscriber;
74use tracing_core::field::Field;
75use tracing_core::field::Visit;
76use tracing_core::span::Attributes;
77use tracing_core::span::Id;
78use tracing_core::span::Record;
79use tracing_core::Event;
80use tracing_core::Level;
81use tracing_core::Subscriber;
82use tracing_log::NormalizeEvent;
83use tracing_subscriber::layer::Context as TracingContext;
84use tracing_subscriber::registry::LookupSpan;
85use url::Url;
86
87use labels::FormattedLabels;
88use level_map::LevelMap;
89use log_support::SerializeEventFieldMapStrippingLog;
90use no_subscriber::NoSubscriber;
91use ErrorInner as ErrorI;
92
93pub use builder::builder;
94pub use builder::Builder;
95
96mod builder;
97mod labels;
98mod level_map;
99mod log_support;
100mod no_subscriber;
101
// Only compiled for doctest builds: the README is attached as this item's
// documentation so that the code examples in it are checked like any other
// doctest.
#[cfg(doctest)]
#[doc = include_str!("../README.md")]
struct ReadmeDoctests;
105
106fn event_channel() -> (
107    mpsc::Sender<Option<LokiEvent>>,
108    mpsc::Receiver<Option<LokiEvent>>,
109) {
110    mpsc::channel(512)
111}
112
113/// The error type for constructing a [`Layer`].
114///
115/// Nothing except for the [`std::error::Error`] (and [`std::fmt::Debug`] and
116/// [`std::fmt::Display`]) implementation of this type is exposed.
117pub struct Error(ErrorInner);
118
119impl fmt::Debug for Error {
120    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121        self.0.fmt(f)
122    }
123}
124impl fmt::Display for Error {
125    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126        self.0.fmt(f)
127    }
128}
129impl error::Error for Error {}
130
/// The concrete failure reasons hidden behind the public `Error` wrapper.
#[derive(Debug)]
enum ErrorInner {
    DuplicateExtraField(String),
    DuplicateHttpHeader(String),
    DuplicateLabel(String),
    InvalidHttpHeaderName(String),
    InvalidHttpHeaderValue(String),
    InvalidLabelCharacter(String, char),
    InvalidLokiUrl,
    ReservedLabelLevel,
}

impl fmt::Display for ErrorInner {
    /// Write a human-readable, single-line description of the error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::DuplicateExtraField(key) => {
                write!(f, "duplicate extra field key {:?}", key)
            }
            Self::DuplicateHttpHeader(name) => {
                write!(f, "duplicate HTTP header {:?}", name)
            }
            Self::DuplicateLabel(key) => {
                write!(f, "duplicate label key {:?}", key)
            }
            Self::InvalidHttpHeaderName(name) => {
                write!(f, "invalid HTTP header name {:?}", name)
            }
            Self::InvalidHttpHeaderValue(name) => {
                write!(f, "invalid HTTP header value for {:?}", name)
            }
            Self::InvalidLabelCharacter(key, c) => {
                write!(f, "invalid label character {:?} in key {:?}", c, key)
            }
            // The remaining messages have no arguments to interpolate.
            Self::InvalidLokiUrl => f.write_str("invalid Loki URL"),
            Self::ReservedLabelLevel => f.write_str("cannot add custom label for \"level\""),
        }
    }
}
160
161/// Construct a [`Layer`] and its corresponding [`BackgroundTask`].
162///
163/// The [`Layer`] needs to be registered with a
164/// [`tracing_subscriber::Registry`], and the [`BackgroundTask`] needs to be
165/// [`tokio::spawn`]ed.
166///
167/// **Note** that unlike the [`Builder::build_url`] function, this function
168/// **strips off** the path component of `loki_url` before appending
169/// `/loki/api/v1/push`.
170///
171/// See [`builder()`] and this crate's root documentation for a more flexible
172/// method.
173///
174/// # Example
175///
176/// ```rust
177/// use tracing_subscriber::layer::SubscriberExt;
178/// use tracing_subscriber::util::SubscriberInitExt;
179/// use url::Url;
180///
181/// #[tokio::main]
182/// async fn main() -> Result<(), tracing_loki::Error> {
183///     let (layer, task) = tracing_loki::layer(
184///         Url::parse("http://127.0.0.1:3100").unwrap(),
185///         vec![("host".into(), "mine".into())].into_iter().collect(),
186///         vec![].into_iter().collect(),
187///     )?;
188///
189///     // We need to register our layer with `tracing`.
190///     tracing_subscriber::registry()
191///         .with(layer)
192///         // One could add more layers here, for example logging to stdout:
193///         // .with(tracing_subscriber::fmt::Layer::new())
194///         .init();
195///
196///     // The background task needs to be spawned so the logs actually get
197///     // delivered.
198///     tokio::spawn(task);
199///
200///     tracing::info!(
201///         task = "tracing_setup",
202///         result = "success",
203///         "tracing successfully set up",
204///     );
205///
206///     Ok(())
207/// }
208/// ```
209pub fn layer(
210    loki_url: Url,
211    labels: HashMap<String, String>,
212    extra_fields: HashMap<String, String>,
213) -> Result<(Layer, BackgroundTask), Error> {
214    let mut builder = builder();
215    for (key, value) in labels {
216        builder = builder.label(key, value)?;
217    }
218    for (key, value) in extra_fields {
219        builder = builder.extra_field(key, value)?;
220    }
221    builder.build_url(
222        loki_url
223            .join("/")
224            .map_err(|_| Error(ErrorI::InvalidLokiUrl))?,
225    )
226}
227
/// The [`tracing_subscriber::Layer`] implementation for the Loki backend.
///
/// See the crate's root documentation for an example.
pub struct Layer {
    // Static key/value pairs that are serialized into every event's JSON
    // message (see `SerializedEvent::extra_fields`).
    extra_fields: HashMap<String, String>,
    // Sending half of the event channel; `on_event` pushes into it with
    // `try_send`.
    sender: mpsc::Sender<Option<LokiEvent>>,
}
235
// A single log entry on its way from the `Layer` to the `BackgroundTask`.
struct LokiEvent {
    // Whether this event may cause a send; `false` for events whose target
    // starts with "tracing_loki" (see `Layer::on_event`).
    trigger_send: bool,
    // Wall-clock time taken when the event was observed by the layer.
    timestamp: SystemTime,
    // Level of the event; selects the per-level `SendQueue`.
    level: Level,
    // The event serialized as a JSON string; becomes the Loki log line.
    message: String,
}
242
// The JSON shape of one log line. The three flattened members are merged
// into the top-level object next to the underscore-prefixed metadata keys.
#[derive(Serialize)]
struct SerializedEvent<'a> {
    // The event's own fields.
    // NOTE(review): the wrapper's name suggests `log`-crate bookkeeping
    // fields are stripped -- confirm in `log_support`.
    #[serde(flatten)]
    event: SerializeEventFieldMapStrippingLog<'a>,
    // The layer's static extra fields.
    #[serde(flatten)]
    extra_fields: &'a HashMap<String, String>,
    // Fields recorded on the enclosing spans, collected from the root span
    // towards the innermost one.
    #[serde(flatten)]
    span_fields: serde_json::Map<String, serde_json::Value>,
    // Names of the enclosing spans, root first.
    _spans: &'a [&'a str],
    // Target, module path, file and line taken from the event's metadata.
    _target: &'a str,
    _module_path: Option<&'a str>,
    _file: Option<&'a str>,
    _line: Option<u32>,
}
257
258#[derive(Default)]
259struct Fields {
260    fields: serde_json::Map<String, serde_json::Value>,
261}
262
263impl Fields {
264    fn record_impl(&mut self, field: &Field, value: serde_json::Value) {
265        self.fields.insert(field.name().into(), value);
266    }
267    fn record<T: Into<serde_json::Value>>(&mut self, field: &Field, value: T) {
268        self.record_impl(field, value.into());
269    }
270}
271
272impl Visit for Fields {
273    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
274        self.record(field, format!("{:?}", value));
275    }
276    fn record_f64(&mut self, field: &Field, value: f64) {
277        self.record(field, value);
278    }
279    fn record_i64(&mut self, field: &Field, value: i64) {
280        self.record(field, value);
281    }
282    fn record_u64(&mut self, field: &Field, value: u64) {
283        self.record(field, value);
284    }
285    fn record_bool(&mut self, field: &Field, value: bool) {
286        self.record(field, value);
287    }
288    fn record_str(&mut self, field: &Field, value: &str) {
289        self.record(field, value);
290    }
291    fn record_error(&mut self, field: &Field, value: &(dyn error::Error + 'static)) {
292        self.record(field, format!("{}", value));
293    }
294}
295
296impl<S: Subscriber + for<'a> LookupSpan<'a>> tracing_subscriber::Layer<S> for Layer {
297    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: TracingContext<'_, S>) {
298        let span = ctx.span(id).expect("Span not found, this is a bug");
299        let mut extensions = span.extensions_mut();
300        if extensions.get_mut::<Fields>().is_none() {
301            let mut fields = Fields::default();
302            attrs.record(&mut fields);
303            extensions.insert(fields);
304        }
305    }
306    fn on_record(&self, id: &Id, values: &Record<'_>, ctx: TracingContext<'_, S>) {
307        let span = ctx.span(id).expect("Span not found, this is a bug");
308        let mut extensions = span.extensions_mut();
309        let fields = extensions.get_mut::<Fields>().expect("unregistered span");
310        values.record(fields);
311    }
312    fn on_event(&self, event: &Event<'_>, ctx: TracingContext<'_, S>) {
313        let timestamp = SystemTime::now();
314        let normalized_meta = event.normalized_metadata();
315        let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
316        let mut span_fields: serde_json::Map<String, serde_json::Value> = Default::default();
317        let spans = event
318            .parent()
319            .cloned()
320            .or_else(|| ctx.current_span().id().cloned())
321            .and_then(|id| {
322                ctx.span_scope(&id).map(|scope| {
323                    scope.from_root().fold(Vec::new(), |mut spans, span| {
324                        span_fields.extend(
325                            span.extensions()
326                                .get::<Fields>()
327                                .expect("unregistered span")
328                                .fields
329                                .iter()
330                                .map(|(f, v)| (f.clone(), v.clone())),
331                        );
332                        spans.push(span.name());
333                        spans
334                    })
335                })
336            })
337            .unwrap_or(Vec::new());
338        // TODO: Anything useful to do when the capacity has been reached?
339        let _ = self.sender.try_send(Some(LokiEvent {
340            trigger_send: !meta.target().starts_with("tracing_loki"),
341            timestamp,
342            level: *meta.level(),
343            message: serde_json::to_string(&SerializedEvent {
344                event: SerializeEventFieldMapStrippingLog(event),
345                extra_fields: &self.extra_fields,
346                span_fields,
347                _spans: &spans,
348                _target: meta.target(),
349                _module_path: meta.module_path(),
350                _file: meta.file(),
351                _line: meta.line(),
352            })
353            .expect("json serialization shouldn't fail"),
354        }));
355    }
356}
357
358struct SendQueue {
359    encoded_labels: String,
360    sending: Vec<LokiEvent>,
361    to_send: Vec<LokiEvent>,
362}
363
364impl SendQueue {
365    fn new(encoded_labels: String) -> SendQueue {
366        SendQueue {
367            encoded_labels,
368            sending: Vec::new(),
369            to_send: Vec::new(),
370        }
371    }
372    fn push(&mut self, event: LokiEvent) {
373        // TODO: Add limit.
374        self.to_send.push(event);
375    }
376    fn drop_outstanding(&mut self) -> usize {
377        let len = self.sending.len();
378        self.sending.clear();
379        len
380    }
381    fn on_send_result(&mut self, result: Result<(), ()>) {
382        match result {
383            Ok(()) => self.sending.clear(),
384            Err(()) => {
385                self.sending.append(&mut self.to_send);
386                mem::swap(&mut self.sending, &mut self.to_send);
387            }
388        }
389    }
390    fn should_send(&self) -> bool {
391        self.to_send.iter().any(|e| e.trigger_send)
392    }
393    fn prepare_sending(&mut self) -> loki::StreamAdapter {
394        if !self.sending.is_empty() {
395            panic!("can only prepare sending while no request is in flight");
396        }
397        mem::swap(&mut self.sending, &mut self.to_send);
398        loki::StreamAdapter {
399            labels: self.encoded_labels.clone(),
400            entries: self
401                .sending
402                .iter()
403                .map(|e| loki::EntryAdapter {
404                    timestamp: Some(e.timestamp.into()),
405                    line: e.message.clone(),
406                })
407                .collect(),
408            // Couldn't find documentation except for the promtail source code:
409            // https://github.com/grafana/loki/blob/8c06c546ab15a568f255461f10318dae37e022d3/clients/pkg/promtail/client/batch.go#L55-L58
410            //
411            // In the Go code, the hash value isn't initialized explicitly,
412            // hence it is set to 0.
413            hash: 0,
414        }
415    }
416}
417
418#[derive(Debug)]
419struct BadRedirect {
420    status: u16,
421    to: Url,
422}
423
424impl fmt::Display for BadRedirect {
425    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
426        // Following such a redirect drops the request body, and will likely
427        // give an HTTP 200 response even though nobody ever looked at the POST
428        // body.
429        //
430        // This can e.g. happen for login redirects when you post to a
431        // login-protected URL.
432        write!(f, "invalid HTTP {} redirect to {}", self.status, self.to)
433    }
434}
435
436impl error::Error for BadRedirect {}
437
438/// The background task that ships logs to Loki. It must be [`tokio::spawn`]ed
439/// by the calling application.
440///
441/// See the crate's root documentation for an example.
442pub struct BackgroundTask {
443    loki_url: Url,
444    receiver: ReceiverStream<Option<LokiEvent>>,
445    queues: LevelMap<SendQueue>,
446    buffer: Buffer,
447    http_client: reqwest::Client,
448    backoff_count: u32,
449    backoff: Option<Pin<Box<tokio::time::Sleep>>>,
450    quitting: bool,
451    send_task:
452        Option<Pin<Box<dyn Future<Output = Result<(), Box<dyn error::Error>>> + Send + 'static>>>,
453}
454
455impl BackgroundTask {
456    fn new(
457        loki_url: Url,
458        http_headers: reqwest::header::HeaderMap,
459        receiver: mpsc::Receiver<Option<LokiEvent>>,
460        labels: &FormattedLabels,
461    ) -> Result<BackgroundTask, Error> {
462        Ok(BackgroundTask {
463            receiver: ReceiverStream::new(receiver),
464            loki_url: loki_url
465                .join("loki/api/v1/push")
466                .map_err(|_| Error(ErrorI::InvalidLokiUrl))?,
467            queues: LevelMap::from_fn(|level| SendQueue::new(labels.finish(level))),
468            buffer: Buffer::new(),
469            http_client: reqwest::Client::builder()
470                .user_agent(concat!(
471                    env!("CARGO_PKG_NAME"),
472                    "/",
473                    env!("CARGO_PKG_VERSION")
474                ))
475                .default_headers(http_headers)
476                .redirect(reqwest::redirect::Policy::custom(|a| {
477                    let status = a.status().as_u16();
478                    if status == 302 || status == 303 {
479                        let to = a.url().clone();
480                        return a.error(BadRedirect { status, to });
481                    }
482                    reqwest::redirect::Policy::default().redirect(a)
483                }))
484                .build()
485                .expect("reqwest client builder"),
486            backoff_count: 0,
487            backoff: None,
488            quitting: false,
489            send_task: None,
490        })
491    }
492    fn backoff_time(&self) -> (bool, Duration) {
493        let backoff_time = if self.backoff_count >= 1 {
494            Duration::from_millis(
495                500u64
496                    .checked_shl(self.backoff_count - 1)
497                    .unwrap_or(u64::MAX),
498            )
499        } else {
500            Duration::from_millis(0)
501        };
502        (
503            backoff_time >= Duration::from_secs(30),
504            cmp::min(backoff_time, Duration::from_secs(600)),
505        )
506    }
507}
508
509impl Future for BackgroundTask {
510    type Output = ();
511    fn poll(mut self: Pin<&mut BackgroundTask>, cx: &mut Context<'_>) -> Poll<()> {
512        let mut default_guard = tracing::subscriber::set_default(NoSubscriber::default());
513
514        while let Poll::Ready(maybe_maybe_item) = Pin::new(&mut self.receiver).poll_next(cx) {
515            match maybe_maybe_item {
516                Some(Some(item)) => self.queues[item.level].push(item),
517                Some(None) => self.quitting = true, // Explicit close.
518                None => self.quitting = true,       // The sender was dropped.
519            }
520        }
521
522        let mut backing_off = if let Some(backoff) = &mut self.backoff {
523            matches!(Pin::new(backoff).poll(cx), Poll::Pending)
524        } else {
525            false
526        };
527        if !backing_off {
528            self.backoff = None;
529        }
530        loop {
531            if let Some(send_task) = &mut self.send_task {
532                match Pin::new(send_task).poll(cx) {
533                    Poll::Ready(res) => {
534                        if let Err(e) = &res {
535                            let (drop_outstanding, backoff_time) = self.backoff_time();
536                            drop(default_guard);
537                            tracing::error!(
538                                error_count = self.backoff_count + 1,
539                                ?backoff_time,
540                                error = %e,
541                                "couldn't send logs to loki",
542                            );
543                            default_guard =
544                                tracing::subscriber::set_default(NoSubscriber::default());
545                            if drop_outstanding {
546                                let num_dropped: usize =
547                                    self.queues.values_mut().map(|q| q.drop_outstanding()).sum();
548                                drop(default_guard);
549                                tracing::error!(
550                                    num_dropped,
551                                    "dropped outstanding messages due to sending errors",
552                                );
553                                default_guard =
554                                    tracing::subscriber::set_default(NoSubscriber::default());
555                            }
556                            self.backoff = Some(Box::pin(tokio::time::sleep(backoff_time)));
557                            self.backoff_count += 1;
558                            backing_off = true;
559                        } else {
560                            self.backoff_count = 0;
561                        }
562                        let res = res.map_err(|_| ());
563                        for q in self.queues.values_mut() {
564                            q.on_send_result(res);
565                        }
566                        self.send_task = None;
567                    }
568                    Poll::Pending => {}
569                }
570            }
571            if self.send_task.is_none()
572                && !backing_off
573                && self.queues.values().any(|q| q.should_send())
574            {
575                let streams = self
576                    .queues
577                    .values_mut()
578                    .map(|q| q.prepare_sending())
579                    .filter(|s| !s.entries.is_empty())
580                    .collect();
581                let body = self
582                    .buffer
583                    .encode(&loki::PushRequest { streams })
584                    .to_owned();
585                let request_builder = self.http_client.post(self.loki_url.clone());
586                self.send_task = Some(Box::pin(
587                    async move {
588                        request_builder
589                            .header(reqwest::header::CONTENT_TYPE, "application/x-snappy")
590                            .body(body)
591                            .send()
592                            .await?
593                            .error_for_status()?;
594                        Ok(())
595                    }
596                    .with_subscriber(NoSubscriber::default()),
597                ));
598            } else {
599                break;
600            }
601        }
602        if self.quitting && self.send_task.is_none() {
603            Poll::Ready(())
604        } else {
605            Poll::Pending
606        }
607    }
608}
609
610struct Buffer {
611    encoded: Vec<u8>,
612    snappy: Vec<u8>,
613}
614
615impl Buffer {
616    pub fn new() -> Buffer {
617        Buffer {
618            encoded: Vec::new(),
619            snappy: Vec::new(),
620        }
621    }
622    pub fn encode<'a, T: prost::Message>(&'a mut self, message: &T) -> &'a [u8] {
623        self.encoded.clear();
624        message
625            .encode(&mut self.encoded)
626            .expect("protobuf encoding is infallible");
627        self.compress_encoded()
628    }
629    fn compress_encoded(&mut self) -> &[u8] {
630        self.snappy
631            .resize(snap::raw::max_compress_len(self.encoded.len()), 0);
632        // Couldn't find documentation except for the promtail source code:
633        // https://github.com/grafana/loki/blob/8c06c546ab15a568f255461f10318dae37e022d3/clients/pkg/promtail/client/batch.go#L101
634        //
635        // In the Go code, `snappy.Encode` is used, which corresponds to the
636        // snappy block format, and not the snappy stream format. hence
637        // `snap::raw` instead of `snap::write` is needed.
638        let snappy_len = snap::raw::Encoder::new()
639            .compress(&self.encoded, &mut self.snappy)
640            .expect("snappy encoding is infallible");
641        &self.snappy[..snappy_len]
642    }
643}
644
645/// Handle to cleanly shut down the `BackgroundTask`.
646///
647/// It'll still try to send all available data and then quit.
648pub struct BackgroundTaskController {
649    sender: mpsc::Sender<Option<LokiEvent>>,
650}
651
652impl BackgroundTaskController {
653    /// Shut down the associated `BackgroundTask`.
654    pub async fn shutdown(&self) {
655        // Ignore the error. If no one is listening, it already shut down.
656        let _ = self.sender.send(None).await;
657    }
658}