Skip to main content

co_primitives/library/
lsm_tree_map.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use super::{dag_cbor_size_serializer::DagCborSizeSerializer, node_builder::NodeReader};
5use crate::{
6	AnyBlockStorage, Block, BlockSerializer, BlockStorageExt, Link, Node, NodeBuilder, NodeBuilderError,
7	NodeSerializer, NodeStream, OptionLink, StorageError,
8};
9use anyhow::anyhow;
10use bloomfilter::Bloom;
11use cid::Cid;
12use either::Either;
13use futures::{pin_mut, stream, FutureExt, Stream, StreamExt, TryStreamExt};
14use num_rational::Ratio;
15use serde::{de::DeserializeOwned, Deserialize, Serialize};
16use std::{
17	cmp::Ordering,
18	collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque},
19	fmt::Debug,
20	future::ready,
21	hash::Hash,
22	marker::PhantomData,
23	num::TryFromIntError,
24};
25
26const INLINE_SIZE_FACTOR_LEVELS: Ratio<usize> = Ratio::new_raw(2, 8);
27const INLINE_SIZE_FACTOR_ACTIVE: Ratio<usize> = Ratio::new_raw(4, 8);
28
29/// LSM Tree Root.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct Root<K, V>
32where
33	K: Hash + Ord + Clone + Send + Sync + 'static,
34	V: Clone + Send + Sync + 'static,
35{
36	/// Levels.
37	/// Inline size: `MAX_BLOCK_SIZE * INLINE_SIZE_FACTOR_LEVELS`
38	#[serde(rename = "l", default = "Node::default", skip_serializing_if = "Node::is_empty")]
39	pub levels: Node<Level<K, V>>,
40
41	/// Active "in memory data".
42	/// Inline size: `MAX_BLOCK_SIZE * INLINE_SIZE_FACTOR_ACTIVE`
43	#[serde(rename = "a", default = "Node::default", skip_serializing_if = "Node::is_empty")]
44	pub active: Node<(K, Value<V>)>,
45
46	/// Tree settings.
47	#[serde(
48		rename = "s",
49		default = "LsmTreeMapSettings::default",
50		skip_serializing_if = "LsmTreeMapSettings::is_default"
51	)]
52	pub settings: LsmTreeMapSettings,
53}
54
55/// LSM Tree Level.
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct Level<K, V>
58where
59	K: Hash + Ord + Clone + Send + Sync + 'static,
60	V: Clone + Send + Sync + 'static,
61{
62	/// Runs (SSTable) in the level.
63	#[serde(rename = "r", default = "OptionLink::default", skip_serializing_if = "OptionLink::is_none")]
64	pub runs: OptionLink<Node<Run<K, V>>>,
65}
66
67/// LSM Tree Run.
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct Run<K, V>
70where
71	K: Hash + Ord + Clone + Send + Sync + 'static,
72	V: Clone + Send + Sync + 'static,
73{
74	/// Entries Root DAG Node.
75	#[serde(rename = "e")]
76	pub entries: OptionLink<RunNode<K, V>>,
77
78	/// Count of entries.
79	#[serde(rename = "s")]
80	pub size: u64,
81
82	/// Key Bloom Filter.
83	#[serde(rename = "i")]
84	pub bloom: BloomFilter,
85
86	/// The lowest key.
87	#[serde(rename = "l")]
88	pub min_key: K,
89
90	/// The highest key.
91	#[serde(rename = "h")]
92	pub max_key: K,
93}
94impl<K, V> Run<K, V>
95where
96	K: Hash + Ord + Clone + Send + Sync + 'static,
97	V: Clone + Send + Sync + 'static,
98{
99	/// Tests if the run possibly contains an key.
100	fn may_contains_key(&self, key: &K) -> bool {
101		key >= &self.min_key && key <= &self.max_key && self.bloom.may_contains_key(key)
102	}
103}
104
105/// Bloom Filter Implementations.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[non_exhaustive]
108pub enum BloomFilter {
109	/// `bloomfilter = "3"`
110	#[serde(rename = "b", with = "serde_bytes")]
111	Bloomfilter(Vec<u8>),
112}
113impl BloomFilter {
114	/// Check if the bloom filter possibly contains an key.
115	pub fn may_contains_key<K: Hash + Ord + Clone + Send + Sync + 'static>(&self, key: &K) -> bool {
116		match self {
117			BloomFilter::Bloomfilter(data) => {
118				if let Ok(bloom) = Bloom::from_slice(data) {
119					bloom.check(key)
120				} else {
121					true
122				}
123			},
124		}
125	}
126}
127
128// /// LSM Tree Sorted Data Block Node.
129// #[derive(Debug, Clone, Serialize, Deserialize)]
130// pub enum Node<K, V>
131// where
132// 	K: Hash + Ord + Clone + 'static,
133// 	V: Clone + 'static,
134// {
135// 	#[serde(rename = "n")]
136// 	Internal(Vec<Link<Self>>),
137
138// 	#[serde(rename = "l")]
139// 	Leaf(BTreeMap<K, V>),
140// }
141// impl<K, V> Node<K, V>
142// where
143// 	K: Hash + Ord + Clone + 'static,
144// 	V: Clone + 'static,
145// {
146// 	fn may_contains_key(&self, key: &K) -> bool {
147// 		key >= &self.min_key && key <= &self.max_key
148// 	}
149// }
150
151/// LSM Tree Value.
152#[derive(Debug, Clone, Serialize, Deserialize)]
153pub enum Value<V>
154where
155	V: Clone + 'static,
156{
157	#[serde(rename = "v")]
158	Value(V),
159	#[serde(rename = "t")]
160	Tombstone,
161}
162impl<V> PartialEq for Value<V>
163where
164	V: PartialEq + Clone + 'static,
165{
166	fn eq(&self, other: &Self) -> bool {
167		match (self, other) {
168			(Self::Tombstone, Self::Tombstone) => true,
169			(Self::Value(a), Self::Value(b)) => a == b,
170			_ => false,
171		}
172	}
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub enum RunNode<K, V>
177where
178	K: Hash + Ord + Clone + Send + Sync + 'static,
179	V: Clone + Send + Sync + 'static,
180{
181	/// A node which contains sorted children nodes.
182	#[serde(rename = "n")]
183	Node {
184		/// The node.
185		#[serde(rename = "n")]
186		nodes: Vec<Link<Self>>,
187
188		/// The lowest key.
189		#[serde(rename = "l")]
190		min_key: K,
191
192		/// The highest key.
193		#[serde(rename = "h")]
194		max_key: K,
195	},
196
197	/// A leaf node which contains items sorted by key.
198	#[serde(rename = "l")]
199	Leaf(BTreeMap<K, Value<V>>),
200}
201impl<K, V> RunNode<K, V>
202where
203	K: Hash + Ord + Clone + Send + Sync + 'static,
204	V: Clone + Send + Sync + 'static,
205{
206	/// Tests if the run possibly contains an key.
207	fn may_contains_key(&self, key: &K) -> bool {
208		match self {
209			RunNode::Node { nodes: _, min_key, max_key } => key >= min_key && key <= max_key,
210			RunNode::Leaf(items) => items.contains_key(key),
211		}
212	}
213}
214impl<K, V> NodeReader<(K, Value<V>)> for RunNode<K, V>
215where
216	K: Hash + Ord + Clone + Send + Sync + 'static,
217	V: Clone + Send + Sync + 'static,
218{
219	type Filter = RunNodeFilter<K>;
220
221	fn read(self, filter: &Self::Filter) -> Either<Vec<Cid>, Vec<(K, Value<V>)>> {
222		match self {
223			RunNode::Node { nodes, min_key, max_key } => {
224				if filter.test(&min_key, &max_key) {
225					// println!("- filter {:?} min: {:?} max: {:?}", filter, min_key, max_key);
226					Either::Left(Vec::new())
227				} else {
228					// println!("+ use    {:?} min: {:?} max: {:?}", filter, min_key, max_key);
229					Either::Left(nodes.into_iter().map(Into::into).collect())
230				}
231			},
232			RunNode::Leaf(items) => Either::Right(items.into_iter().collect()),
233		}
234	}
235}
236
/// Key filter used while walking the node DAG of a run to skip whole subtrees.
#[derive(Debug, Default)]
pub enum RunNodeFilter<K> {
	/// Visit every node.
	#[default]
	None,
	/// Skip nodes whose highest key is below the contained key.
	Max(K),
	/// Skip nodes whose lowest key is above the contained key.
	Min(K),
}
impl<K> RunNodeFilter<K> {
	/// [`RunNodeFilter::Min`] for `Some(key)`, [`RunNodeFilter::None`] otherwise.
	pub fn min(key: Option<K>) -> Self {
		key.map_or(Self::None, Self::Min)
	}

	/// [`RunNodeFilter::Max`] for `Some(key)`, [`RunNodeFilter::None`] otherwise.
	pub fn max(key: Option<K>) -> Self {
		key.map_or(Self::None, Self::Max)
	}

	/// Test if a node covering `min_key ..= max_key` should be skipped.
	pub fn test(&self, min_key: &K, max_key: &K) -> bool
	where
		K: Ord,
	{
		match self {
			Self::None => false,
			Self::Max(lower_bound) => max_key < lower_bound,
			Self::Min(upper_bound) => min_key > upper_bound,
		}
	}
}
271
272/// Serializer which records min/max keys while serialize blocks.
273/// This metadata is written to the upper level blocks for fast retrival of values.
274/// Note: This only works if all previous nodes are serialized using the same serialize instance.
275///  But as we always wrote full immutable runs this should be ok.
276#[derive(Debug)]
277pub struct RunNodeSerializer<K, V> {
278	_d: PhantomData<(K, V)>,
279	pending: BTreeMap<Cid, (K, K)>,
280}
281impl<K, V> RunNodeSerializer<K, V> {
282	pub fn new() -> Self {
283		Self { _d: Default::default(), pending: Default::default() }
284	}
285}
286impl<K, V> NodeSerializer<RunNode<K, V>, (K, Value<V>)> for RunNodeSerializer<K, V>
287where
288	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
289	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
290{
291	fn nodes(&mut self, nodes: Vec<Link<RunNode<K, V>>>) -> Result<RunNode<K, V>, NodeBuilderError> {
292		let mut min_key = None;
293		let mut max_key = None;
294		for node in nodes.iter() {
295			if let Some((node_min_key, node_max_key)) = self.pending.remove(node.cid()) {
296				if min_key.is_none() || Some(&node_min_key).cmp(&min_key.as_ref()) == Ordering::Less {
297					min_key = Some(node_min_key);
298				}
299				if max_key.is_none() || Some(&node_max_key).cmp(&max_key.as_ref()) == Ordering::Greater {
300					max_key = Some(node_max_key);
301				}
302			}
303		}
304		Ok(RunNode::Node {
305			nodes,
306			min_key: min_key.ok_or(NodeBuilderError::InvalidArgument(anyhow!("Unable to determine min key")))?,
307			max_key: max_key.ok_or(NodeBuilderError::InvalidArgument(anyhow!("Unable to determine max key")))?,
308		})
309	}
310
311	fn leaf(&mut self, entries: Vec<(K, Value<V>)>) -> Result<RunNode<K, V>, NodeBuilderError> {
312		Ok(RunNode::Leaf(entries.into_iter().collect()))
313	}
314
315	fn serialize(&mut self, max_bock_size: usize, node: RunNode<K, V>) -> Result<Block, NodeBuilderError> {
316		let block = BlockSerializer::new()
317			.with_max_block_size(max_bock_size)
318			.serialize(&node)
319			.map_err(|err| NodeBuilderError::Encoding(err.into()))?;
320
321		// record min/max for faster insert
322		match &node {
323			RunNode::Node { nodes: _, min_key, max_key } => {
324				self.pending.insert(*block.cid(), (min_key.clone(), max_key.clone()));
325			},
326			RunNode::Leaf(items) => {
327				if let (Some((first, _)), Some((last, _))) = (items.first_key_value(), items.last_key_value()) {
328					self.pending.insert(*block.cid(), (first.clone(), last.clone()));
329				}
330			},
331		}
332
333		// result
334		Ok(block)
335	}
336
337	fn item_size_hint(&self, item: &(K, Value<V>)) -> Option<usize> {
338		let mut serializer = DagCborSizeSerializer::new();
339		item.serialize(&mut serializer).ok()?;
340		Some(serializer.size)
341	}
342}
343
344#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
345pub struct LsmTreeMapSettings {
346	/// Max entries (K/V pairs) count in a leaf node.
347	#[serde(
348		rename = "n",
349		default = "LsmTreeMapSettings::default_max_node_entries",
350		skip_serializing_if = "LsmTreeMapSettings::is_default_max_node_entries"
351	)]
352	pub max_node_entries: u64,
353
354	/// Limits entries (K/V pairs) in active (in memory) run.
355	/// Overruning this limit will cause a new L0 run gets created.
356	#[serde(
357		rename = "a",
358		default = "LsmTreeMapSettings::default_max_active_entries",
359		skip_serializing_if = "LsmTreeMapSettings::is_default_max_active_entries"
360	)]
361	pub max_active_entries: u64,
362
363	/// Limits runs in a level.
364	/// Overruning this limit will cause a compaction to next level.
365	#[serde(
366		rename = "r",
367		default = "LsmTreeMapSettings::default_max_run_count",
368		skip_serializing_if = "LsmTreeMapSettings::is_default_max_run_count"
369	)]
370	pub max_run_count: u64,
371}
372impl LsmTreeMapSettings {
373	fn default_max_node_entries() -> u64 {
374		2u64.checked_pow(8).expect("to pow") // 256
375	}
376
377	fn default_max_active_entries() -> u64 {
378		2u64.checked_pow(14).expect("to pow") // 16k
379	}
380
381	fn default_max_run_count() -> u64 {
382		2u64.checked_pow(4).expect("to pow") // 16
383	}
384
385	fn is_default_max_node_entries(value: &u64) -> bool {
386		*value == Self::default_max_node_entries()
387	}
388
389	fn is_default_max_active_entries(value: &u64) -> bool {
390		*value == Self::default_max_active_entries()
391	}
392
393	fn is_default_max_run_count(value: &u64) -> bool {
394		*value == Self::default_max_run_count()
395	}
396
397	/// Wherter this are default settings.
398	pub fn is_default(&self) -> bool {
399		self == &Self::default()
400	}
401}
402impl Default for LsmTreeMapSettings {
403	fn default() -> Self {
404		Self {
405			max_node_entries: Self::default_max_node_entries(),
406			max_active_entries: Self::default_max_active_entries(),
407			max_run_count: Self::default_max_run_count(),
408		}
409	}
410}
411
412/// LSM Tree Instance.
413#[derive(Clone)]
414pub struct LsmTreeMap<S, K, V>
415where
416	S: AnyBlockStorage,
417	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
418	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
419{
420	/// Storage instance.
421	storage: S,
422
423	/// LSM Root.
424	root: OptionLink<Root<K, V>>,
425
426	/// Active in memory pairs.
427	active: BTreeMap<K, Value<V>>,
428
429	// Limits items in memory run.
430	settings: LsmTreeMapSettings,
431	// level_cache: Arc<RwLock<BTreeMap<Cid, Level<K, V>>>>,
432	// run_cache: Arc<RwLock<BTreeMap<Cid, Run<K, V>>>>,
433}
434impl<S, K, V> LsmTreeMap<S, K, V>
435where
436	S: AnyBlockStorage,
437	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
438	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
439{
440	/// Create new empty tree.
441	pub fn new(storage: S, settings: LsmTreeMapSettings) -> Self {
442		Self { active: Default::default(), settings, root: OptionLink::none(), storage }
443	}
444
445	/// Load Tree from root CID.
446	pub async fn load(storage: S, root: Link<Root<K, V>>) -> Result<Self, StorageError> {
447		let mut result = Self { active: Default::default(), settings: Default::default(), root: root.into(), storage };
448		if let Some(root) = result.root().await? {
449			result.settings = root.settings;
450			result.active = NodeStream::from_node(result.storage.clone(), root.active, None)
451				.try_collect()
452				.await?;
453		}
454		Ok(result)
455	}
456
457	/// Insert/Replace key.
458	pub async fn insert(&mut self, key: K, value: V) -> Result<(), StorageError> {
459		// insert entry to memory run
460		self.active.insert(key, Value::Value(value));
461
462		// flush?
463		if self.active.len() >= self.settings.max_active_entries as usize {
464			self.flush_active().boxed().await?;
465		}
466
467		// result
468		Ok(())
469	}
470
471	/// Remove key.
472	pub async fn remove(&mut self, key: K) -> Result<(), StorageError> {
473		// special case: only active items: directly remove
474		if self.root.is_none() {
475			self.active.remove(&key);
476			return Ok(());
477		}
478
479		// insert tombstone to memory run
480		self.active.insert(key, Value::Tombstone);
481
482		// flush?
483		if self.active.len() >= self.settings.max_active_entries as usize {
484			self.flush_active().boxed().await?;
485		}
486
487		// result
488		Ok(())
489	}
490
491	/// Get value for key.
492	pub async fn get(&self, key: &K) -> Result<Option<V>, StorageError> {
493		match self.active.get(key) {
494			Some(Value::Value(v)) => Ok(Some(v.clone())),
495			Some(Value::Tombstone) => Ok(None),
496			None => {
497				// iterate runs (most up-to-date is the first)
498				let runs = Self::load_levels_and_runs(self.storage.clone(), self.root);
499				pin_mut!(runs);
500				while let Some(item) = runs.try_next().await? {
501					if let Some((_, run)) = item.right() {
502						if run.may_contains_key(key) {
503							// iterate (DAG) RunNode's from first to last skipping nodes which can not contain the
504							// value
505							let mut stack: VecDeque<Link<RunNode<K, V>>> = Default::default();
506							if let Some(cid) = run.entries.link() {
507								stack.push_back(cid);
508							}
509							while let Some(cid) = stack.pop_front() {
510								let node = self.storage.get_value(&cid).await?;
511								if node.may_contains_key(key) {
512									match node {
513										RunNode::Node { nodes, min_key: _, max_key: _ } => {
514											stack.extend(nodes.into_iter());
515										},
516										RunNode::Leaf(mut items) => {
517											if let Some(value) = items.remove(key) {
518												return match value {
519													Value::Value(value) => Ok(Some(value)),
520													Value::Tombstone => Ok(None),
521												};
522											}
523										},
524									}
525								}
526							}
527						}
528					}
529				}
530				Ok(None)
531			},
532		}
533	}
534
535	pub async fn contains_key(&self, key: &K) -> Result<bool, StorageError> {
536		Ok(self.get(key).await?.is_some())
537	}
538
539	/// Stream all tree entries.
540	pub fn stream(&self) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
541		self.stream_query(None)
542	}
543
544	/// Stream tree entries.
545	pub fn stream_query(&self, start_at: Option<K>) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
546		self.create_stream(None, start_at).try_filter_map(|item| {
547			ready(Ok(match item.1 {
548				Value::Value(value) => Some((item.0, value)),
549				Value::Tombstone => None,
550			}))
551		})
552	}
553
554	/// Stream all tree entries in reverse order.
555	pub fn reverse_stream(&self) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
556		self.reverse_stream_query(None)
557	}
558
559	/// Stream tree entries in reverse order.
560	pub fn reverse_stream_query(
561		&self,
562		start_at: Option<K>,
563	) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
564		self.create_reverse_stream(None, start_at).try_filter_map(|item| {
565			ready(Ok(match item.1 {
566				Value::Value(value) => Some((item.0, value)),
567				Value::Tombstone => None,
568			}))
569		})
570	}
571
572	/// Store active items and return the root link.
573	///
574	/// # Guarantees
575	/// - The method detects if the collection is empty after this and accordingly returns [`OptionLink::none`].
576	pub async fn store(&mut self) -> Result<OptionLink<Root<K, V>>, StorageError> {
577		// get root
578		let mut root = match self.root().await? {
579			Some(root) => {
580				// collection empty (no items or only tombstoned items)?
581				if self.is_empty().await? {
582					return Ok(OptionLink::none());
583				}
584
585				// root
586				root
587			},
588			None => {
589				// collection empty (no root and no active)?
590				if self.active.is_empty() || self.is_empty().await? {
591					return Ok(OptionLink::none());
592				}
593
594				// new root
595				Root { levels: Default::default(), active: Default::default(), settings: self.settings.clone() }
596			},
597		};
598
599		// store active
600		root.active =
601			store_active(&self.storage, self.settings.max_node_entries, stream::iter(self.active.iter()).map(Ok))
602				.await?;
603
604		// store root
605		self.root = self.storage.set_value(&root).await?.into();
606
607		// result
608		Ok(self.root)
609	}
610
611	/// Tree stats.
612	pub async fn stats(&self) -> Result<LsmTreeStats, StorageError> {
613		Self::load_levels_and_runs(self.storage.clone(), self.root)
614			.try_fold(
615				LsmTreeStats { entries: 0, active_entries: self.active.len(), levels: 0, runs: 0 },
616				|mut result, item| {
617					match item {
618						Either::Left(_) => result.levels += 1,
619						Either::Right((_, run)) => {
620							result.runs += 1;
621							result.entries += run.size as usize;
622						},
623					}
624					ready(Ok(result))
625				},
626			)
627			.await
628	}
629
630	/// Whether the collection is empty.
631	pub async fn is_empty(&self) -> Result<bool, StorageError> {
632		Ok(self.min_key().await?.is_none())
633	}
634
635	/// Find the first (active - not tombstoned) key.
636	pub async fn min_key(&self) -> Result<Option<K>, StorageError> {
637		let stream = self.stream();
638		pin_mut!(stream);
639		let first = stream.try_next().await?;
640		Ok(first.map(|(key, _)| key))
641	}
642
643	/// Find the last (active - not tombstoned) key.
644	pub async fn max_key(&self) -> Result<Option<K>, StorageError> {
645		let stream = self.reverse_stream();
646		pin_mut!(stream);
647		let first = stream.try_next().await?;
648		Ok(first.map(|(key, _)| key))
649	}
650
651	/// Compact tree.
652	///
653	/// # Arguments
654	/// - `flush_active` - Flush active "in memory" entries to L0.
655	pub async fn compact(&mut self, flush_active: bool) -> Result<(), StorageError> {
656		if flush_active {
657			self.flush_active().boxed().await?;
658		}
659		self.compact_level(0, Some(self.settings.max_run_count as usize)).await?;
660		Ok(())
661	}
662
663	/// Iterate on runs.
664	///
665	/// # Arguments
666	/// - `only_run_indicies` - Only use specified runs specified by global run index.
667	/// - `start_at` - Start streaming at (inclusive) key.
668	fn create_stream(
669		&self,
670		only_run_indicies: Option<BTreeSet<usize>>,
671		start_at: Option<K>,
672	) -> impl Stream<Item = Result<(K, Value<V>), StorageError>> + Send + use<S, K, V> {
673		let storage = self.storage.clone();
674		let active = self.active.clone();
675		let root = self.root;
676		async_stream::try_stream! {
677			// heap (max-sorted)
678			let mut heap = match only_run_indicies {
679				Some(_) => BinaryHeap::new(),
680				None => BinaryHeap::<TreeStreamItem<K, V>>::from(
681					active
682						.into_iter()
683						.map(|item| TreeStreamItem { run: None, item })
684						.collect::<Vec<_>>(),
685				),
686			};
687
688			// runs
689			//  filter and pop first item of each run
690			let mut runs: Vec<(usize, RunNodeStream<S, K, V>)> = Self::load_levels_and_runs(storage.clone(), root)
691				.try_filter_map(|item| ready(Ok(item.right())))
692				.try_filter(|(index, _)| ready(match &only_run_indicies {
693					Some(run_indicies) => run_indicies.contains(index),
694					None => true,
695				}))
696				.map_ok(|(index, run)| (index, NodeStream::from_link(storage.clone(), run.entries).with_filter(RunNodeFilter::max(start_at.clone()))))
697				.try_collect()
698				.await?;
699			for (run_index, run) in runs.iter_mut() {
700				if let Some(item) = run.try_next().await? {
701					heap.push(TreeStreamItem { run: Some(*run_index), item });
702				}
703			}
704
705			// walk tree
706			while let Some(item) = Self::pop_and_fetch(&mut heap, &mut runs).await? {
707				// pop superseded keys
708				//  we sort the TreeStreamItem by key and run index so we receive the next key front the most recent run first
709				while let Some(next_item) = heap.peek() {
710					if next_item.item.0 == item.item.0 {
711						Self::pop_and_fetch(&mut heap, &mut runs).await?;
712					} else {
713						break;
714					}
715				}
716
717				// skip items before start at
718				//  we need to filter overlaps as nodes which come before start_at will be skipped
719				if let Some(start_at) = &start_at {
720					if start_at > &item.item.0 {
721						continue;
722					}
723				}
724
725				// yield values
726				yield item.item;
727			}
728		}
729		// TreeStream::new(self)
730	}
731
732	/// Iterate on runs.
733	fn create_reverse_stream(
734		&self,
735		only_run_indicies: Option<BTreeSet<usize>>,
736		start_at: Option<K>,
737	) -> impl Stream<Item = Result<(K, Value<V>), StorageError>> + use<S, K, V> {
738		let storage = self.storage.clone();
739		let active = self.active.clone();
740		let root = self.root;
741		async_stream::try_stream! {
742			// heap (max-sorted)
743			let mut heap = match only_run_indicies {
744				Some(_) => BinaryHeap::new(),
745				None => BinaryHeap::<ReverseTreeStreamItem<K, V>>::from(
746					active
747						.into_iter()
748						.map(|item| ReverseTreeStreamItem { run: None, item })
749						.collect::<Vec<_>>(),
750				),
751			};
752
753			// runs
754			//  filter and pop first item of each run
755			let mut runs: Vec<(usize, RunNodeStream<S, K, V>)> = Self::load_levels_and_runs(storage.clone(), root)
756				.try_filter_map(|item| ready(Ok(item.right())))
757				.try_filter(|(index, _)| ready(match &only_run_indicies {
758					Some(run_indicies) => run_indicies.contains(index),
759					None => true,
760				}))
761				.map_ok(|(index, run)| (index, NodeStream::from_link(storage.clone(), run.entries).with_reverse().with_filter(RunNodeFilter::min(start_at.clone()))))
762				.try_collect()
763				.await?;
764			for (run_index, run) in runs.iter_mut() {
765				if let Some(item) = run.try_next().await? {
766					heap.push(ReverseTreeStreamItem { run: Some(*run_index), item });
767				}
768			}
769
770			// walk tree
771			while let Some(item) = Self::pop_and_fetch_reverse(&mut heap, &mut runs).await? {
772				// pop superseded keys
773				//  we sort the ReverseNodeStream by key and run index so we receive the next key front the most recent run first
774				while let Some(next_item) = heap.peek() {
775					if next_item.item.0 == item.item.0 {
776						Self::pop_and_fetch_reverse(&mut heap, &mut runs).await?;
777					} else {
778						break;
779					}
780				}
781
782				// skip items after start at
783				//  we need to filter overlaps as nodes which come after start_at will be skipped
784				if let Some(start_at) = &start_at {
785					if start_at < &item.item.0 {
786						continue;
787					}
788				}
789
790				// yield values
791				yield item.item;
792			}
793		}
794		// TreeStream::new(self)
795	}
796
797	/// Pop item and continue to read the run.
798	async fn pop_and_fetch(
799		heap: &mut BinaryHeap<TreeStreamItem<K, V>>,
800		runs: &mut [(usize, RunNodeStream<S, K, V>)],
801	) -> Result<Option<TreeStreamItem<K, V>>, StorageError> {
802		if let Some(item) = heap.pop() {
803			// fetch next
804			//  every time we take an item from an run we fetch the next one
805			//  so the heap can determine which are the next in sequence
806			//  once a run runs out of items it will not be asked again as all run items poped
807			if let Some(run_index) = item.run {
808				if let Some((_, run)) = runs.get_mut(run_index) {
809					if let Some(item) = run.try_next().await? {
810						heap.push(TreeStreamItem { run: Some(run_index), item });
811					}
812				}
813			}
814
815			// result
816			Ok(Some(item))
817		} else {
818			Ok(None)
819		}
820	}
821
822	/// Pop item and continue to read the run.
823	async fn pop_and_fetch_reverse(
824		heap: &mut BinaryHeap<ReverseTreeStreamItem<K, V>>,
825		runs: &mut [(usize, RunNodeStream<S, K, V>)],
826	) -> Result<Option<ReverseTreeStreamItem<K, V>>, StorageError> {
827		if let Some(item) = heap.pop() {
828			// fetch next
829			//  every time we take an item from an run we fetch the next one
830			//  so the heap can determine which are the next in sequence
831			//  once a run runs out of items it will not be asked again as all run items poped
832			if let Some(run_index) = item.run {
833				if let Some((_, run)) = runs.get_mut(run_index) {
834					if let Some(item) = run.try_next().await? {
835						heap.push(ReverseTreeStreamItem { run: Some(run_index), item });
836					}
837				}
838			}
839
840			// result
841			Ok(Some(item))
842		} else {
843			Ok(None)
844		}
845	}
846
847	/// Flush active in memory entries as a new run.
848	async fn flush_active(&mut self) -> Result<usize, StorageError> {
849		// validate
850		if self.active.is_empty() {
851			return Ok(0);
852		}
853		let min_key = if let Some((min_key, _)) = self.active.first_key_value() {
854			min_key.clone()
855		} else {
856			return Ok(0);
857		};
858		let max_key = if let Some((max_key, _)) = self.active.last_key_value() {
859			max_key.clone()
860		} else {
861			return Ok(0);
862		};
863		if min_key == max_key {
864			return Ok(0);
865		}
866
867		// run
868		//  TODO: remove clone (because of the hardcoded Value<V>?)
869		let run = if let Some(run) = store_run(
870			&self.storage,
871			self.settings.max_node_entries,
872			stream::iter(self.active.clone().into_iter()).map(Ok),
873			self.active.len(),
874		)
875		.await?
876		{
877			run
878		} else {
879			return Ok(0);
880		};
881
882		// get/create root
883		let mut root = match self.root().await? {
884			Some(root) => root,
885			None => Root { levels: Default::default(), active: Default::default(), settings: self.settings.clone() },
886		};
887
888		// add as first run to level 0
889		//  note: this currently loads all run "metadata" entries into memory however that should be ok because we dont
890		//   expect the run numbers to be very large.
891		let mut levels = Self::load_levels(self.storage.clone(), root.levels.clone()).await?;
892		if levels.is_empty() {
893			levels.insert(0, Level { runs: Default::default() });
894		}
895		let mut runs = NodeStream::from_link(self.storage.clone(), levels[0].runs)
896			.try_collect::<VecDeque<_>>()
897			.await?;
898		runs.push_front(run);
899		let runs_count = runs.len();
900		levels[0].runs =
901			store_items(&self.storage, self.settings.max_node_entries, stream::iter(runs.iter()).map(Ok)).await?;
902		root.levels = self.store_levels(levels).await?;
903
904		// store root with next level0
905		self.root = self.storage.set_value(&root).await?.into();
906
907		// cleanup when everything has succedded
908		self.active.clear();
909
910		// compact?
911		if runs_count >= self.settings.max_run_count as usize {
912			self.compact_level(0, Some(self.settings.max_run_count as usize)).await?;
913		}
914
915		// result
916		Ok(0)
917	}
918
919	/// Compact `level` to next level.
920	///
921	/// # Arguments
922	/// - `level_index` - The gloabl level index to compact
923	/// - `cascade` - Cascade to next (`level_index + n`) levels when they react the specified run count
924	async fn compact_level(&mut self, level_index: usize, cascade: Option<usize>) -> Result<(), StorageError> {
925		let next_level_index = level_index + 1;
926
927		// root
928		let mut root = if let Some(root) = self.root().await? { root } else { return Ok(()) };
929		let mut levels = Self::load_levels(self.storage.clone(), root.levels.clone()).await?;
930
931		// level
932		if levels.get(level_index).is_none() {
933			return Ok(());
934		}
935
936		// next level
937		if levels.get(next_level_index).is_none() {
938			levels.insert(next_level_index, Level { runs: OptionLink::none() });
939		}
940
941		// find runs to compact
942		let mut runs = Vec::new();
943		{
944			let levels_and_runs = Self::load_levels_and_runs(self.storage.clone(), self.root);
945			pin_mut!(levels_and_runs);
946			let mut current_global_level_index = 0;
947			while let Some(level_or_run) = levels_and_runs.try_next().await? {
948				match level_or_run {
949					Either::Left((global_level_index, _level)) => {
950						current_global_level_index = global_level_index;
951						// stop when hit older levels
952						if current_global_level_index > next_level_index {
953							break;
954						}
955					},
956					Either::Right((global_run_index, run)) => {
957						if current_global_level_index == level_index {
958							// we use all runs from the level
959							runs.push((current_global_level_index, global_run_index, run));
960						} else if current_global_level_index == next_level_index {
961							// test overlap in next level
962							if runs
963								.iter()
964								.any(|(_, _, r)| r.min_key <= run.max_key && run.min_key <= r.max_key)
965							{
966								runs.push((current_global_level_index, global_run_index, run));
967							}
968						}
969					},
970				}
971			}
972		}
973		if runs.is_empty() {
974			return Ok(());
975		}
976
977		// runs
978		let entries =
979			self.create_stream(Some(runs.iter().map(|(_, global_run_index, _)| *global_run_index).collect()), None);
980		let run = store_run(
981			&self.storage,
982			self.settings.max_node_entries,
983			entries,
984			runs.iter().fold(0, |result, (_, _, run)| result + run.size) as usize,
985		)
986		.await?;
987		let run = if let Some(run) = run {
988			run
989		} else {
990			return Ok(());
991		};
992
993		// replace runs
994		let (level_run_count, next_level_run_count) = {
995			let mut level_runs = NodeStream::from_link(self.storage.clone(), levels[level_index].runs)
996				.try_collect::<Vec<_>>()
997				.await?;
998			let mut next_level_runs = NodeStream::from_link(self.storage.clone(), levels[next_level_index].runs)
999				.try_collect::<Vec<_>>()
1000				.await?;
1001
1002			// remove old
1003			for (global_level_index, global_run_index, _) in runs.iter().rev() {
1004				let local_run_index = *global_run_index - *global_level_index;
1005				if *global_level_index == level_index && level_runs.get(local_run_index).is_some() {
1006					level_runs.remove(*global_run_index - *global_level_index);
1007				}
1008				if *global_level_index == next_level_index && next_level_runs.get(local_run_index).is_some() {
1009					next_level_runs.remove(*global_run_index - *global_level_index);
1010				}
1011			}
1012
1013			// insert new
1014			next_level_runs.insert(0, run);
1015
1016			// store runs
1017			let level_run_count = level_runs.len();
1018			let next_level_run_count = next_level_runs.len();
1019			levels[level_index].runs =
1020				store_items(&self.storage, self.settings.max_node_entries, stream::iter(&level_runs).map(Ok)).await?;
1021			levels[next_level_index].runs =
1022				store_items(&self.storage, self.settings.max_node_entries, stream::iter(&next_level_runs).map(Ok))
1023					.await?;
1024
1025			// counts
1026			(level_run_count, next_level_run_count)
1027		};
1028
1029		// clear empty
1030		let next_level_index = if level_run_count == 0 {
1031			levels.remove(level_index);
1032			level_index
1033		} else {
1034			next_level_index
1035		};
1036
1037		// store levels
1038		root.levels = self.store_levels(levels).await?;
1039
1040		// store root
1041		self.root = self.storage.set_value(&root).await?.into();
1042
1043		// cascade
1044		if let Some(max_run_count) = cascade {
1045			if next_level_run_count >= max_run_count {
1046				Box::pin(self.compact_level(next_level_index, cascade)).await?;
1047			}
1048		}
1049
1050		// result
1051		// Ok((next_level_index, next_level_run_count))
1052		Ok(())
1053	}
1054
1055	/// Root.
1056	async fn root(&self) -> Result<Option<Root<K, V>>, StorageError> {
1057		Self::load_root(&self.storage, self.root).await
1058	}
1059
1060	/// Store Levels.
1061	async fn store_levels(&self, levels: Vec<Level<K, V>>) -> Result<Node<Level<K, V>>, StorageError> {
1062		let mut builder = NodeBuilder::default()
1063			.with_items_size_max((INLINE_SIZE_FACTOR_LEVELS * self.storage.max_block_size()).to_integer());
1064		builder.extend(levels).map_err(|e| StorageError::InvalidArgument(e.into()))?;
1065		let (node, blocks) = builder.into_node().map_err(|e| StorageError::InvalidArgument(e.into()))?;
1066		for block in blocks {
1067			self.storage.set(block).await?;
1068		}
1069		Ok(node)
1070	}
1071
1072	/// Root.
1073	async fn load_root(storage: &S, root: OptionLink<Root<K, V>>) -> Result<Option<Root<K, V>>, StorageError> {
1074		storage.get_value_or_none(&root).await
1075	}
1076
1077	/// Levels.
1078	async fn load_levels(storage: S, levels: Node<Level<K, V>>) -> Result<Vec<Level<K, V>>, StorageError> {
1079		NodeStream::from_node(storage, levels, None).try_collect().await
1080	}
1081
	/// Streams all levels and runs of the tree stored at `root`, each tagged with its current global index.
	///
	/// Levels are visited in the order returned by `load_levels`. For each level the stream yields:
	/// - `Either::Left((level_index, level))`, then
	/// - one `Either::Right((run_index, run))` for every run stored under that level.
	///
	/// Runs are sorted newest to oldest.
	///
	/// Both indices start at 0. `run_index` is global: it keeps counting across levels and is never
	/// reset. It is therefore a run's position among *all* runs of the tree, not within its level.
	///
	/// NOTE(review): a level-local run index is `run_index` minus the number of runs in all
	/// preceding levels. Subtracting the level index instead is only correct for level 0, or when
	/// every preceding level holds exactly one run.
	///
	/// Yields nothing when `load_root` finds no root. Errors raised via `?` (loading the root, the
	/// levels, or a run) are emitted by `try_stream!` as an `Err` item.
	fn load_levels_and_runs(
		storage: S,
		root: OptionLink<Root<K, V>>,
	) -> impl Stream<Item = Result<EitherLevelOrRun<K, V>, StorageError>> + Send {
		async_stream::try_stream! {
			let mut global_level_index = 0;
			let mut global_run_index = 0;
			if let Some(root) = Self::load_root(&storage, root).await? {
				for level in  Self::load_levels(storage.clone(), root.levels).await? {
					// open the run stream first: `level` is moved into the yielded item below
					let runs = NodeStream::from_link(storage.clone(), level.runs);
					yield Either::Left((global_level_index, level));
					for await run in runs {
						yield Either::Right((global_run_index, run?));
						// global counter: intentionally not reset per level
						global_run_index += 1;
					}
					global_level_index += 1;
				}
			}
		}
	}
1104}
1105
/// A level (`Left`) or a run (`Right`), each paired with its global index, as yielded by
/// `load_levels_and_runs`.
type EitherLevelOrRun<K, V> = Either<(usize, Level<K, V>), (usize, Run<K, V>)>;
/// Node stream over run entries `(K, Value<V>)` that are stored as [`RunNode`] nodes.
type RunNodeStream<S, K, V> = NodeStream<S, (K, Value<V>), RunNode<K, V>>;
1108
1109/// Store as DAG Node and return the root [`cid::Cid`].
1110async fn store_items<'a, S, T>(
1111	storage: &S,
1112	max_node_entries: u64,
1113	items: impl Stream<Item = Result<&'a T, StorageError>> + Send,
1114) -> Result<OptionLink<Node<T>>, StorageError>
1115where
1116	S: AnyBlockStorage,
1117	T: Clone + Serialize + 'a,
1118{
1119	let mut node_builder = NodeBuilder::<_>::new(
1120		storage.max_block_size(),
1121		max_node_entries
1122			.try_into()
1123			.map_err(|e: TryFromIntError| StorageError::InvalidArgument(e.into()))?,
1124		Default::default(),
1125	);
1126	pin_mut!(items);
1127	while let Some(item) = items.try_next().await? {
1128		node_builder.push(item).map_err(|e| StorageError::InvalidArgument(e.into()))?;
1129		for block in node_builder.take_blocks() {
1130			storage.set(block).await?;
1131		}
1132	}
1133	let (root, blocks) = node_builder
1134		.into_blocks()
1135		.map_err(|e| StorageError::InvalidArgument(e.into()))?;
1136	for block in blocks.into_iter() {
1137		storage.set(block).await?;
1138	}
1139
1140	// result
1141	Ok(root.cid().into())
1142}
1143
1144/// Store `entries` as DAG Node and return the root [`cid::Cid`].
1145async fn store_active<S, K, V>(
1146	storage: &S,
1147	max_node_entries: u64,
1148	entries: impl Stream<Item = Result<(&K, &Value<V>), StorageError>>,
1149) -> Result<Node<(K, Value<V>)>, StorageError>
1150where
1151	S: AnyBlockStorage,
1152	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1153	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1154{
1155	let mut node_builder = NodeBuilder::<_>::new(
1156		storage.max_block_size(),
1157		max_node_entries
1158			.try_into()
1159			.map_err(|e: TryFromIntError| StorageError::InvalidArgument(e.into()))?,
1160		Default::default(),
1161	)
1162	.with_items_size_max((INLINE_SIZE_FACTOR_ACTIVE * storage.max_block_size()).to_integer());
1163	pin_mut!(entries);
1164	while let Some(item) = entries.try_next().await? {
1165		node_builder
1166			.push((item.0.clone(), item.1.clone()))
1167			.map_err(|e| StorageError::InvalidArgument(e.into()))?;
1168		for block in node_builder.take_blocks() {
1169			storage.set(block).await?;
1170		}
1171	}
1172	let (node, blocks) = node_builder.into_node().map_err(|e| StorageError::InvalidArgument(e.into()))?;
1173	for block in blocks.into_iter() {
1174		storage.set(block).await?;
1175	}
1176
1177	// result
1178	Ok(node)
1179}
1180
1181/// Store `entries` as DAG and return the root [`cid::Cid`].
1182async fn store_run_node<S, K, V>(
1183	storage: &S,
1184	max_node_entries: u64,
1185	entries: impl Stream<Item = Result<(K, Value<V>), StorageError>>,
1186) -> Result<OptionLink<RunNode<K, V>>, StorageError>
1187where
1188	S: AnyBlockStorage,
1189	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1190	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1191{
1192	let mut node_builder = NodeBuilder::<(K, Value<V>), RunNode<K, V>, RunNodeSerializer<K, V>>::new(
1193		storage.max_block_size(),
1194		max_node_entries
1195			.try_into()
1196			.map_err(|e: TryFromIntError| StorageError::InvalidArgument(e.into()))?,
1197		RunNodeSerializer::new(),
1198	);
1199	pin_mut!(entries);
1200	while let Some(item) = entries.try_next().await? {
1201		node_builder.push(item).map_err(|e| StorageError::InvalidArgument(e.into()))?;
1202		for block in node_builder.take_blocks() {
1203			storage.set(block).await?;
1204		}
1205	}
1206	let (root, blocks) = node_builder
1207		.into_blocks()
1208		.map_err(|e| StorageError::InvalidArgument(e.into()))?;
1209	for block in blocks.into_iter() {
1210		storage.set(block).await?;
1211	}
1212
1213	// result
1214	Ok(root)
1215}
1216
1217/// Store a new run to storage composed of `entries`.
1218async fn store_run<S, K, V>(
1219	storage: &S,
1220	max_node_entries: u64,
1221	entries: impl Stream<Item = Result<(K, Value<V>), StorageError>>,
1222	entries_size_hint: usize,
1223) -> Result<Option<Run<K, V>>, StorageError>
1224where
1225	S: AnyBlockStorage,
1226	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1227	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1228{
1229	// entries
1230	let mut min_key: Option<K> = None;
1231	let mut max_key: Option<K> = None;
1232	let mut size = 0;
1233	let mut bloom = Bloom::<K>::new_for_fp_rate_with_seed(entries_size_hint, 0.001, &[0; 32])
1234		.map_err(|e| StorageError::InvalidArgument(anyhow!("bloomfilter: {}", e)))?;
1235	let entries_link = store_run_node(
1236		storage,
1237		max_node_entries,
1238		entries.inspect_ok(|(key, _)| {
1239			if min_key.is_none() || Some(key).cmp(&min_key.as_ref()) == Ordering::Less {
1240				min_key = Some(key.clone());
1241			}
1242			if max_key.is_none() || Some(key).cmp(&max_key.as_ref()) == Ordering::Greater {
1243				max_key = Some(key.clone());
1244			}
1245			size += 1;
1246			bloom.set(key);
1247		}),
1248	)
1249	.await?;
1250	let (min_key, max_key) = match (min_key, max_key) {
1251		(Some(min_key), Some(max_key)) => (min_key, max_key),
1252		_ => return Ok(None),
1253	};
1254	let next_run =
1255		Run { entries: entries_link, size, bloom: BloomFilter::Bloomfilter(bloom.to_bytes()), min_key, max_key };
1256	Ok(Some(next_run))
1257}
1258
/// Tree stats: summary counts describing the shape of an LSM tree (returned by `stats()`).
#[derive(Debug, Clone, PartialEq)]
pub struct LsmTreeStats {
	/// Approx. entries in tree (upper bound).
	///
	/// NOTE(review): presumably an upper bound because a key present in several runs and/or the
	/// active data is counted more than once. Confirm against `stats()`.
	pub entries: usize,

	/// Entries in the active "in memory" data.
	pub active_entries: usize,

	/// Number of levels in the tree.
	pub levels: usize,

	/// Number of runs in the tree.
	pub runs: usize,
}
1274
1275#[derive(Debug)]
1276struct TreeStreamItem<K, V>
1277where
1278	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1279	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1280{
1281	run: Option<usize>,
1282	item: (K, Value<V>),
1283}
1284impl<K, V> PartialEq for TreeStreamItem<K, V>
1285where
1286	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1287	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1288{
1289	fn eq(&self, other: &Self) -> bool {
1290		self.item.0 == other.item.0
1291	}
1292}
1293impl<K, V> Eq for TreeStreamItem<K, V>
1294where
1295	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1296	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1297{
1298}
1299impl<K, V> PartialOrd for TreeStreamItem<K, V>
1300where
1301	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1302	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1303{
1304	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1305		Some(self.cmp(other))
1306	}
1307}
1308impl<K, V> Ord for TreeStreamItem<K, V>
1309where
1310	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1311	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1312{
1313	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1314		// invert the result as the binaryheap always returns the max item first
1315		match self.item.0.cmp(&other.item.0) {
1316			Ordering::Less => Ordering::Greater,
1317			Ordering::Greater => Ordering::Less,
1318			Ordering::Equal => {
1319				// sort by run index so the most recent run item is the last
1320				other.run.cmp(&self.run)
1321			},
1322		}
1323	}
1324}
1325
1326#[derive(Debug)]
1327struct ReverseTreeStreamItem<K, V>
1328where
1329	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1330	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1331{
1332	run: Option<usize>,
1333	item: (K, Value<V>),
1334}
1335impl<K, V> PartialEq for ReverseTreeStreamItem<K, V>
1336where
1337	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1338	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1339{
1340	fn eq(&self, other: &Self) -> bool {
1341		self.item.0 == other.item.0
1342	}
1343}
1344impl<K, V> Eq for ReverseTreeStreamItem<K, V>
1345where
1346	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1347	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1348{
1349}
1350impl<K, V> PartialOrd for ReverseTreeStreamItem<K, V>
1351where
1352	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1353	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1354{
1355	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1356		Some(self.cmp(other))
1357	}
1358}
1359impl<K, V> Ord for ReverseTreeStreamItem<K, V>
1360where
1361	K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1362	V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
1363{
1364	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1365		match self.item.0.cmp(&other.item.0) {
1366			Ordering::Less => Ordering::Less,
1367			Ordering::Greater => Ordering::Greater,
1368			Ordering::Equal => {
1369				// sort by run index so the most recent run item is the last
1370				other.run.cmp(&self.run)
1371			},
1372		}
1373	}
1374}
1375
#[cfg(test)]
mod tests {
	//! Unit tests for the LSM tree map: ordering, lookups, compaction, queries, storing and
	//! value serialization.
	use super::{LsmTreeMap, Value};
	use crate::{
		from_cbor,
		library::{lsm_tree_map::LsmTreeStats, test::TestStorage},
		to_cbor, LsmTreeMapSettings,
	};
	use futures::TryStreamExt;

	#[tokio::test]
	async fn smoke() {
		let storage = TestStorage::default();
		let mut tree = LsmTreeMap::new(storage.clone(), Default::default());
		tree.insert("hello".to_owned(), "world".to_owned()).await.unwrap();
		tree.insert("1".to_owned(), "2".to_owned()).await.unwrap();
		tree.insert("3".to_owned(), "4".to_owned()).await.unwrap();
		// entries stream back in key order, not insertion order
		assert_eq!(
			tree.stream().try_collect::<Vec<_>>().await.unwrap(),
			vec![
				("1".to_owned(), "2".to_owned()),
				("3".to_owned(), "4".to_owned()),
				("hello".to_owned(), "world".to_owned())
			]
		);

		// reload from the stored root and expect identical contents
		let root = tree.store().await.unwrap().unwrap();
		let tree2 = LsmTreeMap::load(storage.clone(), root).await.unwrap();
		assert_eq!(
			tree2.stream().try_collect::<Vec<_>>().await.unwrap(),
			vec![
				("1".to_owned(), "2".to_owned()),
				("3".to_owned(), "4".to_owned()),
				("hello".to_owned(), "world".to_owned())
			]
		);
	}

	#[tokio::test]
	async fn test_get() {
		let storage = TestStorage::default();
		let settings = LsmTreeMapSettings { max_node_entries: 32, max_active_entries: 2, max_run_count: 2 };
		let mut tree = LsmTreeMap::new(storage.clone(), settings);
		tree.insert(1, 100).await.unwrap();
		tree.insert(2, 200).await.unwrap();
		tree.insert(3, 300).await.unwrap();
		assert_eq!(tree.get(&1).await.unwrap(), Some(100));
		assert_eq!(tree.get(&2).await.unwrap(), Some(200));
		assert_eq!(tree.get(&3).await.unwrap(), Some(300));
	}

	#[tokio::test]
	async fn test_compact() {
		let storage = TestStorage::default();
		let settings = LsmTreeMapSettings { max_node_entries: 32, max_active_entries: 2, max_run_count: 2 };
		let mut tree = LsmTreeMap::new(storage.clone(), settings);
		for i in 0..10 {
			tree.insert(i, i).await.unwrap();
		}
		assert_eq!(
			tree.stream().try_collect::<Vec<_>>().await.unwrap(),
			vec![(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9),]
		);
		let stats = tree.stats().await.unwrap();
		assert_eq!(stats, LsmTreeStats { active_entries: 0, entries: 10, levels: 1, runs: 1 });
		// something like this should happen (expected sequence of operations):
		// flush: 2
		// store run: 2
		// flush: 2
		// store run: 2
		// store run: 4
		// compact: from 0 to 1 with size 4
		// drop level 0
		// flush: 2
		// store run: 2
		// store run: 6
		// compact: from 0 to 1 with size 6
		// drop level 0
		// flush: 2
		// store run: 2
		// store run: 8
		// compact: from 0 to 1 with size 8
		// drop level 0
		// flush: 2
		// store run: 2
		// store run: 10
		// compact: from 0 to 1 with size 10
		// drop level 0
	}

	#[tokio::test]
	async fn test_stream() {
		let storage = TestStorage::default();
		let settings = LsmTreeMapSettings { max_node_entries: 32, max_active_entries: 2, max_run_count: 2 };
		let mut tree = LsmTreeMap::new(storage.clone(), settings);
		tree.insert(0, 0).await.unwrap();
		tree.insert(1, 1).await.unwrap();
		tree.insert(2, 2).await.unwrap();
		tree.insert(3, 3).await.unwrap();
		tree.remove(3).await.unwrap();
		tree.insert(3, 30).await.unwrap();
		tree.remove(3).await.unwrap();
		tree.insert(2, 20).await.unwrap();
		tree.flush_active().await.unwrap();
		// the latest write per key wins: key 3 ends removed, key 2 ends overwritten with 20
		assert_eq!(tree.stream().try_collect::<Vec<_>>().await.unwrap(), vec![(0, 0), (1, 1), (2, 20),]);
		assert_eq!(tree.reverse_stream().try_collect::<Vec<_>>().await.unwrap(), vec![(2, 20), (1, 1), (0, 0),]);
	}

	#[tokio::test]
	async fn test_stream_query() {
		let storage = TestStorage::default();
		let settings = LsmTreeMapSettings { max_node_entries: 2, max_active_entries: 2, max_run_count: 2 };
		let mut tree = LsmTreeMap::new(storage.clone(), settings);
		for i in 0..10 {
			tree.insert(i, i).await.unwrap();
		}
		// both query directions include the start key itself
		assert_eq!(
			tree.stream_query(Some(5)).try_collect::<Vec<_>>().await.unwrap(),
			vec![(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
		);
		assert_eq!(
			tree.reverse_stream_query(Some(5)).try_collect::<Vec<_>>().await.unwrap(),
			vec![(5, 5), (4, 4), (3, 3), (2, 2), (1, 1), (0, 0)]
		);
	}

	#[tokio::test]
	async fn test_store_empty() {
		// removing every inserted key must leave nothing to store, so `store()` returns `None`
		for count in [1, 10] {
			let storage = TestStorage::default();
			let settings = LsmTreeMapSettings { max_node_entries: 2, max_active_entries: 2, max_run_count: 2 };
			let mut tree = LsmTreeMap::new(storage.clone(), settings);
			for i in 0..count {
				tree.insert(i, i).await.unwrap();
			}
			for i in 0..count {
				tree.remove(i).await.unwrap();
			}
			assert!(tree.store().await.unwrap().is_none());
		}
	}

	#[test]
	fn test_settings_default() {
		assert_eq!(LsmTreeMapSettings::default().max_node_entries, 256);
		assert_eq!(LsmTreeMapSettings::default().max_active_entries, 16384);
		assert_eq!(LsmTreeMapSettings::default().max_run_count, 16);
		assert!(LsmTreeMapSettings::default().is_default());
		let mut not_default = LsmTreeMapSettings::default();
		not_default.max_node_entries += 1;
		assert!(!not_default.is_default());
	}

	#[test]
	fn test_serialize_value() {
		let empty_v = Value::<()>::Value(());
		let v = Value::<u8>::Value(0);
		let t = Value::<u8>::Tombstone;

		// cbor round-trip, including a unit value that must stay distinct from a tombstone
		let v_cbor = to_cbor(&v).unwrap();
		let empty_v_cbor = to_cbor(&empty_v).unwrap();
		let t_cbor = to_cbor(&t).unwrap();
		assert_eq!(from_cbor::<Value::<()>>(&empty_v_cbor).unwrap(), empty_v);
		assert_eq!(from_cbor::<Value::<u8>>(&v_cbor).unwrap(), v);
		assert_eq!(from_cbor::<Value::<u8>>(&t_cbor).unwrap(), t);
	}
}