pitchfork_cli/supervisor/
state.rs1use super::Supervisor;
6use crate::Result;
7use crate::daemon::Daemon;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::pitchfork_toml::CpuLimit;
11use crate::pitchfork_toml::CronRetrigger;
12use crate::pitchfork_toml::MemoryLimit;
13use crate::pitchfork_toml::PitchforkToml;
14use crate::pitchfork_toml::PortConfig;
15use crate::pitchfork_toml::ReadyHttp;
16use crate::pitchfork_toml::Retry;
17use crate::pitchfork_toml::StopConfig;
18use crate::pitchfork_toml::WatchMode;
19use crate::procs::PROCS;
20use indexmap::IndexMap;
21use std::collections::HashMap;
22use std::path::PathBuf;
23
24#[derive(Debug, Default)]
28pub(crate) struct UpsertDaemonOpts {
29 pub id: DaemonId,
30 pub pid: Option<u32>,
31 pub status: DaemonStatus,
32 pub shell_pid: Option<u32>,
33 pub dir: Option<PathBuf>,
34 pub cmd: Option<Vec<String>>,
35 pub autostop: bool,
36 pub cron_schedule: Option<String>,
37 pub cron_retrigger: Option<CronRetrigger>,
38 pub cron_immediate: Option<bool>,
39 pub last_exit_success: Option<bool>,
40 pub retry: Option<Retry>,
41 pub retry_count: Option<u32>,
42 pub ready_delay: Option<u64>,
43 pub ready_output: Option<String>,
44 pub ready_http: Option<ReadyHttp>,
45 pub ready_port: Option<u16>,
46 pub ready_cmd: Option<String>,
47 pub port: Option<PortConfig>,
49 pub resolved_port: Vec<u16>,
51 pub active_port: Option<u16>,
53 pub slug: Option<String>,
55 pub proxy: Option<bool>,
57 pub depends: Option<Vec<DaemonId>>,
58 pub env: Option<IndexMap<String, String>>,
59 pub watch: Option<Vec<String>>,
60 pub watch_mode: Option<WatchMode>,
61 pub watch_base_dir: Option<PathBuf>,
62 pub mise: Option<bool>,
63 pub user: Option<String>,
65 pub memory_limit: Option<MemoryLimit>,
67 pub cpu_limit: Option<CpuLimit>,
69 pub stop_signal: Option<StopConfig>,
71 pub pty: Option<bool>,
73}
74
75#[derive(Debug)]
87pub(crate) struct UpsertDaemonOptsBuilder {
88 pub opts: UpsertDaemonOpts,
89}
90
91impl UpsertDaemonOpts {
92 pub fn builder(id: DaemonId) -> UpsertDaemonOptsBuilder {
94 UpsertDaemonOptsBuilder {
95 opts: UpsertDaemonOpts {
96 id,
97 ..Default::default()
98 },
99 }
100 }
101}
102
103impl UpsertDaemonOptsBuilder {
104 pub fn set<F: FnOnce(&mut UpsertDaemonOpts)>(mut self, f: F) -> Self {
106 f(&mut self.opts);
107 self
108 }
109
110 pub fn build(self) -> UpsertDaemonOpts {
112 self.opts
113 }
114}
115
116impl Supervisor {
117 pub(crate) async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
119 info!(
120 "upserting daemon: {} pid: {} status: {}",
121 opts.id,
122 opts.pid.unwrap_or(0),
123 opts.status
124 );
125 let mut state_file = self.state_file.lock().await;
126 let existing = state_file.daemons.get(&opts.id);
127 let daemon = Daemon {
128 id: opts.id.clone(),
129 title: opts.pid.and_then(|pid| PROCS.title(pid)),
130 pid: opts.pid,
131 status: opts.status,
132 shell_pid: opts.shell_pid,
133 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
134 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
135 cmd: opts.cmd.or(existing.and_then(|d| d.cmd.clone())),
136 cron_schedule: opts
137 .cron_schedule
138 .or(existing.and_then(|d| d.cron_schedule.clone())),
139 cron_retrigger: opts
140 .cron_retrigger
141 .or(existing.and_then(|d| d.cron_retrigger)),
142 cron_immediate: opts
143 .cron_immediate
144 .or(existing.and_then(|d| d.cron_immediate)),
145 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
146 last_exit_success: opts
147 .last_exit_success
148 .or(existing.and_then(|d| d.last_exit_success)),
149 retry: opts
150 .retry
151 .unwrap_or_else(|| existing.map(|d| d.retry).unwrap_or_default()),
152 retry_count: opts
153 .retry_count
154 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
155 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
156 ready_output: opts
157 .ready_output
158 .or(existing.and_then(|d| d.ready_output.clone())),
159 ready_http: opts
160 .ready_http
161 .or(existing.and_then(|d| d.ready_http.clone())),
162 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
163 ready_cmd: opts
164 .ready_cmd
165 .or(existing.and_then(|d| d.ready_cmd.clone())),
166 port: opts.port.or_else(|| existing.and_then(|d| d.port.clone())),
167 resolved_port: if opts.resolved_port.is_empty() {
168 existing
169 .map(|d| d.resolved_port.clone())
170 .unwrap_or_default()
171 } else {
172 opts.resolved_port
173 },
174 depends: opts
175 .depends
176 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
177 env: opts.env.or(existing.and_then(|d| d.env.clone())),
178 watch: opts
179 .watch
180 .unwrap_or_else(|| existing.map(|d| d.watch.clone()).unwrap_or_default()),
181 watch_mode: opts
182 .watch_mode
183 .unwrap_or_else(|| existing.map(|d| d.watch_mode).unwrap_or_default()),
184 watch_base_dir: opts
185 .watch_base_dir
186 .or(existing.and_then(|d| d.watch_base_dir.clone())),
187 mise: opts.mise.or(existing.and_then(|d| d.mise)),
188 user: opts.user.or(existing.and_then(|d| d.user.clone())),
189 proxy: opts.proxy.or(existing.and_then(|d| d.proxy)),
190 active_port: opts.active_port,
196 slug: opts.slug.or(existing.and_then(|d| d.slug.clone())),
197 memory_limit: opts.memory_limit.or(existing.and_then(|d| d.memory_limit)),
198 cpu_limit: opts.cpu_limit.or(existing.and_then(|d| d.cpu_limit)),
199 stop_signal: opts.stop_signal.or(existing.and_then(|d| d.stop_signal)),
200 pty: opts.pty.or(existing.and_then(|d| d.pty)),
201 };
202 state_file.insert_daemon(&opts.id, daemon.clone());
203 Ok(daemon)
204 }
205
206 pub async fn enable(&self, id: &DaemonId) -> Result<bool> {
208 info!("enabling daemon: {id}");
209 let config = PitchforkToml::all_merged_all_namespaces()?;
210 let mut state_file = self.state_file.lock().await;
211 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
212 if !exists {
213 return Err(miette::miette!("daemon '{}' not found", id));
214 }
215 let result = state_file.enable_daemon(id);
216 Ok(result)
217 }
218
219 pub async fn disable(&self, id: &DaemonId) -> Result<bool> {
221 info!("disabling daemon: {id}");
222 let config = PitchforkToml::all_merged_all_namespaces()?;
223 let mut state_file = self.state_file.lock().await;
224 let exists = state_file.daemons.contains_key(id) || config.daemons.contains_key(id);
225 if !exists {
226 return Err(miette::miette!("daemon '{}' not found", id));
227 }
228 let result = state_file.disable_daemon(id);
229 Ok(result)
230 }
231
232 pub(crate) async fn get_daemon(&self, id: &DaemonId) -> Option<Daemon> {
234 self.state_file.lock().await.daemons.get(id).cloned()
235 }
236
237 pub(crate) async fn active_daemons(&self) -> Vec<Daemon> {
239 let pitchfork_id = DaemonId::pitchfork();
240 self.state_file
241 .lock()
242 .await
243 .daemons
244 .values()
245 .filter(|d| d.pid.is_some() && d.id != pitchfork_id)
246 .cloned()
247 .collect()
248 }
249
250 pub(crate) async fn remove_daemon(&self, id: &DaemonId) -> Result<()> {
252 let mut state_file = self.state_file.lock().await;
253 state_file.remove_daemon(id);
254 Ok(())
255 }
256
257 pub(crate) async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
259 let mut state_file = self.state_file.lock().await;
260 state_file.set_shell_dir(shell_pid, dir);
261 Ok(())
262 }
263
264 pub(crate) async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
266 self.state_file
267 .lock()
268 .await
269 .shell_dirs
270 .get(&shell_pid.to_string())
271 .cloned()
272 }
273
274 pub(crate) async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
276 let mut state_file = self.state_file.lock().await;
277 state_file.remove_shell_dir(shell_pid);
278 Ok(())
279 }
280
281 pub(crate) async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
283 self.state_file.lock().await.shell_dirs.iter().fold(
284 HashMap::new(),
285 |mut acc, (pid, dir)| {
286 if let Ok(pid) = pid.parse() {
287 acc.entry(dir.clone()).or_default().push(pid);
288 }
289 acc
290 },
291 )
292 }
293
294 pub(crate) async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
296 self.pending_notifications.lock().await.drain(..).collect()
297 }
298
299 pub(crate) async fn clean(&self) -> Result<()> {
301 let mut state_file = self.state_file.lock().await;
302 state_file.retain_daemons(|_id, d| d.pid.is_some());
303 Ok(())
304 }
305}