Skip to main content

mcp_sse_proxy/
sse_handler.rs

1use arc_swap::ArcSwapOption;
2pub use mcp_common::ToolFilter;
3/**
4 * Create a local SSE server that proxies requests to a stdio MCP server.
5 */
6use rmcp::{
7    ErrorData, RoleClient, RoleServer, ServerHandler,
8    model::{
9        CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
10        PaginatedRequestParam, ProtocolVersion, ServerInfo,
11    },
12    service::{NotificationContext, Peer, RequestContext, RunningService},
13};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Instant, SystemTime};
17use tracing::{debug, error, info, warn};
18
/// Process-wide request counter used to hand out unique request IDs.
///
/// Starts at 1; callers obtain an ID by atomically incrementing it.
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
21
22/// 包装后端连接和运行服务
23/// 用于 ArcSwap 热替换
24#[derive(Debug)]
25struct PeerInner {
26    /// Peer 用于发送请求
27    peer: Peer<RoleClient>,
28    /// 保持 RunningService 的所有权,确保服务生命周期
29    #[allow(dead_code)]
30    _running: Arc<RunningService<RoleClient, ClientInfo>>,
31}
32
33/// A SSE proxy handler that forwards requests to a client based on the server's capabilities
34/// 使用 ArcSwap 实现后端热替换,支持断开时立即返回错误
35///
36/// **SSE 模式**:使用 rmcp 0.10,稳定的 SSE 传输协议
37#[derive(Clone, Debug)]
38pub struct SseHandler {
39    /// 后端连接(ArcSwap 支持无锁原子替换)
40    /// None 表示后端断开/重连中
41    peer: Arc<ArcSwapOption<PeerInner>>,
42    /// 缓存的服务器信息(保持不变,重连后应一致)
43    cached_info: ServerInfo,
44    /// MCP ID 用于日志记录
45    mcp_id: String,
46    /// 工具过滤配置
47    tool_filter: ToolFilter,
48}
49
50impl ServerHandler for SseHandler {
51    fn get_info(&self) -> ServerInfo {
52        self.cached_info.clone()
53    }
54
55    #[tracing::instrument(skip(self, request, context), fields(
56        mcp_id = %self.mcp_id,
57        request = ?request,
58    ))]
59    async fn list_tools(
60        &self,
61        request: Option<PaginatedRequestParam>,
62        context: RequestContext<RoleServer>,
63    ) -> Result<ListToolsResult, ErrorData> {
64        // 原子加载后端连接
65        let inner_guard = self.peer.load();
66        let inner = inner_guard.as_ref().ok_or_else(|| {
67            error!("Backend connection is not available (reconnecting)");
68            ErrorData::internal_error(
69                "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
70                None,
71            )
72        })?;
73
74        // 检查后端连接是否已关闭
75        if inner.peer.is_transport_closed() {
76            error!("Backend transport is closed");
77            return Err(ErrorData::internal_error(
78                "Backend connection closed, please retry".to_string(),
79                None,
80            ));
81        }
82
83        // Check if the server has tools capability and forward the request
84        match self.capabilities().tools {
85            Some(_) => {
86                // 使用 tokio::select! 同时等待取消和结果
87                tokio::select! {
88                    result = inner.peer.list_tools(request) => {
89                        match result {
90                            Ok(result) => {
91                                // 根据过滤配置过滤工具列表
92                                let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
93                                    result
94                                        .tools
95                                        .into_iter()
96                                        .filter(|tool| self.tool_filter.is_allowed(&tool.name))
97                                        .collect()
98                                } else {
99                                    result.tools
100                                };
101
102                                // 记录工具列表结果,这些结果会通过 SSE 推送给客户端
103                                info!(
104                                    "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
105                                    self.mcp_id,
106                                    filtered_tools.len(),
107                                    if self.tool_filter.is_enabled() {
108                                        " (已过滤)"
109                                    } else {
110                                        ""
111                                    }
112                                );
113
114                                debug!(
115                                    "Proxying list_tools response with {} tools",
116                                    filtered_tools.len()
117                                );
118                                Ok(ListToolsResult {
119                                    tools: filtered_tools,
120                                    next_cursor: result.next_cursor,
121                                })
122                            }
123                            Err(err) => {
124                                error!("Error listing tools: {:?}", err);
125                                Err(ErrorData::internal_error(
126                                    format!("Error listing tools: {err}"),
127                                    None,
128                                ))
129                            }
130                        }
131                    }
132                    _ = context.ct.cancelled() => {
133                        info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
134                        Err(ErrorData::internal_error(
135                            "Request cancelled".to_string(),
136                            None,
137                        ))
138                    }
139                }
140            }
141            None => {
142                // Server doesn't support tools, return empty list
143                warn!("Server doesn't support tools capability");
144                Ok(ListToolsResult::default())
145            }
146        }
147    }
148
149    #[tracing::instrument(skip(self, request, context), fields(
150        mcp_id = %self.mcp_id,
151        tool_name = %request.name,
152        tool_arguments = ?request.arguments,
153    ))]
154    async fn call_tool(
155        &self,
156        request: CallToolRequestParam,
157        context: RequestContext<RoleServer>,
158    ) -> Result<CallToolResult, ErrorData> {
159        // 生成唯一请求 ID 用于追踪
160        let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
161        let start = Instant::now();
162        let start_time = SystemTime::now();
163
164        info!(
165            "[call_tool:{}] 开始 - 工具: {}, MCP ID: {}, 时间: {:?}",
166            request_id, request.name, self.mcp_id, start_time
167        );
168
169        // 首先检查工具是否被过滤
170        if !self.tool_filter.is_allowed(&request.name) {
171            info!(
172                "[call_tool:{}] 工具被过滤 - MCP ID: {}, 工具: {}",
173                request_id, self.mcp_id, request.name
174            );
175            return Ok(CallToolResult::error(vec![Content::text(format!(
176                "Tool '{}' is not allowed by filter configuration",
177                request.name
178            ))]));
179        }
180
181        // 原子加载后端连接
182        let inner_guard = self.peer.load();
183        let inner = match inner_guard.as_ref() {
184            Some(inner) => {
185                let transport_closed = inner.peer.is_transport_closed();
186                info!(
187                    "[call_tool:{}] 后端连接存在 - transport_closed: {}",
188                    request_id, transport_closed
189                );
190                inner
191            }
192            None => {
193                error!(
194                    "[call_tool:{}] 后端连接不可用 (正在重连) - MCP ID: {}",
195                    request_id, self.mcp_id
196                );
197                return Ok(CallToolResult::error(vec![Content::text(
198                    "Backend connection is not available, reconnecting...",
199                )]));
200            }
201        };
202
203        // 检查后端连接是否已关闭
204        if inner.peer.is_transport_closed() {
205            error!(
206                "[call_tool:{}] 后端 transport 已关闭 - MCP ID: {}",
207                request_id, self.mcp_id
208            );
209            return Ok(CallToolResult::error(vec![Content::text(
210                "Backend connection closed, please retry",
211            )]));
212        }
213
214        // Check if the server has tools capability and forward the request
215        let result = match self.capabilities().tools {
216            Some(_) => {
217                // 记录发送请求到后端的时间点
218                info!(
219                    "[call_tool:{}] 发送请求到后端... - 工具: {}, 已耗时: {}ms",
220                    request_id,
221                    request.name,
222                    start.elapsed().as_millis()
223                );
224
225                // 使用 tokio::select! 同时等待取消和结果
226                tokio::select! {
227                    result = inner.peer.call_tool(request.clone()) => {
228                        let elapsed = start.elapsed();
229                        match &result {
230                            Ok(call_result) => {
231                                // 记录工具调用结果,这些结果会通过 SSE 推送给客户端
232                                let is_error = call_result.is_error.unwrap_or(false);
233                                info!(
234                                    "[call_tool:{}] 收到响应 - 工具: {}, 耗时: {}ms, is_error: {}, MCP ID: {}",
235                                    request_id, request.name, elapsed.as_millis(), is_error, self.mcp_id
236                                );
237                                if is_error {
238                                    // 记录错误响应的内容(用于调试)
239                                    debug!(
240                                        "[call_tool:{}] 错误响应内容: {:?}",
241                                        request_id, call_result.content
242                                    );
243                                }
244                                Ok(call_result.clone())
245                            }
246                            Err(err) => {
247                                error!(
248                                    "[call_tool:{}] 后端返回错误 - 工具: {}, 耗时: {}ms, 错误: {:?}, MCP ID: {}",
249                                    request_id, request.name, elapsed.as_millis(), err, self.mcp_id
250                                );
251                                // Return an error result instead of propagating the error
252                                Ok(CallToolResult::error(vec![Content::text(format!(
253                                    "Error: {err}"
254                                ))]))
255                            }
256                        }
257                    }
258                    _ = context.ct.cancelled() => {
259                        let elapsed = start.elapsed();
260                        warn!(
261                            "[call_tool:{}] 请求被取消 - 工具: {}, 耗时: {}ms, MCP ID: {}",
262                            request_id, request.name, elapsed.as_millis(), self.mcp_id
263                        );
264                        Ok(CallToolResult::error(vec![Content::text(
265                            "Request cancelled"
266                        )]))
267                    }
268                }
269            }
270            None => {
271                error!(
272                    "[call_tool:{}] 服务器不支持 tools capability - MCP ID: {}",
273                    request_id, self.mcp_id
274                );
275                Ok(CallToolResult::error(vec![Content::text(
276                    "Server doesn't support tools capability",
277                )]))
278            }
279        };
280
281        let total_elapsed = start.elapsed();
282        info!(
283            "[call_tool:{}] 完成 - 工具: {}, 总耗时: {}ms",
284            request_id,
285            request.name,
286            total_elapsed.as_millis()
287        );
288        result
289    }
290
291    async fn list_resources(
292        &self,
293        request: Option<PaginatedRequestParam>,
294        context: RequestContext<RoleServer>,
295    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
296        // 原子加载后端连接
297        let inner_guard = self.peer.load();
298        let inner = inner_guard.as_ref().ok_or_else(|| {
299            error!("Backend connection is not available (reconnecting)");
300            ErrorData::internal_error(
301                "Backend connection is not available, reconnecting...".to_string(),
302                None,
303            )
304        })?;
305
306        // 检查后端连接是否已关闭
307        if inner.peer.is_transport_closed() {
308            error!("Backend transport is closed");
309            return Err(ErrorData::internal_error(
310                "Backend connection closed, please retry".to_string(),
311                None,
312            ));
313        }
314
315        // Check if the server has resources capability and forward the request
316        match self.capabilities().resources {
317            Some(_) => {
318                tokio::select! {
319                    result = inner.peer.list_resources(request) => {
320                        match result {
321                            Ok(result) => {
322                                // 记录资源列表结果,这些结果会通过 SSE 推送给客户端
323                                info!(
324                                    "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
325                                    self.mcp_id,
326                                    result.resources.len()
327                                );
328
329                                debug!("Proxying list_resources response");
330                                Ok(result)
331                            }
332                            Err(err) => {
333                                error!("Error listing resources: {:?}", err);
334                                Err(ErrorData::internal_error(
335                                    format!("Error listing resources: {err}"),
336                                    None,
337                                ))
338                            }
339                        }
340                    }
341                    _ = context.ct.cancelled() => {
342                        info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
343                        Err(ErrorData::internal_error(
344                            "Request cancelled".to_string(),
345                            None,
346                        ))
347                    }
348                }
349            }
350            None => {
351                // Server doesn't support resources, return empty list
352                warn!("Server doesn't support resources capability");
353                Ok(rmcp::model::ListResourcesResult::default())
354            }
355        }
356    }
357
358    async fn read_resource(
359        &self,
360        request: rmcp::model::ReadResourceRequestParam,
361        context: RequestContext<RoleServer>,
362    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
363        // 原子加载后端连接
364        let inner_guard = self.peer.load();
365        let inner = inner_guard.as_ref().ok_or_else(|| {
366            error!("Backend connection is not available (reconnecting)");
367            ErrorData::internal_error(
368                "Backend connection is not available, reconnecting...".to_string(),
369                None,
370            )
371        })?;
372
373        // 检查后端连接是否已关闭
374        if inner.peer.is_transport_closed() {
375            error!("Backend transport is closed");
376            return Err(ErrorData::internal_error(
377                "Backend connection closed, please retry".to_string(),
378                None,
379            ));
380        }
381
382        // Check if the server has resources capability and forward the request
383        match self.capabilities().resources {
384            Some(_) => {
385                tokio::select! {
386                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
387                        uri: request.uri.clone(),
388                    }) => {
389                        match result {
390                            Ok(result) => {
391                                // 记录资源读取结果,这些结果会通过 SSE 推送给客户端
392                                info!(
393                                    "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
394                                    self.mcp_id, request.uri
395                                );
396
397                                debug!("Proxying read_resource response for {}", request.uri);
398                                Ok(result)
399                            }
400                            Err(err) => {
401                                error!("Error reading resource: {:?}", err);
402                                Err(ErrorData::internal_error(
403                                    format!("Error reading resource: {err}"),
404                                    None,
405                                ))
406                            }
407                        }
408                    }
409                    _ = context.ct.cancelled() => {
410                        info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
411                        Err(ErrorData::internal_error(
412                            "Request cancelled".to_string(),
413                            None,
414                        ))
415                    }
416                }
417            }
418            None => {
419                // Server doesn't support resources, return error
420                error!("Server doesn't support resources capability");
421                Ok(rmcp::model::ReadResourceResult {
422                    contents: Vec::new(),
423                })
424            }
425        }
426    }
427
428    async fn list_resource_templates(
429        &self,
430        request: Option<PaginatedRequestParam>,
431        context: RequestContext<RoleServer>,
432    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
433        // 原子加载后端连接
434        let inner_guard = self.peer.load();
435        let inner = inner_guard.as_ref().ok_or_else(|| {
436            error!("Backend connection is not available (reconnecting)");
437            ErrorData::internal_error(
438                "Backend connection is not available, reconnecting...".to_string(),
439                None,
440            )
441        })?;
442
443        // 检查后端连接是否已关闭
444        if inner.peer.is_transport_closed() {
445            error!("Backend transport is closed");
446            return Err(ErrorData::internal_error(
447                "Backend connection closed, please retry".to_string(),
448                None,
449            ));
450        }
451
452        // Check if the server has resources capability and forward the request
453        match self.capabilities().resources {
454            Some(_) => {
455                tokio::select! {
456                    result = inner.peer.list_resource_templates(request) => {
457                        match result {
458                            Ok(result) => {
459                                debug!("Proxying list_resource_templates response");
460                                Ok(result)
461                            }
462                            Err(err) => {
463                                error!("Error listing resource templates: {:?}", err);
464                                Err(ErrorData::internal_error(
465                                    format!("Error listing resource templates: {err}"),
466                                    None,
467                                ))
468                            }
469                        }
470                    }
471                    _ = context.ct.cancelled() => {
472                        info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
473                        Err(ErrorData::internal_error(
474                            "Request cancelled".to_string(),
475                            None,
476                        ))
477                    }
478                }
479            }
480            None => {
481                // Server doesn't support resources, return empty list
482                warn!("Server doesn't support resources capability");
483                Ok(rmcp::model::ListResourceTemplatesResult::default())
484            }
485        }
486    }
487
488    async fn list_prompts(
489        &self,
490        request: Option<PaginatedRequestParam>,
491        context: RequestContext<RoleServer>,
492    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
493        // 原子加载后端连接
494        let inner_guard = self.peer.load();
495        let inner = inner_guard.as_ref().ok_or_else(|| {
496            error!("Backend connection is not available (reconnecting)");
497            ErrorData::internal_error(
498                "Backend connection is not available, reconnecting...".to_string(),
499                None,
500            )
501        })?;
502
503        // 检查后端连接是否已关闭
504        if inner.peer.is_transport_closed() {
505            error!("Backend transport is closed");
506            return Err(ErrorData::internal_error(
507                "Backend connection closed, please retry".to_string(),
508                None,
509            ));
510        }
511
512        // Check if the server has prompts capability and forward the request
513        match self.capabilities().prompts {
514            Some(_) => {
515                tokio::select! {
516                    result = inner.peer.list_prompts(request) => {
517                        match result {
518                            Ok(result) => {
519                                debug!("Proxying list_prompts response");
520                                Ok(result)
521                            }
522                            Err(err) => {
523                                error!("Error listing prompts: {:?}", err);
524                                Err(ErrorData::internal_error(
525                                    format!("Error listing prompts: {err}"),
526                                    None,
527                                ))
528                            }
529                        }
530                    }
531                    _ = context.ct.cancelled() => {
532                        info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
533                        Err(ErrorData::internal_error(
534                            "Request cancelled".to_string(),
535                            None,
536                        ))
537                    }
538                }
539            }
540            None => {
541                // Server doesn't support prompts, return empty list
542                warn!("Server doesn't support prompts capability");
543                Ok(rmcp::model::ListPromptsResult::default())
544            }
545        }
546    }
547
548    async fn get_prompt(
549        &self,
550        request: rmcp::model::GetPromptRequestParam,
551        context: RequestContext<RoleServer>,
552    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
553        // 原子加载后端连接
554        let inner_guard = self.peer.load();
555        let inner = inner_guard.as_ref().ok_or_else(|| {
556            error!("Backend connection is not available (reconnecting)");
557            ErrorData::internal_error(
558                "Backend connection is not available, reconnecting...".to_string(),
559                None,
560            )
561        })?;
562
563        // 检查后端连接是否已关闭
564        if inner.peer.is_transport_closed() {
565            error!("Backend transport is closed");
566            return Err(ErrorData::internal_error(
567                "Backend connection closed, please retry".to_string(),
568                None,
569            ));
570        }
571
572        // Check if the server has prompts capability and forward the request
573        match self.capabilities().prompts {
574            Some(_) => {
575                tokio::select! {
576                    result = inner.peer.get_prompt(request.clone()) => {
577                        match result {
578                            Ok(result) => {
579                                debug!("Proxying get_prompt response");
580                                Ok(result)
581                            }
582                            Err(err) => {
583                                error!("Error getting prompt: {:?}", err);
584                                Err(ErrorData::internal_error(
585                                    format!("Error getting prompt: {err}"),
586                                    None,
587                                ))
588                            }
589                        }
590                    }
591                    _ = context.ct.cancelled() => {
592                        info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
593                        Err(ErrorData::internal_error(
594                            "Request cancelled".to_string(),
595                            None,
596                        ))
597                    }
598                }
599            }
600            None => {
601                // Server doesn't support prompts, return error
602                warn!("Server doesn't support prompts capability");
603                Ok(rmcp::model::GetPromptResult {
604                    description: None,
605                    messages: Vec::new(),
606                })
607            }
608        }
609    }
610
611    async fn complete(
612        &self,
613        request: rmcp::model::CompleteRequestParam,
614        context: RequestContext<RoleServer>,
615    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
616        // 原子加载后端连接
617        let inner_guard = self.peer.load();
618        let inner = inner_guard.as_ref().ok_or_else(|| {
619            error!("Backend connection is not available (reconnecting)");
620            ErrorData::internal_error(
621                "Backend connection is not available, reconnecting...".to_string(),
622                None,
623            )
624        })?;
625
626        // 检查后端连接是否已关闭
627        if inner.peer.is_transport_closed() {
628            error!("Backend transport is closed");
629            return Err(ErrorData::internal_error(
630                "Backend connection closed, please retry".to_string(),
631                None,
632            ));
633        }
634
635        tokio::select! {
636            result = inner.peer.complete(request) => {
637                match result {
638                    Ok(result) => {
639                        debug!("Proxying complete response");
640                        Ok(result)
641                    }
642                    Err(err) => {
643                        error!("Error completing: {:?}", err);
644                        Err(ErrorData::internal_error(
645                            format!("Error completing: {err}"),
646                            None,
647                        ))
648                    }
649                }
650            }
651            _ = context.ct.cancelled() => {
652                info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
653                Err(ErrorData::internal_error(
654                    "Request cancelled".to_string(),
655                    None,
656                ))
657            }
658        }
659    }
660
661    async fn on_progress(
662        &self,
663        notification: rmcp::model::ProgressNotificationParam,
664        _context: NotificationContext<RoleServer>,
665    ) {
666        // 原子加载后端连接
667        let inner_guard = self.peer.load();
668        let inner = match inner_guard.as_ref() {
669            Some(inner) => inner,
670            None => {
671                error!("Backend connection is not available, cannot forward progress notification");
672                return;
673            }
674        };
675
676        // 检查后端连接是否已关闭
677        if inner.peer.is_transport_closed() {
678            error!("Backend transport is closed, cannot forward progress notification");
679            return;
680        }
681
682        match inner.peer.notify_progress(notification).await {
683            Ok(_) => {
684                debug!("Proxying progress notification");
685            }
686            Err(err) => {
687                error!("Error notifying progress: {:?}", err);
688            }
689        }
690    }
691
692    async fn on_cancelled(
693        &self,
694        notification: rmcp::model::CancelledNotificationParam,
695        _context: NotificationContext<RoleServer>,
696    ) {
697        // 原子加载后端连接
698        let inner_guard = self.peer.load();
699        let inner = match inner_guard.as_ref() {
700            Some(inner) => inner,
701            None => {
702                error!(
703                    "Backend connection is not available, cannot forward cancelled notification"
704                );
705                return;
706            }
707        };
708
709        // 检查后端连接是否已关闭
710        if inner.peer.is_transport_closed() {
711            error!("Backend transport is closed, cannot forward cancelled notification");
712            return;
713        }
714
715        match inner.peer.notify_cancelled(notification).await {
716            Ok(_) => {
717                debug!("Proxying cancelled notification");
718            }
719            Err(err) => {
720                error!("Error notifying cancelled: {:?}", err);
721            }
722        }
723    }
724}
725
726impl SseHandler {
727    /// 获取 capabilities 的引用,避免 clone
728    #[inline]
729    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
730        &self.cached_info.capabilities
731    }
732
733    /// 创建一个默认的 ServerInfo(用于断开状态)
734    fn default_server_info(mcp_id: &str) -> ServerInfo {
735        warn!("[SseHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
736        ServerInfo {
737            protocol_version: ProtocolVersion::V_2024_11_05,
738            server_info: Implementation {
739                name: "MCP Proxy".to_string(),
740                version: "0.1.0".to_string(),
741                title: None,
742                website_url: None,
743                icons: None,
744            },
745            instructions: None,
746            capabilities: Default::default(),
747        }
748    }
749
750    /// 从 RunningService 提取 ServerInfo
751    fn extract_server_info(
752        client: &RunningService<RoleClient, ClientInfo>,
753        mcp_id: &str,
754    ) -> ServerInfo {
755        client
756            .peer_info()
757            .map(|peer_info| ServerInfo {
758                protocol_version: peer_info.protocol_version.clone(),
759                server_info: Implementation {
760                    name: peer_info.server_info.name.clone(),
761                    version: peer_info.server_info.version.clone(),
762                    title: None,
763                    website_url: None,
764                    icons: None,
765                },
766                instructions: peer_info.instructions.clone(),
767                capabilities: peer_info.capabilities.clone(),
768            })
769            .unwrap_or_else(|| Self::default_server_info(mcp_id))
770    }
771
772    /// 创建断开状态的 handler(用于初始化)
773    /// 后续通过 swap_backend() 注入实际的后端连接
774    pub fn new_disconnected(
775        mcp_id: String,
776        tool_filter: ToolFilter,
777        default_info: ServerInfo,
778    ) -> Self {
779        info!("[SseHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);
780
781        // 记录过滤器配置
782        if tool_filter.is_enabled() {
783            if let Some(ref allow_list) = tool_filter.allow_tools {
784                info!(
785                    "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
786                    mcp_id, allow_list
787                );
788            }
789            if let Some(ref deny_list) = tool_filter.deny_tools {
790                info!(
791                    "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
792                    mcp_id, deny_list
793                );
794            }
795        }
796
797        Self {
798            peer: Arc::new(ArcSwapOption::empty()),
799            cached_info: default_info,
800            mcp_id,
801            tool_filter,
802        }
803    }
804
805    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
806        Self::with_mcp_id(client, "unknown".to_string())
807    }
808
809    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
810        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
811    }
812
813    /// 创建带工具过滤器的 SseHandler(带初始后端连接)
814    pub fn with_tool_filter(
815        client: RunningService<RoleClient, ClientInfo>,
816        mcp_id: String,
817        tool_filter: ToolFilter,
818    ) -> Self {
819        use std::ops::Deref;
820
821        // 提取 ServerInfo
822        let cached_info = Self::extract_server_info(&client, &mcp_id);
823
824        // 克隆 Peer 用于并发请求(无需锁)
825        let peer = client.deref().clone();
826
827        // 记录过滤器配置
828        if tool_filter.is_enabled() {
829            if let Some(ref allow_list) = tool_filter.allow_tools {
830                info!(
831                    "[SseHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
832                    mcp_id, allow_list
833                );
834            }
835            if let Some(ref deny_list) = tool_filter.deny_tools {
836                info!(
837                    "[SseHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
838                    mcp_id, deny_list
839                );
840            }
841        }
842
843        // 创建 PeerInner
844        let inner = PeerInner {
845            peer,
846            _running: Arc::new(client),
847        };
848
849        Self {
850            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
851            cached_info,
852            mcp_id,
853            tool_filter,
854        }
855    }
856
857    /// 原子性替换后端连接
858    /// - Some(client): 设置新的后端连接
859    /// - None: 标记后端断开
860    pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
861        use std::ops::Deref;
862
863        match new_client {
864            Some(client) => {
865                let peer = client.deref().clone();
866                let inner = PeerInner {
867                    peer,
868                    _running: Arc::new(client),
869                };
870                self.peer.store(Some(Arc::new(inner)));
871                info!("[SseHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
872            }
873            None => {
874                self.peer.store(None);
875                info!("[SseHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
876            }
877        }
878    }
879
880    /// 检查后端是否可用(快速检查,不发送请求)
881    pub fn is_backend_available(&self) -> bool {
882        let inner_guard = self.peer.load();
883        match inner_guard.as_ref() {
884            Some(inner) => !inner.peer.is_transport_closed(),
885            None => false,
886        }
887    }
888
889    /// 检查 mcp 服务是否正常(异步版本,会发送验证请求)
890    pub async fn is_mcp_server_ready(&self) -> bool {
891        !self.is_terminated_async().await
892    }
893
894    /// 检查后端连接是否已关闭(同步版本,仅检查 transport 状态)
895    pub fn is_terminated(&self) -> bool {
896        !self.is_backend_available()
897    }
898
899    /// 异步检查后端连接是否已断开(会发送验证请求)
900    pub async fn is_terminated_async(&self) -> bool {
901        // 原子加载后端连接
902        let inner_guard = self.peer.load();
903        let inner = match inner_guard.as_ref() {
904            Some(inner) => inner,
905            None => return true,
906        };
907
908        // 快速检查 transport 状态
909        if inner.peer.is_transport_closed() {
910            return true;
911        }
912
913        // 通过发送轻量级请求来验证连接
914        match inner.peer.list_tools(None).await {
915            Ok(_) => {
916                debug!("后端连接状态检查: 正常");
917                false
918            }
919            Err(e) => {
920                info!("后端连接状态检查: 已断开,原因: {e}");
921                true
922            }
923        }
924    }
925
926    /// 获取 MCP ID
927    pub fn mcp_id(&self) -> &str {
928        &self.mcp_id
929    }
930
931    /// Update backend from an SseClientConnection
932    ///
933    /// This method allows updating the backend connection using the high-level
934    /// `SseClientConnection` type, which is more convenient than the raw
935    /// `RunningService` type.
936    ///
937    /// # Arguments
938    /// * `conn` - Some(connection) to set new backend, None to mark disconnected
939    pub fn swap_backend_from_connection(&self, conn: Option<crate::client::SseClientConnection>) {
940        match conn {
941            Some(c) => {
942                let running = c.into_running_service();
943                self.swap_backend(Some(running));
944            }
945            None => {
946                self.swap_backend(None);
947            }
948        }
949    }
950}