1use crate::daemon::{Daemon, RunOptions, validate_daemon_id};
2use crate::daemon_status::DaemonStatus;
3use crate::ipc::server::{IpcServer, IpcServerHandle};
4use crate::ipc::{IpcRequest, IpcResponse};
5use crate::pitchfork_toml::PitchforkToml;
6use crate::procs::PROCS;
7use crate::state_file::StateFile;
8use crate::watch_files::{WatchFiles, expand_watch_patterns, path_matches_patterns};
9use crate::{Result, env};
10use duct::cmd;
11use itertools::Itertools;
12use log::LevelFilter::Info;
13use miette::IntoDiagnostic;
14use notify::RecursiveMode;
15use once_cell::sync::Lazy;
16use regex::Regex;
17use std::collections::HashMap;
18use std::fs;
19use std::iter::once;
20use std::path::{Path, PathBuf};
21use std::process::exit;
22use std::sync::atomic;
23use std::sync::atomic::AtomicBool;
24use std::time::Duration;
25use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
26#[cfg(unix)]
27use tokio::signal::unix::SignalKind;
28use tokio::sync::Mutex;
29use tokio::sync::oneshot;
30use tokio::{select, signal, time};
31
32static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
34 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
35
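/// Returns a compiled [`Regex`] for `pattern`, using the process-wide
/// `REGEX_CACHE` so repeated readiness checks don't recompile the same
/// pattern. Logs an error and returns `None` if the pattern is invalid.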
36fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
38 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
39 if let Some(re) = cache.get(pattern) {
40 return Some(re.clone());
41 }
42 match Regex::new(pattern) {
43 Ok(re) => {
44 cache.insert(pattern.to_string(), re.clone());
45 Some(re)
46 }
47 Err(e) => {
48 error!("invalid regex pattern '{}': {}", pattern, e);
49 None
50 }
51 }
52}
53
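/// Shared state for the supervisor process: the persisted state file, queued
/// notifications for shells, the last refresh time, autostops scheduled after
/// a delay, and the IPC server shutdown handle.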
54pub struct Supervisor {
55 state_file: Mutex<StateFile>,
56 pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
57 last_refreshed_at: Mutex<time::Instant>,
58 pending_autostops: Mutex<HashMap<String, time::Instant>>,
60 ipc_shutdown: Mutex<Option<IpcServerHandle>>,
62}
63
64fn interval_duration() -> Duration {
65 Duration::from_secs(*env::PITCHFORK_INTERVAL_SECS)
66}
67
68pub static SUPERVISOR: Lazy<Supervisor> =
69 Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
70
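/// Starts the supervisor in the background unless the state file already
/// records a running `pitchfork` daemon.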
71pub fn start_if_not_running() -> Result<()> {
72 let sf = StateFile::get();
73 if let Some(d) = sf.daemons.get("pitchfork")
74 && let Some(pid) = d.pid
75 && PROCS.is_running(pid)
76 {
77 return Ok(());
78 }
79 start_in_background()
80}
81
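/// Spawns `pitchfork supervisor run` as a detached background process with
/// stdout and stderr discarded.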
82pub fn start_in_background() -> Result<()> {
83 debug!("starting supervisor in background");
84 cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
85 .stdout_null()
86 .stderr_null()
87 .start()
88 .into_diagnostic()?;
89 Ok(())
90}
91
92impl Supervisor {
93 pub fn new() -> Result<Self> {
94 Ok(Self {
95 state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
96 last_refreshed_at: Mutex::new(time::Instant::now()),
97 pending_notifications: Mutex::new(vec![]),
98 pending_autostops: Mutex::new(HashMap::new()),
99 ipc_shutdown: Mutex::new(None),
100 })
101 }
102
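/// Main entry point for the supervisor: registers our own pid as the
/// `pitchfork` daemon, optionally starts `boot_start` daemons, spawns the
/// refresh/cron/signal/file-watch background tasks (and the web server when a
/// port is given), then serves IPC requests.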
103 pub async fn start(&self, is_boot: bool, web_port: Option<u16>) -> Result<()> {
104 let pid = std::process::id();
105 info!("Starting supervisor with pid {pid}");
106
107 self.upsert_daemon(UpsertDaemonOpts {
108 id: "pitchfork".to_string(),
109 pid: Some(pid),
110 status: DaemonStatus::Running,
111 ..Default::default()
112 })
113 .await?;
114
115 if is_boot {
117 info!("Boot start mode enabled, starting boot_start daemons");
118 self.start_boot_daemons().await?;
119 }
120
121 self.interval_watch()?;
122 self.cron_watch()?;
123 self.signals()?;
124 self.daemon_file_watch()?;
125
126 if let Some(port) = web_port {
128 tokio::spawn(async move {
129 if let Err(e) = crate::web::serve(port).await {
130 error!("Web server error: {}", e);
131 }
132 });
133 }
134
135 let (ipc, ipc_handle) = IpcServer::new()?;
136 *self.ipc_shutdown.lock().await = Some(ipc_handle);
137 self.conn_watch(ipc).await
138 }
139
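/// Periodic housekeeping: re-checks tracked shell pids, removes shells that
/// have exited (running autostop logic for directories no shell occupies any
/// more), retries errored daemons, and applies any delayed autostops that are
/// due.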
140 async fn refresh(&self) -> Result<()> {
141 trace!("refreshing");
142
143 let dirs_with_pids = self.get_dirs_with_shell_pids().await;
146 let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
147
148 if pids_to_check.is_empty() {
149 trace!("no shell PIDs to check, skipping process refresh");
151 } else {
152 PROCS.refresh_pids(&pids_to_check);
153 }
154
155 let mut last_refreshed_at = self.last_refreshed_at.lock().await;
156 *last_refreshed_at = time::Instant::now();
157
158 for (dir, pids) in dirs_with_pids {
159 let to_remove = pids
160 .iter()
161 .filter(|pid| !PROCS.is_running(**pid))
162 .collect_vec();
163 for pid in &to_remove {
164 self.remove_shell_pid(**pid).await?
165 }
166 if to_remove.len() == pids.len() {
167 self.leave_dir(&dir).await?;
168 }
169 }
170
171 self.check_retry().await?;
172 self.process_pending_autostops().await?;
173
174 Ok(())
175 }
176
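/// Re-runs errored daemons that still have retry attempts left, using the run
/// command from the merged `pitchfork.toml` configuration.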
177 async fn check_retry(&self) -> Result<()> {
178 let ids_to_retry: Vec<String> = {
180 let state_file = self.state_file.lock().await;
181 state_file
182 .daemons
183 .iter()
184 .filter(|(_id, d)| {
185 d.status.is_errored()
187 && d.pid.is_none()
188 && d.retry > 0
189 && d.retry_count < d.retry
190 })
191 .map(|(id, _d)| id.clone())
192 .collect()
193 };
194
195 for id in ids_to_retry {
196 let daemon = {
199 let state_file = self.state_file.lock().await;
200 match state_file.daemons.get(&id) {
201 Some(d)
202 if d.status.is_errored()
203 && d.pid.is_none()
204 && d.retry > 0
205 && d.retry_count < d.retry =>
206 {
207 d.clone()
208 }
_ => continue,
}
211 };
212 info!(
213 "retrying daemon {} ({}/{} attempts)",
214 id,
215 daemon.retry_count + 1,
216 daemon.retry
217 );
218
219 if let Some(run_cmd) = self.get_daemon_run_command(&id) {
221 let cmd = match shell_words::split(&run_cmd) {
222 Ok(cmd) => cmd,
223 Err(e) => {
224 error!("failed to parse command for daemon {}: {}", id, e);
225 self.upsert_daemon(UpsertDaemonOpts {
227 id,
228 status: daemon.status.clone(),
229 retry_count: Some(daemon.retry),
230 ..Default::default()
231 })
232 .await?;
233 continue;
234 }
235 };
236 let retry_opts = RunOptions {
237 id: id.clone(),
238 cmd,
239 force: false,
240 shell_pid: daemon.shell_pid,
241 dir: daemon.dir.unwrap_or_else(|| env::CWD.clone()),
242 autostop: daemon.autostop,
243 cron_schedule: daemon.cron_schedule,
244 cron_retrigger: daemon.cron_retrigger,
245 retry: daemon.retry,
246 retry_count: daemon.retry_count + 1,
247 ready_delay: daemon.ready_delay,
248 ready_output: daemon.ready_output.clone(),
249 ready_http: daemon.ready_http.clone(),
250 ready_port: daemon.ready_port,
251 wait_ready: false,
252 depends: daemon.depends.clone(),
253 };
254 if let Err(e) = self.run(retry_opts).await {
255 error!("failed to retry daemon {}: {}", id, e);
256 }
257 } else {
258 warn!("no run command found for daemon {}, cannot retry", id);
259 self.upsert_daemon(UpsertDaemonOpts {
261 id,
262 retry_count: Some(daemon.retry),
263 ..Default::default()
264 })
265 .await?;
266 }
267 }
268
269 Ok(())
270 }
271
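/// Called when the last shell has left `dir`: autostop-enabled daemons under
/// that directory are stopped immediately, or scheduled for a delayed stop
/// when `PITCHFORK_AUTOSTOP_DELAY` is non-zero.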
272 async fn leave_dir(&self, dir: &Path) -> Result<()> {
273 debug!("left dir {}", dir.display());
274 let shell_dirs = self.get_dirs_with_shell_pids().await;
275 let shell_dirs = shell_dirs.keys().collect_vec();
276 let delay_secs = *env::PITCHFORK_AUTOSTOP_DELAY;
277
278 for daemon in self.active_daemons().await {
279 if !daemon.autostop {
280 continue;
281 }
282 if let Some(daemon_dir) = daemon.dir.as_ref()
286 && daemon_dir.starts_with(dir)
287 && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
288 {
289 if delay_secs == 0 {
290 info!("autostopping {daemon}");
292 self.stop(&daemon.id).await?;
293 self.add_notification(Info, format!("autostopped {daemon}"))
294 .await;
295 } else {
296 let stop_at = time::Instant::now() + Duration::from_secs(delay_secs);
298 let mut pending = self.pending_autostops.lock().await;
299 if !pending.contains_key(&daemon.id) {
300 info!("scheduling autostop for {} in {}s", daemon.id, delay_secs);
301 pending.insert(daemon.id.clone(), stop_at);
302 }
303 }
304 }
305 }
306 Ok(())
307 }
308
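/// Cancels delayed autostops for daemons whose directory is related to `dir`,
/// e.g. because a shell has entered it again.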
309 async fn cancel_pending_autostops_for_dir(&self, dir: &Path) {
313 let mut pending = self.pending_autostops.lock().await;
314 let daemons_to_cancel: Vec<String> = {
315 let state_file = self.state_file.lock().await;
316 state_file
317 .daemons
318 .iter()
319 .filter(|(_id, d)| {
320 d.dir.as_ref().is_some_and(|daemon_dir| {
321 dir.starts_with(daemon_dir) || daemon_dir.starts_with(dir)
324 })
325 })
326 .map(|(id, _)| id.clone())
327 .collect()
328 };
329
330 for daemon_id in daemons_to_cancel {
331 if pending.remove(&daemon_id).is_some() {
332 info!("cancelled pending autostop for {}", daemon_id);
333 }
334 }
335 }
336
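/// Stops daemons whose delayed autostop deadline has passed, re-checking that
/// they are still running, still autostop-enabled, and that no shell remains
/// in their directory.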
337 async fn process_pending_autostops(&self) -> Result<()> {
339 let now = time::Instant::now();
340 let to_stop: Vec<String> = {
341 let pending = self.pending_autostops.lock().await;
342 pending
343 .iter()
344 .filter(|(_, stop_at)| now >= **stop_at)
345 .map(|(id, _)| id.clone())
346 .collect()
347 };
348
349 for daemon_id in to_stop {
350 {
352 let mut pending = self.pending_autostops.lock().await;
353 pending.remove(&daemon_id);
354 }
355
356 if let Some(daemon) = self.get_daemon(&daemon_id).await
358 && daemon.autostop
359 && daemon.status.is_running()
360 {
361 let shell_dirs = self.get_dirs_with_shell_pids().await;
363 let shell_dirs = shell_dirs.keys().collect_vec();
364 if let Some(daemon_dir) = daemon.dir.as_ref()
365 && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
366 {
367 info!("autostopping {} (after delay)", daemon_id);
368 self.stop(&daemon_id).await?;
369 self.add_notification(Info, format!("autostopped {daemon_id}"))
370 .await;
371 }
372 }
373 }
374 Ok(())
375 }
376
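/// Starts every daemon in the merged `pitchfork.toml` configuration that sets
/// `boot_start = true`.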
377 async fn start_boot_daemons(&self) -> Result<()> {
378 use crate::pitchfork_toml::PitchforkToml;
379
380 info!("Scanning for boot_start daemons");
381 let pt = PitchforkToml::all_merged();
382
383 let boot_daemons: Vec<_> = pt
384 .daemons
385 .iter()
386 .filter(|(_id, d)| d.boot_start.unwrap_or(false))
387 .collect();
388
389 if boot_daemons.is_empty() {
390 info!("No daemons configured with boot_start = true");
391 return Ok(());
392 }
393
394 info!("Found {} daemon(s) to start at boot", boot_daemons.len());
395
396 for (id, daemon) in boot_daemons {
397 info!("Starting boot daemon: {}", id);
398
399 let dir = daemon
400 .path
401 .as_ref()
402 .and_then(|p| p.parent())
403 .map(|p| p.to_path_buf())
404 .unwrap_or_else(|| env::CWD.clone());
405
406 let cmd = match shell_words::split(&daemon.run) {
407 Ok(cmd) => cmd,
408 Err(e) => {
409 error!("failed to parse command for boot daemon {}: {}", id, e);
410 continue;
411 }
412 };
413 let run_opts = RunOptions {
414 id: id.clone(),
415 cmd,
416 force: false,
417 shell_pid: None,
418 dir,
autostop: false,
cron_schedule: daemon.cron.as_ref().map(|c| c.schedule.clone()),
421 cron_retrigger: daemon.cron.as_ref().map(|c| c.retrigger),
422 retry: daemon.retry.count(),
423 retry_count: 0,
424 ready_delay: daemon.ready_delay,
425 ready_output: daemon.ready_output.clone(),
426 ready_http: daemon.ready_http.clone(),
427 ready_port: daemon.ready_port,
wait_ready: false,
depends: daemon.depends.clone(),
430 };
431
432 match self.run(run_opts).await {
433 Ok(IpcResponse::DaemonStart { .. }) | Ok(IpcResponse::DaemonReady { .. }) => {
434 info!("Successfully started boot daemon: {}", id);
435 }
436 Ok(IpcResponse::DaemonAlreadyRunning) => {
437 info!("Boot daemon already running: {}", id);
438 }
439 Ok(other) => {
440 warn!(
441 "Unexpected response when starting boot daemon {}: {:?}",
442 id, other
443 );
444 }
445 Err(e) => {
446 error!("Failed to start boot daemon {}: {}", id, e);
447 }
448 }
449 }
450
451 Ok(())
452 }
453
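/// Starts a daemon. If it is already running it is either stopped first
/// (`force`) or reported as `DaemonAlreadyRunning`. When `wait_ready` is
/// combined with `retry`, failed startups are retried with exponential
/// backoff before giving up.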
454 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
455 let id = &opts.id;
456 let cmd = opts.cmd.clone();
457
458 {
460 let mut pending = self.pending_autostops.lock().await;
461 if pending.remove(id).is_some() {
462 info!("cleared pending autostop for {} (daemon starting)", id);
463 }
464 }
465
466 let daemon = self.get_daemon(id).await;
467 if let Some(daemon) = daemon {
468 if !daemon.status.is_stopping()
471 && !daemon.status.is_stopped()
472 && let Some(pid) = daemon.pid
473 {
474 if opts.force {
475 self.stop(id).await?;
476 info!("run: stop completed for daemon {id}");
477 } else {
478 warn!("daemon {id} already running with pid {pid}");
479 return Ok(IpcResponse::DaemonAlreadyRunning);
480 }
481 }
482 }
483
484 if opts.wait_ready && opts.retry > 0 {
486 let max_attempts = opts.retry.saturating_add(1);
488 for attempt in 0..max_attempts {
489 let mut retry_opts = opts.clone();
490 retry_opts.retry_count = attempt;
491 retry_opts.cmd = cmd.clone();
492
493 let result = self.run_once(retry_opts).await?;
494
495 match result {
496 IpcResponse::DaemonReady { daemon } => {
497 return Ok(IpcResponse::DaemonReady { daemon });
498 }
499 IpcResponse::DaemonFailedWithCode { exit_code } => {
500 if attempt < opts.retry {
501 let backoff_secs = 2u64.pow(attempt);
502 info!(
503 "daemon {id} failed (attempt {}/{}), retrying in {}s",
504 attempt + 1,
505 max_attempts,
506 backoff_secs
507 );
508 time::sleep(Duration::from_secs(backoff_secs)).await;
509 continue;
510 } else {
511 info!("daemon {id} failed after {} attempts", max_attempts);
512 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
513 }
514 }
515 other => return Ok(other),
516 }
517 }
518 }
519
520 self.run_once(opts).await
522 }
523
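/// Spawns the daemon once via `sh -c` with an `exec`-prefixed command, records
/// it in the state file, and tails its stdout/stderr into the per-daemon log
/// file while running readiness checks (output pattern, HTTP endpoint, TCP
/// port, or fixed delay) and updating its status when the process exits.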
524 async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
525 let id = &opts.id;
526 let cmd = opts.cmd;
527
528 let (ready_tx, ready_rx) = if opts.wait_ready {
530 let (tx, rx) = oneshot::channel();
531 (Some(tx), Some(rx))
532 } else {
533 (None, None)
534 };
535
536 let cmd = once("exec".to_string())
537 .chain(cmd.into_iter())
538 .collect_vec();
539 let args = vec!["-c".to_string(), shell_words::join(&cmd)];
540 let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
541 if let Some(parent) = log_path.parent() {
542 xx::file::mkdirp(parent)?;
543 }
544 info!("run: spawning daemon {id} with args: {args:?}");
545 let mut cmd = tokio::process::Command::new("sh");
546 cmd.args(&args)
547 .stdin(std::process::Stdio::null())
548 .stdout(std::process::Stdio::piped())
549 .stderr(std::process::Stdio::piped())
550 .current_dir(&opts.dir);
551
552 if let Some(ref path) = *env::ORIGINAL_PATH {
554 cmd.env("PATH", path);
555 }
556
557 let mut child = cmd.spawn().into_diagnostic()?;
558 let pid = match child.id() {
559 Some(p) => p,
560 None => {
561 warn!("Daemon {id} exited before PID could be captured");
562 return Ok(IpcResponse::DaemonFailed {
563 error: "Process exited immediately".to_string(),
564 });
565 }
566 };
567 info!("started daemon {id} with pid {pid}");
568 let daemon = self
569 .upsert_daemon(UpsertDaemonOpts {
570 id: id.to_string(),
571 pid: Some(pid),
572 status: DaemonStatus::Running,
573 shell_pid: opts.shell_pid,
574 dir: Some(opts.dir.clone()),
575 autostop: opts.autostop,
576 cron_schedule: opts.cron_schedule.clone(),
577 cron_retrigger: opts.cron_retrigger,
578 last_exit_success: None,
579 retry: Some(opts.retry),
580 retry_count: Some(opts.retry_count),
581 ready_delay: opts.ready_delay,
582 ready_output: opts.ready_output.clone(),
583 ready_http: opts.ready_http.clone(),
584 ready_port: opts.ready_port,
585 depends: Some(opts.depends.clone()),
586 })
587 .await?;
588
589 let id_clone = id.to_string();
590 let ready_delay = opts.ready_delay;
591 let ready_output = opts.ready_output.clone();
592 let ready_http = opts.ready_http.clone();
593 let ready_port = opts.ready_port;
594
595 tokio::spawn(async move {
596 let id = id_clone;
597 let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
598 (Some(out), Some(err)) => (out, err),
599 _ => {
600 error!("Failed to capture stdout/stderr for daemon {id}");
601 return;
602 }
603 };
604 let mut stdout = tokio::io::BufReader::new(stdout).lines();
605 let mut stderr = tokio::io::BufReader::new(stderr).lines();
606 let log_file = match tokio::fs::File::options()
607 .append(true)
608 .create(true)
609 .open(&log_path)
610 .await
611 {
612 Ok(f) => f,
613 Err(e) => {
614 error!("Failed to open log file for daemon {id}: {e}");
615 return;
616 }
617 };
618 let mut log_appender = BufWriter::new(log_file);
619
620 let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
621 let format_line = |line: String| {
622 if line.starts_with(&format!("{id} ")) {
623 format!("{} {line}\n", now())
625 } else {
626 format!("{} {id} {line}\n", now())
627 }
628 };
629
630 let mut ready_notified = false;
632 let mut ready_tx = ready_tx;
633 let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
634
635 let mut delay_timer =
636 ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
637
638 let mut http_check_interval = ready_http
640 .as_ref()
641 .map(|_| tokio::time::interval(Duration::from_millis(500)));
642 let http_client = ready_http.as_ref().map(|_| {
643 reqwest::Client::builder()
644 .timeout(Duration::from_secs(5))
645 .build()
646 .unwrap_or_default()
647 });
648
649 let mut port_check_interval =
651 ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));
652
653 let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));
655
656 let (exit_tx, mut exit_rx) =
658 tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
659
660 let child_pid = child.id().unwrap_or(0);
662 tokio::spawn(async move {
663 let result = child.wait().await;
664 debug!(
665 "daemon pid {child_pid} wait() completed with result: {:?}",
666 result
667 );
668 let _ = exit_tx.send(result).await;
669 });
670
671 #[allow(unused_assignments)]
672 let mut exit_status = None;
674
675 loop {
676 select! {
677 Ok(Some(line)) = stdout.next_line() => {
678 let formatted = format_line(line.clone());
679 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
680 error!("Failed to write to log for daemon {id}: {e}");
681 }
682 trace!("stdout: {id} {formatted}");
683
684 if !ready_notified
686 && let Some(ref pattern) = ready_pattern
687 && pattern.is_match(&line) {
688 info!("daemon {id} ready: output matched pattern");
689 ready_notified = true;
690 let _ = log_appender.flush().await;
692 if let Some(tx) = ready_tx.take() {
693 let _ = tx.send(Ok(()));
694 }
695 }
696 }
697 Ok(Some(line)) = stderr.next_line() => {
698 let formatted = format_line(line.clone());
699 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
700 error!("Failed to write to log for daemon {id}: {e}");
701 }
702 trace!("stderr: {id} {formatted}");
703
704 if !ready_notified
706 && let Some(ref pattern) = ready_pattern
707 && pattern.is_match(&line) {
708 info!("daemon {id} ready: output matched pattern");
709 ready_notified = true;
710 let _ = log_appender.flush().await;
712 if let Some(tx) = ready_tx.take() {
713 let _ = tx.send(Ok(()));
714 }
715 }
716 },
717 Some(result) = exit_rx.recv() => {
718 exit_status = Some(result);
720 debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
721 let _ = log_appender.flush().await;
723 if !ready_notified {
724 if let Some(tx) = ready_tx.take() {
725 let is_success = exit_status.as_ref()
727 .and_then(|r| r.as_ref().ok())
728 .map(|s| s.success())
729 .unwrap_or(false);
730
731 if is_success {
732 debug!("daemon {id} exited successfully before ready check, sending success notification");
733 let _ = tx.send(Ok(()));
734 } else {
735 let exit_code = exit_status.as_ref()
736 .and_then(|r| r.as_ref().ok())
737 .and_then(|s| s.code());
738 debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
739 let _ = tx.send(Err(exit_code));
740 }
741 }
742 } else {
743 debug!("daemon {id} was already marked ready, not sending notification");
744 }
745 break;
746 }
747 _ = async {
748 if let Some(ref mut interval) = http_check_interval {
749 interval.tick().await;
750 } else {
751 std::future::pending::<()>().await;
752 }
753 }, if !ready_notified && ready_http.is_some() => {
754 if let (Some(url), Some(client)) = (&ready_http, &http_client) {
755 match client.get(url).send().await {
756 Ok(response) if response.status().is_success() => {
757 info!("daemon {id} ready: HTTP check passed (status {})", response.status());
758 ready_notified = true;
759 let _ = log_appender.flush().await;
761 if let Some(tx) = ready_tx.take() {
762 let _ = tx.send(Ok(()));
763 }
764 http_check_interval = None;
766 }
767 Ok(response) => {
768 trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
769 }
770 Err(e) => {
771 trace!("daemon {id} HTTP check failed: {e}");
772 }
773 }
774 }
775 }
776 _ = async {
777 if let Some(ref mut interval) = port_check_interval {
778 interval.tick().await;
779 } else {
780 std::future::pending::<()>().await;
781 }
782 }, if !ready_notified && ready_port.is_some() => {
783 if let Some(port) = ready_port {
784 match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
785 Ok(_) => {
786 info!("daemon {id} ready: TCP port {port} is listening");
787 ready_notified = true;
788 let _ = log_appender.flush().await;
790 if let Some(tx) = ready_tx.take() {
791 let _ = tx.send(Ok(()));
792 }
793 port_check_interval = None;
795 }
796 Err(_) => {
797 trace!("daemon {id} port check: port {port} not listening yet");
798 }
799 }
800 }
801 }
802 _ = async {
803 if let Some(ref mut timer) = delay_timer {
804 timer.await;
805 } else {
806 std::future::pending::<()>().await;
807 }
808 } => {
809 if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() {
810 info!("daemon {id} ready: delay elapsed");
811 ready_notified = true;
812 let _ = log_appender.flush().await;
814 if let Some(tx) = ready_tx.take() {
815 let _ = tx.send(Ok(()));
816 }
817 }
818 delay_timer = None;
820 }
821 _ = log_flush_interval.tick() => {
822 if let Err(e) = log_appender.flush().await {
824 error!("Failed to flush log for daemon {id}: {e}");
825 }
826 }
827 }
830 }
831
832 if let Err(e) = log_appender.flush().await {
834 error!("Failed to final flush log for daemon {id}: {e}");
835 }
836
837 let exit_status = if let Some(status) = exit_status {
839 status
840 } else {
841 match exit_rx.recv().await {
843 Some(status) => status,
844 None => {
845 warn!("daemon {id} exit channel closed without receiving status");
846 Err(std::io::Error::other("exit channel closed"))
847 }
848 }
849 };
850 let current_daemon = SUPERVISOR.get_daemon(&id).await;
851
852 if current_daemon.is_none()
854 || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
855 {
856 return;
858 }
859 let is_stopping = current_daemon
860 .as_ref()
861 .is_some_and(|d| d.status.is_stopping());
862
863 if current_daemon.is_some_and(|d| d.status.is_stopped()) {
864 return;
866 }
867 if let Ok(status) = exit_status {
868 info!("daemon {id} exited with status {status}");
869 if status.success() || is_stopping {
870 if let Err(e) = SUPERVISOR
873 .upsert_daemon(UpsertDaemonOpts {
874 id: id.clone(),
pid: None,
status: DaemonStatus::Stopped,
877 last_exit_success: Some(status.success()),
878 ..Default::default()
879 })
880 .await
881 {
882 error!("Failed to update daemon state for {id}: {e}");
883 }
884 } else {
885 if let Err(e) = SUPERVISOR
888 .upsert_daemon(UpsertDaemonOpts {
889 id: id.clone(),
890 pid: None,
891 status: DaemonStatus::Errored(status.code()),
892 last_exit_success: Some(false),
893 ..Default::default()
894 })
895 .await
896 {
897 error!("Failed to update daemon state for {id}: {e}");
898 }
899 }
900 } else if let Err(e) = SUPERVISOR
901 .upsert_daemon(UpsertDaemonOpts {
902 id: id.clone(),
903 pid: None,
904 status: DaemonStatus::Errored(None),
905 last_exit_success: Some(false),
906 ..Default::default()
907 })
908 .await
909 {
910 error!("Failed to update daemon state for {id}: {e}");
911 }
912 });
913
914 if let Some(ready_rx) = ready_rx {
916 match ready_rx.await {
917 Ok(Ok(())) => {
918 info!("daemon {id} is ready");
919 Ok(IpcResponse::DaemonReady { daemon })
920 }
921 Ok(Err(exit_code)) => {
922 error!("daemon {id} failed before becoming ready");
923 Ok(IpcResponse::DaemonFailedWithCode { exit_code })
924 }
925 Err(_) => {
926 error!("readiness channel closed unexpectedly for daemon {id}");
927 Ok(IpcResponse::DaemonStart { daemon })
928 }
929 }
930 } else {
931 Ok(IpcResponse::DaemonStart { daemon })
932 }
933 }
934
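/// Stops a daemon by killing its process and any child processes, then marks
/// it stopped in the state file. The `pitchfork` supervisor itself cannot be
/// stopped through this path.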
935 pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
936 if id == "pitchfork" {
937 return Ok(IpcResponse::Error(
938 "Cannot stop supervisor via stop command".into(),
939 ));
940 }
941 info!("stopping daemon: {id}");
942 if let Some(daemon) = self.get_daemon(id).await {
943 trace!("daemon to stop: {daemon}");
944 if let Some(pid) = daemon.pid {
945 trace!("killing pid: {pid}");
946 PROCS.refresh_processes();
947 if PROCS.is_running(pid) {
948 self.upsert_daemon(UpsertDaemonOpts {
950 id: id.to_string(),
951 status: DaemonStatus::Stopping,
952 ..Default::default()
953 })
954 .await?;
955
956 if let Err(e) = PROCS.kill_async(pid).await {
958 warn!("failed to kill pid {pid}: {e}");
959 }
960 PROCS.refresh_processes();
961 for child_pid in PROCS.all_children(pid) {
962 debug!("killing child pid: {child_pid}");
963 if let Err(e) = PROCS.kill_async(child_pid).await {
964 warn!("failed to kill child pid {child_pid}: {e}");
965 }
966 }
967 } else {
969 debug!("pid {pid} not running");
970 self.upsert_daemon(UpsertDaemonOpts {
972 id: id.to_string(),
973 pid: None,
974 status: DaemonStatus::Stopped,
975 ..Default::default()
976 })
977 .await?;
978 }
979 return Ok(IpcResponse::Ok);
980 } else {
981 debug!("daemon {id} not running");
982 }
983 } else {
984 debug!("daemon {id} not found");
985 }
986 Ok(IpcResponse::DaemonAlreadyStopped)
987 }
988
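/// Installs handlers for common Unix termination signals: the first signal
/// triggers a graceful shutdown, a second one exits immediately.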
989 #[cfg(unix)]
990 fn signals(&self) -> Result<()> {
991 let signals = [
992 SignalKind::terminate(),
993 SignalKind::alarm(),
994 SignalKind::interrupt(),
995 SignalKind::quit(),
996 SignalKind::hangup(),
997 SignalKind::user_defined1(),
998 SignalKind::user_defined2(),
999 ];
1000 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
1001 for signal in signals {
1002 let stream = match signal::unix::signal(signal) {
1003 Ok(s) => s,
1004 Err(e) => {
1005 warn!("Failed to register signal handler for {:?}: {}", signal, e);
1006 continue;
1007 }
1008 };
1009 tokio::spawn(async move {
1010 let mut stream = stream;
1011 loop {
1012 stream.recv().await;
1013 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
1014 exit(1);
1015 } else {
1016 SUPERVISOR.handle_signal().await;
1017 }
1018 }
1019 });
1020 }
1021 Ok(())
1022 }
1023
1024 #[cfg(windows)]
1025 fn signals(&self) -> Result<()> {
1026 tokio::spawn(async move {
1027 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
1028 loop {
1029 if let Err(e) = signal::ctrl_c().await {
1030 error!("Failed to wait for ctrl-c: {}", e);
1031 return;
1032 }
1033 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
1034 exit(1);
1035 } else {
1036 SUPERVISOR.handle_signal().await;
1037 }
1038 }
1039 });
1040 Ok(())
1041 }
1042
1043 async fn handle_signal(&self) {
1044 info!("received signal, stopping");
1045 self.close().await;
1046 exit(0)
1047 }
1048
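/// Watches the directories referenced by `watch` patterns in `pitchfork.toml`
/// and restarts the matching daemons when files under them change.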
1049 fn daemon_file_watch(&self) -> Result<()> {
1052 let pt = PitchforkToml::all_merged();
1053
1054 let watch_configs: Vec<(String, Vec<String>, PathBuf)> = pt
1056 .daemons
1057 .iter()
1058 .filter(|(_, d)| !d.watch.is_empty())
1059 .map(|(id, d)| {
1060 let base_dir = d
1061 .path
1062 .as_ref()
1063 .and_then(|p| p.parent())
1064 .map(|p| p.to_path_buf())
1065 .unwrap_or_else(|| env::CWD.clone());
1066 (id.clone(), d.watch.clone(), base_dir)
1067 })
1068 .collect();
1069
1070 if watch_configs.is_empty() {
1071 debug!("No daemons with watch patterns configured");
1072 return Ok(());
1073 }
1074
1075 info!(
1076 "Setting up file watching for {} daemon(s)",
1077 watch_configs.len()
1078 );
1079
1080 let mut all_dirs = std::collections::HashSet::new();
1082 for (id, patterns, base_dir) in &watch_configs {
1083 match expand_watch_patterns(patterns, base_dir) {
1084 Ok(dirs) => {
1085 for dir in &dirs {
1086 debug!("Watching {} for daemon {}", dir.display(), id);
1087 }
1088 all_dirs.extend(dirs);
1089 }
1090 Err(e) => {
1091 warn!("Failed to expand watch patterns for {}: {}", id, e);
1092 }
1093 }
1094 }
1095
1096 if all_dirs.is_empty() {
1097 debug!("No directories to watch after expanding patterns");
1098 return Ok(());
1099 }
1100
1101 tokio::spawn(async move {
1103 let mut wf = match WatchFiles::new(Duration::from_secs(1)) {
1104 Ok(wf) => wf,
1105 Err(e) => {
1106 error!("Failed to create file watcher: {}", e);
1107 return;
1108 }
1109 };
1110
1111 for dir in all_dirs {
1113 if let Err(e) = wf.watch(&dir, RecursiveMode::Recursive) {
1114 warn!("Failed to watch directory {}: {}", dir.display(), e);
1115 }
1116 }
1117
1118 info!("File watcher started");
1119
1120 while let Some(changed_paths) = wf.rx.recv().await {
1122 debug!("File changes detected: {:?}", changed_paths);
1123
1124 let mut daemons_to_restart = std::collections::HashSet::new();
1126
1127 for changed_path in &changed_paths {
1128 for (id, patterns, base_dir) in &watch_configs {
1129 if path_matches_patterns(changed_path, patterns, base_dir) {
1130 info!(
1131 "File {} matched pattern for daemon {}, scheduling restart",
1132 changed_path.display(),
1133 id
1134 );
1135 daemons_to_restart.insert(id.clone());
1136 }
1137 }
1138 }
1139
1140 for id in daemons_to_restart {
1142 if let Err(e) = SUPERVISOR.restart_watched_daemon(&id).await {
1143 error!("Failed to restart daemon {} after file change: {}", id, e);
1144 }
1145 }
1146 }
1147 });
1148
1149 Ok(())
1150 }
1151
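/// Restarts a daemon after a watched file changed, skipping daemons that are
/// not currently running or that have been disabled.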
1152 async fn restart_watched_daemon(&self, id: &str) -> Result<()> {
1155 let daemon = self.get_daemon(id).await;
1157 let is_running = daemon
1158 .as_ref()
1159 .is_some_and(|d| d.pid.is_some() && d.status.is_running());
1160
1161 if !is_running {
1162 debug!(
1163 "Daemon {} is not running, skipping restart on file change",
1164 id
1165 );
1166 return Ok(());
1167 }
1168
1169 let is_disabled = self.state_file.lock().await.disabled.contains(id);
1171 if is_disabled {
1172 debug!("Daemon {} is disabled, skipping restart on file change", id);
1173 return Ok(());
1174 }
1175
1176 info!("Restarting daemon {} due to file change", id);
1177
1178 let pt = PitchforkToml::all_merged();
1180 let Some(daemon_config) = pt.daemons.get(id) else {
1181 warn!("Daemon {} not found in config, cannot restart", id);
1182 return Ok(());
1183 };
1184
1185 let dir = daemon_config
1186 .path
1187 .as_ref()
1188 .and_then(|p| p.parent())
1189 .map(|p| p.to_path_buf())
1190 .unwrap_or_else(|| env::CWD.clone());
1191
1192 let cmd = match shell_words::split(&daemon_config.run) {
1193 Ok(cmd) => cmd,
1194 Err(e) => {
1195 error!("Failed to parse command for daemon {}: {}", id, e);
1196 return Ok(());
1197 }
1198 };
1199
1200 let shell_pid = daemon.as_ref().and_then(|d| d.shell_pid);
1202 let autostop = daemon.as_ref().map(|d| d.autostop).unwrap_or(false);
1203
1204 let _ = self.stop(id).await;
1206
1207 time::sleep(Duration::from_millis(100)).await;
1209
1210 let run_opts = RunOptions {
1212 id: id.to_string(),
1213 cmd,
1214 force: true,
1215 shell_pid,
1216 dir,
1217 autostop,
1218 cron_schedule: daemon_config.cron.as_ref().map(|c| c.schedule.clone()),
1219 cron_retrigger: daemon_config.cron.as_ref().map(|c| c.retrigger),
1220 retry: daemon_config.retry.count(),
1221 retry_count: 0,
1222 ready_delay: daemon_config.ready_delay,
1223 ready_output: daemon_config.ready_output.clone(),
1224 ready_http: daemon_config.ready_http.clone(),
1225 ready_port: daemon_config.ready_port,
wait_ready: false,
depends: daemon_config.depends.clone(),
1228 };
1229
1230 match self.run(run_opts).await {
1231 Ok(IpcResponse::DaemonStart { .. }) | Ok(IpcResponse::DaemonReady { .. }) => {
1232 info!("Successfully restarted daemon {} after file change", id);
1233 }
1234 Ok(other) => {
1235 warn!(
1236 "Unexpected response when restarting daemon {}: {:?}",
1237 id, other
1238 );
1239 }
1240 Err(e) => {
1241 error!("Failed to restart daemon {}: {}", id, e);
1242 }
1243 }
1244
1245 Ok(())
1246 }
1247
1248 fn interval_watch(&self) -> Result<()> {
1249 tokio::spawn(async move {
1250 let mut interval = time::interval(interval_duration());
1251 loop {
1252 interval.tick().await;
1253 if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > interval_duration()
1254 && let Err(err) = SUPERVISOR.refresh().await
1255 {
1256 error!("failed to refresh: {err}");
1257 }
1258 }
1259 });
1260 Ok(())
1261 }
1262
1263 fn cron_watch(&self) -> Result<()> {
1264 tokio::spawn(async move {
1265 let mut interval = time::interval(Duration::from_secs(10));
1267 loop {
1268 interval.tick().await;
1269 if let Err(err) = SUPERVISOR.check_cron_schedules().await {
1270 error!("failed to check cron schedules: {err}");
1271 }
1272 }
1273 });
1274 Ok(())
1275 }
1276
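/// Re-runs daemons whose cron schedule has fired since the last trigger,
/// subject to their retrigger policy (`Finish`, `Always`, `Success`, `Fail`).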
1277 async fn check_cron_schedules(&self) -> Result<()> {
1278 use cron::Schedule;
1279 use std::str::FromStr;
1280
1281 let now = chrono::Local::now();
1282
1283 let cron_daemon_ids: Vec<String> = {
1285 let state_file = self.state_file.lock().await;
1286 state_file
1287 .daemons
1288 .iter()
1289 .filter(|(_id, d)| d.cron_schedule.is_some() && d.cron_retrigger.is_some())
1290 .map(|(id, _d)| id.clone())
1291 .collect()
1292 };
1293
1294 for id in cron_daemon_ids {
1295 let daemon = {
1297 let state_file = self.state_file.lock().await;
1298 match state_file.daemons.get(&id) {
1299 Some(d) => d.clone(),
1300 None => continue,
1301 }
1302 };
1303
1304 if let Some(schedule_str) = &daemon.cron_schedule
1305 && let Some(retrigger) = daemon.cron_retrigger
1306 {
1307 let schedule = match Schedule::from_str(schedule_str) {
1309 Ok(s) => s,
1310 Err(e) => {
1311 warn!("invalid cron schedule for daemon {id}: {e}");
1312 continue;
1313 }
1314 };
1315
1316 let check_since = daemon
1319 .last_cron_triggered
1320 .unwrap_or_else(|| now - chrono::Duration::seconds(10));
1321
1322 let should_trigger = schedule
1324 .after(&check_since)
1325 .take_while(|t| *t <= now)
1326 .next()
1327 .is_some();
1328
1329 if should_trigger {
1330 {
1332 let mut state_file = self.state_file.lock().await;
1333 if let Some(d) = state_file.daemons.get_mut(&id) {
1334 d.last_cron_triggered = Some(now);
1335 }
1336 if let Err(e) = state_file.write() {
1337 error!("failed to update cron trigger time: {e}");
1338 }
1339 }
1340
1341 let should_run = match retrigger {
1342 crate::pitchfork_toml::CronRetrigger::Finish => {
1343 daemon.pid.is_none()
1345 }
1346 crate::pitchfork_toml::CronRetrigger::Always => {
1347 true
1349 }
1350 crate::pitchfork_toml::CronRetrigger::Success => {
1351 daemon.pid.is_none() && daemon.last_exit_success.unwrap_or(false)
1353 }
1354 crate::pitchfork_toml::CronRetrigger::Fail => {
1355 daemon.pid.is_none() && !daemon.last_exit_success.unwrap_or(true)
1357 }
1358 };
1359
1360 if should_run {
1361 info!("cron: triggering daemon {id} (retrigger: {retrigger:?})");
1362 if let Some(run_cmd) = self.get_daemon_run_command(&id) {
1364 let cmd = match shell_words::split(&run_cmd) {
1365 Ok(cmd) => cmd,
1366 Err(e) => {
1367 error!("failed to parse command for cron daemon {}: {}", id, e);
1368 continue;
1369 }
1370 };
1371 let dir = daemon.dir.clone().unwrap_or_else(|| env::CWD.clone());
1372 let force =
1374 matches!(retrigger, crate::pitchfork_toml::CronRetrigger::Always);
1375 let opts = RunOptions {
1376 id: id.clone(),
1377 cmd,
1378 force,
1379 shell_pid: None,
1380 dir,
1381 autostop: daemon.autostop,
1382 cron_schedule: Some(schedule_str.clone()),
1383 cron_retrigger: Some(retrigger),
1384 retry: daemon.retry,
1385 retry_count: daemon.retry_count,
1386 ready_delay: daemon.ready_delay,
1387 ready_output: daemon.ready_output.clone(),
1388 ready_http: daemon.ready_http.clone(),
1389 ready_port: daemon.ready_port,
1390 wait_ready: false,
1391 depends: daemon.depends.clone(),
1392 };
1393 if let Err(e) = self.run(opts).await {
1394 error!("failed to run cron daemon {id}: {e}");
1395 }
1396 } else {
1397 warn!("no run command found for cron daemon {id}");
1398 }
1399 }
1400 }
1401 }
1402 }
1403
1404 Ok(())
1405 }
1406
1407 fn get_daemon_run_command(&self, id: &str) -> Option<String> {
1408 use crate::pitchfork_toml::PitchforkToml;
1409 let pt = PitchforkToml::all_merged();
1410 pt.daemons.get(id).map(|d| d.run.clone())
1411 }
1412
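/// Accepts IPC connections forever, handling each request on its own task.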
1413 async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
1414 loop {
1415 let (msg, send) = match ipc.read().await {
1416 Ok(msg) => msg,
1417 Err(e) => {
1418 error!("failed to accept connection: {:?}", e);
1419 continue;
1420 }
1421 };
1422 debug!("received message: {:?}", msg);
1423 tokio::spawn(async move {
1424 let rsp = SUPERVISOR
1425 .handle_ipc(msg)
1426 .await
1427 .unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
1428 if let Err(err) = send.send(rsp).await {
1429 debug!("failed to send message: {:?}", err);
1430 }
1431 });
1432 }
1433 }
1434
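/// Dispatches a single IPC request to the corresponding supervisor operation,
/// validating daemon ids where applicable.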
1435 async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
1436 let rsp = match req {
1437 IpcRequest::Connect => {
1438 debug!("received connect message");
1439 IpcResponse::Ok
1440 }
1441 IpcRequest::Stop { id } => {
1442 if let Err(e) = validate_daemon_id(&id) {
1443 return Ok(IpcResponse::Error(e));
1444 }
1445 self.stop(&id).await?
1446 }
1447 IpcRequest::Run(opts) => {
1448 if let Err(e) = validate_daemon_id(&opts.id) {
1449 return Ok(IpcResponse::Error(e));
1450 }
1451 self.run(opts).await?
1452 }
1453 IpcRequest::Enable { id } => {
1454 if let Err(e) = validate_daemon_id(&id) {
1455 return Ok(IpcResponse::Error(e));
1456 }
1457 if self.enable(id).await? {
1458 IpcResponse::Yes
1459 } else {
1460 IpcResponse::No
1461 }
1462 }
1463 IpcRequest::Disable { id } => {
1464 if let Err(e) = validate_daemon_id(&id) {
1465 return Ok(IpcResponse::Error(e));
1466 }
1467 if self.disable(id).await? {
1468 IpcResponse::Yes
1469 } else {
1470 IpcResponse::No
1471 }
1472 }
1473 IpcRequest::GetActiveDaemons => {
1474 let daemons = self.active_daemons().await;
1475 IpcResponse::ActiveDaemons(daemons)
1476 }
1477 IpcRequest::GetNotifications => {
1478 let notifications = self.get_notifications().await;
1479 IpcResponse::Notifications(notifications)
1480 }
1481 IpcRequest::UpdateShellDir { shell_pid, dir } => {
1482 let prev = self.get_shell_dir(shell_pid).await;
1483 self.set_shell_dir(shell_pid, dir.clone()).await?;
1484 self.cancel_pending_autostops_for_dir(&dir).await;
1486 if let Some(prev) = prev {
1487 self.leave_dir(&prev).await?;
1488 }
1489 self.refresh().await?;
1490 IpcResponse::Ok
1491 }
1492 IpcRequest::Clean => {
1493 self.clean().await?;
1494 IpcResponse::Ok
1495 }
1496 IpcRequest::GetDisabledDaemons => {
1497 let disabled = self.state_file.lock().await.disabled.clone();
1498 IpcResponse::DisabledDaemons(disabled.into_iter().collect())
1499 }
1500 };
1501 Ok(rsp)
1502 }
1503
1504 async fn close(&self) {
1505 for daemon in self.active_daemons().await {
1506 if daemon.id == "pitchfork" {
1507 continue;
1508 }
1509 if let Err(err) = self.stop(&daemon.id).await {
1510 error!("failed to stop daemon {daemon}: {err}");
1511 }
1512 }
1513 let _ = self.remove_daemon("pitchfork").await;
1514
1515 if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
1517 handle.shutdown();
1518 }
1519
1520 let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
1521 }
1522
1523 async fn add_notification(&self, level: log::LevelFilter, message: String) {
1524 self.pending_notifications
1525 .lock()
1526 .await
1527 .push((level, message));
1528 }
1529
1530 async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
1531 self.pending_notifications.lock().await.drain(..).collect()
1532 }
1533
1534 async fn active_daemons(&self) -> Vec<Daemon> {
1535 self.state_file
1536 .lock()
1537 .await
1538 .daemons
1539 .values()
1540 .filter(|d| d.pid.is_some() && d.id != "pitchfork")
1541 .cloned()
1542 .collect()
1543 }
1544
1545 async fn remove_daemon(&self, id: &str) -> Result<()> {
1546 self.state_file.lock().await.daemons.remove(id);
1547 if let Err(err) = self.state_file.lock().await.write() {
1548 warn!("failed to update state file: {err:#}");
1549 }
1550 Ok(())
1551 }
1552
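/// Inserts or updates a daemon entry in the state file, falling back to the
/// existing entry for fields not supplied in `opts`, and persists the result.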
1553 async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
1554 info!(
1555 "upserting daemon: {} pid: {} status: {}",
1556 opts.id,
1557 opts.pid.unwrap_or(0),
1558 opts.status
1559 );
1560 let mut state_file = self.state_file.lock().await;
1561 let existing = state_file.daemons.get(&opts.id);
1562 let daemon = Daemon {
1563 id: opts.id.to_string(),
1564 title: opts.pid.and_then(|pid| PROCS.title(pid)),
1565 pid: opts.pid,
1566 status: opts.status,
1567 shell_pid: opts.shell_pid,
1568 autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
1569 dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
1570 cron_schedule: opts
1571 .cron_schedule
1572 .or(existing.and_then(|d| d.cron_schedule.clone())),
1573 cron_retrigger: opts
1574 .cron_retrigger
1575 .or(existing.and_then(|d| d.cron_retrigger)),
1576 last_cron_triggered: existing.and_then(|d| d.last_cron_triggered),
1577 last_exit_success: opts
1578 .last_exit_success
1579 .or(existing.and_then(|d| d.last_exit_success)),
1580 retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
1581 retry_count: opts
1582 .retry_count
1583 .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
1584 ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
1585 ready_output: opts
1586 .ready_output
1587 .or(existing.and_then(|d| d.ready_output.clone())),
1588 ready_http: opts
1589 .ready_http
1590 .or(existing.and_then(|d| d.ready_http.clone())),
1591 ready_port: opts.ready_port.or(existing.and_then(|d| d.ready_port)),
1592 depends: opts
1593 .depends
1594 .unwrap_or_else(|| existing.map(|d| d.depends.clone()).unwrap_or_default()),
1595 };
1596 state_file
1597 .daemons
1598 .insert(opts.id.to_string(), daemon.clone());
1599 if let Err(err) = state_file.write() {
1600 warn!("failed to update state file: {err:#}");
1601 }
1602 Ok(daemon)
1603 }
1604
1605 pub async fn enable(&self, id: String) -> Result<bool> {
1606 info!("enabling daemon: {id}");
1607 let mut state_file = self.state_file.lock().await;
1608 let result = state_file.disabled.remove(&id);
1609 state_file.write()?;
1610 Ok(result)
1611 }
1612
1613 pub async fn disable(&self, id: String) -> Result<bool> {
1614 info!("disabling daemon: {id}");
1615 let mut state_file = self.state_file.lock().await;
1616 let result = state_file.disabled.insert(id);
1617 state_file.write()?;
1618 Ok(result)
1619 }
1620
1621 async fn get_daemon(&self, id: &str) -> Option<Daemon> {
1622 self.state_file.lock().await.daemons.get(id).cloned()
1623 }
1624
1625 async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
1626 let mut state_file = self.state_file.lock().await;
1627 state_file.shell_dirs.insert(shell_pid.to_string(), dir);
1628 state_file.write()?;
1629 Ok(())
1630 }
1631
1632 async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
1633 self.state_file
1634 .lock()
1635 .await
1636 .shell_dirs
1637 .get(&shell_pid.to_string())
1638 .cloned()
1639 }
1640
1641 async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
1642 let mut state_file = self.state_file.lock().await;
1643 if state_file
1644 .shell_dirs
1645 .remove(&shell_pid.to_string())
1646 .is_some()
1647 {
1648 state_file.write()?;
1649 }
1650 Ok(())
1651 }
1652
1653 async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
1654 self.state_file.lock().await.shell_dirs.iter().fold(
1655 HashMap::new(),
1656 |mut acc, (pid, dir)| {
1657 if let Ok(pid) = pid.parse() {
1658 acc.entry(dir.clone()).or_default().push(pid);
1659 }
1660 acc
1661 },
1662 )
1663 }
1664
1665 async fn clean(&self) -> Result<()> {
1666 let mut state_file = self.state_file.lock().await;
1667 state_file.daemons.retain(|_id, d| d.pid.is_some());
1668 state_file.write()?;
1669 Ok(())
1670 }
1671}
1672
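/// Options for [`Supervisor::upsert_daemon`]; most unset fields fall back to
/// the values already stored for the daemon.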
1673#[derive(Debug)]
1674struct UpsertDaemonOpts {
1675 id: String,
1676 pid: Option<u32>,
1677 status: DaemonStatus,
1678 shell_pid: Option<u32>,
1679 dir: Option<PathBuf>,
1680 autostop: bool,
1681 cron_schedule: Option<String>,
1682 cron_retrigger: Option<crate::pitchfork_toml::CronRetrigger>,
1683 last_exit_success: Option<bool>,
1684 retry: Option<u32>,
1685 retry_count: Option<u32>,
1686 ready_delay: Option<u64>,
1687 ready_output: Option<String>,
1688 ready_http: Option<String>,
1689 ready_port: Option<u16>,
1690 depends: Option<Vec<String>>,
1691}
1692
1693impl Default for UpsertDaemonOpts {
1694 fn default() -> Self {
1695 Self {
1696 id: "".to_string(),
1697 pid: None,
1698 status: DaemonStatus::Stopped,
1699 shell_pid: None,
1700 dir: None,
1701 autostop: false,
1702 cron_schedule: None,
1703 cron_retrigger: None,
1704 last_exit_success: None,
1705 retry: None,
1706 retry_count: None,
1707 ready_delay: None,
1708 ready_output: None,
1709 ready_http: None,
1710 ready_port: None,
1711 depends: None,
1712 }
1713 }
1714}