use crate::utils::circuit_breaker::{
AnalysisVector as CircuitAnalysisVector, EndpointId, GranularCircuitBreaker,
};
use crate::utils::debug_logger::VerbosityLevel;
use crate::{debug_error, debug_print, debug_success, debug_warn};
use anyhow::{Context, Result};
use base64::{engine::general_purpose, Engine as _};
use dirs;
use reqwest;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::{ChildStdout, Command, Stdio};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader as TokioBufReader};
use tokio::process::{Child, Command as TokioCommand};
use toml;
/// MCP protocol revision sent as `protocolVersion` in `initialize` requests.
const MCP_PROTOCOL_VERSION: &str = "2025-06-18";
/// Value of the `jsonrpc` field on outgoing requests.
const MCP_JSONRPC_VERSION: &str = "2.0";
/// Client name reported in the `clientInfo` section of `initialize` requests.
const CLIENT_NAME: &str = "osvm-cli";
/// Per-request timeout, in seconds, applied by `create_authenticated_request`.
const DEFAULT_HTTP_TIMEOUT_SECS: u64 = 30;
/// Threshold at which `next_request_id` resets the request counter.
const MAX_REQUEST_ID: u64 = u64::MAX - 1000;
/// JSON-RPC method name for the initialization handshake.
const MCP_METHOD_INITIALIZE: &str = "initialize";
/// JSON-RPC method name for listing the tools a server exposes.
const MCP_METHOD_TOOLS_LIST: &str = "tools/list";
/// JSON-RPC method name for invoking a tool.
const MCP_METHOD_TOOLS_CALL: &str = "tools/call";
/// Warning written to stderr by `add_server_from_github` before it clones and builds a repository.
const GITHUB_CLONE_WARNING: &str = "\n⚠️ SECURITY WARNING: You are about to clone and build code from a remote repository.\nThis will execute arbitrary build scripts and binaries on your system.\nOnly proceed if you trust the source repository.\n";
/// Process exit codes. Every value except `SUCCESS` is returned by
/// `McpError::exit_code` for the matching error variant.
pub mod exit_codes {
    /// No error.
    pub const SUCCESS: i32 = 0;
    /// Returned for `McpError::General`.
    pub const GENERAL_ERROR: i32 = 1;
    /// Returned for `McpError::Authentication`.
    pub const AUTHENTICATION_ERROR: i32 = 2;
    /// Returned for `McpError::Network`.
    pub const NETWORK_ERROR: i32 = 3;
    /// Returned for `McpError::Configuration`.
    pub const CONFIG_ERROR: i32 = 4;
    /// Returned for `McpError::Validation`.
    pub const VALIDATION_ERROR: i32 = 5;
}
/// Error categories for MCP operations.
///
/// Each variant carries a human-readable message; the variant itself selects
/// the `Display` prefix and the process exit code (see `exit_code`).
#[derive(Debug)]
pub enum McpError {
    /// Displayed as "Authentication error: ...".
    Authentication(String),
    /// Displayed as "Network error: ...".
    Network(String),
    /// Displayed as "Configuration error: ...".
    Configuration(String),
    /// Displayed as "Validation error: ...".
    Validation(String),
    /// Displayed as "Error: ..."; the catch-all category.
    General(String),
}
impl std::fmt::Display for McpError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
McpError::Authentication(msg) => write!(f, "Authentication error: {}", msg),
McpError::Network(msg) => write!(f, "Network error: {}", msg),
McpError::Configuration(msg) => write!(f, "Configuration error: {}", msg),
McpError::Validation(msg) => write!(f, "Validation error: {}", msg),
McpError::General(msg) => write!(f, "Error: {}", msg),
}
}
}
// No methods are overridden, so the trait's default implementations apply.
impl std::error::Error for McpError {}
impl McpError {
    /// Returns the process exit code (from [`exit_codes`]) for this error's category.
    pub fn exit_code(&self) -> i32 {
        match self {
            Self::General(_) => exit_codes::GENERAL_ERROR,
            Self::Authentication(_) => exit_codes::AUTHENTICATION_ERROR,
            Self::Network(_) => exit_codes::NETWORK_ERROR,
            Self::Configuration(_) => exit_codes::CONFIG_ERROR,
            Self::Validation(_) => exit_codes::VALIDATION_ERROR,
        }
    }
}
/// Configuration of one MCP server, as persisted in `mcp_servers.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpServerConfig {
    /// Display name used in log and error messages.
    pub name: String,
    /// For HTTP servers: base URL, to which `/api/mcp` is appended.
    /// For stdio servers: the command line, split on whitespace into program and arguments.
    pub url: String,
    /// Transport used to reach the server.
    pub transport_type: McpTransportType,
    /// Optional credentials attached to HTTP requests.
    pub auth: Option<McpAuthConfig>,
    /// Disabled servers are rejected by `initialize_server`, `list_tools` and `call_tool`.
    pub enabled: bool,
    /// Free-form key/value settings. NOTE(review): not read anywhere in the visible code.
    pub extra_config: HashMap<String, String>,
    /// Repository the server was cloned from; set by `add_server_from_github`.
    pub github_url: Option<String>,
    /// Local checkout directory; set by `add_server_from_github`.
    pub local_path: Option<String>,
}
/// Transport used to talk to an MCP server. Serialized in lowercase
/// (`"stdio"`, `"http"`, `"websocket"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum McpTransportType {
    /// Newline-delimited JSON-RPC over a child process's stdin/stdout.
    Stdio,
    /// JSON-RPC over HTTP POST.
    Http,
    /// Declared but not implemented: `initialize_websocket_server` always returns an error.
    Websocket,
}
/// Credentials attached to HTTP requests by `create_authenticated_request`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpAuthConfig {
    /// Scheme selector; `"bearer"`, `"api_key"` and `"basic"` are recognised there.
    pub auth_type: String,
    /// Token for the `"bearer"` (Authorization header) and `"api_key"` (X-API-Key header) schemes.
    pub token: Option<String>,
    /// User name for the `"basic"` scheme.
    pub username: Option<String>,
    /// Password for the `"basic"` scheme.
    pub password: Option<String>,
}
/// Outgoing JSON-RPC request envelope.
#[derive(Debug, Clone, Serialize)]
struct McpRequest {
    /// JSON-RPC version string.
    jsonrpc: String,
    /// Request identifier; stdio callers match it against the response `id`.
    id: u64,
    /// Method name, e.g. `initialize`, `tools/list`, `tools/call`.
    method: String,
    /// Method parameters; serialized as `null` when `None` (no `skip_serializing_if`).
    params: Option<serde_json::Value>,
}
/// Incoming JSON-RPC response envelope.
#[derive(Debug, Clone, Deserialize)]
struct McpResponse {
    /// JSON-RPC version string echoed by the server.
    jsonrpc: String,
    /// Identifier of the request being answered.
    /// NOTE(review): a response whose `id` is `null` or a string will fail to deserialize.
    id: u64,
    /// Success payload; defaults to `None` when the field is absent.
    #[serde(default)]
    result: Option<serde_json::Value>,
    /// Error payload; defaults to `None` when the field is absent.
    #[serde(default)]
    error: Option<McpJsonRpcError>,
}
/// The `error` object of a JSON-RPC response.
#[derive(Debug, Clone, Deserialize)]
struct McpJsonRpcError {
    /// Numeric error code reported by the server.
    code: i32,
    /// Error description reported by the server.
    message: String,
    /// Optional extra error data; defaults to `None` when absent.
    #[serde(default)]
    data: Option<serde_json::Value>,
}
/// Parameters of the `initialize` request (field names serialized in camelCase).
#[derive(Debug, Clone, Serialize)]
struct McpInitializeRequest {
    /// Protocol revision the client speaks.
    #[serde(rename = "protocolVersion")]
    protocol_version: String,
    /// Capabilities advertised by this client.
    capabilities: ClientCapabilities,
    /// Name and version of this client.
    #[serde(rename = "clientInfo")]
    client_info: ClientInfo,
}
/// Capabilities advertised in `initialize`; fields that are `None` are omitted
/// from the serialized JSON.
#[derive(Debug, Clone, Serialize)]
struct ClientCapabilities {
    /// Experimental capability payload, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    experimental: Option<serde_json::Value>,
    /// Sampling capability payload, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    sampling: Option<serde_json::Value>,
}
/// Client identification sent in the `initialize` request.
#[derive(Debug, Clone, Serialize)]
struct ClientInfo {
    /// Client name.
    name: String,
    /// Client version.
    version: String,
}
/// One tool entry from the `tools` array of a `tools/list` result.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpTool {
    /// Tool name.
    pub name: String,
    /// Optional description of the tool.
    pub description: Option<String>,
    /// Schema value for the tool's arguments; (de)serialized as `inputSchema`.
    #[serde(rename = "inputSchema")]
    pub input_schema: serde_json::Value,
}
/// A stdio MCP server process tracked by `McpService`.
#[derive(Debug)]
pub struct StdioProcess {
    /// Handle to the spawned child process.
    pub child: Child,
    /// Id of the server this process belongs to.
    pub server_id: String,
    /// Local path taken from the server's `local_path` configuration.
    pub local_path: PathBuf,
}
/// Client-side registry and driver for configured MCP servers.
pub struct McpService {
    /// Configured servers, keyed by server id.
    servers: HashMap<String, McpServerConfig>,
    /// HTTP client shared by all HTTP requests.
    client: reqwest::Client,
    /// Per-endpoint circuit breaker consulted before, and updated after, HTTP calls.
    circuit_breaker: GranularCircuitBreaker,
    /// When true, diagnostics go through the `debug_*` macros.
    debug_mode: bool,
    /// Source of JSON-RPC request ids; starts at 1.
    request_counter: std::sync::atomic::AtomicU64,
    /// Running stdio server processes, keyed by server id.
    stdio_processes: HashMap<String, StdioProcess>,
}
impl McpService {
/// Reports `error` with a `message` prefix: on stderr in normal mode, through
/// the debug logger in debug mode.
fn print_error(&self, message: &str, error: &anyhow::Error) {
    if !self.debug_mode {
        eprintln!("❌ {}: {}", message, error);
        return;
    }
    debug_error!("{}: {}", message, error);
}
/// Builds a JSON POST to `url` carrying `request`, with the default timeout and,
/// when `auth` is configured, the matching authentication header.
///
/// Recognised `auth_type` values are `"bearer"`, `"api_key"` and `"basic"`.
/// Incomplete or unknown configurations leave the request unauthenticated and,
/// in debug mode, emit a warning. Secrets are never logged.
fn create_authenticated_request(
    &self,
    url: &str,
    auth: &Option<McpAuthConfig>,
    request: &McpRequest,
) -> reqwest::RequestBuilder {
    let builder = self
        .client
        .post(url)
        .header("Content-Type", "application/json")
        .timeout(std::time::Duration::from_secs(DEFAULT_HTTP_TIMEOUT_SECS))
        .json(request);

    let auth = match auth {
        Some(auth) => auth,
        None => return builder,
    };
    let verbose = self.debug_mode;

    // Dispatch on the scheme together with whichever credentials it needs.
    match (
        auth.auth_type.as_str(),
        &auth.token,
        &auth.username,
        &auth.password,
    ) {
        ("bearer", Some(token), _, _) => {
            if verbose {
                debug_print!(
                    VerbosityLevel::Detailed,
                    "Using Bearer authentication (token masked)"
                );
            }
            builder.header("Authorization", format!("Bearer {}", token))
        }
        ("bearer", None, _, _) => {
            if verbose {
                debug_warn!("Bearer auth configured but no token provided");
            }
            builder
        }
        ("api_key", Some(token), _, _) => {
            if verbose {
                debug_print!(
                    VerbosityLevel::Detailed,
                    "Using API Key authentication (key masked)"
                );
            }
            builder.header("X-API-Key", token)
        }
        ("api_key", None, _, _) => {
            if verbose {
                debug_warn!("API key auth configured but no token provided");
            }
            builder
        }
        ("basic", _, Some(username), Some(password)) => {
            if verbose {
                debug_print!(
                    VerbosityLevel::Detailed,
                    "Using Basic authentication for user: {} (password masked)",
                    username
                );
            }
            let credentials =
                general_purpose::STANDARD.encode(format!("{}:{}", username, password));
            builder.header("Authorization", format!("Basic {}", credentials))
        }
        ("basic", _, Some(_), None) => {
            if verbose {
                debug_warn!("Basic auth configured but no password provided");
            }
            builder
        }
        ("basic", _, None, _) => {
            if verbose {
                debug_warn!("Basic auth configured but no username provided");
            }
            builder
        }
        _ => {
            if verbose {
                debug_warn!("Unknown auth type: {}", auth.auth_type);
            }
            builder
        }
    }
}
/// Returns the next JSON-RPC request id.
///
/// Ids count up from the counter's current value and wrap back to 1 once the
/// counter reaches `MAX_REQUEST_ID`. The read-modify-write is a single atomic
/// update, so concurrent callers never observe the same id, and the id handed
/// out at the wrap point (1) is not handed out again by the following call.
fn next_request_id(&self) -> u64 {
    use std::sync::atomic::Ordering::SeqCst;

    let claimed = self.request_counter.fetch_update(SeqCst, SeqCst, |current| {
        // At the wrap point this call takes id 1, so the counter moves on to 2.
        Some(if current >= MAX_REQUEST_ID {
            2
        } else {
            current + 1
        })
    });
    // The closure always returns `Some`, so both arms carry the previous value.
    let previous = match claimed {
        Ok(value) | Err(value) => value,
    };
    if previous >= MAX_REQUEST_ID {
        1
    } else {
        previous
    }
}
/// Creates a service with no servers, debug output off, and request ids starting at 1.
pub fn new() -> Self {
    let request_counter = std::sync::atomic::AtomicU64::new(1);
    let client = reqwest::Client::new();
    let circuit_breaker = GranularCircuitBreaker::new();
    Self {
        servers: HashMap::new(),
        client,
        circuit_breaker,
        debug_mode: false,
        request_counter,
        stdio_processes: HashMap::new(),
    }
}
/// Same as [`McpService::new`], but with debug output set to `debug_mode`.
pub fn new_with_debug(debug_mode: bool) -> Self {
    let mut configured = Self::new();
    configured.debug_mode = debug_mode;
    configured
}
/// Loads server definitions from the config file, then from environment
/// variables. Environment entries are applied last, so they overwrite file
/// entries that share a server id.
pub fn load_config(&mut self) -> Result<()> {
    self.load_from_file()
        .and_then(|()| self.load_from_env())
}
fn load_from_file(&mut self) -> Result<()> {
let config_path = dirs::config_dir()
.unwrap_or_else(|| std::env::current_dir().unwrap())
.join("osvm")
.join("mcp_servers.json");
if !config_path.exists() {
return Ok(()); }
let content =
std::fs::read_to_string(&config_path).context("Failed to read MCP config file")?;
let servers: HashMap<String, McpServerConfig> =
serde_json::from_str(&content).context("Failed to parse MCP config file")?;
for (server_id, config) in servers {
self.servers.insert(server_id, config);
}
if self.debug_mode {
debug_success!("Loaded {} MCP servers from config file", self.servers.len());
}
Ok(())
}
pub fn save_config(&self) -> Result<()> {
let config_dir = dirs::config_dir()
.unwrap_or_else(|| std::env::current_dir().unwrap())
.join("osvm");
std::fs::create_dir_all(&config_dir)?;
let config_path = config_dir.join("mcp_servers.json");
let content = serde_json::to_string_pretty(&self.servers)
.context("Failed to serialize MCP servers")?;
std::fs::write(&config_path, content).context("Failed to write MCP config file")?;
if self.debug_mode {
debug_success!("Saved {} MCP servers to config file", self.servers.len());
}
Ok(())
}
/// Registers HTTP servers described by environment variables.
///
/// * `SOLANA_MCP_SERVER_URL` registers the server id `solana`.
/// * `MCP_SERVER_<NAME>_URL` registers a server whose id and name are
///   `<NAME>` lowercased.
///
/// Variables whose `<NAME>` part is empty (for example `MCP_SERVER_URL`, where
/// prefix and suffix overlap) and variables that are not valid Unicode are
/// skipped rather than causing a panic.
fn load_from_env(&mut self) -> Result<()> {
    if let Ok(url) = env::var("SOLANA_MCP_SERVER_URL") {
        let config = McpServerConfig {
            name: "solana-mcp-server".to_string(),
            url,
            transport_type: McpTransportType::Http,
            auth: None,
            enabled: true,
            extra_config: HashMap::new(),
            github_url: None,
            local_path: None,
        };
        self.servers.insert("solana".to_string(), config);
        if self.debug_mode {
            debug_success!("Loaded Solana MCP server from environment");
        }
    }

    // `vars_os` never panics; `vars` would on any non-Unicode variable.
    for (raw_key, raw_value) in env::vars_os() {
        let (key, value) = match (raw_key.into_string(), raw_value.into_string()) {
            (Ok(key), Ok(value)) => (key, value),
            _ => continue,
        };

        // Strip prefix then suffix; `MCP_SERVER_URL` yields `None` here because
        // the shared underscore leaves no `_URL` suffix once the prefix is gone.
        let server_name = match key
            .strip_prefix("MCP_SERVER_")
            .and_then(|rest| rest.strip_suffix("_URL"))
        {
            Some(name) if !name.is_empty() => name.to_lowercase(),
            _ => continue,
        };

        let config = McpServerConfig {
            name: server_name.clone(),
            url: value,
            transport_type: McpTransportType::Http,
            auth: None,
            enabled: true,
            extra_config: HashMap::new(),
            github_url: None,
            local_path: None,
        };
        self.servers.insert(server_name.clone(), config);
        if self.debug_mode {
            debug_success!("Loaded {} MCP server from environment", server_name);
        }
    }
    Ok(())
}
/// Registers (or replaces) `server_id` and persists the configuration.
///
/// Persistence is best-effort: a save failure is only reported in debug mode.
pub fn add_server(&mut self, server_id: String, config: McpServerConfig) {
    self.servers.insert(server_id, config);
    match self.save_config() {
        Err(e) if self.debug_mode => {
            debug_warn!("Failed to save config: {}", e);
        }
        _ => {}
    }
}
pub async fn add_server_from_github(
&mut self,
server_id: String,
github_url: String,
name: Option<String>,
skip_confirmation: bool,
) -> Result<()> {
if !self.is_valid_github_url(&github_url) {
return Err(anyhow::anyhow!(
"Invalid GitHub URL. Must be a valid GitHub repository URL."
));
}
eprintln!("{}", GITHUB_CLONE_WARNING);
if !skip_confirmation && !self.confirm_github_clone(&github_url)? {
return Err(anyhow::anyhow!("Operation cancelled by user"));
}
if self.debug_mode {
debug_print!(
VerbosityLevel::Basic,
"Cloning MCP server from GitHub: {}",
github_url
);
}
let temp_dir = std::env::temp_dir().join("osvm-mcp-servers");
std::fs::create_dir_all(&temp_dir)?;
let local_path = temp_dir.join(&server_id);
if local_path.exists() {
std::fs::remove_dir_all(&local_path)?;
}
let clone_result = Command::new("git")
.args([
"clone",
"--depth",
"1", "--single-branch", &github_url,
local_path.to_str().unwrap(),
])
.output()
.context("Failed to execute git clone command")?;
if !clone_result.status.success() {
let error_msg = String::from_utf8_lossy(&clone_result.stderr);
return Err(anyhow::anyhow!("Git clone failed: {}", error_msg));
}
if self.debug_mode {
debug_success!("Successfully cloned {} to {:?}", github_url, local_path);
}
let cargo_toml_path = local_path.join("Cargo.toml");
let package_json_path = local_path.join("package.json");
let execution_command = if cargo_toml_path.exists() {
if self.debug_mode {
debug_print!(
VerbosityLevel::Basic,
"Building Rust project at {:?}",
local_path
);
}
let build_result = Command::new("cargo")
.args(["build", "--release"])
.current_dir(&local_path)
.env("CARGO_NET_OFFLINE", "false") .output()
.context("Failed to execute cargo build command")?;
if !build_result.status.success() {
let error_msg = String::from_utf8_lossy(&build_result.stderr);
return Err(anyhow::anyhow!("Cargo build failed: {}", error_msg));
}
if self.debug_mode {
debug_success!("Successfully built Rust MCP server at {:?}", local_path);
}
let binary_name = self.get_binary_name_from_cargo_toml(&cargo_toml_path)?;
let binary_path = local_path.join("target/release").join(&binary_name);
if !binary_path.exists() {
return Err(anyhow::anyhow!(
"Built binary not found at {:?}",
binary_path
));
}
binary_path.to_str().unwrap().to_string()
} else if package_json_path.exists() {
if self.debug_mode {
debug_print!(
VerbosityLevel::Basic,
"Building Node.js project at {:?}",
local_path
);
}
let install_result = Command::new("npm")
.args(["install"])
.current_dir(&local_path)
.output()
.context("Failed to execute npm install command")?;
if !install_result.status.success() {
let error_msg = String::from_utf8_lossy(&install_result.stderr);
return Err(anyhow::anyhow!("npm install failed: {}", error_msg));
}
if self.debug_mode {
debug_success!(
"Successfully installed npm dependencies for MCP server at {:?}",
local_path
);
}
let (package_name, main_script) =
self.get_package_info_from_package_json(&package_json_path)?;
let script_path = if let Some(script) = main_script {
if let Some(stripped) = script.strip_prefix("./") {
local_path.join(stripped)
} else if script.starts_with("/") {
PathBuf::from(script)
} else {
local_path.join(script)
}
} else {
return Err(anyhow::anyhow!(
"No main script found in package.json for package '{}'",
package_name
));
};
if !script_path.exists() {
return Err(anyhow::anyhow!(
"Main script not found at {:?}",
script_path
));
}
format!("node {}", script_path.to_str().unwrap())
} else {
return Err(anyhow::anyhow!(
"No Cargo.toml or package.json found in the cloned repository. \
Only Rust (Cargo) and Node.js (npm) MCP servers are currently supported."
));
};
let config = McpServerConfig {
name: name.unwrap_or_else(|| format!("{} (from GitHub)", server_id)),
url: execution_command,
transport_type: McpTransportType::Stdio,
auth: None,
enabled: true,
extra_config: HashMap::new(),
github_url: Some(github_url),
local_path: Some(local_path.to_str().unwrap().to_string()),
};
self.servers.insert(server_id, config);
if let Err(e) = self.save_config() {
if self.debug_mode {
debug_warn!("Failed to save config: {}", e);
}
}
Ok(())
}
/// Returns true when `url` looks like `https://github.com/<owner>/<repo>[...]`.
///
/// Requirements: the exact `https://github.com/` prefix, non-empty owner and
/// repository segments, and only printable, non-space ASCII characters other
/// than `<` and `>`. This is a shape check only; it does not verify that the
/// repository exists.
fn is_valid_github_url(&self, url: &str) -> bool {
    let path = match url.strip_prefix("https://github.com/") {
        Some(path) => path,
        None => return false,
    };

    // `is_ascii_graphic` also rules out spaces and control characters.
    let chars_ok = url
        .chars()
        .all(|c| c.is_ascii_graphic() && c != '<' && c != '>');

    let mut segments = path.split('/');
    let owner_ok = segments.next().map_or(false, |owner| !owner.is_empty());
    let repo_ok = segments.next().map_or(false, |repo| !repo.is_empty());

    chars_ok && owner_ok && repo_ok
}
/// Asks on stdout whether to clone `github_url` and reads the answer from stdin.
///
/// Returns `Ok(true)` only for `y` / `yes` (case-insensitive, surrounding
/// whitespace ignored); anything else, including an empty line, is a "no".
fn confirm_github_clone(&self, github_url: &str) -> Result<bool> {
    use std::io::{stdin, stdout, Write};

    print!("Continue cloning from {}? [y/N]: ", github_url);
    // The prompt has no trailing newline, so flush to make it visible before blocking.
    stdout().flush()?;

    let mut reply = String::new();
    stdin().read_line(&mut reply)?;
    let answer = reply.trim().to_lowercase();
    Ok(["y", "yes"].contains(&answer.as_str()))
}
/// Reads the manifest at `cargo_toml_path` and returns `package.name`.
///
/// When the manifest does not parse as TOML, or `package.name` is missing or
/// not a string, falls back to the line-based scan in `parse_simple_package_name`.
fn get_binary_name_from_cargo_toml(&self, cargo_toml_path: &Path) -> Result<String> {
    let manifest =
        std::fs::read_to_string(cargo_toml_path).context("Failed to read Cargo.toml")?;

    let declared_name = toml::from_str::<toml::Value>(&manifest)
        .ok()
        .and_then(|doc| {
            doc.get("package")?
                .get("name")?
                .as_str()
                .map(|name| name.to_string())
        });

    match declared_name {
        Some(name) => Ok(name),
        None => self.parse_simple_package_name(&manifest),
    }
}
/// Reads `package.json` and returns `(package name, entry script)`.
///
/// The entry script is resolved in this order:
/// 1. `bin` as a string, or, for a `bin` object, the entry named after the
///    package, else the object's first entry;
/// 2. `main`, when `bin` is absent;
/// 3. `index.js`, when both are absent.
///
/// A `bin` or `main` of an unexpected JSON type yields `None` rather than
/// falling through to the next rule.
fn get_package_info_from_package_json(
    &self,
    package_json_path: &Path,
) -> Result<(String, Option<String>)> {
    let raw =
        std::fs::read_to_string(package_json_path).context("Failed to read package.json")?;
    let manifest: serde_json::Value =
        serde_json::from_str(&raw).context("Failed to parse package.json")?;

    let name = match manifest.get("name").and_then(|n| n.as_str()) {
        Some(name) => name.to_string(),
        None => return Err(anyhow::anyhow!("No 'name' field found in package.json")),
    };

    let main_script = match (manifest.get("bin"), manifest.get("main")) {
        (Some(serde_json::Value::String(script)), _) => Some(script.clone()),
        (Some(serde_json::Value::Object(entries)), _) => entries
            .get(&name)
            .or_else(|| entries.values().next())
            .and_then(|v| v.as_str())
            .map(|s| s.to_string()),
        (Some(_), _) => None,
        (None, Some(main)) => main.as_str().map(|s| s.to_string()),
        (None, None) => Some("index.js".to_string()),
    };

    Ok((name, main_script))
}
/// Line-based fallback that extracts the value of the first `name = ...`
/// assignment from Cargo.toml text.
///
/// Only a key that is exactly `name` matches (not `namespace` or
/// `name.workspace`). The line is split at the first `=` only, so values
/// containing `=` stay intact. For a quoted value the text up to the closing
/// quote is returned, which drops trailing comments.
///
/// # Errors
/// Fails when no line yields a non-empty name.
fn parse_simple_package_name(&self, content: &str) -> Result<String> {
    for line in content.lines() {
        let (key, value) = match line.split_once('=') {
            Some(pair) => pair,
            None => continue,
        };
        if key.trim() != "name" {
            continue;
        }

        let value = value.trim();
        let name = match value.chars().next() {
            // Quoted: take everything up to the matching closing quote.
            // Slicing at 1 is safe because both quote characters are one byte.
            Some(quote) if quote == '"' || quote == '\'' => {
                value[1..].split(quote).next().unwrap_or("")
            }
            // Bare value: drop any trailing `# comment`.
            _ => value.split('#').next().unwrap_or("").trim(),
        };
        if !name.is_empty() {
            return Ok(name.to_string());
        }
    }
    Err(anyhow::anyhow!("Could not find package name in Cargo.toml"))
}
/// Removes `server_id` and returns its configuration, or `None` if it was not registered.
///
/// The config file is rewritten only when something was removed; a save
/// failure is reported in debug mode only.
pub fn remove_server(&mut self, server_id: &str) -> Option<McpServerConfig> {
    let removed = self.servers.remove(server_id)?;
    match self.save_config() {
        Err(e) if self.debug_mode => {
            debug_warn!("Failed to save config: {}", e);
        }
        _ => {}
    }
    Some(removed)
}
/// Returns `(id, config)` pairs for all configured servers, in the map's iteration order.
pub fn list_servers(&self) -> Vec<(&String, &McpServerConfig)> {
    let mut entries = Vec::with_capacity(self.servers.len());
    entries.extend(self.servers.iter());
    entries
}
/// Looks up the configuration registered under `server_id`.
pub fn get_server(&self, server_id: &str) -> Option<&McpServerConfig> {
    let registry = &self.servers;
    registry.get(server_id)
}
/// Enables or disables `server_id` and persists the change.
///
/// Persistence is best-effort: a save failure is reported in debug mode only.
///
/// # Errors
/// Fails when `server_id` is not registered.
pub fn toggle_server(&mut self, server_id: &str, enabled: bool) -> Result<()> {
    match self.servers.get_mut(server_id) {
        Some(entry) => entry.enabled = enabled,
        None => return Err(anyhow::anyhow!("Server '{}' not found", server_id)),
    }

    if self.debug_mode {
        let state = if enabled { "enabled" } else { "disabled" };
        debug_print!(VerbosityLevel::Basic, "Server '{}' {}", server_id, state);
    }

    match self.save_config() {
        Err(e) if self.debug_mode => {
            debug_warn!("Failed to save config: {}", e);
        }
        _ => {}
    }
    Ok(())
}
/// Runs the MCP `initialize` handshake with `server_id` over its configured transport.
///
/// # Errors
/// Fails when the server is unknown or disabled, or when the transport-specific
/// initialization fails.
pub async fn initialize_server(&mut self, server_id: &str) -> Result<()> {
    // Clone the config so the `&mut self` stdio path does not conflict with a
    // borrow of `self.servers`.
    let config = match self.servers.get(server_id) {
        Some(found) => found.clone(),
        None => return Err(anyhow::anyhow!("Server '{}' not found", server_id)),
    };
    if !config.enabled {
        return Err(anyhow::anyhow!("Server '{}' is disabled", server_id));
    }

    match config.transport_type {
        McpTransportType::Stdio => self.initialize_stdio_server(server_id, &config).await,
        McpTransportType::Http => self.initialize_http_server(&config).await,
        McpTransportType::Websocket => self.initialize_websocket_server(&config).await,
    }
}
/// Sends the MCP `initialize` request to `<config.url>/api/mcp` and checks the reply.
///
/// The endpoint's circuit breaker is consulted first and updated afterwards:
/// transport failures, non-success HTTP statuses and JSON-RPC errors count as
/// failures; a clean response counts as a success.
///
/// # Errors
/// Fails when the circuit breaker is open, the request cannot be sent, the
/// server answers with an error status, the body is not a valid response, or
/// the response carries a JSON-RPC error.
async fn initialize_http_server(&self, config: &McpServerConfig) -> Result<()> {
    let endpoint_id = EndpointId {
        service: "mcp".to_string(),
        endpoint: config.url.clone(),
    };
    if !self.circuit_breaker.can_execute_endpoint(&endpoint_id) {
        return Err(anyhow::anyhow!(
            "Circuit breaker is open for MCP server '{}'",
            config.name
        ));
    }

    let init_request = McpInitializeRequest {
        protocol_version: MCP_PROTOCOL_VERSION.to_string(),
        capabilities: ClientCapabilities {
            experimental: None,
            sampling: None,
        },
        client_info: ClientInfo {
            name: CLIENT_NAME.to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        },
    };
    let request = McpRequest {
        jsonrpc: MCP_JSONRPC_VERSION.to_string(),
        id: self.next_request_id(),
        method: MCP_METHOD_INITIALIZE.to_string(),
        params: Some(serde_json::to_value(&init_request)?),
    };

    let sent = self
        .create_authenticated_request(
            &format!("{}/api/mcp", config.url),
            &config.auth,
            &request,
        )
        .send()
        .await;
    // An unreachable server must be able to trip the breaker, so record
    // transport errors before propagating them.
    if sent.is_err() {
        self.circuit_breaker.on_failure_endpoint(&endpoint_id);
    }
    let response = sent.context("Failed to send initialize request to MCP server")?;

    if !response.status().is_success() {
        self.circuit_breaker.on_failure_endpoint(&endpoint_id);
        return Err(anyhow::anyhow!(
            "MCP server returned error status: {}",
            response.status()
        ));
    }

    let mcp_response: McpResponse = response
        .json()
        .await
        .context("Failed to parse MCP initialize response")?;
    if let Some(error) = mcp_response.error {
        self.circuit_breaker.on_failure_endpoint(&endpoint_id);
        return Err(anyhow::anyhow!(
            "MCP server initialization error: {} - {}",
            error.code,
            error.message
        ));
    }

    self.circuit_breaker.on_success_endpoint(&endpoint_id);
    if self.debug_mode {
        debug_success!("Successfully initialized MCP server '{}'", config.name);
    }
    Ok(())
}
/// Placeholder for the WebSocket transport: always fails with a message that
/// points the user at the HTTP and stdio alternatives.
async fn initialize_websocket_server(&self, _config: &McpServerConfig) -> Result<()> {
    let unsupported = anyhow::anyhow!(
        "WebSocket transport is not yet implemented. Please use HTTP or stdio transport instead.\n\
         To add HTTP transport: osvm mcp add <server_id> --server-url <url> --transport http\n\
         To add stdio transport: osvm mcp add-github <server_id> <github_url>"
    );
    Err(unsupported)
}
/// Spawns the stdio server described by `config.url` and performs the
/// `initialize` handshake over its stdin/stdout.
///
/// Any process already tracked for `server_id` is killed first. The command
/// line is split on whitespace and `stdio` is appended as the last argument.
/// Stdout is read line by line until a JSON response with the request's id
/// arrives; other lines are ignored. The child is tracked in
/// `stdio_processes` only when the config has a `local_path`; otherwise the
/// handle is dropped on return and, because of `kill_on_drop`, the process is
/// killed.
///
/// # Errors
/// Fails when the command is empty, the process cannot be spawned, I/O on the
/// pipes fails, the server answers with a JSON-RPC error, or stdout closes
/// before a matching response is seen.
async fn initialize_stdio_server(
    &mut self,
    server_id: &str,
    config: &McpServerConfig,
) -> Result<()> {
    if self.debug_mode {
        debug_print!(
            VerbosityLevel::Basic,
            "Starting stdio MCP server: {}",
            config.url
        );
    }

    // Replace, rather than duplicate, a process already running for this id.
    if let Some(mut stale) = self.stdio_processes.remove(server_id) {
        match stale.child.kill().await {
            Err(e) if self.debug_mode => {
                debug_warn!("Failed to kill existing stdio process: {}", e);
            }
            _ => {}
        }
    }

    let mut tokens = config.url.split_whitespace();
    let program = match tokens.next() {
        Some(program) => program,
        None => {
            return Err(anyhow::anyhow!(
                "Invalid command in server URL: {}",
                config.url
            ));
        }
    };
    let mut cmd = TokioCommand::new(program);
    cmd.args(tokens)
        .arg("stdio")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true);
    let mut child = cmd.spawn().context("Failed to start MCP server process")?;

    let handshake = McpInitializeRequest {
        protocol_version: MCP_PROTOCOL_VERSION.to_string(),
        capabilities: ClientCapabilities {
            experimental: None,
            sampling: None,
        },
        client_info: ClientInfo {
            name: CLIENT_NAME.to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        },
    };
    let request = McpRequest {
        jsonrpc: MCP_JSONRPC_VERSION.to_string(),
        id: self.next_request_id(),
        method: MCP_METHOD_INITIALIZE.to_string(),
        params: Some(serde_json::to_value(&handshake)?),
    };

    // One JSON document per line.
    let payload = serde_json::to_string(&request)?;
    if let Some(stdin) = child.stdin.as_mut() {
        stdin.write_all(payload.as_bytes()).await?;
        stdin.write_all(b"\n").await?;
        stdin.flush().await?;
    }

    if let Some(stdout) = child.stdout.take() {
        let mut reader = TokioBufReader::new(stdout);
        let mut buffer = String::new();
        let mut acknowledged = false;
        loop {
            buffer.clear();
            if reader.read_line(&mut buffer).await? == 0 {
                // EOF before a matching response.
                break;
            }
            let candidate = buffer.trim();
            if !candidate.starts_with("{") {
                continue;
            }
            let response = match serde_json::from_str::<McpResponse>(candidate) {
                Ok(response) => response,
                Err(_) => continue,
            };
            if response.id != request.id {
                continue;
            }
            if let Some(error) = response.error {
                return Err(anyhow::anyhow!(
                    "MCP server initialization error: {} - {}",
                    error.code,
                    error.message
                ));
            }
            acknowledged = true;
            break;
        }
        if !acknowledged {
            return Err(anyhow::anyhow!(
                "No valid response received from MCP server"
            ));
        }
    }

    if let Some(local_path) = &config.local_path {
        let tracked = StdioProcess {
            child,
            server_id: server_id.to_string(),
            local_path: PathBuf::from(local_path),
        };
        self.stdio_processes.insert(server_id.to_string(), tracked);
    }

    if self.debug_mode {
        debug_success!(
            "Successfully initialized stdio MCP server '{}'",
            config.name
        );
    }
    Ok(())
}
/// Kills every tracked stdio server process and empties the tracking map.
///
/// Kill failures are reported in debug mode only and do not stop the cleanup.
pub async fn cleanup_stdio_processes(&mut self) {
    let verbose = self.debug_mode;
    for (server_id, mut process) in self.stdio_processes.drain() {
        match process.child.kill().await {
            Err(e) if verbose => {
                debug_warn!("Failed to cleanup stdio process for '{}': {}", server_id, e);
            }
            _ => {}
        }
    }
}
/// Lists the tools offered by `server_id` over its configured transport.
///
/// # Errors
/// Fails when the server is unknown or disabled, when its transport is
/// WebSocket (not implemented), or when the transport-specific listing fails.
pub async fn list_tools(&self, server_id: &str) -> Result<Vec<McpTool>> {
    let config = match self.servers.get(server_id) {
        Some(config) => config,
        None => return Err(anyhow::anyhow!("Server '{}' not found", server_id)),
    };
    if !config.enabled {
        return Err(anyhow::anyhow!("Server '{}' is disabled", server_id));
    }

    match config.transport_type {
        McpTransportType::Websocket => {
            Err(anyhow::anyhow!("WebSocket transport not yet implemented"))
        }
        McpTransportType::Stdio => self.list_tools_stdio(server_id, config).await,
        McpTransportType::Http => self.list_tools_http(server_id, config).await,
    }
}
async fn list_tools_http(
&self,
_server_id: &str,
config: &McpServerConfig,
) -> Result<Vec<McpTool>> {
let endpoint_id = EndpointId {
service: "mcp".to_string(),
endpoint: config.url.clone(),
};
if !self.circuit_breaker.can_execute_endpoint(&endpoint_id) {
return Err(anyhow::anyhow!(
"Circuit breaker is open for MCP server '{}'",
config.name
));
}
let request = McpRequest {
jsonrpc: "2.0".to_string(),
id: self
.request_counter
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
method: MCP_METHOD_TOOLS_LIST.to_string(),
params: Some(serde_json::json!({})), };
let mut req_builder = self
.client
.post(format!("{}/api/mcp", config.url))
.header("Content-Type", "application/json")
.json(&request);
if let Some(auth) = &config.auth {
req_builder = match auth.auth_type.as_str() {
"bearer" => {
if let Some(token) = &auth.token {
req_builder.header("Authorization", format!("Bearer {}", token))
} else {
req_builder
}
}
"api_key" => {
if let Some(token) = &auth.token {
req_builder.header("X-API-Key", token)
} else {
req_builder
}
}
_ => req_builder,
};
}
let response = req_builder
.send()
.await
.context("Failed to send tools/list request to MCP server")?;
if !response.status().is_success() {
self.circuit_breaker.on_failure_endpoint(&endpoint_id);
return Err(anyhow::anyhow!(
"MCP server returned error status: {}",
response.status()
));
}
let mcp_response: McpResponse = response
.json()
.await
.context("Failed to parse MCP tools/list response")?;
if let Some(error) = mcp_response.error {
self.circuit_breaker.on_failure_endpoint(&endpoint_id);
return Err(anyhow::anyhow!(
"MCP server tools/list error: {} - {}",
error.code,
error.message
));
}
let result = mcp_response
.result
.ok_or_else(|| anyhow::anyhow!("MCP server returned no result for tools/list"))?;
let tools_list: serde_json::Value = result
.get("tools")
.ok_or_else(|| anyhow::anyhow!("MCP server response missing 'tools' field"))?
.clone();
let tools: Vec<McpTool> = serde_json::from_value(tools_list)
.context("Failed to parse tools from MCP server response")?;
self.circuit_breaker.on_success_endpoint(&endpoint_id);
if self.debug_mode {
debug_success!(
"Retrieved {} tools from MCP server '{}'",
tools.len(),
config.name
);
}
Ok(tools)
}
/// Spawns the stdio server described by `config.url`, performs `initialize`
/// followed by `tools/list`, and returns the parsed tools.
///
/// A fresh process is started for every call and is always killed and reaped
/// before returning, on error paths too. The child's stderr is discarded: it
/// was never read, and an unread pipe can stall a server that logs heavily.
///
/// NOTE: this uses blocking process I/O inside an async fn.
///
/// # Errors
/// Fails when the command is empty, the process cannot be spawned, pipe I/O
/// fails, either response is missing or malformed, or the server reports a
/// JSON-RPC error for `initialize` or `tools/list`.
async fn list_tools_stdio(
    &self,
    server_id: &str,
    config: &McpServerConfig,
) -> Result<Vec<McpTool>> {
    // `server_id` is part of the established signature; only `config` is needed here.
    let _ = server_id;
    if self.debug_mode {
        debug_print!(
            VerbosityLevel::Basic,
            "Listing tools from stdio MCP server: {}",
            config.name
        );
    }

    let mut parts = config.url.split_whitespace();
    let program = parts
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid stdio command: {}", config.url))?;
    let args: Vec<&str> = parts.collect();
    let mut child = Command::new(program)
        .args(&args)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .spawn()
        .context("Failed to spawn stdio MCP server process")?;

    // Run the whole exchange inside a closure so that every early return
    // still reaches the kill/wait below.
    let exchange = (|| -> Result<McpResponse> {
        let mut stdin = child
            .stdin
            .take()
            .ok_or_else(|| anyhow::anyhow!("Failed to get stdin for stdio process"))?;
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| anyhow::anyhow!("Failed to get stdout for stdio process"))?;
        let mut reader = BufReader::new(stdout);

        let init_request = McpRequest {
            jsonrpc: MCP_JSONRPC_VERSION.to_string(),
            id: 1,
            method: MCP_METHOD_INITIALIZE.to_string(),
            params: Some(serde_json::json!({
                "protocolVersion": MCP_PROTOCOL_VERSION,
                "capabilities": {},
                "clientInfo": {
                    "name": CLIENT_NAME,
                    "version": env!("CARGO_PKG_VERSION")
                }
            })),
        };
        let init_request_str = serde_json::to_string(&init_request)?;
        stdin.write_all(init_request_str.as_bytes())?;
        stdin.write_all(b"\n")?;
        stdin.flush()?;

        let init_response_str = self
            .read_mcp_response(&mut reader, "initialization")
            .context("Failed to read initialization response from stdio MCP server")?;
        let init_response: McpResponse = serde_json::from_str(&init_response_str)
            .context("Failed to parse initialize response from stdio MCP server")?;
        // A failed handshake makes the tools request pointless; surface it directly.
        if let Some(error) = init_response.error {
            return Err(anyhow::anyhow!(
                "MCP server initialization error: {} - {}",
                error.code,
                error.message
            ));
        }

        let tools_request = McpRequest {
            jsonrpc: MCP_JSONRPC_VERSION.to_string(),
            id: 2,
            method: MCP_METHOD_TOOLS_LIST.to_string(),
            params: Some(serde_json::json!({})),
        };
        let tools_request_str = serde_json::to_string(&tools_request)?;
        stdin.write_all(tools_request_str.as_bytes())?;
        stdin.write_all(b"\n")?;
        stdin.flush()?;

        let tools_response_str = self
            .read_mcp_response(&mut reader, "tools/list")
            .context("Failed to read tools response from stdio MCP server")?;
        let tools_response: McpResponse = serde_json::from_str(&tools_response_str)
            .context("Failed to parse tools response from stdio MCP server")?;
        Ok(tools_response)
    })();

    // Terminate and reap the child whatever happened, so it is not left
    // running or as a zombie.
    let _ = child.kill();
    let _ = child.wait();

    let tools_response = exchange?;
    if let Some(error) = tools_response.error {
        return Err(anyhow::anyhow!(
            "MCP server tools/list error: {} - {}",
            error.code,
            error.message
        ));
    }

    let result = tools_response
        .result
        .ok_or_else(|| anyhow::anyhow!("MCP server returned no result for tools/list"))?;
    let tools_list: serde_json::Value = result
        .get("tools")
        .ok_or_else(|| anyhow::anyhow!("MCP server response missing 'tools' field"))?
        .clone();
    let tools: Vec<McpTool> = serde_json::from_value(tools_list)
        .context("Failed to parse tools from MCP server response")?;

    if self.debug_mode {
        debug_success!(
            "Retrieved {} tools from stdio MCP server '{}'",
            tools.len(),
            config.name
        );
    }
    Ok(tools)
}
/// Reads lines from the server's stdout until one is a JSON-RPC response and
/// returns that line (trimmed).
///
/// A line qualifies when it parses as JSON, has a `jsonrpc` field, and has
/// either `result` or `error`. Blank lines, log output and anything else are
/// skipped. At most `MAX_ATTEMPTS` lines are read; the "timeout" is a line
/// budget, not a wall-clock limit, so a silent server still blocks the read.
///
/// `operation` is only used in diagnostics and error messages.
///
/// # Errors
/// Fails on a read error, on EOF, or when the line budget is exhausted.
fn read_mcp_response(
    &self,
    reader: &mut BufReader<ChildStdout>,
    operation: &str,
) -> Result<String> {
    const MAX_ATTEMPTS: usize = 50;
    let mut buffer = String::new();

    for _attempt in 0..MAX_ATTEMPTS {
        buffer.clear();
        let bytes_read = reader
            .read_line(&mut buffer)
            .context("Failed to read line from stdio process")?;
        if bytes_read == 0 {
            return Err(anyhow::anyhow!(
                "Unexpected EOF from stdio MCP server during {}",
                operation
            ));
        }

        let candidate = buffer.trim();
        if candidate.is_empty() {
            continue;
        }

        // Cheap textual pre-filter before paying for a JSON parse.
        if candidate.starts_with('{') && candidate.contains("jsonrpc") {
            let json = match serde_json::from_str::<serde_json::Value>(candidate) {
                Ok(json) => json,
                Err(_) => {
                    if self.debug_mode {
                        debug_print!(
                            VerbosityLevel::Detailed,
                            "Skipping invalid JSON line: {}",
                            candidate
                        );
                    }
                    continue;
                }
            };
            let has_payload = json.get("result").is_some() || json.get("error").is_some();
            if json.get("jsonrpc").is_some() && has_payload {
                if self.debug_mode {
                    debug_print!(
                        VerbosityLevel::Detailed,
                        "Found valid MCP response for {}",
                        operation
                    );
                }
                return Ok(candidate.to_string());
            }
        }

        // Not a response: the line is skipped either way; the classification
        // only decides which diagnostic is printed.
        if self.debug_mode {
            if self.is_log_message(candidate) {
                debug_print!(
                    VerbosityLevel::Detailed,
                    "Skipping log message: {}",
                    candidate
                );
            } else {
                debug_print!(
                    VerbosityLevel::Detailed,
                    "Skipping non-protocol line: {}",
                    candidate
                );
            }
        }
    }

    Err(anyhow::anyhow!(
        "Timeout waiting for MCP response during {}",
        operation
    ))
}
/// Heuristically decides whether `line` is server log output rather than a
/// protocol message.
///
/// A line counts as log output when it contains a typical structured-log key,
/// contains one of the known startup banners, or is a JSON-looking line that
/// does not mention `jsonrpc`.
fn is_log_message(&self, line: &str) -> bool {
    const STRUCTURED_LOG_KEYS: [&str; 5] = [
        "\"level\":",
        "\"timestamp\":",
        "\"message\":",
        "\"time\":",
        "\"msg\":",
    ];
    const STARTUP_BANNERS: [&str; 4] = [
        "Starting Solana MCP server",
        "Loaded config:",
        "Opened stdio transport",
        "Starting message loop",
    ];

    let has_log_key = STRUCTURED_LOG_KEYS.iter().any(|&key| line.contains(key));
    let has_banner = STARTUP_BANNERS.iter().any(|&banner| line.contains(banner));
    let is_foreign_json = line.starts_with('{') && !line.contains("jsonrpc");

    has_log_key || has_banner || is_foreign_json
}
/// Invokes `tool_name` on `server_id` with optional JSON `arguments` and
/// returns the raw `result` value from the server.
///
/// # Errors
/// Fails when the server is unknown or disabled, when its transport is
/// WebSocket (not implemented, matching `list_tools`), or when the
/// transport-specific call fails.
pub async fn call_tool(
    &self,
    server_id: &str,
    tool_name: &str,
    arguments: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
    let config = self
        .servers
        .get(server_id)
        .ok_or_else(|| anyhow::anyhow!("Server '{}' not found", server_id))?;
    if !config.enabled {
        return Err(anyhow::anyhow!("Server '{}' is disabled", server_id));
    }

    match config.transport_type {
        McpTransportType::Http => {
            self.call_tool_http(server_id, tool_name, arguments, config)
                .await
        }
        McpTransportType::Stdio => {
            self.call_tool_stdio(server_id, tool_name, arguments, config)
                .await
        }
        // No WebSocket client exists; sending an HTTP POST to such a server
        // would be wrong, so reject it the same way the other entry points do.
        McpTransportType::Websocket => {
            Err(anyhow::anyhow!("WebSocket transport not yet implemented"))
        }
    }
}
async fn call_tool_http(
&self,
server_id: &str,
tool_name: &str,
arguments: Option<serde_json::Value>,
config: &McpServerConfig,
) -> Result<serde_json::Value> {
let endpoint_id = EndpointId {
service: "mcp".to_string(),
endpoint: config.url.clone(),
};
if !self.circuit_breaker.can_execute_endpoint(&endpoint_id) {
return Err(anyhow::anyhow!(
"Circuit breaker is open for MCP server '{}'",
config.name
));
}
let tool_call = serde_json::json!({
"name": tool_name,
"arguments": arguments
});
let request = McpRequest {
jsonrpc: "2.0".to_string(),
id: self
.request_counter
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
method: MCP_METHOD_TOOLS_CALL.to_string(),
params: Some(tool_call),
};
let mut req_builder = self
.client
.post(format!("{}/api/mcp", config.url))
.header("Content-Type", "application/json")
.json(&request);
if let Some(auth) = &config.auth {
req_builder = match auth.auth_type.as_str() {
"bearer" => {
if let Some(token) = &auth.token {
req_builder.header("Authorization", format!("Bearer {}", token))
} else {
req_builder
}
}
"api_key" => {
if let Some(token) = &auth.token {
req_builder.header("X-API-Key", token)
} else {
req_builder
}
}
_ => req_builder,
};
}
let response = req_builder
.send()
.await
.context("Failed to send tools/call request to MCP server")?;
if !response.status().is_success() {
self.circuit_breaker.on_failure_endpoint(&endpoint_id);
return Err(anyhow::anyhow!(
"MCP server returned error status: {}",
response.status()
));
}
let mcp_response: McpResponse = response
.json()
.await
.context("Failed to parse MCP tools/call response")?;
if let Some(error) = mcp_response.error {
self.circuit_breaker.on_failure_endpoint(&endpoint_id);
return Err(anyhow::anyhow!(
"MCP server tools/call error: {} - {}",
error.code,
error.message
));
}
let result = mcp_response
.result
.ok_or_else(|| anyhow::anyhow!("MCP server returned no result for tools/call"))?;
self.circuit_breaker.on_success_endpoint(&endpoint_id);
if self.debug_mode {
debug_success!(
"Successfully called tool '{}' on MCP server '{}'",
tool_name,
config.name
);
}
Ok(result)
}
/// Calls a tool on a stdio MCP server by spawning it for a single exchange.
///
/// `config.url` is split on whitespace into a program and its arguments. The
/// process is spawned with piped stdin/stdout, sent an `initialize` request
/// followed by a `tools/call` request (one JSON document per line), and is
/// then killed and reaped regardless of whether the exchange succeeded.
///
/// `server_id` is currently unused; the server is identified through `config`.
///
/// NOTE(review): this uses the blocking `std::process` / `std::io` APIs inside
/// an `async fn`, so the calling task is blocked while the child responds.
///
/// # Errors
///
/// Fails when `config.url` is empty, the process cannot be spawned, any write
/// or read on its pipes fails, a response cannot be parsed, the server answers
/// `tools/call` with a JSON-RPC error, or the answer carries no result.
async fn call_tool_stdio(
    &self,
    server_id: &str,
    tool_name: &str,
    arguments: Option<serde_json::Value>,
    config: &McpServerConfig,
) -> Result<serde_json::Value> {
    if self.debug_mode {
        debug_print!(
            VerbosityLevel::Basic,
            "Calling tool '{}' on stdio MCP server: {}",
            tool_name,
            config.name
        );
    }

    let mut parts = config.url.split_whitespace();
    let program = parts
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid stdio command: {}", config.url))?;
    let args: Vec<&str> = parts.collect();

    let mut child = Command::new(program)
        .args(&args)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        // stderr is never read here; discarding it avoids the child blocking
        // on a full, undrained stderr pipe.
        .stderr(Stdio::null())
        .spawn()
        .context("Failed to spawn stdio MCP server process")?;

    let exchange = self.exchange_stdio_tool_call(&mut child, tool_name, arguments);

    // `std::process::Child` is neither killed nor reaped on drop, so always
    // terminate it and collect its exit status, including on the error paths.
    // Failures are ignored on purpose: the process may already have exited.
    let _ = child.kill();
    let _ = child.wait();

    let call_response = exchange?;
    if let Some(error) = call_response.error {
        return Err(anyhow::anyhow!(
            "MCP server tools/call error: {} - {}",
            error.code,
            error.message
        ));
    }
    let result = call_response
        .result
        .ok_or_else(|| anyhow::anyhow!("MCP server returned no result for tools/call"))?;

    if self.debug_mode {
        debug_success!(
            "Successfully called tool '{}' on stdio MCP server '{}'",
            tool_name,
            config.name
        );
    }
    Ok(result)
}

/// Runs the `initialize` + `tools/call` exchange over the pipes of `child`.
///
/// Returns the parsed `tools/call` response. The child process itself is left
/// running; terminating and reaping it is the caller's responsibility.
fn exchange_stdio_tool_call(
    &self,
    child: &mut std::process::Child,
    tool_name: &str,
    arguments: Option<serde_json::Value>,
) -> Result<McpResponse> {
    let mut stdin = child
        .stdin
        .take()
        .ok_or_else(|| anyhow::anyhow!("Failed to get stdin for stdio process"))?;
    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| anyhow::anyhow!("Failed to get stdout for stdio process"))?;

    let init_request = McpRequest {
        jsonrpc: MCP_JSONRPC_VERSION.to_string(),
        id: 1,
        method: MCP_METHOD_INITIALIZE.to_string(),
        params: Some(serde_json::json!({
            "protocolVersion": MCP_PROTOCOL_VERSION,
            "capabilities": {},
            "clientInfo": {
                "name": CLIENT_NAME,
                "version": env!("CARGO_PKG_VERSION")
            }
        })),
    };
    let init_request_str = serde_json::to_string(&init_request)?;
    stdin.write_all(init_request_str.as_bytes())?;
    stdin.write_all(b"\n")?;
    stdin.flush()?;

    let mut reader = BufReader::new(stdout);
    let init_response_str = self
        .read_mcp_response(&mut reader, "initialization")
        .context("Failed to read initialization response from stdio MCP server")?;
    // Only validated for well-formedness; its contents are not inspected.
    let _init_response: McpResponse = serde_json::from_str(&init_response_str)
        .context("Failed to parse initialize response from stdio MCP server")?;

    let tool_call = serde_json::json!({
        "name": tool_name,
        "arguments": arguments
    });
    let call_request = McpRequest {
        jsonrpc: MCP_JSONRPC_VERSION.to_string(),
        id: 2,
        method: MCP_METHOD_TOOLS_CALL.to_string(),
        params: Some(tool_call),
    };
    let call_request_str = serde_json::to_string(&call_request)?;
    stdin.write_all(call_request_str.as_bytes())?;
    stdin.write_all(b"\n")?;
    stdin.flush()?;

    let call_response_str = self
        .read_mcp_response(&mut reader, "tools/call")
        .context("Failed to read tools/call response from stdio MCP server")?;
    let call_response: McpResponse = serde_json::from_str(&call_response_str)
        .context("Failed to parse tools/call response from stdio MCP server")?;
    Ok(call_response)
}
/// Calls `tool_name` on every enabled server, one after another.
///
/// Returns one `(server_id, outcome)` pair per enabled server, in the
/// iteration order of the server map. A failing server does not stop the
/// remaining calls; its error is returned in its own pair.
pub async fn query_all_servers(
    &self,
    tool_name: &str,
    arguments: Option<serde_json::Value>,
) -> Vec<(String, Result<serde_json::Value>)> {
    let mut outcomes = Vec::new();
    let enabled_servers = self.servers.iter().filter(|(_, cfg)| cfg.enabled);
    for (id, _) in enabled_servers {
        // Each call gets its own copy of the arguments.
        let outcome = self.call_tool(id, tool_name, arguments.clone()).await;
        outcomes.push((id.clone(), outcome));
    }
    outcomes
}
/// Builds a human-readable status line for every configured server.
///
/// Each server id maps to `Enabled - <name> (<url>)` or
/// `Disabled - <name> (<url>)`, depending on the server's `enabled` flag.
pub fn get_server_status(&self) -> HashMap<String, String> {
    self.servers
        .iter()
        .map(|(id, cfg)| {
            let line = if cfg.enabled {
                format!("Enabled - {} ({})", cfg.name, cfg.url)
            } else {
                format!("Disabled - {} ({})", cfg.name, cfg.url)
            };
            (id.clone(), line)
        })
        .collect()
}
/// Runs a connectivity check against the server registered as `server_id`.
///
/// The check initializes the server and then lists its tools; the listed
/// tools themselves are discarded.
///
/// # Errors
///
/// Fails when either the initialization or the tool listing fails.
pub async fn test_server(&mut self, server_id: &str) -> Result<()> {
    let initialized = self.initialize_server(server_id).await;
    initialized.context("Failed to initialize MCP server")?;

    let listed = self.list_tools(server_id).await;
    let _listed_tools = listed.context("Failed to list tools from MCP server")?;

    if self.debug_mode {
        debug_success!("MCP server '{}' connectivity test passed", server_id);
    }
    Ok(())
}
/// Finds configured servers whose metadata contains `query`.
///
/// The match is a case-insensitive substring test against the server name,
/// its URL, its GitHub URL (if any) and its auth type (if any).
///
/// * `transport_filter` - when `Some` and not `"any"`, only servers whose
///   transport is exactly `"http"`, `"websocket"` or `"stdio"` as requested
///   are considered.
/// * `enabled_only` - when `true`, disabled servers are skipped.
///
/// Returns `(server_id, config)` pairs borrowed from the service.
pub fn search_servers(
    &self,
    query: &str,
    transport_filter: Option<&str>,
    enabled_only: bool,
) -> Vec<(&String, &McpServerConfig)> {
    let needle = query.to_lowercase();
    let contains_needle = |text: &str| text.to_lowercase().contains(&needle);
    // `"any"` behaves exactly like having no transport filter at all.
    let wanted_transport = transport_filter.filter(|wanted| *wanted != "any");

    let mut hits = Vec::new();
    for (id, cfg) in self.servers.iter() {
        if enabled_only && !cfg.enabled {
            continue;
        }

        if let Some(wanted) = wanted_transport {
            let actual = match cfg.transport_type {
                McpTransportType::Http => "http",
                McpTransportType::Websocket => "websocket",
                McpTransportType::Stdio => "stdio",
            };
            if actual != wanted {
                continue;
            }
        }

        let is_match = contains_needle(&cfg.name)
            || contains_needle(&cfg.url)
            || cfg
                .github_url
                .as_ref()
                .map_or(false, |url| contains_needle(url))
            || cfg
                .auth
                .as_ref()
                .map_or(false, |auth| contains_needle(&auth.auth_type));
        if is_match {
            hits.push((id, cfg));
        }
    }
    hits
}
}
impl Default for McpService {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the enabled HTTP server configuration shared by the registry tests.
    fn sample_http_config() -> McpServerConfig {
        McpServerConfig {
            name: "test-server".to_string(),
            url: "http://localhost:3000".to_string(),
            transport_type: McpTransportType::Http,
            auth: None,
            enabled: true,
            extra_config: HashMap::new(),
            github_url: None,
            local_path: None,
        }
    }

    /// A freshly created service has no servers and debug mode off.
    #[test]
    fn test_mcp_service_creation() {
        let service = McpService::new();
        assert_eq!(service.servers.len(), 0);
        assert!(!service.debug_mode);
    }

    /// `new_with_debug(true)` turns debug mode on and registers no servers.
    #[test]
    fn test_mcp_service_with_debug() {
        let service = McpService::new_with_debug(true);
        assert_eq!(service.servers.len(), 0);
        assert!(service.debug_mode);
    }

    /// An added server can be looked up by its id.
    #[test]
    fn test_add_server() {
        let mut service = McpService::new();
        service.add_server("test".to_string(), sample_http_config());
        assert_eq!(service.servers.len(), 1);
        assert!(service.get_server("test").is_some());
    }

    /// Removing a server returns its configuration and empties the registry.
    #[test]
    fn test_remove_server() {
        let mut service = McpService::new();
        service.add_server("test".to_string(), sample_http_config());
        assert_eq!(service.servers.len(), 1);
        let removed = service.remove_server("test");
        assert!(removed.is_some());
        assert_eq!(service.servers.len(), 0);
    }

    /// Toggling flips the `enabled` flag and fails for unknown server ids.
    #[test]
    fn test_toggle_server() {
        let mut service = McpService::new();
        service.add_server("test".to_string(), sample_http_config());
        assert!(service.toggle_server("test", false).is_ok());
        assert!(!service.get_server("test").unwrap().enabled);
        assert!(service.toggle_server("test", true).is_ok());
        assert!(service.get_server("test").unwrap().enabled);
        assert!(service.toggle_server("nonexistent", true).is_err());
    }

    /// The entry script is read from `main`, a string `bin`, or an object `bin`.
    #[test]
    fn test_get_package_info_from_package_json() {
        let service = McpService::new();
        // Include the process id so concurrent test runs that share the system
        // temp directory cannot overwrite each other's fixture file.
        let package_json_path = std::env::temp_dir().join(format!(
            "osvm_mcp_test_package_{}.json",
            std::process::id()
        ));

        // (file contents, expected package name, expected entry script)
        let cases = [
            (
                r#"{
    "name": "test-mcp-server",
    "version": "1.0.0",
    "main": "index.js"
}"#,
                "test-mcp-server",
                "index.js",
            ),
            (
                r#"{
    "name": "bin-mcp-server",
    "version": "1.0.0",
    "bin": "bin/server.js"
}"#,
                "bin-mcp-server",
                "bin/server.js",
            ),
            (
                r#"{
    "name": "obj-mcp-server",
    "version": "1.0.0",
    "bin": {
        "obj-mcp-server": "bin/server.js",
        "other-cmd": "bin/other.js"
    }
}"#,
                "obj-mcp-server",
                "bin/server.js",
            ),
        ];

        for &(contents, expected_name, expected_script) in &cases {
            std::fs::write(&package_json_path, contents).unwrap();
            let result = service.get_package_info_from_package_json(&package_json_path);
            // Clean up before asserting so a failing case leaves no file behind.
            std::fs::remove_file(&package_json_path).ok();

            assert!(result.is_ok());
            let (name, main_script) = result.unwrap();
            assert_eq!(name, expected_name);
            assert_eq!(main_script, Some(expected_script.to_string()));
        }
    }
}