use crate::daemon::{Daemon, RunOptions};
use crate::daemon_status::DaemonStatus;
use crate::ipc::server::IpcServer;
use crate::ipc::{IpcRequest, IpcResponse};
use crate::procs::PROCS;
use crate::state_file::StateFile;
use crate::{Result, env};
use duct::cmd;
use itertools::Itertools;
use log::LevelFilter::Info;
use miette::IntoDiagnostic;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::fs;
use std::iter::once;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::{select, signal, time};

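/// Long-lived supervisor state shared across the daemon's async tasks.
/// Every field is wrapped in a tokio `Mutex` so it can be accessed from the
/// IPC handler, the interval/cron watchers, and per-daemon monitor tasks.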
pub struct Supervisor {
    state_file: Mutex<StateFile>,
    pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    last_refreshed_at: Mutex<time::Instant>,
    pending_autostops: Mutex<HashMap<String, time::Instant>>,
}

fn interval_duration() -> Duration {
    Duration::from_secs(*env::PITCHFORK_INTERVAL_SECS)
}

pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));

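/// Starts the supervisor in the background unless the state file already
/// records a "pitchfork" daemon whose pid is still running.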
pub fn start_if_not_running() -> Result<()> {
    let sf = StateFile::get();
    if let Some(d) = sf.daemons.get("pitchfork")
        && let Some(pid) = d.pid
        && PROCS.is_running(pid)
    {
        return Ok(());
    }
    start_in_background()
}

pub fn start_in_background() -> Result<()> {
    debug!("starting supervisor in background");
    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
        .stdout_null()
        .stderr_null()
        .start()
        .into_diagnostic()?;
    Ok(())
}

impl Supervisor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
            pending_autostops: Mutex::new(HashMap::new()),
        })
    }

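    /// Main entry point for the supervisor process: registers itself in the
    /// state file, optionally starts boot daemons, spawns the interval, cron
    /// and signal watchers plus the optional web server, then blocks forever
    /// serving IPC connections.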
    pub async fn start(&self, is_boot: bool, web_port: Option<u16>) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        self.upsert_daemon(UpsertDaemonOpts {
            id: "pitchfork".to_string(),
            pid: Some(pid),
            status: DaemonStatus::Running,
            ..Default::default()
        })
        .await?;

        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        if let Some(port) = web_port {
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port).await {
                    error!("Web server error: {}", e);
                }
            });
        }

        let ipc = IpcServer::new()?;
        self.conn_watch(ipc).await
    }

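    /// Refreshes the process table, removes shell pids that are no longer
    /// running (triggering autostop checks for their directories), then
    /// processes retries and pending autostops.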
    async fn refresh(&self) -> Result<()> {
        trace!("refreshing");
        PROCS.refresh_processes();
        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in self.get_dirs_with_shell_pids().await {
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect_vec();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;
        self.process_pending_autostops().await?;

        Ok(())
    }

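    /// Restarts errored daemons that still have retry attempts left. Daemons
    /// whose run command can no longer be found have their retry budget
    /// marked as exhausted instead.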
    async fn check_retry(&self) -> Result<()> {
        let state_file = self.state_file.lock().await;
        let daemons_to_retry: Vec<(String, Daemon)> = state_file
            .daemons
            .iter()
            .filter(|(_id, d)| {
                d.status.is_errored() && d.pid.is_none() && d.retry > 0 && d.retry_count < d.retry
            })
            .map(|(id, d)| (id.clone(), d.clone()))
            .collect();
        drop(state_file);

        for (id, daemon) in daemons_to_retry {
            info!(
                "retrying daemon {} ({}/{} attempts)",
                id,
                daemon.retry_count + 1,
                daemon.retry
            );

            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                let retry_opts = RunOptions {
                    id: id.clone(),
                    cmd: shell_words::split(&run_cmd).unwrap_or_default(),
                    force: false,
                    shell_pid: daemon.shell_pid,
                    dir: daemon.dir.unwrap_or_else(|| env::CWD.clone()),
                    autostop: daemon.autostop,
                    cron_schedule: daemon.cron_schedule,
                    cron_retrigger: daemon.cron_retrigger,
                    retry: daemon.retry,
                    retry_count: daemon.retry_count + 1,
                    ready_delay: daemon.ready_delay,
                    ready_output: daemon.ready_output.clone(),
                    ready_http: daemon.ready_http.clone(),
                    ready_port: daemon.ready_port,
                    wait_ready: false,
                };
                if let Err(e) = self.run(retry_opts).await {
                    error!("failed to retry daemon {}: {}", id, e);
                }
            } else {
                warn!("no run command found for daemon {}, cannot retry", id);
                self.upsert_daemon(UpsertDaemonOpts {
                    id,
                    retry_count: Some(daemon.retry),
                    ..Default::default()
                })
                .await?;
            }
        }

        Ok(())
    }

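    /// Called when the last shell pid tracked for `dir` goes away. Autostop
    /// daemons under that directory are stopped immediately, or scheduled for
    /// a delayed stop when `PITCHFORK_AUTOSTOP_DELAY` is non-zero.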
    async fn leave_dir(&self, dir: &Path) -> Result<()> {
        debug!("left dir {}", dir.display());
        let shell_dirs = self.get_dirs_with_shell_pids().await;
        let shell_dirs = shell_dirs.keys().collect_vec();
        let delay_secs = *env::PITCHFORK_AUTOSTOP_DELAY;

        for daemon in self.active_daemons().await {
            if !daemon.autostop {
                continue;
            }
            if let Some(daemon_dir) = daemon.dir.as_ref()
                && daemon_dir.starts_with(dir)
                && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
            {
                if delay_secs == 0 {
                    info!("autostopping {daemon}");
                    self.stop(&daemon.id).await?;
                    self.add_notification(Info, format!("autostopped {daemon}"))
                        .await;
                } else {
                    let stop_at = time::Instant::now() + Duration::from_secs(delay_secs);
                    let mut pending = self.pending_autostops.lock().await;
                    if !pending.contains_key(&daemon.id) {
                        info!("scheduling autostop for {} in {}s", daemon.id, delay_secs);
                        pending.insert(daemon.id.clone(), stop_at);
                    }
                }
            }
        }
        Ok(())
    }

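    /// Cancels delayed autostops for daemons whose directory contains, or is
    /// contained by, the directory a shell just entered.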
    async fn cancel_pending_autostops_for_dir(&self, dir: &Path) {
        let mut pending = self.pending_autostops.lock().await;
        let daemons_to_cancel: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    d.dir.as_ref().is_some_and(|daemon_dir| {
                        dir.starts_with(daemon_dir) || daemon_dir.starts_with(dir)
                    })
                })
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in daemons_to_cancel {
            if pending.remove(&daemon_id).is_some() {
                info!("cancelled pending autostop for {}", daemon_id);
            }
        }
    }

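    /// Stops daemons whose delayed autostop deadline has passed, re-checking
    /// that they are still running, still marked autostop, and that no shell
    /// is active under their directory.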
    async fn process_pending_autostops(&self) -> Result<()> {
        let now = time::Instant::now();
        let to_stop: Vec<String> = {
            let pending = self.pending_autostops.lock().await;
            pending
                .iter()
                .filter(|(_, stop_at)| now >= **stop_at)
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in to_stop {
            {
                let mut pending = self.pending_autostops.lock().await;
                pending.remove(&daemon_id);
            }

            if let Some(daemon) = self.get_daemon(&daemon_id).await
                && daemon.autostop
                && daemon.status.is_running()
            {
                let shell_dirs = self.get_dirs_with_shell_pids().await;
                let shell_dirs = shell_dirs.keys().collect_vec();
                if let Some(daemon_dir) = daemon.dir.as_ref()
                    && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
                {
                    info!("autostopping {} (after delay)", daemon_id);
                    self.stop(&daemon_id).await?;
                    self.add_notification(Info, format!("autostopped {daemon_id}"))
                        .await;
                }
            }
        }
        Ok(())
    }

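    /// Starts every daemon in the merged pitchfork.toml configuration that
    /// sets `boot_start = true`, using the config file's directory as the
    /// working directory.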
    async fn start_boot_daemons(&self) -> Result<()> {
        use crate::pitchfork_toml::PitchforkToml;

        info!("Scanning for boot_start daemons");
        let pt = PitchforkToml::all_merged();

        let boot_daemons: Vec<_> = pt
            .daemons
            .iter()
            .filter(|(_id, d)| d.boot_start.unwrap_or(false))
            .collect();

        if boot_daemons.is_empty() {
            info!("No daemons configured with boot_start = true");
            return Ok(());
        }

        info!("Found {} daemon(s) to start at boot", boot_daemons.len());

        for (id, daemon) in boot_daemons {
            info!("Starting boot daemon: {}", id);

            let dir = daemon
                .path
                .as_ref()
                .and_then(|p| p.parent())
                .map(|p| p.to_path_buf())
                .unwrap_or_else(|| env::CWD.clone());

            let run_opts = RunOptions {
                id: id.clone(),
                cmd: shell_words::split(&daemon.run).unwrap_or_default(),
                force: false,
                shell_pid: None,
                dir,
                autostop: false,
                cron_schedule: daemon.cron.as_ref().map(|c| c.schedule.clone()),
                cron_retrigger: daemon.cron.as_ref().map(|c| c.retrigger),
                retry: daemon.retry,
                retry_count: 0,
                ready_delay: daemon.ready_delay,
                ready_output: daemon.ready_output.clone(),
                ready_http: daemon.ready_http.clone(),
                ready_port: daemon.ready_port,
                wait_ready: false,
            };

            match self.run(run_opts).await {
                Ok(IpcResponse::DaemonStart { .. }) | Ok(IpcResponse::DaemonReady { .. }) => {
                    info!("Successfully started boot daemon: {}", id);
                }
                Ok(IpcResponse::DaemonAlreadyRunning) => {
                    info!("Boot daemon already running: {}", id);
                }
                Ok(other) => {
                    warn!(
                        "Unexpected response when starting boot daemon {}: {:?}",
                        id, other
                    );
                }
                Err(e) => {
                    error!("Failed to start boot daemon {}: {}", id, e);
                }
            }
        }

        Ok(())
    }

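    /// Starts (or, with `force`, restarts) a daemon. When `wait_ready` and
    /// `retry` are both set, failed starts are retried with exponential
    /// backoff; otherwise the daemon is launched once via `run_once`.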
    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd.clone();

        {
            let mut pending = self.pending_autostops.lock().await;
            if pending.remove(id).is_some() {
                info!("cleared pending autostop for {} (daemon starting)", id);
            }
        }

        let daemon = self.get_daemon(id).await;
        if let Some(daemon) = daemon {
            if !daemon.status.is_stopping()
                && !daemon.status.is_stopped()
                && let Some(pid) = daemon.pid
            {
                if opts.force {
                    self.stop(id).await?;
                    info!("run: stop completed for daemon {id}");
                } else {
                    warn!("daemon {id} already running with pid {pid}");
                    return Ok(IpcResponse::DaemonAlreadyRunning);
                }
            }
        }

        if opts.wait_ready && opts.retry > 0 {
            let max_attempts = opts.retry + 1;
            for attempt in 0..max_attempts {
                let mut retry_opts = opts.clone();
                retry_opts.retry_count = attempt;
                retry_opts.cmd = cmd.clone();

                let result = self.run_once(retry_opts).await?;

                match result {
                    IpcResponse::DaemonReady { daemon } => {
                        return Ok(IpcResponse::DaemonReady { daemon });
                    }
                    IpcResponse::DaemonFailedWithCode { exit_code } => {
                        if attempt < opts.retry {
                            let backoff_secs = 2u64.pow(attempt);
                            info!(
                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
                                attempt + 1,
                                max_attempts,
                                backoff_secs
                            );
                            time::sleep(Duration::from_secs(backoff_secs)).await;
                            continue;
                        } else {
                            info!("daemon {id} failed after {} attempts", max_attempts);
                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
                        }
                    }
                    other => return Ok(other),
                }
            }
        }

        self.run_once(opts).await
    }

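    /// Spawns the daemon once via `sh -c "exec …"`, records it in the state
    /// file, and hands stdout/stderr off to a monitoring task. When
    /// `wait_ready` is set, the reply is deferred until a readiness signal
    /// (output pattern, HTTP probe, TCP port, or delay) resolves.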
    async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd;

        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let cmd = once("exec".to_string())
            .chain(cmd.into_iter())
            .collect_vec();
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
        if let Some(parent) = log_path.parent() {
            xx::file::mkdirp(parent)?;
        }
        info!("run: spawning daemon {id} with args: {args:?}");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(&args)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .current_dir(&opts.dir);

        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = match child.id() {
            Some(p) => p,
            None => {
                warn!("Daemon {id} exited before PID could be captured");
                return Ok(IpcResponse::DaemonFailed {
                    error: "Process exited immediately".to_string(),
                });
            }
        };
        info!("started daemon {id} with pid {pid}");
        let daemon = self
            .upsert_daemon(UpsertDaemonOpts {
                id: id.to_string(),
                pid: Some(pid),
                status: DaemonStatus::Running,
                shell_pid: opts.shell_pid,
                dir: Some(opts.dir.clone()),
                autostop: opts.autostop,
                cron_schedule: opts.cron_schedule.clone(),
                cron_retrigger: opts.cron_retrigger,
                last_exit_success: None,
                retry: Some(opts.retry),
                retry_count: Some(opts.retry_count),
                ready_delay: opts.ready_delay,
                ready_output: opts.ready_output.clone(),
                ready_http: opts.ready_http.clone(),
                ready_port: opts.ready_port,
            })
            .await?;

        let id_clone = id.to_string();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();
        let ready_http = opts.ready_http.clone();
        let ready_port = opts.ready_port;

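        // Monitoring task: tails the child's stdout/stderr into the log file,
        // drives the readiness checks, and records the final exit status in
        // the state file once the process terminates.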
        tokio::spawn(async move {
            let id = id_clone;
            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
                (Some(out), Some(err)) => (out, err),
                _ => {
                    error!("Failed to capture stdout/stderr for daemon {id}");
                    return;
                }
            };
            let mut stdout = tokio::io::BufReader::new(stdout).lines();
            let mut stderr = tokio::io::BufReader::new(stderr).lines();
            let log_file = match tokio::fs::File::options()
                .append(true)
                .create(true)
                .open(&log_path)
                .await
            {
                Ok(f) => f,
                Err(e) => {
                    error!("Failed to open log file for daemon {id}: {e}");
                    return;
                }
            };
            let mut log_appender = BufWriter::new(log_file);

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            let format_line = |line: String| {
                if line.starts_with(&format!("{id} ")) {
                    format!("{} {line}\n", now())
                } else {
                    format!("{} {id} {line}\n", now())
                }
            };

            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern =
                ready_output
                    .as_ref()
                    .and_then(|pattern| match regex::Regex::new(pattern) {
                        Ok(re) => Some(re),
                        Err(e) => {
                            error!("invalid regex pattern for daemon {id}: {e}");
                            None
                        }
                    });

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            let mut http_check_interval = ready_http
                .as_ref()
                .map(|_| tokio::time::interval(Duration::from_millis(500)));
            let http_client = ready_http.as_ref().map(|_| {
                reqwest::Client::builder()
                    .timeout(Duration::from_secs(5))
                    .build()
                    .unwrap_or_default()
            });

            let mut port_check_interval =
                ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));

            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            let child_pid = child.id().unwrap_or(0);
            tokio::spawn(async move {
                let result = child.wait().await;
                debug!(
                    "daemon pid {child_pid} wait() completed with result: {:?}",
                    result
                );
                let _ = exit_tx.send(result).await;
            });

            let mut exit_status = None;

            loop {
                select! {
                    Ok(Some(line)) = stdout.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                        trace!("stdout: {id} {formatted}");

                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line) {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    }
                    Ok(Some(line)) = stderr.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                        trace!("stderr: {id} {formatted}");

                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line) {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    },
                    Some(result) = exit_rx.recv() => {
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                let is_success = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .map(|s| s.success())
                                    .unwrap_or(false);

                                if is_success {
                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
                                    let _ = tx.send(Ok(()));
                                } else {
                                    let exit_code = exit_status.as_ref()
                                        .and_then(|r| r.as_ref().ok())
                                        .and_then(|s| s.code());
                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
                                    let _ = tx.send(Err(exit_code));
                                }
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending notification");
                        }
                        break;
                    }
                    _ = async {
                        if let Some(ref mut interval) = http_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_http.is_some() => {
                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                            match client.get(url).send().await {
                                Ok(response) if response.status().is_success() => {
                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                    ready_notified = true;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    http_check_interval = None;
                                }
                                Ok(response) => {
                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                                }
                                Err(e) => {
                                    trace!("daemon {id} HTTP check failed: {e}");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut interval) = port_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_port.is_some() => {
                        if let Some(port) = ready_port {
                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                                Ok(_) => {
                                    info!("daemon {id} ready: TCP port {port} is listening");
                                    ready_notified = true;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    port_check_interval = None;
                                }
                                Err(_) => {
                                    trace!("daemon {id} port check: port {port} not listening yet");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        delay_timer = None;
                    }
                    else => break,
                }
            }

            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
            {
                return;
            }
            let is_stopping = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
                return;
            }
            if let Ok(status) = exit_status {
                info!("daemon {id} exited with status {status}");
                if status.success() || is_stopping {
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Stopped,
                            last_exit_success: Some(status.success()),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                } else {
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Errored(status.code()),
                            last_exit_success: Some(false),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                }
            } else if let Err(e) = SUPERVISOR
                .upsert_daemon(UpsertDaemonOpts {
                    id: id.clone(),
                    pid: None,
                    status: DaemonStatus::Errored(None),
                    last_exit_success: Some(false),
                    ..Default::default()
                })
                .await
            {
                error!("Failed to update daemon state for {id}: {e}");
            }
        });

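        // When the caller asked to wait for readiness, block the IPC reply on
        // the readiness channel; otherwise reply immediately with DaemonStart.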
        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }

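    /// Stops a daemon by marking it as Stopping, then killing its pid and all
    /// child pids. The supervisor itself ("pitchfork") cannot be stopped here.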
    pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
        if id == "pitchfork" {
            return Ok(IpcResponse::Error(
                "Cannot stop supervisor via stop command".into(),
            ));
        }
        info!("stopping daemon: {id}");
        if let Some(daemon) = self.get_daemon(id).await {
            trace!("daemon to stop: {daemon}");
            if let Some(pid) = daemon.pid {
                trace!("killing pid: {pid}");
                PROCS.refresh_processes();
                if PROCS.is_running(pid) {
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        status: DaemonStatus::Stopping,
                        ..Default::default()
                    })
                    .await?;

                    if let Err(e) = PROCS.kill_async(pid).await {
                        warn!("failed to kill pid {pid}: {e}");
                    }
                    PROCS.refresh_processes();
                    for child_pid in PROCS.all_children(pid) {
                        debug!("killing child pid: {child_pid}");
                        if let Err(e) = PROCS.kill_async(child_pid).await {
                            warn!("failed to kill child pid {child_pid}: {e}");
                        }
                    }
                } else {
                    debug!("pid {pid} not running");
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        ..Default::default()
                    })
                    .await?;
                }
                return Ok(IpcResponse::Ok);
            } else {
                debug!("daemon {id} not running");
            }
        } else {
            debug!("daemon {id} not found");
        }
        Ok(IpcResponse::DaemonAlreadyStopped)
    }

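    /// Installs signal handlers. The first terminating signal triggers a
    /// graceful shutdown; a second one exits immediately.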
    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            let stream = match signal::unix::signal(signal) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Failed to register signal handler for {:?}: {}", signal, e);
                    continue;
                }
            };
            tokio::spawn(async move {
                let mut stream = stream;
                loop {
                    stream.recv().await;
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

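    /// Periodically refreshes daemon/shell state, skipping a tick when a
    /// refresh already ran within the configured interval.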
    fn interval_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(interval_duration());
            loop {
                interval.tick().await;
                if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > interval_duration()
                    && let Err(err) = SUPERVISOR.refresh().await
                {
                    error!("failed to refresh: {err}");
                }
            }
        });
        Ok(())
    }

    fn cron_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(60));
            loop {
                interval.tick().await;
                if let Err(err) = SUPERVISOR.check_cron_schedules().await {
                    error!("failed to check cron schedules: {err}");
                }
            }
        });
        Ok(())
    }

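    /// Triggers daemons whose cron schedule fires within the next minute,
    /// honoring the daemon's retrigger policy (finish/always/success/fail).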
    async fn check_cron_schedules(&self) -> Result<()> {
        use cron::Schedule;
        use std::str::FromStr;

        let now = chrono::Local::now();
        let daemons = self.state_file.lock().await.daemons.clone();

        for (id, daemon) in daemons {
            if let Some(schedule_str) = &daemon.cron_schedule
                && let Some(retrigger) = daemon.cron_retrigger
            {
                let schedule = match Schedule::from_str(schedule_str) {
                    Ok(s) => s,
                    Err(e) => {
                        warn!("invalid cron schedule for daemon {id}: {e}");
                        continue;
                    }
                };

                let should_trigger = schedule.upcoming(chrono::Local).take(1).any(|next| {
                    let diff = next.signed_duration_since(now);
                    diff.num_seconds() < 60 && diff.num_seconds() >= 0
                });

                if should_trigger {
                    let should_run = match retrigger {
                        crate::pitchfork_toml::CronRetrigger::Finish => daemon.pid.is_none(),
                        crate::pitchfork_toml::CronRetrigger::Always => true,
                        crate::pitchfork_toml::CronRetrigger::Success => {
                            daemon.pid.is_none() && daemon.last_exit_success.unwrap_or(false)
                        }
                        crate::pitchfork_toml::CronRetrigger::Fail => {
                            daemon.pid.is_none() && !daemon.last_exit_success.unwrap_or(true)
                        }
                    };

                    if should_run {
                        info!("cron: triggering daemon {id} (retrigger: {retrigger:?})");
                        if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                            let dir = daemon.dir.clone().unwrap_or_else(|| env::CWD.clone());
                            let force =
                                matches!(retrigger, crate::pitchfork_toml::CronRetrigger::Always);
                            let opts = RunOptions {
                                id: id.clone(),
                                cmd: shell_words::split(&run_cmd).unwrap_or_default(),
                                force,
                                shell_pid: None,
                                dir,
                                autostop: daemon.autostop,
                                cron_schedule: Some(schedule_str.clone()),
                                cron_retrigger: Some(retrigger),
                                retry: daemon.retry,
                                retry_count: daemon.retry_count,
                                ready_delay: daemon.ready_delay,
                                ready_output: daemon.ready_output.clone(),
                                ready_http: daemon.ready_http.clone(),
                                ready_port: daemon.ready_port,
                                wait_ready: false,
                            };
                            if let Err(e) = self.run(opts).await {
                                error!("failed to run cron daemon {id}: {e}");
                            }
                        } else {
                            warn!("no run command found for cron daemon {id}");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    fn get_daemon_run_command(&self, id: &str) -> Option<String> {
        use crate::pitchfork_toml::PitchforkToml;
        let pt = PitchforkToml::all_merged();
        pt.daemons.get(id).map(|d| d.run.clone())
    }

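    /// Accept loop for the IPC server: each request is handled on its own
    /// task so a slow handler cannot block other clients.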
    async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
        loop {
            let (msg, send) = match ipc.read().await {
                Ok(msg) => msg,
                Err(e) => {
                    error!("failed to accept connection: {:?}", e);
                    continue;
                }
            };
            debug!("received message: {:?}", msg);
            tokio::spawn(async move {
                let rsp = SUPERVISOR
                    .handle_ipc(msg)
                    .await
                    .unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
                if let Err(err) = send.send(rsp).await {
                    debug!("failed to send message: {:?}", err);
                }
            });
        }
    }

    async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
        let rsp = match req {
            IpcRequest::Connect => {
                debug!("received connect message");
                IpcResponse::Ok
            }
            IpcRequest::Stop { id } => self.stop(&id).await?,
            IpcRequest::Run(opts) => self.run(opts).await?,
            IpcRequest::Enable { id } => {
                if self.enable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::Disable { id } => {
                if self.disable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::GetActiveDaemons => {
                let daemons = self.active_daemons().await;
                IpcResponse::ActiveDaemons(daemons)
            }
            IpcRequest::GetNotifications => {
                let notifications = self.get_notifications().await;
                IpcResponse::Notifications(notifications)
            }
            IpcRequest::UpdateShellDir { shell_pid, dir } => {
                let prev = self.get_shell_dir(shell_pid).await;
                self.set_shell_dir(shell_pid, dir.clone()).await?;
                self.cancel_pending_autostops_for_dir(&dir).await;
                if let Some(prev) = prev {
                    self.leave_dir(&prev).await?;
                }
                self.refresh().await?;
                IpcResponse::Ok
            }
            IpcRequest::Clean => {
                self.clean().await?;
                IpcResponse::Ok
            }
            IpcRequest::GetDisabledDaemons => {
                let disabled = self.state_file.lock().await.disabled.clone();
                IpcResponse::DisabledDaemons(disabled.into_iter().collect())
            }
        };
        Ok(rsp)
    }

    async fn close(&self) {
        for daemon in self.active_daemons().await {
            if daemon.id == "pitchfork" {
                continue;
            }
            if let Err(err) = self.stop(&daemon.id).await {
                error!("failed to stop daemon {daemon}: {err}");
            }
        }
        let _ = self.remove_daemon("pitchfork").await;
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
    }

    async fn add_notification(&self, level: log::LevelFilter, message: String) {
        self.pending_notifications
            .lock()
            .await
            .push((level, message));
    }

    async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
        self.pending_notifications.lock().await.drain(..).collect()
    }

    async fn active_daemons(&self) -> Vec<Daemon> {
        self.state_file
            .lock()
            .await
            .daemons
            .values()
            .filter(|d| d.pid.is_some() && d.id != "pitchfork")
            .cloned()
            .collect()
    }

    async fn remove_daemon(&self, id: &str) -> Result<()> {
        self.state_file.lock().await.daemons.remove(id);
        if let Err(err) = self.state_file.lock().await.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(())
    }

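    /// Inserts or updates a daemon entry in the state file, preserving any
    /// existing fields the caller did not explicitly override, and persists
    /// the result to disk.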
    async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
        info!(
            "upserting daemon: {} pid: {} status: {}",
            opts.id,
            opts.pid.unwrap_or(0),
            opts.status
        );
        let mut state_file = self.state_file.lock().await;
        let existing = state_file.daemons.get(&opts.id);
        let daemon = Daemon {
            id: opts.id.to_string(),
            title: opts.pid.and_then(|pid| PROCS.title(pid)),
            pid: opts.pid,
            status: opts.status,
            shell_pid: opts.shell_pid,
            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
            cron_schedule: opts
                .cron_schedule
                .or(existing.and_then(|d| d.cron_schedule.clone())),
            cron_retrigger: opts
                .cron_retrigger
                .or(existing.and_then(|d| d.cron_retrigger)),
            last_exit_success: opts
                .last_exit_success
                .or(existing.and_then(|d| d.last_exit_success)),
            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
            retry_count: opts
                .retry_count
                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
            ready_output: opts
                .ready_output
                .or(existing.and_then(|d| d.ready_output.clone())),
            ready_http: opts
                .ready_http
                .or(existing.and_then(|d| d.ready_http.clone())),
            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
        };
        state_file
            .daemons
            .insert(opts.id.to_string(), daemon.clone());
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(daemon)
    }

    pub async fn enable(&self, id: String) -> Result<bool> {
        info!("enabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.remove(&id);
        state_file.write()?;
        Ok(result)
    }

    pub async fn disable(&self, id: String) -> Result<bool> {
        info!("disabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.insert(id);
        state_file.write()?;
        Ok(result)
    }

    async fn get_daemon(&self, id: &str) -> Option<Daemon> {
        self.state_file.lock().await.daemons.get(id).cloned()
    }

    async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
        state_file.write()?;
        Ok(())
    }

    async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
        self.state_file
            .lock()
            .await
            .shell_dirs
            .get(&shell_pid.to_string())
            .cloned()
    }

    async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        if state_file
            .shell_dirs
            .remove(&shell_pid.to_string())
            .is_some()
        {
            state_file.write()?;
        }
        Ok(())
    }

    async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
        self.state_file.lock().await.shell_dirs.iter().fold(
            HashMap::new(),
            |mut acc, (pid, dir)| {
                if let Ok(pid) = pid.parse() {
                    acc.entry(dir.clone()).or_default().push(pid);
                }
                acc
            },
        )
    }

    async fn clean(&self) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.retain(|_id, d| d.pid.is_some());
        state_file.write()?;
        Ok(())
    }
}

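/// Partial update for a daemon entry; fields left at their defaults fall back
/// to the existing state in `upsert_daemon`.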
#[derive(Debug)]
struct UpsertDaemonOpts {
    id: String,
    pid: Option<u32>,
    status: DaemonStatus,
    shell_pid: Option<u32>,
    dir: Option<PathBuf>,
    autostop: bool,
    cron_schedule: Option<String>,
    cron_retrigger: Option<crate::pitchfork_toml::CronRetrigger>,
    last_exit_success: Option<bool>,
    retry: Option<u32>,
    retry_count: Option<u32>,
    ready_delay: Option<u64>,
    ready_output: Option<String>,
    ready_http: Option<String>,
    ready_port: Option<u16>,
}

impl Default for UpsertDaemonOpts {
    fn default() -> Self {
        Self {
            id: "".to_string(),
            pid: None,
            status: DaemonStatus::Stopped,
            shell_pid: None,
            dir: None,
            autostop: false,
            cron_schedule: None,
            cron_retrigger: None,
            last_exit_success: None,
            retry: None,
            retry_count: None,
            ready_delay: None,
            ready_output: None,
            ready_http: None,
            ready_port: None,
        }
    }
}