1use fred::clients::Client;
2use fred::interfaces::KeysInterface;
3use fred::types::scripts::Script;
4use fred::types::{Expiration, SetOptions};
5use rustauth_core::error::RustAuthError;
6use rustauth_core::options::{SecondaryStorage, SecondaryStorageFuture};
7
8use crate::error::fred_error;
9use crate::store::connect_client;
10use crate::FredSecondaryStorageOptions;
11
12#[derive(Clone)]
13pub struct FredSecondaryStorage {
14 client: Client,
15 options: FredSecondaryStorageOptions,
16}
17
18impl FredSecondaryStorage {
19 pub async fn connect(url: &str) -> Result<Self, RustAuthError> {
20 Self::connect_with_options(url, FredSecondaryStorageOptions::default()).await
21 }
22
23 pub async fn connect_with_options(
24 url: &str,
25 options: FredSecondaryStorageOptions,
26 ) -> Result<Self, RustAuthError> {
27 let client = connect_client(url).await?;
28 Ok(Self::new(client, options))
29 }
30
31 pub fn new(client: Client, options: FredSecondaryStorageOptions) -> Self {
32 Self { client, options }
33 }
34
35 pub async fn list_keys(&self) -> Result<Vec<String>, RustAuthError> {
36 validate_secondary_storage_options(&self.options)?;
37 let secondary_prefix = self.secondary_prefix();
38 let pattern = secondary_storage_scan_pattern(&secondary_prefix);
39 let mut cursor = "0".to_owned();
40 let mut keys = Vec::new();
41
42 loop {
43 let (next_cursor, page): (String, Vec<String>) = self
44 .client
45 .scan_page(cursor, pattern.clone(), Some(self.options.scan_count), None)
46 .await
47 .map_err(|error| fred_error("secondary scan", error))?;
48 for key in page {
49 if let Some(unprefixed) = key.strip_prefix(secondary_prefix.as_str()) {
50 keys.push(unprefixed.to_owned());
51 }
52 }
53 if next_cursor == "0" {
54 break;
55 }
56 cursor = next_cursor;
57 }
58
59 Ok(keys)
60 }
61
62 pub async fn clear(&self) -> Result<(), RustAuthError> {
63 let keys = self
64 .list_keys()
65 .await?
66 .into_iter()
67 .map(|key| self.prefixed_key(&key))
68 .collect::<Result<Vec<_>, _>>()?;
69 if keys.is_empty() {
70 return Ok(());
71 }
72 self.client
73 .del::<u64, _>(keys)
74 .await
75 .map_err(|error| fred_error("secondary clear", error))?;
76 Ok(())
77 }
78
79 fn secondary_prefix(&self) -> String {
80 format!("{}secondary:", self.options.key_prefix)
81 }
82
83 fn prefixed_key(&self, key: &str) -> Result<String, RustAuthError> {
84 validate_key_prefix(&self.options.key_prefix)?;
85 Ok(format!("{}secondary:{key}", self.options.key_prefix))
86 }
87}
88
89impl SecondaryStorage for FredSecondaryStorage {
90 fn get<'a>(&'a self, key: &'a str) -> SecondaryStorageFuture<'a, Option<String>> {
91 Box::pin(async move {
92 self.client
93 .get::<Option<String>, _>(self.prefixed_key(key)?)
94 .await
95 .map_err(|error| fred_error("secondary get", error))
96 })
97 }
98
99 fn set<'a>(
100 &'a self,
101 key: &'a str,
102 value: String,
103 ttl_seconds: Option<u64>,
104 ) -> SecondaryStorageFuture<'a, ()> {
105 Box::pin(async move {
106 let redis_key = self.prefixed_key(key)?;
107 if ttl_seconds == Some(0) {
108 self.client
109 .del::<u64, _>(redis_key)
110 .await
111 .map_err(|error| fred_error("secondary set", error))?;
112 return Ok(());
113 }
114 let expire = ttl_seconds
115 .map(|ttl| {
116 i64::try_from(ttl).map(Expiration::EX).map_err(|_| {
117 RustAuthError::InvalidConfig(
118 "secondary storage ttl must fit in i64".to_owned(),
119 )
120 })
121 })
122 .transpose()?;
123 self.client
124 .set::<(), _, _>(redis_key, value, expire, None, false)
125 .await
126 .map_err(|error| fred_error("secondary set", error))
127 })
128 }
129
130 fn set_if_not_exists<'a>(
131 &'a self,
132 key: &'a str,
133 value: String,
134 ttl_seconds: Option<u64>,
135 ) -> SecondaryStorageFuture<'a, bool> {
136 Box::pin(async move {
137 let redis_key = self.prefixed_key(key)?;
138 if ttl_seconds == Some(0) {
139 return Ok(false);
140 }
141 let expire = ttl_seconds
142 .map(|ttl| {
143 i64::try_from(ttl).map(Expiration::EX).map_err(|_| {
144 RustAuthError::InvalidConfig(
145 "secondary storage ttl must fit in i64".to_owned(),
146 )
147 })
148 })
149 .transpose()?;
150 let created = self
151 .client
152 .set::<Option<String>, _, _>(redis_key, value, expire, Some(SetOptions::NX), false)
153 .await
154 .map_err(|error| fred_error("secondary set_if_not_exists", error))?;
155 Ok(created.is_some())
156 })
157 }
158
159 fn delete<'a>(&'a self, key: &'a str) -> SecondaryStorageFuture<'a, ()> {
160 Box::pin(async move {
161 self.client
162 .del::<u64, _>(self.prefixed_key(key)?)
163 .await
164 .map_err(|error| fred_error("secondary delete", error))?;
165 Ok(())
166 })
167 }
168
169 fn take<'a>(&'a self, key: &'a str) -> SecondaryStorageFuture<'a, Option<String>> {
170 Box::pin(async move {
171 self.client
172 .getdel::<Option<String>, _>(self.prefixed_key(key)?)
173 .await
174 .map_err(|error| fred_error("secondary take", error))
175 })
176 }
177
178 fn compare_and_set<'a>(
179 &'a self,
180 key: &'a str,
181 expected: Option<String>,
182 value: String,
183 ttl_seconds: Option<u64>,
184 ) -> SecondaryStorageFuture<'a, bool> {
185 Box::pin(async move {
186 if ttl_seconds == Some(0) {
187 return self.delete_if_value(key, expected).await;
188 }
189 let redis_key = self.prefixed_key(key)?;
190 let script = Script::from_lua(
191 r#"
192local current = redis.call("GET", KEYS[1])
193local expected_is_nil = ARGV[1]
194local expected = ARGV[2]
195if expected_is_nil == "1" then
196 if current ~= false then return 0 end
197else
198 if current ~= expected then return 0 end
199end
200if ARGV[4] == "" then
201 redis.call("SET", KEYS[1], ARGV[3])
202else
203 redis.call("SET", KEYS[1], ARGV[3], "EX", tonumber(ARGV[4]))
204end
205return 1
206"#,
207 );
208 let expected_is_nil = expected.is_none();
209 let expected = expected.unwrap_or_default();
210 let ttl = ttl_seconds.map(|ttl| ttl.to_string()).unwrap_or_default();
211 let applied: i64 = script
212 .evalsha_with_reload(
213 &self.client,
214 vec![redis_key],
215 vec![
216 if expected_is_nil { "1" } else { "0" }.to_owned(),
217 expected,
218 value,
219 ttl,
220 ],
221 )
222 .await
223 .map_err(|error| fred_error("secondary compare_and_set", error))?;
224 Ok(applied == 1)
225 })
226 }
227
228 fn delete_if_value<'a>(
229 &'a self,
230 key: &'a str,
231 expected: Option<String>,
232 ) -> SecondaryStorageFuture<'a, bool> {
233 Box::pin(async move {
234 let Some(expected) = expected else {
235 return Ok(false);
236 };
237 let redis_key = self.prefixed_key(key)?;
238 let script = Script::from_lua(
239 r#"
240if redis.call("GET", KEYS[1]) == ARGV[1] then
241 redis.call("DEL", KEYS[1])
242 return 1
243end
244return 0
245"#,
246 );
247 let deleted: i64 = script
248 .evalsha_with_reload(&self.client, vec![redis_key], vec![expected])
249 .await
250 .map_err(|error| fred_error("secondary delete_if_value", error))?;
251 Ok(deleted == 1)
252 })
253 }
254}
255
/// Builds the `SCAN MATCH` pattern that selects every key starting with `prefix`.
///
/// The glob metacharacters `*`, `?`, `[`, `]` and `\` inside `prefix` are escaped with
/// a backslash so they match literally, and a trailing `*` wildcard is appended.
fn secondary_storage_scan_pattern(prefix: &str) -> String {
    const GLOB_METACHARACTERS: [char; 5] = ['*', '?', '[', ']', '\\'];

    let mut escaped = String::with_capacity(prefix.len() + 1);
    for symbol in prefix.chars() {
        if GLOB_METACHARACTERS.contains(&symbol) {
            escaped.push('\\');
        }
        escaped.push(symbol);
    }
    escaped.push('*');
    escaped
}
270
271fn validate_key_prefix(prefix: &str) -> Result<(), RustAuthError> {
272 if prefix.is_empty() {
273 return Err(RustAuthError::InvalidConfig(
274 "secondary storage key prefix must not be empty".to_owned(),
275 ));
276 }
277 Ok(())
278}
279
280fn validate_secondary_storage_options(
281 options: &FredSecondaryStorageOptions,
282) -> Result<(), RustAuthError> {
283 validate_key_prefix(&options.key_prefix)?;
284 if options.scan_count == 0 {
285 return Err(RustAuthError::InvalidConfig(
286 "secondary storage scan count must be greater than zero".to_owned(),
287 ));
288 }
289 Ok(())
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// Message expected whenever an operation runs with an empty key prefix.
    const EMPTY_PREFIX_MESSAGE: &str = "secondary storage key prefix must not be empty";
    /// Message expected whenever a scan runs with a zero scan count.
    const ZERO_SCAN_COUNT_MESSAGE: &str = "secondary storage scan count must be greater than zero";

    /// Storage over a default client whose key prefix is empty.
    fn empty_prefix_storage() -> FredSecondaryStorage {
        FredSecondaryStorage::new(
            Client::default(),
            FredSecondaryStorageOptions {
                key_prefix: String::new(),
                scan_count: 100,
            },
        )
    }

    /// Storage over a default client whose scan count is zero.
    fn zero_scan_count_storage() -> FredSecondaryStorage {
        FredSecondaryStorage::new(
            Client::default(),
            FredSecondaryStorageOptions {
                key_prefix: "rustauth:test:".to_owned(),
                scan_count: 0,
            },
        )
    }

    /// Asserts that `result` is an `InvalidConfig` error carrying exactly `expected`.
    fn assert_invalid_config<T>(result: Result<T, RustAuthError>, expected: &str) {
        match result {
            Err(RustAuthError::InvalidConfig(message)) => assert_eq!(message, expected),
            Ok(_) | Err(_) => panic!("expected InvalidConfig error: {expected}"),
        }
    }

    #[test]
    fn scan_pattern_escapes_redis_glob_metacharacters() {
        assert_eq!(
            secondary_storage_scan_pattern(r"tenant:*?[]\:"),
            r"tenant:\*\?\[\]\\:*"
        );
    }

    #[test]
    fn scan_pattern_leaves_plain_prefixes_readable() {
        assert_eq!(
            secondary_storage_scan_pattern("rustauth:test:"),
            "rustauth:test:*"
        );
    }

    #[test]
    fn secondary_scan_pattern_is_disjoint_from_rate_limit_namespace() {
        let prefix = "rustauth:";
        let secondary_pattern = secondary_storage_scan_pattern(&format!("{prefix}secondary:"));
        assert_eq!(secondary_pattern, "rustauth:secondary:*");

        let rate_limit_key = format!("{prefix}rate-limit:10.0.0.1|/sign-in");
        assert!(
            !rate_limit_key.starts_with("rustauth:secondary:"),
            "co-located rate-limit keys must not share the secondary clear() scan prefix (OPE-37)"
        );

        // Scanning on the bare prefix would have covered the rate-limit keys as well.
        let legacy_clear_pattern = secondary_storage_scan_pattern(prefix);
        assert_eq!(legacy_clear_pattern, "rustauth:*");
        assert!(rate_limit_key.starts_with(prefix));
    }

    #[test]
    fn secondary_storage_matches_redis_secondary_namespace_layout() {
        let storage = FredSecondaryStorage::new(
            Client::default(),
            FredSecondaryStorageOptions {
                key_prefix: "test:".to_owned(),
                scan_count: 100,
            },
        );

        assert_eq!(
            storage.prefixed_key("session:token").ok(),
            Some("test:secondary:session:token".to_owned())
        );
        assert_eq!(storage.secondary_prefix(), "test:secondary:");
        assert_eq!(
            secondary_storage_scan_pattern(&storage.secondary_prefix()),
            "test:secondary:*"
        );
    }

    #[tokio::test]
    async fn list_keys_rejects_empty_prefix_before_calling_redis() {
        assert_invalid_config(
            empty_prefix_storage().list_keys().await,
            EMPTY_PREFIX_MESSAGE,
        );
    }

    #[tokio::test]
    async fn list_keys_rejects_zero_scan_count_before_calling_redis() {
        assert_invalid_config(
            zero_scan_count_storage().list_keys().await,
            ZERO_SCAN_COUNT_MESSAGE,
        );
    }

    #[tokio::test]
    async fn clear_rejects_empty_prefix_before_calling_redis() {
        assert_invalid_config(empty_prefix_storage().clear().await, EMPTY_PREFIX_MESSAGE);
    }

    #[tokio::test]
    async fn clear_rejects_zero_scan_count_before_calling_redis() {
        assert_invalid_config(
            zero_scan_count_storage().clear().await,
            ZERO_SCAN_COUNT_MESSAGE,
        );
    }

    #[tokio::test]
    async fn get_rejects_empty_prefix_before_calling_redis() {
        assert_invalid_config(
            empty_prefix_storage().get("session").await,
            EMPTY_PREFIX_MESSAGE,
        );
    }

    #[tokio::test]
    async fn set_rejects_empty_prefix_before_calling_redis() {
        assert_invalid_config(
            empty_prefix_storage()
                .set("session", "value".to_owned(), None)
                .await,
            EMPTY_PREFIX_MESSAGE,
        );
    }

    #[tokio::test]
    async fn set_if_not_exists_rejects_empty_prefix_before_calling_redis() {
        assert_invalid_config(
            empty_prefix_storage()
                .set_if_not_exists("session", "value".to_owned(), None)
                .await,
            EMPTY_PREFIX_MESSAGE,
        );
    }

    #[tokio::test]
    async fn delete_rejects_empty_prefix_before_calling_redis() {
        assert_invalid_config(
            empty_prefix_storage().delete("session").await,
            EMPTY_PREFIX_MESSAGE,
        );
    }

    #[tokio::test]
    async fn take_rejects_empty_prefix_before_calling_redis() {
        assert_invalid_config(
            empty_prefix_storage().take("session").await,
            EMPTY_PREFIX_MESSAGE,
        );
    }

    #[tokio::test]
    async fn compare_and_set_rejects_empty_prefix_before_calling_redis() {
        assert_invalid_config(
            empty_prefix_storage()
                .compare_and_set("session", Some("old".to_owned()), "new".to_owned(), None)
                .await,
            EMPTY_PREFIX_MESSAGE,
        );
    }

    #[tokio::test]
    async fn delete_if_value_rejects_empty_prefix_before_calling_redis() {
        assert_invalid_config(
            empty_prefix_storage()
                .delete_if_value("session", Some("value".to_owned()))
                .await,
            EMPTY_PREFIX_MESSAGE,
        );
    }
}