#![cfg(feature = "loki")]
#![cfg_attr(docsrs, doc(cfg(feature = "loki")))]
use crate::service::{HttpError, LokiData, LokiMessage, ServiceError};
use crate::{Fallback, LokiLogger, Message};
use chrono::{SecondsFormat, Utc};
use reqwest::blocking::Response;
use serde_json::json;
use std::sync::Arc;
use std::sync::mpsc::Receiver;
use std::time::UNIX_EPOCH;
/// Worker-side behaviour for shipping queued log messages to Loki.
///
/// Both methods have default implementations; an implementor only has to
/// supply [`Fallback`], which is invoked for every message that could not be
/// delivered.
pub trait Loki: Fallback {
    /// Consumes `receiver` until the channel is closed, forwarding messages in
    /// per-level batches.
    ///
    /// The loop blocks for one message, then drains whatever else is already
    /// queued without blocking, groups everything by the textual log level and
    /// sends one request per non-empty group via [`Loki::work_batch`].
    fn work(&self, receiver: Receiver<LokiMessage>, data: Arc<LokiData>) {
        let mut messages = std::collections::BTreeMap::<String, Vec<LokiMessage>>::new();
        while let Ok(first) = receiver.recv() {
            // `try_iter` stops as soon as the queue is empty (or disconnected),
            // so a burst of messages ends up in the same round of batches.
            for message in std::iter::once(first).chain(receiver.try_iter()) {
                let level = message.message.level().to_string();
                messages.entry(level).or_default().push(message);
            }
            for (level, batch) in &mut messages {
                if !batch.is_empty() {
                    // The result is intentionally dropped: on failure
                    // `work_batch` has already passed every message to
                    // `fallback`, and the batch is cleared either way.
                    let _ = self.work_batch(level, batch, &data);
                }
            }
        }
    }

    /// Sends one batch of messages sharing the same `level` as a single stream.
    ///
    /// The request body has the shape
    /// `{"streams": [{"stream": {app, job, env, level}, "values": [[ts, line], ...]}]}`
    /// where `ts` is the message timestamp in nanoseconds since the Unix epoch,
    /// rendered as a string.
    ///
    /// At most `1 + max_retries` requests are made (the initial attempt plus
    /// the configured number of retries). `batch` is always emptied before
    /// returning, whether or not delivery succeeded.
    ///
    /// # Errors
    ///
    /// Returns [`ServiceError::Http`] carrying the last status code when the
    /// final attempt got a non-success response, or the error of the final
    /// attempt when the request itself failed. In both cases every message of
    /// the batch is handed to [`Fallback::fallback`] first.
    fn work_batch(
        &self,
        level: &str,
        batch: &mut Vec<LokiMessage>,
        data: &Arc<LokiData>,
    ) -> Result<Response, ServiceError> {
        let values = batch
            .iter()
            .map(|message| {
                // A timestamp earlier than the epoch must not panic the worker;
                // it is clamped to 0 instead.
                let nanos = message
                    .timestamp
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_nanos();
                json!([nanos.to_string(), message.message.content().to_string()])
            })
            .collect::<Vec<_>>();

        // Serialize once; every attempt sends a copy of the same body.
        let body = json!({
            "streams": [{
                "stream": {
                    "app": data.config.get_app(),
                    "job": data.config.get_job(),
                    "env": data.config.get_env(),
                    "level": level,
                },
                "values": values
            }]
        })
        .to_string();

        let max_retries = data.config.get_max_retries();
        let mut retries = 0usize;
        let response = loop {
            match LokiLogger::request_post(body.clone(), data) {
                Ok(reply) if reply.status().is_success() => break Ok(reply),
                // Out of retries: a non-success reply becomes an HTTP error...
                Ok(reply) if retries >= max_retries => {
                    break Err(ServiceError::Http(HttpError::new(
                        reply.status().as_u16(),
                    )));
                }
                // ...and a failed request is reported as-is.
                Err(error) if retries >= max_retries => break Err(error),
                _ => retries += 1,
            }
        };

        if let Err(error) = &response {
            for message in batch.iter() {
                self.fallback(error, &message.message);
            }
        }
        batch.clear();
        response
    }
}
/// Stateless Loki worker whose fallback prints undeliverable messages to the
/// console (failure reason on stderr, the log line itself on stdout).
pub struct StandardLoki {}
/// Uses the default `work` / `work_batch` implementations of [`Loki`] unchanged.
impl Loki for StandardLoki {}
impl Fallback for StandardLoki {
    /// Reports a message that could not be delivered.
    ///
    /// The failure reason goes to stderr; the message itself is then written
    /// to stdout as `<RFC 3339 timestamp, nanosecond precision> [<level>] | <content>`.
    fn fallback(&self, error: &ServiceError, msg: &Message) {
        // Render the message's own instant, not the current time.
        let logged_at: chrono::DateTime<Utc> = msg.instant().into();
        let stamp = logged_at.to_rfc3339_opts(SecondsFormat::Nanos, true);

        let reason = match error {
            ServiceError::Http(http) => {
                format!("Loki rejected log: Status {}", http.status_code())
            }
            ServiceError::Network(network) => {
                format!("Loki network error: {}", network)
            }
            _ => format!("Loki service failure: {}", error),
        };
        eprintln!("{}", reason);

        println!("{} [{}] | {}", stamp, msg.level(), msg.content());
    }
}