Skip to main content

codex_codes/
client_sync.rs

1//! Synchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! This is the blocking counterpart to [`crate::client_async::AsyncClient`].
8//! Prefer the async client for applications that already use tokio.
9//!
10//! # Lifecycle
11//!
12//! 1. Create a client with [`SyncClient::start`]
13//! 2. Call [`SyncClient::thread_start`] to create a conversation session
14//! 3. Call [`SyncClient::turn_start`] to send user input
15//! 4. Iterate over [`SyncClient::events`] until `turn/completed`
16//! 5. Handle approval requests via [`SyncClient::respond`]
17//! 6. Repeat steps 3-5 for follow-up turns
18//!
19//! # Example
20//!
21//! ```ignore
22//! use codex_codes::{SyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
23//!
24//! let mut client = SyncClient::start()?;
25//! let thread = client.thread_start(&ThreadStartParams::default())?;
26//!
27//! client.turn_start(&TurnStartParams {
28//!     thread_id: thread.thread_id.clone(),
29//!     input: vec![UserInput::Text { text: "Hello!".into() }],
30//!     model: None,
31//!     reasoning_effort: None,
32//!     sandbox_policy: None,
33//! })?;
34//!
35//! for result in client.events() {
36//!     match result? {
37//!         ServerMessage::Notification { method, .. } => {
38//!             if method == "turn/completed" { break; }
39//!         }
40//!         _ => {}
41//!     }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, Result};
47use crate::jsonrpc::{JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, RequestId};
48use crate::protocol::{
49    ServerMessage, ThreadArchiveParams, ThreadArchiveResponse, ThreadStartParams,
50    ThreadStartResponse, TurnInterruptParams, TurnInterruptResponse, TurnStartParams,
51    TurnStartResponse,
52};
53use log::{debug, warn};
54use serde::de::DeserializeOwned;
55use serde::Serialize;
56use std::collections::VecDeque;
57use std::io::{BufRead, BufReader, BufWriter, Write};
58use std::process::Child;
59
60/// Buffer size for reading stdout (10MB).
61const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
62
63/// Synchronous multi-turn client for the Codex app-server.
64///
65/// Communicates with a long-lived `codex app-server` process via
66/// newline-delimited JSON-RPC over stdio. Manages request/response
67/// correlation and buffers incoming notifications that arrive while
68/// waiting for RPC responses.
69///
70/// The client automatically kills the app-server process when dropped.
71pub struct SyncClient {
72    child: Child,
73    writer: BufWriter<std::process::ChildStdin>,
74    reader: BufReader<std::process::ChildStdout>,
75    next_id: i64,
76    buffered: VecDeque<ServerMessage>,
77}
78
79impl SyncClient {
80    /// Start an app-server with default settings.
81    ///
82    /// Spawns `codex app-server --listen stdio://` and returns a connected client.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if the `codex` CLI is not installed, the version is
87    /// incompatible, or the process fails to start.
88    pub fn start() -> Result<Self> {
89        Self::start_with(AppServerBuilder::new())
90    }
91
92    /// Start an app-server with a custom [`AppServerBuilder`].
93    ///
94    /// Use this to configure a custom binary path or working directory.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if the process fails to start or stdio pipes
99    /// cannot be established.
100    pub fn start_with(builder: AppServerBuilder) -> Result<Self> {
101        crate::version::check_codex_version()?;
102
103        let mut child = builder.spawn_sync().map_err(Error::Io)?;
104
105        let stdin = child
106            .stdin
107            .take()
108            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
109        let stdout = child
110            .stdout
111            .take()
112            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
113
114        Ok(Self {
115            child,
116            writer: BufWriter::new(stdin),
117            reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
118            next_id: 1,
119            buffered: VecDeque::new(),
120        })
121    }
122
123    /// Send a JSON-RPC request and wait for the matching response.
124    ///
125    /// Any notifications or server requests that arrive before the response
126    /// are buffered and can be retrieved via [`SyncClient::next_message`].
127    ///
128    /// # Errors
129    ///
130    /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
131    /// - [`Error::ServerClosed`] if the connection drops before a response arrives
132    /// - [`Error::Json`] if response deserialization fails
133    pub fn request<P: Serialize, R: DeserializeOwned>(
134        &mut self,
135        method: &str,
136        params: &P,
137    ) -> Result<R> {
138        let id = RequestId::Integer(self.next_id);
139        self.next_id += 1;
140
141        let req = JsonRpcRequest {
142            id: id.clone(),
143            method: method.to_string(),
144            params: Some(serde_json::to_value(params).map_err(Error::Json)?),
145        };
146
147        self.send_raw(&req)?;
148
149        loop {
150            let msg = self.read_message()?;
151            match msg {
152                JsonRpcMessage::Response(resp) if resp.id == id => {
153                    let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
154                    return Ok(result);
155                }
156                JsonRpcMessage::Error(err) if err.id == id => {
157                    return Err(Error::JsonRpc {
158                        code: err.error.code,
159                        message: err.error.message,
160                    });
161                }
162                JsonRpcMessage::Notification(notif) => {
163                    self.buffered.push_back(ServerMessage::Notification {
164                        method: notif.method,
165                        params: notif.params,
166                    });
167                }
168                JsonRpcMessage::Request(req) => {
169                    self.buffered.push_back(ServerMessage::Request {
170                        id: req.id,
171                        method: req.method,
172                        params: req.params,
173                    });
174                }
175                JsonRpcMessage::Response(resp) => {
176                    warn!(
177                        "[CLIENT] Unexpected response for id={}, expected id={}",
178                        resp.id, id
179                    );
180                }
181                JsonRpcMessage::Error(err) => {
182                    warn!(
183                        "[CLIENT] Unexpected error for id={}, expected id={}",
184                        err.id, id
185                    );
186                }
187            }
188        }
189    }
190
191    /// Start a new thread (conversation session).
192    ///
193    /// A thread must be created before any turns can be started. The returned
194    /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
195    pub fn thread_start(&mut self, params: &ThreadStartParams) -> Result<ThreadStartResponse> {
196        self.request(crate::protocol::methods::THREAD_START, params)
197    }
198
199    /// Start a new turn within a thread.
200    ///
201    /// Sends user input to the agent. After calling this, use [`SyncClient::events`]
202    /// or [`SyncClient::next_message`] to consume notifications until `turn/completed`.
203    pub fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
204        self.request(crate::protocol::methods::TURN_START, params)
205    }
206
207    /// Interrupt an active turn.
208    pub fn turn_interrupt(
209        &mut self,
210        params: &TurnInterruptParams,
211    ) -> Result<TurnInterruptResponse> {
212        self.request(crate::protocol::methods::TURN_INTERRUPT, params)
213    }
214
215    /// Archive a thread.
216    pub fn thread_archive(
217        &mut self,
218        params: &ThreadArchiveParams,
219    ) -> Result<ThreadArchiveResponse> {
220        self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
221    }
222
223    /// Respond to a server-to-client request (e.g., approval flow).
224    ///
225    /// When the server sends a [`ServerMessage::Request`], it expects a response.
226    /// Use this method with the request's `id` and a result payload. For command
227    /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
228    /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
229    pub fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
230        let resp = JsonRpcResponse {
231            id,
232            result: serde_json::to_value(result).map_err(Error::Json)?,
233        };
234        self.send_raw(&resp)
235    }
236
237    /// Respond to a server-to-client request with an error.
238    pub fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
239        let err = JsonRpcError {
240            id,
241            error: crate::jsonrpc::JsonRpcErrorData {
242                code,
243                message: message.to_string(),
244                data: None,
245            },
246        };
247        self.send_raw(&err)
248    }
249
250    /// Read the next incoming server message (notification or server request).
251    ///
252    /// Returns buffered messages first (from notifications that arrived during
253    /// a [`SyncClient::request`] call), then reads from the wire.
254    ///
255    /// Returns `Ok(None)` when the app-server closes the connection (EOF).
256    pub fn next_message(&mut self) -> Result<Option<ServerMessage>> {
257        if let Some(msg) = self.buffered.pop_front() {
258            return Ok(Some(msg));
259        }
260
261        loop {
262            let msg = match self.read_message_opt()? {
263                Some(m) => m,
264                None => return Ok(None),
265            };
266
267            match msg {
268                JsonRpcMessage::Notification(notif) => {
269                    return Ok(Some(ServerMessage::Notification {
270                        method: notif.method,
271                        params: notif.params,
272                    }));
273                }
274                JsonRpcMessage::Request(req) => {
275                    return Ok(Some(ServerMessage::Request {
276                        id: req.id,
277                        method: req.method,
278                        params: req.params,
279                    }));
280                }
281                JsonRpcMessage::Response(resp) => {
282                    warn!(
283                        "[CLIENT] Unexpected response (no pending request): id={}",
284                        resp.id
285                    );
286                }
287                JsonRpcMessage::Error(err) => {
288                    warn!(
289                        "[CLIENT] Unexpected error (no pending request): id={} code={}",
290                        err.id, err.error.code
291                    );
292                }
293            }
294        }
295    }
296
297    /// Return an iterator over [`ServerMessage`]s.
298    ///
299    /// The iterator yields `Result<ServerMessage>` and terminates when the
300    /// connection closes (EOF). This is the idiomatic way to consume a turn's
301    /// notifications in synchronous code.
302    pub fn events(&mut self) -> EventIterator<'_> {
303        EventIterator { client: self }
304    }
305
306    /// Shut down the child process.
307    ///
308    /// Kills the process if it's still running. Called automatically on [`Drop`].
309    pub fn shutdown(&mut self) -> Result<()> {
310        debug!("[CLIENT] Shutting down");
311        match self.child.try_wait() {
312            Ok(Some(_)) => Ok(()),
313            Ok(None) => {
314                self.child.kill().map_err(Error::Io)?;
315                self.child.wait().map_err(Error::Io)?;
316                Ok(())
317            }
318            Err(e) => Err(Error::Io(e)),
319        }
320    }
321
322    // -- internal --
323
324    fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
325        let json = serde_json::to_string(msg).map_err(Error::Json)?;
326        debug!("[CLIENT] Sending: {}", json);
327        self.writer.write_all(json.as_bytes()).map_err(Error::Io)?;
328        self.writer.write_all(b"\n").map_err(Error::Io)?;
329        self.writer.flush().map_err(Error::Io)?;
330        Ok(())
331    }
332
333    fn read_message(&mut self) -> Result<JsonRpcMessage> {
334        self.read_message_opt()?.ok_or(Error::ServerClosed)
335    }
336
337    fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
338        loop {
339            let mut line = String::new();
340            match self.reader.read_line(&mut line) {
341                Ok(0) => {
342                    debug!("[CLIENT] Stream closed (EOF)");
343                    return Ok(None);
344                }
345                Ok(_) => {
346                    let trimmed = line.trim();
347                    if trimmed.is_empty() {
348                        continue;
349                    }
350
351                    debug!("[CLIENT] Received: {}", trimmed);
352
353                    match serde_json::from_str::<JsonRpcMessage>(trimmed) {
354                        Ok(msg) => return Ok(Some(msg)),
355                        Err(e) => {
356                            warn!(
357                                "[CLIENT] Failed to deserialize message. \
358                                 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
359                            );
360                            warn!("[CLIENT] Parse error: {}", e);
361                            warn!("[CLIENT] Raw: {}", trimmed);
362                            return Err(Error::Deserialization(format!(
363                                "{} (raw: {})",
364                                e, trimmed
365                            )));
366                        }
367                    }
368                }
369                Err(e) => {
370                    debug!("[CLIENT] Error reading stdout: {}", e);
371                    return Err(Error::Io(e));
372                }
373            }
374        }
375    }
376}
377
378impl Drop for SyncClient {
379    fn drop(&mut self) {
380        if let Err(e) = self.shutdown() {
381            debug!("[CLIENT] Error during shutdown: {}", e);
382        }
383    }
384}
385
386/// Iterator over [`ServerMessage`]s from a [`SyncClient`].
387pub struct EventIterator<'a> {
388    client: &'a mut SyncClient,
389}
390
391impl Iterator for EventIterator<'_> {
392    type Item = Result<ServerMessage>;
393
394    fn next(&mut self) -> Option<Self::Item> {
395        match self.client.next_message() {
396            Ok(Some(msg)) => Some(Ok(msg)),
397            Ok(None) => None,
398            Err(e) => Some(Err(e)),
399        }
400    }
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406
407    #[test]
408    fn test_buffer_size() {
409        assert_eq!(STDOUT_BUFFER_SIZE, 10 * 1024 * 1024);
410    }
411}