1use crate::user::ProcessIdentity;
2use std::collections::{BTreeMap, BTreeSet, VecDeque};
3use std::error::Error;
4use std::fmt;
5use std::ops::{BitOr, BitOrAssign};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Condvar, Mutex, MutexGuard, WaitTimeoutResult, Weak};
8use std::thread;
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
/// How long an exited, unwaited process entry is kept before the reaper may remove it.
const ZOMBIE_TTL: Duration = Duration::from_secs(60);
/// PID of init; children of an exiting process are reparented to it while it is alive.
const INIT_PID: u32 = 1;
/// Largest PID handed out by the allocator before it wraps back to `INIT_PID`.
const MAX_ALLOCATED_PID: u32 = i32::MAX as u32;
/// Umask used by `ProcessContext::default`.
pub const DEFAULT_PROCESS_UMASK: u32 = 0o022;

// Signal numbers, listed in ascending order (values match Linux x86/ARM numbering).
pub const SIGHUP: i32 = 1;
pub const SIGKILL: i32 = 9;
pub const SIGPIPE: i32 = 13;
pub const SIGTERM: i32 = 15;
pub const SIGCHLD: i32 = 17;
pub const SIGCONT: i32 = 18;
pub const SIGSTOP: i32 = 19;
pub const SIGTSTP: i32 = 20;
pub const SIGWINCH: i32 = 28;
/// Highest valid signal number; `SignalSet` keeps signals `1..=64` in a single `u64`.
const MAX_SIGNAL: i32 = 64;
25
/// Result type returned by fallible process-table operations.
pub type ProcessResult<T> = Result<T, ProcessTableError>;
/// Callback installed through `DriverProcess::set_on_exit`; the table treats its argument
/// as the process exit code.
pub type ProcessExitCallback = Arc<dyn Fn(i32) + Send + Sync + 'static>;
28
/// Handle to an externally managed process that the table can signal and observe.
pub trait DriverProcess: Send + Sync {
    /// Sends `signal` to the underlying process.
    fn kill(&self, signal: i32);
    /// Waits up to `timeout` for the process to exit. `terminate_all` treats `Some(code)` as
    /// the exit code and `None` as "still running" (NOTE(review): confirm driver semantics).
    fn wait(&self, timeout: Duration) -> Option<i32>;
    /// Installs the callback the driver should invoke when the process exits.
    fn set_on_exit(&self, callback: ProcessExitCallback);
}
34
/// Error returned by process-table operations, tagged with an errno-style code string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessTableError {
    code: &'static str,
    message: String,
}

impl ProcessTableError {
    /// Returns the errno-style code, e.g. `"ESRCH"` or `"EPERM"`.
    pub fn code(&self) -> &'static str {
        self.code
    }

    /// Common constructor used by every specific error kind below.
    fn with_code(code: &'static str, message: impl Into<String>) -> Self {
        Self {
            code,
            message: message.into(),
        }
    }

    /// `EINVAL`: the signal number is outside the supported range.
    fn invalid_signal(signal: i32) -> Self {
        Self::with_code("EINVAL", format!("invalid signal {signal}"))
    }

    /// `ESRCH`: no entry exists for `pid`.
    fn no_such_process(pid: u32) -> Self {
        Self::with_code("ESRCH", format!("no such process {pid}"))
    }

    /// `ESRCH`: no entry belongs to process group `pgid`.
    fn no_such_process_group(pgid: u32) -> Self {
        Self::with_code("ESRCH", format!("no such process group {pgid}"))
    }

    /// `ECHILD`: the waiter has no child matching the waitpid selector.
    fn no_matching_child(waiter_pid: u32, pid: i32) -> Self {
        Self::with_code(
            "ECHILD",
            format!("process {waiter_pid} has no matching child for waitpid({pid})"),
        )
    }

    /// `EAGAIN`: every allocatable PID is in use.
    fn pid_space_exhausted() -> Self {
        Self::with_code("EAGAIN", "process id space exhausted")
    }

    /// `EPERM` with a caller-supplied explanation.
    fn permission_denied(message: impl Into<String>) -> Self {
        Self::with_code("EPERM", message)
    }
}

impl fmt::Display for ProcessTableError {
    /// Renders as `"<code>: <message>"`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.code)?;
        f.write_str(": ")?;
        f.write_str(&self.message)
    }
}

impl Error for ProcessTableError {}
96
/// Lifecycle state of a process entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessStatus {
    Running,
    Stopped,
    /// Exited; the entry stays in the table until it is waited for or reaped.
    Exited,
}
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
105pub struct SignalSet {
106 bits: u64,
107}
108
109impl SignalSet {
110 pub const fn empty() -> Self {
111 Self { bits: 0 }
112 }
113
114 pub const fn is_empty(self) -> bool {
115 self.bits == 0
116 }
117
118 pub fn from_signal(signal: i32) -> ProcessResult<Self> {
119 Ok(Self {
120 bits: signal_bit(signal)?,
121 })
122 }
123
124 pub fn from_signals(signals: impl IntoIterator<Item = i32>) -> ProcessResult<Self> {
125 let mut set = Self::empty();
126 for signal in signals {
127 set.insert(signal)?;
128 }
129 Ok(set)
130 }
131
132 pub fn contains(self, signal: i32) -> bool {
133 signal_bit(signal)
134 .map(|bit| self.bits & bit != 0)
135 .unwrap_or(false)
136 }
137
138 pub fn insert(&mut self, signal: i32) -> ProcessResult<()> {
139 self.bits |= signal_bit(signal)?;
140 Ok(())
141 }
142
143 pub fn remove(&mut self, signal: i32) -> ProcessResult<()> {
144 self.bits &= !signal_bit(signal)?;
145 Ok(())
146 }
147
148 pub fn union(self, other: Self) -> Self {
149 Self {
150 bits: self.bits | other.bits,
151 }
152 }
153
154 pub fn difference(self, other: Self) -> Self {
155 Self {
156 bits: self.bits & !other.bits,
157 }
158 }
159
160 pub fn signals(self) -> Vec<i32> {
161 let mut signals = Vec::new();
162 for signal in 1..=MAX_SIGNAL {
163 if self.contains(signal) {
164 signals.push(signal);
165 }
166 }
167 signals
168 }
169}
170
/// How `ProcessTable::sigprocmask` combines the given set with the current blocked mask.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SigmaskHow {
    /// New mask = current mask ∪ set.
    Block,
    /// New mask = current mask minus set.
    Unblock,
    /// New mask = set.
    SetMask,
}
177
/// Option bits accepted by `ProcessTable::waitpid_for`, modelled on POSIX `waitpid` options.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WaitPidFlags {
    bits: u32,
}

impl WaitPidFlags {
    /// Return `None` immediately instead of blocking when nothing is reportable.
    pub const WNOHANG: Self = Self { bits: 0b001 };
    /// Also report children that have stopped.
    pub const WUNTRACED: Self = Self { bits: 0b010 };
    /// Also report stopped children that have been continued.
    pub const WCONTINUED: Self = Self { bits: 0b100 };

    /// No options set.
    pub const fn empty() -> Self {
        Self { bits: 0 }
    }

    /// True when every bit of `other` is also set in `self`; `contains(empty())` is always true.
    pub const fn contains(self, other: Self) -> bool {
        other.bits & !self.bits == 0
    }
}

impl Default for WaitPidFlags {
    fn default() -> Self {
        Self::empty()
    }
}

impl BitOr for WaitPidFlags {
    type Output = Self;

    fn bitor(mut self, rhs: Self) -> Self::Output {
        self |= rhs;
        self
    }
}

impl BitOrAssign for WaitPidFlags {
    fn bitor_assign(&mut self, rhs: Self) {
        self.bits = self.bits | rhs.bits;
    }
}
218
/// Kind of state change reported by `ProcessTable::waitpid_for`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessWaitEvent {
    /// The child exited and its entry was removed from the table.
    Exited,
    /// The child stopped; reported only with `WaitPidFlags::WUNTRACED`.
    Stopped,
    /// The child was continued; reported only with `WaitPidFlags::WCONTINUED`.
    Continued,
}
225
/// One event returned by `ProcessTable::waitpid_for`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessWaitResult {
    /// PID of the child the event belongs to.
    pub pid: u32,
    /// Exit code for `Exited`; the stopping signal for `Stopped`; `SIGCONT` for `Continued`.
    pub status: i32,
    pub event: ProcessWaitEvent,
}
232
/// Descriptor numbers a process uses for its three standard streams.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessFileDescriptors {
    pub stdin: u32,
    pub stdout: u32,
    pub stderr: u32,
}

impl Default for ProcessFileDescriptors {
    /// Conventional layout: stdin = 0, stdout = 1, stderr = 2.
    fn default() -> Self {
        let (stdin, stdout, stderr) = (0, 1, 2);
        Self {
            stdin,
            stdout,
            stderr,
        }
    }
}
249
250#[derive(Debug, Clone, PartialEq, Eq)]
251pub struct ProcessContext {
252 pub pid: u32,
253 pub ppid: u32,
254 pub env: BTreeMap<String, String>,
255 pub cwd: String,
256 pub umask: u32,
257 pub fds: ProcessFileDescriptors,
258 pub identity: ProcessIdentity,
259 pub blocked_signals: SignalSet,
260 pub pending_signals: SignalSet,
261}
262
263impl Default for ProcessContext {
264 fn default() -> Self {
265 Self {
266 pid: 0,
267 ppid: 0,
268 env: BTreeMap::new(),
269 cwd: String::from("/"),
270 umask: DEFAULT_PROCESS_UMASK,
271 fds: ProcessFileDescriptors::default(),
272 identity: ProcessIdentity::default(),
273 blocked_signals: SignalSet::empty(),
274 pending_signals: SignalSet::empty(),
275 }
276 }
277}
278
/// Full public view of a process as stored in the table (returned by `ProcessTable::get`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessEntry {
    pub pid: u32,
    /// Parent PID; rewritten to `INIT_PID` (or 0 when init is absent) when the parent exits.
    pub ppid: u32,
    /// Process group ID.
    pub pgid: u32,
    /// Session ID.
    pub sid: u32,
    /// Driver identifier supplied at registration.
    pub driver: String,
    pub command: String,
    pub args: Vec<String>,
    pub status: ProcessStatus,
    /// Exit code, set when the process is marked exited.
    pub exit_code: Option<i32>,
    /// Milliseconds since the Unix epoch at which the process was marked exited.
    pub exit_time_ms: Option<u64>,
    pub env: BTreeMap<String, String>,
    pub cwd: String,
    /// File-mode creation mask; the table always stores it masked to `0o777`.
    pub umask: u32,
    pub identity: ProcessIdentity,
}
296
/// Summary of a `ProcessEntry` returned by `ProcessTable::list_processes`; omits the
/// arguments, environment, cwd, umask and exit time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessInfo {
    pub pid: u32,
    pub ppid: u32,
    pub pgid: u32,
    pub sid: u32,
    pub driver: String,
    pub command: String,
    pub status: ProcessStatus,
    pub exit_code: Option<i32>,
    pub identity: ProcessIdentity,
}
309
/// Thread-safe process table. Cloning yields another handle to the same shared state
/// (the inner data is behind an `Arc`).
#[derive(Clone)]
pub struct ProcessTable {
    inner: Arc<ProcessTableInner>,
}
314
/// State shared by every `ProcessTable` handle.
struct ProcessTableInner {
    /// Process entries plus allocator and reaper settings.
    state: Mutex<ProcessTableState>,
    /// Notified when entries change, waking callers blocked in `waitpid`/`waitpid_for`.
    waiters: Condvar,
    /// Deadline queue consumed by the zombie-reaper thread.
    reaper: Arc<ZombieReaper>,
}
320
/// Per-process data kept by the table: the public entry plus driver handle and
/// signal/wait bookkeeping.
struct ProcessRecord {
    entry: ProcessEntry,
    /// Handle used to signal and wait on the real process.
    driver_process: Arc<dyn DriverProcess>,
    /// Stop/continue notifications not yet consumed by `waitpid_for`
    /// (exits are detected from `entry.status` instead).
    pending_wait_events: VecDeque<PendingWaitEvent>,
    /// Signals whose delivery is deferred while present (see `queue_or_schedule_signal`).
    blocked_signals: SignalSet,
    /// Blocked signals that have arrived and await unblocking via `sigprocmask`.
    pending_signals: SignalSet,
}
328
/// A signal chosen for delivery while the state lock is held and sent after it is released.
struct ScheduledSignalDelivery {
    pid: u32,
    signal: i32,
    /// Process status when the delivery was scheduled; `deliver_signals` uses it to decide
    /// whether a stop or continue transition must be recorded.
    status: ProcessStatus,
    driver_process: Arc<dyn DriverProcess>,
}
335
/// A queued stop/continue notification awaiting `waitpid_for`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PendingWaitEvent {
    /// The stopping signal for `Stopped`, `SIGCONT` for `Continued`.
    status: i32,
    event: ProcessWaitEvent,
}
341
/// Which children a `waitpid_for` call matches (decoded by `resolve_wait_selector`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WaitSelector {
    /// `pid == -1`: any child.
    AnyChild,
    /// `pid > 0`: exactly this child.
    ChildPid(u32),
    /// `pid == 0` (the waiter's own group) or `pid < -1` (group `|pid|`).
    ProcessGroup(u32),
}
348
/// Deadline queue for exited processes awaiting automatic removal, shared with the
/// background thread started by `start_zombie_reaper`.
struct ZombieReaper {
    state: Mutex<ZombieReaperState>,
    /// Notified by `schedule` after a deadline is inserted.
    wake: Condvar,
    /// Number of reaper threads spawned (incremented by `start_zombie_reaper`).
    thread_spawns: AtomicUsize,
}
354
/// Mutable state of the `ZombieReaper`.
#[derive(Default)]
struct ZombieReaperState {
    /// Reap deadline per exited PID.
    deadlines: BTreeMap<u32, Instant>,
    /// NOTE(review): presumably tells the reaper thread to stop; the code that sets and reads
    /// it is not visible here.
    shutdown: bool,
}
360
361struct ProcessTableState {
362 entries: BTreeMap<u32, ProcessRecord>,
363 next_pid: u32,
364 zombie_ttl: Duration,
365 on_process_exit: Option<Arc<dyn Fn(u32) + Send + Sync + 'static>>,
366 terminating_all: bool,
367}
368
369impl Default for ProcessTableState {
370 fn default() -> Self {
371 Self {
372 entries: BTreeMap::new(),
373 next_pid: 1,
374 zombie_ttl: ZOMBIE_TTL,
375 on_process_exit: None,
376 terminating_all: false,
377 }
378 }
379}
380
381impl Default for ProcessTable {
382 fn default() -> Self {
383 let reaper = Arc::new(ZombieReaper::default());
384 Self {
385 inner: {
386 let inner = Arc::new(ProcessTableInner {
387 state: Mutex::new(ProcessTableState::default()),
388 waiters: Condvar::new(),
389 reaper,
390 });
391 start_zombie_reaper(Arc::downgrade(&inner), Arc::clone(&inner.reaper));
392 inner
393 },
394 }
395 }
396}
397
398impl ProcessTable {
399 pub fn new() -> Self {
400 Self::default()
401 }
402
403 pub fn with_zombie_ttl(zombie_ttl: Duration) -> Self {
404 let table = Self::new();
405 table.inner.lock_state().zombie_ttl = zombie_ttl;
406 table
407 }
408
409 pub fn allocate_pid(&self) -> ProcessResult<u32> {
410 let mut state = self.inner.lock_state();
411 let start = normalize_next_pid(state.next_pid);
412 let mut pid = start;
413
414 loop {
415 if !state.entries.contains_key(&pid) {
416 state.next_pid = next_allocated_pid_after(pid);
417 return Ok(pid);
418 }
419
420 pid = next_allocated_pid_after(pid);
421 if pid == start {
422 return Err(ProcessTableError::pid_space_exhausted());
423 }
424 }
425 }
426
427 pub fn set_on_process_exit(&self, callback: Option<Arc<dyn Fn(u32) + Send + Sync + 'static>>) {
428 self.inner.lock_state().on_process_exit = callback;
429 }
430
431 pub fn register(
432 &self,
433 pid: u32,
434 driver: impl Into<String>,
435 command: impl Into<String>,
436 args: Vec<String>,
437 ctx: ProcessContext,
438 driver_process: Arc<dyn DriverProcess>,
439 ) -> ProcessEntry {
440 let (pgid, sid) = {
441 let state = self.inner.lock_state();
442 match state.entries.get(&ctx.ppid) {
443 Some(parent) => (parent.entry.pgid, parent.entry.sid),
444 None => (pid, pid),
445 }
446 };
447
448 let entry = ProcessEntry {
449 pid,
450 ppid: ctx.ppid,
451 pgid,
452 sid,
453 driver: driver.into(),
454 command: command.into(),
455 args,
456 status: ProcessStatus::Running,
457 exit_code: None,
458 exit_time_ms: None,
459 env: ctx.env,
460 cwd: ctx.cwd,
461 umask: ctx.umask & 0o777,
462 identity: ctx.identity,
463 };
464
465 let weak = Arc::downgrade(&self.inner);
466 driver_process.set_on_exit(Arc::new(move |code| {
467 if let Some(inner) = weak.upgrade() {
468 mark_exited_inner(&inner, pid, code);
469 }
470 }));
471
472 let mut state = self.inner.lock_state();
473 state.next_pid = next_pid_after_registered(state.next_pid, pid);
474 state.entries.insert(
475 pid,
476 ProcessRecord {
477 entry: entry.clone(),
478 driver_process,
479 pending_wait_events: VecDeque::new(),
480 blocked_signals: ctx.blocked_signals,
481 pending_signals: ctx.pending_signals,
482 },
483 );
484
485 entry
486 }
487
488 pub fn get(&self, pid: u32) -> Option<ProcessEntry> {
489 self.inner
490 .lock_state()
491 .entries
492 .get(&pid)
493 .map(|record| record.entry.clone())
494 }
495
496 pub fn zombie_timer_count(&self) -> usize {
497 self.inner.reaper.scheduled_count()
498 }
499
500 pub fn zombie_reaper_thread_spawn_count(&self) -> usize {
501 self.inner.reaper.thread_spawn_count()
502 }
503
504 pub fn running_count(&self) -> usize {
505 self.inner
506 .lock_state()
507 .entries
508 .values()
509 .filter(|record| record.entry.status == ProcessStatus::Running)
510 .count()
511 }
512
513 pub fn mark_exited(&self, pid: u32, exit_code: i32) {
514 mark_exited_inner(&self.inner, pid, exit_code);
515 }
516
517 pub fn mark_stopped(&self, pid: u32, signal: i32) {
518 mark_wait_event_inner(
519 &self.inner,
520 pid,
521 ProcessStatus::Stopped,
522 PendingWaitEvent {
523 status: signal,
524 event: ProcessWaitEvent::Stopped,
525 },
526 );
527 }
528
529 pub fn mark_continued(&self, pid: u32) {
530 mark_wait_event_inner(
531 &self.inner,
532 pid,
533 ProcessStatus::Running,
534 PendingWaitEvent {
535 status: SIGCONT,
536 event: ProcessWaitEvent::Continued,
537 },
538 );
539 }
540
541 pub fn waitpid(&self, pid: u32) -> ProcessResult<(u32, i32)> {
542 let mut state = self.inner.lock_state();
543 loop {
544 let Some(record) = state.entries.get(&pid) else {
545 return Err(ProcessTableError::no_such_process(pid));
546 };
547
548 if record.entry.status == ProcessStatus::Exited {
549 let status = record.entry.exit_code.unwrap_or_default();
550 state.entries.remove(&pid);
551 drop(state);
552 self.inner.reaper.cancel(pid);
553 self.inner.waiters.notify_all();
554 return Ok((pid, status));
555 }
556
557 state = self.inner.wait_for_state(state);
558 }
559 }
560
561 pub fn waitpid_for(
562 &self,
563 waiter_pid: u32,
564 pid: i32,
565 flags: WaitPidFlags,
566 ) -> ProcessResult<Option<ProcessWaitResult>> {
567 let mut state = self.inner.lock_state();
568 loop {
569 let selector = resolve_wait_selector(&state, waiter_pid, pid)?;
570 let matching_children = matching_child_pids(&state, waiter_pid, selector);
571 if matching_children.is_empty() {
572 return Err(ProcessTableError::no_matching_child(waiter_pid, pid));
573 }
574
575 if let Some(result) = take_waitable_event(&mut state, &matching_children, flags) {
576 let should_reap = result.event == ProcessWaitEvent::Exited;
577 drop(state);
578 if should_reap {
579 self.inner.reaper.cancel(result.pid);
580 self.inner.waiters.notify_all();
581 }
582 return Ok(Some(result));
583 }
584
585 if flags.contains(WaitPidFlags::WNOHANG) {
586 return Ok(None);
587 }
588
589 state = self.inner.wait_for_state(state);
590 }
591 }
592
593 pub fn kill(&self, pid: i32, signal: i32) -> ProcessResult<()> {
594 if !(0..=MAX_SIGNAL).contains(&signal) {
595 return Err(ProcessTableError::invalid_signal(signal));
596 }
597
598 let deliveries = {
599 let mut state = self.inner.lock_state();
600 if pid < 0 {
601 let pgid = pid.unsigned_abs();
602 let grouped = state
603 .entries
604 .values()
605 .filter(|record| record.entry.pgid == pgid)
606 .map(|record| record.entry.pid)
607 .collect::<Vec<_>>();
608 if grouped.is_empty() {
609 return Err(ProcessTableError::no_such_process_group(pgid));
610 }
611 if signal == 0 {
612 return Ok(());
613 }
614 collect_signal_deliveries(&mut state, &grouped, signal)?
615 } else {
616 let pid = pid as u32;
617 let Some(record) = state.entries.get(&pid) else {
618 return Err(ProcessTableError::no_such_process(pid));
619 };
620 if record.entry.status == ProcessStatus::Exited || signal == 0 {
621 return Ok(());
622 }
623 collect_signal_deliveries(&mut state, &[pid], signal)?
624 }
625 };
626
627 if signal == 0 {
628 return Ok(());
629 }
630
631 deliver_signals(&self.inner, deliveries);
632 Ok(())
633 }
634
635 pub fn setpgid(&self, pid: u32, pgid: u32) -> ProcessResult<()> {
636 let mut state = self.inner.lock_state();
637 let (current_sid, target_pgid) = {
638 let Some(record) = state.entries.get(&pid) else {
639 return Err(ProcessTableError::no_such_process(pid));
640 };
641 (record.entry.sid, if pgid == 0 { pid } else { pgid })
642 };
643
644 if target_pgid != pid {
645 let mut group_exists = false;
646 for record in state.entries.values() {
647 if record.entry.pgid != target_pgid || record.entry.status == ProcessStatus::Exited
648 {
649 continue;
650 }
651 if record.entry.sid != current_sid {
652 return Err(ProcessTableError::permission_denied(
653 "cannot join process group in different session",
654 ));
655 }
656 group_exists = true;
657 break;
658 }
659 if !group_exists {
660 return Err(ProcessTableError::permission_denied(format!(
661 "no such process group {target_pgid}"
662 )));
663 }
664 }
665
666 if let Some(record) = state.entries.get_mut(&pid) {
667 record.entry.pgid = target_pgid;
668 }
669 Ok(())
670 }
671
672 pub fn getpgid(&self, pid: u32) -> ProcessResult<u32> {
673 self.get(pid)
674 .map(|entry| entry.pgid)
675 .ok_or_else(|| ProcessTableError::no_such_process(pid))
676 }
677
678 pub fn setsid(&self, pid: u32) -> ProcessResult<u32> {
679 let mut state = self.inner.lock_state();
680 let Some(record) = state.entries.get_mut(&pid) else {
681 return Err(ProcessTableError::no_such_process(pid));
682 };
683
684 if record.entry.pgid == pid {
685 return Err(ProcessTableError::permission_denied(format!(
686 "process {pid} is already a process group leader"
687 )));
688 }
689
690 record.entry.sid = pid;
691 record.entry.pgid = pid;
692 Ok(pid)
693 }
694
695 pub fn getsid(&self, pid: u32) -> ProcessResult<u32> {
696 self.get(pid)
697 .map(|entry| entry.sid)
698 .ok_or_else(|| ProcessTableError::no_such_process(pid))
699 }
700
701 pub fn getppid(&self, pid: u32) -> ProcessResult<u32> {
702 self.get(pid)
703 .map(|entry| entry.ppid)
704 .ok_or_else(|| ProcessTableError::no_such_process(pid))
705 }
706
707 pub fn get_umask(&self, pid: u32) -> ProcessResult<u32> {
708 self.get(pid)
709 .map(|entry| entry.umask)
710 .ok_or_else(|| ProcessTableError::no_such_process(pid))
711 }
712
713 pub fn set_umask(&self, pid: u32, umask: u32) -> ProcessResult<u32> {
714 let mut state = self.inner.lock_state();
715 let record = state
716 .entries
717 .get_mut(&pid)
718 .ok_or_else(|| ProcessTableError::no_such_process(pid))?;
719 let previous = record.entry.umask;
720 record.entry.umask = umask & 0o777;
721 Ok(previous)
722 }
723
724 pub fn has_process_group(&self, pgid: u32) -> bool {
725 self.inner
726 .lock_state()
727 .entries
728 .values()
729 .any(|record| record.entry.pgid == pgid && record.entry.status != ProcessStatus::Exited)
730 }
731
732 pub fn list_processes(&self) -> BTreeMap<u32, ProcessInfo> {
733 self.inner
734 .lock_state()
735 .entries
736 .values()
737 .map(|record| (record.entry.pid, to_process_info(&record.entry)))
738 .collect()
739 }
740
741 pub fn terminate_all(&self) {
742 let running = {
743 let mut state = self.inner.lock_state();
744 state.terminating_all = true;
745 self.inner.reaper.clear();
746 state
747 .entries
748 .values()
749 .filter(|record| record.entry.status == ProcessStatus::Running)
750 .map(|record| (record.entry.pid, Arc::clone(&record.driver_process)))
751 .collect::<Vec<_>>()
752 };
753
754 for (_, driver) in &running {
755 driver.kill(SIGTERM);
756 }
757 for (pid, driver) in &running {
758 if let Some(exit_code) = driver.wait(Duration::from_secs(1)) {
759 self.mark_exited(*pid, exit_code);
760 }
761 }
762
763 let survivors = {
764 let state = self.inner.lock_state();
765 running
766 .iter()
767 .filter(|(pid, _)| {
768 state
769 .entries
770 .get(pid)
771 .map(|record| record.entry.status == ProcessStatus::Running)
772 .unwrap_or(false)
773 })
774 .cloned()
775 .collect::<Vec<_>>()
776 };
777
778 for (_, driver) in &survivors {
779 driver.kill(SIGKILL);
780 }
781 for (pid, driver) in &survivors {
782 if let Some(exit_code) = driver.wait(Duration::from_millis(500)) {
783 self.mark_exited(*pid, exit_code);
784 }
785 }
786
787 self.inner.lock_state().terminating_all = false;
788 }
789
790 pub fn sigprocmask(
791 &self,
792 pid: u32,
793 how: SigmaskHow,
794 set: SignalSet,
795 ) -> ProcessResult<SignalSet> {
796 let (previous, deliveries) = {
797 let mut state = self.inner.lock_state();
798 let record = state
799 .entries
800 .get_mut(&pid)
801 .ok_or_else(|| ProcessTableError::no_such_process(pid))?;
802 let previous = record.blocked_signals;
803 record.blocked_signals = match how {
804 SigmaskHow::Block => previous.union(set),
805 SigmaskHow::Unblock => previous.difference(set),
806 SigmaskHow::SetMask => set,
807 };
808
809 let unblocked_pending = record.pending_signals.difference(record.blocked_signals);
810 let deliveries = collect_pending_signal_deliveries(record, unblocked_pending)?;
811 (previous, deliveries)
812 };
813
814 deliver_signals(&self.inner, deliveries);
815 Ok(previous)
816 }
817
818 pub fn sigpending(&self, pid: u32) -> ProcessResult<SignalSet> {
819 self.inner
820 .lock_state()
821 .entries
822 .get(&pid)
823 .map(|record| record.pending_signals)
824 .ok_or_else(|| ProcessTableError::no_such_process(pid))
825 }
826}
827
828fn to_process_info(entry: &ProcessEntry) -> ProcessInfo {
829 ProcessInfo {
830 pid: entry.pid,
831 ppid: entry.ppid,
832 pgid: entry.pgid,
833 sid: entry.sid,
834 driver: entry.driver.clone(),
835 command: entry.command.clone(),
836 status: entry.status,
837 exit_code: entry.exit_code,
838 identity: entry.identity.clone(),
839 }
840}
841
/// Marks `pid` as exited with `exit_code` and performs the follow-up bookkeeping.
///
/// While holding the state lock it:
/// - records the exit code and exit time;
/// - reparents the process's children (see `reparent_children_to_init`);
/// - queues SIGCHLD for a running parent, skipped while `terminate_all` is in progress;
/// - queues SIGHUP followed by SIGCONT for every living member of an affected process group
///   that is now orphaned and has a stopped member.
///
/// After releasing the lock it:
/// - schedules the zombie-reaper deadline, or cancels it during `terminate_all`;
/// - delivers the queued signals;
/// - invokes the `on_process_exit` hook;
/// - wakes blocked waiters.
///
/// Unknown PIDs and processes already marked exited are ignored.
fn mark_exited_inner(inner: &Arc<ProcessTableInner>, pid: u32, exit_code: i32) {
    let (callback, zombie_ttl, should_schedule, deliveries) = {
        let mut state = inner.lock_state();
        let (ppid, pgid) = {
            let Some(record) = state.entries.get_mut(&pid) else {
                return;
            };

            // Exit can be reported more than once (the driver callback installed by
            // `register` and explicit `mark_exited` calls such as in `terminate_all`);
            // only the first report is applied.
            if record.entry.status == ProcessStatus::Exited {
                return;
            }

            record.entry.status = ProcessStatus::Exited;
            record.entry.exit_code = Some(exit_code);
            record.entry.exit_time_ms = Some(now_ms());
            let ppid = record.entry.ppid;
            let pgid = record.entry.pgid;
            (ppid, pgid)
        };
        // The exiting process's own group, plus the group of each reparented child, may
        // have just become orphaned.
        let mut affected_pgids = BTreeSet::from([pgid]);
        reparent_children_to_init(&mut state, pid, &mut affected_pgids);

        let orphaned_group_targets = collect_orphaned_group_signal_targets(&state, &affected_pgids);

        // During `terminate_all` neither SIGCHLD nor a reaper deadline is produced.
        let should_schedule = !state.terminating_all;
        let mut deliveries = Vec::new();
        if should_schedule {
            if let Some(parent) = state
                .entries
                .get_mut(&ppid)
                .filter(|parent| parent.entry.status == ProcessStatus::Running)
            {
                if let Some(delivery) =
                    queue_or_schedule_signal(parent, SIGCHLD).expect("SIGCHLD should be valid")
                {
                    deliveries.push(delivery);
                }
            }
        }

        // Orphaned groups with a stopped member get SIGHUP then SIGCONT, mirroring POSIX
        // orphaned-process-group handling.
        for target_pid in orphaned_group_targets {
            if let Some(record) = state.entries.get_mut(&target_pid) {
                if let Some(delivery) =
                    queue_or_schedule_signal(record, SIGHUP).expect("SIGHUP should be valid")
                {
                    deliveries.push(delivery);
                }
                if let Some(delivery) =
                    queue_or_schedule_signal(record, SIGCONT).expect("SIGCONT should be valid")
                {
                    deliveries.push(delivery);
                }
            }
        }

        (
            state.on_process_exit.clone(),
            state.zombie_ttl,
            should_schedule,
            deliveries,
        )
    };

    // The state guard is dropped here: `deliver_signals` may re-lock the state (through
    // `mark_wait_event_inner`), and the hook may call back into the table.
    if should_schedule {
        inner.reaper.schedule(pid, zombie_ttl);
    } else {
        inner.reaper.cancel(pid);
    }

    deliver_signals(inner, deliveries);

    if let Some(on_process_exit) = callback {
        on_process_exit(pid);
    }

    inner.waiters.notify_all();
}
919
920fn reparent_children_to_init(
921 state: &mut ProcessTableState,
922 exiting_pid: u32,
923 affected_pgids: &mut BTreeSet<u32>,
924) {
925 let new_parent = reparent_target_pid(state, exiting_pid);
926 for record in state.entries.values_mut() {
927 if record.entry.ppid != exiting_pid {
928 continue;
929 }
930 record.entry.ppid = new_parent;
931 affected_pgids.insert(record.entry.pgid);
932 }
933}
934
935fn reparent_target_pid(state: &ProcessTableState, exiting_pid: u32) -> u32 {
936 if exiting_pid != INIT_PID
937 && state
938 .entries
939 .get(&INIT_PID)
940 .map(|record| record.entry.status != ProcessStatus::Exited)
941 .unwrap_or(false)
942 {
943 INIT_PID
944 } else {
945 0
946 }
947}
948
949fn collect_orphaned_group_signal_targets(
950 state: &ProcessTableState,
951 candidate_pgids: &BTreeSet<u32>,
952) -> Vec<u32> {
953 let mut targets = Vec::new();
954 for &pgid in candidate_pgids {
955 if !process_group_is_orphaned(state, pgid) || !process_group_has_stopped_member(state, pgid)
956 {
957 continue;
958 }
959
960 for record in state.entries.values() {
961 if record.entry.pgid == pgid && record.entry.status != ProcessStatus::Exited {
962 targets.push(record.entry.pid);
963 }
964 }
965 }
966 targets
967}
968
969fn process_group_is_orphaned(state: &ProcessTableState, pgid: u32) -> bool {
970 let mut has_member = false;
971 for record in state.entries.values() {
972 if record.entry.pgid != pgid || record.entry.status == ProcessStatus::Exited {
973 continue;
974 }
975 has_member = true;
976 if has_parent_outside_group_in_same_session(state, &record.entry) {
977 return false;
978 }
979 }
980
981 has_member
982}
983
984fn has_parent_outside_group_in_same_session(
985 state: &ProcessTableState,
986 entry: &ProcessEntry,
987) -> bool {
988 match entry.ppid {
989 0 | INIT_PID => false,
990 ppid => state
991 .entries
992 .get(&ppid)
993 .map(|parent| {
994 parent.entry.status != ProcessStatus::Exited
995 && parent.entry.sid == entry.sid
996 && parent.entry.pgid != entry.pgid
997 })
998 .unwrap_or(false),
999 }
1000}
1001
1002fn process_group_has_stopped_member(state: &ProcessTableState, pgid: u32) -> bool {
1003 state
1004 .entries
1005 .values()
1006 .any(|record| record.entry.pgid == pgid && record.entry.status == ProcessStatus::Stopped)
1007}
1008
1009fn mark_wait_event_inner(
1010 inner: &Arc<ProcessTableInner>,
1011 pid: u32,
1012 next_status: ProcessStatus,
1013 event: PendingWaitEvent,
1014) {
1015 let deliveries = {
1016 let mut state = inner.lock_state();
1017 let ppid = {
1018 let Some(record) = state.entries.get_mut(&pid) else {
1019 return;
1020 };
1021
1022 if record.entry.status == ProcessStatus::Exited || record.entry.status == next_status {
1023 return;
1024 }
1025
1026 record.entry.status = next_status;
1027 record.pending_wait_events.push_back(event);
1028 record.entry.ppid
1029 };
1030
1031 state
1032 .entries
1033 .get_mut(&ppid)
1034 .filter(|parent| parent.entry.status == ProcessStatus::Running)
1035 .and_then(|parent| {
1036 queue_or_schedule_signal(parent, SIGCHLD)
1037 .expect("SIGCHLD should be valid")
1038 .into_iter()
1039 .next()
1040 })
1041 .into_iter()
1042 .collect::<Vec<_>>()
1043 };
1044
1045 deliver_signals(inner, deliveries);
1046
1047 inner.waiters.notify_all();
1048}
1049
1050fn signal_bit(signal: i32) -> ProcessResult<u64> {
1051 if !(1..=MAX_SIGNAL).contains(&signal) {
1052 return Err(ProcessTableError::invalid_signal(signal));
1053 }
1054 Ok(1u64 << (signal - 1))
1055}
1056
1057fn normalize_next_pid(pid: u32) -> u32 {
1058 if (INIT_PID..=MAX_ALLOCATED_PID).contains(&pid) {
1059 pid
1060 } else {
1061 INIT_PID
1062 }
1063}
1064
1065fn next_allocated_pid_after(pid: u32) -> u32 {
1066 if pid >= MAX_ALLOCATED_PID {
1067 INIT_PID
1068 } else {
1069 pid + 1
1070 }
1071}
1072
1073fn next_pid_after_registered(current: u32, registered: u32) -> u32 {
1074 let current = normalize_next_pid(current);
1075 if !(INIT_PID..=MAX_ALLOCATED_PID).contains(®istered) {
1076 return current;
1077 }
1078
1079 if current <= registered {
1080 next_allocated_pid_after(registered)
1081 } else {
1082 current
1083 }
1084}
1085
1086fn signal_can_be_blocked(signal: i32) -> bool {
1087 !matches!(signal, SIGKILL | SIGSTOP | SIGCONT)
1088}
1089
1090fn queue_or_schedule_signal(
1091 record: &mut ProcessRecord,
1092 signal: i32,
1093) -> ProcessResult<Option<ScheduledSignalDelivery>> {
1094 if signal_can_be_blocked(signal) && record.blocked_signals.contains(signal) {
1095 record.pending_signals.insert(signal)?;
1096 return Ok(None);
1097 }
1098
1099 Ok(Some(ScheduledSignalDelivery {
1100 pid: record.entry.pid,
1101 signal,
1102 status: record.entry.status,
1103 driver_process: Arc::clone(&record.driver_process),
1104 }))
1105}
1106
1107fn collect_signal_deliveries(
1108 state: &mut ProcessTableState,
1109 target_pids: &[u32],
1110 signal: i32,
1111) -> ProcessResult<Vec<ScheduledSignalDelivery>> {
1112 let mut deliveries = Vec::new();
1113 for pid in target_pids {
1114 let Some(record) = state.entries.get_mut(pid) else {
1115 continue;
1116 };
1117 if let Some(delivery) = queue_or_schedule_signal(record, signal)? {
1118 deliveries.push(delivery);
1119 }
1120 }
1121 Ok(deliveries)
1122}
1123
1124fn collect_pending_signal_deliveries(
1125 record: &mut ProcessRecord,
1126 signals: SignalSet,
1127) -> ProcessResult<Vec<ScheduledSignalDelivery>> {
1128 let mut deliveries = Vec::new();
1129 for signal in signals.signals() {
1130 record.pending_signals.remove(signal)?;
1131 deliveries.push(ScheduledSignalDelivery {
1132 pid: record.entry.pid,
1133 signal,
1134 status: record.entry.status,
1135 driver_process: Arc::clone(&record.driver_process),
1136 });
1137 }
1138 Ok(deliveries)
1139}
1140
1141fn deliver_signals(inner: &Arc<ProcessTableInner>, deliveries: Vec<ScheduledSignalDelivery>) {
1142 let mut stopped = Vec::new();
1143 let mut continued = Vec::new();
1144
1145 for delivery in &deliveries {
1146 match delivery.signal {
1147 SIGSTOP | SIGTSTP if delivery.status == ProcessStatus::Running => {
1148 stopped.push((delivery.pid, delivery.signal))
1149 }
1150 SIGCONT if delivery.status == ProcessStatus::Stopped => continued.push(delivery.pid),
1151 _ => {}
1152 }
1153 delivery.driver_process.kill(delivery.signal);
1154 }
1155
1156 for (pid, signal) in stopped {
1157 mark_wait_event_inner(
1158 inner,
1159 pid,
1160 ProcessStatus::Stopped,
1161 PendingWaitEvent {
1162 status: signal,
1163 event: ProcessWaitEvent::Stopped,
1164 },
1165 );
1166 }
1167 for pid in continued {
1168 mark_wait_event_inner(
1169 inner,
1170 pid,
1171 ProcessStatus::Running,
1172 PendingWaitEvent {
1173 status: SIGCONT,
1174 event: ProcessWaitEvent::Continued,
1175 },
1176 );
1177 }
1178}
1179
1180fn resolve_wait_selector(
1181 state: &ProcessTableState,
1182 waiter_pid: u32,
1183 pid: i32,
1184) -> ProcessResult<WaitSelector> {
1185 let waiter = state
1186 .entries
1187 .get(&waiter_pid)
1188 .ok_or_else(|| ProcessTableError::no_such_process(waiter_pid))?;
1189
1190 Ok(match pid {
1191 -1 => WaitSelector::AnyChild,
1192 0 => WaitSelector::ProcessGroup(waiter.entry.pgid),
1193 p if p < -1 => WaitSelector::ProcessGroup(p.unsigned_abs()),
1194 p => WaitSelector::ChildPid(p as u32),
1195 })
1196}
1197
1198fn matching_child_pids(
1199 state: &ProcessTableState,
1200 waiter_pid: u32,
1201 selector: WaitSelector,
1202) -> Vec<u32> {
1203 state
1204 .entries
1205 .values()
1206 .filter(|record| record.entry.ppid == waiter_pid)
1207 .filter(|record| match selector {
1208 WaitSelector::AnyChild => true,
1209 WaitSelector::ChildPid(pid) => record.entry.pid == pid,
1210 WaitSelector::ProcessGroup(pgid) => record.entry.pgid == pgid,
1211 })
1212 .map(|record| record.entry.pid)
1213 .collect()
1214}
1215
1216fn take_waitable_event(
1217 state: &mut ProcessTableState,
1218 matching_children: &[u32],
1219 flags: WaitPidFlags,
1220) -> Option<ProcessWaitResult> {
1221 for child_pid in matching_children {
1222 let mut non_exit_result = None;
1223 let mut should_reap = false;
1224 {
1225 let record = state.entries.get_mut(child_pid)?;
1226 if let Some(index) = record
1227 .pending_wait_events
1228 .iter()
1229 .position(|event| is_waitable_event(event.event, flags))
1230 {
1231 let event = record
1232 .pending_wait_events
1233 .remove(index)
1234 .expect("pending wait event should exist");
1235 non_exit_result = Some(ProcessWaitResult {
1236 pid: *child_pid,
1237 status: event.status,
1238 event: event.event,
1239 });
1240 } else if record.entry.status == ProcessStatus::Exited {
1241 should_reap = true;
1242 }
1243 }
1244
1245 if let Some(result) = non_exit_result {
1246 return Some(result);
1247 }
1248
1249 if should_reap {
1250 let record = state
1251 .entries
1252 .remove(child_pid)
1253 .expect("exited child should still exist");
1254 return Some(ProcessWaitResult {
1255 pid: *child_pid,
1256 status: record.entry.exit_code.unwrap_or_default(),
1257 event: ProcessWaitEvent::Exited,
1258 });
1259 }
1260 }
1261
1262 None
1263}
1264
1265fn is_waitable_event(event: ProcessWaitEvent, flags: WaitPidFlags) -> bool {
1266 match event {
1267 ProcessWaitEvent::Exited => true,
1268 ProcessWaitEvent::Stopped => flags.contains(WaitPidFlags::WUNTRACED),
1269 ProcessWaitEvent::Continued => flags.contains(WaitPidFlags::WCONTINUED),
1270 }
1271}
1272
1273fn start_zombie_reaper(inner: Weak<ProcessTableInner>, reaper: Arc<ZombieReaper>) {
1274 reaper.thread_spawns.fetch_add(1, Ordering::SeqCst);
1275 thread::spawn(move || loop {
1276 let Some(pid) = reaper.take_next_due_pid() else {
1277 return;
1278 };
1279
1280 let Some(inner) = inner.upgrade() else {
1281 return;
1282 };
1283
1284 let mut state = inner.lock_state();
1285 let should_reap = state
1286 .entries
1287 .get(&pid)
1288 .map(|record| {
1289 record.entry.status == ProcessStatus::Exited
1290 && !has_living_parent(&state, record.entry.ppid)
1291 })
1292 .unwrap_or(false);
1293 if should_reap {
1294 state.entries.remove(&pid);
1295 } else if state
1296 .entries
1297 .get(&pid)
1298 .map(|record| record.entry.status == ProcessStatus::Exited)
1299 .unwrap_or(false)
1300 {
1301 reaper.schedule(pid, state.zombie_ttl);
1302 }
1303 drop(state);
1304 inner.waiters.notify_all();
1305 });
1306}
1307
1308fn has_living_parent(state: &ProcessTableState, ppid: u32) -> bool {
1309 ppid != 0
1310 && state
1311 .entries
1312 .get(&ppid)
1313 .map(|record| record.entry.status != ProcessStatus::Exited)
1314 .unwrap_or(false)
1315}
1316
1317impl ProcessTableInner {
1318 fn lock_state(&self) -> MutexGuard<'_, ProcessTableState> {
1319 lock_or_recover(&self.state)
1320 }
1321
1322 fn wait_for_state<'a>(
1323 &self,
1324 guard: MutexGuard<'a, ProcessTableState>,
1325 ) -> MutexGuard<'a, ProcessTableState> {
1326 wait_or_recover(&self.waiters, guard)
1327 }
1328}
1329
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields 0. A value too large for `u64` saturates at
/// `u64::MAX` instead of being silently truncated by a narrowing cast.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` is u128; clamp first so the cast below can never drop high bits.
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
1336
1337impl Default for ZombieReaper {
1338 fn default() -> Self {
1339 Self {
1340 state: Mutex::new(ZombieReaperState::default()),
1341 wake: Condvar::new(),
1342 thread_spawns: AtomicUsize::new(0),
1343 }
1344 }
1345}
1346
1347impl ZombieReaper {
1348 fn schedule(&self, pid: u32, ttl: Duration) {
1349 let mut state = lock_or_recover(&self.state);
1350 state.deadlines.insert(pid, Instant::now() + ttl);
1351 drop(state);
1352 self.wake.notify_all();
1353 }
1354
1355 fn cancel(&self, pid: u32) {
1356 let mut state = lock_or_recover(&self.state);
1357 let removed = state.deadlines.remove(&pid).is_some();
1358 drop(state);
1359 if removed {
1360 self.wake.notify_all();
1361 }
1362 }
1363
1364 fn clear(&self) {
1365 let mut state = lock_or_recover(&self.state);
1366 let changed = !state.deadlines.is_empty();
1367 state.deadlines.clear();
1368 drop(state);
1369 if changed {
1370 self.wake.notify_all();
1371 }
1372 }
1373
1374 fn shutdown(&self) {
1375 let mut state = lock_or_recover(&self.state);
1376 state.shutdown = true;
1377 drop(state);
1378 self.wake.notify_all();
1379 }
1380
1381 fn scheduled_count(&self) -> usize {
1382 lock_or_recover(&self.state).deadlines.len()
1383 }
1384
1385 fn thread_spawn_count(&self) -> usize {
1386 self.thread_spawns.load(Ordering::SeqCst)
1387 }
1388
1389 fn take_next_due_pid(&self) -> Option<u32> {
1390 let mut state = lock_or_recover(&self.state);
1391 loop {
1392 if state.shutdown {
1393 return None;
1394 }
1395
1396 let Some((pid, deadline)) = state
1397 .deadlines
1398 .iter()
1399 .min_by_key(|(_, deadline)| **deadline)
1400 .map(|(&pid, &deadline)| (pid, deadline))
1401 else {
1402 state = wait_or_recover(&self.wake, state);
1403 continue;
1404 };
1405
1406 let now = Instant::now();
1407 if deadline <= now {
1408 state.deadlines.remove(&pid);
1409 return Some(pid);
1410 }
1411
1412 let timeout = deadline.saturating_duration_since(now);
1413 let (next_state, _) = wait_timeout_or_recover(&self.wake, state, timeout);
1414 state = next_state;
1415 }
1416 }
1417}
1418
1419impl Drop for ProcessTableInner {
1420 fn drop(&mut self) {
1421 self.reaper.shutdown();
1422 }
1423}
1424
/// Locks `mutex` and ignores poisoning: if a previous holder panicked, the guard is
/// recovered and the data is returned as it was left.
fn lock_or_recover<'a, T>(mutex: &'a Mutex<T>) -> MutexGuard<'a, T> {
    mutex.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
}
1431
/// Waits on `condvar`, releasing `guard` while blocked, and returns the re-acquired guard.
/// Poisoning is ignored: the guard is recovered even if another holder panicked.
fn wait_or_recover<'a, T>(condvar: &Condvar, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
    condvar
        .wait(guard)
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}
1438
/// Waits on `condvar` for at most `timeout`, releasing `guard` while blocked.
///
/// Returns the re-acquired guard together with the timeout result. Poisoning is ignored:
/// the pair is recovered even if another holder panicked.
fn wait_timeout_or_recover<'a, T>(
    condvar: &Condvar,
    guard: MutexGuard<'a, T>,
    timeout: Duration,
) -> (MutexGuard<'a, T>, WaitTimeoutResult) {
    condvar
        .wait_timeout(guard, timeout)
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}
1449
#[cfg(test)]
mod tests {
    use super::*;

    /// Driver stub that never exits by itself; tests fire its exit callback explicitly.
    #[derive(Default)]
    struct TestDriverProcess {
        on_exit: Mutex<Option<ProcessExitCallback>>,
    }

    impl TestDriverProcess {
        /// Invokes the registered exit callback, if any, with `exit_code`.
        ///
        /// The callback is cloned out first so the lock is released before it runs.
        fn exit(&self, exit_code: i32) {
            let registered = self
                .on_exit
                .lock()
                .expect("test driver lock poisoned")
                .clone();
            if let Some(notify) = registered {
                notify(exit_code);
            }
        }
    }

    impl DriverProcess for TestDriverProcess {
        fn kill(&self, _sig: i32) {}

        fn wait(&self, _limit: Duration) -> Option<i32> {
            None
        }

        fn set_on_exit(&self, callback: ProcessExitCallback) {
            let mut slot = self.on_exit.lock().expect("test driver lock poisoned");
            *slot = Some(callback);
        }
    }

    /// Default `ProcessContext` with only `ppid` customised.
    fn context(ppid: u32) -> ProcessContext {
        ProcessContext {
            ppid,
            ..Default::default()
        }
    }

    #[test]
    fn allocate_pid_wraps_without_reusing_live_or_zombie_processes() {
        let table = ProcessTable::with_zombie_ttl(Duration::from_secs(3600));
        let top = MAX_ALLOCATED_PID;

        let live_high = Arc::new(TestDriverProcess::default());
        let zombie_high = Arc::new(TestDriverProcess::default());
        let live_one = Arc::new(TestDriverProcess::default());

        // Occupy the two highest pids plus pid 1 so allocation must wrap past all of them.
        table.register(top - 1, "test", "live-high", Vec::new(), context(0), live_high);
        table.register(
            top,
            "test",
            "zombie-high",
            Vec::new(),
            context(0),
            Arc::clone(&zombie_high),
        );
        table.register(1, "test", "live-one", Vec::new(), context(0), live_one);

        // Turn the top pid into a zombie; the long TTL keeps it in the table.
        zombie_high.exit(0);

        table.inner.lock_state().next_pid = top - 1;

        let first = table.allocate_pid().expect("allocate pid");
        let second = table.allocate_pid().expect("allocate pid");
        assert_eq!(first, 2);
        assert_eq!(second, 3);
    }
}