Skip to main content

lifeloop/router/
subprocess.rs

1//! Subprocess-backed callback invocation (issue #8).
2//!
3//! Fills the [`CallbackInvoker`] seam declared in `src/router/seams.rs`
4//! (issue #7) for the **process-boundary** transport. This module is
5//! the load-bearing proof that Lifeloop can drive an external client
6//! command without taking a Rust dependency on it: the only contract
7//! between Lifeloop and the client is JSON over stdio.
8//!
9//! # Boundary
10//!
11//! Owns:
12//! * [`SubprocessInvokerConfig`] — configurable command/args/timeout
13//!   used to spawn the external client per invocation;
14//! * [`SubprocessCallbackInvoker`] — concrete [`CallbackInvoker`]
15//!   implementation that spawns a fresh child for each event, pipes a
16//!   [`crate::CallbackRequest`] JSON document to its stdin, reads a
17//!   [`CallbackResponse`] JSON document from its stdout, and validates
18//!   it;
19//! * [`SubprocessInvokerError`] — typed error variants carrying enough
20//!   detail for [`super::LifeloopFailureMapper`] to map onto the
21//!   shared [`crate::FailureClass`] vocabulary;
22//! * [`failure_class_for_subprocess_error`] / `From<&SubprocessInvokerError>
23//!   for FailureClass` — deterministic mapping into the existing
24//!   failure-class vocabulary. No new variants are introduced.
25//!
26//! Does **not** own:
27//! * receipt synthesis (that is [`super::receipts`]);
28//! * wire schema (the request/response types come from the lifeloop
29//!   crate root and are unchanged by this module).
30//!
31//! # Receipt-emitted guard
32//!
33//! `receipt.emitted` is a notification event and must not be invoked
34//! downstream. The subprocess invoker rejects such plans **before** it
35//! spawns a child — same rule as the in-memory path, enforced via
36//! [`super::validate_receipt_eligible`]. The rejection surfaces as
37//! [`SubprocessInvokerError::ReceiptEmittedRejected`] which maps to
38//! [`crate::FailureClass::InvalidRequest`].
39
40use std::ffi::OsString;
41use std::io::{Read, Write};
42use std::path::PathBuf;
43use std::process::{Child, Command, Stdio};
44use std::sync::mpsc;
45use std::thread;
46use std::time::{Duration, Instant};
47
48#[cfg(unix)]
49use std::os::unix::process::CommandExt;
50
51use crate::{CallbackResponse, DispatchEnvelope, FailureClass, PayloadEnvelope, ValidationError};
52
53use super::failure_mapping::{TransportError, validate_receipt_eligible};
54use super::plan::RoutingPlan;
55use super::seams::CallbackInvoker;
56use super::validation::RouteError;
57
58// ===========================================================================
59// SubprocessInvokerConfig
60// ===========================================================================
61
62/// Configuration for [`SubprocessCallbackInvoker`].
63///
64/// `program` is the external client command (an absolute path to the
65/// client's callback binary). `args` are appended on every spawn.
66/// `timeout` bounds the total round trip — write request, read
67/// response, child exit. A child still running at deadline is killed
68/// and the invocation fails with [`SubprocessInvokerError::Timeout`].
69#[derive(Debug, Clone)]
70pub struct SubprocessInvokerConfig {
71    program: PathBuf,
72    args: Vec<OsString>,
73    timeout: Duration,
74}
75
76impl SubprocessInvokerConfig {
77    /// Construct a config from an explicit program path and timeout.
78    /// The default timeout is intentionally not provided — the caller
79    /// must pick a value that matches its lifecycle SLOs.
80    pub fn new(program: impl Into<PathBuf>, timeout: Duration) -> Self {
81        Self {
82            program: program.into(),
83            args: Vec::new(),
84            timeout,
85        }
86    }
87
88    /// Append a single argument. Returns `self` for chaining.
89    pub fn arg(mut self, arg: impl Into<OsString>) -> Self {
90        self.args.push(arg.into());
91        self
92    }
93
94    /// Append a sequence of arguments. Returns `self` for chaining.
95    pub fn args<I, A>(mut self, args: I) -> Self
96    where
97        I: IntoIterator<Item = A>,
98        A: Into<OsString>,
99    {
100        self.args.extend(args.into_iter().map(Into::into));
101        self
102    }
103
104    pub fn program(&self) -> &PathBuf {
105        &self.program
106    }
107
108    pub fn timeout(&self) -> Duration {
109        self.timeout
110    }
111}
112
113// ===========================================================================
114// SubprocessInvokerError
115// ===========================================================================
116
117/// Failure variants surfaced by [`SubprocessCallbackInvoker::invoke`].
118///
119/// Every variant maps deterministically onto a single
120/// [`FailureClass`] from the shared vocabulary — no new failure
121/// classes are introduced by the subprocess transport.
122#[derive(Debug)]
123pub enum SubprocessInvokerError {
124    /// Plan rejected before spawn because the event is
125    /// `receipt.emitted` (a notification event that must not produce
126    /// downstream invocations). Maps to
127    /// [`FailureClass::InvalidRequest`].
128    ReceiptEmittedRejected(RouteError),
129    /// Failed to spawn the child (e.g. binary not found, permission
130    /// denied). Maps to [`FailureClass::TransportError`].
131    Spawn(std::io::Error),
132    /// Failed to write the request JSON to the child's stdin. Maps to
133    /// [`FailureClass::TransportError`].
134    WriteRequest(std::io::Error),
135    /// Failed to serialize the [`crate::CallbackRequest`] before writing.
136    /// Treated as an internal-class failure — the bug is on the
137    /// Lifeloop side, not the transport.
138    SerializeRequest(serde_json::Error),
139    /// Failed to read the child's stdout. Maps to
140    /// [`FailureClass::TransportError`].
141    ReadResponse(std::io::Error),
142    /// Child wrote bytes that did not parse as JSON. Maps to
143    /// [`FailureClass::InvalidRequest`] — the response is malformed
144    /// at the wire layer.
145    ParseResponse(serde_json::Error),
146    /// JSON parsed cleanly but the response failed
147    /// [`CallbackResponse::validate`]. Maps to
148    /// [`FailureClass::InvalidRequest`].
149    InvalidResponse(ValidationError),
150    /// Child exited non-zero. Maps to
151    /// [`FailureClass::TransportError`] — the transport returned a
152    /// failure signal we cannot interpret further.
153    NonZeroExit { code: Option<i32>, stderr: String },
154    /// Round trip exceeded the configured timeout; child was killed.
155    /// Maps to [`FailureClass::Timeout`].
156    Timeout,
157}
158
159impl std::fmt::Display for SubprocessInvokerError {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        match self {
162            Self::ReceiptEmittedRejected(e) => {
163                write!(f, "subprocess invoker refused receipt.emitted plan: {e}")
164            }
165            Self::Spawn(e) => write!(f, "failed to spawn callback subprocess: {e}"),
166            Self::WriteRequest(e) => write!(f, "failed to write request to subprocess stdin: {e}"),
167            Self::SerializeRequest(e) => write!(f, "failed to serialize CallbackRequest: {e}"),
168            Self::ReadResponse(e) => write!(f, "failed to read subprocess stdout: {e}"),
169            Self::ParseResponse(e) => write!(
170                f,
171                "subprocess stdout was not a valid JSON CallbackResponse: {e}"
172            ),
173            Self::InvalidResponse(e) => write!(
174                f,
175                "subprocess returned a CallbackResponse that failed validation: {e}"
176            ),
177            Self::NonZeroExit { code, stderr } => match code {
178                Some(c) => write!(f, "callback subprocess exited with code {c}: {stderr}"),
179                None => write!(f, "callback subprocess terminated by signal: {stderr}"),
180            },
181            Self::Timeout => f.write_str("callback subprocess exceeded configured timeout"),
182        }
183    }
184}
185
186impl std::error::Error for SubprocessInvokerError {}
187
188/// Map a [`SubprocessInvokerError`] onto the shared
189/// [`FailureClass`] vocabulary.
190///
191/// Pure function: same variant always maps to the same class so a
192/// receipt ledger replays consistently. Stays aligned with the
193/// in-process [`TransportError`] mapping in
194/// `super::failure_mapping` (private).
195pub fn failure_class_for_subprocess_error(err: &SubprocessInvokerError) -> FailureClass {
196    use SubprocessInvokerError as E;
197    match err {
198        E::ReceiptEmittedRejected(_) => FailureClass::InvalidRequest,
199        E::Spawn(_) | E::WriteRequest(_) | E::ReadResponse(_) | E::NonZeroExit { .. } => {
200            FailureClass::TransportError
201        }
202        E::SerializeRequest(_) => FailureClass::InternalError,
203        E::ParseResponse(_) | E::InvalidResponse(_) => FailureClass::InvalidRequest,
204        E::Timeout => FailureClass::Timeout,
205    }
206}
207
208impl From<&SubprocessInvokerError> for FailureClass {
209    fn from(err: &SubprocessInvokerError) -> Self {
210        failure_class_for_subprocess_error(err)
211    }
212}
213
214/// Convert a [`SubprocessInvokerError`] into the shared
215/// [`TransportError`] enum so callers that already speak in
216/// [`super::LifeloopFailureMapper::map_transport_error`] terms can
217/// route subprocess failures through the same path. `None` for
218/// variants that are not transport-class (validation, parse,
219/// receipt-emitted rejection).
220pub fn transport_error_for(err: &SubprocessInvokerError) -> Option<TransportError> {
221    use SubprocessInvokerError as E;
222    match err {
223        E::Spawn(e) | E::WriteRequest(e) | E::ReadResponse(e) => {
224            Some(TransportError::Io(e.to_string()))
225        }
226        E::NonZeroExit { code, stderr } => Some(TransportError::Io(match code {
227            Some(c) => format!("exit code {c}: {stderr}"),
228            None => format!("terminated by signal: {stderr}"),
229        })),
230        E::Timeout => Some(TransportError::Timeout),
231        E::SerializeRequest(e) => Some(TransportError::Internal(e.to_string())),
232        E::ReceiptEmittedRejected(_) | E::ParseResponse(_) | E::InvalidResponse(_) => None,
233    }
234}
235
236// ===========================================================================
237// SubprocessCallbackInvoker
238// ===========================================================================
239
240const MAX_STDOUT_BYTES: u64 = 16 * 1024 * 1024;
241const MAX_STDERR_BYTES: u64 = 256 * 1024;
242
243/// Subprocess-backed [`CallbackInvoker`].
244///
245/// Spawns a fresh child per invocation. The protocol is intentionally
246/// minimal: stdin receives a single JSON-serialized
247/// [`DispatchEnvelope`] (carrying the [`crate::CallbackRequest`] and
248/// any opaque [`PayloadEnvelope`] bodies); stdout returns a single
249/// JSON-serialized [`CallbackResponse`]. The child must exit zero on
250/// success. Anything outside that contract surfaces as a typed
251/// [`SubprocessInvokerError`].
252///
253/// # Payload bodies
254///
255/// Payload bodies flow through the subprocess channel inside the
256/// [`DispatchEnvelope`] (issue #22). Bodies are transported verbatim:
257/// Lifeloop never parses `body` or dereferences `body_ref`. Clients
258/// receive the same envelopes a caller passed to
259/// [`CallbackInvoker::invoke`].
260#[derive(Debug, Clone)]
261pub struct SubprocessCallbackInvoker {
262    config: SubprocessInvokerConfig,
263}
264
265impl SubprocessCallbackInvoker {
266    pub fn new(config: SubprocessInvokerConfig) -> Self {
267        Self { config }
268    }
269
270    pub fn config(&self) -> &SubprocessInvokerConfig {
271        &self.config
272    }
273
274    /// Internal: build the [`crate::CallbackRequest`], spawn, write, read,
275    /// validate. Factored out so [`CallbackInvoker::invoke`] stays a
276    /// thin adapter.
277    fn invoke_inner(
278        &self,
279        plan: &RoutingPlan,
280        payloads: &[PayloadEnvelope],
281    ) -> Result<CallbackResponse, SubprocessInvokerError> {
282        // Pre-spawn guard: receipt.emitted must never produce a
283        // downstream invocation. Same rule as the receipt emitter,
284        // enforced before any IO so a misuse cannot leak side
285        // effects (spawned process, stderr, exit-code observability).
286        validate_receipt_eligible(plan).map_err(SubprocessInvokerError::ReceiptEmittedRejected)?;
287
288        let request = super::callbacks::synthesize_request(plan);
289        let envelope = DispatchEnvelope::new(request, payloads.to_vec());
290        let request_bytes =
291            serde_json::to_vec(&envelope).map_err(SubprocessInvokerError::SerializeRequest)?;
292
293        let mut command = Command::new(&self.config.program);
294        command
295            .args(&self.config.args)
296            .stdin(Stdio::piped())
297            .stdout(Stdio::piped())
298            .stderr(Stdio::piped());
299        #[cfg(unix)]
300        {
301            command.process_group(0);
302        }
303        let mut child = command.spawn().map_err(SubprocessInvokerError::Spawn)?;
304
305        // Hand stdin/stdout/stderr to helper threads so the main
306        // thread can honor the timeout independently of how the
307        // child consumes input. Each thread reports its result over
308        // an mpsc channel so the main thread can bound every join by
309        // the configured deadline — a direct `JoinHandle::join()`
310        // would block indefinitely if a descendant of the child
311        // inherited a pipe handle and held it open after the direct
312        // child exited.
313        let stdin = child.stdin.take().expect("stdin piped");
314        let stdout = child.stdout.take().expect("stdout piped");
315        let stderr = child.stderr.take().expect("stderr piped");
316
317        let (writer_tx, writer_rx) = mpsc::channel::<std::io::Result<()>>();
318        thread::spawn({
319            let bytes = request_bytes;
320            move || {
321                let mut stdin = stdin;
322                let result = stdin.write_all(&bytes).and_then(|()| stdin.flush());
323                let _ = writer_tx.send(result);
324                // stdin dropped on scope exit, signaling EOF.
325            }
326        });
327
328        let (stdout_tx, stdout_rx) = mpsc::channel::<std::io::Result<Vec<u8>>>();
329        thread::spawn(move || {
330            let mut s = stdout;
331            let result = read_to_end_limited(&mut s, MAX_STDOUT_BYTES, "stdout");
332            let _ = stdout_tx.send(result);
333        });
334
335        let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>();
336        thread::spawn(move || {
337            let mut s = stderr;
338            let buf = read_to_end_truncated(&mut s, MAX_STDERR_BYTES).unwrap_or_default();
339            let _ = stderr_tx.send(buf);
340        });
341
342        let deadline = Instant::now() + self.config.timeout;
343        let exit_status = match wait_with_deadline(&mut child, deadline) {
344            Ok(status) => status,
345            Err(WaitError::Timeout) => {
346                terminate_child_tree(&mut child);
347                return Err(SubprocessInvokerError::Timeout);
348            }
349            Err(WaitError::Io(e)) => {
350                terminate_child_tree(&mut child);
351                return Err(SubprocessInvokerError::ReadResponse(e));
352            }
353        };
354
355        // Reap helper threads via channels with deadline-bounded
356        // recvs. After child exit the OS closes the child-side pipe
357        // ends, so on a well-behaved client all three threads finish
358        // immediately. If a descendant inherited stdout/stderr and is
359        // still holding a writer end, the read threads block — and
360        // we treat that as a transport timeout rather than hanging
361        // the invoker. The configured `timeout` therefore bounds the
362        // *total* round trip (write + child run + drain), not just
363        // child exit.
364        //
365        // A small grace window covers the common case where the
366        // child has just exited and the threads have not yet noticed
367        // EOF; without it, a deadline that already elapsed would
368        // spuriously time out a successful run.
369        let grace = Duration::from_millis(100);
370        let join_timeout = remaining(deadline).max(grace);
371
372        // Writer first: a stdin write failure means the request was
373        // not delivered intact, so we MUST surface it even when the
374        // child later exits zero. A misbehaving client that ignores
375        // stdin and fabricates a response is exactly the false-
376        // success path the transport contract forbids.
377        //
378        // Exception: EPIPE on stdin almost always means the child
379        // already closed its read end — i.e. it has exited. When that
380        // exit was non-zero, the exit code is the truthful diagnosis;
381        // the BrokenPipe is its downstream symptom, and surfacing it
382        // would mask the real failure (and make the error scheduling-
383        // dependent, since a slower runner observes EPIPE before the
384        // writer finishes). The zero-exit guard above is preserved.
385        match writer_rx.recv_timeout(join_timeout) {
386            Ok(Ok(())) => {}
387            Ok(Err(e)) => {
388                if e.kind() == std::io::ErrorKind::BrokenPipe && !exit_status.success() {
389                    let stderr_bytes = stderr_rx.recv_timeout(join_timeout).unwrap_or_default();
390                    let stderr_text = String::from_utf8_lossy(&stderr_bytes).into_owned();
391                    return Err(SubprocessInvokerError::NonZeroExit {
392                        code: exit_status.code(),
393                        stderr: stderr_text,
394                    });
395                }
396                return Err(SubprocessInvokerError::WriteRequest(e));
397            }
398            Err(mpsc::RecvTimeoutError::Timeout) => {
399                terminate_child_tree(&mut child);
400                return Err(SubprocessInvokerError::Timeout);
401            }
402            Err(mpsc::RecvTimeoutError::Disconnected) => {
403                return Err(SubprocessInvokerError::WriteRequest(std::io::Error::other(
404                    "writer thread disconnected before reporting result",
405                )));
406            }
407        }
408
409        let stdout_bytes = match stdout_rx.recv_timeout(join_timeout) {
410            Ok(Ok(buf)) => buf,
411            Ok(Err(e)) => return Err(SubprocessInvokerError::ReadResponse(e)),
412            Err(mpsc::RecvTimeoutError::Timeout) => {
413                terminate_child_tree(&mut child);
414                return Err(SubprocessInvokerError::Timeout);
415            }
416            Err(mpsc::RecvTimeoutError::Disconnected) => {
417                return Err(SubprocessInvokerError::ReadResponse(std::io::Error::other(
418                    "stdout reader thread disconnected before reporting result",
419                )));
420            }
421        };
422
423        let stderr_bytes = match stderr_rx.recv_timeout(join_timeout) {
424            Ok(buf) => buf,
425            Err(mpsc::RecvTimeoutError::Timeout) => {
426                terminate_child_tree(&mut child);
427                Vec::new()
428            }
429            Err(mpsc::RecvTimeoutError::Disconnected) => Vec::new(),
430        };
431        let stderr_text = String::from_utf8_lossy(&stderr_bytes).into_owned();
432
433        if !exit_status.success() {
434            return Err(SubprocessInvokerError::NonZeroExit {
435                code: exit_status.code(),
436                stderr: stderr_text,
437            });
438        }
439
440        let response: CallbackResponse =
441            serde_json::from_slice(&stdout_bytes).map_err(SubprocessInvokerError::ParseResponse)?;
442        response
443            .validate()
444            .map_err(SubprocessInvokerError::InvalidResponse)?;
445        Ok(response)
446    }
447}
448
449impl CallbackInvoker for SubprocessCallbackInvoker {
450    type Error = SubprocessInvokerError;
451
452    fn invoke(
453        &self,
454        plan: &RoutingPlan,
455        payloads: &[PayloadEnvelope],
456    ) -> Result<CallbackResponse, Self::Error> {
457        self.invoke_inner(plan, payloads)
458    }
459}
460
461fn read_to_end_limited<R: Read>(
462    reader: &mut R,
463    max_bytes: u64,
464    stream_name: &'static str,
465) -> std::io::Result<Vec<u8>> {
466    let mut buf = Vec::new();
467    reader.take(max_bytes + 1).read_to_end(&mut buf)?;
468    if buf.len() as u64 > max_bytes {
469        return Err(std::io::Error::other(format!(
470            "subprocess {stream_name} exceeded {max_bytes} bytes"
471        )));
472    }
473    Ok(buf)
474}
475
476fn read_to_end_truncated<R: Read>(reader: &mut R, max_bytes: u64) -> std::io::Result<Vec<u8>> {
477    let mut buf = Vec::new();
478    reader.take(max_bytes + 1).read_to_end(&mut buf)?;
479    if buf.len() as u64 > max_bytes {
480        buf.truncate(max_bytes as usize);
481        buf.extend_from_slice(b"\n[stderr truncated]\n");
482    }
483    Ok(buf)
484}
485
486fn terminate_child_tree(child: &mut Child) {
487    #[cfg(unix)]
488    terminate_process_group(child.id());
489    let _ = child.kill();
490    let _ = child.wait();
491}
492
493#[cfg(unix)]
494fn terminate_process_group(child_pid: u32) {
495    let pgid = format!("-{child_pid}");
496    let _ = Command::new("kill")
497        .args(["-TERM", "--", &pgid])
498        .stdout(Stdio::null())
499        .stderr(Stdio::null())
500        .status();
501    thread::sleep(Duration::from_millis(20));
502    let _ = Command::new("kill")
503        .args(["-KILL", "--", &pgid])
504        .stdout(Stdio::null())
505        .stderr(Stdio::null())
506        .status();
507}
508
509// ===========================================================================
510// wait_with_deadline
511// ===========================================================================
512
513enum WaitError {
514    Timeout,
515    Io(std::io::Error),
516}
517
518/// Poll-based deadline wait. `std::process::Child` has no portable
519/// timed-wait; we poll `try_wait` with bounded sleeps. The poll
520/// interval is short relative to typical lifecycle SLOs (10ms) and
521/// scales with the remaining deadline so a 5-second timeout does not
522/// burn CPU.
523/// Time remaining until `deadline`. Saturates at zero so callers can
524/// use it as a `recv_timeout` argument without underflowing.
525fn remaining(deadline: Instant) -> Duration {
526    deadline.saturating_duration_since(Instant::now())
527}
528
529fn wait_with_deadline(
530    child: &mut Child,
531    deadline: Instant,
532) -> Result<std::process::ExitStatus, WaitError> {
533    let mut interval = Duration::from_millis(2);
534    let cap = Duration::from_millis(50);
535    loop {
536        match child.try_wait() {
537            Ok(Some(status)) => return Ok(status),
538            Ok(None) => {}
539            Err(e) => return Err(WaitError::Io(e)),
540        }
541        let now = Instant::now();
542        if now >= deadline {
543            // One last try_wait to avoid losing a race where the
544            // child exited between the previous check and now.
545            match child.try_wait() {
546                Ok(Some(status)) => return Ok(status),
547                Ok(None) => return Err(WaitError::Timeout),
548                Err(e) => return Err(WaitError::Io(e)),
549            }
550        }
551        let remaining = deadline.saturating_duration_since(now);
552        thread::sleep(interval.min(remaining));
553        if interval < cap {
554            interval = (interval * 2).min(cap);
555        }
556    }
557}
558
559#[cfg(test)]
560mod tests {
561    use super::*;
562    use std::io::Cursor;
563
564    #[test]
565    fn failure_class_mapping_is_deterministic() {
566        let cases: Vec<(SubprocessInvokerError, FailureClass)> = vec![
567            (
568                SubprocessInvokerError::ReceiptEmittedRejected(RouteError::InvalidEventEnvelope {
569                    detail: "x".into(),
570                }),
571                FailureClass::InvalidRequest,
572            ),
573            (
574                SubprocessInvokerError::Spawn(std::io::Error::other("nope")),
575                FailureClass::TransportError,
576            ),
577            (
578                SubprocessInvokerError::WriteRequest(std::io::Error::other("epipe")),
579                FailureClass::TransportError,
580            ),
581            (
582                SubprocessInvokerError::ReadResponse(std::io::Error::other("eof")),
583                FailureClass::TransportError,
584            ),
585            (
586                SubprocessInvokerError::NonZeroExit {
587                    code: Some(1),
588                    stderr: "bang".into(),
589                },
590                FailureClass::TransportError,
591            ),
592            (SubprocessInvokerError::Timeout, FailureClass::Timeout),
593            (
594                SubprocessInvokerError::ParseResponse(
595                    serde_json::from_str::<serde_json::Value>("not json").unwrap_err(),
596                ),
597                FailureClass::InvalidRequest,
598            ),
599        ];
600        for (err, expected) in cases {
601            let fc = failure_class_for_subprocess_error(&err);
602            assert_eq!(fc, expected, "subprocess err -> failure class: {err}");
603            let via_from: FailureClass = (&err).into();
604            assert_eq!(via_from, fc);
605        }
606    }
607
608    #[test]
609    fn transport_error_for_distinguishes_retryable_shapes() {
610        assert!(matches!(
611            transport_error_for(&SubprocessInvokerError::Timeout),
612            Some(TransportError::Timeout)
613        ));
614        assert!(matches!(
615            transport_error_for(&SubprocessInvokerError::Spawn(std::io::Error::other("x"))),
616            Some(TransportError::Io(_))
617        ));
618        assert!(
619            transport_error_for(&SubprocessInvokerError::ReceiptEmittedRejected(
620                RouteError::InvalidEventEnvelope { detail: "x".into() }
621            ))
622            .is_none()
623        );
624    }
625
626    #[test]
627    fn read_to_end_limited_allows_exact_limit_and_rejects_overflow() {
628        let mut exact = Cursor::new(b"abcd".to_vec());
629        assert_eq!(
630            read_to_end_limited(&mut exact, 4, "stdout").unwrap(),
631            b"abcd"
632        );
633
634        let mut over = Cursor::new(b"abcde".to_vec());
635        let err = read_to_end_limited(&mut over, 4, "stdout").unwrap_err();
636        assert!(
637            err.to_string()
638                .contains("subprocess stdout exceeded 4 bytes")
639        );
640    }
641
642    #[test]
643    fn read_to_end_truncated_marks_only_over_limit_stderr() {
644        let mut exact = Cursor::new(b"abcd".to_vec());
645        assert_eq!(read_to_end_truncated(&mut exact, 4).unwrap(), b"abcd");
646
647        let mut over = Cursor::new(b"abcde".to_vec());
648        let mut expected = b"abcd".to_vec();
649        expected.extend_from_slice(b"\n[stderr truncated]\n");
650        assert_eq!(read_to_end_truncated(&mut over, 4).unwrap(), expected);
651    }
652
653    #[test]
654    fn remaining_reports_future_duration_and_saturates_past_deadline() {
655        let future = Instant::now() + Duration::from_secs(60);
656        assert!(remaining(future) > Duration::from_secs(59));
657
658        let past = Instant::now() - Duration::from_millis(1);
659        assert_eq!(remaining(past), Duration::ZERO);
660    }
661
662    #[test]
663    fn config_is_chainable() {
664        let cfg = SubprocessInvokerConfig::new("/bin/cat", Duration::from_secs(1))
665            .arg("-")
666            .args(["--flag"]);
667        assert_eq!(cfg.program(), &PathBuf::from("/bin/cat"));
668        assert_eq!(cfg.timeout(), Duration::from_secs(1));
669    }
670}