pitchfork_cli/supervisor/
state.rs1use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_status::DaemonStatus;
9use crate::pitchfork_toml::CronRetrigger;
10use crate::procs::PROCS;
11use std::collections::HashMap;
12use std::path::PathBuf;
13
14#[derive(Debug)]
16pub(crate) struct UpsertDaemonOpts {
17 pub id: String,
18 pub pid: Option<u32>,
19 pub status: DaemonStatus,
20 pub shell_pid: Option<u32>,
21 pub dir: Option<PathBuf>,
22 pub autostop: bool,
23 pub cron_schedule: Option<String>,
24 pub cron_retrigger: Option<CronRetrigger>,
25 pub last_exit_success: Option<bool>,
26 pub retry: Option<u32>,
27 pub retry_count: Option<u32>,
28 pub ready_delay: Option<u64>,
29 pub ready_output: Option<String>,
30 pub ready_http: Option<String>,
31 pub ready_port: Option<u16>,
32 pub depends: Option<Vec<String>>,
33}
34
35impl Default for UpsertDaemonOpts {
36 fn default() -> Self {
37 Self {
38 id: "".to_string(),
39 pid: None,
40 status: DaemonStatus::Stopped,
41 shell_pid: None,
42 dir: None,
43 autostop: false,
44 cron_schedule: None,
45 cron_retrigger: None,
46 last_exit_success: None,
47 retry: None,
48 retry_count: None,
49 ready_delay: None,
50 ready_output: None,
51 ready_http: None,
52 ready_port: None,
53 depends: None,
54 }
55 }
56}
57
58impl Supervisor {
59 pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
61 info!(
62 "upserting daemon: {} pid: {} status: {}",
63 opts.id,
64 opts.pid.unwrap_or(0),
65 opts.status
66 );
67 let mut state_file = self.state_file.lock().await;
68 let existing = state_file.daemons.get(&opts.id);
69 let daemon = Daemon {
70 id: opts.id.to_string(),
71 title: opts.pid.and_then(|pid| PROCS.title(pid)),
72 pid: opts.pid,
73 status: opts.status,
74 shell_pid: opts.shell_pid,
75 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
76 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
77 cron_schedule: opts
78 .cron_schedule
79 .or(existing.and_then(|d| d.cron_schedule.clone())),
80 cron_retrigger: opts
81 .cron_retrigger
82 .or(existing.and_then(|d| d.cron_retrigger)),
83 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
84 last_exit_success: opts
85 .last_exit_success
86 .or(existing.and_then(|d| d.last_exit_success)),
87 retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
88 retry_count: opts
89 .retry_count
90 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
91 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
92 ready_output: opts
93 .ready_output
94 .or(existing.and_then(|d| d.ready_output.clone())),
95 ready_http: opts
96 .ready_http
97 .or(existing.and_then(|d| d.ready_http.clone())),
98 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
99 depends: opts
100 .depends
101 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
102 };
103 state_file
104 .daemons
105 .insert(opts.id.to_string(), daemon.clone());
106 if let Err(err) = state_file.write() {
107 warn!("failed to update state file: {err:#}");
108 }
109 Ok(daemon)
110 }
111
112 pub async fn enable(&self, id: String) -> Result<bool> {
114 info!("enabling daemon: {id}");
115 let mut state_file = self.state_file.lock().await;
116 let result = state_file.disabled.remove(&id);
117 state_file.write()?;
118 Ok(result)
119 }
120
121 pub async fn disable(&self, id: String) -> Result<bool> {
123 info!("disabling daemon: {id}");
124 let mut state_file = self.state_file.lock().await;
125 let result = state_file.disabled.insert(id);
126 state_file.write()?;
127 Ok(result)
128 }
129
130 pub(crate) async fn get_daemon(&self, id: &str) -> Option<Daemon> {
132 self.state_file.lock().await.daemons.get(id).cloned()
133 }
134
135 pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
137 self.state_file
138 .lock()
139 .await
140 .daemons
141 .values()
142 .filter(|d| d.pid.is_some() && d.id != "pitchfork")
143 .cloned()
144 .collect()
145 }
146
147 pub(crate) async fn remove_daemon(&self, id: &str) -> Result<()> {
149 self.state_file.lock().await.daemons.remove(id);
150 if let Err(err) = self.state_file.lock().await.write() {
151 warn!("failed to update state file: {err:#}");
152 }
153 Ok(())
154 }
155
156 pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
158 let mut state_file = self.state_file.lock().await;
159 state_file.shell_dirs.insert(shell_pid.to_string(), dir);
160 state_file.write()?;
161 Ok(())
162 }
163
164 pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
166 self.state_file
167 .lock()
168 .await
169 .shell_dirs
170 .get(&shell_pid.to_string())
171 .cloned()
172 }
173
174 pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
176 let mut state_file = self.state_file.lock().await;
177 if state_file
178 .shell_dirs
179 .remove(&shell_pid.to_string())
180 .is_some()
181 {
182 state_file.write()?;
183 }
184 Ok(())
185 }
186
187 pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
189 self.state_file.lock().await.shell_dirs.iter().fold(
190 HashMap::new(),
191 |mut acc, (pid, dir)| {
192 if let Ok(pid) = pid.parse() {
193 acc.entry(dir.clone()).or_default().push(pid);
194 }
195 acc
196 },
197 )
198 }
199
200 pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
202 self.pending_notifications.lock().await.drain(..).collect()
203 }
204
205 pub(crate) async fn clean(&self) -> Result<()> {
207 let mut state_file = self.state_file.lock().await;
208 state_file.daemons.retain(|_id, d| d.pid.is_some());
209 state_file.write()?;
210 Ok(())
211 }
212}