codex_codes/client_async.rs
//! Asynchronous multi-turn client for the Codex app-server.
//!
//! Spawns `codex app-server --listen stdio://` and communicates over
//! newline-delimited JSON-RPC. The connection stays open for multiple
//! turns until explicitly shut down.
//!
//! # Lifecycle
//!
//! 1. Create a client with [`AsyncClient::start`] (spawns and initializes the app-server)
//! 2. Call [`AsyncClient::thread_start`] to create a conversation session
//! 3. Call [`AsyncClient::turn_start`] to send user input
//! 4. Consume [`AsyncClient::next_message`] to stream notifications
//! 5. Handle approval requests via [`AsyncClient::respond`]
//! 6. Repeat steps 3-5 for follow-up turns
//! 7. The client kills the app-server on [`Drop`]
//!
//! # Example
//!
//! ```ignore
//! use codex_codes::{AsyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
//!
//! let mut client = AsyncClient::start().await?;
//! let thread = client.thread_start(&ThreadStartParams::default()).await?;
//!
//! client.turn_start(&TurnStartParams {
//!     thread_id: thread.thread_id().to_string(),
//!     input: vec![UserInput::Text { text: "Hello!".into() }],
//!     model: None,
//!     reasoning_effort: None,
//!     sandbox_policy: None,
//! }).await?;
//!
//! while let Some(msg) = client.next_message().await? {
//!     match msg {
//!         ServerMessage::Notification(n) => {
//!             if let codex_codes::Notification::TurnCompleted(_) = n { break; }
//!         }
//!         ServerMessage::Request { id, .. } => {
//!             client.respond(id, &serde_json::json!({"decision": "accept"})).await?;
//!         }
//!     }
//! }
//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, ParseError, Result};
47use crate::jsonrpc::{
48 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::messages::{Notification, ServerMessage, ServerRequest};
51use crate::protocol::{
52 ClientInfo, InitializeParams, InitializeResponse, ThreadArchiveParams, ThreadArchiveResponse,
53 ThreadForkParams, ThreadForkResponse, ThreadResumeParams, ThreadResumeResponse,
54 ThreadStartParams, ThreadStartResponse, TurnInterruptParams, TurnInterruptResponse,
55 TurnStartParams, TurnStartResponse,
56};
57use log::{debug, error, warn};
58use serde::de::DeserializeOwned;
59use serde::Serialize;
60use std::collections::VecDeque;
61use std::sync::atomic::{AtomicI64, Ordering};
62use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
63use tokio::process::Child;
64
/// Capacity, in bytes, of the buffered reader wrapped around the child's
/// stdout: 10 MiB, written here as ten times 2^20.
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
67
68/// Asynchronous multi-turn client for the Codex app-server.
69///
70/// Communicates with a long-lived `codex app-server` process via
71/// newline-delimited JSON-RPC over stdio. Manages request/response
72/// correlation and buffers incoming notifications that arrive while
73/// waiting for RPC responses.
74///
75/// The client automatically kills the app-server process when dropped.
76pub struct AsyncClient {
77 child: Child,
78 writer: BufWriter<tokio::process::ChildStdin>,
79 reader: BufReader<tokio::process::ChildStdout>,
80 /// Handle to the background task draining the child's stderr pipe.
81 /// Kept alive for the lifetime of the client; the task exits on EOF
82 /// when the child is killed.
83 _stderr_drain: tokio::task::JoinHandle<()>,
84 next_id: AtomicI64,
85 /// Buffered incoming messages (notifications/server requests) that arrived
86 /// while waiting for a response to a client request.
87 buffered: VecDeque<ServerMessage>,
88}
89
90impl AsyncClient {
91 /// Start an app-server with default settings.
92 ///
93 /// Spawns `codex app-server --listen stdio://`, performs the required
94 /// `initialize` handshake, and returns a connected client ready for
95 /// `thread_start()`.
96 ///
97 /// # Errors
98 ///
99 /// Returns an error if the `codex` CLI is not installed, the version is
100 /// incompatible, the process fails to start, or the initialization
101 /// handshake fails.
102 pub async fn start() -> Result<Self> {
103 Self::start_with(AppServerBuilder::new()).await
104 }
105
106 /// Start an app-server with a custom [`AppServerBuilder`].
107 ///
108 /// Performs the required `initialize` handshake before returning.
109 /// Use this to configure a custom binary path or working directory.
110 ///
111 /// # Errors
112 ///
113 /// Returns an error if the process fails to start, stdio pipes
114 /// cannot be established, or the initialization handshake fails.
115 pub async fn start_with(builder: AppServerBuilder) -> Result<Self> {
116 let mut client = Self::spawn(builder).await?;
117 client
118 .initialize(&InitializeParams {
119 client_info: ClientInfo {
120 name: "codex-codes".to_string(),
121 version: env!("CARGO_PKG_VERSION").to_string(),
122 title: None,
123 },
124 capabilities: None,
125 })
126 .await?;
127 Ok(client)
128 }
129
130 /// Spawn an app-server without performing the `initialize` handshake.
131 ///
132 /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
133 /// specific capabilities). You **must** call [`AsyncClient::initialize`]
134 /// before any other requests.
135 pub async fn spawn(builder: AppServerBuilder) -> Result<Self> {
136 crate::version::check_codex_version_async().await?;
137
138 let mut child = builder.spawn().await?;
139
140 let stdin = child
141 .stdin
142 .take()
143 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
144 let stdout = child
145 .stdout
146 .take()
147 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
148 let stderr = child
149 .stderr
150 .take()
151 .ok_or_else(|| Error::Protocol("Failed to get stderr".to_string()))?;
152
153 // The app-server emits ~200 KB/s of tracing to stderr. Without an
154 // active reader, the ~64 KB kernel pipe fills almost instantly and
155 // the child blocks. Drain in the background and route lines through
156 // the `log` crate (see [`crate::stderr_drain`]).
157 let stderr_drain = crate::stderr_drain::spawn_async(stderr);
158
159 Ok(Self {
160 child,
161 writer: BufWriter::new(stdin),
162 reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
163 _stderr_drain: stderr_drain,
164 next_id: AtomicI64::new(1),
165 buffered: VecDeque::new(),
166 })
167 }
168
169 /// Send a JSON-RPC request and wait for the matching response.
170 ///
171 /// Any notifications or server requests that arrive before the response
172 /// are buffered and can be retrieved via [`AsyncClient::next_message`].
173 ///
174 /// # Errors
175 ///
176 /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
177 /// - [`Error::ServerClosed`] if the connection drops before a response arrives
178 /// - [`Error::Json`] if response deserialization fails
179 pub async fn request<P: Serialize, R: DeserializeOwned>(
180 &mut self,
181 method: &str,
182 params: &P,
183 ) -> Result<R> {
184 let id = RequestId::Integer(self.next_id.fetch_add(1, Ordering::Relaxed));
185
186 let req = JsonRpcRequest {
187 id: id.clone(),
188 method: method.to_string(),
189 params: Some(serde_json::to_value(params).map_err(Error::Json)?),
190 };
191
192 self.send_raw(&req).await?;
193
194 // Read lines until we get a response matching our id
195 loop {
196 let msg = self.read_message().await?;
197 match msg {
198 JsonRpcMessage::Response(resp) if resp.id == id => {
199 let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
200 return Ok(result);
201 }
202 JsonRpcMessage::Error(err) if err.id == id => {
203 return Err(Error::JsonRpc {
204 code: err.error.code,
205 message: err.error.message,
206 });
207 }
208 // Buffer notifications and server requests
209 JsonRpcMessage::Notification(notif) => {
210 let typed = Notification::from_envelope(¬if.method, notif.params)
211 .map_err(Error::Json)?;
212 self.buffered.push_back(ServerMessage::Notification(typed));
213 }
214 JsonRpcMessage::Request(req) => {
215 let typed = ServerRequest::from_envelope(&req.method, req.params)
216 .map_err(Error::Json)?;
217 self.buffered.push_back(ServerMessage::Request {
218 id: req.id,
219 request: typed,
220 });
221 }
222 // Response/error for a different id — unexpected
223 JsonRpcMessage::Response(resp) => {
224 warn!(
225 "[CLIENT] Unexpected response for id={}, expected id={}",
226 resp.id, id
227 );
228 }
229 JsonRpcMessage::Error(err) => {
230 warn!(
231 "[CLIENT] Unexpected error for id={}, expected id={}",
232 err.id, id
233 );
234 }
235 }
236 }
237 }
238
239 /// Start a new thread (conversation session).
240 ///
241 /// A thread must be created before any turns can be started. The returned
242 /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
243 pub async fn thread_start(
244 &mut self,
245 params: &ThreadStartParams,
246 ) -> Result<ThreadStartResponse> {
247 self.request(crate::protocol::methods::THREAD_START, params)
248 .await
249 }
250
251 /// Resume a previously persisted thread by id.
252 ///
253 /// Replays the thread's history so turns can continue where they left off.
254 pub async fn thread_resume(
255 &mut self,
256 params: &ThreadResumeParams,
257 ) -> Result<ThreadResumeResponse> {
258 self.request(crate::protocol::methods::THREAD_RESUME, params)
259 .await
260 }
261
262 /// Fork an existing thread into a new independent thread.
263 pub async fn thread_fork(&mut self, params: &ThreadForkParams) -> Result<ThreadForkResponse> {
264 self.request(crate::protocol::methods::THREAD_FORK, params)
265 .await
266 }
267
268 /// Start a new turn within a thread.
269 ///
270 /// Sends user input to the agent. After calling this, use [`AsyncClient::next_message`]
271 /// to stream notifications until `turn/completed` arrives.
272 pub async fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
273 self.request(crate::protocol::methods::TURN_START, params)
274 .await
275 }
276
277 /// Interrupt an active turn.
278 pub async fn turn_interrupt(
279 &mut self,
280 params: &TurnInterruptParams,
281 ) -> Result<TurnInterruptResponse> {
282 self.request(crate::protocol::methods::TURN_INTERRUPT, params)
283 .await
284 }
285
286 /// Archive a thread.
287 pub async fn thread_archive(
288 &mut self,
289 params: &ThreadArchiveParams,
290 ) -> Result<ThreadArchiveResponse> {
291 self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
292 .await
293 }
294
295 /// Perform the `initialize` handshake with the app-server.
296 ///
297 /// Sends `initialize` with the given params and then sends the
298 /// `initialized` notification. This must be the first request after
299 /// spawning the process.
300 pub async fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
301 let resp: InitializeResponse = self
302 .request(crate::protocol::methods::INITIALIZE, params)
303 .await?;
304 self.send_notification(crate::protocol::methods::INITIALIZED)
305 .await?;
306 Ok(resp)
307 }
308
309 /// Respond to a server-to-client request (e.g., approval flow).
310 ///
311 /// When the server sends a [`ServerMessage::Request`], it expects a response.
312 /// Use this method with the request's `id` and a result payload. For command
313 /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
314 /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
315 pub async fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
316 let resp = JsonRpcResponse {
317 id,
318 result: serde_json::to_value(result).map_err(Error::Json)?,
319 };
320 self.send_raw(&resp).await
321 }
322
323 /// Respond to a server-to-client request with an error.
324 pub async fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
325 let err = JsonRpcError {
326 id,
327 error: crate::jsonrpc::JsonRpcErrorData {
328 code,
329 message: message.to_string(),
330 data: None,
331 },
332 };
333 self.send_raw(&err).await
334 }
335
336 /// Read the next incoming server message (notification or server request).
337 ///
338 /// Returns buffered messages first (from notifications that arrived during
339 /// an [`AsyncClient::request`] call), then reads from the wire.
340 ///
341 /// Returns `Ok(None)` when the app-server closes the connection (EOF).
342 ///
343 /// # Typical notification methods
344 ///
345 /// | Method | Meaning |
346 /// |--------|---------|
347 /// | `turn/started` | Agent began processing |
348 /// | `item/agentMessage/delta` | Streaming text chunk |
349 /// | `item/commandExecution/outputDelta` | Command output chunk |
350 /// | `item/started` / `item/completed` | Item lifecycle |
351 /// | `turn/completed` | Agent finished the turn |
352 /// | `error` | Server-side error |
353 pub async fn next_message(&mut self) -> Result<Option<ServerMessage>> {
354 // Drain buffered messages first
355 if let Some(msg) = self.buffered.pop_front() {
356 return Ok(Some(msg));
357 }
358
359 // Read from the wire
360 loop {
361 let msg = match self.read_message_opt().await? {
362 Some(m) => m,
363 None => return Ok(None),
364 };
365
366 match msg {
367 JsonRpcMessage::Notification(notif) => {
368 let JsonRpcNotification { method, params } = notif;
369 let typed =
370 Notification::from_envelope(&method, params.clone()).map_err(|e| {
371 Error::Deserialization(ParseError::from_envelope(method, params, e))
372 })?;
373 return Ok(Some(ServerMessage::Notification(typed)));
374 }
375 JsonRpcMessage::Request(req) => {
376 let JsonRpcRequest { id, method, params } = req;
377 let typed =
378 ServerRequest::from_envelope(&method, params.clone()).map_err(|e| {
379 Error::Deserialization(ParseError::from_envelope(method, params, e))
380 })?;
381 return Ok(Some(ServerMessage::Request { id, request: typed }));
382 }
383 // Unexpected responses without a pending request
384 JsonRpcMessage::Response(resp) => {
385 warn!(
386 "[CLIENT] Unexpected response (no pending request): id={}",
387 resp.id
388 );
389 }
390 JsonRpcMessage::Error(err) => {
391 warn!(
392 "[CLIENT] Unexpected error (no pending request): id={} code={}",
393 err.id, err.error.code
394 );
395 }
396 }
397 }
398 }
399
400 /// Return an async event stream over [`ServerMessage`]s.
401 ///
402 /// Wraps [`AsyncClient::next_message`] in a stream-like API. Call
403 /// [`EventStream::next`] in a loop, or [`EventStream::collect`] to
404 /// gather all messages until EOF.
405 pub fn events(&mut self) -> EventStream<'_> {
406 EventStream { client: self }
407 }
408
409 /// Get the process ID.
410 pub fn pid(&self) -> Option<u32> {
411 self.child.id()
412 }
413
414 /// Check if the child process is still running.
415 pub fn is_alive(&mut self) -> bool {
416 self.child.try_wait().ok().flatten().is_none()
417 }
418
419 /// Shut down the app-server process.
420 ///
421 /// Consumes the client. If you don't call this explicitly, the
422 /// [`Drop`] implementation will kill the process automatically.
423 pub async fn shutdown(mut self) -> Result<()> {
424 debug!("[CLIENT] Shutting down");
425 self.child.kill().await.map_err(Error::Io)?;
426 Ok(())
427 }
428
429 // -- internal --
430
431 async fn send_notification(&mut self, method: &str) -> Result<()> {
432 let notif = JsonRpcNotification {
433 method: method.to_string(),
434 params: None,
435 };
436 self.send_raw(¬if).await
437 }
438
439 async fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
440 let json = serde_json::to_string(msg).map_err(Error::Json)?;
441 debug!("[CLIENT] Sending: {}", json);
442 self.writer
443 .write_all(json.as_bytes())
444 .await
445 .map_err(Error::Io)?;
446 self.writer.write_all(b"\n").await.map_err(Error::Io)?;
447 self.writer.flush().await.map_err(Error::Io)?;
448 Ok(())
449 }
450
451 async fn read_message(&mut self) -> Result<JsonRpcMessage> {
452 self.read_message_opt().await?.ok_or(Error::ServerClosed)
453 }
454
455 async fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
456 let mut line = String::new();
457
458 loop {
459 line.clear();
460 let bytes_read = self.reader.read_line(&mut line).await.map_err(Error::Io)?;
461
462 if bytes_read == 0 {
463 debug!("[CLIENT] Stream closed (EOF)");
464 return Ok(None);
465 }
466
467 let trimmed = line.trim();
468 if trimmed.is_empty() {
469 continue;
470 }
471
472 debug!("[CLIENT] Received: {}", trimmed);
473
474 match serde_json::from_str::<JsonRpcMessage>(trimmed) {
475 Ok(msg) => return Ok(Some(msg)),
476 Err(e) => {
477 warn!(
478 "[CLIENT] Failed to deserialize message. \
479 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
480 );
481 warn!("[CLIENT] Parse error: {}", e);
482 warn!("[CLIENT] Raw: {}", trimmed);
483 return Err(Error::Deserialization(ParseError::from_line(trimmed, e)));
484 }
485 }
486 }
487 }
488}
489
490impl Drop for AsyncClient {
491 fn drop(&mut self) {
492 if self.is_alive() {
493 if let Err(e) = self.child.start_kill() {
494 error!("Failed to kill app-server process on drop: {}", e);
495 }
496 }
497 }
498}
499
500/// Async stream of [`ServerMessage`]s from an [`AsyncClient`].
501pub struct EventStream<'a> {
502 client: &'a mut AsyncClient,
503}
504
505impl EventStream<'_> {
506 /// Get the next server message.
507 pub async fn next(&mut self) -> Option<Result<ServerMessage>> {
508 match self.client.next_message().await {
509 Ok(Some(msg)) => Some(Ok(msg)),
510 Ok(None) => None,
511 Err(e) => Some(Err(e)),
512 }
513 }
514
515 /// Collect all remaining messages.
516 pub async fn collect(mut self) -> Result<Vec<ServerMessage>> {
517 let mut msgs = Vec::new();
518 while let Some(result) = self.next().await {
519 msgs.push(result?);
520 }
521 Ok(msgs)
522 }
523}
524
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the stdout read buffer at ten mebibytes.
    #[test]
    fn test_buffer_size() {
        let ten_mib: usize = 10 * 1024 * 1024;
        assert_eq!(STDOUT_BUFFER_SIZE, ten_mib);
    }
}