pitchfork_cli/supervisor/
lifecycle.rs1use super::{SUPERVISOR, Supervisor, UpsertDaemonOpts};
6use crate::daemon::RunOptions;
7use crate::daemon_status::DaemonStatus;
8use crate::ipc::IpcResponse;
9use crate::procs::PROCS;
10use crate::shell::Shell;
11use crate::{Result, env};
12use itertools::Itertools;
13use miette::IntoDiagnostic;
14use once_cell::sync::Lazy;
15use regex::Regex;
16use std::collections::HashMap;
17use std::iter::once;
18use std::time::Duration;
19use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
20use tokio::select;
21use tokio::sync::oneshot;
22use tokio::time;
23
24static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
26 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
27
28pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
30 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
31 if let Some(re) = cache.get(pattern) {
32 return Some(re.clone());
33 }
34 match Regex::new(pattern) {
35 Ok(re) => {
36 cache.insert(pattern.to_string(), re.clone());
37 Some(re)
38 }
39 Err(e) => {
40 error!("invalid regex pattern '{pattern}': {e}");
41 None
42 }
43 }
44}
45
46impl Supervisor {
47 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
49 let id = &opts.id;
50 let cmd = opts.cmd.clone();
51
52 {
54 let mut pending = self.pending_autostops.lock().await;
55 if pending.remove(id).is_some() {
56 info!("cleared pending autostop for {id} (daemon starting)");
57 }
58 }
59
60 let daemon = self.get_daemon(id).await;
61 if let Some(daemon) = daemon {
62 if !daemon.status.is_stopping()
65 && !daemon.status.is_stopped()
66 && let Some(pid) = daemon.pid
67 {
68 if opts.force {
69 self.stop(id).await?;
70 info!("run: stop completed for daemon {id}");
71 } else {
72 warn!("daemon {id} already running with pid {pid}");
73 return Ok(IpcResponse::DaemonAlreadyRunning);
74 }
75 }
76 }
77
78 if opts.wait_ready && opts.retry > 0 {
80 let max_attempts = opts.retry.saturating_add(1);
82 for attempt in 0..max_attempts {
83 let mut retry_opts = opts.clone();
84 retry_opts.retry_count = attempt;
85 retry_opts.cmd = cmd.clone();
86
87 let result = self.run_once(retry_opts).await?;
88
89 match result {
90 IpcResponse::DaemonReady { daemon } => {
91 return Ok(IpcResponse::DaemonReady { daemon });
92 }
93 IpcResponse::DaemonFailedWithCode { exit_code } => {
94 if attempt < opts.retry {
95 let backoff_secs = 2u64.pow(attempt);
96 info!(
97 "daemon {id} failed (attempt {}/{}), retrying in {}s",
98 attempt + 1,
99 max_attempts,
100 backoff_secs
101 );
102 time::sleep(Duration::from_secs(backoff_secs)).await;
103 continue;
104 } else {
105 info!("daemon {id} failed after {max_attempts} attempts");
106 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
107 }
108 }
109 other => return Ok(other),
110 }
111 }
112 }
113
114 self.run_once(opts).await
116 }
117
/// Spawns the daemon process a single time (no retry handling) and starts a background
/// task that captures its output, evaluates readiness checks and records its exit.
///
/// The command is executed as `sh -c "exec <cmd...>"` in `opts.dir`, with stdin closed and
/// stdout/stderr piped. Output lines are appended to
/// `<PITCHFORK_LOGS_DIR>/<id>/<id>.log`.
///
/// Returns:
/// - `IpcResponse::DaemonFailed` when no PID is available right after spawning.
/// - `IpcResponse::DaemonStart` when `opts.wait_ready` is false, or when the readiness
///   channel is dropped without a message.
/// - `IpcResponse::DaemonReady` when `opts.wait_ready` is true and readiness (or a
///   successful exit before readiness) was reported.
/// - `IpcResponse::DaemonFailedWithCode` when `opts.wait_ready` is true and the process
///   exited unsuccessfully before becoming ready.
///
/// Errors from creating the log directory, spawning the process, or the initial
/// `upsert_daemon` call are propagated with `?`.
118 pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
120 let id = &opts.id;
// `original_cmd` is what gets stored on the daemon record; `cmd` is consumed below to
// build the shell invocation.
121 let original_cmd = opts.cmd.clone(); let cmd = opts.cmd;
123
// The readiness channel only exists when the caller asked to wait for readiness.
124 let (ready_tx, ready_rx) = if opts.wait_ready {
126 let (tx, rx) = oneshot::channel();
127 (Some(tx), Some(rx))
128 } else {
129 (None, None)
130 };
131
// Prefix the command with `exec` and quote it into one string for `sh -c`.
// NOTE(review): presumably `exec` is used so the shell is replaced by the daemon and the
// recorded PID is the daemon's own — confirm.
132 let cmd = once("exec".to_string())
133 .chain(cmd.into_iter())
134 .collect_vec();
135 let args = vec!["-c".to_string(), shell_words::join(&cmd)];
// Make sure the per-daemon log directory exists before the process is spawned.
136 let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
137 if let Some(parent) = log_path.parent() {
138 xx::file::mkdirp(parent)?;
139 }
140 info!("run: spawning daemon {id} with args: {args:?}");
141 let mut cmd = tokio::process::Command::new("sh");
142 cmd.args(&args)
143 .stdin(std::process::Stdio::null())
144 .stdout(std::process::Stdio::piped())
145 .stderr(std::process::Stdio::piped())
146 .current_dir(&opts.dir);
147
// When `ORIGINAL_PATH` holds a value, it is passed to the child as `PATH`.
148 if let Some(ref path) = *env::ORIGINAL_PATH {
150 cmd.env("PATH", path);
151 }
152
153 let mut child = cmd.spawn().into_diagnostic()?;
// `id()` returning `None` is reported to the caller as an immediate failure.
154 let pid = match child.id() {
155 Some(p) => p,
156 None => {
157 warn!("Daemon {id} exited before PID could be captured");
158 return Ok(IpcResponse::DaemonFailed {
159 error: "Process exited immediately".to_string(),
160 });
161 }
162 };
163 info!("started daemon {id} with pid {pid}");
// Record the daemon as running, together with the options it was started with.
164 let daemon = self
165 .upsert_daemon(UpsertDaemonOpts {
166 id: id.to_string(),
167 pid: Some(pid),
168 status: DaemonStatus::Running,
169 shell_pid: opts.shell_pid,
170 dir: Some(opts.dir.clone()),
171 cmd: Some(original_cmd),
172 autostop: opts.autostop,
173 cron_schedule: opts.cron_schedule.clone(),
174 cron_retrigger: opts.cron_retrigger,
175 last_exit_success: None,
176 retry: Some(opts.retry),
177 retry_count: Some(opts.retry_count),
178 ready_delay: opts.ready_delay,
179 ready_output: opts.ready_output.clone(),
180 ready_http: opts.ready_http.clone(),
181 ready_port: opts.ready_port,
182 ready_cmd: opts.ready_cmd.clone(),
183 depends: Some(opts.depends.clone()),
184 })
185 .await?;
186
// Owned copies of everything the background task needs (it is `async move`).
187 let id_clone = id.to_string();
188 let ready_delay = opts.ready_delay;
189 let ready_output = opts.ready_output.clone();
190 let ready_http = opts.ready_http.clone();
191 let ready_port = opts.ready_port;
192 let ready_cmd = opts.ready_cmd.clone();
193
// Background monitor: copies output to the log, runs readiness checks, and updates the
// daemon record when the process exits. It is detached (the join handle is dropped).
194 tokio::spawn(async move {
195 let id = id_clone;
// If the task returns early below, `ready_tx` is dropped, which the caller observes as
// a closed readiness channel.
196 let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
197 (Some(out), Some(err)) => (out, err),
198 _ => {
199 error!("Failed to capture stdout/stderr for daemon {id}");
200 return;
201 }
202 };
203 let mut stdout = tokio::io::BufReader::new(stdout).lines();
204 let mut stderr = tokio::io::BufReader::new(stderr).lines();
205 let log_file = match tokio::fs::File::options()
206 .append(true)
207 .create(true)
208 .open(&log_path)
209 .await
210 {
211 Ok(f) => f,
212 Err(e) => {
213 error!("Failed to open log file for daemon {id}: {e}");
214 return;
215 }
216 };
217 let mut log_appender = BufWriter::new(log_file);
218
// Each log line is "<local timestamp> <id> <line>"; the id is not repeated when the
// line already starts with "<id> ".
219 let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
220 let format_line = |line: String| {
221 if line.starts_with(&format!("{id} ")) {
222 format!("{} {line}\n", now())
224 } else {
225 format!("{} {id} {line}\n", now())
226 }
227 };
228
// Readiness state: `ready_notified` gates every check; `ready_tx` is taken on first use
// so at most one message is ever sent.
229 let mut ready_notified = false;
231 let mut ready_tx = ready_tx;
// An invalid pattern yields `None` here, i.e. it behaves as if no output pattern was set.
232 let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
233
// Optional one-shot timer for the `ready_delay` (seconds).
234 let mut delay_timer =
235 ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
236
// HTTP readiness: polled every 500 ms with a client using a 5 s request timeout
// (falling back to a default client if the builder fails).
237 let mut http_check_interval = ready_http
239 .as_ref()
240 .map(|_| tokio::time::interval(Duration::from_millis(500)));
241 let http_client = ready_http.as_ref().map(|_| {
242 reqwest::Client::builder()
243 .timeout(Duration::from_secs(5))
244 .build()
245 .unwrap_or_default()
246 });
247
// TCP port readiness: polled every 500 ms.
248 let mut port_check_interval =
250 ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));
251
// Command readiness: polled every 500 ms.
252 let mut cmd_check_interval = ready_cmd
254 .as_ref()
255 .map(|_| tokio::time::interval(Duration::from_millis(500)));
256
// The buffered log writer is flushed every 500 ms regardless of readiness.
257 let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));
259
// `child` is moved into its own task that waits for exit and forwards the result over
// this channel, so the select loop below never needs to borrow `child`.
260 let (exit_tx, mut exit_rx) =
262 tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
263
// Only used for the debug message in the waiter task; 0 when the id is unavailable.
264 let child_pid = child.id().unwrap_or(0);
266 tokio::spawn(async move {
267 let result = child.wait().await;
268 debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
269 let _ = exit_tx.send(result).await;
270 });
271
272 #[allow(unused_assignments)]
273 let mut exit_status = None;
275
// Main event loop. The only `break` is in the process-exit arm.
276 loop {
277 select! {
// stdout line: append to the log, then test the raw (unformatted) line against the
// readiness pattern.
278 Ok(Some(line)) = stdout.next_line() => {
279 let formatted = format_line(line.clone());
280 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
281 error!("Failed to write to log for daemon {id}: {e}");
282 }
283 trace!("stdout: {id} {formatted}");
284
285 if !ready_notified
287 && let Some(ref pattern) = ready_pattern
288 && pattern.is_match(&line) {
289 info!("daemon {id} ready: output matched pattern");
290 ready_notified = true;
// Flush before notifying so the log is written out before the caller is told.
291 let _ = log_appender.flush().await;
293 if let Some(tx) = ready_tx.take() {
294 let _ = tx.send(Ok(()));
295 }
296 }
297 }
// stderr line: handled the same way as stdout, including pattern matching.
298 Ok(Some(line)) = stderr.next_line() => {
299 let formatted = format_line(line.clone());
300 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
301 error!("Failed to write to log for daemon {id}: {e}");
302 }
303 trace!("stderr: {id} {formatted}");
304
305 if !ready_notified
307 && let Some(ref pattern) = ready_pattern
308 && pattern.is_match(&line) {
309 info!("daemon {id} ready: output matched pattern");
310 ready_notified = true;
311 let _ = log_appender.flush().await;
313 if let Some(tx) = ready_tx.take() {
314 let _ = tx.send(Ok(()));
315 }
316 }
317 },
// Process exit: flush the log and, if readiness was never reported, resolve the
// waiter with success (clean exit) or the exit code (failure), then leave the loop.
318 Some(result) = exit_rx.recv() => {
319 exit_status = Some(result);
321 debug!("daemon {id} process exited, exit_status: {exit_status:?}");
322 let _ = log_appender.flush().await;
324 if !ready_notified {
325 if let Some(tx) = ready_tx.take() {
// A failed `wait()` (Err) is treated as "not successful".
326 let is_success = exit_status.as_ref()
328 .and_then(|r| r.as_ref().ok())
329 .map(|s| s.success())
330 .unwrap_or(false);
331
332 if is_success {
333 debug!("daemon {id} exited successfully before ready check, sending success notification");
334 let _ = tx.send(Ok(()));
335 } else {
// `exit_code` is `None` when `wait()` failed or `code()` returned `None`.
336 let exit_code = exit_status.as_ref()
337 .and_then(|r| r.as_ref().ok())
338 .and_then(|s| s.code());
339 debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
340 let _ = tx.send(Err(exit_code));
341 }
342 }
343 } else {
344 debug!("daemon {id} was already marked ready, not sending notification");
345 }
346 break;
347 }
// HTTP readiness check: enabled only while not yet ready and a URL is configured.
// The request is awaited inside the arm, so other arms are not polled until it ends.
348 _ = async {
349 if let Some(ref mut interval) = http_check_interval {
350 interval.tick().await;
351 } else {
352 std::future::pending::<()>().await;
353 }
354 }, if !ready_notified && ready_http.is_some() => {
355 if let (Some(url), Some(client)) = (&ready_http, &http_client) {
356 match client.get(url).send().await {
357 Ok(response) if response.status().is_success() => {
358 info!("daemon {id} ready: HTTP check passed (status {})", response.status());
359 ready_notified = true;
360 let _ = log_appender.flush().await;
362 if let Some(tx) = ready_tx.take() {
363 let _ = tx.send(Ok(()));
364 }
365 http_check_interval = None;
367 }
368 Ok(response) => {
369 trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
370 }
371 Err(e) => {
372 trace!("daemon {id} HTTP check failed: {e}");
373 }
374 }
375 }
376 }
// TCP readiness check: ready as soon as a connection to 127.0.0.1:<port> succeeds.
377 _ = async {
378 if let Some(ref mut interval) = port_check_interval {
379 interval.tick().await;
380 } else {
381 std::future::pending::<()>().await;
382 }
383 }, if !ready_notified && ready_port.is_some() => {
384 if let Some(port) = ready_port {
385 match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
386 Ok(_) => {
387 info!("daemon {id} ready: TCP port {port} is listening");
388 ready_notified = true;
389 let _ = log_appender.flush().await;
391 if let Some(tx) = ready_tx.take() {
392 let _ = tx.send(Ok(()));
393 }
394 port_check_interval = None;
396 }
397 Err(_) => {
398 trace!("daemon {id} port check: port {port} not listening yet");
399 }
400 }
401 }
402 }
// Command readiness check: runs `ready_cmd` through the platform shell with its
// output discarded; a successful exit status means ready.
403 _ = async {
404 if let Some(ref mut interval) = cmd_check_interval {
405 interval.tick().await;
406 } else {
407 std::future::pending::<()>().await;
408 }
409 }, if !ready_notified && ready_cmd.is_some() => {
410 if let Some(ref cmd) = ready_cmd {
411 let mut command = Shell::default_for_platform().command(cmd);
413 command
414 .stdout(std::process::Stdio::null())
415 .stderr(std::process::Stdio::null());
416 let result: std::io::Result<std::process::ExitStatus> = command.status().await;
417 match result {
418 Ok(status) if status.success() => {
419 info!("daemon {id} ready: readiness command succeeded");
420 ready_notified = true;
421 let _ = log_appender.flush().await;
422 if let Some(tx) = ready_tx.take() {
423 let _ = tx.send(Ok(()));
424 }
425 cmd_check_interval = None;
427 }
428 Ok(_) => {
429 trace!("daemon {id} cmd check: command returned non-zero (not ready)");
430 }
431 Err(e) => {
432 trace!("daemon {id} cmd check failed: {e}");
433 }
434 }
435 }
436 }
// Delay timer: counts as readiness only when no output/HTTP/port/command check is
// active. The timer is cleared after firing so this arm stays pending afterwards.
437 _ = async {
438 if let Some(ref mut timer) = delay_timer {
439 timer.await;
440 } else {
441 std::future::pending::<()>().await;
442 }
443 } => {
444 if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
445 info!("daemon {id} ready: delay elapsed");
446 ready_notified = true;
447 let _ = log_appender.flush().await;
449 if let Some(tx) = ready_tx.take() {
450 let _ = tx.send(Ok(()));
451 }
452 }
453 delay_timer = None;
455 }
// Periodic flush of the buffered log writer.
456 _ = log_flush_interval.tick() => {
457 if let Err(e) = log_appender.flush().await {
459 error!("Failed to flush log for daemon {id}: {e}");
460 }
461 }
462 }
465 }
466
467 if let Err(e) = log_appender.flush().await {
469 error!("Failed to final flush log for daemon {id}: {e}");
470 }
471
// The loop only exits after storing a status, so the `else` branch is a defensive
// fallback that waits on the channel (or synthesises an error if it is closed).
472 let exit_status = if let Some(status) = exit_status {
474 status
475 } else {
476 match exit_rx.recv().await {
478 Some(status) => status,
479 None => {
480 warn!("daemon {id} exit channel closed without receiving status");
481 Err(std::io::Error::other("exit channel closed"))
482 }
483 }
484 };
// Re-read the record through the global `SUPERVISOR`: `self` is not available inside
// this spawned task.
485 let current_daemon = SUPERVISOR.get_daemon(&id).await;
486
// Leave state untouched when the record is gone or now holds a different PID
// (presumably the daemon was removed or restarted in the meantime).
487 if current_daemon.is_none()
489 || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
490 {
491 return;
493 }
494 let is_stopping = current_daemon
495 .as_ref()
496 .is_some_and(|d| d.status.is_stopping());
497
// Already recorded as stopped: nothing more to update.
498 if current_daemon.is_some_and(|d| d.status.is_stopped()) {
499 return;
501 }
502 if let Ok(status) = exit_status {
503 info!("daemon {id} exited with status {status}");
// A clean exit, or any exit while the daemon was being stopped, is recorded as
// `Stopped`; `last_exit_success` still reflects the real exit status.
504 if status.success() || is_stopping {
505 if let Err(e) = SUPERVISOR
508 .upsert_daemon(UpsertDaemonOpts {
509 id: id.clone(),
510 pid: None, status: DaemonStatus::Stopped,
512 last_exit_success: Some(status.success()),
513 ..Default::default()
514 })
515 .await
516 {
517 error!("Failed to update daemon state for {id}: {e}");
518 }
519 } else {
// Unexpected failure: record `Errored` with the exit code when one is available.
520 let status = match status.code() {
523 Some(code) => DaemonStatus::Errored(Some(code)),
524 None => DaemonStatus::Errored(None),
525 };
526 if let Err(e) = SUPERVISOR
527 .upsert_daemon(UpsertDaemonOpts {
528 id: id.clone(),
529 pid: None,
530 status,
531 last_exit_success: Some(false),
532 ..Default::default()
533 })
534 .await
535 {
536 error!("Failed to update daemon state for {id}: {e}");
537 }
538 }
// Waiting on the process itself failed: record `Errored` without an exit code.
539 } else if let Err(e) = SUPERVISOR
540 .upsert_daemon(UpsertDaemonOpts {
541 id: id.clone(),
542 pid: None,
543 status: DaemonStatus::Errored(None),
544 last_exit_success: Some(false),
545 ..Default::default()
546 })
547 .await
548 {
549 error!("Failed to update daemon state for {id}: {e}");
550 }
551 });
552
// With `wait_ready`, block until the monitor task reports readiness or failure; a
// dropped sender (monitor returned early) is reported as a plain `DaemonStart`.
553 if let Some(ready_rx) = ready_rx {
555 match ready_rx.await {
556 Ok(Ok(())) => {
557 info!("daemon {id} is ready");
558 Ok(IpcResponse::DaemonReady { daemon })
559 }
560 Ok(Err(exit_code)) => {
561 error!("daemon {id} failed before becoming ready");
562 Ok(IpcResponse::DaemonFailedWithCode { exit_code })
563 }
564 Err(_) => {
565 error!("readiness channel closed unexpectedly for daemon {id}");
566 Ok(IpcResponse::DaemonStart { daemon })
567 }
568 }
569 } else {
570 Ok(IpcResponse::DaemonStart { daemon })
571 }
572 }
573
574 pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
576 if id == "pitchfork" {
577 return Ok(IpcResponse::Error(
578 "Cannot stop supervisor via stop command".into(),
579 ));
580 }
581 info!("stopping daemon: {id}");
582 if let Some(daemon) = self.get_daemon(id).await {
583 trace!("daemon to stop: {daemon}");
584 if let Some(pid) = daemon.pid {
585 trace!("killing pid: {pid}");
586 PROCS.refresh_processes();
587 if PROCS.is_running(pid) {
588 self.upsert_daemon(UpsertDaemonOpts {
590 id: id.to_string(),
591 pid: Some(pid),
592 status: DaemonStatus::Stopping,
593 ..Default::default()
594 })
595 .await?;
596
597 for child_pid in PROCS.all_children(pid) {
599 debug!("killing child pid: {child_pid}");
600 if let Err(e) = PROCS.kill_async(child_pid).await {
601 warn!("failed to kill child pid {child_pid}: {e}");
602 }
603 }
604
605 if let Err(e) = PROCS.kill_async(pid).await {
607 debug!("failed to kill pid {pid}: {e}");
608 PROCS.refresh_processes();
610 if PROCS.is_running(pid) {
611 debug!("failed to stop pid {pid}: process still running after kill");
613 self.upsert_daemon(UpsertDaemonOpts {
614 id: id.to_string(),
615 pid: Some(pid), status: DaemonStatus::Running,
617 ..Default::default()
618 })
619 .await?;
620 return Ok(IpcResponse::DaemonStopFailed {
621 error: format!(
622 "process {pid} still running after kill attempt: {e}"
623 ),
624 });
625 }
626 }
627
628 let mut process_terminated = false;
632 for i in 0..10 {
633 PROCS.refresh_pids(&[pid]);
634 if !PROCS.is_running(pid) {
635 process_terminated = true;
636 break;
637 }
638 if i < 9 {
639 debug!(
640 "waiting for process {pid} to fully terminate ({}/10)",
641 i + 1
642 );
643 time::sleep(Duration::from_millis(50)).await;
644 }
645 }
646
647 if !process_terminated {
648 warn!(
649 "Process {pid} for daemon {id} did not terminate within 500ms after SIGTERM. \
650 The process may take longer to release resources."
651 );
652 }
653
654 self.upsert_daemon(UpsertDaemonOpts {
656 id: id.to_string(),
657 pid: None,
658 status: DaemonStatus::Stopped,
659 last_exit_success: Some(true), ..Default::default()
661 })
662 .await?;
663 } else {
664 debug!("pid {pid} not running, process may have exited unexpectedly");
665 self.upsert_daemon(UpsertDaemonOpts {
668 id: id.to_string(),
669 pid: None,
670 status: DaemonStatus::Stopped,
671 ..Default::default()
672 })
673 .await?;
674 return Ok(IpcResponse::DaemonWasNotRunning);
675 }
676 Ok(IpcResponse::Ok)
677 } else {
678 debug!("daemon {id} not running");
679 Ok(IpcResponse::DaemonNotRunning)
680 }
681 } else {
682 debug!("daemon {id} not found");
683 Ok(IpcResponse::DaemonNotFound)
684 }
685 }
686}