rmqtt_storage/
storage.rs

1//! Abstract storage layer with support for multiple backends (sled, Redis, Redis Cluster)
2//!
3//! Defines core storage traits and unified interfaces for:
4//! - Key-value storage (StorageDB)
5//! - Map structures (Map)
6//! - List structures (List)
7//!
8//! Provides backend-agnostic enums (DefaultStorageDB, StorageMap, StorageList)
9//! that dispatch operations to concrete implementations based on enabled features.
10
11use core::fmt;
12
13use async_trait::async_trait;
14use serde::de::DeserializeOwned;
15use serde::Serialize;
16
17#[cfg(feature = "redis")]
18use crate::storage_redis::{RedisStorageDB, RedisStorageList, RedisStorageMap};
19#[cfg(feature = "redis-cluster")]
20use crate::storage_redis_cluster::{
21    RedisStorageDB as RedisClusterStorageDB, RedisStorageList as RedisClusterStorageList,
22    RedisStorageMap as RedisClusterStorageMap,
23};
24#[cfg(feature = "sled")]
25use crate::storage_sled::{SledStorageDB, SledStorageList, SledStorageMap};
26use crate::Result;
27
28#[allow(unused_imports)]
29use crate::TimestampMillis;
30
31#[allow(unused)]
32pub(crate) const SEPARATOR: &[u8] = b"@";
33#[allow(unused)]
34pub(crate) const KEY_PREFIX: &[u8] = b"__rmqtt@";
35#[allow(unused)]
36pub(crate) const KEY_PREFIX_LEN: &[u8] = b"__rmqtt_len@";
37#[allow(unused)]
38pub(crate) const MAP_NAME_PREFIX: &[u8] = b"__rmqtt_map@";
39#[allow(unused)]
40pub(crate) const LIST_NAME_PREFIX: &[u8] = b"__rmqtt_list@";
41
42/// Type alias for storage keys
43pub type Key = Vec<u8>;
44
45/// Result type for iteration items (key-value pair)
46pub type IterItem<V> = Result<(Key, V)>;
47
48/// Asynchronous iterator trait for storage operations
49#[async_trait]
50pub trait AsyncIterator {
51    type Item;
52
53    /// Fetches the next item from the iterator
54    async fn next(&mut self) -> Option<Self::Item>;
55}
56
57/// Trait for splitting byte slices (used in sled backend)
58#[cfg(feature = "sled")]
59pub trait SplitSubslice {
60    /// Splits slice at the first occurrence of given subslice
61    fn split_subslice(&self, subslice: &[u8]) -> Option<(&[u8], &[u8])>;
62}
63
64#[cfg(feature = "sled")]
65impl SplitSubslice for [u8] {
66    fn split_subslice(&self, subslice: &[u8]) -> Option<(&[u8], &[u8])> {
67        self.windows(subslice.len())
68            .position(|window| window == subslice)
69            .map(|index| self.split_at(index + subslice.len()))
70    }
71}
72
73/// Core storage database operations
74#[async_trait]
75#[allow(clippy::len_without_is_empty)]
76pub trait StorageDB: Send + Sync {
77    /// Concrete Map type for this storage
78    type MapType: Map;
79
80    /// Concrete List type for this storage
81    type ListType: List;
82
83    /// Creates or accesses a named map
84    async fn map<N: AsRef<[u8]> + Sync + Send>(
85        &self,
86        name: N,
87        expire: Option<TimestampMillis>,
88    ) -> Result<Self::MapType>;
89
90    /// Removes an entire map
91    async fn map_remove<K>(&self, name: K) -> Result<()>
92    where
93        K: AsRef<[u8]> + Sync + Send;
94
95    /// Checks if a map exists
96    async fn map_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool>;
97
98    /// Creates or accesses a named list
99    async fn list<V: AsRef<[u8]> + Sync + Send>(
100        &self,
101        name: V,
102        expire: Option<TimestampMillis>,
103    ) -> Result<Self::ListType>;
104
105    /// Removes an entire list
106    async fn list_remove<K>(&self, name: K) -> Result<()>
107    where
108        K: AsRef<[u8]> + Sync + Send;
109
110    /// Checks if a list exists
111    async fn list_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool>;
112
113    /// Inserts a key-value pair
114    async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
115    where
116        K: AsRef<[u8]> + Sync + Send,
117        V: serde::ser::Serialize + Sync + Send;
118
119    /// Retrieves a value by key
120    async fn get<K, V>(&self, key: K) -> Result<Option<V>>
121    where
122        K: AsRef<[u8]> + Sync + Send,
123        V: DeserializeOwned + Sync + Send;
124
125    /// Removes a key-value pair
126    async fn remove<K>(&self, key: K) -> Result<()>
127    where
128        K: AsRef<[u8]> + Sync + Send;
129
130    /// Batch insert of multiple key-value pairs
131    async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
132    where
133        V: serde::ser::Serialize + Sync + Send;
134
135    /// Batch removal of keys
136    async fn batch_remove(&self, keys: Vec<Key>) -> Result<()>;
137
138    /// Increments a counter value
139    async fn counter_incr<K>(&self, key: K, increment: isize) -> Result<()>
140    where
141        K: AsRef<[u8]> + Sync + Send;
142
143    /// Decrements a counter value
144    async fn counter_decr<K>(&self, key: K, increment: isize) -> Result<()>
145    where
146        K: AsRef<[u8]> + Sync + Send;
147
148    /// Gets current counter value
149    async fn counter_get<K>(&self, key: K) -> Result<Option<isize>>
150    where
151        K: AsRef<[u8]> + Sync + Send;
152
153    /// Sets counter to specific value
154    async fn counter_set<K>(&self, key: K, val: isize) -> Result<()>
155    where
156        K: AsRef<[u8]> + Sync + Send;
157
158    /// Checks if key exists
159    async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool>;
160
161    /// Gets number of items in storage (requires "len" feature)
162    #[cfg(feature = "len")]
163    async fn len(&self) -> Result<usize>;
164
165    /// Gets total storage size in bytes
166    async fn db_size(&self) -> Result<usize>;
167
168    /// Sets expiration timestamp for a key (requires "ttl" feature)
169    #[cfg(feature = "ttl")]
170    async fn expire_at<K>(&self, key: K, at: TimestampMillis) -> Result<bool>
171    where
172        K: AsRef<[u8]> + Sync + Send;
173
174    /// Sets expiration duration for a key (requires "ttl" feature)
175    #[cfg(feature = "ttl")]
176    async fn expire<K>(&self, key: K, dur: TimestampMillis) -> Result<bool>
177    where
178        K: AsRef<[u8]> + Sync + Send;
179
180    /// Gets remaining time-to-live for a key (requires "ttl" feature)
181    #[cfg(feature = "ttl")]
182    async fn ttl<K>(&self, key: K) -> Result<Option<TimestampMillis>>
183    where
184        K: AsRef<[u8]> + Sync + Send;
185
186    /// Iterates over all maps in storage
187    async fn map_iter<'a>(
188        &'a mut self,
189    ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageMap>> + Send + 'a>>;
190
191    /// Iterates over all lists in storage
192    async fn list_iter<'a>(
193        &'a mut self,
194    ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>>;
195
196    /// Scans keys matching pattern (supports * and ? wildcards)
197    async fn scan<'a, P>(
198        &'a mut self,
199        pattern: P,
200    ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>
201    where
202        P: AsRef<[u8]> + Send + Sync;
203
204    /// Gets storage backend information
205    async fn info(&self) -> Result<serde_json::Value>;
206}
207
208/// Map (dictionary) storage operations
209#[async_trait]
210pub trait Map: Sync + Send {
211    /// Gets the name of this map
212    fn name(&self) -> &[u8];
213
214    /// Inserts a key-value pair into the map
215    async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
216    where
217        K: AsRef<[u8]> + Sync + Send,
218        V: serde::ser::Serialize + Sync + Send + ?Sized;
219
220    /// Retrieves a value from the map
221    async fn get<K, V>(&self, key: K) -> Result<Option<V>>
222    where
223        K: AsRef<[u8]> + Sync + Send,
224        V: DeserializeOwned + Sync + Send;
225
226    /// Removes a key from the map
227    async fn remove<K>(&self, key: K) -> Result<()>
228    where
229        K: AsRef<[u8]> + Sync + Send;
230
231    /// Checks if key exists in the map
232    async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool>;
233
234    /// Gets number of items in map (requires "map_len" feature)
235    #[cfg(feature = "map_len")]
236    async fn len(&self) -> Result<usize>;
237
238    /// Checks if map is empty
239    async fn is_empty(&self) -> Result<bool>;
240
241    /// Clears all entries in the map
242    async fn clear(&self) -> Result<()>;
243
244    /// Removes a key and returns its value
245    async fn remove_and_fetch<K, V>(&self, key: K) -> Result<Option<V>>
246    where
247        K: AsRef<[u8]> + Sync + Send,
248        V: DeserializeOwned + Sync + Send;
249
250    /// Removes all keys with given prefix
251    async fn remove_with_prefix<K>(&self, prefix: K) -> Result<()>
252    where
253        K: AsRef<[u8]> + Sync + Send;
254
255    /// Batch insert of key-value pairs
256    async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
257    where
258        V: serde::ser::Serialize + Sync + Send;
259
260    /// Batch removal of keys
261    async fn batch_remove(&self, keys: Vec<Key>) -> Result<()>;
262
263    /// Iterates over all key-value pairs
264    async fn iter<'a, V>(
265        &'a mut self,
266    ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
267    where
268        V: DeserializeOwned + Sync + Send + 'a + 'static;
269
270    /// Iterates over all keys
271    async fn key_iter<'a>(
272        &'a mut self,
273    ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>;
274
275    /// Iterates over key-value pairs with given prefix
276    async fn prefix_iter<'a, P, V>(
277        &'a mut self,
278        prefix: P,
279    ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
280    where
281        P: AsRef<[u8]> + Send + Sync,
282        V: DeserializeOwned + Sync + Send + 'a + 'static;
283
284    /// Sets expiration timestamp for the entire map (requires "ttl" feature)
285    #[cfg(feature = "ttl")]
286    async fn expire_at(&self, at: TimestampMillis) -> Result<bool>;
287
288    /// Sets expiration duration for the entire map (requires "ttl" feature)
289    #[cfg(feature = "ttl")]
290    async fn expire(&self, dur: TimestampMillis) -> Result<bool>;
291
292    /// Gets remaining time-to-live for the map (requires "ttl" feature)
293    #[cfg(feature = "ttl")]
294    async fn ttl(&self) -> Result<Option<TimestampMillis>>;
295}
296
297/// List storage operations
298#[async_trait]
299pub trait List: Sync + Send {
300    /// Gets the name of this list
301    fn name(&self) -> &[u8];
302
303    /// Appends a value to the end of the list
304    async fn push<V>(&self, val: &V) -> Result<()>
305    where
306        V: serde::ser::Serialize + Sync + Send;
307
308    /// Appends multiple values to the list
309    async fn pushs<V>(&self, vals: Vec<V>) -> Result<()>
310    where
311        V: serde::ser::Serialize + Sync + Send;
312
313    /// Pushes with size limit and optional pop-front behavior
314    async fn push_limit<V>(
315        &self,
316        val: &V,
317        limit: usize,
318        pop_front_if_limited: bool,
319    ) -> Result<Option<V>>
320    where
321        V: serde::ser::Serialize + Sync + Send,
322        V: DeserializeOwned;
323
324    /// Removes and returns the first value in the list
325    async fn pop<V>(&self) -> Result<Option<V>>
326    where
327        V: DeserializeOwned + Sync + Send;
328
329    /// Retrieves all values in the list
330    async fn all<V>(&self) -> Result<Vec<V>>
331    where
332        V: DeserializeOwned + Sync + Send;
333
334    /// Gets value by index
335    async fn get_index<V>(&self, idx: usize) -> Result<Option<V>>
336    where
337        V: DeserializeOwned + Sync + Send;
338
339    /// Gets number of items in the list
340    async fn len(&self) -> Result<usize>;
341
342    /// Checks if list is empty
343    async fn is_empty(&self) -> Result<bool>;
344
345    /// Clears all items from the list
346    async fn clear(&self) -> Result<()>;
347
348    /// Iterates over all values
349    async fn iter<'a, V>(
350        &'a mut self,
351    ) -> Result<Box<dyn AsyncIterator<Item = Result<V>> + Send + 'a>>
352    where
353        V: DeserializeOwned + Sync + Send + 'a + 'static;
354
355    /// Sets expiration timestamp for the entire list (requires "ttl" feature)
356    #[cfg(feature = "ttl")]
357    async fn expire_at(&self, at: TimestampMillis) -> Result<bool>;
358
359    /// Sets expiration duration for the entire list (requires "ttl" feature)
360    #[cfg(feature = "ttl")]
361    async fn expire(&self, dur: TimestampMillis) -> Result<bool>;
362
363    /// Gets remaining time-to-live for the list (requires "ttl" feature)
364    #[cfg(feature = "ttl")]
365    async fn ttl(&self) -> Result<Option<TimestampMillis>>;
366}
367
368/// Unified storage backend enum (dispatches to concrete implementations)
369#[derive(Clone)]
370pub enum DefaultStorageDB {
371    #[cfg(feature = "sled")]
372    /// Sled database backend
373    Sled(SledStorageDB),
374    #[cfg(feature = "redis")]
375    /// Redis backend
376    Redis(RedisStorageDB),
377    #[cfg(feature = "redis-cluster")]
378    /// Redis Cluster backend
379    RedisCluster(RedisClusterStorageDB),
380}
381
382impl DefaultStorageDB {
383    /// Accesses a named map
384    #[inline]
385    pub async fn map<V: AsRef<[u8]> + Sync + Send>(
386        &self,
387        name: V,
388        expire: Option<TimestampMillis>,
389    ) -> Result<StorageMap> {
390        Ok(match self {
391            #[cfg(feature = "sled")]
392            DefaultStorageDB::Sled(db) => StorageMap::Sled(db.map(name, expire).await?),
393            #[cfg(feature = "redis")]
394            DefaultStorageDB::Redis(db) => StorageMap::Redis(db.map(name, expire).await?),
395            #[cfg(feature = "redis-cluster")]
396            DefaultStorageDB::RedisCluster(db) => {
397                StorageMap::RedisCluster(db.map(name, expire).await?)
398            }
399        })
400    }
401
402    /// Removes a named map
403    #[inline]
404    pub async fn map_remove<K>(&self, name: K) -> Result<()>
405    where
406        K: AsRef<[u8]> + Sync + Send,
407    {
408        match self {
409            #[cfg(feature = "sled")]
410            DefaultStorageDB::Sled(db) => db.map_remove(name).await,
411            #[cfg(feature = "redis")]
412            DefaultStorageDB::Redis(db) => db.map_remove(name).await,
413            #[cfg(feature = "redis-cluster")]
414            DefaultStorageDB::RedisCluster(db) => db.map_remove(name).await,
415        }
416    }
417
418    /// Checks if map exists
419    #[inline]
420    pub async fn map_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
421        match self {
422            #[cfg(feature = "sled")]
423            DefaultStorageDB::Sled(db) => db.map_contains_key(key).await,
424            #[cfg(feature = "redis")]
425            DefaultStorageDB::Redis(db) => db.map_contains_key(key).await,
426            #[cfg(feature = "redis-cluster")]
427            DefaultStorageDB::RedisCluster(db) => db.map_contains_key(key).await,
428        }
429    }
430
431    /// Accesses a named list
432    #[inline]
433    pub async fn list<V: AsRef<[u8]> + Sync + Send>(
434        &self,
435        name: V,
436        expire: Option<TimestampMillis>,
437    ) -> Result<StorageList> {
438        Ok(match self {
439            #[cfg(feature = "sled")]
440            DefaultStorageDB::Sled(db) => StorageList::Sled(db.list(name, expire).await?),
441            #[cfg(feature = "redis")]
442            DefaultStorageDB::Redis(db) => StorageList::Redis(db.list(name, expire).await?),
443            #[cfg(feature = "redis-cluster")]
444            DefaultStorageDB::RedisCluster(db) => {
445                StorageList::RedisCluster(db.list(name, expire).await?)
446            }
447        })
448    }
449
450    /// Removes a named list
451    #[inline]
452    pub async fn list_remove<K>(&self, name: K) -> Result<()>
453    where
454        K: AsRef<[u8]> + Sync + Send,
455    {
456        match self {
457            #[cfg(feature = "sled")]
458            DefaultStorageDB::Sled(db) => db.list_remove(name).await,
459            #[cfg(feature = "redis")]
460            DefaultStorageDB::Redis(db) => db.list_remove(name).await,
461            #[cfg(feature = "redis-cluster")]
462            DefaultStorageDB::RedisCluster(db) => db.list_remove(name).await,
463        }
464    }
465
466    /// Checks if list exists
467    #[inline]
468    pub async fn list_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
469        match self {
470            #[cfg(feature = "sled")]
471            DefaultStorageDB::Sled(db) => db.list_contains_key(key).await,
472            #[cfg(feature = "redis")]
473            DefaultStorageDB::Redis(db) => db.list_contains_key(key).await,
474            #[cfg(feature = "redis-cluster")]
475            DefaultStorageDB::RedisCluster(db) => db.list_contains_key(key).await,
476        }
477    }
478
479    /// Inserts a key-value pair
480    #[inline]
481    pub async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
482    where
483        K: AsRef<[u8]> + Sync + Send,
484        V: Serialize + Sync + Send,
485    {
486        match self {
487            #[cfg(feature = "sled")]
488            DefaultStorageDB::Sled(db) => db.insert(key, val).await,
489            #[cfg(feature = "redis")]
490            DefaultStorageDB::Redis(db) => db.insert(key, val).await,
491            #[cfg(feature = "redis-cluster")]
492            DefaultStorageDB::RedisCluster(db) => db.insert(key, val).await,
493        }
494    }
495
496    /// Retrieves a value by key
497    #[inline]
498    pub async fn get<K, V>(&self, key: K) -> Result<Option<V>>
499    where
500        K: AsRef<[u8]> + Sync + Send,
501        V: DeserializeOwned + Sync + Send,
502    {
503        match self {
504            #[cfg(feature = "sled")]
505            DefaultStorageDB::Sled(db) => db.get(key).await,
506            #[cfg(feature = "redis")]
507            DefaultStorageDB::Redis(db) => db.get(key).await,
508            #[cfg(feature = "redis-cluster")]
509            DefaultStorageDB::RedisCluster(db) => db.get(key).await,
510        }
511    }
512
513    /// Removes a key-value pair
514    #[inline]
515    pub async fn remove<K>(&self, key: K) -> Result<()>
516    where
517        K: AsRef<[u8]> + Sync + Send,
518    {
519        match self {
520            #[cfg(feature = "sled")]
521            DefaultStorageDB::Sled(db) => db.remove(key).await,
522            #[cfg(feature = "redis")]
523            DefaultStorageDB::Redis(db) => db.remove(key).await,
524            #[cfg(feature = "redis-cluster")]
525            DefaultStorageDB::RedisCluster(db) => db.remove(key).await,
526        }
527    }
528
529    /// Batch insert of key-value pairs
530    #[inline]
531    pub async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
532    where
533        V: serde::ser::Serialize + Sync + Send,
534    {
535        match self {
536            #[cfg(feature = "sled")]
537            DefaultStorageDB::Sled(db) => db.batch_insert(key_vals).await,
538            #[cfg(feature = "redis")]
539            DefaultStorageDB::Redis(db) => db.batch_insert(key_vals).await,
540            #[cfg(feature = "redis-cluster")]
541            DefaultStorageDB::RedisCluster(db) => db.batch_insert(key_vals).await,
542        }
543    }
544
545    /// Batch removal of keys
546    #[inline]
547    pub async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
548        match self {
549            #[cfg(feature = "sled")]
550            DefaultStorageDB::Sled(db) => db.batch_remove(keys).await,
551            #[cfg(feature = "redis")]
552            DefaultStorageDB::Redis(db) => db.batch_remove(keys).await,
553            #[cfg(feature = "redis-cluster")]
554            DefaultStorageDB::RedisCluster(db) => db.batch_remove(keys).await,
555        }
556    }
557
558    /// Increments a counter
559    #[inline]
560    pub async fn counter_incr<K>(&self, key: K, increment: isize) -> Result<()>
561    where
562        K: AsRef<[u8]> + Sync + Send,
563    {
564        match self {
565            #[cfg(feature = "sled")]
566            DefaultStorageDB::Sled(db) => db.counter_incr(key, increment).await,
567            #[cfg(feature = "redis")]
568            DefaultStorageDB::Redis(db) => db.counter_incr(key, increment).await,
569            #[cfg(feature = "redis-cluster")]
570            DefaultStorageDB::RedisCluster(db) => db.counter_incr(key, increment).await,
571        }
572    }
573
574    /// Decrements a counter
575    #[inline]
576    pub async fn counter_decr<K>(&self, key: K, decrement: isize) -> Result<()>
577    where
578        K: AsRef<[u8]> + Sync + Send,
579    {
580        match self {
581            #[cfg(feature = "sled")]
582            DefaultStorageDB::Sled(db) => db.counter_decr(key, decrement).await,
583            #[cfg(feature = "redis")]
584            DefaultStorageDB::Redis(db) => db.counter_decr(key, decrement).await,
585            #[cfg(feature = "redis-cluster")]
586            DefaultStorageDB::RedisCluster(db) => db.counter_decr(key, decrement).await,
587        }
588    }
589
590    /// Gets counter value
591    #[inline]
592    pub async fn counter_get<K>(&self, key: K) -> Result<Option<isize>>
593    where
594        K: AsRef<[u8]> + Sync + Send,
595    {
596        match self {
597            #[cfg(feature = "sled")]
598            DefaultStorageDB::Sled(db) => db.counter_get(key).await,
599            #[cfg(feature = "redis")]
600            DefaultStorageDB::Redis(db) => db.counter_get(key).await,
601            #[cfg(feature = "redis-cluster")]
602            DefaultStorageDB::RedisCluster(db) => db.counter_get(key).await,
603        }
604    }
605
606    /// Sets counter value
607    #[inline]
608    pub async fn counter_set<K>(&self, key: K, val: isize) -> Result<()>
609    where
610        K: AsRef<[u8]> + Sync + Send,
611    {
612        match self {
613            #[cfg(feature = "sled")]
614            DefaultStorageDB::Sled(db) => db.counter_set(key, val).await,
615            #[cfg(feature = "redis")]
616            DefaultStorageDB::Redis(db) => db.counter_set(key, val).await,
617            #[cfg(feature = "redis-cluster")]
618            DefaultStorageDB::RedisCluster(db) => db.counter_set(key, val).await,
619        }
620    }
621
622    /// Gets number of items (requires "len" feature)
623    #[inline]
624    #[cfg(feature = "len")]
625    pub async fn len(&self) -> Result<usize> {
626        match self {
627            #[cfg(feature = "sled")]
628            DefaultStorageDB::Sled(db) => db.len().await,
629            #[cfg(feature = "redis")]
630            DefaultStorageDB::Redis(db) => db.len().await,
631            #[cfg(feature = "redis-cluster")]
632            DefaultStorageDB::RedisCluster(db) => db.len().await,
633        }
634    }
635
636    /// Gets total storage size in bytes
637    #[inline]
638    pub async fn db_size(&self) -> Result<usize> {
639        match self {
640            #[cfg(feature = "sled")]
641            DefaultStorageDB::Sled(db) => db.db_size().await,
642            #[cfg(feature = "redis")]
643            DefaultStorageDB::Redis(db) => db.db_size().await,
644            #[cfg(feature = "redis-cluster")]
645            DefaultStorageDB::RedisCluster(db) => db.db_size().await,
646        }
647    }
648
649    /// Checks if key exists
650    #[inline]
651    pub async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
652        match self {
653            #[cfg(feature = "sled")]
654            DefaultStorageDB::Sled(db) => db.contains_key(key).await,
655            #[cfg(feature = "redis")]
656            DefaultStorageDB::Redis(db) => db.contains_key(key).await,
657            #[cfg(feature = "redis-cluster")]
658            DefaultStorageDB::RedisCluster(db) => db.contains_key(key).await,
659        }
660    }
661
662    /// Sets expiration timestamp (requires "ttl" feature)
663    #[inline]
664    #[cfg(feature = "ttl")]
665    pub async fn expire_at<K>(&self, key: K, at: TimestampMillis) -> Result<bool>
666    where
667        K: AsRef<[u8]> + Sync + Send,
668    {
669        match self {
670            #[cfg(feature = "sled")]
671            DefaultStorageDB::Sled(db) => db.expire_at(key, at).await,
672            #[cfg(feature = "redis")]
673            DefaultStorageDB::Redis(db) => db.expire_at(key, at).await,
674            #[cfg(feature = "redis-cluster")]
675            DefaultStorageDB::RedisCluster(db) => db.expire_at(key, at).await,
676        }
677    }
678
679    /// Sets expiration duration (requires "ttl" feature)
680    #[inline]
681    #[cfg(feature = "ttl")]
682    pub async fn expire<K>(&self, key: K, dur: TimestampMillis) -> Result<bool>
683    where
684        K: AsRef<[u8]> + Sync + Send,
685    {
686        match self {
687            #[cfg(feature = "sled")]
688            DefaultStorageDB::Sled(db) => db.expire(key, dur).await,
689            #[cfg(feature = "redis")]
690            DefaultStorageDB::Redis(db) => db.expire(key, dur).await,
691            #[cfg(feature = "redis-cluster")]
692            DefaultStorageDB::RedisCluster(db) => db.expire(key, dur).await,
693        }
694    }
695
696    /// Gets time-to-live (requires "ttl" feature)
697    #[inline]
698    #[cfg(feature = "ttl")]
699    pub async fn ttl<K>(&self, key: K) -> Result<Option<TimestampMillis>>
700    where
701        K: AsRef<[u8]> + Sync + Send,
702    {
703        match self {
704            #[cfg(feature = "sled")]
705            DefaultStorageDB::Sled(db) => db.ttl(key).await,
706            #[cfg(feature = "redis")]
707            DefaultStorageDB::Redis(db) => db.ttl(key).await,
708            #[cfg(feature = "redis-cluster")]
709            DefaultStorageDB::RedisCluster(db) => db.ttl(key).await,
710        }
711    }
712
713    /// Iterates over maps
714    #[inline]
715    pub async fn map_iter<'a>(
716        &'a mut self,
717    ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageMap>> + Send + 'a>> {
718        match self {
719            #[cfg(feature = "sled")]
720            DefaultStorageDB::Sled(db) => db.map_iter().await,
721            #[cfg(feature = "redis")]
722            DefaultStorageDB::Redis(db) => db.map_iter().await,
723            #[cfg(feature = "redis-cluster")]
724            DefaultStorageDB::RedisCluster(db) => db.map_iter().await,
725        }
726    }
727
728    /// Iterates over lists
729    #[inline]
730    pub async fn list_iter<'a>(
731        &'a mut self,
732    ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>> {
733        match self {
734            #[cfg(feature = "sled")]
735            DefaultStorageDB::Sled(db) => db.list_iter().await,
736            #[cfg(feature = "redis")]
737            DefaultStorageDB::Redis(db) => db.list_iter().await,
738            #[cfg(feature = "redis-cluster")]
739            DefaultStorageDB::RedisCluster(db) => db.list_iter().await,
740        }
741    }
742
743    /// Scans keys matching pattern
744    #[inline]
745    pub async fn scan<'a, P>(
746        &'a mut self,
747        pattern: P,
748    ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>
749    where
750        P: AsRef<[u8]> + Send + Sync,
751    {
752        match self {
753            #[cfg(feature = "sled")]
754            DefaultStorageDB::Sled(db) => db.scan(pattern).await,
755            #[cfg(feature = "redis")]
756            DefaultStorageDB::Redis(db) => db.scan(pattern).await,
757            #[cfg(feature = "redis-cluster")]
758            DefaultStorageDB::RedisCluster(db) => db.scan(pattern).await,
759        }
760    }
761
762    /// Gets storage information
763    #[inline]
764    pub async fn info(&self) -> Result<serde_json::Value> {
765        match self {
766            #[cfg(feature = "sled")]
767            DefaultStorageDB::Sled(db) => db.info().await,
768            #[cfg(feature = "redis")]
769            DefaultStorageDB::Redis(db) => db.info().await,
770            #[cfg(feature = "redis-cluster")]
771            DefaultStorageDB::RedisCluster(db) => db.info().await,
772        }
773    }
774}
775
776/// Unified map implementation enum
777#[derive(Clone)]
778pub enum StorageMap {
779    #[cfg(feature = "sled")]
780    /// Sled map implementation
781    Sled(SledStorageMap),
782    #[cfg(feature = "redis")]
783    /// Redis map implementation
784    Redis(RedisStorageMap),
785    #[cfg(feature = "redis-cluster")]
786    /// Redis Cluster map implementation
787    RedisCluster(RedisClusterStorageMap),
788}
789
790#[async_trait]
791impl Map for StorageMap {
792    fn name(&self) -> &[u8] {
793        match self {
794            #[cfg(feature = "sled")]
795            StorageMap::Sled(m) => m.name(),
796            #[cfg(feature = "redis")]
797            StorageMap::Redis(m) => m.name(),
798            #[cfg(feature = "redis-cluster")]
799            StorageMap::RedisCluster(m) => m.name(),
800        }
801    }
802
803    async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
804    where
805        K: AsRef<[u8]> + Sync + Send,
806        V: Serialize + Sync + Send + ?Sized,
807    {
808        match self {
809            #[cfg(feature = "sled")]
810            StorageMap::Sled(m) => m.insert(key, val).await,
811            #[cfg(feature = "redis")]
812            StorageMap::Redis(m) => m.insert(key, val).await,
813            #[cfg(feature = "redis-cluster")]
814            StorageMap::RedisCluster(m) => m.insert(key, val).await,
815        }
816    }
817
818    async fn get<K, V>(&self, key: K) -> Result<Option<V>>
819    where
820        K: AsRef<[u8]> + Sync + Send,
821        V: DeserializeOwned + Sync + Send,
822    {
823        match self {
824            #[cfg(feature = "sled")]
825            StorageMap::Sled(m) => m.get(key).await,
826            #[cfg(feature = "redis")]
827            StorageMap::Redis(m) => m.get(key).await,
828            #[cfg(feature = "redis-cluster")]
829            StorageMap::RedisCluster(m) => m.get(key).await,
830        }
831    }
832
833    async fn remove<K>(&self, key: K) -> Result<()>
834    where
835        K: AsRef<[u8]> + Sync + Send,
836    {
837        match self {
838            #[cfg(feature = "sled")]
839            StorageMap::Sled(m) => m.remove(key).await,
840            #[cfg(feature = "redis")]
841            StorageMap::Redis(m) => m.remove(key).await,
842            #[cfg(feature = "redis-cluster")]
843            StorageMap::RedisCluster(m) => m.remove(key).await,
844        }
845    }
846
847    async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
848        match self {
849            #[cfg(feature = "sled")]
850            StorageMap::Sled(m) => m.contains_key(key).await,
851            #[cfg(feature = "redis")]
852            StorageMap::Redis(m) => m.contains_key(key).await,
853            #[cfg(feature = "redis-cluster")]
854            StorageMap::RedisCluster(m) => m.contains_key(key).await,
855        }
856    }
857
858    #[cfg(feature = "map_len")]
859    async fn len(&self) -> Result<usize> {
860        match self {
861            #[cfg(feature = "sled")]
862            StorageMap::Sled(m) => m.len().await,
863            #[cfg(feature = "redis")]
864            StorageMap::Redis(m) => m.len().await,
865            #[cfg(feature = "redis-cluster")]
866            StorageMap::RedisCluster(m) => m.len().await,
867        }
868    }
869
870    async fn is_empty(&self) -> Result<bool> {
871        match self {
872            #[cfg(feature = "sled")]
873            StorageMap::Sled(m) => m.is_empty().await,
874            #[cfg(feature = "redis")]
875            StorageMap::Redis(m) => m.is_empty().await,
876            #[cfg(feature = "redis-cluster")]
877            StorageMap::RedisCluster(m) => m.is_empty().await,
878        }
879    }
880
881    async fn clear(&self) -> Result<()> {
882        match self {
883            #[cfg(feature = "sled")]
884            StorageMap::Sled(m) => m.clear().await,
885            #[cfg(feature = "redis")]
886            StorageMap::Redis(m) => m.clear().await,
887            #[cfg(feature = "redis-cluster")]
888            StorageMap::RedisCluster(m) => m.clear().await,
889        }
890    }
891
892    async fn remove_and_fetch<K, V>(&self, key: K) -> Result<Option<V>>
893    where
894        K: AsRef<[u8]> + Sync + Send,
895        V: DeserializeOwned + Sync + Send,
896    {
897        match self {
898            #[cfg(feature = "sled")]
899            StorageMap::Sled(m) => m.remove_and_fetch(key).await,
900            #[cfg(feature = "redis")]
901            StorageMap::Redis(m) => m.remove_and_fetch(key).await,
902            #[cfg(feature = "redis-cluster")]
903            StorageMap::RedisCluster(m) => m.remove_and_fetch(key).await,
904        }
905    }
906
907    async fn remove_with_prefix<K>(&self, prefix: K) -> Result<()>
908    where
909        K: AsRef<[u8]> + Sync + Send,
910    {
911        match self {
912            #[cfg(feature = "sled")]
913            StorageMap::Sled(m) => m.remove_with_prefix(prefix).await,
914            #[cfg(feature = "redis")]
915            StorageMap::Redis(m) => m.remove_with_prefix(prefix).await,
916            #[cfg(feature = "redis-cluster")]
917            StorageMap::RedisCluster(m) => m.remove_with_prefix(prefix).await,
918        }
919    }
920
921    async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
922    where
923        V: Serialize + Sync + Send,
924    {
925        match self {
926            #[cfg(feature = "sled")]
927            StorageMap::Sled(m) => m.batch_insert(key_vals).await,
928            #[cfg(feature = "redis")]
929            StorageMap::Redis(m) => m.batch_insert(key_vals).await,
930            #[cfg(feature = "redis-cluster")]
931            StorageMap::RedisCluster(m) => m.batch_insert(key_vals).await,
932        }
933    }
934
935    async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
936        match self {
937            #[cfg(feature = "sled")]
938            StorageMap::Sled(m) => m.batch_remove(keys).await,
939            #[cfg(feature = "redis")]
940            StorageMap::Redis(m) => m.batch_remove(keys).await,
941            #[cfg(feature = "redis-cluster")]
942            StorageMap::RedisCluster(m) => m.batch_remove(keys).await,
943        }
944    }
945
946    async fn iter<'a, V>(
947        &'a mut self,
948    ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
949    where
950        V: DeserializeOwned + Sync + Send + 'a + 'static,
951    {
952        match self {
953            #[cfg(feature = "sled")]
954            StorageMap::Sled(m) => m.iter().await,
955            #[cfg(feature = "redis")]
956            StorageMap::Redis(m) => m.iter().await,
957            #[cfg(feature = "redis-cluster")]
958            StorageMap::RedisCluster(m) => m.iter().await,
959        }
960    }
961
962    async fn key_iter<'a>(
963        &'a mut self,
964    ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>> {
965        match self {
966            #[cfg(feature = "sled")]
967            StorageMap::Sled(m) => m.key_iter().await,
968            #[cfg(feature = "redis")]
969            StorageMap::Redis(m) => m.key_iter().await,
970            #[cfg(feature = "redis-cluster")]
971            StorageMap::RedisCluster(m) => m.key_iter().await,
972        }
973    }
974
975    async fn prefix_iter<'a, P, V>(
976        &'a mut self,
977        prefix: P,
978    ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
979    where
980        P: AsRef<[u8]> + Send + Sync,
981        V: DeserializeOwned + Sync + Send + 'a + 'static,
982    {
983        match self {
984            #[cfg(feature = "sled")]
985            StorageMap::Sled(m) => m.prefix_iter(prefix).await,
986            #[cfg(feature = "redis")]
987            StorageMap::Redis(m) => m.prefix_iter(prefix).await,
988            #[cfg(feature = "redis-cluster")]
989            StorageMap::RedisCluster(m) => m.prefix_iter(prefix).await,
990        }
991    }
992
993    #[cfg(feature = "ttl")]
994    async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
995        match self {
996            #[cfg(feature = "sled")]
997            StorageMap::Sled(m) => m.expire_at(at).await,
998            #[cfg(feature = "redis")]
999            StorageMap::Redis(m) => m.expire_at(at).await,
1000            #[cfg(feature = "redis-cluster")]
1001            StorageMap::RedisCluster(m) => m.expire_at(at).await,
1002        }
1003    }
1004
1005    #[cfg(feature = "ttl")]
1006    async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1007        match self {
1008            #[cfg(feature = "sled")]
1009            StorageMap::Sled(m) => m.expire(dur).await,
1010            #[cfg(feature = "redis")]
1011            StorageMap::Redis(m) => m.expire(dur).await,
1012            #[cfg(feature = "redis-cluster")]
1013            StorageMap::RedisCluster(m) => m.expire(dur).await,
1014        }
1015    }
1016
1017    #[cfg(feature = "ttl")]
1018    async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1019        match self {
1020            #[cfg(feature = "sled")]
1021            StorageMap::Sled(m) => m.ttl().await,
1022            #[cfg(feature = "redis")]
1023            StorageMap::Redis(m) => m.ttl().await,
1024            #[cfg(feature = "redis-cluster")]
1025            StorageMap::RedisCluster(m) => m.ttl().await,
1026        }
1027    }
1028}
1029
1030/// Unified list implementation enum
1031#[derive(Clone)]
1032pub enum StorageList {
1033    #[cfg(feature = "sled")]
1034    /// Sled list implementation
1035    Sled(SledStorageList),
1036    #[cfg(feature = "redis")]
1037    /// Redis list implementation
1038    Redis(RedisStorageList),
1039    #[cfg(feature = "redis-cluster")]
1040    /// Redis Cluster list implementation
1041    RedisCluster(RedisClusterStorageList),
1042}
1043
1044impl fmt::Debug for StorageList {
1045    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1046        let name = match self {
1047            #[cfg(feature = "sled")]
1048            StorageList::Sled(list) => list.name(),
1049            #[cfg(feature = "redis")]
1050            StorageList::Redis(list) => list.name(),
1051            #[cfg(feature = "redis-cluster")]
1052            StorageList::RedisCluster(list) => list.name(),
1053        };
1054
1055        f.debug_tuple(&format!("StorageList({:?})", String::from_utf8_lossy(name)))
1056            .finish()
1057    }
1058}
1059
1060#[async_trait]
1061impl List for StorageList {
1062    fn name(&self) -> &[u8] {
1063        match self {
1064            #[cfg(feature = "sled")]
1065            StorageList::Sled(m) => m.name(),
1066            #[cfg(feature = "redis")]
1067            StorageList::Redis(m) => m.name(),
1068            #[cfg(feature = "redis-cluster")]
1069            StorageList::RedisCluster(m) => m.name(),
1070        }
1071    }
1072
1073    async fn push<V>(&self, val: &V) -> Result<()>
1074    where
1075        V: Serialize + Sync + Send,
1076    {
1077        match self {
1078            #[cfg(feature = "sled")]
1079            StorageList::Sled(list) => list.push(val).await,
1080            #[cfg(feature = "redis")]
1081            StorageList::Redis(list) => list.push(val).await,
1082            #[cfg(feature = "redis-cluster")]
1083            StorageList::RedisCluster(list) => list.push(val).await,
1084        }
1085    }
1086
1087    async fn pushs<V>(&self, vals: Vec<V>) -> Result<()>
1088    where
1089        V: serde::ser::Serialize + Sync + Send,
1090    {
1091        match self {
1092            #[cfg(feature = "sled")]
1093            StorageList::Sled(list) => list.pushs(vals).await,
1094            #[cfg(feature = "redis")]
1095            StorageList::Redis(list) => list.pushs(vals).await,
1096            #[cfg(feature = "redis-cluster")]
1097            StorageList::RedisCluster(list) => list.pushs(vals).await,
1098        }
1099    }
1100
1101    async fn push_limit<V>(
1102        &self,
1103        val: &V,
1104        limit: usize,
1105        pop_front_if_limited: bool,
1106    ) -> Result<Option<V>>
1107    where
1108        V: Serialize + Sync + Send,
1109        V: DeserializeOwned,
1110    {
1111        match self {
1112            #[cfg(feature = "sled")]
1113            StorageList::Sled(list) => list.push_limit(val, limit, pop_front_if_limited).await,
1114            #[cfg(feature = "redis")]
1115            StorageList::Redis(list) => list.push_limit(val, limit, pop_front_if_limited).await,
1116            #[cfg(feature = "redis-cluster")]
1117            StorageList::RedisCluster(list) => {
1118                list.push_limit(val, limit, pop_front_if_limited).await
1119            }
1120        }
1121    }
1122
1123    async fn pop<V>(&self) -> Result<Option<V>>
1124    where
1125        V: DeserializeOwned + Sync + Send,
1126    {
1127        match self {
1128            #[cfg(feature = "sled")]
1129            StorageList::Sled(list) => list.pop().await,
1130            #[cfg(feature = "redis")]
1131            StorageList::Redis(list) => list.pop().await,
1132            #[cfg(feature = "redis-cluster")]
1133            StorageList::RedisCluster(list) => list.pop().await,
1134        }
1135    }
1136
1137    async fn all<V>(&self) -> Result<Vec<V>>
1138    where
1139        V: DeserializeOwned + Sync + Send,
1140    {
1141        match self {
1142            #[cfg(feature = "sled")]
1143            StorageList::Sled(list) => list.all().await,
1144            #[cfg(feature = "redis")]
1145            StorageList::Redis(list) => list.all().await,
1146            #[cfg(feature = "redis-cluster")]
1147            StorageList::RedisCluster(list) => list.all().await,
1148        }
1149    }
1150
1151    async fn get_index<V>(&self, idx: usize) -> Result<Option<V>>
1152    where
1153        V: DeserializeOwned + Sync + Send,
1154    {
1155        match self {
1156            #[cfg(feature = "sled")]
1157            StorageList::Sled(list) => list.get_index(idx).await,
1158            #[cfg(feature = "redis")]
1159            StorageList::Redis(list) => list.get_index(idx).await,
1160            #[cfg(feature = "redis-cluster")]
1161            StorageList::RedisCluster(list) => list.get_index(idx).await,
1162        }
1163    }
1164
1165    async fn len(&self) -> Result<usize> {
1166        match self {
1167            #[cfg(feature = "sled")]
1168            StorageList::Sled(list) => list.len().await,
1169            #[cfg(feature = "redis")]
1170            StorageList::Redis(list) => list.len().await,
1171            #[cfg(feature = "redis-cluster")]
1172            StorageList::RedisCluster(list) => list.len().await,
1173        }
1174    }
1175
1176    async fn is_empty(&self) -> Result<bool> {
1177        match self {
1178            #[cfg(feature = "sled")]
1179            StorageList::Sled(list) => list.is_empty().await,
1180            #[cfg(feature = "redis")]
1181            StorageList::Redis(list) => list.is_empty().await,
1182            #[cfg(feature = "redis-cluster")]
1183            StorageList::RedisCluster(list) => list.is_empty().await,
1184        }
1185    }
1186
1187    async fn clear(&self) -> Result<()> {
1188        match self {
1189            #[cfg(feature = "sled")]
1190            StorageList::Sled(list) => list.clear().await,
1191            #[cfg(feature = "redis")]
1192            StorageList::Redis(list) => list.clear().await,
1193            #[cfg(feature = "redis-cluster")]
1194            StorageList::RedisCluster(list) => list.clear().await,
1195        }
1196    }
1197
1198    async fn iter<'a, V>(
1199        &'a mut self,
1200    ) -> Result<Box<dyn AsyncIterator<Item = Result<V>> + Send + 'a>>
1201    where
1202        V: DeserializeOwned + Sync + Send + 'a + 'static,
1203    {
1204        match self {
1205            #[cfg(feature = "sled")]
1206            StorageList::Sled(list) => list.iter().await,
1207            #[cfg(feature = "redis")]
1208            StorageList::Redis(list) => list.iter().await,
1209            #[cfg(feature = "redis-cluster")]
1210            StorageList::RedisCluster(list) => list.iter().await,
1211        }
1212    }
1213
1214    #[cfg(feature = "ttl")]
1215    async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
1216        match self {
1217            #[cfg(feature = "sled")]
1218            StorageList::Sled(l) => l.expire_at(at).await,
1219            #[cfg(feature = "redis")]
1220            StorageList::Redis(l) => l.expire_at(at).await,
1221            #[cfg(feature = "redis-cluster")]
1222            StorageList::RedisCluster(l) => l.expire_at(at).await,
1223        }
1224    }
1225
1226    #[cfg(feature = "ttl")]
1227    async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1228        match self {
1229            #[cfg(feature = "sled")]
1230            StorageList::Sled(l) => l.expire(dur).await,
1231            #[cfg(feature = "redis")]
1232            StorageList::Redis(l) => l.expire(dur).await,
1233            #[cfg(feature = "redis-cluster")]
1234            StorageList::RedisCluster(l) => l.expire(dur).await,
1235        }
1236    }
1237
1238    #[cfg(feature = "ttl")]
1239    async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1240        match self {
1241            #[cfg(feature = "sled")]
1242            StorageList::Sled(l) => l.ttl().await,
1243            #[cfg(feature = "redis")]
1244            StorageList::Redis(l) => l.ttl().await,
1245            #[cfg(feature = "redis-cluster")]
1246            StorageList::RedisCluster(l) => l.ttl().await,
1247        }
1248    }
1249}