use bytes::{BufMut, Bytes, BytesMut};
use std::io;
use crate::row::RowDecoder;
use crate::row::compacted::CompactedRow;
use crate::util::varint::{
read_unsigned_varint_bytes, size_of_unsigned_varint, write_unsigned_varint_buf,
};
/// Number of bytes occupied by the record length prefix, which is written and
/// read as a little-endian `i32`.
pub const LENGTH_LENGTH: usize = 4;
/// A key/value record in the length-prefixed binary layout produced by
/// `KvRecord::write_to_buf` and parsed by `KvRecord::read_from`.
#[derive(Debug, Clone)]
pub struct KvRecord {
// Raw key bytes.
key: Bytes,
// Raw value bytes; `None` marks the record as a deletion (see `is_deletion`).
value_bytes: Option<Bytes>,
// Total encoded size of the record, including the length prefix.
size_in_bytes: usize,
}
impl KvRecord {
    /// Returns the record key.
    pub fn key(&self) -> &Bytes {
        &self.key
    }

    /// Returns the raw value bytes, or `None` when the record has no value.
    #[cfg(test)]
    pub(crate) fn value_bytes(&self) -> Option<&Bytes> {
        self.value_bytes.as_ref()
    }

    /// Decodes the value bytes with `decoder`.
    ///
    /// Returns `None` when the record carries no value (a deletion).
    pub fn row<'a>(&'a self, decoder: &dyn RowDecoder) -> Option<CompactedRow<'a>> {
        self.value_bytes
            .as_ref()
            .map(|bytes| decoder.decode(bytes.as_ref()))
    }

    /// Returns the number of bytes a record with the given key and value
    /// occupies once encoded, including the length prefix.
    ///
    /// The result saturates at `usize::MAX` instead of overflowing.
    pub fn size_of(key: &[u8], value: Option<&[u8]>) -> usize {
        // The helper already saturates, so the final addition must saturate as
        // well or it could overflow on the clamped value.
        Self::size_without_length(key, value).saturating_add(LENGTH_LENGTH)
    }

    /// Size of the record payload: key-length varint + key bytes + value bytes.
    /// The length prefix itself is not included.
    fn size_without_length(key: &[u8], value: Option<&[u8]>) -> usize {
        let key_len = key.len();
        // Clamp instead of truncating: a key longer than `u32::MAX` must never
        // look smaller than it is. Such a record is rejected by `write_to_buf`
        // anyway because its size cannot fit in an `i32`.
        let key_len_size = size_of_unsigned_varint(u32::try_from(key_len).unwrap_or(u32::MAX));
        let value_len = value.map_or(0, |v| v.len());
        key_len_size
            .saturating_add(key_len)
            .saturating_add(value_len)
    }

    /// Appends one encoded record to `buf`.
    ///
    /// Layout: little-endian `i32` payload length, unsigned-varint key length,
    /// key bytes, then the value bytes (nothing when `value` is `None`).
    ///
    /// Returns the total number of bytes written, including the length prefix.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when the payload size does not fit in an `i32`.
    /// The check happens before anything is written to `buf`.
    pub fn write_to_buf(buf: &mut BytesMut, key: &[u8], value: Option<&[u8]>) -> io::Result<usize> {
        let size_in_bytes = Self::size_without_length(key, value);
        let size_i32 = i32::try_from(size_in_bytes).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("Record size {size_in_bytes} exceeds i32::MAX"),
            )
        })?;
        buf.put_i32_le(size_i32);

        // Lossless: `key.len() <= size_in_bytes <= i32::MAX` was verified above.
        let key_len = key.len() as u32;
        write_unsigned_varint_buf(key_len, buf);
        buf.put_slice(key);
        if let Some(v) = value {
            buf.put_slice(v);
        }
        Ok(size_in_bytes + LENGTH_LENGTH)
    }

    /// Parses the record that starts at `position` inside `bytes`.
    ///
    /// Returns the record together with the total number of bytes it occupies
    /// (length prefix included), so the caller can advance to the next record.
    /// The key and value are slices of `bytes`.
    ///
    /// A payload that ends right after the key yields a record without a value.
    /// Consequently a record written with `Some(&[])` is read back exactly like
    /// one written with `None`.
    ///
    /// # Errors
    ///
    /// * `UnexpectedEof` when `bytes` is too short for the length prefix or for
    ///   the announced record size.
    /// * `InvalidData` when the length prefix is negative, the record size
    ///   overflows, or the key length points past the end of the record.
    /// * Any error returned while decoding the key-length varint.
    pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self, usize)> {
        let header_end = position.saturating_add(LENGTH_LENGTH);
        if bytes.len() < header_end {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "Not enough bytes to read record length",
            ));
        }

        let mut length_prefix = [0u8; LENGTH_LENGTH];
        length_prefix.copy_from_slice(&bytes[position..header_end]);
        let size_in_bytes_i32 = i32::from_le_bytes(length_prefix);
        if size_in_bytes_i32 < 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid record length: {size_in_bytes_i32}"),
            ));
        }

        let size_in_bytes = size_in_bytes_i32 as usize;
        let total_size = size_in_bytes.checked_add(LENGTH_LENGTH).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Record size overflow: {size_in_bytes} + {LENGTH_LENGTH}"),
            )
        })?;

        let available = bytes.len().saturating_sub(position);
        if available < total_size {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                format!(
                    "Not enough bytes to read record: expected {total_size}, available {available}"
                ),
            ));
        }

        // Cannot overflow: `position + total_size <= bytes.len()` was just verified.
        let record_end = position + total_size;
        let mut current_offset = header_end;

        let (key_len, varint_size) =
            read_unsigned_varint_bytes(&bytes[current_offset..record_end])?;
        current_offset += varint_size;

        // `key_len` comes from untrusted input, so the end offset is computed
        // with checked arithmetic. A wrapped sum would pass the bounds check and
        // make `Bytes::slice` panic instead of returning an error.
        let key_end = usize::try_from(key_len)
            .ok()
            .and_then(|len| current_offset.checked_add(len))
            .filter(|&end| end <= record_end)
            .ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Key length exceeds record size",
                )
            })?;

        let key = bytes.slice(current_offset..key_end);
        // Whatever follows the key inside the record is the value.
        let value_bytes = (key_end < record_end).then(|| bytes.slice(key_end..record_end));

        Ok((
            Self {
                key,
                value_bytes,
                size_in_bytes: total_size,
            },
            total_size,
        ))
    }

    /// Returns the total encoded size of this record, including the length prefix.
    pub fn get_size_in_bytes(&self) -> usize {
        self.size_in_bytes
    }

    /// Returns `true` when the record carries no value.
    pub fn is_deletion(&self) -> bool {
        self.value_bytes.is_none()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Size computation, plus a write/read round trip for a record with a
    /// value and for a deletion record.
    #[test]
    fn test_kv_record_basic_operations() {
        let key = b"test_key";
        let value = b"test_value";

        let size_with_value = KvRecord::size_of(key, Some(value));
        assert_eq!(
            size_with_value,
            LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len() + value.len()
        );
        let size_without_value = KvRecord::size_of(key, None);
        assert_eq!(
            size_without_value,
            LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len()
        );

        let mut buf = BytesMut::new();
        let written = KvRecord::write_to_buf(&mut buf, key, Some(value)).unwrap();
        let bytes = buf.freeze();
        let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
        assert_eq!(written, read_size);
        assert_eq!(record.key().as_ref(), key);
        assert_eq!(record.value_bytes().unwrap().as_ref(), value);
        assert_eq!(record.get_size_in_bytes(), written);
        assert!(!record.is_deletion());

        let delete_key = b"delete_me";
        let mut buf = BytesMut::new();
        let written = KvRecord::write_to_buf(&mut buf, delete_key, None).unwrap();
        let bytes = buf.freeze();
        let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
        assert_eq!(written, read_size);
        assert_eq!(record.key().as_ref(), delete_key);
        assert!(record.is_deletion());
        assert!(record.value_bytes().is_none());
    }

    /// Several records packed back to back are read sequentially by advancing
    /// the position by each returned size; a large record also round-trips.
    #[test]
    fn test_kv_record_multiple_records() {
        let records = vec![
            (b"key1".as_slice(), Some(b"value1".as_slice())),
            (b"key2".as_slice(), None),
            (b"key3".as_slice(), Some(b"value3".as_slice())),
        ];

        let mut buf = BytesMut::new();
        for (key, value) in &records {
            KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
        }
        let bytes = buf.freeze();

        let mut offset = 0;
        for (expected_key, expected_value) in &records {
            let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
            assert_eq!(record.key().as_ref(), *expected_key);
            match expected_value {
                Some(v) => {
                    assert_eq!(record.value_bytes().unwrap().as_ref(), *v);
                    assert!(!record.is_deletion());
                }
                None => {
                    assert!(record.is_deletion());
                    assert!(record.value_bytes().is_none());
                }
            }
            offset += size;
        }
        assert_eq!(offset, bytes.len());

        let large_key = vec![0u8; 1024];
        let large_value = vec![1u8; 4096];
        let mut buf = BytesMut::new();
        let written = KvRecord::write_to_buf(&mut buf, &large_key, Some(&large_value)).unwrap();
        let bytes = buf.freeze();
        let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
        assert_eq!(written, read_size);
        assert_eq!(record.key().len(), large_key.len());
        assert_eq!(record.value_bytes().unwrap().len(), large_value.len());
    }

    /// Malformed length prefixes are rejected with the matching error kind.
    #[test]
    fn test_invalid_record_lengths() {
        // A negative length prefix is invalid data.
        let mut buf = BytesMut::new();
        buf.put_i32_le(-1);
        buf.put_u8(1);
        buf.put_slice(b"key");
        let bytes = buf.freeze();
        assert_eq!(
            KvRecord::read_from(&bytes, 0).unwrap_err().kind(),
            io::ErrorKind::InvalidData
        );

        // The largest representable length with a nearly empty payload.
        let mut buf = BytesMut::new();
        buf.put_i32_le(i32::MAX);
        buf.put_u8(1);
        let bytes = buf.freeze();
        assert_eq!(
            KvRecord::read_from(&bytes, 0).unwrap_err().kind(),
            io::ErrorKind::UnexpectedEof
        );

        // A length prefix that announces more bytes than the buffer holds.
        let mut buf = BytesMut::new();
        buf.put_i32_le(1_000_000);
        let bytes = buf.freeze();
        assert_eq!(
            KvRecord::read_from(&bytes, 0).unwrap_err().kind(),
            io::ErrorKind::UnexpectedEof
        );
    }

    /// A key length that points past the end of the record is invalid data.
    #[test]
    fn test_key_length_exceeds_record_size() {
        let claimed_key_len: u32 = 5;
        let actual_key = b"k";
        let payload_len = size_of_unsigned_varint(claimed_key_len) + actual_key.len();

        let mut buf = BytesMut::new();
        buf.put_i32_le(i32::try_from(payload_len).unwrap());
        write_unsigned_varint_buf(claimed_key_len, &mut buf);
        buf.put_slice(actual_key);
        let bytes = buf.freeze();

        assert_eq!(
            KvRecord::read_from(&bytes, 0).unwrap_err().kind(),
            io::ErrorKind::InvalidData
        );
    }

    /// Reading at or beyond the end of the buffer reports an EOF error instead
    /// of panicking, even for positions close to `usize::MAX`.
    #[test]
    fn test_read_from_position_out_of_bounds() {
        let key = b"key";
        let value = b"value";
        let mut buf = BytesMut::new();
        KvRecord::write_to_buf(&mut buf, key, Some(value)).unwrap();
        let bytes = buf.freeze();

        for position in [bytes.len(), bytes.len() + 1, usize::MAX] {
            assert_eq!(
                KvRecord::read_from(&bytes, position).unwrap_err().kind(),
                io::ErrorKind::UnexpectedEof
            );
        }
    }
}