use std::time::Duration;
use bytes::Bytes;
use redis::Client;
use redis::sentinel::Sentinel;
use tracing::{debug, info};
use crate::Cache as CacheTrait;
use crate::utils::namespaced;
#[derive(Clone)]
pub struct RedisCache {
manager: redis::aio::ConnectionManager,
prefix: String,
}
impl RedisCache {
pub async fn connect(url: &str, prefix: &str) -> anyhow::Result<Self> {
Self::connect_with_name(url, prefix, None).await
}
pub async fn connect_with_name(
url: &str,
prefix: &str,
connection_name: Option<&str>,
) -> anyhow::Result<Self> {
let client = Client::open(url)?;
let mut manager = client.get_connection_manager().await?;
if let Some(name) = connection_name {
redis::cmd("CLIENT")
.arg("SETNAME")
.arg(name)
.query_async::<()>(&mut manager)
.await?;
}
Ok(Self {
manager,
prefix: prefix.to_owned(),
})
}
pub async fn connect_sentinel(
sentinel_urls: &[&str],
master_name: &str,
prefix: &str,
connection_name: Option<&str>,
) -> anyhow::Result<Self> {
info!(
sentinel_urls = ?sentinel_urls,
master_name = %master_name,
"Connecting to Redis via Sentinel"
);
let mut sentinel = Sentinel::build(sentinel_urls.to_vec())?;
let client = sentinel.async_master_for(master_name, None).await?;
let mut manager = client.get_connection_manager().await?;
if let Some(name) = connection_name {
redis::cmd("CLIENT")
.arg("SETNAME")
.arg(name)
.query_async::<()>(&mut manager)
.await?;
}
info!(master_name = %master_name, "Redis Sentinel connected to master");
Ok(Self {
manager,
prefix: prefix.to_owned(),
})
}
#[inline]
fn namespaced(&self, key: &[u8]) -> Bytes {
namespaced(&self.prefix, key)
}
pub async fn scan_keys(&self) -> anyhow::Result<Vec<String>> {
let mut conn = self.manager.clone();
let pattern = format!("{}*", self.prefix);
let mut keys = Vec::new();
let mut cursor: i64 = 0;
info!(target: "mx-cache", pattern = %pattern, "starting Redis SCAN");
loop {
let (new_cursor, batch): (i64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&pattern)
.arg("COUNT")
.arg(1000)
.query_async(&mut conn)
.await?;
let batch_len = batch.len();
for key in batch {
if let Some(stripped) = key.strip_prefix(&self.prefix) {
keys.push(stripped.to_owned());
}
}
cursor = new_cursor;
debug!(target: "mx-cache", cursor = cursor, batch_size = batch_len, total = keys.len(), "SCAN iteration");
if cursor == 0 {
break;
}
}
info!(target: "mx-cache", total_found = keys.len(), "Redis SCAN complete");
Ok(keys)
}
}
impl std::fmt::Debug for RedisCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisCache")
.field("prefix", &self.prefix)
.finish()
}
}
impl CacheTrait for RedisCache {
fn set_nx_px(
&self,
key: &[u8],
value: &[u8],
ttl: Duration,
) -> impl Future<Output = anyhow::Result<bool>> + Send {
let ttl_ms = ttl.as_millis() as u64;
let ns = self.namespaced(key);
let value = Bytes::copy_from_slice(value);
let mut manager = self.manager.clone();
async move {
let result: Option<String> = redis::cmd("SET")
.arg(ns.as_ref())
.arg(value.as_ref())
.arg("PX")
.arg(ttl_ms)
.arg("NX")
.query_async(&mut manager)
.await?;
Ok(result.is_some())
}
}
fn set(
&self,
key: &[u8],
value: &[u8],
ttl: Duration,
) -> impl Future<Output = anyhow::Result<()>> + Send {
let ttl_ms = ttl.as_millis() as u64;
let ns = self.namespaced(key);
let value = Bytes::copy_from_slice(value);
let mut manager = self.manager.clone();
async move {
let _: () = redis::cmd("SET")
.arg(ns.as_ref())
.arg(value.as_ref())
.arg("PX")
.arg(ttl_ms)
.query_async(&mut manager)
.await?;
Ok(())
}
}
fn get(&self, key: &[u8]) -> impl Future<Output = anyhow::Result<Option<Vec<u8>>>> + Send {
let ns = self.namespaced(key);
let mut manager = self.manager.clone();
async move {
let result: Option<Vec<u8>> = redis::cmd("GET")
.arg(ns.as_ref())
.query_async(&mut manager)
.await?;
Ok(result)
}
}
fn del(&self, key: &[u8]) -> impl Future<Output = anyhow::Result<()>> + Send {
let ns = self.namespaced(key);
let mut manager = self.manager.clone();
async move {
let _: () = redis::cmd("DEL")
.arg(ns.as_ref())
.query_async(&mut manager)
.await?;
Ok(())
}
}
}