use std::io::{BufRead, BufReader, Write};
use std::process::{Child, ChildStdin, Command, Stdio};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use serde_json::Value;
const RPC_TIMEOUT: Duration = Duration::from_secs(30);
fn koda_bin() -> String {
if let Ok(path) = std::env::var("CARGO_BIN_EXE_koda") {
return path;
}
let mut path = std::env::current_exe().unwrap();
path.pop(); path.pop(); path.push("koda");
path.to_string_lossy().to_string()
}
struct ServerHarness {
child: Option<Child>,
stdin: Option<ChildStdin>,
rx: mpsc::Receiver<std::io::Result<String>>,
reader_handle: Option<thread::JoinHandle<()>>,
stderr_path: std::path::PathBuf,
}
impl ServerHarness {
fn spawn(project_dir: &tempfile::TempDir, config_dir: &tempfile::TempDir) -> Self {
let stderr_path = config_dir.path().join("koda-stderr.log");
let stderr_file = std::fs::File::create(&stderr_path).expect("create stderr log");
let mut child = Command::new(koda_bin())
.arg("--project-root")
.arg(project_dir.path())
.args(["server", "--stdio"])
.env("XDG_CONFIG_HOME", config_dir.path())
.env("RUST_LOG", "koda_core=debug,koda_cli=debug")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::from(stderr_file))
.spawn()
.expect("Failed to start koda server");
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let (tx, rx) = mpsc::channel();
let reader_handle = thread::spawn(move || {
let mut buf = BufReader::new(stdout);
loop {
let mut line = String::new();
match buf.read_line(&mut line) {
Ok(0) => break, Ok(_) => {
if tx.send(Ok(line)).is_err() {
break;
}
}
Err(e) => {
let _ = tx.send(Err(e));
break;
}
}
}
});
Self {
child: Some(child),
stdin: Some(stdin),
rx,
reader_handle: Some(reader_handle),
stderr_path,
}
}
fn dump_stderr(&self) -> String {
std::fs::read_to_string(&self.stderr_path).unwrap_or_else(|e| format!("<read err: {e}>"))
}
fn send_and_recv(&mut self, label: &str, msg: &Value) -> Value {
let stdin = self.stdin.as_mut().expect("stdin gone");
let line = serde_json::to_string(msg).unwrap() + "\n";
stdin.write_all(line.as_bytes()).unwrap();
stdin.flush().unwrap();
let response = match self.rx.recv_timeout(RPC_TIMEOUT) {
Ok(Ok(line)) => line,
Ok(Err(e)) => panic!("[{label}] read error from server: {e}"),
Err(mpsc::RecvTimeoutError::Timeout) => {
let status = self
.child
.as_mut()
.and_then(|c| c.try_wait().ok().flatten());
let stderr = self.dump_stderr();
panic!(
"[{label}] timed out after {RPC_TIMEOUT:?} waiting for server \
response (process exited: {status:?}).\nSent: {}\n=== koda stderr ===\n{stderr}",
serde_json::to_string_pretty(msg).unwrap()
);
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
let status = self
.child
.as_mut()
.and_then(|c| c.try_wait().ok().flatten());
let stderr = self.dump_stderr();
panic!(
"[{label}] Server EOF before responding (process exited: \
{status:?}).\nSent: {}\n=== koda stderr ===\n{stderr}",
serde_json::to_string_pretty(msg).unwrap()
);
}
};
if response.trim().is_empty() {
panic!(
"[{label}] Server returned empty response.\nSent: {}",
serde_json::to_string_pretty(msg).unwrap()
);
}
serde_json::from_str(response.trim())
.unwrap_or_else(|e| panic!("[{label}] invalid JSON: {e}\nraw: {response:?}"))
}
fn send_notification(&mut self, msg: &Value) {
let stdin = self.stdin.as_mut().expect("stdin gone");
let line = serde_json::to_string(msg).unwrap() + "\n";
stdin.write_all(line.as_bytes()).unwrap();
stdin.flush().unwrap();
}
fn shutdown(&mut self) {
drop(self.stdin.take());
if let Some(mut child) = self.child.take() {
let _ = child.kill();
let _ = child.wait();
}
if let Some(h) = self.reader_handle.take() {
let _ = h.join();
}
}
}
impl Drop for ServerHarness {
fn drop(&mut self) {
if let Some(child) = self.child.as_mut() {
let _ = child.kill();
let _ = child.wait();
}
}
}
fn initialize_msg(id: u64) -> Value {
serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"method": "initialize",
"params": {
"protocolVersion": "0.1",
"clientCapabilities": {}
}
})
}
fn new_session_msg(id: u64, project_dir: &tempfile::TempDir) -> Value {
serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"method": "session/new",
"params": {
"cwd": project_dir.path().to_string_lossy(),
"mcpServers": []
}
})
}
#[test]
fn test_server_protocol_smoke() {
let project_dir = tempfile::TempDir::new().unwrap();
let config_dir = tempfile::TempDir::new().unwrap();
let mut srv = ServerHarness::spawn(&project_dir, &config_dir);
let resp = srv.send_and_recv("initialize", &initialize_msg(1));
assert_eq!(resp["jsonrpc"], "2.0");
assert_eq!(resp["id"], 1);
assert!(
resp["result"].is_object(),
"initialize: expected result object"
);
let agent_info = &resp["result"]["agentInfo"];
assert_eq!(
agent_info["name"], "koda",
"initialize: agent name should be 'koda'"
);
assert_eq!(
agent_info["version"],
env!("CARGO_PKG_VERSION"),
"initialize: should report compiled version"
);
let resp = srv.send_and_recv("session/new", &new_session_msg(2, &project_dir));
assert_eq!(resp["jsonrpc"], "2.0");
assert_eq!(resp["id"], 2);
assert!(
resp["result"].is_object(),
"session/new: expected result object"
);
let session_id = resp["result"]["sessionId"]
.as_str()
.expect("session/new: sessionId should be a string")
.to_string();
assert!(
!session_id.is_empty(),
"session/new: sessionId should not be empty"
);
srv.send_notification(&serde_json::json!({
"jsonrpc": "2.0",
"method": "session/cancel",
"params": { "sessionId": session_id }
}));
let resp = srv.send_and_recv("post-cancel initialize", &initialize_msg(3));
assert_eq!(resp["id"], 3);
assert!(
resp["result"].is_object(),
"post-cancel initialize: expected result object"
);
srv.shutdown();
}