codex_codes/client_async.rs
1//! Asynchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! # Lifecycle
8//!
9//! 1. Create a client with [`AsyncClient::start`] (spawns and initializes the app-server)
10//! 2. Call [`AsyncClient::thread_start`] to create a conversation session
11//! 3. Call [`AsyncClient::turn_start`] to send user input
12//! 4. Consume [`AsyncClient::next_message`] to stream notifications
13//! 5. Handle approval requests via [`AsyncClient::respond`]
14//! 6. Repeat steps 3-5 for follow-up turns
15//! 7. The client kills the app-server on [`Drop`]
16//!
17//! # Example
18//!
19//! ```ignore
20//! use codex_codes::{AsyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
21//!
22//! let mut client = AsyncClient::start().await?;
23//! let thread = client.thread_start(&ThreadStartParams::default()).await?;
24//!
25//! client.turn_start(&TurnStartParams {
26//! thread_id: thread.thread_id().to_string(),
27//! input: vec![UserInput::Text { text: "Hello!".into() }],
28//! model: None,
29//! reasoning_effort: None,
30//! sandbox_policy: None,
31//! }).await?;
32//!
33//! while let Some(msg) = client.next_message().await? {
34//! match msg {
35//! ServerMessage::Notification { method, params } => {
36//! if method == "turn/completed" { break; }
37//! }
38//! ServerMessage::Request { id, method, .. } => {
39//! client.respond(id, &serde_json::json!({"decision": "accept"})).await?;
40//! }
41//! }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, Result};
47use crate::jsonrpc::{
48 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::protocol::{
51 ClientInfo, InitializeParams, InitializeResponse, ServerMessage, ThreadArchiveParams,
52 ThreadArchiveResponse, ThreadStartParams, ThreadStartResponse, TurnInterruptParams,
53 TurnInterruptResponse, TurnStartParams, TurnStartResponse,
54};
55use log::{debug, error, warn};
56use serde::de::DeserializeOwned;
57use serde::Serialize;
58use std::collections::VecDeque;
59use std::sync::atomic::{AtomicI64, Ordering};
60use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
61use tokio::process::{Child, ChildStderr};
62
/// Capacity, in bytes, of the buffer used when reading the app-server's stdout (10 MiB).
const STDOUT_BUFFER_SIZE: usize = 10 * (1 << 20);
65
66/// Asynchronous multi-turn client for the Codex app-server.
67///
68/// Communicates with a long-lived `codex app-server` process via
69/// newline-delimited JSON-RPC over stdio. Manages request/response
70/// correlation and buffers incoming notifications that arrive while
71/// waiting for RPC responses.
72///
73/// The client automatically kills the app-server process when dropped.
74pub struct AsyncClient {
75 child: Child,
76 writer: BufWriter<tokio::process::ChildStdin>,
77 reader: BufReader<tokio::process::ChildStdout>,
78 stderr: Option<BufReader<ChildStderr>>,
79 next_id: AtomicI64,
80 /// Buffered incoming messages (notifications/server requests) that arrived
81 /// while waiting for a response to a client request.
82 buffered: VecDeque<ServerMessage>,
83}
84
85impl AsyncClient {
86 /// Start an app-server with default settings.
87 ///
88 /// Spawns `codex app-server --listen stdio://`, performs the required
89 /// `initialize` handshake, and returns a connected client ready for
90 /// `thread_start()`.
91 ///
92 /// # Errors
93 ///
94 /// Returns an error if the `codex` CLI is not installed, the version is
95 /// incompatible, the process fails to start, or the initialization
96 /// handshake fails.
97 pub async fn start() -> Result<Self> {
98 Self::start_with(AppServerBuilder::new()).await
99 }
100
101 /// Start an app-server with a custom [`AppServerBuilder`].
102 ///
103 /// Performs the required `initialize` handshake before returning.
104 /// Use this to configure a custom binary path or working directory.
105 ///
106 /// # Errors
107 ///
108 /// Returns an error if the process fails to start, stdio pipes
109 /// cannot be established, or the initialization handshake fails.
110 pub async fn start_with(builder: AppServerBuilder) -> Result<Self> {
111 let mut client = Self::spawn(builder).await?;
112 client
113 .initialize(&InitializeParams {
114 client_info: ClientInfo {
115 name: "codex-codes".to_string(),
116 version: env!("CARGO_PKG_VERSION").to_string(),
117 title: None,
118 },
119 capabilities: None,
120 })
121 .await?;
122 Ok(client)
123 }
124
125 /// Spawn an app-server without performing the `initialize` handshake.
126 ///
127 /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
128 /// specific capabilities). You **must** call [`AsyncClient::initialize`]
129 /// before any other requests.
130 pub async fn spawn(builder: AppServerBuilder) -> Result<Self> {
131 crate::version::check_codex_version_async().await?;
132
133 let mut child = builder.spawn().await?;
134
135 let stdin = child
136 .stdin
137 .take()
138 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
139 let stdout = child
140 .stdout
141 .take()
142 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
143 let stderr = child.stderr.take().map(BufReader::new);
144
145 Ok(Self {
146 child,
147 writer: BufWriter::new(stdin),
148 reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
149 stderr,
150 next_id: AtomicI64::new(1),
151 buffered: VecDeque::new(),
152 })
153 }
154
155 /// Send a JSON-RPC request and wait for the matching response.
156 ///
157 /// Any notifications or server requests that arrive before the response
158 /// are buffered and can be retrieved via [`AsyncClient::next_message`].
159 ///
160 /// # Errors
161 ///
162 /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
163 /// - [`Error::ServerClosed`] if the connection drops before a response arrives
164 /// - [`Error::Json`] if response deserialization fails
165 pub async fn request<P: Serialize, R: DeserializeOwned>(
166 &mut self,
167 method: &str,
168 params: &P,
169 ) -> Result<R> {
170 let id = RequestId::Integer(self.next_id.fetch_add(1, Ordering::Relaxed));
171
172 let req = JsonRpcRequest {
173 id: id.clone(),
174 method: method.to_string(),
175 params: Some(serde_json::to_value(params).map_err(Error::Json)?),
176 };
177
178 self.send_raw(&req).await?;
179
180 // Read lines until we get a response matching our id
181 loop {
182 let msg = self.read_message().await?;
183 match msg {
184 JsonRpcMessage::Response(resp) if resp.id == id => {
185 let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
186 return Ok(result);
187 }
188 JsonRpcMessage::Error(err) if err.id == id => {
189 return Err(Error::JsonRpc {
190 code: err.error.code,
191 message: err.error.message,
192 });
193 }
194 // Buffer notifications and server requests
195 JsonRpcMessage::Notification(notif) => {
196 self.buffered.push_back(ServerMessage::Notification {
197 method: notif.method,
198 params: notif.params,
199 });
200 }
201 JsonRpcMessage::Request(req) => {
202 self.buffered.push_back(ServerMessage::Request {
203 id: req.id,
204 method: req.method,
205 params: req.params,
206 });
207 }
208 // Response/error for a different id — unexpected
209 JsonRpcMessage::Response(resp) => {
210 warn!(
211 "[CLIENT] Unexpected response for id={}, expected id={}",
212 resp.id, id
213 );
214 }
215 JsonRpcMessage::Error(err) => {
216 warn!(
217 "[CLIENT] Unexpected error for id={}, expected id={}",
218 err.id, id
219 );
220 }
221 }
222 }
223 }
224
225 /// Start a new thread (conversation session).
226 ///
227 /// A thread must be created before any turns can be started. The returned
228 /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
229 pub async fn thread_start(
230 &mut self,
231 params: &ThreadStartParams,
232 ) -> Result<ThreadStartResponse> {
233 self.request(crate::protocol::methods::THREAD_START, params)
234 .await
235 }
236
237 /// Start a new turn within a thread.
238 ///
239 /// Sends user input to the agent. After calling this, use [`AsyncClient::next_message`]
240 /// to stream notifications until `turn/completed` arrives.
241 pub async fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
242 self.request(crate::protocol::methods::TURN_START, params)
243 .await
244 }
245
246 /// Interrupt an active turn.
247 pub async fn turn_interrupt(
248 &mut self,
249 params: &TurnInterruptParams,
250 ) -> Result<TurnInterruptResponse> {
251 self.request(crate::protocol::methods::TURN_INTERRUPT, params)
252 .await
253 }
254
255 /// Archive a thread.
256 pub async fn thread_archive(
257 &mut self,
258 params: &ThreadArchiveParams,
259 ) -> Result<ThreadArchiveResponse> {
260 self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
261 .await
262 }
263
264 /// Perform the `initialize` handshake with the app-server.
265 ///
266 /// Sends `initialize` with the given params and then sends the
267 /// `initialized` notification. This must be the first request after
268 /// spawning the process.
269 pub async fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
270 let resp: InitializeResponse = self
271 .request(crate::protocol::methods::INITIALIZE, params)
272 .await?;
273 self.send_notification(crate::protocol::methods::INITIALIZED)
274 .await?;
275 Ok(resp)
276 }
277
278 /// Respond to a server-to-client request (e.g., approval flow).
279 ///
280 /// When the server sends a [`ServerMessage::Request`], it expects a response.
281 /// Use this method with the request's `id` and a result payload. For command
282 /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
283 /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
284 pub async fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
285 let resp = JsonRpcResponse {
286 id,
287 result: serde_json::to_value(result).map_err(Error::Json)?,
288 };
289 self.send_raw(&resp).await
290 }
291
292 /// Respond to a server-to-client request with an error.
293 pub async fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
294 let err = JsonRpcError {
295 id,
296 error: crate::jsonrpc::JsonRpcErrorData {
297 code,
298 message: message.to_string(),
299 data: None,
300 },
301 };
302 self.send_raw(&err).await
303 }
304
305 /// Read the next incoming server message (notification or server request).
306 ///
307 /// Returns buffered messages first (from notifications that arrived during
308 /// an [`AsyncClient::request`] call), then reads from the wire.
309 ///
310 /// Returns `Ok(None)` when the app-server closes the connection (EOF).
311 ///
312 /// # Typical notification methods
313 ///
314 /// | Method | Meaning |
315 /// |--------|---------|
316 /// | `turn/started` | Agent began processing |
317 /// | `item/agentMessage/delta` | Streaming text chunk |
318 /// | `item/commandExecution/outputDelta` | Command output chunk |
319 /// | `item/started` / `item/completed` | Item lifecycle |
320 /// | `turn/completed` | Agent finished the turn |
321 /// | `error` | Server-side error |
322 pub async fn next_message(&mut self) -> Result<Option<ServerMessage>> {
323 // Drain buffered messages first
324 if let Some(msg) = self.buffered.pop_front() {
325 return Ok(Some(msg));
326 }
327
328 // Read from the wire
329 loop {
330 let msg = match self.read_message_opt().await? {
331 Some(m) => m,
332 None => return Ok(None),
333 };
334
335 match msg {
336 JsonRpcMessage::Notification(notif) => {
337 return Ok(Some(ServerMessage::Notification {
338 method: notif.method,
339 params: notif.params,
340 }));
341 }
342 JsonRpcMessage::Request(req) => {
343 return Ok(Some(ServerMessage::Request {
344 id: req.id,
345 method: req.method,
346 params: req.params,
347 }));
348 }
349 // Unexpected responses without a pending request
350 JsonRpcMessage::Response(resp) => {
351 warn!(
352 "[CLIENT] Unexpected response (no pending request): id={}",
353 resp.id
354 );
355 }
356 JsonRpcMessage::Error(err) => {
357 warn!(
358 "[CLIENT] Unexpected error (no pending request): id={} code={}",
359 err.id, err.error.code
360 );
361 }
362 }
363 }
364 }
365
366 /// Return an async event stream over [`ServerMessage`]s.
367 ///
368 /// Wraps [`AsyncClient::next_message`] in a stream-like API. Call
369 /// [`EventStream::next`] in a loop, or [`EventStream::collect`] to
370 /// gather all messages until EOF.
371 pub fn events(&mut self) -> EventStream<'_> {
372 EventStream { client: self }
373 }
374
375 /// Take the stderr reader (can only be called once).
376 ///
377 /// Useful for logging or diagnostics. Returns `None` on subsequent calls.
378 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
379 self.stderr.take()
380 }
381
382 /// Get the process ID.
383 pub fn pid(&self) -> Option<u32> {
384 self.child.id()
385 }
386
387 /// Check if the child process is still running.
388 pub fn is_alive(&mut self) -> bool {
389 self.child.try_wait().ok().flatten().is_none()
390 }
391
392 /// Shut down the app-server process.
393 ///
394 /// Consumes the client. If you don't call this explicitly, the
395 /// [`Drop`] implementation will kill the process automatically.
396 pub async fn shutdown(mut self) -> Result<()> {
397 debug!("[CLIENT] Shutting down");
398 self.child.kill().await.map_err(Error::Io)?;
399 Ok(())
400 }
401
402 // -- internal --
403
404 async fn send_notification(&mut self, method: &str) -> Result<()> {
405 let notif = JsonRpcNotification {
406 method: method.to_string(),
407 params: None,
408 };
409 self.send_raw(¬if).await
410 }
411
412 async fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
413 let json = serde_json::to_string(msg).map_err(Error::Json)?;
414 debug!("[CLIENT] Sending: {}", json);
415 self.writer
416 .write_all(json.as_bytes())
417 .await
418 .map_err(Error::Io)?;
419 self.writer.write_all(b"\n").await.map_err(Error::Io)?;
420 self.writer.flush().await.map_err(Error::Io)?;
421 Ok(())
422 }
423
424 async fn read_message(&mut self) -> Result<JsonRpcMessage> {
425 self.read_message_opt().await?.ok_or(Error::ServerClosed)
426 }
427
428 async fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
429 let mut line = String::new();
430
431 loop {
432 line.clear();
433 let bytes_read = self.reader.read_line(&mut line).await.map_err(Error::Io)?;
434
435 if bytes_read == 0 {
436 debug!("[CLIENT] Stream closed (EOF)");
437 return Ok(None);
438 }
439
440 let trimmed = line.trim();
441 if trimmed.is_empty() {
442 continue;
443 }
444
445 debug!("[CLIENT] Received: {}", trimmed);
446
447 match serde_json::from_str::<JsonRpcMessage>(trimmed) {
448 Ok(msg) => return Ok(Some(msg)),
449 Err(e) => {
450 warn!(
451 "[CLIENT] Failed to deserialize message. \
452 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
453 );
454 warn!("[CLIENT] Parse error: {}", e);
455 warn!("[CLIENT] Raw: {}", trimmed);
456 return Err(Error::Deserialization(format!("{} (raw: {})", e, trimmed)));
457 }
458 }
459 }
460 }
461}
462
463impl Drop for AsyncClient {
464 fn drop(&mut self) {
465 if self.is_alive() {
466 if let Err(e) = self.child.start_kill() {
467 error!("Failed to kill app-server process on drop: {}", e);
468 }
469 }
470 }
471}
472
473/// Async stream of [`ServerMessage`]s from an [`AsyncClient`].
474pub struct EventStream<'a> {
475 client: &'a mut AsyncClient,
476}
477
478impl EventStream<'_> {
479 /// Get the next server message.
480 pub async fn next(&mut self) -> Option<Result<ServerMessage>> {
481 match self.client.next_message().await {
482 Ok(Some(msg)) => Some(Ok(msg)),
483 Ok(None) => None,
484 Err(e) => Some(Err(e)),
485 }
486 }
487
488 /// Collect all remaining messages.
489 pub async fn collect(mut self) -> Result<Vec<ServerMessage>> {
490 let mut msgs = Vec::new();
491 while let Some(result) = self.next().await {
492 msgs.push(result?);
493 }
494 Ok(msgs)
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::STDOUT_BUFFER_SIZE;

    /// Pins the stdout read buffer at exactly ten mebibytes.
    #[test]
    fn test_buffer_size() {
        let ten_mib: usize = 10 * 1024 * 1024;
        assert_eq!(STDOUT_BUFFER_SIZE, ten_mib);
    }
}