lifeloop/router/subprocess.rs
1//! Subprocess-backed callback invocation (issue #8).
2//!
3//! Fills the [`CallbackInvoker`] seam declared in `src/router/seams.rs`
4//! (issue #7) for the **process-boundary** transport. This module is
5//! the load-bearing proof that Lifeloop can drive an external client
6//! command without taking a Rust dependency on it: the only contract
7//! between Lifeloop and the client is JSON over stdio.
8//!
9//! # Boundary
10//!
11//! Owns:
12//! * [`SubprocessInvokerConfig`] — configurable command/args/timeout
13//! used to spawn the external client per invocation;
14//! * [`SubprocessCallbackInvoker`] — concrete [`CallbackInvoker`]
//!   implementation that spawns a fresh child for each event, pipes a
//!   [`crate::DispatchEnvelope`] JSON document (wrapping the synthesized
//!   [`crate::CallbackRequest`] and any payload envelopes) to its stdin,
//!   reads a [`crate::CallbackResponse`] JSON document from its stdout,
//!   and validates it;
19//! * [`SubprocessInvokerError`] — typed error variants carrying enough
20//! detail for [`super::LifeloopFailureMapper`] to map onto the
21//! shared [`crate::FailureClass`] vocabulary;
22//! * [`failure_class_for_subprocess_error`] / `From<&SubprocessInvokerError>
23//! for FailureClass` — deterministic mapping into the existing
24//! failure-class vocabulary. No new variants are introduced.
25//!
26//! Does **not** own:
27//! * receipt synthesis (that is [`super::receipts`]);
28//! * wire schema (the request/response types come from the lifeloop
29//! crate root and are unchanged by this module).
30//!
31//! # Receipt-emitted guard
32//!
33//! `receipt.emitted` is a notification event and must not be invoked
34//! downstream. The subprocess invoker rejects such plans **before** it
35//! spawns a child — same rule as the in-memory path, enforced via
36//! [`super::validate_receipt_eligible`]. The rejection surfaces as
37//! [`SubprocessInvokerError::ReceiptEmittedRejected`] which maps to
38//! [`crate::FailureClass::InvalidRequest`].
39
40use std::ffi::OsString;
41use std::io::{Read, Write};
42use std::path::PathBuf;
43use std::process::{Child, Command, Stdio};
44use std::sync::mpsc;
45use std::thread;
46use std::time::{Duration, Instant};
47
48use crate::{CallbackResponse, DispatchEnvelope, FailureClass, PayloadEnvelope, ValidationError};
49
50use super::failure_mapping::{TransportError, validate_receipt_eligible};
51use super::plan::RoutingPlan;
52use super::seams::CallbackInvoker;
53use super::validation::RouteError;
54
55// ===========================================================================
56// SubprocessInvokerConfig
57// ===========================================================================
58
59/// Configuration for [`SubprocessCallbackInvoker`].
60///
61/// `program` is the external client command (an absolute path to the
62/// client's callback binary). `args` are appended on every spawn.
63/// `timeout` bounds the total round trip — write request, read
64/// response, child exit. A child still running at deadline is killed
65/// and the invocation fails with [`SubprocessInvokerError::Timeout`].
66#[derive(Debug, Clone)]
67pub struct SubprocessInvokerConfig {
68 program: PathBuf,
69 args: Vec<OsString>,
70 timeout: Duration,
71}
72
73impl SubprocessInvokerConfig {
74 /// Construct a config from an explicit program path and timeout.
75 /// The default timeout is intentionally not provided — the caller
76 /// must pick a value that matches its lifecycle SLOs.
77 pub fn new(program: impl Into<PathBuf>, timeout: Duration) -> Self {
78 Self {
79 program: program.into(),
80 args: Vec::new(),
81 timeout,
82 }
83 }
84
85 /// Append a single argument. Returns `self` for chaining.
86 pub fn arg(mut self, arg: impl Into<OsString>) -> Self {
87 self.args.push(arg.into());
88 self
89 }
90
91 /// Append a sequence of arguments. Returns `self` for chaining.
92 pub fn args<I, A>(mut self, args: I) -> Self
93 where
94 I: IntoIterator<Item = A>,
95 A: Into<OsString>,
96 {
97 self.args.extend(args.into_iter().map(Into::into));
98 self
99 }
100
101 pub fn program(&self) -> &PathBuf {
102 &self.program
103 }
104
105 pub fn timeout(&self) -> Duration {
106 self.timeout
107 }
108}
109
110// ===========================================================================
111// SubprocessInvokerError
112// ===========================================================================
113
114/// Failure variants surfaced by [`SubprocessCallbackInvoker::invoke`].
115///
116/// Every variant maps deterministically onto a single
117/// [`FailureClass`] from the shared vocabulary — no new failure
118/// classes are introduced by the subprocess transport.
119#[derive(Debug)]
120pub enum SubprocessInvokerError {
121 /// Plan rejected before spawn because the event is
122 /// `receipt.emitted` (a notification event that must not produce
123 /// downstream invocations). Maps to
124 /// [`FailureClass::InvalidRequest`].
125 ReceiptEmittedRejected(RouteError),
126 /// Failed to spawn the child (e.g. binary not found, permission
127 /// denied). Maps to [`FailureClass::TransportError`].
128 Spawn(std::io::Error),
129 /// Failed to write the request JSON to the child's stdin. Maps to
130 /// [`FailureClass::TransportError`].
131 WriteRequest(std::io::Error),
132 /// Failed to serialize the [`CallbackRequest`] before writing.
133 /// Treated as an internal-class failure — the bug is on the
134 /// Lifeloop side, not the transport.
135 SerializeRequest(serde_json::Error),
136 /// Failed to read the child's stdout. Maps to
137 /// [`FailureClass::TransportError`].
138 ReadResponse(std::io::Error),
139 /// Child wrote bytes that did not parse as JSON. Maps to
140 /// [`FailureClass::InvalidRequest`] — the response is malformed
141 /// at the wire layer.
142 ParseResponse(serde_json::Error),
143 /// JSON parsed cleanly but the response failed
144 /// [`CallbackResponse::validate`]. Maps to
145 /// [`FailureClass::InvalidRequest`].
146 InvalidResponse(ValidationError),
147 /// Child exited non-zero. Maps to
148 /// [`FailureClass::TransportError`] — the transport returned a
149 /// failure signal we cannot interpret further.
150 NonZeroExit { code: Option<i32>, stderr: String },
151 /// Round trip exceeded the configured timeout; child was killed.
152 /// Maps to [`FailureClass::Timeout`].
153 Timeout,
154}
155
156impl std::fmt::Display for SubprocessInvokerError {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 match self {
159 Self::ReceiptEmittedRejected(e) => {
160 write!(f, "subprocess invoker refused receipt.emitted plan: {e}")
161 }
162 Self::Spawn(e) => write!(f, "failed to spawn callback subprocess: {e}"),
163 Self::WriteRequest(e) => write!(f, "failed to write request to subprocess stdin: {e}"),
164 Self::SerializeRequest(e) => write!(f, "failed to serialize CallbackRequest: {e}"),
165 Self::ReadResponse(e) => write!(f, "failed to read subprocess stdout: {e}"),
166 Self::ParseResponse(e) => write!(
167 f,
168 "subprocess stdout was not a valid JSON CallbackResponse: {e}"
169 ),
170 Self::InvalidResponse(e) => write!(
171 f,
172 "subprocess returned a CallbackResponse that failed validation: {e}"
173 ),
174 Self::NonZeroExit { code, stderr } => match code {
175 Some(c) => write!(f, "callback subprocess exited with code {c}: {stderr}"),
176 None => write!(f, "callback subprocess terminated by signal: {stderr}"),
177 },
178 Self::Timeout => f.write_str("callback subprocess exceeded configured timeout"),
179 }
180 }
181}
182
183impl std::error::Error for SubprocessInvokerError {}
184
185/// Map a [`SubprocessInvokerError`] onto the shared
186/// [`FailureClass`] vocabulary.
187///
188/// Pure function: same variant always maps to the same class so a
189/// receipt ledger replays consistently. Stays aligned with the
190/// in-process [`TransportError`] mapping in
191/// [`super::failure_mapping`].
192pub fn failure_class_for_subprocess_error(err: &SubprocessInvokerError) -> FailureClass {
193 use SubprocessInvokerError as E;
194 match err {
195 E::ReceiptEmittedRejected(_) => FailureClass::InvalidRequest,
196 E::Spawn(_) | E::WriteRequest(_) | E::ReadResponse(_) | E::NonZeroExit { .. } => {
197 FailureClass::TransportError
198 }
199 E::SerializeRequest(_) => FailureClass::InternalError,
200 E::ParseResponse(_) | E::InvalidResponse(_) => FailureClass::InvalidRequest,
201 E::Timeout => FailureClass::Timeout,
202 }
203}
204
205impl From<&SubprocessInvokerError> for FailureClass {
206 fn from(err: &SubprocessInvokerError) -> Self {
207 failure_class_for_subprocess_error(err)
208 }
209}
210
211/// Convert a [`SubprocessInvokerError`] into the shared
212/// [`TransportError`] enum so callers that already speak in
213/// [`super::LifeloopFailureMapper::map_transport_error`] terms can
214/// route subprocess failures through the same path. `None` for
215/// variants that are not transport-class (validation, parse,
216/// receipt-emitted rejection).
217pub fn transport_error_for(err: &SubprocessInvokerError) -> Option<TransportError> {
218 use SubprocessInvokerError as E;
219 match err {
220 E::Spawn(e) | E::WriteRequest(e) | E::ReadResponse(e) => {
221 Some(TransportError::Io(e.to_string()))
222 }
223 E::NonZeroExit { code, stderr } => Some(TransportError::Io(match code {
224 Some(c) => format!("exit code {c}: {stderr}"),
225 None => format!("terminated by signal: {stderr}"),
226 })),
227 E::Timeout => Some(TransportError::Timeout),
228 E::SerializeRequest(e) => Some(TransportError::Internal(e.to_string())),
229 E::ReceiptEmittedRejected(_) | E::ParseResponse(_) | E::InvalidResponse(_) => None,
230 }
231}
232
233// ===========================================================================
234// SubprocessCallbackInvoker
235// ===========================================================================
236
237/// Subprocess-backed [`CallbackInvoker`].
238///
239/// Spawns a fresh child per invocation. The protocol is intentionally
240/// minimal: stdin receives a single JSON-serialized
241/// [`DispatchEnvelope`] (carrying the [`crate::CallbackRequest`] and
242/// any opaque [`PayloadEnvelope`] bodies); stdout returns a single
243/// JSON-serialized [`CallbackResponse`]. The child must exit zero on
244/// success. Anything outside that contract surfaces as a typed
245/// [`SubprocessInvokerError`].
246///
247/// # Payload bodies
248///
249/// Payload bodies flow through the subprocess channel inside the
250/// [`DispatchEnvelope`] (issue #22). Bodies are transported verbatim:
251/// Lifeloop never parses `body` or dereferences `body_ref`. Clients
252/// receive the same envelopes a caller passed to
253/// [`CallbackInvoker::invoke`].
254#[derive(Debug, Clone)]
255pub struct SubprocessCallbackInvoker {
256 config: SubprocessInvokerConfig,
257}
258
259impl SubprocessCallbackInvoker {
260 pub fn new(config: SubprocessInvokerConfig) -> Self {
261 Self { config }
262 }
263
264 pub fn config(&self) -> &SubprocessInvokerConfig {
265 &self.config
266 }
267
268 /// Internal: build the [`CallbackRequest`], spawn, write, read,
269 /// validate. Factored out so [`CallbackInvoker::invoke`] stays a
270 /// thin adapter.
271 fn invoke_inner(
272 &self,
273 plan: &RoutingPlan,
274 payloads: &[PayloadEnvelope],
275 ) -> Result<CallbackResponse, SubprocessInvokerError> {
276 // Pre-spawn guard: receipt.emitted must never produce a
277 // downstream invocation. Same rule as the receipt emitter,
278 // enforced before any IO so a misuse cannot leak side
279 // effects (spawned process, stderr, exit-code observability).
280 validate_receipt_eligible(plan).map_err(SubprocessInvokerError::ReceiptEmittedRejected)?;
281
282 let request = super::callbacks::synthesize_request(plan);
283 let envelope = DispatchEnvelope::new(request, payloads.to_vec());
284 let request_bytes =
285 serde_json::to_vec(&envelope).map_err(SubprocessInvokerError::SerializeRequest)?;
286
287 let mut child = Command::new(&self.config.program)
288 .args(&self.config.args)
289 .stdin(Stdio::piped())
290 .stdout(Stdio::piped())
291 .stderr(Stdio::piped())
292 .spawn()
293 .map_err(SubprocessInvokerError::Spawn)?;
294
295 // Hand stdin/stdout/stderr to helper threads so the main
296 // thread can honor the timeout independently of how the
297 // child consumes input. Each thread reports its result over
298 // an mpsc channel so the main thread can bound every join by
299 // the configured deadline — a direct `JoinHandle::join()`
300 // would block indefinitely if a descendant of the child
301 // inherited a pipe handle and held it open after the direct
302 // child exited.
303 let stdin = child.stdin.take().expect("stdin piped");
304 let stdout = child.stdout.take().expect("stdout piped");
305 let stderr = child.stderr.take().expect("stderr piped");
306
307 let (writer_tx, writer_rx) = mpsc::channel::<std::io::Result<()>>();
308 thread::spawn({
309 let bytes = request_bytes;
310 move || {
311 let mut stdin = stdin;
312 let result = stdin.write_all(&bytes).and_then(|()| stdin.flush());
313 let _ = writer_tx.send(result);
314 // stdin dropped on scope exit, signaling EOF.
315 }
316 });
317
318 let (stdout_tx, stdout_rx) = mpsc::channel::<std::io::Result<Vec<u8>>>();
319 thread::spawn(move || {
320 let mut buf = Vec::new();
321 let mut s = stdout;
322 let result = s.read_to_end(&mut buf).map(|_| buf);
323 let _ = stdout_tx.send(result);
324 });
325
326 let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>();
327 thread::spawn(move || {
328 let mut buf = Vec::new();
329 let mut s = stderr;
330 let _ = s.read_to_end(&mut buf);
331 let _ = stderr_tx.send(buf);
332 });
333
334 let deadline = Instant::now() + self.config.timeout;
335 let exit_status = match wait_with_deadline(&mut child, deadline) {
336 Ok(status) => status,
337 Err(WaitError::Timeout) => {
338 let _ = child.kill();
339 let _ = child.wait();
340 return Err(SubprocessInvokerError::Timeout);
341 }
342 Err(WaitError::Io(e)) => {
343 let _ = child.kill();
344 let _ = child.wait();
345 return Err(SubprocessInvokerError::ReadResponse(e));
346 }
347 };
348
349 // Reap helper threads via channels with deadline-bounded
350 // recvs. After child exit the OS closes the child-side pipe
351 // ends, so on a well-behaved client all three threads finish
352 // immediately. If a descendant inherited stdout/stderr and is
353 // still holding a writer end, the read threads block — and
354 // we treat that as a transport timeout rather than hanging
355 // the invoker. The configured `timeout` therefore bounds the
356 // *total* round trip (write + child run + drain), not just
357 // child exit.
358 //
359 // A small grace window covers the common case where the
360 // child has just exited and the threads have not yet noticed
361 // EOF; without it, a deadline that already elapsed would
362 // spuriously time out a successful run.
363 let grace = Duration::from_millis(100);
364 let join_timeout = remaining(deadline).max(grace);
365
366 // Writer first: a stdin write failure means the request was
367 // not delivered intact, so we MUST surface it even when the
368 // child later exits zero. A misbehaving client that ignores
369 // stdin and fabricates a response is exactly the false-
370 // success path the transport contract forbids.
371 match writer_rx.recv_timeout(join_timeout) {
372 Ok(Ok(())) => {}
373 Ok(Err(e)) => return Err(SubprocessInvokerError::WriteRequest(e)),
374 Err(mpsc::RecvTimeoutError::Timeout) => return Err(SubprocessInvokerError::Timeout),
375 Err(mpsc::RecvTimeoutError::Disconnected) => {
376 return Err(SubprocessInvokerError::WriteRequest(std::io::Error::other(
377 "writer thread disconnected before reporting result",
378 )));
379 }
380 }
381
382 let stdout_bytes = match stdout_rx.recv_timeout(join_timeout) {
383 Ok(Ok(buf)) => buf,
384 Ok(Err(e)) => return Err(SubprocessInvokerError::ReadResponse(e)),
385 Err(mpsc::RecvTimeoutError::Timeout) => return Err(SubprocessInvokerError::Timeout),
386 Err(mpsc::RecvTimeoutError::Disconnected) => {
387 return Err(SubprocessInvokerError::ReadResponse(std::io::Error::other(
388 "stdout reader thread disconnected before reporting result",
389 )));
390 }
391 };
392
393 let stderr_bytes = stderr_rx.recv_timeout(join_timeout).unwrap_or_default();
394 let stderr_text = String::from_utf8_lossy(&stderr_bytes).into_owned();
395
396 if !exit_status.success() {
397 return Err(SubprocessInvokerError::NonZeroExit {
398 code: exit_status.code(),
399 stderr: stderr_text,
400 });
401 }
402
403 let response: CallbackResponse =
404 serde_json::from_slice(&stdout_bytes).map_err(SubprocessInvokerError::ParseResponse)?;
405 response
406 .validate()
407 .map_err(SubprocessInvokerError::InvalidResponse)?;
408 Ok(response)
409 }
410}
411
412impl CallbackInvoker for SubprocessCallbackInvoker {
413 type Error = SubprocessInvokerError;
414
415 fn invoke(
416 &self,
417 plan: &RoutingPlan,
418 payloads: &[PayloadEnvelope],
419 ) -> Result<CallbackResponse, Self::Error> {
420 self.invoke_inner(plan, payloads)
421 }
422}
423
424// ===========================================================================
425// wait_with_deadline
426// ===========================================================================
427
428enum WaitError {
429 Timeout,
430 Io(std::io::Error),
431}
432
433/// Poll-based deadline wait. `std::process::Child` has no portable
434/// timed-wait; we poll `try_wait` with bounded sleeps. The poll
435/// interval is short relative to typical lifecycle SLOs (10ms) and
436/// scales with the remaining deadline so a 5-second timeout does not
437/// burn CPU.
438/// Time remaining until `deadline`. Saturates at zero so callers can
439/// use it as a `recv_timeout` argument without underflowing.
440fn remaining(deadline: Instant) -> Duration {
441 deadline.saturating_duration_since(Instant::now())
442}
443
444fn wait_with_deadline(
445 child: &mut Child,
446 deadline: Instant,
447) -> Result<std::process::ExitStatus, WaitError> {
448 let mut interval = Duration::from_millis(2);
449 let cap = Duration::from_millis(50);
450 loop {
451 match child.try_wait() {
452 Ok(Some(status)) => return Ok(status),
453 Ok(None) => {}
454 Err(e) => return Err(WaitError::Io(e)),
455 }
456 let now = Instant::now();
457 if now >= deadline {
458 // One last try_wait to avoid losing a race where the
459 // child exited between the previous check and now.
460 match child.try_wait() {
461 Ok(Some(status)) => return Ok(status),
462 Ok(None) => return Err(WaitError::Timeout),
463 Err(e) => return Err(WaitError::Io(e)),
464 }
465 }
466 let remaining = deadline.saturating_duration_since(now);
467 thread::sleep(interval.min(remaining));
468 if interval < cap {
469 interval = (interval * 2).min(cap);
470 }
471 }
472}
473
474#[cfg(test)]
475mod tests {
476 use super::*;
477
478 #[test]
479 fn failure_class_mapping_is_deterministic() {
480 let cases: Vec<(SubprocessInvokerError, FailureClass)> = vec![
481 (
482 SubprocessInvokerError::ReceiptEmittedRejected(RouteError::InvalidEventEnvelope {
483 detail: "x".into(),
484 }),
485 FailureClass::InvalidRequest,
486 ),
487 (
488 SubprocessInvokerError::Spawn(std::io::Error::other("nope")),
489 FailureClass::TransportError,
490 ),
491 (
492 SubprocessInvokerError::WriteRequest(std::io::Error::other("epipe")),
493 FailureClass::TransportError,
494 ),
495 (
496 SubprocessInvokerError::ReadResponse(std::io::Error::other("eof")),
497 FailureClass::TransportError,
498 ),
499 (
500 SubprocessInvokerError::NonZeroExit {
501 code: Some(1),
502 stderr: "bang".into(),
503 },
504 FailureClass::TransportError,
505 ),
506 (SubprocessInvokerError::Timeout, FailureClass::Timeout),
507 (
508 SubprocessInvokerError::ParseResponse(
509 serde_json::from_str::<serde_json::Value>("not json").unwrap_err(),
510 ),
511 FailureClass::InvalidRequest,
512 ),
513 ];
514 for (err, expected) in cases {
515 let fc = failure_class_for_subprocess_error(&err);
516 assert_eq!(fc, expected, "subprocess err -> failure class: {err}");
517 let via_from: FailureClass = (&err).into();
518 assert_eq!(via_from, fc);
519 }
520 }
521
522 #[test]
523 fn transport_error_for_distinguishes_retryable_shapes() {
524 assert!(matches!(
525 transport_error_for(&SubprocessInvokerError::Timeout),
526 Some(TransportError::Timeout)
527 ));
528 assert!(matches!(
529 transport_error_for(&SubprocessInvokerError::Spawn(std::io::Error::other("x"))),
530 Some(TransportError::Io(_))
531 ));
532 assert!(
533 transport_error_for(&SubprocessInvokerError::ReceiptEmittedRejected(
534 RouteError::InvalidEventEnvelope { detail: "x".into() }
535 ))
536 .is_none()
537 );
538 }
539
540 #[test]
541 fn config_is_chainable() {
542 let cfg = SubprocessInvokerConfig::new("/bin/cat", Duration::from_secs(1))
543 .arg("-")
544 .args(["--flag"]);
545 assert_eq!(cfg.program(), &PathBuf::from("/bin/cat"));
546 assert_eq!(cfg.timeout(), Duration::from_secs(1));
547 }
548}