// pitchfork_cli/supervisor/state.rs
1//! State access layer for the supervisor
2//!
3//! All state getter/setter operations for daemons, shell directories, and notifications.
4
5use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::procs::PROCS;
15use crate::settings::settings;
16use indexmap::IndexMap;
17use std::collections::HashMap;
18use std::path::PathBuf;
19
/// Options for upserting a daemon's state.
///
/// Use `UpsertDaemonOpts::builder(id)` to create, then set fields directly and call `.build()`.
///
/// Merge semantics (see `Supervisor::upsert_daemon`): a `None` — or, for the
/// port vectors, an empty `Vec` — means "keep the value already stored for this
/// daemon"; a set value overwrites the stored one.
#[derive(Debug)]
pub(crate) struct UpsertDaemonOpts {
    /// Identifier of the daemon being upserted (required; the only mandatory field).
    pub id: DaemonId,
    /// Process id to record; also used to look up the process title via `PROCS`.
    pub pid: Option<u32>,
    /// Status to record for the daemon (always overwrites the stored status).
    pub status: DaemonStatus,
    /// Shell PID to associate with the daemon (always overwrites; not merged).
    pub shell_pid: Option<u32>,
    /// Working directory for the daemon.
    pub dir: Option<PathBuf>,
    /// Command line used to launch the daemon.
    pub cmd: Option<Vec<String>>,
    /// Autostop flag; sticky on merge — once stored as `true` it stays `true`.
    pub autostop: bool,
    /// Cron schedule expression, if the daemon runs on a schedule.
    pub cron_schedule: Option<String>,
    /// Retrigger policy for cron-scheduled runs.
    pub cron_retrigger: Option<CronRetrigger>,
    /// Whether the last run exited successfully.
    pub last_exit_success: Option<bool>,
    /// Retry setting merged into `Daemon::retry` (defaults to 0 for a new daemon).
    pub retry: Option<u32>,
    /// Retry counter merged into `Daemon::retry_count` (defaults to 0 for a new daemon).
    pub retry_count: Option<u32>,
    /// Readiness delay; unit is defined by the config layer, stored verbatim.
    pub ready_delay: Option<u64>,
    /// Output pattern that signals readiness — TODO confirm exact matching semantics.
    pub ready_output: Option<String>,
    /// HTTP endpoint polled for readiness — presumably a URL; stored verbatim.
    pub ready_http: Option<String>,
    /// Port checked for readiness.
    pub ready_port: Option<u16>,
    /// Command run to determine readiness.
    pub ready_cmd: Option<String>,
    /// Expected ports from configuration (before auto-bump resolution)
    pub expected_port: Vec<u16>,
    /// Resolved ports actually used after auto-bump (may differ from expected)
    pub resolved_port: Vec<u16>,
    /// Whether to auto-bump ports (defaults to `false` for a new daemon).
    pub auto_bump_port: Option<bool>,
    /// Max port-bump attempts (defaults from `settings().default_port_bump_attempts()`).
    pub port_bump_attempts: Option<u32>,
    /// Other daemons this one depends on.
    pub depends: Option<Vec<DaemonId>>,
    /// Environment variables for the daemon process (insertion-ordered).
    pub env: Option<IndexMap<String, String>>,
    /// Watch patterns — presumably file globs; verify against the watcher.
    pub watch: Option<Vec<String>>,
    /// Base directory the watch patterns are resolved against.
    pub watch_base_dir: Option<PathBuf>,
    /// Whether to run under mise (defaults from `settings().general.mise`).
    pub mise: Option<bool>,
    /// Memory limit for the daemon process
    pub memory_limit: Option<MemoryLimit>,
    /// CPU usage limit as a percentage
    pub cpu_limit: Option<CpuLimit>,
}
58
/// Builder for UpsertDaemonOpts - ensures daemon ID is always provided.
///
/// # Example
/// ```ignore
/// let opts = UpsertDaemonOpts::builder(daemon_id)
///     .set(|o| {
///         o.pid = Some(pid);
///         o.status = DaemonStatus::Running;
///     })
///     .build();
/// ```
#[derive(Debug)]
pub(crate) struct UpsertDaemonOptsBuilder {
    /// The options under construction; fields are mutated via `set` and handed out by `build`.
    pub opts: UpsertDaemonOpts,
}
74
75impl UpsertDaemonOpts {
76    /// Create a builder with the required daemon ID.
77    pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
78        UpsertDaemonOptsBuilder {
79            opts: UpsertDaemonOpts {
80                id,
81                pid: None,
82                status: DaemonStatus::default(),
83                shell_pid: None,
84                dir: None,
85                cmd: None,
86                autostop: false,
87                cron_schedule: None,
88                cron_retrigger: None,
89                last_exit_success: None,
90                retry: None,
91                retry_count: None,
92                ready_delay: None,
93                ready_output: None,
94                ready_http: None,
95                ready_port: None,
96                ready_cmd: None,
97                expected_port: Vec::new(),
98                resolved_port: Vec::new(),
99                auto_bump_port: None,
100                port_bump_attempts: None,
101                depends: None,
102                env: None,
103                watch: None,
104                watch_base_dir: None,
105                mise: None,
106                memory_limit: None,
107                cpu_limit: None,
108            },
109        }
110    }
111}
112
113impl UpsertDaemonOptsBuilder {
114    /// Modify opts fields with a closure.
115    pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
116        f(&mut self.opts);
117        self
118    }
119
120    /// Build the UpsertDaemonOpts.
121    pub fn build(self) -> UpsertDaemonOpts {
122        self.opts
123    }
124}
125
126impl Supervisor {
127    /// Upsert a daemon's state, merging with existing values
128    pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
129        info!(
130            "upserting daemon: {} pid: {} status: {}",
131            opts.id,
132            opts.pid.unwrap_or(0),
133            opts.status
134        );
135        let mut state_file = self.state_file.lock().await;
136        let existing = state_file.daemons.get(&opts.id);
137        let daemon = Daemon {
138            id: opts.id.clone(),
139            title: opts.pid.and_then(|pid| PROCS.title(pid)),
140            pid: opts.pid,
141            status: opts.status,
142            shell_pid: opts.shell_pid,
143            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
144            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
145            cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
146            cron_schedule: opts
147                .cron_schedule
148                .or(existing.and_then(|d| d.cron_schedule.clone())),
149            cron_retrigger: opts
150                .cron_retrigger
151                .or(existing.and_then(|d| d.cron_retrigger)),
152            last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
153            last_exit_success: opts
154                .last_exit_success
155                .or(existing.and_then(|d| d.last_exit_success)),
156            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
157            retry_count: opts
158                .retry_count
159                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
160            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
161            ready_output: opts
162                .ready_output
163                .or(existing.and_then(|d| d.ready_output.clone())),
164            ready_http: opts
165                .ready_http
166                .or(existing.and_then(|d| d.ready_http.clone())),
167            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
168            ready_cmd: opts
169                .ready_cmd
170                .or(existing.and_then(|d| d.ready_cmd.clone())),
171            expected_port: if opts.expected_port.is_empty() {
172                existing
173                    .map(|d| d.expected_port.clone())
174                    .unwrap_or_default()
175            } else {
176                opts.expected_port
177            },
178            resolved_port: if opts.resolved_port.is_empty() {
179                existing
180                    .map(|d| d.resolved_port.clone())
181                    .unwrap_or_default()
182            } else {
183                opts.resolved_port
184            },
185            auto_bump_port: opts
186                .auto_bump_port
187                .unwrap_or(existing.map(|d| d.auto_bump_port).unwrap_or(false)),
188            port_bump_attempts: opts.port_bump_attempts.unwrap_or(
189                existing
190                    .map(|d| d.port_bump_attempts)
191                    .unwrap_or_else(|| settings().default_port_bump_attempts()),
192            ),
193            depends: opts
194                .depends
195                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
196            env: opts.env.or(existing.and_then(|d| d.env.clone())),
197            watch: opts
198                .watch
199                .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
200            watch_base_dir: opts
201                .watch_base_dir
202                .or(existing.and_then(|d| d.watch_base_dir.clone())),
203            mise: opts
204                .mise
205                .unwrap_or(existing.map(|d| d.mise).unwrap_or(settings().general.mise)),
206            memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
207            cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
208        };
209        state_file.daemons.insert(opts.id.clone(), daemon.clone());
210        if let Err(err) = state_file.write() {
211            warn!("failed to update state file: {err:#}");
212        }
213        Ok(daemon)
214    }
215
216    /// Enable a daemon (remove from disabled set)
217    pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
218        info!("enabling daemon: {id}");
219        let config = PitchforkToml::all_merged()?;
220        let mut state_file = self.state_file.lock().await;
221        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
222        if !exists {
223            return Err(miette::miette!("daemon '{}' not found", id));
224        }
225        let result = state_file.disabled.remove(id);
226        state_file.write()?;
227        Ok(result)
228    }
229
230    /// Disable a daemon (add to disabled set)
231    pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
232        info!("disabling daemon: {id}");
233        let config = PitchforkToml::all_merged()?;
234        let mut state_file = self.state_file.lock().await;
235        let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
236        if !exists {
237            return Err(miette::miette!("daemon '{}' not found", id));
238        }
239        let result = state_file.disabled.insert(id.clone());
240        state_file.write()?;
241        Ok(result)
242    }
243
244    /// Get a daemon by ID
245    pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
246        self.state_file.lock().await.daemons.get(id).cloned()
247    }
248
249    /// Get all active daemons (those with PIDs, excluding pitchfork itself)
250    pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
251        let pitchfork_id = DaemonId::pitchfork();
252        self.state_file
253            .lock()
254            .await
255            .daemons
256            .values()
257            .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
258            .cloned()
259            .collect()
260    }
261
262    /// Remove a daemon from state
263    pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
264        let mut state_file = self.state_file.lock().await;
265        state_file.daemons.remove(id);
266        if let Err(err) = state_file.write() {
267            warn!("failed to update state file: {err:#}");
268        }
269        Ok(())
270    }
271
272    /// Set the shell's working directory
273    pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
274        let mut state_file = self.state_file.lock().await;
275        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
276        state_file.write()?;
277        Ok(())
278    }
279
280    /// Get the shell's working directory
281    pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
282        self.state_file
283            .lock()
284            .await
285            .shell_dirs
286            .get(&shell_pid.to_string())
287            .cloned()
288    }
289
290    /// Remove a shell PID from tracking
291    pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
292        let mut state_file = self.state_file.lock().await;
293        if state_file
294            .shell_dirs
295            .remove(&shell_pid.to_string())
296            .is_some()
297        {
298            state_file.write()?;
299        }
300        Ok(())
301    }
302
303    /// Get all directories with their associated shell PIDs
304    pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
305        self.state_file.lock().await.shell_dirs.iter().fold(
306            HashMap::new(),
307            |mut acc, (pid, dir)| {
308                if let Ok(pid) = pid.parse() {
309                    acc.entry(dir.clone()).or_default().push(pid);
310                }
311                acc
312            },
313        )
314    }
315
316    /// Get pending notifications and clear the queue
317    pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
318        self.pending_notifications.lock().await.drain(..).collect()
319    }
320
321    /// Clean up daemons that have no PID
322    pub(crate) async fn clean(&self) -> Result<()> {
323        let mut state_file = self.state_file.lock().await;
324        state_file.daemons.retain(|_id, d| d.pid.is_some());
325        state_file.write()?;
326        Ok(())
327    }
328}