#![allow(dead_code)]
use crate::error::SlateDBError;
use crate::format::block::{Block, SIZEOF_U16};
use crate::format::row_codec_v2::{SstRowCodecV2, SstRowEntryV2};
use crate::types::RowEntry;
use bytes::Bytes;
const DEFAULT_RESTART_INTERVAL: usize = 16;
/// Length of the longest shared prefix between two byte slices.
///
/// Delegates to the chunked implementation with 128-byte chunks so the
/// bulk of the comparison runs over fixed-size blocks.
fn compute_prefix(a: &[u8], b: &[u8]) -> usize {
    compute_prefix_chunks::<128>(a, b)
}
/// Length of the longest shared prefix of `lhs` and `rhs`, comparing
/// `N`-byte chunks first and finishing byte-by-byte on the remainder.
fn compute_prefix_chunks<const N: usize>(lhs: &[u8], rhs: &[u8]) -> usize {
    // Count whole matching chunks; `chunks_exact` guarantees fixed-size
    // slices, so the comparison compiles to wide block compares.
    let mut matched = 0;
    for (a, b) in lhs.chunks_exact(N).zip(rhs.chunks_exact(N)) {
        if a != b {
            break;
        }
        matched += N;
    }
    // Scan the tail (including any mismatching chunk) one byte at a time.
    matched
        + lhs[matched..]
            .iter()
            .zip(&rhs[matched..])
            .take_while(|(x, y)| x == y)
            .count()
}
/// Builder for V2-format blocks: entries are prefix-compressed against the
/// previous key, with periodic "restart points" that store the full key.
pub(super) struct BlockBuilderV2 {
    /// Encoded row entries, appended back-to-back.
    data: Vec<u8>,
    /// Byte offsets into `data` where each restart-point entry begins.
    restarts: Vec<u16>,
    /// Soft cap on the encoded block size (first entry always admitted).
    block_size: usize,
    /// Every `restart_interval`-th entry is written with its full key.
    restart_interval: usize,
    /// Number of entries added so far.
    counter: usize,
    /// Key of the most recently added entry, used for prefix computation.
    last_key: Bytes,
}
impl BlockBuilderV2 {
    /// Creates a builder targeting `block_size` encoded bytes with the
    /// default restart interval.
    pub(crate) fn new(block_size: usize) -> Self {
        Self::with_interval(block_size, DEFAULT_RESTART_INTERVAL)
    }

    /// Test-only constructor that overrides the restart interval.
    #[cfg(test)]
    pub(crate) fn new_with_restart_interval(block_size: usize, restart_interval: usize) -> Self {
        Self::with_interval(block_size, restart_interval)
    }

    /// Shared initializer so `new` and the test constructor cannot drift.
    fn with_interval(block_size: usize, restart_interval: usize) -> Self {
        Self {
            data: Vec::new(),
            restarts: Vec::new(),
            block_size,
            restart_interval,
            counter: 0,
            last_key: Bytes::new(),
        }
    }

    /// Encoded size of the block so far: entry data, the restart-offset
    /// array, and one trailing u16 (the restart count).
    fn current_size(&self) -> usize {
        self.data.len() + self.restarts.len() * SIZEOF_U16 + SIZEOF_U16
    }

    /// Size in bytes that `entry` would occupy if encoded now, accounting
    /// for prefix compression against `last_key` (none at a restart point).
    fn entry_encoded_size(&self, entry: &RowEntry) -> usize {
        let shared_bytes = if self.is_restart_point() {
            0
        } else {
            compute_prefix(&self.last_key, &entry.key) as u32
        };
        // `Bytes::slice` shares the underlying buffer (refcount bump), so
        // probing the size costs no allocation or copy.
        let temp_entry = SstRowEntryV2::new(
            shared_bytes,
            entry.key.slice(shared_bytes as usize..),
            entry.seq,
            entry.value.clone(),
            entry.create_ts,
            entry.expire_ts,
        );
        temp_entry.encoded_size()
    }

    /// True when the next entry added would start a new restart point,
    /// i.e. would be written with its full, uncompressed key.
    fn is_restart_point(&self) -> bool {
        self.counter.is_multiple_of(self.restart_interval)
    }

    /// Whether `entry` fits within `block_size` once encoded.
    ///
    /// The first entry is always accepted so that a single oversized entry
    /// can still produce a one-entry block.
    pub(crate) fn would_fit(&self, entry: &RowEntry) -> bool {
        if self.is_empty() {
            return true;
        }
        let entry_size = self.entry_encoded_size(entry);
        // A restart point appends one more u16 to the offset trailer.
        let restart_overhead = if self.is_restart_point() {
            SIZEOF_U16
        } else {
            0
        };
        self.current_size() + entry_size + restart_overhead <= self.block_size
    }

    /// Adds `entry` to the block.
    ///
    /// Returns `Ok(true)` if the entry was appended, `Ok(false)` if it
    /// would overflow `block_size` (the caller should start a new block).
    ///
    /// # Errors
    /// Returns [`SlateDBError::EmptyKey`] if the entry's key is empty.
    ///
    /// # Panics
    /// Panics if a restart point would land past `u16::MAX` bytes of entry
    /// data, since restart offsets are stored as u16.
    pub(crate) fn add(&mut self, entry: RowEntry) -> Result<bool, SlateDBError> {
        if entry.key.is_empty() {
            return Err(SlateDBError::EmptyKey);
        }
        if !self.would_fit(&entry) {
            return Ok(false);
        }
        let shared_bytes = if self.is_restart_point() {
            assert!(
                self.data.len() <= u16::MAX as usize,
                "Block data too large for u16 offset: {} bytes",
                self.data.len()
            );
            self.restarts.push(self.data.len() as u16);
            0
        } else {
            compute_prefix(&self.last_key, &entry.key) as u32
        };
        // Zero-copy suffix: slicing `Bytes` shares the key's buffer
        // instead of allocating a fresh copy per entry.
        let key_suffix = entry.key.slice(shared_bytes as usize..);
        let sst_row = SstRowEntryV2::new(
            shared_bytes,
            key_suffix,
            entry.seq,
            entry.value,
            entry.create_ts,
            entry.expire_ts,
        );
        let codec = SstRowCodecV2::new();
        codec.encode(&mut self.data, &sst_row);
        self.last_key = entry.key;
        self.counter += 1;
        Ok(true)
    }

    /// True if no entry has been added yet.
    pub(crate) fn is_empty(&self) -> bool {
        self.counter == 0
    }

    /// Consumes the builder and produces the finished [`Block`].
    ///
    /// # Errors
    /// Returns [`SlateDBError::EmptyBlock`] if no entry was ever added.
    pub(crate) fn build(self) -> Result<Block, SlateDBError> {
        if self.is_empty() {
            return Err(SlateDBError::EmptyBlock);
        }
        Ok(Block {
            data: Bytes::from(self.data),
            offsets: self.restarts,
        })
    }

    /// Test helper: adds a plain value entry with seq 0, returning whether
    /// it fit (errors collapse to `false`).
    #[cfg(test)]
    pub(crate) fn add_value(
        &mut self,
        key: &[u8],
        value: &[u8],
        ts: Option<i64>,
        expire_ts: Option<i64>,
    ) -> bool {
        let entry = RowEntry::new(
            key.to_vec().into(),
            crate::types::ValueDeletable::Value(Bytes::copy_from_slice(value)),
            0,
            ts,
            expire_ts,
        );
        self.add(entry).unwrap_or(false)
    }

    /// Test accessor for the configured restart interval.
    #[cfg(test)]
    pub(crate) fn restart_interval(&self) -> usize {
        self.restart_interval
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ValueDeletable;
    use rstest::rstest;

    /// Convenience constructor for a plain value entry with no timestamps.
    fn make_entry(key: &[u8], value: &[u8], seq: u64) -> RowEntry {
        RowEntry::new(
            Bytes::copy_from_slice(key),
            ValueDeletable::Value(Bytes::copy_from_slice(value)),
            seq,
            None,
            None,
        )
    }

    // A single entry is always a restart point at offset 0.
    #[test]
    fn should_build_single_entry_block() {
        let mut builder = BlockBuilderV2::new(4096);
        let added = builder.add(make_entry(b"key1", b"value1", 1));
        assert!(added.unwrap());
        let block = builder.build().expect("build failed");
        assert_eq!(block.offsets.len(), 1);
        assert_eq!(block.offsets[0], 0);
    }

    // 10 entries at interval 4 -> restart points at entries 0, 4, and 8.
    #[test]
    fn should_build_block_with_restart_points() {
        let mut builder = BlockBuilderV2::new_with_restart_interval(4096, 4);
        for i in 0..10 {
            let key = format!("key_{:03}", i);
            let value = format!("value_{}", i);
            assert!(builder
                .add(make_entry(key.as_bytes(), value.as_bytes(), i as u64))
                .unwrap());
        }
        let block = builder.build().expect("build failed");
        assert_eq!(block.offsets.len(), 3);
    }

    // encode() then decode() must reproduce both data and restart offsets.
    #[test]
    fn should_encode_decode_round_trip() {
        let mut builder = BlockBuilderV2::new_with_restart_interval(4096, 4);
        for i in 0..8 {
            let key = format!("key_{:03}", i);
            let value = format!("value_{}", i);
            let _ = builder.add(make_entry(key.as_bytes(), value.as_bytes(), i as u64));
        }
        let block = builder.build().expect("build failed");
        let encoded = block.encode();
        let decoded = Block::decode(encoded);
        assert_eq!(block.data, decoded.data);
        assert_eq!(block.offsets, decoded.offsets);
    }

    // A non-first entry that would overflow block_size is rejected.
    #[test]
    fn should_reject_entry_exceeding_block_size() {
        let mut builder = BlockBuilderV2::new(100);
        let _ = builder.add(make_entry(b"key1", b"value1", 1));
        let large_value = vec![b'x'; 200];
        let would_fit = builder.would_fit(&make_entry(b"key2", &large_value, 2));
        assert!(!would_fit);
    }

    // The very first entry is admitted even when larger than block_size.
    #[test]
    fn should_accept_first_entry_exceeding_block_size() {
        let builder = BlockBuilderV2::new(10);
        let large_value = vec![b'x'; 200];
        let would_fit = builder.would_fit(&make_entry(b"key1", &large_value, 1));
        assert!(would_fit);
    }

    // Restart offsets must start at 0 and be strictly increasing.
    #[test]
    fn should_compute_correct_restart_offsets() {
        let mut builder = BlockBuilderV2::new_with_restart_interval(4096, 2);
        let _ = builder.add(make_entry(b"aaa", b"v1", 1));
        let _ = builder.add(make_entry(b"aab", b"v2", 2));
        let _ = builder.add(make_entry(b"bbb", b"v3", 3));
        let _ = builder.add(make_entry(b"bbc", b"v4", 4));
        let _ = builder.add(make_entry(b"ccc", b"v5", 5));
        let block = builder.build().expect("build failed");
        assert_eq!(block.offsets.len(), 3);
        assert_eq!(block.offsets[0], 0);
        assert!(block.offsets[1] > 0);
        assert!(block.offsets[2] > block.offsets[1]);
    }

    // Entries at restart points carry the full key (shared_bytes == 0).
    #[test]
    fn should_store_full_key_at_restart_points() {
        let mut builder = BlockBuilderV2::new_with_restart_interval(4096, 2);
        let _ = builder.add(make_entry(b"prefix_aaa", b"v1", 1));
        let _ = builder.add(make_entry(b"prefix_aab", b"v2", 2));
        let _ = builder.add(make_entry(b"prefix_bbb", b"v3", 3));
        let block = builder.build().expect("build failed");
        let codec = SstRowCodecV2::new();
        // offsets[0] -> first entry; offsets[1] -> third entry (index 2).
        let mut data: &[u8] = &block.data[block.offsets[0] as usize..];
        let entry0 = codec.decode(&mut data).expect("decode failed");
        let mut data: &[u8] = &block.data[block.offsets[1] as usize..];
        let entry2 = codec.decode(&mut data).expect("decode failed");
        assert_eq!(entry0.shared_bytes, 0);
        assert_eq!(entry0.key_suffix.as_ref(), b"prefix_aaa");
        assert_eq!(entry2.shared_bytes, 0);
        assert_eq!(entry2.key_suffix.as_ref(), b"prefix_bbb");
    }

    // Within a restart run, keys share their common prefix with the
    // previous key: "prefix_aa" (9 bytes) is shared, only "b" is stored.
    #[test]
    fn should_use_prefix_compression_between_restarts() {
        let mut builder = BlockBuilderV2::new_with_restart_interval(4096, 16);
        let _ = builder.add(make_entry(b"prefix_aaa", b"v1", 1));
        let _ = builder.add(make_entry(b"prefix_aab", b"v2", 2));
        let block = builder.build().expect("build failed");
        let codec = SstRowCodecV2::new();
        let restart_offset = block.offsets[0] as usize;
        let mut data: &[u8] = &block.data[restart_offset..];
        let first_entry = codec.decode(&mut data).expect("decode failed");
        let second_entry = codec.decode(&mut data).expect("decode failed");
        assert_eq!(first_entry.shared_bytes, 0);
        assert_eq!(second_entry.shared_bytes, 9);
        assert_eq!(second_entry.key_suffix.as_ref(), b"b");
    }

    // Block::size() must match the length of the encoded byte stream.
    #[test]
    fn should_return_correct_block_size() {
        let mut builder = BlockBuilderV2::new_with_restart_interval(4096, 4);
        for i in 0..5 {
            let key = format!("key_{:03}", i);
            let value = format!("val_{}", i);
            let _ = builder.add(make_entry(key.as_bytes(), value.as_bytes(), i as u64));
        }
        let block = builder.build().expect("build failed");
        let encoded = block.encode();
        assert_eq!(block.size(), encoded.len());
    }

    // Building with no entries is an EmptyBlock error.
    #[test]
    fn should_build_empty_returns_error() {
        let builder = BlockBuilderV2::new(4096);
        let result = builder.build();
        assert!(matches!(result, Err(SlateDBError::EmptyBlock)));
    }

    // Tombstone entries encode alongside regular values.
    #[test]
    fn should_handle_tombstones() {
        let mut builder = BlockBuilderV2::new(4096);
        let _ = builder.add(RowEntry::new(
            Bytes::from("key1"),
            ValueDeletable::Value(Bytes::from("value1")),
            1,
            None,
            None,
        ));
        let _ = builder.add(RowEntry::new(
            Bytes::from("key2"),
            ValueDeletable::Tombstone,
            2,
            Some(100),
            None,
        ));
        let block = builder.build().expect("build failed");
        assert!(!block.data.is_empty());
    }

    // Restart count is ceil(entries / interval) for any interval.
    #[rstest]
    #[case(1)]
    #[case(2)]
    #[case(4)]
    #[case(16)]
    #[case(32)]
    fn should_handle_various_restart_intervals(#[case] interval: usize) {
        let mut builder = BlockBuilderV2::new_with_restart_interval(8192, interval);
        let num_entries: usize = 50;
        for i in 0..num_entries {
            let key = format!("key_{:05}", i);
            let value = format!("value_{}", i);
            let _ = builder.add(make_entry(key.as_bytes(), value.as_bytes(), i as u64));
        }
        let block = builder.build().expect("build failed");
        let expected_restarts = num_entries.div_ceil(interval);
        assert_eq!(block.offsets.len(), expected_restarts);
    }

    // The test-only add_value helper goes through the same add() path.
    #[test]
    fn should_add_value_helper_works() {
        let mut builder = BlockBuilderV2::new(4096);
        assert!(builder.add_value(b"key1", b"value1", None, None));
        assert!(builder.add_value(b"key2", b"value2", None, None));
        let block = builder.build().expect("build failed");
        assert!(!block.data.is_empty());
    }

    // A 3 MiB key exercises key lengths beyond the u16 range and must
    // survive an encode/decode round trip intact.
    #[test]
    fn should_handle_large_key_requiring_u32_length() {
        let large_key_size = 3 * 1024 * 1024;
        let large_key = vec![b'k'; large_key_size];
        let value = b"small_value";
        let mut builder = BlockBuilderV2::new(4 * 1024 * 1024);
        let entry = RowEntry::new(
            Bytes::from(large_key.clone()),
            ValueDeletable::Value(Bytes::copy_from_slice(value)),
            1,
            None,
            None,
        );
        let added = builder.add(entry);
        assert!(added.unwrap());
        let block = builder.build().expect("build failed");
        let encoded = block.encode();
        let decoded = Block::decode(encoded);
        assert_eq!(block.data, decoded.data);
        assert_eq!(block.offsets, decoded.offsets);
        let codec = SstRowCodecV2::new();
        let mut data: &[u8] = &decoded.data[decoded.offsets[0] as usize..];
        let entry = codec.decode(&mut data).expect("decode failed");
        assert_eq!(entry.shared_bytes, 0);
        assert_eq!(entry.key_suffix.len(), large_key_size);
        assert_eq!(entry.key_suffix.as_ref(), large_key.as_slice());
    }

    // A 1 MiB shared prefix must compress down to the 100-byte suffix on
    // the second key.
    #[test]
    fn should_handle_large_key_with_prefix_compression() {
        let prefix_size = 1024 * 1024;
        let suffix_size = 100;
        let mut key1 = vec![b'p'; prefix_size];
        key1.extend(vec![b'a'; suffix_size]);
        let mut key2 = vec![b'p'; prefix_size];
        key2.extend(vec![b'b'; suffix_size]);
        let mut builder = BlockBuilderV2::new(4 * 1024 * 1024);
        let _ = builder.add(RowEntry::new(
            Bytes::from(key1.clone()),
            ValueDeletable::Value(Bytes::from("v1")),
            1,
            None,
            None,
        ));
        let _ = builder.add(RowEntry::new(
            Bytes::from(key2.clone()),
            ValueDeletable::Value(Bytes::from("v2")),
            2,
            None,
            None,
        ));
        let block = builder.build().expect("build failed");
        let codec = SstRowCodecV2::new();
        let mut data: &[u8] = &block.data[block.offsets[0] as usize..];
        let first_entry = codec.decode(&mut data).expect("decode failed");
        assert_eq!(first_entry.shared_bytes, 0);
        assert_eq!(first_entry.key_suffix.len(), key1.len());
        let second_entry = codec.decode(&mut data).expect("decode failed");
        assert_eq!(second_entry.shared_bytes, prefix_size as u32);
        assert_eq!(second_entry.key_suffix.len(), suffix_size);
    }
}
/// Side-by-side size comparison of V1 and V2 block encodings across a set
/// of representative key/value distributions. Output is printed, not
/// asserted — run with `--nocapture` to inspect.
#[cfg(test)]
mod block_size_comparison {
    use super::*;
    use crate::format::sst::BlockBuilder;
    use crate::types::ValueDeletable;

    /// Block capacity used for every scenario.
    const BLOCK_SIZE: usize = 64 * 1024;

    /// Encoded sizes for one scenario under both formats.
    struct ComparisonResult {
        scenario: &'static str,
        entry_count: usize,
        v1_size: usize,
        v2_size: usize,
    }

    impl ComparisonResult {
        /// Prints one aligned table row; a positive diff means V2 is smaller.
        fn print(&self) {
            let diff = self.v1_size as i64 - self.v2_size as i64;
            // Guard against division by zero when a build produced nothing.
            let pct = if self.v1_size > 0 {
                (diff as f64 / self.v1_size as f64) * 100.0
            } else {
                0.0
            };
            println!(
                "{:<40} | {:>6} entries | V1: {:>8} bytes | V2: {:>8} bytes | diff: {:>+6} ({:>+.1}%)",
                self.scenario, self.entry_count, self.v1_size, self.v2_size, diff, pct
            );
        }
    }

    /// Encodes `entries` with the V1 builder; 0 if the build fails.
    fn build_v1_block(entries: &[RowEntry]) -> usize {
        let mut builder = BlockBuilder::new_v1(BLOCK_SIZE);
        for entry in entries {
            let _ = builder.add(entry.clone());
        }
        builder.build().map(|b| b.size()).unwrap_or(0)
    }

    /// Encodes `entries` with the V2 builder; 0 if the build fails.
    fn build_v2_block(entries: &[RowEntry]) -> usize {
        let mut builder = BlockBuilderV2::new(BLOCK_SIZE);
        for entry in entries {
            let _ = builder.add(entry.clone());
        }
        builder.build().map(|b| b.size()).unwrap_or(0)
    }

    /// Runs one scenario through both builders and records the sizes.
    fn compare(scenario: &'static str, entries: Vec<RowEntry>) -> ComparisonResult {
        let entry_count = entries.len();
        let v1_size = build_v1_block(&entries);
        let v2_size = build_v2_block(&entries);
        ComparisonResult {
            scenario,
            entry_count,
            v1_size,
            v2_size,
        }
    }

    /// Value entry with no timestamps.
    fn make_entry(key: &[u8], value: &[u8], seq: u64) -> RowEntry {
        RowEntry::new(
            Bytes::copy_from_slice(key),
            ValueDeletable::Value(Bytes::copy_from_slice(value)),
            seq,
            None,
            None,
        )
    }

    /// Tombstone entry with no timestamps.
    fn make_tombstone(key: &[u8], seq: u64) -> RowEntry {
        RowEntry::new(
            Bytes::copy_from_slice(key),
            ValueDeletable::Tombstone,
            seq,
            None,
            None,
        )
    }

    /// Prints a per-scenario comparison table plus aggregate totals.
    #[test]
    fn should_compare_block_sizes() {
        println!("\n=== Block Size Comparison: V1 vs V2 ===\n");
        println!(
            "{:<40} | {:>13} | {:>18} | {:>18} | {:>18}",
            "Scenario", "Entries", "V1 Size", "V2 Size", "Difference"
        );
        println!("{}", "-".repeat(120));
        let results = vec![
            compare("Sequential keys (key0001..key0100)", {
                (1..=100)
                    .map(|i| make_entry(format!("key{:04}", i).as_bytes(), b"value", i))
                    .collect()
            }),
            compare("Sequential keys, 100-byte values", {
                let value = vec![b'v'; 100];
                (1..=100)
                    .map(|i| make_entry(format!("key{:04}", i).as_bytes(), &value, i))
                    .collect()
            }),
            compare("Long common prefix (90% shared)", {
                let prefix = "com.example.application.module.submodule.";
                (1..=100)
                    .map(|i| make_entry(format!("{}{:04}", prefix, i).as_bytes(), b"value", i))
                    .collect()
            }),
            compare("Random keys (no common prefix)", {
                (1..=100)
                    .map(|i| {
                        // Knuth-style multiplicative hashing to scatter keys.
                        let key = format!("{:08x}{:08x}", i * 2654435761, i * 1597334677);
                        make_entry(key.as_bytes(), b"value", i)
                    })
                    .collect()
            }),
            compare("Few entries (10 sequential)", {
                (1..=10)
                    .map(|i| make_entry(format!("key{:04}", i).as_bytes(), b"value", i))
                    .collect()
            }),
            compare("Many small entries (500)", {
                (1..=500)
                    .map(|i| make_entry(format!("k{:04}", i).as_bytes(), b"v", i))
                    .collect()
            }),
            compare("Tombstones (100 sequential keys)", {
                (1..=100)
                    .map(|i| make_tombstone(format!("key{:04}", i).as_bytes(), i))
                    .collect()
            }),
            compare("Mixed: 50% values, 50% tombstones", {
                (1..=100)
                    .map(|i| {
                        if i % 2 == 0 {
                            make_entry(format!("key{:04}", i).as_bytes(), b"value", i)
                        } else {
                            make_tombstone(format!("key{:04}", i).as_bytes(), i)
                        }
                    })
                    .collect()
            }),
            compare("Varying value sizes (1-500 bytes)", {
                (1..=100)
                    .map(|i| {
                        let value = vec![b'v'; (i as usize * 5) % 500 + 1];
                        make_entry(format!("key{:04}", i).as_bytes(), &value, i)
                    })
                    .collect()
            }),
            compare("Large values (1KB each)", {
                let value = vec![b'v'; 1024];
                (1..=50)
                    .map(|i| make_entry(format!("key{:04}", i).as_bytes(), &value, i))
                    .collect()
            }),
            compare("Short keys (1-3 chars)", {
                (1..=100)
                    .map(|i| {
                        let key = format!("{}", (b'a' + (i % 26) as u8) as char);
                        make_entry(key.as_bytes(), b"value", i)
                    })
                    .collect()
            }),
            compare("UUID-like keys", {
                (1..=100)
                    .map(|i| {
                        let key = format!(
                            "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
                            i * 12345,
                            i * 67,
                            i * 89,
                            i * 101,
                            i * 112131
                        );
                        make_entry(key.as_bytes(), b"value", i)
                    })
                    .collect()
            }),
            compare("Hierarchical paths (/a/b/c/...)", {
                (1..=100)
                    .map(|i| {
                        let depth = (i % 5) + 1;
                        let path: String = (0..depth)
                            .map(|d| format!("/level{}", d))
                            .collect::<String>()
                            + &format!("/item{:04}", i);
                        make_entry(path.as_bytes(), b"value", i)
                    })
                    .collect()
            }),
            compare("With create timestamps", {
                (1..=100)
                    .map(|i| {
                        RowEntry::new(
                            Bytes::from(format!("key{:04}", i)),
                            ValueDeletable::Value(Bytes::from("value")),
                            i,
                            Some(1700000000000 + i as i64),
                            None,
                        )
                    })
                    .collect()
            }),
            compare("With create and expire timestamps", {
                (1..=100)
                    .map(|i| {
                        RowEntry::new(
                            Bytes::from(format!("key{:04}", i)),
                            ValueDeletable::Value(Bytes::from("value")),
                            i,
                            Some(1700000000000 + i as i64),
                            Some(1800000000000 + i as i64),
                        )
                    })
                    .collect()
            }),
        ];
        for result in &results {
            result.print();
        }
        println!("\n=== Summary ===");
        let total_v1: usize = results.iter().map(|r| r.v1_size).sum();
        let total_v2: usize = results.iter().map(|r| r.v2_size).sum();
        let total_diff = total_v1 as i64 - total_v2 as i64;
        let total_pct = (total_diff as f64 / total_v1 as f64) * 100.0;
        println!(
            "Total across all scenarios: V1={} bytes, V2={} bytes, diff={:+} ({:+.1}%)\n",
            total_v1, total_v2, total_diff, total_pct
        );
    }
}