// mcp_sse_proxy/sse_handler.rs

1use tracing::{debug, info, warn, error};
2use std::time::{Instant, SystemTime};
3/**
4 * Create a local SSE server that proxies requests to a stdio MCP server.
5 */
6use rmcp::{
7    ErrorData, RoleClient, RoleServer, ServerHandler,
8    model::{
9        CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
10        PaginatedRequestParam, ServerInfo,
11    },
12    service::{NotificationContext, Peer, RequestContext, RunningService},
13};
14use std::sync::Arc;
15use arc_swap::ArcSwapOption;
16pub use mcp_common::ToolFilter;
17
/// Bundles the backend peer together with its running service.
/// Stored behind `ArcSwapOption` so the backend can be hot-swapped atomically.
#[derive(Debug)]
struct PeerInner {
    /// Peer used to send requests to the backend.
    peer: Peer<RoleClient>,
    /// Keeps ownership of the `RunningService` so the backend task stays
    /// alive for as long as this connection is installed.
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}
28
/// A SSE proxy handler that forwards requests to a client based on the server's capabilities
/// Uses `ArcSwap` for hot-swapping the backend; while the backend is
/// disconnected, requests fail fast instead of blocking.
///
/// **SSE mode**: built on rmcp 0.10 and its stable SSE transport protocol.
#[derive(Clone, Debug)]
pub struct SseHandler {
    /// Backend connection (`ArcSwapOption` enables lock-free atomic replacement).
    /// `None` means the backend is disconnected / reconnecting.
    peer: Arc<ArcSwapOption<PeerInner>>,
    /// Cached server info (kept stable; expected to be identical after reconnect).
    cached_info: ServerInfo,
    /// MCP ID used for log correlation.
    mcp_id: String,
    /// Tool filtering configuration (allow/deny lists).
    tool_filter: ToolFilter,
}
45
46impl ServerHandler for SseHandler {
47    fn get_info(&self) -> ServerInfo {
48        self.cached_info.clone()
49    }
50
51    #[tracing::instrument(skip(self, request, context), fields(
52        mcp_id = %self.mcp_id,
53        request = ?request,
54    ))]
55    async fn list_tools(
56        &self,
57        request: Option<PaginatedRequestParam>,
58        context: RequestContext<RoleServer>,
59    ) -> Result<ListToolsResult, ErrorData> {
60        // 原子加载后端连接
61        let inner_guard = self.peer.load();
62        let inner = inner_guard.as_ref().ok_or_else(|| {
63            error!("Backend connection is not available (reconnecting)");
64            ErrorData::internal_error(
65                "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
66                None,
67            )
68        })?;
69
70        // 检查后端连接是否已关闭
71        if inner.peer.is_transport_closed() {
72            error!("Backend transport is closed");
73            return Err(ErrorData::internal_error(
74                "Backend connection closed, please retry".to_string(),
75                None,
76            ));
77        }
78
79        // Check if the server has tools capability and forward the request
80        match self.capabilities().tools {
81            Some(_) => {
82                // 使用 tokio::select! 同时等待取消和结果
83                tokio::select! {
84                    result = inner.peer.list_tools(request) => {
85                        match result {
86                            Ok(result) => {
87                                // 根据过滤配置过滤工具列表
88                                let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
89                                    result
90                                        .tools
91                                        .into_iter()
92                                        .filter(|tool| self.tool_filter.is_allowed(&tool.name))
93                                        .collect()
94                                } else {
95                                    result.tools
96                                };
97
98                                // 记录工具列表结果,这些结果会通过 SSE 推送给客户端
99                                info!(
100                                    "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
101                                    self.mcp_id,
102                                    filtered_tools.len(),
103                                    if self.tool_filter.is_enabled() {
104                                        " (已过滤)"
105                                    } else {
106                                        ""
107                                    }
108                                );
109
110                                debug!(
111                                    "Proxying list_tools response with {} tools",
112                                    filtered_tools.len()
113                                );
114                                Ok(ListToolsResult {
115                                    tools: filtered_tools,
116                                    next_cursor: result.next_cursor,
117                                })
118                            }
119                            Err(err) => {
120                                error!("Error listing tools: {:?}", err);
121                                Err(ErrorData::internal_error(
122                                    format!("Error listing tools: {err}"),
123                                    None,
124                                ))
125                            }
126                        }
127                    }
128                    _ = context.ct.cancelled() => {
129                        info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
130                        Err(ErrorData::internal_error(
131                            "Request cancelled".to_string(),
132                            None,
133                        ))
134                    }
135                }
136            }
137            None => {
138                // Server doesn't support tools, return empty list
139                warn!("Server doesn't support tools capability");
140                Ok(ListToolsResult::default())
141            }
142        }
143    }
144
    /// Proxy a tool invocation to the backend.
    ///
    /// Note: failures here (filtered tool, backend down, backend error,
    /// cancellation) are reported as `Ok(CallToolResult::error(..))` rather
    /// than `Err`, so they reach the client as tool-level error payloads.
    #[tracing::instrument(skip(self, request, context), fields(
        mcp_id = %self.mcp_id,
        tool_name = %request.name,
        tool_arguments = ?request.arguments,
    ))]
    async fn call_tool(
        &self,
        request: CallToolRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<CallToolResult, ErrorData> {
        // Wall-clock start (for the log) and monotonic start (for durations).
        let start = Instant::now();
        let start_time = SystemTime::now();
        info!("🔧 开始执行工具: {}, 时间: {:?}", request.name, start_time);
        
        // Reject the call up front if the tool is excluded by the filter.
        if !self.tool_filter.is_allowed(&request.name) {
            info!(
                "[call_tool] 工具被过滤 - MCP ID: {}, 工具: {}",
                self.mcp_id, request.name
            );
            return Ok(CallToolResult::error(vec![Content::text(format!(
                "Tool '{}' is not allowed by filter configuration",
                request.name
            ))]));
        }

        // Atomically load the current backend connection.
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => {
                error!("Backend connection is not available (reconnecting)");
                return Ok(CallToolResult::error(vec![Content::text(
                    "Backend connection is not available, reconnecting..."
                )]));
            }
        };

        // Fail fast if the backend transport has already been closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Ok(CallToolResult::error(vec![Content::text(
                "Backend connection closed, please retry",
            )]));
        }

        // Check if the server has tools capability and forward the request
        let result = match self.capabilities().tools {
            Some(_) => {
                // Race the backend call against request cancellation.
                tokio::select! {
                    result = inner.peer.call_tool(request.clone()) => {
                        match result {
                            Ok(result) => {
                                // Log the successful result (pushed to the client
                                // over SSE) together with the elapsed time.
                                let elapsed = start.elapsed();
                                info!(
                                    "[call_tool] 工具调用成功 - MCP ID: {}, 工具: {}, 耗时: {}ms",
                                    self.mcp_id, request.name, elapsed.as_millis()
                                );

                                debug!("Tool call succeeded");
                                Ok(result)
                            }
                            Err(err) => {
                                let elapsed = start.elapsed();
                                error!("Error calling tool: {:?}, 耗时: {}ms", err, elapsed.as_millis());
                                // Return an error result instead of propagating the error
                                Ok(CallToolResult::error(vec![Content::text(format!(
                                    "Error: {err}"
                                ))]))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        let elapsed = start.elapsed();
                        info!(
                            "[call_tool] 请求被取消 - MCP ID: {}, 工具: {}, 耗时: {}ms",
                            self.mcp_id, request.name, elapsed.as_millis()
                        );
                        Ok(CallToolResult::error(vec![Content::text(
                            "Request cancelled"
                        )]))
                    }
                }
            }
            None => {
                error!("Server doesn't support tools capability");
                Ok(CallToolResult::error(vec![Content::text(
                    "Server doesn't support tools capability",
                )]))
            }
        };
        
        let total_elapsed = start.elapsed();
        info!("✅ 工具执行完成: {}, 总耗时: {}ms", request.name, total_elapsed.as_millis());
        result
    }
243
    /// Proxy a `list_resources` request to the backend.
    /// Returns an empty list when the backend lacks the resources capability;
    /// errors fast if the backend is disconnected or the transport is closed.
    async fn list_resources(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
        // Atomically load the current backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already been closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has resources capability and forward the request
        match self.capabilities().resources {
            Some(_) => {
                // Race the backend call against request cancellation.
                tokio::select! {
                    result = inner.peer.list_resources(request) => {
                        match result {
                            Ok(result) => {
                                // Log the resource list that will be pushed to the
                                // client over SSE.
                                info!(
                                    "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
                                    self.mcp_id,
                                    result.resources.len()
                                );

                                debug!("Proxying list_resources response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resources: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resources: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support resources, return empty list
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourcesResult::default())
            }
        }
    }
310
    /// Proxy a `read_resource` request to the backend.
    /// Errors fast if the backend is disconnected or the transport is closed.
    async fn read_resource(
        &self,
        request: rmcp::model::ReadResourceRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
        // Atomically load the current backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already been closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has resources capability and forward the request
        match self.capabilities().resources {
            Some(_) => {
                tokio::select! {
                    // Rebuild the param from the cloned URI so `request.uri`
                    // stays usable for the log statements below.
                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
                        uri: request.uri.clone(),
                    }) => {
                        match result {
                            Ok(result) => {
                                // Log the read result that will be pushed to the
                                // client over SSE.
                                info!(
                                    "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
                                    self.mcp_id, request.uri
                                );

                                debug!("Proxying read_resource response for {}", request.uri);
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error reading resource: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error reading resource: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support resources.
                // NOTE(review): despite the error! log, this returns Ok with
                // empty contents instead of an Err — confirm this is intentional.
                error!("Server doesn't support resources capability");
                Ok(rmcp::model::ReadResourceResult {
                    contents: Vec::new(),
                })
            }
        }
    }
380
    /// Proxy a `list_resource_templates` request to the backend.
    /// Returns an empty list when the backend lacks the resources capability.
    async fn list_resource_templates(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
        // Atomically load the current backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already been closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has resources capability and forward the request
        match self.capabilities().resources {
            Some(_) => {
                // Race the backend call against request cancellation.
                tokio::select! {
                    result = inner.peer.list_resource_templates(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_resource_templates response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resource templates: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resource templates: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support resources, return empty list
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourceTemplatesResult::default())
            }
        }
    }
440
    /// Proxy a `list_prompts` request to the backend.
    /// Returns an empty list when the backend lacks the prompts capability.
    async fn list_prompts(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
        // Atomically load the current backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already been closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has prompts capability and forward the request
        match self.capabilities().prompts {
            Some(_) => {
                // Race the backend call against request cancellation.
                tokio::select! {
                    result = inner.peer.list_prompts(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_prompts response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing prompts: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing prompts: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support prompts, return empty list
                warn!("Server doesn't support prompts capability");
                Ok(rmcp::model::ListPromptsResult::default())
            }
        }
    }
500
    /// Proxy a `get_prompt` request to the backend.
    /// Returns an empty prompt result (no error) when the backend lacks the
    /// prompts capability, so clients degrade gracefully.
    async fn get_prompt(
        &self,
        request: rmcp::model::GetPromptRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
        // Atomically load the current backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already been closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has prompts capability and forward the request
        match self.capabilities().prompts {
            Some(_) => {
                // Race the backend call against request cancellation; the
                // request is cloned so its name stays available for logging.
                tokio::select! {
                    result = inner.peer.get_prompt(request.clone()) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying get_prompt response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error getting prompt: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error getting prompt: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support prompts; return an empty result.
                warn!("Server doesn't support prompts capability");
                Ok(rmcp::model::GetPromptResult {
                    description: None,
                    messages: Vec::new(),
                })
            }
        }
    }
563
    /// Proxy a `complete` (argument autocompletion) request to the backend.
    ///
    /// NOTE(review): unlike the other handlers, this forwards without checking
    /// a completions capability first — confirm that is intentional.
    async fn complete(
        &self,
        request: rmcp::model::CompleteRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
        // Atomically load the current backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already been closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Race the backend call against request cancellation.
        tokio::select! {
            result = inner.peer.complete(request) => {
                match result {
                    Ok(result) => {
                        debug!("Proxying complete response");
                        Ok(result)
                    }
                    Err(err) => {
                        error!("Error completing: {:?}", err);
                        Err(ErrorData::internal_error(
                            format!("Error completing: {err}"),
                            None,
                        ))
                    }
                }
            }
            _ = context.ct.cancelled() => {
                info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
                Err(ErrorData::internal_error(
                    "Request cancelled".to_string(),
                    None,
                ))
            }
        }
    }
613
614    async fn on_progress(
615        &self,
616        notification: rmcp::model::ProgressNotificationParam,
617        _context: NotificationContext<RoleServer>,
618    ) {
619        // 原子加载后端连接
620        let inner_guard = self.peer.load();
621        let inner = match inner_guard.as_ref() {
622            Some(inner) => inner,
623            None => {
624                error!("Backend connection is not available, cannot forward progress notification");
625                return;
626            }
627        };
628
629        // 检查后端连接是否已关闭
630        if inner.peer.is_transport_closed() {
631            error!("Backend transport is closed, cannot forward progress notification");
632            return;
633        }
634
635        match inner.peer.notify_progress(notification).await {
636            Ok(_) => {
637                debug!("Proxying progress notification");
638            }
639            Err(err) => {
640                error!("Error notifying progress: {:?}", err);
641            }
642        }
643    }
644
645    async fn on_cancelled(
646        &self,
647        notification: rmcp::model::CancelledNotificationParam,
648        _context: NotificationContext<RoleServer>,
649    ) {
650        // 原子加载后端连接
651        let inner_guard = self.peer.load();
652        let inner = match inner_guard.as_ref() {
653            Some(inner) => inner,
654            None => {
655                error!("Backend connection is not available, cannot forward cancelled notification");
656                return;
657            }
658        };
659
660        // 检查后端连接是否已关闭
661        if inner.peer.is_transport_closed() {
662            error!("Backend transport is closed, cannot forward cancelled notification");
663            return;
664        }
665
666        match inner.peer.notify_cancelled(notification).await {
667            Ok(_) => {
668                debug!("Proxying cancelled notification");
669            }
670            Err(err) => {
671                error!("Error notifying cancelled: {:?}", err);
672            }
673        }
674    }
675}
676
677impl SseHandler {
    /// Borrow the cached capabilities, avoiding a `ServerInfo` clone.
    #[inline]
    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
        &self.cached_info.capabilities
    }
683
684    /// 创建一个默认的 ServerInfo(用于断开状态)
685    fn default_server_info(mcp_id: &str) -> ServerInfo {
686        warn!("[SseHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
687        ServerInfo {
688            protocol_version: Default::default(),
689            server_info: Implementation {
690                name: "MCP Proxy".to_string(),
691                version: "0.1.0".to_string(),
692                title: None,
693                website_url: None,
694                icons: None,
695            },
696            instructions: None,
697            capabilities: Default::default(),
698        }
699    }
700
701    /// 从 RunningService 提取 ServerInfo
702    fn extract_server_info(client: &RunningService<RoleClient, ClientInfo>, mcp_id: &str) -> ServerInfo {
703        client.peer_info().map(|peer_info| ServerInfo {
704            protocol_version: peer_info.protocol_version.clone(),
705            server_info: Implementation {
706                name: peer_info.server_info.name.clone(),
707                version: peer_info.server_info.version.clone(),
708                title: None,
709                website_url: None,
710                icons: None,
711            },
712            instructions: peer_info.instructions.clone(),
713            capabilities: peer_info.capabilities.clone(),
714        }).unwrap_or_else(|| Self::default_server_info(mcp_id))
715    }
716
    /// Create a handler in the disconnected state (used at initialization).
    /// The real backend connection is injected later via `swap_backend()`.
    pub fn new_disconnected(mcp_id: String, tool_filter: ToolFilter, default_info: ServerInfo) -> Self {
        info!("[SseHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);

        // Log the filter configuration for diagnostics.
        if tool_filter.is_enabled() {
            if let Some(ref allow_list) = tool_filter.allow_tools {
                info!(
                    "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
                    mcp_id, allow_list
                );
            }
            if let Some(ref deny_list) = tool_filter.deny_tools {
                info!(
                    "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
                    mcp_id, deny_list
                );
            }
        }

        Self {
            // Empty slot: every request fails fast until a backend is installed.
            peer: Arc::new(ArcSwapOption::empty()),
            cached_info: default_info,
            mcp_id,
            tool_filter,
        }
    }
745
746    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
747        Self::with_mcp_id(client, "unknown".to_string())
748    }
749
    /// Construct a connected handler with the given MCP id and the default
    /// (pass-through) tool filter.
    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
    }
753
    /// Create an `SseHandler` with a tool filter and an initial backend
    /// connection.
    pub fn with_tool_filter(
        client: RunningService<RoleClient, ClientInfo>,
        mcp_id: String,
        tool_filter: ToolFilter,
    ) -> Self {
        use std::ops::Deref;

        // Snapshot the server info reported by the backend.
        let cached_info = Self::extract_server_info(&client, &mcp_id);

        // Clone the Peer for lock-free concurrent requests.
        let peer = client.deref().clone();

        // Log the filter configuration for diagnostics.
        if tool_filter.is_enabled() {
            if let Some(ref allow_list) = tool_filter.allow_tools {
                info!(
                    "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
                    mcp_id, allow_list
                );
            }
            if let Some(ref deny_list) = tool_filter.deny_tools {
                info!(
                    "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
                    mcp_id, deny_list
                );
            }
        }

        // Keep the RunningService alive alongside the cloned Peer.
        let inner = PeerInner {
            peer,
            _running: Arc::new(client),
        };

        Self {
            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
            cached_info,
            mcp_id,
            tool_filter,
        }
    }
797
798    /// 原子性替换后端连接
799    /// - Some(client): 设置新的后端连接
800    /// - None: 标记后端断开
801    pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
802        use std::ops::Deref;
803
804        match new_client {
805            Some(client) => {
806                let peer = client.deref().clone();
807                let inner = PeerInner {
808                    peer,
809                    _running: Arc::new(client),
810                };
811                self.peer.store(Some(Arc::new(inner)));
812                info!("[SseHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
813            }
814            None => {
815                self.peer.store(None);
816                info!("[SseHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
817            }
818        }
819    }
820
821    /// 检查后端是否可用(快速检查,不发送请求)
822    pub fn is_backend_available(&self) -> bool {
823        let inner_guard = self.peer.load();
824        match inner_guard.as_ref() {
825            Some(inner) => !inner.peer.is_transport_closed(),
826            None => false,
827        }
828    }
829
    /// Check whether the MCP backend is healthy (async version; sends a
    /// verification request — see `is_terminated_async`).
    pub async fn is_mcp_server_ready(&self) -> bool {
        !self.is_terminated_async().await
    }
834
    /// Check whether the backend connection is closed (sync version; only
    /// inspects the transport state, sends nothing).
    pub fn is_terminated(&self) -> bool {
        !self.is_backend_available()
    }
839
    /// Asynchronously check whether the backend is disconnected.
    /// Unlike `is_terminated`, this also issues a lightweight `list_tools`
    /// request to verify the connection end-to-end.
    pub async fn is_terminated_async(&self) -> bool {
        // Atomically load the current backend connection.
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => return true,
        };

        // Fast path: the transport already reports closed.
        if inner.peer.is_transport_closed() {
            return true;
        }

        // Verify the connection by sending a lightweight request.
        match inner.peer.list_tools(None).await {
            Ok(_) => {
                debug!("后端连接状态检查: 正常");
                false
            }
            Err(e) => {
                info!("后端连接状态检查: 已断开,原因: {e}");
                true
            }
        }
    }
866
    /// Return the MCP ID this handler was created with.
    pub fn mcp_id(&self) -> &str {
        &self.mcp_id
    }
871
872    /// Update backend from an SseClientConnection
873    ///
874    /// This method allows updating the backend connection using the high-level
875    /// `SseClientConnection` type, which is more convenient than the raw
876    /// `RunningService` type.
877    ///
878    /// # Arguments
879    /// * `conn` - Some(connection) to set new backend, None to mark disconnected
880    pub fn swap_backend_from_connection(&self, conn: Option<crate::client::SseClientConnection>) {
881        match conn {
882            Some(c) => {
883                let running = c.into_running_service();
884                self.swap_backend(Some(running));
885            }
886            None => {
887                self.swap_backend(None);
888            }
889        }
890    }
891}