1use arc_swap::ArcSwapOption;
2pub use mcp_common::ToolFilter;
3use rmcp::{
7 ErrorData, RoleClient, RoleServer, ServerHandler,
8 model::{
9 CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
10 PaginatedRequestParam, ProtocolVersion, ServerInfo,
11 },
12 service::{NotificationContext, Peer, RequestContext, RunningService},
13};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Instant, SystemTime};
17use tracing::{debug, error, info, warn};
18
// Monotonically increasing id used to correlate the start/response/done log
// lines of a single `call_tool` invocation; only used for logging, so
// `Ordering::Relaxed` at the increment site is sufficient.
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
21
/// Bundles a backend peer handle with the `RunningService` that owns the
/// underlying transport task.
#[derive(Debug)]
struct PeerInner {
    // Handle used to issue requests/notifications to the backend server.
    peer: Peer<RoleClient>,
    // Held only to keep the backend service task alive (dropping the
    // `RunningService` would cancel the transport `peer` relies on).
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}
32
/// Proxy `ServerHandler` that forwards MCP requests to a backend client
/// connection which can be hot-swapped at runtime (e.g. on reconnect).
#[derive(Clone, Debug)]
pub struct SseHandler {
    // Current backend connection; `None` while disconnected/reconnecting.
    peer: Arc<ArcSwapOption<PeerInner>>,
    // Server info captured at connect time and served from cache by `get_info`.
    cached_info: ServerInfo,
    // Identifier of the proxied MCP server, used throughout log messages.
    mcp_id: String,
    // Allow/deny filter applied to tool listing and tool invocation.
    tool_filter: ToolFilter,
}
49
50impl ServerHandler for SseHandler {
    /// Serve the cached server info without touching the backend.
    fn get_info(&self) -> ServerInfo {
        self.cached_info.clone()
    }
54
55 #[tracing::instrument(skip(self, request, context), fields(
56 mcp_id = %self.mcp_id,
57 request = ?request,
58 ))]
59 async fn list_tools(
60 &self,
61 request: Option<PaginatedRequestParam>,
62 context: RequestContext<RoleServer>,
63 ) -> Result<ListToolsResult, ErrorData> {
64 let inner_guard = self.peer.load();
66 let inner = inner_guard.as_ref().ok_or_else(|| {
67 error!("Backend connection is not available (reconnecting)");
68 ErrorData::internal_error(
69 "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
70 None,
71 )
72 })?;
73
74 if inner.peer.is_transport_closed() {
76 error!("Backend transport is closed");
77 return Err(ErrorData::internal_error(
78 "Backend connection closed, please retry".to_string(),
79 None,
80 ));
81 }
82
83 match self.capabilities().tools {
85 Some(_) => {
86 tokio::select! {
88 result = inner.peer.list_tools(request) => {
89 match result {
90 Ok(result) => {
91 let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
93 result
94 .tools
95 .into_iter()
96 .filter(|tool| self.tool_filter.is_allowed(&tool.name))
97 .collect()
98 } else {
99 result.tools
100 };
101
102 info!(
104 "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
105 self.mcp_id,
106 filtered_tools.len(),
107 if self.tool_filter.is_enabled() {
108 " (已过滤)"
109 } else {
110 ""
111 }
112 );
113
114 debug!(
115 "Proxying list_tools response with {} tools",
116 filtered_tools.len()
117 );
118 Ok(ListToolsResult {
119 tools: filtered_tools,
120 next_cursor: result.next_cursor,
121 })
122 }
123 Err(err) => {
124 error!("Error listing tools: {:?}", err);
125 Err(ErrorData::internal_error(
126 format!("Error listing tools: {err}"),
127 None,
128 ))
129 }
130 }
131 }
132 _ = context.ct.cancelled() => {
133 info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
134 Err(ErrorData::internal_error(
135 "Request cancelled".to_string(),
136 None,
137 ))
138 }
139 }
140 }
141 None => {
142 warn!("Server doesn't support tools capability");
144 Ok(ListToolsResult::default())
145 }
146 }
147 }
148
    /// Proxy `tools/call` to the backend.
    ///
    /// Unlike the list/read handlers, failures here are reported as
    /// `CallToolResult::error(...)` payloads (i.e. `Ok` with `is_error`)
    /// rather than protocol-level errors, so the client always receives a
    /// displayable tool result.
    #[tracing::instrument(skip(self, request, context), fields(
        mcp_id = %self.mcp_id,
        tool_name = %request.name,
        tool_arguments = ?request.arguments,
    ))]
    async fn call_tool(
        &self,
        request: CallToolRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<CallToolResult, ErrorData> {
        // Per-invocation id correlating the start/response/done log lines;
        // Relaxed ordering is fine since the counter is logging-only.
        let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let start = Instant::now();
        let start_time = SystemTime::now();

        info!(
            "[call_tool:{}] 开始 - 工具: {}, MCP ID: {}, 时间: {:?}",
            request_id, request.name, self.mcp_id, start_time
        );

        // Reject calls to tools excluded by the allow/deny filter before
        // touching the backend at all.
        if !self.tool_filter.is_allowed(&request.name) {
            info!(
                "[call_tool:{}] 工具被过滤 - MCP ID: {}, 工具: {}",
                request_id, self.mcp_id, request.name
            );
            return Ok(CallToolResult::error(vec![Content::text(format!(
                "Tool '{}' is not allowed by filter configuration",
                request.name
            ))]));
        }

        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => {
                let transport_closed = inner.peer.is_transport_closed();
                info!(
                    "[call_tool:{}] 后端连接存在 - transport_closed: {}",
                    request_id, transport_closed
                );
                inner
            }
            None => {
                error!(
                    "[call_tool:{}] 后端连接不可用 (正在重连) - MCP ID: {}",
                    request_id, self.mcp_id
                );
                return Ok(CallToolResult::error(vec![Content::text(
                    "Backend connection is not available, reconnecting...",
                )]));
            }
        };

        if inner.peer.is_transport_closed() {
            error!(
                "[call_tool:{}] 后端 transport 已关闭 - MCP ID: {}",
                request_id, self.mcp_id
            );
            return Ok(CallToolResult::error(vec![Content::text(
                "Backend connection closed, please retry",
            )]));
        }

        let result = match self.capabilities().tools {
            Some(_) => {
                info!(
                    "[call_tool:{}] 发送请求到后端... - 工具: {}, 已耗时: {}ms",
                    request_id,
                    request.name,
                    start.elapsed().as_millis()
                );

                // Race the backend call against client-side cancellation.
                // `request` is cloned so its name stays available for the
                // completion log below.
                tokio::select! {
                    result = inner.peer.call_tool(request.clone()) => {
                        let elapsed = start.elapsed();
                        match &result {
                            Ok(call_result) => {
                                let is_error = call_result.is_error.unwrap_or(false);
                                info!(
                                    "[call_tool:{}] 收到响应 - 工具: {}, 耗时: {}ms, is_error: {}, MCP ID: {}",
                                    request_id, request.name, elapsed.as_millis(), is_error, self.mcp_id
                                );
                                if is_error {
                                    debug!(
                                        "[call_tool:{}] 错误响应内容: {:?}",
                                        request_id, call_result.content
                                    );
                                }
                                Ok(call_result.clone())
                            }
                            Err(err) => {
                                error!(
                                    "[call_tool:{}] 后端返回错误 - 工具: {}, 耗时: {}ms, 错误: {:?}, MCP ID: {}",
                                    request_id, request.name, elapsed.as_millis(), err, self.mcp_id
                                );
                                // Backend error is surfaced as a tool-level
                                // error payload, not a protocol error.
                                Ok(CallToolResult::error(vec![Content::text(format!(
                                    "Error: {err}"
                                ))]))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        let elapsed = start.elapsed();
                        warn!(
                            "[call_tool:{}] 请求被取消 - 工具: {}, 耗时: {}ms, MCP ID: {}",
                            request_id, request.name, elapsed.as_millis(), self.mcp_id
                        );
                        Ok(CallToolResult::error(vec![Content::text(
                            "Request cancelled"
                        )]))
                    }
                }
            }
            None => {
                error!(
                    "[call_tool:{}] 服务器不支持 tools capability - MCP ID: {}",
                    request_id, self.mcp_id
                );
                Ok(CallToolResult::error(vec![Content::text(
                    "Server doesn't support tools capability",
                )]))
            }
        };

        let total_elapsed = start.elapsed();
        info!(
            "[call_tool:{}] 完成 - 工具: {}, 总耗时: {}ms",
            request_id,
            request.name,
            total_elapsed.as_millis()
        );
        result
    }
290
    /// Proxy `resources/list` to the backend; returns an empty default
    /// result when the backend does not advertise the resources capability.
    async fn list_resources(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().resources {
            Some(_) => {
                // Race the backend request against client-side cancellation.
                tokio::select! {
                    result = inner.peer.list_resources(request) => {
                        match result {
                            Ok(result) => {
                                info!(
                                    "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
                                    self.mcp_id,
                                    result.resources.len()
                                );

                                debug!("Proxying list_resources response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resources: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resources: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourcesResult::default())
            }
        }
    }
357
    /// Proxy `resources/read` to the backend; returns an empty contents
    /// list when the backend does not advertise the resources capability.
    async fn read_resource(
        &self,
        request: rmcp::model::ReadResourceRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().resources {
            Some(_) => {
                // The param is rebuilt with a cloned URI so `request.uri`
                // remains available for the log lines below.
                tokio::select! {
                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
                        uri: request.uri.clone(),
                    }) => {
                        match result {
                            Ok(result) => {
                                info!(
                                    "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
                                    self.mcp_id, request.uri
                                );

                                debug!("Proxying read_resource response for {}", request.uri);
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error reading resource: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error reading resource: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                error!("Server doesn't support resources capability");
                Ok(rmcp::model::ReadResourceResult {
                    contents: Vec::new(),
                })
            }
        }
    }
427
    /// Proxy `resources/templates/list` to the backend; returns an empty
    /// default when the resources capability is not advertised.
    async fn list_resource_templates(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().resources {
            Some(_) => {
                // Race the backend request against client-side cancellation.
                tokio::select! {
                    result = inner.peer.list_resource_templates(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_resource_templates response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resource templates: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resource templates: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourceTemplatesResult::default())
            }
        }
    }
487
    /// Proxy `prompts/list` to the backend; returns an empty default when
    /// the prompts capability is not advertised.
    async fn list_prompts(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().prompts {
            Some(_) => {
                // Race the backend request against client-side cancellation.
                tokio::select! {
                    result = inner.peer.list_prompts(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_prompts response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing prompts: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing prompts: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                warn!("Server doesn't support prompts capability");
                Ok(rmcp::model::ListPromptsResult::default())
            }
        }
    }
547
    /// Proxy `prompts/get` to the backend; returns an empty prompt result
    /// when the prompts capability is not advertised.
    async fn get_prompt(
        &self,
        request: rmcp::model::GetPromptRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        match self.capabilities().prompts {
            Some(_) => {
                // `request` is cloned so its name is still available for
                // the cancellation log line below.
                tokio::select! {
                    result = inner.peer.get_prompt(request.clone()) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying get_prompt response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error getting prompt: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error getting prompt: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                warn!("Server doesn't support prompts capability");
                Ok(rmcp::model::GetPromptResult {
                    description: None,
                    messages: Vec::new(),
                })
            }
        }
    }
610
    /// Proxy `completion/complete` to the backend.
    ///
    /// NOTE(review): unlike the other handlers, no capability check is
    /// performed before forwarding — confirm this is intentional.
    async fn complete(
        &self,
        request: rmcp::model::CompleteRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Race the backend request against client-side cancellation.
        tokio::select! {
            result = inner.peer.complete(request) => {
                match result {
                    Ok(result) => {
                        debug!("Proxying complete response");
                        Ok(result)
                    }
                    Err(err) => {
                        error!("Error completing: {:?}", err);
                        Err(ErrorData::internal_error(
                            format!("Error completing: {err}"),
                            None,
                        ))
                    }
                }
            }
            _ = context.ct.cancelled() => {
                info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
                Err(ErrorData::internal_error(
                    "Request cancelled".to_string(),
                    None,
                ))
            }
        }
    }
660
661 async fn on_progress(
662 &self,
663 notification: rmcp::model::ProgressNotificationParam,
664 _context: NotificationContext<RoleServer>,
665 ) {
666 let inner_guard = self.peer.load();
668 let inner = match inner_guard.as_ref() {
669 Some(inner) => inner,
670 None => {
671 error!("Backend connection is not available, cannot forward progress notification");
672 return;
673 }
674 };
675
676 if inner.peer.is_transport_closed() {
678 error!("Backend transport is closed, cannot forward progress notification");
679 return;
680 }
681
682 match inner.peer.notify_progress(notification).await {
683 Ok(_) => {
684 debug!("Proxying progress notification");
685 }
686 Err(err) => {
687 error!("Error notifying progress: {:?}", err);
688 }
689 }
690 }
691
692 async fn on_cancelled(
693 &self,
694 notification: rmcp::model::CancelledNotificationParam,
695 _context: NotificationContext<RoleServer>,
696 ) {
697 let inner_guard = self.peer.load();
699 let inner = match inner_guard.as_ref() {
700 Some(inner) => inner,
701 None => {
702 error!(
703 "Backend connection is not available, cannot forward cancelled notification"
704 );
705 return;
706 }
707 };
708
709 if inner.peer.is_transport_closed() {
711 error!("Backend transport is closed, cannot forward cancelled notification");
712 return;
713 }
714
715 match inner.peer.notify_cancelled(notification).await {
716 Ok(_) => {
717 debug!("Proxying cancelled notification");
718 }
719 Err(err) => {
720 error!("Error notifying cancelled: {:?}", err);
721 }
722 }
723 }
724}
725
726impl SseHandler {
    /// Capabilities of the proxied server, as captured in `cached_info`
    /// when the backend connection was established.
    #[inline]
    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
        &self.cached_info.capabilities
    }
732
    /// Fallback `ServerInfo` used when the backend's peer info is not
    /// available; logs a warning so the placeholder is visible in traces.
    fn default_server_info(mcp_id: &str) -> ServerInfo {
        warn!("[SseHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
        ServerInfo {
            protocol_version: ProtocolVersion::V_2024_11_05,
            server_info: Implementation {
                name: "MCP Proxy".to_string(),
                version: "0.1.0".to_string(),
                title: None,
                website_url: None,
                icons: None,
            },
            instructions: None,
            // Empty capabilities: no tools/resources/prompts advertised.
            capabilities: Default::default(),
        }
    }
749
750 fn extract_server_info(
752 client: &RunningService<RoleClient, ClientInfo>,
753 mcp_id: &str,
754 ) -> ServerInfo {
755 client
756 .peer_info()
757 .map(|peer_info| ServerInfo {
758 protocol_version: peer_info.protocol_version.clone(),
759 server_info: Implementation {
760 name: peer_info.server_info.name.clone(),
761 version: peer_info.server_info.version.clone(),
762 title: None,
763 website_url: None,
764 icons: None,
765 },
766 instructions: peer_info.instructions.clone(),
767 capabilities: peer_info.capabilities.clone(),
768 })
769 .unwrap_or_else(|| Self::default_server_info(mcp_id))
770 }
771
    /// Create a handler with no backend attached (`peer` starts empty);
    /// a connection is expected to be installed later via `swap_backend`.
    pub fn new_disconnected(
        mcp_id: String,
        tool_filter: ToolFilter,
        default_info: ServerInfo,
    ) -> Self {
        info!("[SseHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);

        // Surface the effective allow/deny configuration at startup.
        if tool_filter.is_enabled() {
            if let Some(ref allow_list) = tool_filter.allow_tools {
                info!(
                    "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
                    mcp_id, allow_list
                );
            }
            if let Some(ref deny_list) = tool_filter.deny_tools {
                info!(
                    "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
                    mcp_id, deny_list
                );
            }
        }

        Self {
            peer: Arc::new(ArcSwapOption::empty()),
            cached_info: default_info,
            mcp_id,
            tool_filter,
        }
    }
804
    /// Build a handler with the placeholder MCP id `"unknown"`.
    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
        Self::with_mcp_id(client, "unknown".to_string())
    }
808
    /// Build a handler with an explicit MCP id and no tool filtering.
    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
    }
812
    /// Build a fully-connected handler: captures the backend's server info
    /// into the cache and installs the client as the active backend.
    pub fn with_tool_filter(
        client: RunningService<RoleClient, ClientInfo>,
        mcp_id: String,
        tool_filter: ToolFilter,
    ) -> Self {
        use std::ops::Deref;

        // Snapshot server info before `client` is moved into `PeerInner`.
        let cached_info = Self::extract_server_info(&client, &mcp_id);

        let peer = client.deref().clone();

        // Surface the effective allow/deny configuration at startup.
        if tool_filter.is_enabled() {
            if let Some(ref allow_list) = tool_filter.allow_tools {
                info!(
                    "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
                    mcp_id, allow_list
                );
            }
            if let Some(ref deny_list) = tool_filter.deny_tools {
                info!(
                    "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
                    mcp_id, deny_list
                );
            }
        }

        let inner = PeerInner {
            peer,
            _running: Arc::new(client),
        };

        Self {
            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
            cached_info,
            mcp_id,
            tool_filter,
        }
    }
856
857 pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
861 use std::ops::Deref;
862
863 match new_client {
864 Some(client) => {
865 let peer = client.deref().clone();
866 let inner = PeerInner {
867 peer,
868 _running: Arc::new(client),
869 };
870 self.peer.store(Some(Arc::new(inner)));
871 info!("[SseHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
872 }
873 None => {
874 self.peer.store(None);
875 info!("[SseHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
876 }
877 }
878 }
879
880 pub fn is_backend_available(&self) -> bool {
882 let inner_guard = self.peer.load();
883 match inner_guard.as_ref() {
884 Some(inner) => !inner.peer.is_transport_closed(),
885 None => false,
886 }
887 }
888
    /// Inverse of `is_terminated_async`: performs an active health probe.
    pub async fn is_mcp_server_ready(&self) -> bool {
        !self.is_terminated_async().await
    }
893
    /// Cheap, synchronous liveness check (no network round-trip).
    pub fn is_terminated(&self) -> bool {
        !self.is_backend_available()
    }
898
899 pub async fn is_terminated_async(&self) -> bool {
901 let inner_guard = self.peer.load();
903 let inner = match inner_guard.as_ref() {
904 Some(inner) => inner,
905 None => return true,
906 };
907
908 if inner.peer.is_transport_closed() {
910 return true;
911 }
912
913 match inner.peer.list_tools(None).await {
915 Ok(_) => {
916 debug!("后端连接状态检查: 正常");
917 false
918 }
919 Err(e) => {
920 info!("后端连接状态检查: 已断开,原因: {e}");
921 true
922 }
923 }
924 }
925
    /// Identifier of the proxied MCP server (as passed at construction).
    pub fn mcp_id(&self) -> &str {
        &self.mcp_id
    }
930
931 pub fn swap_backend_from_connection(&self, conn: Option<crate::client::SseClientConnection>) {
940 match conn {
941 Some(c) => {
942 let running = c.into_running_service();
943 self.swap_backend(Some(running));
944 }
945 None => {
946 self.swap_backend(None);
947 }
948 }
949 }
950}