Skip to main content

codex_app_server_sdk/transport/
stdio.rs

1use std::collections::HashMap;
2use std::process::Stdio;
3
4use serde_json::Value;
5use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
6use tokio::process::Command;
7use tokio::sync::mpsc;
8
9use super::TransportHandle;
10use crate::error::ClientError;
11
12pub async fn spawn_stdio_transport(
13    binary: &str,
14    args: &[String],
15    env: &HashMap<String, String>,
16) -> Result<TransportHandle, ClientError> {
17    let mut cmd = Command::new(binary);
18    cmd.args(args)
19        .stdin(Stdio::piped())
20        .stdout(Stdio::piped())
21        .stderr(Stdio::inherit());
22
23    for (key, value) in env {
24        cmd.env(key, value);
25    }
26
27    let mut child = cmd.spawn()?;
28
29    let mut stdin = child
30        .stdin
31        .take()
32        .ok_or_else(|| ClientError::TransportSend("missing child stdin".to_string()))?;
33    let stdout = child
34        .stdout
35        .take()
36        .ok_or_else(|| ClientError::TransportSend("missing child stdout".to_string()))?;
37
38    let (outbound_tx, mut outbound_rx) = mpsc::channel::<Value>(256);
39    let (inbound_tx, inbound_rx) = mpsc::channel::<Result<Value, ClientError>>(1024);
40
41    let inbound_for_writer = inbound_tx.clone();
42    tokio::spawn(async move {
43        while let Some(message) = outbound_rx.recv().await {
44            match serde_json::to_string(&message) {
45                Ok(line) => {
46                    if let Err(err) = stdin.write_all(line.as_bytes()).await {
47                        let _ = inbound_for_writer.send(Err(ClientError::Io(err))).await;
48                        break;
49                    }
50                    if let Err(err) = stdin.write_all(b"\n").await {
51                        let _ = inbound_for_writer.send(Err(ClientError::Io(err))).await;
52                        break;
53                    }
54                }
55                Err(err) => {
56                    let _ = inbound_for_writer
57                        .send(Err(ClientError::Serialization(err)))
58                        .await;
59                    break;
60                }
61            }
62        }
63    });
64
65    let inbound_for_reader = inbound_tx.clone();
66    tokio::spawn(async move {
67        let mut lines = BufReader::new(stdout).lines();
68        loop {
69            match lines.next_line().await {
70                Ok(Some(line)) => match serde_json::from_str::<Value>(&line) {
71                    Ok(value) => {
72                        if inbound_for_reader.send(Ok(value)).await.is_err() {
73                            break;
74                        }
75                    }
76                    Err(err) => {
77                        if inbound_for_reader
78                            .send(Err(ClientError::InvalidMessage(format!(
79                                "failed to parse JSONL frame: {err}"
80                            ))))
81                            .await
82                            .is_err()
83                        {
84                            break;
85                        }
86                    }
87                },
88                Ok(None) => {
89                    let _ = inbound_for_reader
90                        .send(Err(ClientError::TransportClosed))
91                        .await;
92                    break;
93                }
94                Err(err) => {
95                    let _ = inbound_for_reader.send(Err(ClientError::Io(err))).await;
96                    break;
97                }
98            }
99        }
100    });
101
102    tokio::spawn(async move {
103        let _ = child.wait().await;
104    });
105
106    Ok(TransportHandle {
107        outbound: outbound_tx,
108        inbound: inbound_rx,
109    })
110}