#![allow(unsafe_code)]
use std::fs;
use std::path::PathBuf;
#[cfg(windows)]
use std::process::Command;
use std::time::Duration;
use crate::cloud::client::CloudClient;
use crate::cloud::outbox::{DEFAULT_STALE_SECONDS, OutboxQueue, drain_outbox};
use crate::db::init_db;
use crate::paths;
/// Returns the location of the daemon's pid file: `daemon.pid` directly
/// inside the directory reported by `paths::data_home`.
///
/// # Errors
/// Propagates any error produced by `paths::data_home`.
pub fn pid_path() -> Result<PathBuf, String> {
    let home = paths::data_home()?;
    Ok(home.join("daemon.pid"))
}
/// What the pid file says about the daemon, as computed by `status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DaemonStatus {
    /// The pid file names a process that is currently alive.
    Running { pid: i32 },
    /// The pid file names a process that is no longer alive.
    Stale { pid: i32 },
    /// No pid could be read from the pid file.
    NotRunning,
}

impl DaemonStatus {
    /// Renders the status as a short, single-line, human-readable string.
    pub fn short(&self) -> String {
        match *self {
            Self::NotRunning => String::from("not running"),
            Self::Stale { pid } => format!("stale pid file (pid {pid}); not running"),
            Self::Running { pid } => format!("running (pid {pid})"),
        }
    }
}
/// Reports whether a daemon appears to be running, based on the pid file.
///
/// Returns [`DaemonStatus::NotRunning`] when the pid file path cannot be
/// resolved or no pid can be read from it. Otherwise the recorded pid is
/// probed: a live process yields [`DaemonStatus::Running`], a dead one
/// yields [`DaemonStatus::Stale`].
pub fn status() -> DaemonStatus {
    let recorded = pid_path().ok().and_then(|file| read_pid(&file));
    match recorded {
        None => DaemonStatus::NotRunning,
        Some(pid) if is_process_alive(pid) => DaemonStatus::Running { pid },
        Some(pid) => DaemonStatus::Stale { pid },
    }
}
/// Reads the process id recorded in the pid file at `path`.
///
/// Returns `None` when the file cannot be read, when its trimmed contents do
/// not parse as an `i32`, or when the parsed value is not strictly positive.
///
/// Non-positive values are rejected because the result is later handed to
/// the signalling helpers. On Unix, `kill(2)` interprets pid `0` as "the
/// caller's process group" and negative pids as a process group or "every
/// process the caller may signal", so a corrupt pid file must never be
/// allowed to produce such a value.
fn read_pid(path: &std::path::Path) -> Option<i32> {
    let raw = fs::read_to_string(path).ok()?;
    raw.trim().parse::<i32>().ok().filter(|pid| *pid > 0)
}
/// Writes `pid` in decimal to the file at `path`, replacing any previous
/// contents and creating missing parent directories first.
///
/// # Errors
/// Returns a `create parent: …` message if the parent directory cannot be
/// created, or a `write pid: …` message if the file cannot be written.
fn write_pid(path: &std::path::Path, pid: i32) -> Result<(), String> {
    let parent_ready = match path.parent() {
        Some(dir) => fs::create_dir_all(dir),
        None => Ok(()),
    };
    parent_ready.map_err(|err| format!("create parent: {err}"))?;

    let contents = pid.to_string();
    fs::write(path, contents).map_err(|err| format!("write pid: {err}"))
}
/// Deletes the pid file at `path` on a best-effort basis.
///
/// Any failure (including the file not existing) is intentionally ignored.
fn remove_pid_file(path: &std::path::Path) {
    drop(fs::remove_file(path));
}
/// Reports whether a process with id `pid` exists and can be signalled by
/// the current user (Unix implementation).
///
/// The probe is `kill(pid, 0)`: signal `0` delivers nothing and only performs
/// the existence and permission checks. Any failure — including lacking
/// permission to signal the target — is reported as "not alive".
///
/// Non-positive pids always yield `false`. `kill(2)` gives them process-group
/// or broadcast meaning (`kill(0, 0)` succeeds for the caller's own group),
/// so they can never identify a single daemon process.
#[cfg(unix)]
fn is_process_alive(pid: i32) -> bool {
    if pid <= 0 {
        return false;
    }
    // SAFETY: `kill` takes two plain integers and dereferences no memory;
    // with signal 0 it has no effect on the target process.
    unsafe { libc::kill(pid, 0) == 0 }
}
/// Reports whether a process with id `pid` shows up in `tasklist`
/// (Windows implementation).
///
/// Runs `tasklist /FI "PID eq <pid>" /FO CSV /NH` and looks for the pid in
/// the output, either as a quoted field (`"<pid>"`) or as a bare field
/// between commas (`,<pid>,`). If `tasklist` cannot be launched or exits
/// unsuccessfully, the process is reported as not alive.
#[cfg(windows)]
fn is_process_alive(pid: i32) -> bool {
    let filter = format!("PID eq {pid}");
    let output = match Command::new("tasklist")
        .args(["/FI", filter.as_str(), "/FO", "CSV", "/NH"])
        .output()
    {
        Ok(output) if output.status.success() => output,
        _ => return false,
    };

    let listing = String::from_utf8_lossy(&output.stdout);
    let quoted_field = format!("\"{pid}\"");
    let bare_field = format!(",{pid},");
    listing.contains(quoted_field.as_str()) || listing.contains(bare_field.as_str())
}
/// Requests termination of `pid` by sending it `SIGTERM` (Unix implementation).
///
/// # Errors
/// Returns whatever error `send_signal` reports.
#[cfg(unix)]
fn send_term(pid: i32) -> std::io::Result<()> {
    let signum = libc::SIGTERM;
    send_signal(pid, signum)
}
/// Forcibly ends `pid` by sending it `SIGKILL` (Unix implementation).
///
/// # Errors
/// Returns whatever error `send_signal` reports.
#[cfg(unix)]
fn send_kill(pid: i32) -> std::io::Result<()> {
    let signum = libc::SIGKILL;
    send_signal(pid, signum)
}
/// Sends signal `signum` to the single process identified by `pid`.
///
/// # Errors
/// * `InvalidInput` when `pid` is not strictly positive. `kill(2)` gives pid
///   `0` and negative pids process-group / broadcast semantics, so passing
///   one through could signal the caller's own process group or every
///   process the user owns; this helper is only ever meant to target one
///   process.
/// * The OS error reported by `kill(2)` when the call itself fails.
#[cfg(unix)]
fn send_signal(pid: i32, signum: libc::c_int) -> std::io::Result<()> {
    if pid <= 0 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("refusing to signal non-positive pid {pid}"),
        ));
    }
    // SAFETY: `kill` takes two plain integers and dereferences no memory.
    let rc = unsafe { libc::kill(pid, signum) };
    if rc == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}
/// Requests termination of `pid` by running `taskkill /PID <pid>`
/// (Windows implementation, no `/F`).
///
/// # Errors
/// Returns the spawn error if `taskkill` cannot be run, or an error carrying
/// the exit status if `taskkill` exits unsuccessfully.
#[cfg(windows)]
fn send_term(pid: i32) -> std::io::Result<()> {
    let pid_arg = pid.to_string();
    let status = Command::new("taskkill")
        .arg("/PID")
        .arg(&pid_arg)
        .status()?;
    if !status.success() {
        return Err(std::io::Error::other(format!(
            "taskkill exited with {status}"
        )));
    }
    Ok(())
}
/// Forcibly ends `pid` by running `taskkill /PID <pid> /F`
/// (Windows implementation).
///
/// # Errors
/// Returns the spawn error if `taskkill` cannot be run, or an error carrying
/// the exit status if `taskkill` exits unsuccessfully.
#[cfg(windows)]
fn send_kill(pid: i32) -> std::io::Result<()> {
    let pid_arg = pid.to_string();
    let status = Command::new("taskkill")
        .arg("/PID")
        .arg(&pid_arg)
        .arg("/F")
        .status()?;
    if !status.success() {
        return Err(std::io::Error::other(format!(
            "taskkill /F exited with {status}"
        )));
    }
    Ok(())
}
/// Stops the daemon recorded in the pid file, escalating from a graceful
/// termination request to a forced kill.
///
/// Steps:
/// 1. No readable pid in the pid file → [`StopOutcome::NotRunning`].
/// 2. Recorded process is not alive → the pid file is removed and
///    [`StopOutcome::StaleCleaned`] is returned; nothing is signalled.
/// 3. Otherwise a termination request is sent and the process is polled
///    every 200 ms for up to `grace_secs` seconds (minimum 1 second). If it
///    exits in that window → [`StopOutcome::Terminated`].
/// 4. After the grace period a forced kill is sent →
///    [`StopOutcome::Killed`].
///
/// # Errors
/// * the pid file path cannot be resolved;
/// * the termination request fails (`terminate pid …`);
/// * the forced kill fails **and** the process is still alive (`kill pid …`).
///   In that case the pid file is left in place, so the still-running daemon
///   stays discoverable instead of being reported as killed.
pub async fn stop(grace_secs: u64) -> Result<StopOutcome, String> {
    let path = pid_path()?;
    let Some(pid) = read_pid(&path) else {
        return Ok(StopOutcome::NotRunning);
    };
    if !is_process_alive(pid) {
        remove_pid_file(&path);
        return Ok(StopOutcome::StaleCleaned { pid });
    }
    send_term(pid).map_err(|e| format!("terminate pid {pid}: {e}"))?;

    let poll = Duration::from_millis(200);
    let deadline = tokio::time::Instant::now() + Duration::from_secs(grace_secs.max(1));
    while tokio::time::Instant::now() < deadline {
        if !is_process_alive(pid) {
            remove_pid_file(&path);
            return Ok(StopOutcome::Terminated { pid });
        }
        tokio::time::sleep(poll).await;
    }

    if let Err(e) = send_kill(pid) {
        if is_process_alive(pid) {
            // The kill did not go through and the daemon is still there:
            // keep the pid file and surface the failure.
            return Err(format!("kill pid {pid}: {e}"));
        }
        // The process exited on its own between the last poll and the kill
        // attempt, so the kill had nothing left to act on.
        remove_pid_file(&path);
        return Ok(StopOutcome::Terminated { pid });
    }
    remove_pid_file(&path);
    Ok(StopOutcome::Killed { pid })
}
/// Result of a successful call to `stop`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StopOutcome {
/// No pid could be read from the pid file; nothing was done.
NotRunning,
/// The pid file named a process that was not alive; the file was removed
/// and no signal was sent.
StaleCleaned { pid: i32 },
/// The process exited after the graceful termination request.
Terminated { pid: i32 },
/// The grace period elapsed and a forced kill was issued.
Killed { pid: i32 },
}
/// Runs the daemon in the foreground until a shutdown signal arrives.
///
/// Refuses to start if `status` reports a live daemon. Otherwise it records
/// this process's pid in the pid file (overwriting a stale one) and then
/// drains the outbox every `tick_interval_secs` seconds (minimum 1),
/// handing `batch_size` to `drain_outbox`.
///
/// The pid file is removed on every exit path after it has been written,
/// including when database initialisation fails, so a failed start does not
/// leave a pid file behind.
///
/// # Errors
/// * the pid file path cannot be resolved;
/// * another daemon is already running;
/// * the pid file cannot be written;
/// * `init_db` fails.
pub async fn run(tick_interval_secs: u64, batch_size: usize) -> Result<(), String> {
    let path = pid_path()?;
    match status() {
        DaemonStatus::Running { pid } => {
            return Err(format!(
                "another daemon is already running (pid {pid}); stop that process before starting another"
            ));
        }
        DaemonStatus::Stale { .. } | DaemonStatus::NotRunning => {}
    }
    let my_pid = std::process::id() as i32;
    write_pid(&path, my_pid)?;

    // From here on the pid file names this process, so it must be cleaned up
    // whether the loop ends normally or start-up fails.
    let outcome = drain_until_shutdown(tick_interval_secs, batch_size).await;
    remove_pid_file(&path);
    outcome
}

/// Initialises the database, outbox queue and cloud client, then drains the
/// outbox on a fixed tick until the shutdown future resolves.
async fn drain_until_shutdown(tick_interval_secs: u64, batch_size: usize) -> Result<(), String> {
    let db = init_db().await?;
    let queue = OutboxQueue::new(db);
    let client = CloudClient::create().await;
    let shutdown = shutdown_signal_future();
    tokio::pin!(shutdown);
    let tick = Duration::from_secs(tick_interval_secs.max(1));
    loop {
        tokio::select! {
            // `biased` polls the branches in order, so a pending shutdown
            // is always noticed before another tick is started.
            biased;
            () = &mut shutdown => break,
            () = tokio::time::sleep(tick) => {
                let _ = queue.reset_stale(DEFAULT_STALE_SECONDS).await;
                if let Err(e) = drain_outbox(&queue, &client, batch_size).await {
                    eprintln!("[difflore.daemon] drain error: {e}");
                }
            }
        }
    }
    Ok(())
}
/// Resolves when the process is asked to shut down.
///
/// * Unix: completes on the first `SIGTERM` or `SIGINT`. If either signal
///   stream cannot be created, the future completes immediately (the
///   `SIGINT` stream is only requested once the `SIGTERM` one succeeded).
/// * Windows: completes when `ctrl_c` resolves, whether with success or an
///   error.
async fn shutdown_signal_future() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{SignalKind, signal};

        let mut term_stream = match signal(SignalKind::terminate()) {
            Ok(stream) => stream,
            Err(_) => return,
        };
        let mut int_stream = match signal(SignalKind::interrupt()) {
            Ok(stream) => stream,
            Err(_) => return,
        };
        tokio::select! {
            _ = term_stream.recv() => {}
            _ = int_stream.recv() => {}
        }
    }
    #[cfg(windows)]
    {
        let _ = tokio::signal::ctrl_c().await;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Held by every test in this module so they run one at a time; each of
    /// them manipulates the file returned by `pid_path()`.
    static TEST_SERIAL: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());

    /// Spawns a trivial child process, waits for it to finish, and returns
    /// the pid it had.
    fn spawn_dead_pid() -> i32 {
        #[cfg(unix)]
        let spawned = std::process::Command::new("true")
            .spawn()
            .expect("spawn true");
        #[cfg(windows)]
        let spawned = Command::new("cmd")
            .args(["/C", "exit", "0"])
            .spawn()
            .expect("spawn cmd");
        let mut child = spawned;
        let pid = child.id() as i32;
        let _ = child.wait();
        pid
    }

    /// Writes the pid of an already-exited process to `path` and returns the
    /// value read back from the file.
    fn seed_pid_file_with_dead_pid(path: &std::path::Path) -> i32 {
        let dead_pid = spawn_dead_pid();
        fs::write(path, dead_pid.to_string()).unwrap();
        fs::read_to_string(path).unwrap().trim().parse().unwrap()
    }

    #[test]
    fn status_reports_not_running_when_pid_file_missing() {
        let _serial = TEST_SERIAL.blocking_lock();
        let _ = crate::db::shared_test_home();
        let pid_file = pid_path().expect("pid path");
        let _ = fs::remove_file(&pid_file);

        assert_eq!(status(), DaemonStatus::NotRunning);
    }

    #[test]
    fn status_detects_stale_pid_file() {
        let _serial = TEST_SERIAL.blocking_lock();
        let _ = crate::db::shared_test_home();
        let pid_file = pid_path().expect("pid path");
        let recorded = seed_pid_file_with_dead_pid(&pid_file);

        let observed = status();
        if let DaemonStatus::Stale { pid } = observed {
            assert_eq!(pid, recorded);
        } else {
            panic!("expected Stale, got {observed:?}");
        }
        let _ = fs::remove_file(&pid_file);
    }

    #[tokio::test]
    async fn stop_is_noop_when_not_running() {
        let _serial = TEST_SERIAL.lock().await;
        let _ = crate::db::shared_test_home();
        let pid_file = pid_path().unwrap();
        let _ = fs::remove_file(&pid_file);

        let outcome = stop(1).await.unwrap();
        assert_eq!(outcome, StopOutcome::NotRunning);
    }

    #[tokio::test]
    async fn stop_cleans_stale_pid_file_without_signalling() {
        let _serial = TEST_SERIAL.lock().await;
        let _ = crate::db::shared_test_home();
        let pid_file = pid_path().unwrap();
        let recorded = seed_pid_file_with_dead_pid(&pid_file);

        let outcome = stop(1).await.unwrap();
        assert_eq!(outcome, StopOutcome::StaleCleaned { pid: recorded });
        assert!(!pid_file.exists(), "stale pid file should be removed by stop()");
    }
}