codex_codes/client_sync.rs
1//! Synchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! This is the blocking counterpart to [`crate::client_async::AsyncClient`].
8//! Prefer the async client for applications that already use tokio.
9//!
10//! # Lifecycle
11//!
12//! 1. Create a client with [`SyncClient::start`] (spawns and initializes the app-server)
13//! 2. Call [`SyncClient::thread_start`] to create a conversation session
14//! 3. Call [`SyncClient::turn_start`] to send user input
15//! 4. Iterate over [`SyncClient::events`] until `turn/completed`
16//! 5. Handle approval requests via [`SyncClient::respond`]
17//! 6. Repeat steps 3-5 for follow-up turns
18//!
19//! # Example
20//!
21//! ```ignore
22//! use codex_codes::{SyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
23//!
24//! let mut client = SyncClient::start()?;
25//! let thread = client.thread_start(&ThreadStartParams::default())?;
26//!
27//! client.turn_start(&TurnStartParams {
28//! thread_id: thread.thread_id().to_string(),
29//! input: vec![UserInput::Text { text: "Hello!".into() }],
30//! model: None,
31//! reasoning_effort: None,
32//! sandbox_policy: None,
33//! })?;
34//!
35//! for result in client.events() {
36//! match result? {
37//! ServerMessage::Notification(n) => {
38//! if let codex_codes::Notification::TurnCompleted(_) = n { break; }
39//! }
40//! _ => {}
41//! }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, ParseError, Result};
47use crate::jsonrpc::{
48 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::messages::{Notification, ServerMessage, ServerRequest};
51use crate::protocol::{
52 ClientInfo, InitializeParams, InitializeResponse, ThreadArchiveParams, ThreadArchiveResponse,
53 ThreadStartParams, ThreadStartResponse, TurnInterruptParams, TurnInterruptResponse,
54 TurnStartParams, TurnStartResponse,
55};
56use log::{debug, warn};
57use serde::de::DeserializeOwned;
58use serde::Serialize;
59use std::collections::VecDeque;
60use std::io::{BufRead, BufReader, BufWriter, Write};
61use std::process::Child;
62
/// Capacity, in bytes, of the buffered reader placed around the child's stdout (10 MiB).
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
65
66/// Synchronous multi-turn client for the Codex app-server.
67///
68/// Communicates with a long-lived `codex app-server` process via
69/// newline-delimited JSON-RPC over stdio. Manages request/response
70/// correlation and buffers incoming notifications that arrive while
71/// waiting for RPC responses.
72///
73/// The client automatically kills the app-server process when dropped.
74pub struct SyncClient {
75 child: Child,
76 writer: BufWriter<std::process::ChildStdin>,
77 reader: BufReader<std::process::ChildStdout>,
78 /// Handle to the background thread draining the child's stderr pipe.
79 /// Kept alive for the lifetime of the client; the thread exits on EOF
80 /// when the child is killed.
81 _stderr_drain: std::thread::JoinHandle<()>,
82 next_id: i64,
83 buffered: VecDeque<ServerMessage>,
84}
85
86impl SyncClient {
87 /// Start an app-server with default settings.
88 ///
89 /// Spawns `codex app-server --listen stdio://`, performs the required
90 /// `initialize` handshake, and returns a connected client ready for
91 /// `thread_start()`.
92 ///
93 /// # Errors
94 ///
95 /// Returns an error if the `codex` CLI is not installed, the version is
96 /// incompatible, the process fails to start, or the initialization
97 /// handshake fails.
98 pub fn start() -> Result<Self> {
99 Self::start_with(AppServerBuilder::new())
100 }
101
102 /// Start an app-server with a custom [`AppServerBuilder`].
103 ///
104 /// Performs the required `initialize` handshake before returning.
105 /// Use this to configure a custom binary path or working directory.
106 ///
107 /// # Errors
108 ///
109 /// Returns an error if the process fails to start, stdio pipes
110 /// cannot be established, or the initialization handshake fails.
111 pub fn start_with(builder: AppServerBuilder) -> Result<Self> {
112 let mut client = Self::spawn(builder)?;
113 client.initialize(&InitializeParams {
114 client_info: ClientInfo {
115 name: "codex-codes".to_string(),
116 version: env!("CARGO_PKG_VERSION").to_string(),
117 title: None,
118 },
119 capabilities: None,
120 })?;
121 Ok(client)
122 }
123
124 /// Spawn an app-server without performing the `initialize` handshake.
125 ///
126 /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
127 /// specific capabilities). You **must** call [`SyncClient::initialize`]
128 /// before any other requests.
129 pub fn spawn(builder: AppServerBuilder) -> Result<Self> {
130 crate::version::check_codex_version()?;
131
132 let mut child = builder.spawn_sync()?;
133
134 let stdin = child
135 .stdin
136 .take()
137 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
138 let stdout = child
139 .stdout
140 .take()
141 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
142 let stderr = child
143 .stderr
144 .take()
145 .ok_or_else(|| Error::Protocol("Failed to get stderr".to_string()))?;
146
147 // The app-server emits ~200 KB/s of tracing to stderr. Without an
148 // active reader, the ~64 KB kernel pipe fills almost instantly and
149 // the child blocks. Drain in the background and route lines through
150 // the `log` crate (see [`crate::stderr_drain`]).
151 let stderr_drain = crate::stderr_drain::spawn_sync(stderr);
152
153 Ok(Self {
154 child,
155 writer: BufWriter::new(stdin),
156 reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
157 _stderr_drain: stderr_drain,
158 next_id: 1,
159 buffered: VecDeque::new(),
160 })
161 }
162
163 /// Send a JSON-RPC request and wait for the matching response.
164 ///
165 /// Any notifications or server requests that arrive before the response
166 /// are buffered and can be retrieved via [`SyncClient::next_message`].
167 ///
168 /// # Errors
169 ///
170 /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
171 /// - [`Error::ServerClosed`] if the connection drops before a response arrives
172 /// - [`Error::Json`] if response deserialization fails
173 pub fn request<P: Serialize, R: DeserializeOwned>(
174 &mut self,
175 method: &str,
176 params: &P,
177 ) -> Result<R> {
178 let id = RequestId::Integer(self.next_id);
179 self.next_id += 1;
180
181 let req = JsonRpcRequest {
182 id: id.clone(),
183 method: method.to_string(),
184 params: Some(serde_json::to_value(params).map_err(Error::Json)?),
185 };
186
187 self.send_raw(&req)?;
188
189 loop {
190 let msg = self.read_message()?;
191 match msg {
192 JsonRpcMessage::Response(resp) if resp.id == id => {
193 let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
194 return Ok(result);
195 }
196 JsonRpcMessage::Error(err) if err.id == id => {
197 return Err(Error::JsonRpc {
198 code: err.error.code,
199 message: err.error.message,
200 });
201 }
202 JsonRpcMessage::Notification(notif) => {
203 let typed = Notification::from_envelope(¬if.method, notif.params)
204 .map_err(Error::Json)?;
205 self.buffered.push_back(ServerMessage::Notification(typed));
206 }
207 JsonRpcMessage::Request(req) => {
208 let typed = ServerRequest::from_envelope(&req.method, req.params)
209 .map_err(Error::Json)?;
210 self.buffered.push_back(ServerMessage::Request {
211 id: req.id,
212 request: typed,
213 });
214 }
215 JsonRpcMessage::Response(resp) => {
216 warn!(
217 "[CLIENT] Unexpected response for id={}, expected id={}",
218 resp.id, id
219 );
220 }
221 JsonRpcMessage::Error(err) => {
222 warn!(
223 "[CLIENT] Unexpected error for id={}, expected id={}",
224 err.id, id
225 );
226 }
227 }
228 }
229 }
230
231 /// Start a new thread (conversation session).
232 ///
233 /// A thread must be created before any turns can be started. The returned
234 /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
235 pub fn thread_start(&mut self, params: &ThreadStartParams) -> Result<ThreadStartResponse> {
236 self.request(crate::protocol::methods::THREAD_START, params)
237 }
238
239 /// Start a new turn within a thread.
240 ///
241 /// Sends user input to the agent. After calling this, use [`SyncClient::events`]
242 /// or [`SyncClient::next_message`] to consume notifications until `turn/completed`.
243 pub fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
244 self.request(crate::protocol::methods::TURN_START, params)
245 }
246
247 /// Interrupt an active turn.
248 pub fn turn_interrupt(
249 &mut self,
250 params: &TurnInterruptParams,
251 ) -> Result<TurnInterruptResponse> {
252 self.request(crate::protocol::methods::TURN_INTERRUPT, params)
253 }
254
255 /// Archive a thread.
256 pub fn thread_archive(
257 &mut self,
258 params: &ThreadArchiveParams,
259 ) -> Result<ThreadArchiveResponse> {
260 self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
261 }
262
263 /// Perform the `initialize` handshake with the app-server.
264 ///
265 /// Sends `initialize` with the given params and then sends the
266 /// `initialized` notification. This must be the first request after
267 /// spawning the process.
268 pub fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
269 let resp: InitializeResponse =
270 self.request(crate::protocol::methods::INITIALIZE, params)?;
271 self.send_notification(crate::protocol::methods::INITIALIZED)?;
272 Ok(resp)
273 }
274
275 /// Respond to a server-to-client request (e.g., approval flow).
276 ///
277 /// When the server sends a [`ServerMessage::Request`], it expects a response.
278 /// Use this method with the request's `id` and a result payload. For command
279 /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
280 /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
281 pub fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
282 let resp = JsonRpcResponse {
283 id,
284 result: serde_json::to_value(result).map_err(Error::Json)?,
285 };
286 self.send_raw(&resp)
287 }
288
289 /// Respond to a server-to-client request with an error.
290 pub fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
291 let err = JsonRpcError {
292 id,
293 error: crate::jsonrpc::JsonRpcErrorData {
294 code,
295 message: message.to_string(),
296 data: None,
297 },
298 };
299 self.send_raw(&err)
300 }
301
302 /// Read the next incoming server message (notification or server request).
303 ///
304 /// Returns buffered messages first (from notifications that arrived during
305 /// a [`SyncClient::request`] call), then reads from the wire.
306 ///
307 /// Returns `Ok(None)` when the app-server closes the connection (EOF).
308 pub fn next_message(&mut self) -> Result<Option<ServerMessage>> {
309 if let Some(msg) = self.buffered.pop_front() {
310 return Ok(Some(msg));
311 }
312
313 loop {
314 let msg = match self.read_message_opt()? {
315 Some(m) => m,
316 None => return Ok(None),
317 };
318
319 match msg {
320 JsonRpcMessage::Notification(notif) => {
321 let JsonRpcNotification { method, params } = notif;
322 let typed =
323 Notification::from_envelope(&method, params.clone()).map_err(|e| {
324 Error::Deserialization(ParseError::from_envelope(method, params, e))
325 })?;
326 return Ok(Some(ServerMessage::Notification(typed)));
327 }
328 JsonRpcMessage::Request(req) => {
329 let JsonRpcRequest { id, method, params } = req;
330 let typed =
331 ServerRequest::from_envelope(&method, params.clone()).map_err(|e| {
332 Error::Deserialization(ParseError::from_envelope(method, params, e))
333 })?;
334 return Ok(Some(ServerMessage::Request { id, request: typed }));
335 }
336 JsonRpcMessage::Response(resp) => {
337 warn!(
338 "[CLIENT] Unexpected response (no pending request): id={}",
339 resp.id
340 );
341 }
342 JsonRpcMessage::Error(err) => {
343 warn!(
344 "[CLIENT] Unexpected error (no pending request): id={} code={}",
345 err.id, err.error.code
346 );
347 }
348 }
349 }
350 }
351
352 /// Return an iterator over [`ServerMessage`]s.
353 ///
354 /// The iterator yields `Result<ServerMessage>` and terminates when the
355 /// connection closes (EOF). This is the idiomatic way to consume a turn's
356 /// notifications in synchronous code.
357 pub fn events(&mut self) -> EventIterator<'_> {
358 EventIterator { client: self }
359 }
360
361 /// Shut down the child process.
362 ///
363 /// Kills the process if it's still running. Called automatically on [`Drop`].
364 pub fn shutdown(&mut self) -> Result<()> {
365 debug!("[CLIENT] Shutting down");
366 match self.child.try_wait() {
367 Ok(Some(_)) => Ok(()),
368 Ok(None) => {
369 self.child.kill().map_err(Error::Io)?;
370 self.child.wait().map_err(Error::Io)?;
371 Ok(())
372 }
373 Err(e) => Err(Error::Io(e)),
374 }
375 }
376
377 // -- internal --
378
379 fn send_notification(&mut self, method: &str) -> Result<()> {
380 let notif = JsonRpcNotification {
381 method: method.to_string(),
382 params: None,
383 };
384 self.send_raw(¬if)
385 }
386
387 fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
388 let json = serde_json::to_string(msg).map_err(Error::Json)?;
389 debug!("[CLIENT] Sending: {}", json);
390 self.writer.write_all(json.as_bytes()).map_err(Error::Io)?;
391 self.writer.write_all(b"\n").map_err(Error::Io)?;
392 self.writer.flush().map_err(Error::Io)?;
393 Ok(())
394 }
395
396 fn read_message(&mut self) -> Result<JsonRpcMessage> {
397 self.read_message_opt()?.ok_or(Error::ServerClosed)
398 }
399
400 fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
401 loop {
402 let mut line = String::new();
403 match self.reader.read_line(&mut line) {
404 Ok(0) => {
405 debug!("[CLIENT] Stream closed (EOF)");
406 return Ok(None);
407 }
408 Ok(_) => {
409 let trimmed = line.trim();
410 if trimmed.is_empty() {
411 continue;
412 }
413
414 debug!("[CLIENT] Received: {}", trimmed);
415
416 match serde_json::from_str::<JsonRpcMessage>(trimmed) {
417 Ok(msg) => return Ok(Some(msg)),
418 Err(e) => {
419 warn!(
420 "[CLIENT] Failed to deserialize message. \
421 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
422 );
423 warn!("[CLIENT] Parse error: {}", e);
424 warn!("[CLIENT] Raw: {}", trimmed);
425 return Err(Error::Deserialization(ParseError::from_line(trimmed, e)));
426 }
427 }
428 }
429 Err(e) => {
430 debug!("[CLIENT] Error reading stdout: {}", e);
431 return Err(Error::Io(e));
432 }
433 }
434 }
435 }
436}
437
438impl Drop for SyncClient {
439 fn drop(&mut self) {
440 if let Err(e) = self.shutdown() {
441 debug!("[CLIENT] Error during shutdown: {}", e);
442 }
443 }
444}
445
446/// Iterator over [`ServerMessage`]s from a [`SyncClient`].
447pub struct EventIterator<'a> {
448 client: &'a mut SyncClient,
449}
450
451impl Iterator for EventIterator<'_> {
452 type Item = Result<ServerMessage>;
453
454 fn next(&mut self) -> Option<Self::Item> {
455 match self.client.next_message() {
456 Ok(Some(msg)) => Some(Ok(msg)),
457 Ok(None) => None,
458 Err(e) => Some(Err(e)),
459 }
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::STDOUT_BUFFER_SIZE;

    /// Pins the stdout buffer capacity at ten mebibytes.
    #[test]
    fn test_buffer_size() {
        const MIB: usize = 1024 * 1024;
        assert_eq!(STDOUT_BUFFER_SIZE, 10 * MIB);
    }
}