// mcp_sse_proxy/sse_handler.rs

1use tracing::{debug, info, warn, error};
// Create a local SSE server that proxies requests to a stdio MCP server.
5use rmcp::{
6    ErrorData, RoleClient, RoleServer, ServerHandler,
7    model::{
8        CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
9        PaginatedRequestParam, ServerInfo,
10    },
11    service::{NotificationContext, Peer, RequestContext, RunningService},
12};
13use std::sync::Arc;
14use arc_swap::ArcSwapOption;
15pub use mcp_common::ToolFilter;
16
/// Bundles the backend peer handle with its running service.
/// Stored behind an `ArcSwap` so the backend can be hot-swapped.
#[derive(Debug)]
struct PeerInner {
    /// Peer handle used to send requests to the backend.
    peer: Peer<RoleClient>,
    /// Keeps ownership of the `RunningService` so the backend service task
    /// stays alive for as long as this connection is installed.
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}
27
/// A SSE proxy handler that forwards requests to a client based on the server's capabilities
/// Uses `ArcSwap` for lock-free hot replacement of the backend; while the
/// backend is disconnected, requests fail immediately instead of blocking.
///
/// **SSE mode**: built on rmcp 0.10's stable SSE transport protocol.
#[derive(Clone, Debug)]
pub struct SseHandler {
    /// Backend connection (`ArcSwapOption` allows lock-free atomic swaps).
    /// `None` means the backend is disconnected / reconnecting.
    peer: Arc<ArcSwapOption<PeerInner>>,
    /// Cached server info (kept stable; should match after a reconnect).
    cached_info: ServerInfo,
    /// MCP ID used for log correlation.
    mcp_id: String,
    /// Tool filtering configuration (allow/deny lists).
    tool_filter: ToolFilter,
}
44
45impl ServerHandler for SseHandler {
    /// Return the cached server info snapshot (stable across reconnects).
    fn get_info(&self) -> ServerInfo {
        self.cached_info.clone()
    }
49
50    #[tracing::instrument(skip(self, request, context), fields(
51        mcp_id = %self.mcp_id,
52        request = ?request,
53    ))]
54    async fn list_tools(
55        &self,
56        request: Option<PaginatedRequestParam>,
57        context: RequestContext<RoleServer>,
58    ) -> Result<ListToolsResult, ErrorData> {
59        // 原子加载后端连接
60        let inner_guard = self.peer.load();
61        let inner = inner_guard.as_ref().ok_or_else(|| {
62            error!("Backend connection is not available (reconnecting)");
63            ErrorData::internal_error(
64                "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
65                None,
66            )
67        })?;
68
69        // 检查后端连接是否已关闭
70        if inner.peer.is_transport_closed() {
71            error!("Backend transport is closed");
72            return Err(ErrorData::internal_error(
73                "Backend connection closed, please retry".to_string(),
74                None,
75            ));
76        }
77
78        // Check if the server has tools capability and forward the request
79        match self.capabilities().tools {
80            Some(_) => {
81                // 使用 tokio::select! 同时等待取消和结果
82                tokio::select! {
83                    result = inner.peer.list_tools(request) => {
84                        match result {
85                            Ok(result) => {
86                                // 根据过滤配置过滤工具列表
87                                let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
88                                    result
89                                        .tools
90                                        .into_iter()
91                                        .filter(|tool| self.tool_filter.is_allowed(&tool.name))
92                                        .collect()
93                                } else {
94                                    result.tools
95                                };
96
97                                // 记录工具列表结果,这些结果会通过 SSE 推送给客户端
98                                info!(
99                                    "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
100                                    self.mcp_id,
101                                    filtered_tools.len(),
102                                    if self.tool_filter.is_enabled() {
103                                        " (已过滤)"
104                                    } else {
105                                        ""
106                                    }
107                                );
108
109                                debug!(
110                                    "Proxying list_tools response with {} tools",
111                                    filtered_tools.len()
112                                );
113                                Ok(ListToolsResult {
114                                    tools: filtered_tools,
115                                    next_cursor: result.next_cursor,
116                                })
117                            }
118                            Err(err) => {
119                                error!("Error listing tools: {:?}", err);
120                                Err(ErrorData::internal_error(
121                                    format!("Error listing tools: {err}"),
122                                    None,
123                                ))
124                            }
125                        }
126                    }
127                    _ = context.ct.cancelled() => {
128                        info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
129                        Err(ErrorData::internal_error(
130                            "Request cancelled".to_string(),
131                            None,
132                        ))
133                    }
134                }
135            }
136            None => {
137                // Server doesn't support tools, return empty list
138                warn!("Server doesn't support tools capability");
139                Ok(ListToolsResult::default())
140            }
141        }
142    }
143
    /// Forward a `tools/call` request to the stdio backend.
    ///
    /// Filtered-out tools, a missing/closed backend, backend errors and
    /// cancellation are all reported as `Ok(CallToolResult::error(...))`
    /// rather than protocol errors, so the client always receives a tool
    /// result it can display.
    #[tracing::instrument(skip(self, request, context), fields(
        mcp_id = %self.mcp_id,
        tool_name = %request.name,
        tool_arguments = ?request.arguments,
    ))]
    async fn call_tool(
        &self,
        request: CallToolRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<CallToolResult, ErrorData> {
        // First check whether the requested tool passes the filter config.
        if !self.tool_filter.is_allowed(&request.name) {
            info!(
                "[call_tool] 工具被过滤 - MCP ID: {}, 工具: {}",
                self.mcp_id, request.name
            );
            return Ok(CallToolResult::error(vec![Content::text(format!(
                "Tool '{}' is not allowed by filter configuration",
                request.name
            ))]));
        }

        // Atomically load the backend connection (None = reconnecting).
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => {
                error!("Backend connection is not available (reconnecting)");
                return Ok(CallToolResult::error(vec![Content::text(
                    "Backend connection is not available, reconnecting..."
                )]));
            }
        };

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Ok(CallToolResult::error(vec![Content::text(
                "Backend connection closed, please retry",
            )]));
        }

        // Check if the server has tools capability and forward the request
        match self.capabilities().tools {
            Some(_) => {
                // Race the backend call against client-side cancellation.
                tokio::select! {
                    result = inner.peer.call_tool(request.clone()) => {
                        match result {
                            Ok(result) => {
                                // Log the result that will be pushed to the client over SSE.
                                info!(
                                    "[call_tool] 工具调用成功 - MCP ID: {}, 工具: {}",
                                    self.mcp_id, request.name
                                );

                                debug!("Tool call succeeded");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error calling tool: {:?}", err);
                                // Return an error result instead of propagating the error
                                Ok(CallToolResult::error(vec![Content::text(format!(
                                    "Error: {err}"
                                ))]))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!(
                            "[call_tool] 请求被取消 - MCP ID: {}, 工具: {}",
                            self.mcp_id, request.name
                        );
                        Ok(CallToolResult::error(vec![Content::text(
                            "Request cancelled"
                        )]))
                    }
                }
            }
            None => {
                error!("Server doesn't support tools capability");
                Ok(CallToolResult::error(vec![Content::text(
                    "Server doesn't support tools capability",
                )]))
            }
        }
    }
231
    /// Forward a `resources/list` request to the stdio backend.
    ///
    /// Returns an empty list when the backend lacks the resources
    /// capability; a missing/closed backend or cancellation yields an
    /// internal error.
    async fn list_resources(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
        // Atomically load the backend connection (None = reconnecting).
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has resources capability and forward the request
        match self.capabilities().resources {
            Some(_) => {
                // Race the backend call against client-side cancellation.
                tokio::select! {
                    result = inner.peer.list_resources(request) => {
                        match result {
                            Ok(result) => {
                                // Log the result that will be pushed to the client over SSE.
                                info!(
                                    "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
                                    self.mcp_id,
                                    result.resources.len()
                                );

                                debug!("Proxying list_resources response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resources: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resources: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support resources, return empty list
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourcesResult::default())
            }
        }
    }
298
299    async fn read_resource(
300        &self,
301        request: rmcp::model::ReadResourceRequestParam,
302        context: RequestContext<RoleServer>,
303    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
304        // 原子加载后端连接
305        let inner_guard = self.peer.load();
306        let inner = inner_guard.as_ref().ok_or_else(|| {
307            error!("Backend connection is not available (reconnecting)");
308            ErrorData::internal_error(
309                "Backend connection is not available, reconnecting...".to_string(),
310                None,
311            )
312        })?;
313
314        // 检查后端连接是否已关闭
315        if inner.peer.is_transport_closed() {
316            error!("Backend transport is closed");
317            return Err(ErrorData::internal_error(
318                "Backend connection closed, please retry".to_string(),
319                None,
320            ));
321        }
322
323        // Check if the server has resources capability and forward the request
324        match self.capabilities().resources {
325            Some(_) => {
326                tokio::select! {
327                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
328                        uri: request.uri.clone(),
329                    }) => {
330                        match result {
331                            Ok(result) => {
332                                // 记录资源读取结果,这些结果会通过 SSE 推送给客户端
333                                info!(
334                                    "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
335                                    self.mcp_id, request.uri
336                                );
337
338                                debug!("Proxying read_resource response for {}", request.uri);
339                                Ok(result)
340                            }
341                            Err(err) => {
342                                error!("Error reading resource: {:?}", err);
343                                Err(ErrorData::internal_error(
344                                    format!("Error reading resource: {err}"),
345                                    None,
346                                ))
347                            }
348                        }
349                    }
350                    _ = context.ct.cancelled() => {
351                        info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
352                        Err(ErrorData::internal_error(
353                            "Request cancelled".to_string(),
354                            None,
355                        ))
356                    }
357                }
358            }
359            None => {
360                // Server doesn't support resources, return error
361                error!("Server doesn't support resources capability");
362                Ok(rmcp::model::ReadResourceResult {
363                    contents: Vec::new(),
364                })
365            }
366        }
367    }
368
    /// Forward a `resources/templates/list` request to the stdio backend.
    ///
    /// Returns an empty list when the backend lacks the resources
    /// capability; a missing/closed backend or cancellation yields an
    /// internal error.
    async fn list_resource_templates(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
        // Atomically load the backend connection (None = reconnecting).
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has resources capability and forward the request
        match self.capabilities().resources {
            Some(_) => {
                // Race the backend call against client-side cancellation.
                tokio::select! {
                    result = inner.peer.list_resource_templates(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_resource_templates response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resource templates: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resource templates: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support resources, return empty list
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourceTemplatesResult::default())
            }
        }
    }
428
    /// Forward a `prompts/list` request to the stdio backend.
    ///
    /// Returns an empty list when the backend lacks the prompts capability;
    /// a missing/closed backend or cancellation yields an internal error.
    async fn list_prompts(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
        // Atomically load the backend connection (None = reconnecting).
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has prompts capability and forward the request
        match self.capabilities().prompts {
            Some(_) => {
                // Race the backend call against client-side cancellation.
                tokio::select! {
                    result = inner.peer.list_prompts(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_prompts response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing prompts: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing prompts: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support prompts, return empty list
                warn!("Server doesn't support prompts capability");
                Ok(rmcp::model::ListPromptsResult::default())
            }
        }
    }
488
    /// Forward a `prompts/get` request to the stdio backend.
    ///
    /// When the backend lacks the prompts capability, an empty
    /// `GetPromptResult` is returned (not an error); a missing/closed
    /// backend or cancellation yields an internal error.
    async fn get_prompt(
        &self,
        request: rmcp::model::GetPromptRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
        // Atomically load the backend connection (None = reconnecting).
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has prompts capability and forward the request
        match self.capabilities().prompts {
            Some(_) => {
                // Race the backend call against client-side cancellation.
                tokio::select! {
                    result = inner.peer.get_prompt(request.clone()) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying get_prompt response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error getting prompt: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error getting prompt: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support prompts: return an empty result
                // (the code has always returned Ok here, not an error).
                warn!("Server doesn't support prompts capability");
                Ok(rmcp::model::GetPromptResult {
                    description: None,
                    messages: Vec::new(),
                })
            }
        }
    }
551
    /// Forward a `completion/complete` request to the stdio backend.
    ///
    /// NOTE(review): unlike the other handlers, this forwards without
    /// checking a corresponding capability first — confirm this is
    /// intentional (completions capability is not inspected here).
    async fn complete(
        &self,
        request: rmcp::model::CompleteRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
        // Atomically load the backend connection (None = reconnecting).
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Race the backend call against client-side cancellation.
        tokio::select! {
            result = inner.peer.complete(request) => {
                match result {
                    Ok(result) => {
                        debug!("Proxying complete response");
                        Ok(result)
                    }
                    Err(err) => {
                        error!("Error completing: {:?}", err);
                        Err(ErrorData::internal_error(
                            format!("Error completing: {err}"),
                            None,
                        ))
                    }
                }
            }
            _ = context.ct.cancelled() => {
                info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
                Err(ErrorData::internal_error(
                    "Request cancelled".to_string(),
                    None,
                ))
            }
        }
    }
601
602    async fn on_progress(
603        &self,
604        notification: rmcp::model::ProgressNotificationParam,
605        _context: NotificationContext<RoleServer>,
606    ) {
607        // 原子加载后端连接
608        let inner_guard = self.peer.load();
609        let inner = match inner_guard.as_ref() {
610            Some(inner) => inner,
611            None => {
612                error!("Backend connection is not available, cannot forward progress notification");
613                return;
614            }
615        };
616
617        // 检查后端连接是否已关闭
618        if inner.peer.is_transport_closed() {
619            error!("Backend transport is closed, cannot forward progress notification");
620            return;
621        }
622
623        match inner.peer.notify_progress(notification).await {
624            Ok(_) => {
625                debug!("Proxying progress notification");
626            }
627            Err(err) => {
628                error!("Error notifying progress: {:?}", err);
629            }
630        }
631    }
632
633    async fn on_cancelled(
634        &self,
635        notification: rmcp::model::CancelledNotificationParam,
636        _context: NotificationContext<RoleServer>,
637    ) {
638        // 原子加载后端连接
639        let inner_guard = self.peer.load();
640        let inner = match inner_guard.as_ref() {
641            Some(inner) => inner,
642            None => {
643                error!("Backend connection is not available, cannot forward cancelled notification");
644                return;
645            }
646        };
647
648        // 检查后端连接是否已关闭
649        if inner.peer.is_transport_closed() {
650            error!("Backend transport is closed, cannot forward cancelled notification");
651            return;
652        }
653
654        match inner.peer.notify_cancelled(notification).await {
655            Ok(_) => {
656                debug!("Proxying cancelled notification");
657            }
658            Err(err) => {
659                error!("Error notifying cancelled: {:?}", err);
660            }
661        }
662    }
663}
664
665impl SseHandler {
    /// Borrow the cached server capabilities, avoiding a clone.
    #[inline]
    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
        &self.cached_info.capabilities
    }
671
    /// Build a default `ServerInfo` placeholder (used while the handler is
    /// disconnected or the backend's peer info is unavailable).
    fn default_server_info(mcp_id: &str) -> ServerInfo {
        warn!("[SseHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
        ServerInfo {
            protocol_version: Default::default(),
            server_info: Implementation {
                name: "MCP Proxy".to_string(),
                version: "0.1.0".to_string(),
                title: None,
                website_url: None,
                icons: None,
            },
            instructions: None,
            capabilities: Default::default(),
        }
    }
688
689    /// 从 RunningService 提取 ServerInfo
690    fn extract_server_info(client: &RunningService<RoleClient, ClientInfo>, mcp_id: &str) -> ServerInfo {
691        client.peer_info().map(|peer_info| ServerInfo {
692            protocol_version: peer_info.protocol_version.clone(),
693            server_info: Implementation {
694                name: peer_info.server_info.name.clone(),
695                version: peer_info.server_info.version.clone(),
696                title: None,
697                website_url: None,
698                icons: None,
699            },
700            instructions: peer_info.instructions.clone(),
701            capabilities: peer_info.capabilities.clone(),
702        }).unwrap_or_else(|| Self::default_server_info(mcp_id))
703    }
704
    /// Create a handler in the disconnected state (used at startup).
    /// The real backend is injected later via `swap_backend()`.
    ///
    /// `default_info` is served from `get_info()` until a backend with real
    /// peer info is installed.
    pub fn new_disconnected(mcp_id: String, tool_filter: ToolFilter, default_info: ServerInfo) -> Self {
        info!("[SseHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);

        // Log the filter configuration up front for troubleshooting.
        if tool_filter.is_enabled() {
            if let Some(ref allow_list) = tool_filter.allow_tools {
                info!(
                    "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
                    mcp_id, allow_list
                );
            }
            if let Some(ref deny_list) = tool_filter.deny_tools {
                info!(
                    "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
                    mcp_id, deny_list
                );
            }
        }

        Self {
            peer: Arc::new(ArcSwapOption::empty()),
            cached_info: default_info,
            mcp_id,
            tool_filter,
        }
    }
733
    /// Create a handler with an initial backend and an "unknown" MCP ID.
    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
        Self::with_mcp_id(client, "unknown".to_string())
    }
737
    /// Create a handler with an initial backend and an explicit MCP ID,
    /// using the default (pass-through) tool filter.
    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
    }
741
    /// Create an `SseHandler` with a tool filter and an initial backend
    /// connection. Server info is snapshotted from the client at creation.
    pub fn with_tool_filter(
        client: RunningService<RoleClient, ClientInfo>,
        mcp_id: String,
        tool_filter: ToolFilter,
    ) -> Self {
        use std::ops::Deref;

        // Snapshot the ServerInfo from the connected client.
        let cached_info = Self::extract_server_info(&client, &mcp_id);

        // Clone the Peer handle for lock-free concurrent requests.
        let peer = client.deref().clone();

        // Log the filter configuration up front for troubleshooting.
        if tool_filter.is_enabled() {
            if let Some(ref allow_list) = tool_filter.allow_tools {
                info!(
                    "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
                    mcp_id, allow_list
                );
            }
            if let Some(ref deny_list) = tool_filter.deny_tools {
                info!(
                    "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
                    mcp_id, deny_list
                );
            }
        }

        // Wrap peer + running service so both are swapped atomically.
        let inner = PeerInner {
            peer,
            _running: Arc::new(client),
        };

        Self {
            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
            cached_info,
            mcp_id,
            tool_filter,
        }
    }
785
786    /// 原子性替换后端连接
787    /// - Some(client): 设置新的后端连接
788    /// - None: 标记后端断开
789    pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
790        use std::ops::Deref;
791
792        match new_client {
793            Some(client) => {
794                let peer = client.deref().clone();
795                let inner = PeerInner {
796                    peer,
797                    _running: Arc::new(client),
798                };
799                self.peer.store(Some(Arc::new(inner)));
800                info!("[SseHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
801            }
802            None => {
803                self.peer.store(None);
804                info!("[SseHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
805            }
806        }
807    }
808
809    /// 检查后端是否可用(快速检查,不发送请求)
810    pub fn is_backend_available(&self) -> bool {
811        let inner_guard = self.peer.load();
812        match inner_guard.as_ref() {
813            Some(inner) => !inner.peer.is_transport_closed(),
814            None => false,
815        }
816    }
817
    /// Check whether the MCP service is healthy (async version: sends a
    /// lightweight verification request via `is_terminated_async`).
    pub async fn is_mcp_server_ready(&self) -> bool {
        !self.is_terminated_async().await
    }
822
    /// Check whether the backend connection is gone (sync version: only
    /// inspects transport state, sends no request).
    pub fn is_terminated(&self) -> bool {
        !self.is_backend_available()
    }
827
    /// Async check whether the backend is disconnected (sends a probe request).
    pub async fn is_terminated_async(&self) -> bool {
        // Atomically load the backend connection; no connection = terminated.
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => return true,
        };

        // Fast path: transport already closed.
        if inner.peer.is_transport_closed() {
            return true;
        }

        // Verify liveness with a lightweight request.
        // NOTE(review): uses `tools/list` as the probe — assumes every backend
        // answers it even without a tools capability; confirm against backends.
        match inner.peer.list_tools(None).await {
            Ok(_) => {
                debug!("后端连接状态检查: 正常");
                false
            }
            Err(e) => {
                info!("后端连接状态检查: 已断开,原因: {e}");
                true
            }
        }
    }
854
855    /// 获取 MCP ID
856    pub fn mcp_id(&self) -> &str {
857        &self.mcp_id
858    }
859
860    /// Update backend from an SseClientConnection
861    ///
862    /// This method allows updating the backend connection using the high-level
863    /// `SseClientConnection` type, which is more convenient than the raw
864    /// `RunningService` type.
865    ///
866    /// # Arguments
867    /// * `conn` - Some(connection) to set new backend, None to mark disconnected
868    pub fn swap_backend_from_connection(&self, conn: Option<crate::client::SseClientConnection>) {
869        match conn {
870            Some(c) => {
871                let running = c.into_running_service();
872                self.swap_backend(Some(running));
873            }
874            None => {
875                self.swap_backend(None);
876            }
877        }
878    }
879}