pitchfork_cli/supervisor/
mod.rs1mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15#[cfg(unix)]
16mod pty;
17mod retry;
18mod state;
19mod watchers;
20
21use crate::daemon_id::DaemonId;
22use crate::daemon_status::DaemonStatus;
23use crate::deps::compute_reverse_stop_order;
24use crate::ipc::server::{IpcServer, IpcServerHandle};
25
26use crate::procs::PROCS;
27use crate::settings::settings;
28use crate::state_file::StateFile;
29use crate::{Result, env};
30use duct::cmd;
31use miette::IntoDiagnostic;
32use once_cell::sync::Lazy;
33use std::collections::HashMap;
34#[cfg(unix)]
35use std::collections::HashSet;
36use std::fs;
37#[cfg(unix)]
38use std::os::unix::fs::PermissionsExt;
39use std::process::exit;
40use std::sync::atomic;
41use std::sync::atomic::{AtomicBool, AtomicU32};
42use std::time::Duration;
43#[cfg(unix)]
44use tokio::signal::unix::SignalKind;
45use tokio::sync::{Mutex, Notify};
46use tokio::task::JoinHandle;
47use tokio::{signal, time};
48
/// Exit statuses of *managed* daemons that the non-Linux zombie reaper had to
/// collect itself (it reaps with `waitpid(-1)`, which cannot skip children).
///
/// Keyed by pid. The value is the exit code for a normal exit, the negated
/// signal number for a signal death, and `-1` for any other wait status.
/// NOTE(review): presumably drained by the code that waits on that daemon, so
/// the status is not lost — confirm against the consumers.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> =
    Lazy::new(|| Mutex::new(Default::default()));
60
61pub(crate) use state::UpsertDaemonOpts;
63
64pub struct Supervisor {
65 pub(crate) state_file: Mutex<StateFile>,
66 pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
67 pub(crate) last_refreshed_at: Mutex<time::Instant>,
68 pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
70 pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
72 pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
74 pub(crate) active_monitors: AtomicU32,
78 pub(crate) monitor_done: Notify,
81 pub(crate) proxy_cancel: Mutex<Option<tokio_util::sync::CancellationToken>>,
84 pub(crate) proxy_task: Mutex<Option<JoinHandle<()>>>,
86}
87
88pub(crate) fn interval_duration() -> Duration {
89 settings().general_interval()
90}
91
92pub static SUPERVISOR: Lazy<Supervisor> =
93 Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
94
95pub fn start_if_not_running() -> Result<()> {
96 let sf = StateFile::get();
97 if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
98 && let Some(pid) = d.pid
99 && PROCS.is_running(pid)
100 {
101 return Ok(());
102 }
103 start_in_background()
104}
105
106pub fn start_in_background() -> Result<()> {
107 debug!("starting supervisor in background");
108 let log_file = &*env::PITCHFORK_LOG_FILE;
112 if let Some(parent) = log_file.parent() {
113 let _ = fs::create_dir_all(parent);
114 }
115 let stderr_file = fs::OpenOptions::new()
116 .create(true)
117 .append(true)
118 .open(log_file)
119 .into_diagnostic()?;
120 #[cfg(unix)]
121 fix_state_dir_permissions();
122 cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
123 .stdout_null()
124 .stderr_file(stderr_file)
125 .start()
126 .into_diagnostic()?;
127 Ok(())
128}
129
130impl Supervisor {
131 pub fn new() -> Result<Self> {
132 Ok(Self {
133 state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
134 last_refreshed_at: Mutex::new(time::Instant::now()),
135 pending_notifications: Mutex::new(vec![]),
136 pending_autostops: Mutex::new(HashMap::new()),
137 ipc_shutdown: Mutex::new(None),
138 hook_tasks: Mutex::new(Vec::new()),
139 active_monitors: AtomicU32::new(0),
140 monitor_done: Notify::new(),
141 proxy_cancel: Mutex::new(None),
142 proxy_task: Mutex::new(None),
143 })
144 }
145
146 pub async fn start(
147 &self,
148 is_boot: bool,
149 container: bool,
150 web_port: Option<u16>,
151 web_path: Option<String>,
152 ) -> Result<()> {
153 #[cfg(unix)]
158 fix_state_dir_permissions();
159
160 let pid = std::process::id();
161 let container_mode = container || settings().supervisor.container;
163 if container_mode {
164 info!("Starting supervisor in container/PID1 mode with pid {pid}");
165 } else {
166 info!("Starting supervisor with pid {pid}");
167 }
168
169 self.upsert_daemon(
170 UpsertDaemonOpts::builder(DaemonId::pitchfork())
171 .set(|o| {
172 o.pid = Some(pid);
173 o.status = DaemonStatus::Running;
174 })
175 .build(),
176 )
177 .await?;
178 #[cfg(unix)]
179 fix_state_dir_permissions();
180
181 if is_boot {
183 info!("Boot start mode enabled, starting boot_start daemons");
184 self.start_boot_daemons().await?;
185 }
186
187 self.interval_watch()?;
188 self.cron_watch()?;
189 self.signals()?;
190 self.daemon_file_watch()?;
191
192 #[cfg(unix)]
194 if container_mode {
195 self.reap_zombies()?;
196 }
197
198 let s = settings();
200 let effective_port = web_port.or_else(|| {
201 if s.web.auto_start {
202 match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
203 Some(p) => Some(p),
204 None => {
205 error!(
206 "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
207 s.web.bind_port
208 );
209 None
210 }
211 }
212 } else {
213 None
214 }
215 });
216 let effective_path = web_path.or_else(|| {
218 let bp = s.web.base_path.clone();
219 if bp.is_empty() { None } else { Some(bp) }
220 });
221 if let Some(port) = effective_port {
222 tokio::spawn(async move {
223 if let Err(e) = crate::web::serve(port, effective_path).await {
224 error!("Web server error: {e}");
225 }
226 });
227 }
228
229 if s.proxy.enable {
231 #[cfg(feature = "proxy-tls")]
235 if s.proxy.https {
236 let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
237 let ca_cert_path = proxy_dir.join("ca.pem");
238 let ca_key_path = proxy_dir.join("ca-key.pem");
239 if !ca_cert_path.exists() || !ca_key_path.exists() {
240 match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
241 Ok(()) => {
242 info!(
243 "Generated local CA certificate at {}",
244 ca_cert_path.display()
245 );
246 info!("To trust the CA in your browser, run: pitchfork proxy trust");
247 }
248 Err(e) => {
249 error!("Failed to generate CA certificate: {e}");
250 }
251 }
252 }
253 }
254 let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
258 let proxy_cancel = tokio_util::sync::CancellationToken::new();
259 let proxy_cancel_clone = proxy_cancel.clone();
260 *self.proxy_cancel.lock().await = Some(proxy_cancel);
261 let proxy_task = tokio::spawn(async move {
262 if let Err(e) = crate::proxy::server::serve(bind_tx, proxy_cancel_clone).await {
263 error!("Proxy server error: {e}");
264 }
265 });
266 *self.proxy_task.lock().await = Some(proxy_task);
267 match bind_rx.await {
268 Ok(Ok(())) => {
269 }
271 Ok(Err(msg)) => {
272 error!("{msg}");
273 self.add_notification(log::LevelFilter::Error, msg).await;
274 }
275 Err(_) => {
276 }
280 }
281 }
282
283 let (ipc, ipc_handle) = IpcServer::new()?;
284 *self.ipc_shutdown.lock().await = Some(ipc_handle);
285 self.conn_watch(ipc).await
286 }
287
288 pub(crate) async fn refresh(&self) -> Result<()> {
289 trace!("refreshing");
290
291 let dirs_with_pids = self.get_dirs_with_shell_pids().await;
294 let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
295
296 if pids_to_check.is_empty() {
297 trace!("no shell PIDs to check, skipping process refresh");
299 } else {
300 PROCS.refresh_pids(&pids_to_check);
301 }
302
303 let mut last_refreshed_at = self.last_refreshed_at.lock().await;
304 *last_refreshed_at = time::Instant::now();
305
306 for (dir, pids) in dirs_with_pids {
307 let to_remove = pids
308 .iter()
309 .filter(|pid| !PROCS.is_running(**pid))
310 .collect::<Vec<_>>();
311 for pid in &to_remove {
312 self.remove_shell_pid(**pid).await?
313 }
314 if to_remove.len() == pids.len() {
315 self.leave_dir(&dir).await?;
316 }
317 }
318
319 self.check_retry().await?;
320 self.process_pending_autostops().await?;
321
322 Ok(())
323 }
324
325 #[cfg(unix)]
350 fn reap_zombies(&self) -> Result<()> {
351 let mut stream = signal::unix::signal(SignalKind::child())
352 .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
353 tokio::spawn(async move {
354 loop {
355 stream.recv().await;
356 let managed_pids: HashSet<u32> = SUPERVISOR
358 .state_file
359 .lock()
360 .await
361 .daemons
362 .values()
363 .filter_map(|d| d.pid)
364 .collect();
365 Self::reap_unmanaged_zombies(&managed_pids).await;
367 }
368 });
369 info!("container mode: SIGCHLD zombie reaper installed");
370 Ok(())
371 }
372
373 #[cfg(target_os = "linux")]
379 async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
380 use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
381 use nix::unistd::Pid;
382
383 loop {
384 let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
386 match waitid(Id::All, peek_flags) {
387 Ok(WaitStatus::StillAlive) => break,
388 Ok(status) => {
389 let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
390 break;
391 };
392 if managed_pids.contains(&pid_raw) {
393 trace!(
397 "zombie reaper: skipping managed daemon pid {pid_raw}, \
398 leaving for Tokio to reap"
399 );
400 break;
401 }
402 match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
404 Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
405 Err(nix::errno::Errno::ECHILD) => break,
406 Err(e) => {
407 trace!("waitpid error reaping pid {pid_raw}: {e}");
408 break;
409 }
410 }
411 }
412 Err(nix::errno::Errno::ECHILD) => break, Err(e) => {
414 trace!("waitid error in zombie reaper: {e}");
415 break;
416 }
417 }
418 }
419 }
420
421 #[cfg(all(unix, not(target_os = "linux")))]
427 async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
428 use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
429
430 loop {
431 match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
432 Ok(WaitStatus::StillAlive) => break,
433 Ok(status) => {
434 let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
435 continue;
436 };
437 if managed_pids.contains(&pid) {
438 let exit_code = match status {
440 WaitStatus::Exited(_, code) => code,
441 WaitStatus::Signaled(_, sig, _) => -(sig as i32),
442 _ => -1,
443 };
444 warn!(
445 "zombie reaper reaped managed daemon pid {pid} \
446 (exit_code={exit_code}); stashing status for recovery"
447 );
448 REAPED_STATUSES.lock().await.insert(pid, exit_code);
449 } else {
450 trace!("reaped orphaned zombie child: {status:?}");
451 }
452 }
453 Err(nix::errno::Errno::ECHILD) => break, Err(e) => {
455 trace!("waitpid error in zombie reaper: {e}");
456 break;
457 }
458 }
459 }
460 }
461
462 #[cfg(unix)]
463 fn signals(&self) -> Result<()> {
464 let signals = [
465 SignalKind::terminate(),
466 SignalKind::alarm(),
467 SignalKind::interrupt(),
468 SignalKind::quit(),
469 SignalKind::hangup(),
470 SignalKind::user_defined1(),
471 SignalKind::user_defined2(),
472 ];
473 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
474 for signal in signals {
475 let stream = match signal::unix::signal(signal) {
476 Ok(s) => s,
477 Err(e) => {
478 warn!("Failed to register signal handler for {signal:?}: {e}");
479 continue;
480 }
481 };
482 tokio::spawn(async move {
483 let mut stream = stream;
484 loop {
485 stream.recv().await;
486 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
487 exit(1);
488 } else {
489 SUPERVISOR.handle_signal().await;
490 }
491 }
492 });
493 }
494 Ok(())
495 }
496
497 #[cfg(windows)]
498 fn signals(&self) -> Result<()> {
499 tokio::spawn(async move {
500 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
501 loop {
502 if let Err(e) = signal::ctrl_c().await {
503 error!("Failed to wait for ctrl-c: {}", e);
504 return;
505 }
506 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
507 exit(1);
508 } else {
509 SUPERVISOR.handle_signal().await;
510 }
511 }
512 });
513 Ok(())
514 }
515
516 async fn handle_signal(&self) {
517 info!("received signal, stopping");
518 self.close().await;
519 exit(0)
520 }
521
522 pub(crate) async fn close(&self) {
523 if let Some(cancel) = self.proxy_cancel.lock().await.take() {
527 cancel.cancel();
528 }
529
530 if let Some(proxy_task) = self.proxy_task.lock().await.take() {
531 let _ = tokio::time::timeout(Duration::from_secs(12), proxy_task).await;
532 }
533
534 let s = settings();
536 if s.proxy.enable && s.proxy.sync_hosts {
537 crate::proxy::hosts::clean_hosts_file();
538 }
539
540 let pitchfork_id = DaemonId::pitchfork();
541 let active = self.active_daemons().await;
542 let active_ids: Vec<DaemonId> = active
543 .iter()
544 .filter(|d| d.id != pitchfork_id)
545 .map(|d| d.id.clone())
546 .collect();
547
548 let stop_levels = compute_reverse_stop_order(&active_ids);
553 for level in &stop_levels {
554 let mut tasks = Vec::new();
555 for id in level {
556 let id = id.clone();
557 tasks.push(tokio::spawn(async move {
558 if let Err(err) = SUPERVISOR.stop(&id).await {
559 error!("failed to stop daemon {id}: {err}");
560 }
561 }));
562 }
563 for task in tasks {
564 let _ = task.await;
565 }
566 }
567 let _ = self.remove_daemon(&pitchfork_id).await;
568
569 if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
571 handle.shutdown();
572 }
573
574 let drain_timeout = time::sleep(Duration::from_secs(5));
580 tokio::pin!(drain_timeout);
581 loop {
582 if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
583 break;
584 }
585 tokio::select! {
586 _ = self.monitor_done.notified() => {}
587 _ = &mut drain_timeout => {
588 warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
589 break;
590 }
591 }
592 }
593 let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
594 let hook_timeout = Duration::from_secs(30);
595 for handle in handles {
596 match time::timeout(hook_timeout, handle).await {
597 Ok(_) => {} Err(_) => {
599 warn!(
600 "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
601 );
602 }
603 }
604 }
605
606 let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
607 }
608
609 pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
610 self.pending_notifications
611 .lock()
612 .await
613 .push((level, message));
614 }
615}
616
617#[cfg(unix)]
635fn fix_state_dir_permissions() {
636 let state_dir = &*env::PITCHFORK_STATE_DIR;
637 if let Some((uid, gid)) = state_owner_ids() {
638 if !state_dir.exists()
639 && let Err(err) = fs::create_dir_all(state_dir)
640 {
641 warn!(
642 "failed to create state directory for ownership fix at {}: {err}",
643 state_dir.display()
644 );
645 return;
646 }
647
648 chown_recursive(state_dir, uid, gid, true);
650 debug!(
651 "chowned state directory to uid={uid} gid={gid} at {}",
652 state_dir.display()
653 );
654 } else {
655 if !state_dir.exists() {
656 return;
657 }
658
659 chmod_safe_subtrees(state_dir);
662 debug!(
663 "relaxed permissions on safe subtrees at {}",
664 state_dir.display()
665 );
666 }
667}
668
669#[cfg(unix)]
670pub(crate) fn state_owner_ids() -> Option<(u32, u32)> {
671 if !nix::unistd::Uid::effective().is_root() {
672 return None;
673 }
674
675 let user = settings().supervisor.user.trim();
676 if !user.is_empty() {
677 return resolve_supervisor_user_ids(user).or_else(|| {
678 warn!(
679 "failed to resolve supervisor.user '{user}' for state ownership; falling back to SUDO_UID/SUDO_GID"
680 );
681 parse_sudo_ids()
682 });
683 }
684
685 parse_sudo_ids()
686}
687
688#[cfg(unix)]
689fn resolve_supervisor_user_ids(user: &str) -> Option<(u32, u32)> {
690 let user_record = if user.chars().all(|c| c.is_ascii_digit()) {
691 let uid = user.parse::<u32>().ok()?;
692 nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
693 .ok()
694 .flatten()
695 } else {
696 nix::unistd::User::from_name(user).ok().flatten()
697 }?;
698
699 Some((user_record.uid.as_raw(), user_record.gid.as_raw()))
700}
701
702#[cfg(unix)]
708fn parse_sudo_ids() -> Option<(u32, u32)> {
709 if !nix::unistd::Uid::effective().is_root() {
710 return None;
711 }
712 let uid: u32 = std::env::var("SUDO_UID").ok()?.parse().ok()?;
713 let gid: u32 = std::env::var("SUDO_GID").ok()?.parse().ok()?;
714 Some((uid, gid))
715}
716
717#[cfg(unix)]
720fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
721 let _ = chown_path(dir, uid, gid);
723
724 let entries = match std::fs::read_dir(dir) {
725 Ok(e) => e,
726 Err(_) => return,
727 };
728 for entry in entries.flatten() {
729 let path = entry.path();
730 if path.is_dir() {
731 if skip_proxy {
733 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
734 if name == "proxy" {
735 continue;
736 }
737 }
738 }
739 chown_recursive(&path, uid, gid, false);
740 } else {
741 let _ = chown_path(&path, uid, gid);
742 }
743 }
744}
745
/// Changes the owner and group of `path` to `uid:gid`.
///
/// Like `chown(2)`, this follows symbolic links. It delegates to the standard
/// library's `std::os::unix::fs::chown` instead of calling libc through
/// `unsafe` FFI.
///
/// # Errors
///
/// Returns the OS error on failure (e.g. `NotFound`, `PermissionDenied`).
/// A path containing an interior NUL byte yields `InvalidInput`.
#[cfg(unix)]
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
760
761#[cfg(unix)]
764fn chmod_safe_subtrees(state_dir: &std::path::Path) {
765 let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
767
768 let state_file = state_dir.join("state.toml");
770 if state_file.exists() {
771 let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
772 }
773
774 for subdir_name in &["sock", "logs"] {
776 let subdir = state_dir.join(subdir_name);
777 if subdir.is_dir() {
778 chmod_recursive(&subdir);
779 }
780 }
781}
782
/// Recursively sets directories to `0755` and all other entries to `0644`,
/// starting at `dir`.
///
/// Symbolic links inside the tree are skipped: neither followed nor chmodded.
/// `fs::set_permissions` and `Path::is_dir` both follow links, so a planted
/// link would otherwise make this (possibly root-run) pass change permissions
/// on arbitrary files or recurse into arbitrary directories.
/// `DirEntry::file_type` does not follow links. Best-effort: errors are
/// ignored.
#[cfg(unix)]
fn chmod_recursive(dir: &std::path::Path) {
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(0o755));
    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for entry in entries.flatten() {
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        if file_type.is_symlink() {
            continue;
        }
        let path = entry.path();
        if file_type.is_dir() {
            chmod_recursive(&path);
        } else {
            let _ = fs::set_permissions(&path, fs::Permissions::from_mode(0o644));
        }
    }
}