1use arc_swap::ArcSwapOption;
2pub use mcp_common::ToolFilter;
3use rmcp::{
7 ErrorData, RoleClient, RoleServer, ServerHandler,
8 model::{
9 CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
10 PaginatedRequestParam, ServerInfo,
11 },
12 service::{NotificationContext, Peer, RequestContext, RunningService},
13};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Instant;
17use tracing::{debug, error, info, warn};
18
/// Process-wide monotonically increasing id used to correlate the log lines
/// of a single `call_tool` invocation; starts at 1.
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
21
/// Bundles the client peer handle with the `RunningService` that owns the
/// underlying connection, tying the connection's lifetime to this struct.
#[derive(Debug)]
struct PeerInner {
    // Handle used to issue requests and notifications to the backend.
    peer: Peer<RoleClient>,
    // Never read directly; held only so the service task stays alive
    // while this `PeerInner` is reachable.
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}
32
/// MCP server handler that proxies every request to a backend MCP client
/// connection, which can be swapped at runtime (e.g. on reconnect).
#[derive(Clone, Debug)]
pub struct ProxyHandler {
    // Current backend connection; `None` while disconnected/reconnecting.
    peer: Arc<ArcSwapOption<PeerInner>>,
    // Snapshot of the backend's `ServerInfo`, served without touching the backend.
    cached_info: ServerInfo,
    // Identifier used in log messages to distinguish proxied servers.
    mcp_id: String,
    // Allow/deny filter applied to tool listing and invocation.
    tool_filter: ToolFilter,
    // Incremented on every backend swap; lets callers detect reconnects.
    backend_version: Arc<AtomicU64>,
}
54
55impl ServerHandler for ProxyHandler {
56 fn get_info(&self) -> ServerInfo {
57 self.cached_info.clone()
58 }
59
60 #[tracing::instrument(skip(self, request, context), fields(
61 mcp_id = %self.mcp_id,
62 request = ?request,
63 ))]
64 async fn list_tools(
65 &self,
66 request: Option<PaginatedRequestParam>,
67 context: RequestContext<RoleServer>,
68 ) -> Result<ListToolsResult, ErrorData> {
69 let inner_guard = self.peer.load();
71 let inner = inner_guard.as_ref().ok_or_else(|| {
72 error!("Backend connection is not available (reconnecting)");
73 ErrorData::internal_error(
74 "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
75 None,
76 )
77 })?;
78
79 if inner.peer.is_transport_closed() {
81 error!("Backend transport is closed");
82 return Err(ErrorData::internal_error(
83 "Backend connection closed, please retry".to_string(),
84 None,
85 ));
86 }
87
88 match self.capabilities().tools {
90 Some(_) => {
91 tokio::select! {
93 result = inner.peer.list_tools(request) => {
94 match result {
95 Ok(result) => {
96 let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
98 result
99 .tools
100 .into_iter()
101 .filter(|tool| self.tool_filter.is_allowed(&tool.name))
102 .collect()
103 } else {
104 result.tools
105 };
106
107 info!(
109 "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
110 self.mcp_id,
111 filtered_tools.len(),
112 if self.tool_filter.is_enabled() {
113 " (已过滤)"
114 } else {
115 ""
116 }
117 );
118
119 debug!(
120 "Proxying list_tools response with {} tools",
121 filtered_tools.len()
122 );
123 Ok(ListToolsResult {
124 tools: filtered_tools,
125 next_cursor: result.next_cursor,
126 meta: result.meta, })
128 }
129 Err(err) => {
130 error!("Error listing tools: {:?}", err);
131 Err(ErrorData::internal_error(
132 format!("Error listing tools: {err}"),
133 None,
134 ))
135 }
136 }
137 }
138 _ = context.ct.cancelled() => {
139 info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
140 Err(ErrorData::internal_error(
141 "Request cancelled".to_string(),
142 None,
143 ))
144 }
145 }
146 }
147 None => {
148 warn!("Server doesn't support tools capability");
150 Ok(ListToolsResult::default())
151 }
152 }
153 }
154
    /// Proxies a `tools/call` request to the backend server.
    ///
    /// Flow: allocate a per-request log id, reject tools blocked by the
    /// filter, verify the backend connection, then forward the call while
    /// emitting a heartbeat log every 30s and honoring client cancellation.
    /// Backend failures are reported as tool-level errors (`Ok` carrying an
    /// error `CallToolResult`) rather than protocol-level `Err`s.
    #[tracing::instrument(skip(self, request, context), fields(
        mcp_id = %self.mcp_id,
        tool_name = %request.name,
        tool_arguments = ?request.arguments,
    ))]
    async fn call_tool(
        &self,
        request: CallToolRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<CallToolResult, ErrorData> {
        // Per-request id for correlating this invocation's log lines.
        let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let start = Instant::now();

        info!(
            "[call_tool:{}] 开始 - 工具: {}, MCP ID: {}",
            request_id, request.name, self.mcp_id
        );

        // Filtered-out tools produce a tool error, not a protocol error.
        if !self.tool_filter.is_allowed(&request.name) {
            info!(
                "[call_tool:{}] 工具被过滤 - MCP ID: {}, 工具: {}",
                request_id, self.mcp_id, request.name
            );
            return Ok(CallToolResult::error(vec![Content::text(format!(
                "Tool '{}' is not allowed by filter configuration",
                request.name
            ))]));
        }

        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => {
                let transport_closed = inner.peer.is_transport_closed();
                info!(
                    "[call_tool:{}] 后端连接存在 - transport_closed: {}",
                    request_id, transport_closed
                );
                inner
            }
            None => {
                error!(
                    "[call_tool:{}] 后端连接不可用 (正在重连) - MCP ID: {}",
                    request_id, self.mcp_id
                );
                return Ok(CallToolResult::error(vec![Content::text(
                    "Backend connection is not available, reconnecting...",
                )]));
            }
        };

        if inner.peer.is_transport_closed() {
            error!(
                "[call_tool:{}] 后端 transport 已关闭 - MCP ID: {}",
                request_id, self.mcp_id
            );
            return Ok(CallToolResult::error(vec![Content::text(
                "Backend connection closed, please retry",
            )]));
        }

        let result = match self.capabilities().tools {
            Some(_) => {
                info!(
                    "[call_tool:{}] 发送请求到后端... - 工具: {}, 已耗时: {}ms",
                    request_id,
                    request.name,
                    start.elapsed().as_millis()
                );

                let call_future = inner.peer.call_tool(request.clone());
                tokio::pin!(call_future);

                // Periodic "still waiting" log while the backend works.
                const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
                let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
                // tokio intervals fire immediately on the first tick; consume
                // it so the first heartbeat log appears after a full interval.
                heartbeat_interval.tick().await;

                let call_result = loop {
                    tokio::select! {
                        // `biased` gives completion/cancellation priority over
                        // the heartbeat tick when several arms are ready.
                        biased;

                        result = &mut call_future => {
                            break result;
                        }
                        _ = context.ct.cancelled() => {
                            let elapsed = start.elapsed();
                            warn!(
                                "[call_tool:{}] 请求被取消 - 工具: {}, 耗时: {}ms, MCP ID: {}",
                                request_id, request.name, elapsed.as_millis(), self.mcp_id
                            );
                            return Ok(CallToolResult::error(vec![Content::text(
                                "Request cancelled"
                            )]));
                        }
                        _ = heartbeat_interval.tick() => {
                            let elapsed = start.elapsed();
                            let transport_closed = inner.peer.is_transport_closed();
                            info!(
                                "[call_tool:{}] 等待后端响应中... - 工具: {}, 已等待: {}ms, \
                                transport_closed: {}, MCP ID: {}",
                                request_id, request.name, elapsed.as_millis(),
                                transport_closed, self.mcp_id
                            );
                        }
                    }
                };

                let elapsed = start.elapsed();
                match &call_result {
                    Ok(call_result) => {
                        let is_error = call_result.is_error.unwrap_or(false);
                        info!(
                            "[call_tool:{}] 收到响应 - 工具: {}, 耗时: {}ms, is_error: {}, MCP ID: {}",
                            request_id,
                            request.name,
                            elapsed.as_millis(),
                            is_error,
                            self.mcp_id
                        );
                        if is_error {
                            debug!(
                                "[call_tool:{}] 错误响应内容: {:?}",
                                request_id, call_result.content
                            );
                        }
                        Ok(call_result.clone())
                    }
                    Err(err) => {
                        error!(
                            "[call_tool:{}] 后端返回错误 - 工具: {}, 耗时: {}ms, 错误: {:?}, MCP ID: {}",
                            request_id,
                            request.name,
                            elapsed.as_millis(),
                            err,
                            self.mcp_id
                        );
                        // Surface backend errors as a tool error so clients
                        // see a result instead of a protocol failure.
                        Ok(CallToolResult::error(vec![Content::text(format!(
                            "Error: {err}"
                        ))]))
                    }
                }
            }
            None => {
                error!(
                    "[call_tool:{}] 服务器不支持 tools capability - MCP ID: {}",
                    request_id, self.mcp_id
                );
                Ok(CallToolResult::error(vec![Content::text(
                    "Server doesn't support tools capability",
                )]))
            }
        };

        let total_elapsed = start.elapsed();
        info!(
            "[call_tool:{}] 完成 - 工具: {}, 总耗时: {}ms",
            request_id,
            request.name,
            total_elapsed.as_millis()
        );
        result
    }
330
331 async fn list_resources(
332 &self,
333 request: Option<PaginatedRequestParam>,
334 context: RequestContext<RoleServer>,
335 ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
336 let inner_guard = self.peer.load();
338 let inner = inner_guard.as_ref().ok_or_else(|| {
339 error!("Backend connection is not available (reconnecting)");
340 ErrorData::internal_error(
341 "Backend connection is not available, reconnecting...".to_string(),
342 None,
343 )
344 })?;
345
346 if inner.peer.is_transport_closed() {
348 error!("Backend transport is closed");
349 return Err(ErrorData::internal_error(
350 "Backend connection closed, please retry".to_string(),
351 None,
352 ));
353 }
354
355 match self.capabilities().resources {
357 Some(_) => {
358 tokio::select! {
359 result = inner.peer.list_resources(request) => {
360 match result {
361 Ok(result) => {
362 info!(
364 "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
365 self.mcp_id,
366 result.resources.len()
367 );
368
369 debug!("Proxying list_resources response");
370 Ok(result)
371 }
372 Err(err) => {
373 error!("Error listing resources: {:?}", err);
374 Err(ErrorData::internal_error(
375 format!("Error listing resources: {err}"),
376 None,
377 ))
378 }
379 }
380 }
381 _ = context.ct.cancelled() => {
382 info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
383 Err(ErrorData::internal_error(
384 "Request cancelled".to_string(),
385 None,
386 ))
387 }
388 }
389 }
390 None => {
391 warn!("Server doesn't support resources capability");
393 Ok(rmcp::model::ListResourcesResult::default())
394 }
395 }
396 }
397
398 async fn read_resource(
399 &self,
400 request: rmcp::model::ReadResourceRequestParam,
401 context: RequestContext<RoleServer>,
402 ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
403 let inner_guard = self.peer.load();
405 let inner = inner_guard.as_ref().ok_or_else(|| {
406 error!("Backend connection is not available (reconnecting)");
407 ErrorData::internal_error(
408 "Backend connection is not available, reconnecting...".to_string(),
409 None,
410 )
411 })?;
412
413 if inner.peer.is_transport_closed() {
415 error!("Backend transport is closed");
416 return Err(ErrorData::internal_error(
417 "Backend connection closed, please retry".to_string(),
418 None,
419 ));
420 }
421
422 match self.capabilities().resources {
424 Some(_) => {
425 tokio::select! {
426 result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
427 uri: request.uri.clone(),
428 }) => {
429 match result {
430 Ok(result) => {
431 info!(
433 "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
434 self.mcp_id, request.uri
435 );
436
437 debug!("Proxying read_resource response for {}", request.uri);
438 Ok(result)
439 }
440 Err(err) => {
441 error!("Error reading resource: {:?}", err);
442 Err(ErrorData::internal_error(
443 format!("Error reading resource: {err}"),
444 None,
445 ))
446 }
447 }
448 }
449 _ = context.ct.cancelled() => {
450 info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
451 Err(ErrorData::internal_error(
452 "Request cancelled".to_string(),
453 None,
454 ))
455 }
456 }
457 }
458 None => {
459 error!("Server doesn't support resources capability");
461 Ok(rmcp::model::ReadResourceResult {
462 contents: Vec::new(),
463 })
464 }
465 }
466 }
467
468 async fn list_resource_templates(
469 &self,
470 request: Option<PaginatedRequestParam>,
471 context: RequestContext<RoleServer>,
472 ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
473 let inner_guard = self.peer.load();
475 let inner = inner_guard.as_ref().ok_or_else(|| {
476 error!("Backend connection is not available (reconnecting)");
477 ErrorData::internal_error(
478 "Backend connection is not available, reconnecting...".to_string(),
479 None,
480 )
481 })?;
482
483 if inner.peer.is_transport_closed() {
485 error!("Backend transport is closed");
486 return Err(ErrorData::internal_error(
487 "Backend connection closed, please retry".to_string(),
488 None,
489 ));
490 }
491
492 match self.capabilities().resources {
494 Some(_) => {
495 tokio::select! {
496 result = inner.peer.list_resource_templates(request) => {
497 match result {
498 Ok(result) => {
499 debug!("Proxying list_resource_templates response");
500 Ok(result)
501 }
502 Err(err) => {
503 error!("Error listing resource templates: {:?}", err);
504 Err(ErrorData::internal_error(
505 format!("Error listing resource templates: {err}"),
506 None,
507 ))
508 }
509 }
510 }
511 _ = context.ct.cancelled() => {
512 info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
513 Err(ErrorData::internal_error(
514 "Request cancelled".to_string(),
515 None,
516 ))
517 }
518 }
519 }
520 None => {
521 warn!("Server doesn't support resources capability");
523 Ok(rmcp::model::ListResourceTemplatesResult::default())
524 }
525 }
526 }
527
528 async fn list_prompts(
529 &self,
530 request: Option<PaginatedRequestParam>,
531 context: RequestContext<RoleServer>,
532 ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
533 let inner_guard = self.peer.load();
535 let inner = inner_guard.as_ref().ok_or_else(|| {
536 error!("Backend connection is not available (reconnecting)");
537 ErrorData::internal_error(
538 "Backend connection is not available, reconnecting...".to_string(),
539 None,
540 )
541 })?;
542
543 if inner.peer.is_transport_closed() {
545 error!("Backend transport is closed");
546 return Err(ErrorData::internal_error(
547 "Backend connection closed, please retry".to_string(),
548 None,
549 ));
550 }
551
552 match self.capabilities().prompts {
554 Some(_) => {
555 tokio::select! {
556 result = inner.peer.list_prompts(request) => {
557 match result {
558 Ok(result) => {
559 debug!("Proxying list_prompts response");
560 Ok(result)
561 }
562 Err(err) => {
563 error!("Error listing prompts: {:?}", err);
564 Err(ErrorData::internal_error(
565 format!("Error listing prompts: {err}"),
566 None,
567 ))
568 }
569 }
570 }
571 _ = context.ct.cancelled() => {
572 info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
573 Err(ErrorData::internal_error(
574 "Request cancelled".to_string(),
575 None,
576 ))
577 }
578 }
579 }
580 None => {
581 warn!("Server doesn't support prompts capability");
583 Ok(rmcp::model::ListPromptsResult::default())
584 }
585 }
586 }
587
588 async fn get_prompt(
589 &self,
590 request: rmcp::model::GetPromptRequestParam,
591 context: RequestContext<RoleServer>,
592 ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
593 let inner_guard = self.peer.load();
595 let inner = inner_guard.as_ref().ok_or_else(|| {
596 error!("Backend connection is not available (reconnecting)");
597 ErrorData::internal_error(
598 "Backend connection is not available, reconnecting...".to_string(),
599 None,
600 )
601 })?;
602
603 if inner.peer.is_transport_closed() {
605 error!("Backend transport is closed");
606 return Err(ErrorData::internal_error(
607 "Backend connection closed, please retry".to_string(),
608 None,
609 ));
610 }
611
612 match self.capabilities().prompts {
614 Some(_) => {
615 tokio::select! {
616 result = inner.peer.get_prompt(request.clone()) => {
617 match result {
618 Ok(result) => {
619 debug!("Proxying get_prompt response");
620 Ok(result)
621 }
622 Err(err) => {
623 error!("Error getting prompt: {:?}", err);
624 Err(ErrorData::internal_error(
625 format!("Error getting prompt: {err}"),
626 None,
627 ))
628 }
629 }
630 }
631 _ = context.ct.cancelled() => {
632 info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
633 Err(ErrorData::internal_error(
634 "Request cancelled".to_string(),
635 None,
636 ))
637 }
638 }
639 }
640 None => {
641 warn!("Server doesn't support prompts capability");
643 Ok(rmcp::model::GetPromptResult {
644 description: None,
645 messages: Vec::new(),
646 })
647 }
648 }
649 }
650
651 async fn complete(
652 &self,
653 request: rmcp::model::CompleteRequestParam,
654 context: RequestContext<RoleServer>,
655 ) -> Result<rmcp::model::CompleteResult, ErrorData> {
656 let inner_guard = self.peer.load();
658 let inner = inner_guard.as_ref().ok_or_else(|| {
659 error!("Backend connection is not available (reconnecting)");
660 ErrorData::internal_error(
661 "Backend connection is not available, reconnecting...".to_string(),
662 None,
663 )
664 })?;
665
666 if inner.peer.is_transport_closed() {
668 error!("Backend transport is closed");
669 return Err(ErrorData::internal_error(
670 "Backend connection closed, please retry".to_string(),
671 None,
672 ));
673 }
674
675 tokio::select! {
676 result = inner.peer.complete(request) => {
677 match result {
678 Ok(result) => {
679 debug!("Proxying complete response");
680 Ok(result)
681 }
682 Err(err) => {
683 error!("Error completing: {:?}", err);
684 Err(ErrorData::internal_error(
685 format!("Error completing: {err}"),
686 None,
687 ))
688 }
689 }
690 }
691 _ = context.ct.cancelled() => {
692 info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
693 Err(ErrorData::internal_error(
694 "Request cancelled".to_string(),
695 None,
696 ))
697 }
698 }
699 }
700
701 async fn on_progress(
702 &self,
703 notification: rmcp::model::ProgressNotificationParam,
704 _context: NotificationContext<RoleServer>,
705 ) {
706 let inner_guard = self.peer.load();
708 let inner = match inner_guard.as_ref() {
709 Some(inner) => inner,
710 None => {
711 error!("Backend connection is not available, cannot forward progress notification");
712 return;
713 }
714 };
715
716 if inner.peer.is_transport_closed() {
718 error!("Backend transport is closed, cannot forward progress notification");
719 return;
720 }
721
722 match inner.peer.notify_progress(notification).await {
723 Ok(_) => {
724 debug!("Proxying progress notification");
725 }
726 Err(err) => {
727 error!("Error notifying progress: {:?}", err);
728 }
729 }
730 }
731
732 async fn on_cancelled(
733 &self,
734 notification: rmcp::model::CancelledNotificationParam,
735 _context: NotificationContext<RoleServer>,
736 ) {
737 let inner_guard = self.peer.load();
739 let inner = match inner_guard.as_ref() {
740 Some(inner) => inner,
741 None => {
742 error!(
743 "Backend connection is not available, cannot forward cancelled notification"
744 );
745 return;
746 }
747 };
748
749 if inner.peer.is_transport_closed() {
751 error!("Backend transport is closed, cannot forward cancelled notification");
752 return;
753 }
754
755 match inner.peer.notify_cancelled(notification).await {
756 Ok(_) => {
757 debug!("Proxying cancelled notification");
758 }
759 Err(err) => {
760 error!("Error notifying cancelled: {:?}", err);
761 }
762 }
763 }
764}
765
766impl ProxyHandler {
    /// Returns the backend capabilities captured in the cached `ServerInfo`.
    #[inline]
    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
        &self.cached_info.capabilities
    }
772
773 fn default_server_info(mcp_id: &str) -> ServerInfo {
775 warn!("[ProxyHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
776 ServerInfo {
777 protocol_version: Default::default(),
778 server_info: Implementation {
779 name: "MCP Proxy".to_string(),
780 version: "0.1.0".to_string(),
781 title: None,
782 website_url: None,
783 icons: None,
784 },
785 instructions: None,
786 capabilities: Default::default(),
787 }
788 }
789
790 fn extract_server_info(
792 client: &RunningService<RoleClient, ClientInfo>,
793 mcp_id: &str,
794 ) -> ServerInfo {
795 client
796 .peer_info()
797 .map(|peer_info| ServerInfo {
798 protocol_version: peer_info.protocol_version.clone(),
799 server_info: Implementation {
800 name: peer_info.server_info.name.clone(),
801 version: peer_info.server_info.version.clone(),
802 title: None,
803 website_url: None,
804 icons: None,
805 },
806 instructions: peer_info.instructions.clone(),
807 capabilities: peer_info.capabilities.clone(),
808 })
809 .unwrap_or_else(|| Self::default_server_info(mcp_id))
810 }
811
812 pub fn new_disconnected(
815 mcp_id: String,
816 tool_filter: ToolFilter,
817 default_info: ServerInfo,
818 ) -> Self {
819 info!("[ProxyHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);
820
821 if tool_filter.is_enabled() {
823 if let Some(ref allow_list) = tool_filter.allow_tools {
824 info!(
825 "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
826 mcp_id, allow_list
827 );
828 }
829 if let Some(ref deny_list) = tool_filter.deny_tools {
830 info!(
831 "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
832 mcp_id, deny_list
833 );
834 }
835 }
836
837 Self {
838 peer: Arc::new(ArcSwapOption::empty()),
839 cached_info: default_info,
840 mcp_id,
841 tool_filter,
842 backend_version: Arc::new(AtomicU64::new(0)), }
844 }
845
846 pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
847 Self::with_mcp_id(client, "unknown".to_string())
848 }
849
850 pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
851 Self::with_tool_filter(client, mcp_id, ToolFilter::default())
852 }
853
854 pub fn with_tool_filter(
856 client: RunningService<RoleClient, ClientInfo>,
857 mcp_id: String,
858 tool_filter: ToolFilter,
859 ) -> Self {
860 use std::ops::Deref;
861
862 let cached_info = Self::extract_server_info(&client, &mcp_id);
864
865 let peer = client.deref().clone();
867
868 if tool_filter.is_enabled() {
870 if let Some(ref allow_list) = tool_filter.allow_tools {
871 info!(
872 "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
873 mcp_id, allow_list
874 );
875 }
876 if let Some(ref deny_list) = tool_filter.deny_tools {
877 info!(
878 "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
879 mcp_id, deny_list
880 );
881 }
882 }
883
884 let inner = PeerInner {
886 peer,
887 _running: Arc::new(client),
888 };
889
890 Self {
891 peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
892 cached_info,
893 mcp_id,
894 tool_filter,
895 backend_version: Arc::new(AtomicU64::new(1)), }
897 }
898
899 pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
905 use std::ops::Deref;
906
907 match new_client {
908 Some(client) => {
909 let peer = client.deref().clone();
910 let inner = PeerInner {
911 peer,
912 _running: Arc::new(client),
913 };
914 self.peer.store(Some(Arc::new(inner)));
915 info!("[ProxyHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
916 }
917 None => {
918 self.peer.store(None);
919 info!("[ProxyHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
920 }
921 }
922
923 let new_version = self.backend_version.fetch_add(1, Ordering::SeqCst) + 1;
925 info!(
926 "[ProxyHandler] 后端版本更新: {} - MCP ID: {}",
927 new_version, self.mcp_id
928 );
929 }
930
931 pub fn is_backend_available(&self) -> bool {
933 let inner_guard = self.peer.load();
934 match inner_guard.as_ref() {
935 Some(inner) => !inner.peer.is_transport_closed(),
936 None => false,
937 }
938 }
939
940 pub async fn is_mcp_server_ready(&self) -> bool {
942 !self.is_terminated_async().await
943 }
944
    /// Cheap, non-probing check: `true` when no usable backend connection
    /// is currently installed (inverse of `is_backend_available`).
    pub fn is_terminated(&self) -> bool {
        !self.is_backend_available()
    }
949
950 pub async fn is_terminated_async(&self) -> bool {
952 let inner_guard = self.peer.load();
954 let inner = match inner_guard.as_ref() {
955 Some(inner) => inner,
956 None => return true,
957 };
958
959 if inner.peer.is_transport_closed() {
961 return true;
962 }
963
964 match inner.peer.list_tools(None).await {
966 Ok(_) => {
967 debug!("后端连接状态检查: 正常");
968 false
969 }
970 Err(e) => {
971 info!("后端连接状态检查: 已断开,原因: {e}");
972 true
973 }
974 }
975 }
976
    /// Returns the MCP id used to identify this handler in log output.
    pub fn mcp_id(&self) -> &str {
        &self.mcp_id
    }
981
    /// Returns the backend version counter.
    ///
    /// Starts at 0 (created disconnected) or 1 (created connected) and is
    /// incremented by every `swap_backend`, so callers can compare two
    /// reads to detect that a reconnect happened in between.
    pub fn get_backend_version(&self) -> u64 {
        self.backend_version.load(Ordering::SeqCst)
    }
992
993 pub fn swap_backend_from_connection(
1002 &self,
1003 conn: Option<crate::client::StreamClientConnection>,
1004 ) {
1005 match conn {
1006 Some(c) => {
1007 let running = c.into_running_service();
1008 self.swap_backend(Some(running));
1009 }
1010 None => {
1011 self.swap_backend(None);
1012 }
1013 }
1014 }
1015}