vtcode_core/tools/registry/
mcp_facade.rs1use rustc_hash::{FxHashMap, FxHashSet};
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Result, anyhow};
8use serde_json::Value;
9use tracing::{debug, warn};
10use vtcode_commons::{ErrorCategory, classify_anyhow_error};
11
12use crate::mcp::{McpClient, McpToolExecutor, McpToolInfo};
13use crate::tools::mcp::build_mcp_registration;
14
15use super::ToolRegistry;
16use super::mcp_helpers::normalize_mcp_tool_identifier;
17
18impl ToolRegistry {
19 pub async fn with_mcp_client(self, mcp_client: Arc<McpClient>) -> Self {
21 if let Ok(mut guard) = self.mcp_client.write() {
22 *guard = Some(mcp_client);
23 }
24 self.mcp_tool_index.write().await.clear();
25 self.mcp_reverse_index.write().await.clear();
26 if let Ok(mut cache) = self.cached_available_tools.write() {
27 *cache = None;
28 }
29 self.initialized
30 .store(false, std::sync::atomic::Ordering::Relaxed);
31 self
32 }
33
34 pub async fn set_mcp_client(&self, mcp_client: Arc<McpClient>) {
36 if let Ok(mut guard) = self.mcp_client.write() {
37 *guard = Some(mcp_client);
38 }
39 self.mcp_tool_index.write().await.clear();
40 self.mcp_reverse_index.write().await.clear();
41 if let Ok(mut cache) = self.cached_available_tools.write() {
42 *cache = None;
43 }
44 self.initialized
45 .store(false, std::sync::atomic::Ordering::Relaxed);
46 }
47
48 pub async fn clear_mcp_client(&self) {
50 if let Ok(mut guard) = self.mcp_client.write() {
51 *guard = None;
52 }
53 self.mcp_tool_index.write().await.clear();
54 self.mcp_reverse_index.write().await.clear();
55 if let Ok(mut cache) = self.cached_available_tools.write() {
56 *cache = None;
57 }
58 self.initialized
59 .store(false, std::sync::atomic::Ordering::Relaxed);
60 }
61
62 pub fn mcp_client(&self) -> Option<Arc<McpClient>> {
64 self.mcp_client.read().ok().and_then(|g| g.clone())
65 }
66
67 pub async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>> {
69 let index = self.mcp_tool_index.read().await;
70 if index.is_empty() {
71 return Ok(Vec::new());
72 }
73
74 let mut mcp_tools = Vec::new();
75 for (provider, tools) in index.iter() {
76 for tool_name in tools {
77 let canonical_name = format!("mcp::{}::{}", provider, tool_name);
78 if let Some(registration) = self.inventory.get_registration(&canonical_name) {
79 mcp_tools.push(McpToolInfo {
80 name: tool_name.clone(),
81 description: registration
82 .metadata()
83 .description()
84 .unwrap_or("")
85 .to_string(),
86 provider: provider.clone(),
87 input_schema: registration
88 .parameter_schema()
89 .cloned()
90 .unwrap_or(Value::Null),
91 });
92 }
93 }
94 }
95
96 Ok(mcp_tools)
97 }
98
99 pub async fn has_mcp_tool(&self, tool_name: &str) -> bool {
101 self.mcp_reverse_index.read().await.contains_key(tool_name)
102 }
103
104 pub async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
106 let client_opt = { self.mcp_client.read().ok().and_then(|g| g.clone()) };
107 if let Some(mcp_client) = client_opt {
108 mcp_client.execute_mcp_tool(tool_name, &args).await
109 } else {
110 Err(anyhow!("MCP client not available"))
111 }
112 }
113
114 pub(super) async fn resolve_mcp_tool_alias(&self, tool_name: &str) -> Option<String> {
115 let normalized = normalize_mcp_tool_identifier(tool_name);
116 if normalized.is_empty() {
117 return None;
118 }
119
120 let index = self.mcp_tool_index.read().await;
121 for tools in index.values() {
122 for tool in tools {
123 if normalize_mcp_tool_identifier(tool) == normalized {
124 return Some(tool.clone());
125 }
126 }
127 }
128
129 None
130 }
131
132 pub async fn refresh_mcp_tools(&self) -> Result<()> {
134 let mcp_client_opt = { self.mcp_client.read().ok().and_then(|g| g.clone()) };
135 if let Some(mcp_client) = mcp_client_opt {
136 debug!(
137 "Refreshing MCP tools for {} providers",
138 mcp_client.get_status().provider_count
139 );
140
141 let mut tools: Option<Vec<McpToolInfo>> = None;
142 let mut last_err: Option<anyhow::Error> = None;
143 for attempt in 0..3 {
144 match mcp_client.list_mcp_tools().await {
145 Ok(list) => {
146 tools = Some(list);
147 break;
148 }
149 Err(err) => {
150 last_err = Some(err);
151 let jitter = (attempt * 37) % 80;
152 let pow = 2_u64.saturating_pow(attempt.min(4) as u32); let backoff =
154 Duration::from_millis(200 * pow + jitter).min(Duration::from_secs(3));
155 warn!(
156 attempt = attempt + 1,
157 delay_ms = %backoff.as_millis(),
158 "Failed to list MCP tools, retrying with backoff"
159 );
160 tokio::time::sleep(backoff).await;
161 }
162 }
163 }
164
165 let tools = match tools {
166 Some(list) => list,
167 None => {
168 let error_for_log = last_err
169 .as_ref()
170 .map(|error| error.to_string())
171 .unwrap_or_else(|| "unknown MCP error".to_string());
172 warn!(
173 error = %error_for_log,
174 "Failed to refresh MCP tools after retries; keeping existing cache"
175 );
176 let category = last_err
177 .as_ref()
178 .map(classify_anyhow_error)
179 .unwrap_or(ErrorCategory::ExecutionError);
180 self.mcp_circuit_breaker.record_failure_category(category);
181 return Ok(());
182 }
183 };
184 let existing_tools: Vec<String> = {
185 let index = self.mcp_tool_index.read().await;
186 index
187 .iter()
188 .flat_map(|(provider, names)| {
189 names
190 .iter()
191 .map(move |name| format!("mcp::{}::{}", provider, name))
192 })
193 .collect()
194 };
195 for canonical_name in existing_tools {
196 if let Err(err) = self.inventory.remove_tool(&canonical_name) {
197 warn!(
198 tool = %canonical_name,
199 error = %err,
200 "failed to remove stale MCP proxy tool"
201 );
202 }
203 }
204
205 let mut provider_map: FxHashMap<String, Vec<String>> = FxHashMap::default();
206 let mut seen_tools = FxHashSet::default();
207
208 for tool in &tools {
209 let canonical_name = format!("mcp::{}::{}", tool.provider, tool.name);
210 if seen_tools.insert(canonical_name.clone()) {
211 let registration =
212 build_mcp_registration(Arc::clone(&mcp_client), &tool.provider, tool, None);
213
214 if let Err(err) = self.inventory.register_tool(registration) {
215 warn!(
216 tool = %tool.name,
217 provider = %tool.provider,
218 error = %err,
219 "failed to register MCP proxy tool"
220 );
221 }
222 }
223 }
224
225 for tool in tools {
226 provider_map
227 .entry(tool.provider.clone())
228 .or_default()
229 .push(tool.name.clone());
230 }
231
232 for tools in provider_map.values_mut() {
233 tools.sort();
234 tools.dedup();
235 }
236
237 *self.mcp_tool_index.write().await = provider_map;
238 {
239 let mut reverse_index = self.mcp_reverse_index.write().await;
240 reverse_index.clear();
241 let index = self.mcp_tool_index.read().await;
242 for (provider, tools) in index.iter() {
243 for tool in tools {
244 reverse_index.insert(tool.clone(), provider.clone());
245 }
246 }
247 }
248
249 let mcp_index = self.mcp_tool_index.read().await;
250 let std_index: hashbrown::HashMap<String, Vec<String>> = mcp_index
252 .iter()
253 .map(|(k, v)| (k.clone(), v.clone()))
254 .collect();
255 if let Some(allowlist) = self
256 .policy_gateway
257 .lock()
258 .await
259 .update_mcp_tools(&std_index)
260 .await?
261 {
262 mcp_client.update_allowlist(allowlist);
263 }
264
265 if let Ok(mut cache) = self.cached_available_tools.write() {
266 *cache = None;
267 }
268 self.rebuild_tool_assembly().await;
269 self.tool_catalog_state
270 .note_explicit_refresh("mcp_tool_refresh");
271 self.sync_policy_catalog().await;
272 self.mcp_circuit_breaker.record_success();
274 Ok(())
275 } else {
276 debug!("No MCP client configured, nothing to refresh");
277 Ok(())
278 }
279 }
280}