Skip to main content

pitchfork_cli/supervisor/
mod.rs

//! Supervisor module - daemon process supervisor
//!
//! This module is split into focused submodules:
//! - `state`: State access layer (get/set operations)
//! - `lifecycle`: Daemon start/stop operations
//! - `autostop`: Autostop logic and boot daemon startup
//! - `retry`: Retry logic with backoff
//! - `watchers`: Background tasks (interval, cron, file watching)
//! - `ipc_handlers`: IPC request dispatch
//! - `hooks`: Hook handling (in-flight hook tasks are tracked in `Supervisor::hook_tasks`)
//! - `pty`: Unix-only submodule (compiled under `#[cfg(unix)]`)
10
11mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15#[cfg(unix)]
16mod pty;
17mod retry;
18mod state;
19mod watchers;
20
21use crate::daemon_id::DaemonId;
22use crate::daemon_status::DaemonStatus;
23use crate::deps::compute_reverse_stop_order;
24use crate::ipc::server::{IpcServer, IpcServerHandle};
25
26use crate::procs::PROCS;
27use crate::settings::settings;
28use crate::state_file::StateFile;
29use crate::{Result, env};
30use duct::cmd;
31use miette::IntoDiagnostic;
32use once_cell::sync::Lazy;
33use std::collections::HashMap;
34#[cfg(unix)]
35use std::collections::HashSet;
36use std::fs;
37#[cfg(unix)]
38use std::os::unix::fs::PermissionsExt;
39use std::process::exit;
40use std::sync::atomic;
41use std::sync::atomic::{AtomicBool, AtomicU32};
42use std::time::Duration;
43#[cfg(unix)]
44use tokio::signal::unix::SignalKind;
45use tokio::sync::{Mutex, Notify};
46use tokio::task::JoinHandle;
47use tokio::{signal, time};
48
/// Exit codes collected by the container-mode zombie reaper for PIDs that
/// belong to managed daemons, keyed by PID.
///
/// Only compiled on Unix targets other than Linux. There the reaper cannot
/// peek with `waitid(WNOWAIT)` and instead calls `waitpid(None, WNOHANG)`,
/// which can win the race against Tokio's `child.wait()`. When it does, the
/// exit code is recorded here so the monitoring task in lifecycle.rs can
/// recover it instead of treating the resulting `ECHILD` as a failure.
///
/// Linux builds do not need this map: their reaper peeks with
/// `waitid(WNOWAIT)` first and never consumes a managed daemon's status.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> = Lazy::new(Default::default);
60
61// Re-export types needed by other modules
62pub(crate) use state::UpsertDaemonOpts;
63
64pub struct Supervisor {
65    pub(crate) state_file: Mutex<StateFile>,
66    pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
67    pub(crate) last_refreshed_at: Mutex<time::Instant>,
68    /// Map of daemon ID to scheduled autostop time
69    pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
70    /// Handle for graceful IPC server shutdown
71    pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
72    /// Tracks in-flight hook tasks so shutdown can wait for them to complete
73    pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
74    /// Number of monitoring tasks that are still running (between process exit
75    /// and hook registration completion). Used by `close()` to know when it is
76    /// safe to drain `hook_tasks`.
77    pub(crate) active_monitors: AtomicU32,
78    /// Signalled by each monitoring task after it finishes registering hooks
79    /// (or decides it has nothing to register). `close()` waits on this.
80    pub(crate) monitor_done: Notify,
81    /// Cancellation token for the proxy server — cancelled on shutdown to
82    /// stop accepting new connections and drain in-flight ones.
83    pub(crate) proxy_cancel: Mutex<Option<tokio_util::sync::CancellationToken>>,
84    /// Join handle for the proxy task so shutdown can wait for cleanup.
85    pub(crate) proxy_task: Mutex<Option<JoinHandle<()>>>,
86    /// mDNS publisher for LAN mode (None if LAN mode is disabled).
87    /// Shared with the LAN IP monitor task so it can re-publish on IP change.
88    pub(crate) mdns_publisher:
89        Mutex<Option<std::sync::Arc<tokio::sync::Mutex<crate::proxy::mdns::MdnsPublisher>>>>,
90    /// Join handle for the LAN IP monitor task.
91    pub(crate) lan_monitor_task: Mutex<Option<JoinHandle<()>>>,
92    /// Cancellation token for the background state flush task.
93    pub(crate) flush_cancel: std::sync::Mutex<Option<tokio_util::sync::CancellationToken>>,
94}
95
96pub(crate) fn interval_duration() -> Duration {
97    settings().general_interval()
98}
99
100pub static SUPERVISOR: Lazy<Supervisor> =
101    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
102
103pub fn start_if_not_running() -> Result<()> {
104    let sf = StateFile::get();
105    if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
106        && let Some(pid) = d.pid
107    {
108        PROCS.refresh_pids(&[pid]);
109        if PROCS.is_running(pid) {
110            return Ok(());
111        }
112    }
113    start_in_background()
114}
115
116pub fn start_in_background() -> Result<()> {
117    debug!("starting supervisor in background");
118    // Ensure the log directory exists so we can redirect stderr there.
119    // Panics and other fatal errors from the background supervisor process
120    // would otherwise be silently swallowed.
121    let log_file = &*env::PITCHFORK_LOG_FILE;
122    if let Some(parent) = log_file.parent() {
123        let _ = fs::create_dir_all(parent);
124    }
125    let stderr_file = fs::OpenOptions::new()
126        .create(true)
127        .append(true)
128        .open(log_file)
129        .into_diagnostic()?;
130    #[cfg(unix)]
131    fix_state_dir_permissions();
132    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
133        .stdout_null()
134        .stderr_file(stderr_file)
135        .start()
136        .into_diagnostic()?;
137    Ok(())
138}
139
140impl Supervisor {
141    pub fn new() -> Result<Self> {
142        Ok(Self {
143            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
144            last_refreshed_at: Mutex::new(time::Instant::now()),
145            pending_notifications: Mutex::new(vec![]),
146            pending_autostops: Mutex::new(HashMap::new()),
147            ipc_shutdown: Mutex::new(None),
148            hook_tasks: Mutex::new(Vec::new()),
149            active_monitors: AtomicU32::new(0),
150            monitor_done: Notify::new(),
151            proxy_cancel: Mutex::new(None),
152            proxy_task: Mutex::new(None),
153            mdns_publisher: Mutex::new(None),
154            lan_monitor_task: Mutex::new(None),
155            flush_cancel: std::sync::Mutex::new(None),
156        })
157    }
158
159    pub async fn start(
160        &self,
161        is_boot: bool,
162        container: bool,
163        web_port: Option<u16>,
164        web_path: Option<String>,
165    ) -> Result<()> {
166        // Ensure the state directory and its contents are accessible by non-root
167        // users. This is needed when the supervisor is started with `sudo` — all
168        // files it creates are owned by root, which prevents normal CLI clients
169        // from reading/writing state or connecting to the IPC socket.
170        #[cfg(unix)]
171        fix_state_dir_permissions();
172
173        let pid = std::process::id();
174        // Ensure PROCS has data for the supervisor PID before upsert_daemon reads title()
175        PROCS.refresh_pids(&[pid]);
176        // Determine container mode: CLI flag takes priority, then settings
177        let container_mode = container || settings().supervisor.container;
178        if container_mode {
179            info!("Starting supervisor in container/PID1 mode with pid {pid}");
180        } else {
181            info!("Starting supervisor with pid {pid}");
182        }
183
184        // If the previous supervisor died uncleanly, its daemon child processes
185        // may still be alive (orphaned, re-parented to init).  Terminate them
186        // before we take over so we don't end up with duplicate processes
187        // holding the same ports.
188        cleanup_orphaned_daemons(self).await;
189
190        self.upsert_daemon(
191            UpsertDaemonOpts::builder(DaemonId::pitchfork())
192                .set(|o| {
193                    o.pid = Some(pid);
194                    o.status = DaemonStatus::Running;
195                })
196                .build(),
197        )
198        .await?;
199        #[cfg(unix)]
200        fix_state_dir_permissions();
201
202        // If this is a boot start, automatically start boot_start daemons
203        if is_boot {
204            info!("Boot start mode enabled, starting boot_start daemons");
205            self.start_boot_daemons().await?;
206        }
207
208        self.interval_watch()?;
209        self.cron_watch()?;
210        self.signals()?;
211        self.daemon_file_watch()?;
212
213        // In container mode, install SIGCHLD handler to reap orphaned/zombie processes
214        #[cfg(unix)]
215        if container_mode {
216            self.reap_zombies()?;
217        }
218
219        // Start web server: CLI --web-port takes priority, then settings.web.auto_start + bind_port
220        let s = settings();
221        let effective_port = web_port.or_else(|| {
222            if s.web.auto_start {
223                match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
224                    Some(p) => Some(p),
225                    None => {
226                        error!(
227                            "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
228                            s.web.bind_port
229                        );
230                        None
231                    }
232                }
233            } else {
234                None
235            }
236        });
237        // CLI --web-path takes priority, then settings.web.base_path
238        let effective_path = web_path.or_else(|| {
239            let bp = s.web.base_path.clone();
240            if bp.is_empty() { None } else { Some(bp) }
241        });
242        if let Some(port) = effective_port {
243            tokio::spawn(async move {
244                if let Err(e) = crate::web::serve(port, effective_path).await {
245                    error!("Web server error: {e}");
246                }
247            });
248        }
249
250        // Start standalone API server if configured
251        let api_port = if s.api.auto_start {
252            match u16::try_from(s.api.bind_port).ok().filter(|&p| p > 0) {
253                Some(p) => Some(p),
254                None => {
255                    error!(
256                        "api.bind_port {} is out of valid port range (1-65535), API server disabled",
257                        s.api.bind_port
258                    );
259                    None
260                }
261            }
262        } else {
263            None
264        };
265        if let Some(port) = api_port {
266            tokio::spawn(async move {
267                if let Err(e) = crate::web::serve_api(port, None).await {
268                    error!("API server error: {e}");
269                }
270            });
271        }
272
273        // Start reverse proxy server if enabled
274        if s.proxy.enable {
275            // Pre-generate the TLS certificate synchronously before spawning the proxy
276            // task. This ensures the cert exists immediately after `sup start` returns,
277            // so `proxy trust` can be run right away without waiting for the async task.
278            #[cfg(feature = "proxy-tls")]
279            if s.proxy.https {
280                let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
281                let ca_cert_path = proxy_dir.join("ca.pem");
282                let ca_key_path = proxy_dir.join("ca-key.pem");
283                if !ca_cert_path.exists() || !ca_key_path.exists() {
284                    match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
285                        Ok(()) => {
286                            info!(
287                                "Generated local CA certificate at {}",
288                                ca_cert_path.display()
289                            );
290                        }
291                        Err(e) => {
292                            error!("Failed to generate CA certificate: {e}");
293                        }
294                    }
295                }
296
297                // Auto-trust: attempt to install the CA certificate into the
298                // system trust store. May fail silently due to permissions;
299                // user can run `pitchfork proxy trust` manually.
300                if s.proxy.auto_trust && ca_cert_path.exists() {
301                    use crate::proxy::trust::{AutoTrustResult, auto_trust};
302                    match auto_trust(&ca_cert_path) {
303                        AutoTrustResult::AlreadyTrusted => {}
304                        AutoTrustResult::Trusted => {
305                            info!("CA certificate auto-trusted in system store");
306                        }
307                        AutoTrustResult::NotTrusted { reason } => {
308                            warn!("Auto-trust skipped: {reason}");
309                            warn!("Run `pitchfork proxy trust` to install manually");
310                        }
311                    }
312                }
313            }
314            // Spawn the proxy server and wait for its bind result via a oneshot
315            // channel.  This avoids the TOCTOU race of a pre-flight bind check
316            // while still surfacing binding failures immediately.
317            let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
318            let proxy_cancel = tokio_util::sync::CancellationToken::new();
319            let proxy_cancel_clone = proxy_cancel.clone();
320            *self.proxy_cancel.lock().await = Some(proxy_cancel);
321            let proxy_task = tokio::spawn(async move {
322                if let Err(e) = crate::proxy::server::serve(bind_tx, proxy_cancel_clone).await {
323                    error!("Proxy server error: {e}");
324                }
325            });
326            *self.proxy_task.lock().await = Some(proxy_task);
327            match bind_rx.await {
328                Ok(Ok(())) => {
329                    info!("Proxy server bound successfully");
330                    self.start_mdns().await;
331                }
332                Ok(Err(msg)) => {
333                    error!("{msg}");
334                    self.add_notification(log::LevelFilter::Error, msg).await;
335                }
336                Err(_) => {
337                    // Sender dropped without sending — serve() panicked or
338                    // returned before signalling.  Already logged by the
339                    // spawn error handler above.
340                }
341            }
342        }
343
344        // Pre-warm slug cache so the first /api/proxies request is fast.
345        // Spawned as a background task so it does not block startup.
346        tokio::spawn(async {
347            crate::proxy::server::get_cached_slugs().await;
348        });
349
350        let (ipc, ipc_handle) = IpcServer::new()?;
351        *self.ipc_shutdown.lock().await = Some(ipc_handle);
352        self.start_state_flush_task();
353        self.conn_watch(ipc).await
354    }
355
356    /// Start mDNS publishing for LAN mode (called after the proxy binds successfully).
357    async fn start_mdns(&self) {
358        let s = crate::settings::settings();
359        let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
360        if !s.proxy.enable || !lan_enabled {
361            return;
362        }
363
364        let lan_ip = if !s.proxy.lan_ip.is_empty() {
365            match s.proxy.lan_ip.parse::<std::net::Ipv4Addr>() {
366                Ok(ip) => Some(ip),
367                Err(e) => {
368                    error!(
369                        "proxy.lan_ip {:?} is not a valid IPv4 address: {e}",
370                        s.proxy.lan_ip
371                    );
372                    return;
373                }
374            }
375        } else {
376            match crate::proxy::lan_ip::detect_lan_ip().await {
377                Some(ip) => Some(ip),
378                None => {
379                    error!(
380                        "LAN mode is enabled but no LAN IP address could be detected. \
381                         Set proxy.lan_ip to a specific address, or ensure you are connected to a network."
382                    );
383                    return;
384                }
385            }
386        };
387
388        let Some(lan_ip) = lan_ip else { return };
389        let port = u16::try_from(s.proxy.port).unwrap_or(443);
390
391        let Some(mut publisher) = crate::proxy::mdns::MdnsPublisher::new(lan_ip) else {
392            error!("Failed to start mDNS publisher. Is Avahi (Linux) or Bonjour (macOS) running?");
393            return;
394        };
395
396        // Publish all registered slugs.
397        let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
398        for slug in slugs.keys() {
399            let hostname = format!("{slug}.local");
400            publisher.publish(&hostname, port);
401        }
402
403        log::info!(
404            "LAN mode: mDNS publishing on {lan_ip}, {} slug(s) registered",
405            slugs.len()
406        );
407
408        let publisher = std::sync::Arc::new(tokio::sync::Mutex::new(publisher));
409
410        // Start the IP monitor (only when IP is auto-detected, not pinned).
411        let ip_pinned = !s.proxy.lan_ip.is_empty();
412        if !ip_pinned {
413            let monitor_cancel = self.proxy_cancel.lock().await.clone();
414            let publisher_clone = publisher.clone();
415            let task = tokio::spawn(async move {
416                let mut last_ip = lan_ip;
417                let interval = std::time::Duration::from_secs(5);
418                let mut ticker = tokio::time::interval(interval);
419                ticker.tick().await; // first tick is immediate
420                loop {
421                    ticker.tick().await;
422                    if let Some(cancel) = monitor_cancel.as_ref() {
423                        if cancel.is_cancelled() {
424                            break;
425                        }
426                    }
427                    if let Some(new_ip) =
428                        crate::proxy::lan_ip::detect_lan_ip_if_changed(last_ip).await
429                    {
430                        log::info!("LAN IP changed: {last_ip} → {new_ip}");
431                        last_ip = new_ip;
432                        let mut pub_guard = publisher_clone.lock().await;
433                        pub_guard.republish_all(new_ip, port);
434                    }
435                }
436            });
437            *self.lan_monitor_task.lock().await = Some(task);
438        }
439
440        *self.mdns_publisher.lock().await = Some(publisher);
441    }
442
443    /// Re-read slugs from config and update mDNS records.
444    ///
445    /// Publishes new slugs and unpublishes removed ones. Called via IPC when
446    /// `proxy add` or `proxy remove` modifies the slug registry.
447    async fn sync_mdns(&self) {
448        // Clone the Arc and release the outer lock immediately so we don't
449        // block close() from taking the publisher during shutdown.
450        let publisher = {
451            let guard = self.mdns_publisher.lock().await;
452            match guard.as_ref() {
453                Some(p) => p.clone(),
454                None => {
455                    debug!("sync_mdns: mDNS publisher not active, skipping");
456                    return;
457                }
458            }
459        };
460
461        let s = crate::settings::settings();
462        let port = u16::try_from(s.proxy.port).unwrap_or(443);
463
464        let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
465        let mut pub_guard = publisher.lock().await;
466
467        // Unpublish slugs that no longer exist in config.
468        let current_keys: Vec<&String> = slugs.keys().collect();
469        let registered: Vec<String> = pub_guard.registered_hostnames();
470        for hostname in &registered {
471            // hostname is "slug.local" — extract slug part.
472            let slug = hostname.strip_suffix(".local").unwrap_or(hostname);
473            if !current_keys.iter().any(|k| k.as_str() == slug) {
474                log::info!("mDNS: unpublishing removed slug {slug}");
475                pub_guard.unpublish(hostname);
476            }
477        }
478
479        // Publish new slugs that aren't yet registered.
480        for slug in slugs.keys() {
481            let hostname = format!("{slug}.local");
482            if !pub_guard.is_published(&hostname) {
483                log::info!("mDNS: publishing new slug {slug}");
484                pub_guard.publish(&hostname, port);
485            }
486        }
487    }
488
489    /// Spawn a background task that periodically flushes the state file to
490    /// disk if it has been marked dirty.  Uses debouncing (1s interval) to
491    /// batch rapid state changes.
492    fn start_state_flush_task(&self) {
493        let cancel = tokio_util::sync::CancellationToken::new();
494        *self.flush_cancel.lock().unwrap() = Some(cancel.clone());
495        tokio::spawn(async move {
496            let mut interval = time::interval(Duration::from_secs(1));
497            interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
498            loop {
499                tokio::select! {
500                    _ = interval.tick() => {}
501                    _ = cancel.cancelled() => {
502                        debug!("state flush task received shutdown signal");
503                        break;
504                    }
505                }
506                let state = SUPERVISOR.state_file.lock().await;
507                if state.is_dirty() {
508                    if let Err(e) = state.write() {
509                        warn!("failed to flush state file: {e}");
510                    }
511                }
512            }
513            debug!("state flush task exiting");
514        });
515    }
516
517    pub(crate) async fn flush_state(&self) {
518        let state = self.state_file.lock().await;
519        if state.is_dirty() {
520            if let Err(e) = state.write() {
521                warn!("failed to flush state file: {e}");
522            }
523        }
524    }
525
526    pub(crate) async fn refresh(&self) -> Result<()> {
527        trace!("refreshing");
528
529        // Collect PIDs we need to check (shell PIDs only)
530        // This is more efficient than refreshing all processes on the system
531        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
532        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
533
534        if pids_to_check.is_empty() {
535            // No PIDs to check, skip the expensive refresh
536            trace!("no shell PIDs to check, skipping process refresh");
537        } else {
538            PROCS.refresh_pids(&pids_to_check);
539        }
540
541        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
542        *last_refreshed_at = time::Instant::now();
543
544        for (dir, pids) in dirs_with_pids {
545            let to_remove = pids
546                .iter()
547                .filter(|pid| !PROCS.is_running(**pid))
548                .collect::<Vec<_>>();
549            for pid in &to_remove {
550                self.remove_shell_pid(**pid).await?
551            }
552            if to_remove.len() == pids.len() {
553                self.leave_dir(&dir).await?;
554            }
555        }
556
557        self.check_retry().await?;
558        self.process_pending_autostops().await?;
559
560        Ok(())
561    }
562
563    /// Install a SIGCHLD handler that reaps orphaned zombie child processes.
564    ///
565    /// When running as PID 1 inside a container, orphaned processes are
566    /// re-parented to PID 1. Without explicit reaping, they accumulate
567    /// as zombies in the process table indefinitely.
568    ///
569    /// Only reaps processes that are NOT managed by the supervisor (i.e.
570    /// not tracked in the state file). Managed daemon processes are reaped
571    /// by their monitoring tasks via `child.wait()`.
572    ///
573    /// ## Strategy
574    ///
575    /// **Linux**: Uses `waitid(Id::All, WNOHANG | WNOWAIT | WEXITED)` to
576    /// *peek* at the next zombie without consuming its status. If the PID
577    /// belongs to a managed daemon, the reaper skips it so Tokio's
578    /// `child.wait()` can collect the status normally. Only unmanaged
579    /// orphans are actually reaped (via `waitpid(Pid, WNOHANG)`). This
580    /// eliminates the race entirely.
581    ///
582    /// **Non-Linux Unix** (e.g. macOS — mainly for local development;
583    /// container mode targets Linux): `waitid` is unavailable, so we fall
584    /// back to `waitpid(None, WNOHANG)`. If the reaper accidentally
585    /// consumes a managed PID's status, it stashes the exit code in
586    /// [`REAPED_STATUSES`] for the monitoring task to recover.
587    #[cfg(unix)]
588    fn reap_zombies(&self) -> Result<()> {
589        let mut stream = signal::unix::signal(SignalKind::child())
590            .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
591        tokio::spawn(async move {
592            loop {
593                stream.recv().await;
594                // Collect PIDs of managed daemons so we don't steal their exit status
595                let managed_pids: HashSet<u32> = SUPERVISOR
596                    .state_file
597                    .lock()
598                    .await
599                    .daemons
600                    .values()
601                    .filter_map(|d| d.pid)
602                    .collect();
603                // Reap all available zombie children that are NOT managed
604                Self::reap_unmanaged_zombies(&managed_pids).await;
605            }
606        });
607        info!("container mode: SIGCHLD zombie reaper installed");
608        Ok(())
609    }
610
611    /// Linux implementation: peek with `waitid(WNOWAIT)` then selectively reap.
612    ///
613    /// `WNOWAIT` leaves the zombie in the table so we can inspect its PID
614    /// without consuming the exit status. Only if the PID is *not* managed
615    /// do we call `waitpid(Pid, WNOHANG)` to actually reap it.
616    #[cfg(target_os = "linux")]
617    async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
618        use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
619        use nix::unistd::Pid;
620
621        loop {
622            // Peek at the next zombie without consuming it
623            let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
624            match waitid(Id::All, peek_flags) {
625                Ok(WaitStatus::StillAlive) => break,
626                Ok(status) => {
627                    let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
628                        break;
629                    };
630                    if managed_pids.contains(&pid_raw) {
631                        // This is a managed daemon — leave it for Tokio's child.wait().
632                        // We must break out of the loop because waitid(Id::All) would
633                        // keep returning the same zombie if we don't consume it.
634                        trace!(
635                            "zombie reaper: skipping managed daemon pid {pid_raw}, \
636                             leaving for Tokio to reap"
637                        );
638                        break;
639                    }
640                    // Not managed — actually reap it
641                    match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
642                        Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
643                        Err(nix::errno::Errno::ECHILD) => break,
644                        Err(e) => {
645                            trace!("waitpid error reaping pid {pid_raw}: {e}");
646                            break;
647                        }
648                    }
649                }
650                Err(nix::errno::Errno::ECHILD) => break, // no children at all
651                Err(e) => {
652                    trace!("waitid error in zombie reaper: {e}");
653                    break;
654                }
655            }
656        }
657    }
658
659    /// Non-Linux fallback: blind `waitpid(None, WNOHANG)` with stash recovery.
660    ///
661    /// Since `waitid(WNOWAIT)` is not available, we cannot peek. If we
662    /// accidentally reap a managed PID, we stash the exit code in
663    /// [`REAPED_STATUSES`] so the monitoring task can recover it.
664    #[cfg(all(unix, not(target_os = "linux")))]
665    async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
666        use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
667
668        loop {
669            match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
670                Ok(WaitStatus::StillAlive) => break,
671                Ok(status) => {
672                    let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
673                        continue;
674                    };
675                    if managed_pids.contains(&pid) {
676                        // Race lost — stash the exit code for lifecycle recovery
677                        let exit_code = match status {
678                            WaitStatus::Exited(_, code) => code,
679                            WaitStatus::Signaled(_, sig, _) => -(sig as i32),
680                            _ => -1,
681                        };
682                        warn!(
683                            "zombie reaper reaped managed daemon pid {pid} \
684                             (exit_code={exit_code}); stashing status for recovery"
685                        );
686                        REAPED_STATUSES.lock().await.insert(pid, exit_code);
687                    } else {
688                        trace!("reaped orphaned zombie child: {status:?}");
689                    }
690                }
691                Err(nix::errno::Errno::ECHILD) => break, // no more children
692                Err(e) => {
693                    trace!("waitpid error in zombie reaper: {e}");
694                    break;
695                }
696            }
697        }
698    }
699
700    #[cfg(unix)]
701    fn signals(&self) -> Result<()> {
702        let signals = [
703            SignalKind::terminate(),
704            SignalKind::alarm(),
705            SignalKind::interrupt(),
706            SignalKind::quit(),
707            SignalKind::hangup(),
708            SignalKind::user_defined1(),
709            SignalKind::user_defined2(),
710        ];
711        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
712        for signal in signals {
713            let stream = match signal::unix::signal(signal) {
714                Ok(s) => s,
715                Err(e) => {
716                    warn!("Failed to register signal handler for {signal:?}: {e}");
717                    continue;
718                }
719            };
720            tokio::spawn(async move {
721                let mut stream = stream;
722                loop {
723                    stream.recv().await;
724                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
725                        exit(1);
726                    } else {
727                        SUPERVISOR.handle_signal().await;
728                    }
729                }
730            });
731        }
732        Ok(())
733    }
734
735    #[cfg(windows)]
736    fn signals(&self) -> Result<()> {
737        tokio::spawn(async move {
738            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
739            loop {
740                if let Err(e) = signal::ctrl_c().await {
741                    error!("Failed to wait for ctrl-c: {}", e);
742                    return;
743                }
744                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
745                    exit(1);
746                } else {
747                    SUPERVISOR.handle_signal().await;
748                }
749            }
750        });
751        Ok(())
752    }
753
754    async fn handle_signal(&self) {
755        info!("received signal, stopping");
756        self.close().await;
757        exit(0)
758    }
759
    /// Tear the supervisor down in a fixed order.
    ///
    /// 1. Cancel the proxy, abort the LAN IP monitor, and shut down the mDNS
    ///    publisher. Then wait up to 12s for the proxy task to finish.
    /// 2. Remove pitchfork-managed `/etc/hosts` entries when
    ///    `proxy.enable && proxy.sync_hosts`.
    /// 3. Stop every active daemon except pitchfork itself, level by level in
    ///    the order returned by `compute_reverse_stop_order`. Daemons within a
    ///    level are stopped concurrently. Then remove the pitchfork entry.
    /// 4. Cancel the background flush task and write the state file if dirty.
    /// 5. Shut down the IPC server.
    /// 6. Wait up to 5s for in-flight monitoring tasks to drain
    ///    (`active_monitors == 0`). Then await each collected hook task,
    ///    sequentially, with a 30s timeout *per task*.
    /// 7. Best-effort removal of the IPC socket directory.
    ///
    /// Errors along the way are logged or ignored; this function does not fail.
    pub(crate) async fn close(&self) {
        // Signal the proxy server to stop accepting new connections
        // and drain in-flight ones, *before* stopping daemons so the
        // proxy has time to finish forwarding active requests.
        if let Some(cancel) = self.proxy_cancel.lock().await.take() {
            cancel.cancel();
        }

        // Stop the LAN IP monitor task.
        if let Some(monitor_task) = self.lan_monitor_task.lock().await.take() {
            monitor_task.abort();
        }

        // Shutdown the mDNS publisher (sends goodbye packets).
        if let Some(publisher) = self.mdns_publisher.lock().await.take() {
            publisher.lock().await.shutdown();
        }

        // Give the (already cancelled) proxy task up to 12s to finish draining;
        // the timeout result is ignored so shutdown proceeds either way.
        if let Some(proxy_task) = self.proxy_task.lock().await.take() {
            let _ = tokio::time::timeout(Duration::from_secs(12), proxy_task).await;
        }

        // Clean up /etc/hosts entries managed by pitchfork
        let s = settings();
        if s.proxy.enable && s.proxy.sync_hosts {
            crate::proxy::hosts::clean_hosts_file();
        }

        // The supervisor's own entry is excluded here and removed separately
        // after all other daemons have been stopped.
        let pitchfork_id = DaemonId::pitchfork();
        let active = self.active_daemons().await;
        let active_ids: Vec<DaemonId> = active
            .iter()
            .filter(|d| d.id != pitchfork_id)
            .map(|d| d.id.clone())
            .collect();

        // Stop daemons in reverse dependency order.
        // If dependency resolution fails (e.g. config changed), fall back to
        // stopping in arbitrary order so we still shut down cleanly.
        // Daemons within the same level are stopped concurrently.
        let stop_levels = compute_reverse_stop_order(&active_ids);
        for level in &stop_levels {
            let mut tasks = Vec::new();
            for id in level {
                let id = id.clone();
                tasks.push(tokio::spawn(async move {
                    if let Err(err) = SUPERVISOR.stop(&id).await {
                        error!("failed to stop daemon {id}: {err}");
                    }
                }));
            }
            // Barrier: the next level starts only after every stop in this
            // level has returned (join errors are ignored).
            for task in tasks {
                let _ = task.await;
            }
        }
        let _ = self.remove_daemon(&pitchfork_id).await;

        // Signal the background state flush task to exit so it doesn't
        // keep waking up and acquiring the state mutex after shutdown.
        if let Some(cancel) = self.flush_cancel.lock().unwrap().take() {
            cancel.cancel();
        }

        // Force-flush state to disk before shutting down IPC so no
        // in-memory-only changes are lost.
        // NOTE(review): `write()` is called without `.await`, so it appears to
        // be synchronous I/O performed while holding the async state lock.
        {
            let state = self.state_file.lock().await;
            if state.is_dirty() {
                if let Err(e) = state.write() {
                    warn!("failed to flush state file during shutdown: {e}");
                }
            }
        }

        // Signal IPC server to shut down gracefully
        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
            handle.shutdown();
        }

        // Wait for all in-flight monitoring tasks to finish registering their
        // hook handles. Each monitoring task increments `active_monitors` when
        // its process exits, and decrements it (+ notifies `monitor_done`)
        // after all fire_hook() calls complete. This replaces the old
        // yield_now() approach which had a race window.
        // NOTE(review): the `notified()` future is created only after the
        // counter check. If the notifier uses `notify_waiters()` rather than
        // `notify_one()`, a notification landing between the load and the
        // registration is lost and this loop falls through to the 5s timeout.
        // Confirm which is used at the notify site.
        let drain_timeout = time::sleep(Duration::from_secs(5));
        tokio::pin!(drain_timeout);
        loop {
            if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
                break;
            }
            tokio::select! {
                _ = self.monitor_done.notified() => {}
                _ = &mut drain_timeout => {
                    warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
                    break;
                }
            }
        }
        // Await hooks one at a time. The 30s limit applies to each handle
        // separately, so the worst case is N × 30s. A timed-out handle is
        // dropped, which detaches the task rather than aborting it.
        let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
        let hook_timeout = Duration::from_secs(30);
        for handle in handles {
            match time::timeout(hook_timeout, handle).await {
                Ok(_) => {} // Hook completed (success or error, doesn't matter)
                Err(_) => {
                    warn!(
                        "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
                    );
                }
            }
        }

        // Best-effort: a missing or undeletable socket directory is ignored.
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
    }
873
874    pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
875        self.pending_notifications
876            .lock()
877            .await
878            .push((level, message));
879    }
880}
881
882/// Fix ownership on the state directory so non-root users can access files
883/// created by a `sudo`-started supervisor.
884///
885/// When `[settings.supervisor] user` or `SUDO_UID`/`SUDO_GID` are set, we
886/// `chown` the state directory and safe subdirectories back to that non-root
887/// runtime user. This is strictly better than `chmod 0o666` because it does not
888/// widen the permission bits — the files stay owner-only (0o600/0o700) but the
889/// *owner* is the user that daemon processes and CLI clients need to share.
890///
891/// **Security**: The `proxy/` subtree is intentionally skipped. It contains
892/// `ca-key.pem` which must remain `0o600` and owned by the process that
893/// generated it. Changing its ownership or permissions would expose the CA
894/// private key to other local users.
895///
896/// If neither `user` nor `SUDO_UID`/`SUDO_GID` are available (e.g. direct
897/// root login), we fall back to relaxing permissions on only the `sock/` and
898/// `logs/` subdirectories (plus `state.toml`) so CLI clients can still function.
899#[cfg(unix)]
900fn fix_state_dir_permissions() {
901    let state_dir = &*env::PITCHFORK_STATE_DIR;
902    if let Some((uid, gid)) = state_owner_ids() {
903        if !state_dir.exists()
904            && let Err(err) = fs::create_dir_all(state_dir)
905        {
906            warn!(
907                "failed to create state directory for ownership fix at {}: {err}",
908                state_dir.display()
909            );
910            return;
911        }
912
913        // Best path: chown back to the runtime user. Permissions stay tight.
914        chown_recursive(state_dir, uid, gid, true);
915        debug!(
916            "chowned state directory to uid={uid} gid={gid} at {}",
917            state_dir.display()
918        );
919    } else {
920        if !state_dir.exists() {
921            return;
922        }
923
924        // Fallback: relax permissions on safe subdirectories only.
925        // proxy/ is never touched.
926        chmod_safe_subtrees(state_dir);
927        debug!(
928            "relaxed permissions on safe subtrees at {}",
929            state_dir.display()
930        );
931    }
932}
933
934#[cfg(unix)]
935pub(crate) fn state_owner_ids() -> Option<(u32, u32)> {
936    if !nix::unistd::Uid::effective().is_root() {
937        return None;
938    }
939
940    let s = settings();
941    let user = s.supervisor.user.trim();
942    if !user.is_empty() {
943        return resolve_supervisor_user_ids(user).or_else(|| {
944            warn!(
945                "failed to resolve supervisor.user '{user}' for state ownership; falling back to SUDO_UID/SUDO_GID"
946            );
947            parse_sudo_ids()
948        });
949    }
950
951    parse_sudo_ids()
952}
953
954#[cfg(unix)]
955fn resolve_supervisor_user_ids(user: &str) -> Option<(u32, u32)> {
956    let user_record = if user.chars().all(|c| c.is_ascii_digit()) {
957        let uid = user.parse::<u32>().ok()?;
958        nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
959            .ok()
960            .flatten()
961    } else {
962        nix::unistd::User::from_name(user).ok().flatten()
963    }?;
964
965    Some((user_record.uid.as_raw(), user_record.gid.as_raw()))
966}
967
968/// Parse `SUDO_UID` and `SUDO_GID` environment variables into numeric IDs.
969///
970/// Returns `None` unless the effective UID is 0 (root). This prevents stale
971/// `SUDO_UID`/`SUDO_GID` values inherited into non-sudo environments from
972/// triggering incorrect `chown` operations.
973#[cfg(unix)]
974fn parse_sudo_ids() -> Option<(u32, u32)> {
975    if !nix::unistd::Uid::effective().is_root() {
976        return None;
977    }
978    let uid: u32 = std::env::var("SUDO_UID").ok()?.parse().ok()?;
979    let gid: u32 = std::env::var("SUDO_GID").ok()?.parse().ok()?;
980    Some((uid, gid))
981}
982
983/// Recursively `chown` a directory tree. If `skip_proxy` is true, the `proxy/`
984/// subdirectory is skipped entirely to protect the CA private key.
985#[cfg(unix)]
986fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
987    // chown the directory itself
988    let _ = chown_path(dir, uid, gid);
989
990    let entries = match std::fs::read_dir(dir) {
991        Ok(e) => e,
992        Err(_) => return,
993    };
994    for entry in entries.flatten() {
995        let path = entry.path();
996        if path.is_dir() {
997            // Skip proxy/ at the top level of the state directory
998            if skip_proxy {
999                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
1000                    if name == "proxy" {
1001                        continue;
1002                    }
1003                }
1004            }
1005            chown_recursive(&path, uid, gid, false);
1006        } else {
1007            let _ = chown_path(&path, uid, gid);
1008        }
1009    }
1010}
1011
/// `chown` a single path to `uid`/`gid`, following symlinks like `chown(2)`.
///
/// Delegates to [`std::os::unix::fs::chown`], which performs the C-string
/// conversion and errno mapping without any `unsafe` code.
///
/// # Errors
/// Returns `InvalidInput` if `path` contains an interior NUL byte. Otherwise
/// returns the OS error reported by `chown(2)` (e.g. `NotFound`,
/// `PermissionDenied`).
#[cfg(unix)]
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
1026
1027/// Fallback: relax permissions on safe subdirectories only (sock/, logs/, and
1028/// state.toml). The proxy/ subtree is never touched.
1029#[cfg(unix)]
1030fn chmod_safe_subtrees(state_dir: &std::path::Path) {
1031    // The state directory itself needs to be traversable
1032    let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
1033
1034    // state.toml — needs to be readable by CLI clients
1035    let state_file = state_dir.join("state.toml");
1036    if state_file.exists() {
1037        let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
1038    }
1039
1040    // Safe subdirectories: sock/ and logs/
1041    for subdir_name in &["sock", "logs"] {
1042        let subdir = state_dir.join(subdir_name);
1043        if subdir.is_dir() {
1044            chmod_recursive(&subdir);
1045        }
1046    }
1047}
1048
1049/// On startup, kill any daemon processes left behind by a previous supervisor
1050/// that was terminated unexpectedly (e.g. `kill -9`).
1051///
1052/// This iterates the state file for daemon entries with a recorded PID.  If the
1053/// PID is still alive and the process name matches the recorded `title`, it is
1054/// assumed to be an orphan from the previous supervisor session and is killed.
1055/// The daemon's state is then reset to `Stopped` with no PID.
1056///
1057/// The process-name check protects against the rare case where a PID has been
1058/// recycled by an unrelated process since the state file was written.
1059///
1060/// This is gated by the `supervisor.cleanup_orphans` setting (default: true).
1061async fn cleanup_orphaned_daemons(supervisor: &Supervisor) {
1062    if !settings().supervisor.cleanup_orphans {
1063        return;
1064    }
1065
1066    let candidates: Vec<_> = {
1067        let state = supervisor.state_file.lock().await;
1068        state
1069            .daemons
1070            .values()
1071            .filter(|d| d.id != DaemonId::pitchfork() && d.pid.is_some())
1072            .cloned()
1073            .collect()
1074    };
1075
1076    if candidates.is_empty() {
1077        return;
1078    }
1079
1080    info!(
1081        "checking {} daemon(s) for orphaned processes",
1082        candidates.len()
1083    );
1084
1085    for daemon in candidates {
1086        let Some(pid) = daemon.pid else { continue };
1087
1088        PROCS.refresh_pids(&[pid]);
1089
1090        if !PROCS.is_running(pid) {
1091            // PID already dead — just reset its state
1092            let _ = supervisor
1093                .upsert_daemon(
1094                    UpsertDaemonOpts::builder(daemon.id.clone())
1095                        .set(|o| {
1096                            o.pid = None;
1097                            o.status = DaemonStatus::Stopped;
1098                            o.active_port = None;
1099                        })
1100                        .build(),
1101                )
1102                .await;
1103            continue;
1104        }
1105
1106        // Safety check: verify the process name matches what we recorded.
1107        // This avoids accidentally killing an unrelated process if the PID
1108        // was recycled by the kernel between state-file writes.
1109        let current_title = PROCS.title(pid);
1110        let matches = match (&current_title, &daemon.title) {
1111            (Some(current), Some(expected)) => current == expected,
1112            // If we don't have a recorded title, fall back to allowing the kill
1113            // (this is a degraded but functional state — the process was probably
1114            // started before title tracking was added, or the state was reset).
1115            _ => true,
1116        };
1117
1118        if !matches {
1119            warn!(
1120                "pid {pid} for daemon {} has changed name (expected '{}', found '{}'); skipping orphan cleanup",
1121                daemon.id,
1122                daemon.title.as_deref().unwrap_or("?"),
1123                current_title.as_deref().unwrap_or("?")
1124            );
1125            // Still reset state so we don't track a stale PID
1126            let _ = supervisor
1127                .upsert_daemon(
1128                    UpsertDaemonOpts::builder(daemon.id.clone())
1129                        .set(|o| {
1130                            o.pid = None;
1131                            o.status = DaemonStatus::Stopped;
1132                            o.active_port = None;
1133                        })
1134                        .build(),
1135                )
1136                .await;
1137            continue;
1138        }
1139
1140        info!("terminating orphaned daemon {} (pid {pid})", daemon.id);
1141
1142        let stop_cfg = daemon.stop_signal.unwrap_or_default();
1143        let _ = PROCS
1144            .kill_process_group_async(pid, stop_cfg.signal.into(), stop_cfg.timeout)
1145            .await;
1146
1147        let _ = supervisor
1148            .upsert_daemon(
1149                UpsertDaemonOpts::builder(daemon.id.clone())
1150                    .set(|o| {
1151                        o.pid = None;
1152                        o.status = DaemonStatus::Stopped;
1153                        o.active_port = None;
1154                    })
1155                    .build(),
1156            )
1157            .await;
1158    }
1159}
1160
/// Recursively relax permissions: directories → `0o755`, files → `0o644`
/// (best-effort; errors are ignored).
///
/// Symbolic links found inside `dir` are skipped and never followed.
/// `fs::set_permissions` acts on a link's *target*, so following a planted
/// link (e.g. `sock/k -> ../proxy/ca-key.pem`) would make arbitrary files or
/// directories world-readable.
#[cfg(unix)]
fn chmod_recursive(dir: &std::path::Path) {
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(0o755));
    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for entry in entries.flatten() {
        // `DirEntry::file_type` does not traverse symlinks (unlike `Path::is_dir`).
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        let path = entry.path();
        if file_type.is_symlink() {
            continue;
        } else if file_type.is_dir() {
            chmod_recursive(&path);
        } else {
            let _ = fs::set_permissions(&path, fs::Permissions::from_mode(0o644));
        }
    }
}