pitchfork_cli/supervisor/
mod.rs1mod autostop;
12mod ipc_handlers;
13mod lifecycle;
14mod retry;
15mod state;
16mod watchers;
17
18use crate::daemon_status::DaemonStatus;
19use crate::ipc::server::{IpcServer, IpcServerHandle};
20use crate::procs::PROCS;
21use crate::state_file::StateFile;
22use crate::{Result, env};
23use duct::cmd;
24use miette::IntoDiagnostic;
25use once_cell::sync::Lazy;
26use std::collections::HashMap;
27use std::fs;
28use std::process::exit;
29use std::sync::atomic;
30use std::sync::atomic::AtomicBool;
31use std::time::Duration;
32#[cfg(unix)]
33use tokio::signal::unix::SignalKind;
34use tokio::sync::Mutex;
35use tokio::{signal, time};
36
37pub(crate) use state::UpsertDaemonOpts;
39
/// Shared supervisor state; every field is wrapped in its own async mutex.
pub struct Supervisor {
    /// State file wrapper, built in `new` from `env::PITCHFORK_STATE_FILE`.
    pub(crate) state_file: Mutex<StateFile>,
    /// Queue of `(level, message)` pairs appended by `add_notification`.
    pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    /// Instant written by `refresh` (and initialised to "now" in `new`).
    pub(crate) last_refreshed_at: Mutex<time::Instant>,
    /// Instants keyed by string; presumably daemon id -> scheduled stop time — TODO confirm.
    pub(crate) pending_autostops: Mutex<HashMap<String, time::Instant>>,
    /// IPC server handle stored by `start` and taken by `close` to shut the server down.
    pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
}
49
50pub(crate) fn interval_duration() -> Duration {
51 Duration::from_secs(*env::PITCHFORK_INTERVAL_SECS)
52}
53
/// Global supervisor instance, constructed lazily on first access.
///
/// # Panics
/// Panics with "Error creating supervisor" if `Supervisor::new` returns an error.
pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
56
57pub fn start_if_not_running() -> Result<()> {
58 let sf = StateFile::get();
59 if let Some(d) = sf.daemons.get("pitchfork")
60 && let Some(pid) = d.pid
61 && PROCS.is_running(pid)
62 {
63 return Ok(());
64 }
65 start_in_background()
66}
67
68pub fn start_in_background() -> Result<()> {
69 debug!("starting supervisor in background");
70 cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
71 .stdout_null()
72 .stderr_null()
73 .start()
74 .into_diagnostic()?;
75 Ok(())
76}
77
78impl Supervisor {
79 pub fn new() -> Result<Self> {
80 Ok(Self {
81 state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
82 last_refreshed_at: Mutex::new(time::Instant::now()),
83 pending_notifications: Mutex::new(vec![]),
84 pending_autostops: Mutex::new(HashMap::new()),
85 ipc_shutdown: Mutex::new(None),
86 })
87 }
88
89 pub async fn start(&self, is_boot: bool, web_port: Option<u16>) -> Result<()> {
90 let pid = std::process::id();
91 info!("Starting supervisor with pid {pid}");
92
93 self.upsert_daemon(UpsertDaemonOpts {
94 id: "pitchfork".to_string(),
95 pid: Some(pid),
96 status: DaemonStatus::Running,
97 ..Default::default()
98 })
99 .await?;
100
101 if is_boot {
103 info!("Boot start mode enabled, starting boot_start daemons");
104 self.start_boot_daemons().await?;
105 }
106
107 self.interval_watch()?;
108 self.cron_watch()?;
109 self.signals()?;
110 self.daemon_file_watch()?;
111
112 if let Some(port) = web_port {
114 tokio::spawn(async move {
115 if let Err(e) = crate::web::serve(port).await {
116 error!("Web server error: {}", e);
117 }
118 });
119 }
120
121 let (ipc, ipc_handle) = IpcServer::new()?;
122 *self.ipc_shutdown.lock().await = Some(ipc_handle);
123 self.conn_watch(ipc).await
124 }
125
126 pub(crate) async fn refresh(&self) -> Result<()> {
127 trace!("refreshing");
128
129 let dirs_with_pids = self.get_dirs_with_shell_pids().await;
132 let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
133
134 if pids_to_check.is_empty() {
135 trace!("no shell PIDs to check, skipping process refresh");
137 } else {
138 PROCS.refresh_pids(&pids_to_check);
139 }
140
141 let mut last_refreshed_at = self.last_refreshed_at.lock().await;
142 *last_refreshed_at = time::Instant::now();
143
144 for (dir, pids) in dirs_with_pids {
145 let to_remove = pids
146 .iter()
147 .filter(|pid| !PROCS.is_running(**pid))
148 .collect::<Vec<_>>();
149 for pid in &to_remove {
150 self.remove_shell_pid(**pid).await?
151 }
152 if to_remove.len() == pids.len() {
153 self.leave_dir(&dir).await?;
154 }
155 }
156
157 self.check_retry().await?;
158 self.process_pending_autostops().await?;
159
160 Ok(())
161 }
162
163 #[cfg(unix)]
164 fn signals(&self) -> Result<()> {
165 let signals = [
166 SignalKind::terminate(),
167 SignalKind::alarm(),
168 SignalKind::interrupt(),
169 SignalKind::quit(),
170 SignalKind::hangup(),
171 SignalKind::user_defined1(),
172 SignalKind::user_defined2(),
173 ];
174 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
175 for signal in signals {
176 let stream = match signal::unix::signal(signal) {
177 Ok(s) => s,
178 Err(e) => {
179 warn!("Failed to register signal handler for {:?}: {}", signal, e);
180 continue;
181 }
182 };
183 tokio::spawn(async move {
184 let mut stream = stream;
185 loop {
186 stream.recv().await;
187 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
188 exit(1);
189 } else {
190 SUPERVISOR.handle_signal().await;
191 }
192 }
193 });
194 }
195 Ok(())
196 }
197
/// Installs the Ctrl-C handler on Windows.
///
/// The first Ctrl-C starts a graceful shutdown via `handle_signal`; a second
/// Ctrl-C force-exits the process with status 1.
///
/// # Errors
/// Always returns `Ok(())`; a failure to wait for Ctrl-C is logged and ends
/// the listener task.
#[cfg(windows)]
fn signals(&self) -> Result<()> {
    // Set once the first Ctrl-C has been seen.
    static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);

    tokio::spawn(async {
        loop {
            if let Err(e) = signal::ctrl_c().await {
                error!("Failed to wait for ctrl-c: {}", e);
                return;
            }
            if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                exit(1);
            }
            // Run the graceful shutdown in its own task. Awaiting it here
            // would park this loop forever (`handle_signal` ends in `exit`),
            // making the force-exit branch above unreachable.
            tokio::spawn(SUPERVISOR.handle_signal());
        }
    });
    Ok(())
}
216
217 async fn handle_signal(&self) {
218 info!("received signal, stopping");
219 self.close().await;
220 exit(0)
221 }
222
223 pub(crate) async fn close(&self) {
224 for daemon in self.active_daemons().await {
225 if daemon.id == "pitchfork" {
226 continue;
227 }
228 if let Err(err) = self.stop(&daemon.id).await {
229 error!("failed to stop daemon {daemon}: {err}");
230 }
231 }
232 let _ = self.remove_daemon("pitchfork").await;
233
234 if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
236 handle.shutdown();
237 }
238
239 let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
240 }
241
242 pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
243 self.pending_notifications
244 .lock()
245 .await
246 .push((level, message));
247 }
248}