coordinode-lsm-tree 4.2.0

A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs) — CoordiNode fork
Documentation
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::KeyedBlockHandle;
use crate::{
    Cache, CompressionType, GlobalTableId, SeqNo, UserKey,
    comparator::SharedComparator,
    encryption::EncryptionProvider,
    file_accessor::FileAccessor,
    table::{
        BlockHandle, IndexBlock,
        block::BlockType,
        block_index::{BlockIndexIter, iter::OwnedIndexBlockIter},
        util::load_block,
    },
};
use std::{path::PathBuf, sync::Arc};

#[cfg(feature = "metrics")]
use crate::Metrics;

/// Index that translates item keys to data block handles
///
/// The index is loaded on demand.
pub struct VolatileBlockIndex {
    pub(crate) table_id: GlobalTableId,
    pub(crate) path: Arc<PathBuf>,
    pub(crate) file_accessor: FileAccessor,
    pub(crate) cache: Arc<Cache>,
    pub(crate) handle: BlockHandle,
    pub(crate) compression: CompressionType,
    pub(crate) encryption: Option<Arc<dyn EncryptionProvider>>,
    pub(crate) comparator: SharedComparator,

    #[cfg(feature = "metrics")]
    pub(crate) metrics: Arc<Metrics>,
}

impl VolatileBlockIndex {
    pub fn forward_reader(&self, needle: &[u8], seqno: SeqNo) -> Iter {
        let mut iter = Iter::new(self);
        iter.seek_lower(needle, seqno);
        iter
    }

    pub fn iter(&self) -> Iter {
        Iter::new(self)
    }
}

pub struct Iter {
    inner: Option<OwnedIndexBlockIter>,
    table_id: GlobalTableId,
    path: Arc<PathBuf>,
    file_accessor: FileAccessor,
    cache: Arc<Cache>,
    handle: BlockHandle,
    compression: CompressionType,
    encryption: Option<Arc<dyn EncryptionProvider>>,
    comparator: SharedComparator,

    lo: Option<(UserKey, SeqNo)>,
    hi: Option<(UserKey, SeqNo)>,

    #[cfg(feature = "metrics")]
    pub(crate) metrics: Arc<Metrics>,
}

impl Iter {
    fn new(index: &VolatileBlockIndex) -> Self {
        Self {
            inner: None,
            table_id: index.table_id,
            path: index.path.clone(),
            file_accessor: index.file_accessor.clone(),
            cache: index.cache.clone(),
            handle: index.handle,
            compression: index.compression,
            encryption: index.encryption.clone(),
            comparator: index.comparator.clone(),

            lo: None,
            hi: None,

            #[cfg(feature = "metrics")]
            metrics: index.metrics.clone(),
        }
    }
}

impl BlockIndexIter for Iter {
    fn seek_lower(&mut self, key: &[u8], seqno: SeqNo) -> bool {
        self.lo = Some((key.into(), seqno));
        true
    }

    fn seek_upper(&mut self, key: &[u8], seqno: SeqNo) -> bool {
        self.hi = Some((key.into(), seqno));
        true
    }
}

impl Iterator for Iter {
    type Item = crate::Result<KeyedBlockHandle>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(inner) = &mut self.inner {
            inner.next().map(Ok)
        } else {
            let block = fail_iter!(load_block(
                self.table_id,
                &self.path,
                &self.file_accessor,
                &self.cache,
                &self.handle,
                BlockType::Index,
                self.compression,
                self.encryption.as_deref(),
                #[cfg(zstd_any)]
                None,
                #[cfg(feature = "metrics")]
                &self.metrics,
            ));
            let index_block = IndexBlock::new(block);
            let lo = self.lo.as_ref().map(|(k, s)| (k.as_ref(), *s));
            let hi = self.hi.as_ref().map(|(k, s)| (k.as_ref(), *s));

            let mut iter = OwnedIndexBlockIter::from_block_with_bounds(
                index_block,
                self.comparator.clone(),
                lo,
                hi,
            )?;

            let next_item = iter.next().map(Ok);

            self.inner = Some(iter);

            next_item
        }
    }
}

impl DoubleEndedIterator for Iter {
    fn next_back(&mut self) -> Option<Self::Item> {
        if let Some(inner) = &mut self.inner {
            inner.next_back().map(Ok)
        } else {
            let block = fail_iter!(load_block(
                self.table_id,
                &self.path,
                &self.file_accessor,
                &self.cache,
                &self.handle,
                BlockType::Index,
                self.compression,
                self.encryption.as_deref(),
                #[cfg(zstd_any)]
                None,
                #[cfg(feature = "metrics")]
                &self.metrics,
            ));
            let index_block = IndexBlock::new(block);
            let lo = self.lo.as_ref().map(|(k, s)| (k.as_ref(), *s));
            let hi = self.hi.as_ref().map(|(k, s)| (k.as_ref(), *s));

            let mut iter = OwnedIndexBlockIter::from_block_with_bounds(
                index_block,
                self.comparator.clone(),
                lo,
                hi,
            )?;

            let next_item = iter.next_back().map(Ok);

            self.inner = Some(iter);

            next_item
        }
    }
}