use crate::daemon::{Daemon, RunOptions, validate_daemon_id};
use crate::daemon_status::DaemonStatus;
use crate::ipc::server::IpcServer;
use crate::ipc::{IpcRequest, IpcResponse};
use crate::procs::PROCS;
use crate::state_file::StateFile;
use crate::{Result, env};
use duct::cmd;
use itertools::Itertools;
use log::LevelFilter::Info;
use miette::IntoDiagnostic;
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fs;
use std::iter::once;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::{select, signal, time};

static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));

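/// Look up a compiled regex in the process-wide cache, compiling and caching it on a miss.
/// Returns `None` (and logs an error) if the pattern is invalid.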
fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
    if let Some(re) = cache.get(pattern) {
        return Some(re.clone());
    }
    match Regex::new(pattern) {
        Ok(re) => {
            cache.insert(pattern.to_string(), re.clone());
            Some(re)
        }
        Err(e) => {
            error!("invalid regex pattern '{}': {}", pattern, e);
            None
        }
    }
}

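/// Long-running supervisor state: the persisted state file plus in-memory queues for
/// pending notifications, refresh timing, and delayed autostops.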
pub struct Supervisor {
    state_file: Mutex<StateFile>,
    pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    last_refreshed_at: Mutex<time::Instant>,
    pending_autostops: Mutex<HashMap<String, time::Instant>>,
}

fn interval_duration() -> Duration {
    Duration::from_secs(*env::PITCHFORK_INTERVAL_SECS)
}

pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));

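/// Start the supervisor in the background unless the "pitchfork" daemon entry
/// already points at a live process.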
pub fn start_if_not_running() -> Result<()> {
    let sf = StateFile::get();
    if let Some(d) = sf.daemons.get("pitchfork")
        && let Some(pid) = d.pid
        && PROCS.is_running(pid)
    {
        return Ok(());
    }
    start_in_background()
}

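/// Spawn `pitchfork supervisor run` as a background child process with its
/// stdout/stderr discarded.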
pub fn start_in_background() -> Result<()> {
    debug!("starting supervisor in background");
    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
        .stdout_null()
        .stderr_null()
        .start()
        .into_diagnostic()?;
    Ok(())
}

impl Supervisor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
            pending_autostops: Mutex::new(HashMap::new()),
        })
    }

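    /// Run the supervisor: record our own pid in the state file, optionally start
    /// boot daemons and the web server, install interval/cron/signal watchers, and
    /// then serve IPC requests until shutdown.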
    pub async fn start(&self, is_boot: bool, web_port: Option<u16>) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        self.upsert_daemon(UpsertDaemonOpts {
            id: "pitchfork".to_string(),
            pid: Some(pid),
            status: DaemonStatus::Running,
            ..Default::default()
        })
        .await?;

        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        if let Some(port) = web_port {
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port).await {
                    error!("Web server error: {}", e);
                }
            });
        }

        let ipc = IpcServer::new()?;
        self.conn_watch(ipc).await
    }

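    /// Periodic housekeeping: prune shell PIDs that are no longer running, run
    /// autostop handling for directories no shell remains in, and process retries and
    /// pending delayed autostops.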
    async fn refresh(&self) -> Result<()> {
        trace!("refreshing");

        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();

        if pids_to_check.is_empty() {
            trace!("no shell PIDs to check, skipping process refresh");
        } else {
            PROCS.refresh_pids(&pids_to_check);
        }

        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in dirs_with_pids {
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect_vec();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;
        self.process_pending_autostops().await?;

        Ok(())
    }

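    /// Re-run daemons that exited with an error and still have retry attempts left,
    /// bumping their retry_count each time.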
    async fn check_retry(&self) -> Result<()> {
        let ids_to_retry: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    d.status.is_errored()
                        && d.pid.is_none()
                        && d.retry > 0
                        && d.retry_count < d.retry
                })
                .map(|(id, _d)| id.clone())
                .collect()
        };

        for id in ids_to_retry {
            let daemon = {
                let state_file = self.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d)
                        if d.status.is_errored()
                            && d.pid.is_none()
                            && d.retry > 0
                            && d.retry_count < d.retry =>
                    {
                        d.clone()
                    }
                    _ => continue,
                }
            };
            info!(
                "retrying daemon {} ({}/{} attempts)",
                id,
                daemon.retry_count + 1,
                daemon.retry
            );

            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                let cmd = match shell_words::split(&run_cmd) {
                    Ok(cmd) => cmd,
                    Err(e) => {
                        error!("failed to parse command for daemon {}: {}", id, e);
                        self.upsert_daemon(UpsertDaemonOpts {
                            id,
                            status: daemon.status.clone(),
                            retry_count: Some(daemon.retry),
                            ..Default::default()
                        })
                        .await?;
                        continue;
                    }
                };
                let retry_opts = RunOptions {
                    id: id.clone(),
                    cmd,
                    force: false,
                    shell_pid: daemon.shell_pid,
                    dir: daemon.dir.unwrap_or_else(|| env::CWD.clone()),
                    autostop: daemon.autostop,
                    cron_schedule: daemon.cron_schedule,
                    cron_retrigger: daemon.cron_retrigger,
                    retry: daemon.retry,
                    retry_count: daemon.retry_count + 1,
                    ready_delay: daemon.ready_delay,
                    ready_output: daemon.ready_output.clone(),
                    ready_http: daemon.ready_http.clone(),
                    ready_port: daemon.ready_port,
                    wait_ready: false,
                    depends: daemon.depends.clone(),
                };
                if let Err(e) = self.run(retry_opts).await {
                    error!("failed to retry daemon {}: {}", id, e);
                }
            } else {
                warn!("no run command found for daemon {}, cannot retry", id);
                self.upsert_daemon(UpsertDaemonOpts {
                    id,
                    retry_count: Some(daemon.retry),
                    ..Default::default()
                })
                .await?;
            }
        }

        Ok(())
    }

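    /// Called when the last shell has left `dir`: autostop (immediately or after the
    /// configured delay) any autostop-enabled daemon whose directory is no longer
    /// covered by an active shell.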
    async fn leave_dir(&self, dir: &Path) -> Result<()> {
        debug!("left dir {}", dir.display());
        let shell_dirs = self.get_dirs_with_shell_pids().await;
        let shell_dirs = shell_dirs.keys().collect_vec();
        let delay_secs = *env::PITCHFORK_AUTOSTOP_DELAY;

        for daemon in self.active_daemons().await {
            if !daemon.autostop {
                continue;
            }
            if let Some(daemon_dir) = daemon.dir.as_ref()
                && daemon_dir.starts_with(dir)
                && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
            {
                if delay_secs == 0 {
                    info!("autostopping {daemon}");
                    self.stop(&daemon.id).await?;
                    self.add_notification(Info, format!("autostopped {daemon}"))
                        .await;
                } else {
                    let stop_at = time::Instant::now() + Duration::from_secs(delay_secs);
                    let mut pending = self.pending_autostops.lock().await;
                    if !pending.contains_key(&daemon.id) {
                        info!("scheduling autostop for {} in {}s", daemon.id, delay_secs);
                        pending.insert(daemon.id.clone(), stop_at);
                    }
                }
            }
        }
        Ok(())
    }

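    /// Cancel any scheduled autostops for daemons whose directory overlaps `dir`,
    /// e.g. because a shell has just reported that directory again.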
    async fn cancel_pending_autostops_for_dir(&self, dir: &Path) {
        let mut pending = self.pending_autostops.lock().await;
        let daemons_to_cancel: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    d.dir.as_ref().is_some_and(|daemon_dir| {
                        dir.starts_with(daemon_dir) || daemon_dir.starts_with(dir)
                    })
                })
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in daemons_to_cancel {
            if pending.remove(&daemon_id).is_some() {
                info!("cancelled pending autostop for {}", daemon_id);
            }
        }
    }

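    /// Stop daemons whose autostop delay has elapsed, re-checking that no shell has
    /// since returned to their directory.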
    async fn process_pending_autostops(&self) -> Result<()> {
        let now = time::Instant::now();
        let to_stop: Vec<String> = {
            let pending = self.pending_autostops.lock().await;
            pending
                .iter()
                .filter(|(_, stop_at)| now >= **stop_at)
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in to_stop {
            {
                let mut pending = self.pending_autostops.lock().await;
                pending.remove(&daemon_id);
            }

            if let Some(daemon) = self.get_daemon(&daemon_id).await
                && daemon.autostop
                && daemon.status.is_running()
            {
                let shell_dirs = self.get_dirs_with_shell_pids().await;
                let shell_dirs = shell_dirs.keys().collect_vec();
                if let Some(daemon_dir) = daemon.dir.as_ref()
                    && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
                {
                    info!("autostopping {} (after delay)", daemon_id);
                    self.stop(&daemon_id).await?;
                    self.add_notification(Info, format!("autostopped {daemon_id}"))
                        .await;
                }
            }
        }
        Ok(())
    }

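    /// Start every daemon declared with `boot_start = true` in the merged
    /// pitchfork.toml configuration.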
    async fn start_boot_daemons(&self) -> Result<()> {
        use crate::pitchfork_toml::PitchforkToml;

        info!("Scanning for boot_start daemons");
        let pt = PitchforkToml::all_merged();

        let boot_daemons: Vec<_> = pt
            .daemons
            .iter()
            .filter(|(_id, d)| d.boot_start.unwrap_or(false))
            .collect();

        if boot_daemons.is_empty() {
            info!("No daemons configured with boot_start = true");
            return Ok(());
        }

        info!("Found {} daemon(s) to start at boot", boot_daemons.len());

        for (id, daemon) in boot_daemons {
            info!("Starting boot daemon: {}", id);

            let dir = daemon
                .path
                .as_ref()
                .and_then(|p| p.parent())
                .map(|p| p.to_path_buf())
                .unwrap_or_else(|| env::CWD.clone());

            let cmd = match shell_words::split(&daemon.run) {
                Ok(cmd) => cmd,
                Err(e) => {
                    error!("failed to parse command for boot daemon {}: {}", id, e);
                    continue;
                }
            };
            let run_opts = RunOptions {
                id: id.clone(),
                cmd,
                force: false,
                shell_pid: None,
                dir,
                autostop: false,
                cron_schedule: daemon.cron.as_ref().map(|c| c.schedule.clone()),
                cron_retrigger: daemon.cron.as_ref().map(|c| c.retrigger),
                retry: daemon.retry,
                retry_count: 0,
                ready_delay: daemon.ready_delay,
                ready_output: daemon.ready_output.clone(),
                ready_http: daemon.ready_http.clone(),
                ready_port: daemon.ready_port,
                wait_ready: false,
                depends: daemon.depends.clone(),
            };

            match self.run(run_opts).await {
                Ok(IpcResponse::DaemonStart { .. }) | Ok(IpcResponse::DaemonReady { .. }) => {
                    info!("Successfully started boot daemon: {}", id);
                }
                Ok(IpcResponse::DaemonAlreadyRunning) => {
                    info!("Boot daemon already running: {}", id);
                }
                Ok(other) => {
                    warn!(
                        "Unexpected response when starting boot daemon {}: {:?}",
                        id, other
                    );
                }
                Err(e) => {
                    error!("Failed to start boot daemon {}: {}", id, e);
                }
            }
        }

        Ok(())
    }

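    /// Start a daemon from a RunOptions request: clear any pending autostop for the
    /// id, refuse to start (or force-stop) an already-running daemon, and, when the
    /// caller waits for readiness with retries enabled, retry failed starts with
    /// exponential backoff before giving up.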
    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd.clone();

        {
            let mut pending = self.pending_autostops.lock().await;
            if pending.remove(id).is_some() {
                info!("cleared pending autostop for {} (daemon starting)", id);
            }
        }

        let daemon = self.get_daemon(id).await;
        if let Some(daemon) = daemon {
            if !daemon.status.is_stopping()
                && !daemon.status.is_stopped()
                && let Some(pid) = daemon.pid
            {
                if opts.force {
                    self.stop(id).await?;
                    info!("run: stop completed for daemon {id}");
                } else {
                    warn!("daemon {id} already running with pid {pid}");
                    return Ok(IpcResponse::DaemonAlreadyRunning);
                }
            }
        }

        if opts.wait_ready && opts.retry > 0 {
            let max_attempts = opts.retry + 1;
            for attempt in 0..max_attempts {
                let mut retry_opts = opts.clone();
                retry_opts.retry_count = attempt;
                retry_opts.cmd = cmd.clone();

                let result = self.run_once(retry_opts).await?;

                match result {
                    IpcResponse::DaemonReady { daemon } => {
                        return Ok(IpcResponse::DaemonReady { daemon });
                    }
                    IpcResponse::DaemonFailedWithCode { exit_code } => {
                        if attempt < opts.retry {
                            let backoff_secs = 2u64.pow(attempt);
                            info!(
                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
                                attempt + 1,
                                max_attempts,
                                backoff_secs
                            );
                            time::sleep(Duration::from_secs(backoff_secs)).await;
                            continue;
                        } else {
                            info!("daemon {id} failed after {} attempts", max_attempts);
                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
                        }
                    }
                    other => return Ok(other),
                }
            }
        }

        self.run_once(opts).await
    }

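    /// Spawn a single daemon process via `sh -c "exec <cmd>"`, record it in the state
    /// file, and launch a background task that tails stdout/stderr into the daemon's
    /// log file, runs the configured readiness checks (output pattern, HTTP, TCP port,
    /// or delay), and updates the daemon's status when the process exits.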
    async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd;

        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let cmd = once("exec".to_string())
            .chain(cmd.into_iter())
            .collect_vec();
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
        if let Some(parent) = log_path.parent() {
            xx::file::mkdirp(parent)?;
        }
        info!("run: spawning daemon {id} with args: {args:?}");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(&args)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .current_dir(&opts.dir);

        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = match child.id() {
            Some(p) => p,
            None => {
                warn!("Daemon {id} exited before PID could be captured");
                return Ok(IpcResponse::DaemonFailed {
                    error: "Process exited immediately".to_string(),
                });
            }
        };
        info!("started daemon {id} with pid {pid}");
        let daemon = self
            .upsert_daemon(UpsertDaemonOpts {
                id: id.to_string(),
                pid: Some(pid),
                status: DaemonStatus::Running,
                shell_pid: opts.shell_pid,
                dir: Some(opts.dir.clone()),
                autostop: opts.autostop,
                cron_schedule: opts.cron_schedule.clone(),
                cron_retrigger: opts.cron_retrigger,
                last_exit_success: None,
                retry: Some(opts.retry),
                retry_count: Some(opts.retry_count),
                ready_delay: opts.ready_delay,
                ready_output: opts.ready_output.clone(),
                ready_http: opts.ready_http.clone(),
                ready_port: opts.ready_port,
                depends: Some(opts.depends.clone()),
            })
            .await?;

        let id_clone = id.to_string();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();
        let ready_http = opts.ready_http.clone();
        let ready_port = opts.ready_port;

        tokio::spawn(async move {
            let id = id_clone;
            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
                (Some(out), Some(err)) => (out, err),
                _ => {
                    error!("Failed to capture stdout/stderr for daemon {id}");
                    return;
                }
            };
            let mut stdout = tokio::io::BufReader::new(stdout).lines();
            let mut stderr = tokio::io::BufReader::new(stderr).lines();
            let log_file = match tokio::fs::File::options()
                .append(true)
                .create(true)
                .open(&log_path)
                .await
            {
                Ok(f) => f,
                Err(e) => {
                    error!("Failed to open log file for daemon {id}: {e}");
                    return;
                }
            };
            let mut log_appender = BufWriter::new(log_file);

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            let format_line = |line: String| {
                if line.starts_with(&format!("{id} ")) {
                    format!("{} {line}\n", now())
                } else {
                    format!("{} {id} {line}\n", now())
                }
            };

            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            let mut http_check_interval = ready_http
                .as_ref()
                .map(|_| tokio::time::interval(Duration::from_millis(500)));
            let http_client = ready_http.as_ref().map(|_| {
                reqwest::Client::builder()
                    .timeout(Duration::from_secs(5))
                    .build()
                    .unwrap_or_default()
            });

            let mut port_check_interval =
                ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));

            let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));

            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            let child_pid = child.id().unwrap_or(0);
            tokio::spawn(async move {
                let result = child.wait().await;
                debug!(
                    "daemon pid {child_pid} wait() completed with result: {:?}",
                    result
                );
                let _ = exit_tx.send(result).await;
            });

            #[allow(unused_assignments)]
            let mut exit_status = None;

            loop {
                select! {
                    Ok(Some(line)) = stdout.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stdout: {id} {formatted}");

                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line) {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    }
                    Ok(Some(line)) = stderr.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stderr: {id} {formatted}");

                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line) {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    },
                    Some(result) = exit_rx.recv() => {
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
                        let _ = log_appender.flush().await;
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                let is_success = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .map(|s| s.success())
                                    .unwrap_or(false);

                                if is_success {
                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
                                    let _ = tx.send(Ok(()));
                                } else {
                                    let exit_code = exit_status.as_ref()
                                        .and_then(|r| r.as_ref().ok())
                                        .and_then(|s| s.code());
                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
                                    let _ = tx.send(Err(exit_code));
                                }
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending notification");
                        }
                        break;
                    }
                    _ = async {
                        if let Some(ref mut interval) = http_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_http.is_some() => {
                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                            match client.get(url).send().await {
                                Ok(response) if response.status().is_success() => {
                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    http_check_interval = None;
                                }
                                Ok(response) => {
                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                                }
                                Err(e) => {
                                    trace!("daemon {id} HTTP check failed: {e}");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut interval) = port_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_port.is_some() => {
                        if let Some(port) = ready_port {
                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                                Ok(_) => {
                                    info!("daemon {id} ready: TCP port {port} is listening");
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    port_check_interval = None;
                                }
                                Err(_) => {
                                    trace!("daemon {id} port check: port {port} not listening yet");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        delay_timer = None;
                    }
                    _ = log_flush_interval.tick() => {
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                    }
                }
            }

            if let Err(e) = log_appender.flush().await {
                error!("Failed to final flush log for daemon {id}: {e}");
            }

            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
            {
                return;
            }
            let is_stopping = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
                return;
            }
            if let Ok(status) = exit_status {
                info!("daemon {id} exited with status {status}");
                if status.success() || is_stopping {
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Stopped,
                            last_exit_success: Some(status.success()),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                } else {
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Errored(status.code()),
                            last_exit_success: Some(false),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                }
            } else if let Err(e) = SUPERVISOR
                .upsert_daemon(UpsertDaemonOpts {
                    id: id.clone(),
                    pid: None,
                    status: DaemonStatus::Errored(None),
                    last_exit_success: Some(false),
                    ..Default::default()
                })
                .await
            {
                error!("Failed to update daemon state for {id}: {e}");
            }
        });

        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }

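    /// Stop a daemon by id: mark it Stopping, kill its process and any child
    /// processes, and update the state file. The supervisor itself cannot be stopped
    /// through this path.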
    pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
        if id == "pitchfork" {
            return Ok(IpcResponse::Error(
                "Cannot stop supervisor via stop command".into(),
            ));
        }
        info!("stopping daemon: {id}");
        if let Some(daemon) = self.get_daemon(id).await {
            trace!("daemon to stop: {daemon}");
            if let Some(pid) = daemon.pid {
                trace!("killing pid: {pid}");
                PROCS.refresh_processes();
                if PROCS.is_running(pid) {
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        status: DaemonStatus::Stopping,
                        ..Default::default()
                    })
                    .await?;

                    if let Err(e) = PROCS.kill_async(pid).await {
                        warn!("failed to kill pid {pid}: {e}");
                    }
                    PROCS.refresh_processes();
                    for child_pid in PROCS.all_children(pid) {
                        debug!("killing child pid: {child_pid}");
                        if let Err(e) = PROCS.kill_async(child_pid).await {
                            warn!("failed to kill child pid {child_pid}: {e}");
                        }
                    }
                } else {
                    debug!("pid {pid} not running");
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        ..Default::default()
                    })
                    .await?;
                }
                return Ok(IpcResponse::Ok);
            } else {
                debug!("daemon {id} not running");
            }
        } else {
            debug!("daemon {id} not found");
        }
        Ok(IpcResponse::DaemonAlreadyStopped)
    }

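    /// Install signal handlers: the first signal triggers a graceful shutdown via
    /// `handle_signal`, a second signal exits immediately.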
    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            let stream = match signal::unix::signal(signal) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Failed to register signal handler for {:?}: {}", signal, e);
                    continue;
                }
            };
            tokio::spawn(async move {
                let mut stream = stream;
                loop {
                    stream.recv().await;
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

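    /// Spawn the background task that periodically calls `refresh()` when it has not
    /// run within the configured interval.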
    fn interval_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(interval_duration());
            loop {
                interval.tick().await;
                if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > interval_duration()
                    && let Err(err) = SUPERVISOR.refresh().await
                {
                    error!("failed to refresh: {err}");
                }
            }
        });
        Ok(())
    }

    fn cron_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(60));
            loop {
                interval.tick().await;
                if let Err(err) = SUPERVISOR.check_cron_schedules().await {
                    error!("failed to check cron schedules: {err}");
                }
            }
        });
        Ok(())
    }

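    /// Called every minute by `cron_watch`: for each daemon with a cron schedule and
    /// retrigger policy, start it when the next scheduled time falls within the coming
    /// minute and the policy (finish/always/success/fail) allows it.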
    async fn check_cron_schedules(&self) -> Result<()> {
        use cron::Schedule;
        use std::str::FromStr;

        let now = chrono::Local::now();

        let cron_daemon_ids: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| d.cron_schedule.is_some() && d.cron_retrigger.is_some())
                .map(|(id, _d)| id.clone())
                .collect()
        };

        for id in cron_daemon_ids {
            let daemon = {
                let state_file = self.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d) => d.clone(),
                    None => continue,
                }
            };

            if let Some(schedule_str) = &daemon.cron_schedule
                && let Some(retrigger) = daemon.cron_retrigger
            {
                let schedule = match Schedule::from_str(schedule_str) {
                    Ok(s) => s,
                    Err(e) => {
                        warn!("invalid cron schedule for daemon {id}: {e}");
                        continue;
                    }
                };

                let should_trigger = schedule.upcoming(chrono::Local).take(1).any(|next| {
                    let diff = next.signed_duration_since(now);
                    diff.num_seconds() < 60 && diff.num_seconds() >= 0
                });

                if should_trigger {
                    let should_run = match retrigger {
                        crate::pitchfork_toml::CronRetrigger::Finish => {
                            daemon.pid.is_none()
                        }
                        crate::pitchfork_toml::CronRetrigger::Always => {
                            true
                        }
                        crate::pitchfork_toml::CronRetrigger::Success => {
                            daemon.pid.is_none() && daemon.last_exit_success.unwrap_or(false)
                        }
                        crate::pitchfork_toml::CronRetrigger::Fail => {
                            daemon.pid.is_none() && !daemon.last_exit_success.unwrap_or(true)
                        }
                    };

                    if should_run {
                        info!("cron: triggering daemon {id} (retrigger: {retrigger:?})");
                        if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                            let cmd = match shell_words::split(&run_cmd) {
                                Ok(cmd) => cmd,
                                Err(e) => {
                                    error!("failed to parse command for cron daemon {}: {}", id, e);
                                    continue;
                                }
                            };
                            let dir = daemon.dir.clone().unwrap_or_else(|| env::CWD.clone());
                            let force =
                                matches!(retrigger, crate::pitchfork_toml::CronRetrigger::Always);
                            let opts = RunOptions {
                                id: id.clone(),
                                cmd,
                                force,
                                shell_pid: None,
                                dir,
                                autostop: daemon.autostop,
                                cron_schedule: Some(schedule_str.clone()),
                                cron_retrigger: Some(retrigger),
                                retry: daemon.retry,
                                retry_count: daemon.retry_count,
                                ready_delay: daemon.ready_delay,
                                ready_output: daemon.ready_output.clone(),
                                ready_http: daemon.ready_http.clone(),
                                ready_port: daemon.ready_port,
                                wait_ready: false,
                                depends: daemon.depends.clone(),
                            };
                            if let Err(e) = self.run(opts).await {
                                error!("failed to run cron daemon {id}: {e}");
                            }
                        } else {
                            warn!("no run command found for cron daemon {id}");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    fn get_daemon_run_command(&self, id: &str) -> Option<String> {
        use crate::pitchfork_toml::PitchforkToml;
        let pt = PitchforkToml::all_merged();
        pt.daemons.get(id).map(|d| d.run.clone())
    }

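    /// Accept IPC requests forever, handling each one on its own task and sending the
    /// response (or an error) back to the client.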
    async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
        loop {
            let (msg, send) = match ipc.read().await {
                Ok(msg) => msg,
                Err(e) => {
                    error!("failed to accept connection: {:?}", e);
                    continue;
                }
            };
            debug!("received message: {:?}", msg);
            tokio::spawn(async move {
                let rsp = SUPERVISOR
                    .handle_ipc(msg)
                    .await
                    .unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
                if let Err(err) = send.send(rsp).await {
                    debug!("failed to send message: {:?}", err);
                }
            });
        }
    }

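    /// Dispatch a single IPC request to the corresponding supervisor operation,
    /// validating daemon ids before acting on them.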
    async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
        let rsp = match req {
            IpcRequest::Connect => {
                debug!("received connect message");
                IpcResponse::Ok
            }
            IpcRequest::Stop { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                self.stop(&id).await?
            }
            IpcRequest::Run(opts) => {
                if let Err(e) = validate_daemon_id(&opts.id) {
                    return Ok(IpcResponse::Error(e));
                }
                self.run(opts).await?
            }
            IpcRequest::Enable { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                if self.enable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::Disable { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                if self.disable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::GetActiveDaemons => {
                let daemons = self.active_daemons().await;
                IpcResponse::ActiveDaemons(daemons)
            }
            IpcRequest::GetNotifications => {
                let notifications = self.get_notifications().await;
                IpcResponse::Notifications(notifications)
            }
            IpcRequest::UpdateShellDir { shell_pid, dir } => {
                let prev = self.get_shell_dir(shell_pid).await;
                self.set_shell_dir(shell_pid, dir.clone()).await?;
                self.cancel_pending_autostops_for_dir(&dir).await;
                if let Some(prev) = prev {
                    self.leave_dir(&prev).await?;
                }
                self.refresh().await?;
                IpcResponse::Ok
            }
            IpcRequest::Clean => {
                self.clean().await?;
                IpcResponse::Ok
            }
            IpcRequest::GetDisabledDaemons => {
                let disabled = self.state_file.lock().await.disabled.clone();
                IpcResponse::DisabledDaemons(disabled.into_iter().collect())
            }
        };
        Ok(rsp)
    }

    async fn close(&self) {
        for daemon in self.active_daemons().await {
            if daemon.id == "pitchfork" {
                continue;
            }
            if let Err(err) = self.stop(&daemon.id).await {
                error!("failed to stop daemon {daemon}: {err}");
            }
        }
        let _ = self.remove_daemon("pitchfork").await;
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
    }

    async fn add_notification(&self, level: log::LevelFilter, message: String) {
        self.pending_notifications
            .lock()
            .await
            .push((level, message));
    }

    async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
        self.pending_notifications.lock().await.drain(..).collect()
    }

    async fn active_daemons(&self) -> Vec<Daemon> {
        self.state_file
            .lock()
            .await
            .daemons
            .values()
            .filter(|d| d.pid.is_some() && d.id != "pitchfork")
            .cloned()
            .collect()
    }

    async fn remove_daemon(&self, id: &str) -> Result<()> {
        self.state_file.lock().await.daemons.remove(id);
        if let Err(err) = self.state_file.lock().await.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(())
    }

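    /// Insert or update a daemon entry in the state file, preserving existing values
    /// for any field the caller did not set, then persist the state file.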
    async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
        info!(
            "upserting daemon: {} pid: {} status: {}",
            opts.id,
            opts.pid.unwrap_or(0),
            opts.status
        );
        let mut state_file = self.state_file.lock().await;
        let existing = state_file.daemons.get(&opts.id);
        let daemon = Daemon {
            id: opts.id.to_string(),
            title: opts.pid.and_then(|pid| PROCS.title(pid)),
            pid: opts.pid,
            status: opts.status,
            shell_pid: opts.shell_pid,
            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
            cron_schedule: opts
                .cron_schedule
                .or(existing.and_then(|d| d.cron_schedule.clone())),
            cron_retrigger: opts
                .cron_retrigger
                .or(existing.and_then(|d| d.cron_retrigger)),
            last_exit_success: opts
                .last_exit_success
                .or(existing.and_then(|d| d.last_exit_success)),
            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
            retry_count: opts
                .retry_count
                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
            ready_output: opts
                .ready_output
                .or(existing.and_then(|d| d.ready_output.clone())),
            ready_http: opts
                .ready_http
                .or(existing.and_then(|d| d.ready_http.clone())),
            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
            depends: opts
                .depends
                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
        };
        state_file
            .daemons
            .insert(opts.id.to_string(), daemon.clone());
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(daemon)
    }

1401
1402 pub async fn enable(&self, id: String) -> Result<bool> {
1403 info!("enabling daemon: {id}");
1404 let mut state_file = self.state_file.lock().await;
1405 let result = state_file.disabled.remove(&id);
1406 state_file.write()?;
1407 Ok(result)
1408 }
1409
1410 pub async fn disable(&self, id: String) -> Result<bool> {
1411 info!("disabling daemon: {id}");
1412 let mut state_file = self.state_file.lock().await;
1413 let result = state_file.disabled.insert(id);
1414 state_file.write()?;
1415 Ok(result)
1416 }
1417
1418 async fn get_daemon(&self, id: &str) -> Option<Daemon> {
1419 self.state_file.lock().await.daemons.get(id).cloned()
1420 }
1421
1422 async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
1423 let mut state_file = self.state_file.lock().await;
1424 state_file.shell_dirs.insert(shell_pid.to_string(), dir);
1425 state_file.write()?;
1426 Ok(())
1427 }
1428
1429 async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
1430 self.state_file
1431 .lock()
1432 .await
1433 .shell_dirs
1434 .get(&shell_pid.to_string())
1435 .cloned()
1436 }
1437
1438 async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
1439 let mut state_file = self.state_file.lock().await;
1440 if state_file
1441 .shell_dirs
1442 .remove(&shell_pid.to_string())
1443 .is_some()
1444 {
1445 state_file.write()?;
1446 }
1447 Ok(())
1448 }
1449
1450 async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
1451 self.state_file.lock().await.shell_dirs.iter().fold(
1452 HashMap::new(),
1453 |mut acc, (pid, dir)| {
1454 if let Ok(pid) = pid.parse() {
1455 acc.entry(dir.clone()).or_default().push(pid);
1456 }
1457 acc
1458 },
1459 )
1460 }
1461
1462 async fn clean(&self) -> Result<()> {
1463 let mut state_file = self.state_file.lock().await;
1464 state_file.daemons.retain(|_id, d| d.pid.is_some());
1465 state_file.write()?;
1466 Ok(())
1467 }
1468}
1469
#[derive(Debug)]
struct UpsertDaemonOpts {
    id: String,
    pid: Option<u32>,
    status: DaemonStatus,
    shell_pid: Option<u32>,
    dir: Option<PathBuf>,
    autostop: bool,
    cron_schedule: Option<String>,
    cron_retrigger: Option<crate::pitchfork_toml::CronRetrigger>,
    last_exit_success: Option<bool>,
    retry: Option<u32>,
    retry_count: Option<u32>,
    ready_delay: Option<u64>,
    ready_output: Option<String>,
    ready_http: Option<String>,
    ready_port: Option<u16>,
    depends: Option<Vec<String>>,
}

impl Default for UpsertDaemonOpts {
    fn default() -> Self {
        Self {
            id: "".to_string(),
            pid: None,
            status: DaemonStatus::Stopped,
            shell_pid: None,
            dir: None,
            autostop: false,
            cron_schedule: None,
            cron_retrigger: None,
            last_exit_success: None,
            retry: None,
            retry_count: None,
            ready_delay: None,
            ready_output: None,
            ready_http: None,
            ready_port: None,
            depends: None,
        }
    }
}