codex_app_server_sdk/transport/
stdio.rs1use std::collections::HashMap;
2use std::process::Stdio;
3
4use serde_json::Value;
5use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
6use tokio::process::Command;
7use tokio::sync::mpsc;
8
9use super::TransportHandle;
10use crate::error::ClientError;
11
12pub async fn spawn_stdio_transport(
13 binary: &str,
14 args: &[String],
15 env: &HashMap<String, String>,
16) -> Result<TransportHandle, ClientError> {
17 let mut cmd = Command::new(binary);
18 cmd.args(args)
19 .stdin(Stdio::piped())
20 .stdout(Stdio::piped())
21 .stderr(Stdio::inherit());
22
23 for (key, value) in env {
24 cmd.env(key, value);
25 }
26
27 let mut child = cmd.spawn()?;
28
29 let mut stdin = child
30 .stdin
31 .take()
32 .ok_or_else(|| ClientError::TransportSend("missing child stdin".to_string()))?;
33 let stdout = child
34 .stdout
35 .take()
36 .ok_or_else(|| ClientError::TransportSend("missing child stdout".to_string()))?;
37
38 let (outbound_tx, mut outbound_rx) = mpsc::channel::<Value>(256);
39 let (inbound_tx, inbound_rx) = mpsc::channel::<Result<Value, ClientError>>(1024);
40
41 let inbound_for_writer = inbound_tx.clone();
42 tokio::spawn(async move {
43 while let Some(message) = outbound_rx.recv().await {
44 match serde_json::to_string(&message) {
45 Ok(line) => {
46 if let Err(err) = stdin.write_all(line.as_bytes()).await {
47 let _ = inbound_for_writer.send(Err(ClientError::Io(err))).await;
48 break;
49 }
50 if let Err(err) = stdin.write_all(b"\n").await {
51 let _ = inbound_for_writer.send(Err(ClientError::Io(err))).await;
52 break;
53 }
54 }
55 Err(err) => {
56 let _ = inbound_for_writer
57 .send(Err(ClientError::Serialization(err)))
58 .await;
59 break;
60 }
61 }
62 }
63 });
64
65 let inbound_for_reader = inbound_tx.clone();
66 tokio::spawn(async move {
67 let mut lines = BufReader::new(stdout).lines();
68 loop {
69 match lines.next_line().await {
70 Ok(Some(line)) => match serde_json::from_str::<Value>(&line) {
71 Ok(value) => {
72 if inbound_for_reader.send(Ok(value)).await.is_err() {
73 break;
74 }
75 }
76 Err(err) => {
77 if inbound_for_reader
78 .send(Err(ClientError::InvalidMessage(format!(
79 "failed to parse JSONL frame: {err}"
80 ))))
81 .await
82 .is_err()
83 {
84 break;
85 }
86 }
87 },
88 Ok(None) => {
89 let _ = inbound_for_reader
90 .send(Err(ClientError::TransportClosed))
91 .await;
92 break;
93 }
94 Err(err) => {
95 let _ = inbound_for_reader.send(Err(ClientError::Io(err))).await;
96 break;
97 }
98 }
99 }
100 });
101
102 tokio::spawn(async move {
103 let _ = child.wait().await;
104 });
105
106 Ok(TransportHandle {
107 outbound: outbound_tx,
108 inbound: inbound_rx,
109 })
110}