#![cfg(feature = "loki")]
#![cfg_attr(docsrs, doc(cfg(feature = "loki")))]
use crate::service::{ LokiConfig, LokiData, LokiMessage, ServiceError};
use crate::{LoggerImpl, LoggerStatus, Message};
use reqwest::blocking::{Client, RequestBuilder, Response};
use std::any::Any;
use std::ops::AddAssign;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, SystemTime};
use crate::service::Loki as LokiService;
use crate::service::StandardLoki as StandardLokiService;
pub struct Loki {
highwater: Mutex<SystemTime>,
worker: Option<JoinHandle<()>>,
sender: Option<Sender<LokiMessage>>,
data: Arc<LokiData>,
service: Arc<dyn LokiService + Send + Sync>,
}
impl Loki {
pub fn new(config: LokiConfig) -> Box<Loki> {
Self::with_service(config, Box::new(StandardLokiService {}))
}
pub fn with_service(
config: LokiConfig,
service: Box<dyn LokiService + Send + Sync>,
) -> Box<Loki> {
let mut post_url = config.url.clone();
post_url.push_str("loki/api/v1/push");
let data = Arc::new(LokiData {
client: Self::build_client(&config),
config,
post_url,
});
let work_data = data.clone();
let service: Arc<dyn LokiService + Send + Sync> = Arc::from(service);
let work_service = service.clone();
let (sender, receiver) = std::sync::mpsc::channel::<LokiMessage>();
Box::new(Loki {
highwater: Mutex::new(SystemTime::now()),
worker: Some(thread::spawn(move || {
work_service.work(receiver, work_data)
})),
sender: Some(sender),
data,
service,
})
}
pub fn build_client(config: &LokiConfig) -> Client {
Client::builder()
.connect_timeout(config.connection_timeout)
.timeout(config.request_timeout)
.build()
.expect("Failed to build reqwest client")
}
pub fn request_post(payload: String, data: &Arc<LokiData>) -> Result<Response, ServiceError> {
let mut request = data
.client
.post(data.post_url.as_str())
.header("Content-Type", "application/json")
.body(payload);
request = Loki::request_auth(request, &data);
request.send().map_err(ServiceError::Network)
}
pub fn request_auth(mut request: RequestBuilder, data: &Arc<LokiData>) -> RequestBuilder {
if let Some(auth) = &data.config.basic_auth {
request = request.basic_auth(&auth.username, auth.password.as_deref());
}
if let Some(token) = &data.config.bearer_auth {
request = request.bearer_auth(token);
}
request
}
}
impl LoggerImpl for Loki {
fn status(&self) -> LoggerStatus {
let mut url = self.data.config.url.clone();
url.push_str("ready");
let mut request = self.data.client.get(&url);
request = Self::request_auth(request, &self.data);
request
.send()
.ok()
.map(|response| match response.status().is_success() {
true => LoggerStatus::Running,
false => LoggerStatus::Broken,
})
.unwrap_or(LoggerStatus::Broken)
}
fn log(&self, message: Message) {
let mut timestamp = SystemTime::now();
if let Ok(mut highwater) = self.highwater.lock() {
if timestamp <= *highwater {
highwater.add_assign(Duration::new(0, 1));
timestamp = *highwater
}
}
if let Some(sender) = &self.sender {
if let Err(error) = sender.send(LokiMessage { timestamp, message }) {
let message = error.0;
self.service
.fallback(&ServiceError::LockPoisoned, &message.message);
}
} else {
self.service.fallback(&ServiceError::LockPoisoned, &message);
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl Drop for Loki {
fn drop(&mut self) {
self.sender = None;
if let Some(worker) = self.worker.take() {
let _ = worker.join();
}
}
}