1use super::hooks::{self, HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::IntoDiagnostic;
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22#[cfg(unix)]
23use std::ffi::CString;
24use std::iter::once;
25use std::sync::atomic;
26use std::time::Duration;
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
28use tokio::select;
29use tokio::sync::oneshot;
30use tokio::time;
31
/// Process-wide cache of compiled regular expressions, keyed by the pattern
/// source string.
///
/// The map is created lazily on first access and is only touched through
/// `get_or_compile_regex`, which inserts every pattern that compiles
/// successfully. Nothing in this file ever removes entries.
static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
35
/// The user/group identity a daemon process should run with, relative to the
/// supervisor's own identity.
///
/// Values are produced by the `resolve_*identity` helpers in this file and
/// passed to `apply_run_identity` from the child's `pre_exec` hook.
#[cfg(unix)]
#[derive(Clone, Debug, PartialEq, Eq)]
enum RunIdentity {
    /// No identity was requested, or the requested one already matches the
    /// supervisor's current uid/gid.
    Inherit,
    /// A specific uid/gid was resolved for the daemon.
    /// NOTE(review): the actual switch happens in `apply_run_identity`, which
    /// is not visible here — confirm its exact behaviour there.
    Switch {
        /// Target user id.
        uid: nix::unistd::Uid,
        /// Target group id.
        gid: nix::unistd::Gid,
        /// Login name belonging to `uid`. `None` only on the sudo-derived
        /// path, when the uid could not be looked up or its name could not be
        /// turned into a `CString`.
        /// NOTE(review): presumably used for supplementary-group setup — confirm.
        username: Option<CString>,
    },
}
46
47pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
49 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
50 if let Some(re) = cache.get(pattern) {
51 return Some(re.clone());
52 }
53 match Regex::new(pattern) {
54 Ok(re) => {
55 cache.insert(pattern.to_string(), re.clone());
56 Some(re)
57 }
58 Err(e) => {
59 error!("invalid regex pattern '{pattern}': {e}");
60 None
61 }
62 }
63}
64
65impl Supervisor {
66 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
68 let id = &opts.id;
69 let cmd = opts.cmd.clone();
70
71 {
73 let mut pending = self.pending_autostops.lock().await;
74 if pending.remove(id).is_some() {
75 info!("cleared pending autostop for {id} (daemon starting)");
76 }
77 }
78
79 let daemon = self.get_daemon(id).await;
80 if let Some(daemon) = daemon {
81 if !daemon.status.is_stopping()
84 && !daemon.status.is_stopped()
85 && let Some(pid) = daemon.pid
86 {
87 if opts.force {
88 self.stop(id).await?;
89 info!("run: stop completed for daemon {id}");
90 } else {
91 warn!("daemon {id} already running with pid {pid}");
92 return Ok(IpcResponse::DaemonAlreadyRunning);
93 }
94 }
95 }
96
97 if opts.wait_ready && opts.retry.count() > 0 {
99 let max_attempts = opts.retry.count().saturating_add(1);
101 for attempt in 0..max_attempts {
102 let mut retry_opts = opts.clone();
103 retry_opts.retry_count = attempt;
104 retry_opts.cmd = cmd.clone();
105
106 let result = self.run_once(retry_opts).await?;
107
108 match result {
109 IpcResponse::DaemonReady { daemon } => {
110 return Ok(IpcResponse::DaemonReady { daemon });
111 }
112 IpcResponse::DaemonFailedWithCode { exit_code } => {
113 if attempt < opts.retry.count() {
114 let backoff_secs = 2u64.saturating_pow(attempt).min(3600);
115 info!(
116 "daemon {id} failed (attempt {}/{}), retrying in {}s",
117 attempt + 1,
118 max_attempts,
119 backoff_secs
120 );
121 fire_hook(
122 HookType::OnRetry,
123 id.clone(),
124 opts.dir.0.clone(),
125 attempt + 1,
126 opts.env.clone(),
127 vec![],
128 )
129 .await;
130 time::sleep(Duration::from_secs(backoff_secs)).await;
131 continue;
132 } else {
133 info!("daemon {id} failed after {max_attempts} attempts");
134 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
135 }
136 }
137 other => return Ok(other),
138 }
139 }
140 }
141
142 self.run_once(opts).await
144 }
145
/// Spawns the daemon described by `opts` exactly once (no retry loop) and
/// hands the child process to a detached monitoring task.
///
/// In order, this function:
/// 1. resolves the ports the daemon expects (optionally bumping to free ones)
///    and derives the port used for the TCP readiness check,
/// 2. builds the `sh -c "exec ..."` command line, optionally wrapped in
///    `mise x --`,
/// 3. wires stdio to a PTY or to pipes, sets the environment, and installs a
///    `pre_exec` hook (unix) that starts a new session and applies the run
///    identity,
/// 4. spawns the child and records it in the supervisor state as `Running`,
/// 5. spawns a task that logs output, evaluates readiness checks, and on exit
///    updates state and fires hooks,
/// 6. if `opts.wait_ready` is set, waits for that task to report readiness.
///
/// # Returns
/// * `PortConflict` / `NoAvailablePort` / `DaemonFailed` when the port check,
///   run-identity resolution, or pid capture fails before the daemon is usable.
/// * `DaemonReady` when `wait_ready` is set and a readiness check passed (or
///   the process exited successfully before any check fired).
/// * `DaemonFailedWithCode` when `wait_ready` is set and the process exited
///   unsuccessfully before becoming ready.
/// * `DaemonStart` when `wait_ready` is not set, or when the readiness channel
///   was dropped without a message.
///
/// # Errors
/// Returns `Err` when the log directory cannot be created, a PTY fd cannot be
/// duplicated, the process cannot be spawned, or the state upsert fails.
pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
    let id = &opts.id;
    // `original_cmd` is what gets persisted in state; `cmd` is consumed below
    // when the shell command line is assembled.
    let original_cmd = opts.cmd.clone();
    let cmd = opts.cmd;

    // One-shot channel the monitoring task uses to report readiness (Ok) or
    // an early failed exit (Err(exit_code)). Only created when the caller waits.
    let (ready_tx, ready_rx) = if opts.wait_ready {
        let (tx, rx) = oneshot::channel();
        (Some(tx), Some(rx))
    } else {
        (None, None)
    };

    // Ports the daemon is configured to expect; empty when no port config.
    let expected_ports = opts
        .port
        .as_ref()
        .map(|p| p.expect.clone())
        .unwrap_or_default();
    let (resolved_ports, effective_ready_port) = if !expected_ports.is_empty() {
        // Non-empty `expected_ports` can only come from `Some(port)`, so this
        // unwrap cannot fail.
        let port_cfg = opts.port.as_ref().unwrap();
        match check_ports_available(
            &expected_ports,
            port_cfg.auto_bump(),
            port_cfg.max_bump_attempts(),
        )
        .await
        {
            Ok(resolved) => {
                let ready_port = if let Some(configured_port) = opts.ready_port {
                    // Distance between the first expected port and the first
                    // resolved port (0 when nothing was bumped).
                    let bump_offset = resolved
                        .first()
                        .unwrap_or(&0)
                        .saturating_sub(*expected_ports.first().unwrap_or(&0));
                    // If the readiness port is one of the expected ports and
                    // those were bumped, shift it by the same offset; on
                    // overflow keep the configured value.
                    if expected_ports.contains(&configured_port) && bump_offset > 0 {
                        configured_port
                            .checked_add(bump_offset)
                            .or(Some(configured_port))
                    } else {
                        Some(configured_port)
                    }
                } else if opts.ready_output.is_none()
                    && opts.ready_http.is_none()
                    && opts.ready_cmd.is_none()
                    && opts.ready_delay.is_none()
                {
                    // No readiness check configured at all: fall back to a
                    // TCP check on the first resolved port, unless it is 0.
                    resolved.first().copied().filter(|&p| p != 0)
                } else {
                    // Some other readiness check is configured; no port check.
                    None
                };
                info!("daemon {id}: ports {expected_ports:?} resolved to {resolved:?}");
                (resolved, ready_port)
            }
            Err(e) => {
                error!("daemon {id}: port check failed: {e}");
                // Known port errors map to dedicated IPC responses so the
                // client can report them precisely.
                if let Some(port_error) = e.downcast_ref::<PortError>() {
                    match port_error {
                        PortError::InUse { port, process, pid } => {
                            return Ok(IpcResponse::PortConflict {
                                port: *port,
                                process: process.clone(),
                                pid: *pid,
                            });
                        }
                        PortError::NoAvailablePort {
                            start_port,
                            attempts,
                        } => {
                            return Ok(IpcResponse::NoAvailablePort {
                                start_port: *start_port,
                                attempts: *attempts,
                            });
                        }
                    }
                }
                // Any other error is reported generically.
                return Ok(IpcResponse::DaemonFailed {
                    error: e.to_string(),
                });
            }
        }
    } else {
        // No expected ports, but a non-zero readiness port may still be set:
        // refuse to start if something is already detected on it.
        if let Some(port) = opts.ready_port {
            if port > 0 {
                if let Some((pid, process)) = detect_port_conflict(port).await {
                    return Ok(IpcResponse::PortConflict { port, process, pid });
                }
            }
        }
        (Vec::new(), opts.ready_port)
    };

    // Build the argv that will be joined into the `sh -c` string. It always
    // starts with `exec`; with mise enabled (per-daemon setting, falling back
    // to the global one) it becomes `exec <mise> x -- <cmd...>`.
    let cmd: Vec<String> = if opts.mise.unwrap_or(settings().general.mise) {
        match settings().resolve_mise_bin() {
            Some(mise_bin) => {
                let mise_bin_str = mise_bin.to_string_lossy().to_string();
                info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
                once("exec".to_string())
                    .chain(once(mise_bin_str))
                    .chain(once("x".to_string()))
                    .chain(once("--".to_string()))
                    .chain(cmd)
                    .collect_vec()
            }
            None => {
                // mise requested but not found: run the plain command.
                warn!("daemon {id}: mise=true but mise binary not found, running without mise");
                once("exec".to_string()).chain(cmd).collect_vec()
            }
        }
    } else {
        once("exec".to_string()).chain(cmd).collect_vec()
    };
    // Shell-quote and join so the whole command is a single `-c` argument.
    let args = vec!["-c".to_string(), shell_words::join(&cmd)];
    let log_path = id.log_path();
    if let Some(parent) = log_path.parent() {
        xx::file::mkdirp(parent)?;
    }
    // Resolve which user/group the child should run as; a resolution failure
    // is reported to the client rather than returned as `Err`.
    #[cfg(unix)]
    let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
        Ok(identity) => identity,
        Err(e) => {
            return Ok(IpcResponse::DaemonFailed {
                error: e.to_string(),
            });
        }
    };
    info!("run: spawning daemon {id} with args: {args:?}");

    // Allocate a PTY only when explicitly requested; allocation failure
    // degrades to plain pipes instead of failing the start.
    #[cfg(unix)]
    let pty_pair = if opts.pty.unwrap_or(false) {
        match super::pty::openpty() {
            Ok(pair) => {
                info!("daemon {id}: allocated PTY (pty = true)");
                Some(pair)
            }
            Err(e) => {
                warn!("daemon {id}: failed to allocate PTY, falling back to pipes: {e}");
                None
            }
        }
    } else {
        None
    };

    // From here on `cmd` is the process builder (shadows the argv above).
    let mut cmd = tokio::process::Command::new("sh");

    // PTY mode: stdin, stdout and stderr each get their own duplicate of the
    // slave fd. Pipe mode: stdout/stderr are piped for the log reader.
    #[cfg(unix)]
    if let Some(ref pair) = pty_pair {
        let slave_file = std::fs::File::from(
            pair.slave
                .try_clone()
                .map_err(|e| miette::miette!("failed to dup slave PTY fd: {e}"))?,
        );
        cmd.stdin(std::process::Stdio::from(slave_file.try_clone().map_err(
            |e| miette::miette!("failed to clone slave PTY fd for stdin: {e}"),
        )?));
        cmd.stdout(std::process::Stdio::from(slave_file.try_clone().map_err(
            |e| miette::miette!("failed to clone slave PTY fd for stdout: {e}"),
        )?));
        cmd.stderr(std::process::Stdio::from(slave_file));
    } else {
        cmd.stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped());
    }

    #[cfg(not(unix))]
    {
        cmd.stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped());
    }

    cmd.args(&args).current_dir(&opts.dir);

    // Without a PTY the child gets no stdin.
    #[cfg(unix)]
    if pty_pair.is_none() {
        cmd.stdin(std::process::Stdio::null());
    }

    #[cfg(not(unix))]
    cmd.stdin(std::process::Stdio::null());

    // Environment, later entries overriding earlier ones: the recorded
    // original PATH (if any), then the daemon's own env, then the
    // PITCHFORK_* and PORT* variables set below.
    if let Some(ref path) = *env::ORIGINAL_PATH {
        cmd.env("PATH", path);
    }

    if let Some(ref env_vars) = opts.env {
        cmd.envs(env_vars);
    }

    cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
    cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
    cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());

    // Expose resolved ports as PORT (first port) plus PORT0, PORT1, ...
    if !resolved_ports.is_empty() {
        cmd.env("PORT", resolved_ports[0].to_string());
        for (i, port) in resolved_ports.iter().enumerate() {
            cmd.env(format!("PORT{i}"), port.to_string());
        }
    }

    // NOTE(review): presumably adds proxy-related variables derived from the
    // daemon's slug — confirm in `inject_proxy_env`.
    inject_proxy_env(&mut cmd, &opts.slug);

    #[cfg(unix)]
    {
        let run_identity = run_identity.clone();
        let use_pty = pty_pair.is_some();
        // SAFETY: `pre_exec` runs in the forked child before exec.
        // NOTE(review): the closure calls `setsid`, `ioctl`, `eprintln!` and
        // `apply_run_identity`; confirm these are acceptable in that context.
        unsafe {
            cmd.pre_exec(move || {
                // Start a new session so the daemon is detached from the
                // supervisor's session/controlling terminal.
                nix::unistd::setsid().map_err(nix_to_io_error)?;

                if use_pty {
                    // fd 0 is the PTY slave (wired to stdin above); request it
                    // as the controlling terminal of the new session. Failure
                    // is not fatal and is only reported on Linux.
                    let ret = libc::ioctl(0, libc::TIOCSCTTY as libc::c_ulong, 0);
                    if ret < 0 {
                        #[cfg(target_os = "linux")]
                        eprintln!(
                            "pitchfork: TIOCSCTTY failed: {}",
                            std::io::Error::last_os_error()
                        );
                    }
                }

                // Apply the resolved identity last, after the session setup.
                apply_run_identity(&run_identity)?;
                Ok(())
            });
        }
    }

    let mut child = cmd.spawn().into_diagnostic()?;
    // `id()` is `None` when the child has already been reaped.
    let pid = match child.id() {
        Some(p) => p,
        None => {
            warn!("Daemon {id} exited before PID could be captured");
            return Ok(IpcResponse::DaemonFailed {
                error: "Process exited immediately".to_string(),
            });
        }
    };
    info!("started daemon {id} with pid {pid}");
    PROCS.refresh_pids(&[pid]);
    // Persist the full run configuration together with the new pid so the
    // daemon shows up as `Running` in supervisor state.
    let daemon = self
        .upsert_daemon(
            UpsertDaemonOpts::builder(id.clone())
                .set(|o| {
                    o.pid = Some(pid);
                    o.status = DaemonStatus::Running;
                    o.shell_pid = opts.shell_pid;
                    o.dir = Some(opts.dir.0.clone());
                    o.cmd = Some(original_cmd);
                    o.autostop = opts.autostop;
                    o.cron_schedule = opts.cron_schedule.clone();
                    o.cron_retrigger = opts.cron_retrigger;
                    o.retry = Some(opts.retry);
                    o.retry_count = Some(opts.retry_count);
                    o.ready_delay = opts.ready_delay;
                    o.ready_output = opts.ready_output.clone();
                    o.ready_http = opts.ready_http.clone();
                    o.ready_port = effective_ready_port;
                    o.ready_cmd = opts.ready_cmd.clone();
                    o.port = crate::config_types::PortConfig::from_parts(
                        expected_ports,
                        opts.port.as_ref().map(|p| p.bump).unwrap_or_default(),
                    );
                    o.resolved_port = resolved_ports;
                    o.depends = Some(opts.depends.clone());
                    o.env = opts.env.clone();
                    o.watch = Some(opts.watch.clone());
                    o.watch_mode = Some(opts.watch_mode);
                    o.watch_base_dir = opts.watch_base_dir.clone();
                    o.mise = opts.mise;
                    o.user = opts.user.clone();
                    o.memory_limit = opts.memory_limit;
                    o.cpu_limit = opts.cpu_limit;
                    o.stop_signal = opts.stop_signal;
                    o.pty = opts.pty;
                })
                .build(),
        )
        .await?;

    // Owned copies of everything the detached monitoring task needs; it must
    // not borrow from `opts` or `self`.
    let id_clone = id.clone();
    let ready_delay = opts.ready_delay;
    let ready_output = opts.ready_output.clone();
    let ready_http = opts.ready_http.clone();
    let ready_port = effective_ready_port;
    let ready_cmd = opts.ready_cmd.clone();
    let daemon_dir = opts.dir.0.clone();
    let hook_retry_count = opts.retry_count;
    let hook_retry = opts.retry;
    let hook_daemon_env = opts.env.clone();
    let on_output_hook = opts.on_output_hook.clone();
    // Active-port detection is wanted when ports are configured, or when the
    // proxy is enabled and this daemon is a slug target.
    let has_port_config = opts.port.as_ref().is_some_and(|p| !p.expect.is_empty())
        || (settings().proxy.enable && is_daemon_slug_target(id));
    let daemon_pid = pid;

    // Output source: the PTY master when a PTY is in use, otherwise the
    // child's piped stdout/stderr.
    #[cfg(unix)]
    let pty_reader = pty_pair.map(|p| {
        tokio::io::BufReader::new(tokio::fs::File::from_std(std::fs::File::from(p.master)))
            .lines()
    });
    #[cfg(not(unix))]
    let pty_reader: Option<tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>> = None;
    let stdout_reader = if pty_reader.is_none() {
        child
            .stdout
            .take()
            .map(|s| tokio::io::BufReader::new(s).lines())
    } else {
        None
    };
    let stderr_reader = if pty_reader.is_none() {
        child
            .stderr
            .take()
            .map(|s| tokio::io::BufReader::new(s).lines())
    } else {
        None
    };

    // Logged but not fatal: monitoring still runs for exit handling.
    if pty_reader.is_none() && (stdout_reader.is_none() || stderr_reader.is_none()) {
        error!("Failed to capture stdout/stderr for daemon {id}");
    }

    // Detached monitoring task: owns the child for the rest of its life.
    tokio::spawn(async move {
        let id = id_clone;

        // All output sources funnel their lines into this bounded channel.
        let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<String>(256);

        if let Some(mut reader) = pty_reader {
            tokio::spawn(async move {
                // Drop a trailing '\r' so PTY lines (presumably "\r\n"
                // terminated) are logged without it.
                while let Ok(Some(mut line)) = reader.next_line().await {
                    if line.ends_with('\r') {
                        line.pop();
                    }
                    if output_tx.send(line).await.is_err() {
                        break;
                    }
                }
            });
        } else {
            if let Some(mut stdout) = stdout_reader {
                let tx = output_tx.clone();
                tokio::spawn(async move {
                    while let Ok(Some(line)) = stdout.next_line().await {
                        if tx.send(line).await.is_err() {
                            break;
                        }
                    }
                });
            }
            if let Some(mut stderr) = stderr_reader {
                let tx = output_tx.clone();
                tokio::spawn(async move {
                    while let Ok(Some(line)) = stderr.next_line().await {
                        if tx.send(line).await.is_err() {
                            break;
                        }
                    }
                });
            }
            // Drop the original sender so the channel closes once both reader
            // tasks have finished.
            drop(output_tx);
        }
        // Failing to open the log ends the task here; `ready_tx` is dropped
        // with it, which the caller sees as a closed readiness channel.
        let log_file = match tokio::fs::File::options()
            .append(true)
            .create(true)
            .open(&log_path)
            .await
        {
            Ok(f) => f,
            Err(e) => {
                error!("Failed to open log file for daemon {id}: {e}");
                return;
            }
        };
        let mut log_appender = BufWriter::new(log_file);

        let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
        // Log line format: "<timestamp> <id> <line>\n". If the line already
        // starts with "<id> " the id is not repeated.
        let format_line = |line: String| {
            let line_for_log = line;
            if line_for_log.starts_with(&format!("{id} ")) {
                format!("{} {line_for_log}\n", now())
            } else {
                format!("{} {id} {line_for_log}\n", now())
            }
        };

        // Readiness is reported at most once, by whichever check fires first.
        let mut ready_notified = false;
        let mut ready_tx = ready_tx;
        let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
        // Ensures active-port detection is started at most once.
        let mut active_port_spawned = false;

        // An on_output hook that fails validation is logged and disabled.
        let on_output_hook = match on_output_hook {
            Some(ref hook) => match hook.validate(id.name()) {
                Ok(()) => on_output_hook,
                Err(e) => {
                    error!("{e}");
                    None
                }
            },
            None => None,
        };

        let on_output_pattern: Option<regex::Regex> = on_output_hook
            .as_ref()
            .and_then(|h| h.regex.as_deref().and_then(get_or_compile_regex));
        // Minimum spacing between two on_output hook runs (1s when no hook).
        let on_output_debounce = on_output_hook
            .as_ref()
            .map(|h| h.debounce_duration())
            .unwrap_or(Duration::from_millis(1000));
        let mut on_output_last_fired: Option<std::time::Instant> = None;

        // `ready_delay` is in seconds; boxed and pinned so the select loop
        // can poll it by mutable reference.
        let mut delay_timer =
            ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

        let s = settings();
        let ready_check_interval = s.supervisor_ready_check_interval();
        let http_client_timeout = s.supervisor_http_client_timeout();
        let log_flush_interval_duration = s.supervisor_log_flush_interval();

        // Each polling interval exists only if its readiness check is
        // configured; it is set back to `None` once that check passes.
        let mut http_check_interval = ready_http
            .as_ref()
            .map(|_| tokio::time::interval(ready_check_interval));
        let http_client = ready_http.as_ref().map(|_| {
            reqwest::Client::builder()
                .timeout(http_client_timeout)
                .build()
                .unwrap_or_default()
        });

        let mut port_check_interval =
            ready_port.map(|_| tokio::time::interval(ready_check_interval));

        let mut cmd_check_interval = ready_cmd
            .as_ref()
            .map(|_| tokio::time::interval(ready_check_interval));

        let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);

        // `child.wait()` runs in its own task and reports through a channel,
        // so the select loop below never holds the child itself.
        let (exit_tx, mut exit_rx) =
            tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

        let child_pid = child.id().unwrap_or(0);
        tokio::spawn(async move {
            let result = child.wait().await;
            // Non-Linux unix only: if wait() fails with ECHILD, look for a
            // status stashed in `REAPED_STATUSES` and rebuild an ExitStatus
            // from it.
            #[cfg(all(unix, not(target_os = "linux")))]
            let result = match &result {
                Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
                    if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
                        warn!(
                            "daemon pid {child_pid} wait() got ECHILD; \
                             recovered exit code {code} from zombie reaper"
                        );
                        use std::os::unix::process::ExitStatusExt;
                        // Rebuild a raw wait status: non-negative codes go in
                        // bits 8..16; negative codes are negated and masked
                        // to the low 7 bits (presumably a signal number —
                        // confirm against the reaper's encoding).
                        if code >= 0 {
                            Ok(std::process::ExitStatus::from_raw(code << 8))
                        } else {
                            Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
                        }
                    } else {
                        warn!(
                            "daemon pid {child_pid} wait() got ECHILD but no \
                             stashed status found; reporting as error"
                        );
                        result
                    }
                }
                _ => result,
            };
            debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
            let _ = exit_tx.send(result).await;
        });

        #[allow(unused_assignments)]
        let mut exit_status = None;

        // With no readiness mechanism at all there is no "ready" moment to
        // wait for, so start active-port detection immediately.
        // NOTE(review): `detect_and_store_active_port` is not awaited;
        // presumably it spawns its own background work — confirm.
        if has_port_config
            && ready_pattern.is_none()
            && ready_http.is_none()
            && ready_port.is_none()
            && ready_cmd.is_none()
            && delay_timer.is_none()
        {
            active_port_spawned = true;
            detect_and_store_active_port(id.clone(), daemon_pid);
        }

        // Main monitoring loop; it ends only when the process exits.
        loop {
            select! {
                // A line of daemon output: log it, test the readiness
                // pattern, and run the on_output hook if it matches.
                Some(line) = output_rx.recv() => {
                    let formatted = format_line(line.clone());
                    if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                        error!("Failed to write to log for daemon {id}: {e}");
                    }
                    trace!("output: {id} {formatted}");

                    // Matching runs on the line with ANSI escapes stripped;
                    // the log keeps the raw line.
                    let line_clean = console::strip_ansi_codes(&line).to_string();

                    if !ready_notified
                        && let Some(ref pattern) = ready_pattern
                        && pattern.is_match(&line_clean)
                    {
                        info!("daemon {id} ready: output matched pattern");
                        ready_notified = true;
                        let _ = log_appender.flush().await;
                        if let Some(tx) = ready_tx.take() {
                            let _ = tx.send(Ok(()));
                        }
                        fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                        if !active_port_spawned && has_port_config {
                            active_port_spawned = true;
                            detect_and_store_active_port(id.clone(), daemon_pid);
                        }
                    }

                    if let Some(ref hook) = on_output_hook {
                        // Substring filter wins over the regex; with neither
                        // set, every line matches.
                        let matched = match (&hook.filter, &on_output_pattern) {
                            (Some(substr), _) => line_clean.contains(substr.as_str()),
                            (None, Some(re)) => re.is_match(&line_clean),
                            (None, None) => true,
                        };
                        if matched {
                            // Debounce: fire on the first match, then only
                            // after `on_output_debounce` has elapsed.
                            let now = std::time::Instant::now();
                            let elapsed = on_output_last_fired.map(|t| now.duration_since(t));
                            if elapsed.is_none_or(|e| e >= on_output_debounce) {
                                on_output_last_fired = Some(now);
                                hooks::fire_output_hook(id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), hook.run.clone(), line_clean.clone()).await;
                            }
                        }
                    }
                }
                // The process exited: settle any pending readiness waiter
                // and leave the loop.
                Some(result) = exit_rx.recv() => {
                    exit_status = Some(result);
                    debug!("daemon {id} process exited, exit_status: {exit_status:?}");
                    let _ = log_appender.flush().await;
                    if !ready_notified {
                        if let Some(tx) = ready_tx.take() {
                            // A clean exit before any readiness check counts
                            // as ready; anything else is a failure.
                            let is_success = exit_status.as_ref()
                                .and_then(|r| r.as_ref().ok())
                                .map(|s| s.success())
                                .unwrap_or(false);

                            if is_success {
                                debug!("daemon {id} exited successfully before ready check, sending success notification");
                                let _ = tx.send(Ok(()));
                            } else {
                                let exit_code = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .and_then(|s| s.code());
                                debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
                                let _ = tx.send(Err(exit_code));
                            }
                        }
                    } else {
                        debug!("daemon {id} was already marked ready, not sending notification");
                    }
                    break;
                },
                // Periodic HTTP readiness probe (only while not yet ready).
                _ = async {
                    if let Some(ref mut interval) = http_check_interval {
                        interval.tick().await;
                    } else {
                        std::future::pending::<()>().await;
                    }
                }, if !ready_notified && ready_http.is_some() => {
                    if let (Some(http), Some(client)) = (&ready_http, &http_client) {
                        match client.get(&http.url).send().await {
                            Ok(response) if http.accepts_status(response.status().as_u16()) => {
                                info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                ready_notified = true;
                                let _ = log_appender.flush().await;
                                if let Some(tx) = ready_tx.take() {
                                    let _ = tx.send(Ok(()));
                                }
                                fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                http_check_interval = None;
                                if !active_port_spawned && has_port_config {
                                    active_port_spawned = true;
                                    detect_and_store_active_port(id.clone(), daemon_pid);
                                }
                            }
                            Ok(response) => {
                                trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                            }
                            Err(e) => {
                                trace!("daemon {id} HTTP check failed: {e}");
                            }
                        }
                    }
                }
                // Periodic TCP readiness probe: ready once a connection to
                // 127.0.0.1:<port> succeeds.
                _ = async {
                    if let Some(ref mut interval) = port_check_interval {
                        interval.tick().await;
                    } else {
                        std::future::pending::<()>().await;
                    }
                }, if !ready_notified && ready_port.is_some() => {
                    if let Some(port) = ready_port {
                        match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                            Ok(_) => {
                                info!("daemon {id} ready: TCP port {port} is listening");
                                ready_notified = true;
                                let _ = log_appender.flush().await;
                                if let Some(tx) = ready_tx.take() {
                                    let _ = tx.send(Ok(()));
                                }
                                fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                port_check_interval = None;
                                if !active_port_spawned && has_port_config {
                                    active_port_spawned = true;
                                    detect_and_store_active_port(id.clone(), daemon_pid);
                                }
                            }
                            Err(_) => {
                                trace!("daemon {id} port check: port {port} not listening yet");
                            }
                        }
                    }
                }
                // Periodic command readiness probe: ready once the command,
                // run in the daemon's directory with output discarded,
                // exits successfully.
                _ = async {
                    if let Some(ref mut interval) = cmd_check_interval {
                        interval.tick().await;
                    } else {
                        std::future::pending::<()>().await;
                    }
                }, if !ready_notified && ready_cmd.is_some() => {
                    if let Some(ref cmd) = ready_cmd {
                        let mut command = Shell::default_for_platform().command(cmd);
                        command
                            .current_dir(&daemon_dir)
                            .stdout(std::process::Stdio::null())
                            .stderr(std::process::Stdio::null());
                        let result: std::io::Result<std::process::ExitStatus> = command.status().await;
                        match result {
                            Ok(status) if status.success() => {
                                info!("daemon {id} ready: readiness command succeeded");
                                ready_notified = true;
                                let _ = log_appender.flush().await;
                                if let Some(tx) = ready_tx.take() {
                                    let _ = tx.send(Ok(()));
                                }
                                fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                cmd_check_interval = None;
                                if !active_port_spawned && has_port_config {
                                    active_port_spawned = true;
                                    detect_and_store_active_port(id.clone(), daemon_pid);
                                }
                            }
                            Ok(_) => {
                                trace!("daemon {id} cmd check: command returned non-zero (not ready)");
                            }
                            Err(e) => {
                                trace!("daemon {id} cmd check failed: {e}");
                            }
                        }
                    }
                }
                // The readiness delay elapsed. It marks the daemon ready only
                // when no other readiness check is configured; either way the
                // timer is cleared and port detection is started if wanted.
                _ = async {
                    if let Some(ref mut timer) = delay_timer {
                        timer.await;
                    } else {
                        std::future::pending::<()>().await;
                    }
                } => {
                    if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
                        info!("daemon {id} ready: delay elapsed");
                        ready_notified = true;
                        let _ = log_appender.flush().await;
                        if let Some(tx) = ready_tx.take() {
                            let _ = tx.send(Ok(()));
                        }
                        fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                    }
                    delay_timer = None;
                    if !active_port_spawned && has_port_config {
                        active_port_spawned = true;
                        detect_and_store_active_port(id.clone(), daemon_pid);
                    }
                }
                // Periodic flush of the buffered log writer.
                _ = log_flush_interval.tick() => {
                    if let Err(e) = log_appender.flush().await {
                        error!("Failed to flush log for daemon {id}: {e}");
                    }
                }
            }
        }

        if let Err(e) = log_appender.flush().await {
            error!("Failed to final flush log for daemon {id}: {e}");
        }

        // The process is gone, so any recorded active port is stale.
        {
            let mut state_file = SUPERVISOR.state_file.lock().await;
            if let Some(d) = state_file.daemons.get_mut(&id) {
                d.active_port = None;
            }
            if let Err(e) = state_file.write() {
                debug!("Failed to write state after clearing active_port for {id}: {e}");
            }
        }

        // The loop only breaks after storing an exit status; the fallback
        // receive covers the case where none was stored.
        let exit_status = if let Some(status) = exit_status {
            status
        } else {
            match exit_rx.recv().await {
                Some(status) => status,
                None => {
                    warn!("daemon {id} exit channel closed without receiving status");
                    Err(std::io::Error::other("exit channel closed"))
                }
            }
        };
        let current_daemon = SUPERVISOR.get_daemon(&id).await;

        // Count this task as an active monitor for the rest of its run; the
        // guard decrements the counter and notifies `monitor_done` on every
        // exit path, including the early return below.
        // NOTE(review): presumably awaited by shutdown logic — confirm.
        SUPERVISOR
            .active_monitors
            .fetch_add(1, atomic::Ordering::Release);
        struct MonitorGuard;
        impl Drop for MonitorGuard {
            fn drop(&mut self) {
                SUPERVISOR
                    .active_monitors
                    .fetch_sub(1, atomic::Ordering::Release);
                SUPERVISOR.monitor_done.notify_waiters();
            }
        }
        let _monitor_guard = MonitorGuard;
        // Nothing to update if the daemon entry is gone, or if it now carries
        // a different pid while being neither stopped nor stopping (it then
        // appears to belong to a newer run).
        if current_daemon.is_none()
            || current_daemon.as_ref().is_some_and(|d| {
                d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
            })
        {
            return;
        }
        // `already_stopped`: the state already says Stopped.
        // `is_stopping`: the exit is attributed to a stop (status is Stopping
        // or Stopped).
        let already_stopped = current_daemon
            .as_ref()
            .is_some_and(|d| d.status.is_stopped());
        let is_stopping = already_stopped
            || current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopping());

        // Classify the exit as "stop", "exit" (clean) or "fail"; a missing
        // exit code is reported as -1.
        let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
            (Ok(status), true) => {
                (status.code().unwrap_or(-1), "stop")
            }
            (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
            (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
            (Err(_), true) => {
                (-1, "stop")
            }
            (Err(_), false) => (-1, "fail"),
        };

        // Skip the state write when the entry is already Stopped.
        if !already_stopped {
            if let Ok(status) = &exit_status {
                info!("daemon {id} exited with status {status}");
            }
            let (new_status, last_exit_success) = match exit_reason {
                "stop" | "exit" => (
                    DaemonStatus::Stopped,
                    exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
                ),
                _ => (DaemonStatus::Errored(exit_code), false),
            };
            if let Err(e) = SUPERVISOR
                .upsert_daemon(
                    UpsertDaemonOpts::builder(id.clone())
                        .set(|o| {
                            o.pid = None;
                            o.status = new_status;
                            o.last_exit_success = Some(last_exit_success);
                        })
                        .build(),
                )
                .await
            {
                error!("Failed to update daemon state for {id}: {e}");
            }
        }

        // Extra environment passed to the exit-related hooks.
        let hook_extra_env = vec![
            ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
            ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
        ];

        // Failure hooks fire only once the retry count has reached the
        // configured limit; earlier failed attempts fire nothing here.
        let hooks_to_fire: Vec<HookType> = match exit_reason {
            "stop" => vec![HookType::OnStop, HookType::OnExit],
            "exit" => vec![HookType::OnExit],
            _ if hook_retry_count >= hook_retry.count() => {
                vec![HookType::OnFail, HookType::OnExit]
            }
            _ => vec![],
        };

        for hook_type in hooks_to_fire {
            fire_hook(
                hook_type,
                id.clone(),
                daemon_dir.clone(),
                hook_retry_count,
                hook_daemon_env.clone(),
                hook_extra_env.clone(),
            )
            .await;
        }
    });

    // With `wait_ready`, block until the monitoring task reports readiness
    // or an early exit; otherwise report the start immediately.
    if let Some(ready_rx) = ready_rx {
        match ready_rx.await {
            Ok(Ok(())) => {
                info!("daemon {id} is ready");
                Ok(IpcResponse::DaemonReady { daemon })
            }
            Ok(Err(exit_code)) => {
                error!("daemon {id} failed before becoming ready");
                Ok(IpcResponse::DaemonFailedWithCode { exit_code })
            }
            // The sender was dropped without a message (e.g. the monitoring
            // task returned early because the log file could not be opened).
            Err(_) => {
                error!("readiness channel closed unexpectedly for daemon {id}");
                Ok(IpcResponse::DaemonStart { daemon })
            }
        }
    } else {
        Ok(IpcResponse::DaemonStart { daemon })
    }
}
1097
1098 pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
1100 let pitchfork_id = DaemonId::pitchfork();
1101 if *id == pitchfork_id {
1102 return Ok(IpcResponse::Error(
1103 "Cannot stop supervisor via stop command".into(),
1104 ));
1105 }
1106 info!("stopping daemon: {id}");
1107 if let Some(daemon) = self.get_daemon(id).await {
1108 trace!("daemon to stop: {daemon}");
1109 if let Some(pid) = daemon.pid {
1110 trace!("killing pid: {pid}");
1111 PROCS.refresh_pids(&[pid]);
1112 if PROCS.is_running(pid) {
1113 self.upsert_daemon(
1115 UpsertDaemonOpts::builder(id.clone())
1116 .set(|o| {
1117 o.pid = Some(pid);
1118 o.status = DaemonStatus::Stopping;
1119 })
1120 .build(),
1121 )
1122 .await?;
1123
1124 let stop_cfg = daemon.stop_signal.unwrap_or_default();
1127 let stop_signal: i32 = stop_cfg.signal.into();
1128 if let Err(e) = PROCS
1129 .kill_process_group_async(pid, stop_signal, stop_cfg.timeout)
1130 .await
1131 {
1132 debug!("failed to kill pid {pid}: {e}");
1133 PROCS.refresh_processes();
1135 if PROCS.is_running(pid) {
1136 debug!("failed to stop pid {pid}: process still running after kill");
1138 self.upsert_daemon(
1139 UpsertDaemonOpts::builder(id.clone())
1140 .set(|o| {
1141 o.pid = Some(pid); o.status = DaemonStatus::Running;
1143 })
1144 .build(),
1145 )
1146 .await?;
1147 return Ok(IpcResponse::DaemonStopFailed {
1148 error: format!(
1149 "process {pid} still running after kill attempt: {e}"
1150 ),
1151 });
1152 }
1153 }
1154
1155 self.upsert_daemon(
1160 UpsertDaemonOpts::builder(id.clone())
1161 .set(|o| {
1162 o.pid = None;
1163 o.status = DaemonStatus::Stopped;
1164 o.last_exit_success = Some(true); })
1166 .build(),
1167 )
1168 .await?;
1169 } else {
1170 debug!("pid {pid} not running, process may have exited unexpectedly");
1171 self.upsert_daemon(
1174 UpsertDaemonOpts::builder(id.clone())
1175 .set(|o| {
1176 o.pid = None;
1177 o.status = DaemonStatus::Stopped;
1178 })
1179 .build(),
1180 )
1181 .await?;
1182 return Ok(IpcResponse::DaemonWasNotRunning);
1183 }
1184 Ok(IpcResponse::Ok)
1185 } else {
1186 debug!("daemon {id} not running");
1187 Ok(IpcResponse::DaemonNotRunning)
1188 }
1189 } else {
1190 debug!("daemon {id} not found");
1191 Ok(IpcResponse::DaemonNotFound)
1192 }
1193 }
1194}
1195
1196#[cfg(unix)]
1197fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1198 let settings_user = settings().supervisor.user.trim();
1199 let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1200 let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1201 let configured = daemon_user.or(settings_user);
1202 let current_uid = nix::unistd::Uid::effective().as_raw();
1203 let current_gid = nix::unistd::Gid::effective().as_raw();
1204 resolve_run_identity(
1205 configured,
1206 current_uid,
1207 current_gid,
1208 std::env::var("SUDO_UID").ok().as_deref(),
1209 std::env::var("SUDO_GID").ok().as_deref(),
1210 )
1211}
1212
1213#[cfg(unix)]
1214fn resolve_run_identity(
1215 configured: Option<&str>,
1216 current_uid: u32,
1217 current_gid: u32,
1218 sudo_uid: Option<&str>,
1219 sudo_gid: Option<&str>,
1220) -> Result<RunIdentity> {
1221 let current_uid = nix::unistd::Uid::from_raw(current_uid);
1222 let current_gid = nix::unistd::Gid::from_raw(current_gid);
1223 if let Some(user) = configured {
1224 let identity = resolve_configured_user(user)?;
1225 ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1226 if identity.matches(current_uid, current_gid) {
1227 return Ok(RunIdentity::Inherit);
1228 }
1229 return Ok(identity);
1230 }
1231
1232 if current_uid.is_root()
1233 && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1234 {
1235 return Ok(identity);
1236 }
1237
1238 Ok(RunIdentity::Inherit)
1239}
1240
1241#[cfg(unix)]
1242fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1243 if user.chars().all(|c| c.is_ascii_digit()) {
1244 let uid = user
1245 .parse::<u32>()
1246 .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1247 let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1248 .into_diagnostic()?
1249 .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1250 return run_identity_from_user_record(user_record);
1251 }
1252
1253 let user_record = nix::unistd::User::from_name(user)
1254 .into_diagnostic()?
1255 .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1256 run_identity_from_user_record(user_record)
1257}
1258
1259#[cfg(unix)]
1260fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1261 let username = CString::new(user.name)
1262 .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1263 Ok(RunIdentity::Switch {
1264 uid: user.uid,
1265 gid: user.gid,
1266 username: Some(username),
1267 })
1268}
1269
1270#[cfg(unix)]
1271fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1272 RunIdentity::Switch {
1273 uid: nix::unistd::Uid::from_raw(uid),
1274 gid: nix::unistd::Gid::from_raw(gid),
1275 username,
1276 }
1277}
1278
1279#[cfg(unix)]
1280fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1281 let uid = sudo_uid?.parse::<u32>().ok()?;
1282 let gid = sudo_gid?.parse::<u32>().ok()?;
1283 let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1284 .ok()
1285 .flatten()
1286 .and_then(|u| CString::new(u.name).ok());
1287 Some(run_identity_from_raw_ids(uid, gid, username))
1288}
1289
1290#[cfg(unix)]
1291fn ensure_can_use_identity(
1292 configured_user: &str,
1293 identity: &RunIdentity,
1294 current_uid: nix::unistd::Uid,
1295 current_gid: nix::unistd::Gid,
1296) -> Result<()> {
1297 let RunIdentity::Switch { uid, gid, .. } = identity else {
1298 return Ok(());
1299 };
1300 if *uid == current_uid && *gid == current_gid {
1301 return Ok(());
1302 }
1303 if current_uid.is_root() {
1304 return Ok(());
1305 }
1306 Err(miette::miette!(
1307 "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1308 configured_user,
1309 current_uid.as_raw(),
1310 current_gid.as_raw(),
1311 uid.as_raw(),
1312 gid.as_raw()
1313 ))
1314}
1315
1316#[cfg(unix)]
1317fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1318 let RunIdentity::Switch { uid, gid, username } = identity else {
1319 return Ok(());
1320 };
1321 if let Some(username) = username {
1322 initgroups_for_user(username, *gid)?;
1323 } else {
1324 setgroups_to_primary(*gid)?;
1325 }
1326 nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1327 nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1328 Ok(())
1329}
1330
1331#[cfg(unix)]
1332impl RunIdentity {
1333 fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1334 matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1335 }
1336}
1337
1338#[cfg(unix)]
1339fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1340 let groups = [gid.as_raw() as libc::gid_t];
1341 #[cfg(any(target_os = "linux", target_os = "android"))]
1342 let group_count = groups.len();
1343 #[cfg(not(any(target_os = "linux", target_os = "android")))]
1344 let group_count = groups.len() as libc::c_int;
1345 let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1346 if rc == -1 {
1347 Err(std::io::Error::last_os_error())
1348 } else {
1349 Ok(())
1350 }
1351}
1352
1353#[cfg(unix)]
1354fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1355 let gid = gid.as_raw();
1356 #[cfg(any(
1357 target_os = "macos",
1358 target_os = "ios",
1359 target_os = "tvos",
1360 target_os = "watchos"
1361 ))]
1362 let base_gid = i32::try_from(gid)
1363 .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1364
1365 #[cfg(not(any(
1366 target_os = "macos",
1367 target_os = "ios",
1368 target_os = "tvos",
1369 target_os = "watchos"
1370 )))]
1371 let base_gid = gid as libc::gid_t;
1372
1373 let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1376 if rc == -1 {
1377 Err(std::io::Error::last_os_error())
1378 } else {
1379 Ok(())
1380 }
1381}
1382
1383#[cfg(unix)]
1384fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1385 std::io::Error::from_raw_os_error(err as i32)
1386}
1387
1388async fn check_ports_available(
1395 expected_ports: &[u16],
1396 auto_bump: bool,
1397 max_attempts: u32,
1398) -> Result<Vec<u16>> {
1399 if expected_ports.is_empty() {
1400 return Ok(Vec::new());
1401 }
1402
1403 for bump_offset in 0..=max_attempts {
1404 let candidate_ports: Vec<u16> = expected_ports
1406 .iter()
1407 .map(|&p| p.wrapping_add(bump_offset as u16))
1408 .collect();
1409
1410 let mut all_available = true;
1412 let mut conflicting_port = None;
1413
1414 for &port in &candidate_ports {
1415 if port == 0 {
1418 continue;
1419 }
1420
1421 if is_port_in_use(port).await {
1435 all_available = false;
1436 conflicting_port = Some(port);
1437 break;
1438 }
1439 }
1440
1441 if all_available {
1442 if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1446 return Err(PortError::NoAvailablePort {
1447 start_port: expected_ports[0],
1448 attempts: bump_offset + 1,
1449 }
1450 .into());
1451 }
1452 if bump_offset > 0 {
1453 info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1454 }
1455 return Ok(candidate_ports);
1456 }
1457
1458 if bump_offset == 0 && !auto_bump {
1460 if let Some(port) = conflicting_port {
1461 let (pid, process) = identify_port_owner(port).await;
1462 return Err(PortError::InUse { port, process, pid }.into());
1463 }
1464 }
1465 }
1466
1467 Err(PortError::NoAvailablePort {
1469 start_port: expected_ports[0],
1470 attempts: max_attempts + 1,
1471 }
1472 .into())
1473}
1474
1475async fn is_port_in_use(port: u16) -> bool {
1481 tokio::task::spawn_blocking(move || {
1482 for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1483 match std::net::TcpListener::bind((addr, port)) {
1484 Ok(listener) => drop(listener),
1485 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1486 Err(_) => continue,
1487 }
1488 }
1489 false
1490 })
1491 .await
1492 .unwrap_or(false)
1493}
1494
1495async fn identify_port_owner(port: u16) -> (u32, String) {
1500 tokio::task::spawn_blocking(move || {
1501 listeners::get_all()
1502 .ok()
1503 .and_then(|list| {
1504 list.into_iter()
1505 .find(|l| l.socket.port() == port)
1506 .map(|l| (l.process.pid, l.process.name))
1507 })
1508 .unwrap_or((0, "unknown".to_string()))
1509 })
1510 .await
1511 .unwrap_or((0, "unknown".to_string()))
1512}
1513
1514async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1519 if !is_port_in_use(port).await {
1520 return None;
1521 }
1522 Some(identify_port_owner(port).await)
1523}
1524
1525fn detect_and_store_active_port(id: DaemonId, pid: u32) {
1540 tokio::spawn(async move {
1541 for delay_ms in [500u64, 1000, 2000, 4000] {
1545 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1546
1547 let expected_port: Option<u16> = {
1550 let state_file = SUPERVISOR.state_file.lock().await;
1551 match state_file.daemons.get(&id) {
1552 Some(d) if d.pid.is_none() => {
1553 debug!("daemon {id}: aborting active_port detection — process exited");
1554 return;
1555 }
1556 Some(d) => d
1557 .port
1558 .as_ref()
1559 .and_then(|p| p.expect.first().copied())
1560 .filter(|&p| p > 0),
1561 None => None,
1562 }
1563 };
1564
1565 let active_port = tokio::task::spawn_blocking(move || {
1566 let listeners = listeners::get_all().ok()?;
1567 let process_ports: Vec<u16> = listeners
1568 .into_iter()
1569 .filter(|listener| listener.process.pid == pid)
1570 .map(|listener| listener.socket.port())
1571 .filter(|&port| port > 0)
1572 .collect();
1573
1574 if process_ports.is_empty() {
1575 return None;
1576 }
1577
1578 if let Some(ep) = expected_port {
1581 if process_ports.contains(&ep) {
1582 return Some(ep);
1583 }
1584 }
1585
1586 process_ports.into_iter().next()
1592 })
1593 .await
1594 .ok()
1595 .flatten();
1596
1597 if let Some(port) = active_port {
1598 debug!("daemon {id} active_port detected: {port}");
1599 let mut state_file = SUPERVISOR.state_file.lock().await;
1600 if let Some(d) = state_file.daemons.get_mut(&id) {
1601 if d.pid == Some(pid) {
1605 d.active_port = Some(port);
1606 } else {
1607 debug!(
1608 "daemon {id}: skipping active_port write — PID mismatch \
1609 (expected {pid}, current {:?})",
1610 d.pid
1611 );
1612 return;
1613 }
1614 }
1615 if let Err(e) = state_file.write() {
1616 debug!("Failed to write state after detecting active_port for {id}: {e}");
1617 }
1618 return;
1619 }
1620
1621 debug!("daemon {id}: no active port detected for pid {pid} (will retry)");
1622 }
1623
1624 debug!("daemon {id}: active port detection exhausted all retries for pid {pid}");
1625 });
1626}
1627
1628fn is_daemon_slug_target(id: &DaemonId) -> bool {
1636 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1640 slugs.iter().any(|(slug, entry)| {
1641 let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1642 id.name() == daemon_name
1643 })
1644}
1645
#[cfg(all(test, unix))]
mod tests {
    use super::*;

    /// Splits a `Switch` identity into its fields; any other variant fails the test.
    fn expect_switch(
        identity: RunIdentity,
    ) -> (
        nix::unistd::Uid,
        nix::unistd::Gid,
        Option<std::ffi::CString>,
    ) {
        match identity {
            RunIdentity::Switch { uid, gid, username } => (uid, gid, username),
            RunIdentity::Inherit => panic!("expected identity switch"),
        }
    }

    /// Views an optional C string as UTF-8 text, when it is valid UTF-8.
    fn as_text(name: &Option<std::ffi::CString>) -> Option<&str> {
        name.as_deref().and_then(|raw| raw.to_str().ok())
    }

    // No configured user and no sudo hints: the supervisor's identity is kept.
    #[test]
    fn test_resolve_run_identity_empty_without_sudo() {
        let resolved = resolve_run_identity(None, 501, 20, None, None).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    // A root supervisor with sudo hints drops to the invoking user's ids.
    #[test]
    fn test_resolve_run_identity_sudo_fallback() {
        let resolved = resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap();
        let (uid, gid, _) = expect_switch(resolved);
        assert_eq!(uid.as_raw(), 501);
        assert_eq!(gid.as_raw(), 20);
    }

    // Sudo hints are ignored when the supervisor is not root.
    #[test]
    fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
        let resolved = resolve_run_identity(None, 501, 20, Some("0"), Some("0")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    #[test]
    fn test_resolve_configured_user_root_name() {
        let (uid, _, username) = expect_switch(resolve_configured_user("root").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(as_text(&username), Some("root"));
    }

    // An all-digit value is treated as a UID and still yields the user name.
    #[test]
    fn test_resolve_configured_user_root_uid() {
        let (uid, _, username) = expect_switch(resolve_configured_user("0").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(as_text(&username), Some("root"));
    }

    #[test]
    fn test_resolve_configured_user_missing_user_fails() {
        let message = resolve_configured_user("pitchfork-user-that-should-not-exist")
            .unwrap_err()
            .to_string();
        assert!(message.contains("does not exist"));
    }

    // A non-root supervisor cannot switch to a different user.
    #[test]
    fn test_resolve_run_identity_requires_root_for_user_switch() {
        let message = resolve_run_identity(Some("root"), 501, 20, None, None)
            .unwrap_err()
            .to_string();
        assert!(message.contains("Restart the supervisor with sudo"));
    }

    // Configuring the identity the supervisor already has is a no-op,
    // and the configured user wins over the sudo hints.
    #[test]
    fn test_resolve_run_identity_same_user_is_noop() {
        let resolved = resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }
}
1720
1721fn inject_proxy_env(cmd: &mut tokio::process::Command, slug: &Option<String>) {
1730 let s = crate::settings::settings();
1731 let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1732
1733 if should_force_loopback_host(slug) && !lan_enabled {
1734 cmd.env("HOST", "127.0.0.1");
1737 }
1738
1739 if let Some(url) = build_pitchfork_url(slug, s) {
1741 cmd.env("PITCHFORK_URL", &url);
1742 }
1743
1744 if s.proxy.enable && s.proxy.https {
1746 let ca_path = if s.proxy.tls_cert.is_empty() {
1747 crate::env::PITCHFORK_STATE_DIR.join("proxy").join("ca.pem")
1748 } else {
1749 std::path::PathBuf::from(&s.proxy.tls_cert)
1750 };
1751 if ca_path.exists() {
1752 cmd.env("NODE_EXTRA_CA_CERTS", ca_path.to_string_lossy().to_string());
1753 }
1754 }
1755
1756 if s.proxy.enable {
1758 let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1759 cmd.env("__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS", format!(".{tld}"));
1760 }
1761
1762 if lan_enabled {
1764 cmd.env("PITCHFORK_LAN", "1");
1765 }
1766}
1767
1768fn should_force_loopback_host(slug: &Option<String>) -> bool {
1769 let Some(slug) = slug.as_deref() else {
1770 return false;
1771 };
1772
1773 let s = crate::settings::settings();
1774 if !s.proxy.enable {
1775 return false;
1776 }
1777
1778 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1779 slugs.contains_key(slug)
1780}
1781
1782fn build_pitchfork_url(slug: &Option<String>, s: &crate::settings::Settings) -> Option<String> {
1786 let slug = slug.as_ref()?;
1787 if !s.proxy.enable {
1788 return None;
1789 }
1790 let scheme = if s.proxy.https { "https" } else { "http" };
1791 let port = u16::try_from(s.proxy.port).ok().filter(|&p| p > 0)?;
1792 let port_suffix = if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
1793 String::new()
1794 } else {
1795 format!(":{port}")
1796 };
1797 let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
1798 let tld = if lan_enabled { "local" } else { &s.proxy.tld };
1799 Some(format!("{scheme}://{slug}.{tld}{port_suffix}",))
1800}