Skip to main content

pitchfork_cli/supervisor/
lifecycle.rs

1//! Daemon lifecycle management - start/stop operations
2//!
3//! Contains the core `run()`, `run_once()`, and `stop()` methods for daemon process management.
4
5use super::hooks::{HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::{IntoDiagnostic, WrapErr};
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22use std::iter::once;
23use std::net::TcpListener;
24use std::sync::atomic;
25use std::time::Duration;
26use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
27use tokio::select;
28use tokio::sync::oneshot;
29use tokio::time;
30
31/// Cache for compiled regex patterns to avoid recompilation on daemon restarts
32static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
33    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
34
35/// Get or compile a regex pattern, caching the result for future use
36pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
37    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
38    if let Some(re) = cache.get(pattern) {
39        return Some(re.clone());
40    }
41    match Regex::new(pattern) {
42        Ok(re) => {
43            cache.insert(pattern.to_string(), re.clone());
44            Some(re)
45        }
46        Err(e) => {
47            error!("invalid regex pattern '{pattern}': {e}");
48            None
49        }
50    }
51}
52
53impl Supervisor {
54    /// Run a daemon, handling retries if configured
55    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
56        let id = &opts.id;
57        let cmd = opts.cmd.clone();
58
59        // Clear any pending autostop for this daemon since it's being started
60        {
61            let mut pending = self.pending_autostops.lock().await;
62            if pending.remove(id).is_some() {
63                info!("cleared pending autostop for {id} (daemon starting)");
64            }
65        }
66
67        let daemon = self.get_daemon(id).await;
68        if let Some(daemon) = daemon {
69            // Stopping state is treated as "not running" - the monitoring task will clean it up
70            // Only check for Running state with a valid PID
71            if !daemon.status.is_stopping()
72                && !daemon.status.is_stopped()
73                && let Some(pid) = daemon.pid
74            {
75                if opts.force {
76                    self.stop(id).await?;
77                    info!("run: stop completed for daemon {id}");
78                } else {
79                    warn!("daemon {id} already running with pid {pid}");
80                    return Ok(IpcResponse::DaemonAlreadyRunning);
81                }
82            }
83        }
84
85        // If wait_ready is true and retry is configured, implement retry loop
86        if opts.wait_ready && opts.retry > 0 {
87            // Use saturating_add to avoid overflow when retry = u32::MAX (infinite)
88            let max_attempts = opts.retry.saturating_add(1);
89            for attempt in 0..max_attempts {
90                let mut retry_opts = opts.clone();
91                retry_opts.retry_count = attempt;
92                retry_opts.cmd = cmd.clone();
93
94                let result = self.run_once(retry_opts).await?;
95
96                match result {
97                    IpcResponse::DaemonReady { daemon } => {
98                        return Ok(IpcResponse::DaemonReady { daemon });
99                    }
100                    IpcResponse::DaemonFailedWithCode { exit_code } => {
101                        if attempt < opts.retry {
102                            let backoff_secs = 2u64.pow(attempt);
103                            info!(
104                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
105                                attempt + 1,
106                                max_attempts,
107                                backoff_secs
108                            );
109                            fire_hook(
110                                HookType::OnRetry,
111                                id.clone(),
112                                opts.dir.clone(),
113                                attempt + 1,
114                                opts.env.clone(),
115                                vec![],
116                            )
117                            .await;
118                            time::sleep(Duration::from_secs(backoff_secs)).await;
119                            continue;
120                        } else {
121                            info!("daemon {id} failed after {max_attempts} attempts");
122                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
123                        }
124                    }
125                    other => return Ok(other),
126                }
127            }
128        }
129
130        // No retry or wait_ready is false
131        self.run_once(opts).await
132    }
133
134    /// Run a daemon once (single attempt)
135    pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
136        let id = &opts.id;
137        let original_cmd = opts.cmd.clone(); // Save original command for persistence
138        let cmd = opts.cmd;
139
140        // Create channel for readiness notification if wait_ready is true
141        let (ready_tx, ready_rx) = if opts.wait_ready {
142            let (tx, rx) = oneshot::channel();
143            (Some(tx), Some(rx))
144        } else {
145            (None, None)
146        };
147
148        // Check port availability and apply auto-bump if configured
149        let expected_ports = opts.expected_port.clone();
150        let (resolved_ports, effective_ready_port) = if !opts.expected_port.is_empty() {
151            match check_ports_available(
152                &opts.expected_port,
153                opts.auto_bump_port,
154                opts.port_bump_attempts,
155            )
156            .await
157            {
158                Ok(resolved) => {
159                    let ready_port = if let Some(configured_port) = opts.ready_port {
160                        // If ready_port matches one of the expected ports, apply the same bump offset
161                        let bump_offset = resolved
162                            .first()
163                            .unwrap_or(&0)
164                            .saturating_sub(*opts.expected_port.first().unwrap_or(&0));
165                        if opts.expected_port.contains(&configured_port) && bump_offset > 0 {
166                            configured_port
167                                .checked_add(bump_offset)
168                                .or(Some(configured_port))
169                        } else {
170                            Some(configured_port)
171                        }
172                    } else {
173                        // Don't use port 0 for readiness checks - it's a special value
174                        // that requests an ephemeral port from the OS
175                        resolved.first().copied().filter(|&p| p != 0)
176                    };
177                    info!(
178                        "daemon {id}: ports {:?} resolved to {:?}",
179                        opts.expected_port, resolved
180                    );
181                    (resolved, ready_port)
182                }
183                Err(e) => {
184                    error!("daemon {id}: port check failed: {e}");
185                    // Convert PortError to structured IPC response
186                    if let Some(port_error) = e.downcast_ref::<PortError>() {
187                        match port_error {
188                            PortError::InUse { port, process, pid } => {
189                                return Ok(IpcResponse::PortConflict {
190                                    port: *port,
191                                    process: process.clone(),
192                                    pid: *pid,
193                                });
194                            }
195                            PortError::NoAvailablePort {
196                                start_port,
197                                attempts,
198                            } => {
199                                return Ok(IpcResponse::NoAvailablePort {
200                                    start_port: *start_port,
201                                    attempts: *attempts,
202                                });
203                            }
204                        }
205                    }
206                    return Ok(IpcResponse::DaemonFailed {
207                        error: e.to_string(),
208                    });
209                }
210            }
211        } else {
212            (Vec::new(), opts.ready_port)
213        };
214
215        let cmd: Vec<String> = if opts.mise {
216            match settings().resolve_mise_bin() {
217                Some(mise_bin) => {
218                    let mise_bin_str = mise_bin.to_string_lossy().to_string();
219                    info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
220                    once("exec".to_string())
221                        .chain(once(mise_bin_str))
222                        .chain(once("x".to_string()))
223                        .chain(once("--".to_string()))
224                        .chain(cmd)
225                        .collect_vec()
226                }
227                None => {
228                    warn!("daemon {id}: mise=true but mise binary not found, running without mise");
229                    once("exec".to_string()).chain(cmd).collect_vec()
230                }
231            }
232        } else {
233            once("exec".to_string()).chain(cmd).collect_vec()
234        };
235        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
236        let log_path = id.log_path();
237        if let Some(parent) = log_path.parent() {
238            xx::file::mkdirp(parent)?;
239        }
240        info!("run: spawning daemon {id} with args: {args:?}");
241        let mut cmd = tokio::process::Command::new("sh");
242        cmd.args(&args)
243            .stdin(std::process::Stdio::null())
244            .stdout(std::process::Stdio::piped())
245            .stderr(std::process::Stdio::piped())
246            .current_dir(&opts.dir);
247
248        // Ensure daemon can find user tools by using the original PATH
249        if let Some(ref path) = *env::ORIGINAL_PATH {
250            cmd.env("PATH", path);
251        }
252
253        // Apply custom environment variables from config
254        if let Some(ref env_vars) = opts.env {
255            cmd.envs(env_vars);
256        }
257
258        // Inject pitchfork metadata env vars AFTER user env so they can't be overwritten
259        cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
260        cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
261        cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
262
263        // Inject the resolved ports for the daemon to use
264        if !resolved_ports.is_empty() {
265            // Set PORT to the first port for backward compatibility
266            // When there's only one port, both PORT and PORT0 will be set to the same value.
267            // This follows the convention used by many deployment platforms (Heroku, etc.).
268            cmd.env("PORT", resolved_ports[0].to_string());
269            // Set individual ports as PORT0, PORT1, etc.
270            for (i, port) in resolved_ports.iter().enumerate() {
271                cmd.env(format!("PORT{}", i), port.to_string());
272            }
273        }
274
275        #[cfg(unix)]
276        {
277            unsafe {
278                cmd.pre_exec(move || {
279                    if libc::setsid() == -1 {
280                        return Err(std::io::Error::last_os_error());
281                    }
282                    Ok(())
283                });
284            }
285        }
286
287        let mut child = cmd.spawn().into_diagnostic()?;
288        let pid = match child.id() {
289            Some(p) => p,
290            None => {
291                warn!("Daemon {id} exited before PID could be captured");
292                return Ok(IpcResponse::DaemonFailed {
293                    error: "Process exited immediately".to_string(),
294                });
295            }
296        };
297        info!("started daemon {id} with pid {pid}");
298        let daemon = self
299            .upsert_daemon(
300                UpsertDaemonOpts::builder(id.clone())
301                    .set(|o| {
302                        o.pid = Some(pid);
303                        o.status = DaemonStatus::Running;
304                        o.shell_pid = opts.shell_pid;
305                        o.dir = Some(opts.dir.clone());
306                        o.cmd = Some(original_cmd);
307                        o.autostop = opts.autostop;
308                        o.cron_schedule = opts.cron_schedule.clone();
309                        o.cron_retrigger = opts.cron_retrigger;
310                        o.retry = Some(opts.retry);
311                        o.retry_count = Some(opts.retry_count);
312                        o.ready_delay = opts.ready_delay;
313                        o.ready_output = opts.ready_output.clone();
314                        o.ready_http = opts.ready_http.clone();
315                        o.ready_port = effective_ready_port;
316                        o.ready_cmd = opts.ready_cmd.clone();
317                        o.expected_port = expected_ports;
318                        o.resolved_port = resolved_ports;
319                        o.auto_bump_port = Some(opts.auto_bump_port);
320                        o.port_bump_attempts = Some(opts.port_bump_attempts);
321                        o.depends = Some(opts.depends.clone());
322                        o.env = opts.env.clone();
323                        o.watch = Some(opts.watch.clone());
324                        o.watch_base_dir = opts.watch_base_dir.clone();
325                        o.mise = Some(opts.mise);
326                        o.memory_limit = opts.memory_limit;
327                        o.cpu_limit = opts.cpu_limit;
328                    })
329                    .build(),
330            )
331            .await?;
332
333        let id_clone = id.clone();
334        let ready_delay = opts.ready_delay;
335        let ready_output = opts.ready_output.clone();
336        let ready_http = opts.ready_http.clone();
337        let ready_port = effective_ready_port;
338        let ready_cmd = opts.ready_cmd.clone();
339        let daemon_dir = opts.dir.clone();
340        let hook_retry_count = opts.retry_count;
341        let hook_retry = opts.retry;
342        let hook_daemon_env = opts.env.clone();
343
344        tokio::spawn(async move {
345            let id = id_clone;
346            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
347                (Some(out), Some(err)) => (out, err),
348                _ => {
349                    error!("Failed to capture stdout/stderr for daemon {id}");
350                    return;
351                }
352            };
353            let mut stdout = tokio::io::BufReader::new(stdout).lines();
354            let mut stderr = tokio::io::BufReader::new(stderr).lines();
355            let log_file = match tokio::fs::File::options()
356                .append(true)
357                .create(true)
358                .open(&log_path)
359                .await
360            {
361                Ok(f) => f,
362                Err(e) => {
363                    error!("Failed to open log file for daemon {id}: {e}");
364                    return;
365                }
366            };
367            let mut log_appender = BufWriter::new(log_file);
368
369            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
370            let format_line = |line: String| {
371                if line.starts_with(&format!("{id} ")) {
372                    // mise tasks often already have the id printed
373                    format!("{} {line}\n", now())
374                } else {
375                    format!("{} {id} {line}\n", now())
376                }
377            };
378
379            // Setup readiness checking
380            let mut ready_notified = false;
381            let mut ready_tx = ready_tx;
382            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
383
384            let mut delay_timer =
385                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
386
387            // Get settings for intervals
388            let s = settings();
389            let ready_check_interval = s.supervisor_ready_check_interval();
390            let http_client_timeout = s.supervisor_http_client_timeout();
391            let log_flush_interval_duration = s.supervisor_log_flush_interval();
392
393            // Setup HTTP readiness check interval
394            let mut http_check_interval = ready_http
395                .as_ref()
396                .map(|_| tokio::time::interval(ready_check_interval));
397            let http_client = ready_http.as_ref().map(|_| {
398                reqwest::Client::builder()
399                    .timeout(http_client_timeout)
400                    .build()
401                    .unwrap_or_default()
402            });
403
404            // Setup TCP port readiness check interval
405            let mut port_check_interval =
406                ready_port.map(|_| tokio::time::interval(ready_check_interval));
407
408            // Setup command readiness check interval
409            let mut cmd_check_interval = ready_cmd
410                .as_ref()
411                .map(|_| tokio::time::interval(ready_check_interval));
412
413            // Setup periodic log flush interval
414            let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);
415
416            // Use a channel to communicate process exit status
417            let (exit_tx, mut exit_rx) =
418                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
419
420            // Spawn a task to wait for process exit
421            let child_pid = child.id().unwrap_or(0);
422            tokio::spawn(async move {
423                let result = child.wait().await;
424                debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
425                let _ = exit_tx.send(result).await;
426            });
427
428            #[allow(unused_assignments)]
429            // Initial None is a safety net; loop only exits via exit_rx.recv() which sets it
430            let mut exit_status = None;
431
432            loop {
433                select! {
434                    Ok(Some(line)) = stdout.next_line() => {
435                        let formatted = format_line(line.clone());
436                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
437                            error!("Failed to write to log for daemon {id}: {e}");
438                        }
439                        trace!("stdout: {id} {formatted}");
440
441                        // Check if output matches ready pattern
442                        if !ready_notified
443                            && let Some(ref pattern) = ready_pattern
444                                && pattern.is_match(&line) {
445                                    info!("daemon {id} ready: output matched pattern");
446                                    ready_notified = true;
447                                    // Flush logs before notifying so clients see logs immediately
448                                    let _ = log_appender.flush().await;
449                                    if let Some(tx) = ready_tx.take() {
450                                        let _ = tx.send(Ok(()));
451                                    }
452                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
453                                }
454                    }
455                    Ok(Some(line)) = stderr.next_line() => {
456                        let formatted = format_line(line.clone());
457                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
458                            error!("Failed to write to log for daemon {id}: {e}");
459                        }
460                        trace!("stderr: {id} {formatted}");
461
462                        // Check if output matches ready pattern (also check stderr)
463                        if !ready_notified
464                            && let Some(ref pattern) = ready_pattern
465                                && pattern.is_match(&line) {
466                                    info!("daemon {id} ready: output matched pattern");
467                                    ready_notified = true;
468                                    // Flush logs before notifying so clients see logs immediately
469                                    let _ = log_appender.flush().await;
470                                    if let Some(tx) = ready_tx.take() {
471                                        let _ = tx.send(Ok(()));
472                                    }
473                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
474                                }
475                    },
476                    Some(result) = exit_rx.recv() => {
477                        // Process exited - save exit status and notify if not ready yet
478                        exit_status = Some(result);
479                        debug!("daemon {id} process exited, exit_status: {exit_status:?}");
480                        // Flush logs before notifying so clients see logs immediately
481                        let _ = log_appender.flush().await;
482                        if !ready_notified {
483                            if let Some(tx) = ready_tx.take() {
484                                // Check if process exited successfully
485                                let is_success = exit_status.as_ref()
486                                    .and_then(|r| r.as_ref().ok())
487                                    .map(|s| s.success())
488                                    .unwrap_or(false);
489
490                                if is_success {
491                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
492                                    let _ = tx.send(Ok(()));
493                                } else {
494                                    let exit_code = exit_status.as_ref()
495                                        .and_then(|r| r.as_ref().ok())
496                                        .and_then(|s| s.code());
497                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
498                                    let _ = tx.send(Err(exit_code));
499                                }
500                            }
501                        } else {
502                            debug!("daemon {id} was already marked ready, not sending notification");
503                        }
504                        break;
505                    }
506                    _ = async {
507                        if let Some(ref mut interval) = http_check_interval {
508                            interval.tick().await;
509                        } else {
510                            std::future::pending::<()>().await;
511                        }
512                    }, if !ready_notified && ready_http.is_some() => {
513                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
514                            match client.get(url).send().await {
515                                Ok(response) if response.status().is_success() => {
516                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
517                                    ready_notified = true;
518                                    // Flush logs before notifying so clients see logs immediately
519                                    let _ = log_appender.flush().await;
520                                    if let Some(tx) = ready_tx.take() {
521                                        let _ = tx.send(Ok(()));
522                                    }
523                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
524                                    // Stop checking once ready
525                                    http_check_interval = None;
526                                }
527                                Ok(response) => {
528                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
529                                }
530                                Err(e) => {
531                                    trace!("daemon {id} HTTP check failed: {e}");
532                                }
533                            }
534                        }
535                    }
536                    _ = async {
537                        if let Some(ref mut interval) = port_check_interval {
538                            interval.tick().await;
539                        } else {
540                            std::future::pending::<()>().await;
541                        }
542                    }, if !ready_notified && ready_port.is_some() => {
543                        if let Some(port) = ready_port {
544                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
545                                Ok(_) => {
546                                    info!("daemon {id} ready: TCP port {port} is listening");
547                                    ready_notified = true;
548                                    // Flush logs before notifying so clients see logs immediately
549                                    let _ = log_appender.flush().await;
550                                    if let Some(tx) = ready_tx.take() {
551                                        let _ = tx.send(Ok(()));
552                                    }
553                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
554                                    // Stop checking once ready
555                                    port_check_interval = None;
556                                }
557                                Err(_) => {
558                                    trace!("daemon {id} port check: port {port} not listening yet");
559                                }
560                            }
561                        }
562                    }
563                    _ = async {
564                        if let Some(ref mut interval) = cmd_check_interval {
565                            interval.tick().await;
566                        } else {
567                            std::future::pending::<()>().await;
568                        }
569                    }, if !ready_notified && ready_cmd.is_some() => {
570                        if let Some(ref cmd) = ready_cmd {
571                            // Run the readiness check command using the shell abstraction
572                            let mut command = Shell::default_for_platform().command(cmd);
573                            command
574                                .current_dir(&daemon_dir)
575                                .stdout(std::process::Stdio::null())
576                                .stderr(std::process::Stdio::null());
577                            let result: std::io::Result<std::process::ExitStatus> = command.status().await;
578                            match result {
579                                Ok(status) if status.success() => {
580                                    info!("daemon {id} ready: readiness command succeeded");
581                                    ready_notified = true;
582                                    let _ = log_appender.flush().await;
583                                    if let Some(tx) = ready_tx.take() {
584                                        let _ = tx.send(Ok(()));
585                                    }
586                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
587                                    // Stop checking once ready
588                                    cmd_check_interval = None;
589                                }
590                                Ok(_) => {
591                                    trace!("daemon {id} cmd check: command returned non-zero (not ready)");
592                                }
593                                Err(e) => {
594                                    trace!("daemon {id} cmd check failed: {e}");
595                                }
596                            }
597                        }
598                    }
599                    _ = async {
600                        if let Some(ref mut timer) = delay_timer {
601                            timer.await;
602                        } else {
603                            std::future::pending::<()>().await;
604                        }
605                    } => {
606                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
607                            info!("daemon {id} ready: delay elapsed");
608                            ready_notified = true;
609                            // Flush logs before notifying so clients see logs immediately
610                            let _ = log_appender.flush().await;
611                            if let Some(tx) = ready_tx.take() {
612                                let _ = tx.send(Ok(()));
613                            }
614                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
615                        }
616                        // Disable timer after it fires
617                        delay_timer = None;
618                    }
619                    _ = log_flush_interval.tick() => {
620                        // Periodic flush to ensure logs are written to disk
621                        if let Err(e) = log_appender.flush().await {
622                            error!("Failed to flush log for daemon {id}: {e}");
623                        }
624                    }
625                    // Note: No `else => break` because log_flush_interval.tick() is always available,
626                    // making the else branch unreachable. The loop exits via the exit_rx.recv() branch.
627                }
628            }
629
630            // Final flush to ensure all buffered logs are written
631            if let Err(e) = log_appender.flush().await {
632                error!("Failed to final flush log for daemon {id}: {e}");
633            }
634
635            // Get the final exit status
636            let exit_status = if let Some(status) = exit_status {
637                status
638            } else {
639                // Streams closed but process hasn't exited yet, wait for it
640                match exit_rx.recv().await {
641                    Some(status) => status,
642                    None => {
643                        warn!("daemon {id} exit channel closed without receiving status");
644                        Err(std::io::Error::other("exit channel closed"))
645                    }
646                }
647            };
648            let current_daemon = SUPERVISOR.get_daemon(&id).await;
649
650            // Signal that this monitoring task is processing its exit path.
651            // The RAII guard will decrement the counter and notify close()
652            // when the task finishes (including all fire_hook registrations),
653            // regardless of which return path is taken.
654            SUPERVISOR
655                .active_monitors
656                .fetch_add(1, atomic::Ordering::Release);
657            struct MonitorGuard;
658            impl Drop for MonitorGuard {
659                fn drop(&mut self) {
660                    SUPERVISOR
661                        .active_monitors
662                        .fetch_sub(1, atomic::Ordering::Release);
663                    SUPERVISOR.monitor_done.notify_waiters();
664                }
665            }
666            let _monitor_guard = MonitorGuard;
667            // Check if this monitoring task is for the current daemon process.
668            // Allow Stopped/Stopping daemons through: stop() clears pid atomically,
669            // so d.pid != Some(pid) would be true, but we still need the is_stopped()
670            // branch below to fire on_stop/on_exit hooks.
671            if current_daemon.is_none()
672                || current_daemon.as_ref().is_some_and(|d| {
673                    d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
674                })
675            {
676                // Another process has taken over, don't update status
677                return;
678            }
679            // Capture the intentional-stop flag BEFORE any state changes.
680            // stop() transitions Stopping → Stopped and clears pid. If stop() wins the race
681            // and sets Stopped before this task runs, we still need to fire on_stop/on_exit.
682            // Treat both Stopping and Stopped as "intentional stop by pitchfork".
683            let already_stopped = current_daemon
684                .as_ref()
685                .is_some_and(|d| d.status.is_stopped());
686            let is_stopping = already_stopped
687                || current_daemon
688                    .as_ref()
689                    .is_some_and(|d| d.status.is_stopping());
690
691            // --- Phase 1: Determine exit_code, exit_reason, and update daemon state ---
692            let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
693                (Ok(status), true) => {
694                    // Intentional stop (by pitchfork). status.code() returns None
695                    // on Unix when killed by signal (e.g. SIGTERM); use -1 to
696                    // distinguish from a clean exit code 0.
697                    (status.code().unwrap_or(-1), "stop")
698                }
699                (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
700                (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
701                (Err(_), true) => {
702                    // child.wait() error while stopping (e.g. sysinfo reaped the process)
703                    (-1, "stop")
704                }
705                (Err(_), false) => (-1, "fail"),
706            };
707
708            // Update daemon state unless stop() already did it (won the race).
709            if !already_stopped {
710                if let Ok(status) = &exit_status {
711                    info!("daemon {id} exited with status {status}");
712                }
713                let (new_status, last_exit_success) = match exit_reason {
714                    "stop" | "exit" => (
715                        DaemonStatus::Stopped,
716                        exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
717                    ),
718                    _ => (DaemonStatus::Errored(exit_code), false),
719                };
720                if let Err(e) = SUPERVISOR
721                    .upsert_daemon(
722                        UpsertDaemonOpts::builder(id.clone())
723                            .set(|o| {
724                                o.pid = None;
725                                o.status = new_status;
726                                o.last_exit_success = Some(last_exit_success);
727                            })
728                            .build(),
729                    )
730                    .await
731                {
732                    error!("Failed to update daemon state for {id}: {e}");
733                }
734            }
735
736            // --- Phase 2: Fire hooks ---
737            let hook_extra_env = vec![
738                ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
739                ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
740            ];
741
742            // Determine which hooks to fire based on exit reason
743            let hooks_to_fire: Vec<HookType> = match exit_reason {
744                "stop" => vec![HookType::OnStop, HookType::OnExit],
745                "exit" => vec![HookType::OnExit],
746                // "fail": fire on_fail + on_exit only when retries are exhausted
747                _ if hook_retry_count >= hook_retry => {
748                    vec![HookType::OnFail, HookType::OnExit]
749                }
750                _ => vec![],
751            };
752
753            for hook_type in hooks_to_fire {
754                fire_hook(
755                    hook_type,
756                    id.clone(),
757                    daemon_dir.clone(),
758                    hook_retry_count,
759                    hook_daemon_env.clone(),
760                    hook_extra_env.clone(),
761                )
762                .await;
763            }
764        });
765
766        // If wait_ready is true, wait for readiness notification
767        if let Some(ready_rx) = ready_rx {
768            match ready_rx.await {
769                Ok(Ok(())) => {
770                    info!("daemon {id} is ready");
771                    Ok(IpcResponse::DaemonReady { daemon })
772                }
773                Ok(Err(exit_code)) => {
774                    error!("daemon {id} failed before becoming ready");
775                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
776                }
777                Err(_) => {
778                    error!("readiness channel closed unexpectedly for daemon {id}");
779                    Ok(IpcResponse::DaemonStart { daemon })
780                }
781            }
782        } else {
783            Ok(IpcResponse::DaemonStart { daemon })
784        }
785    }
786
787    /// Stop a running daemon
788    pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
789        let pitchfork_id = DaemonId::pitchfork();
790        if *id == pitchfork_id {
791            return Ok(IpcResponse::Error(
792                "Cannot stop supervisor via stop command".into(),
793            ));
794        }
795        info!("stopping daemon: {id}");
796        if let Some(daemon) = self.get_daemon(id).await {
797            trace!("daemon to stop: {daemon}");
798            if let Some(pid) = daemon.pid {
799                trace!("killing pid: {pid}");
800                PROCS.refresh_processes();
801                if PROCS.is_running(pid) {
802                    // First set status to Stopping (preserve PID for monitoring task)
803                    self.upsert_daemon(
804                        UpsertDaemonOpts::builder(id.clone())
805                            .set(|o| {
806                                o.pid = Some(pid);
807                                o.status = DaemonStatus::Stopping;
808                            })
809                            .build(),
810                    )
811                    .await?;
812
813                    // Kill the entire process group atomically (daemon PID == PGID
814                    // because we called setsid() at spawn time)
815                    if let Err(e) = PROCS.kill_process_group_async(pid).await {
816                        debug!("failed to kill pid {pid}: {e}");
817                        // Check if the process is actually stopped despite the error
818                        PROCS.refresh_processes();
819                        if PROCS.is_running(pid) {
820                            // Process still running after kill attempt - set back to Running
821                            debug!("failed to stop pid {pid}: process still running after kill");
822                            self.upsert_daemon(
823                                UpsertDaemonOpts::builder(id.clone())
824                                    .set(|o| {
825                                        o.pid = Some(pid); // Preserve PID to avoid orphaning the process
826                                        o.status = DaemonStatus::Running;
827                                    })
828                                    .build(),
829                            )
830                            .await?;
831                            return Ok(IpcResponse::DaemonStopFailed {
832                                error: format!(
833                                    "process {pid} still running after kill attempt: {e}"
834                                ),
835                            });
836                        }
837                    }
838
839                    // Process successfully stopped
840                    // Note: kill_async uses SIGTERM -> wait ~3s -> SIGKILL strategy,
841                    // and also detects zombie processes, so by the time it returns,
842                    // the process should be fully terminated.
843                    self.upsert_daemon(
844                        UpsertDaemonOpts::builder(id.clone())
845                            .set(|o| {
846                                o.pid = None;
847                                o.status = DaemonStatus::Stopped;
848                                o.last_exit_success = Some(true); // Manual stop is considered successful
849                            })
850                            .build(),
851                    )
852                    .await?;
853                } else {
854                    debug!("pid {pid} not running, process may have exited unexpectedly");
855                    // Process already dead, directly mark as stopped
856                    // Note that the cleanup logic is handled in monitor task
857                    self.upsert_daemon(
858                        UpsertDaemonOpts::builder(id.clone())
859                            .set(|o| {
860                                o.pid = None;
861                                o.status = DaemonStatus::Stopped;
862                            })
863                            .build(),
864                    )
865                    .await?;
866                    return Ok(IpcResponse::DaemonWasNotRunning);
867                }
868                Ok(IpcResponse::Ok)
869            } else {
870                debug!("daemon {id} not running");
871                Ok(IpcResponse::DaemonNotRunning)
872            }
873        } else {
874            debug!("daemon {id} not found");
875            Ok(IpcResponse::DaemonNotFound)
876        }
877    }
878}
879
880/// Check if multiple ports are available and optionally auto-bump to find available ports.
881///
882/// All ports are bumped by the same offset to maintain relative port spacing.
883/// Returns the resolved ports (either the original or bumped ones).
884/// Returns an error if any port is in use and auto_bump is disabled,
885/// or if no available ports can be found after max attempts.
886async fn check_ports_available(
887    expected_ports: &[u16],
888    auto_bump: bool,
889    max_attempts: u32,
890) -> Result<Vec<u16>> {
891    if expected_ports.is_empty() {
892        return Ok(Vec::new());
893    }
894
895    for bump_offset in 0..=max_attempts {
896        // Use wrapping_add to handle overflow correctly - ports wrap around at 65535
897        let candidate_ports: Vec<u16> = expected_ports
898            .iter()
899            .map(|&p| p.wrapping_add(bump_offset as u16))
900            .collect();
901
902        // Check if all ports in this set are available
903        let mut all_available = true;
904        let mut conflicting_port = None;
905
906        for &port in &candidate_ports {
907            // Port 0 is a special case - it requests an ephemeral port from the OS.
908            // Skip the availability check for port 0 since binding to it always succeeds.
909            if port == 0 {
910                continue;
911            }
912
913            // Use spawn_blocking to avoid blocking the async runtime during TCP bind checks
914            // Bind to 0.0.0.0 to detect conflicts on all interfaces, not just localhost
915            //
916            // NOTE: This check has a time-of-check-to-time-of-use (TOCTOU) race condition.
917            // Another process could grab the port between our check and the daemon actually
918            // binding. This is inherent to the approach and acceptable for our use case
919            // since we're primarily detecting conflicts with already-running daemons.
920            let port_check =
921                tokio::task::spawn_blocking(move || match TcpListener::bind(("0.0.0.0", port)) {
922                    Ok(listener) => {
923                        drop(listener);
924                        true
925                    }
926                    Err(_) => false,
927                })
928                .await
929                .into_diagnostic()
930                .wrap_err("failed to check port availability")?;
931
932            if !port_check {
933                all_available = false;
934                conflicting_port = Some(port);
935                break;
936            }
937        }
938
939        if all_available {
940            // Check for overflow (port wrapped around to 0 due to wrapping_add)
941            // If any candidate port is 0 but the original expected port wasn't 0,
942            // it means we've wrapped around and should stop
943            if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
944                return Err(PortError::NoAvailablePort {
945                    start_port: expected_ports[0],
946                    attempts: bump_offset + 1,
947                }
948                .into());
949            }
950            if bump_offset > 0 {
951                info!(
952                    "ports {:?} bumped by {} to {:?}",
953                    expected_ports, bump_offset, candidate_ports
954                );
955            }
956            return Ok(candidate_ports);
957        }
958
959        // Port is in use
960        if bump_offset == 0 {
961            // First attempt - try to get process info using lsof
962            if let Some(port) = conflicting_port {
963                if let Some((pid, process_name)) = get_process_using_port(port).await {
964                    if !auto_bump {
965                        return Err(PortError::InUse {
966                            port,
967                            process: process_name,
968                            pid,
969                        }
970                        .into());
971                    }
972                } else if !auto_bump {
973                    // Couldn't identify process, but port is definitely in use
974                    return Err(PortError::InUse {
975                        port,
976                        process: "unknown".to_string(),
977                        pid: 0,
978                    }
979                    .into());
980                }
981            }
982        }
983    }
984
985    // No available ports found after max attempts
986    Err(PortError::NoAvailablePort {
987        start_port: expected_ports[0],
988        attempts: max_attempts + 1,
989    }
990    .into())
991}
992
993/// Get the process using a specific port.
994///
995/// Returns (pid, process_name) if found, None otherwise.
996async fn get_process_using_port(port: u16) -> Option<(u32, String)> {
997    tokio::task::spawn_blocking(move || {
998        listeners::get_all()
999            .ok()?
1000            .into_iter()
1001            .find(|listener| listener.socket.port() == port)
1002            .map(|listener| (listener.process.pid, listener.process.name))
1003    })
1004    .await
1005    .ok()
1006    .flatten()
1007}