Skip to main content

pitchfork_cli/supervisor/
lifecycle.rs

1//! Daemon lifecycle management - start/stop operations
2//!
3//! Contains the core `run()`, `run_once()`, and `stop()` methods for daemon process management.
4
5use super::hooks::{self, HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::log_store::LogStore;
13use crate::log_store::sqlite::LOG_STORE;
14use crate::procs::PROCS;
15use crate::settings::settings;
16use crate::shell::Shell;
17use crate::supervisor::state::UpsertDaemonOpts;
18use crate::{Result, env};
19use itertools::Itertools;
20use miette::IntoDiagnostic;
21use once_cell::sync::Lazy;
22use regex::Regex;
23use std::collections::HashMap;
24#[cfg(unix)]
25use std::ffi::CString;
26use std::iter::once;
27use std::sync::atomic;
28use std::time::Duration;
29use tokio::io::AsyncBufReadExt;
30use tokio::select;
31use tokio::sync::oneshot;
32use tokio::time;
33
34/// Cache for compiled regex patterns to avoid recompilation on daemon restarts
35static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
36    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
37
/// The OS identity a daemon's child process should run under (Unix only).
///
/// NOTE(review): this appears to be the value `run_once` obtains from
/// `resolve_effective_run_identity(opts.user)` and later hands to
/// `apply_run_identity` inside the child's `pre_exec` hook — confirm against
/// those helpers, which live outside this section.
#[cfg(unix)]
#[derive(Clone, Debug, PartialEq, Eq)]
enum RunIdentity {
    /// No identity change requested; presumably the child keeps the
    /// supervisor's own uid/gid — TODO confirm in `apply_run_identity`.
    Inherit,
    /// Run the child under an explicit uid/gid.
    Switch {
        /// Target user id.
        uid: nix::unistd::Uid,
        /// Target group id.
        gid: nix::unistd::Gid,
        /// Account name as a C string, when one is available. Presumably needed
        /// for name-based calls such as supplementary-group setup — TODO confirm.
        username: Option<CString>,
    },
}
48
49/// Get or compile a regex pattern, caching the result for future use
50pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
51    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
52    if let Some(re) = cache.get(pattern) {
53        return Some(re.clone());
54    }
55    match Regex::new(pattern) {
56        Ok(re) => {
57            cache.insert(pattern.to_string(), re.clone());
58            Some(re)
59        }
60        Err(e) => {
61            error!("invalid regex pattern '{pattern}': {e}");
62            None
63        }
64    }
65}
66
67impl Supervisor {
68    /// Run a daemon, handling retries if configured
69    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
70        let id = &opts.id;
71        let cmd = opts.cmd.clone();
72
73        // Clear any pending autostop for this daemon since it's being started
74        {
75            let mut pending = self.pending_autostops.lock().await;
76            if pending.remove(id).is_some() {
77                info!("cleared pending autostop for {id} (daemon starting)");
78            }
79        }
80
81        let daemon = self.get_daemon(id).await;
82        if let Some(daemon) = daemon {
83            // Stopping state is treated as "not running" - the monitoring task will clean it up
84            // Only check for Running state with a valid PID
85            if !daemon.status.is_stopping()
86                && !daemon.status.is_stopped()
87                && let Some(pid) = daemon.pid
88            {
89                if opts.force {
90                    self.stop(id).await?;
91                    info!("run: stop completed for daemon {id}");
92                } else {
93                    warn!("daemon {id} already running with pid {pid}");
94                    return Ok(IpcResponse::DaemonAlreadyRunning);
95                }
96            }
97        }
98
99        // If wait_ready is true and retry is configured, implement retry loop
100        if opts.wait_ready && opts.retry.count() > 0 {
101            // Use saturating_add to avoid overflow when retry = u32::MAX (infinite)
102            let max_attempts = opts.retry.count().saturating_add(1);
103            for attempt in 0..max_attempts {
104                let mut retry_opts = opts.clone();
105                retry_opts.retry_count = attempt;
106                retry_opts.cmd = cmd.clone();
107
108                let result = self.run_once(retry_opts).await?;
109
110                match result {
111                    IpcResponse::DaemonReady { daemon } => {
112                        return Ok(IpcResponse::DaemonReady { daemon });
113                    }
114                    IpcResponse::DaemonFailedWithCode { exit_code } => {
115                        if attempt < opts.retry.count() {
116                            let backoff_secs = 2u64.saturating_pow(attempt).min(3600);
117                            info!(
118                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
119                                attempt + 1,
120                                max_attempts,
121                                backoff_secs
122                            );
123                            fire_hook(
124                                HookType::OnRetry,
125                                id.clone(),
126                                opts.dir.0.clone(),
127                                attempt + 1,
128                                opts.env.clone(),
129                                vec![],
130                            )
131                            .await;
132                            time::sleep(Duration::from_secs(backoff_secs)).await;
133                            continue;
134                        } else {
135                            info!("daemon {id} failed after {max_attempts} attempts");
136                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
137                        }
138                    }
139                    other => return Ok(other),
140                }
141            }
142        }
143
144        // No retry or wait_ready is false
145        self.run_once(opts).await
146    }
147
148    /// Run a daemon once (single attempt)
149    pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
150        let id = &opts.id;
151        let original_cmd = opts.cmd.clone(); // Save original command for persistence
152        let cmd = opts.cmd;
153
154        // Create channel for readiness notification if wait_ready is true
155        let (ready_tx, ready_rx) = if opts.wait_ready {
156            let (tx, rx) = oneshot::channel();
157            (Some(tx), Some(rx))
158        } else {
159            (None, None)
160        };
161
162        // Check port availability and apply auto-bump if configured
163        let expected_ports = opts
164            .port
165            .as_ref()
166            .map(|p| p.expect.clone())
167            .unwrap_or_default();
168        let (resolved_ports, effective_ready_port) = if !expected_ports.is_empty() {
169            let port_cfg = opts.port.as_ref().unwrap();
170            match check_ports_available(
171                &expected_ports,
172                port_cfg.auto_bump(),
173                port_cfg.max_bump_attempts(),
174            )
175            .await
176            {
177                Ok(resolved) => {
178                    let ready_port = if let Some(configured_port) = opts.ready_port {
179                        // If ready_port matches one of the expected ports, apply the same bump offset
180                        let bump_offset = resolved
181                            .first()
182                            .unwrap_or(&0)
183                            .saturating_sub(*expected_ports.first().unwrap_or(&0));
184                        if expected_ports.contains(&configured_port) && bump_offset > 0 {
185                            configured_port
186                                .checked_add(bump_offset)
187                                .or(Some(configured_port))
188                        } else {
189                            Some(configured_port)
190                        }
191                    } else if opts.ready_output.is_none()
192                        && opts.ready_http.is_none()
193                        && opts.ready_cmd.is_none()
194                        && opts.ready_delay.is_none()
195                    {
196                        // No other ready check configured — use the first expected port as a
197                        // TCP port readiness check so the daemon is considered ready once it
198                        // starts listening.  Skip port 0 (ephemeral port request).
199                        resolved.first().copied().filter(|&p| p != 0)
200                    } else {
201                        // Another ready check is configured (output/http/cmd/delay).
202                        // Don't add an implicit TCP port check — it could race and fire
203                        // before the daemon has produced any output.
204                        None
205                    };
206                    info!("daemon {id}: ports {expected_ports:?} resolved to {resolved:?}");
207                    (resolved, ready_port)
208                }
209                Err(e) => {
210                    error!("daemon {id}: port check failed: {e}");
211                    // Convert PortError to structured IPC response
212                    if let Some(port_error) = e.downcast_ref::<PortError>() {
213                        match port_error {
214                            PortError::InUse { port, process, pid } => {
215                                return Ok(IpcResponse::PortConflict {
216                                    port: *port,
217                                    process: process.clone(),
218                                    pid: *pid,
219                                });
220                            }
221                            PortError::NoAvailablePort {
222                                start_port,
223                                attempts,
224                            } => {
225                                return Ok(IpcResponse::NoAvailablePort {
226                                    start_port: *start_port,
227                                    attempts: *attempts,
228                                });
229                            }
230                        }
231                    }
232                    return Ok(IpcResponse::DaemonFailed {
233                        error: e.to_string(),
234                    });
235                }
236            }
237        } else {
238            // When ready_port is set without expected_port, check that the port
239            // is not already occupied.  If another process is listening on it,
240            // the TCP readiness probe would immediately succeed and pitchfork
241            // would falsely consider the daemon ready — routing proxy traffic to
242            // the wrong process.
243            if let Some(port) = opts.ready_port {
244                if port > 0 {
245                    if let Some((pid, process)) = detect_port_conflict(port).await {
246                        return Ok(IpcResponse::PortConflict { port, process, pid });
247                    }
248                }
249            }
250            (Vec::new(), opts.ready_port)
251        };
252
253        let cmd: Vec<String> = if opts.mise.unwrap_or(settings().general.mise) {
254            match settings().resolve_mise_bin() {
255                Some(mise_bin) => {
256                    let mise_bin_str = mise_bin.to_string_lossy().to_string();
257                    info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
258                    once("exec".to_string())
259                        .chain(once(mise_bin_str))
260                        .chain(once("x".to_string()))
261                        .chain(once("--".to_string()))
262                        .chain(cmd)
263                        .collect_vec()
264                }
265                None => {
266                    warn!("daemon {id}: mise=true but mise binary not found, running without mise");
267                    once("exec".to_string()).chain(cmd).collect_vec()
268                }
269            }
270        } else {
271            once("exec".to_string()).chain(cmd).collect_vec()
272        };
273        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
274        #[cfg(unix)]
275        let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
276            Ok(identity) => identity,
277            Err(e) => {
278                return Ok(IpcResponse::DaemonFailed {
279                    error: e.to_string(),
280                });
281            }
282        };
283        info!("run: spawning daemon {id} with args: {args:?}");
284
285        // Allocate PTY if configured
286        #[cfg(unix)]
287        let pty_pair = if opts.pty.unwrap_or(false) {
288            match super::pty::openpty() {
289                Ok(pair) => {
290                    info!("daemon {id}: allocated PTY (pty = true)");
291                    Some(pair)
292                }
293                Err(e) => {
294                    warn!("daemon {id}: failed to allocate PTY, falling back to pipes: {e}");
295                    None
296                }
297            }
298        } else {
299            None
300        };
301
302        let mut cmd = tokio::process::Command::new("sh");
303
304        #[cfg(unix)]
305        if let Some(ref pair) = pty_pair {
306            // PTY mode: connect both stdout and stderr to the slave PTY.
307            // The child uses the slave for stdin/stdout/stderr, and we read
308            // output from the master.
309            let slave_file = std::fs::File::from(
310                pair.slave
311                    .try_clone()
312                    .map_err(|e| miette::miette!("failed to dup slave PTY fd: {e}"))?,
313            );
314            cmd.stdin(std::process::Stdio::from(slave_file.try_clone().map_err(
315                |e| miette::miette!("failed to clone slave PTY fd for stdin: {e}"),
316            )?));
317            cmd.stdout(std::process::Stdio::from(slave_file.try_clone().map_err(
318                |e| miette::miette!("failed to clone slave PTY fd for stdout: {e}"),
319            )?));
320            cmd.stderr(std::process::Stdio::from(slave_file));
321        } else {
322            cmd.stdout(std::process::Stdio::piped())
323                .stderr(std::process::Stdio::piped());
324        }
325
326        #[cfg(not(unix))]
327        {
328            cmd.stdout(std::process::Stdio::piped())
329                .stderr(std::process::Stdio::piped());
330        }
331
332        cmd.args(&args).current_dir(&opts.dir);
333
334        #[cfg(unix)]
335        if pty_pair.is_none() {
336            cmd.stdin(std::process::Stdio::null());
337        }
338
339        #[cfg(not(unix))]
340        cmd.stdin(std::process::Stdio::null());
341
342        // Ensure daemon can find user tools by using the original PATH
343        if let Some(ref path) = *env::ORIGINAL_PATH {
344            cmd.env("PATH", path);
345        }
346
347        // Apply custom environment variables from config
348        if let Some(ref env_vars) = opts.env {
349            cmd.envs(env_vars);
350        }
351
352        // Inject pitchfork metadata env vars AFTER user env so they can't be overwritten
353        cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
354        cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
355        cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
356
357        // Inject the resolved ports for the daemon to use
358        if !resolved_ports.is_empty() {
359            // Set PORT to the first port for backward compatibility
360            // When there's only one port, both PORT and PORT0 will be set to the same value.
361            // This follows the convention used by many deployment platforms (Heroku, etc.).
362            cmd.env("PORT", resolved_ports[0].to_string());
363            // Set individual ports as PORT0, PORT1, etc.
364            for (i, port) in resolved_ports.iter().enumerate() {
365                cmd.env(format!("PORT{i}"), port.to_string());
366            }
367        }
368
369        // Inject proxy-related environment variables
370        inject_proxy_env(&mut cmd, &opts.slug);
371
372        #[cfg(unix)]
373        {
374            let run_identity = run_identity.clone();
375            let use_pty = pty_pair.is_some();
376            unsafe {
377                cmd.pre_exec(move || {
378                    nix::unistd::setsid().map_err(nix_to_io_error)?;
379
380                    // When using a PTY, set the slave as the controlling terminal.
381                    // The slave FD has already been dup'd onto stdin/stdout/stderr
382                    // by tokio, so we can use stdin (fd 0) for TIOCSCTTY.
383                    if use_pty {
384                        let ret = libc::ioctl(0, libc::TIOCSCTTY as libc::c_ulong, 0);
385                        if ret < 0 {
386                            // Non-fatal: the process can still run without
387                            // a controlling terminal.
388                            #[cfg(target_os = "linux")]
389                            eprintln!(
390                                "pitchfork: TIOCSCTTY failed: {}",
391                                std::io::Error::last_os_error()
392                            );
393                        }
394                    }
395
396                    apply_run_identity(&run_identity)?;
397                    Ok(())
398                });
399            }
400        }
401
402        let mut child = cmd.spawn().into_diagnostic()?;
403        let pid = match child.id() {
404            Some(p) => p,
405            None => {
406                warn!("Daemon {id} exited before PID could be captured");
407                return Ok(IpcResponse::DaemonFailed {
408                    error: "Process exited immediately".to_string(),
409                });
410            }
411        };
412        info!("started daemon {id} with pid {pid}");
413        PROCS.refresh_pids(&[pid]);
414        let daemon = self
415            .upsert_daemon(
416                UpsertDaemonOpts::builder(id.clone())
417                    .set(|o| {
418                        o.pid = Some(pid);
419                        o.status = DaemonStatus::Running;
420                        o.shell_pid = opts.shell_pid;
421                        o.dir = Some(opts.dir.0.clone());
422                        o.cmd = Some(original_cmd);
423                        o.autostop = opts.autostop;
424                        o.cron_schedule = opts.cron_schedule.clone();
425                        o.cron_retrigger = opts.cron_retrigger;
426                        o.cron_immediate = opts.cron_immediate;
427                        o.retry = Some(opts.retry);
428                        o.retry_count = Some(opts.retry_count);
429                        o.ready_delay = opts.ready_delay;
430                        o.ready_output = opts.ready_output.clone();
431                        o.ready_http = opts.ready_http.clone();
432                        o.ready_port = effective_ready_port;
433                        o.ready_cmd = opts.ready_cmd.clone();
434                        o.port = crate::config_types::PortConfig::from_parts(
435                            expected_ports,
436                            opts.port.as_ref().map(|p| p.bump).unwrap_or_default(),
437                        );
438                        o.resolved_port = resolved_ports;
439                        o.depends = Some(opts.depends.clone());
440                        o.env = opts.env.clone();
441                        o.watch = Some(opts.watch.clone());
442                        o.watch_mode = Some(opts.watch_mode);
443                        o.watch_base_dir = opts.watch_base_dir.clone();
444                        o.mise = opts.mise;
445                        o.user = opts.user.clone();
446                        o.memory_limit = opts.memory_limit;
447                        o.cpu_limit = opts.cpu_limit;
448                        o.stop_signal = opts.stop_signal;
449                        o.pty = opts.pty;
450                    })
451                    .build(),
452            )
453            .await?;
454
455        let id_clone = id.clone();
456        let ready_delay = opts.ready_delay;
457        let ready_output = opts.ready_output.clone();
458        let ready_http = opts.ready_http.clone();
459        let ready_port = effective_ready_port;
460        let ready_cmd = opts.ready_cmd.clone();
461        let daemon_dir = opts.dir.0.clone();
462        let hook_retry_count = opts.retry_count;
463        let hook_retry = opts.retry;
464        let hook_daemon_env = opts.env.clone();
465        let on_output_hook = opts.on_output_hook.clone();
466        // Whether this daemon has any port-related config — used to skip the
467        // active_port detection task for daemons that never bind a port (e.g. `sleep 60`).
468        // When the proxy is enabled, only detect active_port for daemons that are
469        // actually referenced by a registered slug, rather than blanket-polling every
470        // daemon (which wastes ~7.5 s of listeners::get_all() calls per port-less daemon).
471        let has_port_config = opts.port.as_ref().is_some_and(|p| !p.expect.is_empty())
472            || (settings().proxy.enable && is_daemon_slug_target(id));
473        let daemon_pid = pid;
474
475        // Prepare output readers before spawning the monitoring task.
476        // In PTY mode, we read from the PTY master FD.
477        // In pipe mode, we read from separate stdout/stderr pipes.
478        #[cfg(unix)]
479        let pty_reader = pty_pair.map(|p| {
480            tokio::io::BufReader::new(tokio::fs::File::from_std(std::fs::File::from(p.master)))
481                .lines()
482        });
483        #[cfg(not(unix))]
484        let pty_reader: Option<tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>> = None;
485        let stdout_reader = if pty_reader.is_none() {
486            child
487                .stdout
488                .take()
489                .map(|s| tokio::io::BufReader::new(s).lines())
490        } else {
491            None
492        };
493        let stderr_reader = if pty_reader.is_none() {
494            child
495                .stderr
496                .take()
497                .map(|s| tokio::io::BufReader::new(s).lines())
498        } else {
499            None
500        };
501
502        if pty_reader.is_none() && (stdout_reader.is_none() || stderr_reader.is_none()) {
503            error!("Failed to capture stdout/stderr for daemon {id}");
504        }
505
506        tokio::spawn(async move {
507            let id = id_clone;
508
509            // Merge all output sources (PTY master OR stdout+stderr) into a single channel.
510            let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<String>(256);
511
512            if let Some(mut reader) = pty_reader {
513                // PTY mode: single merged stream from the master.
514                // output_tx is moved into the spawn; when the reader ends the
515                // channel closes automatically.
516                tokio::spawn(async move {
517                    while let Ok(Some(mut line)) = reader.next_line().await {
518                        // PTY slave uses ONLCR: \n → \r\n; strip the trailing \r.
519                        if line.ends_with('\r') {
520                            line.pop();
521                        }
522                        if output_tx.send(line).await.is_err() {
523                            break;
524                        }
525                    }
526                });
527            } else {
528                // Pipe mode: stdout and stderr are merged into the same channel.
529                // Both `ready_output` and `on_output_hook` patterns match against
530                // lines from either stream, which is the expected behavior (a
531                // "server ready" message may appear on stderr in some tools).
532                if let Some(mut stdout) = stdout_reader {
533                    let tx = output_tx.clone();
534                    tokio::spawn(async move {
535                        while let Ok(Some(line)) = stdout.next_line().await {
536                            if tx.send(line).await.is_err() {
537                                break;
538                            }
539                        }
540                    });
541                }
542                if let Some(mut stderr) = stderr_reader {
543                    let tx = output_tx.clone();
544                    tokio::spawn(async move {
545                        while let Ok(Some(line)) = stderr.next_line().await {
546                            if tx.send(line).await.is_err() {
547                                break;
548                            }
549                        }
550                    });
551                }
552                // Drop the last sender so the channel closes when all readers finish.
553                drop(output_tx);
554            }
555            let log_store = &*LOG_STORE;
556            let format_line = |line: String| line;
557
558            // SQLite WAL mode provides automatic durability; no explicit flush needed.
559
560            // Setup readiness checking
561            let mut ready_notified = false;
562            let mut ready_tx = ready_tx;
563            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
564            // Track whether we've already spawned the active_port detection task
565            let mut active_port_spawned = false;
566
567            // Validate on_output config early; discard the hook on any error so
568            // a bad regex does not silently fall through to the (None, None) => true
569            // match arm and fire on every line.
570            let on_output_hook = match on_output_hook {
571                Some(ref hook) => match hook.validate(id.name()) {
572                    Ok(()) => on_output_hook,
573                    Err(e) => {
574                        error!("{e}");
575                        None
576                    }
577                },
578                None => None,
579            };
580
581            // Compile the regex pattern after validation so we only attempt this
582            // when the hook is known-good (validate() already checked the syntax).
583            let on_output_pattern: Option<regex::Regex> = on_output_hook
584                .as_ref()
585                .and_then(|h| h.regex.as_deref().and_then(get_or_compile_regex));
586            let on_output_debounce = on_output_hook
587                .as_ref()
588                .map(|h| h.debounce_duration())
589                .unwrap_or(Duration::from_millis(1000));
590            // Last time the on_output hook fired; None means it has never fired.
591            let mut on_output_last_fired: Option<std::time::Instant> = None;
592
593            let mut delay_timer =
594                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
595
596            // Get settings for intervals
597            let s = settings();
598            let ready_check_interval = s.supervisor_ready_check_interval();
599            let http_client_timeout = s.supervisor_http_client_timeout();
600
601            // Setup HTTP readiness check interval
602            let mut http_check_interval = ready_http
603                .as_ref()
604                .map(|_| tokio::time::interval(ready_check_interval));
605            let http_client = ready_http.as_ref().map(|_| {
606                reqwest::Client::builder()
607                    .timeout(http_client_timeout)
608                    .build()
609                    .unwrap_or_default()
610            });
611
612            // Setup TCP port readiness check interval
613            let mut port_check_interval =
614                ready_port.map(|_| tokio::time::interval(ready_check_interval));
615
616            // Setup command readiness check interval
617            let mut cmd_check_interval = ready_cmd
618                .as_ref()
619                .map(|_| tokio::time::interval(ready_check_interval));
620
621            // Use a channel to communicate process exit status
622            let (exit_tx, mut exit_rx) =
623                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
624
625            // Spawn a task to wait for process exit
626            let child_pid = child.id().unwrap_or(0);
627            tokio::spawn(async move {
628                let result = child.wait().await;
629                // On non-Linux Unix (e.g. macOS) the zombie reaper may win the
630                // race and consume the exit status via waitpid(None, WNOHANG)
631                // before Tokio's child.wait() gets to it. When that happens,
632                // Tokio returns an ECHILD io::Error. We recover by checking
633                // REAPED_STATUSES for the stashed exit code.
634                //
635                // On Linux this is unnecessary because the reaper uses
636                // waitid(WNOWAIT) to peek before reaping, which avoids the
637                // race entirely.
638                #[cfg(all(unix, not(target_os = "linux")))]
639                let result = match &result {
640                    Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
641                        if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
642                            warn!(
643                                "daemon pid {child_pid} wait() got ECHILD; \
644                                 recovered exit code {code} from zombie reaper"
645                            );
646                            // Synthesize an ExitStatus from the stashed code.
647                            // On Unix we can use `ExitStatus::from_raw()` with
648                            // a wait-style status word (code << 8 for normal
649                            // exit, or raw signal number for signal death).
650                            use std::os::unix::process::ExitStatusExt;
651                            if code >= 0 {
652                                Ok(std::process::ExitStatus::from_raw(code << 8))
653                            } else {
654                                // Negative code means killed by signal (-sig)
655                                Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
656                            }
657                        } else {
658                            warn!(
659                                "daemon pid {child_pid} wait() got ECHILD but no \
660                                 stashed status found; reporting as error"
661                            );
662                            result
663                        }
664                    }
665                    _ => result,
666                };
667                debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
668                let _ = exit_tx.send(result).await;
669            });
670
671            #[allow(unused_assignments)]
672            // Initial None is a safety net; loop only exits via exit_rx.recv() which sets it
673            let mut exit_status = None;
674
675            // If there is no ready check of any kind and no delay, the daemon is
676            // considered immediately ready and the active_port detection task would
677            // never be triggered inside the select loop.  Kick it off right away so
678            // that daemons without any readiness configuration still get their
679            // active_port populated (needed for proxy routing).
680            if has_port_config
681                && ready_pattern.is_none()
682                && ready_http.is_none()
683                && ready_port.is_none()
684                && ready_cmd.is_none()
685                && delay_timer.is_none()
686            {
687                active_port_spawned = true;
688                detect_and_store_active_port(id.clone(), daemon_pid);
689            }
690
691            loop {
692                select! {
693                    Some(line) = output_rx.recv() => {
694                        let formatted = format_line(line.clone());
695                        if let Err(e) = log_store.append(&id, &formatted) {
696                            error!("Failed to write to log for daemon {id}: {e}");
697                        }
698                        trace!("output: {id} {formatted}");
699
700                        // Strip ANSI for pattern matching so user-written patterns
701                        // work regardless of whether the process emits color codes.
702                        let line_clean = console::strip_ansi_codes(&line).to_string();
703
704                        // Check if output matches ready pattern
705                        if !ready_notified
706                            && let Some(ref pattern) = ready_pattern
707                            && pattern.is_match(&line_clean)
708                        {
709                            info!("daemon {id} ready: output matched pattern");
710                            ready_notified = true;
711                            if let Some(tx) = ready_tx.take() {
712                                let _ = tx.send(Ok(()));
713                            }
714                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
715                            if !active_port_spawned && has_port_config {
716                                active_port_spawned = true;
717                                detect_and_store_active_port(id.clone(), daemon_pid);
718                            }
719                        }
720
721                        // Check on_output hook
722                        if let Some(ref hook) = on_output_hook {
723                            let matched = match (&hook.filter, &on_output_pattern) {
724                                (Some(substr), _) => line_clean.contains(substr.as_str()),
725                                (None, Some(re)) => re.is_match(&line_clean),
726                                (None, None) => true,
727                            };
728                            if matched {
729                                let now = std::time::Instant::now();
730                                let elapsed = on_output_last_fired.map(|t| now.duration_since(t));
731                                if elapsed.is_none_or(|e| e >= on_output_debounce) {
732                                    on_output_last_fired = Some(now);
733                                    hooks::fire_output_hook(id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), hook.run.clone(), line_clean.clone()).await;
734                                }
735                            }
736                        }
737                    }
738                    Some(result) = exit_rx.recv() => {
739                        // Process exited - save exit status and notify if not ready yet
740                        exit_status = Some(result);
741                        debug!("daemon {id} process exited, exit_status: {exit_status:?}");
742                        if !ready_notified {
743                            if let Some(tx) = ready_tx.take() {
744                                // Check if process exited successfully
745                                let is_success = exit_status.as_ref()
746                                    .and_then(|r| r.as_ref().ok())
747                                    .map(|s| s.success())
748                                    .unwrap_or(false);
749
750                                if is_success {
751                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
752                                    let _ = tx.send(Ok(()));
753                                } else {
754                                    let exit_code = exit_status.as_ref()
755                                        .and_then(|r| r.as_ref().ok())
756                                        .and_then(|s| s.code());
757                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
758                                    let _ = tx.send(Err(exit_code));
759                                }
760                            }
761                        } else {
762                            debug!("daemon {id} was already marked ready, not sending notification");
763                        }
764                        break;
765                    },
766                    _ = async {
767                        if let Some(ref mut interval) = http_check_interval {
768                            interval.tick().await;
769                        } else {
770                            std::future::pending::<()>().await;
771                        }
772                    }, if !ready_notified && ready_http.is_some() => {
773                        if let (Some(http), Some(client)) = (&ready_http, &http_client) {
774                            match client.get(&http.url).send().await {
775                                Ok(response) if http.accepts_status(response.status().as_u16()) => {
776                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
777                                    ready_notified = true;
778                                    if let Some(tx) = ready_tx.take() {
779                                        let _ = tx.send(Ok(()));
780                                    }
781                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
782                                    http_check_interval = None;
783                                    if !active_port_spawned && has_port_config {
784                                        active_port_spawned = true;
785                                        detect_and_store_active_port(id.clone(), daemon_pid);
786                                    }
787                                }
788                                Ok(response) => {
789                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
790                                }
791                                Err(e) => {
792                                    trace!("daemon {id} HTTP check failed: {e}");
793                                }
794                            }
795                        }
796                    }
797                    _ = async {
798                        if let Some(ref mut interval) = port_check_interval {
799                            interval.tick().await;
800                        } else {
801                            std::future::pending::<()>().await;
802                        }
803                    }, if !ready_notified && ready_port.is_some() => {
804                        if let Some(port) = ready_port {
805                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
806                                Ok(_) => {
807                                    info!("daemon {id} ready: TCP port {port} is listening");
808                                    ready_notified = true;
809                                    if let Some(tx) = ready_tx.take() {
810                                        let _ = tx.send(Ok(()));
811                                    }
812                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
813                                    // Stop checking once ready
814                                    port_check_interval = None;
815                                    if !active_port_spawned && has_port_config {
816                                        active_port_spawned = true;
817                                        detect_and_store_active_port(id.clone(), daemon_pid);
818                                    }
819                                }
820                                Err(_) => {
821                                    trace!("daemon {id} port check: port {port} not listening yet");
822                                }
823                            }
824                        }
825                    }
826                    _ = async {
827                        if let Some(ref mut interval) = cmd_check_interval {
828                            interval.tick().await;
829                        } else {
830                            std::future::pending::<()>().await;
831                        }
832                    }, if !ready_notified && ready_cmd.is_some() => {
833                        if let Some(ref cmd) = ready_cmd {
834                            // Run the readiness check command using the shell abstraction
835                            let mut command = Shell::default_for_platform().command(cmd);
836                            command
837                                .current_dir(&daemon_dir)
838                                .stdout(std::process::Stdio::null())
839                                .stderr(std::process::Stdio::null());
840                            let result: std::io::Result<std::process::ExitStatus> = command.status().await;
841                            match result {
842                                Ok(status) if status.success() => {
843                                    info!("daemon {id} ready: readiness command succeeded");
844                                    ready_notified = true;
845                                    if let Some(tx) = ready_tx.take() {
846                                        let _ = tx.send(Ok(()));
847                                    }
848                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
849                                    // Stop checking once ready
850                                    cmd_check_interval = None;
851                                    if !active_port_spawned && has_port_config {
852                                        active_port_spawned = true;
853                                        detect_and_store_active_port(id.clone(), daemon_pid);
854                                    }
855                                }
856                                Ok(_) => {
857                                    trace!("daemon {id} cmd check: command returned non-zero (not ready)");
858                                }
859                                Err(e) => {
860                                    trace!("daemon {id} cmd check failed: {e}");
861                                }
862                            }
863                        }
864                    }
865                    _ = async {
866                        if let Some(ref mut timer) = delay_timer {
867                            timer.await;
868                        } else {
869                            std::future::pending::<()>().await;
870                        }
871                    } => {
872                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
873                            info!("daemon {id} ready: delay elapsed");
874                            ready_notified = true;
875                            if let Some(tx) = ready_tx.take() {
876                                let _ = tx.send(Ok(()));
877                            }
878                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
879                        }
880                        // Disable timer after it fires
881                        delay_timer = None;
882                        if !active_port_spawned && has_port_config {
883                            active_port_spawned = true;
884                            detect_and_store_active_port(id.clone(), daemon_pid);
885                        }
886                    }
887                }
888            }
889
890            // Clear active_port since the process is no longer running
891            {
892                let mut state_file = SUPERVISOR.state_file.lock().await;
893                state_file.clear_active_port(&id);
894            }
895
896            // Get the final exit status
897            let exit_status = if let Some(status) = exit_status {
898                status
899            } else {
900                // Streams closed but process hasn't exited yet, wait for it
901                match exit_rx.recv().await {
902                    Some(status) => status,
903                    None => {
904                        warn!("daemon {id} exit channel closed without receiving status");
905                        Err(std::io::Error::other("exit channel closed"))
906                    }
907                }
908            };
909            let current_daemon = SUPERVISOR.get_daemon(&id).await;
910
911            // Signal that this monitoring task is processing its exit path.
912            // The RAII guard will decrement the counter and notify close()
913            // when the task finishes (including all fire_hook registrations),
914            // regardless of which return path is taken.
915            SUPERVISOR
916                .active_monitors
917                .fetch_add(1, atomic::Ordering::Release);
918            struct MonitorGuard;
919            impl Drop for MonitorGuard {
920                fn drop(&mut self) {
921                    SUPERVISOR
922                        .active_monitors
923                        .fetch_sub(1, atomic::Ordering::Release);
924                    SUPERVISOR.monitor_done.notify_waiters();
925                }
926            }
927            let _monitor_guard = MonitorGuard;
928            // Check if this monitoring task is for the current daemon process.
929            // Allow Stopped/Stopping daemons through: stop() clears pid atomically,
930            // so d.pid != Some(pid) would be true, but we still need the is_stopped()
931            // branch below to fire on_stop/on_exit hooks.
932            if current_daemon.is_none()
933                || current_daemon.as_ref().is_some_and(|d| {
934                    d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
935                })
936            {
937                // Another process has taken over, don't update status
938                return;
939            }
940            // Capture the intentional-stop flag BEFORE any state changes.
941            // stop() transitions Stopping → Stopped and clears pid. If stop() wins the race
942            // and sets Stopped before this task runs, we still need to fire on_stop/on_exit.
943            // Treat both Stopping and Stopped as "intentional stop by pitchfork".
944            let already_stopped = current_daemon
945                .as_ref()
946                .is_some_and(|d| d.status.is_stopped());
947            let is_stopping = already_stopped
948                || current_daemon
949                    .as_ref()
950                    .is_some_and(|d| d.status.is_stopping());
951
952            // --- Phase 1: Determine exit_code, exit_reason, and update daemon state ---
953            let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
954                (Ok(status), true) => {
955                    // Intentional stop (by pitchfork). status.code() returns None
956                    // on Unix when killed by signal (e.g. SIGTERM); use -1 to
957                    // distinguish from a clean exit code 0.
958                    (status.code().unwrap_or(-1), "stop")
959                }
960                (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
961                (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
962                (Err(_), true) => {
963                    // child.wait() error while stopping (e.g. sysinfo reaped the process)
964                    (-1, "stop")
965                }
966                (Err(_), false) => (-1, "fail"),
967            };
968
969            // Update daemon state unless stop() already did it (won the race).
970            if !already_stopped {
971                if let Ok(status) = &exit_status {
972                    info!("daemon {id} exited with status {status}");
973                }
974                let (new_status, last_exit_success) = match exit_reason {
975                    "stop" | "exit" => (
976                        DaemonStatus::Stopped,
977                        exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
978                    ),
979                    _ => (DaemonStatus::Errored(exit_code), false),
980                };
981                if let Err(e) = SUPERVISOR
982                    .upsert_daemon(
983                        UpsertDaemonOpts::builder(id.clone())
984                            .set(|o| {
985                                o.pid = None;
986                                o.status = new_status;
987                                o.last_exit_success = Some(last_exit_success);
988                            })
989                            .build(),
990                    )
991                    .await
992                {
993                    error!("Failed to update daemon state for {id}: {e}");
994                }
995            }
996
997            // --- Phase 2: Fire hooks ---
998            let hook_extra_env = vec![
999                ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
1000                ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
1001            ];
1002
1003            // Determine which hooks to fire based on exit reason
1004            let hooks_to_fire: Vec<HookType> = match exit_reason {
1005                "stop" => vec![HookType::OnStop, HookType::OnExit],
1006                "exit" => vec![HookType::OnExit],
1007                // "fail": fire on_fail + on_exit only when retries are exhausted
1008                _ if hook_retry_count >= hook_retry.count() => {
1009                    vec![HookType::OnFail, HookType::OnExit]
1010                }
1011                _ => vec![],
1012            };
1013
1014            for hook_type in hooks_to_fire {
1015                fire_hook(
1016                    hook_type,
1017                    id.clone(),
1018                    daemon_dir.clone(),
1019                    hook_retry_count,
1020                    hook_daemon_env.clone(),
1021                    hook_extra_env.clone(),
1022                )
1023                .await;
1024            }
1025        });
1026
1027        // If wait_ready is true, wait for readiness notification
1028        if let Some(ready_rx) = ready_rx {
1029            match ready_rx.await {
1030                Ok(Ok(())) => {
1031                    info!("daemon {id} is ready");
1032                    Ok(IpcResponse::DaemonReady { daemon })
1033                }
1034                Ok(Err(exit_code)) => {
1035                    error!("daemon {id} failed before becoming ready");
1036                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
1037                }
1038                Err(_) => {
1039                    error!("readiness channel closed unexpectedly for daemon {id}");
1040                    Ok(IpcResponse::DaemonStart { daemon })
1041                }
1042            }
1043        } else {
1044            Ok(IpcResponse::DaemonStart { daemon })
1045        }
1046    }
1047
1048    /// Stop a running daemon
1049    pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
1050        let pitchfork_id = DaemonId::pitchfork();
1051        if *id == pitchfork_id {
1052            return Ok(IpcResponse::Error(
1053                "Cannot stop supervisor via stop command".into(),
1054            ));
1055        }
1056        info!("stopping daemon: {id}");
1057        if let Some(daemon) = self.get_daemon(id).await {
1058            trace!("daemon to stop: {daemon}");
1059            if let Some(pid) = daemon.pid {
1060                trace!("killing pid: {pid}");
1061                PROCS.refresh_pids(&[pid]);
1062                if PROCS.is_running(pid) {
1063                    // First set status to Stopping (preserve PID for monitoring task)
1064                    self.upsert_daemon(
1065                        UpsertDaemonOpts::builder(id.clone())
1066                            .set(|o| {
1067                                o.pid = Some(pid);
1068                                o.status = DaemonStatus::Stopping;
1069                            })
1070                            .build(),
1071                    )
1072                    .await?;
1073
1074                    // Kill the entire process group atomically (daemon PID == PGID
1075                    // because we called setsid() at spawn time)
1076                    let stop_cfg = daemon.stop_signal.unwrap_or_default();
1077                    let stop_signal: i32 = stop_cfg.signal.into();
1078                    if let Err(e) = PROCS
1079                        .kill_process_group_async(pid, stop_signal, stop_cfg.timeout)
1080                        .await
1081                    {
1082                        debug!("failed to kill pid {pid}: {e}");
1083                        // Check if the process is actually stopped despite the error
1084                        PROCS.refresh_processes();
1085                        if PROCS.is_running(pid) {
1086                            // Process still running after kill attempt - set back to Running
1087                            debug!("failed to stop pid {pid}: process still running after kill");
1088                            self.upsert_daemon(
1089                                UpsertDaemonOpts::builder(id.clone())
1090                                    .set(|o| {
1091                                        o.pid = Some(pid); // Preserve PID to avoid orphaning the process
1092                                        o.status = DaemonStatus::Running;
1093                                    })
1094                                    .build(),
1095                            )
1096                            .await?;
1097                            return Ok(IpcResponse::DaemonStopFailed {
1098                                error: format!(
1099                                    "process {pid} still running after kill attempt: {e}"
1100                                ),
1101                            });
1102                        }
1103                    }
1104
1105                    // Process successfully stopped
1106                    // Note: kill_async uses SIGTERM -> wait ~3s -> SIGKILL strategy,
1107                    // and also detects zombie processes, so by the time it returns,
1108                    // the process should be fully terminated.
1109                    self.upsert_daemon(
1110                        UpsertDaemonOpts::builder(id.clone())
1111                            .set(|o| {
1112                                o.pid = None;
1113                                o.status = DaemonStatus::Stopped;
1114                                o.last_exit_success = Some(true); // Manual stop is considered successful
1115                            })
1116                            .build(),
1117                    )
1118                    .await?;
1119                } else {
1120                    debug!("pid {pid} not running, process may have exited unexpectedly");
1121                    // Process already dead, directly mark as stopped
1122                    // Note that the cleanup logic is handled in monitor task
1123                    self.upsert_daemon(
1124                        UpsertDaemonOpts::builder(id.clone())
1125                            .set(|o| {
1126                                o.pid = None;
1127                                o.status = DaemonStatus::Stopped;
1128                            })
1129                            .build(),
1130                    )
1131                    .await?;
1132                    return Ok(IpcResponse::DaemonWasNotRunning);
1133                }
1134                Ok(IpcResponse::Ok)
1135            } else {
1136                debug!("daemon {id} not running");
1137                Ok(IpcResponse::DaemonNotRunning)
1138            }
1139        } else {
1140            debug!("daemon {id} not found");
1141            Ok(IpcResponse::DaemonNotFound)
1142        }
1143    }
1144}
1145
1146#[cfg(unix)]
1147fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1148    let s = settings();
1149    let settings_user = s.supervisor.user.trim();
1150    let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1151    let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1152    let configured = daemon_user.or(settings_user);
1153    let current_uid = nix::unistd::Uid::effective().as_raw();
1154    let current_gid = nix::unistd::Gid::effective().as_raw();
1155    resolve_run_identity(
1156        configured,
1157        current_uid,
1158        current_gid,
1159        std::env::var("SUDO_UID").ok().as_deref(),
1160        std::env::var("SUDO_GID").ok().as_deref(),
1161    )
1162}
1163
1164#[cfg(unix)]
1165fn resolve_run_identity(
1166    configured: Option<&str>,
1167    current_uid: u32,
1168    current_gid: u32,
1169    sudo_uid: Option<&str>,
1170    sudo_gid: Option<&str>,
1171) -> Result<RunIdentity> {
1172    let current_uid = nix::unistd::Uid::from_raw(current_uid);
1173    let current_gid = nix::unistd::Gid::from_raw(current_gid);
1174    if let Some(user) = configured {
1175        let identity = resolve_configured_user(user)?;
1176        ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1177        if identity.matches(current_uid, current_gid) {
1178            return Ok(RunIdentity::Inherit);
1179        }
1180        return Ok(identity);
1181    }
1182
1183    if current_uid.is_root()
1184        && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1185    {
1186        return Ok(identity);
1187    }
1188
1189    Ok(RunIdentity::Inherit)
1190}
1191
1192#[cfg(unix)]
1193fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1194    if user.chars().all(|c| c.is_ascii_digit()) {
1195        let uid = user
1196            .parse::<u32>()
1197            .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1198        let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1199            .into_diagnostic()?
1200            .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1201        return run_identity_from_user_record(user_record);
1202    }
1203
1204    let user_record = nix::unistd::User::from_name(user)
1205        .into_diagnostic()?
1206        .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1207    run_identity_from_user_record(user_record)
1208}
1209
1210#[cfg(unix)]
1211fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1212    let username = CString::new(user.name)
1213        .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1214    Ok(RunIdentity::Switch {
1215        uid: user.uid,
1216        gid: user.gid,
1217        username: Some(username),
1218    })
1219}
1220
1221#[cfg(unix)]
1222fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1223    RunIdentity::Switch {
1224        uid: nix::unistd::Uid::from_raw(uid),
1225        gid: nix::unistd::Gid::from_raw(gid),
1226        username,
1227    }
1228}
1229
1230#[cfg(unix)]
1231fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1232    let uid = sudo_uid?.parse::<u32>().ok()?;
1233    let gid = sudo_gid?.parse::<u32>().ok()?;
1234    let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1235        .ok()
1236        .flatten()
1237        .and_then(|u| CString::new(u.name).ok());
1238    Some(run_identity_from_raw_ids(uid, gid, username))
1239}
1240
1241#[cfg(unix)]
1242fn ensure_can_use_identity(
1243    configured_user: &str,
1244    identity: &RunIdentity,
1245    current_uid: nix::unistd::Uid,
1246    current_gid: nix::unistd::Gid,
1247) -> Result<()> {
1248    let RunIdentity::Switch { uid, gid, .. } = identity else {
1249        return Ok(());
1250    };
1251    if *uid == current_uid && *gid == current_gid {
1252        return Ok(());
1253    }
1254    if current_uid.is_root() {
1255        return Ok(());
1256    }
1257    Err(miette::miette!(
1258        "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1259        configured_user,
1260        current_uid.as_raw(),
1261        current_gid.as_raw(),
1262        uid.as_raw(),
1263        gid.as_raw()
1264    ))
1265}
1266
1267#[cfg(unix)]
1268fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1269    let RunIdentity::Switch { uid, gid, username } = identity else {
1270        return Ok(());
1271    };
1272    if let Some(username) = username {
1273        initgroups_for_user(username, *gid)?;
1274    } else {
1275        setgroups_to_primary(*gid)?;
1276    }
1277    nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1278    nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1279    Ok(())
1280}
1281
1282#[cfg(unix)]
1283impl RunIdentity {
1284    fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1285        matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1286    }
1287}
1288
1289#[cfg(unix)]
1290fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1291    let groups = [gid.as_raw() as libc::gid_t];
1292    #[cfg(any(target_os = "linux", target_os = "android"))]
1293    let group_count = groups.len();
1294    #[cfg(not(any(target_os = "linux", target_os = "android")))]
1295    let group_count = groups.len() as libc::c_int;
1296    let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1297    if rc == -1 {
1298        Err(std::io::Error::last_os_error())
1299    } else {
1300        Ok(())
1301    }
1302}
1303
1304#[cfg(unix)]
1305fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1306    let gid = gid.as_raw();
1307    #[cfg(any(
1308        target_os = "macos",
1309        target_os = "ios",
1310        target_os = "tvos",
1311        target_os = "watchos"
1312    ))]
1313    let base_gid = i32::try_from(gid)
1314        .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1315
1316    #[cfg(not(any(
1317        target_os = "macos",
1318        target_os = "ios",
1319        target_os = "tvos",
1320        target_os = "watchos"
1321    )))]
1322    let base_gid = gid as libc::gid_t;
1323
1324    // SAFETY: `username` is a valid nul-terminated C string and `base_gid`
1325    // is derived from a resolved system account or sudo-provided gid.
1326    let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1327    if rc == -1 {
1328        Err(std::io::Error::last_os_error())
1329    } else {
1330        Ok(())
1331    }
1332}
1333
1334#[cfg(unix)]
1335fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1336    std::io::Error::from_raw_os_error(err as i32)
1337}
1338
1339/// Check if multiple ports are available and optionally auto-bump to find available ports.
1340///
1341/// All ports are bumped by the same offset to maintain relative port spacing.
1342/// Returns the resolved ports (either the original or bumped ones).
1343/// Returns an error if any port is in use and auto_bump is disabled,
1344/// or if no available ports can be found after max attempts.
1345async fn check_ports_available(
1346    expected_ports: &[u16],
1347    auto_bump: bool,
1348    max_attempts: u32,
1349) -> Result<Vec<u16>> {
1350    if expected_ports.is_empty() {
1351        return Ok(Vec::new());
1352    }
1353
1354    for bump_offset in 0..=max_attempts {
1355        // Use wrapping_add to handle overflow correctly - ports wrap around at 65535
1356        let candidate_ports: Vec<u16> = expected_ports
1357            .iter()
1358            .map(|&p| p.wrapping_add(bump_offset as u16))
1359            .collect();
1360
1361        // Check if all ports in this set are available
1362        let mut all_available = true;
1363        let mut conflicting_port = None;
1364
1365        for &port in &candidate_ports {
1366            // Port 0 is a special case - it requests an ephemeral port from the OS.
1367            // Skip the availability check for port 0 since binding to it always succeeds.
1368            if port == 0 {
1369                continue;
1370            }
1371
1372            // Use spawn_blocking to avoid blocking the async runtime during TCP bind checks.
1373            //
1374            // We check multiple addresses to avoid false-negatives caused by SO_REUSEADDR.
1375            // On macOS/BSD, Rust's TcpListener::bind sets SO_REUSEADDR by default, which
1376            // allows binding 0.0.0.0:port even when 127.0.0.1:port is already in use
1377            // (because 0.0.0.0 is technically a different address).  Most daemons bind
1378            // to localhost, so checking 127.0.0.1 is essential to detect real conflicts.
1379            // We also check [::1] to cover IPv6 loopback listeners.
1380            //
1381            // NOTE: This check has a time-of-check-to-time-of-use (TOCTOU) race condition.
1382            // Another process could grab the port between our check and the daemon actually
1383            // binding. This is inherent to the approach and acceptable for our use case
1384            // since we're primarily detecting conflicts with already-running daemons.
1385            if is_port_in_use(port).await {
1386                all_available = false;
1387                conflicting_port = Some(port);
1388                break;
1389            }
1390        }
1391
1392        if all_available {
1393            // Check for overflow (port wrapped around to 0 due to wrapping_add)
1394            // If any candidate port is 0 but the original expected port wasn't 0,
1395            // it means we've wrapped around and should stop
1396            if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1397                return Err(PortError::NoAvailablePort {
1398                    start_port: expected_ports[0],
1399                    attempts: bump_offset + 1,
1400                }
1401                .into());
1402            }
1403            if bump_offset > 0 {
1404                info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1405            }
1406            return Ok(candidate_ports);
1407        }
1408
1409        // Port is in use
1410        if bump_offset == 0 && !auto_bump {
1411            if let Some(port) = conflicting_port {
1412                let (pid, process) = identify_port_owner(port).await;
1413                return Err(PortError::InUse { port, process, pid }.into());
1414            }
1415        }
1416    }
1417
1418    // No available ports found after max attempts
1419    Err(PortError::NoAvailablePort {
1420        start_port: expected_ports[0],
1421        attempts: max_attempts + 1,
1422    }
1423    .into())
1424}
1425
1426/// Check whether a port is currently in use by attempting to bind on multiple addresses.
1427///
1428/// Returns `true` when at least one bind attempt gets `AddrInUse`, meaning another
1429/// process is listening.  Other errors (e.g. `AddrNotAvailable` on an address family
1430/// the OS doesn't support) are ignored so they don't produce false positives.
1431async fn is_port_in_use(port: u16) -> bool {
1432    tokio::task::spawn_blocking(move || {
1433        for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1434            match std::net::TcpListener::bind((addr, port)) {
1435                Ok(listener) => drop(listener),
1436                Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1437                Err(_) => continue,
1438            }
1439        }
1440        false
1441    })
1442    .await
1443    .unwrap_or(false)
1444}
1445
1446/// Best-effort lookup of the process occupying a port via `listeners::get_all()`.
1447///
1448/// Returns `(pid, process_name)`.  Falls back to `(0, "unknown")` when the
1449/// system call fails (permission error, unsupported OS, etc.).
1450async fn identify_port_owner(port: u16) -> (u32, String) {
1451    tokio::task::spawn_blocking(move || {
1452        listeners::get_all()
1453            .ok()
1454            .and_then(|list| {
1455                list.into_iter()
1456                    .find(|l| l.socket.port() == port)
1457                    .map(|l| (l.process.pid, l.process.name))
1458            })
1459            .unwrap_or((0, "unknown".to_string()))
1460    })
1461    .await
1462    .unwrap_or((0, "unknown".to_string()))
1463}
1464
1465/// Detect whether a port is in use, and if so, identify the owning process.
1466///
1467/// Combines `is_port_in_use` (reliable bind probe) with `identify_port_owner`
1468/// (best-effort process lookup).  Returns `None` when the port is free.
1469async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1470    if !is_port_in_use(port).await {
1471        return None;
1472    }
1473    Some(identify_port_owner(port).await)
1474}
1475
1476/// Spawn a background task that detects the first port the daemon process is listening on
1477/// and stores it in the state file as `active_port`.
1478///
1479/// This is called once when the daemon becomes ready. The port is cleared when the daemon stops.
1480///
1481/// Port selection strategy:
1482/// 1. If the daemon has `expected_port` configured, prefer the first port from that list
1483///    (it is the port the operator explicitly designated as the primary service port).
1484/// 2. Otherwise, take the first port the process is actually listening on (in the order
1485///    returned by the OS), which is typically the port bound earliest.
1486///
1487/// Using `min()` (lowest port number) was previously used here but is incorrect: many
1488/// applications listen on multiple ports (e.g. HTTP + metrics) and the lowest-numbered
1489/// port is not necessarily the primary service port.
1490fn detect_and_store_active_port(id: DaemonId, pid: u32) {
1491    tokio::spawn(async move {
1492        // Retry with exponential backoff so that slow-starting daemons (JVM,
1493        // Node.js, Python, etc.) that take more than 500 ms to bind their port
1494        // are still detected.  Total wait budget: 500+1000+2000+4000 = 7.5 s.
1495        for delay_ms in [500u64, 1000, 2000, 4000] {
1496            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1497
1498            // Read daemon state atomically: check if still alive and get expected_port
1499            // in a single lock acquisition to avoid TOCTOU and unnecessary lock overhead.
1500            let expected_port: Option<u16> = {
1501                let state_file = SUPERVISOR.state_file.lock().await;
1502                match state_file.daemons.get(&id) {
1503                    Some(d) if d.pid.is_none() => {
1504                        debug!("daemon {id}: aborting active_port detection — process exited");
1505                        return;
1506                    }
1507                    Some(d) => d
1508                        .port
1509                        .as_ref()
1510                        .and_then(|p| p.expect.first().copied())
1511                        .filter(|&p| p > 0),
1512                    None => None,
1513                }
1514            };
1515
1516            let active_port = tokio::task::spawn_blocking(move || {
1517                let listeners = listeners::get_all().ok()?;
1518
1519                // Refresh process tree so all_children sees current descendants.
1520                PROCS.refresh_processes();
1521
1522                let descendant_pids: std::collections::HashSet<u32> = PROCS
1523                    .all_children(pid)
1524                    .into_iter()
1525                    .chain(std::iter::once(pid))
1526                    .collect();
1527
1528                let process_ports: Vec<u16> = listeners
1529                    .into_iter()
1530                    .filter(|listener| descendant_pids.contains(&listener.process.pid))
1531                    .map(|listener| listener.socket.port())
1532                    .filter(|&port| port > 0)
1533                    .collect();
1534
1535                if process_ports.is_empty() {
1536                    return None;
1537                }
1538
1539                // Prefer the configured expected_port if the process is actually
1540                // listening on it; otherwise fall back to the first port found.
1541                if let Some(ep) = expected_port {
1542                    if process_ports.contains(&ep) {
1543                        return Some(ep);
1544                    }
1545                }
1546
1547                // No expected_port match — return the first port in the list.
1548                // The list order reflects the order the OS reports listeners,
1549                // which is generally the order they were bound (earliest first).
1550                // Do NOT sort: the lowest-numbered port is not necessarily the
1551                // primary service port (e.g. HTTP vs metrics).
1552                process_ports.into_iter().next()
1553            })
1554            .await
1555            .ok()
1556            .flatten();
1557
1558            if let Some(port) = active_port {
1559                debug!("daemon {id} active_port detected: {port}");
1560                let mut state_file = SUPERVISOR.state_file.lock().await;
1561                if let Some(d) = state_file.daemons.get(&id) {
1562                    // Guard against PID reuse: if the original process exited and the OS
1563                    // assigned the same PID to an unrelated process that happens to bind
1564                    // a port, we must not route proxy traffic to that unrelated service.
1565                    if d.pid == Some(pid) {
1566                        state_file.set_active_port(&id, port);
1567                    } else {
1568                        debug!(
1569                            "daemon {id}: skipping active_port write — PID mismatch \
1570                             (expected {pid}, current {:?})",
1571                            d.pid
1572                        );
1573                        return;
1574                    }
1575                }
1576                return;
1577            }
1578
1579            debug!(
1580                "daemon {id}: no active port detected for pid {pid} or its descendants (will retry)"
1581            );
1582        }
1583
1584        debug!(
1585            "daemon {id}: active port detection exhausted all retries for pid {pid} and its descendants"
1586        );
1587    });
1588}
1589
1590/// Check whether a daemon (by its qualified ID) is the target of any registered
1591/// slug in the global config.  This is used to decide whether to run the
1592/// `detect_and_store_active_port` polling task — only slug-targeted daemons need
1593/// it, avoiding wasted `listeners::get_all()` calls for port-less daemons.
1594///
1595/// Delegates to `proxy::server::is_slug_target()` which uses the same in-memory
1596/// slug cache as the proxy hot path, so this check is cheap.
1597fn is_daemon_slug_target(id: &DaemonId) -> bool {
1598    // read_global_slugs is called once per daemon start — acceptable cost.
1599    // We intentionally avoid making this async to keep has_port_config evaluation
1600    // simple and synchronous in run_once().
1601    let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1602    slugs.iter().any(|(slug, entry)| {
1603        let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1604        id.name() == daemon_name
1605    })
1606}
1607
#[cfg(all(test, unix))]
mod tests {
    use super::*;

    /// Destructure a `RunIdentity::Switch`, panicking for `Inherit`.
    fn expect_switch(
        identity: RunIdentity,
    ) -> (
        nix::unistd::Uid,
        nix::unistd::Gid,
        Option<std::ffi::CString>,
    ) {
        match identity {
            RunIdentity::Switch { uid, gid, username } => (uid, gid, username),
            RunIdentity::Inherit => panic!("expected identity switch"),
        }
    }

    // No configured user, not root, no sudo hints: nothing to switch to.
    #[test]
    fn test_resolve_run_identity_empty_without_sudo() {
        let resolved = resolve_run_identity(None, 501, 20, None, None).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    // Running as root with sudo-provided ids: fall back to the invoking user.
    #[test]
    fn test_resolve_run_identity_sudo_fallback() {
        let resolved = resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap();
        let (uid, gid, _) = expect_switch(resolved);
        assert_eq!(uid.as_raw(), 501);
        assert_eq!(gid.as_raw(), 20);
    }

    // Sudo hints are ignored when the supervisor itself is not root.
    #[test]
    fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
        let resolved = resolve_run_identity(None, 501, 20, Some("0"), Some("0")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    // A user given by name resolves to its uid and keeps the name.
    #[test]
    fn test_resolve_configured_user_root_name() {
        let (uid, _, username) = expect_switch(resolve_configured_user("root").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(
            username.as_deref().and_then(|name| name.to_str().ok()),
            Some("root")
        );
    }

    // A user given as a numeric uid resolves to the same account, name included.
    #[test]
    fn test_resolve_configured_user_root_uid() {
        let (uid, _, username) = expect_switch(resolve_configured_user("0").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(
            username.as_deref().and_then(|name| name.to_str().ok()),
            Some("root")
        );
    }

    // Unknown accounts are rejected with a "does not exist" message.
    #[test]
    fn test_resolve_configured_user_missing_user_fails() {
        let message = resolve_configured_user("pitchfork-user-that-should-not-exist")
            .unwrap_err()
            .to_string();
        assert!(message.contains("does not exist"));
    }

    // Switching to another user requires the supervisor to run as root.
    #[test]
    fn test_resolve_run_identity_requires_root_for_user_switch() {
        let message = resolve_run_identity(Some("root"), 501, 20, None, None)
            .unwrap_err()
            .to_string();
        assert!(message.contains("Restart the supervisor with sudo"));
    }

    // Configured user equals the current user: no switch, sudo hints unused.
    #[test]
    fn test_resolve_run_identity_same_user_is_noop() {
        let resolved = resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }
}
1682
1683/// Inject proxy-related environment variables into a daemon's command.
1684///
1685/// Adds:
1686/// - `HOST` — the address the daemon should bind to (`127.0.0.1`, omitted in LAN mode)
1687/// - `PITCHFORK_URL` — the public proxy URL for this daemon (if it has a slug)
1688/// - `NODE_EXTRA_CA_CERTS` — path to the pitchfork CA cert (if HTTPS enabled)
1689/// - `__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS` — `.<tld>` for Vite host allowlisting
1690/// - `PITCHFORK_LAN` — set to `"1"` when LAN mode is active
1691fn inject_proxy_env(cmd: &mut tokio::process::Command, slug: &Option<String>) {
1692    let s = crate::settings::settings();
1693    let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1694
1695    if should_force_loopback_host(slug) && !lan_enabled {
1696        // Only force loopback binding for daemons that are actually routed via a slug.
1697        // In LAN mode, daemons need to bind to 0.0.0.0 to be reachable from the network.
1698        cmd.env("HOST", "127.0.0.1");
1699    }
1700
1701    // PITCHFORK_URL: the daemon's public proxy URL (only if it has a slug and proxy is enabled)
1702    if let Some(url) = build_pitchfork_url(slug, &s) {
1703        cmd.env("PITCHFORK_URL", &url);
1704    }
1705
1706    // NODE_EXTRA_CA_CERTS: let Node.js backends trust the pitchfork CA
1707    if s.proxy.enable && s.proxy.https {
1708        let ca_path = if s.proxy.tls_cert.is_empty() {
1709            crate::env::PITCHFORK_STATE_DIR.join("proxy").join("ca.pem")
1710        } else {
1711            std::path::PathBuf::from(&s.proxy.tls_cert)
1712        };
1713        if ca_path.exists() {
1714            cmd.env("NODE_EXTRA_CA_CERTS", ca_path.to_string_lossy().to_string());
1715        }
1716    }
1717
1718    // __VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS: Vite host allowlisting
1719    if s.proxy.enable {
1720        let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1721        cmd.env("__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS", format!(".{tld}"));
1722    }
1723
1724    // PITCHFORK_LAN: signal to daemons that LAN mode is active
1725    if lan_enabled {
1726        cmd.env("PITCHFORK_LAN", "1");
1727    }
1728}
1729
1730fn should_force_loopback_host(slug: &Option<String>) -> bool {
1731    let Some(slug) = slug.as_deref() else {
1732        return false;
1733    };
1734
1735    let s = crate::settings::settings();
1736    if !s.proxy.enable {
1737        return false;
1738    }
1739
1740    let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1741    slugs.contains_key(slug)
1742}
1743
1744/// Compute the public proxy URL for a daemon.
1745///
1746/// Returns `None` if the daemon has no slug or the proxy is not enabled.
1747fn build_pitchfork_url(slug: &Option<String>, s: &crate::settings::Settings) -> Option<String> {
1748    let slug = slug.as_ref()?;
1749    if !s.proxy.enable {
1750        return None;
1751    }
1752    let scheme = if s.proxy.https { "https" } else { "http" };
1753    let port = u16::try_from(s.proxy.port).ok().filter(|&p| p > 0)?;
1754    let port_suffix = if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
1755        String::new()
1756    } else {
1757        format!(":{port}")
1758    };
1759    let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1760    let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1761    Some(format!("{scheme}://{slug}.{tld}{port_suffix}",))
1762}