#![allow(dead_code)]
use std::ops::{Bound, Range};
use bytes::{BufMut, Bytes, BytesMut};
use common::BytesRange;
use common::serde::key_prefix::{KeyPrefix, RecordTag};
use common::serde::terminated_bytes;
use common::serde::varint::var_u64;
use crate::error::Error;
use crate::model::SegmentId;
use crate::segment::LogSegment;
/// Maps a decoding failure from `common::serde` onto this crate's
/// [`Error::Encoding`] variant, carrying over only its message text.
impl From<common::serde::DeserializeError> for Error {
    fn from(err: common::serde::DeserializeError) -> Self {
        let message = err.message;
        Self::Encoding(message)
    }
}
/// Version byte written as the first byte of every key encoded by this module.
pub const KEY_VERSION: u8 = 0x01;
/// Fixed two-byte key for the `SeqBlock` record.
///
/// NOTE(review): the second byte here is the raw `RecordType::SeqBlock` id
/// (0x02). Keys built through `RecordType::prefix()` instead carry the record
/// *tag* byte, which is 0x20 for `SeqBlock` according to this module's tests.
/// Confirm this difference is intentional before decoding this key with
/// `KeyPrefix`/`RecordType::from_id`. Changing it would alter the stored key.
pub const SEQ_BLOCK_KEY: [u8; 2] = [KEY_VERSION, 0x02];
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordType {
LogEntry = 0x01,
SeqBlock = 0x02,
SegmentMeta = 0x03,
ListingEntry = 0x04,
}
impl RecordType {
pub fn id(&self) -> u8 {
*self as u8
}
pub fn from_id(id: u8) -> Result<Self, Error> {
match id {
0x01 => Ok(RecordType::LogEntry),
0x02 => Ok(RecordType::SeqBlock),
0x03 => Ok(RecordType::SegmentMeta),
0x04 => Ok(RecordType::ListingEntry),
_ => Err(Error::Encoding(format!(
"invalid record type: 0x{:02x}",
id
))),
}
}
pub fn tag(&self) -> RecordTag {
RecordTag::new(self.id(), 0)
}
pub fn prefix(&self) -> KeyPrefix {
KeyPrefix::new(KEY_VERSION, self.tag())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogEntryKey {
pub segment_id: SegmentId,
pub key: Bytes,
pub sequence: u64,
}
impl LogEntryKey {
pub fn new(segment_id: SegmentId, key: Bytes, sequence: u64) -> Self {
Self {
segment_id,
key,
sequence,
}
}
pub fn serialize(&self, segment_start_seq: u64) -> Bytes {
let relative_seq = self.sequence - segment_start_seq;
let mut buf = BytesMut::new();
RecordType::LogEntry.prefix().write_to(&mut buf);
buf.put_u32(self.segment_id);
terminated_bytes::serialize(&self.key, &mut buf);
var_u64::serialize(relative_seq, &mut buf);
buf.freeze()
}
pub fn deserialize(data: &[u8], segment_start_seq: u64) -> Result<Self, Error> {
let prefix = KeyPrefix::from_bytes_versioned(data, KEY_VERSION)?;
let record_type = RecordType::from_id(prefix.tag().record_type())?;
if record_type != RecordType::LogEntry {
return Err(Error::Encoding(format!(
"invalid record type: expected LogEntry, got {:?}",
record_type
)));
}
if data.len() < 6 {
return Err(Error::Encoding(
"buffer too short for log entry key".to_string(),
));
}
let segment_id = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
let mut buf = &data[6..];
let key = terminated_bytes::deserialize(&mut buf)?;
let relative_seq = var_u64::deserialize(&mut buf)?;
let sequence = segment_start_seq + relative_seq;
Ok(LogEntryKey {
segment_id,
key,
sequence,
})
}
pub fn scan_range(segment: &LogSegment, key: &[u8], seq_range: Range<u64>) -> BytesRange {
let start_key = Self::build_scan_key(segment, key, seq_range.start);
let end_key = Self::build_scan_key(segment, key, seq_range.end);
BytesRange::new(Bound::Included(start_key), Bound::Excluded(end_key))
}
fn build_scan_key(segment: &LogSegment, key: &[u8], seq: u64) -> Bytes {
let relative_seq = seq.saturating_sub(segment.meta().start_seq);
let mut buf = BytesMut::new();
RecordType::LogEntry.prefix().write_to(&mut buf);
buf.put_u32(segment.id());
terminated_bytes::serialize(key, &mut buf);
var_u64::serialize(relative_seq, &mut buf);
buf.freeze()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentMetaKey {
pub segment_id: SegmentId,
}
impl SegmentMetaKey {
pub fn new(segment_id: SegmentId) -> Self {
Self { segment_id }
}
pub fn serialize(&self) -> Bytes {
let mut buf = BytesMut::with_capacity(6);
RecordType::SegmentMeta.prefix().write_to(&mut buf);
buf.put_u32(self.segment_id);
buf.freeze()
}
pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
let prefix = KeyPrefix::from_bytes_versioned(data, KEY_VERSION)?;
let record_type = RecordType::from_id(prefix.tag().record_type())?;
if record_type != RecordType::SegmentMeta {
return Err(Error::Encoding(format!(
"invalid record type: expected SegmentMeta, got {:?}",
record_type
)));
}
if data.len() < 6 {
return Err(Error::Encoding(
"buffer too short for SegmentMeta key".to_string(),
));
}
let segment_id = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
Ok(SegmentMetaKey { segment_id })
}
pub fn scan_range(range: Range<SegmentId>) -> BytesRange {
let start = Bound::Included(SegmentMetaKey::new(range.start).serialize());
let end = Bound::Excluded(SegmentMetaKey::new(range.end).serialize());
BytesRange::new(start, end)
}
}
/// Value stored for a segment's metadata record.
///
/// Encoded as 16 bytes: `start_seq` as a big-endian `u64`, followed by
/// `start_time_ms` as a big-endian `i64`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentMeta {
    /// Sequence number at which the segment starts. Log entry keys store
    /// sequences relative to this value.
    pub start_seq: u64,
    /// Segment start time in milliseconds (clock/epoch not established here).
    pub start_time_ms: i64,
}

impl SegmentMeta {
    /// Creates segment metadata from its start sequence and start time.
    pub fn new(start_seq: u64, start_time_ms: i64) -> Self {
        SegmentMeta {
            start_seq,
            start_time_ms,
        }
    }

    /// Encodes the metadata into its fixed 16-byte form.
    pub fn serialize(&self) -> Bytes {
        let mut out = BytesMut::with_capacity(16);
        out.put_u64(self.start_seq);
        out.put_i64(self.start_time_ms);
        out.freeze()
    }

    /// Decodes metadata produced by [`SegmentMeta::serialize`].
    ///
    /// # Errors
    ///
    /// Returns [`Error::Encoding`] when `data` is shorter than 16 bytes. Bytes
    /// beyond the first 16 are ignored.
    pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
        let len = data.len();
        if len < 16 {
            return Err(Error::Encoding(format!(
                "buffer too short for SegmentMeta value: need 16 bytes, got {}",
                len
            )));
        }
        let mut seq_be = [0u8; 8];
        let mut time_be = [0u8; 8];
        seq_be.copy_from_slice(&data[..8]);
        time_be.copy_from_slice(&data[8..16]);
        Ok(SegmentMeta {
            start_seq: u64::from_be_bytes(seq_be),
            start_time_ms: i64::from_be_bytes(time_be),
        })
    }
}
/// Key of a listing entry record for user `key` within segment `segment_id`.
///
/// Encoded as the `ListingEntry` prefix (2 bytes), the segment id as a
/// big-endian `u32`, then the raw key bytes. The key is not terminated; it runs
/// to the end of the buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ListingEntryKey {
    /// Segment this listing entry belongs to.
    pub segment_id: SegmentId,
    /// User key being listed.
    pub key: Bytes,
}

impl ListingEntryKey {
    /// Creates a listing entry key.
    pub fn new(segment_id: SegmentId, key: Bytes) -> Self {
        ListingEntryKey { segment_id, key }
    }

    /// Encodes the key as prefix, segment id and raw key bytes.
    pub fn serialize(&self) -> Bytes {
        let mut out = BytesMut::with_capacity(6 + self.key.len());
        RecordType::ListingEntry.prefix().write_to(&mut out);
        out.put_u32(self.segment_id);
        out.put_slice(&self.key);
        out.freeze()
    }

    /// Decodes a key produced by [`ListingEntryKey::serialize`].
    ///
    /// # Errors
    ///
    /// Fails when the prefix has the wrong version or record type, or when the
    /// buffer is shorter than the 6-byte header.
    pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
        let prefix = KeyPrefix::from_bytes_versioned(data, KEY_VERSION)?;
        match RecordType::from_id(prefix.tag().record_type())? {
            RecordType::ListingEntry => {}
            other => {
                return Err(Error::Encoding(format!(
                    "invalid record type: expected ListingEntry, got {:?}",
                    other
                )));
            }
        }
        if data.len() < 6 {
            return Err(Error::Encoding(
                "buffer too short for listing entry key".to_string(),
            ));
        }
        let (header, key_bytes) = data.split_at(6);
        let segment_id = u32::from_be_bytes([header[2], header[3], header[4], header[5]]);
        Ok(Self {
            segment_id,
            key: Bytes::copy_from_slice(key_bytes),
        })
    }

    /// Returns the key range covering all listing entries of the segments in
    /// `range` (start inclusive, end exclusive).
    pub fn scan_range(range: Range<SegmentId>) -> BytesRange {
        // The bare prefix of a segment sorts before every entry of that segment.
        // Since the id is fixed-width big-endian, it also sorts after every entry
        // of lower-numbered segments. Both bounds can therefore be bare prefixes.
        let lower = Self::segment_prefix(range.start);
        let upper = Self::segment_prefix(range.end);
        BytesRange::new(Bound::Included(lower), Bound::Excluded(upper))
    }

    /// Encodes the `ListingEntry` prefix followed by `segment_id`, with no key.
    fn segment_prefix(segment_id: SegmentId) -> Bytes {
        let mut out = BytesMut::with_capacity(6);
        RecordType::ListingEntry.prefix().write_to(&mut out);
        out.put_u32(segment_id);
        out.freeze()
    }
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ListingEntryValue;
impl ListingEntryValue {
pub fn new() -> Self {
Self
}
pub fn serialize(&self) -> Bytes {
Bytes::new()
}
pub fn deserialize(data: &[u8]) -> Result<Self, Error> {
if !data.is_empty() {
return Err(Error::Encoding(format!(
"listing entry value should be empty, got {} bytes",
data.len()
)));
}
Ok(ListingEntryValue)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::ops::RangeBounds;

    #[test]
    fn should_convert_record_type_to_id_and_back() {
        let log_entry = RecordType::LogEntry;
        let seq_block = RecordType::SeqBlock;
        let segment_meta = RecordType::SegmentMeta;
        let listing_entry = RecordType::ListingEntry;
        assert_eq!(log_entry.id(), 0x01);
        assert_eq!(seq_block.id(), 0x02);
        assert_eq!(segment_meta.id(), 0x03);
        assert_eq!(listing_entry.id(), 0x04);
        assert_eq!(RecordType::from_id(0x01).unwrap(), RecordType::LogEntry);
        assert_eq!(RecordType::from_id(0x02).unwrap(), RecordType::SeqBlock);
        assert_eq!(RecordType::from_id(0x03).unwrap(), RecordType::SegmentMeta);
        assert_eq!(RecordType::from_id(0x04).unwrap(), RecordType::ListingEntry);
    }

    #[test]
    fn should_reject_invalid_record_type() {
        let invalid_byte = 0x99;
        let result = RecordType::from_id(invalid_byte);
        assert!(result.is_err());
    }

    #[test]
    fn should_serialize_and_deserialize_log_entry_key() {
        let segment_start_seq = 10000;
        let key = LogEntryKey::new(42, Bytes::from("test_key"), 12345);
        let serialized = key.serialize(segment_start_seq);
        let deserialized = LogEntryKey::deserialize(&serialized, segment_start_seq).unwrap();
        assert_eq!(deserialized.segment_id, 42);
        assert_eq!(deserialized.key, Bytes::from("test_key"));
        assert_eq!(deserialized.sequence, 12345);
    }

    #[test]
    fn should_serialize_log_entry_key_with_correct_structure() {
        let segment_start_seq = 0;
        let key = LogEntryKey::new(1, Bytes::from("k"), 100);
        let serialized = key.serialize(segment_start_seq);
        assert_eq!(serialized.len(), 10);
        assert_eq!(serialized[0], KEY_VERSION);
        assert_eq!(serialized[1], RecordType::LogEntry.tag().as_byte());
        assert_eq!(serialized[1], 0x10);
        assert_eq!(&serialized[2..6], &[0, 0, 0, 1]);
        assert_eq!(serialized[6], b'k');
        // key terminator
        assert_eq!(serialized[7], 0x00);
        // var_u64 encoding of relative sequence 100
        assert_eq!(&serialized[8..10], &[0x10, 0x64]);
    }

    #[test]
    fn should_serialize_relative_sequence() {
        let segment_start_seq = 1000;
        let key = LogEntryKey::new(1, Bytes::from("k"), 1005);
        let serialized = key.serialize(segment_start_seq);
        assert_eq!(serialized.len(), 9);
        assert_eq!(serialized[8], 0x05);
    }

    #[test]
    fn should_order_log_entries_by_segment_then_key_then_sequence() {
        let segment_start_seq = 0;
        let key1 = LogEntryKey::new(0, Bytes::from("a"), 1);
        let key2 = LogEntryKey::new(0, Bytes::from("a"), 2);
        let key3 = LogEntryKey::new(0, Bytes::from("b"), 1);
        let segment1_start_seq = 100;
        let key4 = LogEntryKey::new(1, Bytes::from("a"), 101);
        let s1 = key1.serialize(segment_start_seq);
        let s2 = key2.serialize(segment_start_seq);
        let s3 = key3.serialize(segment_start_seq);
        let s4 = key4.serialize(segment1_start_seq);
        assert!(s1 < s2, "same segment/key, seq 1 < seq 2");
        assert!(s2 < s3, "same segment, key 'a' < key 'b'");
        assert!(s3 < s4, "segment 0 < segment 1");
    }

    #[test]
    fn should_create_record_tag() {
        let log_entry_tag = RecordType::LogEntry.tag();
        let seq_block_tag = RecordType::SeqBlock.tag();
        let segment_meta_tag = RecordType::SegmentMeta.tag();
        let listing_entry_tag = RecordType::ListingEntry.tag();
        assert_eq!(log_entry_tag.as_byte(), 0x10);
        assert_eq!(seq_block_tag.as_byte(), 0x20);
        assert_eq!(segment_meta_tag.as_byte(), 0x30);
        assert_eq!(listing_entry_tag.as_byte(), 0x40);
    }

    #[test]
    fn should_fail_deserialize_log_entry_key_too_short() {
        let data = vec![KEY_VERSION, RecordType::LogEntry.tag().as_byte(), 0, 0, 0];
        let result = LogEntryKey::deserialize(&data, 0);
        assert!(result.is_err());
    }

    #[test]
    fn should_reject_log_entry_key_with_wrong_record_type() {
        let listing = ListingEntryKey::new(1, Bytes::from("k")).serialize();
        let result = LogEntryKey::deserialize(&listing, 0);
        assert!(result.is_err());
    }

    #[test]
    fn should_serialize_and_deserialize_segment_meta_key() {
        let key = SegmentMetaKey::new(42);
        let serialized = key.serialize();
        let deserialized = SegmentMetaKey::deserialize(&serialized).unwrap();
        assert_eq!(deserialized, key);
    }

    #[test]
    fn should_serialize_segment_meta_key_with_correct_structure() {
        let serialized = SegmentMetaKey::new(1).serialize();
        assert_eq!(serialized.len(), 6);
        assert_eq!(serialized[0], KEY_VERSION);
        assert_eq!(serialized[1], RecordType::SegmentMeta.tag().as_byte());
        assert_eq!(serialized[1], 0x30);
        assert_eq!(&serialized[2..6], &[0, 0, 0, 1]);
    }

    #[test]
    fn should_order_segment_meta_keys_by_segment_id() {
        let s1 = SegmentMetaKey::new(1).serialize();
        let s2 = SegmentMetaKey::new(256).serialize();
        assert!(s1 < s2, "segment 1 < segment 256");
    }

    #[test]
    fn should_reject_segment_meta_key_with_wrong_record_type() {
        let listing = ListingEntryKey::new(1, Bytes::new()).serialize();
        assert!(SegmentMetaKey::deserialize(&listing).is_err());
    }

    #[test]
    fn should_fail_deserialize_segment_meta_key_too_short() {
        let data = [KEY_VERSION, RecordType::SegmentMeta.tag().as_byte(), 0, 0, 0];
        assert!(SegmentMetaKey::deserialize(&data).is_err());
    }

    #[test]
    fn should_create_segment_meta_scan_range() {
        let scan_range = SegmentMetaKey::scan_range(1..3);
        let start_key = SegmentMetaKey::new(1).serialize();
        let end_key = SegmentMetaKey::new(3).serialize();
        assert_eq!(scan_range.start_bound(), Bound::Included(&start_key));
        assert_eq!(scan_range.end_bound(), Bound::Excluded(&end_key));
    }

    #[test]
    fn should_serialize_and_deserialize_segment_meta() {
        let meta = SegmentMeta::new(12345, -42);
        let serialized = meta.serialize();
        assert_eq!(serialized.len(), 16);
        let deserialized = SegmentMeta::deserialize(&serialized).unwrap();
        assert_eq!(deserialized, meta);
    }

    #[test]
    fn should_fail_deserialize_segment_meta_too_short() {
        let data = [0u8; 15];
        assert!(SegmentMeta::deserialize(&data).is_err());
    }

    #[test]
    fn should_serialize_and_deserialize_listing_entry_key() {
        let key = ListingEntryKey::new(42, Bytes::from("test_key"));
        let serialized = key.serialize();
        let deserialized = ListingEntryKey::deserialize(&serialized).unwrap();
        assert_eq!(deserialized.segment_id, 42);
        assert_eq!(deserialized.key, Bytes::from("test_key"));
    }

    #[test]
    fn should_serialize_listing_entry_key_with_correct_structure() {
        let key = ListingEntryKey::new(1, Bytes::from("k"));
        let serialized = key.serialize();
        assert_eq!(serialized.len(), 7);
        assert_eq!(serialized[0], KEY_VERSION);
        assert_eq!(serialized[1], RecordType::ListingEntry.tag().as_byte());
        assert_eq!(serialized[1], 0x40);
        assert_eq!(&serialized[2..6], &[0, 0, 0, 1]);
        assert_eq!(serialized[6], b'k');
    }

    #[test]
    fn should_serialize_listing_entry_key_with_empty_key() {
        let key = ListingEntryKey::new(1, Bytes::new());
        let serialized = key.serialize();
        let deserialized = ListingEntryKey::deserialize(&serialized).unwrap();
        // prefix (2) + segment id (4), no key bytes
        assert_eq!(serialized.len(), 6);
        assert_eq!(deserialized.segment_id, 1);
        assert_eq!(deserialized.key, Bytes::new());
    }

    #[test]
    fn should_order_listing_entries_by_segment_then_key() {
        let key1 = ListingEntryKey::new(0, Bytes::from("a"));
        let key2 = ListingEntryKey::new(0, Bytes::from("b"));
        let key3 = ListingEntryKey::new(1, Bytes::from("a"));
        let s1 = key1.serialize();
        let s2 = key2.serialize();
        let s3 = key3.serialize();
        assert!(s1 < s2, "same segment, key 'a' < key 'b'");
        assert!(s2 < s3, "segment 0 < segment 1");
    }

    #[test]
    fn should_create_listing_entry_scan_range() {
        let range = 1..3;
        let scan_range = ListingEntryKey::scan_range(range);
        let start_key = ListingEntryKey::new(1, Bytes::new()).serialize();
        let end_key = ListingEntryKey::new(3, Bytes::new()).serialize();
        assert_eq!(scan_range.start_bound(), Bound::Included(&start_key));
        assert_eq!(scan_range.end_bound(), Bound::Excluded(&end_key));
    }

    #[test]
    fn should_serialize_and_deserialize_listing_entry_value() {
        let value = ListingEntryValue::new();
        let serialized = value.serialize();
        let deserialized = ListingEntryValue::deserialize(&serialized).unwrap();
        assert!(serialized.is_empty());
        assert_eq!(deserialized, ListingEntryValue);
    }

    #[test]
    fn should_fail_deserialize_listing_entry_value_with_data() {
        let data = vec![0x01, 0x02];
        let result = ListingEntryValue::deserialize(&data);
        assert!(result.is_err());
    }

    #[test]
    fn should_fail_deserialize_listing_entry_key_too_short() {
        let data = vec![
            KEY_VERSION,
            RecordType::ListingEntry.tag().as_byte(),
            0,
            0,
            0,
        ];
        let result = ListingEntryKey::deserialize(&data);
        assert!(result.is_err());
    }

    mod proptests {
        use proptest::prelude::*;

        use super::*;

        proptest! {
            #[test]
            fn should_preserve_sequence_ordering(a: u64, b: u64) {
                let segment_start_seq = 0;
                let key_a = LogEntryKey::new(0, Bytes::from("key"), a);
                let key_b = LogEntryKey::new(0, Bytes::from("key"), b);
                let enc_a = key_a.serialize(segment_start_seq);
                let enc_b = key_b.serialize(segment_start_seq);
                prop_assert_eq!(
                    a.cmp(&b),
                    enc_a.cmp(&enc_b),
                    "ordering mismatch: a={}, b={}, enc_a={:?}, enc_b={:?}",
                    a, b, enc_a.as_ref(), enc_b.as_ref()
                );
            }

            #[test]
            fn should_include_listing_entry_in_scan_range(
                start in 0u32..1000,
                range_size in 1u32..100,
                offset in 0u32..100,
                key_bytes in prop::collection::vec(any::<u8>(), 1..100),
            ) {
                let end = start.saturating_add(range_size);
                let segment_id = start.saturating_add(offset % range_size);
                let key = ListingEntryKey::new(segment_id, Bytes::from(key_bytes));
                let serialized = key.serialize();
                let scan_range = ListingEntryKey::scan_range(start..end);
                prop_assert!(
                    scan_range.contains(&serialized),
                    "listing entry for segment {} with key should be in range {}..{}, \
                    serialized={:?}, range_start={:?}, range_end={:?}",
                    segment_id, start, end,
                    serialized.as_ref(),
                    scan_range.start_bound(),
                    scan_range.end_bound()
                );
            }
        }
    }
}