Skip to main content

codex_codes/
client_async.rs

1//! Asynchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! # Lifecycle
8//!
9//! 1. Create a client with [`AsyncClient::start`] (spawns the app-server process)
10//! 2. Call [`AsyncClient::thread_start`] to create a conversation session
11//! 3. Call [`AsyncClient::turn_start`] to send user input
12//! 4. Consume [`AsyncClient::next_message`] to stream notifications
13//! 5. Handle approval requests via [`AsyncClient::respond`]
14//! 6. Repeat steps 3-5 for follow-up turns
15//! 7. The client kills the app-server on [`Drop`]
16//!
17//! # Example
18//!
19//! ```ignore
20//! use codex_codes::{AsyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
21//!
22//! let mut client = AsyncClient::start().await?;
23//! let thread = client.thread_start(&ThreadStartParams::default()).await?;
24//!
25//! client.turn_start(&TurnStartParams {
26//!     thread_id: thread.thread_id.clone(),
27//!     input: vec![UserInput::Text { text: "Hello!".into() }],
28//!     model: None,
29//!     reasoning_effort: None,
30//!     sandbox_policy: None,
31//! }).await?;
32//!
33//! while let Some(msg) = client.next_message().await? {
34//!     match msg {
35//!         ServerMessage::Notification { method, params } => {
36//!             if method == "turn/completed" { break; }
37//!         }
38//!         ServerMessage::Request { id, method, .. } => {
39//!             client.respond(id, &serde_json::json!({"decision": "accept"})).await?;
40//!         }
41//!     }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, Result};
47use crate::jsonrpc::{JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, RequestId};
48use crate::protocol::{
49    ServerMessage, ThreadArchiveParams, ThreadArchiveResponse, ThreadStartParams,
50    ThreadStartResponse, TurnInterruptParams, TurnInterruptResponse, TurnStartParams,
51    TurnStartResponse,
52};
53use log::{debug, error, warn};
54use serde::de::DeserializeOwned;
55use serde::Serialize;
56use std::collections::VecDeque;
57use std::sync::atomic::{AtomicI64, Ordering};
58use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
59use tokio::process::{Child, ChildStderr};
60
/// Capacity of the buffered reader wrapping the app-server's stdout: 10 MiB.
62const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
63
64/// Asynchronous multi-turn client for the Codex app-server.
65///
66/// Communicates with a long-lived `codex app-server` process via
67/// newline-delimited JSON-RPC over stdio. Manages request/response
68/// correlation and buffers incoming notifications that arrive while
69/// waiting for RPC responses.
70///
71/// The client automatically kills the app-server process when dropped.
72pub struct AsyncClient {
73    child: Child,
74    writer: BufWriter<tokio::process::ChildStdin>,
75    reader: BufReader<tokio::process::ChildStdout>,
76    stderr: Option<BufReader<ChildStderr>>,
77    next_id: AtomicI64,
78    /// Buffered incoming messages (notifications/server requests) that arrived
79    /// while waiting for a response to a client request.
80    buffered: VecDeque<ServerMessage>,
81}
82
83impl AsyncClient {
84    /// Start an app-server with default settings.
85    ///
86    /// Spawns `codex app-server --listen stdio://` and returns a connected client.
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the `codex` CLI is not installed, the version is
91    /// incompatible, or the process fails to start.
92    pub async fn start() -> Result<Self> {
93        Self::start_with(AppServerBuilder::new()).await
94    }
95
96    /// Start an app-server with a custom [`AppServerBuilder`].
97    ///
98    /// Use this to configure a custom binary path or working directory.
99    ///
100    /// # Errors
101    ///
102    /// Returns an error if the process fails to start or stdio pipes
103    /// cannot be established.
104    pub async fn start_with(builder: AppServerBuilder) -> Result<Self> {
105        crate::version::check_codex_version_async().await?;
106
107        let mut child = builder.spawn().await?;
108
109        let stdin = child
110            .stdin
111            .take()
112            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
113        let stdout = child
114            .stdout
115            .take()
116            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
117        let stderr = child.stderr.take().map(BufReader::new);
118
119        Ok(Self {
120            child,
121            writer: BufWriter::new(stdin),
122            reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
123            stderr,
124            next_id: AtomicI64::new(1),
125            buffered: VecDeque::new(),
126        })
127    }
128
129    /// Send a JSON-RPC request and wait for the matching response.
130    ///
131    /// Any notifications or server requests that arrive before the response
132    /// are buffered and can be retrieved via [`AsyncClient::next_message`].
133    ///
134    /// # Errors
135    ///
136    /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
137    /// - [`Error::ServerClosed`] if the connection drops before a response arrives
138    /// - [`Error::Json`] if response deserialization fails
139    pub async fn request<P: Serialize, R: DeserializeOwned>(
140        &mut self,
141        method: &str,
142        params: &P,
143    ) -> Result<R> {
144        let id = RequestId::Integer(self.next_id.fetch_add(1, Ordering::Relaxed));
145
146        let req = JsonRpcRequest {
147            id: id.clone(),
148            method: method.to_string(),
149            params: Some(serde_json::to_value(params).map_err(Error::Json)?),
150        };
151
152        self.send_raw(&req).await?;
153
154        // Read lines until we get a response matching our id
155        loop {
156            let msg = self.read_message().await?;
157            match msg {
158                JsonRpcMessage::Response(resp) if resp.id == id => {
159                    let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
160                    return Ok(result);
161                }
162                JsonRpcMessage::Error(err) if err.id == id => {
163                    return Err(Error::JsonRpc {
164                        code: err.error.code,
165                        message: err.error.message,
166                    });
167                }
168                // Buffer notifications and server requests
169                JsonRpcMessage::Notification(notif) => {
170                    self.buffered.push_back(ServerMessage::Notification {
171                        method: notif.method,
172                        params: notif.params,
173                    });
174                }
175                JsonRpcMessage::Request(req) => {
176                    self.buffered.push_back(ServerMessage::Request {
177                        id: req.id,
178                        method: req.method,
179                        params: req.params,
180                    });
181                }
182                // Response/error for a different id — unexpected
183                JsonRpcMessage::Response(resp) => {
184                    warn!(
185                        "[CLIENT] Unexpected response for id={}, expected id={}",
186                        resp.id, id
187                    );
188                }
189                JsonRpcMessage::Error(err) => {
190                    warn!(
191                        "[CLIENT] Unexpected error for id={}, expected id={}",
192                        err.id, id
193                    );
194                }
195            }
196        }
197    }
198
199    /// Start a new thread (conversation session).
200    ///
201    /// A thread must be created before any turns can be started. The returned
202    /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
203    pub async fn thread_start(
204        &mut self,
205        params: &ThreadStartParams,
206    ) -> Result<ThreadStartResponse> {
207        self.request(crate::protocol::methods::THREAD_START, params)
208            .await
209    }
210
211    /// Start a new turn within a thread.
212    ///
213    /// Sends user input to the agent. After calling this, use [`AsyncClient::next_message`]
214    /// to stream notifications until `turn/completed` arrives.
215    pub async fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
216        self.request(crate::protocol::methods::TURN_START, params)
217            .await
218    }
219
220    /// Interrupt an active turn.
221    pub async fn turn_interrupt(
222        &mut self,
223        params: &TurnInterruptParams,
224    ) -> Result<TurnInterruptResponse> {
225        self.request(crate::protocol::methods::TURN_INTERRUPT, params)
226            .await
227    }
228
229    /// Archive a thread.
230    pub async fn thread_archive(
231        &mut self,
232        params: &ThreadArchiveParams,
233    ) -> Result<ThreadArchiveResponse> {
234        self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
235            .await
236    }
237
238    /// Respond to a server-to-client request (e.g., approval flow).
239    ///
240    /// When the server sends a [`ServerMessage::Request`], it expects a response.
241    /// Use this method with the request's `id` and a result payload. For command
242    /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
243    /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
244    pub async fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
245        let resp = JsonRpcResponse {
246            id,
247            result: serde_json::to_value(result).map_err(Error::Json)?,
248        };
249        self.send_raw(&resp).await
250    }
251
252    /// Respond to a server-to-client request with an error.
253    pub async fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
254        let err = JsonRpcError {
255            id,
256            error: crate::jsonrpc::JsonRpcErrorData {
257                code,
258                message: message.to_string(),
259                data: None,
260            },
261        };
262        self.send_raw(&err).await
263    }
264
265    /// Read the next incoming server message (notification or server request).
266    ///
267    /// Returns buffered messages first (from notifications that arrived during
268    /// an [`AsyncClient::request`] call), then reads from the wire.
269    ///
270    /// Returns `Ok(None)` when the app-server closes the connection (EOF).
271    ///
272    /// # Typical notification methods
273    ///
274    /// | Method | Meaning |
275    /// |--------|---------|
276    /// | `turn/started` | Agent began processing |
277    /// | `item/agentMessage/delta` | Streaming text chunk |
278    /// | `item/commandExecution/outputDelta` | Command output chunk |
279    /// | `item/started` / `item/completed` | Item lifecycle |
280    /// | `turn/completed` | Agent finished the turn |
281    /// | `error` | Server-side error |
282    pub async fn next_message(&mut self) -> Result<Option<ServerMessage>> {
283        // Drain buffered messages first
284        if let Some(msg) = self.buffered.pop_front() {
285            return Ok(Some(msg));
286        }
287
288        // Read from the wire
289        loop {
290            let msg = match self.read_message_opt().await? {
291                Some(m) => m,
292                None => return Ok(None),
293            };
294
295            match msg {
296                JsonRpcMessage::Notification(notif) => {
297                    return Ok(Some(ServerMessage::Notification {
298                        method: notif.method,
299                        params: notif.params,
300                    }));
301                }
302                JsonRpcMessage::Request(req) => {
303                    return Ok(Some(ServerMessage::Request {
304                        id: req.id,
305                        method: req.method,
306                        params: req.params,
307                    }));
308                }
309                // Unexpected responses without a pending request
310                JsonRpcMessage::Response(resp) => {
311                    warn!(
312                        "[CLIENT] Unexpected response (no pending request): id={}",
313                        resp.id
314                    );
315                }
316                JsonRpcMessage::Error(err) => {
317                    warn!(
318                        "[CLIENT] Unexpected error (no pending request): id={} code={}",
319                        err.id, err.error.code
320                    );
321                }
322            }
323        }
324    }
325
326    /// Return an async event stream over [`ServerMessage`]s.
327    ///
328    /// Wraps [`AsyncClient::next_message`] in a stream-like API. Call
329    /// [`EventStream::next`] in a loop, or [`EventStream::collect`] to
330    /// gather all messages until EOF.
331    pub fn events(&mut self) -> EventStream<'_> {
332        EventStream { client: self }
333    }
334
335    /// Take the stderr reader (can only be called once).
336    ///
337    /// Useful for logging or diagnostics. Returns `None` on subsequent calls.
338    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
339        self.stderr.take()
340    }
341
342    /// Get the process ID.
343    pub fn pid(&self) -> Option<u32> {
344        self.child.id()
345    }
346
347    /// Check if the child process is still running.
348    pub fn is_alive(&mut self) -> bool {
349        self.child.try_wait().ok().flatten().is_none()
350    }
351
352    /// Shut down the app-server process.
353    ///
354    /// Consumes the client. If you don't call this explicitly, the
355    /// [`Drop`] implementation will kill the process automatically.
356    pub async fn shutdown(mut self) -> Result<()> {
357        debug!("[CLIENT] Shutting down");
358        self.child.kill().await.map_err(Error::Io)?;
359        Ok(())
360    }
361
362    // -- internal --
363
364    async fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
365        let json = serde_json::to_string(msg).map_err(Error::Json)?;
366        debug!("[CLIENT] Sending: {}", json);
367        self.writer
368            .write_all(json.as_bytes())
369            .await
370            .map_err(Error::Io)?;
371        self.writer.write_all(b"\n").await.map_err(Error::Io)?;
372        self.writer.flush().await.map_err(Error::Io)?;
373        Ok(())
374    }
375
376    async fn read_message(&mut self) -> Result<JsonRpcMessage> {
377        self.read_message_opt().await?.ok_or(Error::ServerClosed)
378    }
379
380    async fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
381        let mut line = String::new();
382
383        loop {
384            line.clear();
385            let bytes_read = self.reader.read_line(&mut line).await.map_err(Error::Io)?;
386
387            if bytes_read == 0 {
388                debug!("[CLIENT] Stream closed (EOF)");
389                return Ok(None);
390            }
391
392            let trimmed = line.trim();
393            if trimmed.is_empty() {
394                continue;
395            }
396
397            debug!("[CLIENT] Received: {}", trimmed);
398
399            match serde_json::from_str::<JsonRpcMessage>(trimmed) {
400                Ok(msg) => return Ok(Some(msg)),
401                Err(e) => {
402                    warn!(
403                        "[CLIENT] Failed to deserialize message. \
404                         Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
405                    );
406                    warn!("[CLIENT] Parse error: {}", e);
407                    warn!("[CLIENT] Raw: {}", trimmed);
408                    return Err(Error::Deserialization(format!("{} (raw: {})", e, trimmed)));
409                }
410            }
411        }
412    }
413}
414
415impl Drop for AsyncClient {
416    fn drop(&mut self) {
417        if self.is_alive() {
418            if let Err(e) = self.child.start_kill() {
419                error!("Failed to kill app-server process on drop: {}", e);
420            }
421        }
422    }
423}
424
425/// Async stream of [`ServerMessage`]s from an [`AsyncClient`].
426pub struct EventStream<'a> {
427    client: &'a mut AsyncClient,
428}
429
430impl EventStream<'_> {
431    /// Get the next server message.
432    pub async fn next(&mut self) -> Option<Result<ServerMessage>> {
433        match self.client.next_message().await {
434            Ok(Some(msg)) => Some(Ok(msg)),
435            Ok(None) => None,
436            Err(e) => Some(Err(e)),
437        }
438    }
439
440    /// Collect all remaining messages.
441    pub async fn collect(mut self) -> Result<Vec<ServerMessage>> {
442        let mut msgs = Vec::new();
443        while let Some(result) = self.next().await {
444            msgs.push(result?);
445        }
446        Ok(msgs)
447    }
448}
449
#[cfg(test)]
mod tests {
    use super::STDOUT_BUFFER_SIZE;

    /// Pins the stdout read buffer at 10 MiB.
    #[test]
    fn test_buffer_size() {
        const TEN_MIB: usize = 10 * 1024 * 1024;
        assert_eq!(STDOUT_BUFFER_SIZE, TEN_MIB);
    }
}