use crate::daemon::{Daemon, RunOptions, validate_daemon_id};
use crate::daemon_status::DaemonStatus;
use crate::ipc::server::{IpcServer, IpcServerHandle};
use crate::ipc::{IpcRequest, IpcResponse};
use crate::procs::PROCS;
use crate::state_file::StateFile;
use crate::{Result, env};
use duct::cmd;
use itertools::Itertools;
use log::LevelFilter::Info;
use miette::IntoDiagnostic;
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fs;
use std::iter::once;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::{select, signal, time};

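/// Process-wide cache of compiled regexes used for `ready_output` matching, keyed by pattern.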
static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));

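/// Fetches a compiled regex from the cache, compiling and caching it on first use.
/// Returns `None` (and logs an error) if the pattern fails to compile.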
fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
    if let Some(re) = cache.get(pattern) {
        return Some(re.clone());
    }
    match Regex::new(pattern) {
        Ok(re) => {
            cache.insert(pattern.to_string(), re.clone());
            Some(re)
        }
        Err(e) => {
            error!("invalid regex pattern '{}': {}", pattern, e);
            None
        }
    }
}

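/// Long-running supervisor state: the persisted state file, queued notifications,
/// the last refresh time, scheduled (delayed) autostops, and the IPC shutdown handle.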
pub struct Supervisor {
    state_file: Mutex<StateFile>,
    pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    last_refreshed_at: Mutex<time::Instant>,
    pending_autostops: Mutex<HashMap<String, time::Instant>>,
    ipc_shutdown: Mutex<Option<IpcServerHandle>>,
}

fn interval_duration() -> Duration {
    Duration::from_secs(*env::PITCHFORK_INTERVAL_SECS)
}

pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));

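/// Starts the supervisor in the background unless the state file already records a
/// running "pitchfork" daemon.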
pub fn start_if_not_running() -> Result<()> {
    let sf = StateFile::get();
    if let Some(d) = sf.daemons.get("pitchfork")
        && let Some(pid) = d.pid
        && PROCS.is_running(pid)
    {
        return Ok(());
    }
    start_in_background()
}

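/// Spawns `pitchfork supervisor run` detached, with stdout/stderr discarded.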
pub fn start_in_background() -> Result<()> {
    debug!("starting supervisor in background");
    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
        .stdout_null()
        .stderr_null()
        .start()
        .into_diagnostic()?;
    Ok(())
}

impl Supervisor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
            pending_autostops: Mutex::new(HashMap::new()),
            ipc_shutdown: Mutex::new(None),
        })
    }

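    /// Runs the supervisor: records it in the state file, optionally starts boot daemons
    /// and the web UI, spawns the interval/cron watchers and signal handlers, then serves
    /// IPC requests until shutdown.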
    pub async fn start(&self, is_boot: bool, web_port: Option<u16>) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        self.upsert_daemon(UpsertDaemonOpts {
            id: "pitchfork".to_string(),
            pid: Some(pid),
            status: DaemonStatus::Running,
            ..Default::default()
        })
        .await?;

        if is_boot {
            info!("Boot start mode enabled, starting boot_start daemons");
            self.start_boot_daemons().await?;
        }

        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        if let Some(port) = web_port {
            tokio::spawn(async move {
                if let Err(e) = crate::web::serve(port).await {
                    error!("Web server error: {}", e);
                }
            });
        }

        let (ipc, ipc_handle) = IpcServer::new()?;
        *self.ipc_shutdown.lock().await = Some(ipc_handle);
        self.conn_watch(ipc).await
    }

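    /// Periodic housekeeping: drops shell PIDs that are no longer running, triggers
    /// autostop handling for directories with no remaining shells, retries errored
    /// daemons, and processes any due delayed autostops.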
    async fn refresh(&self) -> Result<()> {
        trace!("refreshing");

        let dirs_with_pids = self.get_dirs_with_shell_pids().await;
        let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();

        if pids_to_check.is_empty() {
            trace!("no shell PIDs to check, skipping process refresh");
        } else {
            PROCS.refresh_pids(&pids_to_check);
        }

        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in dirs_with_pids {
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect_vec();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;
        self.process_pending_autostops().await?;

        Ok(())
    }

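    /// Restarts errored daemons that still have retry attempts left.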
    async fn check_retry(&self) -> Result<()> {
        let ids_to_retry: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    d.status.is_errored()
                        && d.pid.is_none()
                        && d.retry > 0
                        && d.retry_count < d.retry
                })
                .map(|(id, _d)| id.clone())
                .collect()
        };

        for id in ids_to_retry {
            let daemon = {
                let state_file = self.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d)
                        if d.status.is_errored()
                            && d.pid.is_none()
                            && d.retry > 0
                            && d.retry_count < d.retry =>
                    {
                        d.clone()
                    }
                    _ => continue,
                }
            };
            info!(
                "retrying daemon {} ({}/{} attempts)",
                id,
                daemon.retry_count + 1,
                daemon.retry
            );

            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                let cmd = match shell_words::split(&run_cmd) {
                    Ok(cmd) => cmd,
                    Err(e) => {
                        error!("failed to parse command for daemon {}: {}", id, e);
                        self.upsert_daemon(UpsertDaemonOpts {
                            id,
                            status: daemon.status.clone(),
                            retry_count: Some(daemon.retry),
                            ..Default::default()
                        })
                        .await?;
                        continue;
                    }
                };
                let retry_opts = RunOptions {
                    id: id.clone(),
                    cmd,
                    force: false,
                    shell_pid: daemon.shell_pid,
                    dir: daemon.dir.unwrap_or_else(|| env::CWD.clone()),
                    autostop: daemon.autostop,
                    cron_schedule: daemon.cron_schedule,
                    cron_retrigger: daemon.cron_retrigger,
                    retry: daemon.retry,
                    retry_count: daemon.retry_count + 1,
                    ready_delay: daemon.ready_delay,
                    ready_output: daemon.ready_output.clone(),
                    ready_http: daemon.ready_http.clone(),
                    ready_port: daemon.ready_port,
                    wait_ready: false,
                    depends: daemon.depends.clone(),
                };
                if let Err(e) = self.run(retry_opts).await {
                    error!("failed to retry daemon {}: {}", id, e);
                }
            } else {
                warn!("no run command found for daemon {}, cannot retry", id);
                self.upsert_daemon(UpsertDaemonOpts {
                    id,
                    retry_count: Some(daemon.retry),
                    ..Default::default()
                })
                .await?;
            }
        }

        Ok(())
    }

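    /// Called when the last shell leaves a directory: autostops (or schedules a delayed
    /// autostop for) daemons under that directory that have `autostop` enabled.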
    async fn leave_dir(&self, dir: &Path) -> Result<()> {
        debug!("left dir {}", dir.display());
        let shell_dirs = self.get_dirs_with_shell_pids().await;
        let shell_dirs = shell_dirs.keys().collect_vec();
        let delay_secs = *env::PITCHFORK_AUTOSTOP_DELAY;

        for daemon in self.active_daemons().await {
            if !daemon.autostop {
                continue;
            }
            if let Some(daemon_dir) = daemon.dir.as_ref()
                && daemon_dir.starts_with(dir)
                && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
            {
                if delay_secs == 0 {
                    info!("autostopping {daemon}");
                    self.stop(&daemon.id).await?;
                    self.add_notification(Info, format!("autostopped {daemon}"))
                        .await;
                } else {
                    let stop_at = time::Instant::now() + Duration::from_secs(delay_secs);
                    let mut pending = self.pending_autostops.lock().await;
                    if !pending.contains_key(&daemon.id) {
                        info!("scheduling autostop for {} in {}s", daemon.id, delay_secs);
                        pending.insert(daemon.id.clone(), stop_at);
                    }
                }
            }
        }
        Ok(())
    }

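    /// Cancels scheduled autostops for daemons whose directory overlaps `dir`, e.g. when
    /// a shell re-enters the directory before the delay elapses.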
    async fn cancel_pending_autostops_for_dir(&self, dir: &Path) {
        let mut pending = self.pending_autostops.lock().await;
        let daemons_to_cancel: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| {
                    d.dir.as_ref().is_some_and(|daemon_dir| {
                        dir.starts_with(daemon_dir) || daemon_dir.starts_with(dir)
                    })
                })
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in daemons_to_cancel {
            if pending.remove(&daemon_id).is_some() {
                info!("cancelled pending autostop for {}", daemon_id);
            }
        }
    }

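    /// Stops daemons whose delayed autostop deadline has passed, re-checking that no
    /// shell has returned to their directory in the meantime.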
    async fn process_pending_autostops(&self) -> Result<()> {
        let now = time::Instant::now();
        let to_stop: Vec<String> = {
            let pending = self.pending_autostops.lock().await;
            pending
                .iter()
                .filter(|(_, stop_at)| now >= **stop_at)
                .map(|(id, _)| id.clone())
                .collect()
        };

        for daemon_id in to_stop {
            {
                let mut pending = self.pending_autostops.lock().await;
                pending.remove(&daemon_id);
            }

            if let Some(daemon) = self.get_daemon(&daemon_id).await
                && daemon.autostop
                && daemon.status.is_running()
            {
                let shell_dirs = self.get_dirs_with_shell_pids().await;
                let shell_dirs = shell_dirs.keys().collect_vec();
                if let Some(daemon_dir) = daemon.dir.as_ref()
                    && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
                {
                    info!("autostopping {} (after delay)", daemon_id);
                    self.stop(&daemon_id).await?;
                    self.add_notification(Info, format!("autostopped {daemon_id}"))
                        .await;
                }
            }
        }
        Ok(())
    }

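    /// Starts every daemon configured with `boot_start = true` in the merged pitchfork.toml files.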
    async fn start_boot_daemons(&self) -> Result<()> {
        use crate::pitchfork_toml::PitchforkToml;

        info!("Scanning for boot_start daemons");
        let pt = PitchforkToml::all_merged();

        let boot_daemons: Vec<_> = pt
            .daemons
            .iter()
            .filter(|(_id, d)| d.boot_start.unwrap_or(false))
            .collect();

        if boot_daemons.is_empty() {
            info!("No daemons configured with boot_start = true");
            return Ok(());
        }

        info!("Found {} daemon(s) to start at boot", boot_daemons.len());

        for (id, daemon) in boot_daemons {
            info!("Starting boot daemon: {}", id);

            let dir = daemon
                .path
                .as_ref()
                .and_then(|p| p.parent())
                .map(|p| p.to_path_buf())
                .unwrap_or_else(|| env::CWD.clone());

            let cmd = match shell_words::split(&daemon.run) {
                Ok(cmd) => cmd,
                Err(e) => {
                    error!("failed to parse command for boot daemon {}: {}", id, e);
                    continue;
                }
            };
            let run_opts = RunOptions {
                id: id.clone(),
                cmd,
                force: false,
                shell_pid: None,
                dir,
                autostop: false,
                cron_schedule: daemon.cron.as_ref().map(|c| c.schedule.clone()),
                cron_retrigger: daemon.cron.as_ref().map(|c| c.retrigger),
                retry: daemon.retry,
                retry_count: 0,
                ready_delay: daemon.ready_delay,
                ready_output: daemon.ready_output.clone(),
                ready_http: daemon.ready_http.clone(),
                ready_port: daemon.ready_port,
                wait_ready: false,
                depends: daemon.depends.clone(),
            };

            match self.run(run_opts).await {
                Ok(IpcResponse::DaemonStart { .. }) | Ok(IpcResponse::DaemonReady { .. }) => {
                    info!("Successfully started boot daemon: {}", id);
                }
                Ok(IpcResponse::DaemonAlreadyRunning) => {
                    info!("Boot daemon already running: {}", id);
                }
                Ok(other) => {
                    warn!(
                        "Unexpected response when starting boot daemon {}: {:?}",
                        id, other
                    );
                }
                Err(e) => {
                    error!("Failed to start boot daemon {}: {}", id, e);
                }
            }
        }

        Ok(())
    }

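    /// Starts a daemon. If `wait_ready` and `retry` are set, retries failed starts with
    /// exponential backoff; otherwise delegates to a single `run_once`.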
    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd.clone();

        {
            let mut pending = self.pending_autostops.lock().await;
            if pending.remove(id).is_some() {
                info!("cleared pending autostop for {} (daemon starting)", id);
            }
        }

        let daemon = self.get_daemon(id).await;
        if let Some(daemon) = daemon {
            if !daemon.status.is_stopping()
                && !daemon.status.is_stopped()
                && let Some(pid) = daemon.pid
            {
                if opts.force {
                    self.stop(id).await?;
                    info!("run: stop completed for daemon {id}");
                } else {
                    warn!("daemon {id} already running with pid {pid}");
                    return Ok(IpcResponse::DaemonAlreadyRunning);
                }
            }
        }

        if opts.wait_ready && opts.retry > 0 {
            let max_attempts = opts.retry + 1;
            for attempt in 0..max_attempts {
                let mut retry_opts = opts.clone();
                retry_opts.retry_count = attempt;
                retry_opts.cmd = cmd.clone();

                let result = self.run_once(retry_opts).await?;

                match result {
                    IpcResponse::DaemonReady { daemon } => {
                        return Ok(IpcResponse::DaemonReady { daemon });
                    }
                    IpcResponse::DaemonFailedWithCode { exit_code } => {
                        if attempt < opts.retry {
                            let backoff_secs = 2u64.pow(attempt);
                            info!(
                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
                                attempt + 1,
                                max_attempts,
                                backoff_secs
                            );
                            time::sleep(Duration::from_secs(backoff_secs)).await;
                            continue;
                        } else {
                            info!("daemon {id} failed after {} attempts", max_attempts);
                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
                        }
                    }
                    other => return Ok(other),
                }
            }
        }

        self.run_once(opts).await
    }

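    /// Spawns the daemon process once via `sh -c`, records it in the state file, and
    /// monitors it in a background task: log capture, readiness checks (output regex,
    /// HTTP, TCP port, or delay), and final status updates on exit.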
    async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd;

        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let cmd = once("exec".to_string())
            .chain(cmd.into_iter())
            .collect_vec();
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
        if let Some(parent) = log_path.parent() {
            xx::file::mkdirp(parent)?;
        }
        info!("run: spawning daemon {id} with args: {args:?}");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(&args)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .current_dir(&opts.dir);

        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = match child.id() {
            Some(p) => p,
            None => {
                warn!("Daemon {id} exited before PID could be captured");
                return Ok(IpcResponse::DaemonFailed {
                    error: "Process exited immediately".to_string(),
                });
            }
        };
        info!("started daemon {id} with pid {pid}");
        let daemon = self
            .upsert_daemon(UpsertDaemonOpts {
                id: id.to_string(),
                pid: Some(pid),
                status: DaemonStatus::Running,
                shell_pid: opts.shell_pid,
                dir: Some(opts.dir.clone()),
                autostop: opts.autostop,
                cron_schedule: opts.cron_schedule.clone(),
                cron_retrigger: opts.cron_retrigger,
                last_exit_success: None,
                retry: Some(opts.retry),
                retry_count: Some(opts.retry_count),
                ready_delay: opts.ready_delay,
                ready_output: opts.ready_output.clone(),
                ready_http: opts.ready_http.clone(),
                ready_port: opts.ready_port,
                depends: Some(opts.depends.clone()),
            })
            .await?;

        let id_clone = id.to_string();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();
        let ready_http = opts.ready_http.clone();
        let ready_port = opts.ready_port;

        tokio::spawn(async move {
            let id = id_clone;
            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
                (Some(out), Some(err)) => (out, err),
                _ => {
                    error!("Failed to capture stdout/stderr for daemon {id}");
                    return;
                }
            };
            let mut stdout = tokio::io::BufReader::new(stdout).lines();
            let mut stderr = tokio::io::BufReader::new(stderr).lines();
            let log_file = match tokio::fs::File::options()
                .append(true)
                .create(true)
                .open(&log_path)
                .await
            {
                Ok(f) => f,
                Err(e) => {
                    error!("Failed to open log file for daemon {id}: {e}");
                    return;
                }
            };
            let mut log_appender = BufWriter::new(log_file);

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            let format_line = |line: String| {
                if line.starts_with(&format!("{id} ")) {
                    format!("{} {line}\n", now())
                } else {
                    format!("{} {id} {line}\n", now())
                }
            };

            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            let mut http_check_interval = ready_http
                .as_ref()
                .map(|_| tokio::time::interval(Duration::from_millis(500)));
            let http_client = ready_http.as_ref().map(|_| {
                reqwest::Client::builder()
                    .timeout(Duration::from_secs(5))
                    .build()
                    .unwrap_or_default()
            });

            let mut port_check_interval =
                ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));

            let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));

            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            let child_pid = child.id().unwrap_or(0);
            tokio::spawn(async move {
                let result = child.wait().await;
                debug!(
                    "daemon pid {child_pid} wait() completed with result: {:?}",
                    result
                );
                let _ = exit_tx.send(result).await;
            });

            #[allow(unused_assignments)]
            let mut exit_status = None;

            loop {
                select! {
                    Ok(Some(line)) = stdout.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stdout: {id} {formatted}");

                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line) {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    }
                    Ok(Some(line)) = stderr.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stderr: {id} {formatted}");

                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line) {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    },
                    Some(result) = exit_rx.recv() => {
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
                        let _ = log_appender.flush().await;
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                let is_success = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .map(|s| s.success())
                                    .unwrap_or(false);

                                if is_success {
                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
                                    let _ = tx.send(Ok(()));
                                } else {
                                    let exit_code = exit_status.as_ref()
                                        .and_then(|r| r.as_ref().ok())
                                        .and_then(|s| s.code());
                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
                                    let _ = tx.send(Err(exit_code));
                                }
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending notification");
                        }
                        break;
                    }
                    _ = async {
                        if let Some(ref mut interval) = http_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_http.is_some() => {
                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                            match client.get(url).send().await {
                                Ok(response) if response.status().is_success() => {
                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    http_check_interval = None;
                                }
                                Ok(response) => {
                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                                }
                                Err(e) => {
                                    trace!("daemon {id} HTTP check failed: {e}");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut interval) = port_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_port.is_some() => {
                        if let Some(port) = ready_port {
                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                                Ok(_) => {
                                    info!("daemon {id} ready: TCP port {port} is listening");
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    port_check_interval = None;
                                }
                                Err(_) => {
                                    trace!("daemon {id} port check: port {port} not listening yet");
                                }
                            }
                        }
                    }
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        delay_timer = None;
                    }
                    _ = log_flush_interval.tick() => {
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                    }
                }
            }

            if let Err(e) = log_appender.flush().await {
                error!("Failed to final flush log for daemon {id}: {e}");
            }

            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
            {
                return;
            }
            let is_stopping = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
                return;
            }
            if let Ok(status) = exit_status {
                info!("daemon {id} exited with status {status}");
                if status.success() || is_stopping {
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Stopped,
                            last_exit_success: Some(status.success()),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                } else {
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Errored(status.code()),
                            last_exit_success: Some(false),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                }
            } else if let Err(e) = SUPERVISOR
                .upsert_daemon(UpsertDaemonOpts {
                    id: id.clone(),
                    pid: None,
                    status: DaemonStatus::Errored(None),
                    last_exit_success: Some(false),
                    ..Default::default()
                })
                .await
            {
                error!("Failed to update daemon state for {id}: {e}");
            }
        });

        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }

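    /// Stops a daemon by killing its process and children and updating the state file.
    /// The supervisor itself cannot be stopped through this path.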
    pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
        if id == "pitchfork" {
            return Ok(IpcResponse::Error(
                "Cannot stop supervisor via stop command".into(),
            ));
        }
        info!("stopping daemon: {id}");
        if let Some(daemon) = self.get_daemon(id).await {
            trace!("daemon to stop: {daemon}");
            if let Some(pid) = daemon.pid {
                trace!("killing pid: {pid}");
                PROCS.refresh_processes();
                if PROCS.is_running(pid) {
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        status: DaemonStatus::Stopping,
                        ..Default::default()
                    })
                    .await?;

                    if let Err(e) = PROCS.kill_async(pid).await {
                        warn!("failed to kill pid {pid}: {e}");
                    }
                    PROCS.refresh_processes();
                    for child_pid in PROCS.all_children(pid) {
                        debug!("killing child pid: {child_pid}");
                        if let Err(e) = PROCS.kill_async(child_pid).await {
                            warn!("failed to kill child pid {child_pid}: {e}");
                        }
                    }
                } else {
                    debug!("pid {pid} not running");
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        ..Default::default()
                    })
                    .await?;
                }
                return Ok(IpcResponse::Ok);
            } else {
                debug!("daemon {id} not running");
            }
        } else {
            debug!("daemon {id} not found");
        }
        Ok(IpcResponse::DaemonAlreadyStopped)
    }

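    /// Installs signal handlers that shut the supervisor down gracefully; a second
    /// signal while shutting down exits immediately.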
    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            let stream = match signal::unix::signal(signal) {
                Ok(s) => s,
                Err(e) => {
                    warn!("Failed to register signal handler for {:?}: {}", signal, e);
                    continue;
                }
            };
            tokio::spawn(async move {
                let mut stream = stream;
                loop {
                    stream.recv().await;
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                if let Err(e) = signal::ctrl_c().await {
                    error!("Failed to wait for ctrl-c: {}", e);
                    return;
                }
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

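    /// Spawns a background task that calls `refresh` on a fixed interval, skipping a
    /// tick if a refresh already ran within the interval.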
    fn interval_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(interval_duration());
            loop {
                interval.tick().await;
                if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > interval_duration()
                    && let Err(err) = SUPERVISOR.refresh().await
                {
                    error!("failed to refresh: {err}");
                }
            }
        });
        Ok(())
    }

    fn cron_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(60));
            loop {
                interval.tick().await;
                if let Err(err) = SUPERVISOR.check_cron_schedules().await {
                    error!("failed to check cron schedules: {err}");
                }
            }
        });
        Ok(())
    }

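    /// Checks each daemon's cron schedule (called once per minute) and re-runs the
    /// daemon when the next occurrence falls within the current minute, subject to its
    /// retrigger policy.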
    async fn check_cron_schedules(&self) -> Result<()> {
        use cron::Schedule;
        use std::str::FromStr;

        let now = chrono::Local::now();

        let cron_daemon_ids: Vec<String> = {
            let state_file = self.state_file.lock().await;
            state_file
                .daemons
                .iter()
                .filter(|(_id, d)| d.cron_schedule.is_some() && d.cron_retrigger.is_some())
                .map(|(id, _d)| id.clone())
                .collect()
        };

        for id in cron_daemon_ids {
            let daemon = {
                let state_file = self.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d) => d.clone(),
                    None => continue,
                }
            };

            if let Some(schedule_str) = &daemon.cron_schedule
                && let Some(retrigger) = daemon.cron_retrigger
            {
                let schedule = match Schedule::from_str(schedule_str) {
                    Ok(s) => s,
                    Err(e) => {
                        warn!("invalid cron schedule for daemon {id}: {e}");
                        continue;
                    }
                };

                let should_trigger = schedule.upcoming(chrono::Local).take(1).any(|next| {
                    let diff = next.signed_duration_since(now);
                    diff.num_seconds() < 60 && diff.num_seconds() >= 0
                });

                if should_trigger {
                    let should_run = match retrigger {
                        crate::pitchfork_toml::CronRetrigger::Finish => daemon.pid.is_none(),
                        crate::pitchfork_toml::CronRetrigger::Always => true,
                        crate::pitchfork_toml::CronRetrigger::Success => {
                            daemon.pid.is_none() && daemon.last_exit_success.unwrap_or(false)
                        }
                        crate::pitchfork_toml::CronRetrigger::Fail => {
                            daemon.pid.is_none() && !daemon.last_exit_success.unwrap_or(true)
                        }
                    };

                    if should_run {
                        info!("cron: triggering daemon {id} (retrigger: {retrigger:?})");
                        if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                            let cmd = match shell_words::split(&run_cmd) {
                                Ok(cmd) => cmd,
                                Err(e) => {
                                    error!("failed to parse command for cron daemon {}: {}", id, e);
                                    continue;
                                }
                            };
                            let dir = daemon.dir.clone().unwrap_or_else(|| env::CWD.clone());
                            let force =
                                matches!(retrigger, crate::pitchfork_toml::CronRetrigger::Always);
                            let opts = RunOptions {
                                id: id.clone(),
                                cmd,
                                force,
                                shell_pid: None,
                                dir,
                                autostop: daemon.autostop,
                                cron_schedule: Some(schedule_str.clone()),
                                cron_retrigger: Some(retrigger),
                                retry: daemon.retry,
                                retry_count: daemon.retry_count,
                                ready_delay: daemon.ready_delay,
                                ready_output: daemon.ready_output.clone(),
                                ready_http: daemon.ready_http.clone(),
                                ready_port: daemon.ready_port,
                                wait_ready: false,
                                depends: daemon.depends.clone(),
                            };
                            if let Err(e) = self.run(opts).await {
                                error!("failed to run cron daemon {id}: {e}");
                            }
                        } else {
                            warn!("no run command found for cron daemon {id}");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    fn get_daemon_run_command(&self, id: &str) -> Option<String> {
        use crate::pitchfork_toml::PitchforkToml;
        let pt = PitchforkToml::all_merged();
        pt.daemons.get(id).map(|d| d.run.clone())
    }

    async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
        loop {
            let (msg, send) = match ipc.read().await {
                Ok(msg) => msg,
                Err(e) => {
                    error!("failed to accept connection: {:?}", e);
                    continue;
                }
            };
            debug!("received message: {:?}", msg);
            tokio::spawn(async move {
                let rsp = SUPERVISOR
                    .handle_ipc(msg)
                    .await
                    .unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
                if let Err(err) = send.send(rsp).await {
                    debug!("failed to send message: {:?}", err);
                }
            });
        }
    }

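    /// Dispatches a single IPC request to the corresponding supervisor operation.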
    async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
        let rsp = match req {
            IpcRequest::Connect => {
                debug!("received connect message");
                IpcResponse::Ok
            }
            IpcRequest::Stop { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                self.stop(&id).await?
            }
            IpcRequest::Run(opts) => {
                if let Err(e) = validate_daemon_id(&opts.id) {
                    return Ok(IpcResponse::Error(e));
                }
                self.run(opts).await?
            }
            IpcRequest::Enable { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                if self.enable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::Disable { id } => {
                if let Err(e) = validate_daemon_id(&id) {
                    return Ok(IpcResponse::Error(e));
                }
                if self.disable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::GetActiveDaemons => {
                let daemons = self.active_daemons().await;
                IpcResponse::ActiveDaemons(daemons)
            }
            IpcRequest::GetNotifications => {
                let notifications = self.get_notifications().await;
                IpcResponse::Notifications(notifications)
            }
            IpcRequest::UpdateShellDir { shell_pid, dir } => {
                let prev = self.get_shell_dir(shell_pid).await;
                self.set_shell_dir(shell_pid, dir.clone()).await?;
                self.cancel_pending_autostops_for_dir(&dir).await;
                if let Some(prev) = prev {
                    self.leave_dir(&prev).await?;
                }
                self.refresh().await?;
                IpcResponse::Ok
            }
            IpcRequest::Clean => {
                self.clean().await?;
                IpcResponse::Ok
            }
            IpcRequest::GetDisabledDaemons => {
                let disabled = self.state_file.lock().await.disabled.clone();
                IpcResponse::DisabledDaemons(disabled.into_iter().collect())
            }
        };
        Ok(rsp)
    }

    async fn close(&self) {
        for daemon in self.active_daemons().await {
            if daemon.id == "pitchfork" {
                continue;
            }
            if let Err(err) = self.stop(&daemon.id).await {
                error!("failed to stop daemon {daemon}: {err}");
            }
        }
        let _ = self.remove_daemon("pitchfork").await;

        if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
            handle.shutdown();
        }

        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
    }

    async fn add_notification(&self, level: log::LevelFilter, message: String) {
        self.pending_notifications
            .lock()
            .await
            .push((level, message));
    }

    async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
        self.pending_notifications.lock().await.drain(..).collect()
    }

    async fn active_daemons(&self) -> Vec<Daemon> {
        self.state_file
            .lock()
            .await
            .daemons
            .values()
            .filter(|d| d.pid.is_some() && d.id != "pitchfork")
            .cloned()
            .collect()
    }

    async fn remove_daemon(&self, id: &str) -> Result<()> {
        self.state_file.lock().await.daemons.remove(id);
        if let Err(err) = self.state_file.lock().await.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(())
    }

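    /// Inserts or updates a daemon record in the state file and persists it. The pid,
    /// status, and shell_pid always come from `opts`; most other unset options fall back
    /// to the existing record's values.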
    async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
        info!(
            "upserting daemon: {} pid: {} status: {}",
            opts.id,
            opts.pid.unwrap_or(0),
            opts.status
        );
        let mut state_file = self.state_file.lock().await;
        let existing = state_file.daemons.get(&opts.id);
        let daemon = Daemon {
            id: opts.id.to_string(),
            title: opts.pid.and_then(|pid| PROCS.title(pid)),
            pid: opts.pid,
            status: opts.status,
            shell_pid: opts.shell_pid,
            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
            cron_schedule: opts
                .cron_schedule
                .or(existing.and_then(|d| d.cron_schedule.clone())),
            cron_retrigger: opts
                .cron_retrigger
                .or(existing.and_then(|d| d.cron_retrigger)),
            last_exit_success: opts
                .last_exit_success
                .or(existing.and_then(|d| d.last_exit_success)),
            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
            retry_count: opts
                .retry_count
                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
            ready_output: opts
                .ready_output
                .or(existing.and_then(|d| d.ready_output.clone())),
            ready_http: opts
                .ready_http
                .or(existing.and_then(|d| d.ready_http.clone())),
            ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
            depends: opts
                .depends
                .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
        };
        state_file
            .daemons
            .insert(opts.id.to_string(), daemon.clone());
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(daemon)
    }

    pub async fn enable(&self, id: String) -> Result<bool> {
        info!("enabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.remove(&id);
        state_file.write()?;
        Ok(result)
    }

    pub async fn disable(&self, id: String) -> Result<bool> {
        info!("disabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.insert(id);
        state_file.write()?;
        Ok(result)
    }

    async fn get_daemon(&self, id: &str) -> Option<Daemon> {
        self.state_file.lock().await.daemons.get(id).cloned()
    }

    async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
        state_file.write()?;
        Ok(())
    }

    async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
        self.state_file
            .lock()
            .await
            .shell_dirs
            .get(&shell_pid.to_string())
            .cloned()
    }

    async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        if state_file
            .shell_dirs
            .remove(&shell_pid.to_string())
            .is_some()
        {
            state_file.write()?;
        }
        Ok(())
    }

    async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
        self.state_file.lock().await.shell_dirs.iter().fold(
            HashMap::new(),
            |mut acc, (pid, dir)| {
                if let Ok(pid) = pid.parse() {
                    acc.entry(dir.clone()).or_default().push(pid);
                }
                acc
            },
        )
    }

    async fn clean(&self) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.retain(|_id, d| d.pid.is_some());
        state_file.write()?;
        Ok(())
    }
}

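/// Options for `Supervisor::upsert_daemon`. Most `Option` fields fall back to the
/// existing record's values when `None`; see `upsert_daemon` for the exact merge rules.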
#[derive(Debug)]
struct UpsertDaemonOpts {
    id: String,
    pid: Option<u32>,
    status: DaemonStatus,
    shell_pid: Option<u32>,
    dir: Option<PathBuf>,
    autostop: bool,
    cron_schedule: Option<String>,
    cron_retrigger: Option<crate::pitchfork_toml::CronRetrigger>,
    last_exit_success: Option<bool>,
    retry: Option<u32>,
    retry_count: Option<u32>,
    ready_delay: Option<u64>,
    ready_output: Option<String>,
    ready_http: Option<String>,
    ready_port: Option<u16>,
    depends: Option<Vec<String>>,
}

impl Default for UpsertDaemonOpts {
    fn default() -> Self {
        Self {
            id: "".to_string(),
            pid: None,
            status: DaemonStatus::Stopped,
            shell_pid: None,
            dir: None,
            autostop: false,
            cron_schedule: None,
            cron_retrigger: None,
            last_exit_success: None,
            retry: None,
            retry_count: None,
            ready_delay: None,
            ready_output: None,
            ready_http: None,
            ready_port: None,
            depends: None,
        }
    }
}