use async_trait::async_trait;
use redis::aio::ConnectionManager;
use redis::{Client, AsyncCommands, pipe, cmd};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{BatchWriteResult, CacheStore, StorageError};
use crate::resilience::retry::{retry, RetryConfig};

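/// Redis-backed cache store. JSON items are stored as RedisJSON documents
/// and binary items as plain string values; all keys can share an optional
/// namespace prefix.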
pub struct RedisStore {
    connection: ConnectionManager,
    prefix: String,
}

impl RedisStore {
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_prefix(connection_string, None).await
    }

    pub async fn with_prefix(connection_string: &str, prefix: Option<&str>) -> Result<Self, StorageError> {
        let client = Client::open(connection_string)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let connection = retry("redis_connect", &RetryConfig::startup(), || async {
            ConnectionManager::new(client.clone()).await
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(Self {
            connection,
            prefix: prefix.unwrap_or("").to_string(),
        })
    }

    #[inline]
    fn prefixed_key(&self, key: &str) -> String {
        if self.prefix.is_empty() {
            key.to_string()
        } else {
            format!("{}{}", self.prefix, key)
        }
    }

    #[inline]
    #[allow(dead_code)]
    fn strip_prefix<'a>(&self, key: &'a str) -> &'a str {
        if self.prefix.is_empty() {
            key
        } else {
            key.strip_prefix(&self.prefix).unwrap_or(key)
        }
    }

    pub fn connection(&self) -> ConnectionManager {
        self.connection.clone()
    }

    pub fn prefix(&self) -> &str {
        &self.prefix
    }

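    /// Serializes a [`SyncItem`] into the JSON document layout written by
    /// `JSON.SET`: `version`, `timestamp`, `state`, and `payload`, plus an
    /// optional `payload_hash` (from a non-empty merkle root) and an optional
    /// `audit` object carrying `batch`, `trace`, and `home` entries.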
    fn build_json_document(item: &SyncItem) -> Result<String, StorageError> {
        let payload: serde_json::Value = serde_json::from_slice(&item.content)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON content: {}", e)))?;

        let mut audit = serde_json::Map::new();
        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        let mut doc = serde_json::json!({
            "version": item.version,
            "timestamp": item.updated_at,
            "state": item.state,
            "payload": payload
        });

        if !item.merkle_root.is_empty() {
            doc["payload_hash"] = serde_json::Value::String(item.merkle_root.clone());
        }

        if !audit.is_empty() {
            doc["audit"] = serde_json::Value::Object(audit);
        }

        serde_json::to_string(&doc)
            .map_err(|e| StorageError::Backend(e.to_string()))
    }

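    /// Inverse of [`Self::build_json_document`]: rebuilds a [`SyncItem`] from
    /// a stored document, substituting defaults for any missing fields.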
    fn parse_json_document(id: &str, json_str: &str) -> Result<SyncItem, StorageError> {
        let doc: serde_json::Value = serde_json::from_str(json_str)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON document: {}", e)))?;

        let version = doc.get("version").and_then(|v| v.as_u64()).unwrap_or(1);
        let updated_at = doc.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
        let merkle_root = doc.get("payload_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
        let state = doc.get("state").and_then(|v| v.as_str()).unwrap_or("default").to_string();

        let audit = doc.get("audit");
        let batch_id = audit.and_then(|a| a.get("batch")).and_then(|v| v.as_str()).map(String::from);
        let trace_parent = audit.and_then(|a| a.get("trace")).and_then(|v| v.as_str()).map(String::from);
        let home_instance_id = audit.and_then(|a| a.get("home")).and_then(|v| v.as_str()).map(String::from);

        let payload = doc.get("payload").cloned().unwrap_or(serde_json::Value::Null);
        let content = serde_json::to_vec(&payload)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(SyncItem::reconstruct(
            id.to_string(),
            version,
            updated_at,
            ContentType::Json,
            content,
            batch_id,
            trace_parent,
            merkle_root,
            home_instance_id,
            state,
        ))
    }
}

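// `get` dispatches on the stored key type: RedisJSON keys report the module
// type "ReJSON-RL" and are read with JSON.GET, while binary items are plain
// "string" values read with GET.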
#[async_trait]
impl CacheStore for RedisStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);
        let original_id = id.to_string();

        let key_type: Option<String> = retry("redis_type", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let t: String = redis::cmd("TYPE").arg(&key).query_async(&mut conn).await?;
                Ok(if t == "none" { None } else { Some(t) })
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        match key_type.as_deref() {
            None => Ok(None),
            Some("ReJSON-RL") => {
                let json_str: Option<String> = retry("redis_json_get", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    async move {
                        let data: Option<String> = cmd("JSON.GET")
                            .arg(&key)
                            .query_async(&mut conn)
                            .await?;
                        Ok(data)
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

                match json_str {
                    Some(s) => Self::parse_json_document(&original_id, &s).map(Some),
                    None => Ok(None),
                }
            }
            Some("string") => {
                let data: Option<Vec<u8>> = retry("redis_get", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    async move {
                        let data: Option<Vec<u8>> = conn.get(&key).await?;
                        Ok(data)
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

                data.map(|bytes| serde_json::from_slice(&bytes).map_err(|e| StorageError::Backend(e.to_string())))
                    .transpose()
            }
            Some(other) => {
                Err(StorageError::Backend(format!("Unexpected Redis key type: {}", other)))
            }
        }
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(&item.object_id);

        match item.content_type {
            ContentType::Json => {
                let json_doc = Self::build_json_document(item)?;

                retry("redis_json_set", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    let doc = json_doc.clone();
                    async move {
                        let _: () = cmd("JSON.SET")
                            .arg(&key)
                            .arg("$")
                            .arg(&doc)
                            .query_async(&mut conn)
                            .await?;
                        Ok(())
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
            }
            ContentType::Binary => {
                let data = serde_json::to_vec(item)
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                retry("redis_set", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    let data = data.clone();
                    async move {
                        let _: () = conn.set(&key, &data).await?;
                        Ok(())
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
            }
        }
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);

        retry("redis_delete", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let _: () = conn.del(&key).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);

        retry("redis_exists", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let exists: bool = conn.exists(&key).await?;
                Ok(exists)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_with_ttl(items, None).await
    }

    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_impl(items, ttl_secs).await
    }
}

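// Batch writes serialize every item up front, so an encoding error fails the
// whole batch before any network I/O, then issue one pipeline. Each item is
// also indexed into a per-state set keyed "{prefix}state:{state}"; note that
// the single-item `put` and `delete` paths do not touch this index.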
impl RedisStore {
    async fn put_batch_impl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        #[derive(Clone)]
        enum PreparedItem {
            Json { key: String, id: String, state: String, doc: String },
            Blob { key: String, id: String, state: String, data: Vec<u8> },
        }

        let prepared: Result<Vec<_>, _> = items.iter()
            .map(|item| {
                let prefixed_key = self.prefixed_key(&item.object_id);
                let id = item.object_id.clone();
                let state = item.state.clone();
                match item.content_type {
                    ContentType::Json => {
                        Self::build_json_document(item)
                            .map(|doc| PreparedItem::Json { key: prefixed_key, id, state, doc })
                    }
                    ContentType::Binary => {
                        serde_json::to_vec(item)
                            .map(|bytes| PreparedItem::Blob { key: prefixed_key, id, state, data: bytes })
                            .map_err(|e| StorageError::Backend(e.to_string()))
                    }
                }
            })
            .collect();
        let prepared = prepared?;
        let count = prepared.len();

        let conn = self.connection.clone();
        let prefix = self.prefix.clone();

        retry("redis_put_batch", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let prepared = prepared.clone();
            let prefix = prefix.clone();
            async move {
                let mut pipeline = pipe();

                for item in &prepared {
                    match item {
                        PreparedItem::Json { key, id, state, doc } => {
                            pipeline.cmd("JSON.SET").arg(key).arg("$").arg(doc);
                            if let Some(ttl) = ttl_secs {
                                pipeline.expire(key, ttl as i64);
                            }
                            let state_key = format!("{}state:{}", prefix, state);
                            pipeline.cmd("SADD").arg(&state_key).arg(id);
                        }
                        PreparedItem::Blob { key, id, state, data } => {
                            if let Some(ttl) = ttl_secs {
                                pipeline.cmd("SETEX").arg(key).arg(ttl as i64).arg(data.as_slice());
                            } else {
                                pipeline.set(key, data.as_slice());
                            }
                            let state_key = format!("{}state:{}", prefix, state);
                            pipeline.cmd("SADD").arg(&state_key).arg(id);
                        }
                    }
                }

                pipeline.query_async::<()>(&mut conn).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(BatchWriteResult {
            batch_id: String::new(),
            written: count,
            verified: true,
        })
    }

    pub async fn exists_batch(&self, ids: &[String]) -> Result<Vec<bool>, StorageError> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let conn = self.connection.clone();
        let prefixed_ids: Vec<String> = ids.iter().map(|id| self.prefixed_key(id)).collect();

        retry("redis_exists_batch", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let prefixed_ids = prefixed_ids.clone();
            async move {
                let mut pipeline = pipe();
                for key in &prefixed_ids {
                    pipeline.exists(key);
                }

                let results: Vec<bool> = pipeline.query_async(&mut conn).await?;
                Ok(results)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

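    // State-set helpers. Each state is a Redis set keyed
    // "{prefix}state:{state}" holding unprefixed item ids; these methods
    // operate on the index only, except `delete_by_state`, which also
    // deletes the indexed items themselves.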
    pub async fn list_state_ids(&self, state: &str) -> Result<Vec<String>, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let ids: Vec<String> = cmd("SMEMBERS")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;

        Ok(ids)
    }

    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let count: u64 = cmd("SCARD")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to count state: {}", e)))?;

        Ok(count)
    }

    pub async fn is_in_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let is_member: bool = cmd("SISMEMBER")
            .arg(&state_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to check state membership: {}", e)))?;

        Ok(is_member)
    }

    pub async fn move_state(&self, id: &str, from_state: &str, to_state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let from_key = format!("{}state:{}", self.prefix, from_state);
        let to_key = format!("{}state:{}", self.prefix, to_state);

        let moved: bool = cmd("SMOVE")
            .arg(&from_key)
            .arg(&to_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to move state: {}", e)))?;

        Ok(moved)
    }

    pub async fn remove_from_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let removed: u32 = cmd("SREM")
            .arg(&state_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to remove from state: {}", e)))?;

        Ok(removed > 0)
    }

    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let ids: Vec<String> = cmd("SMEMBERS")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;

        if ids.is_empty() {
            return Ok(0);
        }

        let count = ids.len() as u64;

        let mut pipeline = pipe();
        for id in &ids {
            let key = self.prefixed_key(id);
            pipeline.del(&key);
        }
        pipeline.del(&state_key);

        pipeline.query_async::<()>(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to delete state items: {}", e)))?;

        Ok(count)
    }
}
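
// A minimal usage sketch, kept as an ignored test. It assumes a reachable
// Redis server with the RedisJSON module at redis://127.0.0.1:6379, a tokio
// dev-dependency for #[tokio::test], and that StorageError derives Debug
// (required by expect()); none of those are guaranteed by this file alone.
#[cfg(test)]
mod redis_store_example {
    use super::*;

    #[tokio::test]
    #[ignore = "requires a running Redis server with the RedisJSON module"]
    async fn json_round_trip() {
        // The address and "example:" prefix are illustrative assumptions.
        let store = RedisStore::with_prefix("redis://127.0.0.1:6379", Some("example:"))
            .await
            .expect("connect");

        // Build an item through the same reconstruct() path the parser uses;
        // the argument order follows parse_json_document above.
        let item = SyncItem::reconstruct(
            "item-1".to_string(),
            1,                                // version
            1_700_000_000,                    // updated_at
            ContentType::Json,
            br#"{"hello":"world"}"#.to_vec(), // content: must be valid JSON bytes
            None,                             // batch_id
            None,                             // trace_parent
            String::new(),                    // merkle_root: empty => no payload_hash
            None,                             // home_instance_id
            "pending".to_string(),            // state
        );

        store.put(&item).await.expect("put");
        let fetched = store
            .get("item-1")
            .await
            .expect("get")
            .expect("item should round-trip");
        assert_eq!(fetched.version, 1);

        store.delete("item-1").await.expect("delete");
        assert!(!store.exists("item-1").await.expect("exists"));
    }
}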