use async_trait::async_trait;
use redis::aio::ConnectionManager;
use redis::{Client, AsyncCommands, pipe, cmd};
use crate::sync_item::{SyncItem, ContentType};
use super::traits::{BatchWriteResult, CacheStore, StorageError};
use crate::resilience::retry::{retry, RetryConfig};

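/// Redis-backed [`CacheStore`]. JSON items are stored as RedisJSON documents
/// (queryable via FT.SEARCH); binary items are serialized and stored as plain
/// strings. All keys are namespaced with an optional prefix.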
pub struct RedisStore {
    connection: ConnectionManager,
    prefix: String,
}

impl RedisStore {
    pub async fn new(connection_string: &str) -> Result<Self, StorageError> {
        Self::with_prefix(connection_string, None).await
    }

    pub async fn with_prefix(connection_string: &str, prefix: Option<&str>) -> Result<Self, StorageError> {
        let client = Client::open(connection_string)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        let connection = retry("redis_connect", &RetryConfig::startup(), || async {
            ConnectionManager::new(client.clone()).await
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(Self {
            connection,
            prefix: prefix.unwrap_or("").to_string(),
        })
    }

    #[inline]
    fn prefixed_key(&self, key: &str) -> String {
        if self.prefix.is_empty() {
            key.to_string()
        } else {
            format!("{}{}", self.prefix, key)
        }
    }

    #[inline]
    fn strip_prefix<'a>(&self, key: &'a str) -> &'a str {
        if self.prefix.is_empty() {
            key
        } else {
            key.strip_prefix(&self.prefix).unwrap_or(key)
        }
    }

    pub fn connection(&self) -> ConnectionManager {
        self.connection.clone()
    }

    pub fn prefix(&self) -> &str {
        &self.prefix
    }

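    /// Builds the RedisJSON envelope stored for a JSON item:
    /// `{ version, timestamp, state, payload, payload_hash?, audit? }`,
    /// where `audit` carries the optional batch/trace/home metadata.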
    fn build_json_document(item: &SyncItem) -> Result<String, StorageError> {
        let payload: serde_json::Value = serde_json::from_slice(&item.content)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON content: {}", e)))?;

        let mut audit = serde_json::Map::new();
        if let Some(ref batch_id) = item.batch_id {
            audit.insert("batch".to_string(), serde_json::Value::String(batch_id.clone()));
        }
        if let Some(ref trace_parent) = item.trace_parent {
            audit.insert("trace".to_string(), serde_json::Value::String(trace_parent.clone()));
        }
        if let Some(ref home) = item.home_instance_id {
            audit.insert("home".to_string(), serde_json::Value::String(home.clone()));
        }

        let mut doc = serde_json::json!({
            "version": item.version,
            "timestamp": item.updated_at,
            "state": item.state,
            "payload": payload
        });

        if !item.content_hash.is_empty() {
            doc["payload_hash"] = serde_json::Value::String(item.content_hash.clone());
        }

        if !audit.is_empty() {
            doc["audit"] = serde_json::Value::Object(audit);
        }

        serde_json::to_string(&doc)
            .map_err(|e| StorageError::Backend(e.to_string()))
    }

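    /// Inverse of [`Self::build_json_document`]: reconstructs a [`SyncItem`]
    /// from the stored envelope, tolerating missing fields with conservative
    /// defaults.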
    fn parse_json_document(id: &str, json_str: &str) -> Result<SyncItem, StorageError> {
        let doc: serde_json::Value = serde_json::from_str(json_str)
            .map_err(|e| StorageError::Backend(format!("Invalid JSON document: {}", e)))?;

        let version = doc.get("version").and_then(|v| v.as_u64()).unwrap_or(1);
        let updated_at = doc.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
        let content_hash = doc.get("payload_hash").and_then(|v| v.as_str()).unwrap_or("").to_string();
        let state = doc.get("state").and_then(|v| v.as_str()).unwrap_or("default").to_string();

        let audit = doc.get("audit");
        let batch_id = audit.and_then(|a| a.get("batch")).and_then(|v| v.as_str()).map(String::from);
        let trace_parent = audit.and_then(|a| a.get("trace")).and_then(|v| v.as_str()).map(String::from);
        let home_instance_id = audit.and_then(|a| a.get("home")).and_then(|v| v.as_str()).map(String::from);

        let payload = doc.get("payload").cloned().unwrap_or(serde_json::Value::Null);
        let content = serde_json::to_vec(&payload)
            .map_err(|e| StorageError::Backend(e.to_string()))?;

        Ok(SyncItem::reconstruct(
            id.to_string(),
            version,
            updated_at,
            ContentType::Json,
            content,
            batch_id,
            trace_parent,
            content_hash,
            home_instance_id,
            state,
        ))
    }

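    /// Parses an `FT.SEARCH ... NOCONTENT` reply, which is an array of the
    /// total result count followed by the matching key names.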
    fn parse_ft_search_response(value: redis::Value) -> Result<Vec<String>, redis::RedisError> {
        match value {
            redis::Value::Array(arr) => {
                if arr.is_empty() {
                    return Ok(vec![]);
                }
                let keys: Vec<String> = arr.into_iter()
                    .skip(1) // first element is the total result count
                    .filter_map(|v| match v {
                        redis::Value::BulkString(bytes) => String::from_utf8(bytes).ok(),
                        redis::Value::SimpleString(s) => Some(s),
                        _ => None,
                    })
                    .collect();
                Ok(keys)
            }
            _ => Ok(vec![]),
        }
    }
}

#[async_trait]
impl CacheStore for RedisStore {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);
        let original_id = id.to_string();

        let key_type: Option<String> = retry("redis_type", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let t: String = cmd("TYPE").arg(&key).query_async(&mut conn).await?;
                Ok(if t == "none" { None } else { Some(t) })
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        match key_type.as_deref() {
            None => Ok(None),
            Some("ReJSON-RL") => {
                let json_str: Option<String> = retry("redis_json_get", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    async move {
                        let data: Option<String> = cmd("JSON.GET")
                            .arg(&key)
                            .query_async(&mut conn)
                            .await?;
                        Ok(data)
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

                match json_str {
                    Some(s) => Self::parse_json_document(&original_id, &s).map(Some),
                    None => Ok(None),
                }
            }
            Some("string") => {
                let data: Option<Vec<u8>> = retry("redis_get", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    async move {
                        let data: Option<Vec<u8>> = conn.get(&key).await?;
                        Ok(data)
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

                data.map(|bytes| serde_json::from_slice(&bytes).map_err(|e| StorageError::Backend(e.to_string())))
                    .transpose()
            }
            Some(other) => {
                Err(StorageError::Backend(format!("Unexpected Redis key type: {}", other)))
            }
        }
    }

    async fn put(&self, item: &SyncItem) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(&item.object_id);

        match item.content_type {
            ContentType::Json => {
                let json_doc = Self::build_json_document(item)?;

                retry("redis_json_set", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    let doc = json_doc.clone();
                    async move {
                        let _: () = cmd("JSON.SET")
                            .arg(&key)
                            .arg("$")
                            .arg(&doc)
                            .query_async(&mut conn)
                            .await?;
                        Ok(())
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
            }
            ContentType::Binary => {
                let data = serde_json::to_vec(item)
                    .map_err(|e| StorageError::Backend(e.to_string()))?;

                retry("redis_set", &RetryConfig::query(), || {
                    let mut conn = conn.clone();
                    let key = prefixed_id.clone();
                    let data = data.clone();
                    async move {
                        let _: () = conn.set(&key, &data).await?;
                        Ok(())
                    }
                })
                .await
                .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
            }
        }
    }

    async fn delete(&self, id: &str) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);

        retry("redis_delete", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let _: () = conn.del(&key).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn exists(&self, id: &str) -> Result<bool, StorageError> {
        let conn = self.connection.clone();
        let prefixed_id = self.prefixed_key(id);

        retry("redis_exists", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let key = prefixed_id.clone();
            async move {
                let exists: bool = conn.exists(&key).await?;
                Ok(exists)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_with_ttl(items, None).await
    }

    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_impl(items, ttl_secs).await
    }

    async fn ft_create(&self, args: &[String]) -> Result<(), StorageError> {
        let conn = self.connection.clone();

        retry("redis_ft_create", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let args = args.to_vec();
            async move {
                let mut command = cmd("FT.CREATE");
                for arg in &args {
                    command.arg(arg);
                }
                let _: () = command.query_async(&mut conn).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn ft_dropindex(&self, index: &str) -> Result<(), StorageError> {
        let conn = self.connection.clone();
        let index = index.to_string();

        retry("redis_ft_dropindex", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let index = index.clone();
            async move {
                let _: () = cmd("FT.DROPINDEX")
                    .arg(&index)
                    .query_async(&mut conn)
                    .await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

    async fn ft_search(&self, index: &str, query: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let conn = self.connection.clone();
        let index = index.to_string();
        let query = query.to_string();

        retry("redis_ft_search", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let index = index.clone();
            let query = query.clone();
            async move {
                let result: redis::Value = cmd("FT.SEARCH")
                    .arg(&index)
                    .arg(&query)
                    .arg("LIMIT")
                    .arg(0)
                    .arg(limit)
                    .arg("NOCONTENT")
                    .query_async(&mut conn)
                    .await?;

                Self::parse_ft_search_response(result)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }
}

impl RedisStore {
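    /// Writes a batch in a single pipeline. Each item's payload write is
    /// paired with a `SADD` into the `{prefix}state:{state}` set, which backs
    /// the state-query helpers below. An optional TTL applies to the payload
    /// keys only, not to the state sets.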
    async fn put_batch_impl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        if items.is_empty() {
            return Ok(BatchWriteResult {
                batch_id: String::new(),
                written: 0,
                verified: true,
            });
        }

        #[derive(Clone)]
        enum PreparedItem {
            Json { key: String, id: String, state: String, doc: String },
            Blob { key: String, id: String, state: String, data: Vec<u8> },
        }

        let prepared: Result<Vec<_>, _> = items.iter()
            .map(|item| {
                let prefixed_key = self.prefixed_key(&item.object_id);
                let id = item.object_id.clone();
                let state = item.state.clone();
                match item.content_type {
                    ContentType::Json => {
                        Self::build_json_document(item)
                            .map(|doc| PreparedItem::Json { key: prefixed_key, id, state, doc })
                    }
                    ContentType::Binary => {
                        serde_json::to_vec(item)
                            .map(|bytes| PreparedItem::Blob { key: prefixed_key, id, state, data: bytes })
                            .map_err(|e| StorageError::Backend(e.to_string()))
                    }
                }
            })
            .collect();
        let prepared = prepared?;
        let count = prepared.len();

        let conn = self.connection.clone();
        let prefix = self.prefix.clone();

        retry("redis_put_batch", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let prepared = prepared.clone();
            let prefix = prefix.clone();
            async move {
                let mut pipeline = pipe();

                for item in &prepared {
                    match item {
                        PreparedItem::Json { key, id, state, doc } => {
                            pipeline.cmd("JSON.SET").arg(key).arg("$").arg(doc);
                            if let Some(ttl) = ttl_secs {
                                pipeline.expire(key, ttl as i64);
                            }
                            let state_key = format!("{}state:{}", prefix, state);
                            pipeline.cmd("SADD").arg(&state_key).arg(id);
                        }
                        PreparedItem::Blob { key, id, state, data } => {
                            if let Some(ttl) = ttl_secs {
                                pipeline.cmd("SETEX").arg(key).arg(ttl as i64).arg(data.as_slice());
                            } else {
                                pipeline.set(key, data.as_slice());
                            }
                            let state_key = format!("{}state:{}", prefix, state);
                            pipeline.cmd("SADD").arg(&state_key).arg(id);
                        }
                    }
                }

                pipeline.query_async::<()>(&mut conn).await?;
                Ok(())
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))?;

        Ok(BatchWriteResult {
            batch_id: String::new(),
            written: count,
            verified: true,
        })
    }

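    /// Checks existence for many IDs in one pipelined round trip; the result
    /// vector is positionally aligned with `ids`.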
    pub async fn exists_batch(&self, ids: &[String]) -> Result<Vec<bool>, StorageError> {
        if ids.is_empty() {
            return Ok(vec![]);
        }

        let conn = self.connection.clone();
        let prefixed_ids: Vec<String> = ids.iter().map(|id| self.prefixed_key(id)).collect();

        retry("redis_exists_batch", &RetryConfig::query(), || {
            let mut conn = conn.clone();
            let prefixed_ids = prefixed_ids.clone();
            async move {
                let mut pipeline = pipe();
                for key in &prefixed_ids {
                    pipeline.exists(key);
                }

                let results: Vec<bool> = pipeline.query_async(&mut conn).await?;
                Ok(results)
            }
        })
        .await
        .map_err(|e: redis::RedisError| StorageError::Backend(e.to_string()))
    }

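    /// Returns the unprefixed IDs recorded in the `{prefix}state:{state}` set
    /// maintained by the batch writers and the state-transition helpers below.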
    pub async fn list_state_ids(&self, state: &str) -> Result<Vec<String>, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let ids: Vec<String> = cmd("SMEMBERS")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;

        Ok(ids)
    }

    pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let count: u64 = cmd("SCARD")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to count state: {}", e)))?;

        Ok(count)
    }

    pub async fn is_in_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let is_member: bool = cmd("SISMEMBER")
            .arg(&state_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to check state membership: {}", e)))?;

        Ok(is_member)
    }

    pub async fn move_state(&self, id: &str, from_state: &str, to_state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let from_key = format!("{}state:{}", self.prefix, from_state);
        let to_key = format!("{}state:{}", self.prefix, to_state);

        let moved: bool = cmd("SMOVE")
            .arg(&from_key)
            .arg(&to_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to move state: {}", e)))?;

        Ok(moved)
    }

    pub async fn remove_from_state(&self, id: &str, state: &str) -> Result<bool, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let removed: u32 = cmd("SREM")
            .arg(&state_key)
            .arg(id)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to remove from state: {}", e)))?;

        Ok(removed > 0)
    }

    pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let state_key = format!("{}state:{}", self.prefix, state);

        let ids: Vec<String> = cmd("SMEMBERS")
            .arg(&state_key)
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to get state members: {}", e)))?;

        if ids.is_empty() {
            return Ok(0);
        }

        let count = ids.len() as u64;

        let mut pipeline = pipe();
        for id in &ids {
            let key = self.prefixed_key(id);
            pipeline.del(&key);
        }
        pipeline.del(&state_key);

        pipeline.query_async::<()>(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("Failed to delete state items: {}", e)))?;

        Ok(count)
    }

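    /// Scans for up to `limit` items whose keys start with `prefix` (applied
    /// after the store's own namespace prefix). Uses cursor-based SCAN, so
    /// cost grows with the keyspace; matched keys are assumed to hold
    /// RedisJSON documents.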
    pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let mut conn = self.connection.clone();

        let pattern = format!("{}{}*", self.prefix, prefix);

        let mut items = Vec::new();
        let mut cursor: u64 = 0;

        loop {
            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(100)
                .query_async(&mut conn)
                .await
                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;

            for key in keys {
                if items.len() >= limit {
                    break;
                }

                let json_opt: Option<String> = cmd("JSON.GET")
                    .arg(&key)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| StorageError::Backend(format!("JSON.GET failed: {}", e)))?;

                if let Some(json_str) = json_opt {
                    let id = self.strip_prefix(&key);
                    if let Ok(item) = Self::parse_json_document(id, &json_str) {
                        items.push(item);
                    }
                }
            }

            cursor = new_cursor;

            if cursor == 0 || items.len() >= limit {
                break;
            }
        }

        Ok(items)
    }

    pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let pattern = format!("{}{}*", self.prefix, prefix);

        let mut count: u64 = 0;
        let mut cursor: u64 = 0;

        loop {
            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(1000)
                .query_async(&mut conn)
                .await
                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;

            count += keys.len() as u64;
            cursor = new_cursor;

            if cursor == 0 {
                break;
            }
        }

        Ok(count)
    }

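    /// Deletes every key matching the combined prefix pattern, issuing one
    /// pipelined DEL batch per SCAN page; returns the number of keys removed.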
    pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError> {
        let mut conn = self.connection.clone();
        let pattern = format!("{}{}*", self.prefix, prefix);

        let mut deleted: u64 = 0;
        let mut cursor: u64 = 0;

        loop {
            let (new_cursor, keys): (u64, Vec<String>) = cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(1000)
                .query_async(&mut conn)
                .await
                .map_err(|e| StorageError::Backend(format!("SCAN failed: {}", e)))?;

            if !keys.is_empty() {
                let mut pipeline = pipe();
                for key in &keys {
                    pipeline.del(key);
                }
                pipeline.query_async::<()>(&mut conn)
                    .await
                    .map_err(|e| StorageError::Backend(format!("DEL failed: {}", e)))?;

                deleted += keys.len() as u64;
            }

            cursor = new_cursor;

            if cursor == 0 {
                break;
            }
        }

        Ok(deleted)
    }

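    /// Appends a CDC entry to the change stream via XADD with approximate
    /// trimming (`MAXLEN ~ maxlen`) and returns the generated entry ID.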
    pub async fn xadd_cdc(
        &self,
        entry: &crate::cdc::CdcEntry,
        maxlen: u64
    ) -> Result<String, StorageError> {
        let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
        let fields = entry.to_redis_fields();

        let mut conn = self.connection.clone();

        let mut command = cmd("XADD");
        command.arg(&stream_key);
        command.arg("MAXLEN");
        command.arg("~");
        command.arg(maxlen);
        command.arg("*"); // let Redis auto-generate the entry ID

        for (field, value) in fields {
            command.arg(field);
            command.arg(value.as_bytes());
        }

        let entry_id: String = command
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("XADD CDC failed: {}", e)))?;

        Ok(entry_id)
    }

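    /// Batch variant of [`Self::xadd_cdc`]: pipelines one XADD per entry and
    /// returns the generated entry IDs in order.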
    pub async fn xadd_cdc_batch(
        &self,
        entries: &[crate::cdc::CdcEntry],
        maxlen: u64
    ) -> Result<Vec<String>, StorageError> {
        if entries.is_empty() {
            return Ok(vec![]);
        }

        let stream_key = crate::cdc::cdc_stream_key(if self.prefix.is_empty() { None } else { Some(&self.prefix) });
        let mut conn = self.connection.clone();

        let mut pipeline = pipe();

        for entry in entries {
            let fields = entry.to_redis_fields();

            let mut command = cmd("XADD");
            command.arg(&stream_key);
            command.arg("MAXLEN");
            command.arg("~");
            command.arg(maxlen);
            command.arg("*");

            for (field, value) in fields {
                command.arg(field);
                command.arg(value.as_bytes());
            }

            pipeline.add_command(command);
        }

        let ids: Vec<String> = pipeline
            .query_async(&mut conn)
            .await
            .map_err(|e| StorageError::Backend(format!("XADD CDC batch failed: {}", e)))?;

        Ok(ids)
    }
}
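
// A minimal round-trip sketch, not a full test suite. It assumes a Redis
// Stack instance (with RedisJSON) at the hardcoded URL and that the crate's
// async test runtime is Tokio; the test is marked #[ignore] for that reason.
// The `SyncItem::reconstruct` argument order mirrors `parse_json_document`
// above.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore = "requires a running Redis Stack instance"]
    async fn json_round_trip() {
        let store = RedisStore::with_prefix("redis://127.0.0.1:6379", Some("test:"))
            .await
            .expect("failed to connect");

        let item = SyncItem::reconstruct(
            "obj-1".to_string(),              // object_id
            1,                                // version
            0,                                // updated_at
            ContentType::Json,
            br#"{"hello":"world"}"#.to_vec(), // content (must be valid JSON)
            None,                             // batch_id
            None,                             // trace_parent
            String::new(),                    // content_hash (empty => omitted)
            None,                             // home_instance_id
            "default".to_string(),            // state
        );

        store.put(&item).await.expect("put failed");
        let fetched = store.get("obj-1").await.expect("get failed").expect("item missing");
        assert_eq!(fetched.version, 1);
        assert!(store.exists("obj-1").await.expect("exists failed"));
    }
}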