1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::process::{ExitStatus, Stdio};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6
7use serde_json::Value;
8use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
9use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio::time::{timeout, Duration};
13
14use crate::runtime::core::io_policy::{
15 normalize_text_tail, trim_ascii_line_endings, trim_tail_bytes, validate_positive_capacity,
16};
17use crate::runtime::errors::RuntimeError;
18
/// Default upper bound, in bytes, for a single inbound stdout line (1 MiB).
const DEFAULT_MAX_INBOUND_FRAME_BYTES: usize = 1 << 20;
/// Default number of child stderr bytes retained for diagnostics (16 KiB).
const DEFAULT_STDERR_TAIL_MAX_BYTES: usize = 16 << 10;
21
/// Describes the child process a [`StdioTransport`] should launch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StdioProcessSpec {
    /// Executable handed to `Command::new`.
    pub program: PathBuf,
    /// Arguments passed to the program, in order.
    pub args: Vec<String>,
    /// Environment variables set on the child's command.
    pub env: HashMap<String, String>,
    /// Working directory for the child; when `None` the command's directory is left untouched.
    pub cwd: Option<PathBuf>,
}

impl StdioProcessSpec {
    /// Creates a spec for `program` with no arguments, no extra environment
    /// variables and no working-directory override.
    pub fn new(program: impl Into<PathBuf>) -> Self {
        let program: PathBuf = program.into();
        Self {
            program,
            args: Vec::default(),
            env: HashMap::default(),
            cwd: Option::None,
        }
    }
}
40
41#[derive(Clone, Copy, Debug, PartialEq, Eq)]
42pub struct StdioTransportConfig {
43 pub read_channel_capacity: usize,
44 pub write_channel_capacity: usize,
45 pub max_inbound_frame_bytes: usize,
46 pub stderr_tail_max_bytes: usize,
47}
48
49impl Default for StdioTransportConfig {
50 fn default() -> Self {
51 Self {
52 read_channel_capacity: 1024,
53 write_channel_capacity: 1024,
54 max_inbound_frame_bytes: DEFAULT_MAX_INBOUND_FRAME_BYTES,
55 stderr_tail_max_bytes: DEFAULT_STDERR_TAIL_MAX_BYTES,
56 }
57 }
58}
59
/// Summary returned once a transport has been fully shut down.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportJoinResult {
    /// Exit status reported by the child process.
    pub exit_status: ExitStatus,
    /// Number of stdout lines the reader rejected (invalid JSON or over the frame limit).
    pub malformed_line_count: u64,
    /// Snapshot of the captured child stderr tail, when the snapshot helper yields one.
    pub stderr_tail: Option<String>,
}
66
67pub struct StdioTransport {
68 write_tx: Option<mpsc::Sender<Value>>,
69 read_rx: Option<mpsc::Receiver<Value>>,
70 malformed_line_count: Arc<AtomicU64>,
71 stderr_diagnostics: Arc<StderrDiagnostics>,
72 reader_task: Option<JoinHandle<std::io::Result<()>>>,
73 writer_task: Option<JoinHandle<std::io::Result<()>>>,
74 stderr_task: Option<JoinHandle<std::io::Result<()>>>,
75 child: Option<Child>,
76 child_exit_status: Option<ExitStatus>,
77}
78
79#[derive(Default)]
80struct StderrDiagnostics {
81 tail: Mutex<Vec<u8>>,
82}
83
84impl StderrDiagnostics {
85 fn append_chunk(&self, chunk: &[u8], max_tail_bytes: usize) {
86 let mut tail = match self.tail.lock() {
87 Ok(guard) => guard,
88 Err(poisoned) => poisoned.into_inner(),
89 };
90 tail.extend_from_slice(chunk);
91 trim_tail_bytes(&mut tail, max_tail_bytes);
92 }
93
94 fn snapshot(&self) -> Option<String> {
95 let tail = match self.tail.lock() {
96 Ok(guard) => guard,
97 Err(poisoned) => poisoned.into_inner(),
98 };
99 normalize_text_tail(&tail)
100 }
101
102 fn has_data(&self) -> bool {
103 let tail = match self.tail.lock() {
104 Ok(guard) => guard,
105 Err(poisoned) => poisoned.into_inner(),
106 };
107 !tail.is_empty()
108 }
109}
110
111impl StdioTransport {
112 pub async fn spawn(
113 spec: StdioProcessSpec,
114 config: StdioTransportConfig,
115 ) -> Result<Self, RuntimeError> {
116 validate_positive_capacity("read_channel_capacity", config.read_channel_capacity)?;
117 validate_positive_capacity("write_channel_capacity", config.write_channel_capacity)?;
118 validate_positive_capacity("max_inbound_frame_bytes", config.max_inbound_frame_bytes)?;
119 validate_positive_capacity("stderr_tail_max_bytes", config.stderr_tail_max_bytes)?;
120
121 let mut command = Command::new(&spec.program);
122 command
123 .args(&spec.args)
124 .stdin(Stdio::piped())
125 .stdout(Stdio::piped())
126 .stderr(Stdio::piped());
127
128 if let Some(cwd) = &spec.cwd {
129 command.current_dir(cwd);
130 }
131
132 for (key, value) in &spec.env {
133 command.env(key, value);
134 }
135
136 let mut child = command
137 .spawn()
138 .map_err(|err| RuntimeError::Internal(format!("failed to spawn child: {err}")))?;
139
140 let stdin = child.stdin.take().ok_or_else(|| {
141 RuntimeError::Internal("failed to acquire child stdin pipe".to_owned())
142 })?;
143 let stdout = child.stdout.take().ok_or_else(|| {
144 RuntimeError::Internal("failed to acquire child stdout pipe".to_owned())
145 })?;
146 let stderr = child.stderr.take().ok_or_else(|| {
147 RuntimeError::Internal("failed to acquire child stderr pipe".to_owned())
148 })?;
149
150 let (write_tx, write_rx) = mpsc::channel(config.write_channel_capacity);
151 let (read_tx, read_rx) = mpsc::channel(config.read_channel_capacity);
152 let malformed_line_count = Arc::new(AtomicU64::new(0));
153 let malformed_line_count_clone = Arc::clone(&malformed_line_count);
154 let stderr_diagnostics = Arc::new(StderrDiagnostics::default());
155 let stderr_diagnostics_clone = Arc::clone(&stderr_diagnostics);
156
157 let reader_task = tokio::spawn(reader_loop(
158 stdout,
159 read_tx,
160 malformed_line_count_clone,
161 config.max_inbound_frame_bytes,
162 ));
163 let writer_task = tokio::spawn(writer_loop(write_rx, stdin));
164 let stderr_task = tokio::spawn(stderr_loop(
165 stderr,
166 stderr_diagnostics_clone,
167 config.stderr_tail_max_bytes,
168 ));
169
170 Ok(Self {
171 write_tx: Some(write_tx),
172 read_rx: Some(read_rx),
173 malformed_line_count,
174 stderr_diagnostics,
175 reader_task: Some(reader_task),
176 writer_task: Some(writer_task),
177 stderr_task: Some(stderr_task),
178 child: Some(child),
179 child_exit_status: None,
180 })
181 }
182
183 pub fn write_tx(&self) -> Result<mpsc::Sender<Value>, RuntimeError> {
184 self.write_tx
185 .as_ref()
186 .cloned()
187 .ok_or_else(|| RuntimeError::Internal("write sender missing from transport".to_owned()))
188 }
189
190 pub fn take_read_rx(&mut self) -> Result<mpsc::Receiver<Value>, RuntimeError> {
191 self.read_rx.take().ok_or_else(|| {
192 RuntimeError::Internal("read receiver already taken from transport".to_owned())
193 })
194 }
195
196 pub fn malformed_line_count(&self) -> u64 {
197 self.malformed_line_count.load(Ordering::Relaxed)
198 }
199
200 pub fn stderr_tail_snapshot(&self) -> Option<String> {
203 self.stderr_diagnostics.snapshot()
204 }
205
206 pub fn try_wait_exit(&mut self) -> Result<Option<ExitStatus>, RuntimeError> {
209 if let Some(status) = self.child_exit_status {
210 return Ok(Some(status));
211 }
212
213 let Some(child) = self.child.as_mut() else {
214 return Ok(None);
215 };
216 let status = child
217 .try_wait()
218 .map_err(|err| RuntimeError::Internal(format!("child try_wait failed: {err}")))?;
219 if let Some(status) = status {
220 self.child_exit_status = Some(status);
221 return Ok(Some(status));
222 }
223 Ok(None)
224 }
225
226 pub async fn join(mut self) -> Result<TransportJoinResult, RuntimeError> {
227 let malformed_line_count = self.malformed_line_count();
228
229 drop(self.read_rx.take());
230 drop(self.write_tx.take());
231
232 await_io_task(
233 self.writer_task.take(),
234 "writer",
235 self.stderr_diagnostics.as_ref(),
236 )
237 .await?;
238 await_io_task(
239 self.reader_task.take(),
240 "reader",
241 self.stderr_diagnostics.as_ref(),
242 )
243 .await?;
244 let exit_status = wait_child_exit(&mut self).await?;
245 await_io_task(
246 self.stderr_task.take(),
247 "stderr-reader",
248 self.stderr_diagnostics.as_ref(),
249 )
250 .await?;
251
252 Ok(TransportJoinResult {
253 exit_status,
254 malformed_line_count,
255 stderr_tail: self.stderr_diagnostics.snapshot(),
256 })
257 }
258
259 pub async fn terminate_and_join(
264 mut self,
265 flush_timeout: Duration,
266 terminate_grace: Duration,
267 ) -> Result<TransportJoinResult, RuntimeError> {
268 let malformed_line_count = self.malformed_line_count();
269
270 drop(self.read_rx.take());
271 drop(self.write_tx.take());
272
273 let mut writer_task = self
274 .writer_task
275 .take()
276 .ok_or_else(|| RuntimeError::Internal("writer task missing in transport".to_owned()))?;
277
278 let writer_join = timeout(flush_timeout, &mut writer_task).await;
279 let writer_result = match writer_join {
280 Ok(joined) => joined,
281 Err(_) => {
282 wait_child_exit_with_grace(&mut self, terminate_grace).await?;
285 writer_task.await
286 }
287 }
288 .map_err(|err| {
289 runtime_internal_with_stderr(
290 format!("writer task join failed: {err}"),
291 self.stderr_diagnostics.as_ref(),
292 )
293 })?;
294
295 if let Err(err) = writer_result {
296 return Err(runtime_internal_with_stderr(
297 format!("writer task failed: {err}"),
298 self.stderr_diagnostics.as_ref(),
299 ));
300 }
301
302 let exit_status = wait_child_exit_with_grace(&mut self, terminate_grace).await?;
303 await_io_task(
304 self.reader_task.take(),
305 "reader",
306 self.stderr_diagnostics.as_ref(),
307 )
308 .await?;
309 await_io_task(
310 self.stderr_task.take(),
311 "stderr-reader",
312 self.stderr_diagnostics.as_ref(),
313 )
314 .await?;
315
316 Ok(TransportJoinResult {
317 exit_status,
318 malformed_line_count,
319 stderr_tail: self.stderr_diagnostics.snapshot(),
320 })
321 }
322}
323
324async fn await_io_task(
325 task: Option<JoinHandle<std::io::Result<()>>>,
326 label: &str,
327 stderr_diagnostics: &StderrDiagnostics,
328) -> Result<(), RuntimeError> {
329 let Some(task) = task else {
330 return Err(runtime_internal_with_stderr(
331 format!("{label} task missing in transport"),
332 stderr_diagnostics,
333 ));
334 };
335
336 let joined = task.await;
337 let task_result = joined.map_err(|err| {
338 runtime_internal_with_stderr(
339 format!("{label} task join failed: {err}"),
340 stderr_diagnostics,
341 )
342 })?;
343 if let Err(err) = task_result {
344 return Err(runtime_internal_with_stderr(
345 format!("{label} task failed: {err}"),
346 stderr_diagnostics,
347 ));
348 }
349 Ok(())
350}
351
352fn runtime_internal_with_stderr(
353 message: String,
354 stderr_diagnostics: &StderrDiagnostics,
355) -> RuntimeError {
356 if stderr_diagnostics.has_data() {
357 RuntimeError::Internal(format!("{message}; child stderr tail captured"))
358 } else {
359 RuntimeError::Internal(message)
360 }
361}
362
363async fn wait_child_exit(transport: &mut StdioTransport) -> Result<ExitStatus, RuntimeError> {
364 wait_child_exit_inner(transport, None).await
365}
366
367async fn wait_child_exit_with_grace(
368 transport: &mut StdioTransport,
369 terminate_grace: Duration,
370) -> Result<ExitStatus, RuntimeError> {
371 wait_child_exit_inner(transport, Some(terminate_grace)).await
372}
373
374async fn wait_child_exit_inner(
375 transport: &mut StdioTransport,
376 terminate_grace: Option<Duration>,
377) -> Result<ExitStatus, RuntimeError> {
378 if let Some(status) = transport.try_wait_exit()? {
379 return Ok(status);
380 }
381
382 let child = transport.child.as_mut().ok_or_else(|| {
383 runtime_internal_with_stderr(
384 "child handle missing in transport".to_owned(),
385 transport.stderr_diagnostics.as_ref(),
386 )
387 })?;
388
389 let status = match terminate_grace {
390 None => child.wait().await.map_err(|err| {
391 runtime_internal_with_stderr(
392 format!("child wait failed: {err}"),
393 transport.stderr_diagnostics.as_ref(),
394 )
395 })?,
396 Some(grace) => match timeout(grace, child.wait()).await {
397 Ok(waited) => waited.map_err(|err| {
398 runtime_internal_with_stderr(
399 format!("child wait failed: {err}"),
400 transport.stderr_diagnostics.as_ref(),
401 )
402 })?,
403 Err(_) => {
404 child.kill().await.map_err(|err| {
405 runtime_internal_with_stderr(
406 format!("child kill failed: {err}"),
407 transport.stderr_diagnostics.as_ref(),
408 )
409 })?;
410 child.wait().await.map_err(|err| {
411 runtime_internal_with_stderr(
412 format!("child wait after kill failed: {err}"),
413 transport.stderr_diagnostics.as_ref(),
414 )
415 })?
416 }
417 },
418 };
419 transport.child_exit_status = Some(status);
420 Ok(status)
421}
422
423async fn reader_loop(
426 stdout: ChildStdout,
427 inbound_tx: mpsc::Sender<Value>,
428 malformed_line_count: Arc<AtomicU64>,
429 max_inbound_frame_bytes: usize,
430) -> std::io::Result<()> {
431 let mut reader = BufReader::new(stdout);
432 let mut line = Vec::<u8>::with_capacity(4096);
433
434 loop {
435 line.clear();
436 let read = {
437 let mut limited_reader = (&mut reader).take((max_inbound_frame_bytes + 1) as u64);
438 limited_reader.read_until(b'\n', &mut line).await?
439 };
440 if read == 0 {
441 break;
442 }
443
444 if line.len() > max_inbound_frame_bytes {
445 malformed_line_count.fetch_add(1, Ordering::Relaxed);
446 if !line.ends_with(b"\n") {
447 discard_until_newline(&mut reader).await?;
448 }
449 continue;
450 }
451
452 let raw = trim_ascii_line_endings(line.as_slice());
453 if raw.is_empty() {
454 continue;
455 }
456
457 match serde_json::from_slice::<Value>(raw) {
458 Ok(json) => {
459 if inbound_tx.send(json).await.is_err() {
460 break;
461 }
462 }
463 Err(_) => {
464 malformed_line_count.fetch_add(1, Ordering::Relaxed);
465 }
466 }
467 }
468
469 Ok(())
470}
471
472async fn discard_until_newline(reader: &mut BufReader<ChildStdout>) -> std::io::Result<()> {
473 loop {
474 let (consume_len, found_newline) = {
475 let buf = reader.fill_buf().await?;
476 if buf.is_empty() {
477 return Ok(());
478 }
479 match buf.iter().position(|byte| *byte == b'\n') {
480 Some(pos) => (pos + 1, true),
481 None => (buf.len(), false),
482 }
483 };
484 reader.consume(consume_len);
485 if found_newline {
486 return Ok(());
487 }
488 }
489}
490
491async fn stderr_loop(
492 stderr: ChildStderr,
493 diagnostics: Arc<StderrDiagnostics>,
494 max_tail_bytes: usize,
495) -> std::io::Result<()> {
496 let mut reader = BufReader::new(stderr);
497 let mut chunk = [0u8; 4096];
498
499 loop {
500 let read = reader.read(&mut chunk).await?;
501 if read == 0 {
502 break;
503 }
504 diagnostics.append_chunk(&chunk[..read], max_tail_bytes);
505 }
506
507 Ok(())
508}
509
510async fn writer_loop(
513 mut outbound_rx: mpsc::Receiver<Value>,
514 mut stdin: ChildStdin,
515) -> std::io::Result<()> {
516 let mut frame = Vec::<u8>::with_capacity(4096);
517
518 while let Some(json) = outbound_rx.recv().await {
519 frame.clear();
520
521 serde_json::to_writer(&mut frame, &json).map_err(|err| {
522 std::io::Error::new(
523 std::io::ErrorKind::InvalidData,
524 format!("failed to serialize outbound json: {err}"),
525 )
526 })?;
527 frame.push(b'\n');
528
529 if let Err(err) = stdin.write_all(&frame).await {
530 if err.kind() == std::io::ErrorKind::BrokenPipe {
531 return Ok(());
532 }
533 return Err(err);
534 }
535 }
536
537 if let Err(err) = stdin.flush().await {
538 if err.kind() == std::io::ErrorKind::BrokenPipe {
539 return Ok(());
540 }
541 return Err(err);
542 }
543
544 Ok(())
545}
546
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use serde_json::json;
    use tokio::time::timeout;

    use super::*;

    /// Per-message receive deadline used by the short-running tests.
    const RECV_DEADLINE: Duration = Duration::from_secs(2);

    /// Builds a spec that runs `script` through `sh -c`.
    fn shell_spec(script: &str) -> StdioProcessSpec {
        StdioProcessSpec {
            args: vec!["-c".to_owned(), script.to_owned()],
            ..StdioProcessSpec::new("sh")
        }
    }

    /// Spawns `script` under `sh` and panics if the transport cannot start.
    async fn spawn_shell(script: &str, config: StdioTransportConfig) -> StdioTransport {
        StdioTransport::spawn(shell_spec(script), config)
            .await
            .expect("spawn")
    }

    /// Collects every inbound message until the reader closes the channel.
    async fn recv_until_closed(read_rx: &mut mpsc::Receiver<Value>) -> Vec<Value> {
        let mut received = Vec::new();
        loop {
            match timeout(RECV_DEADLINE, read_rx.recv())
                .await
                .expect("recv timeout")
            {
                Some(msg) => received.push(msg),
                None => return received,
            }
        }
    }

    /// Joins the transport and checks for a successful exit, the expected
    /// malformed-line count and an absent stderr tail.
    async fn join_quietly(transport: StdioTransport, expected_malformed: u64) {
        let joined = transport.join().await.expect("join");
        assert!(joined.exit_status.success());
        assert_eq!(joined.malformed_line_count, expected_malformed);
        assert!(joined.stderr_tail.is_none());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn spawn_rejects_zero_capacity_channels() {
        let cases = [
            (0, 16, "must reject zero read channel capacity"),
            (16, 0, "must reject zero write channel capacity"),
        ];

        for (read_channel_capacity, write_channel_capacity, why) in cases {
            let config = StdioTransportConfig {
                read_channel_capacity,
                write_channel_capacity,
                ..StdioTransportConfig::default()
            };
            let err = StdioTransport::spawn(shell_spec("cat"), config)
                .await
                .err()
                .expect(why);
            assert!(matches!(err, RuntimeError::InvalidConfig(_)));
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn writer_and_reader_roundtrip() {
        let mut transport = spawn_shell("cat", StdioTransportConfig::default()).await;
        let mut read_rx = transport.take_read_rx().expect("take rx");
        let write_tx = transport.write_tx().expect("take tx");

        let outbound = [
            (json!({"method":"ping","params":{"n":1}}), "send #1"),
            (json!({"method":"pong","params":{"n":2}}), "send #2"),
        ];
        for (frame, label) in outbound {
            write_tx.send(frame).await.expect(label);
        }
        drop(write_tx);

        let first = timeout(RECV_DEADLINE, read_rx.recv())
            .await
            .expect("recv timeout #1")
            .expect("stream closed #1");
        let second = timeout(RECV_DEADLINE, read_rx.recv())
            .await
            .expect("recv timeout #2")
            .expect("stream closed #2");

        assert_eq!(first["method"], "ping");
        assert_eq!(second["method"], "pong");

        drop(read_rx);
        join_quietly(transport, 0).await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn reader_skips_malformed_lines() {
        let script =
            r#"printf '%s\n' '{"method":"ok"}' 'not-json' '{"id":1,"result":{}}' '{broken'"#;
        let mut transport = spawn_shell(script, StdioTransportConfig::default()).await;
        let mut read_rx = transport.take_read_rx().expect("take rx");

        let parsed = recv_until_closed(&mut read_rx).await;

        assert_eq!(parsed.len(), 2);
        assert_eq!(transport.malformed_line_count(), 2);

        drop(read_rx);
        join_quietly(transport, 2).await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn reader_survives_100k_lines_stream() {
        let script = r#"
i=0
while [ "$i" -lt 100000 ]; do
  printf '{"method":"tick","params":{"n":%s}}\n' "$i"
  i=$((i+1))
done
"#;
        let mut transport = spawn_shell(script, StdioTransportConfig::default()).await;
        let mut read_rx = transport.take_read_rx().expect("take rx");

        // Count instead of collecting so the test does not hold 100k values.
        let mut count = 0usize;
        while timeout(Duration::from_secs(20), read_rx.recv())
            .await
            .expect("recv timeout")
            .is_some()
        {
            count += 1;
        }

        assert_eq!(count, 100_000);
        assert_eq!(transport.malformed_line_count(), 0);

        drop(read_rx);
        join_quietly(transport, 0).await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn reader_drops_oversized_frame_and_recovers_next_frame() {
        let script = r#"
long=$(head -c 2048 </dev/zero | tr '\0' 'a')
printf '{"method":"%s"}\n' "$long"
printf '{"id":1,"result":{"ok":true}}\n'
"#;
        let config = StdioTransportConfig {
            max_inbound_frame_bytes: 256,
            ..StdioTransportConfig::default()
        };
        let mut transport = spawn_shell(script, config).await;
        let mut read_rx = transport.take_read_rx().expect("take rx");

        let parsed = recv_until_closed(&mut read_rx).await;

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0]["id"], 1);
        assert_eq!(transport.malformed_line_count(), 1);

        drop(read_rx);
        join_quietly(transport, 1).await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn join_exposes_child_stderr_tail_for_diagnostics() {
        let script = r#"
printf 'diag-line-1\n' >&2
printf 'diag-line-2\n' >&2
"#;
        let config = StdioTransportConfig {
            stderr_tail_max_bytes: 128,
            ..StdioTransportConfig::default()
        };
        let mut transport = spawn_shell(script, config).await;
        drop(transport.take_read_rx().expect("take rx"));

        let joined = transport.join().await.expect("join");
        assert!(joined.exit_status.success());

        let stderr_tail = joined.stderr_tail.expect("stderr tail must be captured");
        for expected in ["diag-line-1", "diag-line-2"] {
            assert!(stderr_tail.contains(expected));
        }
    }

    #[test]
    fn runtime_internal_with_stderr_redacts_tail_contents() {
        let diagnostics = StderrDiagnostics::default();
        diagnostics.append_chunk(b"secret-token\n", 128);

        let message =
            match runtime_internal_with_stderr("transport failed".to_owned(), &diagnostics) {
                RuntimeError::Internal(message) => message,
                _ => panic!("expected internal runtime error"),
            };

        assert!(message.contains("transport failed"));
        assert!(message.contains("child stderr tail captured"));
        assert!(!message.contains("secret-token"));
    }
}