tempest-kv 0.0.2

Key-Value storage layer for TempestDB
Documentation
use bytes::{Buf, BufMut, Bytes, BytesMut};
use integer_encoding::VarInt;
use tempest_core::utils::PrettyBytes;

use crate::base::{KeyKind, KeyTrailer, SeqNum};

/// # Write Batch
///
/// This defines an aggregation of different write operations into the silo.
///
/// Every batch starts with a header of the following layout:
///
/// ```not_rust
/// +-------------+------------+--- ... ---+
/// | SeqNum (8B) | Count (4B) |  Entries  |
/// +-------------+------------+--- ... ---+
/// ```
///
/// Both header fields are stored little-endian. The header is zero-filled when the batch is
/// created and only written once the batch is committed.
/// After the 12 byte header, there can be up to `u32::MAX` (Count) **Entries**.
///
///
/// There are different entry types, each identified by the [`KeyKind`].
/// Every entry is encoded as the key **varstring**, followed by the kind byte, followed by an
/// optional second varstring, depending on the kind.
/// A varstring is a length-prefixed string, where the length is encoded as a varint.
///
/// ```not_rust
/// +-----------------+-----------+-------------------+
/// | Key (varstring) | Kind (1B) | Value (varstring) |
/// +-----------------+-----------+-------------------+
/// ```
///
/// "Key -> Value" is not always a good description. While [`KeyKind::Put`] does set a `Key` to a
/// `Value`, other operations, like range deletions, actually just have two keys as their params.
///
/// The following table shows the format for entries of each [`KeyKind`]:
///
/// ```not_rust
/// Delete      varstring   kind
/// Put         varstring   kind   varstring
// -- TODO --
// RangeDelete
// merging operations
/// ```
#[derive(derive_more::Debug)]
#[debug("WriteBatch(len={}, count={}, committed={})", buf.len(), count, committed)]
pub struct WriteBatch {
    // Encoded batch: the 12 byte header followed by the entries.
    buf: BytesMut,
    // Number of entries in `buf`; copied into the header on commit.
    count: u32,
    // Whether the header (seqnum and count) has been written into `buf`.
    committed: bool,
}

impl WriteBatch {
    /// Create a new write batch.
    pub fn new() -> Self {
        let mut buf = BytesMut::with_capacity(4096);
        buf.put_bytes(0, 12);
        Self {
            buf,
            count: 0,
            committed: false,
        }
    }

    pub fn from_wal(buf: BytesMut) -> Self {
        let count = u32::from_le_bytes(buf[8..12].try_into().unwrap());
        Self {
            // contains our checked data
            buf,
            count,
            // this comes from the wal, so it has already been committed
            committed: true,
        }
    }

    /// Add a `put KEY=VALUE` command to this batch.
    pub fn put(&mut self, key: &[u8], value: &[u8]) -> &mut Self {
        trace!(
            key = ?PrettyBytes(key), value = ?PrettyBytes(value),
            "write batch: put",
        );
        self.count += 1;
        self.put_varint(key.len());
        self.buf.put(key);
        self.buf.put_u8(KeyKind::Put.into());
        self.put_varint(value.len());
        self.buf.put(value);
        self
    }

    /// Add a `delete KEY` command to this batch.
    pub fn delete(&mut self, key: &[u8]) -> &mut Self {
        trace!(
            key = ?PrettyBytes(key),
            "write batch: delete",
        );
        self.count += 1;
        self.put_varint(key.len());
        self.buf.put(key);
        self.buf.put_u8(KeyKind::Delete.into());
        self
    }

    pub(crate) fn put_varint(&mut self, i: usize) {
        // reserve space
        let varint_size = i.required_space();
        self.buf.put_bytes(0, varint_size);
        // encode varint
        let buflen = self.buf.len();
        let written = i.encode_var(&mut self.buf[buflen - varint_size..]);
        debug_assert_eq!(written, varint_size);
    }

    pub(crate) fn commit(&mut self, seqnum: SeqNum) {
        assert!(!self.committed);
        self.buf[0..8].copy_from_slice(&seqnum.get().to_le_bytes());
        self.buf[8..12].copy_from_slice(&self.count.to_le_bytes());
        self.committed = true;
    }

    pub(crate) fn seqnum(buf: &[u8]) -> SeqNum {
        let seqnum_raw = u64::from_le_bytes(buf[0..8].try_into().unwrap());
        SeqNum::new(seqnum_raw).expect("batch contained invalid seqnum")
    }

    pub(crate) fn into_iter(self) -> WriteBatchIterator {
        WriteBatchIterator::new(self.buf.freeze())
    }

    pub(crate) fn count(&self) -> u32 {
        self.count
    }

    pub(crate) fn buf(&self) -> &[u8] {
        assert!(
            self.committed,
            "trying to access a write batch, that was not assigned a seqnum"
        );
        &self.buf
    }
}

pub(crate) struct WriteBatchIterator {
    seqnum: SeqNum,
    buf: Bytes,
}

impl WriteBatchIterator {
    pub(crate) fn new(mut buf: Bytes) -> Self {
        let seqnum_raw = u64::from_le_bytes(buf[0..8].try_into().unwrap());
        let seqnum = SeqNum::new(seqnum_raw).expect("batch contained invalid seqnum");
        buf.advance(12); // advance past header (seqnum and count)
        Self { seqnum, buf }
    }
}

impl Iterator for WriteBatchIterator {
    /// `(key, trailer, value)`; the value is empty for [`KeyKind::Delete`] entries.
    type Item = (Bytes, KeyTrailer, Bytes);

    /// Decode the next entry: key varstring, kind byte, then (for puts) the value varstring.
    ///
    /// Returns `None` once the buffer is exhausted. Panics on a malformed or truncated entry.
    fn next(&mut self) -> Option<Self::Item> {
        // Split one varstring (varint length prefix + payload) off the front of `buf`.
        fn take_varstring(buf: &mut Bytes, invalid_len_msg: &str) -> Bytes {
            let (payload_len, prefix_len) =
                usize::decode_var(&buf[..]).expect(invalid_len_msg);
            buf.advance(prefix_len);
            buf.split_to(payload_len)
        }

        if self.buf.is_empty() {
            return None;
        }

        let key = take_varstring(&mut self.buf, "invalid key len varint in write batch buf");

        // the kind byte sits between the key and the (optional) value
        let raw_kind = self.buf[0];
        self.buf.advance(1);
        let kind = KeyKind::try_from(raw_kind).expect("invalid key kind in write batch buf");

        let value = match kind {
            KeyKind::Delete => Bytes::new(),
            KeyKind::Put => {
                take_varstring(&mut self.buf, "invalid value len varint in write batch buf")
            }
        };

        Some((key, KeyTrailer::new(self.seqnum, kind), value))
    }
}