Skip to main content

pitchfork_cli/supervisor/
state.rs

1//! State access layer for the supervisor
2//!
3//! All state getter/setter operations for daemons, shell directories, and notifications.
4
5use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_status::DaemonStatus;
9use crate::pitchfork_toml::CronRetrigger;
10use crate::procs::PROCS;
11use indexmap::IndexMap;
12use std::collections::HashMap;
13use std::path::PathBuf;
14
15/// Options for upserting a daemon's state
16#[derive(Debug)]
17pub(crate) struct UpsertDaemonOpts {
18    pub id: String,
19    pub pid: Option<u32>,
20    pub status: DaemonStatus,
21    pub shell_pid: Option<u32>,
22    pub dir: Option<PathBuf>,
23    pub cmd: Option<Vec<String>>,
24    pub autostop: bool,
25    pub cron_schedule: Option<String>,
26    pub cron_retrigger: Option<CronRetrigger>,
27    pub last_exit_success: Option<bool>,
28    pub retry: Option<u32>,
29    pub retry_count: Option<u32>,
30    pub ready_delay: Option<u64>,
31    pub ready_output: Option<String>,
32    pub ready_http: Option<String>,
33    pub ready_port: Option<u16>,
34    pub ready_cmd: Option<String>,
35    pub depends: Option<Vec<String>>,
36    pub env: Option<IndexMap<String, String>>,
37}
38
39impl Default for UpsertDaemonOpts {
40    fn default() -> Self {
41        Self {
42            id: "".to_string(),
43            pid: None,
44            status: DaemonStatus::Stopped,
45            shell_pid: None,
46            dir: None,
47            cmd: None,
48            autostop: false,
49            cron_schedule: None,
50            cron_retrigger: None,
51            last_exit_success: None,
52            retry: None,
53            retry_count: None,
54            ready_delay: None,
55            ready_output: None,
56            ready_http: None,
57            ready_port: None,
58            ready_cmd: None,
59            depends: None,
60            env: None,
61        }
62    }
63}
64
65impl Supervisor {
66    /// Upsert a daemon's state, merging with existing values
67    pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
68        info!(
69            "upserting daemon: {} pid: {} status: {}",
70            opts.id,
71            opts.pid.unwrap_or(0),
72            opts.status
73        );
74        let mut state_file = self.state_file.lock().await;
75        let existing = state_file.daemons.get(&opts.id);
76        let daemon = Daemon {
77            id: opts.id.to_string(),
78            title: opts.pid.and_then(|pid| PROCS.title(pid)),
79            pid: opts.pid,
80            status: opts.status,
81            shell_pid: opts.shell_pid,
82            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
83            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
84            cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
85            cron_schedule: opts
86                .cron_schedule
87                .or(existing.and_then(|d| d.cron_schedule.clone())),
88            cron_retrigger: opts
89                .cron_retrigger
90                .or(existing.and_then(|d| d.cron_retrigger)),
91            last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
92            last_exit_success: opts
93                .last_exit_success
94                .or(existing.and_then(|d| d.last_exit_success)),
95            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
96            retry_count: opts
97                .retry_count
98                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
99            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
100            ready_output: opts
101                .ready_output
102                .or(existing.and_then(|d| d.ready_output.clone())),
103            ready_http: opts
104                .ready_http
105                .or(existing.and_then(|d| d.ready_http.clone())),
106            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
107            ready_cmd: opts
108                .ready_cmd
109                .or(existing.and_then(|d| d.ready_cmd.clone())),
110            depends: opts
111                .depends
112                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
113            env: opts.env.or(existing.and_then(|d| d.env.clone())),
114        };
115        state_file
116            .daemons
117            .insert(opts.id.to_string(), daemon.clone());
118        if let Err(err) = state_file.write() {
119            warn!("failed to update state file: {err:#}");
120        }
121        Ok(daemon)
122    }
123
124    /// Enable a daemon (remove from disabled set)
125    pub async fn enable(&self, id: String) -> Result<bool> {
126        info!("enabling daemon: {id}");
127        let mut state_file = self.state_file.lock().await;
128        let result = state_file.disabled.remove(&id);
129        state_file.write()?;
130        Ok(result)
131    }
132
133    /// Disable a daemon (add to disabled set)
134    pub async fn disable(&self, id: String) -> Result<bool> {
135        info!("disabling daemon: {id}");
136        let mut state_file = self.state_file.lock().await;
137        let result = state_file.disabled.insert(id);
138        state_file.write()?;
139        Ok(result)
140    }
141
142    /// Get a daemon by ID
143    pub(crate) async fn get_daemon(&self, id: &str) -> Option<Daemon> {
144        self.state_file.lock().await.daemons.get(id).cloned()
145    }
146
147    /// Get all active daemons (those with PIDs, excluding pitchfork itself)
148    pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
149        self.state_file
150            .lock()
151            .await
152            .daemons
153            .values()
154            .filter(|d| d.pid.is_some() && d.id != "pitchfork")
155            .cloned()
156            .collect()
157    }
158
159    /// Remove a daemon from state
160    pub(crate) async fn remove_daemon(&self, id: &str) -> Result<()> {
161        self.state_file.lock().await.daemons.remove(id);
162        if let Err(err) = self.state_file.lock().await.write() {
163            warn!("failed to update state file: {err:#}");
164        }
165        Ok(())
166    }
167
168    /// Set the shell's working directory
169    pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
170        let mut state_file = self.state_file.lock().await;
171        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
172        state_file.write()?;
173        Ok(())
174    }
175
176    /// Get the shell's working directory
177    pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
178        self.state_file
179            .lock()
180            .await
181            .shell_dirs
182            .get(&shell_pid.to_string())
183            .cloned()
184    }
185
186    /// Remove a shell PID from tracking
187    pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
188        let mut state_file = self.state_file.lock().await;
189        if state_file
190            .shell_dirs
191            .remove(&shell_pid.to_string())
192            .is_some()
193        {
194            state_file.write()?;
195        }
196        Ok(())
197    }
198
199    /// Get all directories with their associated shell PIDs
200    pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
201        self.state_file.lock().await.shell_dirs.iter().fold(
202            HashMap::new(),
203            |mut acc, (pid, dir)| {
204                if let Ok(pid) = pid.parse() {
205                    acc.entry(dir.clone()).or_default().push(pid);
206                }
207                acc
208            },
209        )
210    }
211
212    /// Get pending notifications and clear the queue
213    pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
214        self.pending_notifications.lock().await.drain(..).collect()
215    }
216
217    /// Clean up daemons that have no PID
218    pub(crate) async fn clean(&self) -> Result<()> {
219        let mut state_file = self.state_file.lock().await;
220        state_file.daemons.retain(|_id, d| d.pid.is_some());
221        state_file.write()?;
222        Ok(())
223    }
224}