mcp_streamable_proxy/
proxy_handler.rs

1use arc_swap::ArcSwapOption;
2pub use mcp_common::ToolFilter;
3/**
4 * Create a local SSE server that proxies requests to a stdio MCP server.
5 */
6use rmcp::{
7    ErrorData, RoleClient, RoleServer, ServerHandler,
8    model::{
9        CallToolRequestParam, CallToolResult, ClientInfo, Content, Implementation, ListToolsResult,
10        PaginatedRequestParam, ServerInfo,
11    },
12    service::{NotificationContext, Peer, RequestContext, RunningService},
13};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Instant;
17use tracing::{debug, error, info, warn};
18
/// Global request counter used to generate unique request IDs for tracing.
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
21
/// Wraps the backend connection together with its running service so the pair
/// can be hot-swapped atomically via `ArcSwap`.
#[derive(Debug)]
struct PeerInner {
    /// Peer used to send requests to the backend.
    peer: Peer<RoleClient>,
    /// Keeps ownership of the `RunningService` so the backend service stays
    /// alive for as long as this connection is installed.
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}
32
/// A proxy handler that forwards requests to a client based on the server's capabilities
///
/// Uses `ArcSwap` for lock-free hot replacement of the backend; while the
/// backend is disconnected, requests fail fast instead of hanging.
///
/// **Extras**:
/// - Backend versioning: every `swap_backend` bumps the version counter
/// - Session version tracking: designed to pair with `ProxyAwareSessionManager`
#[derive(Clone, Debug)]
pub struct ProxyHandler {
    /// Backend connection (`ArcSwapOption` allows lock-free atomic replacement).
    /// `None` means the backend is disconnected / reconnecting.
    peer: Arc<ArcSwapOption<PeerInner>>,
    /// Cached server info (kept stable; should be consistent after reconnects).
    cached_info: ServerInfo,
    /// MCP ID used for logging.
    mcp_id: String,
    /// Tool filtering configuration.
    tool_filter: ToolFilter,
    /// Backend version number (incremented on every `swap_backend`).
    /// Used to track backend connection changes so stale sessions can be invalidated.
    backend_version: Arc<AtomicU64>,
}
54
55impl ServerHandler for ProxyHandler {
56    fn get_info(&self) -> ServerInfo {
57        self.cached_info.clone()
58    }
59
60    #[tracing::instrument(skip(self, request, context), fields(
61        mcp_id = %self.mcp_id,
62        request = ?request,
63    ))]
64    async fn list_tools(
65        &self,
66        request: Option<PaginatedRequestParam>,
67        context: RequestContext<RoleServer>,
68    ) -> Result<ListToolsResult, ErrorData> {
69        // 原子加载后端连接
70        let inner_guard = self.peer.load();
71        let inner = inner_guard.as_ref().ok_or_else(|| {
72            error!("Backend connection is not available (reconnecting)");
73            ErrorData::internal_error(
74                "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
75                None,
76            )
77        })?;
78
79        // 检查后端连接是否已关闭
80        if inner.peer.is_transport_closed() {
81            error!("Backend transport is closed");
82            return Err(ErrorData::internal_error(
83                "Backend connection closed, please retry".to_string(),
84                None,
85            ));
86        }
87
88        // Check if the server has tools capability and forward the request
89        match self.capabilities().tools {
90            Some(_) => {
91                // 使用 tokio::select! 同时等待取消和结果
92                tokio::select! {
93                    result = inner.peer.list_tools(request) => {
94                        match result {
95                            Ok(result) => {
96                                // 根据过滤配置过滤工具列表
97                                let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
98                                    result
99                                        .tools
100                                        .into_iter()
101                                        .filter(|tool| self.tool_filter.is_allowed(&tool.name))
102                                        .collect()
103                                } else {
104                                    result.tools
105                                };
106
107                                // 记录工具列表结果,这些结果会通过 SSE 推送给客户端
108                                info!(
109                                    "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
110                                    self.mcp_id,
111                                    filtered_tools.len(),
112                                    if self.tool_filter.is_enabled() {
113                                        " (已过滤)"
114                                    } else {
115                                        ""
116                                    }
117                                );
118
119                                debug!(
120                                    "Proxying list_tools response with {} tools",
121                                    filtered_tools.len()
122                                );
123                                Ok(ListToolsResult {
124                                    tools: filtered_tools,
125                                    next_cursor: result.next_cursor,
126                                    meta: result.meta, // rmcp 0.12 新增字段
127                                })
128                            }
129                            Err(err) => {
130                                error!("Error listing tools: {:?}", err);
131                                Err(ErrorData::internal_error(
132                                    format!("Error listing tools: {err}"),
133                                    None,
134                                ))
135                            }
136                        }
137                    }
138                    _ = context.ct.cancelled() => {
139                        info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
140                        Err(ErrorData::internal_error(
141                            "Request cancelled".to_string(),
142                            None,
143                        ))
144                    }
145                }
146            }
147            None => {
148                // Server doesn't support tools, return empty list
149                warn!("Server doesn't support tools capability");
150                Ok(ListToolsResult::default())
151            }
152        }
153    }
154
    #[tracing::instrument(skip(self, request, context), fields(
        mcp_id = %self.mcp_id,
        tool_name = %request.name,
        tool_arguments = ?request.arguments,
    ))]
    async fn call_tool(
        &self,
        request: CallToolRequestParam,
        context: RequestContext<RoleServer>,
    ) -> Result<CallToolResult, ErrorData> {
        // Allocate a unique request ID so this call can be traced end-to-end
        // across log lines. Relaxed ordering is fine: only uniqueness matters.
        let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        let start = Instant::now();

        info!(
            "[call_tool:{}] 开始 - 工具: {}, MCP ID: {}",
            request_id, request.name, self.mcp_id
        );

        // First reject tools excluded by the filter configuration. Note this
        // returns an error *result* (Ok with is_error), not a protocol error.
        if !self.tool_filter.is_allowed(&request.name) {
            info!(
                "[call_tool:{}] 工具被过滤 - MCP ID: {}, 工具: {}",
                request_id, self.mcp_id, request.name
            );
            return Ok(CallToolResult::error(vec![Content::text(format!(
                "Tool '{}' is not allowed by filter configuration",
                request.name
            ))]));
        }

        // Atomically load the backend connection; `None` means reconnecting.
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => {
                let transport_closed = inner.peer.is_transport_closed();
                info!(
                    "[call_tool:{}] 后端连接存在 - transport_closed: {}",
                    request_id, transport_closed
                );
                inner
            }
            None => {
                error!(
                    "[call_tool:{}] 后端连接不可用 (正在重连) - MCP ID: {}",
                    request_id, self.mcp_id
                );
                return Ok(CallToolResult::error(vec![Content::text(
                    "Backend connection is not available, reconnecting...",
                )]));
            }
        };

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!(
                "[call_tool:{}] 后端 transport 已关闭 - MCP ID: {}",
                request_id, self.mcp_id
            );
            return Ok(CallToolResult::error(vec![Content::text(
                "Backend connection closed, please retry",
            )]));
        }

        // Check if the server has tools capability and forward the request
        let result = match self.capabilities().tools {
            Some(_) => {
                // Log the moment the request is handed to the backend.
                info!(
                    "[call_tool:{}] 发送请求到后端... - 工具: {}, 已耗时: {}ms",
                    request_id,
                    request.name,
                    start.elapsed().as_millis()
                );

                // Create the backend-call future and pin it so it can be
                // polled repeatedly from inside the select! loop below.
                let call_future = inner.peer.call_tool(request.clone());
                tokio::pin!(call_future);

                // Heartbeat interval (30s) used purely for progress logging
                // while a long-running tool call is in flight.
                const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
                let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
                // Skip the first tick, which fires immediately on creation.
                heartbeat_interval.tick().await;

                // loop + select!: wait for the result while emitting heartbeats.
                let call_result = loop {
                    tokio::select! {
                        // biased: poll in declaration order so the heartbeat
                        // branch never preempts an already-ready result.
                        biased;

                        result = &mut call_future => {
                            break result;
                        }
                        _ = context.ct.cancelled() => {
                            let elapsed = start.elapsed();
                            warn!(
                                "[call_tool:{}] 请求被取消 - 工具: {}, 耗时: {}ms, MCP ID: {}",
                                request_id, request.name, elapsed.as_millis(), self.mcp_id
                            );
                            return Ok(CallToolResult::error(vec![Content::text(
                                "Request cancelled"
                            )]));
                        }
                        _ = heartbeat_interval.tick() => {
                            // Periodic wait log: proves the proxy is still
                            // alive and waiting on the backend response.
                            let elapsed = start.elapsed();
                            let transport_closed = inner.peer.is_transport_closed();
                            info!(
                                "[call_tool:{}] 等待后端响应中... - 工具: {}, 已等待: {}ms, \
                                transport_closed: {}, MCP ID: {}",
                                request_id, request.name, elapsed.as_millis(),
                                transport_closed, self.mcp_id
                            );
                        }
                    }
                };

                let elapsed = start.elapsed();
                match &call_result {
                    Ok(call_result) => {
                        // Log the tool-call outcome, including the backend's
                        // own is_error flag.
                        let is_error = call_result.is_error.unwrap_or(false);
                        info!(
                            "[call_tool:{}] 收到响应 - 工具: {}, 耗时: {}ms, is_error: {}, MCP ID: {}",
                            request_id,
                            request.name,
                            elapsed.as_millis(),
                            is_error,
                            self.mcp_id
                        );
                        if is_error {
                            debug!(
                                "[call_tool:{}] 错误响应内容: {:?}",
                                request_id, call_result.content
                            );
                        }
                        Ok(call_result.clone())
                    }
                    Err(err) => {
                        error!(
                            "[call_tool:{}] 后端返回错误 - 工具: {}, 耗时: {}ms, 错误: {:?}, MCP ID: {}",
                            request_id,
                            request.name,
                            elapsed.as_millis(),
                            err,
                            self.mcp_id
                        );
                        // Return an error result instead of propagating the error
                        Ok(CallToolResult::error(vec![Content::text(format!(
                            "Error: {err}"
                        ))]))
                    }
                }
            }
            None => {
                error!(
                    "[call_tool:{}] 服务器不支持 tools capability - MCP ID: {}",
                    request_id, self.mcp_id
                );
                Ok(CallToolResult::error(vec![Content::text(
                    "Server doesn't support tools capability",
                )]))
            }
        };

        let total_elapsed = start.elapsed();
        info!(
            "[call_tool:{}] 完成 - 工具: {}, 总耗时: {}ms",
            request_id,
            request.name,
            total_elapsed.as_millis()
        );
        result
    }
330
331    async fn list_resources(
332        &self,
333        request: Option<PaginatedRequestParam>,
334        context: RequestContext<RoleServer>,
335    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
336        // 原子加载后端连接
337        let inner_guard = self.peer.load();
338        let inner = inner_guard.as_ref().ok_or_else(|| {
339            error!("Backend connection is not available (reconnecting)");
340            ErrorData::internal_error(
341                "Backend connection is not available, reconnecting...".to_string(),
342                None,
343            )
344        })?;
345
346        // 检查后端连接是否已关闭
347        if inner.peer.is_transport_closed() {
348            error!("Backend transport is closed");
349            return Err(ErrorData::internal_error(
350                "Backend connection closed, please retry".to_string(),
351                None,
352            ));
353        }
354
355        // Check if the server has resources capability and forward the request
356        match self.capabilities().resources {
357            Some(_) => {
358                tokio::select! {
359                    result = inner.peer.list_resources(request) => {
360                        match result {
361                            Ok(result) => {
362                                // 记录资源列表结果,这些结果会通过 SSE 推送给客户端
363                                info!(
364                                    "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
365                                    self.mcp_id,
366                                    result.resources.len()
367                                );
368
369                                debug!("Proxying list_resources response");
370                                Ok(result)
371                            }
372                            Err(err) => {
373                                error!("Error listing resources: {:?}", err);
374                                Err(ErrorData::internal_error(
375                                    format!("Error listing resources: {err}"),
376                                    None,
377                                ))
378                            }
379                        }
380                    }
381                    _ = context.ct.cancelled() => {
382                        info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
383                        Err(ErrorData::internal_error(
384                            "Request cancelled".to_string(),
385                            None,
386                        ))
387                    }
388                }
389            }
390            None => {
391                // Server doesn't support resources, return empty list
392                warn!("Server doesn't support resources capability");
393                Ok(rmcp::model::ListResourcesResult::default())
394            }
395        }
396    }
397
398    async fn read_resource(
399        &self,
400        request: rmcp::model::ReadResourceRequestParam,
401        context: RequestContext<RoleServer>,
402    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
403        // 原子加载后端连接
404        let inner_guard = self.peer.load();
405        let inner = inner_guard.as_ref().ok_or_else(|| {
406            error!("Backend connection is not available (reconnecting)");
407            ErrorData::internal_error(
408                "Backend connection is not available, reconnecting...".to_string(),
409                None,
410            )
411        })?;
412
413        // 检查后端连接是否已关闭
414        if inner.peer.is_transport_closed() {
415            error!("Backend transport is closed");
416            return Err(ErrorData::internal_error(
417                "Backend connection closed, please retry".to_string(),
418                None,
419            ));
420        }
421
422        // Check if the server has resources capability and forward the request
423        match self.capabilities().resources {
424            Some(_) => {
425                tokio::select! {
426                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParam {
427                        uri: request.uri.clone(),
428                    }) => {
429                        match result {
430                            Ok(result) => {
431                                // 记录资源读取结果,这些结果会通过 SSE 推送给客户端
432                                info!(
433                                    "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
434                                    self.mcp_id, request.uri
435                                );
436
437                                debug!("Proxying read_resource response for {}", request.uri);
438                                Ok(result)
439                            }
440                            Err(err) => {
441                                error!("Error reading resource: {:?}", err);
442                                Err(ErrorData::internal_error(
443                                    format!("Error reading resource: {err}"),
444                                    None,
445                                ))
446                            }
447                        }
448                    }
449                    _ = context.ct.cancelled() => {
450                        info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
451                        Err(ErrorData::internal_error(
452                            "Request cancelled".to_string(),
453                            None,
454                        ))
455                    }
456                }
457            }
458            None => {
459                // Server doesn't support resources, return error
460                error!("Server doesn't support resources capability");
461                Ok(rmcp::model::ReadResourceResult {
462                    contents: Vec::new(),
463                })
464            }
465        }
466    }
467
468    async fn list_resource_templates(
469        &self,
470        request: Option<PaginatedRequestParam>,
471        context: RequestContext<RoleServer>,
472    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
473        // 原子加载后端连接
474        let inner_guard = self.peer.load();
475        let inner = inner_guard.as_ref().ok_or_else(|| {
476            error!("Backend connection is not available (reconnecting)");
477            ErrorData::internal_error(
478                "Backend connection is not available, reconnecting...".to_string(),
479                None,
480            )
481        })?;
482
483        // 检查后端连接是否已关闭
484        if inner.peer.is_transport_closed() {
485            error!("Backend transport is closed");
486            return Err(ErrorData::internal_error(
487                "Backend connection closed, please retry".to_string(),
488                None,
489            ));
490        }
491
492        // Check if the server has resources capability and forward the request
493        match self.capabilities().resources {
494            Some(_) => {
495                tokio::select! {
496                    result = inner.peer.list_resource_templates(request) => {
497                        match result {
498                            Ok(result) => {
499                                debug!("Proxying list_resource_templates response");
500                                Ok(result)
501                            }
502                            Err(err) => {
503                                error!("Error listing resource templates: {:?}", err);
504                                Err(ErrorData::internal_error(
505                                    format!("Error listing resource templates: {err}"),
506                                    None,
507                                ))
508                            }
509                        }
510                    }
511                    _ = context.ct.cancelled() => {
512                        info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
513                        Err(ErrorData::internal_error(
514                            "Request cancelled".to_string(),
515                            None,
516                        ))
517                    }
518                }
519            }
520            None => {
521                // Server doesn't support resources, return empty list
522                warn!("Server doesn't support resources capability");
523                Ok(rmcp::model::ListResourceTemplatesResult::default())
524            }
525        }
526    }
527
528    async fn list_prompts(
529        &self,
530        request: Option<PaginatedRequestParam>,
531        context: RequestContext<RoleServer>,
532    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
533        // 原子加载后端连接
534        let inner_guard = self.peer.load();
535        let inner = inner_guard.as_ref().ok_or_else(|| {
536            error!("Backend connection is not available (reconnecting)");
537            ErrorData::internal_error(
538                "Backend connection is not available, reconnecting...".to_string(),
539                None,
540            )
541        })?;
542
543        // 检查后端连接是否已关闭
544        if inner.peer.is_transport_closed() {
545            error!("Backend transport is closed");
546            return Err(ErrorData::internal_error(
547                "Backend connection closed, please retry".to_string(),
548                None,
549            ));
550        }
551
552        // Check if the server has prompts capability and forward the request
553        match self.capabilities().prompts {
554            Some(_) => {
555                tokio::select! {
556                    result = inner.peer.list_prompts(request) => {
557                        match result {
558                            Ok(result) => {
559                                debug!("Proxying list_prompts response");
560                                Ok(result)
561                            }
562                            Err(err) => {
563                                error!("Error listing prompts: {:?}", err);
564                                Err(ErrorData::internal_error(
565                                    format!("Error listing prompts: {err}"),
566                                    None,
567                                ))
568                            }
569                        }
570                    }
571                    _ = context.ct.cancelled() => {
572                        info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
573                        Err(ErrorData::internal_error(
574                            "Request cancelled".to_string(),
575                            None,
576                        ))
577                    }
578                }
579            }
580            None => {
581                // Server doesn't support prompts, return empty list
582                warn!("Server doesn't support prompts capability");
583                Ok(rmcp::model::ListPromptsResult::default())
584            }
585        }
586    }
587
588    async fn get_prompt(
589        &self,
590        request: rmcp::model::GetPromptRequestParam,
591        context: RequestContext<RoleServer>,
592    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
593        // 原子加载后端连接
594        let inner_guard = self.peer.load();
595        let inner = inner_guard.as_ref().ok_or_else(|| {
596            error!("Backend connection is not available (reconnecting)");
597            ErrorData::internal_error(
598                "Backend connection is not available, reconnecting...".to_string(),
599                None,
600            )
601        })?;
602
603        // 检查后端连接是否已关闭
604        if inner.peer.is_transport_closed() {
605            error!("Backend transport is closed");
606            return Err(ErrorData::internal_error(
607                "Backend connection closed, please retry".to_string(),
608                None,
609            ));
610        }
611
612        // Check if the server has prompts capability and forward the request
613        match self.capabilities().prompts {
614            Some(_) => {
615                tokio::select! {
616                    result = inner.peer.get_prompt(request.clone()) => {
617                        match result {
618                            Ok(result) => {
619                                debug!("Proxying get_prompt response");
620                                Ok(result)
621                            }
622                            Err(err) => {
623                                error!("Error getting prompt: {:?}", err);
624                                Err(ErrorData::internal_error(
625                                    format!("Error getting prompt: {err}"),
626                                    None,
627                                ))
628                            }
629                        }
630                    }
631                    _ = context.ct.cancelled() => {
632                        info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
633                        Err(ErrorData::internal_error(
634                            "Request cancelled".to_string(),
635                            None,
636                        ))
637                    }
638                }
639            }
640            None => {
641                // Server doesn't support prompts, return error
642                warn!("Server doesn't support prompts capability");
643                Ok(rmcp::model::GetPromptResult {
644                    description: None,
645                    messages: Vec::new(),
646                })
647            }
648        }
649    }
650
651    async fn complete(
652        &self,
653        request: rmcp::model::CompleteRequestParam,
654        context: RequestContext<RoleServer>,
655    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
656        // 原子加载后端连接
657        let inner_guard = self.peer.load();
658        let inner = inner_guard.as_ref().ok_or_else(|| {
659            error!("Backend connection is not available (reconnecting)");
660            ErrorData::internal_error(
661                "Backend connection is not available, reconnecting...".to_string(),
662                None,
663            )
664        })?;
665
666        // 检查后端连接是否已关闭
667        if inner.peer.is_transport_closed() {
668            error!("Backend transport is closed");
669            return Err(ErrorData::internal_error(
670                "Backend connection closed, please retry".to_string(),
671                None,
672            ));
673        }
674
675        tokio::select! {
676            result = inner.peer.complete(request) => {
677                match result {
678                    Ok(result) => {
679                        debug!("Proxying complete response");
680                        Ok(result)
681                    }
682                    Err(err) => {
683                        error!("Error completing: {:?}", err);
684                        Err(ErrorData::internal_error(
685                            format!("Error completing: {err}"),
686                            None,
687                        ))
688                    }
689                }
690            }
691            _ = context.ct.cancelled() => {
692                info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
693                Err(ErrorData::internal_error(
694                    "Request cancelled".to_string(),
695                    None,
696                ))
697            }
698        }
699    }
700
701    async fn on_progress(
702        &self,
703        notification: rmcp::model::ProgressNotificationParam,
704        _context: NotificationContext<RoleServer>,
705    ) {
706        // 原子加载后端连接
707        let inner_guard = self.peer.load();
708        let inner = match inner_guard.as_ref() {
709            Some(inner) => inner,
710            None => {
711                error!("Backend connection is not available, cannot forward progress notification");
712                return;
713            }
714        };
715
716        // 检查后端连接是否已关闭
717        if inner.peer.is_transport_closed() {
718            error!("Backend transport is closed, cannot forward progress notification");
719            return;
720        }
721
722        match inner.peer.notify_progress(notification).await {
723            Ok(_) => {
724                debug!("Proxying progress notification");
725            }
726            Err(err) => {
727                error!("Error notifying progress: {:?}", err);
728            }
729        }
730    }
731
732    async fn on_cancelled(
733        &self,
734        notification: rmcp::model::CancelledNotificationParam,
735        _context: NotificationContext<RoleServer>,
736    ) {
737        // 原子加载后端连接
738        let inner_guard = self.peer.load();
739        let inner = match inner_guard.as_ref() {
740            Some(inner) => inner,
741            None => {
742                error!(
743                    "Backend connection is not available, cannot forward cancelled notification"
744                );
745                return;
746            }
747        };
748
749        // 检查后端连接是否已关闭
750        if inner.peer.is_transport_closed() {
751            error!("Backend transport is closed, cannot forward cancelled notification");
752            return;
753        }
754
755        match inner.peer.notify_cancelled(notification).await {
756            Ok(_) => {
757                debug!("Proxying cancelled notification");
758            }
759            Err(err) => {
760                error!("Error notifying cancelled: {:?}", err);
761            }
762        }
763    }
764}
765
766impl ProxyHandler {
    /// Borrow the cached server capabilities, avoiding a clone.
    #[inline]
    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
        &self.cached_info.capabilities
    }
772
773    /// 创建一个默认的 ServerInfo(用于断开状态)
774    fn default_server_info(mcp_id: &str) -> ServerInfo {
775        warn!("[ProxyHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
776        ServerInfo {
777            protocol_version: Default::default(),
778            server_info: Implementation {
779                name: "MCP Proxy".to_string(),
780                version: "0.1.0".to_string(),
781                title: None,
782                website_url: None,
783                icons: None,
784            },
785            instructions: None,
786            capabilities: Default::default(),
787        }
788    }
789
790    /// 从 RunningService 提取 ServerInfo
791    fn extract_server_info(
792        client: &RunningService<RoleClient, ClientInfo>,
793        mcp_id: &str,
794    ) -> ServerInfo {
795        client
796            .peer_info()
797            .map(|peer_info| ServerInfo {
798                protocol_version: peer_info.protocol_version.clone(),
799                server_info: Implementation {
800                    name: peer_info.server_info.name.clone(),
801                    version: peer_info.server_info.version.clone(),
802                    title: None,
803                    website_url: None,
804                    icons: None,
805                },
806                instructions: peer_info.instructions.clone(),
807                capabilities: peer_info.capabilities.clone(),
808            })
809            .unwrap_or_else(|| Self::default_server_info(mcp_id))
810    }
811
812    /// 创建断开状态的 handler(用于初始化)
813    /// 后续通过 swap_backend() 注入实际的后端连接
814    pub fn new_disconnected(
815        mcp_id: String,
816        tool_filter: ToolFilter,
817        default_info: ServerInfo,
818    ) -> Self {
819        info!("[ProxyHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);
820
821        // 记录过滤器配置
822        if tool_filter.is_enabled() {
823            if let Some(ref allow_list) = tool_filter.allow_tools {
824                info!(
825                    "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
826                    mcp_id, allow_list
827                );
828            }
829            if let Some(ref deny_list) = tool_filter.deny_tools {
830                info!(
831                    "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
832                    mcp_id, deny_list
833                );
834            }
835        }
836
837        Self {
838            peer: Arc::new(ArcSwapOption::empty()),
839            cached_info: default_info,
840            mcp_id,
841            tool_filter,
842            backend_version: Arc::new(AtomicU64::new(0)), // 断开状态版本为 0
843        }
844    }
845
846    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
847        Self::with_mcp_id(client, "unknown".to_string())
848    }
849
850    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
851        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
852    }
853
854    /// 创建带工具过滤器的 ProxyHandler(带初始后端连接)
855    pub fn with_tool_filter(
856        client: RunningService<RoleClient, ClientInfo>,
857        mcp_id: String,
858        tool_filter: ToolFilter,
859    ) -> Self {
860        use std::ops::Deref;
861
862        // 提取 ServerInfo
863        let cached_info = Self::extract_server_info(&client, &mcp_id);
864
865        // 克隆 Peer 用于并发请求(无需锁)
866        let peer = client.deref().clone();
867
868        // 记录过滤器配置
869        if tool_filter.is_enabled() {
870            if let Some(ref allow_list) = tool_filter.allow_tools {
871                info!(
872                    "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
873                    mcp_id, allow_list
874                );
875            }
876            if let Some(ref deny_list) = tool_filter.deny_tools {
877                info!(
878                    "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
879                    mcp_id, deny_list
880                );
881            }
882        }
883
884        // 创建 PeerInner
885        let inner = PeerInner {
886            peer,
887            _running: Arc::new(client),
888        };
889
890        Self {
891            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
892            cached_info,
893            mcp_id,
894            tool_filter,
895            backend_version: Arc::new(AtomicU64::new(1)), // 初始版本为 1
896        }
897    }
898
899    /// 原子性替换后端连接
900    /// - Some(client): 设置新的后端连接
901    /// - None: 标记后端断开
902    ///
903    /// **版本控制**:每次调用都会递增 backend_version,使旧 session 失效
904    pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
905        use std::ops::Deref;
906
907        match new_client {
908            Some(client) => {
909                let peer = client.deref().clone();
910                let inner = PeerInner {
911                    peer,
912                    _running: Arc::new(client),
913                };
914                self.peer.store(Some(Arc::new(inner)));
915                info!("[ProxyHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
916            }
917            None => {
918                self.peer.store(None);
919                info!("[ProxyHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
920            }
921        }
922
923        // 关键:递增版本号,使所有旧 session 失效
924        let new_version = self.backend_version.fetch_add(1, Ordering::SeqCst) + 1;
925        info!(
926            "[ProxyHandler] 后端版本更新: {} - MCP ID: {}",
927            new_version, self.mcp_id
928        );
929    }
930
931    /// 检查后端是否可用(快速检查,不发送请求)
932    pub fn is_backend_available(&self) -> bool {
933        let inner_guard = self.peer.load();
934        match inner_guard.as_ref() {
935            Some(inner) => !inner.peer.is_transport_closed(),
936            None => false,
937        }
938    }
939
940    /// 检查 mcp 服务是否正常(异步版本,会发送验证请求)
941    pub async fn is_mcp_server_ready(&self) -> bool {
942        !self.is_terminated_async().await
943    }
944
945    /// 检查后端连接是否已关闭(同步版本,仅检查 transport 状态)
946    pub fn is_terminated(&self) -> bool {
947        !self.is_backend_available()
948    }
949
950    /// 异步检查后端连接是否已断开(会发送验证请求)
951    pub async fn is_terminated_async(&self) -> bool {
952        // 原子加载后端连接
953        let inner_guard = self.peer.load();
954        let inner = match inner_guard.as_ref() {
955            Some(inner) => inner,
956            None => return true,
957        };
958
959        // 快速检查 transport 状态
960        if inner.peer.is_transport_closed() {
961            return true;
962        }
963
964        // 通过发送轻量级请求来验证连接
965        match inner.peer.list_tools(None).await {
966            Ok(_) => {
967                debug!("后端连接状态检查: 正常");
968                false
969            }
970            Err(e) => {
971                info!("后端连接状态检查: 已断开,原因: {e}");
972                true
973            }
974        }
975    }
976
977    /// 获取 MCP ID
978    pub fn mcp_id(&self) -> &str {
979        &self.mcp_id
980    }
981
982    /// 获取当前后端版本号
983    ///
984    /// 版本号用于跟踪后端连接变化:
985    /// - 0: 断开状态
986    /// - 1+: 已连接,每次 swap_backend 递增
987    ///
988    /// **用途**:配合 ProxyAwareSessionManager 实现 session 版本控制
989    pub fn get_backend_version(&self) -> u64 {
990        self.backend_version.load(Ordering::SeqCst)
991    }
992
993    /// Update backend from a StreamClientConnection
994    ///
995    /// This method allows updating the backend connection using the high-level
996    /// `StreamClientConnection` type, which is more convenient than the raw
997    /// `RunningService` type.
998    ///
999    /// # Arguments
1000    /// * `conn` - Some(connection) to set new backend, None to mark disconnected
1001    pub fn swap_backend_from_connection(
1002        &self,
1003        conn: Option<crate::client::StreamClientConnection>,
1004    ) {
1005        match conn {
1006            Some(c) => {
1007                let running = c.into_running_service();
1008                self.swap_backend(Some(running));
1009            }
1010            None => {
1011                self.swap_backend(None);
1012            }
1013        }
1014    }
1015}