use bytes::{Buf, BufMut, Bytes, BytesMut};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum RecordError {
#[error("Buffer too short")]
BufferTooShort,
#[error("Invalid record type: {0}")]
InvalidType(u8),
#[error("Varint decoding error")]
VarintError,
}
#[derive(Debug, Clone, PartialEq)]
pub enum BatchOp {
Put { key: Bytes, value: Bytes },
Delete { key: Bytes },
Merge { key: Bytes, operand: Bytes },
}
#[derive(Debug, Clone, PartialEq)]
pub enum Record {
Put {
key: Bytes,
value: Bytes,
seq: u64,
},
Delete {
key: Bytes,
seq: u64,
},
Merge {
key: Bytes,
operand: Bytes,
seq: u64,
},
Batch {
base_seq: u64,
operations: Vec<BatchOp>,
},
}
impl Record {
#[inline]
pub fn encoded_len(&self) -> usize {
match self {
Self::Put { key, value, .. } => {
1 + 8 + 4 + key.len() + 4 + value.len()
}
Self::Delete { key, .. } => {
1 + 8 + 4 + key.len()
}
Self::Merge { key, operand, .. } => {
1 + 8 + 4 + key.len() + 4 + operand.len()
}
Self::Batch { operations, .. } => {
let ops_len: usize = operations
.iter()
.map(|op| match op {
BatchOp::Put { key, value } => 1 + 4 + key.len() + 4 + value.len(),
BatchOp::Delete { key } => 1 + 4 + key.len(),
BatchOp::Merge { key, operand } => 1 + 4 + key.len() + 4 + operand.len(),
})
.sum();
1 + 8 + 4 + ops_len
}
}
}
pub fn encode(&self) -> Bytes {
let mut buf = BytesMut::with_capacity(self.encoded_len());
match self {
Self::Put { key, value, seq } => {
buf.put_u8(1); buf.put_u64_le(*seq); put_bytes(&mut buf, key);
put_bytes(&mut buf, value);
}
Self::Delete { key, seq } => {
buf.put_u8(2); buf.put_u64_le(*seq); put_bytes(&mut buf, key);
}
Self::Batch {
base_seq,
operations,
} => {
buf.put_u8(3); buf.put_u64_le(*base_seq); buf.put_u32_le(operations.len() as u32);
for op in operations {
match op {
BatchOp::Put { key, value } => {
buf.put_u8(1);
put_bytes(&mut buf, key);
put_bytes(&mut buf, value);
}
BatchOp::Delete { key } => {
buf.put_u8(2);
put_bytes(&mut buf, key);
}
BatchOp::Merge { key, operand } => {
buf.put_u8(4);
put_bytes(&mut buf, key);
put_bytes(&mut buf, operand);
}
}
}
}
Self::Merge { key, operand, seq } => {
buf.put_u8(4); buf.put_u64_le(*seq); put_bytes(&mut buf, key);
put_bytes(&mut buf, operand);
}
}
buf.freeze()
}
pub fn decode(mut buf: Bytes) -> Result<Self, RecordError> {
if buf.remaining() < 1 {
return Err(RecordError::BufferTooShort);
}
let record_type = buf.get_u8();
match record_type {
1 => {
if buf.remaining() < 8 {
return Err(RecordError::BufferTooShort);
}
let seq = buf.get_u64_le();
let key = get_bytes(&mut buf)?;
let value = get_bytes(&mut buf)?;
Ok(Self::Put { key, value, seq })
}
2 => {
if buf.remaining() < 8 {
return Err(RecordError::BufferTooShort);
}
let seq = buf.get_u64_le();
let key = get_bytes(&mut buf)?;
Ok(Self::Delete { key, seq })
}
3 => {
if buf.remaining() < 8 {
return Err(RecordError::BufferTooShort);
}
let base_seq = buf.get_u64_le();
if buf.remaining() < 4 {
return Err(RecordError::BufferTooShort);
}
let count = buf.get_u32_le();
let mut operations = Vec::with_capacity(count as usize);
for _ in 0..count {
if buf.remaining() < 1 {
return Err(RecordError::BufferTooShort);
}
let op_type = buf.get_u8();
match op_type {
1 => {
let key = get_bytes(&mut buf)?;
let value = get_bytes(&mut buf)?;
operations.push(BatchOp::Put { key, value });
}
2 => {
let key = get_bytes(&mut buf)?;
operations.push(BatchOp::Delete { key });
}
4 => {
let key = get_bytes(&mut buf)?;
let operand = get_bytes(&mut buf)?;
operations.push(BatchOp::Merge { key, operand });
}
_ => return Err(RecordError::InvalidType(op_type)),
}
}
Ok(Self::Batch {
base_seq,
operations,
})
}
4 => {
if buf.remaining() < 8 {
return Err(RecordError::BufferTooShort);
}
let seq = buf.get_u64_le();
let key = get_bytes(&mut buf)?;
let operand = get_bytes(&mut buf)?;
Ok(Self::Merge { key, operand, seq })
}
_ => Err(RecordError::InvalidType(record_type)),
}
}
}
fn put_bytes(buf: &mut BytesMut, bytes: &Bytes) {
buf.put_u32_le(bytes.len() as u32);
buf.put(bytes.as_ref());
}
fn get_bytes(buf: &mut Bytes) -> Result<Bytes, RecordError> {
if buf.remaining() < 4 {
return Err(RecordError::BufferTooShort);
}
let len = buf.get_u32_le() as usize;
if buf.remaining() < len {
return Err(RecordError::BufferTooShort);
}
Ok(buf.split_to(len))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_record_encoding() {
let record = Record::Put {
key: Bytes::from("key"),
value: Bytes::from("value"),
seq: 42,
};
let encoded = record.encode();
let decoded = Record::decode(encoded).unwrap();
assert_eq!(record, decoded);
}
#[test]
fn test_batch_encoding() {
let operations = vec![
BatchOp::Put {
key: Bytes::from("k1"),
value: Bytes::from("v1"),
},
BatchOp::Delete {
key: Bytes::from("k2"),
},
];
let record = Record::Batch {
base_seq: 100,
operations,
};
let encoded = record.encode();
let decoded = Record::decode(encoded).unwrap();
assert_eq!(record, decoded);
}
}