Skip to main content

pitchfork_cli/supervisor/
state.rs

1//! State access layer for the supervisor
2//!
3//! All state getter/setter operations for daemons, shell directories, and notifications.
4
5use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CronRetrigger;
11use crate::pitchfork_toml::PitchforkToml;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use indexmap::IndexMap;
15use std::collections::HashMap;
16use std::path::PathBuf;
17
18/// Options for upserting a daemon's state.
19///
20/// Use `UpsertDaemonOpts::builder(id)` to create, then set fields directly and call `.build()`.
21#[derive(Debug)]
22pub(crate) struct UpsertDaemonOpts {
23    pub id: DaemonId,
24    pub pid: Option<u32>,
25    pub status: DaemonStatus,
26    pub shell_pid: Option<u32>,
27    pub dir: Option<PathBuf>,
28    pub cmd: Option<Vec<String>>,
29    pub autostop: bool,
30    pub cron_schedule: Option<String>,
31    pub cron_retrigger: Option<CronRetrigger>,
32    pub last_exit_success: Option<bool>,
33    pub retry: Option<u32>,
34    pub retry_count: Option<u32>,
35    pub ready_delay: Option<u64>,
36    pub ready_output: Option<String>,
37    pub ready_http: Option<String>,
38    pub ready_port: Option<u16>,
39    pub ready_cmd: Option<String>,
40    /// Expected ports from configuration (before auto-bump resolution)
41    pub expected_port: Vec<u16>,
42    /// Resolved ports actually used after auto-bump (may differ from expected)
43    pub resolved_port: Vec<u16>,
44    pub auto_bump_port: Option<bool>,
45    pub port_bump_attempts: Option<u32>,
46    pub depends: Option<Vec<DaemonId>>,
47    pub env: Option<IndexMap<String, String>>,
48    pub watch: Option<Vec<String>>,
49    pub watch_base_dir: Option<PathBuf>,
50    pub mise: Option<bool>,
51}
52
53/// Builder for UpsertDaemonOpts - ensures daemon ID is always provided.
54///
55/// # Example
56/// ```ignore
57/// let opts = UpsertDaemonOpts::builder(daemon_id)
58///     .set(|o| {
59///         o.pid = Some(pid);
60///         o.status = DaemonStatus::Running;
61///     })
62///     .build();
63/// ```
64#[derive(Debug)]
65pub(crate) struct UpsertDaemonOptsBuilder {
66    pub opts: UpsertDaemonOpts,
67}
68
69impl UpsertDaemonOpts {
70    /// Create a builder with the required daemon ID.
71    pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
72        UpsertDaemonOptsBuilder {
73            opts: UpsertDaemonOpts {
74                id,
75                pid: None,
76                status: DaemonStatus::default(),
77                shell_pid: None,
78                dir: None,
79                cmd: None,
80                autostop: false,
81                cron_schedule: None,
82                cron_retrigger: None,
83                last_exit_success: None,
84                retry: None,
85                retry_count: None,
86                ready_delay: None,
87                ready_output: None,
88                ready_http: None,
89                ready_port: None,
90                ready_cmd: None,
91                expected_port: Vec::new(),
92                resolved_port: Vec::new(),
93                auto_bump_port: None,
94                port_bump_attempts: None,
95                depends: None,
96                env: None,
97                watch: None,
98                watch_base_dir: None,
99                mise: None,
100            },
101        }
102    }
103}
104
105impl UpsertDaemonOptsBuilder {
106    /// Modify opts fields with a closure.
107    pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
108        f(&mut self.opts);
109        self
110    }
111
112    /// Build the UpsertDaemonOpts.
113    pub fn build(self) -> UpsertDaemonOpts {
114        self.opts
115    }
116}
117
118impl Supervisor {
119    /// Upsert a daemon's state, merging with existing values
120    pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
121        info!(
122            "upserting daemon: {} pid: {} status: {}",
123            opts.id,
124            opts.pid.unwrap_or(0),
125            opts.status
126        );
127        let mut state_file = self.state_file.lock().await;
128        let existing = state_file.daemons.get(&opts.id);
129        let daemon = Daemon {
130            id: opts.id.clone(),
131            title: opts.pid.and_then(|pid| PROCS.title(pid)),
132            pid: opts.pid,
133            status: opts.status,
134            shell_pid: opts.shell_pid,
135            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
136            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
137            cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
138            cron_schedule: opts
139                .cron_schedule
140                .or(existing.and_then(|d| d.cron_schedule.clone())),
141            cron_retrigger: opts
142                .cron_retrigger
143                .or(existing.and_then(|d| d.cron_retrigger)),
144            last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
145            last_exit_success: opts
146                .last_exit_success
147                .or(existing.and_then(|d| d.last_exit_success)),
148            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
149            retry_count: opts
150                .retry_count
151                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
152            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
153            ready_output: opts
154                .ready_output
155                .or(existing.and_then(|d| d.ready_output.clone())),
156            ready_http: opts
157                .ready_http
158                .or(existing.and_then(|d| d.ready_http.clone())),
159            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
160            ready_cmd: opts
161                .ready_cmd
162                .or(existing.and_then(|d| d.ready_cmd.clone())),
163            expected_port: if opts.expected_port.is_empty() {
164                existing
165                    .map(|d| d.expected_port.clone())
166                    .unwrap_or_default()
167            } else {
168                opts.expected_port
169            },
170            resolved_port: if opts.resolved_port.is_empty() {
171                existing
172                    .map(|d| d.resolved_port.clone())
173                    .unwrap_or_default()
174            } else {
175                opts.resolved_port
176            },
177            auto_bump_port: opts
178                .auto_bump_port
179                .unwrap_or(existing.map(|d| d.auto_bump_port).unwrap_or(false)),
180            port_bump_attempts: opts.port_bump_attempts.unwrap_or(
181                existing
182                    .map(|d| d.port_bump_attempts)
183                    .unwrap_or_else(|| settings().default_port_bump_attempts()),
184            ),
185            depends: opts
186                .depends
187                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
188            env: opts.env.or(existing.and_then(|d| d.env.clone())),
189            watch: opts
190                .watch
191                .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
192            watch_base_dir: opts
193                .watch_base_dir
194                .or(existing.and_then(|d| d.watch_base_dir.clone())),
195            mise: opts
196                .mise
197                .unwrap_or(existing.map(|d| d.mise).unwrap_or(settings().general.mise)),
198        };
199        state_file.daemons.insert(opts.id.clone(), daemon.clone());
200        if let Err(err) = state_file.write() {
201            warn!("failed to update state file: {err:#}");
202        }
203        Ok(daemon)
204    }
205
206    /// Enable a daemon (remove from disabled set)
207    pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
208        info!("enabling daemon: {id}");
209        let config = PitchforkToml::all_merged()?;
210        let mut state_file = self.state_file.lock().await;
211        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
212        if !exists {
213            return Err(miette::miette!("daemon '{}' not found", id));
214        }
215        let result = state_file.disabled.remove(id);
216        state_file.write()?;
217        Ok(result)
218    }
219
220    /// Disable a daemon (add to disabled set)
221    pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
222        info!("disabling daemon: {id}");
223        let config = PitchforkToml::all_merged()?;
224        let mut state_file = self.state_file.lock().await;
225        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
226        if !exists {
227            return Err(miette::miette!("daemon '{}' not found", id));
228        }
229        let result = state_file.disabled.insert(id.clone());
230        state_file.write()?;
231        Ok(result)
232    }
233
234    /// Get a daemon by ID
235    pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
236        self.state_file.lock().await.daemons.get(id).cloned()
237    }
238
239    /// Get all active daemons (those with PIDs, excluding pitchfork itself)
240    pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
241        let pitchfork_id = DaemonId::pitchfork();
242        self.state_file
243            .lock()
244            .await
245            .daemons
246            .values()
247            .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
248            .cloned()
249            .collect()
250    }
251
252    /// Remove a daemon from state
253    pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
254        let mut state_file = self.state_file.lock().await;
255        state_file.daemons.remove(id);
256        if let Err(err) = state_file.write() {
257            warn!("failed to update state file: {err:#}");
258        }
259        Ok(())
260    }
261
262    /// Set the shell's working directory
263    pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
264        let mut state_file = self.state_file.lock().await;
265        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
266        state_file.write()?;
267        Ok(())
268    }
269
270    /// Get the shell's working directory
271    pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
272        self.state_file
273            .lock()
274            .await
275            .shell_dirs
276            .get(&shell_pid.to_string())
277            .cloned()
278    }
279
280    /// Remove a shell PID from tracking
281    pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
282        let mut state_file = self.state_file.lock().await;
283        if state_file
284            .shell_dirs
285            .remove(&shell_pid.to_string())
286            .is_some()
287        {
288            state_file.write()?;
289        }
290        Ok(())
291    }
292
293    /// Get all directories with their associated shell PIDs
294    pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
295        self.state_file.lock().await.shell_dirs.iter().fold(
296            HashMap::new(),
297            |mut acc, (pid, dir)| {
298                if let Ok(pid) = pid.parse() {
299                    acc.entry(dir.clone()).or_default().push(pid);
300                }
301                acc
302            },
303        )
304    }
305
306    /// Get pending notifications and clear the queue
307    pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
308        self.pending_notifications.lock().await.drain(..).collect()
309    }
310
311    /// Clean up daemons that have no PID
312    pub(crate) async fn clean(&self) -> Result<()> {
313        let mut state_file = self.state_file.lock().await;
314        state_file.daemons.retain(|_id, d| d.pid.is_some());
315        state_file.write()?;
316        Ok(())
317    }
318}