pitchfork_cli/supervisor.rs

use crate::daemon::{Daemon, RunOptions};
use crate::daemon_status::DaemonStatus;
use crate::ipc::server::IpcServer;
use crate::ipc::{IpcRequest, IpcResponse};
use crate::procs::PROCS;
use crate::state_file::StateFile;
use crate::{Result, env};
use duct::cmd;
use itertools::Itertools;
use log::LevelFilter::Info;
use miette::IntoDiagnostic;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::fs;
use std::iter::once;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::{select, signal, time};

pub struct Supervisor {
    state_file: Mutex<StateFile>,
    pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    last_refreshed_at: Mutex<time::Instant>,
    /// Map of daemon ID to scheduled autostop time
    pending_autostops: Mutex<HashMap<String, time::Instant>>,
}

fn interval_duration() -> Duration {
    Duration::from_secs(*env::PITCHFORK_INTERVAL_SECS)
}

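/// Global supervisor instance, lazily initialized on first access.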
pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));

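/// Starts the supervisor in the background unless the state file already
/// records a "pitchfork" daemon whose PID is still running.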
pub fn start_if_not_running() -> Result<()> {
    let sf = StateFile::get();
    if let Some(d) = sf.daemons.get("pitchfork")
        && let Some(pid) = d.pid
        && PROCS.is_running(pid)
    {
        return Ok(());
    }
    start_in_background()
}

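/// Spawns `pitchfork supervisor run` as a detached background process with
/// stdout/stderr discarded.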
pub fn start_in_background() -> Result<()> {
    debug!("starting supervisor in background");
    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
        .stdout_null()
        .stderr_null()
        .start()
        .into_diagnostic()?;
    Ok(())
}

impl Supervisor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
            pending_autostops: Mutex::new(HashMap::new()),
        })
    }

    pub async fn start(&self, is_boot: bool, web_port: Option<u16>) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        self.upsert_daemon(UpsertDaemonOpts {
            id: "pitchfork".to_string(),
            pid: Some(pid),
            status: DaemonStatus::Running,
            ..Default::default()
        })
        .await?;

        // If this is a boot start, automatically start boot_start daemons
        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        // self.file_watch().await?;

        // Start web server if port is configured
        if let Some(port) = web_port {
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port).await {
                    error!("Web server error: {}", e);
                }
            });
        }

        let ipc = IpcServer::new()?;
        self.conn_watch(ipc).await
    }

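    /// Periodic housekeeping: refreshes the process table, prunes shell PIDs
    /// that have exited (leaving their directories when no shell remains),
    /// then runs retry and delayed-autostop processing.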
    async fn refresh(&self) -> Result<()> {
        trace!("refreshing");
        PROCS.refresh_processes();
        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in self.get_dirs_with_shell_pids().await {
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect_vec();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;
        self.process_pending_autostops().await?;

        Ok(())
    }

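    /// Restarts errored daemons that are not running and still have retry
    /// attempts remaining, using the run command from pitchfork.toml.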
    async fn check_retry(&self) -> Result<()> {
        let state_file = self.state_file.lock().await;
        let daemons_to_retry: Vec<(String, Daemon)> = state_file
            .daemons
            .iter()
            .filter(|(_id, d)| {
                // Daemon is errored, not currently running, and has retries remaining
                d.status.is_errored() && d.pid.is_none() && d.retry > 0 && d.retry_count < d.retry
            })
            .map(|(id, d)| (id.clone(), d.clone()))
            .collect();
        drop(state_file);

        for (id, daemon) in daemons_to_retry {
            info!(
                "retrying daemon {} ({}/{} attempts)",
                id,
                daemon.retry_count + 1,
                daemon.retry
            );

            // Get command from pitchfork.toml
            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                let retry_opts = RunOptions {
                    id: id.clone(),
                    cmd: shell_words::split(&run_cmd).unwrap_or_default(),
                    force: false,
                    shell_pid: daemon.shell_pid,
                    dir: daemon.dir.unwrap_or_else(|| env::CWD.clone()),
                    autostop: daemon.autostop,
                    cron_schedule: daemon.cron_schedule,
                    cron_retrigger: daemon.cron_retrigger,
                    retry: daemon.retry,
                    retry_count: daemon.retry_count + 1,
                    ready_delay: daemon.ready_delay,
                    ready_output: daemon.ready_output.clone(),
                    ready_http: daemon.ready_http.clone(),
                    ready_port: daemon.ready_port,
                    wait_ready: false,
                };
                if let Err(e) = self.run(retry_opts).await {
                    error!("failed to retry daemon {}: {}", id, e);
                }
            } else {
                warn!("no run command found for daemon {}, cannot retry", id);
                // Mark as exhausted
                self.upsert_daemon(UpsertDaemonOpts {
                    id,
                    retry_count: Some(daemon.retry),
                    ..Default::default()
                })
                .await?;
            }
        }

        Ok(())
    }

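    /// Called when the last shell leaves `dir`: any autostop-enabled daemon
    /// whose directory is under `dir` (with no other shell still inside it) is
    /// stopped immediately, or scheduled for autostop after the configured delay.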
    async fn leave_dir(&self, dir: &Path) -> Result<()> {
        debug!("left dir {}", dir.display());
        let shell_dirs = self.get_dirs_with_shell_pids().await;
        let shell_dirs = shell_dirs.keys().collect_vec();
        let delay_secs = *env::PITCHFORK_AUTOSTOP_DELAY;

        for daemon in self.active_daemons().await {
            if !daemon.autostop {
                continue;
            }
            // if this daemon's dir starts with the left dir
            // and no other shell pid has this dir as a prefix
            // schedule the daemon for autostop
            if let Some(daemon_dir) = daemon.dir.as_ref()
                && daemon_dir.starts_with(dir)
                && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
            {
                if delay_secs == 0 {
                    // No delay configured, stop immediately
                    info!("autostopping {daemon}");
                    self.stop(&daemon.id).await?;
                    self.add_notification(Info, format!("autostopped {daemon}"))
                        .await;
                } else {
                    // Schedule autostop with delay
                    let stop_at = time::Instant::now() + Duration::from_secs(delay_secs);
                    let mut pending = self.pending_autostops.lock().await;
                    if !pending.contains_key(&daemon.id) {
                        info!("scheduling autostop for {} in {}s", daemon.id, delay_secs);
                        pending.insert(daemon.id.clone(), stop_at);
                    }
                }
            }
        }
        Ok(())
    }

    /// Cancel any pending autostop for daemons in the given directory.
    /// Also cancels autostops for daemons in parent directories (e.g., entering /project/subdir
    /// cancels a pending autostop for a daemon in /project).
    async fn cancel_pending_autostops_for_dir(&self, dir: &Path) {
        let mut pending = self.pending_autostops.lock().await;
        let daemons_to_cancel: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    d.dir.as_ref().is_some_and(|daemon_dir| {
                        // Cancel if entering a directory inside or equal to daemon's directory
                        // OR if daemon is in a subdirectory of the entered directory
                        dir.starts_with(daemon_dir) || daemon_dir.starts_with(dir)
                    })
                })
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in daemons_to_cancel {
            if pending.remove(&daemon_id).is_some() {
                info!("cancelled pending autostop for {}", daemon_id);
            }
        }
    }

    /// Process any pending autostops that have reached their scheduled time
    async fn process_pending_autostops(&self) -> Result<()> {
        let now = time::Instant::now();
        let to_stop: Vec<String> = {
            let pending = self.pending_autostops.lock().await;
            pending
                .iter()
                .filter(|(_, stop_at)| now >= **stop_at)
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in to_stop {
            // Remove from pending first
            {
                let mut pending = self.pending_autostops.lock().await;
                pending.remove(&daemon_id);
            }

            // Check if daemon is still running and should be stopped
            if let Some(daemon) = self.get_daemon(&daemon_id).await
                && daemon.autostop
                && daemon.status.is_running()
            {
                // Verify no shell is in the daemon's directory
                let shell_dirs = self.get_dirs_with_shell_pids().await;
                let shell_dirs = shell_dirs.keys().collect_vec();
                if let Some(daemon_dir) = daemon.dir.as_ref()
                    && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
                {
                    info!("autostopping {} (after delay)", daemon_id);
                    self.stop(&daemon_id).await?;
                    self.add_notification(Info, format!("autostopped {daemon_id}"))
                        .await;
                }
            }
        }
        Ok(())
    }

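    /// Starts every daemon whose pitchfork.toml entry sets `boot_start = true`.
    /// Boot daemons never autostop and are not waited on for readiness.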
    async fn start_boot_daemons(&self) -> Result<()> {
        use crate::pitchfork_toml::PitchforkToml;

        info!("Scanning for boot_start daemons");
        let pt = PitchforkToml::all_merged();

        let boot_daemons: Vec<_> = pt
            .daemons
            .iter()
            .filter(|(_id, d)| d.boot_start.unwrap_or(false))
            .collect();

        if boot_daemons.is_empty() {
            info!("No daemons configured with boot_start = true");
            return Ok(());
        }

        info!("Found {} daemon(s) to start at boot", boot_daemons.len());

        for (id, daemon) in boot_daemons {
            info!("Starting boot daemon: {}", id);

            let dir = daemon
                .path
                .as_ref()
                .and_then(|p| p.parent())
                .map(|p| p.to_path_buf())
                .unwrap_or_else(|| env::CWD.clone());

            let run_opts = RunOptions {
                id: id.clone(),
                cmd: shell_words::split(&daemon.run).unwrap_or_default(),
                force: false,
                shell_pid: None,
                dir,
                autostop: false, // Boot daemons should not autostop
                cron_schedule: daemon.cron.as_ref().map(|c| c.schedule.clone()),
                cron_retrigger: daemon.cron.as_ref().map(|c| c.retrigger),
                retry: daemon.retry,
                retry_count: 0,
                ready_delay: daemon.ready_delay,
                ready_output: daemon.ready_output.clone(),
                ready_http: daemon.ready_http.clone(),
                ready_port: daemon.ready_port,
                wait_ready: false, // Don't block on boot daemons
            };

            match self.run(run_opts).await {
                Ok(IpcResponse::DaemonStart { .. }) | Ok(IpcResponse::DaemonReady { .. }) => {
                    info!("Successfully started boot daemon: {}", id);
                }
                Ok(IpcResponse::DaemonAlreadyRunning) => {
                    info!("Boot daemon already running: {}", id);
                }
                Ok(other) => {
                    warn!(
                        "Unexpected response when starting boot daemon {}: {:?}",
                        id, other
                    );
                }
                Err(e) => {
                    error!("Failed to start boot daemon {}: {}", id, e);
                }
            }
        }

        Ok(())
    }

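    /// Starts a daemon, stopping any existing instance first when `force` is set.
    /// When `wait_ready` is true and retries are configured, failed attempts are
    /// retried with exponential backoff of 2^attempt seconds (attempt 0 -> 1s,
    /// 1 -> 2s, 2 -> 4s, ...).
    ///
    /// A minimal sketch of a caller, assuming the `RunOptions` fields used
    /// throughout this file (not verified against its definition elsewhere):
    ///
    /// ```ignore
    /// let opts = RunOptions {
    ///     id: "web".to_string(),
    ///     cmd: vec!["npm".to_string(), "start".to_string()],
    ///     force: false,
    ///     shell_pid: None,
    ///     dir: env::CWD.clone(),
    ///     autostop: false,
    ///     cron_schedule: None,
    ///     cron_retrigger: None,
    ///     retry: 3,
    ///     retry_count: 0,
    ///     ready_delay: None,
    ///     ready_output: Some("listening on".to_string()),
    ///     ready_http: None,
    ///     ready_port: None,
    ///     wait_ready: true,
    /// };
    /// let rsp = SUPERVISOR.run(opts).await?;
    /// ```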
    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd.clone();

        // Clear any pending autostop for this daemon since it's being started
        {
            let mut pending = self.pending_autostops.lock().await;
            if pending.remove(id).is_some() {
                info!("cleared pending autostop for {} (daemon starting)", id);
            }
        }

        let daemon = self.get_daemon(id).await;
        if let Some(daemon) = daemon {
            // Stopping state is treated as "not running" - the monitoring task will clean it up
            // Only check for Running state with a valid PID
            if !daemon.status.is_stopping()
                && !daemon.status.is_stopped()
                && let Some(pid) = daemon.pid
            {
                if opts.force {
                    self.stop(id).await?;
                    info!("run: stop completed for daemon {id}");
                } else {
                    warn!("daemon {id} already running with pid {pid}");
                    return Ok(IpcResponse::DaemonAlreadyRunning);
                }
            }
        }

        // If wait_ready is true and retry is configured, implement retry loop
        if opts.wait_ready && opts.retry > 0 {
            let max_attempts = opts.retry + 1; // initial attempt + retries
            for attempt in 0..max_attempts {
                let mut retry_opts = opts.clone();
                retry_opts.retry_count = attempt;
                retry_opts.cmd = cmd.clone();

                let result = self.run_once(retry_opts).await?;

                match result {
                    IpcResponse::DaemonReady { daemon } => {
                        return Ok(IpcResponse::DaemonReady { daemon });
                    }
                    IpcResponse::DaemonFailedWithCode { exit_code } => {
                        if attempt < opts.retry {
                            let backoff_secs = 2u64.pow(attempt);
                            info!(
                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
                                attempt + 1,
                                max_attempts,
                                backoff_secs
                            );
                            time::sleep(Duration::from_secs(backoff_secs)).await;
                            continue;
                        } else {
                            info!("daemon {id} failed after {} attempts", max_attempts);
                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
                        }
                    }
                    other => return Ok(other),
                }
            }
        }

        // No retry or wait_ready is false
        self.run_once(opts).await
    }

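    /// Single start attempt: spawns `sh -c "exec <cmd>"` in the daemon's
    /// directory, records it in the state file, and launches a monitoring task
    /// that tees stdout/stderr to the log file, performs the configured
    /// readiness checks (output pattern, HTTP, TCP port, or delay), and updates
    /// the daemon's status when the process exits.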
    async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd;

        // Create channel for readiness notification if wait_ready is true
        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let cmd = once("exec".to_string())
            .chain(cmd.into_iter())
            .collect_vec();
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
        if let Some(parent) = log_path.parent() {
            xx::file::mkdirp(parent)?;
        }
        info!("run: spawning daemon {id} with args: {args:?}");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(&args)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .current_dir(&opts.dir);

        // Ensure daemon can find user tools by using the original PATH
        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = match child.id() {
            Some(p) => p,
            None => {
                warn!("Daemon {id} exited before PID could be captured");
                return Ok(IpcResponse::DaemonFailed {
                    error: "Process exited immediately".to_string(),
                });
            }
        };
        info!("started daemon {id} with pid {pid}");
        let daemon = self
            .upsert_daemon(UpsertDaemonOpts {
                id: id.to_string(),
                pid: Some(pid),
                status: DaemonStatus::Running,
                shell_pid: opts.shell_pid,
                dir: Some(opts.dir.clone()),
                autostop: opts.autostop,
                cron_schedule: opts.cron_schedule.clone(),
                cron_retrigger: opts.cron_retrigger,
                last_exit_success: None,
                retry: Some(opts.retry),
                retry_count: Some(opts.retry_count),
                ready_delay: opts.ready_delay,
                ready_output: opts.ready_output.clone(),
                ready_http: opts.ready_http.clone(),
                ready_port: opts.ready_port,
            })
            .await?;

        let id_clone = id.to_string();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();
        let ready_http = opts.ready_http.clone();
        let ready_port = opts.ready_port;

        tokio::spawn(async move {
            let id = id_clone;
            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
                (Some(out), Some(err)) => (out, err),
                _ => {
                    error!("Failed to capture stdout/stderr for daemon {id}");
                    return;
                }
            };
            let mut stdout = tokio::io::BufReader::new(stdout).lines();
            let mut stderr = tokio::io::BufReader::new(stderr).lines();
            let log_file = match tokio::fs::File::options()
                .append(true)
                .create(true)
                .open(&log_path)
                .await
            {
                Ok(f) => f,
                Err(e) => {
                    error!("Failed to open log file for daemon {id}: {e}");
                    return;
                }
            };
            let mut log_appender = BufWriter::new(log_file);

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            let format_line = |line: String| {
                if line.starts_with(&format!("{id} ")) {
                    // mise tasks often already have the id printed
                    format!("{} {line}\n", now())
                } else {
                    format!("{} {id} {line}\n", now())
                }
            };

            // Setup readiness checking
            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern =
                ready_output
                    .as_ref()
                    .and_then(|pattern| match regex::Regex::new(pattern) {
                        Ok(re) => Some(re),
                        Err(e) => {
                            error!("invalid regex pattern for daemon {id}: {e}");
                            None
                        }
                    });

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            // Setup HTTP readiness check interval (poll every 500ms)
            let mut http_check_interval = ready_http
                .as_ref()
                .map(|_| tokio::time::interval(Duration::from_millis(500)));
            let http_client = ready_http.as_ref().map(|_| {
                reqwest::Client::builder()
                    .timeout(Duration::from_secs(5))
                    .build()
                    .unwrap_or_default()
            });

            // Setup TCP port readiness check interval (poll every 500ms)
            let mut port_check_interval =
                ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));

            // Use a channel to communicate process exit status
            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            // Spawn a task to wait for process exit
            let child_pid = child.id().unwrap_or(0);
            tokio::spawn(async move {
                let result = child.wait().await;
                debug!(
                    "daemon pid {child_pid} wait() completed with result: {:?}",
                    result
                );
                let _ = exit_tx.send(result).await;
            });

            let mut exit_status = None;

            loop {
                select! {
                    Ok(Some(line)) = stdout.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                        trace!("stdout: {id} {formatted}");

                        // Check if output matches ready pattern
                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line)
                        {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    }
                    Ok(Some(line)) = stderr.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                        trace!("stderr: {id} {formatted}");

                        // Check if output matches ready pattern (also check stderr)
                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line)
                        {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    }
                    Some(result) = exit_rx.recv() => {
                        // Process exited - save exit status and notify if not ready yet
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                // Check if process exited successfully
                                let is_success = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .map(|s| s.success())
                                    .unwrap_or(false);

                                if is_success {
                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
                                    let _ = tx.send(Ok(()));
                                } else {
                                    let exit_code = exit_status.as_ref()
                                        .and_then(|r| r.as_ref().ok())
                                        .and_then(|s| s.code());
                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
                                    let _ = tx.send(Err(exit_code));
                                }
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending notification");
                        }
                        break;
                    }
                    _ = async {
                        if let Some(ref mut interval) = http_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_http.is_some() => {
                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                            match client.get(url).send().await {
                                Ok(response) if response.status().is_success() => {
                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                    ready_notified = true;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    // Stop checking once ready
                                    http_check_interval = None;
                                }
                                Ok(response) => {
                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                                }
                                Err(e) => {
                                    trace!("daemon {id} HTTP check failed: {e}");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut interval) = port_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_port.is_some() => {
                        if let Some(port) = ready_port {
                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                                Ok(_) => {
                                    info!("daemon {id} ready: TCP port {port} is listening");
                                    ready_notified = true;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    // Stop checking once ready
                                    port_check_interval = None;
                                }
                                Err(_) => {
                                    trace!("daemon {id} port check: port {port} not listening yet");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        // Disable timer after it fires
                        delay_timer = None;
                    }
                    else => break,
                }
            }

            // Get the final exit status
            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                // Streams closed but process hasn't exited yet, wait for it
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            // Check if this monitoring task is for the current daemon process
            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
            {
                // Another process has taken over, don't update status
                return;
            }
            let is_stopping = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
                // was stopped by this supervisor so don't update status
                return;
            }
            if let Ok(status) = exit_status {
                info!("daemon {id} exited with status {status}");
                if status.success() || is_stopping {
                    // If stopping, always mark as Stopped with success
                    // This allows monitoring task to clear PID after stop() was called
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None, // Clear PID now that process has exited
                            status: DaemonStatus::Stopped,
                            last_exit_success: Some(status.success()),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                } else {
                    // Handle error exit - mark for retry
                    // retry_count increment will be handled by interval_watch
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Errored(status.code()),
                            last_exit_success: Some(false),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                }
            } else if let Err(e) = SUPERVISOR
                .upsert_daemon(UpsertDaemonOpts {
                    id: id.clone(),
                    pid: None,
                    status: DaemonStatus::Errored(None),
                    last_exit_success: Some(false),
                    ..Default::default()
                })
                .await
            {
                error!("Failed to update daemon state for {id}: {e}");
            }
        });

        // If wait_ready is true, wait for readiness notification
        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }

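    /// Stops a daemon: marks it `Stopping` (keeping the PID so the monitoring
    /// task can observe the exit), then kills the process and its children.
    /// The monitoring task clears the PID and marks the daemon `Stopped`.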
    pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
        if id == "pitchfork" {
            return Ok(IpcResponse::Error(
                "Cannot stop supervisor via stop command".into(),
            ));
        }
        info!("stopping daemon: {id}");
        if let Some(daemon) = self.get_daemon(id).await {
            trace!("daemon to stop: {daemon}");
            if let Some(pid) = daemon.pid {
                trace!("killing pid: {pid}");
                PROCS.refresh_processes();
                if PROCS.is_running(pid) {
                    // First set status to Stopping (keeps PID for monitoring task)
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        status: DaemonStatus::Stopping,
                        ..Default::default()
                    })
                    .await?;

                    // Then kill the process
                    if let Err(e) = PROCS.kill_async(pid).await {
                        warn!("failed to kill pid {pid}: {e}");
                    }
                    PROCS.refresh_processes();
                    for child_pid in PROCS.all_children(pid) {
                        debug!("killing child pid: {child_pid}");
                        if let Err(e) = PROCS.kill_async(child_pid).await {
                            warn!("failed to kill child pid {child_pid}: {e}");
                        }
                    }
                    // Monitoring task will clear PID and set to Stopped when it detects exit
                } else {
                    debug!("pid {pid} not running");
                    // Process already dead, directly mark as stopped
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        ..Default::default()
                    })
                    .await?;
                }
                return Ok(IpcResponse::Ok);
            } else {
                debug!("daemon {id} not running");
            }
        } else {
            debug!("daemon {id} not found");
        }
        Ok(IpcResponse::DaemonAlreadyStopped)
    }

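    /// Installs handlers for common termination signals: the first signal
    /// triggers a graceful shutdown, a second one forces `exit(1)`.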
    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            let stream = match signal::unix::signal(signal) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Failed to register signal handler for {:?}: {}", signal, e);
                    continue;
                }
            };
            tokio::spawn(async move {
                let mut stream = stream;
                loop {
                    stream.recv().await;
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

    // async fn file_watch(&self) -> Result<()> {
    //     let state_file = self.state_file.lock().await.path.clone();
    //     task::spawn(async move {
    //         let mut wf = WatchFiles::new(Duration::from_secs(2)).unwrap();
    //
    //         wf.watch(&state_file, RecursiveMode::NonRecursive).unwrap();
    //
    //         while let Some(paths) = wf.rx.recv().await {
    //             if let Err(err) = SUPERVISOR.handle_file_change(paths).await {
    //                 error!("failed to handle file change: {err}");
    //             }
    //         }
    //     });
    //
    //     Ok(())
    // }

    // async fn handle_file_change(&self, paths: Vec<PathBuf>) -> Result<()> {
    //     debug!("file change: {:?}", paths);
    //     // let path = self.state_file.lock().await.path.clone();
    //     // if paths.contains(&path) {
    //     //     *self.state_file.lock().await = StateFile::read(&path)?;
    //     // }
    //     self.refresh().await
    // }

    fn interval_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(interval_duration());
            loop {
                interval.tick().await;
                if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > interval_duration()
                    && let Err(err) = SUPERVISOR.refresh().await
                {
                    error!("failed to refresh: {err}");
                }
            }
        });
        Ok(())
    }

    fn cron_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            // Check cron schedules once per minute
            // FIXME: this needs smarter scheduling; a fixed 60s tick can misfire when the
            // gap between scheduled runs is very short (e.g. 30s or 1min)
            let mut interval = time::interval(Duration::from_secs(60));
            loop {
                interval.tick().await;
                if let Err(err) = SUPERVISOR.check_cron_schedules().await {
                    error!("failed to check cron schedules: {err}");
                }
            }
        });
        Ok(())
    }

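    /// Evaluates each daemon's cron schedule once per tick; if the next
    /// execution falls within the coming minute, (re)starts the daemon
    /// according to its retrigger policy. Schedules are parsed by the `cron`
    /// crate, whose expressions include a leading seconds field (e.g.
    /// `0 */5 * * * *` for every five minutes).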
    async fn check_cron_schedules(&self) -> Result<()> {
        use cron::Schedule;
        use std::str::FromStr;

        let now = chrono::Local::now();
        let daemons = self.state_file.lock().await.daemons.clone();

        for (id, daemon) in daemons {
            if let Some(schedule_str) = &daemon.cron_schedule
                && let Some(retrigger) = daemon.cron_retrigger
            {
                // Parse the cron schedule
                let schedule = match Schedule::from_str(schedule_str) {
                    Ok(s) => s,
                    Err(e) => {
                        warn!("invalid cron schedule for daemon {id}: {e}");
                        continue;
                    }
                };

                // Check if we should trigger now
                let should_trigger = schedule.upcoming(chrono::Local).take(1).any(|next| {
                    // If the next execution is within the next minute, trigger it
                    let diff = next.signed_duration_since(now);
                    diff.num_seconds() < 60 && diff.num_seconds() >= 0
                });

                if should_trigger {
                    let should_run = match retrigger {
                        crate::pitchfork_toml::CronRetrigger::Finish => {
                            // Run if not currently running
                            daemon.pid.is_none()
                        }
                        crate::pitchfork_toml::CronRetrigger::Always => {
                            // Always run (force restart handled in run method)
                            true
                        }
                        crate::pitchfork_toml::CronRetrigger::Success => {
                            // Run only if previous command succeeded
                            daemon.pid.is_none() && daemon.last_exit_success.unwrap_or(false)
                        }
                        crate::pitchfork_toml::CronRetrigger::Fail => {
                            // Run only if previous command failed
                            daemon.pid.is_none() && !daemon.last_exit_success.unwrap_or(true)
                        }
                    };

                    if should_run {
                        info!("cron: triggering daemon {id} (retrigger: {retrigger:?})");
                        // Get the run command from pitchfork.toml
                        if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                            let dir = daemon.dir.clone().unwrap_or_else(|| env::CWD.clone());
                            // Use force: true for Always retrigger to ensure restart
                            let force =
                                matches!(retrigger, crate::pitchfork_toml::CronRetrigger::Always);
                            let opts = RunOptions {
                                id: id.clone(),
                                cmd: shell_words::split(&run_cmd).unwrap_or_default(),
                                force,
                                shell_pid: None,
                                dir,
                                autostop: daemon.autostop,
                                cron_schedule: Some(schedule_str.clone()),
                                cron_retrigger: Some(retrigger),
                                retry: daemon.retry,
                                retry_count: daemon.retry_count,
                                ready_delay: daemon.ready_delay,
                                ready_output: daemon.ready_output.clone(),
                                ready_http: daemon.ready_http.clone(),
                                ready_port: daemon.ready_port,
                                wait_ready: false,
                            };
                            if let Err(e) = self.run(opts).await {
                                error!("failed to run cron daemon {id}: {e}");
                            }
                        } else {
                            warn!("no run command found for cron daemon {id}");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    fn get_daemon_run_command(&self, id: &str) -> Option<String> {
        use crate::pitchfork_toml::PitchforkToml;
        let pt = PitchforkToml::all_merged();
        pt.daemons.get(id).map(|d| d.run.clone())
    }

    async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
        loop {
            let (msg, send) = match ipc.read().await {
                Ok(msg) => msg,
                Err(e) => {
                    error!("failed to accept connection: {:?}", e);
                    continue;
                }
            };
            debug!("received message: {:?}", msg);
            tokio::spawn(async move {
                let rsp = SUPERVISOR
                    .handle_ipc(msg)
                    .await
                    .unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
                if let Err(err) = send.send(rsp).await {
                    debug!("failed to send message: {:?}", err);
                }
            });
        }
    }

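    /// Dispatches a single IPC request to the corresponding supervisor
    /// operation and converts the result into an `IpcResponse`.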
    async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
        let rsp = match req {
            IpcRequest::Connect => {
                debug!("received connect message");
                IpcResponse::Ok
            }
            IpcRequest::Stop { id } => self.stop(&id).await?,
            IpcRequest::Run(opts) => self.run(opts).await?,
            IpcRequest::Enable { id } => {
                if self.enable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::Disable { id } => {
                if self.disable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::GetActiveDaemons => {
                let daemons = self.active_daemons().await;
                IpcResponse::ActiveDaemons(daemons)
            }
            IpcRequest::GetNotifications => {
                let notifications = self.get_notifications().await;
                IpcResponse::Notifications(notifications)
            }
            IpcRequest::UpdateShellDir { shell_pid, dir } => {
                let prev = self.get_shell_dir(shell_pid).await;
                self.set_shell_dir(shell_pid, dir.clone()).await?;
                // Cancel any pending autostops for daemons in the new directory
                self.cancel_pending_autostops_for_dir(&dir).await;
                if let Some(prev) = prev {
                    self.leave_dir(&prev).await?;
                }
                self.refresh().await?;
                IpcResponse::Ok
            }
            IpcRequest::Clean => {
                self.clean().await?;
                IpcResponse::Ok
            }
            IpcRequest::GetDisabledDaemons => {
                let disabled = self.state_file.lock().await.disabled.clone();
                IpcResponse::DisabledDaemons(disabled.into_iter().collect())
            }
        };
        Ok(rsp)
    }

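    /// Graceful shutdown: stops all managed daemons, removes the supervisor's
    /// own state entry, and deletes the IPC socket directory.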
    async fn close(&self) {
        for daemon in self.active_daemons().await {
            if daemon.id == "pitchfork" {
                continue;
            }
            if let Err(err) = self.stop(&daemon.id).await {
                error!("failed to stop daemon {daemon}: {err}");
            }
        }
        let _ = self.remove_daemon("pitchfork").await;
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
        // TODO: cleanly stop ipc server
    }

    async fn add_notification(&self, level: log::LevelFilter, message: String) {
        self.pending_notifications
            .lock()
            .await
            .push((level, message));
    }

    async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
        self.pending_notifications.lock().await.drain(..).collect()
    }

    async fn active_daemons(&self) -> Vec<Daemon> {
        self.state_file
            .lock()
            .await
            .daemons
            .values()
            .filter(|d| d.pid.is_some() && d.id != "pitchfork")
            .cloned()
            .collect()
    }

    async fn remove_daemon(&self, id: &str) -> Result<()> {
        self.state_file.lock().await.daemons.remove(id);
        if let Err(err) = self.state_file.lock().await.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(())
    }

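    /// Inserts or updates a daemon record, merging fields not specified in
    /// `opts` from any existing entry, then persists the state file.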
    async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
        info!(
            "upserting daemon: {} pid: {} status: {}",
            opts.id,
            opts.pid.unwrap_or(0),
            opts.status
        );
        let mut state_file = self.state_file.lock().await;
        let existing = state_file.daemons.get(&opts.id);
        let daemon = Daemon {
            id: opts.id.to_string(),
            title: opts.pid.and_then(|pid| PROCS.title(pid)),
            pid: opts.pid,
            status: opts.status,
            shell_pid: opts.shell_pid,
            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
            cron_schedule: opts
                .cron_schedule
                .or(existing.and_then(|d| d.cron_schedule.clone())),
            cron_retrigger: opts
                .cron_retrigger
                .or(existing.and_then(|d| d.cron_retrigger)),
            last_exit_success: opts
                .last_exit_success
                .or(existing.and_then(|d| d.last_exit_success)),
            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
            retry_count: opts
                .retry_count
                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
            ready_output: opts
                .ready_output
                .or(existing.and_then(|d| d.ready_output.clone())),
            ready_http: opts
                .ready_http
                .or(existing.and_then(|d| d.ready_http.clone())),
            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
        };
        state_file
            .daemons
            .insert(opts.id.to_string(), daemon.clone());
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(daemon)
    }

    pub async fn enable(&self, id: String) -> Result<bool> {
        info!("enabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.remove(&id);
        state_file.write()?;
        Ok(result)
    }

    pub async fn disable(&self, id: String) -> Result<bool> {
        info!("disabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.insert(id);
        state_file.write()?;
        Ok(result)
    }

    async fn get_daemon(&self, id: &str) -> Option<Daemon> {
        self.state_file.lock().await.daemons.get(id).cloned()
    }

    async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
        state_file.write()?;
        Ok(())
    }

    async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
        self.state_file
            .lock()
            .await
            .shell_dirs
            .get(&shell_pid.to_string())
            .cloned()
    }

    async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        if state_file
            .shell_dirs
            .remove(&shell_pid.to_string())
            .is_some()
        {
            state_file.write()?;
        }
        Ok(())
    }

    async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
        self.state_file.lock().await.shell_dirs.iter().fold(
            HashMap::new(),
            |mut acc, (pid, dir)| {
                if let Ok(pid) = pid.parse() {
                    acc.entry(dir.clone()).or_default().push(pid);
                }
                acc
            },
        )
    }

    async fn clean(&self) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.retain(|_id, d| d.pid.is_some());
        state_file.write()?;
        Ok(())
    }
}

#[derive(Debug)]
struct UpsertDaemonOpts {
    id: String,
    pid: Option<u32>,
    status: DaemonStatus,
    shell_pid: Option<u32>,
    dir: Option<PathBuf>,
    autostop: bool,
    cron_schedule: Option<String>,
    cron_retrigger: Option<crate::pitchfork_toml::CronRetrigger>,
    last_exit_success: Option<bool>,
    retry: Option<u32>,
    retry_count: Option<u32>,
    ready_delay: Option<u64>,
    ready_output: Option<String>,
    ready_http: Option<String>,
    ready_port: Option<u16>,
}

impl Default for UpsertDaemonOpts {
    fn default() -> Self {
        Self {
            id: "".to_string(),
            pid: None,
            status: DaemonStatus::Stopped,
            shell_pid: None,
            dir: None,
            autostop: false,
            cron_schedule: None,
            cron_retrigger: None,
            last_exit_success: None,
            retry: None,
            retry_count: None,
            ready_delay: None,
            ready_output: None,
            ready_http: None,
            ready_port: None,
        }
    }
}