pitchfork_cli/supervisor/
state.rs1use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::pitchfork_toml::PortConfig;
15use crate::pitchfork_toml::Retry;
16use crate::pitchfork_toml::StopConfig;
17use crate::pitchfork_toml::WatchMode;
18use crate::procs::PROCS;
19use indexmap::IndexMap;
20use std::collections::HashMap;
21use std::path::PathBuf;
22
23#[derive(Debug, Default)]
27pub(crate) struct UpsertDaemonOpts {
28 pub id: DaemonId,
29 pub pid: Option<u32>,
30 pub status: DaemonStatus,
31 pub shell_pid: Option<u32>,
32 pub dir: Option<PathBuf>,
33 pub cmd: Option<Vec<String>>,
34 pub autostop: bool,
35 pub cron_schedule: Option<String>,
36 pub cron_retrigger: Option<CronRetrigger>,
37 pub last_exit_success: Option<bool>,
38 pub retry: Option<Retry>,
39 pub retry_count: Option<u32>,
40 pub ready_delay: Option<u64>,
41 pub ready_output: Option<String>,
42 pub ready_http: Option<String>,
43 pub ready_port: Option<u16>,
44 pub ready_cmd: Option<String>,
45 pub port: Option<PortConfig>,
47 pub resolved_port: Vec<u16>,
49 pub active_port: Option<u16>,
51 pub slug: Option<String>,
53 pub proxy: Option<bool>,
55 pub depends: Option<Vec<DaemonId>>,
56 pub env: Option<IndexMap<String, String>>,
57 pub watch: Option<Vec<String>>,
58 pub watch_mode: Option<WatchMode>,
59 pub watch_base_dir: Option<PathBuf>,
60 pub mise: Option<bool>,
61 pub user: Option<String>,
63 pub memory_limit: Option<MemoryLimit>,
65 pub cpu_limit: Option<CpuLimit>,
67 pub stop_signal: Option<StopConfig>,
69 pub pty: Option<bool>,
71}
72
73#[derive(Debug)]
85pub(crate) struct UpsertDaemonOptsBuilder {
86 pub opts: UpsertDaemonOpts,
87}
88
89impl UpsertDaemonOpts {
90 pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
92 UpsertDaemonOptsBuilder {
93 opts: UpsertDaemonOpts {
94 id,
95 ..Default::default()
96 },
97 }
98 }
99}
100
101impl UpsertDaemonOptsBuilder {
102 pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
104 f(&mut self.opts);
105 self
106 }
107
108 pub fn build(self) -> UpsertDaemonOpts {
110 self.opts
111 }
112}
113
114impl Supervisor {
115 pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
117 info!(
118 "upserting daemon: {} pid: {} status: {}",
119 opts.id,
120 opts.pid.unwrap_or(0),
121 opts.status
122 );
123 let mut state_file = self.state_file.lock().await;
124 let existing = state_file.daemons.get(&opts.id);
125 let daemon = Daemon {
126 id: opts.id.clone(),
127 title: opts.pid.and_then(|pid| PROCS.title(pid)),
128 pid: opts.pid,
129 status: opts.status,
130 shell_pid: opts.shell_pid,
131 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
132 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
133 cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
134 cron_schedule: opts
135 .cron_schedule
136 .or(existing.and_then(|d| d.cron_schedule.clone())),
137 cron_retrigger: opts
138 .cron_retrigger
139 .or(existing.and_then(|d| d.cron_retrigger)),
140 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
141 last_exit_success: opts
142 .last_exit_success
143 .or(existing.and_then(|d| d.last_exit_success)),
144 retry: opts
145 .retry
146 .unwrap_or_else(|| existing.map(|d| d.retry).unwrap_or_default()),
147 retry_count: opts
148 .retry_count
149 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
150 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
151 ready_output: opts
152 .ready_output
153 .or(existing.and_then(|d| d.ready_output.clone())),
154 ready_http: opts
155 .ready_http
156 .or(existing.and_then(|d| d.ready_http.clone())),
157 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
158 ready_cmd: opts
159 .ready_cmd
160 .or(existing.and_then(|d| d.ready_cmd.clone())),
161 port: opts.port.or_else(|| existing.and_then(|d| d.port.clone())),
162 resolved_port: if opts.resolved_port.is_empty() {
163 existing
164 .map(|d| d.resolved_port.clone())
165 .unwrap_or_default()
166 } else {
167 opts.resolved_port
168 },
169 depends: opts
170 .depends
171 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
172 env: opts.env.or(existing.and_then(|d| d.env.clone())),
173 watch: opts
174 .watch
175 .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
176 watch_mode: opts
177 .watch_mode
178 .unwrap_or_else(|| existing.map(|d| d.watch_mode).unwrap_or_default()),
179 watch_base_dir: opts
180 .watch_base_dir
181 .or(existing.and_then(|d| d.watch_base_dir.clone())),
182 mise: opts.mise.or(existing.and_then(|d| d.mise)),
183 user: opts.user.or(existing.and_then(|d| d.user.clone())),
184 proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
185 active_port: opts.active_port,
191 slug: opts.slug.or(existing.and_then(|d| d.slug.clone())),
192 memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
193 cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
194 stop_signal: opts.stop_signal.or(existing.and_then(|d| d.stop_signal)),
195 pty: opts.pty.or(existing.and_then(|d| d.pty)),
196 };
197 state_file.daemons.insert(opts.id.clone(), daemon.clone());
198 if let Err(err) = state_file.write() {
199 warn!("failed to update state file: {err:#}");
200 }
201 Ok(daemon)
202 }
203
204 pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
206 info!("enabling daemon: {id}");
207 let config = PitchforkToml::all_merged()?;
208 let mut state_file = self.state_file.lock().await;
209 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
210 if !exists {
211 return Err(miette::miette!("daemon '{}' not found", id));
212 }
213 let result = state_file.disabled.remove(id);
214 state_file.write()?;
215 Ok(result)
216 }
217
218 pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
220 info!("disabling daemon: {id}");
221 let config = PitchforkToml::all_merged()?;
222 let mut state_file = self.state_file.lock().await;
223 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
224 if !exists {
225 return Err(miette::miette!("daemon '{}' not found", id));
226 }
227 let result = state_file.disabled.insert(id.clone());
228 state_file.write()?;
229 Ok(result)
230 }
231
232 pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
234 self.state_file.lock().await.daemons.get(id).cloned()
235 }
236
237 pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
239 let pitchfork_id = DaemonId::pitchfork();
240 self.state_file
241 .lock()
242 .await
243 .daemons
244 .values()
245 .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
246 .cloned()
247 .collect()
248 }
249
250 pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
252 let mut state_file = self.state_file.lock().await;
253 state_file.daemons.remove(id);
254 if let Err(err) = state_file.write() {
255 warn!("failed to update state file: {err:#}");
256 }
257 Ok(())
258 }
259
260 pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
262 let mut state_file = self.state_file.lock().await;
263 state_file.shell_dirs.insert(shell_pid.to_string(), dir);
264 state_file.write()?;
265 Ok(())
266 }
267
268 pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
270 self.state_file
271 .lock()
272 .await
273 .shell_dirs
274 .get(&shell_pid.to_string())
275 .cloned()
276 }
277
278 pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
280 let mut state_file = self.state_file.lock().await;
281 if state_file
282 .shell_dirs
283 .remove(&shell_pid.to_string())
284 .is_some()
285 {
286 state_file.write()?;
287 }
288 Ok(())
289 }
290
291 pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
293 self.state_file.lock().await.shell_dirs.iter().fold(
294 HashMap::new(),
295 |mut acc, (pid, dir)| {
296 if let Ok(pid) = pid.parse() {
297 acc.entry(dir.clone()).or_default().push(pid);
298 }
299 acc
300 },
301 )
302 }
303
304 pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
306 self.pending_notifications.lock().await.drain(..).collect()
307 }
308
309 pub(crate) async fn clean(&self) -> Result<()> {
311 let mut state_file = self.state_file.lock().await;
312 state_file.daemons.retain(|_id, d| d.pid.is_some());
313 state_file.write()?;
314 Ok(())
315 }
316}