Skip to main content

pitchfork_cli/supervisor/
lifecycle.rs

1//! Daemon lifecycle management - start/stop operations
2//!
3//! Contains the core `run()`, `run_once()`, and `stop()` methods for daemon process management.
4
5use super::hooks::{self, HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::IntoDiagnostic;
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22#[cfg(unix)]
23use std::ffi::CString;
24use std::iter::once;
25use std::sync::atomic;
26use std::time::Duration;
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
28use tokio::select;
29use tokio::sync::oneshot;
30use tokio::time;
31
32/// Cache for compiled regex patterns to avoid recompilation on daemon restarts
33static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
34    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
35
36#[cfg(unix)]
37#[derive(Clone, Debug, PartialEq, Eq)]
38enum RunIdentity {
39    Inherit,
40    Switch {
41        uid: nix::unistd::Uid,
42        gid: nix::unistd::Gid,
43        username: Option<CString>,
44    },
45}
46
47/// Get or compile a regex pattern, caching the result for future use
48pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
49    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
50    if let Some(re) = cache.get(pattern) {
51        return Some(re.clone());
52    }
53    match Regex::new(pattern) {
54        Ok(re) => {
55            cache.insert(pattern.to_string(), re.clone());
56            Some(re)
57        }
58        Err(e) => {
59            error!("invalid regex pattern '{pattern}': {e}");
60            None
61        }
62    }
63}
64
65impl Supervisor {
66    /// Run a daemon, handling retries if configured
67    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
68        let id = &opts.id;
69        let cmd = opts.cmd.clone();
70
71        // Clear any pending autostop for this daemon since it's being started
72        {
73            let mut pending = self.pending_autostops.lock().await;
74            if pending.remove(id).is_some() {
75                info!("cleared pending autostop for {id} (daemon starting)");
76            }
77        }
78
79        let daemon = self.get_daemon(id).await;
80        if let Some(daemon) = daemon {
81            // Stopping state is treated as "not running" - the monitoring task will clean it up
82            // Only check for Running state with a valid PID
83            if !daemon.status.is_stopping()
84                && !daemon.status.is_stopped()
85                && let Some(pid) = daemon.pid
86            {
87                if opts.force {
88                    self.stop(id).await?;
89                    info!("run: stop completed for daemon {id}");
90                } else {
91                    warn!("daemon {id} already running with pid {pid}");
92                    return Ok(IpcResponse::DaemonAlreadyRunning);
93                }
94            }
95        }
96
97        // If wait_ready is true and retry is configured, implement retry loop
98        if opts.wait_ready && opts.retry.count() > 0 {
99            // Use saturating_add to avoid overflow when retry = u32::MAX (infinite)
100            let max_attempts = opts.retry.count().saturating_add(1);
101            for attempt in 0..max_attempts {
102                let mut retry_opts = opts.clone();
103                retry_opts.retry_count = attempt;
104                retry_opts.cmd = cmd.clone();
105
106                let result = self.run_once(retry_opts).await?;
107
108                match result {
109                    IpcResponse::DaemonReady { daemon } => {
110                        return Ok(IpcResponse::DaemonReady { daemon });
111                    }
112                    IpcResponse::DaemonFailedWithCode { exit_code } => {
113                        if attempt < opts.retry.count() {
114                            let backoff_secs = 2u64.saturating_pow(attempt).min(3600);
115                            info!(
116                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
117                                attempt + 1,
118                                max_attempts,
119                                backoff_secs
120                            );
121                            fire_hook(
122                                HookType::OnRetry,
123                                id.clone(),
124                                opts.dir.0.clone(),
125                                attempt + 1,
126                                opts.env.clone(),
127                                vec![],
128                            )
129                            .await;
130                            time::sleep(Duration::from_secs(backoff_secs)).await;
131                            continue;
132                        } else {
133                            info!("daemon {id} failed after {max_attempts} attempts");
134                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
135                        }
136                    }
137                    other => return Ok(other),
138                }
139            }
140        }
141
142        // No retry or wait_ready is false
143        self.run_once(opts).await
144    }
145
146    /// Run a daemon once (single attempt)
147    pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
148        let id = &opts.id;
149        let original_cmd = opts.cmd.clone(); // Save original command for persistence
150        let cmd = opts.cmd;
151
152        // Create channel for readiness notification if wait_ready is true
153        let (ready_tx, ready_rx) = if opts.wait_ready {
154            let (tx, rx) = oneshot::channel();
155            (Some(tx), Some(rx))
156        } else {
157            (None, None)
158        };
159
160        // Check port availability and apply auto-bump if configured
161        let expected_ports = opts
162            .port
163            .as_ref()
164            .map(|p| p.expect.clone())
165            .unwrap_or_default();
166        let (resolved_ports, effective_ready_port) = if !expected_ports.is_empty() {
167            let port_cfg = opts.port.as_ref().unwrap();
168            match check_ports_available(
169                &expected_ports,
170                port_cfg.auto_bump(),
171                port_cfg.max_bump_attempts(),
172            )
173            .await
174            {
175                Ok(resolved) => {
176                    let ready_port = if let Some(configured_port) = opts.ready_port {
177                        // If ready_port matches one of the expected ports, apply the same bump offset
178                        let bump_offset = resolved
179                            .first()
180                            .unwrap_or(&0)
181                            .saturating_sub(*expected_ports.first().unwrap_or(&0));
182                        if expected_ports.contains(&configured_port) && bump_offset > 0 {
183                            configured_port
184                                .checked_add(bump_offset)
185                                .or(Some(configured_port))
186                        } else {
187                            Some(configured_port)
188                        }
189                    } else if opts.ready_output.is_none()
190                        && opts.ready_http.is_none()
191                        && opts.ready_cmd.is_none()
192                        && opts.ready_delay.is_none()
193                    {
194                        // No other ready check configured — use the first expected port as a
195                        // TCP port readiness check so the daemon is considered ready once it
196                        // starts listening.  Skip port 0 (ephemeral port request).
197                        resolved.first().copied().filter(|&p| p != 0)
198                    } else {
199                        // Another ready check is configured (output/http/cmd/delay).
200                        // Don't add an implicit TCP port check — it could race and fire
201                        // before the daemon has produced any output.
202                        None
203                    };
204                    info!("daemon {id}: ports {expected_ports:?} resolved to {resolved:?}");
205                    (resolved, ready_port)
206                }
207                Err(e) => {
208                    error!("daemon {id}: port check failed: {e}");
209                    // Convert PortError to structured IPC response
210                    if let Some(port_error) = e.downcast_ref::<PortError>() {
211                        match port_error {
212                            PortError::InUse { port, process, pid } => {
213                                return Ok(IpcResponse::PortConflict {
214                                    port: *port,
215                                    process: process.clone(),
216                                    pid: *pid,
217                                });
218                            }
219                            PortError::NoAvailablePort {
220                                start_port,
221                                attempts,
222                            } => {
223                                return Ok(IpcResponse::NoAvailablePort {
224                                    start_port: *start_port,
225                                    attempts: *attempts,
226                                });
227                            }
228                        }
229                    }
230                    return Ok(IpcResponse::DaemonFailed {
231                        error: e.to_string(),
232                    });
233                }
234            }
235        } else {
236            // When ready_port is set without expected_port, check that the port
237            // is not already occupied.  If another process is listening on it,
238            // the TCP readiness probe would immediately succeed and pitchfork
239            // would falsely consider the daemon ready — routing proxy traffic to
240            // the wrong process.
241            if let Some(port) = opts.ready_port {
242                if port > 0 {
243                    if let Some((pid, process)) = detect_port_conflict(port).await {
244                        return Ok(IpcResponse::PortConflict { port, process, pid });
245                    }
246                }
247            }
248            (Vec::new(), opts.ready_port)
249        };
250
251        let cmd: Vec<String> = if opts.mise.unwrap_or(settings().general.mise) {
252            match settings().resolve_mise_bin() {
253                Some(mise_bin) => {
254                    let mise_bin_str = mise_bin.to_string_lossy().to_string();
255                    info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
256                    once("exec".to_string())
257                        .chain(once(mise_bin_str))
258                        .chain(once("x".to_string()))
259                        .chain(once("--".to_string()))
260                        .chain(cmd)
261                        .collect_vec()
262                }
263                None => {
264                    warn!("daemon {id}: mise=true but mise binary not found, running without mise");
265                    once("exec".to_string()).chain(cmd).collect_vec()
266                }
267            }
268        } else {
269            once("exec".to_string()).chain(cmd).collect_vec()
270        };
271        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
272        let log_path = id.log_path();
273        if let Some(parent) = log_path.parent() {
274            xx::file::mkdirp(parent)?;
275        }
276        #[cfg(unix)]
277        let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
278            Ok(identity) => identity,
279            Err(e) => {
280                return Ok(IpcResponse::DaemonFailed {
281                    error: e.to_string(),
282                });
283            }
284        };
285        info!("run: spawning daemon {id} with args: {args:?}");
286
287        // Allocate PTY if configured
288        #[cfg(unix)]
289        let pty_pair = if opts.pty.unwrap_or(false) {
290            match super::pty::openpty() {
291                Ok(pair) => {
292                    info!("daemon {id}: allocated PTY (pty = true)");
293                    Some(pair)
294                }
295                Err(e) => {
296                    warn!("daemon {id}: failed to allocate PTY, falling back to pipes: {e}");
297                    None
298                }
299            }
300        } else {
301            None
302        };
303
304        let mut cmd = tokio::process::Command::new("sh");
305
306        #[cfg(unix)]
307        if let Some(ref pair) = pty_pair {
308            // PTY mode: connect both stdout and stderr to the slave PTY.
309            // The child uses the slave for stdin/stdout/stderr, and we read
310            // output from the master.
311            let slave_file = std::fs::File::from(
312                pair.slave
313                    .try_clone()
314                    .map_err(|e| miette::miette!("failed to dup slave PTY fd: {e}"))?,
315            );
316            cmd.stdin(std::process::Stdio::from(slave_file.try_clone().map_err(
317                |e| miette::miette!("failed to clone slave PTY fd for stdin: {e}"),
318            )?));
319            cmd.stdout(std::process::Stdio::from(slave_file.try_clone().map_err(
320                |e| miette::miette!("failed to clone slave PTY fd for stdout: {e}"),
321            )?));
322            cmd.stderr(std::process::Stdio::from(slave_file));
323        } else {
324            cmd.stdout(std::process::Stdio::piped())
325                .stderr(std::process::Stdio::piped());
326        }
327
328        #[cfg(not(unix))]
329        {
330            cmd.stdout(std::process::Stdio::piped())
331                .stderr(std::process::Stdio::piped());
332        }
333
334        cmd.args(&args).current_dir(&opts.dir);
335
336        #[cfg(unix)]
337        if pty_pair.is_none() {
338            cmd.stdin(std::process::Stdio::null());
339        }
340
341        #[cfg(not(unix))]
342        cmd.stdin(std::process::Stdio::null());
343
344        // Ensure daemon can find user tools by using the original PATH
345        if let Some(ref path) = *env::ORIGINAL_PATH {
346            cmd.env("PATH", path);
347        }
348
349        // Apply custom environment variables from config
350        if let Some(ref env_vars) = opts.env {
351            cmd.envs(env_vars);
352        }
353
354        // Inject pitchfork metadata env vars AFTER user env so they can't be overwritten
355        cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
356        cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
357        cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
358
359        // Inject the resolved ports for the daemon to use
360        if !resolved_ports.is_empty() {
361            // Set PORT to the first port for backward compatibility
362            // When there's only one port, both PORT and PORT0 will be set to the same value.
363            // This follows the convention used by many deployment platforms (Heroku, etc.).
364            cmd.env("PORT", resolved_ports[0].to_string());
365            // Set individual ports as PORT0, PORT1, etc.
366            for (i, port) in resolved_ports.iter().enumerate() {
367                cmd.env(format!("PORT{i}"), port.to_string());
368            }
369        }
370
371        // Inject proxy-related environment variables
372        inject_proxy_env(&mut cmd, &opts.slug);
373
374        #[cfg(unix)]
375        {
376            let run_identity = run_identity.clone();
377            let use_pty = pty_pair.is_some();
378            unsafe {
379                cmd.pre_exec(move || {
380                    nix::unistd::setsid().map_err(nix_to_io_error)?;
381
382                    // When using a PTY, set the slave as the controlling terminal.
383                    // The slave FD has already been dup'd onto stdin/stdout/stderr
384                    // by tokio, so we can use stdin (fd 0) for TIOCSCTTY.
385                    if use_pty {
386                        let ret = libc::ioctl(0, libc::TIOCSCTTY as libc::c_ulong, 0);
387                        if ret < 0 {
388                            // Non-fatal: the process can still run without
389                            // a controlling terminal.
390                            #[cfg(target_os = "linux")]
391                            eprintln!(
392                                "pitchfork: TIOCSCTTY failed: {}",
393                                std::io::Error::last_os_error()
394                            );
395                        }
396                    }
397
398                    apply_run_identity(&run_identity)?;
399                    Ok(())
400                });
401            }
402        }
403
404        let mut child = cmd.spawn().into_diagnostic()?;
405        let pid = match child.id() {
406            Some(p) => p,
407            None => {
408                warn!("Daemon {id} exited before PID could be captured");
409                return Ok(IpcResponse::DaemonFailed {
410                    error: "Process exited immediately".to_string(),
411                });
412            }
413        };
414        info!("started daemon {id} with pid {pid}");
415        PROCS.refresh_pids(&[pid]);
416        let daemon = self
417            .upsert_daemon(
418                UpsertDaemonOpts::builder(id.clone())
419                    .set(|o| {
420                        o.pid = Some(pid);
421                        o.status = DaemonStatus::Running;
422                        o.shell_pid = opts.shell_pid;
423                        o.dir = Some(opts.dir.0.clone());
424                        o.cmd = Some(original_cmd);
425                        o.autostop = opts.autostop;
426                        o.cron_schedule = opts.cron_schedule.clone();
427                        o.cron_retrigger = opts.cron_retrigger;
428                        o.retry = Some(opts.retry);
429                        o.retry_count = Some(opts.retry_count);
430                        o.ready_delay = opts.ready_delay;
431                        o.ready_output = opts.ready_output.clone();
432                        o.ready_http = opts.ready_http.clone();
433                        o.ready_port = effective_ready_port;
434                        o.ready_cmd = opts.ready_cmd.clone();
435                        o.port = crate::config_types::PortConfig::from_parts(
436                            expected_ports,
437                            opts.port.as_ref().map(|p| p.bump).unwrap_or_default(),
438                        );
439                        o.resolved_port = resolved_ports;
440                        o.depends = Some(opts.depends.clone());
441                        o.env = opts.env.clone();
442                        o.watch = Some(opts.watch.clone());
443                        o.watch_mode = Some(opts.watch_mode);
444                        o.watch_base_dir = opts.watch_base_dir.clone();
445                        o.mise = opts.mise;
446                        o.user = opts.user.clone();
447                        o.memory_limit = opts.memory_limit;
448                        o.cpu_limit = opts.cpu_limit;
449                        o.stop_signal = opts.stop_signal;
450                        o.pty = opts.pty;
451                    })
452                    .build(),
453            )
454            .await?;
455
456        let id_clone = id.clone();
457        let ready_delay = opts.ready_delay;
458        let ready_output = opts.ready_output.clone();
459        let ready_http = opts.ready_http.clone();
460        let ready_port = effective_ready_port;
461        let ready_cmd = opts.ready_cmd.clone();
462        let daemon_dir = opts.dir.0.clone();
463        let hook_retry_count = opts.retry_count;
464        let hook_retry = opts.retry;
465        let hook_daemon_env = opts.env.clone();
466        let on_output_hook = opts.on_output_hook.clone();
467        // Whether this daemon has any port-related config — used to skip the
468        // active_port detection task for daemons that never bind a port (e.g. `sleep 60`).
469        // When the proxy is enabled, only detect active_port for daemons that are
470        // actually referenced by a registered slug, rather than blanket-polling every
471        // daemon (which wastes ~7.5 s of listeners::get_all() calls per port-less daemon).
472        let has_port_config = opts.port.as_ref().is_some_and(|p| !p.expect.is_empty())
473            || (settings().proxy.enable && is_daemon_slug_target(id));
474        let daemon_pid = pid;
475
476        // Prepare output readers before spawning the monitoring task.
477        // In PTY mode, we read from the PTY master FD.
478        // In pipe mode, we read from separate stdout/stderr pipes.
479        #[cfg(unix)]
480        let pty_reader = pty_pair.map(|p| {
481            tokio::io::BufReader::new(tokio::fs::File::from_std(std::fs::File::from(p.master)))
482                .lines()
483        });
484        #[cfg(not(unix))]
485        let pty_reader: Option<tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>> = None;
486        let stdout_reader = if pty_reader.is_none() {
487            child
488                .stdout
489                .take()
490                .map(|s| tokio::io::BufReader::new(s).lines())
491        } else {
492            None
493        };
494        let stderr_reader = if pty_reader.is_none() {
495            child
496                .stderr
497                .take()
498                .map(|s| tokio::io::BufReader::new(s).lines())
499        } else {
500            None
501        };
502
503        if pty_reader.is_none() && (stdout_reader.is_none() || stderr_reader.is_none()) {
504            error!("Failed to capture stdout/stderr for daemon {id}");
505        }
506
507        tokio::spawn(async move {
508            let id = id_clone;
509
510            // Merge all output sources (PTY master OR stdout+stderr) into a single channel.
511            let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<String>(256);
512
513            if let Some(mut reader) = pty_reader {
514                // PTY mode: single merged stream from the master.
515                // output_tx is moved into the spawn; when the reader ends the
516                // channel closes automatically.
517                tokio::spawn(async move {
518                    while let Ok(Some(mut line)) = reader.next_line().await {
519                        // PTY slave uses ONLCR: \n → \r\n; strip the trailing \r.
520                        if line.ends_with('\r') {
521                            line.pop();
522                        }
523                        if output_tx.send(line).await.is_err() {
524                            break;
525                        }
526                    }
527                });
528            } else {
529                // Pipe mode: stdout and stderr are merged into the same channel.
530                // Both `ready_output` and `on_output_hook` patterns match against
531                // lines from either stream, which is the expected behavior (a
532                // "server ready" message may appear on stderr in some tools).
533                if let Some(mut stdout) = stdout_reader {
534                    let tx = output_tx.clone();
535                    tokio::spawn(async move {
536                        while let Ok(Some(line)) = stdout.next_line().await {
537                            if tx.send(line).await.is_err() {
538                                break;
539                            }
540                        }
541                    });
542                }
543                if let Some(mut stderr) = stderr_reader {
544                    let tx = output_tx.clone();
545                    tokio::spawn(async move {
546                        while let Ok(Some(line)) = stderr.next_line().await {
547                            if tx.send(line).await.is_err() {
548                                break;
549                            }
550                        }
551                    });
552                }
553                // Drop the last sender so the channel closes when all readers finish.
554                drop(output_tx);
555            }
556            let log_file = match tokio::fs::File::options()
557                .append(true)
558                .create(true)
559                .open(&log_path)
560                .await
561            {
562                Ok(f) => f,
563                Err(e) => {
564                    error!("Failed to open log file for daemon {id}: {e}");
565                    return;
566                }
567            };
568            let mut log_appender = BufWriter::new(log_file);
569
570            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
571            let format_line = |line: String| {
572                let line_for_log = line;
573                if line_for_log.starts_with(&format!("{id} ")) {
574                    // mise tasks often already have the id printed
575                    format!("{} {line_for_log}\n", now())
576                } else {
577                    format!("{} {id} {line_for_log}\n", now())
578                }
579            };
580
581            // Setup readiness checking
582            let mut ready_notified = false;
583            let mut ready_tx = ready_tx;
584            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
585            // Track whether we've already spawned the active_port detection task
586            let mut active_port_spawned = false;
587
588            // Validate on_output config early; discard the hook on any error so
589            // a bad regex does not silently fall through to the (None, None) => true
590            // match arm and fire on every line.
591            let on_output_hook = match on_output_hook {
592                Some(ref hook) => match hook.validate(id.name()) {
593                    Ok(()) => on_output_hook,
594                    Err(e) => {
595                        error!("{e}");
596                        None
597                    }
598                },
599                None => None,
600            };
601
602            // Compile the regex pattern after validation so we only attempt this
603            // when the hook is known-good (validate() already checked the syntax).
604            let on_output_pattern: Option<regex::Regex> = on_output_hook
605                .as_ref()
606                .and_then(|h| h.regex.as_deref().and_then(get_or_compile_regex));
607            let on_output_debounce = on_output_hook
608                .as_ref()
609                .map(|h| h.debounce_duration())
610                .unwrap_or(Duration::from_millis(1000));
611            // Last time the on_output hook fired; None means it has never fired.
612            let mut on_output_last_fired: Option<std::time::Instant> = None;
613
614            let mut delay_timer =
615                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
616
617            // Get settings for intervals
618            let s = settings();
619            let ready_check_interval = s.supervisor_ready_check_interval();
620            let http_client_timeout = s.supervisor_http_client_timeout();
621            let log_flush_interval_duration = s.supervisor_log_flush_interval();
622
623            // Setup HTTP readiness check interval
624            let mut http_check_interval = ready_http
625                .as_ref()
626                .map(|_| tokio::time::interval(ready_check_interval));
627            let http_client = ready_http.as_ref().map(|_| {
628                reqwest::Client::builder()
629                    .timeout(http_client_timeout)
630                    .build()
631                    .unwrap_or_default()
632            });
633
634            // Setup TCP port readiness check interval
635            let mut port_check_interval =
636                ready_port.map(|_| tokio::time::interval(ready_check_interval));
637
638            // Setup command readiness check interval
639            let mut cmd_check_interval = ready_cmd
640                .as_ref()
641                .map(|_| tokio::time::interval(ready_check_interval));
642
643            // Setup periodic log flush interval
644            let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);
645
646            // Use a channel to communicate process exit status
647            let (exit_tx, mut exit_rx) =
648                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
649
650            // Spawn a task to wait for process exit
651            let child_pid = child.id().unwrap_or(0);
652            tokio::spawn(async move {
653                let result = child.wait().await;
654                // On non-Linux Unix (e.g. macOS) the zombie reaper may win the
655                // race and consume the exit status via waitpid(None, WNOHANG)
656                // before Tokio's child.wait() gets to it. When that happens,
657                // Tokio returns an ECHILD io::Error. We recover by checking
658                // REAPED_STATUSES for the stashed exit code.
659                //
660                // On Linux this is unnecessary because the reaper uses
661                // waitid(WNOWAIT) to peek before reaping, which avoids the
662                // race entirely.
663                #[cfg(all(unix, not(target_os = "linux")))]
664                let result = match &result {
665                    Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
666                        if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
667                            warn!(
668                                "daemon pid {child_pid} wait() got ECHILD; \
669                                 recovered exit code {code} from zombie reaper"
670                            );
671                            // Synthesize an ExitStatus from the stashed code.
672                            // On Unix we can use `ExitStatus::from_raw()` with
673                            // a wait-style status word (code << 8 for normal
674                            // exit, or raw signal number for signal death).
675                            use std::os::unix::process::ExitStatusExt;
676                            if code >= 0 {
677                                Ok(std::process::ExitStatus::from_raw(code << 8))
678                            } else {
679                                // Negative code means killed by signal (-sig)
680                                Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
681                            }
682                        } else {
683                            warn!(
684                                "daemon pid {child_pid} wait() got ECHILD but no \
685                                 stashed status found; reporting as error"
686                            );
687                            result
688                        }
689                    }
690                    _ => result,
691                };
692                debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
693                let _ = exit_tx.send(result).await;
694            });
695
696            #[allow(unused_assignments)]
697            // Initial None is a safety net; loop only exits via exit_rx.recv() which sets it
698            let mut exit_status = None;
699
700            // If there is no ready check of any kind and no delay, the daemon is
701            // considered immediately ready and the active_port detection task would
702            // never be triggered inside the select loop.  Kick it off right away so
703            // that daemons without any readiness configuration still get their
704            // active_port populated (needed for proxy routing).
705            if has_port_config
706                && ready_pattern.is_none()
707                && ready_http.is_none()
708                && ready_port.is_none()
709                && ready_cmd.is_none()
710                && delay_timer.is_none()
711            {
712                active_port_spawned = true;
713                detect_and_store_active_port(id.clone(), daemon_pid);
714            }
715
716            loop {
717                select! {
718                    Some(line) = output_rx.recv() => {
719                        let formatted = format_line(line.clone());
720                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
721                            error!("Failed to write to log for daemon {id}: {e}");
722                        }
723                        trace!("output: {id} {formatted}");
724
725                        // Strip ANSI for pattern matching so user-written patterns
726                        // work regardless of whether the process emits color codes.
727                        let line_clean = console::strip_ansi_codes(&line).to_string();
728
729                        // Check if output matches ready pattern
730                        if !ready_notified
731                            && let Some(ref pattern) = ready_pattern
732                            && pattern.is_match(&line_clean)
733                        {
734                            info!("daemon {id} ready: output matched pattern");
735                            ready_notified = true;
736                            let _ = log_appender.flush().await;
737                            if let Some(tx) = ready_tx.take() {
738                                let _ = tx.send(Ok(()));
739                            }
740                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
741                            if !active_port_spawned && has_port_config {
742                                active_port_spawned = true;
743                                detect_and_store_active_port(id.clone(), daemon_pid);
744                            }
745                        }
746
747                        // Check on_output hook
748                        if let Some(ref hook) = on_output_hook {
749                            let matched = match (&hook.filter, &on_output_pattern) {
750                                (Some(substr), _) => line_clean.contains(substr.as_str()),
751                                (None, Some(re)) => re.is_match(&line_clean),
752                                (None, None) => true,
753                            };
754                            if matched {
755                                let now = std::time::Instant::now();
756                                let elapsed = on_output_last_fired.map(|t| now.duration_since(t));
757                                if elapsed.is_none_or(|e| e >= on_output_debounce) {
758                                    on_output_last_fired = Some(now);
759                                    hooks::fire_output_hook(id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), hook.run.clone(), line_clean.clone()).await;
760                                }
761                            }
762                        }
763                    }
764                    Some(result) = exit_rx.recv() => {
765                        // Process exited - save exit status and notify if not ready yet
766                        exit_status = Some(result);
767                        debug!("daemon {id} process exited, exit_status: {exit_status:?}");
768                        // Flush logs before notifying so clients see logs immediately
769                        let _ = log_appender.flush().await;
770                        if !ready_notified {
771                            if let Some(tx) = ready_tx.take() {
772                                // Check if process exited successfully
773                                let is_success = exit_status.as_ref()
774                                    .and_then(|r| r.as_ref().ok())
775                                    .map(|s| s.success())
776                                    .unwrap_or(false);
777
778                                if is_success {
779                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
780                                    let _ = tx.send(Ok(()));
781                                } else {
782                                    let exit_code = exit_status.as_ref()
783                                        .and_then(|r| r.as_ref().ok())
784                                        .and_then(|s| s.code());
785                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
786                                    let _ = tx.send(Err(exit_code));
787                                }
788                            }
789                        } else {
790                            debug!("daemon {id} was already marked ready, not sending notification");
791                        }
792                        break;
793                    },
794                    _ = async {
795                        if let Some(ref mut interval) = http_check_interval {
796                            interval.tick().await;
797                        } else {
798                            std::future::pending::<()>().await;
799                        }
800                    }, if !ready_notified && ready_http.is_some() => {
801                        if let (Some(http), Some(client)) = (&ready_http, &http_client) {
802                            match client.get(&http.url).send().await {
803                                Ok(response) if http.accepts_status(response.status().as_u16()) => {
804                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
805                                    ready_notified = true;
806                                    let _ = log_appender.flush().await;
807                                    if let Some(tx) = ready_tx.take() {
808                                        let _ = tx.send(Ok(()));
809                                    }
810                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
811                                    http_check_interval = None;
812                                    if !active_port_spawned && has_port_config {
813                                        active_port_spawned = true;
814                                        detect_and_store_active_port(id.clone(), daemon_pid);
815                                    }
816                                }
817                                Ok(response) => {
818                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
819                                }
820                                Err(e) => {
821                                    trace!("daemon {id} HTTP check failed: {e}");
822                                }
823                            }
824                        }
825                    }
826                    _ = async {
827                        if let Some(ref mut interval) = port_check_interval {
828                            interval.tick().await;
829                        } else {
830                            std::future::pending::<()>().await;
831                        }
832                    }, if !ready_notified && ready_port.is_some() => {
833                        if let Some(port) = ready_port {
834                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
835                                Ok(_) => {
836                                    info!("daemon {id} ready: TCP port {port} is listening");
837                                    ready_notified = true;
838                                    // Flush logs before notifying so clients see logs immediately
839                                    let _ = log_appender.flush().await;
840                                    if let Some(tx) = ready_tx.take() {
841                                        let _ = tx.send(Ok(()));
842                                    }
843                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
844                                    // Stop checking once ready
845                                    port_check_interval = None;
846                                    if !active_port_spawned && has_port_config {
847                                        active_port_spawned = true;
848                                        detect_and_store_active_port(id.clone(), daemon_pid);
849                                    }
850                                }
851                                Err(_) => {
852                                    trace!("daemon {id} port check: port {port} not listening yet");
853                                }
854                            }
855                        }
856                    }
857                    _ = async {
858                        if let Some(ref mut interval) = cmd_check_interval {
859                            interval.tick().await;
860                        } else {
861                            std::future::pending::<()>().await;
862                        }
863                    }, if !ready_notified && ready_cmd.is_some() => {
864                        if let Some(ref cmd) = ready_cmd {
865                            // Run the readiness check command using the shell abstraction
866                            let mut command = Shell::default_for_platform().command(cmd);
867                            command
868                                .current_dir(&daemon_dir)
869                                .stdout(std::process::Stdio::null())
870                                .stderr(std::process::Stdio::null());
871                            let result: std::io::Result<std::process::ExitStatus> = command.status().await;
872                            match result {
873                                Ok(status) if status.success() => {
874                                    info!("daemon {id} ready: readiness command succeeded");
875                                    ready_notified = true;
876                                    let _ = log_appender.flush().await;
877                                    if let Some(tx) = ready_tx.take() {
878                                        let _ = tx.send(Ok(()));
879                                    }
880                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
881                                    // Stop checking once ready
882                                    cmd_check_interval = None;
883                                    if !active_port_spawned && has_port_config {
884                                        active_port_spawned = true;
885                                        detect_and_store_active_port(id.clone(), daemon_pid);
886                                    }
887                                }
888                                Ok(_) => {
889                                    trace!("daemon {id} cmd check: command returned non-zero (not ready)");
890                                }
891                                Err(e) => {
892                                    trace!("daemon {id} cmd check failed: {e}");
893                                }
894                            }
895                        }
896                    }
897                    _ = async {
898                        if let Some(ref mut timer) = delay_timer {
899                            timer.await;
900                        } else {
901                            std::future::pending::<()>().await;
902                        }
903                    } => {
904                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
905                            info!("daemon {id} ready: delay elapsed");
906                            ready_notified = true;
907                            // Flush logs before notifying so clients see logs immediately
908                            let _ = log_appender.flush().await;
909                            if let Some(tx) = ready_tx.take() {
910                                let _ = tx.send(Ok(()));
911                            }
912                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
913                        }
914                        // Disable timer after it fires
915                        delay_timer = None;
916                        if !active_port_spawned && has_port_config {
917                            active_port_spawned = true;
918                            detect_and_store_active_port(id.clone(), daemon_pid);
919                        }
920                    }
921                    _ = log_flush_interval.tick() => {
922                        // Periodic flush to ensure logs are written to disk
923                        if let Err(e) = log_appender.flush().await {
924                            error!("Failed to flush log for daemon {id}: {e}");
925                        }
926                    }
927                }
928            }
929
930            // Final flush to ensure all buffered logs are written
931            if let Err(e) = log_appender.flush().await {
932                error!("Failed to final flush log for daemon {id}: {e}");
933            }
934
935            // Clear active_port since the process is no longer running
936            {
937                let mut state_file = SUPERVISOR.state_file.lock().await;
938                state_file.clear_active_port(&id);
939            }
940
941            // Get the final exit status
942            let exit_status = if let Some(status) = exit_status {
943                status
944            } else {
945                // Streams closed but process hasn't exited yet, wait for it
946                match exit_rx.recv().await {
947                    Some(status) => status,
948                    None => {
949                        warn!("daemon {id} exit channel closed without receiving status");
950                        Err(std::io::Error::other("exit channel closed"))
951                    }
952                }
953            };
954            let current_daemon = SUPERVISOR.get_daemon(&id).await;
955
956            // Signal that this monitoring task is processing its exit path.
957            // The RAII guard will decrement the counter and notify close()
958            // when the task finishes (including all fire_hook registrations),
959            // regardless of which return path is taken.
960            SUPERVISOR
961                .active_monitors
962                .fetch_add(1, atomic::Ordering::Release);
963            struct MonitorGuard;
964            impl Drop for MonitorGuard {
965                fn drop(&mut self) {
966                    SUPERVISOR
967                        .active_monitors
968                        .fetch_sub(1, atomic::Ordering::Release);
969                    SUPERVISOR.monitor_done.notify_waiters();
970                }
971            }
972            let _monitor_guard = MonitorGuard;
973            // Check if this monitoring task is for the current daemon process.
974            // Allow Stopped/Stopping daemons through: stop() clears pid atomically,
975            // so d.pid != Some(pid) would be true, but we still need the is_stopped()
976            // branch below to fire on_stop/on_exit hooks.
977            if current_daemon.is_none()
978                || current_daemon.as_ref().is_some_and(|d| {
979                    d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
980                })
981            {
982                // Another process has taken over, don't update status
983                return;
984            }
985            // Capture the intentional-stop flag BEFORE any state changes.
986            // stop() transitions Stopping → Stopped and clears pid. If stop() wins the race
987            // and sets Stopped before this task runs, we still need to fire on_stop/on_exit.
988            // Treat both Stopping and Stopped as "intentional stop by pitchfork".
989            let already_stopped = current_daemon
990                .as_ref()
991                .is_some_and(|d| d.status.is_stopped());
992            let is_stopping = already_stopped
993                || current_daemon
994                    .as_ref()
995                    .is_some_and(|d| d.status.is_stopping());
996
997            // --- Phase 1: Determine exit_code, exit_reason, and update daemon state ---
998            let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
999                (Ok(status), true) => {
1000                    // Intentional stop (by pitchfork). status.code() returns None
1001                    // on Unix when killed by signal (e.g. SIGTERM); use -1 to
1002                    // distinguish from a clean exit code 0.
1003                    (status.code().unwrap_or(-1), "stop")
1004                }
1005                (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
1006                (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
1007                (Err(_), true) => {
1008                    // child.wait() error while stopping (e.g. sysinfo reaped the process)
1009                    (-1, "stop")
1010                }
1011                (Err(_), false) => (-1, "fail"),
1012            };
1013
1014            // Update daemon state unless stop() already did it (won the race).
1015            if !already_stopped {
1016                if let Ok(status) = &exit_status {
1017                    info!("daemon {id} exited with status {status}");
1018                }
1019                let (new_status, last_exit_success) = match exit_reason {
1020                    "stop" | "exit" => (
1021                        DaemonStatus::Stopped,
1022                        exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
1023                    ),
1024                    _ => (DaemonStatus::Errored(exit_code), false),
1025                };
1026                if let Err(e) = SUPERVISOR
1027                    .upsert_daemon(
1028                        UpsertDaemonOpts::builder(id.clone())
1029                            .set(|o| {
1030                                o.pid = None;
1031                                o.status = new_status;
1032                                o.last_exit_success = Some(last_exit_success);
1033                            })
1034                            .build(),
1035                    )
1036                    .await
1037                {
1038                    error!("Failed to update daemon state for {id}: {e}");
1039                }
1040            }
1041
1042            // --- Phase 2: Fire hooks ---
1043            let hook_extra_env = vec![
1044                ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
1045                ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
1046            ];
1047
1048            // Determine which hooks to fire based on exit reason
1049            let hooks_to_fire: Vec<HookType> = match exit_reason {
1050                "stop" => vec![HookType::OnStop, HookType::OnExit],
1051                "exit" => vec![HookType::OnExit],
1052                // "fail": fire on_fail + on_exit only when retries are exhausted
1053                _ if hook_retry_count >= hook_retry.count() => {
1054                    vec![HookType::OnFail, HookType::OnExit]
1055                }
1056                _ => vec![],
1057            };
1058
1059            for hook_type in hooks_to_fire {
1060                fire_hook(
1061                    hook_type,
1062                    id.clone(),
1063                    daemon_dir.clone(),
1064                    hook_retry_count,
1065                    hook_daemon_env.clone(),
1066                    hook_extra_env.clone(),
1067                )
1068                .await;
1069            }
1070        });
1071
1072        // If wait_ready is true, wait for readiness notification
1073        if let Some(ready_rx) = ready_rx {
1074            match ready_rx.await {
1075                Ok(Ok(())) => {
1076                    info!("daemon {id} is ready");
1077                    Ok(IpcResponse::DaemonReady { daemon })
1078                }
1079                Ok(Err(exit_code)) => {
1080                    error!("daemon {id} failed before becoming ready");
1081                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
1082                }
1083                Err(_) => {
1084                    error!("readiness channel closed unexpectedly for daemon {id}");
1085                    Ok(IpcResponse::DaemonStart { daemon })
1086                }
1087            }
1088        } else {
1089            Ok(IpcResponse::DaemonStart { daemon })
1090        }
1091    }
1092
1093    /// Stop a running daemon
1094    pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
1095        let pitchfork_id = DaemonId::pitchfork();
1096        if *id == pitchfork_id {
1097            return Ok(IpcResponse::Error(
1098                "Cannot stop supervisor via stop command".into(),
1099            ));
1100        }
1101        info!("stopping daemon: {id}");
1102        if let Some(daemon) = self.get_daemon(id).await {
1103            trace!("daemon to stop: {daemon}");
1104            if let Some(pid) = daemon.pid {
1105                trace!("killing pid: {pid}");
1106                PROCS.refresh_pids(&[pid]);
1107                if PROCS.is_running(pid) {
1108                    // First set status to Stopping (preserve PID for monitoring task)
1109                    self.upsert_daemon(
1110                        UpsertDaemonOpts::builder(id.clone())
1111                            .set(|o| {
1112                                o.pid = Some(pid);
1113                                o.status = DaemonStatus::Stopping;
1114                            })
1115                            .build(),
1116                    )
1117                    .await?;
1118
1119                    // Kill the entire process group atomically (daemon PID == PGID
1120                    // because we called setsid() at spawn time)
1121                    let stop_cfg = daemon.stop_signal.unwrap_or_default();
1122                    let stop_signal: i32 = stop_cfg.signal.into();
1123                    if let Err(e) = PROCS
1124                        .kill_process_group_async(pid, stop_signal, stop_cfg.timeout)
1125                        .await
1126                    {
1127                        debug!("failed to kill pid {pid}: {e}");
1128                        // Check if the process is actually stopped despite the error
1129                        PROCS.refresh_processes();
1130                        if PROCS.is_running(pid) {
1131                            // Process still running after kill attempt - set back to Running
1132                            debug!("failed to stop pid {pid}: process still running after kill");
1133                            self.upsert_daemon(
1134                                UpsertDaemonOpts::builder(id.clone())
1135                                    .set(|o| {
1136                                        o.pid = Some(pid); // Preserve PID to avoid orphaning the process
1137                                        o.status = DaemonStatus::Running;
1138                                    })
1139                                    .build(),
1140                            )
1141                            .await?;
1142                            return Ok(IpcResponse::DaemonStopFailed {
1143                                error: format!(
1144                                    "process {pid} still running after kill attempt: {e}"
1145                                ),
1146                            });
1147                        }
1148                    }
1149
1150                    // Process successfully stopped
1151                    // Note: kill_async uses SIGTERM -> wait ~3s -> SIGKILL strategy,
1152                    // and also detects zombie processes, so by the time it returns,
1153                    // the process should be fully terminated.
1154                    self.upsert_daemon(
1155                        UpsertDaemonOpts::builder(id.clone())
1156                            .set(|o| {
1157                                o.pid = None;
1158                                o.status = DaemonStatus::Stopped;
1159                                o.last_exit_success = Some(true); // Manual stop is considered successful
1160                            })
1161                            .build(),
1162                    )
1163                    .await?;
1164                } else {
1165                    debug!("pid {pid} not running, process may have exited unexpectedly");
1166                    // Process already dead, directly mark as stopped
1167                    // Note that the cleanup logic is handled in monitor task
1168                    self.upsert_daemon(
1169                        UpsertDaemonOpts::builder(id.clone())
1170                            .set(|o| {
1171                                o.pid = None;
1172                                o.status = DaemonStatus::Stopped;
1173                            })
1174                            .build(),
1175                    )
1176                    .await?;
1177                    return Ok(IpcResponse::DaemonWasNotRunning);
1178                }
1179                Ok(IpcResponse::Ok)
1180            } else {
1181                debug!("daemon {id} not running");
1182                Ok(IpcResponse::DaemonNotRunning)
1183            }
1184        } else {
1185            debug!("daemon {id} not found");
1186            Ok(IpcResponse::DaemonNotFound)
1187        }
1188    }
1189}
1190
1191#[cfg(unix)]
1192fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1193    let settings_user = settings().supervisor.user.trim();
1194    let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1195    let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1196    let configured = daemon_user.or(settings_user);
1197    let current_uid = nix::unistd::Uid::effective().as_raw();
1198    let current_gid = nix::unistd::Gid::effective().as_raw();
1199    resolve_run_identity(
1200        configured,
1201        current_uid,
1202        current_gid,
1203        std::env::var("SUDO_UID").ok().as_deref(),
1204        std::env::var("SUDO_GID").ok().as_deref(),
1205    )
1206}
1207
1208#[cfg(unix)]
1209fn resolve_run_identity(
1210    configured: Option<&str>,
1211    current_uid: u32,
1212    current_gid: u32,
1213    sudo_uid: Option<&str>,
1214    sudo_gid: Option<&str>,
1215) -> Result<RunIdentity> {
1216    let current_uid = nix::unistd::Uid::from_raw(current_uid);
1217    let current_gid = nix::unistd::Gid::from_raw(current_gid);
1218    if let Some(user) = configured {
1219        let identity = resolve_configured_user(user)?;
1220        ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1221        if identity.matches(current_uid, current_gid) {
1222            return Ok(RunIdentity::Inherit);
1223        }
1224        return Ok(identity);
1225    }
1226
1227    if current_uid.is_root()
1228        && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1229    {
1230        return Ok(identity);
1231    }
1232
1233    Ok(RunIdentity::Inherit)
1234}
1235
1236#[cfg(unix)]
1237fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1238    if user.chars().all(|c| c.is_ascii_digit()) {
1239        let uid = user
1240            .parse::<u32>()
1241            .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1242        let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1243            .into_diagnostic()?
1244            .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1245        return run_identity_from_user_record(user_record);
1246    }
1247
1248    let user_record = nix::unistd::User::from_name(user)
1249        .into_diagnostic()?
1250        .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1251    run_identity_from_user_record(user_record)
1252}
1253
1254#[cfg(unix)]
1255fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1256    let username = CString::new(user.name)
1257        .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1258    Ok(RunIdentity::Switch {
1259        uid: user.uid,
1260        gid: user.gid,
1261        username: Some(username),
1262    })
1263}
1264
1265#[cfg(unix)]
1266fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1267    RunIdentity::Switch {
1268        uid: nix::unistd::Uid::from_raw(uid),
1269        gid: nix::unistd::Gid::from_raw(gid),
1270        username,
1271    }
1272}
1273
1274#[cfg(unix)]
1275fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1276    let uid = sudo_uid?.parse::<u32>().ok()?;
1277    let gid = sudo_gid?.parse::<u32>().ok()?;
1278    let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1279        .ok()
1280        .flatten()
1281        .and_then(|u| CString::new(u.name).ok());
1282    Some(run_identity_from_raw_ids(uid, gid, username))
1283}
1284
1285#[cfg(unix)]
1286fn ensure_can_use_identity(
1287    configured_user: &str,
1288    identity: &RunIdentity,
1289    current_uid: nix::unistd::Uid,
1290    current_gid: nix::unistd::Gid,
1291) -> Result<()> {
1292    let RunIdentity::Switch { uid, gid, .. } = identity else {
1293        return Ok(());
1294    };
1295    if *uid == current_uid && *gid == current_gid {
1296        return Ok(());
1297    }
1298    if current_uid.is_root() {
1299        return Ok(());
1300    }
1301    Err(miette::miette!(
1302        "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1303        configured_user,
1304        current_uid.as_raw(),
1305        current_gid.as_raw(),
1306        uid.as_raw(),
1307        gid.as_raw()
1308    ))
1309}
1310
1311#[cfg(unix)]
1312fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1313    let RunIdentity::Switch { uid, gid, username } = identity else {
1314        return Ok(());
1315    };
1316    if let Some(username) = username {
1317        initgroups_for_user(username, *gid)?;
1318    } else {
1319        setgroups_to_primary(*gid)?;
1320    }
1321    nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1322    nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1323    Ok(())
1324}
1325
1326#[cfg(unix)]
1327impl RunIdentity {
1328    fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1329        matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1330    }
1331}
1332
1333#[cfg(unix)]
1334fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1335    let groups = [gid.as_raw() as libc::gid_t];
1336    #[cfg(any(target_os = "linux", target_os = "android"))]
1337    let group_count = groups.len();
1338    #[cfg(not(any(target_os = "linux", target_os = "android")))]
1339    let group_count = groups.len() as libc::c_int;
1340    let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1341    if rc == -1 {
1342        Err(std::io::Error::last_os_error())
1343    } else {
1344        Ok(())
1345    }
1346}
1347
1348#[cfg(unix)]
1349fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1350    let gid = gid.as_raw();
1351    #[cfg(any(
1352        target_os = "macos",
1353        target_os = "ios",
1354        target_os = "tvos",
1355        target_os = "watchos"
1356    ))]
1357    let base_gid = i32::try_from(gid)
1358        .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1359
1360    #[cfg(not(any(
1361        target_os = "macos",
1362        target_os = "ios",
1363        target_os = "tvos",
1364        target_os = "watchos"
1365    )))]
1366    let base_gid = gid as libc::gid_t;
1367
1368    // SAFETY: `username` is a valid nul-terminated C string and `base_gid`
1369    // is derived from a resolved system account or sudo-provided gid.
1370    let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1371    if rc == -1 {
1372        Err(std::io::Error::last_os_error())
1373    } else {
1374        Ok(())
1375    }
1376}
1377
1378#[cfg(unix)]
1379fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1380    std::io::Error::from_raw_os_error(err as i32)
1381}
1382
1383/// Check if multiple ports are available and optionally auto-bump to find available ports.
1384///
1385/// All ports are bumped by the same offset to maintain relative port spacing.
1386/// Returns the resolved ports (either the original or bumped ones).
1387/// Returns an error if any port is in use and auto_bump is disabled,
1388/// or if no available ports can be found after max attempts.
1389async fn check_ports_available(
1390    expected_ports: &[u16],
1391    auto_bump: bool,
1392    max_attempts: u32,
1393) -> Result<Vec<u16>> {
1394    if expected_ports.is_empty() {
1395        return Ok(Vec::new());
1396    }
1397
1398    for bump_offset in 0..=max_attempts {
1399        // Use wrapping_add to handle overflow correctly - ports wrap around at 65535
1400        let candidate_ports: Vec<u16> = expected_ports
1401            .iter()
1402            .map(|&p| p.wrapping_add(bump_offset as u16))
1403            .collect();
1404
1405        // Check if all ports in this set are available
1406        let mut all_available = true;
1407        let mut conflicting_port = None;
1408
1409        for &port in &candidate_ports {
1410            // Port 0 is a special case - it requests an ephemeral port from the OS.
1411            // Skip the availability check for port 0 since binding to it always succeeds.
1412            if port == 0 {
1413                continue;
1414            }
1415
1416            // Use spawn_blocking to avoid blocking the async runtime during TCP bind checks.
1417            //
1418            // We check multiple addresses to avoid false-negatives caused by SO_REUSEADDR.
1419            // On macOS/BSD, Rust's TcpListener::bind sets SO_REUSEADDR by default, which
1420            // allows binding 0.0.0.0:port even when 127.0.0.1:port is already in use
1421            // (because 0.0.0.0 is technically a different address).  Most daemons bind
1422            // to localhost, so checking 127.0.0.1 is essential to detect real conflicts.
1423            // We also check [::1] to cover IPv6 loopback listeners.
1424            //
1425            // NOTE: This check has a time-of-check-to-time-of-use (TOCTOU) race condition.
1426            // Another process could grab the port between our check and the daemon actually
1427            // binding. This is inherent to the approach and acceptable for our use case
1428            // since we're primarily detecting conflicts with already-running daemons.
1429            if is_port_in_use(port).await {
1430                all_available = false;
1431                conflicting_port = Some(port);
1432                break;
1433            }
1434        }
1435
1436        if all_available {
1437            // Check for overflow (port wrapped around to 0 due to wrapping_add)
1438            // If any candidate port is 0 but the original expected port wasn't 0,
1439            // it means we've wrapped around and should stop
1440            if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1441                return Err(PortError::NoAvailablePort {
1442                    start_port: expected_ports[0],
1443                    attempts: bump_offset + 1,
1444                }
1445                .into());
1446            }
1447            if bump_offset > 0 {
1448                info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1449            }
1450            return Ok(candidate_ports);
1451        }
1452
1453        // Port is in use
1454        if bump_offset == 0 && !auto_bump {
1455            if let Some(port) = conflicting_port {
1456                let (pid, process) = identify_port_owner(port).await;
1457                return Err(PortError::InUse { port, process, pid }.into());
1458            }
1459        }
1460    }
1461
1462    // No available ports found after max attempts
1463    Err(PortError::NoAvailablePort {
1464        start_port: expected_ports[0],
1465        attempts: max_attempts + 1,
1466    }
1467    .into())
1468}
1469
1470/// Check whether a port is currently in use by attempting to bind on multiple addresses.
1471///
1472/// Returns `true` when at least one bind attempt gets `AddrInUse`, meaning another
1473/// process is listening.  Other errors (e.g. `AddrNotAvailable` on an address family
1474/// the OS doesn't support) are ignored so they don't produce false positives.
1475async fn is_port_in_use(port: u16) -> bool {
1476    tokio::task::spawn_blocking(move || {
1477        for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1478            match std::net::TcpListener::bind((addr, port)) {
1479                Ok(listener) => drop(listener),
1480                Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1481                Err(_) => continue,
1482            }
1483        }
1484        false
1485    })
1486    .await
1487    .unwrap_or(false)
1488}
1489
1490/// Best-effort lookup of the process occupying a port via `listeners::get_all()`.
1491///
1492/// Returns `(pid, process_name)`.  Falls back to `(0, "unknown")` when the
1493/// system call fails (permission error, unsupported OS, etc.).
1494async fn identify_port_owner(port: u16) -> (u32, String) {
1495    tokio::task::spawn_blocking(move || {
1496        listeners::get_all()
1497            .ok()
1498            .and_then(|list| {
1499                list.into_iter()
1500                    .find(|l| l.socket.port() == port)
1501                    .map(|l| (l.process.pid, l.process.name))
1502            })
1503            .unwrap_or((0, "unknown".to_string()))
1504    })
1505    .await
1506    .unwrap_or((0, "unknown".to_string()))
1507}
1508
1509/// Detect whether a port is in use, and if so, identify the owning process.
1510///
1511/// Combines `is_port_in_use` (reliable bind probe) with `identify_port_owner`
1512/// (best-effort process lookup).  Returns `None` when the port is free.
1513async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1514    if !is_port_in_use(port).await {
1515        return None;
1516    }
1517    Some(identify_port_owner(port).await)
1518}
1519
1520/// Spawn a background task that detects the first port the daemon process is listening on
1521/// and stores it in the state file as `active_port`.
1522///
1523/// This is called once when the daemon becomes ready. The port is cleared when the daemon stops.
1524///
1525/// Port selection strategy:
1526/// 1. If the daemon has `expected_port` configured, prefer the first port from that list
1527///    (it is the port the operator explicitly designated as the primary service port).
1528/// 2. Otherwise, take the first port the process is actually listening on (in the order
1529///    returned by the OS), which is typically the port bound earliest.
1530///
1531/// Using `min()` (lowest port number) was previously used here but is incorrect: many
1532/// applications listen on multiple ports (e.g. HTTP + metrics) and the lowest-numbered
1533/// port is not necessarily the primary service port.
1534fn detect_and_store_active_port(id: DaemonId, pid: u32) {
1535    tokio::spawn(async move {
1536        // Retry with exponential backoff so that slow-starting daemons (JVM,
1537        // Node.js, Python, etc.) that take more than 500 ms to bind their port
1538        // are still detected.  Total wait budget: 500+1000+2000+4000 = 7.5 s.
1539        for delay_ms in [500u64, 1000, 2000, 4000] {
1540            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1541
1542            // Read daemon state atomically: check if still alive and get expected_port
1543            // in a single lock acquisition to avoid TOCTOU and unnecessary lock overhead.
1544            let expected_port: Option<u16> = {
1545                let state_file = SUPERVISOR.state_file.lock().await;
1546                match state_file.daemons.get(&id) {
1547                    Some(d) if d.pid.is_none() => {
1548                        debug!("daemon {id}: aborting active_port detection — process exited");
1549                        return;
1550                    }
1551                    Some(d) => d
1552                        .port
1553                        .as_ref()
1554                        .and_then(|p| p.expect.first().copied())
1555                        .filter(|&p| p > 0),
1556                    None => None,
1557                }
1558            };
1559
1560            let active_port = tokio::task::spawn_blocking(move || {
1561                let listeners = listeners::get_all().ok()?;
1562                let process_ports: Vec<u16> = listeners
1563                    .into_iter()
1564                    .filter(|listener| listener.process.pid == pid)
1565                    .map(|listener| listener.socket.port())
1566                    .filter(|&port| port > 0)
1567                    .collect();
1568
1569                if process_ports.is_empty() {
1570                    return None;
1571                }
1572
1573                // Prefer the configured expected_port if the process is actually
1574                // listening on it; otherwise fall back to the first port found.
1575                if let Some(ep) = expected_port {
1576                    if process_ports.contains(&ep) {
1577                        return Some(ep);
1578                    }
1579                }
1580
1581                // No expected_port match — return the first port in the list.
1582                // The list order reflects the order the OS reports listeners,
1583                // which is generally the order they were bound (earliest first).
1584                // Do NOT sort: the lowest-numbered port is not necessarily the
1585                // primary service port (e.g. HTTP vs metrics).
1586                process_ports.into_iter().next()
1587            })
1588            .await
1589            .ok()
1590            .flatten();
1591
1592            if let Some(port) = active_port {
1593                debug!("daemon {id} active_port detected: {port}");
1594                let mut state_file = SUPERVISOR.state_file.lock().await;
1595                if let Some(d) = state_file.daemons.get(&id) {
1596                    // Guard against PID reuse: if the original process exited and the OS
1597                    // assigned the same PID to an unrelated process that happens to bind
1598                    // a port, we must not route proxy traffic to that unrelated service.
1599                    if d.pid == Some(pid) {
1600                        state_file.set_active_port(&id, port);
1601                    } else {
1602                        debug!(
1603                            "daemon {id}: skipping active_port write — PID mismatch \
1604                             (expected {pid}, current {:?})",
1605                            d.pid
1606                        );
1607                        return;
1608                    }
1609                }
1610                return;
1611            }
1612
1613            debug!("daemon {id}: no active port detected for pid {pid} (will retry)");
1614        }
1615
1616        debug!("daemon {id}: active port detection exhausted all retries for pid {pid}");
1617    });
1618}
1619
1620/// Check whether a daemon (by its qualified ID) is the target of any registered
1621/// slug in the global config.  This is used to decide whether to run the
1622/// `detect_and_store_active_port` polling task — only slug-targeted daemons need
1623/// it, avoiding wasted `listeners::get_all()` calls for port-less daemons.
1624///
1625/// Delegates to `proxy::server::is_slug_target()` which uses the same in-memory
1626/// slug cache as the proxy hot path, so this check is cheap.
1627fn is_daemon_slug_target(id: &DaemonId) -> bool {
1628    // read_global_slugs is called once per daemon start — acceptable cost.
1629    // We intentionally avoid making this async to keep has_port_config evaluation
1630    // simple and synchronous in run_once().
1631    let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1632    slugs.iter().any(|(slug, entry)| {
1633        let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1634        id.name() == daemon_name
1635    })
1636}
1637
#[cfg(all(test, unix))]
mod tests {
    use super::*;

    /// Unpack a `RunIdentity::Switch`, failing the test on `Inherit`.
    fn expect_switch(
        identity: RunIdentity,
    ) -> (
        nix::unistd::Uid,
        nix::unistd::Gid,
        Option<std::ffi::CString>,
    ) {
        match identity {
            RunIdentity::Switch { uid, gid, username } => (uid, gid, username),
            RunIdentity::Inherit => panic!("expected identity switch"),
        }
    }

    // No configured user and no sudo hints: keep the supervisor's identity.
    #[test]
    fn test_resolve_run_identity_empty_without_sudo() {
        let resolved = resolve_run_identity(None, 501, 20, None, None).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    // Running as root with sudo hints: fall back to the invoking user's ids.
    #[test]
    fn test_resolve_run_identity_sudo_fallback() {
        let resolved = resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap();
        let (uid, gid, _) = expect_switch(resolved);
        assert_eq!(uid.as_raw(), 501);
        assert_eq!(gid.as_raw(), 20);
    }

    // Sudo hints are ignored when the supervisor itself is not root.
    #[test]
    fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
        let resolved = resolve_run_identity(None, 501, 20, Some("0"), Some("0")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    // A user given by name resolves to its uid and keeps the name.
    #[test]
    fn test_resolve_configured_user_root_name() {
        let (uid, _, username) = expect_switch(resolve_configured_user("root").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(
            username.as_deref().and_then(|s| s.to_str().ok()),
            Some("root")
        );
    }

    // A user given by numeric uid resolves to the matching account name.
    #[test]
    fn test_resolve_configured_user_root_uid() {
        let (uid, _, username) = expect_switch(resolve_configured_user("0").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(
            username.as_deref().and_then(|s| s.to_str().ok()),
            Some("root")
        );
    }

    // Unknown accounts are rejected with a "does not exist" message.
    #[test]
    fn test_resolve_configured_user_missing_user_fails() {
        let message = resolve_configured_user("pitchfork-user-that-should-not-exist")
            .unwrap_err()
            .to_string();
        assert!(message.contains("does not exist"));
    }

    // A non-root supervisor cannot switch to a different user.
    #[test]
    fn test_resolve_run_identity_requires_root_for_user_switch() {
        let message = resolve_run_identity(Some("root"), 501, 20, None, None)
            .unwrap_err()
            .to_string();
        assert!(message.contains("Restart the supervisor with sudo"));
    }

    // Configured user equal to the supervisor's own identity needs no switch.
    #[test]
    fn test_resolve_run_identity_same_user_is_noop() {
        let resolved = resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }
}
1712
1713/// Inject proxy-related environment variables into a daemon's command.
1714///
1715/// Adds:
1716/// - `HOST` — the address the daemon should bind to (`127.0.0.1`, omitted in LAN mode)
1717/// - `PITCHFORK_URL` — the public proxy URL for this daemon (if it has a slug)
1718/// - `NODE_EXTRA_CA_CERTS` — path to the pitchfork CA cert (if HTTPS enabled)
1719/// - `__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS` — `.<tld>` for Vite host allowlisting
1720/// - `PITCHFORK_LAN` — set to `"1"` when LAN mode is active
1721fn inject_proxy_env(cmd: &mut tokio::process::Command, slug: &Option<String>) {
1722    let s = crate::settings::settings();
1723    let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1724
1725    if should_force_loopback_host(slug) && !lan_enabled {
1726        // Only force loopback binding for daemons that are actually routed via a slug.
1727        // In LAN mode, daemons need to bind to 0.0.0.0 to be reachable from the network.
1728        cmd.env("HOST", "127.0.0.1");
1729    }
1730
1731    // PITCHFORK_URL: the daemon's public proxy URL (only if it has a slug and proxy is enabled)
1732    if let Some(url) = build_pitchfork_url(slug, s) {
1733        cmd.env("PITCHFORK_URL", &url);
1734    }
1735
1736    // NODE_EXTRA_CA_CERTS: let Node.js backends trust the pitchfork CA
1737    if s.proxy.enable && s.proxy.https {
1738        let ca_path = if s.proxy.tls_cert.is_empty() {
1739            crate::env::PITCHFORK_STATE_DIR.join("proxy").join("ca.pem")
1740        } else {
1741            std::path::PathBuf::from(&s.proxy.tls_cert)
1742        };
1743        if ca_path.exists() {
1744            cmd.env("NODE_EXTRA_CA_CERTS", ca_path.to_string_lossy().to_string());
1745        }
1746    }
1747
1748    // __VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS: Vite host allowlisting
1749    if s.proxy.enable {
1750        let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1751        cmd.env("__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS", format!(".{tld}"));
1752    }
1753
1754    // PITCHFORK_LAN: signal to daemons that LAN mode is active
1755    if lan_enabled {
1756        cmd.env("PITCHFORK_LAN", "1");
1757    }
1758}
1759
1760fn should_force_loopback_host(slug: &Option<String>) -> bool {
1761    let Some(slug) = slug.as_deref() else {
1762        return false;
1763    };
1764
1765    let s = crate::settings::settings();
1766    if !s.proxy.enable {
1767        return false;
1768    }
1769
1770    let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1771    slugs.contains_key(slug)
1772}
1773
1774/// Compute the public proxy URL for a daemon.
1775///
1776/// Returns `None` if the daemon has no slug or the proxy is not enabled.
1777fn build_pitchfork_url(slug: &Option<String>, s: &crate::settings::Settings) -> Option<String> {
1778    let slug = slug.as_ref()?;
1779    if !s.proxy.enable {
1780        return None;
1781    }
1782    let scheme = if s.proxy.https { "https" } else { "http" };
1783    let port = u16::try_from(s.proxy.port).ok().filter(|&p| p > 0)?;
1784    let port_suffix = if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
1785        String::new()
1786    } else {
1787        format!(":{port}")
1788    };
1789    let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1790    let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1791    Some(format!("{scheme}://{slug}.{tld}{port_suffix}",))
1792}