// mcp_streamable_proxy/proxy_handler.rs
1use arc_swap::ArcSwapOption;
2pub use mcp_common::ToolFilter;
3/**
4 * Create a local SSE server that proxies requests to a stdio MCP server.
5 */
6use rmcp::{
7    ErrorData, RoleClient, RoleServer, ServerHandler,
8    model::{
9        CallToolRequestParams, CallToolResult, ClientInfo, Content, Implementation,
10        ListToolsResult, PaginatedRequestParams, ServerInfo,
11    },
12    service::{NotificationContext, Peer, RequestContext, RunningService},
13};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Instant;
17use tracing::{debug, error, info, warn};
18
/// Global request counter used to generate unique request IDs for log correlation.
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1);
21
/// Wraps the backend connection together with its running service.
/// Stored behind an `ArcSwapOption` so the backend can be hot-swapped.
#[derive(Debug)]
struct PeerInner {
    /// Peer used to send requests to the backend.
    peer: Peer<RoleClient>,
    /// Keeps ownership of the `RunningService` so the backend service
    /// stays alive for as long as this connection wrapper does.
    #[allow(dead_code)]
    _running: Arc<RunningService<RoleClient, ClientInfo>>,
}
32
/// A proxy handler that forwards requests to a client based on the server's capabilities.
/// Uses `ArcSwap` for hot backend replacement so that a disconnected backend makes
/// requests fail fast instead of hanging.
///
/// **Enhancements**:
/// - Backend versioning: every `swap_backend` increments the version number
/// - Session version tracking: intended to be used with `ProxyAwareSessionManager`
#[derive(Clone, Debug)]
pub struct ProxyHandler {
    /// Backend connection (`ArcSwapOption` allows lock-free atomic replacement).
    /// `None` means the backend is disconnected / reconnecting.
    peer: Arc<ArcSwapOption<PeerInner>>,
    /// Cached server info (kept stable; should be identical after a reconnect).
    cached_info: ServerInfo,
    /// MCP ID, used for logging.
    mcp_id: String,
    /// Tool filtering configuration.
    tool_filter: ToolFilter,
    /// Backend version number (incremented on every `swap_backend`).
    /// Used to track backend connection changes and invalidate stale sessions.
    backend_version: Arc<AtomicU64>,
}
54
55impl ServerHandler for ProxyHandler {
56    fn get_info(&self) -> ServerInfo {
57        self.cached_info.clone()
58    }
59
60    #[tracing::instrument(skip(self, request, context), fields(
61        mcp_id = %self.mcp_id,
62        request = ?request,
63    ))]
64    async fn list_tools(
65        &self,
66        request: Option<PaginatedRequestParams>,
67        context: RequestContext<RoleServer>,
68    ) -> Result<ListToolsResult, ErrorData> {
69        // 原子加载后端连接
70        let inner_guard = self.peer.load();
71        let inner = inner_guard.as_ref().ok_or_else(|| {
72            error!("Backend connection is not available (reconnecting)");
73            ErrorData::internal_error(
74                "Backend connection is not availabletemp/rust-sdk, reconnecting...".to_string(),
75                None,
76            )
77        })?;
78
79        // 检查后端连接是否已关闭
80        if inner.peer.is_transport_closed() {
81            error!("Backend transport is closed");
82            return Err(ErrorData::internal_error(
83                "Backend connection closed, please retry".to_string(),
84                None,
85            ));
86        }
87
88        // Check if the server has tools capability and forward the request
89        match self.capabilities().tools {
90            Some(_) => {
91                // 使用 tokio::select! 同时等待取消和结果
92                tokio::select! {
93                    result = inner.peer.list_tools(request) => {
94                        match result {
95                            Ok(result) => {
96                                // 根据过滤配置过滤工具列表
97                                let filtered_tools: Vec<_> = if self.tool_filter.is_enabled() {
98                                    result
99                                        .tools
100                                        .into_iter()
101                                        .filter(|tool| self.tool_filter.is_allowed(&tool.name))
102                                        .collect()
103                                } else {
104                                    result.tools
105                                };
106
107                                // 记录工具列表结果,这些结果会通过 SSE 推送给客户端
108                                info!(
109                                    "[list_tools] Tool list results - MCP ID: {}, number of tools: {}{}",
110                                    self.mcp_id,
111                                    filtered_tools.len(),
112                                    if self.tool_filter.is_enabled() {
113                                        " (filtered)"
114                                    } else {
115                                        ""
116                                    }
117                                );
118
119                                debug!(
120                                    "Proxying list_tools response with {} tools",
121                                    filtered_tools.len()
122                                );
123                                Ok(ListToolsResult {
124                                    tools: filtered_tools,
125                                    next_cursor: result.next_cursor,
126                                    meta: result.meta, // rmcp 0.12 新增字段
127                                })
128                            }
129                            Err(err) => {
130                                error!("Error listing tools: {:?}", err);
131                                Err(ErrorData::internal_error(
132                                    format!("Error listing tools: {err}"),
133                                    None,
134                                ))
135                            }
136                        }
137                    }
138                    _ = context.ct.cancelled() => {
139                        info!("[list_tools] Request canceled - MCP ID: {}", self.mcp_id);
140                        Err(ErrorData::internal_error(
141                            "Request cancelled".to_string(),
142                            None,
143                        ))
144                    }
145                }
146            }
147            None => {
148                // Server doesn't support tools, return empty list
149                warn!("Server doesn't support tools capability");
150                Ok(ListToolsResult::default())
151            }
152        }
153    }
154
155    #[tracing::instrument(skip(self, request, context), fields(
156        mcp_id = %self.mcp_id,
157        tool_name = %request.name,
158        tool_arguments = ?request.arguments,
159    ))]
160    async fn call_tool(
161        &self,
162        request: CallToolRequestParams,
163        context: RequestContext<RoleServer>,
164    ) -> Result<CallToolResult, ErrorData> {
165        // 生成唯一请求 ID 用于追踪
166        let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
167        let start = Instant::now();
168
169        info!(
170            "[call_tool:{}] Start - Tool: {}, MCP ID: {}",
171            request_id, request.name, self.mcp_id
172        );
173
174        // 首先检查工具是否被过滤
175        if !self.tool_filter.is_allowed(&request.name) {
176            info!(
177                "[call_tool:{}] Tool is filtered - MCP ID: {}, Tool: {}",
178                request_id, self.mcp_id, request.name
179            );
180            return Ok(CallToolResult::error(vec![Content::text(format!(
181                "Tool '{}' is not allowed by filter configuration",
182                request.name
183            ))]));
184        }
185
186        // 原子加载后端连接
187        let inner_guard = self.peer.load();
188        let inner = match inner_guard.as_ref() {
189            Some(inner) => {
190                let transport_closed = inner.peer.is_transport_closed();
191                info!(
192                    "[call_tool:{}] Backend connection exists - transport_closed: {}",
193                    request_id, transport_closed
194                );
195                inner
196            }
197            None => {
198                error!(
199                    "[call_tool:{}] Backend connection unavailable (reconnecting) - MCP ID: {}",
200                    request_id, self.mcp_id
201                );
202                return Ok(CallToolResult::error(vec![Content::text(
203                    "Backend connection is not available, reconnecting...",
204                )]));
205            }
206        };
207
208        // 检查后端连接是否已关闭
209        if inner.peer.is_transport_closed() {
210            error!(
211                "[call_tool:{}] Backend transport is closed - MCP ID: {}",
212                request_id, self.mcp_id
213            );
214            return Ok(CallToolResult::error(vec![Content::text(
215                "Backend connection closed, please retry",
216            )]));
217        }
218
219        // Check if the server has tools capability and forward the request
220        let result = match self.capabilities().tools {
221            Some(_) => {
222                // 记录发送请求到后端的时间点
223                info!(
224                    "[call_tool:{}] Send request to backend... - Tool: {}, Elapsed time: {}ms",
225                    request_id,
226                    request.name,
227                    start.elapsed().as_millis()
228                );
229
230                // 创建后端调用的 Future,使用 pin 固定
231                let call_future = inner.peer.call_tool(request.clone());
232                tokio::pin!(call_future);
233
234                // 等待心跳间隔(30秒)
235                const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
236                let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
237                // 跳过第一次立即触发
238                heartbeat_interval.tick().await;
239
240                // 使用循环 + select! 实现等待心跳
241                let call_result = loop {
242                    tokio::select! {
243                        // biased 确保按顺序优先检查,避免心跳检查抢占实际结果
244                        biased;
245
246                        result = &mut call_future => {
247                            break result;
248                        }
249                        _ = context.ct.cancelled() => {
250                            let elapsed = start.elapsed();
251                            warn!(
252                                "[call_tool:{}] Request canceled - Tool: {}, Time taken: {}ms, MCP ID: {}",
253                                request_id, request.name, elapsed.as_millis(), self.mcp_id
254                            );
255                            return Ok(CallToolResult::error(vec![Content::text(
256                                "Request cancelled"
257                            )]));
258                        }
259                        _ = heartbeat_interval.tick() => {
260                            // 定期打印等待日志,证明 mcp-proxy 在等待后端响应
261                            let elapsed = start.elapsed();
262                            let transport_closed = inner.peer.is_transport_closed();
263                            info!(
264                                "[call_tool:{}] Waiting for backend response... - Tool: {}, Waiting: {}ms, \\ transport_closed: {}, MCP ID: {}",
265                                request_id, request.name, elapsed.as_millis(),
266                                transport_closed, self.mcp_id
267                            );
268                        }
269                    }
270                };
271
272                let elapsed = start.elapsed();
273                match &call_result {
274                    Ok(call_result) => {
275                        // 记录工具调用结果
276                        let is_error = call_result.is_error.unwrap_or(false);
277                        info!(
278                            "[call_tool:{}] Response received - tool: {}, time taken: {}ms, is_error: {}, MCP ID: {}",
279                            request_id,
280                            request.name,
281                            elapsed.as_millis(),
282                            is_error,
283                            self.mcp_id
284                        );
285                        if is_error {
286                            debug!(
287                                "[call_tool:{}] Error response content: {:?}",
288                                request_id, call_result.content
289                            );
290                        }
291                        Ok(call_result.clone())
292                    }
293                    Err(err) => {
294                        error!(
295                            "[call_tool:{}] Backend returns error - Tool: {}, Time: {}ms, Error: {:?}, MCP ID: {}",
296                            request_id,
297                            request.name,
298                            elapsed.as_millis(),
299                            err,
300                            self.mcp_id
301                        );
302                        // Return an error result instead of propagating the error
303                        Ok(CallToolResult::error(vec![Content::text(format!(
304                            "Error: {err}"
305                        ))]))
306                    }
307                }
308            }
309            None => {
310                error!(
311                    "[call_tool:{}] The server does not support tools capability - MCP ID: {}",
312                    request_id, self.mcp_id
313                );
314                Ok(CallToolResult::error(vec![Content::text(
315                    "Server doesn't support tools capability",
316                )]))
317            }
318        };
319
320        let total_elapsed = start.elapsed();
321        info!(
322            "[call_tool:{}] Completed - Tool: {}, total time taken: {}ms",
323            request_id,
324            request.name,
325            total_elapsed.as_millis()
326        );
327        result
328    }
329
330    async fn list_resources(
331        &self,
332        request: Option<PaginatedRequestParams>,
333        context: RequestContext<RoleServer>,
334    ) -> Result<rmcp::model::ListResourcesResult, ErrorData> {
335        // 原子加载后端连接
336        let inner_guard = self.peer.load();
337        let inner = inner_guard.as_ref().ok_or_else(|| {
338            error!("Backend connection is not available (reconnecting)");
339            ErrorData::internal_error(
340                "Backend connection is not available, reconnecting...".to_string(),
341                None,
342            )
343        })?;
344
345        // 检查后端连接是否已关闭
346        if inner.peer.is_transport_closed() {
347            error!("Backend transport is closed");
348            return Err(ErrorData::internal_error(
349                "Backend connection closed, please retry".to_string(),
350                None,
351            ));
352        }
353
354        // Check if the server has resources capability and forward the request
355        match self.capabilities().resources {
356            Some(_) => {
357                tokio::select! {
358                    result = inner.peer.list_resources(request) => {
359                        match result {
360                            Ok(result) => {
361                                // 记录资源列表结果,这些结果会通过 SSE 推送给客户端
362                                info!(
363                                    "[list_resources] Resource list results - MCP ID: {}, resource quantity: {}",
364                                    self.mcp_id,
365                                    result.resources.len()
366                                );
367
368                                debug!("Proxying list_resources response");
369                                Ok(result)
370                            }
371                            Err(err) => {
372                                error!("Error listing resources: {:?}", err);
373                                Err(ErrorData::internal_error(
374                                    format!("Error listing resources: {err}"),
375                                    None,
376                                ))
377                            }
378                        }
379                    }
380                    _ = context.ct.cancelled() => {
381                        info!("[list_resources] Request canceled - MCP ID: {}", self.mcp_id);
382                        Err(ErrorData::internal_error(
383                            "Request cancelled".to_string(),
384                            None,
385                        ))
386                    }
387                }
388            }
389            None => {
390                // Server doesn't support resources, return empty list
391                warn!("Server doesn't support resources capability");
392                Ok(rmcp::model::ListResourcesResult::default())
393            }
394        }
395    }
396
397    async fn read_resource(
398        &self,
399        request: rmcp::model::ReadResourceRequestParams,
400        context: RequestContext<RoleServer>,
401    ) -> Result<rmcp::model::ReadResourceResult, ErrorData> {
402        // 原子加载后端连接
403        let inner_guard = self.peer.load();
404        let inner = inner_guard.as_ref().ok_or_else(|| {
405            error!("Backend connection is not available (reconnecting)");
406            ErrorData::internal_error(
407                "Backend connection is not available, reconnecting...".to_string(),
408                None,
409            )
410        })?;
411
412        // 检查后端连接是否已关闭
413        if inner.peer.is_transport_closed() {
414            error!("Backend transport is closed");
415            return Err(ErrorData::internal_error(
416                "Backend connection closed, please retry".to_string(),
417                None,
418            ));
419        }
420
421        // Check if the server has resources capability and forward the request
422        match self.capabilities().resources {
423            Some(_) => {
424                tokio::select! {
425                    result = inner.peer.read_resource(rmcp::model::ReadResourceRequestParams::new(request.uri.clone())) => {
426                        match result {
427                            Ok(result) => {
428                                // 记录资源读取结果,这些结果会通过 SSE 推送给客户端
429                                info!(
430                                    "[read_resource] Resource read result - MCP ID: {}, URI: {}",
431                                    self.mcp_id, request.uri
432                                );
433
434                                debug!("Proxying read_resource response for {}", request.uri);
435                                Ok(result)
436                            }
437                            Err(err) => {
438                                error!("Error reading resource: {:?}", err);
439                                Err(ErrorData::internal_error(
440                                    format!("Error reading resource: {err}"),
441                                    None,
442                                ))
443                            }
444                        }
445                    }
446                    _ = context.ct.cancelled() => {
447                        info!("[read_resource] Request canceled - MCP ID: {}, URI: {}", self.mcp_id, request.uri);
448                        Err(ErrorData::internal_error(
449                            "Request cancelled".to_string(),
450                            None,
451                        ))
452                    }
453                }
454            }
455            None => {
456                // Server doesn't support resources, return error
457                error!("Server doesn't support resources capability");
458                Ok(rmcp::model::ReadResourceResult::new(vec![]))
459            }
460        }
461    }
462
    /// Proxy a `resources/templates/list` request to the backend, or return
    /// an empty list when the backend lacks the resources capability.
    async fn list_resource_templates(
        &self,
        request: Option<PaginatedRequestParams>,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::ListResourceTemplatesResult, ErrorData> {
        // Atomically load the backend connection (None = reconnecting).
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has resources capability and forward the request
        match self.capabilities().resources {
            Some(_) => {
                // Race the backend call against client-side cancellation.
                tokio::select! {
                    result = inner.peer.list_resource_templates(request) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying list_resource_templates response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error listing resource templates: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error listing resource templates: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[list_resource_templates] request canceled - MCP ID: {}", self.mcp_id);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support resources, return empty list
                warn!("Server doesn't support resources capability");
                Ok(rmcp::model::ListResourceTemplatesResult::default())
            }
        }
    }
522
523    async fn list_prompts(
524        &self,
525        request: Option<PaginatedRequestParams>,
526        context: RequestContext<RoleServer>,
527    ) -> Result<rmcp::model::ListPromptsResult, ErrorData> {
528        // 原子加载后端连接
529        let inner_guard = self.peer.load();
530        let inner = inner_guard.as_ref().ok_or_else(|| {
531            error!("Backend connection is not available (reconnecting)");
532            ErrorData::internal_error(
533                "Backend connection is not available, reconnecting...".to_string(),
534                None,
535            )
536        })?;
537
538        // 检查后端连接是否已关闭
539        if inner.peer.is_transport_closed() {
540            error!("Backend transport is closed");
541            return Err(ErrorData::internal_error(
542                "Backend connection closed, please retry".to_string(),
543                None,
544            ));
545        }
546
547        // Check if the server has prompts capability and forward the request
548        match self.capabilities().prompts {
549            Some(_) => {
550                tokio::select! {
551                    result = inner.peer.list_prompts(request) => {
552                        match result {
553                            Ok(result) => {
554                                debug!("Proxying list_prompts response");
555                                Ok(result)
556                            }
557                            Err(err) => {
558                                error!("Error listing prompts: {:?}", err);
559                                Err(ErrorData::internal_error(
560                                    format!("Error listing prompts: {err}"),
561                                    None,
562                                ))
563                            }
564                        }
565                    }
566                    _ = context.ct.cancelled() => {
567                        info!("[list_prompts] Request canceled - MCP ID: {}", self.mcp_id);
568                        Err(ErrorData::internal_error(
569                            "Request cancelled".to_string(),
570                            None,
571                        ))
572                    }
573                }
574            }
575            None => {
576                // Server doesn't support prompts, return empty list
577                warn!("Server doesn't support prompts capability");
578                Ok(rmcp::model::ListPromptsResult::default())
579            }
580        }
581    }
582
    /// Proxy a `prompts/get` request to the backend, or return an empty
    /// prompt result when the backend lacks the prompts capability.
    async fn get_prompt(
        &self,
        request: rmcp::model::GetPromptRequestParams,
        context: RequestContext<RoleServer>,
    ) -> Result<rmcp::model::GetPromptResult, ErrorData> {
        // Atomically load the backend connection (None = reconnecting).
        let inner_guard = self.peer.load();
        let inner = inner_guard.as_ref().ok_or_else(|| {
            error!("Backend connection is not available (reconnecting)");
            ErrorData::internal_error(
                "Backend connection is not available, reconnecting...".to_string(),
                None,
            )
        })?;

        // Fail fast if the backend transport has already closed.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed");
            return Err(ErrorData::internal_error(
                "Backend connection closed, please retry".to_string(),
                None,
            ));
        }

        // Check if the server has prompts capability and forward the request
        match self.capabilities().prompts {
            Some(_) => {
                // Race the backend call against client-side cancellation.
                tokio::select! {
                    result = inner.peer.get_prompt(request.clone()) => {
                        match result {
                            Ok(result) => {
                                debug!("Proxying get_prompt response");
                                Ok(result)
                            }
                            Err(err) => {
                                error!("Error getting prompt: {:?}", err);
                                Err(ErrorData::internal_error(
                                    format!("Error getting prompt: {err}"),
                                    None,
                                ))
                            }
                        }
                    }
                    _ = context.ct.cancelled() => {
                        info!("[get_prompt] Request canceled - MCP ID: {}, prompt: {:?}", self.mcp_id, request.name);
                        Err(ErrorData::internal_error(
                            "Request cancelled".to_string(),
                            None,
                        ))
                    }
                }
            }
            None => {
                // Server doesn't support prompts, return empty messages
                warn!("Server doesn't support prompts capability");
                let messages = Vec::new();
                Ok(rmcp::model::GetPromptResult::new(messages))
            }
        }
    }
643
644    async fn complete(
645        &self,
646        request: rmcp::model::CompleteRequestParams,
647        context: RequestContext<RoleServer>,
648    ) -> Result<rmcp::model::CompleteResult, ErrorData> {
649        // 原子加载后端连接
650        let inner_guard = self.peer.load();
651        let inner = inner_guard.as_ref().ok_or_else(|| {
652            error!("Backend connection is not available (reconnecting)");
653            ErrorData::internal_error(
654                "Backend connection is not available, reconnecting...".to_string(),
655                None,
656            )
657        })?;
658
659        // 检查后端连接是否已关闭
660        if inner.peer.is_transport_closed() {
661            error!("Backend transport is closed");
662            return Err(ErrorData::internal_error(
663                "Backend connection closed, please retry".to_string(),
664                None,
665            ));
666        }
667
668        tokio::select! {
669            result = inner.peer.complete(request) => {
670                match result {
671                    Ok(result) => {
672                        debug!("Proxying complete response");
673                        Ok(result)
674                    }
675                    Err(err) => {
676                        error!("Error completing: {:?}", err);
677                        Err(ErrorData::internal_error(
678                            format!("Error completing: {err}"),
679                            None,
680                        ))
681                    }
682                }
683            }
684            _ = context.ct.cancelled() => {
685                info!("[complete] Request canceled - MCP ID: {}", self.mcp_id);
686                Err(ErrorData::internal_error(
687                    "Request cancelled".to_string(),
688                    None,
689                ))
690            }
691        }
692    }
693
694    async fn on_progress(
695        &self,
696        notification: rmcp::model::ProgressNotificationParam,
697        _context: NotificationContext<RoleServer>,
698    ) {
699        // 原子加载后端连接
700        let inner_guard = self.peer.load();
701        let inner = match inner_guard.as_ref() {
702            Some(inner) => inner,
703            None => {
704                error!("Backend connection is not available, cannot forward progress notification");
705                return;
706            }
707        };
708
709        // 检查后端连接是否已关闭
710        if inner.peer.is_transport_closed() {
711            error!("Backend transport is closed, cannot forward progress notification");
712            return;
713        }
714
715        match inner.peer.notify_progress(notification).await {
716            Ok(_) => {
717                debug!("Proxying progress notification");
718            }
719            Err(err) => {
720                error!("Error notifying progress: {:?}", err);
721            }
722        }
723    }
724
    /// Forward a cancellation notification to the backend; failures are only logged.
    async fn on_cancelled(
        &self,
        notification: rmcp::model::CancelledNotificationParam,
        _context: NotificationContext<RoleServer>,
    ) {
        // Atomically load the backend connection; drop the notification if absent.
        let inner_guard = self.peer.load();
        let inner = match inner_guard.as_ref() {
            Some(inner) => inner,
            None => {
                error!(
                    "Backend connection is not available, cannot forward cancelled notification"
                );
                return;
            }
        };

        // Nothing to do if the transport is already gone.
        if inner.peer.is_transport_closed() {
            error!("Backend transport is closed, cannot forward cancelled notification");
            return;
        }

        // Best-effort forward: log the outcome either way.
        match inner.peer.notify_cancelled(notification).await {
            Ok(_) => {
                debug!("Proxying cancelled notification");
            }
            Err(err) => {
                error!("Error notifying cancelled: {:?}", err);
            }
        }
    }
757}
758
759impl ProxyHandler {
    /// Borrow the cached capabilities, avoiding a clone.
    #[inline]
    fn capabilities(&self) -> &rmcp::model::ServerCapabilities {
        &self.cached_info.capabilities
    }
765
766    /// 创建一个默认的 ServerInfo(用于断开状态)
767    fn default_server_info(mcp_id: &str) -> ServerInfo {
768        warn!(
769            "[ProxyHandler] Create default ServerInfo - MCP ID: {}",
770            mcp_id
771        );
772        ServerInfo::new(rmcp::model::ServerCapabilities::default())
773            .with_server_info(Implementation::new("MCP Proxy", "0.1.0"))
774    }
775
776    /// 从 RunningService 提取 ServerInfo
777    fn extract_server_info(
778        client: &RunningService<RoleClient, ClientInfo>,
779        mcp_id: &str,
780    ) -> ServerInfo {
781        client
782            .peer_info()
783            .map(|peer_info| {
784                ServerInfo::new(peer_info.capabilities.clone())
785                    .with_protocol_version(peer_info.protocol_version.clone())
786                    .with_server_info(Implementation::new(
787                        peer_info.server_info.name.clone(),
788                        peer_info.server_info.version.clone(),
789                    ))
790                    .with_instructions(peer_info.instructions.clone().unwrap_or_default())
791            })
792            .unwrap_or_else(|| Self::default_server_info(mcp_id))
793    }
794
795    /// 创建断开状态的 handler(用于初始化)
796    /// 后续通过 swap_backend() 注入实际的后端连接
797    pub fn new_disconnected(
798        mcp_id: String,
799        tool_filter: ToolFilter,
800        default_info: ServerInfo,
801    ) -> Self {
802        info!(
803            "[ProxyHandler] Create a disconnected handler - MCP ID: {}",
804            mcp_id
805        );
806
807        // 记录过滤器配置
808        if tool_filter.is_enabled() {
809            if let Some(ref allow_list) = tool_filter.allow_tools {
810                info!(
811                    "[ProxyHandler] Tool whitelist enabled - MCP ID: {}, allowed tools: {:?}",
812                    mcp_id, allow_list
813                );
814            }
815            if let Some(ref deny_list) = tool_filter.deny_tools {
816                info!(
817                    "[ProxyHandler] Tool blacklist enabled - MCP ID: {}, excluded tools: {:?}",
818                    mcp_id, deny_list
819                );
820            }
821        }
822
823        Self {
824            peer: Arc::new(ArcSwapOption::empty()),
825            cached_info: default_info,
826            mcp_id,
827            tool_filter,
828            backend_version: Arc::new(AtomicU64::new(0)), // 断开状态版本为 0
829        }
830    }
831
832    pub fn new(client: RunningService<RoleClient, ClientInfo>) -> Self {
833        Self::with_mcp_id(client, "unknown".to_string())
834    }
835
836    pub fn with_mcp_id(client: RunningService<RoleClient, ClientInfo>, mcp_id: String) -> Self {
837        Self::with_tool_filter(client, mcp_id, ToolFilter::default())
838    }
839
840    /// 创建带工具过滤器的 ProxyHandler(带初始后端连接)
841    pub fn with_tool_filter(
842        client: RunningService<RoleClient, ClientInfo>,
843        mcp_id: String,
844        tool_filter: ToolFilter,
845    ) -> Self {
846        use std::ops::Deref;
847
848        // 提取 ServerInfo
849        let cached_info = Self::extract_server_info(&client, &mcp_id);
850
851        // 克隆 Peer 用于并发请求(无需锁)
852        let peer = client.deref().clone();
853
854        // 记录过滤器配置
855        if tool_filter.is_enabled() {
856            if let Some(ref allow_list) = tool_filter.allow_tools {
857                info!(
858                    "[ProxyHandler] Tool whitelist enabled - MCP ID: {}, allowed tools: {:?}",
859                    mcp_id, allow_list
860                );
861            }
862            if let Some(ref deny_list) = tool_filter.deny_tools {
863                info!(
864                    "[ProxyHandler] Tool blacklist enabled - MCP ID: {}, excluded tools: {:?}",
865                    mcp_id, deny_list
866                );
867            }
868        }
869
870        // 创建 PeerInner
871        let inner = PeerInner {
872            peer,
873            _running: Arc::new(client),
874        };
875
876        Self {
877            peer: Arc::new(ArcSwapOption::from(Some(Arc::new(inner)))),
878            cached_info,
879            mcp_id,
880            tool_filter,
881            backend_version: Arc::new(AtomicU64::new(1)), // 初始版本为 1
882        }
883    }
884
885    /// 原子性替换后端连接
886    /// - Some(client): 设置新的后端连接
887    /// - None: 标记后端断开
888    ///
889    /// **版本控制**:每次调用都会递增 backend_version,使旧 session 失效
890    pub fn swap_backend(&self, new_client: Option<RunningService<RoleClient, ClientInfo>>) {
891        use std::ops::Deref;
892
893        match new_client {
894            Some(client) => {
895                let peer = client.deref().clone();
896                let inner = PeerInner {
897                    peer,
898                    _running: Arc::new(client),
899                };
900                self.peer.store(Some(Arc::new(inner)));
901                info!(
902                    "[ProxyHandler] Backend connection updated - MCP ID: {}",
903                    self.mcp_id
904                );
905            }
906            None => {
907                self.peer.store(None);
908                info!(
909                    "[ProxyHandler] Backend connection disconnected - MCP ID: {}",
910                    self.mcp_id
911                );
912            }
913        }
914
915        // 关键:递增版本号,使所有旧 session 失效
916        let new_version = self.backend_version.fetch_add(1, Ordering::SeqCst) + 1;
917        info!(
918            "[ProxyHandler] Backend version update: {} - MCP ID: {}",
919            new_version, self.mcp_id
920        );
921    }
922
923    /// 检查后端是否可用(快速检查,不发送请求)
924    pub fn is_backend_available(&self) -> bool {
925        let inner_guard = self.peer.load();
926        match inner_guard.as_ref() {
927            Some(inner) => !inner.peer.is_transport_closed(),
928            None => false,
929        }
930    }
931
932    /// 检查 mcp 服务是否正常(异步版本,会发送验证请求)
933    pub async fn is_mcp_server_ready(&self) -> bool {
934        !self.is_terminated_async().await
935    }
936
937    /// 检查后端连接是否已关闭(同步版本,仅检查 transport 状态)
938    pub fn is_terminated(&self) -> bool {
939        !self.is_backend_available()
940    }
941
942    /// 异步检查后端连接是否已断开(会发送验证请求)
943    pub async fn is_terminated_async(&self) -> bool {
944        // 原子加载后端连接
945        let inner_guard = self.peer.load();
946        let inner = match inner_guard.as_ref() {
947            Some(inner) => inner,
948            None => return true,
949        };
950
951        // 快速检查 transport 状态
952        if inner.peer.is_transport_closed() {
953            return true;
954        }
955
956        // 通过发送轻量级请求来验证连接
957        match inner.peer.list_tools(None).await {
958            Ok(_) => {
959                debug!("Backend connection status check: OK");
960                false
961            }
962            Err(e) => {
963                info!("Backend connection status check: Disconnected, reason: {e}");
964                true
965            }
966        }
967    }
968
969    /// 获取 MCP ID
970    pub fn mcp_id(&self) -> &str {
971        &self.mcp_id
972    }
973
974    /// 获取当前后端版本号
975    ///
976    /// 版本号用于跟踪后端连接变化:
977    /// - 0: 断开状态
978    /// - 1+: 已连接,每次 swap_backend 递增
979    ///
980    /// **用途**:配合 ProxyAwareSessionManager 实现 session 版本控制
981    pub fn get_backend_version(&self) -> u64 {
982        self.backend_version.load(Ordering::SeqCst)
983    }
984
985    /// Update backend from a StreamClientConnection
986    ///
987    /// This method allows updating the backend connection using the high-level
988    /// `StreamClientConnection` type, which is more convenient than the raw
989    /// `RunningService` type.
990    ///
991    /// # Arguments
992    /// * `conn` - Some(connection) to set new backend, None to mark disconnected
993    pub fn swap_backend_from_connection(
994        &self,
995        conn: Option<crate::client::StreamClientConnection>,
996    ) {
997        match conn {
998            Some(c) => {
999                let running = c.into_running_service();
1000                self.swap_backend(Some(running));
1001            }
1002            None => {
1003                self.swap_backend(None);
1004            }
1005        }
1006    }
1007}