use std::sync::Arc;
use rmcp::{
ErrorData as RmcpError, RoleServer, ServerHandler, handler::server::router::tool::ToolRouter,
handler::server::wrapper::Parameters, model::*, schemars, service::RequestContext, tool,
tool_handler, tool_router,
};
use serde::Deserialize;
use tokio::sync::RwLock;
use tracing::{debug, info};
use crate::cloud_tools::CloudTools;
use crate::database_tools::{DatabaseTools, is_write_command, value_to_json};
use crate::enterprise_tools::EnterpriseTools;
#[derive(Debug, Clone)]
pub struct ServerConfig {
pub profile: Option<String>,
pub read_only: bool,
pub database_url: Option<String>,
}
#[derive(Clone)]
pub struct RedisCtlMcp {
config: Arc<ServerConfig>,
tool_router: ToolRouter<RedisCtlMcp>,
cloud_tools: Arc<RwLock<Option<CloudTools>>>,
enterprise_tools: Arc<RwLock<Option<EnterpriseTools>>>,
database_tools: Arc<RwLock<Option<DatabaseTools>>>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct SubscriptionIdParam {
pub subscription_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseIdParam {
pub subscription_id: i64,
pub database_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct TaskIdParam {
pub task_id: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct NodeIdParam {
pub node_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct EnterpriseDatabaseIdParam {
pub database_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateEnterpriseDatabaseParam {
pub name: String,
#[serde(default)]
pub memory_size_mb: Option<u64>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct UpdateEnterpriseDatabaseParam {
pub database_id: i64,
#[serde(default)]
pub memory_size: Option<u64>,
#[serde(default)]
pub replication: Option<bool>,
#[serde(default)]
pub data_persistence: Option<String>,
#[serde(default)]
pub eviction_policy: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ExportDatabaseParam {
pub database_id: i64,
pub export_location: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ImportDatabaseParam {
pub database_id: i64,
pub import_location: String,
#[serde(default)]
pub flush_before_import: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct RestoreDatabaseParam {
pub database_id: i64,
#[serde(default)]
pub backup_uid: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct UpdateClusterParam {
#[serde(default)]
pub name: Option<String>,
#[serde(default)]
pub email_alerts: Option<bool>,
#[serde(default)]
pub rack_aware: Option<bool>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct UpdateNodeParam {
pub node_id: i64,
#[serde(default)]
pub accept_servers: Option<bool>,
#[serde(default)]
pub external_addr: Option<Vec<String>>,
#[serde(default)]
pub rack_id: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ShardIdParam {
pub shard_uid: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct AlertIdParam {
pub alert_uid: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct UserIdParam {
pub user_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateUserParam {
pub email: String,
pub password: String,
pub role: String,
#[serde(default)]
pub name: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct RoleIdParam {
pub role_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateRoleParam {
pub name: String,
#[serde(default)]
pub management: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct AclIdParam {
pub acl_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateAclParam {
pub name: String,
pub acl: String,
#[serde(default)]
pub description: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ModuleIdParam {
pub module_uid: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CrdbGuidParam {
pub crdb_guid: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct UpdateCrdbParam {
pub crdb_guid: String,
#[serde(default)]
pub memory_size: Option<u64>,
#[serde(default)]
pub encryption: Option<bool>,
#[serde(default)]
pub data_persistence: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DebugInfoTaskIdParam {
pub task_id: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CloudProviderParam {
#[serde(default)]
pub provider: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateProSubscriptionParam {
pub request: serde_json::Value,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateEssentialsSubscriptionParam {
pub name: String,
pub plan_id: i64,
#[serde(default)]
pub payment_method_id: Option<i64>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct LdapMappingIdParam {
pub mapping_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CreateLdapMappingParam {
pub name: String,
pub dn: String,
pub role: String,
#[serde(default)]
pub email: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JobIdParam {
pub job_id: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ProxyIdParam {
pub proxy_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct EndpointIdParam {
pub endpoint_id: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DiagnosticReportIdParam {
pub report_id: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct EssentialsDatabaseIdParam {
pub subscription_id: i64,
pub database_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct VpcPeeringIdParam {
pub subscription_id: i64,
pub peering_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CloudAccountIdParam {
pub account_id: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct CrdbTaskIdParam {
pub task_id: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct TransitGatewayAttachmentIdParam {
pub subscription_id: i64,
pub attachment_id: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct BdbGroupIdParam {
pub uid: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct SuffixNameParam {
pub name: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseExecuteParam {
pub command: String,
#[serde(default)]
pub args: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabasePipelineCommand {
pub command: String,
#[serde(default)]
pub args: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabasePipelineParam {
pub commands: Vec<DatabasePipelineCommand>,
#[serde(default)]
pub atomic: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseInfoParam {
#[serde(default)]
pub section: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseScanParam {
#[serde(default = "default_scan_pattern")]
pub pattern: String,
#[serde(default = "default_scan_count")]
pub count: usize,
}
fn default_scan_pattern() -> String {
"*".to_string()
}
fn default_scan_count() -> usize {
100
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseKeyParam {
pub key: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseSlowlogParam {
#[serde(default = "default_slowlog_count")]
pub count: Option<usize>,
}
fn default_slowlog_count() -> Option<usize> {
Some(10)
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseConfigGetParam {
pub pattern: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseLrangeParam {
pub key: String,
#[serde(default)]
pub start: isize,
#[serde(default = "default_lrange_stop")]
pub stop: isize,
}
fn default_lrange_stop() -> isize {
-1
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseZrangeParam {
pub key: String,
#[serde(default)]
pub start: isize,
#[serde(default = "default_zrange_stop")]
pub stop: isize,
}
fn default_zrange_stop() -> isize {
-1
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseSetParam {
pub key: String,
pub value: String,
#[serde(default)]
pub ex: Option<u64>,
#[serde(default)]
pub px: Option<u64>,
#[serde(default)]
pub nx: bool,
#[serde(default)]
pub xx: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseDelParam {
pub keys: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseExpireParam {
pub key: String,
pub seconds: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseIncrbyParam {
pub key: String,
pub increment: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseHsetParam {
pub key: String,
pub field: String,
pub value: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseHsetMultipleParam {
pub key: String,
pub fields: Vec<FieldValuePair>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FieldValuePair {
pub field: String,
pub value: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseHdelParam {
pub key: String,
pub fields: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseHgetParam {
pub key: String,
pub field: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseListPushParam {
pub key: String,
pub values: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseLindexParam {
pub key: String,
pub index: isize,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseLsetParam {
pub key: String,
pub index: isize,
pub value: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseSetMembersParam {
pub key: String,
pub members: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseSismemberParam {
pub key: String,
pub member: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseZaddParam {
pub key: String,
pub members: Vec<ScoreMemberPair>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ScoreMemberPair {
pub score: f64,
pub member: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseZremParam {
pub key: String,
pub members: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseZscoreParam {
pub key: String,
pub member: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseZrangebyscoreParam {
pub key: String,
pub min: String,
pub max: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseZincrbyParam {
pub key: String,
pub member: String,
pub increment: f64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct DatabaseRenameParam {
pub key: String,
pub new_key: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtSearchParam {
pub index: String,
pub query: String,
#[serde(default)]
pub nocontent: bool,
#[serde(default)]
pub verbatim: bool,
#[serde(default)]
pub withscores: bool,
#[serde(default)]
pub return_fields: Option<Vec<String>>,
#[serde(default)]
pub sortby: Option<String>,
#[serde(default)]
pub sortby_desc: bool,
#[serde(default)]
pub limit_offset: Option<i64>,
#[serde(default)]
pub limit_num: Option<i64>,
#[serde(default)]
pub highlight_fields: Option<Vec<String>>,
#[serde(default)]
pub highlight_open: Option<String>,
#[serde(default)]
pub highlight_close: Option<String>,
#[serde(default)]
pub language: Option<String>,
#[serde(default)]
pub slop: Option<i64>,
#[serde(default)]
pub inorder: bool,
#[serde(default)]
pub timeout: Option<i64>,
#[serde(default)]
pub dialect: Option<i32>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtAggregateParam {
pub index: String,
pub query: String,
#[serde(default)]
pub verbatim: bool,
#[serde(default)]
pub load: Option<Vec<String>>,
#[serde(default)]
pub groupby: Vec<FtGroupByParam>,
#[serde(default)]
pub apply: Vec<FtApplyParam>,
#[serde(default)]
pub sortby: Option<Vec<Vec<String>>>,
#[serde(default)]
pub sortby_max: Option<i64>,
#[serde(default)]
pub filter: Option<String>,
#[serde(default)]
pub limit_offset: Option<i64>,
#[serde(default)]
pub limit_num: Option<i64>,
#[serde(default)]
pub timeout: Option<i64>,
#[serde(default)]
pub dialect: Option<i32>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtGroupByParam {
pub properties: Vec<String>,
#[serde(default)]
pub reducers: Vec<FtReducerParam>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtReducerParam {
pub function: String,
#[serde(default)]
pub args: Vec<String>,
#[serde(default)]
pub alias: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtApplyParam {
pub expression: String,
pub alias: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtIndexParam {
pub index: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtSchemaField {
pub name: String,
#[serde(default)]
pub alias: Option<String>,
pub field_type: String,
#[serde(default)]
pub sortable: bool,
#[serde(default)]
pub noindex: bool,
#[serde(default)]
pub nostem: bool,
#[serde(default)]
pub phonetic: Option<String>,
#[serde(default)]
pub weight: Option<f64>,
#[serde(default)]
pub separator: Option<String>,
#[serde(default)]
pub casesensitive: bool,
#[serde(default)]
pub withsuffixtrie: bool,
#[serde(default)]
pub vector_algorithm: Option<String>,
#[serde(default)]
pub vector_dim: Option<i64>,
#[serde(default)]
pub vector_distance_metric: Option<String>,
#[serde(default)]
pub vector_type: Option<String>,
}
fn default_on_hash() -> String {
"HASH".to_string()
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtCreateParam {
pub index: String,
#[serde(default = "default_on_hash")]
pub on: String,
#[serde(default)]
pub prefixes: Option<Vec<String>>,
#[serde(default)]
pub filter: Option<String>,
#[serde(default)]
pub language: Option<String>,
#[serde(default)]
pub language_field: Option<String>,
#[serde(default)]
pub score: Option<f64>,
#[serde(default)]
pub score_field: Option<String>,
#[serde(default)]
pub payload_field: Option<String>,
#[serde(default)]
pub maxtextfields: bool,
#[serde(default)]
pub temporary: Option<i64>,
#[serde(default)]
pub nooffsets: bool,
#[serde(default)]
pub nohl: bool,
#[serde(default)]
pub nofields: bool,
#[serde(default)]
pub nofreqs: bool,
#[serde(default)]
pub stopwords: Option<Vec<String>>,
#[serde(default)]
pub skip_initial_scan: bool,
pub schema: Vec<FtSchemaField>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtDropIndexParam {
pub index: String,
#[serde(default)]
pub dd: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtAlterParam {
pub index: String,
#[serde(default)]
pub skip_initial_scan: bool,
pub field: FtSchemaField,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtExplainParam {
pub index: String,
pub query: String,
#[serde(default)]
pub dialect: Option<i32>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtTagvalsParam {
pub index: String,
pub field: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtSpellcheckParam {
pub index: String,
pub query: String,
#[serde(default)]
pub distance: Option<i32>,
#[serde(default)]
pub dialect: Option<i32>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtAliasAddParam {
pub alias: String,
pub index: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtAliasDelParam {
pub alias: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtAliasUpdateParam {
pub alias: String,
pub index: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtSugAddParam {
pub key: String,
pub string: String,
pub score: f64,
#[serde(default)]
pub incr: bool,
#[serde(default)]
pub payload: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtSugGetParam {
pub key: String,
pub prefix: String,
#[serde(default)]
pub fuzzy: bool,
#[serde(default)]
pub max: Option<i64>,
#[serde(default)]
pub withscores: bool,
#[serde(default)]
pub withpayloads: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtSugDelParam {
pub key: String,
pub string: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtSugLenParam {
pub key: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtSynDumpParam {
pub index: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct FtSynUpdateParam {
pub index: String,
pub group_id: String,
#[serde(default)]
pub skip_initial_scan: bool,
pub terms: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonGetParam {
pub key: String,
#[serde(default)]
pub paths: Vec<String>,
#[serde(default)]
pub indent: Option<String>,
#[serde(default)]
pub newline: Option<String>,
#[serde(default)]
pub space: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonSetParam {
pub key: String,
#[serde(default = "default_json_path")]
pub path: String,
pub value: String,
#[serde(default)]
pub nx: bool,
#[serde(default)]
pub xx: bool,
}
fn default_json_path() -> String {
"$".to_string()
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonDelParam {
pub key: String,
#[serde(default)]
pub path: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonPathParam {
pub key: String,
#[serde(default)]
pub path: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonArrAppendParam {
pub key: String,
pub path: String,
pub values: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonNumIncrByParam {
pub key: String,
pub path: String,
pub value: f64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonMgetParam {
pub keys: Vec<String>,
pub path: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonObjKeysParam {
pub key: String,
#[serde(default = "default_json_path")]
pub path: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonObjLenParam {
pub key: String,
#[serde(default = "default_json_path")]
pub path: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonArrIndexParam {
pub key: String,
pub path: String,
pub value: String,
#[serde(default)]
pub start: Option<i64>,
#[serde(default)]
pub stop: Option<i64>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonArrPopParam {
pub key: String,
#[serde(default = "default_json_path")]
pub path: String,
#[serde(default)]
pub index: Option<i64>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonArrTrimParam {
pub key: String,
pub path: String,
pub start: i64,
pub stop: i64,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonArrInsertParam {
pub key: String,
pub path: String,
pub index: i64,
pub values: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonClearParam {
pub key: String,
#[serde(default = "default_json_path")]
pub path: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct JsonToggleParam {
pub key: String,
pub path: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct TsAddParam {
pub key: String,
pub timestamp: String,
pub value: f64,
#[serde(default)]
pub retention: Option<i64>,
#[serde(default)]
pub encoding: Option<String>,
#[serde(default)]
pub chunk_size: Option<i64>,
#[serde(default)]
pub on_duplicate: Option<String>,
#[serde(default)]
pub labels: Option<Vec<LabelPair>>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct LabelPair {
pub label: String,
pub value: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct TsRangeParam {
pub key: String,
pub from: String,
pub to: String,
#[serde(default)]
pub latest: bool,
#[serde(default)]
pub filter_by_ts: Option<Vec<i64>>,
#[serde(default)]
pub filter_by_value_min: Option<f64>,
#[serde(default)]
pub filter_by_value_max: Option<f64>,
#[serde(default)]
pub count: Option<i64>,
#[serde(default)]
pub align: Option<String>,
#[serde(default)]
pub aggregation: Option<String>,
#[serde(default)]
pub bucket_duration: Option<i64>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct TsCreateParam {
pub key: String,
#[serde(default)]
pub retention: Option<i64>,
#[serde(default)]
pub encoding: Option<String>,
#[serde(default)]
pub chunk_size: Option<i64>,
#[serde(default)]
pub duplicate_policy: Option<String>,
#[serde(default)]
pub labels: Option<Vec<LabelPair>>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct BfReserveParam {
pub key: String,
pub error_rate: f64,
pub capacity: u64,
#[serde(default)]
pub expansion: Option<u32>,
#[serde(default)]
pub nonscaling: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct BfAddParam {
pub key: String,
pub item: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct BfMaddParam {
pub key: String,
pub items: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct BfExistsParam {
pub key: String,
pub item: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct BfMexistsParam {
pub key: String,
pub items: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct StreamFieldPair {
pub field: String,
pub value: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XaddParam {
pub key: String,
#[serde(default = "default_xadd_id")]
pub id: String,
pub fields: Vec<StreamFieldPair>,
#[serde(default)]
pub maxlen: Option<i64>,
#[serde(default)]
pub approximate: bool,
}
fn default_xadd_id() -> String {
"*".to_string()
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XreadParam {
pub keys: Vec<String>,
pub ids: Vec<String>,
#[serde(default)]
pub count: Option<i64>,
#[serde(default)]
pub block: Option<i64>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XrangeParam {
pub key: String,
pub start: String,
pub end: String,
#[serde(default)]
pub count: Option<i64>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XrevrangeParam {
pub key: String,
pub end: String,
pub start: String,
#[serde(default)]
pub count: Option<i64>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XinfoStreamParam {
pub key: String,
#[serde(default)]
pub full: bool,
#[serde(default)]
pub count: Option<i64>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XinfoConsumersParam {
pub key: String,
pub group: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XgroupCreateParam {
pub key: String,
pub group: String,
#[serde(default = "default_xgroup_id")]
pub id: String,
#[serde(default)]
pub mkstream: bool,
}
fn default_xgroup_id() -> String {
"$".to_string()
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XgroupDestroyParam {
pub key: String,
pub group: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XgroupDelconsumerParam {
pub key: String,
pub group: String,
pub consumer: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XgroupSetidParam {
pub key: String,
pub group: String,
pub id: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XreadgroupParam {
pub group: String,
pub consumer: String,
pub keys: Vec<String>,
pub ids: Vec<String>,
#[serde(default)]
pub count: Option<i64>,
#[serde(default)]
pub block: Option<i64>,
#[serde(default)]
pub noack: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XackParam {
pub key: String,
pub group: String,
pub ids: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XdelParam {
pub key: String,
pub ids: Vec<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XtrimParam {
pub key: String,
pub maxlen: i64,
#[serde(default)]
pub approximate: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XpendingParam {
pub key: String,
pub group: String,
#[serde(default)]
pub start: Option<String>,
#[serde(default)]
pub end: Option<String>,
#[serde(default)]
pub count: Option<i64>,
#[serde(default)]
pub consumer: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XclaimParam {
pub key: String,
pub group: String,
pub consumer: String,
pub min_idle_time: i64,
pub ids: Vec<String>,
#[serde(default)]
pub idle: Option<i64>,
#[serde(default)]
pub time: Option<i64>,
#[serde(default)]
pub retrycount: Option<i64>,
#[serde(default)]
pub force: bool,
#[serde(default)]
pub justid: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct XautoclaimParam {
pub key: String,
pub group: String,
pub consumer: String,
pub min_idle_time: i64,
pub start: String,
#[serde(default)]
pub count: Option<i64>,
#[serde(default)]
pub justid: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct PublishParam {
pub channel: String,
pub message: String,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct PubsubChannelsParam {
#[serde(default)]
pub pattern: Option<String>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct PubsubNumsubParam {
pub channels: Vec<String>,
}
impl RedisCtlMcp {
pub fn new(
profile: Option<&str>,
read_only: bool,
database_url: Option<&str>,
) -> anyhow::Result<Self> {
let config = Arc::new(ServerConfig {
profile: profile.map(String::from),
read_only,
database_url: database_url.map(String::from),
});
info!(
profile = ?config.profile,
read_only = config.read_only,
database_url = config.database_url.as_ref().map(|_| "[configured]"),
"Initializing RedisCtlMcp server"
);
Ok(Self {
config,
tool_router: Self::tool_router(),
cloud_tools: Arc::new(RwLock::new(None)),
enterprise_tools: Arc::new(RwLock::new(None)),
database_tools: Arc::new(RwLock::new(None)),
})
}
pub fn config(&self) -> &ServerConfig {
&self.config
}
async fn get_cloud_tools(&self) -> Result<CloudTools, RmcpError> {
let mut guard = self.cloud_tools.write().await;
if guard.is_none() {
debug!("Initializing Cloud tools");
let tools = CloudTools::new(self.config.profile.as_deref())
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
*guard = Some(tools);
}
Ok(guard.clone().unwrap())
}
async fn get_enterprise_tools(&self) -> Result<EnterpriseTools, RmcpError> {
let mut guard = self.enterprise_tools.write().await;
if guard.is_none() {
debug!("Initializing Enterprise tools");
let tools = EnterpriseTools::new(self.config.profile.as_deref())
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
*guard = Some(tools);
}
Ok(guard.clone().unwrap())
}
async fn get_database_tools(&self) -> Result<DatabaseTools, RmcpError> {
let mut guard = self.database_tools.write().await;
if guard.is_none() {
debug!("Initializing Database tools");
let tools = if let Some(ref url) = self.config.database_url {
debug!("Using direct database URL connection");
DatabaseTools::new_from_url(url)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?
} else {
debug!("Using profile-based database connection");
DatabaseTools::new(self.config.profile.as_deref())
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?
};
*guard = Some(tools);
}
Ok(guard.clone().unwrap())
}
}
#[tool_router]
impl RedisCtlMcp {
#[tool(
description = "Get Redis Cloud account information including account ID, name, and settings"
)]
async fn cloud_account_get(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: cloud_account_get");
let tools = self.get_cloud_tools().await?;
tools.get_account().await
}
#[tool(description = "List all Redis Cloud subscriptions in the account")]
async fn cloud_subscriptions_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: cloud_subscriptions_list");
let tools = self.get_cloud_tools().await?;
tools.list_subscriptions().await
}
#[tool(description = "Get detailed information about a specific Redis Cloud subscription")]
async fn cloud_subscription_get(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_subscription_get"
);
let tools = self.get_cloud_tools().await?;
tools.get_subscription(params.subscription_id).await
}
#[tool(description = "List all databases in a specific Redis Cloud subscription")]
async fn cloud_databases_list(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_databases_list"
);
let tools = self.get_cloud_tools().await?;
tools.list_databases(params.subscription_id).await
}
#[tool(description = "Get detailed information about a specific Redis Cloud database")]
async fn cloud_database_get(
&self,
Parameters(params): Parameters<DatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
database_id = params.database_id,
"Tool called: cloud_database_get"
);
let tools = self.get_cloud_tools().await?;
tools
.get_database(params.subscription_id, params.database_id)
.await
}
#[tool(description = "List recent async tasks in the Redis Cloud account")]
async fn cloud_tasks_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: cloud_tasks_list");
let tools = self.get_cloud_tools().await?;
tools.list_tasks().await
}
#[tool(description = "Get the status of a specific Redis Cloud async task")]
async fn cloud_task_get(
&self,
Parameters(params): Parameters<TaskIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(task_id = %params.task_id, "Tool called: cloud_task_get");
let tools = self.get_cloud_tools().await?;
tools.get_task(¶ms.task_id).await
}
#[tool(description = "List all payment methods configured for your Redis Cloud account")]
async fn cloud_payment_methods_get(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: cloud_payment_methods_get");
let tools = self.get_cloud_tools().await?;
tools.get_payment_methods().await
}
#[tool(
description = "List all available database modules (capabilities) supported in your account"
)]
async fn cloud_database_modules_get(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: cloud_database_modules_get");
let tools = self.get_cloud_tools().await?;
tools.get_database_modules().await
}
#[tool(
description = "Get available regions across cloud providers (AWS, GCP, Azure) for Pro subscriptions"
)]
async fn cloud_regions_get(
&self,
Parameters(params): Parameters<CloudProviderParam>,
) -> Result<CallToolResult, RmcpError> {
info!(provider = ?params.provider, "Tool called: cloud_regions_get");
let tools = self.get_cloud_tools().await?;
tools.get_regions(params.provider.as_deref()).await
}
#[tool(
description = "Create a new Pro subscription with advanced configuration options. Requires JSON payload with cloudProviders and databases arrays. Use cloud_regions_get to find available regions."
)]
async fn cloud_pro_subscription_create(
&self,
Parameters(params): Parameters<CreateProSubscriptionParam>,
) -> Result<CallToolResult, RmcpError> {
info!("Tool called: cloud_pro_subscription_create");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools.create_subscription(params.request).await
}
#[tool(
description = "Delete a Pro subscription. All databases must be deleted first. This is a destructive operation."
)]
async fn cloud_pro_subscription_delete(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_pro_subscription_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools.delete_subscription(params.subscription_id).await
}
#[tool(description = "List all Essentials (fixed) subscriptions in the account")]
async fn cloud_essentials_subscriptions_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: cloud_essentials_subscriptions_list");
let tools = self.get_cloud_tools().await?;
tools.list_essentials_subscriptions().await
}
#[tool(description = "Get detailed information about a specific Essentials subscription")]
async fn cloud_essentials_subscription_get(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_essentials_subscription_get"
);
let tools = self.get_cloud_tools().await?;
tools
.get_essentials_subscription(params.subscription_id)
.await
}
#[tool(
description = "Create a new Essentials subscription. Use cloud_essentials_plans_list to find available plans."
)]
async fn cloud_essentials_subscription_create(
&self,
Parameters(params): Parameters<CreateEssentialsSubscriptionParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
name = %params.name,
plan_id = params.plan_id,
"Tool called: cloud_essentials_subscription_create"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools
.create_essentials_subscription(¶ms.name, params.plan_id, params.payment_method_id)
.await
}
#[tool(
description = "Delete an Essentials subscription. This is a destructive operation that cannot be undone."
)]
async fn cloud_essentials_subscription_delete(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_essentials_subscription_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools
.delete_essentials_subscription(params.subscription_id)
.await
}
#[tool(
description = "List available Essentials subscription plans with pricing. Optionally filter by cloud provider (AWS, GCP, Azure)."
)]
async fn cloud_essentials_plans_list(
&self,
Parameters(params): Parameters<CloudProviderParam>,
) -> Result<CallToolResult, RmcpError> {
info!(provider = ?params.provider, "Tool called: cloud_essentials_plans_list");
let tools = self.get_cloud_tools().await?;
tools
.list_essentials_plans(params.provider.as_deref())
.await
}
#[tool(
description = "Get Redis Enterprise cluster information including name, version, and node count"
)]
async fn enterprise_cluster_get(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_cluster_get");
let tools = self.get_enterprise_tools().await?;
tools.get_cluster().await
}
#[tool(description = "List all nodes in the Redis Enterprise cluster with their status")]
async fn enterprise_nodes_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_nodes_list");
let tools = self.get_enterprise_tools().await?;
tools.list_nodes().await
}
#[tool(description = "Get detailed information about a specific Redis Enterprise node")]
async fn enterprise_node_get(
&self,
Parameters(params): Parameters<NodeIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(node_id = params.node_id, "Tool called: enterprise_node_get");
let tools = self.get_enterprise_tools().await?;
tools.get_node(params.node_id).await
}
#[tool(description = "List all databases (BDBs) in the Redis Enterprise cluster")]
async fn enterprise_databases_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_databases_list");
let tools = self.get_enterprise_tools().await?;
tools.list_databases().await
}
#[tool(
description = "Get detailed information about a specific Redis Enterprise database (BDB)"
)]
async fn enterprise_database_get(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_get"
);
let tools = self.get_enterprise_tools().await?;
tools.get_database(params.database_id).await
}
#[tool(description = "Get performance statistics for a specific Redis Enterprise database")]
async fn enterprise_database_stats(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_stats"
);
let tools = self.get_enterprise_tools().await?;
tools.get_database_stats(params.database_id).await
}
#[tool(description = "List all shards across all databases in the Redis Enterprise cluster")]
async fn enterprise_shards_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_shards_list");
let tools = self.get_enterprise_tools().await?;
tools.list_shards().await
}
#[tool(description = "List active alerts in the Redis Enterprise cluster")]
async fn enterprise_alerts_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_alerts_list");
let tools = self.get_enterprise_tools().await?;
tools.list_alerts().await
}
#[tool(description = "Get recent event logs from the Redis Enterprise cluster")]
async fn enterprise_logs_get(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_logs_get");
let tools = self.get_enterprise_tools().await?;
tools.get_logs().await
}
#[tool(
description = "Get Redis Enterprise license information including expiration and capacity"
)]
async fn enterprise_license_get(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_license_get");
let tools = self.get_enterprise_tools().await?;
tools.get_license().await
}
#[tool(
description = "Create a new Redis Enterprise database. Requires name, optionally memory_size_mb (default 100)."
)]
async fn enterprise_database_create(
&self,
Parameters(params): Parameters<CreateEnterpriseDatabaseParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
name = %params.name,
memory_size_mb = ?params.memory_size_mb,
"Tool called: enterprise_database_create"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.create_database(¶ms.name, params.memory_size_mb)
.await
}
#[tool(
description = "Delete a Redis Enterprise database. This is a destructive operation that cannot be undone."
)]
async fn enterprise_database_delete(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_database(params.database_id).await
}
#[tool(
description = "Update a Redis Enterprise database configuration. Supports memory_size (bytes), replication, data_persistence, and eviction_policy."
)]
async fn enterprise_database_update(
&self,
Parameters(params): Parameters<UpdateEnterpriseDatabaseParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_update"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let mut updates = serde_json::Map::new();
if let Some(memory_size) = params.memory_size {
updates.insert("memory_size".to_string(), serde_json::json!(memory_size));
}
if let Some(replication) = params.replication {
updates.insert("replication".to_string(), serde_json::json!(replication));
}
if let Some(ref data_persistence) = params.data_persistence {
updates.insert(
"data_persistence".to_string(),
serde_json::json!(data_persistence),
);
}
if let Some(ref eviction_policy) = params.eviction_policy {
updates.insert(
"eviction_policy".to_string(),
serde_json::json!(eviction_policy),
);
}
if updates.is_empty() {
return Err(RmcpError::invalid_request(
"No updates provided. Specify at least one of: memory_size, replication, data_persistence, eviction_policy",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.update_database(params.database_id, serde_json::Value::Object(updates))
.await
}
#[tool(
description = "Flush all data from a Redis Enterprise database. This is a destructive operation that cannot be undone."
)]
async fn enterprise_database_flush(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_flush"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.flush_database(params.database_id).await
}
#[tool(description = "Get performance metrics for a Redis Enterprise database")]
async fn enterprise_database_metrics(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_metrics"
);
let tools = self.get_enterprise_tools().await?;
tools.get_database_metrics(params.database_id).await
}
#[tool(
description = "Export a Redis Enterprise database to a specified location (S3, FTP, etc.)"
)]
async fn enterprise_database_export(
&self,
Parameters(params): Parameters<ExportDatabaseParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
export_location = %params.export_location,
"Tool called: enterprise_database_export"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.export_database(params.database_id, ¶ms.export_location)
.await
}
#[tool(description = "Import data into a Redis Enterprise database from a specified location")]
async fn enterprise_database_import(
&self,
Parameters(params): Parameters<ImportDatabaseParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
import_location = %params.import_location,
flush_before_import = params.flush_before_import,
"Tool called: enterprise_database_import"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.import_database(
params.database_id,
¶ms.import_location,
params.flush_before_import,
)
.await
}
#[tool(description = "Trigger a backup of a Redis Enterprise database")]
async fn enterprise_database_backup(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_database_backup"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.backup_database(params.database_id).await
}
#[tool(description = "Restore a Redis Enterprise database from a backup")]
async fn enterprise_database_restore(
&self,
Parameters(params): Parameters<RestoreDatabaseParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
backup_uid = ?params.backup_uid,
"Tool called: enterprise_database_restore"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.restore_database(params.database_id, params.backup_uid.as_deref())
.await
}
#[tool(
description = "Get Redis Enterprise cluster statistics including memory, CPU, and throughput metrics"
)]
async fn enterprise_cluster_stats(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_cluster_stats");
let tools = self.get_enterprise_tools().await?;
tools.get_cluster_stats().await
}
#[tool(description = "Get Redis Enterprise cluster settings and configuration")]
async fn enterprise_cluster_settings(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_cluster_settings");
let tools = self.get_enterprise_tools().await?;
tools.get_cluster_settings().await
}
#[tool(
description = "Get Redis Enterprise cluster topology showing nodes, shards, and their relationships"
)]
async fn enterprise_cluster_topology(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_cluster_topology");
let tools = self.get_enterprise_tools().await?;
tools.get_cluster_topology().await
}
#[tool(
description = "Update Redis Enterprise cluster configuration. Supports name, email_alerts, and rack_aware settings."
)]
async fn enterprise_cluster_update(
&self,
Parameters(params): Parameters<UpdateClusterParam>,
) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_cluster_update");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let mut updates = serde_json::Map::new();
if let Some(ref name) = params.name {
updates.insert("name".to_string(), serde_json::json!(name));
}
if let Some(email_alerts) = params.email_alerts {
updates.insert("email_alerts".to_string(), serde_json::json!(email_alerts));
}
if let Some(rack_aware) = params.rack_aware {
updates.insert("rack_aware".to_string(), serde_json::json!(rack_aware));
}
if updates.is_empty() {
return Err(RmcpError::invalid_request(
"No updates provided. Specify at least one of: name, email_alerts, rack_aware",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.update_cluster(serde_json::Value::Object(updates))
.await
}
#[tool(
description = "Get statistics for a specific Redis Enterprise node including CPU, memory, and network metrics"
)]
async fn enterprise_node_stats(
&self,
Parameters(params): Parameters<NodeIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
node_id = params.node_id,
"Tool called: enterprise_node_stats"
);
let tools = self.get_enterprise_tools().await?;
tools.get_node_stats(params.node_id).await
}
#[tool(
description = "Update a Redis Enterprise node configuration. Supports accept_servers, external_addr, and rack_id."
)]
async fn enterprise_node_update(
&self,
Parameters(params): Parameters<UpdateNodeParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
node_id = params.node_id,
"Tool called: enterprise_node_update"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let mut updates = serde_json::Map::new();
if let Some(accept_servers) = params.accept_servers {
updates.insert(
"accept_servers".to_string(),
serde_json::json!(accept_servers),
);
}
if let Some(ref external_addr) = params.external_addr {
updates.insert(
"external_addr".to_string(),
serde_json::json!(external_addr),
);
}
if let Some(ref rack_id) = params.rack_id {
updates.insert("rack_id".to_string(), serde_json::json!(rack_id));
}
if updates.is_empty() {
return Err(RmcpError::invalid_request(
"No updates provided. Specify at least one of: accept_servers, external_addr, rack_id",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.update_node(params.node_id, serde_json::Value::Object(updates))
.await
}
#[tool(
description = "Remove a node from the Redis Enterprise cluster. This is a destructive operation."
)]
async fn enterprise_node_remove(
&self,
Parameters(params): Parameters<NodeIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
node_id = params.node_id,
"Tool called: enterprise_node_remove"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.remove_node(params.node_id).await
}
#[tool(description = "Get detailed information about a specific Redis Enterprise shard")]
async fn enterprise_shard_get(
&self,
Parameters(params): Parameters<ShardIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(shard_uid = %params.shard_uid, "Tool called: enterprise_shard_get");
let tools = self.get_enterprise_tools().await?;
tools.get_shard(¶ms.shard_uid).await
}
#[tool(description = "Get detailed information about a specific Redis Enterprise alert")]
async fn enterprise_alert_get(
&self,
Parameters(params): Parameters<AlertIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(alert_uid = %params.alert_uid, "Tool called: enterprise_alert_get");
let tools = self.get_enterprise_tools().await?;
tools.get_alert(¶ms.alert_uid).await
}
#[tool(description = "List all users in the Redis Enterprise cluster")]
async fn enterprise_users_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_users_list");
let tools = self.get_enterprise_tools().await?;
tools.list_users().await
}
#[tool(description = "Get detailed information about a specific Redis Enterprise user")]
async fn enterprise_user_get(
&self,
Parameters(params): Parameters<UserIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(user_id = params.user_id, "Tool called: enterprise_user_get");
let tools = self.get_enterprise_tools().await?;
tools.get_user(params.user_id).await
}
#[tool(description = "Create a new user in the Redis Enterprise cluster")]
async fn enterprise_user_create(
&self,
Parameters(params): Parameters<CreateUserParam>,
) -> Result<CallToolResult, RmcpError> {
info!(email = %params.email, "Tool called: enterprise_user_create");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.create_user(
¶ms.email,
¶ms.password,
¶ms.role,
params.name.as_deref(),
)
.await
}
#[tool(description = "Delete a user from the Redis Enterprise cluster")]
async fn enterprise_user_delete(
&self,
Parameters(params): Parameters<UserIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
user_id = params.user_id,
"Tool called: enterprise_user_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_user(params.user_id).await
}
#[tool(description = "List all roles in the Redis Enterprise cluster")]
async fn enterprise_roles_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_roles_list");
let tools = self.get_enterprise_tools().await?;
tools.list_roles().await
}
#[tool(description = "Get detailed information about a specific Redis Enterprise role")]
async fn enterprise_role_get(
&self,
Parameters(params): Parameters<RoleIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(role_id = params.role_id, "Tool called: enterprise_role_get");
let tools = self.get_enterprise_tools().await?;
tools.get_role(params.role_id).await
}
#[tool(description = "Create a new role in the Redis Enterprise cluster")]
async fn enterprise_role_create(
&self,
Parameters(params): Parameters<CreateRoleParam>,
) -> Result<CallToolResult, RmcpError> {
info!(name = %params.name, "Tool called: enterprise_role_create");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.create_role(¶ms.name, params.management.as_deref())
.await
}
#[tool(description = "Delete a role from the Redis Enterprise cluster")]
async fn enterprise_role_delete(
&self,
Parameters(params): Parameters<RoleIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
role_id = params.role_id,
"Tool called: enterprise_role_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_role(params.role_id).await
}
#[tool(description = "List all Redis ACLs in the Redis Enterprise cluster")]
async fn enterprise_acls_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_acls_list");
let tools = self.get_enterprise_tools().await?;
tools.list_acls().await
}
#[tool(description = "Get detailed information about a specific Redis ACL")]
async fn enterprise_acl_get(
&self,
Parameters(params): Parameters<AclIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(acl_id = params.acl_id, "Tool called: enterprise_acl_get");
let tools = self.get_enterprise_tools().await?;
tools.get_acl(params.acl_id).await
}
#[tool(description = "Create a new Redis ACL in the Redis Enterprise cluster")]
async fn enterprise_acl_create(
&self,
Parameters(params): Parameters<CreateAclParam>,
) -> Result<CallToolResult, RmcpError> {
info!(name = %params.name, "Tool called: enterprise_acl_create");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.create_acl(¶ms.name, ¶ms.acl, params.description.as_deref())
.await
}
#[tool(description = "Delete a Redis ACL from the Redis Enterprise cluster")]
async fn enterprise_acl_delete(
&self,
Parameters(params): Parameters<AclIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(acl_id = params.acl_id, "Tool called: enterprise_acl_delete");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_acl(params.acl_id).await
}
#[tool(description = "List all Redis modules available in the Redis Enterprise cluster")]
async fn enterprise_modules_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_modules_list");
let tools = self.get_enterprise_tools().await?;
tools.list_modules().await
}
#[tool(description = "Get detailed information about a specific Redis module")]
async fn enterprise_module_get(
&self,
Parameters(params): Parameters<ModuleIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(module_uid = %params.module_uid, "Tool called: enterprise_module_get");
let tools = self.get_enterprise_tools().await?;
tools.get_module(¶ms.module_uid).await
}
#[tool(description = "List all Active-Active (CRDB) databases in the Redis Enterprise cluster")]
async fn enterprise_crdbs_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_crdbs_list");
let tools = self.get_enterprise_tools().await?;
tools.list_crdbs().await
}
#[tool(description = "Get detailed information about a specific Active-Active (CRDB) database")]
async fn enterprise_crdb_get(
&self,
Parameters(params): Parameters<CrdbGuidParam>,
) -> Result<CallToolResult, RmcpError> {
info!(crdb_guid = %params.crdb_guid, "Tool called: enterprise_crdb_get");
let tools = self.get_enterprise_tools().await?;
tools.get_crdb(¶ms.crdb_guid).await
}
#[tool(description = "Update an Active-Active (CRDB) database configuration")]
async fn enterprise_crdb_update(
&self,
Parameters(params): Parameters<UpdateCrdbParam>,
) -> Result<CallToolResult, RmcpError> {
info!(crdb_guid = %params.crdb_guid, "Tool called: enterprise_crdb_update");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let mut updates = serde_json::Map::new();
if let Some(memory_size) = params.memory_size {
updates.insert("memory_size".to_string(), serde_json::json!(memory_size));
}
if let Some(encryption) = params.encryption {
updates.insert("encryption".to_string(), serde_json::json!(encryption));
}
if let Some(ref data_persistence) = params.data_persistence {
updates.insert(
"data_persistence".to_string(),
serde_json::json!(data_persistence),
);
}
if updates.is_empty() {
return Err(RmcpError::invalid_request(
"No updates provided. Specify at least one of: memory_size, encryption, data_persistence",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.update_crdb(¶ms.crdb_guid, serde_json::Value::Object(updates))
.await
}
#[tool(
description = "Delete an Active-Active (CRDB) database. This is a destructive operation."
)]
async fn enterprise_crdb_delete(
&self,
Parameters(params): Parameters<CrdbGuidParam>,
) -> Result<CallToolResult, RmcpError> {
info!(crdb_guid = %params.crdb_guid, "Tool called: enterprise_crdb_delete");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_crdb(¶ms.crdb_guid).await
}
#[tool(description = "List debug info collection tasks in the Redis Enterprise cluster")]
async fn enterprise_debuginfo_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_debuginfo_list");
let tools = self.get_enterprise_tools().await?;
tools.list_debuginfo().await
}
#[tool(description = "Get the status of a specific debug info collection task")]
async fn enterprise_debuginfo_status(
&self,
Parameters(params): Parameters<DebugInfoTaskIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(task_id = %params.task_id, "Tool called: enterprise_debuginfo_status");
let tools = self.get_enterprise_tools().await?;
tools.get_debuginfo_status(¶ms.task_id).await
}
#[tool(description = "List all LDAP mappings in the Redis Enterprise cluster")]
async fn enterprise_ldap_mappings_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_ldap_mappings_list");
let tools = self.get_enterprise_tools().await?;
tools.list_ldap_mappings().await
}
#[tool(description = "Get detailed information about a specific LDAP mapping")]
async fn enterprise_ldap_mapping_get(
&self,
Parameters(params): Parameters<LdapMappingIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
mapping_id = params.mapping_id,
"Tool called: enterprise_ldap_mapping_get"
);
let tools = self.get_enterprise_tools().await?;
tools.get_ldap_mapping(params.mapping_id as u64).await
}
#[tool(description = "Create a new LDAP mapping to map LDAP groups to Redis Enterprise roles")]
async fn enterprise_ldap_mapping_create(
&self,
Parameters(params): Parameters<CreateLdapMappingParam>,
) -> Result<CallToolResult, RmcpError> {
info!(name = %params.name, "Tool called: enterprise_ldap_mapping_create");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools
.create_ldap_mapping(
¶ms.name,
¶ms.dn,
¶ms.role,
params.email.as_deref(),
)
.await
}
#[tool(description = "Delete an LDAP mapping from the Redis Enterprise cluster")]
async fn enterprise_ldap_mapping_delete(
&self,
Parameters(params): Parameters<LdapMappingIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
mapping_id = params.mapping_id,
"Tool called: enterprise_ldap_mapping_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_ldap_mapping(params.mapping_id as u64).await
}
#[tool(description = "List all scheduled jobs in the Redis Enterprise cluster")]
async fn enterprise_jobs_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_jobs_list");
let tools = self.get_enterprise_tools().await?;
tools.list_jobs().await
}
#[tool(description = "Get detailed information about a specific scheduled job")]
async fn enterprise_job_get(
&self,
Parameters(params): Parameters<JobIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(job_id = %params.job_id, "Tool called: enterprise_job_get");
let tools = self.get_enterprise_tools().await?;
tools.get_job(¶ms.job_id).await
}
#[tool(description = "Get execution history for a specific scheduled job")]
async fn enterprise_job_history(
&self,
Parameters(params): Parameters<JobIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(job_id = %params.job_id, "Tool called: enterprise_job_history");
let tools = self.get_enterprise_tools().await?;
tools.get_job_history(¶ms.job_id).await
}
#[tool(description = "Trigger immediate execution of a scheduled job")]
async fn enterprise_job_trigger(
&self,
Parameters(params): Parameters<JobIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(job_id = %params.job_id, "Tool called: enterprise_job_trigger");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.trigger_job(¶ms.job_id).await
}
#[tool(description = "List all proxies in the Redis Enterprise cluster")]
async fn enterprise_proxies_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_proxies_list");
let tools = self.get_enterprise_tools().await?;
tools.list_proxies().await
}
#[tool(description = "Get detailed information about a specific proxy")]
async fn enterprise_proxy_get(
&self,
Parameters(params): Parameters<ProxyIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
proxy_id = params.proxy_id,
"Tool called: enterprise_proxy_get"
);
let tools = self.get_enterprise_tools().await?;
tools.get_proxy(params.proxy_id as u64).await
}
#[tool(description = "Get statistics for a specific proxy")]
async fn enterprise_proxy_stats(
&self,
Parameters(params): Parameters<ProxyIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
proxy_id = params.proxy_id,
"Tool called: enterprise_proxy_stats"
);
let tools = self.get_enterprise_tools().await?;
tools.get_proxy_stats(params.proxy_id as u64).await
}
#[tool(description = "List proxies for a specific database")]
async fn enterprise_proxies_by_database(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_proxies_by_database"
);
let tools = self.get_enterprise_tools().await?;
tools
.list_proxies_by_database(params.database_id as u64)
.await
}
#[tool(description = "List all database endpoints in the Redis Enterprise cluster")]
async fn enterprise_endpoints_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_endpoints_list");
let tools = self.get_enterprise_tools().await?;
tools.list_endpoints().await
}
#[tool(description = "Get detailed information about a specific endpoint")]
async fn enterprise_endpoint_get(
&self,
Parameters(params): Parameters<EndpointIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(endpoint_id = %params.endpoint_id, "Tool called: enterprise_endpoint_get");
let tools = self.get_enterprise_tools().await?;
tools.get_endpoint(¶ms.endpoint_id).await
}
#[tool(description = "Get statistics for a specific endpoint")]
async fn enterprise_endpoint_stats(
&self,
Parameters(params): Parameters<EndpointIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(endpoint_id = %params.endpoint_id, "Tool called: enterprise_endpoint_stats");
let tools = self.get_enterprise_tools().await?;
tools.get_endpoint_stats(¶ms.endpoint_id).await
}
#[tool(description = "List endpoints for a specific database")]
async fn enterprise_endpoints_by_database(
&self,
Parameters(params): Parameters<EnterpriseDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
database_id = params.database_id,
"Tool called: enterprise_endpoints_by_database"
);
let tools = self.get_enterprise_tools().await?;
tools
.list_endpoints_by_database(params.database_id as u64)
.await
}
#[tool(description = "List available diagnostic checks in the Redis Enterprise cluster")]
async fn enterprise_diagnostic_checks_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_diagnostic_checks_list");
let tools = self.get_enterprise_tools().await?;
tools.list_diagnostic_checks().await
}
#[tool(description = "List diagnostic reports in the Redis Enterprise cluster")]
async fn enterprise_diagnostic_reports_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_diagnostic_reports_list");
let tools = self.get_enterprise_tools().await?;
tools.list_diagnostic_reports().await
}
#[tool(description = "Get a specific diagnostic report")]
async fn enterprise_diagnostic_report_get(
&self,
Parameters(params): Parameters<DiagnosticReportIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(report_id = %params.report_id, "Tool called: enterprise_diagnostic_report_get");
let tools = self.get_enterprise_tools().await?;
tools.get_diagnostic_report(¶ms.report_id).await
}
#[tool(description = "Get the most recent diagnostic report")]
async fn enterprise_diagnostic_report_last(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_diagnostic_report_last");
let tools = self.get_enterprise_tools().await?;
tools.get_last_diagnostic_report().await
}
#[tool(description = "Run diagnostics on the Redis Enterprise cluster")]
async fn enterprise_diagnostics_run(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_diagnostics_run");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.run_diagnostics().await
}
#[tool(description = "List all databases in an Essentials subscription")]
async fn cloud_essentials_databases_list(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_essentials_databases_list"
);
let tools = self.get_cloud_tools().await?;
tools
.list_essentials_databases(params.subscription_id)
.await
}
#[tool(description = "Get detailed information about a specific Essentials database")]
async fn cloud_essentials_database_get(
&self,
Parameters(params): Parameters<EssentialsDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
database_id = params.database_id,
"Tool called: cloud_essentials_database_get"
);
let tools = self.get_cloud_tools().await?;
tools
.get_essentials_database(params.subscription_id, params.database_id)
.await
}
#[tool(description = "Delete an Essentials database. This is a destructive operation.")]
async fn cloud_essentials_database_delete(
&self,
Parameters(params): Parameters<EssentialsDatabaseIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
database_id = params.database_id,
"Tool called: cloud_essentials_database_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools
.delete_essentials_database(params.subscription_id, params.database_id)
.await
}
#[tool(description = "Get VPC peerings for a subscription")]
async fn cloud_vpc_peerings_get(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_vpc_peerings_get"
);
let tools = self.get_cloud_tools().await?;
tools.get_vpc_peerings(params.subscription_id).await
}
#[tool(description = "Delete a VPC peering. This is a destructive operation.")]
async fn cloud_vpc_peering_delete(
&self,
Parameters(params): Parameters<VpcPeeringIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
peering_id = params.peering_id,
"Tool called: cloud_vpc_peering_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools
.delete_vpc_peering(params.subscription_id, params.peering_id)
.await
}
#[tool(
description = "List all cloud provider accounts (AWS, GCP, Azure) configured in your Redis Cloud account"
)]
async fn cloud_accounts_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: cloud_accounts_list");
let tools = self.get_cloud_tools().await?;
tools.list_cloud_accounts().await
}
#[tool(description = "Get detailed information about a specific cloud provider account")]
async fn cloud_account_get_by_id(
&self,
Parameters(params): Parameters<CloudAccountIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
account_id = params.account_id,
"Tool called: cloud_account_get_by_id"
);
let tools = self.get_cloud_tools().await?;
tools.get_cloud_account(params.account_id).await
}
#[tool(description = "Delete a cloud provider account. This is a destructive operation.")]
async fn cloud_account_delete(
&self,
Parameters(params): Parameters<CloudAccountIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
account_id = params.account_id,
"Tool called: cloud_account_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools.delete_cloud_account(params.account_id).await
}
#[tool(description = "List all Active-Active (CRDB) tasks in the Redis Enterprise cluster")]
async fn enterprise_crdb_tasks_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_crdb_tasks_list");
let tools = self.get_enterprise_tools().await?;
tools.list_crdb_tasks().await
}
#[tool(description = "Get detailed information about a specific CRDB task")]
async fn enterprise_crdb_task_get(
&self,
Parameters(params): Parameters<CrdbTaskIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(task_id = %params.task_id, "Tool called: enterprise_crdb_task_get");
let tools = self.get_enterprise_tools().await?;
tools.get_crdb_task(¶ms.task_id).await
}
#[tool(description = "List CRDB tasks for a specific Active-Active database")]
async fn enterprise_crdb_tasks_by_crdb(
&self,
Parameters(params): Parameters<CrdbGuidParam>,
) -> Result<CallToolResult, RmcpError> {
info!(crdb_guid = %params.crdb_guid, "Tool called: enterprise_crdb_tasks_by_crdb");
let tools = self.get_enterprise_tools().await?;
tools.list_crdb_tasks_by_crdb(¶ms.crdb_guid).await
}
#[tool(description = "Cancel a CRDB task")]
async fn enterprise_crdb_task_cancel(
&self,
Parameters(params): Parameters<CrdbTaskIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(task_id = %params.task_id, "Tool called: enterprise_crdb_task_cancel");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.cancel_crdb_task(¶ms.task_id).await
}
#[tool(description = "Get AWS PrivateLink configuration for a subscription")]
async fn cloud_private_link_get(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_private_link_get"
);
let tools = self.get_cloud_tools().await?;
tools.get_private_link(params.subscription_id).await
}
#[tool(description = "Delete AWS PrivateLink configuration for a subscription")]
async fn cloud_private_link_delete(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_private_link_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools.delete_private_link(params.subscription_id).await
}
#[tool(description = "Get AWS Transit Gateway attachments for a subscription")]
async fn cloud_transit_gateway_attachments_get(
&self,
Parameters(params): Parameters<SubscriptionIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
"Tool called: cloud_transit_gateway_attachments_get"
);
let tools = self.get_cloud_tools().await?;
tools
.get_transit_gateway_attachments(params.subscription_id)
.await
}
#[tool(description = "Delete an AWS Transit Gateway attachment")]
async fn cloud_transit_gateway_attachment_delete(
&self,
Parameters(params): Parameters<TransitGatewayAttachmentIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
subscription_id = params.subscription_id,
attachment_id = %params.attachment_id,
"Tool called: cloud_transit_gateway_attachment_delete"
);
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_cloud_tools().await?;
tools
.delete_transit_gateway_attachment(params.subscription_id, ¶ms.attachment_id)
.await
}
#[tool(description = "List all database groups in the Redis Enterprise cluster")]
async fn enterprise_bdb_groups_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_bdb_groups_list");
let tools = self.get_enterprise_tools().await?;
tools.list_bdb_groups().await
}
#[tool(description = "Get detailed information about a specific database group")]
async fn enterprise_bdb_group_get(
&self,
Parameters(params): Parameters<BdbGroupIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(uid = params.uid, "Tool called: enterprise_bdb_group_get");
let tools = self.get_enterprise_tools().await?;
tools.get_bdb_group(params.uid as u64).await
}
#[tool(description = "Delete a database group")]
async fn enterprise_bdb_group_delete(
&self,
Parameters(params): Parameters<BdbGroupIdParam>,
) -> Result<CallToolResult, RmcpError> {
info!(uid = params.uid, "Tool called: enterprise_bdb_group_delete");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_bdb_group(params.uid as u64).await
}
#[tool(description = "Get OCSP (Online Certificate Status Protocol) configuration")]
async fn enterprise_ocsp_config_get(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_ocsp_config_get");
let tools = self.get_enterprise_tools().await?;
tools.get_ocsp_config().await
}
#[tool(description = "Get OCSP status showing certificate validation state")]
async fn enterprise_ocsp_status_get(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_ocsp_status_get");
let tools = self.get_enterprise_tools().await?;
tools.get_ocsp_status().await
}
#[tool(description = "Test OCSP connectivity and certificate validation")]
async fn enterprise_ocsp_test(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_ocsp_test");
let tools = self.get_enterprise_tools().await?;
tools.test_ocsp().await
}
#[tool(description = "List all DNS suffixes in the Redis Enterprise cluster")]
async fn enterprise_suffixes_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_suffixes_list");
let tools = self.get_enterprise_tools().await?;
tools.list_suffixes().await
}
#[tool(description = "Get detailed information about a specific DNS suffix")]
async fn enterprise_suffix_get(
&self,
Parameters(params): Parameters<SuffixNameParam>,
) -> Result<CallToolResult, RmcpError> {
info!(name = %params.name, "Tool called: enterprise_suffix_get");
let tools = self.get_enterprise_tools().await?;
tools.get_suffix(¶ms.name).await
}
#[tool(description = "Get cluster-level DNS suffixes")]
async fn enterprise_cluster_suffixes_get(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: enterprise_cluster_suffixes_get");
let tools = self.get_enterprise_tools().await?;
tools.get_cluster_suffixes().await
}
#[tool(description = "Delete a DNS suffix")]
async fn enterprise_suffix_delete(
&self,
Parameters(params): Parameters<SuffixNameParam>,
) -> Result<CallToolResult, RmcpError> {
info!(name = %params.name, "Tool called: enterprise_suffix_delete");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_enterprise_tools().await?;
tools.delete_suffix(¶ms.name).await
}
#[tool(
description = "Execute a Redis command directly. Use for commands not covered by specific tools. Write commands are blocked in read-only mode."
)]
async fn database_execute(
&self,
Parameters(params): Parameters<DatabaseExecuteParam>,
) -> Result<CallToolResult, RmcpError> {
info!(command = %params.command, args = ?params.args, "Tool called: database_execute");
if self.config.read_only && is_write_command(¶ms.command) {
return Err(RmcpError::invalid_request(
format!(
"Command '{}' is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
params.command
),
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.execute(¶ms.command, ¶ms.args)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
let json = value_to_json(&result);
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&json).unwrap_or_else(|_| json.to_string()),
)]))
}
#[tool(
description = "Execute multiple Redis commands in a single pipeline for improved performance. Reduces network round-trips by batching commands. Use atomic=true for MULTI/EXEC transactional execution."
)]
async fn database_pipeline(
&self,
Parameters(params): Parameters<DatabasePipelineParam>,
) -> Result<CallToolResult, RmcpError> {
info!(
command_count = params.commands.len(),
atomic = params.atomic,
"Tool called: database_pipeline"
);
if self.config.read_only {
for cmd in ¶ms.commands {
if is_write_command(&cmd.command) {
return Err(RmcpError::invalid_request(
format!(
"Command '{}' is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
cmd.command
),
None,
));
}
}
}
let pipeline_commands: Vec<crate::database_tools::PipelineCommand> = params
.commands
.iter()
.map(|c| crate::database_tools::PipelineCommand {
command: c.command.clone(),
args: c.args.clone(),
})
.collect();
let tools = self.get_database_tools().await?;
let start = std::time::Instant::now();
let results = tools
.execute_pipeline(&pipeline_commands, params.atomic)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
let elapsed = start.elapsed();
let response: Vec<serde_json::Value> = params
.commands
.iter()
.zip(results.iter())
.map(|(cmd, result)| {
serde_json::json!({
"command": cmd.command,
"args": cmd.args,
"result": value_to_json(result)
})
})
.collect();
let output = serde_json::json!({
"commands": response,
"count": params.commands.len(),
"atomic": params.atomic,
"execution_time_ms": elapsed.as_secs_f64() * 1000.0
});
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&output).unwrap_or_else(|_| output.to_string()),
)]))
}
#[tool(
description = "Get Redis server information (INFO command). Returns server stats, memory usage, replication info, etc."
)]
async fn database_info(
&self,
Parameters(params): Parameters<DatabaseInfoParam>,
) -> Result<CallToolResult, RmcpError> {
info!(section = ?params.section, "Tool called: database_info");
let tools = self.get_database_tools().await?;
let result = tools
.info(params.section.as_deref())
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(result)]))
}
#[tool(description = "Get the number of keys in the current database (DBSIZE command)")]
async fn database_dbsize(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: database_dbsize");
let tools = self.get_database_tools().await?;
let result = tools
.dbsize()
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({ "dbsize": result }).to_string(),
)]))
}
#[tool(
description = "Scan keys matching a pattern (SCAN command). Safe alternative to KEYS that doesn't block the server."
)]
async fn database_scan(
&self,
Parameters(params): Parameters<DatabaseScanParam>,
) -> Result<CallToolResult, RmcpError> {
info!(pattern = %params.pattern, count = params.count, "Tool called: database_scan");
let tools = self.get_database_tools().await?;
let keys = tools
.scan(¶ms.pattern, params.count)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"pattern": params.pattern,
"count": keys.len(),
"keys": keys
})
.to_string(),
)]))
}
#[tool(
description = "Get the type of a key (TYPE command). Returns string, list, set, zset, hash, stream, or none."
)]
async fn database_type(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_type");
let tools = self.get_database_tools().await?;
let key_type = tools
.key_type(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"type": key_type
})
.to_string(),
)]))
}
#[tool(
description = "Get the TTL of a key in seconds (TTL command). Returns -1 if no expiration, -2 if key doesn't exist."
)]
async fn database_ttl(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_ttl");
let tools = self.get_database_tools().await?;
let ttl = tools
.ttl(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"ttl_seconds": ttl
})
.to_string(),
)]))
}
#[tool(description = "Get memory usage of a key in bytes (MEMORY USAGE command)")]
async fn database_memory_usage(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_memory_usage");
let tools = self.get_database_tools().await?;
let usage = tools
.memory_usage(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"memory_bytes": usage
})
.to_string(),
)]))
}
#[tool(
description = "Get slow log entries (SLOWLOG GET command). Shows queries that exceeded the slowlog threshold."
)]
async fn database_slowlog(
&self,
Parameters(params): Parameters<DatabaseSlowlogParam>,
) -> Result<CallToolResult, RmcpError> {
info!(count = ?params.count, "Tool called: database_slowlog");
let tools = self.get_database_tools().await?;
let result = tools
.slowlog_get(params.count)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
let json = value_to_json(&result);
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&json).unwrap_or_else(|_| json.to_string()),
)]))
}
#[tool(description = "Get the number of entries in the slow log (SLOWLOG LEN command)")]
async fn database_slowlog_len(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: database_slowlog_len");
let tools = self.get_database_tools().await?;
let len = tools
.slowlog_len()
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({ "slowlog_len": len }).to_string(),
)]))
}
#[tool(description = "Get list of connected clients (CLIENT LIST command)")]
async fn database_client_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: database_client_list");
let tools = self.get_database_tools().await?;
let result = tools
.client_list()
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(result)]))
}
#[tool(description = "Get Redis configuration values (CONFIG GET command)")]
async fn database_config_get(
&self,
Parameters(params): Parameters<DatabaseConfigGetParam>,
) -> Result<CallToolResult, RmcpError> {
info!(pattern = %params.pattern, "Tool called: database_config_get");
let tools = self.get_database_tools().await?;
let result = tools
.config_get(¶ms.pattern)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
let config: serde_json::Map<String, serde_json::Value> = result
.into_iter()
.map(|(k, v)| (k, serde_json::Value::String(v)))
.collect();
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&config).unwrap_or_else(|_| "{}".to_string()),
)]))
}
#[tool(description = "List loaded Redis modules (MODULE LIST command)")]
async fn database_module_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: database_module_list");
let tools = self.get_database_tools().await?;
let result = tools
.module_list()
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
let json = value_to_json(&result);
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&json).unwrap_or_else(|_| json.to_string()),
)]))
}
#[tool(description = "Ping the Redis server to check connectivity")]
async fn database_ping(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: database_ping");
let tools = self.get_database_tools().await?;
let result = tools
.ping()
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({ "response": result }).to_string(),
)]))
}
#[tool(description = "Get the value of a string key (GET command)")]
async fn database_get(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_get");
let tools = self.get_database_tools().await?;
let result = tools
.get(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"value": result
})
.to_string(),
)]))
}
#[tool(description = "Check if a key exists (EXISTS command)")]
async fn database_exists(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_exists");
let tools = self.get_database_tools().await?;
let exists = tools
.exists(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"exists": exists
})
.to_string(),
)]))
}
#[tool(description = "Get all fields and values of a hash (HGETALL command)")]
async fn database_hgetall(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_hgetall");
let tools = self.get_database_tools().await?;
let result = tools
.hgetall(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
let hash: serde_json::Map<String, serde_json::Value> = result
.into_iter()
.map(|(k, v)| (k, serde_json::Value::String(v)))
.collect();
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"fields": hash
})
.to_string(),
)]))
}
#[tool(description = "Get the number of fields in a hash (HLEN command)")]
async fn database_hlen(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_hlen");
let tools = self.get_database_tools().await?;
let len = tools
.hlen(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"length": len
})
.to_string(),
)]))
}
#[tool(description = "Get a range of elements from a list (LRANGE command)")]
async fn database_lrange(
&self,
Parameters(params): Parameters<DatabaseLrangeParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, start = params.start, stop = params.stop, "Tool called: database_lrange");
let tools = self.get_database_tools().await?;
let result = tools
.lrange(¶ms.key, params.start, params.stop)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"start": params.start,
"stop": params.stop,
"values": result
})
.to_string(),
)]))
}
#[tool(description = "Get the length of a list (LLEN command)")]
async fn database_llen(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_llen");
let tools = self.get_database_tools().await?;
let len = tools
.llen(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"length": len
})
.to_string(),
)]))
}
#[tool(description = "Get all members of a set (SMEMBERS command)")]
async fn database_smembers(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_smembers");
let tools = self.get_database_tools().await?;
let result = tools
.smembers(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"members": result
})
.to_string(),
)]))
}
#[tool(description = "Get the cardinality (size) of a set (SCARD command)")]
async fn database_scard(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_scard");
let tools = self.get_database_tools().await?;
let card = tools
.scard(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"cardinality": card
})
.to_string(),
)]))
}
#[tool(description = "Get a range of elements from a sorted set (ZRANGE command)")]
async fn database_zrange(
&self,
Parameters(params): Parameters<DatabaseZrangeParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, start = params.start, stop = params.stop, "Tool called: database_zrange");
let tools = self.get_database_tools().await?;
let result = tools
.zrange(¶ms.key, params.start, params.stop)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"start": params.start,
"stop": params.stop,
"members": result
})
.to_string(),
)]))
}
#[tool(description = "Get the cardinality (size) of a sorted set (ZCARD command)")]
async fn database_zcard(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_zcard");
let tools = self.get_database_tools().await?;
let card = tools
.zcard(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"cardinality": card
})
.to_string(),
)]))
}
#[tool(
description = "Set a string value (SET command). Creates or overwrites the key. Supports optional expiration (ex for seconds, px for milliseconds) and conditional set (nx: only if not exists, xx: only if exists). Use this to store strings, numbers, or serialized data. Returns true if set succeeded, false if NX/XX condition failed."
)]
async fn database_set(
&self,
Parameters(params): Parameters<DatabaseSetParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_set");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"SET is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let success = tools
.set(
¶ms.key,
¶ms.value,
params.ex,
params.px,
params.nx,
params.xx,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"success": success,
"message": if success { "Value set successfully" } else { "Set failed (NX/XX condition not met)" }
})
.to_string(),
)]))
}
#[tool(
description = "Delete one or more keys (DEL command). Removes keys and their associated values from the database. Returns the number of keys that were actually deleted (keys that didn't exist are not counted). Use this to remove data or clean up expired/unused keys."
)]
async fn database_del(
&self,
Parameters(params): Parameters<DatabaseDelParam>,
) -> Result<CallToolResult, RmcpError> {
info!(keys = ?params.keys, "Tool called: database_del");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"DEL is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let deleted = tools
.del(¶ms.keys)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"keys": params.keys,
"deleted": deleted
})
.to_string(),
)]))
}
#[tool(
description = "Set a key's expiration time in seconds (EXPIRE command). After the timeout, the key will be automatically deleted. Returns true if the timeout was set, false if the key doesn't exist. Use this to implement cache expiration, session timeouts, or temporary data."
)]
async fn database_expire(
&self,
Parameters(params): Parameters<DatabaseExpireParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, seconds = params.seconds, "Tool called: database_expire");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"EXPIRE is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let success = tools
.expire(¶ms.key, params.seconds)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"seconds": params.seconds,
"success": success
})
.to_string(),
)]))
}
#[tool(
description = "Remove a key's expiration (PERSIST command). Makes the key persistent (no expiration). Returns true if the timeout was removed, false if the key doesn't exist or has no timeout."
)]
async fn database_persist(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_persist");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"PERSIST is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let success = tools
.persist(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"success": success
})
.to_string(),
)]))
}
#[tool(
description = "Increment a key's integer value by 1 (INCR command). If the key doesn't exist, it's created with value 0 before incrementing. Returns the new value. Use this for counters, rate limiters, or sequence generators. The value must be a valid integer string."
)]
async fn database_incr(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_incr");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"INCR is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let new_value = tools
.incr(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"value": new_value
})
.to_string(),
)]))
}
#[tool(
description = "Decrement a key's integer value by 1 (DECR command). If the key doesn't exist, it's created with value 0 before decrementing. Returns the new value. Use this for countdown counters or decrementing stock levels."
)]
async fn database_decr(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_decr");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"DECR is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let new_value = tools
.decr(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"value": new_value
})
.to_string(),
)]))
}
#[tool(
description = "Increment a key's integer value by a specific amount (INCRBY command). If the key doesn't exist, it's created with value 0 before incrementing. Use negative increment to decrement. Returns the new value. Use this for counters with custom increments like adding points or adjusting balances."
)]
async fn database_incrby(
&self,
Parameters(params): Parameters<DatabaseIncrbyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, increment = params.increment, "Tool called: database_incrby");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"INCRBY is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let new_value = tools
.incrby(¶ms.key, params.increment)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"increment": params.increment,
"value": new_value
})
.to_string(),
)]))
}
#[tool(
description = "Set a field in a hash (HSET command). Creates the hash if it doesn't exist. Returns 1 if the field is new, 0 if the field was updated. Use hashes to store objects like user profiles, product details, or configuration settings where you need to access individual fields."
)]
async fn database_hset(
&self,
Parameters(params): Parameters<DatabaseHsetParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, field = %params.field, "Tool called: database_hset");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"HSET is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let added = tools
.hset(¶ms.key, ¶ms.field, ¶ms.value)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"field": params.field,
"added": added,
"message": if added == 1 { "New field created" } else { "Existing field updated" }
})
.to_string(),
)]))
}
#[tool(
description = "Set multiple fields in a hash at once (HSET with multiple field-value pairs). More efficient than multiple HSET calls. Creates the hash if it doesn't exist. Returns the number of new fields added. Use this to create or update entire objects in one operation."
)]
async fn database_hset_multiple(
&self,
Parameters(params): Parameters<DatabaseHsetMultipleParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, field_count = params.fields.len(), "Tool called: database_hset_multiple");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"HSET is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let fields: Vec<(String, String)> = params
.fields
.into_iter()
.map(|f| (f.field, f.value))
.collect();
let added = tools
.hset_multiple(¶ms.key, &fields)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"fields_processed": fields.len(),
"fields_added": added
})
.to_string(),
)]))
}
#[tool(
description = "Delete one or more fields from a hash (HDEL command). Returns the number of fields that were removed (non-existing fields are not counted). Use this to remove specific properties from an object without deleting the entire hash."
)]
async fn database_hdel(
&self,
Parameters(params): Parameters<DatabaseHdelParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, fields = ?params.fields, "Tool called: database_hdel");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"HDEL is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let deleted = tools
.hdel(¶ms.key, ¶ms.fields)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"fields": params.fields,
"deleted": deleted
})
.to_string(),
)]))
}
#[tool(
description = "Get a specific field from a hash (HGET command). Returns null if the field or hash doesn't exist. Use this when you only need one field from a hash instead of fetching all fields with HGETALL."
)]
async fn database_hget(
&self,
Parameters(params): Parameters<DatabaseHgetParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, field = %params.field, "Tool called: database_hget");
let tools = self.get_database_tools().await?;
let value = tools
.hget(¶ms.key, ¶ms.field)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"field": params.field,
"value": value
})
.to_string(),
)]))
}
#[tool(
description = "Push values to the left (head) of a list (LPUSH command). Creates the list if it doesn't exist. Values are inserted at the head, so the last value in the input array will be the first element in the list. Returns the new length of the list. Use this for implementing stacks (LIFO) or adding items to the front of a queue."
)]
async fn database_lpush(
&self,
Parameters(params): Parameters<DatabaseListPushParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, value_count = params.values.len(), "Tool called: database_lpush");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"LPUSH is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let new_length = tools
.lpush(¶ms.key, ¶ms.values)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"values_pushed": params.values.len(),
"new_length": new_length
})
.to_string(),
)]))
}
#[tool(
description = "Push values to the right (tail) of a list (RPUSH command). Creates the list if it doesn't exist. Values are appended to the end in order. Returns the new length of the list. Use this for implementing queues (FIFO), event logs, or message lists where order matters."
)]
async fn database_rpush(
&self,
Parameters(params): Parameters<DatabaseListPushParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, value_count = params.values.len(), "Tool called: database_rpush");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"RPUSH is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let new_length = tools
.rpush(¶ms.key, ¶ms.values)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"values_pushed": params.values.len(),
"new_length": new_length
})
.to_string(),
)]))
}
#[tool(
description = "Pop and return a value from the left (head) of a list (LPOP command). Removes and returns the first element. Returns null if the list is empty or doesn't exist. Use this with RPUSH for queue (FIFO) behavior or with LPUSH for stack (LIFO) behavior."
)]
async fn database_lpop(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_lpop");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"LPOP is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let value = tools
.lpop(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"value": value
})
.to_string(),
)]))
}
#[tool(
description = "Pop and return a value from the right (tail) of a list (RPOP command). Removes and returns the last element. Returns null if the list is empty or doesn't exist. Use this with LPUSH for queue (FIFO) behavior."
)]
async fn database_rpop(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_rpop");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"RPOP is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let value = tools
.rpop(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"value": value
})
.to_string(),
)]))
}
#[tool(
description = "Get an element from a list by index (LINDEX command). Index is 0-based; negative indices count from the end (-1 is the last element). Returns null if the index is out of range. Use this to peek at specific positions without removing elements."
)]
async fn database_lindex(
&self,
Parameters(params): Parameters<DatabaseLindexParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, index = params.index, "Tool called: database_lindex");
let tools = self.get_database_tools().await?;
let value = tools
.lindex(¶ms.key, params.index)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"index": params.index,
"value": value
})
.to_string(),
)]))
}
#[tool(
description = "Set an element in a list at a specific index (LSET command). The index must be within the list bounds or an error is returned. Use this to update specific elements in a list without rebuilding the entire list."
)]
async fn database_lset(
&self,
Parameters(params): Parameters<DatabaseLsetParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, index = params.index, "Tool called: database_lset");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"LSET is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
tools
.lset(¶ms.key, params.index, ¶ms.value)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"index": params.index,
"success": true
})
.to_string(),
)]))
}
#[tool(
description = "Add members to a set (SADD command). Creates the set if it doesn't exist. Sets store unique values - duplicates are automatically ignored. Returns the number of members that were actually added (not already present). Use sets for tags, categories, unique visitor tracking, or any collection where uniqueness matters."
)]
async fn database_sadd(
&self,
Parameters(params): Parameters<DatabaseSetMembersParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, member_count = params.members.len(), "Tool called: database_sadd");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"SADD is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let added = tools
.sadd(¶ms.key, ¶ms.members)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"members_provided": params.members.len(),
"members_added": added
})
.to_string(),
)]))
}
#[tool(
description = "Remove members from a set (SREM command). Returns the number of members that were actually removed (members that didn't exist are not counted). Use this to untag items, remove categories, or delete specific values from a set."
)]
async fn database_srem(
&self,
Parameters(params): Parameters<DatabaseSetMembersParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, member_count = params.members.len(), "Tool called: database_srem");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"SREM is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let removed = tools
.srem(¶ms.key, ¶ms.members)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"members_provided": params.members.len(),
"members_removed": removed
})
.to_string(),
)]))
}
#[tool(
description = "Check if a member exists in a set (SISMEMBER command). Returns true if the member is in the set, false otherwise. Use this to check membership before adding, or to verify tags/permissions."
)]
async fn database_sismember(
&self,
Parameters(params): Parameters<DatabaseSismemberParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, member = %params.member, "Tool called: database_sismember");
let tools = self.get_database_tools().await?;
let is_member = tools
.sismember(¶ms.key, ¶ms.member)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"member": params.member,
"is_member": is_member
})
.to_string(),
)]))
}
#[tool(
description = "Add members with scores to a sorted set (ZADD command). Creates the sorted set if it doesn't exist. Members are automatically ordered by score. If a member already exists, its score is updated. Returns the number of new members added (not updated). Use sorted sets for leaderboards, priority queues, time-series data, or any ranked data."
)]
async fn database_zadd(
&self,
Parameters(params): Parameters<DatabaseZaddParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, member_count = params.members.len(), "Tool called: database_zadd");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"ZADD is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let members: Vec<(f64, String)> = params
.members
.into_iter()
.map(|m| (m.score, m.member))
.collect();
let added = tools
.zadd(¶ms.key, &members)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"members_provided": members.len(),
"members_added": added
})
.to_string(),
)]))
}
#[tool(
description = "Remove members from a sorted set (ZREM command). Returns the number of members that were actually removed. Use this to remove players from leaderboards, delete scheduled tasks, or clean up ranked data."
)]
async fn database_zrem(
&self,
Parameters(params): Parameters<DatabaseZremParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, member_count = params.members.len(), "Tool called: database_zrem");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"ZREM is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let removed = tools
.zrem(¶ms.key, ¶ms.members)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"members_provided": params.members.len(),
"members_removed": removed
})
.to_string(),
)]))
}
#[tool(
description = "Get the score of a member in a sorted set (ZSCORE command). Returns null if the member doesn't exist. Use this to look up a player's score, check priority levels, or get the timestamp of a scheduled item."
)]
async fn database_zscore(
&self,
Parameters(params): Parameters<DatabaseZscoreParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, member = %params.member, "Tool called: database_zscore");
let tools = self.get_database_tools().await?;
let score = tools
.zscore(¶ms.key, ¶ms.member)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"member": params.member,
"score": score
})
.to_string(),
)]))
}
#[tool(
description = "Get the rank (position) of a member in a sorted set (ZRANK command). Rank is 0-based with the lowest score at rank 0. Returns null if the member doesn't exist. Use this to find a player's position on a leaderboard or determine priority order."
)]
async fn database_zrank(
&self,
Parameters(params): Parameters<DatabaseZscoreParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, member = %params.member, "Tool called: database_zrank");
let tools = self.get_database_tools().await?;
let rank = tools
.zrank(¶ms.key, ¶ms.member)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"member": params.member,
"rank": rank
})
.to_string(),
)]))
}
#[tool(
description = "Get the reverse rank (position from highest score) of a member in a sorted set (ZREVRANK command). Rank is 0-based with the highest score at rank 0. Returns null if the member doesn't exist. Use this for leaderboards where higher scores are better."
)]
async fn database_zrevrank(
&self,
Parameters(params): Parameters<DatabaseZscoreParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, member = %params.member, "Tool called: database_zrevrank");
let tools = self.get_database_tools().await?;
let rank = tools
.zrevrank(¶ms.key, ¶ms.member)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"member": params.member,
"rank": rank
})
.to_string(),
)]))
}
#[tool(
description = "Get a range of members from a sorted set in reverse order, highest to lowest score (ZREVRANGE command). Use start=0, stop=-1 to get all members. Perfect for leaderboards where you want the top scorers first. Use this instead of ZRANGE when higher scores should appear first."
)]
async fn database_zrevrange(
&self,
Parameters(params): Parameters<DatabaseZrangeParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, start = params.start, stop = params.stop, "Tool called: database_zrevrange");
let tools = self.get_database_tools().await?;
let members = tools
.zrevrange(¶ms.key, params.start, params.stop)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"start": params.start,
"stop": params.stop,
"members": members
})
.to_string(),
)]))
}
#[tool(
description = "Get members from a sorted set within a score range (ZRANGEBYSCORE command). Use \"-inf\" for negative infinity and \"+inf\" for positive infinity. Returns members with scores between min and max (inclusive). Use this to query time ranges, price ranges, or any score-based filtering."
)]
async fn database_zrangebyscore(
&self,
Parameters(params): Parameters<DatabaseZrangebyscoreParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, min = %params.min, max = %params.max, "Tool called: database_zrangebyscore");
let tools = self.get_database_tools().await?;
let min: f64 = match params.min.as_str() {
"-inf" => f64::NEG_INFINITY,
"+inf" => f64::INFINITY,
s => s.parse().map_err(|_| {
RmcpError::invalid_params(format!("Invalid min score: {}", s), None)
})?,
};
let max: f64 = match params.max.as_str() {
"-inf" => f64::NEG_INFINITY,
"+inf" => f64::INFINITY,
s => s.parse().map_err(|_| {
RmcpError::invalid_params(format!("Invalid max score: {}", s), None)
})?,
};
let members = tools
.zrangebyscore(¶ms.key, min, max)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"min": params.min,
"max": params.max,
"members": members
})
.to_string(),
)]))
}
#[tool(
description = "Increment a member's score in a sorted set (ZINCRBY command). Creates the member with the increment as its score if it doesn't exist. Returns the new score. Use negative increment to decrement. Perfect for updating leaderboard scores, adjusting priorities, or accumulating points."
)]
async fn database_zincrby(
&self,
Parameters(params): Parameters<DatabaseZincrbyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, member = %params.member, increment = params.increment, "Tool called: database_zincrby");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"ZINCRBY is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let new_score = tools
.zincrby(¶ms.key, params.increment, ¶ms.member)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"member": params.member,
"increment": params.increment,
"new_score": new_score
})
.to_string(),
)]))
}
#[tool(
description = "Get a range of members with their scores from a sorted set (ZRANGE WITHSCORES). Returns members ordered from lowest to highest score, each with their score. Use this when you need both the member and its score, like displaying a leaderboard with points."
)]
async fn database_zrange_withscores(
&self,
Parameters(params): Parameters<DatabaseZrangeParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, start = params.start, stop = params.stop, "Tool called: database_zrange_withscores");
let tools = self.get_database_tools().await?;
let members = tools
.zrange_withscores(¶ms.key, params.start, params.stop)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
let result: Vec<serde_json::Value> = members
.into_iter()
.map(|(member, score)| serde_json::json!({"member": member, "score": score}))
.collect();
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"start": params.start,
"stop": params.stop,
"members": result
})
.to_string(),
)]))
}
#[tool(
description = "Get a range of members with their scores in reverse order, highest to lowest (ZREVRANGE WITHSCORES). Returns members from highest to lowest score, each with their score. Perfect for leaderboards showing top players with their points."
)]
async fn database_zrevrange_withscores(
&self,
Parameters(params): Parameters<DatabaseZrangeParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, start = params.start, stop = params.stop, "Tool called: database_zrevrange_withscores");
let tools = self.get_database_tools().await?;
let members = tools
.zrevrange_withscores(¶ms.key, params.start, params.stop)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
let result: Vec<serde_json::Value> = members
.into_iter()
.map(|(member, score)| serde_json::json!({"member": member, "score": score}))
.collect();
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"start": params.start,
"stop": params.stop,
"members": result
})
.to_string(),
)]))
}
#[tool(
description = "Rename a key (RENAME command). Atomically renames a key to a new name. If the new key already exists, it will be overwritten. Returns an error if the source key doesn't exist. Use this to reorganize your key namespace or implement atomic key swaps."
)]
async fn database_rename(
&self,
Parameters(params): Parameters<DatabaseRenameParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, new_key = %params.new_key, "Tool called: database_rename");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"RENAME is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
tools
.rename(¶ms.key, ¶ms.new_key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"old_key": params.key,
"new_key": params.new_key,
"success": true
})
.to_string(),
)]))
}
#[tool(
description = "Search a RediSearch index (FT.SEARCH command). Executes a full-text search query against an index, returning matching documents. Supports filters, sorting, pagination, highlighting, and scoring. Use NOCONTENT to get only document IDs for large result sets."
)]
async fn database_ft_search(
&self,
Parameters(params): Parameters<FtSearchParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, query = %params.query, "Tool called: database_ft_search");
let tools = self.get_database_tools().await?;
use crate::database_tools::FtSearchOptions;
let options = FtSearchOptions {
nocontent: params.nocontent,
verbatim: params.verbatim,
withscores: params.withscores,
return_fields: params.return_fields,
sortby: params.sortby,
sortby_desc: params.sortby_desc,
limit_offset: params.limit_offset,
limit_num: params.limit_num,
highlight_fields: params.highlight_fields,
highlight_tags_open: params.highlight_open,
highlight_tags_close: params.highlight_close,
language: params.language,
slop: params.slop,
inorder: params.inorder,
timeout: params.timeout,
dialect: params.dialect,
};
let result = tools
.ft_search(¶ms.index, ¶ms.query, &options)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"query": params.query,
"result": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Run an aggregation query on a RediSearch index (FT.AGGREGATE command). Performs complex aggregations including grouping, sorting, applying transformations, and reducing. Powerful for analytics and reporting on indexed data."
)]
async fn database_ft_aggregate(
&self,
Parameters(params): Parameters<FtAggregateParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, query = %params.query, "Tool called: database_ft_aggregate");
let tools = self.get_database_tools().await?;
use crate::database_tools::{FtAggregateOptions, FtApply, FtGroupBy, FtReducer};
let groupby = params
.groupby
.into_iter()
.map(|g| FtGroupBy {
properties: g.properties,
reducers: g
.reducers
.into_iter()
.map(|r| FtReducer {
function: r.function,
args: r.args,
alias: r.alias,
})
.collect(),
})
.collect();
let apply = params
.apply
.into_iter()
.map(|a| FtApply {
expression: a.expression,
alias: a.alias,
})
.collect();
let sortby = params.sortby.map(|sb| {
sb.into_iter()
.filter_map(|pair| {
if pair.len() >= 2 {
Some((pair[0].clone(), pair[1].clone()))
} else if pair.len() == 1 {
Some((pair[0].clone(), "ASC".to_string()))
} else {
None
}
})
.collect()
});
let options = FtAggregateOptions {
verbatim: params.verbatim,
load: params.load,
groupby,
apply,
sortby,
sortby_max: params.sortby_max,
filter: params.filter,
limit_offset: params.limit_offset,
limit_num: params.limit_num,
timeout: params.timeout,
dialect: params.dialect,
};
let result = tools
.ft_aggregate(¶ms.index, ¶ms.query, &options)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"query": params.query,
"result": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Get information about a RediSearch index (FT.INFO command). Returns index schema, number of documents, indexing status, memory usage, and configuration. Useful for monitoring and debugging index performance."
)]
async fn database_ft_info(
&self,
Parameters(params): Parameters<FtIndexParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, "Tool called: database_ft_info");
let tools = self.get_database_tools().await?;
let result = tools
.ft_info(¶ms.index)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"info": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "List all RediSearch indexes (FT._LIST command). Returns the names of all full-text search indexes in the database. Use FT.INFO on individual indexes for detailed information."
)]
async fn database_ft_list(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: database_ft_list");
let tools = self.get_database_tools().await?;
let indexes = tools
.ft_list()
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"indexes": indexes
})
.to_string(),
)]))
}
#[tool(
description = "Create a new RediSearch index with schema definition (FT.CREATE command). This is the primary command for setting up full-text search. Define which keys to index using prefixes, and specify fields with their types (TEXT for full-text, TAG for exact match, NUMERIC for ranges, GEO for location, VECTOR for embeddings). Each field can have options like SORTABLE, NOSTEM, PHONETIC matching, etc. Use 'on' parameter to choose between HASH and JSON document types."
)]
async fn database_ft_create(
&self,
Parameters(params): Parameters<FtCreateParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, "Tool called: database_ft_create");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"FT.CREATE is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.ft_create(¶ms)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"result": value_to_json(&result),
"message": "Index created successfully"
})
.to_string(),
)]))
}
#[tool(
description = "Delete a RediSearch index (FT.DROPINDEX command). Removes the index and optionally deletes the indexed documents. Without 'dd' flag, only the index is removed and documents remain. With 'dd' flag, both the index AND the actual Redis keys are deleted - use with caution in production!"
)]
async fn database_ft_dropindex(
&self,
Parameters(params): Parameters<FtDropIndexParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, dd = %params.dd, "Tool called: database_ft_dropindex");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"FT.DROPINDEX is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.ft_dropindex(¶ms.index, params.dd)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"documents_deleted": params.dd,
"result": value_to_json(&result),
"message": if params.dd { "Index and documents deleted" } else { "Index deleted (documents preserved)" }
})
.to_string(),
)]))
}
#[tool(
description = "Add a new field to an existing RediSearch index (FT.ALTER command). Only supports adding fields - you cannot modify or remove existing fields. Useful for evolving your search schema as requirements change. Use skip_initial_scan=true to avoid rescanning existing documents (the new field will only be indexed for new/modified documents)."
)]
async fn database_ft_alter(
&self,
Parameters(params): Parameters<FtAlterParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, field = %params.field.name, "Tool called: database_ft_alter");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"FT.ALTER is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.ft_alter(¶ms.index, params.skip_initial_scan, ¶ms.field)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"field_added": params.field.name,
"result": value_to_json(&result),
"message": "Field added to index schema"
})
.to_string(),
)]))
}
#[tool(
description = "Get the execution plan for a query without running it (FT.EXPLAIN command). Essential for debugging slow queries and understanding how RediSearch parses and executes your query. Returns a textual representation of the query tree showing INTERSECT, UNION, NUMERIC, TAG operations. Use this to optimize complex queries by understanding which operations are most expensive."
)]
async fn database_ft_explain(
&self,
Parameters(params): Parameters<FtExplainParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, query = %params.query, "Tool called: database_ft_explain");
let tools = self.get_database_tools().await?;
let result = tools
.ft_explain(¶ms.index, ¶ms.query, params.dialect)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"query": params.query,
"execution_plan": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Get all unique values for a TAG field (FT.TAGVALS command). Returns every distinct tag value that exists in the indexed documents. Useful for: building filter UIs/facets, understanding data distribution, debugging why tag filters aren't matching, validating data quality. Note: Only works on TAG type fields, not TEXT fields."
)]
async fn database_ft_tagvals(
&self,
Parameters(params): Parameters<FtTagvalsParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, field = %params.field, "Tool called: database_ft_tagvals");
let tools = self.get_database_tools().await?;
let result = tools
.ft_tagvals(¶ms.index, ¶ms.field)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"field": params.field,
"values": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Get spelling suggestions for query terms (FT.SPELLCHECK command). Checks each term in the query against the index vocabulary and suggests corrections. Perfect for implementing 'did you mean?' functionality. The distance parameter controls how different suggestions can be (1=one character difference like typos, up to 4 for more aggressive matching). Returns suggestions ranked by how common the suggested term is in the index."
)]
async fn database_ft_spellcheck(
&self,
Parameters(params): Parameters<FtSpellcheckParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, query = %params.query, "Tool called: database_ft_spellcheck");
let tools = self.get_database_tools().await?;
let result = tools
.ft_spellcheck(
¶ms.index,
¶ms.query,
params.distance,
params.dialect,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"query": params.query,
"suggestions": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Create an alias pointing to an index (FT.ALIASADD command). Aliases enable zero-downtime index rebuilds: create alias 'products' -> 'products_v1', rebuild to 'products_v2', then update alias. Your application always queries 'products' and instantly switches to the new index. Aliases also allow multiple names for the same index for different use cases."
)]
async fn database_ft_aliasadd(
&self,
Parameters(params): Parameters<FtAliasAddParam>,
) -> Result<CallToolResult, RmcpError> {
info!(alias = %params.alias, index = %params.index, "Tool called: database_ft_aliasadd");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"FT.ALIASADD is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.ft_aliasadd(¶ms.alias, ¶ms.index)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"alias": params.alias,
"index": params.index,
"result": value_to_json(&result),
"message": format!("Alias '{}' now points to index '{}'", params.alias, params.index)
})
.to_string(),
)]))
}
#[tool(
description = "Delete an index alias (FT.ALIASDEL command). Removes the alias but does NOT affect the underlying index or its data. After deletion, queries using the alias name will fail until a new alias is created."
)]
async fn database_ft_aliasdel(
&self,
Parameters(params): Parameters<FtAliasDelParam>,
) -> Result<CallToolResult, RmcpError> {
info!(alias = %params.alias, "Tool called: database_ft_aliasdel");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"FT.ALIASDEL is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.ft_aliasdel(¶ms.alias)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"alias": params.alias,
"result": value_to_json(&result),
"message": format!("Alias '{}' deleted", params.alias)
})
.to_string(),
)]))
}
#[tool(
description = "Update an alias to point to a different index (FT.ALIASUPDATE command). This is atomic - queries instantly switch to the new index with no downtime. If the alias doesn't exist, it will be created. Use for blue-green deployments: rebuild index, test it, then atomically switch production traffic."
)]
async fn database_ft_aliasupdate(
&self,
Parameters(params): Parameters<FtAliasUpdateParam>,
) -> Result<CallToolResult, RmcpError> {
info!(alias = %params.alias, index = %params.index, "Tool called: database_ft_aliasupdate");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"FT.ALIASUPDATE is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.ft_aliasupdate(¶ms.alias, ¶ms.index)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"alias": params.alias,
"index": params.index,
"result": value_to_json(&result),
"message": format!("Alias '{}' updated to point to index '{}'", params.alias, params.index)
})
.to_string(),
)]))
}
#[tool(
description = "Add a suggestion to an autocomplete dictionary (FT.SUGADD command). Build type-ahead search functionality by storing suggestions with scores. Higher scores rank suggestions higher. Use 'incr' to update scores based on popularity (e.g., increment each time a suggestion is selected). Optionally store payload data like IDs or categories with each suggestion."
)]
async fn database_ft_sugadd(
&self,
Parameters(params): Parameters<FtSugAddParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, string = %params.string, score = %params.score, "Tool called: database_ft_sugadd");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"FT.SUGADD is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.ft_sugadd(
¶ms.key,
¶ms.string,
params.score,
params.incr,
params.payload.as_deref(),
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"suggestion": params.string,
"score": params.score,
"incremented": params.incr,
"result": value_to_json(&result),
"message": "Suggestion added to autocomplete dictionary"
})
.to_string(),
)]))
}
#[tool(
description = "Get autocomplete suggestions matching a prefix (FT.SUGGET command). Returns suggestions starting with the given prefix, ranked by score. Enable 'fuzzy' for typo tolerance (matches with 1 character difference). Use 'max' to limit results. 'withscores' returns ranking scores, 'withpayloads' returns stored metadata. Perfect for search-as-you-type interfaces."
)]
async fn database_ft_sugget(
&self,
Parameters(params): Parameters<FtSugGetParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, prefix = %params.prefix, "Tool called: database_ft_sugget");
let tools = self.get_database_tools().await?;
let result = tools
.ft_sugget(
¶ms.key,
¶ms.prefix,
params.fuzzy,
params.max,
params.withscores,
params.withpayloads,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"prefix": params.prefix,
"fuzzy": params.fuzzy,
"suggestions": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Delete a suggestion from an autocomplete dictionary (FT.SUGDEL command). Removes the exact suggestion string from the dictionary. Returns 1 if the suggestion was found and deleted, 0 if not found."
)]
async fn database_ft_sugdel(
&self,
Parameters(params): Parameters<FtSugDelParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, string = %params.string, "Tool called: database_ft_sugdel");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"FT.SUGDEL is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.ft_sugdel(¶ms.key, ¶ms.string)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"suggestion": params.string,
"deleted": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Get the number of suggestions in an autocomplete dictionary (FT.SUGLEN command). Returns the total count of unique suggestions stored. Useful for monitoring dictionary size and capacity planning."
)]
async fn database_ft_suglen(
&self,
Parameters(params): Parameters<FtSugLenParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_ft_suglen");
let tools = self.get_database_tools().await?;
let result = tools
.ft_suglen(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"count": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Get all synonym groups from an index (FT.SYNDUMP command). Returns a mapping of terms to their synonym group IDs. Useful for reviewing current synonym configuration, debugging why searches aren't matching expected synonyms, and exporting synonym data for backup."
)]
async fn database_ft_syndump(
&self,
Parameters(params): Parameters<FtSynDumpParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, "Tool called: database_ft_syndump");
let tools = self.get_database_tools().await?;
let result = tools
.ft_syndump(¶ms.index)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"synonyms": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Add or update a synonym group (FT.SYNUPDATE command). Synonyms make searching for one term match documents containing related terms. Example: group 'color' with terms ['red', 'crimson', 'scarlet'] - searching for 'red' finds documents with any of these terms. Each call adds terms to the group (doesn't replace). Use skip_initial_scan=true to only apply to new documents."
)]
async fn database_ft_synupdate(
&self,
Parameters(params): Parameters<FtSynUpdateParam>,
) -> Result<CallToolResult, RmcpError> {
info!(index = %params.index, group_id = %params.group_id, terms = ?params.terms, "Tool called: database_ft_synupdate");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"FT.SYNUPDATE is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.ft_synupdate(
¶ms.index,
¶ms.group_id,
params.skip_initial_scan,
¶ms.terms,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"index": params.index,
"group_id": params.group_id,
"terms": params.terms,
"result": value_to_json(&result),
"message": format!("Synonym group '{}' updated with {} terms", params.group_id, params.terms.len())
})
.to_string(),
)]))
}
#[tool(
description = "Get JSON value(s) from a key (JSON.GET command). Retrieves JSON data at one or more paths. Returns the JSON-encoded value. Use JSONPath syntax for paths (e.g., '$.store.book[0].title' or '$..price' for recursive). Multiple paths return an object with path keys."
)]
async fn database_json_get(
&self,
Parameters(params): Parameters<JsonGetParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_json_get");
let tools = self.get_database_tools().await?;
let paths = if params.paths.is_empty() {
vec!["$".to_string()]
} else {
params.paths
};
let result = tools
.json_get(
¶ms.key,
&paths,
params.indent.as_deref(),
params.newline.as_deref(),
params.space.as_deref(),
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"paths": paths,
"value": result
})
.to_string(),
)]))
}
#[tool(
description = "Set a JSON value at a path (JSON.SET command). Creates or updates JSON data. Use NX to only set if path doesn't exist, XX to only update existing paths. The value must be valid JSON. Path '$' sets the root."
)]
async fn database_json_set(
&self,
Parameters(params): Parameters<JsonSetParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, "Tool called: database_json_set");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"JSON.SET is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let success = tools
.json_set(
¶ms.key,
¶ms.path,
¶ms.value,
params.nx,
params.xx,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"success": success
})
.to_string(),
)]))
}
#[tool(
description = "Delete a JSON value at a path (JSON.DEL command). Removes the JSON value at the specified path. Returns the number of paths deleted. If path is omitted, deletes the entire key."
)]
async fn database_json_del(
&self,
Parameters(params): Parameters<JsonDelParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_json_del");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"JSON.DEL is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let deleted = tools
.json_del(¶ms.key, params.path.as_deref())
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"deleted": deleted
})
.to_string(),
)]))
}
#[tool(
description = "Get the type of JSON value at a path (JSON.TYPE command). Returns the JSON type: object, array, string, integer, number, boolean, or null. Useful for introspecting JSON structure."
)]
async fn database_json_type(
&self,
Parameters(params): Parameters<JsonPathParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_json_type");
let tools = self.get_database_tools().await?;
let result = tools
.json_type(¶ms.key, params.path.as_deref())
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"type": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Append values to a JSON array (JSON.ARRAPPEND command). Adds one or more JSON values to the end of the array at the specified path. Returns the new array length. Values must be valid JSON."
)]
async fn database_json_arrappend(
&self,
Parameters(params): Parameters<JsonArrAppendParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, "Tool called: database_json_arrappend");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"JSON.ARRAPPEND is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.json_arrappend(¶ms.key, ¶ms.path, ¶ms.values)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"new_length": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Get the length of a JSON array (JSON.ARRLEN command). Returns the number of elements in the array at the specified path. Returns null if the path doesn't exist or isn't an array."
)]
async fn database_json_arrlen(
&self,
Parameters(params): Parameters<JsonPathParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_json_arrlen");
let tools = self.get_database_tools().await?;
let result = tools
.json_arrlen(¶ms.key, params.path.as_deref())
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"length": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Increment a JSON number by a value (JSON.NUMINCRBY command). Atomically increments the number at the specified path. Returns the new value. Use negative values to decrement."
)]
async fn database_json_numincrby(
&self,
Parameters(params): Parameters<JsonNumIncrByParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, value = %params.value, "Tool called: database_json_numincrby");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"JSON.NUMINCRBY is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.json_numincrby(¶ms.key, ¶ms.path, params.value)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"increment": params.value,
"new_value": result
})
.to_string(),
)]))
}
#[tool(
description = "Get the length of a JSON string (JSON.STRLEN command). Returns the length of the string at the specified path. Returns null if the path doesn't exist or isn't a string."
)]
async fn database_json_strlen(
&self,
Parameters(params): Parameters<JsonPathParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_json_strlen");
let tools = self.get_database_tools().await?;
let result = tools
.json_strlen(¶ms.key, params.path.as_deref())
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"length": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Get JSON values from multiple keys at once (JSON.MGET command). Efficiently retrieves the same JSONPath from many documents in a single operation. Returns an array with values for each key (null for missing keys). Essential for batch reads - much faster than multiple JSON.GET calls."
)]
async fn database_json_mget(
&self,
Parameters(params): Parameters<JsonMgetParam>,
) -> Result<CallToolResult, RmcpError> {
info!(keys = ?params.keys, path = %params.path, "Tool called: database_json_mget");
let tools = self.get_database_tools().await?;
let result = tools
.json_mget(¶ms.keys, ¶ms.path)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"keys": params.keys,
"path": params.path,
"values": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Get all keys from a JSON object (JSON.OBJKEYS command). Returns an array of field names at the specified path. Useful for introspecting document structure, building dynamic UIs, or validating schemas. Path must point to an object, not an array or scalar."
)]
async fn database_json_objkeys(
&self,
Parameters(params): Parameters<JsonObjKeysParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, "Tool called: database_json_objkeys");
let tools = self.get_database_tools().await?;
let result = tools
.json_objkeys(¶ms.key, ¶ms.path)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"keys": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Get the number of keys in a JSON object (JSON.OBJLEN command). Returns the count of fields at the specified path. Useful for checking object size without retrieving all keys. Path must point to an object."
)]
async fn database_json_objlen(
&self,
Parameters(params): Parameters<JsonObjLenParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, "Tool called: database_json_objlen");
let tools = self.get_database_tools().await?;
let result = tools
.json_objlen(¶ms.key, ¶ms.path)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"length": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Find the index of an element in a JSON array (JSON.ARRINDEX command). Returns the first index where the value is found, or -1 if not found. Supports optional start/stop indices to search within a range. The value must be valid JSON (use '\"string\"' for string values)."
)]
async fn database_json_arrindex(
&self,
Parameters(params): Parameters<JsonArrIndexParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, value = %params.value, "Tool called: database_json_arrindex");
let tools = self.get_database_tools().await?;
let result = tools
.json_arrindex(
¶ms.key,
¶ms.path,
¶ms.value,
params.start,
params.stop,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"value": params.value,
"index": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Remove and return an element from a JSON array (JSON.ARRPOP command). By default pops the last element (-1). Use index 0 for first element. Negative indices count from end. Returns the popped value as JSON. Useful for implementing queues or stacks."
)]
async fn database_json_arrpop(
&self,
Parameters(params): Parameters<JsonArrPopParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, "Tool called: database_json_arrpop");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"JSON.ARRPOP is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.json_arrpop(¶ms.key, ¶ms.path, params.index)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"popped": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Trim a JSON array to a specified range (JSON.ARRTRIM command). Keeps only elements from start to stop (inclusive). Elements outside this range are removed. Useful for maintaining bounded arrays like activity logs or recent items. Negative indices count from end."
)]
async fn database_json_arrtrim(
&self,
Parameters(params): Parameters<JsonArrTrimParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, start = %params.start, stop = %params.stop, "Tool called: database_json_arrtrim");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"JSON.ARRTRIM is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.json_arrtrim(¶ms.key, ¶ms.path, params.start, params.stop)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"new_length": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Insert elements into a JSON array at a specific index (JSON.ARRINSERT command). Existing elements at and after the index shift right to make room. Negative indices count from end. Returns the new array length. Useful for inserting at specific positions."
)]
async fn database_json_arrinsert(
&self,
Parameters(params): Parameters<JsonArrInsertParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, index = %params.index, "Tool called: database_json_arrinsert");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"JSON.ARRINSERT is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.json_arrinsert(¶ms.key, ¶ms.path, params.index, ¶ms.values)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"new_length": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Clear container values or set numbers to 0 (JSON.CLEAR command). For arrays: becomes []. For objects: becomes {}. For numbers: becomes 0. Strings and booleans are unchanged. Returns the count of values cleared. Useful for resetting parts of a document."
)]
async fn database_json_clear(
&self,
Parameters(params): Parameters<JsonClearParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, "Tool called: database_json_clear");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"JSON.CLEAR is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.json_clear(¶ms.key, ¶ms.path)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"cleared_count": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Toggle a boolean value (JSON.TOGGLE command). true becomes false, false becomes true. Returns the new boolean value(s). Errors if the path doesn't point to a boolean. Useful for feature flags and status toggles."
)]
async fn database_json_toggle(
&self,
Parameters(params): Parameters<JsonToggleParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, path = %params.path, "Tool called: database_json_toggle");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"JSON.TOGGLE is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.json_toggle(¶ms.key, ¶ms.path)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"path": params.path,
"new_value": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Add a sample to a time series (TS.ADD command). Appends a timestamp-value pair. Use '*' for timestamp to auto-generate. Supports retention policy, encoding, chunk size, duplicate policy, and labels. Creates the key if it doesn't exist."
)]
async fn database_ts_add(
&self,
Parameters(params): Parameters<TsAddParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, timestamp = %params.timestamp, value = %params.value, "Tool called: database_ts_add");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"TS.ADD is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
use crate::database_tools::TsAddOptions;
let labels = params
.labels
.map(|l| l.into_iter().map(|lp| (lp.label, lp.value)).collect());
let options = TsAddOptions {
retention: params.retention,
encoding: params.encoding,
chunk_size: params.chunk_size,
on_duplicate: params.on_duplicate,
labels,
};
let result_ts = tools
.ts_add(¶ms.key, ¶ms.timestamp, params.value, &options)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"timestamp": result_ts,
"value": params.value
})
.to_string(),
)]))
}
#[tool(
description = "Get the last sample from a time series (TS.GET command). Returns the most recent timestamp-value pair. Useful for getting current/latest readings from sensors, metrics, etc."
)]
async fn database_ts_get(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_ts_get");
let tools = self.get_database_tools().await?;
let result = tools
.ts_get(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"sample": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Query a range of samples from a time series (TS.RANGE command). Returns samples between two timestamps. Supports filtering, counting, alignment, and aggregation (avg, sum, min, max, count, first, last, range, std.p, std.s, var.p, var.s)."
)]
async fn database_ts_range(
&self,
Parameters(params): Parameters<TsRangeParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, from = %params.from, to = %params.to, "Tool called: database_ts_range");
let tools = self.get_database_tools().await?;
use crate::database_tools::{TsAggregation, TsRangeOptions};
let aggregation = params.aggregation.map(|agg_type| TsAggregation {
aggregator: agg_type,
bucket_duration: params.bucket_duration.unwrap_or(1000), bucket_timestamp: None,
empty: false,
});
let options = TsRangeOptions {
latest: params.latest,
filter_by_ts: params.filter_by_ts,
filter_by_value_min: params.filter_by_value_min,
filter_by_value_max: params.filter_by_value_max,
count: params.count,
align: params.align,
aggregation,
};
let result = tools
.ts_range(¶ms.key, ¶ms.from, ¶ms.to, &options)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"from": params.from,
"to": params.to,
"samples": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Get information about a time series (TS.INFO command). Returns metadata including retention, chunk count, memory usage, first/last timestamps, labels, and rules. Useful for monitoring and debugging."
)]
async fn database_ts_info(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_ts_info");
let tools = self.get_database_tools().await?;
let result = tools
.ts_info(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"info": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Create a new time series (TS.CREATE command). Creates an empty time series with optional retention, encoding, chunk size, duplicate policy, and labels. Use this to pre-configure a time series before adding samples."
)]
async fn database_ts_create(
&self,
Parameters(params): Parameters<TsCreateParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_ts_create");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"TS.CREATE is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
use crate::database_tools::TsCreateOptions;
let labels = params
.labels
.map(|l| l.into_iter().map(|lp| (lp.label, lp.value)).collect());
let options = TsCreateOptions {
retention: params.retention,
encoding: params.encoding,
chunk_size: params.chunk_size,
duplicate_policy: params.duplicate_policy,
labels,
};
tools
.ts_create(¶ms.key, &options)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"created": true
})
.to_string(),
)]))
}
#[tool(
description = "Create an empty Bloom filter (BF.RESERVE command). Initializes a Bloom filter with specified error rate and capacity. Use expansion factor for auto-scaling, or nonscaling to fix size. Lower error rate = more memory but fewer false positives."
)]
async fn database_bf_reserve(
&self,
Parameters(params): Parameters<BfReserveParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, error_rate = %params.error_rate, capacity = %params.capacity, "Tool called: database_bf_reserve");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"BF.RESERVE is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
tools
.bf_reserve(
¶ms.key,
params.error_rate,
params.capacity,
params.expansion,
params.nonscaling,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"error_rate": params.error_rate,
"capacity": params.capacity,
"created": true
})
.to_string(),
)]))
}
#[tool(
description = "Add an item to a Bloom filter (BF.ADD command). Adds a single item to the filter. Returns true if the item is newly added, false if it may have existed (could be false positive). Creates the filter with default parameters if it doesn't exist."
)]
async fn database_bf_add(
&self,
Parameters(params): Parameters<BfAddParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, item = %params.item, "Tool called: database_bf_add");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"BF.ADD is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let added = tools
.bf_add(¶ms.key, ¶ms.item)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"item": params.item,
"added": added
})
.to_string(),
)]))
}
#[tool(
description = "Add multiple items to a Bloom filter (BF.MADD command). Adds multiple items in a single operation. Returns an array of booleans indicating whether each item was newly added. More efficient than multiple BF.ADD calls."
)]
async fn database_bf_madd(
&self,
Parameters(params): Parameters<BfMaddParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, count = %params.items.len(), "Tool called: database_bf_madd");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"BF.MADD is a write operation. Server is in read-only mode. Use --allow-writes to enable write operations.".to_string(),
None,
));
}
let tools = self.get_database_tools().await?;
let results = tools
.bf_madd(¶ms.key, ¶ms.items)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"results": params.items.iter().zip(results.iter()).map(|(item, added)| {
serde_json::json!({"item": item, "added": added})
}).collect::<Vec<_>>()
})
.to_string(),
)]))
}
#[tool(
description = "Check if an item exists in a Bloom filter (BF.EXISTS command). Returns true if the item may exist (with false positive probability), false if it definitely doesn't exist. Bloom filters never have false negatives."
)]
async fn database_bf_exists(
&self,
Parameters(params): Parameters<BfExistsParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, item = %params.item, "Tool called: database_bf_exists");
let tools = self.get_database_tools().await?;
let exists = tools
.bf_exists(¶ms.key, ¶ms.item)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"item": params.item,
"exists": exists
})
.to_string(),
)]))
}
#[tool(
description = "Check if multiple items exist in a Bloom filter (BF.MEXISTS command). Checks multiple items in a single operation. Returns an array of booleans. More efficient than multiple BF.EXISTS calls."
)]
async fn database_bf_mexists(
&self,
Parameters(params): Parameters<BfMexistsParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, count = %params.items.len(), "Tool called: database_bf_mexists");
let tools = self.get_database_tools().await?;
let results = tools
.bf_mexists(¶ms.key, ¶ms.items)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"results": params.items.iter().zip(results.iter()).map(|(item, exists)| {
serde_json::json!({"item": item, "exists": exists})
}).collect::<Vec<_>>()
})
.to_string(),
)]))
}
#[tool(
description = "Get information about a Bloom filter (BF.INFO command). Returns filter metadata including capacity, size, number of filters, items inserted, and expansion rate. Useful for monitoring filter health and memory usage."
)]
async fn database_bf_info(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_bf_info");
let tools = self.get_database_tools().await?;
let result = tools
.bf_info(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"info": value_to_json(&result)
})
.to_string(),
)]))
}
#[tool(
description = "Add an entry to a Redis stream (XADD command). Streams are append-only logs perfect for event sourcing, activity feeds, and message queues. Each entry has an auto-generated or specified ID and contains field-value pairs. Use maxlen to cap stream size and prevent unbounded growth."
)]
async fn database_xadd(
&self,
Parameters(params): Parameters<XaddParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, id = %params.id, fields = params.fields.len(), "Tool called: database_xadd");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"XADD is a write operation. Server is in read-only mode.",
None,
));
}
let fields: Vec<(String, String)> = params
.fields
.into_iter()
.map(|f| (f.field, f.value))
.collect();
let tools = self.get_database_tools().await?;
let result = tools
.xadd(
¶ms.key,
¶ms.id,
&fields,
params.maxlen,
params.approximate,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"entry_id": result
})
.to_string(),
)]))
}
#[tool(
description = "Read entries from one or more Redis streams (XREAD command). Returns entries with IDs greater than the specified IDs. Use \"0\" to read from the beginning, \"$\" to read only new entries. Supports blocking mode to wait for new entries - useful for real-time consumers."
)]
async fn database_xread(
&self,
Parameters(params): Parameters<XreadParam>,
) -> Result<CallToolResult, RmcpError> {
info!(keys = ?params.keys, ids = ?params.ids, "Tool called: database_xread");
let tools = self.get_database_tools().await?;
let result = tools
.xread(¶ms.keys, ¶ms.ids, params.count, params.block)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Read entries from a stream within an ID range (XRANGE command). Use \"-\" for the first entry and \"+\" for the last entry. Returns entries in chronological order. Perfect for replaying events or paginating through stream history."
)]
async fn database_xrange(
&self,
Parameters(params): Parameters<XrangeParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, start = %params.start, end = %params.end, "Tool called: database_xrange");
let tools = self.get_database_tools().await?;
let result = tools
.xrange(¶ms.key, ¶ms.start, ¶ms.end, params.count)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Read entries from a stream in reverse order (XREVRANGE command). Returns entries from newest to oldest. Use \"+\" for the last entry and \"-\" for the first. Useful for getting the most recent entries first."
)]
async fn database_xrevrange(
&self,
Parameters(params): Parameters<XrevrangeParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, end = %params.end, start = %params.start, "Tool called: database_xrevrange");
let tools = self.get_database_tools().await?;
let result = tools
.xrevrange(¶ms.key, ¶ms.end, ¶ms.start, params.count)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Get the number of entries in a stream (XLEN command). Returns the count of entries currently in the stream."
)]
async fn database_xlen(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_xlen");
let tools = self.get_database_tools().await?;
let result = tools
.xlen(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"length": result
})
.to_string(),
)]))
}
#[tool(
description = "Get detailed information about a stream (XINFO STREAM command). Returns metadata including length, first/last entry IDs, consumer groups, and optionally full entry data. Essential for monitoring and debugging streams."
)]
async fn database_xinfo_stream(
&self,
Parameters(params): Parameters<XinfoStreamParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, full = params.full, "Tool called: database_xinfo_stream");
let tools = self.get_database_tools().await?;
let result = tools
.xinfo_stream(¶ms.key, params.full, params.count)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Get information about consumer groups on a stream (XINFO GROUPS command). Returns details for each group including name, consumers count, pending messages, and last delivered ID."
)]
async fn database_xinfo_groups(
&self,
Parameters(params): Parameters<DatabaseKeyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, "Tool called: database_xinfo_groups");
let tools = self.get_database_tools().await?;
let result = tools
.xinfo_groups(¶ms.key)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Get information about consumers in a group (XINFO CONSUMERS command). Returns details for each consumer including name, pending messages count, and idle time."
)]
async fn database_xinfo_consumers(
&self,
Parameters(params): Parameters<XinfoConsumersParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, group = %params.group, "Tool called: database_xinfo_consumers");
let tools = self.get_database_tools().await?;
let result = tools
.xinfo_consumers(¶ms.key, ¶ms.group)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Create a consumer group on a stream (XGROUP CREATE command). Consumer groups enable multiple consumers to cooperatively process stream entries, with automatic load balancing and message acknowledgment. Use mkstream=true to create the stream if it doesn't exist."
)]
async fn database_xgroup_create(
&self,
Parameters(params): Parameters<XgroupCreateParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, group = %params.group, id = %params.id, "Tool called: database_xgroup_create");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"XGROUP CREATE is a write operation. Server is in read-only mode.",
None,
));
}
let tools = self.get_database_tools().await?;
tools
.xgroup_create(¶ms.key, ¶ms.group, ¶ms.id, params.mkstream)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"group": params.group,
"status": "created"
})
.to_string(),
)]))
}
#[tool(
description = "Destroy a consumer group (XGROUP DESTROY command). Removes the group and all its consumers. Pending messages are lost. Use with caution in production."
)]
async fn database_xgroup_destroy(
&self,
Parameters(params): Parameters<XgroupDestroyParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, group = %params.group, "Tool called: database_xgroup_destroy");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"XGROUP DESTROY is a write operation. Server is in read-only mode.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.xgroup_destroy(¶ms.key, ¶ms.group)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"group": params.group,
"destroyed": result == 1
})
.to_string(),
)]))
}
#[tool(
description = "Delete a consumer from a group (XGROUP DELCONSUMER command). Returns the number of pending messages that were owned by the consumer. The pending messages become unassigned."
)]
async fn database_xgroup_delconsumer(
&self,
Parameters(params): Parameters<XgroupDelconsumerParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, group = %params.group, consumer = %params.consumer, "Tool called: database_xgroup_delconsumer");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"XGROUP DELCONSUMER is a write operation. Server is in read-only mode.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.xgroup_delconsumer(¶ms.key, ¶ms.group, ¶ms.consumer)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"group": params.group,
"consumer": params.consumer,
"pending_messages_released": result
})
.to_string(),
)]))
}
#[tool(
description = "Set the last delivered ID of a consumer group (XGROUP SETID command). Useful for resetting a group to reprocess messages or skip ahead."
)]
async fn database_xgroup_setid(
&self,
Parameters(params): Parameters<XgroupSetidParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, group = %params.group, id = %params.id, "Tool called: database_xgroup_setid");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"XGROUP SETID is a write operation. Server is in read-only mode.",
None,
));
}
let tools = self.get_database_tools().await?;
tools
.xgroup_setid(¶ms.key, ¶ms.group, ¶ms.id)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"group": params.group,
"id": params.id,
"status": "updated"
})
.to_string(),
)]))
}
#[tool(
description = "Read entries as a consumer in a group (XREADGROUP command). This is the primary way to consume streams with consumer groups. Use \">\" as the ID to get only new (undelivered) messages. Messages are added to the pending list until acknowledged with XACK."
)]
async fn database_xreadgroup(
&self,
Parameters(params): Parameters<XreadgroupParam>,
) -> Result<CallToolResult, RmcpError> {
info!(group = %params.group, consumer = %params.consumer, keys = ?params.keys, "Tool called: database_xreadgroup");
let tools = self.get_database_tools().await?;
let result = tools
.xreadgroup(
¶ms.group,
¶ms.consumer,
¶ms.keys,
¶ms.ids,
params.count,
params.block,
params.noack,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Acknowledge messages as processed (XACK command). Removes messages from the pending entries list. Essential for reliable stream processing - unacknowledged messages can be reclaimed by other consumers."
)]
async fn database_xack(
&self,
Parameters(params): Parameters<XackParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, group = %params.group, ids = ?params.ids, "Tool called: database_xack");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"XACK is a write operation. Server is in read-only mode.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.xack(¶ms.key, ¶ms.group, ¶ms.ids)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"group": params.group,
"acknowledged": result
})
.to_string(),
)]))
}
#[tool(
description = "Delete entries from a stream (XDEL command). Removes specific entries by ID. Note: Memory may not be immediately reclaimed due to stream's radix tree structure."
)]
async fn database_xdel(
&self,
Parameters(params): Parameters<XdelParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, ids = ?params.ids, "Tool called: database_xdel");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"XDEL is a write operation. Server is in read-only mode.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.xdel(¶ms.key, ¶ms.ids)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"deleted": result
})
.to_string(),
)]))
}
#[tool(
description = "Trim a stream to a maximum length (XTRIM command). Removes oldest entries to cap stream size. Use approximate=true for better performance (may leave slightly more entries than specified)."
)]
async fn database_xtrim(
&self,
Parameters(params): Parameters<XtrimParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, maxlen = params.maxlen, "Tool called: database_xtrim");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"XTRIM is a write operation. Server is in read-only mode.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.xtrim(¶ms.key, params.maxlen, params.approximate)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"key": params.key,
"trimmed": result
})
.to_string(),
)]))
}
#[tool(
description = "Get pending entries for a consumer group (XPENDING command). Without range parameters, returns a summary. With start/end/count, returns details about specific pending messages including their ID, consumer, idle time, and delivery count."
)]
async fn database_xpending(
&self,
Parameters(params): Parameters<XpendingParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, group = %params.group, "Tool called: database_xpending");
let tools = self.get_database_tools().await?;
let result = tools
.xpending(
¶ms.key,
¶ms.group,
params.start.as_deref(),
params.end.as_deref(),
params.count,
params.consumer.as_deref(),
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Claim pending messages from another consumer (XCLAIM command). Transfers ownership of messages that have been idle for too long, enabling recovery from failed consumers. Returns the claimed messages."
)]
async fn database_xclaim(
&self,
Parameters(params): Parameters<XclaimParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, group = %params.group, consumer = %params.consumer, "Tool called: database_xclaim");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"XCLAIM is a write operation. Server is in read-only mode.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.xclaim(
¶ms.key,
¶ms.group,
¶ms.consumer,
params.min_idle_time,
¶ms.ids,
params.idle,
params.time,
params.retrycount,
params.force,
params.justid,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Auto-claim pending messages (XAUTOCLAIM command). Automatically scans and claims messages idle longer than min_idle_time. Simpler than XCLAIM - just specify a starting ID and the command finds and claims eligible messages."
)]
async fn database_xautoclaim(
&self,
Parameters(params): Parameters<XautoclaimParam>,
) -> Result<CallToolResult, RmcpError> {
info!(key = %params.key, group = %params.group, consumer = %params.consumer, "Tool called: database_xautoclaim");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"XAUTOCLAIM is a write operation. Server is in read-only mode.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.xautoclaim(
¶ms.key,
¶ms.group,
¶ms.consumer,
params.min_idle_time,
¶ms.start,
params.count,
params.justid,
)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Publish a message to a channel (PUBLISH command). Sends a message to all subscribers of the channel. Returns the number of clients that received the message. Note: Pub/Sub is fire-and-forget - messages are not persisted. For durable messaging, use Streams instead."
)]
async fn database_publish(
&self,
Parameters(params): Parameters<PublishParam>,
) -> Result<CallToolResult, RmcpError> {
info!(channel = %params.channel, "Tool called: database_publish");
if self.config.read_only {
return Err(RmcpError::invalid_request(
"PUBLISH is a write operation. Server is in read-only mode.",
None,
));
}
let tools = self.get_database_tools().await?;
let result = tools
.publish(¶ms.channel, ¶ms.message)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"channel": params.channel,
"receivers": result
})
.to_string(),
)]))
}
#[tool(
description = "List active Pub/Sub channels (PUBSUB CHANNELS command). Returns channels that have at least one subscriber. Use pattern to filter channels (e.g., \"news.*\" matches \"news.sports\", \"news.weather\")."
)]
async fn database_pubsub_channels(
&self,
Parameters(params): Parameters<PubsubChannelsParam>,
) -> Result<CallToolResult, RmcpError> {
info!(pattern = ?params.pattern, "Tool called: database_pubsub_channels");
let tools = self.get_database_tools().await?;
let result = tools
.pubsub_channels(params.pattern.as_deref())
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"channels": result
})
.to_string(),
)]))
}
#[tool(
description = "Get subscriber count for specific channels (PUBSUB NUMSUB command). Returns the number of subscribers for each specified channel. Useful for monitoring channel popularity."
)]
async fn database_pubsub_numsub(
&self,
Parameters(params): Parameters<PubsubNumsubParam>,
) -> Result<CallToolResult, RmcpError> {
info!(channels = ?params.channels, "Tool called: database_pubsub_numsub");
let tools = self.get_database_tools().await?;
let result = tools
.pubsub_numsub(¶ms.channels)
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&value_to_json(&result))
.unwrap_or_else(|_| "null".to_string()),
)]))
}
#[tool(
description = "Get the number of pattern subscriptions (PUBSUB NUMPAT command). Returns the total count of clients subscribed to patterns (via PSUBSCRIBE). Does not count channel subscriptions."
)]
async fn database_pubsub_numpat(&self) -> Result<CallToolResult, RmcpError> {
info!("Tool called: database_pubsub_numpat");
let tools = self.get_database_tools().await?;
let result = tools
.pubsub_numpat()
.await
.map_err(|e| RmcpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(
serde_json::json!({
"pattern_subscriptions": result
})
.to_string(),
)]))
}
}
#[tool_handler]
impl ServerHandler for RedisCtlMcp {
fn get_info(&self) -> ServerInfo {
ServerInfo {
protocol_version: ProtocolVersion::V_2024_11_05,
capabilities: ServerCapabilities::builder().enable_tools().build(),
server_info: Implementation::from_build_env(),
instructions: Some(
"Redis Cloud and Enterprise management tools. \
Use cloud_* tools for Redis Cloud operations and \
enterprise_* tools for Redis Enterprise operations. \
All tools are currently read-only."
.to_string(),
),
}
}
async fn initialize(
&self,
_request: InitializeRequestParam,
_context: RequestContext<RoleServer>,
) -> Result<InitializeResult, RmcpError> {
info!("MCP client connected, initializing session");
Ok(self.get_info())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_server_creation() {
let server = RedisCtlMcp::new(None, true, None);
assert!(server.is_ok());
let server = server.unwrap();
assert!(server.config().read_only);
assert!(server.config().profile.is_none());
assert!(server.config().database_url.is_none());
}
#[test]
fn test_server_with_profile() {
let server = RedisCtlMcp::new(Some("test-profile"), false, None);
assert!(server.is_ok());
let server = server.unwrap();
assert!(!server.config().read_only);
assert_eq!(server.config().profile, Some("test-profile".to_string()));
assert!(server.config().database_url.is_none());
}
#[test]
fn test_server_with_database_url() {
let server = RedisCtlMcp::new(None, true, Some("redis://localhost:6379"));
assert!(server.is_ok());
let server = server.unwrap();
assert!(server.config().read_only);
assert!(server.config().profile.is_none());
assert_eq!(
server.config().database_url,
Some("redis://localhost:6379".to_string())
);
}
}