Skip to main content

pitchfork_cli/supervisor/
lifecycle.rs

1//! Daemon lifecycle management - start/stop operations
2//!
3//! Contains the core `run()`, `run_once()`, and `stop()` methods for daemon process management.
4
5use super::hooks::{self, HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::log_store::LogStore;
13use crate::log_store::sqlite::LOG_STORE;
14use crate::procs::PROCS;
15use crate::settings::settings;
16use crate::shell::Shell;
17use crate::supervisor::state::UpsertDaemonOpts;
18use crate::{Result, env};
19use miette::IntoDiagnostic;
20use once_cell::sync::Lazy;
21use regex::Regex;
22use std::collections::HashMap;
23#[cfg(unix)]
24use std::ffi::CString;
25use std::sync::atomic;
26use std::time::Duration;
27use tokio::io::AsyncBufReadExt;
28use tokio::select;
29use tokio::sync::oneshot;
30use tokio::time;
31
32/// Cache for compiled regex patterns to avoid recompilation on daemon restarts
33static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
34    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
35
36#[cfg(unix)]
37#[derive(Clone, Debug, PartialEq, Eq)]
38enum RunIdentity {
39    Inherit,
40    Switch {
41        uid: nix::unistd::Uid,
42        gid: nix::unistd::Gid,
43        username: Option<CString>,
44    },
45}
46
47/// Get or compile a regex pattern, caching the result for future use
48pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
49    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
50    if let Some(re) = cache.get(pattern) {
51        return Some(re.clone());
52    }
53    match Regex::new(pattern) {
54        Ok(re) => {
55            cache.insert(pattern.to_string(), re.clone());
56            Some(re)
57        }
58        Err(e) => {
59            error!("invalid regex pattern '{pattern}': {e}");
60            None
61        }
62    }
63}
64
65impl Supervisor {
66    /// Run a daemon, handling retries if configured
67    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
68        let id = &opts.id;
69        let cmd = opts.cmd.clone();
70
71        // Clear any pending autostop for this daemon since it's being started
72        {
73            let mut pending = self.pending_autostops.lock().await;
74            if pending.remove(id).is_some() {
75                info!("cleared pending autostop for {id} (daemon starting)");
76            }
77        }
78
79        let daemon = self.get_daemon(id).await;
80        if let Some(daemon) = daemon {
81            // Stopping state is treated as "not running" - the monitoring task will clean it up
82            // Only check for Running state with a valid PID
83            if !daemon.status.is_stopping()
84                && !daemon.status.is_stopped()
85                && let Some(pid) = daemon.pid
86            {
87                if opts.force {
88                    self.stop(id).await?;
89                    info!("run: stop completed for daemon {id}");
90                } else {
91                    warn!("daemon {id} already running with pid {pid}");
92                    return Ok(IpcResponse::DaemonAlreadyRunning);
93                }
94            }
95        }
96
97        // If wait_ready is true and retry is configured, implement retry loop
98        if opts.wait_ready && opts.retry.count() > 0 {
99            // Use saturating_add to avoid overflow when retry = u32::MAX (infinite)
100            let max_attempts = opts.retry.count().saturating_add(1);
101            for attempt in 0..max_attempts {
102                let mut retry_opts = opts.clone();
103                retry_opts.retry_count = attempt;
104                retry_opts.cmd = cmd.clone();
105
106                let result = self.run_once(retry_opts).await?;
107
108                match result {
109                    IpcResponse::DaemonReady { daemon } => {
110                        return Ok(IpcResponse::DaemonReady { daemon });
111                    }
112                    IpcResponse::DaemonFailedWithCode { exit_code } => {
113                        if attempt < opts.retry.count() {
114                            let backoff_secs = 2u64.saturating_pow(attempt).min(3600);
115                            info!(
116                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
117                                attempt + 1,
118                                max_attempts,
119                                backoff_secs
120                            );
121                            fire_hook(
122                                HookType::OnRetry,
123                                id.clone(),
124                                opts.dir.0.clone(),
125                                attempt + 1,
126                                opts.env.clone(),
127                                vec![],
128                            )
129                            .await;
130                            time::sleep(Duration::from_secs(backoff_secs)).await;
131                            continue;
132                        } else {
133                            info!("daemon {id} failed after {max_attempts} attempts");
134                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
135                        }
136                    }
137                    other => return Ok(other),
138                }
139            }
140        }
141
142        // No retry or wait_ready is false
143        self.run_once(opts).await
144    }
145
146    /// Run a daemon once (single attempt)
147    pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
148        let id = &opts.id;
149        let original_cmd = opts.cmd.clone(); // Save original command for persistence
150
151        // Create channel for readiness notification if wait_ready is true
152        let (ready_tx, ready_rx) = if opts.wait_ready {
153            let (tx, rx) = oneshot::channel();
154            (Some(tx), Some(rx))
155        } else {
156            (None, None)
157        };
158
159        // Check port availability and apply auto-bump if configured
160        let expected_ports = opts
161            .port
162            .as_ref()
163            .map(|p| p.expect.clone())
164            .unwrap_or_default();
165        let (resolved_ports, effective_ready_port) = if !expected_ports.is_empty() {
166            let port_cfg = opts.port.as_ref().unwrap();
167            match check_ports_available(
168                &expected_ports,
169                port_cfg.auto_bump(),
170                port_cfg.max_bump_attempts(),
171            )
172            .await
173            {
174                Ok(resolved) => {
175                    let ready_port = if let Some(configured_port) = opts.ready_port {
176                        // If ready_port matches one of the expected ports, apply the same bump offset
177                        let bump_offset = resolved
178                            .first()
179                            .unwrap_or(&0)
180                            .saturating_sub(*expected_ports.first().unwrap_or(&0));
181                        if expected_ports.contains(&configured_port) && bump_offset > 0 {
182                            configured_port
183                                .checked_add(bump_offset)
184                                .or(Some(configured_port))
185                        } else {
186                            Some(configured_port)
187                        }
188                    } else if opts.ready_output.is_none()
189                        && opts.ready_http.is_none()
190                        && opts.ready_cmd.is_none()
191                        && opts.ready_delay.is_none()
192                    {
193                        // No other ready check configured — use the first expected port as a
194                        // TCP port readiness check so the daemon is considered ready once it
195                        // starts listening.  Skip port 0 (ephemeral port request).
196                        resolved.first().copied().filter(|&p| p != 0)
197                    } else {
198                        // Another ready check is configured (output/http/cmd/delay).
199                        // Don't add an implicit TCP port check — it could race and fire
200                        // before the daemon has produced any output.
201                        None
202                    };
203                    info!("daemon {id}: ports {expected_ports:?} resolved to {resolved:?}");
204                    (resolved, ready_port)
205                }
206                Err(e) => {
207                    error!("daemon {id}: port check failed: {e}");
208                    // Convert PortError to structured IPC response
209                    if let Some(port_error) = e.downcast_ref::<PortError>() {
210                        match port_error {
211                            PortError::InUse { port, process, pid } => {
212                                return Ok(IpcResponse::PortConflict {
213                                    port: *port,
214                                    process: process.clone(),
215                                    pid: *pid,
216                                });
217                            }
218                            PortError::NoAvailablePort {
219                                start_port,
220                                attempts,
221                            } => {
222                                return Ok(IpcResponse::NoAvailablePort {
223                                    start_port: *start_port,
224                                    attempts: *attempts,
225                                });
226                            }
227                        }
228                    }
229                    return Ok(IpcResponse::DaemonFailed {
230                        error: e.to_string(),
231                    });
232                }
233            }
234        } else {
235            // When ready_port is set without expected_port, check that the port
236            // is not already occupied.  If another process is listening on it,
237            // the TCP readiness probe would immediately succeed and pitchfork
238            // would falsely consider the daemon ready — routing proxy traffic to
239            // the wrong process.
240            if let Some(port) = opts.ready_port {
241                if port > 0 {
242                    if let Some((pid, process)) = detect_port_conflict(port).await {
243                        return Ok(IpcResponse::PortConflict { port, process, pid });
244                    }
245                }
246            }
247            (Vec::new(), opts.ready_port)
248        };
249
250        // Parse the configured shell (default "sh -c") into program + args.
251        // The run script is passed verbatim as the final argument, avoiding the
252        // lossy split->join round-trip that previously mangled $VAR/glob expansion.
253        let shell_setting = settings().general.shell.clone();
254        let shell_parts = match shell_words::split(&shell_setting) {
255            Ok(parts) if !parts.is_empty() => parts,
256            Ok(_) => {
257                return Ok(IpcResponse::DaemonFailed {
258                    error: "general.shell setting is empty".to_string(),
259                });
260            }
261            Err(e) => {
262                return Ok(IpcResponse::DaemonFailed {
263                    error: format!("failed to parse general.shell setting {shell_setting:?}: {e}"),
264                });
265            }
266        };
267        let (shell_program, shell_args) = shell_parts.split_first().unwrap();
268
269        // Use the original run string verbatim; fall back to joining cmd for
270        // ad-hoc commands (e.g. `pitchfork run -- cmd args`) that have no run string.
271        // We don't prepend `exec` because it breaks compound commands (e.g. `exec a && b`
272        // silently drops `b`). Users can add `exec` themselves in the run string.
273        let run_script = opts
274            .run
275            .clone()
276            .unwrap_or_else(|| shell_words::join(&original_cmd));
277
278        let (program, args) = if opts.mise.unwrap_or(settings().general.mise) {
279            match settings().resolve_mise_bin() {
280                Some(mise_bin) => {
281                    let mise_bin_str = mise_bin.to_string_lossy().to_string();
282                    info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
283                    let mut args = vec!["x".to_string(), "--".to_string()];
284                    args.push(shell_program.clone());
285                    args.extend(shell_args.iter().cloned());
286                    args.push(run_script);
287                    (mise_bin_str, args)
288                }
289                None => {
290                    warn!("daemon {id}: mise=true but mise binary not found, running without mise");
291                    let mut args: Vec<String> = shell_args.to_vec();
292                    args.push(run_script);
293                    (shell_program.clone(), args)
294                }
295            }
296        } else {
297            let mut args: Vec<String> = shell_args.to_vec();
298            args.push(run_script);
299            (shell_program.clone(), args)
300        };
301        #[cfg(unix)]
302        let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
303            Ok(identity) => identity,
304            Err(e) => {
305                return Ok(IpcResponse::DaemonFailed {
306                    error: e.to_string(),
307                });
308            }
309        };
310        info!("run: spawning daemon {id} with {program} {args:?}");
311
312        // Allocate PTY if configured
313        #[cfg(unix)]
314        let pty_pair = if opts.pty.unwrap_or(false) {
315            match super::pty::openpty() {
316                Ok(pair) => {
317                    info!("daemon {id}: allocated PTY (pty = true)");
318                    Some(pair)
319                }
320                Err(e) => {
321                    warn!("daemon {id}: failed to allocate PTY, falling back to pipes: {e}");
322                    None
323                }
324            }
325        } else {
326            None
327        };
328
329        let mut cmd = tokio::process::Command::new(&program);
330
331        #[cfg(unix)]
332        if let Some(ref pair) = pty_pair {
333            // PTY mode: connect both stdout and stderr to the slave PTY.
334            // The child uses the slave for stdin/stdout/stderr, and we read
335            // output from the master.
336            let slave_file = std::fs::File::from(
337                pair.slave
338                    .try_clone()
339                    .map_err(|e| miette::miette!("failed to dup slave PTY fd: {e}"))?,
340            );
341            cmd.stdin(std::process::Stdio::from(slave_file.try_clone().map_err(
342                |e| miette::miette!("failed to clone slave PTY fd for stdin: {e}"),
343            )?));
344            cmd.stdout(std::process::Stdio::from(slave_file.try_clone().map_err(
345                |e| miette::miette!("failed to clone slave PTY fd for stdout: {e}"),
346            )?));
347            cmd.stderr(std::process::Stdio::from(slave_file));
348        } else {
349            cmd.stdout(std::process::Stdio::piped())
350                .stderr(std::process::Stdio::piped());
351        }
352
353        #[cfg(not(unix))]
354        {
355            cmd.stdout(std::process::Stdio::piped())
356                .stderr(std::process::Stdio::piped());
357        }
358
359        cmd.args(&args).current_dir(&opts.dir);
360
361        #[cfg(unix)]
362        if pty_pair.is_none() {
363            cmd.stdin(std::process::Stdio::null());
364        }
365
366        #[cfg(not(unix))]
367        cmd.stdin(std::process::Stdio::null());
368
369        // Ensure daemon can find user tools by using the original PATH
370        if let Some(ref path) = *env::ORIGINAL_PATH {
371            cmd.env("PATH", path);
372        }
373
374        // Apply custom environment variables from config
375        if let Some(ref env_vars) = opts.env {
376            cmd.envs(env_vars);
377        }
378
379        // Inject pitchfork metadata env vars AFTER user env so they can't be overwritten
380        cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
381        cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
382        cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
383
384        // Inject the resolved ports for the daemon to use
385        if !resolved_ports.is_empty() {
386            // Set PORT to the first port for backward compatibility
387            // When there's only one port, both PORT and PORT0 will be set to the same value.
388            // This follows the convention used by many deployment platforms (Heroku, etc.).
389            cmd.env("PORT", resolved_ports[0].to_string());
390            // Set individual ports as PORT0, PORT1, etc.
391            for (i, port) in resolved_ports.iter().enumerate() {
392                cmd.env(format!("PORT{i}"), port.to_string());
393            }
394        }
395
396        // Inject proxy-related environment variables
397        inject_proxy_env(&mut cmd, &opts.slug);
398
399        #[cfg(unix)]
400        {
401            let run_identity = run_identity.clone();
402            let use_pty = pty_pair.is_some();
403            unsafe {
404                cmd.pre_exec(move || {
405                    nix::unistd::setsid().map_err(nix_to_io_error)?;
406
407                    // When using a PTY, set the slave as the controlling terminal.
408                    // The slave FD has already been dup'd onto stdin/stdout/stderr
409                    // by tokio, so we can use stdin (fd 0) for TIOCSCTTY.
410                    if use_pty {
411                        let ret = libc::ioctl(0, libc::TIOCSCTTY as libc::c_ulong, 0);
412                        if ret < 0 {
413                            // Non-fatal: the process can still run without
414                            // a controlling terminal.
415                            #[cfg(target_os = "linux")]
416                            eprintln!(
417                                "pitchfork: TIOCSCTTY failed: {}",
418                                std::io::Error::last_os_error()
419                            );
420                        }
421                    }
422
423                    apply_run_identity(&run_identity)?;
424                    Ok(())
425                });
426            }
427        }
428
429        let mut child = cmd.spawn().into_diagnostic()?;
430        let pid = match child.id() {
431            Some(p) => p,
432            None => {
433                warn!("Daemon {id} exited before PID could be captured");
434                return Ok(IpcResponse::DaemonFailed {
435                    error: "Process exited immediately".to_string(),
436                });
437            }
438        };
439        info!("started daemon {id} with pid {pid}");
440        PROCS.refresh_pids(&[pid]);
441        let daemon = self
442            .upsert_daemon(
443                UpsertDaemonOpts::builder(id.clone())
444                    .set(|o| {
445                        o.pid = Some(pid);
446                        o.status = DaemonStatus::Running;
447                        o.shell_pid = opts.shell_pid;
448                        o.dir = Some(opts.dir.0.clone());
449                        o.cmd = Some(original_cmd);
450                        o.run = opts.run.clone();
451                        o.autostop = opts.autostop;
452                        o.cron_schedule = opts.cron_schedule.clone();
453                        o.cron_retrigger = opts.cron_retrigger;
454                        o.cron_immediate = opts.cron_immediate;
455                        o.retry = Some(opts.retry);
456                        o.retry_count = Some(opts.retry_count);
457                        o.ready_delay = opts.ready_delay;
458                        o.ready_output = opts.ready_output.clone();
459                        o.ready_http = opts.ready_http.clone();
460                        o.ready_port = effective_ready_port;
461                        o.ready_cmd = opts.ready_cmd.clone();
462                        o.port = crate::config_types::PortConfig::from_parts(
463                            expected_ports,
464                            opts.port.as_ref().map(|p| p.bump).unwrap_or_default(),
465                        );
466                        o.resolved_port = resolved_ports;
467                        o.depends = Some(opts.depends.clone());
468                        o.env = opts.env.clone();
469                        o.watch = Some(opts.watch.clone());
470                        o.watch_mode = Some(opts.watch_mode);
471                        o.watch_base_dir = opts.watch_base_dir.clone();
472                        o.mise = opts.mise;
473                        o.user = opts.user.clone();
474                        o.memory_limit = opts.memory_limit;
475                        o.cpu_limit = opts.cpu_limit;
476                        o.stop_signal = opts.stop_signal;
477                        o.pty = opts.pty;
478                    })
479                    .build(),
480            )
481            .await?;
482
483        let id_clone = id.clone();
484        let ready_delay = opts.ready_delay;
485        let ready_output = opts.ready_output.clone();
486        let ready_http = opts.ready_http.clone();
487        let ready_port = effective_ready_port;
488        let ready_cmd = opts.ready_cmd.clone();
489        let daemon_dir = opts.dir.0.clone();
490        let hook_retry_count = opts.retry_count;
491        let hook_retry = opts.retry;
492        let hook_daemon_env = opts.env.clone();
493        let on_output_hook = opts.on_output_hook.clone();
494        // Whether this daemon has any port-related config — used to skip the
495        // active_port detection task for daemons that never bind a port (e.g. `sleep 60`).
496        // When the proxy is enabled, only detect active_port for daemons that are
497        // actually referenced by a registered slug, rather than blanket-polling every
498        // daemon (which wastes ~7.5 s of listeners::get_all() calls per port-less daemon).
499        let has_port_config = opts.port.as_ref().is_some_and(|p| !p.expect.is_empty())
500            || (settings().proxy.enable && is_daemon_slug_target(id));
501        let daemon_pid = pid;
502
503        // Prepare output readers before spawning the monitoring task.
504        // In PTY mode, we read from the PTY master FD.
505        // In pipe mode, we read from separate stdout/stderr pipes.
506        #[cfg(unix)]
507        let pty_reader = pty_pair.map(|p| {
508            tokio::io::BufReader::new(tokio::fs::File::from_std(std::fs::File::from(p.master)))
509                .lines()
510        });
511        #[cfg(not(unix))]
512        let pty_reader: Option<tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>> = None;
513        let stdout_reader = if pty_reader.is_none() {
514            child
515                .stdout
516                .take()
517                .map(|s| tokio::io::BufReader::new(s).lines())
518        } else {
519            None
520        };
521        let stderr_reader = if pty_reader.is_none() {
522            child
523                .stderr
524                .take()
525                .map(|s| tokio::io::BufReader::new(s).lines())
526        } else {
527            None
528        };
529
530        if pty_reader.is_none() && (stdout_reader.is_none() || stderr_reader.is_none()) {
531            error!("Failed to capture stdout/stderr for daemon {id}");
532        }
533
534        tokio::spawn(async move {
535            let id = id_clone;
536
537            // Merge all output sources (PTY master OR stdout+stderr) into a single channel.
538            let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<String>(256);
539
540            if let Some(mut reader) = pty_reader {
541                // PTY mode: single merged stream from the master.
542                // output_tx is moved into the spawn; when the reader ends the
543                // channel closes automatically.
544                tokio::spawn(async move {
545                    while let Ok(Some(mut line)) = reader.next_line().await {
546                        // PTY slave uses ONLCR: \n → \r\n; strip the trailing \r.
547                        if line.ends_with('\r') {
548                            line.pop();
549                        }
550                        if output_tx.send(line).await.is_err() {
551                            break;
552                        }
553                    }
554                });
555            } else {
556                // Pipe mode: stdout and stderr are merged into the same channel.
557                // Both `ready_output` and `on_output_hook` patterns match against
558                // lines from either stream, which is the expected behavior (a
559                // "server ready" message may appear on stderr in some tools).
560                if let Some(mut stdout) = stdout_reader {
561                    let tx = output_tx.clone();
562                    tokio::spawn(async move {
563                        while let Ok(Some(line)) = stdout.next_line().await {
564                            if tx.send(line).await.is_err() {
565                                break;
566                            }
567                        }
568                    });
569                }
570                if let Some(mut stderr) = stderr_reader {
571                    let tx = output_tx.clone();
572                    tokio::spawn(async move {
573                        while let Ok(Some(line)) = stderr.next_line().await {
574                            if tx.send(line).await.is_err() {
575                                break;
576                            }
577                        }
578                    });
579                }
580                // Drop the last sender so the channel closes when all readers finish.
581                drop(output_tx);
582            }
583            let log_store = &*LOG_STORE;
584            let format_line = |line: String| line;
585
586            // SQLite WAL mode provides automatic durability; no explicit flush needed.
587
588            // Setup readiness checking
589            let mut ready_notified = false;
590            let mut ready_tx = ready_tx;
591            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
592            // Track whether we've already spawned the active_port detection task
593            let mut active_port_spawned = false;
594
595            // Validate on_output config early; discard the hook on any error so
596            // a bad regex does not silently fall through to the (None, None) => true
597            // match arm and fire on every line.
598            let on_output_hook = match on_output_hook {
599                Some(ref hook) => match hook.validate(id.name()) {
600                    Ok(()) => on_output_hook,
601                    Err(e) => {
602                        error!("{e}");
603                        None
604                    }
605                },
606                None => None,
607            };
608
609            // Compile the regex pattern after validation so we only attempt this
610            // when the hook is known-good (validate() already checked the syntax).
611            let on_output_pattern: Option<regex::Regex> = on_output_hook
612                .as_ref()
613                .and_then(|h| h.regex.as_deref().and_then(get_or_compile_regex));
614            let on_output_debounce = on_output_hook
615                .as_ref()
616                .map(|h| h.debounce_duration())
617                .unwrap_or(Duration::from_millis(1000));
618            // Last time the on_output hook fired; None means it has never fired.
619            let mut on_output_last_fired: Option<std::time::Instant> = None;
620
621            let mut delay_timer =
622                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
623
624            // Get settings for intervals
625            let s = settings();
626            let ready_check_interval = s.supervisor_ready_check_interval();
627            let http_client_timeout = s.supervisor_http_client_timeout();
628
629            // Setup HTTP readiness check interval
630            let mut http_check_interval = ready_http
631                .as_ref()
632                .map(|_| tokio::time::interval(ready_check_interval));
633            let http_client = ready_http.as_ref().map(|_| {
634                reqwest::Client::builder()
635                    .timeout(http_client_timeout)
636                    .build()
637                    .unwrap_or_default()
638            });
639
640            // Setup TCP port readiness check interval
641            let mut port_check_interval =
642                ready_port.map(|_| tokio::time::interval(ready_check_interval));
643
644            // Setup command readiness check interval
645            let mut cmd_check_interval = ready_cmd
646                .as_ref()
647                .map(|_| tokio::time::interval(ready_check_interval));
648
649            // Use a channel to communicate process exit status
650            let (exit_tx, mut exit_rx) =
651                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
652
653            // Spawn a task to wait for process exit
654            let child_pid = child.id().unwrap_or(0);
655            tokio::spawn(async move {
656                let result = child.wait().await;
657                // On non-Linux Unix (e.g. macOS) the zombie reaper may win the
658                // race and consume the exit status via waitpid(None, WNOHANG)
659                // before Tokio's child.wait() gets to it. When that happens,
660                // Tokio returns an ECHILD io::Error. We recover by checking
661                // REAPED_STATUSES for the stashed exit code.
662                //
663                // On Linux this is unnecessary because the reaper uses
664                // waitid(WNOWAIT) to peek before reaping, which avoids the
665                // race entirely.
666                #[cfg(all(unix, not(target_os = "linux")))]
667                let result = match &result {
668                    Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
669                        if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
670                            warn!(
671                                "daemon pid {child_pid} wait() got ECHILD; \
672                                 recovered exit code {code} from zombie reaper"
673                            );
674                            // Synthesize an ExitStatus from the stashed code.
675                            // On Unix we can use `ExitStatus::from_raw()` with
676                            // a wait-style status word (code << 8 for normal
677                            // exit, or raw signal number for signal death).
678                            use std::os::unix::process::ExitStatusExt;
679                            if code >= 0 {
680                                Ok(std::process::ExitStatus::from_raw(code << 8))
681                            } else {
682                                // Negative code means killed by signal (-sig)
683                                Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
684                            }
685                        } else {
686                            warn!(
687                                "daemon pid {child_pid} wait() got ECHILD but no \
688                                 stashed status found; reporting as error"
689                            );
690                            result
691                        }
692                    }
693                    _ => result,
694                };
695                debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
696                let _ = exit_tx.send(result).await;
697            });
698
699            #[allow(unused_assignments)]
700            // Initial None is a safety net; loop only exits via exit_rx.recv() which sets it
701            let mut exit_status = None;
702
703            // If there is no ready check of any kind and no delay, the daemon is
704            // considered immediately ready and the active_port detection task would
705            // never be triggered inside the select loop.  Kick it off right away so
706            // that daemons without any readiness configuration still get their
707            // active_port populated (needed for proxy routing).
708            if has_port_config
709                && ready_pattern.is_none()
710                && ready_http.is_none()
711                && ready_port.is_none()
712                && ready_cmd.is_none()
713                && delay_timer.is_none()
714            {
715                active_port_spawned = true;
716                detect_and_store_active_port(id.clone(), daemon_pid);
717            }
718
719            loop {
720                select! {
721                    Some(line) = output_rx.recv() => {
722                        let formatted = format_line(line.clone());
723                        if let Err(e) = log_store.append(&id, &formatted) {
724                            error!("Failed to write to log for daemon {id}: {e}");
725                        }
726                        trace!("output: {id} {formatted}");
727
728                        // Strip ANSI for pattern matching so user-written patterns
729                        // work regardless of whether the process emits color codes.
730                        let line_clean = console::strip_ansi_codes(&line).to_string();
731
732                        // Check if output matches ready pattern
733                        if !ready_notified
734                            && let Some(ref pattern) = ready_pattern
735                            && pattern.is_match(&line_clean)
736                        {
737                            info!("daemon {id} ready: output matched pattern");
738                            ready_notified = true;
739                            if let Some(tx) = ready_tx.take() {
740                                let _ = tx.send(Ok(()));
741                            }
742                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
743                            if !active_port_spawned && has_port_config {
744                                active_port_spawned = true;
745                                detect_and_store_active_port(id.clone(), daemon_pid);
746                            }
747                        }
748
749                        // Check on_output hook
750                        if let Some(ref hook) = on_output_hook {
751                            let matched = match (&hook.filter, &on_output_pattern) {
752                                (Some(substr), _) => line_clean.contains(substr.as_str()),
753                                (None, Some(re)) => re.is_match(&line_clean),
754                                (None, None) => true,
755                            };
756                            if matched {
757                                let now = std::time::Instant::now();
758                                let elapsed = on_output_last_fired.map(|t| now.duration_since(t));
759                                if elapsed.is_none_or(|e| e >= on_output_debounce) {
760                                    on_output_last_fired = Some(now);
761                                    hooks::fire_output_hook(id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), hook.run.clone(), line_clean.clone()).await;
762                                }
763                            }
764                        }
765                    }
766                    Some(result) = exit_rx.recv() => {
767                        // Process exited - save exit status and notify if not ready yet
768                        exit_status = Some(result);
769                        debug!("daemon {id} process exited, exit_status: {exit_status:?}");
770                        if !ready_notified {
771                            if let Some(tx) = ready_tx.take() {
772                                // Check if process exited successfully
773                                let is_success = exit_status.as_ref()
774                                    .and_then(|r| r.as_ref().ok())
775                                    .map(|s| s.success())
776                                    .unwrap_or(false);
777
778                                if is_success {
779                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
780                                    let _ = tx.send(Ok(()));
781                                } else {
782                                    let exit_code = exit_status.as_ref()
783                                        .and_then(|r| r.as_ref().ok())
784                                        .and_then(|s| s.code());
785                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
786                                    let _ = tx.send(Err(exit_code));
787                                }
788                            }
789                        } else {
790                            debug!("daemon {id} was already marked ready, not sending notification");
791                        }
792                        break;
793                    },
794                    _ = async {
795                        if let Some(ref mut interval) = http_check_interval {
796                            interval.tick().await;
797                        } else {
798                            std::future::pending::<()>().await;
799                        }
800                    }, if !ready_notified && ready_http.is_some() => {
801                        if let (Some(http), Some(client)) = (&ready_http, &http_client) {
802                            match client.get(&http.url).send().await {
803                                Ok(response) if http.accepts_status(response.status().as_u16()) => {
804                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
805                                    ready_notified = true;
806                                    if let Some(tx) = ready_tx.take() {
807                                        let _ = tx.send(Ok(()));
808                                    }
809                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
810                                    http_check_interval = None;
811                                    if !active_port_spawned && has_port_config {
812                                        active_port_spawned = true;
813                                        detect_and_store_active_port(id.clone(), daemon_pid);
814                                    }
815                                }
816                                Ok(response) => {
817                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
818                                }
819                                Err(e) => {
820                                    trace!("daemon {id} HTTP check failed: {e}");
821                                }
822                            }
823                        }
824                    }
825                    _ = async {
826                        if let Some(ref mut interval) = port_check_interval {
827                            interval.tick().await;
828                        } else {
829                            std::future::pending::<()>().await;
830                        }
831                    }, if !ready_notified && ready_port.is_some() => {
832                        if let Some(port) = ready_port {
833                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
834                                Ok(_) => {
835                                    info!("daemon {id} ready: TCP port {port} is listening");
836                                    ready_notified = true;
837                                    if let Some(tx) = ready_tx.take() {
838                                        let _ = tx.send(Ok(()));
839                                    }
840                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
841                                    // Stop checking once ready
842                                    port_check_interval = None;
843                                    if !active_port_spawned && has_port_config {
844                                        active_port_spawned = true;
845                                        detect_and_store_active_port(id.clone(), daemon_pid);
846                                    }
847                                }
848                                Err(_) => {
849                                    trace!("daemon {id} port check: port {port} not listening yet");
850                                }
851                            }
852                        }
853                    }
854                    _ = async {
855                        if let Some(ref mut interval) = cmd_check_interval {
856                            interval.tick().await;
857                        } else {
858                            std::future::pending::<()>().await;
859                        }
860                    }, if !ready_notified && ready_cmd.is_some() => {
861                        if let Some(ref cmd) = ready_cmd {
862                            // Run the readiness check command using the shell abstraction
863                            let mut command = Shell::default_for_platform().command(cmd);
864                            command
865                                .current_dir(&daemon_dir)
866                                .stdout(std::process::Stdio::null())
867                                .stderr(std::process::Stdio::null());
868                            let result: std::io::Result<std::process::ExitStatus> = command.status().await;
869                            match result {
870                                Ok(status) if status.success() => {
871                                    info!("daemon {id} ready: readiness command succeeded");
872                                    ready_notified = true;
873                                    if let Some(tx) = ready_tx.take() {
874                                        let _ = tx.send(Ok(()));
875                                    }
876                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
877                                    // Stop checking once ready
878                                    cmd_check_interval = None;
879                                    if !active_port_spawned && has_port_config {
880                                        active_port_spawned = true;
881                                        detect_and_store_active_port(id.clone(), daemon_pid);
882                                    }
883                                }
884                                Ok(_) => {
885                                    trace!("daemon {id} cmd check: command returned non-zero (not ready)");
886                                }
887                                Err(e) => {
888                                    trace!("daemon {id} cmd check failed: {e}");
889                                }
890                            }
891                        }
892                    }
893                    _ = async {
894                        if let Some(ref mut timer) = delay_timer {
895                            timer.await;
896                        } else {
897                            std::future::pending::<()>().await;
898                        }
899                    } => {
900                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
901                            info!("daemon {id} ready: delay elapsed");
902                            ready_notified = true;
903                            if let Some(tx) = ready_tx.take() {
904                                let _ = tx.send(Ok(()));
905                            }
906                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
907                        }
908                        // Disable timer after it fires
909                        delay_timer = None;
910                        if !active_port_spawned && has_port_config {
911                            active_port_spawned = true;
912                            detect_and_store_active_port(id.clone(), daemon_pid);
913                        }
914                    }
915                }
916            }
917
918            // Clear active_port since the process is no longer running
919            {
920                let mut state_file = SUPERVISOR.state_file.lock().await;
921                state_file.clear_active_port(&id);
922            }
923
924            // Get the final exit status
925            let exit_status = if let Some(status) = exit_status {
926                status
927            } else {
928                // Streams closed but process hasn't exited yet, wait for it
929                match exit_rx.recv().await {
930                    Some(status) => status,
931                    None => {
932                        warn!("daemon {id} exit channel closed without receiving status");
933                        Err(std::io::Error::other("exit channel closed"))
934                    }
935                }
936            };
937            let current_daemon = SUPERVISOR.get_daemon(&id).await;
938
939            // Signal that this monitoring task is processing its exit path.
940            // The RAII guard will decrement the counter and notify close()
941            // when the task finishes (including all fire_hook registrations),
942            // regardless of which return path is taken.
943            SUPERVISOR
944                .active_monitors
945                .fetch_add(1, atomic::Ordering::Release);
946            struct MonitorGuard;
947            impl Drop for MonitorGuard {
948                fn drop(&mut self) {
949                    SUPERVISOR
950                        .active_monitors
951                        .fetch_sub(1, atomic::Ordering::Release);
952                    SUPERVISOR.monitor_done.notify_waiters();
953                }
954            }
955            let _monitor_guard = MonitorGuard;
956            // Check if this monitoring task is for the current daemon process.
957            // Allow Stopped/Stopping daemons through: stop() clears pid atomically,
958            // so d.pid != Some(pid) would be true, but we still need the is_stopped()
959            // branch below to fire on_stop/on_exit hooks.
960            if current_daemon.is_none()
961                || current_daemon.as_ref().is_some_and(|d| {
962                    d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
963                })
964            {
965                // Another process has taken over, don't update status
966                return;
967            }
968            // Capture the intentional-stop flag BEFORE any state changes.
969            // stop() transitions Stopping → Stopped and clears pid. If stop() wins the race
970            // and sets Stopped before this task runs, we still need to fire on_stop/on_exit.
971            // Treat both Stopping and Stopped as "intentional stop by pitchfork".
972            let already_stopped = current_daemon
973                .as_ref()
974                .is_some_and(|d| d.status.is_stopped());
975            let is_stopping = already_stopped
976                || current_daemon
977                    .as_ref()
978                    .is_some_and(|d| d.status.is_stopping());
979
980            // --- Phase 1: Determine exit_code, exit_reason, and update daemon state ---
981            let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
982                (Ok(status), true) => {
983                    // Intentional stop (by pitchfork). status.code() returns None
984                    // on Unix when killed by signal (e.g. SIGTERM); use -1 to
985                    // distinguish from a clean exit code 0.
986                    (status.code().unwrap_or(-1), "stop")
987                }
988                (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
989                (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
990                (Err(_), true) => {
991                    // child.wait() error while stopping (e.g. sysinfo reaped the process)
992                    (-1, "stop")
993                }
994                (Err(_), false) => (-1, "fail"),
995            };
996
997            // Update daemon state unless stop() already did it (won the race).
998            if !already_stopped {
999                if let Ok(status) = &exit_status {
1000                    info!("daemon {id} exited with status {status}");
1001                }
1002                let (new_status, last_exit_success) = match exit_reason {
1003                    "stop" | "exit" => (
1004                        DaemonStatus::Stopped,
1005                        exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
1006                    ),
1007                    _ => (DaemonStatus::Errored(exit_code), false),
1008                };
1009                if let Err(e) = SUPERVISOR
1010                    .upsert_daemon(
1011                        UpsertDaemonOpts::builder(id.clone())
1012                            .set(|o| {
1013                                o.pid = None;
1014                                o.status = new_status;
1015                                o.last_exit_success = Some(last_exit_success);
1016                            })
1017                            .build(),
1018                    )
1019                    .await
1020                {
1021                    error!("Failed to update daemon state for {id}: {e}");
1022                }
1023            }
1024
1025            // --- Phase 2: Fire hooks ---
1026            let hook_extra_env = vec![
1027                ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
1028                ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
1029            ];
1030
1031            // Determine which hooks to fire based on exit reason
1032            let hooks_to_fire: Vec<HookType> = match exit_reason {
1033                "stop" => vec![HookType::OnStop, HookType::OnExit],
1034                "exit" => vec![HookType::OnExit],
1035                // "fail": fire on_fail + on_exit only when retries are exhausted
1036                _ if hook_retry_count >= hook_retry.count() => {
1037                    vec![HookType::OnFail, HookType::OnExit]
1038                }
1039                _ => vec![],
1040            };
1041
1042            for hook_type in hooks_to_fire {
1043                fire_hook(
1044                    hook_type,
1045                    id.clone(),
1046                    daemon_dir.clone(),
1047                    hook_retry_count,
1048                    hook_daemon_env.clone(),
1049                    hook_extra_env.clone(),
1050                )
1051                .await;
1052            }
1053        });
1054
1055        // If wait_ready is true, wait for readiness notification
1056        if let Some(ready_rx) = ready_rx {
1057            match ready_rx.await {
1058                Ok(Ok(())) => {
1059                    info!("daemon {id} is ready");
1060                    Ok(IpcResponse::DaemonReady { daemon })
1061                }
1062                Ok(Err(exit_code)) => {
1063                    error!("daemon {id} failed before becoming ready");
1064                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
1065                }
1066                Err(_) => {
1067                    error!("readiness channel closed unexpectedly for daemon {id}");
1068                    Ok(IpcResponse::DaemonStart { daemon })
1069                }
1070            }
1071        } else {
1072            Ok(IpcResponse::DaemonStart { daemon })
1073        }
1074    }
1075
1076    /// Stop a running daemon
1077    pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
1078        let pitchfork_id = DaemonId::pitchfork();
1079        if *id == pitchfork_id {
1080            return Ok(IpcResponse::Error(
1081                "Cannot stop supervisor via stop command".into(),
1082            ));
1083        }
1084        info!("stopping daemon: {id}");
1085        if let Some(daemon) = self.get_daemon(id).await {
1086            trace!("daemon to stop: {daemon}");
1087            if let Some(pid) = daemon.pid {
1088                trace!("killing pid: {pid}");
1089                PROCS.refresh_pids(&[pid]);
1090                if PROCS.is_running(pid) {
1091                    // First set status to Stopping (preserve PID for monitoring task)
1092                    self.upsert_daemon(
1093                        UpsertDaemonOpts::builder(id.clone())
1094                            .set(|o| {
1095                                o.pid = Some(pid);
1096                                o.status = DaemonStatus::Stopping;
1097                            })
1098                            .build(),
1099                    )
1100                    .await?;
1101
1102                    // Kill the entire process group atomically (daemon PID == PGID
1103                    // because we called setsid() at spawn time)
1104                    let stop_cfg = daemon.stop_signal.unwrap_or_default();
1105                    let stop_signal: i32 = stop_cfg.signal.into();
1106                    if let Err(e) = PROCS
1107                        .kill_process_group_async(pid, stop_signal, stop_cfg.timeout)
1108                        .await
1109                    {
1110                        debug!("failed to kill pid {pid}: {e}");
1111                        // Check if the process is actually stopped despite the error
1112                        PROCS.refresh_processes();
1113                        if PROCS.is_running(pid) {
1114                            // Process still running after kill attempt - set back to Running
1115                            debug!("failed to stop pid {pid}: process still running after kill");
1116                            self.upsert_daemon(
1117                                UpsertDaemonOpts::builder(id.clone())
1118                                    .set(|o| {
1119                                        o.pid = Some(pid); // Preserve PID to avoid orphaning the process
1120                                        o.status = DaemonStatus::Running;
1121                                    })
1122                                    .build(),
1123                            )
1124                            .await?;
1125                            return Ok(IpcResponse::DaemonStopFailed {
1126                                error: format!(
1127                                    "process {pid} still running after kill attempt: {e}"
1128                                ),
1129                            });
1130                        }
1131                    }
1132
1133                    // Process successfully stopped
1134                    // Note: kill_async uses SIGTERM -> wait ~3s -> SIGKILL strategy,
1135                    // and also detects zombie processes, so by the time it returns,
1136                    // the process should be fully terminated.
1137                    self.upsert_daemon(
1138                        UpsertDaemonOpts::builder(id.clone())
1139                            .set(|o| {
1140                                o.pid = None;
1141                                o.status = DaemonStatus::Stopped;
1142                                o.last_exit_success = Some(true); // Manual stop is considered successful
1143                            })
1144                            .build(),
1145                    )
1146                    .await?;
1147                } else {
1148                    debug!("pid {pid} not running, process may have exited unexpectedly");
1149                    // Process already dead, directly mark as stopped
1150                    // Note that the cleanup logic is handled in monitor task
1151                    self.upsert_daemon(
1152                        UpsertDaemonOpts::builder(id.clone())
1153                            .set(|o| {
1154                                o.pid = None;
1155                                o.status = DaemonStatus::Stopped;
1156                            })
1157                            .build(),
1158                    )
1159                    .await?;
1160                    return Ok(IpcResponse::DaemonWasNotRunning);
1161                }
1162                Ok(IpcResponse::Ok)
1163            } else {
1164                debug!("daemon {id} not running");
1165                Ok(IpcResponse::DaemonNotRunning)
1166            }
1167        } else {
1168            debug!("daemon {id} not found");
1169            Ok(IpcResponse::DaemonNotFound)
1170        }
1171    }
1172}
1173
1174#[cfg(unix)]
1175fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1176    let s = settings();
1177    let settings_user = s.supervisor.user.trim();
1178    let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1179    let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1180    let configured = daemon_user.or(settings_user);
1181    let current_uid = nix::unistd::Uid::effective().as_raw();
1182    let current_gid = nix::unistd::Gid::effective().as_raw();
1183    resolve_run_identity(
1184        configured,
1185        current_uid,
1186        current_gid,
1187        std::env::var("SUDO_UID").ok().as_deref(),
1188        std::env::var("SUDO_GID").ok().as_deref(),
1189    )
1190}
1191
1192#[cfg(unix)]
1193fn resolve_run_identity(
1194    configured: Option<&str>,
1195    current_uid: u32,
1196    current_gid: u32,
1197    sudo_uid: Option<&str>,
1198    sudo_gid: Option<&str>,
1199) -> Result<RunIdentity> {
1200    let current_uid = nix::unistd::Uid::from_raw(current_uid);
1201    let current_gid = nix::unistd::Gid::from_raw(current_gid);
1202    if let Some(user) = configured {
1203        let identity = resolve_configured_user(user)?;
1204        ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1205        if identity.matches(current_uid, current_gid) {
1206            return Ok(RunIdentity::Inherit);
1207        }
1208        return Ok(identity);
1209    }
1210
1211    if current_uid.is_root()
1212        && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1213    {
1214        return Ok(identity);
1215    }
1216
1217    Ok(RunIdentity::Inherit)
1218}
1219
1220#[cfg(unix)]
1221fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1222    if user.chars().all(|c| c.is_ascii_digit()) {
1223        let uid = user
1224            .parse::<u32>()
1225            .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1226        let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1227            .into_diagnostic()?
1228            .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1229        return run_identity_from_user_record(user_record);
1230    }
1231
1232    let user_record = nix::unistd::User::from_name(user)
1233        .into_diagnostic()?
1234        .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1235    run_identity_from_user_record(user_record)
1236}
1237
1238#[cfg(unix)]
1239fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1240    let username = CString::new(user.name)
1241        .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1242    Ok(RunIdentity::Switch {
1243        uid: user.uid,
1244        gid: user.gid,
1245        username: Some(username),
1246    })
1247}
1248
1249#[cfg(unix)]
1250fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1251    RunIdentity::Switch {
1252        uid: nix::unistd::Uid::from_raw(uid),
1253        gid: nix::unistd::Gid::from_raw(gid),
1254        username,
1255    }
1256}
1257
1258#[cfg(unix)]
1259fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1260    let uid = sudo_uid?.parse::<u32>().ok()?;
1261    let gid = sudo_gid?.parse::<u32>().ok()?;
1262    let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1263        .ok()
1264        .flatten()
1265        .and_then(|u| CString::new(u.name).ok());
1266    Some(run_identity_from_raw_ids(uid, gid, username))
1267}
1268
1269#[cfg(unix)]
1270fn ensure_can_use_identity(
1271    configured_user: &str,
1272    identity: &RunIdentity,
1273    current_uid: nix::unistd::Uid,
1274    current_gid: nix::unistd::Gid,
1275) -> Result<()> {
1276    let RunIdentity::Switch { uid, gid, .. } = identity else {
1277        return Ok(());
1278    };
1279    if *uid == current_uid && *gid == current_gid {
1280        return Ok(());
1281    }
1282    if current_uid.is_root() {
1283        return Ok(());
1284    }
1285    Err(miette::miette!(
1286        "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1287        configured_user,
1288        current_uid.as_raw(),
1289        current_gid.as_raw(),
1290        uid.as_raw(),
1291        gid.as_raw()
1292    ))
1293}
1294
1295#[cfg(unix)]
1296fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1297    let RunIdentity::Switch { uid, gid, username } = identity else {
1298        return Ok(());
1299    };
1300    if let Some(username) = username {
1301        initgroups_for_user(username, *gid)?;
1302    } else {
1303        setgroups_to_primary(*gid)?;
1304    }
1305    nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1306    nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1307    Ok(())
1308}
1309
1310#[cfg(unix)]
1311impl RunIdentity {
1312    fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1313        matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1314    }
1315}
1316
1317#[cfg(unix)]
1318fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1319    let groups = [gid.as_raw() as libc::gid_t];
1320    #[cfg(any(target_os = "linux", target_os = "android"))]
1321    let group_count = groups.len();
1322    #[cfg(not(any(target_os = "linux", target_os = "android")))]
1323    let group_count = groups.len() as libc::c_int;
1324    let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1325    if rc == -1 {
1326        Err(std::io::Error::last_os_error())
1327    } else {
1328        Ok(())
1329    }
1330}
1331
1332#[cfg(unix)]
1333fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1334    let gid = gid.as_raw();
1335    #[cfg(any(
1336        target_os = "macos",
1337        target_os = "ios",
1338        target_os = "tvos",
1339        target_os = "watchos"
1340    ))]
1341    let base_gid = i32::try_from(gid)
1342        .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1343
1344    #[cfg(not(any(
1345        target_os = "macos",
1346        target_os = "ios",
1347        target_os = "tvos",
1348        target_os = "watchos"
1349    )))]
1350    let base_gid = gid as libc::gid_t;
1351
1352    // SAFETY: `username` is a valid nul-terminated C string and `base_gid`
1353    // is derived from a resolved system account or sudo-provided gid.
1354    let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1355    if rc == -1 {
1356        Err(std::io::Error::last_os_error())
1357    } else {
1358        Ok(())
1359    }
1360}
1361
1362#[cfg(unix)]
1363fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1364    std::io::Error::from_raw_os_error(err as i32)
1365}
1366
1367/// Check if multiple ports are available and optionally auto-bump to find available ports.
1368///
1369/// All ports are bumped by the same offset to maintain relative port spacing.
1370/// Returns the resolved ports (either the original or bumped ones).
1371/// Returns an error if any port is in use and auto_bump is disabled,
1372/// or if no available ports can be found after max attempts.
1373async fn check_ports_available(
1374    expected_ports: &[u16],
1375    auto_bump: bool,
1376    max_attempts: u32,
1377) -> Result<Vec<u16>> {
1378    if expected_ports.is_empty() {
1379        return Ok(Vec::new());
1380    }
1381
1382    for bump_offset in 0..=max_attempts {
1383        // Use wrapping_add to handle overflow correctly - ports wrap around at 65535
1384        let candidate_ports: Vec<u16> = expected_ports
1385            .iter()
1386            .map(|&p| p.wrapping_add(bump_offset as u16))
1387            .collect();
1388
1389        // Check if all ports in this set are available
1390        let mut all_available = true;
1391        let mut conflicting_port = None;
1392
1393        for &port in &candidate_ports {
1394            // Port 0 is a special case - it requests an ephemeral port from the OS.
1395            // Skip the availability check for port 0 since binding to it always succeeds.
1396            if port == 0 {
1397                continue;
1398            }
1399
1400            // Use spawn_blocking to avoid blocking the async runtime during TCP bind checks.
1401            //
1402            // We check multiple addresses to avoid false-negatives caused by SO_REUSEADDR.
1403            // On macOS/BSD, Rust's TcpListener::bind sets SO_REUSEADDR by default, which
1404            // allows binding 0.0.0.0:port even when 127.0.0.1:port is already in use
1405            // (because 0.0.0.0 is technically a different address).  Most daemons bind
1406            // to localhost, so checking 127.0.0.1 is essential to detect real conflicts.
1407            // We also check [::1] to cover IPv6 loopback listeners.
1408            //
1409            // NOTE: This check has a time-of-check-to-time-of-use (TOCTOU) race condition.
1410            // Another process could grab the port between our check and the daemon actually
1411            // binding. This is inherent to the approach and acceptable for our use case
1412            // since we're primarily detecting conflicts with already-running daemons.
1413            if is_port_in_use(port).await {
1414                all_available = false;
1415                conflicting_port = Some(port);
1416                break;
1417            }
1418        }
1419
1420        if all_available {
1421            // Check for overflow (port wrapped around to 0 due to wrapping_add)
1422            // If any candidate port is 0 but the original expected port wasn't 0,
1423            // it means we've wrapped around and should stop
1424            if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1425                return Err(PortError::NoAvailablePort {
1426                    start_port: expected_ports[0],
1427                    attempts: bump_offset + 1,
1428                }
1429                .into());
1430            }
1431            if bump_offset > 0 {
1432                info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1433            }
1434            return Ok(candidate_ports);
1435        }
1436
1437        // Port is in use
1438        if bump_offset == 0 && !auto_bump {
1439            if let Some(port) = conflicting_port {
1440                let (pid, process) = identify_port_owner(port).await;
1441                return Err(PortError::InUse { port, process, pid }.into());
1442            }
1443        }
1444    }
1445
1446    // No available ports found after max attempts
1447    Err(PortError::NoAvailablePort {
1448        start_port: expected_ports[0],
1449        attempts: max_attempts + 1,
1450    }
1451    .into())
1452}
1453
1454/// Check whether a port is currently in use by attempting to bind on multiple addresses.
1455///
1456/// Returns `true` when at least one bind attempt gets `AddrInUse`, meaning another
1457/// process is listening.  Other errors (e.g. `AddrNotAvailable` on an address family
1458/// the OS doesn't support) are ignored so they don't produce false positives.
1459async fn is_port_in_use(port: u16) -> bool {
1460    tokio::task::spawn_blocking(move || {
1461        for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1462            match std::net::TcpListener::bind((addr, port)) {
1463                Ok(listener) => drop(listener),
1464                Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1465                Err(_) => continue,
1466            }
1467        }
1468        false
1469    })
1470    .await
1471    .unwrap_or(false)
1472}
1473
1474/// Best-effort lookup of the process occupying a port via `listeners::get_all()`.
1475///
1476/// Returns `(pid, process_name)`.  Falls back to `(0, "unknown")` when the
1477/// system call fails (permission error, unsupported OS, etc.).
1478async fn identify_port_owner(port: u16) -> (u32, String) {
1479    tokio::task::spawn_blocking(move || {
1480        listeners::get_all()
1481            .ok()
1482            .and_then(|list| {
1483                list.into_iter()
1484                    .find(|l| l.socket.port() == port)
1485                    .map(|l| (l.process.pid, l.process.name))
1486            })
1487            .unwrap_or((0, "unknown".to_string()))
1488    })
1489    .await
1490    .unwrap_or((0, "unknown".to_string()))
1491}
1492
1493/// Detect whether a port is in use, and if so, identify the owning process.
1494///
1495/// Combines `is_port_in_use` (reliable bind probe) with `identify_port_owner`
1496/// (best-effort process lookup).  Returns `None` when the port is free.
1497async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1498    if !is_port_in_use(port).await {
1499        return None;
1500    }
1501    Some(identify_port_owner(port).await)
1502}
1503
1504/// Spawn a background task that detects the first port the daemon process is listening on
1505/// and stores it in the state file as `active_port`.
1506///
1507/// This is called once when the daemon becomes ready. The port is cleared when the daemon stops.
1508///
1509/// Port selection strategy:
1510/// 1. If the daemon has `expected_port` configured, prefer the first port from that list
1511///    (it is the port the operator explicitly designated as the primary service port).
1512/// 2. Otherwise, take the first port the process is actually listening on (in the order
1513///    returned by the OS), which is typically the port bound earliest.
1514///
1515/// Using `min()` (lowest port number) was previously used here but is incorrect: many
1516/// applications listen on multiple ports (e.g. HTTP + metrics) and the lowest-numbered
1517/// port is not necessarily the primary service port.
1518fn detect_and_store_active_port(id: DaemonId, pid: u32) {
1519    tokio::spawn(async move {
1520        // Retry with exponential backoff so that slow-starting daemons (JVM,
1521        // Node.js, Python, etc.) that take more than 500 ms to bind their port
1522        // are still detected.  Total wait budget: 500+1000+2000+4000 = 7.5 s.
1523        for delay_ms in [500u64, 1000, 2000, 4000] {
1524            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1525
1526            // Read daemon state atomically: check if still alive and get expected_port
1527            // in a single lock acquisition to avoid TOCTOU and unnecessary lock overhead.
1528            let expected_port: Option<u16> = {
1529                let state_file = SUPERVISOR.state_file.lock().await;
1530                match state_file.daemons.get(&id) {
1531                    Some(d) if d.pid.is_none() => {
1532                        debug!("daemon {id}: aborting active_port detection — process exited");
1533                        return;
1534                    }
1535                    Some(d) => d
1536                        .port
1537                        .as_ref()
1538                        .and_then(|p| p.expect.first().copied())
1539                        .filter(|&p| p > 0),
1540                    None => None,
1541                }
1542            };
1543
1544            let active_port = tokio::task::spawn_blocking(move || {
1545                let listeners = listeners::get_all().ok()?;
1546
1547                // Refresh process tree so all_children sees current descendants.
1548                PROCS.refresh_processes();
1549
1550                let descendant_pids: std::collections::HashSet<u32> = PROCS
1551                    .all_children(pid)
1552                    .into_iter()
1553                    .chain(std::iter::once(pid))
1554                    .collect();
1555
1556                let process_ports: Vec<u16> = listeners
1557                    .into_iter()
1558                    .filter(|listener| descendant_pids.contains(&listener.process.pid))
1559                    .map(|listener| listener.socket.port())
1560                    .filter(|&port| port > 0)
1561                    .collect();
1562
1563                if process_ports.is_empty() {
1564                    return None;
1565                }
1566
1567                // Prefer the configured expected_port if the process is actually
1568                // listening on it; otherwise fall back to the first port found.
1569                if let Some(ep) = expected_port {
1570                    if process_ports.contains(&ep) {
1571                        return Some(ep);
1572                    }
1573                }
1574
1575                // No expected_port match — return the first port in the list.
1576                // The list order reflects the order the OS reports listeners,
1577                // which is generally the order they were bound (earliest first).
1578                // Do NOT sort: the lowest-numbered port is not necessarily the
1579                // primary service port (e.g. HTTP vs metrics).
1580                process_ports.into_iter().next()
1581            })
1582            .await
1583            .ok()
1584            .flatten();
1585
1586            if let Some(port) = active_port {
1587                debug!("daemon {id} active_port detected: {port}");
1588                let mut state_file = SUPERVISOR.state_file.lock().await;
1589                if let Some(d) = state_file.daemons.get(&id) {
1590                    // Guard against PID reuse: if the original process exited and the OS
1591                    // assigned the same PID to an unrelated process that happens to bind
1592                    // a port, we must not route proxy traffic to that unrelated service.
1593                    if d.pid == Some(pid) {
1594                        state_file.set_active_port(&id, port);
1595                    } else {
1596                        debug!(
1597                            "daemon {id}: skipping active_port write — PID mismatch \
1598                             (expected {pid}, current {:?})",
1599                            d.pid
1600                        );
1601                        return;
1602                    }
1603                }
1604                return;
1605            }
1606
1607            debug!(
1608                "daemon {id}: no active port detected for pid {pid} or its descendants (will retry)"
1609            );
1610        }
1611
1612        debug!(
1613            "daemon {id}: active port detection exhausted all retries for pid {pid} and its descendants"
1614        );
1615    });
1616}
1617
1618/// Check whether a daemon (by its qualified ID) is the target of any registered
1619/// slug in the global config.  This is used to decide whether to run the
1620/// `detect_and_store_active_port` polling task — only slug-targeted daemons need
1621/// it, avoiding wasted `listeners::get_all()` calls for port-less daemons.
1622///
1623/// Delegates to `proxy::server::is_slug_target()` which uses the same in-memory
1624/// slug cache as the proxy hot path, so this check is cheap.
1625fn is_daemon_slug_target(id: &DaemonId) -> bool {
1626    // read_global_slugs is called once per daemon start — acceptable cost.
1627    // We intentionally avoid making this async to keep has_port_config evaluation
1628    // simple and synchronous in run_once().
1629    let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1630    slugs.iter().any(|(slug, entry)| {
1631        let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1632        id.name() == daemon_name
1633    })
1634}
1635
#[cfg(all(test, unix))]
mod tests {
    use super::*;

    /// Destructure a `RunIdentity::Switch`, failing the test on `Inherit`.
    fn unwrap_switch(
        identity: RunIdentity,
    ) -> (nix::unistd::Uid, nix::unistd::Gid, Option<CString>) {
        match identity {
            RunIdentity::Switch { uid, gid, username } => (uid, gid, username),
            RunIdentity::Inherit => panic!("expected identity switch"),
        }
    }

    /// View the switch target's username as UTF-8, if present and valid.
    fn username_str(username: &Option<CString>) -> Option<&str> {
        username.as_deref().and_then(|name| name.to_str().ok())
    }

    #[test]
    fn test_resolve_run_identity_empty_without_sudo() {
        // No configured user, no sudo ids, not root: keep the current identity.
        let resolved = resolve_run_identity(None, 501, 20, None, None).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    #[test]
    fn test_resolve_run_identity_sudo_fallback() {
        // Running as root with sudo-provided ids switches to those ids.
        let resolved = resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap();
        let (uid, gid, _) = unwrap_switch(resolved);
        assert_eq!(uid.as_raw(), 501);
        assert_eq!(gid.as_raw(), 20);
    }

    #[test]
    fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
        // Sudo ids are ignored when the supervisor is not root.
        let resolved = resolve_run_identity(None, 501, 20, Some("0"), Some("0")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    #[test]
    fn test_resolve_configured_user_root_name() {
        let (uid, _, username) = unwrap_switch(resolve_configured_user("root").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(username_str(&username), Some("root"));
    }

    #[test]
    fn test_resolve_configured_user_root_uid() {
        // A numeric user spec resolves to the account's name as well.
        let (uid, _, username) = unwrap_switch(resolve_configured_user("0").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(username_str(&username), Some("root"));
    }

    #[test]
    fn test_resolve_configured_user_missing_user_fails() {
        let message = resolve_configured_user("pitchfork-user-that-should-not-exist")
            .unwrap_err()
            .to_string();
        assert!(message.contains("does not exist"));
    }

    #[test]
    fn test_resolve_run_identity_requires_root_for_user_switch() {
        // Switching to another user as a non-root supervisor must be refused.
        let message = resolve_run_identity(Some("root"), 501, 20, None, None)
            .unwrap_err()
            .to_string();
        assert!(message.contains("Restart the supervisor with sudo"));
    }

    #[test]
    fn test_resolve_run_identity_same_user_is_noop() {
        // Configured as root while already root: no switch, even with sudo ids present.
        let resolved =
            resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }
}
1710
1711/// Inject proxy-related environment variables into a daemon's command.
1712///
1713/// Adds:
1714/// - `HOST` — the address the daemon should bind to (`127.0.0.1`, omitted in LAN mode)
1715/// - `PITCHFORK_URL` — the public proxy URL for this daemon (if it has a slug)
1716/// - `NODE_EXTRA_CA_CERTS` — path to the pitchfork CA cert (if HTTPS enabled)
1717/// - `__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS` — `.<tld>` for Vite host allowlisting
1718/// - `PITCHFORK_LAN` — set to `"1"` when LAN mode is active
1719fn inject_proxy_env(cmd: &mut tokio::process::Command, slug: &Option<String>) {
1720    let s = crate::settings::settings();
1721    let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1722
1723    if should_force_loopback_host(slug) && !lan_enabled {
1724        // Only force loopback binding for daemons that are actually routed via a slug.
1725        // In LAN mode, daemons need to bind to 0.0.0.0 to be reachable from the network.
1726        cmd.env("HOST", "127.0.0.1");
1727    }
1728
1729    // PITCHFORK_URL: the daemon's public proxy URL (only if it has a slug and proxy is enabled)
1730    if let Some(url) = build_pitchfork_url(slug, &s) {
1731        cmd.env("PITCHFORK_URL", &url);
1732    }
1733
1734    // NODE_EXTRA_CA_CERTS: let Node.js backends trust the pitchfork CA
1735    if s.proxy.enable && s.proxy.https {
1736        let ca_path = if s.proxy.tls_cert.is_empty() {
1737            crate::env::PITCHFORK_STATE_DIR.join("proxy").join("ca.pem")
1738        } else {
1739            std::path::PathBuf::from(&s.proxy.tls_cert)
1740        };
1741        if ca_path.exists() {
1742            cmd.env("NODE_EXTRA_CA_CERTS", ca_path.to_string_lossy().to_string());
1743        }
1744    }
1745
1746    // __VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS: Vite host allowlisting
1747    if s.proxy.enable {
1748        let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1749        cmd.env("__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS", format!(".{tld}"));
1750    }
1751
1752    // PITCHFORK_LAN: signal to daemons that LAN mode is active
1753    if lan_enabled {
1754        cmd.env("PITCHFORK_LAN", "1");
1755    }
1756}
1757
1758fn should_force_loopback_host(slug: &Option<String>) -> bool {
1759    let Some(slug) = slug.as_deref() else {
1760        return false;
1761    };
1762
1763    let s = crate::settings::settings();
1764    if !s.proxy.enable {
1765        return false;
1766    }
1767
1768    let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1769    slugs.contains_key(slug)
1770}
1771
1772/// Compute the public proxy URL for a daemon.
1773///
1774/// Returns `None` if the daemon has no slug or the proxy is not enabled.
1775fn build_pitchfork_url(slug: &Option<String>, s: &crate::settings::Settings) -> Option<String> {
1776    let slug = slug.as_ref()?;
1777    if !s.proxy.enable {
1778        return None;
1779    }
1780    let scheme = if s.proxy.https { "https" } else { "http" };
1781    let port = u16::try_from(s.proxy.port).ok().filter(|&p| p > 0)?;
1782    let port_suffix = if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
1783        String::new()
1784    } else {
1785        format!(":{port}")
1786    };
1787    let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1788    let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1789    Some(format!("{scheme}://{slug}.{tld}{port_suffix}",))
1790}