use std::sync::Arc;
use rmcp::{
ErrorData as RmcpError, RoleServer, ServerHandler, handler::server::router::tool::ToolRouter,
handler::server::wrapper::Parameters, model::*, schemars, service::RequestContext, tool,
tool_handler, tool_router,
};
use serde::Deserialize;
use tokio::sync::RwLock;
use tracing::{debug, info};
use crate::cloud_tools::CloudTools;
use crate::enterprise_tools::EnterpriseTools;
/// Startup settings shared by every tool handler of the server.
#[derive(Debug, Clone)]
pub struct ServerConfig {
/// Optional profile name; forwarded (as `Option<&str>`) to the Cloud and
/// Enterprise tool constructors when they are first needed.
pub profile: Option<String>,
/// When `true`, every mutating tool returns an `invalid_request` error
/// before contacting any backend.
pub read_only: bool,
}
/// MCP server state. Cloning is cheap: the configuration and both tool slots
/// live behind `Arc`, so clones share the same lazily-created tool instances.
#[derive(Clone)]
pub struct RedisCtlMcp {
// Immutable configuration captured at construction time.
config: Arc<ServerConfig>,
// Router populated from `Self::tool_router()` in `new`.
// NOTE(review): presumably consumed by the `#[tool_handler]` impl, which is not visible here.
tool_router: ToolRouter<RedisCtlMcp>,
// Cloud tools; `None` until the first Cloud tool call initialises them.
cloud_tools: Arc<RwLock<Option<CloudTools>>>,
// Enterprise tools; `None` until the first Enterprise tool call initialises them.
enterprise_tools: Arc<RwLock<Option<EnterpriseTools>>>,
}
/// Input for tools that address a single Redis Cloud subscription.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct SubscriptionIdParam {
/// ID of the subscription to operate on.
pub subscription_id: i64,
}
/// Input for tools that address a single Redis Cloud database.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseIdParam {
/// ID of the subscription that contains the database.
pub subscription_id: i64,
/// ID of the database within that subscription.
pub database_id: i64,
}
/// Input for looking up a single Redis Cloud async task.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct TaskIdParam {
/// ID of the task, as a string.
pub task_id: String,
}
/// Input for tools that address a single Redis Enterprise node.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct NodeIdParam {
/// ID of the node to operate on.
pub node_id: i64,
}
/// Input for tools that address a single Redis Enterprise database (BDB).
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct EnterpriseDatabaseIdParam {
/// ID of the database to operate on.
pub database_id: i64,
}
/// Input for creating a Redis Enterprise database.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateEnterpriseDatabaseParam {
/// Name of the new database.
pub name: String,
/// Memory size in MB; may be omitted.
// The tool description advertises a default of 100 when omitted; the
// default itself is applied downstream, not in this file.
#[serde(default)]
pub memory_size_mb: Option<u64>,
}
/// Input for updating a Redis Enterprise database. At least one optional
/// field must be set; the update tool rejects an empty change set.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct UpdateEnterpriseDatabaseParam {
/// ID of the database to update.
pub database_id: i64,
/// New memory size in bytes.
#[serde(default)]
pub memory_size: Option<u64>,
/// New replication setting.
#[serde(default)]
pub replication: Option<bool>,
/// New data persistence setting.
#[serde(default)]
pub data_persistence: Option<String>,
/// New eviction policy.
#[serde(default)]
pub eviction_policy: Option<String>,
}
/// Input for exporting a Redis Enterprise database.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ExportDatabaseParam {
/// ID of the database to export.
pub database_id: i64,
/// Destination of the export (the tool description mentions S3, FTP, etc.).
pub export_location: String,
}
/// Input for importing data into a Redis Enterprise database.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ImportDatabaseParam {
/// ID of the database to import into.
pub database_id: i64,
/// Location to import the data from.
pub import_location: String,
/// Flag forwarded to the import call; `false` when omitted.
#[serde(default)]
pub flush_before_import: bool,
}
/// Input for restoring a Redis Enterprise database from a backup.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct RestoreDatabaseParam {
/// ID of the database to restore.
pub database_id: i64,
/// Optional backup UID; forwarded unchanged to the restore call.
// NOTE(review): what the backend does when this is absent is not visible here.
#[serde(default)]
pub backup_uid: Option<String>,
}
/// Input for updating the Redis Enterprise cluster configuration. At least
/// one field must be set; the update tool rejects an empty change set.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct UpdateClusterParam {
/// New cluster name.
#[serde(default)]
pub name: Option<String>,
/// New `email_alerts` setting.
#[serde(default)]
pub email_alerts: Option<bool>,
/// New `rack_aware` setting.
#[serde(default)]
pub rack_aware: Option<bool>,
}
/// Input for updating a Redis Enterprise node. At least one optional field
/// must be set; the update tool rejects an empty change set.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct UpdateNodeParam {
/// ID of the node to update.
pub node_id: i64,
/// New `accept_servers` setting.
#[serde(default)]
pub accept_servers: Option<bool>,
/// New list of external addresses.
#[serde(default)]
pub external_addr: Option<Vec<String>>,
/// New rack ID.
#[serde(default)]
pub rack_id: Option<String>,
}
/// Input for looking up a single Redis Enterprise shard.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ShardIdParam {
/// UID of the shard, as a string.
pub shard_uid: String,
}
/// Input for looking up a single Redis Enterprise alert.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct AlertIdParam {
/// UID of the alert, as a string.
pub alert_uid: String,
}
/// Input for tools that address a single Redis Enterprise user.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct UserIdParam {
/// ID of the user to operate on.
pub user_id: i64,
}
/// Input for creating a Redis Enterprise user.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateUserParam {
/// Email address of the new user.
pub email: String,
/// Password of the new user.
pub password: String,
/// Role assigned to the new user.
pub role: String,
/// Optional display name.
#[serde(default)]
pub name: Option<String>,
}
/// Input for tools that address a single Redis Enterprise role.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct RoleIdParam {
/// ID of the role to operate on.
pub role_id: i64,
}
/// Input for creating a Redis Enterprise role.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateRoleParam {
/// Name of the new role.
pub name: String,
/// Optional management setting for the role; forwarded unchanged.
// NOTE(review): the accepted values are not visible in this file.
#[serde(default)]
pub management: Option<String>,
}
/// Input for tools that address a single Redis ACL.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct AclIdParam {
/// ID of the ACL to operate on.
pub acl_id: i64,
}
/// Input for creating a Redis ACL.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateAclParam {
/// Name of the new ACL.
pub name: String,
/// ACL definition string.
pub acl: String,
/// Optional description.
#[serde(default)]
pub description: Option<String>,
}
/// Input for looking up a single Redis module.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ModuleIdParam {
/// UID of the module, as a string.
pub module_uid: String,
}
/// Input carrying the GUID of a CRDB.
// NOTE(review): the tool that consumes this type is not visible in this part of the file.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CrdbGuidParam {
/// GUID of the CRDB, as a string.
pub crdb_guid: String,
}
/// Input carrying a CRDB GUID plus optional settings to change.
// NOTE(review): the tool that consumes this type is not visible in this part of the file.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct UpdateCrdbParam {
/// GUID of the CRDB, as a string.
pub crdb_guid: String,
/// New memory size.
// NOTE(review): units are not stated here — presumably bytes, as for
// `UpdateEnterpriseDatabaseParam::memory_size`; confirm.
#[serde(default)]
pub memory_size: Option<u64>,
/// New encryption setting.
#[serde(default)]
pub encryption: Option<bool>,
/// New data persistence setting.
#[serde(default)]
pub data_persistence: Option<String>,
}
/// Input carrying a task ID.
// NOTE(review): judging by the type name this targets debug-info tasks; the
// consuming tool is not visible in this part of the file.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DebugInfoTaskIdParam {
/// ID of the task, as a string.
pub task_id: String,
}
/// Input for Cloud tools that accept an optional cloud-provider filter.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CloudProviderParam {
/// Optional cloud provider (the tool descriptions mention AWS, GCP, Azure).
#[serde(default)]
pub provider: Option<String>,
}
/// Input for creating a Pro subscription.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateProSubscriptionParam {
/// Raw JSON request payload, forwarded unchanged to the create call.
// The tool description states it needs `cloudProviders` and `databases` arrays.
pub request: serde_json::Value,
}
/// Input for creating an Essentials subscription.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateEssentialsSubscriptionParam {
/// Name of the new subscription.
pub name: String,
/// ID of the Essentials plan to subscribe to.
pub plan_id: i64,
/// Optional payment method ID.
#[serde(default)]
pub payment_method_id: Option<i64>,
}
impl RedisCtlMcp {
    /// Creates a server for the given profile and write mode.
    ///
    /// Neither the Cloud nor the Enterprise tools are constructed here; both
    /// slots start out empty and are filled on first use.
    ///
    /// # Errors
    ///
    /// The current body never fails; the `Result` return type is kept so the
    /// signature stays stable for callers.
    pub fn new(profile: Option<&str>, read_only: bool) -> anyhow::Result<Self> {
        let config = Arc::new(ServerConfig {
            profile: profile.map(String::from),
            read_only,
        });
        info!(
            profile = ?config.profile,
            read_only = config.read_only,
            "Initializing RedisCtlMcp server"
        );
        Ok(Self {
            config,
            tool_router: Self::tool_router(),
            cloud_tools: Arc::new(RwLock::new(None)),
            enterprise_tools: Arc::new(RwLock::new(None)),
        })
    }

    /// Returns the configuration this server was created with.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }

    /// Returns a clone of the Cloud tools, constructing them on first use.
    ///
    /// # Errors
    ///
    /// Returns an internal error carrying the constructor's message when
    /// `CloudTools::new` fails; the slot then stays empty, so a later call
    /// tries again.
    async fn get_cloud_tools(&self) -> Result<CloudTools, RmcpError> {
        // Fast path: once initialised, a shared read lock is enough, so
        // concurrent tool calls do not queue behind an exclusive lock.
        if let Some(tools) = self.cloud_tools.read().await.as_ref() {
            return Ok(tools.clone());
        }
        let mut slot = self.cloud_tools.write().await;
        // Another task may have filled the slot between dropping the read
        // lock and acquiring the write lock.
        if let Some(tools) = slot.as_ref() {
            return Ok(tools.clone());
        }
        debug!("Initializing Cloud tools");
        let tools = CloudTools::new(self.config.profile.as_deref())
            .map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
        Ok(slot.insert(tools).clone())
    }

    /// Returns a clone of the Enterprise tools, constructing them on first use.
    ///
    /// # Errors
    ///
    /// Returns an internal error carrying the constructor's message when
    /// `EnterpriseTools::new` fails; the slot then stays empty, so a later
    /// call tries again.
    async fn get_enterprise_tools(&self) -> Result<EnterpriseTools, RmcpError> {
        // Fast path: shared read lock once the tools exist.
        if let Some(tools) = self.enterprise_tools.read().await.as_ref() {
            return Ok(tools.clone());
        }
        let mut slot = self.enterprise_tools.write().await;
        // Re-check under the write lock in case another task won the race.
        if let Some(tools) = slot.as_ref() {
            return Ok(tools.clone());
        }
        debug!("Initializing Enterprise tools");
        let tools = EnterpriseTools::new(self.config.profile.as_deref())
            .map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
        Ok(slot.insert(tools).clone())
    }
}
#[tool_router]
impl RedisCtlMcp {
/// Fetches the Redis Cloud account details.
///
/// Logs the call, obtains the lazily-created Cloud tools and returns the
/// result of `get_account`.
#[tool(
    description = "Get Redis Cloud account information including account ID, name, and settings"
)]
async fn cloud_account_get(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: cloud_account_get");
    self.get_cloud_tools().await?.get_account().await
}
/// Lists the subscriptions of the Redis Cloud account by delegating to
/// `list_subscriptions` on the Cloud tools.
#[tool(description = "List all Redis Cloud subscriptions in the account")]
async fn cloud_subscriptions_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: cloud_subscriptions_list");
    self.get_cloud_tools().await?.list_subscriptions().await
}
#[tool(description = "Get detailed information about a specific Redis Cloud subscription")]
async fn cloud_subscription_get(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_subscription_get"
);
let tools = self.get_cloud_tools().await?;
tools.get_subscription(params.subscription_id).await
}
#[tool(description = "List all databases in a specific Redis Cloud subscription")]
async fn cloud_databases_list(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_databases_list"
);
let tools = self.get_cloud_tools().await?;
tools.list_databases(params.subscription_id).await
}
#[tool(description = "Get detailed information about a specific Redis Cloud database")]
async fn cloud_database_get(
&self,
Parameters(params): Parameters<DatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
database_id = params.database_id,
"Tool called: cloud_database_get"
);
let tools = self.get_cloud_tools().await?;
tools
.get_database(params.subscription_id, params.database_id)
.await
}
/// Lists Redis Cloud async tasks by delegating to `list_tasks`.
#[tool(description = "List recent async tasks in the Redis Cloud account")]
async fn cloud_tasks_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: cloud_tasks_list");
    self.get_cloud_tools().await?.list_tasks().await
}
/// Fetches the status of one Redis Cloud async task.
///
/// The task ID is logged and then passed by reference to `get_task`.
#[tool(description = "Get the status of a specific Redis Cloud async task")]
async fn cloud_task_get(
    &self,
    Parameters(params): Parameters<TaskIdParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(task_id = %params.task_id, "Tool called: cloud_task_get");
    let tools = self.get_cloud_tools().await?;
    // Borrow the ID; the source had the `&params` borrow mangled into `¶ms`.
    tools.get_task(&params.task_id).await
}
/// Lists the account's payment methods by delegating to `get_payment_methods`.
#[tool(description = "List all payment methods configured for your Redis Cloud account")]
async fn cloud_payment_methods_get(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: cloud_payment_methods_get");
    self.get_cloud_tools().await?.get_payment_methods().await
}
/// Lists the database modules available to the account by delegating to
/// `get_database_modules`.
#[tool(
    description = "List all available database modules (capabilities) supported in your account"
)]
async fn cloud_database_modules_get(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: cloud_database_modules_get");
    self.get_cloud_tools().await?.get_database_modules().await
}
#[tool(
description = "Get available regions across cloud providers (AWS, GCP, Azure) for Pro subscriptions"
)]
async fn cloud_regions_get(
&self,
Parameters(params): Parameters<CloudProviderParam>,
) -> Result<CallToolResult, RmcpError> {
info!(provider = ?params.provider, "Tool called: cloud_regions_get");
let tools = self.get_cloud_tools().await?;
tools.get_regions(params.provider.as_deref()).await
}
#[tool(
description = "Create a new Pro subscription with advanced configuration options. Requires JSON payload with cloudProviders and databases arrays. Use cloud_regions_get to find available regions."
)]
async fn cloud_pro_subscription_create(
&self,
Parameters(params): Parameters<CreateProSubscriptionParam>,
) -> Result<CallToolResult, RmcpError> {
info!("Tool called: cloud_pro_subscription_create");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools.create_subscription(params.request).await
}
#[tool(
description = "Delete a Pro subscription. All databases must be deleted first. This is a destructive operation."
)]
async fn cloud_pro_subscription_delete(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_pro_subscription_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools.delete_subscription(params.subscription_id).await
}
/// Lists Essentials subscriptions by delegating to
/// `list_essentials_subscriptions`.
#[tool(description = "List all Essentials (fixed) subscriptions in the account")]
async fn cloud_essentials_subscriptions_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: cloud_essentials_subscriptions_list");
    self.get_cloud_tools()
        .await?
        .list_essentials_subscriptions()
        .await
}
#[tool(description = "Get detailed information about a specific Essentials subscription")]
async fn cloud_essentials_subscription_get(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_essentials_subscription_get"
);
let tools = self.get_cloud_tools().await?;
tools
.get_essentials_subscription(params.subscription_id)
.await
}
/// Creates an Essentials subscription from a name, a plan ID and an optional
/// payment method ID.
///
/// # Errors
///
/// Returns `invalid_request` without touching the Cloud tools when the
/// server is in read-only mode.
#[tool(
    description = "Create a new Essentials subscription. Use cloud_essentials_plans_list to find available plans."
)]
async fn cloud_essentials_subscription_create(
    &self,
    Parameters(params): Parameters<CreateEssentialsSubscriptionParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(
        name = %params.name,
        plan_id = params.plan_id,
        "Tool called: cloud_essentials_subscription_create"
    );
    // Write operation: refuse before any backend is contacted.
    if self.config.read_only {
        return Err(RmcpError::invalid_request(
            "Server is in read-only mode. Use --allow-writes to enable write operations.",
            None,
        ));
    }
    let tools = self.get_cloud_tools().await?;
    // The name is borrowed; the source had `&params` mangled into `¶ms`.
    tools
        .create_essentials_subscription(&params.name, params.plan_id, params.payment_method_id)
        .await
}
#[tool(
description = "Delete an Essentials subscription. This is a destructive operation that cannot be undone."
)]
async fn cloud_essentials_subscription_delete(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_essentials_subscription_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools
.delete_essentials_subscription(params.subscription_id)
.await
}
#[tool(
description = "List available Essentials subscription plans with pricing. Optionally filter by cloud provider (AWS, GCP, Azure)."
)]
async fn cloud_essentials_plans_list(
&self,
Parameters(params): Parameters<CloudProviderParam>,
) -> Result<CallToolResult, RmcpError> {
info!(provider = ?params.provider, "Tool called: cloud_essentials_plans_list");
let tools = self.get_cloud_tools().await?;
tools
.list_essentials_plans(params.provider.as_deref())
.await
}
/// Fetches Redis Enterprise cluster information by delegating to
/// `get_cluster` on the lazily-created Enterprise tools.
#[tool(
    description = "Get Redis Enterprise cluster information including name, version, and node count"
)]
async fn enterprise_cluster_get(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_cluster_get");
    self.get_enterprise_tools().await?.get_cluster().await
}
/// Lists the cluster's nodes by delegating to `list_nodes`.
#[tool(description = "List all nodes in the Redis Enterprise cluster with their status")]
async fn enterprise_nodes_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_nodes_list");
    self.get_enterprise_tools().await?.list_nodes().await
}
#[tool(description = "Get detailed information about a specific Redis Enterprise node")]
async fn enterprise_node_get(
&self,
Parameters(params): Parameters<NodeIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(node_id = params.node_id, "Tool called: enterprise_node_get");
let tools = self.get_enterprise_tools().await?;
tools.get_node(params.node_id).await
}
/// Lists the cluster's databases by delegating to `list_databases`.
#[tool(description = "List all databases (BDBs) in the Redis Enterprise cluster")]
async fn enterprise_databases_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_databases_list");
    self.get_enterprise_tools().await?.list_databases().await
}
#[tool(
description = "Get detailed information about a specific Redis Enterprise database (BDB)"
)]
async fn enterprise_database_get(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_get"
);
let tools = self.get_enterprise_tools().await?;
tools.get_database(params.database_id).await
}
#[tool(description = "Get performance statistics for a specific Redis Enterprise database")]
async fn enterprise_database_stats(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_stats"
);
let tools = self.get_enterprise_tools().await?;
tools.get_database_stats(params.database_id).await
}
/// Lists shards by delegating to `list_shards`.
#[tool(description = "List all shards across all databases in the Redis Enterprise cluster")]
async fn enterprise_shards_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_shards_list");
    self.get_enterprise_tools().await?.list_shards().await
}
/// Lists alerts by delegating to `list_alerts`.
#[tool(description = "List active alerts in the Redis Enterprise cluster")]
async fn enterprise_alerts_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_alerts_list");
    self.get_enterprise_tools().await?.list_alerts().await
}
/// Fetches cluster event logs by delegating to `get_logs`.
#[tool(description = "Get recent event logs from the Redis Enterprise cluster")]
async fn enterprise_logs_get(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_logs_get");
    self.get_enterprise_tools().await?.get_logs().await
}
/// Fetches license information by delegating to `get_license`.
#[tool(
    description = "Get Redis Enterprise license information including expiration and capacity"
)]
async fn enterprise_license_get(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_license_get");
    self.get_enterprise_tools().await?.get_license().await
}
/// Creates a Redis Enterprise database from a name and an optional memory
/// size in MB.
///
/// # Errors
///
/// Returns `invalid_request` without touching the Enterprise tools when the
/// server is in read-only mode.
#[tool(
    description = "Create a new Redis Enterprise database. Requires name, optionally memory_size_mb (default 100)."
)]
async fn enterprise_database_create(
    &self,
    Parameters(params): Parameters<CreateEnterpriseDatabaseParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(
        name = %params.name,
        memory_size_mb = ?params.memory_size_mb,
        "Tool called: enterprise_database_create"
    );
    // Write operation: refuse before any backend is contacted.
    if self.config.read_only {
        return Err(RmcpError::invalid_request(
            "Server is in read-only mode. Use --allow-writes to enable write operations.",
            None,
        ));
    }
    let tools = self.get_enterprise_tools().await?;
    // The name is borrowed; the source had `&params` mangled into `¶ms`.
    tools
        .create_database(&params.name, params.memory_size_mb)
        .await
}
#[tool(
description = "Delete a Redis Enterprise database. This is a destructive operation that cannot be undone."
)]
async fn enterprise_database_delete(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_database(params.database_id).await
}
#[tool(
description = "Update a Redis Enterprise database configuration. Supports memory_size (bytes), replication, data_persistence, and eviction_policy."
)]
async fn enterprise_database_update(
&self,
Parameters(params): Parameters<UpdateEnterpriseDatabaseParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_update"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let mut updates = serde_json::Map::new();
if let Some(memory_size) = params.memory_size {
updates.insert("memory_size".to_string(), serde_json::json!(memory_size));
}
if let Some(replication) = params.replication {
updates.insert("replication".to_string(), serde_json::json!(replication));
}
if let Some(ref data_persistence) = params.data_persistence {
updates.insert(
"data_persistence".to_string(),
serde_json::json!(data_persistence),
);
}
if let Some(ref eviction_policy) = params.eviction_policy {
updates.insert(
"eviction_policy".to_string(),
serde_json::json!(eviction_policy),
);
}
if updates.is_empty() {
return Err(RmcpError::invalid_request(
"No updates provided. Specify at least one of: memory_size, replication, data_persistence, eviction_policy",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.update_database(params.database_id, serde_json::Value::Object(updates))
.await
}
#[tool(
description = "Flush all data from a Redis Enterprise database. This is a destructive operation that cannot be undone."
)]
async fn enterprise_database_flush(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_flush"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.flush_database(params.database_id).await
}
#[tool(description = "Get performance metrics for a Redis Enterprise database")]
async fn enterprise_database_metrics(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_metrics"
);
let tools = self.get_enterprise_tools().await?;
tools.get_database_metrics(params.database_id).await
}
/// Exports a Redis Enterprise database to the given location via
/// `export_database`.
///
/// # Errors
///
/// Returns `invalid_request` without touching the Enterprise tools when the
/// server is in read-only mode.
#[tool(
    description = "Export a Redis Enterprise database to a specified location (S3, FTP, etc.)"
)]
async fn enterprise_database_export(
    &self,
    Parameters(params): Parameters<ExportDatabaseParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(
        database_id = params.database_id,
        export_location = %params.export_location,
        "Tool called: enterprise_database_export"
    );
    // Treated as a write operation: refuse before any backend is contacted.
    if self.config.read_only {
        return Err(RmcpError::invalid_request(
            "Server is in read-only mode. Use --allow-writes to enable write operations.",
            None,
        ));
    }
    let tools = self.get_enterprise_tools().await?;
    // The location is borrowed; the source had `&params` mangled into `¶ms`.
    tools
        .export_database(params.database_id, &params.export_location)
        .await
}
/// Imports data into a Redis Enterprise database via `import_database`,
/// forwarding the `flush_before_import` flag unchanged.
///
/// # Errors
///
/// Returns `invalid_request` without touching the Enterprise tools when the
/// server is in read-only mode.
#[tool(description = "Import data into a Redis Enterprise database from a specified location")]
async fn enterprise_database_import(
    &self,
    Parameters(params): Parameters<ImportDatabaseParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(
        database_id = params.database_id,
        import_location = %params.import_location,
        flush_before_import = params.flush_before_import,
        "Tool called: enterprise_database_import"
    );
    // Write operation: refuse before any backend is contacted.
    if self.config.read_only {
        return Err(RmcpError::invalid_request(
            "Server is in read-only mode. Use --allow-writes to enable write operations.",
            None,
        ));
    }
    let tools = self.get_enterprise_tools().await?;
    // The location is borrowed; the source had `&params` mangled into `¶ms`.
    tools
        .import_database(
            params.database_id,
            &params.import_location,
            params.flush_before_import,
        )
        .await
}
#[tool(description = "Trigger a backup of a Redis Enterprise database")]
async fn enterprise_database_backup(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_backup"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.backup_database(params.database_id).await
}
#[tool(description = "Restore a Redis Enterprise database from a backup")]
async fn enterprise_database_restore(
&self,
Parameters(params): Parameters<RestoreDatabaseParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
backup_uid = ?params.backup_uid,
"Tool called: enterprise_database_restore"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.restore_database(params.database_id, params.backup_uid.as_deref())
.await
}
/// Fetches cluster statistics by delegating to `get_cluster_stats`.
#[tool(
    description = "Get Redis Enterprise cluster statistics including memory, CPU, and throughput metrics"
)]
async fn enterprise_cluster_stats(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_cluster_stats");
    self.get_enterprise_tools().await?.get_cluster_stats().await
}
/// Fetches cluster settings by delegating to `get_cluster_settings`.
#[tool(description = "Get Redis Enterprise cluster settings and configuration")]
async fn enterprise_cluster_settings(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_cluster_settings");
    self.get_enterprise_tools()
        .await?
        .get_cluster_settings()
        .await
}
/// Fetches the cluster topology by delegating to `get_cluster_topology`.
#[tool(
    description = "Get Redis Enterprise cluster topology showing nodes, shards, and their relationships"
)]
async fn enterprise_cluster_topology(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_cluster_topology");
    self.get_enterprise_tools()
        .await?
        .get_cluster_topology()
        .await
}
#[tool(
description = "Update Redis Enterprise cluster configuration. Supports name, email_alerts, and rack_aware settings."
)]
async fn enterprise_cluster_update(
&self,
Parameters(params): Parameters<UpdateClusterParam>,
) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_cluster_update");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let mut updates = serde_json::Map::new();
if let Some(ref name) = params.name {
updates.insert("name".to_string(), serde_json::json!(name));
}
if let Some(email_alerts) = params.email_alerts {
updates.insert("email_alerts".to_string(), serde_json::json!(email_alerts));
}
if let Some(rack_aware) = params.rack_aware {
updates.insert("rack_aware".to_string(), serde_json::json!(rack_aware));
}
if updates.is_empty() {
return Err(RmcpError::invalid_request(
"No updates provided. Specify at least one of: name, email_alerts, rack_aware",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.update_cluster(serde_json::Value::Object(updates))
.await
}
#[tool(
description = "Get statistics for a specific Redis Enterprise node including CPU, memory, and network metrics"
)]
async fn enterprise_node_stats(
&self,
Parameters(params): Parameters<NodeIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
node_id = params.node_id,
"Tool called: enterprise_node_stats"
);
let tools = self.get_enterprise_tools().await?;
tools.get_node_stats(params.node_id).await
}
#[tool(
description = "Update a Redis Enterprise node configuration. Supports accept_servers, external_addr, and rack_id."
)]
async fn enterprise_node_update(
&self,
Parameters(params): Parameters<UpdateNodeParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
node_id = params.node_id,
"Tool called: enterprise_node_update"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let mut updates = serde_json::Map::new();
if let Some(accept_servers) = params.accept_servers {
updates.insert(
"accept_servers".to_string(),
serde_json::json!(accept_servers),
);
}
if let Some(ref external_addr) = params.external_addr {
updates.insert(
"external_addr".to_string(),
serde_json::json!(external_addr),
);
}
if let Some(ref rack_id) = params.rack_id {
updates.insert("rack_id".to_string(), serde_json::json!(rack_id));
}
if updates.is_empty() {
return Err(RmcpError::invalid_request(
"No updates provided. Specify at least one of: accept_servers, external_addr, rack_id",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.update_node(params.node_id, serde_json::Value::Object(updates))
.await
}
#[tool(
description = "Remove a node from the Redis Enterprise cluster. This is a destructive operation."
)]
async fn enterprise_node_remove(
&self,
Parameters(params): Parameters<NodeIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
node_id = params.node_id,
"Tool called: enterprise_node_remove"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.remove_node(params.node_id).await
}
/// Fetches one Redis Enterprise shard; the UID is passed by reference to
/// `get_shard`.
#[tool(description = "Get detailed information about a specific Redis Enterprise shard")]
async fn enterprise_shard_get(
    &self,
    Parameters(params): Parameters<ShardIdParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(shard_uid = %params.shard_uid, "Tool called: enterprise_shard_get");
    let tools = self.get_enterprise_tools().await?;
    // Borrow the UID; the source had the `&params` borrow mangled into `¶ms`.
    tools.get_shard(&params.shard_uid).await
}
/// Fetches one Redis Enterprise alert; the UID is passed by reference to
/// `get_alert`.
#[tool(description = "Get detailed information about a specific Redis Enterprise alert")]
async fn enterprise_alert_get(
    &self,
    Parameters(params): Parameters<AlertIdParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(alert_uid = %params.alert_uid, "Tool called: enterprise_alert_get");
    let tools = self.get_enterprise_tools().await?;
    // Borrow the UID; the source had the `&params` borrow mangled into `¶ms`.
    tools.get_alert(&params.alert_uid).await
}
/// Lists the cluster's users by delegating to `list_users`.
#[tool(description = "List all users in the Redis Enterprise cluster")]
async fn enterprise_users_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_users_list");
    self.get_enterprise_tools().await?.list_users().await
}
#[tool(description = "Get detailed information about a specific Redis Enterprise user")]
async fn enterprise_user_get(
&self,
Parameters(params): Parameters<UserIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(user_id = params.user_id, "Tool called: enterprise_user_get");
let tools = self.get_enterprise_tools().await?;
tools.get_user(params.user_id).await
}
/// Creates a Redis Enterprise user from email, password, role and an
/// optional name.
///
/// Only the email is logged; the password is passed straight to
/// `create_user`.
///
/// # Errors
///
/// Returns `invalid_request` without touching the Enterprise tools when the
/// server is in read-only mode.
#[tool(description = "Create a new user in the Redis Enterprise cluster")]
async fn enterprise_user_create(
    &self,
    Parameters(params): Parameters<CreateUserParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(email = %params.email, "Tool called: enterprise_user_create");
    // Write operation: refuse before any backend is contacted.
    if self.config.read_only {
        return Err(RmcpError::invalid_request(
            "Server is in read-only mode. Use --allow-writes to enable write operations.",
            None,
        ));
    }
    let tools = self.get_enterprise_tools().await?;
    // String fields are borrowed; the source had `&params` mangled into `¶ms`.
    tools
        .create_user(
            &params.email,
            &params.password,
            &params.role,
            params.name.as_deref(),
        )
        .await
}
#[tool(description = "Delete a user from the Redis Enterprise cluster")]
async fn enterprise_user_delete(
&self,
Parameters(params): Parameters<UserIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
user_id = params.user_id,
"Tool called: enterprise_user_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_user(params.user_id).await
}
/// Lists the cluster's roles by delegating to `list_roles`.
#[tool(description = "List all roles in the Redis Enterprise cluster")]
async fn enterprise_roles_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_roles_list");
    self.get_enterprise_tools().await?.list_roles().await
}
#[tool(description = "Get detailed information about a specific Redis Enterprise role")]
async fn enterprise_role_get(
&self,
Parameters(params): Parameters<RoleIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(role_id = params.role_id, "Tool called: enterprise_role_get");
let tools = self.get_enterprise_tools().await?;
tools.get_role(params.role_id).await
}
/// Creates a Redis Enterprise role from a name and an optional management
/// setting.
///
/// # Errors
///
/// Returns `invalid_request` without touching the Enterprise tools when the
/// server is in read-only mode.
#[tool(description = "Create a new role in the Redis Enterprise cluster")]
async fn enterprise_role_create(
    &self,
    Parameters(params): Parameters<CreateRoleParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(name = %params.name, "Tool called: enterprise_role_create");
    // Write operation: refuse before any backend is contacted.
    if self.config.read_only {
        return Err(RmcpError::invalid_request(
            "Server is in read-only mode. Use --allow-writes to enable write operations.",
            None,
        ));
    }
    let tools = self.get_enterprise_tools().await?;
    // The name is borrowed; the source had `&params` mangled into `¶ms`.
    tools
        .create_role(&params.name, params.management.as_deref())
        .await
}
#[tool(description = "Delete a role from the Redis Enterprise cluster")]
async fn enterprise_role_delete(
&self,
Parameters(params): Parameters<RoleIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
role_id = params.role_id,
"Tool called: enterprise_role_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_role(params.role_id).await
}
/// Lists the cluster's Redis ACLs by delegating to `list_acls`.
#[tool(description = "List all Redis ACLs in the Redis Enterprise cluster")]
async fn enterprise_acls_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_acls_list");
    self.get_enterprise_tools().await?.list_acls().await
}
#[tool(description = "Get detailed information about a specific Redis ACL")]
async fn enterprise_acl_get(
&self,
Parameters(params): Parameters<AclIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(acl_id = params.acl_id, "Tool called: enterprise_acl_get");
let tools = self.get_enterprise_tools().await?;
tools.get_acl(params.acl_id).await
}
/// Creates a Redis ACL from a name, an ACL string and an optional
/// description.
///
/// # Errors
///
/// Returns `invalid_request` without touching the Enterprise tools when the
/// server is in read-only mode.
#[tool(description = "Create a new Redis ACL in the Redis Enterprise cluster")]
async fn enterprise_acl_create(
    &self,
    Parameters(params): Parameters<CreateAclParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(name = %params.name, "Tool called: enterprise_acl_create");
    // Write operation: refuse before any backend is contacted.
    if self.config.read_only {
        return Err(RmcpError::invalid_request(
            "Server is in read-only mode. Use --allow-writes to enable write operations.",
            None,
        ));
    }
    let tools = self.get_enterprise_tools().await?;
    // String fields are borrowed; the source had `&params` mangled into `¶ms`.
    tools
        .create_acl(&params.name, &params.acl, params.description.as_deref())
        .await
}
/// Deletes a Redis ACL from the Redis Enterprise cluster.
///
/// The call is logged first; it is then rejected with an invalid-request error if
/// the server is read-only, otherwise `delete_acl` is invoked with the given id.
///
/// # Errors
///
/// Fails when `config.read_only` is set, or when `get_enterprise_tools` or
/// `delete_acl` returns an error.
#[tool(description = "Delete a Redis ACL from the Redis Enterprise cluster")]
async fn enterprise_acl_delete(
    &self,
    Parameters(params): Parameters<AclIdParam>,
) -> Result<CallToolResult, RmcpError> {
    let acl_id = params.acl_id;
    info!(acl_id, "Tool called: enterprise_acl_delete");
    if self.config.read_only {
        let denied = RmcpError::invalid_request(
            "Server is in read-only mode. Use --allow-writes to enable write operations.",
            None,
        );
        return Err(denied);
    }
    let enterprise = self.get_enterprise_tools().await?;
    enterprise.delete_acl(acl_id).await
}
/// Lists all Redis modules available in the Redis Enterprise cluster.
///
/// # Errors
///
/// Propagates any error from `get_enterprise_tools` or `list_modules`.
#[tool(description = "List all Redis modules available in the Redis Enterprise cluster")]
async fn enterprise_modules_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_modules_list");
    self.get_enterprise_tools().await?.list_modules().await
}
/// Fetches a single Redis module by its UID.
///
/// # Errors
///
/// Propagates any error from `get_enterprise_tools` or `get_module`.
#[tool(description = "Get detailed information about a specific Redis module")]
async fn enterprise_module_get(
    &self,
    Parameters(params): Parameters<ModuleIdParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(module_uid = %params.module_uid, "Tool called: enterprise_module_get");
    let tools = self.get_enterprise_tools().await?;
    tools.get_module(&params.module_uid).await
}
/// Lists all Active-Active (CRDB) databases in the Redis Enterprise cluster.
///
/// # Errors
///
/// Propagates any error from `get_enterprise_tools` or `list_crdbs`.
#[tool(description = "List all Active-Active (CRDB) databases in the Redis Enterprise cluster")]
async fn enterprise_crdbs_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_crdbs_list");
    self.get_enterprise_tools().await?.list_crdbs().await
}
/// Fetches a single Active-Active (CRDB) database by its GUID.
///
/// # Errors
///
/// Propagates any error from `get_enterprise_tools` or `get_crdb`.
#[tool(description = "Get detailed information about a specific Active-Active (CRDB) database")]
async fn enterprise_crdb_get(
    &self,
    Parameters(params): Parameters<CrdbGuidParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(crdb_guid = %params.crdb_guid, "Tool called: enterprise_crdb_get");
    let tools = self.get_enterprise_tools().await?;
    tools.get_crdb(&params.crdb_guid).await
}
/// Updates the configuration of an Active-Active (CRDB) database.
///
/// Builds a JSON object containing only the fields the caller supplied
/// (`memory_size`, `encryption`, `data_persistence`) and passes it, together with
/// the CRDB GUID, to `update_crdb`.
///
/// # Errors
///
/// Returns an invalid-request error when `config.read_only` is set or when none of
/// the optional fields was provided, and propagates any error produced by
/// `get_enterprise_tools` or `update_crdb`.
#[tool(description = "Update an Active-Active (CRDB) database configuration")]
async fn enterprise_crdb_update(
    &self,
    Parameters(params): Parameters<UpdateCrdbParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(crdb_guid = %params.crdb_guid, "Tool called: enterprise_crdb_update");
    // Write operations are rejected before any payload is built.
    if self.config.read_only {
        return Err(RmcpError::invalid_request(
            "Server is in read-only mode. Use --allow-writes to enable write operations.",
            None,
        ));
    }

    // Only fields that were actually supplied end up in the update payload.
    let mut updates = serde_json::Map::new();
    if let Some(memory_size) = params.memory_size {
        updates.insert("memory_size".to_string(), serde_json::json!(memory_size));
    }
    if let Some(encryption) = params.encryption {
        updates.insert("encryption".to_string(), serde_json::json!(encryption));
    }
    if let Some(data_persistence) = &params.data_persistence {
        updates.insert(
            "data_persistence".to_string(),
            serde_json::json!(data_persistence),
        );
    }

    // An empty payload is rejected locally instead of being sent to the backend.
    if updates.is_empty() {
        return Err(RmcpError::invalid_request(
            "No updates provided. Specify at least one of: memory_size, encryption, data_persistence",
            None,
        ));
    }

    let tools = self.get_enterprise_tools().await?;
    tools
        .update_crdb(&params.crdb_guid, serde_json::Value::Object(updates))
        .await
}
/// Deletes an Active-Active (CRDB) database identified by its GUID.
///
/// Logs the call, refuses it when the server is configured read-only, and otherwise
/// invokes `delete_crdb` on the enterprise tools handle.
///
/// # Errors
///
/// Returns an invalid-request error when `config.read_only` is set, and propagates
/// any error produced by `get_enterprise_tools` or `delete_crdb`.
#[tool(
    description = "Delete an Active-Active (CRDB) database. This is a destructive operation."
)]
async fn enterprise_crdb_delete(
    &self,
    Parameters(params): Parameters<CrdbGuidParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(crdb_guid = %params.crdb_guid, "Tool called: enterprise_crdb_delete");
    // Write operations are rejected before any backend handle is acquired.
    if self.config.read_only {
        return Err(RmcpError::invalid_request(
            "Server is in read-only mode. Use --allow-writes to enable write operations.",
            None,
        ));
    }
    let tools = self.get_enterprise_tools().await?;
    tools.delete_crdb(&params.crdb_guid).await
}
/// Lists debug info collection tasks in the Redis Enterprise cluster.
///
/// # Errors
///
/// Propagates any error from `get_enterprise_tools` or `list_debuginfo`.
#[tool(description = "List debug info collection tasks in the Redis Enterprise cluster")]
async fn enterprise_debuginfo_list(&self) -> Result<CallToolResult, RmcpError> {
    info!("Tool called: enterprise_debuginfo_list");
    self.get_enterprise_tools().await?.list_debuginfo().await
}
/// Fetches the status of a single debug info collection task by its task id.
///
/// # Errors
///
/// Propagates any error from `get_enterprise_tools` or `get_debuginfo_status`.
#[tool(description = "Get the status of a specific debug info collection task")]
async fn enterprise_debuginfo_status(
    &self,
    Parameters(params): Parameters<DebugInfoTaskIdParam>,
) -> Result<CallToolResult, RmcpError> {
    info!(task_id = %params.task_id, "Tool called: enterprise_debuginfo_status");
    let tools = self.get_enterprise_tools().await?;
    tools.get_debuginfo_status(&params.task_id).await
}
}
#[tool_handler]
impl ServerHandler for RedisCtlMcp {
    /// Describes this server to MCP clients.
    ///
    /// Advertises protocol version 2024-11-05 with the tools capability enabled and
    /// implementation details taken from the build environment. The instructions
    /// text reflects the configured mode: write tools exist on this server and are
    /// gated on `config.read_only`, so the text must not claim that every tool is
    /// read-only when writes are enabled.
    fn get_info(&self) -> ServerInfo {
        let mode_note = if self.config.read_only {
            "The server is in read-only mode: write operations are rejected \
             unless it is started with --allow-writes."
        } else {
            "Write operations are enabled."
        };
        ServerInfo {
            protocol_version: ProtocolVersion::V_2024_11_05,
            capabilities: ServerCapabilities::builder().enable_tools().build(),
            server_info: Implementation::from_build_env(),
            instructions: Some(format!(
                "Redis Cloud and Enterprise management tools. \
                 Use cloud_* tools for Redis Cloud operations and \
                 enterprise_* tools for Redis Enterprise operations. \
                 {}",
                mode_note
            )),
        }
    }

    /// Handles the MCP `initialize` request.
    ///
    /// Logs that a client connected and answers with [`Self::get_info`]; the request
    /// parameters and the request context are not inspected.
    async fn initialize(
        &self,
        _request: InitializeRequestParam,
        _context: RequestContext<RoleServer>,
    ) -> Result<InitializeResult, RmcpError> {
        info!("MCP client connected, initializing session");
        Ok(self.get_info())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // A server built without a profile and with `read_only = true` reports that config.
    #[test]
    fn test_server_creation() {
        let created = RedisCtlMcp::new(None, true);
        assert!(created.is_ok());

        let mcp = created.unwrap();
        let cfg = mcp.config();
        assert!(cfg.read_only);
        assert!(cfg.profile.is_none());
    }

    // A named profile and `read_only = false` are both carried into the config.
    #[test]
    fn test_server_with_profile() {
        let created = RedisCtlMcp::new(Some("test-profile"), false);
        assert!(created.is_ok());

        let mcp = created.unwrap();
        let cfg = mcp.config();
        assert!(!cfg.read_only);
        assert_eq!(cfg.profile.as_deref(), Some("test-profile"));
    }
}