Skip to main content

pitchfork_cli/supervisor/
lifecycle.rs

1//! Daemon lifecycle management - start/stop operations
2//!
3//! Contains the core `run()`, `run_once()`, and `stop()` methods for daemon process management.
4
5use super::hooks::{HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::IntoDiagnostic;
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22use std::iter::once;
23use std::sync::atomic;
24use std::time::Duration;
25use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
26use tokio::select;
27use tokio::sync::oneshot;
28use tokio::time;
29
30/// Cache for compiled regex patterns to avoid recompilation on daemon restarts
31static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
32    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
33
34/// Get or compile a regex pattern, caching the result for future use
35pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
36    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
37    if let Some(re) = cache.get(pattern) {
38        return Some(re.clone());
39    }
40    match Regex::new(pattern) {
41        Ok(re) => {
42            cache.insert(pattern.to_string(), re.clone());
43            Some(re)
44        }
45        Err(e) => {
46            error!("invalid regex pattern '{pattern}': {e}");
47            None
48        }
49    }
50}
51
52impl Supervisor {
53    /// Run a daemon, handling retries if configured
54    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
55        let id = &opts.id;
56        let cmd = opts.cmd.clone();
57
58        // Clear any pending autostop for this daemon since it's being started
59        {
60            let mut pending = self.pending_autostops.lock().await;
61            if pending.remove(id).is_some() {
62                info!("cleared pending autostop for {id} (daemon starting)");
63            }
64        }
65
66        let daemon = self.get_daemon(id).await;
67        if let Some(daemon) = daemon {
68            // Stopping state is treated as "not running" - the monitoring task will clean it up
69            // Only check for Running state with a valid PID
70            if !daemon.status.is_stopping()
71                && !daemon.status.is_stopped()
72                && let Some(pid) = daemon.pid
73            {
74                if opts.force {
75                    self.stop(id).await?;
76                    info!("run: stop completed for daemon {id}");
77                } else {
78                    warn!("daemon {id} already running with pid {pid}");
79                    return Ok(IpcResponse::DaemonAlreadyRunning);
80                }
81            }
82        }
83
84        // If wait_ready is true and retry is configured, implement retry loop
85        if opts.wait_ready && opts.retry > 0 {
86            // Use saturating_add to avoid overflow when retry = u32::MAX (infinite)
87            let max_attempts = opts.retry.saturating_add(1);
88            for attempt in 0..max_attempts {
89                let mut retry_opts = opts.clone();
90                retry_opts.retry_count = attempt;
91                retry_opts.cmd = cmd.clone();
92
93                let result = self.run_once(retry_opts).await?;
94
95                match result {
96                    IpcResponse::DaemonReady { daemon } => {
97                        return Ok(IpcResponse::DaemonReady { daemon });
98                    }
99                    IpcResponse::DaemonFailedWithCode { exit_code } => {
100                        if attempt < opts.retry {
101                            let backoff_secs = 2u64.pow(attempt);
102                            info!(
103                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
104                                attempt + 1,
105                                max_attempts,
106                                backoff_secs
107                            );
108                            fire_hook(
109                                HookType::OnRetry,
110                                id.clone(),
111                                opts.dir.clone(),
112                                attempt + 1,
113                                opts.env.clone(),
114                                vec![],
115                            )
116                            .await;
117                            time::sleep(Duration::from_secs(backoff_secs)).await;
118                            continue;
119                        } else {
120                            info!("daemon {id} failed after {max_attempts} attempts");
121                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
122                        }
123                    }
124                    other => return Ok(other),
125                }
126            }
127        }
128
129        // No retry or wait_ready is false
130        self.run_once(opts).await
131    }
132
133    /// Run a daemon once (single attempt)
134    pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
135        let id = &opts.id;
136        let original_cmd = opts.cmd.clone(); // Save original command for persistence
137        let cmd = opts.cmd;
138
139        // Create channel for readiness notification if wait_ready is true
140        let (ready_tx, ready_rx) = if opts.wait_ready {
141            let (tx, rx) = oneshot::channel();
142            (Some(tx), Some(rx))
143        } else {
144            (None, None)
145        };
146
147        // Check port availability and apply auto-bump if configured
148        let expected_ports = opts.expected_port.clone();
149        let (resolved_ports, effective_ready_port) = if !opts.expected_port.is_empty() {
150            match check_ports_available(
151                &opts.expected_port,
152                opts.auto_bump_port,
153                opts.port_bump_attempts,
154            )
155            .await
156            {
157                Ok(resolved) => {
158                    let ready_port = if let Some(configured_port) = opts.ready_port {
159                        // If ready_port matches one of the expected ports, apply the same bump offset
160                        let bump_offset = resolved
161                            .first()
162                            .unwrap_or(&0)
163                            .saturating_sub(*opts.expected_port.first().unwrap_or(&0));
164                        if opts.expected_port.contains(&configured_port) && bump_offset > 0 {
165                            configured_port
166                                .checked_add(bump_offset)
167                                .or(Some(configured_port))
168                        } else {
169                            Some(configured_port)
170                        }
171                    } else if opts.ready_output.is_none()
172                        && opts.ready_http.is_none()
173                        && opts.ready_cmd.is_none()
174                        && opts.ready_delay.is_none()
175                    {
176                        // No other ready check configured — use the first expected port as a
177                        // TCP port readiness check so the daemon is considered ready once it
178                        // starts listening.  Skip port 0 (ephemeral port request).
179                        resolved.first().copied().filter(|&p| p != 0)
180                    } else {
181                        // Another ready check is configured (output/http/cmd/delay).
182                        // Don't add an implicit TCP port check — it could race and fire
183                        // before the daemon has produced any output.
184                        None
185                    };
186                    info!(
187                        "daemon {id}: ports {:?} resolved to {:?}",
188                        opts.expected_port, resolved
189                    );
190                    (resolved, ready_port)
191                }
192                Err(e) => {
193                    error!("daemon {id}: port check failed: {e}");
194                    // Convert PortError to structured IPC response
195                    if let Some(port_error) = e.downcast_ref::<PortError>() {
196                        match port_error {
197                            PortError::InUse { port, process, pid } => {
198                                return Ok(IpcResponse::PortConflict {
199                                    port: *port,
200                                    process: process.clone(),
201                                    pid: *pid,
202                                });
203                            }
204                            PortError::NoAvailablePort {
205                                start_port,
206                                attempts,
207                            } => {
208                                return Ok(IpcResponse::NoAvailablePort {
209                                    start_port: *start_port,
210                                    attempts: *attempts,
211                                });
212                            }
213                        }
214                    }
215                    return Ok(IpcResponse::DaemonFailed {
216                        error: e.to_string(),
217                    });
218                }
219            }
220        } else {
221            // When ready_port is set without expected_port, check that the port
222            // is not already occupied.  If another process is listening on it,
223            // the TCP readiness probe would immediately succeed and pitchfork
224            // would falsely consider the daemon ready — routing proxy traffic to
225            // the wrong process.
226            if let Some(port) = opts.ready_port {
227                if port > 0 {
228                    if let Some((pid, process)) = detect_port_conflict(port).await {
229                        return Ok(IpcResponse::PortConflict { port, process, pid });
230                    }
231                }
232            }
233            (Vec::new(), opts.ready_port)
234        };
235
236        let cmd: Vec<String> = if opts.mise.unwrap_or(settings().general.mise) {
237            match settings().resolve_mise_bin() {
238                Some(mise_bin) => {
239                    let mise_bin_str = mise_bin.to_string_lossy().to_string();
240                    info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
241                    once("exec".to_string())
242                        .chain(once(mise_bin_str))
243                        .chain(once("x".to_string()))
244                        .chain(once("--".to_string()))
245                        .chain(cmd)
246                        .collect_vec()
247                }
248                None => {
249                    warn!("daemon {id}: mise=true but mise binary not found, running without mise");
250                    once("exec".to_string()).chain(cmd).collect_vec()
251                }
252            }
253        } else {
254            once("exec".to_string()).chain(cmd).collect_vec()
255        };
256        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
257        let log_path = id.log_path();
258        if let Some(parent) = log_path.parent() {
259            xx::file::mkdirp(parent)?;
260        }
261        info!("run: spawning daemon {id} with args: {args:?}");
262        let mut cmd = tokio::process::Command::new("sh");
263        cmd.args(&args)
264            .stdin(std::process::Stdio::null())
265            .stdout(std::process::Stdio::piped())
266            .stderr(std::process::Stdio::piped())
267            .current_dir(&opts.dir);
268
269        // Ensure daemon can find user tools by using the original PATH
270        if let Some(ref path) = *env::ORIGINAL_PATH {
271            cmd.env("PATH", path);
272        }
273
274        // Apply custom environment variables from config
275        if let Some(ref env_vars) = opts.env {
276            cmd.envs(env_vars);
277        }
278
279        // Inject pitchfork metadata env vars AFTER user env so they can't be overwritten
280        cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
281        cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
282        cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
283
284        // Inject the resolved ports for the daemon to use
285        if !resolved_ports.is_empty() {
286            // Set PORT to the first port for backward compatibility
287            // When there's only one port, both PORT and PORT0 will be set to the same value.
288            // This follows the convention used by many deployment platforms (Heroku, etc.).
289            cmd.env("PORT", resolved_ports[0].to_string());
290            // Set individual ports as PORT0, PORT1, etc.
291            for (i, port) in resolved_ports.iter().enumerate() {
292                cmd.env(format!("PORT{i}"), port.to_string());
293            }
294        }
295
296        #[cfg(unix)]
297        {
298            unsafe {
299                cmd.pre_exec(move || {
300                    if libc::setsid() == -1 {
301                        return Err(std::io::Error::last_os_error());
302                    }
303                    Ok(())
304                });
305            }
306        }
307
308        let mut child = cmd.spawn().into_diagnostic()?;
309        let pid = match child.id() {
310            Some(p) => p,
311            None => {
312                warn!("Daemon {id} exited before PID could be captured");
313                return Ok(IpcResponse::DaemonFailed {
314                    error: "Process exited immediately".to_string(),
315                });
316            }
317        };
318        info!("started daemon {id} with pid {pid}");
319        let daemon = self
320            .upsert_daemon(
321                UpsertDaemonOpts::builder(id.clone())
322                    .set(|o| {
323                        o.pid = Some(pid);
324                        o.status = DaemonStatus::Running;
325                        o.shell_pid = opts.shell_pid;
326                        o.dir = Some(opts.dir.clone());
327                        o.cmd = Some(original_cmd);
328                        o.autostop = opts.autostop;
329                        o.cron_schedule = opts.cron_schedule.clone();
330                        o.cron_retrigger = opts.cron_retrigger;
331                        o.retry = Some(opts.retry);
332                        o.retry_count = Some(opts.retry_count);
333                        o.ready_delay = opts.ready_delay;
334                        o.ready_output = opts.ready_output.clone();
335                        o.ready_http = opts.ready_http.clone();
336                        o.ready_port = effective_ready_port;
337                        o.ready_cmd = opts.ready_cmd.clone();
338                        o.expected_port = expected_ports;
339                        o.resolved_port = resolved_ports;
340                        o.auto_bump_port = Some(opts.auto_bump_port);
341                        o.port_bump_attempts = Some(opts.port_bump_attempts);
342                        o.depends = Some(opts.depends.clone());
343                        o.env = opts.env.clone();
344                        o.watch = Some(opts.watch.clone());
345                        o.watch_base_dir = opts.watch_base_dir.clone();
346                        o.mise = opts.mise;
347                        o.memory_limit = opts.memory_limit;
348                        o.cpu_limit = opts.cpu_limit;
349                    })
350                    .build(),
351            )
352            .await?;
353
354        let id_clone = id.clone();
355        let ready_delay = opts.ready_delay;
356        let ready_output = opts.ready_output.clone();
357        let ready_http = opts.ready_http.clone();
358        let ready_port = effective_ready_port;
359        let ready_cmd = opts.ready_cmd.clone();
360        let daemon_dir = opts.dir.clone();
361        let hook_retry_count = opts.retry_count;
362        let hook_retry = opts.retry;
363        let hook_daemon_env = opts.env.clone();
364        // Whether this daemon has any port-related config — used to skip the
365        // active_port detection task for daemons that never bind a port (e.g. `sleep 60`).
366        // When the proxy is enabled, only detect active_port for daemons that are
367        // actually referenced by a registered slug, rather than blanket-polling every
368        // daemon (which wastes ~7.5 s of listeners::get_all() calls per port-less daemon).
369        let has_port_config = !opts.expected_port.is_empty()
370            || (settings().proxy.enable && is_daemon_slug_target(id));
371        let daemon_pid = pid;
372
373        tokio::spawn(async move {
374            let id = id_clone;
375            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
376                (Some(out), Some(err)) => (out, err),
377                _ => {
378                    error!("Failed to capture stdout/stderr for daemon {id}");
379                    return;
380                }
381            };
382            let mut stdout = tokio::io::BufReader::new(stdout).lines();
383            let mut stderr = tokio::io::BufReader::new(stderr).lines();
384            let log_file = match tokio::fs::File::options()
385                .append(true)
386                .create(true)
387                .open(&log_path)
388                .await
389            {
390                Ok(f) => f,
391                Err(e) => {
392                    error!("Failed to open log file for daemon {id}: {e}");
393                    return;
394                }
395            };
396            let mut log_appender = BufWriter::new(log_file);
397
398            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
399            let format_line = |line: String| {
400                if line.starts_with(&format!("{id} ")) {
401                    // mise tasks often already have the id printed
402                    format!("{} {line}\n", now())
403                } else {
404                    format!("{} {id} {line}\n", now())
405                }
406            };
407
408            // Setup readiness checking
409            let mut ready_notified = false;
410            let mut ready_tx = ready_tx;
411            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
412            // Track whether we've already spawned the active_port detection task
413            let mut active_port_spawned = false;
414
415            let mut delay_timer =
416                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
417
418            // Get settings for intervals
419            let s = settings();
420            let ready_check_interval = s.supervisor_ready_check_interval();
421            let http_client_timeout = s.supervisor_http_client_timeout();
422            let log_flush_interval_duration = s.supervisor_log_flush_interval();
423
424            // Setup HTTP readiness check interval
425            let mut http_check_interval = ready_http
426                .as_ref()
427                .map(|_| tokio::time::interval(ready_check_interval));
428            let http_client = ready_http.as_ref().map(|_| {
429                reqwest::Client::builder()
430                    .timeout(http_client_timeout)
431                    .build()
432                    .unwrap_or_default()
433            });
434
435            // Setup TCP port readiness check interval
436            let mut port_check_interval =
437                ready_port.map(|_| tokio::time::interval(ready_check_interval));
438
439            // Setup command readiness check interval
440            let mut cmd_check_interval = ready_cmd
441                .as_ref()
442                .map(|_| tokio::time::interval(ready_check_interval));
443
444            // Setup periodic log flush interval
445            let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);
446
447            // Use a channel to communicate process exit status
448            let (exit_tx, mut exit_rx) =
449                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
450
451            // Spawn a task to wait for process exit
452            let child_pid = child.id().unwrap_or(0);
453            tokio::spawn(async move {
454                let result = child.wait().await;
455                // On non-Linux Unix (e.g. macOS) the zombie reaper may win the
456                // race and consume the exit status via waitpid(None, WNOHANG)
457                // before Tokio's child.wait() gets to it. When that happens,
458                // Tokio returns an ECHILD io::Error. We recover by checking
459                // REAPED_STATUSES for the stashed exit code.
460                //
461                // On Linux this is unnecessary because the reaper uses
462                // waitid(WNOWAIT) to peek before reaping, which avoids the
463                // race entirely.
464                #[cfg(all(unix, not(target_os = "linux")))]
465                let result = match &result {
466                    Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
467                        if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
468                            warn!(
469                                "daemon pid {child_pid} wait() got ECHILD; \
470                                 recovered exit code {code} from zombie reaper"
471                            );
472                            // Synthesize an ExitStatus from the stashed code.
473                            // On Unix we can use `ExitStatus::from_raw()` with
474                            // a wait-style status word (code << 8 for normal
475                            // exit, or raw signal number for signal death).
476                            use std::os::unix::process::ExitStatusExt;
477                            if code >= 0 {
478                                Ok(std::process::ExitStatus::from_raw(code << 8))
479                            } else {
480                                // Negative code means killed by signal (-sig)
481                                Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
482                            }
483                        } else {
484                            warn!(
485                                "daemon pid {child_pid} wait() got ECHILD but no \
486                                 stashed status found; reporting as error"
487                            );
488                            result
489                        }
490                    }
491                    _ => result,
492                };
493                debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
494                let _ = exit_tx.send(result).await;
495            });
496
497            #[allow(unused_assignments)]
498            // Initial None is a safety net; loop only exits via exit_rx.recv() which sets it
499            let mut exit_status = None;
500
501            // If there is no ready check of any kind and no delay, the daemon is
502            // considered immediately ready and the active_port detection task would
503            // never be triggered inside the select loop.  Kick it off right away so
504            // that daemons without any readiness configuration still get their
505            // active_port populated (needed for proxy routing).
506            if has_port_config
507                && ready_pattern.is_none()
508                && ready_http.is_none()
509                && ready_port.is_none()
510                && ready_cmd.is_none()
511                && delay_timer.is_none()
512            {
513                active_port_spawned = true;
514                detect_and_store_active_port(id.clone(), daemon_pid);
515            }
516
517            loop {
518                select! {
519                                Ok(Some(line)) = stdout.next_line() => {
520                                    let formatted = format_line(line.clone());
521                                    if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
522                                        error!("Failed to write to log for daemon {id}: {e}");
523                                    }
524                                    trace!("stdout: {id} {formatted}");
525
526                        // Check if output matches ready pattern
527                        if !ready_notified
528                            && let Some(ref pattern) = ready_pattern
529                            && pattern.is_match(&line)
530                        {
531                            info!("daemon {id} ready: output matched pattern");
532                            ready_notified = true;
533                            let _ = log_appender.flush().await;
534                            if let Some(tx) = ready_tx.take() {
535                                let _ = tx.send(Ok(()));
536                            }
537                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
538                            if !active_port_spawned && has_port_config {
539                                active_port_spawned = true;
540                                detect_and_store_active_port(id.clone(), daemon_pid);
541                            }
542                        }
543                    }
544                    Ok(Some(line)) = stderr.next_line() => {
545                        let formatted = format_line(line.clone());
546                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
547                            error!("Failed to write to log for daemon {id}: {e}");
548                        }
549                        trace!("stderr: {id} {formatted}");
550
551                        if !ready_notified
552                            && let Some(ref pattern) = ready_pattern
553                            && pattern.is_match(&line)
554                        {
555                            info!("daemon {id} ready: output matched pattern");
556                            ready_notified = true;
557                            let _ = log_appender.flush().await;
558                            if let Some(tx) = ready_tx.take() {
559                                let _ = tx.send(Ok(()));
560                            }
561                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
562                            if !active_port_spawned && has_port_config {
563                                active_port_spawned = true;
564                                detect_and_store_active_port(id.clone(), daemon_pid);
565                            }
566                        }
567                    },
568                    Some(result) = exit_rx.recv() => {
569                        // Process exited - save exit status and notify if not ready yet
570                        exit_status = Some(result);
571                        debug!("daemon {id} process exited, exit_status: {exit_status:?}");
572                        // Flush logs before notifying so clients see logs immediately
573                        let _ = log_appender.flush().await;
574                        if !ready_notified {
575                            if let Some(tx) = ready_tx.take() {
576                                // Check if process exited successfully
577                                let is_success = exit_status.as_ref()
578                                    .and_then(|r| r.as_ref().ok())
579                                    .map(|s| s.success())
580                                    .unwrap_or(false);
581
582                                if is_success {
583                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
584                                    let _ = tx.send(Ok(()));
585                                } else {
586                                    let exit_code = exit_status.as_ref()
587                                        .and_then(|r| r.as_ref().ok())
588                                        .and_then(|s| s.code());
589                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
590                                    let _ = tx.send(Err(exit_code));
591                                }
592                            }
593                        } else {
594                            debug!("daemon {id} was already marked ready, not sending notification");
595                        }
596                        break;
597                    },
598                    _ = async {
599                        if let Some(ref mut interval) = http_check_interval {
600                            interval.tick().await;
601                        } else {
602                            std::future::pending::<()>().await;
603                        }
604                    }, if !ready_notified && ready_http.is_some() => {
605                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
606                            match client.get(url).send().await {
607                                Ok(response) if response.status().is_success() => {
608                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
609                                    ready_notified = true;
610                                    let _ = log_appender.flush().await;
611                                    if let Some(tx) = ready_tx.take() {
612                                        let _ = tx.send(Ok(()));
613                                    }
614                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
615                                    http_check_interval = None;
616                                    if !active_port_spawned && has_port_config {
617                                        active_port_spawned = true;
618                                        detect_and_store_active_port(id.clone(), daemon_pid);
619                                    }
620                                }
621                                Ok(response) => {
622                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
623                                }
624                                Err(e) => {
625                                    trace!("daemon {id} HTTP check failed: {e}");
626                                }
627                            }
628                        }
629                    }
630                    _ = async {
631                        if let Some(ref mut interval) = port_check_interval {
632                            interval.tick().await;
633                        } else {
634                            std::future::pending::<()>().await;
635                        }
636                    }, if !ready_notified && ready_port.is_some() => {
637                        if let Some(port) = ready_port {
638                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
639                                Ok(_) => {
640                                    info!("daemon {id} ready: TCP port {port} is listening");
641                                    ready_notified = true;
642                                    // Flush logs before notifying so clients see logs immediately
643                                    let _ = log_appender.flush().await;
644                                    if let Some(tx) = ready_tx.take() {
645                                        let _ = tx.send(Ok(()));
646                                    }
647                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
648                                    // Stop checking once ready
649                                    port_check_interval = None;
650                                    if !active_port_spawned && has_port_config {
651                                        active_port_spawned = true;
652                                        detect_and_store_active_port(id.clone(), daemon_pid);
653                                    }
654                                }
655                                Err(_) => {
656                                    trace!("daemon {id} port check: port {port} not listening yet");
657                                }
658                            }
659                        }
660                    }
661                    _ = async {
662                        if let Some(ref mut interval) = cmd_check_interval {
663                            interval.tick().await;
664                        } else {
665                            std::future::pending::<()>().await;
666                        }
667                    }, if !ready_notified && ready_cmd.is_some() => {
668                        if let Some(ref cmd) = ready_cmd {
669                            // Run the readiness check command using the shell abstraction
670                            let mut command = Shell::default_for_platform().command(cmd);
671                            command
672                                .current_dir(&daemon_dir)
673                                .stdout(std::process::Stdio::null())
674                                .stderr(std::process::Stdio::null());
675                            let result: std::io::Result<std::process::ExitStatus> = command.status().await;
676                            match result {
677                                Ok(status) if status.success() => {
678                                    info!("daemon {id} ready: readiness command succeeded");
679                                    ready_notified = true;
680                                    let _ = log_appender.flush().await;
681                                    if let Some(tx) = ready_tx.take() {
682                                        let _ = tx.send(Ok(()));
683                                    }
684                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
685                                    // Stop checking once ready
686                                    cmd_check_interval = None;
687                                    if !active_port_spawned && has_port_config {
688                                        active_port_spawned = true;
689                                        detect_and_store_active_port(id.clone(), daemon_pid);
690                                    }
691                                }
692                                Ok(_) => {
693                                    trace!("daemon {id} cmd check: command returned non-zero (not ready)");
694                                }
695                                Err(e) => {
696                                    trace!("daemon {id} cmd check failed: {e}");
697                                }
698                            }
699                        }
700                    }
701                    _ = async {
702                        if let Some(ref mut timer) = delay_timer {
703                            timer.await;
704                        } else {
705                            std::future::pending::<()>().await;
706                        }
707                    } => {
708                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
709                            info!("daemon {id} ready: delay elapsed");
710                            ready_notified = true;
711                            // Flush logs before notifying so clients see logs immediately
712                            let _ = log_appender.flush().await;
713                            if let Some(tx) = ready_tx.take() {
714                                let _ = tx.send(Ok(()));
715                            }
716                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
717                        }
718                        // Disable timer after it fires
719                        delay_timer = None;
720                        if !active_port_spawned && has_port_config {
721                            active_port_spawned = true;
722                            detect_and_store_active_port(id.clone(), daemon_pid);
723                        }
724                    }
725                    _ = log_flush_interval.tick() => {
726                        // Periodic flush to ensure logs are written to disk
727                        if let Err(e) = log_appender.flush().await {
728                            error!("Failed to flush log for daemon {id}: {e}");
729                        }
730                    }
731                }
732            }
733
734            // Final flush to ensure all buffered logs are written
735            if let Err(e) = log_appender.flush().await {
736                error!("Failed to final flush log for daemon {id}: {e}");
737            }
738
739            // Clear active_port since the process is no longer running
740            {
741                let mut state_file = SUPERVISOR.state_file.lock().await;
742                if let Some(d) = state_file.daemons.get_mut(&id) {
743                    d.active_port = None;
744                }
745                if let Err(e) = state_file.write() {
746                    debug!("Failed to write state after clearing active_port for {id}: {e}");
747                }
748            }
749
750            // Get the final exit status
751            let exit_status = if let Some(status) = exit_status {
752                status
753            } else {
754                // Streams closed but process hasn't exited yet, wait for it
755                match exit_rx.recv().await {
756                    Some(status) => status,
757                    None => {
758                        warn!("daemon {id} exit channel closed without receiving status");
759                        Err(std::io::Error::other("exit channel closed"))
760                    }
761                }
762            };
763            let current_daemon = SUPERVISOR.get_daemon(&id).await;
764
765            // Signal that this monitoring task is processing its exit path.
766            // The RAII guard will decrement the counter and notify close()
767            // when the task finishes (including all fire_hook registrations),
768            // regardless of which return path is taken.
769            SUPERVISOR
770                .active_monitors
771                .fetch_add(1, atomic::Ordering::Release);
772            struct MonitorGuard;
773            impl Drop for MonitorGuard {
774                fn drop(&mut self) {
775                    SUPERVISOR
776                        .active_monitors
777                        .fetch_sub(1, atomic::Ordering::Release);
778                    SUPERVISOR.monitor_done.notify_waiters();
779                }
780            }
781            let _monitor_guard = MonitorGuard;
782            // Check if this monitoring task is for the current daemon process.
783            // Allow Stopped/Stopping daemons through: stop() clears pid atomically,
784            // so d.pid != Some(pid) would be true, but we still need the is_stopped()
785            // branch below to fire on_stop/on_exit hooks.
786            if current_daemon.is_none()
787                || current_daemon.as_ref().is_some_and(|d| {
788                    d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
789                })
790            {
791                // Another process has taken over, don't update status
792                return;
793            }
794            // Capture the intentional-stop flag BEFORE any state changes.
795            // stop() transitions Stopping → Stopped and clears pid. If stop() wins the race
796            // and sets Stopped before this task runs, we still need to fire on_stop/on_exit.
797            // Treat both Stopping and Stopped as "intentional stop by pitchfork".
798            let already_stopped = current_daemon
799                .as_ref()
800                .is_some_and(|d| d.status.is_stopped());
801            let is_stopping = already_stopped
802                || current_daemon
803                    .as_ref()
804                    .is_some_and(|d| d.status.is_stopping());
805
806            // --- Phase 1: Determine exit_code, exit_reason, and update daemon state ---
807            let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
808                (Ok(status), true) => {
809                    // Intentional stop (by pitchfork). status.code() returns None
810                    // on Unix when killed by signal (e.g. SIGTERM); use -1 to
811                    // distinguish from a clean exit code 0.
812                    (status.code().unwrap_or(-1), "stop")
813                }
814                (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
815                (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
816                (Err(_), true) => {
817                    // child.wait() error while stopping (e.g. sysinfo reaped the process)
818                    (-1, "stop")
819                }
820                (Err(_), false) => (-1, "fail"),
821            };
822
823            // Update daemon state unless stop() already did it (won the race).
824            if !already_stopped {
825                if let Ok(status) = &exit_status {
826                    info!("daemon {id} exited with status {status}");
827                }
828                let (new_status, last_exit_success) = match exit_reason {
829                    "stop" | "exit" => (
830                        DaemonStatus::Stopped,
831                        exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
832                    ),
833                    _ => (DaemonStatus::Errored(exit_code), false),
834                };
835                if let Err(e) = SUPERVISOR
836                    .upsert_daemon(
837                        UpsertDaemonOpts::builder(id.clone())
838                            .set(|o| {
839                                o.pid = None;
840                                o.status = new_status;
841                                o.last_exit_success = Some(last_exit_success);
842                            })
843                            .build(),
844                    )
845                    .await
846                {
847                    error!("Failed to update daemon state for {id}: {e}");
848                }
849            }
850
851            // --- Phase 2: Fire hooks ---
852            let hook_extra_env = vec![
853                ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
854                ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
855            ];
856
857            // Determine which hooks to fire based on exit reason
858            let hooks_to_fire: Vec<HookType> = match exit_reason {
859                "stop" => vec![HookType::OnStop, HookType::OnExit],
860                "exit" => vec![HookType::OnExit],
861                // "fail": fire on_fail + on_exit only when retries are exhausted
862                _ if hook_retry_count >= hook_retry => {
863                    vec![HookType::OnFail, HookType::OnExit]
864                }
865                _ => vec![],
866            };
867
868            for hook_type in hooks_to_fire {
869                fire_hook(
870                    hook_type,
871                    id.clone(),
872                    daemon_dir.clone(),
873                    hook_retry_count,
874                    hook_daemon_env.clone(),
875                    hook_extra_env.clone(),
876                )
877                .await;
878            }
879        });
880
881        // If wait_ready is true, wait for readiness notification
882        if let Some(ready_rx) = ready_rx {
883            match ready_rx.await {
884                Ok(Ok(())) => {
885                    info!("daemon {id} is ready");
886                    Ok(IpcResponse::DaemonReady { daemon })
887                }
888                Ok(Err(exit_code)) => {
889                    error!("daemon {id} failed before becoming ready");
890                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
891                }
892                Err(_) => {
893                    error!("readiness channel closed unexpectedly for daemon {id}");
894                    Ok(IpcResponse::DaemonStart { daemon })
895                }
896            }
897        } else {
898            Ok(IpcResponse::DaemonStart { daemon })
899        }
900    }
901
902    /// Stop a running daemon
903    pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
904        let pitchfork_id = DaemonId::pitchfork();
905        if *id == pitchfork_id {
906            return Ok(IpcResponse::Error(
907                "Cannot stop supervisor via stop command".into(),
908            ));
909        }
910        info!("stopping daemon: {id}");
911        if let Some(daemon) = self.get_daemon(id).await {
912            trace!("daemon to stop: {daemon}");
913            if let Some(pid) = daemon.pid {
914                trace!("killing pid: {pid}");
915                PROCS.refresh_processes();
916                if PROCS.is_running(pid) {
917                    // First set status to Stopping (preserve PID for monitoring task)
918                    self.upsert_daemon(
919                        UpsertDaemonOpts::builder(id.clone())
920                            .set(|o| {
921                                o.pid = Some(pid);
922                                o.status = DaemonStatus::Stopping;
923                            })
924                            .build(),
925                    )
926                    .await?;
927
928                    // Kill the entire process group atomically (daemon PID == PGID
929                    // because we called setsid() at spawn time)
930                    if let Err(e) = PROCS.kill_process_group_async(pid).await {
931                        debug!("failed to kill pid {pid}: {e}");
932                        // Check if the process is actually stopped despite the error
933                        PROCS.refresh_processes();
934                        if PROCS.is_running(pid) {
935                            // Process still running after kill attempt - set back to Running
936                            debug!("failed to stop pid {pid}: process still running after kill");
937                            self.upsert_daemon(
938                                UpsertDaemonOpts::builder(id.clone())
939                                    .set(|o| {
940                                        o.pid = Some(pid); // Preserve PID to avoid orphaning the process
941                                        o.status = DaemonStatus::Running;
942                                    })
943                                    .build(),
944                            )
945                            .await?;
946                            return Ok(IpcResponse::DaemonStopFailed {
947                                error: format!(
948                                    "process {pid} still running after kill attempt: {e}"
949                                ),
950                            });
951                        }
952                    }
953
954                    // Process successfully stopped
955                    // Note: kill_async uses SIGTERM -> wait ~3s -> SIGKILL strategy,
956                    // and also detects zombie processes, so by the time it returns,
957                    // the process should be fully terminated.
958                    self.upsert_daemon(
959                        UpsertDaemonOpts::builder(id.clone())
960                            .set(|o| {
961                                o.pid = None;
962                                o.status = DaemonStatus::Stopped;
963                                o.last_exit_success = Some(true); // Manual stop is considered successful
964                            })
965                            .build(),
966                    )
967                    .await?;
968                } else {
969                    debug!("pid {pid} not running, process may have exited unexpectedly");
970                    // Process already dead, directly mark as stopped
971                    // Note that the cleanup logic is handled in monitor task
972                    self.upsert_daemon(
973                        UpsertDaemonOpts::builder(id.clone())
974                            .set(|o| {
975                                o.pid = None;
976                                o.status = DaemonStatus::Stopped;
977                            })
978                            .build(),
979                    )
980                    .await?;
981                    return Ok(IpcResponse::DaemonWasNotRunning);
982                }
983                Ok(IpcResponse::Ok)
984            } else {
985                debug!("daemon {id} not running");
986                Ok(IpcResponse::DaemonNotRunning)
987            }
988        } else {
989            debug!("daemon {id} not found");
990            Ok(IpcResponse::DaemonNotFound)
991        }
992    }
993}
994
995/// Check if multiple ports are available and optionally auto-bump to find available ports.
996///
997/// All ports are bumped by the same offset to maintain relative port spacing.
998/// Returns the resolved ports (either the original or bumped ones).
999/// Returns an error if any port is in use and auto_bump is disabled,
1000/// or if no available ports can be found after max attempts.
1001async fn check_ports_available(
1002    expected_ports: &[u16],
1003    auto_bump: bool,
1004    max_attempts: u32,
1005) -> Result<Vec<u16>> {
1006    if expected_ports.is_empty() {
1007        return Ok(Vec::new());
1008    }
1009
1010    for bump_offset in 0..=max_attempts {
1011        // Use wrapping_add to handle overflow correctly - ports wrap around at 65535
1012        let candidate_ports: Vec<u16> = expected_ports
1013            .iter()
1014            .map(|&p| p.wrapping_add(bump_offset as u16))
1015            .collect();
1016
1017        // Check if all ports in this set are available
1018        let mut all_available = true;
1019        let mut conflicting_port = None;
1020
1021        for &port in &candidate_ports {
1022            // Port 0 is a special case - it requests an ephemeral port from the OS.
1023            // Skip the availability check for port 0 since binding to it always succeeds.
1024            if port == 0 {
1025                continue;
1026            }
1027
1028            // Use spawn_blocking to avoid blocking the async runtime during TCP bind checks.
1029            //
1030            // We check multiple addresses to avoid false-negatives caused by SO_REUSEADDR.
1031            // On macOS/BSD, Rust's TcpListener::bind sets SO_REUSEADDR by default, which
1032            // allows binding 0.0.0.0:port even when 127.0.0.1:port is already in use
1033            // (because 0.0.0.0 is technically a different address).  Most daemons bind
1034            // to localhost, so checking 127.0.0.1 is essential to detect real conflicts.
1035            // We also check [::1] to cover IPv6 loopback listeners.
1036            //
1037            // NOTE: This check has a time-of-check-to-time-of-use (TOCTOU) race condition.
1038            // Another process could grab the port between our check and the daemon actually
1039            // binding. This is inherent to the approach and acceptable for our use case
1040            // since we're primarily detecting conflicts with already-running daemons.
1041            if is_port_in_use(port).await {
1042                all_available = false;
1043                conflicting_port = Some(port);
1044                break;
1045            }
1046        }
1047
1048        if all_available {
1049            // Check for overflow (port wrapped around to 0 due to wrapping_add)
1050            // If any candidate port is 0 but the original expected port wasn't 0,
1051            // it means we've wrapped around and should stop
1052            if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1053                return Err(PortError::NoAvailablePort {
1054                    start_port: expected_ports[0],
1055                    attempts: bump_offset + 1,
1056                }
1057                .into());
1058            }
1059            if bump_offset > 0 {
1060                info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1061            }
1062            return Ok(candidate_ports);
1063        }
1064
1065        // Port is in use
1066        if bump_offset == 0 && !auto_bump {
1067            if let Some(port) = conflicting_port {
1068                let (pid, process) = identify_port_owner(port).await;
1069                return Err(PortError::InUse { port, process, pid }.into());
1070            }
1071        }
1072    }
1073
1074    // No available ports found after max attempts
1075    Err(PortError::NoAvailablePort {
1076        start_port: expected_ports[0],
1077        attempts: max_attempts + 1,
1078    }
1079    .into())
1080}
1081
1082/// Check whether a port is currently in use by attempting to bind on multiple addresses.
1083///
1084/// Returns `true` when at least one bind attempt gets `AddrInUse`, meaning another
1085/// process is listening.  Other errors (e.g. `AddrNotAvailable` on an address family
1086/// the OS doesn't support) are ignored so they don't produce false positives.
1087async fn is_port_in_use(port: u16) -> bool {
1088    tokio::task::spawn_blocking(move || {
1089        for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1090            match std::net::TcpListener::bind((addr, port)) {
1091                Ok(listener) => drop(listener),
1092                Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1093                Err(_) => continue,
1094            }
1095        }
1096        false
1097    })
1098    .await
1099    .unwrap_or(false)
1100}
1101
1102/// Best-effort lookup of the process occupying a port via `listeners::get_all()`.
1103///
1104/// Returns `(pid, process_name)`.  Falls back to `(0, "unknown")` when the
1105/// system call fails (permission error, unsupported OS, etc.).
1106async fn identify_port_owner(port: u16) -> (u32, String) {
1107    tokio::task::spawn_blocking(move || {
1108        listeners::get_all()
1109            .ok()
1110            .and_then(|list| {
1111                list.into_iter()
1112                    .find(|l| l.socket.port() == port)
1113                    .map(|l| (l.process.pid, l.process.name))
1114            })
1115            .unwrap_or((0, "unknown".to_string()))
1116    })
1117    .await
1118    .unwrap_or((0, "unknown".to_string()))
1119}
1120
1121/// Detect whether a port is in use, and if so, identify the owning process.
1122///
1123/// Combines `is_port_in_use` (reliable bind probe) with `identify_port_owner`
1124/// (best-effort process lookup).  Returns `None` when the port is free.
1125async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1126    if !is_port_in_use(port).await {
1127        return None;
1128    }
1129    Some(identify_port_owner(port).await)
1130}
1131
1132/// Spawn a background task that detects the first port the daemon process is listening on
1133/// and stores it in the state file as `active_port`.
1134///
1135/// This is called once when the daemon becomes ready. The port is cleared when the daemon stops.
1136///
1137/// Port selection strategy:
1138/// 1. If the daemon has `expected_port` configured, prefer the first port from that list
1139///    (it is the port the operator explicitly designated as the primary service port).
1140/// 2. Otherwise, take the first port the process is actually listening on (in the order
1141///    returned by the OS), which is typically the port bound earliest.
1142///
1143/// Using `min()` (lowest port number) was previously used here but is incorrect: many
1144/// applications listen on multiple ports (e.g. HTTP + metrics) and the lowest-numbered
1145/// port is not necessarily the primary service port.
1146fn detect_and_store_active_port(id: DaemonId, pid: u32) {
1147    tokio::spawn(async move {
1148        // Retry with exponential backoff so that slow-starting daemons (JVM,
1149        // Node.js, Python, etc.) that take more than 500 ms to bind their port
1150        // are still detected.  Total wait budget: 500+1000+2000+4000 = 7.5 s.
1151        for delay_ms in [500u64, 1000, 2000, 4000] {
1152            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1153
1154            // Read daemon state atomically: check if still alive and get expected_port
1155            // in a single lock acquisition to avoid TOCTOU and unnecessary lock overhead.
1156            let expected_port: Option<u16> = {
1157                let state_file = SUPERVISOR.state_file.lock().await;
1158                match state_file.daemons.get(&id) {
1159                    Some(d) if d.pid.is_none() => {
1160                        debug!("daemon {id}: aborting active_port detection — process exited");
1161                        return;
1162                    }
1163                    Some(d) => d.expected_port.first().copied().filter(|&p| p > 0),
1164                    None => None,
1165                }
1166            };
1167
1168            let active_port = tokio::task::spawn_blocking(move || {
1169                let listeners = listeners::get_all().ok()?;
1170                let process_ports: Vec<u16> = listeners
1171                    .into_iter()
1172                    .filter(|listener| listener.process.pid == pid)
1173                    .map(|listener| listener.socket.port())
1174                    .filter(|&port| port > 0)
1175                    .collect();
1176
1177                if process_ports.is_empty() {
1178                    return None;
1179                }
1180
1181                // Prefer the configured expected_port if the process is actually
1182                // listening on it; otherwise fall back to the first port found.
1183                if let Some(ep) = expected_port {
1184                    if process_ports.contains(&ep) {
1185                        return Some(ep);
1186                    }
1187                }
1188
1189                // No expected_port match — return the first port in the list.
1190                // The list order reflects the order the OS reports listeners,
1191                // which is generally the order they were bound (earliest first).
1192                // Do NOT sort: the lowest-numbered port is not necessarily the
1193                // primary service port (e.g. HTTP vs metrics).
1194                process_ports.into_iter().next()
1195            })
1196            .await
1197            .ok()
1198            .flatten();
1199
1200            if let Some(port) = active_port {
1201                debug!("daemon {id} active_port detected: {port}");
1202                let mut state_file = SUPERVISOR.state_file.lock().await;
1203                if let Some(d) = state_file.daemons.get_mut(&id) {
1204                    // Guard against PID reuse: if the original process exited and the OS
1205                    // assigned the same PID to an unrelated process that happens to bind
1206                    // a port, we must not route proxy traffic to that unrelated service.
1207                    if d.pid == Some(pid) {
1208                        d.active_port = Some(port);
1209                    } else {
1210                        debug!(
1211                            "daemon {id}: skipping active_port write — PID mismatch \
1212                             (expected {pid}, current {:?})",
1213                            d.pid
1214                        );
1215                        return;
1216                    }
1217                }
1218                if let Err(e) = state_file.write() {
1219                    debug!("Failed to write state after detecting active_port for {id}: {e}");
1220                }
1221                return;
1222            }
1223
1224            debug!("daemon {id}: no active port detected for pid {pid} (will retry)");
1225        }
1226
1227        debug!("daemon {id}: active port detection exhausted all retries for pid {pid}");
1228    });
1229}
1230
1231/// Check whether a daemon (by its qualified ID) is the target of any registered
1232/// slug in the global config.  This is used to decide whether to run the
1233/// `detect_and_store_active_port` polling task — only slug-targeted daemons need
1234/// it, avoiding wasted `listeners::get_all()` calls for port-less daemons.
1235///
1236/// Delegates to `proxy::server::is_slug_target()` which uses the same in-memory
1237/// slug cache as the proxy hot path, so this check is cheap.
1238fn is_daemon_slug_target(id: &DaemonId) -> bool {
1239    // read_global_slugs is called once per daemon start — acceptable cost.
1240    // We intentionally avoid making this async to keep has_port_config evaluation
1241    // simple and synchronous in run_once().
1242    let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1243    slugs.iter().any(|(slug, entry)| {
1244        let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1245        id.name() == daemon_name
1246    })
1247}