rmqtt_storage/
storage_redis_cluster.rs

1//! Redis-based storage implementation for key-value, map, and list data structures.
2//!
3//! This module provides a Redis-backed storage system with support for:
4//! - Key-value storage with expiration
5//! - Map (hash) data structures
6//! - List data structures
7//! - Counters with atomic operations
8//! - Iteration and scanning capabilities
9//! - Cluster-aware operations
10
11use std::collections::BTreeMap;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use async_trait::async_trait;
18use redis::{
19    aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig},
20    cluster::ClusterClient,
21    cluster_async::ClusterConnection,
22    cluster_routing::get_slot,
23    pipe, AsyncCommands, Cmd,
24};
25use serde::de::DeserializeOwned;
26use serde::Deserialize;
27use serde::Serialize;
28use serde_json::Value;
29
30use crate::storage::{AsyncIterator, IterItem, Key, List, Map, StorageDB};
31use crate::{Result, StorageList, StorageMap};
32
33#[allow(unused_imports)]
34use crate::{timestamp_millis, TimestampMillis};
35
36use crate::storage::{KEY_PREFIX, KEY_PREFIX_LEN, LIST_NAME_PREFIX, MAP_NAME_PREFIX, SEPARATOR};
37
/// Connection type used by this backend: the asynchronous cluster connection
/// (`redis::cluster_async::ClusterConnection`).
type RedisConnection = ClusterConnection;
40
/// Configuration for the Redis cluster storage backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RedisConfig {
    /// Node URLs handed to `ClusterClient::builder` when the storage is created.
    pub urls: Vec<String>,

    /// Namespace prefix; its bytes, followed by `SEPARATOR`, are embedded in
    /// every key, map name and list name the storage generates.
    pub prefix: String,
}
50
51impl Default for RedisConfig {
52    fn default() -> Self {
53        RedisConfig {
54            urls: Vec::default(),
55            prefix: "__def".into(),
56        }
57    }
58}
59
/// Handle to the Redis cluster storage backend.
///
/// `Clone` is derived, so each clone carries its own copy of every field.
#[derive(Clone)]
pub struct RedisStorageDB {
    /// Namespace bytes (configured prefix followed by `SEPARATOR`) that are
    /// spliced into every generated key.
    prefix: Key,
    /// Cluster connection; commands are issued through clones of it.
    async_conn: RedisConnection,
    /// Connection managers opened for the node addresses read from the third
    /// element of each `CLUSTER SLOTS` entry.
    nodes: Vec<ConnectionManager>,
    /// Timestamp that `nodes_mut` compares against a 5000 ms threshold to
    /// decide whether `nodes` should be rebuilt.
    nodes_update_time: TimestampMillis,
}
72
73impl RedisStorageDB {
74    /// Creates a new Redis storage instance
75    #[inline]
76    pub(crate) async fn new(cfg: RedisConfig) -> Result<Self> {
77        let prefix = [cfg.prefix.as_bytes(), SEPARATOR].concat();
78
79        let client = ClusterClient::builder(cfg.urls)
80            .retry_wait_formula(2, 100)
81            .retries(2)
82            .connection_timeout(Duration::from_secs(15))
83            .response_timeout(Duration::from_secs(10))
84            .build()?;
85
86        let async_conn = client.get_async_connection().await?;
87
88        let mut db = Self {
89            prefix,
90            async_conn,
91            nodes: Vec::new(),
92            nodes_update_time: timestamp_millis(),
93        }
94        .cleanup();
95        db.refresh_cluster_nodes().await?;
96        Ok(db)
97    }
98
99    /// Starts background cleanup task
100    fn cleanup(self) -> Self {
101        let db = self.clone();
102        tokio::spawn(async move {
103            loop {
104                tokio::time::sleep(std::time::Duration::from_secs(30)).await;
105                let mut async_conn = db.async_conn();
106                let db_zkey = db.make_len_sortedset_key();
107                if let Err(e) = async_conn
108                    .zrembyscore::<'_, _, _, _, ()>(db_zkey.as_slice(), 0, timestamp_millis())
109                    .await
110                {
111                    log::error!("{:?}", e);
112                }
113            }
114        });
115        self
116    }
117
118    /// Gets a clone of the async connection
119    #[inline]
120    fn async_conn(&self) -> RedisConnection {
121        self.async_conn.clone()
122    }
123
124    /// Gets a mutable reference to the async connection
125    #[inline]
126    fn async_conn_mut(&mut self) -> &mut RedisConnection {
127        &mut self.async_conn
128    }
129
130    /// Refreshes cluster node information
131    #[inline]
132    async fn refresh_cluster_nodes(&mut self) -> Result<()> {
133        let slots = self
134            .async_conn()
135            .req_packed_command(Cmd::new().arg("CLUSTER").arg("SLOTS"))
136            .await?;
137
138        let mut addrs = Vec::new();
139        for slot in slots
140            .as_sequence()
141            .map(|arrs| {
142                arrs.iter()
143                    .filter_map(|obj| obj.as_sequence())
144                    .collect::<Vec<_>>()
145            })
146            .iter()
147            .flatten()
148            .collect::<Vec<_>>()
149        {
150            if let Some(addr_info) = slot.get(2) {
151                if let Some(addr_items) = addr_info.as_sequence() {
152                    if addr_items.len() > 1 {
153                        if let (redis::Value::BulkString(addr), redis::Value::Int(port)) =
154                            (&addr_items[0], &addr_items[1])
155                        {
156                            let addr =
157                                format!("redis://{}:{}", String::from_utf8_lossy(addr), *port);
158                            addrs.push(addr);
159                        }
160                    }
161                }
162            }
163        }
164
165        // let mut nodes = BTreeMap::new();
166        let mut nodes = Vec::new();
167        for addr in addrs {
168            let client = match redis::Client::open(addr.as_str()) {
169                Ok(c) => c,
170                Err(e) => {
171                    log::error!("open redis node error, addr is {}, {:?}", addr, e);
172                    return Err(anyhow!(e));
173                }
174            };
175            let mgr_cfg = ConnectionManagerConfig::default()
176                .set_exponent_base(100)
177                .set_factor(2)
178                .set_number_of_retries(2)
179                .set_connection_timeout(Duration::from_secs(15))
180                .set_response_timeout(Duration::from_secs(10));
181            let conn = match client.get_connection_manager_with_config(mgr_cfg).await {
182                Ok(conn) => conn,
183                Err(e) => {
184                    log::error!("get redis connection error, addr {:?}, {:?}", addr, e);
185                    return Err(anyhow!(e));
186                }
187            };
188            nodes.push(conn);
189        }
190        self.nodes = nodes;
191        log::info!("nodes.len(): {:?}", self.nodes.len());
192        Ok(())
193    }
194
195    /// Gets mutable reference to cluster nodes with refresh
196    #[inline]
197    async fn nodes_mut(&mut self) -> Result<&mut Vec<ConnectionManager>> {
198        //Refresh after a certain interval.
199        if (timestamp_millis() - self.nodes_update_time) > 5000 {
200            self.refresh_cluster_nodes().await?;
201        }
202        Ok(&mut self.nodes)
203    }
204
205    /// Creates key for length tracking sorted set
206    #[inline]
207    #[allow(dead_code)]
208    fn make_len_sortedset_key(&self) -> Key {
209        [KEY_PREFIX_LEN, self.prefix.as_slice()].concat()
210    }
211
212    /// Creates full key with prefix
213    #[inline]
214    fn make_full_key<K>(&self, key: K) -> Key
215    where
216        K: AsRef<[u8]>,
217    {
218        [KEY_PREFIX, self.prefix.as_slice(), key.as_ref()].concat()
219    }
220
221    /// Creates scan pattern with prefix
222    #[inline]
223    fn make_scan_pattern_match<P: AsRef<[u8]>>(&self, pattern: P) -> Key {
224        [KEY_PREFIX, self.prefix.as_slice(), pattern.as_ref()].concat()
225    }
226
227    /// Creates full map name with prefix
228    #[inline]
229    fn make_map_full_name<K>(&self, name: K) -> Key
230    where
231        K: AsRef<[u8]>,
232    {
233        [MAP_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat()
234    }
235
236    /// Creates full list name with prefix
237    #[inline]
238    fn make_list_full_name<K>(&self, name: K) -> Key
239    where
240        K: AsRef<[u8]>,
241    {
242        [LIST_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat()
243    }
244
245    /// Creates map prefix pattern for scanning
246    #[inline]
247    fn make_map_prefix_match(&self) -> Key {
248        [MAP_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat()
249    }
250
251    /// Creates list prefix pattern for scanning
252    #[inline]
253    fn make_list_prefix_match(&self) -> Key {
254        [LIST_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat()
255    }
256
257    /// Extracts map key from full name
258    #[inline]
259    fn map_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] {
260        full_name[MAP_NAME_PREFIX.len() + self.prefix.len()..].as_ref()
261    }
262
263    /// Extracts list key from full name
264    #[inline]
265    fn list_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] {
266        full_name[LIST_NAME_PREFIX.len() + self.prefix.len()..].as_ref()
267    }
268
269    /// Gets full key name for a given key
270    #[inline]
271    async fn _get_full_name(&self, key: &[u8]) -> Result<Key> {
272        let map_full_name = self.make_map_full_name(key);
273        let mut async_conn = self.async_conn();
274        let full_name = if async_conn.exists(map_full_name.as_slice()).await? {
275            map_full_name
276        } else {
277            let list_full_name = self.make_list_full_name(key);
278            if async_conn.exists(list_full_name.as_slice()).await? {
279                list_full_name
280            } else {
281                self.make_full_key(key)
282            }
283        };
284        Ok(full_name)
285    }
286
287    /// Internal method to insert a key-value pair
288    #[inline]
289    async fn _insert<K, V>(
290        &self,
291        _key: K,
292        _val: &V,
293        _expire_interval: Option<TimestampMillis>,
294    ) -> Result<()>
295    where
296        K: AsRef<[u8]> + Sync + Send,
297        V: serde::ser::Serialize + Sync + Send,
298    {
299        #[cfg(not(feature = "len"))]
300        {
301            let full_key = self.make_full_key(_key.as_ref());
302            if let Some(expire_interval) = _expire_interval {
303                let mut async_conn = self.async_conn();
304                pipe()
305                    .atomic()
306                    .set(full_key.as_slice(), bincode::serialize(_val)?)
307                    .pexpire(full_key.as_slice(), expire_interval)
308                    .query_async::<()>(&mut async_conn)
309                    .await?;
310            } else {
311                let _: () = self
312                    .async_conn()
313                    .set(full_key, bincode::serialize(_val)?)
314                    .await?;
315            }
316        }
317        #[cfg(feature = "len")]
318        {
319            return Err(anyhow!("unsupported!"));
320            //@TODO ...
321            #[allow(unreachable_code)]
322            {
323                let full_key = self.make_full_key(_key.as_ref());
324                let db_zkey = self.make_len_sortedset_key();
325                let mut async_conn = self.async_conn();
326                if get_slot(db_zkey.as_slice()) == get_slot(full_key.as_slice()) {
327                    if let Some(expire_interval) = _expire_interval {
328                        pipe()
329                            .atomic()
330                            .set(full_key.as_slice(), bincode::serialize(_val)?)
331                            .pexpire(full_key.as_slice(), expire_interval)
332                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
333                            .query_async::<()>(&mut async_conn)
334                            .await?;
335                    } else {
336                        pipe()
337                            .atomic()
338                            .set(full_key.as_slice(), bincode::serialize(_val)?)
339                            .zadd(db_zkey, _key.as_ref(), i64::MAX)
340                            .query_async::<()>(&mut async_conn)
341                            .await?;
342                    }
343                } else {
344                    //
345                    if let Some(expire_interval) = _expire_interval {
346                        let _: () = pipe()
347                            .atomic()
348                            .set(full_key.as_slice(), bincode::serialize(_val)?)
349                            .pexpire(full_key.as_slice(), expire_interval)
350                            .query_async(&mut async_conn)
351                            .await?;
352                        let _: () = async_conn
353                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
354                            .await?;
355                    } else {
356                        let _: () = async_conn
357                            .set(full_key.as_slice(), bincode::serialize(_val)?)
358                            .await?;
359                        let _: () = async_conn.zadd(db_zkey, _key.as_ref(), i64::MAX).await?;
360                    }
361                }
362            }
363        }
364
365        Ok(())
366    }
367
368    /// Internal method for batch insertion
369    #[inline]
370    async fn _batch_insert<V>(
371        &self,
372        _key_val_expires: Vec<(Key, Key, V, Option<TimestampMillis>)>,
373    ) -> Result<()>
374    where
375        V: serde::ser::Serialize + Sync + Send,
376    {
377        #[cfg(not(feature = "len"))]
378        {
379            let keys_vals: Vec<(&Key, Vec<u8>)> = _key_val_expires
380                .iter()
381                .map(|(_, full_key, v, _)| {
382                    bincode::serialize(&v)
383                        .map(move |v| (full_key, v))
384                        .map_err(|e| anyhow!(e))
385                })
386                .collect::<Result<Vec<_>>>()?;
387
388            let mut async_conn = self.async_conn();
389            let mut p = pipe();
390            let mut rpipe = p.atomic().mset(keys_vals.as_slice());
391            for (_, full_key, _, at) in _key_val_expires {
392                if let Some(at) = at {
393                    rpipe = rpipe.expire(full_key, at);
394                }
395            }
396            rpipe.query_async::<()>(&mut async_conn).await?;
397            Ok(())
398        }
399
400        #[cfg(feature = "len")]
401        {
402            return Err(anyhow!("unsupported!"));
403            //@TODO ...
404            #[allow(unreachable_code)]
405            {
406                for (k, _, v, expire) in _key_val_expires {
407                    self._insert(k.as_slice(), &v, expire).await?;
408                }
409                Ok(())
410            }
411        }
412    }
413
414    /// Internal method for batch removal
415    #[inline]
416    async fn _batch_remove(&self, _keys: Vec<Key>) -> Result<()> {
417        #[cfg(not(feature = "len"))]
418        {
419            let full_keys = _keys
420                .iter()
421                .map(|k| self.make_full_key(k))
422                .collect::<Vec<_>>();
423            let _: () = self.async_conn().del(full_keys).await?;
424        }
425        #[cfg(feature = "len")]
426        {
427            return Err(anyhow!("unsupported!"));
428            #[allow(unreachable_code)]
429            //@TODO ...
430            {
431                for key in _keys {
432                    self._remove(key).await?;
433                }
434            }
435        }
436        Ok(())
437    }
438
439    /// Internal method to increment a counter
440    #[inline]
441    async fn _counter_incr<K>(
442        &self,
443        _key: K,
444        _increment: isize,
445        _expire_interval: Option<TimestampMillis>,
446    ) -> Result<()>
447    where
448        K: AsRef<[u8]> + Sync + Send,
449    {
450        #[cfg(not(feature = "len"))]
451        {
452            let full_key = self.make_full_key(_key.as_ref());
453            if let Some(expire_interval) = _expire_interval {
454                let mut async_conn = self.async_conn();
455                pipe()
456                    .atomic()
457                    .incr(full_key.as_slice(), _increment)
458                    .pexpire(full_key.as_slice(), expire_interval)
459                    .query_async::<()>(&mut async_conn)
460                    .await?;
461            } else {
462                let _: () = self.async_conn().incr(full_key, _increment).await?;
463            }
464        }
465        #[cfg(feature = "len")]
466        {
467            return Err(anyhow!("unsupported!"));
468            //@TODO ...
469            #[allow(unreachable_code)]
470            {
471                let full_key = self.make_full_key(_key.as_ref());
472                let db_zkey = self.make_len_sortedset_key();
473                if get_slot(&db_zkey) == get_slot(full_key.as_slice()) {
474                    let mut async_conn = self.async_conn();
475                    if let Some(expire_interval) = _expire_interval {
476                        pipe()
477                            .atomic()
478                            .incr(full_key.as_slice(), _increment)
479                            .pexpire(full_key.as_slice(), expire_interval)
480                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
481                            .query_async::<()>(&mut async_conn)
482                            .await?;
483                    } else {
484                        pipe()
485                            .atomic()
486                            .incr(full_key.as_slice(), _increment)
487                            .zadd(db_zkey, _key.as_ref(), i64::MAX)
488                            .query_async::<()>(&mut async_conn)
489                            .await?;
490                    }
491                } else {
492                    let mut async_conn = self.async_conn();
493                    if let Some(expire_interval) = _expire_interval {
494                        let _: () = pipe()
495                            .atomic()
496                            .incr(full_key.as_slice(), _increment)
497                            .pexpire(full_key.as_slice(), expire_interval)
498                            .query_async::<()>(&mut async_conn)
499                            .await?;
500                        let _: () = async_conn
501                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
502                            .await?;
503                    } else {
504                        let _: () = async_conn.incr(full_key.as_slice(), _increment).await?;
505                        let _: () = async_conn.zadd(db_zkey, _key.as_ref(), i64::MAX).await?;
506                    }
507                }
508            }
509        }
510        Ok(())
511    }
512
513    /// Internal method to decrement a counter
514    #[inline]
515    async fn _counter_decr<K>(
516        &self,
517        _key: K,
518        _decrement: isize,
519        _expire_interval: Option<TimestampMillis>,
520    ) -> Result<()>
521    where
522        K: AsRef<[u8]> + Sync + Send,
523    {
524        #[cfg(not(feature = "len"))]
525        {
526            let full_key = self.make_full_key(_key.as_ref());
527            if let Some(expire_interval) = _expire_interval {
528                let mut async_conn = self.async_conn();
529                pipe()
530                    .atomic()
531                    .decr(full_key.as_slice(), _decrement)
532                    .pexpire(full_key.as_slice(), expire_interval)
533                    .query_async::<()>(&mut async_conn)
534                    .await?;
535            } else {
536                let _: () = self.async_conn().decr(full_key, _decrement).await?;
537            }
538        }
539        #[cfg(feature = "len")]
540        {
541            return Err(anyhow!("unsupported!"));
542            //@TODO ...
543            #[allow(unreachable_code)]
544            {
545                let full_key = self.make_full_key(_key.as_ref());
546                let db_zkey = self.make_len_sortedset_key();
547                let mut async_conn = self.async_conn();
548                if get_slot(&db_zkey) == get_slot(full_key.as_slice()) {
549                    if let Some(expire_interval) = _expire_interval {
550                        pipe()
551                            .atomic()
552                            .decr(full_key.as_slice(), _decrement)
553                            .pexpire(full_key.as_slice(), expire_interval)
554                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
555                            .query_async::<()>(&mut async_conn)
556                            .await?;
557                    } else {
558                        pipe()
559                            .atomic()
560                            .decr(full_key.as_slice(), _decrement)
561                            .zadd(db_zkey, _key.as_ref(), i64::MAX)
562                            .query_async::<()>(&mut async_conn)
563                            .await?;
564                    }
565                } else {
566                    //
567                    if let Some(expire_interval) = _expire_interval {
568                        let _: () = pipe()
569                            .atomic()
570                            .decr(full_key.as_slice(), _decrement)
571                            .pexpire(full_key.as_slice(), expire_interval)
572                            .query_async::<()>(&mut async_conn)
573                            .await?;
574                        let _: () = async_conn
575                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
576                            .await?;
577                    } else {
578                        let _: () = async_conn.decr(full_key.as_slice(), _decrement).await?;
579                        let _: () = async_conn.zadd(db_zkey, _key.as_ref(), i64::MAX).await?;
580                    }
581                }
582            }
583        }
584        Ok(())
585    }
586
587    /// Internal method to set a counter value
588    #[inline]
589    async fn _counter_set<K>(
590        &self,
591        _key: K,
592        _val: isize,
593        _expire_interval: Option<TimestampMillis>,
594    ) -> Result<()>
595    where
596        K: AsRef<[u8]> + Sync + Send,
597    {
598        #[cfg(not(feature = "len"))]
599        {
600            let full_key = self.make_full_key(_key.as_ref());
601            if let Some(expire_interval) = _expire_interval {
602                let mut async_conn = self.async_conn();
603                pipe()
604                    .atomic()
605                    .set(full_key.as_slice(), _val)
606                    .pexpire(full_key.as_slice(), expire_interval)
607                    .query_async::<()>(&mut async_conn)
608                    .await?;
609            } else {
610                let _: () = self.async_conn().set(full_key, _val).await?;
611            }
612        }
613        #[cfg(feature = "len")]
614        {
615            return Err(anyhow!("unsupported!"));
616            //@TODO ...
617            #[allow(unreachable_code)]
618            {
619                let full_key = self.make_full_key(_key.as_ref());
620                let db_zkey = self.make_len_sortedset_key();
621                let mut async_conn = self.async_conn();
622                if get_slot(&db_zkey) == get_slot(full_key.as_slice()) {
623                    if let Some(expire_interval) = _expire_interval {
624                        pipe()
625                            .atomic()
626                            .set(full_key.as_slice(), _val)
627                            .pexpire(full_key.as_slice(), expire_interval)
628                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
629                            .query_async::<()>(&mut async_conn)
630                            .await?;
631                    } else {
632                        pipe()
633                            .atomic()
634                            .set(full_key.as_slice(), _val)
635                            .zadd(db_zkey, _key.as_ref(), i64::MAX)
636                            .query_async::<()>(&mut async_conn)
637                            .await?;
638                    }
639                } else {
640                    //
641                    if let Some(expire_interval) = _expire_interval {
642                        pipe()
643                            .atomic()
644                            .set(full_key.as_slice(), _val)
645                            .pexpire(full_key.as_slice(), expire_interval)
646                            .query_async::<()>(&mut async_conn)
647                            .await?;
648                        let _: () = async_conn
649                            .zadd(db_zkey, _key.as_ref(), timestamp_millis() + expire_interval)
650                            .await?;
651                    } else {
652                        let _: () = async_conn.set(full_key.as_slice(), _val).await?;
653                        let _: () = async_conn.zadd(db_zkey, _key.as_ref(), i64::MAX).await?;
654                    }
655                }
656            }
657        }
658
659        Ok(())
660    }
661
662    /// Internal method to remove a key
663    #[inline]
664    async fn _remove<K>(&self, _key: K) -> Result<()>
665    where
666        K: AsRef<[u8]> + Sync + Send,
667    {
668        #[cfg(not(feature = "len"))]
669        {
670            let full_key = self.make_full_key(_key.as_ref());
671            let _: () = self.async_conn().del(full_key).await?;
672            Ok(())
673        }
674        #[cfg(feature = "len")]
675        {
676            return Err(anyhow!("unsupported!"));
677            //@TODO ...
678            #[allow(unreachable_code)]
679            {
680                let full_key = self.make_full_key(_key.as_ref());
681                let db_zkey = self.make_len_sortedset_key();
682
683                let mut async_conn = self.async_conn();
684                if get_slot(&db_zkey) == get_slot(_key.as_ref()) {
685                    pipe()
686                        .atomic()
687                        .del(full_key.as_slice())
688                        .zrem(db_zkey, _key.as_ref())
689                        .query_async::<()>(&mut async_conn)
690                        .await?;
691                } else {
692                    let _: () = async_conn.zrem(db_zkey, _key.as_ref()).await?;
693                    let _: () = async_conn.del(full_key).await?;
694                }
695                Ok(())
696            }
697        }
698    }
699}
700
701#[async_trait]
702impl StorageDB for RedisStorageDB {
703    type MapType = RedisStorageMap;
704    type ListType = RedisStorageList;
705
706    /// Creates a new map with optional expiration
707    #[inline]
708    async fn map<V: AsRef<[u8]> + Sync + Send>(
709        &self,
710        name: V,
711        expire: Option<TimestampMillis>,
712    ) -> Result<Self::MapType> {
713        let full_name = self.make_map_full_name(name.as_ref());
714        Ok(
715            RedisStorageMap::new_expire(name.as_ref().to_vec(), full_name, expire, self.clone())
716                .await?,
717        )
718    }
719
720    /// Removes a map
721    #[inline]
722    async fn map_remove<K>(&self, name: K) -> Result<()>
723    where
724        K: AsRef<[u8]> + Sync + Send,
725    {
726        let map_full_name = self.make_map_full_name(name.as_ref());
727        let _: () = self.async_conn().del(map_full_name).await?;
728        Ok(())
729    }
730
731    /// Checks if a map exists
732    #[inline]
733    async fn map_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
734        let map_full_name = self.make_map_full_name(key.as_ref());
735        Ok(self.async_conn().exists(map_full_name).await?)
736    }
737
738    /// Creates a new list with optional expiration
739    #[inline]
740    async fn list<V: AsRef<[u8]> + Sync + Send>(
741        &self,
742        name: V,
743        expire: Option<TimestampMillis>,
744    ) -> Result<Self::ListType> {
745        let full_name = self.make_list_full_name(name.as_ref());
746        Ok(
747            RedisStorageList::new_expire(name.as_ref().to_vec(), full_name, expire, self.clone())
748                .await?,
749        )
750    }
751
752    /// Removes a list
753    #[inline]
754    async fn list_remove<K>(&self, name: K) -> Result<()>
755    where
756        K: AsRef<[u8]> + Sync + Send,
757    {
758        let list_full_name = self.make_list_full_name(name.as_ref());
759        let _: () = self.async_conn().del(list_full_name).await?;
760        Ok(())
761    }
762
763    /// Checks if a list exists
764    #[inline]
765    async fn list_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
766        let list_full_name = self.make_list_full_name(key.as_ref());
767        Ok(self.async_conn().exists(list_full_name).await?)
768    }
769
770    /// Inserts a key-value pair
771    #[inline]
772    async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
773    where
774        K: AsRef<[u8]> + Sync + Send,
775        V: serde::ser::Serialize + Sync + Send,
776    {
777        self._insert(key, val, None).await
778    }
779
780    /// Gets a value by key
781    #[inline]
782    async fn get<K, V>(&self, key: K) -> Result<Option<V>>
783    where
784        K: AsRef<[u8]> + Sync + Send,
785        V: DeserializeOwned + Sync + Send,
786    {
787        let full_key = self.make_full_key(key);
788        if let Some(v) = self
789            .async_conn()
790            .get::<_, Option<Vec<u8>>>(full_key)
791            .await?
792        {
793            Ok(Some(bincode::deserialize::<V>(v.as_ref())?))
794        } else {
795            Ok(None)
796        }
797    }
798
799    /// Removes a key
800    #[inline]
801    async fn remove<K>(&self, key: K) -> Result<()>
802    where
803        K: AsRef<[u8]> + Sync + Send,
804    {
805        self._remove(key).await
806    }
807
808    /// Batch insertion of key-value pairs
809    #[inline]
810    async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
811    where
812        V: Serialize + Sync + Send,
813    {
814        if !key_vals.is_empty() {
815            let mut slot_keys_vals_expires = key_vals
816                .into_iter()
817                .map(|(k, v)| {
818                    let full_key = self.make_full_key(k.as_slice());
819                    (get_slot(&full_key), (k, full_key, v, None))
820                })
821                .collect::<Vec<_>>();
822
823            if !slot_keys_vals_expires.is_empty() {
824                for keys_vals_expires in transform_by_slot(slot_keys_vals_expires) {
825                    self._batch_insert(keys_vals_expires).await?;
826                }
827            } else if let Some((_, keys_vals_expires)) = slot_keys_vals_expires.pop() {
828                self._batch_insert(vec![keys_vals_expires]).await?;
829            }
830        }
831        Ok(())
832    }
833
834    /// Batch removal of keys
835    #[inline]
836    async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
837        if !keys.is_empty() {
838            self._batch_remove(keys).await?;
839        }
840        Ok(())
841    }
842
843    /// Increments a counter
844    #[inline]
845    async fn counter_incr<K>(&self, key: K, increment: isize) -> Result<()>
846    where
847        K: AsRef<[u8]> + Sync + Send,
848    {
849        self._counter_incr(key, increment, None).await
850    }
851
852    /// Decrements a counter
853    #[inline]
854    async fn counter_decr<K>(&self, key: K, decrement: isize) -> Result<()>
855    where
856        K: AsRef<[u8]> + Sync + Send,
857    {
858        self._counter_decr(key, decrement, None).await
859    }
860
861    /// Gets a counter value
862    #[inline]
863    async fn counter_get<K>(&self, key: K) -> Result<Option<isize>>
864    where
865        K: AsRef<[u8]> + Sync + Send,
866    {
867        let full_key = self.make_full_key(key);
868        Ok(self.async_conn().get::<_, Option<isize>>(full_key).await?)
869    }
870
871    /// Sets a counter value
872    #[inline]
873    async fn counter_set<K>(&self, key: K, val: isize) -> Result<()>
874    where
875        K: AsRef<[u8]> + Sync + Send,
876    {
877        self._counter_set(key, val, None).await
878    }
879
880    /// Checks if a key exists
881    #[inline]
882    async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
883        //HEXISTS key field
884        let full_key = self.make_full_key(key.as_ref());
885        Ok(self.async_conn().exists(full_key).await?)
886    }
887
    /// Gets the number of keys in the database.
    ///
    /// Only compiled with the `len` feature. Currently this always fails with
    /// an `"unsupported!"` error; the block after the early `return` is a
    /// dormant draft that is never executed.
    #[inline]
    #[cfg(feature = "len")]
    async fn len(&self) -> Result<usize> {
        return Err(anyhow!("unsupported!"));
        //@TODO ...
        // Draft (unreachable): drop sorted-set members whose score lies between
        // 0 and the current timestamp, then return the remaining cardinality.
        #[allow(unreachable_code)]
        {
            let db_zkey = self.make_len_sortedset_key();
            let mut async_conn = self.async_conn();
            let (_, count) = pipe()
                .zrembyscore(db_zkey.as_slice(), 0, timestamp_millis())
                .zcard(db_zkey.as_slice())
                .query_async::<(i64, usize)>(&mut async_conn)
                .await?;
            Ok(count)
        }
    }
906
907    /// Gets the total database size
908    #[inline]
909    async fn db_size(&self) -> Result<usize> {
910        let mut dbsize = 0;
911        for mut async_conn in self.nodes.clone() {
912            //DBSIZE
913            let dbsize_val = redis::pipe()
914                .cmd("DBSIZE")
915                .query_async::<redis::Value>(&mut async_conn)
916                .await?;
917            dbsize += dbsize_val
918                .as_sequence()
919                .and_then(|vs| {
920                    vs.iter().next().and_then(|v| {
921                        if let redis::Value::Int(v) = v {
922                            Some(*v)
923                        } else {
924                            None
925                        }
926                    })
927                })
928                .unwrap_or_default();
929        }
930        Ok(dbsize as usize)
931    }
932
933    /// Sets expiration time for a key
934    #[inline]
935    #[cfg(feature = "ttl")]
936    async fn expire_at<K>(&self, _key: K, _at: TimestampMillis) -> Result<bool>
937    where
938        K: AsRef<[u8]> + Sync + Send,
939    {
940        #[cfg(not(feature = "len"))]
941        {
942            let full_name = self.make_full_key(_key.as_ref());
943            let res = self
944                .async_conn()
945                .pexpire_at::<_, bool>(full_name, _at)
946                .await?;
947            Ok(res)
948        }
949        #[cfg(feature = "len")]
950        {
951            return Err(anyhow!("unsupported!"));
952            //@TODO ...
953            #[allow(unreachable_code)]
954            {
955                let full_name = self.make_full_key(_key.as_ref());
956                let db_zkey = self.make_len_sortedset_key();
957                let mut async_conn = self.async_conn();
958                if get_slot(&db_zkey) == get_slot(full_name.as_slice()) {
959                    let (_, res) = pipe()
960                        .atomic()
961                        .zadd(db_zkey, _key.as_ref(), _at)
962                        .pexpire_at(full_name.as_slice(), _at)
963                        .query_async::<(i64, bool)>(&mut async_conn)
964                        .await?;
965                    Ok(res)
966                } else {
967                    let res = async_conn.zadd(db_zkey, _key.as_ref(), _at).await?;
968                    let _: () = async_conn.pexpire_at(full_name.as_slice(), _at).await?;
969                    Ok(res)
970                }
971            }
972        }
973    }
974
    /// Sets a relative expiration (duration in milliseconds) for a key.
    ///
    /// Only compiled with the `ttl` feature.
    ///
    /// * Without the `len` feature: issues PEXPIRE on the full (prefixed) key
    ///   and returns whether Redis applied the timeout.
    /// * With the `len` feature: always fails with `"unsupported!"`. The code
    ///   after the early `return` is a dormant draft that is never executed.
    #[inline]
    #[cfg(feature = "ttl")]
    async fn expire<K>(&self, _key: K, _dur: TimestampMillis) -> Result<bool>
    where
        K: AsRef<[u8]> + Sync + Send,
    {
        #[cfg(not(feature = "len"))]
        {
            //PEXPIRE key ms
            let full_name = self.make_full_key(_key.as_ref());
            let res = self
                .async_conn()
                .pexpire::<_, bool>(full_name, _dur)
                .await?;
            Ok(res)
        }
        #[cfg(feature = "len")]
        {
            return Err(anyhow!("unsupported!"));
            //@TODO ...
            // Draft (unreachable): record the absolute deadline in the length
            // sorted set and apply PEXPIRE to the key itself.
            #[allow(unreachable_code)]
            {
                let full_name = self.make_full_key(_key.as_ref());
                let db_zkey = self.make_len_sortedset_key();
                let mut async_conn = self.async_conn();
                // An atomic pipeline is only used when both keys hash to the
                // same cluster slot.
                if get_slot(db_zkey.as_slice()) == get_slot(full_name.as_slice()) {
                    let (_, res) = pipe()
                        .atomic()
                        .zadd(db_zkey, _key.as_ref(), timestamp_millis() + _dur)
                        .pexpire(full_name.as_slice(), _dur)
                        .query_async::<(i64, bool)>(&mut async_conn)
                        .await?;
                    Ok(res)
                } else {
                    let _: () = async_conn
                        .zadd(db_zkey, _key.as_ref(), timestamp_millis() + _dur)
                        .await?;
                    let res = async_conn
                        .pexpire::<_, bool>(full_name.as_slice(), _dur)
                        .await?;
                    Ok(res)
                }
            }
        }
    }
1020
1021    /// Gets time-to-live for a key
1022    #[inline]
1023    #[cfg(feature = "ttl")]
1024    async fn ttl<K>(&self, key: K) -> Result<Option<TimestampMillis>>
1025    where
1026        K: AsRef<[u8]> + Sync + Send,
1027    {
1028        let mut async_conn = self.async_conn();
1029        let full_key = self.make_full_key(key.as_ref());
1030        let res = async_conn.pttl::<_, isize>(full_key).await?;
1031        match res {
1032            -2 => Ok(None),
1033            -1 => Ok(Some(TimestampMillis::MAX)),
1034            _ => Ok(Some(res as TimestampMillis)),
1035        }
1036    }
1037
1038    /// Creates an iterator for all maps
1039    #[inline]
1040    async fn map_iter<'a>(
1041        &'a mut self,
1042    ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageMap>> + Send + 'a>> {
1043        let db = self.clone();
1044        let pattern = self.make_map_prefix_match();
1045
1046        let mut iters = Vec::new();
1047        for conn in self.nodes_mut().await?.iter_mut() {
1048            let iter = conn.scan_match::<_, Key>(pattern.as_slice()).await?;
1049            iters.push(iter);
1050        }
1051
1052        let iter = AsyncMapIter { db, iters };
1053        Ok(Box::new(iter))
1054    }
1055
1056    /// Creates an iterator for all lists
1057    #[inline]
1058    async fn list_iter<'a>(
1059        &'a mut self,
1060    ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>> {
1061        let db = self.clone();
1062        let pattern = self.make_list_prefix_match();
1063
1064        let mut iters = Vec::new();
1065        for conn in self.nodes_mut().await?.iter_mut() {
1066            let iter = conn.scan_match::<_, Key>(pattern.as_slice()).await?;
1067            iters.push(iter);
1068        }
1069
1070        let iter = AsyncListIter { db, iters };
1071        Ok(Box::new(iter))
1072    }
1073
1074    /// Creates an iterator for keys matching a pattern
1075    async fn scan<'a, P>(
1076        &'a mut self,
1077        pattern: P,
1078    ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>
1079    where
1080        P: AsRef<[u8]> + Send + Sync,
1081    {
1082        let pattern = self.make_scan_pattern_match(pattern);
1083        let prefix_len = KEY_PREFIX.len() + self.prefix.len();
1084
1085        let mut iters = Vec::new();
1086        for conn in self.nodes_mut().await?.iter_mut() {
1087            let iter = conn.scan_match::<_, Key>(pattern.as_slice()).await?;
1088            iters.push(iter);
1089        }
1090        Ok(Box::new(AsyncDbKeyIter { prefix_len, iters }))
1091    }
1092
1093    /// Gets database information
1094    #[inline]
1095    async fn info(&self) -> Result<Value> {
1096        Ok(serde_json::json!({
1097            "storage_engine": "RedisCluster",
1098            "dbsize": self.db_size().await?,
1099        }))
1100    }
1101}
1102
/// Redis-backed map storage implementation.
///
/// Entries are kept in a Redis hash stored under `full_name`; the methods of
/// this type issue hash commands (HSET, HGET, HDEL, HSCAN, ...) against it.
#[derive(Clone)]
pub struct RedisStorageMap {
    /// Name of the map, as returned by `name()`
    name: Key,
    /// Redis key of the underlying hash
    full_name: Key,
    /// Optional expiration in milliseconds; only read when the `ttl` feature
    /// is enabled, hence the `dead_code` allowance
    #[allow(dead_code)]
    expire: Option<TimestampMillis>,
    /// Locally cached "map is empty" hint, shared between clones of this
    /// handle. While it is `true` and `expire` is set, the insert helpers send
    /// PEXPIRE together with the write and then flip it to `false`; `clear`
    /// resets it to `true`.
    empty: Arc<AtomicBool>,
    /// Handle to the owning database, used to obtain connections
    pub(crate) db: RedisStorageDB,
}
1118
1119impl RedisStorageMap {
1120    /// Creates a new map without expiration
1121    #[inline]
1122    pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self {
1123        Self {
1124            name,
1125            full_name,
1126            expire: None,
1127            empty: Arc::new(AtomicBool::new(true)),
1128            db,
1129        }
1130    }
1131
1132    /// Creates a new map with expiration
1133    #[inline]
1134    pub(crate) async fn new_expire(
1135        name: Key,
1136        full_name: Key,
1137        expire: Option<TimestampMillis>,
1138        mut db: RedisStorageDB,
1139    ) -> Result<Self> {
1140        let empty = if expire.is_some() {
1141            let empty = Self::_is_empty(&mut db.async_conn, full_name.as_slice()).await?;
1142            Arc::new(AtomicBool::new(empty))
1143        } else {
1144            Arc::new(AtomicBool::new(true))
1145        };
1146        Ok(Self {
1147            name,
1148            full_name,
1149            expire,
1150            empty,
1151            db,
1152        })
1153    }
1154
1155    /// Gets a clone of the async connection
1156    #[inline]
1157    fn async_conn(&self) -> RedisConnection {
1158        self.db.async_conn()
1159    }
1160
1161    /// Gets a mutable reference to the async connection
1162    #[inline]
1163    fn async_conn_mut(&mut self) -> &mut RedisConnection {
1164        self.db.async_conn_mut()
1165    }
1166
1167    /// Checks if the map is empty
1168    #[inline]
1169    async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result<bool> {
1170        //HSCAN key cursor [MATCH pattern] [COUNT count]
1171        let res = async_conn
1172            .hscan::<_, Vec<u8>>(full_name)
1173            .await?
1174            .next_item()
1175            .await
1176            .is_none();
1177        Ok(res)
1178    }
1179
1180    /// Internal method to insert with expiration handling
1181    #[inline]
1182    async fn _insert_expire(&self, key: &[u8], val: Vec<u8>) -> Result<()> {
1183        let mut async_conn = self.async_conn();
1184        let name = self.full_name.as_slice();
1185
1186        #[cfg(feature = "ttl")]
1187        if self.empty.load(Ordering::SeqCst) {
1188            if let Some(expire) = self.expire.as_ref() {
1189                //HSET key field value
1190                //PEXPIRE key ms
1191                let _: () = redis::pipe()
1192                    .atomic()
1193                    .hset(name, key, val)
1194                    .pexpire(name, *expire)
1195                    .query_async(&mut async_conn)
1196                    .await?;
1197                self.empty.store(false, Ordering::SeqCst);
1198                return Ok(());
1199            }
1200        }
1201
1202        //HSET key field value
1203        let _: () = async_conn.hset(name, key.as_ref(), val).await?;
1204        Ok(())
1205    }
1206
1207    /// Internal method for batch insertion with expiration
1208    #[inline]
1209    async fn _batch_insert_expire(&self, key_vals: Vec<(Key, Vec<u8>)>) -> Result<()> {
1210        let mut async_conn = self.async_conn();
1211        let name = self.full_name.as_slice();
1212
1213        #[cfg(feature = "ttl")]
1214        if self.empty.load(Ordering::SeqCst) {
1215            if let Some(expire) = self.expire.as_ref() {
1216                //HMSET key field value
1217                //PEXPIRE key ms
1218                let _: () = redis::pipe()
1219                    .atomic()
1220                    .hset_multiple(name, key_vals.as_slice())
1221                    .pexpire(name, *expire)
1222                    .query_async(&mut async_conn)
1223                    .await?;
1224
1225                self.empty.store(false, Ordering::SeqCst);
1226                return Ok(());
1227            }
1228        }
1229
1230        //HSET key field value
1231        let _: () = async_conn.hset_multiple(name, key_vals.as_slice()).await?;
1232        Ok(())
1233    }
1234}
1235
1236#[async_trait]
1237impl Map for RedisStorageMap {
1238    /// Gets the map name
1239    #[inline]
1240    fn name(&self) -> &[u8] {
1241        self.name.as_slice()
1242    }
1243
1244    /// Inserts a key-value pair into the map
1245    #[inline]
1246    async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
1247    where
1248        K: AsRef<[u8]> + Sync + Send,
1249        V: Serialize + Sync + Send + ?Sized,
1250    {
1251        self._insert_expire(key.as_ref(), bincode::serialize(val)?)
1252            .await
1253    }
1254
1255    /// Gets a value from the map
1256    #[inline]
1257    async fn get<K, V>(&self, key: K) -> Result<Option<V>>
1258    where
1259        K: AsRef<[u8]> + Sync + Send,
1260        V: DeserializeOwned + Sync + Send,
1261    {
1262        //HSET key field value
1263        let res: Option<Vec<u8>> = self
1264            .async_conn()
1265            .hget(self.full_name.as_slice(), key.as_ref())
1266            .await?;
1267        if let Some(res) = res {
1268            Ok(Some(bincode::deserialize::<V>(res.as_ref())?))
1269        } else {
1270            Ok(None)
1271        }
1272    }
1273
1274    /// Removes a key from the map
1275    #[inline]
1276    async fn remove<K>(&self, key: K) -> Result<()>
1277    where
1278        K: AsRef<[u8]> + Sync + Send,
1279    {
1280        //HDEL key field [field ...]
1281        let _: () = self
1282            .async_conn()
1283            .hdel(self.full_name.as_slice(), key.as_ref())
1284            .await?;
1285        Ok(())
1286    }
1287
1288    /// Checks if a key exists in the map
1289    #[inline]
1290    async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
1291        //HEXISTS key field
1292        let res = self
1293            .async_conn()
1294            .hexists(self.full_name.as_slice(), key.as_ref())
1295            .await?;
1296        Ok(res)
1297    }
1298
1299    /// Gets the number of elements in the map
1300    #[cfg(feature = "map_len")]
1301    #[inline]
1302    async fn len(&self) -> Result<usize> {
1303        //HLEN key
1304        Ok(self.async_conn().hlen(self.full_name.as_slice()).await?)
1305    }
1306
1307    /// Checks if the map is empty
1308    #[inline]
1309    async fn is_empty(&self) -> Result<bool> {
1310        //HSCAN key cursor [MATCH pattern] [COUNT count]
1311        let res = self
1312            .async_conn()
1313            .hscan::<_, Vec<u8>>(self.full_name.as_slice())
1314            .await?
1315            .next_item()
1316            .await
1317            .is_none();
1318        Ok(res)
1319    }
1320
1321    /// Clears all elements from the map
1322    #[inline]
1323    async fn clear(&self) -> Result<()> {
1324        //DEL key [key ...]
1325        let _: () = self.async_conn().del(self.full_name.as_slice()).await?;
1326        self.empty.store(true, Ordering::SeqCst);
1327        Ok(())
1328    }
1329
1330    /// Removes and returns a value from the map
1331    #[inline]
1332    async fn remove_and_fetch<K, V>(&self, key: K) -> Result<Option<V>>
1333    where
1334        K: AsRef<[u8]> + Sync + Send,
1335        V: DeserializeOwned + Sync + Send,
1336    {
1337        //HSET key field value
1338        //HDEL key field [field ...]
1339        let name = self.full_name.as_slice();
1340        let mut conn = self.async_conn();
1341        let (res, _): (Option<Vec<u8>>, isize) = redis::pipe()
1342            .atomic()
1343            .hget(name, key.as_ref())
1344            .hdel(name, key.as_ref())
1345            .query_async(&mut conn)
1346            .await?;
1347
1348        if let Some(res) = res {
1349            Ok(Some(bincode::deserialize::<V>(res.as_ref())?))
1350        } else {
1351            Ok(None)
1352        }
1353    }
1354
1355    /// Removes all keys with a given prefix
1356    #[inline]
1357    async fn remove_with_prefix<K>(&self, prefix: K) -> Result<()>
1358    where
1359        K: AsRef<[u8]> + Sync + Send,
1360    {
1361        let name = self.full_name.as_slice();
1362        let mut conn = self.async_conn();
1363        let mut conn2 = conn.clone();
1364        let mut prefix = prefix.as_ref().to_vec();
1365        prefix.push(b'*');
1366        let mut removeds = Vec::new();
1367        while let Some(key) = conn
1368            .hscan_match::<_, _, Vec<u8>>(name, prefix.as_slice())
1369            .await?
1370            .next_item()
1371            .await
1372        {
1373            removeds.push(key?);
1374            if removeds.len() > 20 {
1375                let _: () = conn2.hdel(name, removeds.as_slice()).await?;
1376                removeds.clear();
1377            }
1378        }
1379        if !removeds.is_empty() {
1380            let _: () = conn.hdel(name, removeds).await?;
1381        }
1382        Ok(())
1383    }
1384
1385    /// Batch insertion of key-value pairs
1386    #[inline]
1387    async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
1388    where
1389        V: Serialize + Sync + Send,
1390    {
1391        if !key_vals.is_empty() {
1392            let key_vals = key_vals
1393                .into_iter()
1394                .map(|(k, v)| {
1395                    bincode::serialize(&v)
1396                        .map(move |v| (k, v))
1397                        .map_err(|e| anyhow!(e))
1398                })
1399                .collect::<Result<Vec<_>>>()?;
1400
1401            self._batch_insert_expire(key_vals).await?;
1402        }
1403        Ok(())
1404    }
1405
1406    /// Batch removal of keys
1407    #[inline]
1408    async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
1409        if !keys.is_empty() {
1410            let _: () = self
1411                .async_conn()
1412                .hdel(self.full_name.as_slice(), keys)
1413                .await?;
1414        }
1415        Ok(())
1416    }
1417
1418    /// Creates an iterator over key-value pairs
1419    #[inline]
1420    async fn iter<'a, V>(
1421        &'a mut self,
1422    ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
1423    where
1424        V: DeserializeOwned + Sync + Send + 'a + 'static,
1425    {
1426        let name = self.full_name.clone();
1427        let iter = AsyncIter {
1428            iter: self
1429                .async_conn_mut()
1430                .hscan::<_, (Key, Vec<u8>)>(name)
1431                .await?,
1432            _m: std::marker::PhantomData,
1433        };
1434        Ok(Box::new(iter))
1435    }
1436
1437    /// Creates an iterator over keys
1438    #[inline]
1439    async fn key_iter<'a>(
1440        &'a mut self,
1441    ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>> {
1442        let iter = AsyncKeyIter {
1443            iter: self
1444                .db
1445                .async_conn
1446                .hscan::<_, (Key, ())>(self.full_name.as_slice())
1447                .await?,
1448        };
1449        Ok(Box::new(iter))
1450    }
1451
1452    /// Creates an iterator over key-value pairs with a prefix
1453    #[inline]
1454    async fn prefix_iter<'a, P, V>(
1455        &'a mut self,
1456        prefix: P,
1457    ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
1458    where
1459        P: AsRef<[u8]> + Send + Sync,
1460        V: DeserializeOwned + Sync + Send + 'a + 'static,
1461    {
1462        let name = self.full_name.clone();
1463        let mut prefix = prefix.as_ref().to_vec();
1464        prefix.push(b'*');
1465        let iter = AsyncIter {
1466            iter: self
1467                .async_conn_mut()
1468                .hscan_match::<_, _, (Key, Vec<u8>)>(name, prefix.as_slice())
1469                .await?,
1470            _m: std::marker::PhantomData,
1471        };
1472        Ok(Box::new(iter))
1473    }
1474
1475    /// Sets expiration time for the map
1476    #[cfg(feature = "ttl")]
1477    async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
1478        let res = self
1479            .async_conn()
1480            .pexpire_at::<_, bool>(self.full_name.as_slice(), at)
1481            .await?;
1482        Ok(res)
1483    }
1484
1485    /// Sets expiration duration for the map
1486    #[cfg(feature = "ttl")]
1487    async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1488        let res = self
1489            .async_conn()
1490            .pexpire::<_, bool>(self.full_name.as_slice(), dur)
1491            .await?;
1492        Ok(res)
1493    }
1494
1495    /// Gets time-to-live for the map
1496    #[cfg(feature = "ttl")]
1497    async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1498        let mut async_conn = self.async_conn();
1499        let res = async_conn
1500            .pttl::<_, isize>(self.full_name.as_slice())
1501            .await?;
1502        match res {
1503            -2 => Ok(None),
1504            -1 => Ok(Some(TimestampMillis::MAX)),
1505            _ => Ok(Some(res as TimestampMillis)),
1506        }
1507    }
1508}
1509
/// Redis-backed list storage implementation.
///
/// Elements are kept in a Redis list stored under `full_name`; the methods of
/// this type issue list commands (RPUSH, LPOP, LLEN, ...) against it.
#[derive(Clone)]
pub struct RedisStorageList {
    /// Name of the list, as returned by `name()`
    name: Key,
    /// Redis key of the underlying list
    full_name: Key,
    /// Optional expiration in milliseconds; only read when the `ttl` feature
    /// is enabled, hence the `dead_code` allowance
    #[allow(dead_code)]
    expire: Option<TimestampMillis>,
    /// Locally cached "list is empty" hint, shared between clones of this
    /// handle. While it is `true` and `expire` is set, the push helpers send
    /// PEXPIRE together with the write and then flip it to `false`.
    empty: Arc<AtomicBool>,
    /// Handle to the owning database, used to obtain connections
    pub(crate) db: RedisStorageDB,
}
1525
1526impl RedisStorageList {
1527    /// Creates a new list without expiration
1528    #[inline]
1529    pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self {
1530        Self {
1531            name,
1532            full_name,
1533            expire: None,
1534            empty: Arc::new(AtomicBool::new(true)),
1535            db,
1536        }
1537    }
1538
1539    /// Creates a new list with expiration
1540    #[inline]
1541    pub(crate) async fn new_expire(
1542        name: Key,
1543        full_name: Key,
1544        expire: Option<TimestampMillis>,
1545        mut db: RedisStorageDB,
1546    ) -> Result<Self> {
1547        let empty = if expire.is_some() {
1548            let empty = Self::_is_empty(&mut db.async_conn, full_name.as_slice()).await?;
1549            Arc::new(AtomicBool::new(empty))
1550        } else {
1551            Arc::new(AtomicBool::new(true))
1552        };
1553        Ok(Self {
1554            name,
1555            full_name,
1556            expire,
1557            empty,
1558            db,
1559        })
1560    }
1561
1562    /// Gets a clone of the async connection
1563    #[inline]
1564    pub(crate) fn async_conn(&self) -> RedisConnection {
1565        self.db.async_conn()
1566    }
1567
1568    /// Checks if the list is empty
1569    #[inline]
1570    async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result<bool> {
1571        Ok(async_conn.llen::<_, usize>(full_name).await? == 0)
1572    }
1573
1574    /// Internal method to push with expiration handling
1575    #[inline]
1576    async fn _push_expire(&self, val: Vec<u8>) -> Result<()> {
1577        let mut async_conn = self.async_conn();
1578        let name = self.full_name.as_slice();
1579
1580        #[cfg(feature = "ttl")]
1581        if self.empty.load(Ordering::SeqCst) {
1582            if let Some(expire) = self.expire.as_ref() {
1583                //RPUSH key value [value ...]
1584                //PEXPIRE key ms
1585                let _: () = redis::pipe()
1586                    .atomic()
1587                    .rpush(name, val)
1588                    .pexpire(name, *expire)
1589                    .query_async(&mut async_conn)
1590                    .await?;
1591                self.empty.store(false, Ordering::SeqCst);
1592                return Ok(());
1593            }
1594        }
1595
1596        //RPUSH key value [value ...]
1597        let _: () = async_conn.rpush(name, val).await?;
1598        Ok(())
1599    }
1600
1601    /// Internal method for batch push with expiration
1602    #[inline]
1603    async fn _pushs_expire(&self, vals: Vec<Vec<u8>>) -> Result<()> {
1604        let mut async_conn = self.async_conn();
1605
1606        #[cfg(feature = "ttl")]
1607        if self.empty.load(Ordering::SeqCst) {
1608            if let Some(expire) = self.expire.as_ref() {
1609                let name = self.full_name.as_slice();
1610                //RPUSH key value [value ...]
1611                //PEXPIRE key ms
1612                let _: () = redis::pipe()
1613                    .atomic()
1614                    .rpush(name, vals)
1615                    .pexpire(name, *expire)
1616                    .query_async(&mut async_conn)
1617                    .await?;
1618                self.empty.store(false, Ordering::SeqCst);
1619                return Ok(());
1620            }
1621        }
1622
1623        //RPUSH key value [value ...]
1624        let _: () = async_conn.rpush(self.full_name.as_slice(), vals).await?;
1625        Ok(())
1626    }
1627
1628    /// Internal method for push with limit and expiration
1629    #[inline]
1630    async fn _push_limit_expire(
1631        &self,
1632        val: Vec<u8>,
1633        limit: usize,
1634        pop_front_if_limited: bool,
1635    ) -> Result<Option<Vec<u8>>> {
1636        let mut conn = self.async_conn();
1637
1638        #[cfg(feature = "ttl")]
1639        if self.empty.load(Ordering::SeqCst) {
1640            if let Some(expire) = self.expire.as_ref() {
1641                let name = self.full_name.as_slice();
1642                let count = conn.llen::<_, usize>(name).await?;
1643                let res = if count < limit {
1644                    let _: () = redis::pipe()
1645                        .atomic()
1646                        .rpush(name, val)
1647                        .pexpire(name, *expire)
1648                        .query_async(&mut conn)
1649                        .await?;
1650                    Ok(None)
1651                } else if pop_front_if_limited {
1652                    let (poped, _): (Option<Vec<u8>>, Option<()>) = redis::pipe()
1653                        .atomic()
1654                        .lpop(name, None)
1655                        .rpush(name, val)
1656                        .pexpire(name, *expire)
1657                        .query_async(&mut conn)
1658                        .await?;
1659
1660                    Ok(poped)
1661                } else {
1662                    Err(anyhow::Error::msg("Is full"))
1663                };
1664                self.empty.store(false, Ordering::SeqCst);
1665                return res;
1666            }
1667        }
1668
1669        self._push_limit(val, limit, pop_front_if_limited, &mut conn)
1670            .await
1671    }
1672
1673    /// Internal method for push with limit
1674    #[inline]
1675    async fn _push_limit(
1676        &self,
1677        val: Vec<u8>,
1678        limit: usize,
1679        pop_front_if_limited: bool,
1680        async_conn: &mut RedisConnection,
1681    ) -> Result<Option<Vec<u8>>> {
1682        let name = self.full_name.as_slice();
1683
1684        let count = async_conn.llen::<_, usize>(name).await?;
1685        if count < limit {
1686            let _: () = async_conn.rpush(name, val).await?;
1687            Ok(None)
1688        } else if pop_front_if_limited {
1689            let (poped, _): (Option<Vec<u8>>, Option<()>) = redis::pipe()
1690                .atomic()
1691                .lpop(name, None)
1692                .rpush(name, val)
1693                .query_async(async_conn)
1694                .await?;
1695            Ok(poped)
1696        } else {
1697            Err(anyhow::Error::msg("Is full"))
1698        }
1699    }
1700}
1701
1702#[async_trait]
1703impl List for RedisStorageList {
1704    /// Gets the list name
1705    #[inline]
1706    fn name(&self) -> &[u8] {
1707        self.name.as_slice()
1708    }
1709
1710    /// Pushes a value to the end of the list
1711    #[inline]
1712    async fn push<V>(&self, val: &V) -> Result<()>
1713    where
1714        V: Serialize + Sync + Send,
1715    {
1716        self._push_expire(bincode::serialize(val)?).await
1717    }
1718
1719    /// Pushes multiple values to the end of the list
1720    #[inline]
1721    async fn pushs<V>(&self, vals: Vec<V>) -> Result<()>
1722    where
1723        V: Serialize + Sync + Send,
1724    {
1725        //RPUSH key value [value ...]
1726        let vals = vals
1727            .into_iter()
1728            .map(|v| bincode::serialize(&v).map_err(|e| anyhow!(e)))
1729            .collect::<Result<Vec<_>>>()?;
1730        self._pushs_expire(vals).await
1731    }
1732
1733    /// Pushes a value with size limit handling
1734    #[inline]
1735    async fn push_limit<V>(
1736        &self,
1737        val: &V,
1738        limit: usize,
1739        pop_front_if_limited: bool,
1740    ) -> Result<Option<V>>
1741    where
1742        V: Serialize + Sync + Send,
1743        V: DeserializeOwned,
1744    {
1745        let data = bincode::serialize(val)?;
1746
1747        if let Some(res) = self
1748            ._push_limit_expire(data, limit, pop_front_if_limited)
1749            .await?
1750        {
1751            Ok(Some(
1752                bincode::deserialize::<V>(res.as_ref()).map_err(|e| anyhow!(e))?,
1753            ))
1754        } else {
1755            Ok(None)
1756        }
1757    }
1758
1759    /// Pops a value from the front of the list
1760    #[inline]
1761    async fn pop<V>(&self) -> Result<Option<V>>
1762    where
1763        V: DeserializeOwned + Sync + Send,
1764    {
1765        //LPOP key
1766        let removed = self
1767            .async_conn()
1768            .lpop::<_, Option<Vec<u8>>>(self.full_name.as_slice(), None)
1769            .await?;
1770
1771        let removed = if let Some(v) = removed {
1772            Some(bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e))?)
1773        } else {
1774            None
1775        };
1776
1777        Ok(removed)
1778    }
1779
1780    /// Gets all values in the list
1781    #[inline]
1782    async fn all<V>(&self) -> Result<Vec<V>>
1783    where
1784        V: DeserializeOwned + Sync + Send,
1785    {
1786        //LRANGE key 0 -1
1787        let all = self
1788            .async_conn()
1789            .lrange::<_, Vec<Vec<u8>>>(self.full_name.as_slice(), 0, -1)
1790            .await?;
1791        all.iter()
1792            .map(|v| bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e)))
1793            .collect::<Result<Vec<_>>>()
1794    }
1795
1796    /// Gets a value by index
1797    #[inline]
1798    async fn get_index<V>(&self, idx: usize) -> Result<Option<V>>
1799    where
1800        V: DeserializeOwned + Sync + Send,
1801    {
1802        //LINDEX key index
1803        let val = self
1804            .async_conn()
1805            .lindex::<_, Option<Vec<u8>>>(self.full_name.as_slice(), idx as isize)
1806            .await?;
1807
1808        Ok(if let Some(v) = val {
1809            Some(bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e))?)
1810        } else {
1811            None
1812        })
1813    }
1814
1815    /// Gets the length of the list
1816    #[inline]
1817    async fn len(&self) -> Result<usize> {
1818        //LLEN key
1819        Ok(self.async_conn().llen(self.full_name.as_slice()).await?)
1820    }
1821
1822    /// Checks if the list is empty
1823    #[inline]
1824    async fn is_empty(&self) -> Result<bool> {
1825        Ok(self.len().await? == 0)
1826    }
1827
1828    /// Clears the list
1829    #[inline]
1830    async fn clear(&self) -> Result<()> {
1831        let _: () = self.async_conn().del(self.full_name.as_slice()).await?;
1832        self.empty.store(true, Ordering::SeqCst);
1833        Ok(())
1834    }
1835
1836    /// Creates an iterator over list values
1837    #[inline]
1838    async fn iter<'a, V>(
1839        &'a mut self,
1840    ) -> Result<Box<dyn AsyncIterator<Item = Result<V>> + Send + 'a>>
1841    where
1842        V: DeserializeOwned + Sync + Send + 'a + 'static,
1843    {
1844        Ok(Box::new(AsyncListValIter::new(
1845            self.full_name.as_slice(),
1846            self.db.async_conn(),
1847        )))
1848    }
1849
1850    /// Sets expiration time for the list
1851    #[cfg(feature = "ttl")]
1852    async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
1853        let res = self
1854            .async_conn()
1855            .pexpire_at::<_, bool>(self.full_name.as_slice(), at)
1856            .await?;
1857        Ok(res)
1858    }
1859
1860    /// Sets expiration duration for the list
1861    #[cfg(feature = "ttl")]
1862    async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1863        let res = self
1864            .async_conn()
1865            .pexpire::<_, bool>(self.full_name.as_slice(), dur)
1866            .await?;
1867        Ok(res)
1868    }
1869
1870    /// Gets time-to-live for the list
1871    #[cfg(feature = "ttl")]
1872    async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1873        let mut async_conn = self.async_conn();
1874        let res = async_conn
1875            .pttl::<_, isize>(self.full_name.as_slice())
1876            .await?;
1877        match res {
1878            -2 => Ok(None),
1879            -1 => Ok(Some(TimestampMillis::MAX)),
1880            _ => Ok(Some(res as TimestampMillis)),
1881        }
1882    }
1883}
1884
1885/// Iterator for list values
1886pub struct AsyncListValIter<'a, V> {
1887    name: &'a [u8],
1888    conn: RedisConnection,
1889    start: isize,
1890    limit: isize,
1891    catch_vals: Vec<Vec<u8>>,
1892    _m: std::marker::PhantomData<V>,
1893}
1894
1895impl<'a, V> AsyncListValIter<'a, V> {
1896    fn new(name: &'a [u8], conn: RedisConnection) -> Self {
1897        let start = 0;
1898        let limit = 20;
1899        Self {
1900            name,
1901            conn,
1902            start,
1903            limit,
1904            catch_vals: Vec::with_capacity((limit + 1) as usize),
1905            _m: std::marker::PhantomData,
1906        }
1907    }
1908}
1909
1910#[async_trait]
1911impl<V> AsyncIterator for AsyncListValIter<'_, V>
1912where
1913    V: DeserializeOwned + Sync + Send,
1914{
1915    type Item = Result<V>;
1916
1917    async fn next(&mut self) -> Option<Self::Item> {
1918        if let Some(val) = self.catch_vals.pop() {
1919            return Some(bincode::deserialize::<V>(val.as_ref()).map_err(|e| anyhow!(e)));
1920        }
1921
1922        let vals = self
1923            .conn
1924            .lrange::<_, Vec<Vec<u8>>>(self.name, self.start, self.start + self.limit)
1925            .await;
1926
1927        match vals {
1928            Err(e) => return Some(Err(anyhow!(e))),
1929            Ok(vals) => {
1930                if vals.is_empty() {
1931                    return None;
1932                }
1933                self.start += vals.len() as isize;
1934                self.catch_vals = vals;
1935                self.catch_vals.reverse();
1936            }
1937        }
1938
1939        self.catch_vals
1940            .pop()
1941            .map(|val| bincode::deserialize::<V>(val.as_ref()).map_err(|e| anyhow!(e)))
1942    }
1943}
1944
1945/// Iterator for map entries
1946pub struct AsyncIter<'a, V> {
1947    iter: redis::AsyncIter<'a, (Key, Vec<u8>)>,
1948    _m: std::marker::PhantomData<V>,
1949}
1950
1951#[async_trait]
1952impl<V> AsyncIterator for AsyncIter<'_, V>
1953where
1954    V: DeserializeOwned + Sync + Send,
1955{
1956    type Item = IterItem<V>;
1957
1958    async fn next(&mut self) -> Option<Self::Item> {
1959        let item = match self.iter.next_item().await {
1960            None => None,
1961            Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
1962            Some(Ok(item)) => Some(item),
1963        };
1964        item.map(|(key, v)| match bincode::deserialize::<V>(v.as_ref()) {
1965            Ok(v) => Ok((key, v)),
1966            Err(e) => Err(anyhow::Error::new(e)),
1967        })
1968    }
1969}
1970
1971/// Iterator for database keys
1972pub struct AsyncDbKeyIter<'a> {
1973    prefix_len: usize,
1974    iters: Vec<redis::AsyncIter<'a, Key>>,
1975}
1976
1977#[async_trait]
1978impl AsyncIterator for AsyncDbKeyIter<'_> {
1979    type Item = Result<Key>;
1980    async fn next(&mut self) -> Option<Self::Item> {
1981        while let Some(iter) = self.iters.last_mut() {
1982            let item = match iter.next_item().await {
1983                None => None,
1984                Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
1985                Some(Ok(key)) => Some(Ok(key[self.prefix_len..].to_vec())),
1986            };
1987
1988            if item.is_some() {
1989                return item;
1990            }
1991            self.iters.pop();
1992            if self.iters.is_empty() {
1993                return None;
1994            }
1995        }
1996        None
1997    }
1998}
1999
2000/// Iterator for map keys
2001pub struct AsyncKeyIter<'a> {
2002    iter: redis::AsyncIter<'a, (Key, ())>,
2003}
2004
2005#[async_trait]
2006impl AsyncIterator for AsyncKeyIter<'_> {
2007    type Item = Result<Key>;
2008
2009    async fn next(&mut self) -> Option<Self::Item> {
2010        match self.iter.next_item().await {
2011            None => None,
2012            Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
2013            Some(Ok((key, _))) => Some(Ok(key)),
2014        }
2015    }
2016}
2017
2018/// Iterator for maps
2019pub struct AsyncMapIter<'a> {
2020    db: RedisStorageDB,
2021    iters: Vec<redis::AsyncIter<'a, Key>>,
2022}
2023
2024#[async_trait]
2025impl AsyncIterator for AsyncMapIter<'_> {
2026    type Item = Result<StorageMap>;
2027
2028    async fn next(&mut self) -> Option<Self::Item> {
2029        while let Some(iter) = self.iters.last_mut() {
2030            let full_name = match iter.next_item().await {
2031                None => None,
2032                Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
2033                Some(Ok(key)) => Some(key),
2034            };
2035
2036            if let Some(full_name) = full_name {
2037                let name = self.db.map_full_name_to_key(full_name.as_slice()).to_vec();
2038                let m = RedisStorageMap::new(name, full_name, self.db.clone());
2039                return Some(Ok(StorageMap::RedisCluster(m)));
2040            }
2041            self.iters.pop();
2042            if self.iters.is_empty() {
2043                return None;
2044            }
2045        }
2046        None
2047    }
2048}
2049
2050/// Iterator for lists
2051pub struct AsyncListIter<'a> {
2052    db: RedisStorageDB,
2053    iters: Vec<redis::AsyncIter<'a, Key>>,
2054}
2055
2056#[async_trait]
2057impl AsyncIterator for AsyncListIter<'_> {
2058    type Item = Result<StorageList>;
2059
2060    async fn next(&mut self) -> Option<Self::Item> {
2061        while let Some(iter) = self.iters.last_mut() {
2062            let full_name = match iter.next_item().await {
2063                None => None,
2064                Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
2065                Some(Ok(key)) => Some(key),
2066            };
2067
2068            if let Some(full_name) = full_name {
2069                let name = self.db.list_full_name_to_key(full_name.as_slice()).to_vec();
2070                let l = RedisStorageList::new(name, full_name, self.db.clone());
2071                return Some(Ok(StorageList::RedisCluster(l)));
2072            }
2073            self.iters.pop();
2074            if self.iters.is_empty() {
2075                return None;
2076            }
2077        }
2078        None
2079    }
2080}
2081
/// Groups items by Redis slot
///
/// Consumes `(slot, item)` pairs and returns one `Vec` per distinct slot.
/// Groups come out in ascending slot order, and items inside a group keep
/// the relative order they had in `input`.
#[inline]
fn transform_by_slot<T>(input: Vec<(u16, T)>) -> Vec<Vec<T>> {
    input
        .into_iter()
        .fold(
            BTreeMap::<u16, Vec<T>>::new(),
            |mut buckets, (slot, item)| {
                buckets.entry(slot).or_default().push(item);
                buckets
            },
        )
        .into_values()
        .collect()
}