1use super::hooks::{self, HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::log_store::LogStore;
13use crate::log_store::sqlite::LOG_STORE;
14use crate::procs::PROCS;
15use crate::settings::settings;
16use crate::shell::Shell;
17use crate::supervisor::state::UpsertDaemonOpts;
18use crate::{Result, env};
19use miette::IntoDiagnostic;
20use once_cell::sync::Lazy;
21use regex::Regex;
22use std::collections::HashMap;
23#[cfg(unix)]
24use std::ffi::CString;
25use std::sync::atomic;
26use std::time::Duration;
27use tokio::io::AsyncBufReadExt;
28use tokio::select;
29use tokio::sync::oneshot;
30use tokio::time;
31
32static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
34 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
35
36#[cfg(unix)]
37#[derive(Clone, Debug, PartialEq, Eq)]
38enum RunIdentity {
39 Inherit,
40 Switch {
41 uid: nix::unistd::Uid,
42 gid: nix::unistd::Gid,
43 username: Option<CString>,
44 },
45}
46
47pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
49 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
50 if let Some(re) = cache.get(pattern) {
51 return Some(re.clone());
52 }
53 match Regex::new(pattern) {
54 Ok(re) => {
55 cache.insert(pattern.to_string(), re.clone());
56 Some(re)
57 }
58 Err(e) => {
59 error!("invalid regex pattern '{pattern}': {e}");
60 None
61 }
62 }
63}
64
65impl Supervisor {
66 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
68 let id = &opts.id;
69 let cmd = opts.cmd.clone();
70
71 {
73 let mut pending = self.pending_autostops.lock().await;
74 if pending.remove(id).is_some() {
75 info!("cleared pending autostop for {id} (daemon starting)");
76 }
77 }
78
79 let daemon = self.get_daemon(id).await;
80 if let Some(daemon) = daemon {
81 if !daemon.status.is_stopping()
84 && !daemon.status.is_stopped()
85 && let Some(pid) = daemon.pid
86 {
87 if opts.force {
88 self.stop(id).await?;
89 info!("run: stop completed for daemon {id}");
90 } else {
91 warn!("daemon {id} already running with pid {pid}");
92 return Ok(IpcResponse::DaemonAlreadyRunning);
93 }
94 }
95 }
96
97 if opts.wait_ready && opts.retry.count() > 0 {
99 let max_attempts = opts.retry.count().saturating_add(1);
101 for attempt in 0..max_attempts {
102 let mut retry_opts = opts.clone();
103 retry_opts.retry_count = attempt;
104 retry_opts.cmd = cmd.clone();
105
106 let result = self.run_once(retry_opts).await?;
107
108 match result {
109 IpcResponse::DaemonReady { daemon } => {
110 return Ok(IpcResponse::DaemonReady { daemon });
111 }
112 IpcResponse::DaemonFailedWithCode { exit_code } => {
113 if attempt < opts.retry.count() {
114 let backoff_secs = 2u64.saturating_pow(attempt).min(3600);
115 info!(
116 "daemon {id} failed (attempt {}/{}), retrying in {}s",
117 attempt + 1,
118 max_attempts,
119 backoff_secs
120 );
121 fire_hook(
122 HookType::OnRetry,
123 id.clone(),
124 opts.dir.0.clone(),
125 attempt + 1,
126 opts.env.clone(),
127 vec![],
128 )
129 .await;
130 time::sleep(Duration::from_secs(backoff_secs)).await;
131 continue;
132 } else {
133 info!("daemon {id} failed after {max_attempts} attempts");
134 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
135 }
136 }
137 other => return Ok(other),
138 }
139 }
140 }
141
142 self.run_once(opts).await
144 }
145
146 pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
148 let id = &opts.id;
149 let original_cmd = opts.cmd.clone(); let (ready_tx, ready_rx) = if opts.wait_ready {
153 let (tx, rx) = oneshot::channel();
154 (Some(tx), Some(rx))
155 } else {
156 (None, None)
157 };
158
159 let expected_ports = opts
161 .port
162 .as_ref()
163 .map(|p| p.expect.clone())
164 .unwrap_or_default();
165 let (resolved_ports, effective_ready_port) = if !expected_ports.is_empty() {
166 let port_cfg = opts.port.as_ref().unwrap();
167 match check_ports_available(
168 &expected_ports,
169 port_cfg.auto_bump(),
170 port_cfg.max_bump_attempts(),
171 )
172 .await
173 {
174 Ok(resolved) => {
175 let ready_port = if let Some(configured_port) = opts.ready_port {
176 let bump_offset = resolved
178 .first()
179 .unwrap_or(&0)
180 .saturating_sub(*expected_ports.first().unwrap_or(&0));
181 if expected_ports.contains(&configured_port) && bump_offset > 0 {
182 configured_port
183 .checked_add(bump_offset)
184 .or(Some(configured_port))
185 } else {
186 Some(configured_port)
187 }
188 } else if opts.ready_output.is_none()
189 && opts.ready_http.is_none()
190 && opts.ready_cmd.is_none()
191 && opts.ready_delay.is_none()
192 {
193 resolved.first().copied().filter(|&p| p != 0)
197 } else {
198 None
202 };
203 info!("daemon {id}: ports {expected_ports:?} resolved to {resolved:?}");
204 (resolved, ready_port)
205 }
206 Err(e) => {
207 error!("daemon {id}: port check failed: {e}");
208 if let Some(port_error) = e.downcast_ref::<PortError>() {
210 match port_error {
211 PortError::InUse { port, process, pid } => {
212 return Ok(IpcResponse::PortConflict {
213 port: *port,
214 process: process.clone(),
215 pid: *pid,
216 });
217 }
218 PortError::NoAvailablePort {
219 start_port,
220 attempts,
221 } => {
222 return Ok(IpcResponse::NoAvailablePort {
223 start_port: *start_port,
224 attempts: *attempts,
225 });
226 }
227 }
228 }
229 return Ok(IpcResponse::DaemonFailed {
230 error: e.to_string(),
231 });
232 }
233 }
234 } else {
235 if let Some(port) = opts.ready_port {
241 if port > 0 {
242 if let Some((pid, process)) = detect_port_conflict(port).await {
243 return Ok(IpcResponse::PortConflict { port, process, pid });
244 }
245 }
246 }
247 (Vec::new(), opts.ready_port)
248 };
249
250 let shell_setting = settings().general.shell.clone();
254 let shell_parts = match shell_words::split(&shell_setting) {
255 Ok(parts) if !parts.is_empty() => parts,
256 Ok(_) => {
257 return Ok(IpcResponse::DaemonFailed {
258 error: "general.shell setting is empty".to_string(),
259 });
260 }
261 Err(e) => {
262 return Ok(IpcResponse::DaemonFailed {
263 error: format!("failed to parse general.shell setting {shell_setting:?}: {e}"),
264 });
265 }
266 };
267 let (shell_program, shell_args) = shell_parts.split_first().unwrap();
268
269 let run_script = opts
274 .run
275 .clone()
276 .unwrap_or_else(|| shell_words::join(&original_cmd));
277
278 let (program, args) = if opts.mise.unwrap_or(settings().general.mise) {
279 match settings().resolve_mise_bin() {
280 Some(mise_bin) => {
281 let mise_bin_str = mise_bin.to_string_lossy().to_string();
282 info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
283 let mut args = vec!["x".to_string(), "--".to_string()];
284 args.push(shell_program.clone());
285 args.extend(shell_args.iter().cloned());
286 args.push(run_script);
287 (mise_bin_str, args)
288 }
289 None => {
290 warn!("daemon {id}: mise=true but mise binary not found, running without mise");
291 let mut args: Vec<String> = shell_args.to_vec();
292 args.push(run_script);
293 (shell_program.clone(), args)
294 }
295 }
296 } else {
297 let mut args: Vec<String> = shell_args.to_vec();
298 args.push(run_script);
299 (shell_program.clone(), args)
300 };
301 #[cfg(unix)]
302 let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
303 Ok(identity) => identity,
304 Err(e) => {
305 return Ok(IpcResponse::DaemonFailed {
306 error: e.to_string(),
307 });
308 }
309 };
310 info!("run: spawning daemon {id} with {program} {args:?}");
311
312 #[cfg(unix)]
314 let pty_pair = if opts.pty.unwrap_or(false) {
315 match super::pty::openpty() {
316 Ok(pair) => {
317 info!("daemon {id}: allocated PTY (pty = true)");
318 Some(pair)
319 }
320 Err(e) => {
321 warn!("daemon {id}: failed to allocate PTY, falling back to pipes: {e}");
322 None
323 }
324 }
325 } else {
326 None
327 };
328
329 let mut cmd = tokio::process::Command::new(&program);
330
331 #[cfg(unix)]
332 if let Some(ref pair) = pty_pair {
333 let slave_file = std::fs::File::from(
337 pair.slave
338 .try_clone()
339 .map_err(|e| miette::miette!("failed to dup slave PTY fd: {e}"))?,
340 );
341 cmd.stdin(std::process::Stdio::from(slave_file.try_clone().map_err(
342 |e| miette::miette!("failed to clone slave PTY fd for stdin: {e}"),
343 )?));
344 cmd.stdout(std::process::Stdio::from(slave_file.try_clone().map_err(
345 |e| miette::miette!("failed to clone slave PTY fd for stdout: {e}"),
346 )?));
347 cmd.stderr(std::process::Stdio::from(slave_file));
348 } else {
349 cmd.stdout(std::process::Stdio::piped())
350 .stderr(std::process::Stdio::piped());
351 }
352
353 #[cfg(not(unix))]
354 {
355 cmd.stdout(std::process::Stdio::piped())
356 .stderr(std::process::Stdio::piped());
357 }
358
359 cmd.args(&args).current_dir(&opts.dir);
360
361 #[cfg(unix)]
362 if pty_pair.is_none() {
363 cmd.stdin(std::process::Stdio::null());
364 }
365
366 #[cfg(not(unix))]
367 cmd.stdin(std::process::Stdio::null());
368
369 if let Some(ref path) = *env::ORIGINAL_PATH {
371 cmd.env("PATH", path);
372 }
373
374 if let Some(ref env_vars) = opts.env {
376 cmd.envs(env_vars);
377 }
378
379 cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
381 cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
382 cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
383
384 if !resolved_ports.is_empty() {
386 cmd.env("PORT", resolved_ports[0].to_string());
390 for (i, port) in resolved_ports.iter().enumerate() {
392 cmd.env(format!("PORT{i}"), port.to_string());
393 }
394 }
395
396 inject_proxy_env(&mut cmd, &opts.slug);
398
399 #[cfg(unix)]
400 {
401 let run_identity = run_identity.clone();
402 let use_pty = pty_pair.is_some();
403 unsafe {
404 cmd.pre_exec(move || {
405 nix::unistd::setsid().map_err(nix_to_io_error)?;
406
407 if use_pty {
411 let ret = libc::ioctl(0, libc::TIOCSCTTY as libc::c_ulong, 0);
412 if ret < 0 {
413 #[cfg(target_os = "linux")]
416 eprintln!(
417 "pitchfork: TIOCSCTTY failed: {}",
418 std::io::Error::last_os_error()
419 );
420 }
421 }
422
423 apply_run_identity(&run_identity)?;
424 Ok(())
425 });
426 }
427 }
428
429 let mut child = cmd.spawn().into_diagnostic()?;
430 let pid = match child.id() {
431 Some(p) => p,
432 None => {
433 warn!("Daemon {id} exited before PID could be captured");
434 return Ok(IpcResponse::DaemonFailed {
435 error: "Process exited immediately".to_string(),
436 });
437 }
438 };
439 info!("started daemon {id} with pid {pid}");
440 PROCS.refresh_pids(&[pid]);
441 let daemon = self
442 .upsert_daemon(
443 UpsertDaemonOpts::builder(id.clone())
444 .set(|o| {
445 o.pid = Some(pid);
446 o.status = DaemonStatus::Running;
447 o.shell_pid = opts.shell_pid;
448 o.dir = Some(opts.dir.0.clone());
449 o.cmd = Some(original_cmd);
450 o.run = opts.run.clone();
451 o.autostop = opts.autostop;
452 o.cron_schedule = opts.cron_schedule.clone();
453 o.cron_retrigger = opts.cron_retrigger;
454 o.cron_immediate = opts.cron_immediate;
455 o.retry = Some(opts.retry);
456 o.retry_count = Some(opts.retry_count);
457 o.ready_delay = opts.ready_delay;
458 o.ready_output = opts.ready_output.clone();
459 o.ready_http = opts.ready_http.clone();
460 o.ready_port = effective_ready_port;
461 o.ready_cmd = opts.ready_cmd.clone();
462 o.port = crate::config_types::PortConfig::from_parts(
463 expected_ports,
464 opts.port.as_ref().map(|p| p.bump).unwrap_or_default(),
465 );
466 o.resolved_port = resolved_ports;
467 o.depends = Some(opts.depends.clone());
468 o.env = opts.env.clone();
469 o.watch = Some(opts.watch.clone());
470 o.watch_mode = Some(opts.watch_mode);
471 o.watch_base_dir = opts.watch_base_dir.clone();
472 o.mise = opts.mise;
473 o.user = opts.user.clone();
474 o.memory_limit = opts.memory_limit;
475 o.cpu_limit = opts.cpu_limit;
476 o.stop_signal = opts.stop_signal;
477 o.pty = opts.pty;
478 })
479 .build(),
480 )
481 .await?;
482
483 let id_clone = id.clone();
484 let ready_delay = opts.ready_delay;
485 let ready_output = opts.ready_output.clone();
486 let ready_http = opts.ready_http.clone();
487 let ready_port = effective_ready_port;
488 let ready_cmd = opts.ready_cmd.clone();
489 let daemon_dir = opts.dir.0.clone();
490 let hook_retry_count = opts.retry_count;
491 let hook_retry = opts.retry;
492 let hook_daemon_env = opts.env.clone();
493 let on_output_hook = opts.on_output_hook.clone();
494 let has_port_config = opts.port.as_ref().is_some_and(|p| !p.expect.is_empty())
500 || (settings().proxy.enable && is_daemon_slug_target(id));
501 let daemon_pid = pid;
502
503 #[cfg(unix)]
507 let pty_reader = pty_pair.map(|p| {
508 tokio::io::BufReader::new(tokio::fs::File::from_std(std::fs::File::from(p.master)))
509 .lines()
510 });
511 #[cfg(not(unix))]
512 let pty_reader: Option<tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>> = None;
513 let stdout_reader = if pty_reader.is_none() {
514 child
515 .stdout
516 .take()
517 .map(|s| tokio::io::BufReader::new(s).lines())
518 } else {
519 None
520 };
521 let stderr_reader = if pty_reader.is_none() {
522 child
523 .stderr
524 .take()
525 .map(|s| tokio::io::BufReader::new(s).lines())
526 } else {
527 None
528 };
529
530 if pty_reader.is_none() && (stdout_reader.is_none() || stderr_reader.is_none()) {
531 error!("Failed to capture stdout/stderr for daemon {id}");
532 }
533
534 tokio::spawn(async move {
535 let id = id_clone;
536
537 let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<String>(256);
539
540 if let Some(mut reader) = pty_reader {
541 tokio::spawn(async move {
545 while let Ok(Some(mut line)) = reader.next_line().await {
546 if line.ends_with('\r') {
548 line.pop();
549 }
550 if output_tx.send(line).await.is_err() {
551 break;
552 }
553 }
554 });
555 } else {
556 if let Some(mut stdout) = stdout_reader {
561 let tx = output_tx.clone();
562 tokio::spawn(async move {
563 while let Ok(Some(line)) = stdout.next_line().await {
564 if tx.send(line).await.is_err() {
565 break;
566 }
567 }
568 });
569 }
570 if let Some(mut stderr) = stderr_reader {
571 let tx = output_tx.clone();
572 tokio::spawn(async move {
573 while let Ok(Some(line)) = stderr.next_line().await {
574 if tx.send(line).await.is_err() {
575 break;
576 }
577 }
578 });
579 }
580 drop(output_tx);
582 }
583 let log_store = &*LOG_STORE;
584 let format_line = |line: String| line;
585
586 let mut ready_notified = false;
590 let mut ready_tx = ready_tx;
591 let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
592 let mut active_port_spawned = false;
594
595 let on_output_hook = match on_output_hook {
599 Some(ref hook) => match hook.validate(id.name()) {
600 Ok(()) => on_output_hook,
601 Err(e) => {
602 error!("{e}");
603 None
604 }
605 },
606 None => None,
607 };
608
609 let on_output_pattern: Option<regex::Regex> = on_output_hook
612 .as_ref()
613 .and_then(|h| h.regex.as_deref().and_then(get_or_compile_regex));
614 let on_output_debounce = on_output_hook
615 .as_ref()
616 .map(|h| h.debounce_duration())
617 .unwrap_or(Duration::from_millis(1000));
618 let mut on_output_last_fired: Option<std::time::Instant> = None;
620
621 let mut delay_timer =
622 ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
623
624 let s = settings();
626 let ready_check_interval = s.supervisor_ready_check_interval();
627 let http_client_timeout = s.supervisor_http_client_timeout();
628
629 let mut http_check_interval = ready_http
631 .as_ref()
632 .map(|_| tokio::time::interval(ready_check_interval));
633 let http_client = ready_http.as_ref().map(|_| {
634 reqwest::Client::builder()
635 .timeout(http_client_timeout)
636 .build()
637 .unwrap_or_default()
638 });
639
640 let mut port_check_interval =
642 ready_port.map(|_| tokio::time::interval(ready_check_interval));
643
644 let mut cmd_check_interval = ready_cmd
646 .as_ref()
647 .map(|_| tokio::time::interval(ready_check_interval));
648
649 let (exit_tx, mut exit_rx) =
651 tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
652
653 let child_pid = child.id().unwrap_or(0);
655 tokio::spawn(async move {
656 let result = child.wait().await;
657 #[cfg(all(unix, not(target_os = "linux")))]
667 let result = match &result {
668 Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
669 if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
670 warn!(
671 "daemon pid {child_pid} wait() got ECHILD; \
672 recovered exit code {code} from zombie reaper"
673 );
674 use std::os::unix::process::ExitStatusExt;
679 if code >= 0 {
680 Ok(std::process::ExitStatus::from_raw(code << 8))
681 } else {
682 Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
684 }
685 } else {
686 warn!(
687 "daemon pid {child_pid} wait() got ECHILD but no \
688 stashed status found; reporting as error"
689 );
690 result
691 }
692 }
693 _ => result,
694 };
695 debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
696 let _ = exit_tx.send(result).await;
697 });
698
699 #[allow(unused_assignments)]
700 let mut exit_status = None;
702
703 if has_port_config
709 && ready_pattern.is_none()
710 && ready_http.is_none()
711 && ready_port.is_none()
712 && ready_cmd.is_none()
713 && delay_timer.is_none()
714 {
715 active_port_spawned = true;
716 detect_and_store_active_port(id.clone(), daemon_pid);
717 }
718
719 loop {
720 select! {
721 Some(line) = output_rx.recv() => {
722 let formatted = format_line(line.clone());
723 if let Err(e) = log_store.append(&id, &formatted) {
724 error!("Failed to write to log for daemon {id}: {e}");
725 }
726 trace!("output: {id} {formatted}");
727
728 let line_clean = console::strip_ansi_codes(&line).to_string();
731
732 if !ready_notified
734 && let Some(ref pattern) = ready_pattern
735 && pattern.is_match(&line_clean)
736 {
737 info!("daemon {id} ready: output matched pattern");
738 ready_notified = true;
739 if let Some(tx) = ready_tx.take() {
740 let _ = tx.send(Ok(()));
741 }
742 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
743 if !active_port_spawned && has_port_config {
744 active_port_spawned = true;
745 detect_and_store_active_port(id.clone(), daemon_pid);
746 }
747 }
748
749 if let Some(ref hook) = on_output_hook {
751 let matched = match (&hook.filter, &on_output_pattern) {
752 (Some(substr), _) => line_clean.contains(substr.as_str()),
753 (None, Some(re)) => re.is_match(&line_clean),
754 (None, None) => true,
755 };
756 if matched {
757 let now = std::time::Instant::now();
758 let elapsed = on_output_last_fired.map(|t| now.duration_since(t));
759 if elapsed.is_none_or(|e| e >= on_output_debounce) {
760 on_output_last_fired = Some(now);
761 hooks::fire_output_hook(id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), hook.run.clone(), line_clean.clone()).await;
762 }
763 }
764 }
765 }
766 Some(result) = exit_rx.recv() => {
767 exit_status = Some(result);
769 debug!("daemon {id} process exited, exit_status: {exit_status:?}");
770 if !ready_notified {
771 if let Some(tx) = ready_tx.take() {
772 let is_success = exit_status.as_ref()
774 .and_then(|r| r.as_ref().ok())
775 .map(|s| s.success())
776 .unwrap_or(false);
777
778 if is_success {
779 debug!("daemon {id} exited successfully before ready check, sending success notification");
780 let _ = tx.send(Ok(()));
781 } else {
782 let exit_code = exit_status.as_ref()
783 .and_then(|r| r.as_ref().ok())
784 .and_then(|s| s.code());
785 debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
786 let _ = tx.send(Err(exit_code));
787 }
788 }
789 } else {
790 debug!("daemon {id} was already marked ready, not sending notification");
791 }
792 break;
793 },
794 _ = async {
795 if let Some(ref mut interval) = http_check_interval {
796 interval.tick().await;
797 } else {
798 std::future::pending::<()>().await;
799 }
800 }, if !ready_notified && ready_http.is_some() => {
801 if let (Some(http), Some(client)) = (&ready_http, &http_client) {
802 match client.get(&http.url).send().await {
803 Ok(response) if http.accepts_status(response.status().as_u16()) => {
804 info!("daemon {id} ready: HTTP check passed (status {})", response.status());
805 ready_notified = true;
806 if let Some(tx) = ready_tx.take() {
807 let _ = tx.send(Ok(()));
808 }
809 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
810 http_check_interval = None;
811 if !active_port_spawned && has_port_config {
812 active_port_spawned = true;
813 detect_and_store_active_port(id.clone(), daemon_pid);
814 }
815 }
816 Ok(response) => {
817 trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
818 }
819 Err(e) => {
820 trace!("daemon {id} HTTP check failed: {e}");
821 }
822 }
823 }
824 }
825 _ = async {
826 if let Some(ref mut interval) = port_check_interval {
827 interval.tick().await;
828 } else {
829 std::future::pending::<()>().await;
830 }
831 }, if !ready_notified && ready_port.is_some() => {
832 if let Some(port) = ready_port {
833 match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
834 Ok(_) => {
835 info!("daemon {id} ready: TCP port {port} is listening");
836 ready_notified = true;
837 if let Some(tx) = ready_tx.take() {
838 let _ = tx.send(Ok(()));
839 }
840 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
841 port_check_interval = None;
843 if !active_port_spawned && has_port_config {
844 active_port_spawned = true;
845 detect_and_store_active_port(id.clone(), daemon_pid);
846 }
847 }
848 Err(_) => {
849 trace!("daemon {id} port check: port {port} not listening yet");
850 }
851 }
852 }
853 }
854 _ = async {
855 if let Some(ref mut interval) = cmd_check_interval {
856 interval.tick().await;
857 } else {
858 std::future::pending::<()>().await;
859 }
860 }, if !ready_notified && ready_cmd.is_some() => {
861 if let Some(ref cmd) = ready_cmd {
862 let mut command = Shell::default_for_platform().command(cmd);
864 command
865 .current_dir(&daemon_dir)
866 .stdout(std::process::Stdio::null())
867 .stderr(std::process::Stdio::null());
868 let result: std::io::Result<std::process::ExitStatus> = command.status().await;
869 match result {
870 Ok(status) if status.success() => {
871 info!("daemon {id} ready: readiness command succeeded");
872 ready_notified = true;
873 if let Some(tx) = ready_tx.take() {
874 let _ = tx.send(Ok(()));
875 }
876 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
877 cmd_check_interval = None;
879 if !active_port_spawned && has_port_config {
880 active_port_spawned = true;
881 detect_and_store_active_port(id.clone(), daemon_pid);
882 }
883 }
884 Ok(_) => {
885 trace!("daemon {id} cmd check: command returned non-zero (not ready)");
886 }
887 Err(e) => {
888 trace!("daemon {id} cmd check failed: {e}");
889 }
890 }
891 }
892 }
893 _ = async {
894 if let Some(ref mut timer) = delay_timer {
895 timer.await;
896 } else {
897 std::future::pending::<()>().await;
898 }
899 } => {
900 if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
901 info!("daemon {id} ready: delay elapsed");
902 ready_notified = true;
903 if let Some(tx) = ready_tx.take() {
904 let _ = tx.send(Ok(()));
905 }
906 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
907 }
908 delay_timer = None;
910 if !active_port_spawned && has_port_config {
911 active_port_spawned = true;
912 detect_and_store_active_port(id.clone(), daemon_pid);
913 }
914 }
915 }
916 }
917
918 {
920 let mut state_file = SUPERVISOR.state_file.lock().await;
921 state_file.clear_active_port(&id);
922 }
923
924 let exit_status = if let Some(status) = exit_status {
926 status
927 } else {
928 match exit_rx.recv().await {
930 Some(status) => status,
931 None => {
932 warn!("daemon {id} exit channel closed without receiving status");
933 Err(std::io::Error::other("exit channel closed"))
934 }
935 }
936 };
937 let current_daemon = SUPERVISOR.get_daemon(&id).await;
938
939 SUPERVISOR
944 .active_monitors
945 .fetch_add(1, atomic::Ordering::Release);
946 struct MonitorGuard;
947 impl Drop for MonitorGuard {
948 fn drop(&mut self) {
949 SUPERVISOR
950 .active_monitors
951 .fetch_sub(1, atomic::Ordering::Release);
952 SUPERVISOR.monitor_done.notify_waiters();
953 }
954 }
955 let _monitor_guard = MonitorGuard;
956 if current_daemon.is_none()
961 || current_daemon.as_ref().is_some_and(|d| {
962 d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
963 })
964 {
965 return;
967 }
968 let already_stopped = current_daemon
973 .as_ref()
974 .is_some_and(|d| d.status.is_stopped());
975 let is_stopping = already_stopped
976 || current_daemon
977 .as_ref()
978 .is_some_and(|d| d.status.is_stopping());
979
980 let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
982 (Ok(status), true) => {
983 (status.code().unwrap_or(-1), "stop")
987 }
988 (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
989 (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
990 (Err(_), true) => {
991 (-1, "stop")
993 }
994 (Err(_), false) => (-1, "fail"),
995 };
996
997 if !already_stopped {
999 if let Ok(status) = &exit_status {
1000 info!("daemon {id} exited with status {status}");
1001 }
1002 let (new_status, last_exit_success) = match exit_reason {
1003 "stop" | "exit" => (
1004 DaemonStatus::Stopped,
1005 exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
1006 ),
1007 _ => (DaemonStatus::Errored(exit_code), false),
1008 };
1009 if let Err(e) = SUPERVISOR
1010 .upsert_daemon(
1011 UpsertDaemonOpts::builder(id.clone())
1012 .set(|o| {
1013 o.pid = None;
1014 o.status = new_status;
1015 o.last_exit_success = Some(last_exit_success);
1016 })
1017 .build(),
1018 )
1019 .await
1020 {
1021 error!("Failed to update daemon state for {id}: {e}");
1022 }
1023 }
1024
1025 let hook_extra_env = vec![
1027 ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
1028 ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
1029 ];
1030
1031 let hooks_to_fire: Vec<HookType> = match exit_reason {
1033 "stop" => vec![HookType::OnStop, HookType::OnExit],
1034 "exit" => vec![HookType::OnExit],
1035 _ if hook_retry_count >= hook_retry.count() => {
1037 vec![HookType::OnFail, HookType::OnExit]
1038 }
1039 _ => vec![],
1040 };
1041
1042 for hook_type in hooks_to_fire {
1043 fire_hook(
1044 hook_type,
1045 id.clone(),
1046 daemon_dir.clone(),
1047 hook_retry_count,
1048 hook_daemon_env.clone(),
1049 hook_extra_env.clone(),
1050 )
1051 .await;
1052 }
1053 });
1054
1055 if let Some(ready_rx) = ready_rx {
1057 match ready_rx.await {
1058 Ok(Ok(())) => {
1059 info!("daemon {id} is ready");
1060 Ok(IpcResponse::DaemonReady { daemon })
1061 }
1062 Ok(Err(exit_code)) => {
1063 error!("daemon {id} failed before becoming ready");
1064 Ok(IpcResponse::DaemonFailedWithCode { exit_code })
1065 }
1066 Err(_) => {
1067 error!("readiness channel closed unexpectedly for daemon {id}");
1068 Ok(IpcResponse::DaemonStart { daemon })
1069 }
1070 }
1071 } else {
1072 Ok(IpcResponse::DaemonStart { daemon })
1073 }
1074 }
1075
1076 pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
1078 let pitchfork_id = DaemonId::pitchfork();
1079 if *id == pitchfork_id {
1080 return Ok(IpcResponse::Error(
1081 "Cannot stop supervisor via stop command".into(),
1082 ));
1083 }
1084 info!("stopping daemon: {id}");
1085 if let Some(daemon) = self.get_daemon(id).await {
1086 trace!("daemon to stop: {daemon}");
1087 if let Some(pid) = daemon.pid {
1088 trace!("killing pid: {pid}");
1089 PROCS.refresh_pids(&[pid]);
1090 if PROCS.is_running(pid) {
1091 self.upsert_daemon(
1093 UpsertDaemonOpts::builder(id.clone())
1094 .set(|o| {
1095 o.pid = Some(pid);
1096 o.status = DaemonStatus::Stopping;
1097 })
1098 .build(),
1099 )
1100 .await?;
1101
1102 let stop_cfg = daemon.stop_signal.unwrap_or_default();
1105 let stop_signal: i32 = stop_cfg.signal.into();
1106 if let Err(e) = PROCS
1107 .kill_process_group_async(pid, stop_signal, stop_cfg.timeout)
1108 .await
1109 {
1110 debug!("failed to kill pid {pid}: {e}");
1111 PROCS.refresh_processes();
1113 if PROCS.is_running(pid) {
1114 debug!("failed to stop pid {pid}: process still running after kill");
1116 self.upsert_daemon(
1117 UpsertDaemonOpts::builder(id.clone())
1118 .set(|o| {
1119 o.pid = Some(pid); o.status = DaemonStatus::Running;
1121 })
1122 .build(),
1123 )
1124 .await?;
1125 return Ok(IpcResponse::DaemonStopFailed {
1126 error: format!(
1127 "process {pid} still running after kill attempt: {e}"
1128 ),
1129 });
1130 }
1131 }
1132
1133 self.upsert_daemon(
1138 UpsertDaemonOpts::builder(id.clone())
1139 .set(|o| {
1140 o.pid = None;
1141 o.status = DaemonStatus::Stopped;
1142 o.last_exit_success = Some(true); })
1144 .build(),
1145 )
1146 .await?;
1147 } else {
1148 debug!("pid {pid} not running, process may have exited unexpectedly");
1149 self.upsert_daemon(
1152 UpsertDaemonOpts::builder(id.clone())
1153 .set(|o| {
1154 o.pid = None;
1155 o.status = DaemonStatus::Stopped;
1156 })
1157 .build(),
1158 )
1159 .await?;
1160 return Ok(IpcResponse::DaemonWasNotRunning);
1161 }
1162 Ok(IpcResponse::Ok)
1163 } else {
1164 debug!("daemon {id} not running");
1165 Ok(IpcResponse::DaemonNotRunning)
1166 }
1167 } else {
1168 debug!("daemon {id} not found");
1169 Ok(IpcResponse::DaemonNotFound)
1170 }
1171 }
1172}
1173
1174#[cfg(unix)]
1175fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1176 let s = settings();
1177 let settings_user = s.supervisor.user.trim();
1178 let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1179 let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1180 let configured = daemon_user.or(settings_user);
1181 let current_uid = nix::unistd::Uid::effective().as_raw();
1182 let current_gid = nix::unistd::Gid::effective().as_raw();
1183 resolve_run_identity(
1184 configured,
1185 current_uid,
1186 current_gid,
1187 std::env::var("SUDO_UID").ok().as_deref(),
1188 std::env::var("SUDO_GID").ok().as_deref(),
1189 )
1190}
1191
1192#[cfg(unix)]
1193fn resolve_run_identity(
1194 configured: Option<&str>,
1195 current_uid: u32,
1196 current_gid: u32,
1197 sudo_uid: Option<&str>,
1198 sudo_gid: Option<&str>,
1199) -> Result<RunIdentity> {
1200 let current_uid = nix::unistd::Uid::from_raw(current_uid);
1201 let current_gid = nix::unistd::Gid::from_raw(current_gid);
1202 if let Some(user) = configured {
1203 let identity = resolve_configured_user(user)?;
1204 ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1205 if identity.matches(current_uid, current_gid) {
1206 return Ok(RunIdentity::Inherit);
1207 }
1208 return Ok(identity);
1209 }
1210
1211 if current_uid.is_root()
1212 && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1213 {
1214 return Ok(identity);
1215 }
1216
1217 Ok(RunIdentity::Inherit)
1218}
1219
1220#[cfg(unix)]
1221fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1222 if user.chars().all(|c| c.is_ascii_digit()) {
1223 let uid = user
1224 .parse::<u32>()
1225 .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1226 let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1227 .into_diagnostic()?
1228 .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1229 return run_identity_from_user_record(user_record);
1230 }
1231
1232 let user_record = nix::unistd::User::from_name(user)
1233 .into_diagnostic()?
1234 .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1235 run_identity_from_user_record(user_record)
1236}
1237
1238#[cfg(unix)]
1239fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1240 let username = CString::new(user.name)
1241 .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1242 Ok(RunIdentity::Switch {
1243 uid: user.uid,
1244 gid: user.gid,
1245 username: Some(username),
1246 })
1247}
1248
1249#[cfg(unix)]
1250fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1251 RunIdentity::Switch {
1252 uid: nix::unistd::Uid::from_raw(uid),
1253 gid: nix::unistd::Gid::from_raw(gid),
1254 username,
1255 }
1256}
1257
1258#[cfg(unix)]
1259fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1260 let uid = sudo_uid?.parse::<u32>().ok()?;
1261 let gid = sudo_gid?.parse::<u32>().ok()?;
1262 let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1263 .ok()
1264 .flatten()
1265 .and_then(|u| CString::new(u.name).ok());
1266 Some(run_identity_from_raw_ids(uid, gid, username))
1267}
1268
1269#[cfg(unix)]
1270fn ensure_can_use_identity(
1271 configured_user: &str,
1272 identity: &RunIdentity,
1273 current_uid: nix::unistd::Uid,
1274 current_gid: nix::unistd::Gid,
1275) -> Result<()> {
1276 let RunIdentity::Switch { uid, gid, .. } = identity else {
1277 return Ok(());
1278 };
1279 if *uid == current_uid && *gid == current_gid {
1280 return Ok(());
1281 }
1282 if current_uid.is_root() {
1283 return Ok(());
1284 }
1285 Err(miette::miette!(
1286 "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1287 configured_user,
1288 current_uid.as_raw(),
1289 current_gid.as_raw(),
1290 uid.as_raw(),
1291 gid.as_raw()
1292 ))
1293}
1294
1295#[cfg(unix)]
1296fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1297 let RunIdentity::Switch { uid, gid, username } = identity else {
1298 return Ok(());
1299 };
1300 if let Some(username) = username {
1301 initgroups_for_user(username, *gid)?;
1302 } else {
1303 setgroups_to_primary(*gid)?;
1304 }
1305 nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1306 nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1307 Ok(())
1308}
1309
1310#[cfg(unix)]
1311impl RunIdentity {
1312 fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1313 matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1314 }
1315}
1316
1317#[cfg(unix)]
1318fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1319 let groups = [gid.as_raw() as libc::gid_t];
1320 #[cfg(any(target_os = "linux", target_os = "android"))]
1321 let group_count = groups.len();
1322 #[cfg(not(any(target_os = "linux", target_os = "android")))]
1323 let group_count = groups.len() as libc::c_int;
1324 let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1325 if rc == -1 {
1326 Err(std::io::Error::last_os_error())
1327 } else {
1328 Ok(())
1329 }
1330}
1331
1332#[cfg(unix)]
1333fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1334 let gid = gid.as_raw();
1335 #[cfg(any(
1336 target_os = "macos",
1337 target_os = "ios",
1338 target_os = "tvos",
1339 target_os = "watchos"
1340 ))]
1341 let base_gid = i32::try_from(gid)
1342 .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1343
1344 #[cfg(not(any(
1345 target_os = "macos",
1346 target_os = "ios",
1347 target_os = "tvos",
1348 target_os = "watchos"
1349 )))]
1350 let base_gid = gid as libc::gid_t;
1351
1352 let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1355 if rc == -1 {
1356 Err(std::io::Error::last_os_error())
1357 } else {
1358 Ok(())
1359 }
1360}
1361
1362#[cfg(unix)]
1363fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1364 std::io::Error::from_raw_os_error(err as i32)
1365}
1366
1367async fn check_ports_available(
1374 expected_ports: &[u16],
1375 auto_bump: bool,
1376 max_attempts: u32,
1377) -> Result<Vec<u16>> {
1378 if expected_ports.is_empty() {
1379 return Ok(Vec::new());
1380 }
1381
1382 for bump_offset in 0..=max_attempts {
1383 let candidate_ports: Vec<u16> = expected_ports
1385 .iter()
1386 .map(|&p| p.wrapping_add(bump_offset as u16))
1387 .collect();
1388
1389 let mut all_available = true;
1391 let mut conflicting_port = None;
1392
1393 for &port in &candidate_ports {
1394 if port == 0 {
1397 continue;
1398 }
1399
1400 if is_port_in_use(port).await {
1414 all_available = false;
1415 conflicting_port = Some(port);
1416 break;
1417 }
1418 }
1419
1420 if all_available {
1421 if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1425 return Err(PortError::NoAvailablePort {
1426 start_port: expected_ports[0],
1427 attempts: bump_offset + 1,
1428 }
1429 .into());
1430 }
1431 if bump_offset > 0 {
1432 info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1433 }
1434 return Ok(candidate_ports);
1435 }
1436
1437 if bump_offset == 0 && !auto_bump {
1439 if let Some(port) = conflicting_port {
1440 let (pid, process) = identify_port_owner(port).await;
1441 return Err(PortError::InUse { port, process, pid }.into());
1442 }
1443 }
1444 }
1445
1446 Err(PortError::NoAvailablePort {
1448 start_port: expected_ports[0],
1449 attempts: max_attempts + 1,
1450 }
1451 .into())
1452}
1453
1454async fn is_port_in_use(port: u16) -> bool {
1460 tokio::task::spawn_blocking(move || {
1461 for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1462 match std::net::TcpListener::bind((addr, port)) {
1463 Ok(listener) => drop(listener),
1464 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1465 Err(_) => continue,
1466 }
1467 }
1468 false
1469 })
1470 .await
1471 .unwrap_or(false)
1472}
1473
1474async fn identify_port_owner(port: u16) -> (u32, String) {
1479 tokio::task::spawn_blocking(move || {
1480 listeners::get_all()
1481 .ok()
1482 .and_then(|list| {
1483 list.into_iter()
1484 .find(|l| l.socket.port() == port)
1485 .map(|l| (l.process.pid, l.process.name))
1486 })
1487 .unwrap_or((0, "unknown".to_string()))
1488 })
1489 .await
1490 .unwrap_or((0, "unknown".to_string()))
1491}
1492
1493async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1498 if !is_port_in_use(port).await {
1499 return None;
1500 }
1501 Some(identify_port_owner(port).await)
1502}
1503
1504fn detect_and_store_active_port(id: DaemonId, pid: u32) {
1519 tokio::spawn(async move {
1520 for delay_ms in [500u64, 1000, 2000, 4000] {
1524 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1525
1526 let expected_port: Option<u16> = {
1529 let state_file = SUPERVISOR.state_file.lock().await;
1530 match state_file.daemons.get(&id) {
1531 Some(d) if d.pid.is_none() => {
1532 debug!("daemon {id}: aborting active_port detection — process exited");
1533 return;
1534 }
1535 Some(d) => d
1536 .port
1537 .as_ref()
1538 .and_then(|p| p.expect.first().copied())
1539 .filter(|&p| p > 0),
1540 None => None,
1541 }
1542 };
1543
1544 let active_port = tokio::task::spawn_blocking(move || {
1545 let listeners = listeners::get_all().ok()?;
1546
1547 PROCS.refresh_processes();
1549
1550 let descendant_pids: std::collections::HashSet<u32> = PROCS
1551 .all_children(pid)
1552 .into_iter()
1553 .chain(std::iter::once(pid))
1554 .collect();
1555
1556 let process_ports: Vec<u16> = listeners
1557 .into_iter()
1558 .filter(|listener| descendant_pids.contains(&listener.process.pid))
1559 .map(|listener| listener.socket.port())
1560 .filter(|&port| port > 0)
1561 .collect();
1562
1563 if process_ports.is_empty() {
1564 return None;
1565 }
1566
1567 if let Some(ep) = expected_port {
1570 if process_ports.contains(&ep) {
1571 return Some(ep);
1572 }
1573 }
1574
1575 process_ports.into_iter().next()
1581 })
1582 .await
1583 .ok()
1584 .flatten();
1585
1586 if let Some(port) = active_port {
1587 debug!("daemon {id} active_port detected: {port}");
1588 let mut state_file = SUPERVISOR.state_file.lock().await;
1589 if let Some(d) = state_file.daemons.get(&id) {
1590 if d.pid == Some(pid) {
1594 state_file.set_active_port(&id, port);
1595 } else {
1596 debug!(
1597 "daemon {id}: skipping active_port write — PID mismatch \
1598 (expected {pid}, current {:?})",
1599 d.pid
1600 );
1601 return;
1602 }
1603 }
1604 return;
1605 }
1606
1607 debug!(
1608 "daemon {id}: no active port detected for pid {pid} or its descendants (will retry)"
1609 );
1610 }
1611
1612 debug!(
1613 "daemon {id}: active port detection exhausted all retries for pid {pid} and its descendants"
1614 );
1615 });
1616}
1617
1618fn is_daemon_slug_target(id: &DaemonId) -> bool {
1626 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1630 slugs.iter().any(|(slug, entry)| {
1631 let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1632 id.name() == daemon_name
1633 })
1634}
1635
1636#[cfg(all(test, unix))]
1637mod tests {
1638 use super::*;
1639
1640 #[test]
1641 fn test_resolve_run_identity_empty_without_sudo() {
1642 let identity = resolve_run_identity(None, 501, 20, None, None).unwrap();
1643 assert_eq!(identity, RunIdentity::Inherit);
1644 }
1645
1646 #[test]
1647 fn test_resolve_run_identity_sudo_fallback() {
1648 let identity = resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap();
1649 let RunIdentity::Switch { uid, gid, .. } = identity else {
1650 panic!("expected identity switch");
1651 };
1652 assert_eq!(uid.as_raw(), 501);
1653 assert_eq!(gid.as_raw(), 20);
1654 }
1655
1656 #[test]
1657 fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
1658 let identity = resolve_run_identity(None, 501, 20, Some("0"), Some("0")).unwrap();
1659 assert_eq!(identity, RunIdentity::Inherit);
1660 }
1661
1662 #[test]
1663 fn test_resolve_configured_user_root_name() {
1664 let identity = resolve_configured_user("root").unwrap();
1665 let RunIdentity::Switch { uid, username, .. } = identity else {
1666 panic!("expected identity switch");
1667 };
1668 assert_eq!(uid.as_raw(), 0);
1669 assert_eq!(
1670 username.as_deref().and_then(|s| s.to_str().ok()),
1671 Some("root")
1672 );
1673 }
1674
1675 #[test]
1676 fn test_resolve_configured_user_root_uid() {
1677 let identity = resolve_configured_user("0").unwrap();
1678 let RunIdentity::Switch { uid, username, .. } = identity else {
1679 panic!("expected identity switch");
1680 };
1681 assert_eq!(uid.as_raw(), 0);
1682 assert_eq!(
1683 username.as_deref().and_then(|s| s.to_str().ok()),
1684 Some("root")
1685 );
1686 }
1687
1688 #[test]
1689 fn test_resolve_configured_user_missing_user_fails() {
1690 let err = resolve_configured_user("pitchfork-user-that-should-not-exist")
1691 .unwrap_err()
1692 .to_string();
1693 assert!(err.contains("does not exist"));
1694 }
1695
1696 #[test]
1697 fn test_resolve_run_identity_requires_root_for_user_switch() {
1698 let err = resolve_run_identity(Some("root"), 501, 20, None, None)
1699 .unwrap_err()
1700 .to_string();
1701 assert!(err.contains("Restart the supervisor with sudo"));
1702 }
1703
1704 #[test]
1705 fn test_resolve_run_identity_same_user_is_noop() {
1706 let identity = resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20")).unwrap();
1707 assert_eq!(identity, RunIdentity::Inherit);
1708 }
1709}
1710
1711fn inject_proxy_env(cmd: &mut tokio::process::Command, slug: &Option<String>) {
1720 let s = crate::settings::settings();
1721 let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1722
1723 if should_force_loopback_host(slug) && !lan_enabled {
1724 cmd.env("HOST", "127.0.0.1");
1727 }
1728
1729 if let Some(url) = build_pitchfork_url(slug, &s) {
1731 cmd.env("PITCHFORK_URL", &url);
1732 }
1733
1734 if s.proxy.enable && s.proxy.https {
1736 let ca_path = if s.proxy.tls_cert.is_empty() {
1737 crate::env::PITCHFORK_STATE_DIR.join("proxy").join("ca.pem")
1738 } else {
1739 std::path::PathBuf::from(&s.proxy.tls_cert)
1740 };
1741 if ca_path.exists() {
1742 cmd.env("NODE_EXTRA_CA_CERTS", ca_path.to_string_lossy().to_string());
1743 }
1744 }
1745
1746 if s.proxy.enable {
1748 let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1749 cmd.env("__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS", format!(".{tld}"));
1750 }
1751
1752 if lan_enabled {
1754 cmd.env("PITCHFORK_LAN", "1");
1755 }
1756}
1757
1758fn should_force_loopback_host(slug: &Option<String>) -> bool {
1759 let Some(slug) = slug.as_deref() else {
1760 return false;
1761 };
1762
1763 let s = crate::settings::settings();
1764 if !s.proxy.enable {
1765 return false;
1766 }
1767
1768 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1769 slugs.contains_key(slug)
1770}
1771
1772fn build_pitchfork_url(slug: &Option<String>, s: &crate::settings::Settings) -> Option<String> {
1776 let slug = slug.as_ref()?;
1777 if !s.proxy.enable {
1778 return None;
1779 }
1780 let scheme = if s.proxy.https { "https" } else { "http" };
1781 let port = u16::try_from(s.proxy.port).ok().filter(|&p| p > 0)?;
1782 let port_suffix = if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
1783 String::new()
1784 } else {
1785 format!(":{port}")
1786 };
1787 let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1788 let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1789 Some(format!("{scheme}://{slug}.{tld}{port_suffix}",))
1790}