Skip to main content

smcp_computer/mcp_clients/
stdio_client.rs

1/**
2* 文件名: stdio_client
3* 作者: JQQ
4* 创建日期: 2025/12/15
5* 最后修改日期: 2025/12/15
6* 版权: 2023 JQQ. All rights reserved.
7* 依赖: tokio, serde_json
8* 描述: STDIO类型的MCP客户端实现
9*/
10use super::base_client::BaseMCPClient;
11use super::model::*;
12use super::{ResourceCache, SubscriptionManager};
13use crate::desktop::window_uri::{is_window_uri, WindowURI};
14use async_trait::async_trait;
15use serde_json;
16use std::process::Stdio;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::io::{AsyncBufReadExt, BufReader};
20use tokio::process::{Child, Command};
21use tokio::sync::Mutex;
22use tracing::{debug, error, info, warn};
23
24/// STDIO MCP客户端 / STDIO MCP client
25pub struct StdioMCPClient {
26    /// 基础客户端 / Base client
27    base: BaseMCPClient<StdioServerParameters>,
28    /// 子进程 / Child process
29    child_process: Arc<Mutex<Option<Child>>>,
30    /// 会话ID / Session ID
31    session_id: Arc<Mutex<Option<String>>>,
32    /// 订阅管理器 / Subscription manager
33    subscription_manager: SubscriptionManager,
34    /// 资源缓存 / Resource cache
35    resource_cache: ResourceCache,
36}
37
38impl std::fmt::Debug for StdioMCPClient {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("StdioMCPClient")
41            .field("command", &self.base.params.command)
42            .field("args", &self.base.params.args)
43            .field("state", &self.base.state())
44            .finish()
45    }
46}
47
48impl StdioMCPClient {
49    /// 创建新的STDIO客户端 / Create new STDIO client
50    pub fn new(params: StdioServerParameters) -> Self {
51        Self {
52            base: BaseMCPClient::new(params),
53            child_process: Arc::new(Mutex::new(None)),
54            session_id: Arc::new(Mutex::new(None)),
55            subscription_manager: SubscriptionManager::new(),
56            resource_cache: ResourceCache::new(Duration::from_secs(60)), // 默认 60 秒 TTL
57        }
58    }
59
60    /// 启动子进程 / Start child process
61    async fn start_child_process(
62        &self,
63        params: &StdioServerParameters,
64    ) -> Result<Child, MCPClientError> {
65        let mut cmd = Command::new(&params.command);
66
67        // 设置参数 / Set arguments
68        cmd.args(&params.args);
69
70        // 设置环境变量 / Set environment variables
71        for (key, value) in &params.env {
72            cmd.env(key, value);
73        }
74
75        // 设置工作目录 / Set working directory
76        if let Some(cwd) = &params.cwd {
77            cmd.current_dir(cwd);
78        }
79
80        // 配置stdio / Configure stdio
81        cmd.stdin(Stdio::piped())
82            .stdout(Stdio::piped())
83            .stderr(Stdio::piped());
84
85        debug!("Starting command: {} {:?}", params.command, params.args);
86
87        let child = cmd.spawn().map_err(|e| {
88            MCPClientError::ConnectionError(format!("Failed to start process: {}", e))
89        })?;
90
91        Ok(child)
92    }
93
94    /// 发送JSON-RPC请求 / Send JSON-RPC request
95    /// 发送通知(不需要响应) / Send notification (no response expected)
96    async fn send_notification(
97        &self,
98        notification: &serde_json::Value,
99    ) -> Result<(), MCPClientError> {
100        let mut child = self.child_process.lock().await;
101        if let Some(ref mut process) = *child {
102            if let Some(stdin) = process.stdin.as_mut() {
103                let notification_str = serde_json::to_string(notification)?;
104                use tokio::io::AsyncWriteExt;
105                stdin.write_all(notification_str.as_bytes()).await?;
106                stdin.write_all(b"\n").await?;
107                stdin.flush().await?;
108
109                debug!("Sent notification: {}", notification_str);
110                info!("Sent notification to MCP server: {}", notification_str);
111                return Ok(());
112            }
113        }
114        Err(MCPClientError::ConnectionError(
115            "Process not available".to_string(),
116        ))
117    }
118
119    async fn send_request(
120        &self,
121        request: &serde_json::Value,
122    ) -> Result<serde_json::Value, MCPClientError> {
123        let mut child = self.child_process.lock().await;
124        if let Some(ref mut process) = *child {
125            if let Some(stdin) = process.stdin.as_mut() {
126                let request_str = serde_json::to_string(request)?;
127                use tokio::io::AsyncWriteExt;
128                stdin.write_all(request_str.as_bytes()).await?;
129                stdin.write_all(b"\n").await?;
130                stdin.flush().await?;
131
132                debug!("Sent request: {}", request_str);
133                info!("Sent request to MCP server: {}", request_str);
134
135                // 读取响应 / Read response
136                if let Some(stdout) = process.stdout.as_mut() {
137                    let mut reader = BufReader::new(stdout);
138                    let mut line = String::new();
139
140                    info!("Waiting for response from MCP server...");
141
142                    // 添加超时以防止无限阻塞
143                    return match tokio::time::timeout(
144                        std::time::Duration::from_secs(30),
145                        reader.read_line(&mut line),
146                    )
147                    .await
148                    {
149                        Ok(Ok(0)) => {
150                            // Try to read stderr for diagnostic info
151                            let mut stderr_output = String::new();
152                            if let Some(stderr) = process.stderr.as_mut() {
153                                let mut stderr_reader = BufReader::new(stderr);
154                                let _ = tokio::time::timeout(
155                                    std::time::Duration::from_secs(2),
156                                    async {
157                                        loop {
158                                            let mut buf = String::new();
159                                            match stderr_reader.read_line(&mut buf).await {
160                                                Ok(0) | Err(_) => break,
161                                                Ok(_) => stderr_output.push_str(&buf),
162                                            }
163                                        }
164                                    },
165                                )
166                                .await;
167                            }
168                            if stderr_output.is_empty() {
169                                error!("Process closed stdout without response (no stderr output)");
170                            } else {
171                                error!(
172                                    "Process closed stdout without response. stderr: {}",
173                                    stderr_output.trim()
174                                );
175                            }
176                            Err(MCPClientError::ConnectionError(format!(
177                                "Process closed stdout. stderr: {}",
178                                stderr_output.trim()
179                            )))
180                        }
181                        Ok(Ok(_)) => {
182                            info!("Received raw response: {}", line.trim());
183                            debug!("Received response: {}", line.trim());
184                            let response: serde_json::Value = serde_json::from_str(line.trim())
185                                .map_err(|e| {
186                                    error!("Failed to parse JSON response: {}", e);
187                                    MCPClientError::ProtocolError(format!("Invalid JSON: {}", e))
188                                })?;
189                            info!("Parsed JSON response: {}", response);
190                            Ok(response)
191                        }
192                        Ok(Err(e)) => Err(MCPClientError::ConnectionError(format!(
193                            "Failed to read response: {}",
194                            e
195                        ))),
196                        Err(_) => Err(MCPClientError::TimeoutError(
197                            "No response received within timeout".to_string(),
198                        )),
199                    };
200                }
201            }
202        }
203
204        Err(MCPClientError::ConnectionError(
205            "Process not running".to_string(),
206        ))
207    }
208
209    /// 初始化会话 / Initialize session
210    async fn initialize_session(&self) -> Result<(), MCPClientError> {
211        let init_request = serde_json::json!({
212            "jsonrpc": "2.0",
213            "id": 1,
214            "method": "initialize",
215            "params": {
216                "protocolVersion": "2024-11-05",
217                "capabilities": {
218                    "tools": {},
219                    "resources": {}
220                },
221                "clientInfo": {
222                    "name": "a2c-smcp-rust",
223                    "version": "0.1.0"
224                }
225            }
226        });
227
228        let response = self.send_request(&init_request).await?;
229
230        // 检查响应 / Check response
231        if let Some(error) = response.get("error") {
232            return Err(MCPClientError::ProtocolError(format!(
233                "Initialize error: {}",
234                error
235            )));
236        }
237
238        if let Some(result) = response.get("result") {
239            if let Some(session_id) = result.get("sessionId").and_then(|v| v.as_str()) {
240                *self.session_id.lock().await = Some(session_id.to_string());
241            }
242        }
243
244        // 发送initialized通知 / Send initialized notification
245        let initialized_notification = serde_json::json!({
246            "jsonrpc": "2.0",
247            "method": "notifications/initialized"
248        });
249
250        // 通知不需要响应 / Notifications don't need response
251        self.send_notification(&initialized_notification).await?;
252
253        info!("Session initialized successfully");
254        Ok(())
255    }
256
257    // ========== 订阅管理 API / Subscription Management API ==========
258
259    /// 检查是否已订阅指定资源
260    pub async fn is_subscribed(&self, uri: &str) -> bool {
261        self.subscription_manager.is_subscribed(uri).await
262    }
263
264    /// 获取所有订阅的 URI 列表
265    pub async fn get_subscriptions(&self) -> Vec<String> {
266        self.subscription_manager.get_subscriptions().await
267    }
268
269    /// 获取订阅数量
270    pub async fn subscription_count(&self) -> usize {
271        self.subscription_manager.subscription_count().await
272    }
273
274    // ========== 资源缓存 API / Resource Cache API ==========
275
276    /// 获取缓存的资源数据
277    pub async fn get_cached_resource(&self, uri: &str) -> Option<serde_json::Value> {
278        self.resource_cache.get(uri).await
279    }
280
281    /// 检查资源是否已缓存
282    pub async fn has_cache(&self, uri: &str) -> bool {
283        self.resource_cache.contains(uri).await
284    }
285
286    /// 获取缓存大小
287    pub async fn cache_size(&self) -> usize {
288        self.resource_cache.size().await
289    }
290
291    /// 清理过期的缓存
292    pub async fn cleanup_cache(&self) -> usize {
293        self.resource_cache.cleanup_expired().await
294    }
295
296    /// 清空所有缓存
297    pub async fn clear_cache(&self) {
298        self.resource_cache.clear().await
299    }
300
301    /// 获取所有缓存的 URI 列表
302    pub async fn cache_keys(&self) -> Vec<String> {
303        self.resource_cache.keys().await
304    }
305}
306
307#[async_trait]
308impl MCPClientProtocol for StdioMCPClient {
309    fn state(&self) -> ClientState {
310        self.base.state()
311    }
312
313    async fn connect(&self) -> Result<(), MCPClientError> {
314        // 检查是否可以连接 / Check if can connect
315        if !self.base.can_connect().await {
316            return Err(MCPClientError::ConnectionError(format!(
317                "Cannot connect in state: {}",
318                self.base.get_state().await
319            )));
320        }
321
322        // 获取参数 / Get parameters
323        let params = self.base.params.clone();
324
325        // 启动子进程 / Start child process
326        let child = self.start_child_process(&params).await?;
327        *self.child_process.lock().await = Some(child);
328
329        // 初始化会话 / Initialize session
330        self.initialize_session().await?;
331
332        // 更新状态 / Update state
333        self.base.update_state(ClientState::Connected).await;
334        info!("STDIO client connected successfully");
335
336        Ok(())
337    }
338
339    async fn disconnect(&self) -> Result<(), MCPClientError> {
340        // 检查是否可以断开 / Check if can disconnect
341        if !self.base.can_disconnect().await {
342            return Err(MCPClientError::ConnectionError(format!(
343                "Cannot disconnect in state: {}",
344                self.base.get_state().await
345            )));
346        }
347
348        // 停止子进程 / Stop child process
349        let mut child = self.child_process.lock().await;
350        if let Some(mut process) = child.take() {
351            // 尝试优雅关闭 / Try graceful shutdown
352            let shutdown_request = serde_json::json!({
353                "jsonrpc": "2.0",
354                "id": 2,
355                "method": "shutdown"
356            });
357
358            // 直接写入而不调用 send_request 以避免死锁
359            if let Some(stdin) = process.stdin.as_mut() {
360                let request_str = serde_json::to_string(&shutdown_request)?;
361                use tokio::io::AsyncWriteExt;
362                if let Err(e) = stdin.write_all(request_str.as_bytes()).await {
363                    warn!("Failed to send shutdown request: {}", e);
364                } else {
365                    let _ = stdin.write_all(b"\n").await;
366                    let _ = stdin.flush().await;
367                }
368            }
369
370            // 发送exit通知 / Send exit notification
371            let exit_notification = serde_json::json!({
372                "jsonrpc": "2.0",
373                "method": "exit"
374            });
375
376            if let Some(stdin) = process.stdin.as_mut() {
377                let request_str = serde_json::to_string(&exit_notification)?;
378                use tokio::io::AsyncWriteExt;
379                if let Err(e) = stdin.write_all(request_str.as_bytes()).await {
380                    warn!("Failed to send exit notification: {}", e);
381                } else {
382                    let _ = stdin.write_all(b"\n").await;
383                    let _ = stdin.flush().await;
384                }
385            }
386
387            // 释放锁,然后等待进程退出
388            drop(child);
389
390            // 等待进程退出或强制杀死 / Wait for process exit or force kill
391            match tokio::time::timeout(std::time::Duration::from_secs(5), process.wait()).await {
392                Ok(Ok(status)) => {
393                    debug!("Process exited with status: {}", status);
394                }
395                Ok(Err(e)) => {
396                    error!("Error waiting for process: {}", e);
397                }
398                Err(_) => {
399                    warn!("Process did not exit within timeout, killing it");
400                    if let Err(e) = process.kill().await {
401                        error!("Failed to kill process: {}", e);
402                    }
403                }
404            }
405        } else {
406            // 没有进程时也要释放锁
407            drop(child);
408        }
409
410        // 清理会话ID / Clear session ID
411        *self.session_id.lock().await = None;
412
413        // 更新状态 / Update state
414        self.base.update_state(ClientState::Disconnected).await;
415        info!("STDIO client disconnected successfully");
416
417        Ok(())
418    }
419
420    async fn list_tools(&self) -> Result<Vec<Tool>, MCPClientError> {
421        if self.base.get_state().await != ClientState::Connected {
422            return Err(MCPClientError::ConnectionError("Not connected".to_string()));
423        }
424
425        let request = serde_json::json!({
426            "jsonrpc": "2.0",
427            "id": 3,
428            "method": "tools/list"
429        });
430
431        let response = self.send_request(&request).await?;
432        info!("Received list_tools response: {}", response);
433
434        if let Some(error) = response.get("error") {
435            return Err(MCPClientError::ProtocolError(format!(
436                "List tools error: {}",
437                error
438            )));
439        }
440
441        if let Some(result) = response.get("result") {
442            info!("Result field: {}", result);
443            if let Some(tools) = result.get("tools").and_then(|v| v.as_array()) {
444                info!("Found {} tools", tools.len());
445                let mut tool_list = Vec::new();
446                for (i, tool) in tools.iter().enumerate() {
447                    info!("Tool {}: {}", i, tool);
448                    if let Ok(parsed_tool) = serde_json::from_value::<Tool>(tool.clone()) {
449                        tool_list.push(parsed_tool);
450                    } else {
451                        warn!("Failed to parse tool {}: {}", i, tool);
452                    }
453                }
454                return Ok(tool_list);
455            } else {
456                warn!("No tools array found in result");
457            }
458        } else {
459            warn!("No result field found in response");
460        }
461
462        Ok(vec![])
463    }
464
465    async fn call_tool(
466        &self,
467        tool_name: &str,
468        params: serde_json::Value,
469    ) -> Result<CallToolResult, MCPClientError> {
470        if self.base.get_state().await != ClientState::Connected {
471            return Err(MCPClientError::ConnectionError("Not connected".to_string()));
472        }
473
474        let request = serde_json::json!({
475            "jsonrpc": "2.0",
476            "id": 4,
477            "method": "tools/call",
478            "params": {
479                "name": tool_name,
480                "arguments": params
481            }
482        });
483
484        let response = self.send_request(&request).await?;
485
486        if let Some(error) = response.get("error") {
487            return Err(MCPClientError::ProtocolError(format!(
488                "Call tool error: {}",
489                error
490            )));
491        }
492
493        if let Some(result) = response.get("result") {
494            let call_result: CallToolResult = serde_json::from_value(result.clone())?;
495            return Ok(call_result);
496        }
497
498        Err(MCPClientError::ProtocolError(
499            "Invalid response".to_string(),
500        ))
501    }
502
503    async fn list_windows(&self) -> Result<Vec<Resource>, MCPClientError> {
504        if self.base.get_state().await != ClientState::Connected {
505            return Err(MCPClientError::ConnectionError("Not connected".to_string()));
506        }
507
508        // 支持分页获取资源 / Support pagination for resources
509        let mut all_resources = Vec::new();
510        let mut cursor: Option<String> = None;
511
512        loop {
513            let mut request = serde_json::json!({
514                "jsonrpc": "2.0",
515                "id": 5,
516                "method": "resources/list"
517            });
518
519            // 添加分页参数 / Add pagination parameter
520            if let Some(ref c) = cursor {
521                request["params"] = serde_json::json!({ "cursor": c });
522            }
523
524            let response = self.send_request(&request).await?;
525
526            if let Some(error) = response.get("error") {
527                return Err(MCPClientError::ProtocolError(format!(
528                    "List resources error: {}",
529                    error
530                )));
531            }
532
533            if let Some(result) = response.get("result") {
534                // 解析资源列表 / Parse resource list
535                if let Some(resources) = result.get("resources").and_then(|v| v.as_array()) {
536                    for resource in resources {
537                        if let Ok(parsed_resource) =
538                            serde_json::from_value::<Resource>(resource.clone())
539                        {
540                            all_resources.push(parsed_resource);
541                        }
542                    }
543                }
544
545                // 检查是否有下一页 / Check if there's a next page
546                cursor = result
547                    .get("nextCursor")
548                    .and_then(|v| v.as_str())
549                    .map(|s| s.to_string());
550
551                if cursor.is_none() {
552                    break;
553                }
554            } else {
555                break;
556            }
557        }
558
559        // 过滤 window:// 资源并按 priority 排序 / Filter window:// resources and sort by priority
560        let mut filtered_resources: Vec<(Resource, i32)> = Vec::new();
561
562        for resource in all_resources {
563            if !is_window_uri(&resource.uri) {
564                continue;
565            }
566
567            // 解析 priority / Parse priority
568            let priority = if let Ok(uri) = WindowURI::new(&resource.uri) {
569                uri.priority().unwrap_or(0)
570            } else {
571                0
572            };
573
574            filtered_resources.push((resource, priority));
575        }
576
577        // 按 priority 降序排序 / Sort by priority in descending order
578        filtered_resources.sort_by(|a, b| b.1.cmp(&a.1));
579
580        // 返回仅包含 Resource 的列表 / Return list containing only Resource
581        Ok(filtered_resources.into_iter().map(|(r, _)| r).collect())
582    }
583
584    async fn get_window_detail(
585        &self,
586        resource: Resource,
587    ) -> Result<ReadResourceResult, MCPClientError> {
588        if self.base.get_state().await != ClientState::Connected {
589            return Err(MCPClientError::ConnectionError("Not connected".to_string()));
590        }
591
592        let request = serde_json::json!({
593            "jsonrpc": "2.0",
594            "id": 6,
595            "method": "resources/read",
596            "params": {
597                "uri": resource.uri
598            }
599        });
600
601        let response = self.send_request(&request).await?;
602
603        if let Some(error) = response.get("error") {
604            return Err(MCPClientError::ProtocolError(format!(
605                "Read resource error: {}",
606                error
607            )));
608        }
609
610        if let Some(result) = response.get("result") {
611            let read_result: ReadResourceResult = serde_json::from_value(result.clone())?;
612            return Ok(read_result);
613        }
614
615        Err(MCPClientError::ProtocolError(
616            "Invalid response".to_string(),
617        ))
618    }
619
620    async fn subscribe_window(&self, resource: Resource) -> Result<(), MCPClientError> {
621        if self.base.get_state().await != ClientState::Connected {
622            return Err(MCPClientError::ConnectionError("Not connected".to_string()));
623        }
624
625        let request = serde_json::json!({
626            "jsonrpc": "2.0",
627            "id": 7,
628            "method": "resources/subscribe",
629            "params": {
630                "uri": resource.uri
631            }
632        });
633
634        let response = self.send_request(&request).await?;
635
636        if let Some(error) = response.get("error") {
637            return Err(MCPClientError::ProtocolError(format!(
638                "Subscribe resource error: {}",
639                error
640            )));
641        }
642
643        // 订阅成功后,更新本地订阅状态
644        let _ = self
645            .subscription_manager
646            .add_subscription(resource.uri.clone())
647            .await;
648
649        // 立即获取并缓存资源数据
650        match self.get_window_detail(resource.clone()).await {
651            Ok(result) => {
652                // 将 contents 转换为 JSON Value
653                if !result.contents.is_empty() {
654                    // 取第一个内容并转换为 JSON
655                    if let Ok(json_value) = serde_json::to_value(&result.contents[0]) {
656                        self.resource_cache
657                            .set(resource.uri.clone(), json_value, None)
658                            .await;
659                        info!("Subscribed and cached: {}", resource.uri);
660                    }
661                }
662            }
663            Err(e) => {
664                warn!("Failed to fetch resource data after subscription: {:?}", e);
665            }
666        }
667
668        Ok(())
669    }
670
671    async fn unsubscribe_window(&self, resource: Resource) -> Result<(), MCPClientError> {
672        if self.base.get_state().await != ClientState::Connected {
673            return Err(MCPClientError::ConnectionError("Not connected".to_string()));
674        }
675
676        let request = serde_json::json!({
677            "jsonrpc": "2.0",
678            "id": 8,
679            "method": "resources/unsubscribe",
680            "params": {
681                "uri": resource.uri
682            }
683        });
684
685        let response = self.send_request(&request).await?;
686
687        if let Some(error) = response.get("error") {
688            return Err(MCPClientError::ProtocolError(format!(
689                "Unsubscribe resource error: {}",
690                error
691            )));
692        }
693
694        // 取消订阅成功后,移除本地订阅状态
695        let _ = self
696            .subscription_manager
697            .remove_subscription(&resource.uri)
698            .await;
699
700        // 清理缓存
701        self.resource_cache.remove(&resource.uri).await;
702        info!("Unsubscribed and removed cache: {}", resource.uri);
703
704        Ok(())
705    }
706}
707
708#[cfg(test)]
709mod tests {
710    use super::*;
711    use serde_json::json;
712    use std::collections::HashMap;
713    use tokio::time::{sleep, Duration};
714
715    #[tokio::test]
716    async fn test_stdio_client_creation() {
717        let params = StdioServerParameters {
718            command: "echo".to_string(),
719            args: vec!["hello".to_string()],
720            env: HashMap::new(),
721            cwd: None,
722        };
723
724        let client = StdioMCPClient::new(params);
725        assert_eq!(client.state(), ClientState::Initialized);
726        assert_eq!(client.base.params.command, "echo");
727    }
728
729    #[tokio::test]
730    async fn test_stdio_client_with_env() {
731        let mut env = HashMap::new();
732        env.insert("TEST_VAR".to_string(), "test_value".to_string());
733        env.insert("PATH".to_string(), "/usr/bin".to_string());
734
735        let params = StdioServerParameters {
736            command: "echo".to_string(),
737            args: vec!["test".to_string()],
738            env,
739            cwd: Some("/tmp".to_string()),
740        };
741
742        let client = StdioMCPClient::new(params);
743        assert_eq!(
744            client.base.params.env.get("TEST_VAR"),
745            Some(&"test_value".to_string())
746        );
747        assert_eq!(client.base.params.cwd, Some("/tmp".to_string()));
748    }
749
750    #[tokio::test]
751    async fn test_session_id_management() {
752        let params = StdioServerParameters {
753            command: "echo".to_string(),
754            args: vec!["test".to_string()],
755            env: HashMap::new(),
756            cwd: None,
757        };
758
759        let client = StdioMCPClient::new(params);
760
761        // 初始会话ID应该为空 / Initial session ID should be None
762        let session_id = client.session_id.lock().await;
763        assert!(session_id.is_none());
764        drop(session_id);
765
766        // 设置会话ID / Set session ID
767        *client.session_id.lock().await = Some("session123".to_string());
768        let session_id = client.session_id.lock().await;
769        assert_eq!(session_id.as_ref().unwrap(), "session123");
770    }
771
772    #[tokio::test]
773    async fn test_start_child_process_with_echo() {
774        let params = StdioServerParameters {
775            command: "echo".to_string(),
776            args: vec!["hello world".to_string()],
777            env: HashMap::new(),
778            cwd: None,
779        };
780
781        let client = StdioMCPClient::new(params);
782
783        // 启动子进程
784        let result = client.start_child_process(&client.base.params).await;
785        assert!(result.is_ok());
786
787        // 子进程应该成功启动
788        let mut child = result.unwrap();
789
790        // 等待一小段时间让进程运行
791        sleep(Duration::from_millis(100)).await;
792
793        // 尝试杀死进程(清理)
794        let _ = child.kill().await;
795    }
796
797    #[tokio::test]
798    async fn test_start_child_process_with_invalid_command() {
799        let params = StdioServerParameters {
800            command: "nonexistent_command_12345".to_string(),
801            args: vec![],
802            env: HashMap::new(),
803            cwd: None,
804        };
805
806        let client = StdioMCPClient::new(params.clone());
807
808        // 启动不存在的命令应该失败
809        let result = client.start_child_process(&params).await;
810        assert!(result.is_err());
811        assert!(matches!(
812            result.unwrap_err(),
813            MCPClientError::ConnectionError(_)
814        ));
815    }
816
817    #[tokio::test]
818    async fn test_send_request_without_process() {
819        let params = StdioServerParameters {
820            command: "echo".to_string(),
821            args: vec!["test".to_string()],
822            env: HashMap::new(),
823            cwd: None,
824        };
825
826        let client = StdioMCPClient::new(params);
827
828        // 没有进程时发送请求应该失败
829        let request = json!({"jsonrpc": "2.0", "method": "test"});
830        let result = client.send_request(&request).await;
831        assert!(result.is_err());
832        assert!(matches!(
833            result.unwrap_err(),
834            MCPClientError::ConnectionError(_)
835        ));
836    }
837
838    #[tokio::test]
839    async fn test_connect_state_checks() {
840        let params = StdioServerParameters {
841            command: "echo".to_string(),
842            args: vec!["test".to_string()],
843            env: HashMap::new(),
844            cwd: None,
845        };
846
847        let client = StdioMCPClient::new(params);
848
849        // 在已连接状态下尝试连接应该失败
850        client.base.update_state(ClientState::Connected).await;
851        let result = client.connect().await;
852        assert!(result.is_err());
853        assert!(matches!(
854            result.unwrap_err(),
855            MCPClientError::ConnectionError(_)
856        ));
857    }
858
859    #[tokio::test]
860    async fn test_disconnect_state_checks() {
861        let params = StdioServerParameters {
862            command: "echo".to_string(),
863            args: vec!["test".to_string()],
864            env: HashMap::new(),
865            cwd: None,
866        };
867
868        let client = StdioMCPClient::new(params);
869
870        // 在未连接状态下尝试断开应该失败
871        let result = client.disconnect().await;
872        assert!(result.is_err());
873        assert!(matches!(
874            result.unwrap_err(),
875            MCPClientError::ConnectionError(_)
876        ));
877    }
878
879    #[tokio::test]
880    async fn test_list_tools_requires_connection() {
881        let params = StdioServerParameters {
882            command: "echo".to_string(),
883            args: vec!["test".to_string()],
884            env: HashMap::new(),
885            cwd: None,
886        };
887
888        let client = StdioMCPClient::new(params);
889
890        // 未连接状态下调用 list_tools 应该失败
891        let result = client.list_tools().await;
892        assert!(result.is_err());
893        assert!(matches!(
894            result.unwrap_err(),
895            MCPClientError::ConnectionError(_)
896        ));
897    }
898
899    #[tokio::test]
900    async fn test_call_tool_requires_connection() {
901        let params = StdioServerParameters {
902            command: "echo".to_string(),
903            args: vec!["test".to_string()],
904            env: HashMap::new(),
905            cwd: None,
906        };
907
908        let client = StdioMCPClient::new(params);
909
910        // 未连接状态下调用 call_tool 应该失败
911        let result = client.call_tool("test_tool", json!({})).await;
912        assert!(result.is_err());
913        assert!(matches!(
914            result.unwrap_err(),
915            MCPClientError::ConnectionError(_)
916        ));
917    }
918
919    #[tokio::test]
920    async fn test_list_windows_requires_connection() {
921        let params = StdioServerParameters {
922            command: "echo".to_string(),
923            args: vec!["test".to_string()],
924            env: HashMap::new(),
925            cwd: None,
926        };
927
928        let client = StdioMCPClient::new(params);
929
930        // 未连接状态下调用 list_windows 应该失败
931        let result = client.list_windows().await;
932        assert!(result.is_err());
933        assert!(matches!(
934            result.unwrap_err(),
935            MCPClientError::ConnectionError(_)
936        ));
937    }
938
939    #[tokio::test]
940    async fn test_get_window_detail_requires_connection() {
941        let params = StdioServerParameters {
942            command: "echo".to_string(),
943            args: vec!["test".to_string()],
944            env: HashMap::new(),
945            cwd: None,
946        };
947
948        let client = StdioMCPClient::new(params);
949
950        let resource = Resource {
951            uri: "window://123".to_string(),
952            name: "Test Window".to_string(),
953            description: None,
954            mime_type: None,
955        };
956
957        // 未连接状态下调用 get_window_detail 应该失败
958        let result = client.get_window_detail(resource).await;
959        assert!(result.is_err());
960        assert!(matches!(
961            result.unwrap_err(),
962            MCPClientError::ConnectionError(_)
963        ));
964    }
965
966    #[tokio::test]
967    async fn test_initialize_session_request_format() {
968        let params = StdioServerParameters {
969            command: "echo".to_string(),
970            args: vec!["test".to_string()],
971            env: HashMap::new(),
972            cwd: None,
973        };
974
975        let client = StdioMCPClient::new(params);
976
977        // 由于 echo 不会返回有效的 JSON-RPC 响应,初始化会失败
978        let result = client.initialize_session().await;
979        assert!(result.is_err());
980    }
981
982    #[tokio::test]
983    async fn test_disconnect_cleanup() {
984        let params = StdioServerParameters {
985            command: "echo".to_string(),
986            args: vec!["test".to_string()],
987            env: HashMap::new(),
988            cwd: None,
989        };
990
991        let client = StdioMCPClient::new(params);
992
993        // 设置会话ID
994        *client.session_id.lock().await = Some("session123".to_string());
995
996        // 设置为已连接状态
997        client.base.update_state(ClientState::Connected).await;
998
999        // 断开连接(即使失败也应该清理会话ID)
1000        let _ = client.disconnect().await;
1001
1002        // 验证会话ID被清理
1003        let session_id = client.session_id.lock().await;
1004        assert!(session_id.is_none());
1005
1006        // 验证状态变为已断开
1007        assert_eq!(client.base.get_state().await, ClientState::Disconnected);
1008    }
1009
1010    #[tokio::test]
1011    async fn test_child_process_cleanup() {
1012        let params = StdioServerParameters {
1013            command: "sleep".to_string(),
1014            args: vec!["10".to_string()],
1015            env: HashMap::new(),
1016            cwd: None,
1017        };
1018
1019        let client = StdioMCPClient::new(params.clone());
1020
1021        // 启动一个长时间运行的进程
1022        let child = client.start_child_process(&params).await.unwrap();
1023        *client.child_process.lock().await = Some(child);
1024
1025        // 设置为已连接状态(这样 disconnect 才会清理进程)
1026        client.base.update_state(ClientState::Connected).await;
1027
1028        // 验证进程正在运行
1029        let child_guard = client.child_process.lock().await;
1030        assert!(child_guard.is_some());
1031        drop(child_guard);
1032
1033        // 断开连接应该清理进程
1034        let _ = client.disconnect().await;
1035
1036        // 验证进程被清理
1037        let child_guard = client.child_process.lock().await;
1038        assert!(child_guard.is_none());
1039    }
1040
1041    #[tokio::test]
1042    async fn test_error_handling_in_list_tools() {
1043        let params = StdioServerParameters {
1044            command: "echo".to_string(),
1045            args: vec!["test".to_string()],
1046            env: HashMap::new(),
1047            cwd: None,
1048        };
1049
1050        let client = StdioMCPClient::new(params);
1051
1052        // 模拟已连接状态
1053        client.base.update_state(ClientState::Connected).await;
1054
1055        // 尝试列出工具(会因为没有有效的 MCP 服务器而返回错误)
1056        let result = client.list_tools().await;
1057        assert!(result.is_err());
1058    }
1059
1060    #[tokio::test]
1061    async fn test_error_handling_in_call_tool() {
1062        let params = StdioServerParameters {
1063            command: "echo".to_string(),
1064            args: vec!["test".to_string()],
1065            env: HashMap::new(),
1066            cwd: None,
1067        };
1068
1069        let client = StdioMCPClient::new(params);
1070
1071        // 模拟已连接状态
1072        client.base.update_state(ClientState::Connected).await;
1073
1074        // 尝试调用工具(会因为没有有效的 MCP 服务器而返回错误)
1075        let result = client
1076            .call_tool("test_tool", json!({"param": "value"}))
1077            .await;
1078        assert!(result.is_err());
1079    }
1080
1081    #[tokio::test]
1082    async fn test_start_child_process_with_working_directory() {
1083        let params = StdioServerParameters {
1084            command: "pwd".to_string(),
1085            args: vec![],
1086            env: HashMap::new(),
1087            cwd: Some("/tmp".to_string()),
1088        };
1089
1090        let client = StdioMCPClient::new(params.clone());
1091
1092        // 启动子进程并设置工作目录
1093        let result = client.start_child_process(&params).await;
1094        assert!(result.is_ok());
1095
1096        let mut child = result.unwrap();
1097
1098        // 等待进程完成
1099        let _ = child.wait().await;
1100    }
1101
1102    #[tokio::test]
1103    async fn test_stdio_client_debug_format() {
1104        let params = StdioServerParameters {
1105            command: "echo".to_string(),
1106            args: vec!["test".to_string()],
1107            env: HashMap::new(),
1108            cwd: None,
1109        };
1110
1111        let client = StdioMCPClient::new(params);
1112
1113        // 验证 Debug trait 实现
1114        let debug_str = format!("{:?}", client);
1115        assert!(debug_str.contains("StdioMCPClient"));
1116    }
1117}