Skip to main content

codex_codes/
client_sync.rs

1//! Synchronous multi-turn client for the Codex app-server.
2//!
3//! Spawns `codex app-server --listen stdio://` and communicates over
4//! newline-delimited JSON-RPC. The connection stays open for multiple
5//! turns until explicitly shut down.
6//!
7//! This is the blocking counterpart to [`crate::client_async::AsyncClient`].
8//! Prefer the async client for applications that already use tokio.
9//!
10//! # Lifecycle
11//!
12//! 1. Create a client with [`SyncClient::start`] (spawns and initializes the app-server)
13//! 2. Call [`SyncClient::thread_start`] to create a conversation session
14//! 3. Call [`SyncClient::turn_start`] to send user input
15//! 4. Iterate over [`SyncClient::events`] until `turn/completed`
16//! 5. Handle approval requests via [`SyncClient::respond`]
17//! 6. Repeat steps 3-5 for follow-up turns
18//!
19//! # Example
20//!
21//! ```ignore
22//! use codex_codes::{SyncClient, ThreadStartParams, TurnStartParams, UserInput, ServerMessage};
23//!
24//! let mut client = SyncClient::start()?;
25//! let thread = client.thread_start(&ThreadStartParams::default())?;
26//!
27//! client.turn_start(&TurnStartParams {
28//!     thread_id: thread.thread_id().to_string(),
29//!     input: vec![UserInput::Text { text: "Hello!".into() }],
30//!     model: None,
31//!     reasoning_effort: None,
32//!     sandbox_policy: None,
33//! })?;
34//!
35//! for result in client.events() {
36//!     match result? {
37//!         ServerMessage::Notification(n) => {
38//!             if let codex_codes::Notification::TurnCompleted(_) = n { break; }
39//!         }
40//!         _ => {}
41//!     }
42//! }
43//! ```
44
45use crate::cli::AppServerBuilder;
46use crate::error::{Error, ParseError, Result};
47use crate::jsonrpc::{
48    JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::messages::{Notification, ServerMessage, ServerRequest};
51use crate::protocol::{
52    ClientInfo, InitializeParams, InitializeResponse, ThreadArchiveParams, ThreadArchiveResponse,
53    ThreadDeleteParams, ThreadDeleteResponse, ThreadForkParams, ThreadForkResponse,
54    ThreadResumeParams, ThreadResumeResponse, ThreadStartParams, ThreadStartResponse,
55    TurnInterruptParams, TurnInterruptResponse, TurnStartParams, TurnStartResponse,
56};
57use log::{debug, warn};
58use serde::de::DeserializeOwned;
59use serde::Serialize;
60use std::collections::VecDeque;
61use std::io::{BufRead, BufReader, BufWriter, Write};
62use std::process::Child;
63
64/// Buffer size for reading stdout (10MB).
65const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
66
67/// Synchronous multi-turn client for the Codex app-server.
68///
69/// Communicates with a long-lived `codex app-server` process via
70/// newline-delimited JSON-RPC over stdio. Manages request/response
71/// correlation and buffers incoming notifications that arrive while
72/// waiting for RPC responses.
73///
74/// The client automatically kills the app-server process when dropped.
75pub struct SyncClient {
76    child: Child,
77    writer: BufWriter<std::process::ChildStdin>,
78    reader: BufReader<std::process::ChildStdout>,
79    /// Handle to the background thread draining the child's stderr pipe.
80    /// Kept alive for the lifetime of the client; the thread exits on EOF
81    /// when the child is killed.
82    _stderr_drain: std::thread::JoinHandle<()>,
83    next_id: i64,
84    buffered: VecDeque<ServerMessage>,
85}
86
87impl SyncClient {
88    /// Start an app-server with default settings.
89    ///
90    /// Spawns `codex app-server --listen stdio://`, performs the required
91    /// `initialize` handshake, and returns a connected client ready for
92    /// `thread_start()`.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the `codex` CLI is not installed, the version is
97    /// incompatible, the process fails to start, or the initialization
98    /// handshake fails.
99    pub fn start() -> Result<Self> {
100        Self::start_with(AppServerBuilder::new())
101    }
102
103    /// Start an app-server with a custom [`AppServerBuilder`].
104    ///
105    /// Performs the required `initialize` handshake before returning.
106    /// Use this to configure a custom binary path or working directory.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the process fails to start, stdio pipes
111    /// cannot be established, or the initialization handshake fails.
112    pub fn start_with(builder: AppServerBuilder) -> Result<Self> {
113        let mut client = Self::spawn(builder)?;
114        client.initialize(&InitializeParams {
115            client_info: ClientInfo {
116                name: "codex-codes".to_string(),
117                version: env!("CARGO_PKG_VERSION").to_string(),
118                title: None,
119            },
120            capabilities: None,
121        })?;
122        Ok(client)
123    }
124
125    /// Spawn an app-server without performing the `initialize` handshake.
126    ///
127    /// Use this if you need to send a custom [`InitializeParams`] (e.g., with
128    /// specific capabilities). You **must** call [`SyncClient::initialize`]
129    /// before any other requests.
130    pub fn spawn(builder: AppServerBuilder) -> Result<Self> {
131        crate::version::check_codex_version()?;
132
133        let mut child = builder.spawn_sync()?;
134
135        let stdin = child
136            .stdin
137            .take()
138            .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
139        let stdout = child
140            .stdout
141            .take()
142            .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
143        let stderr = child
144            .stderr
145            .take()
146            .ok_or_else(|| Error::Protocol("Failed to get stderr".to_string()))?;
147
148        // The app-server emits ~200 KB/s of tracing to stderr. Without an
149        // active reader, the ~64 KB kernel pipe fills almost instantly and
150        // the child blocks. Drain in the background and route lines through
151        // the `log` crate (see [`crate::stderr_drain`]).
152        let stderr_drain = crate::stderr_drain::spawn_sync(stderr);
153
154        Ok(Self {
155            child,
156            writer: BufWriter::new(stdin),
157            reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
158            _stderr_drain: stderr_drain,
159            next_id: 1,
160            buffered: VecDeque::new(),
161        })
162    }
163
164    /// Send a JSON-RPC request and wait for the matching response.
165    ///
166    /// Any notifications or server requests that arrive before the response
167    /// are buffered and can be retrieved via [`SyncClient::next_message`].
168    ///
169    /// # Errors
170    ///
171    /// - [`Error::JsonRpc`] if the server returns a JSON-RPC error
172    /// - [`Error::ServerClosed`] if the connection drops before a response arrives
173    /// - [`Error::Json`] if response deserialization fails
174    pub fn request<P: Serialize, R: DeserializeOwned>(
175        &mut self,
176        method: &str,
177        params: &P,
178    ) -> Result<R> {
179        let id = RequestId::Integer(self.next_id);
180        self.next_id += 1;
181
182        let req = JsonRpcRequest {
183            id: id.clone(),
184            method: method.to_string(),
185            params: Some(serde_json::to_value(params).map_err(Error::Json)?),
186        };
187
188        self.send_raw(&req)?;
189
190        loop {
191            let msg = self.read_message()?;
192            match msg {
193                JsonRpcMessage::Response(resp) if resp.id == id => {
194                    let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
195                    return Ok(result);
196                }
197                JsonRpcMessage::Error(err) if err.id == id => {
198                    return Err(Error::JsonRpc {
199                        code: err.error.code,
200                        message: err.error.message,
201                    });
202                }
203                JsonRpcMessage::Notification(notif) => {
204                    let typed = Notification::from_envelope(&notif.method, notif.params)
205                        .map_err(Error::Json)?;
206                    self.buffered.push_back(ServerMessage::Notification(typed));
207                }
208                JsonRpcMessage::Request(req) => {
209                    let typed = ServerRequest::from_envelope(&req.method, req.params)
210                        .map_err(Error::Json)?;
211                    self.buffered.push_back(ServerMessage::Request {
212                        id: req.id,
213                        request: typed,
214                    });
215                }
216                JsonRpcMessage::Response(resp) => {
217                    warn!(
218                        "[CLIENT] Unexpected response for id={}, expected id={}",
219                        resp.id, id
220                    );
221                }
222                JsonRpcMessage::Error(err) => {
223                    warn!(
224                        "[CLIENT] Unexpected error for id={}, expected id={}",
225                        err.id, id
226                    );
227                }
228            }
229        }
230    }
231
232    /// Start a new thread (conversation session).
233    ///
234    /// A thread must be created before any turns can be started. The returned
235    /// [`ThreadStartResponse`] contains the `thread_id` needed for subsequent calls.
236    pub fn thread_start(&mut self, params: &ThreadStartParams) -> Result<ThreadStartResponse> {
237        self.request(crate::protocol::methods::THREAD_START, params)
238    }
239
240    /// Resume a previously persisted thread by id.
241    ///
242    /// Replays the thread's history so turns can continue where they left off.
243    pub fn thread_resume(&mut self, params: &ThreadResumeParams) -> Result<ThreadResumeResponse> {
244        self.request(crate::protocol::methods::THREAD_RESUME, params)
245    }
246
247    /// Fork an existing thread into a new independent thread.
248    pub fn thread_fork(&mut self, params: &ThreadForkParams) -> Result<ThreadForkResponse> {
249        self.request(crate::protocol::methods::THREAD_FORK, params)
250    }
251
252    /// Start a new turn within a thread.
253    ///
254    /// Sends user input to the agent. After calling this, use [`SyncClient::events`]
255    /// or [`SyncClient::next_message`] to consume notifications until `turn/completed`.
256    pub fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
257        self.request(crate::protocol::methods::TURN_START, params)
258    }
259
260    /// Interrupt an active turn.
261    pub fn turn_interrupt(
262        &mut self,
263        params: &TurnInterruptParams,
264    ) -> Result<TurnInterruptResponse> {
265        self.request(crate::protocol::methods::TURN_INTERRUPT, params)
266    }
267
268    /// Archive a thread.
269    pub fn thread_archive(
270        &mut self,
271        params: &ThreadArchiveParams,
272    ) -> Result<ThreadArchiveResponse> {
273        self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
274    }
275
276    /// Delete a thread.
277    pub fn thread_delete(&mut self, params: &ThreadDeleteParams) -> Result<ThreadDeleteResponse> {
278        self.request(crate::protocol::methods::THREAD_DELETE, params)
279    }
280
281    /// Perform the `initialize` handshake with the app-server.
282    ///
283    /// Sends `initialize` with the given params and then sends the
284    /// `initialized` notification. This must be the first request after
285    /// spawning the process.
286    pub fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
287        let resp: InitializeResponse =
288            self.request(crate::protocol::methods::INITIALIZE, params)?;
289        self.send_notification(crate::protocol::methods::INITIALIZED)?;
290        Ok(resp)
291    }
292
293    /// Respond to a server-to-client request (e.g., approval flow).
294    ///
295    /// When the server sends a [`ServerMessage::Request`], it expects a response.
296    /// Use this method with the request's `id` and a result payload. For command
297    /// approval, pass a [`CommandExecutionApprovalResponse`](crate::CommandExecutionApprovalResponse).
298    /// For file change approval, pass a [`FileChangeApprovalResponse`](crate::FileChangeApprovalResponse).
299    pub fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
300        let resp = JsonRpcResponse {
301            id,
302            result: serde_json::to_value(result).map_err(Error::Json)?,
303        };
304        self.send_raw(&resp)
305    }
306
307    /// Respond to a server-to-client request with an error.
308    pub fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
309        let err = JsonRpcError {
310            id,
311            error: crate::jsonrpc::JsonRpcErrorData {
312                code,
313                message: message.to_string(),
314                data: None,
315            },
316        };
317        self.send_raw(&err)
318    }
319
320    /// Read the next incoming server message (notification or server request).
321    ///
322    /// Returns buffered messages first (from notifications that arrived during
323    /// a [`SyncClient::request`] call), then reads from the wire.
324    ///
325    /// Returns `Ok(None)` when the app-server closes the connection (EOF).
326    pub fn next_message(&mut self) -> Result<Option<ServerMessage>> {
327        if let Some(msg) = self.buffered.pop_front() {
328            return Ok(Some(msg));
329        }
330
331        loop {
332            let msg = match self.read_message_opt()? {
333                Some(m) => m,
334                None => return Ok(None),
335            };
336
337            match msg {
338                JsonRpcMessage::Notification(notif) => {
339                    let JsonRpcNotification { method, params } = notif;
340                    let typed =
341                        Notification::from_envelope(&method, params.clone()).map_err(|e| {
342                            Error::Deserialization(ParseError::from_envelope(method, params, e))
343                        })?;
344                    return Ok(Some(ServerMessage::Notification(typed)));
345                }
346                JsonRpcMessage::Request(req) => {
347                    let JsonRpcRequest { id, method, params } = req;
348                    let typed =
349                        ServerRequest::from_envelope(&method, params.clone()).map_err(|e| {
350                            Error::Deserialization(ParseError::from_envelope(method, params, e))
351                        })?;
352                    return Ok(Some(ServerMessage::Request { id, request: typed }));
353                }
354                JsonRpcMessage::Response(resp) => {
355                    warn!(
356                        "[CLIENT] Unexpected response (no pending request): id={}",
357                        resp.id
358                    );
359                }
360                JsonRpcMessage::Error(err) => {
361                    warn!(
362                        "[CLIENT] Unexpected error (no pending request): id={} code={}",
363                        err.id, err.error.code
364                    );
365                }
366            }
367        }
368    }
369
370    /// Return an iterator over [`ServerMessage`]s.
371    ///
372    /// The iterator yields `Result<ServerMessage>` and terminates when the
373    /// connection closes (EOF). This is the idiomatic way to consume a turn's
374    /// notifications in synchronous code.
375    pub fn events(&mut self) -> EventIterator<'_> {
376        EventIterator { client: self }
377    }
378
379    /// Shut down the child process.
380    ///
381    /// Kills the process if it's still running. Called automatically on [`Drop`].
382    pub fn shutdown(&mut self) -> Result<()> {
383        debug!("[CLIENT] Shutting down");
384        match self.child.try_wait() {
385            Ok(Some(_)) => Ok(()),
386            Ok(None) => {
387                self.child.kill().map_err(Error::Io)?;
388                self.child.wait().map_err(Error::Io)?;
389                Ok(())
390            }
391            Err(e) => Err(Error::Io(e)),
392        }
393    }
394
395    // -- internal --
396
397    fn send_notification(&mut self, method: &str) -> Result<()> {
398        let notif = JsonRpcNotification {
399            method: method.to_string(),
400            params: None,
401        };
402        self.send_raw(&notif)
403    }
404
405    fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
406        let json = serde_json::to_string(msg).map_err(Error::Json)?;
407        debug!("[CLIENT] Sending: {}", json);
408        self.writer.write_all(json.as_bytes()).map_err(Error::Io)?;
409        self.writer.write_all(b"\n").map_err(Error::Io)?;
410        self.writer.flush().map_err(Error::Io)?;
411        Ok(())
412    }
413
414    fn read_message(&mut self) -> Result<JsonRpcMessage> {
415        self.read_message_opt()?.ok_or(Error::ServerClosed)
416    }
417
418    fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
419        loop {
420            let mut line = String::new();
421            match self.reader.read_line(&mut line) {
422                Ok(0) => {
423                    debug!("[CLIENT] Stream closed (EOF)");
424                    return Ok(None);
425                }
426                Ok(_) => {
427                    let trimmed = line.trim();
428                    if trimmed.is_empty() {
429                        continue;
430                    }
431
432                    debug!("[CLIENT] Received: {}", trimmed);
433
434                    match serde_json::from_str::<JsonRpcMessage>(trimmed) {
435                        Ok(msg) => return Ok(Some(msg)),
436                        Err(e) => {
437                            warn!(
438                                "[CLIENT] Failed to deserialize message. \
439                                 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
440                            );
441                            warn!("[CLIENT] Parse error: {}", e);
442                            warn!("[CLIENT] Raw: {}", trimmed);
443                            return Err(Error::Deserialization(ParseError::from_line(trimmed, e)));
444                        }
445                    }
446                }
447                Err(e) => {
448                    debug!("[CLIENT] Error reading stdout: {}", e);
449                    return Err(Error::Io(e));
450                }
451            }
452        }
453    }
454}
455
456impl Drop for SyncClient {
457    fn drop(&mut self) {
458        if let Err(e) = self.shutdown() {
459            debug!("[CLIENT] Error during shutdown: {}", e);
460        }
461    }
462}
463
464/// Iterator over [`ServerMessage`]s from a [`SyncClient`].
465pub struct EventIterator<'a> {
466    client: &'a mut SyncClient,
467}
468
469impl Iterator for EventIterator<'_> {
470    type Item = Result<ServerMessage>;
471
472    fn next(&mut self) -> Option<Self::Item> {
473        match self.client.next_message() {
474            Ok(Some(msg)) => Some(Ok(msg)),
475            Ok(None) => None,
476            Err(e) => Some(Err(e)),
477        }
478    }
479}
480
481#[cfg(test)]
482mod tests {
483    use super::*;
484
485    #[test]
486    fn test_buffer_size() {
487        assert_eq!(STDOUT_BUFFER_SIZE, 10 * 1024 * 1024);
488    }
489}