1use core::fmt;
16use std::borrow::Cow;
17use std::fmt::Debug;
18use std::io;
19use std::io::{ErrorKind, Read};
20use std::ops::Deref;
21use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
22use std::sync::Arc;
23
24use anyhow::{anyhow, Error};
25use async_trait::async_trait;
26use convert::Bytesize;
27use serde::de::DeserializeOwned;
28use serde::Deserialize;
29use serde::Serialize;
30use serde_json::Value;
31
32#[allow(unused_imports)]
33use sled::transaction::TransactionResult;
34use sled::transaction::{
35 ConflictableTransactionError, ConflictableTransactionResult, TransactionError,
36 TransactionalTree,
37};
38#[allow(unused_imports)]
39use sled::Transactional;
40use sled::{Batch, IVec, Tree};
41use tokio::runtime::Handle;
42use tokio::sync::mpsc;
43use tokio::sync::oneshot;
44use tokio::task::spawn_blocking;
45
46use crate::storage::{AsyncIterator, IterItem, Key, List, Map, StorageDB};
47#[allow(unused_imports)]
48use crate::{timestamp_millis, TimestampMillis};
49use crate::{Result, StorageList, StorageMap};
50
51const SEPARATOR: &[u8] = b"@";
53const KV_TREE: &[u8] = b"__kv_tree@";
55const MAP_TREE: &[u8] = b"__map_tree@";
57const LIST_TREE: &[u8] = b"__list_tree@";
59const EXPIRE_KEYS_TREE: &[u8] = b"__expire_key_tree@";
61const KEY_EXPIRE_TREE: &[u8] = b"__key_expire_tree@";
63const MAP_NAME_PREFIX: &[u8] = b"__map@";
65const MAP_KEY_SEPARATOR: &[u8] = b"@__item@";
67#[allow(dead_code)]
68const MAP_KEY_COUNT_SUFFIX: &[u8] = b"@__count@";
70
71const LIST_NAME_PREFIX: &[u8] = b"__list@";
73const LIST_KEY_COUNT_SUFFIX: &[u8] = b"@__count@";
75const LIST_KEY_CONTENT_SUFFIX: &[u8] = b"@__content@";
77
78#[allow(dead_code)]
80#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
81enum KeyType {
82 KV,
84 Map,
86 List,
88}
89
90impl KeyType {
91 #[inline]
93 #[allow(dead_code)]
94 fn encode(&self) -> &[u8] {
95 match self {
96 KeyType::KV => &[1],
97 KeyType::Map => &[2],
98 KeyType::List => &[3],
99 }
100 }
101
102 #[inline]
104 #[allow(dead_code)]
105 fn decode(v: &[u8]) -> Result<Self> {
106 if v.is_empty() {
107 Err(anyhow!("invalid data"))
108 } else {
109 match v[0] {
110 1 => Ok(KeyType::KV),
111 2 => Ok(KeyType::Map),
112 3 => Ok(KeyType::List),
113 _ => Err(anyhow!("invalid data")),
114 }
115 }
116 }
117}
118
119enum Command {
121 DBInsert(SledStorageDB, Key, Vec<u8>, oneshot::Sender<Result<()>>),
123 DBGet(SledStorageDB, IVec, oneshot::Sender<Result<Option<IVec>>>),
124 DBRemove(SledStorageDB, IVec, oneshot::Sender<Result<()>>),
125 DBMapNew(
126 SledStorageDB,
127 IVec,
128 Option<TimestampMillis>,
129 oneshot::Sender<Result<SledStorageMap>>,
130 ),
131 DBMapRemove(SledStorageDB, IVec, oneshot::Sender<Result<()>>),
132 DBMapContainsKey(SledStorageDB, IVec, oneshot::Sender<Result<bool>>),
133 DBListNew(
134 SledStorageDB,
135 IVec,
136 Option<TimestampMillis>,
137 oneshot::Sender<Result<SledStorageList>>,
138 ),
139 DBListRemove(SledStorageDB, IVec, oneshot::Sender<Result<()>>),
140 DBListContainsKey(SledStorageDB, IVec, oneshot::Sender<Result<bool>>),
141 DBBatchInsert(SledStorageDB, Vec<(Key, IVec)>, oneshot::Sender<Result<()>>),
142 DBBatchRemove(SledStorageDB, Vec<Key>, oneshot::Sender<Result<()>>),
143 DBCounterIncr(SledStorageDB, IVec, isize, oneshot::Sender<Result<()>>),
144 DBCounterDecr(SledStorageDB, IVec, isize, oneshot::Sender<Result<()>>),
145 DBCounterGet(SledStorageDB, IVec, oneshot::Sender<Result<Option<isize>>>),
146 DBCounterSet(SledStorageDB, IVec, isize, oneshot::Sender<Result<()>>),
147 DBContainsKey(SledStorageDB, IVec, oneshot::Sender<Result<bool>>),
148 #[cfg(feature = "ttl")]
149 DBExpireAt(
150 SledStorageDB,
151 IVec,
152 TimestampMillis,
153 oneshot::Sender<Result<bool>>,
154 ),
155 #[cfg(feature = "ttl")]
156 DBTtl(
157 SledStorageDB,
158 IVec,
159 oneshot::Sender<Result<Option<TimestampMillis>>>,
160 ),
161 DBMapPrefixIter(SledStorageDB, oneshot::Sender<sled::Iter>),
162 DBListPrefixIter(SledStorageDB, oneshot::Sender<sled::Iter>),
163 DBScanIter(SledStorageDB, Vec<u8>, oneshot::Sender<sled::Iter>),
164 #[allow(dead_code)]
165 DBLen(SledStorageDB, oneshot::Sender<usize>),
166 DBSize(SledStorageDB, oneshot::Sender<usize>),
167
168 MapInsert(SledStorageMap, IVec, IVec, oneshot::Sender<Result<()>>),
170 MapGet(SledStorageMap, IVec, oneshot::Sender<Result<Option<IVec>>>),
171 MapRemove(SledStorageMap, IVec, oneshot::Sender<Result<()>>),
172 MapContainsKey(SledStorageMap, IVec, oneshot::Sender<Result<bool>>),
173 #[cfg(feature = "map_len")]
174 MapLen(SledStorageMap, oneshot::Sender<Result<usize>>),
175 MapIsEmpty(SledStorageMap, oneshot::Sender<Result<bool>>),
176 MapClear(SledStorageMap, oneshot::Sender<Result<()>>),
177 MapRemoveAndFetch(SledStorageMap, IVec, oneshot::Sender<Result<Option<IVec>>>),
178 MapRemoveWithPrefix(SledStorageMap, IVec, oneshot::Sender<Result<()>>),
179 MapBatchInsert(
180 SledStorageMap,
181 Vec<(IVec, IVec)>,
182 oneshot::Sender<Result<()>>,
183 ),
184 MapBatchRemove(SledStorageMap, Vec<IVec>, oneshot::Sender<Result<()>>),
185 #[cfg(feature = "ttl")]
186 MapExpireAt(
187 SledStorageMap,
188 TimestampMillis,
189 oneshot::Sender<Result<bool>>,
190 ),
191 #[cfg(feature = "ttl")]
192 MapTTL(
193 SledStorageMap,
194 oneshot::Sender<Result<Option<TimestampMillis>>>,
195 ),
196 MapIsExpired(SledStorageMap, oneshot::Sender<Result<bool>>),
197 MapPrefixIter(SledStorageMap, Option<IVec>, oneshot::Sender<sled::Iter>),
198
199 ListPush(SledStorageList, IVec, oneshot::Sender<Result<()>>),
201 ListPushs(SledStorageList, Vec<IVec>, oneshot::Sender<Result<()>>),
202 ListPushLimit(
203 SledStorageList,
204 IVec,
205 usize,
206 bool,
207 oneshot::Sender<Result<Option<IVec>>>,
208 ),
209 ListPop(SledStorageList, oneshot::Sender<Result<Option<IVec>>>),
210 ListAll(SledStorageList, oneshot::Sender<Result<Vec<IVec>>>),
211 ListGetIndex(
212 SledStorageList,
213 usize,
214 oneshot::Sender<Result<Option<IVec>>>,
215 ),
216 ListLen(SledStorageList, oneshot::Sender<Result<usize>>),
217 ListIsEmpty(SledStorageList, oneshot::Sender<Result<bool>>),
218 ListClear(SledStorageList, oneshot::Sender<Result<()>>),
219 #[cfg(feature = "ttl")]
220 ListExpireAt(
221 SledStorageList,
222 TimestampMillis,
223 oneshot::Sender<Result<bool>>,
224 ),
225 #[cfg(feature = "ttl")]
226 ListTTL(
227 SledStorageList,
228 oneshot::Sender<Result<Option<TimestampMillis>>>,
229 ),
230 ListIsExpired(SledStorageList, oneshot::Sender<Result<bool>>),
231 ListPrefixIter(SledStorageList, oneshot::Sender<sled::Iter>),
232
233 #[allow(clippy::type_complexity)]
235 IterNext(
236 sled::Iter,
237 oneshot::Sender<(sled::Iter, Option<sled::Result<(IVec, IVec)>>)>,
238 ),
239}
240
241pub type CleanupFun = fn(&SledStorageDB);
243
244fn def_cleanup(_db: &SledStorageDB) {
246 #[cfg(feature = "ttl")]
247 {
248 let db = _db.clone();
249 std::thread::spawn(move || {
250 let limit = 500;
251 loop {
252 std::thread::sleep(std::time::Duration::from_secs(10));
253 let mut total_cleanups = 0;
254 let now = std::time::Instant::now();
255 loop {
256 let now = std::time::Instant::now();
257 let count = db.cleanup(limit);
258 total_cleanups += count;
259 if count > 0 {
260 log::debug!(
261 "def_cleanup: {}, total cleanups: {}, active_count(): {}, cost time: {:?}",
262 count,
263 total_cleanups,
264 db.active_count(),
265 now.elapsed()
266 );
267 }
268 if count < limit {
269 break;
270 }
271 if db.active_count() > 50 {
272 std::thread::sleep(std::time::Duration::from_millis(500));
273 } else {
274 std::thread::sleep(std::time::Duration::from_millis(0));
275 }
276 }
277 if now.elapsed().as_secs() > 3 {
278 log::info!(
279 "total cleanups: {}, cost time: {:?}",
280 total_cleanups,
281 now.elapsed()
282 );
283 }
284 }
285 });
286 }
287}
288
289#[derive(Debug, Clone, Serialize, Deserialize)]
291pub struct SledConfig {
292 pub path: String,
294 pub cache_capacity: Bytesize,
296 #[serde(skip, default = "SledConfig::cleanup_f_default")]
298 pub cleanup_f: CleanupFun,
299}
300
301impl Default for SledConfig {
302 fn default() -> Self {
303 SledConfig {
304 path: String::default(),
305 cache_capacity: Bytesize::from(1024 * 1024 * 1024),
306 cleanup_f: def_cleanup,
307 }
308 }
309}
310
311impl SledConfig {
312 #[inline]
314 pub fn to_sled_config(&self) -> Result<sled::Config> {
315 if self.path.trim().is_empty() {
316 return Err(Error::msg("storage dir is empty"));
317 }
318 let sled_cfg = sled::Config::default()
319 .path(self.path.trim())
320 .cache_capacity(self.cache_capacity.as_u64())
321 .mode(sled::Mode::HighThroughput);
322 Ok(sled_cfg)
323 }
324
325 #[inline]
327 fn cleanup_f_default() -> CleanupFun {
328 def_cleanup
329 }
330}
331
332fn _increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
334 let number = match old {
335 Some(bytes) => {
336 if let Ok(array) = bytes.try_into() {
337 let number = isize::from_be_bytes(array);
338 number + 1
339 } else {
340 1
341 }
342 }
343 None => 1,
344 };
345
346 Some(number.to_be_bytes().to_vec())
347}
348
349fn _decrement(old: Option<&[u8]>) -> Option<Vec<u8>> {
351 let number = match old {
352 Some(bytes) => {
353 if let Ok(array) = bytes.try_into() {
354 let number = isize::from_be_bytes(array);
355 number - 1
356 } else {
357 -1
358 }
359 }
360 None => -1,
361 };
362
363 Some(number.to_be_bytes().to_vec())
364}
365
366#[derive(Clone)]
368pub struct Pattern(Arc<Vec<PatternChar>>);
369
370impl Deref for Pattern {
371 type Target = Vec<PatternChar>;
372
373 fn deref(&self) -> &Self::Target {
374 &self.0
375 }
376}
377
378impl From<&str> for Pattern {
379 fn from(pattern: &str) -> Self {
380 Pattern::parse(pattern.as_bytes())
381 }
382}
383
384impl From<&[u8]> for Pattern {
385 fn from(pattern: &[u8]) -> Self {
386 Pattern::parse(pattern)
387 }
388}
389
390#[derive(Clone)]
392pub enum PatternChar {
393 Literal(u8),
395 Wildcard,
397 AnyChar,
399}
400
401impl Pattern {
402 pub fn parse(pattern: &[u8]) -> Self {
404 let mut parsed_pattern = Vec::new();
405 let mut chars = pattern.bytes().peekable();
406
407 while let Some(Ok(c)) = chars.next() {
408 if c == b'\\' {
409 if let Some(Ok(next_char)) = chars.next() {
410 match next_char {
411 b'?' => parsed_pattern.push(PatternChar::Literal(b'?')),
412 b'*' => parsed_pattern.push(PatternChar::Literal(b'*')),
413 _ => {
414 parsed_pattern.push(PatternChar::Literal(b'\\'));
415 parsed_pattern.push(PatternChar::Literal(next_char));
416 }
417 }
418 }
419 } else {
420 match c {
421 b'?' => parsed_pattern.push(PatternChar::AnyChar),
422 b'*' => parsed_pattern.push(PatternChar::Wildcard),
423 _ => parsed_pattern.push(PatternChar::Literal(c)),
424 }
425 }
426 }
427
428 Pattern(Arc::new(parsed_pattern))
429 }
430}
431
432fn is_match<P: Into<Pattern>>(pattern: P, text: &[u8]) -> bool {
434 let pattern = pattern.into();
435 let text_chars = text;
436 let pattern_len = pattern.len();
437 let text_len = text_chars.len();
438
439 let mut dp = vec![vec![false; text_len + 1]; pattern_len + 1];
440 dp[0][0] = true;
441
442 for i in 1..=pattern_len {
443 if let PatternChar::Wildcard = pattern[i - 1] {
444 dp[i][0] = dp[i - 1][0];
445 }
446 for j in 1..=text_len {
447 match pattern[i - 1] {
448 PatternChar::Wildcard => {
449 dp[i][j] = dp[i - 1][j] || dp[i][j - 1];
450 }
451 PatternChar::AnyChar | PatternChar::Literal(_) => {
452 if let PatternChar::Literal(c) = pattern[i - 1] {
453 dp[i][j] = (c == b'?' || c == text_chars[j - 1]) && dp[i - 1][j - 1];
454 } else {
455 dp[i][j] = dp[i - 1][j - 1];
456 }
457 }
458 }
459 }
460 }
461
462 dp[pattern_len][text_len]
463}
464
465pub trait BytesReplace {
467 fn replace(self, from: &[u8], to: &[u8]) -> Vec<u8>;
469}
470
471impl BytesReplace for &[u8] {
472 fn replace(self, from: &[u8], to: &[u8]) -> Vec<u8> {
473 let input = self;
474 let mut result = Vec::new();
475 let mut i = 0;
476 while i < input.len() {
477 if input[i..].starts_with(from) {
478 result.extend_from_slice(to);
479 i += from.len();
480 } else {
481 result.push(input[i]);
482 i += 1;
483 }
484 }
485 result
486 }
487}
488
489#[derive(Clone)]
491pub struct SledStorageDB {
492 pub(crate) db: Arc<sled::Db>,
494 pub(crate) kv_tree: sled::Tree,
496 pub(crate) map_tree: sled::Tree,
498 pub(crate) list_tree: sled::Tree,
500 #[allow(dead_code)]
502 pub(crate) expire_key_tree: sled::Tree,
503 #[allow(dead_code)]
505 pub(crate) key_expire_tree: sled::Tree,
506 cmd_tx: mpsc::Sender<Command>,
508 active_count: Arc<AtomicIsize>,
510}
511
512impl SledStorageDB {
513 #[inline]
515 pub(crate) async fn new(cfg: SledConfig) -> Result<Self> {
516 let sled_cfg = cfg.to_sled_config()?;
517 let (db, kv_tree, map_tree, list_tree, expire_key_tree, key_expire_tree) =
518 sled_cfg.open().map(|db| {
519 let kv_tree = db.open_tree(KV_TREE);
520 let map_tree = db.open_tree(MAP_TREE);
521 let list_tree = db.open_tree(LIST_TREE);
522 let expire_key_tree = db.open_tree(EXPIRE_KEYS_TREE);
523 let key_expire_tree = db.open_tree(KEY_EXPIRE_TREE);
524 (
525 Arc::new(db),
526 kv_tree,
527 map_tree,
528 list_tree,
529 expire_key_tree,
530 key_expire_tree,
531 )
532 })?;
533 let kv_tree = kv_tree?;
534 let map_tree = map_tree?;
535 let list_tree = list_tree?;
536 let expire_key_tree = expire_key_tree?;
537 let key_expire_tree = key_expire_tree?;
538 let active_count = Arc::new(AtomicIsize::new(0));
539 let active_count1 = active_count.clone();
540
541 let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::channel::<Command>(300_000);
542 spawn_blocking(move || {
543 Handle::current().block_on(async move {
544 while let Some(cmd) = cmd_rx.recv().await {
545 let err = anyhow::Error::msg("send result fail");
546 let snd_res = match cmd {
547 Command::DBInsert(db, key, val, res_tx) => res_tx
548 .send(db._insert(key.as_slice(), val.as_slice()))
549 .map_err(|_| err),
550 Command::DBGet(db, key, res_tx) => {
551 res_tx.send(db._get(key.as_ref())).map_err(|_| err)
552 }
553 Command::DBRemove(db, key, res_tx) => {
554 res_tx.send(db._kv_remove(key.as_ref())).map_err(|_| err)
555 }
556 Command::DBMapNew(db, name, expire_ms, res_tx) => {
557 let map =
558 SledStorageMap::_new_expire(name.as_ref().to_vec(), expire_ms, db);
559 res_tx.send(map).map_err(|_| err)
560 }
561 Command::DBMapRemove(db, name, res_tx) => {
562 res_tx.send(db._map_remove(name.as_ref())).map_err(|_| err)
563 }
564 Command::DBMapContainsKey(db, key, res_tx) => res_tx
565 .send(db._self_map_contains_key(key.as_ref()))
566 .map_err(|_| err),
567 Command::DBListNew(db, name, expire_ms, res_tx) => {
568 let list =
569 SledStorageList::_new_expire(name.as_ref().to_vec(), expire_ms, db);
570 res_tx.send(list).map_err(|_| err)
571 }
572 Command::DBListRemove(db, name, res_tx) => {
573 res_tx.send(db._list_remove(name.as_ref())).map_err(|_| err)
574 }
575 Command::DBListContainsKey(db, key, res_tx) => res_tx
576 .send(db._self_list_contains_key(key.as_ref()))
577 .map_err(|_| err),
578 Command::DBBatchInsert(db, key_vals, res_tx) => {
579 res_tx.send(db._batch_insert(key_vals)).map_err(|_| err)
580 }
581 Command::DBBatchRemove(db, keys, res_tx) => {
582 res_tx.send(db._batch_remove(keys)).map_err(|_| err)
583 }
584 Command::DBCounterIncr(db, key, increment, res_tx) => res_tx
585 .send(db._counter_incr(key.as_ref(), increment))
586 .map_err(|_| err),
587 Command::DBCounterDecr(db, key, increment, res_tx) => res_tx
588 .send(db._counter_decr(key.as_ref(), increment))
589 .map_err(|_| err),
590 Command::DBCounterGet(db, key, res_tx) => {
591 res_tx.send(db._counter_get(key.as_ref())).map_err(|_| err)
592 }
593 Command::DBCounterSet(db, key, val, res_tx) => res_tx
594 .send(db._counter_set(key.as_ref(), val))
595 .map_err(|_| err),
596 Command::DBContainsKey(db, key, res_tx) => res_tx
597 .send(db._self_contains_key(key.as_ref()))
598 .map_err(|_| err),
599 #[cfg(feature = "ttl")]
600 Command::DBExpireAt(db, key, at, res_tx) => res_tx
601 .send(db._expire_at(key.as_ref(), at, KeyType::KV))
602 .map_err(|_| err),
603 #[cfg(feature = "ttl")]
604 Command::DBTtl(db, key, res_tx) => {
605 res_tx.send(db._self_ttl(key.as_ref())).map_err(|_| err)
606 }
607 Command::DBMapPrefixIter(db, res_tx) => {
608 res_tx.send(db._map_scan_prefix()).map_err(|_| err)
609 }
610 Command::DBListPrefixIter(db, res_tx) => {
611 res_tx.send(db._list_scan_prefix()).map_err(|_| err)
612 }
613 Command::DBScanIter(db, pattern, res_tx) => {
614 res_tx.send(db._db_scan_prefix(pattern)).map_err(|_| err)
615 }
616 Command::DBLen(db, res_tx) => res_tx.send(db._kv_len()).map_err(|_| err),
617 Command::DBSize(db, res_tx) => res_tx.send(db._db_size()).map_err(|_| err),
618
619 Command::MapInsert(map, key, val, res_tx) => {
620 res_tx.send(map._insert(key, val)).map_err(|_| err)
621 }
622 Command::MapGet(map, key, res_tx) => {
623 res_tx.send(map._get(key)).map_err(|_| err)
624 }
625 Command::MapRemove(map, key, res_tx) => {
626 res_tx.send(map._remove(key)).map_err(|_| err)
627 }
628 Command::MapContainsKey(map, key, res_tx) => {
629 res_tx.send(map._contains_key(key)).map_err(|_| err)
630 }
631 #[cfg(feature = "map_len")]
632 Command::MapLen(map, res_tx) => res_tx.send(map._len()).map_err(|_| err),
633 Command::MapIsEmpty(map, res_tx) => {
634 res_tx.send(map._is_empty()).map_err(|_| err)
635 }
636 Command::MapClear(map, res_tx) => {
637 res_tx.send(map._clear()).map_err(|_| err)
638 }
639 Command::MapRemoveAndFetch(map, key, res_tx) => {
640 res_tx.send(map._remove_and_fetch(key)).map_err(|_| err)
641 }
642 Command::MapRemoveWithPrefix(map, key, res_tx) => {
643 res_tx.send(map._remove_with_prefix(key)).map_err(|_| err)
644 }
645 Command::MapBatchInsert(map, key_vals, res_tx) => {
646 res_tx.send(map._batch_insert(key_vals)).map_err(|_| err)
647 }
648 Command::MapBatchRemove(map, keys, res_tx) => {
649 res_tx.send(map._batch_remove(keys)).map_err(|_| err)
650 }
651 #[cfg(feature = "ttl")]
652 Command::MapExpireAt(map, at, res_tx) => {
653 res_tx.send(map._expire_at(at)).map_err(|_| err)
654 }
655 #[cfg(feature = "ttl")]
656 Command::MapTTL(map, res_tx) => res_tx.send(map._ttl()).map_err(|_| err),
657 Command::MapIsExpired(map, res_tx) => {
658 res_tx.send(map._is_expired()).map_err(|_| err)
659 }
660 Command::MapPrefixIter(map, prefix, res_tx) => {
661 res_tx.send(map._prefix_iter(prefix)).map_err(|_| err)
662 }
663
664 Command::ListPush(list, val, res_tx) => {
665 res_tx.send(list._push(val)).map_err(|_| err)
666 }
667 Command::ListPushs(list, vals, res_tx) => {
668 res_tx.send(list._pushs(vals)).map_err(|_| err)
669 }
670 Command::ListPushLimit(list, data, limit, pop_front_if_limited, res_tx) => {
671 res_tx
672 .send(list._push_limit(data, limit, pop_front_if_limited))
673 .map_err(|_| err)
674 }
675 Command::ListPop(list, res_tx) => res_tx.send(list._pop()).map_err(|_| err),
676 Command::ListAll(list, res_tx) => res_tx.send(list._all()).map_err(|_| err),
677 Command::ListGetIndex(list, idx, res_tx) => {
678 res_tx.send(list._get_index(idx)).map_err(|_| err)
679 }
680 Command::ListLen(list, res_tx) => res_tx.send(list._len()).map_err(|_| err),
681 Command::ListIsEmpty(list, res_tx) => {
682 res_tx.send(list._is_empty()).map_err(|_| err)
683 }
684 Command::ListClear(list, res_tx) => {
685 res_tx.send(list._clear()).map_err(|_| err)
686 }
687 #[cfg(feature = "ttl")]
688 Command::ListExpireAt(list, at, res_tx) => {
689 res_tx.send(list._expire_at(at)).map_err(|_| err)
690 }
691 #[cfg(feature = "ttl")]
692 Command::ListTTL(list, res_tx) => res_tx.send(list._ttl()).map_err(|_| err),
693 Command::ListIsExpired(list, res_tx) => {
694 res_tx.send(list._is_expired()).map_err(|_| err)
695 }
696 Command::ListPrefixIter(list, res_tx) => {
697 res_tx.send(list._prefix_iter()).map_err(|_| err)
698 }
699
700 Command::IterNext(mut iter, res_tx) => {
701 let item = iter.next();
702 res_tx.send((iter, item)).map_err(|_| err)
703 }
704 };
705
706 if let Err(e) = snd_res {
707 log::error!("{:?}", e);
708 }
709
710 active_count1.fetch_sub(1, Ordering::Relaxed);
711 }
712 })
713 });
714
715 let db = Self {
716 db,
717 kv_tree,
718 map_tree,
719 list_tree,
720 expire_key_tree,
721 key_expire_tree,
722 cmd_tx,
723 active_count,
724 };
725
726 (cfg.cleanup_f)(&db);
727
728 Ok(db)
729 }
730
731 #[cfg(feature = "ttl")]
733 #[inline]
734 pub fn cleanup(&self, limit: usize) -> usize {
735 let rmeove = |typ: &KeyType, key: &[u8]| -> Result<()> {
736 match typ {
737 KeyType::Map => {
738 self._map(key.as_ref())._clear()?;
739 }
740 KeyType::List => {
741 self._list(key.as_ref())._clear()?;
742 }
743 KeyType::KV => {
744 self.kv_tree.remove(key.as_ref())?;
745 }
746 }
747 Ok(())
748 };
749 let mut count = 0;
750 let mut expire_at_key_types = Vec::new();
751 for item in self.expire_key_tree.iter() {
752 if count > limit {
753 break;
754 }
755 let (expire_at_key, key_type) = match item {
756 Ok(item) => item,
757 Err(e) => {
758 log::error!("{:?}", e);
759 break;
760 }
761 };
762
763 let (expire_at_bytes, _) = expire_at_key.as_ref().split_at(8);
764
765 let expire_at = match expire_at_bytes.try_into() {
766 Ok(at) => i64::from_be_bytes(at),
767 Err(e) => {
768 log::error!("{:?}", e);
769 break;
770 }
771 };
772
773 if expire_at > timestamp_millis() {
774 break;
775 }
776
777 let key_type = match KeyType::decode(key_type.as_ref()) {
778 Ok(key_type) => key_type,
779 Err(e) => {
780 log::error!("{:?}", e);
781 break;
782 }
783 };
784
785 expire_at_key_types.push((expire_at_key, key_type));
786 count += 1;
787 }
788
789 let mut key_expire_batch = sled::Batch::default();
790 let mut expire_key_batch = sled::Batch::default();
791 let keys: Vec<(&[u8], &KeyType)> = expire_at_key_types
792 .iter()
793 .map(|(expire_at_key, key_type)| {
794 let (_, key) = expire_at_key.as_ref().split_at(8);
795 key_expire_batch.remove(key);
796 expire_key_batch.remove(expire_at_key);
797 (key, key_type)
798 })
799 .collect();
800
801 for (key, key_type) in keys {
802 if let Err(e) = rmeove(key_type, key) {
803 log::error!("{:?}", e);
804 }
805 }
806
807 if let Err(e) = (&self.key_expire_tree, &self.expire_key_tree).transaction(
815 |(key_expire_tx, expire_key_tx)| {
816 key_expire_tx.apply_batch(&key_expire_batch)?;
817 expire_key_tx.apply_batch(&expire_key_batch)?;
818 Ok::<_, ConflictableTransactionError<()>>(())
819 },
820 ) {
821 log::error!("{:?}", e);
822 }
823 count
824 }
825
826 #[cfg(feature = "ttl")]
828 #[inline]
829 pub fn cleanup_kvs(&self, limit: usize) -> usize {
830 let mut count = 0;
831 let mut expire_at_key_types = Vec::new();
832 for item in self.expire_key_tree.iter() {
833 if count > limit {
834 break;
835 }
836 let (expire_at_key, key_type) = match item {
837 Ok(item) => item,
838 Err(e) => {
839 log::error!("{:?}", e);
840 break;
841 }
842 };
843
844 let (expire_at_bytes, _) = expire_at_key.as_ref().split_at(8);
845
846 let expire_at = match expire_at_bytes.try_into() {
847 Ok(at) => i64::from_be_bytes(at),
848 Err(e) => {
849 log::error!("{:?}", e);
850 break;
851 }
852 };
853
854 if expire_at > timestamp_millis() {
855 break;
856 }
857
858 let key_type = match KeyType::decode(key_type.as_ref()) {
859 Ok(key_type) => key_type,
860 Err(e) => {
861 log::error!("{:?}", e);
862 break;
863 }
864 };
865
866 if matches!(key_type, KeyType::KV) {
867 expire_at_key_types.push(expire_at_key);
868 count += 1;
869 }
870 }
871
872 let mut key_expire_batch = sled::Batch::default();
873 let mut expire_key_batch = sled::Batch::default();
874 let mut keys = Batch::default();
875 for expire_at_key in expire_at_key_types {
876 let (_, key) = expire_at_key.as_ref().split_at(8);
877 key_expire_batch.remove(key);
878 expire_key_batch.remove(expire_at_key.as_ref());
879 keys.remove(key);
880 }
881
882 if let Err(e) = (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree).transaction(
883 |(kv_tx, key_expire_tx, expire_key_tx)| {
884 kv_tx.apply_batch(&keys)?;
885 key_expire_tx.apply_batch(&key_expire_batch)?;
886 expire_key_tx.apply_batch(&expire_key_batch)?;
887 Ok::<_, ConflictableTransactionError<()>>(())
888 },
889 ) {
890 log::error!("{:?}", e);
891 }
892 count
893 }
894
895 #[inline]
897 pub fn active_count(&self) -> isize {
898 self.active_count.load(Ordering::Relaxed)
899 }
900
901 #[inline]
913 fn make_map_prefix_name<K>(name: K) -> Key
914 where
915 K: AsRef<[u8]>,
916 {
917 [MAP_NAME_PREFIX, name.as_ref(), SEPARATOR].concat()
918 }
919
920 #[inline]
922 fn make_map_item_prefix_name<K>(name: K) -> Key
923 where
924 K: AsRef<[u8]>,
925 {
926 [MAP_NAME_PREFIX, name.as_ref(), MAP_KEY_SEPARATOR].concat()
927 }
928
929 #[inline]
931 fn make_map_count_key_name<K>(name: K) -> Key
932 where
933 K: AsRef<[u8]>,
934 {
935 [MAP_NAME_PREFIX, name.as_ref(), MAP_KEY_COUNT_SUFFIX].concat()
936 }
937
938 #[inline]
940 fn map_count_key_to_name(key: &[u8]) -> &[u8] {
941 key[MAP_NAME_PREFIX.len()..key.as_ref().len() - MAP_KEY_COUNT_SUFFIX.len()].as_ref()
942 }
943
944 #[inline]
946 fn is_map_count_key(key: &[u8]) -> bool {
947 key.starts_with(MAP_NAME_PREFIX) && key.ends_with(MAP_KEY_COUNT_SUFFIX)
948 }
949
950 #[allow(dead_code)]
952 #[inline]
953 fn map_item_key_to_name(key: &[u8]) -> Option<&[u8]> {
954 use super::storage::SplitSubslice;
955 if let Some((prefix, _)) = key.split_subslice(MAP_KEY_SEPARATOR) {
956 if prefix.starts_with(MAP_NAME_PREFIX) {
957 return Some(
958 prefix[MAP_NAME_PREFIX.len()..(prefix.len() - MAP_KEY_SEPARATOR.len())]
959 .as_ref(),
960 );
961 }
962 }
963 None
964 }
965
966 #[inline]
968 fn make_list_prefix<K>(name: K) -> Key
969 where
970 K: AsRef<[u8]>,
971 {
972 [LIST_NAME_PREFIX, name.as_ref()].concat()
973 }
974
975 #[inline]
977 fn make_list_count_key(name: &[u8]) -> Vec<u8> {
978 [LIST_NAME_PREFIX, name, LIST_KEY_COUNT_SUFFIX].concat()
979 }
980
981 #[inline]
983 fn list_count_key_to_name(key: &[u8]) -> &[u8] {
984 key[LIST_NAME_PREFIX.len()..key.as_ref().len() - LIST_KEY_COUNT_SUFFIX.len()].as_ref()
985 }
986
987 #[inline]
989 fn is_list_count_key(key: &[u8]) -> bool {
990 key.starts_with(LIST_NAME_PREFIX) && key.ends_with(LIST_KEY_COUNT_SUFFIX)
991 }
992
993 #[inline]
995 fn _contains_key<K: AsRef<[u8]> + Sync + Send>(
996 &self,
997 key: K,
998 key_type: KeyType,
999 ) -> Result<bool> {
1000 match key_type {
1001 KeyType::KV => Self::_kv_contains_key(&self.kv_tree, key),
1002 KeyType::Map => Self::_map_contains_key(&self.map_tree, key),
1003 KeyType::List => Self::_list_contains_key(&self.list_tree, key),
1004 }
1005 }
1006
1007 #[inline]
1009 fn _kv_contains_key<K: AsRef<[u8]> + Sync + Send>(kv: &Tree, key: K) -> Result<bool> {
1010 Ok(kv.contains_key(key.as_ref())?)
1011 }
1012
1013 #[inline]
1015 fn _map_contains_key<K: AsRef<[u8]> + Sync + Send>(tree: &Tree, key: K) -> Result<bool> {
1016 let count_key = SledStorageDB::make_map_count_key_name(key.as_ref());
1017 Ok(tree.contains_key(count_key)?)
1018 }
1019
1020 #[inline]
1022 fn _list_contains_key<K: AsRef<[u8]> + Sync + Send>(tree: &Tree, name: K) -> Result<bool> {
1023 let count_key = SledStorageDB::make_list_count_key(name.as_ref());
1024 Ok(tree.contains_key(count_key)?)
1025 }
1026
1027 #[inline]
1029 fn _map_remove<K>(&self, key: K) -> Result<()>
1030 where
1031 K: AsRef<[u8]>,
1032 {
1033 #[cfg(not(feature = "ttl"))]
1034 self._map(key.as_ref())._clear()?;
1035 #[cfg(feature = "ttl")]
1036 {
1037 let map = self._map(key.as_ref());
1038 let map_clear_batch = map._make_clear_batch();
1039 (&self.map_tree, &self.key_expire_tree, &self.expire_key_tree)
1040 .transaction(|(map_tx, key_expire_tx, expire_key_tx)| {
1041 map._tx_clear(map_tx, &map_clear_batch)?;
1042 Self::_tx_remove_expire_key(key_expire_tx, expire_key_tx, key.as_ref())?;
1043 Ok::<(), ConflictableTransactionError<()>>(())
1044 })
1045 .map_err(|e| anyhow!(format!("{:?}", e)))?;
1046 }
1047 Ok(())
1048 }
1049
1050 #[inline]
1052 fn _list_remove<K>(&self, key: K) -> Result<()>
1053 where
1054 K: AsRef<[u8]>,
1055 {
1056 #[cfg(not(feature = "ttl"))]
1057 self._list(key.as_ref())._clear()?;
1058 #[cfg(feature = "ttl")]
1059 {
1060 let list = self._list(key.as_ref());
1061 let list_clear_batch = list._make_clear_batch();
1062 (
1063 &self.list_tree,
1064 &self.key_expire_tree,
1065 &self.expire_key_tree,
1066 )
1067 .transaction(|(list_tx, key_expire_tx, expire_key_tx)| {
1068 SledStorageList::_tx_clear(list_tx, &list_clear_batch)?;
1069 Self::_tx_remove_expire_key(key_expire_tx, expire_key_tx, key.as_ref())?;
1070 Ok::<(), ConflictableTransactionError<()>>(())
1071 })
1072 .map_err(|e| anyhow!(format!("{:?}", e)))?;
1073 }
1074 Ok(())
1075 }
1076
1077 #[inline]
1079 fn _kv_remove<K>(&self, key: K) -> Result<()>
1080 where
1081 K: AsRef<[u8]>,
1082 {
1083 #[cfg(not(feature = "ttl"))]
1084 self.kv_tree.remove(key.as_ref())?;
1085 #[cfg(feature = "ttl")]
1086 {
1087 (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree)
1088 .transaction(|(kv_tx, key_expire_tx, expire_key_tx)| {
1089 kv_tx.remove(key.as_ref())?;
1090 Self::_tx_remove_expire_key(key_expire_tx, expire_key_tx, key.as_ref())?;
1091 Ok::<(), ConflictableTransactionError<()>>(())
1092 })
1093 .map_err(|e| anyhow!(format!("{:?}", e)))?;
1094 }
1095 Ok(())
1096 }
1097
1098 #[cfg(feature = "ttl")]
1100 #[inline]
1101 fn _remove_expire_key(&self, key: &[u8]) -> Result<()> {
1102 if let Some(expire_at_bytes) = self.key_expire_tree.get(key)? {
1103 self.key_expire_tree.remove(key)?;
1104 let expire_key = [expire_at_bytes.as_ref(), key].concat();
1105 self.expire_key_tree.remove(expire_key.as_slice())?;
1106 }
1107 Ok(())
1108 }
1109
1110 #[cfg(feature = "ttl")]
1112 #[inline]
1113 fn _tx_remove_expire_key(
1114 key_expire_tx: &TransactionalTree,
1115 expire_key_tx: &TransactionalTree,
1116 key: &[u8],
1117 ) -> ConflictableTransactionResult<()> {
1118 if let Some(expire_at_bytes) = key_expire_tx.get(key)? {
1119 key_expire_tx.remove(key)?;
1120 let expire_key = [expire_at_bytes.as_ref(), key].concat();
1121 expire_key_tx.remove(expire_key.as_slice())?;
1122 }
1123 Ok(())
1124 }
1125
1126 #[inline]
1128 fn _is_expired<K, F>(&self, _key: K, _contains_key_f: F) -> Result<bool>
1129 where
1130 K: AsRef<[u8]> + Sync + Send,
1131 F: Fn(&[u8]) -> Result<bool>,
1132 {
1133 #[cfg(feature = "ttl")]
1134 {
1135 if let Some((expire_at, _)) = self._ttl_at(_key, _contains_key_f)? {
1136 Ok(timestamp_millis() >= expire_at)
1137 } else {
1138 Ok(true)
1139 }
1140 }
1141 #[cfg(not(feature = "ttl"))]
1142 Ok(false)
1143 }
1144
1145 #[inline]
1147 fn _ttl<K, F>(
1148 &self,
1149 key: K,
1150 contains_key_f: F,
1151 ) -> Result<Option<(TimestampMillis, Option<IVec>)>>
1152 where
1153 K: AsRef<[u8]> + Sync + Send,
1154 F: Fn(&[u8]) -> Result<bool>,
1155 {
1156 Ok(self
1157 ._ttl_at(key, contains_key_f)?
1158 .map(|(expire_at, at_bytes)| (expire_at - timestamp_millis(), at_bytes)))
1159 }
1160
1161 #[inline]
1163 fn _ttl_at<K, F>(
1164 &self,
1165 c_key: K,
1166 contains_key_f: F,
1167 ) -> Result<Option<(TimestampMillis, Option<IVec>)>>
1168 where
1169 K: AsRef<[u8]> + Sync + Send,
1170 F: Fn(&[u8]) -> Result<bool>,
1171 {
1172 let ttl_res = match self.key_expire_tree.get(c_key.as_ref()) {
1173 Ok(Some(at_bytes)) => {
1174 if contains_key_f(c_key.as_ref())? {
1175 Ok(Some((
1176 TimestampMillis::from_be_bytes(at_bytes.as_ref().try_into()?),
1177 Some(at_bytes),
1178 )))
1179 } else {
1180 Ok(None)
1181 }
1182 }
1183 Ok(None) => {
1184 if contains_key_f(c_key.as_ref())? {
1185 Ok(Some((TimestampMillis::MAX, None)))
1186 } else {
1187 Ok(None)
1188 }
1189 }
1190 Err(e) => Err(anyhow!(e)),
1191 }?;
1192 Ok(ttl_res)
1193 }
1194
1195 #[inline]
1197 fn _insert(&self, key: &[u8], val: &[u8]) -> Result<()> {
1198 #[cfg(not(feature = "ttl"))]
1199 self.kv_tree.insert(key, val)?;
1200 #[cfg(feature = "ttl")]
1201 {
1202 (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree)
1203 .transaction(|(kv_tx, key_expire_tx, expire_keys_tx)| {
1204 kv_tx.insert(key, val)?;
1205 Self::_tx_remove_expire_key(key_expire_tx, expire_keys_tx, key)?;
1206 Ok::<(), ConflictableTransactionError<()>>(())
1207 })
1208 .map_err(|e| anyhow!(format!("{:?}", e)))?;
1209 }
1210 Ok(())
1211 }
1212
1213 #[inline]
1215 fn _get(&self, key: &[u8]) -> Result<Option<IVec>> {
1216 let res = if self._is_expired(key.as_ref(), |k| Self::_kv_contains_key(&self.kv_tree, k))? {
1217 None
1218 } else {
1219 self.kv_tree.get(key)?
1220 };
1221 Ok(res)
1222 }
1223
1224 #[inline]
1226 fn _self_map_contains_key(&self, key: &[u8]) -> Result<bool> {
1227 #[cfg(feature = "ttl")]
1228 {
1229 if self._is_expired(key, |k| Self::_map_contains_key(&self.map_tree, k))? {
1230 Ok(false)
1231 } else {
1232 Ok(true)
1234 }
1235 }
1236
1237 #[cfg(not(feature = "ttl"))]
1238 Self::_map_contains_key(&self.map_tree, key)
1239 }
1240
1241 #[inline]
1243 fn _self_list_contains_key(&self, key: &[u8]) -> Result<bool> {
1244 #[cfg(feature = "ttl")]
1245 {
1246 let this = self;
1247 if this._is_expired(key, |k| Self::_list_contains_key(&self.list_tree, k))? {
1248 Ok(false)
1249 } else {
1250 Ok(true)
1252 }
1253 }
1254
1255 #[cfg(not(feature = "ttl"))]
1256 Self::_list_contains_key(&self.list_tree, key)
1257 }
1258
1259 #[inline]
1261 fn _batch_insert(&self, key_vals: Vec<(Key, IVec)>) -> Result<()> {
1262 if key_vals.is_empty() {
1263 return Ok(());
1264 }
1265
1266 let mut batch = Batch::default();
1267 for (k, v) in key_vals.iter() {
1268 batch.insert(k.as_slice(), v.as_ref());
1269 }
1270
1271 let this = self;
1272 #[cfg(not(feature = "ttl"))]
1273 this.kv_tree.apply_batch(batch)?;
1274
1275 #[cfg(feature = "ttl")]
1276 {
1277 let mut remove_key_expire_batch = Batch::default();
1278 let mut remove_expire_key_batch = Batch::default();
1279 for (k, _) in key_vals.iter() {
1280 if let Some((expire_at, Some(expire_at_bytes))) =
1281 this._ttl(k.as_slice(), |k| Self::_kv_contains_key(&self.kv_tree, k))?
1282 {
1283 if expire_at <= 0 {
1284 remove_key_expire_batch.remove(k.as_slice());
1285 let expire_key = [expire_at_bytes.as_ref(), k.as_slice()].concat();
1286 remove_expire_key_batch.remove(expire_key.as_slice())
1287 }
1288 }
1289 }
1290
1291 (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree)
1295 .transaction(|(kv_tx, key_expire_tx, expire_key_tx)| {
1296 key_expire_tx.apply_batch(&remove_key_expire_batch)?;
1297 expire_key_tx.apply_batch(&remove_expire_key_batch)?;
1298 kv_tx.apply_batch(&batch)?;
1299 Ok::<(), ConflictableTransactionError<()>>(())
1300 })
1301 .map_err(|e| anyhow!(format!("{:?}", e)))?;
1302 }
1303 Ok(())
1304 }
1305
1306 #[inline]
1308 fn _batch_remove(&self, keys: Vec<Key>) -> Result<()> {
1309 if keys.is_empty() {
1310 return Ok(());
1311 }
1312
1313 let mut batch = Batch::default();
1314 for k in keys.iter() {
1315 batch.remove(k.as_slice());
1316 }
1317 #[cfg(not(feature = "ttl"))]
1318 self.kv_tree.apply_batch(batch)?;
1319
1320 #[cfg(feature = "ttl")]
1321 {
1322 let mut remove_key_expire_batch = Batch::default();
1323 let mut remove_expire_key_batch = Batch::default();
1324 for k in keys.iter() {
1325 if let Some(expire_at_bytes) = self.key_expire_tree.get(k)? {
1326 remove_key_expire_batch.remove(k.as_slice());
1327 let expire_key = [expire_at_bytes.as_ref(), k.as_slice()].concat();
1328 remove_expire_key_batch.remove(expire_key.as_slice())
1329 }
1330 }
1331 (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree)
1335 .transaction(|(kv_tx, key_expire_tx, expire_key_tx)| {
1336 key_expire_tx.apply_batch(&remove_key_expire_batch)?;
1337 expire_key_tx.apply_batch(&remove_expire_key_batch)?;
1338 kv_tx.apply_batch(&batch)?;
1339 Ok::<(), ConflictableTransactionError<()>>(())
1340 })
1341 .map_err(|e| anyhow!(format!("{:?}", e)))?;
1342 }
1343
1344 Ok(())
1345 }
1346
1347 #[inline]
1349 fn _counter_incr(&self, key: &[u8], increment: isize) -> Result<()> {
1350 self.kv_tree.fetch_and_update(key, |old: Option<&[u8]>| {
1351 let number = match old {
1352 Some(bytes) => {
1353 if let Ok(array) = bytes.try_into() {
1354 let number = isize::from_be_bytes(array);
1355 number + increment
1356 } else {
1357 increment
1358 }
1359 }
1360 None => increment,
1361 };
1362 Some(number.to_be_bytes().to_vec())
1363 })?;
1364 Ok(())
1365 }
1366
1367 #[inline]
1369 fn _counter_decr(&self, key: &[u8], decrement: isize) -> Result<()> {
1370 self.kv_tree.fetch_and_update(key, |old: Option<&[u8]>| {
1371 let number = match old {
1372 Some(bytes) => {
1373 if let Ok(array) = bytes.try_into() {
1374 let number = isize::from_be_bytes(array);
1375 number - decrement
1376 } else {
1377 -decrement
1378 }
1379 }
1380 None => -decrement,
1381 };
1382 Some(number.to_be_bytes().to_vec())
1383 })?;
1384 Ok(())
1385 }
1386
1387 #[inline]
1389 fn _counter_get(&self, key: &[u8]) -> Result<Option<isize>> {
1390 let this = self;
1391 if this._is_expired(key, |k| Self::_kv_contains_key(&self.kv_tree, k))? {
1392 Ok(None)
1393 } else if let Some(v) = this.kv_tree.get(key)? {
1394 Ok(Some(isize::from_be_bytes(v.as_ref().try_into()?)))
1395 } else {
1396 Ok(None)
1397 }
1398 }
1399
1400 #[inline]
1402 fn _counter_set(&self, key: &[u8], val: isize) -> Result<()> {
1403 let val = val.to_be_bytes().to_vec();
1404
1405 #[cfg(not(feature = "ttl"))]
1406 self.kv_tree.insert(key, val.as_slice())?;
1407 #[cfg(feature = "ttl")]
1408 {
1409 (&self.kv_tree, &self.key_expire_tree, &self.expire_key_tree)
1412 .transaction(|(kv_tx, key_expire_tx, expire_key_tx)| {
1413 Self::_tx_remove_expire_key(key_expire_tx, expire_key_tx, key)?;
1414 kv_tx.insert(key, val.as_slice())?;
1415 Ok::<(), ConflictableTransactionError<()>>(())
1416 })
1417 .map_err(|e| anyhow!(format!("{:?}", e)))?;
1418 }
1419 Ok(())
1420 }
1421
1422 #[inline]
1424 fn _self_contains_key(&self, key: &[u8]) -> Result<bool> {
1425 #[cfg(feature = "ttl")]
1426 {
1427 let this = self;
1428 if this._is_expired(key, |k| Self::_kv_contains_key(&self.kv_tree, k))? {
1429 Ok(false)
1430 } else {
1431 Ok(true)
1433 }
1434 }
1435 #[cfg(not(feature = "ttl"))]
1436 Self::_kv_contains_key(&self.kv_tree, key)
1437 }
1438
1439 #[inline]
1441 #[cfg(feature = "ttl")]
1442 fn _expire_at(&self, key: &[u8], at: TimestampMillis, key_type: KeyType) -> Result<bool> {
1443 if self._contains_key(key, key_type)? {
1444 let res = (&self.key_expire_tree, &self.expire_key_tree)
1445 .transaction(|(key_expire_tx, expire_key_tx)| {
1446 Self::_tx_expire_at(key_expire_tx, expire_key_tx, key, at, key_type)
1447 })
1448 .map_err(|e| anyhow!(format!("{:?}", e)))?;
1449 Ok(res)
1450 } else {
1451 Ok(false)
1452 }
1453 }
1454
1455 #[inline]
1457 #[cfg(feature = "ttl")]
1458 fn _tx_expire_at(
1459 key_expire_tx: &TransactionalTree,
1460 expire_key_tx: &TransactionalTree,
1461 key: &[u8],
1462 at: TimestampMillis,
1463 key_type: KeyType,
1464 ) -> ConflictableTransactionResult<bool> {
1465 let at_bytes = at.to_be_bytes();
1466 key_expire_tx.insert(key, at_bytes.as_slice())?;
1467 let res = expire_key_tx
1468 .insert([at_bytes.as_ref(), key].concat(), key_type.encode())
1469 .map(|_| true)?;
1470 Ok(res)
1471 }
1472
1473 #[inline]
1475 #[cfg(feature = "ttl")]
1476 fn _self_ttl(&self, key: &[u8]) -> Result<Option<TimestampMillis>> {
1477 Ok(self
1478 ._ttl(key, |k| Self::_kv_contains_key(&self.kv_tree, k))?
1479 .and_then(|(ttl, _)| if ttl > 0 { Some(ttl) } else { None }))
1480 }
1481
1482 #[inline]
1484 fn _map_scan_prefix(&self) -> sled::Iter {
1485 self.map_tree.scan_prefix(MAP_NAME_PREFIX)
1486 }
1487
1488 #[inline]
1490 fn _list_scan_prefix(&self) -> sled::Iter {
1491 self.list_tree.scan_prefix(LIST_NAME_PREFIX)
1492 }
1493
1494 #[inline]
1496 fn _db_scan_prefix(&self, pattern: Vec<u8>) -> sled::Iter {
1497 let mut last_esc_char = false;
1498 let mut has_esc_char = false;
1499 let start_pattern = pattern
1500 .splitn(2, |x| {
1501 if !last_esc_char && (*x == b'*' || *x == b'?') {
1502 true
1503 } else {
1504 last_esc_char = *x == b'\\';
1505 if last_esc_char && !has_esc_char {
1506 has_esc_char = true;
1507 }
1508 false
1509 }
1510 })
1511 .next();
1512 let start_pattern = if has_esc_char {
1513 start_pattern.map(|start_pattern| {
1514 Cow::Owned(
1515 start_pattern
1516 .replace(b"\\*", b"*")
1517 .as_slice()
1518 .replace(b"\\?", b"?"),
1519 )
1520 })
1521 } else {
1522 start_pattern.map(Cow::Borrowed)
1523 };
1524 let iter = if let Some(start_pattern) = start_pattern {
1525 self.kv_tree.scan_prefix(start_pattern.as_ref())
1526 } else {
1527 self.kv_tree.iter()
1528 };
1529 iter
1530 }
1531
1532 #[inline]
1534 fn _kv_len(&self) -> usize {
1535 #[cfg(feature = "ttl")]
1536 {
1537 let limit = 500;
1538 loop {
1539 if self.cleanup_kvs(limit) < limit {
1540 break;
1541 }
1542 }
1543 }
1544 self.kv_tree.len()
1545 }
1546
1547 #[inline]
1549 fn _db_size(&self) -> usize {
1550 self.db.len() + self.kv_tree.len() + self.map_tree.len() + self.list_tree.len()
1551 }
1552
1553 #[inline]
1555 async fn cmd_send(&self, cmd: Command) -> Result<()> {
1556 self.active_count.fetch_add(1, Ordering::Relaxed);
1557 if let Err(e) = self.cmd_tx.send(cmd).await {
1558 self.active_count.fetch_sub(1, Ordering::Relaxed);
1559 Err(anyhow!(e))
1560 } else {
1561 Ok(())
1562 }
1563 }
1564
1565 #[inline]
1567 fn _map<N: AsRef<[u8]>>(&self, name: N) -> SledStorageMap {
1568 SledStorageMap::_new(name.as_ref().to_vec(), self.clone())
1569 }
1570
1571 #[inline]
1573 fn _list<V: AsRef<[u8]>>(&self, name: V) -> SledStorageList {
1574 SledStorageList::_new(name.as_ref().to_vec(), self.clone())
1575 }
1576}
1577
1578#[async_trait]
1579impl StorageDB for SledStorageDB {
1580 type MapType = SledStorageMap;
1581 type ListType = SledStorageList;
1582
1583 #[inline]
1585 async fn map<N: AsRef<[u8]> + Sync + Send>(
1586 &self,
1587 name: N,
1588 expire: Option<TimestampMillis>,
1589 ) -> Result<Self::MapType> {
1590 SledStorageMap::new_expire(name.as_ref().to_vec(), expire, self.clone()).await
1591 }
1592
1593 #[inline]
1595 async fn map_remove<K>(&self, name: K) -> Result<()>
1596 where
1597 K: AsRef<[u8]> + Sync + Send,
1598 {
1599 let (tx, rx) = oneshot::channel();
1600 self.cmd_send(Command::DBMapRemove(self.clone(), name.as_ref().into(), tx))
1601 .await?;
1602 rx.await??;
1603 Ok(())
1604 }
1605
1606 #[inline]
1608 async fn map_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
1609 let (tx, rx) = oneshot::channel();
1610 self.cmd_send(Command::DBMapContainsKey(
1611 self.clone(),
1612 key.as_ref().into(),
1613 tx,
1614 ))
1615 .await?;
1616 Ok(rx.await??)
1617 }
1618
1619 #[inline]
1621 async fn list<V: AsRef<[u8]> + Sync + Send>(
1622 &self,
1623 name: V,
1624 expire: Option<TimestampMillis>,
1625 ) -> Result<Self::ListType> {
1626 SledStorageList::new_expire(name.as_ref().to_vec(), expire, self.clone()).await
1627 }
1628
1629 #[inline]
1631 async fn list_remove<K>(&self, name: K) -> Result<()>
1632 where
1633 K: AsRef<[u8]> + Sync + Send,
1634 {
1635 let (tx, rx) = oneshot::channel();
1636 self.cmd_send(Command::DBListRemove(
1637 self.clone(),
1638 name.as_ref().into(),
1639 tx,
1640 ))
1641 .await?;
1642 rx.await??;
1643 Ok(())
1644 }
1645
1646 #[inline]
1648 async fn list_contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
1649 let (tx, rx) = oneshot::channel();
1650 self.cmd_send(Command::DBListContainsKey(
1651 self.clone(),
1652 key.as_ref().into(),
1653 tx,
1654 ))
1655 .await?;
1656 Ok(rx.await??)
1657 }
1658
1659 #[inline]
1661 async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
1662 where
1663 K: AsRef<[u8]> + Sync + Send,
1664 V: serde::ser::Serialize + Sync + Send,
1665 {
1666 let val = bincode::serialize(val)?;
1667 let (tx, rx) = oneshot::channel();
1668 self.cmd_send(Command::DBInsert(
1669 self.clone(),
1670 key.as_ref().to_vec(),
1671 val,
1672 tx,
1673 ))
1674 .await?;
1675 rx.await??;
1676 Ok(())
1677 }
1678
1679 #[inline]
1681 async fn get<K, V>(&self, key: K) -> Result<Option<V>>
1682 where
1683 K: AsRef<[u8]> + Sync + Send,
1684 V: DeserializeOwned + Sync + Send,
1685 {
1686 let (tx, rx) = oneshot::channel();
1687 self.cmd_send(Command::DBGet(self.clone(), key.as_ref().into(), tx))
1688 .await?;
1689 match rx.await?? {
1690 Some(v) => Ok(Some(bincode::deserialize::<V>(v.as_ref())?)),
1691 None => Ok(None),
1692 }
1693 }
1694
1695 #[inline]
1697 async fn remove<K>(&self, key: K) -> Result<()>
1698 where
1699 K: AsRef<[u8]> + Sync + Send,
1700 {
1701 let (tx, rx) = oneshot::channel();
1702 self.cmd_send(Command::DBRemove(self.clone(), key.as_ref().into(), tx))
1703 .await?;
1704 rx.await??;
1705 Ok(())
1706 }
1707
1708 #[inline]
1710 async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
1711 where
1712 V: Serialize + Sync + Send,
1713 {
1714 if key_vals.is_empty() {
1715 return Ok(());
1716 }
1717
1718 let key_vals = key_vals
1719 .into_iter()
1720 .map(|(k, v)| {
1721 bincode::serialize(&v)
1722 .map(|v| (k, v.into()))
1723 .map_err(|e| anyhow!(e))
1724 })
1725 .collect::<Result<Vec<_>>>()?;
1726
1727 let (tx, rx) = oneshot::channel();
1728 self.cmd_send(Command::DBBatchInsert(self.clone(), key_vals, tx))
1729 .await?;
1730 Ok(rx.await??)
1731 }
1732
1733 #[inline]
1735 async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
1736 if keys.is_empty() {
1737 return Ok(());
1738 }
1739
1740 let (tx, rx) = oneshot::channel();
1741 self.cmd_send(Command::DBBatchRemove(self.clone(), keys, tx))
1742 .await?;
1743 Ok(rx.await??)
1744 }
1745
1746 #[inline]
1748 async fn counter_incr<K>(&self, key: K, increment: isize) -> Result<()>
1749 where
1750 K: AsRef<[u8]> + Sync + Send,
1751 {
1752 let (tx, rx) = oneshot::channel();
1753 self.cmd_send(Command::DBCounterIncr(
1754 self.clone(),
1755 key.as_ref().into(),
1756 increment,
1757 tx,
1758 ))
1759 .await?;
1760 Ok(rx.await??)
1761 }
1762
1763 #[inline]
1765 async fn counter_decr<K>(&self, key: K, decrement: isize) -> Result<()>
1766 where
1767 K: AsRef<[u8]> + Sync + Send,
1768 {
1769 let (tx, rx) = oneshot::channel();
1770 self.cmd_send(Command::DBCounterDecr(
1771 self.clone(),
1772 key.as_ref().into(),
1773 decrement,
1774 tx,
1775 ))
1776 .await?;
1777 Ok(rx.await??)
1778 }
1779
1780 #[inline]
1782 async fn counter_get<K>(&self, key: K) -> Result<Option<isize>>
1783 where
1784 K: AsRef<[u8]> + Sync + Send,
1785 {
1786 let (tx, rx) = oneshot::channel();
1787 self.cmd_send(Command::DBCounterGet(self.clone(), key.as_ref().into(), tx))
1788 .await?;
1789 Ok(rx.await??)
1790 }
1791
1792 #[inline]
1794 async fn counter_set<K>(&self, key: K, val: isize) -> Result<()>
1795 where
1796 K: AsRef<[u8]> + Sync + Send,
1797 {
1798 let (tx, rx) = oneshot::channel();
1799 self.cmd_send(Command::DBCounterSet(
1800 self.clone(),
1801 key.as_ref().into(),
1802 val,
1803 tx,
1804 ))
1805 .await?;
1806 Ok(rx.await??)
1807 }
1808
1809 #[inline]
1811 async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
1812 let (tx, rx) = oneshot::channel();
1813 self.cmd_send(Command::DBContainsKey(
1814 self.clone(),
1815 key.as_ref().into(),
1816 tx,
1817 ))
1818 .await?;
1819 Ok(rx.await??)
1820 }
1821
1822 #[inline]
1824 #[cfg(feature = "len")]
1825 async fn len(&self) -> Result<usize> {
1826 let (tx, rx) = oneshot::channel();
1827 self.cmd_send(Command::DBLen(self.clone(), tx)).await?;
1828 Ok(rx.await?)
1829 }
1830
1831 #[inline]
1833 async fn db_size(&self) -> Result<usize> {
1834 let (tx, rx) = oneshot::channel();
1835 self.cmd_send(Command::DBSize(self.clone(), tx)).await?;
1836 Ok(rx.await?)
1837 }
1838
1839 #[inline]
1841 #[cfg(feature = "ttl")]
1842 async fn expire_at<K>(&self, key: K, at: TimestampMillis) -> Result<bool>
1843 where
1844 K: AsRef<[u8]> + Sync + Send,
1845 {
1846 let (tx, rx) = oneshot::channel();
1847 self.cmd_send(Command::DBExpireAt(
1848 self.clone(),
1849 key.as_ref().into(),
1850 at,
1851 tx,
1852 ))
1853 .await?;
1854 Ok(rx.await??)
1855 }
1856
1857 #[inline]
1859 #[cfg(feature = "ttl")]
1860 async fn expire<K>(&self, key: K, dur: TimestampMillis) -> Result<bool>
1861 where
1862 K: AsRef<[u8]> + Sync + Send,
1863 {
1864 let at = timestamp_millis() + dur;
1865 self.expire_at(key, at).await
1866 }
1867
1868 #[inline]
1870 #[cfg(feature = "ttl")]
1871 async fn ttl<K>(&self, key: K) -> Result<Option<TimestampMillis>>
1872 where
1873 K: AsRef<[u8]> + Sync + Send,
1874 {
1875 let (tx, rx) = oneshot::channel();
1876 self.cmd_send(Command::DBTtl(self.clone(), key.as_ref().into(), tx))
1877 .await?;
1878 Ok(rx.await??)
1879 }
1880
1881 #[inline]
1883 async fn map_iter<'a>(
1884 &'a mut self,
1885 ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageMap>> + Send + 'a>> {
1886 let (tx, rx) = oneshot::channel();
1887 self.cmd_send(Command::DBMapPrefixIter(self.clone(), tx))
1888 .await?;
1889 let iter = rx.await?;
1890 let iter = Box::new(AsyncMapIter::new(self, iter));
1891 Ok(iter)
1892 }
1893
1894 #[inline]
1896 async fn list_iter<'a>(
1897 &'a mut self,
1898 ) -> Result<Box<dyn AsyncIterator<Item = Result<StorageList>> + Send + 'a>> {
1899 let (tx, rx) = oneshot::channel();
1900 self.cmd_send(Command::DBListPrefixIter(self.clone(), tx))
1901 .await?;
1902 let iter = rx.await?;
1903 let iter = Box::new(AsyncListIter {
1904 db: self,
1905 iter: Some(iter),
1906 });
1907 Ok(iter)
1908 }
1909
1910 async fn scan<'a, P>(
1912 &'a mut self,
1913 pattern: P,
1914 ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>>
1915 where
1916 P: AsRef<[u8]> + Send + Sync,
1917 {
1918 let pattern = pattern.as_ref();
1919 let (tx, rx) = oneshot::channel();
1920 self.cmd_send(Command::DBScanIter(self.clone(), pattern.to_vec(), tx))
1921 .await?;
1922 let iter = rx.await?;
1923 let pattern = Pattern::from(pattern);
1924 let iter = Box::new(AsyncDbKeyIter {
1925 db: self,
1926 pattern,
1927 iter: Some(iter),
1928 });
1929 Ok(iter)
1930 }
1931
1932 #[inline]
1934 async fn info(&self) -> Result<Value> {
1935 let active_count = self.active_count.load(Ordering::Relaxed);
1936 Ok(spawn_blocking(move || {
1938 serde_json::json!({
1979 "storage_engine": "Sled",
1980 "active_count": active_count,
1981 })
1989 })
1990 .await?)
1991 }
1992}
1993
1994#[derive(Clone)]
1996pub struct SledStorageMap {
1997 name: Key,
1999 map_prefix_name: Key,
2001 map_item_prefix_name: Key,
2003 map_count_key_name: Key,
2005 empty: Arc<AtomicBool>,
2007 pub(crate) db: SledStorageDB,
2009}
2010
2011impl SledStorageMap {
2012 #[inline]
2014 async fn new_expire(
2015 name: Key,
2016 expire_ms: Option<TimestampMillis>,
2017 db: SledStorageDB,
2018 ) -> Result<Self> {
2019 let (tx, rx) = oneshot::channel();
2020 db.cmd_send(Command::DBMapNew(db.clone(), name.into(), expire_ms, tx))
2021 .await?;
2022 rx.await?
2023 }
2024
2025 #[inline]
2027 fn _new_expire(
2028 name: Key,
2029 _expire_ms: Option<TimestampMillis>,
2030 db: SledStorageDB,
2031 ) -> Result<Self> {
2032 let m = Self::_new(name, db);
2033 m.empty.store(m._is_empty()?, Ordering::SeqCst);
2034 #[cfg(feature = "ttl")]
2035 if let Some(expire_ms) = _expire_ms.as_ref() {
2036 m._expire_at(timestamp_millis() + *expire_ms)?;
2037 }
2038 Ok(m)
2039 }
2040
2041 #[inline]
2043 fn _new(name: Key, db: SledStorageDB) -> Self {
2044 let map_prefix_name = SledStorageDB::make_map_prefix_name(name.as_slice());
2045 let map_item_prefix_name = SledStorageDB::make_map_item_prefix_name(name.as_slice());
2046 let map_count_key_name = SledStorageDB::make_map_count_key_name(name.as_slice());
2047 SledStorageMap {
2048 name,
2049 map_prefix_name,
2050 map_item_prefix_name,
2051 map_count_key_name,
2052 empty: Arc::new(AtomicBool::new(true)),
2053 db,
2054 }
2055 }
2056
2057 #[inline]
2059 fn tree(&self) -> &sled::Tree {
2060 &self.db.map_tree
2061 }
2062
2063 #[inline]
2065 fn make_map_item_key<K: AsRef<[u8]>>(&self, key: K) -> Key {
2066 [self.map_item_prefix_name.as_ref(), key.as_ref()].concat()
2067 }
2068
2069 #[cfg(feature = "map_len")]
2071 #[inline]
2072 fn _len_get(&self) -> Result<isize> {
2073 self._counter_get(self.map_count_key_name.as_slice())
2074 }
2075
2076 #[inline]
2078 fn _tx_counter_inc<K: AsRef<[u8]>>(
2079 tx: &TransactionalTree,
2080 key: K,
2081 ) -> ConflictableTransactionResult<()> {
2082 let val = match tx.get(key.as_ref())? {
2083 Some(data) => {
2084 if let Ok(array) = data.as_ref().try_into() {
2085 let number = isize::from_be_bytes(array);
2086 number + 1
2087 } else {
2088 1
2089 }
2090 }
2091 None => 1,
2092 };
2093 tx.insert(key.as_ref(), val.to_be_bytes().as_slice())?;
2094 Ok(())
2095 }
2096
2097 #[inline]
2099 fn _tx_counter_dec<K: AsRef<[u8]>>(
2100 tx: &TransactionalTree,
2101 key: K,
2102 ) -> ConflictableTransactionResult<()> {
2103 let val = match tx.get(key.as_ref())? {
2104 Some(data) => {
2105 if let Ok(array) = data.as_ref().try_into() {
2106 let number = isize::from_be_bytes(array);
2107 number - 1
2108 } else {
2109 -1
2110 }
2111 }
2112 None => -1,
2113 };
2114 if val > 0 {
2115 tx.insert(key.as_ref(), val.to_be_bytes().as_slice())?;
2116 } else {
2117 tx.remove(key.as_ref())?;
2118 }
2119 Ok(())
2120 }
2121
2122 #[inline]
2124 fn _tx_counter_get<K: AsRef<[u8]>, E>(
2125 tx: &TransactionalTree,
2126 key: K,
2127 ) -> ConflictableTransactionResult<isize, E> {
2128 if let Some(v) = tx.get(key)? {
2129 let c = match v.as_ref().try_into() {
2130 Ok(c) => c,
2131 Err(e) => {
2132 return Err(ConflictableTransactionError::Storage(sled::Error::Io(
2133 io::Error::new(ErrorKind::InvalidData, e),
2134 )))
2135 }
2136 };
2137 Ok(isize::from_be_bytes(c))
2138 } else {
2139 Ok(0)
2140 }
2141 }
2142
2143 #[inline]
2145 fn _tx_counter_set<K: AsRef<[u8]>, E>(
2146 tx: &TransactionalTree,
2147 key: K,
2148 val: isize,
2149 ) -> ConflictableTransactionResult<(), E> {
2150 tx.insert(key.as_ref(), val.to_be_bytes().as_slice())?;
2151 Ok(())
2152 }
2153
2154 #[inline]
2156 fn _tx_counter_remove<K: AsRef<[u8]>, E>(
2157 tx: &TransactionalTree,
2158 key: K,
2159 ) -> ConflictableTransactionResult<(), E> {
2160 tx.remove(key.as_ref())?;
2161 Ok(())
2162 }
2163
2164 #[inline]
2166 fn _counter_get<K: AsRef<[u8]>>(&self, key: K) -> Result<isize> {
2167 if let Some(v) = self.tree().get(key)? {
2168 Ok(isize::from_be_bytes(v.as_ref().try_into()?))
2169 } else {
2170 Ok(0)
2171 }
2172 }
2173
2174 #[inline]
2176 fn _counter_init(&self) -> Result<()> {
2177 let tree = self.tree();
2178 if !tree.contains_key(self.map_count_key_name.as_slice())? {
2179 tree.insert(
2180 self.map_count_key_name.as_slice(),
2181 0isize.to_be_bytes().as_slice(),
2182 )?;
2183 }
2184 Ok(())
2185 }
2186
2187 #[inline]
2189 fn _clear(&self) -> Result<()> {
2190 let batch = self._make_clear_batch();
2191 self.tree()
2192 .transaction(|tx| self._tx_clear(tx, &batch))
2193 .map_err(|e| anyhow!(format!("{:?}", e)))?;
2194 Ok(())
2195 }
2196
2197 #[inline]
2199 fn _tx_clear(
2200 &self,
2201 map_tree_tx: &TransactionalTree,
2202 batch: &Batch,
2203 ) -> ConflictableTransactionResult<()> {
2204 map_tree_tx.apply_batch(batch)?;
2205 self.empty.store(true, Ordering::SeqCst);
2206 Ok(())
2207 }
2208
2209 #[inline]
2211 fn _make_clear_batch(&self) -> Batch {
2212 let mut batch = Batch::default();
2213 for item in self.tree().scan_prefix(self.map_prefix_name.as_slice()) {
2215 match item {
2216 Ok((key, _)) => {
2217 batch.remove(key);
2218 }
2219 Err(e) => {
2220 log::warn!("{:?}", e);
2221 }
2222 }
2223 }
2224 batch
2225 }
2226
2227 #[inline]
2229 fn _insert(&self, key: IVec, val: IVec) -> Result<()> {
2230 let item_key = self.make_map_item_key(key.as_ref());
2231 let this = self;
2232 #[cfg(feature = "map_len")]
2233 {
2234 let count_key = this.map_count_key_name.as_slice();
2235 this.tree()
2236 .transaction(move |tx| {
2237 if tx.insert(item_key.as_slice(), val.as_ref())?.is_none() {
2238 Self::_tx_counter_inc(tx, count_key)?;
2239 }
2240 Ok(())
2241 })
2242 .map_err(|e| anyhow!(format!("{:?}", e)))?;
2243 }
2244 #[cfg(not(feature = "map_len"))]
2245 {
2246 if self.empty.load(Ordering::SeqCst) {
2247 self._counter_init()?;
2248 self.empty.store(false, Ordering::SeqCst)
2249 }
2250 this.tree().insert(item_key.as_slice(), val.as_ref())?;
2251 }
2252
2253 #[cfg(feature = "ttl")]
2254 {
2255 if this.db._is_expired(this.name.as_slice(), |k| {
2256 SledStorageDB::_map_contains_key(this.tree(), k)
2257 })? {
2258 (&self.db.key_expire_tree, &self.db.expire_key_tree)
2260 .transaction(|(key_expire_tx, expire_key_tx)| {
2261 SledStorageDB::_tx_remove_expire_key(
2262 key_expire_tx,
2263 expire_key_tx,
2264 this.name.as_slice(),
2265 )?;
2266 Ok::<(), ConflictableTransactionError<()>>(())
2267 })
2268 .map_err(|e| anyhow!(format!("{:?}", e)))?;
2269 }
2270 }
2271
2272 Ok(())
2273 }
2274
2275 #[inline]
2277 fn _get(&self, key: IVec) -> Result<Option<IVec>> {
2278 let this = self;
2279 let item_key = self.make_map_item_key(key.as_ref());
2280 let res = if !this.db._is_expired(this.name.as_slice(), |k| {
2281 SledStorageDB::_map_contains_key(this.tree(), k)
2282 })? {
2283 this.tree().get(item_key).map_err(|e| anyhow!(e))?
2284 } else {
2285 None
2286 };
2287 Ok(res)
2288 }
2289
2290 #[inline]
2292 fn _remove(&self, key: IVec) -> Result<()> {
2293 let tree = self.tree();
2294 let key = self.make_map_item_key(key.as_ref());
2295
2296 #[cfg(feature = "map_len")]
2297 {
2298 let count_key = self.map_count_key_name.to_vec();
2299 tree.transaction(move |tx| {
2300 if tx.remove(key.as_slice())?.is_some() {
2301 Self::_tx_counter_dec(tx, count_key.as_slice())?;
2302 }
2303 Ok(())
2304 })
2305 .map_err(|e| anyhow!(format!("{:?}", e)))?;
2306 }
2307
2308 #[cfg(not(feature = "map_len"))]
2309 {
2310 tree.remove(key.as_slice())?;
2311 }
2312
2313 Ok(())
2314 }
2315
2316 #[inline]
2318 fn _contains_key(&self, key: IVec) -> Result<bool> {
2319 let key = self.make_map_item_key(key.as_ref());
2320 Ok(self.tree().contains_key(key)?)
2321 }
2322
2323 #[cfg(feature = "map_len")]
2325 #[inline]
2326 fn _len(&self) -> Result<usize> {
2327 let this = self;
2328 let len = {
2329 if this.db._is_expired(this.name.as_slice(), |k| {
2330 SledStorageDB::_map_contains_key(this.tree(), k)
2331 })? {
2332 Ok(0)
2333 } else {
2334 this._len_get()
2335 }
2336 }?;
2337 Ok(len as usize)
2338 }
2339
2340 #[inline]
2342 fn _is_empty(&self) -> Result<bool> {
2343 let this = self;
2344 let res = {
2345 if this.db._is_expired(this.name.as_slice(), |k| {
2346 SledStorageDB::_map_contains_key(this.tree(), k)
2347 })? {
2348 true
2349 } else {
2350 self.tree()
2351 .scan_prefix(self.map_item_prefix_name.as_slice())
2352 .next()
2353 .is_none()
2354 }
2355 };
2356 Ok(res)
2357 }
2358
2359 #[inline]
2361 fn _remove_and_fetch(&self, key: IVec) -> Result<Option<IVec>> {
2362 let key = self.make_map_item_key(key.as_ref());
2363 let this = self;
2364 let removed = {
2365 if this.db._is_expired(this.name.as_slice(), |k| {
2366 SledStorageDB::_map_contains_key(this.tree(), k)
2367 })? {
2368 Ok(None)
2369 } else {
2370 #[cfg(feature = "map_len")]
2371 {
2372 let count_key = this.map_count_key_name.to_vec();
2373 this.tree().transaction(move |tx| {
2374 if let Some(removed) = tx.remove(key.as_slice())? {
2375 Self::_tx_counter_dec(tx, count_key.as_slice())?;
2376 Ok(Some(removed))
2377 } else {
2378 Ok(None)
2379 }
2380 })
2381 }
2382 #[cfg(not(feature = "map_len"))]
2383 {
2384 let removed = this.tree().remove(key.as_slice())?;
2385 Ok::<_, TransactionError<()>>(removed)
2386 }
2387 }
2388 }
2389 .map_err(|e| anyhow!(format!("{:?}", e)))?;
2390
2391 Ok(removed)
2392 }
2393
2394 #[inline]
2396 fn _remove_with_prefix(&self, prefix: IVec) -> Result<()> {
2397 let tree = self.tree();
2398 let prefix = [self.map_item_prefix_name.as_slice(), prefix.as_ref()]
2399 .concat()
2400 .to_vec();
2401
2402 #[cfg(feature = "map_len")]
2403 let map_count_key_name = self.map_count_key_name.to_vec();
2404 {
2405 let mut removeds = Batch::default();
2406 #[cfg(feature = "map_len")]
2407 let mut c = 0;
2408 for item in tree.scan_prefix(prefix) {
2409 match item {
2410 Ok((k, _v)) => {
2411 removeds.remove(k.as_ref());
2412 #[cfg(feature = "map_len")]
2413 {
2414 c += 1;
2415 }
2416 }
2417 Err(e) => {
2418 log::warn!("{:?}", e);
2419 }
2420 }
2421 }
2422
2423 #[cfg(feature = "map_len")]
2424 {
2425 tree.transaction(move |tx| {
2426 let len = Self::_tx_counter_get(tx, map_count_key_name.as_slice())? - c;
2427 if len > 0 {
2428 Self::_tx_counter_set(tx, map_count_key_name.as_slice(), len)?;
2429 } else {
2430 Self::_tx_counter_remove(tx, map_count_key_name.as_slice())?;
2431 };
2432 tx.apply_batch(&removeds)?;
2433 Ok::<(), ConflictableTransactionError<sled::Error>>(())
2434 })
2435 }
2436 #[cfg(not(feature = "map_len"))]
2437 {
2438 tree.apply_batch(removeds)?;
2439 Ok::<(), ConflictableTransactionError<sled::Error>>(())
2440 }
2441 }?;
2442 Ok(())
2443 }
2444
2445 #[inline]
2447 fn _batch_insert(&self, key_vals: Vec<(IVec, IVec)>) -> Result<()> {
2448 for (k, v) in key_vals {
2449 self._insert(k, v)?;
2450 }
2451 Ok(())
2452 }
2453
2454 #[inline]
2456 fn _batch_remove(&self, keys: Vec<IVec>) -> Result<()> {
2457 for k in keys {
2458 self._remove(k)?;
2459 }
2460 Ok(())
2461 }
2462
2463 #[cfg(feature = "ttl")]
2465 #[inline]
2466 fn _expire_at(&self, at: TimestampMillis) -> Result<bool> {
2467 self.db._expire_at(self.name.as_slice(), at, KeyType::Map)
2468 }
2469
2470 #[cfg(feature = "ttl")]
2472 #[inline]
2473 fn _ttl(&self) -> Result<Option<TimestampMillis>> {
2474 let res = self
2475 .db
2476 ._ttl(self.name(), |k| {
2477 SledStorageDB::_map_contains_key(self.tree(), k)
2478 })?
2479 .and_then(|(at, _)| if at > 0 { Some(at) } else { None });
2480 Ok(res)
2481 }
2482
2483 #[inline]
2485 fn _is_expired(&self) -> Result<bool> {
2486 self.db._is_expired(self.name.as_slice(), |k| {
2487 SledStorageDB::_map_contains_key(self.tree(), k)
2488 })
2489 }
2490
2491 #[inline]
2493 async fn call_is_expired(&self) -> Result<bool> {
2494 let (tx, rx) = oneshot::channel();
2495 self.db
2496 .cmd_send(Command::MapIsExpired(self.clone(), tx))
2497 .await?;
2498 rx.await?
2499 }
2500
2501 #[inline]
2503 fn _prefix_iter(&self, prefix: Option<IVec>) -> sled::Iter {
2504 if let Some(prefix) = prefix {
2505 self.tree()
2506 .scan_prefix([self.map_item_prefix_name.as_slice(), prefix.as_ref()].concat())
2507 } else {
2508 self.tree()
2509 .scan_prefix(self.map_item_prefix_name.as_slice())
2510 }
2511 }
2512
2513 #[inline]
2515 async fn call_prefix_iter(&self, prefix: Option<IVec>) -> Result<sled::Iter> {
2516 let (tx, rx) = oneshot::channel();
2517 self.db
2518 .cmd_send(Command::MapPrefixIter(self.clone(), prefix, tx))
2519 .await?;
2520 Ok(rx.await?)
2521 }
2522}
2523
2524#[async_trait]
2525impl Map for SledStorageMap {
2526 #[inline]
2528 fn name(&self) -> &[u8] {
2529 self.name.as_slice()
2530 }
2531
2532 #[inline]
2534 async fn insert<K, V>(&self, key: K, val: &V) -> Result<()>
2535 where
2536 K: AsRef<[u8]> + Sync + Send,
2537 V: Serialize + Sync + Send + ?Sized,
2538 {
2539 let val = bincode::serialize(val)?;
2540 let (tx, rx) = oneshot::channel();
2541 self.db
2542 .cmd_send(Command::MapInsert(
2543 self.clone(),
2544 key.as_ref().into(),
2545 val.into(),
2546 tx,
2547 ))
2548 .await?;
2549 rx.await??;
2550 Ok(())
2551 }
2552
2553 #[inline]
2555 async fn get<K, V>(&self, key: K) -> Result<Option<V>>
2556 where
2557 K: AsRef<[u8]> + Sync + Send,
2558 V: DeserializeOwned + Sync + Send,
2559 {
2560 let (tx, rx) = oneshot::channel();
2561 self.db
2562 .cmd_send(Command::MapGet(self.clone(), key.as_ref().into(), tx))
2563 .await?;
2564
2565 match rx.await?? {
2566 Some(v) => Ok(Some(bincode::deserialize::<V>(v.as_ref())?)),
2567 None => Ok(None),
2568 }
2569 }
2570
2571 #[inline]
2573 async fn remove<K>(&self, key: K) -> Result<()>
2574 where
2575 K: AsRef<[u8]> + Sync + Send,
2576 {
2577 let (tx, rx) = oneshot::channel();
2578 self.db
2579 .cmd_send(Command::MapRemove(self.clone(), key.as_ref().into(), tx))
2580 .await?;
2581 rx.await??;
2582 Ok(())
2583 }
2584
2585 #[inline]
2587 async fn contains_key<K: AsRef<[u8]> + Sync + Send>(&self, key: K) -> Result<bool> {
2588 let (tx, rx) = oneshot::channel();
2589 self.db
2590 .cmd_send(Command::MapContainsKey(
2591 self.clone(),
2592 key.as_ref().into(),
2593 tx,
2594 ))
2595 .await?;
2596 Ok(rx.await??)
2597 }
2598
2599 #[cfg(feature = "map_len")]
2601 #[inline]
2602 async fn len(&self) -> Result<usize> {
2603 let (tx, rx) = oneshot::channel();
2604 self.db.cmd_send(Command::MapLen(self.clone(), tx)).await?;
2605 Ok(rx.await??)
2606 }
2607
2608 #[inline]
2610 async fn is_empty(&self) -> Result<bool> {
2611 let (tx, rx) = oneshot::channel();
2612 self.db
2613 .cmd_send(Command::MapIsEmpty(self.clone(), tx))
2614 .await?;
2615 Ok(rx.await??)
2616 }
2617
2618 #[inline]
2620 async fn clear(&self) -> Result<()> {
2621 let (tx, rx) = oneshot::channel();
2622 self.db
2623 .cmd_send(Command::MapClear(self.clone(), tx))
2624 .await?;
2625 rx.await??;
2626 Ok(())
2627 }
2628
2629 #[inline]
2631 async fn remove_and_fetch<K, V>(&self, key: K) -> Result<Option<V>>
2632 where
2633 K: AsRef<[u8]> + Sync + Send,
2634 V: DeserializeOwned + Sync + Send,
2635 {
2636 let (tx, rx) = oneshot::channel();
2637 self.db
2638 .cmd_send(Command::MapRemoveAndFetch(
2639 self.clone(),
2640 key.as_ref().into(),
2641 tx,
2642 ))
2643 .await?;
2644
2645 match rx.await?? {
2646 Some(v) => Ok(Some(bincode::deserialize::<V>(v.as_ref())?)),
2647 None => Ok(None),
2648 }
2649 }
2650
2651 #[inline]
2653 async fn remove_with_prefix<K>(&self, prefix: K) -> Result<()>
2654 where
2655 K: AsRef<[u8]> + Sync + Send,
2656 {
2657 let (tx, rx) = oneshot::channel();
2658 self.db
2659 .cmd_send(Command::MapRemoveWithPrefix(
2660 self.clone(),
2661 prefix.as_ref().into(),
2662 tx,
2663 ))
2664 .await?;
2665 rx.await??;
2666 Ok(())
2667 }
2668
2669 #[inline]
2671 async fn batch_insert<V>(&self, key_vals: Vec<(Key, V)>) -> Result<()>
2672 where
2673 V: serde::ser::Serialize + Sync + Send,
2674 {
2675 let key_vals = key_vals
2676 .into_iter()
2677 .map(|(k, v)| {
2678 bincode::serialize(&v)
2679 .map(|v| (k.into(), v.into()))
2680 .map_err(|e| anyhow!(e))
2681 })
2682 .collect::<Result<Vec<(IVec, IVec)>>>()?;
2683
2684 let (tx, rx) = oneshot::channel();
2685 self.db
2686 .cmd_send(Command::MapBatchInsert(self.clone(), key_vals, tx))
2687 .await?;
2688 rx.await??;
2689 Ok(())
2690 }
2691
2692 #[inline]
2694 async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
2695 let keys = keys.into_iter().map(|k| k.into()).collect::<Vec<IVec>>();
2696
2697 let (tx, rx) = oneshot::channel();
2698 self.db
2699 .cmd_send(Command::MapBatchRemove(self.clone(), keys, tx))
2700 .await?;
2701 rx.await??;
2702 Ok(())
2703 }
2704
2705 #[inline]
2707 async fn iter<'a, V>(
2708 &'a mut self,
2709 ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
2710 where
2711 V: DeserializeOwned + Sync + Send + 'a + 'static,
2712 {
2713 let this = self;
2714 let res = {
2715 if this.call_is_expired().await? {
2716 let iter: Box<dyn AsyncIterator<Item = IterItem<V>> + Send> =
2717 Box::new(AsyncEmptyIter {
2718 _m: std::marker::PhantomData,
2719 });
2720 Ok::<_, anyhow::Error>(iter)
2721 } else {
2722 let tem_prefix_name = this.map_item_prefix_name.len();
2723 let iter = this.call_prefix_iter(None).await?;
2724 let iter: Box<dyn AsyncIterator<Item = IterItem<V>> + Send> = Box::new(AsyncIter {
2725 db: &this.db,
2726 prefix_len: tem_prefix_name,
2727 iter: Some(iter),
2728 _m: std::marker::PhantomData,
2729 });
2730 Ok::<_, anyhow::Error>(iter)
2731 }
2732 }?;
2733 Ok(res)
2734 }
2735
2736 #[inline]
2738 async fn key_iter<'a>(
2739 &'a mut self,
2740 ) -> Result<Box<dyn AsyncIterator<Item = Result<Key>> + Send + 'a>> {
2741 let this = self;
2742 let res = {
2743 if this.call_is_expired().await? {
2744 let iter: Box<dyn AsyncIterator<Item = Result<Key>> + Send> =
2745 Box::new(AsyncEmptyIter {
2746 _m: std::marker::PhantomData,
2747 });
2748 Ok::<_, anyhow::Error>(iter)
2749 } else {
2750 let iter = this.call_prefix_iter(None).await?;
2751 let iter: Box<dyn AsyncIterator<Item = Result<Key>> + Send> =
2752 Box::new(AsyncKeyIter {
2753 db: &this.db,
2754 prefix_len: this.map_item_prefix_name.len(),
2755 iter: Some(iter),
2756 });
2757 Ok::<_, anyhow::Error>(iter)
2758 }
2759 }?;
2760 Ok(res)
2761 }
2762
2763 #[inline]
2765 async fn prefix_iter<'a, P, V>(
2766 &'a mut self,
2767 prefix: P,
2768 ) -> Result<Box<dyn AsyncIterator<Item = IterItem<V>> + Send + 'a>>
2769 where
2770 P: AsRef<[u8]> + Send + Sync,
2771 V: DeserializeOwned + Sync + Send + 'a + 'static,
2772 {
2773 let this = self;
2774 let res = {
2775 if this.call_is_expired().await? {
2776 let iter: Box<dyn AsyncIterator<Item = IterItem<V>> + Send> =
2777 Box::new(AsyncEmptyIter {
2778 _m: std::marker::PhantomData,
2779 });
2780 Ok::<_, anyhow::Error>(iter)
2781 } else {
2782 let iter = this
2783 .call_prefix_iter(Some(IVec::from(prefix.as_ref())))
2784 .await?;
2785 let iter: Box<dyn AsyncIterator<Item = IterItem<V>> + Send> = Box::new(AsyncIter {
2786 db: &this.db,
2787 prefix_len: this.map_item_prefix_name.len(),
2788 iter: Some(iter),
2789 _m: std::marker::PhantomData,
2790 });
2791 Ok::<_, anyhow::Error>(iter)
2792 }
2793 }?;
2794 Ok(res)
2795 }
2796
2797 #[cfg(feature = "ttl")]
2799 async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
2800 let (tx, rx) = oneshot::channel();
2801 self.db
2802 .cmd_send(Command::MapExpireAt(self.clone(), at, tx))
2803 .await?;
2804 Ok(rx.await??)
2805 }
2806
2807 #[cfg(feature = "ttl")]
2809 async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
2810 let at = timestamp_millis() + dur;
2811 self.expire_at(at).await
2812 }
2813
2814 #[cfg(feature = "ttl")]
2816 async fn ttl(&self) -> Result<Option<TimestampMillis>> {
2817 let (tx, rx) = oneshot::channel();
2818 self.db.cmd_send(Command::MapTTL(self.clone(), tx)).await?;
2819 Ok(rx.await??)
2820 }
2821}
2822
2823#[derive(Clone)]
2825pub struct SledStorageList {
2826 name: Key,
2828 prefix_name: Key,
2830 pub(crate) db: SledStorageDB,
2832}
2833
2834impl SledStorageList {
2835 #[inline]
2837 async fn new_expire(
2838 name: Key,
2839 expire_ms: Option<TimestampMillis>,
2840 db: SledStorageDB,
2841 ) -> Result<Self> {
2842 let (tx, rx) = oneshot::channel();
2843 db.cmd_send(Command::DBListNew(db.clone(), name.into(), expire_ms, tx))
2844 .await?;
2845 rx.await?
2846 }
2847
2848 #[inline]
2850 fn _new_expire(
2851 name: Key,
2852 _expire_ms: Option<TimestampMillis>,
2853 db: SledStorageDB,
2854 ) -> Result<Self> {
2855 let l = Self::_new(name, db);
2856 #[cfg(feature = "ttl")]
2857 if let Some(expire_ms) = _expire_ms {
2858 l._expire_at(timestamp_millis() + expire_ms)?;
2859 }
2860 Ok(l)
2861 }
2862
2863 #[inline]
2865 fn _new(name: Key, db: SledStorageDB) -> Self {
2866 let prefix_name = SledStorageDB::make_list_prefix(name.as_slice());
2867 SledStorageList {
2868 name,
2869 prefix_name,
2870 db,
2871 }
2872 }
2873
2874 #[inline]
2876 pub(crate) fn name(&self) -> &[u8] {
2877 self.name.as_slice()
2878 }
2879
2880 #[inline]
2882 pub(crate) fn tree(&self) -> &sled::Tree {
2883 &self.db.list_tree
2884 }
2885
2886 #[inline]
2888 fn make_list_count_key(&self) -> Vec<u8> {
2889 let list_count_key = [self.prefix_name.as_ref(), LIST_KEY_COUNT_SUFFIX].concat();
2890 list_count_key
2891 }
2892
2893 #[inline]
2895 fn make_list_content_prefix(prefix_name: &[u8], idx: Option<&[u8]>) -> Vec<u8> {
2896 if let Some(idx) = idx {
2897 [prefix_name, LIST_KEY_CONTENT_SUFFIX, idx].concat()
2898 } else {
2899 [prefix_name, LIST_KEY_CONTENT_SUFFIX].concat()
2900 }
2901 }
2902
2903 #[inline]
2905 fn make_list_content_key(&self, idx: usize) -> Vec<u8> {
2906 Self::make_list_content_prefix(
2907 self.prefix_name.as_ref(),
2908 Some(idx.to_be_bytes().as_slice()),
2909 )
2910 }
2911
2912 #[inline]
2914 fn make_list_content_keys(&self, start: usize, end: usize) -> Vec<Vec<u8>> {
2915 (start..end)
2916 .map(|idx| self.make_list_content_key(idx))
2917 .collect()
2918 }
2919
2920 #[inline]
2922 fn tx_list_count_get<K, E>(
2923 tx: &TransactionalTree,
2924 list_count_key: K,
2925 ) -> ConflictableTransactionResult<(usize, usize), E>
2926 where
2927 K: AsRef<[u8]>,
2928 {
2929 if let Some(v) = tx.get(list_count_key.as_ref())? {
2930 let (start, end) = bincode::deserialize::<(usize, usize)>(v.as_ref()).map_err(|e| {
2931 ConflictableTransactionError::Storage(sled::Error::Io(io::Error::new(
2932 ErrorKind::InvalidData,
2933 e,
2934 )))
2935 })?;
2936 Ok((start, end))
2937 } else {
2938 Ok((0, 0))
2939 }
2940 }
2941
2942 #[inline]
2944 fn tx_list_count_set<K, E>(
2945 tx: &TransactionalTree,
2946 key_count: K,
2947 start: usize,
2948 end: usize,
2949 ) -> ConflictableTransactionResult<(), E>
2950 where
2951 K: AsRef<[u8]>,
2952 {
2953 let count_bytes = bincode::serialize(&(start, end)).map_err(|e| {
2954 ConflictableTransactionError::Storage(sled::Error::Io(io::Error::new(
2955 ErrorKind::InvalidData,
2956 e,
2957 )))
2958 })?;
2959 tx.insert(key_count.as_ref(), count_bytes.as_slice())?;
2960 Ok(())
2961 }
2962
2963 #[inline]
2965 fn tx_list_content_set<K, V, E>(
2966 tx: &TransactionalTree,
2967 key_content: K,
2968 data: V,
2969 ) -> ConflictableTransactionResult<(), E>
2970 where
2971 K: AsRef<[u8]>,
2972 V: AsRef<[u8]>,
2973 {
2974 tx.insert(key_content.as_ref(), data.as_ref())?;
2975 Ok(())
2976 }
2977
2978 #[inline]
2980 fn tx_list_content_batch_set<K, V, E>(
2981 tx: &TransactionalTree,
2982 key_contents: Vec<(K, V)>,
2983 ) -> ConflictableTransactionResult<(), E>
2984 where
2985 K: AsRef<[u8]>,
2986 V: AsRef<[u8]>,
2987 {
2988 let mut batch = Batch::default();
2989 for (k, v) in key_contents {
2990 batch.insert(k.as_ref(), v.as_ref());
2991 }
2992 tx.apply_batch(&batch)?;
2993 Ok(())
2994 }
2995
2996 #[inline]
2998 fn _clear(&self) -> Result<()> {
2999 let mut batch = Batch::default();
3000 let list_count_key = self.make_list_count_key();
3001 batch.remove(list_count_key);
3002 let list_content_prefix = Self::make_list_content_prefix(self.prefix_name.as_slice(), None);
3003 for item in self.tree().scan_prefix(list_content_prefix).keys() {
3004 match item {
3005 Ok(k) => {
3006 batch.remove(k);
3007 }
3008 Err(e) => {
3009 log::warn!("{:?}", e);
3010 }
3011 }
3012 }
3013 self.tree()
3014 .transaction(|tx| {
3015 tx.apply_batch(&batch)?;
3016 Ok::<_, ConflictableTransactionError<()>>(())
3017 })
3018 .map_err(|e| anyhow!(format!("{:?}", e)))?;
3019 Ok(())
3020 }
3021
3022 #[inline]
3024 fn _tx_clear(
3025 list_tree_tx: &TransactionalTree,
3026 batch: &Batch,
3027 ) -> ConflictableTransactionResult<()> {
3028 list_tree_tx.apply_batch(batch)?;
3029 Ok(())
3030 }
3031
3032 #[inline]
3034 fn _make_clear_batch(&self) -> Batch {
3035 let mut batch = Batch::default();
3036 let list_count_key = self.make_list_count_key();
3037 batch.remove(list_count_key);
3038 let list_content_prefix = Self::make_list_content_prefix(self.prefix_name.as_slice(), None);
3039 for item in self.tree().scan_prefix(list_content_prefix).keys() {
3040 match item {
3041 Ok(k) => {
3042 batch.remove(k);
3043 }
3044 Err(e) => {
3045 log::warn!("{:?}", e);
3046 }
3047 }
3048 }
3049 batch
3050 }
3051
3052 #[inline]
3054 fn _push(&self, data: IVec) -> Result<()> {
3055 let this = self;
3056 this.tree().transaction(move |tx| {
3057 let list_count_key = this.make_list_count_key();
3058 let (start, mut end) = Self::tx_list_count_get::<
3059 _,
3060 ConflictableTransactionError<sled::Error>,
3061 >(tx, list_count_key.as_slice())?;
3062 end += 1;
3063 Self::tx_list_count_set(tx, list_count_key.as_slice(), start, end)?;
3064
3065 let list_content_key = this.make_list_content_key(end);
3066 Self::tx_list_content_set(tx, list_content_key.as_slice(), data.as_ref())?;
3067 Ok(())
3068 })?;
3069
3070 #[cfg(feature = "ttl")]
3071 {
3072 if this.db._is_expired(this.name.as_slice(), |k| {
3073 SledStorageDB::_list_contains_key(this.tree(), k)
3074 })? {
3075 (&self.db.key_expire_tree, &self.db.expire_key_tree)
3077 .transaction(|(key_expire_tx, expire_key_tx)| {
3078 SledStorageDB::_tx_remove_expire_key(
3079 key_expire_tx,
3080 expire_key_tx,
3081 this.name.as_slice(),
3082 )?;
3083 Ok::<(), ConflictableTransactionError<()>>(())
3084 })
3085 .map_err(|e| anyhow!(format!("{:?}", e)))?;
3086 }
3087 }
3088
3089 Ok(())
3090 }
3091
3092 #[inline]
3094 fn _pushs(&self, vals: Vec<IVec>) -> Result<()> {
3095 if vals.is_empty() {
3096 return Ok(());
3097 }
3098 let tree = self.tree();
3099 let this = self;
3100
3101 tree.transaction(move |tx| {
3102 let list_count_key = this.make_list_count_key();
3103 let (start, mut end) = Self::tx_list_count_get::<
3104 _,
3105 ConflictableTransactionError<sled::Error>,
3106 >(tx, list_count_key.as_slice())?;
3107
3108 let mut list_content_keys = this.make_list_content_keys(end + 1, end + vals.len() + 1);
3109 end += vals.len();
3111 Self::tx_list_count_set(tx, list_count_key.as_slice(), start, end)?;
3112
3113 let list_contents = vals
3114 .iter()
3115 .map(|val| (list_content_keys.remove(0), val))
3116 .collect::<Vec<_>>();
3117 Self::tx_list_content_batch_set(tx, list_contents)?;
3118 Ok(())
3119 })?;
3120
3121 #[cfg(feature = "ttl")]
3122 {
3123 if this.db._is_expired(this.name.as_slice(), |k| {
3124 SledStorageDB::_list_contains_key(this.tree(), k)
3125 })? {
3126 (&self.db.key_expire_tree, &self.db.expire_key_tree)
3128 .transaction(|(key_expire_tx, expire_key_tx)| {
3129 SledStorageDB::_tx_remove_expire_key(
3130 key_expire_tx,
3131 expire_key_tx,
3132 this.name.as_slice(),
3133 )?;
3134 Ok::<(), ConflictableTransactionError<()>>(())
3135 })
3136 .map_err(|e| anyhow!(format!("{:?}", e)))?;
3137 }
3138 }
3139 Ok(())
3140 }
3141
3142 #[inline]
3144 fn _push_limit(
3145 &self,
3146 data: IVec,
3147 limit: usize,
3148 pop_front_if_limited: bool,
3149 ) -> Result<Option<IVec>> {
3150 let tree = self.tree();
3151 let this = self;
3152 let removed = {
3153 let res = tree.transaction(move |tx| {
3154 let list_count_key = this.make_list_count_key();
3155 let (mut start, mut end) = Self::tx_list_count_get::<
3156 _,
3157 ConflictableTransactionError<sled::Error>,
3158 >(tx, list_count_key.as_slice())?;
3159 let count = end - start;
3160
3161 if count < limit {
3162 end += 1;
3163 Self::tx_list_count_set(tx, list_count_key.as_slice(), start, end)?;
3164 let list_content_key = this.make_list_content_key(end);
3165 Self::tx_list_content_set(tx, list_content_key.as_slice(), data.as_ref())?;
3166 Ok(None)
3167 } else if pop_front_if_limited {
3168 let mut removed = None;
3169 let removed_content_key = this.make_list_content_key(start + 1);
3170 if let Some(v) = tx.remove(removed_content_key)? {
3171 removed = Some(v);
3172 start += 1;
3173 }
3174 end += 1;
3175 Self::tx_list_count_set(tx, list_count_key.as_slice(), start, end)?;
3176 let list_content_key = this.make_list_content_key(end);
3177 Self::tx_list_content_set(tx, list_content_key.as_slice(), data.as_ref())?;
3178 Ok(removed)
3179 } else {
3180 Err(ConflictableTransactionError::Storage(sled::Error::Io(
3181 io::Error::new(ErrorKind::InvalidData, "Is full"),
3182 )))
3183 }
3184 });
3185
3186 #[cfg(feature = "ttl")]
3187 {
3188 if this.db._is_expired(this.name.as_slice(), |k| {
3189 SledStorageDB::_list_contains_key(this.tree(), k)
3190 })? {
3191 (&self.db.key_expire_tree, &self.db.expire_key_tree)
3193 .transaction(|(key_expire_tx, expire_key_tx)| {
3194 SledStorageDB::_tx_remove_expire_key(
3195 key_expire_tx,
3196 expire_key_tx,
3197 this.name.as_slice(),
3198 )?;
3199 Ok::<(), ConflictableTransactionError<()>>(())
3200 })
3201 .map_err(|e| anyhow!(format!("{:?}", e)))?;
3202 }
3203 }
3204
3205 Ok::<_, TransactionError<()>>(res)
3206 }
3207 .map_err(|e| anyhow!(format!("{:?}", e)))??;
3208
3209 Ok(removed)
3210 }
3211
3212 #[inline]
3214 fn _pop(&self) -> Result<Option<IVec>> {
3215 let this = self;
3216 let removed = {
3217 if this.db._is_expired(this.name.as_slice(), |k| {
3218 SledStorageDB::_list_contains_key(this.tree(), k)
3219 })? {
3220 Ok(None)
3221 } else {
3222 let removed = this.tree().transaction(move |tx| {
3223 let list_count_key = this.make_list_count_key();
3224 let (start, end) = Self::tx_list_count_get(tx, list_count_key.as_slice())?;
3225
3226 let mut removed = None;
3227 if (end - start) > 0 {
3228 let removed_content_key = this.make_list_content_key(start + 1);
3229 if let Some(v) = tx.remove(removed_content_key)? {
3230 removed = Some(v);
3231 Self::tx_list_count_set(tx, list_count_key.as_slice(), start + 1, end)?;
3232 }
3233 }
3234 Ok::<_, ConflictableTransactionError<sled::Error>>(removed)
3235 });
3236 removed
3237 }
3238 }?;
3239
3240 Ok(removed)
3241 }
3242
3243 #[inline]
3245 fn _all(&self) -> Result<Vec<IVec>> {
3246 let this = self;
3247 let res = {
3248 if this.db._is_expired(this.name.as_slice(), |k| {
3249 SledStorageDB::_list_contains_key(this.tree(), k)
3250 })? {
3251 Ok(vec![])
3252 } else {
3253 let key_content_prefix =
3254 Self::make_list_content_prefix(this.prefix_name.as_slice(), None);
3255 this.tree()
3256 .scan_prefix(key_content_prefix)
3257 .values()
3258 .map(|item| item.map_err(anyhow::Error::new))
3259 .collect::<Result<Vec<_>>>()
3260 }
3261 }?;
3262 Ok(res)
3263 }
3264
3265 #[inline]
3267 fn _get_index(&self, idx: usize) -> Result<Option<IVec>> {
3268 let this = self;
3269 let res = {
3270 if this.db._is_expired(this.name.as_slice(), |k| {
3271 SledStorageDB::_list_contains_key(this.tree(), k)
3272 })? {
3273 Ok(None)
3274 } else {
3275 this.tree().transaction(move |tx| {
3276 let list_count_key = this.make_list_count_key();
3277 let (start, end) = Self::tx_list_count_get::<
3278 _,
3279 ConflictableTransactionError<sled::Error>,
3280 >(tx, list_count_key.as_slice())?;
3281 if idx < (end - start) {
3282 let list_content_key = this.make_list_content_key(start + idx + 1);
3283 if let Some(v) = tx.get(list_content_key)? {
3284 Ok(Some(v))
3285 } else {
3286 Ok(None)
3287 }
3288 } else {
3289 Ok(None)
3290 }
3291 })
3292 }
3293 }?;
3294 Ok(res)
3295 }
3296
3297 #[inline]
3299 fn _len(&self) -> Result<usize> {
3300 let this = self;
3301 let res = {
3302 if this.db._is_expired(this.name.as_slice(), |k| {
3303 SledStorageDB::_list_contains_key(this.tree(), k)
3304 })? {
3305 Ok::<usize, anyhow::Error>(0)
3306 } else {
3307 let list_count_key = this.make_list_count_key();
3308 if let Some(v) = this.tree().get(list_count_key.as_slice())? {
3309 let (start, end) = bincode::deserialize::<(usize, usize)>(v.as_ref())?;
3310 Ok(end - start)
3311 } else {
3312 Ok(0)
3313 }
3314 }
3315 }?;
3316 Ok(res)
3317 }
3318
3319 #[inline]
3321 fn _is_empty(&self) -> Result<bool> {
3322 let this = self;
3323 let res = {
3324 if this.db._is_expired(this.name.as_slice(), |k| {
3325 SledStorageDB::_list_contains_key(this.tree(), k)
3326 })? {
3327 Ok::<bool, anyhow::Error>(true)
3328 } else {
3329 let list_content_prefix =
3330 Self::make_list_content_prefix(this.prefix_name.as_slice(), None);
3331 Ok(this
3332 .tree()
3333 .scan_prefix(list_content_prefix)
3334 .keys()
3335 .next()
3336 .is_none())
3337 }
3338 }?;
3339 Ok(res)
3340 }
3341
3342 #[cfg(feature = "ttl")]
3344 #[inline]
3345 fn _expire_at(&self, at: TimestampMillis) -> Result<bool> {
3346 self.db._expire_at(self.name.as_slice(), at, KeyType::List)
3347 }
3348
3349 #[cfg(feature = "ttl")]
3351 #[inline]
3352 fn _ttl(&self) -> Result<Option<TimestampMillis>> {
3353 Ok(self
3354 .db
3355 ._ttl(self.name(), |k| {
3356 SledStorageDB::_list_contains_key(self.tree(), k)
3357 })?
3358 .and_then(|(at, _)| if at > 0 { Some(at) } else { None }))
3359 }
3360
3361 #[inline]
3363 fn _is_expired(&self) -> Result<bool> {
3364 self.db._is_expired(self.name.as_slice(), |k| {
3365 SledStorageDB::_list_contains_key(self.tree(), k)
3366 })
3367 }
3368
3369 #[inline]
3371 async fn call_is_expired(&self) -> Result<bool> {
3372 let (tx, rx) = oneshot::channel();
3373 self.db
3374 .cmd_send(Command::ListIsExpired(self.clone(), tx))
3375 .await?;
3376 rx.await?
3377 }
3378
3379 #[inline]
3381 fn _prefix_iter(&self) -> sled::Iter {
3382 let list_content_prefix = Self::make_list_content_prefix(self.prefix_name.as_slice(), None);
3383 self.tree().scan_prefix(list_content_prefix)
3384 }
3385
3386 #[inline]
3388 async fn call_prefix_iter(&self) -> Result<sled::Iter> {
3389 let (tx, rx) = oneshot::channel();
3390 self.db
3391 .cmd_send(Command::ListPrefixIter(self.clone(), tx))
3392 .await?;
3393 Ok(rx.await?)
3394 }
3395}
3396
3397#[async_trait]
3398impl List for SledStorageList {
3399 #[inline]
3401 fn name(&self) -> &[u8] {
3402 self.name.as_slice()
3403 }
3404
3405 #[inline]
3407 async fn push<V>(&self, val: &V) -> Result<()>
3408 where
3409 V: serde::ser::Serialize + Sync + Send,
3410 {
3411 let val = bincode::serialize(val)?;
3412 let (tx, rx) = oneshot::channel();
3413 self.db
3414 .cmd_send(Command::ListPush(self.clone(), val.into(), tx))
3415 .await?;
3416 rx.await??;
3417 Ok(())
3418 }
3419
3420 #[inline]
3422 async fn pushs<V>(&self, vals: Vec<V>) -> Result<()>
3423 where
3424 V: Serialize + Sync + Send,
3425 {
3426 if vals.is_empty() {
3427 return Ok(());
3428 }
3429
3430 let vals = vals
3431 .into_iter()
3432 .map(|v| {
3433 bincode::serialize(&v)
3434 .map(|v| v.into())
3435 .map_err(|e| anyhow!(e))
3436 })
3437 .collect::<Result<Vec<_>>>()?;
3438
3439 let (tx, rx) = oneshot::channel();
3440 self.db
3441 .cmd_send(Command::ListPushs(self.clone(), vals, tx))
3442 .await?;
3443 rx.await??;
3444 Ok(())
3445 }
3446
3447 #[inline]
3449 async fn push_limit<V>(
3450 &self,
3451 val: &V,
3452 limit: usize,
3453 pop_front_if_limited: bool,
3454 ) -> Result<Option<V>>
3455 where
3456 V: serde::ser::Serialize + Sync + Send,
3457 V: DeserializeOwned,
3458 {
3459 let data = bincode::serialize(val)?;
3460
3461 let (tx, rx) = oneshot::channel();
3462 self.db
3463 .cmd_send(Command::ListPushLimit(
3464 self.clone(),
3465 data.into(),
3466 limit,
3467 pop_front_if_limited,
3468 tx,
3469 ))
3470 .await?;
3471
3472 let removed = if let Some(removed) = rx.await?? {
3473 Some(
3474 bincode::deserialize::<V>(removed.as_ref())
3475 .map_err(|e| sled::Error::Io(io::Error::new(ErrorKind::InvalidData, e)))?,
3476 )
3477 } else {
3478 None
3479 };
3480 Ok(removed)
3481 }
3482
3483 #[inline]
3485 async fn pop<V>(&self) -> Result<Option<V>>
3486 where
3487 V: DeserializeOwned + Sync + Send,
3488 {
3489 let (tx, rx) = oneshot::channel();
3490 self.db.cmd_send(Command::ListPop(self.clone(), tx)).await?;
3491
3492 let removed = if let Some(removed) = rx.await?? {
3493 Some(
3494 bincode::deserialize::<V>(removed.as_ref())
3495 .map_err(|e| sled::Error::Io(io::Error::new(ErrorKind::InvalidData, e)))?,
3496 )
3497 } else {
3498 None
3499 };
3500 Ok(removed)
3501 }
3502
3503 #[inline]
3505 async fn all<V>(&self) -> Result<Vec<V>>
3506 where
3507 V: DeserializeOwned + Sync + Send,
3508 {
3509 let (tx, rx) = oneshot::channel();
3510 self.db.cmd_send(Command::ListAll(self.clone(), tx)).await?;
3511
3512 rx.await??
3513 .iter()
3514 .map(|v| bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e)))
3515 .collect::<Result<Vec<_>>>()
3516 }
3517
3518 #[inline]
3520 async fn get_index<V>(&self, idx: usize) -> Result<Option<V>>
3521 where
3522 V: DeserializeOwned + Sync + Send,
3523 {
3524 let (tx, rx) = oneshot::channel();
3525 self.db
3526 .cmd_send(Command::ListGetIndex(self.clone(), idx, tx))
3527 .await?;
3528
3529 Ok(if let Some(res) = rx.await?? {
3530 Some(bincode::deserialize::<V>(res.as_ref()).map_err(|e| anyhow!(e))?)
3531 } else {
3532 None
3533 })
3534 }
3535
3536 #[inline]
3538 async fn len(&self) -> Result<usize> {
3539 let (tx, rx) = oneshot::channel();
3540 self.db.cmd_send(Command::ListLen(self.clone(), tx)).await?;
3541 Ok(rx.await??)
3542 }
3543
3544 #[inline]
3546 async fn is_empty(&self) -> Result<bool> {
3547 let (tx, rx) = oneshot::channel();
3548 self.db
3549 .cmd_send(Command::ListIsEmpty(self.clone(), tx))
3550 .await?;
3551 Ok(rx.await??)
3552 }
3553
3554 #[inline]
3556 async fn clear(&self) -> Result<()> {
3557 let (tx, rx) = oneshot::channel();
3558 self.db
3559 .cmd_send(Command::ListClear(self.clone(), tx))
3560 .await?;
3561 Ok(rx.await??)
3562 }
3563
3564 #[inline]
3566 async fn iter<'a, V>(
3567 &'a mut self,
3568 ) -> Result<Box<dyn AsyncIterator<Item = Result<V>> + Send + 'a>>
3569 where
3570 V: DeserializeOwned + Sync + Send + 'a + 'static,
3571 {
3572 let this = self;
3573 let res = {
3574 if this.call_is_expired().await? {
3575 let iter: Box<dyn AsyncIterator<Item = Result<V>> + Send> =
3576 Box::new(AsyncEmptyIter {
3577 _m: std::marker::PhantomData,
3578 });
3579 Ok::<_, anyhow::Error>(iter)
3580 } else {
3581 let iter = this.call_prefix_iter().await?;
3582 let iter: Box<dyn AsyncIterator<Item = Result<V>> + Send> =
3583 Box::new(AsyncListValIter {
3584 db: &this.db,
3585 iter: Some(iter),
3586 _m: std::marker::PhantomData,
3587 });
3588 Ok::<_, anyhow::Error>(iter)
3589 }
3590 }?;
3591 Ok(res)
3592 }
3593
3594 #[cfg(feature = "ttl")]
3596 async fn expire_at(&self, at: TimestampMillis) -> Result<bool> {
3597 let (tx, rx) = oneshot::channel();
3598 self.db
3599 .cmd_send(Command::ListExpireAt(self.clone(), at, tx))
3600 .await?;
3601 Ok(rx.await??)
3602 }
3603
3604 #[cfg(feature = "ttl")]
3606 async fn expire(&self, dur: TimestampMillis) -> Result<bool> {
3607 let at = timestamp_millis() + dur;
3608 self.expire_at(at).await
3609 }
3610
3611 #[cfg(feature = "ttl")]
3613 async fn ttl(&self) -> Result<Option<TimestampMillis>> {
3614 let (tx, rx) = oneshot::channel();
3615 self.db.cmd_send(Command::ListTTL(self.clone(), tx)).await?;
3616 Ok(rx.await??)
3617 }
3618}
3619
3620pub struct AsyncIter<'a, V> {
3622 db: &'a SledStorageDB,
3623 prefix_len: usize,
3624 iter: Option<sled::Iter>,
3625 _m: std::marker::PhantomData<V>,
3626}
3627
3628impl<V> Debug for AsyncIter<'_, V> {
3629 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3630 f.debug_tuple("AsyncIter .. ").finish()
3631 }
3632}
3633
3634#[async_trait]
3635impl<V> AsyncIterator for AsyncIter<'_, V>
3636where
3637 V: DeserializeOwned + Sync + Send + 'static,
3638{
3639 type Item = IterItem<V>;
3640
3641 async fn next(&mut self) -> Option<Self::Item> {
3642 let mut iter = self.iter.take()?;
3643 let (tx, rx) = oneshot::channel();
3644 if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3645 return Some(Err(e));
3646 }
3647 let item = match rx.await {
3648 Err(e) => {
3649 return Some(Err(anyhow::Error::new(e)));
3650 }
3651 Ok((it, item)) => {
3652 iter = it;
3653 item
3654 }
3655 };
3656
3657 match item {
3658 None => None,
3659 Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
3660 Some(Ok((k, v))) => {
3661 let name = k.as_ref()[self.prefix_len..].to_vec();
3662 match bincode::deserialize::<V>(v.as_ref()) {
3663 Ok(v) => {
3664 self.iter = Some(iter);
3665 Some(Ok((name, v)))
3666 }
3667 Err(e) => Some(Err(anyhow::Error::new(e))),
3668 }
3669 }
3670 }
3671 }
3672}
3673
3674pub struct AsyncKeyIter<'a> {
3676 db: &'a SledStorageDB,
3677 prefix_len: usize,
3678 iter: Option<sled::Iter>,
3679}
3680
3681impl Debug for AsyncKeyIter<'_> {
3682 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3683 f.debug_tuple("AsyncKeyIter .. ").finish()
3684 }
3685}
3686
3687#[async_trait]
3688impl AsyncIterator for AsyncKeyIter<'_> {
3689 type Item = Result<Key>;
3690
3691 async fn next(&mut self) -> Option<Self::Item> {
3692 let mut iter = self.iter.take()?;
3693 let (tx, rx) = oneshot::channel();
3694 if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3695 return Some(Err(e));
3696 }
3697 let item = match rx.await {
3698 Err(e) => {
3699 return Some(Err(anyhow::Error::new(e)));
3700 }
3701 Ok((it, item)) => {
3702 iter = it;
3703 item
3704 }
3705 };
3706
3707 return match item {
3708 None => None,
3709 Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
3710 Some(Ok((k, _))) => {
3711 self.iter = Some(iter);
3712 let name = k.as_ref()[self.prefix_len..].to_vec();
3713 Some(Ok(name))
3714 }
3715 };
3716 }
3717}
3718
3719pub struct AsyncListValIter<'a, V> {
3721 db: &'a SledStorageDB,
3722 iter: Option<sled::Iter>,
3723 _m: std::marker::PhantomData<V>,
3724}
3725
3726impl<V> Debug for AsyncListValIter<'_, V> {
3727 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3728 f.debug_tuple("AsyncListValIter .. ").finish()
3729 }
3730}
3731
3732#[async_trait]
3733impl<V> AsyncIterator for AsyncListValIter<'_, V>
3734where
3735 V: DeserializeOwned + Sync + Send + 'static,
3736{
3737 type Item = Result<V>;
3738
3739 async fn next(&mut self) -> Option<Self::Item> {
3740 let mut iter = self.iter.take()?;
3741 let (tx, rx) = oneshot::channel();
3742 if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3743 return Some(Err(e));
3744 }
3745 let item = match rx.await {
3746 Err(e) => {
3747 return Some(Err(anyhow::Error::new(e)));
3748 }
3749 Ok((it, item)) => {
3750 iter = it;
3751 item
3752 }
3753 };
3754
3755 match item {
3756 None => None,
3757 Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
3758 Some(Ok((_k, v))) => {
3759 self.iter = Some(iter);
3760 Some(bincode::deserialize::<V>(v.as_ref()).map_err(|e| anyhow!(e)))
3761 }
3762 }
3763 }
3764}
3765
3766pub struct AsyncEmptyIter<T> {
3768 _m: std::marker::PhantomData<T>,
3769}
3770
3771impl<T> Debug for AsyncEmptyIter<T> {
3772 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3773 f.debug_tuple("AsyncEmptyIter .. ").finish()
3774 }
3775}
3776
3777#[async_trait]
3778impl<T> AsyncIterator for AsyncEmptyIter<T>
3779where
3780 T: Send + Sync + 'static,
3781{
3782 type Item = T;
3783
3784 async fn next(&mut self) -> Option<Self::Item> {
3785 None
3786 }
3787}
3788
3789pub struct AsyncMapIter<'a> {
3791 db: &'a SledStorageDB,
3792 iter: Option<sled::Iter>,
3793}
3794
3795impl<'a> AsyncMapIter<'a> {
3796 fn new(db: &'a SledStorageDB, iter: sled::Iter) -> Self {
3797 Self {
3798 db,
3799 iter: Some(iter),
3800 }
3801 }
3802}
3803
3804impl Debug for AsyncMapIter<'_> {
3805 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3806 f.debug_tuple("AsyncMapIter .. ").finish()
3807 }
3808}
3809
3810#[async_trait]
3811impl AsyncIterator for AsyncMapIter<'_> {
3812 type Item = Result<StorageMap>;
3813
3814 async fn next(&mut self) -> Option<Self::Item> {
3815 let mut iter = self.iter.take()?;
3816 loop {
3817 let (tx, rx) = oneshot::channel();
3818 if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3819 return Some(Err(e));
3820 }
3821 let item = match rx.await {
3822 Err(e) => {
3823 return Some(Err(anyhow::Error::new(e)));
3824 }
3825 Ok((it, item)) => {
3826 iter = it;
3827 item
3828 }
3829 };
3830
3831 match item {
3832 None => return None,
3833 Some(Err(e)) => return Some(Err(anyhow::Error::new(e))),
3834 Some(Ok((k, _))) => {
3835 if !SledStorageDB::is_map_count_key(k.as_ref()) {
3836 continue;
3837 }
3838 self.iter = Some(iter);
3839 let name = SledStorageDB::map_count_key_to_name(k.as_ref());
3840 return Some(Ok(StorageMap::Sled(self.db._map(name))));
3841 }
3842 }
3843 }
3844 }
3845}
3846
3847pub struct AsyncListIter<'a> {
3849 db: &'a SledStorageDB,
3850 iter: Option<sled::Iter>,
3851}
3852
3853impl Debug for AsyncListIter<'_> {
3854 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3855 f.debug_tuple("AsyncListIter .. ").finish()
3856 }
3857}
3858
3859#[async_trait]
3860impl AsyncIterator for AsyncListIter<'_> {
3861 type Item = Result<StorageList>;
3862
3863 async fn next(&mut self) -> Option<Self::Item> {
3864 let mut iter = self.iter.take()?;
3865 loop {
3866 let (tx, rx) = oneshot::channel();
3867 if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3868 return Some(Err(e));
3869 }
3870 let item = match rx.await {
3871 Err(e) => {
3872 return Some(Err(anyhow::Error::new(e)));
3873 }
3874 Ok((it, item)) => {
3875 iter = it;
3876 item
3877 }
3878 };
3879 return match item {
3880 None => None,
3881 Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
3882 Some(Ok((k, _))) => {
3883 if !SledStorageDB::is_list_count_key(k.as_ref()) {
3884 continue;
3885 }
3886 self.iter = Some(iter);
3887 let name = SledStorageDB::list_count_key_to_name(k.as_ref());
3888 Some(Ok(StorageList::Sled(self.db._list(name))))
3889 }
3890 };
3891 }
3892 }
3893}
3894
3895pub struct AsyncDbKeyIter<'a> {
3897 db: &'a SledStorageDB,
3898 pattern: Pattern,
3899 iter: Option<sled::Iter>,
3900}
3901
3902impl Debug for AsyncDbKeyIter<'_> {
3903 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3904 f.debug_tuple("AsyncDbKeyIter .. ").finish()
3905 }
3906}
3907
3908#[async_trait]
3909impl AsyncIterator for AsyncDbKeyIter<'_> {
3910 type Item = Result<Key>;
3911
3912 async fn next(&mut self) -> Option<Self::Item> {
3913 let mut iter = self.iter.take()?;
3914 loop {
3915 let (tx, rx) = oneshot::channel();
3916 if let Err(e) = self.db.cmd_send(Command::IterNext(iter, tx)).await {
3917 return Some(Err(e));
3918 }
3919 let item = match rx.await {
3920 Err(e) => {
3921 return Some(Err(anyhow::Error::new(e)));
3922 }
3923 Ok((it, item)) => {
3924 iter = it;
3925 item
3926 }
3927 };
3928
3929 return match item {
3930 None => None,
3931 Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
3932 Some(Ok((k, _))) => {
3933 if !is_match(self.pattern.clone(), k.as_ref()) {
3934 continue;
3935 }
3936 self.iter = Some(iter);
3937 Some(Ok(k.to_vec()))
3938 }
3939 };
3940 }
3941 }
3942}