1use std::collections::BTreeMap;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use async_trait::async_trait;
18use redis::{
19 aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig},
20 cluster::ClusterClient,
21 cluster_async::ClusterConnection,
22 cluster_routing::get_slot,
23 pipe, AsyncCommands, Cmd,
24};
25use serde::de::DeserializeOwned;
26use serde::Deserialize;
27use serde::Serialize;
28use serde_json::Value;
29
30use crate::storage::{AsyncIterator, IterItem, Key, List, Map, StorageDB};
31use crate::{Result, StorageList, StorageMap};
32
33#[allow(unused_imports)]
34use crate::{timestamp_millis, TimestampMillis};
35
36use crate::storage::{KEY_PREFIX, KEY_PREFIX_LEN, LIST_NAME_PREFIX, MAP_NAME_PREFIX, SEPARATOR};
37
/// Alias for the async Redis Cluster connection type used throughout this module.
type RedisConnection = ClusterConnection;
40
/// Configuration for the Redis Cluster storage backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RedisConfig {
    /// Node URLs handed to `ClusterClient::builder` when the backend connects.
    pub urls: Vec<String>,

    /// Namespace prefix; it is joined with `SEPARATOR` and embedded in every generated key.
    pub prefix: String,
}
50
51impl Default for RedisConfig {
52 fn default() -> Self {
53 RedisConfig {
54 urls: Vec::default(),
55 prefix: "__def".into(),
56 }
57 }
58}
59
/// Handle to a Redis Cluster used as the storage backend.
///
/// Cloning the handle clones the underlying connections and the cached node list.
#[derive(Clone)]
pub struct RedisStorageDB {
    /// Namespace bytes (`cfg.prefix` followed by `SEPARATOR`) embedded in every key.
    prefix: Key,
    /// Cluster connection used for ordinary commands.
    async_conn: RedisConnection,
    /// One connection manager per node address discovered via `CLUSTER SLOTS`;
    /// used for per-node commands such as `DBSIZE` and `SCAN`.
    nodes: Vec<ConnectionManager>,
    /// Timestamp compared against the current time in `nodes_mut` to decide
    /// whether the node list should be rebuilt.
    nodes_update_time: TimestampMillis,
}
72
73impl RedisStorageDB {
74 #[inline]
76 pub(crate) async fn new(cfg: RedisConfig) -> Result<Self> {
77 let prefix = [cfg.prefix.as_bytes(), SEPARATOR].concat();
78
79 let client = ClusterClient::builder(cfg.urls)
80 .retry_wait_formula(2, 100)
81 .retries(2)
82 .connection_timeout(Duration::from_secs(15))
83 .response_timeout(Duration::from_secs(10))
84 .build()?;
85
86 let async_conn = client.get_async_connection().await?;
87
88 let mut db = Self {
89 prefix,
90 async_conn,
91 nodes: Vec::new(),
92 nodes_update_time: timestamp_millis(),
93 }
94 .cleanup();
95 db.refresh_cluster_nodes().await?;
96 Ok(db)
97 }
98
99 fn cleanup(self) -> Self {
101 let db = self.clone();
102 tokio::spawn(async move {
103 loop {
104 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
105 let mut async_conn = db.async_conn();
106 let db_zkey = db.make_len_sortedset_key();
107 if let Err(e) = async_conn
108 .zrembyscore::<'_, _, _, _, ()>(db_zkey.as_slice(), 0, timestamp_millis())
109 .await
110 {
111 log::error!("{:?}", e);
112 }
113 }
114 });
115 self
116 }
117
118 #[inline]
120 fn async_conn(&self) -> RedisConnection {
121 self.async_conn.clone()
122 }
123
124 #[inline]
126 fn async_conn_mut(&mut self) -> &mut RedisConnection {
127 &mut self.async_conn
128 }
129
130 #[inline]
132 async fn refresh_cluster_nodes(&mut self) -> Result<()> {
133 let slots = self
134 .async_conn()
135 .req_packed_command(Cmd::new().arg("CLUSTER").arg("SLOTS"))
136 .await?;
137
138 let mut addrs = Vec::new();
139 for slot in slots
140 .as_sequence()
141 .map(|arrs| {
142 arrs.iter()
143 .filter_map(|obj| obj.as_sequence())
144 .collect::<Vec<_>>()
145 })
146 .iter()
147 .flatten()
148 .collect::<Vec<_>>()
149 {
150 if let Some(addr_info) = slot.get(2) {
151 if let Some(addr_items) = addr_info.as_sequence() {
152 if addr_items.len() > 1 {
153 if let (redis::Value::BulkString(addr), redis::Value::Int(port)) =
154 (&addr_items[0], &addr_items[1])
155 {
156 let addr =
157 format!("redis://{}:{}", String::from_utf8_lossy(addr), *port);
158 addrs.push(addr);
159 }
160 }
161 }
162 }
163 }
164
165 let mut nodes = Vec::new();
167 for addr in addrs {
168 let client = match redis::Client::open(addr.as_str()) {
169 Ok(c) => c,
170 Err(e) => {
171 log::error!("open redis node error, addr is {}, {:?}", addr, e);
172 return Err(anyhow!(e));
173 }
174 };
175 let mgr_cfg = ConnectionManagerConfig::default()
176 .set_exponent_base(100)
177 .set_factor(2)
178 .set_number_of_retries(2)
179 .set_connection_timeout(Duration::from_secs(15))
180 .set_response_timeout(Duration::from_secs(10));
181 let conn = match client.get_connection_manager_with_config(mgr_cfg).await {
182 Ok(conn) => conn,
183 Err(e) => {
184 log::error!("get redis connection error, addr {:?}, {:?}", addr, e);
185 return Err(anyhow!(e));
186 }
187 };
188 nodes.push(conn);
189 }
190 self.nodes = nodes;
191 log::info!("nodes.len(): {:?}", self.nodes.len());
192 Ok(())
193 }
194
195 #[inline]
197 async fn nodes_mut(&mut self) -> Result<&mut Vec<ConnectionManager>> {
198 if (timestamp_millis() - self.nodes_update_time) > 5000 {
200 self.refresh_cluster_nodes().await?;
201 }
202 Ok(&mut self.nodes)
203 }
204
205 #[inline]
207 #[allow(dead_code)]
208 fn make_len_sortedset_key(&self) -> Key {
209 [KEY_PREFIX_LEN, self.prefix.as_slice()].concat()
210 }
211
212 #[inline]
214 fn make_full_key<K>(&self, key: K) -> Key
215 where
216 K: AsRef<[u8]>,
217 {
218 [KEY_PREFIX, self.prefix.as_slice(), key.as_ref()].concat()
219 }
220
221 #[inline]
223 fn make_scan_pattern_match<P: AsRef<[u8]>>(&self, pattern: P) -> Key {
224 [KEY_PREFIX, self.prefix.as_slice(), pattern.as_ref()].concat()
225 }
226
227 #[inline]
229 fn make_map_full_name<K>(&self, name: K) -> Key
230 where
231 K: AsRef<[u8]>,
232 {
233 [MAP_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat()
234 }
235
236 #[inline]
238 fn make_list_full_name<K>(&self, name: K) -> Key
239 where
240 K: AsRef<[u8]>,
241 {
242 [LIST_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat()
243 }
244
245 #[inline]
247 fn make_map_prefix_match(&self) -> Key {
248 [MAP_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat()
249 }
250
251 #[inline]
253 fn make_list_prefix_match(&self) -> Key {
254 [LIST_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat()
255 }
256
257 #[inline]
259 fn map_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] {
260 full_name[MAP_NAME_PREFIX.len() + self.prefix.len()..].as_ref()
261 }
262
263 #[inline]
265 fn list_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] {
266 full_name[LIST_NAME_PREFIX.len() + self.prefix.len()..].as_ref()
267 }
268
269 #[inline]
271 async fn _get_full_name(&self, key: &[u8]) -> Result<Key> {
272 let map_full_name = self.make_map_full_name(key);
273 let mut async_conn = self.async_conn();
274 let full_name = if async_conn.exists(map_full_name.as_slice()).await? {
275 map_full_name
276 } else {
277 let list_full_name = self.make_list_full_name(key);
278 if async_conn.exists(list_full_name.as_slice()).await? {
279 list_full_name
280 } else {
281 self.make_full_key(key)
282 }
283 };
284 Ok(full_name)
285 }
286
    /// Stores the bincode encoding of `_val` under the namespaced key for `_key`.
    ///
    /// When `_expire_interval` is `Some`, the `SET` and the `PEXPIRE` are sent
    /// in one atomic pipeline; otherwise a plain `SET` is issued.
    ///
    /// With the `len` feature enabled this returns an `"unsupported!"` error
    /// immediately; the block after the early return is unreachable.
    ///
    /// # Errors
    /// Serialization and Redis errors are propagated.
    #[inline]
    async fn _insert<K, V>(
        &self,
        _key: K,
        _val: &V,
        _expire_interval: Option<TimestampMillis>,
    ) -> Result<()>
    where
        K: AsRef<[u8]> + Sync + Send,
        V: serde::ser::Serialize + Sync + Send,
    {
        #[cfg(not(feature = "len"))]
        {
            let full_key = self.make_full_key(_key.as_ref());
            if let Some(expire_interval) = _expire_interval {
                let mut async_conn = self.async_conn();
                // Value and TTL are applied together in one atomic pipeline.
                pipe()
                    .atomic()
                    .set(full_key.as_slice(), bincode::serialize(_val)?)
                    .pexpire(full_key.as_slice(), expire_interval)
                    .query_async::<()>(&mut async_conn)
                    .await?;
            } else {
                let _: () = self
                    .async_conn()
                    .set(full_key, bincode::serialize(_val)?)
                    .await?;
            }
        }
        #[cfg(feature = "len")]
        {
            return Err(anyhow!("unsupported!"));
            #[allow(unreachable_code)]
            {
                let full_key = self.make_full_key(_key.as_ref());
                let db_zkey = self.make_len_sortedset_key();
                let mut async_conn = self.async_conn();
                // The sorted-set update joins the atomic pipeline only when
                // both keys hash to the same cluster slot.
                if get_slot(db_zkey.as_slice()) == get_slot(full_key.as_slice()) {
                    if let Some(expire_interval) = _expire_interval {
                        pipe()
                            .atomic()
                            .set(full_key.as_slice(), bincode::serialize(_val)?)
                            .pexpire(full_key.as_slice(), expire_interval)
                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                    } else {
                        pipe()
                            .atomic()
                            .set(full_key.as_slice(), bincode::serialize(_val)?)
                            .zadd(db_zkey, _key.as_ref(), i64::MAX)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                    }
                } else {
                    // Different slots: the sorted-set update is a separate command.
                    if let Some(expire_interval) = _expire_interval {
                        let _: () = pipe()
                            .atomic()
                            .set(full_key.as_slice(), bincode::serialize(_val)?)
                            .pexpire(full_key.as_slice(), expire_interval)
                            .query_async(&mut async_conn)
                            .await?;
                        let _: () = async_conn
                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
                            .await?;
                    } else {
                        let _: () = async_conn
                            .set(full_key.as_slice(), bincode::serialize(_val)?)
                            .await?;
                        let _: () = async_conn.zadd(db_zkey, _key.as_ref(), i64::MAX).await?;
                    }
                }
            }
        }

        Ok(())
    }
367
368 #[inline]
370 async fn _batch_insert<V>(
371 &self,
372 _key_val_expires: Vec<(Key, Key, V, Option<TimestampMillis>)>,
373 ) -> Result<()>
374 where
375 V: serde::ser::Serialize + Sync + Send,
376 {
377 #[cfg(not(feature = "len"))]
378 {
379 let keys_vals: Vec<(&Key, Vec<u8>)> = _key_val_expires
380 .iter()
381 .map(|(_, full_key, v, _)| {
382 bincode::serialize(&v)
383 .map(move |v| (full_key, v))
384 .map_err(|e| anyhow!(e))
385 })
386 .collect::<Result<Vec<_>>>()?;
387
388 let mut async_conn = self.async_conn();
389 let mut p = pipe();
390 let mut rpipe = p.atomic().mset(keys_vals.as_slice());
391 for (_, full_key, _, at) in _key_val_expires {
392 if let Some(at) = at {
393 rpipe = rpipe.expire(full_key, at);
394 }
395 }
396 rpipe.query_async::<()>(&mut async_conn).await?;
397 Ok(())
398 }
399
400 #[cfg(feature = "len")]
401 {
402 return Err(anyhow!("unsupported!"));
403 #[allow(unreachable_code)]
405 {
406 for (k, _, v, expire) in _key_val_expires {
407 self._insert(k.as_slice(), &v, expire).await?;
408 }
409 Ok(())
410 }
411 }
412 }
413
414 #[inline]
416 async fn _batch_remove(&self, _keys: Vec<Key>) -> Result<()> {
417 #[cfg(not(feature = "len"))]
418 {
419 let full_keys = _keys
420 .iter()
421 .map(|k| self.make_full_key(k))
422 .collect::<Vec<_>>();
423 let _: () = self.async_conn().del(full_keys).await?;
424 }
425 #[cfg(feature = "len")]
426 {
427 return Err(anyhow!("unsupported!"));
428 #[allow(unreachable_code)]
429 {
431 for key in _keys {
432 self._remove(key).await?;
433 }
434 }
435 }
436 Ok(())
437 }
438
    /// Increments the counter stored under `_key` by `_increment`.
    ///
    /// When `_expire_interval` is `Some`, the `INCR` and the `PEXPIRE` are
    /// sent in one atomic pipeline; otherwise a plain increment is issued.
    ///
    /// With the `len` feature enabled this returns an `"unsupported!"` error
    /// immediately; the block after the early return is unreachable.
    #[inline]
    async fn _counter_incr<K>(
        &self,
        _key: K,
        _increment: isize,
        _expire_interval: Option<TimestampMillis>,
    ) -> Result<()>
    where
        K: AsRef<[u8]> + Sync + Send,
    {
        #[cfg(not(feature = "len"))]
        {
            let full_key = self.make_full_key(_key.as_ref());
            if let Some(expire_interval) = _expire_interval {
                let mut async_conn = self.async_conn();
                pipe()
                    .atomic()
                    .incr(full_key.as_slice(), _increment)
                    .pexpire(full_key.as_slice(), expire_interval)
                    .query_async::<()>(&mut async_conn)
                    .await?;
            } else {
                let _: () = self.async_conn().incr(full_key, _increment).await?;
            }
        }
        #[cfg(feature = "len")]
        {
            return Err(anyhow!("unsupported!"));
            #[allow(unreachable_code)]
            {
                let full_key = self.make_full_key(_key.as_ref());
                let db_zkey = self.make_len_sortedset_key();
                // Same slot: counter and sorted-set updates share one atomic pipeline.
                if get_slot(&db_zkey) == get_slot(full_key.as_slice()) {
                    let mut async_conn = self.async_conn();
                    if let Some(expire_interval) = _expire_interval {
                        pipe()
                            .atomic()
                            .incr(full_key.as_slice(), _increment)
                            .pexpire(full_key.as_slice(), expire_interval)
                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                    } else {
                        pipe()
                            .atomic()
                            .incr(full_key.as_slice(), _increment)
                            .zadd(db_zkey, _key.as_ref(), i64::MAX)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                    }
                } else {
                    // Different slots: the sorted-set update is a separate command.
                    let mut async_conn = self.async_conn();
                    if let Some(expire_interval) = _expire_interval {
                        let _: () = pipe()
                            .atomic()
                            .incr(full_key.as_slice(), _increment)
                            .pexpire(full_key.as_slice(), expire_interval)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                        let _: () = async_conn
                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
                            .await?;
                    } else {
                        let _: () = async_conn.incr(full_key.as_slice(), _increment).await?;
                        let _: () = async_conn.zadd(db_zkey, _key.as_ref(), i64::MAX).await?;
                    }
                }
            }
        }
        Ok(())
    }
512
    /// Decrements the counter stored under `_key` by `_decrement`.
    ///
    /// When `_expire_interval` is `Some`, the `DECR` and the `PEXPIRE` are
    /// sent in one atomic pipeline; otherwise a plain decrement is issued.
    ///
    /// With the `len` feature enabled this returns an `"unsupported!"` error
    /// immediately; the block after the early return is unreachable.
    #[inline]
    async fn _counter_decr<K>(
        &self,
        _key: K,
        _decrement: isize,
        _expire_interval: Option<TimestampMillis>,
    ) -> Result<()>
    where
        K: AsRef<[u8]> + Sync + Send,
    {
        #[cfg(not(feature = "len"))]
        {
            let full_key = self.make_full_key(_key.as_ref());
            if let Some(expire_interval) = _expire_interval {
                let mut async_conn = self.async_conn();
                pipe()
                    .atomic()
                    .decr(full_key.as_slice(), _decrement)
                    .pexpire(full_key.as_slice(), expire_interval)
                    .query_async::<()>(&mut async_conn)
                    .await?;
            } else {
                let _: () = self.async_conn().decr(full_key, _decrement).await?;
            }
        }
        #[cfg(feature = "len")]
        {
            return Err(anyhow!("unsupported!"));
            #[allow(unreachable_code)]
            {
                let full_key = self.make_full_key(_key.as_ref());
                let db_zkey = self.make_len_sortedset_key();
                let mut async_conn = self.async_conn();
                // Same slot: counter and sorted-set updates share one atomic pipeline.
                if get_slot(&db_zkey) == get_slot(full_key.as_slice()) {
                    if let Some(expire_interval) = _expire_interval {
                        pipe()
                            .atomic()
                            .decr(full_key.as_slice(), _decrement)
                            .pexpire(full_key.as_slice(), expire_interval)
                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                    } else {
                        pipe()
                            .atomic()
                            .decr(full_key.as_slice(), _decrement)
                            .zadd(db_zkey, _key.as_ref(), i64::MAX)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                    }
                } else {
                    // Different slots: the sorted-set update is a separate command.
                    if let Some(expire_interval) = _expire_interval {
                        let _: () = pipe()
                            .atomic()
                            .decr(full_key.as_slice(), _decrement)
                            .pexpire(full_key.as_slice(), expire_interval)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                        let _: () = async_conn
                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
                            .await?;
                    } else {
                        let _: () = async_conn.decr(full_key.as_slice(), _decrement).await?;
                        let _: () = async_conn.zadd(db_zkey, _key.as_ref(), i64::MAX).await?;
                    }
                }
            }
        }
        Ok(())
    }
586
    /// Sets the counter stored under `_key` to `_val`.
    ///
    /// The value is written as a plain Redis value (no bincode encoding).
    /// When `_expire_interval` is `Some`, the `SET` and the `PEXPIRE` are sent
    /// in one atomic pipeline.
    ///
    /// With the `len` feature enabled this returns an `"unsupported!"` error
    /// immediately; the block after the early return is unreachable.
    #[inline]
    async fn _counter_set<K>(
        &self,
        _key: K,
        _val: isize,
        _expire_interval: Option<TimestampMillis>,
    ) -> Result<()>
    where
        K: AsRef<[u8]> + Sync + Send,
    {
        #[cfg(not(feature = "len"))]
        {
            let full_key = self.make_full_key(_key.as_ref());
            if let Some(expire_interval) = _expire_interval {
                let mut async_conn = self.async_conn();
                pipe()
                    .atomic()
                    .set(full_key.as_slice(), _val)
                    .pexpire(full_key.as_slice(), expire_interval)
                    .query_async::<()>(&mut async_conn)
                    .await?;
            } else {
                let _: () = self.async_conn().set(full_key, _val).await?;
            }
        }
        #[cfg(feature = "len")]
        {
            return Err(anyhow!("unsupported!"));
            #[allow(unreachable_code)]
            {
                let full_key = self.make_full_key(_key.as_ref());
                let db_zkey = self.make_len_sortedset_key();
                let mut async_conn = self.async_conn();
                // Same slot: counter and sorted-set updates share one atomic pipeline.
                if get_slot(&db_zkey) == get_slot(full_key.as_slice()) {
                    if let Some(expire_interval) = _expire_interval {
                        pipe()
                            .atomic()
                            .set(full_key.as_slice(), _val)
                            .pexpire(full_key.as_slice(), expire_interval)
                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                    } else {
                        pipe()
                            .atomic()
                            .set(full_key.as_slice(), _val)
                            .zadd(db_zkey, _key.as_ref(), i64::MAX)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                    }
                } else {
                    // Different slots: the sorted-set update is a separate command.
                    if let Some(expire_interval) = _expire_interval {
                        pipe()
                            .atomic()
                            .set(full_key.as_slice(), _val)
                            .pexpire(full_key.as_slice(), expire_interval)
                            .query_async::<()>(&mut async_conn)
                            .await?;
                        let _: () = async_conn
                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
                            .await?;
                    } else {
                        let _: () = async_conn.set(full_key.as_slice(), _val).await?;
                        let _: () = async_conn.zadd(db_zkey, _key.as_ref(), i64::MAX).await?;
                    }
                }
            }
        }

        Ok(())
    }
661
662 #[inline]
664 async fn _remove<K>(&self, _key: K) -> Result<()>
665 where
666 K: AsRef<[u8]> + Sync + Send,
667 {
668 #[cfg(not(feature = "len"))]
669 {
670 let full_key = self.make_full_key(_key.as_ref());
671 let _: () = self.async_conn().del(full_key).await?;
672 Ok(())
673 }
674 #[cfg(feature = "len")]
675 {
676 return Err(anyhow!("unsupported!"));
677 #[allow(unreachable_code)]
679 {
680 let full_key = self.make_full_key(_key.as_ref());
681 let db_zkey = self.make_len_sortedset_key();
682
683 let mut async_conn = self.async_conn();
684 if get_slot(&db_zkey) == get_slot(_key.as_ref()) {
685 pipe()
686 .atomic()
687 .del(full_key.as_slice())
688 .zrem(db_zkey, _key.as_ref())
689 .query_async::<()>(&mut async_conn)
690 .await?;
691 } else {
692 let _: () = async_conn.zrem(db_zkey, _key.as_ref()).await?;
693 let _: () = async_conn.del(full_key).await?;
694 }
695 Ok(())
696 }
697 }
698 }
699}
700
701#[async_trait]
702impl StorageDB for RedisStorageDB {
703 type MapType = RedisStorageMap;
704 type ListType = RedisStorageList;
705
706 #[inline]
708 async fn map<V: AsRef<[u8]> + Sync + Send>(
709 &self,
710 name: V,
711 expire: Option<TimestampMillis>,
712 ) -> Result<Self::MapType> {
713 let full_name = self.make_map_full_name(name.as_ref());
714 Ok(
715 RedisStorageMap::new_expire(name.as_ref().to_vec(), full_name, expire, self.clone())
716 .await?,
717 )
718 }
719
720 #[inline]
722 async fn map_remove<K>(&self, name: K) -> Result<()>
723 where
724 K: AsRef<[u8]> + Sync + Send,
725 {
726 let map_full_name = self.make_map_full_name(name.as_ref());
727 let _: () = self.async_conn().del(map_full_name).await?;
728 Ok(())
729 }
730
731 #[inline]
733 async fn map_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
734 let map_full_name = self.make_map_full_name(key.as_ref());
735 Ok(self.async_conn().exists(map_full_name).await?)
736 }
737
738 #[inline]
740 async fn list<V: AsRef<[u8]> + Sync + Send>(
741 &self,
742 name: V,
743 expire: Option<TimestampMillis>,
744 ) -> Result<Self::ListType> {
745 let full_name = self.make_list_full_name(name.as_ref());
746 Ok(
747 RedisStorageList::new_expire(name.as_ref().to_vec(), full_name, expire, self.clone())
748 .await?,
749 )
750 }
751
752 #[inline]
754 async fn list_remove<K>(&self, name: K) -> Result<()>
755 where
756 K: AsRef<[u8]> + Sync + Send,
757 {
758 let list_full_name = self.make_list_full_name(name.as_ref());
759 let _: () = self.async_conn().del(list_full_name).await?;
760 Ok(())
761 }
762
763 #[inline]
765 async fn list_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
766 let list_full_name = self.make_list_full_name(key.as_ref());
767 Ok(self.async_conn().exists(list_full_name).await?)
768 }
769
770 #[inline]
772 async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
773 where
774 K: AsRef<[u8]> + Sync + Send,
775 V: serde::ser::Serialize + Sync + Send,
776 {
777 self._insert(key, val, None).await
778 }
779
780 #[inline]
782 async fn get<K, V>(&self, key: K) -> Result<Option<V>>
783 where
784 K: AsRef<[u8]> + Sync + Send,
785 V: DeserializeOwned + Sync + Send,
786 {
787 let full_key = self.make_full_key(key);
788 if let Some(v) = self
789 .async_conn()
790 .get::<_, Option<Vec<u8>>>(full_key)
791 .await?
792 {
793 Ok(Some(bincode::deserialize::<V>(v.as_ref())?))
794 } else {
795 Ok(None)
796 }
797 }
798
799 #[inline]
801 async fn remove<K>(&self, key: K) -> Result<()>
802 where
803 K: AsRef<[u8]> + Sync + Send,
804 {
805 self._remove(key).await
806 }
807
808 #[inline]
810 async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
811 where
812 V: Serialize + Sync + Send,
813 {
814 if !key_vals.is_empty() {
815 let mut slot_keys_vals_expires = key_vals
816 .into_iter()
817 .map(|(k, v)| {
818 let full_key = self.make_full_key(k.as_slice());
819 (get_slot(&full_key), (k, full_key, v, None))
820 })
821 .collect::<Vec<_>>();
822
823 if !slot_keys_vals_expires.is_empty() {
824 for keys_vals_expires in transform_by_slot(slot_keys_vals_expires) {
825 self._batch_insert(keys_vals_expires).await?;
826 }
827 } else if let Some((_, keys_vals_expires)) = slot_keys_vals_expires.pop() {
828 self._batch_insert(vec![keys_vals_expires]).await?;
829 }
830 }
831 Ok(())
832 }
833
834 #[inline]
836 async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
837 if !keys.is_empty() {
838 self._batch_remove(keys).await?;
839 }
840 Ok(())
841 }
842
843 #[inline]
845 async fn counter_incr<K>(&self, key: K, increment: isize) -> Result<()>
846 where
847 K: AsRef<[u8]> + Sync + Send,
848 {
849 self._counter_incr(key, increment, None).await
850 }
851
852 #[inline]
854 async fn counter_decr<K>(&self, key: K, decrement: isize) -> Result<()>
855 where
856 K: AsRef<[u8]> + Sync + Send,
857 {
858 self._counter_decr(key, decrement, None).await
859 }
860
861 #[inline]
863 async fn counter_get<K>(&self, key: K) -> Result<Option<isize>>
864 where
865 K: AsRef<[u8]> + Sync + Send,
866 {
867 let full_key = self.make_full_key(key);
868 Ok(self.async_conn().get::<_, Option<isize>>(full_key).await?)
869 }
870
871 #[inline]
873 async fn counter_set<K>(&self, key: K, val: isize) -> Result<()>
874 where
875 K: AsRef<[u8]> + Sync + Send,
876 {
877 self._counter_set(key, val, None).await
878 }
879
880 #[inline]
882 async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
883 let full_key = self.make_full_key(key.as_ref());
885 Ok(self.async_conn().exists(full_key).await?)
886 }
887
888 #[inline]
890 #[cfg(feature = "len")]
891 async fn len(&self) -> Result<usize> {
892 return Err(anyhow!("unsupported!"));
893 #[allow(unreachable_code)]
895 {
896 let db_zkey = self.make_len_sortedset_key();
897 let mut async_conn = self.async_conn();
898 let (_, count) = pipe()
899 .zrembyscore(db_zkey.as_slice(), 0, timestamp_millis())
900 .zcard(db_zkey.as_slice())
901 .query_async::<(i64, usize)>(&mut async_conn)
902 .await?;
903 Ok(count)
904 }
905 }
906
907 #[inline]
909 async fn db_size(&self) -> Result<usize> {
910 let mut dbsize = 0;
911 for mut async_conn in self.nodes.clone() {
912 let dbsize_val = redis::pipe()
914 .cmd("DBSIZE")
915 .query_async::<redis::Value>(&mut async_conn)
916 .await?;
917 dbsize += dbsize_val
918 .as_sequence()
919 .and_then(|vs| {
920 vs.iter().next().and_then(|v| {
921 if let redis::Value::Int(v) = v {
922 Some(*v)
923 } else {
924 None
925 }
926 })
927 })
928 .unwrap_or_default();
929 }
930 Ok(dbsize as usize)
931 }
932
933 #[inline]
935 #[cfg(feature = "ttl")]
936 async fn expire_at<K>(&self, _key: K, _at: TimestampMillis) -> Result<bool>
937 where
938 K: AsRef<[u8]> + Sync + Send,
939 {
940 #[cfg(not(feature = "len"))]
941 {
942 let full_name = self.make_full_key(_key.as_ref());
943 let res = self
944 .async_conn()
945 .pexpire_at::<_, bool>(full_name, _at)
946 .await?;
947 Ok(res)
948 }
949 #[cfg(feature = "len")]
950 {
951 return Err(anyhow!("unsupported!"));
952 #[allow(unreachable_code)]
954 {
955 let full_name = self.make_full_key(_key.as_ref());
956 let db_zkey = self.make_len_sortedset_key();
957 let mut async_conn = self.async_conn();
958 if get_slot(&db_zkey) == get_slot(full_name.as_slice()) {
959 let (_, res) = pipe()
960 .atomic()
961 .zadd(db_zkey, _key.as_ref(), _at)
962 .pexpire_at(full_name.as_slice(), _at)
963 .query_async::<(i64, bool)>(&mut async_conn)
964 .await?;
965 Ok(res)
966 } else {
967 let res = async_conn.zadd(db_zkey, _key.as_ref(), _at).await?;
968 let _: () = async_conn.pexpire_at(full_name.as_slice(), _at).await?;
969 Ok(res)
970 }
971 }
972 }
973 }
974
975 #[inline]
977 #[cfg(feature = "ttl")]
978 async fn expire<K>(&self, _key: K, _dur: TimestampMillis) -> Result<bool>
979 where
980 K: AsRef<[u8]> + Sync + Send,
981 {
982 #[cfg(not(feature = "len"))]
983 {
984 let full_name = self.make_full_key(_key.as_ref());
985 let res = self
986 .async_conn()
987 .pexpire::<_, bool>(full_name, _dur)
988 .await?;
989 Ok(res)
990 }
991 #[cfg(feature = "len")]
992 {
993 return Err(anyhow!("unsupported!"));
994 #[allow(unreachable_code)]
996 {
997 let full_name = self.make_full_key(_key.as_ref());
998 let db_zkey = self.make_len_sortedset_key();
999 let mut async_conn = self.async_conn();
1000 if get_slot(db_zkey.as_slice()) == get_slot(full_name.as_slice()) {
1001 let (_, res) = pipe()
1002 .atomic()
1003 .zadd(db_zkey, _key.as_ref(), timestamp_millis() + _dur)
1004 .pexpire(full_name.as_slice(), _dur)
1005 .query_async::<(i64, bool)>(&mut async_conn)
1006 .await?;
1007 Ok(res)
1008 } else {
1009 let _: () = async_conn
1010 .zadd(db_zkey, _key.as_ref(), timestamp_millis() + _dur)
1011 .await?;
1012 let res = async_conn
1013 .pexpire::<_, bool>(full_name.as_slice(), _dur)
1014 .await?;
1015 Ok(res)
1016 }
1017 }
1018 }
1019 }
1020
1021 #[inline]
1023 #[cfg(feature = "ttl")]
1024 async fn ttl<K>(&self, key: K) -> Result<Option<TimestampMillis>>
1025 where
1026 K: AsRef<[u8]> + Sync + Send,
1027 {
1028 let mut async_conn = self.async_conn();
1029 let full_key = self.make_full_key(key.as_ref());
1030 let res = async_conn.pttl::<_, isize>(full_key).await?;
1031 match res {
1032 -2 => Ok(None),
1033 -1 => Ok(Some(TimestampMillis::MAX)),
1034 _ => Ok(Some(res as TimestampMillis)),
1035 }
1036 }
1037
1038 #[inline]
1040 async fn map_iter<'a>(
1041 &'a mut self,
1042 ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageMap>> + Send + 'a>> {
1043 let db = self.clone();
1044 let pattern = self.make_map_prefix_match();
1045
1046 let mut iters = Vec::new();
1047 for conn in self.nodes_mut().await?.iter_mut() {
1048 let iter = conn.scan_match::<_, Key>(pattern.as_slice()).await?;
1049 iters.push(iter);
1050 }
1051
1052 let iter = AsyncMapIter { db, iters };
1053 Ok(Box::new(iter))
1054 }
1055
1056 #[inline]
1058 async fn list_iter<'a>(
1059 &'a mut self,
1060 ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>> {
1061 let db = self.clone();
1062 let pattern = self.make_list_prefix_match();
1063
1064 let mut iters = Vec::new();
1065 for conn in self.nodes_mut().await?.iter_mut() {
1066 let iter = conn.scan_match::<_, Key>(pattern.as_slice()).await?;
1067 iters.push(iter);
1068 }
1069
1070 let iter = AsyncListIter { db, iters };
1071 Ok(Box::new(iter))
1072 }
1073
1074 async fn scan<'a, P>(
1076 &'a mut self,
1077 pattern: P,
1078 ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>
1079 where
1080 P: AsRef<[u8]> + Send + Sync,
1081 {
1082 let pattern = self.make_scan_pattern_match(pattern);
1083 let prefix_len = KEY_PREFIX.len() + self.prefix.len();
1084
1085 let mut iters = Vec::new();
1086 for conn in self.nodes_mut().await?.iter_mut() {
1087 let iter = conn.scan_match::<_, Key>(pattern.as_slice()).await?;
1088 iters.push(iter);
1089 }
1090 Ok(Box::new(AsyncDbKeyIter { prefix_len, iters }))
1091 }
1092
1093 #[inline]
1095 async fn info(&self) -> Result<Value> {
1096 Ok(serde_json::json!({
1097 "storage_engine": "RedisCluster",
1098 "dbsize": self.db_size().await?,
1099 }))
1100 }
1101}
1102
/// Handle to one map, stored as a Redis hash under `full_name`.
#[derive(Clone)]
pub struct RedisStorageMap {
    /// Map name as supplied by the caller (returned by `name()`).
    name: Key,
    /// Namespaced Redis key of the hash.
    full_name: Key,
    /// Optional expiry in milliseconds, applied by the insert paths while
    /// `empty` is set (only when the `ttl` feature is enabled).
    #[allow(dead_code)]
    expire: Option<TimestampMillis>,
    /// Local flag, shared between clones, recording whether this handle still
    /// considers the hash empty; cleared after an expiring insert, set by `clear`.
    empty: Arc<AtomicBool>,
    /// Storage handle that owns the connections used by this map.
    pub(crate) db: RedisStorageDB,
}
1118
1119impl RedisStorageMap {
1120 #[inline]
1122 pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self {
1123 Self {
1124 name,
1125 full_name,
1126 expire: None,
1127 empty: Arc::new(AtomicBool::new(true)),
1128 db,
1129 }
1130 }
1131
1132 #[inline]
1134 pub(crate) async fn new_expire(
1135 name: Key,
1136 full_name: Key,
1137 expire: Option<TimestampMillis>,
1138 mut db: RedisStorageDB,
1139 ) -> Result<Self> {
1140 let empty = if expire.is_some() {
1141 let empty = Self::_is_empty(&mut db.async_conn, full_name.as_slice()).await?;
1142 Arc::new(AtomicBool::new(empty))
1143 } else {
1144 Arc::new(AtomicBool::new(true))
1145 };
1146 Ok(Self {
1147 name,
1148 full_name,
1149 expire,
1150 empty,
1151 db,
1152 })
1153 }
1154
1155 #[inline]
1157 fn async_conn(&self) -> RedisConnection {
1158 self.db.async_conn()
1159 }
1160
1161 #[inline]
1163 fn async_conn_mut(&mut self) -> &mut RedisConnection {
1164 self.db.async_conn_mut()
1165 }
1166
1167 #[inline]
1169 async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result<bool> {
1170 let res = async_conn
1172 .hscan::<_, Vec<u8>>(full_name)
1173 .await?
1174 .next_item()
1175 .await
1176 .is_none();
1177 Ok(res)
1178 }
1179
1180 #[inline]
1182 async fn _insert_expire(&self, key: &[u8], val: Vec<u8>) -> Result<()> {
1183 let mut async_conn = self.async_conn();
1184 let name = self.full_name.as_slice();
1185
1186 #[cfg(feature = "ttl")]
1187 if self.empty.load(Ordering::SeqCst) {
1188 if let Some(expire) = self.expire.as_ref() {
1189 let _: () = redis::pipe()
1192 .atomic()
1193 .hset(name, key, val)
1194 .pexpire(name, *expire)
1195 .query_async(&mut async_conn)
1196 .await?;
1197 self.empty.store(false, Ordering::SeqCst);
1198 return Ok(());
1199 }
1200 }
1201
1202 let _: () = async_conn.hset(name, key.as_ref(), val).await?;
1204 Ok(())
1205 }
1206
1207 #[inline]
1209 async fn _batch_insert_expire(&self, key_vals: Vec<(Key, Vec<u8>)>) -> Result<()> {
1210 let mut async_conn = self.async_conn();
1211 let name = self.full_name.as_slice();
1212
1213 #[cfg(feature = "ttl")]
1214 if self.empty.load(Ordering::SeqCst) {
1215 if let Some(expire) = self.expire.as_ref() {
1216 let _: () = redis::pipe()
1219 .atomic()
1220 .hset_multiple(name, key_vals.as_slice())
1221 .pexpire(name, *expire)
1222 .query_async(&mut async_conn)
1223 .await?;
1224
1225 self.empty.store(false, Ordering::SeqCst);
1226 return Ok(());
1227 }
1228 }
1229
1230 let _: () = async_conn.hset_multiple(name, key_vals.as_slice()).await?;
1232 Ok(())
1233 }
1234}
1235
1236#[async_trait]
1237impl Map for RedisStorageMap {
1238 #[inline]
1240 fn name(&self) -> &[u8] {
1241 self.name.as_slice()
1242 }
1243
1244 #[inline]
1246 async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
1247 where
1248 K: AsRef<[u8]> + Sync + Send,
1249 V: Serialize + Sync + Send + ?Sized,
1250 {
1251 self._insert_expire(key.as_ref(), bincode::serialize(val)?)
1252 .await
1253 }
1254
1255 #[inline]
1257 async fn get<K, V>(&self, key: K) -> Result<Option<V>>
1258 where
1259 K: AsRef<[u8]> + Sync + Send,
1260 V: DeserializeOwned + Sync + Send,
1261 {
1262 let res: Option<Vec<u8>> = self
1264 .async_conn()
1265 .hget(self.full_name.as_slice(), key.as_ref())
1266 .await?;
1267 if let Some(res) = res {
1268 Ok(Some(bincode::deserialize::<V>(res.as_ref())?))
1269 } else {
1270 Ok(None)
1271 }
1272 }
1273
1274 #[inline]
1276 async fn remove<K>(&self, key: K) -> Result<()>
1277 where
1278 K: AsRef<[u8]> + Sync + Send,
1279 {
1280 let _: () = self
1282 .async_conn()
1283 .hdel(self.full_name.as_slice(), key.as_ref())
1284 .await?;
1285 Ok(())
1286 }
1287
1288 #[inline]
1290 async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
1291 let res = self
1293 .async_conn()
1294 .hexists(self.full_name.as_slice(), key.as_ref())
1295 .await?;
1296 Ok(res)
1297 }
1298
1299 #[cfg(feature = "map_len")]
1301 #[inline]
1302 async fn len(&self) -> Result<usize> {
1303 Ok(self.async_conn().hlen(self.full_name.as_slice()).await?)
1305 }
1306
1307 #[inline]
1309 async fn is_empty(&self) -> Result<bool> {
1310 let res = self
1312 .async_conn()
1313 .hscan::<_, Vec<u8>>(self.full_name.as_slice())
1314 .await?
1315 .next_item()
1316 .await
1317 .is_none();
1318 Ok(res)
1319 }
1320
1321 #[inline]
1323 async fn clear(&self) -> Result<()> {
1324 let _: () = self.async_conn().del(self.full_name.as_slice()).await?;
1326 self.empty.store(true, Ordering::SeqCst);
1327 Ok(())
1328 }
1329
1330 #[inline]
1332 async fn remove_and_fetch<K, V>(&self, key: K) -> Result<Option<V>>
1333 where
1334 K: AsRef<[u8]> + Sync + Send,
1335 V: DeserializeOwned + Sync + Send,
1336 {
1337 let name = self.full_name.as_slice();
1340 let mut conn = self.async_conn();
1341 let (res, _): (Option<Vec<u8>>, isize) = redis::pipe()
1342 .atomic()
1343 .hget(name, key.as_ref())
1344 .hdel(name, key.as_ref())
1345 .query_async(&mut conn)
1346 .await?;
1347
1348 if let Some(res) = res {
1349 Ok(Some(bincode::deserialize::<V>(res.as_ref())?))
1350 } else {
1351 Ok(None)
1352 }
1353 }
1354
1355 #[inline]
1357 async fn remove_with_prefix<K>(&self, prefix: K) -> Result<()>
1358 where
1359 K: AsRef<[u8]> + Sync + Send,
1360 {
1361 let name = self.full_name.as_slice();
1362 let mut conn = self.async_conn();
1363 let mut conn2 = conn.clone();
1364 let mut prefix = prefix.as_ref().to_vec();
1365 prefix.push(b'*');
1366 let mut removeds = Vec::new();
1367 while let Some(key) = conn
1368 .hscan_match::<_, _, Vec<u8>>(name, prefix.as_slice())
1369 .await?
1370 .next_item()
1371 .await
1372 {
1373 removeds.push(key?);
1374 if removeds.len() > 20 {
1375 let _: () = conn2.hdel(name, removeds.as_slice()).await?;
1376 removeds.clear();
1377 }
1378 }
1379 if !removeds.is_empty() {
1380 let _: () = conn.hdel(name, removeds).await?;
1381 }
1382 Ok(())
1383 }
1384
1385 #[inline]
1387 async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
1388 where
1389 V: Serialize + Sync + Send,
1390 {
1391 if !key_vals.is_empty() {
1392 let key_vals = key_vals
1393 .into_iter()
1394 .map(|(k, v)| {
1395 bincode::serialize(&v)
1396 .map(move |v| (k, v))
1397 .map_err(|e| anyhow!(e))
1398 })
1399 .collect::<Result<Vec<_>>>()?;
1400
1401 self._batch_insert_expire(key_vals).await?;
1402 }
1403 Ok(())
1404 }
1405
1406 #[inline]
1408 async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
1409 if !keys.is_empty() {
1410 let _: () = self
1411 .async_conn()
1412 .hdel(self.full_name.as_slice(), keys)
1413 .await?;
1414 }
1415 Ok(())
1416 }
1417
1418 #[inline]
1420 async fn iter<'a, V>(
1421 &'a mut self,
1422 ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
1423 where
1424 V: DeserializeOwned + Sync + Send + 'a + 'static,
1425 {
1426 let name = self.full_name.clone();
1427 let iter = AsyncIter {
1428 iter: self
1429 .async_conn_mut()
1430 .hscan::<_, (Key, Vec<u8>)>(name)
1431 .await?,
1432 _m: std::marker::PhantomData,
1433 };
1434 Ok(Box::new(iter))
1435 }
1436
1437 #[inline]
1439 async fn key_iter<'a>(
1440 &'a mut self,
1441 ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>> {
1442 let iter = AsyncKeyIter {
1443 iter: self
1444 .db
1445 .async_conn
1446 .hscan::<_, (Key, ())>(self.full_name.as_slice())
1447 .await?,
1448 };
1449 Ok(Box::new(iter))
1450 }
1451
1452 #[inline]
1454 async fn prefix_iter<'a, P, V>(
1455 &'a mut self,
1456 prefix: P,
1457 ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
1458 where
1459 P: AsRef<[u8]> + Send + Sync,
1460 V: DeserializeOwned + Sync + Send + 'a + 'static,
1461 {
1462 let name = self.full_name.clone();
1463 let mut prefix = prefix.as_ref().to_vec();
1464 prefix.push(b'*');
1465 let iter = AsyncIter {
1466 iter: self
1467 .async_conn_mut()
1468 .hscan_match::<_, _, (Key, Vec<u8>)>(name, prefix.as_slice())
1469 .await?,
1470 _m: std::marker::PhantomData,
1471 };
1472 Ok(Box::new(iter))
1473 }
1474
1475 #[cfg(feature = "ttl")]
1477 async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
1478 let res = self
1479 .async_conn()
1480 .pexpire_at::<_, bool>(self.full_name.as_slice(), at)
1481 .await?;
1482 Ok(res)
1483 }
1484
1485 #[cfg(feature = "ttl")]
1487 async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1488 let res = self
1489 .async_conn()
1490 .pexpire::<_, bool>(self.full_name.as_slice(), dur)
1491 .await?;
1492 Ok(res)
1493 }
1494
1495 #[cfg(feature = "ttl")]
1497 async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1498 let mut async_conn = self.async_conn();
1499 let res = async_conn
1500 .pttl::<_, isize>(self.full_name.as_slice())
1501 .await?;
1502 match res {
1503 -2 => Ok(None),
1504 -1 => Ok(Some(TimestampMillis::MAX)),
1505 _ => Ok(Some(res as TimestampMillis)),
1506 }
1507 }
1508}
1509
1510#[derive(Clone)]
1512pub struct RedisStorageList {
1513 name: Key,
1515 full_name: Key,
1517 #[allow(dead_code)]
1519 expire: Option<TimestampMillis>,
1520 empty: Arc<AtomicBool>,
1522 pub(crate) db: RedisStorageDB,
1524}
1525
1526impl RedisStorageList {
1527 #[inline]
1529 pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self {
1530 Self {
1531 name,
1532 full_name,
1533 expire: None,
1534 empty: Arc::new(AtomicBool::new(true)),
1535 db,
1536 }
1537 }
1538
1539 #[inline]
1541 pub(crate) async fn new_expire(
1542 name: Key,
1543 full_name: Key,
1544 expire: Option<TimestampMillis>,
1545 mut db: RedisStorageDB,
1546 ) -> Result<Self> {
1547 let empty = if expire.is_some() {
1548 let empty = Self::_is_empty(&mut db.async_conn, full_name.as_slice()).await?;
1549 Arc::new(AtomicBool::new(empty))
1550 } else {
1551 Arc::new(AtomicBool::new(true))
1552 };
1553 Ok(Self {
1554 name,
1555 full_name,
1556 expire,
1557 empty,
1558 db,
1559 })
1560 }
1561
1562 #[inline]
1564 pub(crate) fn async_conn(&self) -> RedisConnection {
1565 self.db.async_conn()
1566 }
1567
1568 #[inline]
1570 async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result<bool> {
1571 Ok(async_conn.llen::<_, usize>(full_name).await? == 0)
1572 }
1573
1574 #[inline]
1576 async fn _push_expire(&self, val: Vec<u8>) -> Result<()> {
1577 let mut async_conn = self.async_conn();
1578 let name = self.full_name.as_slice();
1579
1580 #[cfg(feature = "ttl")]
1581 if self.empty.load(Ordering::SeqCst) {
1582 if let Some(expire) = self.expire.as_ref() {
1583 let _: () = redis::pipe()
1586 .atomic()
1587 .rpush(name, val)
1588 .pexpire(name, *expire)
1589 .query_async(&mut async_conn)
1590 .await?;
1591 self.empty.store(false, Ordering::SeqCst);
1592 return Ok(());
1593 }
1594 }
1595
1596 let _: () = async_conn.rpush(name, val).await?;
1598 Ok(())
1599 }
1600
1601 #[inline]
1603 async fn _pushs_expire(&self, vals: Vec<Vec<u8>>) -> Result<()> {
1604 let mut async_conn = self.async_conn();
1605
1606 #[cfg(feature = "ttl")]
1607 if self.empty.load(Ordering::SeqCst) {
1608 if let Some(expire) = self.expire.as_ref() {
1609 let name = self.full_name.as_slice();
1610 let _: () = redis::pipe()
1613 .atomic()
1614 .rpush(name, vals)
1615 .pexpire(name, *expire)
1616 .query_async(&mut async_conn)
1617 .await?;
1618 self.empty.store(false, Ordering::SeqCst);
1619 return Ok(());
1620 }
1621 }
1622
1623 let _: () = async_conn.rpush(self.full_name.as_slice(), vals).await?;
1625 Ok(())
1626 }
1627
1628 #[inline]
1630 async fn _push_limit_expire(
1631 &self,
1632 val: Vec<u8>,
1633 limit: usize,
1634 pop_front_if_limited: bool,
1635 ) -> Result<Option<Vec<u8>>> {
1636 let mut conn = self.async_conn();
1637
1638 #[cfg(feature = "ttl")]
1639 if self.empty.load(Ordering::SeqCst) {
1640 if let Some(expire) = self.expire.as_ref() {
1641 let name = self.full_name.as_slice();
1642 let count = conn.llen::<_, usize>(name).await?;
1643 let res = if count < limit {
1644 let _: () = redis::pipe()
1645 .atomic()
1646 .rpush(name, val)
1647 .pexpire(name, *expire)
1648 .query_async(&mut conn)
1649 .await?;
1650 Ok(None)
1651 } else if pop_front_if_limited {
1652 let (poped, _): (Option<Vec<u8>>, Option<()>) = redis::pipe()
1653 .atomic()
1654 .lpop(name, None)
1655 .rpush(name, val)
1656 .pexpire(name, *expire)
1657 .query_async(&mut conn)
1658 .await?;
1659
1660 Ok(poped)
1661 } else {
1662 Err(anyhow::Error::msg("Is full"))
1663 };
1664 self.empty.store(false, Ordering::SeqCst);
1665 return res;
1666 }
1667 }
1668
1669 self._push_limit(val, limit, pop_front_if_limited, &mut conn)
1670 .await
1671 }
1672
1673 #[inline]
1675 async fn _push_limit(
1676 &self,
1677 val: Vec<u8>,
1678 limit: usize,
1679 pop_front_if_limited: bool,
1680 async_conn: &mut RedisConnection,
1681 ) -> Result<Option<Vec<u8>>> {
1682 let name = self.full_name.as_slice();
1683
1684 let count = async_conn.llen::<_, usize>(name).await?;
1685 if count < limit {
1686 let _: () = async_conn.rpush(name, val).await?;
1687 Ok(None)
1688 } else if pop_front_if_limited {
1689 let (poped, _): (Option<Vec<u8>>, Option<()>) = redis::pipe()
1690 .atomic()
1691 .lpop(name, None)
1692 .rpush(name, val)
1693 .query_async(async_conn)
1694 .await?;
1695 Ok(poped)
1696 } else {
1697 Err(anyhow::Error::msg("Is full"))
1698 }
1699 }
1700}
1701
1702#[async_trait]
1703impl List for RedisStorageList {
1704 #[inline]
1706 fn name(&self) -> &[u8] {
1707 self.name.as_slice()
1708 }
1709
1710 #[inline]
1712 async fn push<V>(&self, val: &V) -> Result<()>
1713 where
1714 V: Serialize + Sync + Send,
1715 {
1716 self._push_expire(bincode::serialize(val)?).await
1717 }
1718
1719 #[inline]
1721 async fn pushs<V>(&self, vals: Vec<V>) -> Result<()>
1722 where
1723 V: Serialize + Sync + Send,
1724 {
1725 let vals = vals
1727 .into_iter()
1728 .map(|v| bincode::serialize(&v).map_err(|e| anyhow!(e)))
1729 .collect::<Result<Vec<_>>>()?;
1730 self._pushs_expire(vals).await
1731 }
1732
1733 #[inline]
1735 async fn push_limit<V>(
1736 &self,
1737 val: &V,
1738 limit: usize,
1739 pop_front_if_limited: bool,
1740 ) -> Result<Option<V>>
1741 where
1742 V: Serialize + Sync + Send,
1743 V: DeserializeOwned,
1744 {
1745 let data = bincode::serialize(val)?;
1746
1747 if let Some(res) = self
1748 ._push_limit_expire(data, limit, pop_front_if_limited)
1749 .await?
1750 {
1751 Ok(Some(
1752 bincode::deserialize::<V>(res.as_ref()).map_err(|e| anyhow!(e))?,
1753 ))
1754 } else {
1755 Ok(None)
1756 }
1757 }
1758
1759 #[inline]
1761 async fn pop<V>(&self) -> Result<Option<V>>
1762 where
1763 V: DeserializeOwned + Sync + Send,
1764 {
1765 let removed = self
1767 .async_conn()
1768 .lpop::<_, Option<Vec<u8>>>(self.full_name.as_slice(), None)
1769 .await?;
1770
1771 let removed = if let Some(v) = removed {
1772 Some(bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e))?)
1773 } else {
1774 None
1775 };
1776
1777 Ok(removed)
1778 }
1779
1780 #[inline]
1782 async fn all<V>(&self) -> Result<Vec<V>>
1783 where
1784 V: DeserializeOwned + Sync + Send,
1785 {
1786 let all = self
1788 .async_conn()
1789 .lrange::<_, Vec<Vec<u8>>>(self.full_name.as_slice(), 0, -1)
1790 .await?;
1791 all.iter()
1792 .map(|v| bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e)))
1793 .collect::<Result<Vec<_>>>()
1794 }
1795
1796 #[inline]
1798 async fn get_index<V>(&self, idx: usize) -> Result<Option<V>>
1799 where
1800 V: DeserializeOwned + Sync + Send,
1801 {
1802 let val = self
1804 .async_conn()
1805 .lindex::<_, Option<Vec<u8>>>(self.full_name.as_slice(), idx as isize)
1806 .await?;
1807
1808 Ok(if let Some(v) = val {
1809 Some(bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e))?)
1810 } else {
1811 None
1812 })
1813 }
1814
1815 #[inline]
1817 async fn len(&self) -> Result<usize> {
1818 Ok(self.async_conn().llen(self.full_name.as_slice()).await?)
1820 }
1821
1822 #[inline]
1824 async fn is_empty(&self) -> Result<bool> {
1825 Ok(self.len().await? == 0)
1826 }
1827
1828 #[inline]
1830 async fn clear(&self) -> Result<()> {
1831 let _: () = self.async_conn().del(self.full_name.as_slice()).await?;
1832 self.empty.store(true, Ordering::SeqCst);
1833 Ok(())
1834 }
1835
1836 #[inline]
1838 async fn iter<'a, V>(
1839 &'a mut self,
1840 ) -> Result<Box<dyn AsyncIterator<Item = Result<V>> + Send + 'a>>
1841 where
1842 V: DeserializeOwned + Sync + Send + 'a + 'static,
1843 {
1844 Ok(Box::new(AsyncListValIter::new(
1845 self.full_name.as_slice(),
1846 self.db.async_conn(),
1847 )))
1848 }
1849
1850 #[cfg(feature = "ttl")]
1852 async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
1853 let res = self
1854 .async_conn()
1855 .pexpire_at::<_, bool>(self.full_name.as_slice(), at)
1856 .await?;
1857 Ok(res)
1858 }
1859
1860 #[cfg(feature = "ttl")]
1862 async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1863 let res = self
1864 .async_conn()
1865 .pexpire::<_, bool>(self.full_name.as_slice(), dur)
1866 .await?;
1867 Ok(res)
1868 }
1869
1870 #[cfg(feature = "ttl")]
1872 async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1873 let mut async_conn = self.async_conn();
1874 let res = async_conn
1875 .pttl::<_, isize>(self.full_name.as_slice())
1876 .await?;
1877 match res {
1878 -2 => Ok(None),
1879 -1 => Ok(Some(TimestampMillis::MAX)),
1880 _ => Ok(Some(res as TimestampMillis)),
1881 }
1882 }
1883}
1884
1885pub struct AsyncListValIter<'a, V> {
1887 name: &'a [u8],
1888 conn: RedisConnection,
1889 start: isize,
1890 limit: isize,
1891 catch_vals: Vec<Vec<u8>>,
1892 _m: std::marker::PhantomData<V>,
1893}
1894
1895impl<'a, V> AsyncListValIter<'a, V> {
1896 fn new(name: &'a [u8], conn: RedisConnection) -> Self {
1897 let start = 0;
1898 let limit = 20;
1899 Self {
1900 name,
1901 conn,
1902 start,
1903 limit,
1904 catch_vals: Vec::with_capacity((limit + 1) as usize),
1905 _m: std::marker::PhantomData,
1906 }
1907 }
1908}
1909
1910#[async_trait]
1911impl<V> AsyncIterator for AsyncListValIter<'_, V>
1912where
1913 V: DeserializeOwned + Sync + Send,
1914{
1915 type Item = Result<V>;
1916
1917 async fn next(&mut self) -> Option<Self::Item> {
1918 if let Some(val) = self.catch_vals.pop() {
1919 return Some(bincode::deserialize::<V>(val.as_ref()).map_err(|e| anyhow!(e)));
1920 }
1921
1922 let vals = self
1923 .conn
1924 .lrange::<_, Vec<Vec<u8>>>(self.name, self.start, self.start + self.limit)
1925 .await;
1926
1927 match vals {
1928 Err(e) => return Some(Err(anyhow!(e))),
1929 Ok(vals) => {
1930 if vals.is_empty() {
1931 return None;
1932 }
1933 self.start += vals.len() as isize;
1934 self.catch_vals = vals;
1935 self.catch_vals.reverse();
1936 }
1937 }
1938
1939 self.catch_vals
1940 .pop()
1941 .map(|val| bincode::deserialize::<V>(val.as_ref()).map_err(|e| anyhow!(e)))
1942 }
1943}
1944
1945pub struct AsyncIter<'a, V> {
1947 iter: redis::AsyncIter<'a, (Key, Vec<u8>)>,
1948 _m: std::marker::PhantomData<V>,
1949}
1950
1951#[async_trait]
1952impl<V> AsyncIterator for AsyncIter<'_, V>
1953where
1954 V: DeserializeOwned + Sync + Send,
1955{
1956 type Item = IterItem<V>;
1957
1958 async fn next(&mut self) -> Option<Self::Item> {
1959 let item = match self.iter.next_item().await {
1960 None => None,
1961 Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
1962 Some(Ok(item)) => Some(item),
1963 };
1964 item.map(|(key, v)| match bincode::deserialize::<V>(v.as_ref()) {
1965 Ok(v) => Ok((key, v)),
1966 Err(e) => Err(anyhow::Error::new(e)),
1967 })
1968 }
1969}
1970
1971pub struct AsyncDbKeyIter<'a> {
1973 prefix_len: usize,
1974 iters: Vec<redis::AsyncIter<'a, Key>>,
1975}
1976
1977#[async_trait]
1978impl AsyncIterator for AsyncDbKeyIter<'_> {
1979 type Item = Result<Key>;
1980 async fn next(&mut self) -> Option<Self::Item> {
1981 while let Some(iter) = self.iters.last_mut() {
1982 let item = match iter.next_item().await {
1983 None => None,
1984 Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
1985 Some(Ok(key)) => Some(Ok(key[self.prefix_len..].to_vec())),
1986 };
1987
1988 if item.is_some() {
1989 return item;
1990 }
1991 self.iters.pop();
1992 if self.iters.is_empty() {
1993 return None;
1994 }
1995 }
1996 None
1997 }
1998}
1999
2000pub struct AsyncKeyIter<'a> {
2002 iter: redis::AsyncIter<'a, (Key, ())>,
2003}
2004
2005#[async_trait]
2006impl AsyncIterator for AsyncKeyIter<'_> {
2007 type Item = Result<Key>;
2008
2009 async fn next(&mut self) -> Option<Self::Item> {
2010 match self.iter.next_item().await {
2011 None => None,
2012 Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
2013 Some(Ok((key, _))) => Some(Ok(key)),
2014 }
2015 }
2016}
2017
2018pub struct AsyncMapIter<'a> {
2020 db: RedisStorageDB,
2021 iters: Vec<redis::AsyncIter<'a, Key>>,
2022}
2023
2024#[async_trait]
2025impl AsyncIterator for AsyncMapIter<'_> {
2026 type Item = Result<StorageMap>;
2027
2028 async fn next(&mut self) -> Option<Self::Item> {
2029 while let Some(iter) = self.iters.last_mut() {
2030 let full_name = match iter.next_item().await {
2031 None => None,
2032 Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
2033 Some(Ok(key)) => Some(key),
2034 };
2035
2036 if let Some(full_name) = full_name {
2037 let name = self.db.map_full_name_to_key(full_name.as_slice()).to_vec();
2038 let m = RedisStorageMap::new(name, full_name, self.db.clone());
2039 return Some(Ok(StorageMap::RedisCluster(m)));
2040 }
2041 self.iters.pop();
2042 if self.iters.is_empty() {
2043 return None;
2044 }
2045 }
2046 None
2047 }
2048}
2049
2050pub struct AsyncListIter<'a> {
2052 db: RedisStorageDB,
2053 iters: Vec<redis::AsyncIter<'a, Key>>,
2054}
2055
2056#[async_trait]
2057impl AsyncIterator for AsyncListIter<'_> {
2058 type Item = Result<StorageList>;
2059
2060 async fn next(&mut self) -> Option<Self::Item> {
2061 while let Some(iter) = self.iters.last_mut() {
2062 let full_name = match iter.next_item().await {
2063 None => None,
2064 Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
2065 Some(Ok(key)) => Some(key),
2066 };
2067
2068 if let Some(full_name) = full_name {
2069 let name = self.db.list_full_name_to_key(full_name.as_slice()).to_vec();
2070 let l = RedisStorageList::new(name, full_name, self.db.clone());
2071 return Some(Ok(StorageList::RedisCluster(l)));
2072 }
2073 self.iters.pop();
2074 if self.iters.is_empty() {
2075 return None;
2076 }
2077 }
2078 None
2079 }
2080}
2081
/// Groups `input` items by their `u16` tag.
///
/// Returns one `Vec` per distinct tag, ordered by ascending tag; items within a
/// group keep their input order.
#[inline]
fn transform_by_slot<T>(input: Vec<(u16, T)>) -> Vec<Vec<T>> {
    input
        .into_iter()
        .fold(
            BTreeMap::<u16, Vec<T>>::new(),
            |mut buckets, (slot, value)| {
                buckets.entry(slot).or_default().push(value);
                buckets
            },
        )
        .into_values()
        .collect()
}