use serde_json::Value;
use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
#[cfg(unix)]
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use tracing::{debug, info};
use turbomcp_client::Client;
use turbomcp_protocol::types::{GetPromptResult, Prompt, ReadResourceResult, Resource, Tool};
use turbomcp_protocol::{Error, PROTOCOL_VERSION};
#[cfg(unix)]
use turbomcp_transport::UnixTransport;
use turbomcp_transport::{
ChildProcessConfig, ChildProcessTransport, TcpTransport, Transport,
WebSocketBidirectionalConfig, WebSocketBidirectionalTransport,
streamable_http_client::{StreamableHttpClientConfig, StreamableHttpClientTransport},
};
use crate::error::{ProxyError, ProxyResult};
use crate::introspection::{
PromptSpec, PromptsCapability, ResourceSpec, ResourcesCapability, ServerCapabilities,
ServerInfo, ServerSpec, ToolInputSchema, ToolSpec, ToolsCapability,
};
type ClientFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send + 'a>>;
pub trait ProxyClient: Send + Sync {
fn list_tools(&self) -> ClientFuture<'_, Vec<Tool>>;
fn list_resources(&self) -> ClientFuture<'_, Vec<Resource>>;
fn list_prompts(&self) -> ClientFuture<'_, Vec<Prompt>>;
fn call_tool(
&self,
name: &str,
arguments: Option<HashMap<String, Value>>,
) -> ClientFuture<'_, Value>;
fn read_resource(&self, uri: &str) -> ClientFuture<'_, ReadResourceResult>;
fn get_prompt(
&self,
name: &str,
arguments: Option<HashMap<String, Value>>,
) -> ClientFuture<'_, GetPromptResult>;
}
struct ConcreteProxyClient<T: Transport + 'static> {
client: Arc<Client<T>>,
}
impl<T: Transport + 'static> ProxyClient for ConcreteProxyClient<T> {
fn list_tools(&self) -> ClientFuture<'_, Vec<Tool>> {
let client = self.client.clone();
Box::pin(async move { client.list_tools().await })
}
fn list_resources(&self) -> ClientFuture<'_, Vec<Resource>> {
let client = self.client.clone();
Box::pin(async move { client.list_resources().await })
}
fn list_prompts(&self) -> ClientFuture<'_, Vec<Prompt>> {
let client = self.client.clone();
Box::pin(async move { client.list_prompts().await })
}
fn call_tool(
&self,
name: &str,
arguments: Option<HashMap<String, Value>>,
) -> ClientFuture<'_, Value> {
let client = self.client.clone();
let name = name.to_string();
Box::pin(async move {
let result = client.call_tool(&name, arguments, None).await?;
Ok(serde_json::to_value(result)?)
})
}
fn read_resource(&self, uri: &str) -> ClientFuture<'_, ReadResourceResult> {
let client = self.client.clone();
let uri = uri.to_string();
Box::pin(async move { client.read_resource(&uri).await })
}
fn get_prompt(
&self,
name: &str,
arguments: Option<HashMap<String, Value>>,
) -> ClientFuture<'_, GetPromptResult> {
let client = self.client.clone();
let name = name.to_string();
Box::pin(async move { client.get_prompt(&name, arguments).await })
}
}
#[derive(Debug, Clone)]
pub enum BackendTransport {
Stdio {
command: String,
args: Vec<String>,
working_dir: Option<String>,
},
Http {
url: String,
auth_token: Option<String>,
},
Tcp {
host: String,
port: u16,
},
#[cfg(unix)]
Unix {
path: String,
},
WebSocket {
url: String,
},
}
#[derive(Debug, Clone)]
pub struct BackendConfig {
pub transport: BackendTransport,
pub client_name: String,
pub client_version: String,
}
#[derive(Clone)]
pub struct BackendConnector {
client: Arc<dyn ProxyClient>,
#[allow(dead_code)] config: Arc<BackendConfig>,
spec: Arc<tokio::sync::Mutex<Option<ServerSpec>>>,
}
impl std::fmt::Debug for BackendConnector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BackendConnector")
.field("config", &self.config)
.field("spec", &"<Mutex>")
.finish_non_exhaustive()
}
}
impl BackendConnector {
#[allow(clippy::too_many_lines)]
pub async fn new(config: BackendConfig) -> ProxyResult<Self> {
info!("Creating backend connector: {:?}", config.transport);
let client: Arc<dyn ProxyClient> = match &config.transport {
BackendTransport::Stdio {
command,
args,
working_dir,
} => {
let process_config = ChildProcessConfig {
command: command.clone(),
args: args.clone(),
working_directory: working_dir.clone(),
environment: None,
..Default::default()
};
let transport = ChildProcessTransport::new(process_config);
transport.connect().await.map_err(|e| {
ProxyError::backend(format!("Failed to connect to subprocess: {e}"))
})?;
debug!("STDIO backend connected: {} {:?}", command, args);
let client = Client::new(transport);
let _init_result = client.initialize().await.map_err(|e| {
ProxyError::backend(format!("Failed to initialize backend: {e}"))
})?;
Arc::new(ConcreteProxyClient {
client: Arc::new(client),
})
}
BackendTransport::Http { url, auth_token } => {
let http_config = StreamableHttpClientConfig {
base_url: url.clone(),
endpoint_path: "/mcp".to_string(),
timeout: std::time::Duration::from_secs(30),
auth_token: auth_token.clone(),
..Default::default()
};
let transport = StreamableHttpClientTransport::new(http_config);
transport.connect().await.map_err(|e| {
ProxyError::backend(format!("Failed to connect to HTTP backend: {e}"))
})?;
debug!("HTTP backend connected: {}", url);
let client = Client::new(transport);
let _init_result = client.initialize().await.map_err(|e| {
ProxyError::backend(format!("Failed to initialize backend: {e}"))
})?;
Arc::new(ConcreteProxyClient {
client: Arc::new(client),
})
}
BackendTransport::Tcp { host, port } => {
let addr = format!("{host}:{port}")
.parse::<SocketAddr>()
.map_err(|e| ProxyError::backend(format!("Invalid TCP address: {e}")))?;
let transport = TcpTransport::new_client(
"127.0.0.1:0"
.parse()
.unwrap_or_else(|_| "127.0.0.1:0".parse().unwrap()),
addr,
);
transport.connect().await.map_err(|e| {
ProxyError::backend(format!("Failed to connect to TCP backend: {e}"))
})?;
debug!("TCP backend connected: {}:{}", host, port);
let client = Client::new(transport);
let _init_result = client.initialize().await.map_err(|e| {
ProxyError::backend(format!("Failed to initialize backend: {e}"))
})?;
Arc::new(ConcreteProxyClient {
client: Arc::new(client),
})
}
#[cfg(unix)]
BackendTransport::Unix { path } => {
let socket_path = PathBuf::from(path);
let transport = UnixTransport::new_client(socket_path.clone());
transport.connect().await.map_err(|e| {
ProxyError::backend(format!("Failed to connect to Unix socket: {e}"))
})?;
debug!("Unix socket backend connected: {}", path);
let client = Client::new(transport);
let _init_result = client.initialize().await.map_err(|e| {
ProxyError::backend(format!("Failed to initialize backend: {e}"))
})?;
Arc::new(ConcreteProxyClient {
client: Arc::new(client),
})
}
BackendTransport::WebSocket { url } => {
let ws_config = WebSocketBidirectionalConfig {
url: Some(url.clone()),
..Default::default()
};
let transport = WebSocketBidirectionalTransport::new(ws_config)
.await
.map_err(|e| {
ProxyError::backend(format!("Failed to connect to WebSocket: {e}"))
})?;
debug!("WebSocket backend connected: {}", url);
let client = Client::new(transport);
let _init_result = client.initialize().await.map_err(|e| {
ProxyError::backend(format!("Failed to initialize backend: {e}"))
})?;
Arc::new(ConcreteProxyClient {
client: Arc::new(client),
})
}
};
info!("Backend initialized successfully");
Ok(Self {
client,
config: Arc::new(config),
spec: Arc::new(tokio::sync::Mutex::new(None)),
})
}
pub async fn introspect(&self) -> ProxyResult<ServerSpec> {
debug!("Introspecting backend server");
let spec = self.introspect_via_client().await?;
*self.spec.lock().await = Some(spec.clone());
info!(
"Backend introspection complete: {} tools, {} resources, {} prompts",
spec.tools.len(),
spec.resources.len(),
spec.prompts.len()
);
Ok(spec)
}
async fn introspect_via_client(&self) -> ProxyResult<ServerSpec> {
let tools = self
.client
.list_tools()
.await
.map_err(|e| ProxyError::backend(format!("Failed to list tools: {e}")))?;
let resources = self
.client
.list_resources()
.await
.map_err(|e| ProxyError::backend(format!("Failed to list resources: {e}")))?;
let prompts = self
.client
.list_prompts()
.await
.map_err(|e| ProxyError::backend(format!("Failed to list prompts: {e}")))?;
Ok(ServerSpec {
server_info: Self::create_server_info(),
protocol_version: PROTOCOL_VERSION.to_string(),
capabilities: Self::create_capabilities(),
tools: Self::convert_tools(tools),
resources: Self::convert_resources(resources),
prompts: Self::convert_prompts(prompts),
resource_templates: Vec::new(),
instructions: None,
})
}
fn create_server_info() -> ServerInfo {
ServerInfo {
name: "backend-server".to_string(),
version: "unknown".to_string(),
title: None,
}
}
fn create_capabilities() -> ServerCapabilities {
ServerCapabilities {
tools: Some(ToolsCapability { list_changed: None }),
resources: Some(ResourcesCapability {
subscribe: None,
list_changed: None,
}),
prompts: Some(PromptsCapability { list_changed: None }),
logging: None,
completions: None,
experimental: None,
}
}
fn convert_tools(tools: Vec<Tool>) -> Vec<ToolSpec> {
tools
.into_iter()
.map(|t| {
let mut additional = HashMap::new();
if let Some(additional_props) = t.input_schema.additional_properties {
additional.insert(
"additionalProperties".to_string(),
Value::Bool(additional_props),
);
}
ToolSpec {
name: t.name,
title: None,
description: t.description,
input_schema: ToolInputSchema {
schema_type: t.input_schema.schema_type,
properties: t.input_schema.properties,
required: t.input_schema.required,
additional,
},
output_schema: None,
annotations: None,
}
})
.collect()
}
fn convert_resources(resources: Vec<Resource>) -> Vec<ResourceSpec> {
resources
.into_iter()
.map(|r| ResourceSpec {
uri: r.uri.to_string(),
name: r.name,
title: None,
description: r.description,
mime_type: r.mime_type.map(Into::into),
size: None,
annotations: None,
})
.collect()
}
fn convert_prompts(prompts: Vec<Prompt>) -> Vec<PromptSpec> {
prompts
.into_iter()
.map(|p| {
let arguments = p
.arguments
.unwrap_or_default()
.into_iter()
.map(|a| crate::introspection::PromptArgument {
name: a.name,
title: None,
description: a.description,
required: a.required,
})
.collect();
PromptSpec {
name: p.name,
title: None,
description: p.description,
arguments,
}
})
.collect()
}
#[must_use]
pub async fn spec(&self) -> Option<ServerSpec> {
self.spec.lock().await.clone()
}
pub async fn call_tool(
&self,
name: &str,
arguments: Option<HashMap<String, Value>>,
) -> ProxyResult<Value> {
debug!("Calling backend tool: {}", name);
self.client
.call_tool(name, arguments)
.await
.map_err(|e| ProxyError::backend(format!("Tool call failed: {e}")))
}
pub async fn list_tools(&self) -> ProxyResult<Vec<Tool>> {
self.client
.list_tools()
.await
.map_err(|e| ProxyError::backend(format!("Failed to list tools: {e}")))
}
pub async fn list_resources(&self) -> ProxyResult<Vec<Resource>> {
self.client
.list_resources()
.await
.map_err(|e| ProxyError::backend(format!("Failed to list resources: {e}")))
}
pub async fn read_resource(&self, uri: &str) -> ProxyResult<ReadResourceResult> {
self.client
.read_resource(uri)
.await
.map_err(|e| ProxyError::backend(format!("Failed to read resource: {e}")))
}
pub async fn list_prompts(&self) -> ProxyResult<Vec<Prompt>> {
self.client
.list_prompts()
.await
.map_err(|e| ProxyError::backend(format!("Failed to list prompts: {e}")))
}
pub async fn get_prompt(
&self,
name: &str,
arguments: Option<HashMap<String, Value>>,
) -> ProxyResult<turbomcp_protocol::types::GetPromptResult> {
self.client
.get_prompt(name, arguments)
.await
.map_err(|e| ProxyError::backend(format!("Failed to get prompt: {e}")))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backend_config_creation() {
let config = BackendConfig {
transport: BackendTransport::Stdio {
command: "python".to_string(),
args: vec!["server.py".to_string()],
working_dir: None,
},
client_name: "test-proxy".to_string(),
client_version: "1.0.0".to_string(),
};
assert_eq!(config.client_name, "test-proxy");
assert_eq!(config.client_version, "1.0.0");
}
#[tokio::test]
#[ignore = "Requires building manual_server example via cargo run"]
async fn test_backend_connector_with_echo() {
let config = BackendConfig {
transport: BackendTransport::Stdio {
command: "cargo".to_string(),
args: vec![
"run".to_string(),
"--package".to_string(),
"turbomcp-server".to_string(),
"--example".to_string(),
"manual_server".to_string(),
],
working_dir: Some("/Users/nickpaterno/work/turbomcp".to_string()),
},
client_name: "test-proxy".to_string(),
client_version: "1.0.0".to_string(),
};
let result = BackendConnector::new(config).await;
if let Ok(backend) = result {
let spec = backend.introspect().await;
if let Ok(spec) = spec {
assert!(!spec.tools.is_empty(), "Should have at least one tool");
}
}
}
}