1use std::ffi::OsStr;
2use std::io;
3use std::path::{Path, PathBuf};
4use std::sync::{Mutex, OnceLock};
5use std::thread::JoinHandle;
6use std::time::Duration;
7
8use chrono::{DateTime, FixedOffset, Local, Utc};
9use sysinfo::{Pid, ProcessRefreshKind, RefreshKind, System};
10use tracing::{debug, error, info, warn};
11use tracing_appender::{non_blocking, rolling};
12use tracing_subscriber::layer::SubscriberExt;
13use tracing_subscriber::util::SubscriberInitExt;
14use tracing_subscriber::{EnvFilter, Layer};
15
16use super::config::*;
17use super::constants::{DEFAULT_LOG_LEVEL_CONSOLE, DEFAULT_LOG_LEVEL_FILE};
18use crate::error_printer::ErrorPrinter;
19use crate::utils::ByteSize;
20
/// Join handle of the background log-directory cleanup thread, if one has been started and not
/// yet waited for.
static LOG_CLEANUP_HANDLE: Mutex<Option<JoinHandle<()>>> = Mutex::new(None);

/// Blocks until the background log-directory cleanup started by `init` (if any) has finished.
///
/// Returns immediately if no cleanup was started or it has already been waited for. A panic on
/// the cleanup thread is swallowed. A poisoned lock is recovered from, so the pending cleanup
/// thread is still joined even if another thread panicked while holding the lock.
pub fn wait_for_log_directory_cleanup() {
    // Take the handle in its own statement so the guard (a temporary) is dropped before joining;
    // holding the lock across `join` would block any other access to the slot for the whole
    // cleanup.
    let handle = LOG_CLEANUP_HANDLE
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .take();

    if let Some(handle) = handle {
        // Cleanup is best-effort; a panicked cleanup thread must not take the caller down too.
        let _ = handle.join();
    }
}
33
34pub fn init(cfg: LoggingConfig) {
36 let mut dir_cleanup_task = None;
37
38 let maybe_log_file: Option<PathBuf> = {
39 match &cfg.logging_mode {
40 LoggingMode::Directory(log_dir) => {
41 if cfg.enable_log_dir_cleanup && log_dir.exists() && log_dir.is_dir() {
42 dir_cleanup_task =
43 Some(|| run_log_directory_cleanup_background(cfg.log_dir_config.clone(), log_dir));
44 }
45
46 Some(log_file_in_dir(&cfg.log_dir_config, log_dir))
47 },
48 LoggingMode::File(path_buf) => Some(path_buf.clone()),
49 LoggingMode::Console => None,
50 }
51 };
52
53 if let Some(log_file) = maybe_log_file {
55 if let Err(e) = init_logging_to_file(&log_file, cfg.use_json) {
57 init_logging_to_console(&cfg);
58 error!("Error logging to file {log_file:?} ({e}); falling back to console logging.");
59 }
60 } else {
61 init_logging_to_console(&cfg);
62 }
63
64 info!("{}, xet-core revision {}", &cfg.version, git_version::git_version!(fallback = "unknown"));
66
67 if let Some(dir_cleanup_task_fn) = dir_cleanup_task {
68 dir_cleanup_task_fn();
69 }
70}
71
72fn init_logging_to_console(cfg: &LoggingConfig) {
73 let registry = tracing_subscriber::registry();
75
76 #[cfg(feature = "tokio-console")]
77 let registry = {
78 let console_layer = console_subscriber::spawn().with_filter(EnvFilter::new("tokio=trace,runtime=trace"));
80 registry.with(console_layer)
81 };
82
83 let fmt_layer_base = tracing_subscriber::fmt::layer()
84 .with_line_number(true)
85 .with_file(true)
86 .with_target(false);
87 let fmt_filter = EnvFilter::try_from_default_env()
88 .or_else(|_| EnvFilter::try_new(DEFAULT_LOG_LEVEL_CONSOLE))
89 .unwrap_or_default();
90
91 if cfg.use_json {
92 let filtered_fmt_layer = fmt_layer_base.json().with_filter(fmt_filter);
93 registry.with(filtered_fmt_layer).init();
94 } else {
95 let filtered_fmt_layer = fmt_layer_base.pretty().with_filter(fmt_filter);
96 registry.with(filtered_fmt_layer).init();
97 }
98}
99
100fn init_logging_to_file(path: &Path, use_json: bool) -> Result<(), std::io::Error> {
101 let (path, file_name) = match path.file_name() {
103 Some(name) => (path.to_path_buf(), name),
104 None => (path.join("xet.log"), OsStr::new("xet.log")),
105 };
106
107 let log_directory = match path.parent() {
108 Some(parent) => {
109 std::fs::create_dir_all(parent)?;
110 parent
111 },
112 None => Path::new("."),
113 };
114
115 std::fs::write(&path, [])?;
117
118 let file_appender = rolling::never(log_directory, file_name);
121
122 let (writer, guard) = non_blocking(file_appender);
123
124 static FILE_GUARD: OnceLock<tracing_appender::non_blocking::WorkerGuard> = OnceLock::new();
126 let _ = FILE_GUARD.set(guard); let registry = tracing_subscriber::registry();
129
130 #[cfg(feature = "tokio-console")]
131 let registry = {
132 let console_layer = console_subscriber::spawn().with_filter(EnvFilter::new("tokio=trace,runtime=trace"));
134 registry.with(console_layer)
135 };
136
137 let fmt_layer_base = tracing_subscriber::fmt::layer()
139 .with_line_number(true)
140 .with_file(true)
141 .with_target(false)
142 .with_writer(writer);
143 let fmt_filter = EnvFilter::try_from_default_env()
145 .or_else(|_| EnvFilter::try_new(DEFAULT_LOG_LEVEL_FILE))
146 .unwrap_or_default();
147
148 if use_json {
149 registry.with(fmt_layer_base.json().with_filter(fmt_filter)).init();
150 } else {
151 registry.with(fmt_layer_base.pretty().with_filter(fmt_filter)).init();
152 };
153
154 Ok(())
155}
156
157pub fn log_file_in_dir(cfg: &LogDirConfig, dir: impl AsRef<Path>) -> PathBuf {
160 let now_local: DateTime<Local> = Local::now();
161 let now_fixed: DateTime<FixedOffset> = now_local.with_timezone(now_local.offset());
162
163 let ts = now_fixed.format("%Y%m%dT%H%M%S%3f%z"); let pid = std::process::id();
167 let prefix = &cfg.filename_prefix;
168 let filename = format!("{}_{}_{}.log", prefix, ts, pid);
169 dir.as_ref().join(filename)
170}
171
172pub fn parse_log_file_name(path: impl AsRef<Path>) -> Option<(String, DateTime<FixedOffset>, u32)> {
176 let path = path.as_ref();
177 let file_name = path.file_name()?.to_str()?;
178
179 let file_name = file_name.strip_suffix(".log")?;
181
182 let mut parts = file_name.rsplitn(3, '_');
185 let pid_str = parts.next()?;
186 let ts_str = parts.next()?;
187 let prefix = parts.next()?; let pid: u32 = pid_str.parse().ok()?;
190
191 let ts = DateTime::parse_from_str(ts_str, "%Y%m%dT%H%M%S%3f%z").ok()?;
193
194 Some((prefix.to_string(), ts, pid))
195}
196
/// A log file that passed the minimum-age and live-PID checks during a directory scan and may
/// therefore be deleted.
#[derive(Debug)]
struct CandidateLogFile {
    /// Location of the log file.
    path: PathBuf,
    /// File size in bytes, as reported by its metadata at scan time.
    size: u64,
    /// Time elapsed since the timestamp embedded in the file name.
    age: Duration,
}
203
204fn pid_protects_log_file(sys: &System, pid: u32, log_timestamp: DateTime<FixedOffset>) -> bool {
212 let Some(proc) = sys.process(Pid::from_u32(pid)) else {
213 return false;
214 };
215 let log_epoch_secs = log_timestamp.timestamp();
216 let proc_start_secs = i64::try_from(proc.start_time()).unwrap_or(i64::MAX);
217 if proc_start_secs > log_epoch_secs {
218 debug!("PID {pid} likely reused: process start {proc_start_secs}s > log timestamp {log_epoch_secs}s");
219 return false;
220 }
221 true
222}
223
224fn run_log_directory_cleanup_background(cfg: LogDirConfig, log_dir: &Path) {
225 let log_dir = log_dir.to_path_buf();
227 let handle = std::thread::spawn(move || {
228 if let Err(e) = run_log_directory_cleanup(cfg, &log_dir) {
229 warn!("Error during log directory cleanup in {:?}: {}", log_dir, e);
230 }
231 });
232
233 if let Ok(mut handle_opt) = LOG_CLEANUP_HANDLE.lock() {
235 debug_assert!(handle_opt.is_none(), "Log directory cleanup called multiple times.");
236 *handle_opt = Some(handle);
237 }
238}
239
240fn run_log_directory_cleanup(cfg: LogDirConfig, log_dir: &Path) -> io::Result<()> {
241 info!(
242 "starting log cleanup in {:?} (min_age={:?}, max_retention={:?}, max_size={} bytes)",
243 log_dir,
244 cfg.min_deletion_age,
245 cfg.max_retention_age,
246 ByteSize::new(cfg.size_limit)
247 );
248
249 let sys = System::new_with_specifics(RefreshKind::nothing().with_processes(ProcessRefreshKind::everything()));
253
254 let mut candidates = Vec::<CandidateLogFile>::new();
256 let mut total_bytes: u64 = 0;
257 let mut candidate_deletion_bytes: u64 = 0;
258
259 let now = Utc::now();
260 let mut n_log_files = 0usize;
261
262 for entry in std::fs::read_dir(log_dir)? {
263 let Ok(entry) = entry.warn_error_fn(|| format!("read_dir error while reading {log_dir:?}")) else {
264 continue;
265 };
266
267 let path = entry.path();
268
269 let Ok(ft) = entry.file_type() else { continue };
270 if !ft.is_file() {
271 continue;
272 }
273
274 let Some((prefix, timestamp, pid)) = parse_log_file_name(&path) else {
275 debug!("ignoring unparseable log file {:?}", path);
276 continue;
277 };
278
279 if prefix != cfg.filename_prefix {
280 debug!("ignoring log file {:?} with differing prefix {prefix} (!={})", path, &cfg.filename_prefix);
281 continue;
282 }
283
284 let Ok(meta) = entry
286 .metadata()
287 .info_error_fn(|| format!("Reading metadata failed for {:?}", path))
288 else {
289 continue;
290 };
291
292 let size = meta.len();
293 total_bytes += size;
294 n_log_files += 1;
295
296 let Ok(age) = (now - timestamp.to_utc()).to_std() else {
297 debug!("Skipping deletion for very new log file {path:?}");
298 continue;
299 };
300
301 if age < cfg.min_deletion_age {
303 debug!("Skipping deletion for new log file {path:?}");
304 continue;
305 }
306
307 if pid_protects_log_file(&sys, pid, timestamp) {
309 debug!("Skipping deletion for log file {path:?} with active associated PID.");
310 continue;
311 }
312
313 candidates.push(CandidateLogFile { path, size, age });
315
316 candidate_deletion_bytes += size;
317 }
318
319 info!(
320 "Log Directory Cleanup: found {:?} of logs in {} log files, with {:?} in {} files eligible for deletion.",
321 ByteSize::new(total_bytes),
322 n_log_files,
323 ByteSize::new(candidate_deletion_bytes),
324 candidates.len()
325 );
326
327 let mut deleted_bytes: u64 = 0;
329 candidates.retain(|lf| {
330 if lf.age > cfg.max_retention_age {
331 let path = &lf.path;
332 match std::fs::remove_file(path) {
333 Ok(_) => {
334 deleted_bytes += lf.size;
335 debug!("Log Directory Cleanup: Removed old log file {path:?})");
336 },
337 Err(e) => {
338 if e.kind() == io::ErrorKind::NotFound {
341 deleted_bytes += lf.size;
342 debug!("Log Directory Cleanup: Old log file {path:?} already deleted.");
343 } else {
344 info!("Log Directory Cleanup: Error removing old log file {path:?}, skipping: {e}");
345 }
346 },
347 };
348 false
349 } else {
350 true
351 }
352 });
353
354 let mut n_pruned = 0;
356 if total_bytes - deleted_bytes > cfg.size_limit {
357 candidates.sort_by_key(|lf| std::cmp::Reverse(lf.age));
359 for lf in &candidates {
360 if total_bytes - deleted_bytes <= cfg.size_limit {
361 break;
362 }
363
364 match std::fs::remove_file(&lf.path) {
365 Ok(()) => {
366 deleted_bytes += lf.size;
367 n_pruned += 1;
368 debug!("Log Directory cleanup: Pruned log file {:?}.", lf.path);
369 },
370 Err(e) => {
371 if e.kind() == io::ErrorKind::NotFound {
372 deleted_bytes += lf.size;
373 n_pruned += 1;
374 debug!("Log Directory cleanup: Log file {:?} already deleted, ignoring.", lf.path);
375 } else {
376 info!("Log Directory Cleanup: Error removing size-pruned log file {:?}: {}", lf.path, e);
377 }
378 },
379 }
380 }
381 }
382
383 info!(
384 "Log Directory Cleanup: deleted {:?} in {} files",
385 ByteSize::new(deleted_bytes),
386 candidates.len() - n_pruned
387 );
388 Ok(())
389}
390
#[cfg(test)]
mod tests {

    use chrono::{Datelike, Timelike};

    use super::*;

    #[test]
    fn round_trip_make_and_parse() {
        let dir = Path::new("/tmp");
        let cfg = LogDirConfig::default();
        let path = log_file_in_dir(&cfg, dir);
        let (base, ts, pid) = parse_log_file_name(&path).expect("parse");
        assert_eq!(base, cfg.filename_prefix);
        assert!(pid > 0);

        // The timestamp must round-trip exactly through the file name.
        let fname = path.file_name().unwrap().to_str().unwrap();
        let ts_part = fname
            .strip_prefix(&format!("{}_", base))
            .unwrap()
            .strip_suffix(&format!("_{}.log", pid))
            .unwrap();
        assert_eq!(ts_part, ts.format("%Y%m%dT%H%M%S%3f%z").to_string());
    }

    #[test]
    fn log_file_is_created_in_given_dir() {
        let dir = Path::new("/tmp/some/dir");
        let path = log_file_in_dir(&LogDirConfig::default(), dir);
        assert_eq!(path.parent(), Some(dir));
        assert_eq!(path.extension().and_then(|e| e.to_str()), Some("log"));
    }

    #[test]
    fn parse_known_file() {
        let fname = "app_base_20250915T083210123-0700_12345.log";
        let (base, ts, pid) = parse_log_file_name(fname).expect("parse");
        assert_eq!(base, "app_base");
        assert_eq!(pid, 12345);
        assert_eq!(ts.format("%Y%m%dT%H%M%S%3f%z").to_string(), "20250915T083210123-0700");
        assert_eq!(ts.year(), 2025);
        assert_eq!(ts.month(), 9);
        assert_eq!(ts.day(), 15);
        assert_eq!(ts.hour(), 8);
        assert_eq!(ts.minute(), 32);
        assert_eq!(ts.second(), 10);
        assert_eq!(ts.timestamp_subsec_millis(), 123);
        assert_eq!(ts.offset().local_minus_utc(), -7 * 3600);
    }

    #[test]
    fn allows_underscores_in_base() {
        let fname = "my_cool_app_20240102T030405006+0530_999.log";
        let (base, ts, pid) = parse_log_file_name(fname).expect("parse");
        assert_eq!(base, "my_cool_app");
        assert_eq!(pid, 999);
        assert_eq!(ts.format("%Y%m%dT%H%M%S%3f%z").to_string(), "20240102T030405006+0530");
    }

    #[test]
    fn parse_with_directory_path() {
        let path = Path::new("/var/log/myprog/app_20250915T083210123-0700_12345.log");
        let (base, _, pid) = parse_log_file_name(path).expect("parse");
        assert_eq!(base, "app");
        assert_eq!(pid, 12345);
    }

    #[test]
    fn rejects_malformed_names() {
        for name in [
            "app_20250915T083210123-0700_12345.txt", // wrong extension
            "app_20250915T083210123-0700_12345",     // no extension
            "20250915T083210123-0700_12345.log",     // missing prefix field
            "app_20250915T083210123-0700_pid.log",   // non-numeric pid
            "app_20250915T083210123-0700_-1.log",    // negative pid
            "app_notatimestamp_12345.log",           // unparseable timestamp
            "app_20250915T083210-0700_12345.log",    // missing milliseconds
        ] {
            assert!(parse_log_file_name(name).is_none(), "{name} should not parse");
        }
    }
}
450}