pitchfork_cli/supervisor/
state.rs1use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::pitchfork_toml::WatchMode;
15use crate::procs::PROCS;
16use crate::settings::settings;
17use indexmap::IndexMap;
18use std::collections::HashMap;
19use std::path::PathBuf;
20
/// Input for `Supervisor::upsert_daemon`: the values to write into the state
/// file entry keyed by `id`.
///
/// Most fields act as a patch on top of an entry that may already exist:
/// leaving them unset makes `upsert_daemon` fall back to the existing entry's
/// value. The exact fallback rule differs per group of fields and is noted
/// below. Use [`UpsertDaemonOpts::builder`] to start from an "everything unset"
/// value.
#[derive(Debug)]
pub(crate) struct UpsertDaemonOpts {
    /// Key of the daemon entry that is inserted or replaced.
    pub id: DaemonId,

    // --- Stored exactly as given; never inherited from an existing entry. ---
    /// Process id recorded for the daemon; also used to look up the process title.
    pub pid: Option<u32>,
    /// Status recorded for the daemon.
    pub status: DaemonStatus,
    /// Recorded on the entry as-is.
    pub shell_pid: Option<u32>,

    // --- `None` keeps the existing entry's value (or `None` if there is no entry). ---
    pub dir: Option<PathBuf>,
    pub cmd: Option<Vec<String>>,
    /// OR-ed with the existing entry's flag, so passing `false` cannot clear a
    /// flag that is already set.
    pub autostop: bool,
    pub cron_schedule: Option<String>,
    pub cron_retrigger: Option<CronRetrigger>,
    pub last_exit_success: Option<bool>,

    // --- `None` keeps the existing entry's value, or `0` if there is no entry. ---
    pub retry: Option<u32>,
    pub retry_count: Option<u32>,

    // --- `None` keeps the existing entry's value (or `None` if there is no entry). ---
    pub ready_delay: Option<u64>,
    pub ready_output: Option<String>,
    pub ready_http: Option<String>,
    pub ready_port: Option<u16>,
    pub ready_cmd: Option<String>,

    /// An empty list means "keep the existing entry's list" (or empty if none).
    pub expected_port: Vec<u16>,
    /// An empty list means "keep the existing entry's list" (or empty if none).
    pub resolved_port: Vec<u16>,
    /// Stored exactly as given: unlike its neighbours, `None` here is NOT
    /// replaced by the existing entry's value.
    pub active_port: Option<u16>,
    /// `None` keeps the existing entry's value.
    pub slug: Option<String>,
    /// `None` keeps the existing entry's value.
    pub proxy: Option<bool>,
    /// `None` keeps the existing entry's value, or `false` if there is no entry.
    pub auto_bump_port: Option<bool>,
    /// `None` keeps the existing entry's value, or the configured
    /// `default_port_bump_attempts()` setting if there is no entry.
    pub port_bump_attempts: Option<u32>,

    // --- `None` keeps the existing entry's value, or the type's default if there is no entry. ---
    pub depends: Option<Vec<DaemonId>>,
    /// `None` keeps the existing entry's value (or `None` if there is no entry).
    pub env: Option<IndexMap<String, String>>,
    pub watch: Option<Vec<String>>,
    pub watch_mode: Option<WatchMode>,

    // --- `None` keeps the existing entry's value (or `None` if there is no entry). ---
    pub watch_base_dir: Option<PathBuf>,
    pub mise: Option<bool>,
    pub user: Option<String>,
    pub memory_limit: Option<MemoryLimit>,
    pub cpu_limit: Option<CpuLimit>,
}
68
/// Closure-driven builder for [`UpsertDaemonOpts`].
///
/// Obtained from `UpsertDaemonOpts::builder(id)`; fields are adjusted through
/// `set` and the finished options are extracted with `build`.
#[derive(Debug)]
pub(crate) struct UpsertDaemonOptsBuilder {
    /// The options accumulated so far; returned unchanged by `build`.
    pub opts: UpsertDaemonOpts,
}
84
85impl UpsertDaemonOpts {
86 pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
88 UpsertDaemonOptsBuilder {
89 opts: UpsertDaemonOpts {
90 id,
91 pid: None,
92 status: DaemonStatus::default(),
93 shell_pid: None,
94 dir: None,
95 cmd: None,
96 autostop: false,
97 cron_schedule: None,
98 cron_retrigger: None,
99 last_exit_success: None,
100 retry: None,
101 retry_count: None,
102 ready_delay: None,
103 ready_output: None,
104 ready_http: None,
105 ready_port: None,
106 ready_cmd: None,
107 expected_port: Vec::new(),
108 resolved_port: Vec::new(),
109 active_port: None,
110 slug: None,
111 proxy: None,
112 auto_bump_port: None,
113 port_bump_attempts: None,
114 depends: None,
115 env: None,
116 watch: None,
117 watch_mode: None,
118 watch_base_dir: None,
119 mise: None,
120 user: None,
121 memory_limit: None,
122 cpu_limit: None,
123 },
124 }
125 }
126}
127
128impl UpsertDaemonOptsBuilder {
129 pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
131 f(&mut self.opts);
132 self
133 }
134
135 pub fn build(self) -> UpsertDaemonOpts {
137 self.opts
138 }
139}
140
141impl Supervisor {
142 pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
144 info!(
145 "upserting daemon: {} pid: {} status: {}",
146 opts.id,
147 opts.pid.unwrap_or(0),
148 opts.status
149 );
150 let mut state_file = self.state_file.lock().await;
151 let existing = state_file.daemons.get(&opts.id);
152 let daemon = Daemon {
153 id: opts.id.clone(),
154 title: opts.pid.and_then(|pid| PROCS.title(pid)),
155 pid: opts.pid,
156 status: opts.status,
157 shell_pid: opts.shell_pid,
158 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
159 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
160 cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
161 cron_schedule: opts
162 .cron_schedule
163 .or(existing.and_then(|d| d.cron_schedule.clone())),
164 cron_retrigger: opts
165 .cron_retrigger
166 .or(existing.and_then(|d| d.cron_retrigger)),
167 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
168 last_exit_success: opts
169 .last_exit_success
170 .or(existing.and_then(|d| d.last_exit_success)),
171 retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
172 retry_count: opts
173 .retry_count
174 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
175 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
176 ready_output: opts
177 .ready_output
178 .or(existing.and_then(|d| d.ready_output.clone())),
179 ready_http: opts
180 .ready_http
181 .or(existing.and_then(|d| d.ready_http.clone())),
182 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
183 ready_cmd: opts
184 .ready_cmd
185 .or(existing.and_then(|d| d.ready_cmd.clone())),
186 expected_port: if opts.expected_port.is_empty() {
187 existing
188 .map(|d| d.expected_port.clone())
189 .unwrap_or_default()
190 } else {
191 opts.expected_port
192 },
193 resolved_port: if opts.resolved_port.is_empty() {
194 existing
195 .map(|d| d.resolved_port.clone())
196 .unwrap_or_default()
197 } else {
198 opts.resolved_port
199 },
200 auto_bump_port: opts
201 .auto_bump_port
202 .unwrap_or(existing.map(|d| d.auto_bump_port).unwrap_or(false)),
203 port_bump_attempts: opts.port_bump_attempts.unwrap_or(
204 existing
205 .map(|d| d.port_bump_attempts)
206 .unwrap_or_else(|| settings().default_port_bump_attempts()),
207 ),
208 depends: opts
209 .depends
210 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
211 env: opts.env.or(existing.and_then(|d| d.env.clone())),
212 watch: opts
213 .watch
214 .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
215 watch_mode: opts
216 .watch_mode
217 .unwrap_or_else(|| existing.map(|d| d.watch_mode).unwrap_or_default()),
218 watch_base_dir: opts
219 .watch_base_dir
220 .or(existing.and_then(|d| d.watch_base_dir.clone())),
221 mise: opts.mise.or(existing.and_then(|d| d.mise)),
222 user: opts.user.or(existing.and_then(|d| d.user.clone())),
223 proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
224 active_port: opts.active_port,
230 slug: opts.slug.or(existing.and_then(|d| d.slug.clone())),
231 memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
232 cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
233 };
234 state_file.daemons.insert(opts.id.clone(), daemon.clone());
235 if let Err(err) = state_file.write() {
236 warn!("failed to update state file: {err:#}");
237 }
238 Ok(daemon)
239 }
240
241 pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
243 info!("enabling daemon: {id}");
244 let config = PitchforkToml::all_merged()?;
245 let mut state_file = self.state_file.lock().await;
246 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
247 if !exists {
248 return Err(miette::miette!("daemon '{}' not found", id));
249 }
250 let result = state_file.disabled.remove(id);
251 state_file.write()?;
252 Ok(result)
253 }
254
255 pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
257 info!("disabling daemon: {id}");
258 let config = PitchforkToml::all_merged()?;
259 let mut state_file = self.state_file.lock().await;
260 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
261 if !exists {
262 return Err(miette::miette!("daemon '{}' not found", id));
263 }
264 let result = state_file.disabled.insert(id.clone());
265 state_file.write()?;
266 Ok(result)
267 }
268
269 pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
271 self.state_file.lock().await.daemons.get(id).cloned()
272 }
273
274 pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
276 let pitchfork_id = DaemonId::pitchfork();
277 self.state_file
278 .lock()
279 .await
280 .daemons
281 .values()
282 .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
283 .cloned()
284 .collect()
285 }
286
287 pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
289 let mut state_file = self.state_file.lock().await;
290 state_file.daemons.remove(id);
291 if let Err(err) = state_file.write() {
292 warn!("failed to update state file: {err:#}");
293 }
294 Ok(())
295 }
296
297 pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
299 let mut state_file = self.state_file.lock().await;
300 state_file.shell_dirs.insert(shell_pid.to_string(), dir);
301 state_file.write()?;
302 Ok(())
303 }
304
305 pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
307 self.state_file
308 .lock()
309 .await
310 .shell_dirs
311 .get(&shell_pid.to_string())
312 .cloned()
313 }
314
315 pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
317 let mut state_file = self.state_file.lock().await;
318 if state_file
319 .shell_dirs
320 .remove(&shell_pid.to_string())
321 .is_some()
322 {
323 state_file.write()?;
324 }
325 Ok(())
326 }
327
328 pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
330 self.state_file.lock().await.shell_dirs.iter().fold(
331 HashMap::new(),
332 |mut acc, (pid, dir)| {
333 if let Ok(pid) = pid.parse() {
334 acc.entry(dir.clone()).or_default().push(pid);
335 }
336 acc
337 },
338 )
339 }
340
341 pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
343 self.pending_notifications.lock().await.drain(..).collect()
344 }
345
346 pub(crate) async fn clean(&self) -> Result<()> {
348 let mut state_file = self.state_file.lock().await;
349 state_file.daemons.retain(|_id, d| d.pid.is_some());
350 state_file.write()?;
351 Ok(())
352 }
353}