1#![allow(clippy::or_fun_call)]
42#![allow(clippy::type_complexity)]
43#![deny(missing_docs)]
44
45#[cfg(not(feature = "compat-0-2-1"))]
46compile_error!(
47 "The feature `compat-0-2-1` must be enabled to ensure \
48 forward compatibility with future versions of this crate"
49);
50
51pub extern crate url;
55
56use loki_api::logproto as loki;
57use loki_api::prost;
58use serde::Serialize;
59use std::cmp;
60use std::collections::HashMap;
61use std::error;
62use std::fmt;
63use std::future::Future;
64use std::mem;
65use std::pin::Pin;
66use std::task::Context;
67use std::task::Poll;
68use std::time::Duration;
69use std::time::SystemTime;
70use tokio::sync::mpsc;
71use tokio_stream::wrappers::ReceiverStream;
72use tokio_stream::Stream;
73use tracing::instrument::WithSubscriber;
74use tracing_core::field::Field;
75use tracing_core::field::Visit;
76use tracing_core::span::Attributes;
77use tracing_core::span::Id;
78use tracing_core::span::Record;
79use tracing_core::Event;
80use tracing_core::Level;
81use tracing_core::Subscriber;
82use tracing_log::NormalizeEvent;
83use tracing_subscriber::layer::Context as TracingContext;
84use tracing_subscriber::registry::LookupSpan;
85use url::Url;
86
87use labels::FormattedLabels;
88use level_map::LevelMap;
89use log_support::SerializeEventFieldMapStrippingLog;
90use no_subscriber::NoSubscriber;
91use ErrorInner as ErrorI;
92
93pub use builder::builder;
94pub use builder::Builder;
95
96mod builder;
97mod labels;
98mod level_map;
99mod log_support;
100mod no_subscriber;
101
102#[cfg(doctest)]
103#[doc = include_str!("../README.md")]
104struct ReadmeDoctests;
105
106fn event_channel() -> (
107 mpsc::Sender<Option<LokiEvent>>,
108 mpsc::Receiver<Option<LokiEvent>>,
109) {
110 mpsc::channel(512)
111}
112
113pub struct Error(ErrorInner);
118
119impl fmt::Debug for Error {
120 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121 self.0.fmt(f)
122 }
123}
124impl fmt::Display for Error {
125 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126 self.0.fmt(f)
127 }
128}
129impl error::Error for Error {}
130
131#[derive(Debug)]
132enum ErrorInner {
133 DuplicateExtraField(String),
134 DuplicateHttpHeader(String),
135 DuplicateLabel(String),
136 InvalidHttpHeaderName(String),
137 InvalidHttpHeaderValue(String),
138 InvalidLabelCharacter(String, char),
139 InvalidLokiUrl,
140 ReservedLabelLevel,
141}
142
143impl fmt::Display for ErrorInner {
144 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
145 use self::ErrorInner::*;
146 match self {
147 DuplicateExtraField(key) => write!(f, "duplicate extra field key {:?}", key),
148 DuplicateHttpHeader(name) => write!(f, "duplicate HTTP header {:?}", name),
149 DuplicateLabel(key) => write!(f, "duplicate label key {:?}", key),
150 InvalidHttpHeaderName(name) => write!(f, "invalid HTTP header name {:?}", name),
151 InvalidHttpHeaderValue(name) => write!(f, "invalid HTTP header value for {:?}", name),
152 InvalidLabelCharacter(key, c) => {
153 write!(f, "invalid label character {:?} in key {:?}", c, key)
154 }
155 InvalidLokiUrl => write!(f, "invalid Loki URL"),
156 ReservedLabelLevel => write!(f, "cannot add custom label for \"level\""),
157 }
158 }
159}
160
161pub fn layer(
210 loki_url: Url,
211 labels: HashMap<String, String>,
212 extra_fields: HashMap<String, String>,
213) -> Result<(Layer, BackgroundTask), Error> {
214 let mut builder = builder();
215 for (key, value) in labels {
216 builder = builder.label(key, value)?;
217 }
218 for (key, value) in extra_fields {
219 builder = builder.extra_field(key, value)?;
220 }
221 builder.build_url(
222 loki_url
223 .join("/")
224 .map_err(|_| Error(ErrorI::InvalidLokiUrl))?,
225 )
226}
227
228pub struct Layer {
232 extra_fields: HashMap<String, String>,
233 sender: mpsc::Sender<Option<LokiEvent>>,
234}
235
236struct LokiEvent {
237 trigger_send: bool,
238 timestamp: SystemTime,
239 level: Level,
240 message: String,
241}
242
243#[derive(Serialize)]
244struct SerializedEvent<'a> {
245 #[serde(flatten)]
246 event: SerializeEventFieldMapStrippingLog<'a>,
247 #[serde(flatten)]
248 extra_fields: &'a HashMap<String, String>,
249 #[serde(flatten)]
250 span_fields: serde_json::Map<String, serde_json::Value>,
251 _spans: &'a [&'a str],
252 _target: &'a str,
253 _module_path: Option<&'a str>,
254 _file: Option<&'a str>,
255 _line: Option<u32>,
256}
257
258#[derive(Default)]
259struct Fields {
260 fields: serde_json::Map<String, serde_json::Value>,
261}
262
263impl Fields {
264 fn record_impl(&mut self, field: &Field, value: serde_json::Value) {
265 self.fields.insert(field.name().into(), value);
266 }
267 fn record<T: Into<serde_json::Value>>(&mut self, field: &Field, value: T) {
268 self.record_impl(field, value.into());
269 }
270}
271
272impl Visit for Fields {
273 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
274 self.record(field, format!("{:?}", value));
275 }
276 fn record_f64(&mut self, field: &Field, value: f64) {
277 self.record(field, value);
278 }
279 fn record_i64(&mut self, field: &Field, value: i64) {
280 self.record(field, value);
281 }
282 fn record_u64(&mut self, field: &Field, value: u64) {
283 self.record(field, value);
284 }
285 fn record_bool(&mut self, field: &Field, value: bool) {
286 self.record(field, value);
287 }
288 fn record_str(&mut self, field: &Field, value: &str) {
289 self.record(field, value);
290 }
291 fn record_error(&mut self, field: &Field, value: &(dyn error::Error + 'static)) {
292 self.record(field, format!("{}", value));
293 }
294}
295
296impl<S: Subscriber + for<'a> LookupSpan<'a>> tracing_subscriber::Layer<S> for Layer {
297 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: TracingContext<'_, S>) {
298 let span = ctx.span(id).expect("Span not found, this is a bug");
299 let mut extensions = span.extensions_mut();
300 if extensions.get_mut::<Fields>().is_none() {
301 let mut fields = Fields::default();
302 attrs.record(&mut fields);
303 extensions.insert(fields);
304 }
305 }
306 fn on_record(&self, id: &Id, values: &Record<'_>, ctx: TracingContext<'_, S>) {
307 let span = ctx.span(id).expect("Span not found, this is a bug");
308 let mut extensions = span.extensions_mut();
309 let fields = extensions.get_mut::<Fields>().expect("unregistered span");
310 values.record(fields);
311 }
312 fn on_event(&self, event: &Event<'_>, ctx: TracingContext<'_, S>) {
313 let timestamp = SystemTime::now();
314 let normalized_meta = event.normalized_metadata();
315 let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
316 let mut span_fields: serde_json::Map<String, serde_json::Value> = Default::default();
317 let spans = event
318 .parent()
319 .cloned()
320 .or_else(|| ctx.current_span().id().cloned())
321 .and_then(|id| {
322 ctx.span_scope(&id).map(|scope| {
323 scope.from_root().fold(Vec::new(), |mut spans, span| {
324 span_fields.extend(
325 span.extensions()
326 .get::<Fields>()
327 .expect("unregistered span")
328 .fields
329 .iter()
330 .map(|(f, v)| (f.clone(), v.clone())),
331 );
332 spans.push(span.name());
333 spans
334 })
335 })
336 })
337 .unwrap_or(Vec::new());
338 let _ = self.sender.try_send(Some(LokiEvent {
340 trigger_send: !meta.target().starts_with("tracing_loki"),
341 timestamp,
342 level: *meta.level(),
343 message: serde_json::to_string(&SerializedEvent {
344 event: SerializeEventFieldMapStrippingLog(event),
345 extra_fields: &self.extra_fields,
346 span_fields,
347 _spans: &spans,
348 _target: meta.target(),
349 _module_path: meta.module_path(),
350 _file: meta.file(),
351 _line: meta.line(),
352 })
353 .expect("json serialization shouldn't fail"),
354 }));
355 }
356}
357
358struct SendQueue {
359 encoded_labels: String,
360 sending: Vec<LokiEvent>,
361 to_send: Vec<LokiEvent>,
362}
363
364impl SendQueue {
365 fn new(encoded_labels: String) -> SendQueue {
366 SendQueue {
367 encoded_labels,
368 sending: Vec::new(),
369 to_send: Vec::new(),
370 }
371 }
372 fn push(&mut self, event: LokiEvent) {
373 self.to_send.push(event);
375 }
376 fn drop_outstanding(&mut self) -> usize {
377 let len = self.sending.len();
378 self.sending.clear();
379 len
380 }
381 fn on_send_result(&mut self, result: Result<(), ()>) {
382 match result {
383 Ok(()) => self.sending.clear(),
384 Err(()) => {
385 self.sending.append(&mut self.to_send);
386 mem::swap(&mut self.sending, &mut self.to_send);
387 }
388 }
389 }
390 fn should_send(&self) -> bool {
391 self.to_send.iter().any(|e| e.trigger_send)
392 }
393 fn prepare_sending(&mut self) -> loki::StreamAdapter {
394 if !self.sending.is_empty() {
395 panic!("can only prepare sending while no request is in flight");
396 }
397 mem::swap(&mut self.sending, &mut self.to_send);
398 loki::StreamAdapter {
399 labels: self.encoded_labels.clone(),
400 entries: self
401 .sending
402 .iter()
403 .map(|e| loki::EntryAdapter {
404 timestamp: Some(e.timestamp.into()),
405 line: e.message.clone(),
406 })
407 .collect(),
408 hash: 0,
414 }
415 }
416}
417
418#[derive(Debug)]
419struct BadRedirect {
420 status: u16,
421 to: Url,
422}
423
424impl fmt::Display for BadRedirect {
425 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
426 write!(f, "invalid HTTP {} redirect to {}", self.status, self.to)
433 }
434}
435
436impl error::Error for BadRedirect {}
437
438pub struct BackgroundTask {
443 loki_url: Url,
444 receiver: ReceiverStream<Option<LokiEvent>>,
445 queues: LevelMap<SendQueue>,
446 buffer: Buffer,
447 http_client: reqwest::Client,
448 backoff_count: u32,
449 backoff: Option<Pin<Box<tokio::time::Sleep>>>,
450 quitting: bool,
451 send_task:
452 Option<Pin<Box<dyn Future<Output = Result<(), Box<dyn error::Error>>> + Send + 'static>>>,
453}
454
455impl BackgroundTask {
456 fn new(
457 loki_url: Url,
458 http_headers: reqwest::header::HeaderMap,
459 receiver: mpsc::Receiver<Option<LokiEvent>>,
460 labels: &FormattedLabels,
461 ) -> Result<BackgroundTask, Error> {
462 Ok(BackgroundTask {
463 receiver: ReceiverStream::new(receiver),
464 loki_url: loki_url
465 .join("loki/api/v1/push")
466 .map_err(|_| Error(ErrorI::InvalidLokiUrl))?,
467 queues: LevelMap::from_fn(|level| SendQueue::new(labels.finish(level))),
468 buffer: Buffer::new(),
469 http_client: reqwest::Client::builder()
470 .user_agent(concat!(
471 env!("CARGO_PKG_NAME"),
472 "/",
473 env!("CARGO_PKG_VERSION")
474 ))
475 .default_headers(http_headers)
476 .redirect(reqwest::redirect::Policy::custom(|a| {
477 let status = a.status().as_u16();
478 if status == 302 || status == 303 {
479 let to = a.url().clone();
480 return a.error(BadRedirect { status, to });
481 }
482 reqwest::redirect::Policy::default().redirect(a)
483 }))
484 .build()
485 .expect("reqwest client builder"),
486 backoff_count: 0,
487 backoff: None,
488 quitting: false,
489 send_task: None,
490 })
491 }
492 fn backoff_time(&self) -> (bool, Duration) {
493 let backoff_time = if self.backoff_count >= 1 {
494 Duration::from_millis(
495 500u64
496 .checked_shl(self.backoff_count - 1)
497 .unwrap_or(u64::MAX),
498 )
499 } else {
500 Duration::from_millis(0)
501 };
502 (
503 backoff_time >= Duration::from_secs(30),
504 cmp::min(backoff_time, Duration::from_secs(600)),
505 )
506 }
507}
508
509impl Future for BackgroundTask {
510 type Output = ();
511 fn poll(mut self: Pin<&mut BackgroundTask>, cx: &mut Context<'_>) -> Poll<()> {
512 let mut default_guard = tracing::subscriber::set_default(NoSubscriber::default());
513
514 while let Poll::Ready(maybe_maybe_item) = Pin::new(&mut self.receiver).poll_next(cx) {
515 match maybe_maybe_item {
516 Some(Some(item)) => self.queues[item.level].push(item),
517 Some(None) => self.quitting = true, None => self.quitting = true, }
520 }
521
522 let mut backing_off = if let Some(backoff) = &mut self.backoff {
523 matches!(Pin::new(backoff).poll(cx), Poll::Pending)
524 } else {
525 false
526 };
527 if !backing_off {
528 self.backoff = None;
529 }
530 loop {
531 if let Some(send_task) = &mut self.send_task {
532 match Pin::new(send_task).poll(cx) {
533 Poll::Ready(res) => {
534 if let Err(e) = &res {
535 let (drop_outstanding, backoff_time) = self.backoff_time();
536 drop(default_guard);
537 tracing::error!(
538 error_count = self.backoff_count + 1,
539 ?backoff_time,
540 error = %e,
541 "couldn't send logs to loki",
542 );
543 default_guard =
544 tracing::subscriber::set_default(NoSubscriber::default());
545 if drop_outstanding {
546 let num_dropped: usize =
547 self.queues.values_mut().map(|q| q.drop_outstanding()).sum();
548 drop(default_guard);
549 tracing::error!(
550 num_dropped,
551 "dropped outstanding messages due to sending errors",
552 );
553 default_guard =
554 tracing::subscriber::set_default(NoSubscriber::default());
555 }
556 self.backoff = Some(Box::pin(tokio::time::sleep(backoff_time)));
557 self.backoff_count += 1;
558 backing_off = true;
559 } else {
560 self.backoff_count = 0;
561 }
562 let res = res.map_err(|_| ());
563 for q in self.queues.values_mut() {
564 q.on_send_result(res);
565 }
566 self.send_task = None;
567 }
568 Poll::Pending => {}
569 }
570 }
571 if self.send_task.is_none()
572 && !backing_off
573 && self.queues.values().any(|q| q.should_send())
574 {
575 let streams = self
576 .queues
577 .values_mut()
578 .map(|q| q.prepare_sending())
579 .filter(|s| !s.entries.is_empty())
580 .collect();
581 let body = self
582 .buffer
583 .encode(&loki::PushRequest { streams })
584 .to_owned();
585 let request_builder = self.http_client.post(self.loki_url.clone());
586 self.send_task = Some(Box::pin(
587 async move {
588 request_builder
589 .header(reqwest::header::CONTENT_TYPE, "application/x-snappy")
590 .body(body)
591 .send()
592 .await?
593 .error_for_status()?;
594 Ok(())
595 }
596 .with_subscriber(NoSubscriber::default()),
597 ));
598 } else {
599 break;
600 }
601 }
602 if self.quitting && self.send_task.is_none() {
603 Poll::Ready(())
604 } else {
605 Poll::Pending
606 }
607 }
608}
609
610struct Buffer {
611 encoded: Vec<u8>,
612 snappy: Vec<u8>,
613}
614
615impl Buffer {
616 pub fn new() -> Buffer {
617 Buffer {
618 encoded: Vec::new(),
619 snappy: Vec::new(),
620 }
621 }
622 pub fn encode<'a, T: prost::Message>(&'a mut self, message: &T) -> &'a [u8] {
623 self.encoded.clear();
624 message
625 .encode(&mut self.encoded)
626 .expect("protobuf encoding is infallible");
627 self.compress_encoded()
628 }
629 fn compress_encoded(&mut self) -> &[u8] {
630 self.snappy
631 .resize(snap::raw::max_compress_len(self.encoded.len()), 0);
632 let snappy_len = snap::raw::Encoder::new()
639 .compress(&self.encoded, &mut self.snappy)
640 .expect("snappy encoding is infallible");
641 &self.snappy[..snappy_len]
642 }
643}
644
645pub struct BackgroundTaskController {
649 sender: mpsc::Sender<Option<LokiEvent>>,
650}
651
652impl BackgroundTaskController {
653 pub async fn shutdown(&self) {
655 let _ = self.sender.send(None).await;
657 }
658}