Skip to main content

codex_codes/
client_sync.rs

1//! Synchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! This is the blocking counterpart to [`crate::client_async::AsyncClient`].
8//! Prefer the async client for applications that already use tokio.
9//!
10//! # Lifecycle
11//!
12//! 1. Create a client with [`SyncClient::start`] (spawns and initializes the app-server)
13//! 2. Call [`SyncClient::thread_start`] to create a conversation session
14//! 3. Call [`SyncClient::turn_start`] to send user input
15//! 4. Iterate over [`SyncClient::events`] until `turn/completed`
16//! 5. Handle approval requests via [`SyncClient::respond`]
17//! 6. Repeat steps 3-5 for follow-up turns
18//!
19//! # Example
20//!
21//! ```ignore
22//! use codex_codes::{SyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
23//!
24//! let mut client = SyncClient::start()?;
25//! let thread = client.thread_start(&ThreadStartParams::default())?;
26//!
27//! client.turn_start(&TurnStartParams {
28//!     thread_id: thread.thread_id().to_string(),
29//!     input: vec![UserInput::Text { text: "Hello!".into() }],
30//!     model: None,
31//!     reasoning_effort: None,
32//!     sandbox_policy: None,
33//! })?;
34//!
35//! for result in client.events() {
36//!     match result? {
37//!         ServerMessage::Notification(n) => {
38//!             if let codex_codes::Notification::TurnCompleted(_) = n { break; }
39//!         }
40//!         _ => {}
41//!     }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, ParseError, Result};
47use crate::jsonrpc::{
48    JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::messages::{Notification, ServerMessage, ServerRequest};
51use crate::protocol::{
52    ClientInfo, InitializeParams, InitializeResponse, ThreadArchiveParams, ThreadArchiveResponse,
53    ThreadStartParams, ThreadStartResponse, TurnInterruptParams, TurnInterruptResponse,
54    TurnStartParams, TurnStartResponse,
55};
56use log::{debug, warn};
57use serde::de::DeserializeOwned;
58use serde::Serialize;
59use std::collections::VecDeque;
60use std::io::{BufRead, BufReader, BufWriter, Write};
61use std::process::Child;
62
63/// Buffer size for reading stdout (10MB).
64const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
65
66/// Synchronous multi-turn client for the Codex app-server.
67///
68/// Communicates with a long-lived `codex app-server` process via
69/// newline-delimited JSON-RPC over stdio. Manages request/response
70/// correlation and buffers incoming notifications that arrive while
71/// waiting for RPC responses.
72///
73/// The client automatically kills the app-server process when dropped.
74pub struct SyncClient {
75    child: Child,
76    writer: BufWriter<std::process::ChildStdin>,
77    reader: BufReader<std::process::ChildStdout>,
78    /// Handle to the background thread draining the child's stderr pipe.
79    /// Kept alive for the lifetime of the client; the thread exits on EOF
80    /// when the child is killed.
81    _stderr_drain: std::thread::JoinHandle<()>,
82    next_id: i64,
83    buffered: VecDeque<ServerMessage>,
84}
85
86impl SyncClient {
87    /// Start an app-server with default settings.
88    ///
89    /// Spawns `codex app-server --listen stdio://`, performs the required
90    /// `initialize` handshake, and returns a connected client ready for
91    /// `thread_start()`.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the `codex` CLI is not installed, the version is
96    /// incompatible, the process fails to start, or the initialization
97    /// handshake fails.
98    pub fn start() -> Result<Self> {
99        Self::start_with(AppServerBuilder::new())
100    }
101
102    /// Start an app-server with a custom [`AppServerBuilder`].
103    ///
104    /// Performs the required `initialize` handshake before returning.
105    /// Use this to configure a custom binary path or working directory.
106    ///
107    /// # Errors
108    ///
109    /// Returns an error if the process fails to start, stdio pipes
110    /// cannot be established, or the initialization handshake fails.
111    pub fn start_with(builder: AppServerBuilder) -> Result<Self> {
112        let mut client = Self::spawn(builder)?;
113        client.initialize(&InitializeParams {
114            client_info: ClientInfo {
115                name: "codex-codes".to_string(),
116                version: env!("CARGO_PKG_VERSION").to_string(),
117                title: None,
118            },
119            capabilities: None,
120        })?;
121        Ok(client)
122    }
123
124    /// Spawn an app-server without performing the `initialize` handshake.
125    ///
126    /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
127    /// specific capabilities). You **must** call [`SyncClient::initialize`]
128    /// before any other requests.
129    pub fn spawn(builder: AppServerBuilder) -> Result<Self> {
130        crate::version::check_codex_version()?;
131
132        let mut child = builder.spawn_sync()?;
133
134        let stdin = child
135            .stdin
136            .take()
137            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
138        let stdout = child
139            .stdout
140            .take()
141            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
142        let stderr = child
143            .stderr
144            .take()
145            .ok_or_else(|| Error::Protocol("Failed to get stderr".to_string()))?;
146
147        // The app-server emits ~200 KB/s of tracing to stderr. Without an
148        // active reader, the ~64 KB kernel pipe fills almost instantly and
149        // the child blocks. Drain in the background and route lines through
150        // the `log` crate (see [`crate::stderr_drain`]).
151        let stderr_drain = crate::stderr_drain::spawn_sync(stderr);
152
153        Ok(Self {
154            child,
155            writer: BufWriter::new(stdin),
156            reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
157            _stderr_drain: stderr_drain,
158            next_id: 1,
159            buffered: VecDeque::new(),
160        })
161    }
162
163    /// Send a JSON-RPC request and wait for the matching response.
164    ///
165    /// Any notifications or server requests that arrive before the response
166    /// are buffered and can be retrieved via [`SyncClient::next_message`].
167    ///
168    /// # Errors
169    ///
170    /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
171    /// - [`Error::ServerClosed`] if the connection drops before a response arrives
172    /// - [`Error::Json`] if response deserialization fails
173    pub fn request<P: Serialize, R: DeserializeOwned>(
174        &mut self,
175        method: &str,
176        params: &P,
177    ) -> Result<R> {
178        let id = RequestId::Integer(self.next_id);
179        self.next_id += 1;
180
181        let req = JsonRpcRequest {
182            id: id.clone(),
183            method: method.to_string(),
184            params: Some(serde_json::to_value(params).map_err(Error::Json)?),
185        };
186
187        self.send_raw(&req)?;
188
189        loop {
190            let msg = self.read_message()?;
191            match msg {
192                JsonRpcMessage::Response(resp) if resp.id == id => {
193                    let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
194                    return Ok(result);
195                }
196                JsonRpcMessage::Error(err) if err.id == id => {
197                    return Err(Error::JsonRpc {
198                        code: err.error.code,
199                        message: err.error.message,
200                    });
201                }
202                JsonRpcMessage::Notification(notif) => {
203                    let typed = Notification::from_envelope(&notif.method, notif.params)
204                        .map_err(Error::Json)?;
205                    self.buffered.push_back(ServerMessage::Notification(typed));
206                }
207                JsonRpcMessage::Request(req) => {
208                    let typed = ServerRequest::from_envelope(&req.method, req.params)
209                        .map_err(Error::Json)?;
210                    self.buffered.push_back(ServerMessage::Request {
211                        id: req.id,
212                        request: typed,
213                    });
214                }
215                JsonRpcMessage::Response(resp) => {
216                    warn!(
217                        "[CLIENT] Unexpected response for id={}, expected id={}",
218                        resp.id, id
219                    );
220                }
221                JsonRpcMessage::Error(err) => {
222                    warn!(
223                        "[CLIENT] Unexpected error for id={}, expected id={}",
224                        err.id, id
225                    );
226                }
227            }
228        }
229    }
230
231    /// Start a new thread (conversation session).
232    ///
233    /// A thread must be created before any turns can be started. The returned
234    /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
235    pub fn thread_start(&mut self, params: &ThreadStartParams) -> Result<ThreadStartResponse> {
236        self.request(crate::protocol::methods::THREAD_START, params)
237    }
238
239    /// Start a new turn within a thread.
240    ///
241    /// Sends user input to the agent. After calling this, use [`SyncClient::events`]
242    /// or [`SyncClient::next_message`] to consume notifications until `turn/completed`.
243    pub fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
244        self.request(crate::protocol::methods::TURN_START, params)
245    }
246
247    /// Interrupt an active turn.
248    pub fn turn_interrupt(
249        &mut self,
250        params: &TurnInterruptParams,
251    ) -> Result<TurnInterruptResponse> {
252        self.request(crate::protocol::methods::TURN_INTERRUPT, params)
253    }
254
255    /// Archive a thread.
256    pub fn thread_archive(
257        &mut self,
258        params: &ThreadArchiveParams,
259    ) -> Result<ThreadArchiveResponse> {
260        self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
261    }
262
263    /// Perform the `initialize` handshake with the app-server.
264    ///
265    /// Sends `initialize` with the given params and then sends the
266    /// `initialized` notification. This must be the first request after
267    /// spawning the process.
268    pub fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
269        let resp: InitializeResponse =
270            self.request(crate::protocol::methods::INITIALIZE, params)?;
271        self.send_notification(crate::protocol::methods::INITIALIZED)?;
272        Ok(resp)
273    }
274
275    /// Respond to a server-to-client request (e.g., approval flow).
276    ///
277    /// When the server sends a [`ServerMessage::Request`], it expects a response.
278    /// Use this method with the request's `id` and a result payload. For command
279    /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
280    /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
281    pub fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
282        let resp = JsonRpcResponse {
283            id,
284            result: serde_json::to_value(result).map_err(Error::Json)?,
285        };
286        self.send_raw(&resp)
287    }
288
289    /// Respond to a server-to-client request with an error.
290    pub fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
291        let err = JsonRpcError {
292            id,
293            error: crate::jsonrpc::JsonRpcErrorData {
294                code,
295                message: message.to_string(),
296                data: None,
297            },
298        };
299        self.send_raw(&err)
300    }
301
302    /// Read the next incoming server message (notification or server request).
303    ///
304    /// Returns buffered messages first (from notifications that arrived during
305    /// a [`SyncClient::request`] call), then reads from the wire.
306    ///
307    /// Returns `Ok(None)` when the app-server closes the connection (EOF).
308    pub fn next_message(&mut self) -> Result<Option<ServerMessage>> {
309        if let Some(msg) = self.buffered.pop_front() {
310            return Ok(Some(msg));
311        }
312
313        loop {
314            let msg = match self.read_message_opt()? {
315                Some(m) => m,
316                None => return Ok(None),
317            };
318
319            match msg {
320                JsonRpcMessage::Notification(notif) => {
321                    let JsonRpcNotification { method, params } = notif;
322                    let typed =
323                        Notification::from_envelope(&method, params.clone()).map_err(|e| {
324                            Error::Deserialization(ParseError::from_envelope(method, params, e))
325                        })?;
326                    return Ok(Some(ServerMessage::Notification(typed)));
327                }
328                JsonRpcMessage::Request(req) => {
329                    let JsonRpcRequest { id, method, params } = req;
330                    let typed =
331                        ServerRequest::from_envelope(&method, params.clone()).map_err(|e| {
332                            Error::Deserialization(ParseError::from_envelope(method, params, e))
333                        })?;
334                    return Ok(Some(ServerMessage::Request { id, request: typed }));
335                }
336                JsonRpcMessage::Response(resp) => {
337                    warn!(
338                        "[CLIENT] Unexpected response (no pending request): id={}",
339                        resp.id
340                    );
341                }
342                JsonRpcMessage::Error(err) => {
343                    warn!(
344                        "[CLIENT] Unexpected error (no pending request): id={} code={}",
345                        err.id, err.error.code
346                    );
347                }
348            }
349        }
350    }
351
352    /// Return an iterator over [`ServerMessage`]s.
353    ///
354    /// The iterator yields `Result<ServerMessage>` and terminates when the
355    /// connection closes (EOF). This is the idiomatic way to consume a turn's
356    /// notifications in synchronous code.
357    pub fn events(&mut self) -> EventIterator<'_> {
358        EventIterator { client: self }
359    }
360
361    /// Shut down the child process.
362    ///
363    /// Kills the process if it's still running. Called automatically on [`Drop`].
364    pub fn shutdown(&mut self) -> Result<()> {
365        debug!("[CLIENT] Shutting down");
366        match self.child.try_wait() {
367            Ok(Some(_)) => Ok(()),
368            Ok(None) => {
369                self.child.kill().map_err(Error::Io)?;
370                self.child.wait().map_err(Error::Io)?;
371                Ok(())
372            }
373            Err(e) => Err(Error::Io(e)),
374        }
375    }
376
377    // -- internal --
378
379    fn send_notification(&mut self, method: &str) -> Result<()> {
380        let notif = JsonRpcNotification {
381            method: method.to_string(),
382            params: None,
383        };
384        self.send_raw(&notif)
385    }
386
387    fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
388        let json = serde_json::to_string(msg).map_err(Error::Json)?;
389        debug!("[CLIENT] Sending: {}", json);
390        self.writer.write_all(json.as_bytes()).map_err(Error::Io)?;
391        self.writer.write_all(b"\n").map_err(Error::Io)?;
392        self.writer.flush().map_err(Error::Io)?;
393        Ok(())
394    }
395
396    fn read_message(&mut self) -> Result<JsonRpcMessage> {
397        self.read_message_opt()?.ok_or(Error::ServerClosed)
398    }
399
400    fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
401        loop {
402            let mut line = String::new();
403            match self.reader.read_line(&mut line) {
404                Ok(0) => {
405                    debug!("[CLIENT] Stream closed (EOF)");
406                    return Ok(None);
407                }
408                Ok(_) => {
409                    let trimmed = line.trim();
410                    if trimmed.is_empty() {
411                        continue;
412                    }
413
414                    debug!("[CLIENT] Received: {}", trimmed);
415
416                    match serde_json::from_str::<JsonRpcMessage>(trimmed) {
417                        Ok(msg) => return Ok(Some(msg)),
418                        Err(e) => {
419                            warn!(
420                                "[CLIENT] Failed to deserialize message. \
421                                 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
422                            );
423                            warn!("[CLIENT] Parse error: {}", e);
424                            warn!("[CLIENT] Raw: {}", trimmed);
425                            return Err(Error::Deserialization(ParseError::from_line(trimmed, e)));
426                        }
427                    }
428                }
429                Err(e) => {
430                    debug!("[CLIENT] Error reading stdout: {}", e);
431                    return Err(Error::Io(e));
432                }
433            }
434        }
435    }
436}
437
438impl Drop for SyncClient {
439    fn drop(&mut self) {
440        if let Err(e) = self.shutdown() {
441            debug!("[CLIENT] Error during shutdown: {}", e);
442        }
443    }
444}
445
446/// Iterator over [`ServerMessage`]s from a [`SyncClient`].
447pub struct EventIterator<'a> {
448    client: &'a mut SyncClient,
449}
450
451impl Iterator for EventIterator<'_> {
452    type Item = Result<ServerMessage>;
453
454    fn next(&mut self) -> Option<Self::Item> {
455        match self.client.next_message() {
456            Ok(Some(msg)) => Some(Ok(msg)),
457            Ok(None) => None,
458            Err(e) => Some(Err(e)),
459        }
460    }
461}
462
463#[cfg(test)]
464mod tests {
465    use super::*;
466
467    #[test]
468    fn test_buffer_size() {
469        assert_eq!(STDOUT_BUFFER_SIZE, 10 * 1024 * 1024);
470    }
471}