use dashmap::DashMap;
use rmcp::{
RoleClient,
model::{CallToolRequestParams, CallToolResult},
service::{DynService, RunningService},
};
use serde_json::Value;
use std::borrow::Cow;
use std::sync::Arc;
use crate::Tool;
struct InternalPegBoard {
tools: DashMap<String, Tool>,
services: DashMap<
String,
(
String,
RunningService<RoleClient, Box<dyn DynService<RoleClient>>>,
),
>,
tool_routing: DashMap<String, ToolRoute>,
namespace_tools: DashMap<String, Vec<String>>,
tool_discovery: DashMap<String, Vec<String>>,
mcp_discovery: DashMap<String, Vec<String>>,
}
#[derive(Debug, Clone)]
struct ToolRoute {
service_id: String,
original_name: String,
}
pub fn prefix_tool_name(namespace: &str, tool_name: &str) -> String {
format!("{}::{}", namespace, tool_name)
}
impl InternalPegBoard {
fn new() -> Self {
Self {
tools: DashMap::new(),
services: DashMap::new(),
tool_routing: DashMap::new(),
namespace_tools: DashMap::new(),
tool_discovery: DashMap::new(),
mcp_discovery: DashMap::new(),
}
}
pub async fn add_service(
&self,
mcp_id: String,
namespace: Option<String>,
service: RunningService<RoleClient, Box<dyn DynService<RoleClient>>>,
) -> Result<(), PegBoardError> {
if self.services.contains_key(&mcp_id) {
return Err(PegBoardError::DuplicateMcpId(mcp_id));
}
let namespace_str = namespace.unwrap_or_default();
let has_namespace = !namespace_str.is_empty();
if has_namespace && self.namespace_tools.contains_key(&namespace_str) {
return Err(PegBoardError::NamespaceAlreadyExists(namespace_str));
}
let tools_response = service
.list_tools(None) .await
.map_err(|e| PegBoardError::ServiceError(format!("Failed to list tools: {:?}", e)))?;
let tools_list: Vec<Tool> = tools_response
.tools
.into_iter()
.map(|rmcp_tool| Tool {
name: rmcp_tool.name,
description: rmcp_tool.description.map(Cow::from),
input_schema: serde_json::Value::Object((*rmcp_tool.input_schema).clone()),
})
.collect();
self.services
.insert(mcp_id.clone(), (namespace_str.clone(), service));
let mut registered_tool_names = Vec::new();
for original_tool in tools_list {
let original_name = original_tool.name.to_string();
let final_name = if has_namespace {
prefix_tool_name(&namespace_str, &original_name)
} else {
original_name.clone()
};
if self.tools.contains_key(&final_name) {
return Err(PegBoardError::ToolAlreadyExists(final_name));
}
let mut final_tool = original_tool.clone();
final_tool.name = Cow::Owned(final_name.clone());
self.tools.insert(final_name.clone(), final_tool);
self.tool_routing.insert(
final_name.clone(),
ToolRoute {
service_id: mcp_id.clone(),
original_name,
},
);
registered_tool_names.push(final_name);
}
if has_namespace {
self.namespace_tools
.insert(namespace_str, registered_tool_names);
}
Ok(())
}
pub fn register_tool(
&self,
namespace: Option<&str>,
tool: Tool,
service_id: &str,
) -> Result<(), PegBoardError> {
if !self.services.contains_key(service_id) {
return Err(PegBoardError::InvalidServiceId(service_id.to_string()));
}
let original_name = tool.name.to_string();
let namespace_str = namespace.unwrap_or("");
let has_namespace = !namespace_str.is_empty();
let final_name = if has_namespace {
prefix_tool_name(namespace_str, &original_name)
} else {
original_name.clone()
};
if self.tools.contains_key(&final_name) {
return Err(PegBoardError::ToolAlreadyExists(final_name));
}
let mut final_tool = tool;
final_tool.name = Cow::Owned(final_name.clone());
self.tools.insert(final_name.clone(), final_tool);
self.tool_routing.insert(
final_name.clone(),
ToolRoute {
service_id: service_id.to_string(),
original_name,
},
);
if has_namespace {
self.namespace_tools
.entry(namespace_str.to_string())
.or_default()
.push(final_name);
}
Ok(())
}
pub fn get_tool(&self, tool_name: &str) -> Option<Tool> {
self.tools.get(tool_name).map(|entry| entry.value().clone())
}
pub fn select_tools(&self, tool_names: &[&str]) -> Option<Vec<Tool>> {
let mut result = Vec::with_capacity(tool_names.len());
for &tool_name in tool_names {
let tool = self.get_tool(tool_name)?;
result.push(tool);
}
Some(result)
}
pub fn get_tool_route(&self, tool_name: &str) -> Option<(String, String)> {
self.tool_routing.get(tool_name).map(|entry| {
let route = entry.value();
(route.service_id.clone(), route.original_name.clone())
})
}
pub fn list_tools_in_namespace(&self, namespace: &str) -> Vec<String> {
self.namespace_tools
.get(namespace)
.map(|entry| entry.value().clone())
.unwrap_or_default()
}
pub fn get_tools_in_namespace(&self, namespace: &str) -> Vec<Tool> {
self.list_tools_in_namespace(namespace)
.iter()
.filter_map(|tool_name| self.get_tool(tool_name))
.collect()
}
pub fn list_all_tools(&self) -> Vec<String> {
self.tools.iter().map(|entry| entry.key().clone()).collect()
}
pub fn get_all_tools(&self) -> Vec<Tool> {
self.tools
.iter()
.map(|entry| entry.value().clone())
.collect()
}
pub fn list_namespaces(&self) -> Vec<String> {
self.namespace_tools
.iter()
.map(|entry| entry.key().clone())
.collect()
}
pub fn unregister_tool(&self, prefixed_name: &str) -> Result<(), PegBoardError> {
if self.tools.remove(prefixed_name).is_none() {
return Err(PegBoardError::ToolNotFound(prefixed_name.to_string()));
}
self.tool_routing.remove(prefixed_name);
for mut namespace_entry in self.namespace_tools.iter_mut() {
namespace_entry.value_mut().retain(|n| n != prefixed_name);
}
Ok(())
}
pub fn unregister_namespace(&self, namespace: &str) -> Result<usize, PegBoardError> {
let prefixed_names = self.list_tools_in_namespace(namespace);
let count = prefixed_names.len();
for prefixed_name in prefixed_names {
self.tools.remove(&prefixed_name);
self.tool_routing.remove(&prefixed_name);
}
let service_id_to_remove = self
.services
.iter()
.find(|entry| entry.value().0 == namespace)
.map(|entry| entry.key().clone());
if let Some(id) = service_id_to_remove {
self.services.remove(&id);
}
self.namespace_tools.remove(namespace);
Ok(count)
}
pub fn unregister_service(&self, service_id: &str) -> Result<usize, PegBoardError> {
let namespace = self
.services
.get(service_id)
.map(|entry| entry.value().0.clone())
.ok_or(PegBoardError::InvalidServiceId(service_id.to_string()))?;
let tools_to_remove: Vec<String> = self
.tool_routing
.iter()
.filter(|entry| entry.value().service_id == service_id)
.map(|entry| entry.key().clone())
.collect();
let count = tools_to_remove.len();
for tool_name in tools_to_remove {
self.tools.remove(&tool_name);
self.tool_routing.remove(&tool_name);
}
self.services.remove(service_id);
if !namespace.is_empty() {
self.namespace_tools.remove(&namespace);
}
Ok(count)
}
pub fn tool_count(&self) -> usize {
self.tools.len()
}
pub fn service_count(&self) -> usize {
self.services.len()
}
pub fn namespace_count(&self) -> usize {
self.namespace_tools.len()
}
pub async fn call_tool(
&self,
tool_name: &str,
arguments: Value,
) -> Result<CallToolResult, PegBoardError> {
let (service_id, original_name) = self
.get_tool_route(tool_name)
.ok_or_else(|| PegBoardError::ToolNotFound(tool_name.to_string()))?;
let service_entry = self
.services
.get(&service_id)
.ok_or(PegBoardError::InvalidServiceId(service_id.clone()))?;
let (_namespace, service) = service_entry.value();
let arguments_obj = match arguments {
Value::Object(obj) => Some(obj),
Value::Null => None,
_ => {
return Err(PegBoardError::ServiceError(
"Tool arguments must be a JSON object or null".to_string(),
));
}
};
let param = match arguments_obj {
Some(obj) => CallToolRequestParams::new(original_name).with_arguments(obj),
None => CallToolRequestParams::new(original_name),
};
service
.call_tool(param)
.await
.map_err(|e| PegBoardError::ServiceError(format!("Tool call failed: {:?}", e)))
}
pub fn register_tool_discovery(&self, tool_name: &str, related_tools: Vec<String>) {
self.tool_discovery
.insert(tool_name.to_string(), related_tools);
}
pub fn discover_tool(&self, tool_name: &str) -> Vec<Tool> {
self.tool_discovery
.get(tool_name)
.map(|entry| {
entry
.value()
.iter()
.filter_map(|name| self.get_tool(name))
.collect()
})
.unwrap_or_default()
}
pub fn register_mcp_discovery(&self, mcp_id: &str, related_mcps: Vec<String>) {
self.mcp_discovery.insert(mcp_id.to_string(), related_mcps);
}
pub fn discover_mcp(&self, mcp_id: &str) -> Vec<String> {
self.mcp_discovery
.get(mcp_id)
.map(|entry| entry.value().clone())
.unwrap_or_default()
}
}
#[derive(Clone)]
pub struct PegBoard {
inner: Arc<InternalPegBoard>,
}
impl PegBoard {
pub fn new() -> Self {
Self {
inner: Arc::new(InternalPegBoard::new()),
}
}
pub async fn add_service(
&self,
mcp_id: String,
namespace: Option<String>,
service: RunningService<RoleClient, Box<dyn DynService<RoleClient>>>,
) -> Result<(), PegBoardError> {
self.inner.add_service(mcp_id, namespace, service).await
}
pub fn register_tool(
&self,
namespace: Option<&str>,
tool: Tool,
service_id: &str,
) -> Result<(), PegBoardError> {
self.inner.register_tool(namespace, tool, service_id)
}
pub fn get_tool(&self, tool_name: &str) -> Option<Tool> {
self.inner.get_tool(tool_name)
}
pub fn select_tools(&self, tool_names: &[&str]) -> Option<Vec<Tool>> {
self.inner.select_tools(tool_names)
}
pub fn get_tool_route(&self, tool_name: &str) -> Option<(String, String)> {
self.inner.get_tool_route(tool_name)
}
pub fn list_tools_in_namespace(&self, namespace: &str) -> Vec<String> {
self.inner.list_tools_in_namespace(namespace)
}
pub fn get_tools_in_namespace(&self, namespace: &str) -> Vec<Tool> {
self.inner.get_tools_in_namespace(namespace)
}
pub fn list_all_tools(&self) -> Vec<String> {
self.inner.list_all_tools()
}
pub fn get_all_tools(&self) -> Vec<Tool> {
self.inner.get_all_tools()
}
pub fn list_namespaces(&self) -> Vec<String> {
self.inner.list_namespaces()
}
pub fn unregister_tool(&self, prefixed_name: &str) -> Result<(), PegBoardError> {
self.inner.unregister_tool(prefixed_name)
}
pub fn unregister_namespace(&self, namespace: &str) -> Result<usize, PegBoardError> {
self.inner.unregister_namespace(namespace)
}
pub fn unregister_service(&self, service_id: &str) -> Result<usize, PegBoardError> {
self.inner.unregister_service(service_id)
}
pub fn tool_count(&self) -> usize {
self.inner.tool_count()
}
pub fn service_count(&self) -> usize {
self.inner.service_count()
}
pub fn namespace_count(&self) -> usize {
self.inner.namespace_count()
}
pub async fn call_tool(
&self,
tool_name: &str,
arguments: Value,
) -> Result<CallToolResult, PegBoardError> {
self.inner.call_tool(tool_name, arguments).await
}
pub fn register_tool_discovery(&self, tool_name: &str, related_tools: Vec<String>) {
self.inner.register_tool_discovery(tool_name, related_tools)
}
pub fn discover_tool(&self, tool_name: &str) -> Vec<Tool> {
self.inner.discover_tool(tool_name)
}
pub fn register_mcp_discovery(&self, mcp_id: &str, related_mcps: Vec<String>) {
self.inner.register_mcp_discovery(mcp_id, related_mcps)
}
pub fn discover_mcp(&self, mcp_id: &str) -> Vec<String> {
self.inner.discover_mcp(mcp_id)
}
}
impl Default for PegBoard {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, thiserror::Error)]
pub enum PegBoardError {
#[error("Tool '{0}' already exists")]
ToolAlreadyExists(String),
#[error("Tool '{0}' not found")]
ToolNotFound(String),
#[error("Invalid service ID '{0}'")]
InvalidServiceId(String),
#[error("MCP ID '{0}' already exists")]
DuplicateMcpId(String),
#[error("Namespace '{0}' already exists")]
NamespaceAlreadyExists(String),
#[error("Service error: {0}")]
ServiceError(String),
}
#[cfg(test)]
mod tests {
use super::*;
use schemars::JsonSchema;
#[derive(JsonSchema)]
#[allow(dead_code)]
struct TestParams {
value: String,
}
#[test]
fn test_prefix_tool_name() {
let prefixed = prefix_tool_name("web_search", "search");
assert_eq!(prefixed, "web_search::search");
let prefixed2 = prefix_tool_name("fs", "read_file");
assert_eq!(prefixed2, "fs::read_file");
}
#[test]
fn test_pegboard_register_and_get() {
let pegboard = PegBoard::new();
let tool = crate::get_tool::<TestParams, _, _>("search", Some("Search tool")).unwrap();
assert_eq!(pegboard.tool_count(), 0);
assert_eq!(pegboard.namespace_count(), 0);
assert!(
pegboard
.register_tool(Some("web"), tool.clone(), "invalid-service-id")
.is_err()
);
assert!(
pegboard
.register_tool(None, tool.clone(), "invalid-service-id")
.is_err()
);
}
#[test]
fn test_pegboard_tool_name_prefixing() {
let original_tool =
crate::get_tool::<TestParams, _, _>("search", Some("Search tool")).unwrap();
assert_eq!(original_tool.name, "search");
}
#[test]
fn test_pegboard_namespace_operations() {
let pegboard = PegBoard::new();
assert_eq!(pegboard.list_namespaces().len(), 0);
assert_eq!(pegboard.list_all_tools().len(), 0);
assert_eq!(pegboard.get_all_tools().len(), 0);
assert_eq!(pegboard.list_tools_in_namespace("nonexistent").len(), 0);
assert_eq!(pegboard.get_tools_in_namespace("nonexistent").len(), 0);
}
#[test]
fn test_pegboard_get_tool_methods() {
let pegboard = PegBoard::new();
assert!(pegboard.get_tool("web-search").is_none());
assert!(pegboard.get_tool_route("web-search").is_none());
}
#[test]
fn test_pegboard_unregister() {
let pegboard = PegBoard::new();
assert!(pegboard.unregister_tool("web-nonexistent").is_err());
let result = pegboard.unregister_namespace("nonexistent");
assert!(result.is_ok());
assert_eq!(result.unwrap(), 0); }
#[test]
fn test_tool_route_structure() {
let route = ToolRoute {
service_id: "test-mcp".to_string(),
original_name: "search".to_string(),
};
assert_eq!(route.service_id, "test-mcp");
assert_eq!(route.original_name, "search");
}
#[test]
fn test_optional_namespace() {
let namespace_none: Option<String> = None;
let namespace_empty = Some(String::new());
let ns1 = namespace_none.unwrap_or_default();
let ns2 = namespace_empty.unwrap_or_default();
assert_eq!(ns1, "");
assert_eq!(ns2, "");
assert!(!ns1.is_empty() == false);
assert!(!ns2.is_empty() == false);
}
#[test]
fn test_prefix_only_when_namespace_provided() {
let original_name = "search";
let with_ns = prefix_tool_name("web", original_name);
assert_eq!(with_ns, "web::search");
}
#[test]
fn test_tool_discovery() {
let pegboard = PegBoard::new();
let discovered = pegboard.discover_tool("web-search");
assert_eq!(discovered.len(), 0);
pegboard.register_tool_discovery(
"web-search",
vec!["web-fetch".to_string(), "web-parse".to_string()],
);
let discovered = pegboard.discover_tool("web-search");
assert_eq!(discovered.len(), 0);
pegboard.register_tool_discovery("web-search", vec!["other-tool".to_string()]);
let discovered = pegboard.discover_tool("web-search");
assert_eq!(discovered.len(), 0); }
#[test]
fn test_mcp_discovery() {
let pegboard = PegBoard::new();
let discovered = pegboard.discover_mcp("web-mcp");
assert_eq!(discovered.len(), 0);
pegboard.register_mcp_discovery(
"web-mcp",
vec![
"html-parser-mcp".to_string(),
"image-fetcher-mcp".to_string(),
],
);
let discovered = pegboard.discover_mcp("web-mcp");
assert_eq!(discovered.len(), 2);
assert!(discovered.contains(&"html-parser-mcp".to_string()));
assert!(discovered.contains(&"image-fetcher-mcp".to_string()));
pegboard.register_mcp_discovery("web-mcp", vec!["other-mcp".to_string()]);
let discovered = pegboard.discover_mcp("web-mcp");
assert_eq!(discovered.len(), 1);
assert_eq!(discovered[0], "other-mcp");
}
}