use super::Supervisor;
use crate::Result;
use crate::daemon::Daemon;
use crate::daemon_id::DaemonId;
use crate::daemon_status::DaemonStatus;
use crate::pitchfork_toml::CpuLimit;
use crate::pitchfork_toml::CronRetrigger;
use crate::pitchfork_toml::MemoryLimit;
use crate::pitchfork_toml::PitchforkToml;
use crate::procs::PROCS;
use crate::settings::settings;
use indexmap::IndexMap;
use std::collections::HashMap;
use std::path::PathBuf;
#[derive(Debug)]
pub(crate) struct UpsertDaemonOpts {
pub id: DaemonId,
pub pid: Option<u32>,
pub status: DaemonStatus,
pub shell_pid: Option<u32>,
pub dir: Option<PathBuf>,
pub cmd: Option<Vec<String>>,
pub autostop: bool,
pub cron_schedule: Option<String>,
pub cron_retrigger: Option<CronRetrigger>,
pub last_exit_success: Option<bool>,
pub retry: Option<u32>,
pub retry_count: Option<u32>,
pub ready_delay: Option<u64>,
pub ready_output: Option<String>,
pub ready_http: Option<String>,
pub ready_port: Option<u16>,
pub ready_cmd: Option<String>,
pub expected_port: Vec<u16>,
pub resolved_port: Vec<u16>,
pub active_port: Option<u16>,
pub slug: Option<String>,
pub proxy: Option<bool>,
pub auto_bump_port: Option<bool>,
pub port_bump_attempts: Option<u32>,
pub depends: Option<Vec<DaemonId>>,
pub env: Option<IndexMap<String, String>>,
pub watch: Option<Vec<String>>,
pub watch_base_dir: Option<PathBuf>,
pub mise: Option<bool>,
pub memory_limit: Option<MemoryLimit>,
pub cpu_limit: Option<CpuLimit>,
}
#[derive(Debug)]
pub(crate) struct UpsertDaemonOptsBuilder {
pub opts: UpsertDaemonOpts,
}
impl UpsertDaemonOpts {
pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
UpsertDaemonOptsBuilder {
opts: UpsertDaemonOpts {
id,
pid: None,
status: DaemonStatus::default(),
shell_pid: None,
dir: None,
cmd: None,
autostop: false,
cron_schedule: None,
cron_retrigger: None,
last_exit_success: None,
retry: None,
retry_count: None,
ready_delay: None,
ready_output: None,
ready_http: None,
ready_port: None,
ready_cmd: None,
expected_port: Vec::new(),
resolved_port: Vec::new(),
active_port: None,
slug: None,
proxy: None,
auto_bump_port: None,
port_bump_attempts: None,
depends: None,
env: None,
watch: None,
watch_base_dir: None,
mise: None,
memory_limit: None,
cpu_limit: None,
},
}
}
}
impl UpsertDaemonOptsBuilder {
pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
f(&mut self.opts);
self
}
pub fn build(self) -> UpsertDaemonOpts {
self.opts
}
}
impl Supervisor {
pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
info!(
"upserting daemon: {} pid: {} status: {}",
opts.id,
opts.pid.unwrap_or(0),
opts.status
);
let mut state_file = self.state_file.lock().await;
let existing = state_file.daemons.get(&opts.id);
let daemon = Daemon {
id: opts.id.clone(),
title: opts.pid.and_then(|pid| PROCS.title(pid)),
pid: opts.pid,
status: opts.status,
shell_pid: opts.shell_pid,
autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
cron_schedule: opts
.cron_schedule
.or(existing.and_then(|d| d.cron_schedule.clone())),
cron_retrigger: opts
.cron_retrigger
.or(existing.and_then(|d| d.cron_retrigger)),
last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
last_exit_success: opts
.last_exit_success
.or(existing.and_then(|d| d.last_exit_success)),
retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
retry_count: opts
.retry_count
.unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
ready_output: opts
.ready_output
.or(existing.and_then(|d| d.ready_output.clone())),
ready_http: opts
.ready_http
.or(existing.and_then(|d| d.ready_http.clone())),
ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
ready_cmd: opts
.ready_cmd
.or(existing.and_then(|d| d.ready_cmd.clone())),
expected_port: if opts.expected_port.is_empty() {
existing
.map(|d| d.expected_port.clone())
.unwrap_or_default()
} else {
opts.expected_port
},
resolved_port: if opts.resolved_port.is_empty() {
existing
.map(|d| d.resolved_port.clone())
.unwrap_or_default()
} else {
opts.resolved_port
},
auto_bump_port: opts
.auto_bump_port
.unwrap_or(existing.map(|d| d.auto_bump_port).unwrap_or(false)),
port_bump_attempts: opts.port_bump_attempts.unwrap_or(
existing
.map(|d| d.port_bump_attempts)
.unwrap_or_else(|| settings().default_port_bump_attempts()),
),
depends: opts
.depends
.unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
env: opts.env.or(existing.and_then(|d| d.env.clone())),
watch: opts
.watch
.unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
watch_base_dir: opts
.watch_base_dir
.or(existing.and_then(|d| d.watch_base_dir.clone())),
mise: opts.mise.or(existing.and_then(|d| d.mise)),
proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
active_port: opts.active_port,
slug: opts.slug.or(existing.and_then(|d| d.slug.clone())),
memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
};
state_file.daemons.insert(opts.id.clone(), daemon.clone());
if let Err(err) = state_file.write() {
warn!("failed to update state file: {err:#}");
}
Ok(daemon)
}
pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
info!("enabling daemon: {id}");
let config = PitchforkToml::all_merged()?;
let mut state_file = self.state_file.lock().await;
let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
if !exists {
return Err(miette::miette!("daemon '{}' not found", id));
}
let result = state_file.disabled.remove(id);
state_file.write()?;
Ok(result)
}
pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
info!("disabling daemon: {id}");
let config = PitchforkToml::all_merged()?;
let mut state_file = self.state_file.lock().await;
let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
if !exists {
return Err(miette::miette!("daemon '{}' not found", id));
}
let result = state_file.disabled.insert(id.clone());
state_file.write()?;
Ok(result)
}
pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
self.state_file.lock().await.daemons.get(id).cloned()
}
pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
let pitchfork_id = DaemonId::pitchfork();
self.state_file
.lock()
.await
.daemons
.values()
.filter(|d| d.pid.is_some() && d.id != pitchfork_id)
.cloned()
.collect()
}
pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
let mut state_file = self.state_file.lock().await;
state_file.daemons.remove(id);
if let Err(err) = state_file.write() {
warn!("failed to update state file: {err:#}");
}
Ok(())
}
pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
let mut state_file = self.state_file.lock().await;
state_file.shell_dirs.insert(shell_pid.to_string(), dir);
state_file.write()?;
Ok(())
}
pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
self.state_file
.lock()
.await
.shell_dirs
.get(&shell_pid.to_string())
.cloned()
}
pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
let mut state_file = self.state_file.lock().await;
if state_file
.shell_dirs
.remove(&shell_pid.to_string())
.is_some()
{
state_file.write()?;
}
Ok(())
}
pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
self.state_file.lock().await.shell_dirs.iter().fold(
HashMap::new(),
|mut acc, (pid, dir)| {
if let Ok(pid) = pid.parse() {
acc.entry(dir.clone()).or_default().push(pid);
}
acc
},
)
}
pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
self.pending_notifications.lock().await.drain(..).collect()
}
pub(crate) async fn clean(&self) -> Result<()> {
let mut state_file = self.state_file.lock().await;
state_file.daemons.retain(|_id, d| d.pid.is_some());
state_file.write()?;
Ok(())
}
}