use assert_cmd::cargo::CommandCargoExt;
use std::{
io::{BufRead, BufReader, Read, Write},
process::{Command, Stdio},
time::{Duration, Instant},
};
/// Environment variables set on every `grex serve` child spawned by these tests.
///
/// Each entry is a `(name, value)` pair. The only entry sets `RUST_LOG`
/// (presumably consumed by the binary's tracing filter — confirm against the
/// `grex` logging setup).
const SPAWN_ENV: &[(&str, &str)] = &[("RUST_LOG", "grex=info,rmcp=warn")];
/// Turns one message into a newline-terminated frame.
///
/// Returns `msg` followed by a single `'\n'`; the message itself is not
/// inspected or escaped.
fn frame(msg: &str) -> String {
    format!("{msg}\n")
}
/// Launches `grex serve` as a child process with all three stdio streams piped.
///
/// The binary is resolved through `Command::cargo_bin("grex")`, and every
/// entry of `SPAWN_ENV` is applied to the child's environment.
///
/// # Panics
///
/// Panics if the binary cannot be resolved or the process fails to spawn.
fn spawn_serve() -> std::process::Child {
    let mut server = Command::cargo_bin("grex").expect("grex binary builds");
    server
        .arg("serve")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    SPAWN_ENV.iter().for_each(|(key, value)| {
        server.env(key, value);
    });
    server.spawn().expect("spawn grex serve")
}
/// Reads lines from `rx` until one of them contains `needle`.
///
/// On success, returns every line read so far (including the matching one),
/// concatenated with their original line endings.
///
/// # Errors
///
/// Returns a descriptive `String` when:
/// - the stream reaches EOF before a matching line is seen,
/// - `read_line` reports an error, or
/// - `deadline` has passed when the next read is about to start.
///
/// The deadline is only checked between lines: a `read_line` call that
/// blocks is not interrupted by it.
fn read_until_contains(
    rx: &mut dyn BufRead,
    needle: &str,
    deadline: Instant,
) -> Result<String, String> {
    let mut seen = String::new();
    // One scratch buffer, cleared before each read, holds the current line.
    let mut current = String::new();
    loop {
        if Instant::now() >= deadline {
            return Err(format!("timeout waiting for `{needle}`; got: {seen}"));
        }
        current.clear();
        let bytes_read = rx
            .read_line(&mut current)
            .map_err(|e| format!("read err {e}; partial: {seen}"))?;
        if bytes_read == 0 {
            return Err(format!("EOF before `{needle}`; got: {seen}"));
        }
        seen.push_str(&current);
        if current.contains(needle) {
            return Ok(seen);
        }
    }
}
/// Sends a JSON-RPC `initialize` request (id 1) to the child over `stdin`.
///
/// `client_name` is interpolated verbatim into the `clientInfo.name` field;
/// it is not JSON-escaped, so it must not contain quotes or backslashes.
/// The request is written as a single newline-terminated frame and flushed.
///
/// # Panics
///
/// Panics if the write or the flush fails.
fn drive_init(stdin: &mut std::process::ChildStdin, client_name: &str) {
    let request = format!(
        r#"{{"jsonrpc":"2.0","id":1,"method":"initialize","params":{{"protocolVersion":"2025-06-18","capabilities":{{}},"clientInfo":{{"name":"{client_name}","version":"0.0.1"}}}}}}"#,
    );
    let wire = frame(&request).into_bytes();
    stdin.write_all(&wire).expect("write init");
    stdin.flush().expect("flush init");
}
/// Sends the JSON-RPC `notifications/initialized` notification to the child.
///
/// The notification is written as a single newline-terminated frame and then
/// flushed, mirroring `drive_init`, so the helper does not depend on a later
/// caller-side flush to get the bytes out.
///
/// # Panics
///
/// Panics if the write or the flush fails.
fn drive_initialized(stdin: &mut std::process::ChildStdin) {
    let initialized = r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#;
    stdin.write_all(frame(initialized).as_bytes()).expect("write initialized");
    stdin.flush().expect("flush initialized");
}
/// Polls `child` until it exits or `timeout` elapses.
///
/// The child is polled with `try_wait` every 20 ms. If it has not exited by
/// the deadline it is killed and reaped, and the function panics.
///
/// # Panics
///
/// Panics if `try_wait` fails, or if the child is still running once
/// `timeout` has elapsed; `label` is included in the timeout message.
fn wait_for_exit(child: &mut std::process::Child, timeout: Duration, label: &str) {
    let deadline = Instant::now() + timeout;
    loop {
        match child.try_wait().expect("try_wait") {
            Some(_) => return,
            None if Instant::now() < deadline => {
                std::thread::sleep(Duration::from_millis(20));
            }
            None => {
                // Best-effort cleanup: kill, then wait so the process is
                // actually reaped instead of lingering unreaped for the rest
                // of the test run. Errors are ignored because we are about to
                // panic with the real failure anyway.
                let _ = child.kill();
                let _ = child.wait();
                panic!("child did not exit within {timeout:?}: {label}");
            }
        }
    }
}
/// End-to-end smoke test: initialize a `grex serve` child over stdio, request
/// `tools/list`, and check that the response lists exactly 11 tools.
///
/// Both responses must arrive before one shared 10-second deadline, and the
/// child must exit within 2 seconds of its stdin being closed.
#[test]
fn grex_serve_subprocess_responds_to_tools_list() {
    // Substrings identifying the response lines for request ids 1 and 2.
    const INIT_MARKER: &str = "\"id\":1";
    const LIST_MARKER: &str = "\"id\":2";

    let mut server = spawn_serve();
    let mut to_server = server.stdin.take().expect("stdin pipe");
    let server_stdout = server.stdout.take().expect("stdout pipe");
    let mut from_server = BufReader::new(server_stdout);

    // Handshake: send `initialize` and require a parseable id=1 response.
    drive_init(&mut to_server, "serve-smoke");
    let deadline = Instant::now() + Duration::from_secs(10);
    let init_output = read_until_contains(&mut from_server, INIT_MARKER, deadline)
        .expect("initialize response within 10s");
    let init_json = init_output
        .lines()
        .find(|candidate| candidate.contains(INIT_MARKER))
        .expect("init id=1 line present");
    serde_json::from_str::<serde_json::Value>(init_json).expect("init response is valid JSON");

    // Complete the handshake, then ask for the tool list.
    drive_initialized(&mut to_server);
    let request = frame(r#"{"jsonrpc":"2.0","id":2,"method":"tools/list"}"#);
    to_server.write_all(request.as_bytes()).expect("write tools/list");
    to_server.flush().expect("flush stdin");

    let list_output = read_until_contains(&mut from_server, LIST_MARKER, deadline)
        .expect("tools/list response within 10s");
    let list_json = list_output
        .lines()
        .find(|candidate| candidate.contains(LIST_MARKER))
        .expect("tools/list id=2 line present");
    let response: serde_json::Value =
        serde_json::from_str(list_json).expect("tools/list response is valid JSON");
    let tools = response
        .pointer("/result/tools")
        .and_then(serde_json::Value::as_array)
        .expect("result.tools is an array");
    assert_eq!(
        tools.len(),
        11,
        "expected 11 tools, got {}: {:?}",
        tools.len(),
        tools.iter().map(|tool| tool.get("name")).collect::<Vec<_>>(),
    );

    // Closing stdin is the shutdown signal; the child must then exit.
    drop(to_server);
    wait_for_exit(&mut server, Duration::from_millis(2000), "after stdin close");
}
/// Verifies that `grex serve` exits successfully shortly after stdin closes.
///
/// The child is initialized, its stdin is dropped, and it must then exit
/// with a success status within 500 ms.
#[test]
fn grex_serve_shutdown_exits_cleanly() {
    let mut child = spawn_serve();
    let mut stdin = child.stdin.take().expect("stdin pipe");
    let stdout = child.stdout.take().expect("stdout pipe");
    let mut reader = BufReader::new(stdout);
    drive_init(&mut stdin, "serve-smoke-shutdown");
    let deadline = Instant::now() + Duration::from_secs(10);
    // Only wait for the init response; its content is not inspected here.
    let _ = read_until_contains(&mut reader, "\"id\":1", deadline)
        .expect("initialize response within 10s");
    // Closing stdin is the shutdown signal under test.
    drop(stdin);
    let exit_deadline = Instant::now() + Duration::from_millis(500);
    let status = loop {
        match child.try_wait().expect("try_wait") {
            Some(s) => break s,
            None if Instant::now() < exit_deadline => {
                std::thread::sleep(Duration::from_millis(10));
            }
            None => {
                // Kill and reap so a hung server does not linger as an
                // unreaped process for the rest of the test run; errors are
                // ignored because the panic below reports the real failure.
                let _ = child.kill();
                let _ = child.wait();
                panic!("grex serve did not exit within 500 ms of stdin close");
            }
        }
    };
    assert!(status.success(), "grex serve exited non-zero on clean shutdown: {status:?}",);
}
/// Verifies the stdout/stderr split of `grex serve`: every non-empty stdout
/// line must parse as JSON, and stderr must contain at least one tracing
/// level marker (`INFO`, `WARN`, or `DEBUG`).
#[test]
fn grex_serve_stderr_carries_tracing() {
    let mut child = spawn_serve();
    let mut stdin = child.stdin.take().expect("stdin pipe");
    let stdout = child.stdout.take().expect("stdout pipe");
    let mut stderr = child.stderr.take().expect("stderr pipe");

    // Drain stderr on its own thread while stdout is being read. Reading the
    // two pipes strictly one after the other can deadlock: if the child fills
    // the stderr pipe it blocks on the write, never closes stdout, and the
    // stdout read below would wait forever.
    let stderr_drain = std::thread::spawn(move || {
        let mut buf = String::new();
        stderr.read_to_string(&mut buf).map(|_| buf)
    });

    let mut stdout_reader = BufReader::new(stdout);
    drive_init(&mut stdin, "serve-smoke-tracing");
    drive_initialized(&mut stdin);
    let deadline = Instant::now() + Duration::from_secs(10);
    let stdout_acc =
        read_until_contains(&mut stdout_reader, "\"id\":1", deadline).expect("init response");

    // Closing stdin asks the server to shut down; then collect whatever else
    // it wrote to stdout. A read failure (for example non-UTF-8 output) must
    // fail the test rather than silently shrink the JSON-only check below.
    drop(stdin);
    let mut stdout_tail = String::new();
    stdout_reader.read_to_string(&mut stdout_tail).expect("read stdout tail");
    let full_stdout = format!("{stdout_acc}{stdout_tail}");
    let _ = child.wait().expect("child waits");
    let stderr_buf = stderr_drain
        .join()
        .expect("stderr drain thread panicked")
        .expect("read stderr");

    // stdout is the protocol channel: every non-blank line must be JSON.
    for (i, line) in full_stdout.lines().enumerate() {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        serde_json::from_str::<serde_json::Value>(trimmed).unwrap_or_else(|e| {
            panic!(
                "stdout line {i} is not valid JSON ({e}): {trimmed}\nfull stdout:\n{full_stdout}"
            )
        });
    }

    // stderr is the logging channel: expect at least one level marker.
    let has_tracing =
        stderr_buf.contains("INFO") || stderr_buf.contains("WARN") || stderr_buf.contains("DEBUG");
    assert!(
        has_tracing,
        "stderr lacks any tracing level marker (INFO/WARN/DEBUG); got:\n{stderr_buf}",
    );
}