Skip to main content

codex_codes/
client_async.rs

1//! Asynchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! # Lifecycle
8//!
9//! 1. Create a client with [`AsyncClient::start`] (spawns and initializes the app-server)
10//! 2. Call [`AsyncClient::thread_start`] to create a conversation session
11//! 3. Call [`AsyncClient::turn_start`] to send user input
12//! 4. Consume [`AsyncClient::next_message`] to stream notifications
13//! 5. Handle approval requests via [`AsyncClient::respond`]
14//! 6. Repeat steps 3-5 for follow-up turns
15//! 7. The client kills the app-server on [`Drop`]
16//!
17//! # Example
18//!
19//! ```ignore
20//! use codex_codes::{AsyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
21//!
22//! let mut client = AsyncClient::start().await?;
23//! let thread = client.thread_start(&ThreadStartParams::default()).await?;
24//!
25//! client.turn_start(&TurnStartParams {
26//!     thread_id: thread.thread_id().to_string(),
27//!     input: vec![UserInput::Text { text: "Hello!".into() }],
28//!     model: None,
29//!     reasoning_effort: None,
30//!     sandbox_policy: None,
31//! }).await?;
32//!
33//! while let Some(msg) = client.next_message().await? {
34//!     match msg {
35//!         ServerMessage::Notification { method, params } => {
36//!             if method == "turn/completed" { break; }
37//!         }
38//!         ServerMessage::Request { id, method, .. } => {
39//!             client.respond(id, &serde_json::json!({"decision": "accept"})).await?;
40//!         }
41//!     }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, Result};
47use crate::jsonrpc::{
48    JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::protocol::{
51    ClientInfo, InitializeParams, InitializeResponse, ServerMessage, ThreadArchiveParams,
52    ThreadArchiveResponse, ThreadStartParams, ThreadStartResponse, TurnInterruptParams,
53    TurnInterruptResponse, TurnStartParams, TurnStartResponse,
54};
55use log::{debug, error, warn};
56use serde::de::DeserializeOwned;
57use serde::Serialize;
58use std::collections::VecDeque;
59use std::sync::atomic::{AtomicI64, Ordering};
60use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
61use tokio::process::{Child, ChildStderr};
62
/// Capacity, in bytes, of the buffered reader wrapped around the
/// app-server's stdout (10MB, i.e. 10 * 2^20 bytes).
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
65
/// Asynchronous multi-turn client for the Codex app-server.
///
/// Communicates with a long-lived `codex app-server` process via
/// newline-delimited JSON-RPC over stdio. Manages request/response
/// correlation and buffers incoming notifications that arrive while
/// waiting for RPC responses.
///
/// The client automatically kills the app-server process when dropped.
pub struct AsyncClient {
    /// Handle to the spawned app-server process; used for the PID,
    /// liveness checks, and killing the process.
    child: Child,
    /// Buffered writer over the child's stdin. Every outgoing message is
    /// written as one JSON line and flushed immediately.
    writer: BufWriter<tokio::process::ChildStdin>,
    /// Buffered reader over the child's stdout, from which incoming
    /// JSON-RPC messages are read one line at a time.
    reader: BufReader<tokio::process::ChildStdout>,
    /// The child's stderr, if it was piped. Becomes `None` once handed out
    /// by [`AsyncClient::take_stderr`].
    stderr: Option<BufReader<ChildStderr>>,
    /// Source of integer ids for outgoing requests; starts at 1 and is
    /// incremented once per [`AsyncClient::request`] call.
    next_id: AtomicI64,
    /// Buffered incoming messages (notifications/server requests) that arrived
    /// while waiting for a response to a client request. Drained in FIFO
    /// order by [`AsyncClient::next_message`].
    buffered: VecDeque<ServerMessage>,
}
84
85impl AsyncClient {
86    /// Start an app-server with default settings.
87    ///
88    /// Spawns `codex app-server --listen stdio://`, performs the required
89    /// `initialize` handshake, and returns a connected client ready for
90    /// `thread_start()`.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the `codex` CLI is not installed, the version is
95    /// incompatible, the process fails to start, or the initialization
96    /// handshake fails.
97    pub async fn start() -> Result<Self> {
98        Self::start_with(AppServerBuilder::new()).await
99    }
100
101    /// Start an app-server with a custom [`AppServerBuilder`].
102    ///
103    /// Performs the required `initialize` handshake before returning.
104    /// Use this to configure a custom binary path or working directory.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if the process fails to start, stdio pipes
109    /// cannot be established, or the initialization handshake fails.
110    pub async fn start_with(builder: AppServerBuilder) -> Result<Self> {
111        let mut client = Self::spawn(builder).await?;
112        client
113            .initialize(&InitializeParams {
114                client_info: ClientInfo {
115                    name: "codex-codes".to_string(),
116                    version: env!("CARGO_PKG_VERSION").to_string(),
117                    title: None,
118                },
119                capabilities: None,
120            })
121            .await?;
122        Ok(client)
123    }
124
125    /// Spawn an app-server without performing the `initialize` handshake.
126    ///
127    /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
128    /// specific capabilities). You **must** call [`AsyncClient::initialize`]
129    /// before any other requests.
130    pub async fn spawn(builder: AppServerBuilder) -> Result<Self> {
131        crate::version::check_codex_version_async().await?;
132
133        let mut child = builder.spawn().await?;
134
135        let stdin = child
136            .stdin
137            .take()
138            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
139        let stdout = child
140            .stdout
141            .take()
142            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
143        let stderr = child.stderr.take().map(BufReader::new);
144
145        Ok(Self {
146            child,
147            writer: BufWriter::new(stdin),
148            reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
149            stderr,
150            next_id: AtomicI64::new(1),
151            buffered: VecDeque::new(),
152        })
153    }
154
155    /// Send a JSON-RPC request and wait for the matching response.
156    ///
157    /// Any notifications or server requests that arrive before the response
158    /// are buffered and can be retrieved via [`AsyncClient::next_message`].
159    ///
160    /// # Errors
161    ///
162    /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
163    /// - [`Error::ServerClosed`] if the connection drops before a response arrives
164    /// - [`Error::Json`] if response deserialization fails
165    pub async fn request<P: Serialize, R: DeserializeOwned>(
166        &mut self,
167        method: &str,
168        params: &P,
169    ) -> Result<R> {
170        let id = RequestId::Integer(self.next_id.fetch_add(1, Ordering::Relaxed));
171
172        let req = JsonRpcRequest {
173            id: id.clone(),
174            method: method.to_string(),
175            params: Some(serde_json::to_value(params).map_err(Error::Json)?),
176        };
177
178        self.send_raw(&req).await?;
179
180        // Read lines until we get a response matching our id
181        loop {
182            let msg = self.read_message().await?;
183            match msg {
184                JsonRpcMessage::Response(resp) if resp.id == id => {
185                    let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
186                    return Ok(result);
187                }
188                JsonRpcMessage::Error(err) if err.id == id => {
189                    return Err(Error::JsonRpc {
190                        code: err.error.code,
191                        message: err.error.message,
192                    });
193                }
194                // Buffer notifications and server requests
195                JsonRpcMessage::Notification(notif) => {
196                    self.buffered.push_back(ServerMessage::Notification {
197                        method: notif.method,
198                        params: notif.params,
199                    });
200                }
201                JsonRpcMessage::Request(req) => {
202                    self.buffered.push_back(ServerMessage::Request {
203                        id: req.id,
204                        method: req.method,
205                        params: req.params,
206                    });
207                }
208                // Response/error for a different id — unexpected
209                JsonRpcMessage::Response(resp) => {
210                    warn!(
211                        "[CLIENT] Unexpected response for id={}, expected id={}",
212                        resp.id, id
213                    );
214                }
215                JsonRpcMessage::Error(err) => {
216                    warn!(
217                        "[CLIENT] Unexpected error for id={}, expected id={}",
218                        err.id, id
219                    );
220                }
221            }
222        }
223    }
224
225    /// Start a new thread (conversation session).
226    ///
227    /// A thread must be created before any turns can be started. The returned
228    /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
229    pub async fn thread_start(
230        &mut self,
231        params: &ThreadStartParams,
232    ) -> Result<ThreadStartResponse> {
233        self.request(crate::protocol::methods::THREAD_START, params)
234            .await
235    }
236
237    /// Start a new turn within a thread.
238    ///
239    /// Sends user input to the agent. After calling this, use [`AsyncClient::next_message`]
240    /// to stream notifications until `turn/completed` arrives.
241    pub async fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
242        self.request(crate::protocol::methods::TURN_START, params)
243            .await
244    }
245
246    /// Interrupt an active turn.
247    pub async fn turn_interrupt(
248        &mut self,
249        params: &TurnInterruptParams,
250    ) -> Result<TurnInterruptResponse> {
251        self.request(crate::protocol::methods::TURN_INTERRUPT, params)
252            .await
253    }
254
255    /// Archive a thread.
256    pub async fn thread_archive(
257        &mut self,
258        params: &ThreadArchiveParams,
259    ) -> Result<ThreadArchiveResponse> {
260        self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
261            .await
262    }
263
264    /// Perform the `initialize` handshake with the app-server.
265    ///
266    /// Sends `initialize` with the given params and then sends the
267    /// `initialized` notification. This must be the first request after
268    /// spawning the process.
269    pub async fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
270        let resp: InitializeResponse = self
271            .request(crate::protocol::methods::INITIALIZE, params)
272            .await?;
273        self.send_notification(crate::protocol::methods::INITIALIZED)
274            .await?;
275        Ok(resp)
276    }
277
278    /// Respond to a server-to-client request (e.g., approval flow).
279    ///
280    /// When the server sends a [`ServerMessage::Request`], it expects a response.
281    /// Use this method with the request's `id` and a result payload. For command
282    /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
283    /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
284    pub async fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
285        let resp = JsonRpcResponse {
286            id,
287            result: serde_json::to_value(result).map_err(Error::Json)?,
288        };
289        self.send_raw(&resp).await
290    }
291
292    /// Respond to a server-to-client request with an error.
293    pub async fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
294        let err = JsonRpcError {
295            id,
296            error: crate::jsonrpc::JsonRpcErrorData {
297                code,
298                message: message.to_string(),
299                data: None,
300            },
301        };
302        self.send_raw(&err).await
303    }
304
305    /// Read the next incoming server message (notification or server request).
306    ///
307    /// Returns buffered messages first (from notifications that arrived during
308    /// an [`AsyncClient::request`] call), then reads from the wire.
309    ///
310    /// Returns `Ok(None)` when the app-server closes the connection (EOF).
311    ///
312    /// # Typical notification methods
313    ///
314    /// | Method | Meaning |
315    /// |--------|---------|
316    /// | `turn/started` | Agent began processing |
317    /// | `item/agentMessage/delta` | Streaming text chunk |
318    /// | `item/commandExecution/outputDelta` | Command output chunk |
319    /// | `item/started` / `item/completed` | Item lifecycle |
320    /// | `turn/completed` | Agent finished the turn |
321    /// | `error` | Server-side error |
322    pub async fn next_message(&mut self) -> Result<Option<ServerMessage>> {
323        // Drain buffered messages first
324        if let Some(msg) = self.buffered.pop_front() {
325            return Ok(Some(msg));
326        }
327
328        // Read from the wire
329        loop {
330            let msg = match self.read_message_opt().await? {
331                Some(m) => m,
332                None => return Ok(None),
333            };
334
335            match msg {
336                JsonRpcMessage::Notification(notif) => {
337                    return Ok(Some(ServerMessage::Notification {
338                        method: notif.method,
339                        params: notif.params,
340                    }));
341                }
342                JsonRpcMessage::Request(req) => {
343                    return Ok(Some(ServerMessage::Request {
344                        id: req.id,
345                        method: req.method,
346                        params: req.params,
347                    }));
348                }
349                // Unexpected responses without a pending request
350                JsonRpcMessage::Response(resp) => {
351                    warn!(
352                        "[CLIENT] Unexpected response (no pending request): id={}",
353                        resp.id
354                    );
355                }
356                JsonRpcMessage::Error(err) => {
357                    warn!(
358                        "[CLIENT] Unexpected error (no pending request): id={} code={}",
359                        err.id, err.error.code
360                    );
361                }
362            }
363        }
364    }
365
366    /// Return an async event stream over [`ServerMessage`]s.
367    ///
368    /// Wraps [`AsyncClient::next_message`] in a stream-like API. Call
369    /// [`EventStream::next`] in a loop, or [`EventStream::collect`] to
370    /// gather all messages until EOF.
371    pub fn events(&mut self) -> EventStream<'_> {
372        EventStream { client: self }
373    }
374
375    /// Take the stderr reader (can only be called once).
376    ///
377    /// Useful for logging or diagnostics. Returns `None` on subsequent calls.
378    pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
379        self.stderr.take()
380    }
381
382    /// Get the process ID.
383    pub fn pid(&self) -> Option<u32> {
384        self.child.id()
385    }
386
387    /// Check if the child process is still running.
388    pub fn is_alive(&mut self) -> bool {
389        self.child.try_wait().ok().flatten().is_none()
390    }
391
392    /// Shut down the app-server process.
393    ///
394    /// Consumes the client. If you don't call this explicitly, the
395    /// [`Drop`] implementation will kill the process automatically.
396    pub async fn shutdown(mut self) -> Result<()> {
397        debug!("[CLIENT] Shutting down");
398        self.child.kill().await.map_err(Error::Io)?;
399        Ok(())
400    }
401
402    // -- internal --
403
404    async fn send_notification(&mut self, method: &str) -> Result<()> {
405        let notif = JsonRpcNotification {
406            method: method.to_string(),
407            params: None,
408        };
409        self.send_raw(&notif).await
410    }
411
412    async fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
413        let json = serde_json::to_string(msg).map_err(Error::Json)?;
414        debug!("[CLIENT] Sending: {}", json);
415        self.writer
416            .write_all(json.as_bytes())
417            .await
418            .map_err(Error::Io)?;
419        self.writer.write_all(b"\n").await.map_err(Error::Io)?;
420        self.writer.flush().await.map_err(Error::Io)?;
421        Ok(())
422    }
423
424    async fn read_message(&mut self) -> Result<JsonRpcMessage> {
425        self.read_message_opt().await?.ok_or(Error::ServerClosed)
426    }
427
428    async fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
429        let mut line = String::new();
430
431        loop {
432            line.clear();
433            let bytes_read = self.reader.read_line(&mut line).await.map_err(Error::Io)?;
434
435            if bytes_read == 0 {
436                debug!("[CLIENT] Stream closed (EOF)");
437                return Ok(None);
438            }
439
440            let trimmed = line.trim();
441            if trimmed.is_empty() {
442                continue;
443            }
444
445            debug!("[CLIENT] Received: {}", trimmed);
446
447            match serde_json::from_str::<JsonRpcMessage>(trimmed) {
448                Ok(msg) => return Ok(Some(msg)),
449                Err(e) => {
450                    warn!(
451                        "[CLIENT] Failed to deserialize message. \
452                         Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
453                    );
454                    warn!("[CLIENT] Parse error: {}", e);
455                    warn!("[CLIENT] Raw: {}", trimmed);
456                    return Err(Error::Deserialization(format!("{} (raw: {})", e, trimmed)));
457                }
458            }
459        }
460    }
461}
462
463impl Drop for AsyncClient {
464    fn drop(&mut self) {
465        if self.is_alive() {
466            if let Err(e) = self.child.start_kill() {
467                error!("Failed to kill app-server process on drop: {}", e);
468            }
469        }
470    }
471}
472
473/// Async stream of [`ServerMessage`]s from an [`AsyncClient`].
474pub struct EventStream<'a> {
475    client: &'a mut AsyncClient,
476}
477
478impl EventStream<'_> {
479    /// Get the next server message.
480    pub async fn next(&mut self) -> Option<Result<ServerMessage>> {
481        match self.client.next_message().await {
482            Ok(Some(msg)) => Some(Ok(msg)),
483            Ok(None) => None,
484            Err(e) => Some(Err(e)),
485        }
486    }
487
488    /// Collect all remaining messages.
489    pub async fn collect(mut self) -> Result<Vec<ServerMessage>> {
490        let mut msgs = Vec::new();
491        while let Some(result) = self.next().await {
492            msgs.push(result?);
493        }
494        Ok(msgs)
495    }
496}
497
#[cfg(test)]
mod tests {
    use super::STDOUT_BUFFER_SIZE;

    /// Pins the stdout reader capacity at the documented 10MB.
    #[test]
    fn test_buffer_size() {
        let ten_mib: usize = 10 * 1024 * 1024;
        assert_eq!(STDOUT_BUFFER_SIZE, ten_mib);
    }
}