// mcp_sse_proxy/sse_handler.rs
1use arc_swap::ArcSwapOption;
2pub use mcp_common::ToolFilter;
3/**
4 * Create a local SSE server that proxies requests to a stdio MCP server.
5 */
6use rmcp::{
7    ErrorData, RoleClient, RoleServer, ServerHandler,
8    model::{
9        CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
10        PaginatedRequestParam, ProtocolVersion, ServerInfo,
11    },
12    service::{NotificationContext, Peer, RequestContext, RunningService},
13};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{Instant, SystemTime};
17use tracing::{debug, error, info, warn};
18
/// Global request counter used to mint unique request IDs for log correlation.
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
21
/// Bundles the backend peer handle together with ownership of its running service.
/// Stored behind an `ArcSwapOption` so the backend can be hot-swapped atomically.
#[derive(Debug)]
struct PeerInner {
    /// Peer handle used to send requests to the backend.
    peer: Peer<RoleClient>,
    /// Keeps the `RunningService` alive so the backend connection is not dropped
    /// while this handler still references its peer.
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}
32
/// A SSE proxy handler that forwards requests to a client based on the server's capabilities
/// Uses `ArcSwap` for lock-free hot replacement of the backend; while the backend is
/// disconnected or reconnecting, requests fail fast instead of blocking.
///
/// **SSE mode**: built on rmcp 0.10's stable SSE transport protocol.
#[derive(Clone, Debug)]
pub struct SseHandler {
    /// Backend connection (`ArcSwapOption` allows lock-free atomic replacement).
    /// `None` means the backend is disconnected / reconnecting.
    peer: Arc<ArcSwapOption<PeerInner>>,
    /// Cached server info (kept stable; should remain consistent across reconnects).
    cached_info: ServerInfo,
    /// MCP ID used for log correlation.
    mcp_id: String,
    /// Tool allow/deny filter configuration.
    tool_filter: ToolFilter,
}
49
50impl ServerHandler for SseHandler {
51    fn get_info(&self) -> ServerInfo {
52        self.cached_info.clone()
53    }
54
55    #[tracing::instrument(skip(self, request, context), fields(
56        mcp_id = %self.mcp_id,
57        request = ?request,
58    ))]
59    async fn list_tools(
60        &self,
61        request: Option<PaginatedRequestParam>,
62        context: RequestContext<RoleServer>,
63    ) -> Result<ListToolsResult, ErrorData> {
64        // 原子加载后端连接
65        let inner_guard = self.peer.load();
66        let inner = inner_guard.as_ref().ok_or_else(|| {
67            error!("Backend connection is not available (reconnecting)");
68            ErrorData::internal_error(
69                "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
70                None,
71            )
72        })?;
73
74        // 检查后端连接是否已关闭
75        if inner.peer.is_transport_closed() {
76            error!("Backend transport is closed");
77            return Err(ErrorData::internal_error(
78                "Backend connection closed, please retry".to_string(),
79                None,
80            ));
81        }
82
83        // Check if the server has tools capability and forward the request
84        match self.capabilities().tools {
85            Some(_) => {
86                // 使用 tokio::select! 同时等待取消和结果
87                tokio::select! {
88                    result = inner.peer.list_tools(request) => {
89                        match result {
90                            Ok(result) => {
91                                // 根据过滤配置过滤工具列表
92                                let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
93                                    result
94                                        .tools
95                                        .into_iter()
96                                        .filter(|tool| self.tool_filter.is_allowed(&tool.name))
97                                        .collect()
98                                } else {
99                                    result.tools
100                                };
101
102                                // 记录工具列表结果,这些结果会通过 SSE 推送给客户端
103                                info!(
104                                    "[list_tools] Tool list results - MCP ID: {}, number of tools: {}{}",
105                                    self.mcp_id,
106                                    filtered_tools.len(),
107                                    if self.tool_filter.is_enabled() {
108                                        " (filtered)"
109                                    } else {
110                                        ""
111                                    }
112                                );
113
114                                debug!(
115                                    "Proxying list_tools response with {} tools",
116                                    filtered_tools.len()
117                                );
118                                Ok(ListToolsResult {
119                                    tools: filtered_tools,
120                                    next_cursor: result.next_cursor,
121                                })
122                            }
123                            Err(err) => {
124                                error!("Error listing tools: {:?}", err);
125                                Err(ErrorData::internal_error(
126                                    format!("Error listing tools: {err}"),
127                                    None,
128                                ))
129                            }
130                        }
131                    }
132                    _ = context.ct.cancelled() => {
133                        info!("[list_tools] Request canceled - MCP ID: {}", self.mcp_id);
134                        Err(ErrorData::internal_error(
135                            "Request cancelled".to_string(),
136                            None,
137                        ))
138                    }
139                }
140            }
141            None => {
142                // Server doesn't support tools, return empty list
143                warn!("Server doesn't support tools capability");
144                Ok(ListToolsResult::default())
145            }
146        }
147    }
148
    /// Forwards `tools/call` to the backend with per-request ID logging and
    /// timing. Filtered tools and backend failures are reported as tool-level
    /// error results (`CallToolResult::error`) rather than protocol errors, so
    /// the client still receives a well-formed tool response.
    #[tracing::instrument(skip(self, request, context), fields(
        mcp_id = %self.mcp_id,
        tool_name = %request.name,
        tool_arguments = ?request.arguments,
    ))]
    async fn call_tool(
        &self,
        request: CallToolRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<CallToolResult, ErrorData> {
        // Mint a unique request ID for tracing this call across log lines.
        let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let start = Instant::now();
        let start_time = SystemTime::now();

        info!(
            "[call_tool:{}] Start - Tool: {}, MCP ID: {}, Time: {:?}",
            request_id, request.name, self.mcp_id, start_time
        );

        // First check whether the tool is blocked by the filter configuration.
        if !self.tool_filter.is_allowed(&request.name) {
            info!(
                "[call_tool:{}] Tool is filtered - MCP ID: {}, Tool: {}",
                request_id, self.mcp_id, request.name
            );
            return Ok(CallToolResult::error(vec![Content::text(format!(
                "Tool '{}' is not allowed by filter configuration",
                request.name
            ))]));
        }

        // Atomically load the backend connection (None while reconnecting).
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => {
                let transport_closed = inner.peer.is_transport_closed();
                info!(
                    "[call_tool:{}] Backend connection exists - transport_closed: {}",
                    request_id, transport_closed
                );
                inner
            }
            None => {
                error!(
                    "[call_tool:{}] Backend connection unavailable (reconnecting) - MCP ID: {}",
                    request_id, self.mcp_id
                );
                return Ok(CallToolResult::error(vec![Content::text(
                    "Backend connection is not available, reconnecting...",
                )]));
            }
        };

        // Re-check the transport here so the closed state is reported as a
        // tool-level error instead of being surfaced mid-call.
        if inner.peer.is_transport_closed() {
            error!(
                "[call_tool:{}] Backend transport is closed - MCP ID: {}",
                request_id, self.mcp_id
            );
            return Ok(CallToolResult::error(vec![Content::text(
                "Backend connection closed, please retry",
            )]));
        }

        // Check if the server has tools capability and forward the request
        let result = match self.capabilities().tools {
            Some(_) => {
                // Record the moment the request is handed to the backend.
                info!(
                    "[call_tool:{}] Send request to backend... - Tool: {}, Elapsed time: {}ms",
                    request_id,
                    request.name,
                    start.elapsed().as_millis()
                );

                // Race the backend call against client-side cancellation.
                tokio::select! {
                    result = inner.peer.call_tool(request.clone()) => {
                        let elapsed = start.elapsed();
                        match &result {
                            Ok(call_result) => {
                                // Log the tool call outcome; it is pushed to the client over SSE.
                                let is_error = call_result.is_error.unwrap_or(false);
                                info!(
                                    "[call_tool:{}] Response received - tool: {}, time taken: {}ms, is_error: {}, MCP ID: {}",
                                    request_id, request.name, elapsed.as_millis(), is_error, self.mcp_id
                                );
                                if is_error {
                                    // Capture the error payload for debugging.
                                    debug!(
                                        "[call_tool:{}] Error response content: {:?}",
                                        request_id, call_result.content
                                    );
                                }
                                Ok(call_result.clone())
                            }
                            Err(err) => {
                                error!(
                                    "[call_tool:{}] Backend returns error - Tool: {}, Time: {}ms, Error: {:?}, MCP ID: {}",
                                    request_id, request.name, elapsed.as_millis(), err, self.mcp_id
                                );
                                // Return an error result instead of propagating the error
                                Ok(CallToolResult::error(vec![Content::text(format!(
                                    "Error: {err}"
                                ))]))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        let elapsed = start.elapsed();
                        warn!(
                            "[call_tool:{}] Request canceled - Tool: {}, Time taken: {}ms, MCP ID: {}",
                            request_id, request.name, elapsed.as_millis(), self.mcp_id
                        );
                        Ok(CallToolResult::error(vec![Content::text(
                            "Request cancelled"
                        )]))
                    }
                }
            }
            None => {
                error!(
                    "[call_tool:{}] The server does not support tools capability - MCP ID: {}",
                    request_id, self.mcp_id
                );
                Ok(CallToolResult::error(vec![Content::text(
                    "Server doesn't support tools capability",
                )]))
            }
        };

        let total_elapsed = start.elapsed();
        info!(
            "[call_tool:{}] Completed - Tool: {}, total time taken: {}ms",
            request_id,
            request.name,
            total_elapsed.as_millis()
        );
        result
    }
290
291    async fn list_resources(
292        &self,
293        request: Option<PaginatedRequestParam>,
294        context: RequestContext<RoleServer>,
295    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
296        // 原子加载后端连接
297        let inner_guard = self.peer.load();
298        let inner = inner_guard.as_ref().ok_or_else(|| {
299            error!("Backend connection is not available (reconnecting)");
300            ErrorData::internal_error(
301                "Backend connection is not available, reconnecting...".to_string(),
302                None,
303            )
304        })?;
305
306        // 检查后端连接是否已关闭
307        if inner.peer.is_transport_closed() {
308            error!("Backend transport is closed");
309            return Err(ErrorData::internal_error(
310                "Backend connection closed, please retry".to_string(),
311                None,
312            ));
313        }
314
315        // Check if the server has resources capability and forward the request
316        match self.capabilities().resources {
317            Some(_) => {
318                tokio::select! {
319                    result = inner.peer.list_resources(request) => {
320                        match result {
321                            Ok(result) => {
322                                // 记录资源列表结果,这些结果会通过 SSE 推送给客户端
323                                info!(
324                                    "[list_resources] Resource list results - MCP ID: {}, resource quantity: {}",
325                                    self.mcp_id,
326                                    result.resources.len()
327                                );
328
329                                debug!("Proxying list_resources response");
330                                Ok(result)
331                            }
332                            Err(err) => {
333                                error!("Error listing resources: {:?}", err);
334                                Err(ErrorData::internal_error(
335                                    format!("Error listing resources: {err}"),
336                                    None,
337                                ))
338                            }
339                        }
340                    }
341                    _ = context.ct.cancelled() => {
342                        info!("[list_resources] Request canceled - MCP ID: {}", self.mcp_id);
343                        Err(ErrorData::internal_error(
344                            "Request cancelled".to_string(),
345                            None,
346                        ))
347                    }
348                }
349            }
350            None => {
351                // Server doesn't support resources, return empty list
352                warn!("Server doesn't support resources capability");
353                Ok(rmcp::model::ListResourcesResult::default())
354            }
355        }
356    }
357
358    async fn read_resource(
359        &self,
360        request: rmcp::model::ReadResourceRequestParam,
361        context: RequestContext<RoleServer>,
362    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
363        // 原子加载后端连接
364        let inner_guard = self.peer.load();
365        let inner = inner_guard.as_ref().ok_or_else(|| {
366            error!("Backend connection is not available (reconnecting)");
367            ErrorData::internal_error(
368                "Backend connection is not available, reconnecting...".to_string(),
369                None,
370            )
371        })?;
372
373        // 检查后端连接是否已关闭
374        if inner.peer.is_transport_closed() {
375            error!("Backend transport is closed");
376            return Err(ErrorData::internal_error(
377                "Backend connection closed, please retry".to_string(),
378                None,
379            ));
380        }
381
382        // Check if the server has resources capability and forward the request
383        match self.capabilities().resources {
384            Some(_) => {
385                tokio::select! {
386                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
387                        uri: request.uri.clone(),
388                    }) => {
389                        match result {
390                            Ok(result) => {
391                                // 记录资源读取结果,这些结果会通过 SSE 推送给客户端
392                                info!(
393                                    "[read_resource] Resource read result - MCP ID: {}, URI: {}",
394                                    self.mcp_id, request.uri
395                                );
396
397                                debug!("Proxying read_resource response for {}", request.uri);
398                                Ok(result)
399                            }
400                            Err(err) => {
401                                error!("Error reading resource: {:?}", err);
402                                Err(ErrorData::internal_error(
403                                    format!("Error reading resource: {err}"),
404                                    None,
405                                ))
406                            }
407                        }
408                    }
409                    _ = context.ct.cancelled() => {
410                        info!("[read_resource] Request canceled - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
411                        Err(ErrorData::internal_error(
412                            "Request cancelled".to_string(),
413                            None,
414                        ))
415                    }
416                }
417            }
418            None => {
419                // Server doesn't support resources, return error
420                error!("Server doesn't support resources capability");
421                Ok(rmcp::model::ReadResourceResult {
422                    contents: Vec::new(),
423                })
424            }
425        }
426    }
427
428    async fn list_resource_templates(
429        &self,
430        request: Option<PaginatedRequestParam>,
431        context: RequestContext<RoleServer>,
432    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
433        // 原子加载后端连接
434        let inner_guard = self.peer.load();
435        let inner = inner_guard.as_ref().ok_or_else(|| {
436            error!("Backend connection is not available (reconnecting)");
437            ErrorData::internal_error(
438                "Backend connection is not available, reconnecting...".to_string(),
439                None,
440            )
441        })?;
442
443        // 检查后端连接是否已关闭
444        if inner.peer.is_transport_closed() {
445            error!("Backend transport is closed");
446            return Err(ErrorData::internal_error(
447                "Backend connection closed, please retry".to_string(),
448                None,
449            ));
450        }
451
452        // Check if the server has resources capability and forward the request
453        match self.capabilities().resources {
454            Some(_) => {
455                tokio::select! {
456                    result = inner.peer.list_resource_templates(request) => {
457                        match result {
458                            Ok(result) => {
459                                debug!("Proxying list_resource_templates response");
460                                Ok(result)
461                            }
462                            Err(err) => {
463                                error!("Error listing resource templates: {:?}", err);
464                                Err(ErrorData::internal_error(
465                                    format!("Error listing resource templates: {err}"),
466                                    None,
467                                ))
468                            }
469                        }
470                    }
471                    _ = context.ct.cancelled() => {
472                        info!("[list_resource_templates] request canceled - MCP ID: {}", self.mcp_id);
473                        Err(ErrorData::internal_error(
474                            "Request cancelled".to_string(),
475                            None,
476                        ))
477                    }
478                }
479            }
480            None => {
481                // Server doesn't support resources, return empty list
482                warn!("Server doesn't support resources capability");
483                Ok(rmcp::model::ListResourceTemplatesResult::default())
484            }
485        }
486    }
487
488    async fn list_prompts(
489        &self,
490        request: Option<PaginatedRequestParam>,
491        context: RequestContext<RoleServer>,
492    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
493        // 原子加载后端连接
494        let inner_guard = self.peer.load();
495        let inner = inner_guard.as_ref().ok_or_else(|| {
496            error!("Backend connection is not available (reconnecting)");
497            ErrorData::internal_error(
498                "Backend connection is not available, reconnecting...".to_string(),
499                None,
500            )
501        })?;
502
503        // 检查后端连接是否已关闭
504        if inner.peer.is_transport_closed() {
505            error!("Backend transport is closed");
506            return Err(ErrorData::internal_error(
507                "Backend connection closed, please retry".to_string(),
508                None,
509            ));
510        }
511
512        // Check if the server has prompts capability and forward the request
513        match self.capabilities().prompts {
514            Some(_) => {
515                tokio::select! {
516                    result = inner.peer.list_prompts(request) => {
517                        match result {
518                            Ok(result) => {
519                                debug!("Proxying list_prompts response");
520                                Ok(result)
521                            }
522                            Err(err) => {
523                                error!("Error listing prompts: {:?}", err);
524                                Err(ErrorData::internal_error(
525                                    format!("Error listing prompts: {err}"),
526                                    None,
527                                ))
528                            }
529                        }
530                    }
531                    _ = context.ct.cancelled() => {
532                        info!("[list_prompts] Request canceled - MCP ID: {}", self.mcp_id);
533                        Err(ErrorData::internal_error(
534                            "Request cancelled".to_string(),
535                            None,
536                        ))
537                    }
538                }
539            }
540            None => {
541                // Server doesn't support prompts, return empty list
542                warn!("Server doesn't support prompts capability");
543                Ok(rmcp::model::ListPromptsResult::default())
544            }
545        }
546    }
547
548    async fn get_prompt(
549        &self,
550        request: rmcp::model::GetPromptRequestParam,
551        context: RequestContext<RoleServer>,
552    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
553        // 原子加载后端连接
554        let inner_guard = self.peer.load();
555        let inner = inner_guard.as_ref().ok_or_else(|| {
556            error!("Backend connection is not available (reconnecting)");
557            ErrorData::internal_error(
558                "Backend connection is not available, reconnecting...".to_string(),
559                None,
560            )
561        })?;
562
563        // 检查后端连接是否已关闭
564        if inner.peer.is_transport_closed() {
565            error!("Backend transport is closed");
566            return Err(ErrorData::internal_error(
567                "Backend connection closed, please retry".to_string(),
568                None,
569            ));
570        }
571
572        // Check if the server has prompts capability and forward the request
573        match self.capabilities().prompts {
574            Some(_) => {
575                tokio::select! {
576                    result = inner.peer.get_prompt(request.clone()) => {
577                        match result {
578                            Ok(result) => {
579                                debug!("Proxying get_prompt response");
580                                Ok(result)
581                            }
582                            Err(err) => {
583                                error!("Error getting prompt: {:?}", err);
584                                Err(ErrorData::internal_error(
585                                    format!("Error getting prompt: {err}"),
586                                    None,
587                                ))
588                            }
589                        }
590                    }
591                    _ = context.ct.cancelled() => {
592                        info!("[get_prompt] Request canceled - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
593                        Err(ErrorData::internal_error(
594                            "Request cancelled".to_string(),
595                            None,
596                        ))
597                    }
598                }
599            }
600            None => {
601                // Server doesn't support prompts, return error
602                warn!("Server doesn't support prompts capability");
603                Ok(rmcp::model::GetPromptResult {
604                    description: None,
605                    messages: Vec::new(),
606                })
607            }
608        }
609    }
610
611    async fn complete(
612        &self,
613        request: rmcp::model::CompleteRequestParam,
614        context: RequestContext<RoleServer>,
615    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
616        // 原子加载后端连接
617        let inner_guard = self.peer.load();
618        let inner = inner_guard.as_ref().ok_or_else(|| {
619            error!("Backend connection is not available (reconnecting)");
620            ErrorData::internal_error(
621                "Backend connection is not available, reconnecting...".to_string(),
622                None,
623            )
624        })?;
625
626        // 检查后端连接是否已关闭
627        if inner.peer.is_transport_closed() {
628            error!("Backend transport is closed");
629            return Err(ErrorData::internal_error(
630                "Backend connection closed, please retry".to_string(),
631                None,
632            ));
633        }
634
635        tokio::select! {
636            result = inner.peer.complete(request) => {
637                match result {
638                    Ok(result) => {
639                        debug!("Proxying complete response");
640                        Ok(result)
641                    }
642                    Err(err) => {
643                        error!("Error completing: {:?}", err);
644                        Err(ErrorData::internal_error(
645                            format!("Error completing: {err}"),
646                            None,
647                        ))
648                    }
649                }
650            }
651            _ = context.ct.cancelled() => {
652                info!("[complete] Request canceled - MCP ID: {}", self.mcp_id);
653                Err(ErrorData::internal_error(
654                    "Request cancelled".to_string(),
655                    None,
656                ))
657            }
658        }
659    }
660
661    async fn on_progress(
662        &self,
663        notification: rmcp::model::ProgressNotificationParam,
664        _context: NotificationContext<RoleServer>,
665    ) {
666        // 原子加载后端连接
667        let inner_guard = self.peer.load();
668        let inner = match inner_guard.as_ref() {
669            Some(inner) => inner,
670            None => {
671                error!("Backend connection is not available, cannot forward progress notification");
672                return;
673            }
674        };
675
676        // 检查后端连接是否已关闭
677        if inner.peer.is_transport_closed() {
678            error!("Backend transport is closed, cannot forward progress notification");
679            return;
680        }
681
682        match inner.peer.notify_progress(notification).await {
683            Ok(_) => {
684                debug!("Proxying progress notification");
685            }
686            Err(err) => {
687                error!("Error notifying progress: {:?}", err);
688            }
689        }
690    }
691
692    async fn on_cancelled(
693        &self,
694        notification: rmcp::model::CancelledNotificationParam,
695        _context: NotificationContext<RoleServer>,
696    ) {
697        // 原子加载后端连接
698        let inner_guard = self.peer.load();
699        let inner = match inner_guard.as_ref() {
700            Some(inner) => inner,
701            None => {
702                error!(
703                    "Backend connection is not available, cannot forward cancelled notification"
704                );
705                return;
706            }
707        };
708
709        // 检查后端连接是否已关闭
710        if inner.peer.is_transport_closed() {
711            error!("Backend transport is closed, cannot forward cancelled notification");
712            return;
713        }
714
715        match inner.peer.notify_cancelled(notification).await {
716            Ok(_) => {
717                debug!("Proxying cancelled notification");
718            }
719            Err(err) => {
720                error!("Error notifying cancelled: {:?}", err);
721            }
722        }
723    }
724}
725
726impl SseHandler {
727    /// 获取 capabilities 的引用,避免 clone
728    #[inline]
729    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
730        &self.cached_info.capabilities
731    }
732
733    /// 创建一个默认的 ServerInfo(用于断开状态)
734    fn default_server_info(mcp_id: &str) -> ServerInfo {
735        warn!(
736            "[SseHandler] Create default ServerInfo - MCP ID: {}",
737            mcp_id
738        );
739        ServerInfo {
740            protocol_version: ProtocolVersion::V_2024_11_05,
741            server_info: Implementation {
742                name: "MCP Proxy".to_string(),
743                version: "0.1.0".to_string(),
744                title: None,
745                website_url: None,
746                icons: None,
747            },
748            instructions: None,
749            capabilities: Default::default(),
750        }
751    }
752
753    /// 从 RunningService 提取 ServerInfo
754    fn extract_server_info(
755        client: &RunningService<RoleClient, ClientInfo>,
756        mcp_id: &str,
757    ) -> ServerInfo {
758        client
759            .peer_info()
760            .map(|peer_info| ServerInfo {
761                protocol_version: peer_info.protocol_version.clone(),
762                server_info: Implementation {
763                    name: peer_info.server_info.name.clone(),
764                    version: peer_info.server_info.version.clone(),
765                    title: None,
766                    website_url: None,
767                    icons: None,
768                },
769                instructions: peer_info.instructions.clone(),
770                capabilities: peer_info.capabilities.clone(),
771            })
772            .unwrap_or_else(|| Self::default_server_info(mcp_id))
773    }
774
775    /// 创建断开状态的 handler(用于初始化)
776    /// 后续通过 swap_backend() 注入实际的后端连接
777    pub fn new_disconnected(
778        mcp_id: String,
779        tool_filter: ToolFilter,
780        default_info: ServerInfo,
781    ) -> Self {
782        info!(
783            "[SseHandler] Create a disconnected handler - MCP ID: {}",
784            mcp_id
785        );
786
787        // 记录过滤器配置
788        if tool_filter.is_enabled() {
789            if let Some(ref allow_list) = tool_filter.allow_tools {
790                info!(
791                    "[SseHandler] Tool whitelist enabled - MCP ID: {}, allowed tools: {:?}",
792                    mcp_id, allow_list
793                );
794            }
795            if let Some(ref deny_list) = tool_filter.deny_tools {
796                info!(
797                    "[SseHandler] Tool blacklist enabled - MCP ID: {}, excluded tools: {:?}",
798                    mcp_id, deny_list
799                );
800            }
801        }
802
803        Self {
804            peer: Arc::new(ArcSwapOption::empty()),
805            cached_info: default_info,
806            mcp_id,
807            tool_filter,
808        }
809    }
810
811    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
812        Self::with_mcp_id(client, "unknown".to_string())
813    }
814
815    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
816        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
817    }
818
819    /// 创建带工具过滤器的 SseHandler(带初始后端连接)
820    pub fn with_tool_filter(
821        client: RunningService<RoleClient, ClientInfo>,
822        mcp_id: String,
823        tool_filter: ToolFilter,
824    ) -> Self {
825        use std::ops::Deref;
826
827        // 提取 ServerInfo
828        let cached_info = Self::extract_server_info(&client, &mcp_id);
829
830        // 克隆 Peer 用于并发请求(无需锁)
831        let peer = client.deref().clone();
832
833        // 记录过滤器配置
834        if tool_filter.is_enabled() {
835            if let Some(ref allow_list) = tool_filter.allow_tools {
836                info!(
837                    "[SseHandler] Tool whitelist enabled - MCP ID: {}, allowed tools: {:?}",
838                    mcp_id, allow_list
839                );
840            }
841            if let Some(ref deny_list) = tool_filter.deny_tools {
842                info!(
843                    "[SseHandler] Tool blacklist enabled - MCP ID: {}, excluded tools: {:?}",
844                    mcp_id, deny_list
845                );
846            }
847        }
848
849        // 创建 PeerInner
850        let inner = PeerInner {
851            peer,
852            _running: Arc::new(client),
853        };
854
855        Self {
856            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
857            cached_info,
858            mcp_id,
859            tool_filter,
860        }
861    }
862
863    /// 原子性替换后端连接
864    /// - Some(client): 设置新的后端连接
865    /// - None: 标记后端断开
866    pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
867        use std::ops::Deref;
868
869        match new_client {
870            Some(client) => {
871                let peer = client.deref().clone();
872                let inner = PeerInner {
873                    peer,
874                    _running: Arc::new(client),
875                };
876                self.peer.store(Some(Arc::new(inner)));
877                info!(
878                    "[SseHandler] Backend connection updated - MCP ID: {}",
879                    self.mcp_id
880                );
881            }
882            None => {
883                self.peer.store(None);
884                info!(
885                    "[SseHandler] Backend connection disconnected - MCP ID: {}",
886                    self.mcp_id
887                );
888            }
889        }
890    }
891
892    /// 检查后端是否可用(快速检查,不发送请求)
893    pub fn is_backend_available(&self) -> bool {
894        let inner_guard = self.peer.load();
895        match inner_guard.as_ref() {
896            Some(inner) => !inner.peer.is_transport_closed(),
897            None => false,
898        }
899    }
900
901    /// 检查 mcp 服务是否正常(异步版本,会发送验证请求)
902    pub async fn is_mcp_server_ready(&self) -> bool {
903        !self.is_terminated_async().await
904    }
905
906    /// 检查后端连接是否已关闭(同步版本,仅检查 transport 状态)
907    pub fn is_terminated(&self) -> bool {
908        !self.is_backend_available()
909    }
910
911    /// 异步检查后端连接是否已断开(会发送验证请求)
912    pub async fn is_terminated_async(&self) -> bool {
913        // 原子加载后端连接
914        let inner_guard = self.peer.load();
915        let inner = match inner_guard.as_ref() {
916            Some(inner) => inner,
917            None => return true,
918        };
919
920        // 快速检查 transport 状态
921        if inner.peer.is_transport_closed() {
922            return true;
923        }
924
925        // 通过发送轻量级请求来验证连接
926        match inner.peer.list_tools(None).await {
927            Ok(_) => {
928                debug!("Backend connection status check: OK");
929                false
930            }
931            Err(e) => {
932                info!("Backend connection status check: Disconnected, reason: {e}");
933                true
934            }
935        }
936    }
937
938    /// 获取 MCP ID
939    pub fn mcp_id(&self) -> &str {
940        &self.mcp_id
941    }
942
943    /// Update backend from an SseClientConnection
944    ///
945    /// This method allows updating the backend connection using the high-level
946    /// `SseClientConnection` type, which is more convenient than the raw
947    /// `RunningService` type.
948    ///
949    /// # Arguments
950    /// * `conn` - Some(connection) to set new backend, None to mark disconnected
951    pub fn swap_backend_from_connection(&self, conn: Option<crate::client::SseClientConnection>) {
952        match conn {
953            Some(c) => {
954                let running = c.into_running_service();
955                self.swap_backend(Some(running));
956            }
957            None => {
958                self.swap_backend(None);
959            }
960        }
961    }
962}