1use crate::db_cache::{CachedEntry, CachedItem, CachedKey};
8use crate::db_state::SsTableId;
9use crate::error::SlateDBError;
10use crate::filter::BloomFilter;
11use crate::flatbuffer_types::SsTableIndexOwned;
12use crate::format::block::Block;
13use crate::sst_stats::SstStats;
14use bytes::Bytes;
15use serde::de::Error;
16use serde::{Deserialize, Deserializer, Serialize, Serializer};
17use std::sync::Arc;
18use ulid::Ulid;
19
/// Serde-facing mirror of [`SsTableId`].
///
/// Values are converted to and from [`SsTableId`] through the `From` impls in
/// this file, so the in-memory type itself does not need to derive serde
/// traits here.
///
/// NOTE(review): formats that identify enum variants by position depend on
/// the variant order below — confirm before reordering or inserting variants.
#[derive(Serialize, Deserialize)]
enum SerializedSsTableId {
    /// Mirrors `SsTableId::Wal`, carrying its `u64` id.
    Wal(u64),
    /// Mirrors `SsTableId::Compacted`, carrying its `Ulid`.
    Compacted(Ulid),
}
25
26impl From<SerializedSsTableId> for SsTableId {
27 fn from(value: SerializedSsTableId) -> Self {
28 match value {
29 SerializedSsTableId::Wal(id) => SsTableId::Wal(id),
30 SerializedSsTableId::Compacted(id) => SsTableId::Compacted(id),
31 }
32 }
33}
34
35impl From<SsTableId> for SerializedSsTableId {
36 fn from(value: SsTableId) -> Self {
37 match value {
38 SsTableId::Wal(id) => SerializedSsTableId::Wal(id),
39 SsTableId::Compacted(id) => SerializedSsTableId::Compacted(id),
40 }
41 }
42}
43
/// Versioned serialized representation of a [`CachedKey`].
///
/// New keys are always written as `V2`; `V1` is still accepted when reading
/// and is decoded with a `scope_id` of `0`.
///
/// NOTE(review): formats that identify enum variants by position depend on
/// the variant order below — confirm before reordering or inserting variants.
#[derive(Serialize, Deserialize)]
enum SerializedCachedKey {
    /// `(sst_id, block_id)` — layout without a scope id.
    V1(SerializedSsTableId, u64),
    /// `(scope_id, sst_id, block_id)`.
    V2(u64, SerializedSsTableId, u64),
}
49
50impl From<SerializedCachedKey> for CachedKey {
51 fn from(value: SerializedCachedKey) -> Self {
52 match value {
53 SerializedCachedKey::V1(sst_id, block_id) => CachedKey {
54 scope_id: 0,
55 sst_id: sst_id.into(),
56 block_id,
57 },
58 SerializedCachedKey::V2(scope_id, sst_id, block_id) => CachedKey {
59 scope_id,
60 sst_id: sst_id.into(),
61 block_id,
62 },
63 }
64 }
65}
66
67impl From<CachedKey> for SerializedCachedKey {
68 fn from(value: CachedKey) -> Self {
69 SerializedCachedKey::V2(value.scope_id, value.sst_id.into(), value.block_id)
70 }
71}
72
73impl Serialize for CachedKey {
74 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
75 where
76 S: Serializer,
77 {
78 let serialized_key: SerializedCachedKey = self.clone().into();
79 serialized_key.serialize(serializer)
80 }
81}
82
83impl<'de> Deserialize<'de> for CachedKey {
84 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
85 where
86 D: Deserializer<'de>,
87 {
88 let serialized_key = SerializedCachedKey::deserialize(deserializer)?;
89 Ok(serialized_key.into())
90 }
91}
92
/// Version 1 payload of a serialized cache entry.
///
/// Each variant corresponds to one `CachedItem` variant and carries that
/// item's encoded bytes.
///
/// NOTE(review): formats that identify enum variants by position depend on
/// the variant order below — confirm before reordering or inserting variants.
#[derive(Serialize, Deserialize)]
enum SerializedCachedEntryV1 {
    /// Bytes produced by encoding a block.
    Block(Bytes),
    /// Raw data bytes of an SST index.
    SsTableIndex(Bytes),
    /// Bytes produced by encoding a bloom filter.
    BloomFilter(Bytes),
    /// Bytes produced by encoding SST stats.
    SstStats(Bytes),
}
100
101impl SerializedCachedEntryV1 {
102 fn into_cached_entry(self) -> Result<CachedEntry, SlateDBError> {
103 let item = match self {
104 SerializedCachedEntryV1::Block(encoded) => {
105 let block = Block::decode(encoded);
106 CachedItem::Block(Arc::new(block))
107 }
108 SerializedCachedEntryV1::SsTableIndex(encoded) => {
109 let index = SsTableIndexOwned::new(encoded)?;
110 CachedItem::SsTableIndex(Arc::new(index))
111 }
112 SerializedCachedEntryV1::BloomFilter(encoded) => {
113 let filter = BloomFilter::decode(encoded.as_ref());
114 CachedItem::BloomFilter(Arc::new(filter))
115 }
116 SerializedCachedEntryV1::SstStats(encoded) => {
117 let stats = SstStats::decode(encoded)?;
118 CachedItem::SstStats(Arc::new(stats))
119 }
120 };
121 Ok(CachedEntry { item })
122 }
123}
124
/// Versioned envelope around a serialized cache entry.
///
/// Only one version exists at present; entries are both written and read as
/// `V1`.
#[derive(Serialize, Deserialize)]
enum SerializedCachedEntry {
    /// Version 1 payload.
    V1(SerializedCachedEntryV1),
}
129
130impl SerializedCachedEntry {
131 fn into_cached_entry(self) -> Result<CachedEntry, SlateDBError> {
132 match self {
133 SerializedCachedEntry::V1(entry) => entry.into_cached_entry(),
134 }
135 }
136}
137
138impl From<CachedEntry> for SerializedCachedEntry {
139 fn from(value: CachedEntry) -> Self {
140 match value.item {
141 CachedItem::Block(block) => {
142 let encoded = block.encode();
143 SerializedCachedEntry::V1(SerializedCachedEntryV1::Block(encoded))
144 }
145 CachedItem::SsTableIndex(index) => {
146 let encoded = index.data();
147 SerializedCachedEntry::V1(SerializedCachedEntryV1::SsTableIndex(encoded))
148 }
149 CachedItem::BloomFilter(filter) => {
150 let encoded = filter.encode();
151 SerializedCachedEntry::V1(SerializedCachedEntryV1::BloomFilter(encoded))
152 }
153 CachedItem::SstStats(stats) => {
154 let encoded = stats.encode();
155 SerializedCachedEntry::V1(SerializedCachedEntryV1::SstStats(encoded))
156 }
157 }
158 }
159}
160
161impl Serialize for CachedEntry {
162 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
163 where
164 S: Serializer,
165 {
166 let serialized_entry: SerializedCachedEntry = self.clone().into();
167 serialized_entry.serialize(serializer)
168 }
169}
170
171impl<'de> Deserialize<'de> for CachedEntry {
172 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
173 where
174 D: Deserializer<'de>,
175 {
176 let serialized_entry = SerializedCachedEntry::deserialize(deserializer)?;
177 serialized_entry
178 .into_cached_entry()
179 .map_err(|e| D::Error::custom(format!("slatedb error ({})", e).to_lowercase()))
181 }
182}
183
#[cfg(test)]
mod tests {
    use crate::block_iterator::BlockIteratorLatest;
    use crate::db_cache::{CachedEntry, CachedItem, CachedKey};
    use crate::db_state::SsTableId;
    use crate::filter::BloomFilterBuilder;
    use crate::flatbuffer_types::{
        BlockMeta, BlockMetaArgs, SsTableIndex, SsTableIndexArgs, SsTableIndexOwned,
    };
    use crate::format::sst::BlockBuilder;
    use crate::iter::IterationOrder;
    use crate::sst_stats::SstStats;
    use crate::test_utils::assert_iterator;
    use crate::types::RowEntry;
    use bytes::Bytes;
    use std::sync::Arc;
    use ulid::Ulid;

    /// Encodes `key` with bincode and decodes the bytes into a fresh key.
    fn round_trip_key(key: &CachedKey) -> CachedKey {
        let bytes = bincode::serialize(key).unwrap();
        bincode::deserialize(&bytes).unwrap()
    }

    /// Encodes `entry` with bincode and decodes the bytes into a fresh entry.
    fn round_trip_entry(entry: &CachedEntry) -> CachedEntry {
        let bytes = bincode::serialize(entry).unwrap();
        bincode::deserialize(&bytes).unwrap()
    }

    #[test]
    fn test_should_serialize_deserialize_compacted_sst_key() {
        let original = CachedKey {
            scope_id: 0,
            sst_id: SsTableId::Compacted(Ulid::from((123, 456))),
            block_id: 99,
        };

        let restored = round_trip_key(&original);

        assert_eq!(restored, original);
    }

    #[test]
    fn test_should_serialize_deserialize_wal_sst_key() {
        let original = CachedKey {
            scope_id: 5,
            sst_id: SsTableId::Wal(123),
            block_id: 99,
        };

        let restored = round_trip_key(&original);

        assert_eq!(restored, original);
    }

    #[tokio::test]
    async fn test_should_serialize_deserialize_block() {
        // One row of each kind: value, merge, and tombstone.
        let expected_rows = vec![
            RowEntry::new_value(b"foo", b"bar", 0),
            RowEntry::new_merge(b"biz", b"baz", 1),
            RowEntry::new_tombstone(b"bla", 2),
        ];
        let mut block_builder = BlockBuilder::new_latest(4096);
        for row in &expected_rows {
            assert!(block_builder.add(row.clone()).unwrap());
        }
        let original_block = Arc::new(block_builder.build().unwrap());
        let original_entry = CachedEntry {
            item: CachedItem::Block(Arc::clone(&original_block)),
        };

        let restored_entry = round_trip_entry(&original_entry);

        let restored_block = restored_entry.block().unwrap();
        assert!(original_block.as_ref() == restored_block.as_ref());
        // The decoded block must also iterate back to the rows it was built from.
        let mut block_iter = BlockIteratorLatest::new(restored_block, IterationOrder::Ascending);
        assert_iterator(&mut block_iter, expected_rows).await;
    }

    #[test]
    fn test_should_serialize_deserialize_index() {
        let first_keys: [&[u8]; 3] = [b"foo", b"bar", b"baz"];
        let original_index = Arc::new(build_index_with_first_keys(&first_keys));
        let original_entry = CachedEntry {
            item: CachedItem::SsTableIndex(Arc::clone(&original_index)),
        };

        let restored_entry = round_trip_entry(&original_entry);

        let restored_index = restored_entry.sst_index().unwrap();
        assert!(original_index.as_ref() == restored_index.as_ref());
    }

    #[test]
    fn test_should_serialize_deserialize_filter() {
        let mut filter_builder = BloomFilterBuilder::new(10);
        for key in [b"foo", b"bar", b"baz"] {
            filter_builder.add_key(key);
        }
        let original_filter = Arc::new(filter_builder.build());
        let original_entry = CachedEntry {
            item: CachedItem::BloomFilter(Arc::clone(&original_filter)),
        };

        let restored_entry = round_trip_entry(&original_entry);

        let restored_filter = restored_entry.bloom_filter().unwrap();
        assert!(original_filter.as_ref() == restored_filter.as_ref());
    }

    #[test]
    fn test_should_serialize_deserialize_stats() {
        let original_stats = Arc::new(SstStats {
            num_puts: 100,
            num_deletes: 10,
            num_merges: 5,
            raw_key_size: 2048,
            raw_val_size: 8192,
            block_stats: vec![],
        });
        let original_entry = CachedEntry {
            item: CachedItem::SstStats(Arc::clone(&original_stats)),
        };

        let restored_entry = round_trip_entry(&original_entry);

        let restored_stats = restored_entry.sst_stats().unwrap();
        assert_eq!(original_stats.as_ref(), restored_stats.as_ref());
    }

    /// Builds an SST index holding one block meta per entry of `first_keys`,
    /// each with offset 0.
    fn build_index_with_first_keys(first_keys: &[&[u8]]) -> SsTableIndexOwned {
        let mut fbb = flatbuffers::FlatBufferBuilder::new();
        let mut meta_offsets = Vec::with_capacity(first_keys.len());
        for first_key in first_keys {
            let key_offset = fbb.create_vector(first_key);
            let meta_args = BlockMetaArgs {
                first_key: Some(key_offset),
                offset: 0u64,
            };
            meta_offsets.push(BlockMeta::create(&mut fbb, &meta_args));
        }
        let metas_vector = fbb.create_vector(&meta_offsets);
        let index_args = SsTableIndexArgs {
            block_meta: Some(metas_vector),
        };
        let index_root = SsTableIndex::create(&mut fbb, &index_args);
        fbb.finish(index_root, None);
        let index_bytes = Bytes::copy_from_slice(fbb.finished_data());
        SsTableIndexOwned::new(index_bytes).unwrap()
    }
}