1#![allow(clippy::type_complexity)]
2use crate::custom_serde::serialize::{Deserializer, Key, Serializer, Value};
47use core::fmt;
48use core::iter::{DoubleEndedIterator, Iterator};
49use core::ops::{Bound, RangeBounds};
50use sled::{
51 transaction::{ConflictableTransactionResult, TransactionResult},
52 IVec, Result,
53};
54use std::marker::PhantomData;
55
56pub mod serialize;
57
58#[cfg(feature = "convert")]
59pub mod convert;
60
61#[cfg(feature = "key-generating")]
62pub mod key_generating;
63
64#[derive(Debug)]
103pub struct Tree<K, V, SerDe> {
104 inner: sled::Tree,
105 _key: PhantomData<fn() -> K>,
106 _value: PhantomData<fn() -> V>,
107 _serde: PhantomData<fn(SerDe)>,
108}
109
110impl<K, V, SerDe> Clone for Tree<K, V, SerDe> {
113 fn clone(&self) -> Self {
114 Self {
115 inner: self.inner.clone(),
116 _key: PhantomData,
117 _value: PhantomData,
118 _serde: PhantomData,
119 }
120 }
121}
122
123impl<K, V, SerDe> Tree<K, V, SerDe> {
128 pub fn open<T: AsRef<str>>(db: &sled::Db, id: T) -> Self {
160 Self {
161 inner: db.open_tree(id.as_ref()).unwrap(),
162 _key: PhantomData,
163 _value: PhantomData,
164 _serde: PhantomData,
165 }
166 }
167
168 pub fn insert(&self, key: &K, value: &V) -> Result<Option<Value<K, V, SerDe>>>
170 where
171 SerDe: serialize::SerDe<K, V>,
172 {
173 self.inner
174 .insert(
175 SerDe::SK::serialize(key).as_ref(),
176 SerDe::SV::serialize(value).as_ref(),
177 )
178 .map(|opt| opt.map(|old_value| SerDe::DV::deserialize(old_value)))
179 }
180
181 pub fn transaction<F, A, E>(&self, f: F) -> TransactionResult<A, E>
183 where
184 F: Fn(&TransactionalTree<K, V, SerDe>) -> ConflictableTransactionResult<A, E>,
185 {
186 self.inner.transaction(|sled_transactional_tree| {
187 f(&TransactionalTree {
188 inner: sled_transactional_tree,
189 _key: PhantomData,
190 _value: PhantomData,
191 _serde: PhantomData,
192 })
193 })
194 }
195
196 pub fn apply_batch(&self, batch: Batch<K, V, SerDe>) -> Result<()> {
200 self.inner.apply_batch(batch.inner)
201 }
202
203 pub fn get(&self, key: &K) -> Result<Option<Value<K, V, SerDe>>>
205 where
206 SerDe: serialize::SerDe<K, V>,
207 {
208 self.inner
209 .get(SerDe::SK::serialize(key))
210 .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
211 }
212
213 pub fn get_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<Value<K, V, SerDe>>>
215 where
216 SerDe: serialize::SerDe<K, V>,
217 {
218 self.inner
219 .get(key_bytes.as_ref())
220 .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
221 }
222
223 pub fn get_kv_from_raw<B: AsRef<[u8]>>(
226 &self,
227 key_bytes: B,
228 ) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
229 where
230 SerDe: serialize::SerDe<K, V>,
231 {
232 self.inner.get(key_bytes.as_ref()).map(|opt| {
233 opt.map(|v| {
234 (
235 SerDe::DK::deserialize(sled::IVec::from(key_bytes.as_ref())),
236 SerDe::DV::deserialize(v),
237 )
238 })
239 })
240 }
241
242 pub fn remove(&self, key: &K) -> Result<Option<Value<K, V, SerDe>>>
244 where
245 SerDe: serialize::SerDe<K, V>,
246 {
247 self.inner
248 .remove(SerDe::SK::serialize(key).as_ref())
249 .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
250 }
251
252 pub fn compare_and_swap(
258 &self,
259 key: &K,
260 old: Option<&V>,
261 new: Option<&V>,
262 ) -> Result<core::result::Result<(), CompareAndSwapError<Value<K, V, SerDe>>>>
263 where
264 SerDe: serialize::SerDe<K, V>,
265 {
266 self.inner
267 .compare_and_swap(
268 SerDe::SK::serialize(key),
269 old.map(|old| SerDe::SV::serialize(old))
270 .as_ref()
271 .map(|old| old.as_ref()),
272 new.map(|new| SerDe::SV::serialize(new))
273 .as_ref()
274 .map(|new| new.as_ref()),
275 )
276 .map(|cas_res| {
277 cas_res.map_err(|cas_err| CompareAndSwapError {
278 current: cas_err.current.map(|b| SerDe::DV::deserialize(b)),
279 proposed: cas_err.proposed.map(|b| SerDe::DV::deserialize(b)),
280 })
281 })
282 }
283
284 pub fn update_and_fetch<F>(&self, key: &K, mut f: F) -> Result<Option<Value<K, V, SerDe>>>
286 where
287 SerDe: serialize::SerDe<K, V>,
288 F: FnMut(Option<Value<K, V, SerDe>>) -> Option<V>,
289 {
290 self.inner
291 .update_and_fetch(SerDe::SK::serialize(key), |opt_value| {
292 f(opt_value.map(|v| SerDe::DV::deserialize(sled::IVec::from(v)))).map(|value| {
293 let bytes = SerDe::SV::serialize(&value);
295 let mut v = Vec::with_capacity(bytes.as_ref().len());
296 v.extend(bytes.as_ref());
297 v
298 })
299 })
300 .map(|res| res.map(|v| SerDe::DV::deserialize(v)))
301 }
302
303 pub fn fetch_and_update<F>(&self, key: &K, mut f: F) -> Result<Option<Value<K, V, SerDe>>>
306 where
307 SerDe: serialize::SerDe<K, V>,
308 F: FnMut(Option<Value<K, V, SerDe>>) -> Option<V>,
309 {
310 self.inner
311 .fetch_and_update(SerDe::SK::serialize(key), |opt_value| {
312 f(opt_value.map(|v| SerDe::DV::deserialize(sled::IVec::from(v)))).map(|value| {
313 let bytes = SerDe::SV::serialize(&value);
315 let mut v = Vec::with_capacity(bytes.as_ref().len());
316 v.extend(bytes.as_ref());
317 v
318 })
319 })
320 .map(|res| res.map(|v| SerDe::DV::deserialize(v)))
321 }
322
323 pub fn watch_prefix(&self, prefix: &K) -> Subscriber<K, V, SerDe>
333 where
334 SerDe: serialize::SerDe<K, V>,
335 {
336 Subscriber::from_sled(self.inner.watch_prefix(SerDe::SK::serialize(prefix)))
337 }
338
339 pub fn watch_all(&self) -> Subscriber<K, V, SerDe>
348 where
349 SerDe: serialize::SerDe<K, V>,
350 {
351 Subscriber::from_sled(self.inner.watch_prefix(vec![]))
352 }
353
354 pub fn flush(&self) -> Result<usize> {
365 self.inner.flush()
366 }
367
368 pub async fn flush_async(&self) -> Result<usize> {
379 self.inner.flush_async().await
380 }
381
382 pub fn contains_key(&self, key: &K) -> Result<bool>
385 where
386 SerDe: serialize::SerDe<K, V>,
387 {
388 self.inner.contains_key(SerDe::SK::serialize(key))
389 }
390
391 pub fn get_lt(&self, key: &K) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
394 where
395 SerDe: serialize::SerDe<K, V>,
396 {
397 self.inner
398 .get_lt(SerDe::SK::serialize(key))
399 .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
400 }
401
402 pub fn get_gt(&self, key: &K) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
405 where
406 SerDe: serialize::SerDe<K, V>,
407 {
408 self.inner
409 .get_gt(SerDe::SK::serialize(key))
410 .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
411 }
412
413 pub fn merge(&self, key: &K, value: &V) -> Result<Option<Value<K, V, SerDe>>>
426 where
427 SerDe: serialize::SerDe<K, V>,
428 {
429 self.inner
430 .merge(SerDe::SK::serialize(key), SerDe::SV::serialize(value))
431 .map(|res| res.map(|old_v| SerDe::DV::deserialize(old_v)))
432 }
433
434 pub fn set_merge_operator(&self, merge_operator: impl sled::MergeOperator + 'static) {
450 self.inner.set_merge_operator(merge_operator);
451 }
452
453 pub fn iter(&self) -> Iter<K, V, SerDe> {
456 Iter::from_sled(self.inner.iter())
457 }
458
459 pub fn range<R: RangeBounds<K>>(&self, range: R) -> Iter<K, V, SerDe>
462 where
463 SerDe: serialize::SerDe<K, V>,
464 {
465 match (range.start_bound(), range.end_bound()) {
466 (Bound::Unbounded, Bound::Unbounded) => {
467 Iter::from_sled(self.inner.range::<&[u8], _>(..))
468 }
469 (Bound::Unbounded, Bound::Excluded(b)) => {
470 Iter::from_sled(self.inner.range(..SerDe::SK::serialize(b)))
471 }
472 (Bound::Unbounded, Bound::Included(b)) => {
473 Iter::from_sled(self.inner.range(..=SerDe::SK::serialize(b)))
474 }
475 (Bound::Excluded(b), Bound::Unbounded) => {
477 Iter::from_sled(self.inner.range(SerDe::SK::serialize(b)..))
478 }
479 (Bound::Excluded(b), Bound::Excluded(bb)) => Iter::from_sled(
480 self.inner
481 .range(SerDe::SK::serialize(b)..SerDe::SK::serialize(bb)),
482 ),
483 (Bound::Excluded(b), Bound::Included(bb)) => Iter::from_sled(
484 self.inner
485 .range(SerDe::SK::serialize(b)..=SerDe::SK::serialize(bb)),
486 ),
487 (Bound::Included(b), Bound::Unbounded) => {
488 Iter::from_sled(self.inner.range(SerDe::SK::serialize(b)..))
489 }
490 (Bound::Included(b), Bound::Excluded(bb)) => Iter::from_sled(
491 self.inner
492 .range(SerDe::SK::serialize(b)..SerDe::SK::serialize(bb)),
493 ),
494 (Bound::Included(b), Bound::Included(bb)) => Iter::from_sled(
495 self.inner
496 .range(SerDe::SK::serialize(b)..=SerDe::SK::serialize(bb)),
497 ),
498 }
499 }
500
501 pub fn scan_prefix(&self, prefix: &K) -> Iter<K, V, SerDe>
504 where
505 SerDe: serialize::SerDe<K, V>,
506 {
507 Iter::from_sled(self.inner.scan_prefix(SerDe::SK::serialize(prefix)))
508 }
509
510 pub fn first(&self) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
513 where
514 SerDe: serialize::SerDe<K, V>,
515 {
516 self.inner
517 .first()
518 .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
519 }
520
521 pub fn last(&self) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
524 where
525 SerDe: serialize::SerDe<K, V>,
526 {
527 self.inner
528 .last()
529 .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
530 }
531
532 pub fn pop_max(&self) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
534 where
535 SerDe: serialize::SerDe<K, V>,
536 {
537 self.inner
538 .pop_max()
539 .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
540 }
541
542 pub fn pop_min(&self) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
544 where
545 SerDe: serialize::SerDe<K, V>,
546 {
547 self.inner
548 .pop_min()
549 .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
550 }
551
552 pub fn len(&self) -> usize {
554 self.inner.len()
555 }
556
557 pub fn is_empty(&self) -> bool {
559 self.inner.is_empty()
560 }
561
562 pub fn clear(&self) -> Result<()> {
566 self.inner.clear()
567 }
568
569 pub fn name(&self) -> IVec {
571 self.inner.name()
572 }
573
574 pub fn checksum(&self) -> Result<u32> {
580 self.inner.checksum()
581 }
582}
583
584pub struct TransactionalTree<'a, K, V, SerDe> {
585 inner: &'a sled::transaction::TransactionalTree,
586 _key: PhantomData<fn() -> K>,
587 _value: PhantomData<fn() -> V>,
588 _serde: PhantomData<fn(SerDe)>,
589}
590
591impl<'a, K, V, SerDe> TransactionalTree<'a, K, V, SerDe> {
592 pub fn insert(
593 &self,
594 key: &K,
595 value: &V,
596 ) -> std::result::Result<
597 Option<Value<K, V, SerDe>>,
598 sled::transaction::UnabortableTransactionError,
599 >
600 where
601 SerDe: serialize::SerDe<K, V>,
602 {
603 self.inner
604 .insert(
605 SerDe::SK::serialize(key).as_ref(),
606 SerDe::SV::serialize(value).as_ref(),
607 )
608 .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
609 }
610
611 pub fn remove(
612 &self,
613 key: &K,
614 ) -> std::result::Result<
615 Option<Value<K, V, SerDe>>,
616 sled::transaction::UnabortableTransactionError,
617 >
618 where
619 SerDe: serialize::SerDe<K, V>,
620 {
621 self.inner
622 .remove(SerDe::SK::serialize(key).as_ref())
623 .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
624 }
625
626 pub fn get(
627 &self,
628 key: &K,
629 ) -> std::result::Result<
630 Option<Value<K, V, SerDe>>,
631 sled::transaction::UnabortableTransactionError,
632 >
633 where
634 SerDe: serialize::SerDe<K, V>,
635 {
636 self.inner
637 .get(SerDe::SK::serialize(key))
638 .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
639 }
640
641 pub fn apply_batch(
642 &self,
643 batch: &Batch<K, V, SerDe>,
644 ) -> std::result::Result<(), sled::transaction::UnabortableTransactionError> {
645 self.inner.apply_batch(&batch.inner)
646 }
647
648 pub fn flush(&self) {
649 self.inner.flush()
650 }
651
652 pub fn generate_id(&self) -> Result<u64> {
653 self.inner.generate_id()
654 }
655}
656
657pub struct Iter<K, V, SerDe> {
658 inner: sled::Iter,
659 _key: PhantomData<fn() -> K>,
660 _value: PhantomData<fn() -> V>,
661 _serde: PhantomData<fn(SerDe)>,
662}
663
664impl<K, V, SerDe: serialize::SerDe<K, V>> Iterator for Iter<K, V, SerDe> {
665 type Item = Result<(Key<K, V, SerDe>, Value<K, V, SerDe>)>;
666
667 fn next(&mut self) -> Option<Self::Item> {
668 self.inner
669 .next()
670 .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
671 }
672
673 fn last(mut self) -> Option<Self::Item> {
674 self.inner
675 .next_back()
676 .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
677 }
678}
679
680impl<K, V, SerDe: serialize::SerDe<K, V>> DoubleEndedIterator for Iter<K, V, SerDe> {
681 fn next_back(&mut self) -> Option<Self::Item> {
682 self.inner
683 .next_back()
684 .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
685 }
686}
687
688impl<K, V, SerDe> Iter<K, V, SerDe> {
689 pub fn from_sled(iter: sled::Iter) -> Self {
690 Iter {
691 inner: iter,
692 _key: PhantomData,
693 _value: PhantomData,
694 _serde: PhantomData,
695 }
696 }
697
698 pub fn keys(self) -> impl DoubleEndedIterator<Item = Result<Key<K, V, SerDe>>> + Send + Sync
699 where
700 SerDe: serialize::SerDe<K, V>,
701 K: Sync + Send,
702 V: Sync + Send,
703 {
704 self.map(|r| r.map(|(k, _v)| k))
705 }
706
707 pub fn values(self) -> impl DoubleEndedIterator<Item = Result<Value<K, V, SerDe>>> + Send + Sync
709 where
710 SerDe: serialize::SerDe<K, V>,
711 K: Sync + Send,
712 V: Sync + Send,
713 {
714 self.map(|r| r.map(|(_k, v)| v))
715 }
716}
717
718#[derive(Clone, Debug)]
719pub struct Batch<K, V, SerDe> {
720 inner: sled::Batch,
721 _key: PhantomData<fn() -> K>,
722 _value: PhantomData<fn() -> V>,
723 _serde: PhantomData<fn(SerDe)>,
724}
725
726impl<K, V, SerDe> Batch<K, V, SerDe> {
727 pub fn insert(&mut self, key: &K, value: &V)
728 where
729 SerDe: serialize::SerDe<K, V>,
730 {
731 self.inner.insert(
732 SerDe::SK::serialize(key).as_ref(),
733 SerDe::SV::serialize(value).as_ref(),
734 );
735 }
736
737 pub fn remove(&mut self, key: &K)
738 where
739 SerDe: serialize::SerDe<K, V>,
740 {
741 self.inner.remove(SerDe::SK::serialize(key).as_ref())
742 }
743}
744
745impl<K, V, SerDe> Default for Batch<K, V, SerDe> {
747 fn default() -> Self {
748 Self {
749 inner: Default::default(),
750 _key: PhantomData,
751 _value: PhantomData,
752 _serde: PhantomData,
753 }
754 }
755}
756
757use pin_project::pin_project;
758#[pin_project]
759pub struct Subscriber<K, V, SerDe> {
760 #[pin]
761 inner: sled::Subscriber,
762 _key: PhantomData<fn() -> K>,
763 _value: PhantomData<fn() -> V>,
764 _serde: PhantomData<fn(SerDe)>,
765}
766
767impl<K, V, SerDe> Subscriber<K, V, SerDe> {
768 pub fn next_timeout(
769 &mut self,
770 timeout: core::time::Duration,
771 ) -> core::result::Result<Event<K, V, SerDe>, std::sync::mpsc::RecvTimeoutError>
772 where
773 SerDe: serialize::SerDe<K, V>,
774 {
775 self.inner
776 .next_timeout(timeout)
777 .map(|e| Event::from_sled(e))
778 }
779
780 pub fn from_sled(subscriber: sled::Subscriber) -> Self {
781 Self {
782 inner: subscriber,
783 _key: PhantomData,
784 _value: PhantomData,
785 _serde: PhantomData,
786 }
787 }
788}
789
790use core::future::Future;
791use core::pin::Pin;
792use core::task::{Context, Poll};
793impl<K: Unpin, V: Unpin, SerDe: serialize::SerDe<K, V>> Future for Subscriber<K, V, SerDe> {
794 type Output = Option<Event<K, V, SerDe>>;
795
796 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
797 self.project()
798 .inner
799 .poll(cx)
800 .map(|opt| opt.map(|e| Event::from_sled(e)))
801 }
802}
803
804impl<K, V, SerDe: serialize::SerDe<K, V>> Iterator for Subscriber<K, V, SerDe> {
805 type Item = Event<K, V, SerDe>;
806
807 fn next(&mut self) -> Option<Event<K, V, SerDe>> {
808 self.inner.next().map(|e| Event::from_sled(e))
809 }
810}
811
812pub enum Event<K, V, SerDe: serialize::SerDe<K, V>> {
813 Insert {
814 key: Key<K, V, SerDe>,
815 value: Value<K, V, SerDe>,
816 },
817 Remove {
818 key: Key<K, V, SerDe>,
819 },
820}
821
822impl<K, V, SerDe: serialize::SerDe<K, V>> Event<K, V, SerDe> {
823 pub fn key(&self) -> &Key<K, V, SerDe> {
824 match self {
825 Self::Insert { key, .. } | Self::Remove { key } => key,
826 }
827 }
828
829 pub fn from_sled(event: sled::Event) -> Self {
830 match event {
831 sled::Event::Insert { key, value } => Self::Insert {
832 key: SerDe::DK::deserialize(key),
833 value: SerDe::DV::deserialize(value),
834 },
835 sled::Event::Remove { key } => Self::Remove {
836 key: SerDe::DK::deserialize(key),
837 },
838 }
839 }
840}
841
842#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
844pub struct CompareAndSwapError<V> {
845 pub current: Option<V>,
847 pub proposed: Option<V>,
849}
850
851impl<V> fmt::Display for CompareAndSwapError<V> {
852 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
853 write!(f, "Compare and swap conflict")
854 }
855}
856
857#[cfg(test)]
858mod tests {
859 use super::*;
860 use crate::custom_serde::serialize::BincodeSerDe;
861
862 #[test]
863 fn test_range() {
864 let config = sled::Config::new().temporary(true);
865 let db = config.open().unwrap();
866
867 let tree: Tree<u32, u32, BincodeSerDe> = Tree::open(&db, "test_tree");
868
869 tree.insert(&1, &2).unwrap();
870 tree.insert(&3, &4).unwrap();
871 tree.insert(&6, &2).unwrap();
872 tree.insert(&10, &2).unwrap();
873 tree.insert(&15, &2).unwrap();
874 tree.flush().unwrap();
875
876 let expect_results = [(6, 2), (10, 2)];
877
878 for (i, result) in tree.range(6..11).enumerate() {
879 assert_eq!(result.unwrap(), expect_results[i]);
880 }
881 }
882
883 #[test]
884 fn test_cas() {
885 let config = sled::Config::new().temporary(true);
886 let db = config.open().unwrap();
887
888 let tree: Tree<u32, u32, BincodeSerDe> = Tree::open(&db, "test_tree");
889
890 let current = 2;
891 tree.insert(&1, ¤t).unwrap();
892 let expected = 3;
893 let proposed = 4;
894 let res = tree
895 .compare_and_swap(&1, Some(&expected), Some(&proposed))
896 .expect("db failure");
897
898 assert_eq!(
899 res,
900 Err(CompareAndSwapError {
901 current: Some(current),
902 proposed: Some(proposed),
903 }),
904 );
905 }
906}