use std::borrow::Cow;
use std::io::stderr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use chrono::Utc;
use serde::Serialize;
use serde_json::Value;
use crate::constants::{DEFAULT_BUFFER_FULL_LAST_WARN_MS, DEFAULT_TIMESTAMP_KEY};
use crate::logger::LoggerContext;
use crate::utils::{FormatState, write_log_line};
use crate::{
colors::ColorSettings,
constants::{
DEFAULT_BATCH_DURATION_MS, DEFAULT_BATCH_SIZE, DEFAULT_BUFFER_SIZE,
DEFAULT_TIMESTAMP_FORMAT,
},
};
use super::{levels::LogLevel, options::LoggerOptions};
/// One log record, built by the logger front-end and then either queued on
/// the log channel or written directly to stderr.
///
/// The derived `Serialize` impl emits `log_level`, `data` and, when present,
/// `message`. `timestamp` and `context` are marked `#[serde(skip)]`, so any
/// code that wants them in the output has to render them separately.
#[derive(Serialize)]
pub struct LogObject {
    /// Severity attached to this record.
    pub(crate) log_level: LogLevel,
    /// Caller-supplied payload, already converted to a JSON value.
    pub(crate) data: Value,
    /// UTC time stamp of the record; excluded from the derived serialization.
    #[serde(skip)]
    pub(crate) timestamp: chrono::DateTime<Utc>,
    /// Optional free-text message; the key is omitted entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) message: Option<String>,
    /// Shared logger context; excluded from the derived serialization.
    #[serde(skip)]
    pub(crate) context: Arc<LoggerContext>,
}
/// Owns the two resources needed to stop the background worker: the sending
/// half of the shutdown channel and the worker's join handle.
///
/// Both live in `Mutex<Option<_>>` so they can be taken out through a shared
/// reference, leaving `None` behind.
pub struct ShutdownHandle {
    /// Sending half of the shutdown channel; `None` once it has been taken.
    shutdown_sender: Mutex<Option<crossbeam_channel::Sender<()>>>,
    /// Join handle of the worker thread; `None` once it has been taken.
    worker_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
}
impl ShutdownHandle {
    /// Builds a handle from the shutdown channel's sender and the worker's
    /// join handle, storing each in its own `Mutex<Option<_>>` slot.
    pub(crate) const fn new(
        shutdown_sender: crossbeam_channel::Sender<()>,
        worker_thread: std::thread::JoinHandle<()>,
    ) -> Self {
        Self {
            worker_thread: Mutex::new(Some(worker_thread)),
            shutdown_sender: Mutex::new(Some(shutdown_sender)),
        }
    }

    /// Drops the shutdown sender, then joins the worker thread.
    ///
    /// Each step is skipped if its mutex is poisoned. Both slots are left
    /// as `None`, so a second call finds nothing to drop or join. The result
    /// of `join` is ignored, so a panic in the worker is not propagated.
    pub(crate) fn shutdown(&self) {
        // Step 1: take the sender out of its slot and drop it right away.
        // NOTE(review): presumably the worker treats the resulting channel
        // disconnect as its stop signal -- the receiving side is not visible here.
        if let Ok(mut sender_slot) = self.shutdown_sender.lock() {
            drop(sender_slot.take());
        }

        // Step 2: wait for the worker. The guard stays alive for the whole
        // `join`, so a concurrent caller blocks on the lock until it completes.
        match self.worker_thread.lock() {
            Ok(mut thread_slot) => {
                if let Some(worker) = thread_slot.take() {
                    let _ = worker.join();
                }
            }
            Err(_) => {}
        }
    }
}
impl Drop for ShutdownHandle {
fn drop(&mut self) {
self.shutdown();
}
}
/// Front-end used by callers to emit log records.
///
/// Records are pushed onto `log_sender` without blocking; when that fails
/// they are written straight to stderr instead.
pub struct Logger {
    /// Sending half of the channel that carries [`LogObject`] records.
    pub(crate) log_sender: crossbeam_channel::Sender<LogObject>,
    /// Lowest level that passes the logger's level filter.
    pub(crate) min_level: LogLevel,
    /// Shared handle that owns the worker's shutdown sender and join handle.
    pub(crate) shutdown_handle: Arc<ShutdownHandle>,
    /// Context attached (by `Arc` clone) to every record this logger creates.
    pub(crate) context: Arc<LoggerContext>,
    /// Formatting state handed to `write_log_line` on the stderr fallback path.
    pub(crate) format_state: Arc<FormatState>,
    /// Unix time in milliseconds of the last "buffer full" warning; used to
    /// rate-limit that warning.
    pub(crate) buffer_full_last_warn_ms: AtomicU64,
}
impl Logger {
#[must_use]
pub fn init() -> LoggerOptions {
LoggerOptions {
buffer_size: DEFAULT_BUFFER_SIZE,
batch_size: DEFAULT_BATCH_SIZE,
batch_duration_ms: DEFAULT_BATCH_DURATION_MS,
min_level: LogLevel::Debug,
timestamp_format: DEFAULT_TIMESTAMP_FORMAT.to_string(),
timestamp_key: DEFAULT_TIMESTAMP_KEY.to_string(),
color_settings: ColorSettings::default(),
context: LoggerContext::new(),
pretty: false,
}
}
fn log<T: Serialize>(&self, message: Option<Cow<'static, str>>, data: &T, log_level: LogLevel) {
let value = match serde_json::to_value(data) {
Ok(v) => v,
Err(e) => {
eprintln!("Failed to serialize {e}");
return;
}
};
let log_object = LogObject {
log_level,
data: value,
message: message.map(|m| m.into_owned()),
timestamp: Utc::now(),
context: Arc::clone(&self.context),
};
if let Err(err) = self.log_sender.try_send(log_object) {
let mut stderr = stderr().lock();
match err {
crossbeam_channel::TrySendError::Full(log) => {
let now = chrono::Utc::now().timestamp_millis() as u64;
let last = self.buffer_full_last_warn_ms.load(Ordering::Relaxed);
if now.saturating_sub(last) >= DEFAULT_BUFFER_FULL_LAST_WARN_MS
&& self
.buffer_full_last_warn_ms
.compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
let warning = LogObject {
message: None,
log_level: LogLevel::Warn,
data: serde_json::to_value("Logger buffer full - consider increasing the buffer_size! This log bypassed batching.").unwrap(),
timestamp: Utc::now(),
context: Arc::clone(&self.context),
};
write_log_line(&mut stderr, &warning, &self.format_state).ok();
}
let inline = LogObject {
log_level: log.log_level,
data: log.data,
message: log.message,
timestamp: Utc::now(),
context: Arc::clone(&self.context),
};
write_log_line(&mut stderr, &inline, &self.format_state).ok();
}
crossbeam_channel::TrySendError::Disconnected(log) => {
let inline = LogObject {
log_level: log.log_level,
data: log.data,
message: log.message,
timestamp: Utc::now(),
context: Arc::clone(&self.context),
};
write_log_line(&mut stderr, &inline, &self.format_state).ok();
}
}
}
}
pub fn info<T: Serialize>(&self, data: &T) {
self.log(None, data, LogLevel::Info);
}
pub fn error<T: Serialize>(&self, data: &T) {
self.log(None, data, LogLevel::Error);
}
pub fn warn<T: Serialize>(&self, data: &T) {
self.log(None, data, LogLevel::Warn);
}
pub fn debug<T: Serialize>(&self, data: &T) {
self.log(None, data, LogLevel::Debug);
}
pub fn __log_with_message<T: Serialize>(
&self,
message: Option<Cow<'static, str>>,
data: &T,
level: LogLevel,
) {
if level < self.min_level {
return;
}
self.log(message, data, level);
}
}