Skip to main content

vtcode_core/tools/registry/
mcp_facade.rs

1//! MCP client integration for ToolRegistry.
2
3use rustc_hash::{FxHashMap, FxHashSet};
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Result, anyhow};
8use serde_json::Value;
9use tracing::{debug, warn};
10use vtcode_commons::{ErrorCategory, classify_anyhow_error};
11
12use crate::mcp::{McpClient, McpToolExecutor, McpToolInfo};
13use crate::tools::mcp::build_mcp_registration;
14
15use super::ToolRegistry;
16use super::mcp_helpers::normalize_mcp_tool_identifier;
17
18impl ToolRegistry {
19    /// Set the MCP client for this registry.
20    pub async fn with_mcp_client(self, mcp_client: Arc<McpClient>) -> Self {
21        if let Ok(mut guard) = self.mcp_client.write() {
22            *guard = Some(mcp_client);
23        }
24        self.mcp_tool_index.write().await.clear();
25        self.mcp_reverse_index.write().await.clear();
26        if let Ok(mut cache) = self.cached_available_tools.write() {
27            *cache = None;
28        }
29        self.initialized
30            .store(false, std::sync::atomic::Ordering::Relaxed);
31        self
32    }
33
34    /// Attach an MCP client without consuming the registry.
35    pub async fn set_mcp_client(&self, mcp_client: Arc<McpClient>) {
36        if let Ok(mut guard) = self.mcp_client.write() {
37            *guard = Some(mcp_client);
38        }
39        self.mcp_tool_index.write().await.clear();
40        self.mcp_reverse_index.write().await.clear();
41        if let Ok(mut cache) = self.cached_available_tools.write() {
42            *cache = None;
43        }
44        self.initialized
45            .store(false, std::sync::atomic::Ordering::Relaxed);
46    }
47
48    /// Detach the current MCP client and clear MCP tool indexes.
49    pub async fn clear_mcp_client(&self) {
50        if let Ok(mut guard) = self.mcp_client.write() {
51            *guard = None;
52        }
53        self.mcp_tool_index.write().await.clear();
54        self.mcp_reverse_index.write().await.clear();
55        if let Ok(mut cache) = self.cached_available_tools.write() {
56            *cache = None;
57        }
58        self.initialized
59            .store(false, std::sync::atomic::Ordering::Relaxed);
60    }
61
62    /// Get the MCP client if available.
63    pub fn mcp_client(&self) -> Option<Arc<McpClient>> {
64        self.mcp_client.read().ok().and_then(|g| g.clone())
65    }
66
67    /// List all MCP tools.
68    pub async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>> {
69        let index = self.mcp_tool_index.read().await;
70        if index.is_empty() {
71            return Ok(Vec::new());
72        }
73
74        let mut mcp_tools = Vec::new();
75        for (provider, tools) in index.iter() {
76            for tool_name in tools {
77                let canonical_name = format!("mcp::{}::{}", provider, tool_name);
78                if let Some(registration) = self.inventory.get_registration(&canonical_name) {
79                    mcp_tools.push(McpToolInfo {
80                        name: tool_name.clone(),
81                        description: registration
82                            .metadata()
83                            .description()
84                            .unwrap_or("")
85                            .to_string(),
86                        provider: provider.clone(),
87                        input_schema: registration
88                            .parameter_schema()
89                            .cloned()
90                            .unwrap_or(Value::Null),
91                    });
92                }
93            }
94        }
95
96        Ok(mcp_tools)
97    }
98
99    /// Check if an MCP tool exists.
100    pub async fn has_mcp_tool(&self, tool_name: &str) -> bool {
101        self.mcp_reverse_index.read().await.contains_key(tool_name)
102    }
103
104    /// Execute an MCP tool.
105    pub async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
106        let client_opt = { self.mcp_client.read().ok().and_then(|g| g.clone()) };
107        if let Some(mcp_client) = client_opt {
108            mcp_client.execute_mcp_tool(tool_name, &args).await
109        } else {
110            Err(anyhow!("MCP client not available"))
111        }
112    }
113
114    pub(super) async fn resolve_mcp_tool_alias(&self, tool_name: &str) -> Option<String> {
115        let normalized = normalize_mcp_tool_identifier(tool_name);
116        if normalized.is_empty() {
117            return None;
118        }
119
120        let index = self.mcp_tool_index.read().await;
121        for tools in index.values() {
122            for tool in tools {
123                if normalize_mcp_tool_identifier(tool) == normalized {
124                    return Some(tool.clone());
125                }
126            }
127        }
128
129        None
130    }
131
132    /// Refresh MCP tools (reconnect to providers and update tool lists).
133    pub async fn refresh_mcp_tools(&self) -> Result<()> {
134        let mcp_client_opt = { self.mcp_client.read().ok().and_then(|g| g.clone()) };
135        if let Some(mcp_client) = mcp_client_opt {
136            debug!(
137                "Refreshing MCP tools for {} providers",
138                mcp_client.get_status().provider_count
139            );
140
141            let mut tools: Option<Vec<McpToolInfo>> = None;
142            let mut last_err: Option<anyhow::Error> = None;
143            for attempt in 0..3 {
144                match mcp_client.list_mcp_tools().await {
145                    Ok(list) => {
146                        tools = Some(list);
147                        break;
148                    }
149                    Err(err) => {
150                        last_err = Some(err);
151                        let jitter = (attempt * 37) % 80;
152                        let pow = 2_u64.saturating_pow(attempt.min(4) as u32); // cap exponent
153                        let backoff =
154                            Duration::from_millis(200 * pow + jitter).min(Duration::from_secs(3));
155                        warn!(
156                            attempt = attempt + 1,
157                            delay_ms = %backoff.as_millis(),
158                            "Failed to list MCP tools, retrying with backoff"
159                        );
160                        tokio::time::sleep(backoff).await;
161                    }
162                }
163            }
164
165            let tools = match tools {
166                Some(list) => list,
167                None => {
168                    let error_for_log = last_err
169                        .as_ref()
170                        .map(|error| error.to_string())
171                        .unwrap_or_else(|| "unknown MCP error".to_string());
172                    warn!(
173                        error = %error_for_log,
174                        "Failed to refresh MCP tools after retries; keeping existing cache"
175                    );
176                    let category = last_err
177                        .as_ref()
178                        .map(classify_anyhow_error)
179                        .unwrap_or(ErrorCategory::ExecutionError);
180                    self.mcp_circuit_breaker.record_failure_category(category);
181                    return Ok(());
182                }
183            };
184            let existing_tools: Vec<String> = {
185                let index = self.mcp_tool_index.read().await;
186                index
187                    .iter()
188                    .flat_map(|(provider, names)| {
189                        names
190                            .iter()
191                            .map(move |name| format!("mcp::{}::{}", provider, name))
192                    })
193                    .collect()
194            };
195            for canonical_name in existing_tools {
196                if let Err(err) = self.inventory.remove_tool(&canonical_name) {
197                    warn!(
198                        tool = %canonical_name,
199                        error = %err,
200                        "failed to remove stale MCP proxy tool"
201                    );
202                }
203            }
204
205            let mut provider_map: FxHashMap<String, Vec<String>> = FxHashMap::default();
206            let mut seen_tools = FxHashSet::default();
207
208            for tool in &tools {
209                let canonical_name = format!("mcp::{}::{}", tool.provider, tool.name);
210                if seen_tools.insert(canonical_name.clone()) {
211                    let registration =
212                        build_mcp_registration(Arc::clone(&mcp_client), &tool.provider, tool, None);
213
214                    if let Err(err) = self.inventory.register_tool(registration) {
215                        warn!(
216                            tool = %tool.name,
217                            provider = %tool.provider,
218                            error = %err,
219                            "failed to register MCP proxy tool"
220                        );
221                    }
222                }
223            }
224
225            for tool in tools {
226                provider_map
227                    .entry(tool.provider.clone())
228                    .or_default()
229                    .push(tool.name.clone());
230            }
231
232            for tools in provider_map.values_mut() {
233                tools.sort();
234                tools.dedup();
235            }
236
237            *self.mcp_tool_index.write().await = provider_map;
238            {
239                let mut reverse_index = self.mcp_reverse_index.write().await;
240                reverse_index.clear();
241                let index = self.mcp_tool_index.read().await;
242                for (provider, tools) in index.iter() {
243                    for tool in tools {
244                        reverse_index.insert(tool.clone(), provider.clone());
245                    }
246                }
247            }
248
249            let mcp_index = self.mcp_tool_index.read().await;
250            // Convert FxHashMap to std HashMap for policy manager API compatibility
251            let std_index: hashbrown::HashMap<String, Vec<String>> = mcp_index
252                .iter()
253                .map(|(k, v)| (k.clone(), v.clone()))
254                .collect();
255            if let Some(allowlist) = self
256                .policy_gateway
257                .lock()
258                .await
259                .update_mcp_tools(&std_index)
260                .await?
261            {
262                mcp_client.update_allowlist(allowlist);
263            }
264
265            if let Ok(mut cache) = self.cached_available_tools.write() {
266                *cache = None;
267            }
268            self.rebuild_tool_assembly().await;
269            self.tool_catalog_state
270                .note_explicit_refresh("mcp_tool_refresh");
271            self.sync_policy_catalog().await;
272            // MP-3: Record success in circuit breaker
273            self.mcp_circuit_breaker.record_success();
274            Ok(())
275        } else {
276            debug!("No MCP client configured, nothing to refresh");
277            Ok(())
278        }
279    }
280}