#[cfg(feature = "redis")]
use std::collections::VecDeque;
use std::net::ToSocketAddrs;
use crate::commands::del::{self, Del};
use crate::commands::exists::{self, Exists};
use crate::commands::expire::{self, Expire};
use crate::commands::get::{self, Get};
use crate::commands::getex::{self, GetEx};
#[cfg(feature = "redis")]
use crate::commands::redis::{
self, RedisCommand as OptimizedRedisCommand, RedisCommandKind, RedisCommandRouteKeys,
RedisRespCommand, RedisResponse,
};
use crate::commands::resp::RespCommand;
use crate::commands::set::{self, Set};
use crate::commands::setex::{self, SetEx};
use crate::commands::ttl::{self, Ttl};
use crate::connection::ScnpConnection;
use crate::error::{Result, ShardCacheClientError};
#[cfg(feature = "redis")]
use crate::routing::ShardCacheRoute;
use crate::routing::{ShardCacheDirectRouter, ShardCacheRouteMode};
#[cfg(feature = "redis")]
#[derive(Debug, Clone, Copy)]
enum RedisPipelineResponse {
Native,
Resp,
}
#[derive(Debug)]
pub struct ShardCacheClient {
conn: ScnpConnection,
#[cfg(feature = "redis")]
redis_pipeline_responses: VecDeque<RedisPipelineResponse>,
}
impl ShardCacheClient {
pub fn connect(addr: impl ToSocketAddrs) -> Result<Self> {
Ok(Self {
conn: ScnpConnection::connect(addr)?,
#[cfg(feature = "redis")]
redis_pipeline_responses: VecDeque::new(),
})
}
pub fn get_into(&mut self, key: &[u8], out: &mut Vec<u8>) -> Result<bool> {
self.conn.execute(Get::new(key, out))
}
pub fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
self.conn.execute(Set::new(key, value))
}
pub fn set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
self.conn.execute(SetEx::new(key, value, ttl_ms))
}
pub fn get_ex_into(&mut self, key: &[u8], ttl_ms: u64, out: &mut Vec<u8>) -> Result<bool> {
self.conn.execute(GetEx::new(key, ttl_ms, out))
}
pub fn del(&mut self, key: &[u8]) -> Result<bool> {
self.conn.execute(Del::new(key))
}
pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
self.conn.execute(Exists::new(key))
}
pub fn ttl(&mut self, key: &[u8]) -> Result<i64> {
self.conn.execute(Ttl::new(key))
}
pub fn expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<bool> {
self.conn.execute(Expire::new(key, ttl_ms))
}
#[cfg(feature = "redis")]
pub fn redis(&mut self) -> crate::Redis<'_, Self> {
crate::Redis::new(self)
}
#[cfg(feature = "redis")]
pub fn redis_command(
&mut self,
command: RedisCommandKind,
args: &[&[u8]],
) -> Result<RedisResponse> {
self.conn.execute(OptimizedRedisCommand::new(command, args))
}
#[cfg(feature = "redis")]
pub fn redis_command_by_name(
&mut self,
command: &[u8],
args: &[&[u8]],
) -> Result<RedisResponse> {
match RedisCommandKind::from_name(command) {
Some(command) => self.redis_command(command, args),
None => self.redis_resp_command(command, args),
}
}
#[cfg(feature = "redis")]
pub fn redis_resp_command(&mut self, command: &[u8], args: &[&[u8]]) -> Result<RedisResponse> {
validate_redis_command_name(command)?;
self.conn.execute(RedisRespCommand::new(command, args))
}
pub fn resp_command_into(&mut self, parts: &[&[u8]], out: &mut Vec<u8>) -> Result<bool> {
self.conn.execute(RespCommand::new(parts, out))
}
pub fn scan_resp_into(&mut self, cursor: u64, count: usize, out: &mut Vec<u8>) -> Result<bool> {
let cursor = cursor.to_string();
let count = count.to_string();
self.resp_command_into(
&[b"SCNP.SCAN", cursor.as_bytes(), b"COUNT", count.as_bytes()],
out,
)
}
pub fn scan_shard_resp_into(
&mut self,
shard_id: usize,
cursor: u64,
count: usize,
out: &mut Vec<u8>,
) -> Result<bool> {
let shard_id = shard_id.to_string();
let cursor = cursor.to_string();
let count = count.to_string();
self.resp_command_into(
&[
b"SCNP.SCANSHARD",
shard_id.as_bytes(),
cursor.as_bytes(),
b"COUNT",
count.as_bytes(),
],
out,
)
}
pub fn begin_pipeline_get(&mut self, key: &[u8]) -> Result<()> {
get::write_request(&mut self.conn, None, key)
}
pub fn begin_pipeline_set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
set::write_request(&mut self.conn, None, key, value)
}
pub fn begin_pipeline_set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
setex::write_request(&mut self.conn, None, key, value, ttl_ms)
}
pub fn begin_pipeline_get_ex(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
getex::write_request(&mut self.conn, None, key, ttl_ms)
}
pub fn begin_pipeline_del(&mut self, key: &[u8]) -> Result<()> {
del::write_request(&mut self.conn, None, key)
}
pub fn begin_pipeline_exists(&mut self, key: &[u8]) -> Result<()> {
exists::write_request(&mut self.conn, None, key)
}
pub fn begin_pipeline_ttl(&mut self, key: &[u8]) -> Result<()> {
ttl::write_request(&mut self.conn, None, key)
}
pub fn begin_pipeline_expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
expire::write_request(&mut self.conn, None, key, ttl_ms)
}
#[cfg(feature = "redis")]
pub fn begin_pipeline_redis_command(
&mut self,
command: RedisCommandKind,
args: &[&[u8]],
) -> Result<()> {
redis::write_request(&mut self.conn, command, None, args)?;
self.redis_pipeline_responses
.push_back(RedisPipelineResponse::Native);
Ok(())
}
#[cfg(feature = "redis")]
pub fn begin_pipeline_redis_command_by_name(
&mut self,
command: &[u8],
args: &[&[u8]],
) -> Result<()> {
match RedisCommandKind::from_name(command) {
Some(command) => self.begin_pipeline_redis_command(command, args),
None => self.begin_pipeline_redis_resp_command(command, args),
}
}
#[cfg(feature = "redis")]
pub fn begin_pipeline_redis_resp_command(
&mut self,
command: &[u8],
args: &[&[u8]],
) -> Result<()> {
validate_redis_command_name(command)?;
redis::write_resp_request(&mut self.conn, command, args)?;
self.redis_pipeline_responses
.push_back(RedisPipelineResponse::Resp);
Ok(())
}
pub fn flush_pipeline(&mut self) -> Result<()> {
self.conn.flush()
}
pub fn finish_pipeline_get_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
self.conn
.read_value(<Get as crate::commands::ScnpCommand>::NAME, out)
}
pub fn finish_pipeline_set(&mut self) -> Result<()> {
self.conn
.expect_ok(<Set as crate::commands::ScnpCommand>::NAME)
}
pub fn finish_pipeline_set_ex(&mut self) -> Result<()> {
self.conn
.expect_ok(<SetEx as crate::commands::ScnpCommand>::NAME)
}
pub fn finish_pipeline_get_ex_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
self.conn
.read_value(<GetEx as crate::commands::ScnpCommand>::NAME, out)
}
pub fn finish_pipeline_del(&mut self) -> Result<bool> {
self.conn
.read_integer(<Del as crate::commands::ScnpCommand>::NAME)
.map(|deleted| deleted != 0)
}
pub fn finish_pipeline_exists(&mut self) -> Result<bool> {
self.conn
.read_integer(<Exists as crate::commands::ScnpCommand>::NAME)
.map(|exists| exists != 0)
}
pub fn finish_pipeline_ttl(&mut self) -> Result<i64> {
self.conn
.read_integer(<Ttl as crate::commands::ScnpCommand>::NAME)
}
pub fn finish_pipeline_expire(&mut self) -> Result<bool> {
self.conn
.read_integer(<Expire as crate::commands::ScnpCommand>::NAME)
.map(|changed| changed != 0)
}
#[cfg(feature = "redis")]
pub fn finish_pipeline_redis_command(&mut self) -> Result<RedisResponse> {
match self
.redis_pipeline_responses
.pop_front()
.unwrap_or(RedisPipelineResponse::Native)
{
RedisPipelineResponse::Native => self.conn.read_native_redis_response("REDIS"),
RedisPipelineResponse::Resp => self.conn.read_resp_redis_response("RESP"),
}
}
}
impl ShardCacheDirectRouter {
pub fn connect_shard(&self, shard_id: usize) -> Result<ShardCacheDirectShardClient> {
Ok(ShardCacheDirectShardClient {
router: *self,
shard_id,
conn: ScnpConnection::connect(self.shard_addr(shard_id)?)?,
})
}
}
#[derive(Debug)]
pub struct ShardCacheDirectClient {
router: ShardCacheDirectRouter,
conns: Vec<ScnpConnection>,
}
impl ShardCacheDirectClient {
pub fn connect(addr: impl ToSocketAddrs, shard_count: usize) -> Result<Self> {
let router = ShardCacheDirectRouter::new(addr, shard_count)?;
Self::connect_with_router(router)
}
pub fn connect_with_route_mode(
addr: impl ToSocketAddrs,
shard_count: usize,
route_mode: ShardCacheRouteMode,
) -> Result<Self> {
let router = ShardCacheDirectRouter::new(addr, shard_count)?.with_route_mode(route_mode);
Self::connect_with_router(router)
}
fn connect_with_router(router: ShardCacheDirectRouter) -> Result<Self> {
let mut conns = Vec::with_capacity(router.shard_count());
for shard_id in 0..router.shard_count() {
conns.push(ScnpConnection::connect(router.shard_addr(shard_id)?)?);
}
Ok(Self { router, conns })
}
pub fn get_into(&mut self, key: &[u8], out: &mut Vec<u8>) -> Result<bool> {
let route = self.router.route_key(key);
self.conns[route.shard_id].execute(Get::routed(route, key, out))
}
pub fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
let route = self.router.route_key(key);
self.conns[route.shard_id].execute(Set::routed(route, key, value))
}
pub fn set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
let route = self.router.route_key(key);
self.conns[route.shard_id].execute(SetEx::routed(route, key, value, ttl_ms))
}
pub fn get_ex_into(&mut self, key: &[u8], ttl_ms: u64, out: &mut Vec<u8>) -> Result<bool> {
let route = self.router.route_key(key);
self.conns[route.shard_id].execute(GetEx::routed(route, key, ttl_ms, out))
}
pub fn del(&mut self, key: &[u8]) -> Result<bool> {
let route = self.router.route_key(key);
self.conns[route.shard_id].execute(Del::routed(route, key))
}
pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
let route = self.router.route_key(key);
self.conns[route.shard_id].execute(Exists::routed(route, key))
}
pub fn ttl(&mut self, key: &[u8]) -> Result<i64> {
let route = self.router.route_key(key);
self.conns[route.shard_id].execute(Ttl::routed(route, key))
}
pub fn expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<bool> {
let route = self.router.route_key(key);
self.conns[route.shard_id].execute(Expire::routed(route, key, ttl_ms))
}
#[cfg(feature = "redis")]
pub fn redis(&mut self) -> crate::Redis<'_, Self> {
crate::Redis::new(self)
}
#[cfg(feature = "redis")]
pub fn redis_command(
&mut self,
command: RedisCommandKind,
args: &[&[u8]],
) -> Result<RedisResponse> {
let route = redis_direct_route(&self.router, command, args)?;
let shard_id = route.map_or(0, |route| route.shard_id);
self.conns[shard_id].execute(OptimizedRedisCommand::routed(command, route, args))
}
#[cfg(feature = "redis")]
pub fn redis_command_by_name(
&mut self,
command: &[u8],
args: &[&[u8]],
) -> Result<RedisResponse> {
self.redis_command(redis_command_kind_from_name(command)?, args)
}
pub fn scan_shard_resp_into(
&mut self,
shard_id: usize,
cursor: u64,
count: usize,
out: &mut Vec<u8>,
) -> Result<bool> {
if shard_id >= self.conns.len() {
return Err(ShardCacheClientError::Config(format!(
"shard {shard_id} is outside configured shard count {}",
self.conns.len()
)));
}
let shard_id_text = shard_id.to_string();
let cursor = cursor.to_string();
let count = count.to_string();
self.conns[shard_id].execute(RespCommand::new(
&[
b"SCNP.SCANSHARD",
shard_id_text.as_bytes(),
cursor.as_bytes(),
b"COUNT",
count.as_bytes(),
],
out,
))
}
}
#[derive(Debug)]
pub struct ShardCacheDirectShardClient {
router: ShardCacheDirectRouter,
shard_id: usize,
conn: ScnpConnection,
}
impl ShardCacheDirectShardClient {
pub fn shard_id(&self) -> usize {
self.shard_id
}
pub fn get_into(&mut self, key: &[u8], out: &mut Vec<u8>) -> Result<bool> {
let route = self.checked_route(key)?;
self.conn.execute(Get::routed(route, key, out))
}
pub fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
let route = self.checked_route(key)?;
self.conn.execute(Set::routed(route, key, value))
}
pub fn set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
let route = self.checked_route(key)?;
self.conn.execute(SetEx::routed(route, key, value, ttl_ms))
}
pub fn get_ex_into(&mut self, key: &[u8], ttl_ms: u64, out: &mut Vec<u8>) -> Result<bool> {
let route = self.checked_route(key)?;
self.conn.execute(GetEx::routed(route, key, ttl_ms, out))
}
pub fn del(&mut self, key: &[u8]) -> Result<bool> {
let route = self.checked_route(key)?;
self.conn.execute(Del::routed(route, key))
}
pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
let route = self.checked_route(key)?;
self.conn.execute(Exists::routed(route, key))
}
pub fn ttl(&mut self, key: &[u8]) -> Result<i64> {
let route = self.checked_route(key)?;
self.conn.execute(Ttl::routed(route, key))
}
pub fn expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<bool> {
let route = self.checked_route(key)?;
self.conn.execute(Expire::routed(route, key, ttl_ms))
}
#[cfg(feature = "redis")]
pub fn redis(&mut self) -> crate::Redis<'_, Self> {
crate::Redis::new(self)
}
#[cfg(feature = "redis")]
pub fn redis_command(
&mut self,
command: RedisCommandKind,
args: &[&[u8]],
) -> Result<RedisResponse> {
let route = redis_direct_shard_route(&self.router, self.shard_id, command, args)?;
self.conn
.execute(OptimizedRedisCommand::routed(command, route, args))
}
#[cfg(feature = "redis")]
pub fn redis_command_by_name(
&mut self,
command: &[u8],
args: &[&[u8]],
) -> Result<RedisResponse> {
self.redis_command(redis_command_kind_from_name(command)?, args)
}
pub fn scan_resp_into(&mut self, cursor: u64, count: usize, out: &mut Vec<u8>) -> Result<bool> {
let shard_id = self.shard_id.to_string();
let cursor = cursor.to_string();
let count = count.to_string();
self.conn.execute(RespCommand::new(
&[
b"SCNP.SCANSHARD",
shard_id.as_bytes(),
cursor.as_bytes(),
b"COUNT",
count.as_bytes(),
],
out,
))
}
pub fn begin_pipeline_get(&mut self, key: &[u8]) -> Result<()> {
let route = self.checked_route(key)?;
get::write_request(&mut self.conn, Some(route), key)
}
pub fn begin_pipeline_set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
let route = self.checked_route(key)?;
set::write_request(&mut self.conn, Some(route), key, value)
}
pub fn begin_pipeline_set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
let route = self.checked_route(key)?;
setex::write_request(&mut self.conn, Some(route), key, value, ttl_ms)
}
pub fn begin_pipeline_get_ex(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
let route = self.checked_route(key)?;
getex::write_request(&mut self.conn, Some(route), key, ttl_ms)
}
pub fn begin_pipeline_del(&mut self, key: &[u8]) -> Result<()> {
let route = self.checked_route(key)?;
del::write_request(&mut self.conn, Some(route), key)
}
pub fn begin_pipeline_exists(&mut self, key: &[u8]) -> Result<()> {
let route = self.checked_route(key)?;
exists::write_request(&mut self.conn, Some(route), key)
}
pub fn begin_pipeline_ttl(&mut self, key: &[u8]) -> Result<()> {
let route = self.checked_route(key)?;
ttl::write_request(&mut self.conn, Some(route), key)
}
pub fn begin_pipeline_expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
let route = self.checked_route(key)?;
expire::write_request(&mut self.conn, Some(route), key, ttl_ms)
}
#[cfg(feature = "redis")]
pub fn begin_pipeline_redis_command(
&mut self,
command: RedisCommandKind,
args: &[&[u8]],
) -> Result<()> {
let route = redis_direct_shard_route(&self.router, self.shard_id, command, args)?;
redis::write_request(&mut self.conn, command, route, args)
}
#[cfg(feature = "redis")]
pub fn begin_pipeline_redis_command_by_name(
&mut self,
command: &[u8],
args: &[&[u8]],
) -> Result<()> {
self.begin_pipeline_redis_command(redis_command_kind_from_name(command)?, args)
}
pub fn flush_pipeline(&mut self) -> Result<()> {
self.conn.flush()
}
pub fn finish_pipeline_get_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
self.conn
.read_value(<Get as crate::commands::ScnpCommand>::NAME, out)
}
pub fn finish_pipeline_set(&mut self) -> Result<()> {
self.conn
.expect_ok(<Set as crate::commands::ScnpCommand>::NAME)
}
pub fn finish_pipeline_set_ex(&mut self) -> Result<()> {
self.conn
.expect_ok(<SetEx as crate::commands::ScnpCommand>::NAME)
}
pub fn finish_pipeline_get_ex_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
self.conn
.read_value(<GetEx as crate::commands::ScnpCommand>::NAME, out)
}
pub fn finish_pipeline_del(&mut self) -> Result<bool> {
self.conn
.read_integer(<Del as crate::commands::ScnpCommand>::NAME)
.map(|deleted| deleted != 0)
}
pub fn finish_pipeline_exists(&mut self) -> Result<bool> {
self.conn
.read_integer(<Exists as crate::commands::ScnpCommand>::NAME)
.map(|exists| exists != 0)
}
pub fn finish_pipeline_ttl(&mut self) -> Result<i64> {
self.conn
.read_integer(<Ttl as crate::commands::ScnpCommand>::NAME)
}
pub fn finish_pipeline_expire(&mut self) -> Result<bool> {
self.conn
.read_integer(<Expire as crate::commands::ScnpCommand>::NAME)
.map(|changed| changed != 0)
}
#[cfg(feature = "redis")]
pub fn finish_pipeline_redis_command(&mut self) -> Result<RedisResponse> {
self.conn.read_native_redis_response("REDIS")
}
fn checked_route(&self, key: &[u8]) -> Result<crate::routing::ShardCacheRoute> {
let route = self.router.route_key(key);
if route.shard_id != self.shard_id {
return Err(ShardCacheClientError::Config(format!(
"key routes to shard {}, but client is connected to shard {}",
route.shard_id, self.shard_id
)));
}
Ok(route)
}
}
#[cfg(feature = "redis")]
fn redis_command_kind_from_name(command: &[u8]) -> Result<RedisCommandKind> {
RedisCommandKind::from_name(command).ok_or_else(|| {
ShardCacheClientError::Config(format!(
"Redis command `{}` is not available on direct SCNP shard clients; use ShardCacheClient on the fanout listener for command-name fallback",
String::from_utf8_lossy(command)
))
})
}
#[cfg(feature = "redis")]
fn validate_redis_command_name(command: &[u8]) -> Result<()> {
if command.is_empty() {
return Err(ShardCacheClientError::Config(
"Redis command name cannot be empty".into(),
));
}
if command.iter().any(|byte| byte.is_ascii_whitespace()) {
return Err(ShardCacheClientError::Config(format!(
"Redis command name cannot contain whitespace: `{}`",
String::from_utf8_lossy(command)
)));
}
Ok(())
}
#[cfg(feature = "redis")]
fn redis_direct_route(
router: &ShardCacheDirectRouter,
command: RedisCommandKind,
args: &[&[u8]],
) -> Result<Option<ShardCacheRoute>> {
let keys = match command.route_keys(args) {
RedisCommandRouteKeys::None => return Ok(None),
RedisCommandRouteKeys::AllShards => {
return Err(ShardCacheClientError::Config(format!(
"{} requires all shards; use ShardCacheClient on the fanout listener",
command.name()
)));
}
RedisCommandRouteKeys::Keys(keys) if keys.is_empty() => return Ok(None),
RedisCommandRouteKeys::Keys(keys) => keys,
};
let first_route = router.route_key(keys[0]);
for key in keys.iter().skip(1) {
let route = router.route_key(key);
if route.shard_id != first_route.shard_id {
return Err(ShardCacheClientError::Config(format!(
"{} keys span multiple direct shards",
command.name()
)));
}
}
Ok(Some(first_route))
}
#[cfg(feature = "redis")]
fn redis_direct_shard_route(
router: &ShardCacheDirectRouter,
shard_id: usize,
command: RedisCommandKind,
args: &[&[u8]],
) -> Result<Option<ShardCacheRoute>> {
let route = redis_direct_route(router, command, args)?;
if let Some(route) = route
&& route.shard_id != shard_id
{
return Err(ShardCacheClientError::Config(format!(
"{} routes to shard {}, but client is connected to shard {}",
command.name(),
route.shard_id,
shard_id
)));
}
Ok(route)
}