codex_codes/client_async.rs
//! Asynchronous multi-turn client for the Codex app-server.
//!
//! Spawns `codex app-server --listen stdio://` and communicates over
//! newline-delimited JSON-RPC. The connection stays open for multiple
//! turns until explicitly shut down.
//!
//! # Lifecycle
//!
//! 1. Create a client with [`AsyncClient::start`] (spawns and initializes the app-server)
//! 2. Call [`AsyncClient::thread_start`] to create a conversation session
//! 3. Call [`AsyncClient::turn_start`] to send user input
//! 4. Consume [`AsyncClient::next_message`] to stream notifications
//! 5. Handle approval requests via [`AsyncClient::respond`]
//! 6. Repeat steps 3-5 for follow-up turns
//! 7. The client kills the app-server on [`Drop`]
//!
//! # Example
//!
//! ```ignore
//! use codex_codes::{AsyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
//!
//! let mut client = AsyncClient::start().await?;
//! let thread = client.thread_start(&ThreadStartParams::default()).await?;
//!
//! client.turn_start(&TurnStartParams {
//!     thread_id: thread.thread_id().to_string(),
//!     input: vec![UserInput::Text { text: "Hello!".into() }],
//!     model: None,
//!     reasoning_effort: None,
//!     sandbox_policy: None,
//! }).await?;
//!
//! while let Some(msg) = client.next_message().await? {
//!     match msg {
//!         ServerMessage::Notification(n) => {
//!             if let codex_codes::Notification::TurnCompleted(_) = n { break; }
//!         }
//!         ServerMessage::Request { id, .. } => {
//!             client.respond(id, &serde_json::json!({"decision": "accept"})).await?;
//!         }
//!     }
//! }
//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, Result};
47use crate::jsonrpc::{
48 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::messages::{Notification, ServerMessage, ServerRequest};
51use crate::protocol::{
52 ClientInfo, InitializeParams, InitializeResponse, ThreadArchiveParams, ThreadArchiveResponse,
53 ThreadStartParams, ThreadStartResponse, TurnInterruptParams, TurnInterruptResponse,
54 TurnStartParams, TurnStartResponse,
55};
56use log::{debug, error, warn};
57use serde::de::DeserializeOwned;
58use serde::Serialize;
59use std::collections::VecDeque;
60use std::sync::atomic::{AtomicI64, Ordering};
61use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
62use tokio::process::Child;
63
/// Capacity in bytes (10 MiB) used for the stdout read buffer.
const STDOUT_BUFFER_SIZE: usize = 10 * (1 << 20);
66
67/// Asynchronous multi-turn client for the Codex app-server.
68///
69/// Communicates with a long-lived `codex app-server` process via
70/// newline-delimited JSON-RPC over stdio. Manages request/response
71/// correlation and buffers incoming notifications that arrive while
72/// waiting for RPC responses.
73///
74/// The client automatically kills the app-server process when dropped.
75pub struct AsyncClient {
76 child: Child,
77 writer: BufWriter<tokio::process::ChildStdin>,
78 reader: BufReader<tokio::process::ChildStdout>,
79 /// Handle to the background task draining the child's stderr pipe.
80 /// Kept alive for the lifetime of the client; the task exits on EOF
81 /// when the child is killed.
82 _stderr_drain: tokio::task::JoinHandle<()>,
83 next_id: AtomicI64,
84 /// Buffered incoming messages (notifications/server requests) that arrived
85 /// while waiting for a response to a client request.
86 buffered: VecDeque<ServerMessage>,
87}
88
89impl AsyncClient {
90 /// Start an app-server with default settings.
91 ///
92 /// Spawns `codex app-server --listen stdio://`, performs the required
93 /// `initialize` handshake, and returns a connected client ready for
94 /// `thread_start()`.
95 ///
96 /// # Errors
97 ///
98 /// Returns an error if the `codex` CLI is not installed, the version is
99 /// incompatible, the process fails to start, or the initialization
100 /// handshake fails.
101 pub async fn start() -> Result<Self> {
102 Self::start_with(AppServerBuilder::new()).await
103 }
104
105 /// Start an app-server with a custom [`AppServerBuilder`].
106 ///
107 /// Performs the required `initialize` handshake before returning.
108 /// Use this to configure a custom binary path or working directory.
109 ///
110 /// # Errors
111 ///
112 /// Returns an error if the process fails to start, stdio pipes
113 /// cannot be established, or the initialization handshake fails.
114 pub async fn start_with(builder: AppServerBuilder) -> Result<Self> {
115 let mut client = Self::spawn(builder).await?;
116 client
117 .initialize(&InitializeParams {
118 client_info: ClientInfo {
119 name: "codex-codes".to_string(),
120 version: env!("CARGO_PKG_VERSION").to_string(),
121 title: None,
122 },
123 capabilities: None,
124 })
125 .await?;
126 Ok(client)
127 }
128
129 /// Spawn an app-server without performing the `initialize` handshake.
130 ///
131 /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
132 /// specific capabilities). You **must** call [`AsyncClient::initialize`]
133 /// before any other requests.
134 pub async fn spawn(builder: AppServerBuilder) -> Result<Self> {
135 crate::version::check_codex_version_async().await?;
136
137 let mut child = builder.spawn().await?;
138
139 let stdin = child
140 .stdin
141 .take()
142 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
143 let stdout = child
144 .stdout
145 .take()
146 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
147 let stderr = child
148 .stderr
149 .take()
150 .ok_or_else(|| Error::Protocol("Failed to get stderr".to_string()))?;
151
152 // The app-server emits ~200 KB/s of tracing to stderr. Without an
153 // active reader, the ~64 KB kernel pipe fills almost instantly and
154 // the child blocks. Drain in the background and route lines through
155 // the `log` crate (see [`crate::stderr_drain`]).
156 let stderr_drain = crate::stderr_drain::spawn_async(stderr);
157
158 Ok(Self {
159 child,
160 writer: BufWriter::new(stdin),
161 reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
162 _stderr_drain: stderr_drain,
163 next_id: AtomicI64::new(1),
164 buffered: VecDeque::new(),
165 })
166 }
167
168 /// Send a JSON-RPC request and wait for the matching response.
169 ///
170 /// Any notifications or server requests that arrive before the response
171 /// are buffered and can be retrieved via [`AsyncClient::next_message`].
172 ///
173 /// # Errors
174 ///
175 /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
176 /// - [`Error::ServerClosed`] if the connection drops before a response arrives
177 /// - [`Error::Json`] if response deserialization fails
178 pub async fn request<P: Serialize, R: DeserializeOwned>(
179 &mut self,
180 method: &str,
181 params: &P,
182 ) -> Result<R> {
183 let id = RequestId::Integer(self.next_id.fetch_add(1, Ordering::Relaxed));
184
185 let req = JsonRpcRequest {
186 id: id.clone(),
187 method: method.to_string(),
188 params: Some(serde_json::to_value(params).map_err(Error::Json)?),
189 };
190
191 self.send_raw(&req).await?;
192
193 // Read lines until we get a response matching our id
194 loop {
195 let msg = self.read_message().await?;
196 match msg {
197 JsonRpcMessage::Response(resp) if resp.id == id => {
198 let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
199 return Ok(result);
200 }
201 JsonRpcMessage::Error(err) if err.id == id => {
202 return Err(Error::JsonRpc {
203 code: err.error.code,
204 message: err.error.message,
205 });
206 }
207 // Buffer notifications and server requests
208 JsonRpcMessage::Notification(notif) => {
209 let typed = Notification::from_envelope(¬if.method, notif.params)
210 .map_err(Error::Json)?;
211 self.buffered.push_back(ServerMessage::Notification(typed));
212 }
213 JsonRpcMessage::Request(req) => {
214 let typed = ServerRequest::from_envelope(&req.method, req.params)
215 .map_err(Error::Json)?;
216 self.buffered.push_back(ServerMessage::Request {
217 id: req.id,
218 request: typed,
219 });
220 }
221 // Response/error for a different id — unexpected
222 JsonRpcMessage::Response(resp) => {
223 warn!(
224 "[CLIENT] Unexpected response for id={}, expected id={}",
225 resp.id, id
226 );
227 }
228 JsonRpcMessage::Error(err) => {
229 warn!(
230 "[CLIENT] Unexpected error for id={}, expected id={}",
231 err.id, id
232 );
233 }
234 }
235 }
236 }
237
238 /// Start a new thread (conversation session).
239 ///
240 /// A thread must be created before any turns can be started. The returned
241 /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
242 pub async fn thread_start(
243 &mut self,
244 params: &ThreadStartParams,
245 ) -> Result<ThreadStartResponse> {
246 self.request(crate::protocol::methods::THREAD_START, params)
247 .await
248 }
249
250 /// Start a new turn within a thread.
251 ///
252 /// Sends user input to the agent. After calling this, use [`AsyncClient::next_message`]
253 /// to stream notifications until `turn/completed` arrives.
254 pub async fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
255 self.request(crate::protocol::methods::TURN_START, params)
256 .await
257 }
258
259 /// Interrupt an active turn.
260 pub async fn turn_interrupt(
261 &mut self,
262 params: &TurnInterruptParams,
263 ) -> Result<TurnInterruptResponse> {
264 self.request(crate::protocol::methods::TURN_INTERRUPT, params)
265 .await
266 }
267
268 /// Archive a thread.
269 pub async fn thread_archive(
270 &mut self,
271 params: &ThreadArchiveParams,
272 ) -> Result<ThreadArchiveResponse> {
273 self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
274 .await
275 }
276
277 /// Perform the `initialize` handshake with the app-server.
278 ///
279 /// Sends `initialize` with the given params and then sends the
280 /// `initialized` notification. This must be the first request after
281 /// spawning the process.
282 pub async fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
283 let resp: InitializeResponse = self
284 .request(crate::protocol::methods::INITIALIZE, params)
285 .await?;
286 self.send_notification(crate::protocol::methods::INITIALIZED)
287 .await?;
288 Ok(resp)
289 }
290
291 /// Respond to a server-to-client request (e.g., approval flow).
292 ///
293 /// When the server sends a [`ServerMessage::Request`], it expects a response.
294 /// Use this method with the request's `id` and a result payload. For command
295 /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
296 /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
297 pub async fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
298 let resp = JsonRpcResponse {
299 id,
300 result: serde_json::to_value(result).map_err(Error::Json)?,
301 };
302 self.send_raw(&resp).await
303 }
304
305 /// Respond to a server-to-client request with an error.
306 pub async fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
307 let err = JsonRpcError {
308 id,
309 error: crate::jsonrpc::JsonRpcErrorData {
310 code,
311 message: message.to_string(),
312 data: None,
313 },
314 };
315 self.send_raw(&err).await
316 }
317
318 /// Read the next incoming server message (notification or server request).
319 ///
320 /// Returns buffered messages first (from notifications that arrived during
321 /// an [`AsyncClient::request`] call), then reads from the wire.
322 ///
323 /// Returns `Ok(None)` when the app-server closes the connection (EOF).
324 ///
325 /// # Typical notification methods
326 ///
327 /// | Method | Meaning |
328 /// |--------|---------|
329 /// | `turn/started` | Agent began processing |
330 /// | `item/agentMessage/delta` | Streaming text chunk |
331 /// | `item/commandExecution/outputDelta` | Command output chunk |
332 /// | `item/started` / `item/completed` | Item lifecycle |
333 /// | `turn/completed` | Agent finished the turn |
334 /// | `error` | Server-side error |
335 pub async fn next_message(&mut self) -> Result<Option<ServerMessage>> {
336 // Drain buffered messages first
337 if let Some(msg) = self.buffered.pop_front() {
338 return Ok(Some(msg));
339 }
340
341 // Read from the wire
342 loop {
343 let msg = match self.read_message_opt().await? {
344 Some(m) => m,
345 None => return Ok(None),
346 };
347
348 match msg {
349 JsonRpcMessage::Notification(notif) => {
350 let typed = Notification::from_envelope(¬if.method, notif.params)
351 .map_err(Error::Json)?;
352 return Ok(Some(ServerMessage::Notification(typed)));
353 }
354 JsonRpcMessage::Request(req) => {
355 let typed = ServerRequest::from_envelope(&req.method, req.params)
356 .map_err(Error::Json)?;
357 return Ok(Some(ServerMessage::Request {
358 id: req.id,
359 request: typed,
360 }));
361 }
362 // Unexpected responses without a pending request
363 JsonRpcMessage::Response(resp) => {
364 warn!(
365 "[CLIENT] Unexpected response (no pending request): id={}",
366 resp.id
367 );
368 }
369 JsonRpcMessage::Error(err) => {
370 warn!(
371 "[CLIENT] Unexpected error (no pending request): id={} code={}",
372 err.id, err.error.code
373 );
374 }
375 }
376 }
377 }
378
379 /// Return an async event stream over [`ServerMessage`]s.
380 ///
381 /// Wraps [`AsyncClient::next_message`] in a stream-like API. Call
382 /// [`EventStream::next`] in a loop, or [`EventStream::collect`] to
383 /// gather all messages until EOF.
384 pub fn events(&mut self) -> EventStream<'_> {
385 EventStream { client: self }
386 }
387
388 /// Get the process ID.
389 pub fn pid(&self) -> Option<u32> {
390 self.child.id()
391 }
392
393 /// Check if the child process is still running.
394 pub fn is_alive(&mut self) -> bool {
395 self.child.try_wait().ok().flatten().is_none()
396 }
397
398 /// Shut down the app-server process.
399 ///
400 /// Consumes the client. If you don't call this explicitly, the
401 /// [`Drop`] implementation will kill the process automatically.
402 pub async fn shutdown(mut self) -> Result<()> {
403 debug!("[CLIENT] Shutting down");
404 self.child.kill().await.map_err(Error::Io)?;
405 Ok(())
406 }
407
408 // -- internal --
409
410 async fn send_notification(&mut self, method: &str) -> Result<()> {
411 let notif = JsonRpcNotification {
412 method: method.to_string(),
413 params: None,
414 };
415 self.send_raw(¬if).await
416 }
417
418 async fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
419 let json = serde_json::to_string(msg).map_err(Error::Json)?;
420 debug!("[CLIENT] Sending: {}", json);
421 self.writer
422 .write_all(json.as_bytes())
423 .await
424 .map_err(Error::Io)?;
425 self.writer.write_all(b"\n").await.map_err(Error::Io)?;
426 self.writer.flush().await.map_err(Error::Io)?;
427 Ok(())
428 }
429
430 async fn read_message(&mut self) -> Result<JsonRpcMessage> {
431 self.read_message_opt().await?.ok_or(Error::ServerClosed)
432 }
433
434 async fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
435 let mut line = String::new();
436
437 loop {
438 line.clear();
439 let bytes_read = self.reader.read_line(&mut line).await.map_err(Error::Io)?;
440
441 if bytes_read == 0 {
442 debug!("[CLIENT] Stream closed (EOF)");
443 return Ok(None);
444 }
445
446 let trimmed = line.trim();
447 if trimmed.is_empty() {
448 continue;
449 }
450
451 debug!("[CLIENT] Received: {}", trimmed);
452
453 match serde_json::from_str::<JsonRpcMessage>(trimmed) {
454 Ok(msg) => return Ok(Some(msg)),
455 Err(e) => {
456 warn!(
457 "[CLIENT] Failed to deserialize message. \
458 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
459 );
460 warn!("[CLIENT] Parse error: {}", e);
461 warn!("[CLIENT] Raw: {}", trimmed);
462 return Err(Error::Deserialization(format!("{} (raw: {})", e, trimmed)));
463 }
464 }
465 }
466 }
467}
468
469impl Drop for AsyncClient {
470 fn drop(&mut self) {
471 if self.is_alive() {
472 if let Err(e) = self.child.start_kill() {
473 error!("Failed to kill app-server process on drop: {}", e);
474 }
475 }
476 }
477}
478
479/// Async stream of [`ServerMessage`]s from an [`AsyncClient`].
480pub struct EventStream<'a> {
481 client: &'a mut AsyncClient,
482}
483
484impl EventStream<'_> {
485 /// Get the next server message.
486 pub async fn next(&mut self) -> Option<Result<ServerMessage>> {
487 match self.client.next_message().await {
488 Ok(Some(msg)) => Some(Ok(msg)),
489 Ok(None) => None,
490 Err(e) => Some(Err(e)),
491 }
492 }
493
494 /// Collect all remaining messages.
495 pub async fn collect(mut self) -> Result<Vec<ServerMessage>> {
496 let mut msgs = Vec::new();
497 while let Some(result) = self.next().await {
498 msgs.push(result?);
499 }
500 Ok(msgs)
501 }
502}
503
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the stdout read-buffer capacity at 10 MiB.
    #[test]
    fn test_buffer_size() {
        let ten_mib: usize = 10 * 1024 * 1024;
        assert_eq!(STDOUT_BUFFER_SIZE, ten_mib);
    }
}