use anyhow::Result;
use async_trait::async_trait;
use serde_json::Value;
use std::sync::Arc;
use tokio::process::Command;
use tracing::{error, info};
use rmcp::{
Peer, RoleClient, ServiceExt,
model::{CallToolRequestParams, PaginatedRequestParams, RawContent, Tool as McpTool},
service::RunningService,
transport::{ConfigureCommandExt, TokioChildProcess},
};
use crate::spec_ai_core::tools::{Tool, ToolResult};
/// Bridges a single tool advertised by an MCP server to the crate's [`Tool`] trait.
pub struct McpToolAdapter {
    /// Tool name as reported by the MCP tool definition.
    name: String,
    /// Human-readable description; empty when the MCP definition has none.
    description: String,
    /// JSON rendering of the MCP tool's input schema.
    parameters: Value,
    /// Shared handle to the running MCP client service that executes the tool.
    service: Arc<RunningService<RoleClient, ()>>,
}
impl McpToolAdapter {
    /// Builds an adapter for `mcp_tool` whose calls are dispatched through `service`.
    ///
    /// The name and description are copied out of the MCP definition; a missing
    /// description becomes an empty string. The input schema is converted to a
    /// JSON value, and if that conversion fails an empty object schema
    /// (`{"type": "object", "properties": {}}`) is used instead.
    pub fn new(mcp_tool: &McpTool, service: Arc<RunningService<RoleClient, ()>>) -> Self {
        let description = mcp_tool
            .description
            .as_ref()
            .map(|d| d.to_string())
            .unwrap_or_default();

        // Build the fallback lazily: it is only needed (and only allocated)
        // when serializing the schema fails.
        let parameters = serde_json::to_value(&mcp_tool.input_schema).unwrap_or_else(|_| {
            serde_json::json!({
                "type": "object",
                "properties": {}
            })
        });

        Self {
            name: mcp_tool.name.to_string(),
            description,
            parameters,
            service,
        }
    }
}
#[async_trait]
impl Tool for McpToolAdapter {
fn name(&self) -> &str {
&self.name
}
fn description(&self) -> &str {
&self.description
}
fn parameters(&self) -> Value {
self.parameters.clone()
}
async fn execute(&self, args: Value) -> Result<ToolResult> {
let arguments = args.as_object().cloned();
let mut request = CallToolRequestParams::default();
request.name = self.name.clone().into();
request.arguments = arguments;
let result = self.service.call_tool(request).await?;
let mut output = String::new();
for content in result.content {
match content.raw {
RawContent::Text(text_content) => {
output.push_str(&text_content.text);
output.push('\n');
}
RawContent::Image(_) => {
output.push_str("[Image Content Not Supported Yet]\n");
}
RawContent::Resource(_) => {
output.push_str("[Resource Content Not Supported Yet]\n");
}
_ => {
output.push_str("[Unsupported Content Type]\n");
}
}
}
if result.is_error.unwrap_or(false) {
Ok(ToolResult::failure(output.trim()))
} else {
Ok(ToolResult::success(output.trim()))
}
}
}
/// Holds the client sessions of the MCP servers connected through this manager.
pub struct McpManager {
    /// One running client service per connected server, in connection order.
    services: Vec<Arc<RunningService<RoleClient, ()>>>,
}
impl Default for McpManager {
fn default() -> Self {
Self::new()
}
}
impl McpManager {
pub fn new() -> Self {
Self {
services: Vec::new(),
}
}
pub async fn connect_stdio(
&mut self,
command: &str,
args: &[String],
env: &std::collections::HashMap<String, String>,
) -> Result<()> {
info!("Connecting to MCP server: {} {:?}", command, args);
let mut cmd = Command::new(command);
cmd.args(args);
for (k, v) in env {
cmd.env(k, v);
}
let transport = TokioChildProcess::new(cmd)?;
let running_service = ().serve(transport).await?;
self.services.push(Arc::new(running_service));
Ok(())
}
pub async fn list_tools(&self) -> Vec<McpToolAdapter> {
let mut adapters = Vec::new();
for service in &self.services {
match service
.list_tools(None as Option<PaginatedRequestParams>)
.await
{
Ok(response) => {
for mcp_tool in response.tools {
adapters.push(McpToolAdapter::new(&mcp_tool, service.clone()));
}
}
Err(e) => {
error!("Failed to list tools from MCP server: {}", e);
}
}
}
adapters
}
}