claude_code_acp/mcp/
external.rs

1//! External MCP server management
2//!
3//! Supports connecting to external MCP servers for extended tool capabilities.
4
5use std::collections::HashMap;
6use std::path::Path;
7use std::process::Stdio;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, Instant};
11
12use dashmap::DashMap;
13use serde::{Deserialize, Serialize};
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::process::{Child, ChildStdin, ChildStdout, Command};
16use tracing::{Span, instrument};
17
18use super::registry::{ToolResult, ToolSchema};
19
20/// Default timeout for MCP requests (3 minutes)
21/// WebSearch and WebFetch may need significant time due to network I/O
22const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(180);
23
24/// Default timeout for MCP initialization (60 seconds, MCP servers may need time to start)
25const DEFAULT_INIT_TIMEOUT: Duration = Duration::from_secs(60);
26
27/// External MCP server connection type
28pub enum McpConnection {
29    /// Stdio-based connection (spawned process)
30    Stdio {
31        /// The spawned process
32        #[allow(dead_code)]
33        child: Child,
34        /// Writer to send messages
35        stdin: ChildStdin,
36        /// Reader to receive messages
37        stdout: BufReader<ChildStdout>,
38    },
39}
40
41/// External MCP server state
42#[allow(missing_debug_implementations)]
43pub struct ExternalMcpServer {
44    /// Server name
45    pub name: String,
46    /// Connection to the server
47    connection: McpConnection,
48    /// Available tools from this server
49    tools: Vec<ToolSchema>,
50    /// Whether the server is initialized
51    initialized: bool,
52    /// Request ID counter for JSON-RPC
53    request_id: AtomicU64,
54    /// Total requests sent to this server
55    total_requests: AtomicU64,
56    /// Total time spent on requests (in milliseconds)
57    total_request_time_ms: AtomicU64,
58    /// Time when server was connected
59    connected_at: Option<Instant>,
60    /// Time when server was initialized
61    initialized_at: Option<Instant>,
62}
63
64/// JSON-RPC request structure
65#[derive(Debug, Serialize)]
66struct JsonRpcRequest {
67    jsonrpc: &'static str,
68    id: u64,
69    method: String,
70    #[serde(skip_serializing_if = "Option::is_none")]
71    params: Option<serde_json::Value>,
72}
73
74impl JsonRpcRequest {
75    fn new(id: u64, method: impl Into<String>, params: Option<serde_json::Value>) -> Self {
76        Self {
77            jsonrpc: "2.0",
78            id,
79            method: method.into(),
80            params,
81        }
82    }
83}
84
85/// JSON-RPC response structure
86#[derive(Debug, Deserialize)]
87struct JsonRpcResponse {
88    #[allow(dead_code)]
89    jsonrpc: String,
90    #[allow(dead_code)]
91    id: u64,
92    result: Option<serde_json::Value>,
93    error: Option<JsonRpcError>,
94}
95
96/// JSON-RPC error
97#[derive(Debug, Deserialize)]
98struct JsonRpcError {
99    code: i64,
100    message: String,
101}
102
103impl ExternalMcpServer {
104    /// Connect to an external MCP server via stdio
105    ///
106    /// This spawns the MCP server process and establishes stdio communication.
107    /// Use `initialize()` after connecting to complete the handshake.
108    #[instrument(
109        name = "mcp_connect_stdio",
110        skip(env, cwd),
111        fields(
112            server_name = %name,
113            command = %command,
114            args_count = args.len(),
115            has_env = env.is_some(),
116            has_cwd = cwd.is_some(),
117        )
118    )]
119    pub async fn connect_stdio(
120        name: String,
121        command: &str,
122        args: &[String],
123        env: Option<&HashMap<String, String>>,
124        cwd: Option<&Path>,
125    ) -> Result<Self, ExternalMcpError> {
126        let start_time = Instant::now();
127
128        tracing::info!(
129            server_name = %name,
130            command = %command,
131            args = ?args,
132            cwd = ?cwd,
133            "Starting external MCP server process"
134        );
135
136        let mut cmd = Command::new(command);
137        cmd.args(args)
138            .stdin(Stdio::piped())
139            .stdout(Stdio::piped())
140            .stderr(Stdio::null());
141
142        if let Some(env) = env {
143            tracing::debug!(
144                server_name = %name,
145                env_vars = ?env.keys().collect::<Vec<_>>(),
146                "Setting environment variables for MCP server"
147            );
148            cmd.envs(env);
149        }
150
151        if let Some(cwd) = cwd {
152            cmd.current_dir(cwd);
153        }
154
155        let mut child = cmd.spawn().map_err(|e| {
156            tracing::error!(
157                server_name = %name,
158                command = %command,
159                cwd = ?cwd,
160                error = %e,
161                error_type = %std::any::type_name::<std::io::Error>(),
162                error_kind = ?e.kind(),
163                "Failed to spawn MCP server process"
164            );
165            ExternalMcpError::SpawnFailed {
166                command: command.to_string(),
167                error: e.to_string(),
168            }
169        })?;
170
171        let pid = child.id();
172        tracing::debug!(
173            server_name = %name,
174            pid = ?pid,
175            "MCP server process spawned"
176        );
177
178        let stdin = child.stdin.take().ok_or(ExternalMcpError::NoStdin)?;
179        let stdout = child
180            .stdout
181            .take()
182            .ok_or(ExternalMcpError::NoStdout)
183            .map(BufReader::new)?;
184
185        let connection = McpConnection::Stdio {
186            child,
187            stdin,
188            stdout,
189        };
190
191        let elapsed = start_time.elapsed();
192        tracing::info!(
193            server_name = %name,
194            pid = ?pid,
195            elapsed_ms = elapsed.as_millis(),
196            "MCP server process started successfully"
197        );
198
199        Ok(Self {
200            name,
201            connection,
202            tools: Vec::new(),
203            initialized: false,
204            request_id: AtomicU64::new(1),
205            total_requests: AtomicU64::new(0),
206            total_request_time_ms: AtomicU64::new(0),
207            connected_at: Some(start_time),
208            initialized_at: None,
209        })
210    }
211
212    /// Initialize the MCP server
213    ///
214    /// Performs the MCP handshake:
215    /// 1. Send initialize request with client info
216    /// 2. Send initialized notification
217    /// 3. List available tools
218    ///
219    /// This method has a timeout to prevent indefinite blocking if the server
220    /// is unresponsive.
221    #[instrument(
222        name = "mcp_initialize",
223        skip(self),
224        fields(
225            server_name = %self.name,
226            timeout_secs = DEFAULT_INIT_TIMEOUT.as_secs(),
227        )
228    )]
229    pub async fn initialize(&mut self) -> Result<(), ExternalMcpError> {
230        let init_start = Instant::now();
231
232        tracing::info!(
233            server_name = %self.name,
234            "Starting MCP server initialization"
235        );
236
237        // Wrap the entire initialization in a timeout
238        let init_result = tokio::time::timeout(DEFAULT_INIT_TIMEOUT, async {
239            // Send initialize request
240            let request_id = self.next_request_id();
241            let request = JsonRpcRequest::new(
242                request_id,
243                "initialize",
244                Some(serde_json::json!({
245                    "protocolVersion": "2024-11-05",
246                    "capabilities": {},
247                    "clientInfo": {
248                        "name": "claude-code-acp-rs",
249                        "version": env!("CARGO_PKG_VERSION")
250                    }
251                })),
252            );
253
254            tracing::debug!(
255                server_name = %self.name,
256                request_id = request_id,
257                "Sending initialize request"
258            );
259
260            let init_response = self.send_request_internal(request).await?;
261
262            // Log server info if available
263            if let Some(ref result) = init_response.result {
264                if let Some(server_info) = result.get("serverInfo") {
265                    tracing::info!(
266                        server_name = %self.name,
267                        remote_server_name = ?server_info.get("name"),
268                        remote_server_version = ?server_info.get("version"),
269                        protocol_version = ?result.get("protocolVersion"),
270                        "Received initialize response from MCP server"
271                    );
272                }
273            }
274
275            // Send initialized notification
276            tracing::debug!(
277                server_name = %self.name,
278                "Sending initialized notification"
279            );
280            self.send_notification("notifications/initialized", None)
281                .await?;
282
283            // List available tools
284            let tools_request_id = self.next_request_id();
285            let tools_request = JsonRpcRequest::new(tools_request_id, "tools/list", None);
286
287            tracing::debug!(
288                server_name = %self.name,
289                request_id = tools_request_id,
290                "Sending tools/list request"
291            );
292
293            let tools_response = self.send_request_internal(tools_request).await?;
294
295            // Parse tools from response
296            if let Some(result) = tools_response.result {
297                if let Some(tools) = result.get("tools").and_then(|t| t.as_array()) {
298                    self.tools = tools
299                        .iter()
300                        .filter_map(|t| {
301                            let name = t.get("name")?.as_str()?;
302                            let description =
303                                t.get("description").and_then(|d| d.as_str()).unwrap_or("");
304                            let input_schema = t
305                                .get("inputSchema")
306                                .cloned()
307                                .unwrap_or(serde_json::json!({"type": "object"}));
308
309                            Some(ToolSchema {
310                                name: name.to_string(),
311                                description: description.to_string(),
312                                input_schema,
313                            })
314                        })
315                        .collect();
316
317                    // Log tool names
318                    let tool_names: Vec<&str> =
319                        self.tools.iter().map(|t| t.name.as_str()).collect();
320                    tracing::info!(
321                        server_name = %self.name,
322                        tool_count = self.tools.len(),
323                        tools = ?tool_names,
324                        "Received tools from MCP server"
325                    );
326                }
327            }
328
329            Ok::<(), ExternalMcpError>(())
330        })
331        .await;
332
333        match init_result {
334            Ok(Ok(())) => {
335                self.initialized = true;
336                self.initialized_at = Some(Instant::now());
337
338                let elapsed = init_start.elapsed();
339                tracing::info!(
340                    server_name = %self.name,
341                    elapsed_ms = elapsed.as_millis(),
342                    tool_count = self.tools.len(),
343                    "MCP server initialization completed successfully"
344                );
345
346                Ok(())
347            }
348            Ok(Err(e)) => {
349                let elapsed = init_start.elapsed();
350                tracing::error!(
351                    server_name = %self.name,
352                    elapsed_ms = elapsed.as_millis(),
353                    error = %e,
354                    "MCP server initialization failed"
355                );
356                Err(e)
357            }
358            Err(_) => {
359                let elapsed = init_start.elapsed();
360                tracing::error!(
361                    server_name = %self.name,
362                    elapsed_ms = elapsed.as_millis(),
363                    timeout_secs = DEFAULT_INIT_TIMEOUT.as_secs(),
364                    "MCP server initialization timed out"
365                );
366                #[allow(clippy::cast_possible_truncation)]
367                Err(ExternalMcpError::Timeout {
368                    operation: "initialize".to_string(),
369                    timeout_ms: DEFAULT_INIT_TIMEOUT.as_millis() as u64,
370                })
371            }
372        }
373    }
374
375    /// Generate next request ID
376    fn next_request_id(&self) -> u64 {
377        self.request_id.fetch_add(1, Ordering::SeqCst)
378    }
379
380    /// Send a JSON-RPC request and wait for response (with timeout)
381    ///
382    /// This is the public API that wraps the internal method with a timeout.
383    #[instrument(
384        name = "mcp_send_request",
385        skip(self, request),
386        fields(
387            server_name = %self.name,
388            method = %request.method,
389            request_id = request.id,
390        )
391    )]
392    async fn send_request(
393        &mut self,
394        request: JsonRpcRequest,
395    ) -> Result<JsonRpcResponse, ExternalMcpError> {
396        let method = request.method.clone();
397        let request_id = request.id;
398
399        let result =
400            tokio::time::timeout(DEFAULT_REQUEST_TIMEOUT, self.send_request_internal(request))
401                .await;
402
403        if let Ok(inner_result) = result { inner_result } else {
404            tracing::error!(
405                server_name = %self.name,
406                method = %method,
407                request_id = request_id,
408                timeout_ms = DEFAULT_REQUEST_TIMEOUT.as_millis(),
409                "MCP request timed out"
410            );
411            #[allow(clippy::cast_possible_truncation)]
412            Err(ExternalMcpError::Timeout {
413                operation: method,
414                timeout_ms: DEFAULT_REQUEST_TIMEOUT.as_millis() as u64,
415            })
416        }
417    }
418
419    /// Internal implementation of send_request without timeout
420    async fn send_request_internal(
421        &mut self,
422        request: JsonRpcRequest,
423    ) -> Result<JsonRpcResponse, ExternalMcpError> {
424        let start_time = Instant::now();
425        let method = request.method.clone();
426        let request_id = request.id;
427
428        let McpConnection::Stdio { stdin, stdout, .. } = &mut self.connection;
429
430        // Serialize and send request
431        let request_json = serde_json::to_string(&request)
432            .map_err(|e| ExternalMcpError::SerializationError(e.to_string()))?;
433
434        tracing::debug!(
435            server_name = %self.name,
436            method = %method,
437            request_id = request_id,
438            request_size = request_json.len(),
439            "Sending JSON-RPC request to MCP server"
440        );
441
442        stdin
443            .write_all(request_json.as_bytes())
444            .await
445            .map_err(|e| {
446                tracing::error!(
447                    server_name = %self.name,
448                    method = %method,
449                    request_size = request_json.len(),
450                    error = %e,
451                    error_type = %std::any::type_name::<std::io::Error>(),
452                    error_kind = ?e.kind(),
453                    "Failed to write request to MCP server"
454                );
455                ExternalMcpError::WriteError(e.to_string())
456            })?;
457        stdin
458            .write_all(b"\n")
459            .await
460            .map_err(|e| ExternalMcpError::WriteError(e.to_string()))?;
461        stdin
462            .flush()
463            .await
464            .map_err(|e| ExternalMcpError::WriteError(e.to_string()))?;
465
466        let write_elapsed = start_time.elapsed();
467        tracing::debug!(
468            server_name = %self.name,
469            method = %method,
470            write_elapsed_ms = write_elapsed.as_millis(),
471            "Request sent, waiting for response"
472        );
473
474        // Read response
475        let mut line = String::new();
476        stdout.read_line(&mut line).await.map_err(|e| {
477            tracing::error!(
478                server_name = %self.name,
479                method = %method,
480                error = %e,
481                "Failed to read response from MCP server"
482            );
483            ExternalMcpError::ReadError(e.to_string())
484        })?;
485
486        let total_elapsed = start_time.elapsed();
487
488        // Update statistics
489        self.total_requests.fetch_add(1, Ordering::Relaxed);
490        #[allow(clippy::cast_possible_truncation)]
491        self.total_request_time_ms
492            .fetch_add(total_elapsed.as_millis() as u64, Ordering::Relaxed);
493
494        tracing::debug!(
495            server_name = %self.name,
496            method = %method,
497            request_id = request_id,
498            response_size = line.len(),
499            elapsed_ms = total_elapsed.as_millis(),
500            "Received response from MCP server"
501        );
502
503        let response: JsonRpcResponse = serde_json::from_str(&line).map_err(|e| {
504            tracing::error!(
505                server_name = %self.name,
506                method = %method,
507                error = %e,
508                response_preview = %line.chars().take(200).collect::<String>(),
509                "Failed to parse JSON-RPC response"
510            );
511            ExternalMcpError::DeserializationError(e.to_string())
512        })?;
513
514        let read_elapsed = total_elapsed.saturating_sub(write_elapsed);
515
516        // Comprehensive performance summary
517        tracing::info!(
518            server_name = %self.name,
519            method = %method,
520            request_id = request_id,
521            request_size_bytes = request_json.len(),
522            response_size_bytes = line.len(),
523            write_duration_ms = write_elapsed.as_millis(),
524            read_duration_ms = read_elapsed.as_millis(),
525            total_round_trip_ms = total_elapsed.as_millis(),
526            "MCP JSON-RPC request completed successfully"
527        );
528
529        if let Some(error) = response.error {
530            tracing::warn!(
531                server_name = %self.name,
532                method = %method,
533                request_id = request_id,
534                error_code = error.code,
535                error_message = %error.message,
536                elapsed_ms = total_elapsed.as_millis(),
537                "MCP server returned error"
538            );
539            return Err(ExternalMcpError::RpcError {
540                code: error.code,
541                message: error.message,
542            });
543        }
544
545        tracing::debug!(
546            server_name = %self.name,
547            method = %method,
548            request_id = request_id,
549            elapsed_ms = total_elapsed.as_millis(),
550            "MCP request completed successfully"
551        );
552
553        Ok(response)
554    }
555
556    /// Send a JSON-RPC notification (no response expected)
557    async fn send_notification(
558        &mut self,
559        method: &str,
560        params: Option<serde_json::Value>,
561    ) -> Result<(), ExternalMcpError> {
562        let McpConnection::Stdio { stdin, .. } = &mut self.connection;
563
564        let notification = serde_json::json!({
565            "jsonrpc": "2.0",
566            "method": method,
567            "params": params
568        });
569
570        let notification_json = serde_json::to_string(&notification)
571            .map_err(|e| ExternalMcpError::SerializationError(e.to_string()))?;
572
573        stdin
574            .write_all(notification_json.as_bytes())
575            .await
576            .map_err(|e| ExternalMcpError::WriteError(e.to_string()))?;
577        stdin
578            .write_all(b"\n")
579            .await
580            .map_err(|e| ExternalMcpError::WriteError(e.to_string()))?;
581        stdin
582            .flush()
583            .await
584            .map_err(|e| ExternalMcpError::WriteError(e.to_string()))?;
585
586        Ok(())
587    }
588
589    /// Call a tool on this server
590    ///
591    /// Executes a tool on the external MCP server with timeout protection.
592    #[instrument(
593        name = "mcp_call_tool",
594        skip(self, arguments),
595        fields(
596            server_name = %self.name,
597            tool_name = %tool_name,
598            args_size = arguments.to_string().len(),
599        )
600    )]
601    pub async fn call_tool(
602        &mut self,
603        tool_name: &str,
604        arguments: serde_json::Value,
605    ) -> Result<ToolResult, ExternalMcpError> {
606        let start_time = Instant::now();
607
608        if !self.initialized {
609            tracing::error!(
610                server_name = %self.name,
611                tool_name = %tool_name,
612                "Attempted to call tool on uninitialized server"
613            );
614            return Err(ExternalMcpError::NotInitialized);
615        }
616
617        tracing::info!(
618            server_name = %self.name,
619            tool_name = %tool_name,
620            "Calling external MCP tool"
621        );
622
623        let request_id = self.next_request_id();
624        let request = JsonRpcRequest::new(
625            request_id,
626            "tools/call",
627            Some(serde_json::json!({
628                "name": tool_name,
629                "arguments": arguments
630            })),
631        );
632
633        let response = self.send_request(request).await?;
634
635        let elapsed = start_time.elapsed();
636
637        // Parse tool result
638        if let Some(result) = response.result {
639            // Check if result has content array (MCP format)
640            if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
641                let text: Vec<String> = content
642                    .iter()
643                    .filter_map(|c| {
644                        if c.get("type").and_then(|t| t.as_str()) == Some("text") {
645                            c.get("text").and_then(|t| t.as_str()).map(String::from)
646                        } else {
647                            None
648                        }
649                    })
650                    .collect();
651
652                let is_error = result
653                    .get("is_error")
654                    .or_else(|| result.get("isError")) // Support both snake_case and camelCase
655                    .and_then(|e| e.as_bool())
656                    .unwrap_or(false);
657
658                let result_preview = text.join("\n").chars().take(200).collect::<String>();
659
660                if is_error {
661                    tracing::warn!(
662                        server_name = %self.name,
663                        tool_name = %tool_name,
664                        elapsed_ms = elapsed.as_millis(),
665                        result_preview = %result_preview,
666                        "External MCP tool returned error"
667                    );
668                    return Ok(ToolResult::error(text.join("\n")));
669                }
670
671                tracing::info!(
672                    server_name = %self.name,
673                    tool_name = %tool_name,
674                    elapsed_ms = elapsed.as_millis(),
675                    result_len = text.iter().map(|s| s.len()).sum::<usize>(),
676                    "External MCP tool completed successfully"
677                );
678                return Ok(ToolResult::success(text.join("\n")));
679            }
680
681            // Fallback: return raw JSON
682            tracing::info!(
683                server_name = %self.name,
684                tool_name = %tool_name,
685                elapsed_ms = elapsed.as_millis(),
686                "External MCP tool completed (raw JSON response)"
687            );
688            Ok(ToolResult::success(result.to_string()))
689        } else {
690            tracing::info!(
691                server_name = %self.name,
692                tool_name = %tool_name,
693                elapsed_ms = elapsed.as_millis(),
694                "External MCP tool completed (empty response)"
695            );
696            Ok(ToolResult::success(""))
697        }
698    }
699
700    /// Get server statistics
701    pub fn stats(&self) -> McpServerStats {
702        McpServerStats {
703            server_name: self.name.clone(),
704            total_requests: self.total_requests.load(Ordering::Relaxed),
705            total_request_time_ms: self.total_request_time_ms.load(Ordering::Relaxed),
706            tool_count: self.tools.len(),
707            initialized: self.initialized,
708            connected_at: self.connected_at,
709            initialized_at: self.initialized_at,
710        }
711    }
712
713    /// Get available tools from this server
714    pub fn tools(&self) -> &[ToolSchema] {
715        &self.tools
716    }
717
718    /// Check if the server is initialized
719    pub fn is_initialized(&self) -> bool {
720        self.initialized
721    }
722}
723
724/// Manager for multiple external MCP servers
725#[allow(missing_debug_implementations)]
726pub struct ExternalMcpManager {
727    /// Connected servers by name
728    /// Using DashMap for lock-free concurrent access to different servers
729    /// Using tokio::sync::Mutex to allow holding lock across .await points
730    servers: DashMap<String, Arc<tokio::sync::Mutex<ExternalMcpServer>>>,
731}
732
733impl ExternalMcpManager {
734    /// Create a new external MCP manager
735    pub fn new() -> Self {
736        Self {
737            servers: DashMap::new(),
738        }
739    }
740
741    /// Connect to an MCP server
742    ///
743    /// This method spawns the MCP server process, establishes communication,
744    /// and performs the MCP handshake (initialize + tools/list).
745    #[instrument(
746        name = "mcp_manager_connect",
747        skip(self, env, cwd),
748        fields(
749            server_name = %name,
750            command = %command,
751        )
752    )]
753    pub async fn connect(
754        &self,
755        name: String,
756        command: &str,
757        args: &[String],
758        env: Option<&HashMap<String, String>>,
759        cwd: Option<&Path>,
760    ) -> Result<(), ExternalMcpError> {
761        let overall_start = Instant::now();
762
763        tracing::info!(
764            server_name = %name,
765            command = %command,
766            args = ?args,
767            "Connecting to external MCP server"
768        );
769
770        // Step 1: Spawn and connect
771        let connect_start = Instant::now();
772        let mut server =
773            ExternalMcpServer::connect_stdio(name.clone(), command, args, env, cwd).await?;
774        let connect_elapsed = connect_start.elapsed();
775
776        tracing::debug!(
777            server_name = %name,
778            connect_elapsed_ms = connect_elapsed.as_millis(),
779            "MCP server process connected"
780        );
781
782        // Step 2: Initialize
783        let init_start = Instant::now();
784        server.initialize().await?;
785        let init_elapsed = init_start.elapsed();
786
787        let overall_elapsed = overall_start.elapsed();
788
789        tracing::info!(
790            server_name = %name,
791            tool_count = server.tools().len(),
792            connect_elapsed_ms = connect_elapsed.as_millis(),
793            init_elapsed_ms = init_elapsed.as_millis(),
794            total_elapsed_ms = overall_elapsed.as_millis(),
795            "Successfully connected and initialized MCP server"
796        );
797
798        // Log tool names for debugging
799        let tool_names: Vec<&str> = server.tools().iter().map(|t| t.name.as_str()).collect();
800        tracing::debug!(
801            server_name = %name,
802            tools = ?tool_names,
803            "MCP server tools available"
804        );
805
806        // Insert server into DashMap (no async needed)
807        self.servers
808            .insert(name, Arc::new(tokio::sync::Mutex::new(server)));
809        Ok(())
810    }
811
812    /// Disconnect from an MCP server
813    pub fn disconnect(&self, name: &str) {
814        self.servers.remove(name);
815    }
816
817    /// Get all connected server names
818    pub fn server_names(&self) -> Vec<String> {
819        self.servers
820            .iter()
821            .map(|entry| entry.key().clone())
822            .collect()
823    }
824
825    /// Get all available tools from all servers
826    ///
827    /// Tool names are prefixed with `mcp__<server>__`
828    pub fn all_tools(&self) -> Vec<ToolSchema> {
829        let mut tools = Vec::new();
830
831        for entry in &self.servers {
832            let server_name = entry.key();
833            let server = entry.value();
834            // Try to lock the mutex (non-blocking)
835            let Ok(server_guard) = server.try_lock() else {
836                tracing::warn!(
837                    server_name = %server_name,
838                    "MCP server is busy, skipping for tool listing"
839                );
840                continue; // Skip this server if lock is not available
841            };
842
843            for tool in server_guard.tools() {
844                tools.push(ToolSchema {
845                    name: format!("mcp__{}_{}", server_name, tool.name),
846                    description: format!("[{}] {}", server_name, tool.description),
847                    input_schema: tool.input_schema.clone(),
848                });
849            }
850        }
851
852        tools
853    }
854
855    /// Call a tool on an external server
856    ///
857    /// Tool name should be prefixed with `mcp__<server>__`
858    #[instrument(
859        name = "mcp_manager_call_tool",
860        skip(self, arguments),
861        fields(
862            full_tool_name = %full_tool_name,
863        )
864    )]
865    pub async fn call_tool(
866        &self,
867        full_tool_name: &str,
868        arguments: serde_json::Value,
869    ) -> Result<ToolResult, ExternalMcpError> {
870        // Parse server name and tool name from `mcp__<server>__<tool>`
871        let parts: Vec<&str> = full_tool_name.splitn(3, "__").collect();
872        if parts.len() != 3 || parts[0] != "mcp" {
873            tracing::warn!(
874                full_tool_name = %full_tool_name,
875                "Invalid external MCP tool name format"
876            );
877            return Err(ExternalMcpError::InvalidToolName(
878                full_tool_name.to_string(),
879            ));
880        }
881
882        let server_name = parts[1];
883        let tool_name = parts[2];
884
885        // Record to current span
886        Span::current().record("server_name", server_name);
887        Span::current().record("tool_name", tool_name);
888
889        tracing::debug!(
890            server_name = %server_name,
891            tool_name = %tool_name,
892            "Routing tool call to external MCP server"
893        );
894
895        // Get the server from DashMap
896        let server_arc = self.servers.get(server_name).ok_or_else(|| {
897            let available: Vec<String> = self.server_names();
898            tracing::error!(
899                server_name = %server_name,
900                tool_name = %tool_name,
901                available_servers = ?available,
902                "External MCP server not found"
903            );
904            ExternalMcpError::ServerNotFound(server_name.to_string())
905        })?;
906
907        // Clone the Arc to hold it across the await
908        let server = server_arc.clone();
909        drop(server_arc); // Release DashMap reference
910
911        let start_time = Instant::now();
912
913        // Lock the server's mutex and call the tool
914        // tokio::sync::Mutex allows holding lock across .await points
915        let result = {
916            let mut server_guard = server.lock().await;
917            server_guard.call_tool(tool_name, arguments).await?
918        };
919
920        let elapsed = start_time.elapsed();
921        tracing::info!(
922            server_name = %server_name,
923            tool_name = %tool_name,
924            elapsed_ms = elapsed.as_millis(),
925            is_error = result.is_error,
926            "External MCP tool call completed"
927        );
928
929        Ok(result)
930    }
931
932    /// Get statistics for all connected servers
933    pub fn all_stats(&self) -> Vec<McpServerStats> {
934        self.servers
935            .iter()
936            .filter_map(|entry| {
937                let server = entry.value();
938                // Try to lock the mutex (non-blocking)
939                if let Ok(guard) = server.try_lock() { Some(guard.stats()) } else {
940                    tracing::warn!(
941                        server_name = %entry.key(),
942                        "MCP server is busy, skipping for stats"
943                    );
944                    None
945                }
946            })
947            .collect()
948    }
949
950    /// Check if a tool name refers to an external MCP tool
951    ///
952    /// External MCP tools have the format `mcp__<server>__<tool>` where
953    /// `<server>` is not "acp" (which is reserved for the ACP prefix).
954    pub fn is_external_tool(name: &str) -> bool {
955        if !name.starts_with("mcp__") {
956            return false;
957        }
958
959        // Split by __ and check structure
960        let parts: Vec<&str> = name.splitn(3, "__").collect();
961        if parts.len() != 3 || parts[0] != "mcp" {
962            return false;
963        }
964
965        // "acp" is reserved for the ACP tool prefix, not external MCP
966        parts[1] != "acp"
967    }
968
969    /// Get the friendly name for an external MCP tool
970    ///
971    /// This maps MCP tool names like `mcp__web-fetch__webReader` to friendly names
972    /// like `WebFetch` that can be used in permission settings.
973    ///
974    /// Only supports official Anthropic Claude Code tools:
975    /// - WebFetch (web-fetch/web-reader MCP server)
976    /// - WebSearch (web-search-prime MCP server)
977    ///
978    /// Returns None if the tool is not an external MCP tool or has no known mapping.
979    pub fn get_friendly_tool_name(name: &str) -> Option<String> {
980        if !Self::is_external_tool(name) {
981            return None;
982        }
983
984        let parts: Vec<&str> = name.splitn(3, "__").collect();
985        let server_name = parts.get(1)?;
986        let tool_name = parts.get(2)?;
987
988        // Map known MCP server/tool combinations to friendly names
989        // Only official Anthropic Claude Code tools are supported
990        match (*server_name, *tool_name) {
991            // Web Fetch MCP server
992            ("web-fetch", "webReader") => Some("WebFetch".to_string()),
993            ("web-reader", "webReader") => Some("WebFetch".to_string()),
994
995            // Web Search Prime MCP server
996            ("web-search-prime", "webSearchPrime") => Some("WebSearch".to_string()),
997
998            // Unknown tool - return None
999            _ => None,
1000        }
1001    }
1002}
1003
1004impl Default for ExternalMcpManager {
1005    fn default() -> Self {
1006        Self::new()
1007    }
1008}
1009
1010/// MCP server statistics
1011#[derive(Debug, Clone)]
1012pub struct McpServerStats {
1013    /// Server name
1014    pub server_name: String,
1015    /// Total requests sent
1016    pub total_requests: u64,
1017    /// Total time spent on requests (ms)
1018    pub total_request_time_ms: u64,
1019    /// Number of tools available
1020    pub tool_count: usize,
1021    /// Whether the server is initialized
1022    pub initialized: bool,
1023    /// Time when server was connected
1024    pub connected_at: Option<Instant>,
1025    /// Time when server was initialized
1026    pub initialized_at: Option<Instant>,
1027}
1028
1029impl McpServerStats {
1030    /// Get average request time in milliseconds
1031    #[allow(clippy::cast_precision_loss)]
1032    pub fn avg_request_time_ms(&self) -> f64 {
1033        if self.total_requests == 0 {
1034            0.0
1035        } else {
1036            self.total_request_time_ms as f64 / self.total_requests as f64
1037        }
1038    }
1039
1040    /// Get uptime since connection
1041    pub fn uptime(&self) -> Option<Duration> {
1042        self.connected_at.map(|t| t.elapsed())
1043    }
1044}
1045
1046/// Errors for external MCP operations
1047#[derive(Debug, thiserror::Error)]
1048pub enum ExternalMcpError {
1049    /// Failed to spawn MCP server process
1050    #[error("Failed to spawn MCP server '{command}': {error}")]
1051    SpawnFailed { command: String, error: String },
1052
1053    /// No stdin available
1054    #[error("No stdin available for MCP server")]
1055    NoStdin,
1056
1057    /// No stdout available
1058    #[error("No stdout available for MCP server")]
1059    NoStdout,
1060
1061    /// Serialization error
1062    #[error("Serialization error: {0}")]
1063    SerializationError(String),
1064
1065    /// Deserialization error
1066    #[error("Deserialization error: {0}")]
1067    DeserializationError(String),
1068
1069    /// Write error
1070    #[error("Write error: {0}")]
1071    WriteError(String),
1072
1073    /// Read error
1074    #[error("Read error: {0}")]
1075    ReadError(String),
1076
1077    /// RPC error from server
1078    #[error("RPC error {code}: {message}")]
1079    RpcError { code: i64, message: String },
1080
1081    /// Server not initialized
1082    #[error("Server not initialized")]
1083    NotInitialized,
1084
1085    /// Invalid tool name format
1086    #[error("Invalid tool name format: {0}")]
1087    InvalidToolName(String),
1088
1089    /// Server not found
1090    #[error("MCP server not found: {0}")]
1091    ServerNotFound(String),
1092
1093    /// Request or operation timed out
1094    #[error("MCP operation '{operation}' timed out after {timeout_ms}ms")]
1095    Timeout { operation: String, timeout_ms: u64 },
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100    use super::*;
1101
1102    #[test]
1103    fn test_external_mcp_manager_new() {
1104        let _manager = ExternalMcpManager::new();
1105        // Just verify creation works
1106        assert!(ExternalMcpManager::is_external_tool("mcp__server__tool"));
1107        assert!(!ExternalMcpManager::is_external_tool("Read"));
1108        assert!(!ExternalMcpManager::is_external_tool("mcp__acp__Read"));
1109    }
1110
1111    #[test]
1112    fn test_is_external_tool() {
1113        // External MCP tools
1114        assert!(ExternalMcpManager::is_external_tool(
1115            "mcp__myserver__mytool"
1116        ));
1117        assert!(ExternalMcpManager::is_external_tool(
1118            "mcp__filesystem__read_file"
1119        ));
1120
1121        // Not external tools
1122        assert!(!ExternalMcpManager::is_external_tool("Read"));
1123        assert!(!ExternalMcpManager::is_external_tool("Bash"));
1124        assert!(!ExternalMcpManager::is_external_tool("mcp__acp__Read")); // ACP prefix, not external
1125        assert!(!ExternalMcpManager::is_external_tool("mcp__single")); // Not enough parts
1126    }
1127
1128    #[tokio::test]
1129    async fn test_manager_server_names_empty() {
1130        let manager = ExternalMcpManager::new();
1131        let names = manager.server_names();
1132        assert!(names.is_empty());
1133    }
1134
1135    #[tokio::test]
1136    async fn test_manager_all_tools_empty() {
1137        let manager = ExternalMcpManager::new();
1138        let tools = manager.all_tools();
1139        assert!(tools.is_empty());
1140    }
1141
1142    #[test]
1143    fn test_get_friendly_tool_name_web_fetch() {
1144        assert_eq!(
1145            ExternalMcpManager::get_friendly_tool_name("mcp__web-fetch__webReader"),
1146            Some("WebFetch".to_string())
1147        );
1148        assert_eq!(
1149            ExternalMcpManager::get_friendly_tool_name("mcp__web-reader__webReader"),
1150            Some("WebFetch".to_string())
1151        );
1152    }
1153
1154    #[test]
1155    fn test_get_friendly_tool_name_web_search() {
1156        assert_eq!(
1157            ExternalMcpManager::get_friendly_tool_name("mcp__web-search-prime__webSearchPrime"),
1158            Some("WebSearch".to_string())
1159        );
1160    }
1161
1162    #[test]
1163    fn test_get_friendly_tool_name_non_mcp_tool() {
1164        assert_eq!(ExternalMcpManager::get_friendly_tool_name("Read"), None);
1165        assert_eq!(ExternalMcpManager::get_friendly_tool_name("Bash"), None);
1166        assert_eq!(
1167            ExternalMcpManager::get_friendly_tool_name("mcp__acp__Read"),
1168            None
1169        );
1170    }
1171
1172    #[test]
1173    fn test_get_friendly_tool_name_unknown_mcp_tool() {
1174        // Unknown MCP tools should return None (only official tools are supported)
1175        assert_eq!(
1176            ExternalMcpManager::get_friendly_tool_name("mcp__zai-mcp-server__ui_to_artifact"),
1177            None
1178        );
1179        assert_eq!(
1180            ExternalMcpManager::get_friendly_tool_name("mcp__context7__query-docs"),
1181            None
1182        );
1183        assert_eq!(
1184            ExternalMcpManager::get_friendly_tool_name("mcp__my-server__my_custom_tool"),
1185            None
1186        );
1187    }
1188}