lsm_tree/
value.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    coding::{Decode, DecodeError, Encode, EncodeError},
7    key::InternalKey,
8    segment::block::ItemSize,
9    Slice,
10};
11use std::io::{Read, Write};
12use varint_rs::{VarintReader, VarintWriter};
13
/// User-defined key
///
/// Type alias for [`Slice`].
pub type UserKey = Slice;
16
/// User-defined data (blob of bytes)
///
/// Type alias for [`Slice`], the same underlying type as [`UserKey`].
#[allow(clippy::module_name_repetitions)]
pub type UserValue = Slice;
20
/// Sequence number - a monotonically increasing counter
///
/// Values with the same seqno are part of the same batch.
///
/// A value with a higher sequence number shadows an item with the
/// same key and a lower sequence number. This enables MVCC
/// (multi-version concurrency control).
///
/// Stale items are lazily garbage-collected during compaction.
pub type SeqNo = u64;
30
/// Kind of an item: a regular value or one of the two tombstone flavours
///
/// The `TryFrom<u8>` and `From<ValueType> for u8` implementations map the
/// variants to the one-byte tags 0, 1 and 2 (in declaration order).
#[allow(clippy::module_name_repetitions)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ValueType {
    /// A regular, existing value
    Value,

    /// Marker for a deleted value
    Tombstone,

    /// Marker for a "weak" deletion (a.k.a. `SingleDelete` in `RocksDB`)
    WeakTombstone,
}
44
45impl TryFrom<u8> for ValueType {
46    type Error = ();
47
48    fn try_from(value: u8) -> Result<Self, Self::Error> {
49        match value {
50            0 => Ok(Self::Value),
51            1 => Ok(Self::Tombstone),
52            2 => Ok(Self::WeakTombstone),
53            _ => Err(()),
54        }
55    }
56}
57
58impl From<ValueType> for u8 {
59    fn from(value: ValueType) -> Self {
60        match value {
61            ValueType::Value => 0,
62            ValueType::Tombstone => 1,
63            ValueType::WeakTombstone => 2,
64        }
65    }
66}
67
68/// Internal representation of KV pairs
69#[allow(clippy::module_name_repetitions)]
70#[derive(Clone, Eq, PartialEq)]
71pub struct InternalValue {
72    /// Internal key
73    pub key: InternalKey,
74
75    /// User-defined value - an arbitrary byte array
76    ///
77    /// Supports up to 2^32 bytes
78    pub value: UserValue,
79}
80
81impl InternalValue {
82    /// Creates a new [`Value`].
83    ///
84    /// # Panics
85    ///
86    /// Panics if the key length is empty or greater than 2^16, or the value length is greater than 2^32.
87    pub fn new<V: Into<UserValue>>(key: InternalKey, value: V) -> Self {
88        let value = value.into();
89
90        assert!(!key.user_key.is_empty(), "key may not be empty");
91        assert!(
92            u32::try_from(value.len()).is_ok(),
93            "values can be 2^32 bytes in length"
94        );
95
96        Self { key, value }
97    }
98
99    /// Creates a new [`Value`].
100    ///
101    /// # Panics
102    ///
103    /// Panics if the key length is empty or greater than 2^16, or the value length is greater than 2^32.
104    pub fn from_components<K: Into<UserKey>, V: Into<UserValue>>(
105        user_key: K,
106        value: V,
107        seqno: SeqNo,
108        value_type: ValueType,
109    ) -> Self {
110        let key = InternalKey::new(user_key, seqno, value_type);
111        Self::new(key, value)
112    }
113
114    /// Creates a new tombstone.
115    ///
116    /// # Panics
117    ///
118    /// Panics if the key length is empty or greater than 2^16.
119    pub fn new_tombstone<K: Into<UserKey>>(key: K, seqno: u64) -> Self {
120        let key = InternalKey::new(key, seqno, ValueType::Tombstone);
121        Self::new(key, vec![])
122    }
123
124    /// Creates a new weak tombstone.
125    ///
126    /// # Panics
127    ///
128    /// Panics if the key length is empty or greater than 2^16.
129    pub fn new_weak_tombstone<K: Into<UserKey>>(key: K, seqno: u64) -> Self {
130        let key = InternalKey::new(key, seqno, ValueType::WeakTombstone);
131        Self::new(key, vec![])
132    }
133
134    #[doc(hidden)]
135    #[must_use]
136    pub fn is_tombstone(&self) -> bool {
137        self.key.is_tombstone()
138    }
139}
140
141impl ItemSize for InternalValue {
142    fn size(&self) -> usize {
143        std::mem::size_of::<SeqNo>()
144            + std::mem::size_of::<ValueType>()
145            + self.key.user_key.len()
146            + self.value.len()
147    }
148}
149
150impl std::fmt::Debug for InternalValue {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        write!(
153            f,
154            "{:?} => {:?}",
155            self.key,
156            if self.value.len() >= 64 {
157                format!("[ ... {} bytes ]", self.value.len())
158            } else {
159                format!("{:?}", self.value)
160            }
161        )
162    }
163}
164
165impl Encode for InternalValue {
166    fn encode_into<W: Write>(&self, writer: &mut W) -> Result<(), EncodeError> {
167        self.key.encode_into(writer)?;
168
169        // NOTE: Only write value len + value if we are actually a value
170        if !self.is_tombstone() {
171            // NOTE: We know values are limited to 32-bit length
172            #[allow(clippy::cast_possible_truncation)]
173            writer.write_u32_varint(self.value.len() as u32)?;
174            writer.write_all(&self.value)?;
175        }
176
177        Ok(())
178    }
179}
180
181impl Decode for InternalValue {
182    fn decode_from<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
183        let key = InternalKey::decode_from(reader)?;
184
185        if key.is_tombstone() {
186            Ok(Self {
187                key,
188                value: UserValue::empty(),
189            })
190        } else {
191            // NOTE: Only read value if we are actually a value
192
193            let value_len = reader.read_u32_varint()?;
194            let value = UserValue::from_reader(reader, value_len as usize)?;
195
196            Ok(Self { key, value })
197        }
198    }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;
    use test_log::test;

    /// Encodes `item` into a buffer and decodes that buffer again
    fn roundtrip(item: &InternalValue) -> crate::Result<InternalValue> {
        let mut buf = Vec::new();
        item.encode_into(&mut buf)?;

        Ok(InternalValue::decode_from(&mut &buf[..])?)
    }

    #[test]
    fn pik_cmp_user_key() {
        // Same seqno and type, different user keys
        let lhs = InternalKey::new(*b"a", 0, ValueType::Value);
        let rhs = InternalKey::new(*b"b", 0, ValueType::Value);
        assert!(lhs < rhs);
    }

    #[test]
    fn pik_cmp_seqno() {
        // Same user key: the key with the lower seqno is expected to compare greater
        let older = InternalKey::new(*b"a", 0, ValueType::Value);
        let newer = InternalKey::new(*b"a", 1, ValueType::Value);
        assert!(older > newer);
    }

    #[test]
    fn value_raw() -> crate::Result<()> {
        // The item that the hand-written bytes below should decode to
        let expected =
            InternalValue::from_components(vec![1, 2, 3], vec![3, 2, 1], 1, ValueType::Value);

        #[rustfmt::skip]
        let bytes = [
            // Seqno
            1,

            // Type
            0,

            // User key (length, then bytes)
            3, 1, 2, 3,

            // User value (length, then bytes)
            3, 3, 2, 1,
        ];

        // Decode the raw bytes and compare with the expected item
        let decoded = InternalValue::decode_from(&mut Cursor::new(bytes))?;
        assert_eq!(expected, decoded);

        Ok(())
    }

    #[test]
    fn value_empty_value() -> crate::Result<()> {
        // A regular item (not a tombstone) whose value is empty
        let original = InternalValue::from_components(vec![1, 2, 3], vec![], 42, ValueType::Value);

        // Encoding followed by decoding must give back an equal item
        let decoded = roundtrip(&original)?;
        assert_eq!(original, decoded);

        Ok(())
    }

    #[test]
    fn value_with_value() -> crate::Result<()> {
        // A regular item with a non-empty value
        let original = InternalValue::from_components(
            vec![1, 2, 3],
            vec![6, 2, 6, 2, 7, 5, 7, 8, 98],
            42,
            ValueType::Value,
        );

        // Encoding followed by decoding must give back an equal item
        let decoded = roundtrip(&original)?;
        assert_eq!(original, decoded);

        Ok(())
    }
}