1use super::hooks::{HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::IntoDiagnostic;
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22#[cfg(unix)]
23use std::ffi::CString;
24use std::iter::once;
25use std::sync::atomic;
26use std::time::Duration;
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
28use tokio::select;
29use tokio::sync::oneshot;
30use tokio::time;
31
32static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
34 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
35
36#[cfg(unix)]
37#[derive(Clone, Debug, PartialEq, Eq)]
38enum RunIdentity {
39 Inherit,
40 Switch {
41 uid: nix::unistd::Uid,
42 gid: nix::unistd::Gid,
43 username: Option<CString>,
44 },
45}
46
47pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
49 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
50 if let Some(re) = cache.get(pattern) {
51 return Some(re.clone());
52 }
53 match Regex::new(pattern) {
54 Ok(re) => {
55 cache.insert(pattern.to_string(), re.clone());
56 Some(re)
57 }
58 Err(e) => {
59 error!("invalid regex pattern '{pattern}': {e}");
60 None
61 }
62 }
63}
64
65impl Supervisor {
66 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
68 let id = &opts.id;
69 let cmd = opts.cmd.clone();
70
71 {
73 let mut pending = self.pending_autostops.lock().await;
74 if pending.remove(id).is_some() {
75 info!("cleared pending autostop for {id} (daemon starting)");
76 }
77 }
78
79 let daemon = self.get_daemon(id).await;
80 if let Some(daemon) = daemon {
81 if !daemon.status.is_stopping()
84 && !daemon.status.is_stopped()
85 && let Some(pid) = daemon.pid
86 {
87 if opts.force {
88 self.stop(id).await?;
89 info!("run: stop completed for daemon {id}");
90 } else {
91 warn!("daemon {id} already running with pid {pid}");
92 return Ok(IpcResponse::DaemonAlreadyRunning);
93 }
94 }
95 }
96
97 if opts.wait_ready && opts.retry > 0 {
99 let max_attempts = opts.retry.saturating_add(1);
101 for attempt in 0..max_attempts {
102 let mut retry_opts = opts.clone();
103 retry_opts.retry_count = attempt;
104 retry_opts.cmd = cmd.clone();
105
106 let result = self.run_once(retry_opts).await?;
107
108 match result {
109 IpcResponse::DaemonReady { daemon } => {
110 return Ok(IpcResponse::DaemonReady { daemon });
111 }
112 IpcResponse::DaemonFailedWithCode { exit_code } => {
113 if attempt < opts.retry {
114 let backoff_secs = 2u64.pow(attempt);
115 info!(
116 "daemon {id} failed (attempt {}/{}), retrying in {}s",
117 attempt + 1,
118 max_attempts,
119 backoff_secs
120 );
121 fire_hook(
122 HookType::OnRetry,
123 id.clone(),
124 opts.dir.clone(),
125 attempt + 1,
126 opts.env.clone(),
127 vec![],
128 )
129 .await;
130 time::sleep(Duration::from_secs(backoff_secs)).await;
131 continue;
132 } else {
133 info!("daemon {id} failed after {max_attempts} attempts");
134 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
135 }
136 }
137 other => return Ok(other),
138 }
139 }
140 }
141
142 self.run_once(opts).await
144 }
145
146 pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
148 let id = &opts.id;
149 let original_cmd = opts.cmd.clone(); let cmd = opts.cmd;
151
152 let (ready_tx, ready_rx) = if opts.wait_ready {
154 let (tx, rx) = oneshot::channel();
155 (Some(tx), Some(rx))
156 } else {
157 (None, None)
158 };
159
160 let expected_ports = opts.expected_port.clone();
162 let (resolved_ports, effective_ready_port) = if !opts.expected_port.is_empty() {
163 match check_ports_available(
164 &opts.expected_port,
165 opts.auto_bump_port,
166 opts.port_bump_attempts,
167 )
168 .await
169 {
170 Ok(resolved) => {
171 let ready_port = if let Some(configured_port) = opts.ready_port {
172 let bump_offset = resolved
174 .first()
175 .unwrap_or(&0)
176 .saturating_sub(*opts.expected_port.first().unwrap_or(&0));
177 if opts.expected_port.contains(&configured_port) && bump_offset > 0 {
178 configured_port
179 .checked_add(bump_offset)
180 .or(Some(configured_port))
181 } else {
182 Some(configured_port)
183 }
184 } else if opts.ready_output.is_none()
185 && opts.ready_http.is_none()
186 && opts.ready_cmd.is_none()
187 && opts.ready_delay.is_none()
188 {
189 resolved.first().copied().filter(|&p| p != 0)
193 } else {
194 None
198 };
199 info!(
200 "daemon {id}: ports {:?} resolved to {:?}",
201 opts.expected_port, resolved
202 );
203 (resolved, ready_port)
204 }
205 Err(e) => {
206 error!("daemon {id}: port check failed: {e}");
207 if let Some(port_error) = e.downcast_ref::<PortError>() {
209 match port_error {
210 PortError::InUse { port, process, pid } => {
211 return Ok(IpcResponse::PortConflict {
212 port: *port,
213 process: process.clone(),
214 pid: *pid,
215 });
216 }
217 PortError::NoAvailablePort {
218 start_port,
219 attempts,
220 } => {
221 return Ok(IpcResponse::NoAvailablePort {
222 start_port: *start_port,
223 attempts: *attempts,
224 });
225 }
226 }
227 }
228 return Ok(IpcResponse::DaemonFailed {
229 error: e.to_string(),
230 });
231 }
232 }
233 } else {
234 if let Some(port) = opts.ready_port {
240 if port > 0 {
241 if let Some((pid, process)) = detect_port_conflict(port).await {
242 return Ok(IpcResponse::PortConflict { port, process, pid });
243 }
244 }
245 }
246 (Vec::new(), opts.ready_port)
247 };
248
249 let cmd: Vec<String> = if opts.mise.unwrap_or(settings().general.mise) {
250 match settings().resolve_mise_bin() {
251 Some(mise_bin) => {
252 let mise_bin_str = mise_bin.to_string_lossy().to_string();
253 info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
254 once("exec".to_string())
255 .chain(once(mise_bin_str))
256 .chain(once("x".to_string()))
257 .chain(once("--".to_string()))
258 .chain(cmd)
259 .collect_vec()
260 }
261 None => {
262 warn!("daemon {id}: mise=true but mise binary not found, running without mise");
263 once("exec".to_string()).chain(cmd).collect_vec()
264 }
265 }
266 } else {
267 once("exec".to_string()).chain(cmd).collect_vec()
268 };
269 let args = vec!["-c".to_string(), shell_words::join(&cmd)];
270 let log_path = id.log_path();
271 if let Some(parent) = log_path.parent() {
272 xx::file::mkdirp(parent)?;
273 }
274 #[cfg(unix)]
275 let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
276 Ok(identity) => identity,
277 Err(e) => {
278 return Ok(IpcResponse::DaemonFailed {
279 error: e.to_string(),
280 });
281 }
282 };
283 info!("run: spawning daemon {id} with args: {args:?}");
284 let mut cmd = tokio::process::Command::new("sh");
285 cmd.args(&args)
286 .stdin(std::process::Stdio::null())
287 .stdout(std::process::Stdio::piped())
288 .stderr(std::process::Stdio::piped())
289 .current_dir(&opts.dir);
290
291 if let Some(ref path) = *env::ORIGINAL_PATH {
293 cmd.env("PATH", path);
294 }
295
296 if let Some(ref env_vars) = opts.env {
298 cmd.envs(env_vars);
299 }
300
301 cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
303 cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
304 cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
305
306 if !resolved_ports.is_empty() {
308 cmd.env("PORT", resolved_ports[0].to_string());
312 for (i, port) in resolved_ports.iter().enumerate() {
314 cmd.env(format!("PORT{i}"), port.to_string());
315 }
316 }
317
318 #[cfg(unix)]
319 {
320 let run_identity = run_identity.clone();
321 unsafe {
322 cmd.pre_exec(move || {
323 nix::unistd::setsid().map_err(nix_to_io_error)?;
324 apply_run_identity(&run_identity)?;
325 Ok(())
326 });
327 }
328 }
329
330 let mut child = cmd.spawn().into_diagnostic()?;
331 let pid = match child.id() {
332 Some(p) => p,
333 None => {
334 warn!("Daemon {id} exited before PID could be captured");
335 return Ok(IpcResponse::DaemonFailed {
336 error: "Process exited immediately".to_string(),
337 });
338 }
339 };
340 info!("started daemon {id} with pid {pid}");
341 let daemon = self
342 .upsert_daemon(
343 UpsertDaemonOpts::builder(id.clone())
344 .set(|o| {
345 o.pid = Some(pid);
346 o.status = DaemonStatus::Running;
347 o.shell_pid = opts.shell_pid;
348 o.dir = Some(opts.dir.clone());
349 o.cmd = Some(original_cmd);
350 o.autostop = opts.autostop;
351 o.cron_schedule = opts.cron_schedule.clone();
352 o.cron_retrigger = opts.cron_retrigger;
353 o.retry = Some(opts.retry);
354 o.retry_count = Some(opts.retry_count);
355 o.ready_delay = opts.ready_delay;
356 o.ready_output = opts.ready_output.clone();
357 o.ready_http = opts.ready_http.clone();
358 o.ready_port = effective_ready_port;
359 o.ready_cmd = opts.ready_cmd.clone();
360 o.expected_port = expected_ports;
361 o.resolved_port = resolved_ports;
362 o.auto_bump_port = Some(opts.auto_bump_port);
363 o.port_bump_attempts = Some(opts.port_bump_attempts);
364 o.depends = Some(opts.depends.clone());
365 o.env = opts.env.clone();
366 o.watch = Some(opts.watch.clone());
367 o.watch_mode = Some(opts.watch_mode);
368 o.watch_base_dir = opts.watch_base_dir.clone();
369 o.mise = opts.mise;
370 o.user = opts.user.clone();
371 o.memory_limit = opts.memory_limit;
372 o.cpu_limit = opts.cpu_limit;
373 })
374 .build(),
375 )
376 .await?;
377
378 let id_clone = id.clone();
379 let ready_delay = opts.ready_delay;
380 let ready_output = opts.ready_output.clone();
381 let ready_http = opts.ready_http.clone();
382 let ready_port = effective_ready_port;
383 let ready_cmd = opts.ready_cmd.clone();
384 let daemon_dir = opts.dir.clone();
385 let hook_retry_count = opts.retry_count;
386 let hook_retry = opts.retry;
387 let hook_daemon_env = opts.env.clone();
388 let has_port_config = !opts.expected_port.is_empty()
394 || (settings().proxy.enable && is_daemon_slug_target(id));
395 let daemon_pid = pid;
396
397 tokio::spawn(async move {
398 let id = id_clone;
399 let (stdout, stderr) = match (child.stdout.take(), child.stderr.take()) {
400 (Some(out), Some(err)) => (out, err),
401 _ => {
402 error!("Failed to capture stdout/stderr for daemon {id}");
403 return;
404 }
405 };
406 let mut stdout = tokio::io::BufReader::new(stdout).lines();
407 let mut stderr = tokio::io::BufReader::new(stderr).lines();
408 let log_file = match tokio::fs::File::options()
409 .append(true)
410 .create(true)
411 .open(&log_path)
412 .await
413 {
414 Ok(f) => f,
415 Err(e) => {
416 error!("Failed to open log file for daemon {id}: {e}");
417 return;
418 }
419 };
420 let mut log_appender = BufWriter::new(log_file);
421
422 let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
423 let format_line = |line: String| {
424 if line.starts_with(&format!("{id} ")) {
425 format!("{} {line}\n", now())
427 } else {
428 format!("{} {id} {line}\n", now())
429 }
430 };
431
432 let mut ready_notified = false;
434 let mut ready_tx = ready_tx;
435 let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
436 let mut active_port_spawned = false;
438
439 let mut delay_timer =
440 ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
441
442 let s = settings();
444 let ready_check_interval = s.supervisor_ready_check_interval();
445 let http_client_timeout = s.supervisor_http_client_timeout();
446 let log_flush_interval_duration = s.supervisor_log_flush_interval();
447
448 let mut http_check_interval = ready_http
450 .as_ref()
451 .map(|_| tokio::time::interval(ready_check_interval));
452 let http_client = ready_http.as_ref().map(|_| {
453 reqwest::Client::builder()
454 .timeout(http_client_timeout)
455 .build()
456 .unwrap_or_default()
457 });
458
459 let mut port_check_interval =
461 ready_port.map(|_| tokio::time::interval(ready_check_interval));
462
463 let mut cmd_check_interval = ready_cmd
465 .as_ref()
466 .map(|_| tokio::time::interval(ready_check_interval));
467
468 let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);
470
471 let (exit_tx, mut exit_rx) =
473 tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
474
475 let child_pid = child.id().unwrap_or(0);
477 tokio::spawn(async move {
478 let result = child.wait().await;
479 #[cfg(all(unix, not(target_os = "linux")))]
489 let result = match &result {
490 Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
491 if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
492 warn!(
493 "daemon pid {child_pid} wait() got ECHILD; \
494 recovered exit code {code} from zombie reaper"
495 );
496 use std::os::unix::process::ExitStatusExt;
501 if code >= 0 {
502 Ok(std::process::ExitStatus::from_raw(code << 8))
503 } else {
504 Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
506 }
507 } else {
508 warn!(
509 "daemon pid {child_pid} wait() got ECHILD but no \
510 stashed status found; reporting as error"
511 );
512 result
513 }
514 }
515 _ => result,
516 };
517 debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
518 let _ = exit_tx.send(result).await;
519 });
520
521 #[allow(unused_assignments)]
522 let mut exit_status = None;
524
525 if has_port_config
531 && ready_pattern.is_none()
532 && ready_http.is_none()
533 && ready_port.is_none()
534 && ready_cmd.is_none()
535 && delay_timer.is_none()
536 {
537 active_port_spawned = true;
538 detect_and_store_active_port(id.clone(), daemon_pid);
539 }
540
541 loop {
542 select! {
543 Ok(Some(line)) = stdout.next_line() => {
544 let formatted = format_line(line.clone());
545 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
546 error!("Failed to write to log for daemon {id}: {e}");
547 }
548 trace!("stdout: {id} {formatted}");
549
550 if !ready_notified
552 && let Some(ref pattern) = ready_pattern
553 && pattern.is_match(&line)
554 {
555 info!("daemon {id} ready: output matched pattern");
556 ready_notified = true;
557 let _ = log_appender.flush().await;
558 if let Some(tx) = ready_tx.take() {
559 let _ = tx.send(Ok(()));
560 }
561 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
562 if !active_port_spawned && has_port_config {
563 active_port_spawned = true;
564 detect_and_store_active_port(id.clone(), daemon_pid);
565 }
566 }
567 }
568 Ok(Some(line)) = stderr.next_line() => {
569 let formatted = format_line(line.clone());
570 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
571 error!("Failed to write to log for daemon {id}: {e}");
572 }
573 trace!("stderr: {id} {formatted}");
574
575 if !ready_notified
576 && let Some(ref pattern) = ready_pattern
577 && pattern.is_match(&line)
578 {
579 info!("daemon {id} ready: output matched pattern");
580 ready_notified = true;
581 let _ = log_appender.flush().await;
582 if let Some(tx) = ready_tx.take() {
583 let _ = tx.send(Ok(()));
584 }
585 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
586 if !active_port_spawned && has_port_config {
587 active_port_spawned = true;
588 detect_and_store_active_port(id.clone(), daemon_pid);
589 }
590 }
591 },
592 Some(result) = exit_rx.recv() => {
593 exit_status = Some(result);
595 debug!("daemon {id} process exited, exit_status: {exit_status:?}");
596 let _ = log_appender.flush().await;
598 if !ready_notified {
599 if let Some(tx) = ready_tx.take() {
600 let is_success = exit_status.as_ref()
602 .and_then(|r| r.as_ref().ok())
603 .map(|s| s.success())
604 .unwrap_or(false);
605
606 if is_success {
607 debug!("daemon {id} exited successfully before ready check, sending success notification");
608 let _ = tx.send(Ok(()));
609 } else {
610 let exit_code = exit_status.as_ref()
611 .and_then(|r| r.as_ref().ok())
612 .and_then(|s| s.code());
613 debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
614 let _ = tx.send(Err(exit_code));
615 }
616 }
617 } else {
618 debug!("daemon {id} was already marked ready, not sending notification");
619 }
620 break;
621 },
622 _ = async {
623 if let Some(ref mut interval) = http_check_interval {
624 interval.tick().await;
625 } else {
626 std::future::pending::<()>().await;
627 }
628 }, if !ready_notified && ready_http.is_some() => {
629 if let (Some(url), Some(client)) = (&ready_http, &http_client) {
630 match client.get(url).send().await {
631 Ok(response) if response.status().is_success() => {
632 info!("daemon {id} ready: HTTP check passed (status {})", response.status());
633 ready_notified = true;
634 let _ = log_appender.flush().await;
635 if let Some(tx) = ready_tx.take() {
636 let _ = tx.send(Ok(()));
637 }
638 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
639 http_check_interval = None;
640 if !active_port_spawned && has_port_config {
641 active_port_spawned = true;
642 detect_and_store_active_port(id.clone(), daemon_pid);
643 }
644 }
645 Ok(response) => {
646 trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
647 }
648 Err(e) => {
649 trace!("daemon {id} HTTP check failed: {e}");
650 }
651 }
652 }
653 }
654 _ = async {
655 if let Some(ref mut interval) = port_check_interval {
656 interval.tick().await;
657 } else {
658 std::future::pending::<()>().await;
659 }
660 }, if !ready_notified && ready_port.is_some() => {
661 if let Some(port) = ready_port {
662 match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
663 Ok(_) => {
664 info!("daemon {id} ready: TCP port {port} is listening");
665 ready_notified = true;
666 let _ = log_appender.flush().await;
668 if let Some(tx) = ready_tx.take() {
669 let _ = tx.send(Ok(()));
670 }
671 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
672 port_check_interval = None;
674 if !active_port_spawned && has_port_config {
675 active_port_spawned = true;
676 detect_and_store_active_port(id.clone(), daemon_pid);
677 }
678 }
679 Err(_) => {
680 trace!("daemon {id} port check: port {port} not listening yet");
681 }
682 }
683 }
684 }
685 _ = async {
686 if let Some(ref mut interval) = cmd_check_interval {
687 interval.tick().await;
688 } else {
689 std::future::pending::<()>().await;
690 }
691 }, if !ready_notified && ready_cmd.is_some() => {
692 if let Some(ref cmd) = ready_cmd {
693 let mut command = Shell::default_for_platform().command(cmd);
695 command
696 .current_dir(&daemon_dir)
697 .stdout(std::process::Stdio::null())
698 .stderr(std::process::Stdio::null());
699 let result: std::io::Result<std::process::ExitStatus> = command.status().await;
700 match result {
701 Ok(status) if status.success() => {
702 info!("daemon {id} ready: readiness command succeeded");
703 ready_notified = true;
704 let _ = log_appender.flush().await;
705 if let Some(tx) = ready_tx.take() {
706 let _ = tx.send(Ok(()));
707 }
708 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
709 cmd_check_interval = None;
711 if !active_port_spawned && has_port_config {
712 active_port_spawned = true;
713 detect_and_store_active_port(id.clone(), daemon_pid);
714 }
715 }
716 Ok(_) => {
717 trace!("daemon {id} cmd check: command returned non-zero (not ready)");
718 }
719 Err(e) => {
720 trace!("daemon {id} cmd check failed: {e}");
721 }
722 }
723 }
724 }
725 _ = async {
726 if let Some(ref mut timer) = delay_timer {
727 timer.await;
728 } else {
729 std::future::pending::<()>().await;
730 }
731 } => {
732 if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
733 info!("daemon {id} ready: delay elapsed");
734 ready_notified = true;
735 let _ = log_appender.flush().await;
737 if let Some(tx) = ready_tx.take() {
738 let _ = tx.send(Ok(()));
739 }
740 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
741 }
742 delay_timer = None;
744 if !active_port_spawned && has_port_config {
745 active_port_spawned = true;
746 detect_and_store_active_port(id.clone(), daemon_pid);
747 }
748 }
749 _ = log_flush_interval.tick() => {
750 if let Err(e) = log_appender.flush().await {
752 error!("Failed to flush log for daemon {id}: {e}");
753 }
754 }
755 }
756 }
757
758 if let Err(e) = log_appender.flush().await {
760 error!("Failed to final flush log for daemon {id}: {e}");
761 }
762
763 {
765 let mut state_file = SUPERVISOR.state_file.lock().await;
766 if let Some(d) = state_file.daemons.get_mut(&id) {
767 d.active_port = None;
768 }
769 if let Err(e) = state_file.write() {
770 debug!("Failed to write state after clearing active_port for {id}: {e}");
771 }
772 }
773
774 let exit_status = if let Some(status) = exit_status {
776 status
777 } else {
778 match exit_rx.recv().await {
780 Some(status) => status,
781 None => {
782 warn!("daemon {id} exit channel closed without receiving status");
783 Err(std::io::Error::other("exit channel closed"))
784 }
785 }
786 };
787 let current_daemon = SUPERVISOR.get_daemon(&id).await;
788
789 SUPERVISOR
794 .active_monitors
795 .fetch_add(1, atomic::Ordering::Release);
796 struct MonitorGuard;
797 impl Drop for MonitorGuard {
798 fn drop(&mut self) {
799 SUPERVISOR
800 .active_monitors
801 .fetch_sub(1, atomic::Ordering::Release);
802 SUPERVISOR.monitor_done.notify_waiters();
803 }
804 }
805 let _monitor_guard = MonitorGuard;
806 if current_daemon.is_none()
811 || current_daemon.as_ref().is_some_and(|d| {
812 d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
813 })
814 {
815 return;
817 }
818 let already_stopped = current_daemon
823 .as_ref()
824 .is_some_and(|d| d.status.is_stopped());
825 let is_stopping = already_stopped
826 || current_daemon
827 .as_ref()
828 .is_some_and(|d| d.status.is_stopping());
829
830 let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
832 (Ok(status), true) => {
833 (status.code().unwrap_or(-1), "stop")
837 }
838 (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
839 (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
840 (Err(_), true) => {
841 (-1, "stop")
843 }
844 (Err(_), false) => (-1, "fail"),
845 };
846
847 if !already_stopped {
849 if let Ok(status) = &exit_status {
850 info!("daemon {id} exited with status {status}");
851 }
852 let (new_status, last_exit_success) = match exit_reason {
853 "stop" | "exit" => (
854 DaemonStatus::Stopped,
855 exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
856 ),
857 _ => (DaemonStatus::Errored(exit_code), false),
858 };
859 if let Err(e) = SUPERVISOR
860 .upsert_daemon(
861 UpsertDaemonOpts::builder(id.clone())
862 .set(|o| {
863 o.pid = None;
864 o.status = new_status;
865 o.last_exit_success = Some(last_exit_success);
866 })
867 .build(),
868 )
869 .await
870 {
871 error!("Failed to update daemon state for {id}: {e}");
872 }
873 }
874
875 let hook_extra_env = vec![
877 ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
878 ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
879 ];
880
881 let hooks_to_fire: Vec<HookType> = match exit_reason {
883 "stop" => vec![HookType::OnStop, HookType::OnExit],
884 "exit" => vec![HookType::OnExit],
885 _ if hook_retry_count >= hook_retry => {
887 vec![HookType::OnFail, HookType::OnExit]
888 }
889 _ => vec![],
890 };
891
892 for hook_type in hooks_to_fire {
893 fire_hook(
894 hook_type,
895 id.clone(),
896 daemon_dir.clone(),
897 hook_retry_count,
898 hook_daemon_env.clone(),
899 hook_extra_env.clone(),
900 )
901 .await;
902 }
903 });
904
905 if let Some(ready_rx) = ready_rx {
907 match ready_rx.await {
908 Ok(Ok(())) => {
909 info!("daemon {id} is ready");
910 Ok(IpcResponse::DaemonReady { daemon })
911 }
912 Ok(Err(exit_code)) => {
913 error!("daemon {id} failed before becoming ready");
914 Ok(IpcResponse::DaemonFailedWithCode { exit_code })
915 }
916 Err(_) => {
917 error!("readiness channel closed unexpectedly for daemon {id}");
918 Ok(IpcResponse::DaemonStart { daemon })
919 }
920 }
921 } else {
922 Ok(IpcResponse::DaemonStart { daemon })
923 }
924 }
925
926 pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
928 let pitchfork_id = DaemonId::pitchfork();
929 if *id == pitchfork_id {
930 return Ok(IpcResponse::Error(
931 "Cannot stop supervisor via stop command".into(),
932 ));
933 }
934 info!("stopping daemon: {id}");
935 if let Some(daemon) = self.get_daemon(id).await {
936 trace!("daemon to stop: {daemon}");
937 if let Some(pid) = daemon.pid {
938 trace!("killing pid: {pid}");
939 PROCS.refresh_processes();
940 if PROCS.is_running(pid) {
941 self.upsert_daemon(
943 UpsertDaemonOpts::builder(id.clone())
944 .set(|o| {
945 o.pid = Some(pid);
946 o.status = DaemonStatus::Stopping;
947 })
948 .build(),
949 )
950 .await?;
951
952 if let Err(e) = PROCS.kill_process_group_async(pid).await {
955 debug!("failed to kill pid {pid}: {e}");
956 PROCS.refresh_processes();
958 if PROCS.is_running(pid) {
959 debug!("failed to stop pid {pid}: process still running after kill");
961 self.upsert_daemon(
962 UpsertDaemonOpts::builder(id.clone())
963 .set(|o| {
964 o.pid = Some(pid); o.status = DaemonStatus::Running;
966 })
967 .build(),
968 )
969 .await?;
970 return Ok(IpcResponse::DaemonStopFailed {
971 error: format!(
972 "process {pid} still running after kill attempt: {e}"
973 ),
974 });
975 }
976 }
977
978 self.upsert_daemon(
983 UpsertDaemonOpts::builder(id.clone())
984 .set(|o| {
985 o.pid = None;
986 o.status = DaemonStatus::Stopped;
987 o.last_exit_success = Some(true); })
989 .build(),
990 )
991 .await?;
992 } else {
993 debug!("pid {pid} not running, process may have exited unexpectedly");
994 self.upsert_daemon(
997 UpsertDaemonOpts::builder(id.clone())
998 .set(|o| {
999 o.pid = None;
1000 o.status = DaemonStatus::Stopped;
1001 })
1002 .build(),
1003 )
1004 .await?;
1005 return Ok(IpcResponse::DaemonWasNotRunning);
1006 }
1007 Ok(IpcResponse::Ok)
1008 } else {
1009 debug!("daemon {id} not running");
1010 Ok(IpcResponse::DaemonNotRunning)
1011 }
1012 } else {
1013 debug!("daemon {id} not found");
1014 Ok(IpcResponse::DaemonNotFound)
1015 }
1016 }
1017}
1018
1019#[cfg(unix)]
1020fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1021 let settings_user = settings().supervisor.user.trim();
1022 let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1023 let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1024 let configured = daemon_user.or(settings_user);
1025 let current_uid = nix::unistd::Uid::effective().as_raw();
1026 let current_gid = nix::unistd::Gid::effective().as_raw();
1027 resolve_run_identity(
1028 configured,
1029 current_uid,
1030 current_gid,
1031 std::env::var("SUDO_UID").ok().as_deref(),
1032 std::env::var("SUDO_GID").ok().as_deref(),
1033 )
1034}
1035
1036#[cfg(unix)]
1037fn resolve_run_identity(
1038 configured: Option<&str>,
1039 current_uid: u32,
1040 current_gid: u32,
1041 sudo_uid: Option<&str>,
1042 sudo_gid: Option<&str>,
1043) -> Result<RunIdentity> {
1044 let current_uid = nix::unistd::Uid::from_raw(current_uid);
1045 let current_gid = nix::unistd::Gid::from_raw(current_gid);
1046 if let Some(user) = configured {
1047 let identity = resolve_configured_user(user)?;
1048 ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1049 if identity.matches(current_uid, current_gid) {
1050 return Ok(RunIdentity::Inherit);
1051 }
1052 return Ok(identity);
1053 }
1054
1055 if current_uid.is_root()
1056 && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1057 {
1058 return Ok(identity);
1059 }
1060
1061 Ok(RunIdentity::Inherit)
1062}
1063
1064#[cfg(unix)]
1065fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1066 if user.chars().all(|c| c.is_ascii_digit()) {
1067 let uid = user
1068 .parse::<u32>()
1069 .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1070 let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1071 .into_diagnostic()?
1072 .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1073 return run_identity_from_user_record(user_record);
1074 }
1075
1076 let user_record = nix::unistd::User::from_name(user)
1077 .into_diagnostic()?
1078 .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1079 run_identity_from_user_record(user_record)
1080}
1081
1082#[cfg(unix)]
1083fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1084 let username = CString::new(user.name)
1085 .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1086 Ok(RunIdentity::Switch {
1087 uid: user.uid,
1088 gid: user.gid,
1089 username: Some(username),
1090 })
1091}
1092
1093#[cfg(unix)]
1094fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1095 RunIdentity::Switch {
1096 uid: nix::unistd::Uid::from_raw(uid),
1097 gid: nix::unistd::Gid::from_raw(gid),
1098 username,
1099 }
1100}
1101
1102#[cfg(unix)]
1103fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1104 let uid = sudo_uid?.parse::<u32>().ok()?;
1105 let gid = sudo_gid?.parse::<u32>().ok()?;
1106 let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1107 .ok()
1108 .flatten()
1109 .and_then(|u| CString::new(u.name).ok());
1110 Some(run_identity_from_raw_ids(uid, gid, username))
1111}
1112
1113#[cfg(unix)]
1114fn ensure_can_use_identity(
1115 configured_user: &str,
1116 identity: &RunIdentity,
1117 current_uid: nix::unistd::Uid,
1118 current_gid: nix::unistd::Gid,
1119) -> Result<()> {
1120 let RunIdentity::Switch { uid, gid, .. } = identity else {
1121 return Ok(());
1122 };
1123 if *uid == current_uid && *gid == current_gid {
1124 return Ok(());
1125 }
1126 if current_uid.is_root() {
1127 return Ok(());
1128 }
1129 Err(miette::miette!(
1130 "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1131 configured_user,
1132 current_uid.as_raw(),
1133 current_gid.as_raw(),
1134 uid.as_raw(),
1135 gid.as_raw()
1136 ))
1137}
1138
1139#[cfg(unix)]
1140fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1141 let RunIdentity::Switch { uid, gid, username } = identity else {
1142 return Ok(());
1143 };
1144 if let Some(username) = username {
1145 initgroups_for_user(username, *gid)?;
1146 } else {
1147 setgroups_to_primary(*gid)?;
1148 }
1149 nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1150 nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1151 Ok(())
1152}
1153
1154#[cfg(unix)]
1155impl RunIdentity {
1156 fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1157 matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1158 }
1159}
1160
1161#[cfg(unix)]
1162fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1163 let groups = [gid.as_raw() as libc::gid_t];
1164 #[cfg(any(target_os = "linux", target_os = "android"))]
1165 let group_count = groups.len();
1166 #[cfg(not(any(target_os = "linux", target_os = "android")))]
1167 let group_count = groups.len() as libc::c_int;
1168 let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1169 if rc == -1 {
1170 Err(std::io::Error::last_os_error())
1171 } else {
1172 Ok(())
1173 }
1174}
1175
1176#[cfg(unix)]
1177fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1178 let gid = gid.as_raw();
1179 #[cfg(any(
1180 target_os = "macos",
1181 target_os = "ios",
1182 target_os = "tvos",
1183 target_os = "watchos"
1184 ))]
1185 let base_gid = i32::try_from(gid)
1186 .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1187
1188 #[cfg(not(any(
1189 target_os = "macos",
1190 target_os = "ios",
1191 target_os = "tvos",
1192 target_os = "watchos"
1193 )))]
1194 let base_gid = gid as libc::gid_t;
1195
1196 let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1199 if rc == -1 {
1200 Err(std::io::Error::last_os_error())
1201 } else {
1202 Ok(())
1203 }
1204}
1205
1206#[cfg(unix)]
1207fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1208 std::io::Error::from_raw_os_error(err as i32)
1209}
1210
1211async fn check_ports_available(
1218 expected_ports: &[u16],
1219 auto_bump: bool,
1220 max_attempts: u32,
1221) -> Result<Vec<u16>> {
1222 if expected_ports.is_empty() {
1223 return Ok(Vec::new());
1224 }
1225
1226 for bump_offset in 0..=max_attempts {
1227 let candidate_ports: Vec<u16> = expected_ports
1229 .iter()
1230 .map(|&p| p.wrapping_add(bump_offset as u16))
1231 .collect();
1232
1233 let mut all_available = true;
1235 let mut conflicting_port = None;
1236
1237 for &port in &candidate_ports {
1238 if port == 0 {
1241 continue;
1242 }
1243
1244 if is_port_in_use(port).await {
1258 all_available = false;
1259 conflicting_port = Some(port);
1260 break;
1261 }
1262 }
1263
1264 if all_available {
1265 if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1269 return Err(PortError::NoAvailablePort {
1270 start_port: expected_ports[0],
1271 attempts: bump_offset + 1,
1272 }
1273 .into());
1274 }
1275 if bump_offset > 0 {
1276 info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1277 }
1278 return Ok(candidate_ports);
1279 }
1280
1281 if bump_offset == 0 && !auto_bump {
1283 if let Some(port) = conflicting_port {
1284 let (pid, process) = identify_port_owner(port).await;
1285 return Err(PortError::InUse { port, process, pid }.into());
1286 }
1287 }
1288 }
1289
1290 Err(PortError::NoAvailablePort {
1292 start_port: expected_ports[0],
1293 attempts: max_attempts + 1,
1294 }
1295 .into())
1296}
1297
1298async fn is_port_in_use(port: u16) -> bool {
1304 tokio::task::spawn_blocking(move || {
1305 for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1306 match std::net::TcpListener::bind((addr, port)) {
1307 Ok(listener) => drop(listener),
1308 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1309 Err(_) => continue,
1310 }
1311 }
1312 false
1313 })
1314 .await
1315 .unwrap_or(false)
1316}
1317
1318async fn identify_port_owner(port: u16) -> (u32, String) {
1323 tokio::task::spawn_blocking(move || {
1324 listeners::get_all()
1325 .ok()
1326 .and_then(|list| {
1327 list.into_iter()
1328 .find(|l| l.socket.port() == port)
1329 .map(|l| (l.process.pid, l.process.name))
1330 })
1331 .unwrap_or((0, "unknown".to_string()))
1332 })
1333 .await
1334 .unwrap_or((0, "unknown".to_string()))
1335}
1336
1337async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1342 if !is_port_in_use(port).await {
1343 return None;
1344 }
1345 Some(identify_port_owner(port).await)
1346}
1347
/// Spawns a detached background task that finds a port the daemon `id`
/// (running as `pid`) is listening on and records it as the daemon's
/// `active_port` in the supervisor state file.
///
/// The task makes up to four attempts, sleeping 500 ms, 1 s, 2 s and 4 s
/// before each one. On every attempt it:
/// - aborts if the state file records the daemon with no pid (process exited);
/// - collects the non-zero listening ports owned by `pid`;
/// - prefers the daemon's first non-zero expected port if `pid` listens on it,
///   otherwise takes the first port the listener scan reported;
/// - on success, stores the port only if the state still records the same
///   `pid`, writes the state file, and stops.
///
/// State-file write failures and exhausted retries are only logged at debug
/// level.
fn detect_and_store_active_port(id: DaemonId, pid: u32) {
    tokio::spawn(async move {
        for delay_ms in [500u64, 1000, 2000, 4000] {
            // Increasing back-off between probes, giving the daemon time to
            // start listening.
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;

            // Read what is needed under the state-file lock. The guard is
            // dropped at the end of this block, before the listener scan below.
            let expected_port: Option<u16> = {
                let state_file = SUPERVISOR.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    Some(d) if d.pid.is_none() => {
                        debug!("daemon {id}: aborting active_port detection — process exited");
                        return;
                    }
                    // Only the first expected port is used as a preference; 0 means none.
                    Some(d) => d.expected_port.first().copied().filter(|&p| p > 0),
                    // Daemon not in state: keep probing without a preferred port.
                    None => None,
                }
            };

            // The listener scan runs on a blocking thread. A failed scan or a
            // failed blocking task both count as "no port found" and are retried.
            let active_port = tokio::task::spawn_blocking(move || {
                let listeners = listeners::get_all().ok()?;
                let process_ports: Vec<u16> = listeners
                    .into_iter()
                    .filter(|listener| listener.process.pid == pid)
                    .map(|listener| listener.socket.port())
                    .filter(|&port| port > 0)
                    .collect();

                if process_ports.is_empty() {
                    return None;
                }

                // Prefer the configured port when the process is actually bound to it.
                if let Some(ep) = expected_port {
                    if process_ports.contains(&ep) {
                        return Some(ep);
                    }
                }

                // Otherwise fall back to whichever port the scan listed first.
                process_ports.into_iter().next()
            })
            .await
            .ok()
            .flatten();

            if let Some(port) = active_port {
                debug!("daemon {id} active_port detected: {port}");
                let mut state_file = SUPERVISOR.state_file.lock().await;
                if let Some(d) = state_file.daemons.get_mut(&id) {
                    // Only record the port for the process that was scanned;
                    // presumably guards against a restart since detection began.
                    if d.pid == Some(pid) {
                        d.active_port = Some(port);
                    } else {
                        debug!(
                            "daemon {id}: skipping active_port write — PID mismatch \
                             (expected {pid}, current {:?})",
                            d.pid
                        );
                        return;
                    }
                }
                // NOTE(review): this also runs when the daemon entry is absent,
                // rewriting the state file unchanged.
                if let Err(e) = state_file.write() {
                    debug!("Failed to write state after detecting active_port for {id}: {e}");
                }
                return;
            }

            debug!("daemon {id}: no active port detected for pid {pid} (will retry)");
        }

        debug!("daemon {id}: active port detection exhausted all retries for pid {pid}");
    });
}
1446
1447fn is_daemon_slug_target(id: &DaemonId) -> bool {
1455 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1459 slugs.iter().any(|(slug, entry)| {
1460 let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1461 id.name() == daemon_name
1462 })
1463}
1464
#[cfg(all(test, unix))]
mod tests {
    // Unit tests for run-identity resolution.
    //
    // NOTE(review): the positional arguments of `resolve_run_identity` appear
    // to be (configured user, current uid, current gid, SUDO_UID, SUDO_GID),
    // inferred from the call sites below. Confirm against its definition.
    use super::*;

    // No configured user and no sudo ids while running unprivileged: no switch.
    #[test]
    fn test_resolve_run_identity_empty_without_sudo() {
        let identity = resolve_run_identity(None, 501, 20, None, None).unwrap();
        assert_eq!(identity, RunIdentity::Inherit);
    }

    // Running as uid/gid 0 with sudo ids present and no configured user:
    // switch to the sudo-provided uid/gid.
    #[test]
    fn test_resolve_run_identity_sudo_fallback() {
        let identity = resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap();
        let RunIdentity::Switch { uid, gid, .. } = identity else {
            panic!("expected identity switch");
        };
        assert_eq!(uid.as_raw(), 501);
        assert_eq!(gid.as_raw(), 20);
    }

    // Sudo ids are ignored when the current uid is not 0 (stale environment).
    #[test]
    fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
        let identity = resolve_run_identity(None, 501, 20, Some("0"), Some("0")).unwrap();
        assert_eq!(identity, RunIdentity::Inherit);
    }

    // Resolving by name: "root" maps to uid 0 and keeps the username.
    #[test]
    fn test_resolve_configured_user_root_name() {
        let identity = resolve_configured_user("root").unwrap();
        let RunIdentity::Switch { uid, username, .. } = identity else {
            panic!("expected identity switch");
        };
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(
            username.as_deref().and_then(|s| s.to_str().ok()),
            Some("root")
        );
    }

    // Resolving by numeric uid: "0" maps to uid 0 and the username "root".
    #[test]
    fn test_resolve_configured_user_root_uid() {
        let identity = resolve_configured_user("0").unwrap();
        let RunIdentity::Switch { uid, username, .. } = identity else {
            panic!("expected identity switch");
        };
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(
            username.as_deref().and_then(|s| s.to_str().ok()),
            Some("root")
        );
    }

    // An unknown user name is an error mentioning "does not exist".
    #[test]
    fn test_resolve_configured_user_missing_user_fails() {
        let err = resolve_configured_user("pitchfork-user-that-should-not-exist")
            .unwrap_err()
            .to_string();
        assert!(err.contains("does not exist"));
    }

    // Switching to a different user while unprivileged fails with the
    // "Restart the supervisor with sudo" guidance.
    #[test]
    fn test_resolve_run_identity_requires_root_for_user_switch() {
        let err = resolve_run_identity(Some("root"), 501, 20, None, None)
            .unwrap_err()
            .to_string();
        assert!(err.contains("Restart the supervisor with sudo"));
    }

    // Configured user equals the current identity (root): no switch, even
    // though sudo ids are present.
    #[test]
    fn test_resolve_run_identity_same_user_is_noop() {
        let identity = resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20")).unwrap();
        assert_eq!(identity, RunIdentity::Inherit);
    }
}