use std::ffi::OsStr;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Mutex, OnceLock};
use std::thread::JoinHandle;
use std::time::Duration;
use chrono::{DateTime, FixedOffset, Local, Utc};
use sysinfo::{Pid, ProcessRefreshKind, RefreshKind, System};
use tracing::{debug, error, info, warn};
use tracing_appender::{non_blocking, rolling};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};
use super::config::*;
use super::constants::{DEFAULT_LOG_LEVEL_CONSOLE, DEFAULT_LOG_LEVEL_FILE};
use crate::error_printer::ErrorPrinter;
use crate::utils::ByteSize;
/// Join handle of the background log-directory cleanup thread, once one has been spawned.
static LOG_CLEANUP_HANDLE: Mutex<Option<JoinHandle<()>>> = Mutex::new(None);

/// Blocks until the background log-directory cleanup thread, if any, has finished.
///
/// The stored handle is taken out of `LOG_CLEANUP_HANDLE`, so later calls return immediately.
/// A poisoned lock or a panic inside the cleanup thread is ignored.
pub fn wait_for_log_directory_cleanup() {
    let Ok(mut slot) = LOG_CLEANUP_HANDLE.lock() else {
        return;
    };
    // `slot` stays locked until the join below completes.
    if let Some(worker) = slot.take() {
        let _ = worker.join();
    }
}
/// Installs the global `tracing` subscriber described by `cfg`.
///
/// The destination depends on `cfg.logging_mode`:
/// - `Directory`: a freshly named file inside the directory (see `log_file_in_dir`);
/// - `File`: the given path;
/// - `Console`: the console.
///
/// If file logging cannot be set up, console logging is installed instead and the failure is
/// emitted as an error event. In directory mode with `cfg.enable_log_dir_cleanup` set and the
/// directory already present, a background cleanup of that directory is started only after
/// the subscriber is installed, so the cleanup's own events are captured.
pub fn init(cfg: LoggingConfig) {
    // Directory to clean once logging is live; stays `None` unless cleanup applies.
    let mut cleanup_dir = None;

    let log_target: Option<PathBuf> = match &cfg.logging_mode {
        LoggingMode::Directory(dir) => {
            if cfg.enable_log_dir_cleanup && dir.exists() && dir.is_dir() {
                cleanup_dir = Some(dir);
            }
            Some(log_file_in_dir(&cfg.log_dir_config, dir))
        },
        LoggingMode::File(file) => Some(file.clone()),
        LoggingMode::Console => None,
    };

    match log_target {
        Some(log_file) => {
            if let Err(e) = init_logging_to_file(&log_file, cfg.use_json) {
                init_logging_to_console(&cfg);
                error!("Error logging to file {log_file:?} ({e}); falling back to console logging.");
            }
        },
        None => init_logging_to_console(&cfg),
    }

    info!("{}, xet-core revision {}", &cfg.version, git_version::git_version!(fallback = "unknown"));

    if let Some(dir) = cleanup_dir {
        run_log_directory_cleanup_background(cfg.log_dir_config.clone(), dir);
    }
}
/// Installs a global subscriber that writes formatted events to the console.
///
/// The filter comes from the default `EnvFilter` environment variable when it is set and valid,
/// otherwise from `DEFAULT_LOG_LEVEL_CONSOLE` (and the `EnvFilter` default if even that fails to
/// parse). Events are rendered as JSON when `cfg.use_json` is set, and pretty-printed otherwise,
/// with file and line information but without the target. With the `tokio-console` feature
/// enabled, a console-subscriber layer is added as well.
///
/// # Panics
///
/// Panics (via `init()`) if a global default subscriber is already installed.
fn init_logging_to_console(cfg: &LoggingConfig) {
    let registry = tracing_subscriber::registry();

    #[cfg(feature = "tokio-console")]
    let registry = registry.with(
        console_subscriber::spawn().with_filter(EnvFilter::new("tokio=trace,runtime=trace")),
    );

    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_target(false)
        .with_file(true)
        .with_line_number(true);

    let level_filter = match EnvFilter::try_from_default_env() {
        Ok(filter) => filter,
        Err(_) => EnvFilter::try_new(DEFAULT_LOG_LEVEL_CONSOLE).unwrap_or_default(),
    };

    if !cfg.use_json {
        registry.with(fmt_layer.pretty().with_filter(level_filter)).init();
    } else {
        registry.with(fmt_layer.json().with_filter(level_filter)).init();
    }
}
/// Installs a global subscriber that writes formatted events to the file at `path`.
///
/// If `path` has no final file-name component (for example `/` or `..`), it is treated as a
/// directory and `xet.log` inside it is used. Missing parent directories are created, and the
/// file is created (or truncated) before anything else happens. An unwritable destination is
/// therefore reported to the caller instead of being discovered after the subscriber is live.
///
/// Events go through a non-blocking appender that never rotates. Its worker guard is parked in
/// a process-wide `OnceLock` so the writer is not shut down when this function returns. The
/// filter comes from the default `EnvFilter` environment variable when set and valid, otherwise
/// from `DEFAULT_LOG_LEVEL_FILE`. Output is JSON when `use_json` is set, and pretty-printed
/// text otherwise.
///
/// # Errors
///
/// Returns any I/O error from creating the parent directory or the log file.
///
/// # Panics
///
/// Panics (via `init()`) if a global default subscriber is already installed.
fn init_logging_to_file(path: &Path, use_json: bool) -> Result<(), std::io::Error> {
    let (path, file_name) = if let Some(name) = path.file_name() {
        (path.to_path_buf(), name)
    } else {
        (path.join("xet.log"), OsStr::new("xet.log"))
    };

    let log_directory = if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
        parent
    } else {
        Path::new(".")
    };

    // Probe writability up front; `fs::write` creates the file or replaces its contents.
    std::fs::write(&path, [])?;

    let (writer, guard) = non_blocking(rolling::never(log_directory, file_name));

    // Dropping the guard would stop the background writer, so keep the first one forever.
    static FILE_GUARD: OnceLock<tracing_appender::non_blocking::WorkerGuard> = OnceLock::new();
    let _ = FILE_GUARD.set(guard);

    let registry = tracing_subscriber::registry();

    #[cfg(feature = "tokio-console")]
    let registry = registry.with(
        console_subscriber::spawn().with_filter(EnvFilter::new("tokio=trace,runtime=trace")),
    );

    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_writer(writer)
        .with_target(false)
        .with_file(true)
        .with_line_number(true);

    let level_filter = match EnvFilter::try_from_default_env() {
        Ok(filter) => filter,
        Err(_) => EnvFilter::try_new(DEFAULT_LOG_LEVEL_FILE).unwrap_or_default(),
    };

    if !use_json {
        registry.with(fmt_layer.pretty().with_filter(level_filter)).init();
    } else {
        registry.with(fmt_layer.json().with_filter(level_filter)).init();
    }
    Ok(())
}
/// Builds the path of a new log file for the current process inside `dir`.
///
/// The file name has the form `{prefix}_{timestamp}_{pid}.log`:
/// - `prefix` is `cfg.filename_prefix`;
/// - `timestamp` is the current local time formatted as `%Y%m%dT%H%M%S%3f%z`
///   (three fractional-second digits followed by the UTC offset);
/// - `pid` is this process's ID.
///
/// `parse_log_file_name` reverses this encoding. This function only computes the path; it does
/// not touch the filesystem.
pub fn log_file_in_dir(cfg: &LogDirConfig, dir: impl AsRef<Path>) -> PathBuf {
    let local_now = Local::now();
    // Pin the current local offset so `%z` renders it explicitly in the name.
    let stamped: DateTime<FixedOffset> = local_now.with_timezone(local_now.offset());
    let file_name = format!(
        "{}_{}_{}.log",
        cfg.filename_prefix,
        stamped.format("%Y%m%dT%H%M%S%3f%z"),
        std::process::id()
    );
    dir.as_ref().join(file_name)
}
/// Decodes a log file name of the form `{prefix}_{timestamp}_{pid}.log`.
///
/// Only the final path component of `path` is examined. The name is split from the right, so
/// `prefix` may itself contain underscores. `timestamp` must match `%Y%m%dT%H%M%S%3f%z`, and
/// `pid` must parse as a `u32`.
///
/// Returns `Some((prefix, timestamp, pid))` on success. Returns `None` if there is no UTF-8 file
/// name, the `.log` suffix is missing, there are fewer than three `_`-separated fields, or the
/// PID or timestamp fails to parse.
pub fn parse_log_file_name(path: impl AsRef<Path>) -> Option<(String, DateTime<FixedOffset>, u32)> {
    let stem = path.as_ref().file_name()?.to_str()?.strip_suffix(".log")?;

    let mut fields = stem.rsplitn(3, '_');
    let (pid_field, ts_field, prefix) = (fields.next()?, fields.next()?, fields.next()?);

    let pid = pid_field.parse::<u32>().ok()?;
    let timestamp = DateTime::parse_from_str(ts_field, "%Y%m%dT%H%M%S%3f%z").ok()?;
    Some((prefix.to_owned(), timestamp, pid))
}
/// A log file that the directory cleanup has found eligible for deletion.
struct CandidateLogFile {
    /// Time elapsed since the timestamp encoded in the file name.
    age: Duration,
    /// File size in bytes, as read from metadata during the scan.
    size: u64,
    /// Full path of the file.
    path: PathBuf,
}
/// Reports whether the process that wrote a log file might still be running.
///
/// Returns `true` only if `sys` knows a process with PID `pid` and that process started no
/// later than `log_timestamp`, compared in whole seconds since the Unix epoch. A live process
/// that started *after* the log was written cannot be its writer. Its PID is treated as reused,
/// and the log is not protected.
fn pid_protects_log_file(sys: &System, pid: u32, log_timestamp: DateTime<FixedOffset>) -> bool {
    match sys.process(Pid::from_u32(pid)) {
        None => false,
        Some(process) => {
            let log_epoch_secs = log_timestamp.timestamp();
            // A start time that does not fit in i64 is treated as later than any log timestamp.
            let proc_start_secs = i64::try_from(process.start_time()).unwrap_or(i64::MAX);
            let pid_reused = proc_start_secs > log_epoch_secs;
            if pid_reused {
                debug!("PID {pid} likely reused: process start {proc_start_secs}s > log timestamp {log_epoch_secs}s");
            }
            !pid_reused
        },
    }
}
fn run_log_directory_cleanup_background(cfg: LogDirConfig, log_dir: &Path) {
let log_dir = log_dir.to_path_buf();
let handle = std::thread::spawn(move || {
if let Err(e) = run_log_directory_cleanup(cfg, &log_dir) {
warn!("Error during log directory cleanup in {:?}: {}", log_dir, e);
}
});
if let Ok(mut handle_opt) = LOG_CLEANUP_HANDLE.lock() {
debug_assert!(handle_opt.is_none(), "Log directory cleanup called multiple times.");
*handle_opt = Some(handle);
}
}
/// Removes `path`, treating a file that has already disappeared as successfully removed.
///
/// Returns `Ok(true)` if this call deleted the file, `Ok(false)` if it no longer existed, and
/// `Err` for any other removal failure.
fn remove_log_file_if_present(path: &Path) -> io::Result<bool> {
    match std::fs::remove_file(path) {
        Ok(()) => Ok(true),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
        Err(e) => Err(e),
    }
}

/// Deletes stale log files written by this tool from `log_dir`.
///
/// A directory entry is considered only if it is a regular file whose name parses with
/// [`parse_log_file_name`] and whose prefix equals `cfg.filename_prefix`. All such files count
/// toward the directory's total size. A file becomes a deletion candidate when:
/// - it is at least `cfg.min_deletion_age` old, judged by the timestamp in its name; and
/// - [`pid_protects_log_file`] does not report its writer as still running.
///
/// Candidates are removed in two passes:
/// 1. every candidate older than `cfg.max_retention_age` is deleted;
/// 2. if the counted files still exceed `cfg.size_limit` bytes, the remaining candidates are
///    deleted oldest-first until the total fits or no candidates remain.
///
/// A file that vanishes before it can be removed is counted as deleted. Other per-file failures
/// are logged and skipped.
///
/// # Errors
///
/// Returns an error only if `log_dir` itself cannot be listed.
fn run_log_directory_cleanup(cfg: LogDirConfig, log_dir: &Path) -> io::Result<()> {
    info!(
        "starting log cleanup in {:?} (min_age={:?}, max_retention={:?}, max_size={} bytes)",
        log_dir,
        cfg.min_deletion_age,
        cfg.max_retention_age,
        ByteSize::new(cfg.size_limit)
    );

    let sys = System::new_with_specifics(RefreshKind::nothing().with_processes(ProcessRefreshKind::everything()));

    let mut candidates = Vec::<CandidateLogFile>::new();
    let mut total_bytes: u64 = 0;
    let mut candidate_deletion_bytes: u64 = 0;
    let now = Utc::now();
    let mut n_log_files = 0usize;

    for entry in std::fs::read_dir(log_dir)? {
        let Ok(entry) = entry.warn_error_fn(|| format!("read_dir error while reading {log_dir:?}")) else {
            continue;
        };
        let path = entry.path();
        let Ok(ft) = entry.file_type() else { continue };
        if !ft.is_file() {
            continue;
        }
        let Some((prefix, timestamp, pid)) = parse_log_file_name(&path) else {
            debug!("ignoring unparseable log file {:?}", path);
            continue;
        };
        if prefix != cfg.filename_prefix {
            debug!("ignoring log file {:?} with differing prefix {prefix} (!={})", path, &cfg.filename_prefix);
            continue;
        }
        let Ok(meta) = entry
            .metadata()
            .info_error_fn(|| format!("Reading metadata failed for {:?}", path))
        else {
            continue;
        };
        let size = meta.len();
        total_bytes += size;
        n_log_files += 1;
        // A timestamp in the future gives a negative delta, which `to_std` rejects.
        let Ok(age) = (now - timestamp.to_utc()).to_std() else {
            debug!("Skipping deletion for very new log file {path:?}");
            continue;
        };
        if age < cfg.min_deletion_age {
            debug!("Skipping deletion for new log file {path:?}");
            continue;
        }
        if pid_protects_log_file(&sys, pid, timestamp) {
            debug!("Skipping deletion for log file {path:?} with active associated PID.");
            continue;
        }
        candidates.push(CandidateLogFile { path, size, age });
        candidate_deletion_bytes += size;
    }

    info!(
        "Log Directory Cleanup: found {:?} of logs in {} log files, with {:?} in {} files eligible for deletion.",
        ByteSize::new(total_bytes),
        n_log_files,
        ByteSize::new(candidate_deletion_bytes),
        candidates.len()
    );

    let mut deleted_bytes: u64 = 0;
    // Files removed (or found already missing) across both passes. This feeds the summary.
    let mut n_deleted: usize = 0;

    // Pass 1: remove everything past the retention age. These entries are dropped from
    // `candidates` even when removal fails, so pass 2 never retries them.
    candidates.retain(|lf| {
        if lf.age <= cfg.max_retention_age {
            return true;
        }
        let path = &lf.path;
        match remove_log_file_if_present(path) {
            Ok(removed) => {
                deleted_bytes += lf.size;
                n_deleted += 1;
                if removed {
                    debug!("Log Directory Cleanup: Removed old log file {path:?}");
                } else {
                    debug!("Log Directory Cleanup: Old log file {path:?} already deleted.");
                }
            },
            Err(e) => {
                info!("Log Directory Cleanup: Error removing old log file {path:?}, skipping: {e}");
            },
        }
        false
    });

    // Pass 2: if still over budget, prune the remaining candidates oldest-first. Every deleted
    // file's size was also added to `total_bytes`, so `total_bytes - deleted_bytes` cannot
    // underflow.
    if total_bytes - deleted_bytes > cfg.size_limit {
        candidates.sort_by_key(|lf| std::cmp::Reverse(lf.age));
        for lf in &candidates {
            if total_bytes - deleted_bytes <= cfg.size_limit {
                break;
            }
            match remove_log_file_if_present(&lf.path) {
                Ok(removed) => {
                    deleted_bytes += lf.size;
                    n_deleted += 1;
                    if removed {
                        debug!("Log Directory cleanup: Pruned log file {:?}.", lf.path);
                    } else {
                        debug!("Log Directory cleanup: Log file {:?} already deleted, ignoring.", lf.path);
                    }
                },
                Err(e) => {
                    info!("Log Directory Cleanup: Error removing size-pruned log file {:?}: {}", lf.path, e);
                },
            }
        }
    }

    info!(
        "Log Directory Cleanup: deleted {:?} in {} files",
        ByteSize::new(deleted_bytes),
        n_deleted
    );
    Ok(())
}
#[cfg(test)]
mod tests {
    use chrono::{Datelike, Timelike};

    use super::*;

    /// A name produced by `log_file_in_dir` parses back to the same prefix, timestamp and PID.
    #[test]
    fn round_trip_make_and_parse() {
        let dir = Path::new("/tmp");
        let cfg = LogDirConfig::default();
        let path = log_file_in_dir(&cfg, dir);
        let (base, ts, pid) = parse_log_file_name(&path).expect("parse");
        assert_eq!(base, cfg.filename_prefix);
        assert!(pid > 0);
        let fname = path.file_name().unwrap().to_str().unwrap();
        let ts_part = fname
            .strip_prefix(&format!("{}_", base))
            .unwrap()
            .strip_suffix(&format!("_{}.log", pid))
            .unwrap();
        assert_eq!(ts_part, ts.format("%Y%m%dT%H%M%S%3f%z").to_string());
    }

    /// Every timestamp field, including milliseconds and a negative offset, is decoded.
    #[test]
    fn parse_known_file() {
        let fname = "app_base_20250915T083210123-0700_12345.log";
        let (base, ts, pid) = parse_log_file_name(fname).expect("parse");
        assert_eq!(base, "app_base");
        assert_eq!(pid, 12345);
        assert_eq!(ts.format("%Y%m%dT%H%M%S%3f%z").to_string(), "20250915T083210123-0700");
        assert_eq!(ts.year(), 2025);
        assert_eq!(ts.month(), 9);
        assert_eq!(ts.day(), 15);
        assert_eq!(ts.hour(), 8);
        assert_eq!(ts.minute(), 32);
        assert_eq!(ts.second(), 10);
        assert_eq!(ts.timestamp_subsec_millis(), 123);
        assert_eq!(ts.offset().local_minus_utc(), -7 * 3600);
    }

    /// Underscores inside the prefix survive the right-to-left split.
    #[test]
    fn allows_underscores_in_base() {
        let fname = "my_cool_app_20240102T030405006+0530_999.log";
        let (base, ts, pid) = parse_log_file_name(fname).expect("parse");
        assert_eq!(base, "my_cool_app");
        assert_eq!(pid, 999);
        assert_eq!(ts.format("%Y%m%dT%H%M%S%3f%z").to_string(), "20240102T030405006+0530");
    }

    /// Only the final path component is parsed; leading directories are ignored.
    #[test]
    fn parse_with_directory_path() {
        let path = Path::new("/var/log/myprog/app_20250915T083210123-0700_12345.log");
        let (base, _, pid) = parse_log_file_name(path).expect("parse");
        assert_eq!(base, "app");
        assert_eq!(pid, 12345);
    }

    /// Names that do not follow `{prefix}_{timestamp}_{pid}.log` are rejected, not misparsed.
    #[test]
    fn rejects_malformed_names() {
        // Wrong extension.
        assert!(parse_log_file_name("app_20250915T083210123-0700_12345.txt").is_none());
        // PID is not an unsigned integer.
        assert!(parse_log_file_name("app_20250915T083210123-0700_abc.log").is_none());
        assert!(parse_log_file_name("app_20250915T083210123-0700_-1.log").is_none());
        // No prefix field before the timestamp.
        assert!(parse_log_file_name("20250915T083210123-0700_12345.log").is_none());
        // Timestamp missing the milliseconds and UTC offset.
        assert!(parse_log_file_name("app_20250915T083210_12345.log").is_none());
        // Path with no file-name component at all.
        assert!(parse_log_file_name("/").is_none());
    }
}