1use tracing::{debug, info, warn, error};
2use rmcp::{
6 ErrorData, RoleClient, RoleServer, ServerHandler,
7 model::{
8 CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
9 PaginatedRequestParam, ServerInfo,
10 },
11 service::{NotificationContext, Peer, RequestContext, RunningService},
12};
13use std::collections::HashSet;
14use std::sync::Arc;
15use arc_swap::ArcSwapOption;
16
17#[derive(Clone, Debug, Default)]
19pub struct ToolFilter {
20 pub allow_tools: Option<HashSet<String>>,
22 pub deny_tools: Option<HashSet<String>>,
24}
25
26impl ToolFilter {
27 pub fn allow(tools: Vec<String>) -> Self {
29 Self {
30 allow_tools: Some(tools.into_iter().collect()),
31 deny_tools: None,
32 }
33 }
34
35 pub fn deny(tools: Vec<String>) -> Self {
37 Self {
38 allow_tools: None,
39 deny_tools: Some(tools.into_iter().collect()),
40 }
41 }
42
43 pub fn is_allowed(&self, tool_name: &str) -> bool {
45 if let Some(ref allow_list) = self.allow_tools {
47 return allow_list.contains(tool_name);
48 }
49 if let Some(ref deny_list) = self.deny_tools {
51 return !deny_list.contains(tool_name);
52 }
53 true
55 }
56
57 pub fn is_enabled(&self) -> bool {
59 self.allow_tools.is_some() || self.deny_tools.is_some()
60 }
61}
62
63#[derive(Debug)]
66struct PeerInner {
67 peer: Peer<RoleClient>,
69 #[allow(dead_code)]
71 _running: Arc<RunningService<RoleClient, ClientInfo>>,
72}
73
74#[derive(Clone, Debug)]
77pub struct ProxyHandler {
78 peer: Arc<ArcSwapOption<PeerInner>>,
81 cached_info: ServerInfo,
83 mcp_id: String,
85 tool_filter: ToolFilter,
87}
88
89impl ServerHandler for ProxyHandler {
90 fn get_info(&self) -> ServerInfo {
91 self.cached_info.clone()
92 }
93
94 #[tracing::instrument(skip(self, request, context), fields(
95 mcp_id = %self.mcp_id,
96 request = ?request,
97 ))]
98 async fn list_tools(
99 &self,
100 request: Option<PaginatedRequestParam>,
101 context: RequestContext<RoleServer>,
102 ) -> Result<ListToolsResult, ErrorData> {
103 let inner_guard = self.peer.load();
105 let inner = inner_guard.as_ref().ok_or_else(|| {
106 error!("Backend connection is not available (reconnecting)");
107 ErrorData::internal_error(
108 "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
109 None,
110 )
111 })?;
112
113 if inner.peer.is_transport_closed() {
115 error!("Backend transport is closed");
116 return Err(ErrorData::internal_error(
117 "Backend connection closed, please retry".to_string(),
118 None,
119 ));
120 }
121
122 match self.capabilities().tools {
124 Some(_) => {
125 tokio::select! {
127 result = inner.peer.list_tools(request) => {
128 match result {
129 Ok(result) => {
130 let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
132 result
133 .tools
134 .into_iter()
135 .filter(|tool| self.tool_filter.is_allowed(&tool.name))
136 .collect()
137 } else {
138 result.tools
139 };
140
141 info!(
143 "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
144 self.mcp_id,
145 filtered_tools.len(),
146 if self.tool_filter.is_enabled() {
147 " (已过滤)"
148 } else {
149 ""
150 }
151 );
152
153 debug!(
154 "Proxying list_tools response with {} tools",
155 filtered_tools.len()
156 );
157 Ok(ListToolsResult {
158 tools: filtered_tools,
159 next_cursor: result.next_cursor,
160 })
161 }
162 Err(err) => {
163 error!("Error listing tools: {:?}", err);
164 Err(ErrorData::internal_error(
165 format!("Error listing tools: {err}"),
166 None,
167 ))
168 }
169 }
170 }
171 _ = context.ct.cancelled() => {
172 info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
173 Err(ErrorData::internal_error(
174 "Request cancelled".to_string(),
175 None,
176 ))
177 }
178 }
179 }
180 None => {
181 warn!("Server doesn't support tools capability");
183 Ok(ListToolsResult::default())
184 }
185 }
186 }
187
188 #[tracing::instrument(skip(self, request, context), fields(
189 mcp_id = %self.mcp_id,
190 tool_name = %request.name,
191 tool_arguments = ?request.arguments,
192 ))]
193 async fn call_tool(
194 &self,
195 request: CallToolRequestParam,
196 context: RequestContext<RoleServer>,
197 ) -> Result<CallToolResult, ErrorData> {
198 if !self.tool_filter.is_allowed(&request.name) {
200 info!(
201 "[call_tool] 工具被过滤 - MCP ID: {}, 工具: {}",
202 self.mcp_id, request.name
203 );
204 return Ok(CallToolResult::error(vec![Content::text(format!(
205 "Tool '{}' is not allowed by filter configuration",
206 request.name
207 ))]));
208 }
209
210 let inner_guard = self.peer.load();
212 let inner = match inner_guard.as_ref() {
213 Some(inner) => inner,
214 None => {
215 error!("Backend connection is not available (reconnecting)");
216 return Ok(CallToolResult::error(vec![Content::text(
217 "Backend connection is not available, reconnecting..."
218 )]));
219 }
220 };
221
222 if inner.peer.is_transport_closed() {
224 error!("Backend transport is closed");
225 return Ok(CallToolResult::error(vec![Content::text(
226 "Backend connection closed, please retry",
227 )]));
228 }
229
230 match self.capabilities().tools {
232 Some(_) => {
233 tokio::select! {
235 result = inner.peer.call_tool(request.clone()) => {
236 match result {
237 Ok(result) => {
238 info!(
240 "[call_tool] 工具调用成功 - MCP ID: {}, 工具: {}",
241 self.mcp_id, request.name
242 );
243
244 debug!("Tool call succeeded");
245 Ok(result)
246 }
247 Err(err) => {
248 error!("Error calling tool: {:?}", err);
249 Ok(CallToolResult::error(vec![Content::text(format!(
251 "Error: {err}"
252 ))]))
253 }
254 }
255 }
256 _ = context.ct.cancelled() => {
257 info!(
258 "[call_tool] 请求被取消 - MCP ID: {}, 工具: {}",
259 self.mcp_id, request.name
260 );
261 Ok(CallToolResult::error(vec![Content::text(
262 "Request cancelled"
263 )]))
264 }
265 }
266 }
267 None => {
268 error!("Server doesn't support tools capability");
269 Ok(CallToolResult::error(vec![Content::text(
270 "Server doesn't support tools capability",
271 )]))
272 }
273 }
274 }
275
276 async fn list_resources(
277 &self,
278 request: Option<PaginatedRequestParam>,
279 context: RequestContext<RoleServer>,
280 ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
281 let inner_guard = self.peer.load();
283 let inner = inner_guard.as_ref().ok_or_else(|| {
284 error!("Backend connection is not available (reconnecting)");
285 ErrorData::internal_error(
286 "Backend connection is not available, reconnecting...".to_string(),
287 None,
288 )
289 })?;
290
291 if inner.peer.is_transport_closed() {
293 error!("Backend transport is closed");
294 return Err(ErrorData::internal_error(
295 "Backend connection closed, please retry".to_string(),
296 None,
297 ));
298 }
299
300 match self.capabilities().resources {
302 Some(_) => {
303 tokio::select! {
304 result = inner.peer.list_resources(request) => {
305 match result {
306 Ok(result) => {
307 info!(
309 "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
310 self.mcp_id,
311 result.resources.len()
312 );
313
314 debug!("Proxying list_resources response");
315 Ok(result)
316 }
317 Err(err) => {
318 error!("Error listing resources: {:?}", err);
319 Err(ErrorData::internal_error(
320 format!("Error listing resources: {err}"),
321 None,
322 ))
323 }
324 }
325 }
326 _ = context.ct.cancelled() => {
327 info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
328 Err(ErrorData::internal_error(
329 "Request cancelled".to_string(),
330 None,
331 ))
332 }
333 }
334 }
335 None => {
336 warn!("Server doesn't support resources capability");
338 Ok(rmcp::model::ListResourcesResult::default())
339 }
340 }
341 }
342
343 async fn read_resource(
344 &self,
345 request: rmcp::model::ReadResourceRequestParam,
346 context: RequestContext<RoleServer>,
347 ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
348 let inner_guard = self.peer.load();
350 let inner = inner_guard.as_ref().ok_or_else(|| {
351 error!("Backend connection is not available (reconnecting)");
352 ErrorData::internal_error(
353 "Backend connection is not available, reconnecting...".to_string(),
354 None,
355 )
356 })?;
357
358 if inner.peer.is_transport_closed() {
360 error!("Backend transport is closed");
361 return Err(ErrorData::internal_error(
362 "Backend connection closed, please retry".to_string(),
363 None,
364 ));
365 }
366
367 match self.capabilities().resources {
369 Some(_) => {
370 tokio::select! {
371 result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
372 uri: request.uri.clone(),
373 }) => {
374 match result {
375 Ok(result) => {
376 info!(
378 "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
379 self.mcp_id, request.uri
380 );
381
382 debug!("Proxying read_resource response for {}", request.uri);
383 Ok(result)
384 }
385 Err(err) => {
386 error!("Error reading resource: {:?}", err);
387 Err(ErrorData::internal_error(
388 format!("Error reading resource: {err}"),
389 None,
390 ))
391 }
392 }
393 }
394 _ = context.ct.cancelled() => {
395 info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
396 Err(ErrorData::internal_error(
397 "Request cancelled".to_string(),
398 None,
399 ))
400 }
401 }
402 }
403 None => {
404 error!("Server doesn't support resources capability");
406 Ok(rmcp::model::ReadResourceResult {
407 contents: Vec::new(),
408 })
409 }
410 }
411 }
412
413 async fn list_resource_templates(
414 &self,
415 request: Option<PaginatedRequestParam>,
416 context: RequestContext<RoleServer>,
417 ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
418 let inner_guard = self.peer.load();
420 let inner = inner_guard.as_ref().ok_or_else(|| {
421 error!("Backend connection is not available (reconnecting)");
422 ErrorData::internal_error(
423 "Backend connection is not available, reconnecting...".to_string(),
424 None,
425 )
426 })?;
427
428 if inner.peer.is_transport_closed() {
430 error!("Backend transport is closed");
431 return Err(ErrorData::internal_error(
432 "Backend connection closed, please retry".to_string(),
433 None,
434 ));
435 }
436
437 match self.capabilities().resources {
439 Some(_) => {
440 tokio::select! {
441 result = inner.peer.list_resource_templates(request) => {
442 match result {
443 Ok(result) => {
444 debug!("Proxying list_resource_templates response");
445 Ok(result)
446 }
447 Err(err) => {
448 error!("Error listing resource templates: {:?}", err);
449 Err(ErrorData::internal_error(
450 format!("Error listing resource templates: {err}"),
451 None,
452 ))
453 }
454 }
455 }
456 _ = context.ct.cancelled() => {
457 info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
458 Err(ErrorData::internal_error(
459 "Request cancelled".to_string(),
460 None,
461 ))
462 }
463 }
464 }
465 None => {
466 warn!("Server doesn't support resources capability");
468 Ok(rmcp::model::ListResourceTemplatesResult::default())
469 }
470 }
471 }
472
473 async fn list_prompts(
474 &self,
475 request: Option<PaginatedRequestParam>,
476 context: RequestContext<RoleServer>,
477 ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
478 let inner_guard = self.peer.load();
480 let inner = inner_guard.as_ref().ok_or_else(|| {
481 error!("Backend connection is not available (reconnecting)");
482 ErrorData::internal_error(
483 "Backend connection is not available, reconnecting...".to_string(),
484 None,
485 )
486 })?;
487
488 if inner.peer.is_transport_closed() {
490 error!("Backend transport is closed");
491 return Err(ErrorData::internal_error(
492 "Backend connection closed, please retry".to_string(),
493 None,
494 ));
495 }
496
497 match self.capabilities().prompts {
499 Some(_) => {
500 tokio::select! {
501 result = inner.peer.list_prompts(request) => {
502 match result {
503 Ok(result) => {
504 debug!("Proxying list_prompts response");
505 Ok(result)
506 }
507 Err(err) => {
508 error!("Error listing prompts: {:?}", err);
509 Err(ErrorData::internal_error(
510 format!("Error listing prompts: {err}"),
511 None,
512 ))
513 }
514 }
515 }
516 _ = context.ct.cancelled() => {
517 info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
518 Err(ErrorData::internal_error(
519 "Request cancelled".to_string(),
520 None,
521 ))
522 }
523 }
524 }
525 None => {
526 warn!("Server doesn't support prompts capability");
528 Ok(rmcp::model::ListPromptsResult::default())
529 }
530 }
531 }
532
533 async fn get_prompt(
534 &self,
535 request: rmcp::model::GetPromptRequestParam,
536 context: RequestContext<RoleServer>,
537 ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
538 let inner_guard = self.peer.load();
540 let inner = inner_guard.as_ref().ok_or_else(|| {
541 error!("Backend connection is not available (reconnecting)");
542 ErrorData::internal_error(
543 "Backend connection is not available, reconnecting...".to_string(),
544 None,
545 )
546 })?;
547
548 if inner.peer.is_transport_closed() {
550 error!("Backend transport is closed");
551 return Err(ErrorData::internal_error(
552 "Backend connection closed, please retry".to_string(),
553 None,
554 ));
555 }
556
557 match self.capabilities().prompts {
559 Some(_) => {
560 tokio::select! {
561 result = inner.peer.get_prompt(request.clone()) => {
562 match result {
563 Ok(result) => {
564 debug!("Proxying get_prompt response");
565 Ok(result)
566 }
567 Err(err) => {
568 error!("Error getting prompt: {:?}", err);
569 Err(ErrorData::internal_error(
570 format!("Error getting prompt: {err}"),
571 None,
572 ))
573 }
574 }
575 }
576 _ = context.ct.cancelled() => {
577 info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
578 Err(ErrorData::internal_error(
579 "Request cancelled".to_string(),
580 None,
581 ))
582 }
583 }
584 }
585 None => {
586 warn!("Server doesn't support prompts capability");
588 Ok(rmcp::model::GetPromptResult {
589 description: None,
590 messages: Vec::new(),
591 })
592 }
593 }
594 }
595
596 async fn complete(
597 &self,
598 request: rmcp::model::CompleteRequestParam,
599 context: RequestContext<RoleServer>,
600 ) -> Result<rmcp::model::CompleteResult, ErrorData> {
601 let inner_guard = self.peer.load();
603 let inner = inner_guard.as_ref().ok_or_else(|| {
604 error!("Backend connection is not available (reconnecting)");
605 ErrorData::internal_error(
606 "Backend connection is not available, reconnecting...".to_string(),
607 None,
608 )
609 })?;
610
611 if inner.peer.is_transport_closed() {
613 error!("Backend transport is closed");
614 return Err(ErrorData::internal_error(
615 "Backend connection closed, please retry".to_string(),
616 None,
617 ));
618 }
619
620 tokio::select! {
621 result = inner.peer.complete(request) => {
622 match result {
623 Ok(result) => {
624 debug!("Proxying complete response");
625 Ok(result)
626 }
627 Err(err) => {
628 error!("Error completing: {:?}", err);
629 Err(ErrorData::internal_error(
630 format!("Error completing: {err}"),
631 None,
632 ))
633 }
634 }
635 }
636 _ = context.ct.cancelled() => {
637 info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
638 Err(ErrorData::internal_error(
639 "Request cancelled".to_string(),
640 None,
641 ))
642 }
643 }
644 }
645
646 async fn on_progress(
647 &self,
648 notification: rmcp::model::ProgressNotificationParam,
649 _context: NotificationContext<RoleServer>,
650 ) {
651 let inner_guard = self.peer.load();
653 let inner = match inner_guard.as_ref() {
654 Some(inner) => inner,
655 None => {
656 error!("Backend connection is not available, cannot forward progress notification");
657 return;
658 }
659 };
660
661 if inner.peer.is_transport_closed() {
663 error!("Backend transport is closed, cannot forward progress notification");
664 return;
665 }
666
667 match inner.peer.notify_progress(notification).await {
668 Ok(_) => {
669 debug!("Proxying progress notification");
670 }
671 Err(err) => {
672 error!("Error notifying progress: {:?}", err);
673 }
674 }
675 }
676
677 async fn on_cancelled(
678 &self,
679 notification: rmcp::model::CancelledNotificationParam,
680 _context: NotificationContext<RoleServer>,
681 ) {
682 let inner_guard = self.peer.load();
684 let inner = match inner_guard.as_ref() {
685 Some(inner) => inner,
686 None => {
687 error!("Backend connection is not available, cannot forward cancelled notification");
688 return;
689 }
690 };
691
692 if inner.peer.is_transport_closed() {
694 error!("Backend transport is closed, cannot forward cancelled notification");
695 return;
696 }
697
698 match inner.peer.notify_cancelled(notification).await {
699 Ok(_) => {
700 debug!("Proxying cancelled notification");
701 }
702 Err(err) => {
703 error!("Error notifying cancelled: {:?}", err);
704 }
705 }
706 }
707}
708
709impl ProxyHandler {
710 #[inline]
712 fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
713 &self.cached_info.capabilities
714 }
715
716 fn default_server_info(mcp_id: &str) -> ServerInfo {
718 warn!("[ProxyHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
719 ServerInfo {
720 protocol_version: Default::default(),
721 server_info: Implementation {
722 name: "MCP Proxy".to_string(),
723 version: "0.1.0".to_string(),
724 title: None,
725 website_url: None,
726 icons: None,
727 },
728 instructions: None,
729 capabilities: Default::default(),
730 }
731 }
732
733 fn extract_server_info(client: &RunningService<RoleClient, ClientInfo>, mcp_id: &str) -> ServerInfo {
735 client.peer_info().map(|peer_info| ServerInfo {
736 protocol_version: peer_info.protocol_version.clone(),
737 server_info: Implementation {
738 name: peer_info.server_info.name.clone(),
739 version: peer_info.server_info.version.clone(),
740 title: None,
741 website_url: None,
742 icons: None,
743 },
744 instructions: peer_info.instructions.clone(),
745 capabilities: peer_info.capabilities.clone(),
746 }).unwrap_or_else(|| Self::default_server_info(mcp_id))
747 }
748
749 pub fn new_disconnected(mcp_id: String, tool_filter: ToolFilter, default_info: ServerInfo) -> Self {
752 info!("[ProxyHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);
753
754 if tool_filter.is_enabled() {
756 if let Some(ref allow_list) = tool_filter.allow_tools {
757 info!(
758 "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
759 mcp_id, allow_list
760 );
761 }
762 if let Some(ref deny_list) = tool_filter.deny_tools {
763 info!(
764 "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
765 mcp_id, deny_list
766 );
767 }
768 }
769
770 Self {
771 peer: Arc::new(ArcSwapOption::empty()),
772 cached_info: default_info,
773 mcp_id,
774 tool_filter,
775 }
776 }
777
778 pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
779 Self::with_mcp_id(client, "unknown".to_string())
780 }
781
782 pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
783 Self::with_tool_filter(client, mcp_id, ToolFilter::default())
784 }
785
786 pub fn with_tool_filter(
788 client: RunningService<RoleClient, ClientInfo>,
789 mcp_id: String,
790 tool_filter: ToolFilter,
791 ) -> Self {
792 use std::ops::Deref;
793
794 let cached_info = Self::extract_server_info(&client, &mcp_id);
796
797 let peer = client.deref().clone();
799
800 if tool_filter.is_enabled() {
802 if let Some(ref allow_list) = tool_filter.allow_tools {
803 info!(
804 "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
805 mcp_id, allow_list
806 );
807 }
808 if let Some(ref deny_list) = tool_filter.deny_tools {
809 info!(
810 "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
811 mcp_id, deny_list
812 );
813 }
814 }
815
816 let inner = PeerInner {
818 peer,
819 _running: Arc::new(client),
820 };
821
822 Self {
823 peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
824 cached_info,
825 mcp_id,
826 tool_filter,
827 }
828 }
829
830 pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
834 use std::ops::Deref;
835
836 match new_client {
837 Some(client) => {
838 let peer = client.deref().clone();
839 let inner = PeerInner {
840 peer,
841 _running: Arc::new(client),
842 };
843 self.peer.store(Some(Arc::new(inner)));
844 info!("[ProxyHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
845 }
846 None => {
847 self.peer.store(None);
848 info!("[ProxyHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
849 }
850 }
851 }
852
853 pub fn is_backend_available(&self) -> bool {
855 let inner_guard = self.peer.load();
856 match inner_guard.as_ref() {
857 Some(inner) => !inner.peer.is_transport_closed(),
858 None => false,
859 }
860 }
861
862 pub async fn is_mcp_server_ready(&self) -> bool {
864 !self.is_terminated_async().await
865 }
866
867 pub fn is_terminated(&self) -> bool {
869 !self.is_backend_available()
870 }
871
872 pub async fn is_terminated_async(&self) -> bool {
874 let inner_guard = self.peer.load();
876 let inner = match inner_guard.as_ref() {
877 Some(inner) => inner,
878 None => return true,
879 };
880
881 if inner.peer.is_transport_closed() {
883 return true;
884 }
885
886 match inner.peer.list_tools(None).await {
888 Ok(_) => {
889 debug!("后端连接状态检查: 正常");
890 false
891 }
892 Err(e) => {
893 info!("后端连接状态检查: 已断开,原因: {e}");
894 true
895 }
896 }
897 }
898
899 pub fn mcp_id(&self) -> &str {
901 &self.mcp_id
902 }
903}