1use super::hooks::{self, HookType, fire_hook};
6use super::{SUPERVISOR, Supervisor};
7use crate::daemon::RunOptions;
8use crate::daemon_id::DaemonId;
9use crate::daemon_status::DaemonStatus;
10use crate::error::PortError;
11use crate::ipc::IpcResponse;
12use crate::procs::PROCS;
13use crate::settings::settings;
14use crate::shell::Shell;
15use crate::supervisor::state::UpsertDaemonOpts;
16use crate::{Result, env};
17use itertools::Itertools;
18use miette::IntoDiagnostic;
19use once_cell::sync::Lazy;
20use regex::Regex;
21use std::collections::HashMap;
22#[cfg(unix)]
23use std::ffi::CString;
24use std::iter::once;
25use std::sync::atomic;
26use std::time::Duration;
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
28use tokio::select;
29use tokio::sync::oneshot;
30use tokio::time;
31
/// Process-wide cache of compiled regexes, keyed by their pattern string.
///
/// Populated lazily by `get_or_compile_regex`; guarded by a std mutex.
static REGEX_CACHE: Lazy<std::sync::Mutex<HashMap<String, Regex>>> =
    Lazy::new(|| std::sync::Mutex::new(HashMap::new()));
35
/// The user/group identity a daemon process should be started with (unix only).
#[cfg(unix)]
#[derive(Clone, Debug, PartialEq, Eq)]
enum RunIdentity {
    /// No identity change requested.
    // NOTE(review): presumably the child keeps the supervisor's credentials —
    // confirm against `apply_run_identity`, which is not visible here.
    Inherit,
    /// Switch to the given uid/gid before the daemon runs.
    Switch {
        /// Target user id.
        uid: nix::unistd::Uid,
        /// Target group id.
        gid: nix::unistd::Gid,
        /// Account name for the uid, when it could be looked up.
        // NOTE(review): likely used for supplementary-group setup — confirm.
        username: Option<CString>,
    },
}
46
47pub(crate) fn get_or_compile_regex(pattern: &str) -> Option<Regex> {
49 let mut cache = REGEX_CACHE.lock().unwrap_or_else(|e| e.into_inner());
50 if let Some(re) = cache.get(pattern) {
51 return Some(re.clone());
52 }
53 match Regex::new(pattern) {
54 Ok(re) => {
55 cache.insert(pattern.to_string(), re.clone());
56 Some(re)
57 }
58 Err(e) => {
59 error!("invalid regex pattern '{pattern}': {e}");
60 None
61 }
62 }
63}
64
65impl Supervisor {
66 pub async fn run(&self, opts: RunOptions) -> Result<IpcResponse> {
68 let id = &opts.id;
69 let cmd = opts.cmd.clone();
70
71 {
73 let mut pending = self.pending_autostops.lock().await;
74 if pending.remove(id).is_some() {
75 info!("cleared pending autostop for {id} (daemon starting)");
76 }
77 }
78
79 let daemon = self.get_daemon(id).await;
80 if let Some(daemon) = daemon {
81 if !daemon.status.is_stopping()
84 && !daemon.status.is_stopped()
85 && let Some(pid) = daemon.pid
86 {
87 if opts.force {
88 self.stop(id).await?;
89 info!("run: stop completed for daemon {id}");
90 } else {
91 warn!("daemon {id} already running with pid {pid}");
92 return Ok(IpcResponse::DaemonAlreadyRunning);
93 }
94 }
95 }
96
97 if opts.wait_ready && opts.retry.count() > 0 {
99 let max_attempts = opts.retry.count().saturating_add(1);
101 for attempt in 0..max_attempts {
102 let mut retry_opts = opts.clone();
103 retry_opts.retry_count = attempt;
104 retry_opts.cmd = cmd.clone();
105
106 let result = self.run_once(retry_opts).await?;
107
108 match result {
109 IpcResponse::DaemonReady { daemon } => {
110 return Ok(IpcResponse::DaemonReady { daemon });
111 }
112 IpcResponse::DaemonFailedWithCode { exit_code } => {
113 if attempt < opts.retry.count() {
114 let backoff_secs = 2u64.saturating_pow(attempt).min(3600);
115 info!(
116 "daemon {id} failed (attempt {}/{}), retrying in {}s",
117 attempt + 1,
118 max_attempts,
119 backoff_secs
120 );
121 fire_hook(
122 HookType::OnRetry,
123 id.clone(),
124 opts.dir.0.clone(),
125 attempt + 1,
126 opts.env.clone(),
127 vec![],
128 )
129 .await;
130 time::sleep(Duration::from_secs(backoff_secs)).await;
131 continue;
132 } else {
133 info!("daemon {id} failed after {max_attempts} attempts");
134 return Ok(IpcResponse::DaemonFailedWithCode { exit_code });
135 }
136 }
137 other => return Ok(other),
138 }
139 }
140 }
141
142 self.run_once(opts).await
144 }
145
    /// Spawns the daemon process once and starts a background task that monitors it.
    ///
    /// In order, this:
    /// 1. resolves/checks the configured ports (returning `PortConflict`,
    ///    `NoAvailablePort` or `DaemonFailed` on failure),
    /// 2. builds an `sh -c "exec ..."` command line, optionally wrapped with mise,
    /// 3. wires stdio to a PTY (unix, when `opts.pty` is true) or to pipes,
    /// 4. sets the environment and, on unix, a `pre_exec` hook that calls
    ///    `setsid`, optionally claims the controlling terminal, and applies the
    ///    run identity,
    /// 5. spawns the child and records it through `upsert_daemon`,
    /// 6. spawns a task that logs output, runs readiness checks, and updates
    ///    state / fires hooks once the process exits.
    ///
    /// When `opts.wait_ready` is set, the call waits for the monitor to report
    /// readiness (`DaemonReady`) or an early exit (`DaemonFailedWithCode`);
    /// otherwise it returns `DaemonStart` right after spawning.
    pub(crate) async fn run_once(&self, opts: RunOptions) -> Result<IpcResponse> {
        let id = &opts.id;
        // Keep the command as supplied for the state record; `cmd` is rewritten below.
        let original_cmd = opts.cmd.clone();
        let cmd = opts.cmd;

        // The readiness channel only exists when the caller asked to wait.
        let (ready_tx, ready_rx) = if opts.wait_ready {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let expected_ports = opts
            .port
            .as_ref()
            .map(|p| p.expect.clone())
            .unwrap_or_default();
        let (resolved_ports, effective_ready_port) = if !expected_ports.is_empty() {
            // `expected_ports` is non-empty only when `opts.port` is `Some`,
            // so this unwrap cannot panic.
            let port_cfg = opts.port.as_ref().unwrap();
            match check_ports_available(
                &expected_ports,
                port_cfg.auto_bump(),
                port_cfg.max_bump_attempts(),
            )
            .await
            {
                Ok(resolved) => {
                    let ready_port = if let Some(configured_port) = opts.ready_port {
                        // Distance between the first resolved and first expected port.
                        let bump_offset = resolved
                            .first()
                            .unwrap_or(&0)
                            .saturating_sub(*expected_ports.first().unwrap_or(&0));
                        if expected_ports.contains(&configured_port) && bump_offset > 0 {
                            // Shift the ready port by the same offset; on overflow
                            // fall back to the configured value.
                            configured_port
                                .checked_add(bump_offset)
                                .or(Some(configured_port))
                        } else {
                            Some(configured_port)
                        }
                    } else if opts.ready_output.is_none()
                        && opts.ready_http.is_none()
                        && opts.ready_cmd.is_none()
                        && opts.ready_delay.is_none()
                    {
                        // No readiness check configured at all: use the first
                        // resolved port, unless it is 0.
                        resolved.first().copied().filter(|&p| p != 0)
                    } else {
                        None
                    };
                    info!("daemon {id}: ports {expected_ports:?} resolved to {resolved:?}");
                    (resolved, ready_port)
                }
                Err(e) => {
                    error!("daemon {id}: port check failed: {e}");
                    // Known port errors map to dedicated IPC responses.
                    if let Some(port_error) = e.downcast_ref::<PortError>() {
                        match port_error {
                            PortError::InUse { port, process, pid } => {
                                return Ok(IpcResponse::PortConflict {
                                    port: *port,
                                    process: process.clone(),
                                    pid: *pid,
                                });
                            }
                            PortError::NoAvailablePort {
                                start_port,
                                attempts,
                            } => {
                                return Ok(IpcResponse::NoAvailablePort {
                                    start_port: *start_port,
                                    attempts: *attempts,
                                });
                            }
                        }
                    }
                    return Ok(IpcResponse::DaemonFailed {
                        error: e.to_string(),
                    });
                }
            }
        } else {
            // No expected ports: only check the explicit ready port (if non-zero)
            // for an existing conflict.
            if let Some(port) = opts.ready_port {
                if port > 0 {
                    if let Some((pid, process)) = detect_port_conflict(port).await {
                        return Ok(IpcResponse::PortConflict { port, process, pid });
                    }
                }
            }
            (Vec::new(), opts.ready_port)
        };

        // Prefix the command with `exec`, and with `<mise> x --` when mise
        // wrapping is enabled and the binary can be resolved.
        let cmd: Vec<String> = if opts.mise.unwrap_or(settings().general.mise) {
            match settings().resolve_mise_bin() {
                Some(mise_bin) => {
                    let mise_bin_str = mise_bin.to_string_lossy().to_string();
                    info!("daemon {id}: wrapping command with mise ({mise_bin_str})");
                    once("exec".to_string())
                        .chain(once(mise_bin_str))
                        .chain(once("x".to_string()))
                        .chain(once("--".to_string()))
                        .chain(cmd)
                        .collect_vec()
                }
                None => {
                    warn!("daemon {id}: mise=true but mise binary not found, running without mise");
                    once("exec".to_string()).chain(cmd).collect_vec()
                }
            }
        } else {
            once("exec".to_string()).chain(cmd).collect_vec()
        };
        let args = vec!["-c".to_string(), shell_words::join(&cmd)];
        let log_path = id.log_path();
        if let Some(parent) = log_path.parent() {
            xx::file::mkdirp(parent)?;
        }
        // Resolve the run identity before spawning so a failure is reported
        // as `DaemonFailed` instead of surfacing from inside the child.
        #[cfg(unix)]
        let run_identity = match resolve_effective_run_identity(opts.user.as_deref()) {
            Ok(identity) => identity,
            Err(e) => {
                return Ok(IpcResponse::DaemonFailed {
                    error: e.to_string(),
                });
            }
        };
        info!("run: spawning daemon {id} with args: {args:?}");

        // Allocate a PTY only when requested; on failure fall back to pipes.
        #[cfg(unix)]
        let pty_pair = if opts.pty.unwrap_or(false) {
            match super::pty::openpty() {
                Ok(pair) => {
                    info!("daemon {id}: allocated PTY (pty = true)");
                    Some(pair)
                }
                Err(e) => {
                    warn!("daemon {id}: failed to allocate PTY, falling back to pipes: {e}");
                    None
                }
            }
        } else {
            None
        };

        let mut cmd = tokio::process::Command::new("sh");

        // With a PTY, stdin/stdout/stderr all point at clones of the slave side.
        #[cfg(unix)]
        if let Some(ref pair) = pty_pair {
            let slave_file = std::fs::File::from(
                pair.slave
                    .try_clone()
                    .map_err(|e| miette::miette!("failed to dup slave PTY fd: {e}"))?,
            );
            cmd.stdin(std::process::Stdio::from(slave_file.try_clone().map_err(
                |e| miette::miette!("failed to clone slave PTY fd for stdin: {e}"),
            )?));
            cmd.stdout(std::process::Stdio::from(slave_file.try_clone().map_err(
                |e| miette::miette!("failed to clone slave PTY fd for stdout: {e}"),
            )?));
            cmd.stderr(std::process::Stdio::from(slave_file));
        } else {
            cmd.stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped());
        }

        #[cfg(not(unix))]
        {
            cmd.stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped());
        }

        cmd.args(&args).current_dir(&opts.dir);

        // Without a PTY the daemon gets no stdin.
        #[cfg(unix)]
        if pty_pair.is_none() {
            cmd.stdin(std::process::Stdio::null());
        }

        #[cfg(not(unix))]
        cmd.stdin(std::process::Stdio::null());

        if let Some(ref path) = *env::ORIGINAL_PATH {
            cmd.env("PATH", path);
        }

        if let Some(ref env_vars) = opts.env {
            cmd.envs(env_vars);
        }

        // Set after `opts.env`, so these take precedence over same-named entries.
        cmd.env("PITCHFORK_DAEMON_ID", id.qualified());
        cmd.env("PITCHFORK_DAEMON_NAMESPACE", id.namespace());
        cmd.env("PITCHFORK_RETRY_COUNT", opts.retry_count.to_string());

        // `PORT` is the first resolved port; `PORT0`, `PORT1`, ... expose every one.
        if !resolved_ports.is_empty() {
            cmd.env("PORT", resolved_ports[0].to_string());
            for (i, port) in resolved_ports.iter().enumerate() {
                cmd.env(format!("PORT{i}"), port.to_string());
            }
        }

        #[cfg(unix)]
        {
            let run_identity = run_identity.clone();
            let use_pty = pty_pair.is_some();
            // The closure runs in the child between fork and exec.
            unsafe {
                cmd.pre_exec(move || {
                    // Start a new session for the daemon.
                    nix::unistd::setsid().map_err(nix_to_io_error)?;

                    if use_pty {
                        // Ask for fd 0 (the PTY slave) to become the controlling
                        // terminal; failure is only reported (on Linux), not fatal.
                        let ret = libc::ioctl(0, libc::TIOCSCTTY as libc::c_ulong, 0);
                        if ret < 0 {
                            #[cfg(target_os = "linux")]
                            eprintln!(
                                "pitchfork: TIOCSCTTY failed: {}",
                                std::io::Error::last_os_error()
                            );
                        }
                    }

                    apply_run_identity(&run_identity)?;
                    Ok(())
                });
            }
        }

        let mut child = cmd.spawn().into_diagnostic()?;
        let pid = match child.id() {
            Some(p) => p,
            None => {
                warn!("Daemon {id} exited before PID could be captured");
                return Ok(IpcResponse::DaemonFailed {
                    error: "Process exited immediately".to_string(),
                });
            }
        };
        info!("started daemon {id} with pid {pid}");
        // Record the running daemon together with the options it was started with.
        let daemon = self
            .upsert_daemon(
                UpsertDaemonOpts::builder(id.clone())
                    .set(|o| {
                        o.pid = Some(pid);
                        o.status = DaemonStatus::Running;
                        o.shell_pid = opts.shell_pid;
                        o.dir = Some(opts.dir.0.clone());
                        o.cmd = Some(original_cmd);
                        o.autostop = opts.autostop;
                        o.cron_schedule = opts.cron_schedule.clone();
                        o.cron_retrigger = opts.cron_retrigger;
                        o.retry = Some(opts.retry);
                        o.retry_count = Some(opts.retry_count);
                        o.ready_delay = opts.ready_delay;
                        o.ready_output = opts.ready_output.clone();
                        o.ready_http = opts.ready_http.clone();
                        o.ready_port = effective_ready_port;
                        o.ready_cmd = opts.ready_cmd.clone();
                        o.port = crate::config_types::PortConfig::from_parts(
                            expected_ports,
                            opts.port.as_ref().map(|p| p.bump).unwrap_or_default(),
                        );
                        o.resolved_port = resolved_ports;
                        o.depends = Some(opts.depends.clone());
                        o.env = opts.env.clone();
                        o.watch = Some(opts.watch.clone());
                        o.watch_mode = Some(opts.watch_mode);
                        o.watch_base_dir = opts.watch_base_dir.clone();
                        o.mise = opts.mise;
                        o.user = opts.user.clone();
                        o.memory_limit = opts.memory_limit;
                        o.cpu_limit = opts.cpu_limit;
                        o.stop_signal = opts.stop_signal;
                        o.pty = opts.pty;
                    })
                    .build(),
            )
            .await?;

        // Owned copies of everything the monitor task needs (it is `async move`).
        let id_clone = id.clone();
        let ready_delay = opts.ready_delay;
        let ready_output = opts.ready_output.clone();
        let ready_http = opts.ready_http.clone();
        let ready_port = effective_ready_port;
        let ready_cmd = opts.ready_cmd.clone();
        let daemon_dir = opts.dir.0.clone();
        let hook_retry_count = opts.retry_count;
        let hook_retry = opts.retry;
        let hook_daemon_env = opts.env.clone();
        let on_output_hook = opts.on_output_hook.clone();
        // True when ports are expected, or when the proxy is enabled and this
        // daemon is a slug target.
        let has_port_config = opts.port.as_ref().is_some_and(|p| !p.expect.is_empty())
            || (settings().proxy.enable && is_daemon_slug_target(id));
        let daemon_pid = pid;

        // With a PTY, output is read line by line from the master side.
        #[cfg(unix)]
        let pty_reader = pty_pair.map(|p| {
            tokio::io::BufReader::new(tokio::fs::File::from_std(std::fs::File::from(p.master)))
                .lines()
        });
        #[cfg(not(unix))]
        let pty_reader: Option<tokio::io::Lines<tokio::io::BufReader<tokio::fs::File>>> = None;
        let stdout_reader = if pty_reader.is_none() {
            child
                .stdout
                .take()
                .map(|s| tokio::io::BufReader::new(s).lines())
        } else {
            None
        };
        let stderr_reader = if pty_reader.is_none() {
            child
                .stderr
                .take()
                .map(|s| tokio::io::BufReader::new(s).lines())
        } else {
            None
        };

        if pty_reader.is_none() && (stdout_reader.is_none() || stderr_reader.is_none()) {
            error!("Failed to capture stdout/stderr for daemon {id}");
        }

        // Monitor task: runs detached for the lifetime of the daemon process.
        tokio::spawn(async move {
            let id = id_clone;

            // All output sources feed one bounded channel of lines.
            let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<String>(256);

            if let Some(mut reader) = pty_reader {
                tokio::spawn(async move {
                    while let Ok(Some(mut line)) = reader.next_line().await {
                        // Drop a trailing carriage return left on PTY lines.
                        if line.ends_with('\r') {
                            line.pop();
                        }
                        if output_tx.send(line).await.is_err() {
                            break;
                        }
                    }
                });
            } else {
                if let Some(mut stdout) = stdout_reader {
                    let tx = output_tx.clone();
                    tokio::spawn(async move {
                        while let Ok(Some(line)) = stdout.next_line().await {
                            if tx.send(line).await.is_err() {
                                break;
                            }
                        }
                    });
                }
                if let Some(mut stderr) = stderr_reader {
                    let tx = output_tx.clone();
                    tokio::spawn(async move {
                        while let Ok(Some(line)) = stderr.next_line().await {
                            if tx.send(line).await.is_err() {
                                break;
                            }
                        }
                    });
                }
                // Only the reader tasks keep senders, so the channel closes
                // once they have all finished.
                drop(output_tx);
            }
            let log_file = match tokio::fs::File::options()
                .append(true)
                .create(true)
                .open(&log_path)
                .await
            {
                Ok(f) => f,
                Err(e) => {
                    error!("Failed to open log file for daemon {id}: {e}");
                    return;
                }
            };
            let mut log_appender = BufWriter::new(log_file);

            let now = || chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
            // Log line format: "<timestamp> <id> <line>"; the id is not repeated
            // when the line already starts with it.
            let format_line = |line: String| {
                let line_for_log = line;
                if line_for_log.starts_with(&format!("{id} ")) {
                    format!("{} {line_for_log}\n", now())
                } else {
                    format!("{} {id} {line_for_log}\n", now())
                }
            };

            let mut ready_notified = false;
            let mut ready_tx = ready_tx;
            let ready_pattern = ready_output.as_ref().and_then(|p| get_or_compile_regex(p));
            // Ensures `detect_and_store_active_port` is started at most once.
            let mut active_port_spawned = false;

            // An on-output hook that fails validation is logged and disabled.
            let on_output_hook = match on_output_hook {
                Some(ref hook) => match hook.validate(id.name()) {
                    Ok(()) => on_output_hook,
                    Err(e) => {
                        error!("{e}");
                        None
                    }
                },
                None => None,
            };

            let on_output_pattern: Option<regex::Regex> = on_output_hook
                .as_ref()
                .and_then(|h| h.regex.as_deref().and_then(get_or_compile_regex));
            let on_output_debounce = on_output_hook
                .as_ref()
                .map(|h| h.debounce_duration())
                .unwrap_or(Duration::from_millis(1000));
            let mut on_output_last_fired: Option<std::time::Instant> = None;

            let mut delay_timer =
                ready_delay.map(|secs| Box::pin(time::sleep(Duration::from_secs(secs))));

            let s = settings();
            let ready_check_interval = s.supervisor_ready_check_interval();
            let http_client_timeout = s.supervisor_http_client_timeout();
            let log_flush_interval_duration = s.supervisor_log_flush_interval();

            // Each readiness probe gets an interval only when it is configured.
            let mut http_check_interval = ready_http
                .as_ref()
                .map(|_| tokio::time::interval(ready_check_interval));
            let http_client = ready_http.as_ref().map(|_| {
                reqwest::Client::builder()
                    .timeout(http_client_timeout)
                    .build()
                    .unwrap_or_default()
            });

            let mut port_check_interval =
                ready_port.map(|_| tokio::time::interval(ready_check_interval));

            let mut cmd_check_interval = ready_cmd
                .as_ref()
                .map(|_| tokio::time::interval(ready_check_interval));

            let mut log_flush_interval = tokio::time::interval(log_flush_interval_duration);

            let (exit_tx, mut exit_rx) =
                tokio::sync::mpsc::channel::<std::io::Result<std::process::ExitStatus>>(1);

            // Wait for the child in its own task and deliver the result via `exit_rx`.
            let child_pid = child.id().unwrap_or(0);
            tokio::spawn(async move {
                let result = child.wait().await;
                // Non-Linux unix: if wait() reports ECHILD, try to recover the
                // exit code stashed in `REAPED_STATUSES` for this pid.
                #[cfg(all(unix, not(target_os = "linux")))]
                let result = match &result {
                    Err(e) if e.raw_os_error() == Some(nix::libc::ECHILD) => {
                        if let Some(code) = super::REAPED_STATUSES.lock().await.remove(&child_pid) {
                            warn!(
                                "daemon pid {child_pid} wait() got ECHILD; \
                                 recovered exit code {code} from zombie reaper"
                            );
                            use std::os::unix::process::ExitStatusExt;
                            // Rebuild a raw wait status from the stashed code.
                            // NOTE(review): looks like a negative code encodes a
                            // terminating signal — confirm against the reaper.
                            if code >= 0 {
                                Ok(std::process::ExitStatus::from_raw(code << 8))
                            } else {
                                Ok(std::process::ExitStatus::from_raw((-code) & 0x7f))
                            }
                        } else {
                            warn!(
                                "daemon pid {child_pid} wait() got ECHILD but no \
                                 stashed status found; reporting as error"
                            );
                            result
                        }
                    }
                    _ => result,
                };
                debug!("daemon pid {child_pid} wait() completed with result: {result:?}");
                let _ = exit_tx.send(result).await;
            });

            #[allow(unused_assignments)]
            let mut exit_status = None;

            // With port config but no readiness mechanism at all, start active
            // port detection immediately.
            if has_port_config
                && ready_pattern.is_none()
                && ready_http.is_none()
                && ready_port.is_none()
                && ready_cmd.is_none()
                && delay_timer.is_none()
            {
                active_port_spawned = true;
                detect_and_store_active_port(id.clone(), daemon_pid);
            }

            // Main monitor loop; it ends only when the process exit is received.
            loop {
                select! {
                    // A line of daemon output: log it, then test readiness and
                    // the on-output hook against the ANSI-stripped text.
                    Some(line) = output_rx.recv() => {
                        let formatted = format_line(line.clone());
                        if let Err(e) = log_appender.write_all(formatted.as_bytes()).await {
                            error!("Failed to write to log for daemon {id}: {e}");
                        }
                        trace!("output: {id} {formatted}");

                        let line_clean = console::strip_ansi_codes(&line).to_string();

                        if !ready_notified
                            && let Some(ref pattern) = ready_pattern
                            && pattern.is_match(&line_clean)
                        {
                            info!("daemon {id} ready: output matched pattern");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                            if !active_port_spawned && has_port_config {
                                active_port_spawned = true;
                                detect_and_store_active_port(id.clone(), daemon_pid);
                            }
                        }

                        if let Some(ref hook) = on_output_hook {
                            // `filter` (substring) wins over the regex; with
                            // neither set, every line matches.
                            let matched = match (&hook.filter, &on_output_pattern) {
                                (Some(substr), _) => line_clean.contains(substr.as_str()),
                                (None, Some(re)) => re.is_match(&line_clean),
                                (None, None) => true,
                            };
                            if matched {
                                // Debounce: fire on the first match, then only
                                // once the debounce duration has elapsed.
                                let now = std::time::Instant::now();
                                let elapsed = on_output_last_fired.map(|t| now.duration_since(t));
                                if elapsed.is_none_or(|e| e >= on_output_debounce) {
                                    on_output_last_fired = Some(now);
                                    hooks::fire_output_hook(id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), hook.run.clone(), line_clean.clone()).await;
                                }
                            }
                        }
                    }
                    // Process exited: if readiness was never reported, tell the
                    // waiter (success for a clean exit, otherwise the exit code).
                    Some(result) = exit_rx.recv() => {
                        exit_status = Some(result);
                        debug!("daemon {id} process exited, exit_status: {exit_status:?}");
                        let _ = log_appender.flush().await;
                        if !ready_notified {
                            if let Some(tx) = ready_tx.take() {
                                let is_success = exit_status.as_ref()
                                    .and_then(|r| r.as_ref().ok())
                                    .map(|s| s.success())
                                    .unwrap_or(false);

                                if is_success {
                                    debug!("daemon {id} exited successfully before ready check, sending success notification");
                                    let _ = tx.send(Ok(()));
                                } else {
                                    let exit_code = exit_status.as_ref()
                                        .and_then(|r| r.as_ref().ok())
                                        .and_then(|s| s.code());
                                    debug!("daemon {id} exited with failure before ready check, sending failure notification with exit_code: {exit_code:?}");
                                    let _ = tx.send(Err(exit_code));
                                }
                            }
                        } else {
                            debug!("daemon {id} was already marked ready, not sending notification");
                        }
                        break;
                    },
                    // Periodic HTTP readiness probe; ready on a success status.
                    _ = async {
                        if let Some(ref mut interval) = http_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_http.is_some() => {
                        if let (Some(url), Some(client)) = (&ready_http, &http_client) {
                            match client.get(url).send().await {
                                Ok(response) if response.status().is_success() => {
                                    info!("daemon {id} ready: HTTP check passed (status {})", response.status());
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                    http_check_interval = None;
                                    if !active_port_spawned && has_port_config {
                                        active_port_spawned = true;
                                        detect_and_store_active_port(id.clone(), daemon_pid);
                                    }
                                }
                                Ok(response) => {
                                    trace!("daemon {id} HTTP check: status {} (not ready)", response.status());
                                }
                                Err(e) => {
                                    trace!("daemon {id} HTTP check failed: {e}");
                                }
                            }
                        }
                    }
                    // Periodic TCP readiness probe against 127.0.0.1:<ready_port>.
                    _ = async {
                        if let Some(ref mut interval) = port_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_port.is_some() => {
                        if let Some(port) = ready_port {
                            match tokio::net::TcpStream::connect(("127.0.0.1", port)).await {
                                Ok(_) => {
                                    info!("daemon {id} ready: TCP port {port} is listening");
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                    port_check_interval = None;
                                    if !active_port_spawned && has_port_config {
                                        active_port_spawned = true;
                                        detect_and_store_active_port(id.clone(), daemon_pid);
                                    }
                                }
                                Err(_) => {
                                    trace!("daemon {id} port check: port {port} not listening yet");
                                }
                            }
                        }
                    }
                    // Periodic readiness command, run in the daemon directory
                    // with its output discarded; ready on a zero exit status.
                    _ = async {
                        if let Some(ref mut interval) = cmd_check_interval {
                            interval.tick().await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    }, if !ready_notified && ready_cmd.is_some() => {
                        if let Some(ref cmd) = ready_cmd {
                            let mut command = Shell::default_for_platform().command(cmd);
                            command
                                .current_dir(&daemon_dir)
                                .stdout(std::process::Stdio::null())
                                .stderr(std::process::Stdio::null());
                            let result: std::io::Result<std::process::ExitStatus> = command.status().await;
                            match result {
                                Ok(status) if status.success() => {
                                    info!("daemon {id} ready: readiness command succeeded");
                                    ready_notified = true;
                                    let _ = log_appender.flush().await;
                                    if let Some(tx) = ready_tx.take() {
                                        let _ = tx.send(Ok(()));
                                    }
                                    fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                                    cmd_check_interval = None;
                                    if !active_port_spawned && has_port_config {
                                        active_port_spawned = true;
                                        detect_and_store_active_port(id.clone(), daemon_pid);
                                    }
                                }
                                Ok(_) => {
                                    trace!("daemon {id} cmd check: command returned non-zero (not ready)");
                                }
                                Err(e) => {
                                    trace!("daemon {id} cmd check failed: {e}");
                                }
                            }
                        }
                    }
                    // Ready delay elapsed. It marks the daemon ready only when no
                    // other readiness mechanism is configured; either way the
                    // timer is cleared and port detection may start.
                    _ = async {
                        if let Some(ref mut timer) = delay_timer {
                            timer.await;
                        } else {
                            std::future::pending::<()>().await;
                        }
                    } => {
                        if !ready_notified && ready_pattern.is_none() && ready_http.is_none() && ready_port.is_none() && ready_cmd.is_none() {
                            info!("daemon {id} ready: delay elapsed");
                            ready_notified = true;
                            let _ = log_appender.flush().await;
                            if let Some(tx) = ready_tx.take() {
                                let _ = tx.send(Ok(()));
                            }
                            fire_hook(HookType::OnReady, id.clone(), daemon_dir.clone(), hook_retry_count, hook_daemon_env.clone(), vec![]).await;
                        }
                        delay_timer = None;
                        if !active_port_spawned && has_port_config {
                            active_port_spawned = true;
                            detect_and_store_active_port(id.clone(), daemon_pid);
                        }
                    }
                    // Periodically flush buffered log output to disk.
                    _ = log_flush_interval.tick() => {
                        if let Err(e) = log_appender.flush().await {
                            error!("Failed to flush log for daemon {id}: {e}");
                        }
                    }
                }
            }

            if let Err(e) = log_appender.flush().await {
                error!("Failed to final flush log for daemon {id}: {e}");
            }

            // The process is gone: clear the stored active port and persist state.
            {
                let mut state_file = SUPERVISOR.state_file.lock().await;
                if let Some(d) = state_file.daemons.get_mut(&id) {
                    d.active_port = None;
                }
                if let Err(e) = state_file.write() {
                    debug!("Failed to write state after clearing active_port for {id}: {e}");
                }
            }

            // The loop only breaks after storing a status; the fallback below
            // reads the channel again in case none was stored.
            let exit_status = if let Some(status) = exit_status {
                status
            } else {
                match exit_rx.recv().await {
                    Some(status) => status,
                    None => {
                        warn!("daemon {id} exit channel closed without receiving status");
                        Err(std::io::Error::other("exit channel closed"))
                    }
                }
            };
            let current_daemon = SUPERVISOR.get_daemon(&id).await;

            // Count this monitor as active while it updates state and fires
            // hooks; the guard decrements the counter and notifies
            // `monitor_done` waiters on every exit path below.
            SUPERVISOR
                .active_monitors
                .fetch_add(1, atomic::Ordering::Release);
            struct MonitorGuard;
            impl Drop for MonitorGuard {
                fn drop(&mut self) {
                    SUPERVISOR
                        .active_monitors
                        .fetch_sub(1, atomic::Ordering::Release);
                    SUPERVISOR.monitor_done.notify_waiters();
                }
            }
            let _monitor_guard = MonitorGuard;
            // Nothing to do if the daemon record is gone, or if it now tracks a
            // different pid and is neither stopped nor stopping.
            if current_daemon.is_none()
                || current_daemon.as_ref().is_some_and(|d| {
                    d.pid != Some(pid) && !d.status.is_stopped() && !d.status.is_stopping()
                })
            {
                return;
            }
            let already_stopped = current_daemon
                .as_ref()
                .is_some_and(|d| d.status.is_stopped());
            let is_stopping = already_stopped
                || current_daemon
                    .as_ref()
                    .is_some_and(|d| d.status.is_stopping());

            // Classify the exit: "stop" (stop requested), "exit" (clean exit),
            // or "fail". A missing exit code is reported as -1.
            let (exit_code, exit_reason) = match (&exit_status, is_stopping) {
                (Ok(status), true) => {
                    (status.code().unwrap_or(-1), "stop")
                }
                (Ok(status), false) if status.success() => (status.code().unwrap_or(-1), "exit"),
                (Ok(status), false) => (status.code().unwrap_or(-1), "fail"),
                (Err(_), true) => {
                    (-1, "stop")
                }
                (Err(_), false) => (-1, "fail"),
            };

            // Skip the state write when the record is already marked stopped.
            if !already_stopped {
                if let Ok(status) = &exit_status {
                    info!("daemon {id} exited with status {status}");
                }
                let (new_status, last_exit_success) = match exit_reason {
                    "stop" | "exit" => (
                        DaemonStatus::Stopped,
                        exit_status.as_ref().map(|s| s.success()).unwrap_or(true),
                    ),
                    _ => (DaemonStatus::Errored(exit_code), false),
                };
                if let Err(e) = SUPERVISOR
                    .upsert_daemon(
                        UpsertDaemonOpts::builder(id.clone())
                            .set(|o| {
                                o.pid = None;
                                o.status = new_status;
                                o.last_exit_success = Some(last_exit_success);
                            })
                            .build(),
                    )
                    .await
                {
                    error!("Failed to update daemon state for {id}: {e}");
                }
            }

            let hook_extra_env = vec![
                ("PITCHFORK_EXIT_CODE".to_string(), exit_code.to_string()),
                ("PITCHFORK_EXIT_REASON".to_string(), exit_reason.to_string()),
            ];

            // For "fail", hooks fire only once the retry budget is used up
            // (`hook_retry_count >= hook_retry.count()`).
            let hooks_to_fire: Vec<HookType> = match exit_reason {
                "stop" => vec![HookType::OnStop, HookType::OnExit],
                "exit" => vec![HookType::OnExit],
                _ if hook_retry_count >= hook_retry.count() => {
                    vec![HookType::OnFail, HookType::OnExit]
                }
                _ => vec![],
            };

            for hook_type in hooks_to_fire {
                fire_hook(
                    hook_type,
                    id.clone(),
                    daemon_dir.clone(),
                    hook_retry_count,
                    hook_daemon_env.clone(),
                    hook_extra_env.clone(),
                )
                .await;
            }
        });

        // When asked to wait, block on the monitor's readiness report.
        if let Some(ready_rx) = ready_rx {
            match ready_rx.await {
                Ok(Ok(())) => {
                    info!("daemon {id} is ready");
                    Ok(IpcResponse::DaemonReady { daemon })
                }
                Ok(Err(exit_code)) => {
                    error!("daemon {id} failed before becoming ready");
                    Ok(IpcResponse::DaemonFailedWithCode { exit_code })
                }
                // The sender was dropped without reporting (e.g. the monitor
                // returned early after failing to open the log file).
                Err(_) => {
                    error!("readiness channel closed unexpectedly for daemon {id}");
                    Ok(IpcResponse::DaemonStart { daemon })
                }
            }
        } else {
            Ok(IpcResponse::DaemonStart { daemon })
        }
    }
1093
1094 pub async fn stop(&self, id: &DaemonId) -> Result<IpcResponse> {
1096 let pitchfork_id = DaemonId::pitchfork();
1097 if *id == pitchfork_id {
1098 return Ok(IpcResponse::Error(
1099 "Cannot stop supervisor via stop command".into(),
1100 ));
1101 }
1102 info!("stopping daemon: {id}");
1103 if let Some(daemon) = self.get_daemon(id).await {
1104 trace!("daemon to stop: {daemon}");
1105 if let Some(pid) = daemon.pid {
1106 trace!("killing pid: {pid}");
1107 PROCS.refresh_processes();
1108 if PROCS.is_running(pid) {
1109 self.upsert_daemon(
1111 UpsertDaemonOpts::builder(id.clone())
1112 .set(|o| {
1113 o.pid = Some(pid);
1114 o.status = DaemonStatus::Stopping;
1115 })
1116 .build(),
1117 )
1118 .await?;
1119
1120 let stop_cfg = daemon.stop_signal.unwrap_or_default();
1123 let stop_signal: i32 = stop_cfg.signal.into();
1124 if let Err(e) = PROCS
1125 .kill_process_group_async(pid, stop_signal, stop_cfg.timeout)
1126 .await
1127 {
1128 debug!("failed to kill pid {pid}: {e}");
1129 PROCS.refresh_processes();
1131 if PROCS.is_running(pid) {
1132 debug!("failed to stop pid {pid}: process still running after kill");
1134 self.upsert_daemon(
1135 UpsertDaemonOpts::builder(id.clone())
1136 .set(|o| {
1137 o.pid = Some(pid); o.status = DaemonStatus::Running;
1139 })
1140 .build(),
1141 )
1142 .await?;
1143 return Ok(IpcResponse::DaemonStopFailed {
1144 error: format!(
1145 "process {pid} still running after kill attempt: {e}"
1146 ),
1147 });
1148 }
1149 }
1150
1151 self.upsert_daemon(
1156 UpsertDaemonOpts::builder(id.clone())
1157 .set(|o| {
1158 o.pid = None;
1159 o.status = DaemonStatus::Stopped;
1160 o.last_exit_success = Some(true); })
1162 .build(),
1163 )
1164 .await?;
1165 } else {
1166 debug!("pid {pid} not running, process may have exited unexpectedly");
1167 self.upsert_daemon(
1170 UpsertDaemonOpts::builder(id.clone())
1171 .set(|o| {
1172 o.pid = None;
1173 o.status = DaemonStatus::Stopped;
1174 })
1175 .build(),
1176 )
1177 .await?;
1178 return Ok(IpcResponse::DaemonWasNotRunning);
1179 }
1180 Ok(IpcResponse::Ok)
1181 } else {
1182 debug!("daemon {id} not running");
1183 Ok(IpcResponse::DaemonNotRunning)
1184 }
1185 } else {
1186 debug!("daemon {id} not found");
1187 Ok(IpcResponse::DaemonNotFound)
1188 }
1189 }
1190}
1191
1192#[cfg(unix)]
1193fn resolve_effective_run_identity(daemon_user: Option<&str>) -> Result<RunIdentity> {
1194 let settings_user = settings().supervisor.user.trim();
1195 let daemon_user = daemon_user.map(str::trim).filter(|user| !user.is_empty());
1196 let settings_user = (!settings_user.is_empty()).then_some(settings_user);
1197 let configured = daemon_user.or(settings_user);
1198 let current_uid = nix::unistd::Uid::effective().as_raw();
1199 let current_gid = nix::unistd::Gid::effective().as_raw();
1200 resolve_run_identity(
1201 configured,
1202 current_uid,
1203 current_gid,
1204 std::env::var("SUDO_UID").ok().as_deref(),
1205 std::env::var("SUDO_GID").ok().as_deref(),
1206 )
1207}
1208
1209#[cfg(unix)]
1210fn resolve_run_identity(
1211 configured: Option<&str>,
1212 current_uid: u32,
1213 current_gid: u32,
1214 sudo_uid: Option<&str>,
1215 sudo_gid: Option<&str>,
1216) -> Result<RunIdentity> {
1217 let current_uid = nix::unistd::Uid::from_raw(current_uid);
1218 let current_gid = nix::unistd::Gid::from_raw(current_gid);
1219 if let Some(user) = configured {
1220 let identity = resolve_configured_user(user)?;
1221 ensure_can_use_identity(user, &identity, current_uid, current_gid)?;
1222 if identity.matches(current_uid, current_gid) {
1223 return Ok(RunIdentity::Inherit);
1224 }
1225 return Ok(identity);
1226 }
1227
1228 if current_uid.is_root()
1229 && let Some(identity) = resolve_sudo_identity(sudo_uid, sudo_gid)
1230 {
1231 return Ok(identity);
1232 }
1233
1234 Ok(RunIdentity::Inherit)
1235}
1236
1237#[cfg(unix)]
1238fn resolve_configured_user(user: &str) -> Result<RunIdentity> {
1239 if user.chars().all(|c| c.is_ascii_digit()) {
1240 let uid = user
1241 .parse::<u32>()
1242 .map_err(|e| miette::miette!("invalid run user UID '{}': {}", user, e))?;
1243 let user_record = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1244 .into_diagnostic()?
1245 .ok_or_else(|| miette::miette!("run user UID '{}' does not exist", user))?;
1246 return run_identity_from_user_record(user_record);
1247 }
1248
1249 let user_record = nix::unistd::User::from_name(user)
1250 .into_diagnostic()?
1251 .ok_or_else(|| miette::miette!("run user '{}' does not exist", user))?;
1252 run_identity_from_user_record(user_record)
1253}
1254
1255#[cfg(unix)]
1256fn run_identity_from_user_record(user: nix::unistd::User) -> Result<RunIdentity> {
1257 let username = CString::new(user.name)
1258 .map_err(|e| miette::miette!("run user name contains an interior nul byte: {}", e))?;
1259 Ok(RunIdentity::Switch {
1260 uid: user.uid,
1261 gid: user.gid,
1262 username: Some(username),
1263 })
1264}
1265
1266#[cfg(unix)]
1267fn run_identity_from_raw_ids(uid: u32, gid: u32, username: Option<CString>) -> RunIdentity {
1268 RunIdentity::Switch {
1269 uid: nix::unistd::Uid::from_raw(uid),
1270 gid: nix::unistd::Gid::from_raw(gid),
1271 username,
1272 }
1273}
1274
1275#[cfg(unix)]
1276fn resolve_sudo_identity(sudo_uid: Option<&str>, sudo_gid: Option<&str>) -> Option<RunIdentity> {
1277 let uid = sudo_uid?.parse::<u32>().ok()?;
1278 let gid = sudo_gid?.parse::<u32>().ok()?;
1279 let username = nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
1280 .ok()
1281 .flatten()
1282 .and_then(|u| CString::new(u.name).ok());
1283 Some(run_identity_from_raw_ids(uid, gid, username))
1284}
1285
1286#[cfg(unix)]
1287fn ensure_can_use_identity(
1288 configured_user: &str,
1289 identity: &RunIdentity,
1290 current_uid: nix::unistd::Uid,
1291 current_gid: nix::unistd::Gid,
1292) -> Result<()> {
1293 let RunIdentity::Switch { uid, gid, .. } = identity else {
1294 return Ok(());
1295 };
1296 if *uid == current_uid && *gid == current_gid {
1297 return Ok(());
1298 }
1299 if current_uid.is_root() {
1300 return Ok(());
1301 }
1302 Err(miette::miette!(
1303 "daemon is configured to run as '{}', but the supervisor is running as uid={} gid={}. Restart the supervisor with sudo to switch to uid={} gid={}, or choose a user matching the supervisor.",
1304 configured_user,
1305 current_uid.as_raw(),
1306 current_gid.as_raw(),
1307 uid.as_raw(),
1308 gid.as_raw()
1309 ))
1310}
1311
1312#[cfg(unix)]
1313fn apply_run_identity(identity: &RunIdentity) -> std::io::Result<()> {
1314 let RunIdentity::Switch { uid, gid, username } = identity else {
1315 return Ok(());
1316 };
1317 if let Some(username) = username {
1318 initgroups_for_user(username, *gid)?;
1319 } else {
1320 setgroups_to_primary(*gid)?;
1321 }
1322 nix::unistd::setgid(*gid).map_err(nix_to_io_error)?;
1323 nix::unistd::setuid(*uid).map_err(nix_to_io_error)?;
1324 Ok(())
1325}
1326
1327#[cfg(unix)]
1328impl RunIdentity {
1329 fn matches(&self, uid: nix::unistd::Uid, gid: nix::unistd::Gid) -> bool {
1330 matches!(self, RunIdentity::Switch { uid: u, gid: g, .. } if *u == uid && *g == gid)
1331 }
1332}
1333
1334#[cfg(unix)]
1335fn setgroups_to_primary(gid: nix::unistd::Gid) -> std::io::Result<()> {
1336 let groups = [gid.as_raw() as libc::gid_t];
1337 #[cfg(any(target_os = "linux", target_os = "android"))]
1338 let group_count = groups.len();
1339 #[cfg(not(any(target_os = "linux", target_os = "android")))]
1340 let group_count = groups.len() as libc::c_int;
1341 let rc = unsafe { libc::setgroups(group_count, groups.as_ptr()) };
1342 if rc == -1 {
1343 Err(std::io::Error::last_os_error())
1344 } else {
1345 Ok(())
1346 }
1347}
1348
1349#[cfg(unix)]
1350fn initgroups_for_user(username: &CString, gid: nix::unistd::Gid) -> std::io::Result<()> {
1351 let gid = gid.as_raw();
1352 #[cfg(any(
1353 target_os = "macos",
1354 target_os = "ios",
1355 target_os = "tvos",
1356 target_os = "watchos"
1357 ))]
1358 let base_gid = i32::try_from(gid)
1359 .map_err(|_| std::io::Error::other(format!("gid {gid} is out of range")))?;
1360
1361 #[cfg(not(any(
1362 target_os = "macos",
1363 target_os = "ios",
1364 target_os = "tvos",
1365 target_os = "watchos"
1366 )))]
1367 let base_gid = gid as libc::gid_t;
1368
1369 let rc = unsafe { libc::initgroups(username.as_ptr(), base_gid) };
1372 if rc == -1 {
1373 Err(std::io::Error::last_os_error())
1374 } else {
1375 Ok(())
1376 }
1377}
1378
1379#[cfg(unix)]
1380fn nix_to_io_error(err: nix::errno::Errno) -> std::io::Error {
1381 std::io::Error::from_raw_os_error(err as i32)
1382}
1383
1384async fn check_ports_available(
1391 expected_ports: &[u16],
1392 auto_bump: bool,
1393 max_attempts: u32,
1394) -> Result<Vec<u16>> {
1395 if expected_ports.is_empty() {
1396 return Ok(Vec::new());
1397 }
1398
1399 for bump_offset in 0..=max_attempts {
1400 let candidate_ports: Vec<u16> = expected_ports
1402 .iter()
1403 .map(|&p| p.wrapping_add(bump_offset as u16))
1404 .collect();
1405
1406 let mut all_available = true;
1408 let mut conflicting_port = None;
1409
1410 for &port in &candidate_ports {
1411 if port == 0 {
1414 continue;
1415 }
1416
1417 if is_port_in_use(port).await {
1431 all_available = false;
1432 conflicting_port = Some(port);
1433 break;
1434 }
1435 }
1436
1437 if all_available {
1438 if candidate_ports.contains(&0) && !expected_ports.contains(&0) {
1442 return Err(PortError::NoAvailablePort {
1443 start_port: expected_ports[0],
1444 attempts: bump_offset + 1,
1445 }
1446 .into());
1447 }
1448 if bump_offset > 0 {
1449 info!("ports {expected_ports:?} bumped by {bump_offset} to {candidate_ports:?}");
1450 }
1451 return Ok(candidate_ports);
1452 }
1453
1454 if bump_offset == 0 && !auto_bump {
1456 if let Some(port) = conflicting_port {
1457 let (pid, process) = identify_port_owner(port).await;
1458 return Err(PortError::InUse { port, process, pid }.into());
1459 }
1460 }
1461 }
1462
1463 Err(PortError::NoAvailablePort {
1465 start_port: expected_ports[0],
1466 attempts: max_attempts + 1,
1467 }
1468 .into())
1469}
1470
1471async fn is_port_in_use(port: u16) -> bool {
1477 tokio::task::spawn_blocking(move || {
1478 for &addr in &["0.0.0.0", "127.0.0.1", "::1"] {
1479 match std::net::TcpListener::bind((addr, port)) {
1480 Ok(listener) => drop(listener),
1481 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => return true,
1482 Err(_) => continue,
1483 }
1484 }
1485 false
1486 })
1487 .await
1488 .unwrap_or(false)
1489}
1490
1491async fn identify_port_owner(port: u16) -> (u32, String) {
1496 tokio::task::spawn_blocking(move || {
1497 listeners::get_all()
1498 .ok()
1499 .and_then(|list| {
1500 list.into_iter()
1501 .find(|l| l.socket.port() == port)
1502 .map(|l| (l.process.pid, l.process.name))
1503 })
1504 .unwrap_or((0, "unknown".to_string()))
1505 })
1506 .await
1507 .unwrap_or((0, "unknown".to_string()))
1508}
1509
1510async fn detect_port_conflict(port: u16) -> Option<(u32, String)> {
1515 if !is_port_in_use(port).await {
1516 return None;
1517 }
1518 Some(identify_port_owner(port).await)
1519}
1520
1521fn detect_and_store_active_port(id: DaemonId, pid: u32) {
1536 tokio::spawn(async move {
1537 for delay_ms in [500u64, 1000, 2000, 4000] {
1541 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1542
1543 let expected_port: Option<u16> = {
1546 let state_file = SUPERVISOR.state_file.lock().await;
1547 match state_file.daemons.get(&id) {
1548 Some(d) if d.pid.is_none() => {
1549 debug!("daemon {id}: aborting active_port detection — process exited");
1550 return;
1551 }
1552 Some(d) => d
1553 .port
1554 .as_ref()
1555 .and_then(|p| p.expect.first().copied())
1556 .filter(|&p| p > 0),
1557 None => None,
1558 }
1559 };
1560
1561 let active_port = tokio::task::spawn_blocking(move || {
1562 let listeners = listeners::get_all().ok()?;
1563 let process_ports: Vec<u16> = listeners
1564 .into_iter()
1565 .filter(|listener| listener.process.pid == pid)
1566 .map(|listener| listener.socket.port())
1567 .filter(|&port| port > 0)
1568 .collect();
1569
1570 if process_ports.is_empty() {
1571 return None;
1572 }
1573
1574 if let Some(ep) = expected_port {
1577 if process_ports.contains(&ep) {
1578 return Some(ep);
1579 }
1580 }
1581
1582 process_ports.into_iter().next()
1588 })
1589 .await
1590 .ok()
1591 .flatten();
1592
1593 if let Some(port) = active_port {
1594 debug!("daemon {id} active_port detected: {port}");
1595 let mut state_file = SUPERVISOR.state_file.lock().await;
1596 if let Some(d) = state_file.daemons.get_mut(&id) {
1597 if d.pid == Some(pid) {
1601 d.active_port = Some(port);
1602 } else {
1603 debug!(
1604 "daemon {id}: skipping active_port write — PID mismatch \
1605 (expected {pid}, current {:?})",
1606 d.pid
1607 );
1608 return;
1609 }
1610 }
1611 if let Err(e) = state_file.write() {
1612 debug!("Failed to write state after detecting active_port for {id}: {e}");
1613 }
1614 return;
1615 }
1616
1617 debug!("daemon {id}: no active port detected for pid {pid} (will retry)");
1618 }
1619
1620 debug!("daemon {id}: active port detection exhausted all retries for pid {pid}");
1621 });
1622}
1623
1624fn is_daemon_slug_target(id: &DaemonId) -> bool {
1632 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
1636 slugs.iter().any(|(slug, entry)| {
1637 let daemon_name = entry.daemon.as_deref().unwrap_or(slug);
1638 id.name() == daemon_name
1639 })
1640}
1641
#[cfg(all(test, unix))]
mod tests {
    use super::*;

    /// Unpacks a `Switch` identity into `(uid, gid, user name)`; panics on `Inherit`.
    fn expect_switch(identity: RunIdentity) -> (u32, u32, Option<String>) {
        match identity {
            RunIdentity::Switch { uid, gid, username } => (
                uid.as_raw(),
                gid.as_raw(),
                username.and_then(|name| name.into_string().ok()),
            ),
            RunIdentity::Inherit => panic!("expected identity switch"),
        }
    }

    // No configured user and no sudo variables: keep the current identity.
    #[test]
    fn test_resolve_run_identity_empty_without_sudo() {
        let resolved = resolve_run_identity(None, 501, 20, None, None).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    // Running as root with sudo variables: fall back to the invoking user.
    #[test]
    fn test_resolve_run_identity_sudo_fallback() {
        let resolved = resolve_run_identity(None, 0, 0, Some("501"), Some("20")).unwrap();
        let (uid, gid, _) = expect_switch(resolved);
        assert_eq!(uid, 501);
        assert_eq!(gid, 20);
    }

    // Sudo variables are ignored unless the current uid is root.
    #[test]
    fn test_resolve_run_identity_ignores_stale_sudo_when_not_root() {
        let resolved = resolve_run_identity(None, 501, 20, Some("0"), Some("0")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }

    #[test]
    fn test_resolve_configured_user_root_name() {
        let (uid, _, name) = expect_switch(resolve_configured_user("root").unwrap());
        assert_eq!(uid, 0);
        assert_eq!(name.as_deref(), Some("root"));
    }

    #[test]
    fn test_resolve_configured_user_root_uid() {
        let (uid, _, name) = expect_switch(resolve_configured_user("0").unwrap());
        assert_eq!(uid, 0);
        assert_eq!(name.as_deref(), Some("root"));
    }

    #[test]
    fn test_resolve_configured_user_missing_user_fails() {
        let message = resolve_configured_user("pitchfork-user-that-should-not-exist")
            .unwrap_err()
            .to_string();
        assert!(message.contains("does not exist"));
    }

    // A non-root supervisor cannot switch to a different user.
    #[test]
    fn test_resolve_run_identity_requires_root_for_user_switch() {
        let message = resolve_run_identity(Some("root"), 501, 20, None, None)
            .unwrap_err()
            .to_string();
        assert!(message.contains("Restart the supervisor with sudo"));
    }

    // A configured user equal to the current identity wins over sudo variables
    // and results in no switch at all.
    #[test]
    fn test_resolve_run_identity_same_user_is_noop() {
        let resolved = resolve_run_identity(Some("root"), 0, 0, Some("501"), Some("20")).unwrap();
        assert_eq!(resolved, RunIdentity::Inherit);
    }
}