pitchfork_cli/supervisor.rs

use crate::daemon::{Daemon, RunOptions, validate_daemon_id};
use crate::daemon_status::DaemonStatus;
use crate::ipc::server::{IpcServer, IpcServerHandle};
use crate::ipc::{IpcRequest, IpcResponse};
use crate::pitchfork_toml::PitchforkToml;
use crate::procs::PROCS;
use crate::state_file::StateFile;
use crate::watch_files::{WatchFiles, expand_watch_patterns, path_matches_patterns};
use crate::{Result, env};
use duct::cmd;
use itertools::Itertools;
use log::LevelFilter::Info;
use miette::IntoDiagnostic;
use notify::RecursiveMode;
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fs;
use std::iter::once;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::{select, signal, time};

/// Cache for compiled regex patterns to avoid recompilation on daemon restarts
static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));

/// Get or compile a regex pattern, caching the result for future use
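///
/// Illustrative usage (not compiled as a doc-test; the function is private):
/// ```ignore
/// let re = get_or_compile_regex(r"listening on .*:\d+").expect("valid pattern");
/// assert!(re.is_match("listening on 127.0.0.1:8080"));
/// ```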
fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
    if let Some(re) = cache.get(pattern) {
        return Some(re.clone());
    }
    match Regex::new(pattern) {
        Ok(re) => {
            cache.insert(pattern.to_string(), re.clone());
            Some(re)
        }
        Err(e) => {
            error!("invalid regex pattern '{}': {}", pattern, e);
            None
        }
    }
}

pub struct Supervisor {
    state_file: Mutex<StateFile>,
    pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    last_refreshed_at: Mutex<time::Instant>,
    /// Map of daemon ID to scheduled autostop time
    pending_autostops: Mutex<HashMap<String, time::Instant>>,
    /// Handle for graceful IPC server shutdown
    ipc_shutdown: Mutex<Option<IpcServerHandle>>,
}

fn interval_duration() -> Duration {
    Duration::from_secs(*env::PITCHFORK_INTERVAL_SECS)
}

pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
pub fn start_if_not_running() -> Result<()> {
    let sf = StateFile::get();
    if let Some(d) = sf.daemons.get("pitchfork")
        && let Some(pid) = d.pid
        && PROCS.is_running(pid)
    {
        return Ok(());
    }
    start_in_background()
}

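/// Spawn `pitchfork supervisor run` as a detached background process,
/// discarding its stdout and stderr.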
pub fn start_in_background() -> Result<()> {
    debug!("starting supervisor in background");
    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
        .stdout_null()
        .stderr_null()
        .start()
        .into_diagnostic()?;
    Ok(())
}

impl Supervisor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
            pending_autostops: Mutex::new(HashMap::new()),
            ipc_shutdown: Mutex::new(None),
        })
    }

    pub async fn start(&self, is_boot: bool, web_port: Option<u16>) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        self.upsert_daemon(UpsertDaemonOpts {
            id: "pitchfork".to_string(),
            pid: Some(pid),
            status: DaemonStatus::Running,
            ..Default::default()
        })
        .await?;

        // If this is a boot start, automatically start boot_start daemons
        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        self.daemon_file_watch()?;

        // Start web server if port is configured
        if let Some(port) = web_port {
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port).await {
                    error!("Web server error: {}", e);
                }
            });
        }

        let (ipc, ipc_handle) = IpcServer::new()?;
        *self.ipc_shutdown.lock().await = Some(ipc_handle);
        self.conn_watch(ipc).await
    }

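    /// Periodic housekeeping: refresh tracked shell PIDs, drop dead shells,
    /// run `leave_dir` for directories with no remaining shells, then process
    /// retries and any autostops whose delay has elapsed.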
    async fn refresh(&self) -> Result<()> {
        trace!("refreshing");

        // Collect PIDs we need to check (shell PIDs only)
        // This is more efficient than refreshing all processes on the system
        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();

        if pids_to_check.is_empty() {
            // No PIDs to check, skip the expensive refresh
            trace!("no shell PIDs to check, skipping process refresh");
        } else {
            PROCS.refresh_pids(&pids_to_check);
        }

        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in dirs_with_pids {
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect_vec();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;
        self.process_pending_autostops().await?;

        Ok(())
    }

    async fn check_retry(&self) -> Result<()> {
        // Collect only IDs of daemons that need retrying (avoids cloning entire Daemon structs)
        let ids_to_retry: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    // Daemon is errored, not currently running, and has retries remaining
                    d.status.is_errored()
                        && d.pid.is_none()
                        && d.retry > 0
                        && d.retry_count < d.retry
                })
                .map(|(id, _d)| id.clone())
                .collect()
        };

        for id in ids_to_retry {
            // Look up daemon when needed and re-verify retry criteria
            // (state may have changed since we collected IDs)
            let daemon = {
                let state_file = self.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d)
                        if d.status.is_errored()
                            && d.pid.is_none()
                            && d.retry > 0
                            && d.retry_count < d.retry =>
                    {
                        d.clone()
                    }
                    _ => continue, // Daemon was removed or no longer needs retry
                }
            };
            info!(
                "retrying daemon {} ({}/{} attempts)",
                id,
                daemon.retry_count + 1,
                daemon.retry
            );

            // Get command from pitchfork.toml
            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                let cmd = match shell_words::split(&run_cmd) {
                    Ok(cmd) => cmd,
                    Err(e) => {
                        error!("failed to parse command for daemon {}: {}", id, e);
                        // Mark as exhausted to prevent infinite retry loop, preserving error status
                        self.upsert_daemon(UpsertDaemonOpts {
                            id,
                            status: daemon.status.clone(),
                            retry_count: Some(daemon.retry),
                            ..Default::default()
                        })
                        .await?;
                        continue;
                    }
                };
                let retry_opts = RunOptions {
                    id: id.clone(),
                    cmd,
                    force: false,
                    shell_pid: daemon.shell_pid,
                    dir: daemon.dir.unwrap_or_else(|| env::CWD.clone()),
                    autostop: daemon.autostop,
                    cron_schedule: daemon.cron_schedule,
                    cron_retrigger: daemon.cron_retrigger,
                    retry: daemon.retry,
                    retry_count: daemon.retry_count + 1,
                    ready_delay: daemon.ready_delay,
                    ready_output: daemon.ready_output.clone(),
                    ready_http: daemon.ready_http.clone(),
                    ready_port: daemon.ready_port,
                    wait_ready: false,
                    depends: daemon.depends.clone(),
                };
                if let Err(e) = self.run(retry_opts).await {
                    error!("failed to retry daemon {}: {}", id, e);
                }
            } else {
                warn!("no run command found for daemon {}, cannot retry", id);
                // Mark as exhausted
                self.upsert_daemon(UpsertDaemonOpts {
                    id,
                    retry_count: Some(daemon.retry),
                    ..Default::default()
                })
                .await?;
            }
        }

        Ok(())
    }

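    /// Called when the last shell has left `dir`: any autostop-enabled daemon whose
    /// directory is under `dir`, with no shell remaining in it, is stopped immediately
    /// (when `PITCHFORK_AUTOSTOP_DELAY` is 0) or scheduled for a delayed autostop.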
    async fn leave_dir(&self, dir: &Path) -> Result<()> {
        debug!("left dir {}", dir.display());
        let shell_dirs = self.get_dirs_with_shell_pids().await;
        let shell_dirs = shell_dirs.keys().collect_vec();
        let delay_secs = *env::PITCHFORK_AUTOSTOP_DELAY;

        for daemon in self.active_daemons().await {
            if !daemon.autostop {
                continue;
            }
            // If this daemon's dir is under the left dir and no remaining shell
            // is inside the daemon's dir, stop it now or schedule it for autostop
            if let Some(daemon_dir) = daemon.dir.as_ref()
                && daemon_dir.starts_with(dir)
                && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
            {
                if delay_secs == 0 {
                    // No delay configured, stop immediately
                    info!("autostopping {daemon}");
                    self.stop(&daemon.id).await?;
                    self.add_notification(Info, format!("autostopped {daemon}"))
                        .await;
                } else {
                    // Schedule autostop with delay
                    let stop_at = time::Instant::now() + Duration::from_secs(delay_secs);
                    let mut pending = self.pending_autostops.lock().await;
                    if !pending.contains_key(&daemon.id) {
                        info!("scheduling autostop for {} in {}s", daemon.id, delay_secs);
                        pending.insert(daemon.id.clone(), stop_at);
                    }
                }
            }
        }
        Ok(())
    }

    /// Cancel any pending autostop for daemons in the given directory.
    /// Also cancels autostops for daemons in parent directories (e.g., entering
    /// /project/subdir cancels a pending autostop for a daemon in /project).
    async fn cancel_pending_autostops_for_dir(&self, dir: &Path) {
        let mut pending = self.pending_autostops.lock().await;
        let daemons_to_cancel: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    d.dir.as_ref().is_some_and(|daemon_dir| {
                        // Cancel if entering a directory inside or equal to daemon's directory
                        // OR if daemon is in a subdirectory of the entered directory
                        dir.starts_with(daemon_dir) || daemon_dir.starts_with(dir)
                    })
                })
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in daemons_to_cancel {
            if pending.remove(&daemon_id).is_some() {
                info!("cancelled pending autostop for {}", daemon_id);
            }
        }
    }

    /// Process any pending autostops that have reached their scheduled time
    async fn process_pending_autostops(&self) -> Result<()> {
        let now = time::Instant::now();
        let to_stop: Vec<String> = {
            let pending = self.pending_autostops.lock().await;
            pending
                .iter()
                .filter(|(_, stop_at)| now >= **stop_at)
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in to_stop {
            // Remove from pending first
            {
                let mut pending = self.pending_autostops.lock().await;
                pending.remove(&daemon_id);
            }

            // Check if daemon is still running and should be stopped
            if let Some(daemon) = self.get_daemon(&daemon_id).await
                && daemon.autostop
                && daemon.status.is_running()
            {
                // Verify no shell is in the daemon's directory
                let shell_dirs = self.get_dirs_with_shell_pids().await;
                let shell_dirs = shell_dirs.keys().collect_vec();
                if let Some(daemon_dir) = daemon.dir.as_ref()
                    && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
                {
                    info!("autostopping {} (after delay)", daemon_id);
                    self.stop(&daemon_id).await?;
                    self.add_notification(Info, format!("autostopped {daemon_id}"))
                        .await;
                }
            }
        }
        Ok(())
    }

    async fn start_boot_daemons(&self) -> Result<()> {
        info!("Scanning for boot_start daemons");
        let pt = PitchforkToml::all_merged();

        let boot_daemons: Vec<_> = pt
            .daemons
            .iter()
            .filter(|(_id, d)| d.boot_start.unwrap_or(false))
            .collect();

        if boot_daemons.is_empty() {
            info!("No daemons configured with boot_start = true");
            return Ok(());
        }

        info!("Found {} daemon(s) to start at boot", boot_daemons.len());

        for (id, daemon) in boot_daemons {
            info!("Starting boot daemon: {}", id);

            let dir = daemon
                .path
                .as_ref()
                .and_then(|p| p.parent())
                .map(|p| p.to_path_buf())
                .unwrap_or_else(|| env::CWD.clone());

            let cmd = match shell_words::split(&daemon.run) {
                Ok(cmd) => cmd,
                Err(e) => {
                    error!("failed to parse command for boot daemon {}: {}", id, e);
                    continue;
                }
            };
            let run_opts = RunOptions {
                id: id.clone(),
                cmd,
                force: false,
                shell_pid: None,
                dir,
                autostop: false, // Boot daemons should not autostop
                cron_schedule: daemon.cron.as_ref().map(|c| c.schedule.clone()),
                cron_retrigger: daemon.cron.as_ref().map(|c| c.retrigger),
                retry: daemon.retry.count(),
                retry_count: 0,
                ready_delay: daemon.ready_delay,
                ready_output: daemon.ready_output.clone(),
                ready_http: daemon.ready_http.clone(),
                ready_port: daemon.ready_port,
                wait_ready: false, // Don't block on boot daemons
                depends: daemon.depends.clone(),
            };

            match self.run(run_opts).await {
                Ok(IpcResponse::DaemonStart { .. }) | Ok(IpcResponse::DaemonReady { .. }) => {
                    info!("Successfully started boot daemon: {}", id);
                }
                Ok(IpcResponse::DaemonAlreadyRunning) => {
                    info!("Boot daemon already running: {}", id);
                }
                Ok(other) => {
                    warn!(
                        "Unexpected response when starting boot daemon {}: {:?}",
                        id, other
                    );
                }
                Err(e) => {
                    error!("Failed to start boot daemon {}: {}", id, e);
                }
            }
        }

        Ok(())
    }

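    /// Start a daemon from `opts`, honoring `force`, `wait_ready`, and retries.
    ///
    /// Illustrative call (not a doc-test; assumes `RunOptions` implements
    /// `Default`, which is not shown in this file):
    /// ```ignore
    /// let resp = SUPERVISOR
    ///     .run(RunOptions {
    ///         id: "api".to_string(),
    ///         cmd: vec!["npm".to_string(), "start".to_string()],
    ///         wait_ready: true,
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// ```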
    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd.clone();

        // Clear any pending autostop for this daemon since it's being started
        {
            let mut pending = self.pending_autostops.lock().await;
            if pending.remove(id).is_some() {
                info!("cleared pending autostop for {} (daemon starting)", id);
            }
        }

        let daemon = self.get_daemon(id).await;
        if let Some(daemon) = daemon {
            // Stopping state is treated as "not running" - the monitoring task will clean it up
            // Only check for Running state with a valid PID
            if !daemon.status.is_stopping()
                && !daemon.status.is_stopped()
                && let Some(pid) = daemon.pid
            {
                if opts.force {
                    self.stop(id).await?;
                    info!("run: stop completed for daemon {id}");
                } else {
                    warn!("daemon {id} already running with pid {pid}");
                    return Ok(IpcResponse::DaemonAlreadyRunning);
                }
            }
        }

        // If wait_ready is true and retry is configured, implement retry loop
        if opts.wait_ready && opts.retry > 0 {
            // Use saturating_add to avoid overflow when retry = u32::MAX (infinite)
            let max_attempts = opts.retry.saturating_add(1);
            for attempt in 0..max_attempts {
                let mut retry_opts = opts.clone();
                retry_opts.retry_count = attempt;
                retry_opts.cmd = cmd.clone();

                let result = self.run_once(retry_opts).await?;

                match result {
                    IpcResponse::DaemonReady { daemon } => {
                        return Ok(IpcResponse::DaemonReady { daemon });
                    }
                    IpcResponse::DaemonFailedWithCode { exit_code } => {
                        if attempt < opts.retry {
                            // Exponential backoff (1s, 2s, 4s, ...); cap the exponent so
                            // 2^attempt cannot overflow u64 when retry counts are large
                            let backoff_secs = 2u64.saturating_pow(attempt.min(16));
                            info!(
                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
                                attempt + 1,
                                max_attempts,
                                backoff_secs
                            );
                            time::sleep(Duration::from_secs(backoff_secs)).await;
                            continue;
                        } else {
                            info!("daemon {id} failed after {} attempts", max_attempts);
                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
                        }
                    }
                    other => return Ok(other),
                }
            }
        }

        // No retry or wait_ready is false
        self.run_once(opts).await
    }

    async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd;

        // Create channel for readiness notification if wait_ready is true
        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let cmd = once("exec".to_string())
            .chain(cmd.into_iter())
            .collect_vec();
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
        if let Some(parent) = log_path.parent() {
            xx::file::mkdirp(parent)?;
        }
        info!("run: spawning daemon {id} with args: {args:?}");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(&args)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .current_dir(&opts.dir);

        // Ensure daemon can find user tools by using the original PATH
        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = match child.id() {
            Some(p) => p,
            None => {
                warn!("Daemon {id} exited before PID could be captured");
                return Ok(IpcResponse::DaemonFailed {
                    error: "Process exited immediately".to_string(),
                });
            }
        };
        info!("started daemon {id} with pid {pid}");
        let daemon = self
            .upsert_daemon(UpsertDaemonOpts {
                id: id.to_string(),
                pid: Some(pid),
                status: DaemonStatus::Running,
                shell_pid: opts.shell_pid,
                dir: Some(opts.dir.clone()),
                autostop: opts.autostop,
                cron_schedule: opts.cron_schedule.clone(),
                cron_retrigger: opts.cron_retrigger,
                last_exit_success: None,
                retry: Some(opts.retry),
                retry_count: Some(opts.retry_count),
                ready_delay: opts.ready_delay,
                ready_output: opts.ready_output.clone(),
                ready_http: opts.ready_http.clone(),
                ready_port: opts.ready_port,
                depends: Some(opts.depends.clone()),
            })
            .await?;

        let id_clone = id.to_string();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();
        let ready_http = opts.ready_http.clone();
        let ready_port = opts.ready_port;

        tokio::spawn(async move {
            let id = id_clone;
            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
                (Some(out), Some(err)) => (out, err),
                _ => {
                    error!("Failed to capture stdout/stderr for daemon {id}");
                    return;
                }
            };
            let mut stdout = tokio::io::BufReader::new(stdout).lines();
            let mut stderr = tokio::io::BufReader::new(stderr).lines();
            let log_file = match tokio::fs::File::options()
                .append(true)
                .create(true)
                .open(&log_path)
                .await
            {
                Ok(f) => f,
                Err(e) => {
                    error!("Failed to open log file for daemon {id}: {e}");
                    return;
                }
            };
            let mut log_appender = BufWriter::new(log_file);

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            let format_line = |line: String| {
                if line.starts_with(&format!("{id} ")) {
                    // mise tasks often already have the id printed
                    format!("{} {line}\n", now())
                } else {
                    format!("{} {id} {line}\n", now())
                }
            };
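            // Illustrative example: a stdout line "listening on :8080" from daemon
            // "api" becomes "2024-05-01 12:00:00 api listening on :8080" (timestamp
            // here is made up).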

            // Setup readiness checking
            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            // Setup HTTP readiness check interval (poll every 500ms)
            let mut http_check_interval = ready_http
                .as_ref()
                .map(|_| tokio::time::interval(Duration::from_millis(500)));
            let http_client = ready_http.as_ref().map(|_| {
                reqwest::Client::builder()
                    .timeout(Duration::from_secs(5))
                    .build()
                    .unwrap_or_default()
            });

            // Setup TCP port readiness check interval (poll every 500ms)
            let mut port_check_interval =
                ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));

            // Setup periodic log flush interval (every 500ms - balances I/O reduction with responsiveness)
            let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));

            // Use a channel to communicate process exit status
            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            // Spawn a task to wait for process exit
            let child_pid = child.id().unwrap_or(0);
            tokio::spawn(async move {
                let result = child.wait().await;
                debug!(
                    "daemon pid {child_pid} wait() completed with result: {:?}",
                    result
                );
                let _ = exit_tx.send(result).await;
            });

            #[allow(unused_assignments)]
            // Initial None is a safety net; loop only exits via exit_rx.recv() which sets it
            let mut exit_status = None;

            loop {
                select! {
                    Ok(Some(line)) = stdout.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stdout: {id} {formatted}");

                        // Check if output matches ready pattern
                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line)
                        {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            // Flush logs before notifying so clients see logs immediately
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    }
                    Ok(Some(line)) = stderr.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stderr: {id} {formatted}");

                        // Check if output matches ready pattern (also check stderr)
                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line)
                        {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            // Flush logs before notifying so clients see logs immediately
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    },
                    Some(result) = exit_rx.recv() => {
                        // Process exited - save exit status and notify if not ready yet
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
                        // Flush logs before notifying so clients see logs immediately
                        let _ = log_appender.flush().await;
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                // Check if process exited successfully
                                let is_success = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .map(|s| s.success())
                                    .unwrap_or(false);

                                if is_success {
                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
                                    let _ = tx.send(Ok(()));
                                } else {
                                    let exit_code = exit_status.as_ref()
                                        .and_then(|r| r.as_ref().ok())
                                        .and_then(|s| s.code());
                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
                                    let _ = tx.send(Err(exit_code));
                                }
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending notification");
                        }
                        break;
                    }
                    _ = async {
                        if let Some(ref mut interval) = http_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_http.is_some() => {
                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                            match client.get(url).send().await {
                                Ok(response) if response.status().is_success() => {
                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                    ready_notified = true;
                                    // Flush logs before notifying so clients see logs immediately
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    // Stop checking once ready
                                    http_check_interval = None;
                                }
                                Ok(response) => {
                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                                }
                                Err(e) => {
                                    trace!("daemon {id} HTTP check failed: {e}");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut interval) = port_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_port.is_some() => {
                        if let Some(port) = ready_port {
                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                                Ok(_) => {
                                    info!("daemon {id} ready: TCP port {port} is listening");
                                    ready_notified = true;
                                    // Flush logs before notifying so clients see logs immediately
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    // Stop checking once ready
                                    port_check_interval = None;
                                }
                                Err(_) => {
                                    trace!("daemon {id} port check: port {port} not listening yet");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            // Flush logs before notifying so clients see logs immediately
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        // Disable timer after it fires
                        delay_timer = None;
                    }
                    _ = log_flush_interval.tick() => {
                        // Periodic flush to ensure logs are written to disk
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                    }
                    // Note: No `else => break` because log_flush_interval.tick() is always available,
                    // making the else branch unreachable. The loop exits via the exit_rx.recv() branch.
                }
            }

            // Final flush to ensure all buffered logs are written
            if let Err(e) = log_appender.flush().await {
                error!("Failed to final flush log for daemon {id}: {e}");
            }

            // Get the final exit status
            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                // Streams closed but process hasn't exited yet, wait for it
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            // Check if this monitoring task is for the current daemon process
            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
            {
                // Another process has taken over, don't update status
                return;
            }
            let is_stopping = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
                // was stopped by this supervisor so don't update status
                return;
            }
            if let Ok(status) = exit_status {
                info!("daemon {id} exited with status {status}");
                if status.success() || is_stopping {
                    // If stopping, always mark as Stopped with success.
                    // This lets the monitoring task clear the PID after stop() was called.
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None, // Clear PID now that process has exited
                            status: DaemonStatus::Stopped,
                            last_exit_success: Some(status.success()),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                } else {
                    // Handle error exit - mark for retry
                    // retry_count increment will be handled by interval_watch
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Errored(status.code()),
                            last_exit_success: Some(false),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                }
            } else if let Err(e) = SUPERVISOR
                .upsert_daemon(UpsertDaemonOpts {
                    id: id.clone(),
                    pid: None,
                    status: DaemonStatus::Errored(None),
                    last_exit_success: Some(false),
                    ..Default::default()
                })
                .await
            {
                error!("Failed to update daemon state for {id}: {e}");
            }
        });

        // If wait_ready is true, wait for readiness notification
        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }

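    /// Stop a daemon by id: mark it `Stopping`, kill its process and children, and
    /// let the monitoring task record the final `Stopped` state once the process
    /// exits. The supervisor's own "pitchfork" entry is refused.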
    pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
        if id == "pitchfork" {
            return Ok(IpcResponse::Error(
                "Cannot stop supervisor via stop command".into(),
            ));
        }
        info!("stopping daemon: {id}");
        if let Some(daemon) = self.get_daemon(id).await {
            trace!("daemon to stop: {daemon}");
            if let Some(pid) = daemon.pid {
                trace!("killing pid: {pid}");
                PROCS.refresh_processes();
                if PROCS.is_running(pid) {
                    // First set status to Stopping (keeps PID for monitoring task)
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        status: DaemonStatus::Stopping,
                        ..Default::default()
                    })
                    .await?;

                    // Then kill the process
                    if let Err(e) = PROCS.kill_async(pid).await {
                        warn!("failed to kill pid {pid}: {e}");
                    }
                    PROCS.refresh_processes();
                    for child_pid in PROCS.all_children(pid) {
                        debug!("killing child pid: {child_pid}");
                        if let Err(e) = PROCS.kill_async(child_pid).await {
                            warn!("failed to kill child pid {child_pid}: {e}");
                        }
                    }
                    // Monitoring task will clear PID and set to Stopped when it detects exit
                } else {
                    debug!("pid {pid} not running");
                    // Process already dead, directly mark as stopped
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        ..Default::default()
                    })
                    .await?;
                }
                return Ok(IpcResponse::Ok);
            } else {
                debug!("daemon {id} not running");
            }
        } else {
            debug!("daemon {id} not found");
        }
        Ok(IpcResponse::DaemonAlreadyStopped)
    }

    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            let stream = match signal::unix::signal(signal) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Failed to register signal handler for {:?}: {}", signal, e);
                    continue;
                }
            };
            tokio::spawn(async move {
                let mut stream = stream;
                loop {
                    stream.recv().await;
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

    /// Watch files for daemons that have `watch` patterns configured.
    /// When a watched file changes, the daemon is automatically restarted.
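    ///
    /// For example (illustrative values): a daemon whose `pitchfork.toml` entry
    /// sets `watch = ["src/**/*.rs"]` is restarted whenever a matching file changes.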
    fn daemon_file_watch(&self) -> Result<()> {
        let pt = PitchforkToml::all_merged();

        // Collect all daemons with watch patterns and their base directories
        let watch_configs: Vec<(String, Vec<String>, PathBuf)> = pt
            .daemons
            .iter()
            .filter(|(_, d)| !d.watch.is_empty())
            .map(|(id, d)| {
                let base_dir = d
                    .path
                    .as_ref()
                    .and_then(|p| p.parent())
                    .map(|p| p.to_path_buf())
                    .unwrap_or_else(|| env::CWD.clone());
                (id.clone(), d.watch.clone(), base_dir)
            })
            .collect();

        if watch_configs.is_empty() {
            debug!("No daemons with watch patterns configured");
            return Ok(());
        }

        info!(
            "Setting up file watching for {} daemon(s)",
            watch_configs.len()
        );

        // Collect all directories to watch
        let mut all_dirs = std::collections::HashSet::new();
        for (id, patterns, base_dir) in &watch_configs {
            match expand_watch_patterns(patterns, base_dir) {
                Ok(dirs) => {
                    for dir in &dirs {
                        debug!("Watching {} for daemon {}", dir.display(), id);
                    }
                    all_dirs.extend(dirs);
                }
                Err(e) => {
                    warn!("Failed to expand watch patterns for {}: {}", id, e);
                }
            }
        }

        if all_dirs.is_empty() {
            debug!("No directories to watch after expanding patterns");
            return Ok(());
        }

        // Spawn the file watcher task
        tokio::spawn(async move {
            let mut wf = match WatchFiles::new(Duration::from_secs(1)) {
                Ok(wf) => wf,
                Err(e) => {
                    error!("Failed to create file watcher: {}", e);
                    return;
                }
            };

            // Register all directories with the watcher
            for dir in all_dirs {
                if let Err(e) = wf.watch(&dir, RecursiveMode::Recursive) {
                    warn!("Failed to watch directory {}: {}", dir.display(), e);
                }
            }

            info!("File watcher started");

            // Process file change events
            while let Some(changed_paths) = wf.rx.recv().await {
                debug!("File changes detected: {:?}", changed_paths);

                // Find which daemons should be restarted based on the changed paths
                let mut daemons_to_restart = std::collections::HashSet::new();

                for changed_path in &changed_paths {
                    for (id, patterns, base_dir) in &watch_configs {
                        if path_matches_patterns(changed_path, patterns, base_dir) {
                            info!(
                                "File {} matched pattern for daemon {}, scheduling restart",
                                changed_path.display(),
                                id
                            );
                            daemons_to_restart.insert(id.clone());
                        }
                    }
                }

                // Restart each affected daemon
                for id in daemons_to_restart {
                    if let Err(e) = SUPERVISOR.restart_watched_daemon(&id).await {
                        error!("Failed to restart daemon {} after file change: {}", id, e);
                    }
                }
            }
        });

        Ok(())
    }

    /// Restart a daemon that is being watched for file changes.
    /// Only restarts if the daemon is currently running.
    async fn restart_watched_daemon(&self, id: &str) -> Result<()> {
        // Check if daemon is running
        let daemon = self.get_daemon(id).await;
        let is_running = daemon
            .as_ref()
            .is_some_and(|d| d.pid.is_some() && d.status.is_running());

        if !is_running {
            debug!(
                "Daemon {} is not running, skipping restart on file change",
                id
            );
            return Ok(());
        }

        // Check if daemon is disabled
        let is_disabled = self.state_file.lock().await.disabled.contains(id);
        if is_disabled {
            debug!("Daemon {} is disabled, skipping restart on file change", id);
            return Ok(());
        }

        info!("Restarting daemon {} due to file change", id);

        // Get the daemon config to rebuild RunOptions
        let pt = PitchforkToml::all_merged();
        let Some(daemon_config) = pt.daemons.get(id) else {
            warn!("Daemon {} not found in config, cannot restart", id);
            return Ok(());
        };

        let dir = daemon_config
            .path
            .as_ref()
            .and_then(|p| p.parent())
            .map(|p| p.to_path_buf())
            .unwrap_or_else(|| env::CWD.clone());

        let cmd = match shell_words::split(&daemon_config.run) {
            Ok(cmd) => cmd,
            Err(e) => {
                error!("Failed to parse command for daemon {}: {}", id, e);
                return Ok(());
            }
        };

        // Extract values from daemon before stopping
        let shell_pid = daemon.as_ref().and_then(|d| d.shell_pid);
        let autostop = daemon.as_ref().map(|d| d.autostop).unwrap_or(false);

        // Stop the daemon first
        let _ = self.stop(id).await;

        // Small delay to allow the process to fully stop
        time::sleep(Duration::from_millis(100)).await;

        // Restart the daemon
        let run_opts = RunOptions {
            id: id.to_string(),
            cmd,
            force: true,
            shell_pid,
            dir,
            autostop,
            cron_schedule: daemon_config.cron.as_ref().map(|c| c.schedule.clone()),
            cron_retrigger: daemon_config.cron.as_ref().map(|c| c.retrigger),
            retry: daemon_config.retry.count(),
            retry_count: 0,
            ready_delay: daemon_config.ready_delay,
            ready_output: daemon_config.ready_output.clone(),
            ready_http: daemon_config.ready_http.clone(),
            ready_port: daemon_config.ready_port,
            wait_ready: false, // Don't block on file-triggered restarts
            depends: daemon_config.depends.clone(),
        };

        match self.run(run_opts).await {
            Ok(IpcResponse::DaemonStart { .. }) | Ok(IpcResponse::DaemonReady { .. }) => {
                info!("Successfully restarted daemon {} after file change", id);
            }
            Ok(other) => {
                warn!(
                    "Unexpected response when restarting daemon {}: {:?}",
                    id, other
                );
            }
            Err(e) => {
                error!("Failed to restart daemon {}: {}", id, e);
            }
        }

        Ok(())
    }

    fn interval_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(interval_duration());
            loop {
                interval.tick().await;
                if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > interval_duration()
                    && let Err(err) = SUPERVISOR.refresh().await
                {
                    error!("failed to refresh: {err}");
                }
            }
        });
        Ok(())
    }

    fn cron_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            // Check every 10 seconds to support sub-minute cron schedules
            let mut interval = time::interval(Duration::from_secs(10));
            loop {
                interval.tick().await;
                if let Err(err) = SUPERVISOR.check_cron_schedules().await {
                    error!("failed to check cron schedules: {err}");
                }
            }
        });
        Ok(())
    }

    async fn check_cron_schedules(&self) -> Result<()> {
        use cron::Schedule;
        use std::str::FromStr;

        let now = chrono::Local::now();

        // Collect only IDs of daemons with cron schedules (avoids cloning entire HashMap)
        let cron_daemon_ids: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| d.cron_schedule.is_some() && d.cron_retrigger.is_some())
                .map(|(id, _d)| id.clone())
                .collect()
        };

        for id in cron_daemon_ids {
            // Look up daemon when needed
            let daemon = {
                let state_file = self.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d) => d.clone(),
                    None => continue,
                }
            };

            if let Some(schedule_str) = &daemon.cron_schedule
                && let Some(retrigger) = daemon.cron_retrigger
            {
                // Parse the cron schedule
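                // The `cron` crate uses second-resolution expressions (6 or 7 fields,
                // seconds first); e.g. "0 */5 * * * *" fires every five minutes.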
1308                let schedule = match Schedule::from_str(schedule_str) {
1309                    Ok(s) => s,
1310                    Err(e) => {
1311                        warn!("invalid cron schedule for daemon {id}: {e}");
1312                        continue;
1313                    }
1314                };
1315
1316                // Check whether we should trigger: look for a scheduled time that has passed
1317                // since the last trigger (or within the last 10 seconds if never triggered)
1318                let check_since = daemon
1319                    .last_cron_triggered
1320                    .unwrap_or_else(|| now - chrono::Duration::seconds(10));
1321
1322                // Find whether a scheduled fire time falls in (check_since, now];
1323                // times come back in ascending order, so the first one is enough
1324                let should_trigger = schedule
1325                    .after(&check_since)
1326                    .next()
1327                    .is_some_and(|t| t <= now);
1328
1329                if should_trigger {
1330                    // Update last_cron_triggered to prevent re-triggering the same event
1331                    {
1332                        let mut state_file = self.state_file.lock().await;
1333                        if let Some(d) = state_file.daemons.get_mut(&id) {
1334                            d.last_cron_triggered = Some(now);
1335                        }
1336                        if let Err(e) = state_file.write() {
1337                            error!("failed to update cron trigger time: {e}");
1338                        }
1339                    }
1340
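                    // Retrigger policy at a glance (an unset last_exit_success
                    // counts as neither success nor failure):
                    //   Finish  -> start only if the daemon is not running
                    //   Always  -> start unconditionally (forced restart below)
                    //   Success -> start only if stopped and the last exit succeeded
                    //   Fail    -> start only if stopped and the last exit failed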
1341                    let should_run = match retrigger {
1342                        crate::pitchfork_toml::CronRetrigger::Finish => {
1343                            // Run if not currently running
1344                            daemon.pid.is_none()
1345                        }
1346                        crate::pitchfork_toml::CronRetrigger::Always => {
1347                            // Always run (force restart handled in run method)
1348                            true
1349                        }
1350                        crate::pitchfork_toml::CronRetrigger::Success => {
1351                            // Run only if previous command succeeded
1352                            daemon.pid.is_none() && daemon.last_exit_success.unwrap_or(false)
1353                        }
1354                        crate::pitchfork_toml::CronRetrigger::Fail => {
1355                            // Run only if previous command failed
1356                            daemon.pid.is_none() && !daemon.last_exit_success.unwrap_or(true)
1357                        }
1358                    };
1359
1360                    if should_run {
1361                        info!("cron: triggering daemon {id} (retrigger: {retrigger:?})");
1362                        // Get the run command from pitchfork.toml
1363                        if let Some(run_cmd) = self.get_daemon_run_command(&id) {
1364                            let cmd = match shell_words::split(&run_cmd) {
1365                                Ok(cmd) => cmd,
1366                                Err(e) => {
1367                                    error!("failed to parse command for cron daemon {id}: {e}");
1368                                    continue;
1369                                }
1370                            };
1371                            let dir = daemon.dir.clone().unwrap_or_else(|| env::CWD.clone());
1372                            // Use force: true for Always retrigger to ensure restart
1373                            let force =
1374                                matches!(retrigger, crate::pitchfork_toml::CronRetrigger::Always);
1375                            let opts = RunOptions {
1376                                id: id.clone(),
1377                                cmd,
1378                                force,
1379                                shell_pid: None,
1380                                dir,
1381                                autostop: daemon.autostop,
1382                                cron_schedule: Some(schedule_str.clone()),
1383                                cron_retrigger: Some(retrigger),
1384                                retry: daemon.retry,
1385                                retry_count: daemon.retry_count,
1386                                ready_delay: daemon.ready_delay,
1387                                ready_output: daemon.ready_output.clone(),
1388                                ready_http: daemon.ready_http.clone(),
1389                                ready_port: daemon.ready_port,
1390                                wait_ready: false,
1391                                depends: daemon.depends.clone(),
1392                            };
1393                            if let Err(e) = self.run(opts).await {
1394                                error!("failed to run cron daemon {id}: {e}");
1395                            }
1396                        } else {
1397                            warn!("no run command found for cron daemon {id}");
1398                        }
1399                    }
1400                }
1401            }
1402        }
1403
1404        Ok(())
1405    }
1406
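    /// Resolve the `run` command for `id` from the merged pitchfork.toml files.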
1407    fn get_daemon_run_command(&self, id: &str) -> Option<String> {
1408        let pt = PitchforkToml::all_merged();
1409        pt.daemons.get(id).map(|d| d.run.clone())
1410    }
1412
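    /// Accept IPC requests forever. Each request is handled on its own task so a
    /// slow handler cannot stall the accept loop, and handler errors are turned
    /// into `IpcResponse::Error` replies rather than tearing down the server.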
1413    async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
1414        loop {
1415            let (msg, send) = match ipc.read().await {
1416                Ok(msg) => msg,
1417                Err(e) => {
1418                    error!("failed to accept connection: {:?}", e);
1419                    continue;
1420                }
1421            };
1422            debug!("received message: {:?}", msg);
1423            tokio::spawn(async move {
1424                let rsp = SUPERVISOR
1425                    .handle_ipc(msg)
1426                    .await
1427                    .unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
1428                if let Err(err) = send.send(rsp).await {
1429                    debug!("failed to send message: {:?}", err);
1430                }
1431            });
1432        }
1433    }
1434
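    /// Dispatch a single IPC request. Requests that name a daemon validate the
    /// ID up front and report a bad ID as an `IpcResponse::Error`, so clients
    /// get a structured reply instead of a transport failure. A hedged sketch of
    /// a round trip (`client` is a hypothetical wrapper, not an API in this
    /// crate):
    ///
    /// ```ignore
    /// let rsp = client.request(IpcRequest::Stop { id: "api".into() }).await?;
    /// ```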
1435    async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
1436        let rsp = match req {
1437            IpcRequest::Connect => {
1438                debug!("received connect message");
1439                IpcResponse::Ok
1440            }
1441            IpcRequest::Stop { id } => {
1442                if let Err(e) = validate_daemon_id(&id) {
1443                    return Ok(IpcResponse::Error(e));
1444                }
1445                self.stop(&id).await?
1446            }
1447            IpcRequest::Run(opts) => {
1448                if let Err(e) = validate_daemon_id(&opts.id) {
1449                    return Ok(IpcResponse::Error(e));
1450                }
1451                self.run(opts).await?
1452            }
1453            IpcRequest::Enable { id } => {
1454                if let Err(e) = validate_daemon_id(&id) {
1455                    return Ok(IpcResponse::Error(e));
1456                }
1457                if self.enable(id).await? {
1458                    IpcResponse::Yes
1459                } else {
1460                    IpcResponse::No
1461                }
1462            }
1463            IpcRequest::Disable { id } => {
1464                if let Err(e) = validate_daemon_id(&id) {
1465                    return Ok(IpcResponse::Error(e));
1466                }
1467                if self.disable(id).await? {
1468                    IpcResponse::Yes
1469                } else {
1470                    IpcResponse::No
1471                }
1472            }
1473            IpcRequest::GetActiveDaemons => {
1474                let daemons = self.active_daemons().await;
1475                IpcResponse::ActiveDaemons(daemons)
1476            }
1477            IpcRequest::GetNotifications => {
1478                let notifications = self.get_notifications().await;
1479                IpcResponse::Notifications(notifications)
1480            }
1481            IpcRequest::UpdateShellDir { shell_pid, dir } => {
1482                let prev = self.get_shell_dir(shell_pid).await;
1483                self.set_shell_dir(shell_pid, dir.clone()).await?;
1484                // Cancel any pending autostops for daemons in the new directory
1485                self.cancel_pending_autostops_for_dir(&dir).await;
1486                if let Some(prev) = prev {
1487                    self.leave_dir(&prev).await?;
1488                }
1489                self.refresh().await?;
1490                IpcResponse::Ok
1491            }
1492            IpcRequest::Clean => {
1493                self.clean().await?;
1494                IpcResponse::Ok
1495            }
1496            IpcRequest::GetDisabledDaemons => {
1497                let disabled = self.state_file.lock().await.disabled.clone();
1498                IpcResponse::DisabledDaemons(disabled.into_iter().collect())
1499            }
1500        };
1501        Ok(rsp)
1502    }
1503
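    /// Shut down the supervisor: stop every managed daemon, drop our own
    /// state-file record, signal the IPC server to stop, and remove the socket
    /// directory.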
1504    async fn close(&self) {
1505        for daemon in self.active_daemons().await {
1506            if daemon.id == "pitchfork" {
1507                continue;
1508            }
1509            if let Err(err) = self.stop(&daemon.id).await {
1510                error!("failed to stop daemon {}: {err}", daemon.id);
1511            }
1512        }
1513        let _ = self.remove_daemon("pitchfork").await;
1514
1515        // Signal IPC server to shut down gracefully
1516        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
1517            handle.shutdown();
1518        }
1519
1520        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
1521    }
1522
1523    async fn add_notification(&self, level: log::LevelFilter, message: String) {
1524        self.pending_notifications
1525            .lock()
1526            .await
1527            .push((level, message));
1528    }
1529
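    /// Drain and return all pending notifications; each one is delivered at most once.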
1530    async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
1531        self.pending_notifications.lock().await.drain(..).collect()
1532    }
1533
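    /// Daemons that currently have a pid recorded, excluding the supervisor itself.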
1534    async fn active_daemons(&self) -> Vec<Daemon> {
1535        self.state_file
1536            .lock()
1537            .await
1538            .daemons
1539            .values()
1540            .filter(|d| d.pid.is_some() && d.id != "pitchfork")
1541            .cloned()
1542            .collect()
1543    }
1544
1545    async fn remove_daemon(&self, id: &str) -> Result<()> {
1546        // hold one lock across remove + write so no other task interleaves
1547        let mut state_file = self.state_file.lock().await;
1548        state_file.daemons.remove(id);
1549        if let Err(err) = state_file.write() {
1550            warn!("failed to update state file: {err:#}");
1551        }
1552        Ok(())
1553    }
1552
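    /// Insert or update a daemon record. Merge semantics: a value supplied in
    /// `opts` wins; otherwise the existing record's value is kept (for example,
    /// `autostop` stays set once enabled and `last_cron_triggered` always
    /// carries over). A hedged usage sketch; real call sites live elsewhere in
    /// this file:
    ///
    /// ```ignore
    /// let daemon = SUPERVISOR
    ///     .upsert_daemon(UpsertDaemonOpts {
    ///         id: "api".to_string(),
    ///         status: DaemonStatus::Stopped,
    ///         last_exit_success: Some(true),
    ///         ..Default::default()
    ///     })
    ///     .await?;
    /// ```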
1553    async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
1554        info!(
1555            "upserting daemon: {} pid: {} status: {}",
1556            opts.id,
1557            opts.pid.unwrap_or(0),
1558            opts.status
1559        );
1560        let mut state_file = self.state_file.lock().await;
1561        let existing = state_file.daemons.get(&opts.id);
1562        let daemon = Daemon {
1563            id: opts.id.to_string(),
1564            title: opts.pid.and_then(|pid| PROCS.title(pid)),
1565            pid: opts.pid,
1566            status: opts.status,
1567            shell_pid: opts.shell_pid,
1568            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
1569            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
1570            cron_schedule: opts
1571                .cron_schedule
1572                .or(existing.and_then(|d| d.cron_schedule.clone())),
1573            cron_retrigger: opts
1574                .cron_retrigger
1575                .or(existing.and_then(|d| d.cron_retrigger)),
1576            last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
1577            last_exit_success: opts
1578                .last_exit_success
1579                .or(existing.and_then(|d| d.last_exit_success)),
1580            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
1581            retry_count: opts
1582                .retry_count
1583                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
1584            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
1585            ready_output: opts
1586                .ready_output
1587                .or(existing.and_then(|d| d.ready_output.clone())),
1588            ready_http: opts
1589                .ready_http
1590                .or(existing.and_then(|d| d.ready_http.clone())),
1591            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
1592            depends: opts
1593                .depends
1594                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
1595        };
1596        state_file
1597            .daemons
1598            .insert(opts.id.to_string(), daemon.clone());
1599        if let Err(err) = state_file.write() {
1600            warn!("failed to update state file: {err:#}");
1601        }
1602        Ok(daemon)
1603    }
1604
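    /// Remove `id` from the disabled set. Returns true if it was actually disabled.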
1605    pub async fn enable(&self, id: String) -> Result<bool> {
1606        info!("enabling daemon: {id}");
1607        let mut state_file = self.state_file.lock().await;
1608        let result = state_file.disabled.remove(&id);
1609        state_file.write()?;
1610        Ok(result)
1611    }
1612
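    /// Add `id` to the disabled set. Returns true if it was not already disabled.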
1613    pub async fn disable(&self, id: String) -> Result<bool> {
1614        info!("disabling daemon: {id}");
1615        let mut state_file = self.state_file.lock().await;
1616        let result = state_file.disabled.insert(id);
1617        state_file.write()?;
1618        Ok(result)
1619    }
1620
1621    async fn get_daemon(&self, id: &str) -> Option<Daemon> {
1622        self.state_file.lock().await.daemons.get(id).cloned()
1623    }
1624
1625    async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
1626        let mut state_file = self.state_file.lock().await;
1627        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
1628        state_file.write()?;
1629        Ok(())
1630    }
1631
1632    async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
1633        self.state_file
1634            .lock()
1635            .await
1636            .shell_dirs
1637            .get(&shell_pid.to_string())
1638            .cloned()
1639    }
1640
1641    async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
1642        let mut state_file = self.state_file.lock().await;
1643        if state_file
1644            .shell_dirs
1645            .remove(&shell_pid.to_string())
1646            .is_some()
1647        {
1648            state_file.write()?;
1649        }
1650        Ok(())
1651    }
1652
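    /// Invert the shell_pid -> dir map into dir -> shell pids, skipping entries
    /// whose key no longer parses as a pid.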
1653    async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
1654        self.state_file.lock().await.shell_dirs.iter().fold(
1655            HashMap::new(),
1656            |mut acc, (pid, dir)| {
1657                if let Ok(pid) = pid.parse() {
1658                    acc.entry(dir.clone()).or_default().push(pid);
1659                }
1660                acc
1661            },
1662        )
1663    }
1664
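    /// Drop state-file records for daemons that have no pid recorded, i.e. ones
    /// that are not currently running.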
1665    async fn clean(&self) -> Result<()> {
1666        let mut state_file = self.state_file.lock().await;
1667        state_file.daemons.retain(|_id, d| d.pid.is_some());
1668        state_file.write()?;
1669        Ok(())
1670    }
1671}
1672
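/// Options for `upsert_daemon`. A `None` field means "keep the existing
/// record's value"; see the merge logic in `upsert_daemon` above.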
1673#[derive(Debug)]
1674struct UpsertDaemonOpts {
1675    id: String,
1676    pid: Option<u32>,
1677    status: DaemonStatus,
1678    shell_pid: Option<u32>,
1679    dir: Option<PathBuf>,
1680    autostop: bool,
1681    cron_schedule: Option<String>,
1682    cron_retrigger: Option<crate::pitchfork_toml::CronRetrigger>,
1683    last_exit_success: Option<bool>,
1684    retry: Option<u32>,
1685    retry_count: Option<u32>,
1686    ready_delay: Option<u64>,
1687    ready_output: Option<String>,
1688    ready_http: Option<String>,
1689    ready_port: Option<u16>,
1690    depends: Option<Vec<String>>,
1691}
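// A manual impl because `status` must default to `DaemonStatus::Stopped`;
// assuming `DaemonStatus` does not designate a `#[default]` variant,
// `#[derive(Default)]` is not an option here.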
1692
1693impl Default for UpsertDaemonOpts {
1694    fn default() -> Self {
1695        Self {
1696            id: String::new(),
1697            pid: None,
1698            status: DaemonStatus::Stopped,
1699            shell_pid: None,
1700            dir: None,
1701            autostop: false,
1702            cron_schedule: None,
1703            cron_retrigger: None,
1704            last_exit_success: None,
1705            retry: None,
1706            retry_count: None,
1707            ready_delay: None,
1708            ready_output: None,
1709            ready_http: None,
1710            ready_port: None,
1711            depends: None,
1712        }
1713    }
1714}