#![cfg(all(unix, feature = "daemon"))]
use std::path::PathBuf;
use std::process::Stdio;
use crate::error::SchedulerError;
use crate::pidfile::PidFile;
use crate::scheduler::Scheduler;
/// Settings shared by the daemon entry points in this module.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Path of the PID file: acquired by `run_foreground`, read by
    /// `daemon_status` and `stop_daemon`.
    pub pid_file: PathBuf,
    /// Path of the log file that `detach_and_run` opens in append mode and
    /// wires to the child's stdout and stderr.
    pub log_file: PathBuf,
    /// When `true`, `run_foreground` calls `Scheduler::catch_up_missed` after
    /// initialisation and before entering the run loop.
    pub catch_up: bool,
    /// First argument passed to `Scheduler::run_with_interval_and_grace`
    /// (presumably the tick interval in seconds).
    pub tick_secs: u64,
    /// Second argument passed to `Scheduler::run_with_interval_and_grace`
    /// (presumably the shutdown grace period in seconds).
    pub shutdown_grace_secs: u64,
}
/// Serializable snapshot of the daemon state, as assembled by `daemon_status`.
#[derive(Debug, serde::Serialize)]
pub struct DaemonStatus {
    /// `true` when a live PID could be read from the PID file.
    pub running: bool,
    /// PID read from the PID file, or `None` when no live daemon was found.
    pub pid: Option<u32>,
    /// PID file path, copied from the configuration.
    pub pid_file: PathBuf,
    /// Log file path, copied from the configuration.
    pub log_file: PathBuf,
    /// Total number of jobs returned by the job store.
    pub task_count: usize,
    /// Summaries for at most the requested number of jobs.
    pub recent_runs: Vec<TaskRunSummary>,
}
/// Serializable per-task entry of [`DaemonStatus::recent_runs`].
#[derive(Debug, serde::Serialize)]
pub struct TaskRunSummary {
    /// Job name as stored in the job store.
    pub name: String,
    /// Task mode of the job as stored in the job store.
    pub mode: String,
    /// Last run of the task, as a string.
    ///
    /// NOTE(review): `daemon_status` currently always sets this to an empty
    /// string.
    pub last_run: String,
    /// Next run of the task, taken verbatim from the job store.
    pub next_run: String,
}
/// Runs the scheduler in the current process until its run loop returns.
///
/// Steps, in order:
/// 1. best-effort `setsid()` (the result is deliberately ignored);
/// 2. acquire the PID file at `cfg.pid_file` and keep the guard for the whole
///    run;
/// 3. initialise the scheduler and, when `cfg.catch_up` is set, call
///    `catch_up_missed`;
/// 4. run the scheduler loop with `cfg.tick_secs` / `cfg.shutdown_grace_secs`.
///
/// All of the work is executed inside the `scheduler.daemon.start` span. The
/// span is attached with [`tracing::Instrument`] rather than an `entered()`
/// guard: a guard held across `.await` points produces incorrect traces and
/// makes the returned future `!Send`.
///
/// # Errors
///
/// Returns an error when the PID file cannot be acquired, or when scheduler
/// initialisation or the catch-up pass fails.
pub async fn run_foreground(
    mut scheduler: Scheduler,
    cfg: &DaemonConfig,
) -> Result<(), SchedulerError> {
    let span = tracing::info_span!(
        "scheduler.daemon.start",
        pid_file = %cfg.pid_file.display(),
        detached = false,
    );

    let daemon = async move {
        // Best effort: failure to start a new session must not prevent the
        // daemon from running, so the result is intentionally discarded.
        let _ = rustix::process::setsid();

        // Keep the guard bound until the end of the block so the PID file
        // stays held for the whole lifetime of the run loop.
        let _pidfile = PidFile::acquire(&cfg.pid_file)?;
        tracing::info!(
            pid = std::process::id(),
            pid_file = %cfg.pid_file.display(),
            "scheduler daemon started (foreground)"
        );

        scheduler.init().await?;
        if cfg.catch_up {
            scheduler.catch_up_missed().await?;
        }
        scheduler
            .run_with_interval_and_grace(cfg.tick_secs, cfg.shutdown_grace_secs)
            .await;

        tracing::info!("scheduler daemon stopped");
        Ok::<(), SchedulerError>(())
    };

    // Fully qualified so that no additional `use` is required in this file.
    tracing::Instrument::instrument(daemon, span).await
}
/// Re-executes the current binary as a background child and terminates the
/// calling process.
///
/// The child is started with `extra_args`, its stdin connected to the null
/// device, and both stdout and stderr appended to `cfg.log_file` (the parent
/// directory of the log file is created first when it is non-empty).
///
/// On success this function does **not** return: it ends the current process
/// with `std::process::exit(0)` once the child has been spawned.
///
/// # Errors
///
/// Returns [`SchedulerError::Detach`] when the log directory cannot be
/// created, the log file cannot be opened or duplicated, the current
/// executable cannot be resolved, or the child cannot be spawned.
pub fn detach_and_run(cfg: &DaemonConfig, extra_args: &[&str]) -> Result<(), SchedulerError> {
    let _span = tracing::info_span!(
        "scheduler.daemon.start",
        pid_file = %cfg.pid_file.display(),
        detached = true,
    )
    .entered();

    // A log path without a directory component has an empty parent; there is
    // nothing to create in that case.
    match cfg.log_file.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => {
            if let Err(e) = std::fs::create_dir_all(dir) {
                return Err(SchedulerError::Detach(format!(
                    "failed to create log directory {}: {e}",
                    dir.display()
                )));
            }
        }
        _ => {}
    }

    let mut open_opts = std::fs::OpenOptions::new();
    open_opts.create(true).append(true);
    let stdout_sink = match open_opts.open(&cfg.log_file) {
        Ok(file) => file,
        Err(e) => {
            return Err(SchedulerError::Detach(format!(
                "failed to open log file {}: {e}",
                cfg.log_file.display()
            )));
        }
    };

    let exe = match std::env::current_exe() {
        Ok(path) => path,
        Err(e) => {
            return Err(SchedulerError::Detach(format!(
                "failed to resolve current exe: {e}"
            )));
        }
    };

    // stdout and stderr each need their own handle to the same log file.
    let stderr_sink = match stdout_sink.try_clone() {
        Ok(file) => file,
        Err(e) => {
            return Err(SchedulerError::Detach(format!(
                "failed to clone log fd: {e}"
            )));
        }
    };

    let mut command = std::process::Command::new(&exe);
    command
        .args(extra_args)
        .stdin(Stdio::null())
        .stdout(stdout_sink)
        .stderr(stderr_sink);
    let child = match command.spawn() {
        Ok(child) => child,
        Err(e) => {
            return Err(SchedulerError::Detach(format!(
                "failed to spawn daemon child process: {e}"
            )));
        }
    };

    tracing::info!(
        child_pid = child.id(),
        exe = %exe.display(),
        log_file = %cfg.log_file.display(),
        "scheduler daemon detached"
    );

    // The parent's job is done; the child keeps running on its own.
    std::process::exit(0);
}
/// Collects a [`DaemonStatus`] snapshot from the PID file and the job store.
///
/// The PID file at `cfg.pid_file` is consulted first; afterwards the job store
/// at `store_url` is opened, initialised and listed. `task_count` reflects all
/// listed jobs, while `recent_runs` contains at most `recent_n` of them.
///
/// # Errors
///
/// Propagates any error from opening, initialising or listing the job store.
pub async fn daemon_status(
    cfg: &DaemonConfig,
    store_url: &str,
    recent_n: usize,
) -> Result<DaemonStatus, SchedulerError> {
    let live_pid = PidFile::read_alive(&cfg.pid_file);

    let job_store = crate::store::JobStore::open(store_url).await?;
    job_store.init().await?;
    let all_jobs = job_store.list_jobs_full().await?;
    let task_count = all_jobs.len();

    // NOTE(review): this keeps the first `recent_n` jobs in whatever order the
    // store returns them, and `last_run` is always left empty here.
    let mut recent_runs: Vec<TaskRunSummary> = Vec::new();
    for job in all_jobs.into_iter().take(recent_n) {
        recent_runs.push(TaskRunSummary {
            name: job.name,
            mode: job.task_mode,
            last_run: String::new(),
            next_run: job.next_run,
        });
    }

    Ok(DaemonStatus {
        running: live_pid.is_some(),
        pid: live_pid,
        pid_file: cfg.pid_file.clone(),
        log_file: cfg.log_file.clone(),
        task_count,
        recent_runs,
    })
}
/// Stops the daemon recorded in `cfg.pid_file`.
///
/// Sends `SIGTERM`, then polls every 200 ms until the process is gone or
/// `timeout_secs` have elapsed. If the process is still alive at the deadline
/// it is sent `SIGKILL`; the function does not wait for the kill to take
/// effect.
///
/// A process that disappears between the liveness check and the signal being
/// sent (`kill` fails with `ESRCH`) is treated as successfully stopped rather
/// than reported as an error.
///
/// This function blocks the calling thread (`std::thread::sleep`).
///
/// # Errors
///
/// Returns [`SchedulerError::Io`] when no live daemon is found via the PID
/// file, when the recorded PID is not a valid process id, or when sending a
/// signal fails for a reason other than the process no longer existing.
pub fn stop_daemon(cfg: &DaemonConfig, timeout_secs: u64) -> Result<(), SchedulerError> {
    let Some(pid) = PidFile::read_alive(&cfg.pid_file) else {
        return Err(SchedulerError::Io(format!(
            "no running daemon found (pid file: {})",
            cfg.pid_file.display()
        )));
    };

    // `Pid::from_raw` rejects non-positive values, so a corrupt PID can never
    // be turned into a process-group or broadcast signal.
    let rustix_pid = rustix::process::Pid::from_raw(pid.cast_signed())
        .ok_or_else(|| SchedulerError::Io(format!("invalid pid {pid} in pid file")))?;

    match rustix::process::kill_process(rustix_pid, rustix::process::Signal::TERM) {
        Ok(()) => {}
        // The process exited after the PID file was read: nothing left to stop.
        Err(e) if e == rustix::io::Errno::SRCH => {
            tracing::info!(pid, "daemon stopped");
            return Ok(());
        }
        Err(e) => {
            return Err(SchedulerError::Io(format!(
                "failed to send SIGTERM to pid {pid}: {e}"
            )));
        }
    }
    tracing::info!(pid, "SIGTERM sent to scheduler daemon");

    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
    loop {
        std::thread::sleep(std::time::Duration::from_millis(200));
        if !crate::pidfile::is_process_alive(pid) {
            tracing::info!(pid, "daemon stopped");
            return Ok(());
        }
        if std::time::Instant::now() >= deadline {
            break;
        }
    }

    tracing::warn!(
        pid,
        "daemon did not stop within {timeout_secs}s — sending SIGKILL"
    );
    match rustix::process::kill_process(rustix_pid, rustix::process::Signal::KILL) {
        Ok(()) => Ok(()),
        // The process exited between the last liveness check and the kill.
        Err(e) if e == rustix::io::Errno::SRCH => {
            tracing::info!(pid, "daemon stopped");
            Ok(())
        }
        Err(e) => Err(SchedulerError::Io(format!(
            "failed to send SIGKILL to pid {pid}: {e}"
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::DaemonConfig;
    use std::path::PathBuf;

    /// Builds the fixture configuration shared by the tests in this module.
    fn test_cfg() -> DaemonConfig {
        let pid_file = PathBuf::from("/tmp/zeph-test.pid");
        let log_file = PathBuf::from("/tmp/zeph-test.log");
        DaemonConfig {
            pid_file,
            log_file,
            catch_up: true,
            tick_secs: 60,
            shutdown_grace_secs: 30,
        }
    }

    /// A cloned configuration carries the same timing values as its source.
    #[test]
    fn daemon_config_clone() {
        let original = test_cfg();
        let copy = original.clone();
        assert_eq!(original.tick_secs, copy.tick_secs);
        assert_eq!(original.shutdown_grace_secs, copy.shutdown_grace_secs);
    }

    /// The fixture's timing values stay above the minimal sensible bounds.
    #[test]
    fn daemon_config_defaults_reasonable() {
        let DaemonConfig {
            tick_secs,
            shutdown_grace_secs,
            ..
        } = test_cfg();
        assert!(tick_secs >= 5);
        assert!(shutdown_grace_secs >= 1);
    }
}