// pitchfork_cli/supervisor/mod.rs
mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15mod retry;
16mod state;
17mod watchers;
18
19use crate::daemon_id::DaemonId;
20use crate::daemon_status::DaemonStatus;
21use crate::deps::compute_reverse_stop_order;
22use crate::ipc::server::{IpcServer, IpcServerHandle};
23
24use crate::procs::PROCS;
25use crate::settings::settings;
26use crate::state_file::StateFile;
27use crate::{Result, env};
28use duct::cmd;
29use miette::IntoDiagnostic;
30use once_cell::sync::Lazy;
31use std::collections::{HashMap, HashSet};
32use std::fs;
33#[cfg(unix)]
34use std::os::unix::fs::PermissionsExt;
35use std::process::exit;
36use std::sync::atomic;
37use std::sync::atomic::{AtomicBool, AtomicU32};
38use std::time::Duration;
39#[cfg(unix)]
40use tokio::signal::unix::SignalKind;
41use tokio::sync::{Mutex, Notify};
42use tokio::task::JoinHandle;
43use tokio::{signal, time};
44
#[cfg(unix)]
/// Exit statuses (pid -> raw exit code) of *managed* daemons that the
/// container-mode zombie reaper consumed before Tokio's child handle could
/// observe them; monitoring code can recover the status from here.
/// Negative codes encode termination by signal (see `reap_unmanaged_zombies`).
pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
56
57pub(crate) use state::UpsertDaemonOpts;
59
/// Shared state for the pitchfork supervisor process (see the global
/// [`SUPERVISOR`] instance).
pub struct Supervisor {
    /// Persistent daemon/shell state, loaded from `PITCHFORK_STATE_FILE`.
    pub(crate) state_file: Mutex<StateFile>,
    /// Notifications queued via `add_notification` for later delivery.
    pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    /// When `refresh()` last completed a pass.
    pub(crate) last_refreshed_at: Mutex<time::Instant>,
    /// Daemons queued for autostop, keyed by id with an associated instant
    /// (presumably when the stop was scheduled — see the `autostop` module).
    pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
    /// Handle used by `close()` to shut down the IPC server.
    pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
    /// Spawned hook tasks that `close()` awaits (with a timeout) at shutdown.
    pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Count of live monitor tasks; `close()` drains until this reaches zero.
    pub(crate) active_monitors: AtomicU32,
    /// Notified as monitor tasks finish, waking `close()`'s drain loop.
    pub(crate) monitor_done: Notify,
}
78
79pub(crate) fn interval_duration() -> Duration {
80 settings().general_interval()
81}
82
/// Global supervisor singleton, lazily constructed on first access.
/// Panics if `Supervisor::new()` fails.
pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
85
86pub fn start_if_not_running() -> Result<()> {
87 let sf = StateFile::get();
88 if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
89 && let Some(pid) = d.pid
90 && PROCS.is_running(pid)
91 {
92 return Ok(());
93 }
94 start_in_background()
95}
96
97pub fn start_in_background() -> Result<()> {
98 debug!("starting supervisor in background");
99 let log_file = &*env::PITCHFORK_LOG_FILE;
103 if let Some(parent) = log_file.parent() {
104 let _ = fs::create_dir_all(parent);
105 }
106 let stderr_file = fs::OpenOptions::new()
107 .create(true)
108 .append(true)
109 .open(log_file)
110 .into_diagnostic()?;
111 cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
112 .stdout_null()
113 .stderr_file(stderr_file)
114 .start()
115 .into_diagnostic()?;
116 Ok(())
117}
118
119impl Supervisor {
120 pub fn new() -> Result<Self> {
121 Ok(Self {
122 state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
123 last_refreshed_at: Mutex::new(time::Instant::now()),
124 pending_notifications: Mutex::new(vec![]),
125 pending_autostops: Mutex::new(HashMap::new()),
126 ipc_shutdown: Mutex::new(None),
127 hook_tasks: Mutex::new(Vec::new()),
128 active_monitors: AtomicU32::new(0),
129 monitor_done: Notify::new(),
130 })
131 }
132
    /// Run the supervisor's startup sequence and then serve IPC connections
    /// until shutdown.
    ///
    /// * `is_boot` - also start daemons flagged for boot when true.
    /// * `container` - force container/PID1 mode (zombie reaping), in
    ///   addition to the `supervisor.container` setting.
    /// * `web_port` / `web_path` - CLI overrides for the web UI; settings are
    ///   the fallback.
    pub async fn start(
        &self,
        is_boot: bool,
        container: bool,
        web_port: Option<u16>,
        web_path: Option<String>,
    ) -> Result<()> {
        // Repair state-dir ownership/permissions (e.g. after running under sudo).
        #[cfg(unix)]
        fix_state_dir_permissions();

        let pid = std::process::id();
        // Either the CLI flag or settings can enable container mode.
        let container_mode = container || settings().supervisor.container;
        if container_mode {
            info!("Starting supervisor in container/PID1 mode with pid {pid}");
        } else {
            info!("Starting supervisor with pid {pid}");
        }

        // Record ourselves in the state file as the running pitchfork daemon.
        self.upsert_daemon(
            UpsertDaemonOpts::builder(DaemonId::pitchfork())
                .set(|o| {
                    o.pid = Some(pid);
                    o.status = DaemonStatus::Running;
                })
                .build(),
        )
        .await?;

        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        // Background watchers: periodic refresh, cron schedules, OS signals,
        // and daemon-file changes.
        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        self.daemon_file_watch()?;

        // As PID 1 in a container we must reap orphaned zombie children.
        #[cfg(unix)]
        if container_mode {
            self.reap_zombies()?;
        }

        let s = settings();
        // Web UI port: CLI flag wins; otherwise use settings when auto_start
        // is on, validating that bind_port is a non-zero value that fits u16.
        let effective_port = web_port.or_else(|| {
            if s.web.auto_start {
                match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
                    Some(p) => Some(p),
                    None => {
                        error!(
                            "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
                            s.web.bind_port
                        );
                        None
                    }
                }
            } else {
                None
            }
        });
        // Base path: CLI flag wins; an empty setting means "no base path".
        let effective_path = web_path.or_else(|| {
            let bp = s.web.base_path.clone();
            if bp.is_empty() { None } else { Some(bp) }
        });
        if let Some(port) = effective_port {
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port, effective_path).await {
                    error!("Web server error: {e}");
                }
            });
        }

        if s.proxy.enable {
            // With HTTPS enabled, ensure a local CA exists before the proxy
            // starts; it is generated once and reused afterwards.
            #[cfg(feature = "proxy-tls")]
            if s.proxy.https {
                let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
                let ca_cert_path = proxy_dir.join("ca.pem");
                let ca_key_path = proxy_dir.join("ca-key.pem");
                if !ca_cert_path.exists() || !ca_key_path.exists() {
                    match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
                        Ok(()) => {
                            info!(
                                "Generated local CA certificate at {}",
                                ca_cert_path.display()
                            );
                            info!("To trust the CA in your browser, run: pitchfork proxy trust");
                        }
                        Err(e) => {
                            error!("Failed to generate CA certificate: {e}");
                        }
                    }
                }
            }
            // The proxy reports bind success/failure over a oneshot channel.
            let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
            tokio::spawn(async {
                if let Err(e) = crate::proxy::server::serve(bind_tx).await {
                    error!("Proxy server error: {e}");
                }
            });
            match bind_rx.await {
                Ok(Ok(())) => {
                    // Proxy bound successfully; nothing to report.
                }
                Ok(Err(msg)) => {
                    error!("{msg}");
                    self.add_notification(log::LevelFilter::Error, msg).await;
                }
                Err(_) => {
                    // Sender dropped without reporting; the spawned task
                    // already logs serve() errors itself.
                }
            }
        }

        // Serve IPC connections; this only returns at shutdown.
        let (ipc, ipc_handle) = IpcServer::new()?;
        *self.ipc_shutdown.lock().await = Some(ipc_handle);
        self.conn_watch(ipc).await
    }
268
    /// Periodic maintenance pass: refresh tracked shell PIDs, remove dead
    /// shells (leaving their dirs when all shells for a dir are gone), then
    /// process retries and pending autostops.
    pub(crate) async fn refresh(&self) -> Result<()> {
        trace!("refreshing");

        // Only the shell PIDs we actually track need a process-table refresh.
        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();

        if pids_to_check.is_empty() {
            trace!("no shell PIDs to check, skipping process refresh");
        } else {
            PROCS.refresh_pids(&pids_to_check);
        }

        // Record when this pass happened (tokio Mutex, safe across awaits).
        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in dirs_with_pids {
            // Shells that are no longer running get dropped...
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect::<Vec<_>>();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            // ...and once every shell for a dir is gone, leave the dir.
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;
        self.process_pending_autostops().await?;

        Ok(())
    }
305
    /// Install a SIGCHLD-driven zombie reaper for container (PID 1) mode.
    ///
    /// On every SIGCHLD the spawned task snapshots the PIDs of managed
    /// daemons from the state file and reaps only exited children that are
    /// NOT managed — Tokio's child-process machinery owns the wait status of
    /// managed children.
    #[cfg(unix)]
    fn reap_zombies(&self) -> Result<()> {
        let mut stream = signal::unix::signal(SignalKind::child())
            .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
        tokio::spawn(async move {
            loop {
                stream.recv().await;
                // Snapshot managed PIDs so we don't steal exit statuses that
                // Tokio still needs to observe.
                let managed_pids: HashSet<u32> = SUPERVISOR
                    .state_file
                    .lock()
                    .await
                    .daemons
                    .values()
                    .filter_map(|d| d.pid)
                    .collect();
                Self::reap_unmanaged_zombies(&managed_pids).await;
            }
        });
        info!("container mode: SIGCHLD zombie reaper installed");
        Ok(())
    }
353
    /// Linux implementation: `waitid` with `WNOWAIT` peeks at an exited child
    /// without consuming its status, so managed daemons can be left for Tokio
    /// to reap; only unmanaged zombies are then consumed with `waitpid`.
    #[cfg(target_os = "linux")]
    async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
        use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
        use nix::unistd::Pid;

        loop {
            // WNOWAIT = peek without reaping; WNOHANG = never block.
            let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
            match waitid(Id::All, peek_flags) {
                // No more exited children pending.
                Ok(WaitStatus::StillAlive) => break,
                Ok(status) => {
                    let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
                        break;
                    };
                    if managed_pids.contains(&pid_raw) {
                        // NOTE(review): breaking (not continuing) means any
                        // other queued zombies behind a managed one wait for
                        // the next SIGCHLD to be reaped.
                        trace!(
                            "zombie reaper: skipping managed daemon pid {pid_raw}, \
                             leaving for Tokio to reap"
                        );
                        break;
                    }
                    // Actually consume the unmanaged child's status.
                    match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
                        Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
                        // Raced with someone else reaping it; nothing left.
                        Err(nix::errno::Errno::ECHILD) => break,
                        Err(e) => {
                            trace!("waitpid error reaping pid {pid_raw}: {e}");
                            break;
                        }
                    }
                }
                // ECHILD: this process has no children at all.
                Err(nix::errno::Errno::ECHILD) => break,
                Err(e) => {
                    trace!("waitid error in zombie reaper: {e}");
                    break;
                }
            }
        }
    }
401
    /// Non-Linux Unix implementation: peeking via `waitid`/`WNOWAIT` is not
    /// portable, so every exited child is reaped unconditionally with
    /// `waitpid`. If a *managed* daemon is reaped here, its exit status is
    /// stashed in [`REAPED_STATUSES`] so monitoring code can recover it.
    #[cfg(all(unix, not(target_os = "linux")))]
    async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
        use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};

        loop {
            match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
                // No more exited children pending.
                Ok(WaitStatus::StillAlive) => break,
                Ok(status) => {
                    let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
                        continue;
                    };
                    if managed_pids.contains(&pid) {
                        // Negative values encode termination by signal.
                        let exit_code = match status {
                            WaitStatus::Exited(_, code) => code,
                            WaitStatus::Signaled(_, sig, _) => -(sig as i32),
                            _ => -1,
                        };
                        warn!(
                            "zombie reaper reaped managed daemon pid {pid} \
                             (exit_code={exit_code}); stashing status for recovery"
                        );
                        REAPED_STATUSES.lock().await.insert(pid, exit_code);
                    } else {
                        trace!("reaped orphaned zombie child: {status:?}");
                    }
                }
                // ECHILD: this process has no children at all.
                Err(nix::errno::Errno::ECHILD) => break,
                Err(e) => {
                    trace!("waitpid error in zombie reaper: {e}");
                    break;
                }
            }
        }
    }
442
    /// Register handlers for termination and user-defined signals (unix).
    ///
    /// The first signal received triggers a graceful shutdown via
    /// `handle_signal`; any subsequent signal (of any registered kind)
    /// forces an immediate `exit(1)`.
    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        // Shared by all handler tasks: set once any signal has been seen.
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            let stream = match signal::unix::signal(signal) {
                Ok(s) => s,
                Err(e) => {
                    // Non-fatal: keep registering the remaining signals.
                    warn!("Failed to register signal handler for {signal:?}: {e}");
                    continue;
                }
            };
            tokio::spawn(async move {
                let mut stream = stream;
                loop {
                    stream.recv().await;
                    // swap() returns the previous value: true means a signal
                    // was already being handled, so force-exit.
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }
477
    /// Windows equivalent: only Ctrl-C is observed. The first Ctrl-C shuts
    /// down gracefully; a second forces an immediate `exit(1)`.
    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
                // swap() returns the previous value: true means we are
                // already shutting down, so force-exit.
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }
496
    /// Gracefully stop all managed daemons (via `close`) and exit the process.
    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }
502
    /// Graceful shutdown: stop all managed daemons in reverse dependency
    /// order, deregister pitchfork itself, shut down the IPC server, wait
    /// (bounded) for monitor and hook tasks, then remove the IPC socket dir.
    pub(crate) async fn close(&self) {
        let pitchfork_id = DaemonId::pitchfork();
        let active = self.active_daemons().await;
        // Everything except the supervisor's own entry gets stopped.
        let active_ids: Vec<DaemonId> = active
            .iter()
            .filter(|d| d.id != pitchfork_id)
            .map(|d| d.id.clone())
            .collect();

        // Stop level by level (dependents before dependencies), running the
        // stops within each level concurrently.
        let stop_levels = compute_reverse_stop_order(&active_ids);
        for level in &stop_levels {
            let mut tasks = Vec::new();
            for id in level {
                let id = id.clone();
                tasks.push(tokio::spawn(async move {
                    if let Err(err) = SUPERVISOR.stop(&id).await {
                        error!("failed to stop daemon {id}: {err}");
                    }
                }));
            }
            // Wait for the whole level before starting the next one.
            for task in tasks {
                let _ = task.await;
            }
        }
        let _ = self.remove_daemon(&pitchfork_id).await;

        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
            handle.shutdown();
        }

        // Give monitor tasks up to 5s (total) to finish and register their
        // hook tasks before we collect them below.
        let drain_timeout = time::sleep(Duration::from_secs(5));
        tokio::pin!(drain_timeout);
        loop {
            if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
                break;
            }
            tokio::select! {
                _ = self.monitor_done.notified() => {}
                _ = &mut drain_timeout => {
                    warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
                    break;
                }
            }
        }
        // Await each hook task with a generous per-task timeout.
        let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
        let hook_timeout = Duration::from_secs(30);
        for handle in handles {
            match time::timeout(hook_timeout, handle).await {
                Ok(_) => {}
                Err(_) => {
                    warn!(
                        "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
                    );
                }
            }
        }

        // Best-effort cleanup of the IPC socket directory.
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
    }
572
573 pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
574 self.pending_notifications
575 .lock()
576 .await
577 .push((level, message));
578 }
579}
580
581#[cfg(unix)]
599fn fix_state_dir_permissions() {
600 let state_dir = &*env::PITCHFORK_STATE_DIR;
601 if !state_dir.exists() {
602 return;
603 }
604
605 let sudo_ids = parse_sudo_ids();
607
608 if let Some((uid, gid)) = sudo_ids {
609 chown_recursive(state_dir, uid, gid, true);
611 debug!(
612 "chowned state directory to uid={uid} gid={gid} at {}",
613 state_dir.display()
614 );
615 } else {
616 chmod_safe_subtrees(state_dir);
619 debug!(
620 "relaxed permissions on safe subtrees at {}",
621 state_dir.display()
622 );
623 }
624}
625
#[cfg(unix)]
/// Read SUDO_UID/SUDO_GID from the environment; `None` unless both are
/// present and parse as u32 (i.e. we are running under sudo).
fn parse_sudo_ids() -> Option<(u32, u32)> {
    let read_id = |name: &str| -> Option<u32> { std::env::var(name).ok()?.parse().ok() };
    Some((read_id("SUDO_UID")?, read_id("SUDO_GID")?))
}
633
634#[cfg(unix)]
637fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
638 let _ = chown_path(dir, uid, gid);
640
641 let entries = match std::fs::read_dir(dir) {
642 Ok(e) => e,
643 Err(_) => return,
644 };
645 for entry in entries.flatten() {
646 let path = entry.path();
647 if path.is_dir() {
648 if skip_proxy {
650 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
651 if name == "proxy" {
652 continue;
653 }
654 }
655 }
656 chown_recursive(&path, uid, gid, false);
657 } else {
658 let _ = chown_path(&path, uid, gid);
659 }
660 }
661}
662
#[cfg(unix)]
/// Change the owner and group of `path` to `uid`/`gid`.
///
/// # Errors
/// Returns the underlying OS error when chown(2) fails (e.g. the path does
/// not exist or the process lacks permission).
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    // std::os::unix::fs::chown (stable since Rust 1.73) replaces the previous
    // hand-rolled CString + unsafe libc::chown call, removing the unsafe block
    // and the manual errno handling.
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
677
678#[cfg(unix)]
681fn chmod_safe_subtrees(state_dir: &std::path::Path) {
682 let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
684
685 let state_file = state_dir.join("state.toml");
687 if state_file.exists() {
688 let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
689 }
690
691 for subdir_name in &["sock", "logs"] {
693 let subdir = state_dir.join(subdir_name);
694 if subdir.is_dir() {
695 chmod_recursive(&subdir);
696 }
697 }
698}
699
#[cfg(unix)]
/// Recursively relax permissions under `dir`: directories become 0o755 and
/// files 0o644. All failures are ignored (best-effort).
fn chmod_recursive(dir: &std::path::Path) {
    const DIR_MODE: u32 = 0o755;
    const FILE_MODE: u32 = 0o644;
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(DIR_MODE));
    if let Ok(entries) = fs::read_dir(dir) {
        for path in entries.flatten().map(|e| e.path()) {
            if path.is_dir() {
                chmod_recursive(&path);
            } else {
                let _ = fs::set_permissions(&path, fs::Permissions::from_mode(FILE_MODE));
            }
        }
    }
}