pitchfork_cli/supervisor/
state.rs1use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_status::DaemonStatus;
9use crate::pitchfork_toml::CronRetrigger;
10use crate::procs::PROCS;
11use indexmap::IndexMap;
12use std::collections::HashMap;
13use std::path::PathBuf;
14
15#[derive(Debug)]
17pub(crate) struct UpsertDaemonOpts {
18 pub id: String,
19 pub pid: Option<u32>,
20 pub status: DaemonStatus,
21 pub shell_pid: Option<u32>,
22 pub dir: Option<PathBuf>,
23 pub cmd: Option<Vec<String>>,
24 pub autostop: bool,
25 pub cron_schedule: Option<String>,
26 pub cron_retrigger: Option<CronRetrigger>,
27 pub last_exit_success: Option<bool>,
28 pub retry: Option<u32>,
29 pub retry_count: Option<u32>,
30 pub ready_delay: Option<u64>,
31 pub ready_output: Option<String>,
32 pub ready_http: Option<String>,
33 pub ready_port: Option<u16>,
34 pub ready_cmd: Option<String>,
35 pub depends: Option<Vec<String>>,
36 pub env: Option<IndexMap<String, String>>,
37}
38
39impl Default for UpsertDaemonOpts {
40 fn default() -> Self {
41 Self {
42 id: "".to_string(),
43 pid: None,
44 status: DaemonStatus::Stopped,
45 shell_pid: None,
46 dir: None,
47 cmd: None,
48 autostop: false,
49 cron_schedule: None,
50 cron_retrigger: None,
51 last_exit_success: None,
52 retry: None,
53 retry_count: None,
54 ready_delay: None,
55 ready_output: None,
56 ready_http: None,
57 ready_port: None,
58 ready_cmd: None,
59 depends: None,
60 env: None,
61 }
62 }
63}
64
65impl Supervisor {
66 pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
68 info!(
69 "upserting daemon: {} pid: {} status: {}",
70 opts.id,
71 opts.pid.unwrap_or(0),
72 opts.status
73 );
74 let mut state_file = self.state_file.lock().await;
75 let existing = state_file.daemons.get(&opts.id);
76 let daemon = Daemon {
77 id: opts.id.to_string(),
78 title: opts.pid.and_then(|pid| PROCS.title(pid)),
79 pid: opts.pid,
80 status: opts.status,
81 shell_pid: opts.shell_pid,
82 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
83 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
84 cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
85 cron_schedule: opts
86 .cron_schedule
87 .or(existing.and_then(|d| d.cron_schedule.clone())),
88 cron_retrigger: opts
89 .cron_retrigger
90 .or(existing.and_then(|d| d.cron_retrigger)),
91 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
92 last_exit_success: opts
93 .last_exit_success
94 .or(existing.and_then(|d| d.last_exit_success)),
95 retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
96 retry_count: opts
97 .retry_count
98 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
99 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
100 ready_output: opts
101 .ready_output
102 .or(existing.and_then(|d| d.ready_output.clone())),
103 ready_http: opts
104 .ready_http
105 .or(existing.and_then(|d| d.ready_http.clone())),
106 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
107 ready_cmd: opts
108 .ready_cmd
109 .or(existing.and_then(|d| d.ready_cmd.clone())),
110 depends: opts
111 .depends
112 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
113 env: opts.env.or(existing.and_then(|d| d.env.clone())),
114 };
115 state_file
116 .daemons
117 .insert(opts.id.to_string(), daemon.clone());
118 if let Err(err) = state_file.write() {
119 warn!("failed to update state file: {err:#}");
120 }
121 Ok(daemon)
122 }
123
124 pub async fn enable(&self, id: String) -> Result<bool> {
126 info!("enabling daemon: {id}");
127 let mut state_file = self.state_file.lock().await;
128 let result = state_file.disabled.remove(&id);
129 state_file.write()?;
130 Ok(result)
131 }
132
133 pub async fn disable(&self, id: String) -> Result<bool> {
135 info!("disabling daemon: {id}");
136 let mut state_file = self.state_file.lock().await;
137 let result = state_file.disabled.insert(id);
138 state_file.write()?;
139 Ok(result)
140 }
141
142 pub(crate) async fn get_daemon(&self, id: &str) -> Option<Daemon> {
144 self.state_file.lock().await.daemons.get(id).cloned()
145 }
146
147 pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
149 self.state_file
150 .lock()
151 .await
152 .daemons
153 .values()
154 .filter(|d| d.pid.is_some() && d.id != "pitchfork")
155 .cloned()
156 .collect()
157 }
158
159 pub(crate) async fn remove_daemon(&self, id: &str) -> Result<()> {
161 self.state_file.lock().await.daemons.remove(id);
162 if let Err(err) = self.state_file.lock().await.write() {
163 warn!("failed to update state file: {err:#}");
164 }
165 Ok(())
166 }
167
168 pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
170 let mut state_file = self.state_file.lock().await;
171 state_file.shell_dirs.insert(shell_pid.to_string(), dir);
172 state_file.write()?;
173 Ok(())
174 }
175
176 pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
178 self.state_file
179 .lock()
180 .await
181 .shell_dirs
182 .get(&shell_pid.to_string())
183 .cloned()
184 }
185
186 pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
188 let mut state_file = self.state_file.lock().await;
189 if state_file
190 .shell_dirs
191 .remove(&shell_pid.to_string())
192 .is_some()
193 {
194 state_file.write()?;
195 }
196 Ok(())
197 }
198
199 pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
201 self.state_file.lock().await.shell_dirs.iter().fold(
202 HashMap::new(),
203 |mut acc, (pid, dir)| {
204 if let Ok(pid) = pid.parse() {
205 acc.entry(dir.clone()).or_default().push(pid);
206 }
207 acc
208 },
209 )
210 }
211
212 pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
214 self.pending_notifications.lock().await.drain(..).collect()
215 }
216
217 pub(crate) async fn clean(&self) -> Result<()> {
219 let mut state_file = self.state_file.lock().await;
220 state_file.daemons.retain(|_id, d| d.pid.is_some());
221 state_file.write()?;
222 Ok(())
223 }
224}