pitchfork_cli/supervisor/
state.rs1use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CronRetrigger;
11use crate::pitchfork_toml::PitchforkToml;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use indexmap::IndexMap;
15use std::collections::HashMap;
16use std::path::PathBuf;
17
18#[derive(Debug)]
22pub(crate) struct UpsertDaemonOpts {
23 pub id: DaemonId,
24 pub pid: Option<u32>,
25 pub status: DaemonStatus,
26 pub shell_pid: Option<u32>,
27 pub dir: Option<PathBuf>,
28 pub cmd: Option<Vec<String>>,
29 pub autostop: bool,
30 pub cron_schedule: Option<String>,
31 pub cron_retrigger: Option<CronRetrigger>,
32 pub last_exit_success: Option<bool>,
33 pub retry: Option<u32>,
34 pub retry_count: Option<u32>,
35 pub ready_delay: Option<u64>,
36 pub ready_output: Option<String>,
37 pub ready_http: Option<String>,
38 pub ready_port: Option<u16>,
39 pub ready_cmd: Option<String>,
40 pub expected_port: Vec<u16>,
42 pub resolved_port: Vec<u16>,
44 pub auto_bump_port: Option<bool>,
45 pub port_bump_attempts: Option<u32>,
46 pub depends: Option<Vec<DaemonId>>,
47 pub env: Option<IndexMap<String, String>>,
48 pub watch: Option<Vec<String>>,
49 pub watch_base_dir: Option<PathBuf>,
50 pub mise: Option<bool>,
51}
52
53#[derive(Debug)]
65pub(crate) struct UpsertDaemonOptsBuilder {
66 pub opts: UpsertDaemonOpts,
67}
68
69impl UpsertDaemonOpts {
70 pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
72 UpsertDaemonOptsBuilder {
73 opts: UpsertDaemonOpts {
74 id,
75 pid: None,
76 status: DaemonStatus::default(),
77 shell_pid: None,
78 dir: None,
79 cmd: None,
80 autostop: false,
81 cron_schedule: None,
82 cron_retrigger: None,
83 last_exit_success: None,
84 retry: None,
85 retry_count: None,
86 ready_delay: None,
87 ready_output: None,
88 ready_http: None,
89 ready_port: None,
90 ready_cmd: None,
91 expected_port: Vec::new(),
92 resolved_port: Vec::new(),
93 auto_bump_port: None,
94 port_bump_attempts: None,
95 depends: None,
96 env: None,
97 watch: None,
98 watch_base_dir: None,
99 mise: None,
100 },
101 }
102 }
103}
104
105impl UpsertDaemonOptsBuilder {
106 pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
108 f(&mut self.opts);
109 self
110 }
111
112 pub fn build(self) -> UpsertDaemonOpts {
114 self.opts
115 }
116}
117
118impl Supervisor {
119 pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
121 info!(
122 "upserting daemon: {} pid: {} status: {}",
123 opts.id,
124 opts.pid.unwrap_or(0),
125 opts.status
126 );
127 let mut state_file = self.state_file.lock().await;
128 let existing = state_file.daemons.get(&opts.id);
129 let daemon = Daemon {
130 id: opts.id.clone(),
131 title: opts.pid.and_then(|pid| PROCS.title(pid)),
132 pid: opts.pid,
133 status: opts.status,
134 shell_pid: opts.shell_pid,
135 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
136 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
137 cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
138 cron_schedule: opts
139 .cron_schedule
140 .or(existing.and_then(|d| d.cron_schedule.clone())),
141 cron_retrigger: opts
142 .cron_retrigger
143 .or(existing.and_then(|d| d.cron_retrigger)),
144 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
145 last_exit_success: opts
146 .last_exit_success
147 .or(existing.and_then(|d| d.last_exit_success)),
148 retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
149 retry_count: opts
150 .retry_count
151 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
152 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
153 ready_output: opts
154 .ready_output
155 .or(existing.and_then(|d| d.ready_output.clone())),
156 ready_http: opts
157 .ready_http
158 .or(existing.and_then(|d| d.ready_http.clone())),
159 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
160 ready_cmd: opts
161 .ready_cmd
162 .or(existing.and_then(|d| d.ready_cmd.clone())),
163 expected_port: if opts.expected_port.is_empty() {
164 existing
165 .map(|d| d.expected_port.clone())
166 .unwrap_or_default()
167 } else {
168 opts.expected_port
169 },
170 resolved_port: if opts.resolved_port.is_empty() {
171 existing
172 .map(|d| d.resolved_port.clone())
173 .unwrap_or_default()
174 } else {
175 opts.resolved_port
176 },
177 auto_bump_port: opts
178 .auto_bump_port
179 .unwrap_or(existing.map(|d| d.auto_bump_port).unwrap_or(false)),
180 port_bump_attempts: opts.port_bump_attempts.unwrap_or(
181 existing
182 .map(|d| d.port_bump_attempts)
183 .unwrap_or_else(|| settings().default_port_bump_attempts()),
184 ),
185 depends: opts
186 .depends
187 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
188 env: opts.env.or(existing.and_then(|d| d.env.clone())),
189 watch: opts
190 .watch
191 .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
192 watch_base_dir: opts
193 .watch_base_dir
194 .or(existing.and_then(|d| d.watch_base_dir.clone())),
195 mise: opts
196 .mise
197 .unwrap_or(existing.map(|d| d.mise).unwrap_or(settings().general.mise)),
198 };
199 state_file.daemons.insert(opts.id.clone(), daemon.clone());
200 if let Err(err) = state_file.write() {
201 warn!("failed to update state file: {err:#}");
202 }
203 Ok(daemon)
204 }
205
206 pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
208 info!("enabling daemon: {id}");
209 let config = PitchforkToml::all_merged()?;
210 let mut state_file = self.state_file.lock().await;
211 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
212 if !exists {
213 return Err(miette::miette!("daemon '{}' not found", id));
214 }
215 let result = state_file.disabled.remove(id);
216 state_file.write()?;
217 Ok(result)
218 }
219
220 pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
222 info!("disabling daemon: {id}");
223 let config = PitchforkToml::all_merged()?;
224 let mut state_file = self.state_file.lock().await;
225 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
226 if !exists {
227 return Err(miette::miette!("daemon '{}' not found", id));
228 }
229 let result = state_file.disabled.insert(id.clone());
230 state_file.write()?;
231 Ok(result)
232 }
233
234 pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
236 self.state_file.lock().await.daemons.get(id).cloned()
237 }
238
239 pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
241 let pitchfork_id = DaemonId::pitchfork();
242 self.state_file
243 .lock()
244 .await
245 .daemons
246 .values()
247 .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
248 .cloned()
249 .collect()
250 }
251
252 pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
254 let mut state_file = self.state_file.lock().await;
255 state_file.daemons.remove(id);
256 if let Err(err) = state_file.write() {
257 warn!("failed to update state file: {err:#}");
258 }
259 Ok(())
260 }
261
262 pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
264 let mut state_file = self.state_file.lock().await;
265 state_file.shell_dirs.insert(shell_pid.to_string(), dir);
266 state_file.write()?;
267 Ok(())
268 }
269
270 pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
272 self.state_file
273 .lock()
274 .await
275 .shell_dirs
276 .get(&shell_pid.to_string())
277 .cloned()
278 }
279
280 pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
282 let mut state_file = self.state_file.lock().await;
283 if state_file
284 .shell_dirs
285 .remove(&shell_pid.to_string())
286 .is_some()
287 {
288 state_file.write()?;
289 }
290 Ok(())
291 }
292
293 pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
295 self.state_file.lock().await.shell_dirs.iter().fold(
296 HashMap::new(),
297 |mut acc, (pid, dir)| {
298 if let Ok(pid) = pid.parse() {
299 acc.entry(dir.clone()).or_default().push(pid);
300 }
301 acc
302 },
303 )
304 }
305
306 pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
308 self.pending_notifications.lock().await.drain(..).collect()
309 }
310
311 pub(crate) async fn clean(&self) -> Result<()> {
313 let mut state_file = self.state_file.lock().await;
314 state_file.daemons.retain(|_id, d| d.pid.is_some());
315 state_file.write()?;
316 Ok(())
317 }
318}