1use async_trait::async_trait;
45use deadpool_redis::{Config, Pool, Runtime};
46use dyolo_kya::error::{KyaStorageError, StorageErrorKind};
47use dyolo_kya::registry::r#async::{AsyncNonceStore, AsyncRevocationStore};
48use redis::AsyncCommands;
49
50fn redis_err_to_storage(e: redis::RedisError) -> KyaStorageError {
53 use redis::ErrorKind::*;
54 let kind = match e.kind() {
55 IoError | BusyLoadingError | TryAgain | NotBusy | MasterDown => StorageErrorKind::Transient,
56 _ => StorageErrorKind::Permanent, };
58 KyaStorageError {
59 kind,
60 message: e.to_string(),
61 }
62}
63
64fn pool_err_to_storage(e: deadpool_redis::PoolError) -> KyaStorageError {
65 KyaStorageError::transient(format!("Redis pool error: {e}"))
66}
67
68pub struct RedisRevocationStore {
77 pool: Pool,
78 namespace: String,
79 ttl_secs: Option<u64>,
80}
81
82impl RedisRevocationStore {
83 pub async fn connect(
85 url: &str,
86 namespace: &str,
87 ttl: Option<std::time::Duration>,
88 ) -> Result<Self, KyaStorageError> {
89 let cfg = Config::from_url(url);
90 let pool = cfg
91 .create_pool(Some(Runtime::Tokio1))
92 .map_err(|e| KyaStorageError::permanent(format!("Redis pool creation failed: {e}")))?;
93 let mut conn = pool.get().await.map_err(pool_err_to_storage)?;
94 let _: String = redis::cmd("PING")
95 .query_async(&mut conn)
96 .await
97 .map_err(redis_err_to_storage)?;
98 Ok(Self {
99 pool,
100 namespace: namespace.to_owned(),
101 ttl_secs: ttl.map(|d| d.as_secs()),
102 })
103 }
104
105 fn key(&self, fingerprint: &[u8; 32]) -> String {
106 format!("{}:{}", self.namespace, hex::encode(fingerprint))
107 }
108
109 pub async fn list_revoked(&self) -> Result<Vec<String>, KyaStorageError> {
111 let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
112 let pattern = format!("{}:*", self.namespace);
113 let keys: Vec<String> = redis::cmd("KEYS")
114 .arg(&pattern)
115 .query_async(&mut conn)
116 .await
117 .map_err(redis_err_to_storage)?;
118
119 let prefix_len = self.namespace.len() + 1;
120 Ok(keys
121 .into_iter()
122 .map(|k| k.chars().skip(prefix_len).collect())
123 .collect())
124 }
125
126 pub async fn count(&self) -> Result<usize, KyaStorageError> {
128 let keys = self.list_revoked().await?;
129 Ok(keys.len())
130 }
131
132 pub async fn ping(&self) -> Result<(), KyaStorageError> {
134 let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
135 let _: String = redis::cmd("PING")
136 .query_async(&mut conn)
137 .await
138 .map_err(redis_err_to_storage)?;
139 Ok(())
140 }
141}
142
143#[async_trait]
144impl AsyncRevocationStore for RedisRevocationStore {
145 async fn is_revoked(&self, fingerprint: &[u8; 32]) -> Result<bool, KyaStorageError> {
146 let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
147 let exists: bool = conn
148 .exists(self.key(fingerprint))
149 .await
150 .map_err(redis_err_to_storage)?;
151 Ok(exists)
152 }
153
154 async fn revoke(&self, fingerprint: &[u8; 32]) -> Result<(), KyaStorageError> {
155 let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
156 let key = self.key(fingerprint);
157 match self.ttl_secs {
158 Some(ttl) => conn
159 .set_ex::<_, _, ()>(key, 1u8, ttl)
160 .await
161 .map_err(redis_err_to_storage)?,
162 None => conn
163 .set::<_, _, ()>(key, 1u8)
164 .await
165 .map_err(redis_err_to_storage)?,
166 }
167 Ok(())
168 }
169
170 async fn revoke_batch(&self, fingerprints: &[[u8; 32]]) -> Result<(), KyaStorageError> {
171 if fingerprints.is_empty() {
172 return Ok(());
173 }
174
175 let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
176 let mut pipe = redis::pipe();
177
178 for fp in fingerprints {
179 let key = self.key(fp);
180 if let Some(ttl) = self.ttl_secs {
181 pipe.set_ex(key, 1u8, ttl);
182 } else {
183 pipe.set(key, 1u8);
184 }
185 }
186
187 let _: () = pipe
188 .query_async(&mut conn)
189 .await
190 .map_err(redis_err_to_storage)?;
191 Ok(())
192 }
193}
194
195pub struct RedisNonceStore {
206 pool: Pool,
207 namespace: String,
208 nonce_ttl_ms: u64,
209}
210
211impl RedisNonceStore {
212 pub async fn connect(
218 url: &str,
219 namespace: &str,
220 ttl: std::time::Duration,
221 ) -> Result<Self, KyaStorageError> {
222 let cfg = Config::from_url(url);
223 let pool = cfg
224 .create_pool(Some(Runtime::Tokio1))
225 .map_err(|e| KyaStorageError::permanent(format!("Redis pool creation failed: {e}")))?;
226 let mut conn = pool.get().await.map_err(pool_err_to_storage)?;
227 let _: String = redis::cmd("PING")
228 .query_async(&mut conn)
229 .await
230 .map_err(redis_err_to_storage)?;
231 Ok(Self {
232 pool,
233 namespace: namespace.to_owned(),
234 nonce_ttl_ms: ttl.as_millis() as u64,
235 })
236 }
237
238 fn key(&self, nonce: &[u8; 16]) -> String {
239 format!("{}:{}", self.namespace, hex::encode(nonce))
240 }
241
242 pub async fn count(&self) -> Result<usize, KyaStorageError> {
244 let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
245 let pattern = format!("{}:*", self.namespace);
246 let keys: Vec<String> = redis::cmd("KEYS")
247 .arg(&pattern)
248 .query_async(&mut conn)
249 .await
250 .map_err(redis_err_to_storage)?;
251 Ok(keys.len())
252 }
253}
254
255#[async_trait]
256impl AsyncNonceStore for RedisNonceStore {
257 async fn try_consume_batch(&self, nonces: &[[u8; 16]]) -> Result<bool, KyaStorageError> {
263 if nonces.is_empty() {
264 return Ok(true);
265 }
266 let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
267
268 let script = redis::Script::new(
269 r#"
270 for _, key in ipairs(KEYS) do
271 if redis.call('EXISTS', key) == 1 then
272 return 0
273 end
274 end
275 for _, key in ipairs(KEYS) do
276 redis.call('SET', key, 1, 'PX', ARGV[1])
277 end
278 return 1
279 "#,
280 );
281
282 let mut inv = script.prepare_invoke();
283 for nonce in nonces {
284 inv.key(self.key(nonce));
285 }
286 inv.arg(self.nonce_ttl_ms);
287
288 let result: i32 = inv
289 .invoke_async(&mut conn)
290 .await
291 .map_err(redis_err_to_storage)?;
292 Ok(result == 1)
293 }
294
295 async fn try_consume(&self, nonce: &[u8; 16]) -> Result<bool, KyaStorageError> {
302 let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
303 let result: Option<String> = redis::cmd("SET")
306 .arg(self.key(nonce))
307 .arg(1u8)
308 .arg("NX")
309 .arg("PX")
310 .arg(self.nonce_ttl_ms)
311 .query_async(&mut conn)
312 .await
313 .map_err(redis_err_to_storage)?;
314 Ok(result.is_some())
315 }
316
317 async fn is_consumed(&self, nonce: &[u8; 16]) -> Result<bool, KyaStorageError> {
318 let mut conn = self.pool.get().await.map_err(pool_err_to_storage)?;
319 let exists: bool = conn
320 .exists(self.key(nonce))
321 .await
322 .map_err(redis_err_to_storage)?;
323 Ok(exists)
324 }
325
326 async fn mark_consumed(&self, nonce: &[u8; 16]) -> Result<(), KyaStorageError> {
327 self.try_consume(nonce).await.map(|_| ())
328 }
329}