pitchfork_cli/supervisor/
lifecycle.rs1use super::{SUPERVISOR, Supervisor, UpsertDaemonOpts};
6use crate::daemon::RunOptions;
7use crate::daemon_status::DaemonStatus;
8use crate::ipc::IpcResponse;
9use crate::procs::PROCS;
10use crate::shell::Shell;
11use crate::{Result, env};
12use itertools::Itertools;
13use miette::IntoDiagnostic;
14use once_cell::sync::Lazy;
15use regex::Regex;
16use std::collections::HashMap;
17use std::iter::once;
18use std::time::Duration;
19use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
20use tokio::select;
21use tokio::sync::oneshot;
22use tokio::time;
23
24static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
26 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
27
28pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
30 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
31 if let Some(re) = cache.get(pattern) {
32 return Some(re.clone());
33 }
34 match Regex::new(pattern) {
35 Ok(re) => {
36 cache.insert(pattern.to_string(), re.clone());
37 Some(re)
38 }
39 Err(e) => {
40 error!("invalid regex pattern '{pattern}': {e}");
41 None
42 }
43 }
44}
45
46impl Supervisor {
47 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
49 let id = &opts.id;
50 let cmd = opts.cmd.clone();
51
52 {
54 let mut pending = self.pending_autostops.lock().await;
55 if pending.remove(id).is_some() {
56 info!("cleared pending autostop for {id} (daemon starting)");
57 }
58 }
59
60 let daemon = self.get_daemon(id).await;
61 if let Some(daemon) = daemon {
62 if !daemon.status.is_stopping()
65 && !daemon.status.is_stopped()
66 && let Some(pid) = daemon.pid
67 {
68 if opts.force {
69 self.stop(id).await?;
70 info!("run: stop completed for daemon {id}");
71 } else {
72 warn!("daemon {id} already running with pid {pid}");
73 return Ok(IpcResponse::DaemonAlreadyRunning);
74 }
75 }
76 }
77
78 if opts.wait_ready && opts.retry > 0 {
80 let max_attempts = opts.retry.saturating_add(1);
82 for attempt in 0..max_attempts {
83 let mut retry_opts = opts.clone();
84 retry_opts.retry_count = attempt;
85 retry_opts.cmd = cmd.clone();
86
87 let result = self.run_once(retry_opts).await?;
88
89 match result {
90 IpcResponse::DaemonReady { daemon } => {
91 return Ok(IpcResponse::DaemonReady { daemon });
92 }
93 IpcResponse::DaemonFailedWithCode { exit_code } => {
94 if attempt < opts.retry {
95 let backoff_secs = 2u64.pow(attempt);
96 info!(
97 "daemon {id} failed (attempt {}/{}), retrying in {}s",
98 attempt + 1,
99 max_attempts,
100 backoff_secs
101 );
102 time::sleep(Duration::from_secs(backoff_secs)).await;
103 continue;
104 } else {
105 info!("daemon {id} failed after {max_attempts} attempts");
106 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
107 }
108 }
109 other => return Ok(other),
110 }
111 }
112 }
113
114 self.run_once(opts).await
116 }
117
118 pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
120 let id = &opts.id;
121 let original_cmd = opts.cmd.clone(); let cmd = opts.cmd;
123
124 let (ready_tx, ready_rx) = if opts.wait_ready {
126 let (tx, rx) = oneshot::channel();
127 (Some(tx), Some(rx))
128 } else {
129 (None, None)
130 };
131
132 let cmd = once("exec".to_string())
133 .chain(cmd.into_iter())
134 .collect_vec();
135 let args = vec!["-c".to_string(), shell_words::join(&cmd)];
136 let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
137 if let Some(parent) = log_path.parent() {
138 xx::file::mkdirp(parent)?;
139 }
140 info!("run: spawning daemon {id} with args: {args:?}");
141 let mut cmd = tokio::process::Command::new("sh");
142 cmd.args(&args)
143 .stdin(std::process::Stdio::null())
144 .stdout(std::process::Stdio::piped())
145 .stderr(std::process::Stdio::piped())
146 .current_dir(&opts.dir);
147
148 if let Some(ref path) = *env::ORIGINAL_PATH {
150 cmd.env("PATH", path);
151 }
152
153 if let Some(ref env_vars) = opts.env {
155 cmd.envs(env_vars);
156 }
157
158 #[cfg(unix)]
161 unsafe {
162 cmd.pre_exec(|| {
163 if libc::setsid() == -1 {
164 return Err(std::io::Error::last_os_error());
165 }
166 Ok(())
167 });
168 }
169
170 let mut child = cmd.spawn().into_diagnostic()?;
171 let pid = match child.id() {
172 Some(p) => p,
173 None => {
174 warn!("Daemon {id} exited before PID could be captured");
175 return Ok(IpcResponse::DaemonFailed {
176 error: "Process exited immediately".to_string(),
177 });
178 }
179 };
180 info!("started daemon {id} with pid {pid}");
181 let daemon = self
182 .upsert_daemon(UpsertDaemonOpts {
183 id: id.to_string(),
184 pid: Some(pid),
185 status: DaemonStatus::Running,
186 shell_pid: opts.shell_pid,
187 dir: Some(opts.dir.clone()),
188 cmd: Some(original_cmd),
189 autostop: opts.autostop,
190 cron_schedule: opts.cron_schedule.clone(),
191 cron_retrigger: opts.cron_retrigger,
192 last_exit_success: None,
193 retry: Some(opts.retry),
194 retry_count: Some(opts.retry_count),
195 ready_delay: opts.ready_delay,
196 ready_output: opts.ready_output.clone(),
197 ready_http: opts.ready_http.clone(),
198 ready_port: opts.ready_port,
199 ready_cmd: opts.ready_cmd.clone(),
200 depends: Some(opts.depends.clone()),
201 env: opts.env.clone(),
202 })
203 .await?;
204
205 let id_clone = id.to_string();
206 let ready_delay = opts.ready_delay;
207 let ready_output = opts.ready_output.clone();
208 let ready_http = opts.ready_http.clone();
209 let ready_port = opts.ready_port;
210 let ready_cmd = opts.ready_cmd.clone();
211
212 tokio::spawn(async move {
213 let id = id_clone;
214 let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
215 (Some(out), Some(err)) => (out, err),
216 _ => {
217 error!("Failed to capture stdout/stderr for daemon {id}");
218 return;
219 }
220 };
221 let mut stdout = tokio::io::BufReader::new(stdout).lines();
222 let mut stderr = tokio::io::BufReader::new(stderr).lines();
223 let log_file = match tokio::fs::File::options()
224 .append(true)
225 .create(true)
226 .open(&log_path)
227 .await
228 {
229 Ok(f) => f,
230 Err(e) => {
231 error!("Failed to open log file for daemon {id}: {e}");
232 return;
233 }
234 };
235 let mut log_appender = BufWriter::new(log_file);
236
237 let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
238 let format_line = |line: String| {
239 if line.starts_with(&format!("{id} ")) {
240 format!("{} {line}\n", now())
242 } else {
243 format!("{} {id} {line}\n", now())
244 }
245 };
246
247 let mut ready_notified = false;
249 let mut ready_tx = ready_tx;
250 let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
251
252 let mut delay_timer =
253 ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
254
255 let mut http_check_interval = ready_http
257 .as_ref()
258 .map(|_| tokio::time::interval(Duration::from_millis(500)));
259 let http_client = ready_http.as_ref().map(|_| {
260 reqwest::Client::builder()
261 .timeout(Duration::from_secs(5))
262 .build()
263 .unwrap_or_default()
264 });
265
266 let mut port_check_interval =
268 ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));
269
270 let mut cmd_check_interval = ready_cmd
272 .as_ref()
273 .map(|_| tokio::time::interval(Duration::from_millis(500)));
274
275 let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));
277
278 let (exit_tx, mut exit_rx) =
280 tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
281
282 let child_pid = child.id().unwrap_or(0);
284 tokio::spawn(async move {
285 let result = child.wait().await;
286 debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
287 let _ = exit_tx.send(result).await;
288 });
289
290 #[allow(unused_assignments)]
291 let mut exit_status = None;
293
294 loop {
295 select! {
296 Ok(Some(line)) = stdout.next_line() => {
297 let formatted = format_line(line.clone());
298 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
299 error!("Failed to write to log for daemon {id}: {e}");
300 }
301 trace!("stdout: {id} {formatted}");
302
303 if !ready_notified
305 && let Some(ref pattern) = ready_pattern
306 && pattern.is_match(&line) {
307 info!("daemon {id} ready: output matched pattern");
308 ready_notified = true;
309 let _ = log_appender.flush().await;
311 if let Some(tx) = ready_tx.take() {
312 let _ = tx.send(Ok(()));
313 }
314 }
315 }
316 Ok(Some(line)) = stderr.next_line() => {
317 let formatted = format_line(line.clone());
318 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
319 error!("Failed to write to log for daemon {id}: {e}");
320 }
321 trace!("stderr: {id} {formatted}");
322
323 if !ready_notified
325 && let Some(ref pattern) = ready_pattern
326 && pattern.is_match(&line) {
327 info!("daemon {id} ready: output matched pattern");
328 ready_notified = true;
329 let _ = log_appender.flush().await;
331 if let Some(tx) = ready_tx.take() {
332 let _ = tx.send(Ok(()));
333 }
334 }
335 },
336 Some(result) = exit_rx.recv() => {
337 exit_status = Some(result);
339 debug!("daemon {id} process exited, exit_status: {exit_status:?}");
340 let _ = log_appender.flush().await;
342 if !ready_notified {
343 if let Some(tx) = ready_tx.take() {
344 let is_success = exit_status.as_ref()
346 .and_then(|r| r.as_ref().ok())
347 .map(|s| s.success())
348 .unwrap_or(false);
349
350 if is_success {
351 debug!("daemon {id} exited successfully before ready check, sending success notification");
352 let _ = tx.send(Ok(()));
353 } else {
354 let exit_code = exit_status.as_ref()
355 .and_then(|r| r.as_ref().ok())
356 .and_then(|s| s.code());
357 debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
358 let _ = tx.send(Err(exit_code));
359 }
360 }
361 } else {
362 debug!("daemon {id} was already marked ready, not sending notification");
363 }
364 break;
365 }
366 _ = async {
367 if let Some(ref mut interval) = http_check_interval {
368 interval.tick().await;
369 } else {
370 std::future::pending::<()>().await;
371 }
372 }, if !ready_notified && ready_http.is_some() => {
373 if let (Some(url), Some(client)) = (&ready_http, &http_client) {
374 match client.get(url).send().await {
375 Ok(response) if response.status().is_success() => {
376 info!("daemon {id} ready: HTTP check passed (status {})", response.status());
377 ready_notified = true;
378 let _ = log_appender.flush().await;
380 if let Some(tx) = ready_tx.take() {
381 let _ = tx.send(Ok(()));
382 }
383 http_check_interval = None;
385 }
386 Ok(response) => {
387 trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
388 }
389 Err(e) => {
390 trace!("daemon {id} HTTP check failed: {e}");
391 }
392 }
393 }
394 }
395 _ = async {
396 if let Some(ref mut interval) = port_check_interval {
397 interval.tick().await;
398 } else {
399 std::future::pending::<()>().await;
400 }
401 }, if !ready_notified && ready_port.is_some() => {
402 if let Some(port) = ready_port {
403 match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
404 Ok(_) => {
405 info!("daemon {id} ready: TCP port {port} is listening");
406 ready_notified = true;
407 let _ = log_appender.flush().await;
409 if let Some(tx) = ready_tx.take() {
410 let _ = tx.send(Ok(()));
411 }
412 port_check_interval = None;
414 }
415 Err(_) => {
416 trace!("daemon {id} port check: port {port} not listening yet");
417 }
418 }
419 }
420 }
421 _ = async {
422 if let Some(ref mut interval) = cmd_check_interval {
423 interval.tick().await;
424 } else {
425 std::future::pending::<()>().await;
426 }
427 }, if !ready_notified && ready_cmd.is_some() => {
428 if let Some(ref cmd) = ready_cmd {
429 let mut command = Shell::default_for_platform().command(cmd);
431 command
432 .stdout(std::process::Stdio::null())
433 .stderr(std::process::Stdio::null());
434 let result: std::io::Result<std::process::ExitStatus> = command.status().await;
435 match result {
436 Ok(status) if status.success() => {
437 info!("daemon {id} ready: readiness command succeeded");
438 ready_notified = true;
439 let _ = log_appender.flush().await;
440 if let Some(tx) = ready_tx.take() {
441 let _ = tx.send(Ok(()));
442 }
443 cmd_check_interval = None;
445 }
446 Ok(_) => {
447 trace!("daemon {id} cmd check: command returned non-zero (not ready)");
448 }
449 Err(e) => {
450 trace!("daemon {id} cmd check failed: {e}");
451 }
452 }
453 }
454 }
455 _ = async {
456 if let Some(ref mut timer) = delay_timer {
457 timer.await;
458 } else {
459 std::future::pending::<()>().await;
460 }
461 } => {
462 if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
463 info!("daemon {id} ready: delay elapsed");
464 ready_notified = true;
465 let _ = log_appender.flush().await;
467 if let Some(tx) = ready_tx.take() {
468 let _ = tx.send(Ok(()));
469 }
470 }
471 delay_timer = None;
473 }
474 _ = log_flush_interval.tick() => {
475 if let Err(e) = log_appender.flush().await {
477 error!("Failed to flush log for daemon {id}: {e}");
478 }
479 }
480 }
483 }
484
485 if let Err(e) = log_appender.flush().await {
487 error!("Failed to final flush log for daemon {id}: {e}");
488 }
489
490 let exit_status = if let Some(status) = exit_status {
492 status
493 } else {
494 match exit_rx.recv().await {
496 Some(status) => status,
497 None => {
498 warn!("daemon {id} exit channel closed without receiving status");
499 Err(std::io::Error::other("exit channel closed"))
500 }
501 }
502 };
503 let current_daemon = SUPERVISOR.get_daemon(&id).await;
504
505 if current_daemon.is_none()
507 || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
508 {
509 return;
511 }
512 let is_stopping = current_daemon
513 .as_ref()
514 .is_some_and(|d| d.status.is_stopping());
515
516 if current_daemon.is_some_and(|d| d.status.is_stopped()) {
517 return;
519 }
520 if let Ok(status) = exit_status {
521 info!("daemon {id} exited with status {status}");
522 if status.success() || is_stopping {
523 if let Err(e) = SUPERVISOR
526 .upsert_daemon(UpsertDaemonOpts {
527 id: id.clone(),
528 pid: None, status: DaemonStatus::Stopped,
530 last_exit_success: Some(status.success()),
531 ..Default::default()
532 })
533 .await
534 {
535 error!("Failed to update daemon state for {id}: {e}");
536 }
537 } else {
538 let status = match status.code() {
541 Some(code) => DaemonStatus::Errored(code),
542 None => DaemonStatus::Errored(-1),
543 };
544 if let Err(e) = SUPERVISOR
545 .upsert_daemon(UpsertDaemonOpts {
546 id: id.clone(),
547 pid: None,
548 status,
549 last_exit_success: Some(false),
550 ..Default::default()
551 })
552 .await
553 {
554 error!("Failed to update daemon state for {id}: {e}");
555 }
556 }
557 } else if is_stopping {
558 if let Err(e) = SUPERVISOR
561 .upsert_daemon(UpsertDaemonOpts {
562 id: id.clone(),
563 pid: None,
564 status: DaemonStatus::Stopped,
565 last_exit_success: Some(true),
566 ..Default::default()
567 })
568 .await
569 {
570 error!("Failed to update daemon state for {id}: {e}");
571 }
572 } else if let Err(e) = SUPERVISOR
573 .upsert_daemon(UpsertDaemonOpts {
574 id: id.clone(),
575 pid: None,
576 status: DaemonStatus::Errored(-1),
577 last_exit_success: Some(false),
578 ..Default::default()
579 })
580 .await
581 {
582 error!("Failed to update daemon state for {id}: {e}");
583 }
584 });
585
586 if let Some(ready_rx) = ready_rx {
588 match ready_rx.await {
589 Ok(Ok(())) => {
590 info!("daemon {id} is ready");
591 Ok(IpcResponse::DaemonReady { daemon })
592 }
593 Ok(Err(exit_code)) => {
594 error!("daemon {id} failed before becoming ready");
595 Ok(IpcResponse::DaemonFailedWithCode { exit_code })
596 }
597 Err(_) => {
598 error!("readiness channel closed unexpectedly for daemon {id}");
599 Ok(IpcResponse::DaemonStart { daemon })
600 }
601 }
602 } else {
603 Ok(IpcResponse::DaemonStart { daemon })
604 }
605 }
606
607 pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
609 if id == "pitchfork" {
610 return Ok(IpcResponse::Error(
611 "Cannot stop supervisor via stop command".into(),
612 ));
613 }
614 info!("stopping daemon: {id}");
615 if let Some(daemon) = self.get_daemon(id).await {
616 trace!("daemon to stop: {daemon}");
617 if let Some(pid) = daemon.pid {
618 trace!("killing pid: {pid}");
619 PROCS.refresh_processes();
620 if PROCS.is_running(pid) {
621 self.upsert_daemon(UpsertDaemonOpts {
623 id: id.to_string(),
624 pid: Some(pid),
625 status: DaemonStatus::Stopping,
626 ..Default::default()
627 })
628 .await?;
629
630 if let Err(e) = PROCS.kill_process_group_async(pid).await {
633 debug!("failed to kill pid {pid}: {e}");
634 PROCS.refresh_processes();
636 if PROCS.is_running(pid) {
637 debug!("failed to stop pid {pid}: process still running after kill");
639 self.upsert_daemon(UpsertDaemonOpts {
640 id: id.to_string(),
641 pid: Some(pid), status: DaemonStatus::Running,
643 ..Default::default()
644 })
645 .await?;
646 return Ok(IpcResponse::DaemonStopFailed {
647 error: format!(
648 "process {pid} still running after kill attempt: {e}"
649 ),
650 });
651 }
652 }
653
654 self.upsert_daemon(UpsertDaemonOpts {
659 id: id.to_string(),
660 pid: None,
661 status: DaemonStatus::Stopped,
662 last_exit_success: Some(true), ..Default::default()
664 })
665 .await?;
666 } else {
667 debug!("pid {pid} not running, process may have exited unexpectedly");
668 self.upsert_daemon(UpsertDaemonOpts {
671 id: id.to_string(),
672 pid: None,
673 status: DaemonStatus::Stopped,
674 ..Default::default()
675 })
676 .await?;
677 return Ok(IpcResponse::DaemonWasNotRunning);
678 }
679 Ok(IpcResponse::Ok)
680 } else {
681 debug!("daemon {id} not running");
682 Ok(IpcResponse::DaemonNotRunning)
683 }
684 } else {
685 debug!("daemon {id} not found");
686 Ok(IpcResponse::DaemonNotFound)
687 }
688 }
689}