Skip to main content

pitchfork_cli/supervisor/
mod.rs

1//! Supervisor module - daemon process supervisor
2//!
3//! This module is split into focused submodules:
4//! - `state`: State access layer (get/set operations)
5//! - `lifecycle`: Daemon start/stop operations
6//! - `autostop`: Autostop logic and boot daemon startup
7//! - `retry`: Retry logic with backoff
8//! - `watchers`: Background tasks (interval, cron, file watching)
9//! - `ipc_handlers`: IPC request dispatch
10
11mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15#[cfg(unix)]
16mod pty;
17mod retry;
18mod state;
19mod watchers;
20
21use crate::daemon_id::DaemonId;
22use crate::daemon_status::DaemonStatus;
23use crate::deps::compute_reverse_stop_order;
24use crate::ipc::server::{IpcServer, IpcServerHandle};
25
26use crate::procs::PROCS;
27use crate::settings::settings;
28use crate::state_file::StateFile;
29use crate::{Result, env};
30use duct::cmd;
31use miette::IntoDiagnostic;
32use once_cell::sync::Lazy;
33use std::collections::HashMap;
34#[cfg(unix)]
35use std::collections::HashSet;
36use std::fs;
37#[cfg(unix)]
38use std::os::unix::fs::PermissionsExt;
39use std::process::exit;
40use std::sync::atomic;
41use std::sync::atomic::{AtomicBool, AtomicU32};
42use std::time::Duration;
43#[cfg(unix)]
44use tokio::signal::unix::SignalKind;
45use tokio::sync::{Mutex, Notify};
46use tokio::task::JoinHandle;
47use tokio::{signal, time};
48
49/// Exit statuses reaped by the container-mode zombie reaper for managed daemon
50/// PIDs. On non-Linux Unix platforms where `waitid(WNOWAIT)` is unavailable,
51/// `waitpid(None, WNOHANG)` may race with Tokio's `child.wait()`. When the
52/// zombie reaper wins, the exit status is stashed here so the monitoring task
53/// in lifecycle.rs can recover it instead of treating the ECHILD as a failure.
54///
55/// On Linux this map is unused because the reaper uses `waitid` with `WNOWAIT`
56/// to peek before reaping, which avoids the race entirely.
57#[cfg(all(unix, not(target_os = "linux")))]
58pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> =
59    Lazy::new(|| Mutex::new(HashMap::new()));
60
61// Re-export types needed by other modules
62pub(crate) use state::UpsertDaemonOpts;
63
64pub struct Supervisor {
65    pub(crate) state_file: Mutex<StateFile>,
66    pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
67    pub(crate) last_refreshed_at: Mutex<time::Instant>,
68    /// Map of daemon ID to scheduled autostop time
69    pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
70    /// Handle for graceful IPC server shutdown
71    pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
72    /// Tracks in-flight hook tasks so shutdown can wait for them to complete
73    pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
74    /// Number of monitoring tasks that are still running (between process exit
75    /// and hook registration completion). Used by `close()` to know when it is
76    /// safe to drain `hook_tasks`.
77    pub(crate) active_monitors: AtomicU32,
78    /// Signalled by each monitoring task after it finishes registering hooks
79    /// (or decides it has nothing to register). `close()` waits on this.
80    pub(crate) monitor_done: Notify,
81    /// Cancellation token for the proxy server — cancelled on shutdown to
82    /// stop accepting new connections and drain in-flight ones.
83    pub(crate) proxy_cancel: Mutex<Option<tokio_util::sync::CancellationToken>>,
84    /// Join handle for the proxy task so shutdown can wait for cleanup.
85    pub(crate) proxy_task: Mutex<Option<JoinHandle<()>>>,
86    /// mDNS publisher for LAN mode (None if LAN mode is disabled).
87    /// Shared with the LAN IP monitor task so it can re-publish on IP change.
88    pub(crate) mdns_publisher:
89        Mutex<Option<std::sync::Arc<tokio::sync::Mutex<crate::proxy::mdns::MdnsPublisher>>>>,
90    /// Join handle for the LAN IP monitor task.
91    pub(crate) lan_monitor_task: Mutex<Option<JoinHandle<()>>>,
92}
93
94pub(crate) fn interval_duration() -> Duration {
95    settings().general_interval()
96}
97
98pub static SUPERVISOR: Lazy<Supervisor> =
99    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
100
101pub fn start_if_not_running() -> Result<()> {
102    let sf = StateFile::get();
103    if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
104        && let Some(pid) = d.pid
105    {
106        PROCS.refresh_pids(&[pid]);
107        if PROCS.is_running(pid) {
108            return Ok(());
109        }
110    }
111    start_in_background()
112}
113
114pub fn start_in_background() -> Result<()> {
115    debug!("starting supervisor in background");
116    // Ensure the log directory exists so we can redirect stderr there.
117    // Panics and other fatal errors from the background supervisor process
118    // would otherwise be silently swallowed.
119    let log_file = &*env::PITCHFORK_LOG_FILE;
120    if let Some(parent) = log_file.parent() {
121        let _ = fs::create_dir_all(parent);
122    }
123    let stderr_file = fs::OpenOptions::new()
124        .create(true)
125        .append(true)
126        .open(log_file)
127        .into_diagnostic()?;
128    #[cfg(unix)]
129    fix_state_dir_permissions();
130    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
131        .stdout_null()
132        .stderr_file(stderr_file)
133        .start()
134        .into_diagnostic()?;
135    Ok(())
136}
137
138impl Supervisor {
139    pub fn new() -> Result<Self> {
140        Ok(Self {
141            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
142            last_refreshed_at: Mutex::new(time::Instant::now()),
143            pending_notifications: Mutex::new(vec![]),
144            pending_autostops: Mutex::new(HashMap::new()),
145            ipc_shutdown: Mutex::new(None),
146            hook_tasks: Mutex::new(Vec::new()),
147            active_monitors: AtomicU32::new(0),
148            monitor_done: Notify::new(),
149            proxy_cancel: Mutex::new(None),
150            proxy_task: Mutex::new(None),
151            mdns_publisher: Mutex::new(None),
152            lan_monitor_task: Mutex::new(None),
153        })
154    }
155
156    pub async fn start(
157        &self,
158        is_boot: bool,
159        container: bool,
160        web_port: Option<u16>,
161        web_path: Option<String>,
162    ) -> Result<()> {
163        // Ensure the state directory and its contents are accessible by non-root
164        // users. This is needed when the supervisor is started with `sudo` — all
165        // files it creates are owned by root, which prevents normal CLI clients
166        // from reading/writing state or connecting to the IPC socket.
167        #[cfg(unix)]
168        fix_state_dir_permissions();
169
170        let pid = std::process::id();
171        // Ensure PROCS has data for the supervisor PID before upsert_daemon reads title()
172        PROCS.refresh_pids(&[pid]);
173        // Determine container mode: CLI flag takes priority, then settings
174        let container_mode = container || settings().supervisor.container;
175        if container_mode {
176            info!("Starting supervisor in container/PID1 mode with pid {pid}");
177        } else {
178            info!("Starting supervisor with pid {pid}");
179        }
180
181        self.upsert_daemon(
182            UpsertDaemonOpts::builder(DaemonId::pitchfork())
183                .set(|o| {
184                    o.pid = Some(pid);
185                    o.status = DaemonStatus::Running;
186                })
187                .build(),
188        )
189        .await?;
190        #[cfg(unix)]
191        fix_state_dir_permissions();
192
193        // If this is a boot start, automatically start boot_start daemons
194        if is_boot {
195            info!("Boot start mode enabled, starting boot_start daemons");
196            self.start_boot_daemons().await?;
197        }
198
199        self.interval_watch()?;
200        self.cron_watch()?;
201        self.signals()?;
202        self.daemon_file_watch()?;
203
204        // In container mode, install SIGCHLD handler to reap orphaned/zombie processes
205        #[cfg(unix)]
206        if container_mode {
207            self.reap_zombies()?;
208        }
209
210        // Start web server: CLI --web-port takes priority, then settings.web.auto_start + bind_port
211        let s = settings();
212        let effective_port = web_port.or_else(|| {
213            if s.web.auto_start {
214                match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
215                    Some(p) => Some(p),
216                    None => {
217                        error!(
218                            "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
219                            s.web.bind_port
220                        );
221                        None
222                    }
223                }
224            } else {
225                None
226            }
227        });
228        // CLI --web-path takes priority, then settings.web.base_path
229        let effective_path = web_path.or_else(|| {
230            let bp = s.web.base_path.clone();
231            if bp.is_empty() { None } else { Some(bp) }
232        });
233        if let Some(port) = effective_port {
234            tokio::spawn(async move {
235                if let Err(e) = crate::web::serve(port, effective_path).await {
236                    error!("Web server error: {e}");
237                }
238            });
239        }
240
241        // Start reverse proxy server if enabled
242        if s.proxy.enable {
243            // Pre-generate the TLS certificate synchronously before spawning the proxy
244            // task. This ensures the cert exists immediately after `sup start` returns,
245            // so `proxy trust` can be run right away without waiting for the async task.
246            #[cfg(feature = "proxy-tls")]
247            if s.proxy.https {
248                let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
249                let ca_cert_path = proxy_dir.join("ca.pem");
250                let ca_key_path = proxy_dir.join("ca-key.pem");
251                if !ca_cert_path.exists() || !ca_key_path.exists() {
252                    match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
253                        Ok(()) => {
254                            info!(
255                                "Generated local CA certificate at {}",
256                                ca_cert_path.display()
257                            );
258                        }
259                        Err(e) => {
260                            error!("Failed to generate CA certificate: {e}");
261                        }
262                    }
263                }
264
265                // Auto-trust: attempt to install the CA certificate into the
266                // system trust store. May fail silently due to permissions;
267                // user can run `pitchfork proxy trust` manually.
268                if s.proxy.auto_trust && ca_cert_path.exists() {
269                    use crate::proxy::trust::{AutoTrustResult, auto_trust};
270                    match auto_trust(&ca_cert_path) {
271                        AutoTrustResult::AlreadyTrusted => {}
272                        AutoTrustResult::Trusted => {
273                            info!("CA certificate auto-trusted in system store");
274                        }
275                        AutoTrustResult::NotTrusted { reason } => {
276                            warn!("Auto-trust skipped: {reason}");
277                            warn!("Run `pitchfork proxy trust` to install manually");
278                        }
279                    }
280                }
281            }
282            // Spawn the proxy server and wait for its bind result via a oneshot
283            // channel.  This avoids the TOCTOU race of a pre-flight bind check
284            // while still surfacing binding failures immediately.
285            let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
286            let proxy_cancel = tokio_util::sync::CancellationToken::new();
287            let proxy_cancel_clone = proxy_cancel.clone();
288            *self.proxy_cancel.lock().await = Some(proxy_cancel);
289            let proxy_task = tokio::spawn(async move {
290                if let Err(e) = crate::proxy::server::serve(bind_tx, proxy_cancel_clone).await {
291                    error!("Proxy server error: {e}");
292                }
293            });
294            *self.proxy_task.lock().await = Some(proxy_task);
295            match bind_rx.await {
296                Ok(Ok(())) => {
297                    // Proxy bound successfully — start mDNS if LAN mode is enabled.
298                    self.start_mdns().await;
299                }
300                Ok(Err(msg)) => {
301                    error!("{msg}");
302                    self.add_notification(log::LevelFilter::Error, msg).await;
303                }
304                Err(_) => {
305                    // Sender dropped without sending — serve() panicked or
306                    // returned before signalling.  Already logged by the
307                    // spawn error handler above.
308                }
309            }
310        }
311
312        let (ipc, ipc_handle) = IpcServer::new()?;
313        *self.ipc_shutdown.lock().await = Some(ipc_handle);
314        self.conn_watch(ipc).await
315    }
316
317    /// Start mDNS publishing for LAN mode (called after the proxy binds successfully).
318    async fn start_mdns(&self) {
319        let s = crate::settings::settings();
320        let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
321        if !s.proxy.enable || !lan_enabled {
322            return;
323        }
324
325        let lan_ip = if !s.proxy.lan_ip.is_empty() {
326            match s.proxy.lan_ip.parse::<std::net::Ipv4Addr>() {
327                Ok(ip) => Some(ip),
328                Err(e) => {
329                    error!(
330                        "proxy.lan_ip {:?} is not a valid IPv4 address: {e}",
331                        s.proxy.lan_ip
332                    );
333                    return;
334                }
335            }
336        } else {
337            match crate::proxy::lan_ip::detect_lan_ip().await {
338                Some(ip) => Some(ip),
339                None => {
340                    error!(
341                        "LAN mode is enabled but no LAN IP address could be detected. \
342                         Set proxy.lan_ip to a specific address, or ensure you are connected to a network."
343                    );
344                    return;
345                }
346            }
347        };
348
349        let Some(lan_ip) = lan_ip else { return };
350        let port = u16::try_from(s.proxy.port).unwrap_or(443);
351
352        let Some(mut publisher) = crate::proxy::mdns::MdnsPublisher::new(lan_ip) else {
353            error!("Failed to start mDNS publisher. Is Avahi (Linux) or Bonjour (macOS) running?");
354            return;
355        };
356
357        // Publish all registered slugs.
358        let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
359        for slug in slugs.keys() {
360            let hostname = format!("{slug}.local");
361            publisher.publish(&hostname, port);
362        }
363
364        log::info!(
365            "LAN mode: mDNS publishing on {lan_ip}, {} slug(s) registered",
366            slugs.len()
367        );
368
369        let publisher = std::sync::Arc::new(tokio::sync::Mutex::new(publisher));
370
371        // Start the IP monitor (only when IP is auto-detected, not pinned).
372        let ip_pinned = !s.proxy.lan_ip.is_empty();
373        if !ip_pinned {
374            let monitor_cancel = self.proxy_cancel.lock().await.clone();
375            let publisher_clone = publisher.clone();
376            let task = tokio::spawn(async move {
377                let mut last_ip = lan_ip;
378                let interval = std::time::Duration::from_secs(5);
379                let mut ticker = tokio::time::interval(interval);
380                ticker.tick().await; // first tick is immediate
381                loop {
382                    ticker.tick().await;
383                    if let Some(cancel) = monitor_cancel.as_ref() {
384                        if cancel.is_cancelled() {
385                            break;
386                        }
387                    }
388                    if let Some(new_ip) =
389                        crate::proxy::lan_ip::detect_lan_ip_if_changed(last_ip).await
390                    {
391                        log::info!("LAN IP changed: {last_ip} → {new_ip}");
392                        last_ip = new_ip;
393                        let mut pub_guard = publisher_clone.lock().await;
394                        pub_guard.republish_all(new_ip, port);
395                    }
396                }
397            });
398            *self.lan_monitor_task.lock().await = Some(task);
399        }
400
401        *self.mdns_publisher.lock().await = Some(publisher);
402    }
403
404    /// Re-read slugs from config and update mDNS records.
405    ///
406    /// Publishes new slugs and unpublishes removed ones. Called via IPC when
407    /// `proxy add` or `proxy remove` modifies the slug registry.
408    async fn sync_mdns(&self) {
409        // Clone the Arc and release the outer lock immediately so we don't
410        // block close() from taking the publisher during shutdown.
411        let publisher = {
412            let guard = self.mdns_publisher.lock().await;
413            match guard.as_ref() {
414                Some(p) => p.clone(),
415                None => {
416                    debug!("sync_mdns: mDNS publisher not active, skipping");
417                    return;
418                }
419            }
420        };
421
422        let s = crate::settings::settings();
423        let port = u16::try_from(s.proxy.port).unwrap_or(443);
424
425        let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
426        let mut pub_guard = publisher.lock().await;
427
428        // Unpublish slugs that no longer exist in config.
429        let current_keys: Vec<&String> = slugs.keys().collect();
430        let registered: Vec<String> = pub_guard.registered_hostnames();
431        for hostname in &registered {
432            // hostname is "slug.local" — extract slug part.
433            let slug = hostname.strip_suffix(".local").unwrap_or(hostname);
434            if !current_keys.iter().any(|k| k.as_str() == slug) {
435                log::info!("mDNS: unpublishing removed slug {slug}");
436                pub_guard.unpublish(hostname);
437            }
438        }
439
440        // Publish new slugs that aren't yet registered.
441        for slug in slugs.keys() {
442            let hostname = format!("{slug}.local");
443            if !pub_guard.is_published(&hostname) {
444                log::info!("mDNS: publishing new slug {slug}");
445                pub_guard.publish(&hostname, port);
446            }
447        }
448    }
449
450    pub(crate) async fn refresh(&self) -> Result<()> {
451        trace!("refreshing");
452
453        // Collect PIDs we need to check (shell PIDs only)
454        // This is more efficient than refreshing all processes on the system
455        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
456        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
457
458        if pids_to_check.is_empty() {
459            // No PIDs to check, skip the expensive refresh
460            trace!("no shell PIDs to check, skipping process refresh");
461        } else {
462            PROCS.refresh_pids(&pids_to_check);
463        }
464
465        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
466        *last_refreshed_at = time::Instant::now();
467
468        for (dir, pids) in dirs_with_pids {
469            let to_remove = pids
470                .iter()
471                .filter(|pid| !PROCS.is_running(**pid))
472                .collect::<Vec<_>>();
473            for pid in &to_remove {
474                self.remove_shell_pid(**pid).await?
475            }
476            if to_remove.len() == pids.len() {
477                self.leave_dir(&dir).await?;
478            }
479        }
480
481        self.check_retry().await?;
482        self.process_pending_autostops().await?;
483
484        Ok(())
485    }
486
487    /// Install a SIGCHLD handler that reaps orphaned zombie child processes.
488    ///
489    /// When running as PID 1 inside a container, orphaned processes are
490    /// re-parented to PID 1. Without explicit reaping, they accumulate
491    /// as zombies in the process table indefinitely.
492    ///
493    /// Only reaps processes that are NOT managed by the supervisor (i.e.
494    /// not tracked in the state file). Managed daemon processes are reaped
495    /// by their monitoring tasks via `child.wait()`.
496    ///
497    /// ## Strategy
498    ///
499    /// **Linux**: Uses `waitid(Id::All, WNOHANG | WNOWAIT | WEXITED)` to
500    /// *peek* at the next zombie without consuming its status. If the PID
501    /// belongs to a managed daemon, the reaper skips it so Tokio's
502    /// `child.wait()` can collect the status normally. Only unmanaged
503    /// orphans are actually reaped (via `waitpid(Pid, WNOHANG)`). This
504    /// eliminates the race entirely.
505    ///
506    /// **Non-Linux Unix** (e.g. macOS — mainly for local development;
507    /// container mode targets Linux): `waitid` is unavailable, so we fall
508    /// back to `waitpid(None, WNOHANG)`. If the reaper accidentally
509    /// consumes a managed PID's status, it stashes the exit code in
510    /// [`REAPED_STATUSES`] for the monitoring task to recover.
511    #[cfg(unix)]
512    fn reap_zombies(&self) -> Result<()> {
513        let mut stream = signal::unix::signal(SignalKind::child())
514            .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
515        tokio::spawn(async move {
516            loop {
517                stream.recv().await;
518                // Collect PIDs of managed daemons so we don't steal their exit status
519                let managed_pids: HashSet<u32> = SUPERVISOR
520                    .state_file
521                    .lock()
522                    .await
523                    .daemons
524                    .values()
525                    .filter_map(|d| d.pid)
526                    .collect();
527                // Reap all available zombie children that are NOT managed
528                Self::reap_unmanaged_zombies(&managed_pids).await;
529            }
530        });
531        info!("container mode: SIGCHLD zombie reaper installed");
532        Ok(())
533    }
534
535    /// Linux implementation: peek with `waitid(WNOWAIT)` then selectively reap.
536    ///
537    /// `WNOWAIT` leaves the zombie in the table so we can inspect its PID
538    /// without consuming the exit status. Only if the PID is *not* managed
539    /// do we call `waitpid(Pid, WNOHANG)` to actually reap it.
540    #[cfg(target_os = "linux")]
541    async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
542        use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
543        use nix::unistd::Pid;
544
545        loop {
546            // Peek at the next zombie without consuming it
547            let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
548            match waitid(Id::All, peek_flags) {
549                Ok(WaitStatus::StillAlive) => break,
550                Ok(status) => {
551                    let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
552                        break;
553                    };
554                    if managed_pids.contains(&pid_raw) {
555                        // This is a managed daemon — leave it for Tokio's child.wait().
556                        // We must break out of the loop because waitid(Id::All) would
557                        // keep returning the same zombie if we don't consume it.
558                        trace!(
559                            "zombie reaper: skipping managed daemon pid {pid_raw}, \
560                             leaving for Tokio to reap"
561                        );
562                        break;
563                    }
564                    // Not managed — actually reap it
565                    match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
566                        Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
567                        Err(nix::errno::Errno::ECHILD) => break,
568                        Err(e) => {
569                            trace!("waitpid error reaping pid {pid_raw}: {e}");
570                            break;
571                        }
572                    }
573                }
574                Err(nix::errno::Errno::ECHILD) => break, // no children at all
575                Err(e) => {
576                    trace!("waitid error in zombie reaper: {e}");
577                    break;
578                }
579            }
580        }
581    }
582
583    /// Non-Linux fallback: blind `waitpid(None, WNOHANG)` with stash recovery.
584    ///
585    /// Since `waitid(WNOWAIT)` is not available, we cannot peek. If we
586    /// accidentally reap a managed PID, we stash the exit code in
587    /// [`REAPED_STATUSES`] so the monitoring task can recover it.
588    #[cfg(all(unix, not(target_os = "linux")))]
589    async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
590        use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
591
592        loop {
593            match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
594                Ok(WaitStatus::StillAlive) => break,
595                Ok(status) => {
596                    let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
597                        continue;
598                    };
599                    if managed_pids.contains(&pid) {
600                        // Race lost — stash the exit code for lifecycle recovery
601                        let exit_code = match status {
602                            WaitStatus::Exited(_, code) => code,
603                            WaitStatus::Signaled(_, sig, _) => -(sig as i32),
604                            _ => -1,
605                        };
606                        warn!(
607                            "zombie reaper reaped managed daemon pid {pid} \
608                             (exit_code={exit_code}); stashing status for recovery"
609                        );
610                        REAPED_STATUSES.lock().await.insert(pid, exit_code);
611                    } else {
612                        trace!("reaped orphaned zombie child: {status:?}");
613                    }
614                }
615                Err(nix::errno::Errno::ECHILD) => break, // no more children
616                Err(e) => {
617                    trace!("waitpid error in zombie reaper: {e}");
618                    break;
619                }
620            }
621        }
622    }
623
624    #[cfg(unix)]
625    fn signals(&self) -> Result<()> {
626        let signals = [
627            SignalKind::terminate(),
628            SignalKind::alarm(),
629            SignalKind::interrupt(),
630            SignalKind::quit(),
631            SignalKind::hangup(),
632            SignalKind::user_defined1(),
633            SignalKind::user_defined2(),
634        ];
635        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
636        for signal in signals {
637            let stream = match signal::unix::signal(signal) {
638                Ok(s) => s,
639                Err(e) => {
640                    warn!("Failed to register signal handler for {signal:?}: {e}");
641                    continue;
642                }
643            };
644            tokio::spawn(async move {
645                let mut stream = stream;
646                loop {
647                    stream.recv().await;
648                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
649                        exit(1);
650                    } else {
651                        SUPERVISOR.handle_signal().await;
652                    }
653                }
654            });
655        }
656        Ok(())
657    }
658
659    #[cfg(windows)]
660    fn signals(&self) -> Result<()> {
661        tokio::spawn(async move {
662            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
663            loop {
664                if let Err(e) = signal::ctrl_c().await {
665                    error!("Failed to wait for ctrl-c: {}", e);
666                    return;
667                }
668                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
669                    exit(1);
670                } else {
671                    SUPERVISOR.handle_signal().await;
672                }
673            }
674        });
675        Ok(())
676    }
677
678    async fn handle_signal(&self) {
679        info!("received signal, stopping");
680        self.close().await;
681        exit(0)
682    }
683
684    pub(crate) async fn close(&self) {
685        // Signal the proxy server to stop accepting new connections
686        // and drain in-flight ones, *before* stopping daemons so the
687        // proxy has time to finish forwarding active requests.
688        if let Some(cancel) = self.proxy_cancel.lock().await.take() {
689            cancel.cancel();
690        }
691
692        // Stop the LAN IP monitor task.
693        if let Some(monitor_task) = self.lan_monitor_task.lock().await.take() {
694            monitor_task.abort();
695        }
696
697        // Shutdown the mDNS publisher (sends goodbye packets).
698        if let Some(publisher) = self.mdns_publisher.lock().await.take() {
699            publisher.lock().await.shutdown();
700        }
701
702        if let Some(proxy_task) = self.proxy_task.lock().await.take() {
703            let _ = tokio::time::timeout(Duration::from_secs(12), proxy_task).await;
704        }
705
706        // Clean up /etc/hosts entries managed by pitchfork
707        let s = settings();
708        if s.proxy.enable && s.proxy.sync_hosts {
709            crate::proxy::hosts::clean_hosts_file();
710        }
711
712        let pitchfork_id = DaemonId::pitchfork();
713        let active = self.active_daemons().await;
714        let active_ids: Vec<DaemonId> = active
715            .iter()
716            .filter(|d| d.id != pitchfork_id)
717            .map(|d| d.id.clone())
718            .collect();
719
720        // Stop daemons in reverse dependency order.
721        // If dependency resolution fails (e.g. config changed), fall back to
722        // stopping in arbitrary order so we still shut down cleanly.
723        // Daemons within the same level are stopped concurrently.
724        let stop_levels = compute_reverse_stop_order(&active_ids);
725        for level in &stop_levels {
726            let mut tasks = Vec::new();
727            for id in level {
728                let id = id.clone();
729                tasks.push(tokio::spawn(async move {
730                    if let Err(err) = SUPERVISOR.stop(&id).await {
731                        error!("failed to stop daemon {id}: {err}");
732                    }
733                }));
734            }
735            for task in tasks {
736                let _ = task.await;
737            }
738        }
739        let _ = self.remove_daemon(&pitchfork_id).await;
740
741        // Signal IPC server to shut down gracefully
742        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
743            handle.shutdown();
744        }
745
746        // Wait for all in-flight monitoring tasks to finish registering their
747        // hook handles. Each monitoring task increments `active_monitors` when
748        // its process exits, and decrements it (+ notifies `monitor_done`)
749        // after all fire_hook() calls complete. This replaces the old
750        // yield_now() approach which had a race window.
751        let drain_timeout = time::sleep(Duration::from_secs(5));
752        tokio::pin!(drain_timeout);
753        loop {
754            if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
755                break;
756            }
757            tokio::select! {
758                _ = self.monitor_done.notified() => {}
759                _ = &mut drain_timeout => {
760                    warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
761                    break;
762                }
763            }
764        }
765        let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
766        let hook_timeout = Duration::from_secs(30);
767        for handle in handles {
768            match time::timeout(hook_timeout, handle).await {
769                Ok(_) => {} // Hook completed (success or error, doesn't matter)
770                Err(_) => {
771                    warn!(
772                        "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
773                    );
774                }
775            }
776        }
777
778        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
779    }
780
781    pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
782        self.pending_notifications
783            .lock()
784            .await
785            .push((level, message));
786    }
787}
788
789/// Fix ownership on the state directory so non-root users can access files
790/// created by a `sudo`-started supervisor.
791///
792/// When `[settings.supervisor] user` or `SUDO_UID`/`SUDO_GID` are set, we
793/// `chown` the state directory and safe subdirectories back to that non-root
794/// runtime user. This is strictly better than `chmod 0o666` because it does not
795/// widen the permission bits — the files stay owner-only (0o600/0o700) but the
796/// *owner* is the user that daemon processes and CLI clients need to share.
797///
798/// **Security**: The `proxy/` subtree is intentionally skipped. It contains
799/// `ca-key.pem` which must remain `0o600` and owned by the process that
800/// generated it. Changing its ownership or permissions would expose the CA
801/// private key to other local users.
802///
803/// If neither `user` nor `SUDO_UID`/`SUDO_GID` are available (e.g. direct
804/// root login), we fall back to relaxing permissions on only the `sock/` and
805/// `logs/` subdirectories (plus `state.toml`) so CLI clients can still function.
806#[cfg(unix)]
807fn fix_state_dir_permissions() {
808    let state_dir = &*env::PITCHFORK_STATE_DIR;
809    if let Some((uid, gid)) = state_owner_ids() {
810        if !state_dir.exists()
811            && let Err(err) = fs::create_dir_all(state_dir)
812        {
813            warn!(
814                "failed to create state directory for ownership fix at {}: {err}",
815                state_dir.display()
816            );
817            return;
818        }
819
820        // Best path: chown back to the runtime user. Permissions stay tight.
821        chown_recursive(state_dir, uid, gid, true);
822        debug!(
823            "chowned state directory to uid={uid} gid={gid} at {}",
824            state_dir.display()
825        );
826    } else {
827        if !state_dir.exists() {
828            return;
829        }
830
831        // Fallback: relax permissions on safe subdirectories only.
832        // proxy/ is never touched.
833        chmod_safe_subtrees(state_dir);
834        debug!(
835            "relaxed permissions on safe subtrees at {}",
836            state_dir.display()
837        );
838    }
839}
840
841#[cfg(unix)]
842pub(crate) fn state_owner_ids() -> Option<(u32, u32)> {
843    if !nix::unistd::Uid::effective().is_root() {
844        return None;
845    }
846
847    let user = settings().supervisor.user.trim();
848    if !user.is_empty() {
849        return resolve_supervisor_user_ids(user).or_else(|| {
850            warn!(
851                "failed to resolve supervisor.user '{user}' for state ownership; falling back to SUDO_UID/SUDO_GID"
852            );
853            parse_sudo_ids()
854        });
855    }
856
857    parse_sudo_ids()
858}
859
860#[cfg(unix)]
861fn resolve_supervisor_user_ids(user: &str) -> Option<(u32, u32)> {
862    let user_record = if user.chars().all(|c| c.is_ascii_digit()) {
863        let uid = user.parse::<u32>().ok()?;
864        nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
865            .ok()
866            .flatten()
867    } else {
868        nix::unistd::User::from_name(user).ok().flatten()
869    }?;
870
871    Some((user_record.uid.as_raw(), user_record.gid.as_raw()))
872}
873
874/// Parse `SUDO_UID` and `SUDO_GID` environment variables into numeric IDs.
875///
876/// Returns `None` unless the effective UID is 0 (root). This prevents stale
877/// `SUDO_UID`/`SUDO_GID` values inherited into non-sudo environments from
878/// triggering incorrect `chown` operations.
879#[cfg(unix)]
880fn parse_sudo_ids() -> Option<(u32, u32)> {
881    if !nix::unistd::Uid::effective().is_root() {
882        return None;
883    }
884    let uid: u32 = std::env::var("SUDO_UID").ok()?.parse().ok()?;
885    let gid: u32 = std::env::var("SUDO_GID").ok()?.parse().ok()?;
886    Some((uid, gid))
887}
888
889/// Recursively `chown` a directory tree. If `skip_proxy` is true, the `proxy/`
890/// subdirectory is skipped entirely to protect the CA private key.
891#[cfg(unix)]
892fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
893    // chown the directory itself
894    let _ = chown_path(dir, uid, gid);
895
896    let entries = match std::fs::read_dir(dir) {
897        Ok(e) => e,
898        Err(_) => return,
899    };
900    for entry in entries.flatten() {
901        let path = entry.path();
902        if path.is_dir() {
903            // Skip proxy/ at the top level of the state directory
904            if skip_proxy {
905                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
906                    if name == "proxy" {
907                        continue;
908                    }
909                }
910            }
911            chown_recursive(&path, uid, gid, false);
912        } else {
913            let _ = chown_path(&path, uid, gid);
914        }
915    }
916}
917
/// `chown` a single path to `uid`/`gid`.
///
/// This is `chown(2)` semantics, so a symlink at `path` is followed. Use
/// `std::os::unix::fs::lchown` to change a link itself. Returns an
/// `InvalidInput` error if the path contains an interior NUL byte, and
/// otherwise the OS error reported by the syscall.
#[cfg(unix)]
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    // std's safe wrapper over chown(2). `Some` for both ids means "set it";
    // this replaces a hand-rolled CString conversion plus `unsafe` libc call.
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
932
933/// Fallback: relax permissions on safe subdirectories only (sock/, logs/, and
934/// state.toml). The proxy/ subtree is never touched.
935#[cfg(unix)]
936fn chmod_safe_subtrees(state_dir: &std::path::Path) {
937    // The state directory itself needs to be traversable
938    let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
939
940    // state.toml — needs to be readable by CLI clients
941    let state_file = state_dir.join("state.toml");
942    if state_file.exists() {
943        let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
944    }
945
946    // Safe subdirectories: sock/ and logs/
947    for subdir_name in &["sock", "logs"] {
948        let subdir = state_dir.join(subdir_name);
949        if subdir.is_dir() {
950            chmod_recursive(&subdir);
951        }
952    }
953}
954
/// Recursively relax permissions: directories → `0o755`, other entries →
/// `0o644`.
///
/// `dir` itself is chmodded as given. Symlinks found inside the tree are
/// skipped. Entry types come from `DirEntry::file_type()`, which does not
/// follow links, and `chmod` on a symlink would change the *target*'s mode.
/// Without this, a link such as `logs/x -> /etc/shadow` would make its target
/// world-readable, and a link to a directory would be recursed into.
/// Errors are ignored (best effort).
#[cfg(unix)]
fn chmod_recursive(dir: &std::path::Path) {
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(0o755));

    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for entry in entries.flatten() {
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        if file_type.is_symlink() {
            continue;
        }
        let path = entry.path();
        if file_type.is_dir() {
            chmod_recursive(&path);
        } else {
            let _ = fs::set_permissions(&path, fs::Permissions::from_mode(0o644));
        }
    }
}