use crate::daemon::{Daemon, RunOptions};
use crate::daemon_status::DaemonStatus;
use crate::ipc::server::IpcServer;
use crate::ipc::{IpcRequest, IpcResponse};
use crate::procs::PROCS;
use crate::state_file::StateFile;
use crate::{env, Result};
use duct::cmd;
use itertools::Itertools;
use log::LevelFilter::Info;
use miette::IntoDiagnostic;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::fs;
use std::iter::once;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tokio::{select, signal, time};

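/// Shared state for the supervisor process: the persisted state file,
/// notifications queued for clients, and the time of the last
/// process-table refresh.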
pub struct Supervisor {
    state_file: Mutex<StateFile>,
    pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    last_refreshed_at: Mutex<time::Instant>,
}

const INTERVAL: Duration = Duration::from_secs(10);

pub static SUPERVISOR: Lazy<Supervisor> =
    Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));

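/// Starts the supervisor in the background unless the state file already
/// records a "pitchfork" daemon whose pid is still alive.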
pub fn start_if_not_running() -> Result<()> {
    let sf = StateFile::get();
    if let Some(d) = sf.daemons.get("pitchfork") {
        if let Some(pid) = d.pid {
            if PROCS.is_running(pid) {
                return Ok(());
            }
        }
    }
    start_in_background()
}

pub fn start_in_background() -> Result<()> {
    debug!("starting supervisor in background");
    cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
        .stdout_null()
        .stderr_null()
        .start()
        .into_diagnostic()?;
    Ok(())
}

impl Supervisor {
    pub fn new() -> Result<Self> {
        Ok(Self {
            state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
            last_refreshed_at: Mutex::new(time::Instant::now()),
            pending_notifications: Mutex::new(vec![]),
        })
    }

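    /// Main entrypoint: registers the supervisor itself as the "pitchfork"
    /// daemon, spawns the interval/cron/signal watchers, then serves IPC
    /// requests until the process is terminated.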
    pub async fn start(&self) -> Result<()> {
        let pid = std::process::id();
        info!("Starting supervisor with pid {pid}");

        self.upsert_daemon(UpsertDaemonOpts {
            id: "pitchfork".to_string(),
            pid: Some(pid),
            status: DaemonStatus::Running,
            ..Default::default()
        })
        .await?;

        self.interval_watch()?;
        self.cron_watch()?;
        self.signals()?;
        let ipc = IpcServer::new()?;
        self.conn_watch(ipc).await
    }

    async fn refresh(&self) -> Result<()> {
        trace!("refreshing");
        PROCS.refresh_processes();
        let mut last_refreshed_at = self.last_refreshed_at.lock().await;
        *last_refreshed_at = time::Instant::now();

        for (dir, pids) in self.get_dirs_with_shell_pids().await {
            let to_remove = pids
                .iter()
                .filter(|pid| !PROCS.is_running(**pid))
                .collect_vec();
            for pid in &to_remove {
                self.remove_shell_pid(**pid).await?
            }
            if to_remove.len() == pids.len() {
                self.leave_dir(&dir).await?;
            }
        }

        self.check_retry().await?;

        Ok(())
    }

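    /// Restarts daemons that exited with an error and still have retry
    /// budget left (`retry_count < retry`). Daemons with a live pid are
    /// never retried.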
    async fn check_retry(&self) -> Result<()> {
        let state_file = self.state_file.lock().await;
        let daemons_to_retry: Vec<(String, Daemon)> = state_file
            .daemons
            .iter()
            .filter(|(_id, d)| {
                d.status.is_errored() && d.pid.is_none() && d.retry > 0 && d.retry_count < d.retry
            })
            .map(|(id, d)| (id.clone(), d.clone()))
            .collect();
        drop(state_file);

        for (id, daemon) in daemons_to_retry {
            info!(
                "retrying daemon {} ({}/{} attempts)",
                id,
                daemon.retry_count + 1,
                daemon.retry
            );

            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                let retry_opts = RunOptions {
                    id: id.clone(),
                    cmd: shell_words::split(&run_cmd).unwrap_or_default(),
                    force: false,
                    shell_pid: daemon.shell_pid,
                    dir: daemon.dir.unwrap_or_else(|| env::CWD.clone()),
                    autostop: daemon.autostop,
                    cron_schedule: daemon.cron_schedule,
                    cron_retrigger: daemon.cron_retrigger,
                    retry: daemon.retry,
                    retry_count: daemon.retry_count + 1,
                    ready_delay: daemon.ready_delay,
                    ready_output: daemon.ready_output.clone(),
                    wait_ready: false,
                };
                if let Err(e) = self.run(retry_opts).await {
                    error!("failed to retry daemon {}: {}", id, e);
                }
            } else {
                warn!("no run command found for daemon {}, cannot retry", id);
                self.upsert_daemon(UpsertDaemonOpts {
                    id,
                    retry_count: Some(daemon.retry),
                    ..Default::default()
                })
                .await?;
            }
        }

        Ok(())
    }

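    /// Invoked when the last shell in `dir` has exited: stops every
    /// autostop daemon whose directory is inside `dir` and is no longer
    /// covered by any remaining shell.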
    async fn leave_dir(&self, dir: &Path) -> Result<()> {
        debug!("left dir {}", dir.display());
        let shell_dirs = self.get_dirs_with_shell_pids().await;
        let shell_dirs = shell_dirs.keys().collect_vec();
        for daemon in self.active_daemons().await {
            if !daemon.autostop {
                continue;
            }
            if let Some(daemon_dir) = daemon.dir.as_ref() {
                if daemon_dir.starts_with(dir)
                    && !shell_dirs.iter().any(|d| d.starts_with(daemon_dir))
                {
                    info!("autostopping {daemon}");
                    self.stop(&daemon.id).await?;
                    self.add_notification(Info, format!("autostopped {daemon}"))
                        .await;
                }
            }
        }
        Ok(())
    }

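    /// Starts a daemon, force-restarting it if requested. When `wait_ready`
    /// is set and `retry > 0`, failed starts are retried with exponential
    /// backoff: attempt 0 sleeps 1s (2^0), attempt 1 sleeps 2s, attempt 2
    /// sleeps 4s, for at most `retry + 1` attempts.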
    async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd.clone();
        let daemon = self.get_daemon(id).await;
        if let Some(daemon) = daemon {
            if !daemon.status.is_stopping() && !daemon.status.is_stopped() {
                if let Some(pid) = daemon.pid {
                    if opts.force {
                        self.stop(id).await?;
                        info!("run: stop completed for daemon {id}");
                    } else {
                        warn!("daemon {id} already running with pid {pid}");
                        return Ok(IpcResponse::DaemonAlreadyRunning);
                    }
                }
            }
        }

        if opts.wait_ready && opts.retry > 0 {
            let max_attempts = opts.retry + 1;
            for attempt in 0..max_attempts {
                let mut retry_opts = opts.clone();
                retry_opts.retry_count = attempt;
                retry_opts.cmd = cmd.clone();

                let result = self.run_once(retry_opts).await?;

                match result {
                    IpcResponse::DaemonReady { daemon } => {
                        return Ok(IpcResponse::DaemonReady { daemon });
                    }
                    IpcResponse::DaemonFailedWithCode { exit_code } => {
                        if attempt < opts.retry {
                            let backoff_secs = 2u64.pow(attempt);
                            info!(
                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
                                attempt + 1,
                                max_attempts,
                                backoff_secs
                            );
                            time::sleep(Duration::from_secs(backoff_secs)).await;
                            continue;
                        } else {
                            info!("daemon {id} failed after {} attempts", max_attempts);
                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
                        }
                    }
                    other => return Ok(other),
                }
            }
        }

        self.run_once(opts).await
    }

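    /// Spawns the daemon once. The command is joined and wrapped as
    /// `sh -c "exec <cmd>"` so the child pid is the daemon itself rather
    /// than a wrapper shell; stdout/stderr are tailed into the daemon's log
    /// file while watching for readiness (output pattern or delay).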
    async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd;

        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let cmd = once("exec".to_string())
            .chain(cmd.into_iter())
            .collect_vec();
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
        xx::file::mkdirp(log_path.parent().unwrap())?;
        info!("run: spawning daemon {id} with args: {args:?}");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(&args)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .current_dir(&opts.dir);

        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = child.id().unwrap();
        info!("started daemon {id} with pid {pid}");
        let daemon = self
            .upsert_daemon(UpsertDaemonOpts {
                id: id.to_string(),
                pid: Some(pid),
                status: DaemonStatus::Running,
                shell_pid: opts.shell_pid,
                dir: Some(opts.dir.clone()),
                autostop: opts.autostop,
                cron_schedule: opts.cron_schedule.clone(),
                cron_retrigger: opts.cron_retrigger,
                last_exit_success: None,
                retry: Some(opts.retry),
                retry_count: Some(opts.retry_count),
                ready_delay: opts.ready_delay,
                ready_output: opts.ready_output.clone(),
            })
            .await?;

        let id_clone = id.to_string();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();

        tokio::spawn(async move {
            let id = id_clone;
            let stdout = child.stdout.take().unwrap();
            let stderr = child.stderr.take().unwrap();
            let mut stdout = tokio::io::BufReader::new(stdout).lines();
            let mut stderr = tokio::io::BufReader::new(stderr).lines();
            let mut log_appender = BufWriter::new(
                tokio::fs::File::options()
                    .append(true)
                    .create(true)
                    .open(&log_path)
                    .await
                    .unwrap(),
            );

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            let format_line = |line: String| {
                if line.starts_with(&format!("{id} ")) {
                    format!("{} {line}\n", now())
                } else {
                    format!("{} {id} {line}\n", now())
                }
            };

            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern =
                ready_output
                    .as_ref()
                    .and_then(|pattern| match regex::Regex::new(pattern) {
                        Ok(re) => Some(re),
                        Err(e) => {
                            error!("invalid regex pattern for daemon {id}: {e}");
                            None
                        }
                    });

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            let child_pid = child.id().unwrap();
            tokio::spawn(async move {
                let result = child.wait().await;
                debug!(
                    "daemon pid {child_pid} wait() completed with result: {:?}",
                    result
                );
                let _ = exit_tx.send(result).await;
            });

            let mut exit_status = None;

            loop {
                select! {
                    Ok(Some(line)) = stdout.next_line() => {
                        let formatted = format_line(line.clone());
                        log_appender.write_all(formatted.as_bytes()).await.unwrap();
                        log_appender.flush().await.unwrap();
                        trace!("stdout: {id} {formatted}");

                        if !ready_notified {
                            if let Some(ref pattern) = ready_pattern {
                                if pattern.is_match(&line) {
                                    info!("daemon {id} ready: output matched pattern");
                                    ready_notified = true;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                }
                            }
                        }
                    }
                    Ok(Some(line)) = stderr.next_line() => {
                        let formatted = format_line(line.clone());
                        log_appender.write_all(formatted.as_bytes()).await.unwrap();
                        log_appender.flush().await.unwrap();
                        trace!("stderr: {id} {formatted}");

                        if !ready_notified {
                            if let Some(ref pattern) = ready_pattern {
                                if pattern.is_match(&line) {
                                    info!("daemon {id} ready: output matched pattern");
                                    ready_notified = true;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                }
                            }
                        }
                    },
                    Some(result) = exit_rx.recv() => {
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                let exit_code = exit_status.as_ref().and_then(|r| r.as_ref().ok().and_then(|s| s.code()));
                                debug!("daemon {id} not ready yet, sending failure notification with exit_code: {:?}", exit_code);
                                let _ = tx.send(Err(exit_code));
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending failure notification");
                        }
                        break;
                    }
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        delay_timer = None;
                    }
                    else => break,
                }
            }

            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            // a different process has since taken over this daemon id; leave
            // its record alone
            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
            {
                return;
            }
            let is_stopping = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
                return;
            }
            if let Ok(status) = exit_status {
                info!("daemon {id} exited with status {status}");
                if status.success() || is_stopping {
                    SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Stopped,
                            last_exit_success: Some(status.success()),
                            ..Default::default()
                        })
                        .await
                        .unwrap();
                } else {
                    SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Errored(status.code()),
                            last_exit_success: Some(false),
                            ..Default::default()
                        })
                        .await
                        .unwrap();
                }
            } else {
                SUPERVISOR
                    .upsert_daemon(UpsertDaemonOpts {
                        id: id.clone(),
                        pid: None,
                        status: DaemonStatus::Errored(None),
                        last_exit_success: Some(false),
                        ..Default::default()
                    })
                    .await
                    .unwrap();
            }
        });

        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }

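    /// Stops a running daemon: marks it `Stopping`, kills its pid, then
    /// kills any child processes still alive after a process-table refresh.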
    async fn stop(&self, id: &str) -> Result<IpcResponse> {
        info!("stopping daemon: {id}");
        if let Some(daemon) = self.get_daemon(id).await {
            trace!("daemon to stop: {daemon}");
            if let Some(pid) = daemon.pid {
                trace!("killing pid: {pid}");
                PROCS.refresh_processes();
                if PROCS.is_running(pid) {
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        status: DaemonStatus::Stopping,
                        ..Default::default()
                    })
                    .await?;

                    if let Err(e) = PROCS.kill_async(pid).await {
                        warn!("failed to kill pid {pid}: {e}");
                    }
                    PROCS.refresh_processes();
                    for child_pid in PROCS.all_children(pid) {
                        debug!("killing child pid: {child_pid}");
                        if let Err(e) = PROCS.kill_async(child_pid).await {
                            warn!("failed to kill child pid {child_pid}: {e}");
                        }
                    }
                } else {
                    debug!("pid {pid} not running");
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        ..Default::default()
                    })
                    .await?;
                }
                return Ok(IpcResponse::Ok);
            } else {
                debug!("daemon {id} not running");
            }
        } else {
            debug!("daemon {id} not found");
        }
        Ok(IpcResponse::DaemonAlreadyStopped)
    }

    #[cfg(unix)]
    fn signals(&self) -> Result<()> {
        let signals = [
            SignalKind::terminate(),
            SignalKind::alarm(),
            SignalKind::interrupt(),
            SignalKind::quit(),
            SignalKind::hangup(),
            SignalKind::user_defined1(),
            SignalKind::user_defined2(),
        ];
        static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
        for signal in signals {
            tokio::spawn(async move {
                let mut stream = signal::unix::signal(signal).unwrap();
                loop {
                    stream.recv().await;
                    // a second signal while shutting down forces an immediate exit
                    if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                        exit(1);
                    } else {
                        SUPERVISOR.handle_signal().await;
                    }
                }
            });
        }
        Ok(())
    }

    #[cfg(windows)]
    fn signals(&self) -> Result<()> {
        tokio::spawn(async move {
            static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
            loop {
                signal::ctrl_c().await.unwrap();
                if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
                    exit(1);
                } else {
                    SUPERVISOR.handle_signal().await;
                }
            }
        });
        Ok(())
    }

    async fn handle_signal(&self) {
        info!("received signal, stopping");
        self.close().await;
        exit(0)
    }

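    /// Background task that refreshes the process table every `INTERVAL`,
    /// unless something else (e.g. an IPC request) has refreshed it more
    /// recently.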
    fn interval_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(INTERVAL);
            loop {
                interval.tick().await;
                if SUPERVISOR.last_refreshed_at.lock().await.elapsed() > INTERVAL {
                    if let Err(err) = SUPERVISOR.refresh().await {
                        error!("failed to refresh: {err}");
                    }
                }
            }
        });
        Ok(())
    }

    fn cron_watch(&self) -> Result<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(60));
            loop {
                interval.tick().await;
                if let Err(err) = SUPERVISOR.check_cron_schedules().await {
                    error!("failed to check cron schedules: {err}");
                }
            }
        });
        Ok(())
    }

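    /// Checks each daemon's cron schedule once per `cron_watch` tick. A
    /// schedule fires when its next occurrence falls within the next 60
    /// seconds, matching the 60s tick interval; the `retrigger` policy then
    /// decides whether the daemon actually runs.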
    async fn check_cron_schedules(&self) -> Result<()> {
        use cron::Schedule;
        use std::str::FromStr;

        let now = chrono::Local::now();
        let daemons = self.state_file.lock().await.daemons.clone();

        for (id, daemon) in daemons {
            if let Some(schedule_str) = &daemon.cron_schedule {
                if let Some(retrigger) = daemon.cron_retrigger {
                    let schedule = match Schedule::from_str(schedule_str) {
                        Ok(s) => s,
                        Err(e) => {
                            warn!("invalid cron schedule for daemon {id}: {e}");
                            continue;
                        }
                    };

                    let should_trigger = schedule.upcoming(chrono::Local).take(1).any(|next| {
                        let diff = next.signed_duration_since(now);
                        diff.num_seconds() < 60 && diff.num_seconds() >= 0
                    });

                    if should_trigger {
                        let should_run = match retrigger {
                            crate::pitchfork_toml::CronRetrigger::Finish => daemon.pid.is_none(),
                            crate::pitchfork_toml::CronRetrigger::Always => true,
                            crate::pitchfork_toml::CronRetrigger::Success => {
                                daemon.pid.is_none() && daemon.last_exit_success.unwrap_or(false)
                            }
                            crate::pitchfork_toml::CronRetrigger::Fail => {
                                daemon.pid.is_none() && !daemon.last_exit_success.unwrap_or(true)
                            }
                        };

                        if should_run {
                            info!("cron: triggering daemon {id} (retrigger: {retrigger:?})");
                            if let Some(run_cmd) = self.get_daemon_run_command(&id) {
                                let dir = daemon.dir.clone().unwrap_or_else(|| env::CWD.clone());
                                let force = matches!(
                                    retrigger,
                                    crate::pitchfork_toml::CronRetrigger::Always
                                );
                                let opts = RunOptions {
                                    id: id.clone(),
                                    cmd: shell_words::split(&run_cmd).unwrap_or_default(),
                                    force,
                                    shell_pid: None,
                                    dir,
                                    autostop: daemon.autostop,
                                    cron_schedule: Some(schedule_str.clone()),
                                    cron_retrigger: Some(retrigger),
                                    retry: daemon.retry,
                                    retry_count: daemon.retry_count,
                                    ready_delay: daemon.ready_delay,
                                    ready_output: daemon.ready_output.clone(),
                                    wait_ready: false,
                                };
                                if let Err(e) = self.run(opts).await {
                                    error!("failed to run cron daemon {id}: {e}");
                                }
                            } else {
                                warn!("no run command found for cron daemon {id}");
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    fn get_daemon_run_command(&self, id: &str) -> Option<String> {
        use crate::pitchfork_toml::PitchforkToml;
        let pt = PitchforkToml::all_merged();
        pt.daemons.get(id).map(|d| d.run.clone())
    }

    async fn conn_watch(&self, mut ipc: IpcServer) -> ! {
        loop {
            let (msg, send) = match ipc.read().await {
                Ok(msg) => msg,
                Err(e) => {
                    error!("failed to accept connection: {:?}", e);
                    continue;
                }
            };
            debug!("received message: {:?}", msg);
            tokio::spawn(async move {
                let rsp = SUPERVISOR
                    .handle_ipc(msg)
                    .await
                    .unwrap_or_else(|err| IpcResponse::Error(err.to_string()));
                if let Err(err) = send.send(rsp).await {
                    debug!("failed to send message: {:?}", err);
                }
            });
        }
    }

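    /// Dispatches a single IPC request to the corresponding supervisor
    /// operation and converts the result into an `IpcResponse`.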
    async fn handle_ipc(&self, req: IpcRequest) -> Result<IpcResponse> {
        let rsp = match req {
            IpcRequest::Connect => {
                debug!("received connect message");
                IpcResponse::Ok
            }
            IpcRequest::Stop { id } => self.stop(&id).await?,
            IpcRequest::Run(opts) => self.run(opts).await?,
            IpcRequest::Enable { id } => {
                if self.enable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::Disable { id } => {
                if self.disable(id).await? {
                    IpcResponse::Yes
                } else {
                    IpcResponse::No
                }
            }
            IpcRequest::GetActiveDaemons => {
                let daemons = self.active_daemons().await;
                IpcResponse::ActiveDaemons(daemons)
            }
            IpcRequest::GetNotifications => {
                let notifications = self.get_notifications().await;
                IpcResponse::Notifications(notifications)
            }
            IpcRequest::UpdateShellDir { shell_pid, dir } => {
                let prev = self.get_shell_dir(shell_pid).await;
                self.set_shell_dir(shell_pid, dir).await?;
                if let Some(prev) = prev {
                    self.leave_dir(&prev).await?;
                }
                self.refresh().await?;
                IpcResponse::Ok
            }
            IpcRequest::Clean => {
                self.clean().await?;
                IpcResponse::Ok
            }
            IpcRequest::GetDisabledDaemons => {
                let disabled = self.state_file.lock().await.disabled.clone();
                IpcResponse::DisabledDaemons(disabled.into_iter().collect())
            }
        };
        Ok(rsp)
    }

    async fn close(&self) {
        for daemon in self.active_daemons().await {
            if daemon.id == "pitchfork" {
                continue;
            }
            if let Err(err) = self.stop(&daemon.id).await {
                error!("failed to stop daemon {daemon}: {err}");
            }
        }
        let _ = self.remove_daemon("pitchfork").await;
        let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
    }

    async fn add_notification(&self, level: log::LevelFilter, message: String) {
        self.pending_notifications
            .lock()
            .await
            .push((level, message));
    }

    async fn get_notifications(&self) -> Vec<(log::LevelFilter, String)> {
        self.pending_notifications.lock().await.drain(..).collect()
    }

    async fn active_daemons(&self) -> Vec<Daemon> {
        self.state_file
            .lock()
            .await
            .daemons
            .values()
            .filter(|d| d.pid.is_some())
            .cloned()
            .collect()
    }

    async fn remove_daemon(&self, id: &str) -> Result<()> {
        // hold the lock across both the removal and the write so another
        // task cannot modify the state file in between
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.remove(id);
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(())
    }

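    /// Inserts or updates a daemon record. Fields set in `opts` win; unset
    /// fields fall back to the existing record, so callers can update e.g.
    /// the status without clobbering the cron schedule.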
    async fn upsert_daemon(&self, opts: UpsertDaemonOpts) -> Result<Daemon> {
        info!(
            "upserting daemon: {} pid: {} status: {}",
            opts.id,
            opts.pid.unwrap_or(0),
            opts.status
        );
        let mut state_file = self.state_file.lock().await;
        let existing = state_file.daemons.get(&opts.id);
        let daemon = Daemon {
            id: opts.id.to_string(),
            title: opts.pid.and_then(|pid| PROCS.title(pid)),
            pid: opts.pid,
            status: opts.status,
            shell_pid: opts.shell_pid,
            autostop: opts.autostop || existing.is_some_and(|d| d.autostop),
            dir: opts.dir.or(existing.and_then(|d| d.dir.clone())),
            cron_schedule: opts
                .cron_schedule
                .or(existing.and_then(|d| d.cron_schedule.clone())),
            cron_retrigger: opts
                .cron_retrigger
                .or(existing.and_then(|d| d.cron_retrigger)),
            last_exit_success: opts
                .last_exit_success
                .or(existing.and_then(|d| d.last_exit_success)),
            retry: opts.retry.unwrap_or(existing.map(|d| d.retry).unwrap_or(0)),
            retry_count: opts
                .retry_count
                .unwrap_or(existing.map(|d| d.retry_count).unwrap_or(0)),
            ready_delay: opts.ready_delay.or(existing.and_then(|d| d.ready_delay)),
            ready_output: opts
                .ready_output
                .or(existing.and_then(|d| d.ready_output.clone())),
        };
        state_file
            .daemons
            .insert(opts.id.to_string(), daemon.clone());
        if let Err(err) = state_file.write() {
            warn!("failed to update state file: {err:#}");
        }
        Ok(daemon)
    }

    async fn enable(&self, id: String) -> Result<bool> {
        info!("enabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.remove(&id);
        state_file.write()?;
        Ok(result)
    }

    async fn disable(&self, id: String) -> Result<bool> {
        info!("disabling daemon: {id}");
        let mut state_file = self.state_file.lock().await;
        let result = state_file.disabled.insert(id);
        state_file.write()?;
        Ok(result)
    }

    async fn get_daemon(&self, id: &str) -> Option<Daemon> {
        self.state_file.lock().await.daemons.get(id).cloned()
    }

    async fn set_shell_dir(&self, shell_pid: u32, dir: PathBuf) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.shell_dirs.insert(shell_pid.to_string(), dir);
        state_file.write()?;
        Ok(())
    }

    async fn get_shell_dir(&self, shell_pid: u32) -> Option<PathBuf> {
        self.state_file
            .lock()
            .await
            .shell_dirs
            .get(&shell_pid.to_string())
            .cloned()
    }

    async fn remove_shell_pid(&self, shell_pid: u32) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        if state_file
            .shell_dirs
            .remove(&shell_pid.to_string())
            .is_some()
        {
            state_file.write()?;
        }
        Ok(())
    }

    async fn get_dirs_with_shell_pids(&self) -> HashMap<PathBuf, Vec<u32>> {
        self.state_file.lock().await.shell_dirs.iter().fold(
            HashMap::new(),
            |mut acc, (pid, dir)| {
                acc.entry(dir.clone())
                    .or_default()
                    .push(pid.parse().unwrap());
                acc
            },
        )
    }

    async fn clean(&self) -> Result<()> {
        let mut state_file = self.state_file.lock().await;
        state_file.daemons.retain(|_id, d| d.pid.is_some());
        state_file.write()?;
        Ok(())
    }
}

1017#[derive(Debug)]
1018struct UpsertDaemonOpts {
1019 id: String,
1020 pid: Option<u32>,
1021 status: DaemonStatus,
1022 shell_pid: Option<u32>,
1023 dir: Option<PathBuf>,
1024 autostop: bool,
1025 cron_schedule: Option<String>,
1026 cron_retrigger: Option<crate::pitchfork_toml::CronRetrigger>,
1027 last_exit_success: Option<bool>,
1028 retry: Option<u32>,
1029 retry_count: Option<u32>,
1030 ready_delay: Option<u64>,
1031 ready_output: Option<String>,
1032}
1033
impl Default for UpsertDaemonOpts {
    fn default() -> Self {
        Self {
            id: "".to_string(),
            pid: None,
            status: DaemonStatus::Stopped,
            shell_pid: None,
            dir: None,
            autostop: false,
            cron_schedule: None,
            cron_retrigger: None,
            last_exit_success: None,
            retry: None,
            retry_count: None,
            ready_delay: None,
            ready_output: None,
        }
    }
}