pitchfork_cli/supervisor/
state.rs1use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::pitchfork_toml::PortConfig;
15use crate::pitchfork_toml::ReadyHttp;
16use crate::pitchfork_toml::Retry;
17use crate::pitchfork_toml::StopConfig;
18use crate::pitchfork_toml::WatchMode;
19use crate::procs::PROCS;
20use indexmap::IndexMap;
21use std::collections::HashMap;
22use std::path::PathBuf;
23
24#[derive(Debug, Default)]
28pub(crate) struct UpsertDaemonOpts {
29 pub id: DaemonId,
30 pub pid: Option<u32>,
31 pub status: DaemonStatus,
32 pub shell_pid: Option<u32>,
33 pub dir: Option<PathBuf>,
34 pub cmd: Option<Vec<String>>,
35 pub run: Option<String>,
36 pub autostop: bool,
37 pub cron_schedule: Option<String>,
38 pub cron_retrigger: Option<CronRetrigger>,
39 pub cron_immediate: Option<bool>,
40 pub last_exit_success: Option<bool>,
41 pub retry: Option<Retry>,
42 pub retry_count: Option<u32>,
43 pub ready_delay: Option<u64>,
44 pub ready_output: Option<String>,
45 pub ready_http: Option<ReadyHttp>,
46 pub ready_port: Option<u16>,
47 pub ready_cmd: Option<String>,
48 pub port: Option<PortConfig>,
50 pub resolved_port: Vec<u16>,
52 pub active_port: Option<u16>,
54 pub slug: Option<String>,
56 pub proxy: Option<bool>,
58 pub depends: Option<Vec<DaemonId>>,
59 pub env: Option<IndexMap<String, String>>,
60 pub watch: Option<Vec<String>>,
61 pub watch_mode: Option<WatchMode>,
62 pub watch_base_dir: Option<PathBuf>,
63 pub mise: Option<bool>,
64 pub user: Option<String>,
66 pub memory_limit: Option<MemoryLimit>,
68 pub cpu_limit: Option<CpuLimit>,
70 pub stop_signal: Option<StopConfig>,
72 pub pty: Option<bool>,
74}
75
76#[derive(Debug)]
88pub(crate) struct UpsertDaemonOptsBuilder {
89 pub opts: UpsertDaemonOpts,
90}
91
92impl UpsertDaemonOpts {
93 pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
95 UpsertDaemonOptsBuilder {
96 opts: UpsertDaemonOpts {
97 id,
98 ..Default::default()
99 },
100 }
101 }
102}
103
104impl UpsertDaemonOptsBuilder {
105 pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
107 f(&mut self.opts);
108 self
109 }
110
111 pub fn build(self) -> UpsertDaemonOpts {
113 self.opts
114 }
115}
116
117impl Supervisor {
118 pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
120 info!(
121 "upserting daemon: {} pid: {} status: {}",
122 opts.id,
123 opts.pid.unwrap_or(0),
124 opts.status
125 );
126 let mut state_file = self.state_file.lock().await;
127 let existing = state_file.daemons.get(&opts.id);
128 let daemon = Daemon {
129 id: opts.id.clone(),
130 title: opts.pid.and_then(|pid| PROCS.title(pid)),
131 pid: opts.pid,
132 status: opts.status,
133 shell_pid: opts.shell_pid,
134 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
135 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
136 cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
137 run: opts.run.or(existing.and_then(|d| d.run.clone())),
138 cron_schedule: opts
139 .cron_schedule
140 .or(existing.and_then(|d| d.cron_schedule.clone())),
141 cron_retrigger: opts
142 .cron_retrigger
143 .or(existing.and_then(|d| d.cron_retrigger)),
144 cron_immediate: opts
145 .cron_immediate
146 .or(existing.and_then(|d| d.cron_immediate)),
147 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
148 last_exit_success: opts
149 .last_exit_success
150 .or(existing.and_then(|d| d.last_exit_success)),
151 retry: opts
152 .retry
153 .unwrap_or_else(|| existing.map(|d| d.retry).unwrap_or_default()),
154 retry_count: opts
155 .retry_count
156 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
157 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
158 ready_output: opts
159 .ready_output
160 .or(existing.and_then(|d| d.ready_output.clone())),
161 ready_http: opts
162 .ready_http
163 .or(existing.and_then(|d| d.ready_http.clone())),
164 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
165 ready_cmd: opts
166 .ready_cmd
167 .or(existing.and_then(|d| d.ready_cmd.clone())),
168 port: opts.port.or_else(|| existing.and_then(|d| d.port.clone())),
169 resolved_port: if opts.resolved_port.is_empty() {
170 existing
171 .map(|d| d.resolved_port.clone())
172 .unwrap_or_default()
173 } else {
174 opts.resolved_port
175 },
176 depends: opts
177 .depends
178 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
179 env: opts.env.or(existing.and_then(|d| d.env.clone())),
180 watch: opts
181 .watch
182 .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
183 watch_mode: opts
184 .watch_mode
185 .unwrap_or_else(|| existing.map(|d| d.watch_mode).unwrap_or_default()),
186 watch_base_dir: opts
187 .watch_base_dir
188 .or(existing.and_then(|d| d.watch_base_dir.clone())),
189 mise: opts.mise.or(existing.and_then(|d| d.mise)),
190 user: opts.user.or(existing.and_then(|d| d.user.clone())),
191 proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
192 active_port: opts.active_port,
198 slug: opts.slug.or(existing.and_then(|d| d.slug.clone())),
199 memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
200 cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
201 stop_signal: opts.stop_signal.or(existing.and_then(|d| d.stop_signal)),
202 pty: opts.pty.or(existing.and_then(|d| d.pty)),
203 };
204 state_file.insert_daemon(&opts.id, daemon.clone());
205 Ok(daemon)
206 }
207
208 pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
210 info!("enabling daemon: {id}");
211 let config = PitchforkToml::all_merged_all_namespaces()?;
212 let mut state_file = self.state_file.lock().await;
213 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
214 if !exists {
215 return Err(miette::miette!("daemon '{}' not found", id));
216 }
217 let result = state_file.enable_daemon(id);
218 Ok(result)
219 }
220
221 pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
223 info!("disabling daemon: {id}");
224 let config = PitchforkToml::all_merged_all_namespaces()?;
225 let mut state_file = self.state_file.lock().await;
226 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
227 if !exists {
228 return Err(miette::miette!("daemon '{}' not found", id));
229 }
230 let result = state_file.disable_daemon(id);
231 Ok(result)
232 }
233
234 pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
236 self.state_file.lock().await.daemons.get(id).cloned()
237 }
238
239 pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
241 let pitchfork_id = DaemonId::pitchfork();
242 self.state_file
243 .lock()
244 .await
245 .daemons
246 .values()
247 .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
248 .cloned()
249 .collect()
250 }
251
252 pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
254 let mut state_file = self.state_file.lock().await;
255 state_file.remove_daemon(id);
256 Ok(())
257 }
258
259 pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
261 let mut state_file = self.state_file.lock().await;
262 state_file.set_shell_dir(shell_pid, dir);
263 Ok(())
264 }
265
266 pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
268 self.state_file
269 .lock()
270 .await
271 .shell_dirs
272 .get(&shell_pid.to_string())
273 .cloned()
274 }
275
276 pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
278 let mut state_file = self.state_file.lock().await;
279 state_file.remove_shell_dir(shell_pid);
280 Ok(())
281 }
282
283 pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
285 self.state_file.lock().await.shell_dirs.iter().fold(
286 HashMap::new(),
287 |mut acc, (pid, dir)| {
288 if let Ok(pid) = pid.parse() {
289 acc.entry(dir.clone()).or_default().push(pid);
290 }
291 acc
292 },
293 )
294 }
295
296 pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
298 self.pending_notifications.lock().await.drain(..).collect()
299 }
300
301 pub(crate) async fn clean(&self) -> Result<()> {
303 let mut state_file = self.state_file.lock().await;
304 state_file.retain_daemons(|_id, d| d.pid.is_some());
305 Ok(())
306 }
307}