1use super::hooks::{HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::{IntoDiagnostic, WrapErr};
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22use std::iter::once;
23use std::net::TcpListener;
24use std::sync::atomic;
25use std::time::Duration;
26use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
27use tokio::select;
28use tokio::sync::oneshot;
29use tokio::time;
30
31static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
33 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
34
35pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
37 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
38 if let Some(re) = cache.get(pattern) {
39 return Some(re.clone());
40 }
41 match Regex::new(pattern) {
42 Ok(re) => {
43 cache.insert(pattern.to_string(), re.clone());
44 Some(re)
45 }
46 Err(e) => {
47 error!("invalid regex pattern '{pattern}': {e}");
48 None
49 }
50 }
51}
52
53impl Supervisor {
54 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
56 let id = &opts.id;
57 let cmd = opts.cmd.clone();
58
59 {
61 let mut pending = self.pending_autostops.lock().await;
62 if pending.remove(id).is_some() {
63 info!("cleared pending autostop for {id} (daemon starting)");
64 }
65 }
66
67 let daemon = self.get_daemon(id).await;
68 if let Some(daemon) = daemon {
69 if !daemon.status.is_stopping()
72 && !daemon.status.is_stopped()
73 && let Some(pid) = daemon.pid
74 {
75 if opts.force {
76 self.stop(id).await?;
77 info!("run: stop completed for daemon {id}");
78 } else {
79 warn!("daemon {id} already running with pid {pid}");
80 return Ok(IpcResponse::DaemonAlreadyRunning);
81 }
82 }
83 }
84
85 if opts.wait_ready && opts.retry > 0 {
87 let max_attempts = opts.retry.saturating_add(1);
89 for attempt in 0..max_attempts {
90 let mut retry_opts = opts.clone();
91 retry_opts.retry_count = attempt;
92 retry_opts.cmd = cmd.clone();
93
94 let result = self.run_once(retry_opts).await?;
95
96 match result {
97 IpcResponse::DaemonReady { daemon } => {
98 return Ok(IpcResponse::DaemonReady { daemon });
99 }
100 IpcResponse::DaemonFailedWithCode { exit_code } => {
101 if attempt < opts.retry {
102 let backoff_secs = 2u64.pow(attempt);
103 info!(
104 "daemon {id} failed (attempt {}/{}), retrying in {}s",
105 attempt + 1,
106 max_attempts,
107 backoff_secs
108 );
109 fire_hook(
110 HookType::OnRetry,
111 id.clone(),
112 opts.dir.clone(),
113 attempt + 1,
114 opts.env.clone(),
115 vec![],
116 )
117 .await;
118 time::sleep(Duration::from_secs(backoff_secs)).await;
119 continue;
120 } else {
121 info!("daemon {id} failed after {max_attempts} attempts");
122 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
123 }
124 }
125 other => return Ok(other),
126 }
127 }
128 }
129
130 self.run_once(opts).await
132 }
133
134 pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
136 let id = &opts.id;
137 let original_cmd = opts.cmd.clone(); let cmd = opts.cmd;
139
140 let (ready_tx, ready_rx) = if opts.wait_ready {
142 let (tx, rx) = oneshot::channel();
143 (Some(tx), Some(rx))
144 } else {
145 (None, None)
146 };
147
148 let expected_ports = opts.expected_port.clone();
150 let (resolved_ports, effective_ready_port) = if !opts.expected_port.is_empty() {
151 match check_ports_available(
152 &opts.expected_port,
153 opts.auto_bump_port,
154 opts.port_bump_attempts,
155 )
156 .await
157 {
158 Ok(resolved) => {
159 let ready_port = if let Some(configured_port) = opts.ready_port {
160 let bump_offset = resolved
162 .first()
163 .unwrap_or(&0)
164 .saturating_sub(*opts.expected_port.first().unwrap_or(&0));
165 if opts.expected_port.contains(&configured_port) && bump_offset > 0 {
166 configured_port
167 .checked_add(bump_offset)
168 .or(Some(configured_port))
169 } else {
170 Some(configured_port)
171 }
172 } else {
173 resolved.first().copied().filter(|&p| p != 0)
176 };
177 info!(
178 "daemon {id}: ports {:?} resolved to {:?}",
179 opts.expected_port, resolved
180 );
181 (resolved, ready_port)
182 }
183 Err(e) => {
184 error!("daemon {id}: port check failed: {e}");
185 if let Some(port_error) = e.downcast_ref::<PortError>() {
187 match port_error {
188 PortError::InUse { port, process, pid } => {
189 return Ok(IpcResponse::PortConflict {
190 port: *port,
191 process: process.clone(),
192 pid: *pid,
193 });
194 }
195 PortError::NoAvailablePort {
196 start_port,
197 attempts,
198 } => {
199 return Ok(IpcResponse::NoAvailablePort {
200 start_port: *start_port,
201 attempts: *attempts,
202 });
203 }
204 }
205 }
206 return Ok(IpcResponse::DaemonFailed {
207 error: e.to_string(),
208 });
209 }
210 }
211 } else {
212 (Vec::new(), opts.ready_port)
213 };
214
215 let cmd: Vec<String> = if opts.mise {
216 match settings().resolve_mise_bin() {
217 Some(mise_bin) => {
218 let mise_bin_str = mise_bin.to_string_lossy().to_string();
219 info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
220 once("exec".to_string())
221 .chain(once(mise_bin_str))
222 .chain(once("x".to_string()))
223 .chain(once("--".to_string()))
224 .chain(cmd)
225 .collect_vec()
226 }
227 None => {
228 warn!("daemon {id}: mise=true but mise binary not found, running without mise");
229 once("exec".to_string()).chain(cmd).collect_vec()
230 }
231 }
232 } else {
233 once("exec".to_string()).chain(cmd).collect_vec()
234 };
235 let args = vec!["-c".to_string(), shell_words::join(&cmd)];
236 let log_path = id.log_path();
237 if let Some(parent) = log_path.parent() {
238 xx::file::mkdirp(parent)?;
239 }
240 info!("run: spawning daemon {id} with args: {args:?}");
241 let mut cmd = tokio::process::Command::new("sh");
242 cmd.args(&args)
243 .stdin(std::process::Stdio::null())
244 .stdout(std::process::Stdio::piped())
245 .stderr(std::process::Stdio::piped())
246 .current_dir(&opts.dir);
247
248 if let Some(ref path) = *env::ORIGINAL_PATH {
250 cmd.env("PATH", path);
251 }
252
253 if let Some(ref env_vars) = opts.env {
255 cmd.envs(env_vars);
256 }
257
258 cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
260 cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
261 cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
262
263 if !resolved_ports.is_empty() {
265 cmd.env("PORT", resolved_ports[0].to_string());
269 for (i, port) in resolved_ports.iter().enumerate() {
271 cmd.env(format!("PORT{}", i), port.to_string());
272 }
273 }
274
275 #[cfg(unix)]
278 unsafe {
279 cmd.pre_exec(|| {
280 if libc::setsid() == -1 {
281 return Err(std::io::Error::last_os_error());
282 }
283 Ok(())
284 });
285 }
286
287 let mut child = cmd.spawn().into_diagnostic()?;
288 let pid = match child.id() {
289 Some(p) => p,
290 None => {
291 warn!("Daemon {id} exited before PID could be captured");
292 return Ok(IpcResponse::DaemonFailed {
293 error: "Process exited immediately".to_string(),
294 });
295 }
296 };
297 info!("started daemon {id} with pid {pid}");
298 let daemon = self
299 .upsert_daemon(
300 UpsertDaemonOpts::builder(id.clone())
301 .set(|o| {
302 o.pid = Some(pid);
303 o.status = DaemonStatus::Running;
304 o.shell_pid = opts.shell_pid;
305 o.dir = Some(opts.dir.clone());
306 o.cmd = Some(original_cmd);
307 o.autostop = opts.autostop;
308 o.cron_schedule = opts.cron_schedule.clone();
309 o.cron_retrigger = opts.cron_retrigger;
310 o.retry = Some(opts.retry);
311 o.retry_count = Some(opts.retry_count);
312 o.ready_delay = opts.ready_delay;
313 o.ready_output = opts.ready_output.clone();
314 o.ready_http = opts.ready_http.clone();
315 o.ready_port = effective_ready_port;
316 o.ready_cmd = opts.ready_cmd.clone();
317 o.expected_port = expected_ports;
318 o.resolved_port = resolved_ports;
319 o.auto_bump_port = Some(opts.auto_bump_port);
320 o.port_bump_attempts = Some(opts.port_bump_attempts);
321 o.depends = Some(opts.depends.clone());
322 o.env = opts.env.clone();
323 o.watch = Some(opts.watch.clone());
324 o.watch_base_dir = opts.watch_base_dir.clone();
325 o.mise = Some(opts.mise);
326 })
327 .build(),
328 )
329 .await?;
330
331 let id_clone = id.clone();
332 let ready_delay = opts.ready_delay;
333 let ready_output = opts.ready_output.clone();
334 let ready_http = opts.ready_http.clone();
335 let ready_port = effective_ready_port;
336 let ready_cmd = opts.ready_cmd.clone();
337 let daemon_dir = opts.dir.clone();
338 let hook_retry_count = opts.retry_count;
339 let hook_retry = opts.retry;
340 let hook_daemon_env = opts.env.clone();
341
342 tokio::spawn(async move {
343 let id = id_clone;
344 let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
345 (Some(out), Some(err)) => (out, err),
346 _ => {
347 error!("Failed to capture stdout/stderr for daemon {id}");
348 return;
349 }
350 };
351 let mut stdout = tokio::io::BufReader::new(stdout).lines();
352 let mut stderr = tokio::io::BufReader::new(stderr).lines();
353 let log_file = match tokio::fs::File::options()
354 .append(true)
355 .create(true)
356 .open(&log_path)
357 .await
358 {
359 Ok(f) => f,
360 Err(e) => {
361 error!("Failed to open log file for daemon {id}: {e}");
362 return;
363 }
364 };
365 let mut log_appender = BufWriter::new(log_file);
366
367 let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
368 let format_line = |line: String| {
369 if line.starts_with(&format!("{id} ")) {
370 format!("{} {line}\n", now())
372 } else {
373 format!("{} {id} {line}\n", now())
374 }
375 };
376
377 let mut ready_notified = false;
379 let mut ready_tx = ready_tx;
380 let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
381
382 let mut delay_timer =
383 ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
384
385 let s = settings();
387 let ready_check_interval = s.supervisor_ready_check_interval();
388 let http_client_timeout = s.supervisor_http_client_timeout();
389 let log_flush_interval_duration = s.supervisor_log_flush_interval();
390
391 let mut http_check_interval = ready_http
393 .as_ref()
394 .map(|_| tokio::time::interval(ready_check_interval));
395 let http_client = ready_http.as_ref().map(|_| {
396 reqwest::Client::builder()
397 .timeout(http_client_timeout)
398 .build()
399 .unwrap_or_default()
400 });
401
402 let mut port_check_interval =
404 ready_port.map(|_| tokio::time::interval(ready_check_interval));
405
406 let mut cmd_check_interval = ready_cmd
408 .as_ref()
409 .map(|_| tokio::time::interval(ready_check_interval));
410
411 let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);
413
414 let (exit_tx, mut exit_rx) =
416 tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
417
418 let child_pid = child.id().unwrap_or(0);
420 tokio::spawn(async move {
421 let result = child.wait().await;
422 debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
423 let _ = exit_tx.send(result).await;
424 });
425
426 #[allow(unused_assignments)]
427 let mut exit_status = None;
429
430 loop {
431 select! {
432 Ok(Some(line)) = stdout.next_line() => {
433 let formatted = format_line(line.clone());
434 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
435 error!("Failed to write to log for daemon {id}: {e}");
436 }
437 trace!("stdout: {id} {formatted}");
438
439 if !ready_notified
441 && let Some(ref pattern) = ready_pattern
442 && pattern.is_match(&line) {
443 info!("daemon {id} ready: output matched pattern");
444 ready_notified = true;
445 let _ = log_appender.flush().await;
447 if let Some(tx) = ready_tx.take() {
448 let _ = tx.send(Ok(()));
449 }
450 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
451 }
452 }
453 Ok(Some(line)) = stderr.next_line() => {
454 let formatted = format_line(line.clone());
455 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
456 error!("Failed to write to log for daemon {id}: {e}");
457 }
458 trace!("stderr: {id} {formatted}");
459
460 if !ready_notified
462 && let Some(ref pattern) = ready_pattern
463 && pattern.is_match(&line) {
464 info!("daemon {id} ready: output matched pattern");
465 ready_notified = true;
466 let _ = log_appender.flush().await;
468 if let Some(tx) = ready_tx.take() {
469 let _ = tx.send(Ok(()));
470 }
471 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
472 }
473 },
474 Some(result) = exit_rx.recv() => {
475 exit_status = Some(result);
477 debug!("daemon {id} process exited, exit_status: {exit_status:?}");
478 let _ = log_appender.flush().await;
480 if !ready_notified {
481 if let Some(tx) = ready_tx.take() {
482 let is_success = exit_status.as_ref()
484 .and_then(|r| r.as_ref().ok())
485 .map(|s| s.success())
486 .unwrap_or(false);
487
488 if is_success {
489 debug!("daemon {id} exited successfully before ready check, sending success notification");
490 let _ = tx.send(Ok(()));
491 } else {
492 let exit_code = exit_status.as_ref()
493 .and_then(|r| r.as_ref().ok())
494 .and_then(|s| s.code());
495 debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
496 let _ = tx.send(Err(exit_code));
497 }
498 }
499 } else {
500 debug!("daemon {id} was already marked ready, not sending notification");
501 }
502 break;
503 }
504 _ = async {
505 if let Some(ref mut interval) = http_check_interval {
506 interval.tick().await;
507 } else {
508 std::future::pending::<()>().await;
509 }
510 }, if !ready_notified && ready_http.is_some() => {
511 if let (Some(url), Some(client)) = (&ready_http, &http_client) {
512 match client.get(url).send().await {
513 Ok(response) if response.status().is_success() => {
514 info!("daemon {id} ready: HTTP check passed (status {})", response.status());
515 ready_notified = true;
516 let _ = log_appender.flush().await;
518 if let Some(tx) = ready_tx.take() {
519 let _ = tx.send(Ok(()));
520 }
521 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
522 http_check_interval = None;
524 }
525 Ok(response) => {
526 trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
527 }
528 Err(e) => {
529 trace!("daemon {id} HTTP check failed: {e}");
530 }
531 }
532 }
533 }
534 _ = async {
535 if let Some(ref mut interval) = port_check_interval {
536 interval.tick().await;
537 } else {
538 std::future::pending::<()>().await;
539 }
540 }, if !ready_notified && ready_port.is_some() => {
541 if let Some(port) = ready_port {
542 match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
543 Ok(_) => {
544 info!("daemon {id} ready: TCP port {port} is listening");
545 ready_notified = true;
546 let _ = log_appender.flush().await;
548 if let Some(tx) = ready_tx.take() {
549 let _ = tx.send(Ok(()));
550 }
551 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
552 port_check_interval = None;
554 }
555 Err(_) => {
556 trace!("daemon {id} port check: port {port} not listening yet");
557 }
558 }
559 }
560 }
561 _ = async {
562 if let Some(ref mut interval) = cmd_check_interval {
563 interval.tick().await;
564 } else {
565 std::future::pending::<()>().await;
566 }
567 }, if !ready_notified && ready_cmd.is_some() => {
568 if let Some(ref cmd) = ready_cmd {
569 let mut command = Shell::default_for_platform().command(cmd);
571 command
572 .current_dir(&daemon_dir)
573 .stdout(std::process::Stdio::null())
574 .stderr(std::process::Stdio::null());
575 let result: std::io::Result<std::process::ExitStatus> = command.status().await;
576 match result {
577 Ok(status) if status.success() => {
578 info!("daemon {id} ready: readiness command succeeded");
579 ready_notified = true;
580 let _ = log_appender.flush().await;
581 if let Some(tx) = ready_tx.take() {
582 let _ = tx.send(Ok(()));
583 }
584 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
585 cmd_check_interval = None;
587 }
588 Ok(_) => {
589 trace!("daemon {id} cmd check: command returned non-zero (not ready)");
590 }
591 Err(e) => {
592 trace!("daemon {id} cmd check failed: {e}");
593 }
594 }
595 }
596 }
597 _ = async {
598 if let Some(ref mut timer) = delay_timer {
599 timer.await;
600 } else {
601 std::future::pending::<()>().await;
602 }
603 } => {
604 if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
605 info!("daemon {id} ready: delay elapsed");
606 ready_notified = true;
607 let _ = log_appender.flush().await;
609 if let Some(tx) = ready_tx.take() {
610 let _ = tx.send(Ok(()));
611 }
612 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
613 }
614 delay_timer = None;
616 }
617 _ = log_flush_interval.tick() => {
618 if let Err(e) = log_appender.flush().await {
620 error!("Failed to flush log for daemon {id}: {e}");
621 }
622 }
623 }
626 }
627
628 if let Err(e) = log_appender.flush().await {
630 error!("Failed to final flush log for daemon {id}: {e}");
631 }
632
633 let exit_status = if let Some(status) = exit_status {
635 status
636 } else {
637 match exit_rx.recv().await {
639 Some(status) => status,
640 None => {
641 warn!("daemon {id} exit channel closed without receiving status");
642 Err(std::io::Error::other("exit channel closed"))
643 }
644 }
645 };
646 let current_daemon = SUPERVISOR.get_daemon(&id).await;
647
648 SUPERVISOR
653 .active_monitors
654 .fetch_add(1, atomic::Ordering::Release);
655 struct MonitorGuard;
656 impl Drop for MonitorGuard {
657 fn drop(&mut self) {
658 SUPERVISOR
659 .active_monitors
660 .fetch_sub(1, atomic::Ordering::Release);
661 SUPERVISOR.monitor_done.notify_waiters();
662 }
663 }
664 let _monitor_guard = MonitorGuard;
665 if current_daemon.is_none()
670 || current_daemon.as_ref().is_some_and(|d| {
671 d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
672 })
673 {
674 return;
676 }
677 let already_stopped = current_daemon
682 .as_ref()
683 .is_some_and(|d| d.status.is_stopped());
684 let is_stopping = already_stopped
685 || current_daemon
686 .as_ref()
687 .is_some_and(|d| d.status.is_stopping());
688
689 let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
691 (Ok(status), true) => {
692 (status.code().unwrap_or(-1), "stop")
696 }
697 (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
698 (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
699 (Err(_), true) => {
700 (-1, "stop")
702 }
703 (Err(_), false) => (-1, "fail"),
704 };
705
706 if !already_stopped {
708 if let Ok(status) = &exit_status {
709 info!("daemon {id} exited with status {status}");
710 }
711 let (new_status, last_exit_success) = match exit_reason {
712 "stop" | "exit" => (
713 DaemonStatus::Stopped,
714 exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
715 ),
716 _ => (DaemonStatus::Errored(exit_code), false),
717 };
718 if let Err(e) = SUPERVISOR
719 .upsert_daemon(
720 UpsertDaemonOpts::builder(id.clone())
721 .set(|o| {
722 o.pid = None;
723 o.status = new_status;
724 o.last_exit_success = Some(last_exit_success);
725 })
726 .build(),
727 )
728 .await
729 {
730 error!("Failed to update daemon state for {id}: {e}");
731 }
732 }
733
734 let hook_extra_env = vec![
736 ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
737 ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
738 ];
739
740 let hooks_to_fire: Vec<HookType> = match exit_reason {
742 "stop" => vec![HookType::OnStop, HookType::OnExit],
743 "exit" => vec![HookType::OnExit],
744 _ if hook_retry_count >= hook_retry => {
746 vec![HookType::OnFail, HookType::OnExit]
747 }
748 _ => vec![],
749 };
750
751 for hook_type in hooks_to_fire {
752 fire_hook(
753 hook_type,
754 id.clone(),
755 daemon_dir.clone(),
756 hook_retry_count,
757 hook_daemon_env.clone(),
758 hook_extra_env.clone(),
759 )
760 .await;
761 }
762 });
763
764 if let Some(ready_rx) = ready_rx {
766 match ready_rx.await {
767 Ok(Ok(())) => {
768 info!("daemon {id} is ready");
769 Ok(IpcResponse::DaemonReady { daemon })
770 }
771 Ok(Err(exit_code)) => {
772 error!("daemon {id} failed before becoming ready");
773 Ok(IpcResponse::DaemonFailedWithCode { exit_code })
774 }
775 Err(_) => {
776 error!("readiness channel closed unexpectedly for daemon {id}");
777 Ok(IpcResponse::DaemonStart { daemon })
778 }
779 }
780 } else {
781 Ok(IpcResponse::DaemonStart { daemon })
782 }
783 }
784
785 pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
787 let pitchfork_id = DaemonId::pitchfork();
788 if *id == pitchfork_id {
789 return Ok(IpcResponse::Error(
790 "Cannot stop supervisor via stop command".into(),
791 ));
792 }
793 info!("stopping daemon: {id}");
794 if let Some(daemon) = self.get_daemon(id).await {
795 trace!("daemon to stop: {daemon}");
796 if let Some(pid) = daemon.pid {
797 trace!("killing pid: {pid}");
798 PROCS.refresh_processes();
799 if PROCS.is_running(pid) {
800 self.upsert_daemon(
802 UpsertDaemonOpts::builder(id.clone())
803 .set(|o| {
804 o.pid = Some(pid);
805 o.status = DaemonStatus::Stopping;
806 })
807 .build(),
808 )
809 .await?;
810
811 if let Err(e) = PROCS.kill_process_group_async(pid).await {
814 debug!("failed to kill pid {pid}: {e}");
815 PROCS.refresh_processes();
817 if PROCS.is_running(pid) {
818 debug!("failed to stop pid {pid}: process still running after kill");
820 self.upsert_daemon(
821 UpsertDaemonOpts::builder(id.clone())
822 .set(|o| {
823 o.pid = Some(pid); o.status = DaemonStatus::Running;
825 })
826 .build(),
827 )
828 .await?;
829 return Ok(IpcResponse::DaemonStopFailed {
830 error: format!(
831 "process {pid} still running after kill attempt: {e}"
832 ),
833 });
834 }
835 }
836
837 self.upsert_daemon(
842 UpsertDaemonOpts::builder(id.clone())
843 .set(|o| {
844 o.pid = None;
845 o.status = DaemonStatus::Stopped;
846 o.last_exit_success = Some(true); })
848 .build(),
849 )
850 .await?;
851 } else {
852 debug!("pid {pid} not running, process may have exited unexpectedly");
853 self.upsert_daemon(
856 UpsertDaemonOpts::builder(id.clone())
857 .set(|o| {
858 o.pid = None;
859 o.status = DaemonStatus::Stopped;
860 })
861 .build(),
862 )
863 .await?;
864 return Ok(IpcResponse::DaemonWasNotRunning);
865 }
866 Ok(IpcResponse::Ok)
867 } else {
868 debug!("daemon {id} not running");
869 Ok(IpcResponse::DaemonNotRunning)
870 }
871 } else {
872 debug!("daemon {id} not found");
873 Ok(IpcResponse::DaemonNotFound)
874 }
875 }
876}
877
878async fn check_ports_available(
885 expected_ports: &[u16],
886 auto_bump: bool,
887 max_attempts: u32,
888) -> Result<Vec<u16>> {
889 if expected_ports.is_empty() {
890 return Ok(Vec::new());
891 }
892
893 for bump_offset in 0..=max_attempts {
894 let candidate_ports: Vec<u16> = expected_ports
896 .iter()
897 .map(|&p| p.wrapping_add(bump_offset as u16))
898 .collect();
899
900 let mut all_available = true;
902 let mut conflicting_port = None;
903
904 for &port in &candidate_ports {
905 if port == 0 {
908 continue;
909 }
910
911 let port_check =
919 tokio::task::spawn_blocking(move || match TcpListener::bind(("0.0.0.0", port)) {
920 Ok(listener) => {
921 drop(listener);
922 true
923 }
924 Err(_) => false,
925 })
926 .await
927 .into_diagnostic()
928 .wrap_err("failed to check port availability")?;
929
930 if !port_check {
931 all_available = false;
932 conflicting_port = Some(port);
933 break;
934 }
935 }
936
937 if all_available {
938 if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
942 return Err(PortError::NoAvailablePort {
943 start_port: expected_ports[0],
944 attempts: bump_offset + 1,
945 }
946 .into());
947 }
948 if bump_offset > 0 {
949 info!(
950 "ports {:?} bumped by {} to {:?}",
951 expected_ports, bump_offset, candidate_ports
952 );
953 }
954 return Ok(candidate_ports);
955 }
956
957 if bump_offset == 0 {
959 if let Some(port) = conflicting_port {
961 if let Some((pid, process_name)) = get_process_using_port(port).await {
962 if !auto_bump {
963 return Err(PortError::InUse {
964 port,
965 process: process_name,
966 pid,
967 }
968 .into());
969 }
970 } else if !auto_bump {
971 return Err(PortError::InUse {
973 port,
974 process: "unknown".to_string(),
975 pid: 0,
976 }
977 .into());
978 }
979 }
980 }
981 }
982
983 Err(PortError::NoAvailablePort {
985 start_port: expected_ports[0],
986 attempts: max_attempts + 1,
987 }
988 .into())
989}
990
991async fn get_process_using_port(port: u16) -> Option<(u32, String)> {
995 tokio::task::spawn_blocking(move || {
996 listeners::get_all()
997 .ok()?
998 .into_iter()
999 .find(|listener| listener.socket.port() == port)
1000 .map(|listener| (listener.process.pid, listener.process.name))
1001 })
1002 .await
1003 .ok()
1004 .flatten()
1005}