// pitchfork_cli/supervisor/mod.rs
1//! Supervisor module - daemon process supervisor
2//!
3//! This module is split into focused submodules:
4//! - `state`: State access layer (get/set operations)
5//! - `lifecycle`: Daemon start/stop operations
6//! - `autostop`: Autostop logic and boot daemon startup
7//! - `retry`: Retry logic with backoff
8//! - `watchers`: Background tasks (interval, cron, file watching)
9//! - `ipc_handlers`: IPC request dispatch
10
11mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15mod retry;
16mod state;
17mod watchers;
18
19use crate::daemon_id::DaemonId;
20use crate::daemon_status::DaemonStatus;
21use crate::ipc::server::{IpcServer, IpcServerHandle};
22use crate::procs::PROCS;
23use crate::settings::settings;
24use crate::state_file::StateFile;
25use crate::{Result, env};
26use duct::cmd;
27use miette::IntoDiagnostic;
28use once_cell::sync::Lazy;
29use std::collections::HashMap;
30use std::fs;
31use std::process::exit;
32use std::sync::atomic;
33use std::sync::atomic::{AtomicBool, AtomicU32};
34use std::time::Duration;
35#[cfg(unix)]
36use tokio::signal::unix::SignalKind;
37use tokio::sync::{Mutex, Notify};
38use tokio::task::JoinHandle;
39use tokio::{signal, time};
40
41// Re-export types needed by other modules
42pub(crate) use state::UpsertDaemonOpts;
43
/// Long-lived daemon supervisor. A single global instance ([`SUPERVISOR`])
/// owns the persisted state file, pending notifications, background-task
/// bookkeeping, and the IPC server shutdown handle for the lifetime of the
/// supervisor process.
pub struct Supervisor {
    /// Persisted daemon state, loaded from `PITCHFORK_STATE_FILE`; guarded by
    /// an async mutex since it is touched from many tokio tasks.
    pub(crate) state_file: Mutex<StateFile>,
    /// Queued `(level, message)` notifications awaiting delivery
    /// (presumably drained by IPC handlers — confirm against `ipc_handlers`).
    pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    /// Instant of the most recent `refresh()` run.
    pub(crate) last_refreshed_at: Mutex<time::Instant>,
    /// Map of daemon ID to scheduled autostop time
    pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
    /// Handle for graceful IPC server shutdown
    pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
    /// Tracks in-flight hook tasks so shutdown can wait for them to complete
    pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Number of monitoring tasks that are still running (between process exit
    /// and hook registration completion). Used by `close()` to know when it is
    /// safe to drain `hook_tasks`.
    pub(crate) active_monitors: AtomicU32,
    /// Signalled by each monitoring task after it finishes registering hooks
    /// (or decides it has nothing to register). `close()` waits on this.
    pub(crate) monitor_done: Notify,
}
62
/// Interval between supervisor refresh ticks, taken from user settings.
pub(crate) fn interval_duration() -> Duration {
    settings().general_interval()
}
66
/// Global supervisor singleton, constructed lazily on first access.
/// Panics (via `expect`) if construction fails.
pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
69
70pub fn start_if_not_running() -> Result<()> {
71    let sf = StateFile::get();
72    if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
73        && let Some(pid) = d.pid
74        && PROCS.is_running(pid)
75    {
76        return Ok(());
77    }
78    start_in_background()
79}
80
81pub fn start_in_background() -> Result<()> {
82    debug!("starting supervisor in background");
83    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
84        .stdout_null()
85        .stderr_null()
86        .start()
87        .into_diagnostic()?;
88    Ok(())
89}
90
impl Supervisor {
    /// Build a supervisor with empty runtime bookkeeping. Opens a handle on
    /// the state file at `PITCHFORK_STATE_FILE` but spawns no tasks; call
    /// [`Supervisor::start`] to actually run.
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
            pending_autostops: Mutex::new(HashMap::new()),
            ipc_shutdown: Mutex::new(None),
            hook_tasks: Mutex::new(Vec::new()),
            active_monitors: AtomicU32::new(0),
            monitor_done: Notify::new(),
        })
    }

    /// Run the supervisor main sequence: record our pid as the `pitchfork`
    /// daemon, optionally start boot daemons, spawn the background watchers
    /// and signal handlers, optionally start the web UI, then serve IPC
    /// connections until shutdown (`conn_watch` does not return until then).
    ///
    /// * `is_boot` — when true, `boot_start` daemons are launched first.
    /// * `web_port` — CLI override for the web UI port; takes priority over
    ///   `settings.web.auto_start` + `bind_port`.
    /// * `web_path` — CLI override for the web UI base path; takes priority
    ///   over `settings.web.base_path`.
    pub async fn start(
        &self,
        is_boot: bool,
        web_port: Option<u16>,
        web_path: Option<String>,
    ) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        // Register ourselves in the state file so other invocations (e.g.
        // start_if_not_running) can detect a live supervisor.
        self.upsert_daemon(
            UpsertDaemonOpts::builder(DaemonId::pitchfork())
                .set(|o| {
                    o.pid = Some(pid);
                    o.status = DaemonStatus::Running;
                })
                .build(),
        )
        .await?;

        // If this is a boot start, automatically start boot_start daemons
        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        // Background tasks: periodic refresh, cron schedules, OS signal
        // handling, and daemon-file change watching (see `watchers` module).
        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        self.daemon_file_watch()?;

        // Start web server: CLI --web-port takes priority, then settings.web.auto_start + bind_port
        let s = settings();
        let effective_port = web_port.or_else(|| {
            if s.web.auto_start {
                // bind_port is stored wider than u16; reject 0 and
                // out-of-range values rather than silently truncating.
                match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
                    Some(p) => Some(p),
                    None => {
                        error!(
                            "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
                            s.web.bind_port
                        );
                        None
                    }
                }
            } else {
                None
            }
        });
        // CLI --web-path takes priority, then settings.web.base_path
        let effective_path = web_path.or_else(|| {
            let bp = s.web.base_path.clone();
            if bp.is_empty() { None } else { Some(bp) }
        });
        if let Some(port) = effective_port {
            // Web server runs detached; an error there must not take down
            // the supervisor itself.
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port, effective_path).await {
                    error!("Web server error: {e}");
                }
            });
        }

        // Serve IPC until shutdown; the handle lets close() stop the server.
        let (ipc, ipc_handle) = IpcServer::new()?;
        *self.ipc_shutdown.lock().await = Some(ipc_handle);
        self.conn_watch(ipc).await
    }

    /// Periodic tick: re-check tracked shell PIDs, remove dead shells (and
    /// leave a directory once all of its shells are gone), then run retry and
    /// pending-autostop processing.
    pub(crate) async fn refresh(&self) -> Result<()> {
        trace!("refreshing");

        // Collect PIDs we need to check (shell PIDs only)
        // This is more efficient than refreshing all processes on the system
        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();

        if pids_to_check.is_empty() {
            // No PIDs to check, skip the expensive refresh
            trace!("no shell PIDs to check, skipping process refresh");
        } else {
            PROCS.refresh_pids(&pids_to_check);
        }

        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in dirs_with_pids {
            // Shells whose process is no longer running get dropped from state.
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect::<Vec<_>>();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            // If every shell for this dir is dead, treat the dir as left.
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;
        self.process_pending_autostops().await?;

        Ok(())
    }

    /// Install handlers for common termination signals (unix). The first
    /// signal received triggers a graceful shutdown via `handle_signal`; any
    /// further signal while shutdown is in progress exits immediately with
    /// status 1. Registration failures are logged and skipped, not fatal.
    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        // Shared across all handler tasks: flips to true on the first signal.
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            let stream = match signal::unix::signal(signal) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Failed to register signal handler for {signal:?}: {e}");
                    continue;
                }
            };
            tokio::spawn(async move {
                let mut stream = stream;
                loop {
                    stream.recv().await;
                    // swap() makes "first signal wins" atomic across the
                    // per-signal tasks: only one task runs handle_signal.
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

    /// Windows counterpart of `signals`: only ctrl-c is available. Same
    /// semantics — first ctrl-c shuts down gracefully, a second exits(1).
    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    /// Graceful-shutdown path for the first termination signal: run `close`
    /// then exit(0). Never returns.
    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

    /// Graceful shutdown: stop all managed daemons (except our own entry,
    /// which is removed from state instead), stop the IPC server, wait —
    /// with timeouts — for monitoring tasks and hook tasks to finish, then
    /// remove the IPC socket directory.
    pub(crate) async fn close(&self) {
        let pitchfork_id = DaemonId::pitchfork();
        for daemon in self.active_daemons().await {
            if daemon.id == pitchfork_id {
                continue;
            }
            if let Err(err) = self.stop(&daemon.id).await {
                error!("failed to stop daemon {daemon}: {err}");
            }
        }
        // Best-effort: a failure to remove our own entry shouldn't block
        // the rest of the shutdown.
        let _ = self.remove_daemon(&pitchfork_id).await;

        // Signal IPC server to shut down gracefully
        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
            handle.shutdown();
        }

        // Wait for all in-flight monitoring tasks to finish registering their
        // hook handles. Each monitoring task increments `active_monitors` when
        // its process exits, and decrements it (+ notifies `monitor_done`)
        // after all fire_hook() calls complete. This replaces the old
        // yield_now() approach which had a race window.
        let drain_timeout = time::sleep(Duration::from_secs(5));
        tokio::pin!(drain_timeout);
        loop {
            if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
                break;
            }
            tokio::select! {
                _ = self.monitor_done.notified() => {}
                _ = &mut drain_timeout => {
                    warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
                    break;
                }
            }
        }
        // Drain the registered hook tasks, giving each up to 30s to finish.
        let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
        let hook_timeout = Duration::from_secs(30);
        for handle in handles {
            match time::timeout(hook_timeout, handle).await {
                Ok(_) => {} // Hook completed (success or error, doesn't matter)
                Err(_) => {
                    warn!(
                        "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
                    );
                }
            }
        }

        // Best-effort cleanup of the IPC socket directory.
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
    }

    /// Queue a `(level, message)` notification for later delivery.
    pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
        self.pending_notifications
            .lock()
            .await
            .push((level, message));
    }
}