mcp_streamable_proxy/
proxy_handler.rs

1use tracing::{debug, info, warn, error};
2/**
3 * Create a local SSE server that proxies requests to a stdio MCP server.
4 */
5use rmcp::{
6    ErrorData, RoleClient, RoleServer, ServerHandler,
7    model::{
8        CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
9        PaginatedRequestParam, ServerInfo,
10    },
11    service::{NotificationContext, Peer, RequestContext, RunningService},
12};
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use arc_swap::ArcSwapOption;
16pub use mcp_common::ToolFilter;
17
/// Wraps the backend peer together with its running service.
/// Stored behind an `ArcSwapOption` so the whole connection can be
/// hot-swapped atomically on reconnect.
#[derive(Debug)]
struct PeerInner {
    /// Peer used to send requests to the backend.
    peer: Peer<RoleClient>,
    /// Keeps ownership of the `RunningService` so the backend task stays
    /// alive for as long as this connection is installed.
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}
28
/// A proxy handler that forwards requests to a client based on the server's capabilities.
/// Uses `ArcSwap` to hot-swap the backend connection; while the backend is
/// disconnected, requests fail fast with an error instead of blocking.
///
/// **Enhancements**:
/// - Backend versioning: every `swap_backend` call increments the version number.
/// - Session version tracking: designed to be used with `ProxyAwareSessionManager`.
#[derive(Clone, Debug)]
pub struct ProxyHandler {
    /// Backend connection (`ArcSwapOption` allows lock-free atomic replacement).
    /// `None` means the backend is disconnected / reconnecting.
    peer: Arc<ArcSwapOption<PeerInner>>,
    /// Cached server info (kept stable; expected to be identical after reconnects).
    cached_info: ServerInfo,
    /// MCP ID used for log correlation.
    mcp_id: String,
    /// Tool filtering configuration (allow/deny lists).
    tool_filter: ToolFilter,
    /// Backend version number (incremented on every `swap_backend`).
    /// Used to track backend connection changes and invalidate stale sessions.
    backend_version: Arc<AtomicU64>,
}
50
51impl ServerHandler for ProxyHandler {
52    fn get_info(&self) -> ServerInfo {
53        self.cached_info.clone()
54    }
55
56    #[tracing::instrument(skip(self, request, context), fields(
57        mcp_id = %self.mcp_id,
58        request = ?request,
59    ))]
60    async fn list_tools(
61        &self,
62        request: Option<PaginatedRequestParam>,
63        context: RequestContext<RoleServer>,
64    ) -> Result<ListToolsResult, ErrorData> {
65        // 原子加载后端连接
66        let inner_guard = self.peer.load();
67        let inner = inner_guard.as_ref().ok_or_else(|| {
68            error!("Backend connection is not available (reconnecting)");
69            ErrorData::internal_error(
70                "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
71                None,
72            )
73        })?;
74
75        // 检查后端连接是否已关闭
76        if inner.peer.is_transport_closed() {
77            error!("Backend transport is closed");
78            return Err(ErrorData::internal_error(
79                "Backend connection closed, please retry".to_string(),
80                None,
81            ));
82        }
83
84        // Check if the server has tools capability and forward the request
85        match self.capabilities().tools {
86            Some(_) => {
87                // 使用 tokio::select! 同时等待取消和结果
88                tokio::select! {
89                    result = inner.peer.list_tools(request) => {
90                        match result {
91                            Ok(result) => {
92                                // 根据过滤配置过滤工具列表
93                                let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
94                                    result
95                                        .tools
96                                        .into_iter()
97                                        .filter(|tool| self.tool_filter.is_allowed(&tool.name))
98                                        .collect()
99                                } else {
100                                    result.tools
101                                };
102
103                                // 记录工具列表结果,这些结果会通过 SSE 推送给客户端
104                                info!(
105                                    "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
106                                    self.mcp_id,
107                                    filtered_tools.len(),
108                                    if self.tool_filter.is_enabled() {
109                                        " (已过滤)"
110                                    } else {
111                                        ""
112                                    }
113                                );
114
115                                debug!(
116                                    "Proxying list_tools response with {} tools",
117                                    filtered_tools.len()
118                                );
119                                Ok(ListToolsResult {
120                                    tools: filtered_tools,
121                                    next_cursor: result.next_cursor,
122                                    meta: result.meta, // rmcp 0.12 新增字段
123                                })
124                            }
125                            Err(err) => {
126                                error!("Error listing tools: {:?}", err);
127                                Err(ErrorData::internal_error(
128                                    format!("Error listing tools: {err}"),
129                                    None,
130                                ))
131                            }
132                        }
133                    }
134                    _ = context.ct.cancelled() => {
135                        info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
136                        Err(ErrorData::internal_error(
137                            "Request cancelled".to_string(),
138                            None,
139                        ))
140                    }
141                }
142            }
143            None => {
144                // Server doesn't support tools, return empty list
145                warn!("Server doesn't support tools capability");
146                Ok(ListToolsResult::default())
147            }
148        }
149    }
150
151    #[tracing::instrument(skip(self, request, context), fields(
152        mcp_id = %self.mcp_id,
153        tool_name = %request.name,
154        tool_arguments = ?request.arguments,
155    ))]
156    async fn call_tool(
157        &self,
158        request: CallToolRequestParam,
159        context: RequestContext<RoleServer>,
160    ) -> Result<CallToolResult, ErrorData> {
161        // 首先检查工具是否被过滤
162        if !self.tool_filter.is_allowed(&request.name) {
163            info!(
164                "[call_tool] 工具被过滤 - MCP ID: {}, 工具: {}",
165                self.mcp_id, request.name
166            );
167            return Ok(CallToolResult::error(vec![Content::text(format!(
168                "Tool '{}' is not allowed by filter configuration",
169                request.name
170            ))]));
171        }
172
173        // 原子加载后端连接
174        let inner_guard = self.peer.load();
175        let inner = match inner_guard.as_ref() {
176            Some(inner) => inner,
177            None => {
178                error!("Backend connection is not available (reconnecting)");
179                return Ok(CallToolResult::error(vec![Content::text(
180                    "Backend connection is not available, reconnecting..."
181                )]));
182            }
183        };
184
185        // 检查后端连接是否已关闭
186        if inner.peer.is_transport_closed() {
187            error!("Backend transport is closed");
188            return Ok(CallToolResult::error(vec![Content::text(
189                "Backend connection closed, please retry",
190            )]));
191        }
192
193        // Check if the server has tools capability and forward the request
194        match self.capabilities().tools {
195            Some(_) => {
196                // 使用 tokio::select! 同时等待取消和结果
197                tokio::select! {
198                    result = inner.peer.call_tool(request.clone()) => {
199                        match result {
200                            Ok(result) => {
201                                // 记录工具调用结果,这些结果会通过 SSE 推送给客户端
202                                info!(
203                                    "[call_tool] 工具调用成功 - MCP ID: {}, 工具: {}",
204                                    self.mcp_id, request.name
205                                );
206
207                                debug!("Tool call succeeded");
208                                Ok(result)
209                            }
210                            Err(err) => {
211                                error!("Error calling tool: {:?}", err);
212                                // Return an error result instead of propagating the error
213                                Ok(CallToolResult::error(vec![Content::text(format!(
214                                    "Error: {err}"
215                                ))]))
216                            }
217                        }
218                    }
219                    _ = context.ct.cancelled() => {
220                        info!(
221                            "[call_tool] 请求被取消 - MCP ID: {}, 工具: {}",
222                            self.mcp_id, request.name
223                        );
224                        Ok(CallToolResult::error(vec![Content::text(
225                            "Request cancelled"
226                        )]))
227                    }
228                }
229            }
230            None => {
231                error!("Server doesn't support tools capability");
232                Ok(CallToolResult::error(vec![Content::text(
233                    "Server doesn't support tools capability",
234                )]))
235            }
236        }
237    }
238
239    async fn list_resources(
240        &self,
241        request: Option<PaginatedRequestParam>,
242        context: RequestContext<RoleServer>,
243    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
244        // 原子加载后端连接
245        let inner_guard = self.peer.load();
246        let inner = inner_guard.as_ref().ok_or_else(|| {
247            error!("Backend connection is not available (reconnecting)");
248            ErrorData::internal_error(
249                "Backend connection is not available, reconnecting...".to_string(),
250                None,
251            )
252        })?;
253
254        // 检查后端连接是否已关闭
255        if inner.peer.is_transport_closed() {
256            error!("Backend transport is closed");
257            return Err(ErrorData::internal_error(
258                "Backend connection closed, please retry".to_string(),
259                None,
260            ));
261        }
262
263        // Check if the server has resources capability and forward the request
264        match self.capabilities().resources {
265            Some(_) => {
266                tokio::select! {
267                    result = inner.peer.list_resources(request) => {
268                        match result {
269                            Ok(result) => {
270                                // 记录资源列表结果,这些结果会通过 SSE 推送给客户端
271                                info!(
272                                    "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
273                                    self.mcp_id,
274                                    result.resources.len()
275                                );
276
277                                debug!("Proxying list_resources response");
278                                Ok(result)
279                            }
280                            Err(err) => {
281                                error!("Error listing resources: {:?}", err);
282                                Err(ErrorData::internal_error(
283                                    format!("Error listing resources: {err}"),
284                                    None,
285                                ))
286                            }
287                        }
288                    }
289                    _ = context.ct.cancelled() => {
290                        info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
291                        Err(ErrorData::internal_error(
292                            "Request cancelled".to_string(),
293                            None,
294                        ))
295                    }
296                }
297            }
298            None => {
299                // Server doesn't support resources, return empty list
300                warn!("Server doesn't support resources capability");
301                Ok(rmcp::model::ListResourcesResult::default())
302            }
303        }
304    }
305
306    async fn read_resource(
307        &self,
308        request: rmcp::model::ReadResourceRequestParam,
309        context: RequestContext<RoleServer>,
310    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
311        // 原子加载后端连接
312        let inner_guard = self.peer.load();
313        let inner = inner_guard.as_ref().ok_or_else(|| {
314            error!("Backend connection is not available (reconnecting)");
315            ErrorData::internal_error(
316                "Backend connection is not available, reconnecting...".to_string(),
317                None,
318            )
319        })?;
320
321        // 检查后端连接是否已关闭
322        if inner.peer.is_transport_closed() {
323            error!("Backend transport is closed");
324            return Err(ErrorData::internal_error(
325                "Backend connection closed, please retry".to_string(),
326                None,
327            ));
328        }
329
330        // Check if the server has resources capability and forward the request
331        match self.capabilities().resources {
332            Some(_) => {
333                tokio::select! {
334                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
335                        uri: request.uri.clone(),
336                    }) => {
337                        match result {
338                            Ok(result) => {
339                                // 记录资源读取结果,这些结果会通过 SSE 推送给客户端
340                                info!(
341                                    "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
342                                    self.mcp_id, request.uri
343                                );
344
345                                debug!("Proxying read_resource response for {}", request.uri);
346                                Ok(result)
347                            }
348                            Err(err) => {
349                                error!("Error reading resource: {:?}", err);
350                                Err(ErrorData::internal_error(
351                                    format!("Error reading resource: {err}"),
352                                    None,
353                                ))
354                            }
355                        }
356                    }
357                    _ = context.ct.cancelled() => {
358                        info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
359                        Err(ErrorData::internal_error(
360                            "Request cancelled".to_string(),
361                            None,
362                        ))
363                    }
364                }
365            }
366            None => {
367                // Server doesn't support resources, return error
368                error!("Server doesn't support resources capability");
369                Ok(rmcp::model::ReadResourceResult {
370                    contents: Vec::new(),
371                })
372            }
373        }
374    }
375
376    async fn list_resource_templates(
377        &self,
378        request: Option<PaginatedRequestParam>,
379        context: RequestContext<RoleServer>,
380    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
381        // 原子加载后端连接
382        let inner_guard = self.peer.load();
383        let inner = inner_guard.as_ref().ok_or_else(|| {
384            error!("Backend connection is not available (reconnecting)");
385            ErrorData::internal_error(
386                "Backend connection is not available, reconnecting...".to_string(),
387                None,
388            )
389        })?;
390
391        // 检查后端连接是否已关闭
392        if inner.peer.is_transport_closed() {
393            error!("Backend transport is closed");
394            return Err(ErrorData::internal_error(
395                "Backend connection closed, please retry".to_string(),
396                None,
397            ));
398        }
399
400        // Check if the server has resources capability and forward the request
401        match self.capabilities().resources {
402            Some(_) => {
403                tokio::select! {
404                    result = inner.peer.list_resource_templates(request) => {
405                        match result {
406                            Ok(result) => {
407                                debug!("Proxying list_resource_templates response");
408                                Ok(result)
409                            }
410                            Err(err) => {
411                                error!("Error listing resource templates: {:?}", err);
412                                Err(ErrorData::internal_error(
413                                    format!("Error listing resource templates: {err}"),
414                                    None,
415                                ))
416                            }
417                        }
418                    }
419                    _ = context.ct.cancelled() => {
420                        info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
421                        Err(ErrorData::internal_error(
422                            "Request cancelled".to_string(),
423                            None,
424                        ))
425                    }
426                }
427            }
428            None => {
429                // Server doesn't support resources, return empty list
430                warn!("Server doesn't support resources capability");
431                Ok(rmcp::model::ListResourceTemplatesResult::default())
432            }
433        }
434    }
435
436    async fn list_prompts(
437        &self,
438        request: Option<PaginatedRequestParam>,
439        context: RequestContext<RoleServer>,
440    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
441        // 原子加载后端连接
442        let inner_guard = self.peer.load();
443        let inner = inner_guard.as_ref().ok_or_else(|| {
444            error!("Backend connection is not available (reconnecting)");
445            ErrorData::internal_error(
446                "Backend connection is not available, reconnecting...".to_string(),
447                None,
448            )
449        })?;
450
451        // 检查后端连接是否已关闭
452        if inner.peer.is_transport_closed() {
453            error!("Backend transport is closed");
454            return Err(ErrorData::internal_error(
455                "Backend connection closed, please retry".to_string(),
456                None,
457            ));
458        }
459
460        // Check if the server has prompts capability and forward the request
461        match self.capabilities().prompts {
462            Some(_) => {
463                tokio::select! {
464                    result = inner.peer.list_prompts(request) => {
465                        match result {
466                            Ok(result) => {
467                                debug!("Proxying list_prompts response");
468                                Ok(result)
469                            }
470                            Err(err) => {
471                                error!("Error listing prompts: {:?}", err);
472                                Err(ErrorData::internal_error(
473                                    format!("Error listing prompts: {err}"),
474                                    None,
475                                ))
476                            }
477                        }
478                    }
479                    _ = context.ct.cancelled() => {
480                        info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
481                        Err(ErrorData::internal_error(
482                            "Request cancelled".to_string(),
483                            None,
484                        ))
485                    }
486                }
487            }
488            None => {
489                // Server doesn't support prompts, return empty list
490                warn!("Server doesn't support prompts capability");
491                Ok(rmcp::model::ListPromptsResult::default())
492            }
493        }
494    }
495
496    async fn get_prompt(
497        &self,
498        request: rmcp::model::GetPromptRequestParam,
499        context: RequestContext<RoleServer>,
500    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
501        // 原子加载后端连接
502        let inner_guard = self.peer.load();
503        let inner = inner_guard.as_ref().ok_or_else(|| {
504            error!("Backend connection is not available (reconnecting)");
505            ErrorData::internal_error(
506                "Backend connection is not available, reconnecting...".to_string(),
507                None,
508            )
509        })?;
510
511        // 检查后端连接是否已关闭
512        if inner.peer.is_transport_closed() {
513            error!("Backend transport is closed");
514            return Err(ErrorData::internal_error(
515                "Backend connection closed, please retry".to_string(),
516                None,
517            ));
518        }
519
520        // Check if the server has prompts capability and forward the request
521        match self.capabilities().prompts {
522            Some(_) => {
523                tokio::select! {
524                    result = inner.peer.get_prompt(request.clone()) => {
525                        match result {
526                            Ok(result) => {
527                                debug!("Proxying get_prompt response");
528                                Ok(result)
529                            }
530                            Err(err) => {
531                                error!("Error getting prompt: {:?}", err);
532                                Err(ErrorData::internal_error(
533                                    format!("Error getting prompt: {err}"),
534                                    None,
535                                ))
536                            }
537                        }
538                    }
539                    _ = context.ct.cancelled() => {
540                        info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
541                        Err(ErrorData::internal_error(
542                            "Request cancelled".to_string(),
543                            None,
544                        ))
545                    }
546                }
547            }
548            None => {
549                // Server doesn't support prompts, return error
550                warn!("Server doesn't support prompts capability");
551                Ok(rmcp::model::GetPromptResult {
552                    description: None,
553                    messages: Vec::new(),
554                })
555            }
556        }
557    }
558
559    async fn complete(
560        &self,
561        request: rmcp::model::CompleteRequestParam,
562        context: RequestContext<RoleServer>,
563    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
564        // 原子加载后端连接
565        let inner_guard = self.peer.load();
566        let inner = inner_guard.as_ref().ok_or_else(|| {
567            error!("Backend connection is not available (reconnecting)");
568            ErrorData::internal_error(
569                "Backend connection is not available, reconnecting...".to_string(),
570                None,
571            )
572        })?;
573
574        // 检查后端连接是否已关闭
575        if inner.peer.is_transport_closed() {
576            error!("Backend transport is closed");
577            return Err(ErrorData::internal_error(
578                "Backend connection closed, please retry".to_string(),
579                None,
580            ));
581        }
582
583        tokio::select! {
584            result = inner.peer.complete(request) => {
585                match result {
586                    Ok(result) => {
587                        debug!("Proxying complete response");
588                        Ok(result)
589                    }
590                    Err(err) => {
591                        error!("Error completing: {:?}", err);
592                        Err(ErrorData::internal_error(
593                            format!("Error completing: {err}"),
594                            None,
595                        ))
596                    }
597                }
598            }
599            _ = context.ct.cancelled() => {
600                info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
601                Err(ErrorData::internal_error(
602                    "Request cancelled".to_string(),
603                    None,
604                ))
605            }
606        }
607    }
608
609    async fn on_progress(
610        &self,
611        notification: rmcp::model::ProgressNotificationParam,
612        _context: NotificationContext<RoleServer>,
613    ) {
614        // 原子加载后端连接
615        let inner_guard = self.peer.load();
616        let inner = match inner_guard.as_ref() {
617            Some(inner) => inner,
618            None => {
619                error!("Backend connection is not available, cannot forward progress notification");
620                return;
621            }
622        };
623
624        // 检查后端连接是否已关闭
625        if inner.peer.is_transport_closed() {
626            error!("Backend transport is closed, cannot forward progress notification");
627            return;
628        }
629
630        match inner.peer.notify_progress(notification).await {
631            Ok(_) => {
632                debug!("Proxying progress notification");
633            }
634            Err(err) => {
635                error!("Error notifying progress: {:?}", err);
636            }
637        }
638    }
639
640    async fn on_cancelled(
641        &self,
642        notification: rmcp::model::CancelledNotificationParam,
643        _context: NotificationContext<RoleServer>,
644    ) {
645        // 原子加载后端连接
646        let inner_guard = self.peer.load();
647        let inner = match inner_guard.as_ref() {
648            Some(inner) => inner,
649            None => {
650                error!("Backend connection is not available, cannot forward cancelled notification");
651                return;
652            }
653        };
654
655        // 检查后端连接是否已关闭
656        if inner.peer.is_transport_closed() {
657            error!("Backend transport is closed, cannot forward cancelled notification");
658            return;
659        }
660
661        match inner.peer.notify_cancelled(notification).await {
662            Ok(_) => {
663                debug!("Proxying cancelled notification");
664            }
665            Err(err) => {
666                error!("Error notifying cancelled: {:?}", err);
667            }
668        }
669    }
670}
671
672impl ProxyHandler {
673    /// 获取 capabilities 的引用,避免 clone
674    #[inline]
675    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
676        &self.cached_info.capabilities
677    }
678
679    /// 创建一个默认的 ServerInfo(用于断开状态)
680    fn default_server_info(mcp_id: &str) -> ServerInfo {
681        warn!("[ProxyHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
682        ServerInfo {
683            protocol_version: Default::default(),
684            server_info: Implementation {
685                name: "MCP Proxy".to_string(),
686                version: "0.1.0".to_string(),
687                title: None,
688                website_url: None,
689                icons: None,
690            },
691            instructions: None,
692            capabilities: Default::default(),
693        }
694    }
695
696    /// 从 RunningService 提取 ServerInfo
697    fn extract_server_info(client: &RunningService<RoleClient, ClientInfo>, mcp_id: &str) -> ServerInfo {
698        client.peer_info().map(|peer_info| ServerInfo {
699            protocol_version: peer_info.protocol_version.clone(),
700            server_info: Implementation {
701                name: peer_info.server_info.name.clone(),
702                version: peer_info.server_info.version.clone(),
703                title: None,
704                website_url: None,
705                icons: None,
706            },
707            instructions: peer_info.instructions.clone(),
708            capabilities: peer_info.capabilities.clone(),
709        }).unwrap_or_else(|| Self::default_server_info(mcp_id))
710    }
711
712    /// 创建断开状态的 handler(用于初始化)
713    /// 后续通过 swap_backend() 注入实际的后端连接
714    pub fn new_disconnected(mcp_id: String, tool_filter: ToolFilter, default_info: ServerInfo) -> Self {
715        info!("[ProxyHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);
716
717        // 记录过滤器配置
718        if tool_filter.is_enabled() {
719            if let Some(ref allow_list) = tool_filter.allow_tools {
720                info!(
721                    "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
722                    mcp_id, allow_list
723                );
724            }
725            if let Some(ref deny_list) = tool_filter.deny_tools {
726                info!(
727                    "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
728                    mcp_id, deny_list
729                );
730            }
731        }
732
733        Self {
734            peer: Arc::new(ArcSwapOption::empty()),
735            cached_info: default_info,
736            mcp_id,
737            tool_filter,
738            backend_version: Arc::new(AtomicU64::new(0)), // 断开状态版本为 0
739        }
740    }
741
742    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
743        Self::with_mcp_id(client, "unknown".to_string())
744    }
745
746    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
747        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
748    }
749
750    /// 创建带工具过滤器的 ProxyHandler(带初始后端连接)
751    pub fn with_tool_filter(
752        client: RunningService<RoleClient, ClientInfo>,
753        mcp_id: String,
754        tool_filter: ToolFilter,
755    ) -> Self {
756        use std::ops::Deref;
757
758        // 提取 ServerInfo
759        let cached_info = Self::extract_server_info(&client, &mcp_id);
760
761        // 克隆 Peer 用于并发请求(无需锁)
762        let peer = client.deref().clone();
763
764        // 记录过滤器配置
765        if tool_filter.is_enabled() {
766            if let Some(ref allow_list) = tool_filter.allow_tools {
767                info!(
768                    "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
769                    mcp_id, allow_list
770                );
771            }
772            if let Some(ref deny_list) = tool_filter.deny_tools {
773                info!(
774                    "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
775                    mcp_id, deny_list
776                );
777            }
778        }
779
780        // 创建 PeerInner
781        let inner = PeerInner {
782            peer,
783            _running: Arc::new(client),
784        };
785
786        Self {
787            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
788            cached_info,
789            mcp_id,
790            tool_filter,
791            backend_version: Arc::new(AtomicU64::new(1)), // 初始版本为 1
792        }
793    }
794
795    /// 原子性替换后端连接
796    /// - Some(client): 设置新的后端连接
797    /// - None: 标记后端断开
798    ///
799    /// **版本控制**:每次调用都会递增 backend_version,使旧 session 失效
800    pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
801        use std::ops::Deref;
802
803        match new_client {
804            Some(client) => {
805                let peer = client.deref().clone();
806                let inner = PeerInner {
807                    peer,
808                    _running: Arc::new(client),
809                };
810                self.peer.store(Some(Arc::new(inner)));
811                info!("[ProxyHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
812            }
813            None => {
814                self.peer.store(None);
815                info!("[ProxyHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
816            }
817        }
818
819        // 关键:递增版本号,使所有旧 session 失效
820        let new_version = self.backend_version.fetch_add(1, Ordering::SeqCst) + 1;
821        info!(
822            "[ProxyHandler] 后端版本更新: {} - MCP ID: {}",
823            new_version, self.mcp_id
824        );
825    }
826
827    /// 检查后端是否可用(快速检查,不发送请求)
828    pub fn is_backend_available(&self) -> bool {
829        let inner_guard = self.peer.load();
830        match inner_guard.as_ref() {
831            Some(inner) => !inner.peer.is_transport_closed(),
832            None => false,
833        }
834    }
835
836    /// 检查 mcp 服务是否正常(异步版本,会发送验证请求)
837    pub async fn is_mcp_server_ready(&self) -> bool {
838        !self.is_terminated_async().await
839    }
840
841    /// 检查后端连接是否已关闭(同步版本,仅检查 transport 状态)
842    pub fn is_terminated(&self) -> bool {
843        !self.is_backend_available()
844    }
845
846    /// 异步检查后端连接是否已断开(会发送验证请求)
847    pub async fn is_terminated_async(&self) -> bool {
848        // 原子加载后端连接
849        let inner_guard = self.peer.load();
850        let inner = match inner_guard.as_ref() {
851            Some(inner) => inner,
852            None => return true,
853        };
854
855        // 快速检查 transport 状态
856        if inner.peer.is_transport_closed() {
857            return true;
858        }
859
860        // 通过发送轻量级请求来验证连接
861        match inner.peer.list_tools(None).await {
862            Ok(_) => {
863                debug!("后端连接状态检查: 正常");
864                false
865            }
866            Err(e) => {
867                info!("后端连接状态检查: 已断开,原因: {e}");
868                true
869            }
870        }
871    }
872
873    /// 获取 MCP ID
874    pub fn mcp_id(&self) -> &str {
875        &self.mcp_id
876    }
877
878    /// 获取当前后端版本号
879    ///
880    /// 版本号用于跟踪后端连接变化:
881    /// - 0: 断开状态
882    /// - 1+: 已连接,每次 swap_backend 递增
883    ///
884    /// **用途**:配合 ProxyAwareSessionManager 实现 session 版本控制
885    pub fn get_backend_version(&self) -> u64 {
886        self.backend_version.load(Ordering::SeqCst)
887    }
888}