codex_codes/client_async.rs
1//! Asynchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! # Lifecycle
8//!
9//! 1. Create a client with [`AsyncClient::start`] (spawns the app-server process)
10//! 2. Call [`AsyncClient::thread_start`] to create a conversation session
11//! 3. Call [`AsyncClient::turn_start`] to send user input
12//! 4. Consume [`AsyncClient::next_message`] to stream notifications
13//! 5. Handle approval requests via [`AsyncClient::respond`]
14//! 6. Repeat steps 3-5 for follow-up turns
15//! 7. The client kills the app-server on [`Drop`]
16//!
17//! # Example
18//!
19//! ```ignore
20//! use codex_codes::{AsyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
21//!
22//! let mut client = AsyncClient::start().await?;
23//! let thread = client.thread_start(&ThreadStartParams::default()).await?;
24//!
25//! client.turn_start(&TurnStartParams {
26//! thread_id: thread.thread_id.clone(),
27//! input: vec![UserInput::Text { text: "Hello!".into() }],
28//! model: None,
29//! reasoning_effort: None,
30//! sandbox_policy: None,
31//! }).await?;
32//!
33//! while let Some(msg) = client.next_message().await? {
34//! match msg {
35//! ServerMessage::Notification { method, params } => {
36//! if method == "turn/completed" { break; }
37//! }
38//! ServerMessage::Request { id, method, .. } => {
39//! client.respond(id, &serde_json::json!({"decision": "accept"})).await?;
40//! }
41//! }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, Result};
47use crate::jsonrpc::{JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, RequestId};
48use crate::protocol::{
49 ServerMessage, ThreadArchiveParams, ThreadArchiveResponse, ThreadStartParams,
50 ThreadStartResponse, TurnInterruptParams, TurnInterruptResponse, TurnStartParams,
51 TurnStartResponse,
52};
53use log::{debug, error, warn};
54use serde::de::DeserializeOwned;
55use serde::Serialize;
56use std::collections::VecDeque;
57use std::sync::atomic::{AtomicI64, Ordering};
58use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
59use tokio::process::{Child, ChildStderr};
60
/// Capacity, in bytes, of the buffered reader wrapped around the
/// app-server's stdout: 10 MiB (`10 << 20`).
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
63
64/// Asynchronous multi-turn client for the Codex app-server.
65///
66/// Communicates with a long-lived `codex app-server` process via
67/// newline-delimited JSON-RPC over stdio. Manages request/response
68/// correlation and buffers incoming notifications that arrive while
69/// waiting for RPC responses.
70///
71/// The client automatically kills the app-server process when dropped.
72pub struct AsyncClient {
73 child: Child,
74 writer: BufWriter<tokio::process::ChildStdin>,
75 reader: BufReader<tokio::process::ChildStdout>,
76 stderr: Option<BufReader<ChildStderr>>,
77 next_id: AtomicI64,
78 /// Buffered incoming messages (notifications/server requests) that arrived
79 /// while waiting for a response to a client request.
80 buffered: VecDeque<ServerMessage>,
81}
82
83impl AsyncClient {
84 /// Start an app-server with default settings.
85 ///
86 /// Spawns `codex app-server --listen stdio://` and returns a connected client.
87 ///
88 /// # Errors
89 ///
90 /// Returns an error if the `codex` CLI is not installed, the version is
91 /// incompatible, or the process fails to start.
92 pub async fn start() -> Result<Self> {
93 Self::start_with(AppServerBuilder::new()).await
94 }
95
96 /// Start an app-server with a custom [`AppServerBuilder`].
97 ///
98 /// Use this to configure a custom binary path or working directory.
99 ///
100 /// # Errors
101 ///
102 /// Returns an error if the process fails to start or stdio pipes
103 /// cannot be established.
104 pub async fn start_with(builder: AppServerBuilder) -> Result<Self> {
105 crate::version::check_codex_version_async().await?;
106
107 let mut child = builder.spawn().await?;
108
109 let stdin = child
110 .stdin
111 .take()
112 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
113 let stdout = child
114 .stdout
115 .take()
116 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
117 let stderr = child.stderr.take().map(BufReader::new);
118
119 Ok(Self {
120 child,
121 writer: BufWriter::new(stdin),
122 reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
123 stderr,
124 next_id: AtomicI64::new(1),
125 buffered: VecDeque::new(),
126 })
127 }
128
129 /// Send a JSON-RPC request and wait for the matching response.
130 ///
131 /// Any notifications or server requests that arrive before the response
132 /// are buffered and can be retrieved via [`AsyncClient::next_message`].
133 ///
134 /// # Errors
135 ///
136 /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
137 /// - [`Error::ServerClosed`] if the connection drops before a response arrives
138 /// - [`Error::Json`] if response deserialization fails
139 pub async fn request<P: Serialize, R: DeserializeOwned>(
140 &mut self,
141 method: &str,
142 params: &P,
143 ) -> Result<R> {
144 let id = RequestId::Integer(self.next_id.fetch_add(1, Ordering::Relaxed));
145
146 let req = JsonRpcRequest {
147 id: id.clone(),
148 method: method.to_string(),
149 params: Some(serde_json::to_value(params).map_err(Error::Json)?),
150 };
151
152 self.send_raw(&req).await?;
153
154 // Read lines until we get a response matching our id
155 loop {
156 let msg = self.read_message().await?;
157 match msg {
158 JsonRpcMessage::Response(resp) if resp.id == id => {
159 let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
160 return Ok(result);
161 }
162 JsonRpcMessage::Error(err) if err.id == id => {
163 return Err(Error::JsonRpc {
164 code: err.error.code,
165 message: err.error.message,
166 });
167 }
168 // Buffer notifications and server requests
169 JsonRpcMessage::Notification(notif) => {
170 self.buffered.push_back(ServerMessage::Notification {
171 method: notif.method,
172 params: notif.params,
173 });
174 }
175 JsonRpcMessage::Request(req) => {
176 self.buffered.push_back(ServerMessage::Request {
177 id: req.id,
178 method: req.method,
179 params: req.params,
180 });
181 }
182 // Response/error for a different id — unexpected
183 JsonRpcMessage::Response(resp) => {
184 warn!(
185 "[CLIENT] Unexpected response for id={}, expected id={}",
186 resp.id, id
187 );
188 }
189 JsonRpcMessage::Error(err) => {
190 warn!(
191 "[CLIENT] Unexpected error for id={}, expected id={}",
192 err.id, id
193 );
194 }
195 }
196 }
197 }
198
199 /// Start a new thread (conversation session).
200 ///
201 /// A thread must be created before any turns can be started. The returned
202 /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
203 pub async fn thread_start(
204 &mut self,
205 params: &ThreadStartParams,
206 ) -> Result<ThreadStartResponse> {
207 self.request(crate::protocol::methods::THREAD_START, params)
208 .await
209 }
210
211 /// Start a new turn within a thread.
212 ///
213 /// Sends user input to the agent. After calling this, use [`AsyncClient::next_message`]
214 /// to stream notifications until `turn/completed` arrives.
215 pub async fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
216 self.request(crate::protocol::methods::TURN_START, params)
217 .await
218 }
219
220 /// Interrupt an active turn.
221 pub async fn turn_interrupt(
222 &mut self,
223 params: &TurnInterruptParams,
224 ) -> Result<TurnInterruptResponse> {
225 self.request(crate::protocol::methods::TURN_INTERRUPT, params)
226 .await
227 }
228
229 /// Archive a thread.
230 pub async fn thread_archive(
231 &mut self,
232 params: &ThreadArchiveParams,
233 ) -> Result<ThreadArchiveResponse> {
234 self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
235 .await
236 }
237
238 /// Respond to a server-to-client request (e.g., approval flow).
239 ///
240 /// When the server sends a [`ServerMessage::Request`], it expects a response.
241 /// Use this method with the request's `id` and a result payload. For command
242 /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
243 /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
244 pub async fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
245 let resp = JsonRpcResponse {
246 id,
247 result: serde_json::to_value(result).map_err(Error::Json)?,
248 };
249 self.send_raw(&resp).await
250 }
251
252 /// Respond to a server-to-client request with an error.
253 pub async fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
254 let err = JsonRpcError {
255 id,
256 error: crate::jsonrpc::JsonRpcErrorData {
257 code,
258 message: message.to_string(),
259 data: None,
260 },
261 };
262 self.send_raw(&err).await
263 }
264
265 /// Read the next incoming server message (notification or server request).
266 ///
267 /// Returns buffered messages first (from notifications that arrived during
268 /// an [`AsyncClient::request`] call), then reads from the wire.
269 ///
270 /// Returns `Ok(None)` when the app-server closes the connection (EOF).
271 ///
272 /// # Typical notification methods
273 ///
274 /// | Method | Meaning |
275 /// |--------|---------|
276 /// | `turn/started` | Agent began processing |
277 /// | `item/agentMessage/delta` | Streaming text chunk |
278 /// | `item/commandExecution/outputDelta` | Command output chunk |
279 /// | `item/started` / `item/completed` | Item lifecycle |
280 /// | `turn/completed` | Agent finished the turn |
281 /// | `error` | Server-side error |
282 pub async fn next_message(&mut self) -> Result<Option<ServerMessage>> {
283 // Drain buffered messages first
284 if let Some(msg) = self.buffered.pop_front() {
285 return Ok(Some(msg));
286 }
287
288 // Read from the wire
289 loop {
290 let msg = match self.read_message_opt().await? {
291 Some(m) => m,
292 None => return Ok(None),
293 };
294
295 match msg {
296 JsonRpcMessage::Notification(notif) => {
297 return Ok(Some(ServerMessage::Notification {
298 method: notif.method,
299 params: notif.params,
300 }));
301 }
302 JsonRpcMessage::Request(req) => {
303 return Ok(Some(ServerMessage::Request {
304 id: req.id,
305 method: req.method,
306 params: req.params,
307 }));
308 }
309 // Unexpected responses without a pending request
310 JsonRpcMessage::Response(resp) => {
311 warn!(
312 "[CLIENT] Unexpected response (no pending request): id={}",
313 resp.id
314 );
315 }
316 JsonRpcMessage::Error(err) => {
317 warn!(
318 "[CLIENT] Unexpected error (no pending request): id={} code={}",
319 err.id, err.error.code
320 );
321 }
322 }
323 }
324 }
325
326 /// Return an async event stream over [`ServerMessage`]s.
327 ///
328 /// Wraps [`AsyncClient::next_message`] in a stream-like API. Call
329 /// [`EventStream::next`] in a loop, or [`EventStream::collect`] to
330 /// gather all messages until EOF.
331 pub fn events(&mut self) -> EventStream<'_> {
332 EventStream { client: self }
333 }
334
335 /// Take the stderr reader (can only be called once).
336 ///
337 /// Useful for logging or diagnostics. Returns `None` on subsequent calls.
338 pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
339 self.stderr.take()
340 }
341
342 /// Get the process ID.
343 pub fn pid(&self) -> Option<u32> {
344 self.child.id()
345 }
346
347 /// Check if the child process is still running.
348 pub fn is_alive(&mut self) -> bool {
349 self.child.try_wait().ok().flatten().is_none()
350 }
351
352 /// Shut down the app-server process.
353 ///
354 /// Consumes the client. If you don't call this explicitly, the
355 /// [`Drop`] implementation will kill the process automatically.
356 pub async fn shutdown(mut self) -> Result<()> {
357 debug!("[CLIENT] Shutting down");
358 self.child.kill().await.map_err(Error::Io)?;
359 Ok(())
360 }
361
362 // -- internal --
363
364 async fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
365 let json = serde_json::to_string(msg).map_err(Error::Json)?;
366 debug!("[CLIENT] Sending: {}", json);
367 self.writer
368 .write_all(json.as_bytes())
369 .await
370 .map_err(Error::Io)?;
371 self.writer.write_all(b"\n").await.map_err(Error::Io)?;
372 self.writer.flush().await.map_err(Error::Io)?;
373 Ok(())
374 }
375
376 async fn read_message(&mut self) -> Result<JsonRpcMessage> {
377 self.read_message_opt().await?.ok_or(Error::ServerClosed)
378 }
379
380 async fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
381 let mut line = String::new();
382
383 loop {
384 line.clear();
385 let bytes_read = self.reader.read_line(&mut line).await.map_err(Error::Io)?;
386
387 if bytes_read == 0 {
388 debug!("[CLIENT] Stream closed (EOF)");
389 return Ok(None);
390 }
391
392 let trimmed = line.trim();
393 if trimmed.is_empty() {
394 continue;
395 }
396
397 debug!("[CLIENT] Received: {}", trimmed);
398
399 match serde_json::from_str::<JsonRpcMessage>(trimmed) {
400 Ok(msg) => return Ok(Some(msg)),
401 Err(e) => {
402 warn!(
403 "[CLIENT] Failed to deserialize message. \
404 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
405 );
406 warn!("[CLIENT] Parse error: {}", e);
407 warn!("[CLIENT] Raw: {}", trimmed);
408 return Err(Error::Deserialization(format!("{} (raw: {})", e, trimmed)));
409 }
410 }
411 }
412 }
413}
414
415impl Drop for AsyncClient {
416 fn drop(&mut self) {
417 if self.is_alive() {
418 if let Err(e) = self.child.start_kill() {
419 error!("Failed to kill app-server process on drop: {}", e);
420 }
421 }
422 }
423}
424
425/// Async stream of [`ServerMessage`]s from an [`AsyncClient`].
426pub struct EventStream<'a> {
427 client: &'a mut AsyncClient,
428}
429
430impl EventStream<'_> {
431 /// Get the next server message.
432 pub async fn next(&mut self) -> Option<Result<ServerMessage>> {
433 match self.client.next_message().await {
434 Ok(Some(msg)) => Some(Ok(msg)),
435 Ok(None) => None,
436 Err(e) => Some(Err(e)),
437 }
438 }
439
440 /// Collect all remaining messages.
441 pub async fn collect(mut self) -> Result<Vec<ServerMessage>> {
442 let mut msgs = Vec::new();
443 while let Some(result) = self.next().await {
444 msgs.push(result?);
445 }
446 Ok(msgs)
447 }
448}
449
#[cfg(test)]
mod tests {
    use super::STDOUT_BUFFER_SIZE;

    /// Pins the stdout buffer capacity so it is not changed by accident.
    #[test]
    fn test_buffer_size() {
        let ten_mebibytes: usize = 10 * 1024 * 1024;
        assert_eq!(STDOUT_BUFFER_SIZE, ten_mebibytes);
    }
}