use crate::utils::error_messages::ERR_CREATE_POLICY_DIR;
use anyhow::{Context, Result};
use dialoguer::console::style;
use hashbrown::{HashMap, HashSet};
use indexmap::{IndexMap, IndexSet};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use crate::config::constants::tools;
use crate::config::core::tools::{ToolPolicy as ConfigToolPolicy, ToolsConfig};
use crate::config::loader::{ConfigManager, VTCodeConfig};
use crate::config::mcp::{McpAllowListConfig, McpAllowListRules};
use crate::tools::mcp::parse_canonical_mcp_tool_name;
use crate::tools::names::canonical_tool_name;
use crate::utils::file_utils::{
ensure_dir_exists, read_file_with_context, write_file_with_context,
};
const AUTO_ALLOW_TOOLS: &[&str] = &[
tools::UNIFIED_SEARCH,
tools::READ_FILE,
"cargo_check",
"cargo_test",
"git_status",
"git_diff",
"git_log",
];
const SHELL_APPROVAL_SCOPE_MARKER: &str = "|sandbox_permissions=";
const DEFAULT_APPROVAL_SCOPE_SIGNATURE: &str =
"sandbox_permissions=\"use_default\"|additional_permissions=null";
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ToolPolicy {
Allow,
#[default]
Prompt,
Deny,
}
#[derive(Debug, Clone, PartialEq)]
pub enum ToolExecutionDecision {
Allowed,
Denied,
DeniedWithFeedback(String),
}
impl ToolExecutionDecision {
pub fn is_allowed(&self) -> bool {
matches!(self, Self::Allowed)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolPolicyConfig {
pub version: u32,
pub available_tools: Vec<String>,
pub policies: IndexMap<String, ToolPolicy>,
#[serde(default)]
pub constraints: IndexMap<String, ToolConstraints>,
#[serde(default)]
pub mcp: McpPolicyStore,
#[serde(default)]
pub approval_cache: ApprovalCacheConfig,
}
impl Default for ToolPolicyConfig {
fn default() -> Self {
Self {
version: 1,
available_tools: Vec::new(),
policies: IndexMap::new(),
constraints: IndexMap::new(),
mcp: McpPolicyStore::default(),
approval_cache: ApprovalCacheConfig::default(),
}
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ApprovalCacheConfig {
#[serde(default)]
pub allowed: IndexSet<String>,
#[serde(default)]
pub prefixes: IndexSet<String>,
#[serde(default)]
pub regexes: IndexSet<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpPolicyStore {
#[serde(default = "default_secure_mcp_allowlist")]
pub allowlist: McpAllowListConfig,
#[serde(default)]
pub providers: IndexMap<String, McpProviderPolicy>,
}
impl Default for McpPolicyStore {
fn default() -> Self {
Self {
allowlist: default_secure_mcp_allowlist(),
providers: IndexMap::new(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct McpProviderPolicy {
#[serde(default)]
pub tools: IndexMap<String, ToolPolicy>,
}
const MCP_LOGGING_EVENTS: &[&str] = &[
"mcp.tool_execution",
"mcp.tool_failed",
"mcp.tool_denied",
"mcp.tool_filtered",
"mcp.provider_initialized",
];
const MCP_DEFAULT_LOGGING_EVENTS: &[&str] = &[
"mcp.provider_initialized",
"mcp.provider_initialization_failed",
"mcp.tool_filtered",
"mcp.tool_execution",
"mcp.tool_failed",
"mcp.tool_denied",
];
#[inline]
fn mcp_standard_logging() -> Vec<String> {
MCP_LOGGING_EVENTS.iter().map(|s| (*s).into()).collect()
}
#[inline]
fn mcp_provider_config_with(extra: (&str, Vec<&str>)) -> BTreeMap<String, Vec<String>> {
BTreeMap::from([
("provider".into(), vec!["max_concurrent_requests".into()]),
(
extra.0.into(),
extra.1.into_iter().map(Into::into).collect(),
),
])
}
fn default_secure_mcp_allowlist() -> McpAllowListConfig {
let default_logging = Some(
MCP_DEFAULT_LOGGING_EVENTS
.iter()
.map(|s| (*s).into())
.collect(),
);
let default_configuration = Some(BTreeMap::from([
(
"client".into(),
vec![
"max_concurrent_connections".into(),
"request_timeout_seconds".into(),
"retry_attempts".into(),
"startup_timeout_seconds".into(),
"tool_timeout_seconds".into(),
"experimental_use_rmcp_client".into(),
],
),
(
"ui".into(),
vec![
"mode".into(),
"max_events".into(),
"show_provider_names".into(),
],
),
(
"server".into(),
vec![
"enabled".into(),
"bind_address".into(),
"port".into(),
"transport".into(),
"name".into(),
"version".into(),
],
),
]));
let time_rules = McpAllowListRules {
tools: Some(vec![
"get_*".into(),
"list_*".into(),
"convert_timezone".into(),
"describe_timezone".into(),
"time_*".into(),
]),
resources: Some(vec!["timezone:*".into(), "location:*".into()]),
logging: Some(mcp_standard_logging()),
configuration: Some(mcp_provider_config_with((
"time",
vec!["local_timezone_override"],
))),
..Default::default()
};
let context_rules = McpAllowListRules {
tools: Some(vec![
"search_*".into(),
"fetch_*".into(),
"list_*".into(),
"context7_*".into(),
"get_*".into(),
]),
resources: Some(vec![
"docs::*".into(),
"snippets::*".into(),
"repositories::*".into(),
"context7::*".into(),
]),
prompts: Some(vec![
"context7::*".into(),
"support::*".into(),
"docs::*".into(),
]),
logging: Some(mcp_standard_logging()),
configuration: Some(mcp_provider_config_with((
"context7",
vec!["workspace", "search_scope", "max_results"],
))),
};
let seq_rules = McpAllowListRules {
tools: Some(vec![
"plan".into(),
"critique".into(),
"reflect".into(),
"decompose".into(),
"sequential_*".into(),
]),
resources: None,
prompts: Some(vec![
"sequential-thinking::*".into(),
"plan".into(),
"reflect".into(),
"critique".into(),
]),
logging: Some(mcp_standard_logging()),
configuration: Some(mcp_provider_config_with((
"sequencing",
vec!["max_depth", "max_branches"],
))),
};
let mut allowlist = McpAllowListConfig {
enforce: true,
default: McpAllowListRules {
logging: default_logging,
configuration: default_configuration,
..Default::default()
},
..Default::default()
};
allowlist.providers.insert("time".into(), time_rules);
allowlist.providers.insert("context7".into(), context_rules);
allowlist
.providers
.insert("sequential-thinking".into(), seq_rules);
allowlist
}
fn parse_mcp_policy_key(tool_name: &str) -> Option<(String, String)> {
parse_canonical_mcp_tool_name(tool_name)
.map(|(provider, tool)| (provider.to_string(), tool.to_string()))
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlternativeToolPolicyConfig {
pub version: u32,
pub default: AlternativeDefaultPolicy,
pub tools: IndexMap<String, AlternativeToolPolicy>,
#[serde(default)]
pub constraints: IndexMap<String, ToolConstraints>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlternativeDefaultPolicy {
pub allow: bool,
pub rate_limit_per_run: u32,
pub max_concurrent: u32,
pub fs_write: bool,
pub network: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlternativeToolPolicy {
pub allow: bool,
#[serde(default)]
pub fs_write: bool,
#[serde(default)]
pub network: bool,
#[serde(default)]
pub args_policy: Option<AlternativeArgsPolicy>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlternativeArgsPolicy {
pub deny_substrings: Vec<String>,
}
pub trait PermissionPromptHandler: Send + Sync {
fn prompt_tool_permission(&mut self, tool_name: &str) -> Result<ToolExecutionDecision>;
}
pub struct ToolPolicyManager {
config_path: PathBuf,
config: ToolPolicyConfig,
permission_handler: Option<Box<dyn PermissionPromptHandler>>,
workspace_root: Option<PathBuf>,
}
impl Clone for ToolPolicyManager {
fn clone(&self) -> Self {
Self {
config_path: self.config_path.clone(),
config: self.config.clone(),
permission_handler: None, workspace_root: self.workspace_root.clone(),
}
}
}
impl ToolPolicyManager {
pub async fn new() -> Result<Self> {
let config_path = Self::get_config_path().await?;
let config = Self::load_or_create_config(&config_path).await?;
Ok(Self {
config_path,
config,
permission_handler: None,
workspace_root: None,
})
}
pub async fn new_with_workspace(workspace_root: &Path) -> Result<Self> {
let config_path = Self::get_workspace_config_path(workspace_root).await?;
let config = Self::load_or_create_config(&config_path).await?;
Ok(Self {
config_path,
config,
permission_handler: None,
workspace_root: Some(workspace_root.to_path_buf()),
})
}
pub async fn new_with_config_path<P: Into<PathBuf>>(config_path: P) -> Result<Self> {
let config_path = config_path.into();
if let Some(parent) = config_path.parent()
&& !tokio::fs::try_exists(parent).await.unwrap_or(false)
{
ensure_dir_exists(parent)
.await
.with_context(|| format!("{} at {}", ERR_CREATE_POLICY_DIR, parent.display()))?;
}
let config = Self::load_or_create_config(&config_path).await?;
Ok(Self {
config_path,
config,
permission_handler: None,
workspace_root: None,
})
}
pub fn set_permission_handler(&mut self, handler: Box<dyn PermissionPromptHandler>) {
self.permission_handler = Some(handler);
}
async fn get_config_path() -> Result<PathBuf> {
let home_dir = dirs::home_dir().context("Could not determine home directory")?;
let vtcode_dir = home_dir.join(".vtcode");
if !tokio::fs::try_exists(&vtcode_dir).await.unwrap_or(false) {
ensure_dir_exists(&vtcode_dir)
.await
.context("Failed to create ~/.vtcode directory")?;
}
Ok(vtcode_dir.join("tool-policy.json"))
}
async fn get_workspace_config_path(workspace_root: &Path) -> Result<PathBuf> {
let workspace_vtcode_dir = workspace_root.join(".vtcode");
if !tokio::fs::try_exists(&workspace_vtcode_dir)
.await
.unwrap_or(false)
{
ensure_dir_exists(&workspace_vtcode_dir)
.await
.with_context(|| {
format!(
"Failed to create workspace policy directory at {}",
workspace_vtcode_dir.display()
)
})?;
}
Ok(workspace_vtcode_dir.join("tool-policy.json"))
}
async fn load_or_create_config(config_path: &PathBuf) -> Result<ToolPolicyConfig> {
if tokio::fs::try_exists(config_path).await.unwrap_or(false) {
let content = read_file_with_context(config_path, "tool policy config")
.await
.context("Failed to read tool policy config")?;
if let Ok(alt_config) = serde_json::from_str::<AlternativeToolPolicyConfig>(&content) {
return Ok(Self::convert_from_alternative(alt_config));
}
match serde_json::from_str(&content) {
Ok(mut config) => {
Self::apply_auto_allow_defaults(&mut config);
Self::ensure_network_constraints(&mut config);
Ok(config)
}
Err(parse_err) => {
tracing::warn!(
"Invalid tool policy config at {} ({}). Resetting to defaults.",
config_path.display(),
parse_err
);
Self::reset_to_default(config_path).await
}
}
} else {
let mut config = ToolPolicyConfig::default();
Self::apply_auto_allow_defaults(&mut config);
Self::ensure_network_constraints(&mut config);
Ok(config)
}
}
fn apply_auto_allow_defaults(config: &mut ToolPolicyConfig) {
for &tool in AUTO_ALLOW_TOOLS {
config
.policies
.entry(tool.into())
.and_modify(|policy| *policy = ToolPolicy::Allow)
.or_insert(ToolPolicy::Allow);
if !config.available_tools.iter().any(|t| t == tool) {
config.available_tools.push(tool.into());
}
}
Self::ensure_network_constraints(config);
}
fn ensure_network_constraints(_config: &mut ToolPolicyConfig) {
}
async fn reset_to_default(config_path: &PathBuf) -> Result<ToolPolicyConfig> {
let backup_path = config_path.with_extension("json.bak");
if let Err(err) = tokio::fs::rename(config_path, &backup_path).await {
tracing::warn!(
"Unable to back up invalid tool policy config ({}). {}",
config_path.display(),
err
);
} else {
tracing::info!(
"Backed up invalid tool policy config to {}",
backup_path.display()
);
}
let default_config = ToolPolicyConfig::default();
Self::write_config(config_path.as_path(), &default_config).await?;
Ok(default_config)
}
async fn write_config(path: &Path, config: &ToolPolicyConfig) -> Result<()> {
if let Some(parent) = path.parent()
&& !tokio::fs::try_exists(parent).await.unwrap_or(false)
{
ensure_dir_exists(parent)
.await
.with_context(|| format!("{} at {}", ERR_CREATE_POLICY_DIR, parent.display()))?;
}
let serialized = serde_json::to_string_pretty(config)
.context("Failed to serialize tool policy config")?;
write_file_with_context(path, &serialized, "tool policy config")
.await
.with_context(|| format!("Failed to write tool policy config: {}", path.display()))
}
fn convert_from_alternative(alt_config: AlternativeToolPolicyConfig) -> ToolPolicyConfig {
let mut policies = IndexMap::new();
for (tool_name, alt_policy) in alt_config.tools {
let policy = if alt_policy.allow {
ToolPolicy::Allow
} else {
ToolPolicy::Deny
};
policies.insert(tool_name, policy);
}
let mut config = ToolPolicyConfig {
version: alt_config.version,
available_tools: policies.keys().cloned().collect(),
policies,
constraints: alt_config.constraints,
mcp: McpPolicyStore::default(),
approval_cache: ApprovalCacheConfig::default(),
};
Self::apply_auto_allow_defaults(&mut config);
config
}
fn apply_config_policy(&mut self, tool_name: &str, policy: ConfigToolPolicy) {
let canonical = canonical_tool_name(tool_name);
let runtime_policy = match policy {
ConfigToolPolicy::Allow => ToolPolicy::Allow,
ConfigToolPolicy::Prompt => ToolPolicy::Prompt,
ConfigToolPolicy::Deny => ToolPolicy::Deny,
};
self.config
.policies
.insert(canonical.into_owned(), runtime_policy);
}
fn resolve_config_policy(tools_config: &ToolsConfig, tool_name: &str) -> ConfigToolPolicy {
let canonical = canonical_tool_name(tool_name);
let lookup: &str = &canonical;
if let Some(policy) = tools_config.policies.get(lookup) {
return *policy;
}
match tool_name {
tools::UNIFIED_SEARCH => tools_config
.policies
.get("list_dir")
.or_else(|| tools_config.policies.get("list_directory"))
.cloned(),
_ => None,
}
.unwrap_or(tools_config.default_policy)
}
pub async fn apply_tools_config(&mut self, tools_config: &ToolsConfig) -> Result<()> {
if self.config.available_tools.is_empty() {
return Ok(());
}
let tools: Vec<_> = self.config.available_tools.to_vec();
for tool in tools {
let config_policy = Self::resolve_config_policy(tools_config, &tool);
self.apply_config_policy(&tool, config_policy);
}
Self::apply_auto_allow_defaults(&mut self.config);
self.save_config().await
}
pub async fn update_available_tools(&mut self, tools: Vec<String>) -> Result<()> {
let mut canonical_tools = Vec::with_capacity(tools.len());
let mut seen = HashSet::with_capacity(tools.len());
for tool in tools {
let canonical = canonical_tool_name(&tool).into_owned();
if seen.insert(canonical.clone()) {
canonical_tools.push(canonical);
}
}
canonical_tools.sort();
let current_tools: HashSet<_> = self.config.policies.keys().cloned().collect();
let new_tools: HashSet<_> = canonical_tools
.iter()
.filter(|name| !name.starts_with("mcp::"))
.cloned()
.collect();
let mut has_changes = false;
for tool in canonical_tools
.iter()
.filter(|tool| !tool.starts_with("mcp::") && !current_tools.contains(*tool))
{
let default_policy = if AUTO_ALLOW_TOOLS.contains(&tool.as_str()) {
ToolPolicy::Allow
} else {
ToolPolicy::Prompt
};
self.config.policies.insert(tool.clone(), default_policy);
has_changes = true;
}
let tools_to_remove: Vec<_> = self
.config
.policies
.keys()
.filter(|tool| !new_tools.contains(*tool))
.cloned()
.collect();
for tool in tools_to_remove {
self.config.policies.shift_remove(&tool);
has_changes = true;
}
let mut sorted_available = self.config.available_tools.clone();
sorted_available.sort();
if sorted_available != canonical_tools {
self.config.available_tools = canonical_tools;
has_changes = true;
}
Self::ensure_network_constraints(&mut self.config);
if has_changes {
self.save_config().await
} else {
Ok(())
}
}
pub async fn update_mcp_tools(
&mut self,
provider_tools: &HashMap<String, Vec<String>>,
) -> Result<()> {
let stored_providers: HashSet<String> = self.config.mcp.providers.keys().cloned().collect();
let mut has_changes = false;
for (provider, tools) in provider_tools {
let entry = self
.config
.mcp
.providers
.entry(provider.clone())
.or_default();
let existing_tools: HashSet<_> = entry.tools.keys().cloned().collect();
let advertised: HashSet<_> = tools.iter().cloned().collect();
for tool in tools {
if !existing_tools.contains(tool) {
entry.tools.insert(tool.clone(), ToolPolicy::Prompt);
has_changes = true;
}
}
for stale in existing_tools.difference(&advertised) {
entry.tools.shift_remove(stale);
has_changes = true;
}
}
let advertised_providers: HashSet<String> = provider_tools.keys().cloned().collect();
for provider in stored_providers
.difference(&advertised_providers)
.cloned()
.collect::<Vec<_>>()
{
self.config.mcp.providers.shift_remove(provider.as_str());
has_changes = true;
}
let stale_runtime_keys: Vec<_> = self
.config
.policies
.keys()
.filter(|name| name.starts_with("mcp::"))
.cloned()
.collect();
for key in stale_runtime_keys {
self.config.policies.shift_remove(&key);
has_changes = true;
}
let mut available: Vec<String> = self
.config
.available_tools
.iter()
.filter(|name| !name.starts_with("mcp::"))
.cloned()
.collect();
available.extend(
self.config
.mcp
.providers
.iter()
.flat_map(|(provider, policy)| {
policy
.tools
.keys()
.map(move |tool| format!("mcp::{}::{}", provider, tool))
}),
);
available.sort();
available.dedup();
if self.config.available_tools != available {
self.config.available_tools = available;
has_changes = true;
}
if has_changes {
self.save_config().await
} else {
Ok(())
}
}
pub fn get_mcp_tool_policy(&self, provider: &str, tool: &str) -> ToolPolicy {
self.config
.mcp
.providers
.get(provider)
.and_then(|policy| policy.tools.get(tool))
.cloned()
.unwrap_or(ToolPolicy::Prompt)
}
pub async fn set_mcp_tool_policy(
&mut self,
provider: &str,
tool: &str,
policy: ToolPolicy,
) -> Result<()> {
let entry = self
.config
.mcp
.providers
.entry(provider.into())
.or_default();
entry.tools.insert(tool.into(), policy);
self.save_config().await
}
pub fn mcp_allowlist(&self) -> &McpAllowListConfig {
&self.config.mcp.allowlist
}
pub async fn set_mcp_allowlist(&mut self, allowlist: McpAllowListConfig) -> Result<()> {
self.config.mcp.allowlist = allowlist;
self.save_config().await
}
pub fn get_policy(&self, tool_name: &str) -> ToolPolicy {
let canonical = canonical_tool_name(tool_name);
if let Some((provider, tool)) = parse_mcp_policy_key(tool_name) {
return self.get_mcp_tool_policy(&provider, &tool);
}
self.config
.policies
.get(&*canonical)
.cloned()
.unwrap_or(ToolPolicy::Prompt)
}
pub fn get_constraints(&self, tool_name: &str) -> Option<&ToolConstraints> {
let canonical = canonical_tool_name(tool_name);
self.config.constraints.get(&*canonical)
}
pub async fn should_execute_tool(&mut self, tool_name: &str) -> Result<ToolExecutionDecision> {
if let Some((provider, tool)) = parse_mcp_policy_key(tool_name) {
return match self.get_mcp_tool_policy(&provider, &tool) {
ToolPolicy::Allow => Ok(ToolExecutionDecision::Allowed),
ToolPolicy::Deny => Ok(ToolExecutionDecision::Denied),
ToolPolicy::Prompt => {
if ToolPolicyManager::is_auto_allow_tool(tool_name) {
self.set_mcp_tool_policy(&provider, &tool, ToolPolicy::Allow)
.await?;
Ok(ToolExecutionDecision::Allowed)
} else {
if let Some(ref mut handler) = self.permission_handler {
handler.prompt_tool_permission(tool_name)
} else {
Ok(ToolExecutionDecision::Allowed)
}
}
}
};
}
let canonical = canonical_tool_name(tool_name);
match self.get_policy(canonical.as_ref()) {
ToolPolicy::Allow => Ok(ToolExecutionDecision::Allowed),
ToolPolicy::Deny => Ok(ToolExecutionDecision::Denied),
ToolPolicy::Prompt => {
let canonical_name = canonical.as_ref();
if AUTO_ALLOW_TOOLS.contains(&canonical_name) {
self.set_policy(canonical_name, ToolPolicy::Allow).await?;
return Ok(ToolExecutionDecision::Allowed);
}
if let Some(ref mut handler) = self.permission_handler {
handler.prompt_tool_permission(tool_name)
} else {
Ok(ToolExecutionDecision::Allowed)
}
}
}
}
pub fn is_auto_allow_tool(tool_name: &str) -> bool {
let canonical = canonical_tool_name(tool_name);
AUTO_ALLOW_TOOLS.contains(&canonical.as_ref())
}
pub fn prompt_user_for_tool(&mut self, tool_name: &str) -> Result<ToolExecutionDecision> {
if let Some(ref mut handler) = self.permission_handler {
handler.prompt_tool_permission(tool_name)
} else {
Ok(ToolExecutionDecision::Allowed)
}
}
pub async fn set_policy(&mut self, tool_name: &str, policy: ToolPolicy) -> Result<()> {
if let Some((provider, tool)) = parse_mcp_policy_key(tool_name) {
return self.set_mcp_tool_policy(&provider, &tool, policy).await;
}
let canonical = canonical_tool_name(tool_name).into_owned();
self.config
.policies
.insert(canonical.clone(), policy.clone());
self.save_config().await?;
self.persist_policy_to_workspace_config(&canonical, policy)
}
pub(crate) async fn seed_default_policy(
&mut self,
tool_name: &str,
policy: ToolPolicy,
) -> Result<()> {
let canonical = canonical_tool_name(tool_name).into_owned();
self.config.policies.insert(canonical, policy);
self.save_config().await
}
pub async fn reset_all_to_prompt(&mut self) -> Result<()> {
for policy in self.config.policies.values_mut() {
*policy = ToolPolicy::Prompt;
}
for provider in self.config.mcp.providers.values_mut() {
for policy in provider.tools.values_mut() {
*policy = ToolPolicy::Prompt;
}
}
self.config.approval_cache.allowed.clear();
self.config.approval_cache.prefixes.clear();
self.config.approval_cache.regexes.clear();
self.save_config().await
}
pub async fn allow_all_tools(&mut self) -> Result<()> {
for policy in self.config.policies.values_mut() {
*policy = ToolPolicy::Allow;
}
for provider in self.config.mcp.providers.values_mut() {
for policy in provider.tools.values_mut() {
*policy = ToolPolicy::Allow;
}
}
self.save_config().await
}
pub async fn deny_all_tools(&mut self) -> Result<()> {
for policy in self.config.policies.values_mut() {
*policy = ToolPolicy::Deny;
}
for provider in self.config.mcp.providers.values_mut() {
for policy in provider.tools.values_mut() {
*policy = ToolPolicy::Deny;
}
}
self.config.approval_cache.allowed.clear();
self.config.approval_cache.prefixes.clear();
self.config.approval_cache.regexes.clear();
self.save_config().await
}
pub fn get_policy_summary(&self) -> IndexMap<String, ToolPolicy> {
let mut summary = self.config.policies.clone();
for (provider, policy) in &self.config.mcp.providers {
for (tool, status) in &policy.tools {
summary.insert(format!("mcp::{}::{}", provider, tool), status.clone());
}
}
summary
}
pub fn has_approval_cache_key(&self, approval_key: &str) -> bool {
self.config.approval_cache.allowed.contains(approval_key)
|| self
.config
.approval_cache
.regexes
.iter()
.filter_map(|pattern| Regex::new(pattern).ok())
.any(|regex| regex.is_match(approval_key))
}
pub async fn add_approval_cache_key(&mut self, approval_key: impl Into<String>) -> Result<()> {
if self
.config
.approval_cache
.allowed
.insert(approval_key.into())
{
self.save_config().await?;
}
Ok(())
}
pub async fn add_approval_cache_prefix(
&mut self,
prefix_entry: impl Into<String>,
) -> Result<()> {
if self
.config
.approval_cache
.prefixes
.insert(prefix_entry.into())
{
self.save_config().await?;
}
Ok(())
}
pub fn matching_shell_approval_prefix(
&self,
command_words: &[String],
scope_signature: &str,
) -> Option<String> {
self.config
.approval_cache
.prefixes
.iter()
.find_map(|entry| {
let (prefix_text, entry_scope_signature) =
split_shell_approval_entry(entry.as_str());
let prefix_words = shell_words::split(prefix_text).ok()?;
let entry_scope_signature =
entry_scope_signature.unwrap_or(DEFAULT_APPROVAL_SCOPE_SIGNATURE);
(entry_scope_signature == scope_signature
&& shell_command_words_match_prefix(command_words, &prefix_words))
.then(|| entry.clone())
})
}
pub async fn clear_approval_cache(&mut self) -> Result<()> {
if !self.config.approval_cache.allowed.is_empty()
|| !self.config.approval_cache.prefixes.is_empty()
|| !self.config.approval_cache.regexes.is_empty()
{
self.config.approval_cache.allowed.clear();
self.config.approval_cache.prefixes.clear();
self.config.approval_cache.regexes.clear();
self.save_config().await?;
}
Ok(())
}
async fn save_config(&self) -> Result<()> {
Self::write_config(&self.config_path, &self.config).await
}
fn persist_policy_to_workspace_config(
&self,
tool_name: &str,
policy: ToolPolicy,
) -> Result<()> {
let Some(workspace_root) = self.workspace_root.as_ref() else {
return Ok(());
};
let config_path = workspace_root.join("vtcode.toml");
let mut config = if config_path.exists() {
ConfigManager::load_from_file(&config_path)
.with_context(|| {
format!(
"Failed to load config for tool policy persistence at {}",
config_path.display()
)
})?
.config()
.clone()
} else {
VTCodeConfig::default()
};
config
.tools
.policies
.insert(tool_name.to_string(), Self::to_config_policy(policy));
ConfigManager::save_config_to_path(&config_path, &config)
.with_context(|| format!("Failed to persist tool policy to {}", config_path.display()))
}
fn to_config_policy(policy: ToolPolicy) -> ConfigToolPolicy {
match policy {
ToolPolicy::Allow => ConfigToolPolicy::Allow,
ToolPolicy::Prompt => ConfigToolPolicy::Prompt,
ToolPolicy::Deny => ConfigToolPolicy::Deny,
}
}
pub fn print_status(&self) {
println!("{}", style("Tool Policy Status").cyan().bold());
println!("Config file: {}", self.config_path.display());
println!();
let summary = self.get_policy_summary();
if summary.is_empty() {
println!("No tools configured yet.");
return;
}
let mut allow_count = 0;
let mut prompt_count = 0;
let mut deny_count = 0;
for (tool, policy) in &summary {
let (status, color_name) = match policy {
ToolPolicy::Allow => {
allow_count += 1;
("ALLOW", "green")
}
ToolPolicy::Prompt => {
prompt_count += 1;
("PROMPT", "cyan")
}
ToolPolicy::Deny => {
deny_count += 1;
("DENY", "red")
}
};
let status_styled = match color_name {
"green" => style(status).green(),
"cyan" => style(status).cyan(),
"red" => style(status).red(),
_ => style(status),
};
println!(
" {} {}",
style(format!("{:15}", tool)).cyan(),
status_styled
);
}
println!();
println!(
"Summary: {} allowed, {} prompt, {} denied",
style(allow_count).green(),
style(prompt_count).cyan(),
style(deny_count).red()
);
}
pub fn config_path(&self) -> &Path {
&self.config_path
}
}
fn split_shell_approval_entry(entry: &str) -> (&str, Option<&str>) {
if let Some(index) = entry.find(SHELL_APPROVAL_SCOPE_MARKER) {
let (prefix, scoped) = entry.split_at(index);
(prefix, Some(&scoped[1..]))
} else {
(entry, None)
}
}
fn shell_command_words_match_prefix(command_words: &[String], prefix_words: &[String]) -> bool {
command_words.len() >= prefix_words.len()
&& prefix_words
.iter()
.zip(command_words.iter())
.all(|(prefix, command)| prefix == command)
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ToolConstraints {
#[serde(default)]
pub allowed_modes: Option<Vec<String>>,
#[serde(default)]
pub max_results_per_call: Option<usize>,
#[serde(default)]
pub max_items_per_call: Option<usize>,
#[serde(default)]
pub default_response_format: Option<String>,
#[serde(default)]
pub max_bytes_per_read: Option<usize>,
#[serde(default)]
pub max_response_bytes: Option<usize>,
#[serde(default)]
pub allowed_url_schemes: Option<Vec<String>>,
#[serde(default)]
pub denied_url_hosts: Option<Vec<String>>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::constants::tools;
use tempfile::tempdir;
#[test]
fn test_tool_policy_config_serialization() {
let mut config = ToolPolicyConfig {
available_tools: vec![tools::READ_FILE.to_owned(), tools::WRITE_FILE.to_owned()],
..Default::default()
};
config
.policies
.insert(tools::READ_FILE.to_owned(), ToolPolicy::Allow);
config
.policies
.insert(tools::WRITE_FILE.to_owned(), ToolPolicy::Prompt);
config
.approval_cache
.allowed
.insert("unified_exec:cargo test".to_string());
let json = serde_json::to_string_pretty(&config).unwrap();
let deserialized: ToolPolicyConfig = serde_json::from_str(&json).unwrap();
assert_eq!(config.available_tools, deserialized.available_tools);
assert_eq!(config.policies, deserialized.policies);
assert_eq!(config.approval_cache, deserialized.approval_cache);
}
#[tokio::test]
async fn test_policy_updates() {
let dir = tempdir().unwrap();
let config_path = dir.path().join("tool-policy.json");
let mut config = ToolPolicyConfig {
available_tools: vec!["tool1".to_owned()],
..Default::default()
};
config
.policies
.insert("tool1".to_owned(), ToolPolicy::Prompt);
let content = serde_json::to_string_pretty(&config).unwrap();
std::fs::write(&config_path, content).unwrap();
let mut loaded_config = ToolPolicyManager::load_or_create_config(&config_path)
.await
.unwrap();
let new_tools = vec!["tool1".to_owned(), "tool2".to_owned()];
let current_tools: HashSet<_> = loaded_config.available_tools.iter().cloned().collect();
for tool in &new_tools {
if !current_tools.contains(tool) {
loaded_config
.policies
.insert(tool.clone(), ToolPolicy::Prompt);
}
}
loaded_config.available_tools = new_tools;
assert!(loaded_config.policies.len() >= 2);
assert_eq!(
loaded_config.policies.get("tool2"),
Some(&ToolPolicy::Prompt)
);
assert_eq!(
loaded_config.policies.get("tool1"),
Some(&ToolPolicy::Prompt)
);
}
#[tokio::test]
async fn approval_cache_keys_round_trip() {
let dir = tempdir().unwrap();
let config_path = dir.path().join("tool-policy.json");
let mut manager = ToolPolicyManager::new_with_config_path(&config_path)
.await
.expect("manager");
manager
.add_approval_cache_key(
"cargo test|sandbox_permissions=\"use_default\"|additional_permissions=null",
)
.await
.expect("persist approval");
let reloaded = ToolPolicyManager::new_with_config_path(&config_path)
.await
.expect("reload manager");
assert!(reloaded.has_approval_cache_key(
"cargo test|sandbox_permissions=\"use_default\"|additional_permissions=null"
));
}
#[tokio::test]
async fn approval_cache_prefixes_match_shell_prefixes() {
let dir = tempdir().unwrap();
let config_path = dir.path().join("tool-policy.json");
let mut manager = ToolPolicyManager::new_with_config_path(&config_path)
.await
.expect("manager");
manager
.add_approval_cache_prefix(
"cargo test|sandbox_permissions=\"use_default\"|additional_permissions=null",
)
.await
.expect("persist prefix");
let reloaded = ToolPolicyManager::new_with_config_path(&config_path)
.await
.expect("reload manager");
let command_words = vec![
"cargo".to_string(),
"test".to_string(),
"-p".to_string(),
"vtcode-core".to_string(),
];
assert!(
reloaded
.matching_shell_approval_prefix(
&command_words,
"sandbox_permissions=\"use_default\"|additional_permissions=null",
)
.is_some()
);
}
#[tokio::test]
async fn approval_cache_regexes_match_keys() {
let dir = tempdir().unwrap();
let config_path = dir.path().join("tool-policy.json");
let mut manager = ToolPolicyManager::new_with_config_path(&config_path)
.await
.expect("manager");
manager
.config
.approval_cache
.regexes
.insert("^cargo (check|fmt)\\|sandbox_permissions=\\\"use_default\\\".*$".to_string());
manager.save_config().await.expect("save regex");
let reloaded = ToolPolicyManager::new_with_config_path(&config_path)
.await
.expect("reload manager");
assert!(reloaded.has_approval_cache_key(
"cargo check|sandbox_permissions=\"use_default\"|additional_permissions=null"
));
}
#[tokio::test]
async fn reset_to_prompt_clears_approval_cache() {
let dir = tempdir().unwrap();
let config_path = dir.path().join("tool-policy.json");
let mut manager = ToolPolicyManager::new_with_config_path(&config_path)
.await
.expect("manager");
manager
.add_approval_cache_key("read_file")
.await
.expect("persist approval");
manager
.add_approval_cache_prefix(
"cargo check|sandbox_permissions=\"use_default\"|additional_permissions=null",
)
.await
.expect("persist prefix");
manager
.config
.approval_cache
.regexes
.insert("^cargo check.*$".to_string());
manager.reset_all_to_prompt().await.expect("reset policies");
let reloaded = ToolPolicyManager::new_with_config_path(&config_path)
.await
.expect("reload manager");
assert!(!reloaded.has_approval_cache_key("read_file"));
assert!(reloaded.config.approval_cache.prefixes.is_empty());
assert!(reloaded.config.approval_cache.regexes.is_empty());
}
}