codex_codes/client_sync.rs
1//! Synchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! This is the blocking counterpart to [`crate::client_async::AsyncClient`].
8//! Prefer the async client for applications that already use tokio.
9//!
10//! # Lifecycle
11//!
12//! 1. Create a client with [`SyncClient::start`] (spawns and initializes the app-server)
13//! 2. Call [`SyncClient::thread_start`] to create a conversation session
14//! 3. Call [`SyncClient::turn_start`] to send user input
15//! 4. Iterate over [`SyncClient::events`] until `turn/completed`
16//! 5. Handle approval requests via [`SyncClient::respond`]
17//! 6. Repeat steps 3-5 for follow-up turns
18//!
19//! # Example
20//!
21//! ```ignore
22//! use codex_codes::{SyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
23//!
24//! let mut client = SyncClient::start()?;
25//! let thread = client.thread_start(&ThreadStartParams::default())?;
26//!
27//! client.turn_start(&TurnStartParams {
28//! thread_id: thread.thread_id().to_string(),
29//! input: vec![UserInput::Text { text: "Hello!".into() }],
30//! model: None,
31//! reasoning_effort: None,
32//! sandbox_policy: None,
33//! })?;
34//!
35//! for result in client.events() {
36//! match result? {
37//! ServerMessage::Notification(n) => {
38//! if let codex_codes::Notification::TurnCompleted(_) = n { break; }
39//! }
40//! _ => {}
41//! }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, ParseError, Result};
47use crate::jsonrpc::{
48 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::messages::{Notification, ServerMessage, ServerRequest};
51use crate::protocol::{
52 ClientInfo, InitializeParams, InitializeResponse, ThreadArchiveParams, ThreadArchiveResponse,
53 ThreadForkParams, ThreadForkResponse, ThreadResumeParams, ThreadResumeResponse,
54 ThreadStartParams, ThreadStartResponse, TurnInterruptParams, TurnInterruptResponse,
55 TurnStartParams, TurnStartResponse,
56};
57use log::{debug, warn};
58use serde::de::DeserializeOwned;
59use serde::Serialize;
60use std::collections::VecDeque;
61use std::io::{BufRead, BufReader, BufWriter, Write};
62use std::process::Child;
63
64/// Buffer size for reading stdout (10MB).
65const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
66
67/// Synchronous multi-turn client for the Codex app-server.
68///
69/// Communicates with a long-lived `codex app-server` process via
70/// newline-delimited JSON-RPC over stdio. Manages request/response
71/// correlation and buffers incoming notifications that arrive while
72/// waiting for RPC responses.
73///
74/// The client automatically kills the app-server process when dropped.
75pub struct SyncClient {
76 child: Child,
77 writer: BufWriter<std::process::ChildStdin>,
78 reader: BufReader<std::process::ChildStdout>,
79 /// Handle to the background thread draining the child's stderr pipe.
80 /// Kept alive for the lifetime of the client; the thread exits on EOF
81 /// when the child is killed.
82 _stderr_drain: std::thread::JoinHandle<()>,
83 next_id: i64,
84 buffered: VecDeque<ServerMessage>,
85}
86
87impl SyncClient {
88 /// Start an app-server with default settings.
89 ///
90 /// Spawns `codex app-server --listen stdio://`, performs the required
91 /// `initialize` handshake, and returns a connected client ready for
92 /// `thread_start()`.
93 ///
94 /// # Errors
95 ///
96 /// Returns an error if the `codex` CLI is not installed, the version is
97 /// incompatible, the process fails to start, or the initialization
98 /// handshake fails.
99 pub fn start() -> Result<Self> {
100 Self::start_with(AppServerBuilder::new())
101 }
102
103 /// Start an app-server with a custom [`AppServerBuilder`].
104 ///
105 /// Performs the required `initialize` handshake before returning.
106 /// Use this to configure a custom binary path or working directory.
107 ///
108 /// # Errors
109 ///
110 /// Returns an error if the process fails to start, stdio pipes
111 /// cannot be established, or the initialization handshake fails.
112 pub fn start_with(builder: AppServerBuilder) -> Result<Self> {
113 let mut client = Self::spawn(builder)?;
114 client.initialize(&InitializeParams {
115 client_info: ClientInfo {
116 name: "codex-codes".to_string(),
117 version: env!("CARGO_PKG_VERSION").to_string(),
118 title: None,
119 },
120 capabilities: None,
121 })?;
122 Ok(client)
123 }
124
125 /// Spawn an app-server without performing the `initialize` handshake.
126 ///
127 /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
128 /// specific capabilities). You **must** call [`SyncClient::initialize`]
129 /// before any other requests.
130 pub fn spawn(builder: AppServerBuilder) -> Result<Self> {
131 crate::version::check_codex_version()?;
132
133 let mut child = builder.spawn_sync()?;
134
135 let stdin = child
136 .stdin
137 .take()
138 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
139 let stdout = child
140 .stdout
141 .take()
142 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
143 let stderr = child
144 .stderr
145 .take()
146 .ok_or_else(|| Error::Protocol("Failed to get stderr".to_string()))?;
147
148 // The app-server emits ~200 KB/s of tracing to stderr. Without an
149 // active reader, the ~64 KB kernel pipe fills almost instantly and
150 // the child blocks. Drain in the background and route lines through
151 // the `log` crate (see [`crate::stderr_drain`]).
152 let stderr_drain = crate::stderr_drain::spawn_sync(stderr);
153
154 Ok(Self {
155 child,
156 writer: BufWriter::new(stdin),
157 reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
158 _stderr_drain: stderr_drain,
159 next_id: 1,
160 buffered: VecDeque::new(),
161 })
162 }
163
164 /// Send a JSON-RPC request and wait for the matching response.
165 ///
166 /// Any notifications or server requests that arrive before the response
167 /// are buffered and can be retrieved via [`SyncClient::next_message`].
168 ///
169 /// # Errors
170 ///
171 /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
172 /// - [`Error::ServerClosed`] if the connection drops before a response arrives
173 /// - [`Error::Json`] if response deserialization fails
174 pub fn request<P: Serialize, R: DeserializeOwned>(
175 &mut self,
176 method: &str,
177 params: &P,
178 ) -> Result<R> {
179 let id = RequestId::Integer(self.next_id);
180 self.next_id += 1;
181
182 let req = JsonRpcRequest {
183 id: id.clone(),
184 method: method.to_string(),
185 params: Some(serde_json::to_value(params).map_err(Error::Json)?),
186 };
187
188 self.send_raw(&req)?;
189
190 loop {
191 let msg = self.read_message()?;
192 match msg {
193 JsonRpcMessage::Response(resp) if resp.id == id => {
194 let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
195 return Ok(result);
196 }
197 JsonRpcMessage::Error(err) if err.id == id => {
198 return Err(Error::JsonRpc {
199 code: err.error.code,
200 message: err.error.message,
201 });
202 }
203 JsonRpcMessage::Notification(notif) => {
204 let typed = Notification::from_envelope(¬if.method, notif.params)
205 .map_err(Error::Json)?;
206 self.buffered.push_back(ServerMessage::Notification(typed));
207 }
208 JsonRpcMessage::Request(req) => {
209 let typed = ServerRequest::from_envelope(&req.method, req.params)
210 .map_err(Error::Json)?;
211 self.buffered.push_back(ServerMessage::Request {
212 id: req.id,
213 request: typed,
214 });
215 }
216 JsonRpcMessage::Response(resp) => {
217 warn!(
218 "[CLIENT] Unexpected response for id={}, expected id={}",
219 resp.id, id
220 );
221 }
222 JsonRpcMessage::Error(err) => {
223 warn!(
224 "[CLIENT] Unexpected error for id={}, expected id={}",
225 err.id, id
226 );
227 }
228 }
229 }
230 }
231
232 /// Start a new thread (conversation session).
233 ///
234 /// A thread must be created before any turns can be started. The returned
235 /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
236 pub fn thread_start(&mut self, params: &ThreadStartParams) -> Result<ThreadStartResponse> {
237 self.request(crate::protocol::methods::THREAD_START, params)
238 }
239
240 /// Resume a previously persisted thread by id.
241 ///
242 /// Replays the thread's history so turns can continue where they left off.
243 pub fn thread_resume(&mut self, params: &ThreadResumeParams) -> Result<ThreadResumeResponse> {
244 self.request(crate::protocol::methods::THREAD_RESUME, params)
245 }
246
247 /// Fork an existing thread into a new independent thread.
248 pub fn thread_fork(&mut self, params: &ThreadForkParams) -> Result<ThreadForkResponse> {
249 self.request(crate::protocol::methods::THREAD_FORK, params)
250 }
251
252 /// Start a new turn within a thread.
253 ///
254 /// Sends user input to the agent. After calling this, use [`SyncClient::events`]
255 /// or [`SyncClient::next_message`] to consume notifications until `turn/completed`.
256 pub fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
257 self.request(crate::protocol::methods::TURN_START, params)
258 }
259
260 /// Interrupt an active turn.
261 pub fn turn_interrupt(
262 &mut self,
263 params: &TurnInterruptParams,
264 ) -> Result<TurnInterruptResponse> {
265 self.request(crate::protocol::methods::TURN_INTERRUPT, params)
266 }
267
268 /// Archive a thread.
269 pub fn thread_archive(
270 &mut self,
271 params: &ThreadArchiveParams,
272 ) -> Result<ThreadArchiveResponse> {
273 self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
274 }
275
276 /// Perform the `initialize` handshake with the app-server.
277 ///
278 /// Sends `initialize` with the given params and then sends the
279 /// `initialized` notification. This must be the first request after
280 /// spawning the process.
281 pub fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
282 let resp: InitializeResponse =
283 self.request(crate::protocol::methods::INITIALIZE, params)?;
284 self.send_notification(crate::protocol::methods::INITIALIZED)?;
285 Ok(resp)
286 }
287
288 /// Respond to a server-to-client request (e.g., approval flow).
289 ///
290 /// When the server sends a [`ServerMessage::Request`], it expects a response.
291 /// Use this method with the request's `id` and a result payload. For command
292 /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
293 /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
294 pub fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
295 let resp = JsonRpcResponse {
296 id,
297 result: serde_json::to_value(result).map_err(Error::Json)?,
298 };
299 self.send_raw(&resp)
300 }
301
302 /// Respond to a server-to-client request with an error.
303 pub fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
304 let err = JsonRpcError {
305 id,
306 error: crate::jsonrpc::JsonRpcErrorData {
307 code,
308 message: message.to_string(),
309 data: None,
310 },
311 };
312 self.send_raw(&err)
313 }
314
315 /// Read the next incoming server message (notification or server request).
316 ///
317 /// Returns buffered messages first (from notifications that arrived during
318 /// a [`SyncClient::request`] call), then reads from the wire.
319 ///
320 /// Returns `Ok(None)` when the app-server closes the connection (EOF).
321 pub fn next_message(&mut self) -> Result<Option<ServerMessage>> {
322 if let Some(msg) = self.buffered.pop_front() {
323 return Ok(Some(msg));
324 }
325
326 loop {
327 let msg = match self.read_message_opt()? {
328 Some(m) => m,
329 None => return Ok(None),
330 };
331
332 match msg {
333 JsonRpcMessage::Notification(notif) => {
334 let JsonRpcNotification { method, params } = notif;
335 let typed =
336 Notification::from_envelope(&method, params.clone()).map_err(|e| {
337 Error::Deserialization(ParseError::from_envelope(method, params, e))
338 })?;
339 return Ok(Some(ServerMessage::Notification(typed)));
340 }
341 JsonRpcMessage::Request(req) => {
342 let JsonRpcRequest { id, method, params } = req;
343 let typed =
344 ServerRequest::from_envelope(&method, params.clone()).map_err(|e| {
345 Error::Deserialization(ParseError::from_envelope(method, params, e))
346 })?;
347 return Ok(Some(ServerMessage::Request { id, request: typed }));
348 }
349 JsonRpcMessage::Response(resp) => {
350 warn!(
351 "[CLIENT] Unexpected response (no pending request): id={}",
352 resp.id
353 );
354 }
355 JsonRpcMessage::Error(err) => {
356 warn!(
357 "[CLIENT] Unexpected error (no pending request): id={} code={}",
358 err.id, err.error.code
359 );
360 }
361 }
362 }
363 }
364
365 /// Return an iterator over [`ServerMessage`]s.
366 ///
367 /// The iterator yields `Result<ServerMessage>` and terminates when the
368 /// connection closes (EOF). This is the idiomatic way to consume a turn's
369 /// notifications in synchronous code.
370 pub fn events(&mut self) -> EventIterator<'_> {
371 EventIterator { client: self }
372 }
373
374 /// Shut down the child process.
375 ///
376 /// Kills the process if it's still running. Called automatically on [`Drop`].
377 pub fn shutdown(&mut self) -> Result<()> {
378 debug!("[CLIENT] Shutting down");
379 match self.child.try_wait() {
380 Ok(Some(_)) => Ok(()),
381 Ok(None) => {
382 self.child.kill().map_err(Error::Io)?;
383 self.child.wait().map_err(Error::Io)?;
384 Ok(())
385 }
386 Err(e) => Err(Error::Io(e)),
387 }
388 }
389
390 // -- internal --
391
392 fn send_notification(&mut self, method: &str) -> Result<()> {
393 let notif = JsonRpcNotification {
394 method: method.to_string(),
395 params: None,
396 };
397 self.send_raw(¬if)
398 }
399
400 fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
401 let json = serde_json::to_string(msg).map_err(Error::Json)?;
402 debug!("[CLIENT] Sending: {}", json);
403 self.writer.write_all(json.as_bytes()).map_err(Error::Io)?;
404 self.writer.write_all(b"\n").map_err(Error::Io)?;
405 self.writer.flush().map_err(Error::Io)?;
406 Ok(())
407 }
408
409 fn read_message(&mut self) -> Result<JsonRpcMessage> {
410 self.read_message_opt()?.ok_or(Error::ServerClosed)
411 }
412
413 fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
414 loop {
415 let mut line = String::new();
416 match self.reader.read_line(&mut line) {
417 Ok(0) => {
418 debug!("[CLIENT] Stream closed (EOF)");
419 return Ok(None);
420 }
421 Ok(_) => {
422 let trimmed = line.trim();
423 if trimmed.is_empty() {
424 continue;
425 }
426
427 debug!("[CLIENT] Received: {}", trimmed);
428
429 match serde_json::from_str::<JsonRpcMessage>(trimmed) {
430 Ok(msg) => return Ok(Some(msg)),
431 Err(e) => {
432 warn!(
433 "[CLIENT] Failed to deserialize message. \
434 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
435 );
436 warn!("[CLIENT] Parse error: {}", e);
437 warn!("[CLIENT] Raw: {}", trimmed);
438 return Err(Error::Deserialization(ParseError::from_line(trimmed, e)));
439 }
440 }
441 }
442 Err(e) => {
443 debug!("[CLIENT] Error reading stdout: {}", e);
444 return Err(Error::Io(e));
445 }
446 }
447 }
448 }
449}
450
451impl Drop for SyncClient {
452 fn drop(&mut self) {
453 if let Err(e) = self.shutdown() {
454 debug!("[CLIENT] Error during shutdown: {}", e);
455 }
456 }
457}
458
459/// Iterator over [`ServerMessage`]s from a [`SyncClient`].
460pub struct EventIterator<'a> {
461 client: &'a mut SyncClient,
462}
463
464impl Iterator for EventIterator<'_> {
465 type Item = Result<ServerMessage>;
466
467 fn next(&mut self) -> Option<Self::Item> {
468 match self.client.next_message() {
469 Ok(Some(msg)) => Some(Ok(msg)),
470 Ok(None) => None,
471 Err(e) => Some(Err(e)),
472 }
473 }
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479
480 #[test]
481 fn test_buffer_size() {
482 assert_eq!(STDOUT_BUFFER_SIZE, 10 * 1024 * 1024);
483 }
484}