use std::sync::Arc;
use schemars::JsonSchema;
use serde::Deserialize;
use tower_mcp::extract::{Json, State};
use tower_mcp::{CallToolResult, Error as McpError, McpRouter, ResultExt, Tool, ToolBuilder};
use crate::state::AppState;
/// Assembles the router that exposes every Redis key tool defined in this module.
///
/// Each tool receives its own clone of the shared `state` handle, and the tools are
/// registered in the order they are listed below.
pub fn router(state: Arc<AppState>) -> McpRouter {
    let tools = [
        // Read-only inspection tools.
        keys(Arc::clone(&state)),
        scan(Arc::clone(&state)),
        get(Arc::clone(&state)),
        key_type(Arc::clone(&state)),
        ttl(Arc::clone(&state)),
        exists(Arc::clone(&state)),
        memory_usage(Arc::clone(&state)),
        object_encoding(Arc::clone(&state)),
        object_freq(Arc::clone(&state)),
        object_idletime(Arc::clone(&state)),
        object_help(Arc::clone(&state)),
        // Tools that modify data (each checks `is_write_allowed` first).
        set(Arc::clone(&state)),
        del(Arc::clone(&state)),
        expire(Arc::clone(&state)),
        // The last tool takes ownership of the handle, so no extra clone is needed.
        rename(state),
    ];

    let mut router = McpRouter::new();
    for tool in tools {
        router = router.tool(tool);
    }
    router
}
/// Input parameters for the `redis_keys` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct KeysInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Pattern passed to `SCAN ... MATCH` (defaults to `*`).
#[serde(default = "default_pattern")]
pub pattern: String,
/// Maximum number of keys to return (defaults to 100).
#[serde(default = "default_limit")]
pub limit: usize,
}
/// Serde default for `pattern` fields: the match-everything pattern `*`.
fn default_pattern() -> String {
    String::from("*")
}
/// Serde default for `limit` fields: at most 100 keys are returned.
fn default_limit() -> usize {
    const DEFAULT_KEY_LIMIT: usize = 100;
    DEFAULT_KEY_LIMIT
}
/// Builds the `redis_keys` tool, which lists keys matching a pattern.
///
/// The handler issues `SCAN <cursor> MATCH <pattern> COUNT 100` repeatedly until the
/// cursor comes back as 0 or at least `limit` keys have been gathered, trims the
/// result to `limit` entries and renders it as plain text.
///
/// # Errors
///
/// The handler fails when `resolve_redis_url` fails, when the URL is rejected by the
/// Redis client, when the connection cannot be established, or when a `SCAN` fails.
pub fn keys(state: Arc<AppState>) -> Tool {
    let builder = ToolBuilder::new("redis_keys")
        .description(
            "List keys matching a pattern using SCAN (production-safe, non-blocking). \
             Returns up to 'limit' keys.",
        )
        .read_only_safe();

    builder
        .extractor_handler_typed::<_, _, _, KeysInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(params): Json<KeysInput>| async move {
                let redis_url =
                    super::resolve_redis_url(params.url, params.profile.as_deref(), &app)?;
                let client =
                    redis::Client::open(redis_url.as_str()).tool_context("Invalid URL")?;
                let mut connection = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                // Cursor 0 both starts a scan and signals that the iteration is complete.
                let mut cursor: u64 = 0;
                let mut matched: Vec<String> = Vec::new();
                loop {
                    let mut scan_cmd = redis::cmd("SCAN");
                    scan_cmd
                        .arg(cursor)
                        .arg("MATCH")
                        .arg(&params.pattern)
                        .arg("COUNT")
                        .arg(100);
                    let (next_cursor, batch): (u64, Vec<String>) = scan_cmd
                        .query_async(&mut connection)
                        .await
                        .tool_context("SCAN failed")?;
                    matched.extend(batch);
                    cursor = next_cursor;

                    let keyspace_exhausted = cursor == 0;
                    let limit_reached = matched.len() >= params.limit;
                    if keyspace_exhausted || limit_reached {
                        break;
                    }
                }
                // The final batch may overshoot the requested limit.
                matched.truncate(params.limit);

                let text = match matched.as_slice() {
                    [] => format!("No keys found matching pattern '{}'", params.pattern),
                    found => format!(
                        "Found {} key(s) matching '{}'\n\n{}",
                        found.len(),
                        params.pattern,
                        found.join("\n")
                    ),
                };
                Ok(CallToolResult::text(text))
            },
        )
        .build()
}
/// Input parameters for the `redis_get` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct GetInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Key to read with `GET`.
pub key: String,
}
/// Builds the `redis_get` tool, which reads a key with `GET`.
///
/// The handler answers with the stored value, or with a `(nil)` notice naming the key
/// when Redis returns no value.
///
/// # Errors
///
/// The handler fails when `resolve_redis_url` fails, when the URL is rejected by the
/// Redis client, when the connection cannot be established, or when `GET` fails.
pub fn get(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_get")
        .description("Get the value of a key from Redis")
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, GetInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(params): Json<GetInput>| async move {
                let redis_url =
                    super::resolve_redis_url(params.url, params.profile.as_deref(), &app)?;
                let client =
                    redis::Client::open(redis_url.as_str()).tool_context("Invalid URL")?;
                let mut connection = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                let mut get_cmd = redis::cmd("GET");
                get_cmd.arg(&params.key);
                let reply: Option<String> = get_cmd
                    .query_async(&mut connection)
                    .await
                    .tool_context("GET failed")?;

                let text = reply
                    .unwrap_or_else(|| format!("(nil) - key '{}' not found", params.key));
                Ok(CallToolResult::text(text))
            },
        )
        .build()
}
/// Input parameters for the `redis_type` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct TypeInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Key whose type is queried with `TYPE`.
pub key: String,
}
/// Builds the `redis_type` tool, which reports the type of a key via `TYPE`.
///
/// The handler answers with a single `<key>: <type>` line.
///
/// # Errors
///
/// The handler fails when `resolve_redis_url` fails, when the URL is rejected by the
/// Redis client, when the connection cannot be established, or when `TYPE` fails.
pub fn key_type(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_type")
        .description("Get the type of a key (string, list, set, zset, hash, stream)")
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, TypeInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(params): Json<TypeInput>| async move {
                let redis_url =
                    super::resolve_redis_url(params.url, params.profile.as_deref(), &app)?;
                let client =
                    redis::Client::open(redis_url.as_str()).tool_context("Invalid URL")?;
                let mut connection = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                let mut type_cmd = redis::cmd("TYPE");
                type_cmd.arg(&params.key);
                let type_name: String = type_cmd
                    .query_async(&mut connection)
                    .await
                    .tool_context("TYPE failed")?;

                let line = format!("{}: {}", params.key, type_name);
                Ok(CallToolResult::text(line))
            },
        )
        .build()
}
/// Input parameters for the `redis_ttl` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct TtlInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Key whose remaining time-to-live is queried with `TTL`.
pub key: String,
}
/// Builds the `redis_ttl` tool, which reports a key's remaining time-to-live.
///
/// The handler translates the `TTL` reply into text: `-2` is reported as a missing
/// key, `-1` as a key without expiry, and any other value as the seconds remaining.
///
/// # Errors
///
/// The handler fails when `resolve_redis_url` fails, when the URL is rejected by the
/// Redis client, when the connection cannot be established, or when `TTL` fails.
pub fn ttl(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_ttl")
        .description(
            "Get the time-to-live (TTL) of a key in seconds. \
             Returns -1 if no expiry, -2 if key doesn't exist.",
        )
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, TtlInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(params): Json<TtlInput>| async move {
                let redis_url =
                    super::resolve_redis_url(params.url, params.profile.as_deref(), &app)?;
                let client =
                    redis::Client::open(redis_url.as_str()).tool_context("Invalid URL")?;
                let mut connection = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                let mut ttl_cmd = redis::cmd("TTL");
                ttl_cmd.arg(&params.key);
                let remaining: i64 = ttl_cmd
                    .query_async(&mut connection)
                    .await
                    .tool_context("TTL failed")?;

                let text = if remaining == -2 {
                    format!("{}: key does not exist", params.key)
                } else if remaining == -1 {
                    format!("{}: no expiry set", params.key)
                } else {
                    format!("{}: {} seconds remaining", params.key, remaining)
                };
                Ok(CallToolResult::text(text))
            },
        )
        .build()
}
/// Input parameters for the `redis_exists` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ExistsInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Keys to check; all of them are sent in a single `EXISTS` command.
pub keys: Vec<String>,
}
pub fn exists(state: Arc<AppState>) -> Tool {
ToolBuilder::new("redis_exists")
.description("Check if one or more keys exist. Returns the count of keys that exist.")
.read_only_safe()
.extractor_handler_typed::<_, _, _, ExistsInput>(
state,
|State(state): State<Arc<AppState>>, Json(input): Json<ExistsInput>| async move {
let url = super::resolve_redis_url(input.url, input.profile.as_deref(), &state)?;
let client = redis::Client::open(url.as_str()).tool_context("Invalid URL")?;
let mut conn = client
.get_multiplexed_async_connection()
.await
.tool_context("Connection failed")?;
let mut cmd = redis::cmd("EXISTS");
for key in &input.keys {
cmd.arg(key);
}
let count: i64 = cmd
.query_async(&mut conn)
.await
.tool_context("EXISTS failed")?;
Ok(CallToolResult::text(format!(
"{} of {} key(s) exist",
count,
input.keys.len()
)))
},
)
.build()
}
/// Input parameters for the `redis_memory_usage` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct MemoryUsageInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Key measured with `MEMORY USAGE`.
pub key: String,
}
/// Builds the `redis_memory_usage` tool, which reports a key's size via `MEMORY USAGE`.
///
/// The handler answers with `<key>: <n> bytes`, or with a "key does not exist"
/// line when the server returns no value.
///
/// # Errors
///
/// The handler fails when `resolve_redis_url` fails, when the URL is rejected by the
/// Redis client, when the connection cannot be established, or when the command fails.
pub fn memory_usage(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_memory_usage")
        .description("Get the memory usage of a key in bytes")
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, MemoryUsageInput>(
            state,
            |State(app): State<Arc<AppState>>,
             Json(params): Json<MemoryUsageInput>| async move {
                let redis_url =
                    super::resolve_redis_url(params.url, params.profile.as_deref(), &app)?;
                let client =
                    redis::Client::open(redis_url.as_str()).tool_context("Invalid URL")?;
                let mut connection = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                let mut usage_cmd = redis::cmd("MEMORY");
                usage_cmd.arg("USAGE").arg(&params.key);
                let size: Option<i64> = usage_cmd
                    .query_async(&mut connection)
                    .await
                    .tool_context("MEMORY USAGE failed")?;

                let text = if let Some(byte_count) = size {
                    format!("{}: {} bytes", params.key, byte_count)
                } else {
                    format!("{}: key does not exist", params.key)
                };
                Ok(CallToolResult::text(text))
            },
        )
        .build()
}
/// Input parameters for the `redis_scan` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ScanInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Pattern passed to `SCAN ... MATCH` (defaults to `*`).
#[serde(default = "default_pattern")]
pub pattern: String,
/// Optional type filter; when set it is appended to the command as `TYPE <key_type>`.
#[serde(default)]
pub key_type: Option<String>,
/// Maximum number of keys to return (defaults to 100).
#[serde(default = "default_limit")]
pub limit: usize,
}
/// Builds the `redis_scan` tool, which lists keys matching a pattern and optional type.
///
/// The handler issues `SCAN <cursor> MATCH <pattern> COUNT 100`, adding
/// `TYPE <key_type>` when a type filter was supplied, until the cursor comes back
/// as 0 or at least `limit` keys have been gathered. The result is trimmed to
/// `limit` entries and rendered as plain text.
///
/// # Errors
///
/// The handler fails when `resolve_redis_url` fails, when the URL is rejected by the
/// Redis client, when the connection cannot be established, or when a `SCAN` fails.
pub fn scan(state: Arc<AppState>) -> Tool {
    let builder = ToolBuilder::new("redis_scan")
        .description(
            "Scan keys with optional type filter. More efficient than redis_keys when filtering \
             by type (string, list, set, zset, hash, stream).",
        )
        .read_only_safe();

    builder
        .extractor_handler_typed::<_, _, _, ScanInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(params): Json<ScanInput>| async move {
                let redis_url =
                    super::resolve_redis_url(params.url, params.profile.as_deref(), &app)?;
                let client =
                    redis::Client::open(redis_url.as_str()).tool_context("Invalid URL")?;
                let mut connection = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                // Cursor 0 both starts a scan and signals that the iteration is complete.
                let mut cursor: u64 = 0;
                let mut matched: Vec<String> = Vec::new();
                loop {
                    let mut scan_cmd = redis::cmd("SCAN");
                    scan_cmd
                        .arg(cursor)
                        .arg("MATCH")
                        .arg(&params.pattern)
                        .arg("COUNT")
                        .arg(100);
                    if let Some(kind) = params.key_type.as_ref() {
                        scan_cmd.arg("TYPE").arg(kind);
                    }

                    let (next_cursor, batch): (u64, Vec<String>) = scan_cmd
                        .query_async(&mut connection)
                        .await
                        .tool_context("SCAN failed")?;
                    matched.extend(batch);
                    cursor = next_cursor;

                    let keyspace_exhausted = cursor == 0;
                    let limit_reached = matched.len() >= params.limit;
                    if keyspace_exhausted || limit_reached {
                        break;
                    }
                }
                // The final batch may overshoot the requested limit.
                matched.truncate(params.limit);

                // Empty when no type filter was requested.
                let type_note = match params.key_type.as_ref() {
                    Some(kind) => format!(" of type '{}'", kind),
                    None => String::new(),
                };

                let text = match matched.as_slice() {
                    [] => format!(
                        "No keys{} found matching pattern '{}'",
                        type_note, params.pattern
                    ),
                    found => format!(
                        "Found {} key(s){} matching '{}'\n\n{}",
                        found.len(),
                        type_note,
                        params.pattern,
                        found.join("\n")
                    ),
                };
                Ok(CallToolResult::text(text))
            },
        )
        .build()
}
/// Input parameters for the `redis_object_encoding` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ObjectEncodingInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Key inspected with `OBJECT ENCODING`.
pub key: String,
}
/// Builds the `redis_object_encoding` tool, which reports a key's internal encoding.
///
/// The handler runs `OBJECT ENCODING <key>` and answers with `<key>: <encoding>`,
/// or with a "key does not exist" line when the server returns no value.
///
/// # Errors
///
/// The handler fails when `resolve_redis_url` fails, when the URL is rejected by the
/// Redis client, when the connection cannot be established, or when the command fails.
pub fn object_encoding(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_object_encoding")
        .description(
            "Get the internal encoding of a key (e.g., embstr, int, raw, quicklist, listpack, \
             hashtable, intset, skiplist). Useful for understanding memory usage patterns.",
        )
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, ObjectEncodingInput>(
            state,
            |State(app): State<Arc<AppState>>,
             Json(params): Json<ObjectEncodingInput>| async move {
                let redis_url =
                    super::resolve_redis_url(params.url, params.profile.as_deref(), &app)?;
                let client =
                    redis::Client::open(redis_url.as_str()).tool_context("Invalid URL")?;
                let mut connection = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                let mut encoding_cmd = redis::cmd("OBJECT");
                encoding_cmd.arg("ENCODING").arg(&params.key);
                let reply: Option<String> = encoding_cmd
                    .query_async(&mut connection)
                    .await
                    .tool_context("OBJECT ENCODING failed")?;

                let text = if let Some(encoding_name) = reply {
                    format!("{}: {}", params.key, encoding_name)
                } else {
                    format!("{}: key does not exist", params.key)
                };
                Ok(CallToolResult::text(text))
            },
        )
        .build()
}
/// Input parameters for the `redis_object_freq` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ObjectFreqInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Key inspected with `OBJECT FREQ`.
pub key: String,
}
/// Builds the `redis_object_freq` tool, which reports a key's LFU access counter.
///
/// The handler runs `OBJECT FREQ <key>` and answers with
/// `<key>: LFU frequency counter = <n>`, or with a "key does not exist" line when
/// the server returns a nil reply.
///
/// # Errors
///
/// The handler fails when `resolve_redis_url` fails, when the URL is rejected by the
/// Redis client, when the connection cannot be established, or when the command fails.
pub fn object_freq(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_object_freq")
        .description(
            "Get the LFU access frequency counter for a key using OBJECT FREQ. \
             Only works when maxmemory-policy is set to allkeys-lfu or volatile-lfu.",
        )
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, ObjectFreqInput>(
            state,
            |State(state): State<Arc<AppState>>, Json(input): Json<ObjectFreqInput>| async move {
                let url = super::resolve_redis_url(input.url, input.profile.as_deref(), &state)?;
                let client = redis::Client::open(url.as_str()).tool_context("Invalid URL")?;
                let mut conn = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                // A missing key yields a nil reply, which cannot be decoded as a bare
                // integer; decode into `Option` so it is reported instead of failing.
                let freq: Option<i64> = redis::cmd("OBJECT")
                    .arg("FREQ")
                    .arg(&input.key)
                    .query_async(&mut conn)
                    .await
                    .tool_context("OBJECT FREQ failed")?;

                let message = match freq {
                    Some(counter) => {
                        format!("{}: LFU frequency counter = {}", input.key, counter)
                    }
                    None => format!("{}: key does not exist", input.key),
                };
                Ok(CallToolResult::text(message))
            },
        )
        .build()
}
/// Input parameters for the `redis_object_idletime` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ObjectIdletimeInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Key inspected with `OBJECT IDLETIME`.
pub key: String,
}
/// Builds the `redis_object_idletime` tool, which reports how long a key has been idle.
///
/// The handler runs `OBJECT IDLETIME <key>` and answers with
/// `<key>: idle for <n> seconds`, or with a "key does not exist" line when the
/// server returns a nil reply.
///
/// # Errors
///
/// The handler fails when `resolve_redis_url` fails, when the URL is rejected by the
/// Redis client, when the connection cannot be established, or when the command fails.
pub fn object_idletime(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_object_idletime")
        .description(
            "Get the idle time of a key in seconds using OBJECT IDLETIME. \
             Shows how long since the key was last accessed.",
        )
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, ObjectIdletimeInput>(
            state,
            |State(state): State<Arc<AppState>>,
             Json(input): Json<ObjectIdletimeInput>| async move {
                let url = super::resolve_redis_url(input.url, input.profile.as_deref(), &state)?;
                let client = redis::Client::open(url.as_str()).tool_context("Invalid URL")?;
                let mut conn = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                // A missing key yields a nil reply, which cannot be decoded as a bare
                // integer; decode into `Option` so it is reported instead of failing.
                let idle: Option<i64> = redis::cmd("OBJECT")
                    .arg("IDLETIME")
                    .arg(&input.key)
                    .query_async(&mut conn)
                    .await
                    .tool_context("OBJECT IDLETIME failed")?;

                let message = match idle {
                    Some(seconds) => format!("{}: idle for {} seconds", input.key, seconds),
                    None => format!("{}: key does not exist", input.key),
                };
                Ok(CallToolResult::text(message))
            },
        )
        .build()
}
/// Input parameters for the `redis_object_help` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ObjectHelpInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
}
/// Builds the `redis_object_help` tool, which lists the server's `OBJECT` subcommands.
///
/// The handler runs `OBJECT HELP` and answers with the returned lines joined by
/// newlines under an `OBJECT subcommands:` heading.
///
/// # Errors
///
/// The handler fails when `resolve_redis_url` fails, when the URL is rejected by the
/// Redis client, when the connection cannot be established, or when the command fails.
pub fn object_help(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_object_help")
        .description("Get available OBJECT subcommands using OBJECT HELP")
        .read_only_safe()
        .extractor_handler_typed::<_, _, _, ObjectHelpInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(params): Json<ObjectHelpInput>| async move {
                let redis_url =
                    super::resolve_redis_url(params.url, params.profile.as_deref(), &app)?;
                let client =
                    redis::Client::open(redis_url.as_str()).tool_context("Invalid URL")?;
                let mut connection = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                let mut help_cmd = redis::cmd("OBJECT");
                help_cmd.arg("HELP");
                let help_lines: Vec<String> = help_cmd
                    .query_async(&mut connection)
                    .await
                    .tool_context("OBJECT HELP failed")?;

                let body = help_lines.join("\n");
                let text = format!("OBJECT subcommands:\n{}", body);
                Ok(CallToolResult::text(text))
            },
        )
        .build()
}
/// Input parameters for the `redis_set` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct SetInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Key to write with `SET`.
pub key: String,
/// String value to store under `key`.
pub value: String,
/// Optional expiry in seconds, sent as `EX <ex>`.
#[serde(default)]
pub ex: Option<u64>,
/// Optional expiry in milliseconds, sent as `PX <px>`.
#[serde(default)]
pub px: Option<u64>,
/// When true, `NX` is appended so the key is only set if it does not exist.
#[serde(default)]
pub nx: bool,
/// When true, `XX` is appended so the key is only set if it already exists.
#[serde(default)]
pub xx: bool,
}
/// Builds the `redis_set` tool, which writes a string value with `SET`.
///
/// The handler appends `EX`/`PX` and `NX`/`XX` to the command according to the
/// input, and answers with an `OK` line on success or with a "not set" line naming
/// the unmet condition when the server returns no value.
///
/// # Errors
///
/// The handler fails when writes are not allowed by the server state, when both
/// `ex` and `px` or both `nx` and `xx` are supplied, when `resolve_redis_url` fails,
/// when the URL is rejected by the Redis client, when the connection cannot be
/// established, or when `SET` fails.
pub fn set(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_set")
        .description(
            "Set a key to a string value with optional expiry and conditional flags. \
             Use EX for seconds, PX for milliseconds expiry. Use NX to only set if \
             the key does not exist, XX to only set if it exists.",
        )
        .non_destructive()
        .extractor_handler_typed::<_, _, _, SetInput>(
            state,
            |State(state): State<Arc<AppState>>, Json(input): Json<SetInput>| async move {
                if !state.is_write_allowed() {
                    return Err(McpError::tool(
                        "Write operations not allowed in read-only mode",
                    ));
                }

                // Redis rejects these combinations with a bare syntax error; catch them
                // before connecting so the caller gets an actionable message.
                if input.ex.is_some() && input.px.is_some() {
                    return Err(McpError::tool(
                        "Options 'ex' and 'px' are mutually exclusive",
                    ));
                }
                if input.nx && input.xx {
                    return Err(McpError::tool(
                        "Options 'nx' and 'xx' are mutually exclusive",
                    ));
                }

                let url = super::resolve_redis_url(input.url, input.profile.as_deref(), &state)?;
                let client = redis::Client::open(url.as_str()).tool_context("Invalid URL")?;
                let mut conn = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                let mut cmd = redis::cmd("SET");
                cmd.arg(&input.key).arg(&input.value);
                if let Some(ex) = input.ex {
                    cmd.arg("EX").arg(ex);
                }
                if let Some(px) = input.px {
                    cmd.arg("PX").arg(px);
                }
                if input.nx {
                    cmd.arg("NX");
                }
                if input.xx {
                    cmd.arg("XX");
                }

                // A nil reply is treated as "NX/XX condition not met, nothing written".
                let result: Option<String> = cmd
                    .query_async(&mut conn)
                    .await
                    .tool_context("SET failed")?;

                let message = match result {
                    Some(_) => format!("OK - set '{}' successfully", input.key),
                    None => {
                        let reason = if input.nx {
                            "NX - key already exists"
                        } else {
                            "XX - key does not exist"
                        };
                        format!("Key '{}' not set (condition not met: {})", input.key, reason)
                    }
                };
                Ok(CallToolResult::text(message))
            },
        )
        .build()
}
/// Input parameters for the `redis_del` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct DelInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Keys to delete; all of them are sent in a single `DEL` command.
pub keys: Vec<String>,
}
/// Builds the `redis_del` tool, which deletes one or more keys with `DEL`.
///
/// All keys are sent in one `DEL` command and the handler answers with
/// `Deleted <count> of <requested> key(s)`.
///
/// # Errors
///
/// The handler fails when writes are not allowed by the server state, when `keys`
/// is empty, when `resolve_redis_url` fails, when the URL is rejected by the Redis
/// client, when the connection cannot be established, or when `DEL` fails.
pub fn del(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_del")
        .description(
            "DANGEROUS: Permanently deletes one or more keys and their data. \
             This action cannot be undone. Returns the number of keys removed.",
        )
        .destructive()
        .extractor_handler_typed::<_, _, _, DelInput>(
            state,
            |State(state): State<Arc<AppState>>, Json(input): Json<DelInput>| async move {
                if !state.is_write_allowed() {
                    return Err(McpError::tool(
                        "Write operations not allowed in read-only mode",
                    ));
                }

                // `DEL` needs at least one key; reject an empty list up front instead of
                // opening a connection only to receive an arity error from the server.
                if input.keys.is_empty() {
                    return Err(McpError::tool("At least one key is required"));
                }

                let url = super::resolve_redis_url(input.url, input.profile.as_deref(), &state)?;
                let client = redis::Client::open(url.as_str()).tool_context("Invalid URL")?;
                let mut conn = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                let mut cmd = redis::cmd("DEL");
                for key in &input.keys {
                    cmd.arg(key);
                }
                let count: i64 = cmd
                    .query_async(&mut conn)
                    .await
                    .tool_context("DEL failed")?;

                Ok(CallToolResult::text(format!(
                    "Deleted {} of {} key(s)",
                    count,
                    input.keys.len()
                )))
            },
        )
        .build()
}
/// Input parameters for the `redis_expire` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ExpireInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Key that receives the timeout.
pub key: String,
/// Timeout in seconds, passed to `EXPIRE`.
pub seconds: i64,
}
/// Builds the `redis_expire` tool, which sets a timeout on a key with `EXPIRE`.
///
/// The handler answers with an `OK` line when the server reports the timeout as
/// set, and otherwise with a line saying the key does not exist or the timeout
/// could not be set.
///
/// # Errors
///
/// The handler fails when writes are not allowed by the server state, when
/// `resolve_redis_url` fails, when the URL is rejected by the Redis client, when the
/// connection cannot be established, or when `EXPIRE` fails.
pub fn expire(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_expire")
        .description(
            "Set a timeout on a key in seconds. The key will be automatically deleted \
             after the timeout expires. Returns whether the timeout was set.",
        )
        .non_destructive()
        .extractor_handler_typed::<_, _, _, ExpireInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(params): Json<ExpireInput>| async move {
                let writes_enabled = app.is_write_allowed();
                if !writes_enabled {
                    return Err(McpError::tool(
                        "Write operations not allowed in read-only mode",
                    ));
                }

                let redis_url =
                    super::resolve_redis_url(params.url, params.profile.as_deref(), &app)?;
                let client =
                    redis::Client::open(redis_url.as_str()).tool_context("Invalid URL")?;
                let mut connection = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                let mut expire_cmd = redis::cmd("EXPIRE");
                expire_cmd.arg(&params.key).arg(params.seconds);
                let timeout_applied: bool = expire_cmd
                    .query_async(&mut connection)
                    .await
                    .tool_context("EXPIRE failed")?;

                let text = if timeout_applied {
                    format!(
                        "OK - TTL set to {} seconds on '{}'",
                        params.seconds, params.key
                    )
                } else {
                    format!(
                        "Key '{}' does not exist or timeout could not be set",
                        params.key
                    )
                };
                Ok(CallToolResult::text(text))
            },
        )
        .build()
}
/// Input parameters for the `redis_rename` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct RenameInput {
/// Optional Redis connection URL; resolved together with `profile` and the server state.
#[serde(default)]
pub url: Option<String>,
/// Optional profile name used when resolving the connection URL.
#[serde(default)]
pub profile: Option<String>,
/// Current name of the key (first argument to `RENAME`).
pub key: String,
/// New name for the key (second argument to `RENAME`).
pub newkey: String,
}
/// Builds the `redis_rename` tool, which renames a key with `RENAME`.
///
/// The handler answers with an `OK` line naming the old and new key once the
/// command succeeds.
///
/// # Errors
///
/// The handler fails when writes are not allowed by the server state, when
/// `resolve_redis_url` fails, when the URL is rejected by the Redis client, when the
/// connection cannot be established, or when `RENAME` fails.
pub fn rename(state: Arc<AppState>) -> Tool {
    ToolBuilder::new("redis_rename")
        .description(
            "Rename a key. Returns an error if the source key does not exist. \
             If the destination key already exists, it is overwritten.",
        )
        .non_destructive()
        .extractor_handler_typed::<_, _, _, RenameInput>(
            state,
            |State(app): State<Arc<AppState>>, Json(params): Json<RenameInput>| async move {
                let writes_enabled = app.is_write_allowed();
                if !writes_enabled {
                    return Err(McpError::tool(
                        "Write operations not allowed in read-only mode",
                    ));
                }

                let redis_url =
                    super::resolve_redis_url(params.url, params.profile.as_deref(), &app)?;
                let client =
                    redis::Client::open(redis_url.as_str()).tool_context("Invalid URL")?;
                let mut connection = client
                    .get_multiplexed_async_connection()
                    .await
                    .tool_context("Connection failed")?;

                let mut rename_cmd = redis::cmd("RENAME");
                rename_cmd.arg(&params.key).arg(&params.newkey);
                // The reply carries no data; only success or failure matters here.
                let _: () = rename_cmd
                    .query_async(&mut connection)
                    .await
                    .tool_context("RENAME failed")?;

                let text = format!("OK - renamed '{}' to '{}'", params.key, params.newkey);
                Ok(CallToolResult::text(text))
            },
        )
        .build()
}