use arc_swap::ArcSwapOption;
pub use mcp_common::ToolFilter;
use rmcp::{
    ErrorData, RoleClient, RoleServer, ServerHandler,
    model::{
        CallToolRequestParams, CallToolResult, ClientInfo, Content, Implementation,
        ListToolsResult, PaginatedRequestParams, ServerInfo,
    },
    service::{NotificationContext, Peer, RequestContext, RunningService},
};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use tracing::{debug, error, info, warn};

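/// Monotonically increasing counter used to tag each `call_tool` request in log output.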
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);

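/// Backend connection state: the `Peer` used to forward requests, plus the
/// `RunningService` kept alive so the underlying transport is not dropped.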
#[derive(Debug)]
struct PeerInner {
    peer: Peer<RoleClient>,
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}

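/// An MCP server handler that transparently proxies requests to a backend MCP client,
/// applying an optional tool filter and supporting hot-swapping of the backend connection.
///
/// A minimal usage sketch (the surrounding crate is responsible for establishing the backend
/// connection; the variable names here are illustrative only):
///
/// ```ignore
/// // Start in a disconnected state and attach the backend once it is available.
/// let handler = ProxyHandler::new_disconnected(mcp_id, ToolFilter::default(), default_info);
/// handler.swap_backend(Some(running_service));
/// assert!(handler.is_backend_available());
/// ```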
#[derive(Clone, Debug)]
pub struct ProxyHandler {
    peer: Arc<ArcSwapOption<PeerInner>>,
    cached_info: ServerInfo,
    mcp_id: String,
    tool_filter: ToolFilter,
    backend_version: Arc<AtomicU64>,
}

impl ServerHandler for ProxyHandler {
    fn get_info(&self) -> ServerInfo {
        self.cached_info.clone()
    }

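    /// Forwards `tools/list` to the backend and applies the configured tool filter to the result.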
    #[tracing::instrument(skip(self, request, context), fields(
        mcp_id = %self.mcp_id,
        request = ?request,
    ))]
    async fn list_tools(
        &self,
        request: Option<PaginatedRequestParams>,
        context: RequestContext<RoleServer>,
    ) -> Result<ListToolsResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().tools {
            Some(_) => {
                tokio::select! {
                    result = inner.peer.list_tools(request) => {
                        match result {
                            Ok(result) => {
                                let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
                                    result
                                        .tools
                                        .into_iter()
                                        .filter(|tool| self.tool_filter.is_allowed(&tool.name))
                                        .collect()
                                } else {
                                    result.tools
                                };

                                info!(
                                    "[list_tools] tool list result - MCP ID: {}, tool count: {}{}",
                                    self.mcp_id,
                                    filtered_tools.len(),
                                    if self.tool_filter.is_enabled() {
                                        " (filtered)"
                                    } else {
                                        ""
                                    }
                                );

                                debug!(
                                    "Proxying list_tools response with {} tools",
                                    filtered_tools.len()
                                );
                                Ok(ListToolsResult {
                                    tools: filtered_tools,
                                    next_cursor: result.next_cursor,
                                    meta: result.meta,
                                })
                            }
                            Err(err) => {
                                error!("Error listing tools: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing tools: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_tools] request cancelled - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                warn!("Server doesn't support tools capability");
                Ok(ListToolsResult::default())
            }
        }
    }

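    /// Forwards `tools/call` to the backend, rejecting tools blocked by the filter and
    /// logging a periodic heartbeat while waiting for long-running calls.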
    #[tracing::instrument(skip(self, request, context), fields(
        mcp_id = %self.mcp_id,
        tool_name = %request.name,
        tool_arguments = ?request.arguments,
    ))]
    async fn call_tool(
        &self,
        request: CallToolRequestParams,
        context: RequestContext<RoleServer>,
    ) -> Result<CallToolResult, ErrorData> {
        let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let start = Instant::now();

        info!(
            "[call_tool:{}] start - tool: {}, MCP ID: {}",
            request_id, request.name, self.mcp_id
        );

        if !self.tool_filter.is_allowed(&request.name) {
            info!(
                "[call_tool:{}] tool blocked by filter - MCP ID: {}, tool: {}",
                request_id, self.mcp_id, request.name
            );
            return Ok(CallToolResult::error(vec![Content::text(format!(
                "Tool '{}' is not allowed by filter configuration",
                request.name
            ))]));
        }

        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => {
                let transport_closed = inner.peer.is_transport_closed();
                info!(
                    "[call_tool:{}] backend connection present - transport_closed: {}",
                    request_id, transport_closed
                );
                inner
            }
            None => {
                error!(
                    "[call_tool:{}] backend connection unavailable (reconnecting) - MCP ID: {}",
                    request_id, self.mcp_id
                );
                return Ok(CallToolResult::error(vec![Content::text(
                    "Backend connection is not available, reconnecting...",
                )]));
            }
        };

        if inner.peer.is_transport_closed() {
            error!(
                "[call_tool:{}] backend transport closed - MCP ID: {}",
                request_id, self.mcp_id
            );
            return Ok(CallToolResult::error(vec![Content::text(
                "Backend connection closed, please retry",
            )]));
        }

        let result = match self.capabilities().tools {
            Some(_) => {
                info!(
                    "[call_tool:{}] sending request to backend... - tool: {}, elapsed: {}ms",
                    request_id,
                    request.name,
                    start.elapsed().as_millis()
                );

                let call_future = inner.peer.call_tool(request.clone());
                tokio::pin!(call_future);

                const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
                let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
                // The first tick of a tokio interval completes immediately; consume it so the
                // heartbeat log only fires every HEARTBEAT_INTERVAL afterwards.
                heartbeat_interval.tick().await;

                let call_result = loop {
                    tokio::select! {
                        biased;

                        result = &mut call_future => {
                            break result;
                        }
                        _ = context.ct.cancelled() => {
                            let elapsed = start.elapsed();
                            warn!(
                                "[call_tool:{}] request cancelled - tool: {}, elapsed: {}ms, MCP ID: {}",
                                request_id, request.name, elapsed.as_millis(), self.mcp_id
                            );
                            return Ok(CallToolResult::error(vec![Content::text(
                                "Request cancelled"
                            )]));
                        }
                        _ = heartbeat_interval.tick() => {
                            let elapsed = start.elapsed();
                            let transport_closed = inner.peer.is_transport_closed();
                            info!(
                                "[call_tool:{}] waiting for backend response... - tool: {}, waited: {}ms, \
                                transport_closed: {}, MCP ID: {}",
                                request_id, request.name, elapsed.as_millis(),
                                transport_closed, self.mcp_id
                            );
                        }
                    }
                };

                let elapsed = start.elapsed();
                match &call_result {
                    Ok(call_result) => {
                        let is_error = call_result.is_error.unwrap_or(false);
                        info!(
                            "[call_tool:{}] response received - tool: {}, elapsed: {}ms, is_error: {}, MCP ID: {}",
                            request_id,
                            request.name,
                            elapsed.as_millis(),
                            is_error,
                            self.mcp_id
                        );
                        if is_error {
                            debug!(
                                "[call_tool:{}] error response content: {:?}",
                                request_id, call_result.content
                            );
                        }
                        Ok(call_result.clone())
                    }
                    Err(err) => {
                        error!(
                            "[call_tool:{}] backend returned error - tool: {}, elapsed: {}ms, error: {:?}, MCP ID: {}",
                            request_id,
                            request.name,
                            elapsed.as_millis(),
                            err,
                            self.mcp_id
                        );
                        Ok(CallToolResult::error(vec![Content::text(format!(
                            "Error: {err}"
                        ))]))
                    }
                }
            }
            None => {
                error!(
                    "[call_tool:{}] server doesn't support tools capability - MCP ID: {}",
                    request_id, self.mcp_id
                );
                Ok(CallToolResult::error(vec![Content::text(
                    "Server doesn't support tools capability",
                )]))
            }
        };

        let total_elapsed = start.elapsed();
        info!(
            "[call_tool:{}] done - tool: {}, total elapsed: {}ms",
            request_id,
            request.name,
            total_elapsed.as_millis()
        );
        result
    }

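    /// Forwards `resources/list` to the backend if it advertises the resources capability.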
    async fn list_resources(
        &self,
        request: Option<PaginatedRequestParams>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().resources {
            Some(_) => {
                tokio::select! {
                    result = inner.peer.list_resources(request) => {
                        match result {
                            Ok(result) => {
                                info!(
                                    "[list_resources] resource list result - MCP ID: {}, resource count: {}",
                                    self.mcp_id,
                                    result.resources.len()
                                );

                                debug!("Proxying list_resources response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resources: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resources: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resources] request cancelled - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourcesResult::default())
            }
        }
    }

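    /// Forwards `resources/read` for the requested URI to the backend.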
    async fn read_resource(
        &self,
        request: rmcp::model::ReadResourceRequestParams,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().resources {
            Some(_) => {
                tokio::select! {
                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParams::new(request.uri.clone())) => {
                        match result {
                            Ok(result) => {
                                info!(
                                    "[read_resource] resource read result - MCP ID: {}, URI: {}",
                                    self.mcp_id, request.uri
                                );

                                debug!("Proxying read_resource response for {}", request.uri);
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error reading resource: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error reading resource: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[read_resource] request cancelled - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                error!("Server doesn't support resources capability");
                Ok(rmcp::model::ReadResourceResult::new(vec![]))
            }
        }
    }

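    /// Forwards `resources/templates/list` to the backend.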
    async fn list_resource_templates(
        &self,
        request: Option<PaginatedRequestParams>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().resources {
            Some(_) => {
                tokio::select! {
                    result = inner.peer.list_resource_templates(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_resource_templates response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resource templates: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resource templates: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resource_templates] request cancelled - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourceTemplatesResult::default())
            }
        }
    }

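    /// Forwards `prompts/list` to the backend if it advertises the prompts capability.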
    async fn list_prompts(
        &self,
        request: Option<PaginatedRequestParams>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().prompts {
            Some(_) => {
                tokio::select! {
                    result = inner.peer.list_prompts(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_prompts response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing prompts: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing prompts: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_prompts] request cancelled - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                warn!("Server doesn't support prompts capability");
                Ok(rmcp::model::ListPromptsResult::default())
            }
        }
    }

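    /// Forwards `prompts/get` to the backend.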
    async fn get_prompt(
        &self,
        request: rmcp::model::GetPromptRequestParams,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().prompts {
            Some(_) => {
                tokio::select! {
                    result = inner.peer.get_prompt(request.clone()) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying get_prompt response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error getting prompt: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error getting prompt: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[get_prompt] request cancelled - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                warn!("Server doesn't support prompts capability");
                let messages = Vec::new();
                Ok(rmcp::model::GetPromptResult::new(messages))
            }
        }
    }

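    /// Forwards `completion/complete` to the backend.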
    async fn complete(
        &self,
        request: rmcp::model::CompleteRequestParams,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        tokio::select! {
            result = inner.peer.complete(request) => {
                match result {
                    Ok(result) => {
                        debug!("Proxying complete response");
                        Ok(result)
                    }
                    Err(err) => {
                        error!("Error completing: {:?}", err);
                        Err(ErrorData::internal_error(
                            format!("Error completing: {err}"),
                            None,
                        ))
                    }
                }
            }
            _ = context.ct.cancelled() => {
                info!("[complete] request cancelled - MCP ID: {}", self.mcp_id);
                Err(ErrorData::internal_error(
                    "Request cancelled".to_string(),
                    None,
                ))
            }
        }
    }

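    /// Forwards progress notifications from the client to the backend.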
    async fn on_progress(
        &self,
        notification: rmcp::model::ProgressNotificationParam,
        _context: NotificationContext<RoleServer>,
    ) {
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => {
                error!("Backend connection is not available, cannot forward progress notification");
                return;
            }
        };

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed, cannot forward progress notification");
            return;
        }

        match inner.peer.notify_progress(notification).await {
            Ok(_) => {
                debug!("Proxying progress notification");
            }
            Err(err) => {
                error!("Error notifying progress: {:?}", err);
            }
        }
    }

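    /// Forwards cancellation notifications from the client to the backend.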
    async fn on_cancelled(
        &self,
        notification: rmcp::model::CancelledNotificationParam,
        _context: NotificationContext<RoleServer>,
    ) {
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => {
                error!(
                    "Backend connection is not available, cannot forward cancelled notification"
                );
                return;
            }
        };

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed, cannot forward cancelled notification");
            return;
        }

        match inner.peer.notify_cancelled(notification).await {
            Ok(_) => {
                debug!("Proxying cancelled notification");
            }
            Err(err) => {
                error!("Error notifying cancelled: {:?}", err);
            }
        }
    }
}

impl ProxyHandler {
    #[inline]
    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
        &self.cached_info.capabilities
    }

    fn default_server_info(mcp_id: &str) -> ServerInfo {
        warn!("[ProxyHandler] creating default ServerInfo - MCP ID: {}", mcp_id);
        ServerInfo::new(rmcp::model::ServerCapabilities::default())
            .with_server_info(Implementation::new("MCP Proxy", "0.1.0"))
    }

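    /// Builds a `ServerInfo` from the backend's peer info, falling back to the default if unavailable.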
    fn extract_server_info(
        client: &RunningService<RoleClient, ClientInfo>,
        mcp_id: &str,
    ) -> ServerInfo {
        client
            .peer_info()
            .map(|peer_info| {
                ServerInfo::new(peer_info.capabilities.clone())
                    .with_protocol_version(peer_info.protocol_version.clone())
                    .with_server_info(Implementation::new(
                        peer_info.server_info.name.clone(),
                        peer_info.server_info.version.clone(),
                    ))
                    .with_instructions(peer_info.instructions.clone().unwrap_or_default())
            })
            .unwrap_or_else(|| Self::default_server_info(mcp_id))
    }

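    /// Creates a handler with no backend attached; requests fail until `swap_backend` is called.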
    pub fn new_disconnected(
        mcp_id: String,
        tool_filter: ToolFilter,
        default_info: ServerInfo,
    ) -> Self {
        info!("[ProxyHandler] creating handler in disconnected state - MCP ID: {}", mcp_id);

        if tool_filter.is_enabled() {
            if let Some(ref allow_list) = tool_filter.allow_tools {
                info!(
                    "[ProxyHandler] tool allow list enabled - MCP ID: {}, allowed tools: {:?}",
                    mcp_id, allow_list
                );
            }
            if let Some(ref deny_list) = tool_filter.deny_tools {
                info!(
                    "[ProxyHandler] tool deny list enabled - MCP ID: {}, denied tools: {:?}",
                    mcp_id, deny_list
                );
            }
        }

        Self {
            peer: Arc::new(ArcSwapOption::empty()),
            cached_info: default_info,
            mcp_id,
            tool_filter,
            backend_version: Arc::new(AtomicU64::new(0)),
        }
    }

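    /// Creates a handler for an already-connected backend with an unknown MCP ID and no tool filter.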
    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
        Self::with_mcp_id(client, "unknown".to_string())
    }

    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
    }

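    /// Creates a handler for an already-connected backend with the given MCP ID and tool filter.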
    pub fn with_tool_filter(
        client: RunningService<RoleClient, ClientInfo>,
        mcp_id: String,
        tool_filter: ToolFilter,
    ) -> Self {
        use std::ops::Deref;

        let cached_info = Self::extract_server_info(&client, &mcp_id);

        let peer = client.deref().clone();

        if tool_filter.is_enabled() {
            if let Some(ref allow_list) = tool_filter.allow_tools {
                info!(
                    "[ProxyHandler] tool allow list enabled - MCP ID: {}, allowed tools: {:?}",
                    mcp_id, allow_list
                );
            }
            if let Some(ref deny_list) = tool_filter.deny_tools {
                info!(
                    "[ProxyHandler] tool deny list enabled - MCP ID: {}, denied tools: {:?}",
                    mcp_id, deny_list
                );
            }
        }

        let inner = PeerInner {
            peer,
            _running: Arc::new(client),
        };

        Self {
            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
            cached_info,
            mcp_id,
            tool_filter,
            backend_version: Arc::new(AtomicU64::new(1)),
        }
    }

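    /// Replaces (or clears) the backend connection and bumps the backend version.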
    pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
        use std::ops::Deref;

        match new_client {
            Some(client) => {
                let peer = client.deref().clone();
                let inner = PeerInner {
                    peer,
                    _running: Arc::new(client),
                };
                self.peer.store(Some(Arc::new(inner)));
                info!("[ProxyHandler] backend connection updated - MCP ID: {}", self.mcp_id);
            }
            None => {
                self.peer.store(None);
                info!("[ProxyHandler] backend connection disconnected - MCP ID: {}", self.mcp_id);
            }
        }

        let new_version = self.backend_version.fetch_add(1, Ordering::SeqCst) + 1;
        info!(
            "[ProxyHandler] backend version updated to {} - MCP ID: {}",
            new_version, self.mcp_id
        );
    }

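    /// Returns true if a backend is attached and its transport is still open.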
    pub fn is_backend_available(&self) -> bool {
        let inner_guard = self.peer.load();
        match inner_guard.as_ref() {
            Some(inner) => !inner.peer.is_transport_closed(),
            None => false,
        }
    }

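    /// Actively checks whether the backend MCP server is ready to serve requests.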
    pub async fn is_mcp_server_ready(&self) -> bool {
        !self.is_terminated_async().await
    }

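    /// Cheap, synchronous check based only on local transport state.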
    pub fn is_terminated(&self) -> bool {
        !self.is_backend_available()
    }

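    /// Thorough check: also performs a `tools/list` round trip to verify the backend still responds.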
    pub async fn is_terminated_async(&self) -> bool {
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => return true,
        };

        if inner.peer.is_transport_closed() {
            return true;
        }

        match inner.peer.list_tools(None).await {
            Ok(_) => {
                debug!("Backend connection check: healthy");
                false
            }
            Err(e) => {
                info!("Backend connection check: disconnected, reason: {e}");
                true
            }
        }
    }

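    /// Returns the MCP ID this handler proxies for.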
    pub fn mcp_id(&self) -> &str {
        &self.mcp_id
    }

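    /// Returns the current backend version (incremented on every `swap_backend`).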
    pub fn get_backend_version(&self) -> u64 {
        self.backend_version.load(Ordering::SeqCst)
    }

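    /// Convenience wrapper over `swap_backend` that accepts a `StreamClientConnection`.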
    pub fn swap_backend_from_connection(
        &self,
        conn: Option<crate::client::StreamClientConnection>,
    ) {
        match conn {
            Some(c) => {
                let running = c.into_running_service();
                self.swap_backend(Some(running));
            }
            None => {
                self.swap_backend(None);
            }
        }
    }
}