use bytes::Bytes;
use std::fmt;
use crate::error::{ClientError, Result};
/// Size in bytes of a record header: a one-byte type tag followed by a four-byte
/// little-endian payload length.
pub const TLV_HEADER_SIZE: usize = 5;
/// Kind of a record, as identified by the type byte at the start of its header.
///
/// Each variant's discriminant is the byte produced when the variant is converted to `u8`.
/// The conversion from `u8` is lossy: every byte in `0x80..=0xFE` becomes
/// [`RecordType::UserDefined`] and every byte without a dedicated variant becomes
/// [`RecordType::Unknown`], so converting back does not always return the original byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum RecordType {
    /// Type byte `0x01`.
    Data = 0x01,
    /// Type byte `0x02`.
    Tombstone = 0x02,
    /// Type byte `0x03`.
    Transaction = 0x03,
    /// Type byte `0x04`.
    Checkpoint = 0x04,
    /// Type byte `0x05`.
    Schema = 0x05,
    /// Type byte `0x10`.
    Compressed = 0x10,
    /// Any type byte in `0x80..=0xFE`; converts back to `0x80`.
    UserDefined = 0x80,
    /// Any type byte that has no other variant; converts back to `0xFF`.
    Unknown = 0xFF,
}

impl From<u8> for RecordType {
    /// Classifies a raw type byte. Never fails: unrecognised bytes map to `Unknown`.
    fn from(byte: u8) -> Self {
        // Every byte value is listed explicitly so the compiler verifies that the
        // classification is exhaustive and that no two arms overlap.
        match byte {
            0x80..=0xFE => Self::UserDefined,
            0x10 => Self::Compressed,
            0x05 => Self::Schema,
            0x04 => Self::Checkpoint,
            0x03 => Self::Transaction,
            0x02 => Self::Tombstone,
            0x01 => Self::Data,
            0x00 | 0x06..=0x0F | 0x11..=0x7F | 0xFF => Self::Unknown,
        }
    }
}

impl From<RecordType> for u8 {
    /// Returns the variant's discriminant.
    fn from(record_type: RecordType) -> Self {
        record_type as u8
    }
}
/// A single record: its classified type, the raw type byte, the payload, and an offset.
#[derive(Debug, Clone)]
pub struct Record {
/// Type classification; the parsers in this file derive it from `type_byte`.
pub record_type: RecordType,
/// Raw type byte as read from the record header. Kept separately because the
/// conversion to `RecordType` collapses several byte values into one variant.
pub type_byte: u8,
/// Payload bytes, excluding the header.
pub payload: Bytes,
/// Position associated with the record. `RecordIterator` stores the offset of the
/// record's header within its buffer; the free `parse_record` function always stores 0.
pub offset: usize,
}
impl Record {
pub fn new(record_type: RecordType, type_byte: u8, payload: Bytes, offset: usize) -> Self {
Self {
record_type,
type_byte,
payload,
offset,
}
}
pub fn is_data(&self) -> bool {
self.record_type == RecordType::Data
}
pub fn is_tombstone(&self) -> bool {
self.record_type == RecordType::Tombstone
}
pub fn total_size(&self) -> usize {
TLV_HEADER_SIZE + self.payload.len()
}
pub fn as_str(&self) -> Option<&str> {
std::str::from_utf8(&self.payload).ok()
}
pub fn as_bytes(&self) -> &[u8] {
&self.payload
}
}
/// Reasons a record could not be parsed from a byte buffer.
#[derive(Debug, Clone)]
pub enum RecordParseError {
    /// Fewer bytes are left than a record header requires.
    InsufficientHeader {
        /// Number of header bytes required.
        needed: usize,
        /// Number of bytes actually left.
        available: usize,
    },
    /// The header declares more payload bytes than follow it.
    InsufficientPayload {
        /// Payload length declared in the header.
        needed: usize,
        /// Number of bytes present after the header.
        available: usize,
    },
    /// A rejected type byte.
    ///
    /// NOTE(review): no code visible in this file constructs this variant.
    InvalidType(u8),
    /// The declared payload length exceeds the configured maximum.
    PayloadTooLarge {
        /// Payload length declared in the header.
        length: u32,
        /// Largest payload length allowed.
        max: u32,
    },
}

impl fmt::Display for RecordParseError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // All fields are `Copy`, so matching on `*self` binds them by value.
        match *self {
            Self::PayloadTooLarge { length, max } => {
                write!(f, "Payload too large: {} bytes (max: {})", length, max)
            }
            Self::InvalidType(type_byte) => {
                write!(f, "Invalid record type: 0x{:02X}", type_byte)
            }
            Self::InsufficientPayload { needed, available } => write!(
                f,
                "Insufficient data for payload: need {} bytes, have {}",
                needed, available
            ),
            Self::InsufficientHeader { needed, available } => write!(
                f,
                "Insufficient data for header: need {} bytes, have {}",
                needed, available
            ),
        }
    }
}

impl std::error::Error for RecordParseError {}
impl From<RecordParseError> for ClientError {
    /// Converts a parse error into `ClientError::ProtocolError`, carrying the parse
    /// error's display text as the message.
    fn from(err: RecordParseError) -> Self {
        let message = err.to_string();
        ClientError::ProtocolError(message)
    }
}
/// Options for [`RecordIterator`].
#[derive(Debug, Clone)]
pub struct RecordParserConfig {
    /// Largest payload length accepted. A header declaring a longer payload is
    /// rejected with `RecordParseError::PayloadTooLarge`.
    pub max_payload_size: u32,
    /// NOTE(review): no code visible in this file reads this flag; records whose type
    /// byte is unrecognised are currently returned with `RecordType::Unknown` whatever
    /// its value. Confirm the intended semantics before relying on it.
    pub skip_unknown: bool,
}

impl Default for RecordParserConfig {
    /// Returns a configuration with a 16 MiB payload limit and `skip_unknown` enabled.
    fn default() -> Self {
        const MIB: u32 = 1024 * 1024;
        RecordParserConfig {
            skip_unknown: true,
            max_payload_size: 16 * MIB,
        }
    }
}
/// Iterator over the records stored back-to-back in a byte buffer.
///
/// Each record is a one-byte type, a four-byte little-endian payload length, and the
/// payload itself. Iteration ends when the buffer is exhausted, or after the first parse
/// error has been reported.
///
/// `RecordParserConfig::skip_unknown` is not consulted: records with an unrecognised type
/// byte are returned with `RecordType::Unknown`.
pub struct RecordIterator {
    data: Bytes,
    offset: usize,
    config: RecordParserConfig,
    /// Set once a parse error has been reported. The read position cannot move past a
    /// malformed record, so without this flag every later call would report the same
    /// error again and a loop over the iterator would never terminate.
    failed: bool,
}

impl RecordIterator {
    /// Creates an iterator over `data` using the default configuration.
    pub fn new(data: Bytes) -> Self {
        Self::with_config(data, RecordParserConfig::default())
    }

    /// Creates an iterator over `data` using `config`.
    pub fn with_config(data: Bytes, config: RecordParserConfig) -> Self {
        Self {
            data,
            offset: 0,
            config,
            failed: false,
        }
    }

    /// Offset of the next unread byte. After a parse error this stays at the start of
    /// the record that could not be parsed.
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Number of bytes between the current offset and the end of the buffer.
    pub fn remaining(&self) -> usize {
        self.data.len().saturating_sub(self.offset)
    }

    /// Parses the next record.
    ///
    /// Returns `Ok(None)` when the buffer is exhausted or a previous call has already
    /// reported an error.
    ///
    /// # Errors
    ///
    /// Returns `InsufficientHeader`, `PayloadTooLarge` or `InsufficientPayload` when the
    /// bytes at the current offset do not form a complete, acceptable record. The error
    /// is reported once; later calls return `Ok(None)`.
    fn parse_record(&mut self) -> std::result::Result<Option<Record>, RecordParseError> {
        if self.failed {
            return Ok(None);
        }
        let outcome = self.parse_at_offset();
        self.failed = outcome.is_err();
        outcome
    }

    /// Decodes one record at the current offset, advancing the offset only on success.
    fn parse_at_offset(&mut self) -> std::result::Result<Option<Record>, RecordParseError> {
        let remaining = self.remaining();
        if remaining == 0 {
            return Ok(None);
        }
        if remaining < TLV_HEADER_SIZE {
            return Err(RecordParseError::InsufficientHeader {
                needed: TLV_HEADER_SIZE,
                available: remaining,
            });
        }

        let start_offset = self.offset;
        let payload_start = start_offset + TLV_HEADER_SIZE;
        let header = &self.data[start_offset..payload_start];
        let type_byte = header[0];
        let length = u32::from_le_bytes([header[1], header[2], header[3], header[4]]);

        if length > self.config.max_payload_size {
            return Err(RecordParseError::PayloadTooLarge {
                length,
                max: self.config.max_payload_size,
            });
        }

        // Lossless on 32- and 64-bit targets.
        let payload_len = length as usize;
        // Compare against the bytes left after the header rather than computing
        // `TLV_HEADER_SIZE + payload_len`, which can overflow `usize` on 32-bit targets
        // when the configured maximum is close to `u32::MAX`.
        let available = remaining - TLV_HEADER_SIZE;
        if available < payload_len {
            return Err(RecordParseError::InsufficientPayload {
                needed: payload_len,
                available,
            });
        }

        // Cannot overflow: the check above guarantees the end lies within the buffer.
        let payload_end = payload_start + payload_len;
        let payload = self.data.slice(payload_start..payload_end);
        self.offset = payload_end;

        Ok(Some(Record::new(
            RecordType::from(type_byte),
            type_byte,
            payload,
            start_offset,
        )))
    }
}
impl Iterator for RecordIterator {
    type Item = std::result::Result<Record, RecordParseError>;

    /// Yields the next record or parse error, and `None` once `parse_record` reports
    /// that nothing is left.
    fn next(&mut self) -> Option<Self::Item> {
        // `Result<Option<T>, E>` -> `Option<Result<T, E>>`: `Ok(None)` becomes `None`,
        // everything else is wrapped in `Some`.
        self.parse_record().transpose()
    }
}
/// Parses every record in `data` using the default [`RecordIterator`] configuration.
///
/// # Errors
///
/// Stops at the first record that fails to parse and returns that failure converted
/// into the crate's error type; records parsed before it are discarded.
pub fn parse_records(data: Bytes) -> Result<Vec<Record>> {
    RecordIterator::new(data)
        .map(|item| item.map_err(Into::into))
        .collect()
}
/// Parses a single record from the start of `data`.
///
/// On success returns the record together with the number of bytes it occupies
/// (header plus payload). The payload is copied out of `data`, and the record's
/// `offset` field is always 0. Bytes after the record are ignored.
///
/// Unlike [`RecordIterator`], this function applies no maximum payload size; the
/// payload is only copied after confirming that `data` actually contains it.
///
/// # Errors
///
/// * `RecordParseError::InsufficientHeader` if `data` is shorter than
///   [`TLV_HEADER_SIZE`].
/// * `RecordParseError::InsufficientPayload` if fewer bytes follow the header than the
///   header declares.
pub fn parse_record(data: &[u8]) -> std::result::Result<(Record, usize), RecordParseError> {
    if data.len() < TLV_HEADER_SIZE {
        return Err(RecordParseError::InsufficientHeader {
            needed: TLV_HEADER_SIZE,
            available: data.len(),
        });
    }

    let (header, body) = data.split_at(TLV_HEADER_SIZE);
    let type_byte = header[0];
    let length = u32::from_le_bytes([header[1], header[2], header[3], header[4]]);
    // Lossless on 32- and 64-bit targets.
    let payload_len = length as usize;

    // Checked slicing instead of comparing against `TLV_HEADER_SIZE + payload_len`:
    // that sum can overflow `usize` on 32-bit targets for lengths near `u32::MAX`,
    // letting a bogus header pass the check and panic on the slice below.
    let payload = match body.get(..payload_len) {
        Some(bytes) => Bytes::copy_from_slice(bytes),
        None => {
            return Err(RecordParseError::InsufficientPayload {
                needed: payload_len,
                available: body.len(),
            });
        }
    };

    // Cannot overflow: `payload_len <= body.len()` was just established.
    let total_size = TLV_HEADER_SIZE + payload_len;
    let record = Record::new(RecordType::from(type_byte), type_byte, payload, 0);
    Ok((record, total_size))
}
/// Encodes a record as its type byte, a four-byte little-endian payload length, and the
/// payload.
///
/// The type byte is the discriminant of `record_type`, so `RecordType::UserDefined`
/// always encodes as `0x80` and `RecordType::Unknown` as `0xFF`.
///
/// # Panics
///
/// Panics if `payload` is longer than `u32::MAX` bytes. Such a length does not fit in
/// the header, and truncating it would emit a record whose declared length disagrees
/// with its contents.
pub fn encode_record(record_type: RecordType, payload: &[u8]) -> Bytes {
    assert!(
        payload.len() <= u32::MAX as usize,
        "record payload of {} bytes does not fit in the 4-byte length field",
        payload.len()
    );
    // Guarded by the assertion above, so this cast cannot truncate.
    let length = payload.len() as u32;

    let mut buf = Vec::with_capacity(TLV_HEADER_SIZE + payload.len());
    buf.push(u8::from(record_type));
    buf.extend_from_slice(&length.to_le_bytes());
    buf.extend_from_slice(payload);
    Bytes::from(buf)
}
/// Encodes a record with an arbitrary raw type byte: the type byte, a four-byte
/// little-endian payload length, and the payload.
///
/// `type_byte` is written as given, without checking it against [`RecordType`].
///
/// # Panics
///
/// Panics if `payload` is longer than `u32::MAX` bytes. Such a length does not fit in
/// the header, and truncating it would emit a record whose declared length disagrees
/// with its contents.
pub fn encode_record_with_type(type_byte: u8, payload: &[u8]) -> Bytes {
    assert!(
        payload.len() <= u32::MAX as usize,
        "record payload of {} bytes does not fit in the 4-byte length field",
        payload.len()
    );
    // Guarded by the assertion above, so this cast cannot truncate.
    let length = payload.len() as u32;

    let mut buf = Vec::with_capacity(TLV_HEADER_SIZE + payload.len());
    buf.push(type_byte);
    buf.extend_from_slice(&length.to_le_bytes());
    buf.extend_from_slice(payload);
    Bytes::from(buf)
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Parses every record in `stream`, panicking on the first parse error.
    fn collect_all(stream: Vec<u8>) -> Vec<Record> {
        RecordIterator::new(Bytes::from(stream))
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap()
    }

    /// Known bytes map to their variants; `0x00` has no variant and maps to `Unknown`.
    #[test]
    fn test_record_type_from_byte() {
        let cases = [
            (0x01u8, RecordType::Data),
            (0x02, RecordType::Tombstone),
            (0x80, RecordType::UserDefined),
            (0x00, RecordType::Unknown),
        ];
        for &(byte, expected) in &cases {
            assert_eq!(RecordType::from(byte), expected);
        }
    }

    /// A record survives an encode / parse round trip.
    #[test]
    fn test_encode_decode_record() {
        let payload = b"hello world";
        let encoded = encode_record(RecordType::Data, payload);
        assert_eq!(encoded.len(), TLV_HEADER_SIZE + payload.len());
        assert_eq!(encoded[0], 0x01);

        let (record, consumed) = parse_record(&encoded).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(record.record_type, RecordType::Data);
        assert_eq!(record.as_bytes(), &payload[..]);
    }

    /// The iterator yields consecutive records in order, including an empty one.
    #[test]
    fn test_record_iterator() {
        let encoded = [
            encode_record(RecordType::Data, b"record1"),
            encode_record(RecordType::Data, b"record2"),
            encode_record(RecordType::Tombstone, b""),
        ];
        let mut stream = Vec::new();
        for chunk in &encoded {
            stream.extend_from_slice(chunk);
        }

        let records = collect_all(stream);
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].as_str(), Some("record1"));
        assert_eq!(records[1].as_str(), Some("record2"));
        assert!(records[2].is_tombstone());
    }

    /// Two bytes are not enough for a five-byte header.
    #[test]
    fn test_insufficient_header() {
        let truncated = Bytes::from(vec![0x01, 0x00]);
        let first = RecordIterator::new(truncated).next();
        assert!(matches!(
            first,
            Some(Err(RecordParseError::InsufficientHeader { .. }))
        ));
    }

    /// The header declares 0x64 (100) payload bytes but only five follow.
    #[test]
    fn test_insufficient_payload() {
        let mut stream = vec![0x01, 0x64, 0x00, 0x00, 0x00];
        stream.extend_from_slice(b"short");
        let first = RecordIterator::new(Bytes::from(stream)).next();
        assert!(matches!(
            first,
            Some(Err(RecordParseError::InsufficientPayload { .. }))
        ));
    }

    /// A record with an empty payload parses to an empty payload.
    #[test]
    fn test_empty_record() {
        let encoded = encode_record(RecordType::Tombstone, b"");
        let (record, _) = parse_record(&encoded).unwrap();
        assert!(record.is_tombstone());
        assert!(record.payload.is_empty());
    }

    /// Each record reports the offset of its own header within the stream.
    #[test]
    fn test_record_offset_tracking() {
        let first = encode_record(RecordType::Data, b"first");
        let second = encode_record(RecordType::Data, b"second");
        let mut stream = Vec::new();
        stream.extend_from_slice(&first);
        stream.extend_from_slice(&second);

        let records = collect_all(stream);
        assert_eq!(records[0].offset, 0);
        assert_eq!(records[1].offset, first.len());
    }
}