slatedb 0.10.0

A cloud native embedded storage engine built on object storage.
Documentation
use bytes::Bytes;

/// Represents a key-value pair known not to be a tombstone.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct KeyValue {
    pub key: Bytes,
    pub value: Bytes,
}

impl<K, V> From<(&K, &V)> for KeyValue
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    fn from(record: (&K, &V)) -> Self {
        let key = Bytes::copy_from_slice(record.0.as_ref());
        let value = Bytes::copy_from_slice(record.1.as_ref());
        KeyValue { key, value }
    }
}

/// Represents a key-value pair that may be a tombstone.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct RowEntry {
    pub(crate) key: Bytes,
    pub(crate) value: ValueDeletable,
    pub(crate) seq: u64,
    pub(crate) create_ts: Option<i64>,
    pub(crate) expire_ts: Option<i64>,
}

impl RowEntry {
    pub(crate) fn new(
        key: Bytes,
        value: ValueDeletable,
        seq: u64,
        create_ts: Option<i64>,
        expire_ts: Option<i64>,
    ) -> Self {
        Self {
            key,
            value,
            seq,
            create_ts,
            expire_ts,
        }
    }

    pub fn estimated_size(&self) -> usize {
        let mut size = self.key.len() + self.value.len();
        // Add size for sequence number
        size += std::mem::size_of::<u64>();
        // Add size for timestamps
        if self.create_ts.is_some() {
            size += std::mem::size_of::<i64>();
        }
        if self.expire_ts.is_some() {
            size += std::mem::size_of::<i64>();
        }
        size
    }

    /// Returns the encoded size of this entry when written to an SST block.
    /// The `key_prefix_len` is the number of bytes shared with the block's first key.
    pub(crate) fn encoded_size(&self, key_prefix_len: usize) -> usize {
        let key_suffix_len = self.key.len() - key_prefix_len;
        let mut size = std::mem::size_of::<u16>() // key_prefix_len
            + std::mem::size_of::<u16>() // key_suffix_len
            + key_suffix_len
            + std::mem::size_of::<u64>() // seq
            + std::mem::size_of::<u8>(); // flags

        if self.expire_ts.is_some() {
            size += std::mem::size_of::<i64>();
        }
        if self.create_ts.is_some() {
            size += std::mem::size_of::<i64>();
        }
        if !self.value.is_tombstone() {
            size += std::mem::size_of::<u32>(); // value_len
            size += self.value.len();
        }
        size
    }

    #[cfg(test)]
    pub(crate) fn new_value(key: &[u8], value: &[u8], seq: u64) -> Self {
        Self {
            key: Bytes::copy_from_slice(key),
            value: ValueDeletable::Value(Bytes::copy_from_slice(value)),
            seq,
            create_ts: None,
            expire_ts: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn new_merge(key: &[u8], value: &[u8], seq: u64) -> Self {
        Self {
            key: Bytes::copy_from_slice(key),
            value: ValueDeletable::Merge(Bytes::copy_from_slice(value)),
            seq,
            create_ts: None,
            expire_ts: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn new_tombstone(key: &[u8], seq: u64) -> Self {
        Self {
            key: Bytes::copy_from_slice(key),
            value: ValueDeletable::Tombstone,
            seq,
            create_ts: None,
            expire_ts: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn with_create_ts(&self, create_ts: i64) -> Self {
        Self {
            key: self.key.clone(),
            value: self.value.clone(),
            seq: self.seq,
            create_ts: Some(create_ts),
            expire_ts: self.expire_ts,
        }
    }

    #[cfg(test)]
    pub(crate) fn with_expire_ts(&self, expire_ts: i64) -> Self {
        Self {
            key: self.key.clone(),
            value: self.value.clone(),
            seq: self.seq,
            create_ts: self.create_ts,
            expire_ts: Some(expire_ts),
        }
    }
}

/// The metadata associated with a `KeyValueDeletable`
/// TODO: can be removed
#[cfg(test)]
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct RowAttributes {
    pub(crate) ts: Option<i64>,
    pub(crate) expire_ts: Option<i64>,
}

/// Represents a value that may be a tombstone.
/// Equivalent to `Option<Bytes>`, but used internally
/// to prevent type confusion between `None` indicating
/// that a key does not exist, and `Tombstone` indicating
/// that the key exists but has a tombstone value.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ValueDeletable {
    Value(Bytes),
    Merge(Bytes),
    Tombstone,
}

impl ValueDeletable {
    pub(crate) fn len(&self) -> usize {
        match self {
            ValueDeletable::Value(v) | ValueDeletable::Merge(v) => v.len(),
            ValueDeletable::Tombstone => 0,
        }
    }

    pub fn is_tombstone(&self) -> bool {
        matches!(self, ValueDeletable::Tombstone)
    }

    pub fn as_bytes(&self) -> Option<Bytes> {
        match self {
            ValueDeletable::Value(v) | ValueDeletable::Merge(v) => Some(v.clone()),
            ValueDeletable::Tombstone => None,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::row_codec::SstRowEntry;
    use rstest::rstest;

    // Tombstone encoding:
    //   key_prefix_len (u16) + key_suffix_len (u16) + key_suffix + seq (u64) + flags (u8)
    // = 2 + 2 + key_suffix_len + 8 + 1
    // = 13 + key_suffix_len
    #[rstest]
    #[case(0, 18)] // key_suffix_len=5: 13 + 5 = 18
    #[case(2, 16)] // key_suffix_len=3: 13 + 3 = 16
    #[case(4, 14)] // key_suffix_len=1: 13 + 1 = 14
    fn encoded_size_tombstone(#[case] prefix_len: usize, #[case] expected: usize) {
        let entry = RowEntry::new_tombstone(b"hello", 1);
        assert_eq!(entry.encoded_size(prefix_len), expected);
    }

    // Value encoding:
    //   tombstone encoding + value_len (u32) + value
    // = 13 + key_suffix_len + 4 + value_len
    // = 17 + key_suffix_len + value_len
    #[rstest]
    #[case(0, 25)] // key_suffix_len=5, value_len=3: 17 + 5 + 3 = 25
    #[case(2, 23)] // key_suffix_len=3, value_len=3: 17 + 3 + 3 = 23
    #[case(4, 21)] // key_suffix_len=1, value_len=3: 17 + 1 + 3 = 21
    fn encoded_size_value(#[case] prefix_len: usize, #[case] expected: usize) {
        let entry = RowEntry::new_value(b"hello", b"val", 1);
        assert_eq!(entry.encoded_size(prefix_len), expected);
    }

    #[test]
    fn encoded_size_matches_sst_row_entry() {
        let entry = RowEntry::new_value(b"prefixkey", b"value", 1);
        let sst_entry = SstRowEntry::new(
            6,
            Bytes::from("key"),
            1,
            ValueDeletable::Value(Bytes::from("value")),
            None,
            None,
        );
        assert_eq!(entry.encoded_size(6), sst_entry.size());
    }
}