Skip to main content

pitchfork_cli/supervisor/
mod.rs

1//! Supervisor module - daemon process supervisor
2//!
3//! This module is split into focused submodules:
4//! - `state`: State access layer (get/set operations)
5//! - `lifecycle`: Daemon start/stop operations
6//! - `autostop`: Autostop logic and boot daemon startup
7//! - `retry`: Retry logic with backoff
8//! - `watchers`: Background tasks (interval, cron, file watching)
9//! - `ipc_handlers`: IPC request dispatch
10
11mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15#[cfg(unix)]
16mod pty;
17mod retry;
18mod state;
19mod watchers;
20
21use crate::daemon_id::DaemonId;
22use crate::daemon_status::DaemonStatus;
23use crate::deps::compute_reverse_stop_order;
24use crate::ipc::server::{IpcServer, IpcServerHandle};
25
26use crate::procs::PROCS;
27use crate::settings::settings;
28use crate::state_file::StateFile;
29use crate::{Result, env};
30use duct::cmd;
31use miette::IntoDiagnostic;
32use once_cell::sync::Lazy;
33use std::collections::HashMap;
34#[cfg(unix)]
35use std::collections::HashSet;
36use std::fs;
37#[cfg(unix)]
38use std::os::unix::fs::PermissionsExt;
39use std::process::exit;
40use std::sync::atomic;
41use std::sync::atomic::{AtomicBool, AtomicU32};
42use std::time::Duration;
43#[cfg(unix)]
44use tokio::signal::unix::SignalKind;
45use tokio::sync::{Mutex, Notify};
46use tokio::task::JoinHandle;
47use tokio::{signal, time};
48
/// Stash of exit statuses, keyed by PID, for managed daemons whose status was
/// consumed by the container-mode zombie reaper instead of their monitor.
///
/// Non-Linux Unix platforms have no `waitid(WNOWAIT)`, so the reaper's
/// `waitpid(None, WNOHANG)` can race with Tokio's `child.wait()`. Whenever the
/// reaper wins that race it records the exit code here, allowing the
/// monitoring task in lifecycle.rs to recover it rather than reporting the
/// resulting ECHILD as a failure.
///
/// Linux never populates this map: there the reaper peeks with `waitid` +
/// `WNOWAIT` before reaping, so the race cannot occur.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> = Lazy::new(Mutex::default);
60
61// Re-export types needed by other modules
62pub(crate) use state::UpsertDaemonOpts;
63
/// Shared state of the daemon supervisor.
///
/// A single instance is created by [`Supervisor::new`]; every field is
/// wrapped in a mutex, atomic or `Notify` so it can be used through `&self`.
pub struct Supervisor {
    /// State file handle, created from `env::PITCHFORK_STATE_FILE`.
    pub(crate) state_file: Mutex<StateFile>,
    /// Queued `(level, message)` notifications.
    /// NOTE(review): presumably filled by `add_notification` and drained by
    /// IPC clients — confirm in the state/ipc submodules.
    pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    /// Instant of the most recent `refresh()` run (initialised at construction).
    pub(crate) last_refreshed_at: Mutex<time::Instant>,
    /// Map of daemon ID to scheduled autostop time
    pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
    /// Handle for graceful IPC server shutdown
    pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
    /// Tracks in-flight hook tasks so shutdown can wait for them to complete
    pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Number of monitoring tasks that are still running (between process exit
    /// and hook registration completion). Used by `close()` to know when it is
    /// safe to drain `hook_tasks`.
    pub(crate) active_monitors: AtomicU32,
    /// Signalled by each monitoring task after it finishes registering hooks
    /// (or decides it has nothing to register). `close()` waits on this.
    pub(crate) monitor_done: Notify,
    /// Cancellation token for the proxy server — cancelled on shutdown to
    /// stop accepting new connections and drain in-flight ones.
    pub(crate) proxy_cancel: Mutex<Option<tokio_util::sync::CancellationToken>>,
    /// Join handle for the proxy task so shutdown can wait for cleanup.
    pub(crate) proxy_task: Mutex<Option<JoinHandle<()>>>,
    /// mDNS publisher for LAN mode (None if LAN mode is disabled).
    /// Shared with the LAN IP monitor task so it can re-publish on IP change.
    pub(crate) mdns_publisher:
        Mutex<Option<std::sync::Arc<tokio::sync::Mutex<crate::proxy::mdns::MdnsPublisher>>>>,
    /// Join handle for the LAN IP monitor task.
    pub(crate) lan_monitor_task: Mutex<Option<JoinHandle<()>>>,
    /// Cancellation token for the background state flush task.
    /// Uses a `std::sync::Mutex` (not the Tokio one) because it is set from the
    /// synchronous `start_state_flush_task` and never held across an `.await`.
    pub(crate) flush_cancel: std::sync::Mutex<Option<tokio_util::sync::CancellationToken>>,
}
95
96pub(crate) fn interval_duration() -> Duration {
97    settings().general_interval()
98}
99
100pub static SUPERVISOR: Lazy<Supervisor> =
101    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
102
103pub fn start_if_not_running() -> Result<()> {
104    let sf = StateFile::get();
105    if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
106        && let Some(pid) = d.pid
107    {
108        PROCS.refresh_pids(&[pid]);
109        if PROCS.is_running(pid) {
110            return Ok(());
111        }
112    }
113    start_in_background()
114}
115
116pub fn start_in_background() -> Result<()> {
117    debug!("starting supervisor in background");
118    // Ensure the log directory exists so we can redirect stderr there.
119    // Panics and other fatal errors from the background supervisor process
120    // would otherwise be silently swallowed.
121    let log_file = &*env::PITCHFORK_LOG_FILE;
122    if let Some(parent) = log_file.parent() {
123        let _ = fs::create_dir_all(parent);
124    }
125    let stderr_file = fs::OpenOptions::new()
126        .create(true)
127        .append(true)
128        .open(log_file)
129        .into_diagnostic()?;
130    #[cfg(unix)]
131    fix_state_dir_permissions();
132    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
133        .stdout_null()
134        .stderr_file(stderr_file)
135        .start()
136        .into_diagnostic()?;
137    Ok(())
138}
139
140impl Supervisor {
141    pub fn new() -> Result<Self> {
142        Ok(Self {
143            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
144            last_refreshed_at: Mutex::new(time::Instant::now()),
145            pending_notifications: Mutex::new(vec![]),
146            pending_autostops: Mutex::new(HashMap::new()),
147            ipc_shutdown: Mutex::new(None),
148            hook_tasks: Mutex::new(Vec::new()),
149            active_monitors: AtomicU32::new(0),
150            monitor_done: Notify::new(),
151            proxy_cancel: Mutex::new(None),
152            proxy_task: Mutex::new(None),
153            mdns_publisher: Mutex::new(None),
154            lan_monitor_task: Mutex::new(None),
155            flush_cancel: std::sync::Mutex::new(None),
156        })
157    }
158
159    pub async fn start(
160        &self,
161        is_boot: bool,
162        container: bool,
163        web_port: Option<u16>,
164        web_path: Option<String>,
165    ) -> Result<()> {
166        // Ensure the state directory and its contents are accessible by non-root
167        // users. This is needed when the supervisor is started with `sudo` — all
168        // files it creates are owned by root, which prevents normal CLI clients
169        // from reading/writing state or connecting to the IPC socket.
170        #[cfg(unix)]
171        fix_state_dir_permissions();
172
173        let pid = std::process::id();
174        // Ensure PROCS has data for the supervisor PID before upsert_daemon reads title()
175        PROCS.refresh_pids(&[pid]);
176        // Determine container mode: CLI flag takes priority, then settings
177        let container_mode = container || settings().supervisor.container;
178        if container_mode {
179            info!("Starting supervisor in container/PID1 mode with pid {pid}");
180        } else {
181            info!("Starting supervisor with pid {pid}");
182        }
183
184        self.upsert_daemon(
185            UpsertDaemonOpts::builder(DaemonId::pitchfork())
186                .set(|o| {
187                    o.pid = Some(pid);
188                    o.status = DaemonStatus::Running;
189                })
190                .build(),
191        )
192        .await?;
193        #[cfg(unix)]
194        fix_state_dir_permissions();
195
196        // If this is a boot start, automatically start boot_start daemons
197        if is_boot {
198            info!("Boot start mode enabled, starting boot_start daemons");
199            self.start_boot_daemons().await?;
200        }
201
202        self.interval_watch()?;
203        self.cron_watch()?;
204        self.signals()?;
205        self.daemon_file_watch()?;
206
207        // In container mode, install SIGCHLD handler to reap orphaned/zombie processes
208        #[cfg(unix)]
209        if container_mode {
210            self.reap_zombies()?;
211        }
212
213        // Start web server: CLI --web-port takes priority, then settings.web.auto_start + bind_port
214        let s = settings();
215        let effective_port = web_port.or_else(|| {
216            if s.web.auto_start {
217                match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
218                    Some(p) => Some(p),
219                    None => {
220                        error!(
221                            "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
222                            s.web.bind_port
223                        );
224                        None
225                    }
226                }
227            } else {
228                None
229            }
230        });
231        // CLI --web-path takes priority, then settings.web.base_path
232        let effective_path = web_path.or_else(|| {
233            let bp = s.web.base_path.clone();
234            if bp.is_empty() { None } else { Some(bp) }
235        });
236        if let Some(port) = effective_port {
237            tokio::spawn(async move {
238                if let Err(e) = crate::web::serve(port, effective_path).await {
239                    error!("Web server error: {e}");
240                }
241            });
242        }
243
244        // Start reverse proxy server if enabled
245        if s.proxy.enable {
246            // Pre-generate the TLS certificate synchronously before spawning the proxy
247            // task. This ensures the cert exists immediately after `sup start` returns,
248            // so `proxy trust` can be run right away without waiting for the async task.
249            #[cfg(feature = "proxy-tls")]
250            if s.proxy.https {
251                let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
252                let ca_cert_path = proxy_dir.join("ca.pem");
253                let ca_key_path = proxy_dir.join("ca-key.pem");
254                if !ca_cert_path.exists() || !ca_key_path.exists() {
255                    match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
256                        Ok(()) => {
257                            info!(
258                                "Generated local CA certificate at {}",
259                                ca_cert_path.display()
260                            );
261                        }
262                        Err(e) => {
263                            error!("Failed to generate CA certificate: {e}");
264                        }
265                    }
266                }
267
268                // Auto-trust: attempt to install the CA certificate into the
269                // system trust store. May fail silently due to permissions;
270                // user can run `pitchfork proxy trust` manually.
271                if s.proxy.auto_trust && ca_cert_path.exists() {
272                    use crate::proxy::trust::{AutoTrustResult, auto_trust};
273                    match auto_trust(&ca_cert_path) {
274                        AutoTrustResult::AlreadyTrusted => {}
275                        AutoTrustResult::Trusted => {
276                            info!("CA certificate auto-trusted in system store");
277                        }
278                        AutoTrustResult::NotTrusted { reason } => {
279                            warn!("Auto-trust skipped: {reason}");
280                            warn!("Run `pitchfork proxy trust` to install manually");
281                        }
282                    }
283                }
284            }
285            // Spawn the proxy server and wait for its bind result via a oneshot
286            // channel.  This avoids the TOCTOU race of a pre-flight bind check
287            // while still surfacing binding failures immediately.
288            let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
289            let proxy_cancel = tokio_util::sync::CancellationToken::new();
290            let proxy_cancel_clone = proxy_cancel.clone();
291            *self.proxy_cancel.lock().await = Some(proxy_cancel);
292            let proxy_task = tokio::spawn(async move {
293                if let Err(e) = crate::proxy::server::serve(bind_tx, proxy_cancel_clone).await {
294                    error!("Proxy server error: {e}");
295                }
296            });
297            *self.proxy_task.lock().await = Some(proxy_task);
298            match bind_rx.await {
299                Ok(Ok(())) => {
300                    // Proxy bound successfully — start mDNS if LAN mode is enabled.
301                    self.start_mdns().await;
302                }
303                Ok(Err(msg)) => {
304                    error!("{msg}");
305                    self.add_notification(log::LevelFilter::Error, msg).await;
306                }
307                Err(_) => {
308                    // Sender dropped without sending — serve() panicked or
309                    // returned before signalling.  Already logged by the
310                    // spawn error handler above.
311                }
312            }
313        }
314
315        let (ipc, ipc_handle) = IpcServer::new()?;
316        *self.ipc_shutdown.lock().await = Some(ipc_handle);
317        self.start_state_flush_task();
318        self.conn_watch(ipc).await
319    }
320
321    /// Start mDNS publishing for LAN mode (called after the proxy binds successfully).
322    async fn start_mdns(&self) {
323        let s = crate::settings::settings();
324        let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
325        if !s.proxy.enable || !lan_enabled {
326            return;
327        }
328
329        let lan_ip = if !s.proxy.lan_ip.is_empty() {
330            match s.proxy.lan_ip.parse::<std::net::Ipv4Addr>() {
331                Ok(ip) => Some(ip),
332                Err(e) => {
333                    error!(
334                        "proxy.lan_ip {:?} is not a valid IPv4 address: {e}",
335                        s.proxy.lan_ip
336                    );
337                    return;
338                }
339            }
340        } else {
341            match crate::proxy::lan_ip::detect_lan_ip().await {
342                Some(ip) => Some(ip),
343                None => {
344                    error!(
345                        "LAN mode is enabled but no LAN IP address could be detected. \
346                         Set proxy.lan_ip to a specific address, or ensure you are connected to a network."
347                    );
348                    return;
349                }
350            }
351        };
352
353        let Some(lan_ip) = lan_ip else { return };
354        let port = u16::try_from(s.proxy.port).unwrap_or(443);
355
356        let Some(mut publisher) = crate::proxy::mdns::MdnsPublisher::new(lan_ip) else {
357            error!("Failed to start mDNS publisher. Is Avahi (Linux) or Bonjour (macOS) running?");
358            return;
359        };
360
361        // Publish all registered slugs.
362        let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
363        for slug in slugs.keys() {
364            let hostname = format!("{slug}.local");
365            publisher.publish(&hostname, port);
366        }
367
368        log::info!(
369            "LAN mode: mDNS publishing on {lan_ip}, {} slug(s) registered",
370            slugs.len()
371        );
372
373        let publisher = std::sync::Arc::new(tokio::sync::Mutex::new(publisher));
374
375        // Start the IP monitor (only when IP is auto-detected, not pinned).
376        let ip_pinned = !s.proxy.lan_ip.is_empty();
377        if !ip_pinned {
378            let monitor_cancel = self.proxy_cancel.lock().await.clone();
379            let publisher_clone = publisher.clone();
380            let task = tokio::spawn(async move {
381                let mut last_ip = lan_ip;
382                let interval = std::time::Duration::from_secs(5);
383                let mut ticker = tokio::time::interval(interval);
384                ticker.tick().await; // first tick is immediate
385                loop {
386                    ticker.tick().await;
387                    if let Some(cancel) = monitor_cancel.as_ref() {
388                        if cancel.is_cancelled() {
389                            break;
390                        }
391                    }
392                    if let Some(new_ip) =
393                        crate::proxy::lan_ip::detect_lan_ip_if_changed(last_ip).await
394                    {
395                        log::info!("LAN IP changed: {last_ip} → {new_ip}");
396                        last_ip = new_ip;
397                        let mut pub_guard = publisher_clone.lock().await;
398                        pub_guard.republish_all(new_ip, port);
399                    }
400                }
401            });
402            *self.lan_monitor_task.lock().await = Some(task);
403        }
404
405        *self.mdns_publisher.lock().await = Some(publisher);
406    }
407
408    /// Re-read slugs from config and update mDNS records.
409    ///
410    /// Publishes new slugs and unpublishes removed ones. Called via IPC when
411    /// `proxy add` or `proxy remove` modifies the slug registry.
412    async fn sync_mdns(&self) {
413        // Clone the Arc and release the outer lock immediately so we don't
414        // block close() from taking the publisher during shutdown.
415        let publisher = {
416            let guard = self.mdns_publisher.lock().await;
417            match guard.as_ref() {
418                Some(p) => p.clone(),
419                None => {
420                    debug!("sync_mdns: mDNS publisher not active, skipping");
421                    return;
422                }
423            }
424        };
425
426        let s = crate::settings::settings();
427        let port = u16::try_from(s.proxy.port).unwrap_or(443);
428
429        let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
430        let mut pub_guard = publisher.lock().await;
431
432        // Unpublish slugs that no longer exist in config.
433        let current_keys: Vec<&String> = slugs.keys().collect();
434        let registered: Vec<String> = pub_guard.registered_hostnames();
435        for hostname in &registered {
436            // hostname is "slug.local" — extract slug part.
437            let slug = hostname.strip_suffix(".local").unwrap_or(hostname);
438            if !current_keys.iter().any(|k| k.as_str() == slug) {
439                log::info!("mDNS: unpublishing removed slug {slug}");
440                pub_guard.unpublish(hostname);
441            }
442        }
443
444        // Publish new slugs that aren't yet registered.
445        for slug in slugs.keys() {
446            let hostname = format!("{slug}.local");
447            if !pub_guard.is_published(&hostname) {
448                log::info!("mDNS: publishing new slug {slug}");
449                pub_guard.publish(&hostname, port);
450            }
451        }
452    }
453
454    /// Spawn a background task that periodically flushes the state file to
455    /// disk if it has been marked dirty.  Uses debouncing (1s interval) to
456    /// batch rapid state changes.
457    fn start_state_flush_task(&self) {
458        let cancel = tokio_util::sync::CancellationToken::new();
459        *self.flush_cancel.lock().unwrap() = Some(cancel.clone());
460        tokio::spawn(async move {
461            let mut interval = time::interval(Duration::from_secs(1));
462            interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
463            loop {
464                tokio::select! {
465                    _ = interval.tick() => {}
466                    _ = cancel.cancelled() => {
467                        debug!("state flush task received shutdown signal");
468                        break;
469                    }
470                }
471                let state = SUPERVISOR.state_file.lock().await;
472                if state.is_dirty() {
473                    if let Err(e) = state.write() {
474                        warn!("failed to flush state file: {e}");
475                    }
476                }
477            }
478            debug!("state flush task exiting");
479        });
480    }
481
482    pub(crate) async fn flush_state(&self) {
483        let state = self.state_file.lock().await;
484        if state.is_dirty() {
485            if let Err(e) = state.write() {
486                warn!("failed to flush state file: {e}");
487            }
488        }
489    }
490
491    pub(crate) async fn refresh(&self) -> Result<()> {
492        trace!("refreshing");
493
494        // Collect PIDs we need to check (shell PIDs only)
495        // This is more efficient than refreshing all processes on the system
496        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
497        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
498
499        if pids_to_check.is_empty() {
500            // No PIDs to check, skip the expensive refresh
501            trace!("no shell PIDs to check, skipping process refresh");
502        } else {
503            PROCS.refresh_pids(&pids_to_check);
504        }
505
506        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
507        *last_refreshed_at = time::Instant::now();
508
509        for (dir, pids) in dirs_with_pids {
510            let to_remove = pids
511                .iter()
512                .filter(|pid| !PROCS.is_running(**pid))
513                .collect::<Vec<_>>();
514            for pid in &to_remove {
515                self.remove_shell_pid(**pid).await?
516            }
517            if to_remove.len() == pids.len() {
518                self.leave_dir(&dir).await?;
519            }
520        }
521
522        self.check_retry().await?;
523        self.process_pending_autostops().await?;
524
525        Ok(())
526    }
527
528    /// Install a SIGCHLD handler that reaps orphaned zombie child processes.
529    ///
530    /// When running as PID 1 inside a container, orphaned processes are
531    /// re-parented to PID 1. Without explicit reaping, they accumulate
532    /// as zombies in the process table indefinitely.
533    ///
534    /// Only reaps processes that are NOT managed by the supervisor (i.e.
535    /// not tracked in the state file). Managed daemon processes are reaped
536    /// by their monitoring tasks via `child.wait()`.
537    ///
538    /// ## Strategy
539    ///
540    /// **Linux**: Uses `waitid(Id::All, WNOHANG | WNOWAIT | WEXITED)` to
541    /// *peek* at the next zombie without consuming its status. If the PID
542    /// belongs to a managed daemon, the reaper skips it so Tokio's
543    /// `child.wait()` can collect the status normally. Only unmanaged
544    /// orphans are actually reaped (via `waitpid(Pid, WNOHANG)`). This
545    /// eliminates the race entirely.
546    ///
547    /// **Non-Linux Unix** (e.g. macOS — mainly for local development;
548    /// container mode targets Linux): `waitid` is unavailable, so we fall
549    /// back to `waitpid(None, WNOHANG)`. If the reaper accidentally
550    /// consumes a managed PID's status, it stashes the exit code in
551    /// [`REAPED_STATUSES`] for the monitoring task to recover.
552    #[cfg(unix)]
553    fn reap_zombies(&self) -> Result<()> {
554        let mut stream = signal::unix::signal(SignalKind::child())
555            .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
556        tokio::spawn(async move {
557            loop {
558                stream.recv().await;
559                // Collect PIDs of managed daemons so we don't steal their exit status
560                let managed_pids: HashSet<u32> = SUPERVISOR
561                    .state_file
562                    .lock()
563                    .await
564                    .daemons
565                    .values()
566                    .filter_map(|d| d.pid)
567                    .collect();
568                // Reap all available zombie children that are NOT managed
569                Self::reap_unmanaged_zombies(&managed_pids).await;
570            }
571        });
572        info!("container mode: SIGCHLD zombie reaper installed");
573        Ok(())
574    }
575
576    /// Linux implementation: peek with `waitid(WNOWAIT)` then selectively reap.
577    ///
578    /// `WNOWAIT` leaves the zombie in the table so we can inspect its PID
579    /// without consuming the exit status. Only if the PID is *not* managed
580    /// do we call `waitpid(Pid, WNOHANG)` to actually reap it.
581    #[cfg(target_os = "linux")]
582    async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
583        use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
584        use nix::unistd::Pid;
585
586        loop {
587            // Peek at the next zombie without consuming it
588            let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
589            match waitid(Id::All, peek_flags) {
590                Ok(WaitStatus::StillAlive) => break,
591                Ok(status) => {
592                    let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
593                        break;
594                    };
595                    if managed_pids.contains(&pid_raw) {
596                        // This is a managed daemon — leave it for Tokio's child.wait().
597                        // We must break out of the loop because waitid(Id::All) would
598                        // keep returning the same zombie if we don't consume it.
599                        trace!(
600                            "zombie reaper: skipping managed daemon pid {pid_raw}, \
601                             leaving for Tokio to reap"
602                        );
603                        break;
604                    }
605                    // Not managed — actually reap it
606                    match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
607                        Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
608                        Err(nix::errno::Errno::ECHILD) => break,
609                        Err(e) => {
610                            trace!("waitpid error reaping pid {pid_raw}: {e}");
611                            break;
612                        }
613                    }
614                }
615                Err(nix::errno::Errno::ECHILD) => break, // no children at all
616                Err(e) => {
617                    trace!("waitid error in zombie reaper: {e}");
618                    break;
619                }
620            }
621        }
622    }
623
624    /// Non-Linux fallback: blind `waitpid(None, WNOHANG)` with stash recovery.
625    ///
626    /// Since `waitid(WNOWAIT)` is not available, we cannot peek. If we
627    /// accidentally reap a managed PID, we stash the exit code in
628    /// [`REAPED_STATUSES`] so the monitoring task can recover it.
629    #[cfg(all(unix, not(target_os = "linux")))]
630    async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
631        use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
632
633        loop {
634            match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
635                Ok(WaitStatus::StillAlive) => break,
636                Ok(status) => {
637                    let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
638                        continue;
639                    };
640                    if managed_pids.contains(&pid) {
641                        // Race lost — stash the exit code for lifecycle recovery
642                        let exit_code = match status {
643                            WaitStatus::Exited(_, code) => code,
644                            WaitStatus::Signaled(_, sig, _) => -(sig as i32),
645                            _ => -1,
646                        };
647                        warn!(
648                            "zombie reaper reaped managed daemon pid {pid} \
649                             (exit_code={exit_code}); stashing status for recovery"
650                        );
651                        REAPED_STATUSES.lock().await.insert(pid, exit_code);
652                    } else {
653                        trace!("reaped orphaned zombie child: {status:?}");
654                    }
655                }
656                Err(nix::errno::Errno::ECHILD) => break, // no more children
657                Err(e) => {
658                    trace!("waitpid error in zombie reaper: {e}");
659                    break;
660                }
661            }
662        }
663    }
664
665    #[cfg(unix)]
666    fn signals(&self) -> Result<()> {
667        let signals = [
668            SignalKind::terminate(),
669            SignalKind::alarm(),
670            SignalKind::interrupt(),
671            SignalKind::quit(),
672            SignalKind::hangup(),
673            SignalKind::user_defined1(),
674            SignalKind::user_defined2(),
675        ];
676        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
677        for signal in signals {
678            let stream = match signal::unix::signal(signal) {
679                Ok(s) => s,
680                Err(e) => {
681                    warn!("Failed to register signal handler for {signal:?}: {e}");
682                    continue;
683                }
684            };
685            tokio::spawn(async move {
686                let mut stream = stream;
687                loop {
688                    stream.recv().await;
689                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
690                        exit(1);
691                    } else {
692                        SUPERVISOR.handle_signal().await;
693                    }
694                }
695            });
696        }
697        Ok(())
698    }
699
700    #[cfg(windows)]
701    fn signals(&self) -> Result<()> {
702        tokio::spawn(async move {
703            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
704            loop {
705                if let Err(e) = signal::ctrl_c().await {
706                    error!("Failed to wait for ctrl-c: {}", e);
707                    return;
708                }
709                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
710                    exit(1);
711                } else {
712                    SUPERVISOR.handle_signal().await;
713                }
714            }
715        });
716        Ok(())
717    }
718
719    async fn handle_signal(&self) {
720        info!("received signal, stopping");
721        self.close().await;
722        exit(0)
723    }
724
    /// Shut the supervisor down in an orderly, best-effort sequence.
    ///
    /// Steps, in the order they run:
    /// 1. Cancel the proxy, abort the LAN monitor task, shut down the mDNS
    ///    publisher, then wait up to 12 seconds for the proxy task.
    /// 2. Clean the hosts file when both `proxy.enable` and
    ///    `proxy.sync_hosts` are set.
    /// 3. Stop every active daemon except the `DaemonId::pitchfork()` entry,
    ///    one dependency level at a time (daemons within a level are stopped
    ///    concurrently), then remove the pitchfork entry itself.
    /// 4. Cancel the background state flush task and write the state file
    ///    if it is dirty.
    /// 5. Ask the IPC server to shut down.
    /// 6. Wait up to 5 seconds for `active_monitors` to reach zero, then
    ///    await each collected hook task for up to 30 seconds.
    /// 7. Remove the IPC socket directory.
    ///
    /// Nothing is returned: failures along the way are logged or ignored so
    /// that the remaining steps still run.
    pub(crate) async fn close(&self) {
        // Signal the proxy server to stop accepting new connections
        // and drain in-flight ones, *before* stopping daemons so the
        // proxy has time to finish forwarding active requests.
        if let Some(cancel) = self.proxy_cancel.lock().await.take() {
            cancel.cancel();
        }

        // Stop the LAN IP monitor task.
        if let Some(monitor_task) = self.lan_monitor_task.lock().await.take() {
            monitor_task.abort();
        }

        // Shutdown the mDNS publisher (sends goodbye packets).
        if let Some(publisher) = self.mdns_publisher.lock().await.take() {
            publisher.lock().await.shutdown();
        }

        // Bounded wait for the proxy task; the result (including a timeout)
        // is deliberately ignored so shutdown always continues.
        if let Some(proxy_task) = self.proxy_task.lock().await.take() {
            let _ = tokio::time::timeout(Duration::from_secs(12), proxy_task).await;
        }

        // Clean up /etc/hosts entries managed by pitchfork
        let s = settings();
        if s.proxy.enable && s.proxy.sync_hosts {
            crate::proxy::hosts::clean_hosts_file();
        }

        // Collect the ids of all active daemons, excluding the pitchfork
        // entry itself, which is removed separately after the others stop.
        let pitchfork_id = DaemonId::pitchfork();
        let active = self.active_daemons().await;
        let active_ids: Vec<DaemonId> = active
            .iter()
            .filter(|d| d.id != pitchfork_id)
            .map(|d| d.id.clone())
            .collect();

        // Stop daemons in reverse dependency order.
        // If dependency resolution fails (e.g. config changed), fall back to
        // stopping in arbitrary order so we still shut down cleanly.
        // Daemons within the same level are stopped concurrently.
        let stop_levels = compute_reverse_stop_order(&active_ids);
        for level in &stop_levels {
            let mut tasks = Vec::new();
            for id in level {
                let id = id.clone();
                tasks.push(tokio::spawn(async move {
                    if let Err(err) = SUPERVISOR.stop(&id).await {
                        error!("failed to stop daemon {id}: {err}");
                    }
                }));
            }
            // Finish the whole level before moving on to the next one.
            // Join errors from the spawned tasks are ignored.
            for task in tasks {
                let _ = task.await;
            }
        }
        let _ = self.remove_daemon(&pitchfork_id).await;

        // Signal the background state flush task to exit so it doesn't
        // keep waking up and acquiring the state mutex after shutdown.
        if let Some(cancel) = self.flush_cancel.lock().unwrap().take() {
            cancel.cancel();
        }

        // Force-flush state to disk before shutting down IPC so no
        // in-memory-only changes are lost.
        // The block scope releases the state lock before the next step.
        {
            let state = self.state_file.lock().await;
            if state.is_dirty() {
                if let Err(e) = state.write() {
                    warn!("failed to flush state file during shutdown: {e}");
                }
            }
        }

        // Signal IPC server to shut down gracefully
        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
            handle.shutdown();
        }

        // Wait for all in-flight monitoring tasks to finish registering their
        // hook handles. Each monitoring task increments `active_monitors` when
        // its process exits, and decrements it (+ notifies `monitor_done`)
        // after all fire_hook() calls complete. This replaces the old
        // yield_now() approach which had a race window.
        // The sleep is created once and pinned outside the loop, so the
        // 5 second budget covers the whole drain, not each iteration.
        let drain_timeout = time::sleep(Duration::from_secs(5));
        tokio::pin!(drain_timeout);
        loop {
            if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
                break;
            }
            tokio::select! {
                _ = self.monitor_done.notified() => {}
                _ = &mut drain_timeout => {
                    warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
                    break;
                }
            }
        }
        // Take ownership of every registered hook handle (leaving the shared
        // list empty) and give each one up to 30 seconds to finish.
        let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
        let hook_timeout = Duration::from_secs(30);
        for handle in handles {
            match time::timeout(hook_timeout, handle).await {
                Ok(_) => {} // Hook completed (success or error, doesn't matter)
                Err(_) => {
                    warn!(
                        "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
                    );
                }
            }
        }

        // Best-effort removal of the IPC socket directory; errors are ignored.
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
    }
838
839    pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
840        self.pending_notifications
841            .lock()
842            .await
843            .push((level, message));
844    }
845}
846
847/// Fix ownership on the state directory so non-root users can access files
848/// created by a `sudo`-started supervisor.
849///
850/// When `[settings.supervisor] user` or `SUDO_UID`/`SUDO_GID` are set, we
851/// `chown` the state directory and safe subdirectories back to that non-root
852/// runtime user. This is strictly better than `chmod 0o666` because it does not
853/// widen the permission bits — the files stay owner-only (0o600/0o700) but the
854/// *owner* is the user that daemon processes and CLI clients need to share.
855///
856/// **Security**: The `proxy/` subtree is intentionally skipped. It contains
857/// `ca-key.pem` which must remain `0o600` and owned by the process that
858/// generated it. Changing its ownership or permissions would expose the CA
859/// private key to other local users.
860///
861/// If neither `user` nor `SUDO_UID`/`SUDO_GID` are available (e.g. direct
862/// root login), we fall back to relaxing permissions on only the `sock/` and
863/// `logs/` subdirectories (plus `state.toml`) so CLI clients can still function.
864#[cfg(unix)]
865fn fix_state_dir_permissions() {
866    let state_dir = &*env::PITCHFORK_STATE_DIR;
867    if let Some((uid, gid)) = state_owner_ids() {
868        if !state_dir.exists()
869            && let Err(err) = fs::create_dir_all(state_dir)
870        {
871            warn!(
872                "failed to create state directory for ownership fix at {}: {err}",
873                state_dir.display()
874            );
875            return;
876        }
877
878        // Best path: chown back to the runtime user. Permissions stay tight.
879        chown_recursive(state_dir, uid, gid, true);
880        debug!(
881            "chowned state directory to uid={uid} gid={gid} at {}",
882            state_dir.display()
883        );
884    } else {
885        if !state_dir.exists() {
886            return;
887        }
888
889        // Fallback: relax permissions on safe subdirectories only.
890        // proxy/ is never touched.
891        chmod_safe_subtrees(state_dir);
892        debug!(
893            "relaxed permissions on safe subtrees at {}",
894            state_dir.display()
895        );
896    }
897}
898
899#[cfg(unix)]
900pub(crate) fn state_owner_ids() -> Option<(u32, u32)> {
901    if !nix::unistd::Uid::effective().is_root() {
902        return None;
903    }
904
905    let user = settings().supervisor.user.trim();
906    if !user.is_empty() {
907        return resolve_supervisor_user_ids(user).or_else(|| {
908            warn!(
909                "failed to resolve supervisor.user '{user}' for state ownership; falling back to SUDO_UID/SUDO_GID"
910            );
911            parse_sudo_ids()
912        });
913    }
914
915    parse_sudo_ids()
916}
917
918#[cfg(unix)]
919fn resolve_supervisor_user_ids(user: &str) -> Option<(u32, u32)> {
920    let user_record = if user.chars().all(|c| c.is_ascii_digit()) {
921        let uid = user.parse::<u32>().ok()?;
922        nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
923            .ok()
924            .flatten()
925    } else {
926        nix::unistd::User::from_name(user).ok().flatten()
927    }?;
928
929    Some((user_record.uid.as_raw(), user_record.gid.as_raw()))
930}
931
932/// Parse `SUDO_UID` and `SUDO_GID` environment variables into numeric IDs.
933///
934/// Returns `None` unless the effective UID is 0 (root). This prevents stale
935/// `SUDO_UID`/`SUDO_GID` values inherited into non-sudo environments from
936/// triggering incorrect `chown` operations.
937#[cfg(unix)]
938fn parse_sudo_ids() -> Option<(u32, u32)> {
939    if !nix::unistd::Uid::effective().is_root() {
940        return None;
941    }
942    let uid: u32 = std::env::var("SUDO_UID").ok()?.parse().ok()?;
943    let gid: u32 = std::env::var("SUDO_GID").ok()?.parse().ok()?;
944    Some((uid, gid))
945}
946
947/// Recursively `chown` a directory tree. If `skip_proxy` is true, the `proxy/`
948/// subdirectory is skipped entirely to protect the CA private key.
949#[cfg(unix)]
950fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
951    // chown the directory itself
952    let _ = chown_path(dir, uid, gid);
953
954    let entries = match std::fs::read_dir(dir) {
955        Ok(e) => e,
956        Err(_) => return,
957    };
958    for entry in entries.flatten() {
959        let path = entry.path();
960        if path.is_dir() {
961            // Skip proxy/ at the top level of the state directory
962            if skip_proxy {
963                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
964                    if name == "proxy" {
965                        continue;
966                    }
967                }
968            }
969            chown_recursive(&path, uid, gid, false);
970        } else {
971            let _ = chown_path(&path, uid, gid);
972        }
973    }
974}
975
/// `chown` a single path to `uid`/`gid`. Returns `Ok(())` on success.
///
/// Delegates to the standard library's safe `std::os::unix::fs::chown`
/// wrapper, so no `unsafe` block or manual C-string conversion is needed.
/// As with `chown(2)`, a symbolic link is followed and its target is
/// changed.
///
/// # Errors
///
/// Returns the OS error reported by `chown(2)`, or an error of kind
/// `InvalidInput` when `path` contains an interior NUL byte.
#[cfg(unix)]
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
990
991/// Fallback: relax permissions on safe subdirectories only (sock/, logs/, and
992/// state.toml). The proxy/ subtree is never touched.
993#[cfg(unix)]
994fn chmod_safe_subtrees(state_dir: &std::path::Path) {
995    // The state directory itself needs to be traversable
996    let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
997
998    // state.toml — needs to be readable by CLI clients
999    let state_file = state_dir.join("state.toml");
1000    if state_file.exists() {
1001        let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
1002    }
1003
1004    // Safe subdirectories: sock/ and logs/
1005    for subdir_name in &["sock", "logs"] {
1006        let subdir = state_dir.join(subdir_name);
1007        if subdir.is_dir() {
1008            chmod_recursive(&subdir);
1009        }
1010    }
1011}
1012
/// Recursively chmod a tree: directories → 0o755, files → 0o644.
///
/// `dir` itself is set to 0o755 first, then every entry beneath it is
/// visited. Symbolic links found inside the tree are skipped: they are
/// neither followed nor chmodded, so a link placed in the tree cannot cause
/// the permissions of a file outside it to be relaxed.
///
/// All failures are ignored: this is a best-effort fix-up.
#[cfg(unix)]
fn chmod_recursive(dir: &std::path::Path) {
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(0o755));

    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for entry in entries.flatten() {
        // `DirEntry::file_type` does not traverse symlinks, unlike
        // `Path::is_dir`, so links can be told apart from real directories.
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        if file_type.is_symlink() {
            continue;
        }

        let path = entry.path();
        if file_type.is_dir() {
            chmod_recursive(&path);
        } else {
            let _ = fs::set_permissions(&path, fs::Permissions::from_mode(0o644));
        }
    }
}