pitchfork_cli/supervisor/
state.rs1use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::procs::PROCS;
15use crate::settings::settings;
16use indexmap::IndexMap;
17use std::collections::HashMap;
18use std::path::PathBuf;
19
20#[derive(Debug)]
24pub(crate) struct UpsertDaemonOpts {
25 pub id: DaemonId,
26 pub pid: Option<u32>,
27 pub status: DaemonStatus,
28 pub shell_pid: Option<u32>,
29 pub dir: Option<PathBuf>,
30 pub cmd: Option<Vec<String>>,
31 pub autostop: bool,
32 pub cron_schedule: Option<String>,
33 pub cron_retrigger: Option<CronRetrigger>,
34 pub last_exit_success: Option<bool>,
35 pub retry: Option<u32>,
36 pub retry_count: Option<u32>,
37 pub ready_delay: Option<u64>,
38 pub ready_output: Option<String>,
39 pub ready_http: Option<String>,
40 pub ready_port: Option<u16>,
41 pub ready_cmd: Option<String>,
42 pub expected_port: Vec<u16>,
44 pub resolved_port: Vec<u16>,
46 pub auto_bump_port: Option<bool>,
47 pub port_bump_attempts: Option<u32>,
48 pub depends: Option<Vec<DaemonId>>,
49 pub env: Option<IndexMap<String, String>>,
50 pub watch: Option<Vec<String>>,
51 pub watch_base_dir: Option<PathBuf>,
52 pub mise: Option<bool>,
53 pub memory_limit: Option<MemoryLimit>,
55 pub cpu_limit: Option<CpuLimit>,
57}
58
59#[derive(Debug)]
71pub(crate) struct UpsertDaemonOptsBuilder {
72 pub opts: UpsertDaemonOpts,
73}
74
75impl UpsertDaemonOpts {
76 pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
78 UpsertDaemonOptsBuilder {
79 opts: UpsertDaemonOpts {
80 id,
81 pid: None,
82 status: DaemonStatus::default(),
83 shell_pid: None,
84 dir: None,
85 cmd: None,
86 autostop: false,
87 cron_schedule: None,
88 cron_retrigger: None,
89 last_exit_success: None,
90 retry: None,
91 retry_count: None,
92 ready_delay: None,
93 ready_output: None,
94 ready_http: None,
95 ready_port: None,
96 ready_cmd: None,
97 expected_port: Vec::new(),
98 resolved_port: Vec::new(),
99 auto_bump_port: None,
100 port_bump_attempts: None,
101 depends: None,
102 env: None,
103 watch: None,
104 watch_base_dir: None,
105 mise: None,
106 memory_limit: None,
107 cpu_limit: None,
108 },
109 }
110 }
111}
112
113impl UpsertDaemonOptsBuilder {
114 pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
116 f(&mut self.opts);
117 self
118 }
119
120 pub fn build(self) -> UpsertDaemonOpts {
122 self.opts
123 }
124}
125
126impl Supervisor {
127 pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
129 info!(
130 "upserting daemon: {} pid: {} status: {}",
131 opts.id,
132 opts.pid.unwrap_or(0),
133 opts.status
134 );
135 let mut state_file = self.state_file.lock().await;
136 let existing = state_file.daemons.get(&opts.id);
137 let daemon = Daemon {
138 id: opts.id.clone(),
139 title: opts.pid.and_then(|pid| PROCS.title(pid)),
140 pid: opts.pid,
141 status: opts.status,
142 shell_pid: opts.shell_pid,
143 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
144 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
145 cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
146 cron_schedule: opts
147 .cron_schedule
148 .or(existing.and_then(|d| d.cron_schedule.clone())),
149 cron_retrigger: opts
150 .cron_retrigger
151 .or(existing.and_then(|d| d.cron_retrigger)),
152 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
153 last_exit_success: opts
154 .last_exit_success
155 .or(existing.and_then(|d| d.last_exit_success)),
156 retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
157 retry_count: opts
158 .retry_count
159 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
160 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
161 ready_output: opts
162 .ready_output
163 .or(existing.and_then(|d| d.ready_output.clone())),
164 ready_http: opts
165 .ready_http
166 .or(existing.and_then(|d| d.ready_http.clone())),
167 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
168 ready_cmd: opts
169 .ready_cmd
170 .or(existing.and_then(|d| d.ready_cmd.clone())),
171 expected_port: if opts.expected_port.is_empty() {
172 existing
173 .map(|d| d.expected_port.clone())
174 .unwrap_or_default()
175 } else {
176 opts.expected_port
177 },
178 resolved_port: if opts.resolved_port.is_empty() {
179 existing
180 .map(|d| d.resolved_port.clone())
181 .unwrap_or_default()
182 } else {
183 opts.resolved_port
184 },
185 auto_bump_port: opts
186 .auto_bump_port
187 .unwrap_or(existing.map(|d| d.auto_bump_port).unwrap_or(false)),
188 port_bump_attempts: opts.port_bump_attempts.unwrap_or(
189 existing
190 .map(|d| d.port_bump_attempts)
191 .unwrap_or_else(|| settings().default_port_bump_attempts()),
192 ),
193 depends: opts
194 .depends
195 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
196 env: opts.env.or(existing.and_then(|d| d.env.clone())),
197 watch: opts
198 .watch
199 .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
200 watch_base_dir: opts
201 .watch_base_dir
202 .or(existing.and_then(|d| d.watch_base_dir.clone())),
203 mise: opts
204 .mise
205 .unwrap_or(existing.map(|d| d.mise).unwrap_or(settings().general.mise)),
206 memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
207 cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
208 };
209 state_file.daemons.insert(opts.id.clone(), daemon.clone());
210 if let Err(err) = state_file.write() {
211 warn!("failed to update state file: {err:#}");
212 }
213 Ok(daemon)
214 }
215
216 pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
218 info!("enabling daemon: {id}");
219 let config = PitchforkToml::all_merged()?;
220 let mut state_file = self.state_file.lock().await;
221 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
222 if !exists {
223 return Err(miette::miette!("daemon '{}' not found", id));
224 }
225 let result = state_file.disabled.remove(id);
226 state_file.write()?;
227 Ok(result)
228 }
229
230 pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
232 info!("disabling daemon: {id}");
233 let config = PitchforkToml::all_merged()?;
234 let mut state_file = self.state_file.lock().await;
235 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
236 if !exists {
237 return Err(miette::miette!("daemon '{}' not found", id));
238 }
239 let result = state_file.disabled.insert(id.clone());
240 state_file.write()?;
241 Ok(result)
242 }
243
244 pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
246 self.state_file.lock().await.daemons.get(id).cloned()
247 }
248
249 pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
251 let pitchfork_id = DaemonId::pitchfork();
252 self.state_file
253 .lock()
254 .await
255 .daemons
256 .values()
257 .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
258 .cloned()
259 .collect()
260 }
261
262 pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
264 let mut state_file = self.state_file.lock().await;
265 state_file.daemons.remove(id);
266 if let Err(err) = state_file.write() {
267 warn!("failed to update state file: {err:#}");
268 }
269 Ok(())
270 }
271
272 pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
274 let mut state_file = self.state_file.lock().await;
275 state_file.shell_dirs.insert(shell_pid.to_string(), dir);
276 state_file.write()?;
277 Ok(())
278 }
279
280 pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
282 self.state_file
283 .lock()
284 .await
285 .shell_dirs
286 .get(&shell_pid.to_string())
287 .cloned()
288 }
289
290 pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
292 let mut state_file = self.state_file.lock().await;
293 if state_file
294 .shell_dirs
295 .remove(&shell_pid.to_string())
296 .is_some()
297 {
298 state_file.write()?;
299 }
300 Ok(())
301 }
302
303 pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
305 self.state_file.lock().await.shell_dirs.iter().fold(
306 HashMap::new(),
307 |mut acc, (pid, dir)| {
308 if let Ok(pid) = pid.parse() {
309 acc.entry(dir.clone()).or_default().push(pid);
310 }
311 acc
312 },
313 )
314 }
315
316 pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
318 self.pending_notifications.lock().await.drain(..).collect()
319 }
320
321 pub(crate) async fn clean(&self) -> Result<()> {
323 let mut state_file = self.state_file.lock().await;
324 state_file.daemons.retain(|_id, d| d.pid.is_some());
325 state_file.write()?;
326 Ok(())
327 }
328}