use crate::error::Result;
use crate::storage::write_engine::mutation::DecoratedKey;
/// Sampling-level value that `SummaryWriter::write_header` stores, big-endian,
/// as the fourth field of every serialized summary header.
pub const BASE_SAMPLING_LEVEL: u32 = 128;
/// Collects sampled index entries plus the first and last partition keys and
/// serializes them into a single byte buffer via `finish`.
#[derive(Debug)]
pub struct SummaryWriter {
    /// Value written verbatim as the first header field.
    min_index_interval: u32,
    /// Number of `note_partition` calls observed so far (saturating at `u32::MAX`).
    total_partition_count: u32,
    /// Sampled entries, kept in the order they were passed to `add_entry`.
    entries: Vec<SummaryEntry>,
    /// Key bytes of the first partition ever passed to `note_partition`, if any.
    first_key: Option<Vec<u8>>,
    /// Key bytes of the most recent partition passed to `note_partition`, if any.
    last_key: Option<Vec<u8>>,
}
/// One sampled entry: a partition key and the index position recorded for it.
#[derive(Clone, Debug)]
struct SummaryEntry {
    /// Raw partition key bytes, copied from the key handed to the writer.
    key: Vec<u8>,
    /// Position value supplied by the caller; serialized as a big-endian `u64`
    /// directly after the key bytes.
    index_position: u64,
}
impl SummaryWriter {
pub fn new(min_index_interval: u32) -> Self {
Self {
min_index_interval,
total_partition_count: 0,
entries: Vec::new(),
first_key: None,
last_key: None,
}
}
pub fn note_partition(&mut self, key: &DecoratedKey) {
let key_bytes = &key.key;
if self.first_key.is_none() {
self.first_key = Some(key_bytes.clone());
}
self.last_key = Some(key_bytes.clone());
self.total_partition_count = self.total_partition_count.saturating_add(1);
}
pub fn add_entry(&mut self, key: &DecoratedKey, index_offset: u64) -> Result<()> {
self.entries.push(SummaryEntry {
key: key.key.clone(),
index_position: index_offset,
});
Ok(())
}
pub fn finish(self) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
if self.entries.is_empty() {
self.write_header(&mut buffer, 0, 0);
return Ok(buffer);
}
let offset_table_size = self.entries.len() * 4; let mut entry_offsets = Vec::with_capacity(self.entries.len());
let mut entry_data = Vec::new();
for entry in &self.entries {
entry_offsets.push((offset_table_size + entry_data.len()) as u32);
entry_data.extend_from_slice(&entry.key);
entry_data.extend_from_slice(&entry.index_position.to_be_bytes());
}
let summary_entries_size = (offset_table_size + entry_data.len()) as u64;
self.write_header(&mut buffer, self.entries.len() as u32, summary_entries_size);
for offset in entry_offsets {
buffer.extend_from_slice(&offset.to_le_bytes());
}
buffer.extend_from_slice(&entry_data);
if let Some(first_key) = &self.first_key {
buffer.extend_from_slice(&(first_key.len() as u32).to_be_bytes());
buffer.extend_from_slice(first_key);
}
if let Some(last_key) = &self.last_key {
buffer.extend_from_slice(&(last_key.len() as u32).to_be_bytes());
buffer.extend_from_slice(last_key);
}
Ok(buffer)
}
pub fn entry_count(&self) -> usize {
self.entries.len()
}
fn write_header(&self, buffer: &mut Vec<u8>, entries_count: u32, summary_entries_size: u64) {
buffer.extend_from_slice(&self.min_index_interval.to_be_bytes());
buffer.extend_from_slice(&entries_count.to_be_bytes());
buffer.extend_from_slice(&summary_entries_size.to_be_bytes());
buffer.extend_from_slice(&BASE_SAMPLING_LEVEL.to_be_bytes());
buffer.extend_from_slice(&entries_count.to_be_bytes());
}
}
impl Default for SummaryWriter {
fn default() -> Self {
Self::new(128)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Size of the fixed header that precedes the offset table.
    const HEADER_LEN: usize = 24;

    /// Notes `key` as a partition and then samples it at `offset`, in that order.
    fn note_and_sample(writer: &mut SummaryWriter, key: &DecoratedKey, offset: u64) {
        writer.note_partition(key);
        writer.add_entry(key, offset).unwrap();
    }

    /// Decodes slot `idx` of the little-endian offset table that follows the header.
    fn offset_at(bytes: &[u8], idx: usize) -> u32 {
        let start = HEADER_LEN + idx * 4;
        u32::from_le_bytes(bytes[start..start + 4].try_into().unwrap())
    }

    /// Decodes the big-endian `u32` stored at `pos`.
    fn be_u32_at(bytes: &[u8], pos: usize) -> u32 {
        u32::from_be_bytes(bytes[pos..pos + 4].try_into().unwrap())
    }

    #[test]
    fn test_summary_writer_new() {
        let fresh = SummaryWriter::new(128);
        assert_eq!(fresh.min_index_interval, 128);
        assert_eq!(fresh.entry_count(), 0);
    }

    #[test]
    fn test_add_single_entry() {
        let mut writer = SummaryWriter::new(128);
        let only = DecoratedKey::new(12345, vec![0x00, 0x00, 0x00, 0x2A]);
        writer.add_entry(&only, 0).unwrap();
        assert_eq!(writer.entry_count(), 1);
    }

    #[test]
    fn test_add_multiple_entries() {
        let mut writer = SummaryWriter::new(128);
        let keys = [
            DecoratedKey::new(100, vec![0x01]),
            DecoratedKey::new(200, vec![0x02]),
            DecoratedKey::new(300, vec![0x03]),
        ];
        let positions = [0u64, 1024, 2048];
        for (key, position) in keys.iter().zip(positions) {
            writer.add_entry(key, position).unwrap();
        }
        assert_eq!(writer.entry_count(), 3);
    }

    #[test]
    fn test_finish_single_entry() {
        let mut writer = SummaryWriter::new(128);
        let key = DecoratedKey::new(12345, vec![0x01, 0x02, 0x03, 0x04]);
        note_and_sample(&mut writer, &key, 0);
        let bytes = writer.finish().unwrap();
        assert_eq!(bytes.len(), 56);

        // Header: interval, entry count, entries size, sampling level, entry count.
        assert_eq!(bytes[0..4], 128u32.to_be_bytes());
        assert_eq!(bytes[4..8], 1u32.to_be_bytes());
        assert_eq!(bytes[8..16], 16u64.to_be_bytes());
        assert_eq!(bytes[16..20], 128u32.to_be_bytes());
        assert_eq!(bytes[20..24], 1u32.to_be_bytes());

        // Offset table (little-endian), then key bytes and big-endian position.
        assert_eq!(bytes[24..28], 4u32.to_le_bytes());
        assert_eq!(&bytes[28..32], &[0x01, 0x02, 0x03, 0x04]);
        assert_eq!(bytes[32..40], 0u64.to_be_bytes());

        // Length-prefixed first key, then length-prefixed last key.
        assert_eq!(bytes[40..44], 4u32.to_be_bytes());
        assert_eq!(&bytes[44..48], &[0x01, 0x02, 0x03, 0x04]);
        assert_eq!(bytes[48..52], 4u32.to_be_bytes());
        assert_eq!(&bytes[52..56], &[0x01, 0x02, 0x03, 0x04]);
    }

    #[test]
    fn test_finish_multiple_entries() {
        let mut writer = SummaryWriter::new(128);
        let short_key = DecoratedKey::new(100, vec![0xAA, 0xBB]);
        note_and_sample(&mut writer, &short_key, 0);
        let long_key = DecoratedKey::new(200, vec![0xCC, 0xDD, 0xEE]);
        note_and_sample(&mut writer, &long_key, 1024);
        let bytes = writer.finish().unwrap();

        assert_eq!(bytes[4..8], 2u32.to_be_bytes());

        // Two offsets: table size (8), then 8 + 2 key bytes + 8 position bytes.
        assert_eq!(bytes[24..28], 8u32.to_le_bytes());
        assert_eq!(bytes[28..32], 18u32.to_le_bytes());

        // First entry.
        assert_eq!(&bytes[32..34], &[0xAA, 0xBB]);
        assert_eq!(bytes[34..42], 0u64.to_be_bytes());

        // Second entry.
        assert_eq!(&bytes[42..45], &[0xCC, 0xDD, 0xEE]);
        assert_eq!(bytes[45..53], 1024u64.to_be_bytes());

        // First key trailer.
        assert_eq!(bytes[53..57], 2u32.to_be_bytes());
        assert_eq!(&bytes[57..59], &[0xAA, 0xBB]);

        // Last key trailer.
        assert_eq!(bytes[59..63], 3u32.to_be_bytes());
        assert_eq!(&bytes[63..66], &[0xCC, 0xDD, 0xEE]);
    }

    #[test]
    fn test_offset_table_little_endian() {
        let mut writer = SummaryWriter::new(128);
        let key1 = DecoratedKey::new(100, vec![0x01; 16]);
        let key2 = DecoratedKey::new(200, vec![0x02; 16]);
        note_and_sample(&mut writer, &key1, 0);
        note_and_sample(&mut writer, &key2, 100);
        let bytes = writer.finish().unwrap();
        assert_eq!(bytes[24..28], 8u32.to_le_bytes());
        assert_eq!(bytes[28..32], 32u32.to_le_bytes());
    }

    #[test]
    fn test_sampling_behavior() {
        let mut writer = SummaryWriter::new(128);
        let key0 = DecoratedKey::new(100, vec![0x00]);
        let key128 = DecoratedKey::new(200, vec![0x80]);
        let key256 = DecoratedKey::new(300, vec![0xFF]);
        note_and_sample(&mut writer, &key0, 0);
        note_and_sample(&mut writer, &key128, 2048);
        note_and_sample(&mut writer, &key256, 4096);
        assert_eq!(writer.entry_count(), 3);
        let bytes = writer.finish().unwrap();
        assert_eq!(bytes[4..8], 3u32.to_be_bytes());
    }

    #[test]
    fn test_first_and_last_keys() {
        let mut writer = SummaryWriter::new(128);
        let first_key_bytes = vec![0x01, 0x02];
        let middle_key_bytes = vec![0x03, 0x04];
        let last_key_bytes = vec![0x05, 0x06];
        let key1 = DecoratedKey::new(100, first_key_bytes.clone());
        let key2 = DecoratedKey::new(200, middle_key_bytes.clone());
        let key3 = DecoratedKey::new(300, last_key_bytes.clone());
        note_and_sample(&mut writer, &key1, 0);
        note_and_sample(&mut writer, &key2, 1024);
        note_and_sample(&mut writer, &key3, 2048);
        let bytes = writer.finish().unwrap();

        // 24 header bytes + 3 offsets (12) + 3 entries of 2 + 8 bytes (30).
        let first_key_start = 66;
        let first_body_start = first_key_start + 4;
        assert_eq!(
            bytes[first_key_start..first_body_start],
            2u32.to_be_bytes()
        );
        assert_eq!(
            &bytes[first_body_start..first_body_start + 2],
            &first_key_bytes[..]
        );

        let last_key_start = first_key_start + 6;
        let last_body_start = last_key_start + 4;
        assert_eq!(bytes[last_key_start..last_body_start], 2u32.to_be_bytes());
        assert_eq!(
            &bytes[last_body_start..last_body_start + 2],
            &last_key_bytes[..]
        );
    }

    #[test]
    fn test_empty_summary() {
        let bytes = SummaryWriter::new(128).finish().unwrap();
        assert_eq!(bytes.len(), 24);
        assert_eq!(bytes[4..8], 0u32.to_be_bytes());
    }

    #[test]
    fn test_large_position_value() {
        let mut writer = SummaryWriter::new(128);
        let key = DecoratedKey::new(12345, vec![0xFF]);
        note_and_sample(&mut writer, &key, 1_073_741_824);
        let bytes = writer.finish().unwrap();
        // Header (24) + one offset (4) + one key byte puts the position at 29.
        assert_eq!(bytes[29..37], 1_073_741_824u64.to_be_bytes());
    }

    #[test]
    fn test_position_encoding() {
        let mut writer = SummaryWriter::new(128);
        let key = DecoratedKey::new(12345, vec![0x01]);
        note_and_sample(&mut writer, &key, 12381);
        let bytes = writer.finish().unwrap();
        assert_eq!(bytes[29..37], 12381u64.to_be_bytes());
    }

    #[test]
    fn test_hex_dump_verification() {
        let mut writer = SummaryWriter::new(128);
        let key = DecoratedKey::new(12345, vec![0x01, 0x02, 0x03, 0x04]);
        note_and_sample(&mut writer, &key, 0);
        let bytes = writer.finish().unwrap();

        println!("\nSummary.db hex dump:");
        for (row, chunk) in bytes.chunks(16).enumerate() {
            print!("{:08x}: ", row * 16);
            for byte in chunk {
                print!("{:02x} ", byte);
            }
            println!();
        }

        assert_eq!(
            &bytes[0..2],
            &[0x00, 0x00],
            "Header should start with 0x0000"
        );
    }

    #[test]
    fn test_custom_min_index_interval() {
        let writer = SummaryWriter::new(64);
        assert_eq!(writer.min_index_interval, 64);
        let bytes = writer.finish().unwrap();
        assert_eq!(bytes[0..4], 64u32.to_be_bytes());
        assert_eq!(bytes[16..20], 128u32.to_be_bytes());
    }

    #[test]
    fn test_token_order_preservation() {
        let mut writer = SummaryWriter::new(128);
        let key1 = DecoratedKey::new(-5000000000, vec![0x01]);
        let key2 = DecoratedKey::new(0, vec![0x02]);
        let key3 = DecoratedKey::new(5000000000, vec![0x03]);
        note_and_sample(&mut writer, &key1, 0);
        note_and_sample(&mut writer, &key2, 1000);
        note_and_sample(&mut writer, &key3, 2000);
        let bytes = writer.finish().unwrap();
        assert_eq!(bytes[4..8], 3u32.to_be_bytes());
    }

    #[test]
    fn test_variable_key_sizes() {
        let mut writer = SummaryWriter::new(128);
        let key1 = DecoratedKey::new(100, vec![0x01]);
        let key2 = DecoratedKey::new(200, vec![0x02, 0x03]);
        let key3 = DecoratedKey::new(300, vec![0x04, 0x05, 0x06, 0x07]);
        note_and_sample(&mut writer, &key1, 0);
        note_and_sample(&mut writer, &key2, 100);
        note_and_sample(&mut writer, &key3, 200);
        let bytes = writer.finish().unwrap();
        // Table is 12 bytes; entries occupy 1 + 8 and 2 + 8 bytes before the third.
        assert_eq!(bytes[24..28], 12u32.to_le_bytes());
        assert_eq!(bytes[28..32], 21u32.to_le_bytes());
        assert_eq!(bytes[32..36], 31u32.to_le_bytes());
    }

    #[test]
    fn test_large_key() {
        let mut writer = SummaryWriter::new(128);
        let large_key = vec![0xAB; 256];
        let key = DecoratedKey::new(12345, large_key.clone());
        note_and_sample(&mut writer, &key, 0);
        let bytes = writer.finish().unwrap();
        assert_eq!(&bytes[28..28 + 256], &large_key[..]);
    }

    #[test]
    fn test_realistic_scenario() {
        let mut writer = SummaryWriter::new(128);
        let key0 = DecoratedKey::new(-5000000000, vec![0x00, 0x00, 0x03, 0xE9]);
        let key128 = DecoratedKey::new(-1000000000, vec![0x00, 0x00, 0x03, 0xEA]);
        let key256 = DecoratedKey::new(3000000000, vec![0x00, 0x00, 0x03, 0xEB]);
        note_and_sample(&mut writer, &key0, 0);
        note_and_sample(&mut writer, &key128, 25600);
        note_and_sample(&mut writer, &key256, 51200);
        assert_eq!(writer.entry_count(), 3);
        let bytes = writer.finish().unwrap();
        assert!(bytes.len() > 24);
        assert_eq!(bytes[4..8], 3u32.to_be_bytes());
    }

    #[test]
    fn test_summary_entries_size_calculation() {
        let mut writer = SummaryWriter::new(128);
        let key1 = DecoratedKey::new(100, vec![0x01, 0x02]);
        let key2 = DecoratedKey::new(200, vec![0x03, 0x04]);
        note_and_sample(&mut writer, &key1, 0);
        note_and_sample(&mut writer, &key2, 1024);
        let bytes = writer.finish().unwrap();
        // 8 bytes of offsets + 2 entries of 2 + 8 bytes each.
        assert_eq!(bytes[8..16], 28u64.to_be_bytes());
    }

    #[test]
    fn test_16_byte_key() {
        let mut writer = SummaryWriter::new(128);
        let key_bytes: [u8; 16] = [
            0xdc, 0x67, 0x26, 0xa6, 0x05, 0xc6, 0x48, 0x50, 0x86, 0xcd, 0x0f, 0xe3, 0x1b, 0x67,
            0x57, 0xaf,
        ];
        let key = DecoratedKey::new(12345, key_bytes.to_vec());
        note_and_sample(&mut writer, &key, 0);
        let bytes = writer.finish().unwrap();
        assert_eq!(&bytes[28..44], &key_bytes[..]);
    }

    #[test]
    fn test_default_min_index_interval() {
        assert_eq!(SummaryWriter::default().min_index_interval, 128);
    }

    #[test]
    fn issue_666_offset_table_absolute_not_relative() {
        // Single entry: its offset equals the size of the one-slot table.
        {
            let mut writer = SummaryWriter::new(128);
            let key = DecoratedKey::new(12345, vec![0u8; 16]);
            note_and_sample(&mut writer, &key, 0);
            let bytes = writer.finish().unwrap();
            assert_eq!(
                offset_at(&bytes, 0),
                4,
                "Issue #666: single-entry offset must be 4 (absolute), not 0 (relative)"
            );
        }

        // Eight 16-byte keys: a 32-byte table followed by 24-byte entries.
        {
            let mut writer = SummaryWriter::new(128);
            for i in 0u8..8 {
                let key = DecoratedKey::new(i64::from(i) * 1000, vec![i; 16]);
                note_and_sample(&mut writer, &key, u64::from(i) * 1024);
            }
            let bytes = writer.finish().unwrap();
            let expected_offsets = [32u32, 56, 80, 104, 128, 152, 176, 200];
            for (idx, &expected) in expected_offsets.iter().enumerate() {
                let actual = offset_at(&bytes, idx);
                assert_eq!(
                    actual, expected,
                    "Issue #666: offset[{idx}] = {actual}, want {expected} (absolute)"
                );
            }
        }

        // Round-trip the header through the reader and decode the offset with nom.
        {
            use crate::storage::sstable::summary_reader::parse_summary_header;
            use nom::error::Error as NomError;
            use nom::multi::count;
            use nom::number::complete::le_u32;

            let mut writer = SummaryWriter::new(128);
            let key_bytes = vec![0xAB; 4];
            let key = DecoratedKey::new(42, key_bytes.clone());
            note_and_sample(&mut writer, &key, 99);
            let bytes = writer.finish().unwrap();

            let (after_header, header) = parse_summary_header(&bytes).unwrap();
            assert_eq!(header.entries_count, 1);
            assert_eq!(header.summary_entries_size, 16);

            let (_, offsets) = count(le_u32::<_, NomError<_>>, 1usize)(after_header).unwrap();
            assert_eq!(
                offsets[0], 4,
                "Issue #666: writer must emit absolute offset 4, not 0"
            );
        }
    }

    #[test]
    fn issue_666_first_last_keys_cover_all_partitions() {
        let mut writer = SummaryWriter::new(128);
        let key_first = DecoratedKey::new(100, vec![0x01; 16]);
        let key_mid = DecoratedKey::new(200, vec![0x02; 16]);
        let key_last = DecoratedKey::new(300, vec![0x03; 16]);

        // Only the first partition is sampled; the other two are merely noted.
        note_and_sample(&mut writer, &key_first, 0);
        writer.note_partition(&key_mid);
        writer.note_partition(&key_last);
        let bytes = writer.finish().unwrap();

        assert_eq!(
            &bytes[4..8],
            &[0x00, 0x00, 0x00, 0x01],
            "Issue #666: entries_count must be 1 (only sampled entry)"
        );

        // 24 header bytes + 4 offset bytes + 16 key bytes + 8 position bytes.
        let first_key_len_pos = 52;
        let first_key_len = be_u32_at(&bytes, first_key_len_pos);
        assert_eq!(first_key_len, 16, "Issue #666: first_key len must be 16");
        assert_eq!(
            &bytes[first_key_len_pos + 4..first_key_len_pos + 20],
            &[0x01; 16],
            "Issue #666: first_key must be key_first"
        );

        let last_key_len_pos = first_key_len_pos + 20;
        let last_key_len = be_u32_at(&bytes, last_key_len_pos);
        assert_eq!(last_key_len, 16, "Issue #666: last_key len must be 16");
        assert_eq!(
            &bytes[last_key_len_pos + 4..last_key_len_pos + 20],
            &[0x03; 16],
            "Issue #666: last_key must be key_last (all 3 partitions visible)"
        );
    }
}