use std::{
fs::{File, OpenOptions},
io::{self, Write},
path::{Path, PathBuf},
time::{Duration, Instant},
};
use parking_lot::Mutex;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{
EnvFilter, Layer as _, fmt, layer::SubscriberExt, util::SubscriberInitExt,
};
use crate::config::DaemonConfig;
/// Multiplier applied to `max_bytes` to obtain the hard size cap that the
/// active log file may reach while rotation is failing; `write_impl` refuses
/// writes that would exceed it.
const OVERFLOW_FACTOR: u64 = 2;
/// Minimum time that must elapse after a failed rotation before `write_impl`
/// attempts the rotation again.
const RETRY_INTERVAL: Duration = Duration::from_secs(60);
/// Why the appender is currently unable to rotate normally.
#[derive(Debug)]
enum DegradedReason {
/// A rotation step (shifting the rotated copies or re-creating the active
/// file) returned an I/O error.
RenameFailed {
// Not read directly anywhere in this file; it is only visible through the
// derived `Debug` output, hence the lint allowance.
#[allow(dead_code)]
io_kind: io::ErrorKind,
/// Count of consecutive failed rotation attempts, starting at 1.
attempt: u32,
},
/// A write would have pushed the active file past
/// `max_bytes * OVERFLOW_FACTOR` while rotation was failing; writes are
/// refused with `StorageFull` in this state.
OverflowBeyondLimit,
}
/// Mutable state of the size-based rolling appender (kept behind a mutex by
/// `RollingSizeAppender`).
struct RollingState {
/// Path of the active log file; rotated copies are siblings named
/// `<file name>.<n>`.
base_path: PathBuf,
/// Size threshold in bytes; a write that would exceed it triggers rotation.
max_bytes: u64,
/// Highest rotation index that is retained (`<file name>.<keep>`).
keep: u32,
/// Open handle to the active log file; `None` while it is closed for a
/// rotation or when re-opening failed.
current: Option<File>,
/// Running byte count for the active file: seeded from file metadata at
/// construction, reset on successful rotation, incremented on every write.
current_bytes: u64,
/// `Some` while rotation is failing; `None` during normal operation.
degraded: Option<DegradedReason>,
/// Time of the most recent failed rotation, used to pace retries.
last_retry: Option<Instant>,
}
impl RollingState {
    /// Path of the active log file.
    fn log_path(&self) -> PathBuf {
        self.base_path.clone()
    }

    /// Path of the `n`-th rotated copy: the active file's name with `.n`
    /// appended, in the same directory.
    ///
    /// Falls back to `sqryd.log.<n>` when `base_path` has no file name.
    fn rotated_path(&self, n: u32) -> PathBuf {
        let rotated_name = match self.base_path.file_name() {
            Some(name) => format!("{}.{}", name.to_string_lossy(), n),
            None => format!("sqryd.log.{n}"),
        };
        self.base_path.with_file_name(rotated_name)
    }

    /// Opens the active log file in append mode if no handle is currently held.
    ///
    /// # Errors
    /// Propagates the error from `open_log_file`.
    fn ensure_open(&mut self) -> io::Result<()> {
        if self.current.is_none() {
            self.current = Some(open_log_file(&self.log_path())?);
        }
        Ok(())
    }

    /// Attempts one rotation: close the active file, shift the rotated copies,
    /// and start a fresh active file.
    ///
    /// On success the byte counter is reset and any degraded state is cleared.
    /// On failure the appender enters (or stays in) `RenameFailed`, the
    /// failure is reported, and the active file is re-opened in append mode so
    /// logging can continue.
    fn try_rotate(&mut self) {
        // The handle is dropped before any rename is attempted; the
        // Windows-only test in this file relies on that ordering.
        if let Some(mut f) = self.current.take() {
            let _ = f.flush();
        }

        let outcome = self
            .shift_rotated_files()
            .and_then(|()| open_log_file_fresh(&self.log_path()));

        match outcome {
            Ok(f) => {
                self.current = Some(f);
                self.current_bytes = 0;
                self.degraded = None;
                self.last_retry = None;
            }
            Err(e) => self.enter_rename_degraded(&e),
        }
    }

    /// Records a failed rotation and falls back to appending to the active
    /// file.
    ///
    /// Bumps the consecutive-failure counter, stamps `last_retry`, reports the
    /// error, then re-opens the active file on a best-effort basis.
    fn enter_rename_degraded(&mut self, err: &io::Error) {
        let attempt = match &self.degraded {
            Some(DegradedReason::RenameFailed { attempt, .. }) => attempt.saturating_add(1),
            Some(DegradedReason::OverflowBeyondLimit) | None => 1,
        };
        self.degraded = Some(DegradedReason::RenameFailed {
            io_kind: err.kind(),
            attempt,
        });
        self.last_retry = Some(Instant::now());
        report_degraded(&self.base_path, err);

        if self.ensure_open().is_ok() {
            // The handle we fell back to may not be the file the counter was
            // tracking: if the shift succeeded but the fresh open failed, the
            // old file now lives at `.1` and this handle points at a new, empty
            // file. Resync so the overflow check uses the real size.
            let actual_len = self
                .current
                .as_ref()
                .and_then(|f| f.metadata().ok())
                .map(|m| m.len());
            if let Some(len) = actual_len {
                self.current_bytes = len;
            }
        }
    }

    /// Makes room for a new rotation: deletes copy `keep` if present, renames
    /// each copy `i` to `i + 1` from oldest to newest, then renames the active
    /// file to copy `1`.
    ///
    /// # Errors
    /// Returns the first removal or rename error; earlier steps are not
    /// rolled back.
    fn shift_rotated_files(&self) -> io::Result<()> {
        let oldest = self.rotated_path(self.keep);
        if oldest.exists() {
            std::fs::remove_file(&oldest)?;
        }

        // Walk from the highest index down so a rename never lands on a copy
        // that has not been moved yet.
        for i in (1..self.keep).rev() {
            let src = self.rotated_path(i);
            if src.exists() {
                rename_file(&src, &self.rotated_path(i + 1))?;
            }
        }

        let log = self.log_path();
        if log.exists() {
            rename_file(&log, &self.rotated_path(1))?;
        }
        Ok(())
    }
}
/// Moves `src` to `dst`, replacing `dst` if it already exists.
///
/// On Windows this calls `MoveFileExW` with `MOVEFILE_REPLACE_EXISTING`; on
/// every other target it is `std::fs::rename`.
///
/// # Errors
/// Returns the underlying OS error when the move fails.
fn rename_file(src: &Path, dst: &Path) -> io::Result<()> {
    #[cfg(windows)]
    {
        use std::os::windows::ffi::OsStrExt;
        use windows_sys::Win32::Foundation::FALSE;
        use windows_sys::Win32::Storage::FileSystem::{MOVEFILE_REPLACE_EXISTING, MoveFileExW};

        // Win32 wide-string APIs expect NUL-terminated UTF-16.
        let to_wide = |p: &Path| -> Vec<u16> {
            p.as_os_str()
                .encode_wide()
                .chain(std::iter::once(0))
                .collect()
        };
        let from = to_wide(src);
        let to = to_wide(dst);

        // SAFETY: both pointers refer to NUL-terminated UTF-16 buffers that
        // stay alive for the duration of the call.
        let moved = unsafe { MoveFileExW(from.as_ptr(), to.as_ptr(), MOVEFILE_REPLACE_EXISTING) };
        if moved != FALSE {
            Ok(())
        } else {
            Err(io::Error::last_os_error())
        }
    }
    #[cfg(not(windows))]
    {
        std::fs::rename(src, dst)
    }
}
/// Opens `path` for appending, creating the file and any missing parent
/// directories first, then applies the log-file permissions to it.
///
/// # Errors
/// Returns the error from directory creation or from opening the file.
fn open_log_file(path: &Path) -> io::Result<File> {
    if let Some(dir) = path.parent() {
        std::fs::create_dir_all(dir)?;
    }

    let mut options = OpenOptions::new();
    options.create(true).append(true);

    let file = options.open(path)?;
    set_log_permissions(&file);
    Ok(file)
}
/// Opens `path` for writing and truncates it to zero length, creating the
/// file and any missing parent directories first, then applies the log-file
/// permissions to it.
///
/// # Errors
/// Returns the error from directory creation or from opening the file.
fn open_log_file_fresh(path: &Path) -> io::Result<File> {
    if let Some(dir) = path.parent() {
        std::fs::create_dir_all(dir)?;
    }

    let mut options = OpenOptions::new();
    options.create(true).write(true).truncate(true);

    let file = options.open(path)?;
    set_log_permissions(&file);
    Ok(file)
}
/// Restricts the log file to owner read/write (`0o600`) on Unix; does nothing
/// on other targets. Failures are ignored.
// `f` is unused on non-Unix targets, where the body is compiled out.
#[allow(unused_variables)]
fn set_log_permissions(f: &File) {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;

        let owner_only = std::fs::Permissions::from_mode(0o600);
        // Best effort: a file with looser permissions is still usable as a log.
        let _ = f.set_permissions(owner_only);
    }
}
/// Reports a rotation failure through two channels: a timestamped line
/// appended to the `<base_path>.rotate-errors.log` sidecar file, and a
/// `tracing` error event with target `sqryd.rotate`.
///
/// Sidecar I/O is best effort; if the sidecar cannot be opened or written the
/// tracing event is still emitted.
fn report_degraded(base_path: &Path, err: &io::Error) {
    use chrono::Utc;

    // Append the suffix to the full path so the sidecar sits next to the log.
    let mut sidecar_name = base_path.as_os_str().to_os_string();
    sidecar_name.push(".rotate-errors.log");
    let sidecar = PathBuf::from(sidecar_name);

    let ts = Utc::now().to_rfc3339();
    let line = format!("{ts} rotation failure: {err:?}\n");

    let opened = OpenOptions::new().create(true).append(true).open(&sidecar);
    if let Ok(mut f) = opened {
        set_log_permissions(&f);
        let _ = f.write_all(line.as_bytes());
        let _ = f.flush();
    }

    tracing::error!(target: "sqryd.rotate", "log rotation failure: {err:?}");
}
/// Size-based rolling log writer.
///
/// All state lives behind a mutex, so both `RollingSizeAppender` and
/// `&RollingSizeAppender` implement `Write` and every write or flush is
/// serialized through that lock.
pub struct RollingSizeAppender {
state: Mutex<RollingState>,
}
impl RollingSizeAppender {
    /// Creates an appender writing to `base_path`.
    ///
    /// The file is opened in append mode (created if missing) and its current
    /// length seeds the byte counter, falling back to 0 when metadata is
    /// unavailable. `keep` is clamped to `1..=100`.
    ///
    /// # Errors
    /// Returns the I/O error if the log file (or its parent directory) cannot
    /// be created or opened.
    pub fn new(base_path: PathBuf, max_bytes: u64, keep: u32) -> io::Result<Self> {
        let file = open_log_file(&base_path)?;
        let existing_len = match file.metadata() {
            Ok(meta) => meta.len(),
            Err(_) => 0,
        };

        let initial = RollingState {
            base_path,
            max_bytes,
            keep: keep.clamp(1, 100),
            current: Some(file),
            current_bytes: existing_len,
            degraded: None,
            last_retry: None,
        };
        Ok(Self {
            state: Mutex::new(initial),
        })
    }
}
impl Write for RollingSizeAppender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
write_impl(&self.state, buf)
}
fn flush(&mut self) -> io::Result<()> {
flush_impl(&self.state)
}
}
/// Shared-reference `Write` impl: the internal mutex provides the mutability,
/// so a plain `&RollingSizeAppender` can be written to.
impl Write for &RollingSizeAppender {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let appender: &RollingSizeAppender = *self;
        write_impl(&appender.state, buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        let appender: &RollingSizeAppender = *self;
        flush_impl(&appender.state)
    }
}
/// Appends `buf` to the active log file, rotating first when the write would
/// push the file past `max_bytes`.
///
/// Degraded-mode behaviour:
/// * In any degraded state, rotation is retried once `RETRY_INTERVAL` has
///   elapsed since the last failed attempt. This includes
///   `OverflowBeyondLimit`, which otherwise could never be left.
/// * While in `RenameFailed`, writes keep going to the un-rotated file until
///   one would exceed `max_bytes * OVERFLOW_FACTOR`; that write switches the
///   state to `OverflowBeyondLimit` and is refused.
/// * While in `OverflowBeyondLimit`, writes are refused until a retry
///   succeeds. After a failed retry the state is `RenameFailed` again, so a
///   write that still fits under the hard cap is accepted.
///
/// # Returns
/// The number of bytes written, as reported by the underlying file.
///
/// # Errors
/// * `StorageFull` when the write is refused because of the overflow cap.
/// * `BrokenPipe` if no file handle is present after a successful
///   `ensure_open`.
/// * Any I/O error from re-opening or writing the file.
fn write_impl(state: &Mutex<RollingState>, buf: &[u8]) -> io::Result<usize> {
    let mut s = state.lock();
    let len = buf.len() as u64;

    // Paced retry for every degraded state. Checking this before the
    // overflow refusal is what allows recovery from `OverflowBeyondLimit`.
    if s.degraded.is_some() {
        let retry_due = s
            .last_retry
            .map(|t| t.elapsed() >= RETRY_INTERVAL)
            .unwrap_or(true);
        if retry_due {
            s.try_rotate();
        }
    }

    if matches!(s.degraded, Some(DegradedReason::OverflowBeyondLimit)) {
        return Err(io::Error::new(
            io::ErrorKind::StorageFull,
            "log rotation in overflow-beyond-limit degraded state; writes refused",
        ));
    }

    // Normal path: rotate before the write that would cross the threshold.
    if s.degraded.is_none() && s.current_bytes.saturating_add(len) > s.max_bytes {
        s.try_rotate();
    }

    // Rotation is failing: tolerate growth only up to the hard cap.
    if matches!(s.degraded, Some(DegradedReason::RenameFailed { .. }))
        && s.current_bytes.saturating_add(len) > s.max_bytes.saturating_mul(OVERFLOW_FACTOR)
    {
        s.degraded = Some(DegradedReason::OverflowBeyondLimit);
        return Err(io::Error::new(
            io::ErrorKind::StorageFull,
            "log file exceeded overflow limit; writes refused until rotation succeeds",
        ));
    }

    s.ensure_open()?;
    let file = s.current.as_mut().ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::BrokenPipe,
            "log file handle unexpectedly absent after ensure_open",
        )
    })?;
    let n = file.write(buf)?;
    s.current_bytes += n as u64;
    Ok(n)
}
/// Flushes the active log file if one is open; succeeds trivially when no
/// handle is held.
///
/// # Errors
/// Returns the error from flushing the file.
fn flush_impl(state: &Mutex<RollingState>) -> io::Result<()> {
    let mut guard = state.lock();
    match guard.current.as_mut() {
        Some(file) => file.flush(),
        None => Ok(()),
    }
}
pub fn install_tracing(
cfg: &DaemonConfig,
cli_level: Option<&str>,
) -> crate::DaemonResult<Option<WorkerGuard>> {
let filter_str = cli_level
.map(ToOwned::to_owned)
.or_else(|| std::env::var("SQRY_DAEMON_LOG_LEVEL").ok())
.unwrap_or_else(|| cfg.log_level.clone());
let under_systemd = crate::lifecycle::notify::is_under_systemd();
let resolved_log_path = cfg.log_file.resolve();
if let (false, Some(log_path)) = (under_systemd, resolved_log_path) {
let max_bytes = cfg.log_max_size_mb.saturating_mul(1024 * 1024);
let keep = cfg.log_keep_rotations;
let appender = match RollingSizeAppender::new(log_path.clone(), max_bytes, keep) {
Ok(a) => a,
Err(e) => {
eprintln!(
"sqryd: WARN: cannot open log file {} ({e}); falling back to stderr-only logging. \
Set `log_file` in daemon.toml or `SQRY_DAEMON_LOG_FILE` to a writable path \
(or `log_file = \"stderr\"` to silence this warning).",
log_path.display(),
);
let filter =
EnvFilter::try_new(&filter_str).unwrap_or_else(|_| EnvFilter::new("info"));
tracing_subscriber::registry()
.with(
fmt::layer()
.compact()
.with_writer(io::stderr)
.with_filter(filter),
)
.try_init()
.map_err(|e| {
crate::DaemonError::Internal(anyhow::anyhow!(
"global tracing/log subscriber already installed: {e}"
))
})?;
return Ok(None);
}
};
let (non_blocking, guard) = tracing_appender::non_blocking(appender);
let rolling_filter =
EnvFilter::try_new(format!("{filter_str},not[{{target=sqryd.rotate}}]"))
.unwrap_or_else(|_| EnvFilter::new("info,not[{target=sqryd.rotate}]"));
let rotate_stderr_filter = EnvFilter::new("sqryd.rotate=error");
tracing_subscriber::registry()
.with(
fmt::layer()
.json()
.with_writer(non_blocking)
.with_filter(rolling_filter),
)
.with(
fmt::layer()
.compact()
.with_writer(io::stderr)
.with_filter(rotate_stderr_filter),
)
.try_init()
.map_err(|e| {
crate::DaemonError::Internal(anyhow::anyhow!(
"global tracing/log subscriber already installed: {e}"
))
})?;
return Ok(Some(guard));
}
let filter = EnvFilter::try_new(&filter_str).unwrap_or_else(|_| EnvFilter::new("info"));
tracing_subscriber::registry()
.with(
fmt::layer()
.compact()
.with_writer(io::stderr)
.with_filter(filter),
)
.try_init()
.map_err(|e| {
crate::DaemonError::Internal(anyhow::anyhow!(
"global tracing/log subscriber already installed: {e}"
))
})?;
Ok(None)
}
// Unit tests for the rolling appender and for the systemd gate in
// `install_tracing`. Several tests make the temp directory read-only to force
// rotation failures, so the order of statements inside them matters.
#[cfg(test)]
mod tests {
use super::*;
use crate::lifecycle::test_support::NotifySocketGuard;
use std::sync::{Arc, Barrier};
use std::thread;
use tempfile::TempDir;
// Builds an appender writing to `<dir>/sqryd.log`.
fn make_appender(dir: &TempDir, max_bytes: u64, keep: u32) -> RollingSizeAppender {
let path = dir.path().join("sqryd.log");
RollingSizeAppender::new(path, max_bytes, keep)
.expect("RollingSizeAppender::new must succeed in a tempdir")
}
// 60 bytes fit under the 64-byte limit; the next 10 bytes would exceed it, so
// the first 60 must end up in `.1` and the active file must be re-created.
#[test]
fn rotates_on_size_exceeded() {
let dir = TempDir::new().unwrap();
let log_path = dir.path().join("sqryd.log");
let max_bytes = 64_u64;
let mut appender = make_appender(&dir, max_bytes, 3);
let chunk = vec![b'A'; 60];
appender.write_all(&chunk).unwrap();
appender.flush().unwrap();
assert!(
log_path.exists(),
"active log file must exist after writing"
);
let overflow = vec![b'B'; 10];
appender.write_all(&overflow).unwrap();
appender.flush().unwrap();
let rotated = dir.path().join("sqryd.log.1");
assert!(
rotated.exists(),
"rotated .1 file must exist after rotation"
);
assert!(
log_path.exists(),
"active log file must be re-created after rotation"
);
let content = std::fs::read(&rotated).unwrap();
assert_eq!(&content, &chunk, ".1 must contain the pre-rotation bytes");
}
// Every 40-byte write exceeds the 32-byte limit, so each write rotates; after
// `keep + 2` writes no copy with an index above `keep` may remain.
#[test]
fn keep_limit_enforced() {
let dir = TempDir::new().unwrap();
let max_bytes = 32_u64;
let keep = 3_u32;
let mut appender = make_appender(&dir, max_bytes, keep);
for i in 0..(keep + 2) {
let chunk = vec![b'0' + (i % 10) as u8; 40]; appender.write_all(&chunk).unwrap();
appender.flush().unwrap();
}
let rotated_count = (1..=keep + 2)
.filter(|n| dir.path().join(format!("sqryd.log.{n}")).exists())
.count();
assert!(
rotated_count <= keep as usize,
"expected at most {keep} rotated copies, found {rotated_count}"
);
let too_old = dir.path().join(format!("sqryd.log.{}", keep + 1));
assert!(
!too_old.exists(),
"rotated copy beyond keep limit must be deleted"
);
}
// Makes the directory read-only (0o555) so the rotation triggered by the
// second write fails, then restores permissions before asserting so the temp
// directory can be cleaned up.
// NOTE(review): presumably this cannot fail the rename when run as a user that
// bypasses directory permissions (e.g. root) — confirm for CI.
#[test]
#[cfg(unix)] fn rename_failure_enters_degraded_mode_not_panic() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
let dir = TempDir::new().unwrap();
let log_path = dir.path().join("sqryd.log");
let max_bytes = 32_u64;
let mut appender = make_appender(&dir, max_bytes, 3);
let chunk = vec![b'X'; 30];
appender.write_all(&chunk).unwrap();
fs::set_permissions(dir.path(), fs::Permissions::from_mode(0o555)).unwrap();
let trigger = vec![b'Y'; 10];
let _ = appender.write_all(&trigger);
fs::set_permissions(dir.path(), fs::Permissions::from_mode(0o755)).unwrap();
assert!(
log_path.exists(),
"log file must still exist after failed rotation"
);
{
let state = appender.state.lock();
assert!(
matches!(state.degraded, Some(DegradedReason::RenameFailed { .. })),
"appender must be in RenameFailed degraded mode after rotation failure, \
got: {:?}",
state.degraded
);
}
}
// With rotation blocked, keeps writing until the file would pass
// `max_bytes * OVERFLOW_FACTOR` (64 bytes here); the final write is expected
// to be refused with `StorageFull`.
#[test]
#[cfg(unix)]
fn overflow_beyond_limit_returns_storage_full() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
let dir = TempDir::new().unwrap();
let max_bytes = 32_u64;
let mut appender = make_appender(&dir, max_bytes, 3);
appender.write_all(&[b'A'; 30]).unwrap();
fs::set_permissions(dir.path(), fs::Permissions::from_mode(0o555)).unwrap();
let _ = appender.write_all(&[b'B'; 10]);
for _ in 0..5 {
let _ = appender.write_all(&[b'C'; 10]);
}
let result = appender.write(&[b'D'; 1]);
fs::set_permissions(dir.path(), fs::Permissions::from_mode(0o755)).unwrap();
// NOTE(review): `Ok(_)` is accepted silently — presumably for environments
// where the read-only directory does not block the rename; confirm.
match result {
Err(e) if e.kind() == io::ErrorKind::StorageFull => { }
Err(e) => panic!("expected StorageFull, got {:?}", e),
Ok(_) => {
}
}
}
// Forces a rotation failure, restores permissions, then back-dates
// `last_retry` past `RETRY_INTERVAL` so the next write retries the rotation
// and must clear the degraded state.
#[test]
#[cfg(unix)]
fn degraded_recovers_via_retry_after_interval() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
let dir = TempDir::new().unwrap();
let max_bytes = 32_u64;
let mut appender = make_appender(&dir, max_bytes, 3);
appender.write_all(&[b'A'; 30]).unwrap();
fs::set_permissions(dir.path(), fs::Permissions::from_mode(0o555)).unwrap();
let _ = appender.write_all(&[b'B'; 10]);
fs::set_permissions(dir.path(), fs::Permissions::from_mode(0o755)).unwrap();
{
let state = appender.state.lock();
assert!(
matches!(state.degraded, Some(DegradedReason::RenameFailed { .. })),
"appender must be in RenameFailed degraded mode"
);
}
{
let mut state = appender.state.lock();
state.last_retry = Some(Instant::now() - RETRY_INTERVAL - Duration::from_secs(1));
}
appender.write_all(&[b'C'; 5]).unwrap();
let state = appender.state.lock();
assert!(
state.degraded.is_none(),
"appender must leave degraded mode after successful retry"
);
}
// Windows only: a single oversized write must rotate successfully, which
// requires the active handle to be closed before the rename.
#[test]
#[cfg(windows)]
fn windows_rename_after_explicit_drop_succeeds() {
let dir = TempDir::new().unwrap();
let log_path = dir.path().join("sqryd.log");
let max_bytes = 32_u64;
let mut appender = make_appender(&dir, max_bytes, 2);
appender.write_all(&[b'W'; 40]).unwrap();
appender.flush().unwrap();
assert!(
dir.path().join("sqryd.log.1").exists(),
"rotated .1 must exist on Windows after M3 drop-before-rename"
);
assert!(
log_path.exists(),
"active log must be re-created on Windows"
);
}
// Platform-independent counterpart of the Windows test: after a rotation the
// `.1` copy exists, the active file exists, and a handle is held again.
#[test]
fn handle_closed_before_rename_m3_sequence_verified() {
let dir = TempDir::new().unwrap();
let log_path = dir.path().join("sqryd.log");
let max_bytes = 32_u64;
let mut appender = make_appender(&dir, max_bytes, 2);
appender.write_all(&[b'M'; 40]).unwrap();
appender.flush().unwrap();
assert!(
dir.path().join("sqryd.log.1").exists(),
"M3: rotated .1 must exist — rename must have succeeded after handle was closed"
);
assert!(
log_path.exists(),
"M3: active log must be re-created after rotation"
);
{
let state = appender.state.lock();
assert!(
state.current.is_some(),
"M3: current file handle must be Some after successful rotation \
(drop-before-rename + reopen sequence)"
);
}
}
// Eight threads each write 1 MiB in 4 KiB chunks through `&RollingSizeAppender`.
// 8 MiB at 512 KiB per file is 16 rotations, below `keep = 20`, so nothing is
// deleted and the bytes on disk must add up to exactly what was written.
#[test]
fn concurrent_writes_serialized() {
const NUM_THREADS: usize = 8;
const BYTES_PER_THREAD: usize = 1024 * 1024;
let dir = TempDir::new().unwrap();
let appender = Arc::new(
RollingSizeAppender::new(
dir.path().join("sqryd.log"),
512 * 1024, 20,
)
.unwrap(),
);
let barrier = Arc::new(Barrier::new(NUM_THREADS));
let mut handles = Vec::with_capacity(NUM_THREADS);
for tid in 0..NUM_THREADS {
let app = Arc::clone(&appender);
let bar = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
bar.wait(); let payload = vec![b'0' + tid as u8; BYTES_PER_THREAD];
for chunk in payload.chunks(4096) {
(&*app).write_all(chunk).unwrap();
}
}));
}
for h in handles {
h.join().expect("worker thread must not panic");
}
(&*appender).flush().unwrap();
// Sum the sizes of the active file and all rotated copies, skipping the
// rotation-error sidecar.
let total_bytes: u64 = std::fs::read_dir(dir.path())
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| {
e.path()
.file_name()
.and_then(|n| n.to_str())
.map(|n| n.starts_with("sqryd.log") && !n.ends_with("rotate-errors.log"))
.unwrap_or(false)
})
.filter_map(|e| e.metadata().ok().map(|m| m.len()))
.sum();
let expected = (NUM_THREADS * BYTES_PER_THREAD) as u64;
assert_eq!(
total_bytes, expected,
"total bytes on disk ({total_bytes}) must equal total bytes written ({expected})"
);
}
// With the notify-socket guard set, `install_tracing` must take the
// stderr-only path: no `WorkerGuard` is returned and no log file is created.
#[test]
fn notify_socket_set_skips_rolling_appender() {
let dir = TempDir::new().unwrap();
let log_path = dir.path().join("sqryd.log");
let _guard = NotifySocketGuard::set("/run/systemd/notify.sock");
assert!(
crate::lifecycle::notify::is_under_systemd(),
"is_under_systemd must return true when NOTIFY_SOCKET is set (gate for \
RollingSizeAppender skip in install_tracing)"
);
assert!(
!log_path.exists(),
"pre-condition: log file must not exist before install_tracing is called"
);
let cfg = crate::config::DaemonConfig {
log_file: crate::config::LogFileSetting::Path(log_path.clone()),
..crate::config::DaemonConfig::default()
};
let result = install_tracing(&cfg, None);
// `Err` is tolerated here as well as `Ok(None)`: only `Ok(Some(_))` would show
// that the rolling appender was installed.
assert!(
!matches!(result, Ok(Some(_))),
"install_tracing must not return Ok(Some(WorkerGuard)) under systemd — \
rolling appender must be skipped when NOTIFY_SOCKET is set; \
log_file was {:?}",
cfg.log_file.resolve()
);
assert!(
!log_path.exists(),
"log file must NOT be created when install_tracing goes through the \
stderr-fallback path under systemd supervision (NOTIFY_SOCKET set); \
file existence would prove RollingSizeAppender was instantiated"
);
}
}