Skip to main content

pitchfork_cli/supervisor/
state.rs

1//! State access layer for the supervisor
2//!
3//! All state getter/setter operations for daemons, shell directories, and notifications.
4
5use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_status::DaemonStatus;
9use crate::pitchfork_toml::CronRetrigger;
10use crate::procs::PROCS;
11use std::collections::HashMap;
12use std::path::PathBuf;
13
14/// Options for upserting a daemon's state
15#[derive(Debug)]
16pub(crate) struct UpsertDaemonOpts {
17    pub id: String,
18    pub pid: Option<u32>,
19    pub status: DaemonStatus,
20    pub shell_pid: Option<u32>,
21    pub dir: Option<PathBuf>,
22    pub cmd: Option<Vec<String>>,
23    pub autostop: bool,
24    pub cron_schedule: Option<String>,
25    pub cron_retrigger: Option<CronRetrigger>,
26    pub last_exit_success: Option<bool>,
27    pub retry: Option<u32>,
28    pub retry_count: Option<u32>,
29    pub ready_delay: Option<u64>,
30    pub ready_output: Option<String>,
31    pub ready_http: Option<String>,
32    pub ready_port: Option<u16>,
33    pub ready_cmd: Option<String>,
34    pub depends: Option<Vec<String>>,
35}
36
37impl Default for UpsertDaemonOpts {
38    fn default() -> Self {
39        Self {
40            id: "".to_string(),
41            pid: None,
42            status: DaemonStatus::Stopped,
43            shell_pid: None,
44            dir: None,
45            cmd: None,
46            autostop: false,
47            cron_schedule: None,
48            cron_retrigger: None,
49            last_exit_success: None,
50            retry: None,
51            retry_count: None,
52            ready_delay: None,
53            ready_output: None,
54            ready_http: None,
55            ready_port: None,
56            ready_cmd: None,
57            depends: None,
58        }
59    }
60}
61
62impl Supervisor {
63    /// Upsert a daemon's state, merging with existing values
64    pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
65        info!(
66            "upserting daemon: {} pid: {} status: {}",
67            opts.id,
68            opts.pid.unwrap_or(0),
69            opts.status
70        );
71        let mut state_file = self.state_file.lock().await;
72        let existing = state_file.daemons.get(&opts.id);
73        let daemon = Daemon {
74            id: opts.id.to_string(),
75            title: opts.pid.and_then(|pid| PROCS.title(pid)),
76            pid: opts.pid,
77            status: opts.status,
78            shell_pid: opts.shell_pid,
79            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
80            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
81            cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
82            cron_schedule: opts
83                .cron_schedule
84                .or(existing.and_then(|d| d.cron_schedule.clone())),
85            cron_retrigger: opts
86                .cron_retrigger
87                .or(existing.and_then(|d| d.cron_retrigger)),
88            last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
89            last_exit_success: opts
90                .last_exit_success
91                .or(existing.and_then(|d| d.last_exit_success)),
92            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
93            retry_count: opts
94                .retry_count
95                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
96            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
97            ready_output: opts
98                .ready_output
99                .or(existing.and_then(|d| d.ready_output.clone())),
100            ready_http: opts
101                .ready_http
102                .or(existing.and_then(|d| d.ready_http.clone())),
103            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
104            ready_cmd: opts
105                .ready_cmd
106                .or(existing.and_then(|d| d.ready_cmd.clone())),
107            depends: opts
108                .depends
109                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
110        };
111        state_file
112            .daemons
113            .insert(opts.id.to_string(), daemon.clone());
114        if let Err(err) = state_file.write() {
115            warn!("failed to update state file: {err:#}");
116        }
117        Ok(daemon)
118    }
119
120    /// Enable a daemon (remove from disabled set)
121    pub async fn enable(&self, id: String) -> Result<bool> {
122        info!("enabling daemon: {id}");
123        let mut state_file = self.state_file.lock().await;
124        let result = state_file.disabled.remove(&id);
125        state_file.write()?;
126        Ok(result)
127    }
128
129    /// Disable a daemon (add to disabled set)
130    pub async fn disable(&self, id: String) -> Result<bool> {
131        info!("disabling daemon: {id}");
132        let mut state_file = self.state_file.lock().await;
133        let result = state_file.disabled.insert(id);
134        state_file.write()?;
135        Ok(result)
136    }
137
138    /// Get a daemon by ID
139    pub(crate) async fn get_daemon(&self, id: &str) -> Option<Daemon> {
140        self.state_file.lock().await.daemons.get(id).cloned()
141    }
142
143    /// Get all active daemons (those with PIDs, excluding pitchfork itself)
144    pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
145        self.state_file
146            .lock()
147            .await
148            .daemons
149            .values()
150            .filter(|d| d.pid.is_some() && d.id != "pitchfork")
151            .cloned()
152            .collect()
153    }
154
155    /// Remove a daemon from state
156    pub(crate) async fn remove_daemon(&self, id: &str) -> Result<()> {
157        self.state_file.lock().await.daemons.remove(id);
158        if let Err(err) = self.state_file.lock().await.write() {
159            warn!("failed to update state file: {err:#}");
160        }
161        Ok(())
162    }
163
164    /// Set the shell's working directory
165    pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
166        let mut state_file = self.state_file.lock().await;
167        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
168        state_file.write()?;
169        Ok(())
170    }
171
172    /// Get the shell's working directory
173    pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
174        self.state_file
175            .lock()
176            .await
177            .shell_dirs
178            .get(&shell_pid.to_string())
179            .cloned()
180    }
181
182    /// Remove a shell PID from tracking
183    pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
184        let mut state_file = self.state_file.lock().await;
185        if state_file
186            .shell_dirs
187            .remove(&shell_pid.to_string())
188            .is_some()
189        {
190            state_file.write()?;
191        }
192        Ok(())
193    }
194
195    /// Get all directories with their associated shell PIDs
196    pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
197        self.state_file.lock().await.shell_dirs.iter().fold(
198            HashMap::new(),
199            |mut acc, (pid, dir)| {
200                if let Ok(pid) = pid.parse() {
201                    acc.entry(dir.clone()).or_default().push(pid);
202                }
203                acc
204            },
205        )
206    }
207
208    /// Get pending notifications and clear the queue
209    pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
210        self.pending_notifications.lock().await.drain(..).collect()
211    }
212
213    /// Clean up daemons that have no PID
214    pub(crate) async fn clean(&self) -> Result<()> {
215        let mut state_file = self.state_file.lock().await;
216        state_file.daemons.retain(|_id, d| d.pid.is_some());
217        state_file.write()?;
218        Ok(())
219    }
220}