Skip to main content

xet_runtime/logging/
init.rs

1use std::ffi::OsStr;
2use std::io;
3use std::path::{Path, PathBuf};
4use std::sync::{Mutex, OnceLock};
5use std::thread::JoinHandle;
6use std::time::Duration;
7
8use chrono::{DateTime, FixedOffset, Local, Utc};
9use sysinfo::{Pid, ProcessRefreshKind, RefreshKind, System};
10use tracing::{debug, error, info, warn};
11use tracing_appender::{non_blocking, rolling};
12use tracing_subscriber::layer::SubscriberExt;
13use tracing_subscriber::util::SubscriberInitExt;
14use tracing_subscriber::{EnvFilter, Layer};
15
16use super::config::*;
17use super::constants::{DEFAULT_LOG_LEVEL_CONSOLE, DEFAULT_LOG_LEVEL_FILE};
18use crate::error_printer::ErrorPrinter;
19use crate::utils::ByteSize;
20
/// Holds the `JoinHandle` of the background log-directory cleanup thread until it is joined.
static LOG_CLEANUP_HANDLE: Mutex<Option<JoinHandle<()>>> = Mutex::new(None);

/// Wait for the log directory cleanup to complete.
///
/// Blocks until the background cleanup thread (if one is pending) finishes. When no cleanup
/// thread is pending — including on a second call — this returns immediately.
pub fn wait_for_log_directory_cleanup() {
    // Take the handle out in a single statement so the mutex guard is dropped *before* joining;
    // the lock is not held for the duration of the cleanup. A poisoned lock still holds a valid
    // `Option`, so recover it instead of silently skipping the join.
    let pending = LOG_CLEANUP_HANDLE
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .take();

    if let Some(handle) = pending {
        // Cleanup is best-effort: a panic inside the cleanup thread is not propagated.
        let _ = handle.join();
    }
}
33
34/// The main entry point to set up logging.  Should only be called once.
35pub fn init(cfg: LoggingConfig) {
36    let mut dir_cleanup_task = None;
37
38    let maybe_log_file: Option<PathBuf> = {
39        match &cfg.logging_mode {
40            LoggingMode::Directory(log_dir) => {
41                if cfg.enable_log_dir_cleanup && log_dir.exists() && log_dir.is_dir() {
42                    dir_cleanup_task =
43                        Some(|| run_log_directory_cleanup_background(cfg.log_dir_config.clone(), log_dir));
44                }
45
46                Some(log_file_in_dir(&cfg.log_dir_config, log_dir))
47            },
48            LoggingMode::File(path_buf) => Some(path_buf.clone()),
49            LoggingMode::Console => None,
50        }
51    };
52
53    // Set up either logging to console or to a log file.
54    if let Some(log_file) = maybe_log_file {
55        // Attempt logging to a file, but fallback to console logging on error.
56        if let Err(e) = init_logging_to_file(&log_file, cfg.use_json) {
57            init_logging_to_console(&cfg);
58            error!("Error logging to file {log_file:?} ({e}); falling back to console logging.");
59        }
60    } else {
61        init_logging_to_console(&cfg);
62    }
63
64    // Log the version information.
65    info!("{}, xet-core revision {}", &cfg.version, git_version::git_version!(fallback = "unknown"));
66
67    if let Some(dir_cleanup_task_fn) = dir_cleanup_task {
68        dir_cleanup_task_fn();
69    }
70}
71
72fn init_logging_to_console(cfg: &LoggingConfig) {
73    // Now, just use basic console logging.
74    let registry = tracing_subscriber::registry();
75
76    #[cfg(feature = "tokio-console")]
77    let registry = {
78        // Console subscriber layer for tokio-console, custom filter for tokio trace level events
79        let console_layer = console_subscriber::spawn().with_filter(EnvFilter::new("tokio=trace,runtime=trace"));
80        registry.with(console_layer)
81    };
82
83    let fmt_layer_base = tracing_subscriber::fmt::layer()
84        .with_line_number(true)
85        .with_file(true)
86        .with_target(false);
87    let fmt_filter = EnvFilter::try_from_default_env()
88        .or_else(|_| EnvFilter::try_new(DEFAULT_LOG_LEVEL_CONSOLE))
89        .unwrap_or_default();
90
91    if cfg.use_json {
92        let filtered_fmt_layer = fmt_layer_base.json().with_filter(fmt_filter);
93        registry.with(filtered_fmt_layer).init();
94    } else {
95        let filtered_fmt_layer = fmt_layer_base.pretty().with_filter(fmt_filter);
96        registry.with(filtered_fmt_layer).init();
97    }
98}
99
100fn init_logging_to_file(path: &Path, use_json: bool) -> Result<(), std::io::Error> {
101    // Set up logging to a file.
102    let (path, file_name) = match path.file_name() {
103        Some(name) => (path.to_path_buf(), name),
104        None => (path.join("xet.log"), OsStr::new("xet.log")),
105    };
106
107    let log_directory = match path.parent() {
108        Some(parent) => {
109            std::fs::create_dir_all(parent)?;
110            parent
111        },
112        None => Path::new("."),
113    };
114
115    // Make sure the log location is writeable so we error early here and dump to stderr on failure.
116    std::fs::write(&path, [])?;
117
118    // Build a non‑blocking file appender. • `rolling::never` = one static file, no rotation. • Keep the
119    // `WorkerGuard` alive so the background thread doesn’t shut down and drop messages.
120    let file_appender = rolling::never(log_directory, file_name);
121
122    let (writer, guard) = non_blocking(file_appender);
123
124    // Store the guard globally so it isn’t dropped.
125    static FILE_GUARD: OnceLock<tracing_appender::non_blocking::WorkerGuard> = OnceLock::new();
126    let _ = FILE_GUARD.set(guard); // ignore error if already initialised
127
128    let registry = tracing_subscriber::registry();
129
130    #[cfg(feature = "tokio-console")]
131    let registry = {
132        // Console subscriber layer for tokio-console, custom filter for tokio trace level events
133        let console_layer = console_subscriber::spawn().with_filter(EnvFilter::new("tokio=trace,runtime=trace"));
134        registry.with(console_layer)
135    };
136
137    // Build the fmt layer.
138    let fmt_layer_base = tracing_subscriber::fmt::layer()
139        .with_line_number(true)
140        .with_file(true)
141        .with_target(false)
142        .with_writer(writer);
143    // Standard filter layer: RUST_LOG env var or DEFAULT_LOG_LEVEL fallback.
144    let fmt_filter = EnvFilter::try_from_default_env()
145        .or_else(|_| EnvFilter::try_new(DEFAULT_LOG_LEVEL_FILE))
146        .unwrap_or_default();
147
148    if use_json {
149        registry.with(fmt_layer_base.json().with_filter(fmt_filter)).init();
150    } else {
151        registry.with(fmt_layer_base.pretty().with_filter(fmt_filter)).init();
152    };
153
154    Ok(())
155}
156
157/// Build `<prefix>_<YYYYMMDD>T<HHMMSS><mmm><+/-HHMM>_<pid>.log` in `dir`.
158/// Timestamp is in *local time with numeric offset* (e.g., -0700), filename-safe.
159pub fn log_file_in_dir(cfg: &LogDirConfig, dir: impl AsRef<Path>) -> PathBuf {
160    let now_local: DateTime<Local> = Local::now();
161    let now_fixed: DateTime<FixedOffset> = now_local.with_timezone(now_local.offset());
162
163    // ISO 8601 basic, filename-safe (no colons): 20250915T083210123-0700
164    let ts = now_fixed.format("%Y%m%dT%H%M%S%3f%z"); // %z => ±HHMM
165
166    let pid = std::process::id();
167    let prefix = &cfg.filename_prefix;
168    let filename = format!("{}_{}_{}.log", prefix, ts, pid);
169    dir.as_ref().join(filename)
170}
171
172/// Parse `<prefix>_<YYYYMMDD>T<HHMMSS><mmm><+/-HHMM>_<pid>.log`
173/// Works with full paths or bare filenames.
174/// Returns (prefix, timestamp with fixed offset, pid).
175pub fn parse_log_file_name(path: impl AsRef<Path>) -> Option<(String, DateTime<FixedOffset>, u32)> {
176    let path = path.as_ref();
177    let file_name = path.file_name()?.to_str()?;
178
179    // Returns None if it doesn't end with .log.
180    let file_name = file_name.strip_suffix(".log")?;
181
182    // Split from the RIGHT so base may contain underscores.
183    // Expect exactly: <base>_<timestamp>_<pid>
184    let mut parts = file_name.rsplitn(3, '_');
185    let pid_str = parts.next()?;
186    let ts_str = parts.next()?;
187    let prefix = parts.next()?; // remainder is the full base (may include underscores)
188
189    let pid: u32 = pid_str.parse().ok()?;
190
191    // Parse ISO 8601-basic with offset, no colons
192    let ts = DateTime::parse_from_str(ts_str, "%Y%m%dT%H%M%S%3f%z").ok()?;
193
194    Some((prefix.to_string(), ts, pid))
195}
196
/// A log file found during directory cleanup that passed every protection check and may be
/// deleted.
struct CandidateLogFile {
    /// Location of the log file on disk.
    path: PathBuf,
    /// Size in bytes, taken from the file's metadata when the directory was scanned.
    size: u64,
    /// Age computed from the timestamp embedded in the file name (not the file's mtime).
    age: Duration,
}
203
204/// Returns true if a running process with `pid` plausibly owns the log file named with
205/// `log_timestamp` (embedded in the filename at creation time).
206///
207/// PIDs are recycled on Windows and other OSes; [`System::process`] alone would treat a
208/// stale log file as protected whenever an unrelated process reuses the same PID. If the
209/// process start time is clearly after the log's embedded timestamp, the PID no longer
210/// refers to the process that created this file.
211fn pid_protects_log_file(sys: &System, pid: u32, log_timestamp: DateTime<FixedOffset>) -> bool {
212    let Some(proc) = sys.process(Pid::from_u32(pid)) else {
213        return false;
214    };
215    let log_epoch_secs = log_timestamp.timestamp();
216    let proc_start_secs = i64::try_from(proc.start_time()).unwrap_or(i64::MAX);
217    if proc_start_secs > log_epoch_secs {
218        debug!("PID {pid} likely reused: process start {proc_start_secs}s > log timestamp {log_epoch_secs}s");
219        return false;
220    }
221    true
222}
223
224fn run_log_directory_cleanup_background(cfg: LogDirConfig, log_dir: &Path) {
225    // Spawn run_log_directory_cleanup as background thread, logging any errors as a warn!
226    let log_dir = log_dir.to_path_buf();
227    let handle = std::thread::spawn(move || {
228        if let Err(e) = run_log_directory_cleanup(cfg, &log_dir) {
229            warn!("Error during log directory cleanup in {:?}: {}", log_dir, e);
230        }
231    });
232
233    // Store the JoinHandle in the global variable
234    if let Ok(mut handle_opt) = LOG_CLEANUP_HANDLE.lock() {
235        debug_assert!(handle_opt.is_none(), "Log directory cleanup called multiple times.");
236        *handle_opt = Some(handle);
237    }
238}
239
240fn run_log_directory_cleanup(cfg: LogDirConfig, log_dir: &Path) -> io::Result<()> {
241    info!(
242        "starting log cleanup in {:?} (min_age={:?}, max_retention={:?}, max_size={} bytes)",
243        log_dir,
244        cfg.min_deletion_age,
245        cfg.max_retention_age,
246        ByteSize::new(cfg.size_limit)
247    );
248
249    // Initialize sysinfo once to get a list of the active process ids.  To ensure we never delete
250    // a log file associated with an active process, we preserve any log file associated with a currently
251    // active PID.
252    let sys = System::new_with_specifics(RefreshKind::nothing().with_processes(ProcessRefreshKind::everything()));
253
254    // Collect candidate files.
255    let mut candidates = Vec::<CandidateLogFile>::new();
256    let mut total_bytes: u64 = 0;
257    let mut candidate_deletion_bytes: u64 = 0;
258
259    let now = Utc::now();
260    let mut n_log_files = 0usize;
261
262    for entry in std::fs::read_dir(log_dir)? {
263        let Ok(entry) = entry.warn_error_fn(|| format!("read_dir error while reading {log_dir:?}")) else {
264            continue;
265        };
266
267        let path = entry.path();
268
269        let Ok(ft) = entry.file_type() else { continue };
270        if !ft.is_file() {
271            continue;
272        }
273
274        let Some((prefix, timestamp, pid)) = parse_log_file_name(&path) else {
275            debug!("ignoring unparseable log file {:?}", path);
276            continue;
277        };
278
279        if prefix != cfg.filename_prefix {
280            debug!("ignoring log file {:?} with differing prefix {prefix} (!={})", path, &cfg.filename_prefix);
281            continue;
282        }
283
284        // Only use info here as it could be another process deleted it.
285        let Ok(meta) = entry
286            .metadata()
287            .info_error_fn(|| format!("Reading metadata failed for {:?}", path))
288        else {
289            continue;
290        };
291
292        let size = meta.len();
293        total_bytes += size;
294        n_log_files += 1;
295
296        let Ok(age) = (now - timestamp.to_utc()).to_std() else {
297            debug!("Skipping deletion for very new log file {path:?}");
298            continue;
299        };
300
301        // Skip if it's too new.
302        if age < cfg.min_deletion_age {
303            debug!("Skipping deletion for new log file {path:?}");
304            continue;
305        }
306
307        // Skip if there is an active PID associated with the file (and not PID reuse).
308        if pid_protects_log_file(&sys, pid, timestamp) {
309            debug!("Skipping deletion for log file {path:?} with active associated PID.");
310            continue;
311        }
312
313        // These files are available for deletion.
314        candidates.push(CandidateLogFile { path, size, age });
315
316        candidate_deletion_bytes += size;
317    }
318
319    info!(
320        "Log Directory Cleanup: found {:?} of logs in {} log files, with {:?} in {} files eligible for deletion.",
321        ByteSize::new(total_bytes),
322        n_log_files,
323        ByteSize::new(candidate_deletion_bytes),
324        candidates.len()
325    );
326
327    // 1) Hard expiration pass: delete anything older than max_retention, unless protected.
328    let mut deleted_bytes: u64 = 0;
329    candidates.retain(|lf| {
330        if lf.age > cfg.max_retention_age {
331            let path = &lf.path;
332            match std::fs::remove_file(path) {
333                Ok(_) => {
334                    deleted_bytes += lf.size;
335                    debug!("Log Directory Cleanup: Removed old log file {path:?})");
336                },
337                Err(e) => {
338                    // If the error is because the file no longer exists, then count it towards the deleted bytes;
339                    // otherwise log and skip.
340                    if e.kind() == io::ErrorKind::NotFound {
341                        deleted_bytes += lf.size;
342                        debug!("Log Directory Cleanup: Old log file {path:?} already deleted.");
343                    } else {
344                        info!("Log Directory Cleanup: Error removing old log file {path:?}, skipping: {e}");
345                    }
346                },
347            };
348            false
349        } else {
350            true
351        }
352    });
353
354    // 2) Size trimming: if above the limit, delete oldest eligible (unprotected) first.
355    let mut n_pruned = 0;
356    if total_bytes - deleted_bytes > cfg.size_limit {
357        // Sort by oldest first.
358        candidates.sort_by_key(|lf| std::cmp::Reverse(lf.age));
359        for lf in &candidates {
360            if total_bytes - deleted_bytes <= cfg.size_limit {
361                break;
362            }
363
364            match std::fs::remove_file(&lf.path) {
365                Ok(()) => {
366                    deleted_bytes += lf.size;
367                    n_pruned += 1;
368                    debug!("Log Directory cleanup: Pruned log file {:?}.", lf.path);
369                },
370                Err(e) => {
371                    if e.kind() == io::ErrorKind::NotFound {
372                        deleted_bytes += lf.size;
373                        n_pruned += 1;
374                        debug!("Log Directory cleanup: Log file {:?} already deleted, ignoring.", lf.path);
375                    } else {
376                        info!("Log Directory Cleanup: Error removing size-pruned log file {:?}: {}", lf.path, e);
377                    }
378                },
379            }
380        }
381    }
382
383    info!(
384        "Log Directory Cleanup: deleted {:?} in {} files",
385        ByteSize::new(deleted_bytes),
386        candidates.len() - n_pruned
387    );
388    Ok(())
389}
390
#[cfg(test)]
mod tests {

    use chrono::{Datelike, Timelike};

    use super::*;

    #[test]
    fn round_trip_make_and_parse() {
        let dir = Path::new("/tmp");
        let cfg = LogDirConfig::default();
        let path = log_file_in_dir(&cfg, dir);
        let (base, ts, pid) = parse_log_file_name(&path).expect("parse");
        assert_eq!(base, cfg.filename_prefix);
        assert!(pid > 0);

        // Verify that the timestamp string matches what's embedded in the filename
        let fname = path.file_name().unwrap().to_str().unwrap();
        let ts_part = fname
            .strip_prefix(&format!("{}_", base))
            .unwrap()
            .strip_suffix(&format!("_{}.log", pid))
            .unwrap();
        assert_eq!(ts_part, ts.format("%Y%m%dT%H%M%S%3f%z").to_string());
    }

    #[test]
    fn parse_known_file() {
        let fname = "app_base_20250915T083210123-0700_12345.log";
        let (base, ts, pid) = parse_log_file_name(fname).expect("parse");
        assert_eq!(base, "app_base");
        assert_eq!(pid, 12345);
        assert_eq!(ts.format("%Y%m%dT%H%M%S%3f%z").to_string(), "20250915T083210123-0700");
        assert_eq!(ts.year(), 2025);
        assert_eq!(ts.month(), 9);
        assert_eq!(ts.day(), 15);
        assert_eq!(ts.hour(), 8);
        assert_eq!(ts.minute(), 32);
        assert_eq!(ts.second(), 10);
        assert_eq!(ts.timestamp_subsec_millis(), 123);
        assert_eq!(ts.offset().local_minus_utc(), -7 * 3600);
    }

    #[test]
    fn allows_underscores_in_base() {
        let fname = "my_cool_app_20240102T030405006+0530_999.log";
        let (base, ts, pid) = parse_log_file_name(fname).expect("parse");
        assert_eq!(base, "my_cool_app");
        assert_eq!(pid, 999);
        assert_eq!(ts.format("%Y%m%dT%H%M%S%3f%z").to_string(), "20240102T030405006+0530");
    }

    #[test]
    fn parse_with_directory_path() {
        let path = Path::new("/var/log/myprog/app_20250915T083210123-0700_12345.log");
        let (base, _, pid) = parse_log_file_name(path).expect("parse");
        assert_eq!(base, "app");
        assert_eq!(pid, 12345);
    }

    #[test]
    fn rejects_malformed_names() {
        // Wrong extension.
        assert!(parse_log_file_name("app_20250915T083210123-0700_12345.txt").is_none());
        // Only <timestamp>_<pid>: the prefix field is missing.
        assert!(parse_log_file_name("20250915T083210123-0700_12345.log").is_none());
        // PID must be an unsigned integer.
        assert!(parse_log_file_name("app_20250915T083210123-0700_abc.log").is_none());
        assert!(parse_log_file_name("app_20250915T083210123-0700_-1.log").is_none());
        // Timestamp without its UTC offset, or in a different layout.
        assert!(parse_log_file_name("app_20250915T083210123_12345.log").is_none());
        assert!(parse_log_file_name("app_2025-09-15_12345.log").is_none());
        // Path with no final file-name component.
        assert!(parse_log_file_name("/").is_none());
    }
}