codex_codes/client_sync.rs
1//! Synchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! This is the blocking counterpart to [`crate::client_async::AsyncClient`].
8//! Prefer the async client for applications that already use tokio.
9//!
10//! # Lifecycle
11//!
12//! 1. Create a client with [`SyncClient::start`] (spawns and initializes the app-server)
13//! 2. Call [`SyncClient::thread_start`] to create a conversation session
14//! 3. Call [`SyncClient::turn_start`] to send user input
15//! 4. Iterate over [`SyncClient::events`] until `turn/completed`
16//! 5. Handle approval requests via [`SyncClient::respond`]
17//! 6. Repeat steps 3-5 for follow-up turns
18//!
19//! # Example
20//!
21//! ```ignore
22//! use codex_codes::{SyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
23//!
24//! let mut client = SyncClient::start()?;
25//! let thread = client.thread_start(&ThreadStartParams::default())?;
26//!
27//! client.turn_start(&TurnStartParams {
28//! thread_id: thread.thread_id().to_string(),
29//! input: vec![UserInput::Text { text: "Hello!".into() }],
30//! model: None,
31//! reasoning_effort: None,
32//! sandbox_policy: None,
33//! })?;
34//!
35//! for result in client.events() {
36//! match result? {
37//! ServerMessage::Notification { method, .. } => {
38//! if method == "turn/completed" { break; }
39//! }
40//! _ => {}
41//! }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, Result};
47use crate::jsonrpc::{
48 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::protocol::{
51 ClientInfo, InitializeParams, InitializeResponse, ServerMessage, ThreadArchiveParams,
52 ThreadArchiveResponse, ThreadStartParams, ThreadStartResponse, TurnInterruptParams,
53 TurnInterruptResponse, TurnStartParams, TurnStartResponse,
54};
55use log::{debug, warn};
56use serde::de::DeserializeOwned;
57use serde::Serialize;
58use std::collections::VecDeque;
59use std::io::{BufRead, BufReader, BufWriter, Write};
60use std::process::Child;
61
62/// Buffer size for reading stdout (10MB).
63const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
64
65/// Synchronous multi-turn client for the Codex app-server.
66///
67/// Communicates with a long-lived `codex app-server` process via
68/// newline-delimited JSON-RPC over stdio. Manages request/response
69/// correlation and buffers incoming notifications that arrive while
70/// waiting for RPC responses.
71///
72/// The client automatically kills the app-server process when dropped.
73pub struct SyncClient {
74 child: Child,
75 writer: BufWriter<std::process::ChildStdin>,
76 reader: BufReader<std::process::ChildStdout>,
77 next_id: i64,
78 buffered: VecDeque<ServerMessage>,
79}
80
81impl SyncClient {
82 /// Start an app-server with default settings.
83 ///
84 /// Spawns `codex app-server --listen stdio://`, performs the required
85 /// `initialize` handshake, and returns a connected client ready for
86 /// `thread_start()`.
87 ///
88 /// # Errors
89 ///
90 /// Returns an error if the `codex` CLI is not installed, the version is
91 /// incompatible, the process fails to start, or the initialization
92 /// handshake fails.
93 pub fn start() -> Result<Self> {
94 Self::start_with(AppServerBuilder::new())
95 }
96
97 /// Start an app-server with a custom [`AppServerBuilder`].
98 ///
99 /// Performs the required `initialize` handshake before returning.
100 /// Use this to configure a custom binary path or working directory.
101 ///
102 /// # Errors
103 ///
104 /// Returns an error if the process fails to start, stdio pipes
105 /// cannot be established, or the initialization handshake fails.
106 pub fn start_with(builder: AppServerBuilder) -> Result<Self> {
107 let mut client = Self::spawn(builder)?;
108 client.initialize(&InitializeParams {
109 client_info: ClientInfo {
110 name: "codex-codes".to_string(),
111 version: env!("CARGO_PKG_VERSION").to_string(),
112 title: None,
113 },
114 capabilities: None,
115 })?;
116 Ok(client)
117 }
118
119 /// Spawn an app-server without performing the `initialize` handshake.
120 ///
121 /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
122 /// specific capabilities). You **must** call [`SyncClient::initialize`]
123 /// before any other requests.
124 pub fn spawn(builder: AppServerBuilder) -> Result<Self> {
125 crate::version::check_codex_version()?;
126
127 let mut child = builder.spawn_sync().map_err(Error::Io)?;
128
129 let stdin = child
130 .stdin
131 .take()
132 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
133 let stdout = child
134 .stdout
135 .take()
136 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
137
138 Ok(Self {
139 child,
140 writer: BufWriter::new(stdin),
141 reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
142 next_id: 1,
143 buffered: VecDeque::new(),
144 })
145 }
146
147 /// Send a JSON-RPC request and wait for the matching response.
148 ///
149 /// Any notifications or server requests that arrive before the response
150 /// are buffered and can be retrieved via [`SyncClient::next_message`].
151 ///
152 /// # Errors
153 ///
154 /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
155 /// - [`Error::ServerClosed`] if the connection drops before a response arrives
156 /// - [`Error::Json`] if response deserialization fails
157 pub fn request<P: Serialize, R: DeserializeOwned>(
158 &mut self,
159 method: &str,
160 params: &P,
161 ) -> Result<R> {
162 let id = RequestId::Integer(self.next_id);
163 self.next_id += 1;
164
165 let req = JsonRpcRequest {
166 id: id.clone(),
167 method: method.to_string(),
168 params: Some(serde_json::to_value(params).map_err(Error::Json)?),
169 };
170
171 self.send_raw(&req)?;
172
173 loop {
174 let msg = self.read_message()?;
175 match msg {
176 JsonRpcMessage::Response(resp) if resp.id == id => {
177 let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
178 return Ok(result);
179 }
180 JsonRpcMessage::Error(err) if err.id == id => {
181 return Err(Error::JsonRpc {
182 code: err.error.code,
183 message: err.error.message,
184 });
185 }
186 JsonRpcMessage::Notification(notif) => {
187 self.buffered.push_back(ServerMessage::Notification {
188 method: notif.method,
189 params: notif.params,
190 });
191 }
192 JsonRpcMessage::Request(req) => {
193 self.buffered.push_back(ServerMessage::Request {
194 id: req.id,
195 method: req.method,
196 params: req.params,
197 });
198 }
199 JsonRpcMessage::Response(resp) => {
200 warn!(
201 "[CLIENT] Unexpected response for id={}, expected id={}",
202 resp.id, id
203 );
204 }
205 JsonRpcMessage::Error(err) => {
206 warn!(
207 "[CLIENT] Unexpected error for id={}, expected id={}",
208 err.id, id
209 );
210 }
211 }
212 }
213 }
214
215 /// Start a new thread (conversation session).
216 ///
217 /// A thread must be created before any turns can be started. The returned
218 /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
219 pub fn thread_start(&mut self, params: &ThreadStartParams) -> Result<ThreadStartResponse> {
220 self.request(crate::protocol::methods::THREAD_START, params)
221 }
222
223 /// Start a new turn within a thread.
224 ///
225 /// Sends user input to the agent. After calling this, use [`SyncClient::events`]
226 /// or [`SyncClient::next_message`] to consume notifications until `turn/completed`.
227 pub fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
228 self.request(crate::protocol::methods::TURN_START, params)
229 }
230
231 /// Interrupt an active turn.
232 pub fn turn_interrupt(
233 &mut self,
234 params: &TurnInterruptParams,
235 ) -> Result<TurnInterruptResponse> {
236 self.request(crate::protocol::methods::TURN_INTERRUPT, params)
237 }
238
239 /// Archive a thread.
240 pub fn thread_archive(
241 &mut self,
242 params: &ThreadArchiveParams,
243 ) -> Result<ThreadArchiveResponse> {
244 self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
245 }
246
247 /// Perform the `initialize` handshake with the app-server.
248 ///
249 /// Sends `initialize` with the given params and then sends the
250 /// `initialized` notification. This must be the first request after
251 /// spawning the process.
252 pub fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
253 let resp: InitializeResponse =
254 self.request(crate::protocol::methods::INITIALIZE, params)?;
255 self.send_notification(crate::protocol::methods::INITIALIZED)?;
256 Ok(resp)
257 }
258
259 /// Respond to a server-to-client request (e.g., approval flow).
260 ///
261 /// When the server sends a [`ServerMessage::Request`], it expects a response.
262 /// Use this method with the request's `id` and a result payload. For command
263 /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
264 /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
265 pub fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
266 let resp = JsonRpcResponse {
267 id,
268 result: serde_json::to_value(result).map_err(Error::Json)?,
269 };
270 self.send_raw(&resp)
271 }
272
273 /// Respond to a server-to-client request with an error.
274 pub fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
275 let err = JsonRpcError {
276 id,
277 error: crate::jsonrpc::JsonRpcErrorData {
278 code,
279 message: message.to_string(),
280 data: None,
281 },
282 };
283 self.send_raw(&err)
284 }
285
286 /// Read the next incoming server message (notification or server request).
287 ///
288 /// Returns buffered messages first (from notifications that arrived during
289 /// a [`SyncClient::request`] call), then reads from the wire.
290 ///
291 /// Returns `Ok(None)` when the app-server closes the connection (EOF).
292 pub fn next_message(&mut self) -> Result<Option<ServerMessage>> {
293 if let Some(msg) = self.buffered.pop_front() {
294 return Ok(Some(msg));
295 }
296
297 loop {
298 let msg = match self.read_message_opt()? {
299 Some(m) => m,
300 None => return Ok(None),
301 };
302
303 match msg {
304 JsonRpcMessage::Notification(notif) => {
305 return Ok(Some(ServerMessage::Notification {
306 method: notif.method,
307 params: notif.params,
308 }));
309 }
310 JsonRpcMessage::Request(req) => {
311 return Ok(Some(ServerMessage::Request {
312 id: req.id,
313 method: req.method,
314 params: req.params,
315 }));
316 }
317 JsonRpcMessage::Response(resp) => {
318 warn!(
319 "[CLIENT] Unexpected response (no pending request): id={}",
320 resp.id
321 );
322 }
323 JsonRpcMessage::Error(err) => {
324 warn!(
325 "[CLIENT] Unexpected error (no pending request): id={} code={}",
326 err.id, err.error.code
327 );
328 }
329 }
330 }
331 }
332
333 /// Return an iterator over [`ServerMessage`]s.
334 ///
335 /// The iterator yields `Result<ServerMessage>` and terminates when the
336 /// connection closes (EOF). This is the idiomatic way to consume a turn's
337 /// notifications in synchronous code.
338 pub fn events(&mut self) -> EventIterator<'_> {
339 EventIterator { client: self }
340 }
341
342 /// Shut down the child process.
343 ///
344 /// Kills the process if it's still running. Called automatically on [`Drop`].
345 pub fn shutdown(&mut self) -> Result<()> {
346 debug!("[CLIENT] Shutting down");
347 match self.child.try_wait() {
348 Ok(Some(_)) => Ok(()),
349 Ok(None) => {
350 self.child.kill().map_err(Error::Io)?;
351 self.child.wait().map_err(Error::Io)?;
352 Ok(())
353 }
354 Err(e) => Err(Error::Io(e)),
355 }
356 }
357
358 // -- internal --
359
360 fn send_notification(&mut self, method: &str) -> Result<()> {
361 let notif = JsonRpcNotification {
362 method: method.to_string(),
363 params: None,
364 };
365 self.send_raw(¬if)
366 }
367
368 fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
369 let json = serde_json::to_string(msg).map_err(Error::Json)?;
370 debug!("[CLIENT] Sending: {}", json);
371 self.writer.write_all(json.as_bytes()).map_err(Error::Io)?;
372 self.writer.write_all(b"\n").map_err(Error::Io)?;
373 self.writer.flush().map_err(Error::Io)?;
374 Ok(())
375 }
376
377 fn read_message(&mut self) -> Result<JsonRpcMessage> {
378 self.read_message_opt()?.ok_or(Error::ServerClosed)
379 }
380
381 fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
382 loop {
383 let mut line = String::new();
384 match self.reader.read_line(&mut line) {
385 Ok(0) => {
386 debug!("[CLIENT] Stream closed (EOF)");
387 return Ok(None);
388 }
389 Ok(_) => {
390 let trimmed = line.trim();
391 if trimmed.is_empty() {
392 continue;
393 }
394
395 debug!("[CLIENT] Received: {}", trimmed);
396
397 match serde_json::from_str::<JsonRpcMessage>(trimmed) {
398 Ok(msg) => return Ok(Some(msg)),
399 Err(e) => {
400 warn!(
401 "[CLIENT] Failed to deserialize message. \
402 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
403 );
404 warn!("[CLIENT] Parse error: {}", e);
405 warn!("[CLIENT] Raw: {}", trimmed);
406 return Err(Error::Deserialization(format!(
407 "{} (raw: {})",
408 e, trimmed
409 )));
410 }
411 }
412 }
413 Err(e) => {
414 debug!("[CLIENT] Error reading stdout: {}", e);
415 return Err(Error::Io(e));
416 }
417 }
418 }
419 }
420}
421
422impl Drop for SyncClient {
423 fn drop(&mut self) {
424 if let Err(e) = self.shutdown() {
425 debug!("[CLIENT] Error during shutdown: {}", e);
426 }
427 }
428}
429
430/// Iterator over [`ServerMessage`]s from a [`SyncClient`].
431pub struct EventIterator<'a> {
432 client: &'a mut SyncClient,
433}
434
435impl Iterator for EventIterator<'_> {
436 type Item = Result<ServerMessage>;
437
438 fn next(&mut self) -> Option<Self::Item> {
439 match self.client.next_message() {
440 Ok(Some(msg)) => Some(Ok(msg)),
441 Ok(None) => None,
442 Err(e) => Some(Err(e)),
443 }
444 }
445}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450
451 #[test]
452 fn test_buffer_size() {
453 assert_eq!(STDOUT_BUFFER_SIZE, 10 * 1024 * 1024);
454 }
455}