1use std::collections::HashMap;
6use std::path::Path;
7use std::process::Stdio;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, Instant};
11
12use dashmap::DashMap;
13use serde::{Deserialize, Serialize};
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::process::{Child, ChildStdin, ChildStdout, Command};
16use tracing::{Span, instrument};
17
18use super::registry::{ToolResult, ToolSchema};
19
20const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(180);
23
24const DEFAULT_INIT_TIMEOUT: Duration = Duration::from_secs(60);
26
27pub enum McpConnection {
29 Stdio {
31 #[allow(dead_code)]
33 child: Child,
34 stdin: ChildStdin,
36 stdout: BufReader<ChildStdout>,
38 },
39}
40
41#[allow(missing_debug_implementations)]
43pub struct ExternalMcpServer {
44 pub name: String,
46 connection: McpConnection,
48 tools: Vec<ToolSchema>,
50 initialized: bool,
52 request_id: AtomicU64,
54 total_requests: AtomicU64,
56 total_request_time_ms: AtomicU64,
58 connected_at: Option<Instant>,
60 initialized_at: Option<Instant>,
62}
63
64#[derive(Debug, Serialize)]
66struct JsonRpcRequest {
67 jsonrpc: &'static str,
68 id: u64,
69 method: String,
70 #[serde(skip_serializing_if = "Option::is_none")]
71 params: Option<serde_json::Value>,
72}
73
74impl JsonRpcRequest {
75 fn new(id: u64, method: impl Into<String>, params: Option<serde_json::Value>) -> Self {
76 Self {
77 jsonrpc: "2.0",
78 id,
79 method: method.into(),
80 params,
81 }
82 }
83}
84
85#[derive(Debug, Deserialize)]
87struct JsonRpcResponse {
88 #[allow(dead_code)]
89 jsonrpc: String,
90 #[allow(dead_code)]
91 id: u64,
92 result: Option<serde_json::Value>,
93 error: Option<JsonRpcError>,
94}
95
96#[derive(Debug, Deserialize)]
98struct JsonRpcError {
99 code: i64,
100 message: String,
101}
102
103impl ExternalMcpServer {
104 #[instrument(
109 name = "mcp_connect_stdio",
110 skip(env, cwd),
111 fields(
112 server_name = %name,
113 command = %command,
114 args_count = args.len(),
115 has_env = env.is_some(),
116 has_cwd = cwd.is_some(),
117 )
118 )]
119 pub async fn connect_stdio(
120 name: String,
121 command: &str,
122 args: &[String],
123 env: Option<&HashMap<String, String>>,
124 cwd: Option<&Path>,
125 ) -> Result<Self, ExternalMcpError> {
126 let start_time = Instant::now();
127
128 tracing::info!(
129 server_name = %name,
130 command = %command,
131 args = ?args,
132 cwd = ?cwd,
133 "Starting external MCP server process"
134 );
135
136 let mut cmd = Command::new(command);
137 cmd.args(args)
138 .stdin(Stdio::piped())
139 .stdout(Stdio::piped())
140 .stderr(Stdio::null());
141
142 if let Some(env) = env {
143 tracing::debug!(
144 server_name = %name,
145 env_vars = ?env.keys().collect::<Vec<_>>(),
146 "Setting environment variables for MCP server"
147 );
148 cmd.envs(env);
149 }
150
151 if let Some(cwd) = cwd {
152 cmd.current_dir(cwd);
153 }
154
155 let mut child = cmd.spawn().map_err(|e| {
156 tracing::error!(
157 server_name = %name,
158 command = %command,
159 cwd = ?cwd,
160 error = %e,
161 error_type = %std::any::type_name::<std::io::Error>(),
162 error_kind = ?e.kind(),
163 "Failed to spawn MCP server process"
164 );
165 ExternalMcpError::SpawnFailed {
166 command: command.to_string(),
167 error: e.to_string(),
168 }
169 })?;
170
171 let pid = child.id();
172 tracing::debug!(
173 server_name = %name,
174 pid = ?pid,
175 "MCP server process spawned"
176 );
177
178 let stdin = child.stdin.take().ok_or(ExternalMcpError::NoStdin)?;
179 let stdout = child
180 .stdout
181 .take()
182 .ok_or(ExternalMcpError::NoStdout)
183 .map(BufReader::new)?;
184
185 let connection = McpConnection::Stdio {
186 child,
187 stdin,
188 stdout,
189 };
190
191 let elapsed = start_time.elapsed();
192 tracing::info!(
193 server_name = %name,
194 pid = ?pid,
195 elapsed_ms = elapsed.as_millis(),
196 "MCP server process started successfully"
197 );
198
199 Ok(Self {
200 name,
201 connection,
202 tools: Vec::new(),
203 initialized: false,
204 request_id: AtomicU64::new(1),
205 total_requests: AtomicU64::new(0),
206 total_request_time_ms: AtomicU64::new(0),
207 connected_at: Some(start_time),
208 initialized_at: None,
209 })
210 }
211
212 #[instrument(
222 name = "mcp_initialize",
223 skip(self),
224 fields(
225 server_name = %self.name,
226 timeout_secs = DEFAULT_INIT_TIMEOUT.as_secs(),
227 )
228 )]
229 pub async fn initialize(&mut self) -> Result<(), ExternalMcpError> {
230 let init_start = Instant::now();
231
232 tracing::info!(
233 server_name = %self.name,
234 "Starting MCP server initialization"
235 );
236
237 let init_result = tokio::time::timeout(DEFAULT_INIT_TIMEOUT, async {
239 let request_id = self.next_request_id();
241 let request = JsonRpcRequest::new(
242 request_id,
243 "initialize",
244 Some(serde_json::json!({
245 "protocolVersion": "2024-11-05",
246 "capabilities": {},
247 "clientInfo": {
248 "name": "claude-code-acp-rs",
249 "version": env!("CARGO_PKG_VERSION")
250 }
251 })),
252 );
253
254 tracing::debug!(
255 server_name = %self.name,
256 request_id = request_id,
257 "Sending initialize request"
258 );
259
260 let init_response = self.send_request_internal(request).await?;
261
262 if let Some(ref result) = init_response.result {
264 if let Some(server_info) = result.get("serverInfo") {
265 tracing::info!(
266 server_name = %self.name,
267 remote_server_name = ?server_info.get("name"),
268 remote_server_version = ?server_info.get("version"),
269 protocol_version = ?result.get("protocolVersion"),
270 "Received initialize response from MCP server"
271 );
272 }
273 }
274
275 tracing::debug!(
277 server_name = %self.name,
278 "Sending initialized notification"
279 );
280 self.send_notification("notifications/initialized", None)
281 .await?;
282
283 let tools_request_id = self.next_request_id();
285 let tools_request = JsonRpcRequest::new(tools_request_id, "tools/list", None);
286
287 tracing::debug!(
288 server_name = %self.name,
289 request_id = tools_request_id,
290 "Sending tools/list request"
291 );
292
293 let tools_response = self.send_request_internal(tools_request).await?;
294
295 if let Some(result) = tools_response.result {
297 if let Some(tools) = result.get("tools").and_then(|t| t.as_array()) {
298 self.tools = tools
299 .iter()
300 .filter_map(|t| {
301 let name = t.get("name")?.as_str()?;
302 let description =
303 t.get("description").and_then(|d| d.as_str()).unwrap_or("");
304 let input_schema = t
305 .get("inputSchema")
306 .cloned()
307 .unwrap_or(serde_json::json!({"type": "object"}));
308
309 Some(ToolSchema {
310 name: name.to_string(),
311 description: description.to_string(),
312 input_schema,
313 })
314 })
315 .collect();
316
317 let tool_names: Vec<&str> =
319 self.tools.iter().map(|t| t.name.as_str()).collect();
320 tracing::info!(
321 server_name = %self.name,
322 tool_count = self.tools.len(),
323 tools = ?tool_names,
324 "Received tools from MCP server"
325 );
326 }
327 }
328
329 Ok::<(), ExternalMcpError>(())
330 })
331 .await;
332
333 match init_result {
334 Ok(Ok(())) => {
335 self.initialized = true;
336 self.initialized_at = Some(Instant::now());
337
338 let elapsed = init_start.elapsed();
339 tracing::info!(
340 server_name = %self.name,
341 elapsed_ms = elapsed.as_millis(),
342 tool_count = self.tools.len(),
343 "MCP server initialization completed successfully"
344 );
345
346 Ok(())
347 }
348 Ok(Err(e)) => {
349 let elapsed = init_start.elapsed();
350 tracing::error!(
351 server_name = %self.name,
352 elapsed_ms = elapsed.as_millis(),
353 error = %e,
354 "MCP server initialization failed"
355 );
356 Err(e)
357 }
358 Err(_) => {
359 let elapsed = init_start.elapsed();
360 tracing::error!(
361 server_name = %self.name,
362 elapsed_ms = elapsed.as_millis(),
363 timeout_secs = DEFAULT_INIT_TIMEOUT.as_secs(),
364 "MCP server initialization timed out"
365 );
366 #[allow(clippy::cast_possible_truncation)]
367 Err(ExternalMcpError::Timeout {
368 operation: "initialize".to_string(),
369 timeout_ms: DEFAULT_INIT_TIMEOUT.as_millis() as u64,
370 })
371 }
372 }
373 }
374
375 fn next_request_id(&self) -> u64 {
377 self.request_id.fetch_add(1, Ordering::SeqCst)
378 }
379
380 #[instrument(
384 name = "mcp_send_request",
385 skip(self, request),
386 fields(
387 server_name = %self.name,
388 method = %request.method,
389 request_id = request.id,
390 )
391 )]
392 async fn send_request(
393 &mut self,
394 request: JsonRpcRequest,
395 ) -> Result<JsonRpcResponse, ExternalMcpError> {
396 let method = request.method.clone();
397 let request_id = request.id;
398
399 let result =
400 tokio::time::timeout(DEFAULT_REQUEST_TIMEOUT, self.send_request_internal(request))
401 .await;
402
403 if let Ok(inner_result) = result { inner_result } else {
404 tracing::error!(
405 server_name = %self.name,
406 method = %method,
407 request_id = request_id,
408 timeout_ms = DEFAULT_REQUEST_TIMEOUT.as_millis(),
409 "MCP request timed out"
410 );
411 #[allow(clippy::cast_possible_truncation)]
412 Err(ExternalMcpError::Timeout {
413 operation: method,
414 timeout_ms: DEFAULT_REQUEST_TIMEOUT.as_millis() as u64,
415 })
416 }
417 }
418
419 async fn send_request_internal(
421 &mut self,
422 request: JsonRpcRequest,
423 ) -> Result<JsonRpcResponse, ExternalMcpError> {
424 let start_time = Instant::now();
425 let method = request.method.clone();
426 let request_id = request.id;
427
428 let McpConnection::Stdio { stdin, stdout, .. } = &mut self.connection;
429
430 let request_json = serde_json::to_string(&request)
432 .map_err(|e| ExternalMcpError::SerializationError(e.to_string()))?;
433
434 tracing::debug!(
435 server_name = %self.name,
436 method = %method,
437 request_id = request_id,
438 request_size = request_json.len(),
439 "Sending JSON-RPC request to MCP server"
440 );
441
442 stdin
443 .write_all(request_json.as_bytes())
444 .await
445 .map_err(|e| {
446 tracing::error!(
447 server_name = %self.name,
448 method = %method,
449 request_size = request_json.len(),
450 error = %e,
451 error_type = %std::any::type_name::<std::io::Error>(),
452 error_kind = ?e.kind(),
453 "Failed to write request to MCP server"
454 );
455 ExternalMcpError::WriteError(e.to_string())
456 })?;
457 stdin
458 .write_all(b"\n")
459 .await
460 .map_err(|e| ExternalMcpError::WriteError(e.to_string()))?;
461 stdin
462 .flush()
463 .await
464 .map_err(|e| ExternalMcpError::WriteError(e.to_string()))?;
465
466 let write_elapsed = start_time.elapsed();
467 tracing::debug!(
468 server_name = %self.name,
469 method = %method,
470 write_elapsed_ms = write_elapsed.as_millis(),
471 "Request sent, waiting for response"
472 );
473
474 let mut line = String::new();
476 stdout.read_line(&mut line).await.map_err(|e| {
477 tracing::error!(
478 server_name = %self.name,
479 method = %method,
480 error = %e,
481 "Failed to read response from MCP server"
482 );
483 ExternalMcpError::ReadError(e.to_string())
484 })?;
485
486 let total_elapsed = start_time.elapsed();
487
488 self.total_requests.fetch_add(1, Ordering::Relaxed);
490 #[allow(clippy::cast_possible_truncation)]
491 self.total_request_time_ms
492 .fetch_add(total_elapsed.as_millis() as u64, Ordering::Relaxed);
493
494 tracing::debug!(
495 server_name = %self.name,
496 method = %method,
497 request_id = request_id,
498 response_size = line.len(),
499 elapsed_ms = total_elapsed.as_millis(),
500 "Received response from MCP server"
501 );
502
503 let response: JsonRpcResponse = serde_json::from_str(&line).map_err(|e| {
504 tracing::error!(
505 server_name = %self.name,
506 method = %method,
507 error = %e,
508 response_preview = %line.chars().take(200).collect::<String>(),
509 "Failed to parse JSON-RPC response"
510 );
511 ExternalMcpError::DeserializationError(e.to_string())
512 })?;
513
514 let read_elapsed = total_elapsed.saturating_sub(write_elapsed);
515
516 tracing::info!(
518 server_name = %self.name,
519 method = %method,
520 request_id = request_id,
521 request_size_bytes = request_json.len(),
522 response_size_bytes = line.len(),
523 write_duration_ms = write_elapsed.as_millis(),
524 read_duration_ms = read_elapsed.as_millis(),
525 total_round_trip_ms = total_elapsed.as_millis(),
526 "MCP JSON-RPC request completed successfully"
527 );
528
529 if let Some(error) = response.error {
530 tracing::warn!(
531 server_name = %self.name,
532 method = %method,
533 request_id = request_id,
534 error_code = error.code,
535 error_message = %error.message,
536 elapsed_ms = total_elapsed.as_millis(),
537 "MCP server returned error"
538 );
539 return Err(ExternalMcpError::RpcError {
540 code: error.code,
541 message: error.message,
542 });
543 }
544
545 tracing::debug!(
546 server_name = %self.name,
547 method = %method,
548 request_id = request_id,
549 elapsed_ms = total_elapsed.as_millis(),
550 "MCP request completed successfully"
551 );
552
553 Ok(response)
554 }
555
556 async fn send_notification(
558 &mut self,
559 method: &str,
560 params: Option<serde_json::Value>,
561 ) -> Result<(), ExternalMcpError> {
562 let McpConnection::Stdio { stdin, .. } = &mut self.connection;
563
564 let notification = serde_json::json!({
565 "jsonrpc": "2.0",
566 "method": method,
567 "params": params
568 });
569
570 let notification_json = serde_json::to_string(¬ification)
571 .map_err(|e| ExternalMcpError::SerializationError(e.to_string()))?;
572
573 stdin
574 .write_all(notification_json.as_bytes())
575 .await
576 .map_err(|e| ExternalMcpError::WriteError(e.to_string()))?;
577 stdin
578 .write_all(b"\n")
579 .await
580 .map_err(|e| ExternalMcpError::WriteError(e.to_string()))?;
581 stdin
582 .flush()
583 .await
584 .map_err(|e| ExternalMcpError::WriteError(e.to_string()))?;
585
586 Ok(())
587 }
588
589 #[instrument(
593 name = "mcp_call_tool",
594 skip(self, arguments),
595 fields(
596 server_name = %self.name,
597 tool_name = %tool_name,
598 args_size = arguments.to_string().len(),
599 )
600 )]
601 pub async fn call_tool(
602 &mut self,
603 tool_name: &str,
604 arguments: serde_json::Value,
605 ) -> Result<ToolResult, ExternalMcpError> {
606 let start_time = Instant::now();
607
608 if !self.initialized {
609 tracing::error!(
610 server_name = %self.name,
611 tool_name = %tool_name,
612 "Attempted to call tool on uninitialized server"
613 );
614 return Err(ExternalMcpError::NotInitialized);
615 }
616
617 tracing::info!(
618 server_name = %self.name,
619 tool_name = %tool_name,
620 "Calling external MCP tool"
621 );
622
623 let request_id = self.next_request_id();
624 let request = JsonRpcRequest::new(
625 request_id,
626 "tools/call",
627 Some(serde_json::json!({
628 "name": tool_name,
629 "arguments": arguments
630 })),
631 );
632
633 let response = self.send_request(request).await?;
634
635 let elapsed = start_time.elapsed();
636
637 if let Some(result) = response.result {
639 if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
641 let text: Vec<String> = content
642 .iter()
643 .filter_map(|c| {
644 if c.get("type").and_then(|t| t.as_str()) == Some("text") {
645 c.get("text").and_then(|t| t.as_str()).map(String::from)
646 } else {
647 None
648 }
649 })
650 .collect();
651
652 let is_error = result
653 .get("is_error")
654 .or_else(|| result.get("isError")) .and_then(|e| e.as_bool())
656 .unwrap_or(false);
657
658 let result_preview = text.join("\n").chars().take(200).collect::<String>();
659
660 if is_error {
661 tracing::warn!(
662 server_name = %self.name,
663 tool_name = %tool_name,
664 elapsed_ms = elapsed.as_millis(),
665 result_preview = %result_preview,
666 "External MCP tool returned error"
667 );
668 return Ok(ToolResult::error(text.join("\n")));
669 }
670
671 tracing::info!(
672 server_name = %self.name,
673 tool_name = %tool_name,
674 elapsed_ms = elapsed.as_millis(),
675 result_len = text.iter().map(|s| s.len()).sum::<usize>(),
676 "External MCP tool completed successfully"
677 );
678 return Ok(ToolResult::success(text.join("\n")));
679 }
680
681 tracing::info!(
683 server_name = %self.name,
684 tool_name = %tool_name,
685 elapsed_ms = elapsed.as_millis(),
686 "External MCP tool completed (raw JSON response)"
687 );
688 Ok(ToolResult::success(result.to_string()))
689 } else {
690 tracing::info!(
691 server_name = %self.name,
692 tool_name = %tool_name,
693 elapsed_ms = elapsed.as_millis(),
694 "External MCP tool completed (empty response)"
695 );
696 Ok(ToolResult::success(""))
697 }
698 }
699
700 pub fn stats(&self) -> McpServerStats {
702 McpServerStats {
703 server_name: self.name.clone(),
704 total_requests: self.total_requests.load(Ordering::Relaxed),
705 total_request_time_ms: self.total_request_time_ms.load(Ordering::Relaxed),
706 tool_count: self.tools.len(),
707 initialized: self.initialized,
708 connected_at: self.connected_at,
709 initialized_at: self.initialized_at,
710 }
711 }
712
713 pub fn tools(&self) -> &[ToolSchema] {
715 &self.tools
716 }
717
718 pub fn is_initialized(&self) -> bool {
720 self.initialized
721 }
722}
723
724#[allow(missing_debug_implementations)]
726pub struct ExternalMcpManager {
727 servers: DashMap<String, Arc<tokio::sync::Mutex<ExternalMcpServer>>>,
731}
732
733impl ExternalMcpManager {
734 pub fn new() -> Self {
736 Self {
737 servers: DashMap::new(),
738 }
739 }
740
741 #[instrument(
746 name = "mcp_manager_connect",
747 skip(self, env, cwd),
748 fields(
749 server_name = %name,
750 command = %command,
751 )
752 )]
753 pub async fn connect(
754 &self,
755 name: String,
756 command: &str,
757 args: &[String],
758 env: Option<&HashMap<String, String>>,
759 cwd: Option<&Path>,
760 ) -> Result<(), ExternalMcpError> {
761 let overall_start = Instant::now();
762
763 tracing::info!(
764 server_name = %name,
765 command = %command,
766 args = ?args,
767 "Connecting to external MCP server"
768 );
769
770 let connect_start = Instant::now();
772 let mut server =
773 ExternalMcpServer::connect_stdio(name.clone(), command, args, env, cwd).await?;
774 let connect_elapsed = connect_start.elapsed();
775
776 tracing::debug!(
777 server_name = %name,
778 connect_elapsed_ms = connect_elapsed.as_millis(),
779 "MCP server process connected"
780 );
781
782 let init_start = Instant::now();
784 server.initialize().await?;
785 let init_elapsed = init_start.elapsed();
786
787 let overall_elapsed = overall_start.elapsed();
788
789 tracing::info!(
790 server_name = %name,
791 tool_count = server.tools().len(),
792 connect_elapsed_ms = connect_elapsed.as_millis(),
793 init_elapsed_ms = init_elapsed.as_millis(),
794 total_elapsed_ms = overall_elapsed.as_millis(),
795 "Successfully connected and initialized MCP server"
796 );
797
798 let tool_names: Vec<&str> = server.tools().iter().map(|t| t.name.as_str()).collect();
800 tracing::debug!(
801 server_name = %name,
802 tools = ?tool_names,
803 "MCP server tools available"
804 );
805
806 self.servers
808 .insert(name, Arc::new(tokio::sync::Mutex::new(server)));
809 Ok(())
810 }
811
812 pub fn disconnect(&self, name: &str) {
814 self.servers.remove(name);
815 }
816
817 pub fn server_names(&self) -> Vec<String> {
819 self.servers
820 .iter()
821 .map(|entry| entry.key().clone())
822 .collect()
823 }
824
825 pub fn all_tools(&self) -> Vec<ToolSchema> {
829 let mut tools = Vec::new();
830
831 for entry in &self.servers {
832 let server_name = entry.key();
833 let server = entry.value();
834 let Ok(server_guard) = server.try_lock() else {
836 tracing::warn!(
837 server_name = %server_name,
838 "MCP server is busy, skipping for tool listing"
839 );
840 continue; };
842
843 for tool in server_guard.tools() {
844 tools.push(ToolSchema {
845 name: format!("mcp__{}_{}", server_name, tool.name),
846 description: format!("[{}] {}", server_name, tool.description),
847 input_schema: tool.input_schema.clone(),
848 });
849 }
850 }
851
852 tools
853 }
854
855 #[instrument(
859 name = "mcp_manager_call_tool",
860 skip(self, arguments),
861 fields(
862 full_tool_name = %full_tool_name,
863 )
864 )]
865 pub async fn call_tool(
866 &self,
867 full_tool_name: &str,
868 arguments: serde_json::Value,
869 ) -> Result<ToolResult, ExternalMcpError> {
870 let parts: Vec<&str> = full_tool_name.splitn(3, "__").collect();
872 if parts.len() != 3 || parts[0] != "mcp" {
873 tracing::warn!(
874 full_tool_name = %full_tool_name,
875 "Invalid external MCP tool name format"
876 );
877 return Err(ExternalMcpError::InvalidToolName(
878 full_tool_name.to_string(),
879 ));
880 }
881
882 let server_name = parts[1];
883 let tool_name = parts[2];
884
885 Span::current().record("server_name", server_name);
887 Span::current().record("tool_name", tool_name);
888
889 tracing::debug!(
890 server_name = %server_name,
891 tool_name = %tool_name,
892 "Routing tool call to external MCP server"
893 );
894
895 let server_arc = self.servers.get(server_name).ok_or_else(|| {
897 let available: Vec<String> = self.server_names();
898 tracing::error!(
899 server_name = %server_name,
900 tool_name = %tool_name,
901 available_servers = ?available,
902 "External MCP server not found"
903 );
904 ExternalMcpError::ServerNotFound(server_name.to_string())
905 })?;
906
907 let server = server_arc.clone();
909 drop(server_arc); let start_time = Instant::now();
912
913 let result = {
916 let mut server_guard = server.lock().await;
917 server_guard.call_tool(tool_name, arguments).await?
918 };
919
920 let elapsed = start_time.elapsed();
921 tracing::info!(
922 server_name = %server_name,
923 tool_name = %tool_name,
924 elapsed_ms = elapsed.as_millis(),
925 is_error = result.is_error,
926 "External MCP tool call completed"
927 );
928
929 Ok(result)
930 }
931
932 pub fn all_stats(&self) -> Vec<McpServerStats> {
934 self.servers
935 .iter()
936 .filter_map(|entry| {
937 let server = entry.value();
938 if let Ok(guard) = server.try_lock() { Some(guard.stats()) } else {
940 tracing::warn!(
941 server_name = %entry.key(),
942 "MCP server is busy, skipping for stats"
943 );
944 None
945 }
946 })
947 .collect()
948 }
949
950 pub fn is_external_tool(name: &str) -> bool {
955 if !name.starts_with("mcp__") {
956 return false;
957 }
958
959 let parts: Vec<&str> = name.splitn(3, "__").collect();
961 if parts.len() != 3 || parts[0] != "mcp" {
962 return false;
963 }
964
965 parts[1] != "acp"
967 }
968
969 pub fn get_friendly_tool_name(name: &str) -> Option<String> {
980 if !Self::is_external_tool(name) {
981 return None;
982 }
983
984 let parts: Vec<&str> = name.splitn(3, "__").collect();
985 let server_name = parts.get(1)?;
986 let tool_name = parts.get(2)?;
987
988 match (*server_name, *tool_name) {
991 ("web-fetch", "webReader") => Some("WebFetch".to_string()),
993 ("web-reader", "webReader") => Some("WebFetch".to_string()),
994
995 ("web-search-prime", "webSearchPrime") => Some("WebSearch".to_string()),
997
998 _ => None,
1000 }
1001 }
1002}
1003
1004impl Default for ExternalMcpManager {
1005 fn default() -> Self {
1006 Self::new()
1007 }
1008}
1009
1010#[derive(Debug, Clone)]
1012pub struct McpServerStats {
1013 pub server_name: String,
1015 pub total_requests: u64,
1017 pub total_request_time_ms: u64,
1019 pub tool_count: usize,
1021 pub initialized: bool,
1023 pub connected_at: Option<Instant>,
1025 pub initialized_at: Option<Instant>,
1027}
1028
1029impl McpServerStats {
1030 #[allow(clippy::cast_precision_loss)]
1032 pub fn avg_request_time_ms(&self) -> f64 {
1033 if self.total_requests == 0 {
1034 0.0
1035 } else {
1036 self.total_request_time_ms as f64 / self.total_requests as f64
1037 }
1038 }
1039
1040 pub fn uptime(&self) -> Option<Duration> {
1042 self.connected_at.map(|t| t.elapsed())
1043 }
1044}
1045
1046#[derive(Debug, thiserror::Error)]
1048pub enum ExternalMcpError {
1049 #[error("Failed to spawn MCP server '{command}': {error}")]
1051 SpawnFailed { command: String, error: String },
1052
1053 #[error("No stdin available for MCP server")]
1055 NoStdin,
1056
1057 #[error("No stdout available for MCP server")]
1059 NoStdout,
1060
1061 #[error("Serialization error: {0}")]
1063 SerializationError(String),
1064
1065 #[error("Deserialization error: {0}")]
1067 DeserializationError(String),
1068
1069 #[error("Write error: {0}")]
1071 WriteError(String),
1072
1073 #[error("Read error: {0}")]
1075 ReadError(String),
1076
1077 #[error("RPC error {code}: {message}")]
1079 RpcError { code: i64, message: String },
1080
1081 #[error("Server not initialized")]
1083 NotInitialized,
1084
1085 #[error("Invalid tool name format: {0}")]
1087 InvalidToolName(String),
1088
1089 #[error("MCP server not found: {0}")]
1091 ServerNotFound(String),
1092
1093 #[error("MCP operation '{operation}' timed out after {timeout_ms}ms")]
1095 Timeout { operation: String, timeout_ms: u64 },
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100 use super::*;
1101
1102 #[test]
1103 fn test_external_mcp_manager_new() {
1104 let _manager = ExternalMcpManager::new();
1105 assert!(ExternalMcpManager::is_external_tool("mcp__server__tool"));
1107 assert!(!ExternalMcpManager::is_external_tool("Read"));
1108 assert!(!ExternalMcpManager::is_external_tool("mcp__acp__Read"));
1109 }
1110
1111 #[test]
1112 fn test_is_external_tool() {
1113 assert!(ExternalMcpManager::is_external_tool(
1115 "mcp__myserver__mytool"
1116 ));
1117 assert!(ExternalMcpManager::is_external_tool(
1118 "mcp__filesystem__read_file"
1119 ));
1120
1121 assert!(!ExternalMcpManager::is_external_tool("Read"));
1123 assert!(!ExternalMcpManager::is_external_tool("Bash"));
1124 assert!(!ExternalMcpManager::is_external_tool("mcp__acp__Read")); assert!(!ExternalMcpManager::is_external_tool("mcp__single")); }
1127
1128 #[tokio::test]
1129 async fn test_manager_server_names_empty() {
1130 let manager = ExternalMcpManager::new();
1131 let names = manager.server_names();
1132 assert!(names.is_empty());
1133 }
1134
1135 #[tokio::test]
1136 async fn test_manager_all_tools_empty() {
1137 let manager = ExternalMcpManager::new();
1138 let tools = manager.all_tools();
1139 assert!(tools.is_empty());
1140 }
1141
1142 #[test]
1143 fn test_get_friendly_tool_name_web_fetch() {
1144 assert_eq!(
1145 ExternalMcpManager::get_friendly_tool_name("mcp__web-fetch__webReader"),
1146 Some("WebFetch".to_string())
1147 );
1148 assert_eq!(
1149 ExternalMcpManager::get_friendly_tool_name("mcp__web-reader__webReader"),
1150 Some("WebFetch".to_string())
1151 );
1152 }
1153
1154 #[test]
1155 fn test_get_friendly_tool_name_web_search() {
1156 assert_eq!(
1157 ExternalMcpManager::get_friendly_tool_name("mcp__web-search-prime__webSearchPrime"),
1158 Some("WebSearch".to_string())
1159 );
1160 }
1161
1162 #[test]
1163 fn test_get_friendly_tool_name_non_mcp_tool() {
1164 assert_eq!(ExternalMcpManager::get_friendly_tool_name("Read"), None);
1165 assert_eq!(ExternalMcpManager::get_friendly_tool_name("Bash"), None);
1166 assert_eq!(
1167 ExternalMcpManager::get_friendly_tool_name("mcp__acp__Read"),
1168 None
1169 );
1170 }
1171
1172 #[test]
1173 fn test_get_friendly_tool_name_unknown_mcp_tool() {
1174 assert_eq!(
1176 ExternalMcpManager::get_friendly_tool_name("mcp__zai-mcp-server__ui_to_artifact"),
1177 None
1178 );
1179 assert_eq!(
1180 ExternalMcpManager::get_friendly_tool_name("mcp__context7__query-docs"),
1181 None
1182 );
1183 assert_eq!(
1184 ExternalMcpManager::get_friendly_tool_name("mcp__my-server__my_custom_tool"),
1185 None
1186 );
1187 }
1188}