1use std::ffi::OsString;
41use std::io::{Read, Write};
42use std::path::PathBuf;
43use std::process::{Child, Command, Stdio};
44use std::sync::mpsc;
45use std::thread;
46use std::time::{Duration, Instant};
47
48#[cfg(unix)]
49use std::os::unix::process::CommandExt;
50
51use crate::{CallbackResponse, DispatchEnvelope, FailureClass, PayloadEnvelope, ValidationError};
52
53use super::failure_mapping::{TransportError, validate_receipt_eligible};
54use super::plan::RoutingPlan;
55use super::seams::CallbackInvoker;
56use super::validation::RouteError;
57
/// Launch settings for a callback subprocess: which executable to run, the
/// arguments to pass to it, and the timeout that applies to one invocation.
#[derive(Debug, Clone)]
pub struct SubprocessInvokerConfig {
    /// Executable handed to `Command::new` when the subprocess is spawned.
    program: PathBuf,
    /// Arguments passed to `program`, kept in the order they were added.
    args: Vec<OsString>,
    /// Time budget from which the invoker derives its deadline.
    timeout: Duration,
}

impl SubprocessInvokerConfig {
    /// Builds a configuration for `program` with no arguments yet.
    pub fn new(program: impl Into<PathBuf>, timeout: Duration) -> Self {
        let program = program.into();
        let args = Vec::new();
        Self {
            program,
            args,
            timeout,
        }
    }

    /// Appends a single argument and hands the configuration back so calls
    /// can be chained.
    pub fn arg(mut self, arg: impl Into<OsString>) -> Self {
        let owned: OsString = arg.into();
        self.args.push(owned);
        self
    }

    /// Appends every item yielded by `args`, in iteration order, after any
    /// arguments that were already present.
    pub fn args<I, A>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = A>,
        A: Into<OsString>,
    {
        for item in args {
            self.args.push(item.into());
        }
        self
    }

    /// Path of the executable that will be spawned.
    pub fn program(&self) -> &PathBuf {
        let Self { program, .. } = self;
        program
    }

    /// Timeout configured for a single invocation.
    pub fn timeout(&self) -> Duration {
        let Self { timeout, .. } = self;
        *timeout
    }
}
112
/// Everything that can go wrong while invoking a callback through a
/// subprocess.
#[derive(Debug)]
pub enum SubprocessInvokerError {
    /// The routing plan was rejected by `validate_receipt_eligible`; this is
    /// checked before any subprocess is spawned.
    ReceiptEmittedRejected(RouteError),
    /// The subprocess could not be spawned.
    Spawn(std::io::Error),
    /// Writing the serialized request to the subprocess's stdin failed (also
    /// used when the writer thread vanished without reporting a result).
    WriteRequest(std::io::Error),
    /// The request envelope could not be serialized to JSON.
    SerializeRequest(serde_json::Error),
    /// Reading the subprocess's stdout failed. Also used when stdout exceeds
    /// the size cap, when the reader thread vanished, and when polling the
    /// child's exit status returned an I/O error.
    ReadResponse(std::io::Error),
    /// The bytes on stdout were not a JSON `CallbackResponse`.
    ParseResponse(serde_json::Error),
    /// The response parsed but its `validate()` check failed.
    InvalidResponse(ValidationError),
    /// The subprocess exited unsuccessfully. `code` is `None` when the exit
    /// status carries no exit code (rendered as "terminated by signal");
    /// `stderr` holds the captured, lossily UTF-8-decoded stderr text.
    NonZeroExit { code: Option<i32>, stderr: String },
    /// The subprocess, or one of its pipes, did not finish in time.
    Timeout,
}
158
159impl std::fmt::Display for SubprocessInvokerError {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 match self {
162 Self::ReceiptEmittedRejected(e) => {
163 write!(f, "subprocess invoker refused receipt.emitted plan: {e}")
164 }
165 Self::Spawn(e) => write!(f, "failed to spawn callback subprocess: {e}"),
166 Self::WriteRequest(e) => write!(f, "failed to write request to subprocess stdin: {e}"),
167 Self::SerializeRequest(e) => write!(f, "failed to serialize CallbackRequest: {e}"),
168 Self::ReadResponse(e) => write!(f, "failed to read subprocess stdout: {e}"),
169 Self::ParseResponse(e) => write!(
170 f,
171 "subprocess stdout was not a valid JSON CallbackResponse: {e}"
172 ),
173 Self::InvalidResponse(e) => write!(
174 f,
175 "subprocess returned a CallbackResponse that failed validation: {e}"
176 ),
177 Self::NonZeroExit { code, stderr } => match code {
178 Some(c) => write!(f, "callback subprocess exited with code {c}: {stderr}"),
179 None => write!(f, "callback subprocess terminated by signal: {stderr}"),
180 },
181 Self::Timeout => f.write_str("callback subprocess exceeded configured timeout"),
182 }
183 }
184}
185
// Uses the trait's default methods only: `source()` is not overridden, the
// wrapped causes are instead rendered inline by the `Display` impl.
impl std::error::Error for SubprocessInvokerError {}
187
188pub fn failure_class_for_subprocess_error(err: &SubprocessInvokerError) -> FailureClass {
196 use SubprocessInvokerError as E;
197 match err {
198 E::ReceiptEmittedRejected(_) => FailureClass::InvalidRequest,
199 E::Spawn(_) | E::WriteRequest(_) | E::ReadResponse(_) | E::NonZeroExit { .. } => {
200 FailureClass::TransportError
201 }
202 E::SerializeRequest(_) => FailureClass::InternalError,
203 E::ParseResponse(_) | E::InvalidResponse(_) => FailureClass::InvalidRequest,
204 E::Timeout => FailureClass::Timeout,
205 }
206}
207
/// Conversion form of `failure_class_for_subprocess_error`, so a borrowed
/// error can be turned into a `FailureClass` with `.into()`.
impl From<&SubprocessInvokerError> for FailureClass {
    fn from(err: &SubprocessInvokerError) -> Self {
        // Delegate so both entry points always agree.
        failure_class_for_subprocess_error(err)
    }
}
213
214pub fn transport_error_for(err: &SubprocessInvokerError) -> Option<TransportError> {
221 use SubprocessInvokerError as E;
222 match err {
223 E::Spawn(e) | E::WriteRequest(e) | E::ReadResponse(e) => {
224 Some(TransportError::Io(e.to_string()))
225 }
226 E::NonZeroExit { code, stderr } => Some(TransportError::Io(match code {
227 Some(c) => format!("exit code {c}: {stderr}"),
228 None => format!("terminated by signal: {stderr}"),
229 })),
230 E::Timeout => Some(TransportError::Timeout),
231 E::SerializeRequest(e) => Some(TransportError::Internal(e.to_string())),
232 E::ReceiptEmittedRejected(_) | E::ParseResponse(_) | E::InvalidResponse(_) => None,
233 }
234}
235
/// Largest stdout payload (16 MiB) accepted from the subprocess; anything
/// longer makes the stdout read fail instead of being buffered.
const MAX_STDOUT_BYTES: u64 = 16 * 1024 * 1024;
/// Amount of stderr (256 KiB) kept for diagnostics; longer output is cut off
/// and marked as truncated rather than treated as an error.
const MAX_STDERR_BYTES: u64 = 256 * 1024;
242
/// Callback invoker that runs a configured program as a subprocess, sends
/// the request as JSON on its stdin and reads a JSON response from its
/// stdout.
#[derive(Debug, Clone)]
pub struct SubprocessCallbackInvoker {
    /// Program, arguments and timeout used for every invocation.
    config: SubprocessInvokerConfig,
}
264
265impl SubprocessCallbackInvoker {
266 pub fn new(config: SubprocessInvokerConfig) -> Self {
267 Self { config }
268 }
269
270 pub fn config(&self) -> &SubprocessInvokerConfig {
271 &self.config
272 }
273
274 fn invoke_inner(
278 &self,
279 plan: &RoutingPlan,
280 payloads: &[PayloadEnvelope],
281 ) -> Result<CallbackResponse, SubprocessInvokerError> {
282 validate_receipt_eligible(plan).map_err(SubprocessInvokerError::ReceiptEmittedRejected)?;
287
288 let request = super::callbacks::synthesize_request(plan);
289 let envelope = DispatchEnvelope::new(request, payloads.to_vec());
290 let request_bytes =
291 serde_json::to_vec(&envelope).map_err(SubprocessInvokerError::SerializeRequest)?;
292
293 let mut command = Command::new(&self.config.program);
294 command
295 .args(&self.config.args)
296 .stdin(Stdio::piped())
297 .stdout(Stdio::piped())
298 .stderr(Stdio::piped());
299 #[cfg(unix)]
300 {
301 command.process_group(0);
302 }
303 let mut child = command.spawn().map_err(SubprocessInvokerError::Spawn)?;
304
305 let stdin = child.stdin.take().expect("stdin piped");
314 let stdout = child.stdout.take().expect("stdout piped");
315 let stderr = child.stderr.take().expect("stderr piped");
316
317 let (writer_tx, writer_rx) = mpsc::channel::<std::io::Result<()>>();
318 thread::spawn({
319 let bytes = request_bytes;
320 move || {
321 let mut stdin = stdin;
322 let result = stdin.write_all(&bytes).and_then(|()| stdin.flush());
323 let _ = writer_tx.send(result);
324 }
326 });
327
328 let (stdout_tx, stdout_rx) = mpsc::channel::<std::io::Result<Vec<u8>>>();
329 thread::spawn(move || {
330 let mut s = stdout;
331 let result = read_to_end_limited(&mut s, MAX_STDOUT_BYTES, "stdout");
332 let _ = stdout_tx.send(result);
333 });
334
335 let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>();
336 thread::spawn(move || {
337 let mut s = stderr;
338 let buf = read_to_end_truncated(&mut s, MAX_STDERR_BYTES).unwrap_or_default();
339 let _ = stderr_tx.send(buf);
340 });
341
342 let deadline = Instant::now() + self.config.timeout;
343 let exit_status = match wait_with_deadline(&mut child, deadline) {
344 Ok(status) => status,
345 Err(WaitError::Timeout) => {
346 terminate_child_tree(&mut child);
347 return Err(SubprocessInvokerError::Timeout);
348 }
349 Err(WaitError::Io(e)) => {
350 terminate_child_tree(&mut child);
351 return Err(SubprocessInvokerError::ReadResponse(e));
352 }
353 };
354
355 let grace = Duration::from_millis(100);
370 let join_timeout = remaining(deadline).max(grace);
371
372 match writer_rx.recv_timeout(join_timeout) {
386 Ok(Ok(())) => {}
387 Ok(Err(e)) => {
388 if e.kind() == std::io::ErrorKind::BrokenPipe && !exit_status.success() {
389 let stderr_bytes = stderr_rx.recv_timeout(join_timeout).unwrap_or_default();
390 let stderr_text = String::from_utf8_lossy(&stderr_bytes).into_owned();
391 return Err(SubprocessInvokerError::NonZeroExit {
392 code: exit_status.code(),
393 stderr: stderr_text,
394 });
395 }
396 return Err(SubprocessInvokerError::WriteRequest(e));
397 }
398 Err(mpsc::RecvTimeoutError::Timeout) => {
399 terminate_child_tree(&mut child);
400 return Err(SubprocessInvokerError::Timeout);
401 }
402 Err(mpsc::RecvTimeoutError::Disconnected) => {
403 return Err(SubprocessInvokerError::WriteRequest(std::io::Error::other(
404 "writer thread disconnected before reporting result",
405 )));
406 }
407 }
408
409 let stdout_bytes = match stdout_rx.recv_timeout(join_timeout) {
410 Ok(Ok(buf)) => buf,
411 Ok(Err(e)) => return Err(SubprocessInvokerError::ReadResponse(e)),
412 Err(mpsc::RecvTimeoutError::Timeout) => {
413 terminate_child_tree(&mut child);
414 return Err(SubprocessInvokerError::Timeout);
415 }
416 Err(mpsc::RecvTimeoutError::Disconnected) => {
417 return Err(SubprocessInvokerError::ReadResponse(std::io::Error::other(
418 "stdout reader thread disconnected before reporting result",
419 )));
420 }
421 };
422
423 let stderr_bytes = match stderr_rx.recv_timeout(join_timeout) {
424 Ok(buf) => buf,
425 Err(mpsc::RecvTimeoutError::Timeout) => {
426 terminate_child_tree(&mut child);
427 Vec::new()
428 }
429 Err(mpsc::RecvTimeoutError::Disconnected) => Vec::new(),
430 };
431 let stderr_text = String::from_utf8_lossy(&stderr_bytes).into_owned();
432
433 if !exit_status.success() {
434 return Err(SubprocessInvokerError::NonZeroExit {
435 code: exit_status.code(),
436 stderr: stderr_text,
437 });
438 }
439
440 let response: CallbackResponse =
441 serde_json::from_slice(&stdout_bytes).map_err(SubprocessInvokerError::ParseResponse)?;
442 response
443 .validate()
444 .map_err(SubprocessInvokerError::InvalidResponse)?;
445 Ok(response)
446 }
447}
448
/// Exposes the subprocess invoker through the `CallbackInvoker` seam.
impl CallbackInvoker for SubprocessCallbackInvoker {
    type Error = SubprocessInvokerError;

    /// Invokes the callback for `plan` with `payloads` by delegating to
    /// `invoke_inner`; errors are returned unchanged.
    fn invoke(
        &self,
        plan: &RoutingPlan,
        payloads: &[PayloadEnvelope],
    ) -> Result<CallbackResponse, Self::Error> {
        self.invoke_inner(plan, payloads)
    }
}
460
/// Reads `reader` to the end, failing if it yields more than `max_bytes`.
///
/// At most `max_bytes + 1` bytes are ever buffered: the one extra byte is
/// what distinguishes "exactly at the limit" from "over the limit".
/// `stream_name` only appears in the error message.
///
/// # Errors
///
/// Returns the underlying I/O error if reading fails, or an error reading
/// `subprocess {stream_name} exceeded {max_bytes} bytes` when the limit is
/// exceeded.
fn read_to_end_limited<R: Read>(
    reader: &mut R,
    max_bytes: u64,
    stream_name: &'static str,
) -> std::io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    // Saturate instead of `+ 1`: with `max_bytes == u64::MAX` a plain add
    // panics in debug builds and wraps to a zero-byte read in release builds.
    reader
        .take(max_bytes.saturating_add(1))
        .read_to_end(&mut buf)?;
    if buf.len() as u64 > max_bytes {
        return Err(std::io::Error::other(format!(
            "subprocess {stream_name} exceeded {max_bytes} bytes"
        )));
    }
    Ok(buf)
}
475
/// Reads `reader` to the end but keeps only the first `max_bytes` bytes.
///
/// When the input is longer than `max_bytes`, the kept prefix is followed by
/// the marker `"\n[stderr truncated]\n"` and the rest of the input is read
/// and discarded. Input of at most `max_bytes` bytes is returned unchanged.
///
/// # Errors
///
/// Returns the underlying I/O error if reading the kept prefix fails.
/// Failures while discarding the excess are ignored.
fn read_to_end_truncated<R: Read>(reader: &mut R, max_bytes: u64) -> std::io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    // Saturate instead of `+ 1` so that `u64::MAX` means "no limit" rather
    // than overflowing.
    reader
        .by_ref()
        .take(max_bytes.saturating_add(1))
        .read_to_end(&mut buf)?;

    // A limit that does not fit in `usize` can never be exceeded by a `Vec`.
    let keep = usize::try_from(max_bytes).unwrap_or(usize::MAX);
    if buf.len() > keep {
        buf.truncate(keep);
        buf.extend_from_slice(b"\n[stderr truncated]\n");
        // Keep consuming until end of input. Returning here would let the
        // caller drop the pipe while the writer is still producing output,
        // and the writer would then fail with a broken pipe. Best-effort:
        // the prefix already captured is worth more than a drain error.
        let _ = std::io::copy(reader, &mut std::io::sink());
    }
    Ok(buf)
}
485
486fn terminate_child_tree(child: &mut Child) {
487 #[cfg(unix)]
488 terminate_process_group(child.id());
489 let _ = child.kill();
490 let _ = child.wait();
491}
492
/// Signals the process group `child_pid` by running the external `kill`
/// program twice: first with `-TERM`, then, 20 ms later, with `-KILL`.
///
/// The target is passed as `-<pid>` after a `--` separator. Output of `kill`
/// is discarded and its exit status is ignored, so a group that is already
/// gone is not an error.
#[cfg(unix)]
fn terminate_process_group(child_pid: u32) {
    let target = format!("-{child_pid}");
    let send_signal = |signal: &str| {
        let _ = Command::new("kill")
            .args([signal, "--", target.as_str()])
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status();
    };

    send_signal("-TERM");
    // Short pause between the polite request and the forced kill.
    thread::sleep(Duration::from_millis(20));
    send_signal("-KILL");
}
508
/// Reasons `wait_with_deadline` can fail to produce an exit status.
enum WaitError {
    /// The deadline passed while the child was still running.
    Timeout,
    /// Polling the child's status with `try_wait` failed.
    Io(std::io::Error),
}
517
/// Time left until `deadline`, or `Duration::ZERO` once it has passed.
fn remaining(deadline: Instant) -> Duration {
    let now = Instant::now();
    deadline
        .checked_duration_since(now)
        .unwrap_or(Duration::ZERO)
}
528
529fn wait_with_deadline(
530 child: &mut Child,
531 deadline: Instant,
532) -> Result<std::process::ExitStatus, WaitError> {
533 let mut interval = Duration::from_millis(2);
534 let cap = Duration::from_millis(50);
535 loop {
536 match child.try_wait() {
537 Ok(Some(status)) => return Ok(status),
538 Ok(None) => {}
539 Err(e) => return Err(WaitError::Io(e)),
540 }
541 let now = Instant::now();
542 if now >= deadline {
543 match child.try_wait() {
546 Ok(Some(status)) => return Ok(status),
547 Ok(None) => return Err(WaitError::Timeout),
548 Err(e) => return Err(WaitError::Io(e)),
549 }
550 }
551 let remaining = deadline.saturating_duration_since(now);
552 thread::sleep(interval.min(remaining));
553 if interval < cap {
554 interval = (interval * 2).min(cap);
555 }
556 }
557}
558
// Unit tests for the pure helpers in this file: error classification, the
// bounded readers, the deadline helper and the config builder. None of them
// spawns a subprocess.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Each listed error variant maps to its expected failure class, and the
    /// `From<&SubprocessInvokerError>` conversion agrees with the free
    /// function. `SerializeRequest` and `InvalidResponse` are not listed.
    #[test]
    fn failure_class_mapping_is_deterministic() {
        let cases: Vec<(SubprocessInvokerError, FailureClass)> = vec![
            (
                SubprocessInvokerError::ReceiptEmittedRejected(RouteError::InvalidEventEnvelope {
                    detail: "x".into(),
                }),
                FailureClass::InvalidRequest,
            ),
            (
                SubprocessInvokerError::Spawn(std::io::Error::other("nope")),
                FailureClass::TransportError,
            ),
            (
                SubprocessInvokerError::WriteRequest(std::io::Error::other("epipe")),
                FailureClass::TransportError,
            ),
            (
                SubprocessInvokerError::ReadResponse(std::io::Error::other("eof")),
                FailureClass::TransportError,
            ),
            (
                SubprocessInvokerError::NonZeroExit {
                    code: Some(1),
                    stderr: "bang".into(),
                },
                FailureClass::TransportError,
            ),
            (SubprocessInvokerError::Timeout, FailureClass::Timeout),
            (
                // Any serde_json error will do; parsing invalid JSON is the
                // simplest way to obtain one.
                SubprocessInvokerError::ParseResponse(
                    serde_json::from_str::<serde_json::Value>("not json").unwrap_err(),
                ),
                FailureClass::InvalidRequest,
            ),
        ];
        for (err, expected) in cases {
            let fc = failure_class_for_subprocess_error(&err);
            assert_eq!(fc, expected, "subprocess err -> failure class: {err}");
            // The trait conversion must give the same answer.
            let via_from: FailureClass = (&err).into();
            assert_eq!(via_from, fc);
        }
    }

    /// Timeouts and I/O failures map to distinct transport errors, while a
    /// rejected plan has no transport error at all.
    #[test]
    fn transport_error_for_distinguishes_retryable_shapes() {
        assert!(matches!(
            transport_error_for(&SubprocessInvokerError::Timeout),
            Some(TransportError::Timeout)
        ));
        assert!(matches!(
            transport_error_for(&SubprocessInvokerError::Spawn(std::io::Error::other("x"))),
            Some(TransportError::Io(_))
        ));
        assert!(
            transport_error_for(&SubprocessInvokerError::ReceiptEmittedRejected(
                RouteError::InvalidEventEnvelope { detail: "x".into() }
            ))
            .is_none()
        );
    }

    /// Input of exactly the limit is accepted; one byte more is an error
    /// whose message names the stream and the limit.
    #[test]
    fn read_to_end_limited_allows_exact_limit_and_rejects_overflow() {
        let mut exact = Cursor::new(b"abcd".to_vec());
        assert_eq!(
            read_to_end_limited(&mut exact, 4, "stdout").unwrap(),
            b"abcd"
        );

        let mut over = Cursor::new(b"abcde".to_vec());
        let err = read_to_end_limited(&mut over, 4, "stdout").unwrap_err();
        assert!(
            err.to_string()
                .contains("subprocess stdout exceeded 4 bytes")
        );
    }

    /// Input of exactly the limit is returned untouched; longer input is cut
    /// to the limit and followed by the truncation marker.
    #[test]
    fn read_to_end_truncated_marks_only_over_limit_stderr() {
        let mut exact = Cursor::new(b"abcd".to_vec());
        assert_eq!(read_to_end_truncated(&mut exact, 4).unwrap(), b"abcd");

        let mut over = Cursor::new(b"abcde".to_vec());
        let mut expected = b"abcd".to_vec();
        expected.extend_from_slice(b"\n[stderr truncated]\n");
        assert_eq!(read_to_end_truncated(&mut over, 4).unwrap(), expected);
    }

    /// A future deadline reports the time left; a past deadline reports zero
    /// instead of panicking or going negative.
    #[test]
    fn remaining_reports_future_duration_and_saturates_past_deadline() {
        let future = Instant::now() + Duration::from_secs(60);
        assert!(remaining(future) > Duration::from_secs(59));

        let past = Instant::now() - Duration::from_millis(1);
        assert_eq!(remaining(past), Duration::ZERO);
    }

    /// The builder methods can be chained and leave program and timeout
    /// intact.
    #[test]
    fn config_is_chainable() {
        let cfg = SubprocessInvokerConfig::new("/bin/cat", Duration::from_secs(1))
            .arg("-")
            .args(["--flag"]);
        assert_eq!(cfg.program(), &PathBuf::from("/bin/cat"));
        assert_eq!(cfg.timeout(), Duration::from_secs(1));
    }
}