1use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use redis::aio::{ConnectionManager, ConnectionManagerConfig};
18use redis::{pipe, AsyncCommands};
19use serde::de::DeserializeOwned;
20use serde::Deserialize;
21use serde::Serialize;
22use serde_json::Value;
23
24use crate::storage::{AsyncIterator, IterItem, Key, List, Map, StorageDB};
25use crate::{Result, StorageList, StorageMap};
26
27#[allow(unused_imports)]
28use crate::{timestamp_millis, TimestampMillis};
29
30use crate::storage::{KEY_PREFIX, KEY_PREFIX_LEN, LIST_NAME_PREFIX, MAP_NAME_PREFIX, SEPARATOR};
31
32type RedisConnection = ConnectionManager;
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct RedisConfig {
38 pub url: String,
40 pub prefix: String,
42}
43
44impl Default for RedisConfig {
45 fn default() -> Self {
46 RedisConfig {
47 url: String::default(),
48 prefix: "__def".into(),
49 }
50 }
51}
52
53#[derive(Clone)]
55pub struct RedisStorageDB {
56 prefix: Key,
58 async_conn: RedisConnection,
60}
61
62impl RedisStorageDB {
63 #[inline]
65 pub(crate) async fn new(cfg: RedisConfig) -> Result<Self> {
66 let prefix = [cfg.prefix.as_bytes(), SEPARATOR].concat();
67 let client = match redis::Client::open(cfg.url.as_str()) {
68 Ok(c) => c,
69 Err(e) => {
70 log::error!("open redis error, config is {:?}, {:?}", cfg, e);
71 return Err(anyhow!(e));
72 }
73 };
74
75 let mgr_cfg = ConnectionManagerConfig::default()
77 .set_exponent_base(100)
78 .set_factor(2)
79 .set_number_of_retries(2)
80 .set_connection_timeout(Duration::from_secs(15))
81 .set_response_timeout(Duration::from_secs(10));
82
83 let async_conn = match client.get_connection_manager_with_config(mgr_cfg).await {
85 Ok(conn) => conn,
86 Err(e) => {
87 log::error!("get redis connection error, config is {:?}, {:?}", cfg, e);
88 return Err(anyhow!(e));
89 }
90 };
91
92 let db = Self { prefix, async_conn }.cleanup();
94 Ok(db)
95 }
96
97 fn cleanup(self) -> Self {
99 let db = self.clone();
100 tokio::spawn(async move {
101 loop {
102 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
103 let mut async_conn = db.async_conn();
104 let db_zkey = db.make_len_sortedset_key();
105 if let Err(e) = async_conn
106 .zrembyscore::<'_, _, _, _, ()>(db_zkey.as_slice(), 0, timestamp_millis())
107 .await
108 {
109 log::error!("{:?}", e);
110 }
111 }
112 });
113 self
114 }
115
116 #[inline]
118 fn async_conn(&self) -> RedisConnection {
119 self.async_conn.clone()
120 }
121
122 #[inline]
124 fn async_conn_mut(&mut self) -> &mut RedisConnection {
125 &mut self.async_conn
126 }
127
128 #[inline]
130 #[allow(dead_code)]
131 fn make_len_sortedset_key(&self) -> Key {
132 [KEY_PREFIX_LEN, self.prefix.as_slice()].concat()
133 }
134
135 #[inline]
137 fn make_full_key<K>(&self, key: K) -> Key
138 where
139 K: AsRef<[u8]>,
140 {
141 [KEY_PREFIX, self.prefix.as_slice(), key.as_ref()].concat()
142 }
143
144 #[inline]
146 fn make_scan_pattern_match<P: AsRef<[u8]>>(&self, pattern: P) -> Key {
147 [KEY_PREFIX, self.prefix.as_slice(), pattern.as_ref()].concat()
148 }
149
150 #[inline]
152 fn make_map_full_name<K>(&self, name: K) -> Key
153 where
154 K: AsRef<[u8]>,
155 {
156 [MAP_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat()
157 }
158
159 #[inline]
161 fn make_list_full_name<K>(&self, name: K) -> Key
162 where
163 K: AsRef<[u8]>,
164 {
165 [LIST_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat()
166 }
167
168 #[inline]
170 fn make_map_prefix_match(&self) -> Key {
171 [MAP_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat()
172 }
173
174 #[inline]
176 fn make_list_prefix_match(&self) -> Key {
177 [LIST_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat()
178 }
179
180 #[inline]
182 fn map_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] {
183 full_name[MAP_NAME_PREFIX.len() + self.prefix.len()..].as_ref()
184 }
185
186 #[inline]
188 fn list_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] {
189 full_name[LIST_NAME_PREFIX.len() + self.prefix.len()..].as_ref()
190 }
191
192 #[inline]
194 async fn _get_full_name(&self, key: &[u8]) -> Result<Key> {
195 let map_full_name = self.make_map_full_name(key);
196 let mut async_conn = self.async_conn();
197 let full_name = if async_conn.exists(map_full_name.as_slice()).await? {
198 map_full_name
199 } else {
200 let list_full_name = self.make_list_full_name(key);
201 if async_conn.exists(list_full_name.as_slice()).await? {
202 list_full_name
203 } else {
204 self.make_full_key(key)
205 }
206 };
207 Ok(full_name)
208 }
209
210 #[inline]
212 async fn _insert<K, V>(
213 &self,
214 key: K,
215 val: &V,
216 expire_interval: Option<TimestampMillis>,
217 ) -> Result<()>
218 where
219 K: AsRef<[u8]> + Sync + Send,
220 V: serde::ser::Serialize + Sync + Send,
221 {
222 let full_key = self.make_full_key(key.as_ref());
223
224 #[cfg(not(feature = "len"))]
225 {
226 if let Some(expire_interval) = expire_interval {
227 let mut async_conn = self.async_conn();
228 pipe()
229 .atomic()
230 .set(full_key.as_slice(), bincode::serialize(val)?)
231 .pexpire(full_key.as_slice(), expire_interval)
232 .query_async::<()>(&mut async_conn)
233 .await?;
234 } else {
235 let _: () = self
236 .async_conn()
237 .set(full_key, bincode::serialize(val)?)
238 .await?;
239 }
240 }
241 #[cfg(feature = "len")]
242 {
243 let db_zkey = self.make_len_sortedset_key();
244 let mut async_conn = self.async_conn();
245 if let Some(expire_interval) = expire_interval {
246 pipe()
247 .atomic()
248 .set(full_key.as_slice(), bincode::serialize(val)?)
249 .pexpire(full_key.as_slice(), expire_interval)
250 .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
251 .query_async::<()>(&mut async_conn)
252 .await?;
253 } else {
254 pipe()
255 .atomic()
256 .set(full_key.as_slice(), bincode::serialize(val)?)
257 .zadd(db_zkey, key.as_ref(), i64::MAX)
258 .query_async::<()>(&mut async_conn)
259 .await?;
260 }
261 }
262
263 Ok(())
264 }
265
266 #[inline]
268 async fn _batch_insert(
269 &self,
270 key_val_expires: Vec<(Key, Vec<u8>, Option<TimestampMillis>)>,
271 ) -> Result<()> {
272 #[cfg(not(feature = "len"))]
273 {
274 let keys_vals: Vec<(Key, &Vec<u8>)> = key_val_expires
275 .iter()
276 .map(|(key_ref, value, _)| (self.make_full_key(key_ref), value))
277 .collect();
278
279 let mut async_conn = self.async_conn();
280 let mut p = pipe();
281 let mut rpipe = p.atomic().mset(keys_vals.as_slice());
282 for (k, _, at) in key_val_expires {
283 if let Some(at) = at {
284 rpipe = rpipe.expire(k, at);
285 }
286 }
287 rpipe.query_async::<()>(&mut async_conn).await?;
288 }
289
290 #[cfg(feature = "len")]
291 {
292 let (full_key_vals, expire_keys): (Vec<_>, Vec<_>) = key_val_expires
293 .iter()
294 .map(|(key_ref, value, timestamp)| {
295 let full_key_vals = (self.make_full_key(key_ref), value);
296 let expire_keys = (
297 timestamp
298 .map(|t| timestamp_millis() + t)
299 .unwrap_or(i64::MAX),
300 key_ref,
301 );
302 (full_key_vals, expire_keys)
303 })
304 .unzip();
305
306 let db_zkey = self.make_len_sortedset_key();
307 let mut async_conn = self.async_conn();
308 let mut p = pipe();
309 let mut rpipe = p
310 .atomic()
311 .mset(full_key_vals.as_slice())
312 .zadd_multiple(db_zkey, expire_keys.as_slice());
313 for (k, _, at) in key_val_expires {
314 if let Some(at) = at {
315 rpipe = rpipe.expire(k, at);
316 }
317 }
318 rpipe.query_async::<((), ())>(&mut async_conn).await?;
319 }
320 Ok(())
321 }
322
323 #[inline]
325 async fn _batch_remove(&self, keys: Vec<Key>) -> Result<()> {
326 let full_keys = keys
327 .iter()
328 .map(|k| self.make_full_key(k))
329 .collect::<Vec<_>>();
330 #[cfg(not(feature = "len"))]
331 {
332 let _: () = self.async_conn().del(full_keys).await?;
333 }
334 #[cfg(feature = "len")]
335 {
336 let db_zkey = self.make_len_sortedset_key();
337 let mut async_conn = self.async_conn();
338 pipe()
339 .atomic()
340 .del(full_keys.as_slice())
341 .zrem(db_zkey, keys)
342 .query_async::<()>(&mut async_conn)
343 .await?;
344 }
345 Ok(())
346 }
347
348 #[inline]
350 async fn _counter_incr<K>(
351 &self,
352 key: K,
353 increment: isize,
354 expire_interval: Option<TimestampMillis>,
355 ) -> Result<()>
356 where
357 K: AsRef<[u8]> + Sync + Send,
358 {
359 let full_key = self.make_full_key(key.as_ref());
360 #[cfg(not(feature = "len"))]
361 {
362 if let Some(expire_interval) = expire_interval {
363 let mut async_conn = self.async_conn();
364 pipe()
365 .atomic()
366 .incr(full_key.as_slice(), increment)
367 .pexpire(full_key.as_slice(), expire_interval)
368 .query_async::<()>(&mut async_conn)
369 .await?;
370 } else {
371 let _: () = self.async_conn().incr(full_key, increment).await?;
372 }
373 }
374 #[cfg(feature = "len")]
375 {
376 let db_zkey = self.make_len_sortedset_key();
377 let mut async_conn = self.async_conn();
378 if let Some(expire_interval) = expire_interval {
379 pipe()
380 .atomic()
381 .incr(full_key.as_slice(), increment)
382 .pexpire(full_key.as_slice(), expire_interval)
383 .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
384 .query_async::<()>(&mut async_conn)
385 .await?;
386 } else {
387 pipe()
388 .atomic()
389 .incr(full_key.as_slice(), increment)
390 .zadd(db_zkey, key.as_ref(), i64::MAX)
391 .query_async::<()>(&mut async_conn)
392 .await?;
393 }
394 }
395 Ok(())
396 }
397
398 #[inline]
400 async fn _counter_decr<K>(
401 &self,
402 key: K,
403 decrement: isize,
404 expire_interval: Option<TimestampMillis>,
405 ) -> Result<()>
406 where
407 K: AsRef<[u8]> + Sync + Send,
408 {
409 let full_key = self.make_full_key(key.as_ref());
410
411 #[cfg(not(feature = "len"))]
412 {
413 if let Some(expire_interval) = expire_interval {
414 let mut async_conn = self.async_conn();
415 pipe()
416 .atomic()
417 .decr(full_key.as_slice(), decrement)
418 .pexpire(full_key.as_slice(), expire_interval)
419 .query_async::<()>(&mut async_conn)
420 .await?;
421 } else {
422 let _: () = self.async_conn().decr(full_key, decrement).await?;
423 }
424 }
425 #[cfg(feature = "len")]
426 {
427 let db_zkey = self.make_len_sortedset_key();
428 let mut async_conn = self.async_conn();
429 if let Some(expire_interval) = expire_interval {
430 pipe()
431 .atomic()
432 .decr(full_key.as_slice(), decrement)
433 .pexpire(full_key.as_slice(), expire_interval)
434 .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
435 .query_async::<()>(&mut async_conn)
436 .await?;
437 } else {
438 pipe()
439 .atomic()
440 .decr(full_key.as_slice(), decrement)
441 .zadd(db_zkey, key.as_ref(), i64::MAX)
442 .query_async::<()>(&mut async_conn)
443 .await?;
444 }
445 }
446 Ok(())
447 }
448
449 #[inline]
451 async fn _counter_set<K>(
452 &self,
453 key: K,
454 val: isize,
455 expire_interval: Option<TimestampMillis>,
456 ) -> Result<()>
457 where
458 K: AsRef<[u8]> + Sync + Send,
459 {
460 let full_key = self.make_full_key(key.as_ref());
461 #[cfg(not(feature = "len"))]
462 {
463 if let Some(expire_interval) = expire_interval {
464 let mut async_conn = self.async_conn();
465 pipe()
466 .atomic()
467 .set(full_key.as_slice(), val)
468 .pexpire(full_key.as_slice(), expire_interval)
469 .query_async::<()>(&mut async_conn)
470 .await?;
471 } else {
472 let _: () = self.async_conn().set(full_key, val).await?;
473 }
474 }
475 #[cfg(feature = "len")]
476 {
477 let db_zkey = self.make_len_sortedset_key();
478 let mut async_conn = self.async_conn();
479 if let Some(expire_interval) = expire_interval {
480 pipe()
481 .atomic()
482 .set(full_key.as_slice(), val)
483 .pexpire(full_key.as_slice(), expire_interval)
484 .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
485 .query_async::<()>(&mut async_conn)
486 .await?;
487 } else {
488 pipe()
489 .atomic()
490 .set(full_key.as_slice(), val)
491 .zadd(db_zkey, key.as_ref(), i64::MAX)
492 .query_async::<()>(&mut async_conn)
493 .await?;
494 }
495 }
496
497 Ok(())
498 }
499
500 #[inline]
502 async fn _remove<K>(&self, key: K) -> Result<()>
503 where
504 K: AsRef<[u8]> + Sync + Send,
505 {
506 let full_key = self.make_full_key(key.as_ref());
507
508 #[cfg(not(feature = "len"))]
509 {
510 let _: () = self.async_conn().del(full_key).await?;
511 }
512 #[cfg(feature = "len")]
513 {
514 let db_zkey = self.make_len_sortedset_key();
515 let mut async_conn = self.async_conn();
516 pipe()
517 .atomic()
518 .del(full_key.as_slice())
519 .zrem(db_zkey, key.as_ref())
520 .query_async::<()>(&mut async_conn)
521 .await?;
522 }
523 Ok(())
524 }
525}
526
527#[async_trait]
528impl StorageDB for RedisStorageDB {
529 type MapType = RedisStorageMap;
530 type ListType = RedisStorageList;
531
532 #[inline]
534 async fn map<V: AsRef<[u8]> + Sync + Send>(
535 &self,
536 name: V,
537 expire: Option<TimestampMillis>,
538 ) -> Result<Self::MapType> {
539 let full_name = self.make_map_full_name(name.as_ref());
540 Ok(
541 RedisStorageMap::new_expire(name.as_ref().to_vec(), full_name, expire, self.clone())
542 .await?,
543 )
544 }
545
546 #[inline]
548 async fn map_remove<K>(&self, name: K) -> Result<()>
549 where
550 K: AsRef<[u8]> + Sync + Send,
551 {
552 let map_full_name = self.make_map_full_name(name.as_ref());
553 let _: () = self.async_conn().del(map_full_name).await?;
554 Ok(())
555 }
556
557 #[inline]
559 async fn map_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
560 let map_full_name = self.make_map_full_name(key.as_ref());
561 Ok(self.async_conn().exists(map_full_name).await?)
562 }
563
564 #[inline]
566 async fn list<V: AsRef<[u8]> + Sync + Send>(
567 &self,
568 name: V,
569 expire: Option<TimestampMillis>,
570 ) -> Result<Self::ListType> {
571 let full_name = self.make_list_full_name(name.as_ref());
572 Ok(
573 RedisStorageList::new_expire(name.as_ref().to_vec(), full_name, expire, self.clone())
574 .await?,
575 )
576 }
577
578 #[inline]
580 async fn list_remove<K>(&self, name: K) -> Result<()>
581 where
582 K: AsRef<[u8]> + Sync + Send,
583 {
584 let list_full_name = self.make_list_full_name(name.as_ref());
585 let _: () = self.async_conn().del(list_full_name).await?;
586 Ok(())
587 }
588
589 #[inline]
591 async fn list_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
592 let list_full_name = self.make_list_full_name(key.as_ref());
593 Ok(self.async_conn().exists(list_full_name).await?)
594 }
595
596 #[inline]
598 async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
599 where
600 K: AsRef<[u8]> + Sync + Send,
601 V: serde::ser::Serialize + Sync + Send,
602 {
603 self._insert(key, val, None).await
604 }
605
606 #[inline]
608 async fn get<K, V>(&self, key: K) -> Result<Option<V>>
609 where
610 K: AsRef<[u8]> + Sync + Send,
611 V: DeserializeOwned + Sync + Send,
612 {
613 let full_key = self.make_full_key(key);
614 if let Some(v) = self
615 .async_conn()
616 .get::<_, Option<Vec<u8>>>(full_key)
617 .await?
618 {
619 Ok(Some(bincode::deserialize::<V>(v.as_ref())?))
620 } else {
621 Ok(None)
622 }
623 }
624
625 #[inline]
627 async fn remove<K>(&self, key: K) -> Result<()>
628 where
629 K: AsRef<[u8]> + Sync + Send,
630 {
631 self._remove(key).await
632 }
633
634 #[inline]
636 async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
637 where
638 V: Serialize + Sync + Send,
639 {
640 if !key_vals.is_empty() {
641 let keys_vals_expires = key_vals
642 .into_iter()
643 .map(|(k, v)| {
644 bincode::serialize(&v)
645 .map(move |v| (k, v, None))
646 .map_err(|e| anyhow!(e))
647 })
648 .collect::<Result<Vec<_>>>()?;
649 self._batch_insert(keys_vals_expires).await?;
650 }
651 Ok(())
652 }
653
654 #[inline]
656 async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
657 if !keys.is_empty() {
658 self._batch_remove(keys).await?;
659 }
660 Ok(())
661 }
662
663 #[inline]
665 async fn counter_incr<K>(&self, key: K, increment: isize) -> Result<()>
666 where
667 K: AsRef<[u8]> + Sync + Send,
668 {
669 self._counter_incr(key, increment, None).await
670 }
671
672 #[inline]
674 async fn counter_decr<K>(&self, key: K, decrement: isize) -> Result<()>
675 where
676 K: AsRef<[u8]> + Sync + Send,
677 {
678 self._counter_decr(key, decrement, None).await
679 }
680
681 #[inline]
683 async fn counter_get<K>(&self, key: K) -> Result<Option<isize>>
684 where
685 K: AsRef<[u8]> + Sync + Send,
686 {
687 let full_key = self.make_full_key(key);
688 Ok(self.async_conn().get::<_, Option<isize>>(full_key).await?)
689 }
690
691 #[inline]
693 async fn counter_set<K>(&self, key: K, val: isize) -> Result<()>
694 where
695 K: AsRef<[u8]> + Sync + Send,
696 {
697 self._counter_set(key, val, None).await
698 }
699
700 #[inline]
702 async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
703 let full_key = self.make_full_key(key.as_ref());
704 Ok(self.async_conn().exists(full_key).await?)
705 }
706
/// Returns the cardinality of the length-tracking sorted set after first removing
/// members whose score lies between 0 and the current timestamp.
#[inline]
#[cfg(feature = "len")]
async fn len(&self) -> Result<usize> {
    let len_key = self.make_len_sortedset_key();
    let mut conn = self.async_conn();

    let mut cmds = pipe();
    cmds.zrembyscore(len_key.as_slice(), 0, timestamp_millis())
        .zcard(len_key.as_slice());
    let (_purged, remaining) = cmds.query_async::<(i64, usize)>(&mut conn).await?;
    Ok(remaining)
}
720
721 #[inline]
723 async fn db_size(&self) -> Result<usize> {
724 let mut async_conn = self.async_conn();
725 let dbsize = redis::pipe()
727 .cmd("DBSIZE")
728 .query_async::<redis::Value>(&mut async_conn)
729 .await?;
730 let dbsize = dbsize.as_sequence().and_then(|vs| {
731 vs.iter().next().and_then(|v| {
732 if let redis::Value::Int(v) = v {
733 Some(*v)
734 } else {
735 None
736 }
737 })
738 });
739 Ok(dbsize.unwrap_or(0) as usize)
740 }
741
/// Sets an absolute expiry (`PEXPIREAT`, timestamp `at`) on the full key of `key`
/// and returns the command's boolean reply.
///
/// With the `len` feature the key's score in the length-tracking sorted set is set
/// to `at` in the same atomic pipeline.
#[inline]
#[cfg(feature = "ttl")]
async fn expire_at<K>(&self, key: K, at: TimestampMillis) -> Result<bool>
where
    K: AsRef<[u8]> + Sync + Send,
{
    let full_name = self.make_full_key(key.as_ref());
    #[cfg(not(feature = "len"))]
    {
        let applied: bool = self.async_conn().pexpire_at(full_name, at).await?;
        Ok(applied)
    }
    #[cfg(feature = "len")]
    {
        let len_key = self.make_len_sortedset_key();
        let mut conn = self.async_conn();
        let mut txn = pipe();
        txn.atomic()
            .zadd(len_key, key.as_ref(), at)
            .pexpire_at(full_name.as_slice(), at);
        let (_added, applied) = txn.query_async::<(i64, bool)>(&mut conn).await?;
        Ok(applied)
    }
}
771
/// Sets a relative expiry (`PEXPIRE`, interval `dur`) on the full key of `key`
/// and returns the command's boolean reply.
///
/// With the `len` feature the key's score in the length-tracking sorted set is set
/// to the current timestamp plus `dur` in the same atomic pipeline.
#[inline]
#[cfg(feature = "ttl")]
async fn expire<K>(&self, key: K, dur: TimestampMillis) -> Result<bool>
where
    K: AsRef<[u8]> + Sync + Send,
{
    let full_name = self.make_full_key(key.as_ref());

    #[cfg(not(feature = "len"))]
    {
        let applied: bool = self.async_conn().pexpire(full_name, dur).await?;
        Ok(applied)
    }
    #[cfg(feature = "len")]
    {
        let len_key = self.make_len_sortedset_key();
        let mut conn = self.async_conn();
        let mut txn = pipe();
        txn.atomic()
            .zadd(len_key, key.as_ref(), timestamp_millis() + dur)
            .pexpire(full_name.as_slice(), dur);
        let (_added, applied) = txn.query_async::<(i64, bool)>(&mut conn).await?;
        Ok(applied)
    }
}
799
/// Queries `PTTL` for the full key of `key`.
///
/// A reply of `-2` maps to `None`, `-1` maps to `Some(TimestampMillis::MAX)`, and
/// any other reply is returned as the remaining time.
#[inline]
#[cfg(feature = "ttl")]
async fn ttl<K>(&self, key: K) -> Result<Option<TimestampMillis>>
where
    K: AsRef<[u8]> + Sync + Send,
{
    let mut conn = self.async_conn();
    let full_key = self.make_full_key(key.as_ref());
    let remaining = conn.pttl::<_, isize>(full_key).await?;
    let mapped = match remaining {
        -2 => None,
        -1 => Some(TimestampMillis::MAX),
        other => Some(other as TimestampMillis),
    };
    Ok(mapped)
}
816
817 #[inline]
819 async fn map_iter<'a>(
820 &'a mut self,
821 ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageMap>> + Send + 'a>> {
822 let pattern = self.make_map_prefix_match();
823 let iter = AsyncMapIter {
824 db: self.clone(),
825 iter: self.async_conn_mut().scan_match::<_, Key>(pattern).await?,
826 };
827 Ok(Box::new(iter))
828 }
829
830 #[inline]
832 async fn list_iter<'a>(
833 &'a mut self,
834 ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>> {
835 let pattern = self.make_list_prefix_match();
836 let iter = AsyncListIter {
837 db: self.clone(),
838 iter: self.async_conn_mut().scan_match::<_, Key>(pattern).await?,
839 };
840 Ok(Box::new(iter))
841 }
842
843 async fn scan<'a, P>(
845 &'a mut self,
846 pattern: P,
847 ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>
848 where
849 P: AsRef<[u8]> + Send + Sync,
850 {
851 let pattern = self.make_scan_pattern_match(pattern);
852 let prefix_len = KEY_PREFIX.len() + self.prefix.len();
853 let iter = AsyncDbKeyIter {
854 prefix_len,
855 iter: self
856 .async_conn_mut()
857 .scan_match::<_, Key>(pattern.as_slice())
858 .await?,
859 };
860 Ok(Box::new(iter))
861 }
862
863 #[inline]
865 async fn info(&self) -> Result<Value> {
866 let mut conn = self.async_conn();
867 let dbsize = redis::pipe()
868 .cmd("dbsize")
869 .query_async::<redis::Value>(&mut conn)
870 .await?;
871 let dbsize = dbsize.as_sequence().and_then(|vs| {
872 vs.iter().next().and_then(|v| {
873 if let redis::Value::Int(v) = v {
874 Some(*v)
875 } else {
876 None
877 }
878 })
879 });
880 Ok(serde_json::json!({
881 "storage_engine": "Redis",
882 "dbsize": dbsize,
883 }))
884 }
885}
886
887#[derive(Clone)]
889pub struct RedisStorageMap {
890 name: Key,
892 full_name: Key,
894 #[allow(dead_code)]
896 expire: Option<TimestampMillis>,
897 empty: Arc<AtomicBool>,
899 pub(crate) db: RedisStorageDB,
901}
902
903impl RedisStorageMap {
904 #[inline]
906 pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self {
907 Self {
908 name,
909 full_name,
910 expire: None,
911 empty: Arc::new(AtomicBool::new(true)),
912 db,
913 }
914 }
915
916 #[inline]
918 pub(crate) async fn new_expire(
919 name: Key,
920 full_name: Key,
921 expire: Option<TimestampMillis>,
922 mut db: RedisStorageDB,
923 ) -> Result<Self> {
924 let empty = if expire.is_some() {
925 let empty = Self::_is_empty(&mut db.async_conn, full_name.as_slice()).await?;
926 Arc::new(AtomicBool::new(empty))
927 } else {
928 Arc::new(AtomicBool::new(true))
929 };
930 Ok(Self {
931 name,
932 full_name,
933 expire,
934 empty,
935 db,
936 })
937 }
938
939 #[inline]
941 fn async_conn(&self) -> RedisConnection {
942 self.db.async_conn()
943 }
944
945 #[inline]
947 fn async_conn_mut(&mut self) -> &mut RedisConnection {
948 self.db.async_conn_mut()
949 }
950
951 #[inline]
953 async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result<bool> {
954 let res = async_conn
955 .hscan::<_, Vec<u8>>(full_name)
956 .await?
957 .next_item()
958 .await
959 .is_none();
960 Ok(res)
961 }
962
963 #[inline]
965 async fn _insert_expire(&self, key: &[u8], val: Vec<u8>) -> Result<()> {
966 let mut async_conn = self.async_conn();
967 let name = self.full_name.as_slice();
968
969 #[cfg(feature = "ttl")]
970 if self.empty.load(Ordering::SeqCst) {
971 if let Some(expire) = self.expire.as_ref() {
972 let _: () = redis::pipe()
973 .atomic()
974 .hset(name, key, val)
975 .pexpire(name, *expire)
976 .query_async(&mut async_conn)
977 .await?;
978 self.empty.store(false, Ordering::SeqCst);
979 return Ok(());
980 }
981 }
982
983 let _: () = async_conn.hset(name, key.as_ref(), val).await?;
984 Ok(())
985 }
986
987 #[inline]
989 async fn _batch_insert_expire(&self, key_vals: Vec<(Key, Vec<u8>)>) -> Result<()> {
990 let mut async_conn = self.async_conn();
991 let name = self.full_name.as_slice();
992
993 #[cfg(feature = "ttl")]
994 if self.empty.load(Ordering::SeqCst) {
995 if let Some(expire) = self.expire.as_ref() {
996 let _: () = redis::pipe()
997 .atomic()
998 .hset_multiple(name, key_vals.as_slice())
999 .pexpire(name, *expire)
1000 .query_async(&mut async_conn)
1001 .await?;
1002
1003 self.empty.store(false, Ordering::SeqCst);
1004 return Ok(());
1005 }
1006 }
1007
1008 let _: () = async_conn.hset_multiple(name, key_vals.as_slice()).await?;
1009 Ok(())
1010 }
1011}
1012
1013#[async_trait]
1014impl Map for RedisStorageMap {
1015 #[inline]
1017 fn name(&self) -> &[u8] {
1018 self.name.as_slice()
1019 }
1020
1021 #[inline]
1023 async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
1024 where
1025 K: AsRef<[u8]> + Sync + Send,
1026 V: Serialize + Sync + Send + ?Sized,
1027 {
1028 self._insert_expire(key.as_ref(), bincode::serialize(val)?)
1029 .await
1030 }
1031
1032 #[inline]
1034 async fn get<K, V>(&self, key: K) -> Result<Option<V>>
1035 where
1036 K: AsRef<[u8]> + Sync + Send,
1037 V: DeserializeOwned + Sync + Send,
1038 {
1039 let res: Option<Vec<u8>> = self
1040 .async_conn()
1041 .hget(self.full_name.as_slice(), key.as_ref())
1042 .await?;
1043 if let Some(res) = res {
1044 Ok(Some(bincode::deserialize::<V>(res.as_ref())?))
1045 } else {
1046 Ok(None)
1047 }
1048 }
1049
1050 #[inline]
1052 async fn remove<K>(&self, key: K) -> Result<()>
1053 where
1054 K: AsRef<[u8]> + Sync + Send,
1055 {
1056 let _: () = self
1057 .async_conn()
1058 .hdel(self.full_name.as_slice(), key.as_ref())
1059 .await?;
1060 Ok(())
1061 }
1062
1063 #[inline]
1065 async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
1066 let res = self
1067 .async_conn()
1068 .hexists(self.full_name.as_slice(), key.as_ref())
1069 .await?;
1070 Ok(res)
1071 }
1072
/// Returns the number of fields in the hash (`HLEN`).
#[cfg(feature = "map_len")]
#[inline]
async fn len(&self) -> Result<usize> {
    let mut conn = self.async_conn();
    let count: usize = conn.hlen(self.full_name.as_slice()).await?;
    Ok(count)
}
1079
1080 #[inline]
1082 async fn is_empty(&self) -> Result<bool> {
1083 let res = self
1084 .async_conn()
1085 .hscan::<_, Vec<u8>>(self.full_name.as_slice())
1086 .await?
1087 .next_item()
1088 .await
1089 .is_none();
1090 Ok(res)
1091 }
1092
1093 #[inline]
1095 async fn clear(&self) -> Result<()> {
1096 let _: () = self.async_conn().del(self.full_name.as_slice()).await?;
1097 self.empty.store(true, Ordering::SeqCst);
1098 Ok(())
1099 }
1100
1101 #[inline]
1103 async fn remove_and_fetch<K, V>(&self, key: K) -> Result<Option<V>>
1104 where
1105 K: AsRef<[u8]> + Sync + Send,
1106 V: DeserializeOwned + Sync + Send,
1107 {
1108 let name = self.full_name.as_slice();
1109 let mut conn = self.async_conn();
1110 let (res, _): (Option<Vec<u8>>, isize) = redis::pipe()
1111 .atomic()
1112 .hget(name, key.as_ref())
1113 .hdel(name, key.as_ref())
1114 .query_async(&mut conn)
1115 .await?;
1116
1117 if let Some(res) = res {
1118 Ok(Some(bincode::deserialize::<V>(res.as_ref())?))
1119 } else {
1120 Ok(None)
1121 }
1122 }
1123
1124 #[inline]
1126 async fn remove_with_prefix<K>(&self, prefix: K) -> Result<()>
1127 where
1128 K: AsRef<[u8]> + Sync + Send,
1129 {
1130 let name = self.full_name.as_slice();
1131 let mut conn = self.async_conn();
1132 let mut conn2 = conn.clone();
1133 let mut prefix = prefix.as_ref().to_vec();
1134 prefix.push(b'*');
1135 let mut removeds = Vec::new();
1136 while let Some(key) = conn
1137 .hscan_match::<_, _, Vec<u8>>(name, prefix.as_slice())
1138 .await?
1139 .next_item()
1140 .await
1141 {
1142 removeds.push(key?);
1143 if removeds.len() > 20 {
1144 let _: () = conn2.hdel(name, removeds.as_slice()).await?;
1145 removeds.clear();
1146 }
1147 }
1148 if !removeds.is_empty() {
1149 let _: () = conn.hdel(name, removeds).await?;
1150 }
1151 Ok(())
1152 }
1153
1154 #[inline]
1156 async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
1157 where
1158 V: Serialize + Sync + Send,
1159 {
1160 if !key_vals.is_empty() {
1161 let key_vals = key_vals
1162 .into_iter()
1163 .map(|(k, v)| {
1164 bincode::serialize(&v)
1165 .map(move |v| (k, v))
1166 .map_err(|e| anyhow!(e))
1167 })
1168 .collect::<Result<Vec<_>>>()?;
1169
1170 self._batch_insert_expire(key_vals).await?;
1171 }
1172 Ok(())
1173 }
1174
1175 #[inline]
1177 async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
1178 if !keys.is_empty() {
1179 let _: () = self
1180 .async_conn()
1181 .hdel(self.full_name.as_slice(), keys)
1182 .await?;
1183 }
1184 Ok(())
1185 }
1186
1187 #[inline]
1189 async fn iter<'a, V>(
1190 &'a mut self,
1191 ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
1192 where
1193 V: DeserializeOwned + Sync + Send + 'a + 'static,
1194 {
1195 let name = self.full_name.clone();
1196 let iter = AsyncIter {
1197 iter: self
1198 .async_conn_mut()
1199 .hscan::<_, (Key, Vec<u8>)>(name)
1200 .await?,
1201 _m: std::marker::PhantomData,
1202 };
1203 Ok(Box::new(iter))
1204 }
1205
1206 #[inline]
1208 async fn key_iter<'a>(
1209 &'a mut self,
1210 ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>> {
1211 let iter = AsyncKeyIter {
1212 iter: self
1213 .db
1214 .async_conn
1215 .hscan::<_, (Key, ())>(self.full_name.as_slice())
1216 .await?,
1217 };
1218 Ok(Box::new(iter))
1219 }
1220
1221 #[inline]
1223 async fn prefix_iter<'a, P, V>(
1224 &'a mut self,
1225 prefix: P,
1226 ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
1227 where
1228 P: AsRef<[u8]> + Send + Sync,
1229 V: DeserializeOwned + Sync + Send + 'a + 'static,
1230 {
1231 let name = self.full_name.clone();
1232 let mut prefix = prefix.as_ref().to_vec();
1233 prefix.push(b'*');
1234 let iter = AsyncIter {
1235 iter: self
1236 .async_conn_mut()
1237 .hscan_match::<_, _, (Key, Vec<u8>)>(name, prefix.as_slice())
1238 .await?,
1239 _m: std::marker::PhantomData,
1240 };
1241 Ok(Box::new(iter))
1242 }
1243
/// Sets an absolute expiry (`PEXPIREAT`, timestamp `at`) on the hash and returns
/// the command's boolean reply.
#[cfg(feature = "ttl")]
async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
    let mut conn = self.async_conn();
    let applied: bool = conn.pexpire_at(self.full_name.as_slice(), at).await?;
    Ok(applied)
}
1253
/// Sets a relative expiry (`PEXPIRE`, interval `dur`) on the hash and returns the
/// command's boolean reply.
#[cfg(feature = "ttl")]
async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
    let mut conn = self.async_conn();
    let applied: bool = conn.pexpire(self.full_name.as_slice(), dur).await?;
    Ok(applied)
}
1263
/// Queries `PTTL` for the hash.
///
/// A reply of `-2` maps to `None`, `-1` maps to `Some(TimestampMillis::MAX)`, and
/// any other reply is returned as the remaining time.
#[cfg(feature = "ttl")]
async fn ttl(&self) -> Result<Option<TimestampMillis>> {
    let mut conn = self.async_conn();
    let remaining = conn.pttl::<_, isize>(self.full_name.as_slice()).await?;
    let mapped = match remaining {
        -2 => None,
        -1 => Some(TimestampMillis::MAX),
        other => Some(other as TimestampMillis),
    };
    Ok(mapped)
}
1277}
1278
1279#[derive(Clone)]
1281pub struct RedisStorageList {
1282 name: Key,
1284 full_name: Key,
1286 #[allow(dead_code)]
1288 expire: Option<TimestampMillis>,
1289 empty: Arc<AtomicBool>,
1291 pub(crate) db: RedisStorageDB,
1293}
1294
1295impl RedisStorageList {
1296 #[inline]
1298 pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self {
1299 Self {
1300 name,
1301 full_name,
1302 expire: None,
1303 empty: Arc::new(AtomicBool::new(true)),
1304 db,
1305 }
1306 }
1307
1308 #[inline]
1310 pub(crate) async fn new_expire(
1311 name: Key,
1312 full_name: Key,
1313 expire: Option<TimestampMillis>,
1314 mut db: RedisStorageDB,
1315 ) -> Result<Self> {
1316 let empty = if expire.is_some() {
1317 let empty = Self::_is_empty(&mut db.async_conn, full_name.as_slice()).await?;
1318 Arc::new(AtomicBool::new(empty))
1319 } else {
1320 Arc::new(AtomicBool::new(true))
1321 };
1322 Ok(Self {
1323 name,
1324 full_name,
1325 expire,
1326 empty,
1327 db,
1328 })
1329 }
1330
1331 #[inline]
1333 pub(crate) fn async_conn(&self) -> RedisConnection {
1334 self.db.async_conn()
1335 }
1336
1337 #[inline]
1339 async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result<bool> {
1340 Ok(async_conn.llen::<_, usize>(full_name).await? == 0)
1341 }
1342
1343 #[inline]
1345 async fn _push_expire(&self, val: Vec<u8>) -> Result<()> {
1346 let mut async_conn = self.async_conn();
1347 let name = self.full_name.as_slice();
1348
1349 #[cfg(feature = "ttl")]
1350 if self.empty.load(Ordering::SeqCst) {
1351 if let Some(expire) = self.expire.as_ref() {
1352 let _: () = redis::pipe()
1353 .atomic()
1354 .rpush(name, val)
1355 .pexpire(name, *expire)
1356 .query_async(&mut async_conn)
1357 .await?;
1358 self.empty.store(false, Ordering::SeqCst);
1359 return Ok(());
1360 }
1361 }
1362
1363 let _: () = async_conn.rpush(name, val).await?;
1364 Ok(())
1365 }
1366
1367 #[inline]
1369 async fn _pushs_expire(&self, vals: Vec<Vec<u8>>) -> Result<()> {
1370 let mut async_conn = self.async_conn();
1371
1372 #[cfg(feature = "ttl")]
1373 if self.empty.load(Ordering::SeqCst) {
1374 if let Some(expire) = self.expire.as_ref() {
1375 let name = self.full_name.as_slice();
1376 let _: () = redis::pipe()
1377 .atomic()
1378 .rpush(name, vals)
1379 .pexpire(name, *expire)
1380 .query_async(&mut async_conn)
1381 .await?;
1382 self.empty.store(false, Ordering::SeqCst);
1383 return Ok(());
1384 }
1385 }
1386
1387 let _: () = async_conn.rpush(self.full_name.as_slice(), vals).await?;
1388 Ok(())
1389 }
1390
1391 #[inline]
1393 async fn _push_limit_expire(
1394 &self,
1395 val: Vec<u8>,
1396 limit: usize,
1397 pop_front_if_limited: bool,
1398 ) -> Result<Option<Vec<u8>>> {
1399 let mut conn = self.async_conn();
1400
1401 #[cfg(feature = "ttl")]
1402 if self.empty.load(Ordering::SeqCst) {
1403 if let Some(expire) = self.expire.as_ref() {
1404 let name = self.full_name.as_slice();
1405 let count = conn.llen::<_, usize>(name).await?;
1406 let res = if count < limit {
1407 let _: () = redis::pipe()
1408 .atomic()
1409 .rpush(name, val)
1410 .pexpire(name, *expire)
1411 .query_async(&mut conn)
1412 .await?;
1413 Ok(None)
1414 } else if pop_front_if_limited {
1415 let (poped, _): (Option<Vec<u8>>, Option<()>) = redis::pipe()
1416 .atomic()
1417 .lpop(name, None)
1418 .rpush(name, val)
1419 .pexpire(name, *expire)
1420 .query_async(&mut conn)
1421 .await?;
1422
1423 Ok(poped)
1424 } else {
1425 Err(anyhow::Error::msg("Is full"))
1426 };
1427 self.empty.store(false, Ordering::SeqCst);
1428 return res;
1429 }
1430 }
1431
1432 self._push_limit(val, limit, pop_front_if_limited, &mut conn)
1433 .await
1434 }
1435
1436 #[inline]
1438 async fn _push_limit(
1439 &self,
1440 val: Vec<u8>,
1441 limit: usize,
1442 pop_front_if_limited: bool,
1443 async_conn: &mut RedisConnection,
1444 ) -> Result<Option<Vec<u8>>> {
1445 let name = self.full_name.as_slice();
1446
1447 let count = async_conn.llen::<_, usize>(name).await?;
1448 if count < limit {
1449 let _: () = async_conn.rpush(name, val).await?;
1450 Ok(None)
1451 } else if pop_front_if_limited {
1452 let (poped, _): (Option<Vec<u8>>, Option<()>) = redis::pipe()
1453 .atomic()
1454 .lpop(name, None)
1455 .rpush(name, val)
1456 .query_async(async_conn)
1457 .await?;
1458 Ok(poped)
1459 } else {
1460 Err(anyhow::Error::msg("Is full"))
1461 }
1462 }
1463}
1464
1465#[async_trait]
1466impl List for RedisStorageList {
1467 #[inline]
1469 fn name(&self) -> &[u8] {
1470 self.name.as_slice()
1471 }
1472
1473 #[inline]
1475 async fn push<V>(&self, val: &V) -> Result<()>
1476 where
1477 V: Serialize + Sync + Send,
1478 {
1479 self._push_expire(bincode::serialize(val)?).await
1480 }
1481
1482 #[inline]
1484 async fn pushs<V>(&self, vals: Vec<V>) -> Result<()>
1485 where
1486 V: Serialize + Sync + Send,
1487 {
1488 let vals = vals
1489 .into_iter()
1490 .map(|v| bincode::serialize(&v).map_err(|e| anyhow!(e)))
1491 .collect::<Result<Vec<_>>>()?;
1492 self._pushs_expire(vals).await
1493 }
1494
1495 #[inline]
1497 async fn push_limit<V>(
1498 &self,
1499 val: &V,
1500 limit: usize,
1501 pop_front_if_limited: bool,
1502 ) -> Result<Option<V>>
1503 where
1504 V: Serialize + Sync + Send,
1505 V: DeserializeOwned,
1506 {
1507 let data = bincode::serialize(val)?;
1508
1509 if let Some(res) = self
1510 ._push_limit_expire(data, limit, pop_front_if_limited)
1511 .await?
1512 {
1513 Ok(Some(
1514 bincode::deserialize::<V>(res.as_ref()).map_err(|e| anyhow!(e))?,
1515 ))
1516 } else {
1517 Ok(None)
1518 }
1519 }
1520
1521 #[inline]
1523 async fn pop<V>(&self) -> Result<Option<V>>
1524 where
1525 V: DeserializeOwned + Sync + Send,
1526 {
1527 let removed = self
1528 .async_conn()
1529 .lpop::<_, Option<Vec<u8>>>(self.full_name.as_slice(), None)
1530 .await?;
1531
1532 let removed = if let Some(v) = removed {
1533 Some(bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e))?)
1534 } else {
1535 None
1536 };
1537
1538 Ok(removed)
1539 }
1540
1541 #[inline]
1543 async fn all<V>(&self) -> Result<Vec<V>>
1544 where
1545 V: DeserializeOwned + Sync + Send,
1546 {
1547 let all = self
1548 .async_conn()
1549 .lrange::<_, Vec<Vec<u8>>>(self.full_name.as_slice(), 0, -1)
1550 .await?;
1551 all.iter()
1552 .map(|v| bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e)))
1553 .collect::<Result<Vec<_>>>()
1554 }
1555
1556 #[inline]
1558 async fn get_index<V>(&self, idx: usize) -> Result<Option<V>>
1559 where
1560 V: DeserializeOwned + Sync + Send,
1561 {
1562 let val = self
1563 .async_conn()
1564 .lindex::<_, Option<Vec<u8>>>(self.full_name.as_slice(), idx as isize)
1565 .await?;
1566
1567 Ok(if let Some(v) = val {
1568 Some(bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e))?)
1569 } else {
1570 None
1571 })
1572 }
1573
1574 #[inline]
1576 async fn len(&self) -> Result<usize> {
1577 Ok(self.async_conn().llen(self.full_name.as_slice()).await?)
1578 }
1579
1580 #[inline]
1582 async fn is_empty(&self) -> Result<bool> {
1583 Ok(self.len().await? == 0)
1584 }
1585
1586 #[inline]
1588 async fn clear(&self) -> Result<()> {
1589 let _: () = self.async_conn().del(self.full_name.as_slice()).await?;
1590 self.empty.store(true, Ordering::SeqCst);
1591 Ok(())
1592 }
1593
1594 #[inline]
1596 async fn iter<'a, V>(
1597 &'a mut self,
1598 ) -> Result<Box<dyn AsyncIterator<Item = Result<V>> + Send + 'a>>
1599 where
1600 V: DeserializeOwned + Sync + Send + 'a + 'static,
1601 {
1602 Ok(Box::new(AsyncListValIter::new(
1603 self.full_name.as_slice(),
1604 self.db.async_conn(),
1605 )))
1606 }
1607
1608 #[cfg(feature = "ttl")]
1610 async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
1611 let res = self
1612 .async_conn()
1613 .pexpire_at::<_, bool>(self.full_name.as_slice(), at)
1614 .await?;
1615 Ok(res)
1616 }
1617
1618 #[cfg(feature = "ttl")]
1620 async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1621 let res = self
1622 .async_conn()
1623 .pexpire::<_, bool>(self.full_name.as_slice(), dur)
1624 .await?;
1625 Ok(res)
1626 }
1627
1628 #[cfg(feature = "ttl")]
1630 async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1631 let mut async_conn = self.async_conn();
1632 let res = async_conn
1633 .pttl::<_, isize>(self.full_name.as_slice())
1634 .await?;
1635 match res {
1636 -2 => Ok(None),
1637 -1 => Ok(Some(TimestampMillis::MAX)),
1638 _ => Ok(Some(res as TimestampMillis)),
1639 }
1640 }
1641}
1642
1643pub struct AsyncListValIter<'a, V> {
1645 name: &'a [u8],
1646 conn: RedisConnection,
1647 start: isize,
1648 limit: isize,
1649 catch_vals: Vec<Vec<u8>>,
1650 _m: std::marker::PhantomData<V>,
1651}
1652
1653impl<'a, V> AsyncListValIter<'a, V> {
1654 fn new(name: &'a [u8], conn: RedisConnection) -> Self {
1656 let start = 0;
1657 let limit = 20;
1658 Self {
1659 name,
1660 conn,
1661 start,
1662 limit,
1663 catch_vals: Vec::with_capacity((limit + 1) as usize),
1664 _m: std::marker::PhantomData,
1665 }
1666 }
1667}
1668
1669#[async_trait]
1670impl<V> AsyncIterator for AsyncListValIter<'_, V>
1671where
1672 V: DeserializeOwned + Sync + Send + 'static,
1673{
1674 type Item = Result<V>;
1675
1676 async fn next(&mut self) -> Option<Self::Item> {
1677 if let Some(val) = self.catch_vals.pop() {
1678 return Some(bincode::deserialize::<V>(val.as_ref()).map_err(|e| anyhow!(e)));
1679 }
1680
1681 let vals = self
1682 .conn
1683 .lrange::<_, Vec<Vec<u8>>>(self.name, self.start, self.start + self.limit)
1684 .await;
1685
1686 match vals {
1687 Err(e) => return Some(Err(anyhow!(e))),
1688 Ok(vals) => {
1689 if vals.is_empty() {
1690 return None;
1691 }
1692 self.start += vals.len() as isize;
1693 self.catch_vals = vals;
1694 self.catch_vals.reverse();
1695 }
1696 }
1697
1698 self.catch_vals
1699 .pop()
1700 .map(|val| bincode::deserialize::<V>(val.as_ref()).map_err(|e| anyhow!(e)))
1701 }
1702}
1703
1704pub struct AsyncIter<'a, V> {
1706 iter: redis::AsyncIter<'a, (Key, Vec<u8>)>,
1707 _m: std::marker::PhantomData<V>,
1708}
1709
1710#[async_trait]
1711impl<'a, V> AsyncIterator for AsyncIter<'a, V>
1712where
1713 V: DeserializeOwned + Sync + Send + 'a,
1714{
1715 type Item = IterItem<V>;
1716
1717 async fn next(&mut self) -> Option<Self::Item> {
1718 match self.iter.next_item().await {
1719 None => None,
1720 Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
1721 Some(Ok((key, v))) => match bincode::deserialize::<V>(v.as_ref()) {
1722 Ok(v) => Some(Ok((key, v))),
1723 Err(e) => Some(Err(anyhow::Error::new(e))),
1724 },
1725 }
1726 }
1727}
1728
1729pub struct AsyncDbKeyIter<'a> {
1731 prefix_len: usize,
1732 iter: redis::AsyncIter<'a, Key>,
1733}
1734
1735#[async_trait]
1736impl AsyncIterator for AsyncDbKeyIter<'_> {
1737 type Item = Result<Key>;
1738
1739 async fn next(&mut self) -> Option<Self::Item> {
1740 match self.iter.next_item().await {
1741 None => None,
1742 Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
1743 Some(Ok(key)) => Some(Ok(key[self.prefix_len..].to_vec())),
1744 }
1745 }
1746}
1747
1748pub struct AsyncKeyIter<'a> {
1750 iter: redis::AsyncIter<'a, (Key, ())>,
1751}
1752
1753#[async_trait]
1754impl AsyncIterator for AsyncKeyIter<'_> {
1755 type Item = Result<Key>;
1756
1757 async fn next(&mut self) -> Option<Self::Item> {
1758 match self.iter.next_item().await {
1759 None => None,
1760 Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
1761 Some(Ok((key, _))) => Some(Ok(key)),
1762 }
1763 }
1764}
1765
1766pub struct AsyncMapIter<'a> {
1768 db: RedisStorageDB,
1769 iter: redis::AsyncIter<'a, Key>,
1770}
1771
1772#[async_trait]
1773impl AsyncIterator for AsyncMapIter<'_> {
1774 type Item = Result<StorageMap>;
1775
1776 async fn next(&mut self) -> Option<Self::Item> {
1777 let full_name = match self.iter.next_item().await {
1778 None => return None,
1779 Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
1780 Some(Ok(key)) => key,
1781 };
1782
1783 let name = self.db.map_full_name_to_key(full_name.as_slice()).to_vec();
1784 let m = RedisStorageMap::new(name, full_name, self.db.clone());
1785 Some(Ok(StorageMap::Redis(m)))
1786 }
1787}
1788
1789pub struct AsyncListIter<'a> {
1791 db: RedisStorageDB,
1792 iter: redis::AsyncIter<'a, Key>,
1793}
1794
1795#[async_trait]
1796impl AsyncIterator for AsyncListIter<'_> {
1797 type Item = Result<StorageList>;
1798
1799 async fn next(&mut self) -> Option<Self::Item> {
1800 let full_name = match self.iter.next_item().await {
1801 None => return None,
1802 Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
1803 Some(Ok(key)) => key,
1804 };
1805
1806 let name = self.db.list_full_name_to_key(full_name.as_slice()).to_vec();
1807 let l = RedisStorageList::new(name, full_name, self.db.clone());
1808 Some(Ok(StorageList::Redis(l)))
1809 }
1810}