pitchfork_cli/supervisor/lifecycle.rs

use super::{SUPERVISOR, Supervisor, UpsertDaemonOpts};
use crate::daemon::RunOptions;
use crate::daemon_status::DaemonStatus;
use crate::ipc::IpcResponse;
use crate::procs::PROCS;
use crate::shell::Shell;
use crate::{Result, env};
use itertools::Itertools;
use miette::IntoDiagnostic;
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::iter::once;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
use tokio::select;
use tokio::sync::oneshot;
use tokio::time;

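// Process-wide cache of compiled regexes so a pattern is only compiled once even
// when it is requested repeatedly (e.g. for ready_output checks).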
static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));

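/// Returns the compiled regex for `pattern`, reusing a cached instance when one
/// exists. Invalid patterns are logged and yield `None`.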
pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
    let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
    if let Some(re) = cache.get(pattern) {
        return Some(re.clone());
    }
    match Regex::new(pattern) {
        Ok(re) => {
            cache.insert(pattern.to_string(), re.clone());
            Some(re)
        }
        Err(e) => {
            error!("invalid regex pattern '{pattern}': {e}");
            None
        }
    }
}

impl Supervisor {
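    /// Starts a daemon. When `wait_ready` is set together with a non-zero retry
    /// count, failed attempts are retried with exponential backoff.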
    pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let cmd = opts.cmd.clone();

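        // The daemon is being started explicitly, so drop any pending autostop for it.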
        {
            let mut pending = self.pending_autostops.lock().await;
            if pending.remove(id).is_some() {
                info!("cleared pending autostop for {id} (daemon starting)");
            }
        }

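        // If the daemon already has a live process, either force-stop it or report it as running.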
        let daemon = self.get_daemon(id).await;
        if let Some(daemon) = daemon {
            if !daemon.status.is_stopping()
                && !daemon.status.is_stopped()
                && let Some(pid) = daemon.pid
            {
                if opts.force {
                    self.stop(id).await?;
                    info!("run: stop completed for daemon {id}");
                } else {
                    warn!("daemon {id} already running with pid {pid}");
                    return Ok(IpcResponse::DaemonAlreadyRunning);
                }
            }
        }

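        // With wait_ready and retries configured, run up to retry + 1 attempts,
        // backing off exponentially (1s, 2s, 4s, ...) between failed attempts.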
        if opts.wait_ready && opts.retry > 0 {
            let max_attempts = opts.retry.saturating_add(1);
            for attempt in 0..max_attempts {
                let mut retry_opts = opts.clone();
                retry_opts.retry_count = attempt;
                retry_opts.cmd = cmd.clone();

                let result = self.run_once(retry_opts).await?;

                match result {
                    IpcResponse::DaemonReady { daemon } => {
                        return Ok(IpcResponse::DaemonReady { daemon });
                    }
                    IpcResponse::DaemonFailedWithCode { exit_code } => {
                        if attempt < opts.retry {
                            let backoff_secs = 2u64.pow(attempt);
                            info!(
                                "daemon {id} failed (attempt {}/{}), retrying in {}s",
                                attempt + 1,
                                max_attempts,
                                backoff_secs
                            );
                            time::sleep(Duration::from_secs(backoff_secs)).await;
                            continue;
                        } else {
                            info!("daemon {id} failed after {max_attempts} attempts");
                            return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
                        }
                    }
                    other => return Ok(other),
                }
            }
        }

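        // No retries requested (or not waiting for readiness): single attempt.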
        self.run_once(opts).await
    }

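    /// Spawns the daemon process once and monitors it from a background task,
    /// reporting readiness via whichever checks are configured (output pattern,
    /// HTTP endpoint, TCP port, readiness command, or a plain delay).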
    pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        let original_cmd = opts.cmd.clone();
        let cmd = opts.cmd;

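        // When the caller waits for readiness, the monitor task reports the outcome
        // back over this oneshot channel.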
        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

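        // Run the command as `sh -c "exec <cmd>"` so the daemon replaces the shell
        // and the captured PID is the daemon's own.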
        let cmd = once("exec".to_string())
            .chain(cmd.into_iter())
            .collect_vec();
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
        if let Some(parent) = log_path.parent() {
            xx::file::mkdirp(parent)?;
        }
        info!("run: spawning daemon {id} with args: {args:?}");
        let mut cmd = tokio::process::Command::new("sh");
        cmd.args(&args)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .current_dir(&opts.dir);

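        // If an original PATH was captured, pass it to the spawned process instead of
        // the supervisor's current PATH.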
        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        if let Some(ref env_vars) = opts.env {
            cmd.envs(env_vars);
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = match child.id() {
            Some(p) => p,
            None => {
                warn!("Daemon {id} exited before PID could be captured");
                return Ok(IpcResponse::DaemonFailed {
                    error: "Process exited immediately".to_string(),
                });
            }
        };
        info!("started daemon {id} with pid {pid}");
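        // Record the daemon as Running along with the options it was started with.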
        let daemon = self
            .upsert_daemon(UpsertDaemonOpts {
                id: id.to_string(),
                pid: Some(pid),
                status: DaemonStatus::Running,
                shell_pid: opts.shell_pid,
                dir: Some(opts.dir.clone()),
                cmd: Some(original_cmd),
                autostop: opts.autostop,
                cron_schedule: opts.cron_schedule.clone(),
                cron_retrigger: opts.cron_retrigger,
                last_exit_success: None,
                retry: Some(opts.retry),
                retry_count: Some(opts.retry_count),
                ready_delay: opts.ready_delay,
                ready_output: opts.ready_output.clone(),
                ready_http: opts.ready_http.clone(),
                ready_port: opts.ready_port,
                ready_cmd: opts.ready_cmd.clone(),
                depends: Some(opts.depends.clone()),
                env: opts.env.clone(),
            })
            .await?;

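        // Clone what the monitor task needs before moving it into the spawned future.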
        let id_clone = id.to_string();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();
        let ready_http = opts.ready_http.clone();
        let ready_port = opts.ready_port;
        let ready_cmd = opts.ready_cmd.clone();

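        // Background task: stream stdout/stderr to the log file, run readiness checks,
        // and update the daemon's status when the process exits.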
        tokio::spawn(async move {
            let id = id_clone;
            let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
                (Some(out), Some(err)) => (out, err),
                _ => {
                    error!("Failed to capture stdout/stderr for daemon {id}");
                    return;
                }
            };
            let mut stdout = tokio::io::BufReader::new(stdout).lines();
            let mut stderr = tokio::io::BufReader::new(stderr).lines();
            let log_file = match tokio::fs::File::options()
                .append(true)
                .create(true)
                .open(&log_path)
                .await
            {
                Ok(f) => f,
                Err(e) => {
                    error!("Failed to open log file for daemon {id}: {e}");
                    return;
                }
            };
            let mut log_appender = BufWriter::new(log_file);

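            // Each log line is prefixed with a timestamp and the daemon id (unless the
            // line already starts with the id).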
            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            let format_line = |line: String| {
                if line.starts_with(&format!("{id} ")) {
                    format!("{} {line}\n", now())
                } else {
                    format!("{} {id} {line}\n", now())
                }
            };

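            // Readiness state and the periodic checks/timers used by the select! loop below.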
            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            let mut http_check_interval = ready_http
                .as_ref()
                .map(|_| tokio::time::interval(Duration::from_millis(500)));
            let http_client = ready_http.as_ref().map(|_| {
                reqwest::Client::builder()
                    .timeout(Duration::from_secs(5))
                    .build()
                    .unwrap_or_default()
            });

            let mut port_check_interval =
                ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));

            let mut cmd_check_interval = ready_cmd
                .as_ref()
                .map(|_| tokio::time::interval(Duration::from_millis(500)));

            let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));

            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

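            // A dedicated task waits on the child and forwards its exit status over
            // the channel so the select! loop below can observe it.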
            let child_pid = child.id().unwrap_or(0);
            tokio::spawn(async move {
                let result = child.wait().await;
                debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
                let _ = exit_tx.send(result).await;
            });

            #[allow(unused_assignments)]
            let mut exit_status = None;

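            // Multiplex log streaming, readiness checks, periodic log flushes, and
            // process exit until the child terminates.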
            loop {
                select! {
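                    // Forward stdout lines to the log and check them against the readiness pattern.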
                    Ok(Some(line)) = stdout.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stdout: {id} {formatted}");

                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line)
                        {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    }
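                    // Same handling for stderr.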
                    Ok(Some(line)) = stderr.next_line() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("stderr: {id} {formatted}");

                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line)
                        {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                    },
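                    // The child exited: notify any pending readiness waiter with the
                    // outcome, then leave the loop.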
                    Some(result) = exit_rx.recv() => {
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {exit_status:?}");
                        let _ = log_appender.flush().await;
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                let is_success = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .map(|s| s.success())
                                    .unwrap_or(false);

                                if is_success {
                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
                                    let _ = tx.send(Ok(()));
                                } else {
                                    let exit_code = exit_status.as_ref()
                                        .and_then(|r| r.as_ref().ok())
                                        .and_then(|s| s.code());
                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
                                    let _ = tx.send(Err(exit_code));
                                }
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending notification");
                        }
                        break;
                    }
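                    // Poll the readiness HTTP endpoint every 500ms until it returns a success status.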
                    _ = async {
                        if let Some(ref mut interval) = http_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_http.is_some() => {
                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                            match client.get(url).send().await {
                                Ok(response) if response.status().is_success() => {
                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    http_check_interval = None;
                                }
                                Ok(response) => {
                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                                }
                                Err(e) => {
                                    trace!("daemon {id} HTTP check failed: {e}");
                                }
                            }
                        }
                    }
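                    // Poll the readiness TCP port every 500ms until something is listening.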
                    _ = async {
                        if let Some(ref mut interval) = port_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_port.is_some() => {
                        if let Some(port) = ready_port {
                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                                Ok(_) => {
                                    info!("daemon {id} ready: TCP port {port} is listening");
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    port_check_interval = None;
                                }
                                Err(_) => {
                                    trace!("daemon {id} port check: port {port} not listening yet");
                                }
                            }
                        }
                    }
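                    // Run the readiness command every 500ms until it exits successfully.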
                    _ = async {
                        if let Some(ref mut interval) = cmd_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_cmd.is_some() => {
                        if let Some(ref cmd) = ready_cmd {
                            let mut command = Shell::default_for_platform().command(cmd);
                            command
                                .stdout(std::process::Stdio::null())
                                .stderr(std::process::Stdio::null());
                            let result: std::io::Result<std::process::ExitStatus> = command.status().await;
                            match result {
                                Ok(status) if status.success() => {
                                    info!("daemon {id} ready: readiness command succeeded");
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    cmd_check_interval = None;
                                }
                                Ok(_) => {
                                    trace!("daemon {id} cmd check: command returned non-zero (not ready)");
                                }
                                Err(e) => {
                                    trace!("daemon {id} cmd check failed: {e}");
                                }
                            }
                        }
                    }
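                    // When only a ready_delay is configured, the daemon counts as ready
                    // once the delay elapses.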
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified
                            && ready_pattern.is_none()
                            && ready_http.is_none()
                            && ready_port.is_none()
                            && ready_cmd.is_none()
                        {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        delay_timer = None;
                    }
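                    // Flush buffered log output periodically so the log file stays current.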
                    _ = log_flush_interval.tick() => {
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                    }
                }
            }

            if let Err(e) = log_appender.flush().await {
                error!("Failed to final flush log for daemon {id}: {e}");
            }

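            // The exit status normally arrives via the select! loop; fall back to
            // waiting on the channel if the loop ended without one.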
            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

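            // If the daemon record was removed or now points at a different process
            // (e.g. it was restarted), this monitor task is stale and must not touch the state.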
            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
            {
                return;
            }
            let is_stopping = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

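            // If the daemon has already been marked stopped, there is nothing left to record.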
            if current_daemon.is_some_and(|d| d.status.is_stopped()) {
                return;
            }
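            // Record the final status: Stopped for clean or requested exits, Errored
            // (with the exit code when available) otherwise.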
            if let Ok(status) = exit_status {
                info!("daemon {id} exited with status {status}");
                if status.success() || is_stopping {
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status: DaemonStatus::Stopped,
                            last_exit_success: Some(status.success()),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                } else {
                    let status = match status.code() {
                        Some(code) => DaemonStatus::Errored(code),
                        None => DaemonStatus::Errored(-1),
                    };
                    if let Err(e) = SUPERVISOR
                        .upsert_daemon(UpsertDaemonOpts {
                            id: id.clone(),
                            pid: None,
                            status,
                            last_exit_success: Some(false),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to update daemon state for {id}: {e}");
                    }
                }
            } else if is_stopping {
                if let Err(e) = SUPERVISOR
                    .upsert_daemon(UpsertDaemonOpts {
                        id: id.clone(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        last_exit_success: Some(true),
                        ..Default::default()
                    })
                    .await
                {
                    error!("Failed to update daemon state for {id}: {e}");
                }
            } else if let Err(e) = SUPERVISOR
                .upsert_daemon(UpsertDaemonOpts {
                    id: id.clone(),
                    pid: None,
                    status: DaemonStatus::Errored(-1),
                    last_exit_success: Some(false),
                    ..Default::default()
                })
                .await
            {
                error!("Failed to update daemon state for {id}: {e}");
            }
        });

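        // If the caller asked to wait, block on the readiness notification from the
        // monitor task; otherwise just report that the daemon was started.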
        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }

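    /// Stops a daemon by killing its process tree and marking it Stopped. The
    /// supervisor itself ("pitchfork") cannot be stopped this way.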
    pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
        if id == "pitchfork" {
            return Ok(IpcResponse::Error(
                "Cannot stop supervisor via stop command".into(),
            ));
        }
        info!("stopping daemon: {id}");
        if let Some(daemon) = self.get_daemon(id).await {
            trace!("daemon to stop: {daemon}");
            if let Some(pid) = daemon.pid {
                trace!("killing pid: {pid}");
                PROCS.refresh_processes();
                if PROCS.is_running(pid) {
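                    // Mark the daemon as Stopping so the monitor task records the exit as clean.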
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: Some(pid),
                        status: DaemonStatus::Stopping,
                        ..Default::default()
                    })
                    .await?;

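                    // Kill the daemon's children first, then the daemon itself.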
                    for child_pid in PROCS.all_children(pid) {
                        debug!("killing child pid: {child_pid}");
                        if let Err(e) = PROCS.kill_async(child_pid).await {
                            warn!("failed to kill child pid {child_pid}: {e}");
                        }
                    }

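                    // If the kill fails and the process is still alive, restore the Running
                    // status and report the failure to the caller.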
                    if let Err(e) = PROCS.kill_async(pid).await {
                        debug!("failed to kill pid {pid}: {e}");
                        PROCS.refresh_processes();
                        if PROCS.is_running(pid) {
                            debug!("failed to stop pid {pid}: process still running after kill");
                            self.upsert_daemon(UpsertDaemonOpts {
                                id: id.to_string(),
                                pid: Some(pid),
                                status: DaemonStatus::Running,
                                ..Default::default()
                            })
                            .await?;
                            return Ok(IpcResponse::DaemonStopFailed {
                                error: format!(
                                    "process {pid} still running after kill attempt: {e}"
                                ),
                            });
                        }
                    }

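                    // Give the process up to ~500ms (10 x 50ms) to fully terminate.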
                    let mut process_terminated = false;
                    for i in 0..10 {
                        PROCS.refresh_pids(&[pid]);
                        if !PROCS.is_running(pid) {
                            process_terminated = true;
                            break;
                        }
                        if i < 9 {
                            debug!(
                                "waiting for process {pid} to fully terminate ({}/10)",
                                i + 1
                            );
                            time::sleep(Duration::from_millis(50)).await;
                        }
                    }

                    if !process_terminated {
                        warn!(
                            "Process {pid} for daemon {id} did not terminate within 500ms after SIGTERM. \
                             The process may take longer to release resources."
                        );
                    }

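                    // Mark the daemon stopped; a requested stop counts as a successful exit.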
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        last_exit_success: Some(true),
                        ..Default::default()
                    })
                    .await?;
                } else {
                    debug!("pid {pid} not running, process may have exited unexpectedly");
                    self.upsert_daemon(UpsertDaemonOpts {
                        id: id.to_string(),
                        pid: None,
                        status: DaemonStatus::Stopped,
                        ..Default::default()
                    })
                    .await?;
                    return Ok(IpcResponse::DaemonWasNotRunning);
                }
                Ok(IpcResponse::Ok)
            } else {
                debug!("daemon {id} not running");
                Ok(IpcResponse::DaemonNotRunning)
            }
        } else {
            debug!("daemon {id} not found");
            Ok(IpcResponse::DaemonNotFound)
        }
    }
}