Skip to main content

codex_runtime/runtime/
transport.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::process::{ExitStatus, Stdio};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6
7use serde_json::Value;
8use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
9use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
10use tokio::sync::mpsc;
11use tokio::task::JoinHandle;
12use tokio::time::{timeout, Duration};
13
14use crate::runtime::core::io_policy::{
15    normalize_text_tail, trim_ascii_line_endings, trim_tail_bytes, validate_positive_capacity,
16};
17use crate::runtime::errors::RuntimeError;
18
/// Default cap on a single inbound line read from the child's stdout (1 MiB).
const DEFAULT_MAX_INBOUND_FRAME_BYTES: usize = 1024 * 1024;
/// Default cap on the retained child stderr tail (16 KiB).
const DEFAULT_STDERR_TAIL_MAX_BYTES: usize = 16 * 1024;
21
/// Describes the child process that `StdioTransport::spawn` launches.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StdioProcessSpec {
    /// Executable to run.
    pub program: PathBuf,
    /// Arguments passed to the program, in order.
    pub args: Vec<String>,
    /// Environment variables set on the child command.
    pub env: HashMap<String, String>,
    /// Working directory for the child; `None` leaves the command's directory unset.
    pub cwd: Option<PathBuf>,
}

impl StdioProcessSpec {
    /// Creates a spec for `program` with no arguments, no environment entries
    /// and no working-directory override.
    pub fn new(program: impl Into<PathBuf>) -> Self {
        let program = program.into();
        Self {
            program,
            args: Vec::default(),
            env: HashMap::default(),
            cwd: None,
        }
    }
}
40
41#[derive(Clone, Copy, Debug, PartialEq, Eq)]
42pub struct StdioTransportConfig {
43    pub read_channel_capacity: usize,
44    pub write_channel_capacity: usize,
45    pub max_inbound_frame_bytes: usize,
46    pub stderr_tail_max_bytes: usize,
47}
48
49impl Default for StdioTransportConfig {
50    fn default() -> Self {
51        Self {
52            read_channel_capacity: 1024,
53            write_channel_capacity: 1024,
54            max_inbound_frame_bytes: DEFAULT_MAX_INBOUND_FRAME_BYTES,
55            stderr_tail_max_bytes: DEFAULT_STDERR_TAIL_MAX_BYTES,
56        }
57    }
58}
59
/// Summary returned by `StdioTransport::join` and
/// `StdioTransport::terminate_and_join` once the child has exited.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TransportJoinResult {
    /// Exit status reported for the child process.
    pub exit_status: ExitStatus,
    /// Number of inbound lines the reader dropped (unparseable or oversized).
    pub malformed_line_count: u64,
    /// Snapshot of the captured child stderr tail, as produced by
    /// `StderrDiagnostics::snapshot`.
    pub stderr_tail: Option<String>,
}
66
/// Handle to a spawned child process plus the background tasks that move data
/// over its stdin, stdout and stderr pipes.
pub struct StdioTransport {
    // Sender side of the outbound queue; cloned out by `write_tx()` and
    // dropped by the join paths.
    write_tx: Option<mpsc::Sender<Value>>,
    // Receiver side of the inbound queue; handed to the caller by
    // `take_read_rx()` or dropped by the join paths.
    read_rx: Option<mpsc::Receiver<Value>>,
    // Counter shared with the reader task.
    malformed_line_count: Arc<AtomicU64>,
    // Stderr tail buffer shared with the stderr task.
    stderr_diagnostics: Arc<StderrDiagnostics>,
    // Background task handles; each is taken exactly once when joined.
    reader_task: Option<JoinHandle<std::io::Result<()>>>,
    writer_task: Option<JoinHandle<std::io::Result<()>>>,
    stderr_task: Option<JoinHandle<std::io::Result<()>>>,
    // Child process handle used for status probes, waiting and killing.
    child: Option<Child>,
    // Exit status cached after the first successful observation.
    child_exit_status: Option<ExitStatus>,
}
78
79#[derive(Default)]
80struct StderrDiagnostics {
81    tail: Mutex<Vec<u8>>,
82}
83
84impl StderrDiagnostics {
85    fn append_chunk(&self, chunk: &[u8], max_tail_bytes: usize) {
86        let mut tail = match self.tail.lock() {
87            Ok(guard) => guard,
88            Err(poisoned) => poisoned.into_inner(),
89        };
90        tail.extend_from_slice(chunk);
91        trim_tail_bytes(&mut tail, max_tail_bytes);
92    }
93
94    fn snapshot(&self) -> Option<String> {
95        let tail = match self.tail.lock() {
96            Ok(guard) => guard,
97            Err(poisoned) => poisoned.into_inner(),
98        };
99        normalize_text_tail(&tail)
100    }
101
102    fn has_data(&self) -> bool {
103        let tail = match self.tail.lock() {
104            Ok(guard) => guard,
105            Err(poisoned) => poisoned.into_inner(),
106        };
107        !tail.is_empty()
108    }
109}
110
111impl StdioTransport {
112    pub async fn spawn(
113        spec: StdioProcessSpec,
114        config: StdioTransportConfig,
115    ) -> Result<Self, RuntimeError> {
116        validate_positive_capacity("read_channel_capacity", config.read_channel_capacity)?;
117        validate_positive_capacity("write_channel_capacity", config.write_channel_capacity)?;
118        validate_positive_capacity("max_inbound_frame_bytes", config.max_inbound_frame_bytes)?;
119        validate_positive_capacity("stderr_tail_max_bytes", config.stderr_tail_max_bytes)?;
120
121        let mut command = Command::new(&spec.program);
122        command
123            .args(&spec.args)
124            .stdin(Stdio::piped())
125            .stdout(Stdio::piped())
126            .stderr(Stdio::piped());
127
128        if let Some(cwd) = &spec.cwd {
129            command.current_dir(cwd);
130        }
131
132        for (key, value) in &spec.env {
133            command.env(key, value);
134        }
135
136        let mut child = command
137            .spawn()
138            .map_err(|err| RuntimeError::Internal(format!("failed to spawn child: {err}")))?;
139
140        let stdin = child.stdin.take().ok_or_else(|| {
141            RuntimeError::Internal("failed to acquire child stdin pipe".to_owned())
142        })?;
143        let stdout = child.stdout.take().ok_or_else(|| {
144            RuntimeError::Internal("failed to acquire child stdout pipe".to_owned())
145        })?;
146        let stderr = child.stderr.take().ok_or_else(|| {
147            RuntimeError::Internal("failed to acquire child stderr pipe".to_owned())
148        })?;
149
150        let (write_tx, write_rx) = mpsc::channel(config.write_channel_capacity);
151        let (read_tx, read_rx) = mpsc::channel(config.read_channel_capacity);
152        let malformed_line_count = Arc::new(AtomicU64::new(0));
153        let malformed_line_count_clone = Arc::clone(&malformed_line_count);
154        let stderr_diagnostics = Arc::new(StderrDiagnostics::default());
155        let stderr_diagnostics_clone = Arc::clone(&stderr_diagnostics);
156
157        let reader_task = tokio::spawn(reader_loop(
158            stdout,
159            read_tx,
160            malformed_line_count_clone,
161            config.max_inbound_frame_bytes,
162        ));
163        let writer_task = tokio::spawn(writer_loop(write_rx, stdin));
164        let stderr_task = tokio::spawn(stderr_loop(
165            stderr,
166            stderr_diagnostics_clone,
167            config.stderr_tail_max_bytes,
168        ));
169
170        Ok(Self {
171            write_tx: Some(write_tx),
172            read_rx: Some(read_rx),
173            malformed_line_count,
174            stderr_diagnostics,
175            reader_task: Some(reader_task),
176            writer_task: Some(writer_task),
177            stderr_task: Some(stderr_task),
178            child: Some(child),
179            child_exit_status: None,
180        })
181    }
182
183    pub fn write_tx(&self) -> Result<mpsc::Sender<Value>, RuntimeError> {
184        self.write_tx
185            .as_ref()
186            .cloned()
187            .ok_or_else(|| RuntimeError::Internal("write sender missing from transport".to_owned()))
188    }
189
190    pub fn take_read_rx(&mut self) -> Result<mpsc::Receiver<Value>, RuntimeError> {
191        self.read_rx.take().ok_or_else(|| {
192            RuntimeError::Internal("read receiver already taken from transport".to_owned())
193        })
194    }
195
    /// Current value of the counter the reader task increments for every
    /// inbound line it drops (unparseable or oversized).
    pub fn malformed_line_count(&self) -> u64 {
        // Relaxed load: the value is read on its own, not used to order other memory.
        self.malformed_line_count.load(Ordering::Relaxed)
    }
199
    /// Latest child stderr tail snapshot, as produced by `StderrDiagnostics::snapshot`.
    /// Takes the diagnostics lock for the duration of the call.
    pub fn stderr_tail_snapshot(&self) -> Option<String> {
        self.stderr_diagnostics.snapshot()
    }
205
206    /// Non-blocking child status probe.
207    /// Allocation: none. Complexity: O(1).
208    pub fn try_wait_exit(&mut self) -> Result<Option<ExitStatus>, RuntimeError> {
209        if let Some(status) = self.child_exit_status {
210            return Ok(Some(status));
211        }
212
213        let Some(child) = self.child.as_mut() else {
214            return Ok(None);
215        };
216        let status = child
217            .try_wait()
218            .map_err(|err| RuntimeError::Internal(format!("child try_wait failed: {err}")))?;
219        if let Some(status) = status {
220            self.child_exit_status = Some(status);
221            return Ok(Some(status));
222        }
223        Ok(None)
224    }
225
226    pub async fn join(mut self) -> Result<TransportJoinResult, RuntimeError> {
227        let malformed_line_count = self.malformed_line_count();
228
229        drop(self.read_rx.take());
230        drop(self.write_tx.take());
231
232        await_io_task(
233            self.writer_task.take(),
234            "writer",
235            self.stderr_diagnostics.as_ref(),
236        )
237        .await?;
238        await_io_task(
239            self.reader_task.take(),
240            "reader",
241            self.stderr_diagnostics.as_ref(),
242        )
243        .await?;
244        let exit_status = wait_child_exit(&mut self).await?;
245        await_io_task(
246            self.stderr_task.take(),
247            "stderr-reader",
248            self.stderr_diagnostics.as_ref(),
249        )
250        .await?;
251
252        Ok(TransportJoinResult {
253            exit_status,
254            malformed_line_count,
255            stderr_tail: self.stderr_diagnostics.snapshot(),
256        })
257    }
258
259    /// Shutdown path used by runtime.
260    /// It closes outbound queue, attempts bounded writer flush,
261    /// waits for graceful child exit and then force-kills on timeout, and joins reader.
262    /// Allocation: none. Complexity: O(1) control + O(bytes) flush/write drain.
263    pub async fn terminate_and_join(
264        mut self,
265        flush_timeout: Duration,
266        terminate_grace: Duration,
267    ) -> Result<TransportJoinResult, RuntimeError> {
268        let malformed_line_count = self.malformed_line_count();
269
270        drop(self.read_rx.take());
271        drop(self.write_tx.take());
272
273        let mut writer_task = self
274            .writer_task
275            .take()
276            .ok_or_else(|| RuntimeError::Internal("writer task missing in transport".to_owned()))?;
277
278        let writer_join = timeout(flush_timeout, &mut writer_task).await;
279        let writer_result = match writer_join {
280            Ok(joined) => joined,
281            Err(_) => {
282                // Flush timed out: continue shutdown by terminating child,
283                // then rejoin writer to avoid detached background tasks.
284                wait_child_exit_with_grace(&mut self, terminate_grace).await?;
285                writer_task.await
286            }
287        }
288        .map_err(|err| {
289            runtime_internal_with_stderr(
290                format!("writer task join failed: {err}"),
291                self.stderr_diagnostics.as_ref(),
292            )
293        })?;
294
295        if let Err(err) = writer_result {
296            return Err(runtime_internal_with_stderr(
297                format!("writer task failed: {err}"),
298                self.stderr_diagnostics.as_ref(),
299            ));
300        }
301
302        let exit_status = wait_child_exit_with_grace(&mut self, terminate_grace).await?;
303        await_io_task(
304            self.reader_task.take(),
305            "reader",
306            self.stderr_diagnostics.as_ref(),
307        )
308        .await?;
309        await_io_task(
310            self.stderr_task.take(),
311            "stderr-reader",
312            self.stderr_diagnostics.as_ref(),
313        )
314        .await?;
315
316        Ok(TransportJoinResult {
317            exit_status,
318            malformed_line_count,
319            stderr_tail: self.stderr_diagnostics.snapshot(),
320        })
321    }
322}
323
324async fn await_io_task(
325    task: Option<JoinHandle<std::io::Result<()>>>,
326    label: &str,
327    stderr_diagnostics: &StderrDiagnostics,
328) -> Result<(), RuntimeError> {
329    let Some(task) = task else {
330        return Err(runtime_internal_with_stderr(
331            format!("{label} task missing in transport"),
332            stderr_diagnostics,
333        ));
334    };
335
336    let joined = task.await;
337    let task_result = joined.map_err(|err| {
338        runtime_internal_with_stderr(
339            format!("{label} task join failed: {err}"),
340            stderr_diagnostics,
341        )
342    })?;
343    if let Err(err) = task_result {
344        return Err(runtime_internal_with_stderr(
345            format!("{label} task failed: {err}"),
346            stderr_diagnostics,
347        ));
348    }
349    Ok(())
350}
351
352fn runtime_internal_with_stderr(
353    message: String,
354    stderr_diagnostics: &StderrDiagnostics,
355) -> RuntimeError {
356    if stderr_diagnostics.has_data() {
357        RuntimeError::Internal(format!("{message}; child stderr tail captured"))
358    } else {
359        RuntimeError::Internal(message)
360    }
361}
362
/// Waits for the child to exit without any time limit and without killing it.
async fn wait_child_exit(transport: &mut StdioTransport) -> Result<ExitStatus, RuntimeError> {
    wait_child_exit_inner(transport, None).await
}
366
/// Waits up to `terminate_grace` for the child to exit; once that elapses the
/// child is killed and then waited for.
async fn wait_child_exit_with_grace(
    transport: &mut StdioTransport,
    terminate_grace: Duration,
) -> Result<ExitStatus, RuntimeError> {
    wait_child_exit_inner(transport, Some(terminate_grace)).await
}
373
374async fn wait_child_exit_inner(
375    transport: &mut StdioTransport,
376    terminate_grace: Option<Duration>,
377) -> Result<ExitStatus, RuntimeError> {
378    if let Some(status) = transport.try_wait_exit()? {
379        return Ok(status);
380    }
381
382    let child = transport.child.as_mut().ok_or_else(|| {
383        runtime_internal_with_stderr(
384            "child handle missing in transport".to_owned(),
385            transport.stderr_diagnostics.as_ref(),
386        )
387    })?;
388
389    let status = match terminate_grace {
390        None => child.wait().await.map_err(|err| {
391            runtime_internal_with_stderr(
392                format!("child wait failed: {err}"),
393                transport.stderr_diagnostics.as_ref(),
394            )
395        })?,
396        Some(grace) => match timeout(grace, child.wait()).await {
397            Ok(waited) => waited.map_err(|err| {
398                runtime_internal_with_stderr(
399                    format!("child wait failed: {err}"),
400                    transport.stderr_diagnostics.as_ref(),
401                )
402            })?,
403            Err(_) => {
404                child.kill().await.map_err(|err| {
405                    runtime_internal_with_stderr(
406                        format!("child kill failed: {err}"),
407                        transport.stderr_diagnostics.as_ref(),
408                    )
409                })?;
410                child.wait().await.map_err(|err| {
411                    runtime_internal_with_stderr(
412                        format!("child wait after kill failed: {err}"),
413                        transport.stderr_diagnostics.as_ref(),
414                    )
415                })?
416            }
417        },
418    };
419    transport.child_exit_status = Some(status);
420    Ok(status)
421}
422
423/// Reader loop: one line -> one JSON parse attempt.
424/// Allocation: one reusable byte buffer per task. Complexity: O(line_length) per line.
425async fn reader_loop(
426    stdout: ChildStdout,
427    inbound_tx: mpsc::Sender<Value>,
428    malformed_line_count: Arc<AtomicU64>,
429    max_inbound_frame_bytes: usize,
430) -> std::io::Result<()> {
431    let mut reader = BufReader::new(stdout);
432    let mut line = Vec::<u8>::with_capacity(4096);
433
434    loop {
435        line.clear();
436        let read = {
437            let mut limited_reader = (&mut reader).take((max_inbound_frame_bytes + 1) as u64);
438            limited_reader.read_until(b'\n', &mut line).await?
439        };
440        if read == 0 {
441            break;
442        }
443
444        if line.len() > max_inbound_frame_bytes {
445            malformed_line_count.fetch_add(1, Ordering::Relaxed);
446            if !line.ends_with(b"\n") {
447                discard_until_newline(&mut reader).await?;
448            }
449            continue;
450        }
451
452        let raw = trim_ascii_line_endings(line.as_slice());
453        if raw.is_empty() {
454            continue;
455        }
456
457        match serde_json::from_slice::<Value>(raw) {
458            Ok(json) => {
459                if inbound_tx.send(json).await.is_err() {
460                    break;
461                }
462            }
463            Err(_) => {
464                malformed_line_count.fetch_add(1, Ordering::Relaxed);
465            }
466        }
467    }
468
469    Ok(())
470}
471
472async fn discard_until_newline(reader: &mut BufReader<ChildStdout>) -> std::io::Result<()> {
473    loop {
474        let (consume_len, found_newline) = {
475            let buf = reader.fill_buf().await?;
476            if buf.is_empty() {
477                return Ok(());
478            }
479            match buf.iter().position(|byte| *byte == b'\n') {
480                Some(pos) => (pos + 1, true),
481                None => (buf.len(), false),
482            }
483        };
484        reader.consume(consume_len);
485        if found_newline {
486            return Ok(());
487        }
488    }
489}
490
491async fn stderr_loop(
492    stderr: ChildStderr,
493    diagnostics: Arc<StderrDiagnostics>,
494    max_tail_bytes: usize,
495) -> std::io::Result<()> {
496    let mut reader = BufReader::new(stderr);
497    let mut chunk = [0u8; 4096];
498
499    loop {
500        let read = reader.read(&mut chunk).await?;
501        if read == 0 {
502            break;
503        }
504        diagnostics.append_chunk(&chunk[..read], max_tail_bytes);
505    }
506
507    Ok(())
508}
509
510/// Writer loop: single serialization/write path into child stdin.
511/// Allocation: one reusable byte buffer per task. Complexity: O(frame_size) per message.
512async fn writer_loop(
513    mut outbound_rx: mpsc::Receiver<Value>,
514    mut stdin: ChildStdin,
515) -> std::io::Result<()> {
516    let mut frame = Vec::<u8>::with_capacity(4096);
517
518    while let Some(json) = outbound_rx.recv().await {
519        frame.clear();
520
521        serde_json::to_writer(&mut frame, &json).map_err(|err| {
522            std::io::Error::new(
523                std::io::ErrorKind::InvalidData,
524                format!("failed to serialize outbound json: {err}"),
525            )
526        })?;
527        frame.push(b'\n');
528
529        if let Err(err) = stdin.write_all(&frame).await {
530            if err.kind() == std::io::ErrorKind::BrokenPipe {
531                return Ok(());
532            }
533            return Err(err);
534        }
535    }
536
537    if let Err(err) = stdin.flush().await {
538        if err.kind() == std::io::ErrorKind::BrokenPipe {
539            return Ok(());
540        }
541        return Err(err);
542    }
543
544    Ok(())
545}
546
// Integration-style tests that drive the transport against real `sh` child
// processes, plus one unit test for error-message redaction.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use serde_json::json;
    use tokio::time::timeout;

    use super::*;

    // Builds a spec that runs `script` through `sh -c`.
    fn shell_spec(script: &str) -> StdioProcessSpec {
        let mut spec = StdioProcessSpec::new("sh");
        spec.args = vec!["-c".to_owned(), script.to_owned()];
        spec
    }

    // A zero capacity for either channel must be rejected as `InvalidConfig`.
    #[tokio::test(flavor = "current_thread")]
    async fn spawn_rejects_zero_capacity_channels() {
        let err = match StdioTransport::spawn(
            shell_spec("cat"),
            StdioTransportConfig {
                read_channel_capacity: 0,
                write_channel_capacity: 16,
                ..StdioTransportConfig::default()
            },
        )
        .await
        {
            Ok(_) => panic!("must reject zero read channel capacity"),
            Err(err) => err,
        };
        assert!(matches!(err, RuntimeError::InvalidConfig(_)));

        let err = match StdioTransport::spawn(
            shell_spec("cat"),
            StdioTransportConfig {
                read_channel_capacity: 16,
                write_channel_capacity: 0,
                ..StdioTransportConfig::default()
            },
        )
        .await
        {
            Ok(_) => panic!("must reject zero write channel capacity"),
            Err(err) => err,
        };
        assert!(matches!(err, RuntimeError::InvalidConfig(_)));
    }

    // Values written through the transport come back in order via a `cat` child.
    #[tokio::test(flavor = "current_thread")]
    async fn writer_and_reader_roundtrip() {
        let mut transport =
            StdioTransport::spawn(shell_spec("cat"), StdioTransportConfig::default())
                .await
                .expect("spawn");
        let mut read_rx = transport.take_read_rx().expect("take rx");
        let write_tx = transport.write_tx().expect("take tx");

        write_tx
            .send(json!({"method":"ping","params":{"n":1}}))
            .await
            .expect("send #1");
        write_tx
            .send(json!({"method":"pong","params":{"n":2}}))
            .await
            .expect("send #2");
        drop(write_tx);

        let first = timeout(Duration::from_secs(2), read_rx.recv())
            .await
            .expect("recv timeout #1")
            .expect("stream closed #1");
        let second = timeout(Duration::from_secs(2), read_rx.recv())
            .await
            .expect("recv timeout #2")
            .expect("stream closed #2");

        assert_eq!(first["method"], "ping");
        assert_eq!(second["method"], "pong");

        drop(read_rx);
        let joined = transport.join().await.expect("join");
        assert!(joined.exit_status.success());
        assert_eq!(joined.malformed_line_count, 0);
        assert!(joined.stderr_tail.is_none());
    }

    // Unparseable lines are counted and skipped; valid lines still arrive.
    #[tokio::test(flavor = "current_thread")]
    async fn reader_skips_malformed_lines() {
        let script =
            r#"printf '%s\n' '{"method":"ok"}' 'not-json' '{"id":1,"result":{}}' '{broken'"#;
        let mut transport =
            StdioTransport::spawn(shell_spec(script), StdioTransportConfig::default())
                .await
                .expect("spawn");
        let mut read_rx = transport.take_read_rx().expect("take rx");

        // Drain until the reader task closes the channel.
        let mut parsed = Vec::new();
        while let Some(msg) = timeout(Duration::from_secs(2), read_rx.recv())
            .await
            .expect("recv timeout")
        {
            parsed.push(msg);
        }

        assert_eq!(parsed.len(), 2);
        assert_eq!(transport.malformed_line_count(), 2);

        drop(read_rx);
        let joined = transport.join().await.expect("join");
        assert!(joined.exit_status.success());
        assert_eq!(joined.malformed_line_count, 2);
        assert!(joined.stderr_tail.is_none());
    }

    // A 100k-line stream is delivered completely with no malformed lines.
    #[tokio::test(flavor = "current_thread")]
    async fn reader_survives_100k_lines_stream() {
        let script = r#"
i=0
while [ "$i" -lt 100000 ]; do
  printf '{"method":"tick","params":{"n":%s}}\n' "$i"
  i=$((i+1))
done
"#;
        let mut transport =
            StdioTransport::spawn(shell_spec(script), StdioTransportConfig::default())
                .await
                .expect("spawn");
        let mut read_rx = transport.take_read_rx().expect("take rx");

        let mut count = 0usize;
        while let Some(_msg) = timeout(Duration::from_secs(20), read_rx.recv())
            .await
            .expect("recv timeout")
        {
            count += 1;
        }

        assert_eq!(count, 100_000);
        assert_eq!(transport.malformed_line_count(), 0);

        drop(read_rx);
        let joined = transport.join().await.expect("join");
        assert!(joined.exit_status.success());
        assert_eq!(joined.malformed_line_count, 0);
        assert!(joined.stderr_tail.is_none());
    }

    // A line above the 256-byte limit is dropped and counted once; the frame
    // that follows it is still parsed.
    #[tokio::test(flavor = "current_thread")]
    async fn reader_drops_oversized_frame_and_recovers_next_frame() {
        let script = r#"
long=$(head -c 2048 </dev/zero | tr '\0' 'a')
printf '{"method":"%s"}\n' "$long"
printf '{"id":1,"result":{"ok":true}}\n'
"#;
        let mut transport = StdioTransport::spawn(
            shell_spec(script),
            StdioTransportConfig {
                max_inbound_frame_bytes: 256,
                ..StdioTransportConfig::default()
            },
        )
        .await
        .expect("spawn");
        let mut read_rx = transport.take_read_rx().expect("take rx");

        let mut parsed = Vec::new();
        while let Some(msg) = timeout(Duration::from_secs(2), read_rx.recv())
            .await
            .expect("recv timeout")
        {
            parsed.push(msg);
        }

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0]["id"], 1);
        assert_eq!(transport.malformed_line_count(), 1);

        drop(read_rx);
        let joined = transport.join().await.expect("join");
        assert!(joined.exit_status.success());
        assert_eq!(joined.malformed_line_count, 1);
        assert!(joined.stderr_tail.is_none());
    }

    // Text the child writes to stderr shows up in the join result.
    #[tokio::test(flavor = "current_thread")]
    async fn join_exposes_child_stderr_tail_for_diagnostics() {
        let script = r#"
printf 'diag-line-1\n' >&2
printf 'diag-line-2\n' >&2
"#;
        let mut transport = StdioTransport::spawn(
            shell_spec(script),
            StdioTransportConfig {
                stderr_tail_max_bytes: 128,
                ..StdioTransportConfig::default()
            },
        )
        .await
        .expect("spawn");
        let read_rx = transport.take_read_rx().expect("take rx");
        drop(read_rx);

        let joined = transport.join().await.expect("join");
        assert!(joined.exit_status.success());
        let stderr_tail = joined.stderr_tail.expect("stderr tail must be captured");
        assert!(stderr_tail.contains("diag-line-1"));
        assert!(stderr_tail.contains("diag-line-2"));
    }

    // Error text mentions that stderr was captured but never includes its content.
    #[test]
    fn runtime_internal_with_stderr_redacts_tail_contents() {
        let diagnostics = StderrDiagnostics::default();
        diagnostics.append_chunk(b"secret-token\n", 128);

        let RuntimeError::Internal(message) =
            runtime_internal_with_stderr("transport failed".to_owned(), &diagnostics)
        else {
            panic!("expected internal runtime error");
        };

        assert!(message.contains("transport failed"));
        assert!(message.contains("child stderr tail captured"));
        assert!(!message.contains("secret-token"));
    }
}
771}