Skip to main content

pitchfork_cli/supervisor/
state.rs

//! State access layer for the supervisor.
//!
//! All state getter/setter operations for daemons, shell directories, and notifications.
4
5use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::pitchfork_toml::PortConfig;
15use crate::pitchfork_toml::ReadyHttp;
16use crate::pitchfork_toml::Retry;
17use crate::pitchfork_toml::StopConfig;
18use crate::pitchfork_toml::WatchMode;
19use crate::procs::PROCS;
20use indexmap::IndexMap;
21use std::collections::HashMap;
22use std::path::PathBuf;
23
24/// Options for upserting a daemon's state.
25///
26/// Use `UpsertDaemonOpts::builder(id)` to create, then set fields directly and call `.build()`.
27#[derive(Debug, Default)]
28pub(crate) struct UpsertDaemonOpts {
29    pub id: DaemonId,
30    pub pid: Option<u32>,
31    pub status: DaemonStatus,
32    pub shell_pid: Option<u32>,
33    pub dir: Option<PathBuf>,
34    pub cmd: Option<Vec<String>>,
35    pub autostop: bool,
36    pub cron_schedule: Option<String>,
37    pub cron_retrigger: Option<CronRetrigger>,
38    pub last_exit_success: Option<bool>,
39    pub retry: Option<Retry>,
40    pub retry_count: Option<u32>,
41    pub ready_delay: Option<u64>,
42    pub ready_output: Option<String>,
43    pub ready_http: Option<ReadyHttp>,
44    pub ready_port: Option<u16>,
45    pub ready_cmd: Option<String>,
46    /// Port configuration
47    pub port: Option<PortConfig>,
48    /// Resolved ports actually used after auto-bump (may differ from expected)
49    pub resolved_port: Vec<u16>,
50    /// The first port the process is actually listening on (detected at runtime).
51    pub active_port: Option<u16>,
52    /// Optional stable slug alias for this daemon.
53    pub slug: Option<String>,
54    /// Whether to proxy this daemon (None = use global proxy.enable setting).
55    pub proxy: Option<bool>,
56    pub depends: Option<Vec<DaemonId>>,
57    pub env: Option<IndexMap<String, String>>,
58    pub watch: Option<Vec<String>>,
59    pub watch_mode: Option<WatchMode>,
60    pub watch_base_dir: Option<PathBuf>,
61    pub mise: Option<bool>,
62    /// Unix user to run this daemon as
63    pub user: Option<String>,
64    /// Memory limit for the daemon process
65    pub memory_limit: Option<MemoryLimit>,
66    /// CPU usage limit as a percentage
67    pub cpu_limit: Option<CpuLimit>,
68    /// Unix signal to send for graceful shutdown
69    pub stop_signal: Option<StopConfig>,
70    /// Allocate a pseudo-terminal for the daemon process
71    pub pty: Option<bool>,
72}
73
74/// Builder for UpsertDaemonOpts - ensures daemon ID is always provided.
75///
76/// # Example
77/// ```ignore
78/// let opts = UpsertDaemonOpts::builder(daemon_id)
79///     .set(|o| {
80///         o.pid = Some(pid);
81///         o.status = DaemonStatus::Running;
82///     })
83///     .build();
84/// ```
85#[derive(Debug)]
86pub(crate) struct UpsertDaemonOptsBuilder {
87    pub opts: UpsertDaemonOpts,
88}
89
90impl UpsertDaemonOpts {
91    /// Create a builder with the required daemon ID.
92    pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
93        UpsertDaemonOptsBuilder {
94            opts: UpsertDaemonOpts {
95                id,
96                ..Default::default()
97            },
98        }
99    }
100}
101
102impl UpsertDaemonOptsBuilder {
103    /// Modify opts fields with a closure.
104    pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
105        f(&mut self.opts);
106        self
107    }
108
109    /// Build the UpsertDaemonOpts.
110    pub fn build(self) -> UpsertDaemonOpts {
111        self.opts
112    }
113}
114
115impl Supervisor {
116    /// Upsert a daemon's state, merging with existing values
117    pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
118        info!(
119            "upserting daemon: {} pid: {} status: {}",
120            opts.id,
121            opts.pid.unwrap_or(0),
122            opts.status
123        );
124        let mut state_file = self.state_file.lock().await;
125        let existing = state_file.daemons.get(&opts.id);
126        let daemon = Daemon {
127            id: opts.id.clone(),
128            title: opts.pid.and_then(|pid| PROCS.title(pid)),
129            pid: opts.pid,
130            status: opts.status,
131            shell_pid: opts.shell_pid,
132            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
133            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
134            cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
135            cron_schedule: opts
136                .cron_schedule
137                .or(existing.and_then(|d| d.cron_schedule.clone())),
138            cron_retrigger: opts
139                .cron_retrigger
140                .or(existing.and_then(|d| d.cron_retrigger)),
141            last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
142            last_exit_success: opts
143                .last_exit_success
144                .or(existing.and_then(|d| d.last_exit_success)),
145            retry: opts
146                .retry
147                .unwrap_or_else(|| existing.map(|d| d.retry).unwrap_or_default()),
148            retry_count: opts
149                .retry_count
150                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
151            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
152            ready_output: opts
153                .ready_output
154                .or(existing.and_then(|d| d.ready_output.clone())),
155            ready_http: opts
156                .ready_http
157                .or(existing.and_then(|d| d.ready_http.clone())),
158            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
159            ready_cmd: opts
160                .ready_cmd
161                .or(existing.and_then(|d| d.ready_cmd.clone())),
162            port: opts.port.or_else(|| existing.and_then(|d| d.port.clone())),
163            resolved_port: if opts.resolved_port.is_empty() {
164                existing
165                    .map(|d| d.resolved_port.clone())
166                    .unwrap_or_default()
167            } else {
168                opts.resolved_port
169            },
170            depends: opts
171                .depends
172                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
173            env: opts.env.or(existing.and_then(|d| d.env.clone())),
174            watch: opts
175                .watch
176                .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
177            watch_mode: opts
178                .watch_mode
179                .unwrap_or_else(|| existing.map(|d| d.watch_mode).unwrap_or_default()),
180            watch_base_dir: opts
181                .watch_base_dir
182                .or(existing.and_then(|d| d.watch_base_dir.clone())),
183            mise: opts.mise.or(existing.and_then(|d| d.mise)),
184            user: opts.user.or(existing.and_then(|d| d.user.clone())),
185            proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
186            // active_port is intentionally NOT inherited from the existing daemon.
187            // When a daemon restarts, the new process has not yet bound a port, so
188            // carrying over the old process's active_port would cause the proxy to
189            // route to a port that is no longer listening.  The port will be
190            // re-detected by detect_and_store_active_port once the new process is ready.
191            active_port: opts.active_port,
192            slug: opts.slug.or(existing.and_then(|d| d.slug.clone())),
193            memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
194            cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
195            stop_signal: opts.stop_signal.or(existing.and_then(|d| d.stop_signal)),
196            pty: opts.pty.or(existing.and_then(|d| d.pty)),
197        };
198        state_file.daemons.insert(opts.id.clone(), daemon.clone());
199        if let Err(err) = state_file.write() {
200            warn!("failed to update state file: {err:#}");
201        }
202        Ok(daemon)
203    }
204
205    /// Enable a daemon (remove from disabled set)
206    pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
207        info!("enabling daemon: {id}");
208        let config = PitchforkToml::all_merged()?;
209        let mut state_file = self.state_file.lock().await;
210        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
211        if !exists {
212            return Err(miette::miette!("daemon '{}' not found", id));
213        }
214        let result = state_file.disabled.remove(id);
215        state_file.write()?;
216        Ok(result)
217    }
218
219    /// Disable a daemon (add to disabled set)
220    pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
221        info!("disabling daemon: {id}");
222        let config = PitchforkToml::all_merged()?;
223        let mut state_file = self.state_file.lock().await;
224        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
225        if !exists {
226            return Err(miette::miette!("daemon '{}' not found", id));
227        }
228        let result = state_file.disabled.insert(id.clone());
229        state_file.write()?;
230        Ok(result)
231    }
232
233    /// Get a daemon by ID
234    pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
235        self.state_file.lock().await.daemons.get(id).cloned()
236    }
237
238    /// Get all active daemons (those with PIDs, excluding pitchfork itself)
239    pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
240        let pitchfork_id = DaemonId::pitchfork();
241        self.state_file
242            .lock()
243            .await
244            .daemons
245            .values()
246            .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
247            .cloned()
248            .collect()
249    }
250
251    /// Remove a daemon from state
252    pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
253        let mut state_file = self.state_file.lock().await;
254        state_file.daemons.remove(id);
255        if let Err(err) = state_file.write() {
256            warn!("failed to update state file: {err:#}");
257        }
258        Ok(())
259    }
260
261    /// Set the shell's working directory
262    pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
263        let mut state_file = self.state_file.lock().await;
264        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
265        state_file.write()?;
266        Ok(())
267    }
268
269    /// Get the shell's working directory
270    pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
271        self.state_file
272            .lock()
273            .await
274            .shell_dirs
275            .get(&shell_pid.to_string())
276            .cloned()
277    }
278
279    /// Remove a shell PID from tracking
280    pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
281        let mut state_file = self.state_file.lock().await;
282        if state_file
283            .shell_dirs
284            .remove(&shell_pid.to_string())
285            .is_some()
286        {
287            state_file.write()?;
288        }
289        Ok(())
290    }
291
292    /// Get all directories with their associated shell PIDs
293    pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
294        self.state_file.lock().await.shell_dirs.iter().fold(
295            HashMap::new(),
296            |mut acc, (pid, dir)| {
297                if let Ok(pid) = pid.parse() {
298                    acc.entry(dir.clone()).or_default().push(pid);
299                }
300                acc
301            },
302        )
303    }
304
305    /// Get pending notifications and clear the queue
306    pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
307        self.pending_notifications.lock().await.drain(..).collect()
308    }
309
310    /// Clean up daemons that have no PID
311    pub(crate) async fn clean(&self) -> Result<()> {
312        let mut state_file = self.state_file.lock().await;
313        state_file.daemons.retain(|_id, d| d.pid.is_some());
314        state_file.write()?;
315        Ok(())
316    }
317}