codex_codes/client_sync.rs
1//! Synchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! This is the blocking counterpart to [`crate::client_async::AsyncClient`].
8//! Prefer the async client for applications that already use tokio.
9//!
10//! # Lifecycle
11//!
12//! 1. Create a client with [`SyncClient::start`]
13//! 2. Call [`SyncClient::thread_start`] to create a conversation session
14//! 3. Call [`SyncClient::turn_start`] to send user input
15//! 4. Iterate over [`SyncClient::events`] until `turn/completed`
16//! 5. Handle approval requests via [`SyncClient::respond`]
17//! 6. Repeat steps 3-5 for follow-up turns
18//!
19//! # Example
20//!
//! ```ignore
//! use codex_codes::{SyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
//!
//! let mut client = SyncClient::start()?;
//! let thread = client.thread_start(&ThreadStartParams::default())?;
//!
//! client.turn_start(&TurnStartParams {
//!     thread_id: thread.thread_id.clone(),
//!     input: vec![UserInput::Text { text: "Hello!".into() }],
//!     model: None,
//!     reasoning_effort: None,
//!     sandbox_policy: None,
//! })?;
//!
//! for result in client.events() {
//!     match result? {
//!         ServerMessage::Notification { method, .. } => {
//!             if method == "turn/completed" { break; }
//!         }
//!         _ => {}
//!     }
//! }
//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, Result};
47use crate::jsonrpc::{JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, RequestId};
48use crate::protocol::{
49 ServerMessage, ThreadArchiveParams, ThreadArchiveResponse, ThreadStartParams,
50 ThreadStartResponse, TurnInterruptParams, TurnInterruptResponse, TurnStartParams,
51 TurnStartResponse,
52};
53use log::{debug, warn};
54use serde::de::DeserializeOwned;
55use serde::Serialize;
56use std::collections::VecDeque;
57use std::io::{BufRead, BufReader, BufWriter, Write};
58use std::process::Child;
59
/// Capacity, in bytes, of the buffered reader wrapped around the
/// app-server's stdout: 10 MiB.
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
62
/// Synchronous multi-turn client for the Codex app-server.
///
/// Communicates with a long-lived `codex app-server` process via
/// newline-delimited JSON-RPC over stdio. Manages request/response
/// correlation and buffers incoming notifications that arrive while
/// waiting for RPC responses.
///
/// The client automatically kills the app-server process when dropped.
pub struct SyncClient {
    /// Handle to the spawned app-server process; killed and reaped by
    /// [`SyncClient::shutdown`].
    child: Child,
    /// Buffered writer over the child's stdin. Every outgoing message is
    /// written as one JSON line and flushed immediately.
    writer: BufWriter<std::process::ChildStdin>,
    /// Buffered reader over the child's stdout, consumed one line at a time.
    reader: BufReader<std::process::ChildStdout>,
    /// Integer id assigned to the next outgoing request. Starts at 1 and is
    /// incremented once per call to [`SyncClient::request`].
    next_id: i64,
    /// Notifications and server requests that arrived while
    /// [`SyncClient::request`] was waiting for its response, in arrival order.
    buffered: VecDeque<ServerMessage>,
}
78
79impl SyncClient {
80 /// Start an app-server with default settings.
81 ///
82 /// Spawns `codex app-server --listen stdio://` and returns a connected client.
83 ///
84 /// # Errors
85 ///
86 /// Returns an error if the `codex` CLI is not installed, the version is
87 /// incompatible, or the process fails to start.
88 pub fn start() -> Result<Self> {
89 Self::start_with(AppServerBuilder::new())
90 }
91
92 /// Start an app-server with a custom [`AppServerBuilder`].
93 ///
94 /// Use this to configure a custom binary path or working directory.
95 ///
96 /// # Errors
97 ///
98 /// Returns an error if the process fails to start or stdio pipes
99 /// cannot be established.
100 pub fn start_with(builder: AppServerBuilder) -> Result<Self> {
101 crate::version::check_codex_version()?;
102
103 let mut child = builder.spawn_sync().map_err(Error::Io)?;
104
105 let stdin = child
106 .stdin
107 .take()
108 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
109 let stdout = child
110 .stdout
111 .take()
112 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
113
114 Ok(Self {
115 child,
116 writer: BufWriter::new(stdin),
117 reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
118 next_id: 1,
119 buffered: VecDeque::new(),
120 })
121 }
122
123 /// Send a JSON-RPC request and wait for the matching response.
124 ///
125 /// Any notifications or server requests that arrive before the response
126 /// are buffered and can be retrieved via [`SyncClient::next_message`].
127 ///
128 /// # Errors
129 ///
130 /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
131 /// - [`Error::ServerClosed`] if the connection drops before a response arrives
132 /// - [`Error::Json`] if response deserialization fails
133 pub fn request<P: Serialize, R: DeserializeOwned>(
134 &mut self,
135 method: &str,
136 params: &P,
137 ) -> Result<R> {
138 let id = RequestId::Integer(self.next_id);
139 self.next_id += 1;
140
141 let req = JsonRpcRequest {
142 id: id.clone(),
143 method: method.to_string(),
144 params: Some(serde_json::to_value(params).map_err(Error::Json)?),
145 };
146
147 self.send_raw(&req)?;
148
149 loop {
150 let msg = self.read_message()?;
151 match msg {
152 JsonRpcMessage::Response(resp) if resp.id == id => {
153 let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
154 return Ok(result);
155 }
156 JsonRpcMessage::Error(err) if err.id == id => {
157 return Err(Error::JsonRpc {
158 code: err.error.code,
159 message: err.error.message,
160 });
161 }
162 JsonRpcMessage::Notification(notif) => {
163 self.buffered.push_back(ServerMessage::Notification {
164 method: notif.method,
165 params: notif.params,
166 });
167 }
168 JsonRpcMessage::Request(req) => {
169 self.buffered.push_back(ServerMessage::Request {
170 id: req.id,
171 method: req.method,
172 params: req.params,
173 });
174 }
175 JsonRpcMessage::Response(resp) => {
176 warn!(
177 "[CLIENT] Unexpected response for id={}, expected id={}",
178 resp.id, id
179 );
180 }
181 JsonRpcMessage::Error(err) => {
182 warn!(
183 "[CLIENT] Unexpected error for id={}, expected id={}",
184 err.id, id
185 );
186 }
187 }
188 }
189 }
190
191 /// Start a new thread (conversation session).
192 ///
193 /// A thread must be created before any turns can be started. The returned
194 /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
195 pub fn thread_start(&mut self, params: &ThreadStartParams) -> Result<ThreadStartResponse> {
196 self.request(crate::protocol::methods::THREAD_START, params)
197 }
198
199 /// Start a new turn within a thread.
200 ///
201 /// Sends user input to the agent. After calling this, use [`SyncClient::events`]
202 /// or [`SyncClient::next_message`] to consume notifications until `turn/completed`.
203 pub fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
204 self.request(crate::protocol::methods::TURN_START, params)
205 }
206
207 /// Interrupt an active turn.
208 pub fn turn_interrupt(
209 &mut self,
210 params: &TurnInterruptParams,
211 ) -> Result<TurnInterruptResponse> {
212 self.request(crate::protocol::methods::TURN_INTERRUPT, params)
213 }
214
215 /// Archive a thread.
216 pub fn thread_archive(
217 &mut self,
218 params: &ThreadArchiveParams,
219 ) -> Result<ThreadArchiveResponse> {
220 self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
221 }
222
223 /// Respond to a server-to-client request (e.g., approval flow).
224 ///
225 /// When the server sends a [`ServerMessage::Request`], it expects a response.
226 /// Use this method with the request's `id` and a result payload. For command
227 /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
228 /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
229 pub fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
230 let resp = JsonRpcResponse {
231 id,
232 result: serde_json::to_value(result).map_err(Error::Json)?,
233 };
234 self.send_raw(&resp)
235 }
236
237 /// Respond to a server-to-client request with an error.
238 pub fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
239 let err = JsonRpcError {
240 id,
241 error: crate::jsonrpc::JsonRpcErrorData {
242 code,
243 message: message.to_string(),
244 data: None,
245 },
246 };
247 self.send_raw(&err)
248 }
249
250 /// Read the next incoming server message (notification or server request).
251 ///
252 /// Returns buffered messages first (from notifications that arrived during
253 /// a [`SyncClient::request`] call), then reads from the wire.
254 ///
255 /// Returns `Ok(None)` when the app-server closes the connection (EOF).
256 pub fn next_message(&mut self) -> Result<Option<ServerMessage>> {
257 if let Some(msg) = self.buffered.pop_front() {
258 return Ok(Some(msg));
259 }
260
261 loop {
262 let msg = match self.read_message_opt()? {
263 Some(m) => m,
264 None => return Ok(None),
265 };
266
267 match msg {
268 JsonRpcMessage::Notification(notif) => {
269 return Ok(Some(ServerMessage::Notification {
270 method: notif.method,
271 params: notif.params,
272 }));
273 }
274 JsonRpcMessage::Request(req) => {
275 return Ok(Some(ServerMessage::Request {
276 id: req.id,
277 method: req.method,
278 params: req.params,
279 }));
280 }
281 JsonRpcMessage::Response(resp) => {
282 warn!(
283 "[CLIENT] Unexpected response (no pending request): id={}",
284 resp.id
285 );
286 }
287 JsonRpcMessage::Error(err) => {
288 warn!(
289 "[CLIENT] Unexpected error (no pending request): id={} code={}",
290 err.id, err.error.code
291 );
292 }
293 }
294 }
295 }
296
297 /// Return an iterator over [`ServerMessage`]s.
298 ///
299 /// The iterator yields `Result<ServerMessage>` and terminates when the
300 /// connection closes (EOF). This is the idiomatic way to consume a turn's
301 /// notifications in synchronous code.
302 pub fn events(&mut self) -> EventIterator<'_> {
303 EventIterator { client: self }
304 }
305
306 /// Shut down the child process.
307 ///
308 /// Kills the process if it's still running. Called automatically on [`Drop`].
309 pub fn shutdown(&mut self) -> Result<()> {
310 debug!("[CLIENT] Shutting down");
311 match self.child.try_wait() {
312 Ok(Some(_)) => Ok(()),
313 Ok(None) => {
314 self.child.kill().map_err(Error::Io)?;
315 self.child.wait().map_err(Error::Io)?;
316 Ok(())
317 }
318 Err(e) => Err(Error::Io(e)),
319 }
320 }
321
322 // -- internal --
323
324 fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
325 let json = serde_json::to_string(msg).map_err(Error::Json)?;
326 debug!("[CLIENT] Sending: {}", json);
327 self.writer.write_all(json.as_bytes()).map_err(Error::Io)?;
328 self.writer.write_all(b"\n").map_err(Error::Io)?;
329 self.writer.flush().map_err(Error::Io)?;
330 Ok(())
331 }
332
333 fn read_message(&mut self) -> Result<JsonRpcMessage> {
334 self.read_message_opt()?.ok_or(Error::ServerClosed)
335 }
336
337 fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
338 loop {
339 let mut line = String::new();
340 match self.reader.read_line(&mut line) {
341 Ok(0) => {
342 debug!("[CLIENT] Stream closed (EOF)");
343 return Ok(None);
344 }
345 Ok(_) => {
346 let trimmed = line.trim();
347 if trimmed.is_empty() {
348 continue;
349 }
350
351 debug!("[CLIENT] Received: {}", trimmed);
352
353 match serde_json::from_str::<JsonRpcMessage>(trimmed) {
354 Ok(msg) => return Ok(Some(msg)),
355 Err(e) => {
356 warn!(
357 "[CLIENT] Failed to deserialize message. \
358 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
359 );
360 warn!("[CLIENT] Parse error: {}", e);
361 warn!("[CLIENT] Raw: {}", trimmed);
362 return Err(Error::Deserialization(format!(
363 "{} (raw: {})",
364 e, trimmed
365 )));
366 }
367 }
368 }
369 Err(e) => {
370 debug!("[CLIENT] Error reading stdout: {}", e);
371 return Err(Error::Io(e));
372 }
373 }
374 }
375 }
376}
377
378impl Drop for SyncClient {
379 fn drop(&mut self) {
380 if let Err(e) = self.shutdown() {
381 debug!("[CLIENT] Error during shutdown: {}", e);
382 }
383 }
384}
385
386/// Iterator over [`ServerMessage`]s from a [`SyncClient`].
387pub struct EventIterator<'a> {
388 client: &'a mut SyncClient,
389}
390
391impl Iterator for EventIterator<'_> {
392 type Item = Result<ServerMessage>;
393
394 fn next(&mut self) -> Option<Self::Item> {
395 match self.client.next_message() {
396 Ok(Some(msg)) => Some(Ok(msg)),
397 Ok(None) => None,
398 Err(e) => Some(Err(e)),
399 }
400 }
401}
402
#[cfg(test)]
mod tests {
    use super::STDOUT_BUFFER_SIZE;

    /// Pins the stdout read buffer at exactly 10 MiB.
    #[test]
    fn test_buffer_size() {
        const TEN_MIB: usize = 10 * 1024 * 1024;
        assert_eq!(STDOUT_BUFFER_SIZE, TEN_MIB);
    }
}