use std::borrow::Cow;
use bob_core::{
error::ToolError,
ports::ToolPort,
types::{ToolCall, ToolDescriptor, ToolResult, ToolSource},
};
use rmcp::{
ServiceExt,
model::CallToolRequestParams,
transport::{ConfigureCommandExt, TokioChildProcess},
};
use tokio::process::Command;
/// Shorthand for a running `rmcp` service in the client role whose handler
/// type is the unit type `()`.
type McpService = rmcp::service::RunningService<rmcp::RoleClient, ()>;
/// Adapter that exposes the tools of a single MCP server through [`ToolPort`].
///
/// Tool names reported by this adapter are namespaced as
/// `mcp/<server_id>/<tool name>`.
pub struct McpToolAdapter {
/// Identifier of the server; used to build the `mcp/<server_id>/` prefix.
server_id: String,
/// Running client service used for every request to the server.
service: McpService,
}
/// Debug output shows only `server_id`; the remaining state is elided and the
/// output is marked as non-exhaustive.
impl std::fmt::Debug for McpToolAdapter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("McpToolAdapter");
        builder.field("server_id", &self.server_id);
        builder.finish_non_exhaustive()
    }
}
impl McpToolAdapter {
pub async fn connect_stdio(
server_id: &str,
command: &str,
args: &[String],
env: &[(String, String)],
) -> Result<Self, ToolError> {
let args_owned: Vec<String> = args.to_vec();
let env_owned: Vec<(String, String)> = env.to_vec();
let transport = TokioChildProcess::new(Command::new(command).configure(move |cmd| {
for arg in &args_owned {
cmd.arg(arg);
}
for (k, v) in &env_owned {
cmd.env(k, v);
}
}))
.map_err(|e| ToolError::Execution(format!("failed to spawn MCP server: {e}")))?;
let service = ()
.serve(transport)
.await
.map_err(|e| ToolError::Execution(format!("failed to connect to MCP server: {e}")))?;
Ok(Self { server_id: server_id.to_string(), service })
}
pub async fn shutdown(self) -> Result<(), ToolError> {
self.service
.cancel()
.await
.map_err(|e| ToolError::Execution(format!("shutdown error: {e}")))?;
Ok(())
}
}
#[async_trait::async_trait]
impl ToolPort for McpToolAdapter {
async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
let tools = self
.service
.list_all_tools()
.await
.map_err(|e| ToolError::Execution(format!("list_tools failed: {e}")))?;
let descriptors = tools
.into_iter()
.map(|t| {
let schema_value = serde_json::to_value(t.input_schema.as_ref())
.unwrap_or_else(|_| serde_json::json!({"type": "object"}));
ToolDescriptor::new(
format!("mcp/{}/{}", self.server_id, t.name),
t.description.as_deref().unwrap_or_default(),
)
.with_input_schema(schema_value)
.with_source(ToolSource::Mcp { server: self.server_id.clone() })
})
.collect();
Ok(descriptors)
}
async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
let prefix = format!("mcp/{}/", self.server_id);
let raw_name = call.name.strip_prefix(&prefix).unwrap_or(&call.name);
let arguments: Option<serde_json::Map<String, serde_json::Value>> =
call.arguments.as_object().cloned();
let mut params = CallToolRequestParams::new(Cow::Owned(raw_name.to_string()));
if let Some(args) = arguments {
params = params.with_arguments(args);
}
let result = self
.service
.call_tool(params)
.await
.map_err(|e| ToolError::Execution(format!("call_tool failed: {e}")))?;
let output_text: String = result
.content
.iter()
.filter_map(|c| {
let raw: &rmcp::model::RawContent = c;
raw.as_text().map(|t| t.text.as_str())
})
.collect::<Vec<_>>()
.join("\n");
let output = if output_text.is_empty() {
serde_json::json!(null)
} else {
serde_json::from_str(&output_text).unwrap_or(serde_json::Value::String(output_text))
};
Ok(ToolResult { name: call.name, output, is_error: result.is_error.unwrap_or(false) })
}
}
#[cfg(test)]
mod tests {
    /// A name carrying the `mcp/<server>/` namespace loses that prefix.
    #[test]
    fn namespace_prefix_strip() {
        let server = "fs";
        let namespace = format!("mcp/{server}/");
        let qualified = "mcp/fs/read_file";
        let stripped = qualified.strip_prefix(namespace.as_str());
        assert_eq!(stripped, Some("read_file"));
    }

    /// A name without the namespace is passed through untouched.
    #[test]
    fn namespace_prefix_passthrough() {
        let server = "fs";
        let namespace = format!("mcp/{server}/");
        let tool = "read_file";
        let resolved = match tool.strip_prefix(namespace.as_str()) {
            Some(rest) => rest,
            None => tool,
        };
        assert_eq!(resolved, "read_file");
    }
}