1use super::hooks::{HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::{IntoDiagnostic, WrapErr};
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22use std::iter::once;
23use std::net::TcpListener;
24use std::sync::atomic;
25use std::time::Duration;
26use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
27use tokio::select;
28use tokio::sync::oneshot;
29use tokio::time;
30
31static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
33 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
34
35pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
37 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
38 if let Some(re) = cache.get(pattern) {
39 return Some(re.clone());
40 }
41 match Regex::new(pattern) {
42 Ok(re) => {
43 cache.insert(pattern.to_string(), re.clone());
44 Some(re)
45 }
46 Err(e) => {
47 error!("invalid regex pattern '{pattern}': {e}");
48 None
49 }
50 }
51}
52
53impl Supervisor {
54 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
56 let id = &opts.id;
57 let cmd = opts.cmd.clone();
58
59 {
61 let mut pending = self.pending_autostops.lock().await;
62 if pending.remove(id).is_some() {
63 info!("cleared pending autostop for {id} (daemon starting)");
64 }
65 }
66
67 let daemon = self.get_daemon(id).await;
68 if let Some(daemon) = daemon {
69 if !daemon.status.is_stopping()
72 && !daemon.status.is_stopped()
73 && let Some(pid) = daemon.pid
74 {
75 if opts.force {
76 self.stop(id).await?;
77 info!("run: stop completed for daemon {id}");
78 } else {
79 warn!("daemon {id} already running with pid {pid}");
80 return Ok(IpcResponse::DaemonAlreadyRunning);
81 }
82 }
83 }
84
85 if opts.wait_ready && opts.retry > 0 {
87 let max_attempts = opts.retry.saturating_add(1);
89 for attempt in 0..max_attempts {
90 let mut retry_opts = opts.clone();
91 retry_opts.retry_count = attempt;
92 retry_opts.cmd = cmd.clone();
93
94 let result = self.run_once(retry_opts).await?;
95
96 match result {
97 IpcResponse::DaemonReady { daemon } => {
98 return Ok(IpcResponse::DaemonReady { daemon });
99 }
100 IpcResponse::DaemonFailedWithCode { exit_code } => {
101 if attempt < opts.retry {
102 let backoff_secs = 2u64.pow(attempt);
103 info!(
104 "daemon {id} failed (attempt {}/{}), retrying in {}s",
105 attempt + 1,
106 max_attempts,
107 backoff_secs
108 );
109 fire_hook(
110 HookType::OnRetry,
111 id.clone(),
112 opts.dir.clone(),
113 attempt + 1,
114 opts.env.clone(),
115 vec![],
116 )
117 .await;
118 time::sleep(Duration::from_secs(backoff_secs)).await;
119 continue;
120 } else {
121 info!("daemon {id} failed after {max_attempts} attempts");
122 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
123 }
124 }
125 other => return Ok(other),
126 }
127 }
128 }
129
130 self.run_once(opts).await
132 }
133
/// Spawns the daemon described by `opts` a single time (no retry handling).
///
/// Outline:
/// 1. Resolve the expected ports via `check_ports_available`; port failures are returned as
///    `PortConflict`, `NoAvailablePort` or `DaemonFailed` responses rather than as `Err`.
/// 2. Build `sh -c "exec [<mise> x --] <cmd>"`, set up the environment and spawn it in its
///    own session (unix).
/// 3. Record the daemon as `Running` through `upsert_daemon`.
/// 4. Spawn a background task that copies stdout/stderr into the daemon log file, performs
///    the configured readiness checks, and on process exit updates the daemon state and
///    fires the stop/exit/fail hooks.
/// 5. When `opts.wait_ready` is set, wait for the background task's first notification and
///    map it to `DaemonReady` / `DaemonFailedWithCode`; otherwise return `DaemonStart`
///    immediately.
///
/// # Errors
/// Returns `Err` when the log directory cannot be created, the process cannot be spawned,
/// or the initial state upsert fails.
pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
    let id = &opts.id;
    // `original_cmd` is what gets stored in the daemon state; `cmd` is wrapped further below.
    let original_cmd = opts.cmd.clone();
    let cmd = opts.cmd;

    // The readiness channel only exists when the caller asked to wait for readiness.
    let (ready_tx, ready_rx) = if opts.wait_ready {
        let (tx, rx) = oneshot::channel();
        (Some(tx), Some(rx))
    } else {
        (None, None)
    };

    let expected_ports = opts.expected_port.clone();
    // Resolve the ports the daemon should use and derive the port used for the TCP
    // readiness check.
    let (resolved_ports, effective_ready_port) = if !opts.expected_port.is_empty() {
        match check_ports_available(
            &opts.expected_port,
            opts.auto_bump_port,
            opts.port_bump_attempts,
        )
        .await
        {
            Ok(resolved) => {
                let ready_port = if let Some(configured_port) = opts.ready_port {
                    // Offset by which the first port was moved (0 when nothing was bumped).
                    let bump_offset = resolved
                        .first()
                        .unwrap_or(&0)
                        .saturating_sub(*opts.expected_port.first().unwrap_or(&0));
                    // A ready port that is one of the expected ports moves with them;
                    // if the addition would overflow, the configured value is kept.
                    if opts.expected_port.contains(&configured_port) && bump_offset > 0 {
                        configured_port
                            .checked_add(bump_offset)
                            .or(Some(configured_port))
                    } else {
                        Some(configured_port)
                    }
                } else {
                    // No explicit ready port: fall back to the first resolved port,
                    // unless it is 0.
                    resolved.first().copied().filter(|&p| p != 0)
                };
                info!(
                    "daemon {id}: ports {:?} resolved to {:?}",
                    opts.expected_port, resolved
                );
                (resolved, ready_port)
            }
            Err(e) => {
                error!("daemon {id}: port check failed: {e}");
                // Known port errors are translated into dedicated IPC responses.
                if let Some(port_error) = e.downcast_ref::<PortError>() {
                    match port_error {
                        PortError::InUse { port, process, pid } => {
                            return Ok(IpcResponse::PortConflict {
                                port: *port,
                                process: process.clone(),
                                pid: *pid,
                            });
                        }
                        PortError::NoAvailablePort {
                            start_port,
                            attempts,
                        } => {
                            return Ok(IpcResponse::NoAvailablePort {
                                start_port: *start_port,
                                attempts: *attempts,
                            });
                        }
                    }
                }
                return Ok(IpcResponse::DaemonFailed {
                    error: e.to_string(),
                });
            }
        }
    } else {
        (Vec::new(), opts.ready_port)
    };

    // Prefix the command with `exec` (and optionally `<mise> x --`) before handing it to
    // `sh -c`. NOTE(review): `exec` presumably makes the shell replace itself with the
    // command so the recorded pid is the daemon's own — confirm.
    let cmd: Vec<String> = if opts.mise {
        match settings().resolve_mise_bin() {
            Some(mise_bin) => {
                let mise_bin_str = mise_bin.to_string_lossy().to_string();
                info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
                once("exec".to_string())
                    .chain(once(mise_bin_str))
                    .chain(once("x".to_string()))
                    .chain(once("--".to_string()))
                    .chain(cmd)
                    .collect_vec()
            }
            None => {
                warn!("daemon {id}: mise=true but mise binary not found, running without mise");
                once("exec".to_string()).chain(cmd).collect_vec()
            }
        }
    } else {
        once("exec".to_string()).chain(cmd).collect_vec()
    };
    let args = vec!["-c".to_string(), shell_words::join(&cmd)];
    let log_path = id.log_path();
    if let Some(parent) = log_path.parent() {
        xx::file::mkdirp(parent)?;
    }
    info!("run: spawning daemon {id} with args: {args:?}");
    // stdout/stderr are piped so the monitoring task below can copy them into the log.
    let mut cmd = tokio::process::Command::new("sh");
    cmd.args(&args)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .current_dir(&opts.dir);

    // Use the PATH captured in `env::ORIGINAL_PATH` when one is available.
    if let Some(ref path) = *env::ORIGINAL_PATH {
        cmd.env("PATH", path);
    }

    // Daemon-specific variables; these may override PATH set above.
    if let Some(ref env_vars) = opts.env {
        cmd.envs(env_vars);
    }

    // Set after the user-provided variables, so these values win on a name clash.
    cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
    cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
    cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());

    // Expose the resolved ports: PORT is the first one, PORT0..PORTn list all of them.
    if !resolved_ports.is_empty() {
        cmd.env("PORT", resolved_ports[0].to_string());
        for (i, port) in resolved_ports.iter().enumerate() {
            cmd.env(format!("PORT{}", i), port.to_string());
        }
    }

    #[cfg(unix)]
    {
        // The closure runs in the child before exec and only calls `setsid`, putting the
        // daemon into a new session (and therefore its own process group).
        // NOTE(review): soundness relies on the closure staying async-signal-safe — confirm
        // before adding anything to it.
        unsafe {
            cmd.pre_exec(move || {
                if libc::setsid() == -1 {
                    return Err(std::io::Error::last_os_error());
                }
                Ok(())
            });
        }
    }

    let mut child = cmd.spawn().into_diagnostic()?;
    let pid = match child.id() {
        Some(p) => p,
        None => {
            warn!("Daemon {id} exited before PID could be captured");
            return Ok(IpcResponse::DaemonFailed {
                error: "Process exited immediately".to_string(),
            });
        }
    };
    info!("started daemon {id} with pid {pid}");
    // Persist the full run configuration together with the new pid and `Running` status.
    let daemon = self
        .upsert_daemon(
            UpsertDaemonOpts::builder(id.clone())
                .set(|o| {
                    o.pid = Some(pid);
                    o.status = DaemonStatus::Running;
                    o.shell_pid = opts.shell_pid;
                    o.dir = Some(opts.dir.clone());
                    o.cmd = Some(original_cmd);
                    o.autostop = opts.autostop;
                    o.cron_schedule = opts.cron_schedule.clone();
                    o.cron_retrigger = opts.cron_retrigger;
                    o.retry = Some(opts.retry);
                    o.retry_count = Some(opts.retry_count);
                    o.ready_delay = opts.ready_delay;
                    o.ready_output = opts.ready_output.clone();
                    o.ready_http = opts.ready_http.clone();
                    o.ready_port = effective_ready_port;
                    o.ready_cmd = opts.ready_cmd.clone();
                    o.expected_port = expected_ports;
                    o.resolved_port = resolved_ports;
                    o.auto_bump_port = Some(opts.auto_bump_port);
                    o.port_bump_attempts = Some(opts.port_bump_attempts);
                    o.depends = Some(opts.depends.clone());
                    o.env = opts.env.clone();
                    o.watch = Some(opts.watch.clone());
                    o.watch_base_dir = opts.watch_base_dir.clone();
                    o.mise = Some(opts.mise);
                    o.memory_limit = opts.memory_limit;
                    o.cpu_limit = opts.cpu_limit;
                })
                .build(),
        )
        .await?;

    // Owned copies for the monitoring task, which outlives this function call.
    let id_clone = id.clone();
    let ready_delay = opts.ready_delay;
    let ready_output = opts.ready_output.clone();
    let ready_http = opts.ready_http.clone();
    let ready_port = effective_ready_port;
    let ready_cmd = opts.ready_cmd.clone();
    let daemon_dir = opts.dir.clone();
    let hook_retry_count = opts.retry_count;
    let hook_retry = opts.retry;
    let hook_daemon_env = opts.env.clone();

    // Monitoring task: log forwarding, readiness detection and exit handling.
    tokio::spawn(async move {
        let id = id_clone;
        // Returning early from this task drops `ready_tx`, which the caller observes as a
        // closed readiness channel.
        let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
            (Some(out), Some(err)) => (out, err),
            _ => {
                error!("Failed to capture stdout/stderr for daemon {id}");
                return;
            }
        };
        let mut stdout = tokio::io::BufReader::new(stdout).lines();
        let mut stderr = tokio::io::BufReader::new(stderr).lines();
        let log_file = match tokio::fs::File::options()
            .append(true)
            .create(true)
            .open(&log_path)
            .await
        {
            Ok(f) => f,
            Err(e) => {
                error!("Failed to open log file for daemon {id}: {e}");
                return;
            }
        };
        let mut log_appender = BufWriter::new(log_file);

        let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
        // Each log line is "<timestamp> <id> <line>"; the id is not repeated when the line
        // already starts with it.
        let format_line = |line: String| {
            if line.starts_with(&format!("{id} ")) {
                format!("{} {line}\n", now())
            } else {
                format!("{} {id} {line}\n", now())
            }
        };

        // `ready_notified` ensures readiness is reported (and OnReady fired) at most once.
        let mut ready_notified = false;
        let mut ready_tx = ready_tx;
        let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));

        let mut delay_timer =
            ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

        let s = settings();
        let ready_check_interval = s.supervisor_ready_check_interval();
        let http_client_timeout = s.supervisor_http_client_timeout();
        let log_flush_interval_duration = s.supervisor_log_flush_interval();

        // One polling interval per configured readiness probe; `None` disables the probe.
        let mut http_check_interval = ready_http
            .as_ref()
            .map(|_| tokio::time::interval(ready_check_interval));
        let http_client = ready_http.as_ref().map(|_| {
            reqwest::Client::builder()
                .timeout(http_client_timeout)
                .build()
                .unwrap_or_default()
        });

        let mut port_check_interval =
            ready_port.map(|_| tokio::time::interval(ready_check_interval));

        let mut cmd_check_interval = ready_cmd
            .as_ref()
            .map(|_| tokio::time::interval(ready_check_interval));

        let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);

        let (exit_tx, mut exit_rx) =
            tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

        // `child.wait()` runs in its own task and reports through `exit_rx`, so the select
        // loop below never holds a borrow of `child`.
        let child_pid = child.id().unwrap_or(0);
        tokio::spawn(async move {
            let result = child.wait().await;
            debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
            let _ = exit_tx.send(result).await;
        });

        #[allow(unused_assignments)]
        let mut exit_status = None;

        loop {
            select! {
                // stdout line: append to the log and test it against the ready pattern.
                Ok(Some(line)) = stdout.next_line() => {
                    let formatted = format_line(line.clone());
                    if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                        error!("Failed to write to log for daemon {id}: {e}");
                    }
                    trace!("stdout: {id} {formatted}");

                    if !ready_notified
                        && let Some(ref pattern) = ready_pattern
                        && pattern.is_match(&line) {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                        }
                }
                // stderr line: handled exactly like stdout.
                Ok(Some(line)) = stderr.next_line() => {
                    let formatted = format_line(line.clone());
                    if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                        error!("Failed to write to log for daemon {id}: {e}");
                    }
                    trace!("stderr: {id} {formatted}");

                    if !ready_notified
                        && let Some(ref pattern) = ready_pattern
                        && pattern.is_match(&line) {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                        }
                },
                // Process exited: resolve a still-pending readiness wait and leave the loop.
                Some(result) = exit_rx.recv() => {
                    exit_status = Some(result);
                    debug!("daemon {id} process exited, exit_status: {exit_status:?}");
                    let _ = log_appender.flush().await;
                    if !ready_notified {
                        if let Some(tx) = ready_tx.take() {
                            // A successful exit before any readiness signal counts as ready.
                            let is_success = exit_status.as_ref()
                                .and_then(|r| r.as_ref().ok())
                                .map(|s| s.success())
                                .unwrap_or(false);

                            if is_success {
                                debug!("daemon {id} exited successfully before ready check, sending success notification");
                                let _ = tx.send(Ok(()));
                            } else {
                                let exit_code = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .and_then(|s| s.code());
                                debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
                                let _ = tx.send(Err(exit_code));
                            }
                        }
                    } else {
                        debug!("daemon {id} was already marked ready, not sending notification");
                    }
                    break;
                }
                // HTTP readiness probe: a successful (2xx) GET marks the daemon ready.
                _ = async {
                    if let Some(ref mut interval) = http_check_interval {
                        interval.tick().await;
                    } else {
                        std::future::pending::<()>().await;
                    }
                }, if !ready_notified && ready_http.is_some() => {
                    if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                        match client.get(url).send().await {
                            Ok(response) if response.status().is_success() => {
                                info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                ready_notified = true;
                                let _ = log_appender.flush().await;
                                if let Some(tx) = ready_tx.take() {
                                    let _ = tx.send(Ok(()));
                                }
                                fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                http_check_interval = None;
                            }
                            Ok(response) => {
                                trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                            }
                            Err(e) => {
                                trace!("daemon {id} HTTP check failed: {e}");
                            }
                        }
                    }
                }
                // TCP readiness probe: a successful connect to 127.0.0.1:<port> marks ready.
                _ = async {
                    if let Some(ref mut interval) = port_check_interval {
                        interval.tick().await;
                    } else {
                        std::future::pending::<()>().await;
                    }
                }, if !ready_notified && ready_port.is_some() => {
                    if let Some(port) = ready_port {
                        match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                            Ok(_) => {
                                info!("daemon {id} ready: TCP port {port} is listening");
                                ready_notified = true;
                                let _ = log_appender.flush().await;
                                if let Some(tx) = ready_tx.take() {
                                    let _ = tx.send(Ok(()));
                                }
                                fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                port_check_interval = None;
                            }
                            Err(_) => {
                                trace!("daemon {id} port check: port {port} not listening yet");
                            }
                        }
                    }
                }
                // Command readiness probe: the command exiting successfully marks ready.
                _ = async {
                    if let Some(ref mut interval) = cmd_check_interval {
                        interval.tick().await;
                    } else {
                        std::future::pending::<()>().await;
                    }
                }, if !ready_notified && ready_cmd.is_some() => {
                    if let Some(ref cmd) = ready_cmd {
                        let mut command = Shell::default_for_platform().command(cmd);
                        command
                            .current_dir(&daemon_dir)
                            .stdout(std::process::Stdio::null())
                            .stderr(std::process::Stdio::null());
                        let result: std::io::Result<std::process::ExitStatus> = command.status().await;
                        match result {
                            Ok(status) if status.success() => {
                                info!("daemon {id} ready: readiness command succeeded");
                                ready_notified = true;
                                let _ = log_appender.flush().await;
                                if let Some(tx) = ready_tx.take() {
                                    let _ = tx.send(Ok(()));
                                }
                                fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                cmd_check_interval = None;
                            }
                            Ok(_) => {
                                trace!("daemon {id} cmd check: command returned non-zero (not ready)");
                            }
                            Err(e) => {
                                trace!("daemon {id} cmd check failed: {e}");
                            }
                        }
                    }
                }
                // Delay-based readiness: only counts when no other readiness check is
                // configured. The timer is cleared either way so it fires once.
                _ = async {
                    if let Some(ref mut timer) = delay_timer {
                        timer.await;
                    } else {
                        std::future::pending::<()>().await;
                    }
                } => {
                    if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
                        info!("daemon {id} ready: delay elapsed");
                        ready_notified = true;
                        let _ = log_appender.flush().await;
                        if let Some(tx) = ready_tx.take() {
                            let _ = tx.send(Ok(()));
                        }
                        fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                    }
                    delay_timer = None;
                }
                // Periodic flush of the buffered log writer.
                _ = log_flush_interval.tick() => {
                    if let Err(e) = log_appender.flush().await {
                        error!("Failed to flush log for daemon {id}: {e}");
                    }
                }
            }
        }

        if let Err(e) = log_appender.flush().await {
            error!("Failed to final flush log for daemon {id}: {e}");
        }

        // The loop only breaks from the exit arm, so a status is normally present; the
        // fallback waits on the channel and maps a closed channel to an I/O error.
        let exit_status = if let Some(status) = exit_status {
            status
        } else {
            match exit_rx.recv().await {
                Some(status) => status,
                None => {
                    warn!("daemon {id} exit channel closed without receiving status");
                    Err(std::io::Error::other("exit channel closed"))
                }
            }
        };
        let current_daemon = SUPERVISOR.get_daemon(&id).await;

        // Count this task in `active_monitors` for the rest of its exit handling; the guard
        // decrements the counter and notifies `monitor_done` on every return path.
        SUPERVISOR
            .active_monitors
            .fetch_add(1, atomic::Ordering::Release);
        struct MonitorGuard;
        impl Drop for MonitorGuard {
            fn drop(&mut self) {
                SUPERVISOR
                    .active_monitors
                    .fetch_sub(1, atomic::Ordering::Release);
                SUPERVISOR.monitor_done.notify_waiters();
            }
        }
        let _monitor_guard = MonitorGuard;
        // Skip state updates and hooks when the daemon record is gone, or when it carries a
        // different pid while not stopping/stopped.
        // NOTE(review): presumably a newer run owns the record in that case — confirm.
        if current_daemon.is_none()
            || current_daemon.as_ref().is_some_and(|d| {
                d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
            })
        {
            return;
        }
        let already_stopped = current_daemon
            .as_ref()
            .is_some_and(|d| d.status.is_stopped());
        let is_stopping = already_stopped
            || current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

        // Classify the exit: "stop" when the record was stopping/stopped, "exit" for a
        // successful exit, "fail" otherwise. A missing exit code is reported as -1.
        let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
            (Ok(status), true) => {
                (status.code().unwrap_or(-1), "stop")
            }
            (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
            (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
            (Err(_), true) => {
                (-1, "stop")
            }
            (Err(_), false) => (-1, "fail"),
        };

        // A record that is already `Stopped` is left untouched.
        if !already_stopped {
            if let Ok(status) = &exit_status {
                info!("daemon {id} exited with status {status}");
            }
            let (new_status, last_exit_success) = match exit_reason {
                "stop" | "exit" => (
                    DaemonStatus::Stopped,
                    exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
                ),
                _ => (DaemonStatus::Errored(exit_code), false),
            };
            if let Err(e) = SUPERVISOR
                .upsert_daemon(
                    UpsertDaemonOpts::builder(id.clone())
                        .set(|o| {
                            o.pid = None;
                            o.status = new_status;
                            o.last_exit_success = Some(last_exit_success);
                        })
                        .build(),
                )
                .await
            {
                error!("Failed to update daemon state for {id}: {e}");
            }
        }

        // Extra variables passed to the exit-related hooks.
        let hook_extra_env = vec![
            ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
            ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
        ];

        // On failure, hooks fire only once the retry budget is used up
        // (`retry_count >= retry`); earlier failed attempts fire nothing here.
        let hooks_to_fire: Vec<HookType> = match exit_reason {
            "stop" => vec![HookType::OnStop, HookType::OnExit],
            "exit" => vec![HookType::OnExit],
            _ if hook_retry_count >= hook_retry => {
                vec![HookType::OnFail, HookType::OnExit]
            }
            _ => vec![],
        };

        for hook_type in hooks_to_fire {
            fire_hook(
                hook_type,
                id.clone(),
                daemon_dir.clone(),
                hook_retry_count,
                hook_daemon_env.clone(),
                hook_extra_env.clone(),
            )
            .await;
        }
    });

    // With `wait_ready`, block until the monitoring task reports readiness or an early exit.
    if let Some(ready_rx) = ready_rx {
        match ready_rx.await {
            Ok(Ok(())) => {
                info!("daemon {id} is ready");
                Ok(IpcResponse::DaemonReady { daemon })
            }
            Ok(Err(exit_code)) => {
                error!("daemon {id} failed before becoming ready");
                Ok(IpcResponse::DaemonFailedWithCode { exit_code })
            }
            // Sender dropped without a message (e.g. the monitoring task returned early).
            Err(_) => {
                error!("readiness channel closed unexpectedly for daemon {id}");
                Ok(IpcResponse::DaemonStart { daemon })
            }
        }
    } else {
        Ok(IpcResponse::DaemonStart { daemon })
    }
}
786
787 pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
789 let pitchfork_id = DaemonId::pitchfork();
790 if *id == pitchfork_id {
791 return Ok(IpcResponse::Error(
792 "Cannot stop supervisor via stop command".into(),
793 ));
794 }
795 info!("stopping daemon: {id}");
796 if let Some(daemon) = self.get_daemon(id).await {
797 trace!("daemon to stop: {daemon}");
798 if let Some(pid) = daemon.pid {
799 trace!("killing pid: {pid}");
800 PROCS.refresh_processes();
801 if PROCS.is_running(pid) {
802 self.upsert_daemon(
804 UpsertDaemonOpts::builder(id.clone())
805 .set(|o| {
806 o.pid = Some(pid);
807 o.status = DaemonStatus::Stopping;
808 })
809 .build(),
810 )
811 .await?;
812
813 if let Err(e) = PROCS.kill_process_group_async(pid).await {
816 debug!("failed to kill pid {pid}: {e}");
817 PROCS.refresh_processes();
819 if PROCS.is_running(pid) {
820 debug!("failed to stop pid {pid}: process still running after kill");
822 self.upsert_daemon(
823 UpsertDaemonOpts::builder(id.clone())
824 .set(|o| {
825 o.pid = Some(pid); o.status = DaemonStatus::Running;
827 })
828 .build(),
829 )
830 .await?;
831 return Ok(IpcResponse::DaemonStopFailed {
832 error: format!(
833 "process {pid} still running after kill attempt: {e}"
834 ),
835 });
836 }
837 }
838
839 self.upsert_daemon(
844 UpsertDaemonOpts::builder(id.clone())
845 .set(|o| {
846 o.pid = None;
847 o.status = DaemonStatus::Stopped;
848 o.last_exit_success = Some(true); })
850 .build(),
851 )
852 .await?;
853 } else {
854 debug!("pid {pid} not running, process may have exited unexpectedly");
855 self.upsert_daemon(
858 UpsertDaemonOpts::builder(id.clone())
859 .set(|o| {
860 o.pid = None;
861 o.status = DaemonStatus::Stopped;
862 })
863 .build(),
864 )
865 .await?;
866 return Ok(IpcResponse::DaemonWasNotRunning);
867 }
868 Ok(IpcResponse::Ok)
869 } else {
870 debug!("daemon {id} not running");
871 Ok(IpcResponse::DaemonNotRunning)
872 }
873 } else {
874 debug!("daemon {id} not found");
875 Ok(IpcResponse::DaemonNotFound)
876 }
877 }
878}
879
880async fn check_ports_available(
887 expected_ports: &[u16],
888 auto_bump: bool,
889 max_attempts: u32,
890) -> Result<Vec<u16>> {
891 if expected_ports.is_empty() {
892 return Ok(Vec::new());
893 }
894
895 for bump_offset in 0..=max_attempts {
896 let candidate_ports: Vec<u16> = expected_ports
898 .iter()
899 .map(|&p| p.wrapping_add(bump_offset as u16))
900 .collect();
901
902 let mut all_available = true;
904 let mut conflicting_port = None;
905
906 for &port in &candidate_ports {
907 if port == 0 {
910 continue;
911 }
912
913 let port_check =
921 tokio::task::spawn_blocking(move || match TcpListener::bind(("0.0.0.0", port)) {
922 Ok(listener) => {
923 drop(listener);
924 true
925 }
926 Err(_) => false,
927 })
928 .await
929 .into_diagnostic()
930 .wrap_err("failed to check port availability")?;
931
932 if !port_check {
933 all_available = false;
934 conflicting_port = Some(port);
935 break;
936 }
937 }
938
939 if all_available {
940 if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
944 return Err(PortError::NoAvailablePort {
945 start_port: expected_ports[0],
946 attempts: bump_offset + 1,
947 }
948 .into());
949 }
950 if bump_offset > 0 {
951 info!(
952 "ports {:?} bumped by {} to {:?}",
953 expected_ports, bump_offset, candidate_ports
954 );
955 }
956 return Ok(candidate_ports);
957 }
958
959 if bump_offset == 0 {
961 if let Some(port) = conflicting_port {
963 if let Some((pid, process_name)) = get_process_using_port(port).await {
964 if !auto_bump {
965 return Err(PortError::InUse {
966 port,
967 process: process_name,
968 pid,
969 }
970 .into());
971 }
972 } else if !auto_bump {
973 return Err(PortError::InUse {
975 port,
976 process: "unknown".to_string(),
977 pid: 0,
978 }
979 .into());
980 }
981 }
982 }
983 }
984
985 Err(PortError::NoAvailablePort {
987 start_port: expected_ports[0],
988 attempts: max_attempts + 1,
989 }
990 .into())
991}
992
993async fn get_process_using_port(port: u16) -> Option<(u32, String)> {
997 tokio::task::spawn_blocking(move || {
998 listeners::get_all()
999 .ok()?
1000 .into_iter()
1001 .find(|listener| listener.socket.port() == port)
1002 .map(|listener| (listener.process.pid, listener.process.name))
1003 })
1004 .await
1005 .ok()
1006 .flatten()
1007}