use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::OnceLock;
use parking_lot::Mutex;
use tracing::Level;
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::fmt::MakeWriter;
use tracing_subscriber::layer::SubscriberExt as _;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::util::SubscriberInitExt as _;
use tracing_subscriber::{EnvFilter, Layer, Registry};
use crate::core::types::{DynError, Status};
mod format;
mod host;
mod syslog;
pub use format::{LogFormat, LogFormatParseError};
pub use host::local_hostname;
pub const LOG_EMERG: u8 = 0;
pub const LOG_ALERT: u8 = 1;
pub const LOG_CRIT: u8 = 2;
pub const LOG_ERR: u8 = 3;
pub const LOG_WARN: u8 = 4;
pub const LOG_NOTICE: u8 = 5;
pub const LOG_INFO: u8 = 6;
pub const LOG_DEBUG: u8 = 7;
pub const LOG_VERB: u8 = 8;
pub const LOG_VVERB: u8 = 9;
pub const LOG_VVVERB: u8 = 10;
pub const LOG_PVERB: u8 = 11;
pub const LOG_LEVEL_MAX: u8 = LOG_PVERB;
pub fn tracing_level_for(level: u8) -> Level {
match level {
0..=4 => Level::ERROR,
5..=6 => Level::INFO,
7 => Level::DEBUG,
_ => Level::TRACE,
}
}
pub fn clamp_level(level: u8) -> u8 {
level.min(LOG_LEVEL_MAX)
}
struct State {
path: Mutex<Option<PathBuf>>,
sink: Mutex<Box<dyn Write + Send>>,
nerror: Mutex<u64>,
}
static STATE: OnceLock<State> = OnceLock::new();
static CURRENT_LEVEL: AtomicU8 = AtomicU8::new(LOG_NOTICE);
#[derive(Clone)]
struct LoggerWriter;
impl Write for LoggerWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let Some(state) = STATE.get() else {
return io::stderr().write(buf);
};
let mut sink = state.sink.lock();
match sink.write_all(buf) {
Ok(()) => Ok(buf.len()),
Err(err) => {
*state.nerror.lock() += 1;
Err(err)
}
}
}
fn flush(&mut self) -> io::Result<()> {
let Some(state) = STATE.get() else {
return io::stderr().flush();
};
let mut sink = state.sink.lock();
sink.flush()
}
}
impl<'a> MakeWriter<'a> for LoggerWriter {
type Writer = LoggerWriter;
fn make_writer(&'a self) -> Self::Writer {
LoggerWriter
}
}
fn open_log_file(path: &Path) -> io::Result<File> {
OpenOptions::new()
.append(true)
.create(true)
.mode_for_append()
.open(path)
}
trait OpenOptionsExt {
fn mode_for_append(&mut self) -> &mut Self;
}
impl OpenOptionsExt for OpenOptions {
#[cfg(unix)]
fn mode_for_append(&mut self) -> &mut Self {
use std::os::unix::fs::OpenOptionsExt as _;
self.mode(0o644)
}
#[cfg(not(unix))]
fn mode_for_append(&mut self) -> &mut Self {
self
}
}
#[derive(Debug, Clone)]
pub struct LogConfig {
pub verbosity: u8,
pub output: Option<PathBuf>,
pub format: LogFormat,
}
impl LogConfig {
pub fn new(verbosity: u8, output: Option<PathBuf>, format: LogFormat) -> Self {
Self {
verbosity,
output,
format,
}
}
}
pub type LogsLayer = Box<dyn Layer<Registry> + Send + Sync + 'static>;
#[derive(Debug)]
#[must_use = "the reopen handle must be threaded into install_global so SIGHUP-reopen is wired"]
pub struct ReopenHandle {
_private: (),
}
pub fn build_env_filter(verbosity: u8) -> EnvFilter {
let level_filter = LevelFilter::from_level(tracing_level_for(clamp_level(verbosity)));
EnvFilter::builder()
.with_default_directive(level_filter.into())
.from_env_lossy()
}
fn init_reopen_state(verbosity: u8, path: Option<&Path>) -> Result<ReopenHandle, DynError> {
let sink: Box<dyn Write + Send> = match path {
Some(p) => Box::new(open_log_file(p).map_err(DynError::Io)?),
None => Box::new(io::stderr()),
};
let stored_path = path.map(PathBuf::from);
let state = State {
path: Mutex::new(stored_path),
sink: Mutex::new(sink),
nerror: Mutex::new(0),
};
STATE
.set(state)
.map_err(|_| DynError::generic("log: writer state already installed"))?;
CURRENT_LEVEL.store(clamp_level(verbosity), Ordering::Relaxed);
Ok(ReopenHandle { _private: () })
}
pub fn build_logs_layer(cfg: &LogConfig) -> Result<(LogsLayer, ReopenHandle), DynError> {
let reopen = init_reopen_state(cfg.verbosity, cfg.output.as_deref())?;
let layer = fmt_layer_for_format::<Registry>(cfg.format);
Ok((layer, reopen))
}
fn fmt_layer_for_format<S>(format: LogFormat) -> Box<dyn Layer<S> + Send + Sync + 'static>
where
S: tracing::Subscriber + for<'lookup> LookupSpan<'lookup>,
{
match format {
LogFormat::Default => Box::new(
tracing_subscriber::fmt::Layer::default()
.with_writer(LoggerWriter)
.with_target(true),
),
LogFormat::Json => Box::new(
tracing_subscriber::fmt::Layer::default()
.with_writer(LoggerWriter)
.with_target(true)
.json()
.flatten_event(false)
.with_current_span(true)
.with_span_list(false),
),
LogFormat::Rfc5424 => Box::new(
tracing_subscriber::fmt::Layer::default()
.with_writer(LoggerWriter)
.event_format(syslog::Rfc5424Formatter::new())
.with_ansi(false),
),
LogFormat::Rfc3164 => Box::new(
tracing_subscriber::fmt::Layer::default()
.with_writer(LoggerWriter)
.event_format(syslog::Rfc3164Formatter::new())
.with_ansi(false),
),
}
}
pub fn install_logs_only(cfg: &LogConfig) -> Status {
let env = build_env_filter(cfg.verbosity);
let (fmt_layer, _reopen) = build_logs_layer(cfg)?;
tracing_subscriber::registry()
.with(fmt_layer)
.with(env)
.try_init()
.map_err(|e| DynError::generic(format!("install_logs_only: {e}")))?;
Ok(())
}
pub fn log_init(level: u8, path: Option<&Path>) -> Status {
log_init_with_format(level, path, LogFormat::Default)
}
pub fn log_init_with_format(level: u8, path: Option<&Path>, format: LogFormat) -> Status {
install_logs_only(&LogConfig {
verbosity: level,
output: path.map(PathBuf::from),
format,
})
}
pub fn reopen_on_sighup() -> Status {
let state = STATE
.get()
.ok_or_else(|| DynError::generic("reopen_on_sighup: log not initialised"))?;
let path_guard = state.path.lock();
let Some(path) = path_guard.as_ref() else {
return Ok(());
};
let new_file = open_log_file(path).map_err(DynError::Io)?;
*state.sink.lock() = Box::new(new_file);
Ok(())
}
pub fn write_error_count() -> u64 {
STATE.get().map_or(0, |s| *s.nerror.lock())
}
pub fn current_level() -> u8 {
CURRENT_LEVEL.load(Ordering::Relaxed)
}
pub fn log_level_increment() -> u8 {
let prev = CURRENT_LEVEL.load(Ordering::Relaxed);
let next = clamp_level(prev.saturating_add(1));
CURRENT_LEVEL.store(next, Ordering::Relaxed);
next
}
pub fn log_level_decrement() -> u8 {
let prev = CURRENT_LEVEL.load(Ordering::Relaxed);
let next = prev.saturating_sub(1);
CURRENT_LEVEL.store(next, Ordering::Relaxed);
next
}
pub fn log_level_set(level: u8) {
CURRENT_LEVEL.store(clamp_level(level), Ordering::Relaxed);
}
pub fn log_loggable(level: u8) -> bool {
level <= current_level()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn level_mapping_is_monotone_in_verbosity() {
let severity = |l: Level| -> u8 {
match l {
Level::ERROR => 0,
Level::WARN => 1,
Level::INFO => 2,
Level::DEBUG => 3,
Level::TRACE => 4,
}
};
let mut prev = severity(tracing_level_for(0));
for lvl in 1..=LOG_LEVEL_MAX {
let cur = severity(tracing_level_for(lvl));
assert!(cur >= prev, "level {lvl}: severity {cur} not >= {prev}");
prev = cur;
}
}
#[test]
fn clamp_saturates() {
assert_eq!(clamp_level(0), 0);
assert_eq!(clamp_level(LOG_LEVEL_MAX), LOG_LEVEL_MAX);
assert_eq!(clamp_level(LOG_LEVEL_MAX + 5), LOG_LEVEL_MAX);
assert_eq!(clamp_level(255), LOG_LEVEL_MAX);
}
#[test]
fn level_constants_match_c() {
assert_eq!(LOG_EMERG, 0);
assert_eq!(LOG_ALERT, 1);
assert_eq!(LOG_CRIT, 2);
assert_eq!(LOG_ERR, 3);
assert_eq!(LOG_WARN, 4);
assert_eq!(LOG_NOTICE, 5);
assert_eq!(LOG_INFO, 6);
assert_eq!(LOG_DEBUG, 7);
assert_eq!(LOG_VERB, 8);
assert_eq!(LOG_VVERB, 9);
assert_eq!(LOG_VVVERB, 10);
assert_eq!(LOG_PVERB, 11);
}
#[test]
fn level_increment_and_decrement_saturate() {
log_level_set(0);
assert_eq!(log_level_decrement(), 0);
for _ in 0..(u32::from(LOG_LEVEL_MAX) + 5) {
log_level_increment();
}
assert_eq!(current_level(), LOG_LEVEL_MAX);
log_level_set(5);
assert!(log_loggable(0));
assert!(log_loggable(5));
assert!(!log_loggable(6));
}
#[test]
fn build_logs_layer_writer_state_swaps_on_reopen() {
use std::fs;
let dir = tempfile::tempdir().expect("tempdir");
let log_path = dir.path().join("dyn.log");
let cfg = LogConfig::new(LOG_NOTICE, Some(log_path.clone()), LogFormat::Default);
let (fmt_layer, _reopen) = build_logs_layer(&cfg).expect("build layer");
let env = build_env_filter(LOG_NOTICE);
let sub = tracing_subscriber::registry().with(fmt_layer).with(env);
tracing::subscriber::with_default(sub, || {
tracing::info!(target: "dynomite::test", "first-line-marker");
let rotated = dir.path().join("dyn.log.1");
fs::rename(&log_path, &rotated).expect("rotate file");
reopen_on_sighup().expect("reopen");
tracing::info!(target: "dynomite::test", "second-line-marker");
});
let rotated_contents =
fs::read_to_string(dir.path().join("dyn.log.1")).expect("read rotated");
let new_contents = fs::read_to_string(&log_path).expect("read new");
assert!(
rotated_contents.contains("first-line-marker"),
"rotated file missing first marker: {rotated_contents:?}",
);
assert!(
!rotated_contents.contains("second-line-marker"),
"rotated file unexpectedly contained second marker: {rotated_contents:?}",
);
assert!(
new_contents.contains("second-line-marker"),
"new file missing second marker: {new_contents:?}",
);
assert!(
!new_contents.contains("first-line-marker"),
"new file unexpectedly contained first marker: {new_contents:?}",
);
}
}
#[cfg(test)]
mod format_tests {
use std::io::{self, Write};
use std::sync::{Arc, Mutex};
use regex::Regex;
use tracing_subscriber::fmt::MakeWriter;
use super::syslog::{Rfc3164Formatter, Rfc5424Formatter};
#[derive(Clone, Default)]
struct CaptureBuffer(Arc<Mutex<Vec<u8>>>);
impl CaptureBuffer {
fn snapshot(&self) -> Vec<u8> {
self.0.lock().expect("lock CaptureBuffer").clone()
}
fn snapshot_string(&self) -> String {
String::from_utf8(self.snapshot()).expect("captured bytes are utf-8")
}
}
impl Write for CaptureBuffer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut guard = self.0.lock().expect("lock CaptureBuffer");
guard.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl<'a> MakeWriter<'a> for CaptureBuffer {
type Writer = CaptureBuffer;
fn make_writer(&'a self) -> Self::Writer {
self.clone()
}
}
fn run_default(buf: &CaptureBuffer) {
let sub = tracing_subscriber::fmt()
.with_writer(buf.clone())
.with_target(true)
.with_ansi(false)
.finish();
tracing::subscriber::with_default(sub, || {
tracing::info!(answer = 42, name = "ada", "hello");
});
}
fn run_rfc5424(buf: &CaptureBuffer) {
use tracing_subscriber::layer::SubscriberExt as _;
let layer = tracing_subscriber::fmt::Layer::new()
.with_writer(buf.clone())
.event_format(Rfc5424Formatter::new())
.with_ansi(false);
let sub = tracing_subscriber::registry().with(layer);
tracing::subscriber::with_default(sub, || {
tracing::info!(answer = 42, "hello");
});
}
fn run_rfc3164(buf: &CaptureBuffer) {
use tracing_subscriber::layer::SubscriberExt as _;
let layer = tracing_subscriber::fmt::Layer::new()
.with_writer(buf.clone())
.event_format(Rfc3164Formatter::new())
.with_ansi(false);
let sub = tracing_subscriber::registry().with(layer);
tracing::subscriber::with_default(sub, || {
tracing::info!(answer = 42, "hello");
});
}
fn run_json(buf: &CaptureBuffer) {
let sub = tracing_subscriber::fmt()
.with_writer(buf.clone())
.json()
.with_target(true)
.flatten_event(false)
.with_current_span(true)
.with_span_list(false)
.finish();
tracing::subscriber::with_default(sub, || {
tracing::info!(answer = 42, name = "ada", "first");
tracing::warn!(retry = true, "second");
});
}
#[test]
fn default_format_unchanged_from_baseline() {
let buf = CaptureBuffer::default();
run_default(&buf);
let text = buf.snapshot_string();
assert!(text.contains(" INFO "), "missing INFO level: {text:?}");
assert!(text.contains("hello"), "missing message text: {text:?}");
assert!(
text.contains("answer=42"),
"missing kv 'answer=42': {text:?}"
);
assert!(text.contains("name=\"ada\""), "missing kv 'name': {text:?}");
assert!(text.ends_with('\n'), "missing trailing newline");
}
#[test]
fn rfc5424_format_starts_with_pri_version() {
let buf = CaptureBuffer::default();
run_rfc5424(&buf);
let text = buf.snapshot_string();
let re =
Regex::new(r"^<\d+>1 [\d-]+T[\d:.+\-]+ \S+ dynomited \d+ - ").expect("compile regex");
let first_line = text.lines().next().expect("at least one line");
assert!(
re.is_match(first_line),
"RFC 5424 line did not match regex: {first_line:?}"
);
assert!(
first_line.contains("origin@32473"),
"missing structured-data ID: {first_line:?}"
);
assert!(
first_line.contains("hello"),
"missing message: {first_line:?}"
);
}
#[test]
fn rfc3164_format_starts_with_pri_then_timestamp() {
let buf = CaptureBuffer::default();
run_rfc3164(&buf);
let text = buf.snapshot_string();
let re = Regex::new(r"^<\d+>[A-Z][a-z]{2} [\d ]\d \d{2}:\d{2}:\d{2} \S+ \S+: ")
.expect("compile regex");
let first_line = text.lines().next().expect("at least one line");
assert!(
re.is_match(first_line),
"RFC 3164 line did not match regex: {first_line:?}"
);
assert!(
first_line.contains("hello"),
"missing message: {first_line:?}"
);
}
#[test]
fn ndjson_format_is_one_json_per_line() {
let buf = CaptureBuffer::default();
run_json(&buf);
let text = buf.snapshot_string();
let lines: Vec<_> = text.lines().filter(|l| !l.is_empty()).collect();
assert!(
lines.len() >= 2,
"expected at least two JSON lines: {text:?}"
);
for line in &lines {
let v: serde_json::Value = serde_json::from_str(line)
.unwrap_or_else(|e| panic!("line is not valid JSON ({e}): {line:?}"));
for key in ["timestamp", "level", "target", "fields"] {
assert!(
v.get(key).is_some(),
"JSON line missing key {key:?}: {line}"
);
}
assert!(!line.contains('\n'));
}
}
}