Skip to main content

pitchfork_cli/supervisor/
mod.rs

1//! Supervisor module - daemon process supervisor
2//!
3//! This module is split into focused submodules:
4//! - `state`: State access layer (get/set operations)
5//! - `lifecycle`: Daemon start/stop operations
6//! - `autostop`: Autostop logic and boot daemon startup
7//! - `retry`: Retry logic with backoff
8//! - `watchers`: Background tasks (interval, cron, file watching)
9//! - `ipc_handlers`: IPC request dispatch
10
11mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15#[cfg(unix)]
16mod pty;
17mod retry;
18mod state;
19mod watchers;
20
21use crate::daemon_id::DaemonId;
22use crate::daemon_status::DaemonStatus;
23use crate::deps::compute_reverse_stop_order;
24use crate::ipc::server::{IpcServer, IpcServerHandle};
25
26use crate::procs::PROCS;
27use crate::settings::settings;
28use crate::state_file::StateFile;
29use crate::{Result, env};
30use duct::cmd;
31use miette::IntoDiagnostic;
32use once_cell::sync::Lazy;
33use std::collections::HashMap;
34#[cfg(unix)]
35use std::collections::HashSet;
36use std::fs;
37#[cfg(unix)]
38use std::os::unix::fs::PermissionsExt;
39use std::process::exit;
40use std::sync::atomic;
41use std::sync::atomic::{AtomicBool, AtomicU32};
42use std::time::Duration;
43#[cfg(unix)]
44use tokio::signal::unix::SignalKind;
45use tokio::sync::{Mutex, Notify};
46use tokio::task::JoinHandle;
47use tokio::{signal, time};
48
/// Exit statuses reaped by the container-mode zombie reaper for managed daemon
/// PIDs. On non-Linux Unix platforms where `waitid(WNOWAIT)` is unavailable,
/// `waitpid(None, WNOHANG)` may race with Tokio's `child.wait()`. When the
/// zombie reaper wins, the exit status is stashed here so the monitoring task
/// in lifecycle.rs can recover it instead of treating the ECHILD as a failure.
///
/// Keys are daemon PIDs. Values are the exit code as recorded by the reaper:
/// the process's own exit code for a normal exit, the negated signal number
/// when the process was terminated by a signal, and `-1` for any other wait
/// status.
///
/// On Linux this map is unused because the reaper uses `waitid` with `WNOWAIT`
/// to peek before reaping, which avoids the race entirely.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
60
61// Re-export types needed by other modules
62pub(crate) use state::UpsertDaemonOpts;
63
/// Shared state of the daemon supervisor.
///
/// A single instance lives in the [`SUPERVISOR`] static; every field uses
/// interior mutability (async mutexes, an atomic counter, a `Notify`) so the
/// struct can be used through a shared reference.
pub struct Supervisor {
    /// State file handle, constructed from the `PITCHFORK_STATE_FILE` path.
    pub(crate) state_file: Mutex<StateFile>,
    /// Queued `(level, message)` notifications; appended to by
    /// `add_notification`.
    pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    /// Instant of the most recent `refresh()` pass (initialised to the
    /// construction time).
    pub(crate) last_refreshed_at: Mutex<time::Instant>,
    /// Map of daemon ID to scheduled autostop time
    pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
    /// Handle for graceful IPC server shutdown
    pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
    /// Tracks in-flight hook tasks so shutdown can wait for them to complete
    pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Number of monitoring tasks that are still running (between process exit
    /// and hook registration completion). Used by `close()` to know when it is
    /// safe to drain `hook_tasks`.
    pub(crate) active_monitors: AtomicU32,
    /// Signalled by each monitoring task after it finishes registering hooks
    /// (or decides it has nothing to register). `close()` waits on this.
    pub(crate) monitor_done: Notify,
    /// Cancellation token for the proxy server — cancelled on shutdown to
    /// stop accepting new connections and drain in-flight ones.
    pub(crate) proxy_cancel: Mutex<Option<tokio_util::sync::CancellationToken>>,
    /// Join handle for the proxy task so shutdown can wait for cleanup.
    pub(crate) proxy_task: Mutex<Option<JoinHandle<()>>>,
}
87
88pub(crate) fn interval_duration() -> Duration {
89    settings().general_interval()
90}
91
/// Global supervisor instance, created lazily on first access.
///
/// # Panics
///
/// The first access panics with "Error creating supervisor" if
/// [`Supervisor::new`] returns an error.
pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
94
95pub fn start_if_not_running() -> Result<()> {
96    let sf = StateFile::get();
97    if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
98        && let Some(pid) = d.pid
99        && PROCS.is_running(pid)
100    {
101        return Ok(());
102    }
103    start_in_background()
104}
105
106pub fn start_in_background() -> Result<()> {
107    debug!("starting supervisor in background");
108    // Ensure the log directory exists so we can redirect stderr there.
109    // Panics and other fatal errors from the background supervisor process
110    // would otherwise be silently swallowed.
111    let log_file = &*env::PITCHFORK_LOG_FILE;
112    if let Some(parent) = log_file.parent() {
113        let _ = fs::create_dir_all(parent);
114    }
115    let stderr_file = fs::OpenOptions::new()
116        .create(true)
117        .append(true)
118        .open(log_file)
119        .into_diagnostic()?;
120    #[cfg(unix)]
121    fix_state_dir_permissions();
122    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
123        .stdout_null()
124        .stderr_file(stderr_file)
125        .start()
126        .into_diagnostic()?;
127    Ok(())
128}
129
130impl Supervisor {
131    pub fn new() -> Result<Self> {
132        Ok(Self {
133            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
134            last_refreshed_at: Mutex::new(time::Instant::now()),
135            pending_notifications: Mutex::new(vec![]),
136            pending_autostops: Mutex::new(HashMap::new()),
137            ipc_shutdown: Mutex::new(None),
138            hook_tasks: Mutex::new(Vec::new()),
139            active_monitors: AtomicU32::new(0),
140            monitor_done: Notify::new(),
141            proxy_cancel: Mutex::new(None),
142            proxy_task: Mutex::new(None),
143        })
144    }
145
    /// Runs the supervisor: registers itself in the state file, starts the
    /// background watchers and the optional web / proxy servers, then serves
    /// IPC connections.
    ///
    /// # Parameters
    ///
    /// * `is_boot` — when `true`, `start_boot_daemons()` is run before the
    ///   watchers are installed.
    /// * `container` — requests container/PID1 mode. The mode is also enabled
    ///   by `settings().supervisor.container`; on Unix it installs the SIGCHLD
    ///   zombie reaper.
    /// * `web_port` — port for the web UI. When `None`, the port comes from
    ///   `settings.web.bind_port`, but only if `settings.web.auto_start` is set
    ///   and the configured value fits in 1-65535.
    /// * `web_path` — base path for the web UI. When `None`, a non-empty
    ///   `settings.web.base_path` is used.
    ///
    /// # Errors
    ///
    /// Returns the first error from the state upsert, boot daemon startup,
    /// watcher/signal setup, zombie-reaper registration or IPC server
    /// creation; otherwise returns whatever `conn_watch` returns. Web server
    /// and proxy failures are logged (and, for proxy bind failures, queued as
    /// a notification) rather than returned.
    pub async fn start(
        &self,
        is_boot: bool,
        container: bool,
        web_port: Option<u16>,
        web_path: Option<String>,
    ) -> Result<()> {
        // Ensure the state directory and its contents are accessible by non-root
        // users. This is needed when the supervisor is started with `sudo` — all
        // files it creates are owned by root, which prevents normal CLI clients
        // from reading/writing state or connecting to the IPC socket.
        #[cfg(unix)]
        fix_state_dir_permissions();

        let pid = std::process::id();
        // Determine container mode: CLI flag takes priority, then settings
        let container_mode = container || settings().supervisor.container;
        if container_mode {
            info!("Starting supervisor in container/PID1 mode with pid {pid}");
        } else {
            info!("Starting supervisor with pid {pid}");
        }

        // Record this process as the running `pitchfork` daemon in the state file.
        self.upsert_daemon(
            UpsertDaemonOpts::builder(DaemonId::pitchfork())
                .set(|o| {
                    o.pid = Some(pid);
                    o.status = DaemonStatus::Running;
                })
                .build(),
        )
        .await?;
        // Run the fix again now that the upsert above has completed.
        #[cfg(unix)]
        fix_state_dir_permissions();

        // If this is a boot start, automatically start boot_start daemons
        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        self.daemon_file_watch()?;

        // In container mode, install SIGCHLD handler to reap orphaned/zombie processes
        #[cfg(unix)]
        if container_mode {
            self.reap_zombies()?;
        }

        // Start web server: CLI --web-port takes priority, then settings.web.auto_start + bind_port
        let s = settings();
        let effective_port = web_port.or_else(|| {
            if s.web.auto_start {
                match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
                    Some(p) => Some(p),
                    None => {
                        error!(
                            "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
                            s.web.bind_port
                        );
                        None
                    }
                }
            } else {
                None
            }
        });
        // CLI --web-path takes priority, then settings.web.base_path
        let effective_path = web_path.or_else(|| {
            let bp = s.web.base_path.clone();
            if bp.is_empty() { None } else { Some(bp) }
        });
        if let Some(port) = effective_port {
            // The web server runs on its own task; its join handle is not kept.
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port, effective_path).await {
                    error!("Web server error: {e}");
                }
            });
        }

        // Start reverse proxy server if enabled
        if s.proxy.enable {
            // Pre-generate the TLS certificate synchronously before spawning the proxy
            // task. This ensures the cert exists immediately after `sup start` returns,
            // so `proxy trust` can be run right away without waiting for the async task.
            #[cfg(feature = "proxy-tls")]
            if s.proxy.https {
                let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
                let ca_cert_path = proxy_dir.join("ca.pem");
                let ca_key_path = proxy_dir.join("ca-key.pem");
                if !ca_cert_path.exists() || !ca_key_path.exists() {
                    match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
                        Ok(()) => {
                            info!(
                                "Generated local CA certificate at {}",
                                ca_cert_path.display()
                            );
                            info!("To trust the CA in your browser, run: pitchfork proxy trust");
                        }
                        Err(e) => {
                            error!("Failed to generate CA certificate: {e}");
                        }
                    }
                }
            }
            // Spawn the proxy server and wait for its bind result via a oneshot
            // channel.  This avoids the TOCTOU race of a pre-flight bind check
            // while still surfacing binding failures immediately.
            let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
            let proxy_cancel = tokio_util::sync::CancellationToken::new();
            let proxy_cancel_clone = proxy_cancel.clone();
            // Store the token and task handle so `close()` can cancel and join the proxy.
            *self.proxy_cancel.lock().await = Some(proxy_cancel);
            let proxy_task = tokio::spawn(async move {
                if let Err(e) = crate::proxy::server::serve(bind_tx, proxy_cancel_clone).await {
                    error!("Proxy server error: {e}");
                }
            });
            *self.proxy_task.lock().await = Some(proxy_task);
            match bind_rx.await {
                Ok(Ok(())) => {
                    // Proxy bound successfully — nothing to do.
                }
                Ok(Err(msg)) => {
                    error!("{msg}");
                    self.add_notification(log::LevelFilter::Error, msg).await;
                }
                Err(_) => {
                    // Sender dropped without sending — serve() panicked or
                    // returned before signalling.  Already logged by the
                    // spawn error handler above.
                }
            }
        }

        // Keep the shutdown handle for `close()`, then hand the server to `conn_watch`.
        let (ipc, ipc_handle) = IpcServer::new()?;
        *self.ipc_shutdown.lock().await = Some(ipc_handle);
        self.conn_watch(ipc).await
    }
287
288    pub(crate) async fn refresh(&self) -> Result<()> {
289        trace!("refreshing");
290
291        // Collect PIDs we need to check (shell PIDs only)
292        // This is more efficient than refreshing all processes on the system
293        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
294        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
295
296        if pids_to_check.is_empty() {
297            // No PIDs to check, skip the expensive refresh
298            trace!("no shell PIDs to check, skipping process refresh");
299        } else {
300            PROCS.refresh_pids(&pids_to_check);
301        }
302
303        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
304        *last_refreshed_at = time::Instant::now();
305
306        for (dir, pids) in dirs_with_pids {
307            let to_remove = pids
308                .iter()
309                .filter(|pid| !PROCS.is_running(**pid))
310                .collect::<Vec<_>>();
311            for pid in &to_remove {
312                self.remove_shell_pid(**pid).await?
313            }
314            if to_remove.len() == pids.len() {
315                self.leave_dir(&dir).await?;
316            }
317        }
318
319        self.check_retry().await?;
320        self.process_pending_autostops().await?;
321
322        Ok(())
323    }
324
325    /// Install a SIGCHLD handler that reaps orphaned zombie child processes.
326    ///
327    /// When running as PID 1 inside a container, orphaned processes are
328    /// re-parented to PID 1. Without explicit reaping, they accumulate
329    /// as zombies in the process table indefinitely.
330    ///
331    /// Only reaps processes that are NOT managed by the supervisor (i.e.
332    /// not tracked in the state file). Managed daemon processes are reaped
333    /// by their monitoring tasks via `child.wait()`.
334    ///
335    /// ## Strategy
336    ///
337    /// **Linux**: Uses `waitid(Id::All, WNOHANG | WNOWAIT | WEXITED)` to
338    /// *peek* at the next zombie without consuming its status. If the PID
339    /// belongs to a managed daemon, the reaper skips it so Tokio's
340    /// `child.wait()` can collect the status normally. Only unmanaged
341    /// orphans are actually reaped (via `waitpid(Pid, WNOHANG)`). This
342    /// eliminates the race entirely.
343    ///
344    /// **Non-Linux Unix** (e.g. macOS — mainly for local development;
345    /// container mode targets Linux): `waitid` is unavailable, so we fall
346    /// back to `waitpid(None, WNOHANG)`. If the reaper accidentally
347    /// consumes a managed PID's status, it stashes the exit code in
348    /// [`REAPED_STATUSES`] for the monitoring task to recover.
349    #[cfg(unix)]
350    fn reap_zombies(&self) -> Result<()> {
351        let mut stream = signal::unix::signal(SignalKind::child())
352            .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
353        tokio::spawn(async move {
354            loop {
355                stream.recv().await;
356                // Collect PIDs of managed daemons so we don't steal their exit status
357                let managed_pids: HashSet<u32> = SUPERVISOR
358                    .state_file
359                    .lock()
360                    .await
361                    .daemons
362                    .values()
363                    .filter_map(|d| d.pid)
364                    .collect();
365                // Reap all available zombie children that are NOT managed
366                Self::reap_unmanaged_zombies(&managed_pids).await;
367            }
368        });
369        info!("container mode: SIGCHLD zombie reaper installed");
370        Ok(())
371    }
372
373    /// Linux implementation: peek with `waitid(WNOWAIT)` then selectively reap.
374    ///
375    /// `WNOWAIT` leaves the zombie in the table so we can inspect its PID
376    /// without consuming the exit status. Only if the PID is *not* managed
377    /// do we call `waitpid(Pid, WNOHANG)` to actually reap it.
378    #[cfg(target_os = "linux")]
379    async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
380        use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
381        use nix::unistd::Pid;
382
383        loop {
384            // Peek at the next zombie without consuming it
385            let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
386            match waitid(Id::All, peek_flags) {
387                Ok(WaitStatus::StillAlive) => break,
388                Ok(status) => {
389                    let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
390                        break;
391                    };
392                    if managed_pids.contains(&pid_raw) {
393                        // This is a managed daemon — leave it for Tokio's child.wait().
394                        // We must break out of the loop because waitid(Id::All) would
395                        // keep returning the same zombie if we don't consume it.
396                        trace!(
397                            "zombie reaper: skipping managed daemon pid {pid_raw}, \
398                             leaving for Tokio to reap"
399                        );
400                        break;
401                    }
402                    // Not managed — actually reap it
403                    match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
404                        Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
405                        Err(nix::errno::Errno::ECHILD) => break,
406                        Err(e) => {
407                            trace!("waitpid error reaping pid {pid_raw}: {e}");
408                            break;
409                        }
410                    }
411                }
412                Err(nix::errno::Errno::ECHILD) => break, // no children at all
413                Err(e) => {
414                    trace!("waitid error in zombie reaper: {e}");
415                    break;
416                }
417            }
418        }
419    }
420
421    /// Non-Linux fallback: blind `waitpid(None, WNOHANG)` with stash recovery.
422    ///
423    /// Since `waitid(WNOWAIT)` is not available, we cannot peek. If we
424    /// accidentally reap a managed PID, we stash the exit code in
425    /// [`REAPED_STATUSES`] so the monitoring task can recover it.
426    #[cfg(all(unix, not(target_os = "linux")))]
427    async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
428        use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
429
430        loop {
431            match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
432                Ok(WaitStatus::StillAlive) => break,
433                Ok(status) => {
434                    let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
435                        continue;
436                    };
437                    if managed_pids.contains(&pid) {
438                        // Race lost — stash the exit code for lifecycle recovery
439                        let exit_code = match status {
440                            WaitStatus::Exited(_, code) => code,
441                            WaitStatus::Signaled(_, sig, _) => -(sig as i32),
442                            _ => -1,
443                        };
444                        warn!(
445                            "zombie reaper reaped managed daemon pid {pid} \
446                             (exit_code={exit_code}); stashing status for recovery"
447                        );
448                        REAPED_STATUSES.lock().await.insert(pid, exit_code);
449                    } else {
450                        trace!("reaped orphaned zombie child: {status:?}");
451                    }
452                }
453                Err(nix::errno::Errno::ECHILD) => break, // no more children
454                Err(e) => {
455                    trace!("waitpid error in zombie reaper: {e}");
456                    break;
457                }
458            }
459        }
460    }
461
462    #[cfg(unix)]
463    fn signals(&self) -> Result<()> {
464        let signals = [
465            SignalKind::terminate(),
466            SignalKind::alarm(),
467            SignalKind::interrupt(),
468            SignalKind::quit(),
469            SignalKind::hangup(),
470            SignalKind::user_defined1(),
471            SignalKind::user_defined2(),
472        ];
473        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
474        for signal in signals {
475            let stream = match signal::unix::signal(signal) {
476                Ok(s) => s,
477                Err(e) => {
478                    warn!("Failed to register signal handler for {signal:?}: {e}");
479                    continue;
480                }
481            };
482            tokio::spawn(async move {
483                let mut stream = stream;
484                loop {
485                    stream.recv().await;
486                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
487                        exit(1);
488                    } else {
489                        SUPERVISOR.handle_signal().await;
490                    }
491                }
492            });
493        }
494        Ok(())
495    }
496
497    #[cfg(windows)]
498    fn signals(&self) -> Result<()> {
499        tokio::spawn(async move {
500            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
501            loop {
502                if let Err(e) = signal::ctrl_c().await {
503                    error!("Failed to wait for ctrl-c: {}", e);
504                    return;
505                }
506                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
507                    exit(1);
508                } else {
509                    SUPERVISOR.handle_signal().await;
510                }
511            }
512        });
513        Ok(())
514    }
515
    /// Handles a shutdown signal: logs it, runs the full [`Supervisor::close`]
    /// shutdown sequence, then terminates the process with exit status 0.
    ///
    /// This function never returns to its caller.
    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }
521
522    pub(crate) async fn close(&self) {
523        // Signal the proxy server to stop accepting new connections
524        // and drain in-flight ones, *before* stopping daemons so the
525        // proxy has time to finish forwarding active requests.
526        if let Some(cancel) = self.proxy_cancel.lock().await.take() {
527            cancel.cancel();
528        }
529
530        if let Some(proxy_task) = self.proxy_task.lock().await.take() {
531            let _ = tokio::time::timeout(Duration::from_secs(12), proxy_task).await;
532        }
533
534        // Clean up /etc/hosts entries managed by pitchfork
535        let s = settings();
536        if s.proxy.enable && s.proxy.sync_hosts {
537            crate::proxy::hosts::clean_hosts_file();
538        }
539
540        let pitchfork_id = DaemonId::pitchfork();
541        let active = self.active_daemons().await;
542        let active_ids: Vec<DaemonId> = active
543            .iter()
544            .filter(|d| d.id != pitchfork_id)
545            .map(|d| d.id.clone())
546            .collect();
547
548        // Stop daemons in reverse dependency order.
549        // If dependency resolution fails (e.g. config changed), fall back to
550        // stopping in arbitrary order so we still shut down cleanly.
551        // Daemons within the same level are stopped concurrently.
552        let stop_levels = compute_reverse_stop_order(&active_ids);
553        for level in &stop_levels {
554            let mut tasks = Vec::new();
555            for id in level {
556                let id = id.clone();
557                tasks.push(tokio::spawn(async move {
558                    if let Err(err) = SUPERVISOR.stop(&id).await {
559                        error!("failed to stop daemon {id}: {err}");
560                    }
561                }));
562            }
563            for task in tasks {
564                let _ = task.await;
565            }
566        }
567        let _ = self.remove_daemon(&pitchfork_id).await;
568
569        // Signal IPC server to shut down gracefully
570        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
571            handle.shutdown();
572        }
573
574        // Wait for all in-flight monitoring tasks to finish registering their
575        // hook handles. Each monitoring task increments `active_monitors` when
576        // its process exits, and decrements it (+ notifies `monitor_done`)
577        // after all fire_hook() calls complete. This replaces the old
578        // yield_now() approach which had a race window.
579        let drain_timeout = time::sleep(Duration::from_secs(5));
580        tokio::pin!(drain_timeout);
581        loop {
582            if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
583                break;
584            }
585            tokio::select! {
586                _ = self.monitor_done.notified() => {}
587                _ = &mut drain_timeout => {
588                    warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
589                    break;
590                }
591            }
592        }
593        let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
594        let hook_timeout = Duration::from_secs(30);
595        for handle in handles {
596            match time::timeout(hook_timeout, handle).await {
597                Ok(_) => {} // Hook completed (success or error, doesn't matter)
598                Err(_) => {
599                    warn!(
600                        "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
601                    );
602                }
603            }
604        }
605
606        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
607    }
608
609    pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
610        self.pending_notifications
611            .lock()
612            .await
613            .push((level, message));
614    }
615}
616
617/// Fix ownership on the state directory so non-root users can access files
618/// created by a `sudo`-started supervisor.
619///
620/// When `[settings.supervisor] user` or `SUDO_UID`/`SUDO_GID` are set, we
621/// `chown` the state directory and safe subdirectories back to that non-root
622/// runtime user. This is strictly better than `chmod 0o666` because it does not
623/// widen the permission bits — the files stay owner-only (0o600/0o700) but the
624/// *owner* is the user that daemon processes and CLI clients need to share.
625///
626/// **Security**: The `proxy/` subtree is intentionally skipped. It contains
627/// `ca-key.pem` which must remain `0o600` and owned by the process that
628/// generated it. Changing its ownership or permissions would expose the CA
629/// private key to other local users.
630///
631/// If neither `user` nor `SUDO_UID`/`SUDO_GID` are available (e.g. direct
632/// root login), we fall back to relaxing permissions on only the `sock/` and
633/// `logs/` subdirectories (plus `state.toml`) so CLI clients can still function.
634#[cfg(unix)]
635fn fix_state_dir_permissions() {
636    let state_dir = &*env::PITCHFORK_STATE_DIR;
637    if let Some((uid, gid)) = state_owner_ids() {
638        if !state_dir.exists()
639            && let Err(err) = fs::create_dir_all(state_dir)
640        {
641            warn!(
642                "failed to create state directory for ownership fix at {}: {err}",
643                state_dir.display()
644            );
645            return;
646        }
647
648        // Best path: chown back to the runtime user. Permissions stay tight.
649        chown_recursive(state_dir, uid, gid, true);
650        debug!(
651            "chowned state directory to uid={uid} gid={gid} at {}",
652            state_dir.display()
653        );
654    } else {
655        if !state_dir.exists() {
656            return;
657        }
658
659        // Fallback: relax permissions on safe subdirectories only.
660        // proxy/ is never touched.
661        chmod_safe_subtrees(state_dir);
662        debug!(
663            "relaxed permissions on safe subtrees at {}",
664            state_dir.display()
665        );
666    }
667}
668
669#[cfg(unix)]
670pub(crate) fn state_owner_ids() -> Option<(u32, u32)> {
671    if !nix::unistd::Uid::effective().is_root() {
672        return None;
673    }
674
675    let user = settings().supervisor.user.trim();
676    if !user.is_empty() {
677        return resolve_supervisor_user_ids(user).or_else(|| {
678            warn!(
679                "failed to resolve supervisor.user '{user}' for state ownership; falling back to SUDO_UID/SUDO_GID"
680            );
681            parse_sudo_ids()
682        });
683    }
684
685    parse_sudo_ids()
686}
687
688#[cfg(unix)]
689fn resolve_supervisor_user_ids(user: &str) -> Option<(u32, u32)> {
690    let user_record = if user.chars().all(|c| c.is_ascii_digit()) {
691        let uid = user.parse::<u32>().ok()?;
692        nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
693            .ok()
694            .flatten()
695    } else {
696        nix::unistd::User::from_name(user).ok().flatten()
697    }?;
698
699    Some((user_record.uid.as_raw(), user_record.gid.as_raw()))
700}
701
702/// Parse `SUDO_UID` and `SUDO_GID` environment variables into numeric IDs.
703///
704/// Returns `None` unless the effective UID is 0 (root). This prevents stale
705/// `SUDO_UID`/`SUDO_GID` values inherited into non-sudo environments from
706/// triggering incorrect `chown` operations.
707#[cfg(unix)]
708fn parse_sudo_ids() -> Option<(u32, u32)> {
709    if !nix::unistd::Uid::effective().is_root() {
710        return None;
711    }
712    let uid: u32 = std::env::var("SUDO_UID").ok()?.parse().ok()?;
713    let gid: u32 = std::env::var("SUDO_GID").ok()?.parse().ok()?;
714    Some((uid, gid))
715}
716
717/// Recursively `chown` a directory tree. If `skip_proxy` is true, the `proxy/`
718/// subdirectory is skipped entirely to protect the CA private key.
719#[cfg(unix)]
720fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
721    // chown the directory itself
722    let _ = chown_path(dir, uid, gid);
723
724    let entries = match std::fs::read_dir(dir) {
725        Ok(e) => e,
726        Err(_) => return,
727    };
728    for entry in entries.flatten() {
729        let path = entry.path();
730        if path.is_dir() {
731            // Skip proxy/ at the top level of the state directory
732            if skip_proxy {
733                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
734                    if name == "proxy" {
735                        continue;
736                    }
737                }
738            }
739            chown_recursive(&path, uid, gid, false);
740        } else {
741            let _ = chown_path(&path, uid, gid);
742        }
743    }
744}
745
/// `chown` a single path to `uid`/`gid`, following symlinks like `chown(2)`.
///
/// Uses the safe standard-library wrapper instead of a raw `libc::chown`
/// call, so no `unsafe` block or manual C-string conversion is needed.
///
/// # Errors
///
/// Returns the OS error if the call fails, or an `InvalidInput` error if the
/// path contains an interior NUL byte.
#[cfg(unix)]
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
760
761/// Fallback: relax permissions on safe subdirectories only (sock/, logs/, and
762/// state.toml). The proxy/ subtree is never touched.
763#[cfg(unix)]
764fn chmod_safe_subtrees(state_dir: &std::path::Path) {
765    // The state directory itself needs to be traversable
766    let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
767
768    // state.toml — needs to be readable by CLI clients
769    let state_file = state_dir.join("state.toml");
770    if state_file.exists() {
771        let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
772    }
773
774    // Safe subdirectories: sock/ and logs/
775    for subdir_name in &["sock", "logs"] {
776        let subdir = state_dir.join(subdir_name);
777        if subdir.is_dir() {
778            chmod_recursive(&subdir);
779        }
780    }
781}
782
/// Recursively chmod a tree: directories → `0o755`, files → `0o644`.
///
/// `dir` itself is set to `0o755` first. Symbolic links found inside the tree
/// are skipped: `set_permissions` would change the link's *target*, and
/// recursing through a link could leave the tree, so a link must never cause
/// permissions to be relaxed on something outside `dir`.
///
/// All failures are ignored (best effort); entries whose type cannot be
/// determined are skipped.
#[cfg(unix)]
fn chmod_recursive(dir: &std::path::Path) {
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(0o755));
    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for entry in entries.flatten() {
        // Unlike `Path::is_dir`, this reports a symlink as a symlink.
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        if file_type.is_symlink() {
            continue;
        }
        let path = entry.path();
        if file_type.is_dir() {
            chmod_recursive(&path);
        } else {
            let _ = fs::set_permissions(&path, fs::Permissions::from_mode(0o644));
        }
    }
}