1use core::fmt;
12
13use async_trait::async_trait;
14use serde::de::DeserializeOwned;
15use serde::Serialize;
16
17#[cfg(feature = "redis")]
18use crate::storage_redis::{RedisStorageDB, RedisStorageList, RedisStorageMap};
19#[cfg(feature = "redis-cluster")]
20use crate::storage_redis_cluster::{
21 RedisStorageDB as RedisClusterStorageDB, RedisStorageList as RedisClusterStorageList,
22 RedisStorageMap as RedisClusterStorageMap,
23};
24#[cfg(feature = "sled")]
25use crate::storage_sled::{SledStorageDB, SledStorageList, SledStorageMap};
26use crate::Result;
27
28#[allow(unused_imports)]
29use crate::TimestampMillis;
30
31#[allow(unused)]
32pub(crate) const SEPARATOR: &[u8] = b"@";
33#[allow(unused)]
34pub(crate) const KEY_PREFIX: &[u8] = b"__rmqtt@";
35#[allow(unused)]
36pub(crate) const KEY_PREFIX_LEN: &[u8] = b"__rmqtt_len@";
37#[allow(unused)]
38pub(crate) const MAP_NAME_PREFIX: &[u8] = b"__rmqtt_map@";
39#[allow(unused)]
40pub(crate) const LIST_NAME_PREFIX: &[u8] = b"__rmqtt_list@";
41
42pub type Key = Vec<u8>;
44
45pub type IterItem<V> = Result<(Key, V)>;
47
48#[async_trait]
50pub trait AsyncIterator {
51 type Item;
52
53 async fn next(&mut self) -> Option<Self::Item>;
55}
56
57#[cfg(feature = "sled")]
59pub trait SplitSubslice {
60 fn split_subslice(&self, subslice: &[u8]) -> Option<(&[u8], &[u8])>;
62}
63
64#[cfg(feature = "sled")]
65impl SplitSubslice for [u8] {
66 fn split_subslice(&self, subslice: &[u8]) -> Option<(&[u8], &[u8])> {
67 self.windows(subslice.len())
68 .position(|window| window == subslice)
69 .map(|index| self.split_at(index + subslice.len()))
70 }
71}
72
73#[async_trait]
75#[allow(clippy::len_without_is_empty)]
76pub trait StorageDB: Send + Sync {
77 type MapType: Map;
79
80 type ListType: List;
82
83 async fn map<N: AsRef<[u8]> + Sync + Send>(
85 &self,
86 name: N,
87 expire: Option<TimestampMillis>,
88 ) -> Result<Self::MapType>;
89
90 async fn map_remove<K>(&self, name: K) -> Result<()>
92 where
93 K: AsRef<[u8]> + Sync + Send;
94
95 async fn map_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool>;
97
98 async fn list<V: AsRef<[u8]> + Sync + Send>(
100 &self,
101 name: V,
102 expire: Option<TimestampMillis>,
103 ) -> Result<Self::ListType>;
104
105 async fn list_remove<K>(&self, name: K) -> Result<()>
107 where
108 K: AsRef<[u8]> + Sync + Send;
109
110 async fn list_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool>;
112
113 async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
115 where
116 K: AsRef<[u8]> + Sync + Send,
117 V: serde::ser::Serialize + Sync + Send;
118
119 async fn get<K, V>(&self, key: K) -> Result<Option<V>>
121 where
122 K: AsRef<[u8]> + Sync + Send,
123 V: DeserializeOwned + Sync + Send;
124
125 async fn remove<K>(&self, key: K) -> Result<()>
127 where
128 K: AsRef<[u8]> + Sync + Send;
129
130 async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
132 where
133 V: serde::ser::Serialize + Sync + Send;
134
135 async fn batch_remove(&self, keys: Vec<Key>) -> Result<()>;
137
138 async fn counter_incr<K>(&self, key: K, increment: isize) -> Result<()>
140 where
141 K: AsRef<[u8]> + Sync + Send;
142
143 async fn counter_decr<K>(&self, key: K, increment: isize) -> Result<()>
145 where
146 K: AsRef<[u8]> + Sync + Send;
147
148 async fn counter_get<K>(&self, key: K) -> Result<Option<isize>>
150 where
151 K: AsRef<[u8]> + Sync + Send;
152
153 async fn counter_set<K>(&self, key: K, val: isize) -> Result<()>
155 where
156 K: AsRef<[u8]> + Sync + Send;
157
158 async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool>;
160
161 #[cfg(feature = "len")]
163 async fn len(&self) -> Result<usize>;
164
165 async fn db_size(&self) -> Result<usize>;
167
168 #[cfg(feature = "ttl")]
170 async fn expire_at<K>(&self, key: K, at: TimestampMillis) -> Result<bool>
171 where
172 K: AsRef<[u8]> + Sync + Send;
173
174 #[cfg(feature = "ttl")]
176 async fn expire<K>(&self, key: K, dur: TimestampMillis) -> Result<bool>
177 where
178 K: AsRef<[u8]> + Sync + Send;
179
180 #[cfg(feature = "ttl")]
182 async fn ttl<K>(&self, key: K) -> Result<Option<TimestampMillis>>
183 where
184 K: AsRef<[u8]> + Sync + Send;
185
186 async fn map_iter<'a>(
188 &'a mut self,
189 ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageMap>> + Send + 'a>>;
190
191 async fn list_iter<'a>(
193 &'a mut self,
194 ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>>;
195
196 async fn scan<'a, P>(
198 &'a mut self,
199 pattern: P,
200 ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>
201 where
202 P: AsRef<[u8]> + Send + Sync;
203
204 async fn info(&self) -> Result<serde_json::Value>;
206}
207
208#[async_trait]
210pub trait Map: Sync + Send {
211 fn name(&self) -> &[u8];
213
214 async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
216 where
217 K: AsRef<[u8]> + Sync + Send,
218 V: serde::ser::Serialize + Sync + Send + ?Sized;
219
220 async fn get<K, V>(&self, key: K) -> Result<Option<V>>
222 where
223 K: AsRef<[u8]> + Sync + Send,
224 V: DeserializeOwned + Sync + Send;
225
226 async fn remove<K>(&self, key: K) -> Result<()>
228 where
229 K: AsRef<[u8]> + Sync + Send;
230
231 async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool>;
233
234 #[cfg(feature = "map_len")]
236 async fn len(&self) -> Result<usize>;
237
238 async fn is_empty(&self) -> Result<bool>;
240
241 async fn clear(&self) -> Result<()>;
243
244 async fn remove_and_fetch<K, V>(&self, key: K) -> Result<Option<V>>
246 where
247 K: AsRef<[u8]> + Sync + Send,
248 V: DeserializeOwned + Sync + Send;
249
250 async fn remove_with_prefix<K>(&self, prefix: K) -> Result<()>
252 where
253 K: AsRef<[u8]> + Sync + Send;
254
255 async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
257 where
258 V: serde::ser::Serialize + Sync + Send;
259
260 async fn batch_remove(&self, keys: Vec<Key>) -> Result<()>;
262
263 async fn iter<'a, V>(
265 &'a mut self,
266 ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
267 where
268 V: DeserializeOwned + Sync + Send + 'a + 'static;
269
270 async fn key_iter<'a>(
272 &'a mut self,
273 ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>;
274
275 async fn prefix_iter<'a, P, V>(
277 &'a mut self,
278 prefix: P,
279 ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
280 where
281 P: AsRef<[u8]> + Send + Sync,
282 V: DeserializeOwned + Sync + Send + 'a + 'static;
283
284 #[cfg(feature = "ttl")]
286 async fn expire_at(&self, at: TimestampMillis) -> Result<bool>;
287
288 #[cfg(feature = "ttl")]
290 async fn expire(&self, dur: TimestampMillis) -> Result<bool>;
291
292 #[cfg(feature = "ttl")]
294 async fn ttl(&self) -> Result<Option<TimestampMillis>>;
295}
296
297#[async_trait]
299pub trait List: Sync + Send {
300 fn name(&self) -> &[u8];
302
303 async fn push<V>(&self, val: &V) -> Result<()>
305 where
306 V: serde::ser::Serialize + Sync + Send;
307
308 async fn pushs<V>(&self, vals: Vec<V>) -> Result<()>
310 where
311 V: serde::ser::Serialize + Sync + Send;
312
313 async fn push_limit<V>(
315 &self,
316 val: &V,
317 limit: usize,
318 pop_front_if_limited: bool,
319 ) -> Result<Option<V>>
320 where
321 V: serde::ser::Serialize + Sync + Send,
322 V: DeserializeOwned;
323
324 async fn pop<V>(&self) -> Result<Option<V>>
326 where
327 V: DeserializeOwned + Sync + Send;
328
329 async fn all<V>(&self) -> Result<Vec<V>>
331 where
332 V: DeserializeOwned + Sync + Send;
333
334 async fn get_index<V>(&self, idx: usize) -> Result<Option<V>>
336 where
337 V: DeserializeOwned + Sync + Send;
338
339 async fn len(&self) -> Result<usize>;
341
342 async fn is_empty(&self) -> Result<bool>;
344
345 async fn clear(&self) -> Result<()>;
347
348 async fn iter<'a, V>(
350 &'a mut self,
351 ) -> Result<Box<dyn AsyncIterator<Item = Result<V>> + Send + 'a>>
352 where
353 V: DeserializeOwned + Sync + Send + 'a + 'static;
354
355 #[cfg(feature = "ttl")]
357 async fn expire_at(&self, at: TimestampMillis) -> Result<bool>;
358
359 #[cfg(feature = "ttl")]
361 async fn expire(&self, dur: TimestampMillis) -> Result<bool>;
362
363 #[cfg(feature = "ttl")]
365 async fn ttl(&self) -> Result<Option<TimestampMillis>>;
366}
367
368#[derive(Clone)]
370pub enum DefaultStorageDB {
371 #[cfg(feature = "sled")]
372 Sled(SledStorageDB),
374 #[cfg(feature = "redis")]
375 Redis(RedisStorageDB),
377 #[cfg(feature = "redis-cluster")]
378 RedisCluster(RedisClusterStorageDB),
380}
381
382impl DefaultStorageDB {
383 #[inline]
385 pub async fn map<V: AsRef<[u8]> + Sync + Send>(
386 &self,
387 name: V,
388 expire: Option<TimestampMillis>,
389 ) -> Result<StorageMap> {
390 Ok(match self {
391 #[cfg(feature = "sled")]
392 DefaultStorageDB::Sled(db) => StorageMap::Sled(db.map(name, expire).await?),
393 #[cfg(feature = "redis")]
394 DefaultStorageDB::Redis(db) => StorageMap::Redis(db.map(name, expire).await?),
395 #[cfg(feature = "redis-cluster")]
396 DefaultStorageDB::RedisCluster(db) => {
397 StorageMap::RedisCluster(db.map(name, expire).await?)
398 }
399 })
400 }
401
402 #[inline]
404 pub async fn map_remove<K>(&self, name: K) -> Result<()>
405 where
406 K: AsRef<[u8]> + Sync + Send,
407 {
408 match self {
409 #[cfg(feature = "sled")]
410 DefaultStorageDB::Sled(db) => db.map_remove(name).await,
411 #[cfg(feature = "redis")]
412 DefaultStorageDB::Redis(db) => db.map_remove(name).await,
413 #[cfg(feature = "redis-cluster")]
414 DefaultStorageDB::RedisCluster(db) => db.map_remove(name).await,
415 }
416 }
417
418 #[inline]
420 pub async fn map_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
421 match self {
422 #[cfg(feature = "sled")]
423 DefaultStorageDB::Sled(db) => db.map_contains_key(key).await,
424 #[cfg(feature = "redis")]
425 DefaultStorageDB::Redis(db) => db.map_contains_key(key).await,
426 #[cfg(feature = "redis-cluster")]
427 DefaultStorageDB::RedisCluster(db) => db.map_contains_key(key).await,
428 }
429 }
430
431 #[inline]
433 pub async fn list<V: AsRef<[u8]> + Sync + Send>(
434 &self,
435 name: V,
436 expire: Option<TimestampMillis>,
437 ) -> Result<StorageList> {
438 Ok(match self {
439 #[cfg(feature = "sled")]
440 DefaultStorageDB::Sled(db) => StorageList::Sled(db.list(name, expire).await?),
441 #[cfg(feature = "redis")]
442 DefaultStorageDB::Redis(db) => StorageList::Redis(db.list(name, expire).await?),
443 #[cfg(feature = "redis-cluster")]
444 DefaultStorageDB::RedisCluster(db) => {
445 StorageList::RedisCluster(db.list(name, expire).await?)
446 }
447 })
448 }
449
450 #[inline]
452 pub async fn list_remove<K>(&self, name: K) -> Result<()>
453 where
454 K: AsRef<[u8]> + Sync + Send,
455 {
456 match self {
457 #[cfg(feature = "sled")]
458 DefaultStorageDB::Sled(db) => db.list_remove(name).await,
459 #[cfg(feature = "redis")]
460 DefaultStorageDB::Redis(db) => db.list_remove(name).await,
461 #[cfg(feature = "redis-cluster")]
462 DefaultStorageDB::RedisCluster(db) => db.list_remove(name).await,
463 }
464 }
465
466 #[inline]
468 pub async fn list_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
469 match self {
470 #[cfg(feature = "sled")]
471 DefaultStorageDB::Sled(db) => db.list_contains_key(key).await,
472 #[cfg(feature = "redis")]
473 DefaultStorageDB::Redis(db) => db.list_contains_key(key).await,
474 #[cfg(feature = "redis-cluster")]
475 DefaultStorageDB::RedisCluster(db) => db.list_contains_key(key).await,
476 }
477 }
478
479 #[inline]
481 pub async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
482 where
483 K: AsRef<[u8]> + Sync + Send,
484 V: Serialize + Sync + Send,
485 {
486 match self {
487 #[cfg(feature = "sled")]
488 DefaultStorageDB::Sled(db) => db.insert(key, val).await,
489 #[cfg(feature = "redis")]
490 DefaultStorageDB::Redis(db) => db.insert(key, val).await,
491 #[cfg(feature = "redis-cluster")]
492 DefaultStorageDB::RedisCluster(db) => db.insert(key, val).await,
493 }
494 }
495
496 #[inline]
498 pub async fn get<K, V>(&self, key: K) -> Result<Option<V>>
499 where
500 K: AsRef<[u8]> + Sync + Send,
501 V: DeserializeOwned + Sync + Send,
502 {
503 match self {
504 #[cfg(feature = "sled")]
505 DefaultStorageDB::Sled(db) => db.get(key).await,
506 #[cfg(feature = "redis")]
507 DefaultStorageDB::Redis(db) => db.get(key).await,
508 #[cfg(feature = "redis-cluster")]
509 DefaultStorageDB::RedisCluster(db) => db.get(key).await,
510 }
511 }
512
513 #[inline]
515 pub async fn remove<K>(&self, key: K) -> Result<()>
516 where
517 K: AsRef<[u8]> + Sync + Send,
518 {
519 match self {
520 #[cfg(feature = "sled")]
521 DefaultStorageDB::Sled(db) => db.remove(key).await,
522 #[cfg(feature = "redis")]
523 DefaultStorageDB::Redis(db) => db.remove(key).await,
524 #[cfg(feature = "redis-cluster")]
525 DefaultStorageDB::RedisCluster(db) => db.remove(key).await,
526 }
527 }
528
529 #[inline]
531 pub async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
532 where
533 V: serde::ser::Serialize + Sync + Send,
534 {
535 match self {
536 #[cfg(feature = "sled")]
537 DefaultStorageDB::Sled(db) => db.batch_insert(key_vals).await,
538 #[cfg(feature = "redis")]
539 DefaultStorageDB::Redis(db) => db.batch_insert(key_vals).await,
540 #[cfg(feature = "redis-cluster")]
541 DefaultStorageDB::RedisCluster(db) => db.batch_insert(key_vals).await,
542 }
543 }
544
545 #[inline]
547 pub async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
548 match self {
549 #[cfg(feature = "sled")]
550 DefaultStorageDB::Sled(db) => db.batch_remove(keys).await,
551 #[cfg(feature = "redis")]
552 DefaultStorageDB::Redis(db) => db.batch_remove(keys).await,
553 #[cfg(feature = "redis-cluster")]
554 DefaultStorageDB::RedisCluster(db) => db.batch_remove(keys).await,
555 }
556 }
557
558 #[inline]
560 pub async fn counter_incr<K>(&self, key: K, increment: isize) -> Result<()>
561 where
562 K: AsRef<[u8]> + Sync + Send,
563 {
564 match self {
565 #[cfg(feature = "sled")]
566 DefaultStorageDB::Sled(db) => db.counter_incr(key, increment).await,
567 #[cfg(feature = "redis")]
568 DefaultStorageDB::Redis(db) => db.counter_incr(key, increment).await,
569 #[cfg(feature = "redis-cluster")]
570 DefaultStorageDB::RedisCluster(db) => db.counter_incr(key, increment).await,
571 }
572 }
573
574 #[inline]
576 pub async fn counter_decr<K>(&self, key: K, decrement: isize) -> Result<()>
577 where
578 K: AsRef<[u8]> + Sync + Send,
579 {
580 match self {
581 #[cfg(feature = "sled")]
582 DefaultStorageDB::Sled(db) => db.counter_decr(key, decrement).await,
583 #[cfg(feature = "redis")]
584 DefaultStorageDB::Redis(db) => db.counter_decr(key, decrement).await,
585 #[cfg(feature = "redis-cluster")]
586 DefaultStorageDB::RedisCluster(db) => db.counter_decr(key, decrement).await,
587 }
588 }
589
590 #[inline]
592 pub async fn counter_get<K>(&self, key: K) -> Result<Option<isize>>
593 where
594 K: AsRef<[u8]> + Sync + Send,
595 {
596 match self {
597 #[cfg(feature = "sled")]
598 DefaultStorageDB::Sled(db) => db.counter_get(key).await,
599 #[cfg(feature = "redis")]
600 DefaultStorageDB::Redis(db) => db.counter_get(key).await,
601 #[cfg(feature = "redis-cluster")]
602 DefaultStorageDB::RedisCluster(db) => db.counter_get(key).await,
603 }
604 }
605
606 #[inline]
608 pub async fn counter_set<K>(&self, key: K, val: isize) -> Result<()>
609 where
610 K: AsRef<[u8]> + Sync + Send,
611 {
612 match self {
613 #[cfg(feature = "sled")]
614 DefaultStorageDB::Sled(db) => db.counter_set(key, val).await,
615 #[cfg(feature = "redis")]
616 DefaultStorageDB::Redis(db) => db.counter_set(key, val).await,
617 #[cfg(feature = "redis-cluster")]
618 DefaultStorageDB::RedisCluster(db) => db.counter_set(key, val).await,
619 }
620 }
621
622 #[inline]
624 #[cfg(feature = "len")]
625 pub async fn len(&self) -> Result<usize> {
626 match self {
627 #[cfg(feature = "sled")]
628 DefaultStorageDB::Sled(db) => db.len().await,
629 #[cfg(feature = "redis")]
630 DefaultStorageDB::Redis(db) => db.len().await,
631 #[cfg(feature = "redis-cluster")]
632 DefaultStorageDB::RedisCluster(db) => db.len().await,
633 }
634 }
635
636 #[inline]
638 pub async fn db_size(&self) -> Result<usize> {
639 match self {
640 #[cfg(feature = "sled")]
641 DefaultStorageDB::Sled(db) => db.db_size().await,
642 #[cfg(feature = "redis")]
643 DefaultStorageDB::Redis(db) => db.db_size().await,
644 #[cfg(feature = "redis-cluster")]
645 DefaultStorageDB::RedisCluster(db) => db.db_size().await,
646 }
647 }
648
649 #[inline]
651 pub async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
652 match self {
653 #[cfg(feature = "sled")]
654 DefaultStorageDB::Sled(db) => db.contains_key(key).await,
655 #[cfg(feature = "redis")]
656 DefaultStorageDB::Redis(db) => db.contains_key(key).await,
657 #[cfg(feature = "redis-cluster")]
658 DefaultStorageDB::RedisCluster(db) => db.contains_key(key).await,
659 }
660 }
661
662 #[inline]
664 #[cfg(feature = "ttl")]
665 pub async fn expire_at<K>(&self, key: K, at: TimestampMillis) -> Result<bool>
666 where
667 K: AsRef<[u8]> + Sync + Send,
668 {
669 match self {
670 #[cfg(feature = "sled")]
671 DefaultStorageDB::Sled(db) => db.expire_at(key, at).await,
672 #[cfg(feature = "redis")]
673 DefaultStorageDB::Redis(db) => db.expire_at(key, at).await,
674 #[cfg(feature = "redis-cluster")]
675 DefaultStorageDB::RedisCluster(db) => db.expire_at(key, at).await,
676 }
677 }
678
679 #[inline]
681 #[cfg(feature = "ttl")]
682 pub async fn expire<K>(&self, key: K, dur: TimestampMillis) -> Result<bool>
683 where
684 K: AsRef<[u8]> + Sync + Send,
685 {
686 match self {
687 #[cfg(feature = "sled")]
688 DefaultStorageDB::Sled(db) => db.expire(key, dur).await,
689 #[cfg(feature = "redis")]
690 DefaultStorageDB::Redis(db) => db.expire(key, dur).await,
691 #[cfg(feature = "redis-cluster")]
692 DefaultStorageDB::RedisCluster(db) => db.expire(key, dur).await,
693 }
694 }
695
696 #[inline]
698 #[cfg(feature = "ttl")]
699 pub async fn ttl<K>(&self, key: K) -> Result<Option<TimestampMillis>>
700 where
701 K: AsRef<[u8]> + Sync + Send,
702 {
703 match self {
704 #[cfg(feature = "sled")]
705 DefaultStorageDB::Sled(db) => db.ttl(key).await,
706 #[cfg(feature = "redis")]
707 DefaultStorageDB::Redis(db) => db.ttl(key).await,
708 #[cfg(feature = "redis-cluster")]
709 DefaultStorageDB::RedisCluster(db) => db.ttl(key).await,
710 }
711 }
712
713 #[inline]
715 pub async fn map_iter<'a>(
716 &'a mut self,
717 ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageMap>> + Send + 'a>> {
718 match self {
719 #[cfg(feature = "sled")]
720 DefaultStorageDB::Sled(db) => db.map_iter().await,
721 #[cfg(feature = "redis")]
722 DefaultStorageDB::Redis(db) => db.map_iter().await,
723 #[cfg(feature = "redis-cluster")]
724 DefaultStorageDB::RedisCluster(db) => db.map_iter().await,
725 }
726 }
727
728 #[inline]
730 pub async fn list_iter<'a>(
731 &'a mut self,
732 ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>> {
733 match self {
734 #[cfg(feature = "sled")]
735 DefaultStorageDB::Sled(db) => db.list_iter().await,
736 #[cfg(feature = "redis")]
737 DefaultStorageDB::Redis(db) => db.list_iter().await,
738 #[cfg(feature = "redis-cluster")]
739 DefaultStorageDB::RedisCluster(db) => db.list_iter().await,
740 }
741 }
742
743 #[inline]
745 pub async fn scan<'a, P>(
746 &'a mut self,
747 pattern: P,
748 ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>
749 where
750 P: AsRef<[u8]> + Send + Sync,
751 {
752 match self {
753 #[cfg(feature = "sled")]
754 DefaultStorageDB::Sled(db) => db.scan(pattern).await,
755 #[cfg(feature = "redis")]
756 DefaultStorageDB::Redis(db) => db.scan(pattern).await,
757 #[cfg(feature = "redis-cluster")]
758 DefaultStorageDB::RedisCluster(db) => db.scan(pattern).await,
759 }
760 }
761
762 #[inline]
764 pub async fn info(&self) -> Result<serde_json::Value> {
765 match self {
766 #[cfg(feature = "sled")]
767 DefaultStorageDB::Sled(db) => db.info().await,
768 #[cfg(feature = "redis")]
769 DefaultStorageDB::Redis(db) => db.info().await,
770 #[cfg(feature = "redis-cluster")]
771 DefaultStorageDB::RedisCluster(db) => db.info().await,
772 }
773 }
774}
775
776#[derive(Clone)]
778pub enum StorageMap {
779 #[cfg(feature = "sled")]
780 Sled(SledStorageMap),
782 #[cfg(feature = "redis")]
783 Redis(RedisStorageMap),
785 #[cfg(feature = "redis-cluster")]
786 RedisCluster(RedisClusterStorageMap),
788}
789
790#[async_trait]
791impl Map for StorageMap {
792 fn name(&self) -> &[u8] {
793 match self {
794 #[cfg(feature = "sled")]
795 StorageMap::Sled(m) => m.name(),
796 #[cfg(feature = "redis")]
797 StorageMap::Redis(m) => m.name(),
798 #[cfg(feature = "redis-cluster")]
799 StorageMap::RedisCluster(m) => m.name(),
800 }
801 }
802
803 async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
804 where
805 K: AsRef<[u8]> + Sync + Send,
806 V: Serialize + Sync + Send + ?Sized,
807 {
808 match self {
809 #[cfg(feature = "sled")]
810 StorageMap::Sled(m) => m.insert(key, val).await,
811 #[cfg(feature = "redis")]
812 StorageMap::Redis(m) => m.insert(key, val).await,
813 #[cfg(feature = "redis-cluster")]
814 StorageMap::RedisCluster(m) => m.insert(key, val).await,
815 }
816 }
817
818 async fn get<K, V>(&self, key: K) -> Result<Option<V>>
819 where
820 K: AsRef<[u8]> + Sync + Send,
821 V: DeserializeOwned + Sync + Send,
822 {
823 match self {
824 #[cfg(feature = "sled")]
825 StorageMap::Sled(m) => m.get(key).await,
826 #[cfg(feature = "redis")]
827 StorageMap::Redis(m) => m.get(key).await,
828 #[cfg(feature = "redis-cluster")]
829 StorageMap::RedisCluster(m) => m.get(key).await,
830 }
831 }
832
833 async fn remove<K>(&self, key: K) -> Result<()>
834 where
835 K: AsRef<[u8]> + Sync + Send,
836 {
837 match self {
838 #[cfg(feature = "sled")]
839 StorageMap::Sled(m) => m.remove(key).await,
840 #[cfg(feature = "redis")]
841 StorageMap::Redis(m) => m.remove(key).await,
842 #[cfg(feature = "redis-cluster")]
843 StorageMap::RedisCluster(m) => m.remove(key).await,
844 }
845 }
846
847 async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
848 match self {
849 #[cfg(feature = "sled")]
850 StorageMap::Sled(m) => m.contains_key(key).await,
851 #[cfg(feature = "redis")]
852 StorageMap::Redis(m) => m.contains_key(key).await,
853 #[cfg(feature = "redis-cluster")]
854 StorageMap::RedisCluster(m) => m.contains_key(key).await,
855 }
856 }
857
858 #[cfg(feature = "map_len")]
859 async fn len(&self) -> Result<usize> {
860 match self {
861 #[cfg(feature = "sled")]
862 StorageMap::Sled(m) => m.len().await,
863 #[cfg(feature = "redis")]
864 StorageMap::Redis(m) => m.len().await,
865 #[cfg(feature = "redis-cluster")]
866 StorageMap::RedisCluster(m) => m.len().await,
867 }
868 }
869
870 async fn is_empty(&self) -> Result<bool> {
871 match self {
872 #[cfg(feature = "sled")]
873 StorageMap::Sled(m) => m.is_empty().await,
874 #[cfg(feature = "redis")]
875 StorageMap::Redis(m) => m.is_empty().await,
876 #[cfg(feature = "redis-cluster")]
877 StorageMap::RedisCluster(m) => m.is_empty().await,
878 }
879 }
880
881 async fn clear(&self) -> Result<()> {
882 match self {
883 #[cfg(feature = "sled")]
884 StorageMap::Sled(m) => m.clear().await,
885 #[cfg(feature = "redis")]
886 StorageMap::Redis(m) => m.clear().await,
887 #[cfg(feature = "redis-cluster")]
888 StorageMap::RedisCluster(m) => m.clear().await,
889 }
890 }
891
892 async fn remove_and_fetch<K, V>(&self, key: K) -> Result<Option<V>>
893 where
894 K: AsRef<[u8]> + Sync + Send,
895 V: DeserializeOwned + Sync + Send,
896 {
897 match self {
898 #[cfg(feature = "sled")]
899 StorageMap::Sled(m) => m.remove_and_fetch(key).await,
900 #[cfg(feature = "redis")]
901 StorageMap::Redis(m) => m.remove_and_fetch(key).await,
902 #[cfg(feature = "redis-cluster")]
903 StorageMap::RedisCluster(m) => m.remove_and_fetch(key).await,
904 }
905 }
906
907 async fn remove_with_prefix<K>(&self, prefix: K) -> Result<()>
908 where
909 K: AsRef<[u8]> + Sync + Send,
910 {
911 match self {
912 #[cfg(feature = "sled")]
913 StorageMap::Sled(m) => m.remove_with_prefix(prefix).await,
914 #[cfg(feature = "redis")]
915 StorageMap::Redis(m) => m.remove_with_prefix(prefix).await,
916 #[cfg(feature = "redis-cluster")]
917 StorageMap::RedisCluster(m) => m.remove_with_prefix(prefix).await,
918 }
919 }
920
921 async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
922 where
923 V: Serialize + Sync + Send,
924 {
925 match self {
926 #[cfg(feature = "sled")]
927 StorageMap::Sled(m) => m.batch_insert(key_vals).await,
928 #[cfg(feature = "redis")]
929 StorageMap::Redis(m) => m.batch_insert(key_vals).await,
930 #[cfg(feature = "redis-cluster")]
931 StorageMap::RedisCluster(m) => m.batch_insert(key_vals).await,
932 }
933 }
934
935 async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
936 match self {
937 #[cfg(feature = "sled")]
938 StorageMap::Sled(m) => m.batch_remove(keys).await,
939 #[cfg(feature = "redis")]
940 StorageMap::Redis(m) => m.batch_remove(keys).await,
941 #[cfg(feature = "redis-cluster")]
942 StorageMap::RedisCluster(m) => m.batch_remove(keys).await,
943 }
944 }
945
946 async fn iter<'a, V>(
947 &'a mut self,
948 ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
949 where
950 V: DeserializeOwned + Sync + Send + 'a + 'static,
951 {
952 match self {
953 #[cfg(feature = "sled")]
954 StorageMap::Sled(m) => m.iter().await,
955 #[cfg(feature = "redis")]
956 StorageMap::Redis(m) => m.iter().await,
957 #[cfg(feature = "redis-cluster")]
958 StorageMap::RedisCluster(m) => m.iter().await,
959 }
960 }
961
962 async fn key_iter<'a>(
963 &'a mut self,
964 ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>> {
965 match self {
966 #[cfg(feature = "sled")]
967 StorageMap::Sled(m) => m.key_iter().await,
968 #[cfg(feature = "redis")]
969 StorageMap::Redis(m) => m.key_iter().await,
970 #[cfg(feature = "redis-cluster")]
971 StorageMap::RedisCluster(m) => m.key_iter().await,
972 }
973 }
974
975 async fn prefix_iter<'a, P, V>(
976 &'a mut self,
977 prefix: P,
978 ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
979 where
980 P: AsRef<[u8]> + Send + Sync,
981 V: DeserializeOwned + Sync + Send + 'a + 'static,
982 {
983 match self {
984 #[cfg(feature = "sled")]
985 StorageMap::Sled(m) => m.prefix_iter(prefix).await,
986 #[cfg(feature = "redis")]
987 StorageMap::Redis(m) => m.prefix_iter(prefix).await,
988 #[cfg(feature = "redis-cluster")]
989 StorageMap::RedisCluster(m) => m.prefix_iter(prefix).await,
990 }
991 }
992
993 #[cfg(feature = "ttl")]
994 async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
995 match self {
996 #[cfg(feature = "sled")]
997 StorageMap::Sled(m) => m.expire_at(at).await,
998 #[cfg(feature = "redis")]
999 StorageMap::Redis(m) => m.expire_at(at).await,
1000 #[cfg(feature = "redis-cluster")]
1001 StorageMap::RedisCluster(m) => m.expire_at(at).await,
1002 }
1003 }
1004
1005 #[cfg(feature = "ttl")]
1006 async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1007 match self {
1008 #[cfg(feature = "sled")]
1009 StorageMap::Sled(m) => m.expire(dur).await,
1010 #[cfg(feature = "redis")]
1011 StorageMap::Redis(m) => m.expire(dur).await,
1012 #[cfg(feature = "redis-cluster")]
1013 StorageMap::RedisCluster(m) => m.expire(dur).await,
1014 }
1015 }
1016
1017 #[cfg(feature = "ttl")]
1018 async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1019 match self {
1020 #[cfg(feature = "sled")]
1021 StorageMap::Sled(m) => m.ttl().await,
1022 #[cfg(feature = "redis")]
1023 StorageMap::Redis(m) => m.ttl().await,
1024 #[cfg(feature = "redis-cluster")]
1025 StorageMap::RedisCluster(m) => m.ttl().await,
1026 }
1027 }
1028}
1029
1030#[derive(Clone)]
1032pub enum StorageList {
1033 #[cfg(feature = "sled")]
1034 Sled(SledStorageList),
1036 #[cfg(feature = "redis")]
1037 Redis(RedisStorageList),
1039 #[cfg(feature = "redis-cluster")]
1040 RedisCluster(RedisClusterStorageList),
1042}
1043
1044impl fmt::Debug for StorageList {
1045 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1046 let name = match self {
1047 #[cfg(feature = "sled")]
1048 StorageList::Sled(list) => list.name(),
1049 #[cfg(feature = "redis")]
1050 StorageList::Redis(list) => list.name(),
1051 #[cfg(feature = "redis-cluster")]
1052 StorageList::RedisCluster(list) => list.name(),
1053 };
1054
1055 f.debug_tuple(&format!("StorageList({:?})", String::from_utf8_lossy(name)))
1056 .finish()
1057 }
1058}
1059
1060#[async_trait]
1061impl List for StorageList {
1062 fn name(&self) -> &[u8] {
1063 match self {
1064 #[cfg(feature = "sled")]
1065 StorageList::Sled(m) => m.name(),
1066 #[cfg(feature = "redis")]
1067 StorageList::Redis(m) => m.name(),
1068 #[cfg(feature = "redis-cluster")]
1069 StorageList::RedisCluster(m) => m.name(),
1070 }
1071 }
1072
1073 async fn push<V>(&self, val: &V) -> Result<()>
1074 where
1075 V: Serialize + Sync + Send,
1076 {
1077 match self {
1078 #[cfg(feature = "sled")]
1079 StorageList::Sled(list) => list.push(val).await,
1080 #[cfg(feature = "redis")]
1081 StorageList::Redis(list) => list.push(val).await,
1082 #[cfg(feature = "redis-cluster")]
1083 StorageList::RedisCluster(list) => list.push(val).await,
1084 }
1085 }
1086
1087 async fn pushs<V>(&self, vals: Vec<V>) -> Result<()>
1088 where
1089 V: serde::ser::Serialize + Sync + Send,
1090 {
1091 match self {
1092 #[cfg(feature = "sled")]
1093 StorageList::Sled(list) => list.pushs(vals).await,
1094 #[cfg(feature = "redis")]
1095 StorageList::Redis(list) => list.pushs(vals).await,
1096 #[cfg(feature = "redis-cluster")]
1097 StorageList::RedisCluster(list) => list.pushs(vals).await,
1098 }
1099 }
1100
1101 async fn push_limit<V>(
1102 &self,
1103 val: &V,
1104 limit: usize,
1105 pop_front_if_limited: bool,
1106 ) -> Result<Option<V>>
1107 where
1108 V: Serialize + Sync + Send,
1109 V: DeserializeOwned,
1110 {
1111 match self {
1112 #[cfg(feature = "sled")]
1113 StorageList::Sled(list) => list.push_limit(val, limit, pop_front_if_limited).await,
1114 #[cfg(feature = "redis")]
1115 StorageList::Redis(list) => list.push_limit(val, limit, pop_front_if_limited).await,
1116 #[cfg(feature = "redis-cluster")]
1117 StorageList::RedisCluster(list) => {
1118 list.push_limit(val, limit, pop_front_if_limited).await
1119 }
1120 }
1121 }
1122
1123 async fn pop<V>(&self) -> Result<Option<V>>
1124 where
1125 V: DeserializeOwned + Sync + Send,
1126 {
1127 match self {
1128 #[cfg(feature = "sled")]
1129 StorageList::Sled(list) => list.pop().await,
1130 #[cfg(feature = "redis")]
1131 StorageList::Redis(list) => list.pop().await,
1132 #[cfg(feature = "redis-cluster")]
1133 StorageList::RedisCluster(list) => list.pop().await,
1134 }
1135 }
1136
1137 async fn all<V>(&self) -> Result<Vec<V>>
1138 where
1139 V: DeserializeOwned + Sync + Send,
1140 {
1141 match self {
1142 #[cfg(feature = "sled")]
1143 StorageList::Sled(list) => list.all().await,
1144 #[cfg(feature = "redis")]
1145 StorageList::Redis(list) => list.all().await,
1146 #[cfg(feature = "redis-cluster")]
1147 StorageList::RedisCluster(list) => list.all().await,
1148 }
1149 }
1150
1151 async fn get_index<V>(&self, idx: usize) -> Result<Option<V>>
1152 where
1153 V: DeserializeOwned + Sync + Send,
1154 {
1155 match self {
1156 #[cfg(feature = "sled")]
1157 StorageList::Sled(list) => list.get_index(idx).await,
1158 #[cfg(feature = "redis")]
1159 StorageList::Redis(list) => list.get_index(idx).await,
1160 #[cfg(feature = "redis-cluster")]
1161 StorageList::RedisCluster(list) => list.get_index(idx).await,
1162 }
1163 }
1164
1165 async fn len(&self) -> Result<usize> {
1166 match self {
1167 #[cfg(feature = "sled")]
1168 StorageList::Sled(list) => list.len().await,
1169 #[cfg(feature = "redis")]
1170 StorageList::Redis(list) => list.len().await,
1171 #[cfg(feature = "redis-cluster")]
1172 StorageList::RedisCluster(list) => list.len().await,
1173 }
1174 }
1175
1176 async fn is_empty(&self) -> Result<bool> {
1177 match self {
1178 #[cfg(feature = "sled")]
1179 StorageList::Sled(list) => list.is_empty().await,
1180 #[cfg(feature = "redis")]
1181 StorageList::Redis(list) => list.is_empty().await,
1182 #[cfg(feature = "redis-cluster")]
1183 StorageList::RedisCluster(list) => list.is_empty().await,
1184 }
1185 }
1186
1187 async fn clear(&self) -> Result<()> {
1188 match self {
1189 #[cfg(feature = "sled")]
1190 StorageList::Sled(list) => list.clear().await,
1191 #[cfg(feature = "redis")]
1192 StorageList::Redis(list) => list.clear().await,
1193 #[cfg(feature = "redis-cluster")]
1194 StorageList::RedisCluster(list) => list.clear().await,
1195 }
1196 }
1197
1198 async fn iter<'a, V>(
1199 &'a mut self,
1200 ) -> Result<Box<dyn AsyncIterator<Item = Result<V>> + Send + 'a>>
1201 where
1202 V: DeserializeOwned + Sync + Send + 'a + 'static,
1203 {
1204 match self {
1205 #[cfg(feature = "sled")]
1206 StorageList::Sled(list) => list.iter().await,
1207 #[cfg(feature = "redis")]
1208 StorageList::Redis(list) => list.iter().await,
1209 #[cfg(feature = "redis-cluster")]
1210 StorageList::RedisCluster(list) => list.iter().await,
1211 }
1212 }
1213
1214 #[cfg(feature = "ttl")]
1215 async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
1216 match self {
1217 #[cfg(feature = "sled")]
1218 StorageList::Sled(l) => l.expire_at(at).await,
1219 #[cfg(feature = "redis")]
1220 StorageList::Redis(l) => l.expire_at(at).await,
1221 #[cfg(feature = "redis-cluster")]
1222 StorageList::RedisCluster(l) => l.expire_at(at).await,
1223 }
1224 }
1225
1226 #[cfg(feature = "ttl")]
1227 async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
1228 match self {
1229 #[cfg(feature = "sled")]
1230 StorageList::Sled(l) => l.expire(dur).await,
1231 #[cfg(feature = "redis")]
1232 StorageList::Redis(l) => l.expire(dur).await,
1233 #[cfg(feature = "redis-cluster")]
1234 StorageList::RedisCluster(l) => l.expire(dur).await,
1235 }
1236 }
1237
1238 #[cfg(feature = "ttl")]
1239 async fn ttl(&self) -> Result<Option<TimestampMillis>> {
1240 match self {
1241 #[cfg(feature = "sled")]
1242 StorageList::Sled(l) => l.ttl().await,
1243 #[cfg(feature = "redis")]
1244 StorageList::Redis(l) => l.ttl().await,
1245 #[cfg(feature = "redis-cluster")]
1246 StorageList::RedisCluster(l) => l.ttl().await,
1247 }
1248 }
1249}