epics-tools-rs 0.20.2

Operational tooling for EPICS deployments — process supervisors, log/audit dumpers, config validators. First tenant: procserv (Rust port of epics-modules/procServ).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
//! PTY-based child process management.
//!
//! Wraps `forkpty(3)` (via `nix::pty::forkpty`) to launch the
//! supervised child with its stdin/stdout/stderr connected to a
//! pseudo-terminal. The supervisor owns the master fd; we read its
//! output asynchronously via `tokio::io::unix::AsyncFd` and write
//! to it (filtered through `ignore_chars`) when forwarding party-line
//! input to the child stdin.
//!
//! Mirrors C `processClass` / `processFactory` (`processFactory.cc`):
//! - `forkpty` + `execvp`
//! - `setsid()` so signals to `-pid` reach the whole process group
//! - per-line PTY-master read with EIO/EOF → child-died detection
//! - `kill(-pid, sig)` for signal forwarding to the entire group
//! - SIGCHLD-style reap via blocking `waitpid` on a side task

use std::ffi::CString;
use std::io;
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::path::PathBuf;
use std::process::ExitStatus;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use nix::pty::ForkptyResult;
use nix::sys::signal::Signal;
use nix::sys::wait::{WaitStatus, waitpid};
use nix::unistd::{Pid, chdir, execvp};
use tokio::io::unix::AsyncFd;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::procserv::error::{ProcServError, ProcServResult};

/// Lifecycle event emitted by [`ChildHandle`] over its event channel.
#[derive(Debug)]
pub enum ChildEvent {
    /// PTY produced a chunk of output (child stdout/stderr).
    Output(Vec<u8>),
    /// Child process terminated. The supervisor consults the restart
    /// policy and either re-spawns or exits.
    Exited { status: Option<ExitStatus> },
}

/// Configuration for one child launch. Mirrors the subset of
/// [`crate::procserv::config::ChildConfig`] this module needs.
#[derive(Debug, Clone)]
pub struct ChildSpec {
    pub program: PathBuf,
    pub args: Vec<String>,
    pub cwd: Option<PathBuf>,
    pub ignore_chars: Vec<u8>,
}

/// Handle to a running child process. Cloning is cheap (Arcs inside).
#[derive(Clone)]
pub struct ChildHandle {
    pid: Pid,
    master: Arc<AsyncFd<OwnedFd>>,
    /// Filter applied to outbound bytes (matches C `--ignore` flag).
    ignore_chars: Arc<Vec<u8>>,
    alive: Arc<AtomicBool>,
}

impl std::fmt::Debug for ChildHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ChildHandle")
            .field("pid", &self.pid.as_raw())
            .field("alive", &self.alive.load(Ordering::Relaxed))
            .finish()
    }
}

impl ChildHandle {
    /// Spawn a new child via `forkpty` + `execvp`, returning the
    /// handle plus the receiver for [`ChildEvent`]s. The receiver is
    /// closed when the child exits and its PTY drains.
    ///
    /// Two side tasks are spawned per child:
    /// 1. **PTY reader** — reads from master fd, emits `Output` events
    /// 2. **Reaper** — blocking `waitpid` on a `spawn_blocking` thread,
    ///    emits the final `Exited` event and flips `alive` to false
    pub fn spawn(spec: &ChildSpec) -> ProcServResult<(Self, mpsc::Receiver<ChildEvent>)> {
        // SAFETY: forkpty is unsafe because between fork and exec we
        // may only call async-signal-safe functions. We do exactly
        // that: setsid + chdir (libc syscalls, both AS-safe) + execvp.
        let result = unsafe { nix::pty::forkpty(None, None) }
            .map_err(|e| ProcServError::Forkpty(e.to_string()))?;

        match result {
            ForkptyResult::Parent { child, master } => {
                let alive = Arc::new(AtomicBool::new(true));
                set_nonblocking(&master)
                    .map_err(|e| ProcServError::Forkpty(format!("set O_NONBLOCK: {e}")))?;
                let master_fd = Arc::new(
                    AsyncFd::new(master)
                        .map_err(|e| ProcServError::Forkpty(format!("AsyncFd: {e}")))?,
                );

                let (tx, rx) = mpsc::channel::<ChildEvent>(64);

                // Reader task: pump PTY-master → ChildEvent::Output.
                spawn_reader(master_fd.clone(), tx.clone());

                // Reaper task: blocking waitpid → ChildEvent::Exited.
                spawn_reaper(child, alive.clone(), tx);

                Ok((
                    Self {
                        pid: child,
                        master: master_fd,
                        ignore_chars: Arc::new(spec.ignore_chars.clone()),
                        alive,
                    },
                    rx,
                ))
            }
            ForkptyResult::Child => {
                // We're in the child. Exec the target program.
                // `in_child_setup_and_exec` returns `!` — either
                // execvp succeeds (we never return) or it logs the
                // error and `process::exit`s.
                in_child_setup_and_exec(spec);
            }
        }
    }

    /// Write bytes to the child's stdin (PTY-master write). Called
    /// by the supervisor when a non-readonly client's input is
    /// being forwarded to the child via the party-line.
    ///
    /// `ignore_chars` from [`ChildSpec`] are filtered out before the
    /// write — matches C procServ's `--ignore` flag handled in
    /// `processClass::Send`.
    pub async fn write_stdin(&self, bytes: &[u8]) -> ProcServResult<()> {
        if !self.alive.load(Ordering::Acquire) {
            return Err(ProcServError::ChildExited(None));
        }

        let filtered: Vec<u8> = if self.ignore_chars.is_empty() {
            bytes.to_vec()
        } else {
            bytes
                .iter()
                .copied()
                .filter(|b| !self.ignore_chars.contains(b))
                .collect()
        };
        if filtered.is_empty() {
            return Ok(());
        }

        let mut written = 0;
        while written < filtered.len() {
            let mut guard = self.master.writable().await.map_err(ProcServError::Io)?;
            let raw = self.master.as_ref().as_raw_fd();
            let result = guard.try_io(|_| {
                let n = unsafe {
                    libc::write(
                        raw,
                        filtered[written..].as_ptr() as *const libc::c_void,
                        filtered.len() - written,
                    )
                };
                if n < 0 {
                    Err(io::Error::last_os_error())
                } else {
                    Ok(n as usize)
                }
            });
            match result {
                Ok(Ok(n)) => written += n,
                Ok(Err(e)) => return Err(ProcServError::Io(e)),
                Err(_would_block) => continue,
            }
        }
        Ok(())
    }

    /// Send a signal to the child process group. Negative pid means
    /// "all processes in pgid", which is what we want — `setsid`
    /// makes the child its own group leader so we can signal the
    /// whole tree of grandchildren too.
    pub fn signal(&self, signo: i32) -> ProcServResult<()> {
        let sig = Signal::try_from(signo)
            .map_err(|e| ProcServError::Config(format!("invalid signal {signo}: {e}")))?;
        // Negative pid → process group.
        let pgid = Pid::from_raw(-self.pid.as_raw());
        nix::sys::signal::kill(pgid, sig)
            .map_err(|e| ProcServError::Io(io::Error::other(e.to_string())))?;
        Ok(())
    }

    /// Whether the child is currently alive. The menu-key dispatch
    /// (`Action::evaluate(child_alive=…)`) consults this each
    /// keystroke.
    pub fn is_alive(&self) -> bool {
        self.alive.load(Ordering::Acquire)
    }

    /// PID of the child (for info-file rendering).
    pub fn pid(&self) -> i32 {
        self.pid.as_raw()
    }
}

/// In-child path of `forkpty` — optional chdir, then `execvp` the
/// target program. Never returns on success; on failure prints to
/// stderr (which goes back through the PTY to the parent) and exits
/// with status 126 / 127.
///
/// Note: `forkpty(3)` already calls `setsid()` internally and
/// connects the slave fd as the controlling terminal, so we MUST
/// NOT call `setsid` here again — it would return `EPERM` because
/// we're already a session leader.
fn in_child_setup_and_exec(spec: &ChildSpec) -> ! {
    if let Some(ref cwd) = spec.cwd {
        let c_cwd = match CString::new(cwd.as_os_str().as_encoded_bytes()) {
            Ok(c) => c,
            Err(_) => {
                eprintln!("procserv child: invalid chdir path");
                std::process::exit(126);
            }
        };
        if let Err(e) = chdir(c_cwd.as_c_str()) {
            eprintln!("procserv child: chdir to {} failed: {e}", cwd.display());
            std::process::exit(126);
        }
    }

    let prog = match CString::new(spec.program.as_os_str().as_encoded_bytes()) {
        Ok(c) => c,
        Err(_) => {
            eprintln!("procserv child: program name contains NUL");
            std::process::exit(126);
        }
    };
    let mut argv: Vec<CString> = Vec::with_capacity(1 + spec.args.len());
    argv.push(prog.clone());
    for a in &spec.args {
        match CString::new(a.as_bytes()) {
            Ok(c) => argv.push(c),
            Err(_) => {
                eprintln!("procserv child: argument contains NUL: {a:?}");
                std::process::exit(126);
            }
        }
    }

    let argv_refs: Vec<&std::ffi::CStr> = argv.iter().map(|c| c.as_c_str()).collect();
    match execvp(prog.as_c_str(), &argv_refs) {
        Ok(infallible) => match infallible {},
        Err(e) => {
            eprintln!(
                "procserv child: execvp({}) failed: {e}",
                spec.program.display()
            );
            std::process::exit(127);
        }
    }
}

/// Set `O_NONBLOCK` on a borrowed fd. Required before wrapping in
/// `AsyncFd` — otherwise `try_io`'s read syscall blocks indefinitely
/// on no-data instead of returning `EAGAIN` for the runtime to wait.
fn set_nonblocking(fd: &OwnedFd) -> io::Result<()> {
    let raw = fd.as_raw_fd();
    let flags = unsafe { libc::fcntl(raw, libc::F_GETFL) };
    if flags < 0 {
        return Err(io::Error::last_os_error());
    }
    if unsafe { libc::fcntl(raw, libc::F_SETFL, flags | libc::O_NONBLOCK) } < 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(())
}

/// Reader task: pumps PTY-master into [`ChildEvent::Output`]
/// messages until EOF / EIO (which is what we get when the child
/// exits and its slave side closes).
fn spawn_reader(master: Arc<AsyncFd<OwnedFd>>, tx: mpsc::Sender<ChildEvent>) -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut buf = vec![0u8; 4096];
        let raw: RawFd = master.as_ref().as_raw_fd();
        loop {
            let mut guard = match master.readable().await {
                Ok(g) => g,
                Err(e) => {
                    tracing::debug!(error = %e, "procserv child PTY readable() ended");
                    break;
                }
            };
            match guard.try_io(|_| {
                let n =
                    unsafe { libc::read(raw, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
                if n < 0 {
                    Err(io::Error::last_os_error())
                } else {
                    Ok(n as usize)
                }
            }) {
                Ok(Ok(0)) => break, // EOF
                Ok(Ok(n)) => {
                    // Send a copy; the buffer is reused next iteration.
                    let chunk = buf[..n].to_vec();
                    if tx.send(ChildEvent::Output(chunk)).await.is_err() {
                        break;
                    }
                }
                Ok(Err(e)) => {
                    // EIO commonly means the slave end closed — child died.
                    if e.raw_os_error() == Some(libc::EIO) {
                        tracing::debug!("procserv child PTY EIO (slave closed)");
                    } else {
                        tracing::debug!(error = %e, "procserv child PTY read error");
                    }
                    break;
                }
                Err(_would_block) => continue,
            }
        }
    })
}

/// Reaper task: blocking `waitpid` on a thread; emits the final
/// [`ChildEvent::Exited`] then closes the channel.
fn spawn_reaper(pid: Pid, alive: Arc<AtomicBool>, tx: mpsc::Sender<ChildEvent>) -> JoinHandle<()> {
    tokio::task::spawn(async move {
        let res = tokio::task::spawn_blocking(move || waitpid(pid, None))
            .await
            .ok();
        let exit_code = match res {
            Some(Ok(WaitStatus::Exited(_, code))) => Some(make_exit_status(code)),
            Some(Ok(WaitStatus::Signaled(_, sig, _))) => Some(make_exit_status(128 + sig as i32)),
            _ => None,
        };
        alive.store(false, Ordering::Release);
        let _ = tx.send(ChildEvent::Exited { status: exit_code }).await;
    })
}

#[cfg(unix)]
fn make_exit_status(code: i32) -> ExitStatus {
    use std::os::unix::process::ExitStatusExt;
    // Pack code into the wait-status form. ExitStatusExt::from_raw
    // takes a raw `wait()` status; for an "exited normally" status
    // that's `code << 8`.
    ExitStatus::from_raw(code << 8)
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{Duration, sleep};

    /// Drain `rx` until the channel closes or `deadline` elapses,
    /// collecting all output. We deliberately do NOT break on
    /// `Exited` — the reader and reaper run in parallel, so a final
    /// chunk of PTY output can arrive after the exit signal.
    /// Channel closure (`None`) is the authoritative end-of-life
    /// because both reader and reaper drop their tx clones before
    /// returning.
    async fn drain_until_closed(
        rx: &mut mpsc::Receiver<ChildEvent>,
        deadline: tokio::time::Instant,
    ) -> (Vec<u8>, bool) {
        let mut output = Vec::new();
        let mut exited = false;
        while tokio::time::Instant::now() < deadline {
            tokio::select! {
                ev = rx.recv() => match ev {
                    Some(ChildEvent::Output(b)) => output.extend_from_slice(&b),
                    Some(ChildEvent::Exited { .. }) => exited = true,
                    None => break,
                },
                _ = sleep(Duration::from_millis(50)) => {}
            }
        }
        (output, exited)
    }

    #[tokio::test]
    async fn spawn_echo_child_yields_output_and_exits() {
        let spec = ChildSpec {
            program: PathBuf::from("/bin/echo"),
            args: vec!["hello procserv".into()],
            cwd: None,
            ignore_chars: Vec::new(),
        };
        let (handle, mut rx) = ChildHandle::spawn(&spec).expect("spawn");
        let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
        let (output, exited) = drain_until_closed(&mut rx, deadline).await;

        assert!(exited, "child should have exited");
        assert!(!handle.is_alive(), "alive flag should flip false");
        let text = String::from_utf8_lossy(&output);
        assert!(text.contains("hello procserv"), "got: {text:?}");
    }

    #[tokio::test]
    async fn write_stdin_filters_ignore_chars() {
        // `cat` echoes stdin to stdout; we'll feed it bytes and
        // verify the ignore filter strips them before the write.
        let spec = ChildSpec {
            program: PathBuf::from("/bin/cat"),
            args: vec![],
            cwd: None,
            ignore_chars: vec![b'X'],
        };
        let (handle, mut rx) = ChildHandle::spawn(&spec).expect("spawn");

        // Wait briefly for the PTY to settle and for cat to be in
        // its read loop. Without this, the first write can race the
        // exec.
        sleep(Duration::from_millis(150)).await;
        handle.write_stdin(b"abXXcd\n").await.expect("write");
        // Give cat a moment to echo before sending EOF.
        sleep(Duration::from_millis(150)).await;
        // Send EOF (Ctrl-D = 0x04) so cat exits cleanly.
        handle.write_stdin(&[0x04]).await.ok();

        let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
        let (output, _) = drain_until_closed(&mut rx, deadline).await;

        let text = String::from_utf8_lossy(&output);
        assert!(text.contains("abcd"), "filter stripped X's, got: {text:?}");
        assert!(
            !text.contains('X'),
            "X bytes should not appear, got: {text:?}"
        );
    }
}