use arc_swap::ArcSwapOption;
pub use mcp_common::ToolFilter;
use rmcp::{
ErrorData, RoleClient, RoleServer, ServerHandler,
model::{
CallToolRequestParams, CallToolResult, ClientInfo, Content, Implementation,
ListToolsResult, PaginatedRequestParams, ServerInfo,
},
service::{NotificationContext, Peer, RequestContext, RunningService},
};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use tracing::{debug, error, info, warn};
/// Process-wide counter that supplies the numeric id used to tag the log lines
/// of each `call_tool` invocation. Starts at 1.
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
/// A backend peer bundled with the running client service it was cloned from.
#[derive(Debug)]
struct PeerInner {
// Handle used to send requests and notifications to the backend.
peer: Peer<RoleClient>,
// Never read (hence the lint allowance): the field only owns the running
// service so it is not dropped while this `PeerInner` is alive.
// NOTE(review): presumably dropping the service shuts the connection down — confirm against rmcp.
#[allow(dead_code)]
_running: Arc<RunningService<RoleClient, ClientInfo>>,
}
/// MCP server handler that proxies requests to a swappable backend client.
///
/// `peer` and `backend_version` sit behind `Arc`s, so clones of a handler
/// share the same backend slot and version counter.
#[derive(Clone, Debug)]
pub struct ProxyHandler {
// Current backend, or `None` while no backend is attached; replaced by `swap_backend`.
peer: Arc<ArcSwapOption<PeerInner>>,
// Server info returned by `get_info` and consulted for capability checks.
// Set at construction; `swap_backend` does not update it.
cached_info: ServerInfo,
// Identifier included in log messages.
mcp_id: String,
// Filter applied to tool names in `list_tools` and `call_tool`.
tool_filter: ToolFilter,
// Counter incremented on every `swap_backend`; starts at 0 for a handler
// built disconnected and at 1 for one built with a backend.
backend_version: Arc<AtomicU64>,
}
impl ServerHandler for ProxyHandler {
/// Returns a copy of the server info cached on this handler.
fn get_info(&self) -> ServerInfo {
    let info = &self.cached_info;
    info.clone()
}
#[tracing::instrument(skip(self, request, context), fields(
mcp_id = %self.mcp_id,
request = ?request,
))]
async fn list_tools(
&self,
request: Option<PaginatedRequestParams>,
context: RequestContext<RoleServer>,
) -> Result<ListToolsResult, ErrorData> {
let inner_guard = self.peer.load();
let inner = inner_guard.as_ref().ok_or_else(|| {
error!("Backend connection is not available (reconnecting)");
ErrorData::internal_error(
"Backend connection is not available, reconnecting...".to_string(),
None,
)
})?;
if inner.peer.is_transport_closed() {
error!("Backend transport is closed");
return Err(ErrorData::internal_error(
"Backend connection closed, please retry".to_string(),
None,
));
}
match self.capabilities().tools {
Some(_) => {
tokio::select! {
result = inner.peer.list_tools(request) => {
match result {
Ok(result) => {
let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
result
.tools
.into_iter()
.filter(|tool| self.tool_filter.is_allowed(&tool.name))
.collect()
} else {
result.tools
};
info!(
"[list_tools] Tool list results - MCP ID: {}, number of tools: {}{}",
self.mcp_id,
filtered_tools.len(),
if self.tool_filter.is_enabled() {
" (filtered)"
} else {
""
}
);
debug!(
"Proxying list_tools response with {} tools",
filtered_tools.len()
);
Ok(ListToolsResult {
tools: filtered_tools,
next_cursor: result.next_cursor,
meta: result.meta, })
}
Err(err) => {
error!("Error listing tools: {:?}", err);
Err(ErrorData::internal_error(
format!("Error listing tools: {err}"),
None,
))
}
}
}
_ = context.ct.cancelled() => {
info!("[list_tools] Request canceled - MCP ID: {}", self.mcp_id);
Err(ErrorData::internal_error(
"Request cancelled".to_string(),
None,
))
}
}
}
None => {
warn!("Server doesn't support tools capability");
Ok(ListToolsResult::default())
}
}
}
/// Forwards a tool call to the backend, logging each stage under a
/// per-invocation request id.
///
/// Every failure mode — tool rejected by the filter, no backend attached,
/// transport closed, request cancelled, backend error, tools capability not
/// advertised — is reported as `Ok(CallToolResult::error(..))`; this method
/// never returns `Err`. While the backend call is pending, a heartbeat line
/// is logged every 30 seconds.
#[tracing::instrument(skip(self, request, context), fields(
    mcp_id = %self.mcp_id,
    tool_name = %request.name,
    tool_arguments = ?request.arguments,
))]
async fn call_tool(
    &self,
    request: CallToolRequestParams,
    context: RequestContext<RoleServer>,
) -> Result<CallToolResult, ErrorData> {
    const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);

    let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
    let start = Instant::now();
    // Keep only the name for logging so the request itself can be moved into
    // the backend call instead of being cloned (arguments included).
    let tool_name = request.name.clone();
    info!(
        "[call_tool:{}] Start - Tool: {}, MCP ID: {}",
        request_id, tool_name, self.mcp_id
    );
    if !self.tool_filter.is_allowed(&request.name) {
        info!(
            "[call_tool:{}] Tool is filtered - MCP ID: {}, Tool: {}",
            request_id, self.mcp_id, tool_name
        );
        return Ok(CallToolResult::error(vec![Content::text(format!(
            "Tool '{}' is not allowed by filter configuration",
            tool_name
        ))]));
    }

    let inner_guard = self.peer.load();
    let Some(inner) = inner_guard.as_ref() else {
        error!(
            "[call_tool:{}] Backend connection unavailable (reconnecting) - MCP ID: {}",
            request_id, self.mcp_id
        );
        return Ok(CallToolResult::error(vec![Content::text(
            "Backend connection is not available, reconnecting...",
        )]));
    };
    // Query the transport state once and use the same answer for the log
    // line and for the decision.
    let transport_closed = inner.peer.is_transport_closed();
    info!(
        "[call_tool:{}] Backend connection exists - transport_closed: {}",
        request_id, transport_closed
    );
    if transport_closed {
        error!(
            "[call_tool:{}] Backend transport is closed - MCP ID: {}",
            request_id, self.mcp_id
        );
        return Ok(CallToolResult::error(vec![Content::text(
            "Backend connection closed, please retry",
        )]));
    }

    let result = match self.capabilities().tools {
        Some(_) => {
            info!(
                "[call_tool:{}] Send request to backend... - Tool: {}, Elapsed time: {}ms",
                request_id,
                tool_name,
                start.elapsed().as_millis()
            );
            let call_future = inner.peer.call_tool(request);
            tokio::pin!(call_future);
            let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
            // The first tick of a tokio interval completes immediately; consume
            // it so the first heartbeat fires after a full interval.
            heartbeat_interval.tick().await;
            let call_result = loop {
                tokio::select! {
                    // Poll in declaration order: a finished call wins over
                    // cancellation, which wins over the heartbeat.
                    biased;
                    result = &mut call_future => {
                        break result;
                    }
                    _ = context.ct.cancelled() => {
                        let elapsed = start.elapsed();
                        warn!(
                            "[call_tool:{}] Request canceled - Tool: {}, Time taken: {}ms, MCP ID: {}",
                            request_id, tool_name, elapsed.as_millis(), self.mcp_id
                        );
                        return Ok(CallToolResult::error(vec![Content::text(
                            "Request cancelled"
                        )]));
                    }
                    _ = heartbeat_interval.tick() => {
                        let elapsed = start.elapsed();
                        let transport_closed = inner.peer.is_transport_closed();
                        info!(
                            "[call_tool:{}] Waiting for backend response... - Tool: {}, Waiting: {}ms, transport_closed: {}, MCP ID: {}",
                            request_id, tool_name, elapsed.as_millis(),
                            transport_closed, self.mcp_id
                        );
                    }
                }
            };
            let elapsed = start.elapsed();
            // Match by value so the successful result is returned without a clone.
            match call_result {
                Ok(call_result) => {
                    let is_error = call_result.is_error.unwrap_or(false);
                    info!(
                        "[call_tool:{}] Response received - tool: {}, time taken: {}ms, is_error: {}, MCP ID: {}",
                        request_id,
                        tool_name,
                        elapsed.as_millis(),
                        is_error,
                        self.mcp_id
                    );
                    if is_error {
                        debug!(
                            "[call_tool:{}] Error response content: {:?}",
                            request_id, call_result.content
                        );
                    }
                    Ok(call_result)
                }
                Err(err) => {
                    error!(
                        "[call_tool:{}] Backend returns error - Tool: {}, Time: {}ms, Error: {:?}, MCP ID: {}",
                        request_id,
                        tool_name,
                        elapsed.as_millis(),
                        err,
                        self.mcp_id
                    );
                    Ok(CallToolResult::error(vec![Content::text(format!(
                        "Error: {err}"
                    ))]))
                }
            }
        }
        None => {
            error!(
                "[call_tool:{}] The server does not support tools capability - MCP ID: {}",
                request_id, self.mcp_id
            );
            Ok(CallToolResult::error(vec![Content::text(
                "Server doesn't support tools capability",
            )]))
        }
    };
    let total_elapsed = start.elapsed();
    info!(
        "[call_tool:{}] Completed - Tool: {}, total time taken: {}ms",
        request_id,
        tool_name,
        total_elapsed.as_millis()
    );
    result
}
async fn list_resources(
&self,
request: Option<PaginatedRequestParams>,
context: RequestContext<RoleServer>,
) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
let inner_guard = self.peer.load();
let inner = inner_guard.as_ref().ok_or_else(|| {
error!("Backend connection is not available (reconnecting)");
ErrorData::internal_error(
"Backend connection is not available, reconnecting...".to_string(),
None,
)
})?;
if inner.peer.is_transport_closed() {
error!("Backend transport is closed");
return Err(ErrorData::internal_error(
"Backend connection closed, please retry".to_string(),
None,
));
}
match self.capabilities().resources {
Some(_) => {
tokio::select! {
result = inner.peer.list_resources(request) => {
match result {
Ok(result) => {
info!(
"[list_resources] Resource list results - MCP ID: {}, resource quantity: {}",
self.mcp_id,
result.resources.len()
);
debug!("Proxying list_resources response");
Ok(result)
}
Err(err) => {
error!("Error listing resources: {:?}", err);
Err(ErrorData::internal_error(
format!("Error listing resources: {err}"),
None,
))
}
}
}
_ = context.ct.cancelled() => {
info!("[list_resources] Request canceled - MCP ID: {}", self.mcp_id);
Err(ErrorData::internal_error(
"Request cancelled".to_string(),
None,
))
}
}
}
None => {
warn!("Server doesn't support resources capability");
Ok(rmcp::model::ListResourcesResult::default())
}
}
}
/// Forwards a resource-read request to the backend.
///
/// The request is passed on as received, so any fields besides `uri` reach the
/// backend too; only the URI is copied beforehand, for logging.
///
/// Returns an internal error when no backend is attached, the transport is
/// closed, the backend call fails, or the request is cancelled. Returns an
/// empty result when the cached capabilities do not advertise resources.
async fn read_resource(
    &self,
    request: rmcp::model::ReadResourceRequestParams,
    context: RequestContext<RoleServer>,
) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
    let inner_guard = self.peer.load();
    let inner = inner_guard.as_ref().ok_or_else(|| {
        error!("Backend connection is not available (reconnecting)");
        ErrorData::internal_error(
            "Backend connection is not available, reconnecting...".to_string(),
            None,
        )
    })?;
    if inner.peer.is_transport_closed() {
        error!("Backend transport is closed");
        return Err(ErrorData::internal_error(
            "Backend connection closed, please retry".to_string(),
            None,
        ));
    }
    match self.capabilities().resources {
        Some(_) => {
            // Copy the URI for the log lines, then move the whole request into
            // the backend call rather than rebuilding it from the URI alone.
            let uri = request.uri.clone();
            tokio::select! {
                result = inner.peer.read_resource(request) => {
                    match result {
                        Ok(result) => {
                            info!(
                                "[read_resource] Resource read result - MCP ID: {}, URI: {}",
                                self.mcp_id, uri
                            );
                            debug!("Proxying read_resource response for {}", uri);
                            Ok(result)
                        }
                        Err(err) => {
                            error!("Error reading resource: {:?}", err);
                            Err(ErrorData::internal_error(
                                format!("Error reading resource: {err}"),
                                None,
                            ))
                        }
                    }
                }
                _ = context.ct.cancelled() => {
                    info!("[read_resource] Request canceled - MCP ID: {}, URI: {}", self.mcp_id, uri);
                    Err(ErrorData::internal_error(
                        "Request cancelled".to_string(),
                        None,
                    ))
                }
            }
        }
        None => {
            error!("Server doesn't support resources capability");
            Ok(rmcp::model::ReadResourceResult::new(vec![]))
        }
    }
}
async fn list_resource_templates(
&self,
request: Option<PaginatedRequestParams>,
context: RequestContext<RoleServer>,
) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
let inner_guard = self.peer.load();
let inner = inner_guard.as_ref().ok_or_else(|| {
error!("Backend connection is not available (reconnecting)");
ErrorData::internal_error(
"Backend connection is not available, reconnecting...".to_string(),
None,
)
})?;
if inner.peer.is_transport_closed() {
error!("Backend transport is closed");
return Err(ErrorData::internal_error(
"Backend connection closed, please retry".to_string(),
None,
));
}
match self.capabilities().resources {
Some(_) => {
tokio::select! {
result = inner.peer.list_resource_templates(request) => {
match result {
Ok(result) => {
debug!("Proxying list_resource_templates response");
Ok(result)
}
Err(err) => {
error!("Error listing resource templates: {:?}", err);
Err(ErrorData::internal_error(
format!("Error listing resource templates: {err}"),
None,
))
}
}
}
_ = context.ct.cancelled() => {
info!("[list_resource_templates] request canceled - MCP ID: {}", self.mcp_id);
Err(ErrorData::internal_error(
"Request cancelled".to_string(),
None,
))
}
}
}
None => {
warn!("Server doesn't support resources capability");
Ok(rmcp::model::ListResourceTemplatesResult::default())
}
}
}
async fn list_prompts(
&self,
request: Option<PaginatedRequestParams>,
context: RequestContext<RoleServer>,
) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
let inner_guard = self.peer.load();
let inner = inner_guard.as_ref().ok_or_else(|| {
error!("Backend connection is not available (reconnecting)");
ErrorData::internal_error(
"Backend connection is not available, reconnecting...".to_string(),
None,
)
})?;
if inner.peer.is_transport_closed() {
error!("Backend transport is closed");
return Err(ErrorData::internal_error(
"Backend connection closed, please retry".to_string(),
None,
));
}
match self.capabilities().prompts {
Some(_) => {
tokio::select! {
result = inner.peer.list_prompts(request) => {
match result {
Ok(result) => {
debug!("Proxying list_prompts response");
Ok(result)
}
Err(err) => {
error!("Error listing prompts: {:?}", err);
Err(ErrorData::internal_error(
format!("Error listing prompts: {err}"),
None,
))
}
}
}
_ = context.ct.cancelled() => {
info!("[list_prompts] Request canceled - MCP ID: {}", self.mcp_id);
Err(ErrorData::internal_error(
"Request cancelled".to_string(),
None,
))
}
}
}
None => {
warn!("Server doesn't support prompts capability");
Ok(rmcp::model::ListPromptsResult::default())
}
}
}
async fn get_prompt(
&self,
request: rmcp::model::GetPromptRequestParams,
context: RequestContext<RoleServer>,
) -> Result<rmcp::model::GetPromptResult, ErrorData> {
let inner_guard = self.peer.load();
let inner = inner_guard.as_ref().ok_or_else(|| {
error!("Backend connection is not available (reconnecting)");
ErrorData::internal_error(
"Backend connection is not available, reconnecting...".to_string(),
None,
)
})?;
if inner.peer.is_transport_closed() {
error!("Backend transport is closed");
return Err(ErrorData::internal_error(
"Backend connection closed, please retry".to_string(),
None,
));
}
match self.capabilities().prompts {
Some(_) => {
tokio::select! {
result = inner.peer.get_prompt(request.clone()) => {
match result {
Ok(result) => {
debug!("Proxying get_prompt response");
Ok(result)
}
Err(err) => {
error!("Error getting prompt: {:?}", err);
Err(ErrorData::internal_error(
format!("Error getting prompt: {err}"),
None,
))
}
}
}
_ = context.ct.cancelled() => {
info!("[get_prompt] Request canceled - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
Err(ErrorData::internal_error(
"Request cancelled".to_string(),
None,
))
}
}
}
None => {
warn!("Server doesn't support prompts capability");
let messages = Vec::new();
Ok(rmcp::model::GetPromptResult::new(messages))
}
}
}
async fn complete(
&self,
request: rmcp::model::CompleteRequestParams,
context: RequestContext<RoleServer>,
) -> Result<rmcp::model::CompleteResult, ErrorData> {
let inner_guard = self.peer.load();
let inner = inner_guard.as_ref().ok_or_else(|| {
error!("Backend connection is not available (reconnecting)");
ErrorData::internal_error(
"Backend connection is not available, reconnecting...".to_string(),
None,
)
})?;
if inner.peer.is_transport_closed() {
error!("Backend transport is closed");
return Err(ErrorData::internal_error(
"Backend connection closed, please retry".to_string(),
None,
));
}
tokio::select! {
result = inner.peer.complete(request) => {
match result {
Ok(result) => {
debug!("Proxying complete response");
Ok(result)
}
Err(err) => {
error!("Error completing: {:?}", err);
Err(ErrorData::internal_error(
format!("Error completing: {err}"),
None,
))
}
}
}
_ = context.ct.cancelled() => {
info!("[complete] Request canceled - MCP ID: {}", self.mcp_id);
Err(ErrorData::internal_error(
"Request cancelled".to_string(),
None,
))
}
}
}
/// Relays a progress notification to the backend.
///
/// Best effort: a missing backend, a closed transport, or a send failure is
/// logged and otherwise ignored.
async fn on_progress(
    &self,
    notification: rmcp::model::ProgressNotificationParam,
    _context: NotificationContext<RoleServer>,
) {
    let guard = self.peer.load();
    let Some(backend) = guard.as_ref() else {
        error!("Backend connection is not available, cannot forward progress notification");
        return;
    };
    if backend.peer.is_transport_closed() {
        error!("Backend transport is closed, cannot forward progress notification");
        return;
    }
    if let Err(err) = backend.peer.notify_progress(notification).await {
        error!("Error notifying progress: {:?}", err);
    } else {
        debug!("Proxying progress notification");
    }
}
/// Relays a cancellation notification to the backend.
///
/// Best effort: a missing backend, a closed transport, or a send failure is
/// logged and otherwise ignored.
async fn on_cancelled(
    &self,
    notification: rmcp::model::CancelledNotificationParam,
    _context: NotificationContext<RoleServer>,
) {
    let guard = self.peer.load();
    let Some(backend) = guard.as_ref() else {
        error!(
            "Backend connection is not available, cannot forward cancelled notification"
        );
        return;
    };
    if backend.peer.is_transport_closed() {
        error!("Backend transport is closed, cannot forward cancelled notification");
        return;
    }
    if let Err(err) = backend.peer.notify_cancelled(notification).await {
        error!("Error notifying cancelled: {:?}", err);
    } else {
        debug!("Proxying cancelled notification");
    }
}
}
impl ProxyHandler {
/// Borrows the capability set stored in the cached server info.
#[inline]
fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
    let info = &self.cached_info;
    &info.capabilities
}
/// Builds the fallback server info: default capabilities, identified as
/// "MCP Proxy" version "0.1.0". Logs a warning carrying `mcp_id`.
fn default_server_info(mcp_id: &str) -> ServerInfo {
    warn!(
        "[ProxyHandler] Create default ServerInfo - MCP ID: {}",
        mcp_id
    );
    let capabilities = rmcp::model::ServerCapabilities::default();
    let identity = Implementation::new("MCP Proxy", "0.1.0");
    ServerInfo::new(capabilities).with_server_info(identity)
}
/// Derives the server info to cache from the peer info reported by `client`.
///
/// Copies capabilities, protocol version, server name and version, and the
/// instructions (empty when the peer supplied none). Falls back to
/// `default_server_info` when the client has no peer info.
fn extract_server_info(
    client: &RunningService<RoleClient, ClientInfo>,
    mcp_id: &str,
) -> ServerInfo {
    let Some(peer_info) = client.peer_info() else {
        return Self::default_server_info(mcp_id);
    };
    let identity = Implementation::new(
        peer_info.server_info.name.clone(),
        peer_info.server_info.version.clone(),
    );
    let instructions = peer_info.instructions.clone().unwrap_or_default();
    ServerInfo::new(peer_info.capabilities.clone())
        .with_protocol_version(peer_info.protocol_version.clone())
        .with_server_info(identity)
        .with_instructions(instructions)
}
/// Creates a handler with no backend attached.
///
/// `default_info` becomes the cached server info, and the backend version
/// starts at 0. When the tool filter is enabled, its allow and deny lists are
/// logged.
pub fn new_disconnected(
    mcp_id: String,
    tool_filter: ToolFilter,
    default_info: ServerInfo,
) -> Self {
    info!(
        "[ProxyHandler] Create a disconnected handler - MCP ID: {}",
        mcp_id
    );
    if tool_filter.is_enabled() {
        if let Some(allow_list) = tool_filter.allow_tools.as_ref() {
            info!(
                "[ProxyHandler] Tool whitelist enabled - MCP ID: {}, allowed tools: {:?}",
                mcp_id, allow_list
            );
        }
        if let Some(deny_list) = tool_filter.deny_tools.as_ref() {
            info!(
                "[ProxyHandler] Tool blacklist enabled - MCP ID: {}, excluded tools: {:?}",
                mcp_id, deny_list
            );
        }
    }
    let empty_slot = Arc::new(ArcSwapOption::empty());
    let initial_version = Arc::new(AtomicU64::new(0));
    Self {
        peer: empty_slot,
        cached_info: default_info,
        mcp_id,
        tool_filter,
        backend_version: initial_version,
    }
}
/// Creates a connected handler with the MCP id "unknown" and a default tool filter.
pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
    let mcp_id = String::from("unknown");
    Self::with_mcp_id(client, mcp_id)
}
/// Creates a connected handler with the given MCP id and a default tool filter.
pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
    let no_filter = ToolFilter::default();
    Self::with_tool_filter(client, mcp_id, no_filter)
}
pub fn with_tool_filter(
client: RunningService<RoleClient, ClientInfo>,
mcp_id: String,
tool_filter: ToolFilter,
) -> Self {
use std::ops::Deref;
let cached_info = Self::extract_server_info(&client, &mcp_id);
let peer = client.deref().clone();
if tool_filter.is_enabled() {
if let Some(ref allow_list) = tool_filter.allow_tools {
info!(
"[ProxyHandler] Tool whitelist enabled - MCP ID: {}, allowed tools: {:?}",
mcp_id, allow_list
);
}
if let Some(ref deny_list) = tool_filter.deny_tools {
info!(
"[ProxyHandler] Tool blacklist enabled - MCP ID: {}, excluded tools: {:?}",
mcp_id, deny_list
);
}
}
let inner = PeerInner {
peer,
_running: Arc::new(client),
};
Self {
peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
cached_info,
mcp_id,
tool_filter,
backend_version: Arc::new(AtomicU64::new(1)), }
}
pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
use std::ops::Deref;
match new_client {
Some(client) => {
let peer = client.deref().clone();
let inner = PeerInner {
peer,
_running: Arc::new(client),
};
self.peer.store(Some(Arc::new(inner)));
info!(
"[ProxyHandler] Backend connection updated - MCP ID: {}",
self.mcp_id
);
}
None => {
self.peer.store(None);
info!(
"[ProxyHandler] Backend connection disconnected - MCP ID: {}",
self.mcp_id
);
}
}
let new_version = self.backend_version.fetch_add(1, Ordering::SeqCst) + 1;
info!(
"[ProxyHandler] Backend version update: {} - MCP ID: {}",
new_version, self.mcp_id
);
}
/// Returns `true` when a backend is attached and its transport is not closed.
pub fn is_backend_available(&self) -> bool {
    let guard = self.peer.load();
    guard
        .as_ref()
        .is_some_and(|backend| !backend.peer.is_transport_closed())
}
/// Returns `true` when `is_terminated_async` reports the backend as not terminated.
pub async fn is_mcp_server_ready(&self) -> bool {
    let terminated = self.is_terminated_async().await;
    !terminated
}
/// Returns `true` when `is_backend_available` reports no usable backend.
pub fn is_terminated(&self) -> bool {
    let available = self.is_backend_available();
    !available
}
/// Checks whether the backend is unusable by probing it.
///
/// Returns `true` when no backend is attached, when the transport is closed,
/// or when a probing `list_tools(None)` call fails for any reason; returns
/// `false` when the probe succeeds.
pub async fn is_terminated_async(&self) -> bool {
    let guard = self.peer.load();
    let Some(backend) = guard.as_ref() else {
        return true;
    };
    if backend.peer.is_transport_closed() {
        return true;
    }
    if let Err(e) = backend.peer.list_tools(None).await {
        info!("Backend connection status check: Disconnected, reason: {e}");
        return true;
    }
    debug!("Backend connection status check: OK");
    false
}
/// Returns the MCP id this handler was created with.
pub fn mcp_id(&self) -> &str {
    self.mcp_id.as_str()
}
/// Serializes the cached server info to JSON, yielding the default JSON value
/// when serialization fails.
pub fn get_server_info_json(&self) -> serde_json::Value {
    match serde_json::to_value(&self.cached_info) {
        Ok(json) => json,
        Err(_) => serde_json::Value::default(),
    }
}
/// Returns the current backend version counter.
pub fn get_backend_version(&self) -> u64 {
    let version = &self.backend_version;
    version.load(Ordering::SeqCst)
}
pub async fn call_peer_method(
&self,
method: &str,
params: serde_json::Value,
) -> Result<serde_json::Value, String> {
use rmcp::model::PaginatedRequestParams;
let inner_guard = self.peer.load();
let inner = inner_guard.as_ref().ok_or_else(|| {
"Backend connection is not available (reconnecting)".to_string()
})?;
if inner.peer.is_transport_closed() {
return Err("Backend transport is closed".to_string());
}
match method {
"tools/list" => {
let request: Option<PaginatedRequestParams> = serde_json::from_value(params).ok();
let result = inner.peer.list_tools(request).await
.map_err(|e| format!("list_tools error: {:?}", e))?;
serde_json::to_value(result)
.map_err(|e| format!("serialize error: {}", e))
}
"tools/call" => {
let request: rmcp::model::CallToolRequestParams = serde_json::from_value(params)
.map_err(|e| format!("Invalid params for tools/call: {}", e))?;
let result = inner.peer.call_tool(request).await
.map_err(|e| format!("call_tool error: {:?}", e))?;
serde_json::to_value(result)
.map_err(|e| format!("serialize error: {}", e))
}
"resources/list" => {
let request: Option<PaginatedRequestParams> = serde_json::from_value(params).ok();
let result = inner.peer.list_resources(request).await
.map_err(|e| format!("list_resources error: {:?}", e))?;
serde_json::to_value(result)
.map_err(|e| format!("serialize error: {}", e))
}
"resources/read" => {
let request: rmcp::model::ReadResourceRequestParams = serde_json::from_value(params)
.map_err(|e| format!("Invalid params for resources/read: {}", e))?;
let result = inner.peer.read_resource(request).await
.map_err(|e| format!("read_resource error: {:?}", e))?;
serde_json::to_value(result)
.map_err(|e| format!("serialize error: {}", e))
}
"prompts/list" => {
let request: Option<PaginatedRequestParams> = serde_json::from_value(params).ok();
let result = inner.peer.list_prompts(request).await
.map_err(|e| format!("list_prompts error: {:?}", e))?;
serde_json::to_value(result)
.map_err(|e| format!("serialize error: {}", e))
}
"prompts/get" => {
let request: rmcp::model::GetPromptRequestParams = serde_json::from_value(params)
.map_err(|e| format!("Invalid params for prompts/get: {}", e))?;
let result = inner.peer.get_prompt(request).await
.map_err(|e| format!("get_prompt error: {:?}", e))?;
serde_json::to_value(result)
.map_err(|e| format!("serialize error: {}", e))
}
"resources/templates/list" => {
let request: Option<PaginatedRequestParams> = serde_json::from_value(params).ok();
let result = inner.peer.list_resource_templates(request).await
.map_err(|e| format!("list_resource_templates error: {:?}", e))?;
serde_json::to_value(result)
.map_err(|e| format!("serialize error: {}", e))
}
"completion/complete" => {
let request: rmcp::model::CompleteRequestParams = serde_json::from_value(params)
.map_err(|e| format!("Invalid params for completion/complete: {}", e))?;
let result = inner.peer.complete(request).await
.map_err(|e| format!("complete error: {:?}", e))?;
serde_json::to_value(result)
.map_err(|e| format!("serialize error: {}", e))
}
_ => Err(format!("Unsupported method: {}", method)),
}
}
/// Converts an optional stream connection into a running service and installs
/// it with `swap_backend`; `None` detaches the backend.
pub fn swap_backend_from_connection(
    &self,
    conn: Option<crate::client::StreamClientConnection>,
) {
    let replacement = conn.map(|connection| connection.into_running_service());
    self.swap_backend(replacement);
}
}
/// Implements `mcp_common::BackendBridge` by delegating each trait method to
/// the inherent `ProxyHandler` method of the same name, boxing the async ones.
impl mcp_common::BackendBridge for ProxyHandler {
    fn mcp_id(&self) -> &str {
        ProxyHandler::mcp_id(self)
    }

    fn get_server_info_json(&self) -> serde_json::Value {
        ProxyHandler::get_server_info_json(self)
    }

    fn is_backend_available(&self) -> bool {
        ProxyHandler::is_backend_available(self)
    }

    fn is_mcp_server_ready(
        &self,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send + '_>> {
        let readiness = ProxyHandler::is_mcp_server_ready(self);
        Box::pin(readiness)
    }

    fn is_terminated_async(
        &self,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send + '_>> {
        let termination = ProxyHandler::is_terminated_async(self);
        Box::pin(termination)
    }

    fn call_peer_method(
        &self,
        method: &str,
        params: serde_json::Value,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<serde_json::Value, String>> + Send + '_>,
    > {
        // The returned future only borrows `self`, so the method name is
        // copied into an owned string that moves into the future.
        let owned_method = String::from(method);
        Box::pin(async move {
            ProxyHandler::call_peer_method(self, owned_method.as_str(), params).await
        })
    }
}