Skip to main content

vtcode_core/tools/registry/
mcp_facade.rs

1//! MCP client integration for ToolRegistry.
2
3use rustc_hash::{FxHashMap, FxHashSet};
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Result, anyhow};
8use serde_json::Value;
9use tracing::{debug, warn};
10use vtcode_commons::{ErrorCategory, classify_anyhow_error};
11
12use crate::mcp::{McpClient, McpToolExecutor, McpToolInfo};
13use crate::tools::mcp::build_mcp_registration;
14
15use super::ToolRegistry;
16use super::mcp_helpers::normalize_mcp_tool_identifier;
17
18impl ToolRegistry {
19    /// Set the MCP client for this registry.
20    pub async fn with_mcp_client(self, mcp_client: Arc<McpClient>) -> Self {
21        if let Ok(mut guard) = self.mcp_client.write() {
22            *guard = Some(mcp_client);
23        }
24        self.mcp_tool_index.write().await.clear();
25        self.mcp_reverse_index.write().await.clear();
26        if let Ok(mut cache) = self.cached_available_tools.write() {
27            *cache = None;
28        }
29        self.initialized
30            .store(false, std::sync::atomic::Ordering::Relaxed);
31        self
32    }
33
34    /// Attach an MCP client without consuming the registry.
35    pub async fn set_mcp_client(&self, mcp_client: Arc<McpClient>) {
36        if let Ok(mut guard) = self.mcp_client.write() {
37            *guard = Some(mcp_client);
38        }
39        self.mcp_tool_index.write().await.clear();
40        self.mcp_reverse_index.write().await.clear();
41        if let Ok(mut cache) = self.cached_available_tools.write() {
42            *cache = None;
43        }
44        self.initialized
45            .store(false, std::sync::atomic::Ordering::Relaxed);
46    }
47
48    /// Get the MCP client if available.
49    pub fn mcp_client(&self) -> Option<Arc<McpClient>> {
50        self.mcp_client.read().ok().and_then(|g| g.clone())
51    }
52
53    /// List all MCP tools.
54    pub async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>> {
55        let index = self.mcp_tool_index.read().await;
56        if index.is_empty() {
57            return Ok(Vec::new());
58        }
59
60        let mut mcp_tools = Vec::new();
61        for (provider, tools) in index.iter() {
62            for tool_name in tools {
63                let canonical_name = format!("mcp::{}::{}", provider, tool_name);
64                if let Some(registration) = self.inventory.get_registration(&canonical_name) {
65                    mcp_tools.push(McpToolInfo {
66                        name: tool_name.clone(),
67                        description: registration
68                            .metadata()
69                            .description()
70                            .unwrap_or("")
71                            .to_string(),
72                        provider: provider.clone(),
73                        input_schema: registration
74                            .parameter_schema()
75                            .cloned()
76                            .unwrap_or(Value::Null),
77                    });
78                }
79            }
80        }
81
82        Ok(mcp_tools)
83    }
84
85    /// Check if an MCP tool exists.
86    pub async fn has_mcp_tool(&self, tool_name: &str) -> bool {
87        self.mcp_reverse_index.read().await.contains_key(tool_name)
88    }
89
90    /// Execute an MCP tool.
91    pub async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
92        let client_opt = { self.mcp_client.read().ok().and_then(|g| g.clone()) };
93        if let Some(mcp_client) = client_opt {
94            mcp_client.execute_mcp_tool(tool_name, &args).await
95        } else {
96            Err(anyhow!("MCP client not available"))
97        }
98    }
99
100    pub(super) async fn resolve_mcp_tool_alias(&self, tool_name: &str) -> Option<String> {
101        let normalized = normalize_mcp_tool_identifier(tool_name);
102        if normalized.is_empty() {
103            return None;
104        }
105
106        let index = self.mcp_tool_index.read().await;
107        for tools in index.values() {
108            for tool in tools {
109                if normalize_mcp_tool_identifier(tool) == normalized {
110                    return Some(tool.clone());
111                }
112            }
113        }
114
115        None
116    }
117
118    /// Refresh MCP tools (reconnect to providers and update tool lists).
119    pub async fn refresh_mcp_tools(&self) -> Result<()> {
120        let mcp_client_opt = { self.mcp_client.read().ok().and_then(|g| g.clone()) };
121        if let Some(mcp_client) = mcp_client_opt {
122            debug!(
123                "Refreshing MCP tools for {} providers",
124                mcp_client.get_status().provider_count
125            );
126
127            let mut tools: Option<Vec<McpToolInfo>> = None;
128            let mut last_err: Option<anyhow::Error> = None;
129            for attempt in 0..3 {
130                match mcp_client.list_mcp_tools().await {
131                    Ok(list) => {
132                        tools = Some(list);
133                        break;
134                    }
135                    Err(err) => {
136                        last_err = Some(err);
137                        let jitter = (attempt * 37) % 80;
138                        let pow = 2_u64.saturating_pow(attempt.min(4) as u32); // cap exponent
139                        let backoff =
140                            Duration::from_millis(200 * pow + jitter).min(Duration::from_secs(3));
141                        warn!(
142                            attempt = attempt + 1,
143                            delay_ms = %backoff.as_millis(),
144                            "Failed to list MCP tools, retrying with backoff"
145                        );
146                        tokio::time::sleep(backoff).await;
147                    }
148                }
149            }
150
151            let tools = match tools {
152                Some(list) => list,
153                None => {
154                    let error_for_log = last_err
155                        .as_ref()
156                        .map(|error| error.to_string())
157                        .unwrap_or_else(|| "unknown MCP error".to_string());
158                    warn!(
159                        error = %error_for_log,
160                        "Failed to refresh MCP tools after retries; keeping existing cache"
161                    );
162                    let category = last_err
163                        .as_ref()
164                        .map(classify_anyhow_error)
165                        .unwrap_or(ErrorCategory::ExecutionError);
166                    self.mcp_circuit_breaker.record_failure_category(category);
167                    return Ok(());
168                }
169            };
170            let existing_tools: Vec<String> = {
171                let index = self.mcp_tool_index.read().await;
172                index
173                    .iter()
174                    .flat_map(|(provider, names)| {
175                        names
176                            .iter()
177                            .map(move |name| format!("mcp::{}::{}", provider, name))
178                    })
179                    .collect()
180            };
181            for canonical_name in existing_tools {
182                if let Err(err) = self.inventory.remove_tool(&canonical_name) {
183                    warn!(
184                        tool = %canonical_name,
185                        error = %err,
186                        "failed to remove stale MCP proxy tool"
187                    );
188                }
189            }
190
191            let mut provider_map: FxHashMap<String, Vec<String>> = FxHashMap::default();
192            let mut seen_tools = FxHashSet::default();
193
194            for tool in &tools {
195                let canonical_name = format!("mcp::{}::{}", tool.provider, tool.name);
196                if seen_tools.insert(canonical_name.clone()) {
197                    let registration =
198                        build_mcp_registration(Arc::clone(&mcp_client), &tool.provider, tool, None);
199
200                    if let Err(err) = self.inventory.register_tool(registration) {
201                        warn!(
202                            tool = %tool.name,
203                            provider = %tool.provider,
204                            error = %err,
205                            "failed to register MCP proxy tool"
206                        );
207                    }
208                }
209            }
210
211            for tool in tools {
212                provider_map
213                    .entry(tool.provider.clone())
214                    .or_default()
215                    .push(tool.name.clone());
216            }
217
218            for tools in provider_map.values_mut() {
219                tools.sort();
220                tools.dedup();
221            }
222
223            *self.mcp_tool_index.write().await = provider_map;
224            {
225                let mut reverse_index = self.mcp_reverse_index.write().await;
226                reverse_index.clear();
227                let index = self.mcp_tool_index.read().await;
228                for (provider, tools) in index.iter() {
229                    for tool in tools {
230                        reverse_index.insert(tool.clone(), provider.clone());
231                    }
232                }
233            }
234
235            let mcp_index = self.mcp_tool_index.read().await;
236            // Convert FxHashMap to std HashMap for policy manager API compatibility
237            let std_index: hashbrown::HashMap<String, Vec<String>> = mcp_index
238                .iter()
239                .map(|(k, v)| (k.clone(), v.clone()))
240                .collect();
241            if let Some(allowlist) = self
242                .policy_gateway
243                .lock()
244                .await
245                .update_mcp_tools(&std_index)
246                .await?
247            {
248                mcp_client.update_allowlist(allowlist);
249            }
250
251            if let Ok(mut cache) = self.cached_available_tools.write() {
252                *cache = None;
253            }
254            self.rebuild_tool_assembly().await;
255            self.tool_catalog_state
256                .note_explicit_refresh("mcp_tool_refresh");
257            self.sync_policy_catalog().await;
258            // MP-3: Record success in circuit breaker
259            self.mcp_circuit_breaker.record_success();
260            Ok(())
261        } else {
262            debug!("No MCP client configured, nothing to refresh");
263            Ok(())
264        }
265    }
266}