pitchfork_cli/supervisor/
mod.rs1mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15#[cfg(unix)]
16mod pty;
17mod retry;
18mod state;
19mod watchers;
20
21use crate::daemon_id::DaemonId;
22use crate::daemon_status::DaemonStatus;
23use crate::deps::compute_reverse_stop_order;
24use crate::ipc::server::{IpcServer, IpcServerHandle};
25
26use crate::procs::PROCS;
27use crate::settings::settings;
28use crate::state_file::StateFile;
29use crate::{Result, env};
30use duct::cmd;
31use miette::IntoDiagnostic;
32use once_cell::sync::Lazy;
33use std::collections::HashMap;
34#[cfg(unix)]
35use std::collections::HashSet;
36use std::fs;
37#[cfg(unix)]
38use std::os::unix::fs::PermissionsExt;
39use std::process::exit;
40use std::sync::atomic;
41use std::sync::atomic::{AtomicBool, AtomicU32};
42use std::time::Duration;
43#[cfg(unix)]
44use tokio::signal::unix::SignalKind;
45use tokio::sync::{Mutex, Notify};
46use tokio::task::JoinHandle;
47use tokio::{signal, time};
48
/// Exit statuses of managed daemons that the non-Linux SIGCHLD zombie reaper
/// collected itself, keyed by PID.
///
/// On these platforms the reaper uses a plain `waitpid(-1, WNOHANG)`. It can
/// therefore reap a managed daemon before Tokio does. When that happens it
/// records the exit code here:
/// - the process exit code for a normal exit;
/// - the negated signal number for a signal death;
/// - `-1` for any other status.
///
/// NOTE(review): presumably read back by the daemon lifecycle code when its own
/// wait comes up empty — confirm in `lifecycle`.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
60
61pub(crate) use state::UpsertDaemonOpts;
63
/// Central runtime state of the pitchfork supervisor process.
///
/// Normally accessed through the [`SUPERVISOR`] singleton. Fields are
/// `pub(crate)` so the submodules declared in this module can reach them.
pub struct Supervisor {
    /// In-memory state file. Written to disk when dirty by:
    /// - the periodic flush task;
    /// - `flush_state`;
    /// - `close`.
    pub(crate) state_file: Mutex<StateFile>,
    /// Queued `(level, message)` notifications appended by `add_notification`.
    /// NOTE(review): drained elsewhere — presumably when reporting to clients; confirm.
    pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
    /// Time at which `refresh` last ran.
    pub(crate) last_refreshed_at: Mutex<time::Instant>,
    /// Daemons with a pending autostop, handled by `process_pending_autostops`.
    /// NOTE(review): the meaning of the `Instant` (scheduled time vs. deadline)
    /// is defined by the autostop submodule — confirm there.
    pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
    /// Handle for the IPC server. Set in `start`; taken and shut down by `close`.
    pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
    /// Spawned hook tasks. `close` awaits each one for up to 30 seconds.
    pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Number of in-flight monitoring tasks. `close` waits (up to 5 seconds)
    /// for this to reach zero before collecting `hook_tasks`.
    pub(crate) active_monitors: AtomicU32,
    /// Notified when monitoring tasks finish; `close` waits on it while draining.
    /// NOTE(review): presumably signalled alongside decrements of `active_monitors`.
    pub(crate) monitor_done: Notify,
    /// Cancellation token for the proxy server task. Also polled by the LAN IP monitor.
    pub(crate) proxy_cancel: Mutex<Option<tokio_util::sync::CancellationToken>>,
    /// Proxy server task. After cancelling it, `close` waits up to 12 seconds.
    pub(crate) proxy_task: Mutex<Option<JoinHandle<()>>>,
    /// Shared mDNS publisher, present only when LAN mode started successfully.
    /// Updated by `sync_mdns` and shut down by `close`.
    pub(crate) mdns_publisher:
        Mutex<Option<std::sync::Arc<tokio::sync::Mutex<crate::proxy::mdns::MdnsPublisher>>>>,
    /// Task that republishes mDNS records when the LAN IP changes; aborted by `close`.
    pub(crate) lan_monitor_task: Mutex<Option<JoinHandle<()>>>,
    /// Cancels the periodic state flush task. It is a `std::sync::Mutex`,
    /// presumably because it is set from the non-async `start_state_flush_task`.
    pub(crate) flush_cancel: std::sync::Mutex<Option<tokio_util::sync::CancellationToken>>,
}
95
96pub(crate) fn interval_duration() -> Duration {
97 settings().general_interval()
98}
99
100pub static SUPERVISOR: Lazy<Supervisor> =
101 Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
102
103pub fn start_if_not_running() -> Result<()> {
104 let sf = StateFile::get();
105 if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
106 && let Some(pid) = d.pid
107 {
108 PROCS.refresh_pids(&[pid]);
109 if PROCS.is_running(pid) {
110 return Ok(());
111 }
112 }
113 start_in_background()
114}
115
116pub fn start_in_background() -> Result<()> {
117 debug!("starting supervisor in background");
118 let log_file = &*env::PITCHFORK_LOG_FILE;
122 if let Some(parent) = log_file.parent() {
123 let _ = fs::create_dir_all(parent);
124 }
125 let stderr_file = fs::OpenOptions::new()
126 .create(true)
127 .append(true)
128 .open(log_file)
129 .into_diagnostic()?;
130 #[cfg(unix)]
131 fix_state_dir_permissions();
132 cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
133 .stdout_null()
134 .stderr_file(stderr_file)
135 .start()
136 .into_diagnostic()?;
137 Ok(())
138}
139
140impl Supervisor {
141 pub fn new() -> Result<Self> {
142 Ok(Self {
143 state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
144 last_refreshed_at: Mutex::new(time::Instant::now()),
145 pending_notifications: Mutex::new(vec![]),
146 pending_autostops: Mutex::new(HashMap::new()),
147 ipc_shutdown: Mutex::new(None),
148 hook_tasks: Mutex::new(Vec::new()),
149 active_monitors: AtomicU32::new(0),
150 monitor_done: Notify::new(),
151 proxy_cancel: Mutex::new(None),
152 proxy_task: Mutex::new(None),
153 mdns_publisher: Mutex::new(None),
154 lan_monitor_task: Mutex::new(None),
155 flush_cancel: std::sync::Mutex::new(None),
156 })
157 }
158
159 pub async fn start(
160 &self,
161 is_boot: bool,
162 container: bool,
163 web_port: Option<u16>,
164 web_path: Option<String>,
165 ) -> Result<()> {
166 #[cfg(unix)]
171 fix_state_dir_permissions();
172
173 let pid = std::process::id();
174 PROCS.refresh_pids(&[pid]);
176 let container_mode = container || settings().supervisor.container;
178 if container_mode {
179 info!("Starting supervisor in container/PID1 mode with pid {pid}");
180 } else {
181 info!("Starting supervisor with pid {pid}");
182 }
183
184 self.upsert_daemon(
185 UpsertDaemonOpts::builder(DaemonId::pitchfork())
186 .set(|o| {
187 o.pid = Some(pid);
188 o.status = DaemonStatus::Running;
189 })
190 .build(),
191 )
192 .await?;
193 #[cfg(unix)]
194 fix_state_dir_permissions();
195
196 if is_boot {
198 info!("Boot start mode enabled, starting boot_start daemons");
199 self.start_boot_daemons().await?;
200 }
201
202 self.interval_watch()?;
203 self.cron_watch()?;
204 self.signals()?;
205 self.daemon_file_watch()?;
206
207 #[cfg(unix)]
209 if container_mode {
210 self.reap_zombies()?;
211 }
212
213 let s = settings();
215 let effective_port = web_port.or_else(|| {
216 if s.web.auto_start {
217 match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
218 Some(p) => Some(p),
219 None => {
220 error!(
221 "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
222 s.web.bind_port
223 );
224 None
225 }
226 }
227 } else {
228 None
229 }
230 });
231 let effective_path = web_path.or_else(|| {
233 let bp = s.web.base_path.clone();
234 if bp.is_empty() { None } else { Some(bp) }
235 });
236 if let Some(port) = effective_port {
237 tokio::spawn(async move {
238 if let Err(e) = crate::web::serve(port, effective_path).await {
239 error!("Web server error: {e}");
240 }
241 });
242 }
243
244 if s.proxy.enable {
246 #[cfg(feature = "proxy-tls")]
250 if s.proxy.https {
251 let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
252 let ca_cert_path = proxy_dir.join("ca.pem");
253 let ca_key_path = proxy_dir.join("ca-key.pem");
254 if !ca_cert_path.exists() || !ca_key_path.exists() {
255 match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
256 Ok(()) => {
257 info!(
258 "Generated local CA certificate at {}",
259 ca_cert_path.display()
260 );
261 }
262 Err(e) => {
263 error!("Failed to generate CA certificate: {e}");
264 }
265 }
266 }
267
268 if s.proxy.auto_trust && ca_cert_path.exists() {
272 use crate::proxy::trust::{AutoTrustResult, auto_trust};
273 match auto_trust(&ca_cert_path) {
274 AutoTrustResult::AlreadyTrusted => {}
275 AutoTrustResult::Trusted => {
276 info!("CA certificate auto-trusted in system store");
277 }
278 AutoTrustResult::NotTrusted { reason } => {
279 warn!("Auto-trust skipped: {reason}");
280 warn!("Run `pitchfork proxy trust` to install manually");
281 }
282 }
283 }
284 }
285 let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
289 let proxy_cancel = tokio_util::sync::CancellationToken::new();
290 let proxy_cancel_clone = proxy_cancel.clone();
291 *self.proxy_cancel.lock().await = Some(proxy_cancel);
292 let proxy_task = tokio::spawn(async move {
293 if let Err(e) = crate::proxy::server::serve(bind_tx, proxy_cancel_clone).await {
294 error!("Proxy server error: {e}");
295 }
296 });
297 *self.proxy_task.lock().await = Some(proxy_task);
298 match bind_rx.await {
299 Ok(Ok(())) => {
300 self.start_mdns().await;
302 }
303 Ok(Err(msg)) => {
304 error!("{msg}");
305 self.add_notification(log::LevelFilter::Error, msg).await;
306 }
307 Err(_) => {
308 }
312 }
313 }
314
315 let (ipc, ipc_handle) = IpcServer::new()?;
316 *self.ipc_shutdown.lock().await = Some(ipc_handle);
317 self.start_state_flush_task();
318 self.conn_watch(ipc).await
319 }
320
321 async fn start_mdns(&self) {
323 let s = crate::settings::settings();
324 let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
325 if !s.proxy.enable || !lan_enabled {
326 return;
327 }
328
329 let lan_ip = if !s.proxy.lan_ip.is_empty() {
330 match s.proxy.lan_ip.parse::<std::net::Ipv4Addr>() {
331 Ok(ip) => Some(ip),
332 Err(e) => {
333 error!(
334 "proxy.lan_ip {:?} is not a valid IPv4 address: {e}",
335 s.proxy.lan_ip
336 );
337 return;
338 }
339 }
340 } else {
341 match crate::proxy::lan_ip::detect_lan_ip().await {
342 Some(ip) => Some(ip),
343 None => {
344 error!(
345 "LAN mode is enabled but no LAN IP address could be detected. \
346 Set proxy.lan_ip to a specific address, or ensure you are connected to a network."
347 );
348 return;
349 }
350 }
351 };
352
353 let Some(lan_ip) = lan_ip else { return };
354 let port = u16::try_from(s.proxy.port).unwrap_or(443);
355
356 let Some(mut publisher) = crate::proxy::mdns::MdnsPublisher::new(lan_ip) else {
357 error!("Failed to start mDNS publisher. Is Avahi (Linux) or Bonjour (macOS) running?");
358 return;
359 };
360
361 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
363 for slug in slugs.keys() {
364 let hostname = format!("{slug}.local");
365 publisher.publish(&hostname, port);
366 }
367
368 log::info!(
369 "LAN mode: mDNS publishing on {lan_ip}, {} slug(s) registered",
370 slugs.len()
371 );
372
373 let publisher = std::sync::Arc::new(tokio::sync::Mutex::new(publisher));
374
375 let ip_pinned = !s.proxy.lan_ip.is_empty();
377 if !ip_pinned {
378 let monitor_cancel = self.proxy_cancel.lock().await.clone();
379 let publisher_clone = publisher.clone();
380 let task = tokio::spawn(async move {
381 let mut last_ip = lan_ip;
382 let interval = std::time::Duration::from_secs(5);
383 let mut ticker = tokio::time::interval(interval);
384 ticker.tick().await; loop {
386 ticker.tick().await;
387 if let Some(cancel) = monitor_cancel.as_ref() {
388 if cancel.is_cancelled() {
389 break;
390 }
391 }
392 if let Some(new_ip) =
393 crate::proxy::lan_ip::detect_lan_ip_if_changed(last_ip).await
394 {
395 log::info!("LAN IP changed: {last_ip} → {new_ip}");
396 last_ip = new_ip;
397 let mut pub_guard = publisher_clone.lock().await;
398 pub_guard.republish_all(new_ip, port);
399 }
400 }
401 });
402 *self.lan_monitor_task.lock().await = Some(task);
403 }
404
405 *self.mdns_publisher.lock().await = Some(publisher);
406 }
407
408 async fn sync_mdns(&self) {
413 let publisher = {
416 let guard = self.mdns_publisher.lock().await;
417 match guard.as_ref() {
418 Some(p) => p.clone(),
419 None => {
420 debug!("sync_mdns: mDNS publisher not active, skipping");
421 return;
422 }
423 }
424 };
425
426 let s = crate::settings::settings();
427 let port = u16::try_from(s.proxy.port).unwrap_or(443);
428
429 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
430 let mut pub_guard = publisher.lock().await;
431
432 let current_keys: Vec<&String> = slugs.keys().collect();
434 let registered: Vec<String> = pub_guard.registered_hostnames();
435 for hostname in ®istered {
436 let slug = hostname.strip_suffix(".local").unwrap_or(hostname);
438 if !current_keys.iter().any(|k| k.as_str() == slug) {
439 log::info!("mDNS: unpublishing removed slug {slug}");
440 pub_guard.unpublish(hostname);
441 }
442 }
443
444 for slug in slugs.keys() {
446 let hostname = format!("{slug}.local");
447 if !pub_guard.is_published(&hostname) {
448 log::info!("mDNS: publishing new slug {slug}");
449 pub_guard.publish(&hostname, port);
450 }
451 }
452 }
453
454 fn start_state_flush_task(&self) {
458 let cancel = tokio_util::sync::CancellationToken::new();
459 *self.flush_cancel.lock().unwrap() = Some(cancel.clone());
460 tokio::spawn(async move {
461 let mut interval = time::interval(Duration::from_secs(1));
462 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
463 loop {
464 tokio::select! {
465 _ = interval.tick() => {}
466 _ = cancel.cancelled() => {
467 debug!("state flush task received shutdown signal");
468 break;
469 }
470 }
471 let state = SUPERVISOR.state_file.lock().await;
472 if state.is_dirty() {
473 if let Err(e) = state.write() {
474 warn!("failed to flush state file: {e}");
475 }
476 }
477 }
478 debug!("state flush task exiting");
479 });
480 }
481
482 pub(crate) async fn flush_state(&self) {
483 let state = self.state_file.lock().await;
484 if state.is_dirty() {
485 if let Err(e) = state.write() {
486 warn!("failed to flush state file: {e}");
487 }
488 }
489 }
490
491 pub(crate) async fn refresh(&self) -> Result<()> {
492 trace!("refreshing");
493
494 let dirs_with_pids = self.get_dirs_with_shell_pids().await;
497 let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
498
499 if pids_to_check.is_empty() {
500 trace!("no shell PIDs to check, skipping process refresh");
502 } else {
503 PROCS.refresh_pids(&pids_to_check);
504 }
505
506 let mut last_refreshed_at = self.last_refreshed_at.lock().await;
507 *last_refreshed_at = time::Instant::now();
508
509 for (dir, pids) in dirs_with_pids {
510 let to_remove = pids
511 .iter()
512 .filter(|pid| !PROCS.is_running(**pid))
513 .collect::<Vec<_>>();
514 for pid in &to_remove {
515 self.remove_shell_pid(**pid).await?
516 }
517 if to_remove.len() == pids.len() {
518 self.leave_dir(&dir).await?;
519 }
520 }
521
522 self.check_retry().await?;
523 self.process_pending_autostops().await?;
524
525 Ok(())
526 }
527
528 #[cfg(unix)]
553 fn reap_zombies(&self) -> Result<()> {
554 let mut stream = signal::unix::signal(SignalKind::child())
555 .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
556 tokio::spawn(async move {
557 loop {
558 stream.recv().await;
559 let managed_pids: HashSet<u32> = SUPERVISOR
561 .state_file
562 .lock()
563 .await
564 .daemons
565 .values()
566 .filter_map(|d| d.pid)
567 .collect();
568 Self::reap_unmanaged_zombies(&managed_pids).await;
570 }
571 });
572 info!("container mode: SIGCHLD zombie reaper installed");
573 Ok(())
574 }
575
576 #[cfg(target_os = "linux")]
582 async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
583 use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
584 use nix::unistd::Pid;
585
586 loop {
587 let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
589 match waitid(Id::All, peek_flags) {
590 Ok(WaitStatus::StillAlive) => break,
591 Ok(status) => {
592 let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
593 break;
594 };
595 if managed_pids.contains(&pid_raw) {
596 trace!(
600 "zombie reaper: skipping managed daemon pid {pid_raw}, \
601 leaving for Tokio to reap"
602 );
603 break;
604 }
605 match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
607 Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
608 Err(nix::errno::Errno::ECHILD) => break,
609 Err(e) => {
610 trace!("waitpid error reaping pid {pid_raw}: {e}");
611 break;
612 }
613 }
614 }
615 Err(nix::errno::Errno::ECHILD) => break, Err(e) => {
617 trace!("waitid error in zombie reaper: {e}");
618 break;
619 }
620 }
621 }
622 }
623
624 #[cfg(all(unix, not(target_os = "linux")))]
630 async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
631 use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
632
633 loop {
634 match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
635 Ok(WaitStatus::StillAlive) => break,
636 Ok(status) => {
637 let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
638 continue;
639 };
640 if managed_pids.contains(&pid) {
641 let exit_code = match status {
643 WaitStatus::Exited(_, code) => code,
644 WaitStatus::Signaled(_, sig, _) => -(sig as i32),
645 _ => -1,
646 };
647 warn!(
648 "zombie reaper reaped managed daemon pid {pid} \
649 (exit_code={exit_code}); stashing status for recovery"
650 );
651 REAPED_STATUSES.lock().await.insert(pid, exit_code);
652 } else {
653 trace!("reaped orphaned zombie child: {status:?}");
654 }
655 }
656 Err(nix::errno::Errno::ECHILD) => break, Err(e) => {
658 trace!("waitpid error in zombie reaper: {e}");
659 break;
660 }
661 }
662 }
663 }
664
665 #[cfg(unix)]
666 fn signals(&self) -> Result<()> {
667 let signals = [
668 SignalKind::terminate(),
669 SignalKind::alarm(),
670 SignalKind::interrupt(),
671 SignalKind::quit(),
672 SignalKind::hangup(),
673 SignalKind::user_defined1(),
674 SignalKind::user_defined2(),
675 ];
676 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
677 for signal in signals {
678 let stream = match signal::unix::signal(signal) {
679 Ok(s) => s,
680 Err(e) => {
681 warn!("Failed to register signal handler for {signal:?}: {e}");
682 continue;
683 }
684 };
685 tokio::spawn(async move {
686 let mut stream = stream;
687 loop {
688 stream.recv().await;
689 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
690 exit(1);
691 } else {
692 SUPERVISOR.handle_signal().await;
693 }
694 }
695 });
696 }
697 Ok(())
698 }
699
700 #[cfg(windows)]
701 fn signals(&self) -> Result<()> {
702 tokio::spawn(async move {
703 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
704 loop {
705 if let Err(e) = signal::ctrl_c().await {
706 error!("Failed to wait for ctrl-c: {}", e);
707 return;
708 }
709 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
710 exit(1);
711 } else {
712 SUPERVISOR.handle_signal().await;
713 }
714 }
715 });
716 Ok(())
717 }
718
719 async fn handle_signal(&self) {
720 info!("received signal, stopping");
721 self.close().await;
722 exit(0)
723 }
724
725 pub(crate) async fn close(&self) {
726 if let Some(cancel) = self.proxy_cancel.lock().await.take() {
730 cancel.cancel();
731 }
732
733 if let Some(monitor_task) = self.lan_monitor_task.lock().await.take() {
735 monitor_task.abort();
736 }
737
738 if let Some(publisher) = self.mdns_publisher.lock().await.take() {
740 publisher.lock().await.shutdown();
741 }
742
743 if let Some(proxy_task) = self.proxy_task.lock().await.take() {
744 let _ = tokio::time::timeout(Duration::from_secs(12), proxy_task).await;
745 }
746
747 let s = settings();
749 if s.proxy.enable && s.proxy.sync_hosts {
750 crate::proxy::hosts::clean_hosts_file();
751 }
752
753 let pitchfork_id = DaemonId::pitchfork();
754 let active = self.active_daemons().await;
755 let active_ids: Vec<DaemonId> = active
756 .iter()
757 .filter(|d| d.id != pitchfork_id)
758 .map(|d| d.id.clone())
759 .collect();
760
761 let stop_levels = compute_reverse_stop_order(&active_ids);
766 for level in &stop_levels {
767 let mut tasks = Vec::new();
768 for id in level {
769 let id = id.clone();
770 tasks.push(tokio::spawn(async move {
771 if let Err(err) = SUPERVISOR.stop(&id).await {
772 error!("failed to stop daemon {id}: {err}");
773 }
774 }));
775 }
776 for task in tasks {
777 let _ = task.await;
778 }
779 }
780 let _ = self.remove_daemon(&pitchfork_id).await;
781
782 if let Some(cancel) = self.flush_cancel.lock().unwrap().take() {
785 cancel.cancel();
786 }
787
788 {
791 let state = self.state_file.lock().await;
792 if state.is_dirty() {
793 if let Err(e) = state.write() {
794 warn!("failed to flush state file during shutdown: {e}");
795 }
796 }
797 }
798
799 if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
801 handle.shutdown();
802 }
803
804 let drain_timeout = time::sleep(Duration::from_secs(5));
810 tokio::pin!(drain_timeout);
811 loop {
812 if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
813 break;
814 }
815 tokio::select! {
816 _ = self.monitor_done.notified() => {}
817 _ = &mut drain_timeout => {
818 warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
819 break;
820 }
821 }
822 }
823 let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
824 let hook_timeout = Duration::from_secs(30);
825 for handle in handles {
826 match time::timeout(hook_timeout, handle).await {
827 Ok(_) => {} Err(_) => {
829 warn!(
830 "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
831 );
832 }
833 }
834 }
835
836 let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
837 }
838
839 pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
840 self.pending_notifications
841 .lock()
842 .await
843 .push((level, message));
844 }
845}
846
847#[cfg(unix)]
865fn fix_state_dir_permissions() {
866 let state_dir = &*env::PITCHFORK_STATE_DIR;
867 if let Some((uid, gid)) = state_owner_ids() {
868 if !state_dir.exists()
869 && let Err(err) = fs::create_dir_all(state_dir)
870 {
871 warn!(
872 "failed to create state directory for ownership fix at {}: {err}",
873 state_dir.display()
874 );
875 return;
876 }
877
878 chown_recursive(state_dir, uid, gid, true);
880 debug!(
881 "chowned state directory to uid={uid} gid={gid} at {}",
882 state_dir.display()
883 );
884 } else {
885 if !state_dir.exists() {
886 return;
887 }
888
889 chmod_safe_subtrees(state_dir);
892 debug!(
893 "relaxed permissions on safe subtrees at {}",
894 state_dir.display()
895 );
896 }
897}
898
899#[cfg(unix)]
900pub(crate) fn state_owner_ids() -> Option<(u32, u32)> {
901 if !nix::unistd::Uid::effective().is_root() {
902 return None;
903 }
904
905 let user = settings().supervisor.user.trim();
906 if !user.is_empty() {
907 return resolve_supervisor_user_ids(user).or_else(|| {
908 warn!(
909 "failed to resolve supervisor.user '{user}' for state ownership; falling back to SUDO_UID/SUDO_GID"
910 );
911 parse_sudo_ids()
912 });
913 }
914
915 parse_sudo_ids()
916}
917
918#[cfg(unix)]
919fn resolve_supervisor_user_ids(user: &str) -> Option<(u32, u32)> {
920 let user_record = if user.chars().all(|c| c.is_ascii_digit()) {
921 let uid = user.parse::<u32>().ok()?;
922 nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
923 .ok()
924 .flatten()
925 } else {
926 nix::unistd::User::from_name(user).ok().flatten()
927 }?;
928
929 Some((user_record.uid.as_raw(), user_record.gid.as_raw()))
930}
931
932#[cfg(unix)]
938fn parse_sudo_ids() -> Option<(u32, u32)> {
939 if !nix::unistd::Uid::effective().is_root() {
940 return None;
941 }
942 let uid: u32 = std::env::var("SUDO_UID").ok()?.parse().ok()?;
943 let gid: u32 = std::env::var("SUDO_GID").ok()?.parse().ok()?;
944 Some((uid, gid))
945}
946
947#[cfg(unix)]
950fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
951 let _ = chown_path(dir, uid, gid);
953
954 let entries = match std::fs::read_dir(dir) {
955 Ok(e) => e,
956 Err(_) => return,
957 };
958 for entry in entries.flatten() {
959 let path = entry.path();
960 if path.is_dir() {
961 if skip_proxy {
963 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
964 if name == "proxy" {
965 continue;
966 }
967 }
968 }
969 chown_recursive(&path, uid, gid, false);
970 } else {
971 let _ = chown_path(&path, uid, gid);
972 }
973 }
974}
975
/// Changes the owner and group of `path` to `uid:gid`.
///
/// Follows symlinks, with the same semantics as `chown(2)`.
///
/// # Errors
///
/// - The OS error if the call fails.
/// - `ErrorKind::InvalidInput` if the path contains an interior NUL byte.
#[cfg(unix)]
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    // std performs the C-string conversion and the raw `chown(2)` call, so no
    // `unsafe` FFI is needed here.
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
990
991#[cfg(unix)]
994fn chmod_safe_subtrees(state_dir: &std::path::Path) {
995 let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
997
998 let state_file = state_dir.join("state.toml");
1000 if state_file.exists() {
1001 let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
1002 }
1003
1004 for subdir_name in &["sock", "logs"] {
1006 let subdir = state_dir.join(subdir_name);
1007 if subdir.is_dir() {
1008 chmod_recursive(&subdir);
1009 }
1010 }
1011}
1012
/// Recursively sets permissions under `dir` (best-effort):
/// - `dir` and every real subdirectory beneath it get 0o755;
/// - every other non-symlink entry gets 0o644.
///
/// Symlinked entries below `dir` are skipped: they are neither followed nor
/// chmodded. `fs::set_permissions` follows symlinks, so without this a link
/// inside `sock`/`logs` would make its target (possibly a private file outside
/// the state directory) world-readable, and a linked directory would be
/// recursed into.
///
/// Errors are ignored.
#[cfg(unix)]
fn chmod_recursive(dir: &std::path::Path) {
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(0o755));
    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for entry in entries.flatten() {
        // `DirEntry::file_type` does not traverse symlinks, unlike `Path::is_dir`.
        let Ok(file_type) = entry.file_type() else {
            continue;
        };
        if file_type.is_symlink() {
            continue;
        }
        let path = entry.path();
        if file_type.is_dir() {
            chmod_recursive(&path);
        } else {
            let _ = fs::set_permissions(&path, fs::Permissions::from_mode(0o644));
        }
    }
}