Skip to main content

dyolo_kya_redis/
lib.rs

1//! # dyolo-kya-redis
2//!
3//! Production Redis storage backends for [dyolo-kya](https://docs.rs/dyolo-kya).
4//!
5//! | Type | Trait | Description |
6//! |------|-------|-------------|
7//! | [`RedisRevocationStore`] | `AsyncRevocationStore` | Stores revoked cert fingerprints with optional TTL |
8//! | [`RedisNonceStore`] | `AsyncNonceStore` | Stores consumed nonces with TTL tied to max cert lifetime |
9//!
10//! Both types use a connection pool (`deadpool-redis`) for production throughput
11//! and are safe to share across Tokio tasks via `Arc`.
12//!
13//! ## Quick start
14//!
15//! ```rust,no_run
16//! use std::sync::Arc;
17//! use std::time::Duration;
18//! use dyolo_kya_redis::{RedisRevocationStore, RedisNonceStore};
19//!
20//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
21//! let rev = Arc::new(
22//!     RedisRevocationStore::connect("redis://127.0.0.1/", "kya:rev", None).await?
23//! );
24//! let nonces = Arc::new(
25//!     RedisNonceStore::connect("redis://127.0.0.1/", "kya:nonce", Duration::from_secs(7200)).await?
26//! );
27//! # Ok(())
28//! # }
29//! ```
30//!
31//! ## Key naming
32//!
33//! Both stores prefix all Redis keys with a configurable namespace:
34//! - Revocation: `{namespace}:{hex-fingerprint}` → value `1`, no expiry by default
//! - Nonces:     `{namespace}:{hex-nonce}` → value `1`, expiry = the `ttl` passed to [`RedisNonceStore::connect`]
36//!
37//! ## Distributed TTL strategy
38//!
//! Set the `ttl` passed to [`RedisNonceStore::connect`] to:
40//! ```text
41//! max_cert_lifetime_secs + max_clock_drift_secs + safety_margin_secs
42//! ```
43
44use async_trait::async_trait;
45use deadpool_redis::{Config, Pool, Runtime};
46use dyolo_kya::error::{KyaStorageError, StorageErrorKind};
47use dyolo_kya::registry::r#async::{AsyncNonceStore, AsyncRevocationStore};
48use redis::AsyncCommands;
49
50// ── Connection helpers ────────────────────────────────────────────────────────
51
52fn redis_err_to_storage(e: redis::RedisError) -> KyaStorageError {
53    use redis::ErrorKind::*;
54    let kind = match e.kind() {
55        IoError | BusyLoadingError | TryAgain | NotBusy | MasterDown => StorageErrorKind::Transient,
56        _ => StorageErrorKind::Permanent, // ResponseError is permanent (e.g. wrong type/args)
57    };
58    KyaStorageError {
59        kind,
60        message: e.to_string(),
61    }
62}
63
64fn pool_err_to_storage(e: deadpool_redis::PoolError) -> KyaStorageError {
65    KyaStorageError::transient(format!("Redis pool error: {e}"))
66}
67
68// ── RedisRevocationStore ──────────────────────────────────────────────────────
69
70/// Redis-backed [`AsyncRevocationStore`] with optional per-key TTL.
71///
72/// Each revoked fingerprint is stored as `"{namespace}:{hex-fingerprint}"` with value `1`.
73///
74/// - **`None` TTL** (default): keys persist until explicitly deleted.
75/// - **`Some(secs)` TTL**: keys auto-expire after `secs` seconds.
76pub struct RedisRevocationStore {
77    pool: Pool,
78    namespace: String,
79    ttl_secs: Option<u64>,
80}
81
82impl RedisRevocationStore {
83    /// Connect to Redis and create a connection pool.
84    pub async fn connect(
85        url: &str,
86        namespace: &str,
87        ttl: Option<std::time::Duration>,
88    ) -> Result<Self, KyaStorageError> {
89        let cfg = Config::from_url(url);
90        let pool = cfg
91            .create_pool(Some(Runtime::Tokio1))
92            .map_err(|e| KyaStorageError::permanent(format!("Redis pool creation failed: {e}")))?;
93        let mut conn = pool.get().await.map_err(pool_err_to_storage)?;
94        let _: String = redis::cmd("PING")
95            .query_async(&mut conn)
96            .await
97            .map_err(redis_err_to_storage)?;
98        Ok(Self {
99            pool,
100            namespace: namespace.to_owned(),
101            ttl_secs: ttl.map(|d| d.as_secs()),
102        })
103    }
104
105    fn key(&self, fingerprint: &[u8; 32]) -> String {
106        format!("{}:{}", self.namespace, hex::encode(fingerprint))
107    }
108
109    /// Admin: List all currently revoked certificate fingerprints in this namespace.
110    pub async fn list_revoked(&self) -> Result<Vec<String>, KyaStorageError> {
111        let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
112        let pattern = format!("{}:*", self.namespace);
113        let keys: Vec<String> = redis::cmd("KEYS")
114            .arg(&pattern)
115            .query_async(&mut conn)
116            .await
117            .map_err(redis_err_to_storage)?;
118
119        let prefix_len = self.namespace.len() + 1;
120        Ok(keys
121            .into_iter()
122            .map(|k| k.chars().skip(prefix_len).collect())
123            .collect())
124    }
125
126    /// Admin: Count the number of currently revoked certificates.
127    pub async fn count(&self) -> Result<usize, KyaStorageError> {
128        let keys = self.list_revoked().await?;
129        Ok(keys.len())
130    }
131
132    /// Health check endpoint integration.
133    pub async fn ping(&self) -> Result<(), KyaStorageError> {
134        let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
135        let _: String = redis::cmd("PING")
136            .query_async(&mut conn)
137            .await
138            .map_err(redis_err_to_storage)?;
139        Ok(())
140    }
141}
142
143#[async_trait]
144impl AsyncRevocationStore for RedisRevocationStore {
145    async fn is_revoked(&self, fingerprint: &[u8; 32]) -> Result<bool, KyaStorageError> {
146        let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
147        let exists: bool = conn
148            .exists(self.key(fingerprint))
149            .await
150            .map_err(redis_err_to_storage)?;
151        Ok(exists)
152    }
153
154    async fn revoke(&self, fingerprint: &[u8; 32]) -> Result<(), KyaStorageError> {
155        let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
156        let key = self.key(fingerprint);
157        match self.ttl_secs {
158            Some(ttl) => conn
159                .set_ex::<_, _, ()>(key, 1u8, ttl)
160                .await
161                .map_err(redis_err_to_storage)?,
162            None => conn
163                .set::<_, _, ()>(key, 1u8)
164                .await
165                .map_err(redis_err_to_storage)?,
166        }
167        Ok(())
168    }
169
170    async fn revoke_batch(&self, fingerprints: &[[u8; 32]]) -> Result<(), KyaStorageError> {
171        if fingerprints.is_empty() {
172            return Ok(());
173        }
174
175        let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
176        let mut pipe = redis::pipe();
177
178        for fp in fingerprints {
179            let key = self.key(fp);
180            if let Some(ttl) = self.ttl_secs {
181                pipe.set_ex(key, 1u8, ttl);
182            } else {
183                pipe.set(key, 1u8);
184            }
185        }
186
187        let _: () = pipe
188            .query_async(&mut conn)
189            .await
190            .map_err(redis_err_to_storage)?;
191        Ok(())
192    }
193}
194
195// ── RedisNonceStore ───────────────────────────────────────────────────────────
196
197/// Redis-backed [`AsyncNonceStore`] with mandatory per-key TTL.
198///
199/// Nonce consumption uses `SET NX PX <ttl_ms>` — a single atomic command that
200/// checks and sets in one round-trip, eliminating the TOCTOU gap present in any
201/// two-step check-then-set protocol.
202///
203/// `try_consume` returns `Ok(true)` when the nonce was fresh (now consumed) and
204/// `Ok(false)` when it was already consumed. Treat `false` as a replay attack.
205pub struct RedisNonceStore {
206    pool: Pool,
207    namespace: String,
208    nonce_ttl_ms: u64,
209}
210
211impl RedisNonceStore {
212    /// Connect to Redis and create a connection pool.
213    ///
214    /// - `url` — Redis connection URL
215    /// - `namespace` — key prefix (e.g. `"kya:nonce:prod"`)
216    /// - `ttl` — mandatory TTL for consumed nonces
217    pub async fn connect(
218        url: &str,
219        namespace: &str,
220        ttl: std::time::Duration,
221    ) -> Result<Self, KyaStorageError> {
222        let cfg = Config::from_url(url);
223        let pool = cfg
224            .create_pool(Some(Runtime::Tokio1))
225            .map_err(|e| KyaStorageError::permanent(format!("Redis pool creation failed: {e}")))?;
226        let mut conn = pool.get().await.map_err(pool_err_to_storage)?;
227        let _: String = redis::cmd("PING")
228            .query_async(&mut conn)
229            .await
230            .map_err(redis_err_to_storage)?;
231        Ok(Self {
232            pool,
233            namespace: namespace.to_owned(),
234            nonce_ttl_ms: ttl.as_millis() as u64,
235        })
236    }
237
238    fn key(&self, nonce: &[u8; 16]) -> String {
239        format!("{}:{}", self.namespace, hex::encode(nonce))
240    }
241
242    /// Admin: Count the number of currently active consumed nonces.
243    pub async fn count(&self) -> Result<usize, KyaStorageError> {
244        let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
245        let pattern = format!("{}:*", self.namespace);
246        let keys: Vec<String> = redis::cmd("KEYS")
247            .arg(&pattern)
248            .query_async(&mut conn)
249            .await
250            .map_err(redis_err_to_storage)?;
251        Ok(keys.len())
252    }
253}
254
255#[async_trait]
256impl AsyncNonceStore for RedisNonceStore {
257    /// Atomically check and consume a batch of nonces using a Lua script.
258    ///
259    /// This ensures all nonces are evaluated in a single atomic transaction.
260    /// If any nonce already exists, the script returns 0 and no keys are set.
261    /// Otherwise, all nonces are set with the configured TTL and returns 1.
262    async fn try_consume_batch(&self, nonces: &[[u8; 16]]) -> Result<bool, KyaStorageError> {
263        if nonces.is_empty() {
264            return Ok(true);
265        }
266        let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
267
268        let script = redis::Script::new(
269            r#"
270            for _, key in ipairs(KEYS) do
271                if redis.call('EXISTS', key) == 1 then
272                    return 0
273                end
274            end
275            for _, key in ipairs(KEYS) do
276                redis.call('SET', key, 1, 'PX', ARGV[1])
277            end
278            return 1
279        "#,
280        );
281
282        let mut inv = script.prepare_invoke();
283        for nonce in nonces {
284            inv.key(self.key(nonce));
285        }
286        inv.arg(self.nonce_ttl_ms);
287
288        let result: i32 = inv
289            .invoke_async(&mut conn)
290            .await
291            .map_err(redis_err_to_storage)?;
292        Ok(result == 1)
293    }
294
295    /// Atomically check and consume a nonce using `SET NX PX`.
296    ///
297    /// A single `SET key 1 NX PX <ttl_ms>` command is issued. Redis returns
298    /// `OK` (mapped to `true`) when the key did not exist and was set, or
299    /// `nil` (mapped to `false`) when the key already existed.
300    /// This is a single-roundtrip atomic operation — no TOCTOU window.
301    async fn try_consume(&self, nonce: &[u8; 16]) -> Result<bool, KyaStorageError> {
302        let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
303        // SET NX PX returns the string "OK" on success, or nil on conflict.
304        // Mapping to Option<String>: Some(_) = newly set (fresh), None = already existed.
305        let result: Option<String> = redis::cmd("SET")
306            .arg(self.key(nonce))
307            .arg(1u8)
308            .arg("NX")
309            .arg("PX")
310            .arg(self.nonce_ttl_ms)
311            .query_async(&mut conn)
312            .await
313            .map_err(redis_err_to_storage)?;
314        Ok(result.is_some())
315    }
316
317    async fn is_consumed(&self, nonce: &[u8; 16]) -> Result<bool, KyaStorageError> {
318        let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
319        let exists: bool = conn
320            .exists(self.key(nonce))
321            .await
322            .map_err(redis_err_to_storage)?;
323        Ok(exists)
324    }
325
326    async fn mark_consumed(&self, nonce: &[u8; 16]) -> Result<(), KyaStorageError> {
327        self.try_consume(nonce).await.map(|_| ())
328    }
329}