Skip to main content

tracing_loki/
lib.rs

1//! A [`tracing`] layer for shipping logs to [Grafana
2//! Loki](https://grafana.com/oss/loki/).
3//!
4//! Usage
5//! =====
6//!
7//! ```rust
8//! use tracing_subscriber::layer::SubscriberExt;
9//! use tracing_subscriber::util::SubscriberInitExt;
10//! use std::process;
11//! use url::Url;
12//!
13//! #[tokio::main]
14//! async fn main() -> Result<(), tracing_loki::Error> {
15//!     let (layer, task) = tracing_loki::builder()
16//!         .label("host", "mine")?
17//!         .extra_field("pid", format!("{}", process::id()))?
18//!         .build_url(Url::parse("http://127.0.0.1:3100").unwrap())?;
19//!
20//!     // We need to register our layer with `tracing`.
21//!     tracing_subscriber::registry()
22//!         .with(layer)
23//!         // One could add more layers here, for example logging to stdout:
24//!         // .with(tracing_subscriber::fmt::Layer::new())
25//!         .init();
26//!
27//!     // The background task needs to be spawned so the logs actually get
28//!     // delivered.
29//!     tokio::spawn(task);
30//!
31//!     tracing::info!(
32//!         task = "tracing_setup",
33//!         result = "success",
34//!         "tracing successfully set up",
35//!     );
36//!
37//!     Ok(())
38//! }
39//! ```
40
41#![allow(clippy::or_fun_call)]
42#![allow(clippy::type_complexity)]
43#![deny(missing_docs)]
44
45#[cfg(not(feature = "compat-0-2-1"))]
46compile_error!(
47    "The feature `compat-0-2-1` must be enabled to ensure \
48    forward compatibility with future versions of this crate"
49);
50
51/// The re-exported `url` dependency of this crate.
52///
53/// Use this to avoid depending on a potentially-incompatible `url` version yourself.
54pub extern crate url;
55
56use loki_api::logproto as loki;
57use loki_api::prost;
58use serde::Serialize;
59use std::cmp;
60use std::collections::HashMap;
61use std::error;
62use std::fmt;
63use std::future::Future;
64use std::mem;
65use std::pin::Pin;
66use std::task::Context;
67use std::task::Poll;
68use std::time::Duration;
69use std::time::SystemTime;
70use tokio::sync::mpsc;
71use tracing::instrument::WithSubscriber;
72use tracing_core::field::Field;
73use tracing_core::field::Visit;
74use tracing_core::span::Attributes;
75use tracing_core::span::Id;
76use tracing_core::span::Record;
77use tracing_core::Event;
78use tracing_core::Level;
79use tracing_core::Subscriber;
80use tracing_log::NormalizeEvent;
81use tracing_subscriber::layer::Context as TracingContext;
82use tracing_subscriber::registry::LookupSpan;
83use url::Url;
84
85use labels::FormattedLabels;
86use level_map::LevelMap;
87use log_support::SerializeEventFieldMapStrippingLog;
88use no_subscriber::NoSubscriber;
89use ErrorInner as ErrorI;
90
91pub use builder::builder;
92pub use builder::Builder;
93
94mod builder;
95mod labels;
96mod level_map;
97mod log_support;
98mod no_subscriber;
99
// Only compiled when running doctests: attaching the README as this item's
// documentation makes the code examples in `README.md` run as doctests.
#[cfg(doctest)]
#[doc = include_str!("../README.md")]
struct ReadmeDoctests;
103
104fn event_channel() -> (
105    mpsc::Sender<Option<LokiEvent>>,
106    mpsc::Receiver<Option<LokiEvent>>,
107) {
108    mpsc::channel(512)
109}
110
111/// The error type for constructing a [`Layer`].
112///
113/// Nothing except for the [`std::error::Error`] (and [`std::fmt::Debug`] and
114/// [`std::fmt::Display`]) implementation of this type is exposed.
115pub struct Error(ErrorInner);
116
117impl fmt::Debug for Error {
118    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
119        self.0.fmt(f)
120    }
121}
122impl fmt::Display for Error {
123    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124        self.0.fmt(f)
125    }
126}
127impl error::Error for Error {}
128
/// The private failure reasons wrapped by the public `Error` type.
#[derive(Debug)]
enum ErrorInner {
    // An extra field with this key was already added.
    DuplicateExtraField(String),
    // An HTTP header with this name was already added.
    DuplicateHttpHeader(String),
    // A label with this key was already added.
    DuplicateLabel(String),
    // The given HTTP header name is not valid.
    InvalidHttpHeaderName(String),
    // The value given for the named HTTP header is not valid.
    InvalidHttpHeaderValue(String),
    // The label key (first) contains the offending character (second).
    InvalidLabelCharacter(String, char),
    // The Loki URL could not be combined with the push endpoint path.
    InvalidLokiUrl,
    // The label key "level" is reserved and cannot be set by the user.
    ReservedLabelLevel,
}

impl fmt::Display for ErrorInner {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::DuplicateExtraField(key) => {
                write!(f, "duplicate extra field key {:?}", key)
            }
            Self::DuplicateHttpHeader(name) => {
                write!(f, "duplicate HTTP header {:?}", name)
            }
            Self::DuplicateLabel(key) => {
                write!(f, "duplicate label key {:?}", key)
            }
            Self::InvalidHttpHeaderName(name) => {
                write!(f, "invalid HTTP header name {:?}", name)
            }
            Self::InvalidHttpHeaderValue(name) => {
                write!(f, "invalid HTTP header value for {:?}", name)
            }
            Self::InvalidLabelCharacter(key, c) => {
                write!(f, "invalid label character {:?} in key {:?}", c, key)
            }
            Self::InvalidLokiUrl => f.write_str("invalid Loki URL"),
            Self::ReservedLabelLevel => f.write_str("cannot add custom label for \"level\""),
        }
    }
}
158
159/// Construct a [`Layer`] and its corresponding [`BackgroundTask`].
160///
161/// The [`Layer`] needs to be registered with a
162/// [`tracing_subscriber::Registry`], and the [`BackgroundTask`] needs to be
163/// [`tokio::spawn`]ed.
164///
165/// **Note** that unlike the [`Builder::build_url`] function, this function
166/// **strips off** the path component of `loki_url` before appending
167/// `/loki/api/v1/push`.
168///
169/// See [`builder()`] and this crate's root documentation for a more flexible
170/// method.
171///
172/// # Example
173///
174/// ```rust
175/// use tracing_subscriber::layer::SubscriberExt;
176/// use tracing_subscriber::util::SubscriberInitExt;
177/// use url::Url;
178///
179/// #[tokio::main]
180/// async fn main() -> Result<(), tracing_loki::Error> {
181///     let (layer, task) = tracing_loki::layer(
182///         Url::parse("http://127.0.0.1:3100").unwrap(),
183///         vec![("host".into(), "mine".into())].into_iter().collect(),
184///         vec![].into_iter().collect(),
185///     )?;
186///
187///     // We need to register our layer with `tracing`.
188///     tracing_subscriber::registry()
189///         .with(layer)
190///         // One could add more layers here, for example logging to stdout:
191///         // .with(tracing_subscriber::fmt::Layer::new())
192///         .init();
193///
194///     // The background task needs to be spawned so the logs actually get
195///     // delivered.
196///     tokio::spawn(task);
197///
198///     tracing::info!(
199///         task = "tracing_setup",
200///         result = "success",
201///         "tracing successfully set up",
202///     );
203///
204///     Ok(())
205/// }
206/// ```
207pub fn layer(
208    loki_url: Url,
209    labels: HashMap<String, String>,
210    extra_fields: HashMap<String, String>,
211) -> Result<(Layer, BackgroundTask), Error> {
212    let mut builder = builder();
213    for (key, value) in labels {
214        builder = builder.label(key, value)?;
215    }
216    for (key, value) in extra_fields {
217        builder = builder.extra_field(key, value)?;
218    }
219    builder.build_url(
220        loki_url
221            .join("/")
222            .map_err(|_| Error(ErrorI::InvalidLokiUrl))?,
223    )
224}
225
/// The [`tracing_subscriber::Layer`] implementation for the Loki backend.
///
/// See the crate's root documentation for an example.
pub struct Layer {
    // Static key/value pairs that are flattened into the JSON line of every
    // event this layer serializes.
    extra_fields: HashMap<String, String>,
    // Producer side of the event channel; `on_event` pushes serialized events
    // into it with a non-blocking `try_send`.
    sender: mpsc::Sender<Option<LokiEvent>>,
}
233
// A single, already serialized log event on its way to the background task.
struct LokiEvent {
    // Whether the presence of this event in a queue should start a push
    // request. It is `false` for events whose target starts with
    // "tracing_loki", i.e. the crate's own diagnostics.
    trigger_send: bool,
    // Wall-clock time captured when the layer observed the event.
    timestamp: SystemTime,
    // Level of the event; selects the per-level send queue.
    level: Level,
    // The event serialized as a JSON string; sent as the Loki entry line.
    message: String,
}
240
// The JSON shape of one log line. The three flattened members contribute
// their keys directly to the top-level object; the underscore-prefixed
// members carry metadata about where the event came from.
#[derive(Serialize)]
struct SerializedEvent<'a> {
    // The event's own fields.
    #[serde(flatten)]
    event: SerializeEventFieldMapStrippingLog<'a>,
    // The layer's static extra fields.
    #[serde(flatten)]
    extra_fields: &'a HashMap<String, String>,
    // Fields recorded on the enclosing spans, merged in root-to-leaf order.
    #[serde(flatten)]
    span_fields: serde_json::Map<String, serde_json::Value>,
    // Names of the enclosing spans, root first.
    _spans: &'a [&'a str],
    // Target, module path, file and line taken from the event's metadata.
    _target: &'a str,
    _module_path: Option<&'a str>,
    _file: Option<&'a str>,
    _line: Option<u32>,
}
255
256#[derive(Default)]
257struct Fields {
258    fields: serde_json::Map<String, serde_json::Value>,
259}
260
261impl Fields {
262    fn record_impl(&mut self, field: &Field, value: serde_json::Value) {
263        self.fields.insert(field.name().into(), value);
264    }
265    fn record<T: Into<serde_json::Value>>(&mut self, field: &Field, value: T) {
266        self.record_impl(field, value.into());
267    }
268}
269
270impl Visit for Fields {
271    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
272        self.record(field, format!("{:?}", value));
273    }
274    fn record_f64(&mut self, field: &Field, value: f64) {
275        self.record(field, value);
276    }
277    fn record_i64(&mut self, field: &Field, value: i64) {
278        self.record(field, value);
279    }
280    fn record_u64(&mut self, field: &Field, value: u64) {
281        self.record(field, value);
282    }
283    fn record_bool(&mut self, field: &Field, value: bool) {
284        self.record(field, value);
285    }
286    fn record_str(&mut self, field: &Field, value: &str) {
287        self.record(field, value);
288    }
289    fn record_error(&mut self, field: &Field, value: &(dyn error::Error + 'static)) {
290        self.record(field, format!("{}", value));
291    }
292}
293
294impl<S: Subscriber + for<'a> LookupSpan<'a>> tracing_subscriber::Layer<S> for Layer {
295    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: TracingContext<'_, S>) {
296        let span = ctx.span(id).expect("Span not found, this is a bug");
297        let mut extensions = span.extensions_mut();
298        if extensions.get_mut::<Fields>().is_none() {
299            let mut fields = Fields::default();
300            attrs.record(&mut fields);
301            extensions.insert(fields);
302        }
303    }
304    fn on_record(&self, id: &Id, values: &Record<'_>, ctx: TracingContext<'_, S>) {
305        let span = ctx.span(id).expect("Span not found, this is a bug");
306        let mut extensions = span.extensions_mut();
307        let fields = extensions.get_mut::<Fields>().expect("unregistered span");
308        values.record(fields);
309    }
310    fn on_event(&self, event: &Event<'_>, ctx: TracingContext<'_, S>) {
311        let timestamp = SystemTime::now();
312        let normalized_meta = event.normalized_metadata();
313        let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
314        let mut span_fields: serde_json::Map<String, serde_json::Value> = Default::default();
315        let spans = event
316            .parent()
317            .cloned()
318            .or_else(|| ctx.current_span().id().cloned())
319            .and_then(|id| {
320                ctx.span_scope(&id).map(|scope| {
321                    scope.from_root().fold(Vec::new(), |mut spans, span| {
322                        span_fields.extend(
323                            span.extensions()
324                                .get::<Fields>()
325                                .expect("unregistered span")
326                                .fields
327                                .iter()
328                                .map(|(f, v)| (f.clone(), v.clone())),
329                        );
330                        spans.push(span.name());
331                        spans
332                    })
333                })
334            })
335            .unwrap_or(Vec::new());
336        // TODO: Anything useful to do when the capacity has been reached?
337        let _ = self.sender.try_send(Some(LokiEvent {
338            trigger_send: !meta.target().starts_with("tracing_loki"),
339            timestamp,
340            level: *meta.level(),
341            message: serde_json::to_string(&SerializedEvent {
342                event: SerializeEventFieldMapStrippingLog(event),
343                extra_fields: &self.extra_fields,
344                span_fields,
345                _spans: &spans,
346                _target: meta.target(),
347                _module_path: meta.module_path(),
348                _file: meta.file(),
349                _line: meta.line(),
350            })
351            .expect("json serialization shouldn't fail"),
352        }));
353    }
354}
355
356struct SendQueue {
357    encoded_labels: String,
358    sending: Vec<LokiEvent>,
359    to_send: Vec<LokiEvent>,
360}
361
362impl SendQueue {
363    fn new(encoded_labels: String) -> SendQueue {
364        SendQueue {
365            encoded_labels,
366            sending: Vec::new(),
367            to_send: Vec::new(),
368        }
369    }
370    fn push(&mut self, event: LokiEvent) {
371        // TODO: Add limit.
372        self.to_send.push(event);
373    }
374    fn drop_outstanding(&mut self) -> usize {
375        let len = self.sending.len();
376        self.sending.clear();
377        len
378    }
379    fn on_send_result(&mut self, result: Result<(), ()>) {
380        match result {
381            Ok(()) => self.sending.clear(),
382            Err(()) => {
383                self.sending.append(&mut self.to_send);
384                mem::swap(&mut self.sending, &mut self.to_send);
385            }
386        }
387    }
388    fn should_send(&self) -> bool {
389        self.to_send.iter().any(|e| e.trigger_send)
390    }
391    fn prepare_sending(&mut self) -> loki::StreamAdapter {
392        if !self.sending.is_empty() {
393            panic!("can only prepare sending while no request is in flight");
394        }
395        mem::swap(&mut self.sending, &mut self.to_send);
396        loki::StreamAdapter {
397            labels: self.encoded_labels.clone(),
398            entries: self
399                .sending
400                .iter()
401                .map(|e| loki::EntryAdapter {
402                    timestamp: Some(e.timestamp.into()),
403                    line: e.message.clone(),
404                })
405                .collect(),
406            // Couldn't find documentation except for the promtail source code:
407            // https://github.com/grafana/loki/blob/8c06c546ab15a568f255461f10318dae37e022d3/clients/pkg/promtail/client/batch.go#L55-L58
408            //
409            // In the Go code, the hash value isn't initialized explicitly,
410            // hence it is set to 0.
411            hash: 0,
412        }
413    }
414}
415
416#[derive(Debug)]
417struct BadRedirect {
418    status: u16,
419    to: Url,
420}
421
422impl fmt::Display for BadRedirect {
423    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
424        // Following such a redirect drops the request body, and will likely
425        // give an HTTP 200 response even though nobody ever looked at the POST
426        // body.
427        //
428        // This can e.g. happen for login redirects when you post to a
429        // login-protected URL.
430        write!(f, "invalid HTTP {} redirect to {}", self.status, self.to)
431    }
432}
433
434impl error::Error for BadRedirect {}
435
436/// The background task that ships logs to Loki. It must be [`tokio::spawn`]ed
437/// by the calling application.
438///
439/// See the crate's root documentation for an example.
440pub struct BackgroundTask {
441    loki_url: Url,
442    receiver: mpsc::Receiver<Option<LokiEvent>>,
443    queues: LevelMap<SendQueue>,
444    buffer: Buffer,
445    http_client: reqwest::Client,
446    backoff_count: u32,
447    backoff: Option<Pin<Box<tokio::time::Sleep>>>,
448    quitting: bool,
449    send_task:
450        Option<Pin<Box<dyn Future<Output = Result<(), Box<dyn error::Error>>> + Send + 'static>>>,
451}
452
453impl BackgroundTask {
454    fn new(
455        loki_url: Url,
456        http_headers: reqwest::header::HeaderMap,
457        receiver: mpsc::Receiver<Option<LokiEvent>>,
458        labels: &FormattedLabels,
459    ) -> Result<BackgroundTask, Error> {
460        Ok(BackgroundTask {
461            receiver,
462            loki_url: loki_url
463                .join("loki/api/v1/push")
464                .map_err(|_| Error(ErrorI::InvalidLokiUrl))?,
465            queues: LevelMap::from_fn(|level| SendQueue::new(labels.finish(level))),
466            buffer: Buffer::new(),
467            http_client: reqwest::Client::builder()
468                .user_agent(concat!(
469                    env!("CARGO_PKG_NAME"),
470                    "/",
471                    env!("CARGO_PKG_VERSION")
472                ))
473                .default_headers(http_headers)
474                .redirect(reqwest::redirect::Policy::custom(|a| {
475                    let status = a.status().as_u16();
476                    if status == 302 || status == 303 {
477                        let to = a.url().clone();
478                        return a.error(BadRedirect { status, to });
479                    }
480                    reqwest::redirect::Policy::default().redirect(a)
481                }))
482                .build()
483                .expect("reqwest client builder"),
484            backoff_count: 0,
485            backoff: None,
486            quitting: false,
487            send_task: None,
488        })
489    }
490    fn backoff_time(&self) -> (bool, Duration) {
491        let backoff_time = if self.backoff_count >= 1 {
492            Duration::from_millis(
493                500u64
494                    .checked_shl(self.backoff_count - 1)
495                    .unwrap_or(u64::MAX),
496            )
497        } else {
498            Duration::from_millis(0)
499        };
500        (
501            backoff_time >= Duration::from_secs(30),
502            cmp::min(backoff_time, Duration::from_secs(600)),
503        )
504    }
505}
506
507impl Future for BackgroundTask {
508    type Output = ();
509    fn poll(mut self: Pin<&mut BackgroundTask>, cx: &mut Context<'_>) -> Poll<()> {
510        let mut default_guard = tracing::subscriber::set_default(NoSubscriber::default());
511
512        while let Poll::Ready(maybe_maybe_item) = Pin::new(&mut self.receiver).poll_recv(cx) {
513            match maybe_maybe_item {
514                Some(Some(item)) => self.queues[item.level].push(item),
515                Some(None) => self.quitting = true, // Explicit close.
516                None => self.quitting = true,       // The sender was dropped.
517            }
518        }
519
520        let mut backing_off = if let Some(backoff) = &mut self.backoff {
521            matches!(Pin::new(backoff).poll(cx), Poll::Pending)
522        } else {
523            false
524        };
525        if !backing_off {
526            self.backoff = None;
527        }
528        loop {
529            if let Some(send_task) = &mut self.send_task {
530                match Pin::new(send_task).poll(cx) {
531                    Poll::Ready(res) => {
532                        if let Err(e) = &res {
533                            let (drop_outstanding, backoff_time) = self.backoff_time();
534                            drop(default_guard);
535                            tracing::error!(
536                                error_count = self.backoff_count + 1,
537                                ?backoff_time,
538                                error = %e,
539                                "couldn't send logs to loki",
540                            );
541                            default_guard =
542                                tracing::subscriber::set_default(NoSubscriber::default());
543                            if drop_outstanding {
544                                let num_dropped: usize =
545                                    self.queues.values_mut().map(|q| q.drop_outstanding()).sum();
546                                drop(default_guard);
547                                tracing::error!(
548                                    num_dropped,
549                                    "dropped outstanding messages due to sending errors",
550                                );
551                                default_guard =
552                                    tracing::subscriber::set_default(NoSubscriber::default());
553                            }
554                            self.backoff = Some(Box::pin(tokio::time::sleep(backoff_time)));
555                            self.backoff_count += 1;
556                            backing_off = true;
557                        } else {
558                            self.backoff_count = 0;
559                        }
560                        let res = res.map_err(|_| ());
561                        for q in self.queues.values_mut() {
562                            q.on_send_result(res);
563                        }
564                        self.send_task = None;
565                    }
566                    Poll::Pending => {}
567                }
568            }
569            if self.send_task.is_none()
570                && !backing_off
571                && self.queues.values().any(|q| q.should_send())
572            {
573                let streams = self
574                    .queues
575                    .values_mut()
576                    .map(|q| q.prepare_sending())
577                    .filter(|s| !s.entries.is_empty())
578                    .collect();
579                let body = self
580                    .buffer
581                    .encode(&loki::PushRequest { streams })
582                    .to_owned();
583                let request_builder = self.http_client.post(self.loki_url.clone());
584                self.send_task = Some(Box::pin(
585                    async move {
586                        request_builder
587                            .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf")
588                            .header(reqwest::header::CONTENT_ENCODING, "snappy")
589                            .body(body)
590                            .send()
591                            .await?
592                            .error_for_status()?;
593                        Ok(())
594                    }
595                    .with_subscriber(NoSubscriber::default()),
596                ));
597            } else {
598                break;
599            }
600        }
601        if self.quitting && self.send_task.is_none() {
602            Poll::Ready(())
603        } else {
604            Poll::Pending
605        }
606    }
607}
608
609struct Buffer {
610    encoded: Vec<u8>,
611    snappy: Vec<u8>,
612}
613
614impl Buffer {
615    pub fn new() -> Buffer {
616        Buffer {
617            encoded: Vec::new(),
618            snappy: Vec::new(),
619        }
620    }
621    pub fn encode<'a, T: prost::Message>(&'a mut self, message: &T) -> &'a [u8] {
622        self.encoded.clear();
623        message
624            .encode(&mut self.encoded)
625            .expect("protobuf encoding is infallible");
626        self.compress_encoded()
627    }
628    fn compress_encoded(&mut self) -> &[u8] {
629        self.snappy
630            .resize(snap::raw::max_compress_len(self.encoded.len()), 0);
631        // Couldn't find documentation except for the promtail source code:
632        // https://github.com/grafana/loki/blob/8c06c546ab15a568f255461f10318dae37e022d3/clients/pkg/promtail/client/batch.go#L101
633        //
634        // In the Go code, `snappy.Encode` is used, which corresponds to the
635        // snappy block format, and not the snappy stream format. hence
636        // `snap::raw` instead of `snap::write` is needed.
637        let snappy_len = snap::raw::Encoder::new()
638            .compress(&self.encoded, &mut self.snappy)
639            .expect("snappy encoding is infallible");
640        &self.snappy[..snappy_len]
641    }
642}
643
644/// Handle to cleanly shut down the `BackgroundTask`.
645///
646/// It'll still try to send all available data and then quit.
647pub struct BackgroundTaskController {
648    sender: mpsc::Sender<Option<LokiEvent>>,
649}
650
651impl BackgroundTaskController {
652    /// Shut down the associated `BackgroundTask`.
653    pub async fn shutdown(&self) {
654        // Ignore the error. If no one is listening, it already shut down.
655        let _ = self.sender.send(None).await;
656    }
657}