lsm_tree/
value.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    coding::{Decode, DecodeError, Encode, EncodeError},
7    key::InternalKey,
8    segment::block::ItemSize,
9    Slice,
10};
11use std::io::{Read, Write};
12use varint_rs::{VarintReader, VarintWriter};
13
/// User defined key (an alias for the crate's [`Slice`] type)
pub type UserKey = Slice;

/// User defined data (blob of bytes, an alias for the crate's [`Slice`] type)
#[allow(clippy::module_name_repetitions)]
pub type UserValue = Slice;

/// Sequence number - a monotonically increasing counter
///
/// Values with the same seqno are part of the same batch.
///
/// A value with a higher sequence number shadows an item with the
/// same key and lower sequence number. This enables MVCC.
///
/// Stale items are lazily garbage-collected during compaction.
pub type SeqNo = u64;
30
/// Kind of an item: a regular value or one of the two tombstone flavours
///
/// Each variant maps to a single tag byte (`0`, `1`, `2`) through the
/// `TryFrom<u8>` and `From<ValueType> for u8` conversions below.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[allow(clippy::module_name_repetitions)]
pub enum ValueType {
    /// Existing value
    Value,

    /// Deleted value
    Tombstone,

    /// "Weak" deletion (a.k.a. `SingleDelete` in `RocksDB`)
    WeakTombstone,
}

impl TryFrom<u8> for ValueType {
    type Error = ();

    /// Maps a tag byte back to its [`ValueType`].
    ///
    /// # Errors
    ///
    /// Returns `Err(())` for any byte other than `0`, `1` or `2`.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        let value_type = match value {
            0 => Self::Value,
            1 => Self::Tombstone,
            2 => Self::WeakTombstone,
            // Unknown tag: bail out before reaching the success path
            _ => return Err(()),
        };

        Ok(value_type)
    }
}

impl From<ValueType> for u8 {
    /// Returns the tag byte of the given [`ValueType`].
    fn from(value: ValueType) -> Self {
        use ValueType::{Tombstone, Value, WeakTombstone};

        // Exhaustive on purpose: a new variant must pick its tag explicitly
        match value {
            Value => 0,
            Tombstone => 1,
            WeakTombstone => 2,
        }
    }
}
67
/// Internal representation of KV pairs
///
/// Pairs an [`InternalKey`] with the user value stored under it.
#[allow(clippy::module_name_repetitions)]
#[derive(Clone, Eq, PartialEq)]
pub struct InternalValue {
    /// Internal key
    pub key: InternalKey,

    /// User-defined value - an arbitrary byte array
    ///
    /// Its length must fit into a `u32`; [`InternalValue::new`] asserts this.
    pub value: UserValue,
}
80
81impl InternalValue {
82    /// Creates a new [`Value`].
83    ///
84    /// # Panics
85    ///
86    /// Panics if the key length is empty or greater than 2^16, or the value length is greater than 2^32.
87    pub fn new<V: Into<UserValue>>(key: InternalKey, value: V) -> Self {
88        let value = value.into();
89
90        assert!(!key.user_key.is_empty(), "key may not be empty");
91        assert!(
92            u32::try_from(value.len()).is_ok(),
93            "values can be 2^32 bytes in length"
94        );
95
96        Self { key, value }
97    }
98
99    /// Creates a new [`Value`].
100    ///
101    /// # Panics
102    ///
103    /// Panics if the key length is empty or greater than 2^16, or the value length is greater than 2^32.
104    pub fn from_components<K: Into<UserKey>, V: Into<UserValue>>(
105        user_key: K,
106        value: V,
107        seqno: SeqNo,
108        value_type: ValueType,
109    ) -> Self {
110        let key = InternalKey::new(user_key, seqno, value_type);
111        Self::new(key, value)
112    }
113
114    /// Creates a new tombstone.
115    ///
116    /// # Panics
117    ///
118    /// Panics if the key length is empty or greater than 2^16.
119    pub fn new_tombstone<K: Into<UserKey>>(key: K, seqno: u64) -> Self {
120        let key = InternalKey::new(key, seqno, ValueType::Tombstone);
121        Self::new(key, vec![])
122    }
123
124    /// Creates a new weak tombstone.
125    ///
126    /// # Panics
127    ///
128    /// Panics if the key length is empty or greater than 2^16.
129    pub fn new_weak_tombstone<K: Into<UserKey>>(key: K, seqno: u64) -> Self {
130        let key = InternalKey::new(key, seqno, ValueType::WeakTombstone);
131        Self::new(key, vec![])
132    }
133
134    #[doc(hidden)]
135    #[must_use]
136    pub fn is_tombstone(&self) -> bool {
137        self.key.is_tombstone()
138    }
139}
140
141impl ItemSize for InternalValue {
142    fn size(&self) -> usize {
143        std::mem::size_of::<SeqNo>()
144            + std::mem::size_of::<ValueType>()
145            + self.key.user_key.len()
146            + self.value.len()
147    }
148}
149
150impl std::fmt::Debug for InternalValue {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        write!(
153            f,
154            "{:?} => {:?}",
155            self.key,
156            if self.value.len() >= 64 {
157                format!("[ ... {} bytes ]", self.value.len())
158            } else {
159                format!("{:?}", self.value)
160            }
161        )
162    }
163}
164
165impl Encode for InternalValue {
166    fn encode_into<W: Write>(&self, writer: &mut W) -> Result<(), EncodeError> {
167        self.key.encode_into(writer)?;
168
169        // NOTE: Only write value len + value if we are actually a value
170        if !self.is_tombstone() {
171            // NOTE: We know values are limited to 32-bit length
172            #[allow(clippy::cast_possible_truncation)]
173            writer.write_u32_varint(self.value.len() as u32)?;
174            writer.write_all(&self.value)?;
175        }
176
177        Ok(())
178    }
179}
180
181impl Decode for InternalValue {
182    fn decode_from<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
183        let key = InternalKey::decode_from(reader)?;
184
185        if key.is_tombstone() {
186            Ok(Self {
187                key,
188                // TODO: Slice::empty()
189                value: vec![].into(),
190            })
191        } else {
192            // NOTE: Only read value if we are actually a value
193
194            let value_len = reader.read_u32_varint()?;
195            let value = Slice::from_reader(reader, value_len as usize)?;
196
197            Ok(Self { key, value })
198        }
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;
    use test_log::test;

    /// Keys with different user keys are ordered by user key.
    #[test]
    fn pik_cmp_user_key() {
        let a = InternalKey::new(*b"a", 0, ValueType::Value);
        let b = InternalKey::new(*b"b", 0, ValueType::Value);
        assert!(a < b);
    }

    /// For the same user key, the key with the lower seqno compares greater.
    #[test]
    fn pik_cmp_seqno() {
        let a = InternalKey::new(*b"a", 0, ValueType::Value);
        let b = InternalKey::new(*b"a", 1, ValueType::Value);
        assert!(a > b);
    }

    /// Decoding a hand-written byte sequence yields the expected item.
    #[test]
    fn value_raw() -> crate::Result<()> {
        // Build the item we expect the raw bytes to decode into
        let value =
            InternalValue::from_components(vec![1, 2, 3], vec![3, 2, 1], 1, ValueType::Value);

        #[rustfmt::skip]
        let bytes = [
            // Seqno
            1,
            
            // Type
            0,

            // User key
            3, 1, 2, 3,
            
            // User value (length, then bytes)
            3, 3, 2, 1,
        ];

        // Decode straight from the raw bytes
        let deserialized = InternalValue::decode_from(&mut Cursor::new(bytes))?;

        // The decoded item must equal the one built from components
        assert_eq!(value, deserialized);

        Ok(())
    }

    /// A regular value with an empty payload survives an encode/decode round trip.
    #[test]
    fn value_empty_value() -> crate::Result<()> {
        // Create a Value item with an empty user value
        let value = InternalValue::from_components(vec![1, 2, 3], vec![], 42, ValueType::Value);

        // Serialize it
        let mut serialized = Vec::new();
        value.encode_into(&mut serialized)?;

        // Deserialize it again
        let deserialized = InternalValue::decode_from(&mut &serialized[..])?;

        // The round trip must not change the item
        assert_eq!(value, deserialized);

        Ok(())
    }

    /// A regular value with a non-empty payload survives an encode/decode round trip.
    #[test]
    fn value_with_value() -> crate::Result<()> {
        // Create a Value item with a non-empty user value
        let value = InternalValue::from_components(
            vec![1, 2, 3],
            vec![6, 2, 6, 2, 7, 5, 7, 8, 98],
            42,
            ValueType::Value,
        );

        // Serialize it
        let mut serialized = Vec::new();
        value.encode_into(&mut serialized)?;

        // Deserialize it again
        let deserialized = InternalValue::decode_from(&mut &serialized[..])?;

        // The round trip must not change the item
        assert_eq!(value, deserialized);

        Ok(())
    }

    /// A tombstone is encoded as its key only and survives a round trip.
    #[test]
    fn value_tombstone_round_trip() -> crate::Result<()> {
        let value = InternalValue::new_tombstone(vec![1, 2, 3], 42);
        assert!(value.is_tombstone());

        let mut serialized = Vec::new();
        value.encode_into(&mut serialized)?;

        // Tombstones skip the value length + value, so only the key is written
        let mut key_only = Vec::new();
        value.key.encode_into(&mut key_only)?;
        assert_eq!(key_only, serialized);

        let deserialized = InternalValue::decode_from(&mut &serialized[..])?;
        assert_eq!(value, deserialized);

        Ok(())
    }

    /// A weak tombstone survives an encode/decode round trip.
    #[test]
    fn value_weak_tombstone_round_trip() -> crate::Result<()> {
        let value = InternalValue::new_weak_tombstone(vec![1, 2, 3], 42);

        let mut serialized = Vec::new();
        value.encode_into(&mut serialized)?;

        let deserialized = InternalValue::decode_from(&mut &serialized[..])?;
        assert_eq!(value, deserialized);

        Ok(())
    }
}