Skip to main content

pitchfork_cli/supervisor/
state.rs

1//! State access layer for the supervisor
2//!
3//! All state getter/setter operations for daemons, shell directories, and notifications.
4
5use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::pitchfork_toml::PortConfig;
15use crate::pitchfork_toml::ReadyHttp;
16use crate::pitchfork_toml::Retry;
17use crate::pitchfork_toml::StopConfig;
18use crate::pitchfork_toml::WatchMode;
19use crate::procs::PROCS;
20use indexmap::IndexMap;
21use std::collections::HashMap;
22use std::path::PathBuf;
23
24/// Options for upserting a daemon's state.
25///
26/// Use `UpsertDaemonOpts::builder(id)` to create, then set fields directly and call `.build()`.
27#[derive(Debug, Default)]
28pub(crate) struct UpsertDaemonOpts {
29    pub id: DaemonId,
30    pub pid: Option<u32>,
31    pub status: DaemonStatus,
32    pub shell_pid: Option<u32>,
33    pub dir: Option<PathBuf>,
34    pub cmd: Option<Vec<String>>,
35    pub run: Option<String>,
36    pub autostop: bool,
37    pub cron_schedule: Option<String>,
38    pub cron_retrigger: Option<CronRetrigger>,
39    pub cron_immediate: Option<bool>,
40    pub last_exit_success: Option<bool>,
41    pub retry: Option<Retry>,
42    pub retry_count: Option<u32>,
43    pub ready_delay: Option<u64>,
44    pub ready_output: Option<String>,
45    pub ready_http: Option<ReadyHttp>,
46    pub ready_port: Option<u16>,
47    pub ready_cmd: Option<String>,
48    /// Port configuration
49    pub port: Option<PortConfig>,
50    /// Resolved ports actually used after auto-bump (may differ from expected)
51    pub resolved_port: Vec<u16>,
52    /// The first port the process is actually listening on (detected at runtime).
53    pub active_port: Option<u16>,
54    /// Optional stable slug alias for this daemon.
55    pub slug: Option<String>,
56    /// Whether to proxy this daemon (None = use global proxy.enable setting).
57    pub proxy: Option<bool>,
58    pub depends: Option<Vec<DaemonId>>,
59    pub env: Option<IndexMap<String, String>>,
60    pub watch: Option<Vec<String>>,
61    pub watch_mode: Option<WatchMode>,
62    pub watch_base_dir: Option<PathBuf>,
63    pub mise: Option<bool>,
64    /// Unix user to run this daemon as
65    pub user: Option<String>,
66    /// Memory limit for the daemon process
67    pub memory_limit: Option<MemoryLimit>,
68    /// CPU usage limit as a percentage
69    pub cpu_limit: Option<CpuLimit>,
70    /// Unix signal to send for graceful shutdown
71    pub stop_signal: Option<StopConfig>,
72    /// Allocate a pseudo-terminal for the daemon process
73    pub pty: Option<bool>,
74}
75
76/// Builder for UpsertDaemonOpts - ensures daemon ID is always provided.
77///
78/// # Example
79/// ```ignore
80/// let opts = UpsertDaemonOpts::builder(daemon_id)
81///     .set(|o| {
82///         o.pid = Some(pid);
83///         o.status = DaemonStatus::Running;
84///     })
85///     .build();
86/// ```
87#[derive(Debug)]
88pub(crate) struct UpsertDaemonOptsBuilder {
89    pub opts: UpsertDaemonOpts,
90}
91
92impl UpsertDaemonOpts {
93    /// Create a builder with the required daemon ID.
94    pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
95        UpsertDaemonOptsBuilder {
96            opts: UpsertDaemonOpts {
97                id,
98                ..Default::default()
99            },
100        }
101    }
102}
103
104impl UpsertDaemonOptsBuilder {
105    /// Modify opts fields with a closure.
106    pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
107        f(&mut self.opts);
108        self
109    }
110
111    /// Build the UpsertDaemonOpts.
112    pub fn build(self) -> UpsertDaemonOpts {
113        self.opts
114    }
115}
116
117impl Supervisor {
118    /// Upsert a daemon's state, merging with existing values
119    pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
120        info!(
121            "upserting daemon: {} pid: {} status: {}",
122            opts.id,
123            opts.pid.unwrap_or(0),
124            opts.status
125        );
126        let mut state_file = self.state_file.lock().await;
127        let existing = state_file.daemons.get(&opts.id);
128        let daemon = Daemon {
129            id: opts.id.clone(),
130            title: opts.pid.and_then(|pid| PROCS.title(pid)),
131            pid: opts.pid,
132            status: opts.status,
133            shell_pid: opts.shell_pid,
134            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
135            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
136            cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
137            run: opts.run.or(existing.and_then(|d| d.run.clone())),
138            cron_schedule: opts
139                .cron_schedule
140                .or(existing.and_then(|d| d.cron_schedule.clone())),
141            cron_retrigger: opts
142                .cron_retrigger
143                .or(existing.and_then(|d| d.cron_retrigger)),
144            cron_immediate: opts
145                .cron_immediate
146                .or(existing.and_then(|d| d.cron_immediate)),
147            last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
148            last_exit_success: opts
149                .last_exit_success
150                .or(existing.and_then(|d| d.last_exit_success)),
151            retry: opts
152                .retry
153                .unwrap_or_else(|| existing.map(|d| d.retry).unwrap_or_default()),
154            retry_count: opts
155                .retry_count
156                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
157            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
158            ready_output: opts
159                .ready_output
160                .or(existing.and_then(|d| d.ready_output.clone())),
161            ready_http: opts
162                .ready_http
163                .or(existing.and_then(|d| d.ready_http.clone())),
164            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
165            ready_cmd: opts
166                .ready_cmd
167                .or(existing.and_then(|d| d.ready_cmd.clone())),
168            port: opts.port.or_else(|| existing.and_then(|d| d.port.clone())),
169            resolved_port: if opts.resolved_port.is_empty() {
170                existing
171                    .map(|d| d.resolved_port.clone())
172                    .unwrap_or_default()
173            } else {
174                opts.resolved_port
175            },
176            depends: opts
177                .depends
178                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
179            env: opts.env.or(existing.and_then(|d| d.env.clone())),
180            watch: opts
181                .watch
182                .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
183            watch_mode: opts
184                .watch_mode
185                .unwrap_or_else(|| existing.map(|d| d.watch_mode).unwrap_or_default()),
186            watch_base_dir: opts
187                .watch_base_dir
188                .or(existing.and_then(|d| d.watch_base_dir.clone())),
189            mise: opts.mise.or(existing.and_then(|d| d.mise)),
190            user: opts.user.or(existing.and_then(|d| d.user.clone())),
191            proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
192            // active_port is intentionally NOT inherited from the existing daemon.
193            // When a daemon restarts, the new process has not yet bound a port, so
194            // carrying over the old process's active_port would cause the proxy to
195            // route to a port that is no longer listening.  The port will be
196            // re-detected by detect_and_store_active_port once the new process is ready.
197            active_port: opts.active_port,
198            slug: opts.slug.or(existing.and_then(|d| d.slug.clone())),
199            memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
200            cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
201            stop_signal: opts.stop_signal.or(existing.and_then(|d| d.stop_signal)),
202            pty: opts.pty.or(existing.and_then(|d| d.pty)),
203        };
204        state_file.insert_daemon(&opts.id, daemon.clone());
205        Ok(daemon)
206    }
207
208    /// Enable a daemon (remove from disabled set)
209    pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
210        info!("enabling daemon: {id}");
211        let config = PitchforkToml::all_merged_all_namespaces()?;
212        let mut state_file = self.state_file.lock().await;
213        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
214        if !exists {
215            return Err(miette::miette!("daemon '{}' not found", id));
216        }
217        let result = state_file.enable_daemon(id);
218        Ok(result)
219    }
220
221    /// Disable a daemon (add to disabled set)
222    pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
223        info!("disabling daemon: {id}");
224        let config = PitchforkToml::all_merged_all_namespaces()?;
225        let mut state_file = self.state_file.lock().await;
226        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
227        if !exists {
228            return Err(miette::miette!("daemon '{}' not found", id));
229        }
230        let result = state_file.disable_daemon(id);
231        Ok(result)
232    }
233
234    /// Get a daemon by ID
235    pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
236        self.state_file.lock().await.daemons.get(id).cloned()
237    }
238
239    /// Get all active daemons (those with PIDs, excluding pitchfork itself)
240    pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
241        let pitchfork_id = DaemonId::pitchfork();
242        self.state_file
243            .lock()
244            .await
245            .daemons
246            .values()
247            .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
248            .cloned()
249            .collect()
250    }
251
252    /// Remove a daemon from state
253    pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
254        let mut state_file = self.state_file.lock().await;
255        state_file.remove_daemon(id);
256        Ok(())
257    }
258
259    /// Set the shell's working directory
260    pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
261        let mut state_file = self.state_file.lock().await;
262        state_file.set_shell_dir(shell_pid, dir);
263        Ok(())
264    }
265
266    /// Get the shell's working directory
267    pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
268        self.state_file
269            .lock()
270            .await
271            .shell_dirs
272            .get(&shell_pid.to_string())
273            .cloned()
274    }
275
276    /// Remove a shell PID from tracking
277    pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
278        let mut state_file = self.state_file.lock().await;
279        state_file.remove_shell_dir(shell_pid);
280        Ok(())
281    }
282
283    /// Get all directories with their associated shell PIDs
284    pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
285        self.state_file.lock().await.shell_dirs.iter().fold(
286            HashMap::new(),
287            |mut acc, (pid, dir)| {
288                if let Ok(pid) = pid.parse() {
289                    acc.entry(dir.clone()).or_default().push(pid);
290                }
291                acc
292            },
293        )
294    }
295
296    /// Get pending notifications and clear the queue
297    pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
298        self.pending_notifications.lock().await.drain(..).collect()
299    }
300
301    /// Clean up daemons that have no PID
302    pub(crate) async fn clean(&self) -> Result<()> {
303        let mut state_file = self.state_file.lock().await;
304        state_file.retain_daemons(|_id, d| d.pid.is_some());
305        Ok(())
306    }
307}