1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
//! Child process supervisor for the PID 1 agent.
//!
//! Manages daemon processes (containerd, dockerd) spawned by the agent.
//! Handles SIGCHLD for zombie reaping when running as PID 1.
#[cfg(target_os = "linux")]
mod platform {
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
/// Manages child processes spawned by the agent.
///
/// Each tracked child is stored under a caller-chosen name together with the
/// PID it was started with. `Default` yields the same empty supervisor as
/// `Supervisor::new`.
#[derive(Debug, Default)]
pub struct Supervisor {
    /// Tracked children: name → PID.
    children: HashMap<String, u32>,
}
impl Supervisor {
/// Creates a supervisor that is not tracking any child yet.
pub fn new() -> Self {
    let children = HashMap::new();
    Self { children }
}
/// Spawn a named child process. Returns the PID on success.
pub fn spawn(&mut self, name: &str, mut cmd: std::process::Command) -> anyhow::Result<u32> {
let child = cmd.spawn()?;
let pid = child.id();
tracing::info!(name, pid, "spawned child process");
self.children.insert(name.to_string(), pid);
Ok(pid)
}
/// Wait until the Unix socket at `socket` accepts a connection.
///
/// The socket is probed repeatedly, sleeping 200ms after every failed probe,
/// until `timeout` has elapsed. The deadline is only checked before each
/// probe, so a zero `timeout` returns `false` without probing at all.
/// `name` is used for logging only.
///
/// Returns `true` as soon as a probe succeeds, `false` once the deadline has
/// passed.
pub async fn wait_ready(&self, name: &str, socket: &str, timeout: Duration) -> bool {
    const POLL_INTERVAL: Duration = Duration::from_millis(200);

    let give_up_at = tokio::time::Instant::now() + timeout;
    loop {
        if tokio::time::Instant::now() >= give_up_at {
            tracing::warn!(name, socket, "socket not ready after timeout");
            return false;
        }
        if probe_unix_socket(socket).await {
            tracing::info!(name, socket, "socket ready");
            return true;
        }
        tokio::time::sleep(POLL_INTERVAL).await;
    }
}
/// Collect every exited child with non-blocking `waitpid(-1, WNOHANG)` calls.
///
/// The sweep covers ALL children of this process, including orphaned
/// grandchildren reparented to PID 1, not only the tracked ones. A reaped PID
/// that is present in the internal map is removed from it and logged under its
/// name; any other PID is logged as "untracked".
///
/// The sweep ends when no further child has exited, when there are no children
/// at all (`ECHILD`), or on the first unexpected `waitpid` error (which is
/// logged).
pub fn reap_children(&mut self) {
    use nix::errno::Errno;
    use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
    use nix::unistd::Pid;

    // -1 selects "any child" in waitpid terms.
    let any_child = Pid::from_raw(-1);
    loop {
        let outcome = match waitpid(any_child, Some(WaitPidFlag::WNOHANG)) {
            Ok(outcome) => outcome,
            // ECHILD = no children at all.
            Err(Errno::ECHILD) => return,
            // Any other error is unexpected; stop rather than spin on it.
            Err(e) => {
                tracing::warn!(error = %e, "unexpected waitpid error");
                return;
            }
        };

        match outcome {
            // Children exist, but none of them has exited: no more zombies.
            WaitStatus::StillAlive => return,
            WaitStatus::Exited(pid, status) => {
                let name = self.remove_by_pid(pid.as_raw() as u32);
                tracing::info!(
                    pid = pid.as_raw(),
                    status,
                    name = name.as_deref().unwrap_or("untracked"),
                    "reaped child process"
                );
            }
            WaitStatus::Signaled(pid, sig, _) => {
                let name = self.remove_by_pid(pid.as_raw() as u32);
                tracing::warn!(
                    pid = pid.as_raw(),
                    signal = %sig,
                    name = name.as_deref().unwrap_or("untracked"),
                    "child killed by signal"
                );
            }
            // Any other status is ignored; keep sweeping.
            _ => {}
        }
    }
}
/// Gracefully stop all tracked children (SIGTERM, wait, reap).
pub async fn shutdown_all(&mut self) {
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
for (name, pid) in &self.children {
tracing::info!(name, pid, "sending SIGTERM to child");
let _ = kill(Pid::from_raw(*pid as i32), Signal::SIGTERM);
}
// Brief grace period, then reap.
tokio::time::sleep(Duration::from_secs(2)).await;
self.reap_children();
}
/// Stop tracking the child whose PID is `pid`.
///
/// Returns the name it was tracked under, or `None` when no tracked child has
/// that PID (the map is left untouched in that case).
fn remove_by_pid(&mut self, pid: u32) -> Option<String> {
    // The map is keyed by name, so the PID has to be found by scanning values.
    // The key is cloned so the shared borrow ends before the removal.
    let key = self
        .children
        .iter()
        .find_map(|(tracked_name, tracked_pid)| (*tracked_pid == pid).then(|| tracked_name.clone()))?;
    self.children.remove(&key);
    Some(key)
}
}
/// Check once whether the Unix socket at `path` accepts connections.
///
/// Returns `false` right away when nothing exists at `path`. Otherwise a
/// connection attempt is made with a 300ms limit; the result is `true` only if
/// the connection is established within that limit. The stream is dropped
/// immediately after the check.
async fn probe_unix_socket(path: &str) -> bool {
    const CONNECT_TIMEOUT: Duration = Duration::from_millis(300);

    if !std::path::Path::new(path).exists() {
        return false;
    }

    let attempt = tokio::time::timeout(CONNECT_TIMEOUT, tokio::net::UnixStream::connect(path));
    match attempt.await {
        // Finished in time: ready only if the connect itself succeeded.
        Ok(connect_result) => connect_result.is_ok(),
        // The timeout elapsed before the connect completed.
        Err(_elapsed) => false,
    }
}
/// Spawn a background task that reaps zombies on SIGCHLD.
///
/// Must be called from a tokio runtime. Runs until the process exits.
///
/// The task registers a SIGCHLD stream, performs one sweep immediately, and
/// then sweeps again after every received signal. The immediate sweep covers
/// children that exited before the stream was registered and therefore
/// produced no signal this task could observe.
///
/// The supervisor mutex is taken for each sweep, so a task that holds the lock
/// for a long time delays reaping.
///
/// If registration fails, or the stream ever closes, the task logs an error
/// and ends; zombie reaping is then disabled but the process keeps running.
pub fn spawn_reaper(supervisor: Arc<Mutex<Supervisor>>) {
    tokio::spawn(async move {
        let mut sigchld = match tokio::signal::unix::signal(
            tokio::signal::unix::SignalKind::child(),
        ) {
            Ok(s) => s,
            Err(e) => {
                // PID 1 must not panic. Degrade gracefully: zombies may
                // accumulate but the agent keeps running.
                tracing::error!(error = %e, "failed to register SIGCHLD handler, zombie reaping disabled");
                return;
            }
        };

        // Catch up on children that exited before the stream existed.
        supervisor.lock().await.reap_children();

        while sigchld.recv().await.is_some() {
            supervisor.lock().await.reap_children();
        }

        // Signal stream closed — should not happen for PID 1,
        // but degrade gracefully rather than spin.
        tracing::error!("SIGCHLD stream closed unexpectedly, zombie reaping disabled");
    });
}
}
#[cfg(target_os = "linux")]
pub use platform::{Supervisor, spawn_reaper};
// Stubs for non-Linux development.
/// Placeholder supervisor for non-Linux builds.
///
/// It holds no state and offers only construction (`new` / `Default`); the
/// process-management methods of the Linux implementation are not provided
/// here.
#[cfg(not(target_os = "linux"))]
#[derive(Debug, Default)]
pub struct Supervisor;
#[cfg(not(target_os = "linux"))]
impl Supervisor {
    /// Creates the placeholder supervisor.
    pub fn new() -> Self {
        Self
    }
}
/// Stub for non-Linux development builds: takes the same argument as the
/// Linux `spawn_reaper` and does nothing (no task is spawned, no signal
/// handler is registered).
#[cfg(not(target_os = "linux"))]
pub fn spawn_reaper(_supervisor: std::sync::Arc<tokio::sync::Mutex<Supervisor>>) {}