pitchfork_cli/supervisor/
lifecycle.rs1use super::{SUPERVISOR, Supervisor, UpsertDaemonOpts};
6use crate::daemon::RunOptions;
7use crate::daemon_status::DaemonStatus;
8use crate::ipc::IpcResponse;
9use crate::procs::PROCS;
10use crate::{Result, env};
11use itertools::Itertools;
12use miette::IntoDiagnostic;
13use once_cell::sync::Lazy;
14use regex::Regex;
15use std::collections::HashMap;
16use std::iter::once;
17use std::time::Duration;
18use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
19use tokio::select;
20use tokio::sync::oneshot;
21use tokio::time;
22
23static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
25 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
26
27pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
29 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
30 if let Some(re) = cache.get(pattern) {
31 return Some(re.clone());
32 }
33 match Regex::new(pattern) {
34 Ok(re) => {
35 cache.insert(pattern.to_string(), re.clone());
36 Some(re)
37 }
38 Err(e) => {
39 error!("invalid regex pattern '{}': {}", pattern, e);
40 None
41 }
42 }
43}
44
45impl Supervisor {
46 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
48 let id = &opts.id;
49 let cmd = opts.cmd.clone();
50
51 {
53 let mut pending = self.pending_autostops.lock().await;
54 if pending.remove(id).is_some() {
55 info!("cleared pending autostop for {} (daemon starting)", id);
56 }
57 }
58
59 let daemon = self.get_daemon(id).await;
60 if let Some(daemon) = daemon {
61 if !daemon.status.is_stopping()
64 && !daemon.status.is_stopped()
65 && let Some(pid) = daemon.pid
66 {
67 if opts.force {
68 self.stop(id).await?;
69 info!("run: stop completed for daemon {id}");
70 } else {
71 warn!("daemon {id} already running with pid {pid}");
72 return Ok(IpcResponse::DaemonAlreadyRunning);
73 }
74 }
75 }
76
77 if opts.wait_ready && opts.retry > 0 {
79 let max_attempts = opts.retry.saturating_add(1);
81 for attempt in 0..max_attempts {
82 let mut retry_opts = opts.clone();
83 retry_opts.retry_count = attempt;
84 retry_opts.cmd = cmd.clone();
85
86 let result = self.run_once(retry_opts).await?;
87
88 match result {
89 IpcResponse::DaemonReady { daemon } => {
90 return Ok(IpcResponse::DaemonReady { daemon });
91 }
92 IpcResponse::DaemonFailedWithCode { exit_code } => {
93 if attempt < opts.retry {
94 let backoff_secs = 2u64.pow(attempt);
95 info!(
96 "daemon {id} failed (attempt {}/{}), retrying in {}s",
97 attempt + 1,
98 max_attempts,
99 backoff_secs
100 );
101 time::sleep(Duration::from_secs(backoff_secs)).await;
102 continue;
103 } else {
104 info!("daemon {id} failed after {} attempts", max_attempts);
105 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
106 }
107 }
108 other => return Ok(other),
109 }
110 }
111 }
112
113 self.run_once(opts).await
115 }
116
117 pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
119 let id = &opts.id;
120 let cmd = opts.cmd;
121
122 let (ready_tx, ready_rx) = if opts.wait_ready {
124 let (tx, rx) = oneshot::channel();
125 (Some(tx), Some(rx))
126 } else {
127 (None, None)
128 };
129
130 let cmd = once("exec".to_string())
131 .chain(cmd.into_iter())
132 .collect_vec();
133 let args = vec!["-c".to_string(), shell_words::join(&cmd)];
134 let log_path = env::PITCHFORK_LOGS_DIR.join(id).join(format!("{id}.log"));
135 if let Some(parent) = log_path.parent() {
136 xx::file::mkdirp(parent)?;
137 }
138 info!("run: spawning daemon {id} with args: {args:?}");
139 let mut cmd = tokio::process::Command::new("sh");
140 cmd.args(&args)
141 .stdin(std::process::Stdio::null())
142 .stdout(std::process::Stdio::piped())
143 .stderr(std::process::Stdio::piped())
144 .current_dir(&opts.dir);
145
146 if let Some(ref path) = *env::ORIGINAL_PATH {
148 cmd.env("PATH", path);
149 }
150
151 let mut child = cmd.spawn().into_diagnostic()?;
152 let pid = match child.id() {
153 Some(p) => p,
154 None => {
155 warn!("Daemon {id} exited before PID could be captured");
156 return Ok(IpcResponse::DaemonFailed {
157 error: "Process exited immediately".to_string(),
158 });
159 }
160 };
161 info!("started daemon {id} with pid {pid}");
162 let daemon = self
163 .upsert_daemon(UpsertDaemonOpts {
164 id: id.to_string(),
165 pid: Some(pid),
166 status: DaemonStatus::Running,
167 shell_pid: opts.shell_pid,
168 dir: Some(opts.dir.clone()),
169 autostop: opts.autostop,
170 cron_schedule: opts.cron_schedule.clone(),
171 cron_retrigger: opts.cron_retrigger,
172 last_exit_success: None,
173 retry: Some(opts.retry),
174 retry_count: Some(opts.retry_count),
175 ready_delay: opts.ready_delay,
176 ready_output: opts.ready_output.clone(),
177 ready_http: opts.ready_http.clone(),
178 ready_port: opts.ready_port,
179 depends: Some(opts.depends.clone()),
180 })
181 .await?;
182
183 let id_clone = id.to_string();
184 let ready_delay = opts.ready_delay;
185 let ready_output = opts.ready_output.clone();
186 let ready_http = opts.ready_http.clone();
187 let ready_port = opts.ready_port;
188
189 tokio::spawn(async move {
190 let id = id_clone;
191 let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
192 (Some(out), Some(err)) => (out, err),
193 _ => {
194 error!("Failed to capture stdout/stderr for daemon {id}");
195 return;
196 }
197 };
198 let mut stdout = tokio::io::BufReader::new(stdout).lines();
199 let mut stderr = tokio::io::BufReader::new(stderr).lines();
200 let log_file = match tokio::fs::File::options()
201 .append(true)
202 .create(true)
203 .open(&log_path)
204 .await
205 {
206 Ok(f) => f,
207 Err(e) => {
208 error!("Failed to open log file for daemon {id}: {e}");
209 return;
210 }
211 };
212 let mut log_appender = BufWriter::new(log_file);
213
214 let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
215 let format_line = |line: String| {
216 if line.starts_with(&format!("{id} ")) {
217 format!("{} {line}\n", now())
219 } else {
220 format!("{} {id} {line}\n", now())
221 }
222 };
223
224 let mut ready_notified = false;
226 let mut ready_tx = ready_tx;
227 let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
228
229 let mut delay_timer =
230 ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
231
232 let mut http_check_interval = ready_http
234 .as_ref()
235 .map(|_| tokio::time::interval(Duration::from_millis(500)));
236 let http_client = ready_http.as_ref().map(|_| {
237 reqwest::Client::builder()
238 .timeout(Duration::from_secs(5))
239 .build()
240 .unwrap_or_default()
241 });
242
243 let mut port_check_interval =
245 ready_port.map(|_| tokio::time::interval(Duration::from_millis(500)));
246
247 let mut log_flush_interval = tokio::time::interval(Duration::from_millis(500));
249
250 let (exit_tx, mut exit_rx) =
252 tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
253
254 let child_pid = child.id().unwrap_or(0);
256 tokio::spawn(async move {
257 let result = child.wait().await;
258 debug!(
259 "daemon pid {child_pid} wait() completed with result: {:?}",
260 result
261 );
262 let _ = exit_tx.send(result).await;
263 });
264
265 #[allow(unused_assignments)]
266 let mut exit_status = None;
268
269 loop {
270 select! {
271 Ok(Some(line)) = stdout.next_line() => {
272 let formatted = format_line(line.clone());
273 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
274 error!("Failed to write to log for daemon {id}: {e}");
275 }
276 trace!("stdout: {id} {formatted}");
277
278 if !ready_notified
280 && let Some(ref pattern) = ready_pattern
281 && pattern.is_match(&line) {
282 info!("daemon {id} ready: output matched pattern");
283 ready_notified = true;
284 let _ = log_appender.flush().await;
286 if let Some(tx) = ready_tx.take() {
287 let _ = tx.send(Ok(()));
288 }
289 }
290 }
291 Ok(Some(line)) = stderr.next_line() => {
292 let formatted = format_line(line.clone());
293 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
294 error!("Failed to write to log for daemon {id}: {e}");
295 }
296 trace!("stderr: {id} {formatted}");
297
298 if !ready_notified
300 && let Some(ref pattern) = ready_pattern
301 && pattern.is_match(&line) {
302 info!("daemon {id} ready: output matched pattern");
303 ready_notified = true;
304 let _ = log_appender.flush().await;
306 if let Some(tx) = ready_tx.take() {
307 let _ = tx.send(Ok(()));
308 }
309 }
310 },
311 Some(result) = exit_rx.recv() => {
312 exit_status = Some(result);
314 debug!("daemon {id} process exited, exit_status: {:?}", exit_status);
315 let _ = log_appender.flush().await;
317 if !ready_notified {
318 if let Some(tx) = ready_tx.take() {
319 let is_success = exit_status.as_ref()
321 .and_then(|r| r.as_ref().ok())
322 .map(|s| s.success())
323 .unwrap_or(false);
324
325 if is_success {
326 debug!("daemon {id} exited successfully before ready check, sending success notification");
327 let _ = tx.send(Ok(()));
328 } else {
329 let exit_code = exit_status.as_ref()
330 .and_then(|r| r.as_ref().ok())
331 .and_then(|s| s.code());
332 debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {:?}", exit_code);
333 let _ = tx.send(Err(exit_code));
334 }
335 }
336 } else {
337 debug!("daemon {id} was already marked ready, not sending notification");
338 }
339 break;
340 }
341 _ = async {
342 if let Some(ref mut interval) = http_check_interval {
343 interval.tick().await;
344 } else {
345 std::future::pending::<()>().await;
346 }
347 }, if !ready_notified && ready_http.is_some() => {
348 if let (Some(url), Some(client)) = (&ready_http, &http_client) {
349 match client.get(url).send().await {
350 Ok(response) if response.status().is_success() => {
351 info!("daemon {id} ready: HTTP check passed (status {})", response.status());
352 ready_notified = true;
353 let _ = log_appender.flush().await;
355 if let Some(tx) = ready_tx.take() {
356 let _ = tx.send(Ok(()));
357 }
358 http_check_interval = None;
360 }
361 Ok(response) => {
362 trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
363 }
364 Err(e) => {
365 trace!("daemon {id} HTTP check failed: {e}");
366 }
367 }
368 }
369 }
370 _ = async {
371 if let Some(ref mut interval) = port_check_interval {
372 interval.tick().await;
373 } else {
374 std::future::pending::<()>().await;
375 }
376 }, if !ready_notified && ready_port.is_some() => {
377 if let Some(port) = ready_port {
378 match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
379 Ok(_) => {
380 info!("daemon {id} ready: TCP port {port} is listening");
381 ready_notified = true;
382 let _ = log_appender.flush().await;
384 if let Some(tx) = ready_tx.take() {
385 let _ = tx.send(Ok(()));
386 }
387 port_check_interval = None;
389 }
390 Err(_) => {
391 trace!("daemon {id} port check: port {port} not listening yet");
392 }
393 }
394 }
395 }
396 _ = async {
397 if let Some(ref mut timer) = delay_timer {
398 timer.await;
399 } else {
400 std::future::pending::<()>().await;
401 }
402 } => {
403 if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() {
404 info!("daemon {id} ready: delay elapsed");
405 ready_notified = true;
406 let _ = log_appender.flush().await;
408 if let Some(tx) = ready_tx.take() {
409 let _ = tx.send(Ok(()));
410 }
411 }
412 delay_timer = None;
414 }
415 _ = log_flush_interval.tick() => {
416 if let Err(e) = log_appender.flush().await {
418 error!("Failed to flush log for daemon {id}: {e}");
419 }
420 }
421 }
424 }
425
426 if let Err(e) = log_appender.flush().await {
428 error!("Failed to final flush log for daemon {id}: {e}");
429 }
430
431 let exit_status = if let Some(status) = exit_status {
433 status
434 } else {
435 match exit_rx.recv().await {
437 Some(status) => status,
438 None => {
439 warn!("daemon {id} exit channel closed without receiving status");
440 Err(std::io::Error::other("exit channel closed"))
441 }
442 }
443 };
444 let current_daemon = SUPERVISOR.get_daemon(&id).await;
445
446 if current_daemon.is_none()
448 || current_daemon.as_ref().is_some_and(|d| d.pid != Some(pid))
449 {
450 return;
452 }
453 let is_stopping = current_daemon
454 .as_ref()
455 .is_some_and(|d| d.status.is_stopping());
456
457 if current_daemon.is_some_and(|d| d.status.is_stopped()) {
458 return;
460 }
461 if let Ok(status) = exit_status {
462 info!("daemon {id} exited with status {status}");
463 if status.success() || is_stopping {
464 if let Err(e) = SUPERVISOR
467 .upsert_daemon(UpsertDaemonOpts {
468 id: id.clone(),
469 pid: None, status: DaemonStatus::Stopped,
471 last_exit_success: Some(status.success()),
472 ..Default::default()
473 })
474 .await
475 {
476 error!("Failed to update daemon state for {id}: {e}");
477 }
478 } else {
479 if let Err(e) = SUPERVISOR
482 .upsert_daemon(UpsertDaemonOpts {
483 id: id.clone(),
484 pid: None,
485 status: DaemonStatus::Errored(status.code()),
486 last_exit_success: Some(false),
487 ..Default::default()
488 })
489 .await
490 {
491 error!("Failed to update daemon state for {id}: {e}");
492 }
493 }
494 } else if let Err(e) = SUPERVISOR
495 .upsert_daemon(UpsertDaemonOpts {
496 id: id.clone(),
497 pid: None,
498 status: DaemonStatus::Errored(None),
499 last_exit_success: Some(false),
500 ..Default::default()
501 })
502 .await
503 {
504 error!("Failed to update daemon state for {id}: {e}");
505 }
506 });
507
508 if let Some(ready_rx) = ready_rx {
510 match ready_rx.await {
511 Ok(Ok(())) => {
512 info!("daemon {id} is ready");
513 Ok(IpcResponse::DaemonReady { daemon })
514 }
515 Ok(Err(exit_code)) => {
516 error!("daemon {id} failed before becoming ready");
517 Ok(IpcResponse::DaemonFailedWithCode { exit_code })
518 }
519 Err(_) => {
520 error!("readiness channel closed unexpectedly for daemon {id}");
521 Ok(IpcResponse::DaemonStart { daemon })
522 }
523 }
524 } else {
525 Ok(IpcResponse::DaemonStart { daemon })
526 }
527 }
528
529 pub async fn stop(&self, id: &str) -> Result<IpcResponse> {
531 if id == "pitchfork" {
532 return Ok(IpcResponse::Error(
533 "Cannot stop supervisor via stop command".into(),
534 ));
535 }
536 info!("stopping daemon: {id}");
537 if let Some(daemon) = self.get_daemon(id).await {
538 trace!("daemon to stop: {daemon}");
539 if let Some(pid) = daemon.pid {
540 trace!("killing pid: {pid}");
541 PROCS.refresh_processes();
542 if PROCS.is_running(pid) {
543 self.upsert_daemon(UpsertDaemonOpts {
545 id: id.to_string(),
546 status: DaemonStatus::Stopping,
547 ..Default::default()
548 })
549 .await?;
550
551 if let Err(e) = PROCS.kill_async(pid).await {
553 warn!("failed to kill pid {pid}: {e}");
554 }
555 PROCS.refresh_processes();
556 for child_pid in PROCS.all_children(pid) {
557 debug!("killing child pid: {child_pid}");
558 if let Err(e) = PROCS.kill_async(child_pid).await {
559 warn!("failed to kill child pid {child_pid}: {e}");
560 }
561 }
562 } else {
564 debug!("pid {pid} not running");
565 self.upsert_daemon(UpsertDaemonOpts {
567 id: id.to_string(),
568 pid: None,
569 status: DaemonStatus::Stopped,
570 ..Default::default()
571 })
572 .await?;
573 }
574 return Ok(IpcResponse::Ok);
575 } else {
576 debug!("daemon {id} not running");
577 }
578 } else {
579 debug!("daemon {id} not found");
580 }
581 Ok(IpcResponse::DaemonAlreadyStopped)
582 }
583}