use async_trait::async_trait;
use redis::aio::ConnectionManager;
use redis::{Client, AsyncCommands, pipe, cmd};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{BatchWriteResult, CacheStore, StorageError};
use crate::resilience::retry::{retry, RetryConfig};

pub struct RedisStore {
    connection: ConnectionManager,
    prefix: String,
}

impl RedisStore {
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_prefix(connection_string, None).await
    }

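    /// Create a store whose keys and state sets are namespaced under `prefix`.
    /// A `None` prefix behaves like [`RedisStore::new`]. Illustrative usage
    /// (the connection URL here is an example, not a fixed default):
    ///
    /// ```ignore
    /// let store = RedisStore::with_prefix("redis://127.0.0.1/", Some("app:")).await?;
    /// ```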
    pub async fn with_prefix(connection_string: &str, prefix: Option<&str>) -> Result<Self, StorageError> {
        let client = Client::open(connection_string)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let connection = retry("redis_connect", &RetryConfig::startup(), || async {
            ConnectionManager::new(client.clone()).await
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(Self {
            connection,
            prefix: prefix.unwrap_or("").to_string(),
        })
    }

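    /// Prepend the configured namespace prefix to a raw key; a no-op when the
    /// prefix is empty.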
    #[inline]
    fn prefixed_key(&self, key: &str) -> String {
        if self.prefix.is_empty() {
            key.to_string()
        } else {
            format!("{}{}", self.prefix, key)
        }
    }

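    /// Inverse of `prefixed_key`: recover the caller-facing id from a
    /// namespaced Redis key, falling back to the input if the prefix is absent.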
    #[inline]
    #[allow(dead_code)]
    fn strip_prefix<'a>(&self, key: &'a str) -> &'a str {
        if self.prefix.is_empty() {
            key
        } else {
            key.strip_prefix(&self.prefix).unwrap_or(key)
        }
    }

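    /// Hand out a clone of the underlying multiplexed connection for callers
    /// that need to issue raw Redis commands.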
    pub fn connection(&self) -> ConnectionManager {
        self.connection.clone()
    }

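    /// The key namespace prefix this store was configured with (may be empty).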
    pub fn prefix(&self) -> &str {
        &self.prefix
    }

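    /// Serialize a `SyncItem` into the RedisJSON document stored under its key:
    /// the item's JSON content goes under `payload`, sync metadata under
    /// `version`/`timestamp`/`state`/`access_count`/`last_accessed`, plus an
    /// optional `payload_hash` and an optional `audit` object carrying
    /// batch/trace/home provenance. Fails if the content is not valid JSON.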
    fn build_json_document(item: &SyncItem) -> Result<String, StorageError> {
        let payload: serde_json::Value = serde_json::from_slice(&item.content)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON content: {}", e)))?;

        let mut audit = serde_json::Map::new();
        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        let mut doc = serde_json::json!({
            "version": item.version,
            "timestamp": item.updated_at,
            "state": item.state,
            "access_count": item.access_count,
            "last_accessed": item.last_accessed,
            "payload": payload
        });

        if !item.content_hash.is_empty() {
            doc["payload_hash"] = serde_json::Value::String(item.content_hash.clone());
        }

        if !audit.is_empty() {
            doc["audit"] = serde_json::Value::Object(audit);
        }

        serde_json::to_string(&doc)
            .map_err(|e| StorageError::Backend(e.to_string()))
    }

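    /// Rebuild a `SyncItem` from a document produced by `build_json_document`,
    /// applying defaults for any fields that are missing.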
    fn parse_json_document(id: &str, json_str: &str) -> Result<SyncItem, StorageError> {
        let doc: serde_json::Value = serde_json::from_str(json_str)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON document: {}", e)))?;

        let version = doc.get("version").and_then(|v| v.as_u64()).unwrap_or(1);
        let updated_at = doc.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
        let content_hash = doc.get("payload_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
        let state = doc.get("state").and_then(|v| v.as_str()).unwrap_or("default").to_string();

        let access_count = doc.get("access_count").and_then(|v| v.as_u64()).unwrap_or(0);
        let last_accessed = doc.get("last_accessed").and_then(|v| v.as_u64()).unwrap_or(0);

        let audit = doc.get("audit");
        let batch_id = audit.and_then(|a| a.get("batch")).and_then(|v| v.as_str()).map(String::from);
        let trace_parent = audit.and_then(|a| a.get("trace")).and_then(|v| v.as_str()).map(String::from);
        let home_instance_id = audit.and_then(|a| a.get("home")).and_then(|v| v.as_str()).map(String::from);

        let payload = doc.get("payload").cloned().unwrap_or(serde_json::Value::Null);
        let content = serde_json::to_vec(&payload)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(SyncItem::reconstruct(
            id.to_string(),
            version,
            updated_at,
            ContentType::Json,
            content,
            batch_id,
            trace_parent,
            content_hash,
            home_instance_id,
            state,
            access_count,
            last_accessed,
        ))
    }

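    /// Extract key names from a raw `FT.SEARCH ... NOCONTENT` reply; any
    /// non-array or empty reply yields no results.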
    fn parse_ft_search_response(value: redis::Value) -> Result<Vec<String>, redis::RedisError> {
        match value {
            redis::Value::Array(arr) => {
                if arr.is_empty() {
                    return Ok(vec![]);
                }
                let keys: Vec<String> = arr.into_iter()
                    .skip(1) // first element is the total result count
                    .filter_map(|v| match v {
                        redis::Value::BulkString(bytes) => String::from_utf8(bytes).ok(),
                        redis::Value::SimpleString(s) => Some(s),
                        _ => None,
                    })
                    .collect();
                Ok(keys)
            }
            _ => Ok(vec![]),
        }
    }
}

#[async_trait]
impl CacheStore for RedisStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);
        let original_id = id.to_string();

        let key_type: Option<String> = retry("redis_type", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let t: String = redis::cmd("TYPE").arg(&key).query_async(&mut conn).await?;
                Ok(if t == "none" { None } else { Some(t) })
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        match key_type.as_deref() {
            None => Ok(None),
            Some("ReJSON-RL") => {
                let json_str: Option<String> = retry("redis_json_get", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    async move {
                        let data: Option<String> = cmd("JSON.GET")
                            .arg(&key)
                            .query_async(&mut conn)
                            .await?;
                        Ok(data)
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

                match json_str {
                    Some(s) => Self::parse_json_document(&original_id, &s).map(Some),
                    None => Ok(None),
                }
            }
            Some("string") => {
                let data: Option<Vec<u8>> = retry("redis_get", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    async move {
                        let data: Option<Vec<u8>> = conn.get(&key).await?;
                        Ok(data)
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

                data.map(|bytes| serde_json::from_slice(&bytes).map_err(|e| StorageError::Backend(e.to_string())))
                    .transpose()
            }
            Some(other) => {
                Err(StorageError::Backend(format!("Unexpected Redis key type: {}", other)))
            }
        }
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(&item.object_id);

        match item.content_type {
            ContentType::Json => {
                let json_doc = Self::build_json_document(item)?;

                retry("redis_json_set", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    let doc = json_doc.clone();
                    async move {
                        let _: () = cmd("JSON.SET")
                            .arg(&key)
                            .arg("$")
                            .arg(&doc)
                            .query_async(&mut conn)
                            .await?;
                        Ok(())
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
            }
            ContentType::Binary => {
                let data = serde_json::to_vec(item)
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                retry("redis_set", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    let data = data.clone();
                    async move {
                        let _: () = conn.set(&key, &data).await?;
                        Ok(())
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
            }
        }
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);

        retry("redis_delete", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let _: () = conn.del(&key).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);

        retry("redis_exists", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let exists: bool = conn.exists(&key).await?;
                Ok(exists)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_with_ttl(items, None).await
    }

    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_impl(items, ttl_secs).await
    }

    async fn ft_create(&self, args: &[String]) -> Result<(), StorageError> {
        let conn = self.connection.clone();

        retry("redis_ft_create", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let args = args.to_vec();
            async move {
                let mut cmd = cmd("FT.CREATE");
                for arg in &args {
                    cmd.arg(arg);
                }
                let _: () = cmd.query_async(&mut conn).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn ft_dropindex(&self, index: &str) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let index = index.to_string();

        retry("redis_ft_dropindex", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let index = index.clone();
            async move {
                let _: () = cmd("FT.DROPINDEX")
                    .arg(&index)
                    .query_async(&mut conn)
                    .await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn ft_search(&self, index: &str, query: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let conn = self.connection.clone();
        let index = index.to_string();
        let query = query.to_string();

        retry("redis_ft_search", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let index = index.clone();
            let query = query.clone();
            async move {
                // NOCONTENT keeps the reply to key names only; the first
                // element of the reply array is the total match count.
                let result: redis::Value = cmd("FT.SEARCH")
                    .arg(&index)
                    .arg(&query)
                    .arg("LIMIT")
                    .arg(0)
                    .arg(limit)
                    .arg("NOCONTENT")
                    .query_async(&mut conn)
                    .await?;

                Self::parse_ft_search_response(result)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }
}

impl RedisStore {
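    /// Shared implementation for the batch puts: writes all items in a single
    /// pipeline (JSON.SET for JSON items, SET/SETEX for binary blobs), applies
    /// the optional TTL, and adds each id to its per-state index set.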
    async fn put_batch_impl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        #[derive(Clone)]
        enum PreparedItem {
            Json { key: String, id: String, state: String, doc: String },
            Blob { key: String, id: String, state: String, data: Vec<u8> },
        }

        // Serialize everything up front so a retry can replay the pipeline as-is.
        let prepared: Result<Vec<_>, _> = items.iter()
            .map(|item| {
                let prefixed_key = self.prefixed_key(&item.object_id);
                let id = item.object_id.clone();
                let state = item.state.clone();
                match item.content_type {
                    ContentType::Json => {
                        Self::build_json_document(item)
                            .map(|doc| PreparedItem::Json { key: prefixed_key, id, state, doc })
                    }
                    ContentType::Binary => {
                        serde_json::to_vec(item)
                            .map(|bytes| PreparedItem::Blob { key: prefixed_key, id, state, data: bytes })
                            .map_err(|e| StorageError::Backend(e.to_string()))
                    }
                }
            })
            .collect();
        let prepared = prepared?;
        let count = prepared.len();

        let conn = self.connection.clone();
        let prefix = self.prefix.clone();

        retry("redis_put_batch", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let prepared = prepared.clone();
            let prefix = prefix.clone();
            async move {
                let mut pipeline = pipe();

                for item in &prepared {
                    match item {
                        PreparedItem::Json { key, id, state, doc } => {
                            pipeline.cmd("JSON.SET").arg(key).arg("$").arg(doc);
                            if let Some(ttl) = ttl_secs {
                                pipeline.expire(key, ttl as i64);
                            }
                            // Index the item in its per-state set.
                            let state_key = format!("{}state:{}", prefix, state);
                            pipeline.cmd("SADD").arg(&state_key).arg(id);
                        }
                        PreparedItem::Blob { key, id, state, data } => {
                            if let Some(ttl) = ttl_secs {
                                pipeline.cmd("SETEX").arg(key).arg(ttl as i64).arg(data.as_slice());
                            } else {
                                pipeline.set(key, data.as_slice());
                            }
                            // Index the item in its per-state set.
                            let state_key = format!("{}state:{}", prefix, state);
                            pipeline.cmd("SADD").arg(&state_key).arg(id);
                        }
                    }
                }

                pipeline.query_async::<()>(&mut conn).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(BatchWriteResult {
            batch_id: String::new(),
            written: count,
            verified: true,
        })
    }

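    /// Check existence of many ids in one pipelined round trip; the returned
    /// vector is parallel to `ids`.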
    pub async fn exists_batch(&self, ids: &[String]) -> Result<Vec<bool>, StorageError> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let conn = self.connection.clone();
        let prefixed_ids: Vec<String> = ids.iter().map(|id| self.prefixed_key(id)).collect();

        retry("redis_exists_batch", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let prefixed_ids = prefixed_ids.clone();
            async move {
                let mut pipeline = pipe();
                for key in &prefixed_ids {
                    pipeline.exists(key);
                }

                let results: Vec<bool> = pipeline.query_async(&mut conn).await?;
                Ok(results)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

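    /// List all item ids currently indexed under the given state set.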
    pub async fn list_state_ids(&self, state: &str) -> Result<Vec<String>, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let ids: Vec<String> = cmd("SMEMBERS")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;

        Ok(ids)
    }

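    /// Count items indexed under the given state (`SCARD` on the state set).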
    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let count: u64 = cmd("SCARD")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to count state: {}", e)))?;

        Ok(count)
    }

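    /// Whether the given id is a member of the state's index set.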
    pub async fn is_in_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let is_member: bool = cmd("SISMEMBER")
            .arg(&state_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to check state membership: {}", e)))?;

        Ok(is_member)
    }

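    /// Atomically move an id between state sets via `SMOVE`; returns `false`
    /// if the id was not in `from_state`.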
    pub async fn move_state(&self, id: &str, from_state: &str, to_state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let from_key = format!("{}state:{}", self.prefix, from_state);
        let to_key = format!("{}state:{}", self.prefix, to_state);

        let moved: bool = cmd("SMOVE")
            .arg(&from_key)
            .arg(&to_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to move state: {}", e)))?;

        Ok(moved)
    }

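    /// Remove an id from a state's index set; returns whether it was present.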
    pub async fn remove_from_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let removed: u32 = cmd("SREM")
            .arg(&state_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to remove from state: {}", e)))?;

        Ok(removed > 0)
    }

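    /// Delete every item indexed under `state` along with the state set itself;
    /// returns the number of items removed. Note the SMEMBERS/DEL sequence is
    /// not atomic with respect to concurrent writers.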
    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let ids: Vec<String> = cmd("SMEMBERS")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;

        if ids.is_empty() {
            return Ok(0);
        }

        let count = ids.len() as u64;

        // Delete every indexed item, then the state set itself, in one pipeline.
        let mut pipeline = pipe();
        for id in &ids {
            let key = self.prefixed_key(id);
            pipeline.del(&key);
        }
        pipeline.del(&state_key);

        pipeline.query_async::<()>(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to delete state items: {}", e)))?;

        Ok(count)
    }

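    /// Cursor through keys matching `prefix` (relative to the store namespace)
    /// and return up to `limit` parsed JSON items; documents that fail to parse
    /// are skipped. Order follows SCAN and is not deterministic.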
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let mut conn = self.connection.clone();

        let pattern = format!("{}{}*", self.prefix, prefix);

        let mut items = Vec::new();
        let mut cursor: u64 = 0;

        loop {
            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(100) // per-iteration scan hint, not a result cap
                .query_async(&mut conn)
                .await
                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;

            for key in keys {
                if items.len() >= limit {
                    break;
                }

                let json_opt: Option<String> = cmd("JSON.GET")
                    .arg(&key)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| StorageError::Backend(format!("JSON.GET failed: {}", e)))?;

                if let Some(json_str) = json_opt {
                    let id = self.strip_prefix(&key);
                    if let Ok(item) = Self::parse_json_document(id, &json_str) {
                        items.push(item);
                    }
                }
            }

            cursor = new_cursor;

            if cursor == 0 || items.len() >= limit {
                break;
            }
        }

        Ok(items)
    }

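    /// Count keys matching `prefix` with a full cursored SCAN; cost is
    /// proportional to the size of the keyspace.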
    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let pattern = format!("{}{}*", self.prefix, prefix);

        let mut count: u64 = 0;
        let mut cursor: u64 = 0;

        loop {
            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(1000)
                .query_async(&mut conn)
                .await
                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;

            count += keys.len() as u64;
            cursor = new_cursor;

            if cursor == 0 {
                break;
            }
        }

        Ok(count)
    }

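    /// Delete all keys matching `prefix`, pipelining one DEL batch per SCAN
    /// page; returns the number of keys deleted.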
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let pattern = format!("{}{}*", self.prefix, prefix);

        let mut deleted: u64 = 0;
        let mut cursor: u64 = 0;

        loop {
            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(1000)
                .query_async(&mut conn)
                .await
                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;

            if !keys.is_empty() {
                // Delete this SCAN page in a single pipeline.
                let mut pipeline = pipe();
                for key in &keys {
                    pipeline.del(key);
                }
                pipeline.query_async::<()>(&mut conn)
                    .await
                    .map_err(|e| StorageError::Backend(format!("DEL failed: {}", e)))?;

                deleted += keys.len() as u64;
            }

            cursor = new_cursor;

            if cursor == 0 {
                break;
            }
        }

        Ok(deleted)
    }

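    /// Append a CDC entry to the change stream via `XADD`, trimming the stream
    /// to approximately `maxlen` entries (`MAXLEN ~`); returns the id Redis
    /// assigned to the new entry.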
    pub async fn xadd_cdc(
        &self,
        entry: &crate::cdc::CdcEntry,
        maxlen: u64
    ) -> Result<String, StorageError> {
        let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
        let fields = entry.to_redis_fields();

        let mut conn = self.connection.clone();

        let mut command = cmd("XADD");
        command.arg(&stream_key);
        command.arg("MAXLEN");
        command.arg("~");
        command.arg(maxlen);
        command.arg("*"); // let Redis assign the entry id

        for (field, value) in fields {
            command.arg(field);
            command.arg(value.as_bytes());
        }

        let entry_id: String = command
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("XADD CDC failed: {}", e)))?;

        Ok(entry_id)
    }

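    /// Append multiple CDC entries in a single pipeline; returns the assigned
    /// stream entry ids in input order.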
    pub async fn xadd_cdc_batch(
        &self,
        entries: &[crate::cdc::CdcEntry],
        maxlen: u64
    ) -> Result<Vec<String>, StorageError> {
        if entries.is_empty() {
            return Ok(vec![]);
        }

        let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
        let mut conn = self.connection.clone();

        let mut pipeline = pipe();

        for entry in entries {
            let fields = entry.to_redis_fields();

            let mut command = cmd("XADD");
            command.arg(&stream_key);
            command.arg("MAXLEN");
            command.arg("~");
            command.arg(maxlen);
            command.arg("*");

            for (field, value) in fields {
                command.arg(field);
                command.arg(value.as_bytes());
            }

            pipeline.add_command(command);
        }

        let ids: Vec<String> = pipeline
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("XADD CDC batch failed: {}", e)))?;

        Ok(ids)
    }
}