pitchfork_cli/supervisor.rs

use crate::daemon::{Daemon, RunOptions};
use crate::daemon_status::DaemonStatus;
use crate::ipc::server::IpcServer;
use crate::ipc::{IpcRequest, IpcResponse};
use crate::procs::PROCS;
use crate::state_file::StateFile;
use crate::{env, Result};
use duct::cmd;
use itertools::Itertools;
use log::LevelFilter::Info;
use miette::IntoDiagnostic;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::fs;
use std::iter::once;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tokio::{select, signal, time};

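/// Long-lived background process that spawns daemons, monitors their
/// lifecycles, and serves IPC requests from CLI clients.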
pub struct Supervisor {
    state_file: Mutex<StateFile>,
    pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    last_refreshed_at: Mutex<time::Instant>,
}

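/// How often the background watcher refreshes process and shell state.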
const INTERVAL: Duration = Duration::from_secs(10);

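/// Process-wide supervisor instance, created lazily on first access.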
pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));

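/// Starts the supervisor in the background unless the state file already
/// records a "pitchfork" daemon whose PID is still running.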
pub fn start_if_not_running() -> Result<()> {
    let sf = StateFile::get();
    if let Some(d) = sf.daemons.get("pitchfork") {
        if let Some(pid) = d.pid {
            if PROCS.is_running(pid) {
                return Ok(());
            }
        }
    }
    start_in_background()
}

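/// Spawns `pitchfork supervisor run` as a detached child with stdout and
/// stderr discarded.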
pub fn start_in_background() -> Result<()> {
    debug!("starting supervisor in background");
    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
        .stdout_null()
        .stderr_null()
        .start()
        .into_diagnostic()?;
    Ok(())
}

impl Supervisor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
        })
    }

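    /// Main entry point: registers this process as the "pitchfork" daemon,
    /// starts the background watchers and signal handlers, then serves IPC
    /// connections until shutdown.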
    pub async fn start(&self) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        self.upsert_daemon(UpsertDaemonOpts {
            id: "pitchfork".to_string(),
            pid: Some(pid),
            status: DaemonStatus::Running,
            ..Default::default()
        })
        .await?;

        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        // self.file_watch().await?;

        let ipc = IpcServer::new()?;
        self.conn_watch(ipc).await
    }

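    /// Refreshes process state: prunes shell PIDs that have exited, runs
    /// autostop logic for directories with no live shells left, and retries
    /// errored daemons.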
    async fn refresh(&self) -> Result<()> {
        trace!("refreshing");
        PROCS.refresh_processes();
        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in self.get_dirs_with_shell_pids().await {
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect_vec();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;

        Ok(())
    }

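    /// Restarts errored daemons that still have retry attempts remaining.
    /// Daemons with no run command in pitchfork.toml are marked as exhausted
    /// instead.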
    async fn check_retry(&self) -> Result<()> {
        let state_file = self.state_file.lock().await;
        let daemons_to_retry: Vec<(String, Daemon)> = state_file
            .daemons
            .iter()
            .filter(|(_id, d)| {
                // Daemon is errored, not currently running, and has retries remaining
                d.status.is_errored() && d.pid.is_none() && d.retry > 0 && d.retry_count < d.retry
            })
            .map(|(id, d)| (id.clone(), d.clone()))
            .collect();
        drop(state_file);

        for (id, daemon) in daemons_to_retry {
            info!(
                "retrying daemon {} ({}/{} attempts)",
                id,
                daemon.retry_count + 1,
                daemon.retry
            );

            // Get command from pitchfork.toml
            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                let retry_opts = RunOptions {
                    id: id.clone(),
                    cmd: shell_words::split(&run_cmd).unwrap_or_default(),
                    force: false,
                    shell_pid: daemon.shell_pid,
                    dir: daemon.dir.unwrap_or_else(|| env::CWD.clone()),
                    autostop: daemon.autostop,
                    cron_schedule: daemon.cron_schedule,
                    cron_retrigger: daemon.cron_retrigger,
                    retry: daemon.retry,
                    retry_count: daemon.retry_count + 1,
                    ready_delay: daemon.ready_delay,
                    ready_output: daemon.ready_output.clone(),
                    wait_ready: false,
                };
                if let Err(e) = self.run(retry_opts).await {
                    error!("failed to retry daemon {}: {}", id, e);
                }
            } else {
                warn!("no run command found for daemon {}, cannot retry", id);
                // Mark as exhausted
                self.upsert_daemon(UpsertDaemonOpts {
                    id,
                    retry_count: Some(daemon.retry),
                    ..Default::default()
                })
                .await?;
            }
        }

        Ok(())
    }

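    /// Called when the last shell in `dir` exits: stops any autostop daemon
    /// whose directory is under `dir` and is no longer covered by another
    /// active shell.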
    async fn leave_dir(&self, dir: &Path) -> Result<()> {
        debug!("left dir {}", dir.display());
        let shell_dirs = self.get_dirs_with_shell_pids().await;
        let shell_dirs = shell_dirs.keys().collect_vec();
        for daemon in self.active_daemons().await {
            if !daemon.autostop {
                continue;
            }
            // if this daemon's dir starts with the left dir
            // and no other shell pid has this dir as a prefix
            // stop the daemon
            if let Some(daemon_dir) = daemon.dir.as_ref() {
                if daemon_dir.starts_with(dir)
                    && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
                {
                    info!("autostopping {daemon}");
                    self.stop(&daemon.id).await?;
                    self.add_notification(Info, format!("autostopped {daemon}"))
                        .await;
                }
            }
        }
        Ok(())
    }

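    /// Starts a daemon, refusing if it is already running (unless `force` is
    /// set). When `wait_ready` is set and retries are configured, failed
    /// starts are retried with exponential backoff (1s, 2s, 4s, ...).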
    async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd.clone();
        let daemon = self.get_daemon(id).await;
        if let Some(daemon) = daemon {
            // Stopping state is treated as "not running" - the monitoring task will clean it up
            // Only check for Running state with a valid PID
            if !daemon.status.is_stopping() && !daemon.status.is_stopped() {
                if let Some(pid) = daemon.pid {
                    if opts.force {
                        self.stop(id).await?;
                        info!("run: stop completed for daemon {id}");
                    } else {
                        warn!("daemon {id} already running with pid {pid}");
                        return Ok(IpcResponse::DaemonAlreadyRunning);
                    }
                }
            }
        }

        // If wait_ready is true and retry is configured, implement retry loop
        if opts.wait_ready && opts.retry > 0 {
            let max_attempts = opts.retry + 1; // initial attempt + retries
            for attempt in 0..max_attempts {
                let mut retry_opts = opts.clone();
                retry_opts.retry_count = attempt;
                retry_opts.cmd = cmd.clone();

                let result = self.run_once(retry_opts).await?;

                match result {
                    IpcResponse::DaemonReady { daemon } => {
                        return Ok(IpcResponse::DaemonReady { daemon });
                    }
                    IpcResponse::DaemonFailedWithCode { exit_code } => {
                        if attempt < opts.retry {
                            let backoff_secs = 2u64.pow(attempt);
                            info!(
                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
                                attempt + 1,
                                max_attempts,
                                backoff_secs
                            );
                            time::sleep(Duration::from_secs(backoff_secs)).await;
                            continue;
                        } else {
                            info!("daemon {id} failed after {} attempts", max_attempts);
                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
                        }
                    }
                    other => return Ok(other),
                }
            }
        }

        // No retry or wait_ready is false
        self.run_once(opts).await
    }

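    /// Spawns the daemon once via `sh -c "exec <cmd>"`, records it in the
    /// state file, and starts a monitoring task that tees stdout/stderr into
    /// the log file, watches for readiness (output pattern and/or delay), and
    /// updates the daemon's status when the process exits.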
    async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd;

        // Create channel for readiness notification if wait_ready is true
        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let cmd = once("exec".to_string()).chain(cmd).collect_vec();
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
        xx::file::mkdirp(log_path.parent().unwrap())?;
        info!("run: spawning daemon {id} with args: {args:?}");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(&args)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .current_dir(&opts.dir);

        // Ensure daemon can find user tools by using the original PATH
        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = child.id().unwrap();
        info!("started daemon {id} with pid {pid}");
        let daemon = self
            .upsert_daemon(UpsertDaemonOpts {
                id: id.to_string(),
                pid: Some(pid),
                status: DaemonStatus::Running,
                shell_pid: opts.shell_pid,
                dir: Some(opts.dir.clone()),
                autostop: opts.autostop,
                cron_schedule: opts.cron_schedule.clone(),
                cron_retrigger: opts.cron_retrigger,
                last_exit_success: None,
                retry: Some(opts.retry),
                retry_count: Some(opts.retry_count),
                ready_delay: opts.ready_delay,
                ready_output: opts.ready_output.clone(),
            })
            .await?;

        let id_clone = id.to_string();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();

        tokio::spawn(async move {
            let id = id_clone;
            let stdout = child.stdout.take().unwrap();
            let stderr = child.stderr.take().unwrap();
            let mut stdout = tokio::io::BufReader::new(stdout).lines();
            let mut stderr = tokio::io::BufReader::new(stderr).lines();
            let mut log_appender = BufWriter::new(
                tokio::fs::File::options()
                    .append(true)
                    .create(true)
                    .open(&log_path)
                    .await
                    .unwrap(),
            );

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            let format_line = |line: String| {
                if line.starts_with(&format!("{id} ")) {
                    // mise tasks often already have the id printed
                    format!("{} {line}\n", now())
                } else {
                    format!("{} {id} {line}\n", now())
                }
            };

            // Set up readiness checking
            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern =
                ready_output
                    .as_ref()
                    .and_then(|pattern| match regex::Regex::new(pattern) {
                        Ok(re) => Some(re),
                        Err(e) => {
                            error!("invalid regex pattern for daemon {id}: {e}");
                            None
                        }
                    });

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            // Use a channel to communicate process exit status
            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            // Spawn a task to wait for process exit
            let child_pid = child.id().unwrap();
            tokio::spawn(async move {
                let result = child.wait().await;
                debug!(
                    "daemon pid {child_pid} wait() completed with result: {:?}",
                    result
                );
                let _ = exit_tx.send(result).await;
            });

            let mut exit_status = None;

            loop {
                select! {
                    Ok(Some(line)) = stdout.next_line() => {
                        let formatted = format_line(line.clone());
                        log_appender.write_all(formatted.as_bytes()).await.unwrap();
                        log_appender.flush().await.unwrap();
                        trace!("stdout: {id} {formatted}");

                        // Check if output matches ready pattern
                        if !ready_notified {
                            if let Some(ref pattern) = ready_pattern {
                                if pattern.is_match(&line) {
                                    info!("daemon {id} ready: output matched pattern");
                                    ready_notified = true;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                }
                            }
                        }
                    }
                    Ok(Some(line)) = stderr.next_line() => {
                        let formatted = format_line(line.clone());
                        log_appender.write_all(formatted.as_bytes()).await.unwrap();
                        log_appender.flush().await.unwrap();
                        trace!("stderr: {id} {formatted}");

                        // Check if output matches ready pattern (also check stderr)
                        if !ready_notified {
                            if let Some(ref pattern) = ready_pattern {
                                if pattern.is_match(&line) {
                                    info!("daemon {id} ready: output matched pattern");
                                    ready_notified = true;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                }
                            }
                        }
                    },
                    Some(result) = exit_rx.recv() => {
                        // Process exited - save exit status and notify if not ready yet
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                let exit_code = exit_status.as_ref().and_then(|r| r.as_ref().ok().and_then(|s| s.code()));
                                debug!("daemon {id} not ready yet, sending failure notification with exit_code: {:?}", exit_code);
                                let _ = tx.send(Err(exit_code));
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending failure notification");
                        }
                        break;
                    }
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        // Disable timer after it fires
                        delay_timer = None;
                    }
                    else => break,
                }
            }

            // Get the final exit status
            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                // Streams closed but process hasn't exited yet, wait for it
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            // Check if this monitoring task is for the current daemon process
            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
            {
                // Another process has taken over, don't update status
                return;
            }
            let is_stopping = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
                // was stopped by this supervisor so don't update status
                return;
            }
            if let Ok(status) = exit_status {
                info!("daemon {id} exited with status {status}");
                if status.success() || is_stopping {
                    // If stopping, always mark as Stopped with success
                    // This allows monitoring task to clear PID after stop() was called
                    SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None, // Clear PID now that process has exited
                            status: DaemonStatus::Stopped,
                            last_exit_success: Some(status.success()),
                            ..Default::default()
                        })
                        .await
                        .unwrap();
                } else {
                    // Handle error exit - mark for retry
                    // retry_count increment will be handled by interval_watch
                    SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Errored(status.code()),
                            last_exit_success: Some(false),
                            ..Default::default()
                        })
                        .await
                        .unwrap();
                }
            } else {
                SUPERVISOR
                    .upsert_daemon(UpsertDaemonOpts {
                        id: id.clone(),
                        pid: None,
                        status: DaemonStatus::Errored(None),
                        last_exit_success: Some(false),
                        ..Default::default()
                    })
                    .await
                    .unwrap();
            }
        });

        // If wait_ready is true, wait for readiness notification
        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }

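    /// Stops a daemon: marks it as Stopping (keeping the PID so the
    /// monitoring task can observe the exit), then kills the process and any
    /// of its children.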
    async fn stop(&self, id: &str) -> Result<IpcResponse> {
        info!("stopping daemon: {id}");
        if let Some(daemon) = self.get_daemon(id).await {
            trace!("daemon to stop: {daemon}");
            if let Some(pid) = daemon.pid {
                trace!("killing pid: {pid}");
                PROCS.refresh_processes();
                if PROCS.is_running(pid) {
                    // First set status to Stopping (keeps PID for monitoring task)
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        status: DaemonStatus::Stopping,
                        ..Default::default()
                    })
                    .await?;

                    // Then kill the process
                    if let Err(e) = PROCS.kill_async(pid).await {
                        warn!("failed to kill pid {pid}: {e}");
                    }
                    PROCS.refresh_processes();
                    for child_pid in PROCS.all_children(pid) {
                        debug!("killing child pid: {child_pid}");
                        if let Err(e) = PROCS.kill_async(child_pid).await {
                            warn!("failed to kill child pid {child_pid}: {e}");
                        }
                    }
                    // Monitoring task will clear PID and set to Stopped when it detects exit
                } else {
                    debug!("pid {pid} not running");
                    // Process already dead, directly mark as stopped
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        ..Default::default()
                    })
                    .await?;
                }
                return Ok(IpcResponse::Ok);
            } else {
                debug!("daemon {id} not running");
            }
        } else {
            debug!("daemon {id} not found");
        }
        Ok(IpcResponse::DaemonAlreadyStopped)
    }

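    /// Installs handlers for the common termination signals. The first signal
    /// triggers a graceful shutdown; a second forces an immediate exit.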
    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            tokio::spawn(async move {
                let mut stream = signal::unix::signal(signal).unwrap();
                loop {
                    stream.recv().await;
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

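    /// Windows fallback: only ctrl-c is available, with the same
    /// first-graceful, second-forced semantics as the unix handler.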
    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                signal::ctrl_c().await.unwrap();
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

    // async fn file_watch(&self) -> Result<()> {
    //     let state_file = self.state_file.lock().await.path.clone();
    //     task::spawn(async move {
    //         let mut wf = WatchFiles::new(Duration::from_secs(2)).unwrap();
    //
    //         wf.watch(&state_file, RecursiveMode::NonRecursive).unwrap();
    //
    //         while let Some(paths) = wf.rx.recv().await {
    //             if let Err(err) = SUPERVISOR.handle_file_change(paths).await {
    //                 error!("failed to handle file change: {err}");
    //             }
    //         }
    //     });
    //
    //     Ok(())
    // }

    // async fn handle_file_change(&self, paths: Vec<PathBuf>) -> Result<()> {
    //     debug!("file change: {:?}", paths);
    //     // let path = self.state_file.lock().await.path.clone();
    //     // if paths.contains(&path) {
    //     //     *self.state_file.lock().await = StateFile::read(&path)?;
    //     // }
    //     self.refresh().await
    // }

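    /// Spawns a background task that refreshes every `INTERVAL`, skipping a
    /// tick if something else refreshed more recently.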
    fn interval_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(INTERVAL);
            loop {
                interval.tick().await;
                if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > INTERVAL {
                    if let Err(err) = SUPERVISOR.refresh().await {
                        error!("failed to refresh: {err}");
                    }
                }
            }
        });
        Ok(())
    }

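    /// Spawns a background task that evaluates cron schedules once a minute.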
    fn cron_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            // Check every minute for cron schedules
            // FIXME: needs better logic: what if the schedule gap is very short (30s, 1min)?
            let mut interval = time::interval(Duration::from_secs(60));
            loop {
                interval.tick().await;
                if let Err(err) = SUPERVISOR.check_cron_schedules().await {
                    error!("failed to check cron schedules: {err}");
                }
            }
        });
        Ok(())
    }

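    /// For every daemon with a cron schedule, checks whether the next firing
    /// falls within the coming minute and, if the retrigger policy
    /// (finish/always/success/fail) allows it, starts the daemon.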
    async fn check_cron_schedules(&self) -> Result<()> {
        use cron::Schedule;
        use std::str::FromStr;

        let now = chrono::Local::now();
        let daemons = self.state_file.lock().await.daemons.clone();

        for (id, daemon) in daemons {
            if let Some(schedule_str) = &daemon.cron_schedule {
                if let Some(retrigger) = daemon.cron_retrigger {
                    // Parse the cron schedule
                    let schedule = match Schedule::from_str(schedule_str) {
                        Ok(s) => s,
                        Err(e) => {
                            warn!("invalid cron schedule for daemon {id}: {e}");
                            continue;
                        }
                    };

                    // Check if we should trigger now
                    let should_trigger = schedule.upcoming(chrono::Local).take(1).any(|next| {
                        // If the next execution is within the next minute, trigger it
                        let diff = next.signed_duration_since(now);
                        diff.num_seconds() < 60 && diff.num_seconds() >= 0
                    });

                    if should_trigger {
                        let should_run = match retrigger {
                            crate::pitchfork_toml::CronRetrigger::Finish => {
                                // Run if not currently running
                                daemon.pid.is_none()
                            }
                            crate::pitchfork_toml::CronRetrigger::Always => {
                                // Always run (force restart handled in run method)
                                true
                            }
                            crate::pitchfork_toml::CronRetrigger::Success => {
                                // Run only if previous command succeeded
                                daemon.pid.is_none() && daemon.last_exit_success.unwrap_or(false)
                            }
                            crate::pitchfork_toml::CronRetrigger::Fail => {
                                // Run only if previous command failed
                                daemon.pid.is_none() && !daemon.last_exit_success.unwrap_or(true)
                            }
                        };

                        if should_run {
                            info!("cron: triggering daemon {id} (retrigger: {retrigger:?})");
                            // Get the run command from pitchfork.toml
                            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                                let dir = daemon.dir.clone().unwrap_or_else(|| env::CWD.clone());
                                // Use force: true for Always retrigger to ensure restart
                                let force = matches!(
                                    retrigger,
                                    crate::pitchfork_toml::CronRetrigger::Always
                                );
                                let opts = RunOptions {
                                    id: id.clone(),
                                    cmd: shell_words::split(&run_cmd).unwrap_or_default(),
                                    force,
                                    shell_pid: None,
                                    dir,
                                    autostop: daemon.autostop,
                                    cron_schedule: Some(schedule_str.clone()),
                                    cron_retrigger: Some(retrigger),
                                    retry: daemon.retry,
                                    retry_count: daemon.retry_count,
                                    ready_delay: daemon.ready_delay,
                                    ready_output: daemon.ready_output.clone(),
                                    wait_ready: false,
                                };
                                if let Err(e) = self.run(opts).await {
                                    error!("failed to run cron daemon {id}: {e}");
                                }
                            } else {
                                warn!("no run command found for cron daemon {id}");
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

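    /// Looks up a daemon's `run` command from the merged pitchfork.toml files.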
    fn get_daemon_run_command(&self, id: &str) -> Option<String> {
        use crate::pitchfork_toml::PitchforkToml;
        let pt = PitchforkToml::all_merged();
        pt.daemons.get(id).map(|d| d.run.clone())
    }

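    /// Accept loop for the IPC server: each request is handled on its own
    /// task so one slow handler cannot block other clients.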
    async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
        loop {
            let (msg, send) = match ipc.read().await {
                Ok(msg) => msg,
                Err(e) => {
                    error!("failed to accept connection: {:?}", e);
                    continue;
                }
            };
            debug!("received message: {:?}", msg);
            tokio::spawn(async move {
                let rsp = SUPERVISOR
                    .handle_ipc(msg)
                    .await
                    .unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
                if let Err(err) = send.send(rsp).await {
                    debug!("failed to send message: {:?}", err);
                }
            });
        }
    }

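    /// Dispatches a single IPC request to the matching supervisor method.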
    async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
        let rsp = match req {
            IpcRequest::Connect => {
                debug!("received connect message");
                IpcResponse::Ok
            }
            IpcRequest::Stop { id } => self.stop(&id).await?,
            IpcRequest::Run(opts) => self.run(opts).await?,
            IpcRequest::Enable { id } => {
                if self.enable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::Disable { id } => {
                if self.disable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::GetActiveDaemons => {
                let daemons = self.active_daemons().await;
                IpcResponse::ActiveDaemons(daemons)
            }
            IpcRequest::GetNotifications => {
                let notifications = self.get_notifications().await;
                IpcResponse::Notifications(notifications)
            }
            IpcRequest::UpdateShellDir { shell_pid, dir } => {
                let prev = self.get_shell_dir(shell_pid).await;
                self.set_shell_dir(shell_pid, dir).await?;
                if let Some(prev) = prev {
                    self.leave_dir(&prev).await?;
                }
                self.refresh().await?;
                IpcResponse::Ok
            }
            IpcRequest::Clean => {
                self.clean().await?;
                IpcResponse::Ok
            }
            IpcRequest::GetDisabledDaemons => {
                let disabled = self.state_file.lock().await.disabled.clone();
                IpcResponse::DisabledDaemons(disabled.into_iter().collect())
            }
        };
        Ok(rsp)
    }

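    /// Graceful shutdown: stops every managed daemon, removes the
    /// supervisor's own state entry, and deletes the IPC socket directory.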
    async fn close(&self) {
        for daemon in self.active_daemons().await {
            if daemon.id == "pitchfork" {
                continue;
            }
            if let Err(err) = self.stop(&daemon.id).await {
                error!("failed to stop daemon {daemon}: {err}");
            }
        }
        let _ = self.remove_daemon("pitchfork").await;
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
        // TODO: cleanly stop ipc server
    }

    async fn add_notification(&self, level: log::LevelFilter, message: String) {
        self.pending_notifications
            .lock()
            .await
            .push((level, message));
    }

    async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
        self.pending_notifications.lock().await.drain(..).collect()
    }

    async fn active_daemons(&self) -> Vec<Daemon> {
        self.state_file
            .lock()
            .await
            .daemons
            .values()
            .filter(|d| d.pid.is_some())
            .cloned()
            .collect()
    }

    async fn remove_daemon(&self, id: &str) -> Result<()> {
        // hold the lock across the remove and the write so they stay consistent
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.remove(id);
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(())
    }

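    /// Inserts or updates a daemon record. `pid`, `status`, and `shell_pid`
    /// are taken from `opts` as-is; the remaining fields fall back to the
    /// existing record when unspecified. The state file is persisted after
    /// the update.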
    async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
        info!(
            "upserting daemon: {} pid: {} status: {}",
            opts.id,
            opts.pid.unwrap_or(0),
            opts.status
        );
        let mut state_file = self.state_file.lock().await;
        let existing = state_file.daemons.get(&opts.id);
        let daemon = Daemon {
            id: opts.id.to_string(),
            title: opts.pid.and_then(|pid| PROCS.title(pid)),
            pid: opts.pid,
            status: opts.status,
            shell_pid: opts.shell_pid,
            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
            cron_schedule: opts
                .cron_schedule
                .or(existing.and_then(|d| d.cron_schedule.clone())),
            cron_retrigger: opts
                .cron_retrigger
                .or(existing.and_then(|d| d.cron_retrigger)),
            last_exit_success: opts
                .last_exit_success
                .or(existing.and_then(|d| d.last_exit_success)),
            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
            retry_count: opts
                .retry_count
                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
            ready_output: opts
                .ready_output
                .or(existing.and_then(|d| d.ready_output.clone())),
        };
        state_file
            .daemons
            .insert(opts.id.to_string(), daemon.clone());
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(daemon)
    }

    async fn enable(&self, id: String) -> Result<bool> {
        info!("enabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.remove(&id);
        state_file.write()?;
        Ok(result)
    }

    async fn disable(&self, id: String) -> Result<bool> {
        info!("disabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.insert(id);
        state_file.write()?;
        Ok(result)
    }

    async fn get_daemon(&self, id: &str) -> Option<Daemon> {
        self.state_file.lock().await.daemons.get(id).cloned()
    }

    async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
        state_file.write()?;
        Ok(())
    }

    async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
        self.state_file
            .lock()
            .await
            .shell_dirs
            .get(&shell_pid.to_string())
            .cloned()
    }

    async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        if state_file
            .shell_dirs
            .remove(&shell_pid.to_string())
            .is_some()
        {
            state_file.write()?;
        }
        Ok(())
    }

    async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
        self.state_file.lock().await.shell_dirs.iter().fold(
            HashMap::new(),
            |mut acc, (pid, dir)| {
                acc.entry(dir.clone())
                    .or_default()
                    .push(pid.parse().unwrap());
                acc
            },
        )
    }

    async fn clean(&self) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.retain(|_id, d| d.pid.is_some());
        state_file.write()?;
        Ok(())
    }
}

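/// Options for [`Supervisor::upsert_daemon`]; see that method for how each
/// field merges with an existing record.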
#[derive(Debug)]
struct UpsertDaemonOpts {
    id: String,
    pid: Option<u32>,
    status: DaemonStatus,
    shell_pid: Option<u32>,
    dir: Option<PathBuf>,
    autostop: bool,
    cron_schedule: Option<String>,
    cron_retrigger: Option<crate::pitchfork_toml::CronRetrigger>,
    last_exit_success: Option<bool>,
    retry: Option<u32>,
    retry_count: Option<u32>,
    ready_delay: Option<u64>,
    ready_output: Option<String>,
}

impl Default for UpsertDaemonOpts {
    fn default() -> Self {
        Self {
            id: "".to_string(),
            pid: None,
            status: DaemonStatus::Stopped,
            shell_pid: None,
            dir: None,
            autostop: false,
            cron_schedule: None,
            cron_retrigger: None,
            last_exit_success: None,
            retry: None,
            retry_count: None,
            ready_delay: None,
            ready_output: None,
        }
    }
}