pitchfork_cli/supervisor/
mod.rs1mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15mod retry;
16mod state;
17mod watchers;
18
19use crate::daemon_id::DaemonId;
20use crate::daemon_status::DaemonStatus;
21use crate::ipc::server::{IpcServer, IpcServerHandle};
22use crate::procs::PROCS;
23use crate::settings::settings;
24use crate::state_file::StateFile;
25use crate::{Result, env};
26use duct::cmd;
27use miette::IntoDiagnostic;
28use once_cell::sync::Lazy;
29use std::collections::HashMap;
30use std::fs;
31use std::process::exit;
32use std::sync::atomic;
33use std::sync::atomic::{AtomicBool, AtomicU32};
34use std::time::Duration;
35#[cfg(unix)]
36use tokio::signal::unix::SignalKind;
37use tokio::sync::{Mutex, Notify};
38use tokio::task::JoinHandle;
39use tokio::{signal, time};
40
41pub(crate) use state::UpsertDaemonOpts;
43
/// Long-lived supervisor that owns the daemon state file and coordinates
/// watchers, IPC, hook tasks, and graceful shutdown. A single global
/// instance lives in [`SUPERVISOR`].
pub struct Supervisor {
    /// Daemon state persisted to disk, guarded by an async mutex.
    pub(crate) state_file: Mutex<StateFile>,
    /// Queued `(level, message)` notifications awaiting delivery
    /// (pushed via `add_notification`).
    pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    /// Instant of the most recent `refresh()` pass.
    pub(crate) last_refreshed_at: Mutex<time::Instant>,
    // NOTE(review): presumably the Instant is the deadline/schedule time for
    // each pending autostop — confirm against process_pending_autostops().
    pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
    /// Handle used by `close()` to shut the IPC server down; `None` until
    /// `start()` has created the server.
    pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
    /// Join handles for spawned hook tasks; drained and awaited (with a
    /// timeout) during `close()`.
    pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Count of in-flight monitor tasks; `close()` waits for this to reach
    /// zero before awaiting hook tasks.
    pub(crate) active_monitors: AtomicU32,
    /// Woken when a monitor task finishes, so the drain loop in `close()`
    /// can re-check `active_monitors`.
    pub(crate) monitor_done: Notify,
}
62
63pub(crate) fn interval_duration() -> Duration {
64 settings().general_interval()
65}
66
67pub static SUPERVISOR: Lazy<Supervisor> =
68 Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
69
70pub fn start_if_not_running() -> Result<()> {
71 let sf = StateFile::get();
72 if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
73 && let Some(pid) = d.pid
74 && PROCS.is_running(pid)
75 {
76 return Ok(());
77 }
78 start_in_background()
79}
80
81pub fn start_in_background() -> Result<()> {
82 debug!("starting supervisor in background");
83 cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
84 .stdout_null()
85 .stderr_null()
86 .start()
87 .into_diagnostic()?;
88 Ok(())
89}
90
impl Supervisor {
    /// Build a supervisor with empty queues and a state file rooted at
    /// `env::PITCHFORK_STATE_FILE`. No background work starts until
    /// [`Supervisor::start`] is called.
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
            pending_autostops: Mutex::new(HashMap::new()),
            ipc_shutdown: Mutex::new(None),
            hook_tasks: Mutex::new(Vec::new()),
            active_monitors: AtomicU32::new(0),
            monitor_done: Notify::new(),
        })
    }

    /// Run the supervisor in the current task.
    ///
    /// Records this process as the `pitchfork` daemon (so
    /// `start_if_not_running` can detect it), optionally starts boot
    /// daemons, spawns the interval/cron/signal/daemon-file watchers,
    /// optionally spawns the web UI, then serves IPC connections via
    /// `conn_watch` (which this call blocks on).
    ///
    /// * `is_boot` - when true, daemons flagged for boot start are launched.
    /// * `web_port` / `web_path` - CLI overrides; when `None`, the values
    ///   are derived from settings (`web.auto_start`, `web.bind_port`,
    ///   `web.base_path`).
    pub async fn start(
        &self,
        is_boot: bool,
        web_port: Option<u16>,
        web_path: Option<String>,
    ) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        // Register ourselves in the state file as a Running daemon.
        self.upsert_daemon(
            UpsertDaemonOpts::builder(DaemonId::pitchfork())
                .set(|o| {
                    o.pid = Some(pid);
                    o.status = DaemonStatus::Running;
                })
                .build(),
        )
        .await?;

        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        // Background watchers (implemented in sibling modules).
        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        self.daemon_file_watch()?;

        let s = settings();
        // Port precedence: explicit CLI port wins; otherwise use settings
        // only when web.auto_start is enabled. An out-of-range bind_port
        // disables the web UI (with an error) rather than aborting the
        // supervisor.
        let effective_port = web_port.or_else(|| {
            if s.web.auto_start {
                match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
                    Some(p) => Some(p),
                    None => {
                        error!(
                            "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
                            s.web.bind_port
                        );
                        None
                    }
                }
            } else {
                None
            }
        });
        // Path precedence: explicit CLI path, else non-empty settings value.
        let effective_path = web_path.or_else(|| {
            let bp = s.web.base_path.clone();
            if bp.is_empty() { None } else { Some(bp) }
        });
        if let Some(port) = effective_port {
            // Web server runs detached; its failure is logged, not fatal.
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port, effective_path).await {
                    error!("Web server error: {e}");
                }
            });
        }

        // Stash the shutdown handle so close() can stop the IPC server,
        // then serve connections until shutdown.
        let (ipc, ipc_handle) = IpcServer::new()?;
        *self.ipc_shutdown.lock().await = Some(ipc_handle);
        self.conn_watch(ipc).await
    }

    /// One periodic maintenance pass: refresh tracked shell PIDs, prune
    /// dead ones, leave directories whose shells have all exited, then run
    /// retry and pending-autostop processing.
    pub(crate) async fn refresh(&self) -> Result<()> {
        trace!("refreshing");

        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();

        if pids_to_check.is_empty() {
            trace!("no shell PIDs to check, skipping process refresh");
        } else {
            // Only refresh process info for the PIDs we actually track.
            PROCS.refresh_pids(&pids_to_check);
        }

        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in dirs_with_pids {
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect::<Vec<_>>();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            // All shells for this dir are gone -> run the dir-leave logic
            // (implemented elsewhere in this module tree).
            // NOTE(review): this also fires when `pids` was empty (0 == 0)
            // — confirm that is intended.
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;
        self.process_pending_autostops().await?;

        Ok(())
    }

    /// Unix signal handling: the first received signal (of any registered
    /// kind) triggers a graceful shutdown via `handle_signal`; a second
    /// signal force-exits with status 1.
    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        // Shared across all per-signal tasks: set once any signal arrives.
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            let stream = match signal::unix::signal(signal) {
                Ok(s) => s,
                Err(e) => {
                    // Non-fatal: keep running with whatever handlers did
                    // register successfully.
                    warn!("Failed to register signal handler for {signal:?}: {e}");
                    continue;
                }
            };
            // One task per signal kind, each looping on its stream.
            tokio::spawn(async move {
                let mut stream = stream;
                loop {
                    stream.recv().await;
                    // swap returns the previous value: true means a signal
                    // was already being handled, so exit immediately.
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

    /// Windows equivalent: only ctrl-c is available, with the same
    /// first-signal-graceful / second-signal-forced semantics as unix.
    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    /// Graceful-shutdown entry point used by the signal handlers:
    /// tear everything down, then exit 0.
    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

    /// Stop all managed daemons and tear down supervisor resources.
    ///
    /// Order matters: stop every daemon except our own entry, remove our
    /// entry, shut down the IPC server, wait (up to 5s) for monitor tasks
    /// to finish registering hook tasks, await each hook task (30s cap
    /// each), and finally remove the IPC socket directory.
    pub(crate) async fn close(&self) {
        let pitchfork_id = DaemonId::pitchfork();
        for daemon in self.active_daemons().await {
            // Don't stop ourselves while we're mid-shutdown.
            if daemon.id == pitchfork_id {
                continue;
            }
            if let Err(err) = self.stop(&daemon.id).await {
                error!("failed to stop daemon {daemon}: {err}");
            }
        }
        let _ = self.remove_daemon(&pitchfork_id).await;

        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
            handle.shutdown();
        }

        // Drain loop: monitor tasks decrement active_monitors and signal
        // monitor_done when they finish — presumably in sibling modules;
        // TODO confirm. Give them a total of 5s before proceeding anyway.
        let drain_timeout = time::sleep(Duration::from_secs(5));
        tokio::pin!(drain_timeout);
        loop {
            if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
                break;
            }
            tokio::select! {
                _ = self.monitor_done.notified() => {}
                _ = &mut drain_timeout => {
                    warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
                    break;
                }
            }
        }
        // Take ownership of the handles so the mutex isn't held while we
        // await them below.
        let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
        let hook_timeout = Duration::from_secs(30);
        for handle in handles {
            match time::timeout(hook_timeout, handle).await {
                Ok(_) => {}
                Err(_) => {
                    warn!(
                        "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
                    );
                }
            }
        }

        // Best-effort cleanup of the IPC socket directory; errors ignored.
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
    }

    /// Queue a notification for later delivery.
    pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
        self.pending_notifications
            .lock()
            .await
            .push((level, message));
    }
}