mcp_stdio_proxy/client/
cli.rs

1// MCP-Proxy CLI 简化实现 - 修复版本
2// 直接使用 rmcp 库的功能,无需复杂的 trait 抽象
3
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Duration;
7
8use clap::Parser;
9use anyhow::{Result, bail};
10use serde::Deserialize;
11use tokio::process::Command;
12use tracing::error;
13
14use rmcp::{
15    ServiceExt,
16    model::{ClientCapabilities, ClientInfo},
17    transport::{SseClientTransport, StreamableHttpClientTransport, TokioChildProcess, sse_client::SseClientConfig, streamable_http_client::StreamableHttpClientTransportConfig, stdio},
18};
19use crate::proxy::{ProxyHandler, ToolFilter};
20
21/// MCP-Proxy CLI 主命令结构
22#[derive(Parser, Debug)]
23#[command(name = "mcp-proxy")]
24#[command(version = env!("CARGO_PKG_VERSION"))]
25#[command(about = "MCP 协议转换代理工具", long_about = None)]
26pub struct Cli {
27    #[command(subcommand)]
28    pub command: Option<Commands>,
29    
30    /// 直接URL模式(向后兼容)
31    #[arg(value_name = "URL", help = "MCP 服务的 URL 地址(直接模式)")]
32    pub url: Option<String>,
33    
34    /// 全局详细输出
35    #[arg(short, long, global = true)]
36    pub verbose: bool,
37    
38    /// 全局静默模式
39    #[arg(short, long, global = true)]
40    pub quiet: bool,
41}
42
43#[derive(clap::Subcommand, Debug)]
44pub enum Commands {
45    /// 协议转换模式 - 将 URL 转换为 stdio
46    Convert(ConvertArgs),
47
48    /// 检查服务状态
49    Check(CheckArgs),
50
51    /// 协议检测
52    Detect(DetectArgs),
53
54    /// 代理模式 - 将 stdio MCP 服务代理为 HTTP/SSE 服务
55    Proxy(super::proxy_server::ProxyArgs),
56}
57
58/// 协议转换参数
59#[derive(Parser, Debug, Clone)]
60pub struct ConvertArgs {
61    /// MCP 服务的 URL 地址(可选,与 --config/--config-file 二选一)
62    #[arg(value_name = "URL", help = "MCP 服务的 URL 地址")]
63    pub url: Option<String>,
64
65    /// MCP 服务配置 JSON
66    #[arg(long, conflicts_with = "config_file", help = "MCP 服务配置 JSON")]
67    pub config: Option<String>,
68
69    /// MCP 服务配置文件路径
70    #[arg(long, conflicts_with = "config", help = "MCP 服务配置文件路径")]
71    pub config_file: Option<std::path::PathBuf>,
72
73    /// MCP 服务名称(多服务配置时必需)
74    #[arg(short, long, help = "MCP 服务名称(多服务配置时必需)")]
75    pub name: Option<String>,
76
77    /// 指定远程服务协议类型(不指定则自动检测)
78    #[arg(long, value_enum, help = "指定远程服务协议类型(不指定则自动检测)")]
79    pub protocol: Option<super::proxy_server::ProxyProtocol>,
80
81    /// 认证 header (如: "Bearer token")
82    #[arg(short, long, help = "认证 header")]
83    pub auth: Option<String>,
84
85    /// 自定义 HTTP headers
86    #[arg(short = 'H', long, value_parser = parse_key_val, help = "自定义 HTTP headers (KEY=VALUE 格式)")]
87    pub header: Vec<(String, String)>,
88
89    /// 重试次数
90    #[arg(long, default_value = "0", help = "重试次数,0 表示无限重试")]
91    pub retries: u32,
92
93    /// 工具白名单(逗号分隔),只允许指定的工具
94    #[arg(long, value_delimiter = ',', help = "工具白名单(逗号分隔),只允许指定的工具")]
95    pub allow_tools: Option<Vec<String>>,
96
97    /// 工具黑名单(逗号分隔),排除指定的工具
98    #[arg(long, value_delimiter = ',', help = "工具黑名单(逗号分隔),排除指定的工具")]
99    pub deny_tools: Option<Vec<String>>,
100
101    /// 客户端 ping 间隔(秒),0 表示禁用
102    #[arg(long, default_value = "30", help = "客户端 ping 间隔(秒),0 表示禁用")]
103    pub ping_interval: u64,
104
105    /// 客户端 ping 超时(秒)
106    #[arg(long, default_value = "10", help = "客户端 ping 超时(秒),超时则认为连接断开")]
107    pub ping_timeout: u64,
108}
109
110/// 检查参数
111#[derive(Parser, Debug)]
112pub struct CheckArgs {
113    /// 要检查的 MCP 服务 URL
114    #[arg(value_name = "URL")]
115    pub url: String,
116    
117    /// 认证 header
118    #[arg(short, long)]
119    pub auth: Option<String>,
120    
121    /// 超时时间
122    #[arg(long, default_value = "10")]
123    pub timeout: u64,
124}
125
126/// 协议检测参数
127#[derive(Parser, Debug)]
128pub struct DetectArgs {
129    /// 要检测的 MCP 服务 URL
130    #[arg(value_name = "URL")]
131    pub url: String,
132    
133    /// 认证 header
134    #[arg(short, long)]
135    pub auth: Option<String>,
136}
137
138/// 解析 KEY=VALUE 格式的辅助函数
139fn parse_key_val(s: &str) -> Result<(String, String)> {
140    let pos = s.find('=')
141        .ok_or_else(|| anyhow::anyhow!("无效的 KEY=VALUE 格式: {}", s))?;
142    Ok((s[..pos].to_string(), s[pos + 1..].to_string()))
143}
144
145// ============== MCP 配置解析相关 ==============
146
147/// MCP 配置格式
148#[derive(Deserialize, Debug)]
149struct McpConfig {
150    #[serde(rename = "mcpServers")]
151    mcp_servers: HashMap<String, McpServerInnerConfig>,
152}
153
154/// MCP 服务配置(支持 Command 和 Url 两种类型)
155#[derive(Deserialize, Debug, Clone)]
156#[serde(untagged)]
157enum McpServerInnerConfig {
158    Command(StdioConfig),
159    Url(UrlConfig),
160}
161
162/// stdio 配置(本地命令)
163#[derive(Deserialize, Debug, Clone)]
164struct StdioConfig {
165    command: String,
166    args: Option<Vec<String>>,
167    env: Option<HashMap<String, String>>,
168}
169
170/// URL 配置(远程服务)
171#[derive(Deserialize, Debug, Clone)]
172struct UrlConfig {
173    #[serde(skip_serializing_if = "Option::is_none")]
174    url: Option<String>,
175    #[serde(
176        skip_serializing_if = "Option::is_none",
177        default,
178        rename = "baseUrl",
179        alias = "baseurl",
180        alias = "base_url"
181    )]
182    base_url: Option<String>,
183    #[serde(default, rename = "type", alias = "Type")]
184    r#type: Option<String>,
185    pub headers: Option<HashMap<String, String>>,
186    #[serde(default, alias = "authToken", alias = "auth_token")]
187    pub auth_token: Option<String>,
188    pub timeout: Option<u64>,
189}
190
191impl UrlConfig {
192    fn get_url(&self) -> Option<&str> {
193        self.url.as_deref().or(self.base_url.as_deref())
194    }
195}
196
197/// 解析后的配置源
198enum McpConfigSource {
199    /// 直接 URL 模式(命令行参数)
200    DirectUrl {
201        url: String,
202    },
203    /// 远程服务配置(JSON 配置)
204    RemoteService {
205        name: String,
206        url: String,
207        protocol: Option<super::protocol::McpProtocol>,
208        headers: HashMap<String, String>,
209        timeout: Option<u64>,
210    },
211    /// 本地命令配置(JSON 配置)
212    LocalCommand {
213        name: String,
214        command: String,
215        args: Vec<String>,
216        env: HashMap<String, String>,
217    },
218}
219
220/// 解析 convert 命令的配置
221fn parse_convert_config(args: &ConvertArgs) -> Result<McpConfigSource> {
222    // 优先级:url > config > config_file
223    if let Some(ref url) = args.url {
224        return Ok(McpConfigSource::DirectUrl { url: url.clone() });
225    }
226
227    // 读取 JSON 配置
228    let json_str = if let Some(ref config) = args.config {
229        config.clone()
230    } else if let Some(ref path) = args.config_file {
231        std::fs::read_to_string(path)
232            .map_err(|e| anyhow::anyhow!("读取配置文件失败: {}", e))?
233    } else {
234        bail!("必须提供 URL、--config 或 --config-file 参数之一");
235    };
236
237    // 解析 JSON 配置
238    let mcp_config: McpConfig = serde_json::from_str(&json_str)
239        .map_err(|e| anyhow::anyhow!(
240            "配置解析失败: {}。配置必须是标准 MCP 格式,包含 mcpServers 字段",
241            e
242        ))?;
243
244    let servers = mcp_config.mcp_servers;
245
246    if servers.is_empty() {
247        bail!("配置中没有找到任何 MCP 服务");
248    }
249
250    // 选择服务
251    let (name, inner_config) = if servers.len() == 1 {
252        servers.into_iter().next().unwrap()
253    } else if let Some(ref name) = args.name {
254        let config = servers.get(name)
255            .cloned()
256            .ok_or_else(|| anyhow::anyhow!(
257                "服务 '{}' 不存在。可用服务: {:?}",
258                name,
259                servers.keys().collect::<Vec<_>>()
260            ))?;
261        (name.clone(), config)
262    } else {
263        bail!(
264            "配置包含多个服务 {:?},请使用 --name 指定要使用的服务",
265            servers.keys().collect::<Vec<_>>()
266        );
267    };
268
269    // 根据配置类型返回
270    match inner_config {
271        McpServerInnerConfig::Command(stdio) => {
272            Ok(McpConfigSource::LocalCommand {
273                name,
274                command: stdio.command,
275                args: stdio.args.unwrap_or_default(),
276                env: stdio.env.unwrap_or_default(),
277            })
278        }
279        McpServerInnerConfig::Url(url_config) => {
280            let url = url_config.get_url()
281                .ok_or_else(|| anyhow::anyhow!("URL 配置缺少 url 或 baseUrl 字段"))?
282                .to_string();
283
284            // 解析协议类型
285            let protocol = url_config.r#type.as_ref().and_then(|t| {
286                match t.as_str() {
287                    "sse" => Some(super::protocol::McpProtocol::Sse),
288                    "http" | "stream" => Some(super::protocol::McpProtocol::Stream),
289                    _ => None,
290                }
291            });
292
293            // 合并 headers:JSON 配置中的 auth_token -> Authorization
294            let mut headers = url_config.headers.clone().unwrap_or_default();
295            if let Some(auth_token) = &url_config.auth_token {
296                headers.insert("Authorization".to_string(), auth_token.clone());
297            }
298
299            Ok(McpConfigSource::RemoteService {
300                name,
301                url,
302                protocol,
303                headers,
304                timeout: url_config.timeout,
305            })
306        }
307    }
308}
309
310/// 合并 headers:JSON 配置 + 命令行参数(命令行优先)
311fn merge_headers(
312    config_headers: HashMap<String, String>,
313    cli_headers: &[(String, String)],
314    cli_auth: Option<&String>,
315) -> HashMap<String, String> {
316    let mut merged = config_headers;
317
318    // 命令行 -H 参数覆盖配置
319    for (key, value) in cli_headers {
320        merged.insert(key.clone(), value.clone());
321    }
322
323    // 命令行 --auth 参数优先级最高
324    if let Some(auth_value) = cli_auth {
325        merged.insert("Authorization".to_string(), auth_value.clone());
326    }
327
328    merged
329}
330
331/// 运行 CLI 主逻辑
332pub async fn run_cli(cli: Cli) -> Result<()> {
333    match cli.command {
334        Some(Commands::Convert(args)) => {
335            run_convert_command(args, cli.verbose, cli.quiet).await
336        }
337        Some(Commands::Check(args)) => {
338            run_check_command(args, cli.verbose, cli.quiet).await
339        }
340        Some(Commands::Detect(args)) => {
341            run_detect_command(args, cli.verbose, cli.quiet).await
342        }
343        Some(Commands::Proxy(args)) => {
344            super::proxy_server::run_proxy_command(args, cli.verbose, cli.quiet).await
345        }
346        None => {
347            // 直接 URL 模式(向后兼容)
348            if let Some(url) = cli.url {
349                let args = ConvertArgs {
350                    url: Some(url),
351                    config: None,
352                    config_file: None,
353                    name: None,
354                    protocol: None,
355                    auth: None,
356                    header: vec![],
357                    retries: 0,    // 无限重试
358                    allow_tools: None,
359                    deny_tools: None,
360                    ping_interval: 30,  // 默认 30 秒 ping 一次
361                    ping_timeout: 10,   // 默认 10 秒超时
362                };
363                run_convert_command(args, cli.verbose, cli.quiet).await
364            } else {
365                bail!("请提供 URL 或使用子命令")
366            }
367        }
368    }
369}
370
371/// 运行转换命令 - 核心功能
372async fn run_convert_command(args: ConvertArgs, verbose: bool, quiet: bool) -> Result<()> {
373    // 检查 --allow-tools 和 --deny-tools 互斥
374    if args.allow_tools.is_some() && args.deny_tools.is_some() {
375        bail!("--allow-tools 和 --deny-tools 不能同时使用,请只选择其中一个");
376    }
377
378    // 创建工具过滤器
379    let tool_filter = if let Some(allow_tools) = args.allow_tools.clone() {
380        ToolFilter::allow(allow_tools)
381    } else if let Some(deny_tools) = args.deny_tools.clone() {
382        ToolFilter::deny(deny_tools)
383    } else {
384        ToolFilter::default()
385    };
386
387    // 解析配置
388    let config_source = parse_convert_config(&args)?;
389
390    // 配置客户端能力
391    let client_info = ClientInfo {
392        protocol_version: Default::default(),
393        capabilities: ClientCapabilities::builder()
394            .enable_experimental()
395            .enable_roots()
396            .enable_roots_list_changed()
397            .enable_sampling()
398            .build(),
399        ..Default::default()
400    };
401
402    // 根据配置源执行不同逻辑
403    match config_source {
404        McpConfigSource::DirectUrl { url } => {
405            // 直接 URL 模式(带自动重连)
406            run_url_mode_with_retry(&args, &url, HashMap::new(), None, client_info, tool_filter, verbose, quiet).await
407        }
408        McpConfigSource::RemoteService { name, url, protocol, headers, timeout } => {
409            // 远程服务配置模式
410            if !quiet {
411                eprintln!("🚀 MCP-Stdio-Proxy: {} ({}) → stdio", name, url);
412            }
413            // 合并 headers:配置 + 命令行
414            let merged_headers = merge_headers(headers, &args.header, args.auth.as_ref());
415            run_url_mode_with_retry(&args, &url, merged_headers, protocol.or(timeout.map(|_| super::protocol::McpProtocol::Stream)), client_info, tool_filter, verbose, quiet).await
416        }
417        McpConfigSource::LocalCommand { name, command, args: cmd_args, env } => {
418            // 本地命令模式
419            run_command_mode(&name, &command, cmd_args, env, client_info, tool_filter, verbose, quiet).await
420        }
421    }
422}
423
424/// URL 模式执行(带自动重连)
425/// 先建立后端连接,成功后再启动 stdio server
426/// 当连接断开时 watchdog 会自动重连
427async fn run_url_mode_with_retry(
428    args: &ConvertArgs,
429    url: &str,
430    merged_headers: HashMap<String, String>,
431    config_protocol: Option<super::protocol::McpProtocol>,
432    client_info: ClientInfo,
433    tool_filter: ToolFilter,
434    verbose: bool,
435    quiet: bool,
436) -> Result<()> {
437    if !quiet && merged_headers.is_empty() {
438        eprintln!("🚀 MCP-Stdio-Proxy: {} → stdio", url);
439    }
440
441    // 显示过滤器配置
442    if !quiet {
443        if let Some(ref allow_tools) = args.allow_tools {
444            eprintln!("🔧 工具白名单: {:?}", allow_tools);
445        }
446        if let Some(ref deny_tools) = args.deny_tools {
447            eprintln!("🔧 工具黑名单: {:?}", deny_tools);
448        }
449        eprintln!("🔗 正在连接到后端服务...");
450    }
451
452    // 1. 先建立后端连接(带超时)
453    let connect_timeout = Duration::from_secs(30);
454    let initial_connect_result = tokio::time::timeout(
455        connect_timeout,
456        try_connect(
457            args,
458            url,
459            &merged_headers,
460            config_protocol.clone(),
461            client_info.clone(),
462            verbose,
463            quiet,
464            false, // is_retry = false
465        )
466    ).await;
467
468    let initial_running = match initial_connect_result {
469        Ok(Ok(running)) => running,
470        Ok(Err(e)) => {
471            bail!("连接后端失败: {}", e);
472        }
473        Err(_) => {
474            bail!("连接后端超时 ({}秒)", connect_timeout.as_secs());
475        }
476    };
477
478    if !quiet {
479        eprintln!("✅ 后端连接成功");
480    }
481
482    // 2. 使用已连接的后端创建 ProxyHandler
483    let proxy_handler = Arc::new(ProxyHandler::with_tool_filter(
484        initial_running,
485        "cli".to_string(),
486        tool_filter.clone(),
487    ));
488
489    // 3. 启动 stdio server
490    let stdio_transport = stdio();
491    let server = (*proxy_handler).clone().serve(stdio_transport).await?;
492
493    if !quiet {
494        eprintln!("💡 stdio server 已启动,开始代理转换...");
495    }
496
497    // 4. 启动 watchdog 任务(负责监控连接健康 + 断线重连)
498    // 注意:skip_initial_connect = true,因为我们已经建立了连接
499    let handler_for_watchdog = proxy_handler.clone();
500    let mut watchdog_handle = tokio::spawn(run_reconnection_watchdog(
501        handler_for_watchdog,
502        args.clone(),
503        url.to_string(),
504        merged_headers,
505        config_protocol,
506        client_info,
507        tool_filter,
508        verbose,
509        quiet,
510        true, // skip_initial_connect: 跳过首次连接,直接进入监控阶段
511    ));
512
513    // 5. 等待 stdio server 退出(通常是 stdin EOF)
514    tokio::select! {
515        result = server.waiting() => {
516            // stdio server 退出,清理 watchdog
517            watchdog_handle.abort();
518            result?;
519        }
520        watchdog_result = &mut watchdog_handle => {
521            // watchdog 异常退出(不应该发生)
522            if let Err(e) = watchdog_result {
523                if !e.is_cancelled() {
524                    error!("Watchdog task failed: {:?}", e);
525                }
526            }
527        }
528    }
529
530    Ok(())
531}
532
533/// 重连 watchdog:负责监控连接健康、断开时重连
534///
535/// - `skip_initial_connect`: 如果为 true,跳过首次连接尝试,直接进入监控阶段
536///   (用于已经建立连接的场景)
537async fn run_reconnection_watchdog(
538    handler: Arc<ProxyHandler>,
539    args: ConvertArgs,
540    url: String,
541    merged_headers: HashMap<String, String>,
542    config_protocol: Option<super::protocol::McpProtocol>,
543    client_info: ClientInfo,
544    _tool_filter: ToolFilter,  // 保留参数以便将来使用
545    verbose: bool,
546    quiet: bool,
547    skip_initial_connect: bool,
548) {
549    let max_retries = args.retries;
550    let mut attempt = 0u32;
551    let mut backoff_secs = 1u64;
552    const MAX_BACKOFF_SECS: u64 = 30;
553
554    // 如果跳过首次连接,直接进入监控阶段
555    if skip_initial_connect {
556        // 监控现有连接的健康状态
557        let disconnect_reason = monitor_connection(
558            &handler,
559            args.ping_interval,
560            args.ping_timeout,
561            quiet,
562        ).await;
563
564        // 连接断开,标记后端不可用
565        handler.swap_backend(None);
566
567        if !quiet {
568            eprintln!("⚠️  连接断开: {}", disconnect_reason);
569        }
570        // 继续进入重连循环,此时 attempt 从 0 开始
571        // 第一次进入循环时 attempt = 1,is_retry = true(因为这是断线重连)
572    }
573
574    loop {
575        attempt += 1;
576        // 如果跳过了首次连接,所有循环都是重连
577        let is_retry = skip_initial_connect || attempt > 1;
578
579        // 重连时显示日志
580        if is_retry && !quiet {
581            eprintln!("🔗 正在建立连接 (第{}次尝试)...", attempt);
582        }
583
584        // 每次连接需要新的 ClientInfo
585        let client_info_clone = ClientInfo {
586            protocol_version: client_info.protocol_version.clone(),
587            capabilities: client_info.capabilities.clone(),
588            client_info: client_info.client_info.clone(),
589        };
590
591        // 尝试建立连接
592        let connect_result = try_connect(
593            &args,
594            &url,
595            &merged_headers,
596            config_protocol.clone(),
597            client_info_clone,
598            verbose,
599            quiet,
600            is_retry,
601        ).await;
602
603        match connect_result {
604            Ok(running) => {
605                // 连接成功,热替换后端
606                handler.swap_backend(Some(running));
607                backoff_secs = 1; // 重置退避
608
609                if !quiet {
610                    if is_retry {
611                        eprintln!("✅ 重连成功,恢复代理服务");
612                    } else {
613                        eprintln!("✅ 连接成功,开始代理转换...");
614                    }
615
616                    // 打印工具列表(需要通过 handler 访问)
617                    // 由于 handler 现在有连接,可以直接使用
618                }
619
620                // 监控连接健康
621                let disconnect_reason = monitor_connection(
622                    &handler,
623                    args.ping_interval,
624                    args.ping_timeout,
625                    quiet,
626                ).await;
627
628                // 连接断开,标记后端不可用
629                handler.swap_backend(None);
630
631                if !quiet {
632                    eprintln!("⚠️  连接断开: {}", disconnect_reason);
633                }
634            }
635            Err(e) => {
636                // 连接失败
637                let error_type = classify_error(&e);
638
639                // 检查是否还有重试次数(0 表示无限重试)
640                if max_retries > 0 && attempt >= max_retries {
641                    if !quiet {
642                        eprintln!("❌ 连接失败,已达最大重试次数 ({})", max_retries);
643                        eprintln!("   错误类型: {}", error_type);
644                        eprintln!("   错误详情: {}", e);
645                    }
646                    // 达到最大重试次数,退出 watchdog(但 stdio server 继续运行)
647                    break;
648                }
649
650                if !quiet {
651                    if max_retries == 0 {
652                        eprintln!("⚠️  连接失败 [{}]: {},{}秒后重连 (第{}次)...",
653                            error_type, summarize_error(&e), backoff_secs, attempt);
654                    } else {
655                        eprintln!("⚠️  连接失败 [{}]: {},{}秒后重连 ({}/{})...",
656                            error_type, summarize_error(&e), backoff_secs, attempt, max_retries);
657                    }
658                }
659
660                // verbose 模式下显示完整错误
661                if verbose && !quiet {
662                    eprintln!("   完整错误: {}", e);
663                }
664            }
665        }
666
667        // 等待退避时间后重试
668        tokio::time::sleep(Duration::from_secs(backoff_secs)).await;
669
670        // 指数退避,但不超过最大值
671        backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS);
672    }
673}
674
675/// 尝试建立到远程 MCP 服务的连接
676async fn try_connect(
677    args: &ConvertArgs,
678    url: &str,
679    merged_headers: &HashMap<String, String>,
680    config_protocol: Option<super::protocol::McpProtocol>,
681    client_info: ClientInfo,
682    _verbose: bool,  // 保留参数以便将来使用
683    quiet: bool,
684    is_retry: bool,
685) -> Result<rmcp::service::RunningService<rmcp::RoleClient, ClientInfo>> {
686    // 确定协议类型:命令行参数 > 配置文件 > 自动检测
687    let protocol = if let Some(ref proto) = args.protocol {
688        let detected = match proto {
689            super::proxy_server::ProxyProtocol::Sse => super::protocol::McpProtocol::Sse,
690            super::proxy_server::ProxyProtocol::Stream => super::protocol::McpProtocol::Stream,
691        };
692        if !quiet && !is_retry {
693            eprintln!("🔧 使用指定协议: {}", protocol_name(&detected));
694        }
695        detected
696    } else if let Some(proto) = config_protocol {
697        if !quiet && !is_retry {
698            eprintln!("🔧 使用配置协议: {}", protocol_name(&proto));
699        }
700        proto
701    } else {
702        let detected = super::protocol::detect_mcp_protocol(url).await?;
703        if !quiet && !is_retry {
704            eprintln!("🔍 检测到 {} 协议", protocol_name(&detected));
705        }
706        detected
707    };
708
709    if !quiet && !is_retry {
710        eprintln!("🔗 建立连接...");
711    }
712
713    // 构建 HTTP 客户端
714    let http_client = create_http_client_with_headers(merged_headers, &args.header, args.auth.as_ref())?;
715
716    // 创建传输并启动 rmcp 客户端
717    let running = match protocol {
718        super::protocol::McpProtocol::Sse => {
719            let cfg = SseClientConfig {
720                sse_endpoint: url.to_string().into(),
721                ..Default::default()
722            };
723            let transport = SseClientTransport::start_with_client(http_client, cfg).await?;
724            client_info.serve(transport).await?
725        }
726        super::protocol::McpProtocol::Stream => {
727            let cfg = StreamableHttpClientTransportConfig {
728                uri: url.to_string().into(),
729                ..Default::default()
730            };
731            let transport = StreamableHttpClientTransport::with_client(http_client, cfg);
732            client_info.serve(transport).await?
733        }
734        super::protocol::McpProtocol::Stdio => {
735            bail!("Stdio 协议不支持通过 URL 转换,请使用 --config 配置本地命令")
736        }
737    };
738
739    // 打印工具列表
740    if !quiet {
741        match running.list_tools(None).await {
742            Ok(tools_result) => {
743                let tools = &tools_result.tools;
744                if tools.is_empty() {
745                    eprintln!("⚠️  工具列表为空 (tools/list 返回 0 个工具)");
746                } else {
747                    eprintln!("🔧 可用工具 ({} 个):", tools.len());
748                    for tool in tools {
749                        let desc = tool.description.as_deref().unwrap_or("无描述");
750                        let desc_short = if desc.chars().count() > 50 {
751                            format!("{}...", desc.chars().take(50).collect::<String>())
752                        } else {
753                            desc.to_string()
754                        };
755                        eprintln!("   - {} : {}", tool.name, desc_short);
756                    }
757                }
758            }
759            Err(e) => {
760                eprintln!("⚠️  获取工具列表失败: {}", e);
761            }
762        }
763
764        if args.ping_interval > 0 && !is_retry {
765            eprintln!("💓 心跳检测: 每 {}s ping 一次(超时 {}s)", args.ping_interval, args.ping_timeout);
766        }
767    }
768
769    Ok(running)
770}
771
772/// 监控连接健康,返回断开原因
773async fn monitor_connection(
774    handler: &ProxyHandler,
775    ping_interval: u64,
776    ping_timeout: u64,
777    quiet: bool,
778) -> String {
779    if ping_interval == 0 {
780        // 无 ping 模式,等待后端自然断开
781        loop {
782            tokio::time::sleep(Duration::from_secs(1)).await;
783            if !handler.is_backend_available() {
784                return "后端连接已关闭".to_string();
785            }
786        }
787    }
788
789    let mut interval = tokio::time::interval(Duration::from_secs(ping_interval));
790    interval.tick().await; // 跳过第一次立即触发
791
792    loop {
793        interval.tick().await;
794
795        // 快速检查后端可用性
796        if !handler.is_backend_available() {
797            return "后端连接已关闭".to_string();
798        }
799
800        // 异步检查(发送请求验证)
801        let check_result = tokio::time::timeout(
802            Duration::from_secs(ping_timeout),
803            handler.is_terminated_async()
804        ).await;
805
806        match check_result {
807            Ok(true) => {
808                // 连接已断开
809                return "Ping 检测失败(服务错误)".to_string();
810            }
811            Ok(false) => {
812                // 连接正常,继续
813            }
814            Err(_) => {
815                // 超时
816                if !quiet {
817                    eprintln!("❌ Ping 检测超时({}s)", ping_timeout);
818                }
819                return format!("Ping 检测超时({}s)", ping_timeout);
820            }
821        }
822    }
823}
824
825/// 错误分类
826fn classify_error(e: &anyhow::Error) -> &'static str {
827    let err_str = e.to_string().to_lowercase();
828
829    if err_str.contains("timeout") || err_str.contains("timed out") {
830        "超时"
831    } else if err_str.contains("connection refused") {
832        "连接被拒绝"
833    } else if err_str.contains("connection reset") {
834        "连接被重置"
835    } else if err_str.contains("dns") || err_str.contains("resolve") {
836        "DNS解析失败"
837    } else if err_str.contains("certificate") || err_str.contains("ssl") || err_str.contains("tls") {
838        "SSL/TLS错误"
839    } else if err_str.contains("session") {
840        "会话错误"
841    } else if err_str.contains("sending request") || err_str.contains("network") {
842        "网络错误"
843    } else if err_str.contains("eof") || err_str.contains("closed") || err_str.contains("shutdown") {
844        "连接关闭"
845    } else {
846        "未知错误"
847    }
848}
849
850/// 简化错误信息(用于单行日志)
851fn summarize_error(e: &anyhow::Error) -> String {
852    let full = e.to_string();
853    // 截取第一行或前80个字符
854    let first_line = full.lines().next().unwrap_or(&full);
855    // 使用 chars() 安全处理 UTF-8 字符,避免在多字节字符中间截断
856    if first_line.chars().count() > 80 {
857        format!("{}...", first_line.chars().take(77).collect::<String>())
858    } else {
859        first_line.to_string()
860    }
861}
862
863/// 命令模式执行(本地子进程)
864async fn run_command_mode(
865    name: &str,
866    command: &str,
867    cmd_args: Vec<String>,
868    env: HashMap<String, String>,
869    client_info: ClientInfo,
870    tool_filter: ToolFilter,
871    verbose: bool,
872    quiet: bool,
873) -> Result<()> {
874    if !quiet {
875        eprintln!("🚀 MCP-Stdio-Proxy: {} (command) → stdio", name);
876        eprintln!("   命令: {} {:?}", command, cmd_args);
877        if verbose && !env.is_empty() {
878            eprintln!("   环境变量: {:?}", env);
879        }
880    }
881
882    // 显示过滤器配置
883    if !quiet {
884        if tool_filter.is_enabled() {
885            eprintln!("🔧 工具过滤已启用");
886        }
887    }
888
889    // 创建子进程命令
890    let mut cmd = Command::new(command);
891    cmd.args(&cmd_args);
892    for (k, v) in &env {
893        cmd.env(k, v);
894    }
895
896    // 启动子进程
897    let tokio_process = TokioChildProcess::new(cmd)?;
898
899    if !quiet {
900        eprintln!("🔗 启动子进程...");
901    }
902
903    // 连接到子进程
904    let running = client_info.serve(tokio_process).await?;
905
906    if !quiet {
907        eprintln!("✅ 子进程已启动,开始代理转换...");
908
909        // 打印工具列表
910        match running.list_tools(None).await {
911            Ok(tools_result) => {
912                let tools = &tools_result.tools;
913                if tools.is_empty() {
914                    eprintln!("⚠️  工具列表为空 (tools/list 返回 0 个工具)");
915                } else {
916                    eprintln!("🔧 可用工具 ({} 个):", tools.len());
917                    for tool in tools {
918                        let desc = tool.description.as_deref().unwrap_or("无描述");
919                        let desc_short = if desc.chars().count() > 50 {
920                            format!("{}...", desc.chars().take(50).collect::<String>())
921                        } else {
922                            desc.to_string()
923                        };
924                        eprintln!("   - {} : {}", tool.name, desc_short);
925                    }
926                }
927            }
928            Err(e) => {
929                eprintln!("⚠️  获取工具列表失败: {}", e);
930            }
931        }
932
933        eprintln!("💡 现在可以通过 stdin 发送 JSON-RPC 请求");
934    }
935
936    // 使用 ProxyHandler + stdio 将本地 MCP 服务透明暴露为 stdio
937    let proxy_handler = ProxyHandler::with_tool_filter(running, name.to_string(), tool_filter);
938    let stdio_transport = stdio();
939    let server = proxy_handler.serve(stdio_transport).await?;
940    server.waiting().await?;
941
942    Ok(())
943}
944
945/// 获取协议名称
946fn protocol_name(protocol: &super::protocol::McpProtocol) -> &'static str {
947    match protocol {
948        super::protocol::McpProtocol::Sse => "SSE",
949        super::protocol::McpProtocol::Stream => "Streamable HTTP",
950        super::protocol::McpProtocol::Stdio => "Stdio",
951    }
952}
953
954/// 创建 HTTP 客户端(使用合并后的 headers)
955fn create_http_client_with_headers(
956    config_headers: &HashMap<String, String>,
957    cli_headers: &[(String, String)],
958    cli_auth: Option<&String>,
959) -> Result<reqwest::Client> {
960    let mut headers = reqwest::header::HeaderMap::new();
961
962    // 1. 先添加配置中的 headers
963    for (key, value) in config_headers {
964        headers.insert(
965            key.parse::<reqwest::header::HeaderName>()?,
966            value.parse()?,
967        );
968    }
969
970    // 2. 命令行 -H 参数覆盖
971    for (key, value) in cli_headers {
972        headers.insert(
973            key.parse::<reqwest::header::HeaderName>()?,
974            value.parse()?,
975        );
976    }
977
978    // 3. 命令行 --auth 参数优先级最高
979    if let Some(auth) = cli_auth {
980        headers.insert("Authorization", auth.parse()?);
981    }
982
983    let client = reqwest::Client::builder()
984        .default_headers(headers)
985        .build()?;
986
987    Ok(client)
988}
989
990/// 运行检查命令
991async fn run_check_command(args: CheckArgs, _verbose: bool, quiet: bool) -> Result<()> {
992    if !quiet {
993        eprintln!("🔍 检查服务: {}", args.url);
994    }
995
996    match super::protocol::detect_mcp_protocol(&args.url).await {
997        Ok(protocol) => {
998            if !quiet {
999                eprintln!("✅ 服务正常,检测到 {} 协议", protocol);
1000            }
1001            Ok(())
1002        }
1003        Err(e) => {
1004            if !quiet {
1005                eprintln!("❌ 服务检查失败: {}", e);
1006            }
1007            Err(e)
1008        }
1009    }
1010}
1011
1012/// 运行协议检测命令
1013async fn run_detect_command(args: DetectArgs, _verbose: bool, quiet: bool) -> Result<()> {
1014    let protocol = super::protocol::detect_mcp_protocol(&args.url).await?;
1015
1016    if quiet {
1017        println!("{}", protocol);
1018    } else {
1019        eprintln!("{}", protocol);
1020    }
1021
1022    Ok(())
1023}