pitchfork_cli/supervisor/
lifecycle.rs1use super::{SUPERVISOR, Supervisor, UpsertDaemonOpts};
6use crate::daemon::RunOptions;
7use crate::daemon_status::DaemonStatus;
8use crate::ipc::IpcResponse;
9use crate::procs::PROCS;
10use crate::shell::Shell;
11use crate::{Result, env};
12use itertools::Itertools;
13use miette::IntoDiagnostic;
14use once_cell::sync::Lazy;
15use regex::Regex;
16use std::collections::HashMap;
17use std::iter::once;
18use std::time::Duration;
19use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
20use tokio::select;
21use tokio::sync::oneshot;
22use tokio::time;
23
24static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
26 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
27
28pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
30 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
31 if let Some(re) = cache.get(pattern) {
32 return Some(re.clone());
33 }
34 match Regex::new(pattern) {
35 Ok(re) => {
36 cache.insert(pattern.to_string(), re.clone());
37 Some(re)
38 }
39 Err(e) => {
40 error!("invalid regex pattern '{pattern}': {e}");
41 None
42 }
43 }
44}
45
46impl Supervisor {
47 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
49 let id = &opts.id;
50 let cmd = opts.cmd.clone();
51
52 {
54 let mut pending = self.pending_autostops.lock().await;
55 if pending.remove(id).is_some() {
56 info!("cleared pending autostop for {id} (daemon starting)");
57 }
58 }
59
60 let daemon = self.get_daemon(id).await;
61 if let Some(daemon) = daemon {
62 if !daemon.status.is_stopping()
65 && !daemon.status.is_stopped()
66 && let Some(pid) = daemon.pid
67 {
68 if opts.force {
69 self.stop(id).await?;
70 info!("run: stop completed for daemon {id}");
71 } else {
72 warn!("daemon {id} already running with pid {pid}");
73 return Ok(IpcResponse::DaemonAlreadyRunning);
74 }
75 }
76 }
77
78 if opts.wait_ready && opts.retry > 0 {
80 let max_attempts = opts.retry.saturating_add(1);
82 for attempt in 0..max_attempts {
83 let mut retry_opts = opts.clone();
84 retry_opts.retry_count = attempt;
85 retry_opts.cmd = cmd.clone();
86
87 let result = self.run_once(retry_opts).await?;
88
89 match result {
90 IpcResponse::DaemonReady { daemon } => {
91 return Ok(IpcResponse::DaemonReady { daemon });
92 }
93 IpcResponse::DaemonFailedWithCode { exit_code } => {
94 if attempt < opts.retry {
95 let backoff_secs = 2u64.pow(attempt);
96 info!(
97 "daemon {id} failed (attempt {}/{}), retrying in {}s",
98 attempt + 1,
99 max_attempts,
100 backoff_secs
101 );
102 time::sleep(Duration::from_secs(backoff_secs)).await;
103 continue;
104 } else {
105 info!("daemon {id} failed after {max_attempts} attempts");
106 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
107 }
108 }
109 other => return Ok(other),
110 }
111 }
112 }
113
114 self.run_once(opts).await
116 }
117
118 pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
120 let id = &opts.id;
121 let original_cmd = opts.cmd.clone(); let cmd = opts.cmd;
123
124 let (ready_tx, ready_rx) = if opts.wait_ready {
126 let (tx, rx) = oneshot::channel();
127 (Some(tx), Some(rx))
128 } else {
129 (None, None)
130 };
131
132 let cmd = once("exec".to_string())
133 .chain(cmd.into_iter())
134 .collect_vec();
135 let args = vec!["-c".to_string(), shell_words::join(&cmd)];
136 let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
137 if let Some(parent) = log_path.parent() {
138 xx::file::mkdirp(parent)?;
139 }
140 info!("run: spawning daemon {id} with args: {args:?}");
141 let mut cmd = tokio::process::Command::new("sh");
142 cmd.args(&args)
143 .stdin(std::process::Stdio::null())
144 .stdout(std::process::Stdio::piped())
145 .stderr(std::process::Stdio::piped())
146 .current_dir(&opts.dir);
147
148 if let Some(ref path) = *env::ORIGINAL_PATH {
150 cmd.env("PATH", path);
151 }
152
153 if let Some(ref env_vars) = opts.env {
155 cmd.envs(env_vars);
156 }
157
158 let mut child = cmd.spawn().into_diagnostic()?;
159 let pid = match child.id() {
160 Some(p) => p,
161 None => {
162 warn!("Daemon {id} exited before PID could be captured");
163 return Ok(IpcResponse::DaemonFailed {
164 error: "Process exited immediately".to_string(),
165 });
166 }
167 };
168 info!("started daemon {id} with pid {pid}");
169 let daemon = self
170 .upsert_daemon(UpsertDaemonOpts {
171 id: id.to_string(),
172 pid: Some(pid),
173 status: DaemonStatus::Running,
174 shell_pid: opts.shell_pid,
175 dir: Some(opts.dir.clone()),
176 cmd: Some(original_cmd),
177 autostop: opts.autostop,
178 cron_schedule: opts.cron_schedule.clone(),
179 cron_retrigger: opts.cron_retrigger,
180 last_exit_success: None,
181 retry: Some(opts.retry),
182 retry_count: Some(opts.retry_count),
183 ready_delay: opts.ready_delay,
184 ready_output: opts.ready_output.clone(),
185 ready_http: opts.ready_http.clone(),
186 ready_port: opts.ready_port,
187 ready_cmd: opts.ready_cmd.clone(),
188 depends: Some(opts.depends.clone()),
189 env: opts.env.clone(),
190 })
191 .await?;
192
193 let id_clone = id.to_string();
194 let ready_delay = opts.ready_delay;
195 let ready_output = opts.ready_output.clone();
196 let ready_http = opts.ready_http.clone();
197 let ready_port = opts.ready_port;
198 let ready_cmd = opts.ready_cmd.clone();
199
200 tokio::spawn(async move {
201 let id = id_clone;
202 let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
203 (Some(out), Some(err)) => (out, err),
204 _ => {
205 error!("Failed to capture stdout/stderr for daemon {id}");
206 return;
207 }
208 };
209 let mut stdout = tokio::io::BufReader::new(stdout).lines();
210 let mut stderr = tokio::io::BufReader::new(stderr).lines();
211 let log_file = match tokio::fs::File::options()
212 .append(true)
213 .create(true)
214 .open(&log_path)
215 .await
216 {
217 Ok(f) => f,
218 Err(e) => {
219 error!("Failed to open log file for daemon {id}: {e}");
220 return;
221 }
222 };
223 let mut log_appender = BufWriter::new(log_file);
224
225 let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
226 let format_line = |line: String| {
227 if line.starts_with(&format!("{id} ")) {
228 format!("{} {line}\n", now())
230 } else {
231 format!("{} {id} {line}\n", now())
232 }
233 };
234
235 let mut ready_notified = false;
237 let mut ready_tx = ready_tx;
238 let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
239
240 let mut delay_timer =
241 ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
242
243 let mut http_check_interval = ready_http
245 .as_ref()
246 .map(|_| tokio::time::interval(Duration::from_millis(500)));
247 let http_client = ready_http.as_ref().map(|_| {
248 reqwest::Client::builder()
249 .timeout(Duration::from_secs(5))
250 .build()
251 .unwrap_or_default()
252 });
253
254 let mut port_check_interval =
256 ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));
257
258 let mut cmd_check_interval = ready_cmd
260 .as_ref()
261 .map(|_| tokio::time::interval(Duration::from_millis(500)));
262
263 let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));
265
266 let (exit_tx, mut exit_rx) =
268 tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
269
270 let child_pid = child.id().unwrap_or(0);
272 tokio::spawn(async move {
273 let result = child.wait().await;
274 debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
275 let _ = exit_tx.send(result).await;
276 });
277
278 #[allow(unused_assignments)]
279 let mut exit_status = None;
281
282 loop {
283 select! {
284 Ok(Some(line)) = stdout.next_line() => {
285 let formatted = format_line(line.clone());
286 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
287 error!("Failed to write to log for daemon {id}: {e}");
288 }
289 trace!("stdout: {id} {formatted}");
290
291 if !ready_notified
293 && let Some(ref pattern) = ready_pattern
294 && pattern.is_match(&line) {
295 info!("daemon {id} ready: output matched pattern");
296 ready_notified = true;
297 let _ = log_appender.flush().await;
299 if let Some(tx) = ready_tx.take() {
300 let _ = tx.send(Ok(()));
301 }
302 }
303 }
304 Ok(Some(line)) = stderr.next_line() => {
305 let formatted = format_line(line.clone());
306 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
307 error!("Failed to write to log for daemon {id}: {e}");
308 }
309 trace!("stderr: {id} {formatted}");
310
311 if !ready_notified
313 && let Some(ref pattern) = ready_pattern
314 && pattern.is_match(&line) {
315 info!("daemon {id} ready: output matched pattern");
316 ready_notified = true;
317 let _ = log_appender.flush().await;
319 if let Some(tx) = ready_tx.take() {
320 let _ = tx.send(Ok(()));
321 }
322 }
323 },
324 Some(result) = exit_rx.recv() => {
325 exit_status = Some(result);
327 debug!("daemon {id} process exited, exit_status: {exit_status:?}");
328 let _ = log_appender.flush().await;
330 if !ready_notified {
331 if let Some(tx) = ready_tx.take() {
332 let is_success = exit_status.as_ref()
334 .and_then(|r| r.as_ref().ok())
335 .map(|s| s.success())
336 .unwrap_or(false);
337
338 if is_success {
339 debug!("daemon {id} exited successfully before ready check, sending success notification");
340 let _ = tx.send(Ok(()));
341 } else {
342 let exit_code = exit_status.as_ref()
343 .and_then(|r| r.as_ref().ok())
344 .and_then(|s| s.code());
345 debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
346 let _ = tx.send(Err(exit_code));
347 }
348 }
349 } else {
350 debug!("daemon {id} was already marked ready, not sending notification");
351 }
352 break;
353 }
354 _ = async {
355 if let Some(ref mut interval) = http_check_interval {
356 interval.tick().await;
357 } else {
358 std::future::pending::<()>().await;
359 }
360 }, if !ready_notified && ready_http.is_some() => {
361 if let (Some(url), Some(client)) = (&ready_http, &http_client) {
362 match client.get(url).send().await {
363 Ok(response) if response.status().is_success() => {
364 info!("daemon {id} ready: HTTP check passed (status {})", response.status());
365 ready_notified = true;
366 let _ = log_appender.flush().await;
368 if let Some(tx) = ready_tx.take() {
369 let _ = tx.send(Ok(()));
370 }
371 http_check_interval = None;
373 }
374 Ok(response) => {
375 trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
376 }
377 Err(e) => {
378 trace!("daemon {id} HTTP check failed: {e}");
379 }
380 }
381 }
382 }
383 _ = async {
384 if let Some(ref mut interval) = port_check_interval {
385 interval.tick().await;
386 } else {
387 std::future::pending::<()>().await;
388 }
389 }, if !ready_notified && ready_port.is_some() => {
390 if let Some(port) = ready_port {
391 match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
392 Ok(_) => {
393 info!("daemon {id} ready: TCP port {port} is listening");
394 ready_notified = true;
395 let _ = log_appender.flush().await;
397 if let Some(tx) = ready_tx.take() {
398 let _ = tx.send(Ok(()));
399 }
400 port_check_interval = None;
402 }
403 Err(_) => {
404 trace!("daemon {id} port check: port {port} not listening yet");
405 }
406 }
407 }
408 }
409 _ = async {
410 if let Some(ref mut interval) = cmd_check_interval {
411 interval.tick().await;
412 } else {
413 std::future::pending::<()>().await;
414 }
415 }, if !ready_notified && ready_cmd.is_some() => {
416 if let Some(ref cmd) = ready_cmd {
417 let mut command = Shell::default_for_platform().command(cmd);
419 command
420 .stdout(std::process::Stdio::null())
421 .stderr(std::process::Stdio::null());
422 let result: std::io::Result<std::process::ExitStatus> = command.status().await;
423 match result {
424 Ok(status) if status.success() => {
425 info!("daemon {id} ready: readiness command succeeded");
426 ready_notified = true;
427 let _ = log_appender.flush().await;
428 if let Some(tx) = ready_tx.take() {
429 let _ = tx.send(Ok(()));
430 }
431 cmd_check_interval = None;
433 }
434 Ok(_) => {
435 trace!("daemon {id} cmd check: command returned non-zero (not ready)");
436 }
437 Err(e) => {
438 trace!("daemon {id} cmd check failed: {e}");
439 }
440 }
441 }
442 }
443 _ = async {
444 if let Some(ref mut timer) = delay_timer {
445 timer.await;
446 } else {
447 std::future::pending::<()>().await;
448 }
449 } => {
450 if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
451 info!("daemon {id} ready: delay elapsed");
452 ready_notified = true;
453 let _ = log_appender.flush().await;
455 if let Some(tx) = ready_tx.take() {
456 let _ = tx.send(Ok(()));
457 }
458 }
459 delay_timer = None;
461 }
462 _ = log_flush_interval.tick() => {
463 if let Err(e) = log_appender.flush().await {
465 error!("Failed to flush log for daemon {id}: {e}");
466 }
467 }
468 }
471 }
472
473 if let Err(e) = log_appender.flush().await {
475 error!("Failed to final flush log for daemon {id}: {e}");
476 }
477
478 let exit_status = if let Some(status) = exit_status {
480 status
481 } else {
482 match exit_rx.recv().await {
484 Some(status) => status,
485 None => {
486 warn!("daemon {id} exit channel closed without receiving status");
487 Err(std::io::Error::other("exit channel closed"))
488 }
489 }
490 };
491 let current_daemon = SUPERVISOR.get_daemon(&id).await;
492
493 if current_daemon.is_none()
495 || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
496 {
497 return;
499 }
500 let is_stopping = current_daemon
501 .as_ref()
502 .is_some_and(|d| d.status.is_stopping());
503
504 if current_daemon.is_some_and(|d| d.status.is_stopped()) {
505 return;
507 }
508 if let Ok(status) = exit_status {
509 info!("daemon {id} exited with status {status}");
510 if status.success() || is_stopping {
511 if let Err(e) = SUPERVISOR
514 .upsert_daemon(UpsertDaemonOpts {
515 id: id.clone(),
516 pid: None, status: DaemonStatus::Stopped,
518 last_exit_success: Some(status.success()),
519 ..Default::default()
520 })
521 .await
522 {
523 error!("Failed to update daemon state for {id}: {e}");
524 }
525 } else {
526 let status = match status.code() {
529 Some(code) => DaemonStatus::Errored(Some(code)),
530 None => DaemonStatus::Errored(None),
531 };
532 if let Err(e) = SUPERVISOR
533 .upsert_daemon(UpsertDaemonOpts {
534 id: id.clone(),
535 pid: None,
536 status,
537 last_exit_success: Some(false),
538 ..Default::default()
539 })
540 .await
541 {
542 error!("Failed to update daemon state for {id}: {e}");
543 }
544 }
545 } else if let Err(e) = SUPERVISOR
546 .upsert_daemon(UpsertDaemonOpts {
547 id: id.clone(),
548 pid: None,
549 status: DaemonStatus::Errored(None),
550 last_exit_success: Some(false),
551 ..Default::default()
552 })
553 .await
554 {
555 error!("Failed to update daemon state for {id}: {e}");
556 }
557 });
558
559 if let Some(ready_rx) = ready_rx {
561 match ready_rx.await {
562 Ok(Ok(())) => {
563 info!("daemon {id} is ready");
564 Ok(IpcResponse::DaemonReady { daemon })
565 }
566 Ok(Err(exit_code)) => {
567 error!("daemon {id} failed before becoming ready");
568 Ok(IpcResponse::DaemonFailedWithCode { exit_code })
569 }
570 Err(_) => {
571 error!("readiness channel closed unexpectedly for daemon {id}");
572 Ok(IpcResponse::DaemonStart { daemon })
573 }
574 }
575 } else {
576 Ok(IpcResponse::DaemonStart { daemon })
577 }
578 }
579
580 pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
582 if id == "pitchfork" {
583 return Ok(IpcResponse::Error(
584 "Cannot stop supervisor via stop command".into(),
585 ));
586 }
587 info!("stopping daemon: {id}");
588 if let Some(daemon) = self.get_daemon(id).await {
589 trace!("daemon to stop: {daemon}");
590 if let Some(pid) = daemon.pid {
591 trace!("killing pid: {pid}");
592 PROCS.refresh_processes();
593 if PROCS.is_running(pid) {
594 self.upsert_daemon(UpsertDaemonOpts {
596 id: id.to_string(),
597 pid: Some(pid),
598 status: DaemonStatus::Stopping,
599 ..Default::default()
600 })
601 .await?;
602
603 for child_pid in PROCS.all_children(pid) {
605 debug!("killing child pid: {child_pid}");
606 if let Err(e) = PROCS.kill_async(child_pid).await {
607 warn!("failed to kill child pid {child_pid}: {e}");
608 }
609 }
610
611 if let Err(e) = PROCS.kill_async(pid).await {
613 debug!("failed to kill pid {pid}: {e}");
614 PROCS.refresh_processes();
616 if PROCS.is_running(pid) {
617 debug!("failed to stop pid {pid}: process still running after kill");
619 self.upsert_daemon(UpsertDaemonOpts {
620 id: id.to_string(),
621 pid: Some(pid), status: DaemonStatus::Running,
623 ..Default::default()
624 })
625 .await?;
626 return Ok(IpcResponse::DaemonStopFailed {
627 error: format!(
628 "process {pid} still running after kill attempt: {e}"
629 ),
630 });
631 }
632 }
633
634 let mut process_terminated = false;
638 for i in 0..10 {
639 PROCS.refresh_pids(&[pid]);
640 if !PROCS.is_running(pid) {
641 process_terminated = true;
642 break;
643 }
644 if i < 9 {
645 debug!(
646 "waiting for process {pid} to fully terminate ({}/10)",
647 i + 1
648 );
649 time::sleep(Duration::from_millis(50)).await;
650 }
651 }
652
653 if !process_terminated {
654 warn!(
655 "Process {pid} for daemon {id} did not terminate within 500ms after SIGTERM. \
656 The process may take longer to release resources."
657 );
658 }
659
660 self.upsert_daemon(UpsertDaemonOpts {
662 id: id.to_string(),
663 pid: None,
664 status: DaemonStatus::Stopped,
665 last_exit_success: Some(true), ..Default::default()
667 })
668 .await?;
669 } else {
670 debug!("pid {pid} not running, process may have exited unexpectedly");
671 self.upsert_daemon(UpsertDaemonOpts {
674 id: id.to_string(),
675 pid: None,
676 status: DaemonStatus::Stopped,
677 ..Default::default()
678 })
679 .await?;
680 return Ok(IpcResponse::DaemonWasNotRunning);
681 }
682 Ok(IpcResponse::Ok)
683 } else {
684 debug!("daemon {id} not running");
685 Ok(IpcResponse::DaemonNotRunning)
686 }
687 } else {
688 debug!("daemon {id} not found");
689 Ok(IpcResponse::DaemonNotFound)
690 }
691 }
692}