1use tracing::{debug, info, warn, error};
2use rmcp::{
6 ErrorData, RoleClient, RoleServer, ServerHandler,
7 model::{
8 CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
9 PaginatedRequestParam, ServerInfo,
10 },
11 service::{NotificationContext, Peer, RequestContext, RunningService},
12};
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use arc_swap::ArcSwapOption;
16pub use mcp_common::ToolFilter;
17
/// A live backend connection: the cloneable request peer plus the running
/// service whose ownership keeps the underlying transport task alive.
#[derive(Debug)]
struct PeerInner {
    // Handle used to issue requests/notifications to the backend MCP server.
    peer: Peer<RoleClient>,
    // Never read, but must be retained: dropping the `RunningService` would
    // tear down the backend connection that `peer` talks over.
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}
28
/// Pass-through MCP server handler that proxies every request to a backend
/// MCP client connection, supporting lock-free backend hot-swap on reconnect.
#[derive(Clone, Debug)]
pub struct ProxyHandler {
    // Current backend, or `None` while disconnected/reconnecting.
    // `ArcSwapOption` lets a reconnect swap the connection without blocking
    // in-flight requests that already loaded the previous one.
    peer: Arc<ArcSwapOption<PeerInner>>,
    // Server info captured at connect time; served from `get_info` so
    // initialization never has to round-trip to the backend.
    cached_info: ServerInfo,
    // Identifier of the proxied MCP server; used only in log messages.
    mcp_id: String,
    // Allow/deny lists applied to `list_tools` results and `call_tool` requests.
    tool_filter: ToolFilter,
    // Monotonic counter bumped on every backend swap so callers can detect
    // reconnections.
    backend_version: Arc<AtomicU64>,
}
50
51impl ServerHandler for ProxyHandler {
52 fn get_info(&self) -> ServerInfo {
53 self.cached_info.clone()
54 }
55
56 #[tracing::instrument(skip(self, request, context), fields(
57 mcp_id = %self.mcp_id,
58 request = ?request,
59 ))]
60 async fn list_tools(
61 &self,
62 request: Option<PaginatedRequestParam>,
63 context: RequestContext<RoleServer>,
64 ) -> Result<ListToolsResult, ErrorData> {
65 let inner_guard = self.peer.load();
67 let inner = inner_guard.as_ref().ok_or_else(|| {
68 error!("Backend connection is not available (reconnecting)");
69 ErrorData::internal_error(
70 "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
71 None,
72 )
73 })?;
74
75 if inner.peer.is_transport_closed() {
77 error!("Backend transport is closed");
78 return Err(ErrorData::internal_error(
79 "Backend connection closed, please retry".to_string(),
80 None,
81 ));
82 }
83
84 match self.capabilities().tools {
86 Some(_) => {
87 tokio::select! {
89 result = inner.peer.list_tools(request) => {
90 match result {
91 Ok(result) => {
92 let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
94 result
95 .tools
96 .into_iter()
97 .filter(|tool| self.tool_filter.is_allowed(&tool.name))
98 .collect()
99 } else {
100 result.tools
101 };
102
103 info!(
105 "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
106 self.mcp_id,
107 filtered_tools.len(),
108 if self.tool_filter.is_enabled() {
109 " (已过滤)"
110 } else {
111 ""
112 }
113 );
114
115 debug!(
116 "Proxying list_tools response with {} tools",
117 filtered_tools.len()
118 );
119 Ok(ListToolsResult {
120 tools: filtered_tools,
121 next_cursor: result.next_cursor,
122 meta: result.meta, })
124 }
125 Err(err) => {
126 error!("Error listing tools: {:?}", err);
127 Err(ErrorData::internal_error(
128 format!("Error listing tools: {err}"),
129 None,
130 ))
131 }
132 }
133 }
134 _ = context.ct.cancelled() => {
135 info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
136 Err(ErrorData::internal_error(
137 "Request cancelled".to_string(),
138 None,
139 ))
140 }
141 }
142 }
143 None => {
144 warn!("Server doesn't support tools capability");
146 Ok(ListToolsResult::default())
147 }
148 }
149 }
150
151 #[tracing::instrument(skip(self, request, context), fields(
152 mcp_id = %self.mcp_id,
153 tool_name = %request.name,
154 tool_arguments = ?request.arguments,
155 ))]
156 async fn call_tool(
157 &self,
158 request: CallToolRequestParam,
159 context: RequestContext<RoleServer>,
160 ) -> Result<CallToolResult, ErrorData> {
161 if !self.tool_filter.is_allowed(&request.name) {
163 info!(
164 "[call_tool] 工具被过滤 - MCP ID: {}, 工具: {}",
165 self.mcp_id, request.name
166 );
167 return Ok(CallToolResult::error(vec![Content::text(format!(
168 "Tool '{}' is not allowed by filter configuration",
169 request.name
170 ))]));
171 }
172
173 let inner_guard = self.peer.load();
175 let inner = match inner_guard.as_ref() {
176 Some(inner) => inner,
177 None => {
178 error!("Backend connection is not available (reconnecting)");
179 return Ok(CallToolResult::error(vec![Content::text(
180 "Backend connection is not available, reconnecting..."
181 )]));
182 }
183 };
184
185 if inner.peer.is_transport_closed() {
187 error!("Backend transport is closed");
188 return Ok(CallToolResult::error(vec![Content::text(
189 "Backend connection closed, please retry",
190 )]));
191 }
192
193 match self.capabilities().tools {
195 Some(_) => {
196 tokio::select! {
198 result = inner.peer.call_tool(request.clone()) => {
199 match result {
200 Ok(result) => {
201 info!(
203 "[call_tool] 工具调用成功 - MCP ID: {}, 工具: {}",
204 self.mcp_id, request.name
205 );
206
207 debug!("Tool call succeeded");
208 Ok(result)
209 }
210 Err(err) => {
211 error!("Error calling tool: {:?}", err);
212 Ok(CallToolResult::error(vec![Content::text(format!(
214 "Error: {err}"
215 ))]))
216 }
217 }
218 }
219 _ = context.ct.cancelled() => {
220 info!(
221 "[call_tool] 请求被取消 - MCP ID: {}, 工具: {}",
222 self.mcp_id, request.name
223 );
224 Ok(CallToolResult::error(vec![Content::text(
225 "Request cancelled"
226 )]))
227 }
228 }
229 }
230 None => {
231 error!("Server doesn't support tools capability");
232 Ok(CallToolResult::error(vec![Content::text(
233 "Server doesn't support tools capability",
234 )]))
235 }
236 }
237 }
238
239 async fn list_resources(
240 &self,
241 request: Option<PaginatedRequestParam>,
242 context: RequestContext<RoleServer>,
243 ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
244 let inner_guard = self.peer.load();
246 let inner = inner_guard.as_ref().ok_or_else(|| {
247 error!("Backend connection is not available (reconnecting)");
248 ErrorData::internal_error(
249 "Backend connection is not available, reconnecting...".to_string(),
250 None,
251 )
252 })?;
253
254 if inner.peer.is_transport_closed() {
256 error!("Backend transport is closed");
257 return Err(ErrorData::internal_error(
258 "Backend connection closed, please retry".to_string(),
259 None,
260 ));
261 }
262
263 match self.capabilities().resources {
265 Some(_) => {
266 tokio::select! {
267 result = inner.peer.list_resources(request) => {
268 match result {
269 Ok(result) => {
270 info!(
272 "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
273 self.mcp_id,
274 result.resources.len()
275 );
276
277 debug!("Proxying list_resources response");
278 Ok(result)
279 }
280 Err(err) => {
281 error!("Error listing resources: {:?}", err);
282 Err(ErrorData::internal_error(
283 format!("Error listing resources: {err}"),
284 None,
285 ))
286 }
287 }
288 }
289 _ = context.ct.cancelled() => {
290 info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
291 Err(ErrorData::internal_error(
292 "Request cancelled".to_string(),
293 None,
294 ))
295 }
296 }
297 }
298 None => {
299 warn!("Server doesn't support resources capability");
301 Ok(rmcp::model::ListResourcesResult::default())
302 }
303 }
304 }
305
306 async fn read_resource(
307 &self,
308 request: rmcp::model::ReadResourceRequestParam,
309 context: RequestContext<RoleServer>,
310 ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
311 let inner_guard = self.peer.load();
313 let inner = inner_guard.as_ref().ok_or_else(|| {
314 error!("Backend connection is not available (reconnecting)");
315 ErrorData::internal_error(
316 "Backend connection is not available, reconnecting...".to_string(),
317 None,
318 )
319 })?;
320
321 if inner.peer.is_transport_closed() {
323 error!("Backend transport is closed");
324 return Err(ErrorData::internal_error(
325 "Backend connection closed, please retry".to_string(),
326 None,
327 ));
328 }
329
330 match self.capabilities().resources {
332 Some(_) => {
333 tokio::select! {
334 result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
335 uri: request.uri.clone(),
336 }) => {
337 match result {
338 Ok(result) => {
339 info!(
341 "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
342 self.mcp_id, request.uri
343 );
344
345 debug!("Proxying read_resource response for {}", request.uri);
346 Ok(result)
347 }
348 Err(err) => {
349 error!("Error reading resource: {:?}", err);
350 Err(ErrorData::internal_error(
351 format!("Error reading resource: {err}"),
352 None,
353 ))
354 }
355 }
356 }
357 _ = context.ct.cancelled() => {
358 info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
359 Err(ErrorData::internal_error(
360 "Request cancelled".to_string(),
361 None,
362 ))
363 }
364 }
365 }
366 None => {
367 error!("Server doesn't support resources capability");
369 Ok(rmcp::model::ReadResourceResult {
370 contents: Vec::new(),
371 })
372 }
373 }
374 }
375
376 async fn list_resource_templates(
377 &self,
378 request: Option<PaginatedRequestParam>,
379 context: RequestContext<RoleServer>,
380 ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
381 let inner_guard = self.peer.load();
383 let inner = inner_guard.as_ref().ok_or_else(|| {
384 error!("Backend connection is not available (reconnecting)");
385 ErrorData::internal_error(
386 "Backend connection is not available, reconnecting...".to_string(),
387 None,
388 )
389 })?;
390
391 if inner.peer.is_transport_closed() {
393 error!("Backend transport is closed");
394 return Err(ErrorData::internal_error(
395 "Backend connection closed, please retry".to_string(),
396 None,
397 ));
398 }
399
400 match self.capabilities().resources {
402 Some(_) => {
403 tokio::select! {
404 result = inner.peer.list_resource_templates(request) => {
405 match result {
406 Ok(result) => {
407 debug!("Proxying list_resource_templates response");
408 Ok(result)
409 }
410 Err(err) => {
411 error!("Error listing resource templates: {:?}", err);
412 Err(ErrorData::internal_error(
413 format!("Error listing resource templates: {err}"),
414 None,
415 ))
416 }
417 }
418 }
419 _ = context.ct.cancelled() => {
420 info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
421 Err(ErrorData::internal_error(
422 "Request cancelled".to_string(),
423 None,
424 ))
425 }
426 }
427 }
428 None => {
429 warn!("Server doesn't support resources capability");
431 Ok(rmcp::model::ListResourceTemplatesResult::default())
432 }
433 }
434 }
435
436 async fn list_prompts(
437 &self,
438 request: Option<PaginatedRequestParam>,
439 context: RequestContext<RoleServer>,
440 ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
441 let inner_guard = self.peer.load();
443 let inner = inner_guard.as_ref().ok_or_else(|| {
444 error!("Backend connection is not available (reconnecting)");
445 ErrorData::internal_error(
446 "Backend connection is not available, reconnecting...".to_string(),
447 None,
448 )
449 })?;
450
451 if inner.peer.is_transport_closed() {
453 error!("Backend transport is closed");
454 return Err(ErrorData::internal_error(
455 "Backend connection closed, please retry".to_string(),
456 None,
457 ));
458 }
459
460 match self.capabilities().prompts {
462 Some(_) => {
463 tokio::select! {
464 result = inner.peer.list_prompts(request) => {
465 match result {
466 Ok(result) => {
467 debug!("Proxying list_prompts response");
468 Ok(result)
469 }
470 Err(err) => {
471 error!("Error listing prompts: {:?}", err);
472 Err(ErrorData::internal_error(
473 format!("Error listing prompts: {err}"),
474 None,
475 ))
476 }
477 }
478 }
479 _ = context.ct.cancelled() => {
480 info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
481 Err(ErrorData::internal_error(
482 "Request cancelled".to_string(),
483 None,
484 ))
485 }
486 }
487 }
488 None => {
489 warn!("Server doesn't support prompts capability");
491 Ok(rmcp::model::ListPromptsResult::default())
492 }
493 }
494 }
495
496 async fn get_prompt(
497 &self,
498 request: rmcp::model::GetPromptRequestParam,
499 context: RequestContext<RoleServer>,
500 ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
501 let inner_guard = self.peer.load();
503 let inner = inner_guard.as_ref().ok_or_else(|| {
504 error!("Backend connection is not available (reconnecting)");
505 ErrorData::internal_error(
506 "Backend connection is not available, reconnecting...".to_string(),
507 None,
508 )
509 })?;
510
511 if inner.peer.is_transport_closed() {
513 error!("Backend transport is closed");
514 return Err(ErrorData::internal_error(
515 "Backend connection closed, please retry".to_string(),
516 None,
517 ));
518 }
519
520 match self.capabilities().prompts {
522 Some(_) => {
523 tokio::select! {
524 result = inner.peer.get_prompt(request.clone()) => {
525 match result {
526 Ok(result) => {
527 debug!("Proxying get_prompt response");
528 Ok(result)
529 }
530 Err(err) => {
531 error!("Error getting prompt: {:?}", err);
532 Err(ErrorData::internal_error(
533 format!("Error getting prompt: {err}"),
534 None,
535 ))
536 }
537 }
538 }
539 _ = context.ct.cancelled() => {
540 info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
541 Err(ErrorData::internal_error(
542 "Request cancelled".to_string(),
543 None,
544 ))
545 }
546 }
547 }
548 None => {
549 warn!("Server doesn't support prompts capability");
551 Ok(rmcp::model::GetPromptResult {
552 description: None,
553 messages: Vec::new(),
554 })
555 }
556 }
557 }
558
559 async fn complete(
560 &self,
561 request: rmcp::model::CompleteRequestParam,
562 context: RequestContext<RoleServer>,
563 ) -> Result<rmcp::model::CompleteResult, ErrorData> {
564 let inner_guard = self.peer.load();
566 let inner = inner_guard.as_ref().ok_or_else(|| {
567 error!("Backend connection is not available (reconnecting)");
568 ErrorData::internal_error(
569 "Backend connection is not available, reconnecting...".to_string(),
570 None,
571 )
572 })?;
573
574 if inner.peer.is_transport_closed() {
576 error!("Backend transport is closed");
577 return Err(ErrorData::internal_error(
578 "Backend connection closed, please retry".to_string(),
579 None,
580 ));
581 }
582
583 tokio::select! {
584 result = inner.peer.complete(request) => {
585 match result {
586 Ok(result) => {
587 debug!("Proxying complete response");
588 Ok(result)
589 }
590 Err(err) => {
591 error!("Error completing: {:?}", err);
592 Err(ErrorData::internal_error(
593 format!("Error completing: {err}"),
594 None,
595 ))
596 }
597 }
598 }
599 _ = context.ct.cancelled() => {
600 info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
601 Err(ErrorData::internal_error(
602 "Request cancelled".to_string(),
603 None,
604 ))
605 }
606 }
607 }
608
609 async fn on_progress(
610 &self,
611 notification: rmcp::model::ProgressNotificationParam,
612 _context: NotificationContext<RoleServer>,
613 ) {
614 let inner_guard = self.peer.load();
616 let inner = match inner_guard.as_ref() {
617 Some(inner) => inner,
618 None => {
619 error!("Backend connection is not available, cannot forward progress notification");
620 return;
621 }
622 };
623
624 if inner.peer.is_transport_closed() {
626 error!("Backend transport is closed, cannot forward progress notification");
627 return;
628 }
629
630 match inner.peer.notify_progress(notification).await {
631 Ok(_) => {
632 debug!("Proxying progress notification");
633 }
634 Err(err) => {
635 error!("Error notifying progress: {:?}", err);
636 }
637 }
638 }
639
640 async fn on_cancelled(
641 &self,
642 notification: rmcp::model::CancelledNotificationParam,
643 _context: NotificationContext<RoleServer>,
644 ) {
645 let inner_guard = self.peer.load();
647 let inner = match inner_guard.as_ref() {
648 Some(inner) => inner,
649 None => {
650 error!("Backend connection is not available, cannot forward cancelled notification");
651 return;
652 }
653 };
654
655 if inner.peer.is_transport_closed() {
657 error!("Backend transport is closed, cannot forward cancelled notification");
658 return;
659 }
660
661 match inner.peer.notify_cancelled(notification).await {
662 Ok(_) => {
663 debug!("Proxying cancelled notification");
664 }
665 Err(err) => {
666 error!("Error notifying cancelled: {:?}", err);
667 }
668 }
669 }
670}
671
672impl ProxyHandler {
673 #[inline]
675 fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
676 &self.cached_info.capabilities
677 }
678
679 fn default_server_info(mcp_id: &str) -> ServerInfo {
681 warn!("[ProxyHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
682 ServerInfo {
683 protocol_version: Default::default(),
684 server_info: Implementation {
685 name: "MCP Proxy".to_string(),
686 version: "0.1.0".to_string(),
687 title: None,
688 website_url: None,
689 icons: None,
690 },
691 instructions: None,
692 capabilities: Default::default(),
693 }
694 }
695
696 fn extract_server_info(client: &RunningService<RoleClient, ClientInfo>, mcp_id: &str) -> ServerInfo {
698 client.peer_info().map(|peer_info| ServerInfo {
699 protocol_version: peer_info.protocol_version.clone(),
700 server_info: Implementation {
701 name: peer_info.server_info.name.clone(),
702 version: peer_info.server_info.version.clone(),
703 title: None,
704 website_url: None,
705 icons: None,
706 },
707 instructions: peer_info.instructions.clone(),
708 capabilities: peer_info.capabilities.clone(),
709 }).unwrap_or_else(|| Self::default_server_info(mcp_id))
710 }
711
712 pub fn new_disconnected(mcp_id: String, tool_filter: ToolFilter, default_info: ServerInfo) -> Self {
715 info!("[ProxyHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);
716
717 if tool_filter.is_enabled() {
719 if let Some(ref allow_list) = tool_filter.allow_tools {
720 info!(
721 "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
722 mcp_id, allow_list
723 );
724 }
725 if let Some(ref deny_list) = tool_filter.deny_tools {
726 info!(
727 "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
728 mcp_id, deny_list
729 );
730 }
731 }
732
733 Self {
734 peer: Arc::new(ArcSwapOption::empty()),
735 cached_info: default_info,
736 mcp_id,
737 tool_filter,
738 backend_version: Arc::new(AtomicU64::new(0)), }
740 }
741
742 pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
743 Self::with_mcp_id(client, "unknown".to_string())
744 }
745
746 pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
747 Self::with_tool_filter(client, mcp_id, ToolFilter::default())
748 }
749
750 pub fn with_tool_filter(
752 client: RunningService<RoleClient, ClientInfo>,
753 mcp_id: String,
754 tool_filter: ToolFilter,
755 ) -> Self {
756 use std::ops::Deref;
757
758 let cached_info = Self::extract_server_info(&client, &mcp_id);
760
761 let peer = client.deref().clone();
763
764 if tool_filter.is_enabled() {
766 if let Some(ref allow_list) = tool_filter.allow_tools {
767 info!(
768 "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
769 mcp_id, allow_list
770 );
771 }
772 if let Some(ref deny_list) = tool_filter.deny_tools {
773 info!(
774 "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
775 mcp_id, deny_list
776 );
777 }
778 }
779
780 let inner = PeerInner {
782 peer,
783 _running: Arc::new(client),
784 };
785
786 Self {
787 peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
788 cached_info,
789 mcp_id,
790 tool_filter,
791 backend_version: Arc::new(AtomicU64::new(1)), }
793 }
794
795 pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
801 use std::ops::Deref;
802
803 match new_client {
804 Some(client) => {
805 let peer = client.deref().clone();
806 let inner = PeerInner {
807 peer,
808 _running: Arc::new(client),
809 };
810 self.peer.store(Some(Arc::new(inner)));
811 info!("[ProxyHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
812 }
813 None => {
814 self.peer.store(None);
815 info!("[ProxyHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
816 }
817 }
818
819 let new_version = self.backend_version.fetch_add(1, Ordering::SeqCst) + 1;
821 info!(
822 "[ProxyHandler] 后端版本更新: {} - MCP ID: {}",
823 new_version, self.mcp_id
824 );
825 }
826
827 pub fn is_backend_available(&self) -> bool {
829 let inner_guard = self.peer.load();
830 match inner_guard.as_ref() {
831 Some(inner) => !inner.peer.is_transport_closed(),
832 None => false,
833 }
834 }
835
836 pub async fn is_mcp_server_ready(&self) -> bool {
838 !self.is_terminated_async().await
839 }
840
841 pub fn is_terminated(&self) -> bool {
843 !self.is_backend_available()
844 }
845
846 pub async fn is_terminated_async(&self) -> bool {
848 let inner_guard = self.peer.load();
850 let inner = match inner_guard.as_ref() {
851 Some(inner) => inner,
852 None => return true,
853 };
854
855 if inner.peer.is_transport_closed() {
857 return true;
858 }
859
860 match inner.peer.list_tools(None).await {
862 Ok(_) => {
863 debug!("后端连接状态检查: 正常");
864 false
865 }
866 Err(e) => {
867 info!("后端连接状态检查: 已断开,原因: {e}");
868 true
869 }
870 }
871 }
872
873 pub fn mcp_id(&self) -> &str {
875 &self.mcp_id
876 }
877
878 pub fn get_backend_version(&self) -> u64 {
886 self.backend_version.load(Ordering::SeqCst)
887 }
888}