use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use futures_util::Stream;
use tokio_util::bytes::Bytes;
use tokio::process::{Child, Command};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use std::process::Stdio;
use std::sync::Arc;
use tokio::sync::Mutex;
use log::{debug, info};
use tokio::time::{sleep, timeout, Duration};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ToolDescriptor {
pub name: String,
pub description: Option<String>,
pub parameters: Option<Value>,
}
#[derive(Debug, Serialize)]
struct JsonRpcRequest {
jsonrpc: String,
id: u64,
method: String,
params: Option<Value>,
}
#[derive(Debug, Deserialize)]
struct JsonRpcResponse {
#[allow(dead_code)]
jsonrpc: String,
#[allow(dead_code)]
id: u64,
result: Option<Value>,
error: Option<JsonRpcError>,
}
#[derive(Debug, Deserialize)]
struct JsonRpcError {
code: i32,
message: String,
#[allow(dead_code)]
data: Option<Value>,
}
#[derive(Debug)]
pub struct StdioMCPClient {
child: Arc<Mutex<Child>>,
request_id: Arc<Mutex<u64>>,
}
impl StdioMCPClient {
pub async fn new(command: &str, args: &[String], env: &std::collections::HashMap<String, String>) -> Result<Self> {
info!("Starting stdio MCP server: {} {:?}", command, args);
let mut cmd = Command::new(command);
cmd.args(args);
for (key, val) in env {
let resolved = if val.starts_with("${") && val.ends_with('}') {
std::env::var(&val[2..val.len() - 1]).unwrap_or_default()
} else {
val.clone()
};
cmd.env(key, resolved);
}
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit());
let child = cmd.spawn().context("failed to start stdio MCP server")?;
let client = Self {
child: Arc::new(Mutex::new(child)),
request_id: Arc::new(Mutex::new(1)),
};
info!("Waiting for stdio MCP server to initialize...");
sleep(Duration::from_millis(500)).await;
match timeout(Duration::from_secs(5), client.initialize()).await {
Ok(Ok(_)) => {
info!("Stdio MCP server initialized successfully");
Ok(client)
}
Ok(Err(e)) => {
info!("Failed to initialize stdio MCP server: {}", e);
let mut child = client.child.lock().await;
let _ = child.kill().await;
Err(e)
}
Err(_) => {
info!("Timeout initializing stdio MCP server");
let mut child = client.child.lock().await;
let _ = child.kill().await;
Err(anyhow::anyhow!("Timeout waiting for MCP server to initialize"))
}
}
}
async fn initialize(&self) -> Result<()> {
let params = serde_json::json!({
"protocolVersion": "2024-11-05",
"capabilities": {
"roots": {
"listChanged": false
}
},
"clientInfo": {
"name": "cli_engineer",
"version": "1.3.0"
}
});
let _ = self.send_request("initialize", Some(params)).await?;
Ok(())
}
async fn send_request(&self, method: &str, params: Option<Value>) -> Result<Value> {
let id = {
let mut request_id = self.request_id.lock().await;
let id = *request_id;
*request_id += 1;
id
};
let request = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id,
method: method.to_string(),
params,
};
let mut child = self.child.lock().await;
let request_json = serde_json::to_string(&request)?;
debug!("Sending MCP request: {}", request_json);
if let Some(stdin) = child.stdin.as_mut() {
stdin.write_all(request_json.as_bytes()).await?;
stdin.write_all(b"\n").await?;
stdin.flush().await?;
} else {
return Err(anyhow::anyhow!("child stdin unavailable"));
}
let mut response_line = String::new();
if let Some(stdout) = child.stdout.as_mut() {
let mut reader = BufReader::new(stdout);
reader.read_line(&mut response_line).await?;
} else {
return Err(anyhow::anyhow!("child stdout unavailable"));
}
debug!("Received MCP response: {}", response_line.trim());
if response_line.trim().is_empty() {
return Err(anyhow::anyhow!("MCP server returned empty response"));
}
let response: JsonRpcResponse = serde_json::from_str(&response_line)
.context("failed to parse JSON-RPC response")?;
if let Some(error) = response.error {
let error_details = if let Some(data) = &error.data {
format!(" ({})", serde_json::to_string(data).unwrap_or_default())
} else {
String::new()
};
return Err(anyhow::anyhow!("MCP server error {}: {}{}", error.code, error.message, error_details));
}
response.result.ok_or_else(|| anyhow::anyhow!("MCP response missing result"))
}
pub async fn list_tools(&self) -> Result<Vec<ToolDescriptor>> {
let result = self.send_request("tools/list", Some(serde_json::json!({}))).await?;
if let Some(tools_value) = result.get("tools") {
let tools: Vec<ToolDescriptor> = serde_json::from_value(tools_value.clone())
.context("failed to parse tools list from MCP response")?;
Ok(tools)
} else {
let tools: Vec<ToolDescriptor> = serde_json::from_value(result)
.context("failed to parse tools list from MCP response")?;
Ok(tools)
}
}
pub async fn call_tool(&self, name: &str, args: &Value) -> Result<Value> {
let params = serde_json::json!({
"name": name,
"arguments": args
});
self.send_request("tools/call", Some(params)).await
}
}
#[derive(Debug)]
pub enum UnifiedMCPClient {
Http(MCPClient),
Stdio(StdioMCPClient),
}
impl UnifiedMCPClient {
pub async fn list_tools(&self) -> Result<Vec<ToolDescriptor>> {
match self {
UnifiedMCPClient::Http(client) => client.list_tools().await,
UnifiedMCPClient::Stdio(client) => client.list_tools().await,
}
}
pub async fn call_tool(&self, name: &str, args: &Value) -> Result<Value> {
match self {
UnifiedMCPClient::Http(client) => client.call_tool(name, args).await,
UnifiedMCPClient::Stdio(client) => client.call_tool(name, args).await,
}
}
}
#[derive(Debug, Clone)]
pub struct MCPClient {
base_url: String,
api_key: Option<String>,
http: reqwest::Client,
}
impl MCPClient {
pub fn new<S: Into<String>>(base_url: S, api_key: Option<String>) -> Result<Self> {
Ok(Self {
base_url: base_url.into().trim_end_matches('/').to_string(),
api_key,
http: reqwest::Client::builder()
.user_agent("cli_engineer/1.0 (+https://cli.engineer)")
.build()
.context("failed to build reqwest client")?,
})
}
fn auth_header(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
if let Some(key) = &self.api_key {
req.bearer_auth(key)
} else {
req
}
}
pub async fn list_tools(&self) -> Result<Vec<ToolDescriptor>> {
let url = format!("{}/v1/tools", self.base_url);
debug!("Fetching tools from HTTP MCP: {}", url);
let resp = self
.auth_header(self.http.get(&url))
.send()
.await
.context("failed to query MCP /tools endpoint")?;
let status = resp.status();
if !status.is_success() {
anyhow::bail!("MCP server returned HTTP {}", status);
}
let text = resp.text().await
.context("failed to read response body")?;
debug!("HTTP MCP response: {}", text);
let tools: Vec<ToolDescriptor> = match serde_json::from_str(&text) {
Ok(tools) => tools,
Err(e) => {
let preview = if text.len() > 200 {
format!("{}...", &text[..200])
} else {
text.clone()
};
return Err(anyhow::anyhow!(
"invalid JSON from MCP /tools endpoint. Parse error: {}. Response preview: {}",
e, preview
));
}
};
Ok(tools)
}
pub async fn call_tool(&self, name: &str, args: &Value) -> Result<Value> {
let url = format!("{}/v1/tools/{}", self.base_url, name);
let resp = self
.auth_header(self.http.post(url).json(args))
.send()
.await
.with_context(|| format!("failed to invoke MCP tool {}", name))?;
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("MCP tool {} failed with HTTP {} – {}", name, status, text);
}
resp.json().await
.with_context(|| format!("failed to parse MCP tool {} response as JSON", name))
}
#[allow(dead_code)]
pub async fn call_tool_stream(
&self,
name: &str,
args: &Value,
) -> Result<impl Stream<Item = Result<Bytes, reqwest::Error>>> {
let url = format!("{}/v1/tools/{}/stream", self.base_url, name);
let resp = self
.auth_header(self.http.post(url).json(args))
.send()
.await
.with_context(|| format!("failed to invoke MCP stream tool {}", name))?;
Ok(resp.bytes_stream())
}
}