// tempest-kv 0.0.2
//
// Key-Value storage layer for TempestDB
// Documentation
use std::rc::Rc;

use bytes::{Bytes, BytesMut};
use tempest_io::{Io, IoBuf};
use tempest_rt::read_exact;

use crate::{
    StorageError,
    base::{Comparer, InternalKey},
    iterator::StorageIterator,
    sst::{SstHandle, block::BlockIterator},
};

/// Forward iterator over the entries of a single SST.
///
/// Blocks are read one at a time through the shared [`SstHandle`] and walked
/// with a [`BlockIterator`].
pub(crate) struct SstIterator<I: Io, C: Comparer> {
    /// Shared handle to the SST; supplies the file descriptor and block index.
    handle: Rc<SstHandle<I, C>>,
    /// Iterator over the currently loaded block, or `None` when no block is
    /// loaded (nothing read yet, or the previous block was drained).
    current: Option<BlockIterator<C>>,
    /// Position in the block index of the block to load next.
    next_index_pos: usize,
    /// Key of the entry most recently returned by `next()` or stashed by
    /// `seek()`; a `seek()` to a key at or before it is ignored.
    last_key: Option<InternalKey<C, Bytes>>,
    /// Entry consumed during seek that should be returned first by next()
    peeked: Option<(InternalKey<C, Bytes>, Bytes)>,
}

impl<I: Io, C: Comparer> SstIterator<I, C> {
    pub(crate) fn new(handle: Rc<SstHandle<I, C>>) -> Self {
        Self {
            handle,
            current: None,
            next_index_pos: 0,
            last_key: None,
            peeked: None,
        }
    }

    pub(crate) fn into_handle(self) -> Rc<SstHandle<I, C>> {
        self.handle
    }

    async fn load_block(
        &mut self,
        index: usize,
        block_offset: u64,
        block_size: u32,
    ) -> Result<(), StorageError> {
        let block_size = block_size as usize;
        let block_buf = BytesMut::with_capacity(block_size);
        let (result, s) =
            read_exact::<_, I>(self.handle.fd, block_buf.slice(..block_size), block_offset).await;
        result?;
        let block_raw = s.into_inner().freeze();
        assert_eq!(block_raw.len(), block_size);
        self.current = Some(BlockIterator::new(block_raw));
        self.next_index_pos = index + 1;
        Ok(())
    }

    async fn next_block(&mut self) -> Result<Option<&mut BlockIterator<C>>, StorageError> {
        if self.current.is_some() {
            return Ok(self.current.as_mut());
        }

        let index = self.next_index_pos;
        let Some((block_offset, block_size)) = self.handle.block_index.get_block_by_index(index)
        else {
            return Ok(None);
        };

        self.load_block(index, block_offset, block_size).await?;
        Ok(self.current.as_mut())
    }
}

impl<I: Io, C: Comparer> StorageIterator<I, C> for SstIterator<I, C> {
    /// Returns the next entry of the SST, or `Ok(None)` once the block index
    /// has no further blocks.
    ///
    /// An entry stashed by a preceding `seek` is handed out first. Otherwise
    /// blocks are walked in index order; a block that yields nothing is
    /// dropped and the following block is tried, for as many blocks as it
    /// takes, so an entry-less block can never be mistaken for the end of the
    /// table.
    ///
    /// # Errors
    /// Propagates any `StorageError` raised while loading a block.
    async fn next(&mut self) -> Result<Option<(InternalKey<C, Bytes>, Bytes)>, StorageError> {
        // return any entry stashed during seek first
        if let Some(entry) = self.peeked.take() {
            self.last_key = Some(entry.0.clone());
            return Ok(Some(entry));
        }

        loop {
            let Some(block) = self.next_block().await? else {
                // no block left in the index
                return Ok(None);
            };

            if let Some(entry) = block.next() {
                self.last_key = Some(entry.0.clone());
                return Ok(Some(entry));
            }

            // this block is drained; clear it so next_block() loads the
            // block at `next_index_pos` on the next turn of the loop
            self.current = None;
        }
    }

    /// Moves the iterator forward so the next entry returned compares
    /// greater than or equal to `key` (per `compare_logical`).
    ///
    /// A `key` at or before the last key already produced is ignored, so the
    /// iterator never moves backwards. Otherwise the block chosen by the
    /// block index for `key` is loaded and scanned; the first matching entry
    /// is stashed in `peeked` for `next()`. If the block holds no such entry,
    /// iteration continues with the following block. If the index has no
    /// block for `key`, the iterator is left exhausted.
    ///
    /// # Errors
    /// Propagates any `StorageError` raised while loading the block.
    async fn seek(&mut self, key: InternalKey<C, Bytes>) -> Result<(), StorageError> {
        // backward seek is a no-op
        if self.last_key.as_ref().is_some_and(|last| &key <= last) {
            return Ok(());
        }

        // any previously stashed entry lies before `key`; discard it
        self.peeked = None;

        let Some((index, block_offset, block_size)) = self.handle.block_index.get_block_for(&key)
        else {
            // key is beyond all blocks - exhaust the iterator
            self.current = None;
            self.next_index_pos = usize::MAX;
            return Ok(());
        };

        self.load_block(index, block_offset, block_size).await?;

        // advance within the block until we reach or pass the seek key
        let mut hit = None;
        if let Some(block) = self.current.as_mut() {
            while let Some(entry) = block.next() {
                if entry.0.compare_logical(&key).is_ge() {
                    hit = Some(entry);
                    break;
                }
            }
        }

        match hit {
            Some(entry) => {
                // stash this entry so next() returns it first
                self.last_key = Some(entry.0.clone());
                self.peeked = Some(entry);
            }
            // nothing in this block reaches `key`; next() moves on to the
            // following block
            None => self.current = None,
        }

        Ok(())
    }
}