1use super::{dag_cbor_size_serializer::DagCborSizeSerializer, node_builder::NodeReader};
5use crate::{
6 AnyBlockStorage, Block, BlockSerializer, BlockStorageExt, Link, Node, NodeBuilder, NodeBuilderError,
7 NodeSerializer, NodeStream, OptionLink, StorageError,
8};
9use anyhow::anyhow;
10use bloomfilter::Bloom;
11use cid::Cid;
12use either::Either;
13use futures::{pin_mut, stream, FutureExt, Stream, StreamExt, TryStreamExt};
14use num_rational::Ratio;
15use serde::{de::DeserializeOwned, Deserialize, Serialize};
16use std::{
17 cmp::Ordering,
18 collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque},
19 fmt::Debug,
20 future::ready,
21 hash::Hash,
22 marker::PhantomData,
23 num::TryFromIntError,
24};
25
26const INLINE_SIZE_FACTOR_LEVELS: Ratio<usize> = Ratio::new_raw(2, 8);
27const INLINE_SIZE_FACTOR_ACTIVE: Ratio<usize> = Ratio::new_raw(4, 8);
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct Root<K, V>
32where
33 K: Hash + Ord + Clone + Send + Sync + 'static,
34 V: Clone + Send + Sync + 'static,
35{
36 #[serde(rename = "l", default = "Node::default", skip_serializing_if = "Node::is_empty")]
39 pub levels: Node<Level<K, V>>,
40
41 #[serde(rename = "a", default = "Node::default", skip_serializing_if = "Node::is_empty")]
44 pub active: Node<(K, Value<V>)>,
45
46 #[serde(
48 rename = "s",
49 default = "LsmTreeMapSettings::default",
50 skip_serializing_if = "LsmTreeMapSettings::is_default"
51 )]
52 pub settings: LsmTreeMapSettings,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct Level<K, V>
58where
59 K: Hash + Ord + Clone + Send + Sync + 'static,
60 V: Clone + Send + Sync + 'static,
61{
62 #[serde(rename = "r", default = "OptionLink::default", skip_serializing_if = "OptionLink::is_none")]
64 pub runs: OptionLink<Node<Run<K, V>>>,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct Run<K, V>
70where
71 K: Hash + Ord + Clone + Send + Sync + 'static,
72 V: Clone + Send + Sync + 'static,
73{
74 #[serde(rename = "e")]
76 pub entries: OptionLink<RunNode<K, V>>,
77
78 #[serde(rename = "s")]
80 pub size: u64,
81
82 #[serde(rename = "i")]
84 pub bloom: BloomFilter,
85
86 #[serde(rename = "l")]
88 pub min_key: K,
89
90 #[serde(rename = "h")]
92 pub max_key: K,
93}
94impl<K, V> Run<K, V>
95where
96 K: Hash + Ord + Clone + Send + Sync + 'static,
97 V: Clone + Send + Sync + 'static,
98{
99 fn may_contains_key(&self, key: &K) -> bool {
101 key >= &self.min_key && key <= &self.max_key && self.bloom.may_contains_key(key)
102 }
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
107#[non_exhaustive]
108pub enum BloomFilter {
109 #[serde(rename = "b", with = "serde_bytes")]
111 Bloomfilter(Vec<u8>),
112}
113impl BloomFilter {
114 pub fn may_contains_key<K: Hash + Ord + Clone + Send + Sync + 'static>(&self, key: &K) -> bool {
116 match self {
117 BloomFilter::Bloomfilter(data) => {
118 if let Ok(bloom) = Bloom::from_slice(data) {
119 bloom.check(key)
120 } else {
121 true
122 }
123 },
124 }
125 }
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
153pub enum Value<V>
154where
155 V: Clone + 'static,
156{
157 #[serde(rename = "v")]
158 Value(V),
159 #[serde(rename = "t")]
160 Tombstone,
161}
162impl<V> PartialEq for Value<V>
163where
164 V: PartialEq + Clone + 'static,
165{
166 fn eq(&self, other: &Self) -> bool {
167 match (self, other) {
168 (Self::Tombstone, Self::Tombstone) => true,
169 (Self::Value(a), Self::Value(b)) => a == b,
170 _ => false,
171 }
172 }
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub enum RunNode<K, V>
177where
178 K: Hash + Ord + Clone + Send + Sync + 'static,
179 V: Clone + Send + Sync + 'static,
180{
181 #[serde(rename = "n")]
183 Node {
184 #[serde(rename = "n")]
186 nodes: Vec<Link<Self>>,
187
188 #[serde(rename = "l")]
190 min_key: K,
191
192 #[serde(rename = "h")]
194 max_key: K,
195 },
196
197 #[serde(rename = "l")]
199 Leaf(BTreeMap<K, Value<V>>),
200}
201impl<K, V> RunNode<K, V>
202where
203 K: Hash + Ord + Clone + Send + Sync + 'static,
204 V: Clone + Send + Sync + 'static,
205{
206 fn may_contains_key(&self, key: &K) -> bool {
208 match self {
209 RunNode::Node { nodes: _, min_key, max_key } => key >= min_key && key <= max_key,
210 RunNode::Leaf(items) => items.contains_key(key),
211 }
212 }
213}
214impl<K, V> NodeReader<(K, Value<V>)> for RunNode<K, V>
215where
216 K: Hash + Ord + Clone + Send + Sync + 'static,
217 V: Clone + Send + Sync + 'static,
218{
219 type Filter = RunNodeFilter<K>;
220
221 fn read(self, filter: &Self::Filter) -> Either<Vec<Cid>, Vec<(K, Value<V>)>> {
222 match self {
223 RunNode::Node { nodes, min_key, max_key } => {
224 if filter.test(&min_key, &max_key) {
225 Either::Left(Vec::new())
227 } else {
228 Either::Left(nodes.into_iter().map(Into::into).collect())
230 }
231 },
232 RunNode::Leaf(items) => Either::Right(items.into_iter().collect()),
233 }
234 }
235}
236
237#[derive(Debug, Default)]
238pub enum RunNodeFilter<K> {
239 #[default]
240 None,
241 Max(K),
242 Min(K),
243}
244impl<K> RunNodeFilter<K> {
245 pub fn min(key: Option<K>) -> Self {
246 match key {
247 None => Self::None,
248 Some(key) => Self::Min(key),
249 }
250 }
251
252 pub fn max(key: Option<K>) -> Self {
253 match key {
254 None => Self::None,
255 Some(key) => Self::Max(key),
256 }
257 }
258
259 pub fn test(&self, min_key: &K, max_key: &K) -> bool
261 where
262 K: Ord,
263 {
264 match self {
265 RunNodeFilter::Min(filter_min_key) => min_key > filter_min_key,
266 RunNodeFilter::Max(filter_max_key) => max_key < filter_max_key,
267 RunNodeFilter::None => false,
268 }
269 }
270}
271
272#[derive(Debug)]
277pub struct RunNodeSerializer<K, V> {
278 _d: PhantomData<(K, V)>,
279 pending: BTreeMap<Cid, (K, K)>,
280}
281impl<K, V> RunNodeSerializer<K, V> {
282 pub fn new() -> Self {
283 Self { _d: Default::default(), pending: Default::default() }
284 }
285}
286impl<K, V> NodeSerializer<RunNode<K, V>, (K, Value<V>)> for RunNodeSerializer<K, V>
287where
288 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
289 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
290{
291 fn nodes(&mut self, nodes: Vec<Link<RunNode<K, V>>>) -> Result<RunNode<K, V>, NodeBuilderError> {
292 let mut min_key = None;
293 let mut max_key = None;
294 for node in nodes.iter() {
295 if let Some((node_min_key, node_max_key)) = self.pending.remove(node.cid()) {
296 if min_key.is_none() || Some(&node_min_key).cmp(&min_key.as_ref()) == Ordering::Less {
297 min_key = Some(node_min_key);
298 }
299 if max_key.is_none() || Some(&node_max_key).cmp(&max_key.as_ref()) == Ordering::Greater {
300 max_key = Some(node_max_key);
301 }
302 }
303 }
304 Ok(RunNode::Node {
305 nodes,
306 min_key: min_key.ok_or(NodeBuilderError::InvalidArgument(anyhow!("Unable to determine min key")))?,
307 max_key: max_key.ok_or(NodeBuilderError::InvalidArgument(anyhow!("Unable to determine max key")))?,
308 })
309 }
310
311 fn leaf(&mut self, entries: Vec<(K, Value<V>)>) -> Result<RunNode<K, V>, NodeBuilderError> {
312 Ok(RunNode::Leaf(entries.into_iter().collect()))
313 }
314
315 fn serialize(&mut self, max_bock_size: usize, node: RunNode<K, V>) -> Result<Block, NodeBuilderError> {
316 let block = BlockSerializer::new()
317 .with_max_block_size(max_bock_size)
318 .serialize(&node)
319 .map_err(|err| NodeBuilderError::Encoding(err.into()))?;
320
321 match &node {
323 RunNode::Node { nodes: _, min_key, max_key } => {
324 self.pending.insert(*block.cid(), (min_key.clone(), max_key.clone()));
325 },
326 RunNode::Leaf(items) => {
327 if let (Some((first, _)), Some((last, _))) = (items.first_key_value(), items.last_key_value()) {
328 self.pending.insert(*block.cid(), (first.clone(), last.clone()));
329 }
330 },
331 }
332
333 Ok(block)
335 }
336
337 fn item_size_hint(&self, item: &(K, Value<V>)) -> Option<usize> {
338 let mut serializer = DagCborSizeSerializer::new();
339 item.serialize(&mut serializer).ok()?;
340 Some(serializer.size)
341 }
342}
343
344#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
345pub struct LsmTreeMapSettings {
346 #[serde(
348 rename = "n",
349 default = "LsmTreeMapSettings::default_max_node_entries",
350 skip_serializing_if = "LsmTreeMapSettings::is_default_max_node_entries"
351 )]
352 pub max_node_entries: u64,
353
354 #[serde(
357 rename = "a",
358 default = "LsmTreeMapSettings::default_max_active_entries",
359 skip_serializing_if = "LsmTreeMapSettings::is_default_max_active_entries"
360 )]
361 pub max_active_entries: u64,
362
363 #[serde(
366 rename = "r",
367 default = "LsmTreeMapSettings::default_max_run_count",
368 skip_serializing_if = "LsmTreeMapSettings::is_default_max_run_count"
369 )]
370 pub max_run_count: u64,
371}
372impl LsmTreeMapSettings {
373 fn default_max_node_entries() -> u64 {
374 2u64.checked_pow(8).expect("to pow") }
376
377 fn default_max_active_entries() -> u64 {
378 2u64.checked_pow(14).expect("to pow") }
380
381 fn default_max_run_count() -> u64 {
382 2u64.checked_pow(4).expect("to pow") }
384
385 fn is_default_max_node_entries(value: &u64) -> bool {
386 *value == Self::default_max_node_entries()
387 }
388
389 fn is_default_max_active_entries(value: &u64) -> bool {
390 *value == Self::default_max_active_entries()
391 }
392
393 fn is_default_max_run_count(value: &u64) -> bool {
394 *value == Self::default_max_run_count()
395 }
396
397 pub fn is_default(&self) -> bool {
399 self == &Self::default()
400 }
401}
402impl Default for LsmTreeMapSettings {
403 fn default() -> Self {
404 Self {
405 max_node_entries: Self::default_max_node_entries(),
406 max_active_entries: Self::default_max_active_entries(),
407 max_run_count: Self::default_max_run_count(),
408 }
409 }
410}
411
412#[derive(Clone)]
414pub struct LsmTreeMap<S, K, V>
415where
416 S: AnyBlockStorage,
417 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
418 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
419{
420 storage: S,
422
423 root: OptionLink<Root<K, V>>,
425
426 active: BTreeMap<K, Value<V>>,
428
429 settings: LsmTreeMapSettings,
431 }
434impl<S, K, V> LsmTreeMap<S, K, V>
435where
436 S: AnyBlockStorage,
437 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
438 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
439{
440 pub fn new(storage: S, settings: LsmTreeMapSettings) -> Self {
442 Self { active: Default::default(), settings, root: OptionLink::none(), storage }
443 }
444
445 pub async fn load(storage: S, root: Link<Root<K, V>>) -> Result<Self, StorageError> {
447 let mut result = Self { active: Default::default(), settings: Default::default(), root: root.into(), storage };
448 if let Some(root) = result.root().await? {
449 result.settings = root.settings;
450 result.active = NodeStream::from_node(result.storage.clone(), root.active, None)
451 .try_collect()
452 .await?;
453 }
454 Ok(result)
455 }
456
457 pub async fn insert(&mut self, key: K, value: V) -> Result<(), StorageError> {
459 self.active.insert(key, Value::Value(value));
461
462 if self.active.len() >= self.settings.max_active_entries as usize {
464 self.flush_active().boxed().await?;
465 }
466
467 Ok(())
469 }
470
471 pub async fn remove(&mut self, key: K) -> Result<(), StorageError> {
473 if self.root.is_none() {
475 self.active.remove(&key);
476 return Ok(());
477 }
478
479 self.active.insert(key, Value::Tombstone);
481
482 if self.active.len() >= self.settings.max_active_entries as usize {
484 self.flush_active().boxed().await?;
485 }
486
487 Ok(())
489 }
490
491 pub async fn get(&self, key: &K) -> Result<Option<V>, StorageError> {
493 match self.active.get(key) {
494 Some(Value::Value(v)) => Ok(Some(v.clone())),
495 Some(Value::Tombstone) => Ok(None),
496 None => {
497 let runs = Self::load_levels_and_runs(self.storage.clone(), self.root);
499 pin_mut!(runs);
500 while let Some(item) = runs.try_next().await? {
501 if let Some((_, run)) = item.right() {
502 if run.may_contains_key(key) {
503 let mut stack: VecDeque<Link<RunNode<K, V>>> = Default::default();
506 if let Some(cid) = run.entries.link() {
507 stack.push_back(cid);
508 }
509 while let Some(cid) = stack.pop_front() {
510 let node = self.storage.get_value(&cid).await?;
511 if node.may_contains_key(key) {
512 match node {
513 RunNode::Node { nodes, min_key: _, max_key: _ } => {
514 stack.extend(nodes.into_iter());
515 },
516 RunNode::Leaf(mut items) => {
517 if let Some(value) = items.remove(key) {
518 return match value {
519 Value::Value(value) => Ok(Some(value)),
520 Value::Tombstone => Ok(None),
521 };
522 }
523 },
524 }
525 }
526 }
527 }
528 }
529 }
530 Ok(None)
531 },
532 }
533 }
534
535 pub async fn contains_key(&self, key: &K) -> Result<bool, StorageError> {
536 Ok(self.get(key).await?.is_some())
537 }
538
539 pub fn stream(&self) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
541 self.stream_query(None)
542 }
543
544 pub fn stream_query(&self, start_at: Option<K>) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
546 self.create_stream(None, start_at).try_filter_map(|item| {
547 ready(Ok(match item.1 {
548 Value::Value(value) => Some((item.0, value)),
549 Value::Tombstone => None,
550 }))
551 })
552 }
553
554 pub fn reverse_stream(&self) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
556 self.reverse_stream_query(None)
557 }
558
559 pub fn reverse_stream_query(
561 &self,
562 start_at: Option<K>,
563 ) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
564 self.create_reverse_stream(None, start_at).try_filter_map(|item| {
565 ready(Ok(match item.1 {
566 Value::Value(value) => Some((item.0, value)),
567 Value::Tombstone => None,
568 }))
569 })
570 }
571
572 pub async fn store(&mut self) -> Result<OptionLink<Root<K, V>>, StorageError> {
577 let mut root = match self.root().await? {
579 Some(root) => {
580 if self.is_empty().await? {
582 return Ok(OptionLink::none());
583 }
584
585 root
587 },
588 None => {
589 if self.active.is_empty() || self.is_empty().await? {
591 return Ok(OptionLink::none());
592 }
593
594 Root { levels: Default::default(), active: Default::default(), settings: self.settings.clone() }
596 },
597 };
598
599 root.active =
601 store_active(&self.storage, self.settings.max_node_entries, stream::iter(self.active.iter()).map(Ok))
602 .await?;
603
604 self.root = self.storage.set_value(&root).await?.into();
606
607 Ok(self.root)
609 }
610
611 pub async fn stats(&self) -> Result<LsmTreeStats, StorageError> {
613 Self::load_levels_and_runs(self.storage.clone(), self.root)
614 .try_fold(
615 LsmTreeStats { entries: 0, active_entries: self.active.len(), levels: 0, runs: 0 },
616 |mut result, item| {
617 match item {
618 Either::Left(_) => result.levels += 1,
619 Either::Right((_, run)) => {
620 result.runs += 1;
621 result.entries += run.size as usize;
622 },
623 }
624 ready(Ok(result))
625 },
626 )
627 .await
628 }
629
630 pub async fn is_empty(&self) -> Result<bool, StorageError> {
632 Ok(self.min_key().await?.is_none())
633 }
634
635 pub async fn min_key(&self) -> Result<Option<K>, StorageError> {
637 let stream = self.stream();
638 pin_mut!(stream);
639 let first = stream.try_next().await?;
640 Ok(first.map(|(key, _)| key))
641 }
642
643 pub async fn max_key(&self) -> Result<Option<K>, StorageError> {
645 let stream = self.reverse_stream();
646 pin_mut!(stream);
647 let first = stream.try_next().await?;
648 Ok(first.map(|(key, _)| key))
649 }
650
651 pub async fn compact(&mut self, flush_active: bool) -> Result<(), StorageError> {
656 if flush_active {
657 self.flush_active().boxed().await?;
658 }
659 self.compact_level(0, Some(self.settings.max_run_count as usize)).await?;
660 Ok(())
661 }
662
663 fn create_stream(
669 &self,
670 only_run_indicies: Option<BTreeSet<usize>>,
671 start_at: Option<K>,
672 ) -> impl Stream<Item = Result<(K, Value<V>), StorageError>> + Send + use<S, K, V> {
673 let storage = self.storage.clone();
674 let active = self.active.clone();
675 let root = self.root;
676 async_stream::try_stream! {
677 let mut heap = match only_run_indicies {
679 Some(_) => BinaryHeap::new(),
680 None => BinaryHeap::<TreeStreamItem<K, V>>::from(
681 active
682 .into_iter()
683 .map(|item| TreeStreamItem { run: None, item })
684 .collect::<Vec<_>>(),
685 ),
686 };
687
688 let mut runs: Vec<(usize, RunNodeStream<S, K, V>)> = Self::load_levels_and_runs(storage.clone(), root)
691 .try_filter_map(|item| ready(Ok(item.right())))
692 .try_filter(|(index, _)| ready(match &only_run_indicies {
693 Some(run_indicies) => run_indicies.contains(index),
694 None => true,
695 }))
696 .map_ok(|(index, run)| (index, NodeStream::from_link(storage.clone(), run.entries).with_filter(RunNodeFilter::max(start_at.clone()))))
697 .try_collect()
698 .await?;
699 for (run_index, run) in runs.iter_mut() {
700 if let Some(item) = run.try_next().await? {
701 heap.push(TreeStreamItem { run: Some(*run_index), item });
702 }
703 }
704
705 while let Some(item) = Self::pop_and_fetch(&mut heap, &mut runs).await? {
707 while let Some(next_item) = heap.peek() {
710 if next_item.item.0 == item.item.0 {
711 Self::pop_and_fetch(&mut heap, &mut runs).await?;
712 } else {
713 break;
714 }
715 }
716
717 if let Some(start_at) = &start_at {
720 if start_at > &item.item.0 {
721 continue;
722 }
723 }
724
725 yield item.item;
727 }
728 }
729 }
731
732 fn create_reverse_stream(
734 &self,
735 only_run_indicies: Option<BTreeSet<usize>>,
736 start_at: Option<K>,
737 ) -> impl Stream<Item = Result<(K, Value<V>), StorageError>> + use<S, K, V> {
738 let storage = self.storage.clone();
739 let active = self.active.clone();
740 let root = self.root;
741 async_stream::try_stream! {
742 let mut heap = match only_run_indicies {
744 Some(_) => BinaryHeap::new(),
745 None => BinaryHeap::<ReverseTreeStreamItem<K, V>>::from(
746 active
747 .into_iter()
748 .map(|item| ReverseTreeStreamItem { run: None, item })
749 .collect::<Vec<_>>(),
750 ),
751 };
752
753 let mut runs: Vec<(usize, RunNodeStream<S, K, V>)> = Self::load_levels_and_runs(storage.clone(), root)
756 .try_filter_map(|item| ready(Ok(item.right())))
757 .try_filter(|(index, _)| ready(match &only_run_indicies {
758 Some(run_indicies) => run_indicies.contains(index),
759 None => true,
760 }))
761 .map_ok(|(index, run)| (index, NodeStream::from_link(storage.clone(), run.entries).with_reverse().with_filter(RunNodeFilter::min(start_at.clone()))))
762 .try_collect()
763 .await?;
764 for (run_index, run) in runs.iter_mut() {
765 if let Some(item) = run.try_next().await? {
766 heap.push(ReverseTreeStreamItem { run: Some(*run_index), item });
767 }
768 }
769
770 while let Some(item) = Self::pop_and_fetch_reverse(&mut heap, &mut runs).await? {
772 while let Some(next_item) = heap.peek() {
775 if next_item.item.0 == item.item.0 {
776 Self::pop_and_fetch_reverse(&mut heap, &mut runs).await?;
777 } else {
778 break;
779 }
780 }
781
782 if let Some(start_at) = &start_at {
785 if start_at < &item.item.0 {
786 continue;
787 }
788 }
789
790 yield item.item;
792 }
793 }
794 }
796
797 async fn pop_and_fetch(
799 heap: &mut BinaryHeap<TreeStreamItem<K, V>>,
800 runs: &mut [(usize, RunNodeStream<S, K, V>)],
801 ) -> Result<Option<TreeStreamItem<K, V>>, StorageError> {
802 if let Some(item) = heap.pop() {
803 if let Some(run_index) = item.run {
808 if let Some((_, run)) = runs.get_mut(run_index) {
809 if let Some(item) = run.try_next().await? {
810 heap.push(TreeStreamItem { run: Some(run_index), item });
811 }
812 }
813 }
814
815 Ok(Some(item))
817 } else {
818 Ok(None)
819 }
820 }
821
822 async fn pop_and_fetch_reverse(
824 heap: &mut BinaryHeap<ReverseTreeStreamItem<K, V>>,
825 runs: &mut [(usize, RunNodeStream<S, K, V>)],
826 ) -> Result<Option<ReverseTreeStreamItem<K, V>>, StorageError> {
827 if let Some(item) = heap.pop() {
828 if let Some(run_index) = item.run {
833 if let Some((_, run)) = runs.get_mut(run_index) {
834 if let Some(item) = run.try_next().await? {
835 heap.push(ReverseTreeStreamItem { run: Some(run_index), item });
836 }
837 }
838 }
839
840 Ok(Some(item))
842 } else {
843 Ok(None)
844 }
845 }
846
847 async fn flush_active(&mut self) -> Result<usize, StorageError> {
849 if self.active.is_empty() {
851 return Ok(0);
852 }
853 let min_key = if let Some((min_key, _)) = self.active.first_key_value() {
854 min_key.clone()
855 } else {
856 return Ok(0);
857 };
858 let max_key = if let Some((max_key, _)) = self.active.last_key_value() {
859 max_key.clone()
860 } else {
861 return Ok(0);
862 };
863 if min_key == max_key {
864 return Ok(0);
865 }
866
867 let run = if let Some(run) = store_run(
870 &self.storage,
871 self.settings.max_node_entries,
872 stream::iter(self.active.clone().into_iter()).map(Ok),
873 self.active.len(),
874 )
875 .await?
876 {
877 run
878 } else {
879 return Ok(0);
880 };
881
882 let mut root = match self.root().await? {
884 Some(root) => root,
885 None => Root { levels: Default::default(), active: Default::default(), settings: self.settings.clone() },
886 };
887
888 let mut levels = Self::load_levels(self.storage.clone(), root.levels.clone()).await?;
892 if levels.is_empty() {
893 levels.insert(0, Level { runs: Default::default() });
894 }
895 let mut runs = NodeStream::from_link(self.storage.clone(), levels[0].runs)
896 .try_collect::<VecDeque<_>>()
897 .await?;
898 runs.push_front(run);
899 let runs_count = runs.len();
900 levels[0].runs =
901 store_items(&self.storage, self.settings.max_node_entries, stream::iter(runs.iter()).map(Ok)).await?;
902 root.levels = self.store_levels(levels).await?;
903
904 self.root = self.storage.set_value(&root).await?.into();
906
907 self.active.clear();
909
910 if runs_count >= self.settings.max_run_count as usize {
912 self.compact_level(0, Some(self.settings.max_run_count as usize)).await?;
913 }
914
915 Ok(0)
917 }
918
919 async fn compact_level(&mut self, level_index: usize, cascade: Option<usize>) -> Result<(), StorageError> {
925 let next_level_index = level_index + 1;
926
927 let mut root = if let Some(root) = self.root().await? { root } else { return Ok(()) };
929 let mut levels = Self::load_levels(self.storage.clone(), root.levels.clone()).await?;
930
931 if levels.get(level_index).is_none() {
933 return Ok(());
934 }
935
936 if levels.get(next_level_index).is_none() {
938 levels.insert(next_level_index, Level { runs: OptionLink::none() });
939 }
940
941 let mut runs = Vec::new();
943 {
944 let levels_and_runs = Self::load_levels_and_runs(self.storage.clone(), self.root);
945 pin_mut!(levels_and_runs);
946 let mut current_global_level_index = 0;
947 while let Some(level_or_run) = levels_and_runs.try_next().await? {
948 match level_or_run {
949 Either::Left((global_level_index, _level)) => {
950 current_global_level_index = global_level_index;
951 if current_global_level_index > next_level_index {
953 break;
954 }
955 },
956 Either::Right((global_run_index, run)) => {
957 if current_global_level_index == level_index {
958 runs.push((current_global_level_index, global_run_index, run));
960 } else if current_global_level_index == next_level_index {
961 if runs
963 .iter()
964 .any(|(_, _, r)| r.min_key <= run.max_key && run.min_key <= r.max_key)
965 {
966 runs.push((current_global_level_index, global_run_index, run));
967 }
968 }
969 },
970 }
971 }
972 }
973 if runs.is_empty() {
974 return Ok(());
975 }
976
977 let entries =
979 self.create_stream(Some(runs.iter().map(|(_, global_run_index, _)| *global_run_index).collect()), None);
980 let run = store_run(
981 &self.storage,
982 self.settings.max_node_entries,
983 entries,
984 runs.iter().fold(0, |result, (_, _, run)| result + run.size) as usize,
985 )
986 .await?;
987 let run = if let Some(run) = run {
988 run
989 } else {
990 return Ok(());
991 };
992
993 let (level_run_count, next_level_run_count) = {
995 let mut level_runs = NodeStream::from_link(self.storage.clone(), levels[level_index].runs)
996 .try_collect::<Vec<_>>()
997 .await?;
998 let mut next_level_runs = NodeStream::from_link(self.storage.clone(), levels[next_level_index].runs)
999 .try_collect::<Vec<_>>()
1000 .await?;
1001
1002 for (global_level_index, global_run_index, _) in runs.iter().rev() {
1004 let local_run_index = *global_run_index - *global_level_index;
1005 if *global_level_index == level_index && level_runs.get(local_run_index).is_some() {
1006 level_runs.remove(*global_run_index - *global_level_index);
1007 }
1008 if *global_level_index == next_level_index && next_level_runs.get(local_run_index).is_some() {
1009 next_level_runs.remove(*global_run_index - *global_level_index);
1010 }
1011 }
1012
1013 next_level_runs.insert(0, run);
1015
1016 let level_run_count = level_runs.len();
1018 let next_level_run_count = next_level_runs.len();
1019 levels[level_index].runs =
1020 store_items(&self.storage, self.settings.max_node_entries, stream::iter(&level_runs).map(Ok)).await?;
1021 levels[next_level_index].runs =
1022 store_items(&self.storage, self.settings.max_node_entries, stream::iter(&next_level_runs).map(Ok))
1023 .await?;
1024
1025 (level_run_count, next_level_run_count)
1027 };
1028
1029 let next_level_index = if level_run_count == 0 {
1031 levels.remove(level_index);
1032 level_index
1033 } else {
1034 next_level_index
1035 };
1036
1037 root.levels = self.store_levels(levels).await?;
1039
1040 self.root = self.storage.set_value(&root).await?.into();
1042
1043 if let Some(max_run_count) = cascade {
1045 if next_level_run_count >= max_run_count {
1046 Box::pin(self.compact_level(next_level_index, cascade)).await?;
1047 }
1048 }
1049
1050 Ok(())
1053 }
1054
1055 async fn root(&self) -> Result<Option<Root<K, V>>, StorageError> {
1057 Self::load_root(&self.storage, self.root).await
1058 }
1059
1060 async fn store_levels(&self, levels: Vec<Level<K, V>>) -> Result<Node<Level<K, V>>, StorageError> {
1062 let mut builder = NodeBuilder::default()
1063 .with_items_size_max((INLINE_SIZE_FACTOR_LEVELS * self.storage.max_block_size()).to_integer());
1064 builder.extend(levels).map_err(|e| StorageError::InvalidArgument(e.into()))?;
1065 let (node, blocks) = builder.into_node().map_err(|e| StorageError::InvalidArgument(e.into()))?;
1066 for block in blocks {
1067 self.storage.set(block).await?;
1068 }
1069 Ok(node)
1070 }
1071
1072 async fn load_root(storage: &S, root: OptionLink<Root<K, V>>) -> Result<Option<Root<K, V>>, StorageError> {
1074 storage.get_value_or_none(&root).await
1075 }
1076
1077 async fn load_levels(storage: S, levels: Node<Level<K, V>>) -> Result<Vec<Level<K, V>>, StorageError> {
1079 NodeStream::from_node(storage, levels, None).try_collect().await
1080 }
1081
1082 fn load_levels_and_runs(
1085 storage: S,
1086 root: OptionLink<Root<K, V>>,
1087 ) -> impl Stream<Item = Result<EitherLevelOrRun<K, V>, StorageError>> + Send {
1088 async_stream::try_stream! {
1089 let mut global_level_index = 0;
1090 let mut global_run_index = 0;
1091 if let Some(root) = Self::load_root(&storage, root).await? {
1092 for level in Self::load_levels(storage.clone(), root.levels).await? {
1093 let runs = NodeStream::from_link(storage.clone(), level.runs);
1094 yield Either::Left((global_level_index, level));
1095 for await run in runs {
1096 yield Either::Right((global_run_index, run?));
1097 global_run_index += 1;
1098 }
1099 global_level_index += 1;
1100 }
1101 }
1102 }
1103 }
1104}
1105
1106type EitherLevelOrRun<K, V> = Either<(usize, Level<K, V>), (usize, Run<K, V>)>;
1107type RunNodeStream<S, K, V> = NodeStream<S, (K, Value<V>), RunNode<K, V>>;
1108
1109async fn store_items<'a, S, T>(
1111 storage: &S,
1112 max_node_entries: u64,
1113 items: impl Stream<Item = Result<&'a T, StorageError>> + Send,
1114) -> Result<OptionLink<Node<T>>, StorageError>
1115where
1116 S: AnyBlockStorage,
1117 T: Clone + Serialize + 'a,
1118{
1119 let mut node_builder = NodeBuilder::<_>::new(
1120 storage.max_block_size(),
1121 max_node_entries
1122 .try_into()
1123 .map_err(|e: TryFromIntError| StorageError::InvalidArgument(e.into()))?,
1124 Default::default(),
1125 );
1126 pin_mut!(items);
1127 while let Some(item) = items.try_next().await? {
1128 node_builder.push(item).map_err(|e| StorageError::InvalidArgument(e.into()))?;
1129 for block in node_builder.take_blocks() {
1130 storage.set(block).await?;
1131 }
1132 }
1133 let (root, blocks) = node_builder
1134 .into_blocks()
1135 .map_err(|e| StorageError::InvalidArgument(e.into()))?;
1136 for block in blocks.into_iter() {
1137 storage.set(block).await?;
1138 }
1139
1140 Ok(root.cid().into())
1142}
1143
1144async fn store_active<S, K, V>(
1146 storage: &S,
1147 max_node_entries: u64,
1148 entries: impl Stream<Item = Result<(&K, &Value<V>), StorageError>>,
1149) -> Result<Node<(K, Value<V>)>, StorageError>
1150where
1151 S: AnyBlockStorage,
1152 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1153 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1154{
1155 let mut node_builder = NodeBuilder::<_>::new(
1156 storage.max_block_size(),
1157 max_node_entries
1158 .try_into()
1159 .map_err(|e: TryFromIntError| StorageError::InvalidArgument(e.into()))?,
1160 Default::default(),
1161 )
1162 .with_items_size_max((INLINE_SIZE_FACTOR_ACTIVE * storage.max_block_size()).to_integer());
1163 pin_mut!(entries);
1164 while let Some(item) = entries.try_next().await? {
1165 node_builder
1166 .push((item.0.clone(), item.1.clone()))
1167 .map_err(|e| StorageError::InvalidArgument(e.into()))?;
1168 for block in node_builder.take_blocks() {
1169 storage.set(block).await?;
1170 }
1171 }
1172 let (node, blocks) = node_builder.into_node().map_err(|e| StorageError::InvalidArgument(e.into()))?;
1173 for block in blocks.into_iter() {
1174 storage.set(block).await?;
1175 }
1176
1177 Ok(node)
1179}
1180
1181async fn store_run_node<S, K, V>(
1183 storage: &S,
1184 max_node_entries: u64,
1185 entries: impl Stream<Item = Result<(K, Value<V>), StorageError>>,
1186) -> Result<OptionLink<RunNode<K, V>>, StorageError>
1187where
1188 S: AnyBlockStorage,
1189 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1190 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1191{
1192 let mut node_builder = NodeBuilder::<(K, Value<V>), RunNode<K, V>, RunNodeSerializer<K, V>>::new(
1193 storage.max_block_size(),
1194 max_node_entries
1195 .try_into()
1196 .map_err(|e: TryFromIntError| StorageError::InvalidArgument(e.into()))?,
1197 RunNodeSerializer::new(),
1198 );
1199 pin_mut!(entries);
1200 while let Some(item) = entries.try_next().await? {
1201 node_builder.push(item).map_err(|e| StorageError::InvalidArgument(e.into()))?;
1202 for block in node_builder.take_blocks() {
1203 storage.set(block).await?;
1204 }
1205 }
1206 let (root, blocks) = node_builder
1207 .into_blocks()
1208 .map_err(|e| StorageError::InvalidArgument(e.into()))?;
1209 for block in blocks.into_iter() {
1210 storage.set(block).await?;
1211 }
1212
1213 Ok(root)
1215}
1216
1217async fn store_run<S, K, V>(
1219 storage: &S,
1220 max_node_entries: u64,
1221 entries: impl Stream<Item = Result<(K, Value<V>), StorageError>>,
1222 entries_size_hint: usize,
1223) -> Result<Option<Run<K, V>>, StorageError>
1224where
1225 S: AnyBlockStorage,
1226 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1227 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1228{
1229 let mut min_key: Option<K> = None;
1231 let mut max_key: Option<K> = None;
1232 let mut size = 0;
1233 let mut bloom = Bloom::<K>::new_for_fp_rate_with_seed(entries_size_hint, 0.001, &[0; 32])
1234 .map_err(|e| StorageError::InvalidArgument(anyhow!("bloomfilter: {}", e)))?;
1235 let entries_link = store_run_node(
1236 storage,
1237 max_node_entries,
1238 entries.inspect_ok(|(key, _)| {
1239 if min_key.is_none() || Some(key).cmp(&min_key.as_ref()) == Ordering::Less {
1240 min_key = Some(key.clone());
1241 }
1242 if max_key.is_none() || Some(key).cmp(&max_key.as_ref()) == Ordering::Greater {
1243 max_key = Some(key.clone());
1244 }
1245 size += 1;
1246 bloom.set(key);
1247 }),
1248 )
1249 .await?;
1250 let (min_key, max_key) = match (min_key, max_key) {
1251 (Some(min_key), Some(max_key)) => (min_key, max_key),
1252 _ => return Ok(None),
1253 };
1254 let next_run =
1255 Run { entries: entries_link, size, bloom: BloomFilter::Bloomfilter(bloom.to_bytes()), min_key, max_key };
1256 Ok(Some(next_run))
1257}
1258
1259#[derive(Debug, Clone, PartialEq)]
1261pub struct LsmTreeStats {
1262 pub entries: usize,
1264
1265 pub active_entries: usize,
1267
1268 pub levels: usize,
1270
1271 pub runs: usize,
1273}
1274
1275#[derive(Debug)]
1276struct TreeStreamItem<K, V>
1277where
1278 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1279 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1280{
1281 run: Option<usize>,
1282 item: (K, Value<V>),
1283}
1284impl<K, V> PartialEq for TreeStreamItem<K, V>
1285where
1286 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1287 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1288{
1289 fn eq(&self, other: &Self) -> bool {
1290 self.item.0 == other.item.0
1291 }
1292}
1293impl<K, V> Eq for TreeStreamItem<K, V>
1294where
1295 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1296 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1297{
1298}
1299impl<K, V> PartialOrd for TreeStreamItem<K, V>
1300where
1301 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1302 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1303{
1304 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1305 Some(self.cmp(other))
1306 }
1307}
1308impl<K, V> Ord for TreeStreamItem<K, V>
1309where
1310 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1311 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1312{
1313 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1314 match self.item.0.cmp(&other.item.0) {
1316 Ordering::Less => Ordering::Greater,
1317 Ordering::Greater => Ordering::Less,
1318 Ordering::Equal => {
1319 other.run.cmp(&self.run)
1321 },
1322 }
1323 }
1324}
1325
1326#[derive(Debug)]
1327struct ReverseTreeStreamItem<K, V>
1328where
1329 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1330 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1331{
1332 run: Option<usize>,
1333 item: (K, Value<V>),
1334}
1335impl<K, V> PartialEq for ReverseTreeStreamItem<K, V>
1336where
1337 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1338 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1339{
1340 fn eq(&self, other: &Self) -> bool {
1341 self.item.0 == other.item.0
1342 }
1343}
1344impl<K, V> Eq for ReverseTreeStreamItem<K, V>
1345where
1346 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1347 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1348{
1349}
1350impl<K, V> PartialOrd for ReverseTreeStreamItem<K, V>
1351where
1352 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1353 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1354{
1355 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1356 Some(self.cmp(other))
1357 }
1358}
1359impl<K, V> Ord for ReverseTreeStreamItem<K, V>
1360where
1361 K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1362 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1363{
1364 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1365 match self.item.0.cmp(&other.item.0) {
1366 Ordering::Less => Ordering::Less,
1367 Ordering::Greater => Ordering::Greater,
1368 Ordering::Equal => {
1369 other.run.cmp(&self.run)
1371 },
1372 }
1373 }
1374}
1375
1376#[cfg(test)]
1377mod tests {
1378 use super::{LsmTreeMap, Value};
1379 use crate::{
1380 from_cbor,
1381 library::{lsm_tree_map::LsmTreeStats, test::TestStorage},
1382 to_cbor, LsmTreeMapSettings,
1383 };
1384 use futures::TryStreamExt;
1385
1386 #[tokio::test]
1387 async fn smoke() {
1388 let storage = TestStorage::default();
1389 let mut tree = LsmTreeMap::new(storage.clone(), Default::default());
1390 tree.insert("hello".to_owned(), "world".to_owned()).await.unwrap();
1391 tree.insert("1".to_owned(), "2".to_owned()).await.unwrap();
1392 tree.insert("3".to_owned(), "4".to_owned()).await.unwrap();
1393 assert_eq!(
1394 tree.stream().try_collect::<Vec<_>>().await.unwrap(),
1395 vec![
1396 ("1".to_owned(), "2".to_owned()),
1397 ("3".to_owned(), "4".to_owned()),
1398 ("hello".to_owned(), "world".to_owned())
1399 ]
1400 );
1401
1402 let root = tree.store().await.unwrap().unwrap();
1404 let tree2 = LsmTreeMap::load(storage.clone(), root).await.unwrap();
1405 assert_eq!(
1406 tree2.stream().try_collect::<Vec<_>>().await.unwrap(),
1407 vec![
1408 ("1".to_owned(), "2".to_owned()),
1409 ("3".to_owned(), "4".to_owned()),
1410 ("hello".to_owned(), "world".to_owned())
1411 ]
1412 );
1413 }
1414
1415 #[tokio::test]
1416 async fn test_get() {
1417 let storage = TestStorage::default();
1418 let settings = LsmTreeMapSettings { max_node_entries: 32, max_active_entries: 2, max_run_count: 2 };
1419 let mut tree = LsmTreeMap::new(storage.clone(), settings);
1420 tree.insert(1, 100).await.unwrap();
1421 tree.insert(2, 200).await.unwrap();
1422 tree.insert(3, 300).await.unwrap();
1423 assert_eq!(tree.get(&1).await.unwrap(), Some(100));
1424 assert_eq!(tree.get(&2).await.unwrap(), Some(200));
1425 assert_eq!(tree.get(&3).await.unwrap(), Some(300));
1426 }
1427
1428 #[tokio::test]
1429 async fn test_compact() {
1430 let storage = TestStorage::default();
1431 let settings = LsmTreeMapSettings { max_node_entries: 32, max_active_entries: 2, max_run_count: 2 };
1432 let mut tree = LsmTreeMap::new(storage.clone(), settings);
1433 for i in 0..10 {
1434 tree.insert(i, i).await.unwrap();
1435 }
1436 assert_eq!(
1437 tree.stream().try_collect::<Vec<_>>().await.unwrap(),
1438 vec![(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9),]
1439 );
1440 let stats = tree.stats().await.unwrap();
1441 assert_eq!(stats, LsmTreeStats { active_entries: 0, entries: 10, levels: 1, runs: 1 });
1442 }
1466
1467 #[tokio::test]
1468 async fn test_stream() {
1469 let storage = TestStorage::default();
1470 let settings = LsmTreeMapSettings { max_node_entries: 32, max_active_entries: 2, max_run_count: 2 };
1471 let mut tree = LsmTreeMap::new(storage.clone(), settings);
1472 tree.insert(0, 0).await.unwrap();
1473 tree.insert(1, 1).await.unwrap();
1474 tree.insert(2, 2).await.unwrap();
1475 tree.insert(3, 3).await.unwrap();
1476 tree.remove(3).await.unwrap();
1477 tree.insert(3, 30).await.unwrap();
1478 tree.remove(3).await.unwrap();
1479 tree.insert(2, 20).await.unwrap();
1480 tree.flush_active().await.unwrap();
1481 assert_eq!(tree.stream().try_collect::<Vec<_>>().await.unwrap(), vec![(0, 0), (1, 1), (2, 20),]);
1482 assert_eq!(tree.reverse_stream().try_collect::<Vec<_>>().await.unwrap(), vec![(2, 20), (1, 1), (0, 0),]);
1483 }
1484
1485 #[tokio::test]
1486 async fn test_stream_query() {
1487 let storage = TestStorage::default();
1488 let settings = LsmTreeMapSettings { max_node_entries: 2, max_active_entries: 2, max_run_count: 2 };
1489 let mut tree = LsmTreeMap::new(storage.clone(), settings);
1490 for i in 0..10 {
1491 tree.insert(i, i).await.unwrap();
1492 }
1493 assert_eq!(
1494 tree.stream_query(Some(5)).try_collect::<Vec<_>>().await.unwrap(),
1495 vec![(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
1496 );
1497 assert_eq!(
1498 tree.reverse_stream_query(Some(5)).try_collect::<Vec<_>>().await.unwrap(),
1499 vec![(5, 5), (4, 4), (3, 3), (2, 2), (1, 1), (0, 0)]
1500 );
1501 }
1502
1503 #[tokio::test]
1504 async fn test_store_empty() {
1505 for count in [1, 10] {
1506 let storage = TestStorage::default();
1507 let settings = LsmTreeMapSettings { max_node_entries: 2, max_active_entries: 2, max_run_count: 2 };
1508 let mut tree = LsmTreeMap::new(storage.clone(), settings);
1509 for i in 0..count {
1510 tree.insert(i, i).await.unwrap();
1511 }
1512 for i in 0..count {
1513 tree.remove(i).await.unwrap();
1514 }
1515 assert!(tree.store().await.unwrap().is_none());
1516 }
1517 }
1518
1519 #[test]
1520 fn test_settings_default() {
1521 assert_eq!(LsmTreeMapSettings::default().max_node_entries, 256);
1522 assert_eq!(LsmTreeMapSettings::default().max_active_entries, 16384);
1523 assert_eq!(LsmTreeMapSettings::default().max_run_count, 16);
1524 assert!(LsmTreeMapSettings::default().is_default());
1525 let mut not_default = LsmTreeMapSettings::default();
1526 not_default.max_node_entries += 1;
1527 assert!(!not_default.is_default());
1528 }
1529
1530 #[test]
1531 fn test_serialize_value() {
1532 let empty_v = Value::<()>::Value(());
1533 let v = Value::<u8>::Value(0);
1534 let t = Value::<u8>::Tombstone;
1535
1536 let v_cbor = to_cbor(&v).unwrap();
1538 let empty_v_cbor = to_cbor(&empty_v).unwrap();
1539 let t_cbor = to_cbor(&t).unwrap();
1540 assert_eq!(from_cbor::<Value::<()>>(&empty_v_cbor).unwrap(), empty_v);
1541 assert_eq!(from_cbor::<Value::<u8>>(&v_cbor).unwrap(), v);
1542 assert_eq!(from_cbor::<Value::<u8>>(&t_cbor).unwrap(), t);
1543 }
1544}