1use super::hooks::{self, HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::log_store::LogStore;
13use crate::log_store::sqlite::LOG_STORE;
14use crate::procs::PROCS;
15use crate::settings::settings;
16use crate::shell::Shell;
17use crate::supervisor::state::UpsertDaemonOpts;
18use crate::{Result, env};
19use itertools::Itertools;
20use miette::IntoDiagnostic;
21use once_cell::sync::Lazy;
22use regex::Regex;
23use std::collections::HashMap;
24#[cfg(unix)]
25use std::ffi::CString;
26use std::iter::once;
27use std::sync::atomic;
28use std::time::Duration;
29use tokio::io::AsyncBufReadExt;
30use tokio::select;
31use tokio::sync::oneshot;
32use tokio::time;
33
34static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
36 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
37
38#[cfg(unix)]
39#[derive(Clone, Debug, PartialEq, Eq)]
40enum RunIdentity {
41 Inherit,
42 Switch {
43 uid: nix::unistd::Uid,
44 gid: nix::unistd::Gid,
45 username: Option<CString>,
46 },
47}
48
49pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
51 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
52 if let Some(re) = cache.get(pattern) {
53 return Some(re.clone());
54 }
55 match Regex::new(pattern) {
56 Ok(re) => {
57 cache.insert(pattern.to_string(), re.clone());
58 Some(re)
59 }
60 Err(e) => {
61 error!("invalid regex pattern '{pattern}': {e}");
62 None
63 }
64 }
65}
66
67impl Supervisor {
68 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
70 let id = &opts.id;
71 let cmd = opts.cmd.clone();
72
73 {
75 let mut pending = self.pending_autostops.lock().await;
76 if pending.remove(id).is_some() {
77 info!("cleared pending autostop for {id} (daemon starting)");
78 }
79 }
80
81 let daemon = self.get_daemon(id).await;
82 if let Some(daemon) = daemon {
83 if !daemon.status.is_stopping()
86 && !daemon.status.is_stopped()
87 && let Some(pid) = daemon.pid
88 {
89 if opts.force {
90 self.stop(id).await?;
91 info!("run: stop completed for daemon {id}");
92 } else {
93 warn!("daemon {id} already running with pid {pid}");
94 return Ok(IpcResponse::DaemonAlreadyRunning);
95 }
96 }
97 }
98
99 if opts.wait_ready && opts.retry.count() > 0 {
101 let max_attempts = opts.retry.count().saturating_add(1);
103 for attempt in 0..max_attempts {
104 let mut retry_opts = opts.clone();
105 retry_opts.retry_count = attempt;
106 retry_opts.cmd = cmd.clone();
107
108 let result = self.run_once(retry_opts).await?;
109
110 match result {
111 IpcResponse::DaemonReady { daemon } => {
112 return Ok(IpcResponse::DaemonReady { daemon });
113 }
114 IpcResponse::DaemonFailedWithCode { exit_code } => {
115 if attempt < opts.retry.count() {
116 let backoff_secs = 2u64.saturating_pow(attempt).min(3600);
117 info!(
118 "daemon {id} failed (attempt {}/{}), retrying in {}s",
119 attempt + 1,
120 max_attempts,
121 backoff_secs
122 );
123 fire_hook(
124 HookType::OnRetry,
125 id.clone(),
126 opts.dir.0.clone(),
127 attempt + 1,
128 opts.env.clone(),
129 vec![],
130 )
131 .await;
132 time::sleep(Duration::from_secs(backoff_secs)).await;
133 continue;
134 } else {
135 info!("daemon {id} failed after {max_attempts} attempts");
136 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
137 }
138 }
139 other => return Ok(other),
140 }
141 }
142 }
143
144 self.run_once(opts).await
146 }
147
148 pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
150 let id = &opts.id;
151 let original_cmd = opts.cmd.clone(); let cmd = opts.cmd;
153
154 let (ready_tx, ready_rx) = if opts.wait_ready {
156 let (tx, rx) = oneshot::channel();
157 (Some(tx), Some(rx))
158 } else {
159 (None, None)
160 };
161
162 let expected_ports = opts
164 .port
165 .as_ref()
166 .map(|p| p.expect.clone())
167 .unwrap_or_default();
168 let (resolved_ports, effective_ready_port) = if !expected_ports.is_empty() {
169 let port_cfg = opts.port.as_ref().unwrap();
170 match check_ports_available(
171 &expected_ports,
172 port_cfg.auto_bump(),
173 port_cfg.max_bump_attempts(),
174 )
175 .await
176 {
177 Ok(resolved) => {
178 let ready_port = if let Some(configured_port) = opts.ready_port {
179 let bump_offset = resolved
181 .first()
182 .unwrap_or(&0)
183 .saturating_sub(*expected_ports.first().unwrap_or(&0));
184 if expected_ports.contains(&configured_port) && bump_offset > 0 {
185 configured_port
186 .checked_add(bump_offset)
187 .or(Some(configured_port))
188 } else {
189 Some(configured_port)
190 }
191 } else if opts.ready_output.is_none()
192 && opts.ready_http.is_none()
193 && opts.ready_cmd.is_none()
194 && opts.ready_delay.is_none()
195 {
196 resolved.first().copied().filter(|&p| p != 0)
200 } else {
201 None
205 };
206 info!("daemon {id}: ports {expected_ports:?} resolved to {resolved:?}");
207 (resolved, ready_port)
208 }
209 Err(e) => {
210 error!("daemon {id}: port check failed: {e}");
211 if let Some(port_error) = e.downcast_ref::<PortError>() {
213 match port_error {
214 PortError::InUse { port, process, pid } => {
215 return Ok(IpcResponse::PortConflict {
216 port: *port,
217 process: process.clone(),
218 pid: *pid,
219 });
220 }
221 PortError::NoAvailablePort {
222 start_port,
223 attempts,
224 } => {
225 return Ok(IpcResponse::NoAvailablePort {
226 start_port: *start_port,
227 attempts: *attempts,
228 });
229 }
230 }
231 }
232 return Ok(IpcResponse::DaemonFailed {
233 error: e.to_string(),
234 });
235 }
236 }
237 } else {
238 if let Some(port) = opts.ready_port {
244 if port > 0 {
245 if let Some((pid, process)) = detect_port_conflict(port).await {
246 return Ok(IpcResponse::PortConflict { port, process, pid });
247 }
248 }
249 }
250 (Vec::new(), opts.ready_port)
251 };
252
253 let cmd: Vec<String> = if opts.mise.unwrap_or(settings().general.mise) {
254 match settings().resolve_mise_bin() {
255 Some(mise_bin) => {
256 let mise_bin_str = mise_bin.to_string_lossy().to_string();
257 info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
258 once("exec".to_string())
259 .chain(once(mise_bin_str))
260 .chain(once("x".to_string()))
261 .chain(once("--".to_string()))
262 .chain(cmd)
263 .collect_vec()
264 }
265 None => {
266 warn!("daemon {id}: mise=true but mise binary not found, running without mise");
267 once("exec".to_string()).chain(cmd).collect_vec()
268 }
269 }
270 } else {
271 once("exec".to_string()).chain(cmd).collect_vec()
272 };
273 let args = vec!["-c".to_string(), shell_words::join(&cmd)];
274 #[cfg(unix)]
275 let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
276 Ok(identity) => identity,
277 Err(e) => {
278 return Ok(IpcResponse::DaemonFailed {
279 error: e.to_string(),
280 });
281 }
282 };
283 info!("run: spawning daemon {id} with args: {args:?}");
284
285 #[cfg(unix)]
287 let pty_pair = if opts.pty.unwrap_or(false) {
288 match super::pty::openpty() {
289 Ok(pair) => {
290 info!("daemon {id}: allocated PTY (pty = true)");
291 Some(pair)
292 }
293 Err(e) => {
294 warn!("daemon {id}: failed to allocate PTY, falling back to pipes: {e}");
295 None
296 }
297 }
298 } else {
299 None
300 };
301
302 let mut cmd = tokio::process::Command::new("sh");
303
304 #[cfg(unix)]
305 if let Some(ref pair) = pty_pair {
306 let slave_file = std::fs::File::from(
310 pair.slave
311 .try_clone()
312 .map_err(|e| miette::miette!("failed to dup slave PTY fd: {e}"))?,
313 );
314 cmd.stdin(std::process::Stdio::from(slave_file.try_clone().map_err(
315 |e| miette::miette!("failed to clone slave PTY fd for stdin: {e}"),
316 )?));
317 cmd.stdout(std::process::Stdio::from(slave_file.try_clone().map_err(
318 |e| miette::miette!("failed to clone slave PTY fd for stdout: {e}"),
319 )?));
320 cmd.stderr(std::process::Stdio::from(slave_file));
321 } else {
322 cmd.stdout(std::process::Stdio::piped())
323 .stderr(std::process::Stdio::piped());
324 }
325
326 #[cfg(not(unix))]
327 {
328 cmd.stdout(std::process::Stdio::piped())
329 .stderr(std::process::Stdio::piped());
330 }
331
332 cmd.args(&args).current_dir(&opts.dir);
333
334 #[cfg(unix)]
335 if pty_pair.is_none() {
336 cmd.stdin(std::process::Stdio::null());
337 }
338
339 #[cfg(not(unix))]
340 cmd.stdin(std::process::Stdio::null());
341
342 if let Some(ref path) = *env::ORIGINAL_PATH {
344 cmd.env("PATH", path);
345 }
346
347 if let Some(ref env_vars) = opts.env {
349 cmd.envs(env_vars);
350 }
351
352 cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
354 cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
355 cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
356
357 if !resolved_ports.is_empty() {
359 cmd.env("PORT", resolved_ports[0].to_string());
363 for (i, port) in resolved_ports.iter().enumerate() {
365 cmd.env(format!("PORT{i}"), port.to_string());
366 }
367 }
368
369 inject_proxy_env(&mut cmd, &opts.slug);
371
372 #[cfg(unix)]
373 {
374 let run_identity = run_identity.clone();
375 let use_pty = pty_pair.is_some();
376 unsafe {
377 cmd.pre_exec(move || {
378 nix::unistd::setsid().map_err(nix_to_io_error)?;
379
380 if use_pty {
384 let ret = libc::ioctl(0, libc::TIOCSCTTY as libc::c_ulong, 0);
385 if ret < 0 {
386 #[cfg(target_os = "linux")]
389 eprintln!(
390 "pitchfork: TIOCSCTTY failed: {}",
391 std::io::Error::last_os_error()
392 );
393 }
394 }
395
396 apply_run_identity(&run_identity)?;
397 Ok(())
398 });
399 }
400 }
401
402 let mut child = cmd.spawn().into_diagnostic()?;
403 let pid = match child.id() {
404 Some(p) => p,
405 None => {
406 warn!("Daemon {id} exited before PID could be captured");
407 return Ok(IpcResponse::DaemonFailed {
408 error: "Process exited immediately".to_string(),
409 });
410 }
411 };
412 info!("started daemon {id} with pid {pid}");
413 PROCS.refresh_pids(&[pid]);
414 let daemon = self
415 .upsert_daemon(
416 UpsertDaemonOpts::builder(id.clone())
417 .set(|o| {
418 o.pid = Some(pid);
419 o.status = DaemonStatus::Running;
420 o.shell_pid = opts.shell_pid;
421 o.dir = Some(opts.dir.0.clone());
422 o.cmd = Some(original_cmd);
423 o.autostop = opts.autostop;
424 o.cron_schedule = opts.cron_schedule.clone();
425 o.cron_retrigger = opts.cron_retrigger;
426 o.cron_immediate = opts.cron_immediate;
427 o.retry = Some(opts.retry);
428 o.retry_count = Some(opts.retry_count);
429 o.ready_delay = opts.ready_delay;
430 o.ready_output = opts.ready_output.clone();
431 o.ready_http = opts.ready_http.clone();
432 o.ready_port = effective_ready_port;
433 o.ready_cmd = opts.ready_cmd.clone();
434 o.port = crate::config_types::PortConfig::from_parts(
435 expected_ports,
436 opts.port.as_ref().map(|p| p.bump).unwrap_or_default(),
437 );
438 o.resolved_port = resolved_ports;
439 o.depends = Some(opts.depends.clone());
440 o.env = opts.env.clone();
441 o.watch = Some(opts.watch.clone());
442 o.watch_mode = Some(opts.watch_mode);
443 o.watch_base_dir = opts.watch_base_dir.clone();
444 o.mise = opts.mise;
445 o.user = opts.user.clone();
446 o.memory_limit = opts.memory_limit;
447 o.cpu_limit = opts.cpu_limit;
448 o.stop_signal = opts.stop_signal;
449 o.pty = opts.pty;
450 })
451 .build(),
452 )
453 .await?;
454
455 let id_clone = id.clone();
456 let ready_delay = opts.ready_delay;
457 let ready_output = opts.ready_output.clone();
458 let ready_http = opts.ready_http.clone();
459 let ready_port = effective_ready_port;
460 let ready_cmd = opts.ready_cmd.clone();
461 let daemon_dir = opts.dir.0.clone();
462 let hook_retry_count = opts.retry_count;
463 let hook_retry = opts.retry;
464 let hook_daemon_env = opts.env.clone();
465 let on_output_hook = opts.on_output_hook.clone();
466 let has_port_config = opts.port.as_ref().is_some_and(|p| !p.expect.is_empty())
472 || (settings().proxy.enable && is_daemon_slug_target(id));
473 let daemon_pid = pid;
474
475 #[cfg(unix)]
479 let pty_reader = pty_pair.map(|p| {
480 tokio::io::BufReader::new(tokio::fs::File::from_std(std::fs::File::from(p.master)))
481 .lines()
482 });
483 #[cfg(not(unix))]
484 let pty_reader: Option<tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>> = None;
485 let stdout_reader = if pty_reader.is_none() {
486 child
487 .stdout
488 .take()
489 .map(|s| tokio::io::BufReader::new(s).lines())
490 } else {
491 None
492 };
493 let stderr_reader = if pty_reader.is_none() {
494 child
495 .stderr
496 .take()
497 .map(|s| tokio::io::BufReader::new(s).lines())
498 } else {
499 None
500 };
501
502 if pty_reader.is_none() && (stdout_reader.is_none() || stderr_reader.is_none()) {
503 error!("Failed to capture stdout/stderr for daemon {id}");
504 }
505
506 tokio::spawn(async move {
507 let id = id_clone;
508
509 let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<String>(256);
511
512 if let Some(mut reader) = pty_reader {
513 tokio::spawn(async move {
517 while let Ok(Some(mut line)) = reader.next_line().await {
518 if line.ends_with('\r') {
520 line.pop();
521 }
522 if output_tx.send(line).await.is_err() {
523 break;
524 }
525 }
526 });
527 } else {
528 if let Some(mut stdout) = stdout_reader {
533 let tx = output_tx.clone();
534 tokio::spawn(async move {
535 while let Ok(Some(line)) = stdout.next_line().await {
536 if tx.send(line).await.is_err() {
537 break;
538 }
539 }
540 });
541 }
542 if let Some(mut stderr) = stderr_reader {
543 let tx = output_tx.clone();
544 tokio::spawn(async move {
545 while let Ok(Some(line)) = stderr.next_line().await {
546 if tx.send(line).await.is_err() {
547 break;
548 }
549 }
550 });
551 }
552 drop(output_tx);
554 }
555 let log_store = &*LOG_STORE;
556 let format_line = |line: String| line;
557
558 let mut ready_notified = false;
562 let mut ready_tx = ready_tx;
563 let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
564 let mut active_port_spawned = false;
566
567 let on_output_hook = match on_output_hook {
571 Some(ref hook) => match hook.validate(id.name()) {
572 Ok(()) => on_output_hook,
573 Err(e) => {
574 error!("{e}");
575 None
576 }
577 },
578 None => None,
579 };
580
581 let on_output_pattern: Option<regex::Regex> = on_output_hook
584 .as_ref()
585 .and_then(|h| h.regex.as_deref().and_then(get_or_compile_regex));
586 let on_output_debounce = on_output_hook
587 .as_ref()
588 .map(|h| h.debounce_duration())
589 .unwrap_or(Duration::from_millis(1000));
590 let mut on_output_last_fired: Option<std::time::Instant> = None;
592
593 let mut delay_timer =
594 ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
595
596 let s = settings();
598 let ready_check_interval = s.supervisor_ready_check_interval();
599 let http_client_timeout = s.supervisor_http_client_timeout();
600
601 let mut http_check_interval = ready_http
603 .as_ref()
604 .map(|_| tokio::time::interval(ready_check_interval));
605 let http_client = ready_http.as_ref().map(|_| {
606 reqwest::Client::builder()
607 .timeout(http_client_timeout)
608 .build()
609 .unwrap_or_default()
610 });
611
612 let mut port_check_interval =
614 ready_port.map(|_| tokio::time::interval(ready_check_interval));
615
616 let mut cmd_check_interval = ready_cmd
618 .as_ref()
619 .map(|_| tokio::time::interval(ready_check_interval));
620
621 let (exit_tx, mut exit_rx) =
623 tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
624
625 let child_pid = child.id().unwrap_or(0);
627 tokio::spawn(async move {
628 let result = child.wait().await;
629 #[cfg(all(unix, not(target_os = "linux")))]
639 let result = match &result {
640 Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
641 if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
642 warn!(
643 "daemon pid {child_pid} wait() got ECHILD; \
644 recovered exit code {code} from zombie reaper"
645 );
646 use std::os::unix::process::ExitStatusExt;
651 if code >= 0 {
652 Ok(std::process::ExitStatus::from_raw(code << 8))
653 } else {
654 Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
656 }
657 } else {
658 warn!(
659 "daemon pid {child_pid} wait() got ECHILD but no \
660 stashed status found; reporting as error"
661 );
662 result
663 }
664 }
665 _ => result,
666 };
667 debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
668 let _ = exit_tx.send(result).await;
669 });
670
671 #[allow(unused_assignments)]
672 let mut exit_status = None;
674
675 if has_port_config
681 && ready_pattern.is_none()
682 && ready_http.is_none()
683 && ready_port.is_none()
684 && ready_cmd.is_none()
685 && delay_timer.is_none()
686 {
687 active_port_spawned = true;
688 detect_and_store_active_port(id.clone(), daemon_pid);
689 }
690
691 loop {
692 select! {
693 Some(line) = output_rx.recv() => {
694 let formatted = format_line(line.clone());
695 if let Err(e) = log_store.append(&id, &formatted) {
696 error!("Failed to write to log for daemon {id}: {e}");
697 }
698 trace!("output: {id} {formatted}");
699
700 let line_clean = console::strip_ansi_codes(&line).to_string();
703
704 if !ready_notified
706 && let Some(ref pattern) = ready_pattern
707 && pattern.is_match(&line_clean)
708 {
709 info!("daemon {id} ready: output matched pattern");
710 ready_notified = true;
711 if let Some(tx) = ready_tx.take() {
712 let _ = tx.send(Ok(()));
713 }
714 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
715 if !active_port_spawned && has_port_config {
716 active_port_spawned = true;
717 detect_and_store_active_port(id.clone(), daemon_pid);
718 }
719 }
720
721 if let Some(ref hook) = on_output_hook {
723 let matched = match (&hook.filter, &on_output_pattern) {
724 (Some(substr), _) => line_clean.contains(substr.as_str()),
725 (None, Some(re)) => re.is_match(&line_clean),
726 (None, None) => true,
727 };
728 if matched {
729 let now = std::time::Instant::now();
730 let elapsed = on_output_last_fired.map(|t| now.duration_since(t));
731 if elapsed.is_none_or(|e| e >= on_output_debounce) {
732 on_output_last_fired = Some(now);
733 hooks::fire_output_hook(id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), hook.run.clone(), line_clean.clone()).await;
734 }
735 }
736 }
737 }
738 Some(result) = exit_rx.recv() => {
739 exit_status = Some(result);
741 debug!("daemon {id} process exited, exit_status: {exit_status:?}");
742 if !ready_notified {
743 if let Some(tx) = ready_tx.take() {
744 let is_success = exit_status.as_ref()
746 .and_then(|r| r.as_ref().ok())
747 .map(|s| s.success())
748 .unwrap_or(false);
749
750 if is_success {
751 debug!("daemon {id} exited successfully before ready check, sending success notification");
752 let _ = tx.send(Ok(()));
753 } else {
754 let exit_code = exit_status.as_ref()
755 .and_then(|r| r.as_ref().ok())
756 .and_then(|s| s.code());
757 debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
758 let _ = tx.send(Err(exit_code));
759 }
760 }
761 } else {
762 debug!("daemon {id} was already marked ready, not sending notification");
763 }
764 break;
765 },
766 _ = async {
767 if let Some(ref mut interval) = http_check_interval {
768 interval.tick().await;
769 } else {
770 std::future::pending::<()>().await;
771 }
772 }, if !ready_notified && ready_http.is_some() => {
773 if let (Some(http), Some(client)) = (&ready_http, &http_client) {
774 match client.get(&http.url).send().await {
775 Ok(response) if http.accepts_status(response.status().as_u16()) => {
776 info!("daemon {id} ready: HTTP check passed (status {})", response.status());
777 ready_notified = true;
778 if let Some(tx) = ready_tx.take() {
779 let _ = tx.send(Ok(()));
780 }
781 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
782 http_check_interval = None;
783 if !active_port_spawned && has_port_config {
784 active_port_spawned = true;
785 detect_and_store_active_port(id.clone(), daemon_pid);
786 }
787 }
788 Ok(response) => {
789 trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
790 }
791 Err(e) => {
792 trace!("daemon {id} HTTP check failed: {e}");
793 }
794 }
795 }
796 }
797 _ = async {
798 if let Some(ref mut interval) = port_check_interval {
799 interval.tick().await;
800 } else {
801 std::future::pending::<()>().await;
802 }
803 }, if !ready_notified && ready_port.is_some() => {
804 if let Some(port) = ready_port {
805 match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
806 Ok(_) => {
807 info!("daemon {id} ready: TCP port {port} is listening");
808 ready_notified = true;
809 if let Some(tx) = ready_tx.take() {
810 let _ = tx.send(Ok(()));
811 }
812 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
813 port_check_interval = None;
815 if !active_port_spawned && has_port_config {
816 active_port_spawned = true;
817 detect_and_store_active_port(id.clone(), daemon_pid);
818 }
819 }
820 Err(_) => {
821 trace!("daemon {id} port check: port {port} not listening yet");
822 }
823 }
824 }
825 }
826 _ = async {
827 if let Some(ref mut interval) = cmd_check_interval {
828 interval.tick().await;
829 } else {
830 std::future::pending::<()>().await;
831 }
832 }, if !ready_notified && ready_cmd.is_some() => {
833 if let Some(ref cmd) = ready_cmd {
834 let mut command = Shell::default_for_platform().command(cmd);
836 command
837 .current_dir(&daemon_dir)
838 .stdout(std::process::Stdio::null())
839 .stderr(std::process::Stdio::null());
840 let result: std::io::Result<std::process::ExitStatus> = command.status().await;
841 match result {
842 Ok(status) if status.success() => {
843 info!("daemon {id} ready: readiness command succeeded");
844 ready_notified = true;
845 if let Some(tx) = ready_tx.take() {
846 let _ = tx.send(Ok(()));
847 }
848 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
849 cmd_check_interval = None;
851 if !active_port_spawned && has_port_config {
852 active_port_spawned = true;
853 detect_and_store_active_port(id.clone(), daemon_pid);
854 }
855 }
856 Ok(_) => {
857 trace!("daemon {id} cmd check: command returned non-zero (not ready)");
858 }
859 Err(e) => {
860 trace!("daemon {id} cmd check failed: {e}");
861 }
862 }
863 }
864 }
865 _ = async {
866 if let Some(ref mut timer) = delay_timer {
867 timer.await;
868 } else {
869 std::future::pending::<()>().await;
870 }
871 } => {
872 if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
873 info!("daemon {id} ready: delay elapsed");
874 ready_notified = true;
875 if let Some(tx) = ready_tx.take() {
876 let _ = tx.send(Ok(()));
877 }
878 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
879 }
880 delay_timer = None;
882 if !active_port_spawned && has_port_config {
883 active_port_spawned = true;
884 detect_and_store_active_port(id.clone(), daemon_pid);
885 }
886 }
887 }
888 }
889
890 {
892 let mut state_file = SUPERVISOR.state_file.lock().await;
893 state_file.clear_active_port(&id);
894 }
895
896 let exit_status = if let Some(status) = exit_status {
898 status
899 } else {
900 match exit_rx.recv().await {
902 Some(status) => status,
903 None => {
904 warn!("daemon {id} exit channel closed without receiving status");
905 Err(std::io::Error::other("exit channel closed"))
906 }
907 }
908 };
909 let current_daemon = SUPERVISOR.get_daemon(&id).await;
910
911 SUPERVISOR
916 .active_monitors
917 .fetch_add(1, atomic::Ordering::Release);
918 struct MonitorGuard;
919 impl Drop for MonitorGuard {
920 fn drop(&mut self) {
921 SUPERVISOR
922 .active_monitors
923 .fetch_sub(1, atomic::Ordering::Release);
924 SUPERVISOR.monitor_done.notify_waiters();
925 }
926 }
927 let _monitor_guard = MonitorGuard;
928 if current_daemon.is_none()
933 || current_daemon.as_ref().is_some_and(|d| {
934 d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
935 })
936 {
937 return;
939 }
940 let already_stopped = current_daemon
945 .as_ref()
946 .is_some_and(|d| d.status.is_stopped());
947 let is_stopping = already_stopped
948 || current_daemon
949 .as_ref()
950 .is_some_and(|d| d.status.is_stopping());
951
952 let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
954 (Ok(status), true) => {
955 (status.code().unwrap_or(-1), "stop")
959 }
960 (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
961 (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
962 (Err(_), true) => {
963 (-1, "stop")
965 }
966 (Err(_), false) => (-1, "fail"),
967 };
968
969 if !already_stopped {
971 if let Ok(status) = &exit_status {
972 info!("daemon {id} exited with status {status}");
973 }
974 let (new_status, last_exit_success) = match exit_reason {
975 "stop" | "exit" => (
976 DaemonStatus::Stopped,
977 exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
978 ),
979 _ => (DaemonStatus::Errored(exit_code), false),
980 };
981 if let Err(e) = SUPERVISOR
982 .upsert_daemon(
983 UpsertDaemonOpts::builder(id.clone())
984 .set(|o| {
985 o.pid = None;
986 o.status = new_status;
987 o.last_exit_success = Some(last_exit_success);
988 })
989 .build(),
990 )
991 .await
992 {
993 error!("Failed to update daemon state for {id}: {e}");
994 }
995 }
996
997 let hook_extra_env = vec![
999 ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
1000 ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
1001 ];
1002
1003 let hooks_to_fire: Vec<HookType> = match exit_reason {
1005 "stop" => vec![HookType::OnStop, HookType::OnExit],
1006 "exit" => vec![HookType::OnExit],
1007 _ if hook_retry_count >= hook_retry.count() => {
1009 vec![HookType::OnFail, HookType::OnExit]
1010 }
1011 _ => vec![],
1012 };
1013
1014 for hook_type in hooks_to_fire {
1015 fire_hook(
1016 hook_type,
1017 id.clone(),
1018 daemon_dir.clone(),
1019 hook_retry_count,
1020 hook_daemon_env.clone(),
1021 hook_extra_env.clone(),
1022 )
1023 .await;
1024 }
1025 });
1026
1027 if let Some(ready_rx) = ready_rx {
1029 match ready_rx.await {
1030 Ok(Ok(())) => {
1031 info!("daemon {id} is ready");
1032 Ok(IpcResponse::DaemonReady { daemon })
1033 }
1034 Ok(Err(exit_code)) => {
1035 error!("daemon {id} failed before becoming ready");
1036 Ok(IpcResponse::DaemonFailedWithCode { exit_code })
1037 }
1038 Err(_) => {
1039 error!("readiness channel closed unexpectedly for daemon {id}");
1040 Ok(IpcResponse::DaemonStart { daemon })
1041 }
1042 }
1043 } else {
1044 Ok(IpcResponse::DaemonStart { daemon })
1045 }
1046 }
1047
1048 pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
1050 let pitchfork_id = DaemonId::pitchfork();
1051 if *id == pitchfork_id {
1052 return Ok(IpcResponse::Error(
1053 "Cannot stop supervisor via stop command".into(),
1054 ));
1055 }
1056 info!("stopping daemon: {id}");
1057 if let Some(daemon) = self.get_daemon(id).await {
1058 trace!("daemon to stop: {daemon}");
1059 if let Some(pid) = daemon.pid {
1060 trace!("killing pid: {pid}");
1061 PROCS.refresh_pids(&[pid]);
1062 if PROCS.is_running(pid) {
1063 self.upsert_daemon(
1065 UpsertDaemonOpts::builder(id.clone())
1066 .set(|o| {
1067 o.pid = Some(pid);
1068 o.status = DaemonStatus::Stopping;
1069 })
1070 .build(),
1071 )
1072 .await?;
1073
1074 let stop_cfg = daemon.stop_signal.unwrap_or_default();
1077 let stop_signal: i32 = stop_cfg.signal.into();
1078 if let Err(e) = PROCS
1079 .kill_process_group_async(pid, stop_signal, stop_cfg.timeout)
1080 .await
1081 {
1082 debug!("failed to kill pid {pid}: {e}");
1083 PROCS.refresh_processes();
1085 if PROCS.is_running(pid) {
1086 debug!("failed to stop pid {pid}: process still running after kill");
1088 self.upsert_daemon(
1089 UpsertDaemonOpts::builder(id.clone())
1090 .set(|o| {
1091 o.pid = Some(pid); o.status = DaemonStatus::Running;
1093 })
1094 .build(),
1095 )
1096 .await?;
1097 return Ok(IpcResponse::DaemonStopFailed {
1098 error: format!(
1099 "process {pid} still running after kill attempt: {e}"
1100 ),
1101 });
1102 }
1103 }
1104
1105 self.upsert_daemon(
1110 UpsertDaemonOpts::builder(id.clone())
1111 .set(|o| {
1112 o.pid = None;
1113 o.status = DaemonStatus::Stopped;
1114 o.last_exit_success = Some(true); })
1116 .build(),
1117 )
1118 .await?;
1119 } else {
1120 debug!("pid {pid} not running, process may have exited unexpectedly");
1121 self.upsert_daemon(
1124 UpsertDaemonOpts::builder(id.clone())
1125 .set(|o| {
1126 o.pid = None;
1127 o.status = DaemonStatus::Stopped;
1128 })
1129 .build(),
1130 )
1131 .await?;
1132 return Ok(IpcResponse::DaemonWasNotRunning);
1133 }
1134 Ok(IpcResponse::Ok)
1135 } else {
1136 debug!("daemon {id} not running");
1137 Ok(IpcResponse::DaemonNotRunning)
1138 }
1139 } else {
1140 debug!("daemon {id} not found");
1141 Ok(IpcResponse::DaemonNotFound)
1142 }
1143 }
1144}
1145
1146#[cfg(unix)]
1147fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1148 let s = settings();
1149 let settings_user = s.supervisor.user.trim();
1150 let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1151 let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1152 let configured = daemon_user.or(settings_user);
1153 let current_uid = nix::unistd::Uid::effective().as_raw();
1154 let current_gid = nix::unistd::Gid::effective().as_raw();
1155 resolve_run_identity(
1156 configured,
1157 current_uid,
1158 current_gid,
1159 std::env::var("SUDO_UID").ok().as_deref(),
1160 std::env::var("SUDO_GID").ok().as_deref(),
1161 )
1162}
1163
1164#[cfg(unix)]
1165fn resolve_run_identity(
1166 configured: Option<&str>,
1167 current_uid: u32,
1168 current_gid: u32,
1169 sudo_uid: Option<&str>,
1170 sudo_gid: Option<&str>,
1171) -> Result<RunIdentity> {
1172 let current_uid = nix::unistd::Uid::from_raw(current_uid);
1173 let current_gid = nix::unistd::Gid::from_raw(current_gid);
1174 if let Some(user) = configured {
1175 let identity = resolve_configured_user(user)?;
1176 ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1177 if identity.matches(current_uid, current_gid) {
1178 return Ok(RunIdentity::Inherit);
1179 }
1180 return Ok(identity);
1181 }
1182
1183 if current_uid.is_root()
1184 && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1185 {
1186 return Ok(identity);
1187 }
1188
1189 Ok(RunIdentity::Inherit)
1190}
1191
1192#[cfg(unix)]
1193fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1194 if user.chars().all(|c| c.is_ascii_digit()) {
1195 let uid = user
1196 .parse::<u32>()
1197 .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1198 let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1199 .into_diagnostic()?
1200 .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1201 return run_identity_from_user_record(user_record);
1202 }
1203
1204 let user_record = nix::unistd::User::from_name(user)
1205 .into_diagnostic()?
1206 .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1207 run_identity_from_user_record(user_record)
1208}
1209
1210#[cfg(unix)]
1211fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1212 let username = CString::new(user.name)
1213 .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1214 Ok(RunIdentity::Switch {
1215 uid: user.uid,
1216 gid: user.gid,
1217 username: Some(username),
1218 })
1219}
1220
1221#[cfg(unix)]
1222fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1223 RunIdentity::Switch {
1224 uid: nix::unistd::Uid::from_raw(uid),
1225 gid: nix::unistd::Gid::from_raw(gid),
1226 username,
1227 }
1228}
1229
1230#[cfg(unix)]
1231fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1232 let uid = sudo_uid?.parse::<u32>().ok()?;
1233 let gid = sudo_gid?.parse::<u32>().ok()?;
1234 let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1235 .ok()
1236 .flatten()
1237 .and_then(|u| CString::new(u.name).ok());
1238 Some(run_identity_from_raw_ids(uid, gid, username))
1239}
1240
1241#[cfg(unix)]
1242fn ensure_can_use_identity(
1243 configured_user: &str,
1244 identity: &RunIdentity,
1245 current_uid: nix::unistd::Uid,
1246 current_gid: nix::unistd::Gid,
1247) -> Result<()> {
1248 let RunIdentity::Switch { uid, gid, .. } = identity else {
1249 return Ok(());
1250 };
1251 if *uid == current_uid && *gid == current_gid {
1252 return Ok(());
1253 }
1254 if current_uid.is_root() {
1255 return Ok(());
1256 }
1257 Err(miette::miette!(
1258 "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1259 configured_user,
1260 current_uid.as_raw(),
1261 current_gid.as_raw(),
1262 uid.as_raw(),
1263 gid.as_raw()
1264 ))
1265}
1266
1267#[cfg(unix)]
1268fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1269 let RunIdentity::Switch { uid, gid, username } = identity else {
1270 return Ok(());
1271 };
1272 if let Some(username) = username {
1273 initgroups_for_user(username, *gid)?;
1274 } else {
1275 setgroups_to_primary(*gid)?;
1276 }
1277 nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1278 nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1279 Ok(())
1280}
1281
1282#[cfg(unix)]
1283impl RunIdentity {
1284 fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1285 matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1286 }
1287}
1288
1289#[cfg(unix)]
1290fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1291 let groups = [gid.as_raw() as libc::gid_t];
1292 #[cfg(any(target_os = "linux", target_os = "android"))]
1293 let group_count = groups.len();
1294 #[cfg(not(any(target_os = "linux", target_os = "android")))]
1295 let group_count = groups.len() as libc::c_int;
1296 let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1297 if rc == -1 {
1298 Err(std::io::Error::last_os_error())
1299 } else {
1300 Ok(())
1301 }
1302}
1303
1304#[cfg(unix)]
1305fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1306 let gid = gid.as_raw();
1307 #[cfg(any(
1308 target_os = "macos",
1309 target_os = "ios",
1310 target_os = "tvos",
1311 target_os = "watchos"
1312 ))]
1313 let base_gid = i32::try_from(gid)
1314 .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1315
1316 #[cfg(not(any(
1317 target_os = "macos",
1318 target_os = "ios",
1319 target_os = "tvos",
1320 target_os = "watchos"
1321 )))]
1322 let base_gid = gid as libc::gid_t;
1323
1324 let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1327 if rc == -1 {
1328 Err(std::io::Error::last_os_error())
1329 } else {
1330 Ok(())
1331 }
1332}
1333
1334#[cfg(unix)]
1335fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1336 std::io::Error::from_raw_os_error(err as i32)
1337}
1338
1339async fn check_ports_available(
1346 expected_ports: &[u16],
1347 auto_bump: bool,
1348 max_attempts: u32,
1349) -> Result<Vec<u16>> {
1350 if expected_ports.is_empty() {
1351 return Ok(Vec::new());
1352 }
1353
1354 for bump_offset in 0..=max_attempts {
1355 let candidate_ports: Vec<u16> = expected_ports
1357 .iter()
1358 .map(|&p| p.wrapping_add(bump_offset as u16))
1359 .collect();
1360
1361 let mut all_available = true;
1363 let mut conflicting_port = None;
1364
1365 for &port in &candidate_ports {
1366 if port == 0 {
1369 continue;
1370 }
1371
1372 if is_port_in_use(port).await {
1386 all_available = false;
1387 conflicting_port = Some(port);
1388 break;
1389 }
1390 }
1391
1392 if all_available {
1393 if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1397 return Err(PortError::NoAvailablePort {
1398 start_port: expected_ports[0],
1399 attempts: bump_offset + 1,
1400 }
1401 .into());
1402 }
1403 if bump_offset > 0 {
1404 info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1405 }
1406 return Ok(candidate_ports);
1407 }
1408
1409 if bump_offset == 0 && !auto_bump {
1411 if let Some(port) = conflicting_port {
1412 let (pid, process) = identify_port_owner(port).await;
1413 return Err(PortError::InUse { port, process, pid }.into());
1414 }
1415 }
1416 }
1417
1418 Err(PortError::NoAvailablePort {
1420 start_port: expected_ports[0],
1421 attempts: max_attempts + 1,
1422 }
1423 .into())
1424}
1425
1426async fn is_port_in_use(port: u16) -> bool {
1432 tokio::task::spawn_blocking(move || {
1433 for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1434 match std::net::TcpListener::bind((addr, port)) {
1435 Ok(listener) => drop(listener),
1436 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1437 Err(_) => continue,
1438 }
1439 }
1440 false
1441 })
1442 .await
1443 .unwrap_or(false)
1444}
1445
1446async fn identify_port_owner(port: u16) -> (u32, String) {
1451 tokio::task::spawn_blocking(move || {
1452 listeners::get_all()
1453 .ok()
1454 .and_then(|list| {
1455 list.into_iter()
1456 .find(|l| l.socket.port() == port)
1457 .map(|l| (l.process.pid, l.process.name))
1458 })
1459 .unwrap_or((0, "unknown".to_string()))
1460 })
1461 .await
1462 .unwrap_or((0, "unknown".to_string()))
1463}
1464
1465async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1470 if !is_port_in_use(port).await {
1471 return None;
1472 }
1473 Some(identify_port_owner(port).await)
1474}
1475
1476fn detect_and_store_active_port(id: DaemonId, pid: u32) {
1491 tokio::spawn(async move {
1492 for delay_ms in [500u64, 1000, 2000, 4000] {
1496 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1497
1498 let expected_port: Option<u16> = {
1501 let state_file = SUPERVISOR.state_file.lock().await;
1502 match state_file.daemons.get(&id) {
1503 Some(d) if d.pid.is_none() => {
1504 debug!("daemon {id}: aborting active_port detection — process exited");
1505 return;
1506 }
1507 Some(d) => d
1508 .port
1509 .as_ref()
1510 .and_then(|p| p.expect.first().copied())
1511 .filter(|&p| p > 0),
1512 None => None,
1513 }
1514 };
1515
1516 let active_port = tokio::task::spawn_blocking(move || {
1517 let listeners = listeners::get_all().ok()?;
1518
1519 PROCS.refresh_processes();
1521
1522 let descendant_pids: std::collections::HashSet<u32> = PROCS
1523 .all_children(pid)
1524 .into_iter()
1525 .chain(std::iter::once(pid))
1526 .collect();
1527
1528 let process_ports: Vec<u16> = listeners
1529 .into_iter()
1530 .filter(|listener| descendant_pids.contains(&listener.process.pid))
1531 .map(|listener| listener.socket.port())
1532 .filter(|&port| port > 0)
1533 .collect();
1534
1535 if process_ports.is_empty() {
1536 return None;
1537 }
1538
1539 if let Some(ep) = expected_port {
1542 if process_ports.contains(&ep) {
1543 return Some(ep);
1544 }
1545 }
1546
1547 process_ports.into_iter().next()
1553 })
1554 .await
1555 .ok()
1556 .flatten();
1557
1558 if let Some(port) = active_port {
1559 debug!("daemon {id} active_port detected: {port}");
1560 let mut state_file = SUPERVISOR.state_file.lock().await;
1561 if let Some(d) = state_file.daemons.get(&id) {
1562 if d.pid == Some(pid) {
1566 state_file.set_active_port(&id, port);
1567 } else {
1568 debug!(
1569 "daemon {id}: skipping active_port write — PID mismatch \
1570 (expected {pid}, current {:?})",
1571 d.pid
1572 );
1573 return;
1574 }
1575 }
1576 return;
1577 }
1578
1579 debug!(
1580 "daemon {id}: no active port detected for pid {pid} or its descendants (will retry)"
1581 );
1582 }
1583
1584 debug!(
1585 "daemon {id}: active port detection exhausted all retries for pid {pid} and its descendants"
1586 );
1587 });
1588}
1589
1590fn is_daemon_slug_target(id: &DaemonId) -> bool {
1598 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1602 slugs.iter().any(|(slug, entry)| {
1603 let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1604 id.name() == daemon_name
1605 })
1606}
1607
#[cfg(all(test, unix))]
mod tests {
    use super::*;

    /// Unwraps a `Switch` identity into its parts, panicking on `Inherit`.
    fn expect_switch(
        identity: RunIdentity,
    ) -> (nix::unistd::Uid, nix::unistd::Gid, Option<CString>) {
        match identity {
            RunIdentity::Switch { uid, gid, username } => (uid, gid, username),
            RunIdentity::Inherit => panic!("expected identity switch"),
        }
    }

    // No configured user and no sudo hints: nothing to switch to.
    #[test]
    fn test_resolve_run_identity_empty_without_sudo() {
        let resolved = resolve_run_identity(None, 501, 20, None, None).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    // Running as root with sudo hints: fall back to the invoking user.
    #[test]
    fn test_resolve_run_identity_sudo_fallback() {
        let resolved = resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap();
        let (uid, gid, _) = expect_switch(resolved);
        assert_eq!(uid.as_raw(), 501);
        assert_eq!(gid.as_raw(), 20);
    }

    // Sudo hints are only honoured when the current uid is root.
    #[test]
    fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
        let resolved = resolve_run_identity(None, 501, 20, Some("0"), Some("0")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    #[test]
    fn test_resolve_configured_user_root_name() {
        let (uid, _, username) = expect_switch(resolve_configured_user("root").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(
            username.as_deref().and_then(|s| s.to_str().ok()),
            Some("root")
        );
    }

    // A digits-only value is resolved as a UID and mapped back to its name.
    #[test]
    fn test_resolve_configured_user_root_uid() {
        let (uid, _, username) = expect_switch(resolve_configured_user("0").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(
            username.as_deref().and_then(|s| s.to_str().ok()),
            Some("root")
        );
    }

    #[test]
    fn test_resolve_configured_user_missing_user_fails() {
        let message = resolve_configured_user("pitchfork-user-that-should-not-exist")
            .unwrap_err()
            .to_string();
        assert!(message.contains("does not exist"));
    }

    // A non-root supervisor cannot switch to a different configured user.
    #[test]
    fn test_resolve_run_identity_requires_root_for_user_switch() {
        let message = resolve_run_identity(Some("root"), 501, 20, None, None)
            .unwrap_err()
            .to_string();
        assert!(message.contains("Restart the supervisor with sudo"));
    }

    // Configured user equals the current identity: no switch, sudo hints ignored.
    #[test]
    fn test_resolve_run_identity_same_user_is_noop() {
        let resolved =
            resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }
}
1682
1683fn inject_proxy_env(cmd: &mut tokio::process::Command, slug: &Option<String>) {
1692 let s = crate::settings::settings();
1693 let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1694
1695 if should_force_loopback_host(slug) && !lan_enabled {
1696 cmd.env("HOST", "127.0.0.1");
1699 }
1700
1701 if let Some(url) = build_pitchfork_url(slug, &s) {
1703 cmd.env("PITCHFORK_URL", &url);
1704 }
1705
1706 if s.proxy.enable && s.proxy.https {
1708 let ca_path = if s.proxy.tls_cert.is_empty() {
1709 crate::env::PITCHFORK_STATE_DIR.join("proxy").join("ca.pem")
1710 } else {
1711 std::path::PathBuf::from(&s.proxy.tls_cert)
1712 };
1713 if ca_path.exists() {
1714 cmd.env("NODE_EXTRA_CA_CERTS", ca_path.to_string_lossy().to_string());
1715 }
1716 }
1717
1718 if s.proxy.enable {
1720 let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1721 cmd.env("__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS", format!(".{tld}"));
1722 }
1723
1724 if lan_enabled {
1726 cmd.env("PITCHFORK_LAN", "1");
1727 }
1728}
1729
1730fn should_force_loopback_host(slug: &Option<String>) -> bool {
1731 let Some(slug) = slug.as_deref() else {
1732 return false;
1733 };
1734
1735 let s = crate::settings::settings();
1736 if !s.proxy.enable {
1737 return false;
1738 }
1739
1740 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1741 slugs.contains_key(slug)
1742}
1743
1744fn build_pitchfork_url(slug: &Option<String>, s: &crate::settings::Settings) -> Option<String> {
1748 let slug = slug.as_ref()?;
1749 if !s.proxy.enable {
1750 return None;
1751 }
1752 let scheme = if s.proxy.https { "https" } else { "http" };
1753 let port = u16::try_from(s.proxy.port).ok().filter(|&p| p > 0)?;
1754 let port_suffix = if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
1755 String::new()
1756 } else {
1757 format!(":{port}")
1758 };
1759 let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1760 let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1761 Some(format!("{scheme}://{slug}.{tld}{port_suffix}",))
1762}