1use async_trait::async_trait;
28use redis::aio::ConnectionManager;
29use redis::{Client, AsyncCommands, pipe, cmd};
30use crate::sync_item::{SyncItem, ContentType};
31use super::traits::{BatchWriteResult, CacheStore, StorageError};
32use crate::resilience::retry::{retry, RetryConfig};
33
34pub struct RedisStore {
35 connection: ConnectionManager,
36 prefix: String,
38}
39
40impl RedisStore {
41 pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
43 Self::with_prefix(connection_string, None).await
44 }
45
46 pub async fn with_prefix(connection_string: &str, prefix: Option<&str>) -> Result<Self, StorageError> {
62 let client = Client::open(connection_string)
63 .map_err(|e| StorageError::Backend(e.to_string()))?;
64
65 let connection = retry("redis_connect", &RetryConfig::startup(), || async {
67 ConnectionManager::new(client.clone()).await
68 })
69 .await
70 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
71
72 Ok(Self {
73 connection,
74 prefix: prefix.unwrap_or("").to_string(),
75 })
76 }
77
78 #[inline]
80 fn prefixed_key(&self, key: &str) -> String {
81 if self.prefix.is_empty() {
82 key.to_string()
83 } else {
84 format!("{}{}", self.prefix, key)
85 }
86 }
87
88 #[inline]
91 #[allow(dead_code)]
92 fn strip_prefix<'a>(&self, key: &'a str) -> &'a str {
93 if self.prefix.is_empty() {
94 key
95 } else {
96 key.strip_prefix(&self.prefix).unwrap_or(key)
97 }
98 }
99
100 pub fn connection(&self) -> ConnectionManager {
102 self.connection.clone()
103 }
104
105 pub fn prefix(&self) -> &str {
107 &self.prefix
108 }
109
110 fn build_json_document(item: &SyncItem) -> Result<String, StorageError> {
126 let payload: serde_json::Value = serde_json::from_slice(&item.content)
128 .map_err(|e| StorageError::Backend(format!("Invalid JSON content: {}", e)))?;
129
130 let mut audit = serde_json::Map::new();
132 if let Some(ref batch_id) = item.batch_id {
133 audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
134 }
135 if let Some(ref trace_parent) = item.trace_parent {
136 audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
137 }
138 if let Some(ref home) = item.home_instance_id {
139 audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
140 }
141
142 let mut doc = serde_json::json!({
144 "version": item.version,
145 "timestamp": item.updated_at,
146 "state": item.state,
147 "access_count": item.access_count,
148 "last_accessed": item.last_accessed,
149 "payload": payload
150 });
151
152 if !item.content_hash.is_empty() {
154 doc["payload_hash"] = serde_json::Value::String(item.content_hash.clone());
155 }
156
157 if !audit.is_empty() {
159 doc["audit"] = serde_json::Value::Object(audit);
160 }
161
162 serde_json::to_string(&doc)
163 .map_err(|e| StorageError::Backend(e.to_string()))
164 }
165
166 fn parse_json_document(id: &str, json_str: &str) -> Result<SyncItem, StorageError> {
168 let doc: serde_json::Value = serde_json::from_str(json_str)
169 .map_err(|e| StorageError::Backend(format!("Invalid JSON document: {}", e)))?;
170
171 let version = doc.get("version").and_then(|v| v.as_u64()).unwrap_or(1);
173 let updated_at = doc.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
174 let content_hash = doc.get("payload_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
175 let state = doc.get("state").and_then(|v| v.as_str()).unwrap_or("default").to_string();
176
177 let access_count = doc.get("access_count").and_then(|v| v.as_u64()).unwrap_or(0);
179 let last_accessed = doc.get("last_accessed").and_then(|v| v.as_u64()).unwrap_or(0);
180
181 let audit = doc.get("audit");
183 let batch_id = audit.and_then(|a| a.get("batch")).and_then(|v| v.as_str()).map(String::from);
184 let trace_parent = audit.and_then(|a| a.get("trace")).and_then(|v| v.as_str()).map(String::from);
185 let home_instance_id = audit.and_then(|a| a.get("home")).and_then(|v| v.as_str()).map(String::from);
186
187 let payload = doc.get("payload").cloned().unwrap_or(serde_json::Value::Null);
189 let content = serde_json::to_vec(&payload)
190 .map_err(|e| StorageError::Backend(e.to_string()))?;
191
192 Ok(SyncItem::reconstruct(
193 id.to_string(),
194 version,
195 updated_at,
196 ContentType::Json,
197 content,
198 batch_id,
199 trace_parent,
200 content_hash,
201 home_instance_id,
202 state,
203 access_count,
204 last_accessed,
205 ))
206 }
207
208 fn parse_ft_search_response(value: redis::Value) -> Result<Vec<String>, redis::RedisError> {
211 match value {
212 redis::Value::Array(arr) => {
213 if arr.is_empty() {
214 return Ok(vec![]);
215 }
216 let keys: Vec<String> = arr.into_iter()
218 .skip(1) .filter_map(|v| match v {
220 redis::Value::BulkString(bytes) => String::from_utf8(bytes).ok(),
221 redis::Value::SimpleString(s) => Some(s),
222 _ => None,
223 })
224 .collect();
225 Ok(keys)
226 }
227 _ => Ok(vec![]),
228 }
229 }
230}
231
232#[async_trait]
233impl CacheStore for RedisStore {
234 async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
235 let conn = self.connection.clone();
236 let prefixed_id = self.prefixed_key(id);
237 let original_id = id.to_string();
238
239 let key_type: Option<String> = retry("redis_type", &RetryConfig::query(), || {
241 let mut conn = conn.clone();
242 let key = prefixed_id.clone();
243 async move {
244 let t: String = redis::cmd("TYPE").arg(&key).query_async(&mut conn).await?;
245 Ok(if t == "none" { None } else { Some(t) })
246 }
247 })
248 .await
249 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
250
251 match key_type.as_deref() {
252 None => Ok(None), Some("ReJSON-RL") => {
254 let json_str: Option<String> = retry("redis_json_get", &RetryConfig::query(), || {
256 let mut conn = conn.clone();
257 let key = prefixed_id.clone();
258 async move {
259 let data: Option<String> = cmd("JSON.GET")
260 .arg(&key)
261 .query_async(&mut conn)
262 .await?;
263 Ok(data)
264 }
265 })
266 .await
267 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
268
269 match json_str {
270 Some(s) => Self::parse_json_document(&original_id, &s).map(Some),
271 None => Ok(None),
272 }
273 }
274 Some("string") => {
275 let data: Option<Vec<u8>> = retry("redis_get", &RetryConfig::query(), || {
277 let mut conn = conn.clone();
278 let key = prefixed_id.clone();
279 async move {
280 let data: Option<Vec<u8>> = conn.get(&key).await?;
281 Ok(data)
282 }
283 })
284 .await
285 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
286
287 data.map(|bytes| serde_json::from_slice(&bytes).map_err(|e| StorageError::Backend(e.to_string())))
288 .transpose()
289 }
290 Some(other) => {
291 Err(StorageError::Backend(format!("Unexpected Redis key type: {}", other)))
292 }
293 }
294 }
295
296 async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
297 let conn = self.connection.clone();
298 let prefixed_id = self.prefixed_key(&item.object_id);
299
300 match item.content_type {
301 ContentType::Json => {
302 let json_doc = Self::build_json_document(item)?;
304
305 retry("redis_json_set", &RetryConfig::query(), || {
306 let mut conn = conn.clone();
307 let key = prefixed_id.clone();
308 let doc = json_doc.clone();
309 async move {
310 let _: () = cmd("JSON.SET")
312 .arg(&key)
313 .arg("$")
314 .arg(&doc)
315 .query_async(&mut conn)
316 .await?;
317 Ok(())
318 }
319 })
320 .await
321 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
322 }
323 ContentType::Binary => {
324 let data = serde_json::to_vec(item)
326 .map_err(|e| StorageError::Backend(e.to_string()))?;
327
328 retry("redis_set", &RetryConfig::query(), || {
329 let mut conn = conn.clone();
330 let key = prefixed_id.clone();
331 let data = data.clone();
332 async move {
333 let _: () = conn.set(&key, &data).await?;
334 Ok(())
335 }
336 })
337 .await
338 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
339 }
340 }
341 }
342
343 async fn delete(&self, id: &str) -> Result<(), StorageError> {
344 let conn = self.connection.clone();
345 let prefixed_id = self.prefixed_key(id);
346
347 retry("redis_delete", &RetryConfig::query(), || {
348 let mut conn = conn.clone();
349 let key = prefixed_id.clone();
350 async move {
351 let _: () = conn.del(&key).await?;
352 Ok(())
353 }
354 })
355 .await
356 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
357 }
358
359 async fn exists(&self, id: &str) -> Result<bool, StorageError> {
360 let conn = self.connection.clone();
361 let prefixed_id = self.prefixed_key(id);
362
363 retry("redis_exists", &RetryConfig::query(), || {
364 let mut conn = conn.clone();
365 let key = prefixed_id.clone();
366 async move {
367 let exists: bool = conn.exists(&key).await?;
368 Ok(exists)
369 }
370 })
371 .await
372 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
373 }
374
375 async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
377 self.put_batch_with_ttl(items, None).await
378 }
379
380 async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
382 self.put_batch_impl(items, ttl_secs).await
383 }
384
385 async fn ft_create(&self, args: &[String]) -> Result<(), StorageError> {
387 let conn = self.connection.clone();
388
389 retry("redis_ft_create", &RetryConfig::query(), || {
390 let mut conn = conn.clone();
391 let args = args.to_vec();
392 async move {
393 let mut cmd = cmd("FT.CREATE");
394 for arg in &args {
395 cmd.arg(arg);
396 }
397 let _: () = cmd.query_async(&mut conn).await?;
398 Ok(())
399 }
400 })
401 .await
402 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
403 }
404
405 async fn ft_dropindex(&self, index: &str) -> Result<(), StorageError> {
407 let conn = self.connection.clone();
408 let index = index.to_string();
409
410 retry("redis_ft_dropindex", &RetryConfig::query(), || {
411 let mut conn = conn.clone();
412 let index = index.clone();
413 async move {
414 let _: () = cmd("FT.DROPINDEX")
415 .arg(&index)
416 .query_async(&mut conn)
417 .await?;
418 Ok(())
419 }
420 })
421 .await
422 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
423 }
424
425 async fn ft_search(&self, index: &str, query: &str, limit: usize) -> Result<Vec<String>, StorageError> {
427 let conn = self.connection.clone();
428 let index = index.to_string();
429 let query = query.to_string();
430
431 retry("redis_ft_search", &RetryConfig::query(), || {
432 let mut conn = conn.clone();
433 let index = index.clone();
434 let query = query.clone();
435 async move {
436 let result: redis::Value = cmd("FT.SEARCH")
439 .arg(&index)
440 .arg(&query)
441 .arg("LIMIT")
442 .arg(0)
443 .arg(limit)
444 .arg("NOCONTENT")
445 .query_async(&mut conn)
446 .await?;
447
448 Self::parse_ft_search_response(result)
450 }
451 })
452 .await
453 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
454 }
455
456 async fn ft_search_with_params(
459 &self,
460 index: &str,
461 query: &str,
462 params: &[(String, Vec<u8>)],
463 limit: usize,
464 ) -> Result<Vec<String>, StorageError> {
465 let conn = self.connection.clone();
466 let index = index.to_string();
467 let query = query.to_string();
468 let params: Vec<(String, Vec<u8>)> = params.to_vec();
469
470 retry("redis_ft_search_knn", &RetryConfig::query(), || {
471 let mut conn = conn.clone();
472 let index = index.clone();
473 let query = query.clone();
474 let params = params.clone();
475 async move {
476 let mut command = cmd("FT.SEARCH");
478 command.arg(&index).arg(&query);
479
480 if !params.is_empty() {
482 command.arg("PARAMS").arg(params.len() * 2);
483 for (name, blob) in ¶ms {
484 command.arg(name).arg(blob.as_slice());
485 }
486 }
487
488 command.arg("LIMIT").arg(0).arg(limit).arg("NOCONTENT");
489
490 let result: redis::Value = command.query_async(&mut conn).await?;
491 Self::parse_ft_search_response(result)
492 }
493 })
494 .await
495 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
496 }
497}
498
499impl RedisStore {
500 async fn put_batch_impl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
504 if items.is_empty() {
505 return Ok(BatchWriteResult {
506 batch_id: String::new(),
507 written: 0,
508 verified: true,
509 });
510 }
511
512 #[derive(Clone)]
514 enum PreparedItem {
515 Json { key: String, id: String, state: String, doc: String },
516 Blob { key: String, id: String, state: String, data: Vec<u8> },
517 }
518
519 let prepared: Result<Vec<_>, _> = items.iter()
520 .map(|item| {
521 let prefixed_key = self.prefixed_key(&item.object_id);
522 let id = item.object_id.clone();
523 let state = item.state.clone();
524 match item.content_type {
525 ContentType::Json => {
526 Self::build_json_document(item)
527 .map(|doc| PreparedItem::Json { key: prefixed_key, id, state, doc })
528 }
529 ContentType::Binary => {
530 serde_json::to_vec(item)
531 .map(|bytes| PreparedItem::Blob { key: prefixed_key, id, state, data: bytes })
532 .map_err(|e| StorageError::Backend(e.to_string()))
533 }
534 }
535 })
536 .collect();
537 let prepared = prepared?;
538 let count = prepared.len();
539
540 let conn = self.connection.clone();
541 let prefix = self.prefix.clone();
542
543 retry("redis_put_batch", &RetryConfig::batch_write(), || {
544 let mut conn = conn.clone();
545 let prepared = prepared.clone();
546 let prefix = prefix.clone();
547 async move {
548 let mut pipeline = pipe();
549
550 for item in &prepared {
551 match item {
552 PreparedItem::Json { key, id, state, doc } => {
553 pipeline.cmd("JSON.SET").arg(key).arg("$").arg(doc);
555 if let Some(ttl) = ttl_secs {
556 pipeline.expire(key, ttl as i64);
557 }
558 let state_key = format!("{}state:{}", prefix, state);
560 pipeline.cmd("SADD").arg(&state_key).arg(id);
561 }
562 PreparedItem::Blob { key, id, state, data } => {
563 if let Some(ttl) = ttl_secs {
565 pipeline.cmd("SETEX").arg(key).arg(ttl as i64).arg(data.as_slice());
566 } else {
567 pipeline.set(key, data.as_slice());
568 }
569 let state_key = format!("{}state:{}", prefix, state);
571 pipeline.cmd("SADD").arg(&state_key).arg(id);
572 }
573 }
574 }
575
576 pipeline.query_async::<()>(&mut conn).await?;
577 Ok(())
578 }
579 })
580 .await
581 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;
582
583 Ok(BatchWriteResult {
584 batch_id: String::new(),
585 written: count,
586 verified: true,
587 })
588 }
589
590 pub async fn exists_batch(&self, ids: &[String]) -> Result<Vec<bool>, StorageError> {
593 if ids.is_empty() {
594 return Ok(vec![]);
595 }
596
597 let conn = self.connection.clone();
598 let prefixed_ids: Vec<String> = ids.iter().map(|id| self.prefixed_key(id)).collect();
600
601 retry("redis_exists_batch", &RetryConfig::query(), || {
602 let mut conn = conn.clone();
603 let prefixed_ids = prefixed_ids.clone();
604 async move {
605 let mut pipeline = pipe();
606 for key in &prefixed_ids {
607 pipeline.exists(key);
608 }
609
610 let results: Vec<bool> = pipeline.query_async(&mut conn).await?;
611 Ok(results)
612 }
613 })
614 .await
615 .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
616 }
617
618 pub async fn list_state_ids(&self, state: &str) -> Result<Vec<String>, StorageError> {
626 let mut conn = self.connection.clone();
627 let state_key = format!("{}state:{}", self.prefix, state);
628
629 let ids: Vec<String> = cmd("SMEMBERS")
630 .arg(&state_key)
631 .query_async(&mut conn)
632 .await
633 .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;
634
635 Ok(ids)
636 }
637
638 pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
640 let mut conn = self.connection.clone();
641 let state_key = format!("{}state:{}", self.prefix, state);
642
643 let count: u64 = cmd("SCARD")
644 .arg(&state_key)
645 .query_async(&mut conn)
646 .await
647 .map_err(|e| StorageError::Backend(format!("Failed to count state: {}", e)))?;
648
649 Ok(count)
650 }
651
652 pub async fn is_in_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
654 let mut conn = self.connection.clone();
655 let state_key = format!("{}state:{}", self.prefix, state);
656
657 let is_member: bool = cmd("SISMEMBER")
658 .arg(&state_key)
659 .arg(id)
660 .query_async(&mut conn)
661 .await
662 .map_err(|e| StorageError::Backend(format!("Failed to check state membership: {}", e)))?;
663
664 Ok(is_member)
665 }
666
667 pub async fn move_state(&self, id: &str, from_state: &str, to_state: &str) -> Result<bool, StorageError> {
671 let mut conn = self.connection.clone();
672 let from_key = format!("{}state:{}", self.prefix, from_state);
673 let to_key = format!("{}state:{}", self.prefix, to_state);
674
675 let moved: bool = cmd("SMOVE")
676 .arg(&from_key)
677 .arg(&to_key)
678 .arg(id)
679 .query_async(&mut conn)
680 .await
681 .map_err(|e| StorageError::Backend(format!("Failed to move state: {}", e)))?;
682
683 Ok(moved)
684 }
685
686 pub async fn remove_from_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
688 let mut conn = self.connection.clone();
689 let state_key = format!("{}state:{}", self.prefix, state);
690
691 let removed: u32 = cmd("SREM")
692 .arg(&state_key)
693 .arg(id)
694 .query_async(&mut conn)
695 .await
696 .map_err(|e| StorageError::Backend(format!("Failed to remove from state: {}", e)))?;
697
698 Ok(removed > 0)
699 }
700
701 pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
705 let mut conn = self.connection.clone();
706 let state_key = format!("{}state:{}", self.prefix, state);
707
708 let ids: Vec<String> = cmd("SMEMBERS")
710 .arg(&state_key)
711 .query_async(&mut conn)
712 .await
713 .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;
714
715 if ids.is_empty() {
716 return Ok(0);
717 }
718
719 let count = ids.len() as u64;
720
721 let mut pipeline = pipe();
723 for id in &ids {
724 let key = self.prefixed_key(id);
725 pipeline.del(&key);
726 }
727 pipeline.del(&state_key);
728
729 pipeline.query_async::<()>(&mut conn)
730 .await
731 .map_err(|e| StorageError::Backend(format!("Failed to delete state items: {}", e)))?;
732
733 Ok(count)
734 }
735
736 pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
747 let mut conn = self.connection.clone();
748
749 let pattern = format!("{}{}*", self.prefix, prefix);
751
752 let mut items = Vec::new();
753 let mut cursor: u64 = 0;
754
755 loop {
757 let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
759 .arg(cursor)
760 .arg("MATCH")
761 .arg(&pattern)
762 .arg("COUNT")
763 .arg(100) .query_async(&mut conn)
765 .await
766 .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
767
768 for key in keys {
770 if items.len() >= limit {
771 break;
772 }
773
774 let json_opt: Option<String> = cmd("JSON.GET")
775 .arg(&key)
776 .query_async(&mut conn)
777 .await
778 .map_err(|e| StorageError::Backend(format!("JSON.GET failed: {}", e)))?;
779
780 if let Some(json_str) = json_opt {
781 let id = self.strip_prefix(&key);
783 if let Ok(item) = Self::parse_json_document(id, &json_str) {
784 items.push(item);
785 }
786 }
787 }
788
789 cursor = new_cursor;
790
791 if cursor == 0 || items.len() >= limit {
793 break;
794 }
795 }
796
797 Ok(items)
798 }
799
800 pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
804 let mut conn = self.connection.clone();
805 let pattern = format!("{}{}*", self.prefix, prefix);
806
807 let mut count: u64 = 0;
808 let mut cursor: u64 = 0;
809
810 loop {
811 let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
812 .arg(cursor)
813 .arg("MATCH")
814 .arg(&pattern)
815 .arg("COUNT")
816 .arg(1000)
817 .query_async(&mut conn)
818 .await
819 .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
820
821 count += keys.len() as u64;
822 cursor = new_cursor;
823
824 if cursor == 0 {
825 break;
826 }
827 }
828
829 Ok(count)
830 }
831
832 pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
836 let mut conn = self.connection.clone();
837 let pattern = format!("{}{}*", self.prefix, prefix);
838
839 let mut deleted: u64 = 0;
840 let mut cursor: u64 = 0;
841
842 loop {
843 let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
844 .arg(cursor)
845 .arg("MATCH")
846 .arg(&pattern)
847 .arg("COUNT")
848 .arg(1000)
849 .query_async(&mut conn)
850 .await
851 .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;
852
853 if !keys.is_empty() {
854 let mut pipeline = pipe();
856 for key in &keys {
857 pipeline.del(key);
858 }
859 pipeline.query_async::<()>(&mut conn)
860 .await
861 .map_err(|e| StorageError::Backend(format!("DEL failed: {}", e)))?;
862
863 deleted += keys.len() as u64;
864 }
865
866 cursor = new_cursor;
867
868 if cursor == 0 {
869 break;
870 }
871 }
872
873 Ok(deleted)
874 }
875
876 pub async fn xadd_cdc(
885 &self,
886 entry: &crate::cdc::CdcEntry,
887 maxlen: u64
888 ) -> Result<String, StorageError> {
889 let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
890 let fields = entry.to_redis_fields();
891
892 let mut conn = self.connection.clone();
893
894 let mut command = cmd("XADD");
896 command.arg(&stream_key);
897 command.arg("MAXLEN");
898 command.arg("~");
899 command.arg(maxlen);
900 command.arg("*"); for (field, value) in fields {
903 command.arg(field);
904 command.arg(value.as_bytes());
905 }
906
907 let entry_id: String = command
908 .query_async(&mut conn)
909 .await
910 .map_err(|e| StorageError::Backend(format!("XADD CDC failed: {}", e)))?;
911
912 Ok(entry_id)
913 }
914
915 pub async fn xadd_cdc_batch(
919 &self,
920 entries: &[crate::cdc::CdcEntry],
921 maxlen: u64
922 ) -> Result<Vec<String>, StorageError> {
923 if entries.is_empty() {
924 return Ok(vec![]);
925 }
926
927 let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
928 let mut conn = self.connection.clone();
929
930 let mut pipeline = pipe();
931
932 for entry in entries {
933 let fields = entry.to_redis_fields();
934
935 let mut command = cmd("XADD");
937 command.arg(&stream_key);
938 command.arg("MAXLEN");
939 command.arg("~");
940 command.arg(maxlen);
941 command.arg("*");
942
943 for (field, value) in fields {
944 command.arg(field);
945 command.arg(value.as_bytes());
946 }
947
948 pipeline.add_command(command);
949 }
950
951 let ids: Vec<String> = pipeline
952 .query_async(&mut conn)
953 .await
954 .map_err(|e| StorageError::Backend(format!("XADD CDC batch failed: {}", e)))?;
955
956 Ok(ids)
957 }
958}