// pitchfork_cli/supervisor/mod.rs

//! Supervisor module: the daemon process supervisor.
//!
//! This module is split into focused submodules:
//! - `state`: State access layer (get/set operations)
//! - `lifecycle`: Daemon start/stop operations
//! - `autostop`: Autostop logic and boot daemon startup
//! - `retry`: Retry logic with backoff
//! - `watchers`: Background tasks (interval, cron, file watching)
//! - `ipc_handlers`: IPC request dispatch
10
11mod autostop;
12mod ipc_handlers;
13mod lifecycle;
14mod retry;
15mod state;
16mod watchers;
17
18use crate::daemon_status::DaemonStatus;
19use crate::ipc::server::{IpcServer, IpcServerHandle};
20use crate::procs::PROCS;
21use crate::state_file::StateFile;
22use crate::{Result, env};
23use duct::cmd;
24use miette::IntoDiagnostic;
25use once_cell::sync::Lazy;
26use std::collections::HashMap;
27use std::fs;
28use std::process::exit;
29use std::sync::atomic;
30use std::sync::atomic::AtomicBool;
31use std::time::Duration;
32#[cfg(unix)]
33use tokio::signal::unix::SignalKind;
34use tokio::sync::Mutex;
35use tokio::{signal, time};
36
37// Re-export types needed by other modules
38pub(crate) use state::UpsertDaemonOpts;
39
/// Shared state of the pitchfork supervisor process.
///
/// Every field sits behind a `tokio::sync::Mutex`, so the supervisor's async tasks can
/// hold a lock across `.await` points. The process-wide instance is exposed through
/// `SUPERVISOR`.
pub struct Supervisor {
    /// In-memory handle to the persisted state file (holds the daemon records).
    pub(crate) state_file: Mutex<StateFile>,
    /// Queued `(level, message)` notifications, appended by `add_notification`.
    pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    /// When `refresh` last ran; initialised to the construction time.
    pub(crate) last_refreshed_at: Mutex<time::Instant>,
    /// Map of daemon ID to scheduled autostop time
    pub(crate) pending_autostops: Mutex<HashMap<String, time::Instant>>,
    /// Handle for graceful IPC server shutdown
    pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
}
49
50pub(crate) fn interval_duration() -> Duration {
51    Duration::from_secs(*env::PITCHFORK_INTERVAL_SECS)
52}
53
54pub static SUPERVISOR: Lazy<Supervisor> =
55    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
56
57pub fn start_if_not_running() -> Result<()> {
58    let sf = StateFile::get();
59    if let Some(d) = sf.daemons.get("pitchfork")
60        && let Some(pid) = d.pid
61        && PROCS.is_running(pid)
62    {
63        return Ok(());
64    }
65    start_in_background()
66}
67
68pub fn start_in_background() -> Result<()> {
69    debug!("starting supervisor in background");
70    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
71        .stdout_null()
72        .stderr_null()
73        .start()
74        .into_diagnostic()?;
75    Ok(())
76}
77
78impl Supervisor {
79    pub fn new() -> Result<Self> {
80        Ok(Self {
81            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
82            last_refreshed_at: Mutex::new(time::Instant::now()),
83            pending_notifications: Mutex::new(vec![]),
84            pending_autostops: Mutex::new(HashMap::new()),
85            ipc_shutdown: Mutex::new(None),
86        })
87    }
88
89    pub async fn start(&self, is_boot: bool, web_port: Option<u16>) -> Result<()> {
90        let pid = std::process::id();
91        info!("Starting supervisor with pid {pid}");
92
93        self.upsert_daemon(UpsertDaemonOpts {
94            id: "pitchfork".to_string(),
95            pid: Some(pid),
96            status: DaemonStatus::Running,
97            ..Default::default()
98        })
99        .await?;
100
101        // If this is a boot start, automatically start boot_start daemons
102        if is_boot {
103            info!("Boot start mode enabled, starting boot_start daemons");
104            self.start_boot_daemons().await?;
105        }
106
107        self.interval_watch()?;
108        self.cron_watch()?;
109        self.signals()?;
110        self.daemon_file_watch()?;
111
112        // Start web server if port is configured
113        if let Some(port) = web_port {
114            tokio::spawn(async move {
115                if let Err(e) = crate::web::serve(port).await {
116                    error!("Web server error: {}", e);
117                }
118            });
119        }
120
121        let (ipc, ipc_handle) = IpcServer::new()?;
122        *self.ipc_shutdown.lock().await = Some(ipc_handle);
123        self.conn_watch(ipc).await
124    }
125
126    pub(crate) async fn refresh(&self) -> Result<()> {
127        trace!("refreshing");
128
129        // Collect PIDs we need to check (shell PIDs only)
130        // This is more efficient than refreshing all processes on the system
131        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
132        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
133
134        if pids_to_check.is_empty() {
135            // No PIDs to check, skip the expensive refresh
136            trace!("no shell PIDs to check, skipping process refresh");
137        } else {
138            PROCS.refresh_pids(&pids_to_check);
139        }
140
141        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
142        *last_refreshed_at = time::Instant::now();
143
144        for (dir, pids) in dirs_with_pids {
145            let to_remove = pids
146                .iter()
147                .filter(|pid| !PROCS.is_running(**pid))
148                .collect::<Vec<_>>();
149            for pid in &to_remove {
150                self.remove_shell_pid(**pid).await?
151            }
152            if to_remove.len() == pids.len() {
153                self.leave_dir(&dir).await?;
154            }
155        }
156
157        self.check_retry().await?;
158        self.process_pending_autostops().await?;
159
160        Ok(())
161    }
162
163    #[cfg(unix)]
164    fn signals(&self) -> Result<()> {
165        let signals = [
166            SignalKind::terminate(),
167            SignalKind::alarm(),
168            SignalKind::interrupt(),
169            SignalKind::quit(),
170            SignalKind::hangup(),
171            SignalKind::user_defined1(),
172            SignalKind::user_defined2(),
173        ];
174        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
175        for signal in signals {
176            let stream = match signal::unix::signal(signal) {
177                Ok(s) => s,
178                Err(e) => {
179                    warn!("Failed to register signal handler for {:?}: {}", signal, e);
180                    continue;
181                }
182            };
183            tokio::spawn(async move {
184                let mut stream = stream;
185                loop {
186                    stream.recv().await;
187                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
188                        exit(1);
189                    } else {
190                        SUPERVISOR.handle_signal().await;
191                    }
192                }
193            });
194        }
195        Ok(())
196    }
197
198    #[cfg(windows)]
199    fn signals(&self) -> Result<()> {
200        tokio::spawn(async move {
201            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
202            loop {
203                if let Err(e) = signal::ctrl_c().await {
204                    error!("Failed to wait for ctrl-c: {}", e);
205                    return;
206                }
207                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
208                    exit(1);
209                } else {
210                    SUPERVISOR.handle_signal().await;
211                }
212            }
213        });
214        Ok(())
215    }
216
217    async fn handle_signal(&self) {
218        info!("received signal, stopping");
219        self.close().await;
220        exit(0)
221    }
222
223    pub(crate) async fn close(&self) {
224        for daemon in self.active_daemons().await {
225            if daemon.id == "pitchfork" {
226                continue;
227            }
228            if let Err(err) = self.stop(&daemon.id).await {
229                error!("failed to stop daemon {daemon}: {err}");
230            }
231        }
232        let _ = self.remove_daemon("pitchfork").await;
233
234        // Signal IPC server to shut down gracefully
235        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
236            handle.shutdown();
237        }
238
239        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
240    }
241
242    pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
243        self.pending_notifications
244            .lock()
245            .await
246            .push((level, message));
247    }
248}