Skip to main content

slatedb/db_cache/
serde.rs

1//! Serialize/Deserialize impls for CachedKey and CachedEntry to support DbCache impls that
2//! can store cache items on-disk. Internally, the Serialize/Deserialize impls work by
3//! converting these types to representations that derive Serialize/Deserialize. The purpose
4//! of the indirection is to decouple the serialized format from the in-memory representation
5//! used by the rest of the codebase.
6
7use crate::db_cache::{CachedEntry, CachedItem, CachedKey};
8use crate::db_state::SsTableId;
9use crate::error::SlateDBError;
10use crate::filter::BloomFilter;
11use crate::flatbuffer_types::SsTableIndexOwned;
12use crate::format::block::Block;
13use crate::sst_stats::SstStats;
14use bytes::Bytes;
15use serde::de::Error;
16use serde::{Deserialize, Deserializer, Serialize, Serializer};
17use std::sync::Arc;
18use ulid::Ulid;
19
20#[derive(Serialize, Deserialize)]
21enum SerializedSsTableId {
22    Wal(u64),
23    Compacted(Ulid),
24}
25
26impl From<SerializedSsTableId> for SsTableId {
27    fn from(value: SerializedSsTableId) -> Self {
28        match value {
29            SerializedSsTableId::Wal(id) => SsTableId::Wal(id),
30            SerializedSsTableId::Compacted(id) => SsTableId::Compacted(id),
31        }
32    }
33}
34
35impl From<SsTableId> for SerializedSsTableId {
36    fn from(value: SsTableId) -> Self {
37        match value {
38            SsTableId::Wal(id) => SerializedSsTableId::Wal(id),
39            SsTableId::Compacted(id) => SerializedSsTableId::Compacted(id),
40        }
41    }
42}
43
44#[derive(Serialize, Deserialize)]
45enum SerializedCachedKey {
46    V1(SerializedSsTableId, u64),
47    V2(u64, SerializedSsTableId, u64),
48}
49
50impl From<SerializedCachedKey> for CachedKey {
51    fn from(value: SerializedCachedKey) -> Self {
52        match value {
53            SerializedCachedKey::V1(sst_id, block_id) => CachedKey {
54                scope_id: 0,
55                sst_id: sst_id.into(),
56                block_id,
57            },
58            SerializedCachedKey::V2(scope_id, sst_id, block_id) => CachedKey {
59                scope_id,
60                sst_id: sst_id.into(),
61                block_id,
62            },
63        }
64    }
65}
66
67impl From<CachedKey> for SerializedCachedKey {
68    fn from(value: CachedKey) -> Self {
69        SerializedCachedKey::V2(value.scope_id, value.sst_id.into(), value.block_id)
70    }
71}
72
73impl Serialize for CachedKey {
74    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
75    where
76        S: Serializer,
77    {
78        let serialized_key: SerializedCachedKey = self.clone().into();
79        serialized_key.serialize(serializer)
80    }
81}
82
83impl<'de> Deserialize<'de> for CachedKey {
84    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
85    where
86        D: Deserializer<'de>,
87    {
88        let serialized_key = SerializedCachedKey::deserialize(deserializer)?;
89        Ok(serialized_key.into())
90    }
91}
92
93#[derive(Serialize, Deserialize)]
94enum SerializedCachedEntryV1 {
95    Block(Bytes),
96    SsTableIndex(Bytes),
97    BloomFilter(Bytes),
98    SstStats(Bytes),
99}
100
101impl SerializedCachedEntryV1 {
102    fn into_cached_entry(self) -> Result<CachedEntry, SlateDBError> {
103        let item = match self {
104            SerializedCachedEntryV1::Block(encoded) => {
105                let block = Block::decode(encoded);
106                CachedItem::Block(Arc::new(block))
107            }
108            SerializedCachedEntryV1::SsTableIndex(encoded) => {
109                let index = SsTableIndexOwned::new(encoded)?;
110                CachedItem::SsTableIndex(Arc::new(index))
111            }
112            SerializedCachedEntryV1::BloomFilter(encoded) => {
113                let filter = BloomFilter::decode(encoded.as_ref());
114                CachedItem::BloomFilter(Arc::new(filter))
115            }
116            SerializedCachedEntryV1::SstStats(encoded) => {
117                let stats = SstStats::decode(encoded)?;
118                CachedItem::SstStats(Arc::new(stats))
119            }
120        };
121        Ok(CachedEntry { item })
122    }
123}
124
125#[derive(Serialize, Deserialize)]
126enum SerializedCachedEntry {
127    V1(SerializedCachedEntryV1),
128}
129
130impl SerializedCachedEntry {
131    fn into_cached_entry(self) -> Result<CachedEntry, SlateDBError> {
132        match self {
133            SerializedCachedEntry::V1(entry) => entry.into_cached_entry(),
134        }
135    }
136}
137
138impl From<CachedEntry> for SerializedCachedEntry {
139    fn from(value: CachedEntry) -> Self {
140        match value.item {
141            CachedItem::Block(block) => {
142                let encoded = block.encode();
143                SerializedCachedEntry::V1(SerializedCachedEntryV1::Block(encoded))
144            }
145            CachedItem::SsTableIndex(index) => {
146                let encoded = index.data();
147                SerializedCachedEntry::V1(SerializedCachedEntryV1::SsTableIndex(encoded))
148            }
149            CachedItem::BloomFilter(filter) => {
150                let encoded = filter.encode();
151                SerializedCachedEntry::V1(SerializedCachedEntryV1::BloomFilter(encoded))
152            }
153            CachedItem::SstStats(stats) => {
154                let encoded = stats.encode();
155                SerializedCachedEntry::V1(SerializedCachedEntryV1::SstStats(encoded))
156            }
157        }
158    }
159}
160
161impl Serialize for CachedEntry {
162    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
163    where
164        S: Serializer,
165    {
166        let serialized_entry: SerializedCachedEntry = self.clone().into();
167        serialized_entry.serialize(serializer)
168    }
169}
170
171impl<'de> Deserialize<'de> for CachedEntry {
172    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
173    where
174        D: Deserializer<'de>,
175    {
176        let serialized_entry = SerializedCachedEntry::deserialize(deserializer)?;
177        serialized_entry
178            .into_cached_entry()
179            // the error returned by deserialize must be lowercase and not end in a .
180            .map_err(|e| D::Error::custom(format!("slatedb error ({})", e).to_lowercase()))
181    }
182}
183
#[cfg(test)]
mod tests {
    use crate::block_iterator::BlockIteratorLatest;
    use crate::db_cache::{CachedEntry, CachedItem, CachedKey};
    use crate::db_state::SsTableId;
    use crate::filter::BloomFilterBuilder;
    use crate::flatbuffer_types::{
        BlockMeta, BlockMetaArgs, SsTableIndex, SsTableIndexArgs, SsTableIndexOwned,
    };
    use crate::format::sst::BlockBuilder;
    use crate::iter::IterationOrder;
    use crate::sst_stats::SstStats;
    use crate::test_utils::assert_iterator;
    use crate::types::RowEntry;
    use bytes::Bytes;
    use std::sync::Arc;
    use ulid::Ulid;

    /// A key that refers to a compacted SST is unchanged by a bincode round trip.
    #[test]
    fn test_should_serialize_deserialize_compacted_sst_key() {
        let original = CachedKey {
            scope_id: 0,
            sst_id: SsTableId::Compacted(Ulid::from((123, 456))),
            block_id: 99,
        };

        let bytes = bincode::serialize(&original).unwrap();
        let restored: CachedKey = bincode::deserialize(&bytes).unwrap();

        assert_eq!(restored, original);
    }

    /// A key that refers to a WAL SST, with a non-zero scope id, is unchanged by a
    /// bincode round trip.
    #[test]
    fn test_should_serialize_deserialize_wal_sst_key() {
        let original = CachedKey {
            scope_id: 5,
            sst_id: SsTableId::Wal(123),
            block_id: 99,
        };

        let bytes = bincode::serialize(&original).unwrap();
        let restored: CachedKey = bincode::deserialize(&bytes).unwrap();

        assert_eq!(restored, original);
    }

    /// A cached block round-trips: the decoded block equals the original and yields
    /// the same rows when iterated in ascending order.
    #[tokio::test]
    async fn test_should_serialize_deserialize_block() {
        let expected_rows = vec![
            RowEntry::new_value(b"foo", b"bar", 0),
            RowEntry::new_merge(b"biz", b"baz", 1),
            RowEntry::new_tombstone(b"bla", 2),
        ];
        let mut block_builder = BlockBuilder::new_latest(4096);
        for row in &expected_rows {
            let added = block_builder.add(row.clone()).unwrap();
            assert!(added);
        }
        let original_block = Arc::new(block_builder.build().unwrap());
        let entry = CachedEntry {
            item: CachedItem::Block(Arc::clone(&original_block)),
        };

        let bytes = bincode::serialize(&entry).unwrap();
        let restored: CachedEntry = bincode::deserialize(&bytes).unwrap();

        let restored_block = restored.block().unwrap();
        assert!(original_block.as_ref() == restored_block.as_ref());
        let mut rows_iter = BlockIteratorLatest::new(restored_block, IterationOrder::Ascending);
        assert_iterator(&mut rows_iter, expected_rows).await;
    }

    /// A cached SST index round-trips to an equal index.
    #[test]
    fn test_should_serialize_deserialize_index() {
        let first_keys: [&[u8]; 3] = [b"foo", b"bar", b"baz"];
        let original_index = Arc::new(build_index_with_first_keys(&first_keys));
        let entry = CachedEntry {
            item: CachedItem::SsTableIndex(Arc::clone(&original_index)),
        };

        let bytes = bincode::serialize(&entry).unwrap();
        let restored: CachedEntry = bincode::deserialize(&bytes).unwrap();

        let restored_index = restored.sst_index().unwrap();
        assert!(original_index.as_ref() == restored_index.as_ref());
    }

    /// A cached bloom filter round-trips to an equal filter.
    #[test]
    fn test_should_serialize_deserialize_filter() {
        let mut filter_builder = BloomFilterBuilder::new(10);
        for key in [b"foo", b"bar", b"baz"] {
            filter_builder.add_key(key);
        }
        let original_filter = Arc::new(filter_builder.build());
        let entry = CachedEntry {
            item: CachedItem::BloomFilter(Arc::clone(&original_filter)),
        };

        let bytes = bincode::serialize(&entry).unwrap();
        let restored: CachedEntry = bincode::deserialize(&bytes).unwrap();

        let restored_filter = restored.bloom_filter().unwrap();
        assert!(original_filter.as_ref() == restored_filter.as_ref());
    }

    /// Cached SST stats round-trip to equal stats.
    #[test]
    fn test_should_serialize_deserialize_stats() {
        let original_stats = Arc::new(SstStats {
            num_puts: 100,
            num_deletes: 10,
            num_merges: 5,
            raw_key_size: 2048,
            raw_val_size: 8192,
            block_stats: vec![],
        });
        let entry = CachedEntry {
            item: CachedItem::SstStats(Arc::clone(&original_stats)),
        };

        let bytes = bincode::serialize(&entry).unwrap();
        let restored: CachedEntry = bincode::deserialize(&bytes).unwrap();

        let restored_stats = restored.sst_stats().unwrap();
        assert_eq!(original_stats.as_ref(), restored_stats.as_ref());
    }

    /// Builds an `SsTableIndexOwned` with one block meta per entry of `first_keys`,
    /// each at offset 0.
    fn build_index_with_first_keys(first_keys: &[&[u8]]) -> SsTableIndexOwned {
        let mut fbb = flatbuffers::FlatBufferBuilder::new();
        let mut metas = Vec::new();
        for first_key in first_keys {
            let key_vector = fbb.create_vector(first_key);
            let args = BlockMetaArgs {
                first_key: Some(key_vector),
                offset: 0u64,
            };
            metas.push(BlockMeta::create(&mut fbb, &args));
        }
        let metas_vector = fbb.create_vector(&metas);
        let index_args = SsTableIndexArgs {
            block_meta: Some(metas_vector),
        };
        let root = SsTableIndex::create(&mut fbb, &index_args);
        fbb.finish(root, None);
        let raw_index = Bytes::copy_from_slice(fbb.finished_data());
        SsTableIndexOwned::new(raw_index).unwrap()
    }
}