use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum LogDriver {
#[default]
JsonFile,
Syslog,
None,
}
impl std::fmt::Display for LogDriver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::JsonFile => write!(f, "json-file"),
Self::Syslog => write!(f, "syslog"),
Self::None => write!(f, "none"),
}
}
}
impl std::str::FromStr for LogDriver {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"json-file" => Ok(Self::JsonFile),
"syslog" => Ok(Self::Syslog),
"none" => Ok(Self::None),
_ => Err(format!(
"unknown log driver: '{}' (supported: json-file, syslog, none)",
s
)),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogConfig {
pub driver: LogDriver,
#[serde(default)]
pub options: HashMap<String, String>,
}
impl Default for LogConfig {
fn default() -> Self {
Self {
driver: LogDriver::JsonFile,
options: HashMap::new(),
}
}
}
impl LogConfig {
pub fn max_size(&self) -> u64 {
self.options
.get("max-size")
.and_then(|s| parse_size(s).ok())
.unwrap_or(10 * 1024 * 1024)
}
pub fn max_file(&self) -> u32 {
self.options
.get("max-file")
.and_then(|s| s.parse().ok())
.unwrap_or(3)
}
pub fn syslog_address(&self) -> &str {
self.options
.get("syslog-address")
.map(|s| s.as_str())
.unwrap_or("udp://localhost:514")
}
pub fn syslog_facility(&self) -> &str {
self.options
.get("syslog-facility")
.map(|s| s.as_str())
.unwrap_or("daemon")
}
pub fn tag(&self) -> Option<&str> {
self.options.get("tag").map(|s| s.as_str())
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct LogEntry {
pub log: String,
pub stream: String,
pub time: String,
}
fn parse_size(s: &str) -> std::result::Result<u64, String> {
let s = s.trim().to_lowercase();
if let Ok(n) = s.parse::<u64>() {
return Ok(n);
}
let (num, mult) = if s.ends_with("gb") || s.ends_with('g') {
(
s.trim_end_matches("gb").trim_end_matches('g'),
1024u64 * 1024 * 1024,
)
} else if s.ends_with("mb") || s.ends_with('m') {
(
s.trim_end_matches("mb").trim_end_matches('m'),
1024u64 * 1024,
)
} else if s.ends_with("kb") || s.ends_with('k') {
(s.trim_end_matches("kb").trim_end_matches('k'), 1024u64)
} else if s.ends_with('b') {
(s.trim_end_matches('b'), 1u64)
} else {
return Err(format!("unrecognized size format: {s}"));
};
let n: u64 = num.parse().map_err(|_| format!("invalid number: {num}"))?;
Ok(n * mult)
}
use std::io::{BufRead, BufReader, Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
fn console_truncate_if_over(path: &Path, cap: u64) -> bool {
let Ok(meta) = std::fs::metadata(path) else {
return false;
};
if meta.len() <= cap {
return false;
}
match std::fs::OpenOptions::new().write(true).open(path) {
Ok(f) if f.set_len(0).is_ok() => {
tracing::debug!(path = %path.display(), cap, "console log exceeded cap; truncated");
true
}
_ => false,
}
}
pub fn json_log_path(log_dir: &Path) -> PathBuf {
log_dir.join("container.json")
}
pub fn is_runtime_console_noise(line: &str) -> bool {
line.starts_with("init.krun:")
}
fn tail_next_line(
reader: &mut (impl BufRead + Seek),
buf: &mut String,
stop: &AtomicBool,
on_eof: Option<&dyn Fn() -> bool>,
) -> Option<String> {
loop {
match reader.read_line(buf) {
Ok(0) | Err(_) => {
if stop.load(Ordering::Relaxed) {
if buf.is_empty() {
return None;
}
let line = std::mem::take(buf);
return Some(line.trim_end_matches(['\n', '\r']).to_string());
}
if buf.is_empty() {
if let Some(on_eof) = on_eof {
if on_eof() {
let _ = reader.seek(std::io::SeekFrom::Start(0));
}
}
}
std::thread::sleep(std::time::Duration::from_millis(100));
continue;
}
Ok(_) => {}
}
if !buf.ends_with('\n') {
continue;
}
let line = std::mem::take(buf);
return Some(line.trim_end_matches(['\n', '\r']).to_string());
}
}
pub fn run_log_processor(
console_log: &Path,
log_dir: &Path,
config: &LogConfig,
stop: &AtomicBool,
) {
match config.driver {
LogDriver::None => run_discard_processor(
console_log,
Some(console_cap(config.max_size(), config.max_file())),
stop,
),
LogDriver::JsonFile => run_json_file_processor(
console_log,
log_dir,
config.max_size(),
config.max_file(),
stop,
),
LogDriver::Syslog => run_syslog_processor(
console_log,
config.syslog_address(),
config.syslog_facility(),
config.tag().unwrap_or("a3s-box"),
Some(console_cap(config.max_size(), config.max_file())),
stop,
),
}
}
fn open_console(console_log: &Path, stop: &AtomicBool) -> Option<std::fs::File> {
for _ in 0..300 {
if console_log.exists() {
break;
}
if stop.load(Ordering::Relaxed) && !console_log.exists() {
return None;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
std::fs::File::open(console_log).ok()
}
pub fn stderr_console_path(console_log: &Path) -> PathBuf {
console_log.with_file_name("console.err.log")
}
fn run_tagged_tail(
file: &Path,
stream: &str,
filter_noise: bool,
bound: Option<u64>,
stop: &AtomicBool,
emit: &(dyn Fn(&str, &str) + Sync),
) {
let f = match open_console(file, stop) {
Some(f) => f,
None => return,
};
let mut reader = BufReader::new(f);
let mut buf = String::new();
let truncate = bound.map(|cap| move || console_truncate_if_over(file, cap));
let on_eof: Option<&dyn Fn() -> bool> = truncate.as_ref().map(|t| t as &dyn Fn() -> bool);
while let Some(line) = tail_next_line(&mut reader, &mut buf, stop, on_eof) {
if filter_noise && is_runtime_console_noise(&line) {
continue;
}
emit(&line, stream);
}
}
fn console_cap(max_size: u64, max_file: u32) -> u64 {
max_size.saturating_mul(u64::from(max_file.max(1)))
}
fn run_discard_processor(console_log: &Path, cap: Option<u64>, stop: &AtomicBool) {
let err_log = stderr_console_path(console_log);
let discard = |_line: &str, _stream: &str| {};
let discard: &(dyn Fn(&str, &str) + Sync) = &discard;
std::thread::scope(|s| {
s.spawn(|| run_tagged_tail(console_log, "stdout", false, cap, stop, discard));
s.spawn(|| run_tagged_tail(&err_log, "stderr", false, cap, stop, discard));
});
}
fn run_json_file_processor(
console_log: &Path,
log_dir: &Path,
max_size: u64,
max_file: u32,
stop: &AtomicBool,
) {
let json_path = json_log_path(log_dir);
let writer = std::sync::Mutex::new(match RotatingWriter::new(&json_path, max_size, max_file) {
Ok(w) => w,
Err(_) => return,
});
let err_log = stderr_console_path(console_log);
let emit = |line: &str, stream: &str| {
let entry = LogEntry {
log: format!("{line}\n"),
stream: stream.to_string(),
time: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
};
if let Ok(json) = serde_json::to_string(&entry) {
if let Ok(mut w) = writer.lock() {
let _ = w.write_line(&json);
}
}
};
let emit: &(dyn Fn(&str, &str) + Sync) = &emit;
let cap = Some(console_cap(max_size, max_file));
std::thread::scope(|s| {
s.spawn(|| run_tagged_tail(console_log, "stdout", true, cap, stop, emit));
s.spawn(|| run_tagged_tail(&err_log, "stderr", true, cap, stop, emit));
});
}
fn run_syslog_processor(
console_log: &Path,
address: &str,
_facility: &str,
tag: &str,
cap: Option<u64>,
stop: &AtomicBool,
) {
use std::net::UdpSocket;
let (proto, addr) = if let Some(rest) = address.strip_prefix("udp://") {
("udp", rest)
} else if let Some(rest) = address.strip_prefix("tcp://") {
("tcp", rest)
} else {
("udp", address)
};
let err_log = stderr_console_path(console_log);
match proto {
"udp" => {
let socket = match UdpSocket::bind("0.0.0.0:0") {
Ok(s) => s,
Err(_) => return,
};
let emit = |line: &str, _stream: &str| {
let msg = format!("<30>{tag}: {line}");
let _ = socket.send_to(msg.as_bytes(), addr);
};
let emit: &(dyn Fn(&str, &str) + Sync) = &emit;
std::thread::scope(|s| {
s.spawn(|| run_tagged_tail(console_log, "stdout", true, cap, stop, emit));
s.spawn(|| run_tagged_tail(&err_log, "stderr", true, cap, stop, emit));
});
}
"tcp" => {
let stream = match std::net::TcpStream::connect(addr) {
Ok(s) => std::sync::Mutex::new(s),
Err(_) => return,
};
let emit = |line: &str, _stream: &str| {
let msg = format!("<30>{tag}: {line}\n");
if let Ok(mut s) = stream.lock() {
if s.write_all(msg.as_bytes()).is_err() {
if let Ok(news) = std::net::TcpStream::connect(addr) {
*s = news;
let _ = s.write_all(msg.as_bytes());
}
}
}
};
let emit: &(dyn Fn(&str, &str) + Sync) = &emit;
std::thread::scope(|sc| {
sc.spawn(|| run_tagged_tail(console_log, "stdout", true, cap, stop, emit));
sc.spawn(|| run_tagged_tail(&err_log, "stderr", true, cap, stop, emit));
});
}
_ => {}
}
}
struct RotatingWriter {
path: PathBuf,
file: std::fs::File,
written: u64,
max_size: u64,
max_file: u32,
}
impl RotatingWriter {
fn new(path: &Path, max_size: u64, max_file: u32) -> std::io::Result<Self> {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
let written = file.metadata()?.len();
Ok(Self {
path: path.to_path_buf(),
file,
written,
max_size,
max_file,
})
}
fn write_line(&mut self, line: &str) -> std::io::Result<()> {
let bytes = format!("{line}\n");
self.file.write_all(bytes.as_bytes())?;
self.file.flush()?;
self.written += bytes.len() as u64;
if self.written >= self.max_size {
self.rotate()?;
}
Ok(())
}
fn rotate(&mut self) -> std::io::Result<()> {
for i in (1..self.max_file).rev() {
let from = rotated_path(&self.path, i);
let to = rotated_path(&self.path, i + 1);
if from.exists() {
std::fs::rename(&from, &to)?;
}
}
let oldest = rotated_path(&self.path, self.max_file);
if oldest.exists() {
std::fs::remove_file(&oldest)?;
}
let rotated = rotated_path(&self.path, 1);
compress_file(&self.path, &rotated)?;
std::fs::remove_file(&self.path)?;
self.file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
self.written = 0;
Ok(())
}
}
fn compress_file(src: &Path, dst: &Path) -> std::io::Result<()> {
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Read;
let mut input = std::fs::File::open(src)?;
let output = std::fs::File::create(dst)?;
let mut encoder = GzEncoder::new(output, Compression::fast());
let mut buf = [0u8; 8192];
loop {
let n = input.read(&mut buf)?;
if n == 0 {
break;
}
encoder.write_all(&buf[..n])?;
}
encoder.finish()?;
Ok(())
}
fn rotated_path(base: &Path, index: u32) -> PathBuf {
let mut p = base.as_os_str().to_owned();
p.push(format!(".{index}.gz"));
PathBuf::from(p)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_log_driver_from_str() {
assert_eq!(
"json-file".parse::<LogDriver>().unwrap(),
LogDriver::JsonFile
);
assert_eq!("syslog".parse::<LogDriver>().unwrap(), LogDriver::Syslog);
assert_eq!("none".parse::<LogDriver>().unwrap(), LogDriver::None);
assert!("unknown".parse::<LogDriver>().is_err());
}
#[test]
fn test_log_config_defaults() {
let config = LogConfig::default();
assert_eq!(config.driver, LogDriver::JsonFile);
assert_eq!(config.max_size(), 10 * 1024 * 1024);
assert_eq!(config.max_file(), 3);
}
#[test]
fn test_log_config_custom_options() {
let mut config = LogConfig::default();
config
.options
.insert("max-size".to_string(), "50m".to_string());
config
.options
.insert("max-file".to_string(), "5".to_string());
assert_eq!(config.max_size(), 50 * 1024 * 1024);
assert_eq!(config.max_file(), 5);
}
#[test]
fn test_parse_size() {
assert_eq!(parse_size("1024").unwrap(), 1024);
assert_eq!(parse_size("10m").unwrap(), 10 * 1024 * 1024);
assert_eq!(parse_size("1g").unwrap(), 1024 * 1024 * 1024);
assert_eq!(parse_size("512k").unwrap(), 512 * 1024);
assert!(parse_size("abc").is_err());
}
#[test]
fn test_log_entry_serialization() {
let entry = LogEntry {
log: "hello\n".to_string(),
stream: "stdout".to_string(),
time: "2026-02-12T06:00:00.000000000Z".to_string(),
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"log\":\"hello\\n\""));
assert!(json.contains("\"stream\":\"stdout\""));
}
#[test]
fn test_syslog_config_defaults() {
let config = LogConfig {
driver: LogDriver::Syslog,
options: HashMap::new(),
};
assert_eq!(config.syslog_address(), "udp://localhost:514");
assert_eq!(config.syslog_facility(), "daemon");
assert_eq!(config.tag(), None);
}
#[test]
fn test_syslog_config_custom() {
let mut options = HashMap::new();
options.insert(
"syslog-address".to_string(),
"tcp://loghost:1514".to_string(),
);
options.insert("syslog-facility".to_string(), "local0".to_string());
options.insert("tag".to_string(), "myapp".to_string());
let config = LogConfig {
driver: LogDriver::Syslog,
options,
};
assert_eq!(config.syslog_address(), "tcp://loghost:1514");
assert_eq!(config.syslog_facility(), "local0");
assert_eq!(config.tag(), Some("myapp"));
}
#[test]
fn test_log_driver_display() {
assert_eq!(LogDriver::JsonFile.to_string(), "json-file");
assert_eq!(LogDriver::Syslog.to_string(), "syslog");
assert_eq!(LogDriver::None.to_string(), "none");
}
#[test]
fn test_log_driver_serde_roundtrip() {
let driver = LogDriver::Syslog;
let json = serde_json::to_string(&driver).unwrap();
assert_eq!(json, "\"syslog\"");
let parsed: LogDriver = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, LogDriver::Syslog);
}
#[test]
fn test_tail_next_line_returns_complete_lines() {
use std::io::Cursor;
let mut reader = BufReader::new(Cursor::new(b"alpha\r\nbeta\n".to_vec()));
let mut buf = String::new();
let stop = AtomicBool::new(true);
assert_eq!(
tail_next_line(&mut reader, &mut buf, &stop, None),
Some("alpha".to_string())
);
assert_eq!(
tail_next_line(&mut reader, &mut buf, &stop, None),
Some("beta".to_string())
);
assert_eq!(tail_next_line(&mut reader, &mut buf, &stop, None), None);
assert!(buf.is_empty());
}
#[test]
fn test_tail_next_line_flushes_trailing_partial_on_stop() {
use std::io::Cursor;
let mut reader = BufReader::new(Cursor::new(b"only-partial".to_vec()));
let mut buf = String::new();
let stop = AtomicBool::new(true);
assert_eq!(
tail_next_line(&mut reader, &mut buf, &stop, None),
Some("only-partial".to_string())
);
assert_eq!(tail_next_line(&mut reader, &mut buf, &stop, None), None);
}
#[test]
fn test_console_truncate_if_over_only_when_over_cap() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("c.log");
std::fs::write(&path, b"hello").unwrap();
assert!(!console_truncate_if_over(&path, 10)); assert_eq!(std::fs::metadata(&path).unwrap().len(), 5);
assert!(console_truncate_if_over(&path, 4)); assert_eq!(std::fs::metadata(&path).unwrap().len(), 0);
assert!(!console_truncate_if_over(&dir.path().join("nope"), 0));
}
#[test]
fn test_run_tagged_tail_truncates_over_cap_and_keeps_emitting() {
use std::sync::{Arc, Mutex};
use std::time::Duration;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("console.log");
std::fs::write(&path, b"l1\nl2\nl3\n").unwrap();
let cap = 4u64;
let collected = Arc::new(Mutex::new(Vec::<String>::new()));
let stop = Arc::new(AtomicBool::new(false));
let (c2, s2, p2) = (collected.clone(), stop.clone(), path.clone());
let handle = std::thread::spawn(move || {
let emit = move |line: &str, _stream: &str| c2.lock().unwrap().push(line.to_string());
let emit: &(dyn Fn(&str, &str) + Sync) = &emit;
run_tagged_tail(&p2, "stdout", false, Some(cap), &s2, emit);
});
std::thread::sleep(Duration::from_millis(300));
{
use std::io::Write as _;
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(&path)
.unwrap();
f.write_all(b"l4\nl5\n").unwrap();
}
std::thread::sleep(Duration::from_millis(300));
stop.store(true, Ordering::Relaxed);
handle.join().unwrap();
let got = collected.lock().unwrap().clone();
for line in ["l1", "l3", "l4", "l5"] {
assert!(got.contains(&line.to_string()), "missing {line} in {got:?}");
}
let final_len = std::fs::metadata(&path).unwrap().len();
assert!(
final_len <= cap + 6,
"console.log unbounded: {final_len} bytes"
);
}
#[test]
fn test_none_driver_still_bounds_console() {
use std::sync::Arc;
use std::time::Duration;
let dir = tempfile::tempdir().unwrap();
let console = dir.path().join("console.log");
std::fs::write(&console, b"l1\nl2\nl3\n").unwrap(); std::fs::write(dir.path().join("console.err.log"), b"").unwrap();
let mut options = HashMap::new();
options.insert("max-size".to_string(), "4".to_string());
options.insert("max-file".to_string(), "1".to_string());
let config = LogConfig {
driver: LogDriver::None,
options,
};
let stop = Arc::new(AtomicBool::new(false));
let (s2, c2, d2) = (stop.clone(), console.clone(), dir.path().to_path_buf());
let handle = std::thread::spawn(move || run_log_processor(&c2, &d2, &config, &s2));
std::thread::sleep(Duration::from_millis(300));
stop.store(true, Ordering::Relaxed);
handle.join().unwrap();
assert!(std::fs::metadata(&console).unwrap().len() <= 4);
assert!(!dir.path().join("container.json").exists());
}
#[test]
fn test_is_runtime_console_noise() {
assert!(is_runtime_console_noise("init.krun: mount_filesystems ok"));
assert!(!is_runtime_console_noise("L1"));
assert!(!is_runtime_console_noise(
"starting app (init.krun: ignored)"
));
assert!(!is_runtime_console_noise(""));
}
#[test]
fn test_run_json_file_processor_captures_all_lines_after_stop() {
let dir = tempfile::tempdir().unwrap();
let console = dir.path().join("console.log");
std::fs::write(&console, "AAA\ninit.krun: noise\nBBB\n").unwrap();
let stop = AtomicBool::new(true);
run_json_file_processor(&console, dir.path(), 10 * 1024 * 1024, 3, &stop);
let json = std::fs::read_to_string(json_log_path(dir.path())).unwrap();
assert!(json.contains("\"log\":\"AAA\\n\""), "AAA missing: {json}");
assert!(
json.contains("\"log\":\"BBB\\n\""),
"BBB (after a quiet line) missing: {json}"
);
assert!(
!json.contains("init.krun"),
"runtime noise must be filtered: {json}"
);
}
#[test]
fn test_rotating_writer_rotates_and_gzips() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("container.json");
let mut w = RotatingWriter::new(&path, 20, 3).unwrap();
for i in 0..10 {
w.write_line(&format!("line-{i}")).unwrap();
}
assert!(
rotated_path(&path, 1).exists(),
"expected a rotated .1.gz file"
);
}
}