use std::sync::Arc;
use rmcp::{
model::{
CallToolRequestParams, CallToolResult, ClientCapabilities, Content, Implementation,
InitializeRequestParams, InitializeResult, ListToolsResult, PaginatedRequestParams,
ServerCapabilities, ServerInfo,
},
service::{RequestContext, RoleServer},
ErrorData as RmcpError, ServerHandler,
};
use tokio::sync::Mutex;
use crate::base::discovery::{force_deferred_via_env, FORCE_DEFERRED_ENV_VAR};
use crate::base::errors::McpError;
use crate::base::subsystem::Subsystem;
use crate::base::wire::WireFormat;
/// Wire name of the keyword-search meta-tool advertised in discovery mode.
const DISCOVERY_TOOL_SEARCH: &str = "tools_search";
/// Wire name of the meta-tool that returns full schemas for a list of tool names.
const DISCOVERY_TOOL_DESCRIBE: &str = "tools_describe_many";
/// Key looked up in the client's `experimental` capabilities map; when the key
/// is present, `detect_list_mode` selects discovery mode.
const DEFERRED_TOOLS_CAP_KEY: &str = "x-deferred-tools";
/// How `tools/list` answers, as decided while handling `initialize`.
#[derive(Debug, Clone, Copy)]
enum NegotiatedMode {
/// Initial state of a freshly constructed server; `tools/list` in this state
/// logs a warning and falls back to the full catalog.
Pending,
/// `tools/list` returns every subsystem tool.
Full,
/// `tools/list` returns only the two discovery meta-tools.
Discovery,
}
/// rmcp `ServerHandler` adapter that exposes a single [`Subsystem`] as an MCP server.
struct SubsystemMcpServer<S: Subsystem> {
/// The wrapped subsystem that supplies the tool catalog and executes tool calls.
subsystem: Arc<S>,
/// Maps internal tool names to the names advertised to the client.
wire_format: WireFormat,
/// Listing mode; starts as `Pending` and is overwritten by `initialize`.
mode: Mutex<NegotiatedMode>,
}
impl<S: Subsystem> SubsystemMcpServer<S> {
/// Builds a server around `subsystem`, advertising tool names via `wire_format`.
///
/// The listing mode starts as [`NegotiatedMode::Pending`] until `initialize`
/// replaces it.
fn new(subsystem: S, wire_format: WireFormat) -> Self {
    let subsystem = Arc::new(subsystem);
    let mode = Mutex::new(NegotiatedMode::Pending);
    Self {
        subsystem,
        wire_format,
        mode,
    }
}
/// Returns the subsystem's full catalog as rmcp tools, with each tool renamed
/// to its wire name.
///
/// In debug builds this asserts that a wire name only contains `.` when the
/// wire format is `DotSeparated`.
fn build_tool_list(&self) -> Vec<rmcp::model::Tool> {
    let mut catalog = Vec::new();
    for descriptor in self.subsystem.tools() {
        let wire_name = self.wire_format.outbound(&descriptor.name);
        debug_assert!(
            matches!(self.wire_format, WireFormat::DotSeparated) || !wire_name.contains('.'),
            "wire_name '{}' contains dots in non-DotSeparated mode",
            wire_name
        );
        // Keep every other descriptor field; only the name changes on the wire.
        let renamed = crate::base::tool::ToolDescriptor {
            name: wire_name,
            ..descriptor
        };
        catalog.push(rmcp::model::Tool::from(renamed));
    }
    catalog
}
fn discovery_meta_tools() -> Vec<rmcp::model::Tool> {
let search_schema = Arc::new({
let mut m = serde_json::Map::new();
m.insert("type".into(), serde_json::Value::String("object".into()));
let mut props = serde_json::Map::new();
let mut q = serde_json::Map::new();
q.insert("type".into(), serde_json::Value::String("string".into()));
q.insert(
"description".into(),
serde_json::Value::String("keyword query".into()),
);
props.insert("query".into(), serde_json::Value::Object(q));
m.insert("properties".into(), serde_json::Value::Object(props));
m.insert(
"required".into(),
serde_json::Value::Array(vec![serde_json::Value::String("query".into())]),
);
m
});
let search_tool = rmcp::model::Tool::new(
DISCOVERY_TOOL_SEARCH,
"Search available tools by keyword. Use this to discover what tools \
are available before invoking them.",
search_schema,
);
let describe_schema = Arc::new({
let mut m = serde_json::Map::new();
m.insert("type".into(), serde_json::Value::String("object".into()));
let mut props = serde_json::Map::new();
let mut names = serde_json::Map::new();
names.insert("type".into(), serde_json::Value::String("array".into()));
let mut items = serde_json::Map::new();
items.insert("type".into(), serde_json::Value::String("string".into()));
names.insert("items".into(), serde_json::Value::Object(items));
names.insert(
"description".into(),
serde_json::Value::String("List of tool names to describe".into()),
);
props.insert("names".into(), serde_json::Value::Object(names));
m.insert("properties".into(), serde_json::Value::Object(props));
m.insert(
"required".into(),
serde_json::Value::Array(vec![serde_json::Value::String("names".into())]),
);
m
});
let describe_tool = rmcp::model::Tool::new(
DISCOVERY_TOOL_DESCRIBE,
"Fetch full schemas for a list of tool names discovered via \
tools_search. Returns the complete input schema for each tool.",
describe_schema,
);
vec![search_tool, describe_tool]
}
/// Picks the tool list that `tools/list` should return for `mode`.
///
/// Discovery mode yields only the meta-tools. Full mode yields the whole
/// catalog, and so does Pending mode after logging a warning.
fn select_tools_for_mode(&self, mode: NegotiatedMode) -> Vec<rmcp::model::Tool> {
    match mode {
        NegotiatedMode::Discovery => return Self::discovery_meta_tools(),
        NegotiatedMode::Full => {}
        NegotiatedMode::Pending => {
            tracing::warn!(
                subsystem = %self.subsystem.name(),
                "tools/list called before initialize completed; defaulting to Full mode (client may be in inconsistent state)"
            );
        }
    }
    // Full and Pending both answer with the complete catalog.
    self.build_tool_list()
}
fn dispatch_discovery_meta(
&self,
name: &str,
args: Option<serde_json::Map<String, serde_json::Value>>,
) -> CallToolResult {
match name {
DISCOVERY_TOOL_SEARCH => {
let query = match args.as_ref().and_then(|m| m.get("query")) {
Some(serde_json::Value::String(q)) if !q.is_empty() => q.clone(),
Some(serde_json::Value::String(_)) => {
return CallToolResult::error(vec![Content::text(
"tools_search: 'query' must be a non-empty string",
)])
}
Some(_) => {
return CallToolResult::error(vec![Content::text(
"tools_search: 'query' must be a string",
)])
}
None => {
return CallToolResult::error(vec![Content::text(
"tools_search: 'query' argument is required",
)])
}
};
let hits: Vec<serde_json::Value> = self
.subsystem
.tools()
.into_iter()
.filter(|td| {
let q = query.to_lowercase();
td.name.to_lowercase().contains(&q)
|| td.description.to_lowercase().contains(&q)
})
.map(|td| {
let wire = self.wire_format.outbound(&td.name);
serde_json::json!({"name": wire, "description": td.description})
})
.collect();
match serde_json::to_string_pretty(&hits) {
Ok(text) => CallToolResult::success(vec![Content::text(text)]),
Err(e) => {
tracing::error!(error = %e, "failed to serialize discovery results");
CallToolResult::error(vec![Content::text(format!(
"serialization error: {e}"
))])
}
}
}
DISCOVERY_TOOL_DESCRIBE => {
let names_value = args.as_ref().and_then(|m| m.get("names"));
let names_raw = match names_value {
Some(serde_json::Value::Array(arr)) => arr.clone(),
Some(_) => {
return CallToolResult::error(vec![Content::text(
"tools_describe_many: 'names' must be an array",
)])
}
None => {
return CallToolResult::error(vec![Content::text(
"tools_describe_many: 'names' argument is required",
)])
}
};
let filtered: Vec<&str> = names_raw.iter().filter_map(|v| v.as_str()).collect();
if filtered.len() != names_raw.len() {
tracing::warn!(
requested = names_raw.len(),
valid = filtered.len(),
"tools_describe_many: input contained non-string values; ignoring"
);
}
let names: Vec<String> = filtered.iter().map(|s| s.to_string()).collect();
let all_tools = self.build_tool_list();
let matched_tools: Vec<_> = all_tools
.into_iter()
.filter(|t| names.contains(&t.name.to_string()))
.collect();
if matched_tools.len() < names.len() {
let found: std::collections::HashSet<_> =
matched_tools.iter().map(|t| t.name.to_string()).collect();
let missing: Vec<&str> = names
.iter()
.filter(|n| !found.contains(*n))
.map(|n| n.as_str())
.collect();
tracing::warn!(
requested = names.len(),
found = matched_tools.len(),
?missing,
"tools_describe_many: some requested names were not found in catalog"
);
}
let matched: Vec<serde_json::Value> = matched_tools
.into_iter()
.map(|t| {
serde_json::json!({
"name": t.name,
"description": t.description,
"inputSchema": *t.input_schema,
})
})
.collect();
match serde_json::to_string_pretty(&matched) {
Ok(text) => CallToolResult::success(vec![Content::text(text)]),
Err(e) => {
tracing::error!(error = %e, "failed to serialize discovery results");
CallToolResult::error(vec![Content::text(format!(
"serialization error: {e}"
))])
}
}
}
other => {
CallToolResult::error(vec![Content::text(format!("unknown meta-tool: {other}"))])
}
}
}
}
#[allow(clippy::manual_async_fn)]
impl<S: Subsystem> ServerHandler for SubsystemMcpServer<S> {
/// Describes this server: tools capability enabled, identified by the
/// subsystem's name and version.
fn get_info(&self) -> ServerInfo {
    let capabilities = ServerCapabilities::builder().enable_tools().build();
    let identity = Implementation::new(self.subsystem.name(), self.subsystem.version());
    InitializeResult::new(capabilities).with_server_info(identity)
}
/// Handles `initialize`: derives the listing mode from the client's
/// capabilities, stores it, and replies with [`Self::get_info`].
fn initialize(
    &self,
    request: InitializeRequestParams,
    _context: RequestContext<RoleServer>,
) -> impl std::future::Future<Output = Result<InitializeResult, RmcpError>> + Send + '_ {
    async move {
        let negotiated = detect_list_mode(&request.capabilities);
        {
            // Scope the guard so the lock is released before logging.
            let mut slot = self.mode.lock().await;
            *slot = negotiated;
        }
        tracing::debug!(
            subsystem = self.subsystem.name(),
            client = request.client_info.name,
            mode = ?negotiated,
            "MCP initialize negotiated"
        );
        Ok(self.get_info())
    }
}
/// Handles `tools/list`: returns the tool set for the stored listing mode in
/// a single, unpaginated page (the pagination request is ignored).
fn list_tools(
    &self,
    _request: Option<PaginatedRequestParams>,
    _context: RequestContext<RoleServer>,
) -> impl std::future::Future<Output = Result<ListToolsResult, RmcpError>> + Send + '_ {
    async move {
        // Copy the mode out so the lock is not held while building the list.
        let negotiated = {
            let guard = self.mode.lock().await;
            *guard
        };
        let tools = self.select_tools_for_mode(negotiated);
        let tool_count = tools.len();
        tracing::debug!(
            subsystem = self.subsystem.name(),
            mode = ?negotiated,
            tool_count,
            "tools/list"
        );
        Ok(ListToolsResult {
            tools,
            next_cursor: None,
            meta: None,
        })
    }
}
fn call_tool(
&self,
request: CallToolRequestParams,
_context: RequestContext<RoleServer>,
) -> impl std::future::Future<Output = Result<CallToolResult, RmcpError>> + Send + '_ {
async move {
let name: &str = &request.name;
let args_value = request
.arguments
.map(serde_json::Value::Object)
.unwrap_or(serde_json::Value::Null);
tracing::debug!(tool = name, "tools/call");
if name == DISCOVERY_TOOL_SEARCH || name == DISCOVERY_TOOL_DESCRIBE {
let args_map = match args_value {
serde_json::Value::Object(m) => Some(m),
_ => None,
};
return Ok(self.dispatch_discovery_meta(name, args_map));
}
match self.subsystem.invoke(name, args_value).await {
Ok(result) => {
let text = serde_json::to_string_pretty(&result)
.unwrap_or_else(|e| format!("serialization error: {e}"));
Ok(CallToolResult::success(vec![Content::text(text)]))
}
Err(McpError::ToolNotFound { name: n }) => {
tracing::debug!(tool = %n, "tools/call: tool not found");
Ok(CallToolResult::error(vec![Content::text(format!(
"tool not found: {n}"
))]))
}
Err(e) => {
tracing::error!(tool = %name, error = %e, "tools/call: subsystem error");
Ok(CallToolResult::error(vec![Content::text(format!(
"subsystem error: {e}"
))]))
}
}
}
}
}
/// Decides the listing mode from the client's capabilities and the
/// force-deferred environment override.
///
/// Discovery mode is chosen when the client's `experimental` capabilities
/// contain [`DEFERRED_TOOLS_CAP_KEY`] or when `force_deferred_via_env()`
/// returns true; otherwise Full mode is chosen.
fn detect_list_mode(caps: &ClientCapabilities) -> NegotiatedMode {
    let client_opted_in = caps
        .experimental
        .as_ref()
        .is_some_and(|exp| exp.contains_key(DEFERRED_TOOLS_CAP_KEY));
    let forced = force_deferred_via_env();

    // Log only when the override is active, noting whether it changed anything.
    match (forced, client_opted_in) {
        (true, true) => tracing::debug!(
            env_var = FORCE_DEFERRED_ENV_VAR,
            "env var set but client already opted in; no effect on negotiated mode"
        ),
        (true, false) => tracing::info!(
            env_var = FORCE_DEFERRED_ENV_VAR,
            "deferred mode forced by env var (client did not opt in)"
        ),
        (false, _) => {}
    }

    if forced || client_opted_in {
        NegotiatedMode::Discovery
    } else {
        NegotiatedMode::Full
    }
}
/// Serves `subsystem` over MCP on the requested transport until the session ends.
///
/// # Errors
///
/// Returns [`McpError::Transport`] for the TCP and WebSocket transports, which
/// are not implemented, and propagates any error from the stdio server.
pub async fn serve_mcp<S: Subsystem>(
    subsystem: S,
    transport: crate::base::cli::McpTransport,
) -> Result<(), McpError> {
    use crate::base::cli::McpTransport;

    match transport {
        McpTransport::Stdio => serve_stdio(subsystem).await,
        McpTransport::Tcp | McpTransport::Ws => {
            let reason =
                "TCP and WebSocket transports are not implemented in v1; use --transport stdio";
            Err(McpError::Transport(reason.to_owned()))
        }
    }
}
async fn serve_stdio<S: Subsystem>(subsystem: S) -> Result<(), McpError> {
let server = SubsystemMcpServer::new(subsystem, WireFormat::default());
let transport = rmcp::transport::stdio();
let running = rmcp::serve_server(server, transport).await.map_err(|e| {
tracing::error!(error = %e, "rmcp::serve_server failed during initialize handshake");
McpError::Transport(e.to_string())
})?;
tracing::info!("MCP server ready (stdio)");
running.waiting().await.map_err(|e| {
tracing::error!(error = %e, "MCP server task exited with error");
McpError::Transport(format!("server task join error: {e}"))
})?;
tracing::info!("MCP session ended");
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::base::health::{HealthLevel, HealthStatus};
use crate::base::subsystem::ConnectArgs;
use crate::base::tool::ToolDescriptor;
use async_trait::async_trait;
use serde_json::{json, Value};
/// Test double for [`Subsystem`]: a fixed name and tool catalog, with every
/// lifecycle call succeeding.
struct MockSubsystem {
    name: &'static str,
    tools: Vec<ToolDescriptor>,
}

impl MockSubsystem {
    /// Creates a mock named `name` that advertises exactly `tools`.
    fn with_tools(name: &'static str, tools: Vec<ToolDescriptor>) -> Self {
        MockSubsystem { name, tools }
    }
}

#[async_trait]
impl Subsystem for MockSubsystem {
    fn name(&self) -> &str {
        self.name
    }

    /// Returns a copy of the configured catalog.
    fn tools(&self) -> Vec<ToolDescriptor> {
        self.tools.to_vec()
    }

    async fn connect(&mut self, _args: &ConnectArgs) -> Result<(), McpError> {
        Ok(())
    }

    async fn disconnect(&mut self) -> Result<(), McpError> {
        Ok(())
    }

    async fn shutdown(&mut self) -> Result<(), McpError> {
        Ok(())
    }

    /// Always reports a healthy status with placeholder values.
    async fn health_check(&self) -> Result<HealthStatus, McpError> {
        let status = HealthStatus {
            level: HealthLevel::Healthy,
            last_seen_unix_ms: 0,
            latency_ms: None,
            version: "0.0.0".into(),
            ring: None,
            subsystem_specific: Value::Null,
        };
        Ok(status)
    }

    /// Succeeds for any tool name, echoing the name back; arguments are ignored.
    async fn invoke(&self, tool_name: &str, _args: Value) -> Result<Value, McpError> {
        let echoed = json!({"tool": tool_name, "ok": true});
        Ok(echoed)
    }
}
/// `build_tool_list` keeps catalog order and carries name, description and
/// schema through to the rmcp tool.
#[test]
fn tool_catalog_construction_matches_expected_shape() {
    let connect_schema = json!({"type": "object", "properties": {"node1": {"type": "integer"}, "node2": {"type": "integer"}}, "required": ["node1", "node2"]});
    let dac_schema = json!({"type": "object", "properties": {"channel": {"type": "integer"}, "voltage_mv": {"type": "integer"}}, "required": ["channel", "voltage_mv"]});
    let catalog = vec![
        ToolDescriptor::new("connect", "Connect two breadboard nodes", connect_schema),
        ToolDescriptor::new("dac_set", "Set DAC voltage output", dac_schema),
    ];
    let server = SubsystemMcpServer::new(
        MockSubsystem::with_tools("jumperless", catalog),
        WireFormat::default(),
    );

    let listed = server.build_tool_list();
    assert_eq!(listed.len(), 2);

    let (connect, dac) = (&listed[0], &listed[1]);
    assert_eq!(connect.name.as_ref(), "connect");
    assert_eq!(
        connect.description.as_deref(),
        Some("Connect two breadboard nodes")
    );
    let schema_type = connect.input_schema.get("type").and_then(|v| v.as_str());
    assert_eq!(schema_type, Some("object"));
    assert_eq!(dac.name.as_ref(), "dac_set");
}
/// A name without dots is advertised unchanged under the underscore wire format.
#[test]
fn bare_snake_case_names_pass_through_unchanged_under_underscore_fmt() {
    let descriptor = ToolDescriptor::new(
        "ina_get_current",
        "Read INA current",
        json!({"type": "object", "properties": {}}),
    );
    let server = SubsystemMcpServer::new(
        MockSubsystem::with_tools("bench", vec![descriptor]),
        WireFormat::UnderscoreSeparated,
    );

    let listed = server.build_tool_list();
    assert_eq!(listed[0].name.as_ref(), "ina_get_current");
}
/// A dotted internal name is advertised with underscores under
/// `UnderscoreSeparated` and left untouched under `DotSeparated`.
///
/// Checked both on `WireFormat::outbound` directly and through
/// `build_tool_list`, so the server's own translation path is exercised.
#[test]
fn dotted_internal_name_translates_to_underscore_wire_name() {
    let td = ToolDescriptor {
        name: "jumperless.connect".to_string(),
        description: "Connect nodes".to_string(),
        input_schema: json!({"type": "object", "properties": {}}),
        timeout_ms: crate::base::tool::DEFAULT_TOOL_TIMEOUT_MS,
    };

    let server = SubsystemMcpServer::new(
        MockSubsystem::with_tools("fed", vec![td.clone()]),
        WireFormat::UnderscoreSeparated,
    );
    let wire_name = server.wire_format.outbound(&td.name);
    assert_eq!(wire_name, "jumperless_connect");
    let listed = server.build_tool_list();
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].name.as_ref(), "jumperless_connect");

    let dot_server = SubsystemMcpServer::new(
        MockSubsystem::with_tools("fed2", vec![td.clone()]),
        WireFormat::DotSeparated,
    );
    let dot_wire = dot_server.wire_format.outbound(&td.name);
    assert_eq!(dot_wire, "jumperless.connect");
    let dot_listed = dot_server.build_tool_list();
    assert_eq!(dot_listed.len(), 1);
    assert_eq!(dot_listed[0].name.as_ref(), "jumperless.connect");
}
/// A client advertising `experimental["x-deferred-tools"]` negotiates Discovery mode.
#[test]
fn deferred_tools_capability_triggers_discovery_mode() {
    // The key is spelled out literally so the test pins the wire-level name.
    let raw_caps = serde_json::json!({
        "experimental": {
            "x-deferred-tools": {}
        }
    });
    let caps: ClientCapabilities =
        serde_json::from_value(raw_caps).expect("failed to deserialize test ClientCapabilities");

    let mode = detect_list_mode(&caps);
    assert!(
        matches!(mode, NegotiatedMode::Discovery),
        "expected Discovery mode, got {mode:?}"
    );
}
/// Default client capabilities (no experimental section) negotiate Full mode.
///
/// NOTE(review): `detect_list_mode` also consults `force_deferred_via_env()`,
/// so this presumably fails when the force-deferred environment variable is
/// set in the test environment. Confirm.
#[test]
fn no_deferred_capability_yields_full_mode() {
    let mode = detect_list_mode(&ClientCapabilities::default());
    assert!(
        matches!(mode, NegotiatedMode::Full),
        "expected Full mode, got {mode:?}"
    );
}
/// The discovery set is exactly the two meta-tools, each with a non-empty description.
#[test]
fn discovery_meta_tools_have_correct_names() {
    let meta = SubsystemMcpServer::<MockSubsystem>::discovery_meta_tools();
    assert_eq!(meta.len(), 2);

    let names: Vec<&str> = meta.iter().map(|t| t.name.as_ref()).collect();
    for expected in [DISCOVERY_TOOL_SEARCH, DISCOVERY_TOOL_DESCRIBE] {
        assert!(names.contains(&expected));
    }

    for tool in &meta {
        let has_description = match tool.description.as_ref() {
            Some(d) => !d.is_empty(),
            None => false,
        };
        assert!(
            has_description,
            "meta-tool '{}' has empty description",
            tool.name
        );
    }
}
#[tokio::test]
async fn pending_mode_falls_back_to_full() {
let tools = vec![
ToolDescriptor::new(
"tool_a",
"First tool",
json!({"type": "object", "properties": {}}),
),
ToolDescriptor::new(
"tool_b",
"Second tool",
json!({"type": "object", "properties": {}}),
),
];
let subsystem = MockSubsystem::with_tools("pending-test", tools);
let server = SubsystemMcpServer::new(subsystem, WireFormat::default());
let full_catalog = server.build_tool_list();
assert_eq!(
full_catalog.len(),
2,
"expected 2 subsystem tools, not discovery meta-tools"
);
let names: Vec<&str> = full_catalog.iter().map(|t| t.name.as_ref()).collect();
assert!(names.contains(&"tool_a"));
assert!(names.contains(&"tool_b"));
let meta = SubsystemMcpServer::<MockSubsystem>::discovery_meta_tools();
assert_eq!(meta.len(), 2);
assert!(meta
.iter()
.all(|t| t.name.as_ref() != "tool_a" && t.name.as_ref() != "tool_b"));
}
/// Exercises every `NegotiatedMode` arm of `select_tools_for_mode`.
#[test]
fn select_tools_for_mode_all_arms() {
    let empty_object_schema = || json!({"type": "object", "properties": {}});
    let catalog = vec![
        ToolDescriptor::new("alpha", "First tool", empty_object_schema()),
        ToolDescriptor::new("beta", "Second tool", empty_object_schema()),
    ];
    let server = SubsystemMcpServer::new(
        MockSubsystem::with_tools("mode-test", catalog),
        WireFormat::default(),
    );

    // Full: the whole subsystem catalog.
    let full = server.select_tools_for_mode(NegotiatedMode::Full);
    assert_eq!(full.len(), 2, "Full mode should return all subsystem tools");
    let full_names: Vec<&str> = full.iter().map(|t| t.name.as_ref()).collect();
    assert!(full_names.contains(&"alpha") && full_names.contains(&"beta"));

    // Discovery: only the two meta-tools.
    let discovery = server.select_tools_for_mode(NegotiatedMode::Discovery);
    assert_eq!(
        discovery.len(),
        2,
        "Discovery mode should return exactly two meta-tools"
    );
    let disc_names: Vec<&str> = discovery.iter().map(|t| t.name.as_ref()).collect();
    assert!(disc_names.contains(&DISCOVERY_TOOL_SEARCH));
    assert!(disc_names.contains(&DISCOVERY_TOOL_DESCRIBE));

    // Pending: same answer as Full.
    let pending = server.select_tools_for_mode(NegotiatedMode::Pending);
    assert_eq!(
        pending.len(),
        2,
        "Pending mode should fall back to full catalog"
    );
    let pend_names: Vec<&str> = pending.iter().map(|t| t.name.as_ref()).collect();
    assert!(pend_names.contains(&"alpha") && pend_names.contains(&"beta"));
}
/// `get_info` reports the wrapped subsystem's name as the server name.
#[test]
fn server_info_reflects_subsystem_name() {
    let server = SubsystemMcpServer::new(
        MockSubsystem::with_tools("my-subsystem", vec![]),
        WireFormat::default(),
    );
    assert_eq!(server.get_info().server_info.name, "my-subsystem");
}
}