Skip to main content

pitchfork_cli/supervisor/
lifecycle.rs

1//! Daemon lifecycle management - start/stop operations
2//!
3//! Contains the core `run()`, `run_once()`, and `stop()` methods for daemon process management.
4
5use super::hooks::{HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::IntoDiagnostic;
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22#[cfg(unix)]
23use std::ffi::CString;
24use std::iter::once;
25use std::sync::atomic;
26use std::time::Duration;
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
28use tokio::select;
29use tokio::sync::oneshot;
30use tokio::time;
31
32/// Cache for compiled regex patterns to avoid recompilation on daemon restarts
33static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
34    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
35
36#[cfg(unix)]
37#[derive(Clone, Debug, PartialEq, Eq)]
38enum RunIdentity {
39    Inherit,
40    Switch {
41        uid: nix::unistd::Uid,
42        gid: nix::unistd::Gid,
43        username: Option<CString>,
44    },
45}
46
47/// Get or compile a regex pattern, caching the result for future use
48pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
49    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
50    if let Some(re) = cache.get(pattern) {
51        return Some(re.clone());
52    }
53    match Regex::new(pattern) {
54        Ok(re) => {
55            cache.insert(pattern.to_string(), re.clone());
56            Some(re)
57        }
58        Err(e) => {
59            error!("invalid regex pattern '{pattern}': {e}");
60            None
61        }
62    }
63}
64
65impl Supervisor {
66    /// Run a daemon, handling retries if configured
67    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
68        let id = &opts.id;
69        let cmd = opts.cmd.clone();
70
71        // Clear any pending autostop for this daemon since it's being started
72        {
73            let mut pending = self.pending_autostops.lock().await;
74            if pending.remove(id).is_some() {
75                info!("cleared pending autostop for {id} (daemon starting)");
76            }
77        }
78
79        let daemon = self.get_daemon(id).await;
80        if let Some(daemon) = daemon {
81            // Stopping state is treated as "not running" - the monitoring task will clean it up
82            // Only check for Running state with a valid PID
83            if !daemon.status.is_stopping()
84                && !daemon.status.is_stopped()
85                && let Some(pid) = daemon.pid
86            {
87                if opts.force {
88                    self.stop(id).await?;
89                    info!("run: stop completed for daemon {id}");
90                } else {
91                    warn!("daemon {id} already running with pid {pid}");
92                    return Ok(IpcResponse::DaemonAlreadyRunning);
93                }
94            }
95        }
96
97        // If wait_ready is true and retry is configured, implement retry loop
98        if opts.wait_ready && opts.retry > 0 {
99            // Use saturating_add to avoid overflow when retry = u32::MAX (infinite)
100            let max_attempts = opts.retry.saturating_add(1);
101            for attempt in 0..max_attempts {
102                let mut retry_opts = opts.clone();
103                retry_opts.retry_count = attempt;
104                retry_opts.cmd = cmd.clone();
105
106                let result = self.run_once(retry_opts).await?;
107
108                match result {
109                    IpcResponse::DaemonReady { daemon } => {
110                        return Ok(IpcResponse::DaemonReady { daemon });
111                    }
112                    IpcResponse::DaemonFailedWithCode { exit_code } => {
113                        if attempt < opts.retry {
114                            let backoff_secs = 2u64.pow(attempt);
115                            info!(
116                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
117                                attempt + 1,
118                                max_attempts,
119                                backoff_secs
120                            );
121                            fire_hook(
122                                HookType::OnRetry,
123                                id.clone(),
124                                opts.dir.clone(),
125                                attempt + 1,
126                                opts.env.clone(),
127                                vec![],
128                            )
129                            .await;
130                            time::sleep(Duration::from_secs(backoff_secs)).await;
131                            continue;
132                        } else {
133                            info!("daemon {id} failed after {max_attempts} attempts");
134                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
135                        }
136                    }
137                    other => return Ok(other),
138                }
139            }
140        }
141
142        // No retry or wait_ready is false
143        self.run_once(opts).await
144    }
145
146    /// Run a daemon once (single attempt)
147    pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
148        let id = &opts.id;
149        let original_cmd = opts.cmd.clone(); // Save original command for persistence
150        let cmd = opts.cmd;
151
152        // Create channel for readiness notification if wait_ready is true
153        let (ready_tx, ready_rx) = if opts.wait_ready {
154            let (tx, rx) = oneshot::channel();
155            (Some(tx), Some(rx))
156        } else {
157            (None, None)
158        };
159
160        // Check port availability and apply auto-bump if configured
161        let expected_ports = opts.expected_port.clone();
162        let (resolved_ports, effective_ready_port) = if !opts.expected_port.is_empty() {
163            match check_ports_available(
164                &opts.expected_port,
165                opts.auto_bump_port,
166                opts.port_bump_attempts,
167            )
168            .await
169            {
170                Ok(resolved) => {
171                    let ready_port = if let Some(configured_port) = opts.ready_port {
172                        // If ready_port matches one of the expected ports, apply the same bump offset
173                        let bump_offset = resolved
174                            .first()
175                            .unwrap_or(&0)
176                            .saturating_sub(*opts.expected_port.first().unwrap_or(&0));
177                        if opts.expected_port.contains(&configured_port) && bump_offset > 0 {
178                            configured_port
179                                .checked_add(bump_offset)
180                                .or(Some(configured_port))
181                        } else {
182                            Some(configured_port)
183                        }
184                    } else if opts.ready_output.is_none()
185                        && opts.ready_http.is_none()
186                        && opts.ready_cmd.is_none()
187                        && opts.ready_delay.is_none()
188                    {
189                        // No other ready check configured — use the first expected port as a
190                        // TCP port readiness check so the daemon is considered ready once it
191                        // starts listening.  Skip port 0 (ephemeral port request).
192                        resolved.first().copied().filter(|&p| p != 0)
193                    } else {
194                        // Another ready check is configured (output/http/cmd/delay).
195                        // Don't add an implicit TCP port check — it could race and fire
196                        // before the daemon has produced any output.
197                        None
198                    };
199                    info!(
200                        "daemon {id}: ports {:?} resolved to {:?}",
201                        opts.expected_port, resolved
202                    );
203                    (resolved, ready_port)
204                }
205                Err(e) => {
206                    error!("daemon {id}: port check failed: {e}");
207                    // Convert PortError to structured IPC response
208                    if let Some(port_error) = e.downcast_ref::<PortError>() {
209                        match port_error {
210                            PortError::InUse { port, process, pid } => {
211                                return Ok(IpcResponse::PortConflict {
212                                    port: *port,
213                                    process: process.clone(),
214                                    pid: *pid,
215                                });
216                            }
217                            PortError::NoAvailablePort {
218                                start_port,
219                                attempts,
220                            } => {
221                                return Ok(IpcResponse::NoAvailablePort {
222                                    start_port: *start_port,
223                                    attempts: *attempts,
224                                });
225                            }
226                        }
227                    }
228                    return Ok(IpcResponse::DaemonFailed {
229                        error: e.to_string(),
230                    });
231                }
232            }
233        } else {
234            // When ready_port is set without expected_port, check that the port
235            // is not already occupied.  If another process is listening on it,
236            // the TCP readiness probe would immediately succeed and pitchfork
237            // would falsely consider the daemon ready — routing proxy traffic to
238            // the wrong process.
239            if let Some(port) = opts.ready_port {
240                if port > 0 {
241                    if let Some((pid, process)) = detect_port_conflict(port).await {
242                        return Ok(IpcResponse::PortConflict { port, process, pid });
243                    }
244                }
245            }
246            (Vec::new(), opts.ready_port)
247        };
248
249        let cmd: Vec<String> = if opts.mise.unwrap_or(settings().general.mise) {
250            match settings().resolve_mise_bin() {
251                Some(mise_bin) => {
252                    let mise_bin_str = mise_bin.to_string_lossy().to_string();
253                    info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
254                    once("exec".to_string())
255                        .chain(once(mise_bin_str))
256                        .chain(once("x".to_string()))
257                        .chain(once("--".to_string()))
258                        .chain(cmd)
259                        .collect_vec()
260                }
261                None => {
262                    warn!("daemon {id}: mise=true but mise binary not found, running without mise");
263                    once("exec".to_string()).chain(cmd).collect_vec()
264                }
265            }
266        } else {
267            once("exec".to_string()).chain(cmd).collect_vec()
268        };
269        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
270        let log_path = id.log_path();
271        if let Some(parent) = log_path.parent() {
272            xx::file::mkdirp(parent)?;
273        }
274        #[cfg(unix)]
275        let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
276            Ok(identity) => identity,
277            Err(e) => {
278                return Ok(IpcResponse::DaemonFailed {
279                    error: e.to_string(),
280                });
281            }
282        };
283        info!("run: spawning daemon {id} with args: {args:?}");
284        let mut cmd = tokio::process::Command::new("sh");
285        cmd.args(&args)
286            .stdin(std::process::Stdio::null())
287            .stdout(std::process::Stdio::piped())
288            .stderr(std::process::Stdio::piped())
289            .current_dir(&opts.dir);
290
291        // Ensure daemon can find user tools by using the original PATH
292        if let Some(ref path) = *env::ORIGINAL_PATH {
293            cmd.env("PATH", path);
294        }
295
296        // Apply custom environment variables from config
297        if let Some(ref env_vars) = opts.env {
298            cmd.envs(env_vars);
299        }
300
301        // Inject pitchfork metadata env vars AFTER user env so they can't be overwritten
302        cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
303        cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
304        cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
305
306        // Inject the resolved ports for the daemon to use
307        if !resolved_ports.is_empty() {
308            // Set PORT to the first port for backward compatibility
309            // When there's only one port, both PORT and PORT0 will be set to the same value.
310            // This follows the convention used by many deployment platforms (Heroku, etc.).
311            cmd.env("PORT", resolved_ports[0].to_string());
312            // Set individual ports as PORT0, PORT1, etc.
313            for (i, port) in resolved_ports.iter().enumerate() {
314                cmd.env(format!("PORT{i}"), port.to_string());
315            }
316        }
317
318        #[cfg(unix)]
319        {
320            let run_identity = run_identity.clone();
321            unsafe {
322                cmd.pre_exec(move || {
323                    nix::unistd::setsid().map_err(nix_to_io_error)?;
324                    apply_run_identity(&run_identity)?;
325                    Ok(())
326                });
327            }
328        }
329
330        let mut child = cmd.spawn().into_diagnostic()?;
331        let pid = match child.id() {
332            Some(p) => p,
333            None => {
334                warn!("Daemon {id} exited before PID could be captured");
335                return Ok(IpcResponse::DaemonFailed {
336                    error: "Process exited immediately".to_string(),
337                });
338            }
339        };
340        info!("started daemon {id} with pid {pid}");
341        let daemon = self
342            .upsert_daemon(
343                UpsertDaemonOpts::builder(id.clone())
344                    .set(|o| {
345                        o.pid = Some(pid);
346                        o.status = DaemonStatus::Running;
347                        o.shell_pid = opts.shell_pid;
348                        o.dir = Some(opts.dir.clone());
349                        o.cmd = Some(original_cmd);
350                        o.autostop = opts.autostop;
351                        o.cron_schedule = opts.cron_schedule.clone();
352                        o.cron_retrigger = opts.cron_retrigger;
353                        o.retry = Some(opts.retry);
354                        o.retry_count = Some(opts.retry_count);
355                        o.ready_delay = opts.ready_delay;
356                        o.ready_output = opts.ready_output.clone();
357                        o.ready_http = opts.ready_http.clone();
358                        o.ready_port = effective_ready_port;
359                        o.ready_cmd = opts.ready_cmd.clone();
360                        o.expected_port = expected_ports;
361                        o.resolved_port = resolved_ports;
362                        o.auto_bump_port = Some(opts.auto_bump_port);
363                        o.port_bump_attempts = Some(opts.port_bump_attempts);
364                        o.depends = Some(opts.depends.clone());
365                        o.env = opts.env.clone();
366                        o.watch = Some(opts.watch.clone());
367                        o.watch_mode = Some(opts.watch_mode);
368                        o.watch_base_dir = opts.watch_base_dir.clone();
369                        o.mise = opts.mise;
370                        o.user = opts.user.clone();
371                        o.memory_limit = opts.memory_limit;
372                        o.cpu_limit = opts.cpu_limit;
373                    })
374                    .build(),
375            )
376            .await?;
377
378        let id_clone = id.clone();
379        let ready_delay = opts.ready_delay;
380        let ready_output = opts.ready_output.clone();
381        let ready_http = opts.ready_http.clone();
382        let ready_port = effective_ready_port;
383        let ready_cmd = opts.ready_cmd.clone();
384        let daemon_dir = opts.dir.clone();
385        let hook_retry_count = opts.retry_count;
386        let hook_retry = opts.retry;
387        let hook_daemon_env = opts.env.clone();
388        // Whether this daemon has any port-related config — used to skip the
389        // active_port detection task for daemons that never bind a port (e.g. `sleep 60`).
390        // When the proxy is enabled, only detect active_port for daemons that are
391        // actually referenced by a registered slug, rather than blanket-polling every
392        // daemon (which wastes ~7.5 s of listeners::get_all() calls per port-less daemon).
393        let has_port_config = !opts.expected_port.is_empty()
394            || (settings().proxy.enable && is_daemon_slug_target(id));
395        let daemon_pid = pid;
396
397        tokio::spawn(async move {
398            let id = id_clone;
399            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
400                (Some(out), Some(err)) => (out, err),
401                _ => {
402                    error!("Failed to capture stdout/stderr for daemon {id}");
403                    return;
404                }
405            };
406            let mut stdout = tokio::io::BufReader::new(stdout).lines();
407            let mut stderr = tokio::io::BufReader::new(stderr).lines();
408            let log_file = match tokio::fs::File::options()
409                .append(true)
410                .create(true)
411                .open(&log_path)
412                .await
413            {
414                Ok(f) => f,
415                Err(e) => {
416                    error!("Failed to open log file for daemon {id}: {e}");
417                    return;
418                }
419            };
420            let mut log_appender = BufWriter::new(log_file);
421
422            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
423            let format_line = |line: String| {
424                if line.starts_with(&format!("{id} ")) {
425                    // mise tasks often already have the id printed
426                    format!("{} {line}\n", now())
427                } else {
428                    format!("{} {id} {line}\n", now())
429                }
430            };
431
432            // Setup readiness checking
433            let mut ready_notified = false;
434            let mut ready_tx = ready_tx;
435            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
436            // Track whether we've already spawned the active_port detection task
437            let mut active_port_spawned = false;
438
439            let mut delay_timer =
440                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
441
442            // Get settings for intervals
443            let s = settings();
444            let ready_check_interval = s.supervisor_ready_check_interval();
445            let http_client_timeout = s.supervisor_http_client_timeout();
446            let log_flush_interval_duration = s.supervisor_log_flush_interval();
447
448            // Setup HTTP readiness check interval
449            let mut http_check_interval = ready_http
450                .as_ref()
451                .map(|_| tokio::time::interval(ready_check_interval));
452            let http_client = ready_http.as_ref().map(|_| {
453                reqwest::Client::builder()
454                    .timeout(http_client_timeout)
455                    .build()
456                    .unwrap_or_default()
457            });
458
459            // Setup TCP port readiness check interval
460            let mut port_check_interval =
461                ready_port.map(|_| tokio::time::interval(ready_check_interval));
462
463            // Setup command readiness check interval
464            let mut cmd_check_interval = ready_cmd
465                .as_ref()
466                .map(|_| tokio::time::interval(ready_check_interval));
467
468            // Setup periodic log flush interval
469            let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);
470
471            // Use a channel to communicate process exit status
472            let (exit_tx, mut exit_rx) =
473                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
474
475            // Spawn a task to wait for process exit
476            let child_pid = child.id().unwrap_or(0);
477            tokio::spawn(async move {
478                let result = child.wait().await;
479                // On non-Linux Unix (e.g. macOS) the zombie reaper may win the
480                // race and consume the exit status via waitpid(None, WNOHANG)
481                // before Tokio's child.wait() gets to it. When that happens,
482                // Tokio returns an ECHILD io::Error. We recover by checking
483                // REAPED_STATUSES for the stashed exit code.
484                //
485                // On Linux this is unnecessary because the reaper uses
486                // waitid(WNOWAIT) to peek before reaping, which avoids the
487                // race entirely.
488                #[cfg(all(unix, not(target_os = "linux")))]
489                let result = match &result {
490                    Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
491                        if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
492                            warn!(
493                                "daemon pid {child_pid} wait() got ECHILD; \
494                                 recovered exit code {code} from zombie reaper"
495                            );
496                            // Synthesize an ExitStatus from the stashed code.
497                            // On Unix we can use `ExitStatus::from_raw()` with
498                            // a wait-style status word (code << 8 for normal
499                            // exit, or raw signal number for signal death).
500                            use std::os::unix::process::ExitStatusExt;
501                            if code >= 0 {
502                                Ok(std::process::ExitStatus::from_raw(code << 8))
503                            } else {
504                                // Negative code means killed by signal (-sig)
505                                Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
506                            }
507                        } else {
508                            warn!(
509                                "daemon pid {child_pid} wait() got ECHILD but no \
510                                 stashed status found; reporting as error"
511                            );
512                            result
513                        }
514                    }
515                    _ => result,
516                };
517                debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
518                let _ = exit_tx.send(result).await;
519            });
520
521            #[allow(unused_assignments)]
522            // Initial None is a safety net; loop only exits via exit_rx.recv() which sets it
523            let mut exit_status = None;
524
525            // If there is no ready check of any kind and no delay, the daemon is
526            // considered immediately ready and the active_port detection task would
527            // never be triggered inside the select loop.  Kick it off right away so
528            // that daemons without any readiness configuration still get their
529            // active_port populated (needed for proxy routing).
530            if has_port_config
531                && ready_pattern.is_none()
532                && ready_http.is_none()
533                && ready_port.is_none()
534                && ready_cmd.is_none()
535                && delay_timer.is_none()
536            {
537                active_port_spawned = true;
538                detect_and_store_active_port(id.clone(), daemon_pid);
539            }
540
541            loop {
542                select! {
543                                Ok(Some(line)) = stdout.next_line() => {
544                                    let formatted = format_line(line.clone());
545                                    if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
546                                        error!("Failed to write to log for daemon {id}: {e}");
547                                    }
548                                    trace!("stdout: {id} {formatted}");
549
550                        // Check if output matches ready pattern
551                        if !ready_notified
552                            && let Some(ref pattern) = ready_pattern
553                            && pattern.is_match(&line)
554                        {
555                            info!("daemon {id} ready: output matched pattern");
556                            ready_notified = true;
557                            let _ = log_appender.flush().await;
558                            if let Some(tx) = ready_tx.take() {
559                                let _ = tx.send(Ok(()));
560                            }
561                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
562                            if !active_port_spawned && has_port_config {
563                                active_port_spawned = true;
564                                detect_and_store_active_port(id.clone(), daemon_pid);
565                            }
566                        }
567                    }
568                    Ok(Some(line)) = stderr.next_line() => {
569                        let formatted = format_line(line.clone());
570                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
571                            error!("Failed to write to log for daemon {id}: {e}");
572                        }
573                        trace!("stderr: {id} {formatted}");
574
575                        if !ready_notified
576                            && let Some(ref pattern) = ready_pattern
577                            && pattern.is_match(&line)
578                        {
579                            info!("daemon {id} ready: output matched pattern");
580                            ready_notified = true;
581                            let _ = log_appender.flush().await;
582                            if let Some(tx) = ready_tx.take() {
583                                let _ = tx.send(Ok(()));
584                            }
585                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
586                            if !active_port_spawned && has_port_config {
587                                active_port_spawned = true;
588                                detect_and_store_active_port(id.clone(), daemon_pid);
589                            }
590                        }
591                    },
592                    Some(result) = exit_rx.recv() => {
593                        // Process exited - save exit status and notify if not ready yet
594                        exit_status = Some(result);
595                        debug!("daemon {id} process exited, exit_status: {exit_status:?}");
596                        // Flush logs before notifying so clients see logs immediately
597                        let _ = log_appender.flush().await;
598                        if !ready_notified {
599                            if let Some(tx) = ready_tx.take() {
600                                // Check if process exited successfully
601                                let is_success = exit_status.as_ref()
602                                    .and_then(|r| r.as_ref().ok())
603                                    .map(|s| s.success())
604                                    .unwrap_or(false);
605
606                                if is_success {
607                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
608                                    let _ = tx.send(Ok(()));
609                                } else {
610                                    let exit_code = exit_status.as_ref()
611                                        .and_then(|r| r.as_ref().ok())
612                                        .and_then(|s| s.code());
613                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
614                                    let _ = tx.send(Err(exit_code));
615                                }
616                            }
617                        } else {
618                            debug!("daemon {id} was already marked ready, not sending notification");
619                        }
620                        break;
621                    },
622                    _ = async {
623                        if let Some(ref mut interval) = http_check_interval {
624                            interval.tick().await;
625                        } else {
626                            std::future::pending::<()>().await;
627                        }
628                    }, if !ready_notified && ready_http.is_some() => {
629                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
630                            match client.get(url).send().await {
631                                Ok(response) if response.status().is_success() => {
632                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
633                                    ready_notified = true;
634                                    let _ = log_appender.flush().await;
635                                    if let Some(tx) = ready_tx.take() {
636                                        let _ = tx.send(Ok(()));
637                                    }
638                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
639                                    http_check_interval = None;
640                                    if !active_port_spawned && has_port_config {
641                                        active_port_spawned = true;
642                                        detect_and_store_active_port(id.clone(), daemon_pid);
643                                    }
644                                }
645                                Ok(response) => {
646                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
647                                }
648                                Err(e) => {
649                                    trace!("daemon {id} HTTP check failed: {e}");
650                                }
651                            }
652                        }
653                    }
654                    _ = async {
655                        if let Some(ref mut interval) = port_check_interval {
656                            interval.tick().await;
657                        } else {
658                            std::future::pending::<()>().await;
659                        }
660                    }, if !ready_notified && ready_port.is_some() => {
661                        if let Some(port) = ready_port {
662                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
663                                Ok(_) => {
664                                    info!("daemon {id} ready: TCP port {port} is listening");
665                                    ready_notified = true;
666                                    // Flush logs before notifying so clients see logs immediately
667                                    let _ = log_appender.flush().await;
668                                    if let Some(tx) = ready_tx.take() {
669                                        let _ = tx.send(Ok(()));
670                                    }
671                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
672                                    // Stop checking once ready
673                                    port_check_interval = None;
674                                    if !active_port_spawned && has_port_config {
675                                        active_port_spawned = true;
676                                        detect_and_store_active_port(id.clone(), daemon_pid);
677                                    }
678                                }
679                                Err(_) => {
680                                    trace!("daemon {id} port check: port {port} not listening yet");
681                                }
682                            }
683                        }
684                    }
685                    _ = async {
686                        if let Some(ref mut interval) = cmd_check_interval {
687                            interval.tick().await;
688                        } else {
689                            std::future::pending::<()>().await;
690                        }
691                    }, if !ready_notified && ready_cmd.is_some() => {
692                        if let Some(ref cmd) = ready_cmd {
693                            // Run the readiness check command using the shell abstraction
694                            let mut command = Shell::default_for_platform().command(cmd);
695                            command
696                                .current_dir(&daemon_dir)
697                                .stdout(std::process::Stdio::null())
698                                .stderr(std::process::Stdio::null());
699                            let result: std::io::Result<std::process::ExitStatus> = command.status().await;
700                            match result {
701                                Ok(status) if status.success() => {
702                                    info!("daemon {id} ready: readiness command succeeded");
703                                    ready_notified = true;
704                                    let _ = log_appender.flush().await;
705                                    if let Some(tx) = ready_tx.take() {
706                                        let _ = tx.send(Ok(()));
707                                    }
708                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
709                                    // Stop checking once ready
710                                    cmd_check_interval = None;
711                                    if !active_port_spawned && has_port_config {
712                                        active_port_spawned = true;
713                                        detect_and_store_active_port(id.clone(), daemon_pid);
714                                    }
715                                }
716                                Ok(_) => {
717                                    trace!("daemon {id} cmd check: command returned non-zero (not ready)");
718                                }
719                                Err(e) => {
720                                    trace!("daemon {id} cmd check failed: {e}");
721                                }
722                            }
723                        }
724                    }
725                    _ = async {
726                        if let Some(ref mut timer) = delay_timer {
727                            timer.await;
728                        } else {
729                            std::future::pending::<()>().await;
730                        }
731                    } => {
732                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
733                            info!("daemon {id} ready: delay elapsed");
734                            ready_notified = true;
735                            // Flush logs before notifying so clients see logs immediately
736                            let _ = log_appender.flush().await;
737                            if let Some(tx) = ready_tx.take() {
738                                let _ = tx.send(Ok(()));
739                            }
740                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
741                        }
742                        // Disable timer after it fires
743                        delay_timer = None;
744                        if !active_port_spawned && has_port_config {
745                            active_port_spawned = true;
746                            detect_and_store_active_port(id.clone(), daemon_pid);
747                        }
748                    }
749                    _ = log_flush_interval.tick() => {
750                        // Periodic flush to ensure logs are written to disk
751                        if let Err(e) = log_appender.flush().await {
752                            error!("Failed to flush log for daemon {id}: {e}");
753                        }
754                    }
755                }
756            }
757
758            // Final flush to ensure all buffered logs are written
759            if let Err(e) = log_appender.flush().await {
760                error!("Failed to final flush log for daemon {id}: {e}");
761            }
762
763            // Clear active_port since the process is no longer running
764            {
765                let mut state_file = SUPERVISOR.state_file.lock().await;
766                if let Some(d) = state_file.daemons.get_mut(&id) {
767                    d.active_port = None;
768                }
769                if let Err(e) = state_file.write() {
770                    debug!("Failed to write state after clearing active_port for {id}: {e}");
771                }
772            }
773
774            // Get the final exit status
775            let exit_status = if let Some(status) = exit_status {
776                status
777            } else {
778                // Streams closed but process hasn't exited yet, wait for it
779                match exit_rx.recv().await {
780                    Some(status) => status,
781                    None => {
782                        warn!("daemon {id} exit channel closed without receiving status");
783                        Err(std::io::Error::other("exit channel closed"))
784                    }
785                }
786            };
787            let current_daemon = SUPERVISOR.get_daemon(&id).await;
788
789            // Signal that this monitoring task is processing its exit path.
790            // The RAII guard will decrement the counter and notify close()
791            // when the task finishes (including all fire_hook registrations),
792            // regardless of which return path is taken.
793            SUPERVISOR
794                .active_monitors
795                .fetch_add(1, atomic::Ordering::Release);
796            struct MonitorGuard;
797            impl Drop for MonitorGuard {
798                fn drop(&mut self) {
799                    SUPERVISOR
800                        .active_monitors
801                        .fetch_sub(1, atomic::Ordering::Release);
802                    SUPERVISOR.monitor_done.notify_waiters();
803                }
804            }
805            let _monitor_guard = MonitorGuard;
806            // Check if this monitoring task is for the current daemon process.
807            // Allow Stopped/Stopping daemons through: stop() clears pid atomically,
808            // so d.pid != Some(pid) would be true, but we still need the is_stopped()
809            // branch below to fire on_stop/on_exit hooks.
810            if current_daemon.is_none()
811                || current_daemon.as_ref().is_some_and(|d| {
812                    d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
813                })
814            {
815                // Another process has taken over, don't update status
816                return;
817            }
818            // Capture the intentional-stop flag BEFORE any state changes.
819            // stop() transitions Stopping → Stopped and clears pid. If stop() wins the race
820            // and sets Stopped before this task runs, we still need to fire on_stop/on_exit.
821            // Treat both Stopping and Stopped as "intentional stop by pitchfork".
822            let already_stopped = current_daemon
823                .as_ref()
824                .is_some_and(|d| d.status.is_stopped());
825            let is_stopping = already_stopped
826                || current_daemon
827                    .as_ref()
828                    .is_some_and(|d| d.status.is_stopping());
829
830            // --- Phase 1: Determine exit_code, exit_reason, and update daemon state ---
831            let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
832                (Ok(status), true) => {
833                    // Intentional stop (by pitchfork). status.code() returns None
834                    // on Unix when killed by signal (e.g. SIGTERM); use -1 to
835                    // distinguish from a clean exit code 0.
836                    (status.code().unwrap_or(-1), "stop")
837                }
838                (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
839                (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
840                (Err(_), true) => {
841                    // child.wait() error while stopping (e.g. sysinfo reaped the process)
842                    (-1, "stop")
843                }
844                (Err(_), false) => (-1, "fail"),
845            };
846
847            // Update daemon state unless stop() already did it (won the race).
848            if !already_stopped {
849                if let Ok(status) = &exit_status {
850                    info!("daemon {id} exited with status {status}");
851                }
852                let (new_status, last_exit_success) = match exit_reason {
853                    "stop" | "exit" => (
854                        DaemonStatus::Stopped,
855                        exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
856                    ),
857                    _ => (DaemonStatus::Errored(exit_code), false),
858                };
859                if let Err(e) = SUPERVISOR
860                    .upsert_daemon(
861                        UpsertDaemonOpts::builder(id.clone())
862                            .set(|o| {
863                                o.pid = None;
864                                o.status = new_status;
865                                o.last_exit_success = Some(last_exit_success);
866                            })
867                            .build(),
868                    )
869                    .await
870                {
871                    error!("Failed to update daemon state for {id}: {e}");
872                }
873            }
874
875            // --- Phase 2: Fire hooks ---
876            let hook_extra_env = vec![
877                ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
878                ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
879            ];
880
881            // Determine which hooks to fire based on exit reason
882            let hooks_to_fire: Vec<HookType> = match exit_reason {
883                "stop" => vec![HookType::OnStop, HookType::OnExit],
884                "exit" => vec![HookType::OnExit],
885                // "fail": fire on_fail + on_exit only when retries are exhausted
886                _ if hook_retry_count >= hook_retry => {
887                    vec![HookType::OnFail, HookType::OnExit]
888                }
889                _ => vec![],
890            };
891
892            for hook_type in hooks_to_fire {
893                fire_hook(
894                    hook_type,
895                    id.clone(),
896                    daemon_dir.clone(),
897                    hook_retry_count,
898                    hook_daemon_env.clone(),
899                    hook_extra_env.clone(),
900                )
901                .await;
902            }
903        });
904
905        // If wait_ready is true, wait for readiness notification
906        if let Some(ready_rx) = ready_rx {
907            match ready_rx.await {
908                Ok(Ok(())) => {
909                    info!("daemon {id} is ready");
910                    Ok(IpcResponse::DaemonReady { daemon })
911                }
912                Ok(Err(exit_code)) => {
913                    error!("daemon {id} failed before becoming ready");
914                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
915                }
916                Err(_) => {
917                    error!("readiness channel closed unexpectedly for daemon {id}");
918                    Ok(IpcResponse::DaemonStart { daemon })
919                }
920            }
921        } else {
922            Ok(IpcResponse::DaemonStart { daemon })
923        }
924    }
925
926    /// Stop a running daemon
927    pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
928        let pitchfork_id = DaemonId::pitchfork();
929        if *id == pitchfork_id {
930            return Ok(IpcResponse::Error(
931                "Cannot stop supervisor via stop command".into(),
932            ));
933        }
934        info!("stopping daemon: {id}");
935        if let Some(daemon) = self.get_daemon(id).await {
936            trace!("daemon to stop: {daemon}");
937            if let Some(pid) = daemon.pid {
938                trace!("killing pid: {pid}");
939                PROCS.refresh_processes();
940                if PROCS.is_running(pid) {
941                    // First set status to Stopping (preserve PID for monitoring task)
942                    self.upsert_daemon(
943                        UpsertDaemonOpts::builder(id.clone())
944                            .set(|o| {
945                                o.pid = Some(pid);
946                                o.status = DaemonStatus::Stopping;
947                            })
948                            .build(),
949                    )
950                    .await?;
951
952                    // Kill the entire process group atomically (daemon PID == PGID
953                    // because we called setsid() at spawn time)
954                    if let Err(e) = PROCS.kill_process_group_async(pid).await {
955                        debug!("failed to kill pid {pid}: {e}");
956                        // Check if the process is actually stopped despite the error
957                        PROCS.refresh_processes();
958                        if PROCS.is_running(pid) {
959                            // Process still running after kill attempt - set back to Running
960                            debug!("failed to stop pid {pid}: process still running after kill");
961                            self.upsert_daemon(
962                                UpsertDaemonOpts::builder(id.clone())
963                                    .set(|o| {
964                                        o.pid = Some(pid); // Preserve PID to avoid orphaning the process
965                                        o.status = DaemonStatus::Running;
966                                    })
967                                    .build(),
968                            )
969                            .await?;
970                            return Ok(IpcResponse::DaemonStopFailed {
971                                error: format!(
972                                    "process {pid} still running after kill attempt: {e}"
973                                ),
974                            });
975                        }
976                    }
977
978                    // Process successfully stopped
979                    // Note: kill_async uses SIGTERM -> wait ~3s -> SIGKILL strategy,
980                    // and also detects zombie processes, so by the time it returns,
981                    // the process should be fully terminated.
982                    self.upsert_daemon(
983                        UpsertDaemonOpts::builder(id.clone())
984                            .set(|o| {
985                                o.pid = None;
986                                o.status = DaemonStatus::Stopped;
987                                o.last_exit_success = Some(true); // Manual stop is considered successful
988                            })
989                            .build(),
990                    )
991                    .await?;
992                } else {
993                    debug!("pid {pid} not running, process may have exited unexpectedly");
994                    // Process already dead, directly mark as stopped
995                    // Note that the cleanup logic is handled in monitor task
996                    self.upsert_daemon(
997                        UpsertDaemonOpts::builder(id.clone())
998                            .set(|o| {
999                                o.pid = None;
1000                                o.status = DaemonStatus::Stopped;
1001                            })
1002                            .build(),
1003                    )
1004                    .await?;
1005                    return Ok(IpcResponse::DaemonWasNotRunning);
1006                }
1007                Ok(IpcResponse::Ok)
1008            } else {
1009                debug!("daemon {id} not running");
1010                Ok(IpcResponse::DaemonNotRunning)
1011            }
1012        } else {
1013            debug!("daemon {id} not found");
1014            Ok(IpcResponse::DaemonNotFound)
1015        }
1016    }
1017}
1018
1019#[cfg(unix)]
1020fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1021    let settings_user = settings().supervisor.user.trim();
1022    let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1023    let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1024    let configured = daemon_user.or(settings_user);
1025    let current_uid = nix::unistd::Uid::effective().as_raw();
1026    let current_gid = nix::unistd::Gid::effective().as_raw();
1027    resolve_run_identity(
1028        configured,
1029        current_uid,
1030        current_gid,
1031        std::env::var("SUDO_UID").ok().as_deref(),
1032        std::env::var("SUDO_GID").ok().as_deref(),
1033    )
1034}
1035
1036#[cfg(unix)]
1037fn resolve_run_identity(
1038    configured: Option<&str>,
1039    current_uid: u32,
1040    current_gid: u32,
1041    sudo_uid: Option<&str>,
1042    sudo_gid: Option<&str>,
1043) -> Result<RunIdentity> {
1044    let current_uid = nix::unistd::Uid::from_raw(current_uid);
1045    let current_gid = nix::unistd::Gid::from_raw(current_gid);
1046    if let Some(user) = configured {
1047        let identity = resolve_configured_user(user)?;
1048        ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1049        if identity.matches(current_uid, current_gid) {
1050            return Ok(RunIdentity::Inherit);
1051        }
1052        return Ok(identity);
1053    }
1054
1055    if current_uid.is_root()
1056        && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1057    {
1058        return Ok(identity);
1059    }
1060
1061    Ok(RunIdentity::Inherit)
1062}
1063
1064#[cfg(unix)]
1065fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1066    if user.chars().all(|c| c.is_ascii_digit()) {
1067        let uid = user
1068            .parse::<u32>()
1069            .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1070        let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1071            .into_diagnostic()?
1072            .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1073        return run_identity_from_user_record(user_record);
1074    }
1075
1076    let user_record = nix::unistd::User::from_name(user)
1077        .into_diagnostic()?
1078        .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1079    run_identity_from_user_record(user_record)
1080}
1081
1082#[cfg(unix)]
1083fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1084    let username = CString::new(user.name)
1085        .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1086    Ok(RunIdentity::Switch {
1087        uid: user.uid,
1088        gid: user.gid,
1089        username: Some(username),
1090    })
1091}
1092
1093#[cfg(unix)]
1094fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1095    RunIdentity::Switch {
1096        uid: nix::unistd::Uid::from_raw(uid),
1097        gid: nix::unistd::Gid::from_raw(gid),
1098        username,
1099    }
1100}
1101
1102#[cfg(unix)]
1103fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1104    let uid = sudo_uid?.parse::<u32>().ok()?;
1105    let gid = sudo_gid?.parse::<u32>().ok()?;
1106    let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1107        .ok()
1108        .flatten()
1109        .and_then(|u| CString::new(u.name).ok());
1110    Some(run_identity_from_raw_ids(uid, gid, username))
1111}
1112
1113#[cfg(unix)]
1114fn ensure_can_use_identity(
1115    configured_user: &str,
1116    identity: &RunIdentity,
1117    current_uid: nix::unistd::Uid,
1118    current_gid: nix::unistd::Gid,
1119) -> Result<()> {
1120    let RunIdentity::Switch { uid, gid, .. } = identity else {
1121        return Ok(());
1122    };
1123    if *uid == current_uid && *gid == current_gid {
1124        return Ok(());
1125    }
1126    if current_uid.is_root() {
1127        return Ok(());
1128    }
1129    Err(miette::miette!(
1130        "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1131        configured_user,
1132        current_uid.as_raw(),
1133        current_gid.as_raw(),
1134        uid.as_raw(),
1135        gid.as_raw()
1136    ))
1137}
1138
1139#[cfg(unix)]
1140fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1141    let RunIdentity::Switch { uid, gid, username } = identity else {
1142        return Ok(());
1143    };
1144    if let Some(username) = username {
1145        initgroups_for_user(username, *gid)?;
1146    } else {
1147        setgroups_to_primary(*gid)?;
1148    }
1149    nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1150    nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1151    Ok(())
1152}
1153
1154#[cfg(unix)]
1155impl RunIdentity {
1156    fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1157        matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1158    }
1159}
1160
1161#[cfg(unix)]
1162fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1163    let groups = [gid.as_raw() as libc::gid_t];
1164    #[cfg(any(target_os = "linux", target_os = "android"))]
1165    let group_count = groups.len();
1166    #[cfg(not(any(target_os = "linux", target_os = "android")))]
1167    let group_count = groups.len() as libc::c_int;
1168    let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1169    if rc == -1 {
1170        Err(std::io::Error::last_os_error())
1171    } else {
1172        Ok(())
1173    }
1174}
1175
1176#[cfg(unix)]
1177fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1178    let gid = gid.as_raw();
1179    #[cfg(any(
1180        target_os = "macos",
1181        target_os = "ios",
1182        target_os = "tvos",
1183        target_os = "watchos"
1184    ))]
1185    let base_gid = i32::try_from(gid)
1186        .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1187
1188    #[cfg(not(any(
1189        target_os = "macos",
1190        target_os = "ios",
1191        target_os = "tvos",
1192        target_os = "watchos"
1193    )))]
1194    let base_gid = gid as libc::gid_t;
1195
1196    // SAFETY: `username` is a valid nul-terminated C string and `base_gid`
1197    // is derived from a resolved system account or sudo-provided gid.
1198    let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1199    if rc == -1 {
1200        Err(std::io::Error::last_os_error())
1201    } else {
1202        Ok(())
1203    }
1204}
1205
1206#[cfg(unix)]
1207fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1208    std::io::Error::from_raw_os_error(err as i32)
1209}
1210
1211/// Check if multiple ports are available and optionally auto-bump to find available ports.
1212///
1213/// All ports are bumped by the same offset to maintain relative port spacing.
1214/// Returns the resolved ports (either the original or bumped ones).
1215/// Returns an error if any port is in use and auto_bump is disabled,
1216/// or if no available ports can be found after max attempts.
1217async fn check_ports_available(
1218    expected_ports: &[u16],
1219    auto_bump: bool,
1220    max_attempts: u32,
1221) -> Result<Vec<u16>> {
1222    if expected_ports.is_empty() {
1223        return Ok(Vec::new());
1224    }
1225
1226    for bump_offset in 0..=max_attempts {
1227        // Use wrapping_add to handle overflow correctly - ports wrap around at 65535
1228        let candidate_ports: Vec<u16> = expected_ports
1229            .iter()
1230            .map(|&p| p.wrapping_add(bump_offset as u16))
1231            .collect();
1232
1233        // Check if all ports in this set are available
1234        let mut all_available = true;
1235        let mut conflicting_port = None;
1236
1237        for &port in &candidate_ports {
1238            // Port 0 is a special case - it requests an ephemeral port from the OS.
1239            // Skip the availability check for port 0 since binding to it always succeeds.
1240            if port == 0 {
1241                continue;
1242            }
1243
1244            // Use spawn_blocking to avoid blocking the async runtime during TCP bind checks.
1245            //
1246            // We check multiple addresses to avoid false-negatives caused by SO_REUSEADDR.
1247            // On macOS/BSD, Rust's TcpListener::bind sets SO_REUSEADDR by default, which
1248            // allows binding 0.0.0.0:port even when 127.0.0.1:port is already in use
1249            // (because 0.0.0.0 is technically a different address).  Most daemons bind
1250            // to localhost, so checking 127.0.0.1 is essential to detect real conflicts.
1251            // We also check [::1] to cover IPv6 loopback listeners.
1252            //
1253            // NOTE: This check has a time-of-check-to-time-of-use (TOCTOU) race condition.
1254            // Another process could grab the port between our check and the daemon actually
1255            // binding. This is inherent to the approach and acceptable for our use case
1256            // since we're primarily detecting conflicts with already-running daemons.
1257            if is_port_in_use(port).await {
1258                all_available = false;
1259                conflicting_port = Some(port);
1260                break;
1261            }
1262        }
1263
1264        if all_available {
1265            // Check for overflow (port wrapped around to 0 due to wrapping_add)
1266            // If any candidate port is 0 but the original expected port wasn't 0,
1267            // it means we've wrapped around and should stop
1268            if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1269                return Err(PortError::NoAvailablePort {
1270                    start_port: expected_ports[0],
1271                    attempts: bump_offset + 1,
1272                }
1273                .into());
1274            }
1275            if bump_offset > 0 {
1276                info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1277            }
1278            return Ok(candidate_ports);
1279        }
1280
1281        // Port is in use
1282        if bump_offset == 0 && !auto_bump {
1283            if let Some(port) = conflicting_port {
1284                let (pid, process) = identify_port_owner(port).await;
1285                return Err(PortError::InUse { port, process, pid }.into());
1286            }
1287        }
1288    }
1289
1290    // No available ports found after max attempts
1291    Err(PortError::NoAvailablePort {
1292        start_port: expected_ports[0],
1293        attempts: max_attempts + 1,
1294    }
1295    .into())
1296}
1297
1298/// Check whether a port is currently in use by attempting to bind on multiple addresses.
1299///
1300/// Returns `true` when at least one bind attempt gets `AddrInUse`, meaning another
1301/// process is listening.  Other errors (e.g. `AddrNotAvailable` on an address family
1302/// the OS doesn't support) are ignored so they don't produce false positives.
1303async fn is_port_in_use(port: u16) -> bool {
1304    tokio::task::spawn_blocking(move || {
1305        for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1306            match std::net::TcpListener::bind((addr, port)) {
1307                Ok(listener) => drop(listener),
1308                Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1309                Err(_) => continue,
1310            }
1311        }
1312        false
1313    })
1314    .await
1315    .unwrap_or(false)
1316}
1317
1318/// Best-effort lookup of the process occupying a port via `listeners::get_all()`.
1319///
1320/// Returns `(pid, process_name)`.  Falls back to `(0, "unknown")` when the
1321/// system call fails (permission error, unsupported OS, etc.).
1322async fn identify_port_owner(port: u16) -> (u32, String) {
1323    tokio::task::spawn_blocking(move || {
1324        listeners::get_all()
1325            .ok()
1326            .and_then(|list| {
1327                list.into_iter()
1328                    .find(|l| l.socket.port() == port)
1329                    .map(|l| (l.process.pid, l.process.name))
1330            })
1331            .unwrap_or((0, "unknown".to_string()))
1332    })
1333    .await
1334    .unwrap_or((0, "unknown".to_string()))
1335}
1336
1337/// Detect whether a port is in use, and if so, identify the owning process.
1338///
1339/// Combines `is_port_in_use` (reliable bind probe) with `identify_port_owner`
1340/// (best-effort process lookup).  Returns `None` when the port is free.
1341async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1342    if !is_port_in_use(port).await {
1343        return None;
1344    }
1345    Some(identify_port_owner(port).await)
1346}
1347
1348/// Spawn a background task that detects the first port the daemon process is listening on
1349/// and stores it in the state file as `active_port`.
1350///
1351/// This is called once when the daemon becomes ready. The port is cleared when the daemon stops.
1352///
1353/// Port selection strategy:
1354/// 1. If the daemon has `expected_port` configured, prefer the first port from that list
1355///    (it is the port the operator explicitly designated as the primary service port).
1356/// 2. Otherwise, take the first port the process is actually listening on (in the order
1357///    returned by the OS), which is typically the port bound earliest.
1358///
1359/// Using `min()` (lowest port number) was previously used here but is incorrect: many
1360/// applications listen on multiple ports (e.g. HTTP + metrics) and the lowest-numbered
1361/// port is not necessarily the primary service port.
1362fn detect_and_store_active_port(id: DaemonId, pid: u32) {
1363    tokio::spawn(async move {
1364        // Retry with exponential backoff so that slow-starting daemons (JVM,
1365        // Node.js, Python, etc.) that take more than 500 ms to bind their port
1366        // are still detected.  Total wait budget: 500+1000+2000+4000 = 7.5 s.
1367        for delay_ms in [500u64, 1000, 2000, 4000] {
1368            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1369
1370            // Read daemon state atomically: check if still alive and get expected_port
1371            // in a single lock acquisition to avoid TOCTOU and unnecessary lock overhead.
1372            let expected_port: Option<u16> = {
1373                let state_file = SUPERVISOR.state_file.lock().await;
1374                match state_file.daemons.get(&id) {
1375                    Some(d) if d.pid.is_none() => {
1376                        debug!("daemon {id}: aborting active_port detection — process exited");
1377                        return;
1378                    }
1379                    Some(d) => d.expected_port.first().copied().filter(|&p| p > 0),
1380                    None => None,
1381                }
1382            };
1383
1384            let active_port = tokio::task::spawn_blocking(move || {
1385                let listeners = listeners::get_all().ok()?;
1386                let process_ports: Vec<u16> = listeners
1387                    .into_iter()
1388                    .filter(|listener| listener.process.pid == pid)
1389                    .map(|listener| listener.socket.port())
1390                    .filter(|&port| port > 0)
1391                    .collect();
1392
1393                if process_ports.is_empty() {
1394                    return None;
1395                }
1396
1397                // Prefer the configured expected_port if the process is actually
1398                // listening on it; otherwise fall back to the first port found.
1399                if let Some(ep) = expected_port {
1400                    if process_ports.contains(&ep) {
1401                        return Some(ep);
1402                    }
1403                }
1404
1405                // No expected_port match — return the first port in the list.
1406                // The list order reflects the order the OS reports listeners,
1407                // which is generally the order they were bound (earliest first).
1408                // Do NOT sort: the lowest-numbered port is not necessarily the
1409                // primary service port (e.g. HTTP vs metrics).
1410                process_ports.into_iter().next()
1411            })
1412            .await
1413            .ok()
1414            .flatten();
1415
1416            if let Some(port) = active_port {
1417                debug!("daemon {id} active_port detected: {port}");
1418                let mut state_file = SUPERVISOR.state_file.lock().await;
1419                if let Some(d) = state_file.daemons.get_mut(&id) {
1420                    // Guard against PID reuse: if the original process exited and the OS
1421                    // assigned the same PID to an unrelated process that happens to bind
1422                    // a port, we must not route proxy traffic to that unrelated service.
1423                    if d.pid == Some(pid) {
1424                        d.active_port = Some(port);
1425                    } else {
1426                        debug!(
1427                            "daemon {id}: skipping active_port write — PID mismatch \
1428                             (expected {pid}, current {:?})",
1429                            d.pid
1430                        );
1431                        return;
1432                    }
1433                }
1434                if let Err(e) = state_file.write() {
1435                    debug!("Failed to write state after detecting active_port for {id}: {e}");
1436                }
1437                return;
1438            }
1439
1440            debug!("daemon {id}: no active port detected for pid {pid} (will retry)");
1441        }
1442
1443        debug!("daemon {id}: active port detection exhausted all retries for pid {pid}");
1444    });
1445}
1446
1447/// Check whether a daemon (by its qualified ID) is the target of any registered
1448/// slug in the global config.  This is used to decide whether to run the
1449/// `detect_and_store_active_port` polling task — only slug-targeted daemons need
1450/// it, avoiding wasted `listeners::get_all()` calls for port-less daemons.
1451///
1452/// Delegates to `proxy::server::is_slug_target()` which uses the same in-memory
1453/// slug cache as the proxy hot path, so this check is cheap.
1454fn is_daemon_slug_target(id: &DaemonId) -> bool {
1455    // read_global_slugs is called once per daemon start — acceptable cost.
1456    // We intentionally avoid making this async to keep has_port_config evaluation
1457    // simple and synchronous in run_once().
1458    let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1459    slugs.iter().any(|(slug, entry)| {
1460        let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1461        id.name() == daemon_name
1462    })
1463}
1464
#[cfg(all(test, unix))]
mod tests {
    //! Unit tests for run-identity resolution (`resolve_run_identity` and
    //! `resolve_configured_user`); `RunIdentity` only exists on Unix.
    use super::*;

    /// Assert that `identity` is a switch to uid 0 whose username is "root".
    fn assert_switches_to_root(identity: RunIdentity) {
        match identity {
            RunIdentity::Switch { uid, username, .. } => {
                assert_eq!(uid.as_raw(), 0);
                let name = username.and_then(|raw| raw.into_string().ok());
                assert_eq!(name.as_deref(), Some("root"));
            }
            RunIdentity::Inherit => panic!("expected identity switch"),
        }
    }

    #[test]
    fn test_resolve_run_identity_empty_without_sudo() {
        assert_eq!(
            RunIdentity::Inherit,
            resolve_run_identity(None, 501, 20, None, None).unwrap()
        );
    }

    #[test]
    fn test_resolve_run_identity_sudo_fallback() {
        match resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap() {
            RunIdentity::Switch { uid, gid, .. } => {
                assert_eq!(uid.as_raw(), 501);
                assert_eq!(gid.as_raw(), 20);
            }
            RunIdentity::Inherit => panic!("expected identity switch"),
        }
    }

    #[test]
    fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
        let resolved = resolve_run_identity(None, 501, 20, Some("0"), Some("0")).unwrap();
        assert_eq!(RunIdentity::Inherit, resolved);
    }

    #[test]
    fn test_resolve_configured_user_root_name() {
        assert_switches_to_root(resolve_configured_user("root").unwrap());
    }

    #[test]
    fn test_resolve_configured_user_root_uid() {
        assert_switches_to_root(resolve_configured_user("0").unwrap());
    }

    #[test]
    fn test_resolve_configured_user_missing_user_fails() {
        let message = resolve_configured_user("pitchfork-user-that-should-not-exist")
            .unwrap_err()
            .to_string();
        assert!(message.contains("does not exist"));
    }

    #[test]
    fn test_resolve_run_identity_requires_root_for_user_switch() {
        let message = resolve_run_identity(Some("root"), 501, 20, None, None)
            .unwrap_err()
            .to_string();
        assert!(message.contains("Restart the supervisor with sudo"));
    }

    #[test]
    fn test_resolve_run_identity_same_user_is_noop() {
        let resolved =
            resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20")).unwrap();
        assert_eq!(RunIdentity::Inherit, resolved);
    }
}