1use async_trait::async_trait;
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::{Mutex, RwLock};
17use tracing::{debug, error, info};
18
19use crate::errors::{ComputerError, ComputerResult};
20use crate::inputs::handler::InputHandler;
21use crate::inputs::model::InputValue;
22use crate::inputs::utils::run_command;
23use crate::mcp_clients::{
24 manager::MCPServerManager,
25 model::{
26 content_as_text, is_call_tool_error, CallToolResult, Content, MCPServerConfig,
27 MCPServerInput, ReadResourceResult, Resource, Tool,
28 },
29 ConfigRender, RenderError,
30};
31use crate::socketio_client::SmcpComputerClient;
32
/// Shared, thread-safe callback used to confirm a tool call before it runs.
///
/// `Computer::execute_tool` invokes it with, in order: the request id, the server name,
/// the tool name and the call parameters. Returning `true` lets the call proceed;
/// returning `false` rejects it.
type ConfirmCallbackType = Arc<dyn Fn(&str, &str, &str, &serde_json::Value) -> bool + Send + Sync>;
35
/// Parses a `name: value, name: value` string into a header map.
///
/// Entries are separated by `,`; each entry is split at its first `:`. Names and values
/// are trimmed. Entries without a `:` or with an empty name are skipped, and a repeated
/// name keeps the value of its last occurrence.
fn parse_headers_string(headers: &str) -> HashMap<String, String> {
    let mut parsed = HashMap::new();

    for entry in headers.split(',') {
        // Only the first ':' separates name from value, so values may contain ':'.
        if let Some((raw_name, raw_value)) = entry.split_once(':') {
            let name = raw_name.trim();
            if !name.is_empty() {
                parsed.insert(name.to_string(), raw_value.trim().to_string());
            }
        }
    }

    parsed
}
52
53fn input_value_to_json(value: InputValue) -> serde_json::Value {
55 match value {
56 InputValue::String(s) => serde_json::Value::String(s),
57 InputValue::Number(n) => serde_json::Value::Number(serde_json::Number::from(n)),
58 InputValue::Float(f) => serde_json::Value::Number(
59 serde_json::Number::from_f64(f).unwrap_or(serde_json::Number::from(0)),
60 ),
61 InputValue::Bool(b) => serde_json::Value::Bool(b),
62 }
63}
64
65fn json_to_input_value(value: serde_json::Value) -> ComputerResult<InputValue> {
67 match value {
68 serde_json::Value::String(s) => Ok(InputValue::String(s)),
69 serde_json::Value::Number(n) => {
70 if let Some(i) = n.as_i64() {
71 Ok(InputValue::Number(i))
72 } else if let Some(u) = n.as_u64() {
73 Ok(InputValue::Number(u as i64))
74 } else if let Some(f) = n.as_f64() {
75 Ok(InputValue::Float(f))
76 } else {
77 Err(ComputerError::ValidationError(
78 "Invalid number value".to_string(),
79 ))
80 }
81 }
82 serde_json::Value::Bool(b) => Ok(InputValue::Bool(b)),
83 serde_json::Value::Null => Err(ComputerError::ValidationError(
84 "Null value not supported".to_string(),
85 )),
86 _ => Err(ComputerError::ValidationError(
87 "Unsupported value type".to_string(),
88 )),
89 }
90}
91
/// One entry of the tool-call history kept by `Computer::execute_tool`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCallRecord {
    /// UTC time captured just before the confirmation / call step.
    pub timestamp: DateTime<Utc>,
    /// Request id supplied by the caller of `execute_tool`.
    pub req_id: String,
    /// Name of the server the tool call was resolved to.
    pub server: String,
    /// Name of the tool as returned by the manager's validation step.
    pub tool: String,
    /// Parameters passed to the tool.
    pub parameters: serde_json::Value,
    /// Caller-supplied timeout; `execute_tool` interprets it as seconds.
    pub timeout: Option<f64>,
    /// `true` only when the tool was actually called and the result was not an error result.
    pub success: bool,
    /// First text content of an error result, if any.
    pub error: Option<String>,
}
112
/// Source of values for configured [`MCPServerInput`]s.
///
/// `Computer` calls [`Session::resolve_input`] for each configured input while rendering
/// a server configuration.
#[async_trait]
pub trait Session: Send + Sync {
    /// Resolves `input` to a JSON value, or returns an error when no value can be produced.
    async fn resolve_input(&self, input: &MCPServerInput) -> ComputerResult<serde_json::Value>;

    /// Returns the identifier of this session.
    fn session_id(&self) -> &str;
}
123
/// A [`Session`] that never prompts: inputs are resolved from their defaults
/// (or, for command inputs, by running the command).
pub struct SilentSession {
    /// Identifier reported by `session_id`.
    id: String,
}

impl SilentSession {
    /// Creates a session with the given identifier.
    pub fn new(id: impl Into<String>) -> Self {
        let id = id.into();
        Self { id }
    }
}
135
136#[async_trait]
137impl Session for SilentSession {
138 async fn resolve_input(&self, input: &MCPServerInput) -> ComputerResult<serde_json::Value> {
139 match input {
141 MCPServerInput::PromptString(input) => Ok(serde_json::Value::String(
142 input.default.clone().unwrap_or_default(),
143 )),
144 MCPServerInput::PickString(input) => Ok(serde_json::Value::String(
145 input
146 .default
147 .clone()
148 .unwrap_or_else(|| input.options.first().cloned().unwrap_or_default()),
149 )),
150 MCPServerInput::Command(input) => {
151 let args: Vec<String> = input
153 .args
154 .as_ref()
155 .map(|m| {
156 let mut sorted_pairs: Vec<_> = m.iter().collect();
157 sorted_pairs.sort_by_key(|(k, _)| *k);
158 sorted_pairs.into_iter().map(|(_, v)| v.clone()).collect()
159 })
160 .unwrap_or_default();
161 match run_command(&input.command, &args).await {
162 Ok(output) => Ok(serde_json::Value::String(output)),
163 Err(e) => Err(ComputerError::RuntimeError(format!(
164 "Failed to execute command '{}': {}",
165 input.command, e
166 ))),
167 }
168 }
169 }
170 }
171
172 fn session_id(&self) -> &str {
173 &self.id
174 }
175}
176
/// Manages a set of MCP servers, their configurable inputs, and an optional
/// Socket.IO connection, on behalf of a [`Session`].
pub struct Computer<S: Session> {
    /// Name of this computer; used in log messages and passed to the Socket.IO client.
    name: String,
    /// MCP server manager; `None` until `boot_up` or `add_or_update_server` creates it.
    mcp_manager: Arc<RwLock<Option<MCPServerManager>>>,
    /// Input definitions keyed by input id; the same `Arc` is handed to the Socket.IO client.
    inputs: Arc<RwLock<HashMap<String, MCPServerInput>>>,
    /// Unrendered server configurations keyed by server name.
    mcp_servers: RwLock<HashMap<String, MCPServerConfig>>,
    /// Holds the cached input values.
    input_handler: Arc<RwLock<InputHandler>>,
    /// Stored from the constructor; not read by any method visible in this file section.
    auto_connect: bool,
    /// Stored from the constructor; not read by any method visible in this file section.
    auto_reconnect: bool,
    /// Most recent tool calls (`execute_tool` keeps at most 10 entries).
    tool_history: Arc<Mutex<Vec<ToolCallRecord>>>,
    /// Session used to resolve input values when rendering server configurations.
    session: S,
    /// Socket.IO client, present between `set_socketio_client` and disconnect/shutdown.
    socketio_client: Arc<RwLock<Option<Arc<SmcpComputerClient>>>>,
    /// Optional confirmation callback consulted by `execute_tool`.
    confirm_callback: Option<ConfirmCallbackType>,
}
206
207impl<S: Session> Computer<S> {
208 pub fn new(
210 name: impl Into<String>,
211 session: S,
212 inputs: Option<HashMap<String, MCPServerInput>>,
213 mcp_servers: Option<HashMap<String, MCPServerConfig>>,
214 auto_connect: bool,
215 auto_reconnect: bool,
216 ) -> Self {
217 let name = name.into();
218 let inputs = inputs.unwrap_or_default();
219 let mcp_servers = mcp_servers.unwrap_or_default();
220
221 Self {
222 name,
223 mcp_manager: Arc::new(RwLock::new(None)),
224 inputs: Arc::new(RwLock::new(inputs)),
225 mcp_servers: RwLock::new(mcp_servers),
226 input_handler: Arc::new(RwLock::new(InputHandler::new())),
227 auto_connect,
228 auto_reconnect,
229 tool_history: Arc::new(Mutex::new(Vec::new())),
230 session,
231 socketio_client: Arc::new(RwLock::new(None)),
232 confirm_callback: None,
233 }
234 }
235
236 pub fn with_confirm_callback<F>(mut self, callback: F) -> Self
238 where
239 F: Fn(&str, &str, &str, &serde_json::Value) -> bool + Send + Sync + 'static,
240 {
241 self.confirm_callback = Some(Arc::new(callback));
242 self
243 }
244
245 pub fn name(&self) -> &str {
247 &self.name
248 }
249
250 pub fn get_socketio_client(&self) -> Arc<RwLock<Option<Arc<SmcpComputerClient>>>> {
254 self.socketio_client.clone()
255 }
256
257 pub async fn boot_up(&self) -> ComputerResult<()> {
259 info!("Starting Computer: {}", self.name);
260
261 let manager = MCPServerManager::new();
263
264 let servers = self.mcp_servers.read().await;
266 let mut validated_servers = Vec::new();
267
268 for (_name, server_config) in servers.iter() {
269 match self.render_server_config(server_config).await {
270 Ok(validated) => validated_servers.push(validated),
271 Err(e) => {
272 error!(
273 "Failed to render server config {}: {}",
274 server_config.name(),
275 e
276 );
277 validated_servers.push(server_config.clone());
279 }
280 }
281 }
282
283 manager.initialize(validated_servers).await?;
285
286 *self.mcp_manager.write().await = Some(manager);
288
289 info!("Computer {} started successfully", self.name);
290 Ok(())
291 }
292
293 async fn render_server_config(
297 &self,
298 config: &MCPServerConfig,
299 ) -> ComputerResult<MCPServerConfig> {
300 let config_json = serde_json::to_value(config)?;
302
303 let renderer = ConfigRender::default();
305
306 let inputs = self.inputs.read().await;
308 let inputs_clone: std::collections::HashMap<String, MCPServerInput> = inputs.clone();
309 drop(inputs); let mut resolved_values: std::collections::HashMap<String, serde_json::Value> =
315 std::collections::HashMap::new();
316 for (input_id, input) in inputs_clone.iter() {
317 match self.session.resolve_input(input).await {
318 Ok(value) => {
319 resolved_values.insert(input_id.clone(), value);
320 }
321 Err(e) => {
322 debug!(
323 "Failed to resolve input '{}': {}, will use default",
324 input_id, e
325 );
326 if let Some(default) = input.default() {
328 resolved_values.insert(input_id.clone(), default);
329 }
330 }
331 }
332 }
333
334 let resolver = |input_id: String| {
336 let values = resolved_values.clone();
337 async move {
338 if let Some(value) = values.get(&input_id) {
339 Ok(value.clone())
340 } else {
341 Err(RenderError::InputNotFound(input_id))
342 }
343 }
344 };
345
346 let rendered_json = renderer.render(config_json, resolver).await?;
348
349 let rendered_config: MCPServerConfig = serde_json::from_value(rendered_json)?;
351
352 Ok(rendered_config)
353 }
354
355 pub async fn add_or_update_server(&self, server: MCPServerConfig) -> ComputerResult<()> {
357 {
359 let mut manager_guard = self.mcp_manager.write().await;
360 if manager_guard.is_none() {
361 *manager_guard = Some(MCPServerManager::new());
362 }
363 }
364
365 let validated = self.render_server_config(&server).await?;
367
368 let manager = self.mcp_manager.read().await;
370 if let Some(ref manager) = *manager {
371 manager.add_or_update_server(validated).await?;
372 }
373
374 {
376 let mut servers = self.mcp_servers.write().await;
377 servers.insert(server.name().to_string(), server);
378 }
379
380 let _ = self.emit_update_config().await;
382
383 Ok(())
384 }
385
386 pub async fn remove_server(&self, server_name: &str) -> ComputerResult<()> {
388 let manager = self.mcp_manager.read().await;
389 if let Some(ref manager) = *manager {
390 manager.remove_server(server_name).await?;
391 }
392
393 {
395 let mut servers = self.mcp_servers.write().await;
396 servers.remove(server_name);
397 }
398
399 let _ = self.emit_update_config().await;
401
402 Ok(())
403 }
404
405 pub async fn update_inputs(
407 &self,
408 inputs: HashMap<String, MCPServerInput>,
409 ) -> ComputerResult<()> {
410 *self.inputs.write().await = inputs;
411
412 {
414 let mut input_handler = self.input_handler.write().await;
415 *input_handler = InputHandler::new();
416 }
417
418 let _ = self.emit_update_config().await;
420
421 Ok(())
422 }
423
424 pub async fn add_or_update_input(&self, input: MCPServerInput) -> ComputerResult<()> {
426 let input_id = input.id().to_string();
427 {
428 let mut inputs = self.inputs.write().await;
429 inputs.insert(input_id.clone(), input);
430 }
431
432 self.clear_input_values(Some(&input_id)).await?;
434
435 let _ = self.emit_update_config().await;
437
438 Ok(())
439 }
440
441 pub async fn remove_input(&self, input_id: &str) -> ComputerResult<bool> {
443 let removed = {
444 let mut inputs = self.inputs.write().await;
445 inputs.remove(input_id).is_some()
446 };
447
448 if removed {
449 self.clear_input_values(Some(input_id)).await?;
451
452 let _ = self.emit_update_config().await;
454 }
455
456 Ok(removed)
457 }
458
459 pub async fn get_input(&self, input_id: &str) -> ComputerResult<Option<MCPServerInput>> {
461 let inputs = self.inputs.read().await;
462 Ok(inputs.get(input_id).cloned())
463 }
464
465 pub async fn list_inputs(&self) -> ComputerResult<Vec<MCPServerInput>> {
467 let inputs = self.inputs.read().await;
468 Ok(inputs.values().cloned().collect())
469 }
470
471 pub async fn get_input_value(
473 &self,
474 input_id: &str,
475 ) -> ComputerResult<Option<serde_json::Value>> {
476 let handler = self.input_handler.read().await;
478 let cached_values = handler.get_all_cached_values().await;
479
480 for (key, value) in cached_values {
482 if key.starts_with(input_id) {
485 let parts: Vec<&str> = key.split(':').collect();
487 if !parts.is_empty() && parts[0] == input_id {
488 return Ok(Some(input_value_to_json(value)));
489 }
490 }
491 }
492
493 Ok(None)
494 }
495
496 pub async fn set_input_value(
498 &self,
499 input_id: &str,
500 value: serde_json::Value,
501 ) -> ComputerResult<bool> {
502 {
504 let inputs = self.inputs.read().await;
505 if !inputs.contains_key(input_id) {
506 return Ok(false);
507 }
508 }
509
510 let handler = self.input_handler.read().await;
512 let input_value = json_to_input_value(value)?;
513 handler
514 .set_cached_value(input_id.to_string(), input_value)
515 .await;
516
517 Ok(true)
518 }
519
520 pub async fn remove_input_value(&self, input_id: &str) -> ComputerResult<bool> {
522 let handler = self.input_handler.read().await;
523 let removed = handler.remove_cached_value(input_id).await.is_some();
524 Ok(removed)
525 }
526
527 pub async fn list_input_values(&self) -> ComputerResult<HashMap<String, serde_json::Value>> {
529 let handler = self.input_handler.read().await;
530 let cached_values = handler.get_all_cached_values().await;
531
532 let mut result = HashMap::new();
533 for (key, value) in cached_values {
534 let parts: Vec<&str> = key.split(':').collect();
537 if !parts.is_empty() {
538 result.insert(parts[0].to_string(), input_value_to_json(value));
539 }
540 }
541
542 Ok(result)
543 }
544
545 pub async fn clear_input_values(&self, input_id: Option<&str>) -> ComputerResult<()> {
547 let handler = self.input_handler.read().await;
548
549 if let Some(id) = input_id {
550 let cached_values = handler.get_all_cached_values().await;
552 let keys_to_remove: Vec<String> = cached_values
553 .keys()
554 .filter(|key| key.starts_with(id))
555 .cloned()
556 .collect();
557
558 for key in keys_to_remove {
559 handler.remove_cached_value(&key).await;
560 }
561 } else {
562 handler.clear_all_cache().await;
564 }
565
566 Ok(())
567 }
568
569 pub async fn get_available_tools(&self) -> ComputerResult<Vec<Tool>> {
571 let manager = self.mcp_manager.read().await;
572 if let Some(ref manager) = *manager {
573 let tools: Vec<Tool> = manager.list_available_tools().await;
574 Ok(tools)
578 } else {
579 Err(ComputerError::InvalidState(
580 "Computer not initialized".to_string(),
581 ))
582 }
583 }
584
585 pub async fn list_all_windows(
587 &self,
588 window_uri: Option<&str>,
589 ) -> ComputerResult<Vec<(String, Resource)>> {
590 let manager = self.mcp_manager.read().await;
591 if let Some(ref manager) = *manager {
592 Ok(manager.list_all_windows(window_uri).await)
593 } else {
594 Err(ComputerError::InvalidState(
595 "Computer not initialized".to_string(),
596 ))
597 }
598 }
599
600 pub async fn get_windows_details(
602 &self,
603 window_uri: Option<&str>,
604 ) -> ComputerResult<Vec<(String, Resource, ReadResourceResult)>> {
605 let manager = self.mcp_manager.read().await;
606 if let Some(ref manager) = *manager {
607 Ok(manager.get_windows_details(window_uri).await)
608 } else {
609 Err(ComputerError::InvalidState(
610 "Computer not initialized".to_string(),
611 ))
612 }
613 }
614
615 pub async fn get_window_detail(
617 &self,
618 server_name: &str,
619 resource: Resource,
620 ) -> ComputerResult<ReadResourceResult> {
621 let manager = self.mcp_manager.read().await;
622 if let Some(ref manager) = *manager {
623 manager.get_window_detail(server_name, resource).await
624 } else {
625 Err(ComputerError::InvalidState(
626 "Computer not initialized".to_string(),
627 ))
628 }
629 }
630
631 pub async fn execute_tool(
633 &self,
634 req_id: &str,
635 tool_name: &str,
636 parameters: serde_json::Value,
637 timeout: Option<f64>,
638 ) -> ComputerResult<CallToolResult> {
639 let manager = self.mcp_manager.read().await;
640 if let Some(ref manager) = *manager {
641 let (server_name, tool_name) =
643 manager.validate_tool_call(tool_name, ¶meters).await?;
644 let server_name = server_name.to_string();
645 let tool_name = tool_name.to_string();
646
647 let timestamp = Utc::now();
648 let mut success = false;
649 let mut error_msg = None;
650 let result: CallToolResult;
651
652 let need_confirm = true; let parameters_for_call = parameters.clone();
658
659 if need_confirm {
660 if let Some(ref callback) = self.confirm_callback {
661 let confirmed = callback(req_id, &server_name, &tool_name, ¶meters);
662 if confirmed {
663 let timeout_duration = timeout.map(std::time::Duration::from_secs_f64);
664 result = manager
665 .call_tool(
666 &server_name,
667 &tool_name,
668 parameters_for_call,
669 timeout_duration,
670 )
671 .await?;
672 success = !is_call_tool_error(&result);
673 } else {
674 result = CallToolResult::success(vec![Content::text(
675 "工具调用二次确认被拒绝,请稍后再试",
676 )]);
677 }
678 } else {
679 result = CallToolResult::error(vec![Content::text(
680 "当前工具需要调用前进行二次确认,但客户端目前没有实现二次确认回调方法",
681 )]);
682 error_msg = Some("No confirmation callback".to_string());
683 }
684 } else {
685 let timeout_duration = timeout.map(std::time::Duration::from_secs_f64);
686 result = manager
687 .call_tool(
688 &server_name,
689 &tool_name,
690 parameters_for_call,
691 timeout_duration,
692 )
693 .await?;
694 success = !is_call_tool_error(&result);
695 }
696
697 if is_call_tool_error(&result) {
698 error_msg = result
699 .content
700 .iter()
701 .find_map(|c| content_as_text(c).map(|t| t.to_string()));
702 }
703
704 let record = ToolCallRecord {
706 timestamp,
707 req_id: req_id.to_string(),
708 server: server_name,
709 tool: tool_name,
710 parameters,
711 timeout,
712 success,
713 error: error_msg,
714 };
715
716 {
717 let mut history = self.tool_history.lock().await;
718 history.push(record);
719 if history.len() > 10 {
721 history.remove(0);
722 }
723 }
724
725 Ok(result)
726 } else {
727 Err(ComputerError::InvalidState(
728 "Computer not initialized".to_string(),
729 ))
730 }
731 }
732
733 pub async fn get_tool_history(&self) -> ComputerResult<Vec<ToolCallRecord>> {
735 let history = self.tool_history.lock().await;
736 Ok(history.clone())
737 }
738
739 pub async fn get_server_status(&self) -> Vec<(String, bool, String)> {
741 let manager_guard = self.mcp_manager.read().await;
742 if let Some(ref manager) = *manager_guard {
743 manager.get_server_status().await
744 } else {
745 Vec::new()
746 }
747 }
748
749 pub async fn list_mcp_servers(&self) -> Vec<MCPServerConfig> {
751 let servers = self.mcp_servers.read().await;
752 servers.values().cloned().collect()
753 }
754
755 pub async fn start_mcp_client(&self, server_name: &str) -> ComputerResult<()> {
757 let manager_guard = self.mcp_manager.read().await;
758 if let Some(ref manager) = *manager_guard {
759 if server_name == "all" {
760 manager.start_all().await
761 } else {
762 manager.start_client(server_name).await
763 }
764 } else {
765 Err(ComputerError::InvalidState(
766 "MCP Manager not initialized".to_string(),
767 ))
768 }
769 }
770
771 pub async fn stop_mcp_client(&self, server_name: &str) -> ComputerResult<()> {
773 let manager_guard = self.mcp_manager.read().await;
774 if let Some(ref manager) = *manager_guard {
775 if server_name == "all" {
776 manager.stop_all().await
777 } else {
778 manager.stop_client(server_name).await
779 }
780 } else {
781 Err(ComputerError::InvalidState(
782 "MCP Manager not initialized".to_string(),
783 ))
784 }
785 }
786
787 pub async fn is_mcp_manager_initialized(&self) -> bool {
789 let manager_guard = self.mcp_manager.read().await;
790 manager_guard.is_some()
791 }
792
793 pub async fn set_socketio_client(&self, client: Arc<SmcpComputerClient>) {
797 let mut socketio_ref = self.socketio_client.write().await;
798 *socketio_ref = Some(client);
801 }
802
803 pub async fn connect_socketio(
805 &self,
806 url: &str,
807 #[allow(unused_variables)] namespace: &str,
808 auth: &Option<String>,
809 headers: &Option<String>,
810 ) -> ComputerResult<()> {
811 let _manager_check = {
813 let manager_guard = self.mcp_manager.read().await;
814 match manager_guard.as_ref() {
815 Some(_m) => {
816 true
819 }
820 None => {
821 return Err(ComputerError::InvalidState(
822 "MCP Manager not initialized. Please add and start servers first."
823 .to_string(),
824 ));
825 }
826 }
827 };
828
829 let new_manager = MCPServerManager::new();
834
835 let parsed_headers = headers.as_deref().map(parse_headers_string);
837
838 let client = SmcpComputerClient::new(
841 url,
842 Arc::new(RwLock::new(Some(new_manager))),
843 self.name.clone(),
844 auth.clone(),
845 self.inputs.clone(),
846 parsed_headers,
847 )
848 .await?;
849
850 let client_arc = Arc::new(client);
852 self.set_socketio_client(client_arc.clone()).await;
853
854 info!(
855 "Connected to SMCP server at {} with computer name: {}",
856 url, self.name
857 );
858
859 Ok(())
860 }
861
862 pub async fn disconnect_socketio(&self) -> ComputerResult<()> {
864 let mut socketio_ref = self.socketio_client.write().await;
865 *socketio_ref = None;
866 info!("Disconnected from server");
867 Ok(())
868 }
869
870 pub async fn join_office(&self, office_id: &str, _computer_name: &str) -> ComputerResult<()> {
872 let socketio_ref = self.socketio_client.read().await;
873 if let Some(ref client) = *socketio_ref {
874 client.join_office(office_id).await?;
877 return Ok(());
878 }
879 Err(ComputerError::InvalidState(
880 "Socket.IO client not connected".to_string(),
881 ))
882 }
883
884 pub async fn leave_office(&self) -> ComputerResult<()> {
886 let socketio_ref = self.socketio_client.read().await;
887 if let Some(ref client) = *socketio_ref {
888 let current_office_id = client.get_current_office_id().await?;
891 client.leave_office(¤t_office_id).await?;
892 return Ok(());
893 }
894 Err(ComputerError::InvalidState(
895 "Socket.IO client not connected".to_string(),
896 ))
897 }
898
899 pub async fn emit_update_config(&self) -> ComputerResult<()> {
901 let socketio_ref = self.socketio_client.read().await;
902 if let Some(ref client) = *socketio_ref {
903 client.emit_update_config().await?;
906 return Ok(());
907 }
908 Err(ComputerError::InvalidState(
909 "Socket.IO client not connected".to_string(),
910 ))
911 }
912
913 pub async fn shutdown(&self) -> ComputerResult<()> {
915 info!("Shutting down Computer: {}", self.name);
916
917 let mut manager_guard = self.mcp_manager.write().await;
918 if let Some(manager) = manager_guard.take() {
919 manager.stop_all().await?;
920 }
921
922 {
924 let mut socketio_ref = self.socketio_client.write().await;
925 *socketio_ref = None;
926 }
927
928 info!("Computer {} shutdown successfully", self.name);
929 Ok(())
930 }
931}
932
933impl<S: Session + Clone> Clone for Computer<S> {
935 fn clone(&self) -> Self {
936 Self {
937 name: self.name.clone(),
938 mcp_manager: Arc::clone(&self.mcp_manager),
939 inputs: Arc::new(RwLock::new(HashMap::new())), mcp_servers: RwLock::new(HashMap::new()),
941 input_handler: Arc::clone(&self.input_handler),
942 auto_connect: self.auto_connect,
943 auto_reconnect: self.auto_reconnect,
944 tool_history: Arc::clone(&self.tool_history),
945 session: self.session.clone(),
946 socketio_client: Arc::clone(&self.socketio_client),
947 confirm_callback: self.confirm_callback.clone(),
948 }
949 }
950}
951
/// Receiver of change notifications described by [`ManagerChangeMessage`].
#[async_trait]
pub trait ManagerChangeHandler: Send + Sync {
    /// Handles one change notification.
    async fn on_change(&self, message: ManagerChangeMessage) -> ComputerResult<()>;
}
958
/// Change notifications delivered to a [`ManagerChangeHandler`].
#[derive(Debug, Clone)]
pub enum ManagerChangeMessage {
    /// The set of available tools changed.
    ToolListChanged,
    /// The set of resources changed.
    // NOTE(review): `windows` presumably lists the affected window URIs; confirm with the sender.
    ResourceListChanged { windows: Vec<String> },
    /// The resource identified by `uri` was updated.
    ResourceUpdated { uri: String },
}
969
970#[async_trait]
971impl<S: Session> ManagerChangeHandler for Computer<S> {
972 async fn on_change(&self, message: ManagerChangeMessage) -> ComputerResult<()> {
973 match message {
974 ManagerChangeMessage::ToolListChanged => {
975 debug!("Tool list changed, notifying Socket.IO client");
976 let socketio_ref = self.socketio_client.read().await;
977 if let Some(ref client) = *socketio_ref {
978 client.emit_update_tool_list().await?;
981 }
982 }
983 ManagerChangeMessage::ResourceListChanged { windows: _ } => {
984 debug!("Resource list changed, checking for window updates");
985 }
987 ManagerChangeMessage::ResourceUpdated { uri } => {
988 debug!("Resource updated: {}", uri);
989 }
991 }
992 Ok(())
993 }
994}
995
996#[cfg(test)]
997mod tests {
998 use super::*;
999 use crate::mcp_clients::model::{
1000 CommandInput, MCPServerConfig, MCPServerInput, PickStringInput, PromptStringInput,
1001 StdioServerConfig, StdioServerParameters,
1002 };
1003
    /// A freshly constructed computer keeps the name and connection flags it was given.
    #[tokio::test]
    async fn test_computer_creation() {
        let session = SilentSession::new("test");
        let computer = Computer::new("test_computer", session, None, None, true, true);

        assert_eq!(computer.name, "test_computer");
        assert!(computer.auto_connect);
        assert!(computer.auto_reconnect);
    }
1013
    /// Inputs passed to the constructor are visible through `list_inputs`.
    #[tokio::test]
    async fn test_computer_with_initial_inputs_and_servers() {
        let session = SilentSession::new("test");
        // One prompt-string input keyed by its id.
        let mut inputs = HashMap::new();
        inputs.insert(
            "input1".to_string(),
            MCPServerInput::PromptString(PromptStringInput {
                id: "input1".to_string(),
                description: "Test input".to_string(),
                default: Some("default".to_string()),
                password: Some(false),
            }),
        );

        // One stdio server configuration keyed by its name.
        let mut servers = HashMap::new();
        servers.insert(
            "server1".to_string(),
            MCPServerConfig::Stdio(StdioServerConfig {
                name: "server1".to_string(),
                disabled: false,
                forbidden_tools: vec![],
                tool_meta: std::collections::HashMap::new(),
                default_tool_meta: None,
                vrl: None,
                server_parameters: StdioServerParameters {
                    command: "echo".to_string(),
                    args: vec![],
                    env: std::collections::HashMap::new(),
                    cwd: None,
                },
            }),
        );

        let computer = Computer::new(
            "test_computer",
            session,
            Some(inputs),
            Some(servers),
            false,
            false,
        );

        // The initial input must be listed unchanged.
        let inputs = computer.list_inputs().await.unwrap();
        assert_eq!(inputs.len(), 1);
        match &inputs[0] {
            MCPServerInput::PromptString(input) => {
                assert_eq!(input.id, "input1");
                assert_eq!(input.description, "Test input");
            }
            _ => panic!("Expected PromptString input"),
        }
    }
1067
    /// Exercises the add / get / list / update / remove life cycle of an input.
    #[tokio::test]
    async fn test_input_management() {
        let session = SilentSession::new("test");
        let computer = Computer::new("test_computer", session, None, None, true, true);

        // Add a new input.
        let input = MCPServerInput::PromptString(PromptStringInput {
            id: "test_input".to_string(),
            description: "Test input".to_string(),
            default: Some("default".to_string()),
            password: Some(false),
        });

        computer.add_or_update_input(input.clone()).await.unwrap();

        // It can be fetched by id ...
        let retrieved = computer.get_input("test_input").await.unwrap();
        assert!(retrieved.is_some());

        // ... and shows up in the listing.
        let inputs = computer.list_inputs().await.unwrap();
        assert_eq!(inputs.len(), 1);

        // Adding an input with the same id replaces the definition.
        let updated_input = MCPServerInput::PromptString(PromptStringInput {
            id: "test_input".to_string(),
            description: "Updated description".to_string(),
            default: Some("new_default".to_string()),
            password: Some(true),
        });
        computer.add_or_update_input(updated_input).await.unwrap();

        let retrieved = computer.get_input("test_input").await.unwrap().unwrap();
        match retrieved {
            MCPServerInput::PromptString(input) => {
                assert_eq!(input.description, "Updated description");
                assert_eq!(input.default, Some("new_default".to_string()));
                assert_eq!(input.password, Some(true));
            }
            _ => panic!("Expected PromptString input"),
        }

        // Removing an existing input reports `true` ...
        let removed = computer.remove_input("test_input").await.unwrap();
        assert!(removed);

        let retrieved = computer.get_input("test_input").await.unwrap();
        assert!(retrieved.is_none());

        // ... and removing an unknown one reports `false`.
        let removed = computer.remove_input("non_existent").await.unwrap();
        assert!(!removed);
    }
1121
    /// All three input variants can be stored side by side and are all listed.
    #[tokio::test]
    async fn test_multiple_input_types() {
        let session = SilentSession::new("test");
        let computer = Computer::new("test_computer", session, None, None, true, true);

        // One input of each variant.
        let prompt_input = MCPServerInput::PromptString(PromptStringInput {
            id: "prompt".to_string(),
            description: "Prompt input".to_string(),
            default: None,
            password: Some(false),
        });

        let pick_input = MCPServerInput::PickString(PickStringInput {
            id: "pick".to_string(),
            description: "Pick input".to_string(),
            options: vec!["option1".to_string(), "option2".to_string()],
            default: Some("option1".to_string()),
        });

        let command_input = MCPServerInput::Command(CommandInput {
            id: "command".to_string(),
            description: "Command input".to_string(),
            command: "ls".to_string(),
            args: None,
        });

        computer.add_or_update_input(prompt_input).await.unwrap();
        computer.add_or_update_input(pick_input).await.unwrap();
        computer.add_or_update_input(command_input).await.unwrap();

        let inputs = computer.list_inputs().await.unwrap();
        assert_eq!(inputs.len(), 3);

        // Listing order is unspecified, so compare the set of variants.
        let input_types: std::collections::HashSet<_> = inputs
            .iter()
            .map(|input| match input {
                MCPServerInput::PromptString(_) => "prompt",
                MCPServerInput::PickString(_) => "pick",
                MCPServerInput::Command(_) => "command",
            })
            .collect();

        assert!(input_types.contains("prompt"));
        assert!(input_types.contains("pick"));
        assert!(input_types.contains("command"));
    }
1170
    /// Adding, updating and removing a server all succeed on a computer that has not
    /// been booted.
    #[tokio::test]
    async fn test_server_management() {
        let session = SilentSession::new("test");
        let computer = Computer::new("test_computer", session, None, None, true, true);

        // Initial configuration.
        let server_config = MCPServerConfig::Stdio(StdioServerConfig {
            name: "test_server".to_string(),
            disabled: false,
            forbidden_tools: vec![],
            tool_meta: std::collections::HashMap::new(),
            default_tool_meta: None,
            vrl: None,
            server_parameters: StdioServerParameters {
                command: "echo".to_string(),
                args: vec!["hello".to_string()],
                env: std::collections::HashMap::new(),
                cwd: None,
            },
        });

        computer
            .add_or_update_server(server_config.clone())
            .await
            .unwrap();

        // Same name, different settings: this is an update of the existing server.
        let updated_config = MCPServerConfig::Stdio(StdioServerConfig {
            name: "test_server".to_string(),
            disabled: true,
            forbidden_tools: vec!["tool1".to_string()],
            tool_meta: std::collections::HashMap::new(),
            default_tool_meta: None,
            vrl: None,
            server_parameters: StdioServerParameters {
                command: "echo".to_string(),
                args: vec!["updated".to_string()],
                env: std::collections::HashMap::new(),
                cwd: None,
            },
        });

        computer.add_or_update_server(updated_config).await.unwrap();

        computer.remove_server("test_server").await.unwrap();
    }
1219
    /// `SilentSession` resolves each input variant without interaction.
    #[tokio::test]
    async fn test_session_trait() {
        let session = SilentSession::new("test_session");
        assert_eq!(session.session_id(), "test_session");

        // Prompt with a default resolves to that default.
        let prompt_input = MCPServerInput::PromptString(PromptStringInput {
            id: "test".to_string(),
            description: "Test".to_string(),
            default: Some("default_value".to_string()),
            password: Some(false),
        });

        let result = session.resolve_input(&prompt_input).await.unwrap();
        assert_eq!(
            result,
            serde_json::Value::String("default_value".to_string())
        );

        // Prompt without a default resolves to the empty string.
        let no_default_input = MCPServerInput::PromptString(PromptStringInput {
            id: "test2".to_string(),
            description: "Test2".to_string(),
            default: None,
            password: Some(false),
        });

        let result = session.resolve_input(&no_default_input).await.unwrap();
        assert_eq!(result, serde_json::Value::String("".to_string()));

        // Pick with a default resolves to the default, not to the first option.
        let pick_input = MCPServerInput::PickString(PickStringInput {
            id: "pick".to_string(),
            description: "Pick".to_string(),
            options: vec!["opt1".to_string(), "opt2".to_string()],
            default: Some("opt2".to_string()),
        });

        let result = session.resolve_input(&pick_input).await.unwrap();
        assert_eq!(result, serde_json::Value::String("opt2".to_string()));

        // Command input resolves to the command's output; this relies on `echo` being
        // available on the machine running the test.
        let command_input = MCPServerInput::Command(CommandInput {
            id: "cmd".to_string(),
            description: "Command".to_string(),
            command: "echo hello world".to_string(),
            args: None,
        });

        let result = session.resolve_input(&command_input).await.unwrap();
        assert_eq!(result, serde_json::Value::String("hello world".to_string()));
    }
1273
    /// Cached values can be set and read back, but only for inputs that are defined.
    #[tokio::test]
    async fn test_cache_operations() {
        let session = SilentSession::new("test");
        let computer = Computer::new("test_computer", session, None, None, true, true);

        // The input must exist before a value can be cached for it.
        let input = MCPServerInput::PromptString(PromptStringInput {
            id: "test_input".to_string(),
            description: "Test input".to_string(),
            default: Some("default".to_string()),
            password: Some(false),
        });
        computer.add_or_update_input(input).await.unwrap();

        // Set a value and read it back.
        let test_value = serde_json::Value::String("cached_value".to_string());
        let set_result = computer
            .set_input_value("test_input", test_value.clone())
            .await
            .unwrap();
        assert!(set_result);

        let retrieved = computer.get_input_value("test_input").await.unwrap();
        assert_eq!(retrieved, Some(test_value));

        // Setting a value for an unknown input is refused with `false`.
        let invalid_result = computer
            .set_input_value(
                "nonexistent",
                serde_json::Value::String("value".to_string()),
            )
            .await
            .unwrap();
        assert!(!invalid_result);

        // Nothing was cached for the unknown input.
        let not_found = computer.get_input_value("nonexistent").await.unwrap();
        assert!(not_found.is_none());
    }
1313
1314 #[tokio::test]
1315 async fn test_cache_remove_and_clear() {
1316 let session = SilentSession::new("test");
1317 let computer = Computer::new("test_computer", session, None, None, true, true);
1318
1319 let input1 = MCPServerInput::PromptString(PromptStringInput {
1321 id: "input1".to_string(),
1322 description: "Input 1".to_string(),
1323 default: None,
1324 password: Some(false),
1325 });
1326 let input2 = MCPServerInput::PromptString(PromptStringInput {
1327 id: "input2".to_string(),
1328 description: "Input 2".to_string(),
1329 default: None,
1330 password: Some(false),
1331 });
1332 computer.add_or_update_input(input1).await.unwrap();
1333 computer.add_or_update_input(input2).await.unwrap();
1334
1335 computer
1337 .set_input_value("input1", serde_json::Value::String("value1".to_string()))
1338 .await
1339 .unwrap();
1340 computer
1341 .set_input_value("input2", serde_json::Value::String("value2".to_string()))
1342 .await
1343 .unwrap();
1344
1345 let removed = computer.remove_input_value("input1").await.unwrap();
1347 assert!(removed);
1348
1349 let retrieved = computer.get_input_value("input1").await.unwrap();
1350 assert!(retrieved.is_none());
1351
1352 let still_exists = computer.get_input_value("input2").await.unwrap();
1353 assert!(still_exists.is_some());
1354
1355 computer.clear_input_values(None).await.unwrap();
1357 let cleared1 = computer.get_input_value("input1").await.unwrap();
1358 let cleared2 = computer.get_input_value("input2").await.unwrap();
1359 assert!(cleared1.is_none());
1360 assert!(cleared2.is_none());
1361 }
1362
1363 #[tokio::test]
1364 async fn test_cache_list_values() {
1365 let session = SilentSession::new("test");
1366 let computer = Computer::new("test_computer", session, None, None, true, true);
1367
1368 let input1 = MCPServerInput::PromptString(PromptStringInput {
1370 id: "input1".to_string(),
1371 description: "Input 1".to_string(),
1372 default: None,
1373 password: Some(false),
1374 });
1375 let input2 = MCPServerInput::PromptString(PromptStringInput {
1376 id: "input2".to_string(),
1377 description: "Input 2".to_string(),
1378 default: None,
1379 password: Some(false),
1380 });
1381 computer.add_or_update_input(input1).await.unwrap();
1382 computer.add_or_update_input(input2).await.unwrap();
1383
1384 computer
1386 .set_input_value(
1387 "input1",
1388 serde_json::Value::String("string_value".to_string()),
1389 )
1390 .await
1391 .unwrap();
1392 computer
1393 .set_input_value(
1394 "input2",
1395 serde_json::Value::Number(serde_json::Number::from(42)),
1396 )
1397 .await
1398 .unwrap();
1399
1400 let values = computer.list_input_values().await.unwrap();
1402 assert_eq!(values.len(), 2);
1403 assert_eq!(
1404 values.get("input1"),
1405 Some(&serde_json::Value::String("string_value".to_string()))
1406 );
1407 assert_eq!(
1408 values.get("input2"),
1409 Some(&serde_json::Value::Number(serde_json::Number::from(42)))
1410 );
1411 }
1412
1413 #[tokio::test]
1414 async fn test_cache_clear_on_input_update() {
1415 let session = SilentSession::new("test");
1416 let computer = Computer::new("test_computer", session, None, None, true, true);
1417
1418 let input = MCPServerInput::PromptString(PromptStringInput {
1420 id: "test_input".to_string(),
1421 description: "Test input".to_string(),
1422 default: None,
1423 password: Some(false),
1424 });
1425 computer.add_or_update_input(input).await.unwrap();
1426
1427 computer
1429 .set_input_value(
1430 "test_input",
1431 serde_json::Value::String("cached".to_string()),
1432 )
1433 .await
1434 .unwrap();
1435 assert!(computer
1436 .get_input_value("test_input")
1437 .await
1438 .unwrap()
1439 .is_some());
1440
1441 let updated_input = MCPServerInput::PromptString(PromptStringInput {
1443 id: "test_input".to_string(),
1444 description: "Updated input".to_string(),
1445 default: Some("new_default".to_string()),
1446 password: Some(true),
1447 });
1448 computer.add_or_update_input(updated_input).await.unwrap();
1449
1450 assert!(computer
1452 .get_input_value("test_input")
1453 .await
1454 .unwrap()
1455 .is_none());
1456 }
1457
1458 #[tokio::test]
1459 async fn test_cache_clear_on_input_remove() {
1460 let session = SilentSession::new("test");
1461 let computer = Computer::new("test_computer", session, None, None, true, true);
1462
1463 let input = MCPServerInput::PromptString(PromptStringInput {
1465 id: "test_input".to_string(),
1466 description: "Test input".to_string(),
1467 default: None,
1468 password: Some(false),
1469 });
1470 computer.add_or_update_input(input).await.unwrap();
1471
1472 computer
1474 .set_input_value(
1475 "test_input",
1476 serde_json::Value::String("cached".to_string()),
1477 )
1478 .await
1479 .unwrap();
1480 assert!(computer
1481 .get_input_value("test_input")
1482 .await
1483 .unwrap()
1484 .is_some());
1485
1486 let removed = computer.remove_input("test_input").await.unwrap();
1488 assert!(removed);
1489
1490 assert!(computer
1492 .get_input_value("test_input")
1493 .await
1494 .unwrap()
1495 .is_none());
1496 }
1497
1498 #[tokio::test]
1499 async fn test_tool_call_history() {
1500 let session = SilentSession::new("test");
1501 let computer = Computer::new("test_computer", session, None, None, true, true);
1502
1503 let history = computer.get_tool_history().await.unwrap();
1505 assert!(history.is_empty());
1506
1507 }
1510
1511 #[tokio::test]
1512 async fn test_confirmation_callback() {
1513 let session = SilentSession::new("test");
1514 let computer = Computer::new("test_computer", session, None, None, true, true);
1515
1516 let callback_called = Arc::new(Mutex::new(false));
1518 let callback_called_clone = callback_called.clone();
1519
1520 let _computer = computer.with_confirm_callback(move |_req_id, _server, _tool, _params| {
1521 let rt = tokio::runtime::Handle::current();
1524 rt.block_on(async {
1525 let mut called = callback_called_clone.lock().await;
1526 *called = true;
1527 });
1528 true });
1530
1531 }
1534
1535 #[tokio::test]
1536 async fn test_computer_shutdown() {
1537 let session = SilentSession::new("test");
1538 let computer = Computer::new("test_computer", session, None, None, true, true);
1539
1540 computer.shutdown().await.unwrap();
1542
1543 computer.boot_up().await.unwrap();
1545 computer.shutdown().await.unwrap();
1546 }
1547
1548 #[tokio::test]
1549 async fn test_config_render() {
1550 let session = SilentSession::new("test");
1551
1552 let mut inputs = HashMap::new();
1554 inputs.insert(
1555 "api_key".to_string(),
1556 MCPServerInput::PromptString(PromptStringInput {
1557 id: "api_key".to_string(),
1558 description: "API Key".to_string(),
1559 default: Some("test-api-key-12345".to_string()),
1560 password: Some(true),
1561 }),
1562 );
1563 inputs.insert(
1564 "server_url".to_string(),
1565 MCPServerInput::PromptString(PromptStringInput {
1566 id: "server_url".to_string(),
1567 description: "Server URL".to_string(),
1568 default: Some("https://api.example.com".to_string()),
1569 password: Some(false),
1570 }),
1571 );
1572
1573 let computer = Computer::new("test_computer", session, Some(inputs), None, true, true);
1574
1575 let server_config = MCPServerConfig::Stdio(StdioServerConfig {
1577 name: "test_server".to_string(),
1578 disabled: false,
1579 forbidden_tools: vec![],
1580 tool_meta: std::collections::HashMap::new(),
1581 default_tool_meta: None,
1582 vrl: None,
1583 server_parameters: StdioServerParameters {
1584 command: "echo".to_string(),
1585 args: vec!["${input:api_key}".to_string()],
1586 env: {
1587 let mut env = std::collections::HashMap::new();
1588 env.insert("API_URL".to_string(), "${input:server_url}".to_string());
1589 env
1590 },
1591 cwd: None,
1592 },
1593 });
1594
1595 let rendered = computer.render_server_config(&server_config).await.unwrap();
1597
1598 match rendered {
1600 MCPServerConfig::Stdio(config) => {
1601 assert_eq!(config.server_parameters.args[0], "test-api-key-12345");
1602 assert_eq!(
1603 config.server_parameters.env.get("API_URL"),
1604 Some(&"https://api.example.com".to_string())
1605 );
1606 }
1607 _ => panic!("Expected Stdio config"),
1608 }
1609 }
1610
1611 #[tokio::test]
1612 async fn test_config_render_missing_input() {
1613 let session = SilentSession::new("test");
1614 let computer = Computer::new("test_computer", session, None, None, true, true);
1615
1616 let server_config = MCPServerConfig::Stdio(StdioServerConfig {
1618 name: "test_server".to_string(),
1619 disabled: false,
1620 forbidden_tools: vec![],
1621 tool_meta: std::collections::HashMap::new(),
1622 default_tool_meta: None,
1623 vrl: None,
1624 server_parameters: StdioServerParameters {
1625 command: "echo".to_string(),
1626 args: vec!["${input:missing_input}".to_string()],
1627 env: std::collections::HashMap::new(),
1628 cwd: None,
1629 },
1630 });
1631
1632 let rendered = computer.render_server_config(&server_config).await.unwrap();
1634
1635 match rendered {
1636 MCPServerConfig::Stdio(config) => {
1637 assert_eq!(config.server_parameters.args[0], "${input:missing_input}");
1639 }
1640 _ => panic!("Expected Stdio config"),
1641 }
1642 }
1643
/// Two well-formed `key:value` pairs separated by a comma produce two map entries.
#[test]
fn test_parse_headers_string_normal() {
    let headers = parse_headers_string("x-tenant-id:abc123,x-custom:value");

    assert_eq!(headers.len(), 2);
    for &(name, expected) in [("x-tenant-id", "abc123"), ("x-custom", "value")].iter() {
        assert_eq!(headers[name], expected);
    }
}
1651
/// Whitespace around keys, values and separators is expected to be trimmed away.
#[test]
fn test_parse_headers_string_with_spaces() {
    let headers = parse_headers_string(" x-tenant-id : abc123 , x-custom : value ");

    assert_eq!(headers.len(), 2);
    for &(name, expected) in [("x-tenant-id", "abc123"), ("x-custom", "value")].iter() {
        assert_eq!(headers[name], expected);
    }
}
1659
/// An empty header string yields an empty map.
#[test]
fn test_parse_headers_string_empty() {
    assert!(parse_headers_string("").is_empty());
}
1665
/// A segment without a `:` separator is skipped while valid segments are kept.
#[test]
fn test_parse_headers_string_missing_value() {
    let headers = parse_headers_string("key-only,x-valid:ok");

    // Only the well-formed pair survives.
    assert_eq!(headers.len(), 1);
    assert_eq!(headers["x-valid"], "ok");
}
1672
/// Only the first `:` separates key from value; later colons stay part of the value.
#[test]
fn test_parse_headers_string_value_with_colon() {
    let headers = parse_headers_string("Authorization:Bearer:token123");

    assert_eq!(headers.len(), 1);
    assert_eq!(headers["Authorization"], "Bearer:token123");
}
1679}