1use super::hooks::{self, HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::IntoDiagnostic;
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22#[cfg(unix)]
23use std::ffi::CString;
24use std::iter::once;
25use std::sync::atomic;
26use std::time::Duration;
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
28use tokio::select;
29use tokio::sync::oneshot;
30use tokio::time;
31
/// Process-wide cache of compiled regexes, keyed by their pattern string.
///
/// Guarded by a `std::sync::Mutex`; `get_or_compile_regex` is the accessor
/// visible in this file.
static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
35
/// The uid/gid a daemon process should run under (unix only).
///
/// Produced by `resolve_effective_run_identity` and handed to
/// `apply_run_identity` inside the child's `pre_exec` hook.
#[cfg(unix)]
#[derive(Clone, Debug, PartialEq, Eq)]
enum RunIdentity {
    /// No specific uid/gid was selected for the child.
    Inherit,
    /// A specific uid/gid was selected for the child.
    Switch {
        uid: nix::unistd::Uid,
        gid: nix::unistd::Gid,
        /// Account name, when one could be resolved for `uid`; `None` otherwise.
        username: Option<CString>,
    },
}
46
47pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
49 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
50 if let Some(re) = cache.get(pattern) {
51 return Some(re.clone());
52 }
53 match Regex::new(pattern) {
54 Ok(re) => {
55 cache.insert(pattern.to_string(), re.clone());
56 Some(re)
57 }
58 Err(e) => {
59 error!("invalid regex pattern '{pattern}': {e}");
60 None
61 }
62 }
63}
64
65impl Supervisor {
66 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
68 let id = &opts.id;
69 let cmd = opts.cmd.clone();
70
71 {
73 let mut pending = self.pending_autostops.lock().await;
74 if pending.remove(id).is_some() {
75 info!("cleared pending autostop for {id} (daemon starting)");
76 }
77 }
78
79 let daemon = self.get_daemon(id).await;
80 if let Some(daemon) = daemon {
81 if !daemon.status.is_stopping()
84 && !daemon.status.is_stopped()
85 && let Some(pid) = daemon.pid
86 {
87 if opts.force {
88 self.stop(id).await?;
89 info!("run: stop completed for daemon {id}");
90 } else {
91 warn!("daemon {id} already running with pid {pid}");
92 return Ok(IpcResponse::DaemonAlreadyRunning);
93 }
94 }
95 }
96
97 if opts.wait_ready && opts.retry.count() > 0 {
99 let max_attempts = opts.retry.count().saturating_add(1);
101 for attempt in 0..max_attempts {
102 let mut retry_opts = opts.clone();
103 retry_opts.retry_count = attempt;
104 retry_opts.cmd = cmd.clone();
105
106 let result = self.run_once(retry_opts).await?;
107
108 match result {
109 IpcResponse::DaemonReady { daemon } => {
110 return Ok(IpcResponse::DaemonReady { daemon });
111 }
112 IpcResponse::DaemonFailedWithCode { exit_code } => {
113 if attempt < opts.retry.count() {
114 let backoff_secs = 2u64.saturating_pow(attempt).min(3600);
115 info!(
116 "daemon {id} failed (attempt {}/{}), retrying in {}s",
117 attempt + 1,
118 max_attempts,
119 backoff_secs
120 );
121 fire_hook(
122 HookType::OnRetry,
123 id.clone(),
124 opts.dir.0.clone(),
125 attempt + 1,
126 opts.env.clone(),
127 vec![],
128 )
129 .await;
130 time::sleep(Duration::from_secs(backoff_secs)).await;
131 continue;
132 } else {
133 info!("daemon {id} failed after {max_attempts} attempts");
134 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
135 }
136 }
137 other => return Ok(other),
138 }
139 }
140 }
141
142 self.run_once(opts).await
144 }
145
    /// Performs a single start attempt for the daemon described by `opts`.
    ///
    /// Steps, in order:
    /// 1. Resolve the expected ports (or probe `ready_port` for a conflict when
    ///    no ports are expected) and derive the effective readiness port.
    /// 2. Build the `sh -c "exec ..."` command line, optionally wrapped with
    ///    `mise x --`, and prepare stdio (PTY on unix when `opts.pty` is true
    ///    and allocation succeeds, pipes otherwise), environment and, on unix,
    ///    the `pre_exec` hook.
    /// 3. Spawn the child and record it as `Running` via `upsert_daemon`.
    /// 4. Spawn a background task that logs output, runs the readiness checks,
    ///    waits for exit, updates the stored state and fires hooks.
    ///
    /// Returns:
    /// * `PortConflict` / `NoAvailablePort` / `DaemonFailed` when port checks,
    ///   run-identity resolution or pid capture fail before monitoring starts;
    /// * `DaemonReady` / `DaemonFailedWithCode` when `opts.wait_ready` is set
    ///   and the monitor reports readiness or an early failing exit;
    /// * `DaemonStart` when not waiting, or when the readiness channel closes
    ///   without a message.
    ///
    /// `Err` is returned for failures propagated with `?` (log directory
    /// creation, PTY fd duplication, spawning, state upsert).
    pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        // Keep an untouched copy for the state record; `cmd` itself gets wrapped below.
        let original_cmd = opts.cmd.clone();
        let cmd = opts.cmd;

        // A readiness channel exists only when the caller wants to wait for readiness.
        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let expected_ports = opts
            .port
            .as_ref()
            .map(|p| p.expect.clone())
            .unwrap_or_default();
        let (resolved_ports, effective_ready_port) = if !expected_ports.is_empty() {
            // `expected_ports` is non-empty only when `opts.port` is `Some`, so this cannot panic.
            let port_cfg = opts.port.as_ref().unwrap();
            match check_ports_available(
                &expected_ports,
                port_cfg.auto_bump(),
                port_cfg.max_bump_attempts(),
            )
            .await
            {
                Ok(resolved) => {
                    let ready_port = if let Some(configured_port) = opts.ready_port {
                        // Offset between the first resolved port and the first expected port.
                        let bump_offset = resolved
                            .first()
                            .unwrap_or(&0)
                            .saturating_sub(*expected_ports.first().unwrap_or(&0));
                        // Shift the configured ready port by the same offset when it is one of
                        // the expected ports; on overflow keep the configured value.
                        if expected_ports.contains(&configured_port) && bump_offset > 0 {
                            configured_port
                                .checked_add(bump_offset)
                                .or(Some(configured_port))
                        } else {
                            Some(configured_port)
                        }
                    } else if opts.ready_output.is_none()
                        && opts.ready_http.is_none()
                        && opts.ready_cmd.is_none()
                        && opts.ready_delay.is_none()
                    {
                        // No readiness check configured at all: use the first resolved port
                        // (unless it is 0) as the readiness port.
                        resolved.first().copied().filter(|&p| p != 0)
                    } else {
                        None
                    };
                    info!("daemon {id}: ports {expected_ports:?} resolved to {resolved:?}");
                    (resolved, ready_port)
                }
                Err(e) => {
                    error!("daemon {id}: port check failed: {e}");
                    // Known port errors map onto dedicated IPC responses.
                    if let Some(port_error) = e.downcast_ref::<PortError>() {
                        match port_error {
                            PortError::InUse { port, process, pid } => {
                                return Ok(IpcResponse::PortConflict {
                                    port: *port,
                                    process: process.clone(),
                                    pid: *pid,
                                });
                            }
                            PortError::NoAvailablePort {
                                start_port,
                                attempts,
                            } => {
                                return Ok(IpcResponse::NoAvailablePort {
                                    start_port: *start_port,
                                    attempts: *attempts,
                                });
                            }
                        }
                    }
                    return Ok(IpcResponse::DaemonFailed {
                        error: e.to_string(),
                    });
                }
            }
        } else {
            // No expected ports: still refuse to start if a non-zero ready port is already taken.
            if let Some(port) = opts.ready_port {
                if port > 0 {
                    if let Some((pid, process)) = detect_port_conflict(port).await {
                        return Ok(IpcResponse::PortConflict { port, process, pid });
                    }
                }
            }
            (Vec::new(), opts.ready_port)
        };

        // The per-daemon `mise` option wins over the global setting. The command is always
        // prefixed with `exec`, and with `<mise> x --` when mise wrapping applies.
        let cmd: Vec<String> = if opts.mise.unwrap_or(settings().general.mise) {
            match settings().resolve_mise_bin() {
                Some(mise_bin) => {
                    let mise_bin_str = mise_bin.to_string_lossy().to_string();
                    info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
                    once("exec".to_string())
                        .chain(once(mise_bin_str))
                        .chain(once("x".to_string()))
                        .chain(once("--".to_string()))
                        .chain(cmd)
                        .collect_vec()
                }
                None => {
                    warn!("daemon {id}: mise=true but mise binary not found, running without mise");
                    once("exec".to_string()).chain(cmd).collect_vec()
                }
            }
        } else {
            once("exec".to_string()).chain(cmd).collect_vec()
        };
        // Arguments for `sh`: `-c` plus the shell-quoted command line.
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = id.log_path();
        if let Some(parent) = log_path.parent() {
            xx::file::mkdirp(parent)?;
        }
        // Resolve the uid/gid to run as before spawning; a failure is reported as DaemonFailed.
        #[cfg(unix)]
        let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
            Ok(identity) => identity,
            Err(e) => {
                return Ok(IpcResponse::DaemonFailed {
                    error: e.to_string(),
                });
            }
        };
        info!("run: spawning daemon {id} with args: {args:?}");

        // PTY allocation is best-effort: on failure the daemon falls back to pipes.
        #[cfg(unix)]
        let pty_pair = if opts.pty.unwrap_or(false) {
            match super::pty::openpty() {
                Ok(pair) => {
                    info!("daemon {id}: allocated PTY (pty = true)");
                    Some(pair)
                }
                Err(e) => {
                    warn!("daemon {id}: failed to allocate PTY, falling back to pipes: {e}");
                    None
                }
            }
        } else {
            None
        };

        // From here on `cmd` is the process builder, shadowing the argument vector.
        let mut cmd = tokio::process::Command::new("sh");

        // With a PTY, stdin/stdout/stderr are all clones of the slave side; otherwise
        // stdout/stderr are piped.
        #[cfg(unix)]
        if let Some(ref pair) = pty_pair {
            let slave_file = std::fs::File::from(
                pair.slave
                    .try_clone()
                    .map_err(|e| miette::miette!("failed to dup slave PTY fd: {e}"))?,
            );
            cmd.stdin(std::process::Stdio::from(slave_file.try_clone().map_err(
                |e| miette::miette!("failed to clone slave PTY fd for stdin: {e}"),
            )?));
            cmd.stdout(std::process::Stdio::from(slave_file.try_clone().map_err(
                |e| miette::miette!("failed to clone slave PTY fd for stdout: {e}"),
            )?));
            cmd.stderr(std::process::Stdio::from(slave_file));
        } else {
            cmd.stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped());
        }

        #[cfg(not(unix))]
        {
            cmd.stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped());
        }

        cmd.args(&args).current_dir(&opts.dir);

        // Without a PTY the child gets no stdin.
        #[cfg(unix)]
        if pty_pair.is_none() {
            cmd.stdin(std::process::Stdio::null());
        }

        #[cfg(not(unix))]
        cmd.stdin(std::process::Stdio::null());

        // Use the recorded ORIGINAL_PATH for the child's PATH when one is available.
        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        // Daemon-specific environment is applied after PATH.
        if let Some(ref env_vars) = opts.env {
            cmd.envs(env_vars);
        }

        // Variables describing the daemon itself, set after the user-provided environment.
        cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
        cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
        cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());

        // PORT carries the first resolved port; PORT0, PORT1, ... carry each one by index.
        if !resolved_ports.is_empty() {
            cmd.env("PORT", resolved_ports[0].to_string());
            for (i, port) in resolved_ports.iter().enumerate() {
                cmd.env(format!("PORT{i}"), port.to_string());
            }
        }

        inject_proxy_env(&mut cmd, &opts.slug);

        #[cfg(unix)]
        {
            let run_identity = run_identity.clone();
            let use_pty = pty_pair.is_some();
            // NOTE(review): `pre_exec` is an unsafe API; the closure runs in the child between
            // fork and exec. Confirm everything called here (including `eprintln!`) is
            // acceptable in that context.
            unsafe {
                cmd.pre_exec(move || {
                    // Start a new session for the child.
                    nix::unistd::setsid().map_err(nix_to_io_error)?;

                    if use_pty {
                        // Request fd 0 as the controlling terminal; a failure is not fatal and
                        // is only reported on Linux.
                        let ret = libc::ioctl(0, libc::TIOCSCTTY as libc::c_ulong, 0);
                        if ret < 0 {
                            #[cfg(target_os = "linux")]
                            eprintln!(
                                "pitchfork: TIOCSCTTY failed: {}",
                                std::io::Error::last_os_error()
                            );
                        }
                    }

                    apply_run_identity(&run_identity)?;
                    Ok(())
                });
            }
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = match child.id() {
            Some(p) => p,
            None => {
                warn!("Daemon {id} exited before PID could be captured");
                return Ok(IpcResponse::DaemonFailed {
                    error: "Process exited immediately".to_string(),
                });
            }
        };
        info!("started daemon {id} with pid {pid}");
        // Persist the full configuration of this run alongside the new pid.
        let daemon = self
            .upsert_daemon(
                UpsertDaemonOpts::builder(id.clone())
                    .set(|o| {
                        o.pid = Some(pid);
                        o.status = DaemonStatus::Running;
                        o.shell_pid = opts.shell_pid;
                        o.dir = Some(opts.dir.0.clone());
                        o.cmd = Some(original_cmd);
                        o.autostop = opts.autostop;
                        o.cron_schedule = opts.cron_schedule.clone();
                        o.cron_retrigger = opts.cron_retrigger;
                        o.retry = Some(opts.retry);
                        o.retry_count = Some(opts.retry_count);
                        o.ready_delay = opts.ready_delay;
                        o.ready_output = opts.ready_output.clone();
                        o.ready_http = opts.ready_http.clone();
                        o.ready_port = effective_ready_port;
                        o.ready_cmd = opts.ready_cmd.clone();
                        o.port = crate::config_types::PortConfig::from_parts(
                            expected_ports,
                            opts.port.as_ref().map(|p| p.bump).unwrap_or_default(),
                        );
                        o.resolved_port = resolved_ports;
                        o.depends = Some(opts.depends.clone());
                        o.env = opts.env.clone();
                        o.watch = Some(opts.watch.clone());
                        o.watch_mode = Some(opts.watch_mode);
                        o.watch_base_dir = opts.watch_base_dir.clone();
                        o.mise = opts.mise;
                        o.user = opts.user.clone();
                        o.memory_limit = opts.memory_limit;
                        o.cpu_limit = opts.cpu_limit;
                        o.stop_signal = opts.stop_signal;
                        o.pty = opts.pty;
                    })
                    .build(),
            )
            .await?;

        // Owned copies of everything the monitoring task needs.
        let id_clone = id.clone();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();
        let ready_http = opts.ready_http.clone();
        let ready_port = effective_ready_port;
        let ready_cmd = opts.ready_cmd.clone();
        let daemon_dir = opts.dir.0.clone();
        let hook_retry_count = opts.retry_count;
        let hook_retry = opts.retry;
        let hook_daemon_env = opts.env.clone();
        let on_output_hook = opts.on_output_hook.clone();
        // True when ports are expected, or when the proxy is enabled and this daemon is a
        // slug target.
        let has_port_config = opts.port.as_ref().is_some_and(|p| !p.expect.is_empty())
            || (settings().proxy.enable && is_daemon_slug_target(id));
        let daemon_pid = pid;

        // With a PTY, output is read line by line from the master side.
        #[cfg(unix)]
        let pty_reader = pty_pair.map(|p| {
            tokio::io::BufReader::new(tokio::fs::File::from_std(std::fs::File::from(p.master)))
                .lines()
        });
        #[cfg(not(unix))]
        let pty_reader: Option<tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>> = None;
        let stdout_reader = if pty_reader.is_none() {
            child
                .stdout
                .take()
                .map(|s| tokio::io::BufReader::new(s).lines())
        } else {
            None
        };
        let stderr_reader = if pty_reader.is_none() {
            child
                .stderr
                .take()
                .map(|s| tokio::io::BufReader::new(s).lines())
        } else {
            None
        };

        // Missing pipes are only logged; the daemon keeps running.
        if pty_reader.is_none() && (stdout_reader.is_none() || stderr_reader.is_none()) {
            error!("Failed to capture stdout/stderr for daemon {id}");
        }

        // Monitoring task: logs output, runs readiness checks, waits for exit, updates state
        // and fires hooks.
        tokio::spawn(async move {
            let id = id_clone;

            // All output lines are funnelled through one bounded channel.
            let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<String>(256);

            if let Some(mut reader) = pty_reader {
                tokio::spawn(async move {
                    while let Ok(Some(mut line)) = reader.next_line().await {
                        // Drop a trailing carriage return left on the line.
                        if line.ends_with('\r') {
                            line.pop();
                        }
                        if output_tx.send(line).await.is_err() {
                            break;
                        }
                    }
                });
            } else {
                if let Some(mut stdout) = stdout_reader {
                    let tx = output_tx.clone();
                    tokio::spawn(async move {
                        while let Ok(Some(line)) = stdout.next_line().await {
                            if tx.send(line).await.is_err() {
                                break;
                            }
                        }
                    });
                }
                if let Some(mut stderr) = stderr_reader {
                    let tx = output_tx.clone();
                    tokio::spawn(async move {
                        while let Ok(Some(line)) = stderr.next_line().await {
                            if tx.send(line).await.is_err() {
                                break;
                            }
                        }
                    });
                }
                // Only the reader tasks keep sender clones alive from here on.
                drop(output_tx);
            }
            let log_file = match tokio::fs::File::options()
                .append(true)
                .create(true)
                .open(&log_path)
                .await
            {
                Ok(f) => f,
                Err(e) => {
                    error!("Failed to open log file for daemon {id}: {e}");
                    return;
                }
            };
            let mut log_appender = BufWriter::new(log_file);

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            // Log lines are "<timestamp> <id> <line>"; the id is not repeated when the line
            // already starts with it.
            let format_line = |line: String| {
                let line_for_log = line;
                if line_for_log.starts_with(&format!("{id} ")) {
                    format!("{} {line_for_log}\n", now())
                } else {
                    format!("{} {id} {line_for_log}\n", now())
                }
            };

            // Readiness bookkeeping shared by every branch of the select loop below.
            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
            let mut active_port_spawned = false;

            // An on-output hook that fails validation is logged and dropped.
            let on_output_hook = match on_output_hook {
                Some(ref hook) => match hook.validate(id.name()) {
                    Ok(()) => on_output_hook,
                    Err(e) => {
                        error!("{e}");
                        None
                    }
                },
                None => None,
            };

            let on_output_pattern: Option<regex::Regex> = on_output_hook
                .as_ref()
                .and_then(|h| h.regex.as_deref().and_then(get_or_compile_regex));
            let on_output_debounce = on_output_hook
                .as_ref()
                .map(|h| h.debounce_duration())
                .unwrap_or(Duration::from_millis(1000));
            let mut on_output_last_fired: Option<std::time::Instant> = None;

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            let s = settings();
            let ready_check_interval = s.supervisor_ready_check_interval();
            let http_client_timeout = s.supervisor_http_client_timeout();
            let log_flush_interval_duration = s.supervisor_log_flush_interval();

            // Each periodic check gets an interval only when that check is configured.
            let mut http_check_interval = ready_http
                .as_ref()
                .map(|_| tokio::time::interval(ready_check_interval));
            let http_client = ready_http.as_ref().map(|_| {
                reqwest::Client::builder()
                    .timeout(http_client_timeout)
                    .build()
                    .unwrap_or_default()
            });

            let mut port_check_interval =
                ready_port.map(|_| tokio::time::interval(ready_check_interval));

            let mut cmd_check_interval = ready_cmd
                .as_ref()
                .map(|_| tokio::time::interval(ready_check_interval));

            let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);

            // The child is awaited in its own task; the result arrives over this channel.
            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            let child_pid = child.id().unwrap_or(0);
            tokio::spawn(async move {
                let result = child.wait().await;
                // On non-Linux unix, an ECHILD from wait() is answered from REAPED_STATUSES
                // when a status was stashed there for this pid.
                #[cfg(all(unix, not(target_os = "linux")))]
                let result = match &result {
                    Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
                        if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
                            warn!(
                                "daemon pid {child_pid} wait() got ECHILD; \
                                 recovered exit code {code} from zombie reaper"
                            );
                            use std::os::unix::process::ExitStatusExt;
                            // Rebuild a raw wait status from the stashed code.
                            // NOTE(review): negative codes are presumably signal numbers
                            // stored negated — confirm against the reaper that fills
                            // REAPED_STATUSES.
                            if code >= 0 {
                                Ok(std::process::ExitStatus::from_raw(code << 8))
                            } else {
                                Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
                            }
                        } else {
                            warn!(
                                "daemon pid {child_pid} wait() got ECHILD but no \
                                 stashed status found; reporting as error"
                            );
                            result
                        }
                    }
                    _ => result,
                };
                debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
                let _ = exit_tx.send(result).await;
            });

            #[allow(unused_assignments)]
            let mut exit_status = None;

            // With port config but no readiness check of any kind, start active-port
            // detection right away.
            if has_port_config
                && ready_pattern.is_none()
                && ready_http.is_none()
                && ready_port.is_none()
                && ready_cmd.is_none()
                && delay_timer.is_none()
            {
                active_port_spawned = true;
                detect_and_store_active_port(id.clone(), daemon_pid);
            }

            // Main loop: runs until the child's exit status arrives.
            loop {
                select! {
                    // A line of daemon output: log it, test readiness, maybe fire the output hook.
                    Some(line) = output_rx.recv() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("output: {id} {formatted}");

                        // Matching is done on the line with ANSI escape codes stripped.
                        let line_clean = console::strip_ansi_codes(&line).to_string();

                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line_clean)
                        {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                            if !active_port_spawned && has_port_config {
                                active_port_spawned = true;
                                detect_and_store_active_port(id.clone(), daemon_pid);
                            }
                        }

                        if let Some(ref hook) = on_output_hook {
                            // A substring filter takes precedence over the regex; with
                            // neither, every line matches.
                            let matched = match (&hook.filter, &on_output_pattern) {
                                (Some(substr), _) => line_clean.contains(substr.as_str()),
                                (None, Some(re)) => re.is_match(&line_clean),
                                (None, None) => true,
                            };
                            if matched {
                                // Fire at most once per debounce window.
                                let now = std::time::Instant::now();
                                let elapsed = on_output_last_fired.map(|t| now.duration_since(t));
                                if elapsed.is_none_or(|e| e >= on_output_debounce) {
                                    on_output_last_fired = Some(now);
                                    hooks::fire_output_hook(id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), hook.run.clone(), line_clean.clone()).await;
                                }
                            }
                        }
                    }
                    // The child exited: report to a still-waiting caller and leave the loop.
                    Some(result) = exit_rx.recv() => {
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {exit_status:?}");
                        let _ = log_appender.flush().await;
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                // A successful exit before readiness counts as ready; anything
                                // else is reported as a failure with the exit code, if any.
                                let is_success = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .map(|s| s.success())
                                    .unwrap_or(false);

                                if is_success {
                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
                                    let _ = tx.send(Ok(()));
                                } else {
                                    let exit_code = exit_status.as_ref()
                                        .and_then(|r| r.as_ref().ok())
                                        .and_then(|s| s.code());
                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
                                    let _ = tx.send(Err(exit_code));
                                }
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending notification");
                        }
                        break;
                    },
                    // Periodic HTTP readiness check; enabled only until readiness is reported.
                    _ = async {
                        if let Some(ref mut interval) = http_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_http.is_some() => {
                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                            match client.get(url).send().await {
                                Ok(response) if response.status().is_success() => {
                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                    http_check_interval = None;
                                    if !active_port_spawned && has_port_config {
                                        active_port_spawned = true;
                                        detect_and_store_active_port(id.clone(), daemon_pid);
                                    }
                                }
                                Ok(response) => {
                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                                }
                                Err(e) => {
                                    trace!("daemon {id} HTTP check failed: {e}");
                                }
                            }
                        }
                    }
                    // Periodic TCP readiness check against 127.0.0.1:<ready_port>.
                    _ = async {
                        if let Some(ref mut interval) = port_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_port.is_some() => {
                        if let Some(port) = ready_port {
                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                                Ok(_) => {
                                    info!("daemon {id} ready: TCP port {port} is listening");
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                    port_check_interval = None;
                                    if !active_port_spawned && has_port_config {
                                        active_port_spawned = true;
                                        detect_and_store_active_port(id.clone(), daemon_pid);
                                    }
                                }
                                Err(_) => {
                                    trace!("daemon {id} port check: port {port} not listening yet");
                                }
                            }
                        }
                    }
                    // Periodic readiness command, run in the daemon's directory with its
                    // output discarded.
                    _ = async {
                        if let Some(ref mut interval) = cmd_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_cmd.is_some() => {
                        if let Some(ref cmd) = ready_cmd {
                            let mut command = Shell::default_for_platform().command(cmd);
                            command
                                .current_dir(&daemon_dir)
                                .stdout(std::process::Stdio::null())
                                .stderr(std::process::Stdio::null());
                            let result: std::io::Result<std::process::ExitStatus> = command.status().await;
                            match result {
                                Ok(status) if status.success() => {
                                    info!("daemon {id} ready: readiness command succeeded");
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                    cmd_check_interval = None;
                                    if !active_port_spawned && has_port_config {
                                        active_port_spawned = true;
                                        detect_and_store_active_port(id.clone(), daemon_pid);
                                    }
                                }
                                Ok(_) => {
                                    trace!("daemon {id} cmd check: command returned non-zero (not ready)");
                                }
                                Err(e) => {
                                    trace!("daemon {id} cmd check failed: {e}");
                                }
                            }
                        }
                    }
                    // Readiness delay elapsed. It marks the daemon ready only when no other
                    // readiness check is configured.
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                        }
                        // The timer is one-shot: clear it so this branch stays pending afterwards.
                        delay_timer = None;
                        if !active_port_spawned && has_port_config {
                            active_port_spawned = true;
                            detect_and_store_active_port(id.clone(), daemon_pid);
                        }
                    }
                    // Periodic flush of buffered log output.
                    _ = log_flush_interval.tick() => {
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                    }
                }
            }

            if let Err(e) = log_appender.flush().await {
                error!("Failed to final flush log for daemon {id}: {e}");
            }

            // The process is gone: clear the stored active port and persist the state file.
            {
                let mut state_file = SUPERVISOR.state_file.lock().await;
                if let Some(d) = state_file.daemons.get_mut(&id) {
                    d.active_port = None;
                }
                if let Err(e) = state_file.write() {
                    debug!("Failed to write state after clearing active_port for {id}: {e}");
                }
            }

            // Use the status captured in the loop, or wait for it on the exit channel.
            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            // Count this monitor as active for the remainder of the task. The guard
            // decrements the counter and notifies `monitor_done` waiters when dropped,
            // including on the early return below.
            SUPERVISOR
                .active_monitors
                .fetch_add(1, atomic::Ordering::Release);
            struct MonitorGuard;
            impl Drop for MonitorGuard {
                fn drop(&mut self) {
                    SUPERVISOR
                        .active_monitors
                        .fetch_sub(1, atomic::Ordering::Release);
                    SUPERVISOR.monitor_done.notify_waiters();
                }
            }
            let _monitor_guard = MonitorGuard;
            // Nothing to update when the record is gone, or when it now tracks a different
            // pid and is neither stopped nor stopping.
            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| {
                    d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
                })
            {
                return;
            }
            let already_stopped = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopped());
            let is_stopping = already_stopped
                || current_daemon
                    .as_ref()
                    .is_some_and(|d| d.status.is_stopping());

            // Classify the exit: "stop" when a stop was in progress or already recorded,
            // "exit" for a successful exit, "fail" otherwise. A missing code becomes -1.
            let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
                (Ok(status), true) => {
                    (status.code().unwrap_or(-1), "stop")
                }
                (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
                (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
                (Err(_), true) => {
                    (-1, "stop")
                }
                (Err(_), false) => (-1, "fail"),
            };

            // Skip the state update when the record is already marked stopped.
            if !already_stopped {
                if let Ok(status) = &exit_status {
                    info!("daemon {id} exited with status {status}");
                }
                let (new_status, last_exit_success) = match exit_reason {
                    "stop" | "exit" => (
                        DaemonStatus::Stopped,
                        exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
                    ),
                    _ => (DaemonStatus::Errored(exit_code), false),
                };
                if let Err(e) = SUPERVISOR
                    .upsert_daemon(
                        UpsertDaemonOpts::builder(id.clone())
                            .set(|o| {
                                o.pid = None;
                                o.status = new_status;
                                o.last_exit_success = Some(last_exit_success);
                            })
                            .build(),
                    )
                    .await
                {
                    error!("Failed to update daemon state for {id}: {e}");
                }
            }

            // Extra environment passed to the exit-related hooks.
            let hook_extra_env = vec![
                ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
                ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
            ];

            // On failure, hooks fire only once the retry count has reached the configured
            // retry limit.
            let hooks_to_fire: Vec<HookType> = match exit_reason {
                "stop" => vec![HookType::OnStop, HookType::OnExit],
                "exit" => vec![HookType::OnExit],
                _ if hook_retry_count >= hook_retry.count() => {
                    vec![HookType::OnFail, HookType::OnExit]
                }
                _ => vec![],
            };

            for hook_type in hooks_to_fire {
                fire_hook(
                    hook_type,
                    id.clone(),
                    daemon_dir.clone(),
                    hook_retry_count,
                    hook_daemon_env.clone(),
                    hook_extra_env.clone(),
                )
                .await;
            }
        });

        // When waiting for readiness, block until the monitor reports ready or failed.
        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                Err(_) => {
                    // The sender was dropped without a message; report a plain start.
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }
1096
1097 pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
1099 let pitchfork_id = DaemonId::pitchfork();
1100 if *id == pitchfork_id {
1101 return Ok(IpcResponse::Error(
1102 "Cannot stop supervisor via stop command".into(),
1103 ));
1104 }
1105 info!("stopping daemon: {id}");
1106 if let Some(daemon) = self.get_daemon(id).await {
1107 trace!("daemon to stop: {daemon}");
1108 if let Some(pid) = daemon.pid {
1109 trace!("killing pid: {pid}");
1110 PROCS.refresh_processes();
1111 if PROCS.is_running(pid) {
1112 self.upsert_daemon(
1114 UpsertDaemonOpts::builder(id.clone())
1115 .set(|o| {
1116 o.pid = Some(pid);
1117 o.status = DaemonStatus::Stopping;
1118 })
1119 .build(),
1120 )
1121 .await?;
1122
1123 let stop_cfg = daemon.stop_signal.unwrap_or_default();
1126 let stop_signal: i32 = stop_cfg.signal.into();
1127 if let Err(e) = PROCS
1128 .kill_process_group_async(pid, stop_signal, stop_cfg.timeout)
1129 .await
1130 {
1131 debug!("failed to kill pid {pid}: {e}");
1132 PROCS.refresh_processes();
1134 if PROCS.is_running(pid) {
1135 debug!("failed to stop pid {pid}: process still running after kill");
1137 self.upsert_daemon(
1138 UpsertDaemonOpts::builder(id.clone())
1139 .set(|o| {
1140 o.pid = Some(pid); o.status = DaemonStatus::Running;
1142 })
1143 .build(),
1144 )
1145 .await?;
1146 return Ok(IpcResponse::DaemonStopFailed {
1147 error: format!(
1148 "process {pid} still running after kill attempt: {e}"
1149 ),
1150 });
1151 }
1152 }
1153
1154 self.upsert_daemon(
1159 UpsertDaemonOpts::builder(id.clone())
1160 .set(|o| {
1161 o.pid = None;
1162 o.status = DaemonStatus::Stopped;
1163 o.last_exit_success = Some(true); })
1165 .build(),
1166 )
1167 .await?;
1168 } else {
1169 debug!("pid {pid} not running, process may have exited unexpectedly");
1170 self.upsert_daemon(
1173 UpsertDaemonOpts::builder(id.clone())
1174 .set(|o| {
1175 o.pid = None;
1176 o.status = DaemonStatus::Stopped;
1177 })
1178 .build(),
1179 )
1180 .await?;
1181 return Ok(IpcResponse::DaemonWasNotRunning);
1182 }
1183 Ok(IpcResponse::Ok)
1184 } else {
1185 debug!("daemon {id} not running");
1186 Ok(IpcResponse::DaemonNotRunning)
1187 }
1188 } else {
1189 debug!("daemon {id} not found");
1190 Ok(IpcResponse::DaemonNotFound)
1191 }
1192 }
1193}
1194
1195#[cfg(unix)]
1196fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1197 let settings_user = settings().supervisor.user.trim();
1198 let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1199 let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1200 let configured = daemon_user.or(settings_user);
1201 let current_uid = nix::unistd::Uid::effective().as_raw();
1202 let current_gid = nix::unistd::Gid::effective().as_raw();
1203 resolve_run_identity(
1204 configured,
1205 current_uid,
1206 current_gid,
1207 std::env::var("SUDO_UID").ok().as_deref(),
1208 std::env::var("SUDO_GID").ok().as_deref(),
1209 )
1210}
1211
1212#[cfg(unix)]
1213fn resolve_run_identity(
1214 configured: Option<&str>,
1215 current_uid: u32,
1216 current_gid: u32,
1217 sudo_uid: Option<&str>,
1218 sudo_gid: Option<&str>,
1219) -> Result<RunIdentity> {
1220 let current_uid = nix::unistd::Uid::from_raw(current_uid);
1221 let current_gid = nix::unistd::Gid::from_raw(current_gid);
1222 if let Some(user) = configured {
1223 let identity = resolve_configured_user(user)?;
1224 ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1225 if identity.matches(current_uid, current_gid) {
1226 return Ok(RunIdentity::Inherit);
1227 }
1228 return Ok(identity);
1229 }
1230
1231 if current_uid.is_root()
1232 && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1233 {
1234 return Ok(identity);
1235 }
1236
1237 Ok(RunIdentity::Inherit)
1238}
1239
1240#[cfg(unix)]
1241fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1242 if user.chars().all(|c| c.is_ascii_digit()) {
1243 let uid = user
1244 .parse::<u32>()
1245 .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1246 let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1247 .into_diagnostic()?
1248 .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1249 return run_identity_from_user_record(user_record);
1250 }
1251
1252 let user_record = nix::unistd::User::from_name(user)
1253 .into_diagnostic()?
1254 .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1255 run_identity_from_user_record(user_record)
1256}
1257
1258#[cfg(unix)]
1259fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1260 let username = CString::new(user.name)
1261 .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1262 Ok(RunIdentity::Switch {
1263 uid: user.uid,
1264 gid: user.gid,
1265 username: Some(username),
1266 })
1267}
1268
1269#[cfg(unix)]
1270fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1271 RunIdentity::Switch {
1272 uid: nix::unistd::Uid::from_raw(uid),
1273 gid: nix::unistd::Gid::from_raw(gid),
1274 username,
1275 }
1276}
1277
1278#[cfg(unix)]
1279fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1280 let uid = sudo_uid?.parse::<u32>().ok()?;
1281 let gid = sudo_gid?.parse::<u32>().ok()?;
1282 let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1283 .ok()
1284 .flatten()
1285 .and_then(|u| CString::new(u.name).ok());
1286 Some(run_identity_from_raw_ids(uid, gid, username))
1287}
1288
1289#[cfg(unix)]
1290fn ensure_can_use_identity(
1291 configured_user: &str,
1292 identity: &RunIdentity,
1293 current_uid: nix::unistd::Uid,
1294 current_gid: nix::unistd::Gid,
1295) -> Result<()> {
1296 let RunIdentity::Switch { uid, gid, .. } = identity else {
1297 return Ok(());
1298 };
1299 if *uid == current_uid && *gid == current_gid {
1300 return Ok(());
1301 }
1302 if current_uid.is_root() {
1303 return Ok(());
1304 }
1305 Err(miette::miette!(
1306 "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1307 configured_user,
1308 current_uid.as_raw(),
1309 current_gid.as_raw(),
1310 uid.as_raw(),
1311 gid.as_raw()
1312 ))
1313}
1314
1315#[cfg(unix)]
1316fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1317 let RunIdentity::Switch { uid, gid, username } = identity else {
1318 return Ok(());
1319 };
1320 if let Some(username) = username {
1321 initgroups_for_user(username, *gid)?;
1322 } else {
1323 setgroups_to_primary(*gid)?;
1324 }
1325 nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1326 nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1327 Ok(())
1328}
1329
1330#[cfg(unix)]
1331impl RunIdentity {
1332 fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1333 matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1334 }
1335}
1336
1337#[cfg(unix)]
1338fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1339 let groups = [gid.as_raw() as libc::gid_t];
1340 #[cfg(any(target_os = "linux", target_os = "android"))]
1341 let group_count = groups.len();
1342 #[cfg(not(any(target_os = "linux", target_os = "android")))]
1343 let group_count = groups.len() as libc::c_int;
1344 let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1345 if rc == -1 {
1346 Err(std::io::Error::last_os_error())
1347 } else {
1348 Ok(())
1349 }
1350}
1351
1352#[cfg(unix)]
1353fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1354 let gid = gid.as_raw();
1355 #[cfg(any(
1356 target_os = "macos",
1357 target_os = "ios",
1358 target_os = "tvos",
1359 target_os = "watchos"
1360 ))]
1361 let base_gid = i32::try_from(gid)
1362 .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1363
1364 #[cfg(not(any(
1365 target_os = "macos",
1366 target_os = "ios",
1367 target_os = "tvos",
1368 target_os = "watchos"
1369 )))]
1370 let base_gid = gid as libc::gid_t;
1371
1372 let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1375 if rc == -1 {
1376 Err(std::io::Error::last_os_error())
1377 } else {
1378 Ok(())
1379 }
1380}
1381
1382#[cfg(unix)]
1383fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1384 std::io::Error::from_raw_os_error(err as i32)
1385}
1386
1387async fn check_ports_available(
1394 expected_ports: &[u16],
1395 auto_bump: bool,
1396 max_attempts: u32,
1397) -> Result<Vec<u16>> {
1398 if expected_ports.is_empty() {
1399 return Ok(Vec::new());
1400 }
1401
1402 for bump_offset in 0..=max_attempts {
1403 let candidate_ports: Vec<u16> = expected_ports
1405 .iter()
1406 .map(|&p| p.wrapping_add(bump_offset as u16))
1407 .collect();
1408
1409 let mut all_available = true;
1411 let mut conflicting_port = None;
1412
1413 for &port in &candidate_ports {
1414 if port == 0 {
1417 continue;
1418 }
1419
1420 if is_port_in_use(port).await {
1434 all_available = false;
1435 conflicting_port = Some(port);
1436 break;
1437 }
1438 }
1439
1440 if all_available {
1441 if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1445 return Err(PortError::NoAvailablePort {
1446 start_port: expected_ports[0],
1447 attempts: bump_offset + 1,
1448 }
1449 .into());
1450 }
1451 if bump_offset > 0 {
1452 info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1453 }
1454 return Ok(candidate_ports);
1455 }
1456
1457 if bump_offset == 0 && !auto_bump {
1459 if let Some(port) = conflicting_port {
1460 let (pid, process) = identify_port_owner(port).await;
1461 return Err(PortError::InUse { port, process, pid }.into());
1462 }
1463 }
1464 }
1465
1466 Err(PortError::NoAvailablePort {
1468 start_port: expected_ports[0],
1469 attempts: max_attempts + 1,
1470 }
1471 .into())
1472}
1473
1474async fn is_port_in_use(port: u16) -> bool {
1480 tokio::task::spawn_blocking(move || {
1481 for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1482 match std::net::TcpListener::bind((addr, port)) {
1483 Ok(listener) => drop(listener),
1484 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1485 Err(_) => continue,
1486 }
1487 }
1488 false
1489 })
1490 .await
1491 .unwrap_or(false)
1492}
1493
1494async fn identify_port_owner(port: u16) -> (u32, String) {
1499 tokio::task::spawn_blocking(move || {
1500 listeners::get_all()
1501 .ok()
1502 .and_then(|list| {
1503 list.into_iter()
1504 .find(|l| l.socket.port() == port)
1505 .map(|l| (l.process.pid, l.process.name))
1506 })
1507 .unwrap_or((0, "unknown".to_string()))
1508 })
1509 .await
1510 .unwrap_or((0, "unknown".to_string()))
1511}
1512
1513async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1518 if !is_port_in_use(port).await {
1519 return None;
1520 }
1521 Some(identify_port_owner(port).await)
1522}
1523
/// Spawns a detached background task that looks for a port the process `pid`
/// is listening on and records it as the daemon's `active_port` in the
/// supervisor state file.
///
/// The task makes up to four attempts, sleeping 500 ms, 1 s, 2 s and 4 s
/// before the successive attempts, and finishes at the first attempt that
/// finds a listening port. The function itself returns immediately; the task
/// handle is dropped.
fn detect_and_store_active_port(id: DaemonId, pid: u32) {
    tokio::spawn(async move {
        for delay_ms in [500u64, 1000, 2000, 4000] {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;

            // Read the daemon's first expected port (ignoring 0). The state
            // lock lives only inside this block, so it is released before the
            // blocking listener scan below.
            let expected_port: Option<u16> = {
                let state_file = SUPERVISOR.state_file.lock().await;
                match state_file.daemons.get(&id) {
                    // A daemon entry without a pid ends the whole task.
                    Some(d) if d.pid.is_none() => {
                        debug!("daemon {id}: aborting active_port detection — process exited");
                        return;
                    }
                    Some(d) => d
                        .port
                        .as_ref()
                        .and_then(|p| p.expect.first().copied())
                        .filter(|&p| p > 0),
                    // No entry for this id: keep scanning, just without a
                    // preferred port.
                    None => None,
                }
            };

            // Enumerate listeners on a blocking worker thread and keep only
            // the non-zero ports owned by `pid`.
            let active_port = tokio::task::spawn_blocking(move || {
                let listeners = listeners::get_all().ok()?;
                let process_ports: Vec<u16> = listeners
                    .into_iter()
                    .filter(|listener| listener.process.pid == pid)
                    .map(|listener| listener.socket.port())
                    .filter(|&port| port > 0)
                    .collect();

                if process_ports.is_empty() {
                    return None;
                }

                // Prefer the expected port when the process listens on it.
                if let Some(ep) = expected_port {
                    if process_ports.contains(&ep) {
                        return Some(ep);
                    }
                }

                // Otherwise fall back to the first port in enumeration order.
                process_ports.into_iter().next()
            })
            .await
            // A failed worker task is treated the same as "nothing found".
            .ok()
            .flatten();

            if let Some(port) = active_port {
                debug!("daemon {id} active_port detected: {port}");
                let mut state_file = SUPERVISOR.state_file.lock().await;
                if let Some(d) = state_file.daemons.get_mut(&id) {
                    // Re-check the pid under the lock: the entry may have
                    // changed while the scan ran. On a mismatch nothing is
                    // stored or written.
                    if d.pid == Some(pid) {
                        d.active_port = Some(port);
                    } else {
                        debug!(
                            "daemon {id}: skipping active_port write — PID mismatch \
                             (expected {pid}, current {:?})",
                            d.pid
                        );
                        return;
                    }
                }
                // Reached both after storing the port and when the daemon has
                // no entry at all; a write failure is only logged.
                if let Err(e) = state_file.write() {
                    debug!("Failed to write state after detecting active_port for {id}: {e}");
                }
                return;
            }

            debug!("daemon {id}: no active port detected for pid {pid} (will retry)");
        }

        debug!("daemon {id}: active port detection exhausted all retries for pid {pid}");
    });
}
1626
1627fn is_daemon_slug_target(id: &DaemonId) -> bool {
1635 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1639 slugs.iter().any(|(slug, entry)| {
1640 let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1641 id.name() == daemon_name
1642 })
1643}
1644
#[cfg(all(test, unix))]
mod tests {
    use super::*;

    /// Unpacks a `Switch` identity, failing the test for anything else.
    fn expect_switch(
        identity: RunIdentity,
    ) -> (nix::unistd::Uid, nix::unistd::Gid, Option<CString>) {
        match identity {
            RunIdentity::Switch { uid, gid, username } => (uid, gid, username),
            RunIdentity::Inherit => panic!("expected identity switch"),
        }
    }

    /// Returns the user name as UTF-8 text, if present and valid.
    fn username_text(username: &Option<CString>) -> Option<&str> {
        username.as_deref().and_then(|name| name.to_str().ok())
    }

    // No configured user and no sudo hints: keep the current identity.
    #[test]
    fn test_resolve_run_identity_empty_without_sudo() {
        let resolved = resolve_run_identity(None, 501, 20, None, None);
        assert_eq!(resolved.unwrap(), RunIdentity::Inherit);
    }

    // Root with sudo hints and no configured user: switch to the sudo caller.
    #[test]
    fn test_resolve_run_identity_sudo_fallback() {
        let resolved = resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap();
        let (uid, gid, _) = expect_switch(resolved);
        assert_eq!(uid.as_raw(), 501);
        assert_eq!(gid.as_raw(), 20);
    }

    // Sudo hints are ignored when the supervisor is not root.
    #[test]
    fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
        let resolved = resolve_run_identity(None, 501, 20, Some("0"), Some("0"));
        assert_eq!(resolved.unwrap(), RunIdentity::Inherit);
    }

    // A user given by name resolves to its uid and keeps the name.
    #[test]
    fn test_resolve_configured_user_root_name() {
        let (uid, _, username) = expect_switch(resolve_configured_user("root").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(username_text(&username), Some("root"));
    }

    // A user given as a numeric UID resolves to the same record.
    #[test]
    fn test_resolve_configured_user_root_uid() {
        let (uid, _, username) = expect_switch(resolve_configured_user("0").unwrap());
        assert_eq!(uid.as_raw(), 0);
        assert_eq!(username_text(&username), Some("root"));
    }

    // Unknown users are rejected with a "does not exist" error.
    #[test]
    fn test_resolve_configured_user_missing_user_fails() {
        let message = resolve_configured_user("pitchfork-user-that-should-not-exist")
            .unwrap_err()
            .to_string();
        assert!(message.contains("does not exist"));
    }

    // A non-root supervisor cannot switch to a different user.
    #[test]
    fn test_resolve_run_identity_requires_root_for_user_switch() {
        let message = resolve_run_identity(Some("root"), 501, 20, None, None)
            .unwrap_err()
            .to_string();
        assert!(message.contains("Restart the supervisor with sudo"));
    }

    // Configuring the identity the supervisor already has is a no-op, even
    // when sudo hints are present.
    #[test]
    fn test_resolve_run_identity_same_user_is_noop() {
        let resolved = resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20"));
        assert_eq!(resolved.unwrap(), RunIdentity::Inherit);
    }
}
1719
1720fn inject_proxy_env(cmd: &mut tokio::process::Command, slug: &Option<String>) {
1728 let s = crate::settings::settings();
1729
1730 if should_force_loopback_host(slug) {
1731 cmd.env("HOST", "127.0.0.1");
1733 }
1734
1735 if let Some(url) = build_pitchfork_url(slug, s) {
1737 cmd.env("PITCHFORK_URL", &url);
1738 }
1739
1740 if s.proxy.enable && s.proxy.https {
1742 let ca_path = if s.proxy.tls_cert.is_empty() {
1743 crate::env::PITCHFORK_STATE_DIR.join("proxy").join("ca.pem")
1744 } else {
1745 std::path::PathBuf::from(&s.proxy.tls_cert)
1746 };
1747 if ca_path.exists() {
1748 cmd.env("NODE_EXTRA_CA_CERTS", ca_path.to_string_lossy().to_string());
1749 }
1750 }
1751
1752 if s.proxy.enable {
1754 cmd.env(
1755 "__VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS",
1756 format!(".{}", s.proxy.tld),
1757 );
1758 }
1759}
1760
1761fn should_force_loopback_host(slug: &Option<String>) -> bool {
1762 let Some(slug) = slug.as_deref() else {
1763 return false;
1764 };
1765
1766 let s = crate::settings::settings();
1767 if !s.proxy.enable {
1768 return false;
1769 }
1770
1771 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1772 slugs.contains_key(slug)
1773}
1774
1775fn build_pitchfork_url(slug: &Option<String>, s: &crate::settings::Settings) -> Option<String> {
1779 let slug = slug.as_ref()?;
1780 if !s.proxy.enable {
1781 return None;
1782 }
1783 let scheme = if s.proxy.https { "https" } else { "http" };
1784 let port = u16::try_from(s.proxy.port).ok().filter(|&p| p > 0)?;
1785 let port_suffix = if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
1786 String::new()
1787 } else {
1788 format!(":{port}")
1789 };
1790 Some(format!(
1791 "{scheme}://{slug}.{tld}{port_suffix}",
1792 tld = s.proxy.tld
1793 ))
1794}