rmqtt_storage/
storage_redis.rs

1//! Redis storage implementation for key-value, map, and list data structures
2//!
3//! This module provides a Redis-backed storage system with support for:
4//! - Key-value storage with expiration
5//! - Map (hash) data structures
6//! - List data structures
7//! - Counters with atomic operations
8//! - Iteration and scanning capabilities
9//! - Standalone Redis connection support
10
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use redis::aio::{ConnectionManager, ConnectionManagerConfig};
18use redis::{pipe, AsyncCommands};
19use serde::de::DeserializeOwned;
20use serde::Deserialize;
21use serde::Serialize;
22use serde_json::Value;
23
24use crate::storage::{AsyncIterator, IterItem, Key, List, Map, StorageDB};
25use crate::{Result, StorageList, StorageMap};
26
27#[allow(unused_imports)]
28use crate::{timestamp_millis, TimestampMillis};
29
30use crate::storage::{KEY_PREFIX, KEY_PREFIX_LEN, LIST_NAME_PREFIX, MAP_NAME_PREFIX, SEPARATOR};
31
32/// Type alias for Redis connection manager
33type RedisConnection = ConnectionManager;
34
35/// Configuration for Redis storage
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct RedisConfig {
38    /// Redis server URL
39    pub url: String,
40    /// Key prefix for all storage operations
41    pub prefix: String,
42}
43
44impl Default for RedisConfig {
45    fn default() -> Self {
46        RedisConfig {
47            url: String::default(),
48            prefix: "__def".into(),
49        }
50    }
51}
52
53/// Redis storage database implementation
54#[derive(Clone)]
55pub struct RedisStorageDB {
56    /// Prefix for all keys
57    prefix: Key,
58    /// Asynchronous connection manager
59    async_conn: RedisConnection,
60}
61
62impl RedisStorageDB {
63    /// Creates a new Redis storage instance
64    #[inline]
65    pub(crate) async fn new(cfg: RedisConfig) -> Result<Self> {
66        let prefix = [cfg.prefix.as_bytes(), SEPARATOR].concat();
67        let client = match redis::Client::open(cfg.url.as_str()) {
68            Ok(c) => c,
69            Err(e) => {
70                log::error!("open redis error, config is {:?}, {:?}", cfg, e);
71                return Err(anyhow!(e));
72            }
73        };
74
75        // Configure connection manager with retry settings
76        let mgr_cfg = ConnectionManagerConfig::default()
77            .set_exponent_base(100)
78            .set_factor(2)
79            .set_number_of_retries(2)
80            .set_connection_timeout(Duration::from_secs(15))
81            .set_response_timeout(Duration::from_secs(10));
82
83        // Create connection manager
84        let async_conn = match client.get_connection_manager_with_config(mgr_cfg).await {
85            Ok(conn) => conn,
86            Err(e) => {
87                log::error!("get redis connection error, config is {:?}, {:?}", cfg, e);
88                return Err(anyhow!(e));
89            }
90        };
91
92        // Create database instance and start cleanup task
93        let db = Self { prefix, async_conn }.cleanup();
94        Ok(db)
95    }
96
97    /// Starts background cleanup task
98    fn cleanup(self) -> Self {
99        let db = self.clone();
100        tokio::spawn(async move {
101            loop {
102                tokio::time::sleep(std::time::Duration::from_secs(30)).await;
103                let mut async_conn = db.async_conn();
104                let db_zkey = db.make_len_sortedset_key();
105                if let Err(e) = async_conn
106                    .zrembyscore::<'_, _, _, _, ()>(db_zkey.as_slice(), 0, timestamp_millis())
107                    .await
108                {
109                    log::error!("{:?}", e);
110                }
111            }
112        });
113        self
114    }
115
116    /// Gets a clone of the async connection
117    #[inline]
118    fn async_conn(&self) -> RedisConnection {
119        self.async_conn.clone()
120    }
121
122    /// Gets a mutable reference to the async connection
123    #[inline]
124    fn async_conn_mut(&mut self) -> &mut RedisConnection {
125        &mut self.async_conn
126    }
127
128    /// Creates key for length tracking sorted set
129    #[inline]
130    #[allow(dead_code)]
131    fn make_len_sortedset_key(&self) -> Key {
132        [KEY_PREFIX_LEN, self.prefix.as_slice()].concat()
133    }
134
135    /// Creates full key with prefix
136    #[inline]
137    fn make_full_key<K>(&self, key: K) -> Key
138    where
139        K: AsRef<[u8]>,
140    {
141        [KEY_PREFIX, self.prefix.as_slice(), key.as_ref()].concat()
142    }
143
144    /// Creates scan pattern with prefix
145    #[inline]
146    fn make_scan_pattern_match<P: AsRef<[u8]>>(&self, pattern: P) -> Key {
147        [KEY_PREFIX, self.prefix.as_slice(), pattern.as_ref()].concat()
148    }
149
150    /// Creates full map name with prefix
151    #[inline]
152    fn make_map_full_name<K>(&self, name: K) -> Key
153    where
154        K: AsRef<[u8]>,
155    {
156        [MAP_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat()
157    }
158
159    /// Creates full list name with prefix
160    #[inline]
161    fn make_list_full_name<K>(&self, name: K) -> Key
162    where
163        K: AsRef<[u8]>,
164    {
165        [LIST_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat()
166    }
167
168    /// Creates map prefix pattern for scanning
169    #[inline]
170    fn make_map_prefix_match(&self) -> Key {
171        [MAP_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat()
172    }
173
174    /// Creates list prefix pattern for scanning
175    #[inline]
176    fn make_list_prefix_match(&self) -> Key {
177        [LIST_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat()
178    }
179
180    /// Extracts map key from full name
181    #[inline]
182    fn map_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] {
183        full_name[MAP_NAME_PREFIX.len() + self.prefix.len()..].as_ref()
184    }
185
186    /// Extracts list key from full name
187    #[inline]
188    fn list_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] {
189        full_name[LIST_NAME_PREFIX.len() + self.prefix.len()..].as_ref()
190    }
191
192    /// Gets full key name for a given key
193    #[inline]
194    async fn _get_full_name(&self, key: &[u8]) -> Result<Key> {
195        let map_full_name = self.make_map_full_name(key);
196        let mut async_conn = self.async_conn();
197        let full_name = if async_conn.exists(map_full_name.as_slice()).await? {
198            map_full_name
199        } else {
200            let list_full_name = self.make_list_full_name(key);
201            if async_conn.exists(list_full_name.as_slice()).await? {
202                list_full_name
203            } else {
204                self.make_full_key(key)
205            }
206        };
207        Ok(full_name)
208    }
209
210    /// Internal method to insert a key-value pair
211    #[inline]
212    async fn _insert<K, V>(
213        &self,
214        key: K,
215        val: &V,
216        expire_interval: Option<TimestampMillis>,
217    ) -> Result<()>
218    where
219        K: AsRef<[u8]> + Sync + Send,
220        V: serde::ser::Serialize + Sync + Send,
221    {
222        let full_key = self.make_full_key(key.as_ref());
223
224        #[cfg(not(feature = "len"))]
225        {
226            if let Some(expire_interval) = expire_interval {
227                let mut async_conn = self.async_conn();
228                pipe()
229                    .atomic()
230                    .set(full_key.as_slice(), bincode::serialize(val)?)
231                    .pexpire(full_key.as_slice(), expire_interval)
232                    .query_async::<()>(&mut async_conn)
233                    .await?;
234            } else {
235                let _: () = self
236                    .async_conn()
237                    .set(full_key, bincode::serialize(val)?)
238                    .await?;
239            }
240        }
241        #[cfg(feature = "len")]
242        {
243            let db_zkey = self.make_len_sortedset_key();
244            let mut async_conn = self.async_conn();
245            if let Some(expire_interval) = expire_interval {
246                pipe()
247                    .atomic()
248                    .set(full_key.as_slice(), bincode::serialize(val)?)
249                    .pexpire(full_key.as_slice(), expire_interval)
250                    .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
251                    .query_async::<()>(&mut async_conn)
252                    .await?;
253            } else {
254                pipe()
255                    .atomic()
256                    .set(full_key.as_slice(), bincode::serialize(val)?)
257                    .zadd(db_zkey, key.as_ref(), i64::MAX)
258                    .query_async::<()>(&mut async_conn)
259                    .await?;
260            }
261        }
262
263        Ok(())
264    }
265
266    /// Internal method for batch insertion
267    #[inline]
268    async fn _batch_insert(
269        &self,
270        key_val_expires: Vec<(Key, Vec<u8>, Option<TimestampMillis>)>,
271    ) -> Result<()> {
272        #[cfg(not(feature = "len"))]
273        {
274            let keys_vals: Vec<(Key, &Vec<u8>)> = key_val_expires
275                .iter()
276                .map(|(key_ref, value, _)| (self.make_full_key(key_ref), value))
277                .collect();
278
279            let mut async_conn = self.async_conn();
280            let mut p = pipe();
281            let mut rpipe = p.atomic().mset(keys_vals.as_slice());
282            for (k, _, at) in key_val_expires {
283                if let Some(at) = at {
284                    rpipe = rpipe.expire(k, at);
285                }
286            }
287            rpipe.query_async::<()>(&mut async_conn).await?;
288        }
289
290        #[cfg(feature = "len")]
291        {
292            let (full_key_vals, expire_keys): (Vec<_>, Vec<_>) = key_val_expires
293                .iter()
294                .map(|(key_ref, value, timestamp)| {
295                    let full_key_vals = (self.make_full_key(key_ref), value);
296                    let expire_keys = (
297                        timestamp
298                            .map(|t| timestamp_millis() + t)
299                            .unwrap_or(i64::MAX),
300                        key_ref,
301                    );
302                    (full_key_vals, expire_keys)
303                })
304                .unzip();
305
306            let db_zkey = self.make_len_sortedset_key();
307            let mut async_conn = self.async_conn();
308            let mut p = pipe();
309            let mut rpipe = p
310                .atomic()
311                .mset(full_key_vals.as_slice())
312                .zadd_multiple(db_zkey, expire_keys.as_slice());
313            for (k, _, at) in key_val_expires {
314                if let Some(at) = at {
315                    rpipe = rpipe.expire(k, at);
316                }
317            }
318            rpipe.query_async::<((), ())>(&mut async_conn).await?;
319        }
320        Ok(())
321    }
322
323    /// Internal method for batch removal
324    #[inline]
325    async fn _batch_remove(&self, keys: Vec<Key>) -> Result<()> {
326        let full_keys = keys
327            .iter()
328            .map(|k| self.make_full_key(k))
329            .collect::<Vec<_>>();
330        #[cfg(not(feature = "len"))]
331        {
332            let _: () = self.async_conn().del(full_keys).await?;
333        }
334        #[cfg(feature = "len")]
335        {
336            let db_zkey = self.make_len_sortedset_key();
337            let mut async_conn = self.async_conn();
338            pipe()
339                .atomic()
340                .del(full_keys.as_slice())
341                .zrem(db_zkey, keys)
342                .query_async::<()>(&mut async_conn)
343                .await?;
344        }
345        Ok(())
346    }
347
348    /// Internal method to increment a counter
349    #[inline]
350    async fn _counter_incr<K>(
351        &self,
352        key: K,
353        increment: isize,
354        expire_interval: Option<TimestampMillis>,
355    ) -> Result<()>
356    where
357        K: AsRef<[u8]> + Sync + Send,
358    {
359        let full_key = self.make_full_key(key.as_ref());
360        #[cfg(not(feature = "len"))]
361        {
362            if let Some(expire_interval) = expire_interval {
363                let mut async_conn = self.async_conn();
364                pipe()
365                    .atomic()
366                    .incr(full_key.as_slice(), increment)
367                    .pexpire(full_key.as_slice(), expire_interval)
368                    .query_async::<()>(&mut async_conn)
369                    .await?;
370            } else {
371                let _: () = self.async_conn().incr(full_key, increment).await?;
372            }
373        }
374        #[cfg(feature = "len")]
375        {
376            let db_zkey = self.make_len_sortedset_key();
377            let mut async_conn = self.async_conn();
378            if let Some(expire_interval) = expire_interval {
379                pipe()
380                    .atomic()
381                    .incr(full_key.as_slice(), increment)
382                    .pexpire(full_key.as_slice(), expire_interval)
383                    .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
384                    .query_async::<()>(&mut async_conn)
385                    .await?;
386            } else {
387                pipe()
388                    .atomic()
389                    .incr(full_key.as_slice(), increment)
390                    .zadd(db_zkey, key.as_ref(), i64::MAX)
391                    .query_async::<()>(&mut async_conn)
392                    .await?;
393            }
394        }
395        Ok(())
396    }
397
398    /// Internal method to decrement a counter
399    #[inline]
400    async fn _counter_decr<K>(
401        &self,
402        key: K,
403        decrement: isize,
404        expire_interval: Option<TimestampMillis>,
405    ) -> Result<()>
406    where
407        K: AsRef<[u8]> + Sync + Send,
408    {
409        let full_key = self.make_full_key(key.as_ref());
410
411        #[cfg(not(feature = "len"))]
412        {
413            if let Some(expire_interval) = expire_interval {
414                let mut async_conn = self.async_conn();
415                pipe()
416                    .atomic()
417                    .decr(full_key.as_slice(), decrement)
418                    .pexpire(full_key.as_slice(), expire_interval)
419                    .query_async::<()>(&mut async_conn)
420                    .await?;
421            } else {
422                let _: () = self.async_conn().decr(full_key, decrement).await?;
423            }
424        }
425        #[cfg(feature = "len")]
426        {
427            let db_zkey = self.make_len_sortedset_key();
428            let mut async_conn = self.async_conn();
429            if let Some(expire_interval) = expire_interval {
430                pipe()
431                    .atomic()
432                    .decr(full_key.as_slice(), decrement)
433                    .pexpire(full_key.as_slice(), expire_interval)
434                    .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
435                    .query_async::<()>(&mut async_conn)
436                    .await?;
437            } else {
438                pipe()
439                    .atomic()
440                    .decr(full_key.as_slice(), decrement)
441                    .zadd(db_zkey, key.as_ref(), i64::MAX)
442                    .query_async::<()>(&mut async_conn)
443                    .await?;
444            }
445        }
446        Ok(())
447    }
448
449    /// Internal method to set a counter value
450    #[inline]
451    async fn _counter_set<K>(
452        &self,
453        key: K,
454        val: isize,
455        expire_interval: Option<TimestampMillis>,
456    ) -> Result<()>
457    where
458        K: AsRef<[u8]> + Sync + Send,
459    {
460        let full_key = self.make_full_key(key.as_ref());
461        #[cfg(not(feature = "len"))]
462        {
463            if let Some(expire_interval) = expire_interval {
464                let mut async_conn = self.async_conn();
465                pipe()
466                    .atomic()
467                    .set(full_key.as_slice(), val)
468                    .pexpire(full_key.as_slice(), expire_interval)
469                    .query_async::<()>(&mut async_conn)
470                    .await?;
471            } else {
472                let _: () = self.async_conn().set(full_key, val).await?;
473            }
474        }
475        #[cfg(feature = "len")]
476        {
477            let db_zkey = self.make_len_sortedset_key();
478            let mut async_conn = self.async_conn();
479            if let Some(expire_interval) = expire_interval {
480                pipe()
481                    .atomic()
482                    .set(full_key.as_slice(), val)
483                    .pexpire(full_key.as_slice(), expire_interval)
484                    .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
485                    .query_async::<()>(&mut async_conn)
486                    .await?;
487            } else {
488                pipe()
489                    .atomic()
490                    .set(full_key.as_slice(), val)
491                    .zadd(db_zkey, key.as_ref(), i64::MAX)
492                    .query_async::<()>(&mut async_conn)
493                    .await?;
494            }
495        }
496
497        Ok(())
498    }
499
500    /// Internal method to remove a key
501    #[inline]
502    async fn _remove<K>(&self, key: K) -> Result<()>
503    where
504        K: AsRef<[u8]> + Sync + Send,
505    {
506        let full_key = self.make_full_key(key.as_ref());
507
508        #[cfg(not(feature = "len"))]
509        {
510            let _: () = self.async_conn().del(full_key).await?;
511        }
512        #[cfg(feature = "len")]
513        {
514            let db_zkey = self.make_len_sortedset_key();
515            let mut async_conn = self.async_conn();
516            pipe()
517                .atomic()
518                .del(full_key.as_slice())
519                .zrem(db_zkey, key.as_ref())
520                .query_async::<()>(&mut async_conn)
521                .await?;
522        }
523        Ok(())
524    }
525}
526
527#[async_trait]
528impl StorageDB for RedisStorageDB {
529    type MapType = RedisStorageMap;
530    type ListType = RedisStorageList;
531
532    /// Creates a new map with optional expiration
533    #[inline]
534    async fn map<V: AsRef<[u8]> + Sync + Send>(
535        &self,
536        name: V,
537        expire: Option<TimestampMillis>,
538    ) -> Result<Self::MapType> {
539        let full_name = self.make_map_full_name(name.as_ref());
540        Ok(
541            RedisStorageMap::new_expire(name.as_ref().to_vec(), full_name, expire, self.clone())
542                .await?,
543        )
544    }
545
546    /// Removes a map
547    #[inline]
548    async fn map_remove<K>(&self, name: K) -> Result<()>
549    where
550        K: AsRef<[u8]> + Sync + Send,
551    {
552        let map_full_name = self.make_map_full_name(name.as_ref());
553        let _: () = self.async_conn().del(map_full_name).await?;
554        Ok(())
555    }
556
557    /// Checks if a map exists
558    #[inline]
559    async fn map_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
560        let map_full_name = self.make_map_full_name(key.as_ref());
561        Ok(self.async_conn().exists(map_full_name).await?)
562    }
563
564    /// Creates a new list with optional expiration
565    #[inline]
566    async fn list<V: AsRef<[u8]> + Sync + Send>(
567        &self,
568        name: V,
569        expire: Option<TimestampMillis>,
570    ) -> Result<Self::ListType> {
571        let full_name = self.make_list_full_name(name.as_ref());
572        Ok(
573            RedisStorageList::new_expire(name.as_ref().to_vec(), full_name, expire, self.clone())
574                .await?,
575        )
576    }
577
578    /// Removes a list
579    #[inline]
580    async fn list_remove<K>(&self, name: K) -> Result<()>
581    where
582        K: AsRef<[u8]> + Sync + Send,
583    {
584        let list_full_name = self.make_list_full_name(name.as_ref());
585        let _: () = self.async_conn().del(list_full_name).await?;
586        Ok(())
587    }
588
589    /// Checks if a list exists
590    #[inline]
591    async fn list_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
592        let list_full_name = self.make_list_full_name(key.as_ref());
593        Ok(self.async_conn().exists(list_full_name).await?)
594    }
595
596    /// Inserts a key-value pair
597    #[inline]
598    async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
599    where
600        K: AsRef<[u8]> + Sync + Send,
601        V: serde::ser::Serialize + Sync + Send,
602    {
603        self._insert(key, val, None).await
604    }
605
606    /// Gets a value by key
607    #[inline]
608    async fn get<K, V>(&self, key: K) -> Result<Option<V>>
609    where
610        K: AsRef<[u8]> + Sync + Send,
611        V: DeserializeOwned + Sync + Send,
612    {
613        let full_key = self.make_full_key(key);
614        if let Some(v) = self
615            .async_conn()
616            .get::<_, Option<Vec<u8>>>(full_key)
617            .await?
618        {
619            Ok(Some(bincode::deserialize::<V>(v.as_ref())?))
620        } else {
621            Ok(None)
622        }
623    }
624
625    /// Removes a key
626    #[inline]
627    async fn remove<K>(&self, key: K) -> Result<()>
628    where
629        K: AsRef<[u8]> + Sync + Send,
630    {
631        self._remove(key).await
632    }
633
634    /// Batch insertion of key-value pairs
635    #[inline]
636    async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
637    where
638        V: Serialize + Sync + Send,
639    {
640        if !key_vals.is_empty() {
641            let keys_vals_expires = key_vals
642                .into_iter()
643                .map(|(k, v)| {
644                    bincode::serialize(&v)
645                        .map(move |v| (k, v, None))
646                        .map_err(|e| anyhow!(e))
647                })
648                .collect::<Result<Vec<_>>>()?;
649            self._batch_insert(keys_vals_expires).await?;
650        }
651        Ok(())
652    }
653
654    /// Batch removal of keys
655    #[inline]
656    async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
657        if !keys.is_empty() {
658            self._batch_remove(keys).await?;
659        }
660        Ok(())
661    }
662
663    /// Increments a counter
664    #[inline]
665    async fn counter_incr<K>(&self, key: K, increment: isize) -> Result<()>
666    where
667        K: AsRef<[u8]> + Sync + Send,
668    {
669        self._counter_incr(key, increment, None).await
670    }
671
672    /// Decrements a counter
673    #[inline]
674    async fn counter_decr<K>(&self, key: K, decrement: isize) -> Result<()>
675    where
676        K: AsRef<[u8]> + Sync + Send,
677    {
678        self._counter_decr(key, decrement, None).await
679    }
680
681    /// Gets a counter value
682    #[inline]
683    async fn counter_get<K>(&self, key: K) -> Result<Option<isize>>
684    where
685        K: AsRef<[u8]> + Sync + Send,
686    {
687        let full_key = self.make_full_key(key);
688        Ok(self.async_conn().get::<_, Option<isize>>(full_key).await?)
689    }
690
691    /// Sets a counter value
692    #[inline]
693    async fn counter_set<K>(&self, key: K, val: isize) -> Result<()>
694    where
695        K: AsRef<[u8]> + Sync + Send,
696    {
697        self._counter_set(key, val, None).await
698    }
699
700    /// Checks if a key exists
701    #[inline]
702    async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
703        let full_key = self.make_full_key(key.as_ref());
704        Ok(self.async_conn().exists(full_key).await?)
705    }
706
707    /// Gets the number of keys in the database
708    #[inline]
709    #[cfg(feature = "len")]
710    async fn len(&self) -> Result<usize> {
711        let db_zkey = self.make_len_sortedset_key();
712        let mut async_conn = self.async_conn();
713        let (_, count) = pipe()
714            .zrembyscore(db_zkey.as_slice(), 0, timestamp_millis())
715            .zcard(db_zkey.as_slice())
716            .query_async::<(i64, usize)>(&mut async_conn)
717            .await?;
718        Ok(count)
719    }
720
721    /// Gets the total database size
722    #[inline]
723    async fn db_size(&self) -> Result<usize> {
724        let mut async_conn = self.async_conn();
725        //DBSIZE
726        let dbsize = redis::pipe()
727            .cmd("DBSIZE")
728            .query_async::<redis::Value>(&mut async_conn)
729            .await?;
730        let dbsize = dbsize.as_sequence().and_then(|vs| {
731            vs.iter().next().and_then(|v| {
732                if let redis::Value::Int(v) = v {
733                    Some(*v)
734                } else {
735                    None
736                }
737            })
738        });
739        Ok(dbsize.unwrap_or(0) as usize)
740    }
741
742    /// Sets expiration time for a key
743    #[inline]
744    #[cfg(feature = "ttl")]
745    async fn expire_at<K>(&self, key: K, at: TimestampMillis) -> Result<bool>
746    where
747        K: AsRef<[u8]> + Sync + Send,
748    {
749        let full_name = self.make_full_key(key.as_ref());
750        #[cfg(not(feature = "len"))]
751        {
752            let res = self
753                .async_conn()
754                .pexpire_at::<_, bool>(full_name, at)
755                .await?;
756            Ok(res)
757        }
758        #[cfg(feature = "len")]
759        {
760            let db_zkey = self.make_len_sortedset_key();
761            let mut async_conn = self.async_conn();
762            let (_, res) = pipe()
763                .atomic()
764                .zadd(db_zkey, key.as_ref(), at)
765                .pexpire_at(full_name.as_slice(), at)
766                .query_async::<(i64, bool)>(&mut async_conn)
767                .await?;
768            Ok(res)
769        }
770    }
771
772    /// Sets expiration duration for a key
773    #[inline]
774    #[cfg(feature = "ttl")]
775    async fn expire<K>(&self, key: K, dur: TimestampMillis) -> Result<bool>
776    where
777        K: AsRef<[u8]> + Sync + Send,
778    {
779        let full_name = self.make_full_key(key.as_ref());
780
781        #[cfg(not(feature = "len"))]
782        {
783            let res = self.async_conn().pexpire::<_, bool>(full_name, dur).await?;
784            Ok(res)
785        }
786        #[cfg(feature = "len")]
787        {
788            let db_zkey = self.make_len_sortedset_key();
789            let mut async_conn = self.async_conn();
790            let (_, res) = pipe()
791                .atomic()
792                .zadd(db_zkey, key.as_ref(), timestamp_millis() + dur)
793                .pexpire(full_name.as_slice(), dur)
794                .query_async::<(i64, bool)>(&mut async_conn)
795                .await?;
796            Ok(res)
797        }
798    }
799
800    /// Gets time-to-live for a key
801    #[inline]
802    #[cfg(feature = "ttl")]
803    async fn ttl<K>(&self, key: K) -> Result<Option<TimestampMillis>>
804    where
805        K: AsRef<[u8]> + Sync + Send,
806    {
807        let mut async_conn = self.async_conn();
808        let full_key = self.make_full_key(key.as_ref());
809        let res = async_conn.pttl::<_, isize>(full_key).await?;
810        match res {
811            -2 => Ok(None),
812            -1 => Ok(Some(TimestampMillis::MAX)),
813            _ => Ok(Some(res as TimestampMillis)),
814        }
815    }
816
817    /// Creates an iterator for all maps
818    #[inline]
819    async fn map_iter<'a>(
820        &'a mut self,
821    ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageMap>> + Send + 'a>> {
822        let pattern = self.make_map_prefix_match();
823        let iter = AsyncMapIter {
824            db: self.clone(),
825            iter: self.async_conn_mut().scan_match::<_, Key>(pattern).await?,
826        };
827        Ok(Box::new(iter))
828    }
829
830    /// Creates an iterator for all lists
831    #[inline]
832    async fn list_iter<'a>(
833        &'a mut self,
834    ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>> {
835        let pattern = self.make_list_prefix_match();
836        let iter = AsyncListIter {
837            db: self.clone(),
838            iter: self.async_conn_mut().scan_match::<_, Key>(pattern).await?,
839        };
840        Ok(Box::new(iter))
841    }
842
843    /// Creates an iterator for keys matching a pattern
844    async fn scan<'a, P>(
845        &'a mut self,
846        pattern: P,
847    ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>
848    where
849        P: AsRef<[u8]> + Send + Sync,
850    {
851        let pattern = self.make_scan_pattern_match(pattern);
852        let prefix_len = KEY_PREFIX.len() + self.prefix.len();
853        let iter = AsyncDbKeyIter {
854            prefix_len,
855            iter: self
856                .async_conn_mut()
857                .scan_match::<_, Key>(pattern.as_slice())
858                .await?,
859        };
860        Ok(Box::new(iter))
861    }
862
863    /// Gets database information
864    #[inline]
865    async fn info(&self) -> Result<Value> {
866        let mut conn = self.async_conn();
867        let dbsize = redis::pipe()
868            .cmd("dbsize")
869            .query_async::<redis::Value>(&mut conn)
870            .await?;
871        let dbsize = dbsize.as_sequence().and_then(|vs| {
872            vs.iter().next().and_then(|v| {
873                if let redis::Value::Int(v) = v {
874                    Some(*v)
875                } else {
876                    None
877                }
878            })
879        });
880        Ok(serde_json::json!({
881            "storage_engine": "Redis",
882            "dbsize": dbsize,
883        }))
884    }
885}
886
887/// Redis-backed map storage implementation
888#[derive(Clone)]
889pub struct RedisStorageMap {
890    /// Name of the map
891    name: Key,
892    /// Full key name with prefix
893    full_name: Key,
894    /// Optional expiration time in milliseconds
895    #[allow(dead_code)]
896    expire: Option<TimestampMillis>,
897    /// Flag indicating if the map is empty
898    empty: Arc<AtomicBool>,
899    /// Reference to the parent database
900    pub(crate) db: RedisStorageDB,
901}
902
903impl RedisStorageMap {
904    /// Creates a new map without expiration
905    #[inline]
906    pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self {
907        Self {
908            name,
909            full_name,
910            expire: None,
911            empty: Arc::new(AtomicBool::new(true)),
912            db,
913        }
914    }
915
916    /// Creates a new map with expiration
917    #[inline]
918    pub(crate) async fn new_expire(
919        name: Key,
920        full_name: Key,
921        expire: Option<TimestampMillis>,
922        mut db: RedisStorageDB,
923    ) -> Result<Self> {
924        let empty = if expire.is_some() {
925            let empty = Self::_is_empty(&mut db.async_conn, full_name.as_slice()).await?;
926            Arc::new(AtomicBool::new(empty))
927        } else {
928            Arc::new(AtomicBool::new(true))
929        };
930        Ok(Self {
931            name,
932            full_name,
933            expire,
934            empty,
935            db,
936        })
937    }
938
939    /// Gets a clone of the async connection
940    #[inline]
941    fn async_conn(&self) -> RedisConnection {
942        self.db.async_conn()
943    }
944
945    /// Gets a mutable reference to the async connection
946    #[inline]
947    fn async_conn_mut(&mut self) -> &mut RedisConnection {
948        self.db.async_conn_mut()
949    }
950
951    /// Checks if the map is empty
952    #[inline]
953    async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result<bool> {
954        let res = async_conn
955            .hscan::<_, Vec<u8>>(full_name)
956            .await?
957            .next_item()
958            .await
959            .is_none();
960        Ok(res)
961    }
962
963    /// Internal method to insert with expiration handling
964    #[inline]
965    async fn _insert_expire(&self, key: &[u8], val: Vec<u8>) -> Result<()> {
966        let mut async_conn = self.async_conn();
967        let name = self.full_name.as_slice();
968
969        #[cfg(feature = "ttl")]
970        if self.empty.load(Ordering::SeqCst) {
971            if let Some(expire) = self.expire.as_ref() {
972                let _: () = redis::pipe()
973                    .atomic()
974                    .hset(name, key, val)
975                    .pexpire(name, *expire)
976                    .query_async(&mut async_conn)
977                    .await?;
978                self.empty.store(false, Ordering::SeqCst);
979                return Ok(());
980            }
981        }
982
983        let _: () = async_conn.hset(name, key.as_ref(), val).await?;
984        Ok(())
985    }
986
987    /// Internal method for batch insertion with expiration
988    #[inline]
989    async fn _batch_insert_expire(&self, key_vals: Vec<(Key, Vec<u8>)>) -> Result<()> {
990        let mut async_conn = self.async_conn();
991        let name = self.full_name.as_slice();
992
993        #[cfg(feature = "ttl")]
994        if self.empty.load(Ordering::SeqCst) {
995            if let Some(expire) = self.expire.as_ref() {
996                let _: () = redis::pipe()
997                    .atomic()
998                    .hset_multiple(name, key_vals.as_slice())
999                    .pexpire(name, *expire)
1000                    .query_async(&mut async_conn)
1001                    .await?;
1002
1003                self.empty.store(false, Ordering::SeqCst);
1004                return Ok(());
1005            }
1006        }
1007
1008        let _: () = async_conn.hset_multiple(name, key_vals.as_slice()).await?;
1009        Ok(())
1010    }
1011}
1012
1013#[async_trait]
1014impl Map for RedisStorageMap {
1015    /// Gets the map name
1016    #[inline]
1017    fn name(&self) -> &[u8] {
1018        self.name.as_slice()
1019    }
1020
1021    /// Inserts a key-value pair into the map
1022    #[inline]
1023    async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
1024    where
1025        K: AsRef<[u8]> + Sync + Send,
1026        V: Serialize + Sync + Send + ?Sized,
1027    {
1028        self._insert_expire(key.as_ref(), bincode::serialize(val)?)
1029            .await
1030    }
1031
1032    /// Gets a value from the map
1033    #[inline]
1034    async fn get<K, V>(&self, key: K) -> Result<Option<V>>
1035    where
1036        K: AsRef<[u8]> + Sync + Send,
1037        V: DeserializeOwned + Sync + Send,
1038    {
1039        let res: Option<Vec<u8>> = self
1040            .async_conn()
1041            .hget(self.full_name.as_slice(), key.as_ref())
1042            .await?;
1043        if let Some(res) = res {
1044            Ok(Some(bincode::deserialize::<V>(res.as_ref())?))
1045        } else {
1046            Ok(None)
1047        }
1048    }
1049
1050    /// Removes a key from the map
1051    #[inline]
1052    async fn remove<K>(&self, key: K) -> Result<()>
1053    where
1054        K: AsRef<[u8]> + Sync + Send,
1055    {
1056        let _: () = self
1057            .async_conn()
1058            .hdel(self.full_name.as_slice(), key.as_ref())
1059            .await?;
1060        Ok(())
1061    }
1062
1063    /// Checks if a key exists in the map
1064    #[inline]
1065    async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
1066        let res = self
1067            .async_conn()
1068            .hexists(self.full_name.as_slice(), key.as_ref())
1069            .await?;
1070        Ok(res)
1071    }
1072
1073    /// Gets the number of elements in the map
1074    #[cfg(feature = "map_len")]
1075    #[inline]
1076    async fn len(&self) -> Result<usize> {
1077        Ok(self.async_conn().hlen(self.full_name.as_slice()).await?)
1078    }
1079
1080    /// Checks if the map is empty
1081    #[inline]
1082    async fn is_empty(&self) -> Result<bool> {
1083        let res = self
1084            .async_conn()
1085            .hscan::<_, Vec<u8>>(self.full_name.as_slice())
1086            .await?
1087            .next_item()
1088            .await
1089            .is_none();
1090        Ok(res)
1091    }
1092
1093    /// Clears all elements from the map
1094    #[inline]
1095    async fn clear(&self) -> Result<()> {
1096        let _: () = self.async_conn().del(self.full_name.as_slice()).await?;
1097        self.empty.store(true, Ordering::SeqCst);
1098        Ok(())
1099    }
1100
1101    /// Removes and returns a value from the map
1102    #[inline]
1103    async fn remove_and_fetch<K, V>(&self, key: K) -> Result<Option<V>>
1104    where
1105        K: AsRef<[u8]> + Sync + Send,
1106        V: DeserializeOwned + Sync + Send,
1107    {
1108        let name = self.full_name.as_slice();
1109        let mut conn = self.async_conn();
1110        let (res, _): (Option<Vec<u8>>, isize) = redis::pipe()
1111            .atomic()
1112            .hget(name, key.as_ref())
1113            .hdel(name, key.as_ref())
1114            .query_async(&mut conn)
1115            .await?;
1116
1117        if let Some(res) = res {
1118            Ok(Some(bincode::deserialize::<V>(res.as_ref())?))
1119        } else {
1120            Ok(None)
1121        }
1122    }
1123
1124    /// Removes all keys with a given prefix
1125    #[inline]
1126    async fn remove_with_prefix<K>(&self, prefix: K) -> Result<()>
1127    where
1128        K: AsRef<[u8]> + Sync + Send,
1129    {
1130        let name = self.full_name.as_slice();
1131        let mut conn = self.async_conn();
1132        let mut conn2 = conn.clone();
1133        let mut prefix = prefix.as_ref().to_vec();
1134        prefix.push(b'*');
1135        let mut removeds = Vec::new();
1136        while let Some(key) = conn
1137            .hscan_match::<_, _, Vec<u8>>(name, prefix.as_slice())
1138            .await?
1139            .next_item()
1140            .await
1141        {
1142            removeds.push(key?);
1143            if removeds.len() > 20 {
1144                let _: () = conn2.hdel(name, removeds.as_slice()).await?;
1145                removeds.clear();
1146            }
1147        }
1148        if !removeds.is_empty() {
1149            let _: () = conn.hdel(name, removeds).await?;
1150        }
1151        Ok(())
1152    }
1153
1154    /// Batch insertion of key-value pairs
1155    #[inline]
1156    async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
1157    where
1158        V: Serialize + Sync + Send,
1159    {
1160        if !key_vals.is_empty() {
1161            let key_vals = key_vals
1162                .into_iter()
1163                .map(|(k, v)| {
1164                    bincode::serialize(&v)
1165                        .map(move |v| (k, v))
1166                        .map_err(|e| anyhow!(e))
1167                })
1168                .collect::<Result<Vec<_>>>()?;
1169
1170            self._batch_insert_expire(key_vals).await?;
1171        }
1172        Ok(())
1173    }
1174
1175    /// Batch removal of keys
1176    #[inline]
1177    async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
1178        if !keys.is_empty() {
1179            let _: () = self
1180                .async_conn()
1181                .hdel(self.full_name.as_slice(), keys)
1182                .await?;
1183        }
1184        Ok(())
1185    }
1186
1187    /// Creates an iterator over key-value pairs
1188    #[inline]
1189    async fn iter<'a, V>(
1190        &'a mut self,
1191    ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
1192    where
1193        V: DeserializeOwned + Sync + Send + 'a + 'static,
1194    {
1195        let name = self.full_name.clone();
1196        let iter = AsyncIter {
1197            iter: self
1198                .async_conn_mut()
1199                .hscan::<_, (Key, Vec<u8>)>(name)
1200                .await?,
1201            _m: std::marker::PhantomData,
1202        };
1203        Ok(Box::new(iter))
1204    }
1205
1206    /// Creates an iterator over keys
1207    #[inline]
1208    async fn key_iter<'a>(
1209        &'a mut self,
1210    ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>> {
1211        let iter = AsyncKeyIter {
1212            iter: self
1213                .db
1214                .async_conn
1215                .hscan::<_, (Key, ())>(self.full_name.as_slice())
1216                .await?,
1217        };
1218        Ok(Box::new(iter))
1219    }
1220
1221    /// Creates an iterator over key-value pairs with a prefix
1222    #[inline]
1223    async fn prefix_iter<'a, P, V>(
1224        &'a mut self,
1225        prefix: P,
1226    ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
1227    where
1228        P: AsRef<[u8]> + Send + Sync,
1229        V: DeserializeOwned + Sync + Send + 'a + 'static,
1230    {
1231        let name = self.full_name.clone();
1232        let mut prefix = prefix.as_ref().to_vec();
1233        prefix.push(b'*');
1234        let iter = AsyncIter {
1235            iter: self
1236                .async_conn_mut()
1237                .hscan_match::<_, _, (Key, Vec<u8>)>(name, prefix.as_slice())
1238                .await?,
1239            _m: std::marker::PhantomData,
1240        };
1241        Ok(Box::new(iter))
1242    }
1243
1244    /// Sets expiration time for the map
1245    #[cfg(feature = "ttl")]
1246    async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
1247        let res = self
1248            .async_conn()
1249            .pexpire_at::<_, bool>(self.full_name.as_slice(), at)
1250            .await?;
1251        Ok(res)
1252    }
1253
1254    /// Sets expiration duration for the map
1255    #[cfg(feature = "ttl")]
1256    async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1257        let res = self
1258            .async_conn()
1259            .pexpire::<_, bool>(self.full_name.as_slice(), dur)
1260            .await?;
1261        Ok(res)
1262    }
1263
1264    /// Gets time-to-live for the map
1265    #[cfg(feature = "ttl")]
1266    async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1267        let mut async_conn = self.async_conn();
1268        let res = async_conn
1269            .pttl::<_, isize>(self.full_name.as_slice())
1270            .await?;
1271        match res {
1272            -2 => Ok(None),
1273            -1 => Ok(Some(TimestampMillis::MAX)),
1274            _ => Ok(Some(res as TimestampMillis)),
1275        }
1276    }
1277}
1278
1279/// Redis-backed list storage implementation
1280#[derive(Clone)]
1281pub struct RedisStorageList {
1282    /// Name of the list
1283    name: Key,
1284    /// Full key name with prefix
1285    full_name: Key,
1286    /// Optional expiration time in milliseconds
1287    #[allow(dead_code)]
1288    expire: Option<TimestampMillis>,
1289    /// Flag indicating if the list is empty
1290    empty: Arc<AtomicBool>,
1291    /// Reference to the parent database
1292    pub(crate) db: RedisStorageDB,
1293}
1294
1295impl RedisStorageList {
1296    /// Creates a new list without expiration
1297    #[inline]
1298    pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self {
1299        Self {
1300            name,
1301            full_name,
1302            expire: None,
1303            empty: Arc::new(AtomicBool::new(true)),
1304            db,
1305        }
1306    }
1307
1308    /// Creates a new list with expiration
1309    #[inline]
1310    pub(crate) async fn new_expire(
1311        name: Key,
1312        full_name: Key,
1313        expire: Option<TimestampMillis>,
1314        mut db: RedisStorageDB,
1315    ) -> Result<Self> {
1316        let empty = if expire.is_some() {
1317            let empty = Self::_is_empty(&mut db.async_conn, full_name.as_slice()).await?;
1318            Arc::new(AtomicBool::new(empty))
1319        } else {
1320            Arc::new(AtomicBool::new(true))
1321        };
1322        Ok(Self {
1323            name,
1324            full_name,
1325            expire,
1326            empty,
1327            db,
1328        })
1329    }
1330
1331    /// Gets a clone of the async connection
1332    #[inline]
1333    pub(crate) fn async_conn(&self) -> RedisConnection {
1334        self.db.async_conn()
1335    }
1336
1337    /// Checks if the list is empty
1338    #[inline]
1339    async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result<bool> {
1340        Ok(async_conn.llen::<_, usize>(full_name).await? == 0)
1341    }
1342
1343    /// Internal method to push with expiration handling
1344    #[inline]
1345    async fn _push_expire(&self, val: Vec<u8>) -> Result<()> {
1346        let mut async_conn = self.async_conn();
1347        let name = self.full_name.as_slice();
1348
1349        #[cfg(feature = "ttl")]
1350        if self.empty.load(Ordering::SeqCst) {
1351            if let Some(expire) = self.expire.as_ref() {
1352                let _: () = redis::pipe()
1353                    .atomic()
1354                    .rpush(name, val)
1355                    .pexpire(name, *expire)
1356                    .query_async(&mut async_conn)
1357                    .await?;
1358                self.empty.store(false, Ordering::SeqCst);
1359                return Ok(());
1360            }
1361        }
1362
1363        let _: () = async_conn.rpush(name, val).await?;
1364        Ok(())
1365    }
1366
1367    /// Internal method for batch push with expiration
1368    #[inline]
1369    async fn _pushs_expire(&self, vals: Vec<Vec<u8>>) -> Result<()> {
1370        let mut async_conn = self.async_conn();
1371
1372        #[cfg(feature = "ttl")]
1373        if self.empty.load(Ordering::SeqCst) {
1374            if let Some(expire) = self.expire.as_ref() {
1375                let name = self.full_name.as_slice();
1376                let _: () = redis::pipe()
1377                    .atomic()
1378                    .rpush(name, vals)
1379                    .pexpire(name, *expire)
1380                    .query_async(&mut async_conn)
1381                    .await?;
1382                self.empty.store(false, Ordering::SeqCst);
1383                return Ok(());
1384            }
1385        }
1386
1387        let _: () = async_conn.rpush(self.full_name.as_slice(), vals).await?;
1388        Ok(())
1389    }
1390
1391    /// Internal method for push with limit and expiration
1392    #[inline]
1393    async fn _push_limit_expire(
1394        &self,
1395        val: Vec<u8>,
1396        limit: usize,
1397        pop_front_if_limited: bool,
1398    ) -> Result<Option<Vec<u8>>> {
1399        let mut conn = self.async_conn();
1400
1401        #[cfg(feature = "ttl")]
1402        if self.empty.load(Ordering::SeqCst) {
1403            if let Some(expire) = self.expire.as_ref() {
1404                let name = self.full_name.as_slice();
1405                let count = conn.llen::<_, usize>(name).await?;
1406                let res = if count < limit {
1407                    let _: () = redis::pipe()
1408                        .atomic()
1409                        .rpush(name, val)
1410                        .pexpire(name, *expire)
1411                        .query_async(&mut conn)
1412                        .await?;
1413                    Ok(None)
1414                } else if pop_front_if_limited {
1415                    let (poped, _): (Option<Vec<u8>>, Option<()>) = redis::pipe()
1416                        .atomic()
1417                        .lpop(name, None)
1418                        .rpush(name, val)
1419                        .pexpire(name, *expire)
1420                        .query_async(&mut conn)
1421                        .await?;
1422
1423                    Ok(poped)
1424                } else {
1425                    Err(anyhow::Error::msg("Is full"))
1426                };
1427                self.empty.store(false, Ordering::SeqCst);
1428                return res;
1429            }
1430        }
1431
1432        self._push_limit(val, limit, pop_front_if_limited, &mut conn)
1433            .await
1434    }
1435
1436    /// Internal method for push with limit
1437    #[inline]
1438    async fn _push_limit(
1439        &self,
1440        val: Vec<u8>,
1441        limit: usize,
1442        pop_front_if_limited: bool,
1443        async_conn: &mut RedisConnection,
1444    ) -> Result<Option<Vec<u8>>> {
1445        let name = self.full_name.as_slice();
1446
1447        let count = async_conn.llen::<_, usize>(name).await?;
1448        if count < limit {
1449            let _: () = async_conn.rpush(name, val).await?;
1450            Ok(None)
1451        } else if pop_front_if_limited {
1452            let (poped, _): (Option<Vec<u8>>, Option<()>) = redis::pipe()
1453                .atomic()
1454                .lpop(name, None)
1455                .rpush(name, val)
1456                .query_async(async_conn)
1457                .await?;
1458            Ok(poped)
1459        } else {
1460            Err(anyhow::Error::msg("Is full"))
1461        }
1462    }
1463}
1464
1465#[async_trait]
1466impl List for RedisStorageList {
1467    /// Gets the list name
1468    #[inline]
1469    fn name(&self) -> &[u8] {
1470        self.name.as_slice()
1471    }
1472
1473    /// Pushes a value to the end of the list
1474    #[inline]
1475    async fn push<V>(&self, val: &V) -> Result<()>
1476    where
1477        V: Serialize + Sync + Send,
1478    {
1479        self._push_expire(bincode::serialize(val)?).await
1480    }
1481
1482    /// Pushes multiple values to the end of the list
1483    #[inline]
1484    async fn pushs<V>(&self, vals: Vec<V>) -> Result<()>
1485    where
1486        V: Serialize + Sync + Send,
1487    {
1488        let vals = vals
1489            .into_iter()
1490            .map(|v| bincode::serialize(&v).map_err(|e| anyhow!(e)))
1491            .collect::<Result<Vec<_>>>()?;
1492        self._pushs_expire(vals).await
1493    }
1494
1495    /// Pushes a value with size limit handling
1496    #[inline]
1497    async fn push_limit<V>(
1498        &self,
1499        val: &V,
1500        limit: usize,
1501        pop_front_if_limited: bool,
1502    ) -> Result<Option<V>>
1503    where
1504        V: Serialize + Sync + Send,
1505        V: DeserializeOwned,
1506    {
1507        let data = bincode::serialize(val)?;
1508
1509        if let Some(res) = self
1510            ._push_limit_expire(data, limit, pop_front_if_limited)
1511            .await?
1512        {
1513            Ok(Some(
1514                bincode::deserialize::<V>(res.as_ref()).map_err(|e| anyhow!(e))?,
1515            ))
1516        } else {
1517            Ok(None)
1518        }
1519    }
1520
1521    /// Pops a value from the front of the list
1522    #[inline]
1523    async fn pop<V>(&self) -> Result<Option<V>>
1524    where
1525        V: DeserializeOwned + Sync + Send,
1526    {
1527        let removed = self
1528            .async_conn()
1529            .lpop::<_, Option<Vec<u8>>>(self.full_name.as_slice(), None)
1530            .await?;
1531
1532        let removed = if let Some(v) = removed {
1533            Some(bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e))?)
1534        } else {
1535            None
1536        };
1537
1538        Ok(removed)
1539    }
1540
1541    /// Gets all values in the list
1542    #[inline]
1543    async fn all<V>(&self) -> Result<Vec<V>>
1544    where
1545        V: DeserializeOwned + Sync + Send,
1546    {
1547        let all = self
1548            .async_conn()
1549            .lrange::<_, Vec<Vec<u8>>>(self.full_name.as_slice(), 0, -1)
1550            .await?;
1551        all.iter()
1552            .map(|v| bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e)))
1553            .collect::<Result<Vec<_>>>()
1554    }
1555
1556    /// Gets a value by index
1557    #[inline]
1558    async fn get_index<V>(&self, idx: usize) -> Result<Option<V>>
1559    where
1560        V: DeserializeOwned + Sync + Send,
1561    {
1562        let val = self
1563            .async_conn()
1564            .lindex::<_, Option<Vec<u8>>>(self.full_name.as_slice(), idx as isize)
1565            .await?;
1566
1567        Ok(if let Some(v) = val {
1568            Some(bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e))?)
1569        } else {
1570            None
1571        })
1572    }
1573
1574    /// Gets the length of the list
1575    #[inline]
1576    async fn len(&self) -> Result<usize> {
1577        Ok(self.async_conn().llen(self.full_name.as_slice()).await?)
1578    }
1579
1580    /// Checks if the list is empty
1581    #[inline]
1582    async fn is_empty(&self) -> Result<bool> {
1583        Ok(self.len().await? == 0)
1584    }
1585
1586    /// Clears the list
1587    #[inline]
1588    async fn clear(&self) -> Result<()> {
1589        let _: () = self.async_conn().del(self.full_name.as_slice()).await?;
1590        self.empty.store(true, Ordering::SeqCst);
1591        Ok(())
1592    }
1593
1594    /// Creates an iterator over list values
1595    #[inline]
1596    async fn iter<'a, V>(
1597        &'a mut self,
1598    ) -> Result<Box<dyn AsyncIterator<Item = Result<V>> + Send + 'a>>
1599    where
1600        V: DeserializeOwned + Sync + Send + 'a + 'static,
1601    {
1602        Ok(Box::new(AsyncListValIter::new(
1603            self.full_name.as_slice(),
1604            self.db.async_conn(),
1605        )))
1606    }
1607
1608    /// Sets expiration time for the list
1609    #[cfg(feature = "ttl")]
1610    async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
1611        let res = self
1612            .async_conn()
1613            .pexpire_at::<_, bool>(self.full_name.as_slice(), at)
1614            .await?;
1615        Ok(res)
1616    }
1617
1618    /// Sets expiration duration for the list
1619    #[cfg(feature = "ttl")]
1620    async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1621        let res = self
1622            .async_conn()
1623            .pexpire::<_, bool>(self.full_name.as_slice(), dur)
1624            .await?;
1625        Ok(res)
1626    }
1627
1628    /// Gets time-to-live for the list
1629    #[cfg(feature = "ttl")]
1630    async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1631        let mut async_conn = self.async_conn();
1632        let res = async_conn
1633            .pttl::<_, isize>(self.full_name.as_slice())
1634            .await?;
1635        match res {
1636            -2 => Ok(None),
1637            -1 => Ok(Some(TimestampMillis::MAX)),
1638            _ => Ok(Some(res as TimestampMillis)),
1639        }
1640    }
1641}
1642
1643/// Iterator for list values
1644pub struct AsyncListValIter<'a, V> {
1645    name: &'a [u8],
1646    conn: RedisConnection,
1647    start: isize,
1648    limit: isize,
1649    catch_vals: Vec<Vec<u8>>,
1650    _m: std::marker::PhantomData<V>,
1651}
1652
1653impl<'a, V> AsyncListValIter<'a, V> {
1654    /// Creates a new list value iterator
1655    fn new(name: &'a [u8], conn: RedisConnection) -> Self {
1656        let start = 0;
1657        let limit = 20;
1658        Self {
1659            name,
1660            conn,
1661            start,
1662            limit,
1663            catch_vals: Vec::with_capacity((limit + 1) as usize),
1664            _m: std::marker::PhantomData,
1665        }
1666    }
1667}
1668
1669#[async_trait]
1670impl<V> AsyncIterator for AsyncListValIter<'_, V>
1671where
1672    V: DeserializeOwned + Sync + Send + 'static,
1673{
1674    type Item = Result<V>;
1675
1676    async fn next(&mut self) -> Option<Self::Item> {
1677        if let Some(val) = self.catch_vals.pop() {
1678            return Some(bincode::deserialize::<V>(val.as_ref()).map_err(|e| anyhow!(e)));
1679        }
1680
1681        let vals = self
1682            .conn
1683            .lrange::<_, Vec<Vec<u8>>>(self.name, self.start, self.start + self.limit)
1684            .await;
1685
1686        match vals {
1687            Err(e) => return Some(Err(anyhow!(e))),
1688            Ok(vals) => {
1689                if vals.is_empty() {
1690                    return None;
1691                }
1692                self.start += vals.len() as isize;
1693                self.catch_vals = vals;
1694                self.catch_vals.reverse();
1695            }
1696        }
1697
1698        self.catch_vals
1699            .pop()
1700            .map(|val| bincode::deserialize::<V>(val.as_ref()).map_err(|e| anyhow!(e)))
1701    }
1702}
1703
1704/// Iterator for map entries
1705pub struct AsyncIter<'a, V> {
1706    iter: redis::AsyncIter<'a, (Key, Vec<u8>)>,
1707    _m: std::marker::PhantomData<V>,
1708}
1709
1710#[async_trait]
1711impl<'a, V> AsyncIterator for AsyncIter<'a, V>
1712where
1713    V: DeserializeOwned + Sync + Send + 'a,
1714{
1715    type Item = IterItem<V>;
1716
1717    async fn next(&mut self) -> Option<Self::Item> {
1718        match self.iter.next_item().await {
1719            None => None,
1720            Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
1721            Some(Ok((key, v))) => match bincode::deserialize::<V>(v.as_ref()) {
1722                Ok(v) => Some(Ok((key, v))),
1723                Err(e) => Some(Err(anyhow::Error::new(e))),
1724            },
1725        }
1726    }
1727}
1728
1729/// Iterator for database keys
1730pub struct AsyncDbKeyIter<'a> {
1731    prefix_len: usize,
1732    iter: redis::AsyncIter<'a, Key>,
1733}
1734
1735#[async_trait]
1736impl AsyncIterator for AsyncDbKeyIter<'_> {
1737    type Item = Result<Key>;
1738
1739    async fn next(&mut self) -> Option<Self::Item> {
1740        match self.iter.next_item().await {
1741            None => None,
1742            Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
1743            Some(Ok(key)) => Some(Ok(key[self.prefix_len..].to_vec())),
1744        }
1745    }
1746}
1747
1748/// Iterator for map keys
1749pub struct AsyncKeyIter<'a> {
1750    iter: redis::AsyncIter<'a, (Key, ())>,
1751}
1752
1753#[async_trait]
1754impl AsyncIterator for AsyncKeyIter<'_> {
1755    type Item = Result<Key>;
1756
1757    async fn next(&mut self) -> Option<Self::Item> {
1758        match self.iter.next_item().await {
1759            None => None,
1760            Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
1761            Some(Ok((key, _))) => Some(Ok(key)),
1762        }
1763    }
1764}
1765
1766/// Iterator for maps
1767pub struct AsyncMapIter<'a> {
1768    db: RedisStorageDB,
1769    iter: redis::AsyncIter<'a, Key>,
1770}
1771
1772#[async_trait]
1773impl AsyncIterator for AsyncMapIter<'_> {
1774    type Item = Result<StorageMap>;
1775
1776    async fn next(&mut self) -> Option<Self::Item> {
1777        let full_name = match self.iter.next_item().await {
1778            None => return None,
1779            Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
1780            Some(Ok(key)) => key,
1781        };
1782
1783        let name = self.db.map_full_name_to_key(full_name.as_slice()).to_vec();
1784        let m = RedisStorageMap::new(name, full_name, self.db.clone());
1785        Some(Ok(StorageMap::Redis(m)))
1786    }
1787}
1788
1789/// Iterator for lists
1790pub struct AsyncListIter<'a> {
1791    db: RedisStorageDB,
1792    iter: redis::AsyncIter<'a, Key>,
1793}
1794
1795#[async_trait]
1796impl AsyncIterator for AsyncListIter<'_> {
1797    type Item = Result<StorageList>;
1798
1799    async fn next(&mut self) -> Option<Self::Item> {
1800        let full_name = match self.iter.next_item().await {
1801            None => return None,
1802            Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
1803            Some(Ok(key)) => key,
1804        };
1805
1806        let name = self.db.list_full_name_to_key(full_name.as_slice()).to_vec();
1807        let l = RedisStorageList::new(name, full_name, self.db.clone());
1808        Some(Ok(StorageList::Redis(l)))
1809    }
1810}