1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
//! Child reaping: waitpid, kill, drain capture.
use std::io::Read;
use std::time::Instant;
use crate::audit::{CompleteOutcome, CompleteRecord, RecoveryAuditLog};
use super::{Outstanding, Recovery, RecoveryOutcome};
impl Recovery {
    /// Attempt a non-blocking reap of the outstanding child registered under `pid`.
    ///
    /// Captured stdout/stderr is drained before polling and, once the child
    /// has exited, drained once more before the entry is removed so the audit
    /// record carries the final byte counts.
    ///
    /// Returns `None` when there is no outstanding entry for `pid` or when the
    /// child is still running (`try_wait` returned `Ok(None)`). Returns
    /// `Some(RecoveryOutcome::Reaped { .. })` when the child has exited and
    /// `Some(RecoveryOutcome::ReapFailed(_))` when `try_wait` errors. On both
    /// terminal paths the `Outstanding` entry is removed from the table and a
    /// completion audit record is emitted (if an audit sink is configured).
    pub(super) fn reap_finished_child(&mut self, pid: u32) -> Option<RecoveryOutcome> {
        let cap = self.capture_cap;
        let (child_pid, wait_result) = {
            let entry = self.outstanding.get_mut(pid)?;
            Self::drain_outstanding_capture(entry, cap);
            let child_pid = entry.child.id();
            (child_pid, entry.child.try_wait())
        };
        match wait_result {
            Ok(Some(status)) => {
                // Pick up anything written between the first drain and the exit.
                if let Some(entry) = self.outstanding.get_mut(pid) {
                    Self::drain_outstanding_capture(entry, cap);
                }
                let entry = self.outstanding.remove(pid)?;
                let outcome = if entry.killed {
                    CompleteOutcome::Killed
                } else {
                    CompleteOutcome::Reaped
                };
                self.emit_complete_audit(
                    pid,
                    child_pid,
                    outcome,
                    Some(&status),
                    entry.spawned_at,
                    entry.wallclock_at_spawn_ms,
                    entry.stdout_len,
                    entry.stderr_len,
                    entry.truncated,
                );
                Some(RecoveryOutcome::Reaped { child_pid, status })
            }
            Ok(None) => None,
            Err(e) => {
                let entry = self.outstanding.remove(pid)?;
                self.emit_complete_audit(
                    pid,
                    child_pid,
                    CompleteOutcome::ReapFailed,
                    None,
                    entry.spawned_at,
                    entry.wallclock_at_spawn_ms,
                    entry.stdout_len,
                    entry.stderr_len,
                    entry.truncated,
                );
                Some(RecoveryOutcome::ReapFailed(e))
            }
        }
    }

    /// Drain captured stdout/stderr for one outstanding child.
    ///
    /// Reads whatever each captured pipe has available, stdout first, until a
    /// read would block, reaches EOF, fails, or the combined stdout+stderr
    /// byte count reaches `cap_cfg`. The bytes themselves are discarded; only
    /// `stdout_len` / `stderr_len` are advanced. Reaching the cap sets
    /// `entry.truncated`, after which later calls return immediately. A
    /// `cap_cfg` of zero disables draining entirely.
    ///
    /// Intended never to block: assumes the pipe handles are in non-blocking
    /// mode (a `WouldBlock` error simply ends the drain until the next call).
    ///
    /// Takes `&mut Outstanding` instead of `&mut self` so it can be called
    /// while a mutable borrow of the entry obtained from `self.outstanding`
    /// (via `get_mut` in [`Self::reap_finished_child`]) is live.
    ///
    /// NOTE(review): once the cap is hit (or when `cap_cfg == 0`) the pipes are
    /// no longer read at all, so a child that keeps writing may block once the
    /// pipe buffer fills and then only finish if it is killed — confirm this
    /// is intended.
    fn drain_outstanding_capture(entry: &mut Outstanding, cap_cfg: u32) {
        let cap = cap_cfg as usize;
        if cap == 0 || entry.truncated {
            return;
        }
        let mut total = entry.stdout_len as usize + entry.stderr_len as usize;
        if let Some(handle) = entry.stdout_handle.as_mut() {
            if Self::drain_pipe(handle, &mut entry.stdout_len, &mut total, cap) {
                entry.truncated = true;
            }
        }
        if let Some(handle) = entry.stderr_handle.as_mut() {
            if Self::drain_pipe(handle, &mut entry.stderr_len, &mut total, cap) {
                entry.truncated = true;
            }
        }
    }

    /// Read one captured pipe into a scratch buffer until a read would block,
    /// hits EOF, fails, or the shared byte budget `cap` is used up.
    ///
    /// `len` is this pipe's running byte count and `total` the combined count
    /// across both pipes; both advance on every successful read. Returns
    /// `true` if the loop stopped because `total` reached `cap`.
    fn drain_pipe<R: Read + ?Sized>(
        handle: &mut R,
        len: &mut u32,
        total: &mut usize,
        cap: usize,
    ) -> bool {
        let mut buf = [0u8; 4096];
        loop {
            if *total >= cap {
                return true;
            }
            let want = (cap - *total).min(buf.len());
            match handle.read(&mut buf[..want]) {
                Ok(0) => return false,
                Ok(n) => {
                    // `n <= buf.len()` (4096), so the cast cannot truncate.
                    *len = len.saturating_add(n as u32);
                    *total = total.saturating_add(n);
                }
                // EINTR is transient: retry instead of abandoning this drain.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                // `WouldBlock`: nothing buffered right now, resume on the next
                // call. Any other error: stop for this call; the next call
                // will try again.
                Err(_) => return false,
            }
        }
    }

    /// Emit a recovery-complete audit record built from already-extracted
    /// fields of an `Outstanding` entry. Does nothing when no audit sink is
    /// configured.
    ///
    /// `exit_code` and `signal` are derived from `status` (both `None` when
    /// `status` is `None`). `duration_ns` is the time elapsed since
    /// `spawned_at`, saturated to `u64::MAX`. `wallclock_at_spawn_ms` is
    /// accepted but not recorded. The record's `wallclock_ms` is taken at
    /// emission time.
    // The arguments mirror `Outstanding` fields so callers can pass them after
    // the entry has already been removed from the table.
    #[allow(clippy::too_many_arguments)]
    pub(super) fn emit_complete_audit(
        &mut self,
        agent_pid: u32,
        child_pid: u32,
        outcome: CompleteOutcome,
        status: Option<&std::process::ExitStatus>,
        spawned_at: Instant,
        wallclock_at_spawn_ms: u64,
        stdout_len: u32,
        stderr_len: u32,
        truncated: bool,
    ) {
        let Some(sink) = self.audit_sink.as_mut() else {
            return;
        };
        use std::os::unix::process::ExitStatusExt;
        let exit_code = status.and_then(|s| s.code());
        let signal = status.and_then(|s| s.signal());
        // Saturate rather than wrap if the elapsed time ever exceeds u64 nanos.
        let duration_ns = spawned_at.elapsed().as_nanos().min(u64::MAX as u128) as u64;
        // `CompleteRecord` has no spawn-time wallclock field; the parameter is
        // kept for interface stability and explicitly ignored here.
        let _ = wallclock_at_spawn_ms;
        sink.record_complete(&CompleteRecord {
            wallclock_ms: RecoveryAuditLog::wallclock_ms_now(),
            // NOTE(review): always 0 on this path — confirm whether an
            // observer timestamp should be plumbed through.
            observer_ns: 0,
            agent_pid,
            child_pid,
            outcome,
            exit_code,
            signal,
            duration_ns,
            stdout_len,
            stderr_len,
            truncated,
        });
    }
}