use std::{fs, sync::Arc};
use anyhow::Result;
use chrono::Utc;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::{UnixListener, UnixStream},
sync::Mutex,
time::{sleep, Duration},
};
use crate::{
config,
daemon::ipc::{Request, Response},
process::{runner, store::Store, Process, ProcessStatus},
};
/// Daemon entry point: initialises configuration, builds a multi-threaded
/// Tokio runtime and drives [`async_main`] on it.
///
/// Never returns. The process exits with status 1 (after printing the error
/// chain to stderr) if `async_main` fails, and with status 0 otherwise.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be built.
pub fn run() -> ! {
    config::init();

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to build Tokio runtime");

    // Translate the outcome into an exit code first, then leave in one place.
    let exit_code = match runtime.block_on(async_main()) {
        Ok(()) => 0,
        Err(e) => {
            eprintln!("[proses-daemon] fatal: {:#}", e);
            1
        }
    };

    std::process::exit(exit_code)
}
async fn async_main() -> Result<()> {
let cfg = config::get();
let pid = std::process::id();
fs::write(&cfg.pid_path, pid.to_string())?;
if cfg.sock_path.exists() {
let _ = fs::remove_file(&cfg.sock_path);
}
let store: Arc<Mutex<Store>> = Arc::new(Mutex::new(Store::load(&cfg.store_path)));
let monitor_store = store.clone();
tokio::spawn(async move {
monitor_loop(monitor_store).await;
});
let listener = UnixListener::bind(&cfg.sock_path)?;
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
let store = store.clone();
tokio::spawn(async move {
handle_connection(stream, store).await;
});
}
Err(e) => {
eprintln!("[proses-daemon] accept error: {}", e);
}
}
}
}
/// Background supervisor that restarts managed processes which have died.
///
/// Every two seconds it:
/// 1. snapshots the `(id, pid)` of every entry marked `Running` under a short
///    lock;
/// 2. probes each pid with `runner::is_running` *without* holding the lock;
/// 3. for each dead pid, re-locks the store and — only if the entry is still
///    `Running` with that same pid — either respawns it (when
///    `max_restarts == 0` or the restart budget is not exhausted) or marks it
///    `Errored`.
///
/// The store is saved after every state change; a failed save is logged.
/// This function never returns.
async fn monitor_loop(store: Arc<Mutex<Store>>) {
    loop {
        sleep(Duration::from_secs(2)).await;
        let cfg = config::get();

        let candidates: Vec<_> = {
            let s = store.lock().await;
            s.processes
                .values()
                .filter(|p| p.status == ProcessStatus::Running)
                .map(|p| (p.id, p.pid))
                .collect()
        };

        for (id, pid) in candidates {
            if runner::is_running(pid) {
                continue;
            }

            let mut s = store.lock().await;

            // The lock was released while probing. If a client stopped,
            // deleted or restarted the entry meanwhile, its status or pid has
            // changed and respawning here would create a duplicate process.
            let proc = match s.processes.get_mut(&id) {
                Some(p) if p.status == ProcessStatus::Running && p.pid == pid => p,
                _ => continue,
            };

            // Budget is evaluated on the live record, not on a stale copy.
            if proc.max_restarts == 0 || proc.restarts < proc.max_restarts {
                match runner::spawn(&*proc) {
                    Ok(new_pid) => {
                        proc.pid = new_pid;
                        proc.restarts += 1;
                        proc.started_at = Utc::now();
                        proc.status = ProcessStatus::Running;
                    }
                    Err(e) => {
                        eprintln!("[proses-daemon] failed to restart '{}': {}", proc.name, e);
                        proc.status = ProcessStatus::Errored;
                    }
                }
            } else {
                eprintln!(
                    "[proses-daemon] '{}' exceeded max_restarts ({}), marking errored",
                    proc.name, proc.max_restarts
                );
                proc.status = ProcessStatus::Errored;
            }

            // Persistence is best-effort, but a failure should be visible.
            if let Err(e) = s.save(&cfg.store_path) {
                eprintln!("[proses-daemon] failed to persist store: {}", e);
            }
        }
    }
}
/// Serves exactly one request on a client connection.
///
/// Reads a single newline-terminated JSON [`Request`], dispatches it to
/// [`handle_request`], and writes back one newline-terminated JSON
/// [`Response`]. A request that fails to parse is answered with an error
/// response. If the peer sends nothing, or the read fails (which is logged),
/// the function returns without replying. Write errors are ignored.
async fn handle_connection(stream: UnixStream, store: Arc<Mutex<Store>>) {
    let (rx, mut tx) = stream.into_split();
    let mut reader = BufReader::new(rx);
    let mut raw = String::new();

    let received = match reader.read_line(&mut raw).await {
        Ok(n) => n,
        Err(e) => {
            eprintln!("[proses-daemon] read error: {}", e);
            return;
        }
    };
    // Zero bytes means the peer closed the connection without a request.
    if received == 0 {
        return;
    }

    let response = match serde_json::from_str::<Request>(raw.trim()) {
        Err(e) => Response::err(format!("Malformed request: {}", e)),
        Ok(req) => handle_request(req, store).await,
    };

    // Fall back to a hand-written payload if the response cannot be encoded.
    let mut payload = serde_json::to_string(&response).unwrap_or_else(|_| {
        r#"{"success":false,"message":"response serialisation error"}"#.to_string()
    });
    payload.push('\n');

    let _ = tx.write_all(payload.as_bytes()).await;
}
/// Logs a failed store save instead of silently discarding it.
///
/// Persistence stays best-effort: the request is still answered normally.
fn log_save_failure<T, E: std::fmt::Display>(outcome: std::result::Result<T, E>) {
    if let Err(e) = outcome {
        eprintln!("[proses-daemon] failed to persist store: {}", e);
    }
}

/// Executes a single IPC [`Request`] against the shared process store and
/// returns the [`Response`] to send back to the client.
///
/// * `Start` – registers and spawns a new process; rejects duplicate names.
/// * `Stop` – signals the process if it is running and marks it `Stopped`.
/// * `Restart` – stops the process if running, then spawns it again.
/// * `Delete` – stops the process if running and removes its entry.
/// * `List` / `Show` – return copies of the stored process records.
/// * `Logs` – returns the last `lines` lines of the process's stdout log.
/// * `Save` – writes the store to disk and reports the outcome.
/// * `Resurrect` – respawns entries marked `Running` whose pid is dead.
/// * `Health` – liveness probe; does not touch the store.
/// * `Shutdown` – stops all running processes, then exits the daemon shortly
///   after the response has been returned.
///
/// Every mutation is followed by a store save; save failures are logged and
/// (except for `Save` itself) do not affect the response.
async fn handle_request(req: Request, store: Arc<Mutex<Store>>) -> Response {
    match req {
        Request::Start {
            name,
            command,
            cwd,
            env,
            max_restarts,
        } => {
            let cfg = config::get();
            let mut s = store.lock().await;

            if s.find(&name).is_some() {
                return Response::err(format!(
                    "A process named '{}' already exists. \
                     Use `restart` or `delete` it first.",
                    name
                ));
            }

            let id = s.alloc_id();
            let log_out = cfg.log_dir.join(format!("{}-{}.out", name, id));
            let log_err = cfg.log_dir.join(format!("{}-{}.err", name, id));

            let mut proc = Process {
                id,
                name: name.clone(),
                pid: 0,
                command,
                cwd: std::path::PathBuf::from(cwd),
                env,
                status: ProcessStatus::Stopped,
                restarts: 0,
                max_restarts,
                started_at: Utc::now(),
                log_out,
                log_err,
            };

            match runner::spawn(&proc) {
                Ok(pid) => {
                    proc.pid = pid;
                    proc.status = ProcessStatus::Running;
                    proc.started_at = Utc::now();
                    let resp = Response::ok(format!("Process '{}' started (pid {})", name, pid))
                        .with_process(proc.clone());
                    s.processes.insert(id, proc);
                    log_save_failure(s.save(&cfg.store_path));
                    resp
                }
                Err(e) => {
                    // The failed entry is kept so the client can inspect it.
                    proc.status = ProcessStatus::Errored;
                    s.processes.insert(id, proc);
                    log_save_failure(s.save(&cfg.store_path));
                    Response::err(format!("Failed to start '{}': {}", name, e))
                }
            }
        }

        Request::Stop { name_or_id } => {
            let cfg = config::get();
            let mut s = store.lock().await;

            let id = match s.find_id(&name_or_id) {
                Some(id) => id,
                None => return Response::err(format!("Process '{}' not found", name_or_id)),
            };

            match s.processes.get_mut(&id) {
                Some(proc) => {
                    // Only signal a pid we believe is ours and alive. A
                    // stopped entry has pid 0 and an errored entry may hold a
                    // stale pid; neither must be signalled.
                    if proc.status == ProcessStatus::Running {
                        let _ = runner::stop(proc.pid);
                    }
                    proc.status = ProcessStatus::Stopped;
                    proc.pid = 0;
                }
                None => return Response::err(format!("Process '{}' not found", name_or_id)),
            }

            log_save_failure(s.save(&cfg.store_path));
            Response::ok(format!("Process '{}' stopped", name_or_id))
        }

        Request::Restart { name_or_id } => {
            let cfg = config::get();

            // The lock is held for the whole stop -> wait -> spawn sequence.
            // Releasing it while the entry still says `Running` with a dead
            // pid would let the monitor respawn it concurrently (duplicate
            // process) or let a `Delete` orphan the freshly spawned child.
            let mut s = store.lock().await;

            let id = match s.find_id(&name_or_id) {
                Some(id) => id,
                None => return Response::err(format!("Process '{}' not found", name_or_id)),
            };
            let proc = match s.processes.get_mut(&id) {
                Some(p) => p,
                None => return Response::err(format!("Process '{}' not found", name_or_id)),
            };

            if proc.status == ProcessStatus::Running {
                let _ = runner::stop(proc.pid);
                // Give the old instance a moment to exit and release resources.
                sleep(Duration::from_millis(300)).await;
            }

            let response = match runner::spawn(&*proc) {
                Ok(new_pid) => {
                    proc.pid = new_pid;
                    proc.started_at = Utc::now();
                    proc.status = ProcessStatus::Running;
                    Response::ok(format!(
                        "Process '{}' restarted (pid {})",
                        name_or_id, new_pid
                    ))
                }
                Err(e) => {
                    // Do not keep advertising a dead pid as `Running`.
                    proc.status = ProcessStatus::Errored;
                    Response::err(format!("Failed to restart '{}': {}", name_or_id, e))
                }
            };

            log_save_failure(s.save(&cfg.store_path));
            response
        }

        Request::Delete { name_or_id } => {
            let cfg = config::get();
            let mut s = store.lock().await;

            let id = match s.find_id(&name_or_id) {
                Some(id) => id,
                None => return Response::err(format!("Process '{}' not found", name_or_id)),
            };

            if let Some(removed) = s.processes.remove(&id) {
                if removed.status == ProcessStatus::Running {
                    let _ = runner::stop(removed.pid);
                }
            }

            log_save_failure(s.save(&cfg.store_path));
            Response::ok(format!("Process '{}' deleted", name_or_id))
        }

        Request::List => {
            let s = store.lock().await;
            let procs: Vec<Process> = s.processes.values().cloned().collect();
            let count = procs.len();
            Response::ok(format!("{} process(es)", count)).with_processes(procs)
        }

        Request::Logs { name_or_id, lines } => {
            // Resolve the path under the lock, read the file after releasing it.
            let log_path = {
                let s = store.lock().await;
                match s.find(&name_or_id) {
                    None => return Response::err(format!("Process '{}' not found", name_or_id)),
                    Some(p) => p.log_out.clone(),
                }
            };

            // NOTE(review): this is a synchronous whole-file read on the async
            // runtime; very large logs will stall this worker thread.
            match fs::read_to_string(&log_path) {
                Err(e) => Response::err(format!("Cannot read log {:?}: {}", log_path, e)),
                Ok(content) => {
                    let all_lines: Vec<&str> = content.lines().collect();
                    let start = all_lines.len().saturating_sub(lines);
                    let tail = all_lines[start..].join("\n");
                    Response::ok(format!("Last {} line(s) of stdout log", lines)).with_logs(tail)
                }
            }
        }

        Request::Show { name_or_id } => {
            let s = store.lock().await;
            match s.find(&name_or_id) {
                None => Response::err(format!("Process '{}' not found", name_or_id)),
                Some(p) => {
                    Response::ok(format!("Process '{}'", name_or_id)).with_process(p.clone())
                }
            }
        }

        Request::Save => {
            let cfg = config::get();
            let s = store.lock().await;
            match s.save(&cfg.store_path) {
                Ok(_) => Response::ok("Store saved to disk"),
                Err(e) => Response::err(format!("Save failed: {}", e)),
            }
        }

        Request::Resurrect => {
            let cfg = config::get();
            let mut s = store.lock().await;

            let mut spawned = 0u32;
            let mut failed = 0u32;

            for proc in s.processes.values_mut() {
                // Only entries recorded as running whose pid is gone qualify.
                if proc.status != ProcessStatus::Running || runner::is_running(proc.pid) {
                    continue;
                }
                match runner::spawn(&*proc) {
                    Ok(new_pid) => {
                        proc.pid = new_pid;
                        proc.started_at = Utc::now();
                        proc.status = ProcessStatus::Running;
                        spawned += 1;
                    }
                    Err(e) => {
                        eprintln!(
                            "[proses-daemon] resurrect failed for '{}': {}",
                            proc.name, e
                        );
                        proc.status = ProcessStatus::Errored;
                        failed += 1;
                    }
                }
            }

            log_save_failure(s.save(&cfg.store_path));

            if failed == 0 {
                Response::ok(format!("Resurrected {} process(es)", spawned))
            } else {
                Response::err(format!(
                    "Resurrected {}, failed to spawn {}",
                    spawned, failed
                ))
            }
        }

        Request::Health => Response::ok("daemon is healthy"),

        Request::Shutdown => {
            let cfg = config::get();
            {
                let mut s = store.lock().await;
                for proc in s.processes.values_mut() {
                    if proc.status == ProcessStatus::Running {
                        let _ = runner::stop(proc.pid);
                        proc.status = ProcessStatus::Stopped;
                        proc.pid = 0;
                    }
                }
                log_save_failure(s.save(&cfg.store_path));
            }

            // Exit from a detached task after a short delay so the response
            // below can still be written back to the client.
            tokio::spawn(async {
                sleep(Duration::from_millis(150)).await;
                let cfg = config::get();
                let _ = fs::remove_file(&cfg.pid_path);
                let _ = fs::remove_file(&cfg.sock_path);
                std::process::exit(0);
            });

            Response::ok("Daemon is shutting down")
        }
    }
}