use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use tokio::signal;
use tokio::sync::mpsc;
use crate::core::engine::JormEngine;
use crate::scheduler::cron::{Schedule, ScheduleError, Scheduler};
/// Errors produced by [`Daemon`] operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum DaemonError {
/// Returned by `Daemon::start` when the state file already names a live process.
#[error("Daemon is already running")]
AlreadyRunning,
/// Returned by `Daemon::stop` when no live daemon process is recorded.
#[error("Daemon is not running")]
NotRunning,
/// Reading or parsing a persisted JSON file failed. This module uses it for
/// both the state file and the schedules file; the payload is the underlying
/// error rendered as text.
#[error("Failed to read daemon state: {0}")]
StateReadError(String),
/// Serializing or writing a persisted JSON file failed. This module uses it
/// for both the state file and the schedules file; the payload is the
/// underlying error rendered as text.
#[error("Failed to write daemon state: {0}")]
StateWriteError(String),
/// An error reported by the scheduler, converted automatically via `?`.
#[error("Schedule error: {0}")]
ScheduleError(#[from] ScheduleError),
/// Any other I/O failure propagated with `?` (for example directory creation
/// or state-file removal).
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
}
/// Snapshot of a daemon instance as persisted (JSON) in the state file.
///
/// The `Default` value has every field set to `None`; `Daemon` uses it to
/// represent "no state file present".
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct DaemonState {
/// Process id of the daemon that wrote the state, if any.
pub pid: Option<u32>,
/// UTC timestamp taken when the daemon wrote its state on startup.
pub started_at: Option<DateTime<Utc>>,
/// Path of the schedules file the daemon was configured with, stored as a
/// (lossily converted) string.
pub schedules_file: Option<String>,
/// Path of the log file the daemon was configured with, stored as a
/// (lossily converted) string.
pub log_file: Option<String>,
}
/// Foreground daemon that owns a scheduler and tracks its own liveness
/// through a JSON state file.
pub struct Daemon {
// Shared engine handle; a clone is handed to every `Scheduler` created here.
engine: Arc<JormEngine>,
// Holds the schedules managed through `add_schedule` / `remove_schedule`.
scheduler: Scheduler,
// Location of the JSON state file (PID, start time, configured paths).
state_file: PathBuf,
// Optional JSON file that schedules are loaded from and saved to.
schedules_file: Option<PathBuf>,
// Optional log file path; within this file it is only recorded in the
// state and printed on startup.
log_file: Option<PathBuf>,
}
impl Daemon {
/// Creates a daemon bound to `engine`.
///
/// A fresh [`Scheduler`] sharing the same engine is created immediately.
/// No file is touched here: `state_file`, `schedules_file` and `log_file`
/// are only stored for later use.
pub fn new(
    engine: Arc<JormEngine>,
    state_file: PathBuf,
    schedules_file: Option<PathBuf>,
    log_file: Option<PathBuf>,
) -> Self {
    Self {
        // Evaluated before `engine` is moved into its own field below.
        scheduler: Scheduler::new(engine.clone()),
        engine,
        state_file,
        schedules_file,
        log_file,
    }
}
/// Runs the daemon in the foreground until a Ctrl-C signal is received.
///
/// Sequence:
/// 1. Refuse to start if the state file names a live process.
/// 2. Load schedules from the configured schedules file, if any.
/// 3. Copy the schedules into a fresh scheduler that will run in a task.
/// 4. Persist the daemon state (PID, start time, configured paths).
/// 5. Spawn the Ctrl-C listener and the scheduler task, then wait.
/// 6. On shutdown, abort the scheduler task and call [`Daemon::stop`].
///
/// # Errors
/// * [`DaemonError::AlreadyRunning`] if a live daemon is already recorded.
/// * Any error from loading schedules, copying them into the running
///   scheduler, writing the state file, or from the final `stop`.
pub async fn start(&mut self) -> Result<(), DaemonError> {
    if self.is_running().await? {
        return Err(DaemonError::AlreadyRunning);
    }

    if let Some(schedules_file) = self.schedules_file.clone() {
        self.load_schedules(&schedules_file).await?;
    }

    // Build the scheduler that will actually run *before* persisting any
    // state. If copying a schedule fails we return early, and doing so
    // after the state file was written would leave a file naming this
    // (still alive) process, making every later `start` report
    // `AlreadyRunning`.
    let mut scheduler_clone = Scheduler::new(self.engine.clone());
    for schedule in self.scheduler.list_schedules() {
        scheduler_clone.add_schedule(schedule.clone())?;
    }

    let pid = std::process::id();
    let state = DaemonState {
        pid: Some(pid),
        started_at: Some(Utc::now()),
        schedules_file: self
            .schedules_file
            .as_ref()
            .map(|p| p.to_string_lossy().to_string()),
        log_file: self
            .log_file
            .as_ref()
            .map(|p| p.to_string_lossy().to_string()),
    };
    self.write_state(&state).await?;

    println!("Jorm daemon started (PID: {})", pid);
    if let Some(log_file) = &self.log_file {
        println!("Logging to: {}", log_file.display());
    }

    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
    tokio::spawn(async move {
        // A failure to install the signal listener still triggers shutdown
        // (as before), but is now reported as what it is instead of being
        // announced as a received signal.
        match signal::ctrl_c().await {
            Ok(()) => println!("Received shutdown signal, stopping daemon..."),
            Err(e) => eprintln!(
                "Failed to listen for shutdown signal ({e}), stopping daemon..."
            ),
        }
        // The send only fails if the receiver is gone, i.e. `start` itself
        // was dropped; nothing is left to notify in that case.
        let _ = shutdown_tx.send(()).await;
    });

    let scheduler_handle = tokio::spawn(async move {
        if let Err(e) = scheduler_clone.start().await {
            eprintln!("Scheduler error: {}", e);
        }
    });

    // Resumes on the shutdown message, or with `None` if the listener task
    // ended without sending; either way we proceed to shut down.
    let _ = shutdown_rx.recv().await;
    scheduler_handle.abort();
    self.stop().await?;
    Ok(())
}
/// Stops the scheduler and removes the state file.
///
/// # Errors
/// * [`DaemonError::NotRunning`] if no live daemon is recorded in the
///   state file.
/// * [`DaemonError::IoError`] if the state file exists but cannot be removed.
pub async fn stop(&mut self) -> Result<(), DaemonError> {
    if !self.is_running().await? {
        return Err(DaemonError::NotRunning);
    }
    self.scheduler.stop();

    // Remove directly instead of checking `exists()` first: that check is a
    // blocking filesystem call inside async code and races with concurrent
    // removal. A file that is already gone is the desired end state.
    match fs::remove_file(&self.state_file).await {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(e.into()),
    }

    println!("Jorm daemon stopped");
    Ok(())
}
/// Returns the daemon state currently recorded in the state file.
///
/// # Errors
/// Propagates any error from reading or parsing the state file.
pub async fn status(&self) -> Result<DaemonState, DaemonError> {
    let state = self.read_state().await?;
    Ok(state)
}
/// Reports whether the PID recorded in the state file belongs to a live
/// process.
///
/// A state that cannot be read, or that carries no PID, counts as
/// "not running"; this function therefore never returns `Err` itself.
pub async fn is_running(&self) -> Result<bool, DaemonError> {
    // Read failures are deliberately folded into "no PID recorded".
    let recorded_pid = self.read_state().await.ok().and_then(|state| state.pid);
    let running = match recorded_pid {
        Some(pid) => self.is_process_running(pid),
        None => false,
    };
    Ok(running)
}
/// Registers `schedule` with the scheduler and, when a schedules file is
/// configured, rewrites that file with the full schedule list.
///
/// # Errors
/// Returns the scheduler's error if the schedule is rejected, or a
/// write/IO error if persisting fails (the in-memory addition is not
/// rolled back in that case).
pub async fn add_schedule(&mut self, schedule: Schedule) -> Result<(), DaemonError> {
    self.scheduler.add_schedule(schedule)?;
    match &self.schedules_file {
        Some(path) => self.save_schedules(path).await,
        None => Ok(()),
    }
}
/// Removes the schedule identified by `id` and returns it; when a schedules
/// file is configured, that file is rewritten with the remaining schedules.
///
/// # Errors
/// Returns the scheduler's error if removal fails, or a write/IO error if
/// persisting fails (the in-memory removal is not rolled back in that case).
pub async fn remove_schedule(&mut self, id: &str) -> Result<Schedule, DaemonError> {
    let removed = self.scheduler.remove_schedule(id)?;
    if let Some(path) = self.schedules_file.as_ref() {
        self.save_schedules(path).await?;
    }
    Ok(removed)
}
/// Returns references to the schedules currently held by the scheduler.
pub fn list_schedules(&self) -> Vec<&Schedule> {
    let scheduler = &self.scheduler;
    scheduler.list_schedules()
}
/// Loads the persisted daemon state.
///
/// A missing state file is not an error: it yields `DaemonState::default()`
/// (all fields `None`).
///
/// # Errors
/// [`DaemonError::StateReadError`] if the file exists but cannot be read or
/// does not contain valid JSON for [`DaemonState`].
async fn read_state(&self) -> Result<DaemonState, DaemonError> {
    // Read first and classify the failure afterwards. A separate `exists()`
    // check is a blocking call inside async code and races with concurrent
    // removal, which would turn "file just disappeared" into a read error.
    let content = match fs::read_to_string(&self.state_file).await {
        Ok(content) => content,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Ok(DaemonState::default());
        }
        Err(e) => return Err(DaemonError::StateReadError(e.to_string())),
    };
    serde_json::from_str(&content).map_err(|e| DaemonError::StateReadError(e.to_string()))
}
/// Serializes `state` as pretty-printed JSON and writes it to the state
/// file, creating the parent directory first when the path has one.
///
/// # Errors
/// * [`DaemonError::StateWriteError`] if serialization or the file write fails.
/// * [`DaemonError::IoError`] if the parent directory cannot be created.
async fn write_state(&self, state: &DaemonState) -> Result<(), DaemonError> {
    // Shared conversion for the two failure sources that map to
    // `StateWriteError` (serde and the final write).
    fn write_err<E: std::fmt::Display>(e: E) -> DaemonError {
        DaemonError::StateWriteError(e.to_string())
    }

    let json = serde_json::to_string_pretty(state).map_err(write_err)?;

    if let Some(dir) = self.state_file.parent() {
        // Propagated via `?`, so this surfaces as `IoError`.
        fs::create_dir_all(dir).await?;
    }

    fs::write(&self.state_file, json).await.map_err(write_err)
}
/// Loads schedules from the JSON file at `schedules_file` into the scheduler.
///
/// A missing file is treated as "nothing to load" and returns `Ok(())`
/// without printing anything.
///
/// Takes `&Path` rather than `&PathBuf`; existing callers passing
/// `&PathBuf` keep working through deref coercion.
///
/// # Errors
/// * [`DaemonError::StateReadError`] if the file cannot be read or is not a
///   valid JSON array of schedules.
/// * The scheduler's error if any schedule is rejected (schedules added
///   before the failing one stay registered).
async fn load_schedules(
    &mut self,
    schedules_file: &std::path::Path,
) -> Result<(), DaemonError> {
    // Read first and classify the failure afterwards, avoiding the blocking
    // `exists()` call and the check-then-read race it introduces.
    let content = match fs::read_to_string(schedules_file).await {
        Ok(content) => content,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(DaemonError::StateReadError(e.to_string())),
    };

    let schedules: Vec<Schedule> = serde_json::from_str(&content)
        .map_err(|e| DaemonError::StateReadError(e.to_string()))?;

    for schedule in schedules {
        self.scheduler.add_schedule(schedule)?;
    }

    // Reports the scheduler's total count, which includes any schedules
    // that were registered before this call.
    println!(
        "Loaded {} schedules from {}",
        self.scheduler.list_schedules().len(),
        schedules_file.display()
    );
    Ok(())
}
/// Writes every schedule currently held by the scheduler to
/// `schedules_file` as pretty-printed JSON, creating the parent directory
/// first when the path has one.
///
/// Takes `&Path` rather than `&PathBuf`; existing callers passing
/// `&PathBuf` keep working through deref coercion.
///
/// # Errors
/// * [`DaemonError::StateWriteError`] if serialization or the file write fails.
/// * [`DaemonError::IoError`] if the parent directory cannot be created.
async fn save_schedules(&self, schedules_file: &std::path::Path) -> Result<(), DaemonError> {
    let schedules: Vec<&Schedule> = self.scheduler.list_schedules();
    let content = serde_json::to_string_pretty(&schedules)
        .map_err(|e| DaemonError::StateWriteError(e.to_string()))?;

    if let Some(parent) = schedules_file.parent() {
        // Propagated via `?`, so this surfaces as `IoError`.
        fs::create_dir_all(parent).await?;
    }

    fs::write(schedules_file, content)
        .await
        .map_err(|e| DaemonError::StateWriteError(e.to_string()))
}
/// Reports whether a process with the given `pid` appears to exist.
///
/// * Windows: runs `tasklist /FI "PID eq <pid>"` and looks for the PID in
///   its output.
/// * Unix: runs `kill -0 <pid>` and checks for a successful exit status.
/// * Other targets: always `false`, since no probe is available.
///
/// Any failure to launch the helper command is treated as "not running".
fn is_process_running(&self, pid: u32) -> bool {
    // PID 0 can never be a daemon we started. On Unix `kill -0 0` addresses
    // the caller's own process group and therefore always succeeds, which
    // would make a state file containing `"pid": 0` look like a live daemon.
    if pid == 0 {
        return false;
    }

    #[cfg(windows)]
    {
        use std::process::Command;
        let output = Command::new("tasklist")
            .args(["/FI", &format!("PID eq {pid}")])
            .output();
        match output {
            Ok(output) => {
                let stdout = String::from_utf8_lossy(&output.stdout);
                stdout.contains(&pid.to_string())
            }
            Err(_) => false,
        }
    }
    #[cfg(unix)]
    {
        use std::process::Command;
        // `output()` captures stdout/stderr, so a "no such process" message
        // from `kill` does not leak to the terminal.
        Command::new("kill")
            .args(["-0", &pid.to_string()])
            .output()
            .map(|output| output.status.success())
            .unwrap_or(false)
    }
    #[cfg(not(any(windows, unix)))]
    {
        // Without this arm the function would have no tail value on other
        // targets and fail to compile.
        false
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A missing state file reads as the default state, and a written state
    /// can be read back with its PID and start time intact.
    #[tokio::test]
    async fn test_daemon_state_operations() -> Result<(), Box<dyn std::error::Error>> {
        let workdir = TempDir::new()?;
        let state_path = workdir.path().join("daemon.state");
        let daemon = Daemon::new(Arc::new(JormEngine::new().await?), state_path, None, None);

        // Nothing has been written yet, so no PID is recorded.
        let initial = daemon.read_state().await?;
        assert_eq!(initial.pid, None);

        let written = DaemonState {
            pid: Some(12345),
            started_at: Some(Utc::now()),
            schedules_file: None,
            log_file: None,
        };
        daemon.write_state(&written).await?;

        let reloaded = daemon.read_state().await?;
        assert_eq!(reloaded.pid, Some(12345));
        assert!(reloaded.started_at.is_some());
        Ok(())
    }

    /// A daemon whose state file does not exist is reported as not running.
    #[tokio::test]
    async fn test_daemon_not_running_initially() -> Result<(), Box<dyn std::error::Error>> {
        let workdir = TempDir::new()?;
        let state_path = workdir.path().join("daemon.state");
        let daemon = Daemon::new(Arc::new(JormEngine::new().await?), state_path, None, None);

        let running = daemon.is_running().await?;
        assert!(!running);
        Ok(())
    }
}