use redis::aio::ConnectionManager;
use redis::AsyncCommands;
use rustauth_core::error::RustAuthError;
use rustauth_core::options::{SecondaryStorage, SecondaryStorageFuture};
use crate::url::{secondary_storage_scan_pattern, validate_key_prefix};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RedisSecondaryStorageOptions {
pub key_prefix: String,
pub scan_count: u32,
}
impl Default for RedisSecondaryStorageOptions {
fn default() -> Self {
Self {
key_prefix: "rustauth:".to_owned(),
scan_count: 100,
}
}
}
#[derive(Clone)]
pub struct RedisSecondaryStorage {
manager: ConnectionManager,
options: RedisSecondaryStorageOptions,
}
impl RedisSecondaryStorage {
pub async fn connect(redis_url: &str) -> Result<Self, RustAuthError> {
Self::connect_with_options(redis_url, RedisSecondaryStorageOptions::default()).await
}
pub async fn connect_with_options(
redis_url: &str,
options: RedisSecondaryStorageOptions,
) -> Result<Self, RustAuthError> {
let manager = crate::connect_manager(redis_url).await?;
Ok(Self::new(manager, options))
}
pub fn new(manager: ConnectionManager, options: RedisSecondaryStorageOptions) -> Self {
Self { manager, options }
}
pub async fn list_keys(&self) -> Result<Vec<String>, RustAuthError> {
validate_secondary_storage_options(&self.options)?;
let secondary_prefix = self.secondary_prefix();
let pattern = secondary_storage_scan_pattern(&secondary_prefix);
let physical_keys = scan_keys(&self.manager, &pattern, self.options.scan_count).await?;
let mut keys = Vec::new();
for key in physical_keys {
if let Some(unprefixed) = key.strip_prefix(secondary_prefix.as_str()) {
keys.push(unprefixed.to_owned());
}
}
Ok(keys)
}
pub async fn clear(&self) -> Result<(), RustAuthError> {
let keys = self
.list_keys()
.await?
.into_iter()
.map(|key| self.prefixed_key(&key))
.collect::<Result<Vec<_>, _>>()?;
if keys.is_empty() {
return Ok(());
}
let mut manager = self.manager.clone();
let _: usize = manager
.del(keys)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))?;
Ok(())
}
fn secondary_prefix(&self) -> String {
format!("{}secondary:", self.options.key_prefix)
}
fn prefixed_key(&self, key: &str) -> Result<String, RustAuthError> {
validate_key_prefix(&self.options.key_prefix)?;
Ok(format!("{}secondary:{key}", self.options.key_prefix))
}
}
impl SecondaryStorage for RedisSecondaryStorage {
fn get<'a>(&'a self, key: &'a str) -> SecondaryStorageFuture<'a, Option<String>> {
Box::pin(async move {
let mut manager = self.manager.clone();
manager
.get(self.prefixed_key(key)?)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))
})
}
fn set<'a>(
&'a self,
key: &'a str,
value: String,
ttl_seconds: Option<u64>,
) -> SecondaryStorageFuture<'a, ()> {
Box::pin(async move {
let redis_key = self.prefixed_key(key)?;
let mut manager = self.manager.clone();
match ttl_seconds {
Some(0) => {
let _: usize = manager
.del(redis_key)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))?;
}
Some(ttl_seconds) => {
let _: () = manager
.set_ex(redis_key, value, ttl_seconds)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))?;
}
None => {
let _: () = manager
.set(redis_key, value)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))?;
}
}
Ok(())
})
}
fn set_if_not_exists<'a>(
&'a self,
key: &'a str,
value: String,
ttl_seconds: Option<u64>,
) -> SecondaryStorageFuture<'a, bool> {
Box::pin(async move {
let redis_key = self.prefixed_key(key)?;
let mut manager = self.manager.clone();
if ttl_seconds == Some(0) {
return Ok(false);
}
let mut command = redis::cmd("SET");
command.arg(&redis_key).arg(value).arg("NX");
if let Some(ttl_seconds) = ttl_seconds {
command.arg("EX").arg(ttl_seconds);
}
let created: Option<String> = command
.query_async(&mut manager)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))?;
Ok(created.is_some())
})
}
fn delete<'a>(&'a self, key: &'a str) -> SecondaryStorageFuture<'a, ()> {
Box::pin(async move {
let mut manager = self.manager.clone();
let _: usize = manager
.del(self.prefixed_key(key)?)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))?;
Ok(())
})
}
fn take<'a>(&'a self, key: &'a str) -> SecondaryStorageFuture<'a, Option<String>> {
Box::pin(async move {
let mut manager = self.manager.clone();
redis::cmd("GETDEL")
.arg(self.prefixed_key(key)?)
.query_async(&mut manager)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))
})
}
fn compare_and_set<'a>(
&'a self,
key: &'a str,
expected: Option<String>,
value: String,
ttl_seconds: Option<u64>,
) -> SecondaryStorageFuture<'a, bool> {
Box::pin(async move {
let redis_key = self.prefixed_key(key)?;
let mut manager = self.manager.clone();
if ttl_seconds == Some(0) {
let deleted = self.delete_if_value(key, expected).await?;
return Ok(deleted);
}
let script = r#"
local current = redis.call("GET", KEYS[1])
local expected_is_nil = ARGV[1]
local expected = ARGV[2]
if expected_is_nil == "1" then
if current ~= false then return 0 end
else
if current ~= expected then return 0 end
end
if ARGV[4] == "" then
redis.call("SET", KEYS[1], ARGV[3])
else
redis.call("SET", KEYS[1], ARGV[3], "EX", tonumber(ARGV[4]))
end
return 1
"#;
let expected_is_nil = expected.is_none();
let expected = expected.unwrap_or_default();
let ttl = ttl_seconds.map(|ttl| ttl.to_string()).unwrap_or_default();
let applied: i64 = redis::cmd("EVAL")
.arg(script)
.arg(1)
.arg(redis_key)
.arg(if expected_is_nil { "1" } else { "0" })
.arg(expected)
.arg(value)
.arg(ttl)
.query_async(&mut manager)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))?;
Ok(applied == 1)
})
}
fn delete_if_value<'a>(
&'a self,
key: &'a str,
expected: Option<String>,
) -> SecondaryStorageFuture<'a, bool> {
Box::pin(async move {
let Some(expected) = expected else {
return Ok(false);
};
let redis_key = self.prefixed_key(key)?;
let mut manager = self.manager.clone();
let script = r#"
if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("DEL", KEYS[1])
return 1
end
return 0
"#;
let deleted: i64 = redis::cmd("EVAL")
.arg(script)
.arg(1)
.arg(redis_key)
.arg(expected)
.query_async(&mut manager)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))?;
Ok(deleted == 1)
})
}
}
fn validate_secondary_storage_options(
options: &RedisSecondaryStorageOptions,
) -> Result<(), RustAuthError> {
validate_key_prefix(&options.key_prefix)?;
if options.scan_count == 0 {
return Err(RustAuthError::InvalidConfig(
"secondary storage scan count must be greater than zero".to_owned(),
));
}
Ok(())
}
async fn scan_keys(
manager: &ConnectionManager,
pattern: &str,
count: u32,
) -> Result<Vec<String>, RustAuthError> {
let mut conn = manager.clone();
let mut cursor = 0u64;
let mut keys = Vec::new();
loop {
let (next_cursor, page): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(pattern)
.arg("COUNT")
.arg(count)
.query_async(&mut conn)
.await
.map_err(|error| RustAuthError::Adapter(error.to_string()))?;
keys.extend(page);
if next_cursor == 0 {
break;
}
cursor = next_cursor;
}
Ok(keys)
}
#[cfg(test)]
mod tests {
use super::*;
use rustauth_core::error::RustAuthError;
#[test]
fn list_keys_rejects_empty_prefix() {
let options = RedisSecondaryStorageOptions {
key_prefix: String::new(),
scan_count: 100,
};
assert!(matches!(
validate_secondary_storage_options(&options),
Err(RustAuthError::InvalidConfig(message))
if message == "secondary storage key prefix must not be empty"
));
}
#[test]
fn list_keys_rejects_zero_scan_count() {
let options = RedisSecondaryStorageOptions {
key_prefix: "rustauth:".to_owned(),
scan_count: 0,
};
assert!(matches!(
validate_secondary_storage_options(&options),
Err(RustAuthError::InvalidConfig(message))
if message == "secondary storage scan count must be greater than zero"
));
}
}