1#![allow(clippy::or_fun_call)]
42#![allow(clippy::type_complexity)]
43#![deny(missing_docs)]
44
45#[cfg(not(feature = "compat-0-2-1"))]
46compile_error!(
47 "The feature `compat-0-2-1` must be enabled to ensure \
48 forward compatibility with future versions of this crate"
49);
50
51pub extern crate url;
55
56use loki_api::logproto as loki;
57use loki_api::prost;
58use serde::Serialize;
59use std::cmp;
60use std::collections::HashMap;
61use std::error;
62use std::fmt;
63use std::future::Future;
64use std::mem;
65use std::pin::Pin;
66use std::task::Context;
67use std::task::Poll;
68use std::time::Duration;
69use std::time::SystemTime;
70use tokio::sync::mpsc;
71use tracing::instrument::WithSubscriber;
72use tracing_core::field::Field;
73use tracing_core::field::Visit;
74use tracing_core::span::Attributes;
75use tracing_core::span::Id;
76use tracing_core::span::Record;
77use tracing_core::Event;
78use tracing_core::Level;
79use tracing_core::Subscriber;
80use tracing_log::NormalizeEvent;
81use tracing_subscriber::layer::Context as TracingContext;
82use tracing_subscriber::registry::LookupSpan;
83use url::Url;
84
85use labels::FormattedLabels;
86use level_map::LevelMap;
87use log_support::SerializeEventFieldMapStrippingLog;
88use no_subscriber::NoSubscriber;
89use ErrorInner as ErrorI;
90
91pub use builder::builder;
92pub use builder::Builder;
93
94mod builder;
95mod labels;
96mod level_map;
97mod log_support;
98mod no_subscriber;
99
100#[cfg(doctest)]
101#[doc = include_str!("../README.md")]
102struct ReadmeDoctests;
103
104fn event_channel() -> (
105 mpsc::Sender<Option<LokiEvent>>,
106 mpsc::Receiver<Option<LokiEvent>>,
107) {
108 mpsc::channel(512)
109}
110
111pub struct Error(ErrorInner);
116
117impl fmt::Debug for Error {
118 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
119 self.0.fmt(f)
120 }
121}
122impl fmt::Display for Error {
123 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124 self.0.fmt(f)
125 }
126}
127impl error::Error for Error {}
128
129#[derive(Debug)]
130enum ErrorInner {
131 DuplicateExtraField(String),
132 DuplicateHttpHeader(String),
133 DuplicateLabel(String),
134 InvalidHttpHeaderName(String),
135 InvalidHttpHeaderValue(String),
136 InvalidLabelCharacter(String, char),
137 InvalidLokiUrl,
138 ReservedLabelLevel,
139}
140
141impl fmt::Display for ErrorInner {
142 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
143 use self::ErrorInner::*;
144 match self {
145 DuplicateExtraField(key) => write!(f, "duplicate extra field key {:?}", key),
146 DuplicateHttpHeader(name) => write!(f, "duplicate HTTP header {:?}", name),
147 DuplicateLabel(key) => write!(f, "duplicate label key {:?}", key),
148 InvalidHttpHeaderName(name) => write!(f, "invalid HTTP header name {:?}", name),
149 InvalidHttpHeaderValue(name) => write!(f, "invalid HTTP header value for {:?}", name),
150 InvalidLabelCharacter(key, c) => {
151 write!(f, "invalid label character {:?} in key {:?}", c, key)
152 }
153 InvalidLokiUrl => write!(f, "invalid Loki URL"),
154 ReservedLabelLevel => write!(f, "cannot add custom label for \"level\""),
155 }
156 }
157}
158
159pub fn layer(
208 loki_url: Url,
209 labels: HashMap<String, String>,
210 extra_fields: HashMap<String, String>,
211) -> Result<(Layer, BackgroundTask), Error> {
212 let mut builder = builder();
213 for (key, value) in labels {
214 builder = builder.label(key, value)?;
215 }
216 for (key, value) in extra_fields {
217 builder = builder.extra_field(key, value)?;
218 }
219 builder.build_url(
220 loki_url
221 .join("/")
222 .map_err(|_| Error(ErrorI::InvalidLokiUrl))?,
223 )
224}
225
226pub struct Layer {
230 extra_fields: HashMap<String, String>,
231 sender: mpsc::Sender<Option<LokiEvent>>,
232}
233
234struct LokiEvent {
235 trigger_send: bool,
236 timestamp: SystemTime,
237 level: Level,
238 message: String,
239}
240
241#[derive(Serialize)]
242struct SerializedEvent<'a> {
243 #[serde(flatten)]
244 event: SerializeEventFieldMapStrippingLog<'a>,
245 #[serde(flatten)]
246 extra_fields: &'a HashMap<String, String>,
247 #[serde(flatten)]
248 span_fields: serde_json::Map<String, serde_json::Value>,
249 _spans: &'a [&'a str],
250 _target: &'a str,
251 _module_path: Option<&'a str>,
252 _file: Option<&'a str>,
253 _line: Option<u32>,
254}
255
256#[derive(Default)]
257struct Fields {
258 fields: serde_json::Map<String, serde_json::Value>,
259}
260
261impl Fields {
262 fn record_impl(&mut self, field: &Field, value: serde_json::Value) {
263 self.fields.insert(field.name().into(), value);
264 }
265 fn record<T: Into<serde_json::Value>>(&mut self, field: &Field, value: T) {
266 self.record_impl(field, value.into());
267 }
268}
269
270impl Visit for Fields {
271 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
272 self.record(field, format!("{:?}", value));
273 }
274 fn record_f64(&mut self, field: &Field, value: f64) {
275 self.record(field, value);
276 }
277 fn record_i64(&mut self, field: &Field, value: i64) {
278 self.record(field, value);
279 }
280 fn record_u64(&mut self, field: &Field, value: u64) {
281 self.record(field, value);
282 }
283 fn record_bool(&mut self, field: &Field, value: bool) {
284 self.record(field, value);
285 }
286 fn record_str(&mut self, field: &Field, value: &str) {
287 self.record(field, value);
288 }
289 fn record_error(&mut self, field: &Field, value: &(dyn error::Error + 'static)) {
290 self.record(field, format!("{}", value));
291 }
292}
293
294impl<S: Subscriber + for<'a> LookupSpan<'a>> tracing_subscriber::Layer<S> for Layer {
295 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: TracingContext<'_, S>) {
296 let span = ctx.span(id).expect("Span not found, this is a bug");
297 let mut extensions = span.extensions_mut();
298 if extensions.get_mut::<Fields>().is_none() {
299 let mut fields = Fields::default();
300 attrs.record(&mut fields);
301 extensions.insert(fields);
302 }
303 }
304 fn on_record(&self, id: &Id, values: &Record<'_>, ctx: TracingContext<'_, S>) {
305 let span = ctx.span(id).expect("Span not found, this is a bug");
306 let mut extensions = span.extensions_mut();
307 let fields = extensions.get_mut::<Fields>().expect("unregistered span");
308 values.record(fields);
309 }
310 fn on_event(&self, event: &Event<'_>, ctx: TracingContext<'_, S>) {
311 let timestamp = SystemTime::now();
312 let normalized_meta = event.normalized_metadata();
313 let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
314 let mut span_fields: serde_json::Map<String, serde_json::Value> = Default::default();
315 let spans = event
316 .parent()
317 .cloned()
318 .or_else(|| ctx.current_span().id().cloned())
319 .and_then(|id| {
320 ctx.span_scope(&id).map(|scope| {
321 scope.from_root().fold(Vec::new(), |mut spans, span| {
322 span_fields.extend(
323 span.extensions()
324 .get::<Fields>()
325 .expect("unregistered span")
326 .fields
327 .iter()
328 .map(|(f, v)| (f.clone(), v.clone())),
329 );
330 spans.push(span.name());
331 spans
332 })
333 })
334 })
335 .unwrap_or(Vec::new());
336 let _ = self.sender.try_send(Some(LokiEvent {
338 trigger_send: !meta.target().starts_with("tracing_loki"),
339 timestamp,
340 level: *meta.level(),
341 message: serde_json::to_string(&SerializedEvent {
342 event: SerializeEventFieldMapStrippingLog(event),
343 extra_fields: &self.extra_fields,
344 span_fields,
345 _spans: &spans,
346 _target: meta.target(),
347 _module_path: meta.module_path(),
348 _file: meta.file(),
349 _line: meta.line(),
350 })
351 .expect("json serialization shouldn't fail"),
352 }));
353 }
354}
355
356struct SendQueue {
357 encoded_labels: String,
358 sending: Vec<LokiEvent>,
359 to_send: Vec<LokiEvent>,
360}
361
362impl SendQueue {
363 fn new(encoded_labels: String) -> SendQueue {
364 SendQueue {
365 encoded_labels,
366 sending: Vec::new(),
367 to_send: Vec::new(),
368 }
369 }
370 fn push(&mut self, event: LokiEvent) {
371 self.to_send.push(event);
373 }
374 fn drop_outstanding(&mut self) -> usize {
375 let len = self.sending.len();
376 self.sending.clear();
377 len
378 }
379 fn on_send_result(&mut self, result: Result<(), ()>) {
380 match result {
381 Ok(()) => self.sending.clear(),
382 Err(()) => {
383 self.sending.append(&mut self.to_send);
384 mem::swap(&mut self.sending, &mut self.to_send);
385 }
386 }
387 }
388 fn should_send(&self) -> bool {
389 self.to_send.iter().any(|e| e.trigger_send)
390 }
391 fn prepare_sending(&mut self) -> loki::StreamAdapter {
392 if !self.sending.is_empty() {
393 panic!("can only prepare sending while no request is in flight");
394 }
395 mem::swap(&mut self.sending, &mut self.to_send);
396 loki::StreamAdapter {
397 labels: self.encoded_labels.clone(),
398 entries: self
399 .sending
400 .iter()
401 .map(|e| loki::EntryAdapter {
402 timestamp: Some(e.timestamp.into()),
403 line: e.message.clone(),
404 })
405 .collect(),
406 hash: 0,
412 }
413 }
414}
415
416#[derive(Debug)]
417struct BadRedirect {
418 status: u16,
419 to: Url,
420}
421
422impl fmt::Display for BadRedirect {
423 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
424 write!(f, "invalid HTTP {} redirect to {}", self.status, self.to)
431 }
432}
433
434impl error::Error for BadRedirect {}
435
436pub struct BackgroundTask {
441 loki_url: Url,
442 receiver: mpsc::Receiver<Option<LokiEvent>>,
443 queues: LevelMap<SendQueue>,
444 buffer: Buffer,
445 http_client: reqwest::Client,
446 backoff_count: u32,
447 backoff: Option<Pin<Box<tokio::time::Sleep>>>,
448 quitting: bool,
449 send_task:
450 Option<Pin<Box<dyn Future<Output = Result<(), Box<dyn error::Error>>> + Send + 'static>>>,
451}
452
453impl BackgroundTask {
454 fn new(
455 loki_url: Url,
456 http_headers: reqwest::header::HeaderMap,
457 receiver: mpsc::Receiver<Option<LokiEvent>>,
458 labels: &FormattedLabels,
459 ) -> Result<BackgroundTask, Error> {
460 Ok(BackgroundTask {
461 receiver,
462 loki_url: loki_url
463 .join("loki/api/v1/push")
464 .map_err(|_| Error(ErrorI::InvalidLokiUrl))?,
465 queues: LevelMap::from_fn(|level| SendQueue::new(labels.finish(level))),
466 buffer: Buffer::new(),
467 http_client: reqwest::Client::builder()
468 .user_agent(concat!(
469 env!("CARGO_PKG_NAME"),
470 "/",
471 env!("CARGO_PKG_VERSION")
472 ))
473 .default_headers(http_headers)
474 .redirect(reqwest::redirect::Policy::custom(|a| {
475 let status = a.status().as_u16();
476 if status == 302 || status == 303 {
477 let to = a.url().clone();
478 return a.error(BadRedirect { status, to });
479 }
480 reqwest::redirect::Policy::default().redirect(a)
481 }))
482 .build()
483 .expect("reqwest client builder"),
484 backoff_count: 0,
485 backoff: None,
486 quitting: false,
487 send_task: None,
488 })
489 }
490 fn backoff_time(&self) -> (bool, Duration) {
491 let backoff_time = if self.backoff_count >= 1 {
492 Duration::from_millis(
493 500u64
494 .checked_shl(self.backoff_count - 1)
495 .unwrap_or(u64::MAX),
496 )
497 } else {
498 Duration::from_millis(0)
499 };
500 (
501 backoff_time >= Duration::from_secs(30),
502 cmp::min(backoff_time, Duration::from_secs(600)),
503 )
504 }
505}
506
507impl Future for BackgroundTask {
508 type Output = ();
509 fn poll(mut self: Pin<&mut BackgroundTask>, cx: &mut Context<'_>) -> Poll<()> {
510 let mut default_guard = tracing::subscriber::set_default(NoSubscriber::default());
511
512 while let Poll::Ready(maybe_maybe_item) = Pin::new(&mut self.receiver).poll_recv(cx) {
513 match maybe_maybe_item {
514 Some(Some(item)) => self.queues[item.level].push(item),
515 Some(None) => self.quitting = true, None => self.quitting = true, }
518 }
519
520 let mut backing_off = if let Some(backoff) = &mut self.backoff {
521 matches!(Pin::new(backoff).poll(cx), Poll::Pending)
522 } else {
523 false
524 };
525 if !backing_off {
526 self.backoff = None;
527 }
528 loop {
529 if let Some(send_task) = &mut self.send_task {
530 match Pin::new(send_task).poll(cx) {
531 Poll::Ready(res) => {
532 if let Err(e) = &res {
533 let (drop_outstanding, backoff_time) = self.backoff_time();
534 drop(default_guard);
535 tracing::error!(
536 error_count = self.backoff_count + 1,
537 ?backoff_time,
538 error = %e,
539 "couldn't send logs to loki",
540 );
541 default_guard =
542 tracing::subscriber::set_default(NoSubscriber::default());
543 if drop_outstanding {
544 let num_dropped: usize =
545 self.queues.values_mut().map(|q| q.drop_outstanding()).sum();
546 drop(default_guard);
547 tracing::error!(
548 num_dropped,
549 "dropped outstanding messages due to sending errors",
550 );
551 default_guard =
552 tracing::subscriber::set_default(NoSubscriber::default());
553 }
554 self.backoff = Some(Box::pin(tokio::time::sleep(backoff_time)));
555 self.backoff_count += 1;
556 backing_off = true;
557 } else {
558 self.backoff_count = 0;
559 }
560 let res = res.map_err(|_| ());
561 for q in self.queues.values_mut() {
562 q.on_send_result(res);
563 }
564 self.send_task = None;
565 }
566 Poll::Pending => {}
567 }
568 }
569 if self.send_task.is_none()
570 && !backing_off
571 && self.queues.values().any(|q| q.should_send())
572 {
573 let streams = self
574 .queues
575 .values_mut()
576 .map(|q| q.prepare_sending())
577 .filter(|s| !s.entries.is_empty())
578 .collect();
579 let body = self
580 .buffer
581 .encode(&loki::PushRequest { streams })
582 .to_owned();
583 let request_builder = self.http_client.post(self.loki_url.clone());
584 self.send_task = Some(Box::pin(
585 async move {
586 request_builder
587 .header(reqwest::header::CONTENT_TYPE, "application/x-protobuf")
588 .header(reqwest::header::CONTENT_ENCODING, "snappy")
589 .body(body)
590 .send()
591 .await?
592 .error_for_status()?;
593 Ok(())
594 }
595 .with_subscriber(NoSubscriber::default()),
596 ));
597 } else {
598 break;
599 }
600 }
601 if self.quitting && self.send_task.is_none() {
602 Poll::Ready(())
603 } else {
604 Poll::Pending
605 }
606 }
607}
608
609struct Buffer {
610 encoded: Vec<u8>,
611 snappy: Vec<u8>,
612}
613
614impl Buffer {
615 pub fn new() -> Buffer {
616 Buffer {
617 encoded: Vec::new(),
618 snappy: Vec::new(),
619 }
620 }
621 pub fn encode<'a, T: prost::Message>(&'a mut self, message: &T) -> &'a [u8] {
622 self.encoded.clear();
623 message
624 .encode(&mut self.encoded)
625 .expect("protobuf encoding is infallible");
626 self.compress_encoded()
627 }
628 fn compress_encoded(&mut self) -> &[u8] {
629 self.snappy
630 .resize(snap::raw::max_compress_len(self.encoded.len()), 0);
631 let snappy_len = snap::raw::Encoder::new()
638 .compress(&self.encoded, &mut self.snappy)
639 .expect("snappy encoding is infallible");
640 &self.snappy[..snappy_len]
641 }
642}
643
644pub struct BackgroundTaskController {
648 sender: mpsc::Sender<Option<LokiEvent>>,
649}
650
651impl BackgroundTaskController {
652 pub async fn shutdown(&self) {
654 let _ = self.sender.send(None).await;
656 }
657}