Skip to main content

codex_codes/
client_async.rs

//! Asynchronous multi-turn client for the Codex app-server.
//!
//! Spawns `codex app-server --listen stdio://` and communicates over
//! newline-delimited JSON-RPC. The connection stays open for multiple
//! turns until explicitly shut down.
//!
//! # Lifecycle
//!
//! 1. Create a client with [`AsyncClient::start`] (spawns and initializes the app-server)
//! 2. Call [`AsyncClient::thread_start`] to create a conversation session
//! 3. Call [`AsyncClient::turn_start`] to send user input
//! 4. Consume [`AsyncClient::next_message`] to stream notifications
//! 5. Handle approval requests via [`AsyncClient::respond`]
//! 6. Repeat steps 3-5 for follow-up turns
//! 7. Call [`AsyncClient::shutdown`] when done; otherwise the client kills
//!    the app-server on [`Drop`]
//!
//! # Example
//!
//! ```ignore
//! use codex_codes::{AsyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
//!
//! let mut client = AsyncClient::start().await?;
//! let thread = client.thread_start(&ThreadStartParams::default()).await?;
//!
//! client.turn_start(&TurnStartParams {
//!     thread_id: thread.thread_id().to_string(),
//!     input: vec![UserInput::Text { text: "Hello!".into() }],
//!     model: None,
//!     reasoning_effort: None,
//!     sandbox_policy: None,
//! }).await?;
//!
//! while let Some(msg) = client.next_message().await? {
//!     match msg {
//!         ServerMessage::Notification(n) => {
//!             if let codex_codes::Notification::TurnCompleted(_) = n { break; }
//!         }
//!         ServerMessage::Request { id, .. } => {
//!             client.respond(id, &serde_json::json!({"decision": "accept"})).await?;
//!         }
//!     }
//! }
//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, ParseError, Result};
47use crate::jsonrpc::{
48    JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::messages::{Notification, ServerMessage, ServerRequest};
51use crate::protocol::{
52    ClientInfo, InitializeParams, InitializeResponse, ThreadArchiveParams, ThreadArchiveResponse,
53    ThreadDeleteParams, ThreadDeleteResponse, ThreadForkParams, ThreadForkResponse,
54    ThreadResumeParams, ThreadResumeResponse, ThreadStartParams, ThreadStartResponse,
55    TurnInterruptParams, TurnInterruptResponse, TurnStartParams, TurnStartResponse,
56};
57use log::{debug, error, warn};
58use serde::de::DeserializeOwned;
59use serde::Serialize;
60use std::collections::VecDeque;
61use std::sync::atomic::{AtomicI64, Ordering};
62use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
63use tokio::process::Child;
64
/// Buffer size for reading stdout (10MB).
///
/// Used as the capacity of the `BufReader` wrapped around the child's
/// stdout pipe when the client is spawned.
const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
67
/// Asynchronous multi-turn client for the Codex app-server.
///
/// Communicates with a long-lived `codex app-server` process via
/// newline-delimited JSON-RPC over stdio. Manages request/response
/// correlation and buffers incoming notifications that arrive while
/// waiting for RPC responses.
///
/// The client automatically kills the app-server process when dropped.
pub struct AsyncClient {
    /// Handle to the spawned app-server process.
    child: Child,
    /// Buffered writer over the child's stdin; flushed after every message.
    writer: BufWriter<tokio::process::ChildStdin>,
    /// Buffered reader over the child's stdout, read one line per message.
    reader: BufReader<tokio::process::ChildStdout>,
    /// Handle to the background task draining the child's stderr pipe.
    /// Kept alive for the lifetime of the client; the task exits on EOF
    /// when the child is killed.
    _stderr_drain: tokio::task::JoinHandle<()>,
    /// Source of integer ids for outgoing requests; starts at 1 and is
    /// incremented once per request.
    next_id: AtomicI64,
    /// Buffered incoming messages (notifications/server requests) that arrived
    /// while waiting for a response to a client request.
    buffered: VecDeque<ServerMessage>,
}
89
90impl AsyncClient {
91    /// Start an app-server with default settings.
92    ///
93    /// Spawns `codex app-server --listen stdio://`, performs the required
94    /// `initialize` handshake, and returns a connected client ready for
95    /// `thread_start()`.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the `codex` CLI is not installed, the version is
100    /// incompatible, the process fails to start, or the initialization
101    /// handshake fails.
102    pub async fn start() -> Result<Self> {
103        Self::start_with(AppServerBuilder::new()).await
104    }
105
106    /// Start an app-server with a custom [`AppServerBuilder`].
107    ///
108    /// Performs the required `initialize` handshake before returning.
109    /// Use this to configure a custom binary path or working directory.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if the process fails to start, stdio pipes
114    /// cannot be established, or the initialization handshake fails.
115    pub async fn start_with(builder: AppServerBuilder) -> Result<Self> {
116        let mut client = Self::spawn(builder).await?;
117        client
118            .initialize(&InitializeParams {
119                client_info: ClientInfo {
120                    name: "codex-codes".to_string(),
121                    version: env!("CARGO_PKG_VERSION").to_string(),
122                    title: None,
123                },
124                capabilities: None,
125            })
126            .await?;
127        Ok(client)
128    }
129
130    /// Spawn an app-server without performing the `initialize` handshake.
131    ///
132    /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
133    /// specific capabilities). You **must** call [`AsyncClient::initialize`]
134    /// before any other requests.
135    pub async fn spawn(builder: AppServerBuilder) -> Result<Self> {
136        crate::version::check_codex_version_async().await?;
137
138        let mut child = builder.spawn().await?;
139
140        let stdin = child
141            .stdin
142            .take()
143            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
144        let stdout = child
145            .stdout
146            .take()
147            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
148        let stderr = child
149            .stderr
150            .take()
151            .ok_or_else(|| Error::Protocol("Failed to get stderr".to_string()))?;
152
153        // The app-server emits ~200 KB/s of tracing to stderr. Without an
154        // active reader, the ~64 KB kernel pipe fills almost instantly and
155        // the child blocks. Drain in the background and route lines through
156        // the `log` crate (see [`crate::stderr_drain`]).
157        let stderr_drain = crate::stderr_drain::spawn_async(stderr);
158
159        Ok(Self {
160            child,
161            writer: BufWriter::new(stdin),
162            reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
163            _stderr_drain: stderr_drain,
164            next_id: AtomicI64::new(1),
165            buffered: VecDeque::new(),
166        })
167    }
168
169    /// Send a JSON-RPC request and wait for the matching response.
170    ///
171    /// Any notifications or server requests that arrive before the response
172    /// are buffered and can be retrieved via [`AsyncClient::next_message`].
173    ///
174    /// # Errors
175    ///
176    /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
177    /// - [`Error::ServerClosed`] if the connection drops before a response arrives
178    /// - [`Error::Json`] if response deserialization fails
179    pub async fn request<P: Serialize, R: DeserializeOwned>(
180        &mut self,
181        method: &str,
182        params: &P,
183    ) -> Result<R> {
184        let id = RequestId::Integer(self.next_id.fetch_add(1, Ordering::Relaxed));
185
186        let req = JsonRpcRequest {
187            id: id.clone(),
188            method: method.to_string(),
189            params: Some(serde_json::to_value(params).map_err(Error::Json)?),
190        };
191
192        self.send_raw(&req).await?;
193
194        // Read lines until we get a response matching our id
195        loop {
196            let msg = self.read_message().await?;
197            match msg {
198                JsonRpcMessage::Response(resp) if resp.id == id => {
199                    let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
200                    return Ok(result);
201                }
202                JsonRpcMessage::Error(err) if err.id == id => {
203                    return Err(Error::JsonRpc {
204                        code: err.error.code,
205                        message: err.error.message,
206                    });
207                }
208                // Buffer notifications and server requests
209                JsonRpcMessage::Notification(notif) => {
210                    let typed = Notification::from_envelope(&notif.method, notif.params)
211                        .map_err(Error::Json)?;
212                    self.buffered.push_back(ServerMessage::Notification(typed));
213                }
214                JsonRpcMessage::Request(req) => {
215                    let typed = ServerRequest::from_envelope(&req.method, req.params)
216                        .map_err(Error::Json)?;
217                    self.buffered.push_back(ServerMessage::Request {
218                        id: req.id,
219                        request: typed,
220                    });
221                }
222                // Response/error for a different id — unexpected
223                JsonRpcMessage::Response(resp) => {
224                    warn!(
225                        "[CLIENT] Unexpected response for id={}, expected id={}",
226                        resp.id, id
227                    );
228                }
229                JsonRpcMessage::Error(err) => {
230                    warn!(
231                        "[CLIENT] Unexpected error for id={}, expected id={}",
232                        err.id, id
233                    );
234                }
235            }
236        }
237    }
238
239    /// Start a new thread (conversation session).
240    ///
241    /// A thread must be created before any turns can be started. The returned
242    /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
243    pub async fn thread_start(
244        &mut self,
245        params: &ThreadStartParams,
246    ) -> Result<ThreadStartResponse> {
247        self.request(crate::protocol::methods::THREAD_START, params)
248            .await
249    }
250
251    /// Resume a previously persisted thread by id.
252    ///
253    /// Replays the thread's history so turns can continue where they left off.
254    pub async fn thread_resume(
255        &mut self,
256        params: &ThreadResumeParams,
257    ) -> Result<ThreadResumeResponse> {
258        self.request(crate::protocol::methods::THREAD_RESUME, params)
259            .await
260    }
261
262    /// Fork an existing thread into a new independent thread.
263    pub async fn thread_fork(&mut self, params: &ThreadForkParams) -> Result<ThreadForkResponse> {
264        self.request(crate::protocol::methods::THREAD_FORK, params)
265            .await
266    }
267
268    /// Start a new turn within a thread.
269    ///
270    /// Sends user input to the agent. After calling this, use [`AsyncClient::next_message`]
271    /// to stream notifications until `turn/completed` arrives.
272    pub async fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
273        self.request(crate::protocol::methods::TURN_START, params)
274            .await
275    }
276
277    /// Interrupt an active turn.
278    pub async fn turn_interrupt(
279        &mut self,
280        params: &TurnInterruptParams,
281    ) -> Result<TurnInterruptResponse> {
282        self.request(crate::protocol::methods::TURN_INTERRUPT, params)
283            .await
284    }
285
286    /// Archive a thread.
287    pub async fn thread_archive(
288        &mut self,
289        params: &ThreadArchiveParams,
290    ) -> Result<ThreadArchiveResponse> {
291        self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
292            .await
293    }
294
295    /// Delete a thread.
296    pub async fn thread_delete(
297        &mut self,
298        params: &ThreadDeleteParams,
299    ) -> Result<ThreadDeleteResponse> {
300        self.request(crate::protocol::methods::THREAD_DELETE, params)
301            .await
302    }
303
304    /// Perform the `initialize` handshake with the app-server.
305    ///
306    /// Sends `initialize` with the given params and then sends the
307    /// `initialized` notification. This must be the first request after
308    /// spawning the process.
309    pub async fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
310        let resp: InitializeResponse = self
311            .request(crate::protocol::methods::INITIALIZE, params)
312            .await?;
313        self.send_notification(crate::protocol::methods::INITIALIZED)
314            .await?;
315        Ok(resp)
316    }
317
318    /// Respond to a server-to-client request (e.g., approval flow).
319    ///
320    /// When the server sends a [`ServerMessage::Request`], it expects a response.
321    /// Use this method with the request's `id` and a result payload. For command
322    /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
323    /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
324    pub async fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
325        let resp = JsonRpcResponse {
326            id,
327            result: serde_json::to_value(result).map_err(Error::Json)?,
328        };
329        self.send_raw(&resp).await
330    }
331
332    /// Respond to a server-to-client request with an error.
333    pub async fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
334        let err = JsonRpcError {
335            id,
336            error: crate::jsonrpc::JsonRpcErrorData {
337                code,
338                message: message.to_string(),
339                data: None,
340            },
341        };
342        self.send_raw(&err).await
343    }
344
345    /// Read the next incoming server message (notification or server request).
346    ///
347    /// Returns buffered messages first (from notifications that arrived during
348    /// an [`AsyncClient::request`] call), then reads from the wire.
349    ///
350    /// Returns `Ok(None)` when the app-server closes the connection (EOF).
351    ///
352    /// # Typical notification methods
353    ///
354    /// | Method | Meaning |
355    /// |--------|---------|
356    /// | `turn/started` | Agent began processing |
357    /// | `item/agentMessage/delta` | Streaming text chunk |
358    /// | `item/commandExecution/outputDelta` | Command output chunk |
359    /// | `item/started` / `item/completed` | Item lifecycle |
360    /// | `turn/completed` | Agent finished the turn |
361    /// | `error` | Server-side error |
362    pub async fn next_message(&mut self) -> Result<Option<ServerMessage>> {
363        // Drain buffered messages first
364        if let Some(msg) = self.buffered.pop_front() {
365            return Ok(Some(msg));
366        }
367
368        // Read from the wire
369        loop {
370            let msg = match self.read_message_opt().await? {
371                Some(m) => m,
372                None => return Ok(None),
373            };
374
375            match msg {
376                JsonRpcMessage::Notification(notif) => {
377                    let JsonRpcNotification { method, params } = notif;
378                    let typed =
379                        Notification::from_envelope(&method, params.clone()).map_err(|e| {
380                            Error::Deserialization(ParseError::from_envelope(method, params, e))
381                        })?;
382                    return Ok(Some(ServerMessage::Notification(typed)));
383                }
384                JsonRpcMessage::Request(req) => {
385                    let JsonRpcRequest { id, method, params } = req;
386                    let typed =
387                        ServerRequest::from_envelope(&method, params.clone()).map_err(|e| {
388                            Error::Deserialization(ParseError::from_envelope(method, params, e))
389                        })?;
390                    return Ok(Some(ServerMessage::Request { id, request: typed }));
391                }
392                // Unexpected responses without a pending request
393                JsonRpcMessage::Response(resp) => {
394                    warn!(
395                        "[CLIENT] Unexpected response (no pending request): id={}",
396                        resp.id
397                    );
398                }
399                JsonRpcMessage::Error(err) => {
400                    warn!(
401                        "[CLIENT] Unexpected error (no pending request): id={} code={}",
402                        err.id, err.error.code
403                    );
404                }
405            }
406        }
407    }
408
409    /// Return an async event stream over [`ServerMessage`]s.
410    ///
411    /// Wraps [`AsyncClient::next_message`] in a stream-like API. Call
412    /// [`EventStream::next`] in a loop, or [`EventStream::collect`] to
413    /// gather all messages until EOF.
414    pub fn events(&mut self) -> EventStream<'_> {
415        EventStream { client: self }
416    }
417
418    /// Get the process ID.
419    pub fn pid(&self) -> Option<u32> {
420        self.child.id()
421    }
422
423    /// Check if the child process is still running.
424    pub fn is_alive(&mut self) -> bool {
425        self.child.try_wait().ok().flatten().is_none()
426    }
427
428    /// Shut down the app-server process.
429    ///
430    /// Consumes the client. If you don't call this explicitly, the
431    /// [`Drop`] implementation will kill the process automatically.
432    pub async fn shutdown(mut self) -> Result<()> {
433        debug!("[CLIENT] Shutting down");
434        self.child.kill().await.map_err(Error::Io)?;
435        Ok(())
436    }
437
438    // -- internal --
439
440    async fn send_notification(&mut self, method: &str) -> Result<()> {
441        let notif = JsonRpcNotification {
442            method: method.to_string(),
443            params: None,
444        };
445        self.send_raw(&notif).await
446    }
447
448    async fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
449        let json = serde_json::to_string(msg).map_err(Error::Json)?;
450        debug!("[CLIENT] Sending: {}", json);
451        self.writer
452            .write_all(json.as_bytes())
453            .await
454            .map_err(Error::Io)?;
455        self.writer.write_all(b"\n").await.map_err(Error::Io)?;
456        self.writer.flush().await.map_err(Error::Io)?;
457        Ok(())
458    }
459
460    async fn read_message(&mut self) -> Result<JsonRpcMessage> {
461        self.read_message_opt().await?.ok_or(Error::ServerClosed)
462    }
463
464    async fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
465        let mut line = String::new();
466
467        loop {
468            line.clear();
469            let bytes_read = self.reader.read_line(&mut line).await.map_err(Error::Io)?;
470
471            if bytes_read == 0 {
472                debug!("[CLIENT] Stream closed (EOF)");
473                return Ok(None);
474            }
475
476            let trimmed = line.trim();
477            if trimmed.is_empty() {
478                continue;
479            }
480
481            debug!("[CLIENT] Received: {}", trimmed);
482
483            match serde_json::from_str::<JsonRpcMessage>(trimmed) {
484                Ok(msg) => return Ok(Some(msg)),
485                Err(e) => {
486                    warn!(
487                        "[CLIENT] Failed to deserialize message. \
488                         Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
489                    );
490                    warn!("[CLIENT] Parse error: {}", e);
491                    warn!("[CLIENT] Raw: {}", trimmed);
492                    return Err(Error::Deserialization(ParseError::from_line(trimmed, e)));
493                }
494            }
495        }
496    }
497}
498
499impl Drop for AsyncClient {
500    fn drop(&mut self) {
501        if self.is_alive() {
502            if let Err(e) = self.child.start_kill() {
503                error!("Failed to kill app-server process on drop: {}", e);
504            }
505        }
506    }
507}
508
/// Async stream of [`ServerMessage`]s from an [`AsyncClient`].
///
/// Holds a mutable borrow of the client for its whole lifetime, so the
/// client cannot be used directly while the stream exists.
pub struct EventStream<'a> {
    /// The client whose `next_message` supplies the stream's items.
    client: &'a mut AsyncClient,
}
513
514impl EventStream<'_> {
515    /// Get the next server message.
516    pub async fn next(&mut self) -> Option<Result<ServerMessage>> {
517        match self.client.next_message().await {
518            Ok(Some(msg)) => Some(Ok(msg)),
519            Ok(None) => None,
520            Err(e) => Some(Err(e)),
521        }
522    }
523
524    /// Collect all remaining messages.
525    pub async fn collect(mut self) -> Result<Vec<ServerMessage>> {
526        let mut msgs = Vec::new();
527        while let Some(result) = self.next().await {
528            msgs.push(result?);
529        }
530        Ok(msgs)
531    }
532}
533
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the stdout read-buffer capacity at 10 MiB so a change to the
    /// constant has to be made deliberately.
    #[test]
    fn test_buffer_size() {
        assert_eq!(STDOUT_BUFFER_SIZE, 10 * 1024 * 1024);
    }
}
542}