use crate::error::{Error, Result};
use crate::r#type::{Column, Key, RefColumn, RefKey, RefValue, Value, ValueType};
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// Encodes a [`ValueType`] as the single tag byte written in front of each column
/// (delegates to `ValueType::encode_tag`).
pub(crate) fn encode_value_type(vt: &ValueType) -> u8 {
vt.encode_tag()
}
/// Decodes a single column tag byte back into a [`ValueType`].
///
/// # Errors
/// Propagates the error returned by `ValueType::decode_tag` for an unrecognized tag.
pub(crate) fn decode_value_type(byte: u8) -> Result<ValueType> {
ValueType::decode_tag(byte)
}
/// Minimal read-only view of a column, implemented by both the owned [`Column`] and the
/// borrowed [`RefColumn`], so the value encoder and the encoded-size calculation are written
/// once, generically, for both.
trait ColumnRef {
/// The column's value type; encoded as the column's tag byte.
fn value_type(&self) -> &ValueType;
/// The column's raw payload bytes.
fn data(&self) -> &[u8];
}
// Delegates to `Column`'s own accessors.
// NOTE(review): this relies on `Column` having inherent `value_type` / `data` methods. Inherent
// methods take precedence over trait methods in method resolution, so `self.value_type()` and
// `self.data()` below call those rather than recursing into this impl. If the inherent methods
// are ever renamed or removed, these bodies would become unconditional recursion.
impl ColumnRef for Column {
fn value_type(&self) -> &ValueType {
self.value_type()
}
// The inherent `data()` presumably returns a byte container that deref-coerces to `&[u8]`
// (tests call `.as_ref()` on it) — TODO confirm.
fn data(&self) -> &[u8] {
self.data()
}
}
// Delegates to `RefColumn`'s own accessors.
// NOTE(review): as with the `Column` impl, this relies on `RefColumn` having inherent
// `value_type` / `data` methods, which win method resolution over these trait methods; without
// them the calls below would recurse.
impl<'a> ColumnRef for RefColumn<'a> {
fn value_type(&self) -> &ValueType {
self.value_type()
}
fn data(&self) -> &[u8] {
self.data()
}
}
pub(crate) fn encode_key(key: &Key) -> Bytes {
let size = 2 + key.data().len();
let mut buf = BytesMut::with_capacity(size);
encode_key_ref_into(&RefKey::new(key.bucket(), key.data()), &mut buf);
buf.freeze()
}
/// Appends a borrowed key to `buf` as `bucket: u16 (little-endian)` followed by the key bytes
/// verbatim (no length prefix; the key owns the rest of its slot).
pub(crate) fn encode_key_ref_into(key: &RefKey<'_>, buf: &mut impl BufMut) {
    let bucket_le = key.bucket().to_le_bytes();
    buf.put_slice(&bucket_le);
    buf.put_slice(key.data());
}
pub(crate) fn decode_key(data: &mut Bytes) -> Result<Key> {
if data.len() < 2 {
return Err(Error::IoError(format!(
"Key data too small: expected at least 2 bytes, got {}",
data.len()
)));
}
let group = data.get_u16_le();
let key_data = data.split_to(data.len());
Ok(Key::new(group, key_data))
}
/// Number of bytes [`encode_key`] produces for `key`: the 2-byte bucket prefix plus the raw
/// key bytes.
pub(crate) fn key_encoded_size(key: &Key) -> usize {
    let bucket_prefix = std::mem::size_of::<u16>();
    bucket_prefix + key.data().len()
}
/// Size in bytes of the column-presence bitmap for a schema with `num_columns` columns.
///
/// Schemas with zero or one column carry no bitmap. Otherwise there is one bit per column,
/// rounded up to whole bytes.
fn bitmap_size(num_columns: usize) -> usize {
    match num_columns {
        0 | 1 => 0,
        n => n.div_ceil(8),
    }
}
/// Decodes a raw column tag byte and reports whether that value type is terminal.
///
/// # Errors
/// Propagates the decode error for an unrecognized tag.
fn value_type_is_terminal(byte: u8) -> Result<bool> {
    ValueType::decode_tag(byte).map(|value_type| value_type.is_terminal())
}
/// Reads only the expiry field of an encoded value: the leading little-endian `u32`.
/// A stored `0` means "no expiry" and is returned as `None`.
///
/// # Errors
/// Returns `Error::IoError` if `data` is shorter than 4 bytes.
pub(crate) fn value_expired_at(data: &[u8]) -> Result<Option<u32>> {
    let Some(header) = data.first_chunk::<4>() else {
        return Err(Error::IoError(format!(
            "Value data too small: expected at least 4 bytes for expired_at, got {}",
            data.len()
        )));
    };
    let raw = u32::from_le_bytes(*header);
    Ok((raw != 0).then_some(raw))
}
/// Reports whether an encoded value is terminal. That requires every one of the `num_columns`
/// columns to be present and to carry a value type whose `is_terminal()` is true.
///
/// Only tag bytes and length prefixes are inspected; payloads are skipped, not copied.
/// Returns `Ok(false)` as soon as an absent or non-terminal column is seen.
///
/// # Errors
/// Returns `Error::IoError` when `data` is too short for the expiry header, the bitmap, or a
/// column's tag, length or payload. Propagates the decode error for an invalid tag.
pub(crate) fn value_is_terminal(data: &[u8], num_columns: usize) -> Result<bool> {
    let Some(after_expiry) = data.get(4..) else {
        return Err(Error::IoError(format!(
            "Value data too small: expected at least 4 bytes for expired_at, got {}",
            data.len()
        )));
    };

    let bmp_size = bitmap_size(num_columns);
    if after_expiry.len() < bmp_size {
        return Err(Error::IoError(format!(
            "Value data too small: expected at least {} bytes for bitmap, got {}",
            bmp_size,
            after_expiry.len()
        )));
    }
    let (bitmap, mut rest) = after_expiry.split_at(bmp_size);

    // With a single column there is no bitmap and the column is implicitly present.
    // Otherwise any cleared bit means the value cannot be terminal.
    let all_present = num_columns <= 1
        || (0..num_columns).all(|idx| bitmap[idx / 8] & (1 << (idx % 8)) != 0);
    if !all_present {
        return Ok(false);
    }

    let last_idx = num_columns.saturating_sub(1);
    for idx in 0..num_columns {
        let Some((&tag, after_tag)) = rest.split_first() else {
            return Err(Error::IoError(format!(
                "Column {} data corrupted: not enough bytes for value_type",
                idx
            )));
        };
        if !value_type_is_terminal(tag)? {
            return Ok(false);
        }
        // The last column has no length prefix; its payload runs to the end of the buffer.
        if idx == last_idx {
            break;
        }
        let Some((len_le, payload)) = after_tag.split_first_chunk::<4>() else {
            return Err(Error::IoError(format!(
                "Column {} data corrupted: not enough bytes for length",
                idx
            )));
        };
        let data_len = u32::from_le_bytes(*len_le) as usize;
        if payload.len() < data_len {
            return Err(Error::IoError(format!(
                "Column {} data corrupted: expected {} bytes, got {}",
                idx,
                data_len,
                payload.len()
            )));
        }
        rest = &payload[data_len..];
    }
    Ok(true)
}
/// Encodes an owned [`Value`] into a new buffer pre-sized with the encoded-size calculation.
/// Only the first `num_columns` columns are written.
pub(crate) fn encode_value(value: &Value, num_columns: usize) -> Bytes {
    let columns = value.columns();
    let expiry = value.expired_at();
    let mut out = BytesMut::with_capacity(value_encoded_size_columns(columns, num_columns));
    encode_value_columns_into(columns, expiry, num_columns, &mut out);
    out.freeze()
}
/// Appends the encoding of a borrowed [`RefValue`] to `buf`, using the same layout as
/// [`encode_value`].
pub(crate) fn encode_value_ref_into(
    value: &RefValue<'_>,
    num_columns: usize,
    buf: &mut impl BufMut,
) {
    let expiry = value.expired_at();
    encode_value_columns_into(value.columns(), expiry, num_columns, buf);
}
/// Serializes `columns` into `buf` using the row value layout:
///
/// ```text
/// expired_at : u32 LE (0 = no expiry)
/// bitmap     : bitmap_size(num_columns) bytes; bit i%8 of byte i/8 set if column i is present
///              (omitted entirely when num_columns <= 1)
/// for each present column, in index order:
///     tag    : u8 (encode_value_type)
///     length : u32 LE -- omitted for the last present column
///     data   : [u8]
/// ```
///
/// Only the first `num_columns` entries of `columns` are encoded. Missing trailing entries are
/// treated as absent.
fn encode_value_columns_into<C: ColumnRef>(
    columns: &[Option<C>],
    expired_at: Option<u32>,
    num_columns: usize,
    buf: &mut impl BufMut,
) {
    let columns = &columns[..columns.len().min(num_columns)];
    buf.put_u32_le(expired_at.unwrap_or(0));

    // Emit the presence bitmap byte by byte instead of staging it in a temporary heap Vec.
    // This runs once per encoded row. Bytes past the last chunk (fewer columns supplied than
    // `num_columns`) are zero.
    let bmp_size = bitmap_size(num_columns);
    if bmp_size > 0 {
        let mut chunks = columns.chunks(8);
        for _ in 0..bmp_size {
            let byte = chunks.next().map_or(0u8, |chunk| {
                chunk
                    .iter()
                    .enumerate()
                    .filter(|(_, col)| col.is_some())
                    .fold(0u8, |acc, (bit, _)| acc | (1 << bit))
            });
            buf.put_u8(byte);
        }
    }

    // The last present column owns the rest of the record, so it carries no length prefix.
    let last_present = columns.iter().rposition(|col| col.is_some());
    for (idx, col) in columns.iter().enumerate() {
        let Some(col) = col else { continue };
        let data = col.data();
        buf.put_u8(encode_value_type(col.value_type()));
        if Some(idx) != last_present {
            // The length prefix is a u32; a larger payload would be silently truncated and
            // produce a record that decodes into garbage.
            debug_assert!(
                u32::try_from(data.len()).is_ok(),
                "column {idx} is {} bytes, which does not fit the u32 length prefix",
                data.len()
            );
            buf.put_u32_le(data.len() as u32);
        }
        buf.put_slice(data);
    }
}
/// Decodes a value produced by [`encode_value`] / [`encode_value_ref_into`] with the same
/// `num_columns`, consuming `data`.
///
/// The returned value has exactly `num_columns` column slots; absent columns are `None`.
/// A stored `expired_at` of 0 decodes to `None`.
///
/// # Errors
/// Returns `Error::IoError` if `data` is truncated. Propagates the decode error for an invalid
/// column tag.
///
/// NOTE(review): with `num_columns == 1` there is no bitmap and the column is assumed present.
/// The encoder, however, writes nothing for an absent single column, so such a value fails to
/// decode here. Confirm that single-column values always carry their column.
pub(crate) fn decode_value(data: &mut Bytes, num_columns: usize) -> Result<Value> {
    if data.len() < 4 {
        return Err(Error::IoError(format!(
            "Value data too small: expected at least 4 bytes for expired_at, got {}",
            data.len()
        )));
    }
    let expired_at = data.get_u32_le();
    let bmp_size = bitmap_size(num_columns);
    if data.len() < bmp_size {
        return Err(Error::IoError(format!(
            "Value data too small: expected at least {} bytes for bitmap, got {}",
            bmp_size,
            data.len()
        )));
    }
    let bitmap = data.split_to(bmp_size);
    let bitmap = bitmap.as_ref();

    // A single-column schema has no bitmap; its column is always treated as present.
    // Computed on demand instead of materializing a per-decode Vec<bool>.
    let is_present = |i: usize| num_columns == 1 || (bitmap[i / 8] >> (i % 8)) & 1 == 1;
    // The last present column is stored without a length prefix and owns the remaining bytes.
    let last_present_idx = (0..num_columns).rev().find(|&i| is_present(i));

    let mut columns = Vec::with_capacity(num_columns);
    for i in 0..num_columns {
        if !is_present(i) {
            columns.push(None);
            continue;
        }
        if Some(i) == last_present_idx {
            if data.remaining() < 1 {
                return Err(Error::IoError(format!(
                    "Column {} data corrupted: not enough bytes for value_type",
                    i
                )));
            }
            let value_type = decode_value_type(data.get_u8())?;
            let col_data = data.split_to(data.len());
            columns.push(Some(Column::new(value_type, col_data)));
        } else {
            // 1 tag byte + 4 length bytes.
            if data.remaining() < 5 {
                return Err(Error::IoError(format!(
                    "Column {} data corrupted: not enough bytes",
                    i
                )));
            }
            let value_type = decode_value_type(data.get_u8())?;
            let data_len = data.get_u32_le() as usize;
            if data.remaining() < data_len {
                return Err(Error::IoError(format!(
                    "Column {} data corrupted: expected {} bytes, got {}",
                    i,
                    data_len,
                    data.remaining()
                )));
            }
            let col_data = data.split_to(data_len);
            columns.push(Some(Column::new(value_type, col_data)));
        }
    }

    let expired_at = (expired_at != 0).then_some(expired_at);
    Ok(Value::new_with_expired_at(columns, expired_at))
}
/// Decodes a value like [`decode_value`], but only materializes the columns selected by
/// `decode_mask`. Unselected present columns are skipped without copying and returned as
/// `None`.
///
/// * `decode_mask`: bit `i % 8` of byte `i / 8` selects column `i`. Must be at least
///   `max(bitmap_size(num_columns), 1)` bytes long.
/// * `terminal_mask`: if provided, bit `i % 8` of byte `i / 8` is OR-ed in for every present
///   column whose value type is terminal, whether or not `decode_mask` selects it. Bits are only
///   ever set, never cleared. Positions beyond the slice's length are silently not recorded.
///
/// # Errors
/// Returns `Error::IoError` if `decode_mask` is too short or `data` is truncated. Propagates the
/// decode error for an invalid column tag.
pub(crate) fn decode_value_masked(
    data: &mut Bytes,
    num_columns: usize,
    decode_mask: &[u8],
    mut terminal_mask: Option<&mut [u8]>,
) -> Result<Value> {
    if data.len() < 4 {
        return Err(Error::IoError(format!(
            "Value data too small: expected at least 4 bytes for expired_at, got {}",
            data.len()
        )));
    }
    let expired_at = data.get_u32_le();
    let mask_size = bitmap_size(num_columns).max(1);
    if decode_mask.len() < mask_size {
        return Err(Error::IoError(format!(
            "decode_mask length {} is less than required {}",
            decode_mask.len(),
            mask_size
        )));
    }
    let bmp_size = bitmap_size(num_columns);
    if data.len() < bmp_size {
        return Err(Error::IoError(format!(
            "Value data too small: expected at least {} bytes for bitmap, got {}",
            bmp_size,
            data.len()
        )));
    }
    let bitmap = data.split_to(bmp_size);
    let bitmap = bitmap.as_ref();

    // The last present column is stored without a length prefix. Find it up front by scanning
    // the bitmap from the highest byte down.
    let mut last_present_idx = None;
    if num_columns == 1 {
        last_present_idx = Some(0);
    } else if bmp_size > 0 {
        // Meaningful bits in the final bitmap byte, in 1..=8; higher bits are padding.
        let last_byte_bits = (num_columns - 1) % 8 + 1;
        // Build the mask with a right shift. `(1u8 << last_byte_bits) - 1` overflows when
        // `last_byte_bits == 8` (num_columns a multiple of 8): debug builds panic, and release
        // builds get a zero mask that hides every column in the last bitmap byte.
        let last_byte_mask = u8::MAX >> (8 - last_byte_bits);
        for byte_idx in (0..bmp_size).rev() {
            let mut byte = bitmap[byte_idx];
            if byte_idx == bmp_size - 1 {
                byte &= last_byte_mask;
            }
            if byte == 0 {
                continue;
            }
            // Position of the highest set bit within this byte.
            let bit = 7 - byte.leading_zeros() as usize;
            last_present_idx = Some(byte_idx * 8 + bit);
            break;
        }
    }

    let mut columns = Vec::with_capacity(num_columns);
    for i in 0..num_columns {
        // A single-column schema has no bitmap; its column is always treated as present.
        let is_present = num_columns == 1 || (bitmap[i / 8] >> (i % 8)) & 1 == 1;
        if !is_present {
            columns.push(None);
            continue;
        }
        let is_last = Some(i) == last_present_idx;
        let is_selected = decode_mask[i / 8] & (1 << (i % 8)) != 0;
        if data.remaining() < 1 {
            return Err(Error::IoError(format!(
                "Column {} data corrupted: not enough bytes for value_type",
                i
            )));
        }
        let value_type = decode_value_type(data.get_u8())?;
        // Terminal columns are reported even when they are not selected for decoding.
        if let Some(ref mut mask) = terminal_mask
            && value_type.is_terminal()
            && let Some(byte) = mask.get_mut(i / 8)
        {
            *byte |= 1 << (i % 8);
        }
        // The last present column owns everything that remains; others carry a u32 length.
        let data_len = if is_last {
            data.len()
        } else {
            if data.remaining() < 4 {
                return Err(Error::IoError(format!(
                    "Column {} data corrupted: not enough bytes for length",
                    i
                )));
            }
            let data_len = data.get_u32_le() as usize;
            if data.remaining() < data_len {
                return Err(Error::IoError(format!(
                    "Column {} data corrupted: expected {} bytes, got {}",
                    i,
                    data_len,
                    data.remaining()
                )));
            }
            data_len
        };
        if is_selected {
            let col_data = data.split_to(data_len);
            columns.push(Some(Column::new(value_type, col_data)));
        } else {
            data.advance(data_len);
            columns.push(None);
        }
    }

    let expired_at = (expired_at != 0).then_some(expired_at);
    Ok(Value::new_with_expired_at(columns, expired_at))
}
/// Number of bytes [`encode_value`] produces for `value` under a `num_columns` schema.
pub(crate) fn value_encoded_size(value: &Value, num_columns: usize) -> usize {
    let columns = value.columns();
    value_encoded_size_columns(columns, num_columns)
}
/// Exact encoded length of `columns` (first `num_columns` entries) in the row value layout.
///
/// The total is the 4-byte expiry, plus the bitmap, plus one tag byte and the payload per present
/// column, plus a 4-byte length prefix for every present column except the last.
fn value_encoded_size_columns<C: ColumnRef>(columns: &[Option<C>], num_columns: usize) -> usize {
    let (present, payload_bytes) = columns
        .iter()
        .take(num_columns)
        .flatten()
        .fold((0usize, 0usize), |(count, bytes), col| {
            (count + 1, bytes + col.data().len())
        });
    let length_prefixes = present.saturating_sub(1) * 4;
    4 + bitmap_size(num_columns) + present + length_prefixes + payload_bytes
}
// Unit tests for the key/value row codec, plus round-trip tests that write encoded rows into a
// real SST file on the local filesystem and read them back through `SSTIterator`.
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_encode_decode_key() {
let key = Key::new(42, b"hello world".to_vec());
let encoded = encode_key(&key);
// 2-byte bucket prefix + 11 key bytes.
assert_eq!(encoded.len(), 13);
assert_eq!(key_encoded_size(&key), 13);
let mut encoded_for_decode = encoded.clone();
let decoded = decode_key(&mut encoded_for_decode).unwrap();
assert_eq!(decoded.bucket(), 42);
assert_eq!(decoded.data().as_ref(), b"hello world");
}
#[test]
fn test_key_empty_data() {
let key = Key::new(0, Vec::new());
let encoded = encode_key(&key);
// Only the 2-byte bucket prefix.
assert_eq!(encoded.len(), 2);
let mut encoded_for_decode = encoded.clone();
let decoded = decode_key(&mut encoded_for_decode).unwrap();
assert_eq!(decoded.bucket(), 0);
assert_eq!(decoded.data().as_ref(), b"");
}
#[test]
fn test_key_max_group() {
let key = Key::new(u16::MAX, b"test".to_vec());
let encoded = encode_key(&key);
let mut encoded_for_decode = encoded.clone();
let decoded = decode_key(&mut encoded_for_decode).unwrap();
assert_eq!(decoded.bucket(), u16::MAX);
assert_eq!(decoded.data().as_ref(), b"test");
}
#[test]
fn test_key_decode_too_small() {
// One byte cannot hold the 2-byte bucket prefix.
let mut encoded = Bytes::from_static(&[0]);
let result = decode_key(&mut encoded);
assert!(result.is_err());
}
#[test]
fn test_value_type_encode_decode() {
assert_eq!(encode_value_type(&ValueType::Put), 0b0000_0001);
assert_eq!(encode_value_type(&ValueType::Delete), 0b0001_0001);
assert_eq!(encode_value_type(&ValueType::Merge), 0b0000_0010);
assert_eq!(encode_value_type(&ValueType::PutSeparated), 0b0000_0101);
assert_eq!(encode_value_type(&ValueType::MergeSeparated), 0b0000_0110);
assert_eq!(
encode_value_type(&ValueType::MergeSeparatedArray),
0b0000_1110
);
assert_eq!(
encode_value_type(&ValueType::PutSeparatedArray),
0b0000_1111
);
assert!(matches!(
decode_value_type(0b0000_0001).unwrap(),
ValueType::Put
));
assert!(matches!(
decode_value_type(0b0001_0001).unwrap(),
ValueType::Delete
));
assert!(matches!(
decode_value_type(0b0000_0010).unwrap(),
ValueType::Merge
));
assert!(matches!(
decode_value_type(0b0000_0101).unwrap(),
ValueType::PutSeparated
));
assert!(matches!(
decode_value_type(0b0000_0110).unwrap(),
ValueType::MergeSeparated
));
assert!(matches!(
decode_value_type(0b0000_1110).unwrap(),
ValueType::MergeSeparatedArray
));
assert!(matches!(
decode_value_type(0b0000_1111).unwrap(),
ValueType::PutSeparatedArray
));
}
#[test]
fn test_value_type_decode_invalid() {
assert!(decode_value_type(0).is_err());
assert!(decode_value_type(0b0000_0011).is_err());
assert!(decode_value_type(255).is_err());
}
#[test]
fn test_encode_decode_value_all_present() {
let col1 = Column::new(ValueType::Put, b"data1".to_vec());
let col2 = Column::new(ValueType::Delete, b"data2".to_vec());
let value = Value::new(vec![Some(col1), Some(col2)]);
let encoded = encode_value(&value, 2);
// 4 (expired_at) + 1 (bitmap) + [1 tag + 4 len + 5 data] + [1 tag + 5 data, last column
// has no length prefix] = 21.
assert_eq!(encoded.len(), 21);
assert_eq!(value_encoded_size(&value, 2), 21);
let mut encoded_for_decode = encoded.clone();
let decoded = decode_value(&mut encoded_for_decode, 2).unwrap();
let cols = decoded.columns();
assert_eq!(cols.len(), 2);
assert!(cols[0].is_some());
let c0 = cols[0].as_ref().unwrap();
assert!(matches!(c0.value_type(), ValueType::Put));
assert_eq!(c0.data().as_ref(), b"data1");
assert!(cols[1].is_some());
let c1 = cols[1].as_ref().unwrap();
assert!(matches!(c1.value_type(), ValueType::Delete));
assert_eq!(c1.data().as_ref(), b"data2");
}
#[test]
fn test_encode_decode_value_with_optional() {
let col1 = Column::new(ValueType::Put, b"present".to_vec());
let value = Value::new(vec![Some(col1), None, None]);
let encoded = encode_value(&value, 3);
// 4 (expired_at) + 1 (bitmap) + 1 tag + 7 data; the only present column is also the last,
// so it has no length prefix.
assert_eq!(encoded.len(), 13);
let mut encoded_for_decode = encoded.clone();
let decoded = decode_value(&mut encoded_for_decode, 3).unwrap();
let cols = decoded.columns();
assert_eq!(cols.len(), 3);
assert!(cols[0].is_some());
assert_eq!(cols[0].as_ref().unwrap().data().as_ref(), b"present");
assert!(cols[1].is_none());
assert!(cols[2].is_none());
}
#[test]
fn test_encode_decode_value_all_absent() {
let value = Value::new(vec![None, None, None, None]);
let encoded = encode_value(&value, 4);
// 4 (expired_at) + 1 (bitmap); no column payloads.
assert_eq!(encoded.len(), 5);
let mut encoded_for_decode = encoded.clone();
let decoded = decode_value(&mut encoded_for_decode, 4).unwrap();
let cols = decoded.columns();
assert_eq!(cols.len(), 4);
assert!(cols.iter().all(|c| c.is_none()));
}
#[test]
fn test_encode_decode_value_many_columns() {
let col = Column::new(ValueType::Merge, b"x".to_vec());
let mut columns: Vec<Option<Column>> = vec![None; 16];
columns[0] = Some(col.clone());
columns[8] = Some(col.clone());
columns[15] = Some(col);
let value = Value::new(columns);
let encoded = encode_value(&value, 16);
// 4 (expired_at) + 2 (bitmap) + [1+4+1] (col 0) + [1+4+1] (col 8) + [1+1] (col 15, last) = 20.
assert_eq!(encoded.len(), 20);
let mut encoded_for_decode = encoded.clone();
let decoded = decode_value(&mut encoded_for_decode, 16).unwrap();
let cols = decoded.columns();
assert_eq!(cols.len(), 16);
assert!(cols[0].is_some());
assert!(cols[1].is_none());
assert!(cols[8].is_some());
assert!(cols[15].is_some());
}
#[test]
fn test_bitmap_size() {
assert_eq!(bitmap_size(0), 0);
// A single column carries no bitmap; from two columns on, one bit per column rounded up.
assert_eq!(bitmap_size(1), 0); assert_eq!(bitmap_size(2), 1);
assert_eq!(bitmap_size(8), 1);
assert_eq!(bitmap_size(9), 2);
assert_eq!(bitmap_size(16), 2);
assert_eq!(bitmap_size(17), 3);
}
#[test]
fn test_value_decode_too_small() {
// Not even the 4-byte expired_at header is present.
let mut encoded = Bytes::new();
let result = decode_value(&mut encoded, 2);
assert!(result.is_err());
}
#[test]
fn test_single_column_no_bitmap() {
let col = Column::new(ValueType::Put, b"single".to_vec());
let value = Value::new(vec![Some(col)]);
let encoded = encode_value(&value, 1);
// 4 (expired_at) + no bitmap + 1 tag + 6 data (no length prefix).
assert_eq!(encoded.len(), 11);
assert_eq!(value_encoded_size(&value, 1), 11);
let mut encoded_for_decode = encoded.clone();
let decoded = decode_value(&mut encoded_for_decode, 1).unwrap();
let cols = decoded.columns();
assert_eq!(cols.len(), 1);
assert!(cols[0].is_some());
assert_eq!(cols[0].as_ref().unwrap().data().as_ref(), b"single");
}
#[test]
fn test_large_data() {
let large_data: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();
let key = Key::new(1234, large_data.clone());
let encoded_key = encode_key(&key);
let mut encoded_key_for_decode = encoded_key.clone();
let decoded_key = decode_key(&mut encoded_key_for_decode).unwrap();
assert_eq!(decoded_key.bucket(), 1234);
assert_eq!(decoded_key.data().as_ref(), large_data.as_slice());
let col = Column::new(ValueType::Put, large_data.clone());
let value = Value::new(vec![Some(col)]);
let encoded = encode_value(&value, 1);
let mut encoded_for_decode = encoded.clone();
let decoded = decode_value(&mut encoded_for_decode, 1).unwrap();
let cols = decoded.columns();
assert!(cols[0].is_some());
assert_eq!(
cols[0].as_ref().unwrap().data().as_ref(),
large_data.as_slice()
);
}
#[test]
fn test_binary_data_with_nulls() {
// Key bytes are stored verbatim, so embedded NUL and 0xFF bytes must round-trip.
let binary_data = vec![0u8, 1, 0, 255, 0, 128, 0];
let key = Key::new(100, binary_data.clone());
let encoded = encode_key(&key);
let mut encoded_for_decode = encoded.clone();
let decoded = decode_key(&mut encoded_for_decode).unwrap();
assert_eq!(decoded.data().as_ref(), binary_data.as_slice());
}
// Writes three encoded rows (all present / partially present / all absent) to an SST and
// iterates them back, checking every decoded field.
#[test]
#[serial_test::serial(file)]
fn test_sst_key_value_codec() {
use crate::file::FileSystemRegistry;
use crate::sst::iterator::{SSTIterator, SSTIteratorOptions};
use crate::sst::writer::{SSTWriter, SSTWriterOptions};
let _ = std::fs::remove_dir_all("/tmp/sst_row_codec_test");
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register("file:///tmp/sst_row_codec_test".to_string())
.unwrap();
let num_columns = 2;
let key1 = Key::new(1, b"user:1".to_vec());
let value1 = Value::new(vec![
Some(Column::new(ValueType::Put, b"Alice".to_vec())),
Some(Column::new(ValueType::Put, b"alice@example.com".to_vec())),
]);
let key2 = Key::new(1, b"user:2".to_vec());
let value2 = Value::new(vec![
Some(Column::new(ValueType::Put, b"Bob".to_vec())),
None,
]);
let key3 = Key::new(2, b"order:100".to_vec());
let value3 = Value::new(vec![None, None]);
{
let writer_file = fs.open_write("codec_test.sst").unwrap();
let mut writer = SSTWriter::new(
writer_file,
SSTWriterOptions {
bloom_filter_enabled: true,
..SSTWriterOptions::default()
},
);
writer
.add(&encode_key(&key1), &encode_value(&value1, num_columns))
.unwrap();
writer
.add(&encode_key(&key2), &encode_value(&value2, num_columns))
.unwrap();
writer
.add(&encode_key(&key3), &encode_value(&value3, num_columns))
.unwrap();
writer.finish().unwrap();
}
{
let reader_file = fs.open_read("codec_test.sst").unwrap();
let mut iter = SSTIterator::with_cache(
reader_file,
0,
SSTIteratorOptions {
bloom_filter_enabled: true,
..SSTIteratorOptions::default()
},
None,
None,
)
.unwrap();
iter.seek_to_first().unwrap();
assert!(iter.valid());
let (mut key_bytes, mut value_bytes) = iter.current().unwrap().unwrap();
let decoded_key = decode_key(&mut key_bytes).unwrap();
let decoded_value = decode_value(&mut value_bytes, num_columns).unwrap();
let decoded_cols = decoded_value.columns();
assert_eq!(decoded_key.bucket(), 1);
assert_eq!(decoded_key.data().as_ref(), b"user:1");
assert_eq!(decoded_cols.len(), 2);
assert!(decoded_cols[0].is_some());
assert_eq!(decoded_cols[0].as_ref().unwrap().data().as_ref(), b"Alice");
assert!(decoded_cols[1].is_some());
assert_eq!(
decoded_cols[1].as_ref().unwrap().data().as_ref(),
b"alice@example.com"
);
iter.next().unwrap();
assert!(iter.valid());
let (mut key_bytes, mut value_bytes) = iter.current().unwrap().unwrap();
let decoded_key = decode_key(&mut key_bytes).unwrap();
let decoded_value = decode_value(&mut value_bytes, num_columns).unwrap();
let decoded_cols = decoded_value.columns();
assert_eq!(decoded_key.bucket(), 1);
assert_eq!(decoded_key.data().as_ref(), b"user:2");
assert!(decoded_cols[0].is_some());
assert_eq!(decoded_cols[0].as_ref().unwrap().data().as_ref(), b"Bob");
assert!(decoded_cols[1].is_none());
iter.next().unwrap();
assert!(iter.valid());
let (mut key_bytes, mut value_bytes) = iter.current().unwrap().unwrap();
let decoded_key = decode_key(&mut key_bytes).unwrap();
let decoded_value = decode_value(&mut value_bytes, num_columns).unwrap();
let decoded_cols = decoded_value.columns();
assert_eq!(decoded_key.bucket(), 2);
assert_eq!(decoded_key.data().as_ref(), b"order:100");
assert!(decoded_cols[0].is_none());
assert!(decoded_cols[1].is_none());
iter.next().unwrap();
assert!(!iter.valid());
}
let _ = std::fs::remove_dir_all("/tmp/sst_row_codec_test");
}
// Checks that seeking with an encoded key lands on the matching row, i.e. encoded keys are
// usable as SST seek targets.
#[test]
#[serial_test::serial(file)]
fn test_sst_key_value_codec_seek() {
use crate::file::FileSystemRegistry;
use crate::sst::iterator::{SSTIterator, SSTIteratorOptions};
use crate::sst::writer::{SSTWriter, SSTWriterOptions};
let _ = std::fs::remove_dir_all("/tmp/sst_row_codec_seek_test");
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register("file:///tmp/sst_row_codec_seek_test".to_string())
.unwrap();
let num_columns = 1;
let key1 = Key::new(1, b"aaa".to_vec());
let key2 = Key::new(1, b"bbb".to_vec());
let key3 = Key::new(2, b"aaa".to_vec());
let value = Value::new(vec![Some(Column::new(ValueType::Put, b"test".to_vec()))]);
{
let writer_file = fs.open_write("codec_seek_test.sst").unwrap();
let mut writer = SSTWriter::new(
writer_file,
SSTWriterOptions {
bloom_filter_enabled: true,
..SSTWriterOptions::default()
},
);
writer
.add(&encode_key(&key1), &encode_value(&value, num_columns))
.unwrap();
writer
.add(&encode_key(&key2), &encode_value(&value, num_columns))
.unwrap();
writer
.add(&encode_key(&key3), &encode_value(&value, num_columns))
.unwrap();
writer.finish().unwrap();
}
{
let reader_file = fs.open_read("codec_seek_test.sst").unwrap();
let mut iter = SSTIterator::with_cache(
reader_file,
0,
SSTIteratorOptions {
bloom_filter_enabled: true,
..SSTIteratorOptions::default()
},
None,
None,
)
.unwrap();
let seek_key = Key::new(1, b"bbb".to_vec());
iter.seek(&encode_key(&seek_key)).unwrap();
assert!(iter.valid());
let (mut key_bytes, _) = iter.current().unwrap().unwrap();
let decoded_key = decode_key(&mut key_bytes).unwrap();
assert_eq!(decoded_key.bucket(), 1);
assert_eq!(decoded_key.data().as_ref(), b"bbb");
}
let _ = std::fs::remove_dir_all("/tmp/sst_row_codec_seek_test");
}
// Uses a tiny block_size so the 50 rows span multiple SST blocks, then verifies every row
// decodes in order across block boundaries.
#[test]
#[serial_test::serial(file)]
fn test_sst_key_value_codec_multiple_blocks() {
use crate::file::FileSystemRegistry;
use crate::sst::iterator::{SSTIterator, SSTIteratorOptions};
use crate::sst::writer::{SSTWriter, SSTWriterOptions};
let _ = std::fs::remove_dir_all("/tmp/sst_row_codec_blocks_test");
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register("file:///tmp/sst_row_codec_blocks_test".to_string())
.unwrap();
let num_columns = 2;
let num_entries = 50;
{
let writer_file = fs.open_write("codec_blocks_test.sst").unwrap();
let mut writer = SSTWriter::new(
writer_file,
SSTWriterOptions {
metrics: None,
block_size: 200, buffer_size: 8192,
num_columns,
bloom_filter_enabled: true,
bloom_bits_per_key: 10,
partitioned_index: false,
compression: crate::SstCompressionAlgorithm::None,
},
);
for i in 0..num_entries {
let key = Key::new(i as u16, format!("key{:04}", i).into_bytes());
let col1 = Column::new(ValueType::Put, format!("val{:04}", i).into_bytes());
let col2 = Column::new(ValueType::Merge, b"extra".to_vec());
// Alternate between both columns present and only the first column present.
let value = if i % 2 == 0 {
Value::new(vec![Some(col1), Some(col2)])
} else {
Value::new(vec![Some(col1), None])
};
writer
.add(&encode_key(&key), &encode_value(&value, num_columns))
.unwrap();
}
writer.finish().unwrap();
}
{
let reader_file = fs.open_read("codec_blocks_test.sst").unwrap();
let mut iter = SSTIterator::with_cache(
reader_file,
0,
SSTIteratorOptions {
bloom_filter_enabled: true,
..SSTIteratorOptions::default()
},
None,
None,
)
.unwrap();
iter.seek_to_first().unwrap();
let mut count = 0;
while iter.valid() {
let (mut key_bytes, mut value_bytes) = iter.current().unwrap().unwrap();
let decoded_key = decode_key(&mut key_bytes).unwrap();
let decoded_value = decode_value(&mut value_bytes, num_columns).unwrap();
let decoded_cols = decoded_value.columns();
assert_eq!(decoded_key.bucket(), count as u16);
assert_eq!(
decoded_key.data().as_ref(),
format!("key{:04}", count).as_bytes()
);
assert!(decoded_cols[0].is_some());
assert_eq!(
decoded_cols[0].as_ref().unwrap().data().as_ref(),
format!("val{:04}", count).as_bytes()
);
if count % 2 == 0 {
assert!(decoded_cols[1].is_some());
} else {
assert!(decoded_cols[1].is_none());
}
count += 1;
iter.next().unwrap();
}
assert_eq!(count, num_entries);
}
let _ = std::fs::remove_dir_all("/tmp/sst_row_codec_blocks_test");
}
}