Skip to main content

smcp_computer/
socketio_client.rs

1/*!
2* 文件名: socketio_client
3* 作者: JQQ
4* 创建日期: 2025/12/16
5* 最后修改日期: 2025/12/16
6* 版权: 2023 JQQ. All rights reserved.
7* 依赖: tf_rust_socketio, tokio, serde
8* 描述: SMCP Computer的Socket.IO客户端实现 / Socket.IO client implementation for SMCP Computer
9*/
10
11use crate::errors::{ComputerError, ComputerResult};
12use crate::mcp_clients::manager::MCPServerManager;
13use crate::mcp_clients::model::MCPServerInput;
14use futures_util::FutureExt;
15use serde_json::Value;
16use smcp::{
17    events::{
18        CLIENT_GET_CONFIG, CLIENT_GET_DESKTOP, CLIENT_GET_TOOLS, CLIENT_TOOL_CALL,
19        SERVER_JOIN_OFFICE, SERVER_LEAVE_OFFICE, SERVER_UPDATE_CONFIG, SERVER_UPDATE_DESKTOP,
20        SERVER_UPDATE_TOOL_LIST,
21    },
22    GetComputerConfigReq, GetComputerConfigRet, GetDesktopReq, GetDesktopRet, GetToolsReq,
23    GetToolsRet, ToolCallReq, SMCP_NAMESPACE,
24};
25use std::collections::HashMap;
26use std::sync::Arc;
27use tf_rust_socketio::{
28    asynchronous::{Client, ClientBuilder},
29    Event, Payload, TransportType,
30};
31use tokio::sync::RwLock;
32use tracing::{debug, error, info};
33
34/// SMCP Computer Socket.IO客户端
35/// SMCP Computer Socket.IO client
36pub struct SmcpComputerClient {
37    /// Socket.IO客户端实例 / Socket.IO client instance
38    client: Client,
39    /// Computer名称 / Computer name
40    computer_name: String,
41    /// 当前所在的office ID / Current office ID
42    office_id: Arc<RwLock<Option<String>>>,
43    /// 输入定义映射 / Input definitions map
44    #[allow(dead_code)]
45    inputs: Arc<RwLock<HashMap<String, MCPServerInput>>>,
46}
47
48impl SmcpComputerClient {
49    /// 创建新的Socket.IO客户端
50    /// Create a new Socket.IO client
51    pub async fn new(
52        url: &str,
53        manager: Arc<RwLock<Option<MCPServerManager>>>,
54        computer_name: String,
55        auth_secret: Option<String>,
56        inputs: Arc<RwLock<HashMap<String, MCPServerInput>>>,
57    ) -> ComputerResult<Self> {
58        let office_id = Arc::new(RwLock::new(None));
59        let manager_clone = manager.clone();
60        let computer_name_clone = computer_name.clone();
61        let office_id_clone = office_id.clone();
62        let inputs_clone = inputs.clone();
63
64        // 使用ClientBuilder注册事件处理器
65        // Use ClientBuilder to register event handlers
66        let mut builder = ClientBuilder::new(url)
67            .namespace(SMCP_NAMESPACE)
68            .transport_type(TransportType::Websocket);
69
70        // 如果提供了认证密钥,添加到请求头
71        // If auth secret is provided, add to request headers
72        if let Some(secret) = auth_secret {
73            builder = builder.opening_header("x-api-key", secret.as_str());
74        }
75
76        let client = builder
77            .on_any(move |event, payload, client| {
78                // 只处理自定义事件
79                // Only handle custom events
80                let event_str = match event {
81                    Event::Custom(s) => s,
82                    _ => return async {}.boxed(),
83                };
84
85                match event_str.as_str() {
86                    CLIENT_TOOL_CALL => {
87                        let manager = manager_clone.clone();
88                        let computer_name = computer_name_clone.clone();
89                        let office_id = office_id_clone.clone();
90                        let client_clone = client.clone();
91                        let payload_clone = payload.clone();
92
93                        async move {
94                            match Self::handle_tool_call_with_ack(
95                                payload,
96                                manager,
97                                computer_name,
98                                office_id,
99                                client_clone,
100                            )
101                            .await
102                            {
103                                Ok((ack_id, response)) => {
104                                    if let Some(id) = ack_id {
105                                        if let Err(e) = client.ack_with_id(id, response).await {
106                                            error!("Failed to send ack: {}", e);
107                                        }
108                                    }
109                                }
110                                Err(e) => {
111                                    error!("Error handling tool call: {}", e);
112                                    // 尝试返回错误响应 / Try to return error response
113                                    if let Ok((Some(id), _)) = Self::extract_ack_id(payload_clone) {
114                                        let error_response = serde_json::json!({
115                                            "isError": true,
116                                            "content": [],
117                                            "structuredContent": {
118                                                "error": e.to_string(),
119                                                "error_type": "ComputerError"
120                                            }
121                                        });
122                                        let _ = client.ack_with_id(id, error_response).await;
123                                    }
124                                }
125                            }
126                        }
127                        .boxed()
128                    }
129                    CLIENT_GET_TOOLS => {
130                        let manager = manager_clone.clone();
131                        let computer_name = computer_name_clone.clone();
132                        let office_id = office_id_clone.clone();
133                        let client_clone = client.clone();
134
135                        async move {
136                            match Self::handle_get_tools_with_ack(
137                                payload,
138                                manager,
139                                computer_name,
140                                office_id,
141                                client_clone,
142                            )
143                            .await
144                            {
145                                Ok((ack_id, response)) => {
146                                    if let Some(id) = ack_id {
147                                        if let Err(e) = client.ack_with_id(id, response).await {
148                                            error!("Failed to send ack: {}", e);
149                                        }
150                                    }
151                                }
152                                Err(e) => {
153                                    error!("Error handling get tools: {}", e);
154                                }
155                            }
156                        }
157                        .boxed()
158                    }
159                    CLIENT_GET_CONFIG => {
160                        let manager = manager_clone.clone();
161                        let computer_name = computer_name_clone.clone();
162                        let office_id = office_id_clone.clone();
163                        let client_clone = client.clone();
164                        let inputs = inputs_clone.clone();
165
166                        async move {
167                            match Self::handle_get_config_with_ack(
168                                payload,
169                                manager,
170                                computer_name,
171                                office_id,
172                                client_clone,
173                                inputs,
174                            )
175                            .await
176                            {
177                                Ok((ack_id, response)) => {
178                                    if let Some(id) = ack_id {
179                                        if let Err(e) = client.ack_with_id(id, response).await {
180                                            error!("Failed to send ack: {}", e);
181                                        }
182                                    }
183                                }
184                                Err(e) => {
185                                    error!("Error handling get config: {}", e);
186                                }
187                            }
188                        }
189                        .boxed()
190                    }
191                    CLIENT_GET_DESKTOP => {
192                        let manager = manager_clone.clone();
193                        let computer_name = computer_name_clone.clone();
194                        let office_id = office_id_clone.clone();
195                        let client_clone = client.clone();
196
197                        async move {
198                            match Self::handle_get_desktop_with_ack(
199                                payload,
200                                manager,
201                                computer_name,
202                                office_id,
203                                client_clone,
204                            )
205                            .await
206                            {
207                                Ok((ack_id, response)) => {
208                                    if let Some(id) = ack_id {
209                                        if let Err(e) = client.ack_with_id(id, response).await {
210                                            error!("Failed to send ack: {}", e);
211                                        }
212                                    }
213                                }
214                                Err(e) => {
215                                    error!("Error handling get desktop: {}", e);
216                                }
217                            }
218                        }
219                        .boxed()
220                    }
221                    _ => {
222                        debug!("Unhandled event: {}", event_str);
223                        async {}.boxed()
224                    }
225                }
226            })
227            .connect()
228            .await
229            .map_err(|e| ComputerError::SocketIoError(format!("Failed to connect: {}", e)))?;
230
231        // 等待一小段时间确保 Socket.IO namespace 连接完全建立
232        // Wait a short time to ensure Socket.IO namespace connection is fully established
233        // Socket.IO 有两个连接阶段:Transport 层和 Namespace 层
234        // Socket.IO has two connection phases: Transport layer and Namespace layer
235        // connect() 只保证 Transport 层连接,namespace 连接是异步的
236        // connect() only guarantees Transport layer connection, namespace connection is async
237        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
238
239        info!(
240            "Connected to SMCP server at {} with computer name: {}",
241            url, computer_name
242        );
243
244        Ok(Self {
245            client,
246            computer_name,
247            office_id,
248            inputs,
249        })
250    }
251
252    /// 加入Office(Socket.IO Room)
253    /// Join an Office (Socket.IO Room)
254    pub async fn join_office(&self, office_id: &str) -> ComputerResult<()> {
255        debug!("Joining office: {}", office_id);
256
257        // 先设置office_id
258        // Set office_id first
259        *self.office_id.write().await = Some(office_id.to_string());
260
261        let req_data = serde_json::json!({
262            "office_id": office_id,
263            "role": "computer",
264            "name": self.computer_name
265        });
266
267        // 使用call方法等待服务器响应
268        // Use call method to wait for server response
269        match self.call(SERVER_JOIN_OFFICE, req_data, Some(10)).await {
270            Ok(response) => {
271                // 服务器返回的是 (bool, Option<String>) 元组序列化后的数组
272                // Server returns serialized array of (bool, Option<String>) tuple
273                debug!("Join office response: {:?}", response);
274
275                // 检查响应是否包含嵌套数组
276                // Check if response contains nested array
277                let actual_response = if response.len() == 1 {
278                    if let Some(arr) = response.first().and_then(|v| v.as_array()) {
279                        arr.to_vec()
280                    } else {
281                        response
282                    }
283                } else {
284                    response
285                };
286
287                if !actual_response.is_empty() {
288                    if let Some(success) = actual_response.first().and_then(|v| v.as_bool()) {
289                        if success {
290                            info!("Successfully joined office: {}", office_id);
291                            Ok(())
292                        } else {
293                            // 加入失败,重置office_id / Reset office_id on failure
294                            *self.office_id.write().await = None;
295                            let error_msg = actual_response
296                                .get(1)
297                                .and_then(|v| v.as_str())
298                                .unwrap_or("Unknown error");
299                            Err(ComputerError::SocketIoError(format!(
300                                "Failed to join office: {}",
301                                error_msg
302                            )))
303                        }
304                    } else {
305                        *self.office_id.write().await = None;
306                        Err(ComputerError::SocketIoError(format!(
307                            "Invalid response format from server: {:?}",
308                            actual_response
309                        )))
310                    }
311                } else {
312                    *self.office_id.write().await = None;
313                    Err(ComputerError::SocketIoError(
314                        "Empty response from server".to_string(),
315                    ))
316                }
317            }
318            Err(e) => {
319                *self.office_id.write().await = None;
320                Err(e)
321            }
322        }
323    }
324
325    /// 获取当前Office ID / Get current Office ID
326    pub async fn get_current_office_id(&self) -> ComputerResult<String> {
327        let office_id = self.office_id.read().await;
328        match office_id.as_ref() {
329            Some(id) => Ok(id.clone()),
330            None => Err(ComputerError::InvalidState(
331                "Not currently in any office".to_string(),
332            )),
333        }
334    }
335
336    /// 离开Office
337    /// Leave an Office
338    pub async fn leave_office(&self, office_id: &str) -> ComputerResult<()> {
339        debug!("Leaving office: {}", office_id);
340
341        let req_data = serde_json::json!({
342            "office_id": office_id
343        });
344
345        self.emit(SERVER_LEAVE_OFFICE, req_data).await?;
346        *self.office_id.write().await = None;
347
348        info!("Left office: {}", office_id);
349        Ok(())
350    }
351
352    /// 发送配置更新通知
353    /// Emit config update notification
354    pub async fn emit_update_config(&self) -> ComputerResult<()> {
355        let office_id = self.office_id.read().await;
356        if office_id.is_some() {
357            let req_data = serde_json::json!({
358                "computer": self.computer_name
359            });
360            self.emit(SERVER_UPDATE_CONFIG, req_data).await?;
361            info!("Emitted config update notification");
362        }
363        Ok(())
364    }
365
366    /// 发送工具列表更新通知
367    /// Emit tool list update notification
368    pub async fn emit_update_tool_list(&self) -> ComputerResult<()> {
369        let office_id = self.office_id.read().await;
370        if office_id.is_some() {
371            let req_data = serde_json::json!({
372                "computer": self.computer_name
373            });
374            self.emit(SERVER_UPDATE_TOOL_LIST, req_data).await?;
375            info!("Emitted tool list update notification");
376        }
377        Ok(())
378    }
379
380    /// 发送桌面更新通知
381    /// Emit desktop update notification
382    pub async fn emit_update_desktop(&self) -> ComputerResult<()> {
383        let office_id = self.office_id.read().await;
384        if office_id.is_some() {
385            let req_data = serde_json::json!({
386                "computer": self.computer_name
387            });
388            self.emit(SERVER_UPDATE_DESKTOP, req_data).await?;
389            info!("Emitted desktop update notification");
390        }
391        Ok(())
392    }
393
394    /// 发送事件(不等待响应)
395    /// Emit event without waiting for response
396    async fn emit(&self, event: &str, data: Value) -> ComputerResult<()> {
397        // 检查事件名 policy / Check event name policy
398        if event.starts_with("notify:") || event.starts_with("client:") {
399            return Err(ComputerError::InvalidState(
400                format!(
401                    "Computer 不允许发送 notify:* 或 client:* 事件 / Computer cannot send notify:* or client:* events: {}",
402                    event
403                )
404            ));
405        }
406
407        debug!("Emitting event: {}", event);
408
409        self.client
410            .emit(event, Payload::Text(vec![data], None))
411            .await
412            .map_err(|e| ComputerError::SocketIoError(format!("Failed to emit {}: {}", event, e)))
413    }
414
415    /// 发送事件并等待响应
416    /// Emit event and wait for response
417    async fn call(
418        &self,
419        event: &str,
420        data: Value,
421        timeout_secs: Option<u64>,
422    ) -> ComputerResult<Vec<Value>> {
423        // 检查事件名 policy / Check event name policy
424        if event.starts_with("notify:") || event.starts_with("client:") {
425            return Err(ComputerError::InvalidState(
426                format!(
427                    "Computer 不允许发送 notify:* 或 client:* 事件 / Computer cannot send notify:* or client:* events: {}",
428                    event
429                )
430            ));
431        }
432
433        let timeout = std::time::Duration::from_secs(timeout_secs.unwrap_or(30));
434        debug!("Calling event: {} with timeout {:?}", event, timeout);
435
436        let (tx, rx) = tokio::sync::oneshot::channel();
437        let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
438
439        let callback = move |payload: Payload, _client: Client| {
440            if let Some(tx_opt) = tx.try_lock().ok().and_then(|mut m| m.take()) {
441                let _ = tx_opt.send(payload);
442            }
443            async {}.boxed()
444        };
445
446        self.client
447            .emit_with_ack(event, Payload::Text(vec![data], None), timeout, callback)
448            .await
449            .map_err(|e| {
450                ComputerError::SocketIoError(format!("Failed to call {}: {}", event, e))
451            })?;
452
453        // 使用 tokio::time::timeout 来确保 rx.await 不会无限期等待
454        // Use tokio::time::timeout to ensure rx.await doesn't wait forever
455        match tokio::time::timeout(timeout, rx).await {
456            Ok(Ok(response)) => {
457                // 从响应中提取JSON数据 / Extract JSON data from response
458                match response {
459                    Payload::Text(values, _) => {
460                        debug!("Received response: {:?}", values);
461                        Ok(values)
462                    }
463                    #[allow(deprecated)]
464                    Payload::String(s, _) => {
465                        // 尝试解析字符串为JSON数组
466                        // Try to parse string as JSON array
467                        let parsed: Vec<Value> = serde_json::from_str(&s).map_err(|e| {
468                            ComputerError::SocketIoError(format!("Failed to parse response: {}", e))
469                        })?;
470                        debug!("Received parsed response: {:?}", parsed);
471                        Ok(parsed)
472                    }
473                    Payload::Binary(_, _) => Err(ComputerError::SocketIoError(
474                        "Binary response not supported".to_string(),
475                    )),
476                }
477            }
478            Ok(Err(_)) => {
479                error!("Channel closed while calling event: {}", event);
480                Err(ComputerError::SocketIoError(
481                    "Channel closed while waiting for response".to_string(),
482                ))
483            }
484            Err(_) => {
485                error!("Timeout while calling event: {}", event);
486                Err(ComputerError::SocketIoError(
487                    "Timeout while waiting for response".to_string(),
488                ))
489            }
490        }
491    }
492
493    /// 处理工具调用事件(带ACK响应)
494    /// Handle tool call event (with ACK response)
495    async fn handle_tool_call_with_ack(
496        payload: Payload,
497        manager: Arc<RwLock<Option<MCPServerManager>>>,
498        computer_name: String,
499        _office_id: Arc<RwLock<Option<String>>>,
500        _client: Client,
501    ) -> ComputerResult<(Option<i32>, Value)> {
502        let (ack_id, req) = Self::extract_ack_and_parse::<ToolCallReq>(payload)?;
503
504        // 验证 computer_name(Server 路由已保证请求来自同一 office,无需验证 agent 字段)
505        // Validate computer_name (Server routing ensures request is from same office, no need to validate agent field)
506        if computer_name != req.computer {
507            return Err(ComputerError::ValidationError(format!(
508                "Computer name mismatch: expected {}, got {}",
509                computer_name, req.computer
510            )));
511        }
512
513        // 执行工具调用 / Execute tool call
514        let result = {
515            let manager_guard = manager.read().await;
516            match manager_guard.as_ref() {
517                Some(mgr) => {
518                    mgr.execute_tool(
519                        &req.tool_name,
520                        req.params,
521                        Some(std::time::Duration::from_secs(req.timeout as u64)),
522                    )
523                    .await?
524                }
525                None => {
526                    return Err(ComputerError::InvalidState(
527                        "MCP Manager not initialized".to_string(),
528                    ));
529                }
530            }
531        };
532
533        let result_value =
534            serde_json::to_value(result).map_err(ComputerError::SerializationError)?;
535
536        info!("Tool call executed successfully: {}", req.tool_name);
537        Ok((ack_id, result_value))
538    }
539
540    /// 处理获取工具列表事件(带ACK响应)
541    /// Handle get tools event (with ACK response)
542    async fn handle_get_tools_with_ack(
543        payload: Payload,
544        manager: Arc<RwLock<Option<MCPServerManager>>>,
545        computer_name: String,
546        _office_id: Arc<RwLock<Option<String>>>,
547        _client: Client,
548    ) -> ComputerResult<(Option<i32>, Value)> {
549        let (ack_id, req) = Self::extract_ack_and_parse::<GetToolsReq>(payload)?;
550
551        // 验证 computer_name(Server 路由已保证请求来自同一 office,无需验证 agent 字段)
552        // Validate computer_name (Server routing ensures request is from same office, no need to validate agent field)
553        if computer_name != req.computer {
554            return Err(ComputerError::ValidationError(format!(
555                "Computer name mismatch: expected {}, got {}",
556                computer_name, req.computer
557            )));
558        }
559
560        // 获取工具列表 / Get tools list
561        let tools: Vec<smcp::SMCPTool> = {
562            let manager_guard = manager.read().await;
563            match manager_guard.as_ref() {
564                Some(mgr) => {
565                    // 转换Tool为SMCPTool
566                    // Convert Tool to SMCPTool
567                    let tool_list = mgr.list_available_tools().await;
568                    tool_list
569                        .into_iter()
570                        .map(convert_tool_to_smcp_tool)
571                        .collect()
572                }
573                None => {
574                    return Err(ComputerError::InvalidState(
575                        "MCP Manager not initialized".to_string(),
576                    ));
577                }
578            }
579        };
580
581        let response = GetToolsRet {
582            tools: tools.clone(),
583            req_id: req.base.req_id,
584        };
585
586        info!(
587            "Returned {} tools for agent {}",
588            tools.len(),
589            req.base.agent
590        );
591        Ok((ack_id, serde_json::to_value(response)?))
592    }
593
594    /// 处理获取配置事件(带ACK响应)
595    /// Handle get config event (with ACK response)
596    async fn handle_get_config_with_ack(
597        payload: Payload,
598        manager: Arc<RwLock<Option<MCPServerManager>>>,
599        computer_name: String,
600        _office_id: Arc<RwLock<Option<String>>>,
601        _client: Client,
602        inputs: Arc<RwLock<HashMap<String, MCPServerInput>>>,
603    ) -> ComputerResult<(Option<i32>, Value)> {
604        let (ack_id, req) = Self::extract_ack_and_parse::<GetComputerConfigReq>(payload)?;
605
606        // 验证 computer_name(Server 路由已保证请求来自同一 office,无需验证 agent 字段)
607        // Validate computer_name (Server routing ensures request is from same office, no need to validate agent field)
608        if computer_name != req.computer {
609            return Err(ComputerError::ValidationError(format!(
610                "Computer name mismatch: expected {}, got {}",
611                computer_name, req.computer
612            )));
613        }
614
615        // 获取配置 / Get config
616        let servers = {
617            let manager_guard = manager.read().await;
618            match manager_guard.as_ref() {
619                Some(mgr) => {
620                    // 获取完整服务器配置(不只是状态)
621                    // Get complete server configurations (not just status)
622                    mgr.get_server_configs().await
623                }
624                None => {
625                    return Err(ComputerError::InvalidState(
626                        "MCP Manager not initialized".to_string(),
627                    ));
628                }
629            }
630        };
631
632        // 获取输入定义 / Get input definitions
633        // 将 HashMap<String, MCPServerInput> 转换为 Vec<serde_json::Value>
634        // Convert HashMap<String, MCPServerInput> to Vec<serde_json::Value>
635        let inputs_data = {
636            let inputs_guard = inputs.read().await;
637            if inputs_guard.is_empty() {
638                None
639            } else {
640                let inputs_vec: Vec<serde_json::Value> = inputs_guard
641                    .values()
642                    .filter_map(|input| serde_json::to_value(input).ok())
643                    .collect();
644                if inputs_vec.is_empty() {
645                    None
646                } else {
647                    Some(inputs_vec)
648                }
649            }
650        };
651
652        let response = GetComputerConfigRet {
653            servers,
654            inputs: inputs_data,
655        };
656
657        info!("Returned config for agent {}", req.base.agent);
658        Ok((ack_id, serde_json::to_value(response)?))
659    }
660
661    /// 处理获取桌面事件(带ACK响应)
662    /// Handle get desktop event (with ACK response)
663    async fn handle_get_desktop_with_ack(
664        payload: Payload,
665        _manager: Arc<RwLock<Option<MCPServerManager>>>,
666        computer_name: String,
667        _office_id: Arc<RwLock<Option<String>>>,
668        _client: Client,
669    ) -> ComputerResult<(Option<i32>, Value)> {
670        let (ack_id, req) = Self::extract_ack_and_parse::<GetDesktopReq>(payload)?;
671
672        // 验证 computer_name(Server 路由已保证请求来自同一 office,无需验证 agent 字段)
673        // Validate computer_name (Server routing ensures request is from same office, no need to validate agent field)
674        if computer_name != req.computer {
675            return Err(ComputerError::ValidationError(format!(
676                "Computer name mismatch: expected {}, got {}",
677                computer_name, req.computer
678            )));
679        }
680
681        // 获取桌面 / Get desktop
682        // TODO: 实现实际的桌面捕获逻辑
683        // TODO: Implement actual desktop capture logic
684        let desktops = Vec::<String>::new(); // 暂时返回空列表 / Return empty list for now
685
686        let response = GetDesktopRet {
687            desktops: Some(desktops),
688            req_id: req.base.req_id,
689        };
690
691        info!("Returned desktop for agent {}", req.base.agent);
692        Ok((ack_id, serde_json::to_value(response)?))
693    }
694
695    /// 从payload中提取ack_id并解析数据
696    /// Extract ack_id from payload and parse data
697    fn extract_ack_and_parse<T: serde::de::DeserializeOwned>(
698        payload: Payload,
699    ) -> ComputerResult<(Option<i32>, T)> {
700        match payload {
701            Payload::Text(mut values, ack_id) => {
702                if let Some(value) = values.pop() {
703                    let req =
704                        serde_json::from_value(value).map_err(ComputerError::SerializationError)?;
705                    Ok((ack_id, req))
706                } else {
707                    Err(ComputerError::ProtocolError("Empty payload".to_string()))
708                }
709            }
710            #[allow(deprecated)]
711            Payload::String(s, ack_id) => {
712                let req = serde_json::from_str(&s).map_err(ComputerError::SerializationError)?;
713                Ok((ack_id, req))
714            }
715            Payload::Binary(_, _) => Err(ComputerError::SocketIoError(
716                "Binary payload not supported".to_string(),
717            )),
718        }
719    }
720
721    /// 仅提取ack_id(用于错误处理)
722    /// Extract ack_id only (for error handling)
723    fn extract_ack_id(payload: Payload) -> ComputerResult<(Option<i32>, ())> {
724        match payload {
725            Payload::Text(_, ack_id) => Ok((ack_id, ())),
726            #[allow(deprecated)]
727            Payload::String(_, ack_id) => Ok((ack_id, ())),
728            Payload::Binary(_, _) => Ok((None, ())),
729        }
730    }
731
732    /// 断开连接
733    /// Disconnect from server
734    pub async fn disconnect(self) -> ComputerResult<()> {
735        debug!("Disconnecting from server");
736        self.client
737            .disconnect()
738            .await
739            .map_err(|e| ComputerError::SocketIoError(format!("Failed to disconnect: {}", e)))?;
740        info!("Disconnected from server");
741        Ok(())
742    }
743
744    /// 获取当前office ID
745    /// Get current office ID
746    pub async fn get_office_id(&self) -> Option<String> {
747        self.office_id.read().await.clone()
748    }
749
750    /// 获取连接的 URL
751    /// Get connected URL
752    pub fn get_url(&self) -> String {
753        // 由于 tf_rust_socketio 的 Client 没有 uri() 方法,返回默认值
754        // Since tf_rust_socketio Client doesn't have uri() method, return default
755        "unknown".to_string()
756    }
757
758    /// 获取连接的 namespace
759    /// Get connected namespace
760    pub fn get_namespace(&self) -> String {
761        // 从 client 中获取 namespace,如果无法获取则返回默认值
762        // Get namespace from client, return default if unable to get
763        "/smcp".to_string()
764    }
765}
766
767/// 将内部 Tool 转换为协议类型 SMCPTool
768/// Convert internal Tool to protocol type SMCPTool
769pub(crate) fn convert_tool_to_smcp_tool(tool: crate::mcp_clients::model::Tool) -> smcp::SMCPTool {
770    let mut meta_map = serde_json::Map::new();
771
772    // 传递 tool.meta 中的所有键值(如 a2c_tool_meta)
773    // 值需要序列化为 JSON 字符串,与 Python SDK 对齐
774    if let Some(existing_meta) = &tool.meta {
775        for (k, v) in existing_meta.iter() {
776            let str_val = if v.is_string() {
777                v.as_str().unwrap().to_string()
778            } else {
779                serde_json::to_string(v).unwrap_or_default()
780            };
781            meta_map.insert(k.clone(), serde_json::Value::String(str_val));
782        }
783    }
784
785    // 添加 MCP_TOOL_ANNOTATION
786    if let Some(annotations) = &tool.annotations {
787        if let Ok(json_str) = serde_json::to_string(annotations) {
788            meta_map.insert(
789                "MCP_TOOL_ANNOTATION".to_string(),
790                serde_json::Value::String(json_str),
791            );
792        }
793    }
794
795    let meta = if meta_map.is_empty() {
796        None
797    } else {
798        Some(serde_json::Value::Object(meta_map))
799    };
800
801    let description = tool.description.as_deref().unwrap_or("").to_string();
802    let params_schema = tool.schema_as_json_value();
803    smcp::SMCPTool {
804        name: tool.name.to_string(),
805        description,
806        params_schema,
807        return_schema: None,
808        meta,
809    }
810}
811
812#[cfg(test)]
813mod tests {
814    use super::*;
815    use crate::mcp_clients::model::{Tool, ToolAnnotations};
816    use serde_json::json;
817
818    fn make_tool(
819        meta: Option<serde_json::Map<String, serde_json::Value>>,
820        annotations: Option<ToolAnnotations>,
821    ) -> Tool {
822        use std::sync::Arc;
823        let input_schema: serde_json::Map<String, serde_json::Value> =
824            serde_json::from_value(json!({"type": "object"})).unwrap();
825        Tool {
826            name: "test_tool".into(),
827            title: None,
828            description: Some("A test tool".into()),
829            input_schema: Arc::new(input_schema),
830            output_schema: None,
831            annotations,
832            icons: None,
833            meta: meta.map(rmcp::model::Meta),
834        }
835    }
836
837    #[test]
838    fn test_tool_to_smcp_tool_with_meta_and_annotations() {
839        let mut meta = serde_json::Map::new();
840        meta.insert(
841            "a2c_tool_meta".to_string(),
842            json!({"tags": ["browser"], "priority": 1}),
843        );
844        let annotations = ToolAnnotations {
845            title: Some("Test".to_string()),
846            read_only_hint: Some(false),
847            destructive_hint: Some(false),
848            idempotent_hint: None,
849            open_world_hint: Some(false),
850        };
851        let smcp_tool = convert_tool_to_smcp_tool(make_tool(Some(meta), Some(annotations)));
852
853        let meta_obj = smcp_tool.meta.unwrap();
854        let meta_map = meta_obj.as_object().unwrap();
855        assert!(meta_map.contains_key("a2c_tool_meta"));
856        assert!(meta_map.contains_key("MCP_TOOL_ANNOTATION"));
857        // Values should be JSON strings
858        assert!(meta_map["a2c_tool_meta"].is_string());
859        assert!(meta_map["MCP_TOOL_ANNOTATION"].is_string());
860    }
861
862    #[test]
863    fn test_tool_to_smcp_tool_only_meta() {
864        let mut meta = serde_json::Map::new();
865        meta.insert("a2c_tool_meta".to_string(), json!({"tags": ["fs"]}));
866        let smcp_tool = convert_tool_to_smcp_tool(make_tool(Some(meta), None));
867
868        let meta_obj = smcp_tool.meta.unwrap();
869        let meta_map = meta_obj.as_object().unwrap();
870        assert_eq!(meta_map.len(), 1);
871        assert!(meta_map.contains_key("a2c_tool_meta"));
872    }
873
874    #[test]
875    fn test_tool_to_smcp_tool_only_annotations() {
876        let annotations = ToolAnnotations {
877            title: Some("My Tool".to_string()),
878            read_only_hint: Some(true),
879            destructive_hint: Some(false),
880            idempotent_hint: None,
881            open_world_hint: Some(false),
882        };
883        let smcp_tool = convert_tool_to_smcp_tool(make_tool(None, Some(annotations)));
884
885        let meta_obj = smcp_tool.meta.unwrap();
886        let meta_map = meta_obj.as_object().unwrap();
887        assert_eq!(meta_map.len(), 1);
888        assert!(meta_map.contains_key("MCP_TOOL_ANNOTATION"));
889    }
890
891    #[test]
892    fn test_tool_to_smcp_tool_no_meta_no_annotations() {
893        let smcp_tool = convert_tool_to_smcp_tool(make_tool(None, None));
894        assert!(smcp_tool.meta.is_none());
895    }
896
897    #[test]
898    fn test_tool_to_smcp_tool_string_value_not_double_serialized() {
899        let mut meta = serde_json::Map::new();
900        meta.insert(
901            "simple_key".to_string(),
902            serde_json::Value::String("already_a_string".to_string()),
903        );
904        let smcp_tool = convert_tool_to_smcp_tool(make_tool(Some(meta), None));
905
906        let meta_obj = smcp_tool.meta.unwrap();
907        let meta_map = meta_obj.as_object().unwrap();
908        // Should be the raw string, not "\"already_a_string\""
909        assert_eq!(meta_map["simple_key"].as_str().unwrap(), "already_a_string");
910    }
911}