pitchfork_cli/supervisor/
mod.rs1mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15#[cfg(unix)]
16mod pty;
17mod retry;
18mod state;
19mod watchers;
20
21use crate::daemon_id::DaemonId;
22use crate::daemon_status::DaemonStatus;
23use crate::deps::compute_reverse_stop_order;
24use crate::ipc::server::{IpcServer, IpcServerHandle};
25
26use crate::procs::PROCS;
27use crate::settings::settings;
28use crate::state_file::StateFile;
29use crate::{Result, env};
30use duct::cmd;
31use miette::IntoDiagnostic;
32use once_cell::sync::Lazy;
33use std::collections::HashMap;
34#[cfg(unix)]
35use std::collections::HashSet;
36use std::fs;
37#[cfg(unix)]
38use std::os::unix::fs::PermissionsExt;
39use std::process::exit;
40use std::sync::atomic;
41use std::sync::atomic::{AtomicBool, AtomicU32};
42use std::time::Duration;
43#[cfg(unix)]
44use tokio::signal::unix::SignalKind;
45use tokio::sync::{Mutex, Notify};
46use tokio::task::JoinHandle;
47use tokio::{signal, time};
48
/// Exit codes stashed by the zombie reaper for managed daemon PIDs, keyed by PID.
///
/// Only compiled on non-Linux Unix targets, where the reaper's `waitpid(None, ..)`
/// call can consume the exit status of a managed daemon.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> = Lazy::new(Mutex::default);
60
61pub(crate) use state::UpsertDaemonOpts;
63
64pub struct Supervisor {
65 pub(crate) state_file: Mutex<StateFile>,
66 pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
67 pub(crate) last_refreshed_at: Mutex<time::Instant>,
68 pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
70 pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
72 pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
74 pub(crate) active_monitors: AtomicU32,
78 pub(crate) monitor_done: Notify,
81}
82
83pub(crate) fn interval_duration() -> Duration {
84 settings().general_interval()
85}
86
87pub static SUPERVISOR: Lazy<Supervisor> =
88 Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
89
90pub fn start_if_not_running() -> Result<()> {
91 let sf = StateFile::get();
92 if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
93 && let Some(pid) = d.pid
94 && PROCS.is_running(pid)
95 {
96 return Ok(());
97 }
98 start_in_background()
99}
100
101pub fn start_in_background() -> Result<()> {
102 debug!("starting supervisor in background");
103 let log_file = &*env::PITCHFORK_LOG_FILE;
107 if let Some(parent) = log_file.parent() {
108 let _ = fs::create_dir_all(parent);
109 }
110 let stderr_file = fs::OpenOptions::new()
111 .create(true)
112 .append(true)
113 .open(log_file)
114 .into_diagnostic()?;
115 #[cfg(unix)]
116 fix_state_dir_permissions();
117 cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
118 .stdout_null()
119 .stderr_file(stderr_file)
120 .start()
121 .into_diagnostic()?;
122 Ok(())
123}
124
125impl Supervisor {
126 pub fn new() -> Result<Self> {
127 Ok(Self {
128 state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
129 last_refreshed_at: Mutex::new(time::Instant::now()),
130 pending_notifications: Mutex::new(vec![]),
131 pending_autostops: Mutex::new(HashMap::new()),
132 ipc_shutdown: Mutex::new(None),
133 hook_tasks: Mutex::new(Vec::new()),
134 active_monitors: AtomicU32::new(0),
135 monitor_done: Notify::new(),
136 })
137 }
138
139 pub async fn start(
140 &self,
141 is_boot: bool,
142 container: bool,
143 web_port: Option<u16>,
144 web_path: Option<String>,
145 ) -> Result<()> {
146 #[cfg(unix)]
151 fix_state_dir_permissions();
152
153 let pid = std::process::id();
154 let container_mode = container || settings().supervisor.container;
156 if container_mode {
157 info!("Starting supervisor in container/PID1 mode with pid {pid}");
158 } else {
159 info!("Starting supervisor with pid {pid}");
160 }
161
162 self.upsert_daemon(
163 UpsertDaemonOpts::builder(DaemonId::pitchfork())
164 .set(|o| {
165 o.pid = Some(pid);
166 o.status = DaemonStatus::Running;
167 })
168 .build(),
169 )
170 .await?;
171 #[cfg(unix)]
172 fix_state_dir_permissions();
173
174 if is_boot {
176 info!("Boot start mode enabled, starting boot_start daemons");
177 self.start_boot_daemons().await?;
178 }
179
180 self.interval_watch()?;
181 self.cron_watch()?;
182 self.signals()?;
183 self.daemon_file_watch()?;
184
185 #[cfg(unix)]
187 if container_mode {
188 self.reap_zombies()?;
189 }
190
191 let s = settings();
193 let effective_port = web_port.or_else(|| {
194 if s.web.auto_start {
195 match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
196 Some(p) => Some(p),
197 None => {
198 error!(
199 "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
200 s.web.bind_port
201 );
202 None
203 }
204 }
205 } else {
206 None
207 }
208 });
209 let effective_path = web_path.or_else(|| {
211 let bp = s.web.base_path.clone();
212 if bp.is_empty() { None } else { Some(bp) }
213 });
214 if let Some(port) = effective_port {
215 tokio::spawn(async move {
216 if let Err(e) = crate::web::serve(port, effective_path).await {
217 error!("Web server error: {e}");
218 }
219 });
220 }
221
222 if s.proxy.enable {
224 #[cfg(feature = "proxy-tls")]
228 if s.proxy.https {
229 let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
230 let ca_cert_path = proxy_dir.join("ca.pem");
231 let ca_key_path = proxy_dir.join("ca-key.pem");
232 if !ca_cert_path.exists() || !ca_key_path.exists() {
233 match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
234 Ok(()) => {
235 info!(
236 "Generated local CA certificate at {}",
237 ca_cert_path.display()
238 );
239 info!("To trust the CA in your browser, run: pitchfork proxy trust");
240 }
241 Err(e) => {
242 error!("Failed to generate CA certificate: {e}");
243 }
244 }
245 }
246 }
247 let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
251 tokio::spawn(async {
252 if let Err(e) = crate::proxy::server::serve(bind_tx).await {
253 error!("Proxy server error: {e}");
254 }
255 });
256 match bind_rx.await {
257 Ok(Ok(())) => {
258 }
260 Ok(Err(msg)) => {
261 error!("{msg}");
262 self.add_notification(log::LevelFilter::Error, msg).await;
263 }
264 Err(_) => {
265 }
269 }
270 }
271
272 let (ipc, ipc_handle) = IpcServer::new()?;
273 *self.ipc_shutdown.lock().await = Some(ipc_handle);
274 self.conn_watch(ipc).await
275 }
276
277 pub(crate) async fn refresh(&self) -> Result<()> {
278 trace!("refreshing");
279
280 let dirs_with_pids = self.get_dirs_with_shell_pids().await;
283 let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
284
285 if pids_to_check.is_empty() {
286 trace!("no shell PIDs to check, skipping process refresh");
288 } else {
289 PROCS.refresh_pids(&pids_to_check);
290 }
291
292 let mut last_refreshed_at = self.last_refreshed_at.lock().await;
293 *last_refreshed_at = time::Instant::now();
294
295 for (dir, pids) in dirs_with_pids {
296 let to_remove = pids
297 .iter()
298 .filter(|pid| !PROCS.is_running(**pid))
299 .collect::<Vec<_>>();
300 for pid in &to_remove {
301 self.remove_shell_pid(**pid).await?
302 }
303 if to_remove.len() == pids.len() {
304 self.leave_dir(&dir).await?;
305 }
306 }
307
308 self.check_retry().await?;
309 self.process_pending_autostops().await?;
310
311 Ok(())
312 }
313
314 #[cfg(unix)]
339 fn reap_zombies(&self) -> Result<()> {
340 let mut stream = signal::unix::signal(SignalKind::child())
341 .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
342 tokio::spawn(async move {
343 loop {
344 stream.recv().await;
345 let managed_pids: HashSet<u32> = SUPERVISOR
347 .state_file
348 .lock()
349 .await
350 .daemons
351 .values()
352 .filter_map(|d| d.pid)
353 .collect();
354 Self::reap_unmanaged_zombies(&managed_pids).await;
356 }
357 });
358 info!("container mode: SIGCHLD zombie reaper installed");
359 Ok(())
360 }
361
362 #[cfg(target_os = "linux")]
368 async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
369 use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
370 use nix::unistd::Pid;
371
372 loop {
373 let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
375 match waitid(Id::All, peek_flags) {
376 Ok(WaitStatus::StillAlive) => break,
377 Ok(status) => {
378 let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
379 break;
380 };
381 if managed_pids.contains(&pid_raw) {
382 trace!(
386 "zombie reaper: skipping managed daemon pid {pid_raw}, \
387 leaving for Tokio to reap"
388 );
389 break;
390 }
391 match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
393 Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
394 Err(nix::errno::Errno::ECHILD) => break,
395 Err(e) => {
396 trace!("waitpid error reaping pid {pid_raw}: {e}");
397 break;
398 }
399 }
400 }
401 Err(nix::errno::Errno::ECHILD) => break, Err(e) => {
403 trace!("waitid error in zombie reaper: {e}");
404 break;
405 }
406 }
407 }
408 }
409
/// Reaps every exited child (non-Linux Unix variant).
///
/// `waitpid(None, WNOHANG)` consumes the status of whichever child exited.
/// When that child is in `managed_pids`, its exit code is stashed in
/// `REAPED_STATUSES`: the plain code for a normal exit, the negated signal
/// number when killed by a signal, and `-1` for any other status.
#[cfg(all(unix, not(target_os = "linux")))]
async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
    use nix::errno::Errno;
    use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};

    loop {
        let status = match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
            Ok(WaitStatus::StillAlive) | Err(Errno::ECHILD) => return,
            Ok(status) => status,
            Err(e) => {
                trace!("waitpid error in zombie reaper: {e}");
                return;
            }
        };
        // A status that carries no PID is skipped; keep polling.
        let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
            continue;
        };
        if !managed_pids.contains(&pid) {
            trace!("reaped orphaned zombie child: {status:?}");
            continue;
        }
        let exit_code = match status {
            WaitStatus::Exited(_, code) => code,
            WaitStatus::Signaled(_, sig, _) => -(sig as i32),
            _ => -1,
        };
        warn!(
            "zombie reaper reaped managed daemon pid {pid} \
             (exit_code={exit_code}); stashing status for recovery"
        );
        REAPED_STATUSES.lock().await.insert(pid, exit_code);
    }
}
450
451 #[cfg(unix)]
452 fn signals(&self) -> Result<()> {
453 let signals = [
454 SignalKind::terminate(),
455 SignalKind::alarm(),
456 SignalKind::interrupt(),
457 SignalKind::quit(),
458 SignalKind::hangup(),
459 SignalKind::user_defined1(),
460 SignalKind::user_defined2(),
461 ];
462 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
463 for signal in signals {
464 let stream = match signal::unix::signal(signal) {
465 Ok(s) => s,
466 Err(e) => {
467 warn!("Failed to register signal handler for {signal:?}: {e}");
468 continue;
469 }
470 };
471 tokio::spawn(async move {
472 let mut stream = stream;
473 loop {
474 stream.recv().await;
475 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
476 exit(1);
477 } else {
478 SUPERVISOR.handle_signal().await;
479 }
480 }
481 });
482 }
483 Ok(())
484 }
485
/// Spawns a task that reacts to Ctrl-C on Windows.
///
/// The first Ctrl-C triggers `handle_signal` on the global supervisor; a
/// later one exits the process with status 1. The task stops if waiting for
/// Ctrl-C fails.
#[cfg(windows)]
fn signals(&self) -> Result<()> {
    tokio::spawn(async move {
        // Set once the first Ctrl-C has been seen.
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        loop {
            match signal::ctrl_c().await {
                Ok(()) => {}
                Err(e) => {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
            }
            let already_signalled = RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst);
            if already_signalled {
                exit(1);
            }
            SUPERVISOR.handle_signal().await;
        }
    });
    Ok(())
}
504
505 async fn handle_signal(&self) {
506 info!("received signal, stopping");
507 self.close().await;
508 exit(0)
509 }
510
511 pub(crate) async fn close(&self) {
512 let pitchfork_id = DaemonId::pitchfork();
513 let active = self.active_daemons().await;
514 let active_ids: Vec<DaemonId> = active
515 .iter()
516 .filter(|d| d.id != pitchfork_id)
517 .map(|d| d.id.clone())
518 .collect();
519
520 let stop_levels = compute_reverse_stop_order(&active_ids);
525 for level in &stop_levels {
526 let mut tasks = Vec::new();
527 for id in level {
528 let id = id.clone();
529 tasks.push(tokio::spawn(async move {
530 if let Err(err) = SUPERVISOR.stop(&id).await {
531 error!("failed to stop daemon {id}: {err}");
532 }
533 }));
534 }
535 for task in tasks {
536 let _ = task.await;
537 }
538 }
539 let _ = self.remove_daemon(&pitchfork_id).await;
540
541 if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
543 handle.shutdown();
544 }
545
546 let drain_timeout = time::sleep(Duration::from_secs(5));
552 tokio::pin!(drain_timeout);
553 loop {
554 if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
555 break;
556 }
557 tokio::select! {
558 _ = self.monitor_done.notified() => {}
559 _ = &mut drain_timeout => {
560 warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
561 break;
562 }
563 }
564 }
565 let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
566 let hook_timeout = Duration::from_secs(30);
567 for handle in handles {
568 match time::timeout(hook_timeout, handle).await {
569 Ok(_) => {} Err(_) => {
571 warn!(
572 "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
573 );
574 }
575 }
576 }
577
578 let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
579 }
580
581 pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
582 self.pending_notifications
583 .lock()
584 .await
585 .push((level, message));
586 }
587}
588
589#[cfg(unix)]
607fn fix_state_dir_permissions() {
608 let state_dir = &*env::PITCHFORK_STATE_DIR;
609 if let Some((uid, gid)) = state_owner_ids() {
610 if !state_dir.exists()
611 && let Err(err) = fs::create_dir_all(state_dir)
612 {
613 warn!(
614 "failed to create state directory for ownership fix at {}: {err}",
615 state_dir.display()
616 );
617 return;
618 }
619
620 chown_recursive(state_dir, uid, gid, true);
622 debug!(
623 "chowned state directory to uid={uid} gid={gid} at {}",
624 state_dir.display()
625 );
626 } else {
627 if !state_dir.exists() {
628 return;
629 }
630
631 chmod_safe_subtrees(state_dir);
634 debug!(
635 "relaxed permissions on safe subtrees at {}",
636 state_dir.display()
637 );
638 }
639}
640
641#[cfg(unix)]
642pub(crate) fn state_owner_ids() -> Option<(u32, u32)> {
643 if !nix::unistd::Uid::effective().is_root() {
644 return None;
645 }
646
647 let user = settings().supervisor.user.trim();
648 if !user.is_empty() {
649 return resolve_supervisor_user_ids(user).or_else(|| {
650 warn!(
651 "failed to resolve supervisor.user '{user}' for state ownership; falling back to SUDO_UID/SUDO_GID"
652 );
653 parse_sudo_ids()
654 });
655 }
656
657 parse_sudo_ids()
658}
659
660#[cfg(unix)]
661fn resolve_supervisor_user_ids(user: &str) -> Option<(u32, u32)> {
662 let user_record = if user.chars().all(|c| c.is_ascii_digit()) {
663 let uid = user.parse::<u32>().ok()?;
664 nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
665 .ok()
666 .flatten()
667 } else {
668 nix::unistd::User::from_name(user).ok().flatten()
669 }?;
670
671 Some((user_record.uid.as_raw(), user_record.gid.as_raw()))
672}
673
674#[cfg(unix)]
680fn parse_sudo_ids() -> Option<(u32, u32)> {
681 if !nix::unistd::Uid::effective().is_root() {
682 return None;
683 }
684 let uid: u32 = std::env::var("SUDO_UID").ok()?.parse().ok()?;
685 let gid: u32 = std::env::var("SUDO_GID").ok()?.parse().ok()?;
686 Some((uid, gid))
687}
688
689#[cfg(unix)]
692fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
693 let _ = chown_path(dir, uid, gid);
695
696 let entries = match std::fs::read_dir(dir) {
697 Ok(e) => e,
698 Err(_) => return,
699 };
700 for entry in entries.flatten() {
701 let path = entry.path();
702 if path.is_dir() {
703 if skip_proxy {
705 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
706 if name == "proxy" {
707 continue;
708 }
709 }
710 }
711 chown_recursive(&path, uid, gid, false);
712 } else {
713 let _ = chown_path(&path, uid, gid);
714 }
715 }
716}
717
/// Changes the owner and group of `path` to `uid` and `gid`.
///
/// Delegates to the standard library's safe `chown` wrapper, so no
/// `unsafe` FFI call is needed. Like `chown(2)`, it follows symbolic links.
///
/// # Errors
/// Returns the OS error when the call fails, and an
/// `ErrorKind::InvalidInput` error when `path` contains a NUL byte.
#[cfg(unix)]
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
732
733#[cfg(unix)]
736fn chmod_safe_subtrees(state_dir: &std::path::Path) {
737 let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
739
740 let state_file = state_dir.join("state.toml");
742 if state_file.exists() {
743 let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
744 }
745
746 for subdir_name in &["sock", "logs"] {
748 let subdir = state_dir.join(subdir_name);
749 if subdir.is_dir() {
750 chmod_recursive(&subdir);
751 }
752 }
753}
754
/// Recursively sets directories to `0o755` and other entries to `0o644`.
///
/// `dir` itself is changed first, then every entry below it.
///
/// Symbolic links are skipped entirely. `set_permissions` acts on a link's
/// target, so following links would change the mode of files outside the
/// tree, and a link to a directory would pull the walk out of the tree.
///
/// All errors are ignored; the walk is best effort.
#[cfg(unix)]
fn chmod_recursive(dir: &std::path::Path) {
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(0o755));

    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for entry in entries.flatten() {
        // `DirEntry::file_type` reports on the entry itself and does not
        // traverse symlinks, unlike `Path::is_dir`.
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        if file_type.is_symlink() {
            continue;
        }

        let path = entry.path();
        if file_type.is_dir() {
            chmod_recursive(&path);
        } else {
            let _ = fs::set_permissions(&path, fs::Permissions::from_mode(0o644));
        }
    }
}