pitchfork_cli/supervisor.rs

use crate::daemon::{Daemon, RunOptions, validate_daemon_id};
use crate::daemon_status::DaemonStatus;
use crate::ipc::server::{IpcServer, IpcServerHandle};
use crate::ipc::{IpcRequest, IpcResponse};
use crate::procs::PROCS;
use crate::state_file::StateFile;
use crate::{Result, env};
use duct::cmd;
use itertools::Itertools;
use log::LevelFilter::Info;
use miette::IntoDiagnostic;
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fs;
use std::iter::once;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::{select, signal, time};

/// Cache for compiled regex patterns to avoid recompilation on daemon restarts
static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));

/// Get or compile a regex pattern, caching the result for future use
fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
    if let Some(re) = cache.get(pattern) {
        return Some(re.clone());
    }
    match Regex::new(pattern) {
        Ok(re) => {
            cache.insert(pattern.to_string(), re.clone());
            Some(re)
        }
        Err(e) => {
            error!("invalid regex pattern '{}': {}", pattern, e);
            None
        }
    }
}
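
// Illustrative use of the cache (hypothetical pattern, not from this codebase):
// the first call compiles and stores the regex; later calls with the same
// pattern return the cached clone instead of recompiling.
//
//     if let Some(re) = get_or_compile_regex(r"listening on .*:\d+") {
//         assert!(re.is_match("listening on 127.0.0.1:8080"));
//     }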

pub struct Supervisor {
    state_file: Mutex<StateFile>,
    pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    last_refreshed_at: Mutex<time::Instant>,
    /// Map of daemon ID to scheduled autostop time
    pending_autostops: Mutex<HashMap<String, time::Instant>>,
    /// Handle for graceful IPC server shutdown
    ipc_shutdown: Mutex<Option<IpcServerHandle>>,
}

fn interval_duration() -> Duration {
    Duration::from_secs(*env::PITCHFORK_INTERVAL_SECS)
}

pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));

pub fn start_if_not_running() -> Result<()> {
    let sf = StateFile::get();
    if let Some(d) = sf.daemons.get("pitchfork")
        && let Some(pid) = d.pid
        && PROCS.is_running(pid)
    {
        return Ok(());
    }
    start_in_background()
}
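
// Flow note: if the state file already records a live "pitchfork" PID,
// another supervisor is active and start_if_not_running() is a no-op;
// otherwise a detached `pitchfork supervisor run` child is spawned with
// stdout/stderr discarded, and this call returns without waiting on it.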

pub fn start_in_background() -> Result<()> {
    debug!("starting supervisor in background");
    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
        .stdout_null()
        .stderr_null()
        .start()
        .into_diagnostic()?;
    Ok(())
}

impl Supervisor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
            pending_autostops: Mutex::new(HashMap::new()),
            ipc_shutdown: Mutex::new(None),
        })
    }

    pub async fn start(&self, is_boot: bool, web_port: Option<u16>) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        self.upsert_daemon(UpsertDaemonOpts {
            id: "pitchfork".to_string(),
            pid: Some(pid),
            status: DaemonStatus::Running,
            ..Default::default()
        })
        .await?;

        // If this is a boot start, automatically start boot_start daemons
        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        // self.file_watch().await?;

        // Start web server if port is configured
        if let Some(port) = web_port {
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port).await {
                    error!("Web server error: {}", e);
                }
            });
        }

        let (ipc, ipc_handle) = IpcServer::new()?;
        *self.ipc_shutdown.lock().await = Some(ipc_handle);
        self.conn_watch(ipc).await
    }

    async fn refresh(&self) -> Result<()> {
        trace!("refreshing");

        // Collect PIDs we need to check (shell PIDs only)
        // This is more efficient than refreshing all processes on the system
        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();

        if pids_to_check.is_empty() {
            // No PIDs to check, skip the expensive refresh
            trace!("no shell PIDs to check, skipping process refresh");
        } else {
            PROCS.refresh_pids(&pids_to_check);
        }

        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in dirs_with_pids {
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect_vec();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;
        self.process_pending_autostops().await?;

        Ok(())
    }

    async fn check_retry(&self) -> Result<()> {
        // Collect only IDs of daemons that need retrying (avoids cloning entire Daemon structs)
        let ids_to_retry: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    // Daemon is errored, not currently running, and has retries remaining
                    d.status.is_errored()
                        && d.pid.is_none()
                        && d.retry > 0
                        && d.retry_count < d.retry
                })
                .map(|(id, _d)| id.clone())
                .collect()
        };

        for id in ids_to_retry {
            // Look up daemon when needed and re-verify retry criteria
            // (state may have changed since we collected IDs)
            let daemon = {
                let state_file = self.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d)
                        if d.status.is_errored()
                            && d.pid.is_none()
                            && d.retry > 0
                            && d.retry_count < d.retry =>
                    {
                        d.clone()
                    }
                    _ => continue, // Daemon was removed or no longer needs retry
                }
            };
            info!(
                "retrying daemon {} ({}/{} attempts)",
                id,
                daemon.retry_count + 1,
                daemon.retry
            );

            // Get command from pitchfork.toml
            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                let cmd = match shell_words::split(&run_cmd) {
                    Ok(cmd) => cmd,
                    Err(e) => {
                        error!("failed to parse command for daemon {}: {}", id, e);
                        // Mark as exhausted to prevent infinite retry loop, preserving error status
                        self.upsert_daemon(UpsertDaemonOpts {
                            id,
                            status: daemon.status.clone(),
                            retry_count: Some(daemon.retry),
                            ..Default::default()
                        })
                        .await?;
                        continue;
                    }
                };
                let retry_opts = RunOptions {
                    id: id.clone(),
                    cmd,
                    force: false,
                    shell_pid: daemon.shell_pid,
                    dir: daemon.dir.unwrap_or_else(|| env::CWD.clone()),
                    autostop: daemon.autostop,
                    cron_schedule: daemon.cron_schedule,
                    cron_retrigger: daemon.cron_retrigger,
                    retry: daemon.retry,
                    retry_count: daemon.retry_count + 1,
                    ready_delay: daemon.ready_delay,
                    ready_output: daemon.ready_output.clone(),
                    ready_http: daemon.ready_http.clone(),
                    ready_port: daemon.ready_port,
                    wait_ready: false,
                    depends: daemon.depends.clone(),
                };
                if let Err(e) = self.run(retry_opts).await {
                    error!("failed to retry daemon {}: {}", id, e);
                }
            } else {
                warn!("no run command found for daemon {}, cannot retry", id);
                // Mark as exhausted
                self.upsert_daemon(UpsertDaemonOpts {
                    id,
                    retry_count: Some(daemon.retry),
                    ..Default::default()
                })
                .await?;
            }
        }

        Ok(())
    }

    async fn leave_dir(&self, dir: &Path) -> Result<()> {
        debug!("left dir {}", dir.display());
        let shell_dirs = self.get_dirs_with_shell_pids().await;
        let shell_dirs = shell_dirs.keys().collect_vec();
        let delay_secs = *env::PITCHFORK_AUTOSTOP_DELAY;

        for daemon in self.active_daemons().await {
            if !daemon.autostop {
                continue;
            }
            // If this daemon's dir is inside the dir being left, and no
            // remaining shell dir is inside the daemon's dir, schedule the
            // daemon for autostop.
            if let Some(daemon_dir) = daemon.dir.as_ref()
                && daemon_dir.starts_with(dir)
                && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
            {
                if delay_secs == 0 {
                    // No delay configured, stop immediately
                    info!("autostopping {daemon}");
                    self.stop(&daemon.id).await?;
                    self.add_notification(Info, format!("autostopped {daemon}"))
                        .await;
                } else {
                    // Schedule autostop with delay
                    let stop_at = time::Instant::now() + Duration::from_secs(delay_secs);
                    let mut pending = self.pending_autostops.lock().await;
                    if !pending.contains_key(&daemon.id) {
                        info!("scheduling autostop for {} in {}s", daemon.id, delay_secs);
                        pending.insert(daemon.id.clone(), stop_at);
                    }
                }
            }
        }
        Ok(())
    }
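
    // Example walk-through (paths and delay illustrative): a shell leaves
    // /proj, daemon "api" has autostop=true and dir /proj/api, and no other
    // tracked shell remains under /proj/api. With PITCHFORK_AUTOSTOP_DELAY=30
    // the stop is scheduled 30s out; with a delay of 0 it happens immediately.
    // Re-entering the directory before the deadline cancels the stop (see
    // cancel_pending_autostops_for_dir below).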

    /// Cancel any pending autostop for daemons in the given directory
    /// Also cancels autostops for daemons in parent directories (e.g., entering /project/subdir
    /// cancels pending autostop for daemon in /project)
    async fn cancel_pending_autostops_for_dir(&self, dir: &Path) {
        let mut pending = self.pending_autostops.lock().await;
        let daemons_to_cancel: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    d.dir.as_ref().is_some_and(|daemon_dir| {
                        // Cancel if entering a directory inside or equal to daemon's directory
                        // OR if daemon is in a subdirectory of the entered directory
                        dir.starts_with(daemon_dir) || daemon_dir.starts_with(dir)
                    })
                })
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in daemons_to_cancel {
            if pending.remove(&daemon_id).is_some() {
                info!("cancelled pending autostop for {}", daemon_id);
            }
        }
    }

    /// Process any pending autostops that have reached their scheduled time
    async fn process_pending_autostops(&self) -> Result<()> {
        let now = time::Instant::now();
        let to_stop: Vec<String> = {
            let pending = self.pending_autostops.lock().await;
            pending
                .iter()
                .filter(|(_, stop_at)| now >= **stop_at)
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in to_stop {
            // Remove from pending first
            {
                let mut pending = self.pending_autostops.lock().await;
                pending.remove(&daemon_id);
            }

            // Check if daemon is still running and should be stopped
            if let Some(daemon) = self.get_daemon(&daemon_id).await
                && daemon.autostop
                && daemon.status.is_running()
            {
                // Verify no shell is in the daemon's directory
                let shell_dirs = self.get_dirs_with_shell_pids().await;
                let shell_dirs = shell_dirs.keys().collect_vec();
                if let Some(daemon_dir) = daemon.dir.as_ref()
                    && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
                {
                    info!("autostopping {} (after delay)", daemon_id);
                    self.stop(&daemon_id).await?;
                    self.add_notification(Info, format!("autostopped {daemon_id}"))
                        .await;
                }
            }
        }
        Ok(())
    }

    async fn start_boot_daemons(&self) -> Result<()> {
        use crate::pitchfork_toml::PitchforkToml;

        info!("Scanning for boot_start daemons");
        let pt = PitchforkToml::all_merged();

        let boot_daemons: Vec<_> = pt
            .daemons
            .iter()
            .filter(|(_id, d)| d.boot_start.unwrap_or(false))
            .collect();

        if boot_daemons.is_empty() {
            info!("No daemons configured with boot_start = true");
            return Ok(());
        }

        info!("Found {} daemon(s) to start at boot", boot_daemons.len());

        for (id, daemon) in boot_daemons {
            info!("Starting boot daemon: {}", id);

            let dir = daemon
                .path
                .as_ref()
                .and_then(|p| p.parent())
                .map(|p| p.to_path_buf())
                .unwrap_or_else(|| env::CWD.clone());

            let cmd = match shell_words::split(&daemon.run) {
                Ok(cmd) => cmd,
                Err(e) => {
                    error!("failed to parse command for boot daemon {}: {}", id, e);
                    continue;
                }
            };
            let run_opts = RunOptions {
                id: id.clone(),
                cmd,
                force: false,
                shell_pid: None,
                dir,
                autostop: false, // Boot daemons should not autostop
                cron_schedule: daemon.cron.as_ref().map(|c| c.schedule.clone()),
                cron_retrigger: daemon.cron.as_ref().map(|c| c.retrigger),
                retry: daemon.retry,
                retry_count: 0,
                ready_delay: daemon.ready_delay,
                ready_output: daemon.ready_output.clone(),
                ready_http: daemon.ready_http.clone(),
                ready_port: daemon.ready_port,
                wait_ready: false, // Don't block on boot daemons
                depends: daemon.depends.clone(),
            };

            match self.run(run_opts).await {
                Ok(IpcResponse::DaemonStart { .. }) | Ok(IpcResponse::DaemonReady { .. }) => {
                    info!("Successfully started boot daemon: {}", id);
                }
                Ok(IpcResponse::DaemonAlreadyRunning) => {
                    info!("Boot daemon already running: {}", id);
                }
                Ok(other) => {
                    warn!(
                        "Unexpected response when starting boot daemon {}: {:?}",
                        id, other
                    );
                }
                Err(e) => {
                    error!("Failed to start boot daemon {}: {}", id, e);
                }
            }
        }

        Ok(())
    }

    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd.clone();

        // Clear any pending autostop for this daemon since it's being started
        {
            let mut pending = self.pending_autostops.lock().await;
            if pending.remove(id).is_some() {
                info!("cleared pending autostop for {} (daemon starting)", id);
            }
        }

        let daemon = self.get_daemon(id).await;
        if let Some(daemon) = daemon {
            // Stopping state is treated as "not running" - the monitoring task will clean it up
            // Only check for Running state with a valid PID
            if !daemon.status.is_stopping()
                && !daemon.status.is_stopped()
                && let Some(pid) = daemon.pid
            {
                if opts.force {
                    self.stop(id).await?;
                    info!("run: stop completed for daemon {id}");
                } else {
                    warn!("daemon {id} already running with pid {pid}");
                    return Ok(IpcResponse::DaemonAlreadyRunning);
                }
            }
        }

        // If wait_ready is true and retry is configured, implement retry loop
        if opts.wait_ready && opts.retry > 0 {
            let max_attempts = opts.retry + 1; // initial attempt + retries
            for attempt in 0..max_attempts {
                let mut retry_opts = opts.clone();
                retry_opts.retry_count = attempt;
                retry_opts.cmd = cmd.clone();

                let result = self.run_once(retry_opts).await?;

                match result {
                    IpcResponse::DaemonReady { daemon } => {
                        return Ok(IpcResponse::DaemonReady { daemon });
                    }
                    IpcResponse::DaemonFailedWithCode { exit_code } => {
                        if attempt < opts.retry {
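                            // Exponential backoff: 2^attempt seconds, so
                            // successive retries wait 1s, 2s, 4s, ...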
                            let backoff_secs = 2u64.pow(attempt);
                            info!(
                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
                                attempt + 1,
                                max_attempts,
                                backoff_secs
                            );
                            time::sleep(Duration::from_secs(backoff_secs)).await;
                            continue;
                        } else {
                            info!("daemon {id} failed after {} attempts", max_attempts);
                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
                        }
                    }
                    other => return Ok(other),
                }
            }
        }

        // No retry or wait_ready is false
        self.run_once(opts).await
    }

    async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd;

        // Create channel for readiness notification if wait_ready is true
        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };
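
        // ready_rx resolves exactly once: Ok(Ok(())) when a readiness probe
        // passes, Ok(Err(exit_code)) if the process exits first, or Err(_) if
        // the monitoring task drops the sender without reporting.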

        let cmd = once("exec".to_string())
            .chain(cmd.into_iter())
            .collect_vec();
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
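        // `exec` makes sh replace itself with the daemon, so the PID captured
        // from the spawned child below belongs to the daemon itself rather
        // than a wrapper shell.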
        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
        if let Some(parent) = log_path.parent() {
            xx::file::mkdirp(parent)?;
        }
        info!("run: spawning daemon {id} with args: {args:?}");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(&args)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .current_dir(&opts.dir);

        // Ensure daemon can find user tools by using the original PATH
        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = match child.id() {
            Some(p) => p,
            None => {
                warn!("Daemon {id} exited before PID could be captured");
                return Ok(IpcResponse::DaemonFailed {
                    error: "Process exited immediately".to_string(),
                });
            }
        };
        info!("started daemon {id} with pid {pid}");
        let daemon = self
            .upsert_daemon(UpsertDaemonOpts {
                id: id.to_string(),
                pid: Some(pid),
                status: DaemonStatus::Running,
                shell_pid: opts.shell_pid,
                dir: Some(opts.dir.clone()),
                autostop: opts.autostop,
                cron_schedule: opts.cron_schedule.clone(),
                cron_retrigger: opts.cron_retrigger,
                last_exit_success: None,
                retry: Some(opts.retry),
                retry_count: Some(opts.retry_count),
                ready_delay: opts.ready_delay,
                ready_output: opts.ready_output.clone(),
                ready_http: opts.ready_http.clone(),
                ready_port: opts.ready_port,
                depends: Some(opts.depends.clone()),
            })
            .await?;

        let id_clone = id.to_string();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();
        let ready_http = opts.ready_http.clone();
        let ready_port = opts.ready_port;

        tokio::spawn(async move {
            let id = id_clone;
            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
                (Some(out), Some(err)) => (out, err),
                _ => {
                    error!("Failed to capture stdout/stderr for daemon {id}");
                    return;
                }
            };
            let mut stdout = tokio::io::BufReader::new(stdout).lines();
            let mut stderr = tokio::io::BufReader::new(stderr).lines();
            let log_file = match tokio::fs::File::options()
                .append(true)
                .create(true)
                .open(&log_path)
                .await
            {
                Ok(f) => f,
                Err(e) => {
                    error!("Failed to open log file for daemon {id}: {e}");
                    return;
                }
            };
            let mut log_appender = BufWriter::new(log_file);

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            let format_line = |line: String| {
                if line.starts_with(&format!("{id} ")) {
                    // mise tasks often already have the id printed
                    format!("{} {line}\n", now())
                } else {
                    format!("{} {id} {line}\n", now())
                }
            };
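            // e.g. a stdout line "starting server" from daemon "web" becomes
            // "2024-01-15 09:30:00 web starting server" (values illustrative).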

            // Setup readiness checking
            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            // Setup HTTP readiness check interval (poll every 500ms)
            let mut http_check_interval = ready_http
                .as_ref()
                .map(|_| tokio::time::interval(Duration::from_millis(500)));
            let http_client = ready_http.as_ref().map(|_| {
                reqwest::Client::builder()
                    .timeout(Duration::from_secs(5))
                    .build()
                    .unwrap_or_default()
            });

            // Setup TCP port readiness check interval (poll every 500ms)
            let mut port_check_interval =
                ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));

            // Setup periodic log flush interval (every 500ms - balances I/O reduction with responsiveness)
            let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));

            // Use a channel to communicate process exit status
            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            // Spawn a task to wait for process exit
            let child_pid = child.id().unwrap_or(0);
            tokio::spawn(async move {
                let result = child.wait().await;
                debug!(
                    "daemon pid {child_pid} wait() completed with result: {:?}",
                    result
                );
                let _ = exit_tx.send(result).await;
            });

            #[allow(unused_assignments)]
            // Initial None is a safety net; loop only exits via exit_rx.recv() which sets it
            let mut exit_status = None;

            loop {
                select! {
                    Ok(Some(line)) = stdout.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stdout: {id} {formatted}");

                        // Check if output matches ready pattern
                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                                && pattern.is_match(&line) {
                                    info!("daemon {id} ready: output matched pattern");
                                    ready_notified = true;
                                    // Flush logs before notifying so clients see logs immediately
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                }
                    }
                    Ok(Some(line)) = stderr.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stderr: {id} {formatted}");

                        // Check if output matches ready pattern (also check stderr)
                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                                && pattern.is_match(&line) {
                                    info!("daemon {id} ready: output matched pattern");
                                    ready_notified = true;
                                    // Flush logs before notifying so clients see logs immediately
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                }
                    },
                    Some(result) = exit_rx.recv() => {
                        // Process exited - save exit status and notify if not ready yet
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
                        // Flush logs before notifying so clients see logs immediately
                        let _ = log_appender.flush().await;
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                // Check if process exited successfully
                                let is_success = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .map(|s| s.success())
                                    .unwrap_or(false);

                                if is_success {
                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
                                    let _ = tx.send(Ok(()));
                                } else {
                                    let exit_code = exit_status.as_ref()
                                        .and_then(|r| r.as_ref().ok())
                                        .and_then(|s| s.code());
                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
                                    let _ = tx.send(Err(exit_code));
                                }
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending notification");
                        }
                        break;
                    }
                    _ = async {
                        if let Some(ref mut interval) = http_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_http.is_some() => {
                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                            match client.get(url).send().await {
                                Ok(response) if response.status().is_success() => {
                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                    ready_notified = true;
                                    // Flush logs before notifying so clients see logs immediately
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    // Stop checking once ready
                                    http_check_interval = None;
                                }
                                Ok(response) => {
                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                                }
                                Err(e) => {
                                    trace!("daemon {id} HTTP check failed: {e}");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut interval) = port_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_port.is_some() => {
                        if let Some(port) = ready_port {
                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                                Ok(_) => {
                                    info!("daemon {id} ready: TCP port {port} is listening");
                                    ready_notified = true;
                                    // Flush logs before notifying so clients see logs immediately
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    // Stop checking once ready
                                    port_check_interval = None;
                                }
                                Err(_) => {
                                    trace!("daemon {id} port check: port {port} not listening yet");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            // Flush logs before notifying so clients see logs immediately
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        // Disable timer after it fires
                        delay_timer = None;
                    }
                    _ = log_flush_interval.tick() => {
                        // Periodic flush to ensure logs are written to disk
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                    }
                    // Note: No `else => break` because log_flush_interval.tick() is always available,
                    // making the else branch unreachable. The loop exits via the exit_rx.recv() branch.
                }
            }

            // Final flush to ensure all buffered logs are written
            if let Err(e) = log_appender.flush().await {
                error!("Failed to final flush log for daemon {id}: {e}");
            }

            // Get the final exit status
            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                // Streams closed but process hasn't exited yet, wait for it
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            // Check if this monitoring task is for the current daemon process
            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
            {
                // Another process has taken over, don't update status
                return;
            }
            let is_stopping = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
                // was stopped by this supervisor so don't update status
                return;
            }
            if let Ok(status) = exit_status {
                info!("daemon {id} exited with status {status}");
                if status.success() || is_stopping {
                    // If stopping, always mark as Stopped with success
                    // This allows monitoring task to clear PID after stop() was called
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None, // Clear PID now that process has exited
                            status: DaemonStatus::Stopped,
                            last_exit_success: Some(status.success()),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                } else {
                    // Handle error exit - mark for retry
                    // retry_count increment will be handled by interval_watch
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Errored(status.code()),
                            last_exit_success: Some(false),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                }
            } else if let Err(e) = SUPERVISOR
                .upsert_daemon(UpsertDaemonOpts {
                    id: id.clone(),
                    pid: None,
                    status: DaemonStatus::Errored(None),
                    last_exit_success: Some(false),
                    ..Default::default()
                })
                .await
            {
                error!("Failed to update daemon state for {id}: {e}");
            }
        });

        // If wait_ready is true, wait for readiness notification
        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }

    pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
        if id == "pitchfork" {
            return Ok(IpcResponse::Error(
                "Cannot stop supervisor via stop command".into(),
            ));
        }
        info!("stopping daemon: {id}");
        if let Some(daemon) = self.get_daemon(id).await {
            trace!("daemon to stop: {daemon}");
            if let Some(pid) = daemon.pid {
                trace!("killing pid: {pid}");
                PROCS.refresh_processes();
                if PROCS.is_running(pid) {
                    // First set status to Stopping (keeps PID for monitoring task)
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        status: DaemonStatus::Stopping,
                        ..Default::default()
                    })
                    .await?;

                    // Then kill the process
                    if let Err(e) = PROCS.kill_async(pid).await {
                        warn!("failed to kill pid {pid}: {e}");
                    }
                    PROCS.refresh_processes();
                    for child_pid in PROCS.all_children(pid) {
                        debug!("killing child pid: {child_pid}");
                        if let Err(e) = PROCS.kill_async(child_pid).await {
                            warn!("failed to kill child pid {child_pid}: {e}");
                        }
                    }
                    // Monitoring task will clear PID and set to Stopped when it detects exit
                } else {
                    debug!("pid {pid} not running");
                    // Process already dead, directly mark as stopped
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        ..Default::default()
                    })
                    .await?;
                }
                return Ok(IpcResponse::Ok);
            } else {
                debug!("daemon {id} not running");
            }
        } else {
            debug!("daemon {id} not found");
        }
        Ok(IpcResponse::DaemonAlreadyStopped)
    }

    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
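        // The first signal flips the flag and triggers graceful shutdown; a
        // second signal while shutdown is still in progress forces exit(1),
        // since swap() returns the previous value.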
        for signal in signals {
            let stream = match signal::unix::signal(signal) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Failed to register signal handler for {:?}: {}", signal, e);
                    continue;
                }
            };
            tokio::spawn(async move {
                let mut stream = stream;
                loop {
                    stream.recv().await;
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

    // async fn file_watch(&self) -> Result<()> {
    //     let state_file = self.state_file.lock().await.path.clone();
    //     task::spawn(async move {
    //         let mut wf = WatchFiles::new(Duration::from_secs(2)).unwrap();
    //
    //         wf.watch(&state_file, RecursiveMode::NonRecursive).unwrap();
    //
    //         while let Some(paths) = wf.rx.recv().await {
    //             if let Err(err) = SUPERVISOR.handle_file_change(paths).await {
    //                 error!("failed to handle file change: {err}");
    //             }
    //         }
    //     });
    //
    //     Ok(())
    // }

    // async fn handle_file_change(&self, paths: Vec<PathBuf>) -> Result<()> {
    //     debug!("file change: {:?}", paths);
    //     // let path = self.state_file.lock().await.path.clone();
    //     // if paths.contains(&path) {
    //     //     *self.state_file.lock().await = StateFile::read(&path)?;
    //     // }
    //     self.refresh().await
    // }

    fn interval_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(interval_duration());
            loop {
                interval.tick().await;
                if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > interval_duration()
                    && let Err(err) = SUPERVISOR.refresh().await
                {
                    error!("failed to refresh: {err}");
                }
            }
        });
        Ok(())
    }

    fn cron_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            // Check every minute for cron schedules
            // FIXME: needs better logic; what if the schedule gap is very short (30s, 1 min)?
            let mut interval = time::interval(Duration::from_secs(60));
            loop {
                interval.tick().await;
                if let Err(err) = SUPERVISOR.check_cron_schedules().await {
                    error!("failed to check cron schedules: {err}");
                }
            }
        });
        Ok(())
    }

    async fn check_cron_schedules(&self) -> Result<()> {
        use cron::Schedule;
        use std::str::FromStr;

        let now = chrono::Local::now();

        // Collect only IDs of daemons with cron schedules (avoids cloning entire HashMap)
        let cron_daemon_ids: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| d.cron_schedule.is_some() && d.cron_retrigger.is_some())
                .map(|(id, _d)| id.clone())
                .collect()
        };

        for id in cron_daemon_ids {
            // Look up daemon when needed
            let daemon = {
                let state_file = self.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d) => d.clone(),
                    None => continue,
                }
            };

            if let Some(schedule_str) = &daemon.cron_schedule
                && let Some(retrigger) = daemon.cron_retrigger
            {
                // Parse the cron schedule
                let schedule = match Schedule::from_str(schedule_str) {
                    Ok(s) => s,
                    Err(e) => {
                        warn!("invalid cron schedule for daemon {id}: {e}");
                        continue;
                    }
                };

                // Check if we should trigger now
                let should_trigger = schedule.upcoming(chrono::Local).take(1).any(|next| {
                    // If the next execution is within the next minute, trigger it
                    let diff = next.signed_duration_since(now);
                    diff.num_seconds() < 60 && diff.num_seconds() >= 0
                });
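                // Example (schedule illustrative): for "0 */5 * * * *" with
                // the 60s polling tick above, the run due at 12:05:00 fires
                // on whichever tick lands in the 12:04:00..12:05:00 window,
                // once the next occurrence is under 60 seconds away.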
1146
1147                if should_trigger {
1148                    let should_run = match retrigger {
1149                        crate::pitchfork_toml::CronRetrigger::Finish => {
1150                            // Run if not currently running
1151                            daemon.pid.is_none()
1152                        }
1153                        crate::pitchfork_toml::CronRetrigger::Always => {
1154                            // Always run (force restart handled in run method)
1155                            true
1156                        }
1157                        crate::pitchfork_toml::CronRetrigger::Success => {
1158                            // Run only if previous command succeeded
1159                            daemon.pid.is_none() && daemon.last_exit_success.unwrap_or(false)
1160                        }
1161                        crate::pitchfork_toml::CronRetrigger::Fail => {
1162                            // Run only if previous command failed
1163                            daemon.pid.is_none() && !daemon.last_exit_success.unwrap_or(true)
1164                        }
1165                    };
1166
1167                    if should_run {
1168                        info!("cron: triggering daemon {id} (retrigger: {retrigger:?})");
1169                        // Get the run command from pitchfork.toml
1170                        if let Some(run_cmd) = self.get_daemon_run_command(&id) {
1171                            let cmd = match shell_words::split(&run_cmd) {
1172                                Ok(cmd) => cmd,
1173                                Err(e) => {
1174                                    error!("failed to parse command for cron daemon {}: {}", id, e);
1175                                    continue;
1176                                }
1177                            };
1178                            let dir = daemon.dir.clone().unwrap_or_else(|| env::CWD.clone());
1179                            // Use force: true for Always retrigger to ensure restart
1180                            let force =
1181                                matches!(retrigger, crate::pitchfork_toml::CronRetrigger::Always);
1182                            let opts = RunOptions {
1183                                id: id.clone(),
1184                                cmd,
1185                                force,
1186                                shell_pid: None,
1187                                dir,
1188                                autostop: daemon.autostop,
1189                                cron_schedule: Some(schedule_str.clone()),
1190                                cron_retrigger: Some(retrigger),
1191                                retry: daemon.retry,
1192                                retry_count: daemon.retry_count,
1193                                ready_delay: daemon.ready_delay,
1194                                ready_output: daemon.ready_output.clone(),
1195                                ready_http: daemon.ready_http.clone(),
1196                                ready_port: daemon.ready_port,
1197                                wait_ready: false,
1198                                depends: daemon.depends.clone(),
1199                            };
1200                            if let Err(e) = self.run(opts).await {
1201                                error!("failed to run cron daemon {id}: {e}");
1202                            }
1203                        } else {
1204                            warn!("no run command found for cron daemon {id}");
1205                        }
1206                    }
1207                }
1208            }
1209        }
1210
1211        Ok(())
1212    }
1213
1214    fn get_daemon_run_command(&self, id: &str) -> Option<String> {
1215        use crate::pitchfork_toml::PitchforkToml;
1216        let pt = PitchforkToml::all_merged();
1217        pt.daemons.get(id).map(|d| d.run.clone())
1218    }
1219
1220    async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
1221        loop {
1222            let (msg, send) = match ipc.read().await {
1223                Ok(msg) => msg,
1224                Err(e) => {
1225                    error!("failed to accept connection: {:?}", e);
1226                    continue;
1227                }
1228            };
1229            debug!("received message: {:?}", msg);
1230            tokio::spawn(async move {
1231                let rsp = SUPERVISOR
1232                    .handle_ipc(msg)
1233                    .await
1234                    .unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
1235                if let Err(err) = send.send(rsp).await {
1236                    debug!("failed to send message: {:?}", err);
1237                }
1238            });
1239        }
1240    }
1241
    async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
        let rsp = match req {
            IpcRequest::Connect => {
                debug!("received connect message");
                IpcResponse::Ok
            }
            IpcRequest::Stop { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                self.stop(&id).await?
            }
            IpcRequest::Run(opts) => {
                if let Err(e) = validate_daemon_id(&opts.id) {
                    return Ok(IpcResponse::Error(e));
                }
                self.run(opts).await?
            }
            IpcRequest::Enable { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                if self.enable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::Disable { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                if self.disable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::GetActiveDaemons => {
                let daemons = self.active_daemons().await;
                IpcResponse::ActiveDaemons(daemons)
            }
            IpcRequest::GetNotifications => {
                let notifications = self.get_notifications().await;
                IpcResponse::Notifications(notifications)
            }
            IpcRequest::UpdateShellDir { shell_pid, dir } => {
                let prev = self.get_shell_dir(shell_pid).await;
                self.set_shell_dir(shell_pid, dir.clone()).await?;
                // Cancel any pending autostops for daemons in the new directory
                self.cancel_pending_autostops_for_dir(&dir).await;
                if let Some(prev) = prev {
                    self.leave_dir(&prev).await?;
                }
                self.refresh().await?;
                IpcResponse::Ok
            }
            IpcRequest::Clean => {
                self.clean().await?;
                IpcResponse::Ok
            }
            IpcRequest::GetDisabledDaemons => {
                let disabled = self.state_file.lock().await.disabled.clone();
                IpcResponse::DisabledDaemons(disabled.into_iter().collect())
            }
        };
        Ok(rsp)
    }

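    /// Graceful shutdown: stop every managed daemon, drop the supervisor's own
    /// state entry, signal the IPC server to exit, and remove the socket directory.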
    async fn close(&self) {
        for daemon in self.active_daemons().await {
            if daemon.id == "pitchfork" {
                continue;
            }
            if let Err(err) = self.stop(&daemon.id).await {
                error!("failed to stop daemon {daemon}: {err}");
            }
        }
        let _ = self.remove_daemon("pitchfork").await;

        // Signal IPC server to shut down gracefully
        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
            handle.shutdown();
        }

        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
    }

    async fn add_notification(&self, level: log::LevelFilter, message: String) {
        self.pending_notifications
            .lock()
            .await
            .push((level, message));
    }

    async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
        self.pending_notifications.lock().await.drain(..).collect()
    }

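    /// All daemons with a recorded pid, excluding the supervisor's own entry.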
    async fn active_daemons(&self) -> Vec<Daemon> {
        self.state_file
            .lock()
            .await
            .daemons
            .values()
            .filter(|d| d.pid.is_some() && d.id != "pitchfork")
            .cloned()
            .collect()
    }

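    /// Remove a daemon's entry from the state file. The lock is held across both
    /// the removal and the write so the update is atomic with respect to other tasks.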
    async fn remove_daemon(&self, id: &str) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.remove(id);
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(())
    }

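    /// Insert or update a daemon entry. Fields set in `opts` take precedence;
    /// anything left as `None` falls back to the existing entry's value, so
    /// callers only need to pass the fields they intend to change.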
    async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
        info!(
            "upserting daemon: {} pid: {} status: {}",
            opts.id,
            opts.pid.unwrap_or(0),
            opts.status
        );
        let mut state_file = self.state_file.lock().await;
        let existing = state_file.daemons.get(&opts.id);
        let daemon = Daemon {
            id: opts.id.clone(),
            title: opts.pid.and_then(|pid| PROCS.title(pid)),
            pid: opts.pid,
            status: opts.status,
            shell_pid: opts.shell_pid,
            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
            cron_schedule: opts
                .cron_schedule
                .or(existing.and_then(|d| d.cron_schedule.clone())),
            cron_retrigger: opts
                .cron_retrigger
                .or(existing.and_then(|d| d.cron_retrigger)),
            last_exit_success: opts
                .last_exit_success
                .or(existing.and_then(|d| d.last_exit_success)),
            retry: opts.retry.or(existing.map(|d| d.retry)).unwrap_or(0),
            retry_count: opts
                .retry_count
                .or(existing.map(|d| d.retry_count))
                .unwrap_or(0),
            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
            ready_output: opts
                .ready_output
                .or(existing.and_then(|d| d.ready_output.clone())),
            ready_http: opts
                .ready_http
                .or(existing.and_then(|d| d.ready_http.clone())),
            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
            depends: opts
                .depends
                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
        };
        state_file.daemons.insert(opts.id, daemon.clone());
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(daemon)
    }

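    /// Re-enable a daemon by removing it from the disabled set. Returns `true`
    /// if the daemon was previously disabled.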
    pub async fn enable(&self, id: String) -> Result<bool> {
        info!("enabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.remove(&id);
        state_file.write()?;
        Ok(result)
    }

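    /// Disable a daemon by adding it to the disabled set. Returns `true` if it
    /// was not already disabled.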
    pub async fn disable(&self, id: String) -> Result<bool> {
        info!("disabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.insert(id);
        state_file.write()?;
        Ok(result)
    }

    async fn get_daemon(&self, id: &str) -> Option<Daemon> {
        self.state_file.lock().await.daemons.get(id).cloned()
    }

    async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
        state_file.write()?;
        Ok(())
    }

    async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
        self.state_file
            .lock()
            .await
            .shell_dirs
            .get(&shell_pid.to_string())
            .cloned()
    }

    async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        if state_file
            .shell_dirs
            .remove(&shell_pid.to_string())
            .is_some()
        {
            state_file.write()?;
        }
        Ok(())
    }

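    /// Invert the `shell_pid -> dir` map into `dir -> [shell_pids]`, skipping
    /// entries whose stored pid fails to parse.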
    async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
        self.state_file.lock().await.shell_dirs.iter().fold(
            HashMap::new(),
            |mut acc, (pid, dir)| {
                if let Ok(pid) = pid.parse() {
                    acc.entry(dir.clone()).or_default().push(pid);
                }
                acc
            },
        )
    }

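    /// Drop state entries for daemons that no longer have a pid, i.e. anything
    /// that has exited or was never started.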
    async fn clean(&self) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.retain(|_id, d| d.pid.is_some());
        state_file.write()?;
        Ok(())
    }
}

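/// Options for [`Supervisor::upsert_daemon`]. `None` fields mean "keep the
/// existing value"; callers can use struct-update syntax over `Default` to set
/// only the fields that should change.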
#[derive(Debug)]
struct UpsertDaemonOpts {
    id: String,
    pid: Option<u32>,
    status: DaemonStatus,
    shell_pid: Option<u32>,
    dir: Option<PathBuf>,
    autostop: bool,
    cron_schedule: Option<String>,
    cron_retrigger: Option<crate::pitchfork_toml::CronRetrigger>,
    last_exit_success: Option<bool>,
    retry: Option<u32>,
    retry_count: Option<u32>,
    ready_delay: Option<u64>,
    ready_output: Option<String>,
    ready_http: Option<String>,
    ready_port: Option<u16>,
    depends: Option<Vec<String>>,
}

impl Default for UpsertDaemonOpts {
    fn default() -> Self {
        Self {
            id: "".to_string(),
            pid: None,
            status: DaemonStatus::Stopped,
            shell_pid: None,
            dir: None,
            autostop: false,
            cron_schedule: None,
            cron_retrigger: None,
            last_exit_success: None,
            retry: None,
            retry_count: None,
            ready_delay: None,
            ready_output: None,
            ready_http: None,
            ready_port: None,
            depends: None,
        }
    }
}
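
// A minimal sketch of how `UpsertDaemonOpts` is meant to be built: set only the
// fields that should change and let `..Default::default()` supply the rest,
// which `upsert_daemon` treats as "preserve the existing state file value".
// The module, test name, and values here are illustrative, not part of the
// existing suite.
#[cfg(test)]
mod upsert_daemon_opts_sketch {
    use super::*;

    #[test]
    fn struct_update_syntax_fills_in_defaults() {
        let opts = UpsertDaemonOpts {
            id: "example".to_string(), // hypothetical daemon id
            pid: Some(1234),           // hypothetical pid
            ..Default::default()
        };
        assert_eq!(opts.pid, Some(1234));
        // Everything left to `Default` stays `None`/`false`.
        assert!(!opts.autostop);
        assert!(opts.retry.is_none());
        assert!(opts.depends.is_none());
    }
}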