lsm-tree 3.1.4

A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs)
Documentation
// Copyright (c) 2025-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{
    coding::{Decode, Encode},
    table::{
        block::{BlockOffset, Decodable, Encodable, TRAILER_START_MARKER},
        index_block::IndexBlockParsedItem,
        util::SliceIndexes,
    },
};
use crate::{SeqNo, UserKey};
use byteorder::{ReadBytesExt, WriteBytesExt};
use std::io::{Cursor, Seek};
use varint_rs::{VarintReader, VarintWriter};

/// Points to a block on file
#[derive(Copy, Clone, Debug, Default)]
pub struct BlockHandle {
    /// Position of block in file
    offset: BlockOffset,

    /// Size of block in bytes
    size: u32,
}

impl BlockHandle {
    #[must_use]
    pub fn new(offset: BlockOffset, size: u32) -> Self {
        Self { offset, size }
    }

    #[must_use]
    pub fn size(&self) -> u32 {
        self.size
    }

    #[must_use]
    pub fn offset(&self) -> BlockOffset {
        self.offset
    }
}

impl Encode for BlockHandle {
    fn encode_into<W: std::io::Write>(&self, writer: &mut W) -> Result<(), crate::Error> {
        writer.write_u64_varint(*self.offset)?;
        writer.write_u32_varint(self.size)?;
        Ok(())
    }
}

impl Decode for BlockHandle {
    fn decode_from<R: std::io::Read>(reader: &mut R) -> Result<Self, crate::Error>
    where
        Self: Sized,
    {
        let offset = reader.read_u64_varint()?;
        let size = reader.read_u32_varint()?;

        Ok(Self {
            offset: BlockOffset(offset),
            size,
        })
    }
}

/// Points to a block on file
#[derive(Clone, Debug)]
pub struct KeyedBlockHandle {
    /// Key of last item in block
    end_key: UserKey,

    /// Seqno of last item in block
    seqno: SeqNo,

    inner: BlockHandle,
}

impl AsRef<BlockHandle> for KeyedBlockHandle {
    fn as_ref(&self) -> &BlockHandle {
        &self.inner
    }
}

impl KeyedBlockHandle {
    #[must_use]
    pub fn into_inner(self) -> BlockHandle {
        self.inner
    }

    #[must_use]
    pub fn new(end_key: UserKey, seqno: SeqNo, handle: BlockHandle) -> Self {
        Self {
            end_key,
            seqno,
            inner: handle,
        }
    }

    #[must_use]
    pub fn seqno(&self) -> SeqNo {
        self.seqno
    }

    pub fn shift(&mut self, delta: BlockOffset) {
        self.inner.offset += delta;
    }

    #[must_use]
    pub fn size(&self) -> u32 {
        self.inner.size()
    }

    #[must_use]
    pub fn offset(&self) -> BlockOffset {
        self.inner.offset()
    }

    #[must_use]
    pub fn end_key(&self) -> &UserKey {
        &self.end_key
    }
}

#[cfg(test)]
impl PartialEq for KeyedBlockHandle {
    fn eq(&self, other: &Self) -> bool {
        self.offset() == other.offset()
    }
}

impl Encodable<BlockOffset> for KeyedBlockHandle {
    fn encode_full_into<W: std::io::Write>(
        &self,
        writer: &mut W,
        state: &mut BlockOffset,
    ) -> crate::Result<()> {
        // We encode restart markers as:
        // [marker=0] [offset] [size] [seqno] [key len] [end key]
        // 1          2        3      4       5         6

        writer.write_u8(0)?; // 1

        self.inner.encode_into(writer)?; // 2, 3

        unwrap!(writer.write_u64_varint(self.seqno)); // 4

        #[expect(clippy::cast_possible_truncation, reason = "keys are u16 long max")]
        writer.write_u16_varint(self.end_key.len() as u16)?; // 5
        writer.write_all(&self.end_key)?; // 6

        *state = BlockOffset(*self.offset() + u64::from(self.size()));

        Ok(())
    }

    // TODO: see https://github.com/fjall-rs/lsm-tree/issues/184
    #[cfg_attr(coverage_nightly, coverage(off))]
    fn encode_truncated_into<W: std::io::Write>(
        &self,
        _writer: &mut W,
        _state: &mut BlockOffset,
        _shared_len: usize,
    ) -> crate::Result<()> {
        unimplemented!()
    }

    fn key(&self) -> &[u8] {
        &self.end_key
    }
}

impl Decodable<IndexBlockParsedItem> for KeyedBlockHandle {
    fn parse_full(reader: &mut Cursor<&[u8]>, offset: usize) -> Option<IndexBlockParsedItem> {
        let marker = unwrap!(reader.read_u8());

        if marker == TRAILER_START_MARKER {
            return None;
        }

        let handle = unwrap!(BlockHandle::decode_from(reader));
        let seqno = unwrap!(reader.read_u64_varint());

        let key_len: usize = unwrap!(reader.read_u16_varint()).into();
        #[expect(
            clippy::cast_possible_truncation,
            reason = "blocks tend to be some megabytes in size at most, so position should fit into usize"
        )]
        let key_start = offset + reader.position() as usize;

        #[expect(
            clippy::cast_possible_wrap,
            reason = "key_len is bounded by u16::MAX, no wrap expected"
        )]
        let offset_i64 = key_len as i64;
        unwrap!(reader.seek_relative(offset_i64));

        Some(IndexBlockParsedItem {
            prefix: None,
            end_key: SliceIndexes(key_start, key_start + key_len),
            offset: handle.offset(),
            size: handle.size(),
            seqno,
        })
    }

    fn parse_restart_key<'a>(
        reader: &mut Cursor<&[u8]>,
        offset: usize,
        data: &'a [u8],
    ) -> Option<(&'a [u8], SeqNo)> {
        let marker = unwrap!(reader.read_u8());

        if marker == TRAILER_START_MARKER {
            return None;
        }

        let _file_offset = unwrap!(reader.read_u64_varint());
        let _size = unwrap!(reader.read_u32_varint());
        let seqno = unwrap!(reader.read_u64_varint());

        let key_len: usize = unwrap!(reader.read_u16_varint()).into();
        #[expect(
            clippy::cast_possible_truncation,
            reason = "blocks tend to be some megabytes in size at most, so position should fit into usize"
        )]
        let key_start = offset + reader.position() as usize;

        #[expect(
            clippy::cast_possible_wrap,
            reason = "key_len is bounded by u16::MAX, no wrap expected"
        )]
        let key_len_i64 = key_len as i64;
        unwrap!(reader.seek_relative(key_len_i64));

        let key = data.get(key_start..(key_start + key_len));

        key.map(|k| (k, seqno))
    }

    // TODO: see https://github.com/fjall-rs/lsm-tree/issues/184
    #[cfg_attr(coverage_nightly, coverage(off))]
    fn parse_truncated(
        _reader: &mut Cursor<&[u8]>,
        _offset: usize,
        _base_key_offset: usize,
    ) -> Option<IndexBlockParsedItem> {
        unimplemented!()
    }
}