use crate::cli::AppServerBuilder;
use crate::error::{Error, Result};
use crate::jsonrpc::{
JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RequestId,
};
use crate::protocol::{
ClientInfo, InitializeParams, InitializeResponse, ServerMessage, ThreadArchiveParams,
ThreadArchiveResponse, ThreadStartParams, ThreadStartResponse, TurnInterruptParams,
TurnInterruptResponse, TurnStartParams, TurnStartResponse,
};
use log::{debug, error, warn};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicI64, Ordering};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::process::{Child, ChildStderr};
const STDOUT_BUFFER_SIZE: usize = 10 * 1024 * 1024;
pub struct AsyncClient {
child: Child,
writer: BufWriter<tokio::process::ChildStdin>,
reader: BufReader<tokio::process::ChildStdout>,
stderr: Option<BufReader<ChildStderr>>,
next_id: AtomicI64,
buffered: VecDeque<ServerMessage>,
}
impl AsyncClient {
pub async fn start() -> Result<Self> {
Self::start_with(AppServerBuilder::new()).await
}
pub async fn start_with(builder: AppServerBuilder) -> Result<Self> {
let mut client = Self::spawn(builder).await?;
client
.initialize(&InitializeParams {
client_info: ClientInfo {
name: "codex-codes".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
title: None,
},
capabilities: None,
})
.await?;
Ok(client)
}
pub async fn spawn(builder: AppServerBuilder) -> Result<Self> {
crate::version::check_codex_version_async().await?;
let mut child = builder.spawn().await?;
let stdin = child
.stdin
.take()
.ok_or_else(|| Error::Protocol("Failed to get stdin".to_string()))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| Error::Protocol("Failed to get stdout".to_string()))?;
let stderr = child.stderr.take().map(BufReader::new);
Ok(Self {
child,
writer: BufWriter::new(stdin),
reader: BufReader::with_capacity(STDOUT_BUFFER_SIZE, stdout),
stderr,
next_id: AtomicI64::new(1),
buffered: VecDeque::new(),
})
}
pub async fn request<P: Serialize, R: DeserializeOwned>(
&mut self,
method: &str,
params: &P,
) -> Result<R> {
let id = RequestId::Integer(self.next_id.fetch_add(1, Ordering::Relaxed));
let req = JsonRpcRequest {
id: id.clone(),
method: method.to_string(),
params: Some(serde_json::to_value(params).map_err(Error::Json)?),
};
self.send_raw(&req).await?;
loop {
let msg = self.read_message().await?;
match msg {
JsonRpcMessage::Response(resp) if resp.id == id => {
let result: R = serde_json::from_value(resp.result).map_err(Error::Json)?;
return Ok(result);
}
JsonRpcMessage::Error(err) if err.id == id => {
return Err(Error::JsonRpc {
code: err.error.code,
message: err.error.message,
});
}
JsonRpcMessage::Notification(notif) => {
self.buffered.push_back(ServerMessage::Notification {
method: notif.method,
params: notif.params,
});
}
JsonRpcMessage::Request(req) => {
self.buffered.push_back(ServerMessage::Request {
id: req.id,
method: req.method,
params: req.params,
});
}
JsonRpcMessage::Response(resp) => {
warn!(
"[CLIENT] Unexpected response for id={}, expected id={}",
resp.id, id
);
}
JsonRpcMessage::Error(err) => {
warn!(
"[CLIENT] Unexpected error for id={}, expected id={}",
err.id, id
);
}
}
}
}
pub async fn thread_start(
&mut self,
params: &ThreadStartParams,
) -> Result<ThreadStartResponse> {
self.request(crate::protocol::methods::THREAD_START, params)
.await
}
pub async fn turn_start(&mut self, params: &TurnStartParams) -> Result<TurnStartResponse> {
self.request(crate::protocol::methods::TURN_START, params)
.await
}
pub async fn turn_interrupt(
&mut self,
params: &TurnInterruptParams,
) -> Result<TurnInterruptResponse> {
self.request(crate::protocol::methods::TURN_INTERRUPT, params)
.await
}
pub async fn thread_archive(
&mut self,
params: &ThreadArchiveParams,
) -> Result<ThreadArchiveResponse> {
self.request(crate::protocol::methods::THREAD_ARCHIVE, params)
.await
}
pub async fn initialize(&mut self, params: &InitializeParams) -> Result<InitializeResponse> {
let resp: InitializeResponse = self
.request(crate::protocol::methods::INITIALIZE, params)
.await?;
self.send_notification(crate::protocol::methods::INITIALIZED)
.await?;
Ok(resp)
}
pub async fn respond<R: Serialize>(&mut self, id: RequestId, result: &R) -> Result<()> {
let resp = JsonRpcResponse {
id,
result: serde_json::to_value(result).map_err(Error::Json)?,
};
self.send_raw(&resp).await
}
pub async fn respond_error(&mut self, id: RequestId, code: i64, message: &str) -> Result<()> {
let err = JsonRpcError {
id,
error: crate::jsonrpc::JsonRpcErrorData {
code,
message: message.to_string(),
data: None,
},
};
self.send_raw(&err).await
}
pub async fn next_message(&mut self) -> Result<Option<ServerMessage>> {
if let Some(msg) = self.buffered.pop_front() {
return Ok(Some(msg));
}
loop {
let msg = match self.read_message_opt().await? {
Some(m) => m,
None => return Ok(None),
};
match msg {
JsonRpcMessage::Notification(notif) => {
return Ok(Some(ServerMessage::Notification {
method: notif.method,
params: notif.params,
}));
}
JsonRpcMessage::Request(req) => {
return Ok(Some(ServerMessage::Request {
id: req.id,
method: req.method,
params: req.params,
}));
}
JsonRpcMessage::Response(resp) => {
warn!(
"[CLIENT] Unexpected response (no pending request): id={}",
resp.id
);
}
JsonRpcMessage::Error(err) => {
warn!(
"[CLIENT] Unexpected error (no pending request): id={} code={}",
err.id, err.error.code
);
}
}
}
}
pub fn events(&mut self) -> EventStream<'_> {
EventStream { client: self }
}
pub fn take_stderr(&mut self) -> Option<BufReader<ChildStderr>> {
self.stderr.take()
}
pub fn pid(&self) -> Option<u32> {
self.child.id()
}
pub fn is_alive(&mut self) -> bool {
self.child.try_wait().ok().flatten().is_none()
}
pub async fn shutdown(mut self) -> Result<()> {
debug!("[CLIENT] Shutting down");
self.child.kill().await.map_err(Error::Io)?;
Ok(())
}
async fn send_notification(&mut self, method: &str) -> Result<()> {
let notif = JsonRpcNotification {
method: method.to_string(),
params: None,
};
self.send_raw(¬if).await
}
async fn send_raw<T: Serialize>(&mut self, msg: &T) -> Result<()> {
let json = serde_json::to_string(msg).map_err(Error::Json)?;
debug!("[CLIENT] Sending: {}", json);
self.writer
.write_all(json.as_bytes())
.await
.map_err(Error::Io)?;
self.writer.write_all(b"\n").await.map_err(Error::Io)?;
self.writer.flush().await.map_err(Error::Io)?;
Ok(())
}
async fn read_message(&mut self) -> Result<JsonRpcMessage> {
self.read_message_opt().await?.ok_or(Error::ServerClosed)
}
async fn read_message_opt(&mut self) -> Result<Option<JsonRpcMessage>> {
let mut line = String::new();
loop {
line.clear();
let bytes_read = self.reader.read_line(&mut line).await.map_err(Error::Io)?;
if bytes_read == 0 {
debug!("[CLIENT] Stream closed (EOF)");
return Ok(None);
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
debug!("[CLIENT] Received: {}", trimmed);
match serde_json::from_str::<JsonRpcMessage>(trimmed) {
Ok(msg) => return Ok(Some(msg)),
Err(e) => {
warn!(
"[CLIENT] Failed to deserialize message. \
Please report this at https://github.com/meawoppl/rust-code-agent-sdks/issues"
);
warn!("[CLIENT] Parse error: {}", e);
warn!("[CLIENT] Raw: {}", trimmed);
return Err(Error::Deserialization(format!("{} (raw: {})", e, trimmed)));
}
}
}
}
}
impl Drop for AsyncClient {
fn drop(&mut self) {
if self.is_alive() {
if let Err(e) = self.child.start_kill() {
error!("Failed to kill app-server process on drop: {}", e);
}
}
}
}
pub struct EventStream<'a> {
client: &'a mut AsyncClient,
}
impl EventStream<'_> {
pub async fn next(&mut self) -> Option<Result<ServerMessage>> {
match self.client.next_message().await {
Ok(Some(msg)) => Some(Ok(msg)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
pub async fn collect(mut self) -> Result<Vec<ServerMessage>> {
let mut msgs = Vec::new();
while let Some(result) = self.next().await {
msgs.push(result?);
}
Ok(msgs)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_buffer_size() {
assert_eq!(STDOUT_BUFFER_SIZE, 10 * 1024 * 1024);
}
}