Skip to main content

lifeloop/router/
subprocess.rs

1//! Subprocess-backed callback invocation (issue #8).
2//!
3//! Fills the [`CallbackInvoker`] seam declared in `src/router/seams.rs`
4//! (issue #7) for the **process-boundary** transport. This module is
5//! the load-bearing proof that Lifeloop can drive an external client
6//! command without taking a Rust dependency on it: the only contract
7//! between Lifeloop and the client is JSON over stdio.
8//!
9//! # Boundary
10//!
11//! Owns:
12//! * [`SubprocessInvokerConfig`] — configurable command/args/timeout
13//!   used to spawn the external client per invocation;
//! * [`SubprocessCallbackInvoker`] — concrete [`CallbackInvoker`]
//!   implementation that spawns a fresh child for each event, pipes a
//!   JSON [`DispatchEnvelope`] (the [`crate::CallbackRequest`] plus any
//!   payload envelopes) to its stdin, reads a [`CallbackResponse`] JSON
//!   document from its stdout, and validates it;
19//! * [`SubprocessInvokerError`] — typed error variants carrying enough
20//!   detail for [`super::LifeloopFailureMapper`] to map onto the
21//!   shared [`crate::FailureClass`] vocabulary;
22//! * [`failure_class_for_subprocess_error`] / `From<&SubprocessInvokerError>
23//!   for FailureClass` — deterministic mapping into the existing
24//!   failure-class vocabulary. No new variants are introduced.
25//!
26//! Does **not** own:
27//! * receipt synthesis (that is [`super::receipts`]);
28//! * wire schema (the request/response types come from the lifeloop
29//!   crate root and are unchanged by this module).
30//!
31//! # Receipt-emitted guard
32//!
33//! `receipt.emitted` is a notification event and must not be invoked
34//! downstream. The subprocess invoker rejects such plans **before** it
35//! spawns a child — same rule as the in-memory path, enforced via
36//! [`super::validate_receipt_eligible`]. The rejection surfaces as
37//! [`SubprocessInvokerError::ReceiptEmittedRejected`] which maps to
38//! [`crate::FailureClass::InvalidRequest`].
39
40use std::ffi::OsString;
41use std::io::{Read, Write};
42use std::path::PathBuf;
43use std::process::{Child, Command, Stdio};
44use std::sync::mpsc;
45use std::thread;
46use std::time::{Duration, Instant};
47
48use crate::{CallbackResponse, DispatchEnvelope, FailureClass, PayloadEnvelope, ValidationError};
49
50use super::failure_mapping::{TransportError, validate_receipt_eligible};
51use super::plan::RoutingPlan;
52use super::seams::CallbackInvoker;
53use super::validation::RouteError;
54
55// ===========================================================================
56// SubprocessInvokerConfig
57// ===========================================================================
58
/// Configuration for [`SubprocessCallbackInvoker`].
///
/// `program` is the external client command (an absolute path to the
/// client's callback binary). `args` are appended on every spawn.
/// `timeout` bounds the total round trip — write request, read
/// response, child exit. A child still running at deadline is killed
/// and the invocation fails with [`SubprocessInvokerError::Timeout`].
#[derive(Debug, Clone)]
pub struct SubprocessInvokerConfig {
    /// Command spawned for every invocation.
    program: PathBuf,
    /// Arguments handed to `program`, kept in insertion order.
    args: Vec<OsString>,
    /// Upper bound for one complete invocation.
    timeout: Duration,
}

impl SubprocessInvokerConfig {
    /// Construct a config from an explicit program path and timeout.
    /// The default timeout is intentionally not provided — the caller
    /// must pick a value that matches its lifecycle SLOs.
    ///
    /// The argument list starts out empty; use [`Self::arg`] or
    /// [`Self::args`] to populate it.
    #[must_use]
    pub fn new(program: impl Into<PathBuf>, timeout: Duration) -> Self {
        Self {
            program: program.into(),
            args: Vec::new(),
            timeout,
        }
    }

    /// Append a single argument. Returns `self` for chaining.
    ///
    /// The builder consumes `self`, so ignoring the return value throws
    /// the whole configuration away — hence `#[must_use]`.
    #[must_use = "builder methods consume `self`; dropping the result discards the config"]
    pub fn arg(mut self, arg: impl Into<OsString>) -> Self {
        self.args.push(arg.into());
        self
    }

    /// Append a sequence of arguments. Returns `self` for chaining.
    ///
    /// Arguments are appended after any already present, preserving
    /// the iteration order of `args`.
    #[must_use = "builder methods consume `self`; dropping the result discards the config"]
    pub fn args<I, A>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = A>,
        A: Into<OsString>,
    {
        self.args.extend(args.into_iter().map(Into::into));
        self
    }

    /// The command that is spawned for every invocation.
    pub fn program(&self) -> &PathBuf {
        &self.program
    }

    /// The configured bound for one invocation.
    pub fn timeout(&self) -> Duration {
        self.timeout
    }
}
109
110// ===========================================================================
111// SubprocessInvokerError
112// ===========================================================================
113
114/// Failure variants surfaced by [`SubprocessCallbackInvoker::invoke`].
115///
116/// Every variant maps deterministically onto a single
117/// [`FailureClass`] from the shared vocabulary — no new failure
118/// classes are introduced by the subprocess transport.
119#[derive(Debug)]
120pub enum SubprocessInvokerError {
121    /// Plan rejected before spawn because the event is
122    /// `receipt.emitted` (a notification event that must not produce
123    /// downstream invocations). Maps to
124    /// [`FailureClass::InvalidRequest`].
125    ReceiptEmittedRejected(RouteError),
126    /// Failed to spawn the child (e.g. binary not found, permission
127    /// denied). Maps to [`FailureClass::TransportError`].
128    Spawn(std::io::Error),
129    /// Failed to write the request JSON to the child's stdin. Maps to
130    /// [`FailureClass::TransportError`].
131    WriteRequest(std::io::Error),
132    /// Failed to serialize the [`CallbackRequest`] before writing.
133    /// Treated as an internal-class failure — the bug is on the
134    /// Lifeloop side, not the transport.
135    SerializeRequest(serde_json::Error),
136    /// Failed to read the child's stdout. Maps to
137    /// [`FailureClass::TransportError`].
138    ReadResponse(std::io::Error),
139    /// Child wrote bytes that did not parse as JSON. Maps to
140    /// [`FailureClass::InvalidRequest`] — the response is malformed
141    /// at the wire layer.
142    ParseResponse(serde_json::Error),
143    /// JSON parsed cleanly but the response failed
144    /// [`CallbackResponse::validate`]. Maps to
145    /// [`FailureClass::InvalidRequest`].
146    InvalidResponse(ValidationError),
147    /// Child exited non-zero. Maps to
148    /// [`FailureClass::TransportError`] — the transport returned a
149    /// failure signal we cannot interpret further.
150    NonZeroExit { code: Option<i32>, stderr: String },
151    /// Round trip exceeded the configured timeout; child was killed.
152    /// Maps to [`FailureClass::Timeout`].
153    Timeout,
154}
155
156impl std::fmt::Display for SubprocessInvokerError {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        match self {
159            Self::ReceiptEmittedRejected(e) => {
160                write!(f, "subprocess invoker refused receipt.emitted plan: {e}")
161            }
162            Self::Spawn(e) => write!(f, "failed to spawn callback subprocess: {e}"),
163            Self::WriteRequest(e) => write!(f, "failed to write request to subprocess stdin: {e}"),
164            Self::SerializeRequest(e) => write!(f, "failed to serialize CallbackRequest: {e}"),
165            Self::ReadResponse(e) => write!(f, "failed to read subprocess stdout: {e}"),
166            Self::ParseResponse(e) => write!(
167                f,
168                "subprocess stdout was not a valid JSON CallbackResponse: {e}"
169            ),
170            Self::InvalidResponse(e) => write!(
171                f,
172                "subprocess returned a CallbackResponse that failed validation: {e}"
173            ),
174            Self::NonZeroExit { code, stderr } => match code {
175                Some(c) => write!(f, "callback subprocess exited with code {c}: {stderr}"),
176                None => write!(f, "callback subprocess terminated by signal: {stderr}"),
177            },
178            Self::Timeout => f.write_str("callback subprocess exceeded configured timeout"),
179        }
180    }
181}
182
183impl std::error::Error for SubprocessInvokerError {}
184
185/// Map a [`SubprocessInvokerError`] onto the shared
186/// [`FailureClass`] vocabulary.
187///
188/// Pure function: same variant always maps to the same class so a
189/// receipt ledger replays consistently. Stays aligned with the
190/// in-process [`TransportError`] mapping in
191/// [`super::failure_mapping`].
192pub fn failure_class_for_subprocess_error(err: &SubprocessInvokerError) -> FailureClass {
193    use SubprocessInvokerError as E;
194    match err {
195        E::ReceiptEmittedRejected(_) => FailureClass::InvalidRequest,
196        E::Spawn(_) | E::WriteRequest(_) | E::ReadResponse(_) | E::NonZeroExit { .. } => {
197            FailureClass::TransportError
198        }
199        E::SerializeRequest(_) => FailureClass::InternalError,
200        E::ParseResponse(_) | E::InvalidResponse(_) => FailureClass::InvalidRequest,
201        E::Timeout => FailureClass::Timeout,
202    }
203}
204
205impl From<&SubprocessInvokerError> for FailureClass {
206    fn from(err: &SubprocessInvokerError) -> Self {
207        failure_class_for_subprocess_error(err)
208    }
209}
210
211/// Convert a [`SubprocessInvokerError`] into the shared
212/// [`TransportError`] enum so callers that already speak in
213/// [`super::LifeloopFailureMapper::map_transport_error`] terms can
214/// route subprocess failures through the same path. `None` for
215/// variants that are not transport-class (validation, parse,
216/// receipt-emitted rejection).
217pub fn transport_error_for(err: &SubprocessInvokerError) -> Option<TransportError> {
218    use SubprocessInvokerError as E;
219    match err {
220        E::Spawn(e) | E::WriteRequest(e) | E::ReadResponse(e) => {
221            Some(TransportError::Io(e.to_string()))
222        }
223        E::NonZeroExit { code, stderr } => Some(TransportError::Io(match code {
224            Some(c) => format!("exit code {c}: {stderr}"),
225            None => format!("terminated by signal: {stderr}"),
226        })),
227        E::Timeout => Some(TransportError::Timeout),
228        E::SerializeRequest(e) => Some(TransportError::Internal(e.to_string())),
229        E::ReceiptEmittedRejected(_) | E::ParseResponse(_) | E::InvalidResponse(_) => None,
230    }
231}
232
233// ===========================================================================
234// SubprocessCallbackInvoker
235// ===========================================================================
236
237/// Subprocess-backed [`CallbackInvoker`].
238///
239/// Spawns a fresh child per invocation. The protocol is intentionally
240/// minimal: stdin receives a single JSON-serialized
241/// [`DispatchEnvelope`] (carrying the [`crate::CallbackRequest`] and
242/// any opaque [`PayloadEnvelope`] bodies); stdout returns a single
243/// JSON-serialized [`CallbackResponse`]. The child must exit zero on
244/// success. Anything outside that contract surfaces as a typed
245/// [`SubprocessInvokerError`].
246///
247/// # Payload bodies
248///
249/// Payload bodies flow through the subprocess channel inside the
250/// [`DispatchEnvelope`] (issue #22). Bodies are transported verbatim:
251/// Lifeloop never parses `body` or dereferences `body_ref`. Clients
252/// receive the same envelopes a caller passed to
253/// [`CallbackInvoker::invoke`].
254#[derive(Debug, Clone)]
255pub struct SubprocessCallbackInvoker {
256    config: SubprocessInvokerConfig,
257}
258
259impl SubprocessCallbackInvoker {
260    pub fn new(config: SubprocessInvokerConfig) -> Self {
261        Self { config }
262    }
263
264    pub fn config(&self) -> &SubprocessInvokerConfig {
265        &self.config
266    }
267
268    /// Internal: build the [`CallbackRequest`], spawn, write, read,
269    /// validate. Factored out so [`CallbackInvoker::invoke`] stays a
270    /// thin adapter.
271    fn invoke_inner(
272        &self,
273        plan: &RoutingPlan,
274        payloads: &[PayloadEnvelope],
275    ) -> Result<CallbackResponse, SubprocessInvokerError> {
276        // Pre-spawn guard: receipt.emitted must never produce a
277        // downstream invocation. Same rule as the receipt emitter,
278        // enforced before any IO so a misuse cannot leak side
279        // effects (spawned process, stderr, exit-code observability).
280        validate_receipt_eligible(plan).map_err(SubprocessInvokerError::ReceiptEmittedRejected)?;
281
282        let request = super::callbacks::synthesize_request(plan);
283        let envelope = DispatchEnvelope::new(request, payloads.to_vec());
284        let request_bytes =
285            serde_json::to_vec(&envelope).map_err(SubprocessInvokerError::SerializeRequest)?;
286
287        let mut child = Command::new(&self.config.program)
288            .args(&self.config.args)
289            .stdin(Stdio::piped())
290            .stdout(Stdio::piped())
291            .stderr(Stdio::piped())
292            .spawn()
293            .map_err(SubprocessInvokerError::Spawn)?;
294
295        // Hand stdin/stdout/stderr to helper threads so the main
296        // thread can honor the timeout independently of how the
297        // child consumes input. Each thread reports its result over
298        // an mpsc channel so the main thread can bound every join by
299        // the configured deadline — a direct `JoinHandle::join()`
300        // would block indefinitely if a descendant of the child
301        // inherited a pipe handle and held it open after the direct
302        // child exited.
303        let stdin = child.stdin.take().expect("stdin piped");
304        let stdout = child.stdout.take().expect("stdout piped");
305        let stderr = child.stderr.take().expect("stderr piped");
306
307        let (writer_tx, writer_rx) = mpsc::channel::<std::io::Result<()>>();
308        thread::spawn({
309            let bytes = request_bytes;
310            move || {
311                let mut stdin = stdin;
312                let result = stdin.write_all(&bytes).and_then(|()| stdin.flush());
313                let _ = writer_tx.send(result);
314                // stdin dropped on scope exit, signaling EOF.
315            }
316        });
317
318        let (stdout_tx, stdout_rx) = mpsc::channel::<std::io::Result<Vec<u8>>>();
319        thread::spawn(move || {
320            let mut buf = Vec::new();
321            let mut s = stdout;
322            let result = s.read_to_end(&mut buf).map(|_| buf);
323            let _ = stdout_tx.send(result);
324        });
325
326        let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>();
327        thread::spawn(move || {
328            let mut buf = Vec::new();
329            let mut s = stderr;
330            let _ = s.read_to_end(&mut buf);
331            let _ = stderr_tx.send(buf);
332        });
333
334        let deadline = Instant::now() + self.config.timeout;
335        let exit_status = match wait_with_deadline(&mut child, deadline) {
336            Ok(status) => status,
337            Err(WaitError::Timeout) => {
338                let _ = child.kill();
339                let _ = child.wait();
340                return Err(SubprocessInvokerError::Timeout);
341            }
342            Err(WaitError::Io(e)) => {
343                let _ = child.kill();
344                let _ = child.wait();
345                return Err(SubprocessInvokerError::ReadResponse(e));
346            }
347        };
348
349        // Reap helper threads via channels with deadline-bounded
350        // recvs. After child exit the OS closes the child-side pipe
351        // ends, so on a well-behaved client all three threads finish
352        // immediately. If a descendant inherited stdout/stderr and is
353        // still holding a writer end, the read threads block — and
354        // we treat that as a transport timeout rather than hanging
355        // the invoker. The configured `timeout` therefore bounds the
356        // *total* round trip (write + child run + drain), not just
357        // child exit.
358        //
359        // A small grace window covers the common case where the
360        // child has just exited and the threads have not yet noticed
361        // EOF; without it, a deadline that already elapsed would
362        // spuriously time out a successful run.
363        let grace = Duration::from_millis(100);
364        let join_timeout = remaining(deadline).max(grace);
365
366        // Writer first: a stdin write failure means the request was
367        // not delivered intact, so we MUST surface it even when the
368        // child later exits zero. A misbehaving client that ignores
369        // stdin and fabricates a response is exactly the false-
370        // success path the transport contract forbids.
371        match writer_rx.recv_timeout(join_timeout) {
372            Ok(Ok(())) => {}
373            Ok(Err(e)) => return Err(SubprocessInvokerError::WriteRequest(e)),
374            Err(mpsc::RecvTimeoutError::Timeout) => return Err(SubprocessInvokerError::Timeout),
375            Err(mpsc::RecvTimeoutError::Disconnected) => {
376                return Err(SubprocessInvokerError::WriteRequest(std::io::Error::other(
377                    "writer thread disconnected before reporting result",
378                )));
379            }
380        }
381
382        let stdout_bytes = match stdout_rx.recv_timeout(join_timeout) {
383            Ok(Ok(buf)) => buf,
384            Ok(Err(e)) => return Err(SubprocessInvokerError::ReadResponse(e)),
385            Err(mpsc::RecvTimeoutError::Timeout) => return Err(SubprocessInvokerError::Timeout),
386            Err(mpsc::RecvTimeoutError::Disconnected) => {
387                return Err(SubprocessInvokerError::ReadResponse(std::io::Error::other(
388                    "stdout reader thread disconnected before reporting result",
389                )));
390            }
391        };
392
393        let stderr_bytes = stderr_rx.recv_timeout(join_timeout).unwrap_or_default();
394        let stderr_text = String::from_utf8_lossy(&stderr_bytes).into_owned();
395
396        if !exit_status.success() {
397            return Err(SubprocessInvokerError::NonZeroExit {
398                code: exit_status.code(),
399                stderr: stderr_text,
400            });
401        }
402
403        let response: CallbackResponse =
404            serde_json::from_slice(&stdout_bytes).map_err(SubprocessInvokerError::ParseResponse)?;
405        response
406            .validate()
407            .map_err(SubprocessInvokerError::InvalidResponse)?;
408        Ok(response)
409    }
410}
411
412impl CallbackInvoker for SubprocessCallbackInvoker {
413    type Error = SubprocessInvokerError;
414
415    fn invoke(
416        &self,
417        plan: &RoutingPlan,
418        payloads: &[PayloadEnvelope],
419    ) -> Result<CallbackResponse, Self::Error> {
420        self.invoke_inner(plan, payloads)
421    }
422}
423
424// ===========================================================================
425// wait_with_deadline
426// ===========================================================================
427
/// Reason [`wait_with_deadline`] returned without an exit status.
#[derive(Debug)]
enum WaitError {
    /// The deadline passed while the child was still running.
    Timeout,
    /// Polling the child for its exit status failed.
    Io(std::io::Error),
}

/// Time remaining until `deadline`. Saturates at zero so callers can
/// use it as a `recv_timeout` argument without underflowing.
fn remaining(deadline: Instant) -> Duration {
    deadline.saturating_duration_since(Instant::now())
}

/// Poll-based deadline wait. `std::process::Child` has no portable
/// timed-wait; we poll `try_wait` with bounded sleeps. The poll
/// interval starts at 2ms and doubles after every sleep up to a 50ms
/// cap, so short-lived children are noticed quickly while a
/// multi-second timeout does not burn CPU. A sleep never extends past
/// the deadline.
///
/// Returns the child's exit status once it has exited. The child is
/// **not** killed on timeout — that is left to the caller.
///
/// # Errors
///
/// * [`WaitError::Timeout`] — the child was still running at
///   `deadline`.
/// * [`WaitError::Io`] — `try_wait` itself failed.
fn wait_with_deadline(
    child: &mut Child,
    deadline: Instant,
) -> Result<std::process::ExitStatus, WaitError> {
    const INITIAL_POLL_INTERVAL: Duration = Duration::from_millis(2);
    const MAX_POLL_INTERVAL: Duration = Duration::from_millis(50);

    let mut interval = INITIAL_POLL_INTERVAL;
    loop {
        if let Some(status) = child.try_wait().map_err(WaitError::Io)? {
            return Ok(status);
        }

        let left = remaining(deadline);
        if left.is_zero() {
            // One last try_wait to avoid losing a race where the
            // child exited between the previous check and now.
            return match child.try_wait() {
                Ok(Some(status)) => Ok(status),
                Ok(None) => Err(WaitError::Timeout),
                Err(e) => Err(WaitError::Io(e)),
            };
        }

        thread::sleep(interval.min(left));
        // Exponential backoff, clamped to the cap.
        interval = (interval * 2).min(MAX_POLL_INTERVAL);
    }
}
473
#[cfg(test)]
mod tests {
    use super::*;

    /// A real `serde_json::Error`, obtained by parsing invalid JSON.
    fn json_error() -> serde_json::Error {
        serde_json::from_str::<serde_json::Value>("not json").unwrap_err()
    }

    /// Pins the variant -> failure-class table and checks that the
    /// `From` impl agrees with the free function.
    ///
    /// `InvalidResponse` is not covered here because it needs a
    /// `ValidationError` value, which this module does not construct.
    #[test]
    fn failure_class_mapping_is_deterministic() {
        let cases: Vec<(SubprocessInvokerError, FailureClass)> = vec![
            (
                SubprocessInvokerError::ReceiptEmittedRejected(RouteError::InvalidEventEnvelope {
                    detail: "x".into(),
                }),
                FailureClass::InvalidRequest,
            ),
            (
                SubprocessInvokerError::Spawn(std::io::Error::other("nope")),
                FailureClass::TransportError,
            ),
            (
                SubprocessInvokerError::WriteRequest(std::io::Error::other("epipe")),
                FailureClass::TransportError,
            ),
            (
                SubprocessInvokerError::SerializeRequest(json_error()),
                FailureClass::InternalError,
            ),
            (
                SubprocessInvokerError::ReadResponse(std::io::Error::other("eof")),
                FailureClass::TransportError,
            ),
            (
                SubprocessInvokerError::NonZeroExit {
                    code: Some(1),
                    stderr: "bang".into(),
                },
                FailureClass::TransportError,
            ),
            (
                SubprocessInvokerError::NonZeroExit {
                    code: None,
                    stderr: String::new(),
                },
                FailureClass::TransportError,
            ),
            (SubprocessInvokerError::Timeout, FailureClass::Timeout),
            (
                SubprocessInvokerError::ParseResponse(json_error()),
                FailureClass::InvalidRequest,
            ),
        ];
        for (err, expected) in cases {
            let fc = failure_class_for_subprocess_error(&err);
            assert_eq!(fc, expected, "subprocess err -> failure class: {err}");
            let via_from: FailureClass = (&err).into();
            assert_eq!(via_from, fc);
        }
    }

    /// Transport-class variants map onto a `TransportError`; the
    /// rejection and parse variants yield `None`.
    #[test]
    fn transport_error_for_distinguishes_retryable_shapes() {
        assert!(matches!(
            transport_error_for(&SubprocessInvokerError::Timeout),
            Some(TransportError::Timeout)
        ));
        assert!(matches!(
            transport_error_for(&SubprocessInvokerError::Spawn(std::io::Error::other("x"))),
            Some(TransportError::Io(_))
        ));
        assert!(matches!(
            transport_error_for(&SubprocessInvokerError::NonZeroExit {
                code: Some(2),
                stderr: "boom".into(),
            }),
            Some(TransportError::Io(_))
        ));
        assert!(matches!(
            transport_error_for(&SubprocessInvokerError::SerializeRequest(json_error())),
            Some(TransportError::Internal(_))
        ));
        assert!(
            transport_error_for(&SubprocessInvokerError::ReceiptEmittedRejected(
                RouteError::InvalidEventEnvelope { detail: "x".into() }
            ))
            .is_none()
        );
        assert!(
            transport_error_for(&SubprocessInvokerError::ParseResponse(json_error())).is_none()
        );
    }

    /// The builder keeps program and timeout, and stores arguments in
    /// the order they were appended.
    #[test]
    fn config_is_chainable() {
        let cfg = SubprocessInvokerConfig::new("/bin/cat", Duration::from_secs(1))
            .arg("-")
            .args(["--flag"]);
        assert_eq!(cfg.program(), &PathBuf::from("/bin/cat"));
        assert_eq!(cfg.timeout(), Duration::from_secs(1));
        assert_eq!(
            cfg.args,
            vec![OsString::from("-"), OsString::from("--flag")]
        );
    }

    /// A deadline that is already in the past leaves no time remaining.
    #[test]
    fn remaining_saturates_at_zero() {
        let already_passed = Instant::now();
        assert_eq!(remaining(already_passed), Duration::ZERO);
    }
}