Skip to main content

codex_codes/
client_async.rs

1//! Asynchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! # Lifecycle
8//!
9//! 1. Create a client with [`AsyncClient::start`] (spawns and initializes the app-server)
10//! 2. Call [`AsyncClient::thread_start`] to create a conversation session
11//! 3. Call [`AsyncClient::turn_start`] to send user input
12//! 4. Consume [`AsyncClient::next_message`] to stream notifications
13//! 5. Handle approval requests via [`AsyncClient::respond`]
14//! 6. Repeat steps 3-5 for follow-up turns
15//! 7. The client kills the app-server on [`Drop`]
16//!
17//! # Example
18//!
19//! ```ignore
20//! use codex_codes::{AsyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
21//!
22//! let mut client = AsyncClient::start().await?;
23//! let thread = client.thread_start(&ThreadStartParams::default()).await?;
24//!
25//! client.turn_start(&TurnStartParams {
26//!     thread_id: thread.thread_id().to_string(),
27//!     input: vec![UserInput::Text { text: "Hello!".into() }],
28//!     model: None,
29//!     reasoning_effort: None,
30//!     sandbox_policy: None,
31//! }).await?;
32//!
33//! while let Some(msg) = client.next_message().await? {
34//!     match msg {
35//!         ServerMessage::Notification(n) => {
36//!             if let codex_codes::Notification::TurnCompleted(_) = n { break; }
37//!         }
38//!         ServerMessage::Request { id, .. } => {
39//!             client.respond(id, &serde_json::json!({"decision": "accept"})).await?;
40//!         }
41//!     }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, ParseError, Result};
47use crate::jsonrpc::{
48    JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::messages::{Notification, ServerMessage, ServerRequest};
51use crate::protocol::{
52    ClientInfo, InitializeParams, InitializeResponse, ThreadArchiveParams, ThreadArchiveResponse,
53    ThreadStartParams, ThreadStartResponse, TurnInterruptParams, TurnInterruptResponse,
54    TurnStartParams, TurnStartResponse,
55};
56use log::{debug, error, warn};
57use serde::de::DeserializeOwned;
58use serde::Serialize;
59use std::collections::VecDeque;
60use std::sync::atomic::{AtomicI64, Ordering};
61use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
62use tokio::process::Child;
63
/// Capacity, in bytes, of the buffered reader wrapped around the
/// app-server's stdout: 10 MiB.
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
66
67/// Asynchronous multi-turn client for the Codex app-server.
68///
69/// Communicates with a long-lived `codex app-server` process via
70/// newline-delimited JSON-RPC over stdio. Manages request/response
71/// correlation and buffers incoming notifications that arrive while
72/// waiting for RPC responses.
73///
74/// The client automatically kills the app-server process when dropped.
75pub struct AsyncClient {
76    child: Child,
77    writer: BufWriter<tokio::process::ChildStdin>,
78    reader: BufReader<tokio::process::ChildStdout>,
79    /// Handle to the background task draining the child's stderr pipe.
80    /// Kept alive for the lifetime of the client; the task exits on EOF
81    /// when the child is killed.
82    _stderr_drain: tokio::task::JoinHandle<()>,
83    next_id: AtomicI64,
84    /// Buffered incoming messages (notifications/server requests) that arrived
85    /// while waiting for a response to a client request.
86    buffered: VecDeque<ServerMessage>,
87}
88
89impl AsyncClient {
90    /// Start an app-server with default settings.
91    ///
92    /// Spawns `codex app-server --listen stdio://`, performs the required
93    /// `initialize` handshake, and returns a connected client ready for
94    /// `thread_start()`.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if the `codex` CLI is not installed, the version is
99    /// incompatible, the process fails to start, or the initialization
100    /// handshake fails.
101    pub async fn start() -> Result<Self> {
102        Self::start_with(AppServerBuilder::new()).await
103    }
104
105    /// Start an app-server with a custom [`AppServerBuilder`].
106    ///
107    /// Performs the required `initialize` handshake before returning.
108    /// Use this to configure a custom binary path or working directory.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if the process fails to start, stdio pipes
113    /// cannot be established, or the initialization handshake fails.
114    pub async fn start_with(builder: AppServerBuilder) -> Result<Self> {
115        let mut client = Self::spawn(builder).await?;
116        client
117            .initialize(&InitializeParams {
118                client_info: ClientInfo {
119                    name: "codex-codes".to_string(),
120                    version: env!("CARGO_PKG_VERSION").to_string(),
121                    title: None,
122                },
123                capabilities: None,
124            })
125            .await?;
126        Ok(client)
127    }
128
129    /// Spawn an app-server without performing the `initialize` handshake.
130    ///
131    /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
132    /// specific capabilities). You **must** call [`AsyncClient::initialize`]
133    /// before any other requests.
134    pub async fn spawn(builder: AppServerBuilder) -> Result<Self> {
135        crate::version::check_codex_version_async().await?;
136
137        let mut child = builder.spawn().await?;
138
139        let stdin = child
140            .stdin
141            .take()
142            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
143        let stdout = child
144            .stdout
145            .take()
146            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
147        let stderr = child
148            .stderr
149            .take()
150            .ok_or_else(|| Error::Protocol("Failed to get stderr".to_string()))?;
151
152        // The app-server emits ~200 KB/s of tracing to stderr. Without an
153        // active reader, the ~64 KB kernel pipe fills almost instantly and
154        // the child blocks. Drain in the background and route lines through
155        // the `log` crate (see [`crate::stderr_drain`]).
156        let stderr_drain = crate::stderr_drain::spawn_async(stderr);
157
158        Ok(Self {
159            child,
160            writer: BufWriter::new(stdin),
161            reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
162            _stderr_drain: stderr_drain,
163            next_id: AtomicI64::new(1),
164            buffered: VecDeque::new(),
165        })
166    }
167
168    /// Send a JSON-RPC request and wait for the matching response.
169    ///
170    /// Any notifications or server requests that arrive before the response
171    /// are buffered and can be retrieved via [`AsyncClient::next_message`].
172    ///
173    /// # Errors
174    ///
175    /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
176    /// - [`Error::ServerClosed`] if the connection drops before a response arrives
177    /// - [`Error::Json`] if response deserialization fails
178    pub async fn request<P: Serialize, R: DeserializeOwned>(
179        &mut self,
180        method: &str,
181        params: &P,
182    ) -> Result<R> {
183        let id = RequestId::Integer(self.next_id.fetch_add(1, Ordering::Relaxed));
184
185        let req = JsonRpcRequest {
186            id: id.clone(),
187            method: method.to_string(),
188            params: Some(serde_json::to_value(params).map_err(Error::Json)?),
189        };
190
191        self.send_raw(&req).await?;
192
193        // Read lines until we get a response matching our id
194        loop {
195            let msg = self.read_message().await?;
196            match msg {
197                JsonRpcMessage::Response(resp) if resp.id == id => {
198                    let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
199                    return Ok(result);
200                }
201                JsonRpcMessage::Error(err) if err.id == id => {
202                    return Err(Error::JsonRpc {
203                        code: err.error.code,
204                        message: err.error.message,
205                    });
206                }
207                // Buffer notifications and server requests
208                JsonRpcMessage::Notification(notif) => {
209                    let typed = Notification::from_envelope(&notif.method, notif.params)
210                        .map_err(Error::Json)?;
211                    self.buffered.push_back(ServerMessage::Notification(typed));
212                }
213                JsonRpcMessage::Request(req) => {
214                    let typed = ServerRequest::from_envelope(&req.method, req.params)
215                        .map_err(Error::Json)?;
216                    self.buffered.push_back(ServerMessage::Request {
217                        id: req.id,
218                        request: typed,
219                    });
220                }
221                // Response/error for a different id — unexpected
222                JsonRpcMessage::Response(resp) => {
223                    warn!(
224                        "[CLIENT] Unexpected response for id={}, expected id={}",
225                        resp.id, id
226                    );
227                }
228                JsonRpcMessage::Error(err) => {
229                    warn!(
230                        "[CLIENT] Unexpected error for id={}, expected id={}",
231                        err.id, id
232                    );
233                }
234            }
235        }
236    }
237
238    /// Start a new thread (conversation session).
239    ///
240    /// A thread must be created before any turns can be started. The returned
241    /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
242    pub async fn thread_start(
243        &mut self,
244        params: &ThreadStartParams,
245    ) -> Result<ThreadStartResponse> {
246        self.request(crate::protocol::methods::THREAD_START, params)
247            .await
248    }
249
250    /// Start a new turn within a thread.
251    ///
252    /// Sends user input to the agent. After calling this, use [`AsyncClient::next_message`]
253    /// to stream notifications until `turn/completed` arrives.
254    pub async fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
255        self.request(crate::protocol::methods::TURN_START, params)
256            .await
257    }
258
259    /// Interrupt an active turn.
260    pub async fn turn_interrupt(
261        &mut self,
262        params: &TurnInterruptParams,
263    ) -> Result<TurnInterruptResponse> {
264        self.request(crate::protocol::methods::TURN_INTERRUPT, params)
265            .await
266    }
267
268    /// Archive a thread.
269    pub async fn thread_archive(
270        &mut self,
271        params: &ThreadArchiveParams,
272    ) -> Result<ThreadArchiveResponse> {
273        self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
274            .await
275    }
276
277    /// Perform the `initialize` handshake with the app-server.
278    ///
279    /// Sends `initialize` with the given params and then sends the
280    /// `initialized` notification. This must be the first request after
281    /// spawning the process.
282    pub async fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
283        let resp: InitializeResponse = self
284            .request(crate::protocol::methods::INITIALIZE, params)
285            .await?;
286        self.send_notification(crate::protocol::methods::INITIALIZED)
287            .await?;
288        Ok(resp)
289    }
290
291    /// Respond to a server-to-client request (e.g., approval flow).
292    ///
293    /// When the server sends a [`ServerMessage::Request`], it expects a response.
294    /// Use this method with the request's `id` and a result payload. For command
295    /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
296    /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
297    pub async fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
298        let resp = JsonRpcResponse {
299            id,
300            result: serde_json::to_value(result).map_err(Error::Json)?,
301        };
302        self.send_raw(&resp).await
303    }
304
305    /// Respond to a server-to-client request with an error.
306    pub async fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
307        let err = JsonRpcError {
308            id,
309            error: crate::jsonrpc::JsonRpcErrorData {
310                code,
311                message: message.to_string(),
312                data: None,
313            },
314        };
315        self.send_raw(&err).await
316    }
317
318    /// Read the next incoming server message (notification or server request).
319    ///
320    /// Returns buffered messages first (from notifications that arrived during
321    /// an [`AsyncClient::request`] call), then reads from the wire.
322    ///
323    /// Returns `Ok(None)` when the app-server closes the connection (EOF).
324    ///
325    /// # Typical notification methods
326    ///
327    /// | Method | Meaning |
328    /// |--------|---------|
329    /// | `turn/started` | Agent began processing |
330    /// | `item/agentMessage/delta` | Streaming text chunk |
331    /// | `item/commandExecution/outputDelta` | Command output chunk |
332    /// | `item/started` / `item/completed` | Item lifecycle |
333    /// | `turn/completed` | Agent finished the turn |
334    /// | `error` | Server-side error |
335    pub async fn next_message(&mut self) -> Result<Option<ServerMessage>> {
336        // Drain buffered messages first
337        if let Some(msg) = self.buffered.pop_front() {
338            return Ok(Some(msg));
339        }
340
341        // Read from the wire
342        loop {
343            let msg = match self.read_message_opt().await? {
344                Some(m) => m,
345                None => return Ok(None),
346            };
347
348            match msg {
349                JsonRpcMessage::Notification(notif) => {
350                    let JsonRpcNotification { method, params } = notif;
351                    let typed =
352                        Notification::from_envelope(&method, params.clone()).map_err(|e| {
353                            Error::Deserialization(ParseError::from_envelope(method, params, e))
354                        })?;
355                    return Ok(Some(ServerMessage::Notification(typed)));
356                }
357                JsonRpcMessage::Request(req) => {
358                    let JsonRpcRequest { id, method, params } = req;
359                    let typed =
360                        ServerRequest::from_envelope(&method, params.clone()).map_err(|e| {
361                            Error::Deserialization(ParseError::from_envelope(method, params, e))
362                        })?;
363                    return Ok(Some(ServerMessage::Request { id, request: typed }));
364                }
365                // Unexpected responses without a pending request
366                JsonRpcMessage::Response(resp) => {
367                    warn!(
368                        "[CLIENT] Unexpected response (no pending request): id={}",
369                        resp.id
370                    );
371                }
372                JsonRpcMessage::Error(err) => {
373                    warn!(
374                        "[CLIENT] Unexpected error (no pending request): id={} code={}",
375                        err.id, err.error.code
376                    );
377                }
378            }
379        }
380    }
381
382    /// Return an async event stream over [`ServerMessage`]s.
383    ///
384    /// Wraps [`AsyncClient::next_message`] in a stream-like API. Call
385    /// [`EventStream::next`] in a loop, or [`EventStream::collect`] to
386    /// gather all messages until EOF.
387    pub fn events(&mut self) -> EventStream<'_> {
388        EventStream { client: self }
389    }
390
391    /// Get the process ID.
392    pub fn pid(&self) -> Option<u32> {
393        self.child.id()
394    }
395
396    /// Check if the child process is still running.
397    pub fn is_alive(&mut self) -> bool {
398        self.child.try_wait().ok().flatten().is_none()
399    }
400
401    /// Shut down the app-server process.
402    ///
403    /// Consumes the client. If you don't call this explicitly, the
404    /// [`Drop`] implementation will kill the process automatically.
405    pub async fn shutdown(mut self) -> Result<()> {
406        debug!("[CLIENT] Shutting down");
407        self.child.kill().await.map_err(Error::Io)?;
408        Ok(())
409    }
410
411    // -- internal --
412
413    async fn send_notification(&mut self, method: &str) -> Result<()> {
414        let notif = JsonRpcNotification {
415            method: method.to_string(),
416            params: None,
417        };
418        self.send_raw(&notif).await
419    }
420
421    async fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
422        let json = serde_json::to_string(msg).map_err(Error::Json)?;
423        debug!("[CLIENT] Sending: {}", json);
424        self.writer
425            .write_all(json.as_bytes())
426            .await
427            .map_err(Error::Io)?;
428        self.writer.write_all(b"\n").await.map_err(Error::Io)?;
429        self.writer.flush().await.map_err(Error::Io)?;
430        Ok(())
431    }
432
433    async fn read_message(&mut self) -> Result<JsonRpcMessage> {
434        self.read_message_opt().await?.ok_or(Error::ServerClosed)
435    }
436
437    async fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
438        let mut line = String::new();
439
440        loop {
441            line.clear();
442            let bytes_read = self.reader.read_line(&mut line).await.map_err(Error::Io)?;
443
444            if bytes_read == 0 {
445                debug!("[CLIENT] Stream closed (EOF)");
446                return Ok(None);
447            }
448
449            let trimmed = line.trim();
450            if trimmed.is_empty() {
451                continue;
452            }
453
454            debug!("[CLIENT] Received: {}", trimmed);
455
456            match serde_json::from_str::<JsonRpcMessage>(trimmed) {
457                Ok(msg) => return Ok(Some(msg)),
458                Err(e) => {
459                    warn!(
460                        "[CLIENT] Failed to deserialize message. \
461                         Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
462                    );
463                    warn!("[CLIENT] Parse error: {}", e);
464                    warn!("[CLIENT] Raw: {}", trimmed);
465                    return Err(Error::Deserialization(ParseError::from_line(trimmed, e)));
466                }
467            }
468        }
469    }
470}
471
472impl Drop for AsyncClient {
473    fn drop(&mut self) {
474        if self.is_alive() {
475            if let Err(e) = self.child.start_kill() {
476                error!("Failed to kill app-server process on drop: {}", e);
477            }
478        }
479    }
480}
481
482/// Async stream of [`ServerMessage`]s from an [`AsyncClient`].
483pub struct EventStream<'a> {
484    client: &'a mut AsyncClient,
485}
486
487impl EventStream<'_> {
488    /// Get the next server message.
489    pub async fn next(&mut self) -> Option<Result<ServerMessage>> {
490        match self.client.next_message().await {
491            Ok(Some(msg)) => Some(Ok(msg)),
492            Ok(None) => None,
493            Err(e) => Some(Err(e)),
494        }
495    }
496
497    /// Collect all remaining messages.
498    pub async fn collect(mut self) -> Result<Vec<ServerMessage>> {
499        let mut msgs = Vec::new();
500        while let Some(result) = self.next().await {
501            msgs.push(result?);
502        }
503        Ok(msgs)
504    }
505}
506
#[cfg(test)]
mod tests {
    use super::*;

    /// The stdout read buffer is pinned at 10 MiB.
    #[test]
    fn test_buffer_size() {
        let expected_bytes: usize = 10_485_760;
        assert_eq!(STDOUT_BUFFER_SIZE, expected_bytes);
    }
}