Skip to main content

pitchfork_cli/supervisor/
state.rs

1//! State access layer for the supervisor
2//!
3//! All state getter/setter operations for daemons, shell directories, and notifications.
4
5use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::pitchfork_toml::WatchMode;
15use crate::procs::PROCS;
16use crate::settings::settings;
17use indexmap::IndexMap;
18use std::collections::HashMap;
19use std::path::PathBuf;
20
21/// Options for upserting a daemon's state.
22///
23/// Use `UpsertDaemonOpts::builder(id)` to create, then set fields directly and call `.build()`.
24#[derive(Debug)]
25pub(crate) struct UpsertDaemonOpts {
26    pub id: DaemonId,
27    pub pid: Option<u32>,
28    pub status: DaemonStatus,
29    pub shell_pid: Option<u32>,
30    pub dir: Option<PathBuf>,
31    pub cmd: Option<Vec<String>>,
32    pub autostop: bool,
33    pub cron_schedule: Option<String>,
34    pub cron_retrigger: Option<CronRetrigger>,
35    pub last_exit_success: Option<bool>,
36    pub retry: Option<u32>,
37    pub retry_count: Option<u32>,
38    pub ready_delay: Option<u64>,
39    pub ready_output: Option<String>,
40    pub ready_http: Option<String>,
41    pub ready_port: Option<u16>,
42    pub ready_cmd: Option<String>,
43    /// Expected ports from configuration (before auto-bump resolution)
44    pub expected_port: Vec<u16>,
45    /// Resolved ports actually used after auto-bump (may differ from expected)
46    pub resolved_port: Vec<u16>,
47    /// The first port the process is actually listening on (detected at runtime).
48    pub active_port: Option<u16>,
49    /// Optional stable slug alias for this daemon.
50    pub slug: Option<String>,
51    /// Whether to proxy this daemon (None = use global proxy.enable setting).
52    pub proxy: Option<bool>,
53    pub auto_bump_port: Option<bool>,
54    pub port_bump_attempts: Option<u32>,
55    pub depends: Option<Vec<DaemonId>>,
56    pub env: Option<IndexMap<String, String>>,
57    pub watch: Option<Vec<String>>,
58    pub watch_mode: Option<WatchMode>,
59    pub watch_base_dir: Option<PathBuf>,
60    pub mise: Option<bool>,
61    /// Unix user to run this daemon as
62    pub user: Option<String>,
63    /// Memory limit for the daemon process
64    pub memory_limit: Option<MemoryLimit>,
65    /// CPU usage limit as a percentage
66    pub cpu_limit: Option<CpuLimit>,
67}
68
69/// Builder for UpsertDaemonOpts - ensures daemon ID is always provided.
70///
71/// # Example
72/// ```ignore
73/// let opts = UpsertDaemonOpts::builder(daemon_id)
74///     .set(|o| {
75///         o.pid = Some(pid);
76///         o.status = DaemonStatus::Running;
77///     })
78///     .build();
79/// ```
80#[derive(Debug)]
81pub(crate) struct UpsertDaemonOptsBuilder {
82    pub opts: UpsertDaemonOpts,
83}
84
85impl UpsertDaemonOpts {
86    /// Create a builder with the required daemon ID.
87    pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
88        UpsertDaemonOptsBuilder {
89            opts: UpsertDaemonOpts {
90                id,
91                pid: None,
92                status: DaemonStatus::default(),
93                shell_pid: None,
94                dir: None,
95                cmd: None,
96                autostop: false,
97                cron_schedule: None,
98                cron_retrigger: None,
99                last_exit_success: None,
100                retry: None,
101                retry_count: None,
102                ready_delay: None,
103                ready_output: None,
104                ready_http: None,
105                ready_port: None,
106                ready_cmd: None,
107                expected_port: Vec::new(),
108                resolved_port: Vec::new(),
109                active_port: None,
110                slug: None,
111                proxy: None,
112                auto_bump_port: None,
113                port_bump_attempts: None,
114                depends: None,
115                env: None,
116                watch: None,
117                watch_mode: None,
118                watch_base_dir: None,
119                mise: None,
120                user: None,
121                memory_limit: None,
122                cpu_limit: None,
123            },
124        }
125    }
126}
127
128impl UpsertDaemonOptsBuilder {
129    /// Modify opts fields with a closure.
130    pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
131        f(&mut self.opts);
132        self
133    }
134
135    /// Build the UpsertDaemonOpts.
136    pub fn build(self) -> UpsertDaemonOpts {
137        self.opts
138    }
139}
140
141impl Supervisor {
142    /// Upsert a daemon's state, merging with existing values
143    pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
144        info!(
145            "upserting daemon: {} pid: {} status: {}",
146            opts.id,
147            opts.pid.unwrap_or(0),
148            opts.status
149        );
150        let mut state_file = self.state_file.lock().await;
151        let existing = state_file.daemons.get(&opts.id);
152        let daemon = Daemon {
153            id: opts.id.clone(),
154            title: opts.pid.and_then(|pid| PROCS.title(pid)),
155            pid: opts.pid,
156            status: opts.status,
157            shell_pid: opts.shell_pid,
158            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
159            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
160            cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
161            cron_schedule: opts
162                .cron_schedule
163                .or(existing.and_then(|d| d.cron_schedule.clone())),
164            cron_retrigger: opts
165                .cron_retrigger
166                .or(existing.and_then(|d| d.cron_retrigger)),
167            last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
168            last_exit_success: opts
169                .last_exit_success
170                .or(existing.and_then(|d| d.last_exit_success)),
171            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
172            retry_count: opts
173                .retry_count
174                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
175            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
176            ready_output: opts
177                .ready_output
178                .or(existing.and_then(|d| d.ready_output.clone())),
179            ready_http: opts
180                .ready_http
181                .or(existing.and_then(|d| d.ready_http.clone())),
182            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
183            ready_cmd: opts
184                .ready_cmd
185                .or(existing.and_then(|d| d.ready_cmd.clone())),
186            expected_port: if opts.expected_port.is_empty() {
187                existing
188                    .map(|d| d.expected_port.clone())
189                    .unwrap_or_default()
190            } else {
191                opts.expected_port
192            },
193            resolved_port: if opts.resolved_port.is_empty() {
194                existing
195                    .map(|d| d.resolved_port.clone())
196                    .unwrap_or_default()
197            } else {
198                opts.resolved_port
199            },
200            auto_bump_port: opts
201                .auto_bump_port
202                .unwrap_or(existing.map(|d| d.auto_bump_port).unwrap_or(false)),
203            port_bump_attempts: opts.port_bump_attempts.unwrap_or(
204                existing
205                    .map(|d| d.port_bump_attempts)
206                    .unwrap_or_else(|| settings().default_port_bump_attempts()),
207            ),
208            depends: opts
209                .depends
210                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
211            env: opts.env.or(existing.and_then(|d| d.env.clone())),
212            watch: opts
213                .watch
214                .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
215            watch_mode: opts
216                .watch_mode
217                .unwrap_or_else(|| existing.map(|d| d.watch_mode).unwrap_or_default()),
218            watch_base_dir: opts
219                .watch_base_dir
220                .or(existing.and_then(|d| d.watch_base_dir.clone())),
221            mise: opts.mise.or(existing.and_then(|d| d.mise)),
222            user: opts.user.or(existing.and_then(|d| d.user.clone())),
223            proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
224            // active_port is intentionally NOT inherited from the existing daemon.
225            // When a daemon restarts, the new process has not yet bound a port, so
226            // carrying over the old process's active_port would cause the proxy to
227            // route to a port that is no longer listening.  The port will be
228            // re-detected by detect_and_store_active_port once the new process is ready.
229            active_port: opts.active_port,
230            slug: opts.slug.or(existing.and_then(|d| d.slug.clone())),
231            memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
232            cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
233        };
234        state_file.daemons.insert(opts.id.clone(), daemon.clone());
235        if let Err(err) = state_file.write() {
236            warn!("failed to update state file: {err:#}");
237        }
238        Ok(daemon)
239    }
240
241    /// Enable a daemon (remove from disabled set)
242    pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
243        info!("enabling daemon: {id}");
244        let config = PitchforkToml::all_merged()?;
245        let mut state_file = self.state_file.lock().await;
246        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
247        if !exists {
248            return Err(miette::miette!("daemon '{}' not found", id));
249        }
250        let result = state_file.disabled.remove(id);
251        state_file.write()?;
252        Ok(result)
253    }
254
255    /// Disable a daemon (add to disabled set)
256    pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
257        info!("disabling daemon: {id}");
258        let config = PitchforkToml::all_merged()?;
259        let mut state_file = self.state_file.lock().await;
260        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
261        if !exists {
262            return Err(miette::miette!("daemon '{}' not found", id));
263        }
264        let result = state_file.disabled.insert(id.clone());
265        state_file.write()?;
266        Ok(result)
267    }
268
269    /// Get a daemon by ID
270    pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
271        self.state_file.lock().await.daemons.get(id).cloned()
272    }
273
274    /// Get all active daemons (those with PIDs, excluding pitchfork itself)
275    pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
276        let pitchfork_id = DaemonId::pitchfork();
277        self.state_file
278            .lock()
279            .await
280            .daemons
281            .values()
282            .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
283            .cloned()
284            .collect()
285    }
286
287    /// Remove a daemon from state
288    pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
289        let mut state_file = self.state_file.lock().await;
290        state_file.daemons.remove(id);
291        if let Err(err) = state_file.write() {
292            warn!("failed to update state file: {err:#}");
293        }
294        Ok(())
295    }
296
297    /// Set the shell's working directory
298    pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
299        let mut state_file = self.state_file.lock().await;
300        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
301        state_file.write()?;
302        Ok(())
303    }
304
305    /// Get the shell's working directory
306    pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
307        self.state_file
308            .lock()
309            .await
310            .shell_dirs
311            .get(&shell_pid.to_string())
312            .cloned()
313    }
314
315    /// Remove a shell PID from tracking
316    pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
317        let mut state_file = self.state_file.lock().await;
318        if state_file
319            .shell_dirs
320            .remove(&shell_pid.to_string())
321            .is_some()
322        {
323            state_file.write()?;
324        }
325        Ok(())
326    }
327
328    /// Get all directories with their associated shell PIDs
329    pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
330        self.state_file.lock().await.shell_dirs.iter().fold(
331            HashMap::new(),
332            |mut acc, (pid, dir)| {
333                if let Ok(pid) = pid.parse() {
334                    acc.entry(dir.clone()).or_default().push(pid);
335                }
336                acc
337            },
338        )
339    }
340
341    /// Get pending notifications and clear the queue
342    pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
343        self.pending_notifications.lock().await.drain(..).collect()
344    }
345
346    /// Clean up daemons that have no PID
347    pub(crate) async fn clean(&self) -> Result<()> {
348        let mut state_file = self.state_file.lock().await;
349        state_file.daemons.retain(|_id, d| d.pid.is_some());
350        state_file.write()?;
351        Ok(())
352    }
353}