1use async_trait::async_trait;
25use redis::aio::ConnectionManager;
26use redis::{Client, AsyncCommands, pipe, cmd};
27use crate::sync_item::{SyncItem, ContentType};
28use super::traits::{BatchWriteResult, CacheStore, StorageError};
29use crate::resilience::retry::{retry, RetryConfig};
30
/// Redis-backed cache store.
///
/// JSON items are persisted as RedisJSON documents (`JSON.SET`/`JSON.GET`);
/// binary items are stored as plain string values serialized with serde_json.
/// Every key is optionally namespaced with `prefix`.
pub struct RedisStore {
    // Multiplexed connection with automatic reconnection; cloning is cheap and
    // shares the underlying connection.
    connection: ConnectionManager,
    // Namespace prepended to every key; empty string means no namespacing.
    prefix: String,
}
36
37impl RedisStore {
38 pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
40 Self::with_prefix(connection_string, None).await
41 }
42
43 pub async fn with_prefix(connection_string: &str, prefix: Option<&str>) -> Result<Self, StorageError> {
59 let client = Client::open(connection_string)
60 .map_err(|e| StorageError::Backend(e.to_string()))?;
61
62 let connection = retry("redis_connect", &RetryConfig::startup(), || async {
64 ConnectionManager::new(client.clone()).await
65 })
66 .await
67 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
68
69 Ok(Self {
70 connection,
71 prefix: prefix.unwrap_or("").to_string(),
72 })
73 }
74
75 #[inline]
77 fn prefixed_key(&self, key: &str) -> String {
78 if self.prefix.is_empty() {
79 key.to_string()
80 } else {
81 format!("{}{}", self.prefix, key)
82 }
83 }
84
85 #[inline]
88 #[allow(dead_code)]
89 fn strip_prefix<'a>(&self, key: &'a str) -> &'a str {
90 if self.prefix.is_empty() {
91 key
92 } else {
93 key.strip_prefix(&self.prefix).unwrap_or(key)
94 }
95 }
96
97 pub fn connection(&self) -> ConnectionManager {
99 self.connection.clone()
100 }
101
102 pub fn prefix(&self) -> &str {
104 &self.prefix
105 }
106
107 fn build_json_document(item: &SyncItem) -> Result<String, StorageError> {
120 let payload: serde_json::Value = serde_json::from_slice(&item.content)
122 .map_err(|e| StorageError::Backend(format!("Invalid JSON content: {}", e)))?;
123
124 let mut audit = serde_json::Map::new();
126 if let Some(ref batch_id) = item.batch_id {
127 audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
128 }
129 if let Some(ref trace_parent) = item.trace_parent {
130 audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
131 }
132 if let Some(ref home) = item.home_instance_id {
133 audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
134 }
135
136 let mut doc = serde_json::json!({
138 "version": item.version,
139 "timestamp": item.updated_at,
140 "payload": payload
141 });
142
143 if !item.merkle_root.is_empty() {
145 doc["payload_hash"] = serde_json::Value::String(item.merkle_root.clone());
146 }
147
148 if !audit.is_empty() {
150 doc["audit"] = serde_json::Value::Object(audit);
151 }
152
153 serde_json::to_string(&doc)
154 .map_err(|e| StorageError::Backend(e.to_string()))
155 }
156
157 fn parse_json_document(id: &str, json_str: &str) -> Result<SyncItem, StorageError> {
159 let doc: serde_json::Value = serde_json::from_str(json_str)
160 .map_err(|e| StorageError::Backend(format!("Invalid JSON document: {}", e)))?;
161
162 let version = doc.get("version").and_then(|v| v.as_u64()).unwrap_or(1);
164 let updated_at = doc.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
165 let merkle_root = doc.get("payload_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
166
167 let audit = doc.get("audit");
169 let batch_id = audit.and_then(|a| a.get("batch")).and_then(|v| v.as_str()).map(String::from);
170 let trace_parent = audit.and_then(|a| a.get("trace")).and_then(|v| v.as_str()).map(String::from);
171 let home_instance_id = audit.and_then(|a| a.get("home")).and_then(|v| v.as_str()).map(String::from);
172
173 let payload = doc.get("payload").cloned().unwrap_or(serde_json::Value::Null);
175 let content = serde_json::to_vec(&payload)
176 .map_err(|e| StorageError::Backend(e.to_string()))?;
177
178 Ok(SyncItem::reconstruct(
179 id.to_string(),
180 version,
181 updated_at,
182 ContentType::Json,
183 content,
184 batch_id,
185 trace_parent,
186 merkle_root,
187 home_instance_id,
188 ))
189 }
190}
191
192#[async_trait]
193impl CacheStore for RedisStore {
194 async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
195 let conn = self.connection.clone();
196 let prefixed_id = self.prefixed_key(id);
197 let original_id = id.to_string();
198
199 let key_type: Option<String> = retry("redis_type", &RetryConfig::query(), || {
201 let mut conn = conn.clone();
202 let key = prefixed_id.clone();
203 async move {
204 let t: String = redis::cmd("TYPE").arg(&key).query_async(&mut conn).await?;
205 Ok(if t == "none" { None } else { Some(t) })
206 }
207 })
208 .await
209 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
210
211 match key_type.as_deref() {
212 None => Ok(None), Some("ReJSON-RL") => {
214 let json_str: Option<String> = retry("redis_json_get", &RetryConfig::query(), || {
216 let mut conn = conn.clone();
217 let key = prefixed_id.clone();
218 async move {
219 let data: Option<String> = cmd("JSON.GET")
220 .arg(&key)
221 .query_async(&mut conn)
222 .await?;
223 Ok(data)
224 }
225 })
226 .await
227 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
228
229 match json_str {
230 Some(s) => Self::parse_json_document(&original_id, &s).map(Some),
231 None => Ok(None),
232 }
233 }
234 Some("string") => {
235 let data: Option<Vec<u8>> = retry("redis_get", &RetryConfig::query(), || {
237 let mut conn = conn.clone();
238 let key = prefixed_id.clone();
239 async move {
240 let data: Option<Vec<u8>> = conn.get(&key).await?;
241 Ok(data)
242 }
243 })
244 .await
245 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
246
247 data.map(|bytes| serde_json::from_slice(&bytes).map_err(|e| StorageError::Backend(e.to_string())))
248 .transpose()
249 }
250 Some(other) => {
251 Err(StorageError::Backend(format!("Unexpected Redis key type: {}", other)))
252 }
253 }
254 }
255
256 async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
257 let conn = self.connection.clone();
258 let prefixed_id = self.prefixed_key(&item.object_id);
259
260 match item.content_type {
261 ContentType::Json => {
262 let json_doc = Self::build_json_document(item)?;
264
265 retry("redis_json_set", &RetryConfig::query(), || {
266 let mut conn = conn.clone();
267 let key = prefixed_id.clone();
268 let doc = json_doc.clone();
269 async move {
270 let _: () = cmd("JSON.SET")
272 .arg(&key)
273 .arg("$")
274 .arg(&doc)
275 .query_async(&mut conn)
276 .await?;
277 Ok(())
278 }
279 })
280 .await
281 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
282 }
283 ContentType::Binary => {
284 let data = serde_json::to_vec(item)
286 .map_err(|e| StorageError::Backend(e.to_string()))?;
287
288 retry("redis_set", &RetryConfig::query(), || {
289 let mut conn = conn.clone();
290 let key = prefixed_id.clone();
291 let data = data.clone();
292 async move {
293 let _: () = conn.set(&key, &data).await?;
294 Ok(())
295 }
296 })
297 .await
298 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
299 }
300 }
301 }
302
303 async fn delete(&self, id: &str) -> Result<(), StorageError> {
304 let conn = self.connection.clone();
305 let prefixed_id = self.prefixed_key(id);
306
307 retry("redis_delete", &RetryConfig::query(), || {
308 let mut conn = conn.clone();
309 let key = prefixed_id.clone();
310 async move {
311 let _: () = conn.del(&key).await?;
312 Ok(())
313 }
314 })
315 .await
316 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
317 }
318
319 async fn exists(&self, id: &str) -> Result<bool, StorageError> {
320 let conn = self.connection.clone();
321 let prefixed_id = self.prefixed_key(id);
322
323 retry("redis_exists", &RetryConfig::query(), || {
324 let mut conn = conn.clone();
325 let key = prefixed_id.clone();
326 async move {
327 let exists: bool = conn.exists(&key).await?;
328 Ok(exists)
329 }
330 })
331 .await
332 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
333 }
334
335 async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
337 self.put_batch_with_ttl(items, None).await
338 }
339
340 async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
342 self.put_batch_impl(items, ttl_secs).await
343 }
344}
345
346impl RedisStore {
347 async fn put_batch_impl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
350 if items.is_empty() {
351 return Ok(BatchWriteResult {
352 batch_id: String::new(),
353 written: 0,
354 verified: true,
355 });
356 }
357
358 #[derive(Clone)]
360 enum PreparedItem {
361 Json { key: String, doc: String },
362 Blob { key: String, data: Vec<u8> },
363 }
364
365 let prepared: Result<Vec<_>, _> = items.iter()
366 .map(|item| {
367 let prefixed_key = self.prefixed_key(&item.object_id);
368 match item.content_type {
369 ContentType::Json => {
370 Self::build_json_document(item)
371 .map(|doc| PreparedItem::Json { key: prefixed_key, doc })
372 }
373 ContentType::Binary => {
374 serde_json::to_vec(item)
375 .map(|bytes| PreparedItem::Blob { key: prefixed_key, data: bytes })
376 .map_err(|e| StorageError::Backend(e.to_string()))
377 }
378 }
379 })
380 .collect();
381 let prepared = prepared?;
382 let count = prepared.len();
383
384 let conn = self.connection.clone();
385
386 retry("redis_put_batch", &RetryConfig::query(), || {
387 let mut conn = conn.clone();
388 let prepared = prepared.clone();
389 async move {
390 let mut pipeline = pipe();
391
392 for item in &prepared {
393 match item {
394 PreparedItem::Json { key, doc } => {
395 pipeline.cmd("JSON.SET").arg(key).arg("$").arg(doc);
397 if let Some(ttl) = ttl_secs {
398 pipeline.expire(key, ttl as i64);
399 }
400 }
401 PreparedItem::Blob { key, data } => {
402 if let Some(ttl) = ttl_secs {
404 pipeline.cmd("SETEX").arg(key).arg(ttl as i64).arg(data.as_slice());
405 } else {
406 pipeline.set(key, data.as_slice());
407 }
408 }
409 }
410 }
411
412 pipeline.query_async::<()>(&mut conn).await?;
413 Ok(())
414 }
415 })
416 .await
417 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
418
419 Ok(BatchWriteResult {
420 batch_id: String::new(),
421 written: count,
422 verified: true,
423 })
424 }
425
426 pub async fn exists_batch(&self, ids: &[String]) -> Result<Vec<bool>, StorageError> {
429 if ids.is_empty() {
430 return Ok(vec![]);
431 }
432
433 let conn = self.connection.clone();
434 let prefixed_ids: Vec<String> = ids.iter().map(|id| self.prefixed_key(id)).collect();
436
437 retry("redis_exists_batch", &RetryConfig::query(), || {
438 let mut conn = conn.clone();
439 let prefixed_ids = prefixed_ids.clone();
440 async move {
441 let mut pipeline = pipe();
442 for key in &prefixed_ids {
443 pipeline.exists(key);
444 }
445
446 let results: Vec<bool> = pipeline.query_async(&mut conn).await?;
447 Ok(results)
448 }
449 })
450 .await
451 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
452 }
453}