pitchfork_cli/supervisor/
mod.rs1mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15mod retry;
16mod state;
17mod watchers;
18
19use crate::daemon_id::DaemonId;
20use crate::daemon_status::DaemonStatus;
21use crate::deps::compute_reverse_stop_order;
22use crate::ipc::server::{IpcServer, IpcServerHandle};
23
24use crate::procs::PROCS;
25use crate::settings::settings;
26use crate::state_file::StateFile;
27use crate::{Result, env};
28use duct::cmd;
29use miette::IntoDiagnostic;
30use once_cell::sync::Lazy;
31use std::collections::{HashMap, HashSet};
32use std::fs;
33#[cfg(unix)]
34use std::os::unix::fs::PermissionsExt;
35use std::process::exit;
36use std::sync::atomic;
37use std::sync::atomic::{AtomicBool, AtomicU32};
38use std::time::Duration;
39#[cfg(unix)]
40use tokio::signal::unix::SignalKind;
41use tokio::sync::{Mutex, Notify};
42use tokio::task::JoinHandle;
43use tokio::{signal, time};
44
/// Exit codes of managed daemons that the non-Linux zombie reaper collected
/// itself, keyed by PID. Signal deaths are stored as the negated signal number
/// and other non-exit statuses as `-1`. Presumably read back by daemon-exit
/// recovery code elsewhere in the crate (the reaper logs "stashing status for
/// recovery") — confirm against callers.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> = Lazy::new(Default::default);
56
57pub(crate) use state::UpsertDaemonOpts;
59
60pub struct Supervisor {
61 pub(crate) state_file: Mutex<StateFile>,
62 pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
63 pub(crate) last_refreshed_at: Mutex<time::Instant>,
64 pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
66 pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
68 pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
70 pub(crate) active_monitors: AtomicU32,
74 pub(crate) monitor_done: Notify,
77}
78
79pub(crate) fn interval_duration() -> Duration {
80 settings().general_interval()
81}
82
83pub static SUPERVISOR: Lazy<Supervisor> =
84 Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
85
86pub fn start_if_not_running() -> Result<()> {
87 let sf = StateFile::get();
88 if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
89 && let Some(pid) = d.pid
90 && PROCS.is_running(pid)
91 {
92 return Ok(());
93 }
94 start_in_background()
95}
96
97pub fn start_in_background() -> Result<()> {
98 debug!("starting supervisor in background");
99 let log_file = &*env::PITCHFORK_LOG_FILE;
103 if let Some(parent) = log_file.parent() {
104 let _ = fs::create_dir_all(parent);
105 }
106 let stderr_file = fs::OpenOptions::new()
107 .create(true)
108 .append(true)
109 .open(log_file)
110 .into_diagnostic()?;
111 #[cfg(unix)]
112 fix_state_dir_permissions();
113 cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
114 .stdout_null()
115 .stderr_file(stderr_file)
116 .start()
117 .into_diagnostic()?;
118 Ok(())
119}
120
121impl Supervisor {
122 pub fn new() -> Result<Self> {
123 Ok(Self {
124 state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
125 last_refreshed_at: Mutex::new(time::Instant::now()),
126 pending_notifications: Mutex::new(vec![]),
127 pending_autostops: Mutex::new(HashMap::new()),
128 ipc_shutdown: Mutex::new(None),
129 hook_tasks: Mutex::new(Vec::new()),
130 active_monitors: AtomicU32::new(0),
131 monitor_done: Notify::new(),
132 })
133 }
134
135 pub async fn start(
136 &self,
137 is_boot: bool,
138 container: bool,
139 web_port: Option<u16>,
140 web_path: Option<String>,
141 ) -> Result<()> {
142 #[cfg(unix)]
147 fix_state_dir_permissions();
148
149 let pid = std::process::id();
150 let container_mode = container || settings().supervisor.container;
152 if container_mode {
153 info!("Starting supervisor in container/PID1 mode with pid {pid}");
154 } else {
155 info!("Starting supervisor with pid {pid}");
156 }
157
158 self.upsert_daemon(
159 UpsertDaemonOpts::builder(DaemonId::pitchfork())
160 .set(|o| {
161 o.pid = Some(pid);
162 o.status = DaemonStatus::Running;
163 })
164 .build(),
165 )
166 .await?;
167 #[cfg(unix)]
168 fix_state_dir_permissions();
169
170 if is_boot {
172 info!("Boot start mode enabled, starting boot_start daemons");
173 self.start_boot_daemons().await?;
174 }
175
176 self.interval_watch()?;
177 self.cron_watch()?;
178 self.signals()?;
179 self.daemon_file_watch()?;
180
181 #[cfg(unix)]
183 if container_mode {
184 self.reap_zombies()?;
185 }
186
187 let s = settings();
189 let effective_port = web_port.or_else(|| {
190 if s.web.auto_start {
191 match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
192 Some(p) => Some(p),
193 None => {
194 error!(
195 "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
196 s.web.bind_port
197 );
198 None
199 }
200 }
201 } else {
202 None
203 }
204 });
205 let effective_path = web_path.or_else(|| {
207 let bp = s.web.base_path.clone();
208 if bp.is_empty() { None } else { Some(bp) }
209 });
210 if let Some(port) = effective_port {
211 tokio::spawn(async move {
212 if let Err(e) = crate::web::serve(port, effective_path).await {
213 error!("Web server error: {e}");
214 }
215 });
216 }
217
218 if s.proxy.enable {
220 #[cfg(feature = "proxy-tls")]
224 if s.proxy.https {
225 let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
226 let ca_cert_path = proxy_dir.join("ca.pem");
227 let ca_key_path = proxy_dir.join("ca-key.pem");
228 if !ca_cert_path.exists() || !ca_key_path.exists() {
229 match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
230 Ok(()) => {
231 info!(
232 "Generated local CA certificate at {}",
233 ca_cert_path.display()
234 );
235 info!("To trust the CA in your browser, run: pitchfork proxy trust");
236 }
237 Err(e) => {
238 error!("Failed to generate CA certificate: {e}");
239 }
240 }
241 }
242 }
243 let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
247 tokio::spawn(async {
248 if let Err(e) = crate::proxy::server::serve(bind_tx).await {
249 error!("Proxy server error: {e}");
250 }
251 });
252 match bind_rx.await {
253 Ok(Ok(())) => {
254 }
256 Ok(Err(msg)) => {
257 error!("{msg}");
258 self.add_notification(log::LevelFilter::Error, msg).await;
259 }
260 Err(_) => {
261 }
265 }
266 }
267
268 let (ipc, ipc_handle) = IpcServer::new()?;
269 *self.ipc_shutdown.lock().await = Some(ipc_handle);
270 self.conn_watch(ipc).await
271 }
272
273 pub(crate) async fn refresh(&self) -> Result<()> {
274 trace!("refreshing");
275
276 let dirs_with_pids = self.get_dirs_with_shell_pids().await;
279 let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
280
281 if pids_to_check.is_empty() {
282 trace!("no shell PIDs to check, skipping process refresh");
284 } else {
285 PROCS.refresh_pids(&pids_to_check);
286 }
287
288 let mut last_refreshed_at = self.last_refreshed_at.lock().await;
289 *last_refreshed_at = time::Instant::now();
290
291 for (dir, pids) in dirs_with_pids {
292 let to_remove = pids
293 .iter()
294 .filter(|pid| !PROCS.is_running(**pid))
295 .collect::<Vec<_>>();
296 for pid in &to_remove {
297 self.remove_shell_pid(**pid).await?
298 }
299 if to_remove.len() == pids.len() {
300 self.leave_dir(&dir).await?;
301 }
302 }
303
304 self.check_retry().await?;
305 self.process_pending_autostops().await?;
306
307 Ok(())
308 }
309
310 #[cfg(unix)]
335 fn reap_zombies(&self) -> Result<()> {
336 let mut stream = signal::unix::signal(SignalKind::child())
337 .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
338 tokio::spawn(async move {
339 loop {
340 stream.recv().await;
341 let managed_pids: HashSet<u32> = SUPERVISOR
343 .state_file
344 .lock()
345 .await
346 .daemons
347 .values()
348 .filter_map(|d| d.pid)
349 .collect();
350 Self::reap_unmanaged_zombies(&managed_pids).await;
352 }
353 });
354 info!("container mode: SIGCHLD zombie reaper installed");
355 Ok(())
356 }
357
358 #[cfg(target_os = "linux")]
364 async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
365 use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
366 use nix::unistd::Pid;
367
368 loop {
369 let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
371 match waitid(Id::All, peek_flags) {
372 Ok(WaitStatus::StillAlive) => break,
373 Ok(status) => {
374 let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
375 break;
376 };
377 if managed_pids.contains(&pid_raw) {
378 trace!(
382 "zombie reaper: skipping managed daemon pid {pid_raw}, \
383 leaving for Tokio to reap"
384 );
385 break;
386 }
387 match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
389 Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
390 Err(nix::errno::Errno::ECHILD) => break,
391 Err(e) => {
392 trace!("waitpid error reaping pid {pid_raw}: {e}");
393 break;
394 }
395 }
396 }
397 Err(nix::errno::Errno::ECHILD) => break, Err(e) => {
399 trace!("waitid error in zombie reaper: {e}");
400 break;
401 }
402 }
403 }
404 }
405
406 #[cfg(all(unix, not(target_os = "linux")))]
412 async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
413 use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
414
415 loop {
416 match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
417 Ok(WaitStatus::StillAlive) => break,
418 Ok(status) => {
419 let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
420 continue;
421 };
422 if managed_pids.contains(&pid) {
423 let exit_code = match status {
425 WaitStatus::Exited(_, code) => code,
426 WaitStatus::Signaled(_, sig, _) => -(sig as i32),
427 _ => -1,
428 };
429 warn!(
430 "zombie reaper reaped managed daemon pid {pid} \
431 (exit_code={exit_code}); stashing status for recovery"
432 );
433 REAPED_STATUSES.lock().await.insert(pid, exit_code);
434 } else {
435 trace!("reaped orphaned zombie child: {status:?}");
436 }
437 }
438 Err(nix::errno::Errno::ECHILD) => break, Err(e) => {
440 trace!("waitpid error in zombie reaper: {e}");
441 break;
442 }
443 }
444 }
445 }
446
447 #[cfg(unix)]
448 fn signals(&self) -> Result<()> {
449 let signals = [
450 SignalKind::terminate(),
451 SignalKind::alarm(),
452 SignalKind::interrupt(),
453 SignalKind::quit(),
454 SignalKind::hangup(),
455 SignalKind::user_defined1(),
456 SignalKind::user_defined2(),
457 ];
458 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
459 for signal in signals {
460 let stream = match signal::unix::signal(signal) {
461 Ok(s) => s,
462 Err(e) => {
463 warn!("Failed to register signal handler for {signal:?}: {e}");
464 continue;
465 }
466 };
467 tokio::spawn(async move {
468 let mut stream = stream;
469 loop {
470 stream.recv().await;
471 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
472 exit(1);
473 } else {
474 SUPERVISOR.handle_signal().await;
475 }
476 }
477 });
478 }
479 Ok(())
480 }
481
482 #[cfg(windows)]
483 fn signals(&self) -> Result<()> {
484 tokio::spawn(async move {
485 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
486 loop {
487 if let Err(e) = signal::ctrl_c().await {
488 error!("Failed to wait for ctrl-c: {}", e);
489 return;
490 }
491 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
492 exit(1);
493 } else {
494 SUPERVISOR.handle_signal().await;
495 }
496 }
497 });
498 Ok(())
499 }
500
501 async fn handle_signal(&self) {
502 info!("received signal, stopping");
503 self.close().await;
504 exit(0)
505 }
506
507 pub(crate) async fn close(&self) {
508 let pitchfork_id = DaemonId::pitchfork();
509 let active = self.active_daemons().await;
510 let active_ids: Vec<DaemonId> = active
511 .iter()
512 .filter(|d| d.id != pitchfork_id)
513 .map(|d| d.id.clone())
514 .collect();
515
516 let stop_levels = compute_reverse_stop_order(&active_ids);
521 for level in &stop_levels {
522 let mut tasks = Vec::new();
523 for id in level {
524 let id = id.clone();
525 tasks.push(tokio::spawn(async move {
526 if let Err(err) = SUPERVISOR.stop(&id).await {
527 error!("failed to stop daemon {id}: {err}");
528 }
529 }));
530 }
531 for task in tasks {
532 let _ = task.await;
533 }
534 }
535 let _ = self.remove_daemon(&pitchfork_id).await;
536
537 if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
539 handle.shutdown();
540 }
541
542 let drain_timeout = time::sleep(Duration::from_secs(5));
548 tokio::pin!(drain_timeout);
549 loop {
550 if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
551 break;
552 }
553 tokio::select! {
554 _ = self.monitor_done.notified() => {}
555 _ = &mut drain_timeout => {
556 warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
557 break;
558 }
559 }
560 }
561 let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
562 let hook_timeout = Duration::from_secs(30);
563 for handle in handles {
564 match time::timeout(hook_timeout, handle).await {
565 Ok(_) => {} Err(_) => {
567 warn!(
568 "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
569 );
570 }
571 }
572 }
573
574 let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
575 }
576
577 pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
578 self.pending_notifications
579 .lock()
580 .await
581 .push((level, message));
582 }
583}
584
585#[cfg(unix)]
603fn fix_state_dir_permissions() {
604 let state_dir = &*env::PITCHFORK_STATE_DIR;
605 if let Some((uid, gid)) = state_owner_ids() {
606 if !state_dir.exists()
607 && let Err(err) = fs::create_dir_all(state_dir)
608 {
609 warn!(
610 "failed to create state directory for ownership fix at {}: {err}",
611 state_dir.display()
612 );
613 return;
614 }
615
616 chown_recursive(state_dir, uid, gid, true);
618 debug!(
619 "chowned state directory to uid={uid} gid={gid} at {}",
620 state_dir.display()
621 );
622 } else {
623 if !state_dir.exists() {
624 return;
625 }
626
627 chmod_safe_subtrees(state_dir);
630 debug!(
631 "relaxed permissions on safe subtrees at {}",
632 state_dir.display()
633 );
634 }
635}
636
637#[cfg(unix)]
638pub(crate) fn state_owner_ids() -> Option<(u32, u32)> {
639 if !nix::unistd::Uid::effective().is_root() {
640 return None;
641 }
642
643 let user = settings().supervisor.user.trim();
644 if !user.is_empty() {
645 return resolve_supervisor_user_ids(user).or_else(|| {
646 warn!(
647 "failed to resolve supervisor.user '{user}' for state ownership; falling back to SUDO_UID/SUDO_GID"
648 );
649 parse_sudo_ids()
650 });
651 }
652
653 parse_sudo_ids()
654}
655
656#[cfg(unix)]
657fn resolve_supervisor_user_ids(user: &str) -> Option<(u32, u32)> {
658 let user_record = if user.chars().all(|c| c.is_ascii_digit()) {
659 let uid = user.parse::<u32>().ok()?;
660 nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
661 .ok()
662 .flatten()
663 } else {
664 nix::unistd::User::from_name(user).ok().flatten()
665 }?;
666
667 Some((user_record.uid.as_raw(), user_record.gid.as_raw()))
668}
669
670#[cfg(unix)]
676fn parse_sudo_ids() -> Option<(u32, u32)> {
677 if !nix::unistd::Uid::effective().is_root() {
678 return None;
679 }
680 let uid: u32 = std::env::var("SUDO_UID").ok()?.parse().ok()?;
681 let gid: u32 = std::env::var("SUDO_GID").ok()?.parse().ok()?;
682 Some((uid, gid))
683}
684
685#[cfg(unix)]
688fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
689 let _ = chown_path(dir, uid, gid);
691
692 let entries = match std::fs::read_dir(dir) {
693 Ok(e) => e,
694 Err(_) => return,
695 };
696 for entry in entries.flatten() {
697 let path = entry.path();
698 if path.is_dir() {
699 if skip_proxy {
701 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
702 if name == "proxy" {
703 continue;
704 }
705 }
706 }
707 chown_recursive(&path, uid, gid, false);
708 } else {
709 let _ = chown_path(&path, uid, gid);
710 }
711 }
712}
713
/// Changes the owner and group of `path` to `uid`:`gid`.
///
/// Thin wrapper over `std::os::unix::fs::chown`, which calls `chown(2)` and
/// therefore follows symlinks (a link's target is re-owned). Use
/// `std::os::unix::fs::lchown` where links must not be followed.
///
/// # Errors
/// Returns the OS error from `chown(2)` (e.g. `NotFound`, `PermissionDenied`),
/// or an `InvalidInput` error if `path` contains an interior NUL byte.
#[cfg(unix)]
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
728
729#[cfg(unix)]
732fn chmod_safe_subtrees(state_dir: &std::path::Path) {
733 let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
735
736 let state_file = state_dir.join("state.toml");
738 if state_file.exists() {
739 let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
740 }
741
742 for subdir_name in &["sock", "logs"] {
744 let subdir = state_dir.join(subdir_name);
745 if subdir.is_dir() {
746 chmod_recursive(&subdir);
747 }
748 }
749}
750
/// Recursively sets directories under (and including) `dir` to 0755 and
/// regular files to 0644. Best effort: errors are ignored.
///
/// Symlinks found inside the tree are skipped. `set_permissions` follows
/// links, so touching them could chmod files outside the tree. Entries are
/// classified with `DirEntry::file_type`, which does not traverse links.
#[cfg(unix)]
fn chmod_recursive(dir: &std::path::Path) {
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(0o755));
    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for entry in entries.flatten() {
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        if file_type.is_symlink() {
            continue;
        }
        let path = entry.path();
        if file_type.is_dir() {
            chmod_recursive(&path);
        } else {
            let _ = fs::set_permissions(&path, fs::Permissions::from_mode(0o644));
        }
    }
}