1mod autostop;
12mod hooks;
13mod ipc_handlers;
14mod lifecycle;
15#[cfg(unix)]
16mod pty;
17mod retry;
18mod state;
19mod watchers;
20
21use crate::daemon_id::DaemonId;
22use crate::daemon_status::DaemonStatus;
23use crate::deps::compute_reverse_stop_order;
24use crate::ipc::server::{IpcServer, IpcServerHandle};
25
26use crate::procs::PROCS;
27use crate::settings::settings;
28use crate::state_file::StateFile;
29use crate::{Result, env};
30use duct::cmd;
31use miette::IntoDiagnostic;
32use once_cell::sync::Lazy;
33use std::collections::HashMap;
34#[cfg(unix)]
35use std::collections::HashSet;
36use std::fs;
37#[cfg(unix)]
38use std::os::unix::fs::PermissionsExt;
39use std::process::exit;
40use std::sync::atomic;
41use std::sync::atomic::{AtomicBool, AtomicU32};
42use std::time::Duration;
43#[cfg(unix)]
44use tokio::signal::unix::SignalKind;
45use tokio::sync::{Mutex, Notify};
46use tokio::task::JoinHandle;
47use tokio::{signal, time};
48
/// Exit statuses of *managed* daemons that the container-mode zombie reaper
/// collected on non-Linux Unix, keyed by pid.
///
/// The value is the exit code for a normal exit, the negated signal number for
/// a signal-terminated child, and `-1` for any other wait status.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) static REAPED_STATUSES: Lazy<Mutex<HashMap<u32, i32>>> =
    Lazy::new(|| Mutex::new(HashMap::default()));
60
61pub(crate) use state::UpsertDaemonOpts;
63
64pub struct Supervisor {
65 pub(crate) state_file: Mutex<StateFile>,
66 pub(crate) pending_notifications: Mutex<Vec<(log::LevelFilter, String)>>,
67 pub(crate) last_refreshed_at: Mutex<time::Instant>,
68 pub(crate) pending_autostops: Mutex<HashMap<DaemonId, time::Instant>>,
70 pub(crate) ipc_shutdown: Mutex<Option<IpcServerHandle>>,
72 pub(crate) hook_tasks: Mutex<Vec<JoinHandle<()>>>,
74 pub(crate) active_monitors: AtomicU32,
78 pub(crate) monitor_done: Notify,
81 pub(crate) proxy_cancel: Mutex<Option<tokio_util::sync::CancellationToken>>,
84 pub(crate) proxy_task: Mutex<Option<JoinHandle<()>>>,
86 pub(crate) mdns_publisher:
89 Mutex<Option<std::sync::Arc<tokio::sync::Mutex<crate::proxy::mdns::MdnsPublisher>>>>,
90 pub(crate) lan_monitor_task: Mutex<Option<JoinHandle<()>>>,
92 pub(crate) flush_cancel: std::sync::Mutex<Option<tokio_util::sync::CancellationToken>>,
94}
95
96pub(crate) fn interval_duration() -> Duration {
97 settings().general_interval()
98}
99
100pub static SUPERVISOR: Lazy<Supervisor> =
101 Lazy::new(|| Supervisor::new().expect("Error creating supervisor"));
102
103pub fn start_if_not_running() -> Result<()> {
104 let sf = StateFile::get();
105 if let Some(d) = sf.daemons.get(&DaemonId::pitchfork())
106 && let Some(pid) = d.pid
107 {
108 PROCS.refresh_pids(&[pid]);
109 if PROCS.is_running(pid) {
110 return Ok(());
111 }
112 }
113 start_in_background()
114}
115
116pub fn start_in_background() -> Result<()> {
117 debug!("starting supervisor in background");
118 let log_file = &*env::PITCHFORK_LOG_FILE;
122 if let Some(parent) = log_file.parent() {
123 let _ = fs::create_dir_all(parent);
124 }
125 let stderr_file = fs::OpenOptions::new()
126 .create(true)
127 .append(true)
128 .open(log_file)
129 .into_diagnostic()?;
130 #[cfg(unix)]
131 fix_state_dir_permissions();
132 cmd!(&*env::PITCHFORK_BIN, "supervisor", "run")
133 .stdout_null()
134 .stderr_file(stderr_file)
135 .start()
136 .into_diagnostic()?;
137 Ok(())
138}
139
140impl Supervisor {
141 pub fn new() -> Result<Self> {
142 Ok(Self {
143 state_file: Mutex::new(StateFile::new(env::PITCHFORK_STATE_FILE.clone())),
144 last_refreshed_at: Mutex::new(time::Instant::now()),
145 pending_notifications: Mutex::new(vec![]),
146 pending_autostops: Mutex::new(HashMap::new()),
147 ipc_shutdown: Mutex::new(None),
148 hook_tasks: Mutex::new(Vec::new()),
149 active_monitors: AtomicU32::new(0),
150 monitor_done: Notify::new(),
151 proxy_cancel: Mutex::new(None),
152 proxy_task: Mutex::new(None),
153 mdns_publisher: Mutex::new(None),
154 lan_monitor_task: Mutex::new(None),
155 flush_cancel: std::sync::Mutex::new(None),
156 })
157 }
158
159 pub async fn start(
160 &self,
161 is_boot: bool,
162 container: bool,
163 web_port: Option<u16>,
164 web_path: Option<String>,
165 ) -> Result<()> {
166 #[cfg(unix)]
171 fix_state_dir_permissions();
172
173 let pid = std::process::id();
174 PROCS.refresh_pids(&[pid]);
176 let container_mode = container || settings().supervisor.container;
178 if container_mode {
179 info!("Starting supervisor in container/PID1 mode with pid {pid}");
180 } else {
181 info!("Starting supervisor with pid {pid}");
182 }
183
184 cleanup_orphaned_daemons(self).await;
189
190 self.upsert_daemon(
191 UpsertDaemonOpts::builder(DaemonId::pitchfork())
192 .set(|o| {
193 o.pid = Some(pid);
194 o.status = DaemonStatus::Running;
195 })
196 .build(),
197 )
198 .await?;
199 #[cfg(unix)]
200 fix_state_dir_permissions();
201
202 if is_boot {
204 info!("Boot start mode enabled, starting boot_start daemons");
205 self.start_boot_daemons().await?;
206 }
207
208 self.interval_watch()?;
209 self.cron_watch()?;
210 self.signals()?;
211 self.daemon_file_watch()?;
212
213 #[cfg(unix)]
215 if container_mode {
216 self.reap_zombies()?;
217 }
218
219 let s = settings();
221 let effective_port = web_port.or_else(|| {
222 if s.web.auto_start {
223 match u16::try_from(s.web.bind_port).ok().filter(|&p| p > 0) {
224 Some(p) => Some(p),
225 None => {
226 error!(
227 "web.bind_port {} is out of valid port range (1-65535), web UI disabled",
228 s.web.bind_port
229 );
230 None
231 }
232 }
233 } else {
234 None
235 }
236 });
237 let effective_path = web_path.or_else(|| {
239 let bp = s.web.base_path.clone();
240 if bp.is_empty() { None } else { Some(bp) }
241 });
242 if let Some(port) = effective_port {
243 tokio::spawn(async move {
244 if let Err(e) = crate::web::serve(port, effective_path).await {
245 error!("Web server error: {e}");
246 }
247 });
248 }
249
250 let api_port = if s.api.auto_start {
252 match u16::try_from(s.api.bind_port).ok().filter(|&p| p > 0) {
253 Some(p) => Some(p),
254 None => {
255 error!(
256 "api.bind_port {} is out of valid port range (1-65535), API server disabled",
257 s.api.bind_port
258 );
259 None
260 }
261 }
262 } else {
263 None
264 };
265 if let Some(port) = api_port {
266 tokio::spawn(async move {
267 if let Err(e) = crate::web::serve_api(port, None).await {
268 error!("API server error: {e}");
269 }
270 });
271 }
272
273 if s.proxy.enable {
275 #[cfg(feature = "proxy-tls")]
279 if s.proxy.https {
280 let proxy_dir = crate::env::PITCHFORK_STATE_DIR.join("proxy");
281 let ca_cert_path = proxy_dir.join("ca.pem");
282 let ca_key_path = proxy_dir.join("ca-key.pem");
283 if !ca_cert_path.exists() || !ca_key_path.exists() {
284 match crate::proxy::server::generate_ca(&ca_cert_path, &ca_key_path) {
285 Ok(()) => {
286 info!(
287 "Generated local CA certificate at {}",
288 ca_cert_path.display()
289 );
290 }
291 Err(e) => {
292 error!("Failed to generate CA certificate: {e}");
293 }
294 }
295 }
296
297 if s.proxy.auto_trust && ca_cert_path.exists() {
301 use crate::proxy::trust::{AutoTrustResult, auto_trust};
302 match auto_trust(&ca_cert_path) {
303 AutoTrustResult::AlreadyTrusted => {}
304 AutoTrustResult::Trusted => {
305 info!("CA certificate auto-trusted in system store");
306 }
307 AutoTrustResult::NotTrusted { reason } => {
308 warn!("Auto-trust skipped: {reason}");
309 warn!("Run `pitchfork proxy trust` to install manually");
310 }
311 }
312 }
313 }
314 let (bind_tx, bind_rx) = tokio::sync::oneshot::channel();
318 let proxy_cancel = tokio_util::sync::CancellationToken::new();
319 let proxy_cancel_clone = proxy_cancel.clone();
320 *self.proxy_cancel.lock().await = Some(proxy_cancel);
321 let proxy_task = tokio::spawn(async move {
322 if let Err(e) = crate::proxy::server::serve(bind_tx, proxy_cancel_clone).await {
323 error!("Proxy server error: {e}");
324 }
325 });
326 *self.proxy_task.lock().await = Some(proxy_task);
327 match bind_rx.await {
328 Ok(Ok(())) => {
329 info!("Proxy server bound successfully");
330 self.start_mdns().await;
331 }
332 Ok(Err(msg)) => {
333 error!("{msg}");
334 self.add_notification(log::LevelFilter::Error, msg).await;
335 }
336 Err(_) => {
337 }
341 }
342 }
343
344 tokio::spawn(async {
347 crate::proxy::server::get_cached_slugs().await;
348 });
349
350 let (ipc, ipc_handle) = IpcServer::new()?;
351 *self.ipc_shutdown.lock().await = Some(ipc_handle);
352 self.start_state_flush_task();
353 self.conn_watch(ipc).await
354 }
355
356 async fn start_mdns(&self) {
358 let s = crate::settings::settings();
359 let lan_enabled = s.proxy.lan || !s.proxy.lan_ip.is_empty();
360 if !s.proxy.enable || !lan_enabled {
361 return;
362 }
363
364 let lan_ip = if !s.proxy.lan_ip.is_empty() {
365 match s.proxy.lan_ip.parse::<std::net::Ipv4Addr>() {
366 Ok(ip) => Some(ip),
367 Err(e) => {
368 error!(
369 "proxy.lan_ip {:?} is not a valid IPv4 address: {e}",
370 s.proxy.lan_ip
371 );
372 return;
373 }
374 }
375 } else {
376 match crate::proxy::lan_ip::detect_lan_ip().await {
377 Some(ip) => Some(ip),
378 None => {
379 error!(
380 "LAN mode is enabled but no LAN IP address could be detected. \
381 Set proxy.lan_ip to a specific address, or ensure you are connected to a network."
382 );
383 return;
384 }
385 }
386 };
387
388 let Some(lan_ip) = lan_ip else { return };
389 let port = u16::try_from(s.proxy.port).unwrap_or(443);
390
391 let Some(mut publisher) = crate::proxy::mdns::MdnsPublisher::new(lan_ip) else {
392 error!("Failed to start mDNS publisher. Is Avahi (Linux) or Bonjour (macOS) running?");
393 return;
394 };
395
396 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
398 for slug in slugs.keys() {
399 let hostname = format!("{slug}.local");
400 publisher.publish(&hostname, port);
401 }
402
403 log::info!(
404 "LAN mode: mDNS publishing on {lan_ip}, {} slug(s) registered",
405 slugs.len()
406 );
407
408 let publisher = std::sync::Arc::new(tokio::sync::Mutex::new(publisher));
409
410 let ip_pinned = !s.proxy.lan_ip.is_empty();
412 if !ip_pinned {
413 let monitor_cancel = self.proxy_cancel.lock().await.clone();
414 let publisher_clone = publisher.clone();
415 let task = tokio::spawn(async move {
416 let mut last_ip = lan_ip;
417 let interval = std::time::Duration::from_secs(5);
418 let mut ticker = tokio::time::interval(interval);
419 ticker.tick().await; loop {
421 ticker.tick().await;
422 if let Some(cancel) = monitor_cancel.as_ref() {
423 if cancel.is_cancelled() {
424 break;
425 }
426 }
427 if let Some(new_ip) =
428 crate::proxy::lan_ip::detect_lan_ip_if_changed(last_ip).await
429 {
430 log::info!("LAN IP changed: {last_ip} → {new_ip}");
431 last_ip = new_ip;
432 let mut pub_guard = publisher_clone.lock().await;
433 pub_guard.republish_all(new_ip, port);
434 }
435 }
436 });
437 *self.lan_monitor_task.lock().await = Some(task);
438 }
439
440 *self.mdns_publisher.lock().await = Some(publisher);
441 }
442
443 async fn sync_mdns(&self) {
448 let publisher = {
451 let guard = self.mdns_publisher.lock().await;
452 match guard.as_ref() {
453 Some(p) => p.clone(),
454 None => {
455 debug!("sync_mdns: mDNS publisher not active, skipping");
456 return;
457 }
458 }
459 };
460
461 let s = crate::settings::settings();
462 let port = u16::try_from(s.proxy.port).unwrap_or(443);
463
464 let slugs = crate::pitchfork_toml::PitchforkToml::read_global_slugs();
465 let mut pub_guard = publisher.lock().await;
466
467 let current_keys: Vec<&String> = slugs.keys().collect();
469 let registered: Vec<String> = pub_guard.registered_hostnames();
470 for hostname in ®istered {
471 let slug = hostname.strip_suffix(".local").unwrap_or(hostname);
473 if !current_keys.iter().any(|k| k.as_str() == slug) {
474 log::info!("mDNS: unpublishing removed slug {slug}");
475 pub_guard.unpublish(hostname);
476 }
477 }
478
479 for slug in slugs.keys() {
481 let hostname = format!("{slug}.local");
482 if !pub_guard.is_published(&hostname) {
483 log::info!("mDNS: publishing new slug {slug}");
484 pub_guard.publish(&hostname, port);
485 }
486 }
487 }
488
489 fn start_state_flush_task(&self) {
493 let cancel = tokio_util::sync::CancellationToken::new();
494 *self.flush_cancel.lock().unwrap() = Some(cancel.clone());
495 tokio::spawn(async move {
496 let mut interval = time::interval(Duration::from_secs(1));
497 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
498 loop {
499 tokio::select! {
500 _ = interval.tick() => {}
501 _ = cancel.cancelled() => {
502 debug!("state flush task received shutdown signal");
503 break;
504 }
505 }
506 let state = SUPERVISOR.state_file.lock().await;
507 if state.is_dirty() {
508 if let Err(e) = state.write() {
509 warn!("failed to flush state file: {e}");
510 }
511 }
512 }
513 debug!("state flush task exiting");
514 });
515 }
516
517 pub(crate) async fn flush_state(&self) {
518 let state = self.state_file.lock().await;
519 if state.is_dirty() {
520 if let Err(e) = state.write() {
521 warn!("failed to flush state file: {e}");
522 }
523 }
524 }
525
526 pub(crate) async fn refresh(&self) -> Result<()> {
527 trace!("refreshing");
528
529 let dirs_with_pids = self.get_dirs_with_shell_pids().await;
532 let pids_to_check: Vec<u32> = dirs_with_pids.values().flatten().copied().collect();
533
534 if pids_to_check.is_empty() {
535 trace!("no shell PIDs to check, skipping process refresh");
537 } else {
538 PROCS.refresh_pids(&pids_to_check);
539 }
540
541 let mut last_refreshed_at = self.last_refreshed_at.lock().await;
542 *last_refreshed_at = time::Instant::now();
543
544 for (dir, pids) in dirs_with_pids {
545 let to_remove = pids
546 .iter()
547 .filter(|pid| !PROCS.is_running(**pid))
548 .collect::<Vec<_>>();
549 for pid in &to_remove {
550 self.remove_shell_pid(**pid).await?
551 }
552 if to_remove.len() == pids.len() {
553 self.leave_dir(&dir).await?;
554 }
555 }
556
557 self.check_retry().await?;
558 self.process_pending_autostops().await?;
559
560 Ok(())
561 }
562
563 #[cfg(unix)]
588 fn reap_zombies(&self) -> Result<()> {
589 let mut stream = signal::unix::signal(SignalKind::child())
590 .map_err(|e| miette::miette!("Failed to register SIGCHLD handler: {e}"))?;
591 tokio::spawn(async move {
592 loop {
593 stream.recv().await;
594 let managed_pids: HashSet<u32> = SUPERVISOR
596 .state_file
597 .lock()
598 .await
599 .daemons
600 .values()
601 .filter_map(|d| d.pid)
602 .collect();
603 Self::reap_unmanaged_zombies(&managed_pids).await;
605 }
606 });
607 info!("container mode: SIGCHLD zombie reaper installed");
608 Ok(())
609 }
610
611 #[cfg(target_os = "linux")]
617 async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
618 use nix::sys::wait::{Id, WaitPidFlag, WaitStatus, waitid, waitpid};
619 use nix::unistd::Pid;
620
621 loop {
622 let peek_flags = WaitPidFlag::WNOHANG | WaitPidFlag::WNOWAIT | WaitPidFlag::WEXITED;
624 match waitid(Id::All, peek_flags) {
625 Ok(WaitStatus::StillAlive) => break,
626 Ok(status) => {
627 let Some(pid_raw) = status.pid().map(|p| p.as_raw() as u32) else {
628 break;
629 };
630 if managed_pids.contains(&pid_raw) {
631 trace!(
635 "zombie reaper: skipping managed daemon pid {pid_raw}, \
636 leaving for Tokio to reap"
637 );
638 break;
639 }
640 match waitpid(Pid::from_raw(pid_raw as i32), Some(WaitPidFlag::WNOHANG)) {
642 Ok(s) => trace!("reaped orphaned zombie child: {s:?}"),
643 Err(nix::errno::Errno::ECHILD) => break,
644 Err(e) => {
645 trace!("waitpid error reaping pid {pid_raw}: {e}");
646 break;
647 }
648 }
649 }
650 Err(nix::errno::Errno::ECHILD) => break, Err(e) => {
652 trace!("waitid error in zombie reaper: {e}");
653 break;
654 }
655 }
656 }
657 }
658
659 #[cfg(all(unix, not(target_os = "linux")))]
665 async fn reap_unmanaged_zombies(managed_pids: &HashSet<u32>) {
666 use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
667
668 loop {
669 match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
670 Ok(WaitStatus::StillAlive) => break,
671 Ok(status) => {
672 let Some(pid) = status.pid().map(|p| p.as_raw() as u32) else {
673 continue;
674 };
675 if managed_pids.contains(&pid) {
676 let exit_code = match status {
678 WaitStatus::Exited(_, code) => code,
679 WaitStatus::Signaled(_, sig, _) => -(sig as i32),
680 _ => -1,
681 };
682 warn!(
683 "zombie reaper reaped managed daemon pid {pid} \
684 (exit_code={exit_code}); stashing status for recovery"
685 );
686 REAPED_STATUSES.lock().await.insert(pid, exit_code);
687 } else {
688 trace!("reaped orphaned zombie child: {status:?}");
689 }
690 }
691 Err(nix::errno::Errno::ECHILD) => break, Err(e) => {
693 trace!("waitpid error in zombie reaper: {e}");
694 break;
695 }
696 }
697 }
698 }
699
700 #[cfg(unix)]
701 fn signals(&self) -> Result<()> {
702 let signals = [
703 SignalKind::terminate(),
704 SignalKind::alarm(),
705 SignalKind::interrupt(),
706 SignalKind::quit(),
707 SignalKind::hangup(),
708 SignalKind::user_defined1(),
709 SignalKind::user_defined2(),
710 ];
711 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
712 for signal in signals {
713 let stream = match signal::unix::signal(signal) {
714 Ok(s) => s,
715 Err(e) => {
716 warn!("Failed to register signal handler for {signal:?}: {e}");
717 continue;
718 }
719 };
720 tokio::spawn(async move {
721 let mut stream = stream;
722 loop {
723 stream.recv().await;
724 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
725 exit(1);
726 } else {
727 SUPERVISOR.handle_signal().await;
728 }
729 }
730 });
731 }
732 Ok(())
733 }
734
735 #[cfg(windows)]
736 fn signals(&self) -> Result<()> {
737 tokio::spawn(async move {
738 static RECEIVED_SIGNAL: AtomicBool = AtomicBool::new(false);
739 loop {
740 if let Err(e) = signal::ctrl_c().await {
741 error!("Failed to wait for ctrl-c: {}", e);
742 return;
743 }
744 if RECEIVED_SIGNAL.swap(true, atomic::Ordering::SeqCst) {
745 exit(1);
746 } else {
747 SUPERVISOR.handle_signal().await;
748 }
749 }
750 });
751 Ok(())
752 }
753
754 async fn handle_signal(&self) {
755 info!("received signal, stopping");
756 self.close().await;
757 exit(0)
758 }
759
760 pub(crate) async fn close(&self) {
761 if let Some(cancel) = self.proxy_cancel.lock().await.take() {
765 cancel.cancel();
766 }
767
768 if let Some(monitor_task) = self.lan_monitor_task.lock().await.take() {
770 monitor_task.abort();
771 }
772
773 if let Some(publisher) = self.mdns_publisher.lock().await.take() {
775 publisher.lock().await.shutdown();
776 }
777
778 if let Some(proxy_task) = self.proxy_task.lock().await.take() {
779 let _ = tokio::time::timeout(Duration::from_secs(12), proxy_task).await;
780 }
781
782 let s = settings();
784 if s.proxy.enable && s.proxy.sync_hosts {
785 crate::proxy::hosts::clean_hosts_file();
786 }
787
788 let pitchfork_id = DaemonId::pitchfork();
789 let active = self.active_daemons().await;
790 let active_ids: Vec<DaemonId> = active
791 .iter()
792 .filter(|d| d.id != pitchfork_id)
793 .map(|d| d.id.clone())
794 .collect();
795
796 let stop_levels = compute_reverse_stop_order(&active_ids);
801 for level in &stop_levels {
802 let mut tasks = Vec::new();
803 for id in level {
804 let id = id.clone();
805 tasks.push(tokio::spawn(async move {
806 if let Err(err) = SUPERVISOR.stop(&id).await {
807 error!("failed to stop daemon {id}: {err}");
808 }
809 }));
810 }
811 for task in tasks {
812 let _ = task.await;
813 }
814 }
815 let _ = self.remove_daemon(&pitchfork_id).await;
816
817 if let Some(cancel) = self.flush_cancel.lock().unwrap().take() {
820 cancel.cancel();
821 }
822
823 {
826 let state = self.state_file.lock().await;
827 if state.is_dirty() {
828 if let Err(e) = state.write() {
829 warn!("failed to flush state file during shutdown: {e}");
830 }
831 }
832 }
833
834 if let Some(mut handle) = self.ipc_shutdown.lock().await.take() {
836 handle.shutdown();
837 }
838
839 let drain_timeout = time::sleep(Duration::from_secs(5));
845 tokio::pin!(drain_timeout);
846 loop {
847 if self.active_monitors.load(atomic::Ordering::Acquire) == 0 {
848 break;
849 }
850 tokio::select! {
851 _ = self.monitor_done.notified() => {}
852 _ = &mut drain_timeout => {
853 warn!("timed out waiting for monitoring tasks to register hooks, proceeding with shutdown");
854 break;
855 }
856 }
857 }
858 let handles: Vec<JoinHandle<()>> = std::mem::take(&mut *self.hook_tasks.lock().await);
859 let hook_timeout = Duration::from_secs(30);
860 for handle in handles {
861 match time::timeout(hook_timeout, handle).await {
862 Ok(_) => {} Err(_) => {
864 warn!(
865 "hook task did not complete within {hook_timeout:?} during shutdown, skipping"
866 );
867 }
868 }
869 }
870
871 let _ = fs::remove_dir_all(&*env::IPC_SOCK_DIR);
872 }
873
874 pub(crate) async fn add_notification(&self, level: log::LevelFilter, message: String) {
875 self.pending_notifications
876 .lock()
877 .await
878 .push((level, message));
879 }
880}
881
882#[cfg(unix)]
900fn fix_state_dir_permissions() {
901 let state_dir = &*env::PITCHFORK_STATE_DIR;
902 if let Some((uid, gid)) = state_owner_ids() {
903 if !state_dir.exists()
904 && let Err(err) = fs::create_dir_all(state_dir)
905 {
906 warn!(
907 "failed to create state directory for ownership fix at {}: {err}",
908 state_dir.display()
909 );
910 return;
911 }
912
913 chown_recursive(state_dir, uid, gid, true);
915 debug!(
916 "chowned state directory to uid={uid} gid={gid} at {}",
917 state_dir.display()
918 );
919 } else {
920 if !state_dir.exists() {
921 return;
922 }
923
924 chmod_safe_subtrees(state_dir);
927 debug!(
928 "relaxed permissions on safe subtrees at {}",
929 state_dir.display()
930 );
931 }
932}
933
934#[cfg(unix)]
935pub(crate) fn state_owner_ids() -> Option<(u32, u32)> {
936 if !nix::unistd::Uid::effective().is_root() {
937 return None;
938 }
939
940 let s = settings();
941 let user = s.supervisor.user.trim();
942 if !user.is_empty() {
943 return resolve_supervisor_user_ids(user).or_else(|| {
944 warn!(
945 "failed to resolve supervisor.user '{user}' for state ownership; falling back to SUDO_UID/SUDO_GID"
946 );
947 parse_sudo_ids()
948 });
949 }
950
951 parse_sudo_ids()
952}
953
954#[cfg(unix)]
955fn resolve_supervisor_user_ids(user: &str) -> Option<(u32, u32)> {
956 let user_record = if user.chars().all(|c| c.is_ascii_digit()) {
957 let uid = user.parse::<u32>().ok()?;
958 nix::unistd::User::from_uid(nix::unistd::Uid::from_raw(uid))
959 .ok()
960 .flatten()
961 } else {
962 nix::unistd::User::from_name(user).ok().flatten()
963 }?;
964
965 Some((user_record.uid.as_raw(), user_record.gid.as_raw()))
966}
967
968#[cfg(unix)]
974fn parse_sudo_ids() -> Option<(u32, u32)> {
975 if !nix::unistd::Uid::effective().is_root() {
976 return None;
977 }
978 let uid: u32 = std::env::var("SUDO_UID").ok()?.parse().ok()?;
979 let gid: u32 = std::env::var("SUDO_GID").ok()?.parse().ok()?;
980 Some((uid, gid))
981}
982
983#[cfg(unix)]
986fn chown_recursive(dir: &std::path::Path, uid: u32, gid: u32, skip_proxy: bool) {
987 let _ = chown_path(dir, uid, gid);
989
990 let entries = match std::fs::read_dir(dir) {
991 Ok(e) => e,
992 Err(_) => return,
993 };
994 for entry in entries.flatten() {
995 let path = entry.path();
996 if path.is_dir() {
997 if skip_proxy {
999 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
1000 if name == "proxy" {
1001 continue;
1002 }
1003 }
1004 }
1005 chown_recursive(&path, uid, gid, false);
1006 } else {
1007 let _ = chown_path(&path, uid, gid);
1008 }
1009 }
1010}
1011
/// `chown(2)` a single path to `uid:gid`. Symbolic links are followed.
///
/// Delegates to the safe `std::os::unix::fs::chown`, so no hand-written
/// `unsafe` FFI call or `CString` handling is needed.
///
/// # Errors
/// - The OS error reported by `chown`.
/// - `ErrorKind::InvalidInput` if the path contains an interior NUL byte.
#[cfg(unix)]
fn chown_path(path: &std::path::Path, uid: u32, gid: u32) -> std::io::Result<()> {
    std::os::unix::fs::chown(path, Some(uid), Some(gid))
}
1026
1027#[cfg(unix)]
1030fn chmod_safe_subtrees(state_dir: &std::path::Path) {
1031 let _ = fs::set_permissions(state_dir, fs::Permissions::from_mode(0o755));
1033
1034 let state_file = state_dir.join("state.toml");
1036 if state_file.exists() {
1037 let _ = fs::set_permissions(&state_file, fs::Permissions::from_mode(0o644));
1038 }
1039
1040 for subdir_name in &["sock", "logs"] {
1042 let subdir = state_dir.join(subdir_name);
1043 if subdir.is_dir() {
1044 chmod_recursive(&subdir);
1045 }
1046 }
1047}
1048
1049async fn cleanup_orphaned_daemons(supervisor: &Supervisor) {
1062 if !settings().supervisor.cleanup_orphans {
1063 return;
1064 }
1065
1066 let candidates: Vec<_> = {
1067 let state = supervisor.state_file.lock().await;
1068 state
1069 .daemons
1070 .values()
1071 .filter(|d| d.id != DaemonId::pitchfork() && d.pid.is_some())
1072 .cloned()
1073 .collect()
1074 };
1075
1076 if candidates.is_empty() {
1077 return;
1078 }
1079
1080 info!(
1081 "checking {} daemon(s) for orphaned processes",
1082 candidates.len()
1083 );
1084
1085 for daemon in candidates {
1086 let Some(pid) = daemon.pid else { continue };
1087
1088 PROCS.refresh_pids(&[pid]);
1089
1090 if !PROCS.is_running(pid) {
1091 let _ = supervisor
1093 .upsert_daemon(
1094 UpsertDaemonOpts::builder(daemon.id.clone())
1095 .set(|o| {
1096 o.pid = None;
1097 o.status = DaemonStatus::Stopped;
1098 o.active_port = None;
1099 })
1100 .build(),
1101 )
1102 .await;
1103 continue;
1104 }
1105
1106 let current_title = PROCS.title(pid);
1110 let matches = match (¤t_title, &daemon.title) {
1111 (Some(current), Some(expected)) => current == expected,
1112 _ => true,
1116 };
1117
1118 if !matches {
1119 warn!(
1120 "pid {pid} for daemon {} has changed name (expected '{}', found '{}'); skipping orphan cleanup",
1121 daemon.id,
1122 daemon.title.as_deref().unwrap_or("?"),
1123 current_title.as_deref().unwrap_or("?")
1124 );
1125 let _ = supervisor
1127 .upsert_daemon(
1128 UpsertDaemonOpts::builder(daemon.id.clone())
1129 .set(|o| {
1130 o.pid = None;
1131 o.status = DaemonStatus::Stopped;
1132 o.active_port = None;
1133 })
1134 .build(),
1135 )
1136 .await;
1137 continue;
1138 }
1139
1140 info!("terminating orphaned daemon {} (pid {pid})", daemon.id);
1141
1142 let stop_cfg = daemon.stop_signal.unwrap_or_default();
1143 let _ = PROCS
1144 .kill_process_group_async(pid, stop_cfg.signal.into(), stop_cfg.timeout)
1145 .await;
1146
1147 let _ = supervisor
1148 .upsert_daemon(
1149 UpsertDaemonOpts::builder(daemon.id.clone())
1150 .set(|o| {
1151 o.pid = None;
1152 o.status = DaemonStatus::Stopped;
1153 o.active_port = None;
1154 })
1155 .build(),
1156 )
1157 .await;
1158 }
1159}
1160
/// Recursively normalise permissions under `dir`.
///
/// Directories become `0o755` and all other non-symlink entries become `0o644`.
///
/// Symbolic links are skipped. `fs::set_permissions` follows links, so touching
/// one would change the mode of whatever it points at. That could be a private
/// file outside the tree, or, when running as root, any file on the system.
/// Entries are classified with `DirEntry::file_type`, which does not traverse
/// links.
///
/// Errors are ignored (best-effort). If the type of an entry cannot be
/// determined, it is chmodded `0o644`, as before.
/// NOTE(review): a link swapped in between the type check and the chmod can
/// still race; fully closing that would need `fchmodat`-style APIs.
#[cfg(unix)]
fn chmod_recursive(dir: &std::path::Path) {
    let _ = fs::set_permissions(dir, fs::Permissions::from_mode(0o755));
    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    for entry in entries.flatten() {
        let path = entry.path();
        match entry.file_type() {
            Ok(kind) if kind.is_symlink() => {}
            Ok(kind) if kind.is_dir() => chmod_recursive(&path),
            _ => {
                let _ = fs::set_permissions(&path, fs::Permissions::from_mode(0o644));
            }
        }
    }
}