1use super::hooks::{self, HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::IntoDiagnostic;
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22#[cfg(unix)]
23use std::ffi::CString;
24use std::iter::once;
25use std::sync::atomic;
26use std::time::Duration;
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
28use tokio::select;
29use tokio::sync::oneshot;
30use tokio::time;
31
32static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
34 Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
35
36#[cfg(unix)]
37#[derive(Clone, Debug, PartialEq, Eq)]
38enum RunIdentity {
39 Inherit,
40 Switch {
41 uid: nix::unistd::Uid,
42 gid: nix::unistd::Gid,
43 username: Option<CString>,
44 },
45}
46
47pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
49 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
50 if let Some(re) = cache.get(pattern) {
51 return Some(re.clone());
52 }
53 match Regex::new(pattern) {
54 Ok(re) => {
55 cache.insert(pattern.to_string(), re.clone());
56 Some(re)
57 }
58 Err(e) => {
59 error!("invalid regex pattern '{pattern}': {e}");
60 None
61 }
62 }
63}
64
65impl Supervisor {
66 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
68 let id = &opts.id;
69 let cmd = opts.cmd.clone();
70
71 {
73 let mut pending = self.pending_autostops.lock().await;
74 if pending.remove(id).is_some() {
75 info!("cleared pending autostop for {id} (daemon starting)");
76 }
77 }
78
79 let daemon = self.get_daemon(id).await;
80 if let Some(daemon) = daemon {
81 if !daemon.status.is_stopping()
84 && !daemon.status.is_stopped()
85 && let Some(pid) = daemon.pid
86 {
87 if opts.force {
88 self.stop(id).await?;
89 info!("run: stop completed for daemon {id}");
90 } else {
91 warn!("daemon {id} already running with pid {pid}");
92 return Ok(IpcResponse::DaemonAlreadyRunning);
93 }
94 }
95 }
96
97 if opts.wait_ready && opts.retry.count() > 0 {
99 let max_attempts = opts.retry.count().saturating_add(1);
101 for attempt in 0..max_attempts {
102 let mut retry_opts = opts.clone();
103 retry_opts.retry_count = attempt;
104 retry_opts.cmd = cmd.clone();
105
106 let result = self.run_once(retry_opts).await?;
107
108 match result {
109 IpcResponse::DaemonReady { daemon } => {
110 return Ok(IpcResponse::DaemonReady { daemon });
111 }
112 IpcResponse::DaemonFailedWithCode { exit_code } => {
113 if attempt < opts.retry.count() {
114 let backoff_secs = 2u64.saturating_pow(attempt).min(3600);
115 info!(
116 "daemon {id} failed (attempt {}/{}), retrying in {}s",
117 attempt + 1,
118 max_attempts,
119 backoff_secs
120 );
121 fire_hook(
122 HookType::OnRetry,
123 id.clone(),
124 opts.dir.0.clone(),
125 attempt + 1,
126 opts.env.clone(),
127 vec![],
128 )
129 .await;
130 time::sleep(Duration::from_secs(backoff_secs)).await;
131 continue;
132 } else {
133 info!("daemon {id} failed after {max_attempts} attempts");
134 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
135 }
136 }
137 other => return Ok(other),
138 }
139 }
140 }
141
142 self.run_once(opts).await
144 }
145
146 pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
148 let id = &opts.id;
149 let original_cmd = opts.cmd.clone(); let cmd = opts.cmd;
151
152 let (ready_tx, ready_rx) = if opts.wait_ready {
154 let (tx, rx) = oneshot::channel();
155 (Some(tx), Some(rx))
156 } else {
157 (None, None)
158 };
159
160 let expected_ports = opts
162 .port
163 .as_ref()
164 .map(|p| p.expect.clone())
165 .unwrap_or_default();
166 let (resolved_ports, effective_ready_port) = if !expected_ports.is_empty() {
167 let port_cfg = opts.port.as_ref().unwrap();
168 match check_ports_available(
169 &expected_ports,
170 port_cfg.auto_bump(),
171 port_cfg.max_bump_attempts(),
172 )
173 .await
174 {
175 Ok(resolved) => {
176 let ready_port = if let Some(configured_port) = opts.ready_port {
177 let bump_offset = resolved
179 .first()
180 .unwrap_or(&0)
181 .saturating_sub(*expected_ports.first().unwrap_or(&0));
182 if expected_ports.contains(&configured_port) && bump_offset > 0 {
183 configured_port
184 .checked_add(bump_offset)
185 .or(Some(configured_port))
186 } else {
187 Some(configured_port)
188 }
189 } else if opts.ready_output.is_none()
190 && opts.ready_http.is_none()
191 && opts.ready_cmd.is_none()
192 && opts.ready_delay.is_none()
193 {
194 resolved.first().copied().filter(|&p| p != 0)
198 } else {
199 None
203 };
204 info!("daemon {id}: ports {expected_ports:?} resolved to {resolved:?}");
205 (resolved, ready_port)
206 }
207 Err(e) => {
208 error!("daemon {id}: port check failed: {e}");
209 if let Some(port_error) = e.downcast_ref::<PortError>() {
211 match port_error {
212 PortError::InUse { port, process, pid } => {
213 return Ok(IpcResponse::PortConflict {
214 port: *port,
215 process: process.clone(),
216 pid: *pid,
217 });
218 }
219 PortError::NoAvailablePort {
220 start_port,
221 attempts,
222 } => {
223 return Ok(IpcResponse::NoAvailablePort {
224 start_port: *start_port,
225 attempts: *attempts,
226 });
227 }
228 }
229 }
230 return Ok(IpcResponse::DaemonFailed {
231 error: e.to_string(),
232 });
233 }
234 }
235 } else {
236 if let Some(port) = opts.ready_port {
242 if port > 0 {
243 if let Some((pid, process)) = detect_port_conflict(port).await {
244 return Ok(IpcResponse::PortConflict { port, process, pid });
245 }
246 }
247 }
248 (Vec::new(), opts.ready_port)
249 };
250
251 let cmd: Vec<String> = if opts.mise.unwrap_or(settings().general.mise) {
252 match settings().resolve_mise_bin() {
253 Some(mise_bin) => {
254 let mise_bin_str = mise_bin.to_string_lossy().to_string();
255 info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
256 once("exec".to_string())
257 .chain(once(mise_bin_str))
258 .chain(once("x".to_string()))
259 .chain(once("--".to_string()))
260 .chain(cmd)
261 .collect_vec()
262 }
263 None => {
264 warn!("daemon {id}: mise=true but mise binary not found, running without mise");
265 once("exec".to_string()).chain(cmd).collect_vec()
266 }
267 }
268 } else {
269 once("exec".to_string()).chain(cmd).collect_vec()
270 };
271 let args = vec!["-c".to_string(), shell_words::join(&cmd)];
272 let log_path = id.log_path();
273 if let Some(parent) = log_path.parent() {
274 xx::file::mkdirp(parent)?;
275 }
276 #[cfg(unix)]
277 let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
278 Ok(identity) => identity,
279 Err(e) => {
280 return Ok(IpcResponse::DaemonFailed {
281 error: e.to_string(),
282 });
283 }
284 };
285 info!("run: spawning daemon {id} with args: {args:?}");
286
287 #[cfg(unix)]
289 let pty_pair = if opts.pty.unwrap_or(false) {
290 match super::pty::openpty() {
291 Ok(pair) => {
292 info!("daemon {id}: allocated PTY (pty = true)");
293 Some(pair)
294 }
295 Err(e) => {
296 warn!("daemon {id}: failed to allocate PTY, falling back to pipes: {e}");
297 None
298 }
299 }
300 } else {
301 None
302 };
303
304 let mut cmd = tokio::process::Command::new("sh");
305
306 #[cfg(unix)]
307 if let Some(ref pair) = pty_pair {
308 let slave_file = std::fs::File::from(
312 pair.slave
313 .try_clone()
314 .map_err(|e| miette::miette!("failed to dup slave PTY fd: {e}"))?,
315 );
316 cmd.stdin(std::process::Stdio::from(slave_file.try_clone().map_err(
317 |e| miette::miette!("failed to clone slave PTY fd for stdin: {e}"),
318 )?));
319 cmd.stdout(std::process::Stdio::from(slave_file.try_clone().map_err(
320 |e| miette::miette!("failed to clone slave PTY fd for stdout: {e}"),
321 )?));
322 cmd.stderr(std::process::Stdio::from(slave_file));
323 } else {
324 cmd.stdout(std::process::Stdio::piped())
325 .stderr(std::process::Stdio::piped());
326 }
327
328 #[cfg(not(unix))]
329 {
330 cmd.stdout(std::process::Stdio::piped())
331 .stderr(std::process::Stdio::piped());
332 }
333
334 cmd.args(&args).current_dir(&opts.dir);
335
336 #[cfg(unix)]
337 if pty_pair.is_none() {
338 cmd.stdin(std::process::Stdio::null());
339 }
340
341 #[cfg(not(unix))]
342 cmd.stdin(std::process::Stdio::null());
343
344 if let Some(ref path) = *env::ORIGINAL_PATH {
346 cmd.env("PATH", path);
347 }
348
349 if let Some(ref env_vars) = opts.env {
351 cmd.envs(env_vars);
352 }
353
354 cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
356 cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
357 cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());
358
359 if !resolved_ports.is_empty() {
361 cmd.env("PORT", resolved_ports[0].to_string());
365 for (i, port) in resolved_ports.iter().enumerate() {
367 cmd.env(format!("PORT{i}"), port.to_string());
368 }
369 }
370
371 inject_proxy_env(&mut cmd, &opts.slug);
373
374 #[cfg(unix)]
375 {
376 let run_identity = run_identity.clone();
377 let use_pty = pty_pair.is_some();
378 unsafe {
379 cmd.pre_exec(move || {
380 nix::unistd::setsid().map_err(nix_to_io_error)?;
381
382 if use_pty {
386 let ret = libc::ioctl(0, libc::TIOCSCTTY as libc::c_ulong, 0);
387 if ret < 0 {
388 #[cfg(target_os = "linux")]
391 eprintln!(
392 "pitchfork: TIOCSCTTY failed: {}",
393 std::io::Error::last_os_error()
394 );
395 }
396 }
397
398 apply_run_identity(&run_identity)?;
399 Ok(())
400 });
401 }
402 }
403
404 let mut child = cmd.spawn().into_diagnostic()?;
405 let pid = match child.id() {
406 Some(p) => p,
407 None => {
408 warn!("Daemon {id} exited before PID could be captured");
409 return Ok(IpcResponse::DaemonFailed {
410 error: "Process exited immediately".to_string(),
411 });
412 }
413 };
414 info!("started daemon {id} with pid {pid}");
415 PROCS.refresh_pids(&[pid]);
416 let daemon = self
417 .upsert_daemon(
418 UpsertDaemonOpts::builder(id.clone())
419 .set(|o| {
420 o.pid = Some(pid);
421 o.status = DaemonStatus::Running;
422 o.shell_pid = opts.shell_pid;
423 o.dir = Some(opts.dir.0.clone());
424 o.cmd = Some(original_cmd);
425 o.autostop = opts.autostop;
426 o.cron_schedule = opts.cron_schedule.clone();
427 o.cron_retrigger = opts.cron_retrigger;
428 o.retry = Some(opts.retry);
429 o.retry_count = Some(opts.retry_count);
430 o.ready_delay = opts.ready_delay;
431 o.ready_output = opts.ready_output.clone();
432 o.ready_http = opts.ready_http.clone();
433 o.ready_port = effective_ready_port;
434 o.ready_cmd = opts.ready_cmd.clone();
435 o.port = crate::config_types::PortConfig::from_parts(
436 expected_ports,
437 opts.port.as_ref().map(|p| p.bump).unwrap_or_default(),
438 );
439 o.resolved_port = resolved_ports;
440 o.depends = Some(opts.depends.clone());
441 o.env = opts.env.clone();
442 o.watch = Some(opts.watch.clone());
443 o.watch_mode = Some(opts.watch_mode);
444 o.watch_base_dir = opts.watch_base_dir.clone();
445 o.mise = opts.mise;
446 o.user = opts.user.clone();
447 o.memory_limit = opts.memory_limit;
448 o.cpu_limit = opts.cpu_limit;
449 o.stop_signal = opts.stop_signal;
450 o.pty = opts.pty;
451 })
452 .build(),
453 )
454 .await?;
455
456 let id_clone = id.clone();
457 let ready_delay = opts.ready_delay;
458 let ready_output = opts.ready_output.clone();
459 let ready_http = opts.ready_http.clone();
460 let ready_port = effective_ready_port;
461 let ready_cmd = opts.ready_cmd.clone();
462 let daemon_dir = opts.dir.0.clone();
463 let hook_retry_count = opts.retry_count;
464 let hook_retry = opts.retry;
465 let hook_daemon_env = opts.env.clone();
466 let on_output_hook = opts.on_output_hook.clone();
467 let has_port_config = opts.port.as_ref().is_some_and(|p| !p.expect.is_empty())
473 || (settings().proxy.enable && is_daemon_slug_target(id));
474 let daemon_pid = pid;
475
476 #[cfg(unix)]
480 let pty_reader = pty_pair.map(|p| {
481 tokio::io::BufReader::new(tokio::fs::File::from_std(std::fs::File::from(p.master)))
482 .lines()
483 });
484 #[cfg(not(unix))]
485 let pty_reader: Option<tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>> = None;
486 let stdout_reader = if pty_reader.is_none() {
487 child
488 .stdout
489 .take()
490 .map(|s| tokio::io::BufReader::new(s).lines())
491 } else {
492 None
493 };
494 let stderr_reader = if pty_reader.is_none() {
495 child
496 .stderr
497 .take()
498 .map(|s| tokio::io::BufReader::new(s).lines())
499 } else {
500 None
501 };
502
503 if pty_reader.is_none() && (stdout_reader.is_none() || stderr_reader.is_none()) {
504 error!("Failed to capture stdout/stderr for daemon {id}");
505 }
506
507 tokio::spawn(async move {
508 let id = id_clone;
509
510 let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<String>(256);
512
513 if let Some(mut reader) = pty_reader {
514 tokio::spawn(async move {
518 while let Ok(Some(mut line)) = reader.next_line().await {
519 if line.ends_with('\r') {
521 line.pop();
522 }
523 if output_tx.send(line).await.is_err() {
524 break;
525 }
526 }
527 });
528 } else {
529 if let Some(mut stdout) = stdout_reader {
534 let tx = output_tx.clone();
535 tokio::spawn(async move {
536 while let Ok(Some(line)) = stdout.next_line().await {
537 if tx.send(line).await.is_err() {
538 break;
539 }
540 }
541 });
542 }
543 if let Some(mut stderr) = stderr_reader {
544 let tx = output_tx.clone();
545 tokio::spawn(async move {
546 while let Ok(Some(line)) = stderr.next_line().await {
547 if tx.send(line).await.is_err() {
548 break;
549 }
550 }
551 });
552 }
553 drop(output_tx);
555 }
556 let log_file = match tokio::fs::File::options()
557 .append(true)
558 .create(true)
559 .open(&log_path)
560 .await
561 {
562 Ok(f) => f,
563 Err(e) => {
564 error!("Failed to open log file for daemon {id}: {e}");
565 return;
566 }
567 };
568 let mut log_appender = BufWriter::new(log_file);
569
570 let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
571 let format_line = |line: String| {
572 let line_for_log = line;
573 if line_for_log.starts_with(&format!("{id} ")) {
574 format!("{} {line_for_log}\n", now())
576 } else {
577 format!("{} {id} {line_for_log}\n", now())
578 }
579 };
580
581 let mut ready_notified = false;
583 let mut ready_tx = ready_tx;
584 let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
585 let mut active_port_spawned = false;
587
588 let on_output_hook = match on_output_hook {
592 Some(ref hook) => match hook.validate(id.name()) {
593 Ok(()) => on_output_hook,
594 Err(e) => {
595 error!("{e}");
596 None
597 }
598 },
599 None => None,
600 };
601
602 let on_output_pattern: Option<regex::Regex> = on_output_hook
605 .as_ref()
606 .and_then(|h| h.regex.as_deref().and_then(get_or_compile_regex));
607 let on_output_debounce = on_output_hook
608 .as_ref()
609 .map(|h| h.debounce_duration())
610 .unwrap_or(Duration::from_millis(1000));
611 let mut on_output_last_fired: Option<std::time::Instant> = None;
613
614 let mut delay_timer =
615 ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));
616
617 let s = settings();
619 let ready_check_interval = s.supervisor_ready_check_interval();
620 let http_client_timeout = s.supervisor_http_client_timeout();
621 let log_flush_interval_duration = s.supervisor_log_flush_interval();
622
623 let mut http_check_interval = ready_http
625 .as_ref()
626 .map(|_| tokio::time::interval(ready_check_interval));
627 let http_client = ready_http.as_ref().map(|_| {
628 reqwest::Client::builder()
629 .timeout(http_client_timeout)
630 .build()
631 .unwrap_or_default()
632 });
633
634 let mut port_check_interval =
636 ready_port.map(|_| tokio::time::interval(ready_check_interval));
637
638 let mut cmd_check_interval = ready_cmd
640 .as_ref()
641 .map(|_| tokio::time::interval(ready_check_interval));
642
643 let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);
645
646 let (exit_tx, mut exit_rx) =
648 tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);
649
650 let child_pid = child.id().unwrap_or(0);
652 tokio::spawn(async move {
653 let result = child.wait().await;
654 #[cfg(all(unix, not(target_os = "linux")))]
664 let result = match &result {
665 Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
666 if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
667 warn!(
668 "daemon pid {child_pid} wait() got ECHILD; \
669 recovered exit code {code} from zombie reaper"
670 );
671 use std::os::unix::process::ExitStatusExt;
676 if code >= 0 {
677 Ok(std::process::ExitStatus::from_raw(code << 8))
678 } else {
679 Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
681 }
682 } else {
683 warn!(
684 "daemon pid {child_pid} wait() got ECHILD but no \
685 stashed status found; reporting as error"
686 );
687 result
688 }
689 }
690 _ => result,
691 };
692 debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
693 let _ = exit_tx.send(result).await;
694 });
695
696 #[allow(unused_assignments)]
697 let mut exit_status = None;
699
700 if has_port_config
706 && ready_pattern.is_none()
707 && ready_http.is_none()
708 && ready_port.is_none()
709 && ready_cmd.is_none()
710 && delay_timer.is_none()
711 {
712 active_port_spawned = true;
713 detect_and_store_active_port(id.clone(), daemon_pid);
714 }
715
716 loop {
717 select! {
718 Some(line) = output_rx.recv() => {
719 let formatted = format_line(line.clone());
720 if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
721 error!("Failed to write to log for daemon {id}: {e}");
722 }
723 trace!("output: {id} {formatted}");
724
725 let line_clean = console::strip_ansi_codes(&line).to_string();
728
729 if !ready_notified
731 && let Some(ref pattern) = ready_pattern
732 && pattern.is_match(&line_clean)
733 {
734 info!("daemon {id} ready: output matched pattern");
735 ready_notified = true;
736 let _ = log_appender.flush().await;
737 if let Some(tx) = ready_tx.take() {
738 let _ = tx.send(Ok(()));
739 }
740 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
741 if !active_port_spawned && has_port_config {
742 active_port_spawned = true;
743 detect_and_store_active_port(id.clone(), daemon_pid);
744 }
745 }
746
747 if let Some(ref hook) = on_output_hook {
749 let matched = match (&hook.filter, &on_output_pattern) {
750 (Some(substr), _) => line_clean.contains(substr.as_str()),
751 (None, Some(re)) => re.is_match(&line_clean),
752 (None, None) => true,
753 };
754 if matched {
755 let now = std::time::Instant::now();
756 let elapsed = on_output_last_fired.map(|t| now.duration_since(t));
757 if elapsed.is_none_or(|e| e >= on_output_debounce) {
758 on_output_last_fired = Some(now);
759 hooks::fire_output_hook(id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), hook.run.clone(), line_clean.clone()).await;
760 }
761 }
762 }
763 }
764 Some(result) = exit_rx.recv() => {
765 exit_status = Some(result);
767 debug!("daemon {id} process exited, exit_status: {exit_status:?}");
768 let _ = log_appender.flush().await;
770 if !ready_notified {
771 if let Some(tx) = ready_tx.take() {
772 let is_success = exit_status.as_ref()
774 .and_then(|r| r.as_ref().ok())
775 .map(|s| s.success())
776 .unwrap_or(false);
777
778 if is_success {
779 debug!("daemon {id} exited successfully before ready check, sending success notification");
780 let _ = tx.send(Ok(()));
781 } else {
782 let exit_code = exit_status.as_ref()
783 .and_then(|r| r.as_ref().ok())
784 .and_then(|s| s.code());
785 debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
786 let _ = tx.send(Err(exit_code));
787 }
788 }
789 } else {
790 debug!("daemon {id} was already marked ready, not sending notification");
791 }
792 break;
793 },
794 _ = async {
795 if let Some(ref mut interval) = http_check_interval {
796 interval.tick().await;
797 } else {
798 std::future::pending::<()>().await;
799 }
800 }, if !ready_notified && ready_http.is_some() => {
801 if let (Some(http), Some(client)) = (&ready_http, &http_client) {
802 match client.get(&http.url).send().await {
803 Ok(response) if http.accepts_status(response.status().as_u16()) => {
804 info!("daemon {id} ready: HTTP check passed (status {})", response.status());
805 ready_notified = true;
806 let _ = log_appender.flush().await;
807 if let Some(tx) = ready_tx.take() {
808 let _ = tx.send(Ok(()));
809 }
810 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
811 http_check_interval = None;
812 if !active_port_spawned && has_port_config {
813 active_port_spawned = true;
814 detect_and_store_active_port(id.clone(), daemon_pid);
815 }
816 }
817 Ok(response) => {
818 trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
819 }
820 Err(e) => {
821 trace!("daemon {id} HTTP check failed: {e}");
822 }
823 }
824 }
825 }
826 _ = async {
827 if let Some(ref mut interval) = port_check_interval {
828 interval.tick().await;
829 } else {
830 std::future::pending::<()>().await;
831 }
832 }, if !ready_notified && ready_port.is_some() => {
833 if let Some(port) = ready_port {
834 match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
835 Ok(_) => {
836 info!("daemon {id} ready: TCP port {port} is listening");
837 ready_notified = true;
838 let _ = log_appender.flush().await;
840 if let Some(tx) = ready_tx.take() {
841 let _ = tx.send(Ok(()));
842 }
843 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
844 port_check_interval = None;
846 if !active_port_spawned && has_port_config {
847 active_port_spawned = true;
848 detect_and_store_active_port(id.clone(), daemon_pid);
849 }
850 }
851 Err(_) => {
852 trace!("daemon {id} port check: port {port} not listening yet");
853 }
854 }
855 }
856 }
857 _ = async {
858 if let Some(ref mut interval) = cmd_check_interval {
859 interval.tick().await;
860 } else {
861 std::future::pending::<()>().await;
862 }
863 }, if !ready_notified && ready_cmd.is_some() => {
864 if let Some(ref cmd) = ready_cmd {
865 let mut command = Shell::default_for_platform().command(cmd);
867 command
868 .current_dir(&daemon_dir)
869 .stdout(std::process::Stdio::null())
870 .stderr(std::process::Stdio::null());
871 let result: std::io::Result<std::process::ExitStatus> = command.status().await;
872 match result {
873 Ok(status) if status.success() => {
874 info!("daemon {id} ready: readiness command succeeded");
875 ready_notified = true;
876 let _ = log_appender.flush().await;
877 if let Some(tx) = ready_tx.take() {
878 let _ = tx.send(Ok(()));
879 }
880 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
881 cmd_check_interval = None;
883 if !active_port_spawned && has_port_config {
884 active_port_spawned = true;
885 detect_and_store_active_port(id.clone(), daemon_pid);
886 }
887 }
888 Ok(_) => {
889 trace!("daemon {id} cmd check: command returned non-zero (not ready)");
890 }
891 Err(e) => {
892 trace!("daemon {id} cmd check failed: {e}");
893 }
894 }
895 }
896 }
897 _ = async {
898 if let Some(ref mut timer) = delay_timer {
899 timer.await;
900 } else {
901 std::future::pending::<()>().await;
902 }
903 } => {
904 if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
905 info!("daemon {id} ready: delay elapsed");
906 ready_notified = true;
907 let _ = log_appender.flush().await;
909 if let Some(tx) = ready_tx.take() {
910 let _ = tx.send(Ok(()));
911 }
912 fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
913 }
914 delay_timer = None;
916 if !active_port_spawned && has_port_config {
917 active_port_spawned = true;
918 detect_and_store_active_port(id.clone(), daemon_pid);
919 }
920 }
921 _ = log_flush_interval.tick() => {
922 if let Err(e) = log_appender.flush().await {
924 error!("Failed to flush log for daemon {id}: {e}");
925 }
926 }
927 }
928 }
929
930 if let Err(e) = log_appender.flush().await {
932 error!("Failed to final flush log for daemon {id}: {e}");
933 }
934
935 {
937 let mut state_file = SUPERVISOR.state_file.lock().await;
938 state_file.clear_active_port(&id);
939 }
940
941 let exit_status = if let Some(status) = exit_status {
943 status
944 } else {
945 match exit_rx.recv().await {
947 Some(status) => status,
948 None => {
949 warn!("daemon {id} exit channel closed without receiving status");
950 Err(std::io::Error::other("exit channel closed"))
951 }
952 }
953 };
954 let current_daemon = SUPERVISOR.get_daemon(&id).await;
955
956 SUPERVISOR
961 .active_monitors
962 .fetch_add(1, atomic::Ordering::Release);
963 struct MonitorGuard;
964 impl Drop for MonitorGuard {
965 fn drop(&mut self) {
966 SUPERVISOR
967 .active_monitors
968 .fetch_sub(1, atomic::Ordering::Release);
969 SUPERVISOR.monitor_done.notify_waiters();
970 }
971 }
972 let _monitor_guard = MonitorGuard;
973 if current_daemon.is_none()
978 || current_daemon.as_ref().is_some_and(|d| {
979 d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
980 })
981 {
982 return;
984 }
985 let already_stopped = current_daemon
990 .as_ref()
991 .is_some_and(|d| d.status.is_stopped());
992 let is_stopping = already_stopped
993 || current_daemon
994 .as_ref()
995 .is_some_and(|d| d.status.is_stopping());
996
997 let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
999 (Ok(status), true) => {
1000 (status.code().unwrap_or(-1), "stop")
1004 }
1005 (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
1006 (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
1007 (Err(_), true) => {
1008 (-1, "stop")
1010 }
1011 (Err(_), false) => (-1, "fail"),
1012 };
1013
1014 if !already_stopped {
1016 if let Ok(status) = &exit_status {
1017 info!("daemon {id} exited with status {status}");
1018 }
1019 let (new_status, last_exit_success) = match exit_reason {
1020 "stop" | "exit" => (
1021 DaemonStatus::Stopped,
1022 exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
1023 ),
1024 _ => (DaemonStatus::Errored(exit_code), false),
1025 };
1026 if let Err(e) = SUPERVISOR
1027 .upsert_daemon(
1028 UpsertDaemonOpts::builder(id.clone())
1029 .set(|o| {
1030 o.pid = None;
1031 o.status = new_status;
1032 o.last_exit_success = Some(last_exit_success);
1033 })
1034 .build(),
1035 )
1036 .await
1037 {
1038 error!("Failed to update daemon state for {id}: {e}");
1039 }
1040 }
1041
1042 let hook_extra_env = vec![
1044 ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
1045 ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
1046 ];
1047
1048 let hooks_to_fire: Vec<HookType> = match exit_reason {
1050 "stop" => vec![HookType::OnStop, HookType::OnExit],
1051 "exit" => vec![HookType::OnExit],
1052 _ if hook_retry_count >= hook_retry.count() => {
1054 vec![HookType::OnFail, HookType::OnExit]
1055 }
1056 _ => vec![],
1057 };
1058
1059 for hook_type in hooks_to_fire {
1060 fire_hook(
1061 hook_type,
1062 id.clone(),
1063 daemon_dir.clone(),
1064 hook_retry_count,
1065 hook_daemon_env.clone(),
1066 hook_extra_env.clone(),
1067 )
1068 .await;
1069 }
1070 });
1071
1072 if let Some(ready_rx) = ready_rx {
1074 match ready_rx.await {
1075 Ok(Ok(())) => {
1076 info!("daemon {id} is ready");
1077 Ok(IpcResponse::DaemonReady { daemon })
1078 }
1079 Ok(Err(exit_code)) => {
1080 error!("daemon {id} failed before becoming ready");
1081 Ok(IpcResponse::DaemonFailedWithCode { exit_code })
1082 }
1083 Err(_) => {
1084 error!("readiness channel closed unexpectedly for daemon {id}");
1085 Ok(IpcResponse::DaemonStart { daemon })
1086 }
1087 }
1088 } else {
1089 Ok(IpcResponse::DaemonStart { daemon })
1090 }
1091 }
1092
1093 pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
1095 let pitchfork_id = DaemonId::pitchfork();
1096 if *id == pitchfork_id {
1097 return Ok(IpcResponse::Error(
1098 "Cannot stop supervisor via stop command".into(),
1099 ));
1100 }
1101 info!("stopping daemon: {id}");
1102 if let Some(daemon) = self.get_daemon(id).await {
1103 trace!("daemon to stop: {daemon}");
1104 if let Some(pid) = daemon.pid {
1105 trace!("killing pid: {pid}");
1106 PROCS.refresh_pids(&[pid]);
1107 if PROCS.is_running(pid) {
1108 self.upsert_daemon(
1110 UpsertDaemonOpts::builder(id.clone())
1111 .set(|o| {
1112 o.pid = Some(pid);
1113 o.status = DaemonStatus::Stopping;
1114 })
1115 .build(),
1116 )
1117 .await?;
1118
1119 let stop_cfg = daemon.stop_signal.unwrap_or_default();
1122 let stop_signal: i32 = stop_cfg.signal.into();
1123 if let Err(e) = PROCS
1124 .kill_process_group_async(pid, stop_signal, stop_cfg.timeout)
1125 .await
1126 {
1127 debug!("failed to kill pid {pid}: {e}");
1128 PROCS.refresh_processes();
1130 if PROCS.is_running(pid) {
1131 debug!("failed to stop pid {pid}: process still running after kill");
1133 self.upsert_daemon(
1134 UpsertDaemonOpts::builder(id.clone())
1135 .set(|o| {
1136 o.pid = Some(pid); o.status = DaemonStatus::Running;
1138 })
1139 .build(),
1140 )
1141 .await?;
1142 return Ok(IpcResponse::DaemonStopFailed {
1143 error: format!(
1144 "process {pid} still running after kill attempt: {e}"
1145 ),
1146 });
1147 }
1148 }
1149
1150 self.upsert_daemon(
1155 UpsertDaemonOpts::builder(id.clone())
1156 .set(|o| {
1157 o.pid = None;
1158 o.status = DaemonStatus::Stopped;
1159 o.last_exit_success = Some(true); })
1161 .build(),
1162 )
1163 .await?;
1164 } else {
1165 debug!("pid {pid} not running, process may have exited unexpectedly");
1166 self.upsert_daemon(
1169 UpsertDaemonOpts::builder(id.clone())
1170 .set(|o| {
1171 o.pid = None;
1172 o.status = DaemonStatus::Stopped;
1173 })
1174 .build(),
1175 )
1176 .await?;
1177 return Ok(IpcResponse::DaemonWasNotRunning);
1178 }
1179 Ok(IpcResponse::Ok)
1180 } else {
1181 debug!("daemon {id} not running");
1182 Ok(IpcResponse::DaemonNotRunning)
1183 }
1184 } else {
1185 debug!("daemon {id} not found");
1186 Ok(IpcResponse::DaemonNotFound)
1187 }
1188 }
1189}
1190
1191#[cfg(unix)]
1192fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1193 let settings_user = settings().supervisor.user.trim();
1194 let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1195 let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1196 let configured = daemon_user.or(settings_user);
1197 let current_uid = nix::unistd::Uid::effective().as_raw();
1198 let current_gid = nix::unistd::Gid::effective().as_raw();
1199 resolve_run_identity(
1200 configured,
1201 current_uid,
1202 current_gid,
1203 std::env::var("SUDO_UID").ok().as_deref(),
1204 std::env::var("SUDO_GID").ok().as_deref(),
1205 )
1206}
1207
1208#[cfg(unix)]
1209fn resolve_run_identity(
1210 configured: Option<&str>,
1211 current_uid: u32,
1212 current_gid: u32,
1213 sudo_uid: Option<&str>,
1214 sudo_gid: Option<&str>,
1215) -> Result<RunIdentity> {
1216 let current_uid = nix::unistd::Uid::from_raw(current_uid);
1217 let current_gid = nix::unistd::Gid::from_raw(current_gid);
1218 if let Some(user) = configured {
1219 let identity = resolve_configured_user(user)?;
1220 ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1221 if identity.matches(current_uid, current_gid) {
1222 return Ok(RunIdentity::Inherit);
1223 }
1224 return Ok(identity);
1225 }
1226
1227 if current_uid.is_root()
1228 && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1229 {
1230 return Ok(identity);
1231 }
1232
1233 Ok(RunIdentity::Inherit)
1234}
1235
1236#[cfg(unix)]
1237fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1238 if user.chars().all(|c| c.is_ascii_digit()) {
1239 let uid = user
1240 .parse::<u32>()
1241 .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1242 let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1243 .into_diagnostic()?
1244 .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1245 return run_identity_from_user_record(user_record);
1246 }
1247
1248 let user_record = nix::unistd::User::from_name(user)
1249 .into_diagnostic()?
1250 .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1251 run_identity_from_user_record(user_record)
1252}
1253
1254#[cfg(unix)]
1255fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1256 let username = CString::new(user.name)
1257 .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1258 Ok(RunIdentity::Switch {
1259 uid: user.uid,
1260 gid: user.gid,
1261 username: Some(username),
1262 })
1263}
1264
1265#[cfg(unix)]
1266fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1267 RunIdentity::Switch {
1268 uid: nix::unistd::Uid::from_raw(uid),
1269 gid: nix::unistd::Gid::from_raw(gid),
1270 username,
1271 }
1272}
1273
1274#[cfg(unix)]
1275fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1276 let uid = sudo_uid?.parse::<u32>().ok()?;
1277 let gid = sudo_gid?.parse::<u32>().ok()?;
1278 let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1279 .ok()
1280 .flatten()
1281 .and_then(|u| CString::new(u.name).ok());
1282 Some(run_identity_from_raw_ids(uid, gid, username))
1283}
1284
1285#[cfg(unix)]
1286fn ensure_can_use_identity(
1287 configured_user: &str,
1288 identity: &RunIdentity,
1289 current_uid: nix::unistd::Uid,
1290 current_gid: nix::unistd::Gid,
1291) -> Result<()> {
1292 let RunIdentity::Switch { uid, gid, .. } = identity else {
1293 return Ok(());
1294 };
1295 if *uid == current_uid && *gid == current_gid {
1296 return Ok(());
1297 }
1298 if current_uid.is_root() {
1299 return Ok(());
1300 }
1301 Err(miette::miette!(
1302 "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1303 configured_user,
1304 current_uid.as_raw(),
1305 current_gid.as_raw(),
1306 uid.as_raw(),
1307 gid.as_raw()
1308 ))
1309}
1310
1311#[cfg(unix)]
1312fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1313 let RunIdentity::Switch { uid, gid, username } = identity else {
1314 return Ok(());
1315 };
1316 if let Some(username) = username {
1317 initgroups_for_user(username, *gid)?;
1318 } else {
1319 setgroups_to_primary(*gid)?;
1320 }
1321 nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1322 nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1323 Ok(())
1324}
1325
1326#[cfg(unix)]
1327impl RunIdentity {
1328 fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1329 matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1330 }
1331}
1332
1333#[cfg(unix)]
1334fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1335 let groups = [gid.as_raw() as libc::gid_t];
1336 #[cfg(any(target_os = "linux", target_os = "android"))]
1337 let group_count = groups.len();
1338 #[cfg(not(any(target_os = "linux", target_os = "android")))]
1339 let group_count = groups.len() as libc::c_int;
1340 let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1341 if rc == -1 {
1342 Err(std::io::Error::last_os_error())
1343 } else {
1344 Ok(())
1345 }
1346}
1347
1348#[cfg(unix)]
1349fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1350 let gid = gid.as_raw();
1351 #[cfg(any(
1352 target_os = "macos",
1353 target_os = "ios",
1354 target_os = "tvos",
1355 target_os = "watchos"
1356 ))]
1357 let base_gid = i32::try_from(gid)
1358 .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1359
1360 #[cfg(not(any(
1361 target_os = "macos",
1362 target_os = "ios",
1363 target_os = "tvos",
1364 target_os = "watchos"
1365 )))]
1366 let base_gid = gid as libc::gid_t;
1367
1368 let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1371 if rc == -1 {
1372 Err(std::io::Error::last_os_error())
1373 } else {
1374 Ok(())
1375 }
1376}
1377
1378#[cfg(unix)]
1379fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1380 std::io::Error::from_raw_os_error(err as i32)
1381}
1382
1383async fn check_ports_available(
1390 expected_ports: &[u16],
1391 auto_bump: bool,
1392 max_attempts: u32,
1393) -> Result<Vec<u16>> {
1394 if expected_ports.is_empty() {
1395 return Ok(Vec::new());
1396 }
1397
1398 for bump_offset in 0..=max_attempts {
1399 let candidate_ports: Vec<u16> = expected_ports
1401 .iter()
1402 .map(|&p| p.wrapping_add(bump_offset as u16))
1403 .collect();
1404
1405 let mut all_available = true;
1407 let mut conflicting_port = None;
1408
1409 for &port in &candidate_ports {
1410 if port == 0 {
1413 continue;
1414 }
1415
1416 if is_port_in_use(port).await {
1430 all_available = false;
1431 conflicting_port = Some(port);
1432 break;
1433 }
1434 }
1435
1436 if all_available {
1437 if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1441 return Err(PortError::NoAvailablePort {
1442 start_port: expected_ports[0],
1443 attempts: bump_offset + 1,
1444 }
1445 .into());
1446 }
1447 if bump_offset > 0 {
1448 info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1449 }
1450 return Ok(candidate_ports);
1451 }
1452
1453 if bump_offset == 0 && !auto_bump {
1455 if let Some(port) = conflicting_port {
1456 let (pid, process) = identify_port_owner(port).await;
1457 return Err(PortError::InUse { port, process, pid }.into());
1458 }
1459 }
1460 }
1461
1462 Err(PortError::NoAvailablePort {
1464 start_port: expected_ports[0],
1465 attempts: max_attempts + 1,
1466 }
1467 .into())
1468}
1469
1470async fn is_port_in_use(port: u16) -> bool {
1476 tokio::task::spawn_blocking(move || {
1477 for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1478 match std::net::TcpListener::bind((addr, port)) {
1479 Ok(listener) => drop(listener),
1480 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1481 Err(_) => continue,
1482 }
1483 }
1484 false
1485 })
1486 .await
1487 .unwrap_or(false)
1488}
1489
1490async fn identify_port_owner(port: u16) -> (u32, String) {
1495 tokio::task::spawn_blocking(move || {
1496 listeners::get_all()
1497 .ok()
1498 .and_then(|list| {
1499 list.into_iter()
1500 .find(|l| l.socket.port() == port)
1501 .map(|l| (l.process.pid, l.process.name))
1502 })
1503 .unwrap_or((0, "unknown".to_string()))
1504 })
1505 .await
1506 .unwrap_or((0, "unknown".to_string()))
1507}
1508
1509async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1514 if !is_port_in_use(port).await {
1515 return None;
1516 }
1517 Some(identify_port_owner(port).await)
1518}
1519
1520fn detect_and_store_active_port(id: DaemonId, pid: u32) {
1535 tokio::spawn(async move {
1536 for delay_ms in [500u64, 1000, 2000, 4000] {
1540 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1541
1542 let expected_port: Option<u16> = {
1545 let state_file = SUPERVISOR.state_file.lock().await;
1546 match state_file.daemons.get(&id) {
1547 Some(d) if d.pid.is_none() => {
1548 debug!("daemon {id}: aborting active_port detection — process exited");
1549 return;
1550 }
1551 Some(d) => d
1552 .port
1553 .as_ref()
1554 .and_then(|p| p.expect.first().copied())
1555 .filter(|&p| p > 0),
1556 None => None,
1557 }
1558 };
1559
1560 let active_port = tokio::task::spawn_blocking(move || {
1561 let listeners = listeners::get_all().ok()?;
1562 let process_ports: Vec<u16> = listeners
1563 .into_iter()
1564 .filter(|listener| listener.process.pid == pid)
1565 .map(|listener| listener.socket.port())
1566 .filter(|&port| port > 0)
1567 .collect();
1568
1569 if process_ports.is_empty() {
1570 return None;
1571 }
1572
1573 if let Some(ep) = expected_port {
1576 if process_ports.contains(&ep) {
1577 return Some(ep);
1578 }
1579 }
1580
1581 process_ports.into_iter().next()
1587 })
1588 .await
1589 .ok()
1590 .flatten();
1591
1592 if let Some(port) = active_port {
1593 debug!("daemon {id} active_port detected: {port}");
1594 let mut state_file = SUPERVISOR.state_file.lock().await;
1595 if let Some(d) = state_file.daemons.get(&id) {
1596 if d.pid == Some(pid) {
1600 state_file.set_active_port(&id, port);
1601 } else {
1602 debug!(
1603 "daemon {id}: skipping active_port write — PID mismatch \
1604 (expected {pid}, current {:?})",
1605 d.pid
1606 );
1607 return;
1608 }
1609 }
1610 return;
1611 }
1612
1613 debug!("daemon {id}: no active port detected for pid {pid} (will retry)");
1614 }
1615
1616 debug!("daemon {id}: active port detection exhausted all retries for pid {pid}");
1617 });
1618}
1619
1620fn is_daemon_slug_target(id: &DaemonId) -> bool {
1628 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1632 slugs.iter().any(|(slug, entry)| {
1633 let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1634 id.name() == daemon_name
1635 })
1636}
1637
1638#[cfg(all(test, unix))]
1639mod tests {
1640 use super::*;
1641
1642 #[test]
1643 fn test_resolve_run_identity_empty_without_sudo() {
1644 let identity = resolve_run_identity(None, 501, 20, None, None).unwrap();
1645 assert_eq!(identity, RunIdentity::Inherit);
1646 }
1647
1648 #[test]
1649 fn test_resolve_run_identity_sudo_fallback() {
1650 let identity = resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap();
1651 let RunIdentity::Switch { uid, gid, .. } = identity else {
1652 panic!("expected identity switch");
1653 };
1654 assert_eq!(uid.as_raw(), 501);
1655 assert_eq!(gid.as_raw(), 20);
1656 }
1657
1658 #[test]
1659 fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
1660 let identity = resolve_run_identity(None, 501, 20, Some("0"), Some("0")).unwrap();
1661 assert_eq!(identity, RunIdentity::Inherit);
1662 }
1663
1664 #[test]
1665 fn test_resolve_configured_user_root_name() {
1666 let identity = resolve_configured_user("root").unwrap();
1667 let RunIdentity::Switch { uid, username, .. } = identity else {
1668 panic!("expected identity switch");
1669 };
1670 assert_eq!(uid.as_raw(), 0);
1671 assert_eq!(
1672 username.as_deref().and_then(|s| s.to_str().ok()),
1673 Some("root")
1674 );
1675 }
1676
1677 #[test]
1678 fn test_resolve_configured_user_root_uid() {
1679 let identity = resolve_configured_user("0").unwrap();
1680 let RunIdentity::Switch { uid, username, .. } = identity else {
1681 panic!("expected identity switch");
1682 };
1683 assert_eq!(uid.as_raw(), 0);
1684 assert_eq!(
1685 username.as_deref().and_then(|s| s.to_str().ok()),
1686 Some("root")
1687 );
1688 }
1689
1690 #[test]
1691 fn test_resolve_configured_user_missing_user_fails() {
1692 let err = resolve_configured_user("pitchfork-user-that-should-not-exist")
1693 .unwrap_err()
1694 .to_string();
1695 assert!(err.contains("does not exist"));
1696 }
1697
1698 #[test]
1699 fn test_resolve_run_identity_requires_root_for_user_switch() {
1700 let err = resolve_run_identity(Some("root"), 501, 20, None, None)
1701 .unwrap_err()
1702 .to_string();
1703 assert!(err.contains("Restart the supervisor with sudo"));
1704 }
1705
1706 #[test]
1707 fn test_resolve_run_identity_same_user_is_noop() {
1708 let identity = resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20")).unwrap();
1709 assert_eq!(identity, RunIdentity::Inherit);
1710 }
1711}
1712
1713fn inject_proxy_env(cmd: &mut tokio::process::Command, slug: &Option<String>) {
1722 let s = crate::settings::settings();
1723 let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1724
1725 if should_force_loopback_host(slug) && !lan_enabled {
1726 cmd.env("HOST", "127.0.0.1");
1729 }
1730
1731 if let Some(url) = build_pitchfork_url(slug, s) {
1733 cmd.env("PITCHFORK_URL", &url);
1734 }
1735
1736 if s.proxy.enable && s.proxy.https {
1738 let ca_path = if s.proxy.tls_cert.is_empty() {
1739 crate::env::PITCHFORK_STATE_DIR.join("proxy").join("ca.pem")
1740 } else {
1741 std::path::PathBuf::from(&s.proxy.tls_cert)
1742 };
1743 if ca_path.exists() {
1744 cmd.env("NODE_EXTRA_CA_CERTS", ca_path.to_string_lossy().to_string());
1745 }
1746 }
1747
1748 if s.proxy.enable {
1750 let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1751 cmd.env("__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS", format!(".{tld}"));
1752 }
1753
1754 if lan_enabled {
1756 cmd.env("PITCHFORK_LAN", "1");
1757 }
1758}
1759
1760fn should_force_loopback_host(slug: &Option<String>) -> bool {
1761 let Some(slug) = slug.as_deref() else {
1762 return false;
1763 };
1764
1765 let s = crate::settings::settings();
1766 if !s.proxy.enable {
1767 return false;
1768 }
1769
1770 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1771 slugs.contains_key(slug)
1772}
1773
1774fn build_pitchfork_url(slug: &Option<String>, s: &crate::settings::Settings) -> Option<String> {
1778 let slug = slug.as_ref()?;
1779 if !s.proxy.enable {
1780 return None;
1781 }
1782 let scheme = if s.proxy.https { "https" } else { "http" };
1783 let port = u16::try_from(s.proxy.port).ok().filter(|&p| p > 0)?;
1784 let port_suffix = if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
1785 String::new()
1786 } else {
1787 format!(":{port}")
1788 };
1789 let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1790 let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1791 Some(format!("{scheme}://{slug}.{tld}{port_suffix}",))
1792}