use crate::db_cache::{CachedEntry, CachedItem, CachedKey};
use crate::db_state::SsTableId;
use crate::error::SlateDBError;
use crate::filter::BloomFilter;
use crate::flatbuffer_types::SsTableIndexOwned;
use crate::format::block::Block;
use crate::sst_stats::SstStats;
use bytes::Bytes;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::sync::Arc;
use ulid::Ulid;
#[derive(Serialize, Deserialize)]
enum SerializedSsTableId {
Wal(u64),
Compacted(Ulid),
}
impl From<SerializedSsTableId> for SsTableId {
fn from(value: SerializedSsTableId) -> Self {
match value {
SerializedSsTableId::Wal(id) => SsTableId::Wal(id),
SerializedSsTableId::Compacted(id) => SsTableId::Compacted(id),
}
}
}
impl From<SsTableId> for SerializedSsTableId {
fn from(value: SsTableId) -> Self {
match value {
SsTableId::Wal(id) => SerializedSsTableId::Wal(id),
SsTableId::Compacted(id) => SerializedSsTableId::Compacted(id),
}
}
}
#[derive(Serialize, Deserialize)]
enum SerializedCachedKey {
V1(SerializedSsTableId, u64),
V2(u64, SerializedSsTableId, u64),
}
impl From<SerializedCachedKey> for CachedKey {
fn from(value: SerializedCachedKey) -> Self {
match value {
SerializedCachedKey::V1(sst_id, block_id) => CachedKey {
scope_id: 0,
sst_id: sst_id.into(),
block_id,
},
SerializedCachedKey::V2(scope_id, sst_id, block_id) => CachedKey {
scope_id,
sst_id: sst_id.into(),
block_id,
},
}
}
}
impl From<CachedKey> for SerializedCachedKey {
fn from(value: CachedKey) -> Self {
SerializedCachedKey::V2(value.scope_id, value.sst_id.into(), value.block_id)
}
}
impl Serialize for CachedKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let serialized_key: SerializedCachedKey = self.clone().into();
serialized_key.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for CachedKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let serialized_key = SerializedCachedKey::deserialize(deserializer)?;
Ok(serialized_key.into())
}
}
#[derive(Serialize, Deserialize)]
enum SerializedCachedEntryV1 {
Block(Bytes),
SsTableIndex(Bytes),
BloomFilter(Bytes),
SstStats(Bytes),
}
impl SerializedCachedEntryV1 {
fn into_cached_entry(self) -> Result<CachedEntry, SlateDBError> {
let item = match self {
SerializedCachedEntryV1::Block(encoded) => {
let block = Block::decode(encoded);
CachedItem::Block(Arc::new(block))
}
SerializedCachedEntryV1::SsTableIndex(encoded) => {
let index = SsTableIndexOwned::new(encoded)?;
CachedItem::SsTableIndex(Arc::new(index))
}
SerializedCachedEntryV1::BloomFilter(encoded) => {
let filter = BloomFilter::decode(encoded.as_ref());
CachedItem::BloomFilter(Arc::new(filter))
}
SerializedCachedEntryV1::SstStats(encoded) => {
let stats = SstStats::decode(encoded)?;
CachedItem::SstStats(Arc::new(stats))
}
};
Ok(CachedEntry { item })
}
}
#[derive(Serialize, Deserialize)]
enum SerializedCachedEntry {
V1(SerializedCachedEntryV1),
}
impl SerializedCachedEntry {
fn into_cached_entry(self) -> Result<CachedEntry, SlateDBError> {
match self {
SerializedCachedEntry::V1(entry) => entry.into_cached_entry(),
}
}
}
impl From<CachedEntry> for SerializedCachedEntry {
fn from(value: CachedEntry) -> Self {
match value.item {
CachedItem::Block(block) => {
let encoded = block.encode();
SerializedCachedEntry::V1(SerializedCachedEntryV1::Block(encoded))
}
CachedItem::SsTableIndex(index) => {
let encoded = index.data();
SerializedCachedEntry::V1(SerializedCachedEntryV1::SsTableIndex(encoded))
}
CachedItem::BloomFilter(filter) => {
let encoded = filter.encode();
SerializedCachedEntry::V1(SerializedCachedEntryV1::BloomFilter(encoded))
}
CachedItem::SstStats(stats) => {
let encoded = stats.encode();
SerializedCachedEntry::V1(SerializedCachedEntryV1::SstStats(encoded))
}
}
}
}
impl Serialize for CachedEntry {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let serialized_entry: SerializedCachedEntry = self.clone().into();
serialized_entry.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for CachedEntry {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let serialized_entry = SerializedCachedEntry::deserialize(deserializer)?;
serialized_entry
.into_cached_entry()
.map_err(|e| D::Error::custom(format!("slatedb error ({})", e).to_lowercase()))
}
}
#[cfg(test)]
mod tests {
use crate::block_iterator::BlockIteratorLatest;
use crate::db_cache::{CachedEntry, CachedItem, CachedKey};
use crate::db_state::SsTableId;
use crate::filter::BloomFilterBuilder;
use crate::flatbuffer_types::{
BlockMeta, BlockMetaArgs, SsTableIndex, SsTableIndexArgs, SsTableIndexOwned,
};
use crate::format::sst::BlockBuilder;
use crate::iter::IterationOrder;
use crate::sst_stats::SstStats;
use crate::test_utils::assert_iterator;
use crate::types::RowEntry;
use bytes::Bytes;
use std::sync::Arc;
use ulid::Ulid;
#[test]
fn test_should_serialize_deserialize_compacted_sst_key() {
let key = CachedKey {
scope_id: 0,
sst_id: SsTableId::Compacted(Ulid::from((123, 456))),
block_id: 99,
};
let encoded = bincode::serialize(&key).unwrap();
let decoded: CachedKey = bincode::deserialize(&encoded).unwrap();
assert_eq!(decoded, key);
}
#[test]
fn test_should_serialize_deserialize_wal_sst_key() {
let key = CachedKey {
scope_id: 5,
sst_id: SsTableId::Wal(123),
block_id: 99,
};
let encoded = bincode::serialize(&key).unwrap();
let decoded: CachedKey = bincode::deserialize(&encoded).unwrap();
assert_eq!(decoded, key);
}
#[tokio::test]
async fn test_should_serialize_deserialize_block() {
let rows = vec![
RowEntry::new_value(b"foo", b"bar", 0),
RowEntry::new_merge(b"biz", b"baz", 1),
RowEntry::new_tombstone(b"bla", 2),
];
let mut builder = BlockBuilder::new_latest(4096);
for row in rows.iter() {
assert!(builder.add(row.clone()).unwrap());
}
let block = Arc::new(builder.build().unwrap());
let entry = CachedEntry {
item: CachedItem::Block(block.clone()),
};
let encoded = bincode::serialize(&entry).unwrap();
let decoded: CachedEntry = bincode::deserialize(&encoded).unwrap();
let decoded_block = decoded.block().unwrap();
assert!(block.as_ref() == decoded_block.as_ref());
let mut iter = BlockIteratorLatest::new(decoded_block, IterationOrder::Ascending);
assert_iterator(&mut iter, rows).await;
}
#[test]
fn test_should_serialize_deserialize_index() {
let first_keys = vec![b"foo".as_slice(), b"bar".as_slice(), b"baz".as_slice()];
let index = Arc::new(build_index_with_first_keys(&first_keys));
let entry = CachedEntry {
item: CachedItem::SsTableIndex(index.clone()),
};
let encoded = bincode::serialize(&entry).unwrap();
let decoded: CachedEntry = bincode::deserialize(&encoded).unwrap();
let decoded_index = decoded.sst_index().unwrap();
assert!(index.as_ref() == decoded_index.as_ref());
}
#[test]
fn test_should_serialize_deserialize_filter() {
let keys = vec![b"foo", b"bar", b"baz"];
let mut builder = BloomFilterBuilder::new(10);
for k in keys {
builder.add_key(k);
}
let filter = Arc::new(builder.build());
let entry = CachedEntry {
item: CachedItem::BloomFilter(filter.clone()),
};
let encoded = bincode::serialize(&entry).unwrap();
let decoded: CachedEntry = bincode::deserialize(&encoded).unwrap();
let decoded_filter = decoded.bloom_filter().unwrap();
assert!(filter.as_ref() == decoded_filter.as_ref());
}
#[test]
fn test_should_serialize_deserialize_stats() {
let stats = Arc::new(SstStats {
num_puts: 100,
num_deletes: 10,
num_merges: 5,
raw_key_size: 2048,
raw_val_size: 8192,
block_stats: vec![],
});
let entry = CachedEntry {
item: CachedItem::SstStats(stats.clone()),
};
let encoded = bincode::serialize(&entry).unwrap();
let decoded: CachedEntry = bincode::deserialize(&encoded).unwrap();
let decoded_stats = decoded.sst_stats().unwrap();
assert_eq!(stats.as_ref(), decoded_stats.as_ref());
}
fn build_index_with_first_keys(first_keys: &[&[u8]]) -> SsTableIndexOwned {
let mut index_builder = flatbuffers::FlatBufferBuilder::new();
let mut block_metas = Vec::new();
for fk in first_keys {
let fk = index_builder.create_vector(fk);
let block_meta = BlockMeta::create(
&mut index_builder,
&BlockMetaArgs {
first_key: Some(fk),
offset: 0u64,
},
);
block_metas.push(block_meta);
}
let block_metas = index_builder.create_vector(&block_metas);
let index_wip = SsTableIndex::create(
&mut index_builder,
&SsTableIndexArgs {
block_meta: Some(block_metas),
},
);
index_builder.finish(index_wip, None);
let index_bytes = Bytes::copy_from_slice(index_builder.finished_data());
SsTableIndexOwned::new(index_bytes).unwrap()
}
}