pitchfork_cli/supervisor/
lifecycle.rs

1//! Daemon lifecycle management - start/stop operations
2//!
3//! Contains the core `run()`, `run_once()`, and `stop()` methods for daemon process management.
4
5use super::{SUPERVISOR, Supervisor, UpsertDaemonOpts};
6use crate::daemon::RunOptions;
7use crate::daemon_status::DaemonStatus;
8use crate::ipc::IpcResponse;
9use crate::procs::PROCS;
10use crate::{Result, env};
11use itertools::Itertools;
12use miette::IntoDiagnostic;
13use once_cell::sync::Lazy;
14use regex::Regex;
15use std::collections::HashMap;
16use std::iter::once;
17use std::time::Duration;
18use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
19use tokio::select;
20use tokio::sync::oneshot;
21use tokio::time;
22
23/// Cache for compiled regex patterns to avoid recompilation on daemon restarts
24static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
25    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
26
27/// Get or compile a regex pattern, caching the result for future use
28pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
29    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
30    if let Some(re) = cache.get(pattern) {
31        return Some(re.clone());
32    }
33    match Regex::new(pattern) {
34        Ok(re) => {
35            cache.insert(pattern.to_string(), re.clone());
36            Some(re)
37        }
38        Err(e) => {
39            error!("invalid regex pattern '{}': {}", pattern, e);
40            None
41        }
42    }
43}
44
45impl Supervisor {
46    /// Run a daemon, handling retries if configured
47    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
48        let id = &opts.id;
49        let cmd = opts.cmd.clone();
50
51        // Clear any pending autostop for this daemon since it's being started
52        {
53            let mut pending = self.pending_autostops.lock().await;
54            if pending.remove(id).is_some() {
55                info!("cleared pending autostop for {} (daemon starting)", id);
56            }
57        }
58
59        let daemon = self.get_daemon(id).await;
60        if let Some(daemon) = daemon {
61            // Stopping state is treated as "not running" - the monitoring task will clean it up
62            // Only check for Running state with a valid PID
63            if !daemon.status.is_stopping()
64                && !daemon.status.is_stopped()
65                && let Some(pid) = daemon.pid
66            {
67                if opts.force {
68                    self.stop(id).await?;
69                    info!("run: stop completed for daemon {id}");
70                } else {
71                    warn!("daemon {id} already running with pid {pid}");
72                    return Ok(IpcResponse::DaemonAlreadyRunning);
73                }
74            }
75        }
76
77        // If wait_ready is true and retry is configured, implement retry loop
78        if opts.wait_ready && opts.retry > 0 {
79            // Use saturating_add to avoid overflow when retry = u32::MAX (infinite)
80            let max_attempts = opts.retry.saturating_add(1);
81            for attempt in 0..max_attempts {
82                let mut retry_opts = opts.clone();
83                retry_opts.retry_count = attempt;
84                retry_opts.cmd = cmd.clone();
85
86                let result = self.run_once(retry_opts).await?;
87
88                match result {
89                    IpcResponse::DaemonReady { daemon } => {
90                        return Ok(IpcResponse::DaemonReady { daemon });
91                    }
92                    IpcResponse::DaemonFailedWithCode { exit_code } => {
93                        if attempt < opts.retry {
94                            let backoff_secs = 2u64.pow(attempt);
95                            info!(
96                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
97                                attempt + 1,
98                                max_attempts,
99                                backoff_secs
100                            );
101                            time::sleep(Duration::from_secs(backoff_secs)).await;
102                            continue;
103                        } else {
104                            info!("daemon {id} failed after {} attempts", max_attempts);
105                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
106                        }
107                    }
108                    other => return Ok(other),
109                }
110            }
111        }
112
113        // No retry or wait_ready is false
114        self.run_once(opts).await
115    }
116
117    /// Run a daemon once (single attempt)
118    pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
119        let id = &opts.id;
120        let cmd = opts.cmd;
121
122        // Create channel for readiness notification if wait_ready is true
123        let (ready_tx, ready_rx) = if opts.wait_ready {
124            let (tx, rx) = oneshot::channel();
125            (Some(tx), Some(rx))
126        } else {
127            (None, None)
128        };
129
130        let cmd = once("exec".to_string())
131            .chain(cmd.into_iter())
132            .collect_vec();
133        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
134        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
135        if let Some(parent) = log_path.parent() {
136            xx::file::mkdirp(parent)?;
137        }
138        info!("run: spawning daemon {id} with args: {args:?}");
139        let mut cmd = tokio::process::Command::new("sh");
140        cmd.args(&args)
141            .stdin(std::process::Stdio::null())
142            .stdout(std::process::Stdio::piped())
143            .stderr(std::process::Stdio::piped())
144            .current_dir(&opts.dir);
145
146        // Ensure daemon can find user tools by using the original PATH
147        if let Some(ref path) = *env::ORIGINAL_PATH {
148            cmd.env("PATH", path);
149        }
150
151        let mut child = cmd.spawn().into_diagnostic()?;
152        let pid = match child.id() {
153            Some(p) => p,
154            None => {
155                warn!("Daemon {id} exited before PID could be captured");
156                return Ok(IpcResponse::DaemonFailed {
157                    error: "Process exited immediately".to_string(),
158                });
159            }
160        };
161        info!("started daemon {id} with pid {pid}");
162        let daemon = self
163            .upsert_daemon(UpsertDaemonOpts {
164                id: id.to_string(),
165                pid: Some(pid),
166                status: DaemonStatus::Running,
167                shell_pid: opts.shell_pid,
168                dir: Some(opts.dir.clone()),
169                autostop: opts.autostop,
170                cron_schedule: opts.cron_schedule.clone(),
171                cron_retrigger: opts.cron_retrigger,
172                last_exit_success: None,
173                retry: Some(opts.retry),
174                retry_count: Some(opts.retry_count),
175                ready_delay: opts.ready_delay,
176                ready_output: opts.ready_output.clone(),
177                ready_http: opts.ready_http.clone(),
178                ready_port: opts.ready_port,
179                depends: Some(opts.depends.clone()),
180            })
181            .await?;
182
183        let id_clone = id.to_string();
184        let ready_delay = opts.ready_delay;
185        let ready_output = opts.ready_output.clone();
186        let ready_http = opts.ready_http.clone();
187        let ready_port = opts.ready_port;
188
189        tokio::spawn(async move {
190            let id = id_clone;
191            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
192                (Some(out), Some(err)) => (out, err),
193                _ => {
194                    error!("Failed to capture stdout/stderr for daemon {id}");
195                    return;
196                }
197            };
198            let mut stdout = tokio::io::BufReader::new(stdout).lines();
199            let mut stderr = tokio::io::BufReader::new(stderr).lines();
200            let log_file = match tokio::fs::File::options()
201                .append(true)
202                .create(true)
203                .open(&log_path)
204                .await
205            {
206                Ok(f) => f,
207                Err(e) => {
208                    error!("Failed to open log file for daemon {id}: {e}");
209                    return;
210                }
211            };
212            let mut log_appender = BufWriter::new(log_file);
213
214            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
215            let format_line = |line: String| {
216                if line.starts_with(&format!("{id} ")) {
217                    // mise tasks often already have the id printed
218                    format!("{} {line}\n", now())
219                } else {
220                    format!("{} {id} {line}\n", now())
221                }
222            };
223
224            // Setup readiness checking
225            let mut ready_notified = false;
226            let mut ready_tx = ready_tx;
227            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
228
229            let mut delay_timer =
230                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
231
232            // Setup HTTP readiness check interval (poll every 500ms)
233            let mut http_check_interval = ready_http
234                .as_ref()
235                .map(|_| tokio::time::interval(Duration::from_millis(500)));
236            let http_client = ready_http.as_ref().map(|_| {
237                reqwest::Client::builder()
238                    .timeout(Duration::from_secs(5))
239                    .build()
240                    .unwrap_or_default()
241            });
242
243            // Setup TCP port readiness check interval (poll every 500ms)
244            let mut port_check_interval =
245                ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));
246
247            // Setup periodic log flush interval (every 500ms - balances I/O reduction with responsiveness)
248            let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));
249
250            // Use a channel to communicate process exit status
251            let (exit_tx, mut exit_rx) =
252                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
253
254            // Spawn a task to wait for process exit
255            let child_pid = child.id().unwrap_or(0);
256            tokio::spawn(async move {
257                let result = child.wait().await;
258                debug!(
259                    "daemon pid {child_pid} wait() completed with result: {:?}",
260                    result
261                );
262                let _ = exit_tx.send(result).await;
263            });
264
265            #[allow(unused_assignments)]
266            // Initial None is a safety net; loop only exits via exit_rx.recv() which sets it
267            let mut exit_status = None;
268
269            loop {
270                select! {
271                    Ok(Some(line)) = stdout.next_line() => {
272                        let formatted = format_line(line.clone());
273                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
274                            error!("Failed to write to log for daemon {id}: {e}");
275                        }
276                        trace!("stdout: {id} {formatted}");
277
278                        // Check if output matches ready pattern
279                        if !ready_notified
280                            && let Some(ref pattern) = ready_pattern
281                                && pattern.is_match(&line) {
282                                    info!("daemon {id} ready: output matched pattern");
283                                    ready_notified = true;
284                                    // Flush logs before notifying so clients see logs immediately
285                                    let _ = log_appender.flush().await;
286                                    if let Some(tx) = ready_tx.take() {
287                                        let _ = tx.send(Ok(()));
288                                    }
289                                }
290                    }
291                    Ok(Some(line)) = stderr.next_line() => {
292                        let formatted = format_line(line.clone());
293                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
294                            error!("Failed to write to log for daemon {id}: {e}");
295                        }
296                        trace!("stderr: {id} {formatted}");
297
298                        // Check if output matches ready pattern (also check stderr)
299                        if !ready_notified
300                            && let Some(ref pattern) = ready_pattern
301                                && pattern.is_match(&line) {
302                                    info!("daemon {id} ready: output matched pattern");
303                                    ready_notified = true;
304                                    // Flush logs before notifying so clients see logs immediately
305                                    let _ = log_appender.flush().await;
306                                    if let Some(tx) = ready_tx.take() {
307                                        let _ = tx.send(Ok(()));
308                                    }
309                                }
310                    },
311                    Some(result) = exit_rx.recv() => {
312                        // Process exited - save exit status and notify if not ready yet
313                        exit_status = Some(result);
314                        debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
315                        // Flush logs before notifying so clients see logs immediately
316                        let _ = log_appender.flush().await;
317                        if !ready_notified {
318                            if let Some(tx) = ready_tx.take() {
319                                // Check if process exited successfully
320                                let is_success = exit_status.as_ref()
321                                    .and_then(|r| r.as_ref().ok())
322                                    .map(|s| s.success())
323                                    .unwrap_or(false);
324
325                                if is_success {
326                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
327                                    let _ = tx.send(Ok(()));
328                                } else {
329                                    let exit_code = exit_status.as_ref()
330                                        .and_then(|r| r.as_ref().ok())
331                                        .and_then(|s| s.code());
332                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
333                                    let _ = tx.send(Err(exit_code));
334                                }
335                            }
336                        } else {
337                            debug!("daemon {id} was already marked ready, not sending notification");
338                        }
339                        break;
340                    }
341                    _ = async {
342                        if let Some(ref mut interval) = http_check_interval {
343                            interval.tick().await;
344                        } else {
345                            std::future::pending::<()>().await;
346                        }
347                    }, if !ready_notified && ready_http.is_some() => {
348                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
349                            match client.get(url).send().await {
350                                Ok(response) if response.status().is_success() => {
351                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
352                                    ready_notified = true;
353                                    // Flush logs before notifying so clients see logs immediately
354                                    let _ = log_appender.flush().await;
355                                    if let Some(tx) = ready_tx.take() {
356                                        let _ = tx.send(Ok(()));
357                                    }
358                                    // Stop checking once ready
359                                    http_check_interval = None;
360                                }
361                                Ok(response) => {
362                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
363                                }
364                                Err(e) => {
365                                    trace!("daemon {id} HTTP check failed: {e}");
366                                }
367                            }
368                        }
369                    }
370                    _ = async {
371                        if let Some(ref mut interval) = port_check_interval {
372                            interval.tick().await;
373                        } else {
374                            std::future::pending::<()>().await;
375                        }
376                    }, if !ready_notified && ready_port.is_some() => {
377                        if let Some(port) = ready_port {
378                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
379                                Ok(_) => {
380                                    info!("daemon {id} ready: TCP port {port} is listening");
381                                    ready_notified = true;
382                                    // Flush logs before notifying so clients see logs immediately
383                                    let _ = log_appender.flush().await;
384                                    if let Some(tx) = ready_tx.take() {
385                                        let _ = tx.send(Ok(()));
386                                    }
387                                    // Stop checking once ready
388                                    port_check_interval = None;
389                                }
390                                Err(_) => {
391                                    trace!("daemon {id} port check: port {port} not listening yet");
392                                }
393                            }
394                        }
395                    }
396                    _ = async {
397                        if let Some(ref mut timer) = delay_timer {
398                            timer.await;
399                        } else {
400                            std::future::pending::<()>().await;
401                        }
402                    } => {
403                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() {
404                            info!("daemon {id} ready: delay elapsed");
405                            ready_notified = true;
406                            // Flush logs before notifying so clients see logs immediately
407                            let _ = log_appender.flush().await;
408                            if let Some(tx) = ready_tx.take() {
409                                let _ = tx.send(Ok(()));
410                            }
411                        }
412                        // Disable timer after it fires
413                        delay_timer = None;
414                    }
415                    _ = log_flush_interval.tick() => {
416                        // Periodic flush to ensure logs are written to disk
417                        if let Err(e) = log_appender.flush().await {
418                            error!("Failed to flush log for daemon {id}: {e}");
419                        }
420                    }
421                    // Note: No `else => break` because log_flush_interval.tick() is always available,
422                    // making the else branch unreachable. The loop exits via the exit_rx.recv() branch.
423                }
424            }
425
426            // Final flush to ensure all buffered logs are written
427            if let Err(e) = log_appender.flush().await {
428                error!("Failed to final flush log for daemon {id}: {e}");
429            }
430
431            // Get the final exit status
432            let exit_status = if let Some(status) = exit_status {
433                status
434            } else {
435                // Streams closed but process hasn't exited yet, wait for it
436                match exit_rx.recv().await {
437                    Some(status) => status,
438                    None => {
439                        warn!("daemon {id} exit channel closed without receiving status");
440                        Err(std::io::Error::other("exit channel closed"))
441                    }
442                }
443            };
444            let current_daemon = SUPERVISOR.get_daemon(&id).await;
445
446            // Check if this monitoring task is for the current daemon process
447            if current_daemon.is_none()
448                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
449            {
450                // Another process has taken over, don't update status
451                return;
452            }
453            let is_stopping = current_daemon
454                .as_ref()
455                .is_some_and(|d| d.status.is_stopping());
456
457            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
458                // was stopped by this supervisor so don't update status
459                return;
460            }
461            if let Ok(status) = exit_status {
462                info!("daemon {id} exited with status {status}");
463                if status.success() || is_stopping {
464                    // If stopping, always mark as Stopped with success
465                    // This allows monitoring task to clear PID after stop() was called
466                    if let Err(e) = SUPERVISOR
467                        .upsert_daemon(UpsertDaemonOpts {
468                            id: id.clone(),
469                            pid: None, // Clear PID now that process has exited
470                            status: DaemonStatus::Stopped,
471                            last_exit_success: Some(status.success()),
472                            ..Default::default()
473                        })
474                        .await
475                    {
476                        error!("Failed to update daemon state for {id}: {e}");
477                    }
478                } else {
479                    // Handle error exit - mark for retry
480                    // retry_count increment will be handled by interval_watch
481                    if let Err(e) = SUPERVISOR
482                        .upsert_daemon(UpsertDaemonOpts {
483                            id: id.clone(),
484                            pid: None,
485                            status: DaemonStatus::Errored(status.code()),
486                            last_exit_success: Some(false),
487                            ..Default::default()
488                        })
489                        .await
490                    {
491                        error!("Failed to update daemon state for {id}: {e}");
492                    }
493                }
494            } else if let Err(e) = SUPERVISOR
495                .upsert_daemon(UpsertDaemonOpts {
496                    id: id.clone(),
497                    pid: None,
498                    status: DaemonStatus::Errored(None),
499                    last_exit_success: Some(false),
500                    ..Default::default()
501                })
502                .await
503            {
504                error!("Failed to update daemon state for {id}: {e}");
505            }
506        });
507
508        // If wait_ready is true, wait for readiness notification
509        if let Some(ready_rx) = ready_rx {
510            match ready_rx.await {
511                Ok(Ok(())) => {
512                    info!("daemon {id} is ready");
513                    Ok(IpcResponse::DaemonReady { daemon })
514                }
515                Ok(Err(exit_code)) => {
516                    error!("daemon {id} failed before becoming ready");
517                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
518                }
519                Err(_) => {
520                    error!("readiness channel closed unexpectedly for daemon {id}");
521                    Ok(IpcResponse::DaemonStart { daemon })
522                }
523            }
524        } else {
525            Ok(IpcResponse::DaemonStart { daemon })
526        }
527    }
528
529    /// Stop a running daemon
530    pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
531        if id == "pitchfork" {
532            return Ok(IpcResponse::Error(
533                "Cannot stop supervisor via stop command".into(),
534            ));
535        }
536        info!("stopping daemon: {id}");
537        if let Some(daemon) = self.get_daemon(id).await {
538            trace!("daemon to stop: {daemon}");
539            if let Some(pid) = daemon.pid {
540                trace!("killing pid: {pid}");
541                PROCS.refresh_processes();
542                if PROCS.is_running(pid) {
543                    // First set status to Stopping (keeps PID for monitoring task)
544                    self.upsert_daemon(UpsertDaemonOpts {
545                        id: id.to_string(),
546                        status: DaemonStatus::Stopping,
547                        ..Default::default()
548                    })
549                    .await?;
550
551                    // Then kill the process
552                    if let Err(e) = PROCS.kill_async(pid).await {
553                        warn!("failed to kill pid {pid}: {e}");
554                    }
555                    PROCS.refresh_processes();
556                    for child_pid in PROCS.all_children(pid) {
557                        debug!("killing child pid: {child_pid}");
558                        if let Err(e) = PROCS.kill_async(child_pid).await {
559                            warn!("failed to kill child pid {child_pid}: {e}");
560                        }
561                    }
562                    // Monitoring task will clear PID and set to Stopped when it detects exit
563                } else {
564                    debug!("pid {pid} not running");
565                    // Process already dead, directly mark as stopped
566                    self.upsert_daemon(UpsertDaemonOpts {
567                        id: id.to_string(),
568                        pid: None,
569                        status: DaemonStatus::Stopped,
570                        ..Default::default()
571                    })
572                    .await?;
573                }
574                return Ok(IpcResponse::Ok);
575            } else {
576                debug!("daemon {id} not running");
577            }
578        } else {
579            debug!("daemon {id} not found");
580        }
581        Ok(IpcResponse::DaemonAlreadyStopped)
582    }
583}