mcp_stdio_proxy/proxy/
proxy_handler.rs

1use tracing::{debug, info, warn, error};
// Create a local SSE server that proxies requests to a stdio MCP server.
5use rmcp::{
6    ErrorData, RoleClient, RoleServer, ServerHandler,
7    model::{
8        CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
9        PaginatedRequestParam, ServerInfo,
10    },
11    service::{NotificationContext, Peer, RequestContext, RunningService},
12};
13use std::collections::HashSet;
14use std::sync::Arc;
15use arc_swap::ArcSwapOption;
16
/// Tool filtering configuration.
///
/// Supports either an allow-list (only listed tools pass) or a deny-list
/// (listed tools are blocked). When both are set, the allow-list takes
/// precedence and the deny-list is ignored.
#[derive(Clone, Debug, Default)]
pub struct ToolFilter {
    /// Allow-list: only these tools are permitted.
    pub allow_tools: Option<HashSet<String>>,
    /// Deny-list: these tools are excluded.
    pub deny_tools: Option<HashSet<String>>,
}

impl ToolFilter {
    /// Build an allow-list filter from the given tool names.
    pub fn allow(tools: Vec<String>) -> Self {
        let allow_tools = Some(tools.into_iter().collect());
        Self {
            allow_tools,
            deny_tools: None,
        }
    }

    /// Build a deny-list filter from the given tool names.
    pub fn deny(tools: Vec<String>) -> Self {
        let deny_tools = Some(tools.into_iter().collect());
        Self {
            allow_tools: None,
            deny_tools,
        }
    }

    /// Returns whether `tool_name` passes the filter.
    pub fn is_allowed(&self, tool_name: &str) -> bool {
        match (&self.allow_tools, &self.deny_tools) {
            // Allow-list mode: only tools on the list pass.
            // (Deny-list, if also present, is ignored.)
            (Some(allow), _) => allow.contains(tool_name),
            // Deny-list mode: everything not on the list passes.
            (None, Some(deny)) => !deny.contains(tool_name),
            // No filtering configured: everything passes.
            (None, None) => true,
        }
    }

    /// Returns whether any filtering is configured at all.
    pub fn is_enabled(&self) -> bool {
        self.allow_tools.is_some() || self.deny_tools.is_some()
    }
}
62
/// Wraps the backend connection together with its running service.
/// Stored behind `ArcSwap` so the backend can be hot-swapped atomically.
#[derive(Debug)]
struct PeerInner {
    /// Peer handle used to send requests to the backend.
    peer: Peer<RoleClient>,
    /// Keeps ownership of the `RunningService` so the backend service
    /// stays alive for as long as this connection is installed.
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}
73
/// A proxy handler that forwards requests to a client based on the server's capabilities.
/// Uses `ArcSwap` for lock-free hot replacement of the backend connection; while the
/// backend is disconnected, requests fail fast with an error instead of blocking.
#[derive(Clone, Debug)]
pub struct ProxyHandler {
    /// Backend connection (`ArcSwapOption` allows lock-free atomic replacement).
    /// `None` means the backend is disconnected / reconnecting.
    peer: Arc<ArcSwapOption<PeerInner>>,
    /// Cached server info (kept stable; expected to be consistent after a reconnect).
    cached_info: ServerInfo,
    /// MCP ID used for logging.
    mcp_id: String,
    /// Tool filtering configuration.
    tool_filter: ToolFilter,
}
88
89impl ServerHandler for ProxyHandler {
    /// Returns the cached server info; answered locally so it remains
    /// available even while the backend is disconnected or reconnecting.
    fn get_info(&self) -> ServerInfo {
        self.cached_info.clone()
    }
93
94    #[tracing::instrument(skip(self, request, context), fields(
95        mcp_id = %self.mcp_id,
96        request = ?request,
97    ))]
98    async fn list_tools(
99        &self,
100        request: Option<PaginatedRequestParam>,
101        context: RequestContext<RoleServer>,
102    ) -> Result<ListToolsResult, ErrorData> {
103        // 原子加载后端连接
104        let inner_guard = self.peer.load();
105        let inner = inner_guard.as_ref().ok_or_else(|| {
106            error!("Backend connection is not available (reconnecting)");
107            ErrorData::internal_error(
108                "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
109                None,
110            )
111        })?;
112
113        // 检查后端连接是否已关闭
114        if inner.peer.is_transport_closed() {
115            error!("Backend transport is closed");
116            return Err(ErrorData::internal_error(
117                "Backend connection closed, please retry".to_string(),
118                None,
119            ));
120        }
121
122        // Check if the server has tools capability and forward the request
123        match self.capabilities().tools {
124            Some(_) => {
125                // 使用 tokio::select! 同时等待取消和结果
126                tokio::select! {
127                    result = inner.peer.list_tools(request) => {
128                        match result {
129                            Ok(result) => {
130                                // 根据过滤配置过滤工具列表
131                                let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
132                                    result
133                                        .tools
134                                        .into_iter()
135                                        .filter(|tool| self.tool_filter.is_allowed(&tool.name))
136                                        .collect()
137                                } else {
138                                    result.tools
139                                };
140
141                                // 记录工具列表结果,这些结果会通过 SSE 推送给客户端
142                                info!(
143                                    "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
144                                    self.mcp_id,
145                                    filtered_tools.len(),
146                                    if self.tool_filter.is_enabled() {
147                                        " (已过滤)"
148                                    } else {
149                                        ""
150                                    }
151                                );
152
153                                debug!(
154                                    "Proxying list_tools response with {} tools",
155                                    filtered_tools.len()
156                                );
157                                Ok(ListToolsResult {
158                                    tools: filtered_tools,
159                                    next_cursor: result.next_cursor,
160                                })
161                            }
162                            Err(err) => {
163                                error!("Error listing tools: {:?}", err);
164                                Err(ErrorData::internal_error(
165                                    format!("Error listing tools: {err}"),
166                                    None,
167                                ))
168                            }
169                        }
170                    }
171                    _ = context.ct.cancelled() => {
172                        info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
173                        Err(ErrorData::internal_error(
174                            "Request cancelled".to_string(),
175                            None,
176                        ))
177                    }
178                }
179            }
180            None => {
181                // Server doesn't support tools, return empty list
182                warn!("Server doesn't support tools capability");
183                Ok(ListToolsResult::default())
184            }
185        }
186    }
187
    /// Forwards a `tools/call` request to the backend.
    ///
    /// Note: filter rejections, backend unavailability, backend errors and
    /// cancellation are all surfaced as `Ok(CallToolResult::error(...))`
    /// rather than protocol-level errors, so clients receive a tool-result
    /// payload instead of a failed request.
    #[tracing::instrument(skip(self, request, context), fields(
        mcp_id = %self.mcp_id,
        tool_name = %request.name,
        tool_arguments = ?request.arguments,
    ))]
    async fn call_tool(
        &self,
        request: CallToolRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<CallToolResult, ErrorData> {
        // First check whether the tool is blocked by the filter.
        if !self.tool_filter.is_allowed(&request.name) {
            info!(
                "[call_tool] 工具被过滤 - MCP ID: {}, 工具: {}",
                self.mcp_id, request.name
            );
            return Ok(CallToolResult::error(vec![Content::text(format!(
                "Tool '{}' is not allowed by filter configuration",
                request.name
            ))]));
        }

        // Atomically load the backend connection.
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => {
                error!("Backend connection is not available (reconnecting)");
                return Ok(CallToolResult::error(vec![Content::text(
                    "Backend connection is not available, reconnecting..."
                )]));
            }
        };

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Ok(CallToolResult::error(vec![Content::text(
                "Backend connection closed, please retry",
            )]));
        }

        // Check if the server has tools capability and forward the request.
        match self.capabilities().tools {
            Some(_) => {
                // Race the backend call against request cancellation.
                tokio::select! {
                    result = inner.peer.call_tool(request.clone()) => {
                        match result {
                            Ok(result) => {
                                // Log the result; it is pushed to the client via SSE.
                                info!(
                                    "[call_tool] 工具调用成功 - MCP ID: {}, 工具: {}",
                                    self.mcp_id, request.name
                                );

                                debug!("Tool call succeeded");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error calling tool: {:?}", err);
                                // Return an error result instead of propagating the error.
                                Ok(CallToolResult::error(vec![Content::text(format!(
                                    "Error: {err}"
                                ))]))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!(
                            "[call_tool] 请求被取消 - MCP ID: {}, 工具: {}",
                            self.mcp_id, request.name
                        );
                        Ok(CallToolResult::error(vec![Content::text(
                            "Request cancelled"
                        )]))
                    }
                }
            }
            None => {
                error!("Server doesn't support tools capability");
                Ok(CallToolResult::error(vec![Content::text(
                    "Server doesn't support tools capability",
                )]))
            }
        }
    }
275
    /// Forwards `resources/list` to the backend.
    ///
    /// Errors if the backend is unavailable or the request is cancelled;
    /// returns an empty list when the backend lacks the resources capability.
    async fn list_resources(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
        // Atomically load the backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has resources capability and forward the request.
        match self.capabilities().resources {
            Some(_) => {
                // Race the backend call against request cancellation.
                tokio::select! {
                    result = inner.peer.list_resources(request) => {
                        match result {
                            Ok(result) => {
                                // Log the outcome; results are pushed to the client via SSE.
                                info!(
                                    "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
                                    self.mcp_id,
                                    result.resources.len()
                                );

                                debug!("Proxying list_resources response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resources: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resources: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support resources, return empty list.
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourcesResult::default())
            }
        }
    }
342
    /// Forwards `resources/read` to the backend.
    ///
    /// Errors if the backend is unavailable or the request is cancelled.
    /// When the backend lacks the resources capability, an empty result
    /// (no contents) is returned rather than an error.
    async fn read_resource(
        &self,
        request: rmcp::model::ReadResourceRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
        // Atomically load the backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has resources capability and forward the request.
        match self.capabilities().resources {
            Some(_) => {
                tokio::select! {
                    // The request param is rebuilt with a cloned URI so the
                    // original `request.uri` stays available for logging below.
                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
                        uri: request.uri.clone(),
                    }) => {
                        match result {
                            Ok(result) => {
                                // Log the outcome; results are pushed to the client via SSE.
                                info!(
                                    "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
                                    self.mcp_id, request.uri
                                );

                                debug!("Proxying read_resource response for {}", request.uri);
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error reading resource: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error reading resource: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support resources: despite the error log,
                // this returns an empty (Ok) result, not an error.
                error!("Server doesn't support resources capability");
                Ok(rmcp::model::ReadResourceResult {
                    contents: Vec::new(),
                })
            }
        }
    }
412
    /// Forwards `resources/templates/list` to the backend.
    ///
    /// Errors if the backend is unavailable or the request is cancelled;
    /// returns an empty list when the backend lacks the resources capability.
    async fn list_resource_templates(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
        // Atomically load the backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has resources capability and forward the request.
        match self.capabilities().resources {
            Some(_) => {
                // Race the backend call against request cancellation.
                tokio::select! {
                    result = inner.peer.list_resource_templates(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_resource_templates response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resource templates: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resource templates: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support resources, return empty list.
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourceTemplatesResult::default())
            }
        }
    }
472
    /// Forwards `prompts/list` to the backend.
    ///
    /// Errors if the backend is unavailable or the request is cancelled;
    /// returns an empty list when the backend lacks the prompts capability.
    async fn list_prompts(
        &self,
        request: Option<PaginatedRequestParam>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
        // Atomically load the backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has prompts capability and forward the request.
        match self.capabilities().prompts {
            Some(_) => {
                // Race the backend call against request cancellation.
                tokio::select! {
                    result = inner.peer.list_prompts(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_prompts response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing prompts: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing prompts: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support prompts, return empty list.
                warn!("Server doesn't support prompts capability");
                Ok(rmcp::model::ListPromptsResult::default())
            }
        }
    }
532
    /// Forwards `prompts/get` to the backend.
    ///
    /// Errors if the backend is unavailable or the request is cancelled.
    /// When the backend lacks the prompts capability, an empty prompt
    /// result (no messages) is returned rather than an error.
    async fn get_prompt(
        &self,
        request: rmcp::model::GetPromptRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
        // Atomically load the backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has prompts capability and forward the request.
        match self.capabilities().prompts {
            Some(_) => {
                tokio::select! {
                    // Request is cloned so `request.name` stays available for logging below.
                    result = inner.peer.get_prompt(request.clone()) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying get_prompt response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error getting prompt: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error getting prompt: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support prompts: returns an empty (Ok) result, not an error.
                warn!("Server doesn't support prompts capability");
                Ok(rmcp::model::GetPromptResult {
                    description: None,
                    messages: Vec::new(),
                })
            }
        }
    }
595
    /// Forwards a `completion/complete` request to the backend.
    ///
    /// Errors if the backend is unavailable or the request is cancelled.
    /// NOTE(review): unlike the other methods, no capability check is done
    /// before forwarding — presumably intentional, but worth confirming.
    async fn complete(
        &self,
        request: rmcp::model::CompleteRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
        // Atomically load the backend connection.
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Race the backend call against request cancellation.
        tokio::select! {
            result = inner.peer.complete(request) => {
                match result {
                    Ok(result) => {
                        debug!("Proxying complete response");
                        Ok(result)
                    }
                    Err(err) => {
                        error!("Error completing: {:?}", err);
                        Err(ErrorData::internal_error(
                            format!("Error completing: {err}"),
                            None,
                        ))
                    }
                }
            }
            _ = context.ct.cancelled() => {
                info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
                Err(ErrorData::internal_error(
                    "Request cancelled".to_string(),
                    None,
                ))
            }
        }
    }
645
    /// Forwards a progress notification to the backend.
    /// Notifications are best-effort: failures are logged, never propagated.
    async fn on_progress(
        &self,
        notification: rmcp::model::ProgressNotificationParam,
        _context: NotificationContext<RoleServer>,
    ) {
        // Atomically load the backend connection.
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => {
                error!("Backend connection is not available, cannot forward progress notification");
                return;
            }
        };

        // Drop the notification if the backend transport has closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed, cannot forward progress notification");
            return;
        }

        match inner.peer.notify_progress(notification).await {
            Ok(_) => {
                debug!("Proxying progress notification");
            }
            Err(err) => {
                error!("Error notifying progress: {:?}", err);
            }
        }
    }
676
    /// Forwards a cancellation notification to the backend.
    /// Notifications are best-effort: failures are logged, never propagated.
    async fn on_cancelled(
        &self,
        notification: rmcp::model::CancelledNotificationParam,
        _context: NotificationContext<RoleServer>,
    ) {
        // Atomically load the backend connection.
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => {
                error!("Backend connection is not available, cannot forward cancelled notification");
                return;
            }
        };

        // Drop the notification if the backend transport has closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed, cannot forward cancelled notification");
            return;
        }

        match inner.peer.notify_cancelled(notification).await {
            Ok(_) => {
                debug!("Proxying cancelled notification");
            }
            Err(err) => {
                error!("Error notifying cancelled: {:?}", err);
            }
        }
    }
707}
708
709impl ProxyHandler {
    /// Returns a reference to the cached server capabilities (avoids a clone).
    #[inline]
    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
        &self.cached_info.capabilities
    }
715
    /// Creates a placeholder `ServerInfo` (used while disconnected or when
    /// peer info is unavailable). Capabilities default to empty, so the
    /// handler will report no tools/resources/prompts until replaced.
    fn default_server_info(mcp_id: &str) -> ServerInfo {
        warn!("[ProxyHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
        ServerInfo {
            protocol_version: Default::default(),
            server_info: Implementation {
                name: "MCP Proxy".to_string(),
                version: "0.1.0".to_string(),
                title: None,
                website_url: None,
                icons: None,
            },
            instructions: None,
            capabilities: Default::default(),
        }
    }
732
    /// Builds a `ServerInfo` from the connected client's peer info, falling
    /// back to `default_server_info` when peer info is missing.
    fn extract_server_info(client: &RunningService<RoleClient, ClientInfo>, mcp_id: &str) -> ServerInfo {
        client.peer_info().map(|peer_info| ServerInfo {
            protocol_version: peer_info.protocol_version.clone(),
            server_info: Implementation {
                name: peer_info.server_info.name.clone(),
                version: peer_info.server_info.version.clone(),
                // NOTE(review): title/website_url/icons are dropped even if the
                // backend provides them — confirm this is intentional.
                title: None,
                website_url: None,
                icons: None,
            },
            instructions: peer_info.instructions.clone(),
            capabilities: peer_info.capabilities.clone(),
        }).unwrap_or_else(|| Self::default_server_info(mcp_id))
    }
748
    /// Creates a handler in the disconnected state (used during initialization).
    /// The actual backend connection is injected later via `swap_backend()`.
    /// `default_info` is served from cache until then.
    pub fn new_disconnected(mcp_id: String, tool_filter: ToolFilter, default_info: ServerInfo) -> Self {
        info!("[ProxyHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);

        // Log the filter configuration for diagnostics.
        if tool_filter.is_enabled() {
            if let Some(ref allow_list) = tool_filter.allow_tools {
                info!(
                    "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
                    mcp_id, allow_list
                );
            }
            if let Some(ref deny_list) = tool_filter.deny_tools {
                info!(
                    "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
                    mcp_id, deny_list
                );
            }
        }

        Self {
            // Empty ArcSwapOption == backend disconnected; requests fail fast.
            peer: Arc::new(ArcSwapOption::empty()),
            cached_info: default_info,
            mcp_id,
            tool_filter,
        }
    }
777
    /// Creates a handler with an initial backend connection and a
    /// placeholder MCP ID ("unknown") for logging.
    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
        Self::with_mcp_id(client, "unknown".to_string())
    }
781
    /// Creates a handler with an initial backend connection and the given
    /// MCP ID; no tool filtering is applied.
    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
    }
785
    /// Creates a `ProxyHandler` with a tool filter and an initial backend
    /// connection. Server info is extracted from the client and cached.
    pub fn with_tool_filter(
        client: RunningService<RoleClient, ClientInfo>,
        mcp_id: String,
        tool_filter: ToolFilter,
    ) -> Self {
        use std::ops::Deref;

        // Extract and cache the ServerInfo up front.
        let cached_info = Self::extract_server_info(&client, &mcp_id);

        // Clone the Peer handle for lock-free concurrent requests.
        let peer = client.deref().clone();

        // Log the filter configuration for diagnostics.
        if tool_filter.is_enabled() {
            if let Some(ref allow_list) = tool_filter.allow_tools {
                info!(
                    "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
                    mcp_id, allow_list
                );
            }
            if let Some(ref deny_list) = tool_filter.deny_tools {
                info!(
                    "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
                    mcp_id, deny_list
                );
            }
        }

        // Wrap the peer and keep the RunningService alive alongside it.
        let inner = PeerInner {
            peer,
            _running: Arc::new(client),
        };

        Self {
            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
            cached_info,
            mcp_id,
            tool_filter,
        }
    }
829
830    /// 原子性替换后端连接
831    /// - Some(client): 设置新的后端连接
832    /// - None: 标记后端断开
833    pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
834        use std::ops::Deref;
835
836        match new_client {
837            Some(client) => {
838                let peer = client.deref().clone();
839                let inner = PeerInner {
840                    peer,
841                    _running: Arc::new(client),
842                };
843                self.peer.store(Some(Arc::new(inner)));
844                info!("[ProxyHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
845            }
846            None => {
847                self.peer.store(None);
848                info!("[ProxyHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
849            }
850        }
851    }
852
853    /// 检查后端是否可用(快速检查,不发送请求)
854    pub fn is_backend_available(&self) -> bool {
855        let inner_guard = self.peer.load();
856        match inner_guard.as_ref() {
857            Some(inner) => !inner.peer.is_transport_closed(),
858            None => false,
859        }
860    }
861
    /// Checks whether the MCP backend is healthy (async version; sends a
    /// verification request — see `is_terminated_async`).
    pub async fn is_mcp_server_ready(&self) -> bool {
        !self.is_terminated_async().await
    }
866
    /// Checks whether the backend connection is down (sync version; only
    /// inspects the transport state, no request is sent).
    pub fn is_terminated(&self) -> bool {
        !self.is_backend_available()
    }
871
    /// Asynchronously checks whether the backend connection is down by
    /// actually exercising it with a request.
    pub async fn is_terminated_async(&self) -> bool {
        // Atomically load the backend connection.
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => return true,
        };

        // Cheap check first: transport state.
        if inner.peer.is_transport_closed() {
            return true;
        }

        // Probe the connection with a real request. NOTE(review): list_tools
        // is used as the health probe; a ping-style request would be lighter
        // if the protocol offers one — confirm.
        match inner.peer.list_tools(None).await {
            Ok(_) => {
                debug!("后端连接状态检查: 正常");
                false
            }
            Err(e) => {
                info!("后端连接状态检查: 已断开,原因: {e}");
                true
            }
        }
    }
898
    /// Returns the MCP ID used for logging.
    pub fn mcp_id(&self) -> &str {
        &self.mcp_id
    }
903}