Skip to main content

pitchfork_cli/supervisor/
state.rs

1//! State access layer for the supervisor
2//!
3//! All state getter/setter operations for daemons, shell directories, and notifications.
4
5use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::pitchfork_toml::PortConfig;
15use crate::pitchfork_toml::Retry;
16use crate::pitchfork_toml::StopConfig;
17use crate::pitchfork_toml::WatchMode;
18use crate::procs::PROCS;
19use indexmap::IndexMap;
20use std::collections::HashMap;
21use std::path::PathBuf;
22
/// Options for upserting a daemon's state.
///
/// Use `UpsertDaemonOpts::builder(id)` to create, then set fields directly and call `.build()`.
///
/// # Merge semantics
///
/// `Supervisor::upsert_daemon` combines these options with any entry already stored
/// under `id`:
///
/// - `pid`, `status`, `shell_pid` and `active_port` are written exactly as given and
///   are never inherited from the stored entry.
/// - `autostop` is OR-ed with the stored flag, so `false` here cannot clear it.
/// - `resolved_port` replaces the stored list only when it is non-empty.
/// - Every other `Option` field keeps the stored value when left as `None`.
#[derive(Debug, Default)]
pub(crate) struct UpsertDaemonOpts {
    /// ID of the daemon to insert or update; used as the key into the stored daemons.
    pub id: DaemonId,
    /// PID to record. Stored as given, so `None` replaces any previously stored PID.
    pub pid: Option<u32>,
    /// Status to record. Always replaces the stored status.
    pub status: DaemonStatus,
    /// Shell PID to record. Stored as given; not inherited from the stored entry.
    pub shell_pid: Option<u32>,
    pub dir: Option<PathBuf>,
    pub cmd: Option<Vec<String>>,
    /// Autostop flag. Combined with the stored flag using logical OR.
    pub autostop: bool,
    pub cron_schedule: Option<String>,
    pub cron_retrigger: Option<CronRetrigger>,
    pub last_exit_success: Option<bool>,
    pub retry: Option<Retry>,
    /// Retry counter. `None` keeps the stored count (or 0 when there is no stored entry).
    pub retry_count: Option<u32>,
    pub ready_delay: Option<u64>,
    pub ready_output: Option<String>,
    pub ready_http: Option<String>,
    pub ready_port: Option<u16>,
    pub ready_cmd: Option<String>,
    /// Port configuration
    pub port: Option<PortConfig>,
    /// Resolved ports actually used after auto-bump (may differ from expected)
    ///
    /// An empty vector means "keep the stored list".
    pub resolved_port: Vec<u16>,
    /// The first port the process is actually listening on (detected at runtime).
    pub active_port: Option<u16>,
    /// Optional stable slug alias for this daemon.
    pub slug: Option<String>,
    /// Whether to proxy this daemon (None = use global proxy.enable setting).
    pub proxy: Option<bool>,
    pub depends: Option<Vec<DaemonId>>,
    pub env: Option<IndexMap<String, String>>,
    pub watch: Option<Vec<String>>,
    pub watch_mode: Option<WatchMode>,
    pub watch_base_dir: Option<PathBuf>,
    pub mise: Option<bool>,
    /// Unix user to run this daemon as
    pub user: Option<String>,
    /// Memory limit for the daemon process
    pub memory_limit: Option<MemoryLimit>,
    /// CPU usage limit as a percentage
    pub cpu_limit: Option<CpuLimit>,
    /// Unix signal to send for graceful shutdown
    pub stop_signal: Option<StopConfig>,
    /// Allocate a pseudo-terminal for the daemon process
    pub pty: Option<bool>,
}
72
/// Builder for UpsertDaemonOpts - ensures daemon ID is always provided.
///
/// Obtain one from `UpsertDaemonOpts::builder(id)`, adjust fields through `set`
/// (or directly through the public `opts` field), then call `build`.
///
/// # Example
/// ```ignore
/// let opts = UpsertDaemonOpts::builder(daemon_id)
///     .set(|o| {
///         o.pid = Some(pid);
///         o.status = DaemonStatus::Running;
///     })
///     .build();
/// ```
#[derive(Debug)]
pub(crate) struct UpsertDaemonOptsBuilder {
    /// The options being assembled; handed back as-is by `build`.
    pub opts: UpsertDaemonOpts,
}
88
89impl UpsertDaemonOpts {
90    /// Create a builder with the required daemon ID.
91    pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
92        UpsertDaemonOptsBuilder {
93            opts: UpsertDaemonOpts {
94                id,
95                ..Default::default()
96            },
97        }
98    }
99}
100
101impl UpsertDaemonOptsBuilder {
102    /// Modify opts fields with a closure.
103    pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
104        f(&mut self.opts);
105        self
106    }
107
108    /// Build the UpsertDaemonOpts.
109    pub fn build(self) -> UpsertDaemonOpts {
110        self.opts
111    }
112}
113
114impl Supervisor {
115    /// Upsert a daemon's state, merging with existing values
116    pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
117        info!(
118            "upserting daemon: {} pid: {} status: {}",
119            opts.id,
120            opts.pid.unwrap_or(0),
121            opts.status
122        );
123        let mut state_file = self.state_file.lock().await;
124        let existing = state_file.daemons.get(&opts.id);
125        let daemon = Daemon {
126            id: opts.id.clone(),
127            title: opts.pid.and_then(|pid| PROCS.title(pid)),
128            pid: opts.pid,
129            status: opts.status,
130            shell_pid: opts.shell_pid,
131            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
132            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
133            cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
134            cron_schedule: opts
135                .cron_schedule
136                .or(existing.and_then(|d| d.cron_schedule.clone())),
137            cron_retrigger: opts
138                .cron_retrigger
139                .or(existing.and_then(|d| d.cron_retrigger)),
140            last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
141            last_exit_success: opts
142                .last_exit_success
143                .or(existing.and_then(|d| d.last_exit_success)),
144            retry: opts
145                .retry
146                .unwrap_or_else(|| existing.map(|d| d.retry).unwrap_or_default()),
147            retry_count: opts
148                .retry_count
149                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
150            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
151            ready_output: opts
152                .ready_output
153                .or(existing.and_then(|d| d.ready_output.clone())),
154            ready_http: opts
155                .ready_http
156                .or(existing.and_then(|d| d.ready_http.clone())),
157            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
158            ready_cmd: opts
159                .ready_cmd
160                .or(existing.and_then(|d| d.ready_cmd.clone())),
161            port: opts.port.or_else(|| existing.and_then(|d| d.port.clone())),
162            resolved_port: if opts.resolved_port.is_empty() {
163                existing
164                    .map(|d| d.resolved_port.clone())
165                    .unwrap_or_default()
166            } else {
167                opts.resolved_port
168            },
169            depends: opts
170                .depends
171                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
172            env: opts.env.or(existing.and_then(|d| d.env.clone())),
173            watch: opts
174                .watch
175                .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
176            watch_mode: opts
177                .watch_mode
178                .unwrap_or_else(|| existing.map(|d| d.watch_mode).unwrap_or_default()),
179            watch_base_dir: opts
180                .watch_base_dir
181                .or(existing.and_then(|d| d.watch_base_dir.clone())),
182            mise: opts.mise.or(existing.and_then(|d| d.mise)),
183            user: opts.user.or(existing.and_then(|d| d.user.clone())),
184            proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
185            // active_port is intentionally NOT inherited from the existing daemon.
186            // When a daemon restarts, the new process has not yet bound a port, so
187            // carrying over the old process's active_port would cause the proxy to
188            // route to a port that is no longer listening.  The port will be
189            // re-detected by detect_and_store_active_port once the new process is ready.
190            active_port: opts.active_port,
191            slug: opts.slug.or(existing.and_then(|d| d.slug.clone())),
192            memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
193            cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
194            stop_signal: opts.stop_signal.or(existing.and_then(|d| d.stop_signal)),
195            pty: opts.pty.or(existing.and_then(|d| d.pty)),
196        };
197        state_file.daemons.insert(opts.id.clone(), daemon.clone());
198        if let Err(err) = state_file.write() {
199            warn!("failed to update state file: {err:#}");
200        }
201        Ok(daemon)
202    }
203
204    /// Enable a daemon (remove from disabled set)
205    pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
206        info!("enabling daemon: {id}");
207        let config = PitchforkToml::all_merged()?;
208        let mut state_file = self.state_file.lock().await;
209        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
210        if !exists {
211            return Err(miette::miette!("daemon '{}' not found", id));
212        }
213        let result = state_file.disabled.remove(id);
214        state_file.write()?;
215        Ok(result)
216    }
217
218    /// Disable a daemon (add to disabled set)
219    pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
220        info!("disabling daemon: {id}");
221        let config = PitchforkToml::all_merged()?;
222        let mut state_file = self.state_file.lock().await;
223        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
224        if !exists {
225            return Err(miette::miette!("daemon '{}' not found", id));
226        }
227        let result = state_file.disabled.insert(id.clone());
228        state_file.write()?;
229        Ok(result)
230    }
231
232    /// Get a daemon by ID
233    pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
234        self.state_file.lock().await.daemons.get(id).cloned()
235    }
236
237    /// Get all active daemons (those with PIDs, excluding pitchfork itself)
238    pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
239        let pitchfork_id = DaemonId::pitchfork();
240        self.state_file
241            .lock()
242            .await
243            .daemons
244            .values()
245            .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
246            .cloned()
247            .collect()
248    }
249
250    /// Remove a daemon from state
251    pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
252        let mut state_file = self.state_file.lock().await;
253        state_file.daemons.remove(id);
254        if let Err(err) = state_file.write() {
255            warn!("failed to update state file: {err:#}");
256        }
257        Ok(())
258    }
259
260    /// Set the shell's working directory
261    pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
262        let mut state_file = self.state_file.lock().await;
263        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
264        state_file.write()?;
265        Ok(())
266    }
267
268    /// Get the shell's working directory
269    pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
270        self.state_file
271            .lock()
272            .await
273            .shell_dirs
274            .get(&shell_pid.to_string())
275            .cloned()
276    }
277
278    /// Remove a shell PID from tracking
279    pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
280        let mut state_file = self.state_file.lock().await;
281        if state_file
282            .shell_dirs
283            .remove(&shell_pid.to_string())
284            .is_some()
285        {
286            state_file.write()?;
287        }
288        Ok(())
289    }
290
291    /// Get all directories with their associated shell PIDs
292    pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
293        self.state_file.lock().await.shell_dirs.iter().fold(
294            HashMap::new(),
295            |mut acc, (pid, dir)| {
296                if let Ok(pid) = pid.parse() {
297                    acc.entry(dir.clone()).or_default().push(pid);
298                }
299                acc
300            },
301        )
302    }
303
304    /// Get pending notifications and clear the queue
305    pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
306        self.pending_notifications.lock().await.drain(..).collect()
307    }
308
309    /// Clean up daemons that have no PID
310    pub(crate) async fn clean(&self) -> Result<()> {
311        let mut state_file = self.state_file.lock().await;
312        state_file.daemons.retain(|_id, d| d.pid.is_some());
313        state_file.write()?;
314        Ok(())
315    }
316}