pitchfork_cli/supervisor/
mod.rs1mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15mod pty;
16mod retry;
17mod state;
18mod watchers;
19
20use crate::daemon_id::DaemonId;
21use crate::daemon_status::DaemonStatus;
22use crate::deps::compute_reverse_stop_order;
23use crate::ipc::server::{IpcServer, IpcServerHandle};
24
25use crate::procs::PROCS;
26use crate::settings::settings;
27use crate::state_file::StateFile;
28use crate::{Result, env};
29use duct::cmd;
30use miette::IntoDiagnostic;
31use once_cell::sync::Lazy;
32use std::collections::{HashMap, HashSet};
33use std::fs;
34#[cfg(unix)]
35use std::os::unix::fs::PermissionsExt;
36use std::process::exit;
37use std::sync::atomic;
38use std::sync::atomic::{AtomicBool, AtomicU32};
39use std::time::Duration;
40#[cfg(unix)]
41use tokio::signal::unix::SignalKind;
42use tokio::sync::{Mutex, Notify};
43use tokio::task::JoinHandle;
44use tokio::{signal, time};
45
/// Exit codes of managed daemon processes that the zombie reaper collected
/// itself, keyed by pid.
///
/// Only compiled on non-Linux Unix targets, where `reap_unmanaged_zombies`
/// reaps every exited child (managed ones included) and stores the exit code
/// of managed pids here.
/// NOTE(review): the reader of this map is not visible in this file section;
/// presumably the per-daemon monitoring code — confirm.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> =
    Lazy::new(|| Mutex::new(HashMap::default()));
57
58pub(crate) use state::UpsertDaemonOpts;
60
61pub struct Supervisor {
62 pub(crate) state_file: Mutex<StateFile>,
63 pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
64 pub(crate) last_refreshed_at: Mutex<time::Instant>,
65 pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
67 pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
69 pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
71 pub(crate) active_monitors: AtomicU32,
75 pub(crate) monitor_done: Notify,
78}
79
80pub(crate) fn interval_duration() -> Duration {
81 settings().general_interval()
82}
83
84pub static SUPERVISOR: Lazy<Supervisor> =
85 Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
86
87pub fn start_if_not_running() -> Result<()> {
88 let sf = StateFile::get();
89 if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
90 && let Some(pid) = d.pid
91 && PROCS.is_running(pid)
92 {
93 return Ok(());
94 }
95 start_in_background()
96}
97
98pub fn start_in_background() -> Result<()> {
99 debug!("starting supervisor in background");
100 let log_file = &*env::PITCHFORK_LOG_FILE;
104 if let Some(parent) = log_file.parent() {
105 let _ = fs::create_dir_all(parent);
106 }
107 let stderr_file = fs::OpenOptions::new()
108 .create(true)
109 .append(true)
110 .open(log_file)
111 .into_diagnostic()?;
112 #[cfg(unix)]
113 fix_state_dir_permissions();
114 cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
115 .stdout_null()
116 .stderr_file(stderr_file)
117 .start()
118 .into_diagnostic()?;
119 Ok(())
120}
121
122impl Supervisor {
123 pub fn new() -> Result<Self> {
124 Ok(Self {
125 state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
126 last_refreshed_at: Mutex::new(time::Instant::now()),
127 pending_notifications: Mutex::new(vec![]),
128 pending_autostops: Mutex::new(HashMap::new()),
129 ipc_shutdown: Mutex::new(None),
130 hook_tasks: Mutex::new(Vec::new()),
131 active_monitors: AtomicU32::new(0),
132 monitor_done: Notify::new(),
133 })
134 }
135
136 pub async fn start(
137 &self,
138 is_boot: bool,
139 container: bool,
140 web_port: Option<u16>,
141 web_path: Option<String>,
142 ) -> Result<()> {
143 #[cfg(unix)]
148 fix_state_dir_permissions();
149
150 let pid = std::process::id();
151 let container_mode = container || settings().supervisor.container;
153 if container_mode {
154 info!("Starting supervisor in container/PID1 mode with pid {pid}");
155 } else {
156 info!("Starting supervisor with pid {pid}");
157 }
158
159 self.upsert_daemon(
160 UpsertDaemonOpts::builder(DaemonId::pitchfork())
161 .set(|o| {
162 o.pid = Some(pid);
163 o.status = DaemonStatus::Running;
164 })
165 .build(),
166 )
167 .await?;
168 #[cfg(unix)]
169 fix_state_dir_permissions();
170
171 if is_boot {
173 info!("Boot start mode enabled, starting boot_start daemons");
174 self.start_boot_daemons().await?;
175 }
176
177 self.interval_watch()?;
178 self.cron_watch()?;
179 self.signals()?;
180 self.daemon_file_watch()?;
181
182 #[cfg(unix)]
184 if container_mode {
185 self.reap_zombies()?;
186 }
187
188 let s = settings();
190 let effective_port = web_port.or_else(|| {
191 if s.web.auto_start {
192 match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
193 Some(p) => Some(p),
194 None => {
195 error!(
196 "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
197 s.web.bind_port
198 );
199 None
200 }
201 }
202 } else {
203 None
204 }
205 });
206 let effective_path = web_path.or_else(|| {
208 let bp = s.web.base_path.clone();
209 if bp.is_empty() { None } else { Some(bp) }
210 });
211 if let Some(port) = effective_port {
212 tokio::spawn(async move {
213 if let Err(e) = crate::web::serve(port, effective_path).await {
214 error!("Web server error: {e}");
215 }
216 });
217 }
218
219 if s.proxy.enable {
221 #[cfg(feature = "proxy-tls")]
225 if s.proxy.https {
226 let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
227 let ca_cert_path = proxy_dir.join("ca.pem");
228 let ca_key_path = proxy_dir.join("ca-key.pem");
229 if !ca_cert_path.exists() || !ca_key_path.exists() {
230 match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
231 Ok(()) => {
232 info!(
233 "Generated local CA certificate at {}",
234 ca_cert_path.display()
235 );
236 info!("To trust the CA in your browser, run: pitchfork proxy trust");
237 }
238 Err(e) => {
239 error!("Failed to generate CA certificate: {e}");
240 }
241 }
242 }
243 }
244 let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
248 tokio::spawn(async {
249 if let Err(e) = crate::proxy::server::serve(bind_tx).await {
250 error!("Proxy server error: {e}");
251 }
252 });
253 match bind_rx.await {
254 Ok(Ok(())) => {
255 }
257 Ok(Err(msg)) => {
258 error!("{msg}");
259 self.add_notification(log::LevelFilter::Error, msg).await;
260 }
261 Err(_) => {
262 }
266 }
267 }
268
269 let (ipc, ipc_handle) = IpcServer::new()?;
270 *self.ipc_shutdown.lock().await = Some(ipc_handle);
271 self.conn_watch(ipc).await
272 }
273
274 pub(crate) async fn refresh(&self) -> Result<()> {
275 trace!("refreshing");
276
277 let dirs_with_pids = self.get_dirs_with_shell_pids().await;
280 let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
281
282 if pids_to_check.is_empty() {
283 trace!("no shell PIDs to check, skipping process refresh");
285 } else {
286 PROCS.refresh_pids(&pids_to_check);
287 }
288
289 let mut last_refreshed_at = self.last_refreshed_at.lock().await;
290 *last_refreshed_at = time::Instant::now();
291
292 for (dir, pids) in dirs_with_pids {
293 let to_remove = pids
294 .iter()
295 .filter(|pid| !PROCS.is_running(**pid))
296 .collect::<Vec<_>>();
297 for pid in &to_remove {
298 self.remove_shell_pid(**pid).await?
299 }
300 if to_remove.len() == pids.len() {
301 self.leave_dir(&dir).await?;
302 }
303 }
304
305 self.check_retry().await?;
306 self.process_pending_autostops().await?;
307
308 Ok(())
309 }
310
311 #[cfg(unix)]
336 fn reap_zombies(&self) -> Result<()> {
337 let mut stream = signal::unix::signal(SignalKind::child())
338 .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
339 tokio::spawn(async move {
340 loop {
341 stream.recv().await;
342 let managed_pids: HashSet<u32> = SUPERVISOR
344 .state_file
345 .lock()
346 .await
347 .daemons
348 .values()
349 .filter_map(|d| d.pid)
350 .collect();
351 Self::reap_unmanaged_zombies(&managed_pids).await;
353 }
354 });
355 info!("container mode: SIGCHLD zombie reaper installed");
356 Ok(())
357 }
358
359 #[cfg(target_os = "linux")]
365 async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
366 use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
367 use nix::unistd::Pid;
368
369 loop {
370 let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
372 match waitid(Id::All, peek_flags) {
373 Ok(WaitStatus::StillAlive) => break,
374 Ok(status) => {
375 let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
376 break;
377 };
378 if managed_pids.contains(&pid_raw) {
379 trace!(
383 "zombie reaper: skipping managed daemon pid {pid_raw}, \
384 leaving for Tokio to reap"
385 );
386 break;
387 }
388 match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
390 Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
391 Err(nix::errno::Errno::ECHILD) => break,
392 Err(e) => {
393 trace!("waitpid error reaping pid {pid_raw}: {e}");
394 break;
395 }
396 }
397 }
398 Err(nix::errno::Errno::ECHILD) => break, Err(e) => {
400 trace!("waitid error in zombie reaper: {e}");
401 break;
402 }
403 }
404 }
405 }
406
/// Reaps every exited child (non-Linux Unix variant).
///
/// This variant calls `waitpid(None, WNOHANG)` directly, so managed daemons
/// are reaped as well. When the reaped pid is in `managed_pids`, its exit
/// code is stored in `REAPED_STATUSES`: the exit code for a normal exit, the
/// negated signal number for a signal death, `-1` otherwise. The loop ends
/// when no exited child is pending or on any wait error.
#[cfg(all(unix, not(target_os = "linux")))]
async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
    use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};

    loop {
        let status = match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
            Ok(WaitStatus::StillAlive) | Err(nix::errno::Errno::ECHILD) => return,
            Ok(status) => status,
            Err(e) => {
                trace!("waitpid error in zombie reaper: {e}");
                return;
            }
        };

        let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
            continue;
        };

        if !managed_pids.contains(&pid) {
            trace!("reaped orphaned zombie child: {status:?}");
            continue;
        }

        let exit_code = match status {
            WaitStatus::Exited(_, code) => code,
            WaitStatus::Signaled(_, sig, _) => -(sig as i32),
            _ => -1,
        };
        warn!(
            "zombie reaper reaped managed daemon pid {pid} \
             (exit_code={exit_code}); stashing status for recovery"
        );
        REAPED_STATUSES.lock().await.insert(pid, exit_code);
    }
}
447
448 #[cfg(unix)]
449 fn signals(&self) -> Result<()> {
450 let signals = [
451 SignalKind::terminate(),
452 SignalKind::alarm(),
453 SignalKind::interrupt(),
454 SignalKind::quit(),
455 SignalKind::hangup(),
456 SignalKind::user_defined1(),
457 SignalKind::user_defined2(),
458 ];
459 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
460 for signal in signals {
461 let stream = match signal::unix::signal(signal) {
462 Ok(s) => s,
463 Err(e) => {
464 warn!("Failed to register signal handler for {signal:?}: {e}");
465 continue;
466 }
467 };
468 tokio::spawn(async move {
469 let mut stream = stream;
470 loop {
471 stream.recv().await;
472 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
473 exit(1);
474 } else {
475 SUPERVISOR.handle_signal().await;
476 }
477 }
478 });
479 }
480 Ok(())
481 }
482
/// Installs the Ctrl-C handler on Windows.
///
/// The first Ctrl-C triggers `handle_signal`; a second one exits the process
/// immediately with status 1. If waiting for Ctrl-C fails, the error is
/// logged and the handler task ends. This function itself always returns
/// `Ok`.
#[cfg(windows)]
fn signals(&self) -> Result<()> {
    // Set once the first Ctrl-C has arrived.
    static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);

    tokio::spawn(async move {
        loop {
            match signal::ctrl_c().await {
                Ok(()) => {}
                Err(e) => {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
            }
            let already_signalled = RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst);
            if already_signalled {
                exit(1);
            }
            SUPERVISOR.handle_signal().await;
        }
    });
    Ok(())
}
501
502 async fn handle_signal(&self) {
503 info!("received signal, stopping");
504 self.close().await;
505 exit(0)
506 }
507
508 pub(crate) async fn close(&self) {
509 let pitchfork_id = DaemonId::pitchfork();
510 let active = self.active_daemons().await;
511 let active_ids: Vec<DaemonId> = active
512 .iter()
513 .filter(|d| d.id != pitchfork_id)
514 .map(|d| d.id.clone())
515 .collect();
516
517 let stop_levels = compute_reverse_stop_order(&active_ids);
522 for level in &stop_levels {
523 let mut tasks = Vec::new();
524 for id in level {
525 let id = id.clone();
526 tasks.push(tokio::spawn(async move {
527 if let Err(err) = SUPERVISOR.stop(&id).await {
528 error!("failed to stop daemon {id}: {err}");
529 }
530 }));
531 }
532 for task in tasks {
533 let _ = task.await;
534 }
535 }
536 let _ = self.remove_daemon(&pitchfork_id).await;
537
538 if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
540 handle.shutdown();
541 }
542
543 let drain_timeout = time::sleep(Duration::from_secs(5));
549 tokio::pin!(drain_timeout);
550 loop {
551 if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
552 break;
553 }
554 tokio::select! {
555 _ = self.monitor_done.notified() => {}
556 _ = &mut drain_timeout => {
557 warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
558 break;
559 }
560 }
561 }
562 let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
563 let hook_timeout = Duration::from_secs(30);
564 for handle in handles {
565 match time::timeout(hook_timeout, handle).await {
566 Ok(_) => {} Err(_) => {
568 warn!(
569 "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
570 );
571 }
572 }
573 }
574
575 let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
576 }
577
578 pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
579 self.pending_notifications
580 .lock()
581 .await
582 .push((level, message));
583 }
584}
585
586#[cfg(unix)]
604fn fix_state_dir_permissions() {
605 let state_dir = &*env::PITCHFORK_STATE_DIR;
606 if let Some((uid, gid)) = state_owner_ids() {
607 if !state_dir.exists()
608 && let Err(err) = fs::create_dir_all(state_dir)
609 {
610 warn!(
611 "failed to create state directory for ownership fix at {}: {err}",
612 state_dir.display()
613 );
614 return;
615 }
616
617 chown_recursive(state_dir, uid, gid, true);
619 debug!(
620 "chowned state directory to uid={uid} gid={gid} at {}",
621 state_dir.display()
622 );
623 } else {
624 if !state_dir.exists() {
625 return;
626 }
627
628 chmod_safe_subtrees(state_dir);
631 debug!(
632 "relaxed permissions on safe subtrees at {}",
633 state_dir.display()
634 );
635 }
636}
637
638#[cfg(unix)]
639pub(crate) fn state_owner_ids() -> Option<(u32, u32)> {
640 if !nix::unistd::Uid::effective().is_root() {
641 return None;
642 }
643
644 let user = settings().supervisor.user.trim();
645 if !user.is_empty() {
646 return resolve_supervisor_user_ids(user).or_else(|| {
647 warn!(
648 "failed to resolve supervisor.user '{user}' for state ownership; falling back to SUDO_UID/SUDO_GID"
649 );
650 parse_sudo_ids()
651 });
652 }
653
654 parse_sudo_ids()
655}
656
657#[cfg(unix)]
658fn resolve_supervisor_user_ids(user: &str) -> Option<(u32, u32)> {
659 let user_record = if user.chars().all(|c| c.is_ascii_digit()) {
660 let uid = user.parse::<u32>().ok()?;
661 nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
662 .ok()
663 .flatten()
664 } else {
665 nix::unistd::User::from_name(user).ok().flatten()
666 }?;
667
668 Some((user_record.uid.as_raw(), user_record.gid.as_raw()))
669}
670
671#[cfg(unix)]
677fn parse_sudo_ids() -> Option<(u32, u32)> {
678 if !nix::unistd::Uid::effective().is_root() {
679 return None;
680 }
681 let uid: u32 = std::env::var("SUDO_UID").ok()?.parse().ok()?;
682 let gid: u32 = std::env::var("SUDO_GID").ok()?.parse().ok()?;
683 Some((uid, gid))
684}
685
686#[cfg(unix)]
689fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
690 let _ = chown_path(dir, uid, gid);
692
693 let entries = match std::fs::read_dir(dir) {
694 Ok(e) => e,
695 Err(_) => return,
696 };
697 for entry in entries.flatten() {
698 let path = entry.path();
699 if path.is_dir() {
700 if skip_proxy {
702 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
703 if name == "proxy" {
704 continue;
705 }
706 }
707 }
708 chown_recursive(&path, uid, gid, false);
709 } else {
710 let _ = chown_path(&path, uid, gid);
711 }
712 }
713}
714
/// Changes the owner and group of `path` to `uid`/`gid`.
///
/// Thin wrapper over the standard library's safe `chown`, which follows
/// symbolic links the same way the C `chown` function does.
///
/// # Errors
///
/// Returns the underlying OS error if the call fails, or an `InvalidInput`
/// error if `path` contains an interior NUL byte.
#[cfg(unix)]
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
729
730#[cfg(unix)]
733fn chmod_safe_subtrees(state_dir: &std::path::Path) {
734 let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
736
737 let state_file = state_dir.join("state.toml");
739 if state_file.exists() {
740 let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
741 }
742
743 for subdir_name in &["sock", "logs"] {
745 let subdir = state_dir.join(subdir_name);
746 if subdir.is_dir() {
747 chmod_recursive(&subdir);
748 }
749 }
750}
751
/// Recursively sets directories under `dir` (including `dir` itself) to
/// `0o755` and every other non-symlink entry to `0o644`.
///
/// Best effort: permission failures, unreadable directories and entries
/// whose type cannot be determined are skipped silently.
///
/// Symbolic links are skipped entirely. `fs::set_permissions` acts on a
/// link's target, so following links could change the mode of files outside
/// the tree, and a link cycle would recurse without bound.
#[cfg(unix)]
fn chmod_recursive(dir: &std::path::Path) {
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(0o755));

    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for entry in entries.flatten() {
        // `DirEntry::file_type` does not traverse symlinks, unlike
        // `Path::is_dir`, so links are classified as links.
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        if file_type.is_symlink() {
            continue;
        }

        let path = entry.path();
        if file_type.is_dir() {
            chmod_recursive(&path);
        } else {
            let _ = fs::set_permissions(&path, fs::Permissions::from_mode(0o644));
        }
    }
}