use secrecy::{ExposeSecret, SecretString};
use serde_json::Value;
use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
#[cfg(unix)]
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use tracing::{debug, info};
use turbomcp_client::Client;
use turbomcp_protocol::types::{
GetPromptResult, Prompt, ReadResourceResult, Resource, ResourceTemplate, Tool,
};
use turbomcp_protocol::{Error, PROTOCOL_VERSION};
#[cfg(unix)]
use turbomcp_transport::UnixTransport;
use turbomcp_transport::{
ChildProcessConfig, ChildProcessTransport, TcpTransport, Transport,
WebSocketBidirectionalConfig, WebSocketBidirectionalTransport,
streamable_http_client::{StreamableHttpClientConfig, StreamableHttpClientTransport},
};
use crate::error::{ProxyError, ProxyResult};
use crate::introspection::{
EmptyCapability, LoggingCapability, PromptSpec, PromptsCapability, ResourceSpec,
ResourcesCapability, ServerCapabilities, ServerInfo, ServerSpec, ToolAnnotations,
ToolInputSchema, ToolOutputSchema, ToolSpec, ToolsCapability,
};
/// Boxed, `Send` future returned by every [`ProxyClient`] method.
///
/// It resolves to `Result<T, Error>` and may borrow for the lifetime `'a`.
type ClientFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send + 'a>>;
/// Transport-agnostic client interface used to talk to a backend.
///
/// Every method returns a boxed [`ClientFuture`] instead of being an
/// `async fn`, which lets the trait be used as `dyn ProxyClient`
/// (see `BackendConnector::client`). Implementors must be `Send + Sync`.
pub trait ProxyClient: Send + Sync {
/// Lists tools; resolves to the `Tool` descriptors.
fn list_tools(&self) -> ClientFuture<'_, Vec<Tool>>;
/// Lists resources; resolves to the `Resource` descriptors.
fn list_resources(&self) -> ClientFuture<'_, Vec<Resource>>;
/// Lists resource templates; resolves to the `ResourceTemplate` descriptors.
fn list_resource_templates(&self) -> ClientFuture<'_, Vec<ResourceTemplate>>;
/// Lists prompts; resolves to the `Prompt` descriptors.
fn list_prompts(&self) -> ClientFuture<'_, Vec<Prompt>>;
/// Calls the tool called `name` with optional `arguments`; resolves to a JSON value.
///
/// The returned future is tied to `&self` only (elided `'_`), not to `name`,
/// so implementations must copy `name` if the future needs it.
fn call_tool(
&self,
name: &str,
arguments: Option<HashMap<String, Value>>,
) -> ClientFuture<'_, Value>;
/// Reads the resource identified by `uri`.
fn read_resource(&self, uri: &str) -> ClientFuture<'_, ReadResourceResult>;
/// Fetches the prompt called `name`, passing optional `arguments`.
fn get_prompt(
&self,
name: &str,
arguments: Option<HashMap<String, Value>>,
) -> ClientFuture<'_, GetPromptResult>;
}
/// [`ProxyClient`] implementation backed by a `Client` over a concrete transport `T`.
struct ConcreteProxyClient<T: Transport + 'static> {
// Shared handle; each trait method clones this `Arc` into the future it returns.
client: Arc<Client<T>>,
}
/// Forwards every [`ProxyClient`] call to the wrapped `Client<T>`.
///
/// Each method clones the shared `Arc` and copies any borrowed string
/// argument, so the returned future owns everything it needs.
impl<T: Transport + 'static> ProxyClient for ConcreteProxyClient<T> {
    /// Delegates to `Client::list_tools`.
    fn list_tools(&self) -> ClientFuture<'_, Vec<Tool>> {
        let backend = Arc::clone(&self.client);
        Box::pin(async move { backend.list_tools().await })
    }

    /// Delegates to `Client::list_resources`.
    fn list_resources(&self) -> ClientFuture<'_, Vec<Resource>> {
        let backend = Arc::clone(&self.client);
        Box::pin(async move { backend.list_resources().await })
    }

    /// Collects resource templates across pages.
    ///
    /// Paging stops when a page carries no next cursor, when a page is
    /// empty, or after `MAX_PAGES` requests, whichever comes first.
    fn list_resource_templates(&self) -> ClientFuture<'_, Vec<ResourceTemplate>> {
        // Hard cap so a backend that keeps handing out cursors cannot make
        // this loop run forever.
        const MAX_PAGES: usize = 1000;

        let backend = Arc::clone(&self.client);
        Box::pin(async move {
            let mut collected = Vec::new();
            let mut cursor = None;
            for _ in 0..MAX_PAGES {
                let page = backend.list_resource_templates_paginated(cursor).await?;
                let has_items = !page.resource_templates.is_empty();
                collected.extend(page.resource_templates);
                cursor = match page.next_cursor {
                    // Only follow the cursor when the page actually had content.
                    Some(next) if has_items => Some(next),
                    _ => break,
                };
            }
            Ok(collected)
        })
    }

    /// Delegates to `Client::list_prompts`.
    fn list_prompts(&self) -> ClientFuture<'_, Vec<Prompt>> {
        let backend = Arc::clone(&self.client);
        Box::pin(async move { backend.list_prompts().await })
    }

    /// Calls the named tool (third `Client::call_tool` argument is `None`)
    /// and serializes the typed result into a JSON value.
    fn call_tool(
        &self,
        name: &str,
        arguments: Option<HashMap<String, Value>>,
    ) -> ClientFuture<'_, Value> {
        let backend = Arc::clone(&self.client);
        let tool_name = name.to_owned();
        Box::pin(async move {
            let outcome = backend.call_tool(&tool_name, arguments, None).await?;
            let as_json = serde_json::to_value(outcome)?;
            Ok(as_json)
        })
    }

    /// Delegates to `Client::read_resource`.
    fn read_resource(&self, uri: &str) -> ClientFuture<'_, ReadResourceResult> {
        let backend = Arc::clone(&self.client);
        let resource_uri = uri.to_owned();
        Box::pin(async move { backend.read_resource(&resource_uri).await })
    }

    /// Delegates to `Client::get_prompt`.
    fn get_prompt(
        &self,
        name: &str,
        arguments: Option<HashMap<String, Value>>,
    ) -> ClientFuture<'_, GetPromptResult> {
        let backend = Arc::clone(&self.client);
        let prompt_name = name.to_owned();
        Box::pin(async move { backend.get_prompt(&prompt_name, arguments).await })
    }
}
/// How to reach a backend server; consumed by `BackendConnector::new`.
#[derive(Debug, Clone)]
pub enum BackendTransport {
/// Child-process backend (connected through `ChildProcessTransport`).
Stdio {
/// Program to launch.
command: String,
/// Arguments passed to `command`.
args: Vec<String>,
/// Working directory for the child process, if any.
working_dir: Option<String>,
},
/// HTTP backend (connected through `StreamableHttpClientTransport`).
Http {
/// Base URL of the server.
url: String,
/// Endpoint path; `BackendConnector::new` falls back to `"/mcp"` when `None`.
endpoint_path: Option<String>,
/// Optional auth token, kept as a secret until the transport config is built.
auth_token: Option<SecretString>,
},
/// TCP backend (connected through `TcpTransport`).
Tcp {
/// Remote host. `BackendConnector::new` parses `"{host}:{port}"` as a
/// `SocketAddr`, so this must be an IP literal (IPv6 in brackets); a
/// DNS name is rejected as an invalid address.
host: String,
/// Remote port.
port: u16,
},
/// Unix domain socket backend (Unix targets only).
#[cfg(unix)]
Unix {
/// Filesystem path of the socket.
path: String,
},
/// WebSocket backend (connected through `WebSocketBidirectionalTransport`).
WebSocket {
/// WebSocket URL to connect to.
url: String,
},
}
/// Maps a transport variant to a short static label.
///
/// The label carries no payload data (no command line, URL, or token), so
/// it is safe to attach to log records.
fn backend_transport_kind(t: &BackendTransport) -> &'static str {
    match t {
        BackendTransport::WebSocket { .. } => "websocket",
        #[cfg(unix)]
        BackendTransport::Unix { .. } => "unix",
        BackendTransport::Tcp { .. } => "tcp",
        BackendTransport::Http { .. } => "http",
        BackendTransport::Stdio { .. } => "stdio",
    }
}
/// Configuration for connecting to a single backend server.
#[derive(Debug, Clone)]
pub struct BackendConfig {
/// Transport used to reach the backend.
pub transport: BackendTransport,
/// Client name.
/// NOTE(review): not read by `BackendConnector::new` in this file — confirm where it is consumed.
pub client_name: String,
/// Client version.
/// NOTE(review): not read by `BackendConnector::new` in this file — confirm where it is consumed.
pub client_version: String,
}
/// Handle to an initialized backend server.
///
/// All fields are behind `Arc`, so cloning the connector shares the same
/// client, cached spec, and initialize result.
#[derive(Clone)]
pub struct BackendConnector {
// Type-erased client; the concrete transport is chosen in `new`.
client: Arc<dyn ProxyClient>,
// Configuration the connector was built from; shown by the `Debug` impl.
#[allow(dead_code)] config: Arc<BackendConfig>,
// Spec produced by the most recent `introspect` call; `None` until then.
spec: Arc<tokio::sync::Mutex<Option<ServerSpec>>>,
// Result of the initialize call; source of server info and capabilities during introspection.
init_result: Arc<turbomcp_client::InitializeResult>,
}
/// Manual `Debug`: prints the configuration and a placeholder for the
/// mutex-guarded spec, and marks the output as non-exhaustive because the
/// remaining fields are omitted.
impl std::fmt::Debug for BackendConnector {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("BackendConnector");
        out.field("config", &self.config);
        // The spec sits behind an async mutex, so only a marker is printed.
        out.field("spec", &"<Mutex>");
        out.finish_non_exhaustive()
    }
}
impl BackendConnector {
#[allow(clippy::too_many_lines)]
pub async fn new(config: BackendConfig) -> ProxyResult<Self> {
info!(
transport_kind = backend_transport_kind(&config.transport),
"Creating backend connector"
);
let (client, init_result): (Arc<dyn ProxyClient>, _) = match &config.transport {
BackendTransport::Stdio {
command,
args,
working_dir,
} => {
let process_config = ChildProcessConfig {
command: command.clone(),
args: args.clone(),
working_directory: working_dir.clone(),
environment: None,
..Default::default()
};
let transport = ChildProcessTransport::new(process_config);
transport.connect().await.map_err(|e| {
ProxyError::backend(format!("Failed to connect to subprocess: {e}"))
})?;
debug!("STDIO backend connected: {} {:?}", command, args);
let client = Client::new(transport);
let init_result = client.initialize().await.map_err(|e| {
ProxyError::backend(format!("Failed to initialize backend: {e}"))
})?;
let proxy_client: Arc<dyn ProxyClient> = Arc::new(ConcreteProxyClient {
client: Arc::new(client),
});
(proxy_client, init_result)
}
BackendTransport::Http {
url,
endpoint_path,
auth_token,
} => {
let http_config = StreamableHttpClientConfig {
base_url: url.clone(),
endpoint_path: endpoint_path.clone().unwrap_or_else(|| "/mcp".to_string()),
timeout: std::time::Duration::from_secs(30),
auth_token: auth_token.as_ref().map(|s| s.expose_secret().to_string()),
..Default::default()
};
let transport = StreamableHttpClientTransport::new(http_config).map_err(|e| {
ProxyError::backend(format!("Failed to build HTTP transport: {e}"))
})?;
transport.connect().await.map_err(|e| {
ProxyError::backend(format!("Failed to connect to HTTP backend: {e}"))
})?;
debug!("HTTP backend connected: {}", url);
let client = Client::new(transport);
let init_result = client.initialize().await.map_err(|e| {
ProxyError::backend(format!("Failed to initialize backend: {e}"))
})?;
let proxy_client: Arc<dyn ProxyClient> = Arc::new(ConcreteProxyClient {
client: Arc::new(client),
});
(proxy_client, init_result)
}
BackendTransport::Tcp { host, port } => {
let addr = format!("{host}:{port}")
.parse::<SocketAddr>()
.map_err(|e| ProxyError::backend(format!("Invalid TCP address: {e}")))?;
let transport =
TcpTransport::new_client(SocketAddr::from(([127, 0, 0, 1], 0)), addr);
transport.connect().await.map_err(|e| {
ProxyError::backend(format!("Failed to connect to TCP backend: {e}"))
})?;
debug!("TCP backend connected: {}:{}", host, port);
let client = Client::new(transport);
let init_result = client.initialize().await.map_err(|e| {
ProxyError::backend(format!("Failed to initialize backend: {e}"))
})?;
let proxy_client: Arc<dyn ProxyClient> = Arc::new(ConcreteProxyClient {
client: Arc::new(client),
});
(proxy_client, init_result)
}
#[cfg(unix)]
BackendTransport::Unix { path } => {
let socket_path = PathBuf::from(path);
let transport = UnixTransport::new_client(socket_path.clone());
transport.connect().await.map_err(|e| {
ProxyError::backend(format!("Failed to connect to Unix socket: {e}"))
})?;
debug!("Unix socket backend connected: {}", path);
let client = Client::new(transport);
let init_result = client.initialize().await.map_err(|e| {
ProxyError::backend(format!("Failed to initialize backend: {e}"))
})?;
let proxy_client: Arc<dyn ProxyClient> = Arc::new(ConcreteProxyClient {
client: Arc::new(client),
});
(proxy_client, init_result)
}
BackendTransport::WebSocket { url } => {
let ws_config = WebSocketBidirectionalConfig {
url: Some(url.clone()),
..Default::default()
};
let transport = WebSocketBidirectionalTransport::new(ws_config)
.await
.map_err(|e| {
ProxyError::backend(format!("Failed to connect to WebSocket: {e}"))
})?;
debug!("WebSocket backend connected: {}", url);
let client = Client::new(transport);
let init_result = client.initialize().await.map_err(|e| {
ProxyError::backend(format!("Failed to initialize backend: {e}"))
})?;
let proxy_client: Arc<dyn ProxyClient> = Arc::new(ConcreteProxyClient {
client: Arc::new(client),
});
(proxy_client, init_result)
}
};
info!("Backend initialized successfully");
Ok(Self {
client,
config: Arc::new(config),
spec: Arc::new(tokio::sync::Mutex::new(None)),
init_result: Arc::new(init_result),
})
}
/// Queries the backend for its tools, resources, templates, and prompts,
/// caches the resulting spec, and returns it.
///
/// # Errors
///
/// Propagates any error from the underlying list calls.
pub async fn introspect(&self) -> ProxyResult<ServerSpec> {
    debug!("Introspecting backend server");
    let fresh = self.introspect_via_client().await?;

    // Build the cached copy before taking the lock so the lock is held
    // only for the assignment itself.
    let cached_copy = Some(fresh.clone());
    *self.spec.lock().await = cached_copy;

    info!(
        "Backend introspection complete: {} tools, {} resources, {} prompts",
        fresh.tools.len(),
        fresh.resources.len(),
        fresh.prompts.len()
    );
    Ok(fresh)
}
/// Builds a [`ServerSpec`] from four sequential list calls plus the stored
/// initialize result.
///
/// The calls run in order (tools, resources, resource templates, prompts)
/// and the first failure aborts introspection. `protocol_version` is this
/// crate's `PROTOCOL_VERSION` constant and `instructions` is always `None`.
async fn introspect_via_client(&self) -> ProxyResult<ServerSpec> {
    let backend = &self.client;

    let raw_tools = backend
        .list_tools()
        .await
        .map_err(|err| ProxyError::backend(format!("Failed to list tools: {err}")))?;
    let raw_resources = backend
        .list_resources()
        .await
        .map_err(|err| ProxyError::backend(format!("Failed to list resources: {err}")))?;
    let raw_templates = backend.list_resource_templates().await.map_err(|err| {
        ProxyError::backend(format!("Failed to list resource templates: {err}"))
    })?;
    let raw_prompts = backend
        .list_prompts()
        .await
        .map_err(|err| ProxyError::backend(format!("Failed to list prompts: {err}")))?;

    let handshake = &self.init_result;
    let identity = &handshake.server_info;
    let server_info = ServerInfo {
        name: identity.name.clone(),
        version: identity.version.clone(),
        title: identity.title.clone(),
    };
    let capabilities = Self::convert_capabilities(&handshake.server_capabilities);

    Ok(ServerSpec {
        server_info,
        protocol_version: PROTOCOL_VERSION.to_string(),
        capabilities,
        tools: Self::convert_tools(raw_tools),
        resources: Self::convert_resources(raw_resources),
        resource_templates: Self::convert_resource_templates(raw_templates),
        prompts: Self::convert_prompts(raw_prompts),
        instructions: None,
    })
}
/// Translates protocol-level server capabilities into the introspection
/// model.
///
/// `logging` and `completions` are reduced to presence markers; prompts,
/// resources, and tools carry over their flags; `experimental` is cloned.
fn convert_capabilities(
    caps: &turbomcp_protocol::types::ServerCapabilities,
) -> ServerCapabilities {
    let logging = caps.logging.as_ref().map(|_| LoggingCapability {});
    let completions = caps.completions.as_ref().map(|_| EmptyCapability {});
    let prompts = caps.prompts.as_ref().map(|src| PromptsCapability {
        list_changed: src.list_changed,
    });
    let resources = caps.resources.as_ref().map(|src| ResourcesCapability {
        subscribe: src.subscribe,
        list_changed: src.list_changed,
    });
    let tools = caps.tools.as_ref().map(|src| ToolsCapability {
        list_changed: src.list_changed,
    });

    ServerCapabilities {
        logging,
        completions,
        prompts,
        resources,
        tools,
        experimental: caps.experimental.clone(),
    }
}
/// Converts protocol `Tool` values into [`ToolSpec`]s, preserving order.
///
/// Schemas go through `convert_input_schema` / `convert_output_schema`;
/// annotation hints are copied field by field.
fn convert_tools(tools: Vec<Tool>) -> Vec<ToolSpec> {
    let mut specs = Vec::with_capacity(tools.len());
    for tool in tools {
        let annotations = tool.annotations.map(|hints| ToolAnnotations {
            title: hints.title,
            read_only_hint: hints.read_only_hint,
            destructive_hint: hints.destructive_hint,
            idempotent_hint: hints.idempotent_hint,
            open_world_hint: hints.open_world_hint,
        });
        specs.push(ToolSpec {
            name: tool.name,
            title: tool.title,
            description: tool.description,
            input_schema: convert_input_schema(tool.input_schema),
            output_schema: tool.output_schema.map(convert_output_schema),
            annotations,
        });
    }
    specs
}
/// Converts protocol `Resource` values into [`ResourceSpec`]s, preserving
/// order.
///
/// Annotations are carried over by a serde round-trip (serialize to JSON,
/// deserialize into the introspection `fields`). The round-trip is
/// best-effort: if either step fails, the default value is used rather
/// than an error.
fn convert_resources(resources: Vec<Resource>) -> Vec<ResourceSpec> {
    resources
        .into_iter()
        .map(|r| ResourceSpec {
            // `r` is owned and consumed here, so the URI is moved, not cloned.
            uri: r.uri,
            name: r.name,
            title: r.title,
            description: r.description,
            mime_type: r.mime_type,
            size: r.size,
            annotations: r.annotations.map(|ann| crate::introspection::Annotations {
                fields: serde_json::from_value(serde_json::to_value(ann).unwrap_or_default())
                    .unwrap_or_default(),
            }),
        })
        .collect()
}
/// Converts protocol `ResourceTemplate` values into introspection
/// `ResourceTemplateSpec`s, preserving order.
///
/// Annotations are carried over by a best-effort serde round-trip: if
/// either step fails, the default value is used.
fn convert_resource_templates(
    templates: Vec<ResourceTemplate>,
) -> Vec<crate::introspection::ResourceTemplateSpec> {
    let mut specs = Vec::with_capacity(templates.len());
    for template in templates {
        let annotations = template.annotations.map(|ann| {
            let as_json = serde_json::to_value(ann).unwrap_or_default();
            crate::introspection::Annotations {
                fields: serde_json::from_value(as_json).unwrap_or_default(),
            }
        });
        specs.push(crate::introspection::ResourceTemplateSpec {
            uri_template: template.uri_template,
            name: template.name,
            title: template.title,
            description: template.description,
            mime_type: template.mime_type,
            annotations,
        });
    }
    specs
}
/// Converts protocol `Prompt` values into [`PromptSpec`]s, preserving
/// order.
///
/// A prompt without an argument list yields an empty one. Argument
/// `title` is always set to `None`.
fn convert_prompts(prompts: Vec<Prompt>) -> Vec<PromptSpec> {
    let mut specs = Vec::with_capacity(prompts.len());
    for prompt in prompts {
        // `Option<Vec<_>>` flattens to zero items when the list is absent.
        let arguments = prompt
            .arguments
            .into_iter()
            .flatten()
            .map(|arg| crate::introspection::PromptArgument {
                name: arg.name,
                title: None,
                description: arg.description,
                required: arg.required,
            })
            .collect();
        specs.push(PromptSpec {
            name: prompt.name,
            title: prompt.title,
            description: prompt.description,
            arguments,
        });
    }
    specs
}
/// Returns a clone of the spec cached by the last successful
/// `introspect` call, or `None` if none has been stored yet.
#[must_use]
pub async fn spec(&self) -> Option<ServerSpec> {
    let cached = self.spec.lock().await;
    (*cached).clone()
}
/// Calls the backend tool `name` with optional `arguments`.
///
/// # Errors
///
/// Wraps a client failure in `ProxyError::backend_with_code`, keeping the
/// error's JSON-RPC code.
pub async fn call_tool(
    &self,
    name: &str,
    arguments: Option<HashMap<String, Value>>,
) -> ProxyResult<Value> {
    debug!("Calling backend tool: {}", name);
    let outcome = self.client.call_tool(name, arguments).await;
    outcome.map_err(|err| {
        let message = format!("Tool call failed: {err}");
        ProxyError::backend_with_code(message, err.jsonrpc_error_code())
    })
}
/// Lists the backend's tools through the underlying client.
///
/// # Errors
///
/// Wraps a client failure in `ProxyError::backend_with_code`, keeping the
/// error's JSON-RPC code.
pub async fn list_tools(&self) -> ProxyResult<Vec<Tool>> {
    let outcome = self.client.list_tools().await;
    outcome.map_err(|err| {
        let message = format!("Failed to list tools: {err}");
        ProxyError::backend_with_code(message, err.jsonrpc_error_code())
    })
}
/// Lists the backend's resources through the underlying client.
///
/// # Errors
///
/// Wraps a client failure in `ProxyError::backend_with_code`, keeping the
/// error's JSON-RPC code.
pub async fn list_resources(&self) -> ProxyResult<Vec<Resource>> {
    let outcome = self.client.list_resources().await;
    outcome.map_err(|err| {
        let message = format!("Failed to list resources: {err}");
        ProxyError::backend_with_code(message, err.jsonrpc_error_code())
    })
}
/// Lists the backend's resource templates through the underlying client.
///
/// # Errors
///
/// Wraps a client failure in `ProxyError::backend_with_code`, keeping the
/// error's JSON-RPC code.
pub async fn list_resource_templates(&self) -> ProxyResult<Vec<ResourceTemplate>> {
    let outcome = self.client.list_resource_templates().await;
    outcome.map_err(|err| {
        let message = format!("Failed to list resource templates: {err}");
        ProxyError::backend_with_code(message, err.jsonrpc_error_code())
    })
}
/// Reads the resource identified by `uri` through the underlying client.
///
/// # Errors
///
/// Wraps a client failure in `ProxyError::backend_with_code`, keeping the
/// error's JSON-RPC code.
pub async fn read_resource(&self, uri: &str) -> ProxyResult<ReadResourceResult> {
    let outcome = self.client.read_resource(uri).await;
    outcome.map_err(|err| {
        let message = format!("Failed to read resource: {err}");
        ProxyError::backend_with_code(message, err.jsonrpc_error_code())
    })
}
/// Lists the backend's prompts through the underlying client.
///
/// # Errors
///
/// Wraps a client failure in `ProxyError::backend_with_code`, keeping the
/// error's JSON-RPC code.
pub async fn list_prompts(&self) -> ProxyResult<Vec<Prompt>> {
    let outcome = self.client.list_prompts().await;
    outcome.map_err(|err| {
        let message = format!("Failed to list prompts: {err}");
        ProxyError::backend_with_code(message, err.jsonrpc_error_code())
    })
}
/// Fetches the prompt `name` with optional `arguments` through the
/// underlying client.
///
/// # Errors
///
/// Wraps a client failure in `ProxyError::backend_with_code`, keeping the
/// error's JSON-RPC code.
pub async fn get_prompt(
    &self,
    name: &str,
    arguments: Option<HashMap<String, Value>>,
) -> ProxyResult<turbomcp_protocol::types::GetPromptResult> {
    let outcome = self.client.get_prompt(name, arguments).await;
    outcome.map_err(|err| {
        let message = format!("Failed to get prompt: {err}");
        ProxyError::backend_with_code(message, err.jsonrpc_error_code())
    })
}
}
/// Test-only [`ProxyClient`] that serves fixed, in-memory listings.
#[cfg(test)]
#[derive(Clone, Default)]
struct StaticProxyClient {
// Returned as-is by `list_tools`.
tools: Vec<Tool>,
// Returned as-is by `list_resources`.
resources: Vec<Resource>,
// Returned as-is by `list_resource_templates`.
resource_templates: Vec<ResourceTemplate>,
// Returned as-is by `list_prompts`.
prompts: Vec<Prompt>,
}
/// Serves the stored listings; every call that would need a live backend
/// fails with a "method not found" error.
#[cfg(test)]
impl ProxyClient for StaticProxyClient {
    /// Resolves to a clone of the stored tools.
    fn list_tools(&self) -> ClientFuture<'_, Vec<Tool>> {
        let snapshot = self.tools.clone();
        Box::pin(async move { Ok(snapshot) })
    }

    /// Resolves to a clone of the stored resources.
    fn list_resources(&self) -> ClientFuture<'_, Vec<Resource>> {
        let snapshot = self.resources.clone();
        Box::pin(async move { Ok(snapshot) })
    }

    /// Resolves to a clone of the stored resource templates.
    fn list_resource_templates(&self) -> ClientFuture<'_, Vec<ResourceTemplate>> {
        let snapshot = self.resource_templates.clone();
        Box::pin(async move { Ok(snapshot) })
    }

    /// Resolves to a clone of the stored prompts.
    fn list_prompts(&self) -> ClientFuture<'_, Vec<Prompt>> {
        let snapshot = self.prompts.clone();
        Box::pin(async move { Ok(snapshot) })
    }

    /// Always fails: the static backend cannot execute tools.
    fn call_tool(
        &self,
        _name: &str,
        _arguments: Option<HashMap<String, Value>>,
    ) -> ClientFuture<'_, Value> {
        Box::pin(async {
            let unsupported = Error::method_not_found("test backend has no tools");
            Err(unsupported)
        })
    }

    /// Always fails: the static backend cannot read resources.
    fn read_resource(&self, _uri: &str) -> ClientFuture<'_, ReadResourceResult> {
        Box::pin(async {
            let unsupported = Error::method_not_found("test backend has no resources");
            Err(unsupported)
        })
    }

    /// Always fails: the static backend cannot render prompts.
    fn get_prompt(
        &self,
        _name: &str,
        _arguments: Option<HashMap<String, Value>>,
    ) -> ClientFuture<'_, GetPromptResult> {
        Box::pin(async {
            let unsupported = Error::method_not_found("test backend has no prompts");
            Err(unsupported)
        })
    }
}
#[cfg(test)]
impl BackendConnector {
    /// Builds a connector over in-memory listings, without any process or
    /// network I/O.
    ///
    /// The connector reports a placeholder stdio configuration and a
    /// synthetic initialize result ("test-backend" 1.0.0 with default
    /// capabilities).
    pub(crate) fn from_static_data_for_test(
        tools: Vec<Tool>,
        resources: Vec<Resource>,
        resource_templates: Vec<ResourceTemplate>,
        prompts: Vec<Prompt>,
    ) -> Self {
        let placeholder_config = BackendConfig {
            transport: BackendTransport::Stdio {
                command: String::from("test"),
                args: Vec::new(),
                working_dir: None,
            },
            client_name: String::from("test-proxy"),
            client_version: String::from("1.0.0"),
        };
        let server_info = turbomcp_protocol::types::Implementation {
            name: String::from("test-backend"),
            version: String::from("1.0.0"),
            ..Default::default()
        };
        let handshake = turbomcp_client::InitializeResult {
            server_info,
            server_capabilities: turbomcp_protocol::types::ServerCapabilities::default(),
        };

        Self {
            client: Arc::new(StaticProxyClient {
                tools,
                resources,
                resource_templates,
                prompts,
            }),
            config: Arc::new(placeholder_config),
            spec: Arc::new(tokio::sync::Mutex::new(None)),
            init_result: Arc::new(handshake),
        }
    }
}
/// Converts a protocol tool input schema into the introspection
/// [`ToolInputSchema`].
///
/// * `schema_type` falls back to `"object"` when absent or not a string.
/// * `properties` is copied into a `HashMap` when the schema exposes an
///   object of properties.
/// * `additional` holds `additionalProperties` (when set) followed by all
///   extra keywords, so an extra keyword with that same key wins.
fn convert_input_schema(schema: turbomcp_protocol::types::ToolInputSchema) -> ToolInputSchema {
    // The borrowing reads come first: `properties_as_object` needs the
    // whole `schema` intact, before any field is moved out below.
    let schema_type = schema
        .schema_type
        .as_ref()
        .and_then(|raw| raw.as_str())
        .map_or_else(|| "object".to_string(), str::to_owned);
    let properties = schema.properties_as_object().map(|object| {
        let mut table = HashMap::new();
        for (key, value) in object.iter() {
            table.insert(key.clone(), value.clone());
        }
        table
    });

    let mut additional: HashMap<_, _> = schema
        .additional_properties
        .into_iter()
        .map(|setting| ("additionalProperties".to_string(), setting))
        .collect();
    additional.extend(schema.extra_keywords);

    ToolInputSchema {
        schema_type,
        properties,
        required: schema.required,
        additional,
    }
}
/// Converts a protocol tool output schema into the introspection
/// [`ToolOutputSchema`].
///
/// * `schema_type` falls back to `"object"` when absent or not a string.
/// * `properties` is copied into a `HashMap` only when the source value is
///   a JSON object; otherwise it is `None`.
/// * `additional` holds `additionalProperties` (when set) followed by all
///   extra keywords, so an extra keyword with that same key wins.
fn convert_output_schema(schema: turbomcp_protocol::types::ToolOutputSchema) -> ToolOutputSchema {
    let schema_type = schema
        .schema_type
        .as_ref()
        .and_then(|raw| raw.as_str())
        .map_or_else(|| "object".to_string(), str::to_owned);
    let properties = schema
        .properties
        .as_ref()
        .and_then(|props| props.as_object())
        .map(|object| {
            let mut table = HashMap::new();
            for (key, value) in object.iter() {
                table.insert(key.clone(), value.clone());
            }
            table
        });

    let mut additional: HashMap<_, _> = schema
        .additional_properties
        .into_iter()
        .map(|setting| ("additionalProperties".to_string(), setting))
        .collect();
    additional.extend(schema.extra_keywords);

    ToolOutputSchema {
        schema_type,
        properties,
        required: schema.required,
        additional,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A `BackendConfig` keeps the values it was constructed with.
    #[test]
    fn test_backend_config_creation() {
        let config = BackendConfig {
            transport: BackendTransport::Stdio {
                command: "python".to_string(),
                args: vec!["server.py".to_string()],
                working_dir: None,
            },
            client_name: "test-proxy".to_string(),
            client_version: "1.0.0".to_string(),
        };
        assert_eq!(config.client_name, "test-proxy");
        assert_eq!(config.client_version, "1.0.0");
    }

    /// Introspection over static data keeps titles, schemas, annotations,
    /// sizes, and resource templates, and templates are also forwarded
    /// unchanged by `list_resource_templates`.
    #[tokio::test]
    async fn test_static_backend_introspection_preserves_resource_templates() {
        let tool = Tool {
            name: "inspect".to_string(),
            title: Some("Inspect".to_string()),
            description: Some("Inspect an item".to_string()),
            output_schema: Some(turbomcp_protocol::types::ToolOutputSchema::empty()),
            annotations: Some(turbomcp_protocol::types::ToolAnnotations {
                title: Some("Inspect".to_string()),
                read_only_hint: Some(true),
                ..Default::default()
            }),
            ..Default::default()
        };
        let resource = Resource {
            uri: "repo://one".to_string(),
            name: "repo-one".to_string(),
            title: Some("Repo One".to_string()),
            description: Some("Repository one".to_string()),
            mime_type: Some("application/json".to_string()),
            size: Some(42),
            ..Default::default()
        };
        let template = ResourceTemplate {
            uri_template: "repo://{owner}/{name}".to_string(),
            name: "repo".to_string(),
            title: Some("Repository".to_string()),
            description: Some("Repository metadata".to_string()),
            mime_type: Some("application/json".to_string()),
            ..Default::default()
        };
        let prompt = Prompt {
            name: "summarize".to_string(),
            title: Some("Summarize".to_string()),
            description: Some("Summarize input".to_string()),
            ..Default::default()
        };
        let backend = BackendConnector::from_static_data_for_test(
            vec![tool],
            vec![resource],
            vec![template],
            vec![prompt],
        );
        let spec = backend.introspect().await.expect("introspection");
        assert_eq!(spec.tools[0].title.as_deref(), Some("Inspect"));
        assert!(spec.tools[0].output_schema.is_some());
        assert_eq!(
            spec.tools[0]
                .annotations
                .as_ref()
                .and_then(|a| a.read_only_hint),
            Some(true)
        );
        assert_eq!(spec.resources[0].title.as_deref(), Some("Repo One"));
        assert_eq!(spec.resources[0].size, Some(42));
        assert_eq!(
            spec.resource_templates[0].uri_template,
            "repo://{owner}/{name}"
        );
        assert_eq!(
            spec.resource_templates[0].title.as_deref(),
            Some("Repository")
        );
        assert_eq!(spec.prompts[0].title.as_deref(), Some("Summarize"));
        let forwarded = backend
            .list_resource_templates()
            .await
            .expect("forwarded templates");
        assert_eq!(forwarded[0].uri_template, "repo://{owner}/{name}");
        assert_eq!(forwarded[0].title.as_deref(), Some("Repository"));
    }

    /// End-to-end check against the `manual_server` example over stdio.
    ///
    /// Ignored by default because it shells out to `cargo run`. When it is
    /// run explicitly, any connection or introspection failure fails the
    /// test instead of being skipped silently.
    #[tokio::test]
    #[ignore = "Requires building manual_server example via cargo run"]
    async fn test_backend_connector_with_echo() {
        let config = BackendConfig {
            transport: BackendTransport::Stdio {
                command: "cargo".to_string(),
                args: vec![
                    "run".to_string(),
                    "--package".to_string(),
                    "turbomcp-server".to_string(),
                    "--example".to_string(),
                    "manual_server".to_string(),
                ],
                // Use this crate's manifest directory instead of a
                // machine-specific absolute path. Falls back to the current
                // directory when not built by Cargo.
                // NOTE(review): assumes this crate lives in the same Cargo
                // workspace as `turbomcp-server` — confirm.
                working_dir: option_env!("CARGO_MANIFEST_DIR").map(str::to_owned),
            },
            client_name: "test-proxy".to_string(),
            client_version: "1.0.0".to_string(),
        };
        let backend = BackendConnector::new(config)
            .await
            .expect("backend should start and initialize");
        let spec = backend
            .introspect()
            .await
            .expect("introspection should succeed");
        assert!(!spec.tools.is_empty(), "Should have at least one tool");
    }
}