use crate::error::RegistryError;
use pctx_config::server::ServerConfig;
use rmcp::model::{CallToolRequestParams, JsonObject, RawContent};
use serde_json::json;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tracing::{info, instrument, warn};
/// Registry of MCP server configurations, keyed by server name.
///
/// The map lives behind an `Arc<RwLock<..>>`, so a cloned `MCPRegistry`
/// shares the same underlying map as the value it was cloned from.
#[derive(Clone)]
pub struct MCPRegistry {
// Shared, lock-protected map from server name to that server's configuration.
configs: Arc<RwLock<HashMap<String, ServerConfig>>>,
}
impl MCPRegistry {
    /// Creates a registry with no servers registered.
    pub fn new() -> Self {
        let empty = HashMap::new();
        Self {
            configs: Arc::new(RwLock::new(empty)),
        }
    }

    /// Registers `cfg` under its own `name`.
    ///
    /// # Errors
    ///
    /// Returns [`RegistryError::Config`] when a server with the same name is
    /// already registered; the existing entry is left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn add(&self, cfg: ServerConfig) -> Result<(), RegistryError> {
        use std::collections::hash_map::Entry;

        let mut servers = self.configs.write().unwrap();
        // Resolve the slot in a `let` statement so the entry's borrow of the
        // guard ends before the guard itself is dropped.
        let slot = match servers.entry(cfg.name.clone()) {
            Entry::Occupied(_) => {
                return Err(RegistryError::Config(format!(
                    "MCP Server with name \"{}\" is already registered, you cannot register two MCP servers with the same name",
                    cfg.name
                )));
            }
            Entry::Vacant(slot) => slot,
        };
        slot.insert(cfg);
        Ok(())
    }

    /// Returns a clone of the configuration registered as `name`, if any.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn get(&self, name: &str) -> Option<ServerConfig> {
        self.configs.read().unwrap().get(name).cloned()
    }

    /// Reports whether a server named `name` is registered.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn has(&self, name: &str) -> bool {
        self.configs.read().unwrap().contains_key(name)
    }

    /// Removes the server named `name`, returning `true` if it was present.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn delete(&self, name: &str) -> bool {
        self.configs.write().unwrap().remove(name).is_some()
    }

    /// Removes every registered server.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn clear(&self) {
        self.configs.write().unwrap().clear();
    }
}
impl Default for MCPRegistry {
fn default() -> Self {
Self::new()
}
}
/// Invokes `tool_name` on the registered MCP server `server_name` and returns
/// the tool's result as JSON.
///
/// A fresh connection is opened for the call and shut down again once the
/// call has completed, whether it succeeded or failed.
///
/// The returned value is chosen in this order:
/// 1. the result's structured content, when present;
/// 2. otherwise, if the first content item is text, that text parsed as JSON,
///    falling back to the raw text as a JSON string when it is not valid JSON;
/// 3. otherwise the whole content list serialized as JSON.
///
/// # Errors
///
/// * [`RegistryError::ToolCall`] if no server named `server_name` is
///   registered, if the call itself fails, or if the tool reports an error
///   result (any text content of that result is appended to the message).
/// * [`RegistryError::Connection`] if connecting to the server fails.
#[instrument(
    name = "invoke_mcp_tool",
    skip_all,
    fields(id=format!("{server_name}.{tool_name}"), args = json!(args).to_string()),
    ret(Display),
    err
)]
pub async fn call_mcp_tool(
    registry: &MCPRegistry,
    server_name: &str,
    tool_name: &str,
    args: Option<JsonObject>,
) -> Result<serde_json::Value, RegistryError> {
    let mcp_cfg = registry.get(server_name).ok_or_else(|| {
        RegistryError::ToolCall(format!(
            "MCP Server with name \"{server_name}\" does not exist"
        ))
    })?;

    let client = match mcp_cfg.connect().await {
        Ok(client) => client,
        Err(err) => {
            warn!(
                server = %server_name,
                error = %err,
                "Could not connect to MCP: initialization failure"
            );
            return Err(RegistryError::Connection(err.to_string()));
        }
    };

    let mut params = CallToolRequestParams::new(tool_name.to_string());
    if let Some(args) = args {
        params = params.with_arguments(args);
    }

    // Keep the outcome as a value instead of using `?` right away, so the
    // connection is shut down even when the call failed.
    let call_outcome = client.call_tool(params).await.map_err(|e| {
        RegistryError::ToolCall(format!(
            "Tool call \"{server_name}.{tool_name}\" failed: {e}"
        ))
    });

    // Best-effort shutdown: a failure to close cleanly must not mask the
    // tool's own result or error.
    let _ = client.cancel().await;

    let tool_result = call_outcome?;

    if tool_result.is_error.unwrap_or(false) {
        // Surface whatever text the tool returned so the caller can see why
        // the call failed rather than only that it failed.
        let detail = tool_result
            .content
            .iter()
            .filter_map(|item| match &**item {
                RawContent::Text(text_content) => Some(text_content.text.as_str()),
                _ => None,
            })
            .collect::<Vec<_>>()
            .join("\n");

        let message = if detail.is_empty() {
            format!("Tool call \"{server_name}.{tool_name}\" failed")
        } else {
            format!("Tool call \"{server_name}.{tool_name}\" failed: {detail}")
        };
        return Err(RegistryError::ToolCall(message));
    }

    // Recorded before `structured_content` is moved out below.
    let has_structured = tool_result.structured_content.is_some();

    let val = if let Some(structured) = tool_result.structured_content {
        structured
    } else if let Some(RawContent::Text(text_content)) =
        tool_result.content.first().map(|a| &**a)
    {
        // Text that is not valid JSON is returned verbatim as a JSON string.
        serde_json::from_str::<serde_json::Value>(&text_content.text)
            .unwrap_or_else(|_| serde_json::Value::String(text_content.text.clone()))
    } else {
        json!(tool_result.content)
    };

    info!(structured_content = has_structured, result = ?&val, "Tool result");
    Ok(val)
}