Skip to main content

smcp_computer/
socketio_client.rs

1/*!
2* 文件名: socketio_client
3* 作者: JQQ
4* 创建日期: 2025/12/16
5* 最后修改日期: 2025/12/16
6* 版权: 2023 JQQ. All rights reserved.
7* 依赖: tf_rust_socketio, tokio, serde
8* 描述: SMCP Computer的Socket.IO客户端实现 / Socket.IO client implementation for SMCP Computer
9*/
10
11use crate::errors::{ComputerError, ComputerResult};
12use crate::mcp_clients::manager::MCPServerManager;
13use crate::mcp_clients::model::MCPServerInput;
14use futures_util::FutureExt;
15use serde_json::Value;
16use smcp::{
17    events::{
18        CLIENT_GET_CONFIG, CLIENT_GET_DESKTOP, CLIENT_GET_TOOLS, CLIENT_TOOL_CALL,
19        SERVER_JOIN_OFFICE, SERVER_LEAVE_OFFICE, SERVER_UPDATE_CONFIG, SERVER_UPDATE_DESKTOP,
20        SERVER_UPDATE_TOOL_LIST,
21    },
22    GetComputerConfigReq, GetComputerConfigRet, GetDesktopReq, GetDesktopRet, GetToolsReq,
23    GetToolsRet, ToolCallReq, SMCP_NAMESPACE,
24};
25use std::collections::HashMap;
26use std::sync::Arc;
27use tf_rust_socketio::{
28    asynchronous::{Client, ClientBuilder},
29    Event, Payload, TransportType,
30};
31use tokio::sync::RwLock;
32use tracing::{debug, error, info};
33
34/// SMCP Computer Socket.IO客户端
35/// SMCP Computer Socket.IO client
36pub struct SmcpComputerClient {
37    /// Socket.IO客户端实例 / Socket.IO client instance
38    client: Client,
39    /// Computer名称 / Computer name
40    computer_name: String,
41    /// 当前所在的office ID / Current office ID
42    office_id: Arc<RwLock<Option<String>>>,
43    /// 输入定义映射 / Input definitions map
44    #[allow(dead_code)]
45    inputs: Arc<RwLock<HashMap<String, MCPServerInput>>>,
46}
47
48impl SmcpComputerClient {
49    /// 创建新的Socket.IO客户端
50    /// Create a new Socket.IO client
51    pub async fn new(
52        url: &str,
53        manager: Arc<RwLock<Option<MCPServerManager>>>,
54        computer_name: String,
55        auth_secret: Option<String>,
56        inputs: Arc<RwLock<HashMap<String, MCPServerInput>>>,
57        headers: Option<HashMap<String, String>>,
58    ) -> ComputerResult<Self> {
59        let office_id = Arc::new(RwLock::new(None));
60        let manager_clone = manager.clone();
61        let computer_name_clone = computer_name.clone();
62        let office_id_clone = office_id.clone();
63        let inputs_clone = inputs.clone();
64
65        // 使用ClientBuilder注册事件处理器
66        // Use ClientBuilder to register event handlers
67        let mut builder = ClientBuilder::new(url)
68            .namespace(SMCP_NAMESPACE)
69            .transport_type(TransportType::Websocket);
70
71        // 如果提供了认证密钥,添加到请求头
72        // If auth secret is provided, add to request headers
73        if let Some(secret) = auth_secret {
74            builder = builder.opening_header("x-api-key", secret.as_str());
75        }
76
77        // 添加自定义 HTTP headers / Add custom HTTP headers
78        // Safety: opening_header 底层使用 http::HeaderValue::from_bytes() 做 RFC 7230 校验,
79        // 会拒绝包含 \r\n 等控制字符的恶意输入,无需额外防御 header injection。
80        // Safety: opening_header internally uses http::HeaderValue::from_bytes() for RFC 7230
81        // validation, rejecting \r\n and other control characters. No extra injection defense needed.
82        if let Some(custom_headers) = headers {
83            for (key, value) in custom_headers {
84                builder = builder.opening_header(key.as_str(), value.as_str());
85            }
86        }
87
88        let client = builder
89            .on_any(move |event, payload, client| {
90                // 只处理自定义事件
91                // Only handle custom events
92                let event_str = match event {
93                    Event::Custom(s) => s,
94                    _ => return async {}.boxed(),
95                };
96
97                match event_str.as_str() {
98                    CLIENT_TOOL_CALL => {
99                        let manager = manager_clone.clone();
100                        let computer_name = computer_name_clone.clone();
101                        let office_id = office_id_clone.clone();
102                        let client_clone = client.clone();
103                        let payload_clone = payload.clone();
104
105                        async move {
106                            match Self::handle_tool_call_with_ack(
107                                payload,
108                                manager,
109                                computer_name,
110                                office_id,
111                                client_clone,
112                            )
113                            .await
114                            {
115                                Ok((ack_id, response)) => {
116                                    if let Some(id) = ack_id {
117                                        if let Err(e) = client.ack_with_id(id, response).await {
118                                            error!("Failed to send ack: {}", e);
119                                        }
120                                    }
121                                }
122                                Err(e) => {
123                                    error!("Error handling tool call: {}", e);
124                                    // 尝试返回错误响应 / Try to return error response
125                                    if let Ok((Some(id), _)) = Self::extract_ack_id(payload_clone) {
126                                        let error_response = serde_json::json!({
127                                            "isError": true,
128                                            "content": [],
129                                            "structuredContent": {
130                                                "error": e.to_string(),
131                                                "error_type": "ComputerError"
132                                            }
133                                        });
134                                        let _ = client.ack_with_id(id, error_response).await;
135                                    }
136                                }
137                            }
138                        }
139                        .boxed()
140                    }
141                    CLIENT_GET_TOOLS => {
142                        let manager = manager_clone.clone();
143                        let computer_name = computer_name_clone.clone();
144                        let office_id = office_id_clone.clone();
145                        let client_clone = client.clone();
146
147                        async move {
148                            match Self::handle_get_tools_with_ack(
149                                payload,
150                                manager,
151                                computer_name,
152                                office_id,
153                                client_clone,
154                            )
155                            .await
156                            {
157                                Ok((ack_id, response)) => {
158                                    if let Some(id) = ack_id {
159                                        if let Err(e) = client.ack_with_id(id, response).await {
160                                            error!("Failed to send ack: {}", e);
161                                        }
162                                    }
163                                }
164                                Err(e) => {
165                                    error!("Error handling get tools: {}", e);
166                                }
167                            }
168                        }
169                        .boxed()
170                    }
171                    CLIENT_GET_CONFIG => {
172                        let manager = manager_clone.clone();
173                        let computer_name = computer_name_clone.clone();
174                        let office_id = office_id_clone.clone();
175                        let client_clone = client.clone();
176                        let inputs = inputs_clone.clone();
177
178                        async move {
179                            match Self::handle_get_config_with_ack(
180                                payload,
181                                manager,
182                                computer_name,
183                                office_id,
184                                client_clone,
185                                inputs,
186                            )
187                            .await
188                            {
189                                Ok((ack_id, response)) => {
190                                    if let Some(id) = ack_id {
191                                        if let Err(e) = client.ack_with_id(id, response).await {
192                                            error!("Failed to send ack: {}", e);
193                                        }
194                                    }
195                                }
196                                Err(e) => {
197                                    error!("Error handling get config: {}", e);
198                                }
199                            }
200                        }
201                        .boxed()
202                    }
203                    CLIENT_GET_DESKTOP => {
204                        let manager = manager_clone.clone();
205                        let computer_name = computer_name_clone.clone();
206                        let office_id = office_id_clone.clone();
207                        let client_clone = client.clone();
208
209                        async move {
210                            match Self::handle_get_desktop_with_ack(
211                                payload,
212                                manager,
213                                computer_name,
214                                office_id,
215                                client_clone,
216                            )
217                            .await
218                            {
219                                Ok((ack_id, response)) => {
220                                    if let Some(id) = ack_id {
221                                        if let Err(e) = client.ack_with_id(id, response).await {
222                                            error!("Failed to send ack: {}", e);
223                                        }
224                                    }
225                                }
226                                Err(e) => {
227                                    error!("Error handling get desktop: {}", e);
228                                }
229                            }
230                        }
231                        .boxed()
232                    }
233                    _ => {
234                        debug!("Unhandled event: {}", event_str);
235                        async {}.boxed()
236                    }
237                }
238            })
239            .connect()
240            .await
241            .map_err(|e| ComputerError::SocketIoError(format!("Failed to connect: {}", e)))?;
242
243        // 等待一小段时间确保 Socket.IO namespace 连接完全建立
244        // Wait a short time to ensure Socket.IO namespace connection is fully established
245        // Socket.IO 有两个连接阶段:Transport 层和 Namespace 层
246        // Socket.IO has two connection phases: Transport layer and Namespace layer
247        // connect() 只保证 Transport 层连接,namespace 连接是异步的
248        // connect() only guarantees Transport layer connection, namespace connection is async
249        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
250
251        info!(
252            "Connected to SMCP server at {} with computer name: {}",
253            url, computer_name
254        );
255
256        Ok(Self {
257            client,
258            computer_name,
259            office_id,
260            inputs,
261        })
262    }
263
264    /// 加入Office(Socket.IO Room)
265    /// Join an Office (Socket.IO Room)
266    pub async fn join_office(&self, office_id: &str) -> ComputerResult<()> {
267        debug!("Joining office: {}", office_id);
268
269        // 先设置office_id
270        // Set office_id first
271        *self.office_id.write().await = Some(office_id.to_string());
272
273        let req_data = serde_json::json!({
274            "office_id": office_id,
275            "role": "computer",
276            "name": self.computer_name
277        });
278
279        // 使用call方法等待服务器响应
280        // Use call method to wait for server response
281        match self.call(SERVER_JOIN_OFFICE, req_data, Some(10)).await {
282            Ok(response) => {
283                // 服务器返回的是 (bool, Option<String>) 元组序列化后的数组
284                // Server returns serialized array of (bool, Option<String>) tuple
285                debug!("Join office response: {:?}", response);
286
287                // 检查响应是否包含嵌套数组
288                // Check if response contains nested array
289                let actual_response = if response.len() == 1 {
290                    if let Some(arr) = response.first().and_then(|v| v.as_array()) {
291                        arr.to_vec()
292                    } else {
293                        response
294                    }
295                } else {
296                    response
297                };
298
299                if !actual_response.is_empty() {
300                    if let Some(success) = actual_response.first().and_then(|v| v.as_bool()) {
301                        if success {
302                            info!("Successfully joined office: {}", office_id);
303                            Ok(())
304                        } else {
305                            // 加入失败,重置office_id / Reset office_id on failure
306                            *self.office_id.write().await = None;
307                            let error_msg = actual_response
308                                .get(1)
309                                .and_then(|v| v.as_str())
310                                .unwrap_or("Unknown error");
311                            Err(ComputerError::SocketIoError(format!(
312                                "Failed to join office: {}",
313                                error_msg
314                            )))
315                        }
316                    } else {
317                        *self.office_id.write().await = None;
318                        Err(ComputerError::SocketIoError(format!(
319                            "Invalid response format from server: {:?}",
320                            actual_response
321                        )))
322                    }
323                } else {
324                    *self.office_id.write().await = None;
325                    Err(ComputerError::SocketIoError(
326                        "Empty response from server".to_string(),
327                    ))
328                }
329            }
330            Err(e) => {
331                *self.office_id.write().await = None;
332                Err(e)
333            }
334        }
335    }
336
337    /// 获取当前Office ID / Get current Office ID
338    pub async fn get_current_office_id(&self) -> ComputerResult<String> {
339        let office_id = self.office_id.read().await;
340        match office_id.as_ref() {
341            Some(id) => Ok(id.clone()),
342            None => Err(ComputerError::InvalidState(
343                "Not currently in any office".to_string(),
344            )),
345        }
346    }
347
348    /// 离开Office
349    /// Leave an Office
350    pub async fn leave_office(&self, office_id: &str) -> ComputerResult<()> {
351        debug!("Leaving office: {}", office_id);
352
353        let req_data = serde_json::json!({
354            "office_id": office_id
355        });
356
357        self.emit(SERVER_LEAVE_OFFICE, req_data).await?;
358        *self.office_id.write().await = None;
359
360        info!("Left office: {}", office_id);
361        Ok(())
362    }
363
364    /// 发送配置更新通知
365    /// Emit config update notification
366    pub async fn emit_update_config(&self) -> ComputerResult<()> {
367        let office_id = self.office_id.read().await;
368        if office_id.is_some() {
369            let req_data = serde_json::json!({
370                "computer": self.computer_name
371            });
372            self.emit(SERVER_UPDATE_CONFIG, req_data).await?;
373            info!("Emitted config update notification");
374        }
375        Ok(())
376    }
377
378    /// 发送工具列表更新通知
379    /// Emit tool list update notification
380    pub async fn emit_update_tool_list(&self) -> ComputerResult<()> {
381        let office_id = self.office_id.read().await;
382        if office_id.is_some() {
383            let req_data = serde_json::json!({
384                "computer": self.computer_name
385            });
386            self.emit(SERVER_UPDATE_TOOL_LIST, req_data).await?;
387            info!("Emitted tool list update notification");
388        }
389        Ok(())
390    }
391
392    /// 发送桌面更新通知
393    /// Emit desktop update notification
394    pub async fn emit_update_desktop(&self) -> ComputerResult<()> {
395        let office_id = self.office_id.read().await;
396        if office_id.is_some() {
397            let req_data = serde_json::json!({
398                "computer": self.computer_name
399            });
400            self.emit(SERVER_UPDATE_DESKTOP, req_data).await?;
401            info!("Emitted desktop update notification");
402        }
403        Ok(())
404    }
405
406    /// 发送事件(不等待响应)
407    /// Emit event without waiting for response
408    async fn emit(&self, event: &str, data: Value) -> ComputerResult<()> {
409        // 检查事件名 policy / Check event name policy
410        if event.starts_with("notify:") || event.starts_with("client:") {
411            return Err(ComputerError::InvalidState(
412                format!(
413                    "Computer 不允许发送 notify:* 或 client:* 事件 / Computer cannot send notify:* or client:* events: {}",
414                    event
415                )
416            ));
417        }
418
419        debug!("Emitting event: {}", event);
420
421        self.client
422            .emit(event, Payload::Text(vec![data], None))
423            .await
424            .map_err(|e| ComputerError::SocketIoError(format!("Failed to emit {}: {}", event, e)))
425    }
426
427    /// 发送事件并等待响应
428    /// Emit event and wait for response
429    async fn call(
430        &self,
431        event: &str,
432        data: Value,
433        timeout_secs: Option<u64>,
434    ) -> ComputerResult<Vec<Value>> {
435        // 检查事件名 policy / Check event name policy
436        if event.starts_with("notify:") || event.starts_with("client:") {
437            return Err(ComputerError::InvalidState(
438                format!(
439                    "Computer 不允许发送 notify:* 或 client:* 事件 / Computer cannot send notify:* or client:* events: {}",
440                    event
441                )
442            ));
443        }
444
445        let timeout = std::time::Duration::from_secs(timeout_secs.unwrap_or(30));
446        debug!("Calling event: {} with timeout {:?}", event, timeout);
447
448        let (tx, rx) = tokio::sync::oneshot::channel();
449        let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
450
451        let callback = move |payload: Payload, _client: Client| {
452            if let Some(tx_opt) = tx.try_lock().ok().and_then(|mut m| m.take()) {
453                let _ = tx_opt.send(payload);
454            }
455            async {}.boxed()
456        };
457
458        self.client
459            .emit_with_ack(event, Payload::Text(vec![data], None), timeout, callback)
460            .await
461            .map_err(|e| {
462                ComputerError::SocketIoError(format!("Failed to call {}: {}", event, e))
463            })?;
464
465        // 使用 tokio::time::timeout 来确保 rx.await 不会无限期等待
466        // Use tokio::time::timeout to ensure rx.await doesn't wait forever
467        match tokio::time::timeout(timeout, rx).await {
468            Ok(Ok(response)) => {
469                // 从响应中提取JSON数据 / Extract JSON data from response
470                match response {
471                    Payload::Text(values, _) => {
472                        debug!("Received response: {:?}", values);
473                        Ok(values)
474                    }
475                    #[allow(deprecated)]
476                    Payload::String(s, _) => {
477                        // 尝试解析字符串为JSON数组
478                        // Try to parse string as JSON array
479                        let parsed: Vec<Value> = serde_json::from_str(&s).map_err(|e| {
480                            ComputerError::SocketIoError(format!("Failed to parse response: {}", e))
481                        })?;
482                        debug!("Received parsed response: {:?}", parsed);
483                        Ok(parsed)
484                    }
485                    Payload::Binary(_, _) => Err(ComputerError::SocketIoError(
486                        "Binary response not supported".to_string(),
487                    )),
488                }
489            }
490            Ok(Err(_)) => {
491                error!("Channel closed while calling event: {}", event);
492                Err(ComputerError::SocketIoError(
493                    "Channel closed while waiting for response".to_string(),
494                ))
495            }
496            Err(_) => {
497                error!("Timeout while calling event: {}", event);
498                Err(ComputerError::SocketIoError(
499                    "Timeout while waiting for response".to_string(),
500                ))
501            }
502        }
503    }
504
505    /// 处理工具调用事件(带ACK响应)
506    /// Handle tool call event (with ACK response)
507    async fn handle_tool_call_with_ack(
508        payload: Payload,
509        manager: Arc<RwLock<Option<MCPServerManager>>>,
510        computer_name: String,
511        _office_id: Arc<RwLock<Option<String>>>,
512        _client: Client,
513    ) -> ComputerResult<(Option<i32>, Value)> {
514        let (ack_id, req) = Self::extract_ack_and_parse::<ToolCallReq>(payload)?;
515
516        // 验证 computer_name(Server 路由已保证请求来自同一 office,无需验证 agent 字段)
517        // Validate computer_name (Server routing ensures request is from same office, no need to validate agent field)
518        if computer_name != req.computer {
519            return Err(ComputerError::ValidationError(format!(
520                "Computer name mismatch: expected {}, got {}",
521                computer_name, req.computer
522            )));
523        }
524
525        // 执行工具调用 / Execute tool call
526        let result = {
527            let manager_guard = manager.read().await;
528            match manager_guard.as_ref() {
529                Some(mgr) => {
530                    mgr.execute_tool(
531                        &req.tool_name,
532                        req.params,
533                        Some(std::time::Duration::from_secs(req.timeout as u64)),
534                    )
535                    .await?
536                }
537                None => {
538                    return Err(ComputerError::InvalidState(
539                        "MCP Manager not initialized".to_string(),
540                    ));
541                }
542            }
543        };
544
545        let result_value =
546            serde_json::to_value(result).map_err(ComputerError::SerializationError)?;
547
548        info!("Tool call executed successfully: {}", req.tool_name);
549        Ok((ack_id, result_value))
550    }
551
552    /// 处理获取工具列表事件(带ACK响应)
553    /// Handle get tools event (with ACK response)
554    async fn handle_get_tools_with_ack(
555        payload: Payload,
556        manager: Arc<RwLock<Option<MCPServerManager>>>,
557        computer_name: String,
558        _office_id: Arc<RwLock<Option<String>>>,
559        _client: Client,
560    ) -> ComputerResult<(Option<i32>, Value)> {
561        let (ack_id, req) = Self::extract_ack_and_parse::<GetToolsReq>(payload)?;
562
563        // 验证 computer_name(Server 路由已保证请求来自同一 office,无需验证 agent 字段)
564        // Validate computer_name (Server routing ensures request is from same office, no need to validate agent field)
565        if computer_name != req.computer {
566            return Err(ComputerError::ValidationError(format!(
567                "Computer name mismatch: expected {}, got {}",
568                computer_name, req.computer
569            )));
570        }
571
572        // 获取工具列表 / Get tools list
573        let tools: Vec<smcp::SMCPTool> = {
574            let manager_guard = manager.read().await;
575            match manager_guard.as_ref() {
576                Some(mgr) => {
577                    // 转换Tool为SMCPTool
578                    // Convert Tool to SMCPTool
579                    let tool_list = mgr.list_available_tools().await;
580                    tool_list
581                        .into_iter()
582                        .map(convert_tool_to_smcp_tool)
583                        .collect()
584                }
585                None => {
586                    return Err(ComputerError::InvalidState(
587                        "MCP Manager not initialized".to_string(),
588                    ));
589                }
590            }
591        };
592
593        let response = GetToolsRet {
594            tools: tools.clone(),
595            req_id: req.base.req_id,
596        };
597
598        info!(
599            "Returned {} tools for agent {}",
600            tools.len(),
601            req.base.agent
602        );
603        Ok((ack_id, serde_json::to_value(response)?))
604    }
605
606    /// 处理获取配置事件(带ACK响应)
607    /// Handle get config event (with ACK response)
608    async fn handle_get_config_with_ack(
609        payload: Payload,
610        manager: Arc<RwLock<Option<MCPServerManager>>>,
611        computer_name: String,
612        _office_id: Arc<RwLock<Option<String>>>,
613        _client: Client,
614        inputs: Arc<RwLock<HashMap<String, MCPServerInput>>>,
615    ) -> ComputerResult<(Option<i32>, Value)> {
616        let (ack_id, req) = Self::extract_ack_and_parse::<GetComputerConfigReq>(payload)?;
617
618        // 验证 computer_name(Server 路由已保证请求来自同一 office,无需验证 agent 字段)
619        // Validate computer_name (Server routing ensures request is from same office, no need to validate agent field)
620        if computer_name != req.computer {
621            return Err(ComputerError::ValidationError(format!(
622                "Computer name mismatch: expected {}, got {}",
623                computer_name, req.computer
624            )));
625        }
626
627        // 获取配置 / Get config
628        let servers = {
629            let manager_guard = manager.read().await;
630            match manager_guard.as_ref() {
631                Some(mgr) => {
632                    // 获取完整服务器配置(不只是状态)
633                    // Get complete server configurations (not just status)
634                    mgr.get_server_configs().await
635                }
636                None => {
637                    return Err(ComputerError::InvalidState(
638                        "MCP Manager not initialized".to_string(),
639                    ));
640                }
641            }
642        };
643
644        // 获取输入定义 / Get input definitions
645        // 将 HashMap<String, MCPServerInput> 转换为 Vec<serde_json::Value>
646        // Convert HashMap<String, MCPServerInput> to Vec<serde_json::Value>
647        let inputs_data = {
648            let inputs_guard = inputs.read().await;
649            if inputs_guard.is_empty() {
650                None
651            } else {
652                let inputs_vec: Vec<serde_json::Value> = inputs_guard
653                    .values()
654                    .filter_map(|input| serde_json::to_value(input).ok())
655                    .collect();
656                if inputs_vec.is_empty() {
657                    None
658                } else {
659                    Some(inputs_vec)
660                }
661            }
662        };
663
664        let response = GetComputerConfigRet {
665            servers,
666            inputs: inputs_data,
667        };
668
669        info!("Returned config for agent {}", req.base.agent);
670        Ok((ack_id, serde_json::to_value(response)?))
671    }
672
673    /// 处理获取桌面事件(带ACK响应)
674    /// Handle get desktop event (with ACK response)
675    async fn handle_get_desktop_with_ack(
676        payload: Payload,
677        _manager: Arc<RwLock<Option<MCPServerManager>>>,
678        computer_name: String,
679        _office_id: Arc<RwLock<Option<String>>>,
680        _client: Client,
681    ) -> ComputerResult<(Option<i32>, Value)> {
682        let (ack_id, req) = Self::extract_ack_and_parse::<GetDesktopReq>(payload)?;
683
684        // 验证 computer_name(Server 路由已保证请求来自同一 office,无需验证 agent 字段)
685        // Validate computer_name (Server routing ensures request is from same office, no need to validate agent field)
686        if computer_name != req.computer {
687            return Err(ComputerError::ValidationError(format!(
688                "Computer name mismatch: expected {}, got {}",
689                computer_name, req.computer
690            )));
691        }
692
693        // 获取桌面 / Get desktop
694        // TODO: 实现实际的桌面捕获逻辑
695        // TODO: Implement actual desktop capture logic
696        let desktops = Vec::<String>::new(); // 暂时返回空列表 / Return empty list for now
697
698        let response = GetDesktopRet {
699            desktops: Some(desktops),
700            req_id: req.base.req_id,
701        };
702
703        info!("Returned desktop for agent {}", req.base.agent);
704        Ok((ack_id, serde_json::to_value(response)?))
705    }
706
707    /// 从payload中提取ack_id并解析数据
708    /// Extract ack_id from payload and parse data
709    fn extract_ack_and_parse<T: serde::de::DeserializeOwned>(
710        payload: Payload,
711    ) -> ComputerResult<(Option<i32>, T)> {
712        match payload {
713            Payload::Text(mut values, ack_id) => {
714                if let Some(value) = values.pop() {
715                    let req =
716                        serde_json::from_value(value).map_err(ComputerError::SerializationError)?;
717                    Ok((ack_id, req))
718                } else {
719                    Err(ComputerError::ProtocolError("Empty payload".to_string()))
720                }
721            }
722            #[allow(deprecated)]
723            Payload::String(s, ack_id) => {
724                let req = serde_json::from_str(&s).map_err(ComputerError::SerializationError)?;
725                Ok((ack_id, req))
726            }
727            Payload::Binary(_, _) => Err(ComputerError::SocketIoError(
728                "Binary payload not supported".to_string(),
729            )),
730        }
731    }
732
733    /// 仅提取ack_id(用于错误处理)
734    /// Extract ack_id only (for error handling)
735    fn extract_ack_id(payload: Payload) -> ComputerResult<(Option<i32>, ())> {
736        match payload {
737            Payload::Text(_, ack_id) => Ok((ack_id, ())),
738            #[allow(deprecated)]
739            Payload::String(_, ack_id) => Ok((ack_id, ())),
740            Payload::Binary(_, _) => Ok((None, ())),
741        }
742    }
743
744    /// 断开连接
745    /// Disconnect from server
746    pub async fn disconnect(self) -> ComputerResult<()> {
747        debug!("Disconnecting from server");
748        self.client
749            .disconnect()
750            .await
751            .map_err(|e| ComputerError::SocketIoError(format!("Failed to disconnect: {}", e)))?;
752        info!("Disconnected from server");
753        Ok(())
754    }
755
756    /// 获取当前office ID
757    /// Get current office ID
758    pub async fn get_office_id(&self) -> Option<String> {
759        self.office_id.read().await.clone()
760    }
761
762    /// 获取连接的 URL
763    /// Get connected URL
764    pub fn get_url(&self) -> String {
765        // 由于 tf_rust_socketio 的 Client 没有 uri() 方法,返回默认值
766        // Since tf_rust_socketio Client doesn't have uri() method, return default
767        "unknown".to_string()
768    }
769
770    /// 获取连接的 namespace
771    /// Get connected namespace
772    pub fn get_namespace(&self) -> String {
773        // 从 client 中获取 namespace,如果无法获取则返回默认值
774        // Get namespace from client, return default if unable to get
775        "/smcp".to_string()
776    }
777}
778
779/// 将内部 Tool 转换为协议类型 SMCPTool
780/// Convert internal Tool to protocol type SMCPTool
781pub(crate) fn convert_tool_to_smcp_tool(tool: crate::mcp_clients::model::Tool) -> smcp::SMCPTool {
782    let mut meta_map = serde_json::Map::new();
783
784    // 传递 tool.meta 中的所有键值(如 a2c_tool_meta)
785    // 值需要序列化为 JSON 字符串,与 Python SDK 对齐
786    if let Some(existing_meta) = &tool.meta {
787        for (k, v) in existing_meta.iter() {
788            let str_val = if v.is_string() {
789                v.as_str().unwrap().to_string()
790            } else {
791                serde_json::to_string(v).unwrap_or_default()
792            };
793            meta_map.insert(k.clone(), serde_json::Value::String(str_val));
794        }
795    }
796
797    // 添加 MCP_TOOL_ANNOTATION
798    if let Some(annotations) = &tool.annotations {
799        if let Ok(json_str) = serde_json::to_string(annotations) {
800            meta_map.insert(
801                "MCP_TOOL_ANNOTATION".to_string(),
802                serde_json::Value::String(json_str),
803            );
804        }
805    }
806
807    let meta = if meta_map.is_empty() {
808        None
809    } else {
810        Some(serde_json::Value::Object(meta_map))
811    };
812
813    let description = tool.description.as_deref().unwrap_or("").to_string();
814    let params_schema = tool.schema_as_json_value();
815    smcp::SMCPTool {
816        name: tool.name.to_string(),
817        description,
818        params_schema,
819        return_schema: None,
820        meta,
821    }
822}
823
824#[cfg(test)]
825mod tests {
826    use super::*;
827    use crate::mcp_clients::model::{Tool, ToolAnnotations};
828    use serde_json::json;
829
830    fn make_tool(
831        meta: Option<serde_json::Map<String, serde_json::Value>>,
832        annotations: Option<ToolAnnotations>,
833    ) -> Tool {
834        use std::sync::Arc;
835        let input_schema: serde_json::Map<String, serde_json::Value> =
836            serde_json::from_value(json!({"type": "object"})).unwrap();
837        Tool {
838            name: "test_tool".into(),
839            title: None,
840            description: Some("A test tool".into()),
841            input_schema: Arc::new(input_schema),
842            output_schema: None,
843            annotations,
844            icons: None,
845            meta: meta.map(rmcp::model::Meta),
846        }
847    }
848
849    #[test]
850    fn test_tool_to_smcp_tool_with_meta_and_annotations() {
851        let mut meta = serde_json::Map::new();
852        meta.insert(
853            "a2c_tool_meta".to_string(),
854            json!({"tags": ["browser"], "priority": 1}),
855        );
856        let annotations = ToolAnnotations {
857            title: Some("Test".to_string()),
858            read_only_hint: Some(false),
859            destructive_hint: Some(false),
860            idempotent_hint: None,
861            open_world_hint: Some(false),
862        };
863        let smcp_tool = convert_tool_to_smcp_tool(make_tool(Some(meta), Some(annotations)));
864
865        let meta_obj = smcp_tool.meta.unwrap();
866        let meta_map = meta_obj.as_object().unwrap();
867        assert!(meta_map.contains_key("a2c_tool_meta"));
868        assert!(meta_map.contains_key("MCP_TOOL_ANNOTATION"));
869        // Values should be JSON strings
870        assert!(meta_map["a2c_tool_meta"].is_string());
871        assert!(meta_map["MCP_TOOL_ANNOTATION"].is_string());
872    }
873
874    #[test]
875    fn test_tool_to_smcp_tool_only_meta() {
876        let mut meta = serde_json::Map::new();
877        meta.insert("a2c_tool_meta".to_string(), json!({"tags": ["fs"]}));
878        let smcp_tool = convert_tool_to_smcp_tool(make_tool(Some(meta), None));
879
880        let meta_obj = smcp_tool.meta.unwrap();
881        let meta_map = meta_obj.as_object().unwrap();
882        assert_eq!(meta_map.len(), 1);
883        assert!(meta_map.contains_key("a2c_tool_meta"));
884    }
885
886    #[test]
887    fn test_tool_to_smcp_tool_only_annotations() {
888        let annotations = ToolAnnotations {
889            title: Some("My Tool".to_string()),
890            read_only_hint: Some(true),
891            destructive_hint: Some(false),
892            idempotent_hint: None,
893            open_world_hint: Some(false),
894        };
895        let smcp_tool = convert_tool_to_smcp_tool(make_tool(None, Some(annotations)));
896
897        let meta_obj = smcp_tool.meta.unwrap();
898        let meta_map = meta_obj.as_object().unwrap();
899        assert_eq!(meta_map.len(), 1);
900        assert!(meta_map.contains_key("MCP_TOOL_ANNOTATION"));
901    }
902
903    #[test]
904    fn test_tool_to_smcp_tool_no_meta_no_annotations() {
905        let smcp_tool = convert_tool_to_smcp_tool(make_tool(None, None));
906        assert!(smcp_tool.meta.is_none());
907    }
908
909    #[test]
910    fn test_tool_to_smcp_tool_string_value_not_double_serialized() {
911        let mut meta = serde_json::Map::new();
912        meta.insert(
913            "simple_key".to_string(),
914            serde_json::Value::String("already_a_string".to_string()),
915        );
916        let smcp_tool = convert_tool_to_smcp_tool(make_tool(Some(meta), None));
917
918        let meta_obj = smcp_tool.meta.unwrap();
919        let meta_map = meta_obj.as_object().unwrap();
920        // Should be the raw string, not "\"already_a_string\""
921        assert_eq!(meta_map["simple_key"].as_str().unwrap(), "already_a_string");
922    }
923}