pitchfork_cli/
supervisor.rs

use crate::daemon::{Daemon, RunOptions, validate_daemon_id};
use crate::daemon_status::DaemonStatus;
use crate::ipc::server::IpcServer;
use crate::ipc::{IpcRequest, IpcResponse};
use crate::procs::PROCS;
use crate::state_file::StateFile;
use crate::{Result, env};
use duct::cmd;
use itertools::Itertools;
use log::LevelFilter::Info;
use miette::IntoDiagnostic;
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fs;
use std::iter::once;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::{select, signal, time};

/// Cache for compiled regex patterns to avoid recompilation on daemon restarts
static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));

/// Get or compile a regex pattern, caching the result for future use
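/// Invalid patterns are logged and yield `None` rather than propagating an error.
///
/// ```ignore
/// // Illustrative only; the pattern here is hypothetical, matching the
/// // ready_output readiness checks further down in this file.
/// if let Some(re) = get_or_compile_regex(r"listening on port \d+") {
///     assert!(re.is_match("listening on port 8080"));
/// }
/// ```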
fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
    if let Some(re) = cache.get(pattern) {
        return Some(re.clone());
    }
    match Regex::new(pattern) {
        Ok(re) => {
            cache.insert(pattern.to_string(), re.clone());
            Some(re)
        }
        Err(e) => {
            error!("invalid regex pattern '{}': {}", pattern, e);
            None
        }
    }
}

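/// Process supervisor state: owns the state file, buffers notifications for clients,
/// and tracks autostops scheduled for daemons whose shells have gone away.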
pub struct Supervisor {
    state_file: Mutex<StateFile>,
    pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    last_refreshed_at: Mutex<time::Instant>,
    /// Map of daemon ID to scheduled autostop time
    pending_autostops: Mutex<HashMap<String, time::Instant>>,
}

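/// Refresh interval for the supervisor loop, driven by `PITCHFORK_INTERVAL_SECS`.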
fn interval_duration() -> Duration {
    Duration::from_secs(*env::PITCHFORK_INTERVAL_SECS)
}

pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));

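/// Start the supervisor in the background unless the state file already records
/// a running `pitchfork` daemon.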
pub fn start_if_not_running() -> Result<()> {
    let sf = StateFile::get();
    if let Some(d) = sf.daemons.get("pitchfork")
        && let Some(pid) = d.pid
        && PROCS.is_running(pid)
    {
        return Ok(());
    }
    start_in_background()
}

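/// Spawn `pitchfork supervisor run` as a detached process, discarding its stdout/stderr.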
pub fn start_in_background() -> Result<()> {
    debug!("starting supervisor in background");
    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
        .stdout_null()
        .stderr_null()
        .start()
        .into_diagnostic()?;
    Ok(())
}

impl Supervisor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
            pending_autostops: Mutex::new(HashMap::new()),
        })
    }

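    /// Main entry point: registers the supervisor in the state file, optionally starts
    /// boot daemons and the web server, wires up the watchers and signal handlers,
    /// then serves IPC connections.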
    pub async fn start(&self, is_boot: bool, web_port: Option<u16>) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        self.upsert_daemon(UpsertDaemonOpts {
            id: "pitchfork".to_string(),
            pid: Some(pid),
            status: DaemonStatus::Running,
            ..Default::default()
        })
        .await?;

        // If this is a boot start, automatically start boot_start daemons
        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        // self.file_watch().await?;

        // Start web server if port is configured
        if let Some(port) = web_port {
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port).await {
                    error!("Web server error: {}", e);
                }
            });
        }

        let ipc = IpcServer::new()?;
        self.conn_watch(ipc).await
    }

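    /// Periodic housekeeping: drops shell PIDs that are no longer alive, triggers
    /// `leave_dir` for directories with no remaining shells, and processes retries
    /// and pending autostops.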
    async fn refresh(&self) -> Result<()> {
        trace!("refreshing");

        // Collect PIDs we need to check (shell PIDs only)
        // This is more efficient than refreshing all processes on the system
        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();

        if pids_to_check.is_empty() {
            // No PIDs to check, skip the expensive refresh
            trace!("no shell PIDs to check, skipping process refresh");
        } else {
            PROCS.refresh_pids(&pids_to_check);
        }

        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in dirs_with_pids {
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect_vec();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;
        self.process_pending_autostops().await?;

        Ok(())
    }

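    /// Restart errored daemons that still have retry attempts left, re-reading the
    /// run command from pitchfork.toml for each attempt.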
    async fn check_retry(&self) -> Result<()> {
        // Collect only IDs of daemons that need retrying (avoids cloning entire Daemon structs)
        let ids_to_retry: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    // Daemon is errored, not currently running, and has retries remaining
                    d.status.is_errored()
                        && d.pid.is_none()
                        && d.retry > 0
                        && d.retry_count < d.retry
                })
                .map(|(id, _d)| id.clone())
                .collect()
        };

        for id in ids_to_retry {
            // Look up daemon when needed and re-verify retry criteria
            // (state may have changed since we collected IDs)
            let daemon = {
                let state_file = self.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d)
                        if d.status.is_errored()
                            && d.pid.is_none()
                            && d.retry > 0
                            && d.retry_count < d.retry =>
                    {
                        d.clone()
                    }
                    _ => continue, // Daemon was removed or no longer needs retry
                }
            };
            info!(
                "retrying daemon {} ({}/{} attempts)",
                id,
                daemon.retry_count + 1,
                daemon.retry
            );

            // Get command from pitchfork.toml
            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                let cmd = match shell_words::split(&run_cmd) {
                    Ok(cmd) => cmd,
                    Err(e) => {
                        error!("failed to parse command for daemon {}: {}", id, e);
                        // Mark as exhausted to prevent infinite retry loop, preserving error status
                        self.upsert_daemon(UpsertDaemonOpts {
                            id,
                            status: daemon.status.clone(),
                            retry_count: Some(daemon.retry),
                            ..Default::default()
                        })
                        .await?;
                        continue;
                    }
                };
                let retry_opts = RunOptions {
                    id: id.clone(),
                    cmd,
                    force: false,
                    shell_pid: daemon.shell_pid,
                    dir: daemon.dir.unwrap_or_else(|| env::CWD.clone()),
                    autostop: daemon.autostop,
                    cron_schedule: daemon.cron_schedule,
                    cron_retrigger: daemon.cron_retrigger,
                    retry: daemon.retry,
                    retry_count: daemon.retry_count + 1,
                    ready_delay: daemon.ready_delay,
                    ready_output: daemon.ready_output.clone(),
                    ready_http: daemon.ready_http.clone(),
                    ready_port: daemon.ready_port,
                    wait_ready: false,
                    depends: daemon.depends.clone(),
                };
                if let Err(e) = self.run(retry_opts).await {
                    error!("failed to retry daemon {}: {}", id, e);
                }
            } else {
                warn!("no run command found for daemon {}, cannot retry", id);
                // Mark as exhausted
                self.upsert_daemon(UpsertDaemonOpts {
                    id,
                    retry_count: Some(daemon.retry),
                    ..Default::default()
                })
                .await?;
            }
        }

        Ok(())
    }

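    /// Called when the last shell leaves `dir`: autostop-enabled daemons under that
    /// directory are stopped immediately, or scheduled for a delayed stop when
    /// `PITCHFORK_AUTOSTOP_DELAY` is non-zero.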
    async fn leave_dir(&self, dir: &Path) -> Result<()> {
        debug!("left dir {}", dir.display());
        let shell_dirs = self.get_dirs_with_shell_pids().await;
        let shell_dirs = shell_dirs.keys().collect_vec();
        let delay_secs = *env::PITCHFORK_AUTOSTOP_DELAY;

        for daemon in self.active_daemons().await {
            if !daemon.autostop {
                continue;
            }
            // if this daemon's dir starts with the left dir
            // and no other shell pid has this dir as a prefix
            // schedule the daemon for autostop
            if let Some(daemon_dir) = daemon.dir.as_ref()
                && daemon_dir.starts_with(dir)
                && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
            {
                if delay_secs == 0 {
                    // No delay configured, stop immediately
                    info!("autostopping {daemon}");
                    self.stop(&daemon.id).await?;
                    self.add_notification(Info, format!("autostopped {daemon}"))
                        .await;
                } else {
                    // Schedule autostop with delay
                    let stop_at = time::Instant::now() + Duration::from_secs(delay_secs);
                    let mut pending = self.pending_autostops.lock().await;
                    if !pending.contains_key(&daemon.id) {
                        info!("scheduling autostop for {} in {}s", daemon.id, delay_secs);
                        pending.insert(daemon.id.clone(), stop_at);
                    }
                }
            }
        }
        Ok(())
    }

    /// Cancel any pending autostop for daemons in the given directory
    /// Also cancels autostops for daemons in parent directories (e.g., entering /project/subdir
    /// cancels pending autostop for daemon in /project)
    async fn cancel_pending_autostops_for_dir(&self, dir: &Path) {
        let mut pending = self.pending_autostops.lock().await;
        let daemons_to_cancel: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    d.dir.as_ref().is_some_and(|daemon_dir| {
                        // Cancel if entering a directory inside or equal to daemon's directory
                        // OR if daemon is in a subdirectory of the entered directory
                        dir.starts_with(daemon_dir) || daemon_dir.starts_with(dir)
                    })
                })
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in daemons_to_cancel {
            if pending.remove(&daemon_id).is_some() {
                info!("cancelled pending autostop for {}", daemon_id);
            }
        }
    }

    /// Process any pending autostops that have reached their scheduled time
    async fn process_pending_autostops(&self) -> Result<()> {
        let now = time::Instant::now();
        let to_stop: Vec<String> = {
            let pending = self.pending_autostops.lock().await;
            pending
                .iter()
                .filter(|(_, stop_at)| now >= **stop_at)
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in to_stop {
            // Remove from pending first
            {
                let mut pending = self.pending_autostops.lock().await;
                pending.remove(&daemon_id);
            }

            // Check if daemon is still running and should be stopped
            if let Some(daemon) = self.get_daemon(&daemon_id).await
                && daemon.autostop
                && daemon.status.is_running()
            {
                // Verify no shell is in the daemon's directory
                let shell_dirs = self.get_dirs_with_shell_pids().await;
                let shell_dirs = shell_dirs.keys().collect_vec();
                if let Some(daemon_dir) = daemon.dir.as_ref()
                    && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
                {
                    info!("autostopping {} (after delay)", daemon_id);
                    self.stop(&daemon_id).await?;
                    self.add_notification(Info, format!("autostopped {daemon_id}"))
                        .await;
                }
            }
        }
        Ok(())
    }

    async fn start_boot_daemons(&self) -> Result<()> {
        use crate::pitchfork_toml::PitchforkToml;

        info!("Scanning for boot_start daemons");
        let pt = PitchforkToml::all_merged();

        let boot_daemons: Vec<_> = pt
            .daemons
            .iter()
            .filter(|(_id, d)| d.boot_start.unwrap_or(false))
            .collect();

        if boot_daemons.is_empty() {
            info!("No daemons configured with boot_start = true");
            return Ok(());
        }

        info!("Found {} daemon(s) to start at boot", boot_daemons.len());

        for (id, daemon) in boot_daemons {
            info!("Starting boot daemon: {}", id);

            let dir = daemon
                .path
                .as_ref()
                .and_then(|p| p.parent())
                .map(|p| p.to_path_buf())
                .unwrap_or_else(|| env::CWD.clone());

            let cmd = match shell_words::split(&daemon.run) {
                Ok(cmd) => cmd,
                Err(e) => {
                    error!("failed to parse command for boot daemon {}: {}", id, e);
                    continue;
                }
            };
            let run_opts = RunOptions {
                id: id.clone(),
                cmd,
                force: false,
                shell_pid: None,
                dir,
                autostop: false, // Boot daemons should not autostop
                cron_schedule: daemon.cron.as_ref().map(|c| c.schedule.clone()),
                cron_retrigger: daemon.cron.as_ref().map(|c| c.retrigger),
                retry: daemon.retry,
                retry_count: 0,
                ready_delay: daemon.ready_delay,
                ready_output: daemon.ready_output.clone(),
                ready_http: daemon.ready_http.clone(),
                ready_port: daemon.ready_port,
                wait_ready: false, // Don't block on boot daemons
                depends: daemon.depends.clone(),
            };

            match self.run(run_opts).await {
                Ok(IpcResponse::DaemonStart { .. }) | Ok(IpcResponse::DaemonReady { .. }) => {
                    info!("Successfully started boot daemon: {}", id);
                }
                Ok(IpcResponse::DaemonAlreadyRunning) => {
                    info!("Boot daemon already running: {}", id);
                }
                Ok(other) => {
                    warn!(
                        "Unexpected response when starting boot daemon {}: {:?}",
                        id, other
                    );
                }
                Err(e) => {
                    error!("Failed to start boot daemon {}: {}", id, e);
                }
            }
        }

        Ok(())
    }

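    /// Start a daemon, optionally waiting for readiness. When `wait_ready` is set and
    /// retries are configured, failed starts are retried here with exponential backoff
    /// (1s, 2s, 4s, ...); otherwise failed runs are retried later by the refresh loop.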
    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd.clone();

        // Clear any pending autostop for this daemon since it's being started
        {
            let mut pending = self.pending_autostops.lock().await;
            if pending.remove(id).is_some() {
                info!("cleared pending autostop for {} (daemon starting)", id);
            }
        }

        let daemon = self.get_daemon(id).await;
        if let Some(daemon) = daemon {
            // Stopping state is treated as "not running" - the monitoring task will clean it up
            // Only check for Running state with a valid PID
            if !daemon.status.is_stopping()
                && !daemon.status.is_stopped()
                && let Some(pid) = daemon.pid
            {
                if opts.force {
                    self.stop(id).await?;
                    info!("run: stop completed for daemon {id}");
                } else {
                    warn!("daemon {id} already running with pid {pid}");
                    return Ok(IpcResponse::DaemonAlreadyRunning);
                }
            }
        }

        // If wait_ready is true and retry is configured, implement retry loop
        if opts.wait_ready && opts.retry > 0 {
            let max_attempts = opts.retry + 1; // initial attempt + retries
            for attempt in 0..max_attempts {
                let mut retry_opts = opts.clone();
                retry_opts.retry_count = attempt;
                retry_opts.cmd = cmd.clone();

                let result = self.run_once(retry_opts).await?;

                match result {
                    IpcResponse::DaemonReady { daemon } => {
                        return Ok(IpcResponse::DaemonReady { daemon });
                    }
                    IpcResponse::DaemonFailedWithCode { exit_code } => {
                        if attempt < opts.retry {
                            let backoff_secs = 2u64.pow(attempt);
                            info!(
                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
                                attempt + 1,
                                max_attempts,
                                backoff_secs
                            );
                            time::sleep(Duration::from_secs(backoff_secs)).await;
                            continue;
                        } else {
                            info!("daemon {id} failed after {} attempts", max_attempts);
                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
                        }
                    }
                    other => return Ok(other),
                }
            }
        }

        // No retry or wait_ready is false
        self.run_once(opts).await
    }

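    /// Spawn the daemon process once: sets up log capture, readiness detection
    /// (output pattern, HTTP, TCP port, or delay), and a monitoring task that records
    /// the final exit status in the state file.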
    async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd;

        // Create channel for readiness notification if wait_ready is true
        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let cmd = once("exec".to_string())
            .chain(cmd.into_iter())
            .collect_vec();
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
        if let Some(parent) = log_path.parent() {
            xx::file::mkdirp(parent)?;
        }
        info!("run: spawning daemon {id} with args: {args:?}");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(&args)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .current_dir(&opts.dir);

        // Ensure daemon can find user tools by using the original PATH
        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = match child.id() {
            Some(p) => p,
            None => {
                warn!("Daemon {id} exited before PID could be captured");
                return Ok(IpcResponse::DaemonFailed {
                    error: "Process exited immediately".to_string(),
                });
            }
        };
        info!("started daemon {id} with pid {pid}");
        let daemon = self
            .upsert_daemon(UpsertDaemonOpts {
                id: id.to_string(),
                pid: Some(pid),
                status: DaemonStatus::Running,
                shell_pid: opts.shell_pid,
                dir: Some(opts.dir.clone()),
                autostop: opts.autostop,
                cron_schedule: opts.cron_schedule.clone(),
                cron_retrigger: opts.cron_retrigger,
                last_exit_success: None,
                retry: Some(opts.retry),
                retry_count: Some(opts.retry_count),
                ready_delay: opts.ready_delay,
                ready_output: opts.ready_output.clone(),
                ready_http: opts.ready_http.clone(),
                ready_port: opts.ready_port,
                depends: Some(opts.depends.clone()),
            })
            .await?;

        let id_clone = id.to_string();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();
        let ready_http = opts.ready_http.clone();
        let ready_port = opts.ready_port;

        tokio::spawn(async move {
            let id = id_clone;
            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
                (Some(out), Some(err)) => (out, err),
                _ => {
                    error!("Failed to capture stdout/stderr for daemon {id}");
                    return;
                }
            };
            let mut stdout = tokio::io::BufReader::new(stdout).lines();
            let mut stderr = tokio::io::BufReader::new(stderr).lines();
            let log_file = match tokio::fs::File::options()
                .append(true)
                .create(true)
                .open(&log_path)
                .await
            {
                Ok(f) => f,
                Err(e) => {
                    error!("Failed to open log file for daemon {id}: {e}");
                    return;
                }
            };
            let mut log_appender = BufWriter::new(log_file);

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            let format_line = |line: String| {
                if line.starts_with(&format!("{id} ")) {
                    // mise tasks often already have the id printed
                    format!("{} {line}\n", now())
                } else {
                    format!("{} {id} {line}\n", now())
                }
            };

            // Setup readiness checking
            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            // Setup HTTP readiness check interval (poll every 500ms)
            let mut http_check_interval = ready_http
                .as_ref()
                .map(|_| tokio::time::interval(Duration::from_millis(500)));
            let http_client = ready_http.as_ref().map(|_| {
                reqwest::Client::builder()
                    .timeout(Duration::from_secs(5))
                    .build()
                    .unwrap_or_default()
            });

            // Setup TCP port readiness check interval (poll every 500ms)
            let mut port_check_interval =
                ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));

            // Setup periodic log flush interval (every 500ms - balances I/O reduction with responsiveness)
            let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));

            // Use a channel to communicate process exit status
            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            // Spawn a task to wait for process exit
            let child_pid = child.id().unwrap_or(0);
            tokio::spawn(async move {
                let result = child.wait().await;
                debug!(
                    "daemon pid {child_pid} wait() completed with result: {:?}",
                    result
                );
                let _ = exit_tx.send(result).await;
            });

            #[allow(unused_assignments)]
            // Initial None is a safety net; loop only exits via exit_rx.recv() which sets it
            let mut exit_status = None;

            loop {
                select! {
                    Ok(Some(line)) = stdout.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stdout: {id} {formatted}");

                        // Check if output matches ready pattern
                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                                && pattern.is_match(&line) {
                                    info!("daemon {id} ready: output matched pattern");
                                    ready_notified = true;
                                    // Flush logs before notifying so clients see logs immediately
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                }
                    }
                    Ok(Some(line)) = stderr.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stderr: {id} {formatted}");

                        // Check if output matches ready pattern (also check stderr)
                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                                && pattern.is_match(&line) {
                                    info!("daemon {id} ready: output matched pattern");
                                    ready_notified = true;
                                    // Flush logs before notifying so clients see logs immediately
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                }
                    },
                    Some(result) = exit_rx.recv() => {
                        // Process exited - save exit status and notify if not ready yet
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
                        // Flush logs before notifying so clients see logs immediately
                        let _ = log_appender.flush().await;
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                // Check if process exited successfully
                                let is_success = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .map(|s| s.success())
                                    .unwrap_or(false);

                                if is_success {
                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
                                    let _ = tx.send(Ok(()));
                                } else {
                                    let exit_code = exit_status.as_ref()
                                        .and_then(|r| r.as_ref().ok())
                                        .and_then(|s| s.code());
                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
                                    let _ = tx.send(Err(exit_code));
                                }
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending notification");
                        }
                        break;
                    }
                    _ = async {
                        if let Some(ref mut interval) = http_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_http.is_some() => {
                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                            match client.get(url).send().await {
                                Ok(response) if response.status().is_success() => {
                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                    ready_notified = true;
                                    // Flush logs before notifying so clients see logs immediately
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    // Stop checking once ready
                                    http_check_interval = None;
                                }
                                Ok(response) => {
                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                                }
                                Err(e) => {
                                    trace!("daemon {id} HTTP check failed: {e}");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut interval) = port_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_port.is_some() => {
                        if let Some(port) = ready_port {
                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                                Ok(_) => {
                                    info!("daemon {id} ready: TCP port {port} is listening");
                                    ready_notified = true;
                                    // Flush logs before notifying so clients see logs immediately
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    // Stop checking once ready
                                    port_check_interval = None;
                                }
                                Err(_) => {
                                    trace!("daemon {id} port check: port {port} not listening yet");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            // Flush logs before notifying so clients see logs immediately
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        // Disable timer after it fires
                        delay_timer = None;
                    }
                    _ = log_flush_interval.tick() => {
                        // Periodic flush to ensure logs are written to disk
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                    }
                    // Note: No `else => break` because log_flush_interval.tick() is always available,
                    // making the else branch unreachable. The loop exits via the exit_rx.recv() branch.
                }
            }

            // Final flush to ensure all buffered logs are written
            if let Err(e) = log_appender.flush().await {
                error!("Failed to perform final log flush for daemon {id}: {e}");
            }

            // Get the final exit status
            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                // Streams closed but process hasn't exited yet, wait for it
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            // Check if this monitoring task is for the current daemon process
            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
            {
                // Another process has taken over, don't update status
                return;
            }
            let is_stopping = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
                // was stopped by this supervisor so don't update status
                return;
            }
            if let Ok(status) = exit_status {
                info!("daemon {id} exited with status {status}");
                if status.success() || is_stopping {
                    // If stopping, always mark as Stopped with success
                    // This allows monitoring task to clear PID after stop() was called
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None, // Clear PID now that process has exited
                            status: DaemonStatus::Stopped,
                            last_exit_success: Some(status.success()),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                } else {
                    // Handle error exit - mark for retry
                    // retry_count increment will be handled by interval_watch
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Errored(status.code()),
                            last_exit_success: Some(false),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                }
            } else if let Err(e) = SUPERVISOR
                .upsert_daemon(UpsertDaemonOpts {
                    id: id.clone(),
                    pid: None,
                    status: DaemonStatus::Errored(None),
                    last_exit_success: Some(false),
                    ..Default::default()
                })
                .await
            {
                error!("Failed to update daemon state for {id}: {e}");
            }
        });

        // If wait_ready is true, wait for readiness notification
        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }

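    /// Stop a daemon by killing its process (and children). The daemon is first marked
    /// `Stopping`; the monitoring task clears the PID and marks it `Stopped` once the
    /// process actually exits.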
    pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
        if id == "pitchfork" {
            return Ok(IpcResponse::Error(
                "Cannot stop supervisor via stop command".into(),
            ));
        }
        info!("stopping daemon: {id}");
        if let Some(daemon) = self.get_daemon(id).await {
            trace!("daemon to stop: {daemon}");
            if let Some(pid) = daemon.pid {
                trace!("killing pid: {pid}");
                PROCS.refresh_processes();
                if PROCS.is_running(pid) {
                    // First set status to Stopping (keeps PID for monitoring task)
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        status: DaemonStatus::Stopping,
                        ..Default::default()
                    })
                    .await?;

                    // Then kill the process
                    if let Err(e) = PROCS.kill_async(pid).await {
                        warn!("failed to kill pid {pid}: {e}");
                    }
                    PROCS.refresh_processes();
                    for child_pid in PROCS.all_children(pid) {
                        debug!("killing child pid: {child_pid}");
                        if let Err(e) = PROCS.kill_async(child_pid).await {
                            warn!("failed to kill child pid {child_pid}: {e}");
                        }
                    }
                    // Monitoring task will clear PID and set to Stopped when it detects exit
                } else {
                    debug!("pid {pid} not running");
                    // Process already dead, directly mark as stopped
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        ..Default::default()
                    })
                    .await?;
                }
                return Ok(IpcResponse::Ok);
            } else {
                debug!("daemon {id} not running");
            }
        } else {
            debug!("daemon {id} not found");
        }
        Ok(IpcResponse::DaemonAlreadyStopped)
    }

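    /// Install Unix signal handlers: the first signal triggers a graceful shutdown,
    /// a second one exits immediately.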
    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            let stream = match signal::unix::signal(signal) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Failed to register signal handler for {:?}: {}", signal, e);
                    continue;
                }
            };
            tokio::spawn(async move {
                let mut stream = stream;
                loop {
                    stream.recv().await;
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

    // async fn file_watch(&self) -> Result<()> {
    //     let state_file = self.state_file.lock().await.path.clone();
    //     task::spawn(async move {
    //         let mut wf = WatchFiles::new(Duration::from_secs(2)).unwrap();
    //
    //         wf.watch(&state_file, RecursiveMode::NonRecursive).unwrap();
    //
    //         while let Some(paths) = wf.rx.recv().await {
    //             if let Err(err) = SUPERVISOR.handle_file_change(paths).await {
    //                 error!("failed to handle file change: {err}");
    //             }
    //         }
    //     });
    //
    //     Ok(())
    // }

    // async fn handle_file_change(&self, paths: Vec<PathBuf>) -> Result<()> {
    //     debug!("file change: {:?}", paths);
    //     // let path = self.state_file.lock().await.path.clone();
    //     // if paths.contains(&path) {
    //     //     *self.state_file.lock().await = StateFile::read(&path)?;
    //     // }
    //     self.refresh().await
    // }

    fn interval_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(interval_duration());
            loop {
                interval.tick().await;
                if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > interval_duration()
                    && let Err(err) = SUPERVISOR.refresh().await
                {
                    error!("failed to refresh: {err}");
                }
            }
        });
        Ok(())
    }

    fn cron_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            // Check every minute for cron schedules
            // FIXME: needs better logic; what if the schedule gap is very short (30s, 1 min)?
            let mut interval = time::interval(Duration::from_secs(60));
            loop {
                interval.tick().await;
                if let Err(err) = SUPERVISOR.check_cron_schedules().await {
                    error!("failed to check cron schedules: {err}");
                }
            }
        });
        Ok(())
    }

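    /// Evaluate each daemon's cron schedule and trigger runs whose next occurrence
    /// falls within the next minute, respecting the configured retrigger policy.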
    async fn check_cron_schedules(&self) -> Result<()> {
        use cron::Schedule;
        use std::str::FromStr;

        let now = chrono::Local::now();

        // Collect only IDs of daemons with cron schedules (avoids cloning entire HashMap)
        let cron_daemon_ids: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| d.cron_schedule.is_some() && d.cron_retrigger.is_some())
                .map(|(id, _d)| id.clone())
                .collect()
        };

        for id in cron_daemon_ids {
            // Look up daemon when needed
            let daemon = {
                let state_file = self.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d) => d.clone(),
                    None => continue,
                }
            };

            if let Some(schedule_str) = &daemon.cron_schedule
                && let Some(retrigger) = daemon.cron_retrigger
            {
                // Parse the cron schedule
                let schedule = match Schedule::from_str(schedule_str) {
                    Ok(s) => s,
                    Err(e) => {
                        warn!("invalid cron schedule for daemon {id}: {e}");
                        continue;
                    }
                };

                // Check if we should trigger now
                let should_trigger = schedule.upcoming(chrono::Local).take(1).any(|next| {
                    // If the next execution is within the next minute, trigger it
                    let diff = next.signed_duration_since(now);
                    diff.num_seconds() < 60 && diff.num_seconds() >= 0
                });

                if should_trigger {
                    let should_run = match retrigger {
                        crate::pitchfork_toml::CronRetrigger::Finish => {
                            // Run if not currently running
                            daemon.pid.is_none()
                        }
                        crate::pitchfork_toml::CronRetrigger::Always => {
                            // Always run (force restart handled in run method)
                            true
                        }
                        crate::pitchfork_toml::CronRetrigger::Success => {
                            // Run only if previous command succeeded
                            daemon.pid.is_none() && daemon.last_exit_success.unwrap_or(false)
                        }
                        crate::pitchfork_toml::CronRetrigger::Fail => {
                            // Run only if previous command failed
                            daemon.pid.is_none() && !daemon.last_exit_success.unwrap_or(true)
                        }
                    };

                    if should_run {
                        info!("cron: triggering daemon {id} (retrigger: {retrigger:?})");
                        // Get the run command from pitchfork.toml
                        if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                            let cmd = match shell_words::split(&run_cmd) {
                                Ok(cmd) => cmd,
                                Err(e) => {
                                    error!("failed to parse command for cron daemon {}: {}", id, e);
                                    continue;
                                }
                            };
                            let dir = daemon.dir.clone().unwrap_or_else(|| env::CWD.clone());
                            // Use force: true for Always retrigger to ensure restart
                            let force =
                                matches!(retrigger, crate::pitchfork_toml::CronRetrigger::Always);
                            let opts = RunOptions {
                                id: id.clone(),
                                cmd,
                                force,
                                shell_pid: None,
                                dir,
                                autostop: daemon.autostop,
                                cron_schedule: Some(schedule_str.clone()),
                                cron_retrigger: Some(retrigger),
                                retry: daemon.retry,
                                retry_count: daemon.retry_count,
                                ready_delay: daemon.ready_delay,
                                ready_output: daemon.ready_output.clone(),
                                ready_http: daemon.ready_http.clone(),
                                ready_port: daemon.ready_port,
                                wait_ready: false,
                                depends: daemon.depends.clone(),
                            };
                            if let Err(e) = self.run(opts).await {
                                error!("failed to run cron daemon {id}: {e}");
                            }
                        } else {
                            warn!("no run command found for cron daemon {id}");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    fn get_daemon_run_command(&self, id: &str) -> Option<String> {
        use crate::pitchfork_toml::PitchforkToml;
        let pt = PitchforkToml::all_merged();
        pt.daemons.get(id).map(|d| d.run.clone())
    }

    async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
        loop {
            let (msg, send) = match ipc.read().await {
                Ok(msg) => msg,
                Err(e) => {
                    error!("failed to accept connection: {:?}", e);
                    continue;
                }
            };
            debug!("received message: {:?}", msg);
            tokio::spawn(async move {
                let rsp = SUPERVISOR
                    .handle_ipc(msg)
                    .await
                    .unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
                if let Err(err) = send.send(rsp).await {
                    debug!("failed to send message: {:?}", err);
                }
            });
        }
    }

    async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
        let rsp = match req {
            IpcRequest::Connect => {
                debug!("received connect message");
                IpcResponse::Ok
            }
            IpcRequest::Stop { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                self.stop(&id).await?
            }
            IpcRequest::Run(opts) => {
                if let Err(e) = validate_daemon_id(&opts.id) {
                    return Ok(IpcResponse::Error(e));
                }
                self.run(opts).await?
            }
            IpcRequest::Enable { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                if self.enable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::Disable { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                if self.disable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::GetActiveDaemons => {
                let daemons = self.active_daemons().await;
                IpcResponse::ActiveDaemons(daemons)
            }
            IpcRequest::GetNotifications => {
                let notifications = self.get_notifications().await;
                IpcResponse::Notifications(notifications)
            }
            IpcRequest::UpdateShellDir { shell_pid, dir } => {
                let prev = self.get_shell_dir(shell_pid).await;
                self.set_shell_dir(shell_pid, dir.clone()).await?;
                // Cancel any pending autostops for daemons in the new directory
                self.cancel_pending_autostops_for_dir(&dir).await;
                if let Some(prev) = prev {
                    self.leave_dir(&prev).await?;
                }
                self.refresh().await?;
                IpcResponse::Ok
            }
            IpcRequest::Clean => {
                self.clean().await?;
                IpcResponse::Ok
            }
            IpcRequest::GetDisabledDaemons => {
                let disabled = self.state_file.lock().await.disabled.clone();
                IpcResponse::DisabledDaemons(disabled.into_iter().collect())
            }
        };
        Ok(rsp)
    }

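    /// Stops every daemon except the supervisor's own `pitchfork` entry, then removes
    /// that entry from the state file and deletes the IPC socket directory.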
    async fn close(&self) {
        for daemon in self.active_daemons().await {
            if daemon.id == "pitchfork" {
                continue;
            }
            if let Err(err) = self.stop(&daemon.id).await {
                error!("failed to stop daemon {daemon}: {err}");
            }
        }
        let _ = self.remove_daemon("pitchfork").await;
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
        // TODO: cleanly stop ipc server
    }

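    /// Queues a notification to be picked up by the next `GetNotifications` request.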
    async fn add_notification(&self, level: log::LevelFilter, message: String) {
        self.pending_notifications
            .lock()
            .await
            .push((level, message));
    }

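    /// Drains and returns all pending notifications.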
    async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
        self.pending_notifications.lock().await.drain(..).collect()
    }

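    /// Returns every daemon in the state file that currently has a pid, excluding the
    /// supervisor itself.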
    async fn active_daemons(&self) -> Vec<Daemon> {
        self.state_file
            .lock()
            .await
            .daemons
            .values()
            .filter(|d| d.pid.is_some() && d.id != "pitchfork")
            .cloned()
            .collect()
    }

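    /// Removes a daemon from the state file; a failed write is logged rather than
    /// treated as fatal.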
    async fn remove_daemon(&self, id: &str) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.remove(id);
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(())
    }

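    /// Inserts or updates a daemon entry in the state file. Options left unset fall
    /// back to the existing entry, so callers can update the pid/status without losing
    /// cron, readiness, retry, or dependency configuration.
    ///
    /// Illustrative sketch only (id and pid are hypothetical):
    ///
    /// ```ignore
    /// self.upsert_daemon(UpsertDaemonOpts {
    ///     id: "docs".to_string(),
    ///     pid: Some(1234),
    ///     status: DaemonStatus::Running,
    ///     ..Default::default()
    /// })
    /// .await?;
    /// ```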
    async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
        info!(
            "upserting daemon: {} pid: {} status: {}",
            opts.id,
            opts.pid.unwrap_or(0),
            opts.status
        );
        let mut state_file = self.state_file.lock().await;
        let existing = state_file.daemons.get(&opts.id);
        let daemon = Daemon {
            id: opts.id.to_string(),
            title: opts.pid.and_then(|pid| PROCS.title(pid)),
            pid: opts.pid,
            status: opts.status,
            shell_pid: opts.shell_pid,
            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
            cron_schedule: opts
                .cron_schedule
                .or(existing.and_then(|d| d.cron_schedule.clone())),
            cron_retrigger: opts
                .cron_retrigger
                .or(existing.and_then(|d| d.cron_retrigger)),
            last_exit_success: opts
                .last_exit_success
                .or(existing.and_then(|d| d.last_exit_success)),
            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
            retry_count: opts
                .retry_count
                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
            ready_output: opts
                .ready_output
                .or(existing.and_then(|d| d.ready_output.clone())),
            ready_http: opts
                .ready_http
                .or(existing.and_then(|d| d.ready_http.clone())),
            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
            depends: opts
                .depends
                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
        };
        state_file
            .daemons
            .insert(opts.id.to_string(), daemon.clone());
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(daemon)
    }

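    /// Removes `id` from the disabled set; returns `true` if it was previously disabled.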
    pub async fn enable(&self, id: String) -> Result<bool> {
        info!("enabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.remove(&id);
        state_file.write()?;
        Ok(result)
    }

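    /// Adds `id` to the disabled set; returns `true` if it was not already disabled.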
    pub async fn disable(&self, id: String) -> Result<bool> {
        info!("disabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.insert(id);
        state_file.write()?;
        Ok(result)
    }

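    /// Fetches a daemon entry from the state file by id.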
    async fn get_daemon(&self, id: &str) -> Option<Daemon> {
        self.state_file.lock().await.daemons.get(id).cloned()
    }

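    /// Records the working directory reported by a shell pid in the state file.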
    async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
        state_file.write()?;
        Ok(())
    }

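    /// Returns the directory last reported for a shell pid, if any.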
    async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
        self.state_file
            .lock()
            .await
            .shell_dirs
            .get(&shell_pid.to_string())
            .cloned()
    }

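    /// Drops the directory mapping for a shell pid, writing the state file only if an
    /// entry was actually removed.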
    async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        if state_file
            .shell_dirs
            .remove(&shell_pid.to_string())
            .is_some()
        {
            state_file.write()?;
        }
        Ok(())
    }

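    /// Inverts the shell-dir map: for each tracked directory, the shell pids currently
    /// in it. Pids that fail to parse are skipped.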
    async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
        self.state_file.lock().await.shell_dirs.iter().fold(
            HashMap::new(),
            |mut acc, (pid, dir)| {
                if let Ok(pid) = pid.parse() {
                    acc.entry(dir.clone()).or_default().push(pid);
                }
                acc
            },
        )
    }

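    /// Drops state-file entries for daemons that no longer have a pid.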
    async fn clean(&self) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.retain(|_id, d| d.pid.is_some());
        state_file.write()?;
        Ok(())
    }
}

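/// Options for `Supervisor::upsert_daemon`. `id`, `pid`, `status`, and `shell_pid` are
/// applied as given; `autostop` is sticky once set; the remaining `Option` fields fall
/// back to the existing entry's values when `None`.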
#[derive(Debug)]
struct UpsertDaemonOpts {
    id: String,
    pid: Option<u32>,
    status: DaemonStatus,
    shell_pid: Option<u32>,
    dir: Option<PathBuf>,
    autostop: bool,
    cron_schedule: Option<String>,
    cron_retrigger: Option<crate::pitchfork_toml::CronRetrigger>,
    last_exit_success: Option<bool>,
    retry: Option<u32>,
    retry_count: Option<u32>,
    ready_delay: Option<u64>,
    ready_output: Option<String>,
    ready_http: Option<String>,
    ready_port: Option<u16>,
    depends: Option<Vec<String>>,
}

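// An empty template: callers set `id` and whichever fields they need and take the rest
// via `..Default::default()`.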
impl Default for UpsertDaemonOpts {
    fn default() -> Self {
        Self {
            id: "".to_string(),
            pid: None,
            status: DaemonStatus::Stopped,
            shell_pid: None,
            dir: None,
            autostop: false,
            cron_schedule: None,
            cron_retrigger: None,
            last_exit_success: None,
            retry: None,
            retry_count: None,
            ready_delay: None,
            ready_output: None,
            ready_http: None,
            ready_port: None,
            depends: None,
        }
    }
}