Skip to main content

mcp_streamable_proxy/
proxy_handler.rs

1use arc_swap::ArcSwapOption;
2pub use mcp_common::ToolFilter;
3/**
4 * Create a local SSE server that proxies requests to a stdio MCP server.
5 */
6use rmcp::{
7    ErrorData, RoleClient, RoleServer, ServerHandler,
8    model::{
9        CallToolRequestParams, CallToolResult, ClientInfo, Content, Implementation,
10        ListToolsResult, PaginatedRequestParams, ServerInfo,
11    },
12    service::{NotificationContext, Peer, RequestContext, RunningService},
13};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Instant;
17use tracing::{debug, error, info, warn};
18
19/// 全局请求计数器,用于生成唯一的请求 ID
20static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
21
22/// 包装后端连接和运行服务
23/// 用于 ArcSwap 热替换
24#[derive(Debug)]
25struct PeerInner {
26    /// Peer 用于发送请求
27    peer: Peer<RoleClient>,
28    /// 保持 RunningService 的所有权,确保服务生命周期
29    #[allow(dead_code)]
30    _running: Arc<RunningService<RoleClient, ClientInfo>>,
31}
32
33/// A proxy handler that forwards requests to a client based on the server's capabilities
34/// 使用 ArcSwap 实现后端热替换,支持断开时立即返回错误
35///
36/// **增强功能**:
37/// - 后端版本控制:每次 swap_backend 都会递增版本号
38/// - 支持 Session 版本跟踪:配合 ProxyAwareSessionManager 使用
39#[derive(Clone, Debug)]
40pub struct ProxyHandler {
41    /// 后端连接(ArcSwap 支持无锁原子替换)
42    /// None 表示后端断开/重连中
43    peer: Arc<ArcSwapOption<PeerInner>>,
44    /// 缓存的服务器信息(保持不变,重连后应一致)
45    cached_info: ServerInfo,
46    /// MCP ID 用于日志记录
47    mcp_id: String,
48    /// 工具过滤配置
49    tool_filter: ToolFilter,
50    /// 后端版本号(每次 swap_backend 递增)
51    /// 用于跟踪后端连接变化,使旧 session 失效
52    backend_version: Arc<AtomicU64>,
53}
54
55impl ServerHandler for ProxyHandler {
56    fn get_info(&self) -> ServerInfo {
57        self.cached_info.clone()
58    }
59
60    #[tracing::instrument(skip(self, request, context), fields(
61        mcp_id = %self.mcp_id,
62        request = ?request,
63    ))]
64    async fn list_tools(
65        &self,
66        request: Option<PaginatedRequestParams>,
67        context: RequestContext<RoleServer>,
68    ) -> Result<ListToolsResult, ErrorData> {
69        // 原子加载后端连接
70        let inner_guard = self.peer.load();
71        let inner = inner_guard.as_ref().ok_or_else(|| {
72            error!("Backend connection is not available (reconnecting)");
73            ErrorData::internal_error(
74                "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
75                None,
76            )
77        })?;
78
79        // 检查后端连接是否已关闭
80        if inner.peer.is_transport_closed() {
81            error!("Backend transport is closed");
82            return Err(ErrorData::internal_error(
83                "Backend connection closed, please retry".to_string(),
84                None,
85            ));
86        }
87
88        // Check if the server has tools capability and forward the request
89        match self.capabilities().tools {
90            Some(_) => {
91                // 使用 tokio::select! 同时等待取消和结果
92                tokio::select! {
93                    result = inner.peer.list_tools(request) => {
94                        match result {
95                            Ok(result) => {
96                                // 根据过滤配置过滤工具列表
97                                let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
98                                    result
99                                        .tools
100                                        .into_iter()
101                                        .filter(|tool| self.tool_filter.is_allowed(&tool.name))
102                                        .collect()
103                                } else {
104                                    result.tools
105                                };
106
107                                // 记录工具列表结果,这些结果会通过 SSE 推送给客户端
108                                info!(
109                                    "[list_tools] 工具列表结果 - MCP ID: {}, 工具数量: {}{}",
110                                    self.mcp_id,
111                                    filtered_tools.len(),
112                                    if self.tool_filter.is_enabled() {
113                                        " (已过滤)"
114                                    } else {
115                                        ""
116                                    }
117                                );
118
119                                debug!(
120                                    "Proxying list_tools response with {} tools",
121                                    filtered_tools.len()
122                                );
123                                Ok(ListToolsResult {
124                                    tools: filtered_tools,
125                                    next_cursor: result.next_cursor,
126                                    meta: result.meta, // rmcp 0.12 新增字段
127                                })
128                            }
129                            Err(err) => {
130                                error!("Error listing tools: {:?}", err);
131                                Err(ErrorData::internal_error(
132                                    format!("Error listing tools: {err}"),
133                                    None,
134                                ))
135                            }
136                        }
137                    }
138                    _ = context.ct.cancelled() => {
139                        info!("[list_tools] 请求被取消 - MCP ID: {}", self.mcp_id);
140                        Err(ErrorData::internal_error(
141                            "Request cancelled".to_string(),
142                            None,
143                        ))
144                    }
145                }
146            }
147            None => {
148                // Server doesn't support tools, return empty list
149                warn!("Server doesn't support tools capability");
150                Ok(ListToolsResult::default())
151            }
152        }
153    }
154
155    #[tracing::instrument(skip(self, request, context), fields(
156        mcp_id = %self.mcp_id,
157        tool_name = %request.name,
158        tool_arguments = ?request.arguments,
159    ))]
160    async fn call_tool(
161        &self,
162        request: CallToolRequestParams,
163        context: RequestContext<RoleServer>,
164    ) -> Result<CallToolResult, ErrorData> {
165        // 生成唯一请求 ID 用于追踪
166        let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
167        let start = Instant::now();
168
169        info!(
170            "[call_tool:{}] 开始 - 工具: {}, MCP ID: {}",
171            request_id, request.name, self.mcp_id
172        );
173
174        // 首先检查工具是否被过滤
175        if !self.tool_filter.is_allowed(&request.name) {
176            info!(
177                "[call_tool:{}] 工具被过滤 - MCP ID: {}, 工具: {}",
178                request_id, self.mcp_id, request.name
179            );
180            return Ok(CallToolResult::error(vec![Content::text(format!(
181                "Tool '{}' is not allowed by filter configuration",
182                request.name
183            ))]));
184        }
185
186        // 原子加载后端连接
187        let inner_guard = self.peer.load();
188        let inner = match inner_guard.as_ref() {
189            Some(inner) => {
190                let transport_closed = inner.peer.is_transport_closed();
191                info!(
192                    "[call_tool:{}] 后端连接存在 - transport_closed: {}",
193                    request_id, transport_closed
194                );
195                inner
196            }
197            None => {
198                error!(
199                    "[call_tool:{}] 后端连接不可用 (正在重连) - MCP ID: {}",
200                    request_id, self.mcp_id
201                );
202                return Ok(CallToolResult::error(vec![Content::text(
203                    "Backend connection is not available, reconnecting...",
204                )]));
205            }
206        };
207
208        // 检查后端连接是否已关闭
209        if inner.peer.is_transport_closed() {
210            error!(
211                "[call_tool:{}] 后端 transport 已关闭 - MCP ID: {}",
212                request_id, self.mcp_id
213            );
214            return Ok(CallToolResult::error(vec![Content::text(
215                "Backend connection closed, please retry",
216            )]));
217        }
218
219        // Check if the server has tools capability and forward the request
220        let result = match self.capabilities().tools {
221            Some(_) => {
222                // 记录发送请求到后端的时间点
223                info!(
224                    "[call_tool:{}] 发送请求到后端... - 工具: {}, 已耗时: {}ms",
225                    request_id,
226                    request.name,
227                    start.elapsed().as_millis()
228                );
229
230                // 创建后端调用的 Future,使用 pin 固定
231                let call_future = inner.peer.call_tool(request.clone());
232                tokio::pin!(call_future);
233
234                // 等待心跳间隔(30秒)
235                const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
236                let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
237                // 跳过第一次立即触发
238                heartbeat_interval.tick().await;
239
240                // 使用循环 + select! 实现等待心跳
241                let call_result = loop {
242                    tokio::select! {
243                        // biased 确保按顺序优先检查,避免心跳检查抢占实际结果
244                        biased;
245
246                        result = &mut call_future => {
247                            break result;
248                        }
249                        _ = context.ct.cancelled() => {
250                            let elapsed = start.elapsed();
251                            warn!(
252                                "[call_tool:{}] 请求被取消 - 工具: {}, 耗时: {}ms, MCP ID: {}",
253                                request_id, request.name, elapsed.as_millis(), self.mcp_id
254                            );
255                            return Ok(CallToolResult::error(vec![Content::text(
256                                "Request cancelled"
257                            )]));
258                        }
259                        _ = heartbeat_interval.tick() => {
260                            // 定期打印等待日志,证明 mcp-proxy 在等待后端响应
261                            let elapsed = start.elapsed();
262                            let transport_closed = inner.peer.is_transport_closed();
263                            info!(
264                                "[call_tool:{}] 等待后端响应中... - 工具: {}, 已等待: {}ms, \
265                                transport_closed: {}, MCP ID: {}",
266                                request_id, request.name, elapsed.as_millis(),
267                                transport_closed, self.mcp_id
268                            );
269                        }
270                    }
271                };
272
273                let elapsed = start.elapsed();
274                match &call_result {
275                    Ok(call_result) => {
276                        // 记录工具调用结果
277                        let is_error = call_result.is_error.unwrap_or(false);
278                        info!(
279                            "[call_tool:{}] 收到响应 - 工具: {}, 耗时: {}ms, is_error: {}, MCP ID: {}",
280                            request_id,
281                            request.name,
282                            elapsed.as_millis(),
283                            is_error,
284                            self.mcp_id
285                        );
286                        if is_error {
287                            debug!(
288                                "[call_tool:{}] 错误响应内容: {:?}",
289                                request_id, call_result.content
290                            );
291                        }
292                        Ok(call_result.clone())
293                    }
294                    Err(err) => {
295                        error!(
296                            "[call_tool:{}] 后端返回错误 - 工具: {}, 耗时: {}ms, 错误: {:?}, MCP ID: {}",
297                            request_id,
298                            request.name,
299                            elapsed.as_millis(),
300                            err,
301                            self.mcp_id
302                        );
303                        // Return an error result instead of propagating the error
304                        Ok(CallToolResult::error(vec![Content::text(format!(
305                            "Error: {err}"
306                        ))]))
307                    }
308                }
309            }
310            None => {
311                error!(
312                    "[call_tool:{}] 服务器不支持 tools capability - MCP ID: {}",
313                    request_id, self.mcp_id
314                );
315                Ok(CallToolResult::error(vec![Content::text(
316                    "Server doesn't support tools capability",
317                )]))
318            }
319        };
320
321        let total_elapsed = start.elapsed();
322        info!(
323            "[call_tool:{}] 完成 - 工具: {}, 总耗时: {}ms",
324            request_id,
325            request.name,
326            total_elapsed.as_millis()
327        );
328        result
329    }
330
331    async fn list_resources(
332        &self,
333        request: Option<PaginatedRequestParams>,
334        context: RequestContext<RoleServer>,
335    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
336        // 原子加载后端连接
337        let inner_guard = self.peer.load();
338        let inner = inner_guard.as_ref().ok_or_else(|| {
339            error!("Backend connection is not available (reconnecting)");
340            ErrorData::internal_error(
341                "Backend connection is not available, reconnecting...".to_string(),
342                None,
343            )
344        })?;
345
346        // 检查后端连接是否已关闭
347        if inner.peer.is_transport_closed() {
348            error!("Backend transport is closed");
349            return Err(ErrorData::internal_error(
350                "Backend connection closed, please retry".to_string(),
351                None,
352            ));
353        }
354
355        // Check if the server has resources capability and forward the request
356        match self.capabilities().resources {
357            Some(_) => {
358                tokio::select! {
359                    result = inner.peer.list_resources(request) => {
360                        match result {
361                            Ok(result) => {
362                                // 记录资源列表结果,这些结果会通过 SSE 推送给客户端
363                                info!(
364                                    "[list_resources] 资源列表结果 - MCP ID: {}, 资源数量: {}",
365                                    self.mcp_id,
366                                    result.resources.len()
367                                );
368
369                                debug!("Proxying list_resources response");
370                                Ok(result)
371                            }
372                            Err(err) => {
373                                error!("Error listing resources: {:?}", err);
374                                Err(ErrorData::internal_error(
375                                    format!("Error listing resources: {err}"),
376                                    None,
377                                ))
378                            }
379                        }
380                    }
381                    _ = context.ct.cancelled() => {
382                        info!("[list_resources] 请求被取消 - MCP ID: {}", self.mcp_id);
383                        Err(ErrorData::internal_error(
384                            "Request cancelled".to_string(),
385                            None,
386                        ))
387                    }
388                }
389            }
390            None => {
391                // Server doesn't support resources, return empty list
392                warn!("Server doesn't support resources capability");
393                Ok(rmcp::model::ListResourcesResult::default())
394            }
395        }
396    }
397
398    async fn read_resource(
399        &self,
400        request: rmcp::model::ReadResourceRequestParams,
401        context: RequestContext<RoleServer>,
402    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
403        // 原子加载后端连接
404        let inner_guard = self.peer.load();
405        let inner = inner_guard.as_ref().ok_or_else(|| {
406            error!("Backend connection is not available (reconnecting)");
407            ErrorData::internal_error(
408                "Backend connection is not available, reconnecting...".to_string(),
409                None,
410            )
411        })?;
412
413        // 检查后端连接是否已关闭
414        if inner.peer.is_transport_closed() {
415            error!("Backend transport is closed");
416            return Err(ErrorData::internal_error(
417                "Backend connection closed, please retry".to_string(),
418                None,
419            ));
420        }
421
422        // Check if the server has resources capability and forward the request
423        match self.capabilities().resources {
424            Some(_) => {
425                tokio::select! {
426                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParams::new(request.uri.clone())) => {
427                        match result {
428                            Ok(result) => {
429                                // 记录资源读取结果,这些结果会通过 SSE 推送给客户端
430                                info!(
431                                    "[read_resource] 资源读取结果 - MCP ID: {}, URI: {}",
432                                    self.mcp_id, request.uri
433                                );
434
435                                debug!("Proxying read_resource response for {}", request.uri);
436                                Ok(result)
437                            }
438                            Err(err) => {
439                                error!("Error reading resource: {:?}", err);
440                                Err(ErrorData::internal_error(
441                                    format!("Error reading resource: {err}"),
442                                    None,
443                                ))
444                            }
445                        }
446                    }
447                    _ = context.ct.cancelled() => {
448                        info!("[read_resource] 请求被取消 - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
449                        Err(ErrorData::internal_error(
450                            "Request cancelled".to_string(),
451                            None,
452                        ))
453                    }
454                }
455            }
456            None => {
457                // Server doesn't support resources, return error
458                error!("Server doesn't support resources capability");
459                Ok(rmcp::model::ReadResourceResult::new(vec![]))
460            }
461        }
462    }
463
464    async fn list_resource_templates(
465        &self,
466        request: Option<PaginatedRequestParams>,
467        context: RequestContext<RoleServer>,
468    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
469        // 原子加载后端连接
470        let inner_guard = self.peer.load();
471        let inner = inner_guard.as_ref().ok_or_else(|| {
472            error!("Backend connection is not available (reconnecting)");
473            ErrorData::internal_error(
474                "Backend connection is not available, reconnecting...".to_string(),
475                None,
476            )
477        })?;
478
479        // 检查后端连接是否已关闭
480        if inner.peer.is_transport_closed() {
481            error!("Backend transport is closed");
482            return Err(ErrorData::internal_error(
483                "Backend connection closed, please retry".to_string(),
484                None,
485            ));
486        }
487
488        // Check if the server has resources capability and forward the request
489        match self.capabilities().resources {
490            Some(_) => {
491                tokio::select! {
492                    result = inner.peer.list_resource_templates(request) => {
493                        match result {
494                            Ok(result) => {
495                                debug!("Proxying list_resource_templates response");
496                                Ok(result)
497                            }
498                            Err(err) => {
499                                error!("Error listing resource templates: {:?}", err);
500                                Err(ErrorData::internal_error(
501                                    format!("Error listing resource templates: {err}"),
502                                    None,
503                                ))
504                            }
505                        }
506                    }
507                    _ = context.ct.cancelled() => {
508                        info!("[list_resource_templates] 请求被取消 - MCP ID: {}", self.mcp_id);
509                        Err(ErrorData::internal_error(
510                            "Request cancelled".to_string(),
511                            None,
512                        ))
513                    }
514                }
515            }
516            None => {
517                // Server doesn't support resources, return empty list
518                warn!("Server doesn't support resources capability");
519                Ok(rmcp::model::ListResourceTemplatesResult::default())
520            }
521        }
522    }
523
524    async fn list_prompts(
525        &self,
526        request: Option<PaginatedRequestParams>,
527        context: RequestContext<RoleServer>,
528    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
529        // 原子加载后端连接
530        let inner_guard = self.peer.load();
531        let inner = inner_guard.as_ref().ok_or_else(|| {
532            error!("Backend connection is not available (reconnecting)");
533            ErrorData::internal_error(
534                "Backend connection is not available, reconnecting...".to_string(),
535                None,
536            )
537        })?;
538
539        // 检查后端连接是否已关闭
540        if inner.peer.is_transport_closed() {
541            error!("Backend transport is closed");
542            return Err(ErrorData::internal_error(
543                "Backend connection closed, please retry".to_string(),
544                None,
545            ));
546        }
547
548        // Check if the server has prompts capability and forward the request
549        match self.capabilities().prompts {
550            Some(_) => {
551                tokio::select! {
552                    result = inner.peer.list_prompts(request) => {
553                        match result {
554                            Ok(result) => {
555                                debug!("Proxying list_prompts response");
556                                Ok(result)
557                            }
558                            Err(err) => {
559                                error!("Error listing prompts: {:?}", err);
560                                Err(ErrorData::internal_error(
561                                    format!("Error listing prompts: {err}"),
562                                    None,
563                                ))
564                            }
565                        }
566                    }
567                    _ = context.ct.cancelled() => {
568                        info!("[list_prompts] 请求被取消 - MCP ID: {}", self.mcp_id);
569                        Err(ErrorData::internal_error(
570                            "Request cancelled".to_string(),
571                            None,
572                        ))
573                    }
574                }
575            }
576            None => {
577                // Server doesn't support prompts, return empty list
578                warn!("Server doesn't support prompts capability");
579                Ok(rmcp::model::ListPromptsResult::default())
580            }
581        }
582    }
583
584    async fn get_prompt(
585        &self,
586        request: rmcp::model::GetPromptRequestParams,
587        context: RequestContext<RoleServer>,
588    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
589        // 原子加载后端连接
590        let inner_guard = self.peer.load();
591        let inner = inner_guard.as_ref().ok_or_else(|| {
592            error!("Backend connection is not available (reconnecting)");
593            ErrorData::internal_error(
594                "Backend connection is not available, reconnecting...".to_string(),
595                None,
596            )
597        })?;
598
599        // 检查后端连接是否已关闭
600        if inner.peer.is_transport_closed() {
601            error!("Backend transport is closed");
602            return Err(ErrorData::internal_error(
603                "Backend connection closed, please retry".to_string(),
604                None,
605            ));
606        }
607
608        // Check if the server has prompts capability and forward the request
609        match self.capabilities().prompts {
610            Some(_) => {
611                tokio::select! {
612                    result = inner.peer.get_prompt(request.clone()) => {
613                        match result {
614                            Ok(result) => {
615                                debug!("Proxying get_prompt response");
616                                Ok(result)
617                            }
618                            Err(err) => {
619                                error!("Error getting prompt: {:?}", err);
620                                Err(ErrorData::internal_error(
621                                    format!("Error getting prompt: {err}"),
622                                    None,
623                                ))
624                            }
625                        }
626                    }
627                    _ = context.ct.cancelled() => {
628                        info!("[get_prompt] 请求被取消 - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
629                        Err(ErrorData::internal_error(
630                            "Request cancelled".to_string(),
631                            None,
632                        ))
633                    }
634                }
635            }
636            None => {
637                // Server doesn't support prompts, return empty messages
638                warn!("Server doesn't support prompts capability");
639                let messages = Vec::new();
640                Ok(rmcp::model::GetPromptResult::new(messages))
641            }
642        }
643    }
644
645    async fn complete(
646        &self,
647        request: rmcp::model::CompleteRequestParams,
648        context: RequestContext<RoleServer>,
649    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
650        // 原子加载后端连接
651        let inner_guard = self.peer.load();
652        let inner = inner_guard.as_ref().ok_or_else(|| {
653            error!("Backend connection is not available (reconnecting)");
654            ErrorData::internal_error(
655                "Backend connection is not available, reconnecting...".to_string(),
656                None,
657            )
658        })?;
659
660        // 检查后端连接是否已关闭
661        if inner.peer.is_transport_closed() {
662            error!("Backend transport is closed");
663            return Err(ErrorData::internal_error(
664                "Backend connection closed, please retry".to_string(),
665                None,
666            ));
667        }
668
669        tokio::select! {
670            result = inner.peer.complete(request) => {
671                match result {
672                    Ok(result) => {
673                        debug!("Proxying complete response");
674                        Ok(result)
675                    }
676                    Err(err) => {
677                        error!("Error completing: {:?}", err);
678                        Err(ErrorData::internal_error(
679                            format!("Error completing: {err}"),
680                            None,
681                        ))
682                    }
683                }
684            }
685            _ = context.ct.cancelled() => {
686                info!("[complete] 请求被取消 - MCP ID: {}", self.mcp_id);
687                Err(ErrorData::internal_error(
688                    "Request cancelled".to_string(),
689                    None,
690                ))
691            }
692        }
693    }
694
695    async fn on_progress(
696        &self,
697        notification: rmcp::model::ProgressNotificationParam,
698        _context: NotificationContext<RoleServer>,
699    ) {
700        // 原子加载后端连接
701        let inner_guard = self.peer.load();
702        let inner = match inner_guard.as_ref() {
703            Some(inner) => inner,
704            None => {
705                error!("Backend connection is not available, cannot forward progress notification");
706                return;
707            }
708        };
709
710        // 检查后端连接是否已关闭
711        if inner.peer.is_transport_closed() {
712            error!("Backend transport is closed, cannot forward progress notification");
713            return;
714        }
715
716        match inner.peer.notify_progress(notification).await {
717            Ok(_) => {
718                debug!("Proxying progress notification");
719            }
720            Err(err) => {
721                error!("Error notifying progress: {:?}", err);
722            }
723        }
724    }
725
726    async fn on_cancelled(
727        &self,
728        notification: rmcp::model::CancelledNotificationParam,
729        _context: NotificationContext<RoleServer>,
730    ) {
731        // 原子加载后端连接
732        let inner_guard = self.peer.load();
733        let inner = match inner_guard.as_ref() {
734            Some(inner) => inner,
735            None => {
736                error!(
737                    "Backend connection is not available, cannot forward cancelled notification"
738                );
739                return;
740            }
741        };
742
743        // 检查后端连接是否已关闭
744        if inner.peer.is_transport_closed() {
745            error!("Backend transport is closed, cannot forward cancelled notification");
746            return;
747        }
748
749        match inner.peer.notify_cancelled(notification).await {
750            Ok(_) => {
751                debug!("Proxying cancelled notification");
752            }
753            Err(err) => {
754                error!("Error notifying cancelled: {:?}", err);
755            }
756        }
757    }
758}
759
760impl ProxyHandler {
761    /// 获取 capabilities 的引用,避免 clone
762    #[inline]
763    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
764        &self.cached_info.capabilities
765    }
766
767    /// 创建一个默认的 ServerInfo(用于断开状态)
768    fn default_server_info(mcp_id: &str) -> ServerInfo {
769        warn!("[ProxyHandler] 创建默认 ServerInfo - MCP ID: {}", mcp_id);
770        ServerInfo::new(rmcp::model::ServerCapabilities::default())
771            .with_server_info(Implementation::new("MCP Proxy", "0.1.0"))
772    }
773
774    /// 从 RunningService 提取 ServerInfo
775    fn extract_server_info(
776        client: &RunningService<RoleClient, ClientInfo>,
777        mcp_id: &str,
778    ) -> ServerInfo {
779        client
780            .peer_info()
781            .map(|peer_info| {
782                ServerInfo::new(peer_info.capabilities.clone())
783                    .with_protocol_version(peer_info.protocol_version.clone())
784                    .with_server_info(Implementation::new(
785                        peer_info.server_info.name.clone(),
786                        peer_info.server_info.version.clone(),
787                    ))
788                    .with_instructions(peer_info.instructions.clone().unwrap_or_default())
789            })
790            .unwrap_or_else(|| Self::default_server_info(mcp_id))
791    }
792
793    /// 创建断开状态的 handler(用于初始化)
794    /// 后续通过 swap_backend() 注入实际的后端连接
795    pub fn new_disconnected(
796        mcp_id: String,
797        tool_filter: ToolFilter,
798        default_info: ServerInfo,
799    ) -> Self {
800        info!("[ProxyHandler] 创建断开状态的 handler - MCP ID: {}", mcp_id);
801
802        // 记录过滤器配置
803        if tool_filter.is_enabled() {
804            if let Some(ref allow_list) = tool_filter.allow_tools {
805                info!(
806                    "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
807                    mcp_id, allow_list
808                );
809            }
810            if let Some(ref deny_list) = tool_filter.deny_tools {
811                info!(
812                    "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
813                    mcp_id, deny_list
814                );
815            }
816        }
817
818        Self {
819            peer: Arc::new(ArcSwapOption::empty()),
820            cached_info: default_info,
821            mcp_id,
822            tool_filter,
823            backend_version: Arc::new(AtomicU64::new(0)), // 断开状态版本为 0
824        }
825    }
826
827    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
828        Self::with_mcp_id(client, "unknown".to_string())
829    }
830
831    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
832        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
833    }
834
835    /// 创建带工具过滤器的 ProxyHandler(带初始后端连接)
836    pub fn with_tool_filter(
837        client: RunningService<RoleClient, ClientInfo>,
838        mcp_id: String,
839        tool_filter: ToolFilter,
840    ) -> Self {
841        use std::ops::Deref;
842
843        // 提取 ServerInfo
844        let cached_info = Self::extract_server_info(&client, &mcp_id);
845
846        // 克隆 Peer 用于并发请求(无需锁)
847        let peer = client.deref().clone();
848
849        // 记录过滤器配置
850        if tool_filter.is_enabled() {
851            if let Some(ref allow_list) = tool_filter.allow_tools {
852                info!(
853                    "[ProxyHandler] 工具白名单已启用 - MCP ID: {}, 允许的工具: {:?}",
854                    mcp_id, allow_list
855                );
856            }
857            if let Some(ref deny_list) = tool_filter.deny_tools {
858                info!(
859                    "[ProxyHandler] 工具黑名单已启用 - MCP ID: {}, 排除的工具: {:?}",
860                    mcp_id, deny_list
861                );
862            }
863        }
864
865        // 创建 PeerInner
866        let inner = PeerInner {
867            peer,
868            _running: Arc::new(client),
869        };
870
871        Self {
872            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
873            cached_info,
874            mcp_id,
875            tool_filter,
876            backend_version: Arc::new(AtomicU64::new(1)), // 初始版本为 1
877        }
878    }
879
880    /// 原子性替换后端连接
881    /// - Some(client): 设置新的后端连接
882    /// - None: 标记后端断开
883    ///
884    /// **版本控制**:每次调用都会递增 backend_version,使旧 session 失效
885    pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
886        use std::ops::Deref;
887
888        match new_client {
889            Some(client) => {
890                let peer = client.deref().clone();
891                let inner = PeerInner {
892                    peer,
893                    _running: Arc::new(client),
894                };
895                self.peer.store(Some(Arc::new(inner)));
896                info!("[ProxyHandler] 后端连接已更新 - MCP ID: {}", self.mcp_id);
897            }
898            None => {
899                self.peer.store(None);
900                info!("[ProxyHandler] 后端连接已断开 - MCP ID: {}", self.mcp_id);
901            }
902        }
903
904        // 关键:递增版本号,使所有旧 session 失效
905        let new_version = self.backend_version.fetch_add(1, Ordering::SeqCst) + 1;
906        info!(
907            "[ProxyHandler] 后端版本更新: {} - MCP ID: {}",
908            new_version, self.mcp_id
909        );
910    }
911
912    /// 检查后端是否可用(快速检查,不发送请求)
913    pub fn is_backend_available(&self) -> bool {
914        let inner_guard = self.peer.load();
915        match inner_guard.as_ref() {
916            Some(inner) => !inner.peer.is_transport_closed(),
917            None => false,
918        }
919    }
920
921    /// 检查 mcp 服务是否正常(异步版本,会发送验证请求)
922    pub async fn is_mcp_server_ready(&self) -> bool {
923        !self.is_terminated_async().await
924    }
925
926    /// 检查后端连接是否已关闭(同步版本,仅检查 transport 状态)
927    pub fn is_terminated(&self) -> bool {
928        !self.is_backend_available()
929    }
930
931    /// 异步检查后端连接是否已断开(会发送验证请求)
932    pub async fn is_terminated_async(&self) -> bool {
933        // 原子加载后端连接
934        let inner_guard = self.peer.load();
935        let inner = match inner_guard.as_ref() {
936            Some(inner) => inner,
937            None => return true,
938        };
939
940        // 快速检查 transport 状态
941        if inner.peer.is_transport_closed() {
942            return true;
943        }
944
945        // 通过发送轻量级请求来验证连接
946        match inner.peer.list_tools(None).await {
947            Ok(_) => {
948                debug!("后端连接状态检查: 正常");
949                false
950            }
951            Err(e) => {
952                info!("后端连接状态检查: 已断开,原因: {e}");
953                true
954            }
955        }
956    }
957
958    /// 获取 MCP ID
959    pub fn mcp_id(&self) -> &str {
960        &self.mcp_id
961    }
962
963    /// 获取当前后端版本号
964    ///
965    /// 版本号用于跟踪后端连接变化:
966    /// - 0: 断开状态
967    /// - 1+: 已连接,每次 swap_backend 递增
968    ///
969    /// **用途**:配合 ProxyAwareSessionManager 实现 session 版本控制
970    pub fn get_backend_version(&self) -> u64 {
971        self.backend_version.load(Ordering::SeqCst)
972    }
973
974    /// Update backend from a StreamClientConnection
975    ///
976    /// This method allows updating the backend connection using the high-level
977    /// `StreamClientConnection` type, which is more convenient than the raw
978    /// `RunningService` type.
979    ///
980    /// # Arguments
981    /// * `conn` - Some(connection) to set new backend, None to mark disconnected
982    pub fn swap_backend_from_connection(
983        &self,
984        conn: Option<crate::client::StreamClientConnection>,
985    ) {
986        match conn {
987            Some(c) => {
988                let running = c.into_running_service();
989                self.swap_backend(Some(running));
990            }
991            None => {
992                self.swap_backend(None);
993            }
994        }
995    }
996}