1use crate::cli::AppServerBuilder;
46use crate::error::{Error, ParseError, Result};
47use crate::jsonrpc::{
48 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
49};
50use crate::messages::{Notification, ServerMessage, ServerRequest};
51use crate::protocol::{
52 ClientInfo, InitializeParams, InitializeResponse, ThreadArchiveParams, ThreadArchiveResponse,
53 ThreadDeleteParams, ThreadDeleteResponse, ThreadForkParams, ThreadForkResponse,
54 ThreadResumeParams, ThreadResumeResponse, ThreadStartParams, ThreadStartResponse,
55 TurnInterruptParams, TurnInterruptResponse, TurnStartParams, TurnStartResponse,
56};
57use log::{debug, warn};
58use serde::de::DeserializeOwned;
59use serde::Serialize;
60use std::collections::VecDeque;
61use std::io::{BufRead, BufReader, BufWriter, Write};
62use std::process::Child;
63
/// Capacity, in bytes, handed to the `BufReader` that wraps the child's stdout.
///
/// `10 << 20` is 10 * 2^20 bytes, i.e. 10 MiB.
const STDOUT_BUFFER_SIZE: usize = 10 << 20;
66
67pub struct SyncClient {
76 child: Child,
77 writer: BufWriter<std::process::ChildStdin>,
78 reader: BufReader<std::process::ChildStdout>,
79 _stderr_drain: std::thread::JoinHandle<()>,
83 next_id: i64,
84 buffered: VecDeque<ServerMessage>,
85}
86
87impl SyncClient {
88 pub fn start() -> Result<Self> {
100 Self::start_with(AppServerBuilder::new())
101 }
102
103 pub fn start_with(builder: AppServerBuilder) -> Result<Self> {
113 let mut client = Self::spawn(builder)?;
114 client.initialize(&InitializeParams {
115 client_info: ClientInfo {
116 name: "codex-codes".to_string(),
117 version: env!("CARGO_PKG_VERSION").to_string(),
118 title: None,
119 },
120 capabilities: None,
121 })?;
122 Ok(client)
123 }
124
125 pub fn spawn(builder: AppServerBuilder) -> Result<Self> {
131 crate::version::check_codex_version()?;
132
133 let mut child = builder.spawn_sync()?;
134
135 let stdin = child
136 .stdin
137 .take()
138 .ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
139 let stdout = child
140 .stdout
141 .take()
142 .ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
143 let stderr = child
144 .stderr
145 .take()
146 .ok_or_else(|| Error::Protocol("Failed to get stderr".to_string()))?;
147
148 let stderr_drain = crate::stderr_drain::spawn_sync(stderr);
153
154 Ok(Self {
155 child,
156 writer: BufWriter::new(stdin),
157 reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
158 _stderr_drain: stderr_drain,
159 next_id: 1,
160 buffered: VecDeque::new(),
161 })
162 }
163
164 pub fn request<P: Serialize, R: DeserializeOwned>(
175 &mut self,
176 method: &str,
177 params: &P,
178 ) -> Result<R> {
179 let id = RequestId::Integer(self.next_id);
180 self.next_id += 1;
181
182 let req = JsonRpcRequest {
183 id: id.clone(),
184 method: method.to_string(),
185 params: Some(serde_json::to_value(params).map_err(Error::Json)?),
186 };
187
188 self.send_raw(&req)?;
189
190 loop {
191 let msg = self.read_message()?;
192 match msg {
193 JsonRpcMessage::Response(resp) if resp.id == id => {
194 let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
195 return Ok(result);
196 }
197 JsonRpcMessage::Error(err) if err.id == id => {
198 return Err(Error::JsonRpc {
199 code: err.error.code,
200 message: err.error.message,
201 });
202 }
203 JsonRpcMessage::Notification(notif) => {
204 let typed = Notification::from_envelope(¬if.method, notif.params)
205 .map_err(Error::Json)?;
206 self.buffered.push_back(ServerMessage::Notification(typed));
207 }
208 JsonRpcMessage::Request(req) => {
209 let typed = ServerRequest::from_envelope(&req.method, req.params)
210 .map_err(Error::Json)?;
211 self.buffered.push_back(ServerMessage::Request {
212 id: req.id,
213 request: typed,
214 });
215 }
216 JsonRpcMessage::Response(resp) => {
217 warn!(
218 "[CLIENT] Unexpected response for id={}, expected id={}",
219 resp.id, id
220 );
221 }
222 JsonRpcMessage::Error(err) => {
223 warn!(
224 "[CLIENT] Unexpected error for id={}, expected id={}",
225 err.id, id
226 );
227 }
228 }
229 }
230 }
231
232 pub fn thread_start(&mut self, params: &ThreadStartParams) -> Result<ThreadStartResponse> {
237 self.request(crate::protocol::methods::THREAD_START, params)
238 }
239
240 pub fn thread_resume(&mut self, params: &ThreadResumeParams) -> Result<ThreadResumeResponse> {
244 self.request(crate::protocol::methods::THREAD_RESUME, params)
245 }
246
247 pub fn thread_fork(&mut self, params: &ThreadForkParams) -> Result<ThreadForkResponse> {
249 self.request(crate::protocol::methods::THREAD_FORK, params)
250 }
251
252 pub fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
257 self.request(crate::protocol::methods::TURN_START, params)
258 }
259
260 pub fn turn_interrupt(
262 &mut self,
263 params: &TurnInterruptParams,
264 ) -> Result<TurnInterruptResponse> {
265 self.request(crate::protocol::methods::TURN_INTERRUPT, params)
266 }
267
268 pub fn thread_archive(
270 &mut self,
271 params: &ThreadArchiveParams,
272 ) -> Result<ThreadArchiveResponse> {
273 self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
274 }
275
276 pub fn thread_delete(&mut self, params: &ThreadDeleteParams) -> Result<ThreadDeleteResponse> {
278 self.request(crate::protocol::methods::THREAD_DELETE, params)
279 }
280
281 pub fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
287 let resp: InitializeResponse =
288 self.request(crate::protocol::methods::INITIALIZE, params)?;
289 self.send_notification(crate::protocol::methods::INITIALIZED)?;
290 Ok(resp)
291 }
292
293 pub fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
300 let resp = JsonRpcResponse {
301 id,
302 result: serde_json::to_value(result).map_err(Error::Json)?,
303 };
304 self.send_raw(&resp)
305 }
306
307 pub fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
309 let err = JsonRpcError {
310 id,
311 error: crate::jsonrpc::JsonRpcErrorData {
312 code,
313 message: message.to_string(),
314 data: None,
315 },
316 };
317 self.send_raw(&err)
318 }
319
320 pub fn next_message(&mut self) -> Result<Option<ServerMessage>> {
327 if let Some(msg) = self.buffered.pop_front() {
328 return Ok(Some(msg));
329 }
330
331 loop {
332 let msg = match self.read_message_opt()? {
333 Some(m) => m,
334 None => return Ok(None),
335 };
336
337 match msg {
338 JsonRpcMessage::Notification(notif) => {
339 let JsonRpcNotification { method, params } = notif;
340 let typed =
341 Notification::from_envelope(&method, params.clone()).map_err(|e| {
342 Error::Deserialization(ParseError::from_envelope(method, params, e))
343 })?;
344 return Ok(Some(ServerMessage::Notification(typed)));
345 }
346 JsonRpcMessage::Request(req) => {
347 let JsonRpcRequest { id, method, params } = req;
348 let typed =
349 ServerRequest::from_envelope(&method, params.clone()).map_err(|e| {
350 Error::Deserialization(ParseError::from_envelope(method, params, e))
351 })?;
352 return Ok(Some(ServerMessage::Request { id, request: typed }));
353 }
354 JsonRpcMessage::Response(resp) => {
355 warn!(
356 "[CLIENT] Unexpected response (no pending request): id={}",
357 resp.id
358 );
359 }
360 JsonRpcMessage::Error(err) => {
361 warn!(
362 "[CLIENT] Unexpected error (no pending request): id={} code={}",
363 err.id, err.error.code
364 );
365 }
366 }
367 }
368 }
369
370 pub fn events(&mut self) -> EventIterator<'_> {
376 EventIterator { client: self }
377 }
378
379 pub fn shutdown(&mut self) -> Result<()> {
383 debug!("[CLIENT] Shutting down");
384 match self.child.try_wait() {
385 Ok(Some(_)) => Ok(()),
386 Ok(None) => {
387 self.child.kill().map_err(Error::Io)?;
388 self.child.wait().map_err(Error::Io)?;
389 Ok(())
390 }
391 Err(e) => Err(Error::Io(e)),
392 }
393 }
394
395 fn send_notification(&mut self, method: &str) -> Result<()> {
398 let notif = JsonRpcNotification {
399 method: method.to_string(),
400 params: None,
401 };
402 self.send_raw(¬if)
403 }
404
405 fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
406 let json = serde_json::to_string(msg).map_err(Error::Json)?;
407 debug!("[CLIENT] Sending: {}", json);
408 self.writer.write_all(json.as_bytes()).map_err(Error::Io)?;
409 self.writer.write_all(b"\n").map_err(Error::Io)?;
410 self.writer.flush().map_err(Error::Io)?;
411 Ok(())
412 }
413
414 fn read_message(&mut self) -> Result<JsonRpcMessage> {
415 self.read_message_opt()?.ok_or(Error::ServerClosed)
416 }
417
418 fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
419 loop {
420 let mut line = String::new();
421 match self.reader.read_line(&mut line) {
422 Ok(0) => {
423 debug!("[CLIENT] Stream closed (EOF)");
424 return Ok(None);
425 }
426 Ok(_) => {
427 let trimmed = line.trim();
428 if trimmed.is_empty() {
429 continue;
430 }
431
432 debug!("[CLIENT] Received: {}", trimmed);
433
434 match serde_json::from_str::<JsonRpcMessage>(trimmed) {
435 Ok(msg) => return Ok(Some(msg)),
436 Err(e) => {
437 warn!(
438 "[CLIENT] Failed to deserialize message. \
439 Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
440 );
441 warn!("[CLIENT] Parse error: {}", e);
442 warn!("[CLIENT] Raw: {}", trimmed);
443 return Err(Error::Deserialization(ParseError::from_line(trimmed, e)));
444 }
445 }
446 }
447 Err(e) => {
448 debug!("[CLIENT] Error reading stdout: {}", e);
449 return Err(Error::Io(e));
450 }
451 }
452 }
453 }
454}
455
456impl Drop for SyncClient {
457 fn drop(&mut self) {
458 if let Err(e) = self.shutdown() {
459 debug!("[CLIENT] Error during shutdown: {}", e);
460 }
461 }
462}
463
464pub struct EventIterator<'a> {
466 client: &'a mut SyncClient,
467}
468
469impl Iterator for EventIterator<'_> {
470 type Item = Result<ServerMessage>;
471
472 fn next(&mut self) -> Option<Self::Item> {
473 match self.client.next_message() {
474 Ok(Some(msg)) => Some(Ok(msg)),
475 Ok(None) => None,
476 Err(e) => Some(Err(e)),
477 }
478 }
479}
480
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the stdout reader capacity at exactly 10 MiB.
    #[test]
    fn test_buffer_size() {
        const MIB: usize = 1024 * 1024;
        assert_eq!(STDOUT_BUFFER_SIZE, 10 * MIB);
    }
}