use super::Supervisor;
use crate::Result;
use crate::daemon::Daemon;
use crate::daemon_id::DaemonId;
use crate::daemon_status::DaemonStatus;
use crate::pitchfork_toml::CpuLimit;
use crate::pitchfork_toml::CronRetrigger;
use crate::pitchfork_toml::MemoryLimit;
use crate::pitchfork_toml::PitchforkToml;
use crate::pitchfork_toml::PortConfig;
use crate::pitchfork_toml::ReadyHttp;
use crate::pitchfork_toml::Retry;
use crate::pitchfork_toml::StopConfig;
use crate::pitchfork_toml::WatchMode;
use crate::procs::PROCS;
use indexmap::IndexMap;
use std::collections::HashMap;
use std::path::PathBuf;
/// Field values handed to `Supervisor::upsert_daemon` when creating or
/// updating a daemon's entry in the state file.
///
/// How `upsert_daemon` combines these with an already-stored entry:
/// - `id`, `pid`, `status`, `shell_pid` and `active_port` are stored as given.
/// - `autostop` is OR-ed with the stored entry's flag.
/// - an empty `resolved_port` keeps the stored entry's ports.
/// - every other `Option` field left as `None` keeps the stored entry's
///   value (or `None` / the field type's default when no entry exists yet).
#[derive(Debug, Default)]
pub(crate) struct UpsertDaemonOpts {
/// Key under which the daemon is looked up and inserted in the state file.
pub id: DaemonId,
/// Stored as given; when `Some`, also used to look up the process title.
pub pid: Option<u32>,
/// Stored as given (never inherited from the stored entry).
pub status: DaemonStatus,
/// Stored as given (never inherited from the stored entry).
pub shell_pid: Option<u32>,
// From here on, `None` means "keep what the stored entry has" unless a
// field says otherwise.
pub dir: Option<PathBuf>,
pub cmd: Option<Vec<String>>,
/// Sticky flag: the result is `true` if either this or the stored entry's
/// flag is `true`, so passing `false` never clears it.
pub autostop: bool,
pub cron_schedule: Option<String>,
pub cron_retrigger: Option<CronRetrigger>,
pub cron_immediate: Option<bool>,
pub last_exit_success: Option<bool>,
pub retry: Option<Retry>,
/// `None` keeps the stored count, or `0` when there is no stored entry.
pub retry_count: Option<u32>,
pub ready_delay: Option<u64>,
pub ready_output: Option<String>,
pub ready_http: Option<ReadyHttp>,
pub ready_port: Option<u16>,
pub ready_cmd: Option<String>,
pub port: Option<PortConfig>,
/// An empty vector keeps the stored entry's resolved ports; a non-empty
/// vector replaces them.
pub resolved_port: Vec<u16>,
/// Stored as given: `None` here clears any previously stored value.
pub active_port: Option<u16>,
pub slug: Option<String>,
pub proxy: Option<bool>,
pub depends: Option<Vec<DaemonId>>,
pub env: Option<IndexMap<String, String>>,
pub watch: Option<Vec<String>>,
pub watch_mode: Option<WatchMode>,
pub watch_base_dir: Option<PathBuf>,
pub mise: Option<bool>,
pub user: Option<String>,
pub memory_limit: Option<MemoryLimit>,
pub cpu_limit: Option<CpuLimit>,
pub stop_signal: Option<StopConfig>,
pub pty: Option<bool>,
}
/// Wrapper around an `UpsertDaemonOpts` value that is adjusted step by step
/// through `set` and handed back by `build`.
#[derive(Debug)]
pub(crate) struct UpsertDaemonOptsBuilder {
/// The options being assembled.
pub opts: UpsertDaemonOpts,
}
impl UpsertDaemonOpts {
    /// Starts a builder whose options carry `id` and the `Default` value for
    /// every other field.
    pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
        let opts = Self {
            id,
            ..Self::default()
        };
        UpsertDaemonOptsBuilder { opts }
    }
}
impl UpsertDaemonOptsBuilder {
    /// Runs `f` against the options being built and returns the builder so
    /// further `set` calls can be chained.
    pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
        let target = &mut self.opts;
        f(target);
        self
    }

    /// Consumes the builder and yields the accumulated options.
    pub fn build(self) -> UpsertDaemonOpts {
        let Self { opts } = self;
        opts
    }
}
impl Supervisor {
/// Creates or updates the state-file entry for `opts.id` and returns a copy
/// of the stored [`Daemon`].
///
/// The new entry is assembled from `opts` and, where `opts` leaves a field
/// unset, from the entry already stored under the same id:
///
/// - `id`, `pid`, `status`, `shell_pid` and `active_port` always come from
///   `opts` (so a `None` `active_port` clears the stored one).
/// - `title` is looked up from `PROCS` for `opts.pid`, and is `None` when no
///   pid is given.
/// - `autostop` is the logical OR of `opts.autostop` and the stored flag.
/// - `last_cron_triggered` is always carried over from the stored entry.
/// - `resolved_port` is taken from `opts` unless it is empty, in which case
///   the stored ports are kept.
/// - every other field uses `opts` when it is `Some`, otherwise the stored
///   value, otherwise `None` / the field type's default.
///
/// The state-file lock is held for the whole call.
pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
    info!(
        "upserting daemon: {} pid: {} status: {}",
        opts.id,
        opts.pid.unwrap_or(0),
        opts.status
    );
    let mut state_file = self.state_file.lock().await;
    let existing = state_file.daemons.get(&opts.id);
    // Fallbacks that clone heap data from the stored entry are wrapped in
    // closures (`or_else`) so the clone only happens when `opts` did not
    // supply a value. Fallbacks that merely copy a small value stay eager.
    let daemon = Daemon {
        id: opts.id.clone(),
        title: opts.pid.and_then(|pid| PROCS.title(pid)),
        pid: opts.pid,
        status: opts.status,
        shell_pid: opts.shell_pid,
        autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
        dir: opts.dir.or_else(|| existing.and_then(|d| d.dir.clone())),
        cmd: opts.cmd.or_else(|| existing.and_then(|d| d.cmd.clone())),
        cron_schedule: opts
            .cron_schedule
            .or_else(|| existing.and_then(|d| d.cron_schedule.clone())),
        cron_retrigger: opts
            .cron_retrigger
            .or(existing.and_then(|d| d.cron_retrigger)),
        cron_immediate: opts
            .cron_immediate
            .or(existing.and_then(|d| d.cron_immediate)),
        last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
        last_exit_success: opts
            .last_exit_success
            .or(existing.and_then(|d| d.last_exit_success)),
        retry: opts
            .retry
            .unwrap_or_else(|| existing.map(|d| d.retry).unwrap_or_default()),
        retry_count: opts
            .retry_count
            .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
        ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
        ready_output: opts
            .ready_output
            .or_else(|| existing.and_then(|d| d.ready_output.clone())),
        ready_http: opts
            .ready_http
            .or_else(|| existing.and_then(|d| d.ready_http.clone())),
        ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
        ready_cmd: opts
            .ready_cmd
            .or_else(|| existing.and_then(|d| d.ready_cmd.clone())),
        port: opts.port.or_else(|| existing.and_then(|d| d.port.clone())),
        // An empty vector is treated as "not specified".
        resolved_port: if opts.resolved_port.is_empty() {
            existing
                .map(|d| d.resolved_port.clone())
                .unwrap_or_default()
        } else {
            opts.resolved_port
        },
        depends: opts
            .depends
            .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
        env: opts.env.or_else(|| existing.and_then(|d| d.env.clone())),
        watch: opts
            .watch
            .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
        watch_mode: opts
            .watch_mode
            .unwrap_or_else(|| existing.map(|d| d.watch_mode).unwrap_or_default()),
        watch_base_dir: opts
            .watch_base_dir
            .or_else(|| existing.and_then(|d| d.watch_base_dir.clone())),
        mise: opts.mise.or(existing.and_then(|d| d.mise)),
        user: opts.user.or_else(|| existing.and_then(|d| d.user.clone())),
        proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
        // Deliberately no fallback: the stored value is replaced as-is.
        active_port: opts.active_port,
        slug: opts.slug.or_else(|| existing.and_then(|d| d.slug.clone())),
        memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
        cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
        stop_signal: opts.stop_signal.or(existing.and_then(|d| d.stop_signal)),
        pty: opts.pty.or(existing.and_then(|d| d.pty)),
    };
    // The state file takes ownership of one copy; the caller gets the other.
    state_file.insert_daemon(&opts.id, daemon.clone());
    Ok(daemon)
}
/// Enables the daemon `id` in the state file.
///
/// The merged configuration is loaded before the state-file lock is taken.
/// The daemon must be present in the state file or in that configuration.
///
/// # Errors
///
/// Propagates any error from loading the merged configuration, and returns
/// `daemon '<id>' not found` when the id is known to neither source.
///
/// # Returns
///
/// The boolean reported by the state file's `enable_daemon`
/// (NOTE(review): presumably "state changed" — confirm against `StateFile`).
pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
    info!("enabling daemon: {id}");
    let merged = PitchforkToml::all_merged_all_namespaces()?;
    let mut state = self.state_file.lock().await;
    let known = state.daemons.contains_key(id) || merged.daemons.contains_key(id);
    if known {
        Ok(state.enable_daemon(id))
    } else {
        Err(miette::miette!("daemon '{}' not found", id))
    }
}
/// Disables the daemon `id` in the state file.
///
/// The merged configuration is loaded before the state-file lock is taken.
/// The daemon must be present in the state file or in that configuration.
///
/// # Errors
///
/// Propagates any error from loading the merged configuration, and returns
/// `daemon '<id>' not found` when the id is known to neither source.
///
/// # Returns
///
/// The boolean reported by the state file's `disable_daemon`
/// (NOTE(review): presumably "state changed" — confirm against `StateFile`).
pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
    info!("disabling daemon: {id}");
    let merged = PitchforkToml::all_merged_all_namespaces()?;
    let mut state = self.state_file.lock().await;
    let known = state.daemons.contains_key(id) || merged.daemons.contains_key(id);
    if known {
        Ok(state.disable_daemon(id))
    } else {
        Err(miette::miette!("daemon '{}' not found", id))
    }
}
/// Returns a clone of the state-file entry stored under `id`, or `None` when
/// there is no such entry.
pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
    let state = self.state_file.lock().await;
    state.daemons.get(id).cloned()
}
/// Returns clones of all state-file daemons that have a pid, leaving out the
/// entry whose id equals `DaemonId::pitchfork()`.
pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
    let supervisor_id = DaemonId::pitchfork();
    let state = self.state_file.lock().await;
    let running: Vec<Daemon> = state
        .daemons
        .values()
        .filter(|daemon| daemon.pid.is_some())
        .filter(|daemon| daemon.id != supervisor_id)
        .cloned()
        .collect();
    running
}
/// Removes the entry for `id` from the state file.
///
/// Always returns `Ok(())`; whatever the state file's `remove_daemon` yields
/// is discarded.
pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
    self.state_file.lock().await.remove_daemon(id);
    Ok(())
}
/// Records `dir` as the directory associated with `shell_pid` in the state
/// file. Always returns `Ok(())`.
pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
    self.state_file.lock().await.set_shell_dir(shell_pid, dir);
    Ok(())
}
/// Looks up the directory recorded for `shell_pid`.
///
/// `shell_dirs` is keyed by the pid's decimal string form, so the pid is
/// stringified before the lookup. Returns a clone of the stored path, or
/// `None` when the pid has no entry.
pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
    let state = self.state_file.lock().await;
    let key = shell_pid.to_string();
    state.shell_dirs.get(&key).cloned()
}
/// Drops the directory record for `shell_pid` from the state file.
/// Always returns `Ok(())`.
pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
    self.state_file.lock().await.remove_shell_dir(shell_pid);
    Ok(())
}
/// Groups the recorded shell pids by directory.
///
/// Walks `shell_dirs` (pid string -> directory) and builds the inverse
/// mapping from directory to the pids recorded for it. Keys that do not
/// parse as `u32` are skipped.
pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
    let state = self.state_file.lock().await;
    let mut pids_by_dir: HashMap<PathBuf, Vec<u32>> = HashMap::new();
    for (raw_pid, dir) in state.shell_dirs.iter() {
        if let Ok(pid) = raw_pid.parse::<u32>() {
            pids_by_dir.entry(dir.clone()).or_default().push(pid);
        }
    }
    pids_by_dir
}
/// Takes every pending notification out of the queue and returns them in
/// order, leaving the queue empty.
pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
    let mut pending = self.pending_notifications.lock().await;
    let drained: Vec<(log::LevelFilter, String)> = pending.drain(..).collect();
    drained
}
/// Asks the state file to retain only the daemons that currently have a pid.
/// Always returns `Ok(())`.
pub(crate) async fn clean(&self) -> Result<()> {
    self.state_file
        .lock()
        .await
        .retain_daemons(|_, daemon| daemon.pid.is_some());
    Ok(())
}
}