vtcode_core/tools/registry/
mcp_facade.rs1use rustc_hash::{FxHashMap, FxHashSet};
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Result, anyhow};
8use serde_json::Value;
9use tracing::{debug, warn};
10use vtcode_commons::{ErrorCategory, classify_anyhow_error};
11
12use crate::mcp::{McpClient, McpToolExecutor, McpToolInfo};
13use crate::tools::mcp::build_mcp_registration;
14
15use super::ToolRegistry;
16use super::mcp_helpers::normalize_mcp_tool_identifier;
17
18impl ToolRegistry {
19 pub async fn with_mcp_client(self, mcp_client: Arc<McpClient>) -> Self {
21 if let Ok(mut guard) = self.mcp_client.write() {
22 *guard = Some(mcp_client);
23 }
24 self.mcp_tool_index.write().await.clear();
25 self.mcp_reverse_index.write().await.clear();
26 if let Ok(mut cache) = self.cached_available_tools.write() {
27 *cache = None;
28 }
29 self.initialized
30 .store(false, std::sync::atomic::Ordering::Relaxed);
31 self
32 }
33
34 pub async fn set_mcp_client(&self, mcp_client: Arc<McpClient>) {
36 if let Ok(mut guard) = self.mcp_client.write() {
37 *guard = Some(mcp_client);
38 }
39 self.mcp_tool_index.write().await.clear();
40 self.mcp_reverse_index.write().await.clear();
41 if let Ok(mut cache) = self.cached_available_tools.write() {
42 *cache = None;
43 }
44 self.initialized
45 .store(false, std::sync::atomic::Ordering::Relaxed);
46 }
47
48 pub fn mcp_client(&self) -> Option<Arc<McpClient>> {
50 self.mcp_client.read().ok().and_then(|g| g.clone())
51 }
52
53 pub async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>> {
55 let index = self.mcp_tool_index.read().await;
56 if index.is_empty() {
57 return Ok(Vec::new());
58 }
59
60 let mut mcp_tools = Vec::new();
61 for (provider, tools) in index.iter() {
62 for tool_name in tools {
63 let canonical_name = format!("mcp::{}::{}", provider, tool_name);
64 if let Some(registration) = self.inventory.get_registration(&canonical_name) {
65 mcp_tools.push(McpToolInfo {
66 name: tool_name.clone(),
67 description: registration
68 .metadata()
69 .description()
70 .unwrap_or("")
71 .to_string(),
72 provider: provider.clone(),
73 input_schema: registration
74 .parameter_schema()
75 .cloned()
76 .unwrap_or(Value::Null),
77 });
78 }
79 }
80 }
81
82 Ok(mcp_tools)
83 }
84
85 pub async fn has_mcp_tool(&self, tool_name: &str) -> bool {
87 self.mcp_reverse_index.read().await.contains_key(tool_name)
88 }
89
90 pub async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
92 let client_opt = { self.mcp_client.read().ok().and_then(|g| g.clone()) };
93 if let Some(mcp_client) = client_opt {
94 mcp_client.execute_mcp_tool(tool_name, &args).await
95 } else {
96 Err(anyhow!("MCP client not available"))
97 }
98 }
99
100 pub(super) async fn resolve_mcp_tool_alias(&self, tool_name: &str) -> Option<String> {
101 let normalized = normalize_mcp_tool_identifier(tool_name);
102 if normalized.is_empty() {
103 return None;
104 }
105
106 let index = self.mcp_tool_index.read().await;
107 for tools in index.values() {
108 for tool in tools {
109 if normalize_mcp_tool_identifier(tool) == normalized {
110 return Some(tool.clone());
111 }
112 }
113 }
114
115 None
116 }
117
118 pub async fn refresh_mcp_tools(&self) -> Result<()> {
120 let mcp_client_opt = { self.mcp_client.read().ok().and_then(|g| g.clone()) };
121 if let Some(mcp_client) = mcp_client_opt {
122 debug!(
123 "Refreshing MCP tools for {} providers",
124 mcp_client.get_status().provider_count
125 );
126
127 let mut tools: Option<Vec<McpToolInfo>> = None;
128 let mut last_err: Option<anyhow::Error> = None;
129 for attempt in 0..3 {
130 match mcp_client.list_mcp_tools().await {
131 Ok(list) => {
132 tools = Some(list);
133 break;
134 }
135 Err(err) => {
136 last_err = Some(err);
137 let jitter = (attempt * 37) % 80;
138 let pow = 2_u64.saturating_pow(attempt.min(4) as u32); let backoff =
140 Duration::from_millis(200 * pow + jitter).min(Duration::from_secs(3));
141 warn!(
142 attempt = attempt + 1,
143 delay_ms = %backoff.as_millis(),
144 "Failed to list MCP tools, retrying with backoff"
145 );
146 tokio::time::sleep(backoff).await;
147 }
148 }
149 }
150
151 let tools = match tools {
152 Some(list) => list,
153 None => {
154 let error_for_log = last_err
155 .as_ref()
156 .map(|error| error.to_string())
157 .unwrap_or_else(|| "unknown MCP error".to_string());
158 warn!(
159 error = %error_for_log,
160 "Failed to refresh MCP tools after retries; keeping existing cache"
161 );
162 let category = last_err
163 .as_ref()
164 .map(classify_anyhow_error)
165 .unwrap_or(ErrorCategory::ExecutionError);
166 self.mcp_circuit_breaker.record_failure_category(category);
167 return Ok(());
168 }
169 };
170 let existing_tools: Vec<String> = {
171 let index = self.mcp_tool_index.read().await;
172 index
173 .iter()
174 .flat_map(|(provider, names)| {
175 names
176 .iter()
177 .map(move |name| format!("mcp::{}::{}", provider, name))
178 })
179 .collect()
180 };
181 for canonical_name in existing_tools {
182 if let Err(err) = self.inventory.remove_tool(&canonical_name) {
183 warn!(
184 tool = %canonical_name,
185 error = %err,
186 "failed to remove stale MCP proxy tool"
187 );
188 }
189 }
190
191 let mut provider_map: FxHashMap<String, Vec<String>> = FxHashMap::default();
192 let mut seen_tools = FxHashSet::default();
193
194 for tool in &tools {
195 let canonical_name = format!("mcp::{}::{}", tool.provider, tool.name);
196 if seen_tools.insert(canonical_name.clone()) {
197 let registration =
198 build_mcp_registration(Arc::clone(&mcp_client), &tool.provider, tool, None);
199
200 if let Err(err) = self.inventory.register_tool(registration) {
201 warn!(
202 tool = %tool.name,
203 provider = %tool.provider,
204 error = %err,
205 "failed to register MCP proxy tool"
206 );
207 }
208 }
209 }
210
211 for tool in tools {
212 provider_map
213 .entry(tool.provider.clone())
214 .or_default()
215 .push(tool.name.clone());
216 }
217
218 for tools in provider_map.values_mut() {
219 tools.sort();
220 tools.dedup();
221 }
222
223 *self.mcp_tool_index.write().await = provider_map;
224 {
225 let mut reverse_index = self.mcp_reverse_index.write().await;
226 reverse_index.clear();
227 let index = self.mcp_tool_index.read().await;
228 for (provider, tools) in index.iter() {
229 for tool in tools {
230 reverse_index.insert(tool.clone(), provider.clone());
231 }
232 }
233 }
234
235 let mcp_index = self.mcp_tool_index.read().await;
236 let std_index: hashbrown::HashMap<String, Vec<String>> = mcp_index
238 .iter()
239 .map(|(k, v)| (k.clone(), v.clone()))
240 .collect();
241 if let Some(allowlist) = self
242 .policy_gateway
243 .lock()
244 .await
245 .update_mcp_tools(&std_index)
246 .await?
247 {
248 mcp_client.update_allowlist(allowlist);
249 }
250
251 if let Ok(mut cache) = self.cached_available_tools.write() {
252 *cache = None;
253 }
254 self.rebuild_tool_assembly().await;
255 self.tool_catalog_state
256 .note_explicit_refresh("mcp_tool_refresh");
257 self.sync_policy_catalog().await;
258 self.mcp_circuit_breaker.record_success();
260 Ok(())
261 } else {
262 debug!("No MCP client configured, nothing to refresh");
263 Ok(())
264 }
265 }
266}