pitchfork_cli/supervisor/
state.rs1use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::pitchfork_toml::PortConfig;
15use crate::pitchfork_toml::ReadyHttp;
16use crate::pitchfork_toml::Retry;
17use crate::pitchfork_toml::StopConfig;
18use crate::pitchfork_toml::WatchMode;
19use crate::procs::PROCS;
20use indexmap::IndexMap;
21use std::collections::HashMap;
22use std::path::PathBuf;
23
24#[derive(Debug, Default)]
28pub(crate) struct UpsertDaemonOpts {
29 pub id: DaemonId,
30 pub pid: Option<u32>,
31 pub status: DaemonStatus,
32 pub shell_pid: Option<u32>,
33 pub dir: Option<PathBuf>,
34 pub cmd: Option<Vec<String>>,
35 pub autostop: bool,
36 pub cron_schedule: Option<String>,
37 pub cron_retrigger: Option<CronRetrigger>,
38 pub last_exit_success: Option<bool>,
39 pub retry: Option<Retry>,
40 pub retry_count: Option<u32>,
41 pub ready_delay: Option<u64>,
42 pub ready_output: Option<String>,
43 pub ready_http: Option<ReadyHttp>,
44 pub ready_port: Option<u16>,
45 pub ready_cmd: Option<String>,
46 pub port: Option<PortConfig>,
48 pub resolved_port: Vec<u16>,
50 pub active_port: Option<u16>,
52 pub slug: Option<String>,
54 pub proxy: Option<bool>,
56 pub depends: Option<Vec<DaemonId>>,
57 pub env: Option<IndexMap<String, String>>,
58 pub watch: Option<Vec<String>>,
59 pub watch_mode: Option<WatchMode>,
60 pub watch_base_dir: Option<PathBuf>,
61 pub mise: Option<bool>,
62 pub user: Option<String>,
64 pub memory_limit: Option<MemoryLimit>,
66 pub cpu_limit: Option<CpuLimit>,
68 pub stop_signal: Option<StopConfig>,
70 pub pty: Option<bool>,
72}
73
74#[derive(Debug)]
86pub(crate) struct UpsertDaemonOptsBuilder {
87 pub opts: UpsertDaemonOpts,
88}
89
90impl UpsertDaemonOpts {
91 pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
93 UpsertDaemonOptsBuilder {
94 opts: UpsertDaemonOpts {
95 id,
96 ..Default::default()
97 },
98 }
99 }
100}
101
102impl UpsertDaemonOptsBuilder {
103 pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
105 f(&mut self.opts);
106 self
107 }
108
109 pub fn build(self) -> UpsertDaemonOpts {
111 self.opts
112 }
113}
114
115impl Supervisor {
116 pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
118 info!(
119 "upserting daemon: {} pid: {} status: {}",
120 opts.id,
121 opts.pid.unwrap_or(0),
122 opts.status
123 );
124 let mut state_file = self.state_file.lock().await;
125 let existing = state_file.daemons.get(&opts.id);
126 let daemon = Daemon {
127 id: opts.id.clone(),
128 title: opts.pid.and_then(|pid| PROCS.title(pid)),
129 pid: opts.pid,
130 status: opts.status,
131 shell_pid: opts.shell_pid,
132 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
133 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
134 cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
135 cron_schedule: opts
136 .cron_schedule
137 .or(existing.and_then(|d| d.cron_schedule.clone())),
138 cron_retrigger: opts
139 .cron_retrigger
140 .or(existing.and_then(|d| d.cron_retrigger)),
141 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
142 last_exit_success: opts
143 .last_exit_success
144 .or(existing.and_then(|d| d.last_exit_success)),
145 retry: opts
146 .retry
147 .unwrap_or_else(|| existing.map(|d| d.retry).unwrap_or_default()),
148 retry_count: opts
149 .retry_count
150 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
151 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
152 ready_output: opts
153 .ready_output
154 .or(existing.and_then(|d| d.ready_output.clone())),
155 ready_http: opts
156 .ready_http
157 .or(existing.and_then(|d| d.ready_http.clone())),
158 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
159 ready_cmd: opts
160 .ready_cmd
161 .or(existing.and_then(|d| d.ready_cmd.clone())),
162 port: opts.port.or_else(|| existing.and_then(|d| d.port.clone())),
163 resolved_port: if opts.resolved_port.is_empty() {
164 existing
165 .map(|d| d.resolved_port.clone())
166 .unwrap_or_default()
167 } else {
168 opts.resolved_port
169 },
170 depends: opts
171 .depends
172 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
173 env: opts.env.or(existing.and_then(|d| d.env.clone())),
174 watch: opts
175 .watch
176 .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
177 watch_mode: opts
178 .watch_mode
179 .unwrap_or_else(|| existing.map(|d| d.watch_mode).unwrap_or_default()),
180 watch_base_dir: opts
181 .watch_base_dir
182 .or(existing.and_then(|d| d.watch_base_dir.clone())),
183 mise: opts.mise.or(existing.and_then(|d| d.mise)),
184 user: opts.user.or(existing.and_then(|d| d.user.clone())),
185 proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
186 active_port: opts.active_port,
192 slug: opts.slug.or(existing.and_then(|d| d.slug.clone())),
193 memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
194 cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
195 stop_signal: opts.stop_signal.or(existing.and_then(|d| d.stop_signal)),
196 pty: opts.pty.or(existing.and_then(|d| d.pty)),
197 };
198 state_file.daemons.insert(opts.id.clone(), daemon.clone());
199 if let Err(err) = state_file.write() {
200 warn!("failed to update state file: {err:#}");
201 }
202 Ok(daemon)
203 }
204
205 pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
207 info!("enabling daemon: {id}");
208 let config = PitchforkToml::all_merged()?;
209 let mut state_file = self.state_file.lock().await;
210 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
211 if !exists {
212 return Err(miette::miette!("daemon '{}' not found", id));
213 }
214 let result = state_file.disabled.remove(id);
215 state_file.write()?;
216 Ok(result)
217 }
218
219 pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
221 info!("disabling daemon: {id}");
222 let config = PitchforkToml::all_merged()?;
223 let mut state_file = self.state_file.lock().await;
224 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
225 if !exists {
226 return Err(miette::miette!("daemon '{}' not found", id));
227 }
228 let result = state_file.disabled.insert(id.clone());
229 state_file.write()?;
230 Ok(result)
231 }
232
233 pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
235 self.state_file.lock().await.daemons.get(id).cloned()
236 }
237
238 pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
240 let pitchfork_id = DaemonId::pitchfork();
241 self.state_file
242 .lock()
243 .await
244 .daemons
245 .values()
246 .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
247 .cloned()
248 .collect()
249 }
250
251 pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
253 let mut state_file = self.state_file.lock().await;
254 state_file.daemons.remove(id);
255 if let Err(err) = state_file.write() {
256 warn!("failed to update state file: {err:#}");
257 }
258 Ok(())
259 }
260
261 pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
263 let mut state_file = self.state_file.lock().await;
264 state_file.shell_dirs.insert(shell_pid.to_string(), dir);
265 state_file.write()?;
266 Ok(())
267 }
268
269 pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
271 self.state_file
272 .lock()
273 .await
274 .shell_dirs
275 .get(&shell_pid.to_string())
276 .cloned()
277 }
278
279 pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
281 let mut state_file = self.state_file.lock().await;
282 if state_file
283 .shell_dirs
284 .remove(&shell_pid.to_string())
285 .is_some()
286 {
287 state_file.write()?;
288 }
289 Ok(())
290 }
291
292 pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
294 self.state_file.lock().await.shell_dirs.iter().fold(
295 HashMap::new(),
296 |mut acc, (pid, dir)| {
297 if let Ok(pid) = pid.parse() {
298 acc.entry(dir.clone()).or_default().push(pid);
299 }
300 acc
301 },
302 )
303 }
304
305 pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
307 self.pending_notifications.lock().await.drain(..).collect()
308 }
309
310 pub(crate) async fn clean(&self) -> Result<()> {
312 let mut state_file = self.state_file.lock().await;
313 state_file.daemons.retain(|_id, d| d.pid.is_some());
314 state_file.write()?;
315 Ok(())
316 }
317}