pitchfork_cli/supervisor/
state.rs

1//! State access layer for the supervisor
2//!
3//! All state getter/setter operations for daemons, shell directories, and notifications.
4
5use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_status::DaemonStatus;
9use crate::pitchfork_toml::CronRetrigger;
10use crate::procs::PROCS;
11use std::collections::HashMap;
12use std::path::PathBuf;
13
14/// Options for upserting a daemon's state
15#[derive(Debug)]
16pub(crate) struct UpsertDaemonOpts {
17    pub id: String,
18    pub pid: Option<u32>,
19    pub status: DaemonStatus,
20    pub shell_pid: Option<u32>,
21    pub dir: Option<PathBuf>,
22    pub autostop: bool,
23    pub cron_schedule: Option<String>,
24    pub cron_retrigger: Option<CronRetrigger>,
25    pub last_exit_success: Option<bool>,
26    pub retry: Option<u32>,
27    pub retry_count: Option<u32>,
28    pub ready_delay: Option<u64>,
29    pub ready_output: Option<String>,
30    pub ready_http: Option<String>,
31    pub ready_port: Option<u16>,
32    pub depends: Option<Vec<String>>,
33}
34
35impl Default for UpsertDaemonOpts {
36    fn default() -> Self {
37        Self {
38            id: "".to_string(),
39            pid: None,
40            status: DaemonStatus::Stopped,
41            shell_pid: None,
42            dir: None,
43            autostop: false,
44            cron_schedule: None,
45            cron_retrigger: None,
46            last_exit_success: None,
47            retry: None,
48            retry_count: None,
49            ready_delay: None,
50            ready_output: None,
51            ready_http: None,
52            ready_port: None,
53            depends: None,
54        }
55    }
56}
57
58impl Supervisor {
59    /// Upsert a daemon's state, merging with existing values
60    pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
61        info!(
62            "upserting daemon: {} pid: {} status: {}",
63            opts.id,
64            opts.pid.unwrap_or(0),
65            opts.status
66        );
67        let mut state_file = self.state_file.lock().await;
68        let existing = state_file.daemons.get(&opts.id);
69        let daemon = Daemon {
70            id: opts.id.to_string(),
71            title: opts.pid.and_then(|pid| PROCS.title(pid)),
72            pid: opts.pid,
73            status: opts.status,
74            shell_pid: opts.shell_pid,
75            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
76            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
77            cron_schedule: opts
78                .cron_schedule
79                .or(existing.and_then(|d| d.cron_schedule.clone())),
80            cron_retrigger: opts
81                .cron_retrigger
82                .or(existing.and_then(|d| d.cron_retrigger)),
83            last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
84            last_exit_success: opts
85                .last_exit_success
86                .or(existing.and_then(|d| d.last_exit_success)),
87            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
88            retry_count: opts
89                .retry_count
90                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
91            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
92            ready_output: opts
93                .ready_output
94                .or(existing.and_then(|d| d.ready_output.clone())),
95            ready_http: opts
96                .ready_http
97                .or(existing.and_then(|d| d.ready_http.clone())),
98            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
99            depends: opts
100                .depends
101                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
102        };
103        state_file
104            .daemons
105            .insert(opts.id.to_string(), daemon.clone());
106        if let Err(err) = state_file.write() {
107            warn!("failed to update state file: {err:#}");
108        }
109        Ok(daemon)
110    }
111
112    /// Enable a daemon (remove from disabled set)
113    pub async fn enable(&self, id: String) -> Result<bool> {
114        info!("enabling daemon: {id}");
115        let mut state_file = self.state_file.lock().await;
116        let result = state_file.disabled.remove(&id);
117        state_file.write()?;
118        Ok(result)
119    }
120
121    /// Disable a daemon (add to disabled set)
122    pub async fn disable(&self, id: String) -> Result<bool> {
123        info!("disabling daemon: {id}");
124        let mut state_file = self.state_file.lock().await;
125        let result = state_file.disabled.insert(id);
126        state_file.write()?;
127        Ok(result)
128    }
129
130    /// Get a daemon by ID
131    pub(crate) async fn get_daemon(&self, id: &str) -> Option<Daemon> {
132        self.state_file.lock().await.daemons.get(id).cloned()
133    }
134
135    /// Get all active daemons (those with PIDs, excluding pitchfork itself)
136    pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
137        self.state_file
138            .lock()
139            .await
140            .daemons
141            .values()
142            .filter(|d| d.pid.is_some() && d.id != "pitchfork")
143            .cloned()
144            .collect()
145    }
146
147    /// Remove a daemon from state
148    pub(crate) async fn remove_daemon(&self, id: &str) -> Result<()> {
149        self.state_file.lock().await.daemons.remove(id);
150        if let Err(err) = self.state_file.lock().await.write() {
151            warn!("failed to update state file: {err:#}");
152        }
153        Ok(())
154    }
155
156    /// Set the shell's working directory
157    pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
158        let mut state_file = self.state_file.lock().await;
159        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
160        state_file.write()?;
161        Ok(())
162    }
163
164    /// Get the shell's working directory
165    pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
166        self.state_file
167            .lock()
168            .await
169            .shell_dirs
170            .get(&shell_pid.to_string())
171            .cloned()
172    }
173
174    /// Remove a shell PID from tracking
175    pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
176        let mut state_file = self.state_file.lock().await;
177        if state_file
178            .shell_dirs
179            .remove(&shell_pid.to_string())
180            .is_some()
181        {
182            state_file.write()?;
183        }
184        Ok(())
185    }
186
187    /// Get all directories with their associated shell PIDs
188    pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
189        self.state_file.lock().await.shell_dirs.iter().fold(
190            HashMap::new(),
191            |mut acc, (pid, dir)| {
192                if let Ok(pid) = pid.parse() {
193                    acc.entry(dir.clone()).or_default().push(pid);
194                }
195                acc
196            },
197        )
198    }
199
200    /// Get pending notifications and clear the queue
201    pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
202        self.pending_notifications.lock().await.drain(..).collect()
203    }
204
205    /// Clean up daemons that have no PID
206    pub(crate) async fn clean(&self) -> Result<()> {
207        let mut state_file = self.state_file.lock().await;
208        state_file.daemons.retain(|_id, d| d.pid.is_some());
209        state_file.write()?;
210        Ok(())
211    }
212}