Skip to main content

codex_codes/
client_sync.rs

//! Synchronous multi-turn client for the Codex app-server.
//!
//! Spawns `codex app-server --listen stdio://` and communicates over
//! newline-delimited JSON-RPC. The connection stays open for multiple
//! turns until explicitly shut down.
//!
//! This is the blocking counterpart to [`crate::client_async::AsyncClient`].
//! Prefer the async client for applications that already use tokio.
//!
//! # Lifecycle
//!
//! 1. Create a client with [`SyncClient::start`] (spawns and initializes the app-server)
//! 2. Call [`SyncClient::thread_start`] to create a conversation session
//! 3. Call [`SyncClient::turn_start`] to send user input
//! 4. Iterate over [`SyncClient::events`] until `turn/completed`
//! 5. While iterating, handle approval requests via [`SyncClient::respond`]
//! 6. Repeat steps 3-5 for follow-up turns
//!
//! # Example
//!
//! ```ignore
//! use codex_codes::{SyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
//!
//! let mut client = SyncClient::start()?;
//! let thread = client.thread_start(&ThreadStartParams::default())?;
//!
//! client.turn_start(&TurnStartParams {
//!     thread_id: thread.thread_id().to_string(),
//!     input: vec![UserInput::Text { text: "Hello!".into() }],
//!     model: None,
//!     reasoning_effort: None,
//!     sandbox_policy: None,
//! })?;
//!
//! for result in client.events() {
//!     match result? {
//!         ServerMessage::Notification { method, .. } => {
//!             if method == "turn/completed" { break; }
//!         }
//!         _ => {}
//!     }
//! }
//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, Result};
47use crate::jsonrpc::{
48    JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::protocol::{
51    ClientInfo, InitializeParams, InitializeResponse, ServerMessage, ThreadArchiveParams,
52    ThreadArchiveResponse, ThreadStartParams, ThreadStartResponse, TurnInterruptParams,
53    TurnInterruptResponse, TurnStartParams, TurnStartResponse,
54};
55use log::{debug, warn};
56use serde::de::DeserializeOwned;
57use serde::Serialize;
58use std::collections::VecDeque;
59use std::io::{BufRead, BufReader, BufWriter, Write};
60use std::process::Child;
61
/// Capacity, in bytes, of the `BufReader` that wraps the app-server's
/// stdout: 10 MiB (10 × 2^20).
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
64
65/// Synchronous multi-turn client for the Codex app-server.
66///
67/// Communicates with a long-lived `codex app-server` process via
68/// newline-delimited JSON-RPC over stdio. Manages request/response
69/// correlation and buffers incoming notifications that arrive while
70/// waiting for RPC responses.
71///
72/// The client automatically kills the app-server process when dropped.
73pub struct SyncClient {
74    child: Child,
75    writer: BufWriter<std::process::ChildStdin>,
76    reader: BufReader<std::process::ChildStdout>,
77    next_id: i64,
78    buffered: VecDeque<ServerMessage>,
79}
80
81impl SyncClient {
82    /// Start an app-server with default settings.
83    ///
84    /// Spawns `codex app-server --listen stdio://`, performs the required
85    /// `initialize` handshake, and returns a connected client ready for
86    /// `thread_start()`.
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the `codex` CLI is not installed, the version is
91    /// incompatible, the process fails to start, or the initialization
92    /// handshake fails.
93    pub fn start() -> Result<Self> {
94        Self::start_with(AppServerBuilder::new())
95    }
96
97    /// Start an app-server with a custom [`AppServerBuilder`].
98    ///
99    /// Performs the required `initialize` handshake before returning.
100    /// Use this to configure a custom binary path or working directory.
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if the process fails to start, stdio pipes
105    /// cannot be established, or the initialization handshake fails.
106    pub fn start_with(builder: AppServerBuilder) -> Result<Self> {
107        let mut client = Self::spawn(builder)?;
108        client.initialize(&InitializeParams {
109            client_info: ClientInfo {
110                name: "codex-codes".to_string(),
111                version: env!("CARGO_PKG_VERSION").to_string(),
112                title: None,
113            },
114            capabilities: None,
115        })?;
116        Ok(client)
117    }
118
119    /// Spawn an app-server without performing the `initialize` handshake.
120    ///
121    /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
122    /// specific capabilities). You **must** call [`SyncClient::initialize`]
123    /// before any other requests.
124    pub fn spawn(builder: AppServerBuilder) -> Result<Self> {
125        crate::version::check_codex_version()?;
126
127        let mut child = builder.spawn_sync().map_err(Error::Io)?;
128
129        let stdin = child
130            .stdin
131            .take()
132            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
133        let stdout = child
134            .stdout
135            .take()
136            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
137
138        Ok(Self {
139            child,
140            writer: BufWriter::new(stdin),
141            reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
142            next_id: 1,
143            buffered: VecDeque::new(),
144        })
145    }
146
147    /// Send a JSON-RPC request and wait for the matching response.
148    ///
149    /// Any notifications or server requests that arrive before the response
150    /// are buffered and can be retrieved via [`SyncClient::next_message`].
151    ///
152    /// # Errors
153    ///
154    /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
155    /// - [`Error::ServerClosed`] if the connection drops before a response arrives
156    /// - [`Error::Json`] if response deserialization fails
157    pub fn request<P: Serialize, R: DeserializeOwned>(
158        &mut self,
159        method: &str,
160        params: &P,
161    ) -> Result<R> {
162        let id = RequestId::Integer(self.next_id);
163        self.next_id += 1;
164
165        let req = JsonRpcRequest {
166            id: id.clone(),
167            method: method.to_string(),
168            params: Some(serde_json::to_value(params).map_err(Error::Json)?),
169        };
170
171        self.send_raw(&req)?;
172
173        loop {
174            let msg = self.read_message()?;
175            match msg {
176                JsonRpcMessage::Response(resp) if resp.id == id => {
177                    let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
178                    return Ok(result);
179                }
180                JsonRpcMessage::Error(err) if err.id == id => {
181                    return Err(Error::JsonRpc {
182                        code: err.error.code,
183                        message: err.error.message,
184                    });
185                }
186                JsonRpcMessage::Notification(notif) => {
187                    self.buffered.push_back(ServerMessage::Notification {
188                        method: notif.method,
189                        params: notif.params,
190                    });
191                }
192                JsonRpcMessage::Request(req) => {
193                    self.buffered.push_back(ServerMessage::Request {
194                        id: req.id,
195                        method: req.method,
196                        params: req.params,
197                    });
198                }
199                JsonRpcMessage::Response(resp) => {
200                    warn!(
201                        "[CLIENT] Unexpected response for id={}, expected id={}",
202                        resp.id, id
203                    );
204                }
205                JsonRpcMessage::Error(err) => {
206                    warn!(
207                        "[CLIENT] Unexpected error for id={}, expected id={}",
208                        err.id, id
209                    );
210                }
211            }
212        }
213    }
214
215    /// Start a new thread (conversation session).
216    ///
217    /// A thread must be created before any turns can be started. The returned
218    /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
219    pub fn thread_start(&mut self, params: &ThreadStartParams) -> Result<ThreadStartResponse> {
220        self.request(crate::protocol::methods::THREAD_START, params)
221    }
222
223    /// Start a new turn within a thread.
224    ///
225    /// Sends user input to the agent. After calling this, use [`SyncClient::events`]
226    /// or [`SyncClient::next_message`] to consume notifications until `turn/completed`.
227    pub fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
228        self.request(crate::protocol::methods::TURN_START, params)
229    }
230
231    /// Interrupt an active turn.
232    pub fn turn_interrupt(
233        &mut self,
234        params: &TurnInterruptParams,
235    ) -> Result<TurnInterruptResponse> {
236        self.request(crate::protocol::methods::TURN_INTERRUPT, params)
237    }
238
239    /// Archive a thread.
240    pub fn thread_archive(
241        &mut self,
242        params: &ThreadArchiveParams,
243    ) -> Result<ThreadArchiveResponse> {
244        self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
245    }
246
247    /// Perform the `initialize` handshake with the app-server.
248    ///
249    /// Sends `initialize` with the given params and then sends the
250    /// `initialized` notification. This must be the first request after
251    /// spawning the process.
252    pub fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
253        let resp: InitializeResponse =
254            self.request(crate::protocol::methods::INITIALIZE, params)?;
255        self.send_notification(crate::protocol::methods::INITIALIZED)?;
256        Ok(resp)
257    }
258
259    /// Respond to a server-to-client request (e.g., approval flow).
260    ///
261    /// When the server sends a [`ServerMessage::Request`], it expects a response.
262    /// Use this method with the request's `id` and a result payload. For command
263    /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
264    /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
265    pub fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
266        let resp = JsonRpcResponse {
267            id,
268            result: serde_json::to_value(result).map_err(Error::Json)?,
269        };
270        self.send_raw(&resp)
271    }
272
273    /// Respond to a server-to-client request with an error.
274    pub fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
275        let err = JsonRpcError {
276            id,
277            error: crate::jsonrpc::JsonRpcErrorData {
278                code,
279                message: message.to_string(),
280                data: None,
281            },
282        };
283        self.send_raw(&err)
284    }
285
286    /// Read the next incoming server message (notification or server request).
287    ///
288    /// Returns buffered messages first (from notifications that arrived during
289    /// a [`SyncClient::request`] call), then reads from the wire.
290    ///
291    /// Returns `Ok(None)` when the app-server closes the connection (EOF).
292    pub fn next_message(&mut self) -> Result<Option<ServerMessage>> {
293        if let Some(msg) = self.buffered.pop_front() {
294            return Ok(Some(msg));
295        }
296
297        loop {
298            let msg = match self.read_message_opt()? {
299                Some(m) => m,
300                None => return Ok(None),
301            };
302
303            match msg {
304                JsonRpcMessage::Notification(notif) => {
305                    return Ok(Some(ServerMessage::Notification {
306                        method: notif.method,
307                        params: notif.params,
308                    }));
309                }
310                JsonRpcMessage::Request(req) => {
311                    return Ok(Some(ServerMessage::Request {
312                        id: req.id,
313                        method: req.method,
314                        params: req.params,
315                    }));
316                }
317                JsonRpcMessage::Response(resp) => {
318                    warn!(
319                        "[CLIENT] Unexpected response (no pending request): id={}",
320                        resp.id
321                    );
322                }
323                JsonRpcMessage::Error(err) => {
324                    warn!(
325                        "[CLIENT] Unexpected error (no pending request): id={} code={}",
326                        err.id, err.error.code
327                    );
328                }
329            }
330        }
331    }
332
333    /// Return an iterator over [`ServerMessage`]s.
334    ///
335    /// The iterator yields `Result<ServerMessage>` and terminates when the
336    /// connection closes (EOF). This is the idiomatic way to consume a turn's
337    /// notifications in synchronous code.
338    pub fn events(&mut self) -> EventIterator<'_> {
339        EventIterator { client: self }
340    }
341
342    /// Shut down the child process.
343    ///
344    /// Kills the process if it's still running. Called automatically on [`Drop`].
345    pub fn shutdown(&mut self) -> Result<()> {
346        debug!("[CLIENT] Shutting down");
347        match self.child.try_wait() {
348            Ok(Some(_)) => Ok(()),
349            Ok(None) => {
350                self.child.kill().map_err(Error::Io)?;
351                self.child.wait().map_err(Error::Io)?;
352                Ok(())
353            }
354            Err(e) => Err(Error::Io(e)),
355        }
356    }
357
358    // -- internal --
359
360    fn send_notification(&mut self, method: &str) -> Result<()> {
361        let notif = JsonRpcNotification {
362            method: method.to_string(),
363            params: None,
364        };
365        self.send_raw(&notif)
366    }
367
368    fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
369        let json = serde_json::to_string(msg).map_err(Error::Json)?;
370        debug!("[CLIENT] Sending: {}", json);
371        self.writer.write_all(json.as_bytes()).map_err(Error::Io)?;
372        self.writer.write_all(b"\n").map_err(Error::Io)?;
373        self.writer.flush().map_err(Error::Io)?;
374        Ok(())
375    }
376
377    fn read_message(&mut self) -> Result<JsonRpcMessage> {
378        self.read_message_opt()?.ok_or(Error::ServerClosed)
379    }
380
381    fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
382        loop {
383            let mut line = String::new();
384            match self.reader.read_line(&mut line) {
385                Ok(0) => {
386                    debug!("[CLIENT] Stream closed (EOF)");
387                    return Ok(None);
388                }
389                Ok(_) => {
390                    let trimmed = line.trim();
391                    if trimmed.is_empty() {
392                        continue;
393                    }
394
395                    debug!("[CLIENT] Received: {}", trimmed);
396
397                    match serde_json::from_str::<JsonRpcMessage>(trimmed) {
398                        Ok(msg) => return Ok(Some(msg)),
399                        Err(e) => {
400                            warn!(
401                                "[CLIENT] Failed to deserialize message. \
402                                 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
403                            );
404                            warn!("[CLIENT] Parse error: {}", e);
405                            warn!("[CLIENT] Raw: {}", trimmed);
406                            return Err(Error::Deserialization(format!(
407                                "{} (raw: {})",
408                                e, trimmed
409                            )));
410                        }
411                    }
412                }
413                Err(e) => {
414                    debug!("[CLIENT] Error reading stdout: {}", e);
415                    return Err(Error::Io(e));
416                }
417            }
418        }
419    }
420}
421
422impl Drop for SyncClient {
423    fn drop(&mut self) {
424        if let Err(e) = self.shutdown() {
425            debug!("[CLIENT] Error during shutdown: {}", e);
426        }
427    }
428}
429
430/// Iterator over [`ServerMessage`]s from a [`SyncClient`].
431pub struct EventIterator<'a> {
432    client: &'a mut SyncClient,
433}
434
435impl Iterator for EventIterator<'_> {
436    type Item = Result<ServerMessage>;
437
438    fn next(&mut self) -> Option<Self::Item> {
439        match self.client.next_message() {
440            Ok(Some(msg)) => Some(Ok(msg)),
441            Ok(None) => None,
442            Err(e) => Some(Err(e)),
443        }
444    }
445}
446
#[cfg(test)]
mod tests {
    use super::STDOUT_BUFFER_SIZE;

    /// The stdout read buffer is exactly 10 MiB.
    #[test]
    fn test_buffer_size() {
        let expected_bytes: usize = 10 * 1024 * 1024;
        assert_eq!(STDOUT_BUFFER_SIZE, expected_bytes);
    }
}
455}