use std::collections::HashSet;
use std::sync::Arc;
use anyhow::Result;
use clap::{Parser, ValueEnum};
use redisctl_core::Config;
#[cfg(any(feature = "cloud", feature = "enterprise", feature = "database"))]
use redisctl_core::DeploymentType;
use tower_mcp::{CapabilityFilter, DenialBehavior, McpRouter, Tool, transport::StdioTransport};
use tracing::info;
use tracing_subscriber::{EnvFilter, fmt, prelude::*};
mod error;
mod prompts;
mod resources;
mod state;
mod tools;
use state::{AppState, CredentialSource};
// Transport the server communicates over; parsed from `--transport`.
// Plain `//` comments are used here because clap's derive turns `///` docs
// on value-enum variants into help text.
#[derive(Debug, Clone, Copy, Default, ValueEnum)]
enum Transport {
// Standard input/output; this is the `Default` variant.
#[default]
Stdio,
// HTTP; `main` hands this case to `run_http_server`.
Http,
}
// Group of MCP tools that can be enabled as a unit (see `--tools`).
// Variants other than `App` only exist when the matching cargo feature is
// compiled in. `Eq` + `Hash` are derived because values are stored in a `HashSet`.
// Plain `//` comments are used because clap's derive turns `///` docs on
// value-enum variants into help text.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ValueEnum)]
enum Toolset {
// Tools merged from `tools::cloud::router`.
#[cfg(feature = "cloud")]
Cloud,
// Tools merged from `tools::enterprise::router`.
#[cfg(feature = "enterprise")]
Enterprise,
// Tools merged from `tools::redis::router`.
#[cfg(feature = "database")]
Database,
// Tools merged from `tools::profile::router`; always compiled in.
App,
}
impl std::fmt::Display for Toolset {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
#[cfg(feature = "cloud")]
Toolset::Cloud => write!(f, "cloud"),
#[cfg(feature = "enterprise")]
Toolset::Enterprise => write!(f, "enterprise"),
#[cfg(feature = "database")]
Toolset::Database => write!(f, "database"),
Toolset::App => write!(f, "app"),
}
}
}
// Command-line arguments of the `redisctl-mcp` binary.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help/about text, and this block does not intend to change
// the generated `--help` output.
#[derive(Parser, Debug)]
#[command(name = "redisctl-mcp")]
#[command(version, about, long_about = None)]
struct Args {
    // Transport that `main` dispatches on (`stdio` by default, or `http`).
    #[arg(short, long, value_enum, default_value = "stdio")]
    transport: Transport,

    // Profile name(s); may also come from `REDISCTL_PROFILE`. Used as the
    // credential source when `--oauth` is not set.
    #[arg(short, long, env = "REDISCTL_PROFILE")]
    profile: Vec<String>,

    // Read-only mode, on by default. Forwarded to `AppState::new` and
    // `build_router`.
    //
    // A plain `bool` field is a presence-only flag in clap's derive API, so
    // together with `default_value = "true"` it could never become `false`
    // and write mode was unreachable from the command line. `ArgAction::Set`
    // makes the option take a value (`--read-only=false`), while
    // `num_args = 0..=1` + `default_missing_value` keep a bare `--read-only`
    // meaning `true` as before.
    #[arg(
        long,
        default_value = "true",
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true"
    )]
    read_only: bool,

    // Optional database URL; may also come from `REDIS_URL`. Forwarded to
    // `AppState::new`.
    #[arg(long, env = "REDIS_URL")]
    database_url: Option<String>,

    // Comma-separated toolsets to enable. When given, it takes precedence
    // over auto-detection in `enabled_toolsets`.
    #[arg(long, value_delimiter = ',', value_enum)]
    tools: Option<Vec<Toolset>>,

    // Host part of the HTTP bind address.
    #[arg(long, default_value = "127.0.0.1")]
    host: String,

    // Port part of the HTTP bind address.
    #[arg(long, default_value = "8080")]
    port: u16,

    // Switches the credential source from profiles to OAuth.
    #[arg(long)]
    oauth: bool,

    // OAuth issuer; `run_http_server` requires it when `--oauth` is set.
    #[arg(long, env = "OAUTH_ISSUER")]
    oauth_issuer: Option<String>,

    // OAuth audience, forwarded to `CredentialSource::OAuth`.
    #[arg(long, env = "OAUTH_AUDIENCE")]
    oauth_audience: Option<String>,

    // NOTE(review): parsed but not read anywhere in this file - confirm
    // whether JWKS validation is wired up elsewhere.
    #[arg(long, env = "OAUTH_JWKS_URI")]
    jwks_uri: Option<String>,

    // Limit passed to the HTTP transport's `ConcurrencyLimitLayer`.
    #[arg(long, default_value = "10")]
    max_concurrent: usize,

    // NOTE(review): parsed but not read anywhere in this file - presumably a
    // per-request rate limit in milliseconds; confirm.
    #[arg(long, default_value = "100")]
    rate_limit_ms: u64,

    // Per-request timeout in seconds for the HTTP transport's `TimeoutLayer`.
    #[arg(long, default_value = "30")]
    request_timeout_secs: u64,

    // Fallback log filter, used by `main` when no filter can be built from
    // the default environment variable.
    #[arg(long, default_value = "info", env = "RUST_LOG")]
    log_level: String,
}
/// Derives the toolsets to enable from the profiles present in `config`.
///
/// Returns `None` when the config holds no profiles at all. Otherwise the
/// result always contains [`Toolset::App`], plus one toolset per deployment
/// type that has at least one profile (limited to the compiled-in features).
fn toolsets_from_config(config: &Config) -> Option<HashSet<Toolset>> {
    // Nothing configured: let the caller choose its own fallback.
    if config.profiles.is_empty() {
        return None;
    }

    let mut detected = HashSet::new();
    detected.insert(Toolset::App);

    #[cfg(feature = "cloud")]
    {
        let cloud_profiles = config.get_profiles_of_type(DeploymentType::Cloud);
        if !cloud_profiles.is_empty() {
            detected.insert(Toolset::Cloud);
        }
    }

    #[cfg(feature = "enterprise")]
    {
        let enterprise_profiles = config.get_profiles_of_type(DeploymentType::Enterprise);
        if !enterprise_profiles.is_empty() {
            detected.insert(Toolset::Enterprise);
        }
    }

    #[cfg(feature = "database")]
    {
        let database_profiles = config.get_profiles_of_type(DeploymentType::Database);
        if !database_profiles.is_empty() {
            detected.insert(Toolset::Database);
        }
    }

    Some(detected)
}
fn detect_toolsets_from_config() -> Option<HashSet<Toolset>> {
let config = Config::load().ok()?;
let result = toolsets_from_config(&config);
if let Some(ref toolsets) = result {
let names: Vec<String> = toolsets.iter().map(|t| t.to_string()).collect();
info!(toolsets = ?names, "Auto-detected toolsets from config profiles");
}
result
}
fn enabled_toolsets(args: &Args) -> HashSet<Toolset> {
if let Some(ref tools) = args.tools {
return tools.iter().copied().collect();
}
if let Some(toolsets) = detect_toolsets_from_config() {
return toolsets;
}
let mut set = HashSet::new();
#[cfg(feature = "cloud")]
set.insert(Toolset::Cloud);
#[cfg(feature = "enterprise")]
set.insert(Toolset::Enterprise);
#[cfg(feature = "database")]
set.insert(Toolset::Database);
set.insert(Toolset::App);
set
}
/// Entry point: parses arguments, installs logging, builds the shared state
/// and router, then runs the selected transport until it returns.
///
/// # Errors
/// Propagates failures from `AppState::new`, `build_router` and the transport.
#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    // Log output is written to stderr. The filter comes from the default
    // environment variable when it can be parsed, otherwise from `--log-level`.
    let stderr_layer = fmt::layer().with_writer(std::io::stderr);
    let env_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::from(args.log_level.clone()));
    tracing_subscriber::registry()
        .with(stderr_layer)
        .with(env_filter)
        .init();

    let enabled = enabled_toolsets(&args);
    let toolset_names: Vec<String> = enabled.iter().map(ToString::to_string).collect();
    info!(
        transport = ?args.transport,
        profiles = ?args.profile,
        read_only = args.read_only,
        toolsets = ?toolset_names,
        "Starting redisctl-mcp server"
    );

    // Profiles are the credential source unless OAuth was requested.
    let credential_source = if !args.oauth {
        CredentialSource::Profiles(args.profile.clone())
    } else {
        CredentialSource::OAuth {
            issuer: args.oauth_issuer.clone(),
            audience: args.oauth_audience.clone(),
        }
    };

    let app_state = AppState::new(credential_source, args.read_only, args.database_url.clone())?;
    let state = Arc::new(app_state);
    let router = build_router(Arc::clone(&state), args.read_only, &enabled)?;

    match args.transport {
        Transport::Http => {
            info!(host = %args.host, port = args.port, "Running with HTTP transport");
            run_http_server(router, &args).await?;
        }
        Transport::Stdio => {
            info!("Running with stdio transport");
            StdioTransport::new(router).run().await?;
        }
    }

    Ok(())
}
/// Assembles the MCP router for the enabled toolsets.
///
/// * `state` - shared application state handed to every tool sub-router.
/// * `read_only` - selects the safety-tier text in the server instructions
///   and, when `true`, installs a tool filter with
///   `DenialBehavior::Unauthorized`.
/// * `enabled` - toolsets whose sub-routers are merged in.
///
/// No fallible step is visible in this function; it currently always returns `Ok`.
fn build_router(
    state: Arc<AppState>,
    read_only: bool,
    enabled: &HashSet<Toolset>,
) -> Result<McpRouter> {
    let is_enabled = |toolset: Toolset| enabled.contains(&toolset);

    let mut router = McpRouter::new().server_info("redisctl-mcp", env!("CARGO_PKG_VERSION"));

    #[cfg(feature = "cloud")]
    if is_enabled(Toolset::Cloud) {
        router = router.merge(tools::cloud::router(Arc::clone(&state)));
    }
    #[cfg(feature = "enterprise")]
    if is_enabled(Toolset::Enterprise) {
        router = router.merge(tools::enterprise::router(Arc::clone(&state)));
    }
    #[cfg(feature = "database")]
    if is_enabled(Toolset::Database) {
        router = router.merge(tools::redis::router(Arc::clone(&state)));
    }
    if is_enabled(Toolset::App) {
        router = router.merge(tools::profile::router(Arc::clone(&state)));
    }

    // Paragraph describing the active safety tier, embedded in the instructions.
    let safety_tier = match read_only {
        true => {
            "**Active safety tier: READ-ONLY** -- only read-only tools are available. \
             Write and destructive tools are hidden and will return unauthorized if called directly."
        }
        false => {
            "**Active safety tier: WRITE-ENABLED** -- all tools including writes are available. \
             Destructive tools (delete, flush) are enabled. Exercise caution with destructive tools."
        }
    };

    let prefix = format!(
        "# Redis Cloud and Enterprise MCP Server\n\n\
         ## Safety Model\n\n\
         Every tool carries MCP annotation hints that describe its safety characteristics:\n\
         - `readOnlyHint = true` -- reads data, never modifies state\n\
         - `destructiveHint = false` -- writes data but is non-destructive (create, update, backup)\n\
         - `destructiveHint = true` -- irreversible operation (delete, flush)\n\n\
         {safety_tier}\n",
    );
    let suffix = "\n## Authentication\n\n\
                  In stdio mode, credentials are resolved from redisctl profiles.\n\
                  In HTTP mode with OAuth, credentials can be passed via JWT claims.";

    let router = router.auto_instructions_with(Some(prefix), Some(suffix));

    // Write-enabled mode: no filter is installed.
    if !read_only {
        return Ok(router);
    }

    info!("Applying read-only filter - write tools will be hidden");
    // The guard closure answers `false` for every session; presumably that
    // denies all write-capable tools - confirm against tower-mcp's `write_guard`.
    let write_filter = CapabilityFilter::<Tool>::write_guard(|_session| false)
        .denial_behavior(DenialBehavior::Unauthorized);
    Ok(router.tool_filter(write_filter))
}
/// Serves `router` over HTTP on `<host>:<port>` taken from `args`.
///
/// Requests pass through a timeout layer (`--request-timeout-secs`) and a
/// concurrency limit layer (`--max-concurrent`).
///
/// # Errors
/// Fails when `--oauth` is set without an issuer, or when serving fails.
///
/// NOTE(review): with `--oauth` this function only validates and logs the
/// issuer; no authentication layer is attached here. Confirm that token
/// validation happens elsewhere.
#[cfg(feature = "http")]
async fn run_http_server(router: McpRouter, args: &Args) -> Result<()> {
    use std::time::Duration;
    use tower::limit::ConcurrencyLimitLayer;
    use tower::timeout::TimeoutLayer;
    use tower_mcp::HttpTransport;

    let bind_addr = format!("{}:{}", args.host, args.port);

    let timeout = TimeoutLayer::new(Duration::from_secs(args.request_timeout_secs));
    let concurrency = ConcurrencyLimitLayer::new(args.max_concurrent);
    let transport = HttpTransport::new(router)
        .layer(timeout)
        .layer(concurrency);

    if args.oauth {
        let issuer = match args.oauth_issuer.as_ref() {
            Some(issuer) => issuer,
            None => {
                return Err(anyhow::anyhow!(
                    "--oauth-issuer required when OAuth is enabled"
                ));
            }
        };
        info!(issuer = %issuer, "OAuth authentication enabled");
    }

    transport.serve(&bind_addr).await?;
    Ok(())
}
/// Stand-in used when the binary is built without the `http` feature:
/// always fails, telling the user the feature is required.
#[cfg(not(feature = "http"))]
async fn run_http_server(_router: McpRouter, _args: &Args) -> Result<()> {
    Err(anyhow::anyhow!("HTTP transport requires the 'http' feature"))
}
#[cfg(test)]
mod tests {
use super::*;
use redisctl_core::{Profile, ProfileCredentials};
/// Fixture: a Cloud-type profile with placeholder credentials.
fn cloud_profile() -> Profile {
    let credentials = ProfileCredentials::Cloud {
        api_key: String::from("key"),
        api_secret: String::from("secret"),
        api_url: String::from("https://api.redislabs.com/v1"),
    };
    Profile {
        deployment_type: DeploymentType::Cloud,
        credentials,
        files_api_key: None,
        resilience: None,
        tags: Vec::new(),
    }
}
/// Fixture: an Enterprise-type profile pointing at a local cluster URL.
fn enterprise_profile() -> Profile {
    let credentials = ProfileCredentials::Enterprise {
        url: String::from("https://localhost:9443"),
        username: String::from("admin"),
        password: Some(String::from("password")),
        insecure: false,
        ca_cert: None,
    };
    Profile {
        deployment_type: DeploymentType::Enterprise,
        credentials,
        files_api_key: None,
        resilience: None,
        tags: Vec::new(),
    }
}
/// Fixture: a Database-type profile for a local, password-less, non-TLS server.
fn database_profile() -> Profile {
    let credentials = ProfileCredentials::Database {
        host: String::from("localhost"),
        port: 6379,
        password: None,
        tls: false,
        username: String::from("default"),
        database: 0,
    };
    Profile {
        deployment_type: DeploymentType::Database,
        credentials,
        files_api_key: None,
        resilience: None,
        tags: Vec::new(),
    }
}
/// A default config has no profiles, so detection yields `None`.
#[test]
fn empty_config_returns_none() {
    let detected = toolsets_from_config(&Config::default());
    assert!(detected.is_none());
}
/// With only a Cloud profile, detection yields `App` and (when compiled in)
/// `Cloud`, and neither `Enterprise` nor `Database`.
#[test]
fn cloud_only_profiles() {
    let mut cfg = Config::default();
    cfg.set_profile(String::from("mycloud"), cloud_profile());

    let detected = toolsets_from_config(&cfg).unwrap();

    assert!(detected.contains(&Toolset::App));
    #[cfg(feature = "cloud")]
    assert!(detected.contains(&Toolset::Cloud));
    #[cfg(feature = "enterprise")]
    assert!(!detected.contains(&Toolset::Enterprise));
    #[cfg(feature = "database")]
    assert!(!detected.contains(&Toolset::Database));
}
/// With only an Enterprise profile, detection yields `App` and (when compiled
/// in) `Enterprise`, and neither `Cloud` nor `Database`.
#[test]
fn enterprise_only_profiles() {
    let mut cfg = Config::default();
    cfg.set_profile(String::from("myent"), enterprise_profile());

    let detected = toolsets_from_config(&cfg).unwrap();

    assert!(detected.contains(&Toolset::App));
    #[cfg(feature = "cloud")]
    assert!(!detected.contains(&Toolset::Cloud));
    #[cfg(feature = "enterprise")]
    assert!(detected.contains(&Toolset::Enterprise));
    #[cfg(feature = "database")]
    assert!(!detected.contains(&Toolset::Database));
}
/// With Cloud and Enterprise profiles, both toolsets are detected alongside
/// `App`, while `Database` stays off.
#[test]
fn cloud_and_enterprise_profiles() {
    let mut cfg = Config::default();
    cfg.set_profile(String::from("mycloud"), cloud_profile());
    cfg.set_profile(String::from("myent"), enterprise_profile());

    let detected = toolsets_from_config(&cfg).unwrap();

    assert!(detected.contains(&Toolset::App));
    #[cfg(feature = "cloud")]
    assert!(detected.contains(&Toolset::Cloud));
    #[cfg(feature = "enterprise")]
    assert!(detected.contains(&Toolset::Enterprise));
    #[cfg(feature = "database")]
    assert!(!detected.contains(&Toolset::Database));
}
/// With only a Database profile, detection yields `App` and (when compiled
/// in) `Database`, and neither `Cloud` nor `Enterprise`.
#[test]
fn database_only_profiles() {
    let mut cfg = Config::default();
    cfg.set_profile(String::from("mydb"), database_profile());

    let detected = toolsets_from_config(&cfg).unwrap();

    assert!(detected.contains(&Toolset::App));
    #[cfg(feature = "cloud")]
    assert!(!detected.contains(&Toolset::Cloud));
    #[cfg(feature = "enterprise")]
    assert!(!detected.contains(&Toolset::Enterprise));
    #[cfg(feature = "database")]
    assert!(detected.contains(&Toolset::Database));
}
/// With one profile of every deployment type, every compiled-in toolset is detected.
#[test]
fn all_three_profile_types() {
    let mut cfg = Config::default();
    cfg.set_profile(String::from("mycloud"), cloud_profile());
    cfg.set_profile(String::from("myent"), enterprise_profile());
    cfg.set_profile(String::from("mydb"), database_profile());

    let detected = toolsets_from_config(&cfg).unwrap();

    assert!(detected.contains(&Toolset::App));
    #[cfg(feature = "cloud")]
    assert!(detected.contains(&Toolset::Cloud));
    #[cfg(feature = "enterprise")]
    assert!(detected.contains(&Toolset::Enterprise));
    #[cfg(feature = "database")]
    assert!(detected.contains(&Toolset::Database));
}
/// Asserts that `tool` is annotated as read-only, idempotent and non-destructive.
///
/// `name` is only used to label panic/assertion messages.
fn assert_read_only(tool: &Tool, name: &str) {
    let hints = match tool.annotations.as_ref() {
        Some(hints) => hints,
        None => panic!("{name}: missing annotations"),
    };
    assert!(hints.read_only_hint, "{name}: should be read_only");
    assert!(hints.idempotent_hint, "{name}: should be idempotent");
    assert!(
        !hints.destructive_hint,
        "{name}: should be non-destructive (destructive_hint=false)"
    );
}
/// Asserts that `tool` is annotated as a write (not read-only) that is non-destructive.
///
/// `name` is only used to label panic/assertion messages.
fn assert_non_destructive_write(tool: &Tool, name: &str) {
    let hints = match tool.annotations.as_ref() {
        Some(hints) => hints,
        None => panic!("{name}: missing annotations"),
    };
    assert!(!hints.read_only_hint, "{name}: should NOT be read_only");
    assert!(
        !hints.destructive_hint,
        "{name}: should be non-destructive (destructive_hint=false)"
    );
}
/// Asserts that `tool` is annotated as destructive (and not read-only) and
/// that its description starts with the `DANGEROUS:` marker.
///
/// `name` is only used to label panic/assertion messages.
fn assert_destructive(tool: &Tool, name: &str) {
    let hints = match tool.annotations.as_ref() {
        Some(hints) => hints,
        None => panic!("{name}: missing annotations"),
    };
    assert!(
        hints.destructive_hint,
        "{name}: should be destructive (destructive_hint=true)"
    );
    assert!(
        !hints.read_only_hint,
        "{name}: destructive tool should NOT be read_only"
    );

    let desc = match tool.description.as_deref() {
        Some(text) => text,
        None => panic!("{name}: missing description"),
    };
    assert!(
        desc.starts_with("DANGEROUS:"),
        "{name}: destructive tool description should start with 'DANGEROUS:', got: {desc}"
    );
}
/// Builds shared state for the tests: profile credentials with an empty
/// profile list, read-only enabled, no database URL.
fn test_state() -> Arc<AppState> {
    let source = state::CredentialSource::Profiles(Vec::new());
    let app_state = AppState::new(source, true, None).unwrap();
    Arc::new(app_state)
}
/// Every read-style profile tool must carry read-only annotations.
#[test]
fn profile_read_tools_are_read_only() {
    let state = test_state();
    let cases = [
        (tools::profile::list_profiles(state.clone()), "profile_list"),
        (tools::profile::show_profile(state.clone()), "profile_show"),
        (tools::profile::config_path(state.clone()), "profile_path"),
        (
            tools::profile::validate_config(state.clone()),
            "profile_validate",
        ),
    ];
    for (tool, name) in cases {
        assert_read_only(&tool, name);
    }
}
/// Profile tools that write but do not delete must be annotated as
/// non-destructive writes.
#[test]
fn profile_write_tools_are_non_destructive() {
    let state = test_state();
    let cases = [
        (
            tools::profile::create_profile(state.clone()),
            "profile_create",
        ),
        (
            tools::profile::set_default_cloud(state.clone()),
            "profile_set_default_cloud",
        ),
        (
            tools::profile::set_default_enterprise(state.clone()),
            "profile_set_default_enterprise",
        ),
    ];
    for (tool, name) in cases {
        assert_non_destructive_write(&tool, name);
    }
}
/// Deleting a profile must be annotated (and described) as destructive.
#[test]
fn profile_destructive_tools() {
    let state = test_state();
    let delete_tool = tools::profile::delete_profile(state.clone());
    assert_destructive(&delete_tool, "profile_delete");
}
/// Annotation checks for a sample of the Cloud tools, grouped by safety tier.
/// The string paired with each tool is only a label for failure messages.
#[cfg(feature = "cloud")]
mod cloud_annotations {
    use super::*;

    /// Read-style Cloud tools must be read-only, idempotent and non-destructive.
    #[test]
    fn cloud_read_tools_are_read_only() {
        let state = test_state();
        let cases = [
            (
                tools::cloud::list_subscriptions(state.clone()),
                "list_subscriptions",
            ),
            (tools::cloud::get_account(state.clone()), "get_account"),
            (
                tools::cloud::list_fixed_subscriptions(state.clone()),
                "list_fixed_subscriptions",
            ),
            (
                tools::cloud::get_vpc_peering(state.clone()),
                "get_vpc_peering",
            ),
        ];
        for (tool, name) in cases {
            assert_read_only(&tool, name);
        }
    }

    /// Create/update/backup Cloud tools must be non-destructive writes.
    #[test]
    fn cloud_write_tools_are_non_destructive() {
        let state = test_state();
        let cases = [
            (
                tools::cloud::create_database(state.clone()),
                "create_database",
            ),
            (
                tools::cloud::update_database(state.clone()),
                "update_database",
            ),
            (
                tools::cloud::backup_database(state.clone()),
                "backup_database",
            ),
            (
                tools::cloud::create_acl_user(state.clone()),
                "create_acl_user",
            ),
            (
                tools::cloud::create_vpc_peering(state.clone()),
                "create_vpc_peering",
            ),
            (
                tools::cloud::create_fixed_database(state.clone()),
                "create_fixed_database",
            ),
        ];
        for (tool, name) in cases {
            assert_non_destructive_write(&tool, name);
        }
    }

    /// Delete/flush Cloud tools must be annotated and described as destructive.
    #[test]
    fn cloud_destructive_tools() {
        let state = test_state();
        let cases = [
            (
                tools::cloud::delete_database(state.clone()),
                "delete_database",
            ),
            (
                tools::cloud::delete_subscription(state.clone()),
                "delete_subscription",
            ),
            (
                tools::cloud::flush_database(state.clone()),
                "flush_database",
            ),
            (
                tools::cloud::delete_acl_user(state.clone()),
                "delete_acl_user",
            ),
            (
                tools::cloud::delete_cloud_account(state.clone()),
                "delete_cloud_account",
            ),
            (
                tools::cloud::delete_vpc_peering(state.clone()),
                "delete_vpc_peering",
            ),
            (
                tools::cloud::delete_private_link(state.clone()),
                "delete_private_link",
            ),
            (
                tools::cloud::delete_fixed_database(state.clone()),
                "delete_fixed_database",
            ),
            (
                tools::cloud::delete_fixed_subscription(state.clone()),
                "delete_fixed_subscription",
            ),
        ];
        for (tool, name) in cases {
            assert_destructive(&tool, name);
        }
    }
}
/// Annotation checks for a sample of the Enterprise tools, grouped by safety tier.
/// The string paired with each tool is only a label for failure messages.
#[cfg(feature = "enterprise")]
mod enterprise_annotations {
    use super::*;

    /// Read-style Enterprise tools must be read-only, idempotent and non-destructive.
    #[test]
    fn enterprise_read_tools_are_read_only() {
        let state = test_state();
        let cases = [
            (
                tools::enterprise::get_cluster(state.clone()),
                "get_cluster",
            ),
            (
                tools::enterprise::list_databases(state.clone()),
                "list_enterprise_databases",
            ),
            (
                tools::enterprise::list_users(state.clone()),
                "list_enterprise_users",
            ),
            (
                tools::enterprise::list_alerts(state.clone()),
                "list_alerts",
            ),
        ];
        for (tool, name) in cases {
            assert_read_only(&tool, name);
        }
    }

    /// Create/update Enterprise tools must be non-destructive writes.
    #[test]
    fn enterprise_write_tools_are_non_destructive() {
        let state = test_state();
        let cases = [
            (
                tools::enterprise::update_cluster(state.clone()),
                "update_enterprise_cluster",
            ),
            (
                tools::enterprise::create_enterprise_database(state.clone()),
                "create_enterprise_database",
            ),
            (
                tools::enterprise::create_enterprise_user(state.clone()),
                "create_enterprise_user",
            ),
        ];
        for (tool, name) in cases {
            assert_non_destructive_write(&tool, name);
        }
    }

    /// Delete/flush Enterprise tools must be annotated and described as destructive.
    #[test]
    fn enterprise_destructive_tools() {
        let state = test_state();
        let cases = [
            (
                tools::enterprise::delete_enterprise_database(state.clone()),
                "delete_enterprise_database",
            ),
            (
                tools::enterprise::flush_enterprise_database(state.clone()),
                "flush_enterprise_database",
            ),
            (
                tools::enterprise::delete_enterprise_user(state.clone()),
                "delete_enterprise_user",
            ),
            (
                tools::enterprise::delete_enterprise_role(state.clone()),
                "delete_enterprise_role",
            ),
            (
                tools::enterprise::delete_enterprise_acl(state.clone()),
                "delete_enterprise_acl",
            ),
        ];
        for (tool, name) in cases {
            assert_destructive(&tool, name);
        }
    }
}
/// Annotation checks for a sample of the direct-Redis tools, grouped by safety tier.
/// The string paired with each tool is only a label for failure messages.
#[cfg(feature = "database")]
mod database_annotations {
    use super::*;

    /// Read-style Redis tools must be read-only, idempotent and non-destructive.
    #[test]
    fn redis_read_tools_are_read_only() {
        let state = test_state();
        let cases = [
            (tools::redis::ping(state.clone()), "redis_ping"),
            (tools::redis::info(state.clone()), "redis_info"),
            (tools::redis::keys(state.clone()), "redis_keys"),
            (tools::redis::get(state.clone()), "redis_get"),
            (tools::redis::hgetall(state.clone()), "redis_hgetall"),
            (
                tools::redis::health_check(state.clone()),
                "redis_health_check",
            ),
        ];
        for (tool, name) in cases {
            assert_read_only(&tool, name);
        }
    }

    /// Redis tools that write without deleting must be non-destructive writes.
    #[test]
    fn redis_write_tools_are_non_destructive() {
        let state = test_state();
        let cases = [
            (
                tools::redis::config_set(state.clone()),
                "redis_config_set",
            ),
            (tools::redis::set(state.clone()), "redis_set"),
            (tools::redis::expire(state.clone()), "redis_expire"),
            (tools::redis::hset(state.clone()), "redis_hset"),
            (tools::redis::lpush(state.clone()), "redis_lpush"),
            (tools::redis::xadd(state.clone()), "redis_xadd"),
        ];
        for (tool, name) in cases {
            assert_non_destructive_write(&tool, name);
        }
    }

    /// Flush/delete Redis tools must be annotated and described as destructive.
    #[test]
    fn redis_destructive_tools() {
        let state = test_state();
        let cases = [
            (tools::redis::flushdb(state.clone()), "redis_flushdb"),
            (tools::redis::del(state.clone()), "redis_del"),
        ];
        for (tool, name) in cases {
            assert_destructive(&tool, name);
        }
    }
}
/// Builds the router in both read-only and write-enabled mode with only the
/// `App` toolset and checks that construction succeeds.
///
/// NOTE(review): despite its name, this test makes no assertion about the
/// generated instruction text; it only unwraps the `build_router` result.
#[test]
fn instructions_contain_safety_model() {
    let state = test_state();
    let mut enabled: HashSet<Toolset> = HashSet::new();
    enabled.insert(Toolset::App);

    let _read_only_router = build_router(Arc::clone(&state), true, &enabled).unwrap();
    let _write_router = build_router(Arc::clone(&state), false, &enabled).unwrap();
}
}