rmqtt_storage/
storage_sled.rs

1//! Sled-based persistent storage implementation
2//!
3//! This module provides a persistent storage solution backed by Sled (an embedded database).
4//! It implements key-value storage, maps (dictionaries), and lists (queues) with support for:
5//! - Atomic operations and transactions
6//! - Asynchronous API
7//! - TTL/expiration (optional feature)
8//! - Counters
9//! - Batch operations
10//! - Iterators
11//!
12//! The implementation uses multiple sled trees for different data types and provides
13//! a command-based interface with background processing for concurrent operations.
14
15use core::fmt;
16use std::borrow::Cow;
17use std::fmt::Debug;
18use std::io;
19use std::io::{ErrorKind, Read};
20use std::ops::Deref;
21use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
22use std::sync::Arc;
23
24use anyhow::{anyhow, Error};
25use async_trait::async_trait;
26use convert::Bytesize;
27use serde::de::DeserializeOwned;
28use serde::Deserialize;
29use serde::Serialize;
30use serde_json::Value;
31
32#[allow(unused_imports)]
33use sled::transaction::TransactionResult;
34use sled::transaction::{
35    ConflictableTransactionError, ConflictableTransactionResult, TransactionError,
36    TransactionalTree,
37};
38#[allow(unused_imports)]
39use sled::Transactional;
40use sled::{Batch, IVec, Tree};
41use tokio::runtime::Handle;
42use tokio::sync::mpsc;
43use tokio::sync::oneshot;
44use tokio::task::spawn_blocking;
45
46use crate::storage::{AsyncIterator, IterItem, Key, List, Map, StorageDB};
47#[allow(unused_imports)]
48use crate::{timestamp_millis, TimestampMillis};
49use crate::{Result, StorageList, StorageMap};
50
/// Byte separator used in composite keys
// NOTE(review): the code that composes keys with this separator is outside this
// part of the file — confirm exact usage there.
const SEPARATOR: &[u8] = b"@";
/// Name of the sled tree opened as `SledStorageDB::kv_tree` (key-value storage)
const KV_TREE: &[u8] = b"__kv_tree@";
/// Name of the sled tree opened as `SledStorageDB::map_tree` (map metadata)
const MAP_TREE: &[u8] = b"__map_tree@";
/// Name of the sled tree opened as `SledStorageDB::list_tree` (list metadata)
const LIST_TREE: &[u8] = b"__list_tree@";
/// Name of the sled tree opened as `SledStorageDB::expire_key_tree`.
///
/// As read back by `SledStorageDB::cleanup`, each entry's key is the 8-byte
/// big-endian expiration timestamp followed by the user key, and its value is
/// the encoded [`KeyType`].
const EXPIRE_KEYS_TREE: &[u8] = b"__expire_key_tree@";
/// Name of the sled tree opened as `SledStorageDB::key_expire_tree`
/// (key => expire_at)
const KEY_EXPIRE_TREE: &[u8] = b"__key_expire_tree@";
/// Prefix for map keys
const MAP_NAME_PREFIX: &[u8] = b"__map@";
/// Separator between map name and item key
const MAP_KEY_SEPARATOR: &[u8] = b"@__item@";
#[allow(dead_code)]
/// Suffix for map count keys (same bytes as [`LIST_KEY_COUNT_SUFFIX`])
const MAP_KEY_COUNT_SUFFIX: &[u8] = b"@__count@";

/// Prefix for list keys
const LIST_NAME_PREFIX: &[u8] = b"__list@";
/// Suffix for list count keys
const LIST_KEY_COUNT_SUFFIX: &[u8] = b"@__count@";
/// Suffix for list content keys
const LIST_KEY_CONTENT_SUFFIX: &[u8] = b"@__content@";
77
/// Kind of stored object a key refers to.
///
/// Serialized to a single tag byte by `KeyType::encode` (`1`, `2`, `3` for
/// `KV`, `Map`, `List` respectively) and parsed back by `KeyType::decode`.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
enum KeyType {
    /// Key-value pair (tag byte `1`)
    KV,
    /// Map structure (tag byte `2`)
    Map,
    /// List structure (tag byte `3`)
    List,
}
89
90impl KeyType {
91    /// Encodes key type to a single byte
92    #[inline]
93    #[allow(dead_code)]
94    fn encode(&self) -> &[u8] {
95        match self {
96            KeyType::KV => &[1],
97            KeyType::Map => &[2],
98            KeyType::List => &[3],
99        }
100    }
101
102    /// Decodes key type from byte representation
103    #[inline]
104    #[allow(dead_code)]
105    fn decode(v: &[u8]) -> Result<Self> {
106        if v.is_empty() {
107            Err(anyhow!("invalid data"))
108        } else {
109            match v[0] {
110                1 => Ok(KeyType::KV),
111                2 => Ok(KeyType::Map),
112                3 => Ok(KeyType::List),
113                _ => Err(anyhow!("invalid data")),
114            }
115        }
116    }
117}
118
/// Enum representing all possible storage operations.
///
/// Each variant is one request sent over the command channel created in
/// `SledStorageDB::new`. A variant carries the handle to operate on (database,
/// map, list or raw iterator), the operation's arguments, and a
/// `oneshot::Sender` on which the worker loop sends the outcome back.
enum Command {
    // Database operations
    /// Insert a key/value pair; handled by `SledStorageDB::_insert`.
    DBInsert(SledStorageDB, Key, Vec<u8>, oneshot::Sender<Result<()>>),
    /// Fetch the value for a key; handled by `SledStorageDB::_get`.
    DBGet(SledStorageDB, IVec, oneshot::Sender<Result<Option<IVec>>>),
    /// Remove a key; handled by `SledStorageDB::_kv_remove`.
    DBRemove(SledStorageDB, IVec, oneshot::Sender<Result<()>>),
    /// Build a map handle by name with an optional expiration value; handled by
    /// `SledStorageMap::_new_expire`.
    DBMapNew(
        SledStorageDB,
        IVec,
        Option<TimestampMillis>,
        oneshot::Sender<Result<SledStorageMap>>,
    ),
    /// Remove a map by name; handled by `SledStorageDB::_map_remove`.
    DBMapRemove(SledStorageDB, IVec, oneshot::Sender<Result<()>>),
    /// Check whether a map exists; handled by `SledStorageDB::_self_map_contains_key`.
    DBMapContainsKey(SledStorageDB, IVec, oneshot::Sender<Result<bool>>),
    /// Build a list handle by name with an optional expiration value; handled by
    /// `SledStorageList::_new_expire`.
    DBListNew(
        SledStorageDB,
        IVec,
        Option<TimestampMillis>,
        oneshot::Sender<Result<SledStorageList>>,
    ),
    /// Remove a list by name; handled by `SledStorageDB::_list_remove`.
    DBListRemove(SledStorageDB, IVec, oneshot::Sender<Result<()>>),
    /// Check whether a list exists; handled by `SledStorageDB::_self_list_contains_key`.
    DBListContainsKey(SledStorageDB, IVec, oneshot::Sender<Result<bool>>),
    /// Insert several key/value pairs; handled by `SledStorageDB::_batch_insert`.
    DBBatchInsert(SledStorageDB, Vec<(Key, IVec)>, oneshot::Sender<Result<()>>),
    /// Remove several keys; handled by `SledStorageDB::_batch_remove`.
    DBBatchRemove(SledStorageDB, Vec<Key>, oneshot::Sender<Result<()>>),
    /// Increase a counter by the given amount; handled by `SledStorageDB::_counter_incr`.
    DBCounterIncr(SledStorageDB, IVec, isize, oneshot::Sender<Result<()>>),
    /// Decrease a counter by the given amount; handled by `SledStorageDB::_counter_decr`.
    DBCounterDecr(SledStorageDB, IVec, isize, oneshot::Sender<Result<()>>),
    /// Read a counter; handled by `SledStorageDB::_counter_get`.
    DBCounterGet(SledStorageDB, IVec, oneshot::Sender<Result<Option<isize>>>),
    /// Overwrite a counter; handled by `SledStorageDB::_counter_set`.
    DBCounterSet(SledStorageDB, IVec, isize, oneshot::Sender<Result<()>>),
    /// Check whether a key exists; handled by `SledStorageDB::_self_contains_key`.
    DBContainsKey(SledStorageDB, IVec, oneshot::Sender<Result<bool>>),
    /// Set the expiration time of a key-value entry; handled by
    /// `SledStorageDB::_expire_at` with `KeyType::KV`.
    #[cfg(feature = "ttl")]
    DBExpireAt(
        SledStorageDB,
        IVec,
        TimestampMillis,
        oneshot::Sender<Result<bool>>,
    ),
    /// Query the TTL of a key; handled by `SledStorageDB::_self_ttl`.
    #[cfg(feature = "ttl")]
    DBTtl(
        SledStorageDB,
        IVec,
        oneshot::Sender<Result<Option<TimestampMillis>>>,
    ),
    /// Obtain a raw sled iterator from `SledStorageDB::_map_scan_prefix`.
    DBMapPrefixIter(SledStorageDB, oneshot::Sender<sled::Iter>),
    /// Obtain a raw sled iterator from `SledStorageDB::_list_scan_prefix`.
    DBListPrefixIter(SledStorageDB, oneshot::Sender<sled::Iter>),
    /// Obtain a raw sled iterator from `SledStorageDB::_db_scan_prefix`, passing
    /// the given bytes as its argument.
    DBScanIter(SledStorageDB, Vec<u8>, oneshot::Sender<sled::Iter>),
    /// Report the value of `SledStorageDB::_kv_len`.
    #[allow(dead_code)]
    DBLen(SledStorageDB, oneshot::Sender<usize>),
    /// Report the value of `SledStorageDB::_db_size`.
    DBSize(SledStorageDB, oneshot::Sender<usize>),

    // Map operations
    /// Insert an item; handled by `SledStorageMap::_insert`.
    MapInsert(SledStorageMap, IVec, IVec, oneshot::Sender<Result<()>>),
    /// Fetch an item; handled by `SledStorageMap::_get`.
    MapGet(SledStorageMap, IVec, oneshot::Sender<Result<Option<IVec>>>),
    /// Remove an item; handled by `SledStorageMap::_remove`.
    MapRemove(SledStorageMap, IVec, oneshot::Sender<Result<()>>),
    /// Check whether an item exists; handled by `SledStorageMap::_contains_key`.
    MapContainsKey(SledStorageMap, IVec, oneshot::Sender<Result<bool>>),
    /// Report the map length; handled by `SledStorageMap::_len`.
    #[cfg(feature = "map_len")]
    MapLen(SledStorageMap, oneshot::Sender<Result<usize>>),
    /// Check whether the map is empty; handled by `SledStorageMap::_is_empty`.
    MapIsEmpty(SledStorageMap, oneshot::Sender<Result<bool>>),
    /// Clear the map; handled by `SledStorageMap::_clear`.
    MapClear(SledStorageMap, oneshot::Sender<Result<()>>),
    /// Remove an item and return its previous value; handled by
    /// `SledStorageMap::_remove_and_fetch`.
    MapRemoveAndFetch(SledStorageMap, IVec, oneshot::Sender<Result<Option<IVec>>>),
    /// Remove items by key prefix; handled by `SledStorageMap::_remove_with_prefix`.
    MapRemoveWithPrefix(SledStorageMap, IVec, oneshot::Sender<Result<()>>),
    /// Insert several items; handled by `SledStorageMap::_batch_insert`.
    MapBatchInsert(
        SledStorageMap,
        Vec<(IVec, IVec)>,
        oneshot::Sender<Result<()>>,
    ),
    /// Remove several items; handled by `SledStorageMap::_batch_remove`.
    MapBatchRemove(SledStorageMap, Vec<IVec>, oneshot::Sender<Result<()>>),
    /// Set the map's expiration time; handled by `SledStorageMap::_expire_at`.
    #[cfg(feature = "ttl")]
    MapExpireAt(
        SledStorageMap,
        TimestampMillis,
        oneshot::Sender<Result<bool>>,
    ),
    /// Query the map's TTL; handled by `SledStorageMap::_ttl`.
    #[cfg(feature = "ttl")]
    MapTTL(
        SledStorageMap,
        oneshot::Sender<Result<Option<TimestampMillis>>>,
    ),
    /// Check whether the map has expired; handled by `SledStorageMap::_is_expired`.
    MapIsExpired(SledStorageMap, oneshot::Sender<Result<bool>>),
    /// Obtain a raw sled iterator from `SledStorageMap::_prefix_iter`, with an
    /// optional prefix argument.
    MapPrefixIter(SledStorageMap, Option<IVec>, oneshot::Sender<sled::Iter>),

    // List operations
    /// Push one value; handled by `SledStorageList::_push`.
    ListPush(SledStorageList, IVec, oneshot::Sender<Result<()>>),
    /// Push several values; handled by `SledStorageList::_pushs`.
    ListPushs(SledStorageList, Vec<IVec>, oneshot::Sender<Result<()>>),
    /// Push one value subject to a length limit; the `usize` is the limit and the
    /// `bool` is the `pop_front_if_limited` flag. Handled by
    /// `SledStorageList::_push_limit`.
    ListPushLimit(
        SledStorageList,
        IVec,
        usize,
        bool,
        oneshot::Sender<Result<Option<IVec>>>,
    ),
    /// Pop a value; handled by `SledStorageList::_pop`.
    ListPop(SledStorageList, oneshot::Sender<Result<Option<IVec>>>),
    /// Fetch all values; handled by `SledStorageList::_all`.
    ListAll(SledStorageList, oneshot::Sender<Result<Vec<IVec>>>),
    /// Fetch the value at an index; handled by `SledStorageList::_get_index`.
    ListGetIndex(
        SledStorageList,
        usize,
        oneshot::Sender<Result<Option<IVec>>>,
    ),
    /// Report the list length; handled by `SledStorageList::_len`.
    ListLen(SledStorageList, oneshot::Sender<Result<usize>>),
    /// Check whether the list is empty; handled by `SledStorageList::_is_empty`.
    ListIsEmpty(SledStorageList, oneshot::Sender<Result<bool>>),
    /// Clear the list; handled by `SledStorageList::_clear`.
    ListClear(SledStorageList, oneshot::Sender<Result<()>>),
    /// Set the list's expiration time; handled by `SledStorageList::_expire_at`.
    #[cfg(feature = "ttl")]
    ListExpireAt(
        SledStorageList,
        TimestampMillis,
        oneshot::Sender<Result<bool>>,
    ),
    /// Query the list's TTL; handled by `SledStorageList::_ttl`.
    #[cfg(feature = "ttl")]
    ListTTL(
        SledStorageList,
        oneshot::Sender<Result<Option<TimestampMillis>>>,
    ),
    /// Check whether the list has expired; handled by `SledStorageList::_is_expired`.
    ListIsExpired(SledStorageList, oneshot::Sender<Result<bool>>),
    /// Obtain a raw sled iterator from `SledStorageList::_prefix_iter`.
    ListPrefixIter(SledStorageList, oneshot::Sender<sled::Iter>),

    // Iterator operation
    /// Advance a sled iterator by one step on the worker; the iterator itself is
    /// sent back together with the item it produced so the caller can continue.
    #[allow(clippy::type_complexity)]
    IterNext(
        sled::Iter,
        oneshot::Sender<(sled::Iter, Option<sled::Result<(IVec, IVec)>>)>,
    ),
}
240
/// Signature of the cleanup hook stored in `SledConfig::cleanup_f`.
///
/// `SledStorageDB::new` calls the hook once, with a reference to the freshly
/// built database handle, right before returning it.
pub type CleanupFun = fn(&SledStorageDB);
243
244/// Default cleanup function that runs in background thread
245fn def_cleanup(_db: &SledStorageDB) {
246    #[cfg(feature = "ttl")]
247    {
248        let db = _db.clone();
249        std::thread::spawn(move || {
250            let limit = 500;
251            loop {
252                std::thread::sleep(std::time::Duration::from_secs(10));
253                let mut total_cleanups = 0;
254                let now = std::time::Instant::now();
255                loop {
256                    let now = std::time::Instant::now();
257                    let count = db.cleanup(limit);
258                    total_cleanups += count;
259                    if count > 0 {
260                        log::debug!(
261                            "def_cleanup: {}, total cleanups: {}, active_count(): {}, cost time: {:?}",
262                            count,
263                            total_cleanups,
264                            db.active_count(),
265                            now.elapsed()
266                        );
267                    }
268                    if count < limit {
269                        break;
270                    }
271                    if db.active_count() > 50 {
272                        std::thread::sleep(std::time::Duration::from_millis(500));
273                    } else {
274                        std::thread::sleep(std::time::Duration::from_millis(0));
275                    }
276                }
277                if now.elapsed().as_secs() > 3 {
278                    log::info!(
279                        "total cleanups: {}, cost time: {:?}",
280                        total_cleanups,
281                        now.elapsed()
282                    );
283                }
284            }
285        });
286    }
287}
288
/// Configuration for Sled storage backend.
///
/// Converted into a native `sled::Config` by `SledConfig::to_sled_config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SledConfig {
    /// Path to database directory; it is trimmed before use and must not be
    /// empty after trimming
    pub path: String,
    /// Cache capacity in bytes (passed to sled via `as_u64()`)
    pub cache_capacity: Bytesize,
    /// Cleanup function for expired keys.
    ///
    /// Skipped by serde in both directions; on deserialization the field is
    /// filled from `SledConfig::cleanup_f_default`.
    #[serde(skip, default = "SledConfig::cleanup_f_default")]
    pub cleanup_f: CleanupFun,
}
300
301impl Default for SledConfig {
302    fn default() -> Self {
303        SledConfig {
304            path: String::default(),
305            cache_capacity: Bytesize::from(1024 * 1024 * 1024),
306            cleanup_f: def_cleanup,
307        }
308    }
309}
310
311impl SledConfig {
312    /// Converts to Sled's native configuration
313    #[inline]
314    pub fn to_sled_config(&self) -> Result<sled::Config> {
315        if self.path.trim().is_empty() {
316            return Err(Error::msg("storage dir is empty"));
317        }
318        let sled_cfg = sled::Config::default()
319            .path(self.path.trim())
320            .cache_capacity(self.cache_capacity.as_u64())
321            .mode(sled::Mode::HighThroughput);
322        Ok(sled_cfg)
323    }
324
325    /// Returns default cleanup function
326    #[inline]
327    fn cleanup_f_default() -> CleanupFun {
328        def_cleanup
329    }
330}
331
/// Computes the next value of a byte-encoded counter after adding one.
///
/// `old` is decoded as a big-endian `isize`. A missing value, or a value whose
/// length is not exactly the size of an `isize`, yields `1`. The result is
/// always `Some`, encoded as big-endian bytes.
fn _increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
    let current = old
        .and_then(|bytes| bytes.try_into().ok())
        .map(isize::from_be_bytes);

    let next = match current {
        Some(value) => value + 1,
        None => 1,
    };

    Some(next.to_be_bytes().to_vec())
}
348
/// Computes the next value of a byte-encoded counter after subtracting one.
///
/// `old` is decoded as a big-endian `isize`. A missing value, or a value whose
/// length is not exactly the size of an `isize`, yields `-1`. The result is
/// always `Some`, encoded as big-endian bytes.
fn _decrement(old: Option<&[u8]>) -> Option<Vec<u8>> {
    let current = old
        .and_then(|bytes| bytes.try_into().ok())
        .map(isize::from_be_bytes);

    let next = match current {
        Some(value) => value - 1,
        None => -1,
    };

    Some(next.to_be_bytes().to_vec())
}
365
/// Parsed glob-style pattern for matching keys.
///
/// The pattern is stored as a shared, immutable sequence of [`PatternChar`]s,
/// so cloning is a reference-count bump.
#[derive(Clone)]
pub struct Pattern(Arc<Vec<PatternChar>>);

impl Deref for Pattern {
    type Target = Vec<PatternChar>;

    /// Gives read access to the parsed pattern elements.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl From<&str> for Pattern {
    /// Parses the UTF-8 bytes of `pattern`; see [`Pattern::parse`].
    fn from(pattern: &str) -> Self {
        Pattern::parse(pattern.as_bytes())
    }
}

impl From<&[u8]> for Pattern {
    /// Parses `pattern`; see [`Pattern::parse`].
    fn from(pattern: &[u8]) -> Self {
        Pattern::parse(pattern)
    }
}

/// Represents a single element of a parsed pattern
#[derive(Clone)]
pub enum PatternChar {
    /// Literal byte; matches exactly that byte
    Literal(u8),
    /// Wildcard (`*`) matching zero or more bytes
    Wildcard,
    /// Placeholder (`?`) matching exactly one byte
    AnyChar,
}

impl Pattern {
    /// Parses a byte pattern into a [`PatternChar`] sequence.
    ///
    /// Syntax:
    /// - `*` becomes [`PatternChar::Wildcard`]
    /// - `?` becomes [`PatternChar::AnyChar`]
    /// - `\*` and `\?` become the literal bytes `*` and `?`
    /// - a backslash followed by any other byte keeps both bytes as literals
    /// - a backslash at the very end of the pattern is kept as a literal
    ///   backslash (it used to be dropped silently)
    /// - every other byte is a literal
    pub fn parse(pattern: &[u8]) -> Self {
        let mut parsed_pattern = Vec::with_capacity(pattern.len());
        // `bytes()` on a byte slice is `io::Read::bytes`, hence the `io::Result`
        // items; reading from an in-memory slice does not fail.
        let mut chars = pattern.bytes();

        while let Some(Ok(c)) = chars.next() {
            match c {
                b'\\' => match chars.next() {
                    Some(Ok(b'?')) => parsed_pattern.push(PatternChar::Literal(b'?')),
                    Some(Ok(b'*')) => parsed_pattern.push(PatternChar::Literal(b'*')),
                    Some(Ok(next_char)) => {
                        parsed_pattern.push(PatternChar::Literal(b'\\'));
                        parsed_pattern.push(PatternChar::Literal(next_char));
                    }
                    // Nothing follows the backslash: keep it instead of losing it.
                    _ => parsed_pattern.push(PatternChar::Literal(b'\\')),
                },
                b'?' => parsed_pattern.push(PatternChar::AnyChar),
                b'*' => parsed_pattern.push(PatternChar::Wildcard),
                _ => parsed_pattern.push(PatternChar::Literal(c)),
            }
        }

        Pattern(Arc::new(parsed_pattern))
    }
}

/// Checks if the whole of `text` matches the given pattern.
///
/// Uses a dynamic-programming table where `dp[i][j]` is `true` when the first
/// `i` pattern elements match the first `j` bytes of `text`; time and memory
/// are proportional to `pattern.len() * text.len()`.
///
/// A [`PatternChar::Literal`] matches only its own byte. In particular an
/// escaped `\?` matches a literal `?` and nothing else (it was previously
/// treated like an unescaped `?`).
fn is_match<P: Into<Pattern>>(pattern: P, text: &[u8]) -> bool {
    let pattern = pattern.into();
    let pattern_len = pattern.len();
    let text_len = text.len();

    let mut dp = vec![vec![false; text_len + 1]; pattern_len + 1];
    // The empty pattern matches the empty text.
    dp[0][0] = true;

    for i in 1..=pattern_len {
        let element = &pattern[i - 1];

        // Only a run of leading wildcards can match the empty text.
        if let PatternChar::Wildcard = element {
            dp[i][0] = dp[i - 1][0];
        }

        for j in 1..=text_len {
            dp[i][j] = match element {
                // `*` either matches nothing (drop the `*`) or absorbs one more byte.
                PatternChar::Wildcard => dp[i - 1][j] || dp[i][j - 1],
                PatternChar::AnyChar => dp[i - 1][j - 1],
                PatternChar::Literal(c) => *c == text[j - 1] && dp[i - 1][j - 1],
            };
        }
    }

    dp[pattern_len][text_len]
}
464
/// Trait for byte replacement
pub trait BytesReplace {
    /// Replaces all non-overlapping occurrences of `from` with `to`, scanning
    /// left to right, and returns the result as a new vector.
    ///
    /// An empty `from` matches nothing, so the input is returned unchanged.
    fn replace(self, from: &[u8], to: &[u8]) -> Vec<u8>;
}

impl BytesReplace for &[u8] {
    fn replace(self, from: &[u8], to: &[u8]) -> Vec<u8> {
        // An empty needle would match at every position without consuming any
        // input, which previously made the loop below spin forever.
        if from.is_empty() {
            return self.to_vec();
        }

        let mut result = Vec::with_capacity(self.len());
        let mut rest = self;
        while !rest.is_empty() {
            if rest.starts_with(from) {
                result.extend_from_slice(to);
                rest = &rest[from.len()..];
            } else {
                result.push(rest[0]);
                rest = &rest[1..];
            }
        }
        result
    }
}
488
/// Main database handle for Sled storage.
///
/// The handle is `Clone`; clones share the same sled database, command channel
/// and active-command counter.
#[derive(Clone)]
pub struct SledStorageDB {
    /// Underlying sled database
    pub(crate) db: Arc<sled::Db>,
    /// Tree for key-value storage (opened under `KV_TREE`)
    pub(crate) kv_tree: sled::Tree,
    /// Tree for map metadata (opened under `MAP_TREE`)
    pub(crate) map_tree: sled::Tree,
    /// Tree for list metadata (opened under `LIST_TREE`)
    pub(crate) list_tree: sled::Tree,
    /// Tree for tracking expiration times (opened under `EXPIRE_KEYS_TREE`)
    #[allow(dead_code)]
    pub(crate) expire_key_tree: sled::Tree,
    /// Tree for tracking key expiration (opened under `KEY_EXPIRE_TREE`)
    #[allow(dead_code)]
    pub(crate) key_expire_tree: sled::Tree,
    /// Channel sender for commands; the receiving end is drained by the worker
    /// started in `SledStorageDB::new`
    cmd_tx: mpsc::Sender<Command>,
    /// Count of active commands; the worker decrements it after each command
    // NOTE(review): the matching increment is not in this part of the file —
    // presumably done where commands are sent; confirm.
    active_count: Arc<AtomicIsize>,
}
511
512impl SledStorageDB {
    /// Creates a new SledStorageDB instance.
    ///
    /// Steps, in order:
    /// 1. converts `cfg` to a sled configuration and opens the database,
    /// 2. opens the five internal trees,
    /// 3. creates the command channel and starts the worker that executes
    ///    every received [`Command`] and sends the outcome back to the caller,
    /// 4. builds the handle and invokes `cfg.cleanup_f` on it.
    ///
    /// # Errors
    ///
    /// Returns an error when the configuration is rejected by
    /// `SledConfig::to_sled_config`, when the database cannot be opened, or when
    /// any of the trees cannot be opened.
    #[inline]
    pub(crate) async fn new(cfg: SledConfig) -> Result<Self> {
        let sled_cfg = cfg.to_sled_config()?;
        // All five `open_tree` calls are made inside the closure; their individual
        // results are only checked afterwards, below.
        let (db, kv_tree, map_tree, list_tree, expire_key_tree, key_expire_tree) =
            sled_cfg.open().map(|db| {
                let kv_tree = db.open_tree(KV_TREE);
                let map_tree = db.open_tree(MAP_TREE);
                let list_tree = db.open_tree(LIST_TREE);
                let expire_key_tree = db.open_tree(EXPIRE_KEYS_TREE);
                let key_expire_tree = db.open_tree(KEY_EXPIRE_TREE);
                (
                    Arc::new(db),
                    kv_tree,
                    map_tree,
                    list_tree,
                    expire_key_tree,
                    key_expire_tree,
                )
            })?;
        let kv_tree = kv_tree?;
        let map_tree = map_tree?;
        let list_tree = list_tree?;
        let expire_key_tree = expire_key_tree?;
        let key_expire_tree = key_expire_tree?;
        let active_count = Arc::new(AtomicIsize::new(0));
        // Second handle to the counter, moved into the worker below.
        let active_count1 = active_count.clone();

        // Bounded command queue shared by every clone of the handle.
        let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::channel::<Command>(300_000);
        // The worker runs on a blocking thread and drives the async `recv` loop
        // with `block_on`, so the sled calls made while handling a command do not
        // run on the async worker threads. The loop ends once `recv` yields `None`.
        spawn_blocking(move || {
            Handle::current().block_on(async move {
                while let Some(cmd) = cmd_rx.recv().await {
                    // Built up front for every command; it is only used when the
                    // requester has dropped its receiver and `send` fails.
                    let err = anyhow::Error::msg("send result fail");
                    let snd_res = match cmd {
                        Command::DBInsert(db, key, val, res_tx) => res_tx
                            .send(db._insert(key.as_slice(), val.as_slice()))
                            .map_err(|_| err),
                        Command::DBGet(db, key, res_tx) => {
                            res_tx.send(db._get(key.as_ref())).map_err(|_| err)
                        }
                        Command::DBRemove(db, key, res_tx) => {
                            res_tx.send(db._kv_remove(key.as_ref())).map_err(|_| err)
                        }
                        Command::DBMapNew(db, name, expire_ms, res_tx) => {
                            let map =
                                SledStorageMap::_new_expire(name.as_ref().to_vec(), expire_ms, db);
                            res_tx.send(map).map_err(|_| err)
                        }
                        Command::DBMapRemove(db, name, res_tx) => {
                            res_tx.send(db._map_remove(name.as_ref())).map_err(|_| err)
                        }
                        Command::DBMapContainsKey(db, key, res_tx) => res_tx
                            .send(db._self_map_contains_key(key.as_ref()))
                            .map_err(|_| err),
                        Command::DBListNew(db, name, expire_ms, res_tx) => {
                            let list =
                                SledStorageList::_new_expire(name.as_ref().to_vec(), expire_ms, db);
                            res_tx.send(list).map_err(|_| err)
                        }
                        Command::DBListRemove(db, name, res_tx) => {
                            res_tx.send(db._list_remove(name.as_ref())).map_err(|_| err)
                        }
                        Command::DBListContainsKey(db, key, res_tx) => res_tx
                            .send(db._self_list_contains_key(key.as_ref()))
                            .map_err(|_| err),
                        Command::DBBatchInsert(db, key_vals, res_tx) => {
                            res_tx.send(db._batch_insert(key_vals)).map_err(|_| err)
                        }
                        Command::DBBatchRemove(db, keys, res_tx) => {
                            res_tx.send(db._batch_remove(keys)).map_err(|_| err)
                        }
                        Command::DBCounterIncr(db, key, increment, res_tx) => res_tx
                            .send(db._counter_incr(key.as_ref(), increment))
                            .map_err(|_| err),
                        Command::DBCounterDecr(db, key, increment, res_tx) => res_tx
                            .send(db._counter_decr(key.as_ref(), increment))
                            .map_err(|_| err),
                        Command::DBCounterGet(db, key, res_tx) => {
                            res_tx.send(db._counter_get(key.as_ref())).map_err(|_| err)
                        }
                        Command::DBCounterSet(db, key, val, res_tx) => res_tx
                            .send(db._counter_set(key.as_ref(), val))
                            .map_err(|_| err),
                        Command::DBContainsKey(db, key, res_tx) => res_tx
                            .send(db._self_contains_key(key.as_ref()))
                            .map_err(|_| err),
                        #[cfg(feature = "ttl")]
                        Command::DBExpireAt(db, key, at, res_tx) => res_tx
                            .send(db._expire_at(key.as_ref(), at, KeyType::KV))
                            .map_err(|_| err),
                        #[cfg(feature = "ttl")]
                        Command::DBTtl(db, key, res_tx) => {
                            res_tx.send(db._self_ttl(key.as_ref())).map_err(|_| err)
                        }
                        Command::DBMapPrefixIter(db, res_tx) => {
                            res_tx.send(db._map_scan_prefix()).map_err(|_| err)
                        }
                        Command::DBListPrefixIter(db, res_tx) => {
                            res_tx.send(db._list_scan_prefix()).map_err(|_| err)
                        }
                        Command::DBScanIter(db, pattern, res_tx) => {
                            res_tx.send(db._db_scan_prefix(pattern)).map_err(|_| err)
                        }
                        Command::DBLen(db, res_tx) => res_tx.send(db._kv_len()).map_err(|_| err),
                        Command::DBSize(db, res_tx) => res_tx.send(db._db_size()).map_err(|_| err),

                        Command::MapInsert(map, key, val, res_tx) => {
                            res_tx.send(map._insert(key, val)).map_err(|_| err)
                        }
                        Command::MapGet(map, key, res_tx) => {
                            res_tx.send(map._get(key)).map_err(|_| err)
                        }
                        Command::MapRemove(map, key, res_tx) => {
                            res_tx.send(map._remove(key)).map_err(|_| err)
                        }
                        Command::MapContainsKey(map, key, res_tx) => {
                            res_tx.send(map._contains_key(key)).map_err(|_| err)
                        }
                        #[cfg(feature = "map_len")]
                        Command::MapLen(map, res_tx) => res_tx.send(map._len()).map_err(|_| err),
                        Command::MapIsEmpty(map, res_tx) => {
                            res_tx.send(map._is_empty()).map_err(|_| err)
                        }
                        Command::MapClear(map, res_tx) => {
                            res_tx.send(map._clear()).map_err(|_| err)
                        }
                        Command::MapRemoveAndFetch(map, key, res_tx) => {
                            res_tx.send(map._remove_and_fetch(key)).map_err(|_| err)
                        }
                        Command::MapRemoveWithPrefix(map, key, res_tx) => {
                            res_tx.send(map._remove_with_prefix(key)).map_err(|_| err)
                        }
                        Command::MapBatchInsert(map, key_vals, res_tx) => {
                            res_tx.send(map._batch_insert(key_vals)).map_err(|_| err)
                        }
                        Command::MapBatchRemove(map, keys, res_tx) => {
                            res_tx.send(map._batch_remove(keys)).map_err(|_| err)
                        }
                        #[cfg(feature = "ttl")]
                        Command::MapExpireAt(map, at, res_tx) => {
                            res_tx.send(map._expire_at(at)).map_err(|_| err)
                        }
                        #[cfg(feature = "ttl")]
                        Command::MapTTL(map, res_tx) => res_tx.send(map._ttl()).map_err(|_| err),
                        Command::MapIsExpired(map, res_tx) => {
                            res_tx.send(map._is_expired()).map_err(|_| err)
                        }
                        Command::MapPrefixIter(map, prefix, res_tx) => {
                            res_tx.send(map._prefix_iter(prefix)).map_err(|_| err)
                        }

                        Command::ListPush(list, val, res_tx) => {
                            res_tx.send(list._push(val)).map_err(|_| err)
                        }
                        Command::ListPushs(list, vals, res_tx) => {
                            res_tx.send(list._pushs(vals)).map_err(|_| err)
                        }
                        Command::ListPushLimit(list, data, limit, pop_front_if_limited, res_tx) => {
                            res_tx
                                .send(list._push_limit(data, limit, pop_front_if_limited))
                                .map_err(|_| err)
                        }
                        Command::ListPop(list, res_tx) => res_tx.send(list._pop()).map_err(|_| err),
                        Command::ListAll(list, res_tx) => res_tx.send(list._all()).map_err(|_| err),
                        Command::ListGetIndex(list, idx, res_tx) => {
                            res_tx.send(list._get_index(idx)).map_err(|_| err)
                        }
                        Command::ListLen(list, res_tx) => res_tx.send(list._len()).map_err(|_| err),
                        Command::ListIsEmpty(list, res_tx) => {
                            res_tx.send(list._is_empty()).map_err(|_| err)
                        }
                        Command::ListClear(list, res_tx) => {
                            res_tx.send(list._clear()).map_err(|_| err)
                        }
                        #[cfg(feature = "ttl")]
                        Command::ListExpireAt(list, at, res_tx) => {
                            res_tx.send(list._expire_at(at)).map_err(|_| err)
                        }
                        #[cfg(feature = "ttl")]
                        Command::ListTTL(list, res_tx) => res_tx.send(list._ttl()).map_err(|_| err),
                        Command::ListIsExpired(list, res_tx) => {
                            res_tx.send(list._is_expired()).map_err(|_| err)
                        }
                        Command::ListPrefixIter(list, res_tx) => {
                            res_tx.send(list._prefix_iter()).map_err(|_| err)
                        }

                        // The iterator is moved in, advanced once, and handed back
                        // together with the item so the caller can keep iterating.
                        Command::IterNext(mut iter, res_tx) => {
                            let item = iter.next();
                            res_tx.send((iter, item)).map_err(|_| err)
                        }
                    };

                    // A failed send only means nobody is waiting for the result
                    // any more; it is logged and the worker carries on.
                    if let Err(e) = snd_res {
                        log::error!("{:?}", e);
                    }

                    // One command finished, whatever its outcome.
                    active_count1.fetch_sub(1, Ordering::Relaxed);
                }
            })
        });

        let db = Self {
            db,
            kv_tree,
            map_tree,
            list_tree,
            expire_key_tree,
            key_expire_tree,
            cmd_tx,
            active_count,
        };

        // Hand the finished handle to the configured cleanup hook.
        (cfg.cleanup_f)(&db);

        Ok(db)
    }
730
731    /// Cleans up expired keys (TTL feature)
732    #[cfg(feature = "ttl")]
733    #[inline]
734    pub fn cleanup(&self, limit: usize) -> usize {
735        let rmeove = |typ: &KeyType, key: &[u8]| -> Result<()> {
736            match typ {
737                KeyType::Map => {
738                    self._map(key.as_ref())._clear()?;
739                }
740                KeyType::List => {
741                    self._list(key.as_ref())._clear()?;
742                }
743                KeyType::KV => {
744                    self.kv_tree.remove(key.as_ref())?;
745                }
746            }
747            Ok(())
748        };
749        let mut count = 0;
750        let mut expire_at_key_types = Vec::new();
751        for item in self.expire_key_tree.iter() {
752            if count > limit {
753                break;
754            }
755            let (expire_at_key, key_type) = match item {
756                Ok(item) => item,
757                Err(e) => {
758                    log::error!("{:?}", e);
759                    break;
760                }
761            };
762
763            let (expire_at_bytes, _) = expire_at_key.as_ref().split_at(8);
764
765            let expire_at = match expire_at_bytes.try_into() {
766                Ok(at) => i64::from_be_bytes(at),
767                Err(e) => {
768                    log::error!("{:?}", e);
769                    break;
770                }
771            };
772
773            if expire_at > timestamp_millis() {
774                break;
775            }
776
777            let key_type = match KeyType::decode(key_type.as_ref()) {
778                Ok(key_type) => key_type,
779                Err(e) => {
780                    log::error!("{:?}", e);
781                    break;
782                }
783            };
784
785            expire_at_key_types.push((expire_at_key, key_type));
786            count += 1;
787        }
788
789        let mut key_expire_batch = sled::Batch::default();
790        let mut expire_key_batch = sled::Batch::default();
791        let keys: Vec<(&[u8], &KeyType)> = expire_at_key_types
792            .iter()
793            .map(|(expire_at_key, key_type)| {
794                let (_, key) = expire_at_key.as_ref().split_at(8);
795                key_expire_batch.remove(key);
796                expire_key_batch.remove(expire_at_key);
797                (key, key_type)
798            })
799            .collect();
800
801        for (key, key_type) in keys {
802            if let Err(e) = rmeove(key_type, key) {
803                log::error!("{:?}", e);
804            }
805        }
806
807        // if let Err(e) = self.key_expire_tree.apply_batch(key_expire_batch) {
808        //     log::error!("{:?}", e);
809        // }
810        // if let Err(e) = self.expire_key_tree.apply_batch(expire_key_batch) {
811        //     log::error!("{:?}", e);
812        // }
813
814        if let Err(e) = (&self.key_expire_tree, &self.expire_key_tree).transaction(
815            |(key_expire_tx, expire_key_tx)| {
816                key_expire_tx.apply_batch(&key_expire_batch)?;
817                expire_key_tx.apply_batch(&expire_key_batch)?;
818                Ok::<_, ConflictableTransactionError<()>>(())
819            },
820        ) {
821            log::error!("{:?}", e);
822        }
823        count
824    }
825
826    /// Cleans up expired key-value pairs (TTL feature)
827    #[cfg(feature = "ttl")]
828    #[inline]
829    pub fn cleanup_kvs(&self, limit: usize) -> usize {
830        let mut count = 0;
831        let mut expire_at_key_types = Vec::new();
832        for item in self.expire_key_tree.iter() {
833            if count > limit {
834                break;
835            }
836            let (expire_at_key, key_type) = match item {
837                Ok(item) => item,
838                Err(e) => {
839                    log::error!("{:?}", e);
840                    break;
841                }
842            };
843
844            let (expire_at_bytes, _) = expire_at_key.as_ref().split_at(8);
845
846            let expire_at = match expire_at_bytes.try_into() {
847                Ok(at) => i64::from_be_bytes(at),
848                Err(e) => {
849                    log::error!("{:?}", e);
850                    break;
851                }
852            };
853
854            if expire_at > timestamp_millis() {
855                break;
856            }
857
858            let key_type = match KeyType::decode(key_type.as_ref()) {
859                Ok(key_type) => key_type,
860                Err(e) => {
861                    log::error!("{:?}", e);
862                    break;
863                }
864            };
865
866            if matches!(key_type, KeyType::KV) {
867                expire_at_key_types.push(expire_at_key);
868                count += 1;
869            }
870        }
871
872        let mut key_expire_batch = sled::Batch::default();
873        let mut expire_key_batch = sled::Batch::default();
874        let mut keys = Batch::default();
875        for expire_at_key in expire_at_key_types {
876            let (_, key) = expire_at_key.as_ref().split_at(8);
877            key_expire_batch.remove(key);
878            expire_key_batch.remove(expire_at_key.as_ref());
879            keys.remove(key);
880        }
881
882        if let Err(e) = (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree).transaction(
883            |(kv_tx, key_expire_tx, expire_key_tx)| {
884                kv_tx.apply_batch(&keys)?;
885                key_expire_tx.apply_batch(&key_expire_batch)?;
886                expire_key_tx.apply_batch(&expire_key_batch)?;
887                Ok::<_, ConflictableTransactionError<()>>(())
888            },
889        ) {
890            log::error!("{:?}", e);
891        }
892        count
893    }
894
895    /// Returns the count of active commands
896    #[inline]
897    pub fn active_count(&self) -> isize {
898        self.active_count.load(Ordering::Relaxed)
899    }
900
901    // #[inline]
902    // pub fn map_size(&self) -> usize {
903    //     self.map_tree.len()
904    // }
905    //
906    // #[inline]
907    // pub fn list_size(&self) -> usize {
908    //     self.list_tree.len()
909    // }
910
911    /// Creates a map prefix name
912    #[inline]
913    fn make_map_prefix_name<K>(name: K) -> Key
914    where
915        K: AsRef<[u8]>,
916    {
917        [MAP_NAME_PREFIX, name.as_ref(), SEPARATOR].concat()
918    }
919
920    /// Creates a map item prefix name
921    #[inline]
922    fn make_map_item_prefix_name<K>(name: K) -> Key
923    where
924        K: AsRef<[u8]>,
925    {
926        [MAP_NAME_PREFIX, name.as_ref(), MAP_KEY_SEPARATOR].concat()
927    }
928
929    /// Creates a map count key name
930    #[inline]
931    fn make_map_count_key_name<K>(name: K) -> Key
932    where
933        K: AsRef<[u8]>,
934    {
935        [MAP_NAME_PREFIX, name.as_ref(), MAP_KEY_COUNT_SUFFIX].concat()
936    }
937
938    /// Extracts map name from count key
939    #[inline]
940    fn map_count_key_to_name(key: &[u8]) -> &[u8] {
941        key[MAP_NAME_PREFIX.len()..key.as_ref().len() - MAP_KEY_COUNT_SUFFIX.len()].as_ref()
942    }
943
944    /// Checks if a key is a map count key
945    #[inline]
946    fn is_map_count_key(key: &[u8]) -> bool {
947        key.starts_with(MAP_NAME_PREFIX) && key.ends_with(MAP_KEY_COUNT_SUFFIX)
948    }
949
950    /// Extracts map name from item key
951    #[allow(dead_code)]
952    #[inline]
953    fn map_item_key_to_name(key: &[u8]) -> Option<&[u8]> {
954        use super::storage::SplitSubslice;
955        if let Some((prefix, _)) = key.split_subslice(MAP_KEY_SEPARATOR) {
956            if prefix.starts_with(MAP_NAME_PREFIX) {
957                return Some(
958                    prefix[MAP_NAME_PREFIX.len()..(prefix.len() - MAP_KEY_SEPARATOR.len())]
959                        .as_ref(),
960                );
961            }
962        }
963        None
964    }
965
966    /// Creates a list prefix
967    #[inline]
968    fn make_list_prefix<K>(name: K) -> Key
969    where
970        K: AsRef<[u8]>,
971    {
972        [LIST_NAME_PREFIX, name.as_ref()].concat()
973    }
974
975    /// Creates a list count key
976    #[inline]
977    fn make_list_count_key(name: &[u8]) -> Vec<u8> {
978        [LIST_NAME_PREFIX, name, LIST_KEY_COUNT_SUFFIX].concat()
979    }
980
981    /// Extracts list name from count key
982    #[inline]
983    fn list_count_key_to_name(key: &[u8]) -> &[u8] {
984        key[LIST_NAME_PREFIX.len()..key.as_ref().len() - LIST_KEY_COUNT_SUFFIX.len()].as_ref()
985    }
986
987    /// Checks if a key is a list count key
988    #[inline]
989    fn is_list_count_key(key: &[u8]) -> bool {
990        key.starts_with(LIST_NAME_PREFIX) && key.ends_with(LIST_KEY_COUNT_SUFFIX)
991    }
992
993    /// Checks if a key exists for a specific key type
994    #[inline]
995    fn _contains_key<K: AsRef<[u8]> + Sync + Send>(
996        &self,
997        key: K,
998        key_type: KeyType,
999    ) -> Result<bool> {
1000        match key_type {
1001            KeyType::KV => Self::_kv_contains_key(&self.kv_tree, key),
1002            KeyType::Map => Self::_map_contains_key(&self.map_tree, key),
1003            KeyType::List => Self::_list_contains_key(&self.list_tree, key),
1004        }
1005    }
1006
1007    /// Checks if a key exists in key-value store
1008    #[inline]
1009    fn _kv_contains_key<K: AsRef<[u8]> + Sync + Send>(kv: &Tree, key: K) -> Result<bool> {
1010        Ok(kv.contains_key(key.as_ref())?)
1011    }
1012
1013    /// Checks if a map exists
1014    #[inline]
1015    fn _map_contains_key<K: AsRef<[u8]> + Sync + Send>(tree: &Tree, key: K) -> Result<bool> {
1016        let count_key = SledStorageDB::make_map_count_key_name(key.as_ref());
1017        Ok(tree.contains_key(count_key)?)
1018    }
1019
1020    /// Checks if a list exists
1021    #[inline]
1022    fn _list_contains_key<K: AsRef<[u8]> + Sync + Send>(tree: &Tree, name: K) -> Result<bool> {
1023        let count_key = SledStorageDB::make_list_count_key(name.as_ref());
1024        Ok(tree.contains_key(count_key)?)
1025    }
1026
1027    /// Removes a map
1028    #[inline]
1029    fn _map_remove<K>(&self, key: K) -> Result<()>
1030    where
1031        K: AsRef<[u8]>,
1032    {
1033        #[cfg(not(feature = "ttl"))]
1034        self._map(key.as_ref())._clear()?;
1035        #[cfg(feature = "ttl")]
1036        {
1037            let map = self._map(key.as_ref());
1038            let map_clear_batch = map._make_clear_batch();
1039            (&self.map_tree, &self.key_expire_tree, &self.expire_key_tree)
1040                .transaction(|(map_tx, key_expire_tx, expire_key_tx)| {
1041                    map._tx_clear(map_tx, &map_clear_batch)?;
1042                    Self::_tx_remove_expire_key(key_expire_tx, expire_key_tx, key.as_ref())?;
1043                    Ok::<(), ConflictableTransactionError<()>>(())
1044                })
1045                .map_err(|e| anyhow!(format!("{:?}", e)))?;
1046        }
1047        Ok(())
1048    }
1049
1050    /// Removes a list
1051    #[inline]
1052    fn _list_remove<K>(&self, key: K) -> Result<()>
1053    where
1054        K: AsRef<[u8]>,
1055    {
1056        #[cfg(not(feature = "ttl"))]
1057        self._list(key.as_ref())._clear()?;
1058        #[cfg(feature = "ttl")]
1059        {
1060            let list = self._list(key.as_ref());
1061            let list_clear_batch = list._make_clear_batch();
1062            (
1063                &self.list_tree,
1064                &self.key_expire_tree,
1065                &self.expire_key_tree,
1066            )
1067                .transaction(|(list_tx, key_expire_tx, expire_key_tx)| {
1068                    SledStorageList::_tx_clear(list_tx, &list_clear_batch)?;
1069                    Self::_tx_remove_expire_key(key_expire_tx, expire_key_tx, key.as_ref())?;
1070                    Ok::<(), ConflictableTransactionError<()>>(())
1071                })
1072                .map_err(|e| anyhow!(format!("{:?}", e)))?;
1073        }
1074        Ok(())
1075    }
1076
1077    /// Removes a key-value pair
1078    #[inline]
1079    fn _kv_remove<K>(&self, key: K) -> Result<()>
1080    where
1081        K: AsRef<[u8]>,
1082    {
1083        #[cfg(not(feature = "ttl"))]
1084        self.kv_tree.remove(key.as_ref())?;
1085        #[cfg(feature = "ttl")]
1086        {
1087            (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree)
1088                .transaction(|(kv_tx, key_expire_tx, expire_key_tx)| {
1089                    kv_tx.remove(key.as_ref())?;
1090                    Self::_tx_remove_expire_key(key_expire_tx, expire_key_tx, key.as_ref())?;
1091                    Ok::<(), ConflictableTransactionError<()>>(())
1092                })
1093                .map_err(|e| anyhow!(format!("{:?}", e)))?;
1094        }
1095        Ok(())
1096    }
1097
1098    /// Removes expiration key (TTL feature)
1099    #[cfg(feature = "ttl")]
1100    #[inline]
1101    fn _remove_expire_key(&self, key: &[u8]) -> Result<()> {
1102        if let Some(expire_at_bytes) = self.key_expire_tree.get(key)? {
1103            self.key_expire_tree.remove(key)?;
1104            let expire_key = [expire_at_bytes.as_ref(), key].concat();
1105            self.expire_key_tree.remove(expire_key.as_slice())?;
1106        }
1107        Ok(())
1108    }
1109
1110    /// Transactionally removes expiration key (TTL feature)
1111    #[cfg(feature = "ttl")]
1112    #[inline]
1113    fn _tx_remove_expire_key(
1114        key_expire_tx: &TransactionalTree,
1115        expire_key_tx: &TransactionalTree,
1116        key: &[u8],
1117    ) -> ConflictableTransactionResult<()> {
1118        if let Some(expire_at_bytes) = key_expire_tx.get(key)? {
1119            key_expire_tx.remove(key)?;
1120            let expire_key = [expire_at_bytes.as_ref(), key].concat();
1121            expire_key_tx.remove(expire_key.as_slice())?;
1122        }
1123        Ok(())
1124    }
1125
1126    /// Checks if a key is expired
1127    #[inline]
1128    fn _is_expired<K, F>(&self, _key: K, _contains_key_f: F) -> Result<bool>
1129    where
1130        K: AsRef<[u8]> + Sync + Send,
1131        F: Fn(&[u8]) -> Result<bool>,
1132    {
1133        #[cfg(feature = "ttl")]
1134        {
1135            if let Some((expire_at, _)) = self._ttl_at(_key, _contains_key_f)? {
1136                Ok(timestamp_millis() >= expire_at)
1137            } else {
1138                Ok(true)
1139            }
1140        }
1141        #[cfg(not(feature = "ttl"))]
1142        Ok(false)
1143    }
1144
1145    /// Gets time-to-live for a key
1146    #[inline]
1147    fn _ttl<K, F>(
1148        &self,
1149        key: K,
1150        contains_key_f: F,
1151    ) -> Result<Option<(TimestampMillis, Option<IVec>)>>
1152    where
1153        K: AsRef<[u8]> + Sync + Send,
1154        F: Fn(&[u8]) -> Result<bool>,
1155    {
1156        Ok(self
1157            ._ttl_at(key, contains_key_f)?
1158            .map(|(expire_at, at_bytes)| (expire_at - timestamp_millis(), at_bytes)))
1159    }
1160
1161    /// Gets expiration time for a key
1162    #[inline]
1163    fn _ttl_at<K, F>(
1164        &self,
1165        c_key: K,
1166        contains_key_f: F,
1167    ) -> Result<Option<(TimestampMillis, Option<IVec>)>>
1168    where
1169        K: AsRef<[u8]> + Sync + Send,
1170        F: Fn(&[u8]) -> Result<bool>,
1171    {
1172        let ttl_res = match self.key_expire_tree.get(c_key.as_ref()) {
1173            Ok(Some(at_bytes)) => {
1174                if contains_key_f(c_key.as_ref())? {
1175                    Ok(Some((
1176                        TimestampMillis::from_be_bytes(at_bytes.as_ref().try_into()?),
1177                        Some(at_bytes),
1178                    )))
1179                } else {
1180                    Ok(None)
1181                }
1182            }
1183            Ok(None) => {
1184                if contains_key_f(c_key.as_ref())? {
1185                    Ok(Some((TimestampMillis::MAX, None)))
1186                } else {
1187                    Ok(None)
1188                }
1189            }
1190            Err(e) => Err(anyhow!(e)),
1191        }?;
1192        Ok(ttl_res)
1193    }
1194
1195    /// Inserts a key-value pair
1196    #[inline]
1197    fn _insert(&self, key: &[u8], val: &[u8]) -> Result<()> {
1198        #[cfg(not(feature = "ttl"))]
1199        self.kv_tree.insert(key, val)?;
1200        #[cfg(feature = "ttl")]
1201        {
1202            (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree)
1203                .transaction(|(kv_tx, key_expire_tx, expire_keys_tx)| {
1204                    kv_tx.insert(key, val)?;
1205                    Self::_tx_remove_expire_key(key_expire_tx, expire_keys_tx, key)?;
1206                    Ok::<(), ConflictableTransactionError<()>>(())
1207                })
1208                .map_err(|e| anyhow!(format!("{:?}", e)))?;
1209        }
1210        Ok(())
1211    }
1212
1213    /// Gets a value by key
1214    #[inline]
1215    fn _get(&self, key: &[u8]) -> Result<Option<IVec>> {
1216        let res = if self._is_expired(key.as_ref(), |k| Self::_kv_contains_key(&self.kv_tree, k))? {
1217            None
1218        } else {
1219            self.kv_tree.get(key)?
1220        };
1221        Ok(res)
1222    }
1223
1224    /// Checks if a map key exists
1225    #[inline]
1226    fn _self_map_contains_key(&self, key: &[u8]) -> Result<bool> {
1227        #[cfg(feature = "ttl")]
1228        {
1229            if self._is_expired(key, |k| Self::_map_contains_key(&self.map_tree, k))? {
1230                Ok(false)
1231            } else {
1232                //Self::_map_contains_key(&self.map_tree, key)
1233                Ok(true)
1234            }
1235        }
1236
1237        #[cfg(not(feature = "ttl"))]
1238        Self::_map_contains_key(&self.map_tree, key)
1239    }
1240
1241    /// Checks if a list key exists
1242    #[inline]
1243    fn _self_list_contains_key(&self, key: &[u8]) -> Result<bool> {
1244        #[cfg(feature = "ttl")]
1245        {
1246            let this = self;
1247            if this._is_expired(key, |k| Self::_list_contains_key(&self.list_tree, k))? {
1248                Ok(false)
1249            } else {
1250                // Self::_list_contains_key(&this.list_tree, key)
1251                Ok(true)
1252            }
1253        }
1254
1255        #[cfg(not(feature = "ttl"))]
1256        Self::_list_contains_key(&self.list_tree, key)
1257    }
1258
1259    /// Batch insert key-value pairs
1260    #[inline]
1261    fn _batch_insert(&self, key_vals: Vec<(Key, IVec)>) -> Result<()> {
1262        if key_vals.is_empty() {
1263            return Ok(());
1264        }
1265
1266        let mut batch = Batch::default();
1267        for (k, v) in key_vals.iter() {
1268            batch.insert(k.as_slice(), v.as_ref());
1269        }
1270
1271        let this = self;
1272        #[cfg(not(feature = "ttl"))]
1273        this.kv_tree.apply_batch(batch)?;
1274
1275        #[cfg(feature = "ttl")]
1276        {
1277            let mut remove_key_expire_batch = Batch::default();
1278            let mut remove_expire_key_batch = Batch::default();
1279            for (k, _) in key_vals.iter() {
1280                if let Some((expire_at, Some(expire_at_bytes))) =
1281                    this._ttl(k.as_slice(), |k| Self::_kv_contains_key(&self.kv_tree, k))?
1282                {
1283                    if expire_at <= 0 {
1284                        remove_key_expire_batch.remove(k.as_slice());
1285                        let expire_key = [expire_at_bytes.as_ref(), k.as_slice()].concat();
1286                        remove_expire_key_batch.remove(expire_key.as_slice())
1287                    }
1288                }
1289            }
1290
1291            // this.key_expire_tree.apply_batch(remove_key_expire_batch)?;
1292            // this.expire_key_tree.apply_batch(remove_expire_key_batch)?;
1293            // this.kv_tree.apply_batch(batch)?;
1294            (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree)
1295                .transaction(|(kv_tx, key_expire_tx, expire_key_tx)| {
1296                    key_expire_tx.apply_batch(&remove_key_expire_batch)?;
1297                    expire_key_tx.apply_batch(&remove_expire_key_batch)?;
1298                    kv_tx.apply_batch(&batch)?;
1299                    Ok::<(), ConflictableTransactionError<()>>(())
1300                })
1301                .map_err(|e| anyhow!(format!("{:?}", e)))?;
1302        }
1303        Ok(())
1304    }
1305
1306    /// Batch remove keys
1307    #[inline]
1308    fn _batch_remove(&self, keys: Vec<Key>) -> Result<()> {
1309        if keys.is_empty() {
1310            return Ok(());
1311        }
1312
1313        let mut batch = Batch::default();
1314        for k in keys.iter() {
1315            batch.remove(k.as_slice());
1316        }
1317        #[cfg(not(feature = "ttl"))]
1318        self.kv_tree.apply_batch(batch)?;
1319
1320        #[cfg(feature = "ttl")]
1321        {
1322            let mut remove_key_expire_batch = Batch::default();
1323            let mut remove_expire_key_batch = Batch::default();
1324            for k in keys.iter() {
1325                if let Some(expire_at_bytes) = self.key_expire_tree.get(k)? {
1326                    remove_key_expire_batch.remove(k.as_slice());
1327                    let expire_key = [expire_at_bytes.as_ref(), k.as_slice()].concat();
1328                    remove_expire_key_batch.remove(expire_key.as_slice())
1329                }
1330            }
1331            // this.key_expire_tree.apply_batch(remove_key_expire_batch)?;
1332            // this.expire_key_tree.apply_batch(remove_expire_key_batch)?;
1333            // this.kv_tree.apply_batch(batch)?;
1334            (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree)
1335                .transaction(|(kv_tx, key_expire_tx, expire_key_tx)| {
1336                    key_expire_tx.apply_batch(&remove_key_expire_batch)?;
1337                    expire_key_tx.apply_batch(&remove_expire_key_batch)?;
1338                    kv_tx.apply_batch(&batch)?;
1339                    Ok::<(), ConflictableTransactionError<()>>(())
1340                })
1341                .map_err(|e| anyhow!(format!("{:?}", e)))?;
1342        }
1343
1344        Ok(())
1345    }
1346
1347    /// Increments a counter
1348    #[inline]
1349    fn _counter_incr(&self, key: &[u8], increment: isize) -> Result<()> {
1350        self.kv_tree.fetch_and_update(key, |old: Option<&[u8]>| {
1351            let number = match old {
1352                Some(bytes) => {
1353                    if let Ok(array) = bytes.try_into() {
1354                        let number = isize::from_be_bytes(array);
1355                        number + increment
1356                    } else {
1357                        increment
1358                    }
1359                }
1360                None => increment,
1361            };
1362            Some(number.to_be_bytes().to_vec())
1363        })?;
1364        Ok(())
1365    }
1366
1367    /// Decrements a counter
1368    #[inline]
1369    fn _counter_decr(&self, key: &[u8], decrement: isize) -> Result<()> {
1370        self.kv_tree.fetch_and_update(key, |old: Option<&[u8]>| {
1371            let number = match old {
1372                Some(bytes) => {
1373                    if let Ok(array) = bytes.try_into() {
1374                        let number = isize::from_be_bytes(array);
1375                        number - decrement
1376                    } else {
1377                        -decrement
1378                    }
1379                }
1380                None => -decrement,
1381            };
1382            Some(number.to_be_bytes().to_vec())
1383        })?;
1384        Ok(())
1385    }
1386
1387    /// Gets counter value
1388    #[inline]
1389    fn _counter_get(&self, key: &[u8]) -> Result<Option<isize>> {
1390        let this = self;
1391        if this._is_expired(key, |k| Self::_kv_contains_key(&self.kv_tree, k))? {
1392            Ok(None)
1393        } else if let Some(v) = this.kv_tree.get(key)? {
1394            Ok(Some(isize::from_be_bytes(v.as_ref().try_into()?)))
1395        } else {
1396            Ok(None)
1397        }
1398    }
1399
1400    /// Sets counter value
1401    #[inline]
1402    fn _counter_set(&self, key: &[u8], val: isize) -> Result<()> {
1403        let val = val.to_be_bytes().to_vec();
1404
1405        #[cfg(not(feature = "ttl"))]
1406        self.kv_tree.insert(key, val.as_slice())?;
1407        #[cfg(feature = "ttl")]
1408        {
1409            // self._remove_expire_key(key)?;
1410            // kv_tree.insert(key, val.as_slice())?;
1411            (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree)
1412                .transaction(|(kv_tx, key_expire_tx, expire_key_tx)| {
1413                    Self::_tx_remove_expire_key(key_expire_tx, expire_key_tx, key)?;
1414                    kv_tx.insert(key, val.as_slice())?;
1415                    Ok::<(), ConflictableTransactionError<()>>(())
1416                })
1417                .map_err(|e| anyhow!(format!("{:?}", e)))?;
1418        }
1419        Ok(())
1420    }
1421
1422    /// Checks if a key exists in key-value store
1423    #[inline]
1424    fn _self_contains_key(&self, key: &[u8]) -> Result<bool> {
1425        #[cfg(feature = "ttl")]
1426        {
1427            let this = self;
1428            if this._is_expired(key, |k| Self::_kv_contains_key(&self.kv_tree, k))? {
1429                Ok(false)
1430            } else {
1431                // this._contains_key(key, KeyType::KV)
1432                Ok(true)
1433            }
1434        }
1435        #[cfg(not(feature = "ttl"))]
1436        Self::_kv_contains_key(&self.kv_tree, key)
1437    }
1438
1439    /// Sets expiration time for a key (TTL feature)
1440    #[inline]
1441    #[cfg(feature = "ttl")]
1442    fn _expire_at(&self, key: &[u8], at: TimestampMillis, key_type: KeyType) -> Result<bool> {
1443        if self._contains_key(key, key_type)? {
1444            let res = (&self.key_expire_tree, &self.expire_key_tree)
1445                .transaction(|(key_expire_tx, expire_key_tx)| {
1446                    Self::_tx_expire_at(key_expire_tx, expire_key_tx, key, at, key_type)
1447                })
1448                .map_err(|e| anyhow!(format!("{:?}", e)))?;
1449            Ok(res)
1450        } else {
1451            Ok(false)
1452        }
1453    }
1454
1455    /// Transactionally sets expiration time (TTL feature)
1456    #[inline]
1457    #[cfg(feature = "ttl")]
1458    fn _tx_expire_at(
1459        key_expire_tx: &TransactionalTree,
1460        expire_key_tx: &TransactionalTree,
1461        key: &[u8],
1462        at: TimestampMillis,
1463        key_type: KeyType,
1464    ) -> ConflictableTransactionResult<bool> {
1465        let at_bytes = at.to_be_bytes();
1466        key_expire_tx.insert(key, at_bytes.as_slice())?;
1467        let res = expire_key_tx
1468            .insert([at_bytes.as_ref(), key].concat(), key_type.encode())
1469            .map(|_| true)?;
1470        Ok(res)
1471    }
1472
1473    /// Gets time-to-live for a key (TTL feature)
1474    #[inline]
1475    #[cfg(feature = "ttl")]
1476    fn _self_ttl(&self, key: &[u8]) -> Result<Option<TimestampMillis>> {
1477        Ok(self
1478            ._ttl(key, |k| Self::_kv_contains_key(&self.kv_tree, k))?
1479            .and_then(|(ttl, _)| if ttl > 0 { Some(ttl) } else { None }))
1480    }
1481
1482    /// Creates an iterator for map prefixes
1483    #[inline]
1484    fn _map_scan_prefix(&self) -> sled::Iter {
1485        self.map_tree.scan_prefix(MAP_NAME_PREFIX)
1486    }
1487
1488    /// Creates an iterator for list prefixes
1489    #[inline]
1490    fn _list_scan_prefix(&self) -> sled::Iter {
1491        self.list_tree.scan_prefix(LIST_NAME_PREFIX)
1492    }
1493
1494    /// Creates an iterator for database scan with pattern
1495    #[inline]
1496    fn _db_scan_prefix(&self, pattern: Vec<u8>) -> sled::Iter {
1497        let mut last_esc_char = false;
1498        let mut has_esc_char = false;
1499        let start_pattern = pattern
1500            .splitn(2, |x| {
1501                if !last_esc_char && (*x == b'*' || *x == b'?') {
1502                    true
1503                } else {
1504                    last_esc_char = *x == b'\\';
1505                    if last_esc_char && !has_esc_char {
1506                        has_esc_char = true;
1507                    }
1508                    false
1509                }
1510            })
1511            .next();
1512        let start_pattern = if has_esc_char {
1513            start_pattern.map(|start_pattern| {
1514                Cow::Owned(
1515                    start_pattern
1516                        .replace(b"\\*", b"*")
1517                        .as_slice()
1518                        .replace(b"\\?", b"?"),
1519                )
1520            })
1521        } else {
1522            start_pattern.map(Cow::Borrowed)
1523        };
1524        let iter = if let Some(start_pattern) = start_pattern {
1525            self.kv_tree.scan_prefix(start_pattern.as_ref())
1526        } else {
1527            self.kv_tree.iter()
1528        };
1529        iter
1530    }
1531
1532    /// Gets number of key-value pairs
1533    #[inline]
1534    fn _kv_len(&self) -> usize {
1535        #[cfg(feature = "ttl")]
1536        {
1537            let limit = 500;
1538            loop {
1539                if self.cleanup_kvs(limit) < limit {
1540                    break;
1541                }
1542            }
1543        }
1544        self.kv_tree.len()
1545    }
1546
1547    /// Gets total database size
1548    #[inline]
1549    fn _db_size(&self) -> usize {
1550        self.db.len() + self.kv_tree.len() + self.map_tree.len() + self.list_tree.len()
1551    }
1552
1553    /// Sends a command to the background processor
1554    #[inline]
1555    async fn cmd_send(&self, cmd: Command) -> Result<()> {
1556        self.active_count.fetch_add(1, Ordering::Relaxed);
1557        if let Err(e) = self.cmd_tx.send(cmd).await {
1558            self.active_count.fetch_sub(1, Ordering::Relaxed);
1559            Err(anyhow!(e))
1560        } else {
1561            Ok(())
1562        }
1563    }
1564
1565    /// Gets a map handle
1566    #[inline]
1567    fn _map<N: AsRef<[u8]>>(&self, name: N) -> SledStorageMap {
1568        SledStorageMap::_new(name.as_ref().to_vec(), self.clone())
1569    }
1570
1571    /// Gets a list handle
1572    #[inline]
1573    fn _list<V: AsRef<[u8]>>(&self, name: V) -> SledStorageList {
1574        SledStorageList::_new(name.as_ref().to_vec(), self.clone())
1575    }
1576}
1577
1578#[async_trait]
1579impl StorageDB for SledStorageDB {
1580    type MapType = SledStorageMap;
1581    type ListType = SledStorageList;
1582
1583    /// Creates or gets a map with optional expiration
1584    #[inline]
1585    async fn map<N: AsRef<[u8]> + Sync + Send>(
1586        &self,
1587        name: N,
1588        expire: Option<TimestampMillis>,
1589    ) -> Result<Self::MapType> {
1590        SledStorageMap::new_expire(name.as_ref().to_vec(), expire, self.clone()).await
1591    }
1592
1593    /// Removes a map
1594    #[inline]
1595    async fn map_remove<K>(&self, name: K) -> Result<()>
1596    where
1597        K: AsRef<[u8]> + Sync + Send,
1598    {
1599        let (tx, rx) = oneshot::channel();
1600        self.cmd_send(Command::DBMapRemove(self.clone(), name.as_ref().into(), tx))
1601            .await?;
1602        rx.await??;
1603        Ok(())
1604    }
1605
1606    /// Checks if a map exists
1607    #[inline]
1608    async fn map_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
1609        let (tx, rx) = oneshot::channel();
1610        self.cmd_send(Command::DBMapContainsKey(
1611            self.clone(),
1612            key.as_ref().into(),
1613            tx,
1614        ))
1615        .await?;
1616        Ok(rx.await??)
1617    }
1618
1619    /// Creates or gets a list with optional expiration
1620    #[inline]
1621    async fn list<V: AsRef<[u8]> + Sync + Send>(
1622        &self,
1623        name: V,
1624        expire: Option<TimestampMillis>,
1625    ) -> Result<Self::ListType> {
1626        SledStorageList::new_expire(name.as_ref().to_vec(), expire, self.clone()).await
1627    }
1628
1629    /// Removes a list
1630    #[inline]
1631    async fn list_remove<K>(&self, name: K) -> Result<()>
1632    where
1633        K: AsRef<[u8]> + Sync + Send,
1634    {
1635        let (tx, rx) = oneshot::channel();
1636        self.cmd_send(Command::DBListRemove(
1637            self.clone(),
1638            name.as_ref().into(),
1639            tx,
1640        ))
1641        .await?;
1642        rx.await??;
1643        Ok(())
1644    }
1645
1646    /// Checks if a list exists
1647    #[inline]
1648    async fn list_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
1649        let (tx, rx) = oneshot::channel();
1650        self.cmd_send(Command::DBListContainsKey(
1651            self.clone(),
1652            key.as_ref().into(),
1653            tx,
1654        ))
1655        .await?;
1656        Ok(rx.await??)
1657    }
1658
1659    /// Inserts a key-value pair
1660    #[inline]
1661    async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
1662    where
1663        K: AsRef<[u8]> + Sync + Send,
1664        V: serde::ser::Serialize + Sync + Send,
1665    {
1666        let val = bincode::serialize(val)?;
1667        let (tx, rx) = oneshot::channel();
1668        self.cmd_send(Command::DBInsert(
1669            self.clone(),
1670            key.as_ref().to_vec(),
1671            val,
1672            tx,
1673        ))
1674        .await?;
1675        rx.await??;
1676        Ok(())
1677    }
1678
1679    /// Gets a value by key
1680    #[inline]
1681    async fn get<K, V>(&self, key: K) -> Result<Option<V>>
1682    where
1683        K: AsRef<[u8]> + Sync + Send,
1684        V: DeserializeOwned + Sync + Send,
1685    {
1686        let (tx, rx) = oneshot::channel();
1687        self.cmd_send(Command::DBGet(self.clone(), key.as_ref().into(), tx))
1688            .await?;
1689        match rx.await?? {
1690            Some(v) => Ok(Some(bincode::deserialize::<V>(v.as_ref())?)),
1691            None => Ok(None),
1692        }
1693    }
1694
1695    /// Removes a key-value pair
1696    #[inline]
1697    async fn remove<K>(&self, key: K) -> Result<()>
1698    where
1699        K: AsRef<[u8]> + Sync + Send,
1700    {
1701        let (tx, rx) = oneshot::channel();
1702        self.cmd_send(Command::DBRemove(self.clone(), key.as_ref().into(), tx))
1703            .await?;
1704        rx.await??;
1705        Ok(())
1706    }
1707
1708    /// Batch inserts key-value pairs
1709    #[inline]
1710    async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
1711    where
1712        V: Serialize + Sync + Send,
1713    {
1714        if key_vals.is_empty() {
1715            return Ok(());
1716        }
1717
1718        let key_vals = key_vals
1719            .into_iter()
1720            .map(|(k, v)| {
1721                bincode::serialize(&v)
1722                    .map(|v| (k, v.into()))
1723                    .map_err(|e| anyhow!(e))
1724            })
1725            .collect::<Result<Vec<_>>>()?;
1726
1727        let (tx, rx) = oneshot::channel();
1728        self.cmd_send(Command::DBBatchInsert(self.clone(), key_vals, tx))
1729            .await?;
1730        Ok(rx.await??)
1731    }
1732
1733    /// Batch removes keys
1734    #[inline]
1735    async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
1736        if keys.is_empty() {
1737            return Ok(());
1738        }
1739
1740        let (tx, rx) = oneshot::channel();
1741        self.cmd_send(Command::DBBatchRemove(self.clone(), keys, tx))
1742            .await?;
1743        Ok(rx.await??)
1744    }
1745
1746    /// Increments a counter
1747    #[inline]
1748    async fn counter_incr<K>(&self, key: K, increment: isize) -> Result<()>
1749    where
1750        K: AsRef<[u8]> + Sync + Send,
1751    {
1752        let (tx, rx) = oneshot::channel();
1753        self.cmd_send(Command::DBCounterIncr(
1754            self.clone(),
1755            key.as_ref().into(),
1756            increment,
1757            tx,
1758        ))
1759        .await?;
1760        Ok(rx.await??)
1761    }
1762
1763    /// Decrements a counter
1764    #[inline]
1765    async fn counter_decr<K>(&self, key: K, decrement: isize) -> Result<()>
1766    where
1767        K: AsRef<[u8]> + Sync + Send,
1768    {
1769        let (tx, rx) = oneshot::channel();
1770        self.cmd_send(Command::DBCounterDecr(
1771            self.clone(),
1772            key.as_ref().into(),
1773            decrement,
1774            tx,
1775        ))
1776        .await?;
1777        Ok(rx.await??)
1778    }
1779
1780    /// Gets counter value
1781    #[inline]
1782    async fn counter_get<K>(&self, key: K) -> Result<Option<isize>>
1783    where
1784        K: AsRef<[u8]> + Sync + Send,
1785    {
1786        let (tx, rx) = oneshot::channel();
1787        self.cmd_send(Command::DBCounterGet(self.clone(), key.as_ref().into(), tx))
1788            .await?;
1789        Ok(rx.await??)
1790    }
1791
1792    /// Sets counter value
1793    #[inline]
1794    async fn counter_set<K>(&self, key: K, val: isize) -> Result<()>
1795    where
1796        K: AsRef<[u8]> + Sync + Send,
1797    {
1798        let (tx, rx) = oneshot::channel();
1799        self.cmd_send(Command::DBCounterSet(
1800            self.clone(),
1801            key.as_ref().into(),
1802            val,
1803            tx,
1804        ))
1805        .await?;
1806        Ok(rx.await??)
1807    }
1808
1809    /// Checks if a key exists
1810    #[inline]
1811    async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
1812        let (tx, rx) = oneshot::channel();
1813        self.cmd_send(Command::DBContainsKey(
1814            self.clone(),
1815            key.as_ref().into(),
1816            tx,
1817        ))
1818        .await?;
1819        Ok(rx.await??)
1820    }
1821
1822    /// Gets number of key-value pairs (if enabled)
1823    #[inline]
1824    #[cfg(feature = "len")]
1825    async fn len(&self) -> Result<usize> {
1826        let (tx, rx) = oneshot::channel();
1827        self.cmd_send(Command::DBLen(self.clone(), tx)).await?;
1828        Ok(rx.await?)
1829    }
1830
1831    /// Gets total database size
1832    #[inline]
1833    async fn db_size(&self) -> Result<usize> {
1834        let (tx, rx) = oneshot::channel();
1835        self.cmd_send(Command::DBSize(self.clone(), tx)).await?;
1836        Ok(rx.await?)
1837    }
1838
1839    /// Sets expiration time for a key (TTL feature)
1840    #[inline]
1841    #[cfg(feature = "ttl")]
1842    async fn expire_at<K>(&self, key: K, at: TimestampMillis) -> Result<bool>
1843    where
1844        K: AsRef<[u8]> + Sync + Send,
1845    {
1846        let (tx, rx) = oneshot::channel();
1847        self.cmd_send(Command::DBExpireAt(
1848            self.clone(),
1849            key.as_ref().into(),
1850            at,
1851            tx,
1852        ))
1853        .await?;
1854        Ok(rx.await??)
1855    }
1856
1857    /// Sets time-to-live for a key (TTL feature)
1858    #[inline]
1859    #[cfg(feature = "ttl")]
1860    async fn expire<K>(&self, key: K, dur: TimestampMillis) -> Result<bool>
1861    where
1862        K: AsRef<[u8]> + Sync + Send,
1863    {
1864        let at = timestamp_millis() + dur;
1865        self.expire_at(key, at).await
1866    }
1867
1868    /// Gets time-to-live for a key (TTL feature)
1869    #[inline]
1870    #[cfg(feature = "ttl")]
1871    async fn ttl<K>(&self, key: K) -> Result<Option<TimestampMillis>>
1872    where
1873        K: AsRef<[u8]> + Sync + Send,
1874    {
1875        let (tx, rx) = oneshot::channel();
1876        self.cmd_send(Command::DBTtl(self.clone(), key.as_ref().into(), tx))
1877            .await?;
1878        Ok(rx.await??)
1879    }
1880
1881    /// Iterates over all maps
1882    #[inline]
1883    async fn map_iter<'a>(
1884        &'a mut self,
1885    ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageMap>> + Send + 'a>> {
1886        let (tx, rx) = oneshot::channel();
1887        self.cmd_send(Command::DBMapPrefixIter(self.clone(), tx))
1888            .await?;
1889        let iter = rx.await?;
1890        let iter = Box::new(AsyncMapIter::new(self, iter));
1891        Ok(iter)
1892    }
1893
1894    /// Iterates over all lists
1895    #[inline]
1896    async fn list_iter<'a>(
1897        &'a mut self,
1898    ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>> {
1899        let (tx, rx) = oneshot::channel();
1900        self.cmd_send(Command::DBListPrefixIter(self.clone(), tx))
1901            .await?;
1902        let iter = rx.await?;
1903        let iter = Box::new(AsyncListIter {
1904            db: self,
1905            iter: Some(iter),
1906        });
1907        Ok(iter)
1908    }
1909
1910    /// Scans keys matching pattern
1911    async fn scan<'a, P>(
1912        &'a mut self,
1913        pattern: P,
1914    ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>
1915    where
1916        P: AsRef<[u8]> + Send + Sync,
1917    {
1918        let pattern = pattern.as_ref();
1919        let (tx, rx) = oneshot::channel();
1920        self.cmd_send(Command::DBScanIter(self.clone(), pattern.to_vec(), tx))
1921            .await?;
1922        let iter = rx.await?;
1923        let pattern = Pattern::from(pattern);
1924        let iter = Box::new(AsyncDbKeyIter {
1925            db: self,
1926            pattern,
1927            iter: Some(iter),
1928        });
1929        Ok(iter)
1930    }
1931
1932    /// Gets database information
1933    #[inline]
1934    async fn info(&self) -> Result<Value> {
1935        let active_count = self.active_count.load(Ordering::Relaxed);
1936        // let this = self.clone();
1937        Ok(spawn_blocking(move || {
1938            // let size_on_disk = this.db.size_on_disk().unwrap_or_default();
1939            // let db_size = this.db_size();
1940            // let map_size = this.map_size();
1941            // let list_size = this.list_size();
1942
1943            // let limit = 20;
1944
1945            // let mut db_keys = Vec::new();
1946            // for (i, key) in this.db.iter().keys().enumerate() {
1947            //     let key = key
1948            //         .map(|k| String::from_utf8_lossy(k.as_ref()).to_string())
1949            //         .unwrap_or_else(|e| e.to_string());
1950            //     db_keys.push(key);
1951            //     if i > limit {
1952            //         break;
1953            //     }
1954            // }
1955
1956            // let mut map_names = Vec::new();
1957            // for (i, key) in this.map_tree.iter().keys().enumerate() {
1958            //     let key = key
1959            //         .map(|k| String::from_utf8_lossy(k.as_ref()).to_string())
1960            //         .unwrap_or_else(|e| e.to_string());
1961            //     map_names.push(key);
1962            //     if i > limit {
1963            //         break;
1964            //     }
1965            // }
1966
1967            // let mut list_names = Vec::new();
1968            // for (i, key) in this.list_tree.iter().keys().enumerate() {
1969            //     let key = key
1970            //         .map(|k| String::from_utf8_lossy(k.as_ref()).to_string())
1971            //         .unwrap_or_else(|e| e.to_string());
1972            //     list_names.push(key);
1973            //     if i > limit {
1974            //         break;
1975            //     }
1976            // }
1977
1978            serde_json::json!({
1979                "storage_engine": "Sled",
1980                "active_count": active_count,
1981                // "db_size": db_size,
1982                // "map_size": map_size,
1983                // "list_size": list_size,
1984                // "size_on_disk": size_on_disk,
1985                // "db_keys": db_keys,
1986                // "map_names": map_names,
1987                // "list_names": list_names,
1988            })
1989        })
1990        .await?)
1991    }
1992}
1993
/// Map structure for key-value storage within a namespace.
///
/// Entries are stored in the database's shared `map_tree`; the three key prefixes
/// below are derived from `name` when the handle is constructed.
#[derive(Clone)]
pub struct SledStorageMap {
    /// Map name
    name: Key,
    /// Prefix for map keys (from `SledStorageDB::make_map_prefix_name`)
    map_prefix_name: Key,
    /// Prefix for map items (from `SledStorageDB::make_map_item_prefix_name`)
    map_item_prefix_name: Key,
    /// Key for map count (from `SledStorageDB::make_map_count_key_name`)
    map_count_key_name: Key,
    /// Flag indicating if map is empty; held in an `Arc`, so clones of this handle share it
    empty: Arc<AtomicBool>,
    /// Database handle
    pub(crate) db: SledStorageDB,
}
2010
2011impl SledStorageMap {
2012    /// Creates a new map with optional expiration
2013    #[inline]
2014    async fn new_expire(
2015        name: Key,
2016        expire_ms: Option<TimestampMillis>,
2017        db: SledStorageDB,
2018    ) -> Result<Self> {
2019        let (tx, rx) = oneshot::channel();
2020        db.cmd_send(Command::DBMapNew(db.clone(), name.into(), expire_ms, tx))
2021            .await?;
2022        rx.await?
2023    }
2024
2025    /// Internal method to create map with expiration
2026    #[inline]
2027    fn _new_expire(
2028        name: Key,
2029        _expire_ms: Option<TimestampMillis>,
2030        db: SledStorageDB,
2031    ) -> Result<Self> {
2032        let m = Self::_new(name, db);
2033        m.empty.store(m._is_empty()?, Ordering::SeqCst);
2034        #[cfg(feature = "ttl")]
2035        if let Some(expire_ms) = _expire_ms.as_ref() {
2036            m._expire_at(timestamp_millis() + *expire_ms)?;
2037        }
2038        Ok(m)
2039    }
2040
2041    /// Internal method to create map
2042    #[inline]
2043    fn _new(name: Key, db: SledStorageDB) -> Self {
2044        let map_prefix_name = SledStorageDB::make_map_prefix_name(name.as_slice());
2045        let map_item_prefix_name = SledStorageDB::make_map_item_prefix_name(name.as_slice());
2046        let map_count_key_name = SledStorageDB::make_map_count_key_name(name.as_slice());
2047        SledStorageMap {
2048            name,
2049            map_prefix_name,
2050            map_item_prefix_name,
2051            map_count_key_name,
2052            empty: Arc::new(AtomicBool::new(true)),
2053            db,
2054        }
2055    }
2056
2057    /// Gets the underlying tree
2058    #[inline]
2059    fn tree(&self) -> &sled::Tree {
2060        &self.db.map_tree
2061    }
2062
2063    /// Creates a full item key
2064    #[inline]
2065    fn make_map_item_key<K: AsRef<[u8]>>(&self, key: K) -> Key {
2066        [self.map_item_prefix_name.as_ref(), key.as_ref()].concat()
2067    }
2068
2069    /// Gets map length (if enabled)
2070    #[cfg(feature = "map_len")]
2071    #[inline]
2072    fn _len_get(&self) -> Result<isize> {
2073        self._counter_get(self.map_count_key_name.as_slice())
2074    }
2075
2076    /// Transactionally increments a counter
2077    #[inline]
2078    fn _tx_counter_inc<K: AsRef<[u8]>>(
2079        tx: &TransactionalTree,
2080        key: K,
2081    ) -> ConflictableTransactionResult<()> {
2082        let val = match tx.get(key.as_ref())? {
2083            Some(data) => {
2084                if let Ok(array) = data.as_ref().try_into() {
2085                    let number = isize::from_be_bytes(array);
2086                    number + 1
2087                } else {
2088                    1
2089                }
2090            }
2091            None => 1,
2092        };
2093        tx.insert(key.as_ref(), val.to_be_bytes().as_slice())?;
2094        Ok(())
2095    }
2096
2097    /// Transactionally decrements a counter
2098    #[inline]
2099    fn _tx_counter_dec<K: AsRef<[u8]>>(
2100        tx: &TransactionalTree,
2101        key: K,
2102    ) -> ConflictableTransactionResult<()> {
2103        let val = match tx.get(key.as_ref())? {
2104            Some(data) => {
2105                if let Ok(array) = data.as_ref().try_into() {
2106                    let number = isize::from_be_bytes(array);
2107                    number - 1
2108                } else {
2109                    -1
2110                }
2111            }
2112            None => -1,
2113        };
2114        if val > 0 {
2115            tx.insert(key.as_ref(), val.to_be_bytes().as_slice())?;
2116        } else {
2117            tx.remove(key.as_ref())?;
2118        }
2119        Ok(())
2120    }
2121
2122    /// Transactionally gets counter value
2123    #[inline]
2124    fn _tx_counter_get<K: AsRef<[u8]>, E>(
2125        tx: &TransactionalTree,
2126        key: K,
2127    ) -> ConflictableTransactionResult<isize, E> {
2128        if let Some(v) = tx.get(key)? {
2129            let c = match v.as_ref().try_into() {
2130                Ok(c) => c,
2131                Err(e) => {
2132                    return Err(ConflictableTransactionError::Storage(sled::Error::Io(
2133                        io::Error::new(ErrorKind::InvalidData, e),
2134                    )))
2135                }
2136            };
2137            Ok(isize::from_be_bytes(c))
2138        } else {
2139            Ok(0)
2140        }
2141    }
2142
2143    /// Transactionally sets counter value
2144    #[inline]
2145    fn _tx_counter_set<K: AsRef<[u8]>, E>(
2146        tx: &TransactionalTree,
2147        key: K,
2148        val: isize,
2149    ) -> ConflictableTransactionResult<(), E> {
2150        tx.insert(key.as_ref(), val.to_be_bytes().as_slice())?;
2151        Ok(())
2152    }
2153
2154    /// Transactionally removes counter
2155    #[inline]
2156    fn _tx_counter_remove<K: AsRef<[u8]>, E>(
2157        tx: &TransactionalTree,
2158        key: K,
2159    ) -> ConflictableTransactionResult<(), E> {
2160        tx.remove(key.as_ref())?;
2161        Ok(())
2162    }
2163
2164    /// Gets counter value
2165    #[inline]
2166    fn _counter_get<K: AsRef<[u8]>>(&self, key: K) -> Result<isize> {
2167        if let Some(v) = self.tree().get(key)? {
2168            Ok(isize::from_be_bytes(v.as_ref().try_into()?))
2169        } else {
2170            Ok(0)
2171        }
2172    }
2173
2174    /// Initializes counter if not present
2175    #[inline]
2176    fn _counter_init(&self) -> Result<()> {
2177        let tree = self.tree();
2178        if !tree.contains_key(self.map_count_key_name.as_slice())? {
2179            tree.insert(
2180                self.map_count_key_name.as_slice(),
2181                0isize.to_be_bytes().as_slice(),
2182            )?;
2183        }
2184        Ok(())
2185    }
2186
2187    /// Clears the map
2188    #[inline]
2189    fn _clear(&self) -> Result<()> {
2190        let batch = self._make_clear_batch();
2191        self.tree()
2192            .transaction(|tx| self._tx_clear(tx, &batch))
2193            .map_err(|e| anyhow!(format!("{:?}", e)))?;
2194        Ok(())
2195    }
2196
2197    /// Transactionally clears the map
2198    #[inline]
2199    fn _tx_clear(
2200        &self,
2201        map_tree_tx: &TransactionalTree,
2202        batch: &Batch,
2203    ) -> ConflictableTransactionResult<()> {
2204        map_tree_tx.apply_batch(batch)?;
2205        self.empty.store(true, Ordering::SeqCst);
2206        Ok(())
2207    }
2208
2209    /// Creates batch for clearing map
2210    #[inline]
2211    fn _make_clear_batch(&self) -> Batch {
2212        let mut batch = Batch::default();
2213        //clear key-value
2214        for item in self.tree().scan_prefix(self.map_prefix_name.as_slice()) {
2215            match item {
2216                Ok((key, _)) => {
2217                    batch.remove(key);
2218                }
2219                Err(e) => {
2220                    log::warn!("{:?}", e);
2221                }
2222            }
2223        }
2224        batch
2225    }
2226
2227    /// Inserts a key-value pair into the map
2228    #[inline]
2229    fn _insert(&self, key: IVec, val: IVec) -> Result<()> {
2230        let item_key = self.make_map_item_key(key.as_ref());
2231        let this = self;
2232        #[cfg(feature = "map_len")]
2233        {
2234            let count_key = this.map_count_key_name.as_slice();
2235            this.tree()
2236                .transaction(move |tx| {
2237                    if tx.insert(item_key.as_slice(), val.as_ref())?.is_none() {
2238                        Self::_tx_counter_inc(tx, count_key)?;
2239                    }
2240                    Ok(())
2241                })
2242                .map_err(|e| anyhow!(format!("{:?}", e)))?;
2243        }
2244        #[cfg(not(feature = "map_len"))]
2245        {
2246            if self.empty.load(Ordering::SeqCst) {
2247                self._counter_init()?;
2248                self.empty.store(false, Ordering::SeqCst)
2249            }
2250            this.tree().insert(item_key.as_slice(), val.as_ref())?;
2251        }
2252
2253        #[cfg(feature = "ttl")]
2254        {
2255            if this.db._is_expired(this.name.as_slice(), |k| {
2256                SledStorageDB::_map_contains_key(this.tree(), k)
2257            })? {
2258                // this.db._remove_expire_key(this.name.as_slice())?;
2259                (&self.db.key_expire_tree, &self.db.expire_key_tree)
2260                    .transaction(|(key_expire_tx, expire_key_tx)| {
2261                        SledStorageDB::_tx_remove_expire_key(
2262                            key_expire_tx,
2263                            expire_key_tx,
2264                            this.name.as_slice(),
2265                        )?;
2266                        Ok::<(), ConflictableTransactionError<()>>(())
2267                    })
2268                    .map_err(|e| anyhow!(format!("{:?}", e)))?;
2269            }
2270        }
2271
2272        Ok(())
2273    }
2274
2275    /// Gets a value from the map
2276    #[inline]
2277    fn _get(&self, key: IVec) -> Result<Option<IVec>> {
2278        let this = self;
2279        let item_key = self.make_map_item_key(key.as_ref());
2280        let res = if !this.db._is_expired(this.name.as_slice(), |k| {
2281            SledStorageDB::_map_contains_key(this.tree(), k)
2282        })? {
2283            this.tree().get(item_key).map_err(|e| anyhow!(e))?
2284        } else {
2285            None
2286        };
2287        Ok(res)
2288    }
2289
2290    /// Removes a key from the map
2291    #[inline]
2292    fn _remove(&self, key: IVec) -> Result<()> {
2293        let tree = self.tree();
2294        let key = self.make_map_item_key(key.as_ref());
2295
2296        #[cfg(feature = "map_len")]
2297        {
2298            let count_key = self.map_count_key_name.to_vec();
2299            tree.transaction(move |tx| {
2300                if tx.remove(key.as_slice())?.is_some() {
2301                    Self::_tx_counter_dec(tx, count_key.as_slice())?;
2302                }
2303                Ok(())
2304            })
2305            .map_err(|e| anyhow!(format!("{:?}", e)))?;
2306        }
2307
2308        #[cfg(not(feature = "map_len"))]
2309        {
2310            tree.remove(key.as_slice())?;
2311        }
2312
2313        Ok(())
2314    }
2315
2316    /// Checks if key exists in map
2317    #[inline]
2318    fn _contains_key(&self, key: IVec) -> Result<bool> {
2319        let key = self.make_map_item_key(key.as_ref());
2320        Ok(self.tree().contains_key(key)?)
2321    }
2322
2323    /// Gets map length (if enabled)
2324    #[cfg(feature = "map_len")]
2325    #[inline]
2326    fn _len(&self) -> Result<usize> {
2327        let this = self;
2328        let len = {
2329            if this.db._is_expired(this.name.as_slice(), |k| {
2330                SledStorageDB::_map_contains_key(this.tree(), k)
2331            })? {
2332                Ok(0)
2333            } else {
2334                this._len_get()
2335            }
2336        }?;
2337        Ok(len as usize)
2338    }
2339
2340    /// Checks if map is empty
2341    #[inline]
2342    fn _is_empty(&self) -> Result<bool> {
2343        let this = self;
2344        let res = {
2345            if this.db._is_expired(this.name.as_slice(), |k| {
2346                SledStorageDB::_map_contains_key(this.tree(), k)
2347            })? {
2348                true
2349            } else {
2350                self.tree()
2351                    .scan_prefix(self.map_item_prefix_name.as_slice())
2352                    .next()
2353                    .is_none()
2354            }
2355        };
2356        Ok(res)
2357    }
2358
2359    /// Removes and returns a value
2360    #[inline]
2361    fn _remove_and_fetch(&self, key: IVec) -> Result<Option<IVec>> {
2362        let key = self.make_map_item_key(key.as_ref());
2363        let this = self;
2364        let removed = {
2365            if this.db._is_expired(this.name.as_slice(), |k| {
2366                SledStorageDB::_map_contains_key(this.tree(), k)
2367            })? {
2368                Ok(None)
2369            } else {
2370                #[cfg(feature = "map_len")]
2371                {
2372                    let count_key = this.map_count_key_name.to_vec();
2373                    this.tree().transaction(move |tx| {
2374                        if let Some(removed) = tx.remove(key.as_slice())? {
2375                            Self::_tx_counter_dec(tx, count_key.as_slice())?;
2376                            Ok(Some(removed))
2377                        } else {
2378                            Ok(None)
2379                        }
2380                    })
2381                }
2382                #[cfg(not(feature = "map_len"))]
2383                {
2384                    let removed = this.tree().remove(key.as_slice())?;
2385                    Ok::<_, TransactionError<()>>(removed)
2386                }
2387            }
2388        }
2389        .map_err(|e| anyhow!(format!("{:?}", e)))?;
2390
2391        Ok(removed)
2392    }
2393
2394    /// Removes keys with prefix
2395    #[inline]
2396    fn _remove_with_prefix(&self, prefix: IVec) -> Result<()> {
2397        let tree = self.tree();
2398        let prefix = [self.map_item_prefix_name.as_slice(), prefix.as_ref()]
2399            .concat()
2400            .to_vec();
2401
2402        #[cfg(feature = "map_len")]
2403        let map_count_key_name = self.map_count_key_name.to_vec();
2404        {
2405            let mut removeds = Batch::default();
2406            #[cfg(feature = "map_len")]
2407            let mut c = 0;
2408            for item in tree.scan_prefix(prefix) {
2409                match item {
2410                    Ok((k, _v)) => {
2411                        removeds.remove(k.as_ref());
2412                        #[cfg(feature = "map_len")]
2413                        {
2414                            c += 1;
2415                        }
2416                    }
2417                    Err(e) => {
2418                        log::warn!("{:?}", e);
2419                    }
2420                }
2421            }
2422
2423            #[cfg(feature = "map_len")]
2424            {
2425                tree.transaction(move |tx| {
2426                    let len = Self::_tx_counter_get(tx, map_count_key_name.as_slice())? - c;
2427                    if len > 0 {
2428                        Self::_tx_counter_set(tx, map_count_key_name.as_slice(), len)?;
2429                    } else {
2430                        Self::_tx_counter_remove(tx, map_count_key_name.as_slice())?;
2431                    };
2432                    tx.apply_batch(&removeds)?;
2433                    Ok::<(), ConflictableTransactionError<sled::Error>>(())
2434                })
2435            }
2436            #[cfg(not(feature = "map_len"))]
2437            {
2438                tree.apply_batch(removeds)?;
2439                Ok::<(), ConflictableTransactionError<sled::Error>>(())
2440            }
2441        }?;
2442        Ok(())
2443    }
2444
2445    /// Batch inserts key-value pairs
2446    #[inline]
2447    fn _batch_insert(&self, key_vals: Vec<(IVec, IVec)>) -> Result<()> {
2448        for (k, v) in key_vals {
2449            self._insert(k, v)?;
2450        }
2451        Ok(())
2452    }
2453
2454    /// Batch removes keys
2455    #[inline]
2456    fn _batch_remove(&self, keys: Vec<IVec>) -> Result<()> {
2457        for k in keys {
2458            self._remove(k)?;
2459        }
2460        Ok(())
2461    }
2462
2463    /// Sets expiration time (TTL feature)
2464    #[cfg(feature = "ttl")]
2465    #[inline]
2466    fn _expire_at(&self, at: TimestampMillis) -> Result<bool> {
2467        self.db._expire_at(self.name.as_slice(), at, KeyType::Map)
2468    }
2469
2470    /// Gets time-to-live (TTL feature)
2471    #[cfg(feature = "ttl")]
2472    #[inline]
2473    fn _ttl(&self) -> Result<Option<TimestampMillis>> {
2474        let res = self
2475            .db
2476            ._ttl(self.name(), |k| {
2477                SledStorageDB::_map_contains_key(self.tree(), k)
2478            })?
2479            .and_then(|(at, _)| if at > 0 { Some(at) } else { None });
2480        Ok(res)
2481    }
2482
2483    /// Checks if map is expired
2484    #[inline]
2485    fn _is_expired(&self) -> Result<bool> {
2486        self.db._is_expired(self.name.as_slice(), |k| {
2487            SledStorageDB::_map_contains_key(self.tree(), k)
2488        })
2489    }
2490
2491    /// Checks if map is expired (async)
2492    #[inline]
2493    async fn call_is_expired(&self) -> Result<bool> {
2494        let (tx, rx) = oneshot::channel();
2495        self.db
2496            .cmd_send(Command::MapIsExpired(self.clone(), tx))
2497            .await?;
2498        rx.await?
2499    }
2500
2501    /// Creates prefix iterator
2502    #[inline]
2503    fn _prefix_iter(&self, prefix: Option<IVec>) -> sled::Iter {
2504        if let Some(prefix) = prefix {
2505            self.tree()
2506                .scan_prefix([self.map_item_prefix_name.as_slice(), prefix.as_ref()].concat())
2507        } else {
2508            self.tree()
2509                .scan_prefix(self.map_item_prefix_name.as_slice())
2510        }
2511    }
2512
2513    /// Creates prefix iterator (async)
2514    #[inline]
2515    async fn call_prefix_iter(&self, prefix: Option<IVec>) -> Result<sled::Iter> {
2516        let (tx, rx) = oneshot::channel();
2517        self.db
2518            .cmd_send(Command::MapPrefixIter(self.clone(), prefix, tx))
2519            .await?;
2520        Ok(rx.await?)
2521    }
2522}
2523
2524#[async_trait]
2525impl Map for SledStorageMap {
2526    /// Gets map name
2527    #[inline]
2528    fn name(&self) -> &[u8] {
2529        self.name.as_slice()
2530    }
2531
2532    /// Inserts a key-value pair
2533    #[inline]
2534    async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
2535    where
2536        K: AsRef<[u8]> + Sync + Send,
2537        V: Serialize + Sync + Send + ?Sized,
2538    {
2539        let val = bincode::serialize(val)?;
2540        let (tx, rx) = oneshot::channel();
2541        self.db
2542            .cmd_send(Command::MapInsert(
2543                self.clone(),
2544                key.as_ref().into(),
2545                val.into(),
2546                tx,
2547            ))
2548            .await?;
2549        rx.await??;
2550        Ok(())
2551    }
2552
2553    /// Gets a value by key
2554    #[inline]
2555    async fn get<K, V>(&self, key: K) -> Result<Option<V>>
2556    where
2557        K: AsRef<[u8]> + Sync + Send,
2558        V: DeserializeOwned + Sync + Send,
2559    {
2560        let (tx, rx) = oneshot::channel();
2561        self.db
2562            .cmd_send(Command::MapGet(self.clone(), key.as_ref().into(), tx))
2563            .await?;
2564
2565        match rx.await?? {
2566            Some(v) => Ok(Some(bincode::deserialize::<V>(v.as_ref())?)),
2567            None => Ok(None),
2568        }
2569    }
2570
2571    /// Removes a key
2572    #[inline]
2573    async fn remove<K>(&self, key: K) -> Result<()>
2574    where
2575        K: AsRef<[u8]> + Sync + Send,
2576    {
2577        let (tx, rx) = oneshot::channel();
2578        self.db
2579            .cmd_send(Command::MapRemove(self.clone(), key.as_ref().into(), tx))
2580            .await?;
2581        rx.await??;
2582        Ok(())
2583    }
2584
2585    /// Checks if key exists
2586    #[inline]
2587    async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
2588        let (tx, rx) = oneshot::channel();
2589        self.db
2590            .cmd_send(Command::MapContainsKey(
2591                self.clone(),
2592                key.as_ref().into(),
2593                tx,
2594            ))
2595            .await?;
2596        Ok(rx.await??)
2597    }
2598
2599    /// Gets map length (if enabled)
2600    #[cfg(feature = "map_len")]
2601    #[inline]
2602    async fn len(&self) -> Result<usize> {
2603        let (tx, rx) = oneshot::channel();
2604        self.db.cmd_send(Command::MapLen(self.clone(), tx)).await?;
2605        Ok(rx.await??)
2606    }
2607
2608    /// Checks if map is empty
2609    #[inline]
2610    async fn is_empty(&self) -> Result<bool> {
2611        let (tx, rx) = oneshot::channel();
2612        self.db
2613            .cmd_send(Command::MapIsEmpty(self.clone(), tx))
2614            .await?;
2615        Ok(rx.await??)
2616    }
2617
2618    /// Clears the map
2619    #[inline]
2620    async fn clear(&self) -> Result<()> {
2621        let (tx, rx) = oneshot::channel();
2622        self.db
2623            .cmd_send(Command::MapClear(self.clone(), tx))
2624            .await?;
2625        rx.await??;
2626        Ok(())
2627    }
2628
2629    /// Removes and returns a value
2630    #[inline]
2631    async fn remove_and_fetch<K, V>(&self, key: K) -> Result<Option<V>>
2632    where
2633        K: AsRef<[u8]> + Sync + Send,
2634        V: DeserializeOwned + Sync + Send,
2635    {
2636        let (tx, rx) = oneshot::channel();
2637        self.db
2638            .cmd_send(Command::MapRemoveAndFetch(
2639                self.clone(),
2640                key.as_ref().into(),
2641                tx,
2642            ))
2643            .await?;
2644
2645        match rx.await?? {
2646            Some(v) => Ok(Some(bincode::deserialize::<V>(v.as_ref())?)),
2647            None => Ok(None),
2648        }
2649    }
2650
2651    /// Removes keys with prefix
2652    #[inline]
2653    async fn remove_with_prefix<K>(&self, prefix: K) -> Result<()>
2654    where
2655        K: AsRef<[u8]> + Sync + Send,
2656    {
2657        let (tx, rx) = oneshot::channel();
2658        self.db
2659            .cmd_send(Command::MapRemoveWithPrefix(
2660                self.clone(),
2661                prefix.as_ref().into(),
2662                tx,
2663            ))
2664            .await?;
2665        rx.await??;
2666        Ok(())
2667    }
2668
2669    /// Batch inserts key-value pairs
2670    #[inline]
2671    async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
2672    where
2673        V: serde::ser::Serialize + Sync + Send,
2674    {
2675        let key_vals = key_vals
2676            .into_iter()
2677            .map(|(k, v)| {
2678                bincode::serialize(&v)
2679                    .map(|v| (k.into(), v.into()))
2680                    .map_err(|e| anyhow!(e))
2681            })
2682            .collect::<Result<Vec<(IVec, IVec)>>>()?;
2683
2684        let (tx, rx) = oneshot::channel();
2685        self.db
2686            .cmd_send(Command::MapBatchInsert(self.clone(), key_vals, tx))
2687            .await?;
2688        rx.await??;
2689        Ok(())
2690    }
2691
2692    /// Batch removes keys
2693    #[inline]
2694    async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
2695        let keys = keys.into_iter().map(|k| k.into()).collect::<Vec<IVec>>();
2696
2697        let (tx, rx) = oneshot::channel();
2698        self.db
2699            .cmd_send(Command::MapBatchRemove(self.clone(), keys, tx))
2700            .await?;
2701        rx.await??;
2702        Ok(())
2703    }
2704
2705    /// Iterates over map items
2706    #[inline]
2707    async fn iter<'a, V>(
2708        &'a mut self,
2709    ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
2710    where
2711        V: DeserializeOwned + Sync + Send + 'a + 'static,
2712    {
2713        let this = self;
2714        let res = {
2715            if this.call_is_expired().await? {
2716                let iter: Box<dyn AsyncIterator<Item = IterItem<V>> + Send> =
2717                    Box::new(AsyncEmptyIter {
2718                        _m: std::marker::PhantomData,
2719                    });
2720                Ok::<_, anyhow::Error>(iter)
2721            } else {
2722                let tem_prefix_name = this.map_item_prefix_name.len();
2723                let iter = this.call_prefix_iter(None).await?;
2724                let iter: Box<dyn AsyncIterator<Item = IterItem<V>> + Send> = Box::new(AsyncIter {
2725                    db: &this.db,
2726                    prefix_len: tem_prefix_name,
2727                    iter: Some(iter),
2728                    _m: std::marker::PhantomData,
2729                });
2730                Ok::<_, anyhow::Error>(iter)
2731            }
2732        }?;
2733        Ok(res)
2734    }
2735
2736    /// Iterates over map keys
2737    #[inline]
2738    async fn key_iter<'a>(
2739        &'a mut self,
2740    ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>> {
2741        let this = self;
2742        let res = {
2743            if this.call_is_expired().await? {
2744                let iter: Box<dyn AsyncIterator<Item = Result<Key>> + Send> =
2745                    Box::new(AsyncEmptyIter {
2746                        _m: std::marker::PhantomData,
2747                    });
2748                Ok::<_, anyhow::Error>(iter)
2749            } else {
2750                let iter = this.call_prefix_iter(None).await?;
2751                let iter: Box<dyn AsyncIterator<Item = Result<Key>> + Send> =
2752                    Box::new(AsyncKeyIter {
2753                        db: &this.db,
2754                        prefix_len: this.map_item_prefix_name.len(),
2755                        iter: Some(iter),
2756                    });
2757                Ok::<_, anyhow::Error>(iter)
2758            }
2759        }?;
2760        Ok(res)
2761    }
2762
2763    /// Iterates over items with prefix
2764    #[inline]
2765    async fn prefix_iter<'a, P, V>(
2766        &'a mut self,
2767        prefix: P,
2768    ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
2769    where
2770        P: AsRef<[u8]> + Send + Sync,
2771        V: DeserializeOwned + Sync + Send + 'a + 'static,
2772    {
2773        let this = self;
2774        let res = {
2775            if this.call_is_expired().await? {
2776                let iter: Box<dyn AsyncIterator<Item = IterItem<V>> + Send> =
2777                    Box::new(AsyncEmptyIter {
2778                        _m: std::marker::PhantomData,
2779                    });
2780                Ok::<_, anyhow::Error>(iter)
2781            } else {
2782                let iter = this
2783                    .call_prefix_iter(Some(IVec::from(prefix.as_ref())))
2784                    .await?;
2785                let iter: Box<dyn AsyncIterator<Item = IterItem<V>> + Send> = Box::new(AsyncIter {
2786                    db: &this.db,
2787                    prefix_len: this.map_item_prefix_name.len(),
2788                    iter: Some(iter),
2789                    _m: std::marker::PhantomData,
2790                });
2791                Ok::<_, anyhow::Error>(iter)
2792            }
2793        }?;
2794        Ok(res)
2795    }
2796
2797    /// Sets expiration time (TTL feature)
2798    #[cfg(feature = "ttl")]
2799    async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
2800        let (tx, rx) = oneshot::channel();
2801        self.db
2802            .cmd_send(Command::MapExpireAt(self.clone(), at, tx))
2803            .await?;
2804        Ok(rx.await??)
2805    }
2806
2807    /// Sets time-to-live (TTL feature)
2808    #[cfg(feature = "ttl")]
2809    async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
2810        let at = timestamp_millis() + dur;
2811        self.expire_at(at).await
2812    }
2813
2814    /// Gets time-to-live (TTL feature)
2815    #[cfg(feature = "ttl")]
2816    async fn ttl(&self) -> Result<Option<TimestampMillis>> {
2817        let (tx, rx) = oneshot::channel();
2818        self.db.cmd_send(Command::MapTTL(self.clone(), tx)).await?;
2819        Ok(rx.await??)
2820    }
2821}
2822
/// List structure for queue-like storage within a namespace.
///
/// A list is identified by its `name`; its entries live in the database's
/// list tree under keys that start with `prefix_name`. The handle derives
/// `Clone`, and a clone of it is embedded in every command sent to the
/// database.
#[derive(Clone)]
pub struct SledStorageList {
    /// List name, as supplied by the caller.
    name: Key,
    /// Prefix for list keys, computed from `name` by
    /// `SledStorageDB::make_list_prefix` when the list handle is built.
    prefix_name: Key,
    /// Database handle used to reach the list tree and to send commands.
    pub(crate) db: SledStorageDB,
}
2833
2834impl SledStorageList {
2835    /// Creates a new list with optional expiration
2836    #[inline]
2837    async fn new_expire(
2838        name: Key,
2839        expire_ms: Option<TimestampMillis>,
2840        db: SledStorageDB,
2841    ) -> Result<Self> {
2842        let (tx, rx) = oneshot::channel();
2843        db.cmd_send(Command::DBListNew(db.clone(), name.into(), expire_ms, tx))
2844            .await?;
2845        rx.await?
2846    }
2847
2848    /// Internal method to create list with expiration
2849    #[inline]
2850    fn _new_expire(
2851        name: Key,
2852        _expire_ms: Option<TimestampMillis>,
2853        db: SledStorageDB,
2854    ) -> Result<Self> {
2855        let l = Self::_new(name, db);
2856        #[cfg(feature = "ttl")]
2857        if let Some(expire_ms) = _expire_ms {
2858            l._expire_at(timestamp_millis() + expire_ms)?;
2859        }
2860        Ok(l)
2861    }
2862
2863    /// Internal method to create list
2864    #[inline]
2865    fn _new(name: Key, db: SledStorageDB) -> Self {
2866        let prefix_name = SledStorageDB::make_list_prefix(name.as_slice());
2867        SledStorageList {
2868            name,
2869            prefix_name,
2870            db,
2871        }
2872    }
2873
2874    /// Gets list name
2875    #[inline]
2876    pub(crate) fn name(&self) -> &[u8] {
2877        self.name.as_slice()
2878    }
2879
2880    /// Gets the underlying tree
2881    #[inline]
2882    pub(crate) fn tree(&self) -> &sled::Tree {
2883        &self.db.list_tree
2884    }
2885
2886    /// Creates list count key
2887    #[inline]
2888    fn make_list_count_key(&self) -> Vec<u8> {
2889        let list_count_key = [self.prefix_name.as_ref(), LIST_KEY_COUNT_SUFFIX].concat();
2890        list_count_key
2891    }
2892
2893    /// Creates list content prefix
2894    #[inline]
2895    fn make_list_content_prefix(prefix_name: &[u8], idx: Option<&[u8]>) -> Vec<u8> {
2896        if let Some(idx) = idx {
2897            [prefix_name, LIST_KEY_CONTENT_SUFFIX, idx].concat()
2898        } else {
2899            [prefix_name, LIST_KEY_CONTENT_SUFFIX].concat()
2900        }
2901    }
2902
2903    /// Creates list content key
2904    #[inline]
2905    fn make_list_content_key(&self, idx: usize) -> Vec<u8> {
2906        Self::make_list_content_prefix(
2907            self.prefix_name.as_ref(),
2908            Some(idx.to_be_bytes().as_slice()),
2909        )
2910    }
2911
2912    /// Creates batch of list content keys
2913    #[inline]
2914    fn make_list_content_keys(&self, start: usize, end: usize) -> Vec<Vec<u8>> {
2915        (start..end)
2916            .map(|idx| self.make_list_content_key(idx))
2917            .collect()
2918    }
2919
2920    /// Transactionally gets list count
2921    #[inline]
2922    fn tx_list_count_get<K, E>(
2923        tx: &TransactionalTree,
2924        list_count_key: K,
2925    ) -> ConflictableTransactionResult<(usize, usize), E>
2926    where
2927        K: AsRef<[u8]>,
2928    {
2929        if let Some(v) = tx.get(list_count_key.as_ref())? {
2930            let (start, end) = bincode::deserialize::<(usize, usize)>(v.as_ref()).map_err(|e| {
2931                ConflictableTransactionError::Storage(sled::Error::Io(io::Error::new(
2932                    ErrorKind::InvalidData,
2933                    e,
2934                )))
2935            })?;
2936            Ok((start, end))
2937        } else {
2938            Ok((0, 0))
2939        }
2940    }
2941
2942    /// Transactionally sets list count
2943    #[inline]
2944    fn tx_list_count_set<K, E>(
2945        tx: &TransactionalTree,
2946        key_count: K,
2947        start: usize,
2948        end: usize,
2949    ) -> ConflictableTransactionResult<(), E>
2950    where
2951        K: AsRef<[u8]>,
2952    {
2953        let count_bytes = bincode::serialize(&(start, end)).map_err(|e| {
2954            ConflictableTransactionError::Storage(sled::Error::Io(io::Error::new(
2955                ErrorKind::InvalidData,
2956                e,
2957            )))
2958        })?;
2959        tx.insert(key_count.as_ref(), count_bytes.as_slice())?;
2960        Ok(())
2961    }
2962
2963    /// Transactionally sets list content
2964    #[inline]
2965    fn tx_list_content_set<K, V, E>(
2966        tx: &TransactionalTree,
2967        key_content: K,
2968        data: V,
2969    ) -> ConflictableTransactionResult<(), E>
2970    where
2971        K: AsRef<[u8]>,
2972        V: AsRef<[u8]>,
2973    {
2974        tx.insert(key_content.as_ref(), data.as_ref())?;
2975        Ok(())
2976    }
2977
2978    /// Transactionally sets batch list content
2979    #[inline]
2980    fn tx_list_content_batch_set<K, V, E>(
2981        tx: &TransactionalTree,
2982        key_contents: Vec<(K, V)>,
2983    ) -> ConflictableTransactionResult<(), E>
2984    where
2985        K: AsRef<[u8]>,
2986        V: AsRef<[u8]>,
2987    {
2988        let mut batch = Batch::default();
2989        for (k, v) in key_contents {
2990            batch.insert(k.as_ref(), v.as_ref());
2991        }
2992        tx.apply_batch(&batch)?;
2993        Ok(())
2994    }
2995
2996    /// Clears the list
2997    #[inline]
2998    fn _clear(&self) -> Result<()> {
2999        let mut batch = Batch::default();
3000        let list_count_key = self.make_list_count_key();
3001        batch.remove(list_count_key);
3002        let list_content_prefix = Self::make_list_content_prefix(self.prefix_name.as_slice(), None);
3003        for item in self.tree().scan_prefix(list_content_prefix).keys() {
3004            match item {
3005                Ok(k) => {
3006                    batch.remove(k);
3007                }
3008                Err(e) => {
3009                    log::warn!("{:?}", e);
3010                }
3011            }
3012        }
3013        self.tree()
3014            .transaction(|tx| {
3015                tx.apply_batch(&batch)?;
3016                Ok::<_, ConflictableTransactionError<()>>(())
3017            })
3018            .map_err(|e| anyhow!(format!("{:?}", e)))?;
3019        Ok(())
3020    }
3021
3022    /// Transactionally clears the list
3023    #[inline]
3024    fn _tx_clear(
3025        list_tree_tx: &TransactionalTree,
3026        batch: &Batch,
3027    ) -> ConflictableTransactionResult<()> {
3028        list_tree_tx.apply_batch(batch)?;
3029        Ok(())
3030    }
3031
3032    /// Creates batch for clearing list
3033    #[inline]
3034    fn _make_clear_batch(&self) -> Batch {
3035        let mut batch = Batch::default();
3036        let list_count_key = self.make_list_count_key();
3037        batch.remove(list_count_key);
3038        let list_content_prefix = Self::make_list_content_prefix(self.prefix_name.as_slice(), None);
3039        for item in self.tree().scan_prefix(list_content_prefix).keys() {
3040            match item {
3041                Ok(k) => {
3042                    batch.remove(k);
3043                }
3044                Err(e) => {
3045                    log::warn!("{:?}", e);
3046                }
3047            }
3048        }
3049        batch
3050    }
3051
    /// Appends one value to the end of the list.
    ///
    /// The list keeps a `(start, end)` counter pair under the count key. A push
    /// increments `end` and stores `data` under the content key for the new
    /// `end`, both inside one transaction on the list tree.
    ///
    /// With the `ttl` feature enabled, the expiration state is checked *after*
    /// the push: if `_is_expired` reports the list as expired,
    /// `_tx_remove_expire_key` is called for the list name in a transaction
    /// spanning `key_expire_tree` and `expire_key_tree`.
    /// NOTE(review): entries already stored in an expired list are not removed
    /// here — confirm that this is intended.
    #[inline]
    fn _push(&self, data: IVec) -> Result<()> {
        let this = self;
        this.tree().transaction(move |tx| {
            let list_count_key = this.make_list_count_key();
            // Missing counter record is read as (0, 0).
            let (start, mut end) = Self::tx_list_count_get::<
                _,
                ConflictableTransactionError<sled::Error>,
            >(tx, list_count_key.as_slice())?;
            // The new element occupies index `end + 1`.
            end += 1;
            Self::tx_list_count_set(tx, list_count_key.as_slice(), start, end)?;

            let list_content_key = this.make_list_content_key(end);
            Self::tx_list_content_set(tx, list_content_key.as_slice(), data.as_ref())?;
            Ok(())
        })?;

        #[cfg(feature = "ttl")]
        {
            if this.db._is_expired(this.name.as_slice(), |k| {
                SledStorageDB::_list_contains_key(this.tree(), k)
            })? {
                // this.db._remove_expire_key(this.name.as_slice())?;
                // Both expiry trees are updated in a single multi-tree transaction.
                (&self.db.key_expire_tree, &self.db.expire_key_tree)
                    .transaction(|(key_expire_tx, expire_key_tx)| {
                        SledStorageDB::_tx_remove_expire_key(
                            key_expire_tx,
                            expire_key_tx,
                            this.name.as_slice(),
                        )?;
                        Ok::<(), ConflictableTransactionError<()>>(())
                    })
                    .map_err(|e| anyhow!(format!("{:?}", e)))?;
            }
        }

        Ok(())
    }
3091
3092    /// Pushes multiple values to list
3093    #[inline]
3094    fn _pushs(&self, vals: Vec<IVec>) -> Result<()> {
3095        if vals.is_empty() {
3096            return Ok(());
3097        }
3098        let tree = self.tree();
3099        let this = self;
3100
3101        tree.transaction(move |tx| {
3102            let list_count_key = this.make_list_count_key();
3103            let (start, mut end) = Self::tx_list_count_get::<
3104                _,
3105                ConflictableTransactionError<sled::Error>,
3106            >(tx, list_count_key.as_slice())?;
3107
3108            let mut list_content_keys = this.make_list_content_keys(end + 1, end + vals.len() + 1);
3109            //assert_eq!(vals.len(), list_content_keys.len());
3110            end += vals.len();
3111            Self::tx_list_count_set(tx, list_count_key.as_slice(), start, end)?;
3112
3113            let list_contents = vals
3114                .iter()
3115                .map(|val| (list_content_keys.remove(0), val))
3116                .collect::<Vec<_>>();
3117            Self::tx_list_content_batch_set(tx, list_contents)?;
3118            Ok(())
3119        })?;
3120
3121        #[cfg(feature = "ttl")]
3122        {
3123            if this.db._is_expired(this.name.as_slice(), |k| {
3124                SledStorageDB::_list_contains_key(this.tree(), k)
3125            })? {
3126                // this.db._remove_expire_key(this.name.as_slice())?;
3127                (&self.db.key_expire_tree, &self.db.expire_key_tree)
3128                    .transaction(|(key_expire_tx, expire_key_tx)| {
3129                        SledStorageDB::_tx_remove_expire_key(
3130                            key_expire_tx,
3131                            expire_key_tx,
3132                            this.name.as_slice(),
3133                        )?;
3134                        Ok::<(), ConflictableTransactionError<()>>(())
3135                    })
3136                    .map_err(|e| anyhow!(format!("{:?}", e)))?;
3137            }
3138        }
3139        Ok(())
3140    }
3141
    /// Appends a value while enforcing a maximum list length.
    ///
    /// Inside one transaction on the list tree:
    /// - if the current length (`end - start`) is below `limit`, the value is
    ///   appended and `None` is returned;
    /// - otherwise, if `pop_front_if_limited` is set, the front element (index
    ///   `start + 1`) is removed, the value is appended, and the removed bytes
    ///   are returned (`None` if no entry existed under the front key);
    /// - otherwise the transaction fails with an `InvalidData` I/O error whose
    ///   message is `"Is full"`.
    ///
    /// With the `ttl` feature enabled the expiration check runs after the
    /// transaction and before its result is inspected, so it is performed even
    /// when the push itself failed.
    #[inline]
    fn _push_limit(
        &self,
        data: IVec,
        limit: usize,
        pop_front_if_limited: bool,
    ) -> Result<Option<IVec>> {
        let tree = self.tree();
        let this = self;
        let removed = {
            // `res` is deliberately not unwrapped here; see the final `??` below.
            let res = tree.transaction(move |tx| {
                let list_count_key = this.make_list_count_key();
                let (mut start, mut end) = Self::tx_list_count_get::<
                    _,
                    ConflictableTransactionError<sled::Error>,
                >(tx, list_count_key.as_slice())?;
                let count = end - start;

                if count < limit {
                    // Room left: plain append at index `end + 1`.
                    end += 1;
                    Self::tx_list_count_set(tx, list_count_key.as_slice(), start, end)?;
                    let list_content_key = this.make_list_content_key(end);
                    Self::tx_list_content_set(tx, list_content_key.as_slice(), data.as_ref())?;
                    Ok(None)
                } else if pop_front_if_limited {
                    // Full: drop the front element, then append.
                    let mut removed = None;
                    let removed_content_key = this.make_list_content_key(start + 1);
                    // `start` only advances when an entry was actually removed.
                    if let Some(v) = tx.remove(removed_content_key)? {
                        removed = Some(v);
                        start += 1;
                    }
                    end += 1;
                    Self::tx_list_count_set(tx, list_count_key.as_slice(), start, end)?;
                    let list_content_key = this.make_list_content_key(end);
                    Self::tx_list_content_set(tx, list_content_key.as_slice(), data.as_ref())?;
                    Ok(removed)
                } else {
                    // Full and not allowed to evict: abort the transaction.
                    Err(ConflictableTransactionError::Storage(sled::Error::Io(
                        io::Error::new(ErrorKind::InvalidData, "Is full"),
                    )))
                }
            });

            #[cfg(feature = "ttl")]
            {
                if this.db._is_expired(this.name.as_slice(), |k| {
                    SledStorageDB::_list_contains_key(this.tree(), k)
                })? {
                    // this.db._remove_expire_key(this.name.as_slice())?;
                    // Both expiry trees are updated in a single multi-tree transaction.
                    (&self.db.key_expire_tree, &self.db.expire_key_tree)
                        .transaction(|(key_expire_tx, expire_key_tx)| {
                            SledStorageDB::_tx_remove_expire_key(
                                key_expire_tx,
                                expire_key_tx,
                                this.name.as_slice(),
                            )?;
                            Ok::<(), ConflictableTransactionError<()>>(())
                        })
                        .map_err(|e| anyhow!(format!("{:?}", e)))?;
                }
            }

            // Wrap the transaction result so the block has a uniform Result type.
            Ok::<_, TransactionError<()>>(res)
        }
        // First `?` unwraps the wrapper above, second `?` the transaction result.
        .map_err(|e| anyhow!(format!("{:?}", e)))??;

        Ok(removed)
    }
3211
3212    /// Pops value from list
3213    #[inline]
3214    fn _pop(&self) -> Result<Option<IVec>> {
3215        let this = self;
3216        let removed = {
3217            if this.db._is_expired(this.name.as_slice(), |k| {
3218                SledStorageDB::_list_contains_key(this.tree(), k)
3219            })? {
3220                Ok(None)
3221            } else {
3222                let removed = this.tree().transaction(move |tx| {
3223                    let list_count_key = this.make_list_count_key();
3224                    let (start, end) = Self::tx_list_count_get(tx, list_count_key.as_slice())?;
3225
3226                    let mut removed = None;
3227                    if (end - start) > 0 {
3228                        let removed_content_key = this.make_list_content_key(start + 1);
3229                        if let Some(v) = tx.remove(removed_content_key)? {
3230                            removed = Some(v);
3231                            Self::tx_list_count_set(tx, list_count_key.as_slice(), start + 1, end)?;
3232                        }
3233                    }
3234                    Ok::<_, ConflictableTransactionError<sled::Error>>(removed)
3235                });
3236                removed
3237            }
3238        }?;
3239
3240        Ok(removed)
3241    }
3242
3243    /// Gets all values in list
3244    #[inline]
3245    fn _all(&self) -> Result<Vec<IVec>> {
3246        let this = self;
3247        let res = {
3248            if this.db._is_expired(this.name.as_slice(), |k| {
3249                SledStorageDB::_list_contains_key(this.tree(), k)
3250            })? {
3251                Ok(vec![])
3252            } else {
3253                let key_content_prefix =
3254                    Self::make_list_content_prefix(this.prefix_name.as_slice(), None);
3255                this.tree()
3256                    .scan_prefix(key_content_prefix)
3257                    .values()
3258                    .map(|item| item.map_err(anyhow::Error::new))
3259                    .collect::<Result<Vec<_>>>()
3260            }
3261        }?;
3262        Ok(res)
3263    }
3264
3265    /// Gets value by index
3266    #[inline]
3267    fn _get_index(&self, idx: usize) -> Result<Option<IVec>> {
3268        let this = self;
3269        let res = {
3270            if this.db._is_expired(this.name.as_slice(), |k| {
3271                SledStorageDB::_list_contains_key(this.tree(), k)
3272            })? {
3273                Ok(None)
3274            } else {
3275                this.tree().transaction(move |tx| {
3276                    let list_count_key = this.make_list_count_key();
3277                    let (start, end) = Self::tx_list_count_get::<
3278                        _,
3279                        ConflictableTransactionError<sled::Error>,
3280                    >(tx, list_count_key.as_slice())?;
3281                    if idx < (end - start) {
3282                        let list_content_key = this.make_list_content_key(start + idx + 1);
3283                        if let Some(v) = tx.get(list_content_key)? {
3284                            Ok(Some(v))
3285                        } else {
3286                            Ok(None)
3287                        }
3288                    } else {
3289                        Ok(None)
3290                    }
3291                })
3292            }
3293        }?;
3294        Ok(res)
3295    }
3296
3297    /// Gets list length
3298    #[inline]
3299    fn _len(&self) -> Result<usize> {
3300        let this = self;
3301        let res = {
3302            if this.db._is_expired(this.name.as_slice(), |k| {
3303                SledStorageDB::_list_contains_key(this.tree(), k)
3304            })? {
3305                Ok::<usize, anyhow::Error>(0)
3306            } else {
3307                let list_count_key = this.make_list_count_key();
3308                if let Some(v) = this.tree().get(list_count_key.as_slice())? {
3309                    let (start, end) = bincode::deserialize::<(usize, usize)>(v.as_ref())?;
3310                    Ok(end - start)
3311                } else {
3312                    Ok(0)
3313                }
3314            }
3315        }?;
3316        Ok(res)
3317    }
3318
3319    /// Checks if list is empty
3320    #[inline]
3321    fn _is_empty(&self) -> Result<bool> {
3322        let this = self;
3323        let res = {
3324            if this.db._is_expired(this.name.as_slice(), |k| {
3325                SledStorageDB::_list_contains_key(this.tree(), k)
3326            })? {
3327                Ok::<bool, anyhow::Error>(true)
3328            } else {
3329                let list_content_prefix =
3330                    Self::make_list_content_prefix(this.prefix_name.as_slice(), None);
3331                Ok(this
3332                    .tree()
3333                    .scan_prefix(list_content_prefix)
3334                    .keys()
3335                    .next()
3336                    .is_none())
3337            }
3338        }?;
3339        Ok(res)
3340    }
3341
3342    /// Sets expiration time (TTL feature)
3343    #[cfg(feature = "ttl")]
3344    #[inline]
3345    fn _expire_at(&self, at: TimestampMillis) -> Result<bool> {
3346        self.db._expire_at(self.name.as_slice(), at, KeyType::List)
3347    }
3348
3349    /// Gets time-to-live (TTL feature)
3350    #[cfg(feature = "ttl")]
3351    #[inline]
3352    fn _ttl(&self) -> Result<Option<TimestampMillis>> {
3353        Ok(self
3354            .db
3355            ._ttl(self.name(), |k| {
3356                SledStorageDB::_list_contains_key(self.tree(), k)
3357            })?
3358            .and_then(|(at, _)| if at > 0 { Some(at) } else { None }))
3359    }
3360
3361    /// Checks if list is expired
3362    #[inline]
3363    fn _is_expired(&self) -> Result<bool> {
3364        self.db._is_expired(self.name.as_slice(), |k| {
3365            SledStorageDB::_list_contains_key(self.tree(), k)
3366        })
3367    }
3368
3369    /// Checks if list is expired (async)
3370    #[inline]
3371    async fn call_is_expired(&self) -> Result<bool> {
3372        let (tx, rx) = oneshot::channel();
3373        self.db
3374            .cmd_send(Command::ListIsExpired(self.clone(), tx))
3375            .await?;
3376        rx.await?
3377    }
3378
3379    /// Creates prefix iterator
3380    #[inline]
3381    fn _prefix_iter(&self) -> sled::Iter {
3382        let list_content_prefix = Self::make_list_content_prefix(self.prefix_name.as_slice(), None);
3383        self.tree().scan_prefix(list_content_prefix)
3384    }
3385
3386    /// Creates prefix iterator (async)
3387    #[inline]
3388    async fn call_prefix_iter(&self) -> Result<sled::Iter> {
3389        let (tx, rx) = oneshot::channel();
3390        self.db
3391            .cmd_send(Command::ListPrefixIter(self.clone(), tx))
3392            .await?;
3393        Ok(rx.await?)
3394    }
3395}
3396
3397#[async_trait]
3398impl List for SledStorageList {
3399    /// Gets list name
3400    #[inline]
3401    fn name(&self) -> &[u8] {
3402        self.name.as_slice()
3403    }
3404
3405    /// Pushes value to list
3406    #[inline]
3407    async fn push<V>(&self, val: &V) -> Result<()>
3408    where
3409        V: serde::ser::Serialize + Sync + Send,
3410    {
3411        let val = bincode::serialize(val)?;
3412        let (tx, rx) = oneshot::channel();
3413        self.db
3414            .cmd_send(Command::ListPush(self.clone(), val.into(), tx))
3415            .await?;
3416        rx.await??;
3417        Ok(())
3418    }
3419
3420    /// Pushes multiple values to list
3421    #[inline]
3422    async fn pushs<V>(&self, vals: Vec<V>) -> Result<()>
3423    where
3424        V: Serialize + Sync + Send,
3425    {
3426        if vals.is_empty() {
3427            return Ok(());
3428        }
3429
3430        let vals = vals
3431            .into_iter()
3432            .map(|v| {
3433                bincode::serialize(&v)
3434                    .map(|v| v.into())
3435                    .map_err(|e| anyhow!(e))
3436            })
3437            .collect::<Result<Vec<_>>>()?;
3438
3439        let (tx, rx) = oneshot::channel();
3440        self.db
3441            .cmd_send(Command::ListPushs(self.clone(), vals, tx))
3442            .await?;
3443        rx.await??;
3444        Ok(())
3445    }
3446
3447    /// Pushes value with limit
3448    #[inline]
3449    async fn push_limit<V>(
3450        &self,
3451        val: &V,
3452        limit: usize,
3453        pop_front_if_limited: bool,
3454    ) -> Result<Option<V>>
3455    where
3456        V: serde::ser::Serialize + Sync + Send,
3457        V: DeserializeOwned,
3458    {
3459        let data = bincode::serialize(val)?;
3460
3461        let (tx, rx) = oneshot::channel();
3462        self.db
3463            .cmd_send(Command::ListPushLimit(
3464                self.clone(),
3465                data.into(),
3466                limit,
3467                pop_front_if_limited,
3468                tx,
3469            ))
3470            .await?;
3471
3472        let removed = if let Some(removed) = rx.await?? {
3473            Some(
3474                bincode::deserialize::<V>(removed.as_ref())
3475                    .map_err(|e| sled::Error::Io(io::Error::new(ErrorKind::InvalidData, e)))?,
3476            )
3477        } else {
3478            None
3479        };
3480        Ok(removed)
3481    }
3482
3483    /// Pops value from list
3484    #[inline]
3485    async fn pop<V>(&self) -> Result<Option<V>>
3486    where
3487        V: DeserializeOwned + Sync + Send,
3488    {
3489        let (tx, rx) = oneshot::channel();
3490        self.db.cmd_send(Command::ListPop(self.clone(), tx)).await?;
3491
3492        let removed = if let Some(removed) = rx.await?? {
3493            Some(
3494                bincode::deserialize::<V>(removed.as_ref())
3495                    .map_err(|e| sled::Error::Io(io::Error::new(ErrorKind::InvalidData, e)))?,
3496            )
3497        } else {
3498            None
3499        };
3500        Ok(removed)
3501    }
3502
3503    /// Gets all values in list
3504    #[inline]
3505    async fn all<V>(&self) -> Result<Vec<V>>
3506    where
3507        V: DeserializeOwned + Sync + Send,
3508    {
3509        let (tx, rx) = oneshot::channel();
3510        self.db.cmd_send(Command::ListAll(self.clone(), tx)).await?;
3511
3512        rx.await??
3513            .iter()
3514            .map(|v| bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e)))
3515            .collect::<Result<Vec<_>>>()
3516    }
3517
3518    /// Gets value by index
3519    #[inline]
3520    async fn get_index<V>(&self, idx: usize) -> Result<Option<V>>
3521    where
3522        V: DeserializeOwned + Sync + Send,
3523    {
3524        let (tx, rx) = oneshot::channel();
3525        self.db
3526            .cmd_send(Command::ListGetIndex(self.clone(), idx, tx))
3527            .await?;
3528
3529        Ok(if let Some(res) = rx.await?? {
3530            Some(bincode::deserialize::<V>(res.as_ref()).map_err(|e| anyhow!(e))?)
3531        } else {
3532            None
3533        })
3534    }
3535
3536    /// Gets list length
3537    #[inline]
3538    async fn len(&self) -> Result<usize> {
3539        let (tx, rx) = oneshot::channel();
3540        self.db.cmd_send(Command::ListLen(self.clone(), tx)).await?;
3541        Ok(rx.await??)
3542    }
3543
3544    /// Checks if list is empty
3545    #[inline]
3546    async fn is_empty(&self) -> Result<bool> {
3547        let (tx, rx) = oneshot::channel();
3548        self.db
3549            .cmd_send(Command::ListIsEmpty(self.clone(), tx))
3550            .await?;
3551        Ok(rx.await??)
3552    }
3553
3554    /// Clears the list
3555    #[inline]
3556    async fn clear(&self) -> Result<()> {
3557        let (tx, rx) = oneshot::channel();
3558        self.db
3559            .cmd_send(Command::ListClear(self.clone(), tx))
3560            .await?;
3561        Ok(rx.await??)
3562    }
3563
3564    /// Iterates over list values
3565    #[inline]
3566    async fn iter<'a, V>(
3567        &'a mut self,
3568    ) -> Result<Box<dyn AsyncIterator<Item = Result<V>> + Send + 'a>>
3569    where
3570        V: DeserializeOwned + Sync + Send + 'a + 'static,
3571    {
3572        let this = self;
3573        let res = {
3574            if this.call_is_expired().await? {
3575                let iter: Box<dyn AsyncIterator<Item = Result<V>> + Send> =
3576                    Box::new(AsyncEmptyIter {
3577                        _m: std::marker::PhantomData,
3578                    });
3579                Ok::<_, anyhow::Error>(iter)
3580            } else {
3581                let iter = this.call_prefix_iter().await?;
3582                let iter: Box<dyn AsyncIterator<Item = Result<V>> + Send> =
3583                    Box::new(AsyncListValIter {
3584                        db: &this.db,
3585                        iter: Some(iter),
3586                        _m: std::marker::PhantomData,
3587                    });
3588                Ok::<_, anyhow::Error>(iter)
3589            }
3590        }?;
3591        Ok(res)
3592    }
3593
3594    /// Sets expiration time (TTL feature)
3595    #[cfg(feature = "ttl")]
3596    async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
3597        let (tx, rx) = oneshot::channel();
3598        self.db
3599            .cmd_send(Command::ListExpireAt(self.clone(), at, tx))
3600            .await?;
3601        Ok(rx.await??)
3602    }
3603
3604    /// Sets time-to-live (TTL feature)
3605    #[cfg(feature = "ttl")]
3606    async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
3607        let at = timestamp_millis() + dur;
3608        self.expire_at(at).await
3609    }
3610
3611    /// Gets time-to-live (TTL feature)
3612    #[cfg(feature = "ttl")]
3613    async fn ttl(&self) -> Result<Option<TimestampMillis>> {
3614        let (tx, rx) = oneshot::channel();
3615        self.db.cmd_send(Command::ListTTL(self.clone(), tx)).await?;
3616        Ok(rx.await??)
3617    }
3618}
3619
3620/// Async iterator for map items
3621pub struct AsyncIter<'a, V> {
3622    db: &'a SledStorageDB,
3623    prefix_len: usize,
3624    iter: Option<sled::Iter>,
3625    _m: std::marker::PhantomData<V>,
3626}
3627
3628impl<V> Debug for AsyncIter<'_, V> {
3629    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3630        f.debug_tuple("AsyncIter .. ").finish()
3631    }
3632}
3633
3634#[async_trait]
3635impl<V> AsyncIterator for AsyncIter<'_, V>
3636where
3637    V: DeserializeOwned + Sync + Send + 'static,
3638{
3639    type Item = IterItem<V>;
3640
3641    async fn next(&mut self) -> Option<Self::Item> {
3642        let mut iter = self.iter.take()?;
3643        let (tx, rx) = oneshot::channel();
3644        if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3645            return Some(Err(e));
3646        }
3647        let item = match rx.await {
3648            Err(e) => {
3649                return Some(Err(anyhow::Error::new(e)));
3650            }
3651            Ok((it, item)) => {
3652                iter = it;
3653                item
3654            }
3655        };
3656
3657        match item {
3658            None => None,
3659            Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
3660            Some(Ok((k, v))) => {
3661                let name = k.as_ref()[self.prefix_len..].to_vec();
3662                match bincode::deserialize::<V>(v.as_ref()) {
3663                    Ok(v) => {
3664                        self.iter = Some(iter);
3665                        Some(Ok((name, v)))
3666                    }
3667                    Err(e) => Some(Err(anyhow::Error::new(e))),
3668                }
3669            }
3670        }
3671    }
3672}
3673
3674/// Async iterator for map keys
3675pub struct AsyncKeyIter<'a> {
3676    db: &'a SledStorageDB,
3677    prefix_len: usize,
3678    iter: Option<sled::Iter>,
3679}
3680
3681impl Debug for AsyncKeyIter<'_> {
3682    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3683        f.debug_tuple("AsyncKeyIter .. ").finish()
3684    }
3685}
3686
3687#[async_trait]
3688impl AsyncIterator for AsyncKeyIter<'_> {
3689    type Item = Result<Key>;
3690
3691    async fn next(&mut self) -> Option<Self::Item> {
3692        let mut iter = self.iter.take()?;
3693        let (tx, rx) = oneshot::channel();
3694        if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3695            return Some(Err(e));
3696        }
3697        let item = match rx.await {
3698            Err(e) => {
3699                return Some(Err(anyhow::Error::new(e)));
3700            }
3701            Ok((it, item)) => {
3702                iter = it;
3703                item
3704            }
3705        };
3706
3707        return match item {
3708            None => None,
3709            Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
3710            Some(Ok((k, _))) => {
3711                self.iter = Some(iter);
3712                let name = k.as_ref()[self.prefix_len..].to_vec();
3713                Some(Ok(name))
3714            }
3715        };
3716    }
3717}
3718
3719/// Async iterator for list values
3720pub struct AsyncListValIter<'a, V> {
3721    db: &'a SledStorageDB,
3722    iter: Option<sled::Iter>,
3723    _m: std::marker::PhantomData<V>,
3724}
3725
3726impl<V> Debug for AsyncListValIter<'_, V> {
3727    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3728        f.debug_tuple("AsyncListValIter .. ").finish()
3729    }
3730}
3731
3732#[async_trait]
3733impl<V> AsyncIterator for AsyncListValIter<'_, V>
3734where
3735    V: DeserializeOwned + Sync + Send + 'static,
3736{
3737    type Item = Result<V>;
3738
3739    async fn next(&mut self) -> Option<Self::Item> {
3740        let mut iter = self.iter.take()?;
3741        let (tx, rx) = oneshot::channel();
3742        if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3743            return Some(Err(e));
3744        }
3745        let item = match rx.await {
3746            Err(e) => {
3747                return Some(Err(anyhow::Error::new(e)));
3748            }
3749            Ok((it, item)) => {
3750                iter = it;
3751                item
3752            }
3753        };
3754
3755        match item {
3756            None => None,
3757            Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
3758            Some(Ok((_k, v))) => {
3759                self.iter = Some(iter);
3760                Some(bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e)))
3761            }
3762        }
3763    }
3764}
3765
/// Empty iterator: carries no data, only the item type `T`.
pub struct AsyncEmptyIter<T> {
    /// Ties the iterator to the item type `T` without storing one.
    _m: std::marker::PhantomData<T>,
}

impl<T> Debug for AsyncEmptyIter<T> {
    /// Writes a fixed label; there is no state to print.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("AsyncEmptyIter .. ")
    }
}
3776
#[async_trait]
impl<T> AsyncIterator for AsyncEmptyIter<T>
where
    T: Send + Sync + 'static,
{
    // The item type is whatever the caller asked for; no item is ever produced.
    type Item = T;

    /// Always returns `None`, so the iterator is exhausted from the first call.
    async fn next(&mut self) -> Option<Self::Item> {
        None
    }
}
3788
3789/// Async iterator for maps
3790pub struct AsyncMapIter<'a> {
3791    db: &'a SledStorageDB,
3792    iter: Option<sled::Iter>,
3793}
3794
3795impl<'a> AsyncMapIter<'a> {
3796    fn new(db: &'a SledStorageDB, iter: sled::Iter) -> Self {
3797        Self {
3798            db,
3799            iter: Some(iter),
3800        }
3801    }
3802}
3803
3804impl Debug for AsyncMapIter<'_> {
3805    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3806        f.debug_tuple("AsyncMapIter .. ").finish()
3807    }
3808}
3809
3810#[async_trait]
3811impl AsyncIterator for AsyncMapIter<'_> {
3812    type Item = Result<StorageMap>;
3813
3814    async fn next(&mut self) -> Option<Self::Item> {
3815        let mut iter = self.iter.take()?;
3816        loop {
3817            let (tx, rx) = oneshot::channel();
3818            if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3819                return Some(Err(e));
3820            }
3821            let item = match rx.await {
3822                Err(e) => {
3823                    return Some(Err(anyhow::Error::new(e)));
3824                }
3825                Ok((it, item)) => {
3826                    iter = it;
3827                    item
3828                }
3829            };
3830
3831            match item {
3832                None => return None,
3833                Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
3834                Some(Ok((k, _))) => {
3835                    if !SledStorageDB::is_map_count_key(k.as_ref()) {
3836                        continue;
3837                    }
3838                    self.iter = Some(iter);
3839                    let name = SledStorageDB::map_count_key_to_name(k.as_ref());
3840                    return Some(Ok(StorageMap::Sled(self.db._map(name))));
3841                }
3842            }
3843        }
3844    }
3845}
3846
3847/// Async iterator for lists
3848pub struct AsyncListIter<'a> {
3849    db: &'a SledStorageDB,
3850    iter: Option<sled::Iter>,
3851}
3852
3853impl Debug for AsyncListIter<'_> {
3854    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3855        f.debug_tuple("AsyncListIter .. ").finish()
3856    }
3857}
3858
3859#[async_trait]
3860impl AsyncIterator for AsyncListIter<'_> {
3861    type Item = Result<StorageList>;
3862
3863    async fn next(&mut self) -> Option<Self::Item> {
3864        let mut iter = self.iter.take()?;
3865        loop {
3866            let (tx, rx) = oneshot::channel();
3867            if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3868                return Some(Err(e));
3869            }
3870            let item = match rx.await {
3871                Err(e) => {
3872                    return Some(Err(anyhow::Error::new(e)));
3873                }
3874                Ok((it, item)) => {
3875                    iter = it;
3876                    item
3877                }
3878            };
3879            return match item {
3880                None => None,
3881                Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
3882                Some(Ok((k, _))) => {
3883                    if !SledStorageDB::is_list_count_key(k.as_ref()) {
3884                        continue;
3885                    }
3886                    self.iter = Some(iter);
3887                    let name = SledStorageDB::list_count_key_to_name(k.as_ref());
3888                    Some(Ok(StorageList::Sled(self.db._list(name))))
3889                }
3890            };
3891        }
3892    }
3893}
3894
3895/// Async iterator for database keys with pattern matching
3896pub struct AsyncDbKeyIter<'a> {
3897    db: &'a SledStorageDB,
3898    pattern: Pattern,
3899    iter: Option<sled::Iter>,
3900}
3901
3902impl Debug for AsyncDbKeyIter<'_> {
3903    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3904        f.debug_tuple("AsyncDbKeyIter .. ").finish()
3905    }
3906}
3907
3908#[async_trait]
3909impl AsyncIterator for AsyncDbKeyIter<'_> {
3910    type Item = Result<Key>;
3911
3912    async fn next(&mut self) -> Option<Self::Item> {
3913        let mut iter = self.iter.take()?;
3914        loop {
3915            let (tx, rx) = oneshot::channel();
3916            if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3917                return Some(Err(e));
3918            }
3919            let item = match rx.await {
3920                Err(e) => {
3921                    return Some(Err(anyhow::Error::new(e)));
3922                }
3923                Ok((it, item)) => {
3924                    iter = it;
3925                    item
3926                }
3927            };
3928
3929            return match item {
3930                None => None,
3931                Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
3932                Some(Ok((k, _))) => {
3933                    if !is_match(self.pattern.clone(), k.as_ref()) {
3934                        continue;
3935                    }
3936                    self.iter = Some(iter);
3937                    Some(Ok(k.to_vec()))
3938                }
3939            };
3940        }
3941    }
3942}