pitchfork_cli/supervisor/
state.rs1use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_status::DaemonStatus;
9use crate::pitchfork_toml::CronRetrigger;
10use crate::procs::PROCS;
11use std::collections::HashMap;
12use std::path::PathBuf;
13
14#[derive(Debug)]
16pub(crate) struct UpsertDaemonOpts {
17 pub id: String,
18 pub pid: Option<u32>,
19 pub status: DaemonStatus,
20 pub shell_pid: Option<u32>,
21 pub dir: Option<PathBuf>,
22 pub cmd: Option<Vec<String>>,
23 pub autostop: bool,
24 pub cron_schedule: Option<String>,
25 pub cron_retrigger: Option<CronRetrigger>,
26 pub last_exit_success: Option<bool>,
27 pub retry: Option<u32>,
28 pub retry_count: Option<u32>,
29 pub ready_delay: Option<u64>,
30 pub ready_output: Option<String>,
31 pub ready_http: Option<String>,
32 pub ready_port: Option<u16>,
33 pub ready_cmd: Option<String>,
34 pub depends: Option<Vec<String>>,
35}
36
37impl Default for UpsertDaemonOpts {
38 fn default() -> Self {
39 Self {
40 id: "".to_string(),
41 pid: None,
42 status: DaemonStatus::Stopped,
43 shell_pid: None,
44 dir: None,
45 cmd: None,
46 autostop: false,
47 cron_schedule: None,
48 cron_retrigger: None,
49 last_exit_success: None,
50 retry: None,
51 retry_count: None,
52 ready_delay: None,
53 ready_output: None,
54 ready_http: None,
55 ready_port: None,
56 ready_cmd: None,
57 depends: None,
58 }
59 }
60}
61
62impl Supervisor {
63 pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
65 info!(
66 "upserting daemon: {} pid: {} status: {}",
67 opts.id,
68 opts.pid.unwrap_or(0),
69 opts.status
70 );
71 let mut state_file = self.state_file.lock().await;
72 let existing = state_file.daemons.get(&opts.id);
73 let daemon = Daemon {
74 id: opts.id.to_string(),
75 title: opts.pid.and_then(|pid| PROCS.title(pid)),
76 pid: opts.pid,
77 status: opts.status,
78 shell_pid: opts.shell_pid,
79 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
80 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
81 cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
82 cron_schedule: opts
83 .cron_schedule
84 .or(existing.and_then(|d| d.cron_schedule.clone())),
85 cron_retrigger: opts
86 .cron_retrigger
87 .or(existing.and_then(|d| d.cron_retrigger)),
88 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
89 last_exit_success: opts
90 .last_exit_success
91 .or(existing.and_then(|d| d.last_exit_success)),
92 retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
93 retry_count: opts
94 .retry_count
95 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
96 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
97 ready_output: opts
98 .ready_output
99 .or(existing.and_then(|d| d.ready_output.clone())),
100 ready_http: opts
101 .ready_http
102 .or(existing.and_then(|d| d.ready_http.clone())),
103 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
104 ready_cmd: opts
105 .ready_cmd
106 .or(existing.and_then(|d| d.ready_cmd.clone())),
107 depends: opts
108 .depends
109 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
110 };
111 state_file
112 .daemons
113 .insert(opts.id.to_string(), daemon.clone());
114 if let Err(err) = state_file.write() {
115 warn!("failed to update state file: {err:#}");
116 }
117 Ok(daemon)
118 }
119
120 pub async fn enable(&self, id: String) -> Result<bool> {
122 info!("enabling daemon: {id}");
123 let mut state_file = self.state_file.lock().await;
124 let result = state_file.disabled.remove(&id);
125 state_file.write()?;
126 Ok(result)
127 }
128
129 pub async fn disable(&self, id: String) -> Result<bool> {
131 info!("disabling daemon: {id}");
132 let mut state_file = self.state_file.lock().await;
133 let result = state_file.disabled.insert(id);
134 state_file.write()?;
135 Ok(result)
136 }
137
138 pub(crate) async fn get_daemon(&self, id: &str) -> Option<Daemon> {
140 self.state_file.lock().await.daemons.get(id).cloned()
141 }
142
143 pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
145 self.state_file
146 .lock()
147 .await
148 .daemons
149 .values()
150 .filter(|d| d.pid.is_some() && d.id != "pitchfork")
151 .cloned()
152 .collect()
153 }
154
155 pub(crate) async fn remove_daemon(&self, id: &str) -> Result<()> {
157 self.state_file.lock().await.daemons.remove(id);
158 if let Err(err) = self.state_file.lock().await.write() {
159 warn!("failed to update state file: {err:#}");
160 }
161 Ok(())
162 }
163
164 pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
166 let mut state_file = self.state_file.lock().await;
167 state_file.shell_dirs.insert(shell_pid.to_string(), dir);
168 state_file.write()?;
169 Ok(())
170 }
171
172 pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
174 self.state_file
175 .lock()
176 .await
177 .shell_dirs
178 .get(&shell_pid.to_string())
179 .cloned()
180 }
181
182 pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
184 let mut state_file = self.state_file.lock().await;
185 if state_file
186 .shell_dirs
187 .remove(&shell_pid.to_string())
188 .is_some()
189 {
190 state_file.write()?;
191 }
192 Ok(())
193 }
194
195 pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
197 self.state_file.lock().await.shell_dirs.iter().fold(
198 HashMap::new(),
199 |mut acc, (pid, dir)| {
200 if let Ok(pid) = pid.parse() {
201 acc.entry(dir.clone()).or_default().push(pid);
202 }
203 acc
204 },
205 )
206 }
207
208 pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
210 self.pending_notifications.lock().await.drain(..).collect()
211 }
212
213 pub(crate) async fn clean(&self) -> Result<()> {
215 let mut state_file = self.state_file.lock().await;
216 state_file.daemons.retain(|_id, d| d.pid.is_some());
217 state_file.write()?;
218 Ok(())
219 }
220}