// secure_exec_kernel/process_table.rs

1use crate::user::ProcessIdentity;
2use std::collections::{BTreeMap, BTreeSet, VecDeque};
3use std::error::Error;
4use std::fmt;
5use std::ops::{BitOr, BitOrAssign};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Condvar, Mutex, MutexGuard, WaitTimeoutResult, Weak};
8use std::thread;
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
/// How long an exited (zombie) process stays in the table before the reaper may remove it.
const ZOMBIE_TTL: Duration = Duration::from_secs(60);
/// Pid that orphaned children are reparented to while that process is alive.
const INIT_PID: u32 = 1;
/// Largest pid the allocator hands out: the positive range of a signed 32-bit pid.
const MAX_ALLOCATED_PID: u32 = i32::MAX as u32;
/// Umask given to processes by the default `ProcessContext`.
pub const DEFAULT_PROCESS_UMASK: u32 = 0o022;

// Signal numbers (Linux numbering), listed in ascending order.
pub const SIGHUP: i32 = 1;
pub const SIGKILL: i32 = 9;
pub const SIGPIPE: i32 = 13;
pub const SIGTERM: i32 = 15;
pub const SIGCHLD: i32 = 17;
pub const SIGCONT: i32 = 18;
pub const SIGSTOP: i32 = 19;
pub const SIGTSTP: i32 = 20;
pub const SIGWINCH: i32 = 28;
/// Highest signal number accepted; a `SignalSet` stores one bit per signal `1..=MAX_SIGNAL`.
const MAX_SIGNAL: i32 = 64;
25
/// Result type returned by fallible process-table operations.
pub type ProcessResult<T> = Result<T, ProcessTableError>;
/// Callback a driver invokes with the process's exit code; `ProcessTable::register`
/// installs one that marks the corresponding entry exited.
pub type ProcessExitCallback = Arc<dyn Fn(i32) + Send + Sync + 'static>;
28
/// Driver-side handle to the real process behind a table entry.
pub trait DriverProcess: Send + Sync {
    /// Forwards `signal` to the process.
    fn kill(&self, signal: i32);
    /// Waits up to `timeout` for the process to exit; callers treat `Some(code)` as
    /// "exited with `code`" and `None` as "still running".
    fn wait(&self, timeout: Duration) -> Option<i32>;
    /// Installs the callback to be invoked with the exit code when the process exits.
    fn set_on_exit(&self, callback: ProcessExitCallback);
}
34
/// Error returned by process-table operations.
///
/// Pairs an errno-style symbolic `code` (e.g. `"ESRCH"`) with a human-readable message;
/// `Display` renders the two as `"{code}: {message}"`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ProcessTableError {
    code: &'static str,
    message: String,
}

impl ProcessTableError {
    /// Returns the errno-style symbolic code, such as `"EINVAL"` or `"ESRCH"`.
    pub fn code(&self) -> &'static str {
        self.code
    }

    /// Shared constructor behind all the specific error builders.
    fn with_code(code: &'static str, message: impl Into<String>) -> Self {
        Self {
            code,
            message: message.into(),
        }
    }

    /// `EINVAL`: the signal number is out of range.
    fn invalid_signal(signal: i32) -> Self {
        Self::with_code("EINVAL", format!("invalid signal {signal}"))
    }

    /// `ESRCH`: no entry exists for `pid`.
    fn no_such_process(pid: u32) -> Self {
        Self::with_code("ESRCH", format!("no such process {pid}"))
    }

    /// `ESRCH`: no entry belongs to process group `pgid`.
    fn no_such_process_group(pgid: u32) -> Self {
        Self::with_code("ESRCH", format!("no such process group {pgid}"))
    }

    /// `ECHILD`: `waiter_pid` has no child matching the `waitpid` selector `pid`.
    fn no_matching_child(waiter_pid: u32, pid: i32) -> Self {
        Self::with_code(
            "ECHILD",
            format!("process {waiter_pid} has no matching child for waitpid({pid})"),
        )
    }

    /// `EAGAIN`: every allocatable pid is in use.
    fn pid_space_exhausted() -> Self {
        Self::with_code("EAGAIN", "process id space exhausted")
    }

    /// `EPERM` with a caller-supplied explanation.
    fn permission_denied(message: impl Into<String>) -> Self {
        Self::with_code("EPERM", message)
    }
}

impl fmt::Display for ProcessTableError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { code, message } = self;
        write!(f, "{code}: {message}")
    }
}

impl Error for ProcessTableError {}
96
/// Lifecycle state of a process entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessStatus {
    /// Alive and not stopped.
    Running,
    /// Stopped (entered via `mark_stopped`, left via `mark_continued`).
    Stopped,
    /// Marked exited; the entry remains as a zombie until it is waited for or reaped.
    Exited,
}
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
105pub struct SignalSet {
106    bits: u64,
107}
108
109impl SignalSet {
110    pub const fn empty() -> Self {
111        Self { bits: 0 }
112    }
113
114    pub const fn is_empty(self) -> bool {
115        self.bits == 0
116    }
117
118    pub fn from_signal(signal: i32) -> ProcessResult<Self> {
119        Ok(Self {
120            bits: signal_bit(signal)?,
121        })
122    }
123
124    pub fn from_signals(signals: impl IntoIterator<Item = i32>) -> ProcessResult<Self> {
125        let mut set = Self::empty();
126        for signal in signals {
127            set.insert(signal)?;
128        }
129        Ok(set)
130    }
131
132    pub fn contains(self, signal: i32) -> bool {
133        signal_bit(signal)
134            .map(|bit| self.bits & bit != 0)
135            .unwrap_or(false)
136    }
137
138    pub fn insert(&mut self, signal: i32) -> ProcessResult<()> {
139        self.bits |= signal_bit(signal)?;
140        Ok(())
141    }
142
143    pub fn remove(&mut self, signal: i32) -> ProcessResult<()> {
144        self.bits &= !signal_bit(signal)?;
145        Ok(())
146    }
147
148    pub fn union(self, other: Self) -> Self {
149        Self {
150            bits: self.bits | other.bits,
151        }
152    }
153
154    pub fn difference(self, other: Self) -> Self {
155        Self {
156            bits: self.bits & !other.bits,
157        }
158    }
159
160    pub fn signals(self) -> Vec<i32> {
161        let mut signals = Vec::new();
162        for signal in 1..=MAX_SIGNAL {
163            if self.contains(signal) {
164                signals.push(signal);
165            }
166        }
167        signals
168    }
169}
170
/// How `ProcessTable::sigprocmask` combines the supplied set with the current blocked mask.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SigmaskHow {
    /// New mask = current mask ∪ set.
    Block,
    /// New mask = current mask \ set.
    Unblock,
    /// New mask = set.
    SetMask,
}
177
/// Option bits for `ProcessTable::waitpid_for`, named after the POSIX `waitpid` options.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct WaitPidFlags {
    bits: u32,
}

impl WaitPidFlags {
    /// Return `Ok(None)` instead of blocking when no matching child has an event ready.
    pub const WNOHANG: Self = Self::bit(0);
    /// Named after POSIX `WUNTRACED` (also report stopped children).
    pub const WUNTRACED: Self = Self::bit(1);
    /// Named after POSIX `WCONTINUED` (also report continued children).
    pub const WCONTINUED: Self = Self::bit(2);

    /// Flag value with only bit `index` set.
    const fn bit(index: u32) -> Self {
        Self { bits: 1 << index }
    }

    /// No flags set.
    pub const fn empty() -> Self {
        Self { bits: 0 }
    }

    /// Returns `true` when every bit of `other` is also set in `self`
    /// (so any value contains `WaitPidFlags::empty()`).
    pub const fn contains(self, other: Self) -> bool {
        (self.bits & other.bits) == other.bits
    }
}

impl BitOr for WaitPidFlags {
    type Output = Self;

    fn bitor(mut self, rhs: Self) -> Self::Output {
        self |= rhs;
        self
    }
}

impl BitOrAssign for WaitPidFlags {
    fn bitor_assign(&mut self, rhs: Self) {
        self.bits |= rhs.bits;
    }
}
218
/// Kind of child state change reported by `ProcessTable::waitpid_for`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessWaitEvent {
    /// The child exited.
    Exited,
    /// The child was stopped.
    Stopped,
    /// The child was resumed.
    Continued,
}

/// Event returned by a successful `ProcessTable::waitpid_for`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessWaitResult {
    /// Pid of the child the event belongs to.
    pub pid: u32,
    /// For `Stopped` the stop signal, for `Continued` `SIGCONT` (as queued by
    /// `mark_stopped` / `mark_continued`); presumably the exit code for `Exited` —
    /// TODO confirm in `take_waitable_event`.
    pub status: i32,
    /// Which state change occurred.
    pub event: ProcessWaitEvent,
}
232
/// Descriptor numbers a process uses for its standard streams.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ProcessFileDescriptors {
    pub stdin: u32,
    pub stdout: u32,
    pub stderr: u32,
}

impl Default for ProcessFileDescriptors {
    /// The conventional layout: stdin = 0, stdout = 1, stderr = 2.
    fn default() -> Self {
        let [stdin, stdout, stderr] = [0, 1, 2];
        Self {
            stdin,
            stdout,
            stderr,
        }
    }
}
249
250#[derive(Debug, Clone, PartialEq, Eq)]
251pub struct ProcessContext {
252    pub pid: u32,
253    pub ppid: u32,
254    pub env: BTreeMap<String, String>,
255    pub cwd: String,
256    pub umask: u32,
257    pub fds: ProcessFileDescriptors,
258    pub identity: ProcessIdentity,
259    pub blocked_signals: SignalSet,
260    pub pending_signals: SignalSet,
261}
262
263impl Default for ProcessContext {
264    fn default() -> Self {
265        Self {
266            pid: 0,
267            ppid: 0,
268            env: BTreeMap::new(),
269            cwd: String::from("/"),
270            umask: DEFAULT_PROCESS_UMASK,
271            fds: ProcessFileDescriptors::default(),
272            identity: ProcessIdentity::default(),
273            blocked_signals: SignalSet::empty(),
274            pending_signals: SignalSet::empty(),
275        }
276    }
277}
278
/// Full record of a process as stored in the table (snapshots are returned by
/// `register` and `get`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessEntry {
    pub pid: u32,
    /// Parent pid; rewritten to `INIT_PID` (or 0 when init is absent/exited) when the
    /// parent exits.
    pub ppid: u32,
    /// Process group id.
    pub pgid: u32,
    /// Session id.
    pub sid: u32,
    pub driver: String,
    pub command: String,
    pub args: Vec<String>,
    pub status: ProcessStatus,
    /// Set when the process is marked exited.
    pub exit_code: Option<i32>,
    /// Set from `now_ms()` when the process is marked exited (presumably milliseconds
    /// since the Unix epoch — confirm in `now_ms`).
    pub exit_time_ms: Option<u64>,
    pub env: BTreeMap<String, String>,
    pub cwd: String,
    /// File-mode creation mask; always stored masked to `0o777`.
    pub umask: u32,
    pub identity: ProcessIdentity,
}

/// Summary of a process returned by `ProcessTable::list_processes`
/// (omits args, env, cwd, umask and exit time).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessInfo {
    pub pid: u32,
    pub ppid: u32,
    pub pgid: u32,
    pub sid: u32,
    pub driver: String,
    pub command: String,
    pub status: ProcessStatus,
    pub exit_code: Option<i32>,
    pub identity: ProcessIdentity,
}

/// Cloneable handle to a process table; clones share the same underlying state.
#[derive(Clone)]
pub struct ProcessTable {
    inner: Arc<ProcessTableInner>,
}

/// Shared state behind every `ProcessTable` handle.
struct ProcessTableInner {
    /// All table data, guarded by one mutex.
    state: Mutex<ProcessTableState>,
    /// Notified after state changes that may unblock `waitpid` / `waitpid_for`.
    waiters: Condvar,
    /// Removes zombies whose TTL expired.
    reaper: Arc<ZombieReaper>,
}

/// Per-process bookkeeping stored in the table.
struct ProcessRecord {
    entry: ProcessEntry,
    /// Handle used to signal and wait on the real process.
    driver_process: Arc<dyn DriverProcess>,
    /// Stop/continue (and possibly exit) events not yet reported to a waiter.
    pending_wait_events: VecDeque<PendingWaitEvent>,
    /// Signal mask maintained by `sigprocmask`.
    blocked_signals: SignalSet,
    /// Blockable signals that arrived while blocked and await unblocking.
    pending_signals: SignalSet,
}

/// A signal chosen under the table lock, delivered to the driver by `deliver_signals`
/// after the lock is released.
struct ScheduledSignalDelivery {
    pid: u32,
    signal: i32,
    /// NOTE(review): presumably the target's status when the delivery was scheduled —
    /// confirm in `queue_or_schedule_signal`.
    status: ProcessStatus,
    driver_process: Arc<dyn DriverProcess>,
}

/// A queued wait event and the status value reported with it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PendingWaitEvent {
    /// Stop signal for `Stopped`, `SIGCONT` for `Continued`.
    status: i32,
    event: ProcessWaitEvent,
}

/// Which children a `waitpid_for` call matches (presumably derived from the POSIX
/// `waitpid` pid argument — confirm in `resolve_wait_selector`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WaitSelector {
    AnyChild,
    ChildPid(u32),
    ProcessGroup(u32),
}

/// Background zombie reaper shared with its worker thread.
struct ZombieReaper {
    state: Mutex<ZombieReaperState>,
    /// Presumably wakes the reaper thread when deadlines change — confirm.
    wake: Condvar,
    /// Presumably counts reaper-thread spawns (exposed via
    /// `zombie_reaper_thread_spawn_count`) — confirm.
    thread_spawns: AtomicUsize,
}

/// Mutable reaper state.
#[derive(Default)]
struct ZombieReaperState {
    /// Pid → instant at which that zombie may be removed (populated by `schedule`).
    deadlines: BTreeMap<u32, Instant>,
    /// Presumably asks the reaper thread to stop — confirm.
    shutdown: bool,
}

/// Everything guarded by the table mutex.
struct ProcessTableState {
    /// Records keyed by pid (ordered, so iteration is in pid order).
    entries: BTreeMap<u32, ProcessRecord>,
    /// Allocation cursor: where `allocate_pid` starts scanning.
    next_pid: u32,
    /// How long zombies are kept before reaping.
    zombie_ttl: Duration,
    /// Called with the pid after a process is marked exited.
    on_process_exit: Option<Arc<dyn Fn(u32) + Send + Sync + 'static>>,
    /// True while `terminate_all` runs; exits then skip zombie timers and `SIGCHLD`.
    terminating_all: bool,
}
368
369impl Default for ProcessTableState {
370    fn default() -> Self {
371        Self {
372            entries: BTreeMap::new(),
373            next_pid: 1,
374            zombie_ttl: ZOMBIE_TTL,
375            on_process_exit: None,
376            terminating_all: false,
377        }
378    }
379}
380
381impl Default for ProcessTable {
382    fn default() -> Self {
383        let reaper = Arc::new(ZombieReaper::default());
384        Self {
385            inner: {
386                let inner = Arc::new(ProcessTableInner {
387                    state: Mutex::new(ProcessTableState::default()),
388                    waiters: Condvar::new(),
389                    reaper,
390                });
391                start_zombie_reaper(Arc::downgrade(&inner), Arc::clone(&inner.reaper));
392                inner
393            },
394        }
395    }
396}
397
398impl ProcessTable {
399    pub fn new() -> Self {
400        Self::default()
401    }
402
403    pub fn with_zombie_ttl(zombie_ttl: Duration) -> Self {
404        let table = Self::new();
405        table.inner.lock_state().zombie_ttl = zombie_ttl;
406        table
407    }
408
409    pub fn allocate_pid(&self) -> ProcessResult<u32> {
410        let mut state = self.inner.lock_state();
411        let start = normalize_next_pid(state.next_pid);
412        let mut pid = start;
413
414        loop {
415            if !state.entries.contains_key(&pid) {
416                state.next_pid = next_allocated_pid_after(pid);
417                return Ok(pid);
418            }
419
420            pid = next_allocated_pid_after(pid);
421            if pid == start {
422                return Err(ProcessTableError::pid_space_exhausted());
423            }
424        }
425    }
426
427    pub fn set_on_process_exit(&self, callback: Option<Arc<dyn Fn(u32) + Send + Sync + 'static>>) {
428        self.inner.lock_state().on_process_exit = callback;
429    }
430
431    pub fn register(
432        &self,
433        pid: u32,
434        driver: impl Into<String>,
435        command: impl Into<String>,
436        args: Vec<String>,
437        ctx: ProcessContext,
438        driver_process: Arc<dyn DriverProcess>,
439    ) -> ProcessEntry {
440        let (pgid, sid) = {
441            let state = self.inner.lock_state();
442            match state.entries.get(&ctx.ppid) {
443                Some(parent) => (parent.entry.pgid, parent.entry.sid),
444                None => (pid, pid),
445            }
446        };
447
448        let entry = ProcessEntry {
449            pid,
450            ppid: ctx.ppid,
451            pgid,
452            sid,
453            driver: driver.into(),
454            command: command.into(),
455            args,
456            status: ProcessStatus::Running,
457            exit_code: None,
458            exit_time_ms: None,
459            env: ctx.env,
460            cwd: ctx.cwd,
461            umask: ctx.umask & 0o777,
462            identity: ctx.identity,
463        };
464
465        let weak = Arc::downgrade(&self.inner);
466        driver_process.set_on_exit(Arc::new(move |code| {
467            if let Some(inner) = weak.upgrade() {
468                mark_exited_inner(&inner, pid, code);
469            }
470        }));
471
472        let mut state = self.inner.lock_state();
473        state.next_pid = next_pid_after_registered(state.next_pid, pid);
474        state.entries.insert(
475            pid,
476            ProcessRecord {
477                entry: entry.clone(),
478                driver_process,
479                pending_wait_events: VecDeque::new(),
480                blocked_signals: ctx.blocked_signals,
481                pending_signals: ctx.pending_signals,
482            },
483        );
484
485        entry
486    }
487
488    pub fn get(&self, pid: u32) -> Option<ProcessEntry> {
489        self.inner
490            .lock_state()
491            .entries
492            .get(&pid)
493            .map(|record| record.entry.clone())
494    }
495
496    pub fn zombie_timer_count(&self) -> usize {
497        self.inner.reaper.scheduled_count()
498    }
499
500    pub fn zombie_reaper_thread_spawn_count(&self) -> usize {
501        self.inner.reaper.thread_spawn_count()
502    }
503
504    pub fn running_count(&self) -> usize {
505        self.inner
506            .lock_state()
507            .entries
508            .values()
509            .filter(|record| record.entry.status == ProcessStatus::Running)
510            .count()
511    }
512
513    pub fn mark_exited(&self, pid: u32, exit_code: i32) {
514        mark_exited_inner(&self.inner, pid, exit_code);
515    }
516
517    pub fn mark_stopped(&self, pid: u32, signal: i32) {
518        mark_wait_event_inner(
519            &self.inner,
520            pid,
521            ProcessStatus::Stopped,
522            PendingWaitEvent {
523                status: signal,
524                event: ProcessWaitEvent::Stopped,
525            },
526        );
527    }
528
529    pub fn mark_continued(&self, pid: u32) {
530        mark_wait_event_inner(
531            &self.inner,
532            pid,
533            ProcessStatus::Running,
534            PendingWaitEvent {
535                status: SIGCONT,
536                event: ProcessWaitEvent::Continued,
537            },
538        );
539    }
540
541    pub fn waitpid(&self, pid: u32) -> ProcessResult<(u32, i32)> {
542        let mut state = self.inner.lock_state();
543        loop {
544            let Some(record) = state.entries.get(&pid) else {
545                return Err(ProcessTableError::no_such_process(pid));
546            };
547
548            if record.entry.status == ProcessStatus::Exited {
549                let status = record.entry.exit_code.unwrap_or_default();
550                state.entries.remove(&pid);
551                drop(state);
552                self.inner.reaper.cancel(pid);
553                self.inner.waiters.notify_all();
554                return Ok((pid, status));
555            }
556
557            state = self.inner.wait_for_state(state);
558        }
559    }
560
561    pub fn waitpid_for(
562        &self,
563        waiter_pid: u32,
564        pid: i32,
565        flags: WaitPidFlags,
566    ) -> ProcessResult<Option<ProcessWaitResult>> {
567        let mut state = self.inner.lock_state();
568        loop {
569            let selector = resolve_wait_selector(&state, waiter_pid, pid)?;
570            let matching_children = matching_child_pids(&state, waiter_pid, selector);
571            if matching_children.is_empty() {
572                return Err(ProcessTableError::no_matching_child(waiter_pid, pid));
573            }
574
575            if let Some(result) = take_waitable_event(&mut state, &matching_children, flags) {
576                let should_reap = result.event == ProcessWaitEvent::Exited;
577                drop(state);
578                if should_reap {
579                    self.inner.reaper.cancel(result.pid);
580                    self.inner.waiters.notify_all();
581                }
582                return Ok(Some(result));
583            }
584
585            if flags.contains(WaitPidFlags::WNOHANG) {
586                return Ok(None);
587            }
588
589            state = self.inner.wait_for_state(state);
590        }
591    }
592
593    pub fn kill(&self, pid: i32, signal: i32) -> ProcessResult<()> {
594        if !(0..=MAX_SIGNAL).contains(&signal) {
595            return Err(ProcessTableError::invalid_signal(signal));
596        }
597
598        let deliveries = {
599            let mut state = self.inner.lock_state();
600            if pid < 0 {
601                let pgid = pid.unsigned_abs();
602                let grouped = state
603                    .entries
604                    .values()
605                    .filter(|record| record.entry.pgid == pgid)
606                    .map(|record| record.entry.pid)
607                    .collect::<Vec<_>>();
608                if grouped.is_empty() {
609                    return Err(ProcessTableError::no_such_process_group(pgid));
610                }
611                if signal == 0 {
612                    return Ok(());
613                }
614                collect_signal_deliveries(&mut state, &grouped, signal)?
615            } else {
616                let pid = pid as u32;
617                let Some(record) = state.entries.get(&pid) else {
618                    return Err(ProcessTableError::no_such_process(pid));
619                };
620                if record.entry.status == ProcessStatus::Exited || signal == 0 {
621                    return Ok(());
622                }
623                collect_signal_deliveries(&mut state, &[pid], signal)?
624            }
625        };
626
627        if signal == 0 {
628            return Ok(());
629        }
630
631        deliver_signals(&self.inner, deliveries);
632        Ok(())
633    }
634
635    pub fn setpgid(&self, pid: u32, pgid: u32) -> ProcessResult<()> {
636        let mut state = self.inner.lock_state();
637        let (current_sid, target_pgid) = {
638            let Some(record) = state.entries.get(&pid) else {
639                return Err(ProcessTableError::no_such_process(pid));
640            };
641            (record.entry.sid, if pgid == 0 { pid } else { pgid })
642        };
643
644        if target_pgid != pid {
645            let mut group_exists = false;
646            for record in state.entries.values() {
647                if record.entry.pgid != target_pgid || record.entry.status == ProcessStatus::Exited
648                {
649                    continue;
650                }
651                if record.entry.sid != current_sid {
652                    return Err(ProcessTableError::permission_denied(
653                        "cannot join process group in different session",
654                    ));
655                }
656                group_exists = true;
657                break;
658            }
659            if !group_exists {
660                return Err(ProcessTableError::permission_denied(format!(
661                    "no such process group {target_pgid}"
662                )));
663            }
664        }
665
666        if let Some(record) = state.entries.get_mut(&pid) {
667            record.entry.pgid = target_pgid;
668        }
669        Ok(())
670    }
671
672    pub fn getpgid(&self, pid: u32) -> ProcessResult<u32> {
673        self.get(pid)
674            .map(|entry| entry.pgid)
675            .ok_or_else(|| ProcessTableError::no_such_process(pid))
676    }
677
678    pub fn setsid(&self, pid: u32) -> ProcessResult<u32> {
679        let mut state = self.inner.lock_state();
680        let Some(record) = state.entries.get_mut(&pid) else {
681            return Err(ProcessTableError::no_such_process(pid));
682        };
683
684        if record.entry.pgid == pid {
685            return Err(ProcessTableError::permission_denied(format!(
686                "process {pid} is already a process group leader"
687            )));
688        }
689
690        record.entry.sid = pid;
691        record.entry.pgid = pid;
692        Ok(pid)
693    }
694
695    pub fn getsid(&self, pid: u32) -> ProcessResult<u32> {
696        self.get(pid)
697            .map(|entry| entry.sid)
698            .ok_or_else(|| ProcessTableError::no_such_process(pid))
699    }
700
701    pub fn getppid(&self, pid: u32) -> ProcessResult<u32> {
702        self.get(pid)
703            .map(|entry| entry.ppid)
704            .ok_or_else(|| ProcessTableError::no_such_process(pid))
705    }
706
707    pub fn get_umask(&self, pid: u32) -> ProcessResult<u32> {
708        self.get(pid)
709            .map(|entry| entry.umask)
710            .ok_or_else(|| ProcessTableError::no_such_process(pid))
711    }
712
713    pub fn set_umask(&self, pid: u32, umask: u32) -> ProcessResult<u32> {
714        let mut state = self.inner.lock_state();
715        let record = state
716            .entries
717            .get_mut(&pid)
718            .ok_or_else(|| ProcessTableError::no_such_process(pid))?;
719        let previous = record.entry.umask;
720        record.entry.umask = umask & 0o777;
721        Ok(previous)
722    }
723
724    pub fn has_process_group(&self, pgid: u32) -> bool {
725        self.inner
726            .lock_state()
727            .entries
728            .values()
729            .any(|record| record.entry.pgid == pgid && record.entry.status != ProcessStatus::Exited)
730    }
731
732    pub fn list_processes(&self) -> BTreeMap<u32, ProcessInfo> {
733        self.inner
734            .lock_state()
735            .entries
736            .values()
737            .map(|record| (record.entry.pid, to_process_info(&record.entry)))
738            .collect()
739    }
740
741    pub fn terminate_all(&self) {
742        let running = {
743            let mut state = self.inner.lock_state();
744            state.terminating_all = true;
745            self.inner.reaper.clear();
746            state
747                .entries
748                .values()
749                .filter(|record| record.entry.status == ProcessStatus::Running)
750                .map(|record| (record.entry.pid, Arc::clone(&record.driver_process)))
751                .collect::<Vec<_>>()
752        };
753
754        for (_, driver) in &running {
755            driver.kill(SIGTERM);
756        }
757        for (pid, driver) in &running {
758            if let Some(exit_code) = driver.wait(Duration::from_secs(1)) {
759                self.mark_exited(*pid, exit_code);
760            }
761        }
762
763        let survivors = {
764            let state = self.inner.lock_state();
765            running
766                .iter()
767                .filter(|(pid, _)| {
768                    state
769                        .entries
770                        .get(pid)
771                        .map(|record| record.entry.status == ProcessStatus::Running)
772                        .unwrap_or(false)
773                })
774                .cloned()
775                .collect::<Vec<_>>()
776        };
777
778        for (_, driver) in &survivors {
779            driver.kill(SIGKILL);
780        }
781        for (pid, driver) in &survivors {
782            if let Some(exit_code) = driver.wait(Duration::from_millis(500)) {
783                self.mark_exited(*pid, exit_code);
784            }
785        }
786
787        self.inner.lock_state().terminating_all = false;
788    }
789
790    pub fn sigprocmask(
791        &self,
792        pid: u32,
793        how: SigmaskHow,
794        set: SignalSet,
795    ) -> ProcessResult<SignalSet> {
796        let (previous, deliveries) = {
797            let mut state = self.inner.lock_state();
798            let record = state
799                .entries
800                .get_mut(&pid)
801                .ok_or_else(|| ProcessTableError::no_such_process(pid))?;
802            let previous = record.blocked_signals;
803            record.blocked_signals = match how {
804                SigmaskHow::Block => previous.union(set),
805                SigmaskHow::Unblock => previous.difference(set),
806                SigmaskHow::SetMask => set,
807            };
808
809            let unblocked_pending = record.pending_signals.difference(record.blocked_signals);
810            let deliveries = collect_pending_signal_deliveries(record, unblocked_pending)?;
811            (previous, deliveries)
812        };
813
814        deliver_signals(&self.inner, deliveries);
815        Ok(previous)
816    }
817
818    pub fn sigpending(&self, pid: u32) -> ProcessResult<SignalSet> {
819        self.inner
820            .lock_state()
821            .entries
822            .get(&pid)
823            .map(|record| record.pending_signals)
824            .ok_or_else(|| ProcessTableError::no_such_process(pid))
825    }
826}
827
828fn to_process_info(entry: &ProcessEntry) -> ProcessInfo {
829    ProcessInfo {
830        pid: entry.pid,
831        ppid: entry.ppid,
832        pgid: entry.pgid,
833        sid: entry.sid,
834        driver: entry.driver.clone(),
835        command: entry.command.clone(),
836        status: entry.status,
837        exit_code: entry.exit_code,
838        identity: entry.identity.clone(),
839    }
840}
841
/// Records that `pid` exited with `exit_code` and performs the exit side effects.
///
/// Under the state lock it does the following (no-op if `pid` is unknown or already exited):
/// * marks the entry `Exited` and stamps `exit_code` / `exit_time_ms`;
/// * reparents its children;
/// * queues `SIGCHLD` for the original parent if that parent is `Running`, skipped while
///   `terminate_all` is in progress;
/// * queues `SIGHUP` then `SIGCONT` for every live member of an affected process group
///   that is orphaned and has a stopped member.
///
/// After releasing the lock it schedules the zombie reap timer (or cancels it during
/// `terminate_all`), delivers the queued signals, calls the `on_process_exit` callback,
/// and wakes all waiters.
///
/// NOTE(review): the orphan check asks whether a group *is* orphaned after this exit,
/// not whether this exit *made* it orphaned, so an already-orphaned group with stopped
/// members may receive SIGHUP/SIGCONT again — confirm this is intended.
fn mark_exited_inner(inner: &Arc<ProcessTableInner>, pid: u32, exit_code: i32) {
    let (callback, zombie_ttl, should_schedule, deliveries) = {
        let mut state = inner.lock_state();
        let (ppid, pgid) = {
            let Some(record) = state.entries.get_mut(&pid) else {
                return;
            };

            // Idempotent: a second exit report (e.g. driver callback after
            // terminate_all already recorded it) changes nothing.
            if record.entry.status == ProcessStatus::Exited {
                return;
            }

            record.entry.status = ProcessStatus::Exited;
            record.entry.exit_code = Some(exit_code);
            record.entry.exit_time_ms = Some(now_ms());
            let ppid = record.entry.ppid;
            let pgid = record.entry.pgid;
            (ppid, pgid)
        };
        // Groups whose orphan status may have changed: the exiting process's own group
        // plus the group of every child being reparented.
        let mut affected_pgids = BTreeSet::from([pgid]);
        reparent_children_to_init(&mut state, pid, &mut affected_pgids);

        let orphaned_group_targets = collect_orphaned_group_signal_targets(&state, &affected_pgids);

        // During terminate_all no zombie timer is scheduled and no SIGCHLD is sent.
        let should_schedule = !state.terminating_all;
        let mut deliveries = Vec::new();
        if should_schedule {
            if let Some(parent) = state
                .entries
                .get_mut(&ppid)
                .filter(|parent| parent.entry.status == ProcessStatus::Running)
            {
                if let Some(delivery) =
                    queue_or_schedule_signal(parent, SIGCHLD).expect("SIGCHLD should be valid")
                {
                    deliveries.push(delivery);
                }
            }
        }

        for target_pid in orphaned_group_targets {
            if let Some(record) = state.entries.get_mut(&target_pid) {
                if let Some(delivery) =
                    queue_or_schedule_signal(record, SIGHUP).expect("SIGHUP should be valid")
                {
                    deliveries.push(delivery);
                }
                if let Some(delivery) =
                    queue_or_schedule_signal(record, SIGCONT).expect("SIGCONT should be valid")
                {
                    deliveries.push(delivery);
                }
            }
        }

        (
            state.on_process_exit.clone(),
            state.zombie_ttl,
            should_schedule,
            deliveries,
        )
    };

    // Everything below runs without the state lock held.
    if should_schedule {
        inner.reaper.schedule(pid, zombie_ttl);
    } else {
        inner.reaper.cancel(pid);
    }

    deliver_signals(inner, deliveries);

    if let Some(on_process_exit) = callback {
        on_process_exit(pid);
    }

    inner.waiters.notify_all();
}
919
920fn reparent_children_to_init(
921    state: &mut ProcessTableState,
922    exiting_pid: u32,
923    affected_pgids: &mut BTreeSet<u32>,
924) {
925    let new_parent = reparent_target_pid(state, exiting_pid);
926    for record in state.entries.values_mut() {
927        if record.entry.ppid != exiting_pid {
928            continue;
929        }
930        record.entry.ppid = new_parent;
931        affected_pgids.insert(record.entry.pgid);
932    }
933}
934
935fn reparent_target_pid(state: &ProcessTableState, exiting_pid: u32) -> u32 {
936    if exiting_pid != INIT_PID
937        && state
938            .entries
939            .get(&INIT_PID)
940            .map(|record| record.entry.status != ProcessStatus::Exited)
941            .unwrap_or(false)
942    {
943        INIT_PID
944    } else {
945        0
946    }
947}
948
949fn collect_orphaned_group_signal_targets(
950    state: &ProcessTableState,
951    candidate_pgids: &BTreeSet<u32>,
952) -> Vec<u32> {
953    let mut targets = Vec::new();
954    for &pgid in candidate_pgids {
955        if !process_group_is_orphaned(state, pgid) || !process_group_has_stopped_member(state, pgid)
956        {
957            continue;
958        }
959
960        for record in state.entries.values() {
961            if record.entry.pgid == pgid && record.entry.status != ProcessStatus::Exited {
962                targets.push(record.entry.pid);
963            }
964        }
965    }
966    targets
967}
968
969fn process_group_is_orphaned(state: &ProcessTableState, pgid: u32) -> bool {
970    let mut has_member = false;
971    for record in state.entries.values() {
972        if record.entry.pgid != pgid || record.entry.status == ProcessStatus::Exited {
973            continue;
974        }
975        has_member = true;
976        if has_parent_outside_group_in_same_session(state, &record.entry) {
977            return false;
978        }
979    }
980
981    has_member
982}
983
984fn has_parent_outside_group_in_same_session(
985    state: &ProcessTableState,
986    entry: &ProcessEntry,
987) -> bool {
988    match entry.ppid {
989        0 | INIT_PID => false,
990        ppid => state
991            .entries
992            .get(&ppid)
993            .map(|parent| {
994                parent.entry.status != ProcessStatus::Exited
995                    && parent.entry.sid == entry.sid
996                    && parent.entry.pgid != entry.pgid
997            })
998            .unwrap_or(false),
999    }
1000}
1001
1002fn process_group_has_stopped_member(state: &ProcessTableState, pgid: u32) -> bool {
1003    state
1004        .entries
1005        .values()
1006        .any(|record| record.entry.pgid == pgid && record.entry.status == ProcessStatus::Stopped)
1007}
1008
1009fn mark_wait_event_inner(
1010    inner: &Arc<ProcessTableInner>,
1011    pid: u32,
1012    next_status: ProcessStatus,
1013    event: PendingWaitEvent,
1014) {
1015    let deliveries = {
1016        let mut state = inner.lock_state();
1017        let ppid = {
1018            let Some(record) = state.entries.get_mut(&pid) else {
1019                return;
1020            };
1021
1022            if record.entry.status == ProcessStatus::Exited || record.entry.status == next_status {
1023                return;
1024            }
1025
1026            record.entry.status = next_status;
1027            record.pending_wait_events.push_back(event);
1028            record.entry.ppid
1029        };
1030
1031        state
1032            .entries
1033            .get_mut(&ppid)
1034            .filter(|parent| parent.entry.status == ProcessStatus::Running)
1035            .and_then(|parent| {
1036                queue_or_schedule_signal(parent, SIGCHLD)
1037                    .expect("SIGCHLD should be valid")
1038                    .into_iter()
1039                    .next()
1040            })
1041            .into_iter()
1042            .collect::<Vec<_>>()
1043    };
1044
1045    deliver_signals(inner, deliveries);
1046
1047    inner.waiters.notify_all();
1048}
1049
1050fn signal_bit(signal: i32) -> ProcessResult<u64> {
1051    if !(1..=MAX_SIGNAL).contains(&signal) {
1052        return Err(ProcessTableError::invalid_signal(signal));
1053    }
1054    Ok(1u64 << (signal - 1))
1055}
1056
1057fn normalize_next_pid(pid: u32) -> u32 {
1058    if (INIT_PID..=MAX_ALLOCATED_PID).contains(&pid) {
1059        pid
1060    } else {
1061        INIT_PID
1062    }
1063}
1064
1065fn next_allocated_pid_after(pid: u32) -> u32 {
1066    if pid >= MAX_ALLOCATED_PID {
1067        INIT_PID
1068    } else {
1069        pid + 1
1070    }
1071}
1072
1073fn next_pid_after_registered(current: u32, registered: u32) -> u32 {
1074    let current = normalize_next_pid(current);
1075    if !(INIT_PID..=MAX_ALLOCATED_PID).contains(&registered) {
1076        return current;
1077    }
1078
1079    if current <= registered {
1080        next_allocated_pid_after(registered)
1081    } else {
1082        current
1083    }
1084}
1085
1086fn signal_can_be_blocked(signal: i32) -> bool {
1087    !matches!(signal, SIGKILL | SIGSTOP | SIGCONT)
1088}
1089
1090fn queue_or_schedule_signal(
1091    record: &mut ProcessRecord,
1092    signal: i32,
1093) -> ProcessResult<Option<ScheduledSignalDelivery>> {
1094    if signal_can_be_blocked(signal) && record.blocked_signals.contains(signal) {
1095        record.pending_signals.insert(signal)?;
1096        return Ok(None);
1097    }
1098
1099    Ok(Some(ScheduledSignalDelivery {
1100        pid: record.entry.pid,
1101        signal,
1102        status: record.entry.status,
1103        driver_process: Arc::clone(&record.driver_process),
1104    }))
1105}
1106
1107fn collect_signal_deliveries(
1108    state: &mut ProcessTableState,
1109    target_pids: &[u32],
1110    signal: i32,
1111) -> ProcessResult<Vec<ScheduledSignalDelivery>> {
1112    let mut deliveries = Vec::new();
1113    for pid in target_pids {
1114        let Some(record) = state.entries.get_mut(pid) else {
1115            continue;
1116        };
1117        if let Some(delivery) = queue_or_schedule_signal(record, signal)? {
1118            deliveries.push(delivery);
1119        }
1120    }
1121    Ok(deliveries)
1122}
1123
1124fn collect_pending_signal_deliveries(
1125    record: &mut ProcessRecord,
1126    signals: SignalSet,
1127) -> ProcessResult<Vec<ScheduledSignalDelivery>> {
1128    let mut deliveries = Vec::new();
1129    for signal in signals.signals() {
1130        record.pending_signals.remove(signal)?;
1131        deliveries.push(ScheduledSignalDelivery {
1132            pid: record.entry.pid,
1133            signal,
1134            status: record.entry.status,
1135            driver_process: Arc::clone(&record.driver_process),
1136        });
1137    }
1138    Ok(deliveries)
1139}
1140
1141fn deliver_signals(inner: &Arc<ProcessTableInner>, deliveries: Vec<ScheduledSignalDelivery>) {
1142    let mut stopped = Vec::new();
1143    let mut continued = Vec::new();
1144
1145    for delivery in &deliveries {
1146        match delivery.signal {
1147            SIGSTOP | SIGTSTP if delivery.status == ProcessStatus::Running => {
1148                stopped.push((delivery.pid, delivery.signal))
1149            }
1150            SIGCONT if delivery.status == ProcessStatus::Stopped => continued.push(delivery.pid),
1151            _ => {}
1152        }
1153        delivery.driver_process.kill(delivery.signal);
1154    }
1155
1156    for (pid, signal) in stopped {
1157        mark_wait_event_inner(
1158            inner,
1159            pid,
1160            ProcessStatus::Stopped,
1161            PendingWaitEvent {
1162                status: signal,
1163                event: ProcessWaitEvent::Stopped,
1164            },
1165        );
1166    }
1167    for pid in continued {
1168        mark_wait_event_inner(
1169            inner,
1170            pid,
1171            ProcessStatus::Running,
1172            PendingWaitEvent {
1173                status: SIGCONT,
1174                event: ProcessWaitEvent::Continued,
1175            },
1176        );
1177    }
1178}
1179
1180fn resolve_wait_selector(
1181    state: &ProcessTableState,
1182    waiter_pid: u32,
1183    pid: i32,
1184) -> ProcessResult<WaitSelector> {
1185    let waiter = state
1186        .entries
1187        .get(&waiter_pid)
1188        .ok_or_else(|| ProcessTableError::no_such_process(waiter_pid))?;
1189
1190    Ok(match pid {
1191        -1 => WaitSelector::AnyChild,
1192        0 => WaitSelector::ProcessGroup(waiter.entry.pgid),
1193        p if p < -1 => WaitSelector::ProcessGroup(p.unsigned_abs()),
1194        p => WaitSelector::ChildPid(p as u32),
1195    })
1196}
1197
1198fn matching_child_pids(
1199    state: &ProcessTableState,
1200    waiter_pid: u32,
1201    selector: WaitSelector,
1202) -> Vec<u32> {
1203    state
1204        .entries
1205        .values()
1206        .filter(|record| record.entry.ppid == waiter_pid)
1207        .filter(|record| match selector {
1208            WaitSelector::AnyChild => true,
1209            WaitSelector::ChildPid(pid) => record.entry.pid == pid,
1210            WaitSelector::ProcessGroup(pgid) => record.entry.pgid == pgid,
1211        })
1212        .map(|record| record.entry.pid)
1213        .collect()
1214}
1215
1216fn take_waitable_event(
1217    state: &mut ProcessTableState,
1218    matching_children: &[u32],
1219    flags: WaitPidFlags,
1220) -> Option<ProcessWaitResult> {
1221    for child_pid in matching_children {
1222        let mut non_exit_result = None;
1223        let mut should_reap = false;
1224        {
1225            let record = state.entries.get_mut(child_pid)?;
1226            if let Some(index) = record
1227                .pending_wait_events
1228                .iter()
1229                .position(|event| is_waitable_event(event.event, flags))
1230            {
1231                let event = record
1232                    .pending_wait_events
1233                    .remove(index)
1234                    .expect("pending wait event should exist");
1235                non_exit_result = Some(ProcessWaitResult {
1236                    pid: *child_pid,
1237                    status: event.status,
1238                    event: event.event,
1239                });
1240            } else if record.entry.status == ProcessStatus::Exited {
1241                should_reap = true;
1242            }
1243        }
1244
1245        if let Some(result) = non_exit_result {
1246            return Some(result);
1247        }
1248
1249        if should_reap {
1250            let record = state
1251                .entries
1252                .remove(child_pid)
1253                .expect("exited child should still exist");
1254            return Some(ProcessWaitResult {
1255                pid: *child_pid,
1256                status: record.entry.exit_code.unwrap_or_default(),
1257                event: ProcessWaitEvent::Exited,
1258            });
1259        }
1260    }
1261
1262    None
1263}
1264
1265fn is_waitable_event(event: ProcessWaitEvent, flags: WaitPidFlags) -> bool {
1266    match event {
1267        ProcessWaitEvent::Exited => true,
1268        ProcessWaitEvent::Stopped => flags.contains(WaitPidFlags::WUNTRACED),
1269        ProcessWaitEvent::Continued => flags.contains(WaitPidFlags::WCONTINUED),
1270    }
1271}
1272
1273fn start_zombie_reaper(inner: Weak<ProcessTableInner>, reaper: Arc<ZombieReaper>) {
1274    reaper.thread_spawns.fetch_add(1, Ordering::SeqCst);
1275    thread::spawn(move || loop {
1276        let Some(pid) = reaper.take_next_due_pid() else {
1277            return;
1278        };
1279
1280        let Some(inner) = inner.upgrade() else {
1281            return;
1282        };
1283
1284        let mut state = inner.lock_state();
1285        let should_reap = state
1286            .entries
1287            .get(&pid)
1288            .map(|record| {
1289                record.entry.status == ProcessStatus::Exited
1290                    && !has_living_parent(&state, record.entry.ppid)
1291            })
1292            .unwrap_or(false);
1293        if should_reap {
1294            state.entries.remove(&pid);
1295        } else if state
1296            .entries
1297            .get(&pid)
1298            .map(|record| record.entry.status == ProcessStatus::Exited)
1299            .unwrap_or(false)
1300        {
1301            reaper.schedule(pid, state.zombie_ttl);
1302        }
1303        drop(state);
1304        inner.waiters.notify_all();
1305    });
1306}
1307
1308fn has_living_parent(state: &ProcessTableState, ppid: u32) -> bool {
1309    ppid != 0
1310        && state
1311            .entries
1312            .get(&ppid)
1313            .map(|record| record.entry.status != ProcessStatus::Exited)
1314            .unwrap_or(false)
1315}
1316
1317impl ProcessTableInner {
1318    fn lock_state(&self) -> MutexGuard<'_, ProcessTableState> {
1319        lock_or_recover(&self.state)
1320    }
1321
1322    fn wait_for_state<'a>(
1323        &self,
1324        guard: MutexGuard<'a, ProcessTableState>,
1325    ) -> MutexGuard<'a, ProcessTableState> {
1326        wait_or_recover(&self.waiters, guard)
1327    }
1328}
1329
/// Returns wall-clock milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` (via `unwrap_or_default`). A millisecond
/// count too large for `u64` saturates at `u64::MAX` instead of being silently truncated by
/// an `as` cast.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
1336
1337impl Default for ZombieReaper {
1338    fn default() -> Self {
1339        Self {
1340            state: Mutex::new(ZombieReaperState::default()),
1341            wake: Condvar::new(),
1342            thread_spawns: AtomicUsize::new(0),
1343        }
1344    }
1345}
1346
1347impl ZombieReaper {
1348    fn schedule(&self, pid: u32, ttl: Duration) {
1349        let mut state = lock_or_recover(&self.state);
1350        state.deadlines.insert(pid, Instant::now() + ttl);
1351        drop(state);
1352        self.wake.notify_all();
1353    }
1354
1355    fn cancel(&self, pid: u32) {
1356        let mut state = lock_or_recover(&self.state);
1357        let removed = state.deadlines.remove(&pid).is_some();
1358        drop(state);
1359        if removed {
1360            self.wake.notify_all();
1361        }
1362    }
1363
1364    fn clear(&self) {
1365        let mut state = lock_or_recover(&self.state);
1366        let changed = !state.deadlines.is_empty();
1367        state.deadlines.clear();
1368        drop(state);
1369        if changed {
1370            self.wake.notify_all();
1371        }
1372    }
1373
1374    fn shutdown(&self) {
1375        let mut state = lock_or_recover(&self.state);
1376        state.shutdown = true;
1377        drop(state);
1378        self.wake.notify_all();
1379    }
1380
1381    fn scheduled_count(&self) -> usize {
1382        lock_or_recover(&self.state).deadlines.len()
1383    }
1384
1385    fn thread_spawn_count(&self) -> usize {
1386        self.thread_spawns.load(Ordering::SeqCst)
1387    }
1388
1389    fn take_next_due_pid(&self) -> Option<u32> {
1390        let mut state = lock_or_recover(&self.state);
1391        loop {
1392            if state.shutdown {
1393                return None;
1394            }
1395
1396            let Some((pid, deadline)) = state
1397                .deadlines
1398                .iter()
1399                .min_by_key(|(_, deadline)| **deadline)
1400                .map(|(&pid, &deadline)| (pid, deadline))
1401            else {
1402                state = wait_or_recover(&self.wake, state);
1403                continue;
1404            };
1405
1406            let now = Instant::now();
1407            if deadline <= now {
1408                state.deadlines.remove(&pid);
1409                return Some(pid);
1410            }
1411
1412            let timeout = deadline.saturating_duration_since(now);
1413            let (next_state, _) = wait_timeout_or_recover(&self.wake, state, timeout);
1414            state = next_state;
1415        }
1416    }
1417}
1418
1419impl Drop for ProcessTableInner {
1420    fn drop(&mut self) {
1421        self.reaper.shutdown();
1422    }
1423}
1424
/// Locks `mutex`. If a previous holder panicked, the poisoned guard is recovered instead of
/// panicking.
fn lock_or_recover<'a, T>(mutex: &'a Mutex<T>) -> MutexGuard<'a, T> {
    mutex.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
}
1431
/// Waits on `condvar` with `guard` and returns the re-acquired guard. If the mutex was
/// poisoned meanwhile, the guard is recovered instead of panicking.
fn wait_or_recover<'a, T>(condvar: &Condvar, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
    condvar
        .wait(guard)
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
1438
/// Waits on `condvar` for at most `timeout` and returns the re-acquired guard together with
/// the timeout result. A poisoned mutex is recovered instead of panicking.
fn wait_timeout_or_recover<'a, T>(
    condvar: &Condvar,
    guard: MutexGuard<'a, T>,
    timeout: Duration,
) -> (MutexGuard<'a, T>, WaitTimeoutResult) {
    condvar
        .wait_timeout(guard, timeout)
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
1449
#[cfg(test)]
mod tests {
    use super::*;

    // Minimal in-memory `DriverProcess` for tests: `kill` is a no-op, `wait` always returns
    // `None`, and the exit callback installed through `set_on_exit` is stored so a test can
    // trigger it by hand with `exit`.
    #[derive(Default)]
    struct TestDriverProcess {
        on_exit: Mutex<Option<ProcessExitCallback>>,
    }

    impl TestDriverProcess {
        // Invokes the stored exit callback (if one was installed) with `exit_code`. The
        // callback is cloned out first so the lock is not held while it runs.
        fn exit(&self, exit_code: i32) {
            let callback = self
                .on_exit
                .lock()
                .expect("test driver lock poisoned")
                .clone();
            if let Some(callback) = callback {
                callback(exit_code);
            }
        }
    }

    impl DriverProcess for TestDriverProcess {
        fn kill(&self, _signal: i32) {}

        fn wait(&self, _timeout: Duration) -> Option<i32> {
            None
        }

        fn set_on_exit(&self, callback: ProcessExitCallback) {
            *self.on_exit.lock().expect("test driver lock poisoned") = Some(callback);
        }
    }

    // Default `ProcessContext` with only the parent pid overridden.
    fn context(ppid: u32) -> ProcessContext {
        ProcessContext {
            ppid,
            ..ProcessContext::default()
        }
    }

    // Pid allocation starting near the top of the range must skip both a live process
    // (max - 1) and an exited-but-unreaped one (max), wrap around, skip the live pid 1,
    // and then hand out 2 and 3.
    #[test]
    fn allocate_pid_wraps_without_reusing_live_or_zombie_processes() {
        // One-hour zombie TTL, presumably so the exited process is not reaped while the
        // test runs — confirm against `with_zombie_ttl`.
        let table = ProcessTable::with_zombie_ttl(Duration::from_secs(3600));
        let live_high = Arc::new(TestDriverProcess::default());
        let zombie_high = Arc::new(TestDriverProcess::default());
        let live_one = Arc::new(TestDriverProcess::default());
        let max_pid = MAX_ALLOCATED_PID;

        table.register(
            max_pid - 1,
            "test",
            "live-high",
            Vec::new(),
            context(0),
            live_high,
        );
        table.register(
            max_pid,
            "test",
            "zombie-high",
            Vec::new(),
            context(0),
            zombie_high.clone(),
        );
        table.register(1, "test", "live-one", Vec::new(), context(0), live_one);
        // Fires the exit callback — presumably installed by `register` via `set_on_exit` —
        // which should leave pid `max_pid` as an exited (zombie) entry.
        zombie_high.exit(0);

        // Force the allocator to start its search just below the top of the pid range.
        table.inner.lock_state().next_pid = max_pid - 1;

        assert_eq!(table.allocate_pid().expect("allocate pid"), 2);
        assert_eq!(table.allocate_pid().expect("allocate pid"), 3);
    }
}