use std::sync::Arc;
use std::time::Duration;
use redis_enterprise::alerts::AlertHandler;
use redis_enterprise::bdb::{CreateDatabaseRequest, DatabaseHandler};
use redis_enterprise::crdb::CrdbHandler;
use redis_enterprise::stats::{StatsHandler, StatsQuery};
use redisctl_core::enterprise::{
backup_database_and_wait, flush_database_and_wait, import_database_and_wait,
};
use schemars::JsonSchema;
use serde::Deserialize;
use serde_json::Value;
use tower_mcp::extract::{Json, State};
use tower_mcp::{CallToolResult, Error as McpError, McpRouter, ResultExt, Tool, ToolBuilder};
use crate::state::AppState;
use crate::tools::wrap_list;
/// Input for the `list_enterprise_databases` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ListDatabasesInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// Optional case-insensitive substring that a database name must contain.
#[serde(default)]
pub name_filter: Option<String>,
/// Optional status to match exactly, ignoring case. Databases that report
/// no status are excluded when this is set.
#[serde(default)]
pub status_filter: Option<String>,
}
/// Builds the `list_enterprise_databases` tool.
///
/// The handler resolves an Enterprise client for the requested profile, lists
/// all databases through [`DatabaseHandler`], and keeps only the entries that
/// pass both optional filters:
///
/// * `name_filter` — case-insensitive substring match on the database name.
/// * `status_filter` — case-insensitive equality on the database status; a
///   database without a status never matches when this filter is set.
///
/// The remaining entries are handed to `wrap_list("databases", …)`.
///
/// # Errors
///
/// The handler fails when the client cannot be obtained (mapped through
/// `credential_error`) or when listing fails ("Failed to list databases").
pub fn list_databases(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("list_enterprise_databases")
        .description(
            "List all databases on the Redis Enterprise cluster. Supports filtering by name \
             (case-insensitive substring match) and status.",
        )
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, ListDatabasesInput>(
            state,
            |State(state): State<Arc<AppState>>, Json(input): Json<ListDatabasesInput>| async move {
                let client = state
                    .enterprise_client_for_profile(input.profile.as_deref())
                    .await
                    .map_err(|e| crate::tools::credential_error("enterprise", e))?;

                let handler = DatabaseHandler::new(client);
                let databases = handler
                    .list()
                    .await
                    .tool_context("Failed to list databases")?;

                // Lowercase each filter once up front instead of once per
                // database inside the filter closures.
                let name_needle = input.name_filter.as_deref().map(str::to_lowercase);
                let status_needle = input.status_filter.as_deref().map(str::to_lowercase);

                let filtered: Vec<_> = databases
                    .into_iter()
                    .filter(|db| match &name_needle {
                        Some(needle) => db.name.to_lowercase().contains(needle.as_str()),
                        None => true,
                    })
                    .filter(|db| match (&status_needle, db.status.as_ref()) {
                        // No status filter requested: keep everything.
                        (None, _) => true,
                        (Some(wanted), Some(actual)) => actual.to_lowercase() == *wanted,
                        // A filter was requested but the database has no status.
                        (Some(_), None) => false,
                    })
                    .collect();

                wrap_list("databases", &filtered)
            },
        )
        .build()
}
/// Input for the `get_enterprise_database` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct GetDatabaseInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// UID of the database to fetch.
pub uid: u32,
}
/// Builds the `get_enterprise_database` tool.
///
/// The handler resolves an Enterprise client for the requested profile,
/// fetches the database identified by `uid` through [`DatabaseHandler`], and
/// serializes it as the tool result.
pub fn get_database(state: Arc<AppState>) -> Tool {
    let builder = ToolBuilder::new("get_enterprise_database")
        .description("Get detailed information about a specific Redis Enterprise database")
        .read_only_safe();

    builder
        .extractor_handler_typed::<_, _, _, GetDatabaseInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(args): Json<GetDatabaseInput>| async move {
                let profile = args.profile.as_deref();
                let enterprise = app
                    .enterprise_client_for_profile(profile)
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;

                let details = DatabaseHandler::new(enterprise)
                    .get(args.uid)
                    .await
                    .tool_context("Failed to get database")?;

                CallToolResult::from_serialize(&details)
            },
        )
        .build()
}
/// Input for the `get_database_stats` tool.
///
/// When `interval`, `start_time` and `end_time` are all absent the latest
/// stats are requested; setting any of them issues a stats query instead.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct GetDatabaseStatsInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// UID of the database whose stats are requested.
pub uid: u32,
/// Optional interval, forwarded as the `interval` of the stats query.
#[serde(default)]
pub interval: Option<String>,
/// Optional start of the time range, forwarded as the `stime` of the stats query.
#[serde(default)]
pub start_time: Option<String>,
/// Optional end of the time range, forwarded as the `etime` of the stats query.
#[serde(default)]
pub end_time: Option<String>,
}
/// Builds the `get_database_stats` tool.
///
/// The handler resolves an Enterprise client for the requested profile and
/// then picks one of two [`StatsHandler`] calls:
///
/// * none of `interval` / `start_time` / `end_time` supplied — `database_last`;
/// * otherwise — `database` with a [`StatsQuery`] built from the supplied
///   values (`metrics` is always `None`).
///
/// Either response is serialized as the tool result.
pub fn get_database_stats(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("get_database_stats")
        .description(
            "Get statistics for a specific database. By default returns the latest stats. \
             Optionally specify interval and time range for historical data.",
        )
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, GetDatabaseStatsInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(args): Json<GetDatabaseStatsInput>| async move {
                let enterprise = app
                    .enterprise_client_for_profile(args.profile.as_deref())
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;
                let stats_api = StatsHandler::new(enterprise);

                match (args.interval, args.start_time, args.end_time) {
                    // No range parameters at all: return the most recent sample.
                    (None, None, None) => {
                        let latest = stats_api
                            .database_last(args.uid)
                            .await
                            .tool_context("Failed to get database stats")?;
                        CallToolResult::from_serialize(&latest)
                    }
                    // At least one range parameter: run a stats query.
                    (interval, stime, etime) => {
                        let query = StatsQuery {
                            interval,
                            stime,
                            etime,
                            metrics: None,
                        };
                        let series = stats_api
                            .database(args.uid, Some(query))
                            .await
                            .tool_context("Failed to get database stats")?;
                        CallToolResult::from_serialize(&series)
                    }
                }
            },
        )
        .build()
}
/// Input for the `get_database_endpoints` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct GetDatabaseEndpointsInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// UID of the database whose endpoints are requested.
pub uid: u32,
}
/// Builds the `get_database_endpoints` tool.
///
/// The handler resolves an Enterprise client for the requested profile, asks
/// [`DatabaseHandler`] for the endpoints of database `uid`, and hands them to
/// `wrap_list("endpoints", …)`.
pub fn get_database_endpoints(state: Arc<AppState>) -> Tool {
    const DESCRIPTION: &str =
        "Get connection endpoints for a specific database in the Redis Enterprise cluster";

    ToolBuilder::new("get_database_endpoints")
        .description(DESCRIPTION)
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, GetDatabaseEndpointsInput>(
            state,
            |State(app): State<Arc<AppState>>,
             Json(args): Json<GetDatabaseEndpointsInput>| async move {
                let profile = args.profile.as_deref();
                let enterprise = app
                    .enterprise_client_for_profile(profile)
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;

                let db_api = DatabaseHandler::new(enterprise);
                let found = db_api
                    .endpoints(args.uid)
                    .await
                    .tool_context("Failed to get endpoints")?;

                wrap_list("endpoints", &found)
            },
        )
        .build()
}
/// Input for the `list_database_alerts` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ListDatabaseAlertsInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// UID of the database whose alerts are listed.
pub uid: u32,
}
/// Builds the `list_database_alerts` tool.
///
/// The handler resolves an Enterprise client for the requested profile, lists
/// the alerts of database `uid` through [`AlertHandler`], and hands them to
/// `wrap_list("alerts", …)`.
pub fn list_database_alerts(state: Arc<AppState>) -> Tool {
    let builder = ToolBuilder::new("list_database_alerts")
        .description("List all alerts for a specific database in the Redis Enterprise cluster")
        .read_only_safe();

    builder
        .extractor_handler_typed::<_, _, _, ListDatabaseAlertsInput>(
            state,
            |State(app): State<Arc<AppState>>,
             Json(args): Json<ListDatabaseAlertsInput>| async move {
                let enterprise = app
                    .enterprise_client_for_profile(args.profile.as_deref())
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;

                let raised = AlertHandler::new(enterprise)
                    .list_by_database(args.uid)
                    .await
                    .tool_context("Failed to list database alerts")?;

                wrap_list("alerts", &raised)
            },
        )
        .build()
}
/// Serde default for the `timeout_seconds` fields in this file: ten minutes,
/// expressed in seconds.
fn default_enterprise_timeout() -> u64 {
    const TEN_MINUTES_IN_SECONDS: u64 = 10 * 60;
    TEN_MINUTES_IN_SECONDS
}
/// Input for the `backup_enterprise_database` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct BackupDatabaseInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// UID of the database to back up.
pub bdb_uid: u32,
/// Maximum time to wait for the backup, in seconds. Defaults to 600.
#[serde(default = "default_enterprise_timeout")]
pub timeout_seconds: u64,
}
/// Builds the `backup_enterprise_database` tool.
///
/// The handler refuses to run unless `AppState::is_write_allowed` returns
/// `true`. Otherwise it resolves an Enterprise client for the requested
/// profile, calls `backup_database_and_wait` with a timeout of
/// `timeout_seconds`, and reports success together with the `bdb_uid`.
pub fn backup_enterprise_database(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("backup_enterprise_database")
        .description(
            "Trigger a backup of a Redis Enterprise database and wait for completion. \
             Requires write permission.",
        )
        .non_destructive()
        .extractor_handler_typed::<_, _, _, BackupDatabaseInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(req): Json<BackupDatabaseInput>| async move {
                // Write guard: bail out before touching credentials.
                let writes_enabled = app.is_write_allowed();
                if !writes_enabled {
                    return Err(McpError::tool(
                        "Write operations not allowed in read-only mode",
                    ));
                }

                let enterprise = app
                    .enterprise_client_for_profile(req.profile.as_deref())
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;

                let wait_limit = Duration::from_secs(req.timeout_seconds);
                backup_database_and_wait(&enterprise, req.bdb_uid, wait_limit, None)
                    .await
                    .tool_context("Failed to backup database")?;

                let summary = serde_json::json!({
                    "message": "Backup completed successfully",
                    "bdb_uid": req.bdb_uid
                });
                CallToolResult::from_serialize(&summary)
            },
        )
        .build()
}
/// Input for the `import_enterprise_database` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ImportDatabaseInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// UID of the database to import into.
pub bdb_uid: u32,
/// Location of the data to import, passed through to the import call unchanged.
pub import_location: String,
/// When true, existing data is deleted before the import. Defaults to false.
#[serde(default)]
pub flush: bool,
/// Maximum time to wait for the import, in seconds. Defaults to 600.
#[serde(default = "default_enterprise_timeout")]
pub timeout_seconds: u64,
}
/// Builds the `import_enterprise_database` tool.
///
/// The handler refuses to run unless `AppState::is_write_allowed` returns
/// `true`. Otherwise it resolves an Enterprise client for the requested
/// profile, calls `import_database_and_wait` with the supplied location,
/// `flush` flag and a timeout of `timeout_seconds`, and reports success
/// together with the `bdb_uid` and `import_location`.
///
/// The tool is annotated as destructive: with `flush = true` the existing
/// data is deleted before the import (as the tool description itself warns),
/// so it must not be advertised as a non-destructive operation. This matches
/// the annotation used by `flush_enterprise_database`.
pub fn import_enterprise_database(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("import_enterprise_database")
        .description(
            "Import data into a Redis Enterprise database from an external source and wait for completion. \
             WARNING: If flush is true, existing data will be deleted before import. \
             Requires write permission.",
        )
        // May delete existing data (flush = true), hence destructive.
        .destructive()
        .extractor_handler_typed::<_, _, _, ImportDatabaseInput>(
            state,
            |State(state): State<Arc<AppState>>, Json(input): Json<ImportDatabaseInput>| async move {
                if !state.is_write_allowed() {
                    return Err(McpError::tool(
                        "Write operations not allowed in read-only mode",
                    ));
                }

                let client = state
                    .enterprise_client_for_profile(input.profile.as_deref())
                    .await
                    .map_err(|e| crate::tools::credential_error("enterprise", e))?;

                import_database_and_wait(
                    &client,
                    input.bdb_uid,
                    &input.import_location,
                    input.flush,
                    Duration::from_secs(input.timeout_seconds),
                    None,
                )
                .await
                .tool_context("Failed to import database")?;

                CallToolResult::from_serialize(&serde_json::json!({
                    "message": "Import completed successfully",
                    "bdb_uid": input.bdb_uid,
                    "import_location": input.import_location
                }))
            },
        )
        .build()
}
/// Input for the `create_enterprise_database` tool.
///
/// Every field except `profile` is copied into the create request under the
/// same field name.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct CreateEnterpriseDatabaseInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// Name of the database to create.
pub name: String,
/// Optional memory size for the new database.
pub memory_size: Option<u64>,
/// Optional port for the new database.
pub port: Option<u16>,
/// Optional replication setting for the new database.
#[serde(default)]
pub replication: Option<bool>,
/// Optional persistence setting for the new database.
pub persistence: Option<String>,
/// Optional eviction policy for the new database.
pub eviction_policy: Option<String>,
/// Optional sharding setting for the new database.
#[serde(default)]
pub sharding: Option<bool>,
/// Optional shard count for the new database.
pub shards_count: Option<u32>,
}
/// Builds the `create_enterprise_database` tool.
///
/// The handler refuses to run unless `AppState::is_write_allowed` returns
/// `true`. Otherwise it resolves an Enterprise client for the requested
/// profile, builds a [`CreateDatabaseRequest`] from the input, and serializes
/// the database returned by [`DatabaseHandler`]'s `create` call.
///
/// Request fields that the input does not expose (`shard_count`,
/// `proxy_policy`, `rack_aware`, `module_list`, `crdt`,
/// `authentication_redis_pass`) are always sent as `None`.
pub fn create_enterprise_database(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("create_enterprise_database")
        .description(
            "Create a new database in the Redis Enterprise cluster. \
             Returns the created database details. Requires write permission.",
        )
        .non_destructive()
        .extractor_handler_typed::<_, _, _, CreateEnterpriseDatabaseInput>(
            state,
            |State(state): State<Arc<AppState>>,
             Json(input): Json<CreateEnterpriseDatabaseInput>| async move {
                if !state.is_write_allowed() {
                    return Err(McpError::tool(
                        "Write operations not allowed in read-only mode",
                    ));
                }

                let client = state
                    .enterprise_client_for_profile(input.profile.as_deref())
                    .await
                    .map_err(|e| crate::tools::credential_error("enterprise", e))?;

                // `input` is not used after this point, so its owned fields
                // are moved into the request rather than cloned.
                let request = CreateDatabaseRequest {
                    name: input.name,
                    memory_size: input.memory_size,
                    port: input.port,
                    replication: input.replication,
                    persistence: input.persistence,
                    eviction_policy: input.eviction_policy,
                    sharding: input.sharding,
                    shards_count: input.shards_count,
                    shard_count: None,
                    proxy_policy: None,
                    rack_aware: None,
                    module_list: None,
                    crdt: None,
                    authentication_redis_pass: None,
                };

                let handler = DatabaseHandler::new(client);
                let database = handler
                    .create(request)
                    .await
                    .tool_context("Failed to create database")?;

                CallToolResult::from_serialize(&database)
            },
        )
        .build()
}
/// Input for the `update_enterprise_database` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct UpdateEnterpriseDatabaseInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// UID of the database to update.
pub uid: u32,
/// JSON object with the fields to update; forwarded to the update call unchanged.
pub updates: Value,
}
/// Builds the `update_enterprise_database` tool.
///
/// The handler refuses to run unless `AppState::is_write_allowed` returns
/// `true`. Otherwise it resolves an Enterprise client for the requested
/// profile, forwards `updates` unchanged to [`DatabaseHandler`]'s `update`
/// call for database `uid`, and serializes the returned database.
pub fn update_enterprise_database(state: Arc<AppState>) -> Tool {
    const DESCRIPTION: &str = "Update configuration of an existing Redis Enterprise database. \
                               Pass a JSON object with the fields to update. Requires write permission.";

    ToolBuilder::new("update_enterprise_database")
        .description(DESCRIPTION)
        .non_destructive()
        .extractor_handler_typed::<_, _, _, UpdateEnterpriseDatabaseInput>(
            state,
            |State(app): State<Arc<AppState>>,
             Json(req): Json<UpdateEnterpriseDatabaseInput>| async move {
                let writes_enabled = app.is_write_allowed();
                if !writes_enabled {
                    return Err(McpError::tool(
                        "Write operations not allowed in read-only mode",
                    ));
                }

                let profile = req.profile.as_deref();
                let enterprise = app
                    .enterprise_client_for_profile(profile)
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;

                let updated = DatabaseHandler::new(enterprise)
                    .update(req.uid, req.updates)
                    .await
                    .tool_context("Failed to update database")?;

                CallToolResult::from_serialize(&updated)
            },
        )
        .build()
}
/// Input for the `delete_enterprise_database` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct DeleteEnterpriseDatabaseInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// UID of the database to delete.
pub uid: u32,
}
/// Builds the `delete_enterprise_database` tool (annotated as destructive).
///
/// The handler refuses to run unless `AppState::is_write_allowed` returns
/// `true`. Otherwise it resolves an Enterprise client for the requested
/// profile, deletes database `uid` through [`DatabaseHandler`], and reports
/// success together with the `uid`.
pub fn delete_enterprise_database(state: Arc<AppState>) -> Tool {
    let builder = ToolBuilder::new("delete_enterprise_database")
        .description(
            "DANGEROUS: Permanently deletes a database from the Redis Enterprise cluster \
             and all its data. This action cannot be undone. Requires write permission.",
        )
        .destructive();

    builder
        .extractor_handler_typed::<_, _, _, DeleteEnterpriseDatabaseInput>(
            state,
            |State(app): State<Arc<AppState>>,
             Json(req): Json<DeleteEnterpriseDatabaseInput>| async move {
                // Write guard: nothing is contacted in read-only mode.
                if !app.is_write_allowed() {
                    return Err(McpError::tool(
                        "Write operations not allowed in read-only mode",
                    ));
                }

                let enterprise = app
                    .enterprise_client_for_profile(req.profile.as_deref())
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;

                let db_api = DatabaseHandler::new(enterprise);
                db_api
                    .delete(req.uid)
                    .await
                    .tool_context("Failed to delete database")?;

                let summary = serde_json::json!({
                    "message": "Database deleted successfully",
                    "uid": req.uid
                });
                CallToolResult::from_serialize(&summary)
            },
        )
        .build()
}
/// Input for the `flush_enterprise_database` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct FlushEnterpriseDatabaseInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// UID of the database to flush.
pub bdb_uid: u32,
/// Maximum time to wait for the flush, in seconds. Defaults to 600.
#[serde(default = "default_enterprise_timeout")]
pub timeout_seconds: u64,
}
/// Builds the `flush_enterprise_database` tool (annotated as destructive).
///
/// The handler refuses to run unless `AppState::is_write_allowed` returns
/// `true`. Otherwise it resolves an Enterprise client for the requested
/// profile, calls `flush_database_and_wait` with a timeout of
/// `timeout_seconds`, and reports success together with the `bdb_uid`.
pub fn flush_enterprise_database(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("flush_enterprise_database")
        .description(
            "DANGEROUS: Removes all data from a Redis Enterprise database. \
             This action cannot be undone. Requires write permission.",
        )
        .destructive()
        .extractor_handler_typed::<_, _, _, FlushEnterpriseDatabaseInput>(
            state,
            |State(app): State<Arc<AppState>>,
             Json(req): Json<FlushEnterpriseDatabaseInput>| async move {
                let writes_enabled = app.is_write_allowed();
                if !writes_enabled {
                    return Err(McpError::tool(
                        "Write operations not allowed in read-only mode",
                    ));
                }

                let profile = req.profile.as_deref();
                let enterprise = app
                    .enterprise_client_for_profile(profile)
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;

                let wait_limit = Duration::from_secs(req.timeout_seconds);
                flush_database_and_wait(&enterprise, req.bdb_uid, wait_limit, None)
                    .await
                    .tool_context("Failed to flush database")?;

                let summary = serde_json::json!({
                    "message": "Database flushed successfully",
                    "bdb_uid": req.bdb_uid
                });
                CallToolResult::from_serialize(&summary)
            },
        )
        .build()
}
/// Input for the `list_enterprise_crdbs` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ListCrdbsInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
}
/// Builds the `list_enterprise_crdbs` tool.
///
/// The handler resolves an Enterprise client for the requested profile, lists
/// the CRDBs through [`CrdbHandler`], and hands them to
/// `wrap_list("crdbs", …)`.
pub fn list_enterprise_crdbs(state: Arc<AppState>) -> Tool {
    let builder = ToolBuilder::new("list_enterprise_crdbs")
        .description(
            "List all Active-Active (CRDB) databases in the Redis Enterprise cluster. \
             Returns database names, GUIDs, status, and instance information.",
        )
        .read_only_safe();

    builder
        .extractor_handler_typed::<_, _, _, ListCrdbsInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(args): Json<ListCrdbsInput>| async move {
                let profile = args.profile.as_deref();
                let enterprise = app
                    .enterprise_client_for_profile(profile)
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;

                let crdb_api = CrdbHandler::new(enterprise);
                let listing = crdb_api
                    .list()
                    .await
                    .tool_context("Failed to list CRDBs")?;

                wrap_list("crdbs", &listing)
            },
        )
        .build()
}
/// Input for the `get_enterprise_crdb` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct GetCrdbInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// GUID of the CRDB to fetch.
pub guid: String,
}
/// Builds the `get_enterprise_crdb` tool.
///
/// The handler resolves an Enterprise client for the requested profile,
/// fetches the CRDB identified by `guid` through [`CrdbHandler`], and
/// serializes it as the tool result.
pub fn get_enterprise_crdb(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("get_enterprise_crdb")
        .description(
            "Get detailed information about a specific Active-Active (CRDB) database \
             including instances, replication status, and configuration.",
        )
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, GetCrdbInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(args): Json<GetCrdbInput>| async move {
                let enterprise = app
                    .enterprise_client_for_profile(args.profile.as_deref())
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;

                let details = CrdbHandler::new(enterprise)
                    .get(&args.guid)
                    .await
                    .tool_context("Failed to get CRDB")?;

                CallToolResult::from_serialize(&details)
            },
        )
        .build()
}
/// Input for the `get_enterprise_crdb_tasks` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct GetCrdbTasksInput {
/// Optional profile name used to select the Redis Enterprise client.
#[serde(default)]
pub profile: Option<String>,
/// GUID of the CRDB whose tasks are requested.
pub guid: String,
}
/// Builds the `get_enterprise_crdb_tasks` tool.
///
/// The handler resolves an Enterprise client for the requested profile,
/// fetches the tasks of the CRDB identified by `guid` through
/// [`CrdbHandler`], and serializes them as the tool result.
pub fn get_enterprise_crdb_tasks(state: Arc<AppState>) -> Tool {
    let builder = ToolBuilder::new("get_enterprise_crdb_tasks")
        .description(
            "Get tasks for a specific Active-Active (CRDB) database. \
             Returns pending and completed tasks related to CRDB operations.",
        )
        .read_only_safe();

    builder
        .extractor_handler_typed::<_, _, _, GetCrdbTasksInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(args): Json<GetCrdbTasksInput>| async move {
                let profile = args.profile.as_deref();
                let enterprise = app
                    .enterprise_client_for_profile(profile)
                    .await
                    .map_err(|err| crate::tools::credential_error("enterprise", err))?;

                let crdb_api = CrdbHandler::new(enterprise);
                let task_list = crdb_api
                    .tasks(&args.guid)
                    .await
                    .tool_context("Failed to get CRDB tasks")?;

                CallToolResult::from_serialize(&task_list)
            },
        )
        .build()
}
/// Assembles an [`McpRouter`] containing every tool defined in this file.
///
/// Tools are registered in this order: the read-only database tools, the
/// read-only CRDB tools, then the write tools (backup, import, create,
/// update, delete, flush). Each tool receives its own clone of `state`.
pub fn router(state: Arc<AppState>) -> McpRouter {
    let tools = [
        // Read-only database tools.
        list_databases(state.clone()),
        get_database(state.clone()),
        get_database_stats(state.clone()),
        get_database_endpoints(state.clone()),
        list_database_alerts(state.clone()),
        // Read-only CRDB tools.
        list_enterprise_crdbs(state.clone()),
        get_enterprise_crdb(state.clone()),
        get_enterprise_crdb_tasks(state.clone()),
        // Write tools.
        backup_enterprise_database(state.clone()),
        import_enterprise_database(state.clone()),
        create_enterprise_database(state.clone()),
        update_enterprise_database(state.clone()),
        delete_enterprise_database(state.clone()),
        flush_enterprise_database(state.clone()),
    ];

    let mut assembled = McpRouter::new();
    for tool in tools {
        assembled = assembled.tool(tool);
    }
    assembled
}