use anyhow::Result;
use serde_json::{Value, json};
use std::io::{BufRead, BufReader, Write};
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, Stdio};
use std::sync::{Mutex, OnceLock};
#[allow(dead_code)]
pub enum StderrMode {
Null,
Inherit,
Capture,
}
pub struct McpTestClient {
child: Child,
pub stdin: ChildStdin,
pub stdout: BufReader<ChildStdout>,
pub stderr: Option<BufReader<ChildStderr>>,
read_timeout_ms: i32,
}
static GRAPH_BUILD_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
fn workspace_root() -> std::path::PathBuf {
let manifest_dir =
std::env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR must be set by cargo");
std::path::Path::new(&manifest_dir)
.parent()
.expect("sqry-mcp must have a parent directory")
.to_path_buf()
}
fn find_sqry_mcp_binary() -> Option<std::path::PathBuf> {
let binary_name = format!("sqry-mcp{}", std::env::consts::EXE_SUFFIX);
let exe = std::env::current_exe().ok()?;
let parent = exe.parent()?;
let candidate = parent.join(&binary_name);
if candidate.is_file() {
return Some(candidate);
}
let grandparent = parent.parent()?;
let candidate = grandparent.join(&binary_name);
if candidate.is_file() {
return Some(candidate);
}
None
}
impl McpTestClient {
pub fn new() -> Result<Self> {
let mut command = if let Some(binary) = find_sqry_mcp_binary() {
Command::new(binary)
} else {
let mut cmd = Command::new("cargo");
cmd.args(["run", "-p", "sqry-mcp", "--quiet"]);
cmd
};
let fixture_workspace = workspace_root().join("sqry-lsp/tests/fixtures/mini-workspace");
let ws_root = if fixture_workspace.exists() {
fixture_workspace
} else {
workspace_root()
};
command
.env("SQRY_MCP_WORKSPACE_ROOT", ws_root)
.env("SQRY_MCP_TIMEOUT_MS", "30000") .stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::null());
let mut child = command.spawn()?;
let stdin = child.stdin.take().unwrap();
let stdout = BufReader::new(child.stdout.take().unwrap());
Ok(Self {
child,
stdin,
stdout,
stderr: None,
read_timeout_ms: 30_000,
})
}
#[allow(dead_code)]
pub fn new_for_workspace() -> Result<Self> {
let mut client = Self::new_with_env_and_stderr_mode(
&[(
"SQRY_MCP_WORKSPACE_ROOT".to_string(),
workspace_root().to_string_lossy().into_owned(),
)],
StderrMode::Null,
)?;
client.read_timeout_ms = 120_000;
Ok(client)
}
#[allow(dead_code)]
pub fn new_initialized() -> Result<Self> {
let mut client = Self::new()?;
let _ = client.initialize()?;
Ok(client)
}
#[allow(dead_code)]
pub fn new_for_workspace_initialized() -> Result<Self> {
let mut client = Self::new_for_workspace()?;
let _ = client.initialize()?;
Ok(client)
}
#[allow(dead_code)]
pub fn initialize(&mut self) -> Result<Value> {
let response = self.call("initialize", default_initialize_params(), 0)?;
let notification = json!({
"jsonrpc": "2.0",
"method": "notifications/initialized",
"params": {}
});
writeln!(self.stdin, "{notification}")?;
self.stdin.flush()?;
Ok(response)
}
#[allow(dead_code)]
pub fn new_with_env(envs: &[(String, String)]) -> Result<Self> {
Self::new_with_env_and_stderr_mode(envs, StderrMode::Inherit)
}
#[allow(dead_code)]
pub fn new_with_env_initialized(envs: &[(String, String)]) -> Result<Self> {
let mut client = Self::new_with_env(envs)?;
let _ = client.initialize()?;
Ok(client)
}
#[allow(dead_code)]
pub fn new_with_env_and_stderr(
envs: &[(String, String)],
capture_stderr: bool,
) -> Result<Self> {
let mode = if capture_stderr {
StderrMode::Capture
} else {
StderrMode::Null
};
Self::new_with_env_and_stderr_mode(envs, mode)
}
#[allow(dead_code)]
pub fn new_with_env_and_stderr_mode(
envs: &[(String, String)],
stderr_mode: StderrMode,
) -> Result<Self> {
let (program, args): (std::ffi::OsString, &[&str]) =
if let Some(binary) = find_sqry_mcp_binary() {
(binary.into(), &[])
} else {
("cargo".into(), &["run", "-p", "sqry-mcp", "--quiet"])
};
let mut command = Command::new(program);
command
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped());
let has_workspace_override = envs
.iter()
.any(|(k, _)| k == "SQRY_MCP_WORKSPACE_ROOT" || k == "SQRY_WORKSPACE_ROOT");
if !has_workspace_override {
command.env("SQRY_MCP_WORKSPACE_ROOT", workspace_root());
}
let has_timeout_override = envs.iter().any(|(k, _)| k == "SQRY_MCP_TIMEOUT_MS");
if !has_timeout_override {
command.env("SQRY_MCP_TIMEOUT_MS", "600000"); }
let capture_stderr = match stderr_mode {
StderrMode::Null => {
command.stderr(Stdio::null());
false
}
StderrMode::Inherit => {
command.stderr(Stdio::inherit());
false
}
StderrMode::Capture => {
command.stderr(Stdio::piped());
true
}
};
for (key, value) in envs {
command.env(key, value);
}
let mut child = command.spawn()?;
let stdin = child.stdin.take().unwrap();
let stdout = BufReader::new(child.stdout.take().unwrap());
let stderr = if capture_stderr {
child.stderr.take().map(BufReader::new)
} else {
None
};
Ok(Self {
child,
stdin,
stdout,
stderr,
read_timeout_ms: 30_000,
})
}
#[allow(dead_code)]
pub fn read_stderr(&mut self) -> String {
use std::io::Read;
if let Some(ref mut stderr) = self.stderr {
let mut buf = String::new();
let _ = stderr.read_to_string(&mut buf);
buf
} else {
String::new()
}
}
pub fn send_request(&mut self, method: &str, params: Value, id: i64) -> Result<()> {
let request = json!({
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": id
});
writeln!(self.stdin, "{request}")?;
self.stdin.flush()?;
Ok(())
}
pub fn read_response(&mut self) -> Result<Value> {
self.read_response_with_timeout()?;
let mut line = String::new();
self.stdout.read_line(&mut line)?;
Ok(serde_json::from_str(&line)?)
}
#[cfg(unix)]
fn read_response_with_timeout(&mut self) -> Result<()> {
use std::os::unix::io::AsRawFd;
let fd = self.stdout.get_ref().as_raw_fd();
let timeout_ms: i32 = self.read_timeout_ms;
let mut pollfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
let ret = unsafe { libc::poll(&mut pollfd, 1, timeout_ms) };
if ret == 0 {
let _ = self.child.kill();
return Err(anyhow::anyhow!(
"read_response timed out after {} seconds — MCP server may be stuck",
timeout_ms / 1000
));
} else if ret < 0 {
return Err(anyhow::anyhow!(
"poll() failed: {}",
std::io::Error::last_os_error()
));
}
Ok(())
}
#[cfg(not(unix))]
fn read_response_with_timeout(&mut self) -> Result<()> {
Ok(())
}
pub fn call(&mut self, method: &str, params: Value, id: i64) -> Result<Value> {
self.send_request(method, params, id)?;
self.read_response()
}
}
impl Drop for McpTestClient {
fn drop(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
}
}
#[allow(dead_code)]
pub fn unwrap_mcp_content(response: &Value) -> Result<Value> {
let result = response["result"]
.as_object()
.ok_or_else(|| anyhow::anyhow!("Missing result object"))?;
if !result.contains_key("content") {
return Ok(response["result"].clone());
}
let content = result["content"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Missing content array"))?;
let first_content = content
.first()
.ok_or_else(|| anyhow::anyhow!("Empty content array"))?;
let text = first_content["text"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing text in content"))?;
let inner: Value = serde_json::from_str(text)?;
Ok(inner)
}
#[allow(dead_code)]
pub fn ensure_graph_snapshot(root: &std::path::Path) -> Result<()> {
use sqry_core::graph::unified::build::{BuildConfig, build_unified_graph};
use sqry_core::graph::unified::persistence::{GraphStorage, save_to_path};
use sqry_plugin_registry::create_plugin_manager;
let lock = GRAPH_BUILD_LOCK.get_or_init(|| Mutex::new(()));
let _guard = lock.lock().expect("graph build lock");
let storage = GraphStorage::new(root);
if storage.snapshot_exists() {
return Ok(());
}
let plugins = create_plugin_manager();
let config = BuildConfig::default();
let graph = build_unified_graph(root, &plugins, &config)?;
std::fs::create_dir_all(storage.graph_dir())?;
save_to_path(&graph, storage.snapshot_path())?;
Ok(())
}
fn default_initialize_params() -> Value {
json!({
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": { "name": "sqry-mcp-tests", "version": "0.0" }
})
}