use super::{dag_cbor_size_serializer::DagCborSizeSerializer, node_builder::NodeReader};
use crate::{
AnyBlockStorage, Block, BlockSerializer, BlockStorageExt, Link, Node, NodeBuilder, NodeBuilderError,
NodeSerializer, NodeStream, OptionLink, StorageError,
};
use anyhow::anyhow;
use bloomfilter::Bloom;
use cid::Cid;
use either::Either;
use futures::{pin_mut, stream, FutureExt, Stream, StreamExt, TryStreamExt};
use num_rational::Ratio;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque},
fmt::Debug,
future::ready,
hash::Hash,
marker::PhantomData,
num::TryFromIntError,
};
/// Fraction (2/8) of the storage's maximum block size that is passed to
/// `with_items_size_max` when the levels node is built.
const INLINE_SIZE_FACTOR_LEVELS: Ratio<usize> = Ratio::new_raw(2, 8);
/// Fraction (4/8) of the storage's maximum block size that is passed to
/// `with_items_size_max` when the active entries node is built.
const INLINE_SIZE_FACTOR_ACTIVE: Ratio<usize> = Ratio::new_raw(4, 8);
/// Persisted root of an LSM tree map.
///
/// Fields are serialized under single-letter names; empty nodes and default
/// settings are skipped when serializing and restored from their defaults
/// when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Root<K, V>
where
K: Hash + Ord + Clone + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Levels of the tree; every level links to its runs.
#[serde(rename = "l", default = "Node::default", skip_serializing_if = "Node::is_empty")]
pub levels: Node<Level<K, V>>,
/// Entries (values and tombstones) that were not yet flushed into a run.
#[serde(rename = "a", default = "Node::default", skip_serializing_if = "Node::is_empty")]
pub active: Node<(K, Value<V>)>,
/// Settings the tree was stored with.
#[serde(
rename = "s",
default = "LsmTreeMapSettings::default",
skip_serializing_if = "LsmTreeMapSettings::is_default"
)]
pub settings: LsmTreeMapSettings,
}
/// A single level of the tree.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Level<K, V>
where
K: Hash + Ord + Clone + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Optional link to the node holding the runs of this level; omitted from
/// the serialized form when there is no link.
#[serde(rename = "r", default = "OptionLink::default", skip_serializing_if = "OptionLink::is_none")]
pub runs: OptionLink<Node<Run<K, V>>>,
}
/// Descriptor of one run: a link to its entries plus the metadata used to
/// decide cheaply whether a key can be part of the run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Run<K, V>
where
K: Hash + Ord + Clone + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Link to the root `RunNode` holding the entries.
#[serde(rename = "e")]
pub entries: OptionLink<RunNode<K, V>>,
/// Number of entries counted while the run was written (tombstones included).
#[serde(rename = "s")]
pub size: u64,
/// Bloom filter over the keys of the run.
#[serde(rename = "i")]
pub bloom: BloomFilter,
/// Smallest key in the run.
#[serde(rename = "l")]
pub min_key: K,
/// Largest key in the run.
#[serde(rename = "h")]
pub max_key: K,
}
impl<K, V> Run<K, V>
where
    K: Hash + Ord + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Returns `false` when `key` is definitely not part of this run.
    ///
    /// The key has to lie within `[min_key, max_key]` and has to pass the
    /// bloom filter; a `true` result still needs to be confirmed by reading
    /// the entries.
    fn may_contains_key(&self, key: &K) -> bool {
        let outside_key_range = key < &self.min_key || key > &self.max_key;
        !outside_key_range && self.bloom.may_contains_key(key)
    }
}
/// Serialized bloom filter attached to a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BloomFilter {
/// Raw bytes of a `bloomfilter::Bloom` (written with `to_bytes`, read back
/// with `from_slice`).
#[serde(rename = "b", with = "serde_bytes")]
Bloomfilter(Vec<u8>),
}
impl BloomFilter {
    /// Checks `key` against the filter.
    ///
    /// Returns `true` when the filter reports the key as possibly present.
    /// When the stored bytes cannot be decoded the filter is treated as
    /// inconclusive and `true` is returned as well, so no key is ever ruled
    /// out by a broken filter.
    pub fn may_contains_key<K: Hash + Ord + Clone + Send + Sync + 'static>(&self, key: &K) -> bool {
        match self {
            BloomFilter::Bloomfilter(data) => match Bloom::from_slice(data) {
                Ok(bloom) => bloom.check(key),
                Err(_) => true,
            },
        }
    }
}
/// Value slot of an entry: either a stored value or a deletion marker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Value<V>
where
V: Clone + 'static,
{
/// A present value.
#[serde(rename = "v")]
Value(V),
/// Marks the key as removed; lookups and the public streams report such
/// keys as absent.
#[serde(rename = "t")]
Tombstone,
}
impl<V> PartialEq for Value<V>
where
    V: PartialEq + Clone + 'static,
{
    /// Two tombstones are equal; two values are equal when their payloads
    /// are; a tombstone never equals a value.
    fn eq(&self, other: &Self) -> bool {
        match self {
            Self::Tombstone => matches!(other, Self::Tombstone),
            Self::Value(lhs) => matches!(other, Self::Value(rhs) if lhs == rhs),
        }
    }
}
/// Node of the tree that stores the entries of a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RunNode<K, V>
where
K: Hash + Ord + Clone + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Inner node: links to child nodes plus the key range they cover.
#[serde(rename = "n")]
Node {
/// Links to the child nodes.
#[serde(rename = "n")]
nodes: Vec<Link<Self>>,
/// Smallest key found below this node.
#[serde(rename = "l")]
min_key: K,
/// Largest key found below this node.
#[serde(rename = "h")]
max_key: K,
},
/// Leaf node holding the entries themselves, ordered by key.
#[serde(rename = "l")]
Leaf(BTreeMap<K, Value<V>>),
}
impl<K, V> RunNode<K, V>
where
K: Hash + Ord + Clone + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
fn may_contains_key(&self, key: &K) -> bool {
match self {
RunNode::Node { nodes: _, min_key, max_key } => key >= min_key && key <= max_key,
RunNode::Leaf(items) => items.contains_key(key),
}
}
}
impl<K, V> NodeReader<(K, Value<V>)> for RunNode<K, V>
where
    K: Hash + Ord + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    type Filter = RunNodeFilter<K>;

    /// Splits the node into either the child links to follow (`Left`) or
    /// the entries it holds (`Right`).
    ///
    /// An inner node whose key range is rejected by `filter` yields no
    /// links at all, which prunes the whole subtree. Leaves are never
    /// filtered here.
    fn read(self, filter: &Self::Filter) -> Either<Vec<Cid>, Vec<(K, Value<V>)>> {
        match self {
            RunNode::Leaf(items) => Either::Right(items.into_iter().collect()),
            RunNode::Node { nodes, min_key, max_key } => {
                let links = if filter.test(&min_key, &max_key) {
                    Vec::new()
                } else {
                    nodes.into_iter().map(Into::into).collect()
                };
                Either::Left(links)
            },
        }
    }
}
/// Key range filter used to prune inner run nodes while streaming.
///
/// [`RunNodeFilter::test`] returns `true` for nodes that should be skipped.
#[derive(Debug, Default)]
pub enum RunNodeFilter<K> {
    /// Skips nothing.
    #[default]
    None,
    /// Skips nodes whose largest key is smaller than the given key.
    Max(K),
    /// Skips nodes whose smallest key is greater than the given key.
    Min(K),
}

impl<K> RunNodeFilter<K> {
    /// Builds a `Min` filter from `key`, or `None` when no key is given.
    pub fn min(key: Option<K>) -> Self {
        key.map_or(Self::None, Self::Min)
    }

    /// Builds a `Max` filter from `key`, or `None` when no key is given.
    pub fn max(key: Option<K>) -> Self {
        key.map_or(Self::None, Self::Max)
    }

    /// Returns `true` when a node covering `[min_key, max_key]` is rejected
    /// by this filter and can be skipped.
    pub fn test(&self, min_key: &K, max_key: &K) -> bool
    where
        K: Ord,
    {
        match self {
            Self::None => false,
            Self::Min(bound) => bound < min_key,
            Self::Max(bound) => bound > max_key,
        }
    }
}
/// `NodeSerializer` that builds `RunNode`s and keeps track of the key range
/// of every block it serialized.
#[derive(Debug)]
pub struct RunNodeSerializer<K, V> {
// Ties the serializer to its key/value types without storing any.
_d: PhantomData<(K, V)>,
// Key range (min, max) of serialized nodes, by block CID, until the node
// is referenced from a parent.
pending: BTreeMap<Cid, (K, K)>,
}
impl<K, V> RunNodeSerializer<K, V> {
pub fn new() -> Self {
Self { _d: Default::default(), pending: Default::default() }
}
}
impl<K, V> NodeSerializer<RunNode<K, V>, (K, Value<V>)> for RunNodeSerializer<K, V>
where
K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
fn nodes(&mut self, nodes: Vec<Link<RunNode<K, V>>>) -> Result<RunNode<K, V>, NodeBuilderError> {
let mut min_key = None;
let mut max_key = None;
for node in nodes.iter() {
if let Some((node_min_key, node_max_key)) = self.pending.remove(node.cid()) {
if min_key.is_none() || Some(&node_min_key).cmp(&min_key.as_ref()) == Ordering::Less {
min_key = Some(node_min_key);
}
if max_key.is_none() || Some(&node_max_key).cmp(&max_key.as_ref()) == Ordering::Greater {
max_key = Some(node_max_key);
}
}
}
Ok(RunNode::Node {
nodes,
min_key: min_key.ok_or(NodeBuilderError::InvalidArgument(anyhow!("Unable to determine min key")))?,
max_key: max_key.ok_or(NodeBuilderError::InvalidArgument(anyhow!("Unable to determine max key")))?,
})
}
fn leaf(&mut self, entries: Vec<(K, Value<V>)>) -> Result<RunNode<K, V>, NodeBuilderError> {
Ok(RunNode::Leaf(entries.into_iter().collect()))
}
fn serialize(&mut self, max_bock_size: usize, node: RunNode<K, V>) -> Result<Block, NodeBuilderError> {
let block = BlockSerializer::new()
.with_max_block_size(max_bock_size)
.serialize(&node)
.map_err(|err| NodeBuilderError::Encoding(err.into()))?;
match &node {
RunNode::Node { nodes: _, min_key, max_key } => {
self.pending.insert(*block.cid(), (min_key.clone(), max_key.clone()));
},
RunNode::Leaf(items) => {
if let (Some((first, _)), Some((last, _))) = (items.first_key_value(), items.last_key_value()) {
self.pending.insert(*block.cid(), (first.clone(), last.clone()));
}
},
}
Ok(block)
}
fn item_size_hint(&self, item: &(K, Value<V>)) -> Option<usize> {
let mut serializer = DagCborSizeSerializer::new();
item.serialize(&mut serializer).ok()?;
Some(serializer.size)
}
}
/// Tuning parameters of an LSM tree map.
///
/// Every field is omitted from the serialized form while it holds its
/// default value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct LsmTreeMapSettings {
/// Entry limit handed to the node builders when nodes are written.
/// Defaults to 2^8.
#[serde(
rename = "n",
default = "LsmTreeMapSettings::default_max_node_entries",
skip_serializing_if = "LsmTreeMapSettings::is_default_max_node_entries"
)]
pub max_node_entries: u64,
/// Number of active entries at which `insert`/`remove` flush them into a
/// run. Defaults to 2^14.
#[serde(
rename = "a",
default = "LsmTreeMapSettings::default_max_active_entries",
skip_serializing_if = "LsmTreeMapSettings::is_default_max_active_entries"
)]
pub max_active_entries: u64,
/// Number of runs in a level at which the level gets compacted.
/// Defaults to 2^4.
#[serde(
rename = "r",
default = "LsmTreeMapSettings::default_max_run_count",
skip_serializing_if = "LsmTreeMapSettings::is_default_max_run_count"
)]
pub max_run_count: u64,
}
impl LsmTreeMapSettings {
    /// Default for `max_node_entries`: 2^8.
    fn default_max_node_entries() -> u64 {
        1 << 8
    }

    /// Default for `max_active_entries`: 2^14.
    fn default_max_active_entries() -> u64 {
        1 << 14
    }

    /// Default for `max_run_count`: 2^4.
    fn default_max_run_count() -> u64 {
        1 << 4
    }

    // The `is_default_*` helpers take `&u64` because they are used as serde
    // `skip_serializing_if` predicates.
    fn is_default_max_node_entries(value: &u64) -> bool {
        Self::default_max_node_entries() == *value
    }

    fn is_default_max_active_entries(value: &u64) -> bool {
        Self::default_max_active_entries() == *value
    }

    fn is_default_max_run_count(value: &u64) -> bool {
        Self::default_max_run_count() == *value
    }

    /// Returns `true` when every setting holds its default value.
    pub fn is_default(&self) -> bool {
        *self == Self::default()
    }
}
impl Default for LsmTreeMapSettings {
    /// Settings with every field at the default used by deserialization.
    fn default() -> Self {
        let max_node_entries = Self::default_max_node_entries();
        let max_active_entries = Self::default_max_active_entries();
        let max_run_count = Self::default_max_run_count();
        Self { max_node_entries, max_active_entries, max_run_count }
    }
}
/// Ordered key/value map organised as an LSM tree on top of a block storage.
///
/// Writes go to the in-memory `active` entries first and are flushed into
/// immutable runs once `max_active_entries` is reached.
#[derive(Clone)]
pub struct LsmTreeMap<S, K, V>
where
S: AnyBlockStorage,
K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
// Block storage all nodes are read from and written to.
storage: S,
// Link to the most recently written root, if any.
root: OptionLink<Root<K, V>>,
// Entries (values and tombstones) not yet flushed into a run.
active: BTreeMap<K, Value<V>>,
// Settings in effect for this tree.
settings: LsmTreeMapSettings,
}
impl<S, K, V> LsmTreeMap<S, K, V>
where
S: AnyBlockStorage,
K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
pub fn new(storage: S, settings: LsmTreeMapSettings) -> Self {
Self { active: Default::default(), settings, root: OptionLink::none(), storage }
}
pub async fn load(storage: S, root: Link<Root<K, V>>) -> Result<Self, StorageError> {
let mut result = Self { active: Default::default(), settings: Default::default(), root: root.into(), storage };
if let Some(root) = result.root().await? {
result.settings = root.settings;
result.active = NodeStream::from_node(result.storage.clone(), root.active, None)
.try_collect()
.await?;
}
Ok(result)
}
/// Inserts or replaces the value stored for `key`.
///
/// The entry goes to the active entries; once their number reaches
/// `max_active_entries` they are flushed.
pub async fn insert(&mut self, key: K, value: V) -> Result<(), StorageError> {
    self.active.insert(key, Value::Value(value));
    let flush_threshold = self.settings.max_active_entries as usize;
    if self.active.len() < flush_threshold {
        return Ok(());
    }
    self.flush_active().boxed().await?;
    Ok(())
}
/// Removes `key` from the map.
///
/// Without a stored root the key can only live in the active entries and
/// is dropped from there. Otherwise a tombstone is recorded so that older
/// values in the runs are shadowed; the active entries are flushed once
/// their number reaches `max_active_entries`.
pub async fn remove(&mut self, key: K) -> Result<(), StorageError> {
    if self.root.is_none() {
        self.active.remove(&key);
        return Ok(());
    }
    self.active.insert(key, Value::Tombstone);
    let flush_threshold = self.settings.max_active_entries as usize;
    if self.active.len() < flush_threshold {
        return Ok(());
    }
    self.flush_active().boxed().await?;
    Ok(())
}
/// Looks up the value stored for `key`.
///
/// The active entries are consulted first. Otherwise the runs are visited
/// in the order they are stored; the first leaf found to contain the key
/// decides the result. A tombstone yields `None`.
pub async fn get(&self, key: &K) -> Result<Option<V>, StorageError> {
    if let Some(entry) = self.active.get(key) {
        return Ok(match entry {
            Value::Value(value) => Some(value.clone()),
            Value::Tombstone => None,
        });
    }

    let levels_and_runs = Self::load_levels_and_runs(self.storage.clone(), self.root);
    pin_mut!(levels_and_runs);
    while let Some(level_or_run) = levels_and_runs.try_next().await? {
        // Level markers carry no entries.
        let Some((_, run)) = level_or_run.right() else {
            continue;
        };
        if !run.may_contains_key(key) {
            continue;
        }

        // Walk the run's nodes breadth first, following only nodes whose
        // key range can hold the key.
        let mut queue: VecDeque<Link<RunNode<K, V>>> = VecDeque::new();
        if let Some(entries) = run.entries.link() {
            queue.push_back(entries);
        }
        while let Some(link) = queue.pop_front() {
            let node = self.storage.get_value(&link).await?;
            if !node.may_contains_key(key) {
                continue;
            }
            match node {
                RunNode::Node { nodes, .. } => queue.extend(nodes),
                RunNode::Leaf(mut items) => match items.remove(key) {
                    Some(Value::Value(value)) => return Ok(Some(value)),
                    Some(Value::Tombstone) => return Ok(None),
                    None => {},
                },
            }
        }
    }
    Ok(None)
}
/// Returns `true` when a value (not a tombstone) is stored for `key`.
pub async fn contains_key(&self, key: &K) -> Result<bool, StorageError> {
    let value = self.get(key).await?;
    Ok(value.is_some())
}
/// Streams all key/value pairs in ascending key order.
pub fn stream(&self) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
    let start_at: Option<K> = None;
    self.stream_query(start_at)
}
/// Streams key/value pairs in ascending key order, beginning with the
/// first key that is not smaller than `start_at` (all keys when `None`).
/// Removed keys are left out.
pub fn stream_query(&self, start_at: Option<K>) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
    self.create_stream(None, start_at).try_filter_map(|(key, value)| {
        let live_entry = match value {
            Value::Value(value) => Some((key, value)),
            Value::Tombstone => None,
        };
        ready(Ok(live_entry))
    })
}
/// Streams all key/value pairs in descending key order.
pub fn reverse_stream(&self) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
    let start_at: Option<K> = None;
    self.reverse_stream_query(start_at)
}
/// Streams key/value pairs in descending key order, beginning with the
/// first key that is not greater than `start_at` (all keys when `None`).
/// Removed keys are left out.
pub fn reverse_stream_query(
    &self,
    start_at: Option<K>,
) -> impl Stream<Item = Result<(K, V), StorageError>> + use<S, K, V> {
    self.create_reverse_stream(None, start_at).try_filter_map(|(key, value)| {
        let live_entry = match value {
            Value::Value(value) => Some((key, value)),
            Value::Tombstone => None,
        };
        ready(Ok(live_entry))
    })
}
/// Persists the tree and returns the link to its root.
///
/// A tree without any live entry is not written; an empty link is
/// returned instead and the remembered root stays untouched. Otherwise
/// the active entries are written into the root (the runs are already in
/// storage) and the new root link is remembered and returned.
pub async fn store(&mut self) -> Result<OptionLink<Root<K, V>>, StorageError> {
    let existing = self.root().await?;
    // Without a root an empty active set already proves emptiness, which
    // saves the streaming check.
    let nothing_to_store = (existing.is_none() && self.active.is_empty()) || self.is_empty().await?;
    if nothing_to_store {
        return Ok(OptionLink::none());
    }

    let mut root = existing.unwrap_or_else(|| Root {
        levels: Default::default(),
        active: Default::default(),
        settings: self.settings.clone(),
    });
    let active_entries = stream::iter(self.active.iter()).map(Ok);
    root.active = store_active(&self.storage, self.settings.max_node_entries, active_entries).await?;
    self.root = self.storage.set_value(&root).await?.into();
    Ok(self.root)
}
pub async fn stats(&self) -> Result<LsmTreeStats, StorageError> {
Self::load_levels_and_runs(self.storage.clone(), self.root)
.try_fold(
LsmTreeStats { entries: 0, active_entries: self.active.len(), levels: 0, runs: 0 },
|mut result, item| {
match item {
Either::Left(_) => result.levels += 1,
Either::Right((_, run)) => {
result.runs += 1;
result.entries += run.size as usize;
},
}
ready(Ok(result))
},
)
.await
}
/// Returns `true` when the map holds no live (non-removed) entry.
pub async fn is_empty(&self) -> Result<bool, StorageError> {
    let smallest_key = self.min_key().await?;
    Ok(smallest_key.is_none())
}
/// Returns the smallest live key, i.e. the first key of the ascending
/// stream, or `None` when the map is empty.
pub async fn min_key(&self) -> Result<Option<K>, StorageError> {
    let entries = self.stream();
    pin_mut!(entries);
    Ok(entries.try_next().await?.map(|(key, _value)| key))
}
/// Returns the largest live key, i.e. the first key of the descending
/// stream, or `None` when the map is empty.
pub async fn max_key(&self) -> Result<Option<K>, StorageError> {
    let entries = self.reverse_stream();
    pin_mut!(entries);
    Ok(entries.try_next().await?.map(|(key, _value)| key))
}
/// Compacts the tree starting at level 0, cascading into the following
/// levels while they reach `max_run_count` runs.
///
/// With `flush_active` set, the active entries are flushed into a run
/// first.
pub async fn compact(&mut self, flush_active: bool) -> Result<(), StorageError> {
    if flush_active {
        self.flush_active().boxed().await?;
    }
    let cascade_at = self.settings.max_run_count as usize;
    self.compact_level(0, Some(cascade_at)).await
}
/// Merges the active entries and the stored runs into one stream ordered by
/// ascending key. Tombstones are yielded as-is.
///
/// * `only_run_indicies` - when set, the active entries are left out and
///   only runs whose global index is part of the set are read.
/// * `start_at` - when set, entries with a smaller key are not yielded and
///   run nodes lying entirely below it are not loaded.
///
/// When several sources hold the same key, only the item popped first from
/// the heap is yielded; the heap order prefers the active entries and then
/// the run with the lowest index.
fn create_stream(
&self,
only_run_indicies: Option<BTreeSet<usize>>,
start_at: Option<K>,
) -> impl Stream<Item = Result<(K, Value<V>), StorageError>> + Send + use<S, K, V> {
// Snapshot the state so the stream does not borrow `self`.
let storage = self.storage.clone();
let active = self.active.clone();
let root = self.root;
async_stream::try_stream! {
// Seed the heap with the active entries unless specific runs were requested.
let mut heap = match only_run_indicies {
Some(_) => BinaryHeap::new(),
None => BinaryHeap::<TreeStreamItem<K, V>>::from(
active
.into_iter()
.map(|item| TreeStreamItem { run: None, item })
.collect::<Vec<_>>(),
),
};
// Open one entry stream per selected run, keeping the run's global index.
let mut runs: Vec<(usize, RunNodeStream<S, K, V>)> = Self::load_levels_and_runs(storage.clone(), root)
.try_filter_map(|item| ready(Ok(item.right())))
.try_filter(|(index, _)| ready(match &only_run_indicies {
Some(run_indicies) => run_indicies.contains(index),
None => true,
}))
.map_ok(|(index, run)| (index, NodeStream::from_link(storage.clone(), run.entries).with_filter(RunNodeFilter::max(start_at.clone()))))
.try_collect()
.await?;
// Put the first entry of every run onto the heap.
for (run_index, run) in runs.iter_mut() {
if let Some(item) = run.try_next().await? {
heap.push(TreeStreamItem { run: Some(*run_index), item });
}
}
while let Some(item) = Self::pop_and_fetch(&mut heap, &mut runs).await? {
// Drop the entries other sources hold for the same key.
while let Some(next_item) = heap.peek() {
if next_item.item.0 == item.item.0 {
Self::pop_and_fetch(&mut heap, &mut runs).await?;
} else {
break;
}
}
// Skip everything in front of the requested start key.
if let Some(start_at) = &start_at {
if start_at > &item.item.0 {
continue;
}
}
yield item.item;
}
}
}
/// Merges the active entries and the stored runs into one stream ordered by
/// descending key. Tombstones are yielded as-is.
///
/// * `only_run_indicies` - when set, the active entries are left out and
///   only runs whose global index is part of the set are read.
/// * `start_at` - when set, entries with a greater key are not yielded and
///   run nodes lying entirely above it are not loaded.
///
/// When several sources hold the same key, only the item popped first from
/// the heap is yielded; the heap order prefers the active entries and then
/// the run with the lowest index.
fn create_reverse_stream(
&self,
only_run_indicies: Option<BTreeSet<usize>>,
start_at: Option<K>,
) -> impl Stream<Item = Result<(K, Value<V>), StorageError>> + use<S, K, V> {
// Snapshot the state so the stream does not borrow `self`.
let storage = self.storage.clone();
let active = self.active.clone();
let root = self.root;
async_stream::try_stream! {
// Seed the heap with the active entries unless specific runs were requested.
let mut heap = match only_run_indicies {
Some(_) => BinaryHeap::new(),
None => BinaryHeap::<ReverseTreeStreamItem<K, V>>::from(
active
.into_iter()
.map(|item| ReverseTreeStreamItem { run: None, item })
.collect::<Vec<_>>(),
),
};
// Open one reversed entry stream per selected run, keeping the run's global index.
let mut runs: Vec<(usize, RunNodeStream<S, K, V>)> = Self::load_levels_and_runs(storage.clone(), root)
.try_filter_map(|item| ready(Ok(item.right())))
.try_filter(|(index, _)| ready(match &only_run_indicies {
Some(run_indicies) => run_indicies.contains(index),
None => true,
}))
.map_ok(|(index, run)| (index, NodeStream::from_link(storage.clone(), run.entries).with_reverse().with_filter(RunNodeFilter::min(start_at.clone()))))
.try_collect()
.await?;
// Put the first entry of every run onto the heap.
for (run_index, run) in runs.iter_mut() {
if let Some(item) = run.try_next().await? {
heap.push(ReverseTreeStreamItem { run: Some(*run_index), item });
}
}
while let Some(item) = Self::pop_and_fetch_reverse(&mut heap, &mut runs).await? {
// Drop the entries other sources hold for the same key.
while let Some(next_item) = heap.peek() {
if next_item.item.0 == item.item.0 {
Self::pop_and_fetch_reverse(&mut heap, &mut runs).await?;
} else {
break;
}
}
// Skip everything behind the requested start key.
if let Some(start_at) = &start_at {
if start_at < &item.item.0 {
continue;
}
}
yield item.item;
}
}
}
/// Pops the next item from `heap` and, when it came from a run, refills the
/// heap with that run's following entry.
///
/// `runs` pairs every open run stream with the run's index. The run is
/// looked up by that stored index rather than by its position in the
/// slice, because the two differ as soon as only a subset of the runs is
/// streamed; a positional lookup would then silently stop reading the run.
///
/// Returns `Ok(None)` once the heap is exhausted.
async fn pop_and_fetch(
    heap: &mut BinaryHeap<TreeStreamItem<K, V>>,
    runs: &mut [(usize, RunNodeStream<S, K, V>)],
) -> Result<Option<TreeStreamItem<K, V>>, StorageError> {
    let Some(item) = heap.pop() else {
        return Ok(None);
    };
    if let Some(run_index) = item.run {
        if let Some((_, run)) = runs.iter_mut().find(|(index, _)| *index == run_index) {
            if let Some(next_entry) = run.try_next().await? {
                heap.push(TreeStreamItem { run: Some(run_index), item: next_entry });
            }
        }
    }
    Ok(Some(item))
}
/// Pops the next item from `heap` and, when it came from a run, refills the
/// heap with that run's following entry (descending variant).
///
/// `runs` pairs every open run stream with the run's index. The run is
/// looked up by that stored index rather than by its position in the
/// slice, because the two differ as soon as only a subset of the runs is
/// streamed; a positional lookup would then silently stop reading the run.
///
/// Returns `Ok(None)` once the heap is exhausted.
async fn pop_and_fetch_reverse(
    heap: &mut BinaryHeap<ReverseTreeStreamItem<K, V>>,
    runs: &mut [(usize, RunNodeStream<S, K, V>)],
) -> Result<Option<ReverseTreeStreamItem<K, V>>, StorageError> {
    let Some(item) = heap.pop() else {
        return Ok(None);
    };
    if let Some(run_index) = item.run {
        if let Some((_, run)) = runs.iter_mut().find(|(index, _)| *index == run_index) {
            if let Some(next_entry) = run.try_next().await? {
                heap.push(ReverseTreeStreamItem { run: Some(run_index), item: next_entry });
            }
        }
    }
    Ok(Some(item))
}
/// Writes the active entries as a new run at the front of level 0, stores
/// the updated root and clears the active entries. Level 0 is compacted
/// afterwards when it reached `max_run_count` runs.
///
/// Nothing happens when the active entries are empty or when their
/// smallest and largest key are equal (a single entry).
///
/// NOTE(review): the returned count is always `0`, also after a successful
/// flush - confirm whether callers are meant to receive the number of
/// flushed entries.
async fn flush_active(&mut self) -> Result<usize, StorageError> {
    let fewer_than_two_keys = match (self.active.first_key_value(), self.active.last_key_value()) {
        (Some((first_key, _)), Some((last_key, _))) => first_key == last_key,
        _ => true,
    };
    if fewer_than_two_keys {
        return Ok(0);
    }

    // Write the run from a copy; the active entries are only cleared after
    // the new root has been stored.
    let entries = stream::iter(self.active.clone().into_iter()).map(Ok);
    let Some(run) =
        store_run(&self.storage, self.settings.max_node_entries, entries, self.active.len()).await?
    else {
        return Ok(0);
    };

    let mut root = self.root().await?.unwrap_or_else(|| Root {
        levels: Default::default(),
        active: Default::default(),
        settings: self.settings.clone(),
    });
    let mut levels = Self::load_levels(self.storage.clone(), root.levels.clone()).await?;
    if levels.is_empty() {
        levels.push(Level { runs: Default::default() });
    }

    // The newest run goes first so that it shadows older runs.
    let mut runs = NodeStream::from_link(self.storage.clone(), levels[0].runs)
        .try_collect::<VecDeque<_>>()
        .await?;
    runs.push_front(run);
    let runs_count = runs.len();
    levels[0].runs =
        store_items(&self.storage, self.settings.max_node_entries, stream::iter(runs.iter()).map(Ok)).await?;

    root.levels = self.store_levels(levels).await?;
    self.root = self.storage.set_value(&root).await?.into();
    self.active.clear();

    let max_run_count = self.settings.max_run_count as usize;
    if runs_count >= max_run_count {
        self.compact_level(0, Some(max_run_count)).await?;
    }
    Ok(0)
}
/// Merges all runs of level `level_index`, together with the runs of the
/// following level whose key range overlaps one of them, into a single new
/// run placed at the front of the following level.
///
/// The merged runs are removed from their levels; a level left without
/// runs is removed altogether. With `cascade` set to `Some(max)`, the
/// target level is compacted in turn when it ends up with `max` or more
/// runs.
///
/// Does nothing when there is no root, when the level does not exist or
/// when it has no runs.
async fn compact_level(&mut self, level_index: usize, cascade: Option<usize>) -> Result<(), StorageError> {
    let next_level_index = level_index + 1;
    let mut root = if let Some(root) = self.root().await? { root } else { return Ok(()) };
    let mut levels = Self::load_levels(self.storage.clone(), root.levels.clone()).await?;
    if levels.get(level_index).is_none() {
        return Ok(());
    }
    if levels.get(next_level_index).is_none() {
        levels.insert(next_level_index, Level { runs: OptionLink::none() });
    }

    // Runs selected for the merge as
    // (level index, global run index, index within the level, run).
    let mut runs = Vec::new();
    {
        let levels_and_runs = Self::load_levels_and_runs(self.storage.clone(), self.root);
        pin_mut!(levels_and_runs);
        let mut current_global_level_index = 0;
        // Position of the next run inside the level currently streamed.
        // Global run indices keep counting across levels, so they cannot be
        // used to address a run within its level.
        let mut next_local_run_index: usize = 0;
        while let Some(level_or_run) = levels_and_runs.try_next().await? {
            match level_or_run {
                Either::Left((global_level_index, _level)) => {
                    current_global_level_index = global_level_index;
                    next_local_run_index = 0;
                    if current_global_level_index > next_level_index {
                        break;
                    }
                },
                Either::Right((global_run_index, run)) => {
                    let local_run_index = next_local_run_index;
                    next_local_run_index += 1;
                    if current_global_level_index == level_index {
                        runs.push((current_global_level_index, global_run_index, local_run_index, run));
                    } else if current_global_level_index == next_level_index {
                        // Only overlapping runs of the target level take
                        // part in the merge.
                        if runs
                            .iter()
                            .any(|(_, _, _, r)| r.min_key <= run.max_key && run.min_key <= r.max_key)
                        {
                            runs.push((current_global_level_index, global_run_index, local_run_index, run));
                        }
                    }
                },
            }
        }
    }
    if runs.is_empty() {
        return Ok(());
    }

    // Merge the selected runs into one new run.
    let entries =
        self.create_stream(Some(runs.iter().map(|(_, global_run_index, _, _)| *global_run_index).collect()), None);
    let run = store_run(
        &self.storage,
        self.settings.max_node_entries,
        entries,
        runs.iter().fold(0, |result, (_, _, _, run)| result + run.size) as usize,
    )
    .await?;
    let Some(run) = run else {
        return Ok(());
    };

    let (level_run_count, next_level_run_count) = {
        let mut level_runs = NodeStream::from_link(self.storage.clone(), levels[level_index].runs)
            .try_collect::<Vec<_>>()
            .await?;
        let mut next_level_runs = NodeStream::from_link(self.storage.clone(), levels[next_level_index].runs)
            .try_collect::<Vec<_>>()
            .await?;
        // Remove back to front so the remaining indices stay valid.
        for (global_level_index, _, local_run_index, _) in runs.iter().rev() {
            if *global_level_index == level_index && *local_run_index < level_runs.len() {
                level_runs.remove(*local_run_index);
            }
            if *global_level_index == next_level_index && *local_run_index < next_level_runs.len() {
                next_level_runs.remove(*local_run_index);
            }
        }
        next_level_runs.insert(0, run);
        let level_run_count = level_runs.len();
        let next_level_run_count = next_level_runs.len();
        levels[level_index].runs =
            store_items(&self.storage, self.settings.max_node_entries, stream::iter(&level_runs).map(Ok)).await?;
        levels[next_level_index].runs =
            store_items(&self.storage, self.settings.max_node_entries, stream::iter(&next_level_runs).map(Ok))
                .await?;
        (level_run_count, next_level_run_count)
    };

    // An emptied level is dropped, which moves the target level into its
    // place.
    let next_level_index = if level_run_count == 0 {
        levels.remove(level_index);
        level_index
    } else {
        next_level_index
    };
    root.levels = self.store_levels(levels).await?;
    self.root = self.storage.set_value(&root).await?.into();

    if let Some(max_run_count) = cascade {
        if next_level_run_count >= max_run_count {
            // Boxed because an async fn cannot recurse directly.
            Box::pin(self.compact_level(next_level_index, cascade)).await?;
        }
    }
    Ok(())
}
/// Loads the currently remembered root from the storage.
async fn root(&self) -> Result<Option<Root<K, V>>, StorageError> {
    let root_link = self.root;
    Self::load_root(&self.storage, root_link).await
}
/// Builds the levels node from `levels` and writes the blocks produced
/// for it to the storage.
///
/// The builder's item size limit is `INLINE_SIZE_FACTOR_LEVELS` of the
/// storage's maximum block size.
async fn store_levels(&self, levels: Vec<Level<K, V>>) -> Result<Node<Level<K, V>>, StorageError> {
    let items_size_max = (INLINE_SIZE_FACTOR_LEVELS * self.storage.max_block_size()).to_integer();
    let mut builder = NodeBuilder::default().with_items_size_max(items_size_max);
    builder.extend(levels).map_err(|e| StorageError::InvalidArgument(e.into()))?;
    let (levels_node, blocks) = builder.into_node().map_err(|e| StorageError::InvalidArgument(e.into()))?;
    for block in blocks {
        self.storage.set(block).await?;
    }
    Ok(levels_node)
}
/// Reads the root behind `root` from `storage`, passing through the
/// optional result of the storage lookup.
async fn load_root(storage: &S, root: OptionLink<Root<K, V>>) -> Result<Option<Root<K, V>>, StorageError> {
    let loaded = storage.get_value_or_none(&root).await?;
    Ok(loaded)
}
/// Reads all levels of the given levels node into a vector.
async fn load_levels(storage: S, levels: Node<Level<K, V>>) -> Result<Vec<Level<K, V>>, StorageError> {
    let levels = NodeStream::from_node(storage, levels, None).try_collect::<Vec<_>>().await?;
    Ok(levels)
}
/// Streams the structure stored under `root`: for every level a
/// `Left((level index, level))` item followed by one
/// `Right((run index, run))` item per run of that level.
///
/// Level indices start at 0 and count the levels. Run indices start at 0
/// and keep counting across levels (they are not reset per level). The
/// stream is empty when the root cannot be resolved to a value.
fn load_levels_and_runs(
storage: S,
root: OptionLink<Root<K, V>>,
) -> impl Stream<Item = Result<EitherLevelOrRun<K, V>, StorageError>> + Send {
async_stream::try_stream! {
let mut global_level_index = 0;
let mut global_run_index = 0;
if let Some(root) = Self::load_root(&storage, root).await? {
for level in Self::load_levels(storage.clone(), root.levels).await? {
// Open the run stream before the level is moved into the yielded item.
let runs = NodeStream::from_link(storage.clone(), level.runs);
yield Either::Left((global_level_index, level));
for await run in runs {
yield Either::Right((global_run_index, run?));
global_run_index += 1;
}
global_level_index += 1;
}
}
}
}
}
/// Item of the level/run stream: a level with its index (`Left`) or a run
/// with its global index (`Right`).
type EitherLevelOrRun<K, V> = Either<(usize, Level<K, V>), (usize, Run<K, V>)>;
/// Stream over the entries of a single run.
type RunNodeStream<S, K, V> = NodeStream<S, (K, Value<V>), RunNode<K, V>>;
/// Writes `items` as a node tree to `storage` and returns the link to its
/// root.
///
/// Blocks are written as soon as the builder hands them out, so only the
/// blocks still under construction are kept in memory.
///
/// # Errors
///
/// `StorageError::InvalidArgument` when `max_node_entries` does not fit the
/// builder's integer type or when the builder rejects an item; any error
/// of the item stream or of the storage is passed through.
async fn store_items<'a, S, T>(
    storage: &S,
    max_node_entries: u64,
    items: impl Stream<Item = Result<&'a T, StorageError>> + Send,
) -> Result<OptionLink<Node<T>>, StorageError>
where
    S: AnyBlockStorage,
    T: Clone + Serialize + 'a,
{
    let mut builder = NodeBuilder::<_>::new(
        storage.max_block_size(),
        max_node_entries
            .try_into()
            .map_err(|e: TryFromIntError| StorageError::InvalidArgument(e.into()))?,
        Default::default(),
    );

    pin_mut!(items);
    while let Some(next_item) = items.next().await {
        builder.push(next_item?).map_err(|e| StorageError::InvalidArgument(e.into()))?;
        for finished_block in builder.take_blocks() {
            storage.set(finished_block).await?;
        }
    }

    let (root, remaining_blocks) = builder
        .into_blocks()
        .map_err(|e| StorageError::InvalidArgument(e.into()))?;
    for block in remaining_blocks.into_iter() {
        storage.set(block).await?;
    }
    Ok(root.cid().into())
}
/// Writes the active entries to `storage` and returns the resulting node.
///
/// The builder's item size limit is `INLINE_SIZE_FACTOR_ACTIVE` of the
/// storage's maximum block size. Entries are cloned into the builder;
/// blocks are written as soon as the builder hands them out.
///
/// # Errors
///
/// `StorageError::InvalidArgument` when `max_node_entries` does not fit the
/// builder's integer type or when the builder rejects an entry; any error
/// of the entry stream or of the storage is passed through.
async fn store_active<S, K, V>(
    storage: &S,
    max_node_entries: u64,
    entries: impl Stream<Item = Result<(&K, &Value<V>), StorageError>>,
) -> Result<Node<(K, Value<V>)>, StorageError>
where
    S: AnyBlockStorage,
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    let items_size_max = (INLINE_SIZE_FACTOR_ACTIVE * storage.max_block_size()).to_integer();
    let mut builder = NodeBuilder::<_>::new(
        storage.max_block_size(),
        max_node_entries
            .try_into()
            .map_err(|e: TryFromIntError| StorageError::InvalidArgument(e.into()))?,
        Default::default(),
    )
    .with_items_size_max(items_size_max);

    pin_mut!(entries);
    while let Some(next_entry) = entries.next().await {
        let (key, value) = next_entry?;
        builder
            .push((key.clone(), value.clone()))
            .map_err(|e| StorageError::InvalidArgument(e.into()))?;
        for finished_block in builder.take_blocks() {
            storage.set(finished_block).await?;
        }
    }

    let (node, remaining_blocks) = builder.into_node().map_err(|e| StorageError::InvalidArgument(e.into()))?;
    for block in remaining_blocks.into_iter() {
        storage.set(block).await?;
    }
    Ok(node)
}
/// Writes `entries` as a tree of `RunNode`s to `storage` and returns the
/// link to the tree's root.
///
/// Blocks are written as soon as the builder hands them out.
///
/// # Errors
///
/// `StorageError::InvalidArgument` when `max_node_entries` does not fit the
/// builder's integer type or when the builder rejects an entry; any error
/// of the entry stream or of the storage is passed through.
async fn store_run_node<S, K, V>(
    storage: &S,
    max_node_entries: u64,
    entries: impl Stream<Item = Result<(K, Value<V>), StorageError>>,
) -> Result<OptionLink<RunNode<K, V>>, StorageError>
where
    S: AnyBlockStorage,
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    let mut builder = NodeBuilder::<(K, Value<V>), RunNode<K, V>, RunNodeSerializer<K, V>>::new(
        storage.max_block_size(),
        max_node_entries
            .try_into()
            .map_err(|e: TryFromIntError| StorageError::InvalidArgument(e.into()))?,
        RunNodeSerializer::new(),
    );

    pin_mut!(entries);
    while let Some(next_entry) = entries.next().await {
        builder.push(next_entry?).map_err(|e| StorageError::InvalidArgument(e.into()))?;
        for finished_block in builder.take_blocks() {
            storage.set(finished_block).await?;
        }
    }

    let (root, remaining_blocks) = builder
        .into_blocks()
        .map_err(|e| StorageError::InvalidArgument(e.into()))?;
    for block in remaining_blocks.into_iter() {
        storage.set(block).await?;
    }
    Ok(root)
}
/// Writes `entries` as a new run and returns its descriptor.
///
/// While the entries pass through, the smallest and largest key, the
/// number of entries and the bloom filter of the run are collected. The
/// bloom filter is dimensioned for `entries_size_hint` items at a false
/// positive rate of 0.001, using a fixed all-zero seed.
///
/// Returns `Ok(None)` when the stream did not yield any entry.
///
/// # Errors
///
/// `StorageError::InvalidArgument` when the bloom filter cannot be
/// created; errors from writing the entries are passed through.
async fn store_run<S, K, V>(
    storage: &S,
    max_node_entries: u64,
    entries: impl Stream<Item = Result<(K, Value<V>), StorageError>>,
    entries_size_hint: usize,
) -> Result<Option<Run<K, V>>, StorageError>
where
    S: AnyBlockStorage,
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    let mut smallest: Option<K> = None;
    let mut largest: Option<K> = None;
    let mut size = 0;
    let mut bloom = Bloom::<K>::new_for_fp_rate_with_seed(entries_size_hint, 0.001, &[0; 32])
        .map_err(|e| StorageError::InvalidArgument(anyhow!("bloomfilter: {}", e)))?;

    // Collect the run metadata as a side effect of writing the entries.
    let observed_entries = entries.inspect_ok(|(key, _)| {
        if smallest.as_ref().map_or(true, |current| key < current) {
            smallest = Some(key.clone());
        }
        if largest.as_ref().map_or(true, |current| key > current) {
            largest = Some(key.clone());
        }
        size += 1;
        bloom.set(key);
    });
    let entries_link = store_run_node(storage, max_node_entries, observed_entries).await?;

    let Some((min_key, max_key)) = smallest.zip(largest) else {
        return Ok(None);
    };
    Ok(Some(Run {
        entries: entries_link,
        size,
        bloom: BloomFilter::Bloomfilter(bloom.to_bytes()),
        min_key,
        max_key,
    }))
}
/// Counters describing the stored structure of an LSM tree map.
#[derive(Debug, Clone, PartialEq)]
pub struct LsmTreeStats {
/// Sum of the sizes recorded for all runs.
pub entries: usize,
/// Number of entries not yet flushed into a run.
pub active_entries: usize,
/// Number of levels.
pub levels: usize,
/// Number of runs over all levels.
pub runs: usize,
}
/// Heap item of the ascending merge stream.
///
/// The ordering is tailored to `BinaryHeap` (a max-heap): the item with the
/// smallest key compares greatest and is therefore popped first. Among
/// items with the same key, the one with the smallest `run` compares
/// greatest, with `None` (the active entries) ahead of every run.
#[derive(Debug)]
struct TreeStreamItem<K, V>
where
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    /// Index of the run the entry was read from; `None` for active entries.
    run: Option<usize>,
    /// The entry itself.
    item: (K, Value<V>),
}

impl<K, V> PartialEq for TreeStreamItem<K, V>
where
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    /// Equality is derived from `cmp` so that `a == b` holds exactly when
    /// `a.cmp(&b)` is `Equal`, as the `Ord` contract requires.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl<K, V> Eq for TreeStreamItem<K, V>
where
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
}

impl<K, V> PartialOrd for TreeStreamItem<K, V>
where
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<K, V> Ord for TreeStreamItem<K, V>
where
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    /// Reversed key order, then reversed run order (see the type docs).
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other
            .item
            .0
            .cmp(&self.item.0)
            .then_with(|| other.run.cmp(&self.run))
    }
}
/// Heap item of the descending merge stream.
///
/// The ordering is tailored to `BinaryHeap` (a max-heap): the item with the
/// largest key compares greatest and is therefore popped first. Among items
/// with the same key, the one with the smallest `run` compares greatest,
/// with `None` (the active entries) ahead of every run.
#[derive(Debug)]
struct ReverseTreeStreamItem<K, V>
where
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    /// Index of the run the entry was read from; `None` for active entries.
    run: Option<usize>,
    /// The entry itself.
    item: (K, Value<V>),
}

impl<K, V> PartialEq for ReverseTreeStreamItem<K, V>
where
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    /// Equality is derived from `cmp` so that `a == b` holds exactly when
    /// `a.cmp(&b)` is `Equal`, as the `Ord` contract requires.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl<K, V> Eq for ReverseTreeStreamItem<K, V>
where
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
}

impl<K, V> PartialOrd for ReverseTreeStreamItem<K, V>
where
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<K, V> Ord for ReverseTreeStreamItem<K, V>
where
    K: Hash + Ord + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
{
    /// Natural key order, then reversed run order (see the type docs).
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.item
            .0
            .cmp(&other.item.0)
            .then_with(|| other.run.cmp(&self.run))
    }
}
// Tests for the LSM tree map. Most of them use small settings so that
// flushing and compaction are triggered by a handful of entries.
#[cfg(test)]
mod tests {
use super::{LsmTreeMap, Value};
use crate::{
from_cbor,
library::{lsm_tree_map::LsmTreeStats, test::TestStorage},
to_cbor, LsmTreeMapSettings,
};
use futures::TryStreamExt;
// Insert, stream in key order, store, load again and stream again.
#[tokio::test]
async fn smoke() {
let storage = TestStorage::default();
let mut tree = LsmTreeMap::new(storage.clone(), Default::default());
tree.insert("hello".to_owned(), "world".to_owned()).await.unwrap();
tree.insert("1".to_owned(), "2".to_owned()).await.unwrap();
tree.insert("3".to_owned(), "4".to_owned()).await.unwrap();
assert_eq!(
tree.stream().try_collect::<Vec<_>>().await.unwrap(),
vec![
("1".to_owned(), "2".to_owned()),
("3".to_owned(), "4".to_owned()),
("hello".to_owned(), "world".to_owned())
]
);
let root = tree.store().await.unwrap().unwrap();
let tree2 = LsmTreeMap::load(storage.clone(), root).await.unwrap();
assert_eq!(
tree2.stream().try_collect::<Vec<_>>().await.unwrap(),
vec![
("1".to_owned(), "2".to_owned()),
("3".to_owned(), "4".to_owned()),
("hello".to_owned(), "world".to_owned())
]
);
}
// Lookups find entries both in a flushed run and in the active entries.
#[tokio::test]
async fn test_get() {
let storage = TestStorage::default();
let settings = LsmTreeMapSettings { max_node_entries: 32, max_active_entries: 2, max_run_count: 2 };
let mut tree = LsmTreeMap::new(storage.clone(), settings);
tree.insert(1, 100).await.unwrap();
tree.insert(2, 200).await.unwrap();
tree.insert(3, 300).await.unwrap();
assert_eq!(tree.get(&1).await.unwrap(), Some(100));
assert_eq!(tree.get(&2).await.unwrap(), Some(200));
assert_eq!(tree.get(&3).await.unwrap(), Some(300));
}
// Repeated flushes get compacted into a single run in a single level.
#[tokio::test]
async fn test_compact() {
let storage = TestStorage::default();
let settings = LsmTreeMapSettings { max_node_entries: 32, max_active_entries: 2, max_run_count: 2 };
let mut tree = LsmTreeMap::new(storage.clone(), settings);
for i in 0..10 {
tree.insert(i, i).await.unwrap();
}
assert_eq!(
tree.stream().try_collect::<Vec<_>>().await.unwrap(),
vec![(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9),]
);
let stats = tree.stats().await.unwrap();
assert_eq!(stats, LsmTreeStats { active_entries: 0, entries: 10, levels: 1, runs: 1 });
}
// Newer values and tombstones shadow older entries in both stream directions.
#[tokio::test]
async fn test_stream() {
let storage = TestStorage::default();
let settings = LsmTreeMapSettings { max_node_entries: 32, max_active_entries: 2, max_run_count: 2 };
let mut tree = LsmTreeMap::new(storage.clone(), settings);
tree.insert(0, 0).await.unwrap();
tree.insert(1, 1).await.unwrap();
tree.insert(2, 2).await.unwrap();
tree.insert(3, 3).await.unwrap();
tree.remove(3).await.unwrap();
tree.insert(3, 30).await.unwrap();
tree.remove(3).await.unwrap();
tree.insert(2, 20).await.unwrap();
tree.flush_active().await.unwrap();
assert_eq!(tree.stream().try_collect::<Vec<_>>().await.unwrap(), vec![(0, 0), (1, 1), (2, 20),]);
assert_eq!(tree.reverse_stream().try_collect::<Vec<_>>().await.unwrap(), vec![(2, 20), (1, 1), (0, 0),]);
}
// The start key is inclusive for the ascending and the descending query.
#[tokio::test]
async fn test_stream_query() {
let storage = TestStorage::default();
let settings = LsmTreeMapSettings { max_node_entries: 2, max_active_entries: 2, max_run_count: 2 };
let mut tree = LsmTreeMap::new(storage.clone(), settings);
for i in 0..10 {
tree.insert(i, i).await.unwrap();
}
assert_eq!(
tree.stream_query(Some(5)).try_collect::<Vec<_>>().await.unwrap(),
vec![(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
);
assert_eq!(
tree.reverse_stream_query(Some(5)).try_collect::<Vec<_>>().await.unwrap(),
vec![(5, 5), (4, 4), (3, 3), (2, 2), (1, 1), (0, 0)]
);
}
// A tree whose entries were all removed stores as an empty link.
#[tokio::test]
async fn test_store_empty() {
for count in [1, 10] {
let storage = TestStorage::default();
let settings = LsmTreeMapSettings { max_node_entries: 2, max_active_entries: 2, max_run_count: 2 };
let mut tree = LsmTreeMap::new(storage.clone(), settings);
for i in 0..count {
tree.insert(i, i).await.unwrap();
}
for i in 0..count {
tree.remove(i).await.unwrap();
}
assert!(tree.store().await.unwrap().is_none());
}
}
// Pins the default values and the `is_default` check.
#[test]
fn test_settings_default() {
assert_eq!(LsmTreeMapSettings::default().max_node_entries, 256);
assert_eq!(LsmTreeMapSettings::default().max_active_entries, 16384);
assert_eq!(LsmTreeMapSettings::default().max_run_count, 16);
assert!(LsmTreeMapSettings::default().is_default());
let mut not_default = LsmTreeMapSettings::default();
not_default.max_node_entries += 1;
assert!(!not_default.is_default());
}
// Values (including the unit value) and tombstones survive a CBOR round trip.
#[test]
fn test_serialize_value() {
let empty_v = Value::<()>::Value(());
let v = Value::<u8>::Value(0);
let t = Value::<u8>::Tombstone;
let v_cbor = to_cbor(&v).unwrap();
let empty_v_cbor = to_cbor(&empty_v).unwrap();
let t_cbor = to_cbor(&t).unwrap();
assert_eq!(from_cbor::<Value::<()>>(&empty_v_cbor).unwrap(), empty_v);
assert_eq!(from_cbor::<Value::<u8>>(&v_cbor).unwrap(), v);
assert_eq!(from_cbor::<Value::<u8>>(&t_cbor).unwrap(), t);
}
}