1use crate::errors::{ComputerError, ComputerResult};
12use crate::mcp_clients::manager::MCPServerManager;
13use crate::mcp_clients::model::MCPServerInput;
14use futures_util::FutureExt;
15use serde_json::Value;
16use smcp::{
17 events::{
18 CLIENT_GET_CONFIG, CLIENT_GET_DESKTOP, CLIENT_GET_TOOLS, CLIENT_TOOL_CALL,
19 SERVER_JOIN_OFFICE, SERVER_LEAVE_OFFICE, SERVER_UPDATE_CONFIG, SERVER_UPDATE_DESKTOP,
20 SERVER_UPDATE_TOOL_LIST,
21 },
22 GetComputerConfigReq, GetComputerConfigRet, GetDesktopReq, GetDesktopRet, GetToolsReq,
23 GetToolsRet, ToolCallReq, SMCP_NAMESPACE,
24};
25use std::collections::HashMap;
26use std::sync::Arc;
27use tf_rust_socketio::{
28 asynchronous::{Client, ClientBuilder},
29 Event, Payload, TransportType,
30};
31use tokio::sync::RwLock;
32use tracing::{debug, error, info};
33
/// Socket.IO client through which a computer talks to an SMCP server.
///
/// Instances are created with `SmcpComputerClient::new`, which also installs
/// the handler that answers incoming `client:*` requests.
pub struct SmcpComputerClient {
    /// Underlying asynchronous Socket.IO client used for emits, calls and disconnect.
    client: Client,
    /// Name of this computer; sent when joining an office and in update notifications.
    computer_name: String,
    /// Office currently joined, or `None` when the client is not in any office.
    office_id: Arc<RwLock<Option<String>>>,
    /// Shared map of MCP server inputs handed to `new`.
    // The methods of this type never read the field (the event handler works on
    // its own clone of the `Arc`), hence the `dead_code` allowance.
    #[allow(dead_code)]
    inputs: Arc<RwLock<HashMap<String, MCPServerInput>>>,
}
47
48impl SmcpComputerClient {
49 pub async fn new(
52 url: &str,
53 manager: Arc<RwLock<Option<MCPServerManager>>>,
54 computer_name: String,
55 auth_secret: Option<String>,
56 inputs: Arc<RwLock<HashMap<String, MCPServerInput>>>,
57 ) -> ComputerResult<Self> {
58 let office_id = Arc::new(RwLock::new(None));
59 let manager_clone = manager.clone();
60 let computer_name_clone = computer_name.clone();
61 let office_id_clone = office_id.clone();
62 let inputs_clone = inputs.clone();
63
64 let mut builder = ClientBuilder::new(url)
67 .namespace(SMCP_NAMESPACE)
68 .transport_type(TransportType::Websocket);
69
70 if let Some(secret) = auth_secret {
73 builder = builder.opening_header("x-api-key", secret.as_str());
74 }
75
76 let client = builder
77 .on_any(move |event, payload, client| {
78 let event_str = match event {
81 Event::Custom(s) => s,
82 _ => return async {}.boxed(),
83 };
84
85 match event_str.as_str() {
86 CLIENT_TOOL_CALL => {
87 let manager = manager_clone.clone();
88 let computer_name = computer_name_clone.clone();
89 let office_id = office_id_clone.clone();
90 let client_clone = client.clone();
91 let payload_clone = payload.clone();
92
93 async move {
94 match Self::handle_tool_call_with_ack(
95 payload,
96 manager,
97 computer_name,
98 office_id,
99 client_clone,
100 )
101 .await
102 {
103 Ok((ack_id, response)) => {
104 if let Some(id) = ack_id {
105 if let Err(e) = client.ack_with_id(id, response).await {
106 error!("Failed to send ack: {}", e);
107 }
108 }
109 }
110 Err(e) => {
111 error!("Error handling tool call: {}", e);
112 if let Ok((Some(id), _)) = Self::extract_ack_id(payload_clone) {
114 let error_response = serde_json::json!({
115 "isError": true,
116 "content": [],
117 "structuredContent": {
118 "error": e.to_string(),
119 "error_type": "ComputerError"
120 }
121 });
122 let _ = client.ack_with_id(id, error_response).await;
123 }
124 }
125 }
126 }
127 .boxed()
128 }
129 CLIENT_GET_TOOLS => {
130 let manager = manager_clone.clone();
131 let computer_name = computer_name_clone.clone();
132 let office_id = office_id_clone.clone();
133 let client_clone = client.clone();
134
135 async move {
136 match Self::handle_get_tools_with_ack(
137 payload,
138 manager,
139 computer_name,
140 office_id,
141 client_clone,
142 )
143 .await
144 {
145 Ok((ack_id, response)) => {
146 if let Some(id) = ack_id {
147 if let Err(e) = client.ack_with_id(id, response).await {
148 error!("Failed to send ack: {}", e);
149 }
150 }
151 }
152 Err(e) => {
153 error!("Error handling get tools: {}", e);
154 }
155 }
156 }
157 .boxed()
158 }
159 CLIENT_GET_CONFIG => {
160 let manager = manager_clone.clone();
161 let computer_name = computer_name_clone.clone();
162 let office_id = office_id_clone.clone();
163 let client_clone = client.clone();
164 let inputs = inputs_clone.clone();
165
166 async move {
167 match Self::handle_get_config_with_ack(
168 payload,
169 manager,
170 computer_name,
171 office_id,
172 client_clone,
173 inputs,
174 )
175 .await
176 {
177 Ok((ack_id, response)) => {
178 if let Some(id) = ack_id {
179 if let Err(e) = client.ack_with_id(id, response).await {
180 error!("Failed to send ack: {}", e);
181 }
182 }
183 }
184 Err(e) => {
185 error!("Error handling get config: {}", e);
186 }
187 }
188 }
189 .boxed()
190 }
191 CLIENT_GET_DESKTOP => {
192 let manager = manager_clone.clone();
193 let computer_name = computer_name_clone.clone();
194 let office_id = office_id_clone.clone();
195 let client_clone = client.clone();
196
197 async move {
198 match Self::handle_get_desktop_with_ack(
199 payload,
200 manager,
201 computer_name,
202 office_id,
203 client_clone,
204 )
205 .await
206 {
207 Ok((ack_id, response)) => {
208 if let Some(id) = ack_id {
209 if let Err(e) = client.ack_with_id(id, response).await {
210 error!("Failed to send ack: {}", e);
211 }
212 }
213 }
214 Err(e) => {
215 error!("Error handling get desktop: {}", e);
216 }
217 }
218 }
219 .boxed()
220 }
221 _ => {
222 debug!("Unhandled event: {}", event_str);
223 async {}.boxed()
224 }
225 }
226 })
227 .connect()
228 .await
229 .map_err(|e| ComputerError::SocketIoError(format!("Failed to connect: {}", e)))?;
230
231 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
238
239 info!(
240 "Connected to SMCP server at {} with computer name: {}",
241 url, computer_name
242 );
243
244 Ok(Self {
245 client,
246 computer_name,
247 office_id,
248 inputs,
249 })
250 }
251
252 pub async fn join_office(&self, office_id: &str) -> ComputerResult<()> {
255 debug!("Joining office: {}", office_id);
256
257 *self.office_id.write().await = Some(office_id.to_string());
260
261 let req_data = serde_json::json!({
262 "office_id": office_id,
263 "role": "computer",
264 "name": self.computer_name
265 });
266
267 match self.call(SERVER_JOIN_OFFICE, req_data, Some(10)).await {
270 Ok(response) => {
271 debug!("Join office response: {:?}", response);
274
275 let actual_response = if response.len() == 1 {
278 if let Some(arr) = response.first().and_then(|v| v.as_array()) {
279 arr.to_vec()
280 } else {
281 response
282 }
283 } else {
284 response
285 };
286
287 if !actual_response.is_empty() {
288 if let Some(success) = actual_response.first().and_then(|v| v.as_bool()) {
289 if success {
290 info!("Successfully joined office: {}", office_id);
291 Ok(())
292 } else {
293 *self.office_id.write().await = None;
295 let error_msg = actual_response
296 .get(1)
297 .and_then(|v| v.as_str())
298 .unwrap_or("Unknown error");
299 Err(ComputerError::SocketIoError(format!(
300 "Failed to join office: {}",
301 error_msg
302 )))
303 }
304 } else {
305 *self.office_id.write().await = None;
306 Err(ComputerError::SocketIoError(format!(
307 "Invalid response format from server: {:?}",
308 actual_response
309 )))
310 }
311 } else {
312 *self.office_id.write().await = None;
313 Err(ComputerError::SocketIoError(
314 "Empty response from server".to_string(),
315 ))
316 }
317 }
318 Err(e) => {
319 *self.office_id.write().await = None;
320 Err(e)
321 }
322 }
323 }
324
325 pub async fn get_current_office_id(&self) -> ComputerResult<String> {
327 let office_id = self.office_id.read().await;
328 match office_id.as_ref() {
329 Some(id) => Ok(id.clone()),
330 None => Err(ComputerError::InvalidState(
331 "Not currently in any office".to_string(),
332 )),
333 }
334 }
335
336 pub async fn leave_office(&self, office_id: &str) -> ComputerResult<()> {
339 debug!("Leaving office: {}", office_id);
340
341 let req_data = serde_json::json!({
342 "office_id": office_id
343 });
344
345 self.emit(SERVER_LEAVE_OFFICE, req_data).await?;
346 *self.office_id.write().await = None;
347
348 info!("Left office: {}", office_id);
349 Ok(())
350 }
351
352 pub async fn emit_update_config(&self) -> ComputerResult<()> {
355 let office_id = self.office_id.read().await;
356 if office_id.is_some() {
357 let req_data = serde_json::json!({
358 "computer": self.computer_name
359 });
360 self.emit(SERVER_UPDATE_CONFIG, req_data).await?;
361 info!("Emitted config update notification");
362 }
363 Ok(())
364 }
365
366 pub async fn emit_update_tool_list(&self) -> ComputerResult<()> {
369 let office_id = self.office_id.read().await;
370 if office_id.is_some() {
371 let req_data = serde_json::json!({
372 "computer": self.computer_name
373 });
374 self.emit(SERVER_UPDATE_TOOL_LIST, req_data).await?;
375 info!("Emitted tool list update notification");
376 }
377 Ok(())
378 }
379
380 pub async fn emit_update_desktop(&self) -> ComputerResult<()> {
383 let office_id = self.office_id.read().await;
384 if office_id.is_some() {
385 let req_data = serde_json::json!({
386 "computer": self.computer_name
387 });
388 self.emit(SERVER_UPDATE_DESKTOP, req_data).await?;
389 info!("Emitted desktop update notification");
390 }
391 Ok(())
392 }
393
394 async fn emit(&self, event: &str, data: Value) -> ComputerResult<()> {
397 if event.starts_with("notify:") || event.starts_with("client:") {
399 return Err(ComputerError::InvalidState(
400 format!(
401 "Computer 不允许发送 notify:* 或 client:* 事件 / Computer cannot send notify:* or client:* events: {}",
402 event
403 )
404 ));
405 }
406
407 debug!("Emitting event: {}", event);
408
409 self.client
410 .emit(event, Payload::Text(vec![data], None))
411 .await
412 .map_err(|e| ComputerError::SocketIoError(format!("Failed to emit {}: {}", event, e)))
413 }
414
415 async fn call(
418 &self,
419 event: &str,
420 data: Value,
421 timeout_secs: Option<u64>,
422 ) -> ComputerResult<Vec<Value>> {
423 if event.starts_with("notify:") || event.starts_with("client:") {
425 return Err(ComputerError::InvalidState(
426 format!(
427 "Computer 不允许发送 notify:* 或 client:* 事件 / Computer cannot send notify:* or client:* events: {}",
428 event
429 )
430 ));
431 }
432
433 let timeout = std::time::Duration::from_secs(timeout_secs.unwrap_or(30));
434 debug!("Calling event: {} with timeout {:?}", event, timeout);
435
436 let (tx, rx) = tokio::sync::oneshot::channel();
437 let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
438
439 let callback = move |payload: Payload, _client: Client| {
440 if let Some(tx_opt) = tx.try_lock().ok().and_then(|mut m| m.take()) {
441 let _ = tx_opt.send(payload);
442 }
443 async {}.boxed()
444 };
445
446 self.client
447 .emit_with_ack(event, Payload::Text(vec![data], None), timeout, callback)
448 .await
449 .map_err(|e| {
450 ComputerError::SocketIoError(format!("Failed to call {}: {}", event, e))
451 })?;
452
453 match tokio::time::timeout(timeout, rx).await {
456 Ok(Ok(response)) => {
457 match response {
459 Payload::Text(values, _) => {
460 debug!("Received response: {:?}", values);
461 Ok(values)
462 }
463 #[allow(deprecated)]
464 Payload::String(s, _) => {
465 let parsed: Vec<Value> = serde_json::from_str(&s).map_err(|e| {
468 ComputerError::SocketIoError(format!("Failed to parse response: {}", e))
469 })?;
470 debug!("Received parsed response: {:?}", parsed);
471 Ok(parsed)
472 }
473 Payload::Binary(_, _) => Err(ComputerError::SocketIoError(
474 "Binary response not supported".to_string(),
475 )),
476 }
477 }
478 Ok(Err(_)) => {
479 error!("Channel closed while calling event: {}", event);
480 Err(ComputerError::SocketIoError(
481 "Channel closed while waiting for response".to_string(),
482 ))
483 }
484 Err(_) => {
485 error!("Timeout while calling event: {}", event);
486 Err(ComputerError::SocketIoError(
487 "Timeout while waiting for response".to_string(),
488 ))
489 }
490 }
491 }
492
493 async fn handle_tool_call_with_ack(
496 payload: Payload,
497 manager: Arc<RwLock<Option<MCPServerManager>>>,
498 computer_name: String,
499 _office_id: Arc<RwLock<Option<String>>>,
500 _client: Client,
501 ) -> ComputerResult<(Option<i32>, Value)> {
502 let (ack_id, req) = Self::extract_ack_and_parse::<ToolCallReq>(payload)?;
503
504 if computer_name != req.computer {
507 return Err(ComputerError::ValidationError(format!(
508 "Computer name mismatch: expected {}, got {}",
509 computer_name, req.computer
510 )));
511 }
512
513 let result = {
515 let manager_guard = manager.read().await;
516 match manager_guard.as_ref() {
517 Some(mgr) => {
518 mgr.execute_tool(
519 &req.tool_name,
520 req.params,
521 Some(std::time::Duration::from_secs(req.timeout as u64)),
522 )
523 .await?
524 }
525 None => {
526 return Err(ComputerError::InvalidState(
527 "MCP Manager not initialized".to_string(),
528 ));
529 }
530 }
531 };
532
533 let result_value =
534 serde_json::to_value(result).map_err(ComputerError::SerializationError)?;
535
536 info!("Tool call executed successfully: {}", req.tool_name);
537 Ok((ack_id, result_value))
538 }
539
540 async fn handle_get_tools_with_ack(
543 payload: Payload,
544 manager: Arc<RwLock<Option<MCPServerManager>>>,
545 computer_name: String,
546 _office_id: Arc<RwLock<Option<String>>>,
547 _client: Client,
548 ) -> ComputerResult<(Option<i32>, Value)> {
549 let (ack_id, req) = Self::extract_ack_and_parse::<GetToolsReq>(payload)?;
550
551 if computer_name != req.computer {
554 return Err(ComputerError::ValidationError(format!(
555 "Computer name mismatch: expected {}, got {}",
556 computer_name, req.computer
557 )));
558 }
559
560 let tools: Vec<smcp::SMCPTool> = {
562 let manager_guard = manager.read().await;
563 match manager_guard.as_ref() {
564 Some(mgr) => {
565 let tool_list = mgr.list_available_tools().await;
568 tool_list
569 .into_iter()
570 .map(|tool| smcp::SMCPTool {
571 name: tool.name,
572 description: tool.description,
573 params_schema: tool.input_schema,
574 return_schema: None,
575 meta: None,
576 })
577 .collect()
578 }
579 None => {
580 return Err(ComputerError::InvalidState(
581 "MCP Manager not initialized".to_string(),
582 ));
583 }
584 }
585 };
586
587 let response = GetToolsRet {
588 tools: tools.clone(),
589 req_id: req.base.req_id,
590 };
591
592 info!(
593 "Returned {} tools for agent {}",
594 tools.len(),
595 req.base.agent
596 );
597 Ok((ack_id, serde_json::to_value(response)?))
598 }
599
600 async fn handle_get_config_with_ack(
603 payload: Payload,
604 manager: Arc<RwLock<Option<MCPServerManager>>>,
605 computer_name: String,
606 _office_id: Arc<RwLock<Option<String>>>,
607 _client: Client,
608 inputs: Arc<RwLock<HashMap<String, MCPServerInput>>>,
609 ) -> ComputerResult<(Option<i32>, Value)> {
610 let (ack_id, req) = Self::extract_ack_and_parse::<GetComputerConfigReq>(payload)?;
611
612 if computer_name != req.computer {
615 return Err(ComputerError::ValidationError(format!(
616 "Computer name mismatch: expected {}, got {}",
617 computer_name, req.computer
618 )));
619 }
620
621 let servers = {
623 let manager_guard = manager.read().await;
624 match manager_guard.as_ref() {
625 Some(mgr) => {
626 mgr.get_server_configs().await
629 }
630 None => {
631 return Err(ComputerError::InvalidState(
632 "MCP Manager not initialized".to_string(),
633 ));
634 }
635 }
636 };
637
638 let inputs_data = {
642 let inputs_guard = inputs.read().await;
643 if inputs_guard.is_empty() {
644 None
645 } else {
646 let inputs_vec: Vec<serde_json::Value> = inputs_guard
647 .values()
648 .filter_map(|input| serde_json::to_value(input).ok())
649 .collect();
650 if inputs_vec.is_empty() {
651 None
652 } else {
653 Some(inputs_vec)
654 }
655 }
656 };
657
658 let response = GetComputerConfigRet {
659 servers,
660 inputs: inputs_data,
661 };
662
663 info!("Returned config for agent {}", req.base.agent);
664 Ok((ack_id, serde_json::to_value(response)?))
665 }
666
667 async fn handle_get_desktop_with_ack(
670 payload: Payload,
671 _manager: Arc<RwLock<Option<MCPServerManager>>>,
672 computer_name: String,
673 _office_id: Arc<RwLock<Option<String>>>,
674 _client: Client,
675 ) -> ComputerResult<(Option<i32>, Value)> {
676 let (ack_id, req) = Self::extract_ack_and_parse::<GetDesktopReq>(payload)?;
677
678 if computer_name != req.computer {
681 return Err(ComputerError::ValidationError(format!(
682 "Computer name mismatch: expected {}, got {}",
683 computer_name, req.computer
684 )));
685 }
686
687 let desktops = Vec::<String>::new(); let response = GetDesktopRet {
693 desktops: Some(desktops),
694 req_id: req.base.req_id,
695 };
696
697 info!("Returned desktop for agent {}", req.base.agent);
698 Ok((ack_id, serde_json::to_value(response)?))
699 }
700
701 fn extract_ack_and_parse<T: serde::de::DeserializeOwned>(
704 payload: Payload,
705 ) -> ComputerResult<(Option<i32>, T)> {
706 match payload {
707 Payload::Text(mut values, ack_id) => {
708 if let Some(value) = values.pop() {
709 let req =
710 serde_json::from_value(value).map_err(ComputerError::SerializationError)?;
711 Ok((ack_id, req))
712 } else {
713 Err(ComputerError::ProtocolError("Empty payload".to_string()))
714 }
715 }
716 #[allow(deprecated)]
717 Payload::String(s, ack_id) => {
718 let req = serde_json::from_str(&s).map_err(ComputerError::SerializationError)?;
719 Ok((ack_id, req))
720 }
721 Payload::Binary(_, _) => Err(ComputerError::SocketIoError(
722 "Binary payload not supported".to_string(),
723 )),
724 }
725 }
726
727 fn extract_ack_id(payload: Payload) -> ComputerResult<(Option<i32>, ())> {
730 match payload {
731 Payload::Text(_, ack_id) => Ok((ack_id, ())),
732 #[allow(deprecated)]
733 Payload::String(_, ack_id) => Ok((ack_id, ())),
734 Payload::Binary(_, _) => Ok((None, ())),
735 }
736 }
737
738 pub async fn disconnect(self) -> ComputerResult<()> {
741 debug!("Disconnecting from server");
742 self.client
743 .disconnect()
744 .await
745 .map_err(|e| ComputerError::SocketIoError(format!("Failed to disconnect: {}", e)))?;
746 info!("Disconnected from server");
747 Ok(())
748 }
749
750 pub async fn get_office_id(&self) -> Option<String> {
753 self.office_id.read().await.clone()
754 }
755
756 pub fn get_url(&self) -> String {
759 "unknown".to_string()
762 }
763
764 pub fn get_namespace(&self) -> String {
767 "/smcp".to_string()
770 }
771}