use super::wal_op::WalOp;
use crate::api::errors::{Error, Result};
pub const RECORD_MAGIC: u32 = 0x5243_4552;
pub const RECORD_HEADER_SIZE: usize = 4 + 4 + 8 + 1;
pub const RECORD_FOOTER_SIZE: usize = 4;
pub const FILE_MAGIC: u32 = 0x414C_4157;
pub const FORMAT_VERSION: u32 = 3;
pub const FILE_HEADER_SIZE: usize = 32;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileHeader {
pub tree_id: u64,
pub created_at: u64,
}
impl FileHeader {
#[must_use]
pub fn now(tree_id: u64) -> Self {
let created_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());
Self {
tree_id,
created_at,
}
}
}
pub fn encode_file_header(h: &FileHeader, out: &mut Vec<u8>) {
out.extend_from_slice(&FILE_MAGIC.to_le_bytes());
out.extend_from_slice(&FORMAT_VERSION.to_le_bytes());
out.extend_from_slice(&h.tree_id.to_le_bytes());
out.extend_from_slice(&h.created_at.to_le_bytes());
out.extend_from_slice(&0u64.to_le_bytes());
debug_assert_eq!(out.len(), FILE_HEADER_SIZE);
}
pub fn decode_file_header(buf: &[u8]) -> Result<FileHeader> {
if buf.len() < FILE_HEADER_SIZE {
return Err(sanity("WAL file header truncated"));
}
let magic = u32::from_le_bytes(buf[0..4].try_into().unwrap());
if magic != FILE_MAGIC {
return Err(sanity("WAL file magic mismatch"));
}
let version = u32::from_le_bytes(buf[4..8].try_into().unwrap());
if version != FORMAT_VERSION {
return Err(sanity("WAL file format version unsupported"));
}
let tree_id = u64::from_le_bytes(buf[8..16].try_into().unwrap());
let created_at = u64::from_le_bytes(buf[16..24].try_into().unwrap());
Ok(FileHeader {
tree_id,
created_at,
})
}
const TY_INSERT: u8 = 0;
const TY_ERASE: u8 = 1;
const TY_RENAME_OBJECT: u8 = 5;
const TY_BATCH: u8 = 10;
const TY_BATCH_INSERT_RUN: u8 = 11;
pub fn crc32(bytes: &[u8]) -> u32 {
crc32fast::hash(bytes)
}
#[cfg(test)]
pub fn encode_record(op: &WalOp, seq: u64, out: &mut Vec<u8>) {
write_record(out, seq, variant_tag(op), |buf| encode_body(op, buf));
}
fn write_record<F>(out: &mut Vec<u8>, seq: u64, ty: u8, write_body: F)
where
F: FnOnce(&mut Vec<u8>),
{
let start = out.len();
out.extend_from_slice(&RECORD_MAGIC.to_le_bytes());
let len_pos = out.len();
out.extend_from_slice(&[0u8; 4]);
out.extend_from_slice(&seq.to_le_bytes());
out.push(ty);
let body_start = out.len();
write_body(out);
let body_end = out.len();
let body_len = u32::try_from(body_end - body_start).expect("body fits in u32");
out[len_pos..len_pos + 4].copy_from_slice(&body_len.to_le_bytes());
let crc = crc32(&out[start..body_end]);
out.extend_from_slice(&crc.to_le_bytes());
}
pub fn encode_insert_record(out: &mut Vec<u8>, seq: u64, tree_id: u64, key: &[u8], value: &[u8]) {
write_record(out, seq, TY_INSERT, |buf| {
buf.extend_from_slice(&tree_id.to_le_bytes());
write_bytes(buf, key);
write_bytes(buf, value);
});
}
#[inline]
pub(crate) const fn encoded_insert_record_len(key_len: usize, value_len: usize) -> usize {
RECORD_HEADER_SIZE + 8 + 4 + key_len + 4 + value_len + RECORD_FOOTER_SIZE
}
pub fn encode_erase_record(out: &mut Vec<u8>, seq: u64, tree_id: u64, key: &[u8]) {
write_record(out, seq, TY_ERASE, |buf| {
buf.extend_from_slice(&tree_id.to_le_bytes());
write_bytes(buf, key);
});
}
#[inline]
pub(crate) const fn encoded_erase_record_len(key_len: usize) -> usize {
RECORD_HEADER_SIZE + 8 + 4 + key_len + RECORD_FOOTER_SIZE
}
pub fn encode_rename_object_record(
out: &mut Vec<u8>,
seq: u64,
tree_id: u64,
src_key: &[u8],
dst_key: &[u8],
force: bool,
) {
write_record(out, seq, TY_RENAME_OBJECT, |buf| {
buf.extend_from_slice(&tree_id.to_le_bytes());
write_bytes(buf, src_key);
write_bytes(buf, dst_key);
buf.push(u8::from(force));
});
}
#[inline]
pub(crate) const fn encoded_rename_object_record_len(
src_key_len: usize,
dst_key_len: usize,
) -> usize {
RECORD_HEADER_SIZE + 8 + 4 + src_key_len + 4 + dst_key_len + 1 + RECORD_FOOTER_SIZE
}
pub struct BatchEncoder<'buf> {
out: &'buf mut Vec<u8>,
start: usize,
len_pos: usize,
body_start: usize,
count_pos: usize,
inner_count: u32,
finished: bool,
}
impl<'buf> BatchEncoder<'buf> {
pub fn begin(out: &'buf mut Vec<u8>, seq: u64, tree_id: u64) -> Self {
let start = out.len();
out.extend_from_slice(&RECORD_MAGIC.to_le_bytes());
let len_pos = out.len();
out.extend_from_slice(&[0u8; 4]);
out.extend_from_slice(&seq.to_le_bytes());
out.push(TY_BATCH);
let body_start = out.len();
out.extend_from_slice(&tree_id.to_le_bytes());
let count_pos = out.len();
out.extend_from_slice(&[0u8; 4]);
Self {
out,
start,
len_pos,
body_start,
count_pos,
inner_count: 0,
finished: false,
}
}
pub fn push_insert(&mut self, tree_id: u64, key: &[u8], value: &[u8]) {
self.out.push(TY_INSERT);
self.out.extend_from_slice(&tree_id.to_le_bytes());
write_bytes(self.out, key);
write_bytes(self.out, value);
self.inner_count += 1;
}
pub fn push_insert_run<'a, I>(
&mut self,
tree_id: u64,
count: usize,
key_len: usize,
value_len: usize,
items: I,
) where
I: IntoIterator<Item = (&'a [u8], &'a [u8])>,
{
if count == 1 {
let mut iter = items.into_iter();
let (key, value) = iter.next().expect("single insert run has one item");
debug_assert!(iter.next().is_none());
self.push_insert(tree_id, key, value);
return;
}
let count_u32 = u32::try_from(count).expect("insert run count fits in u32");
let key_len_u32 = u32::try_from(key_len).expect("key length fits in u32");
let value_len_u32 = u32::try_from(value_len).expect("value length fits in u32");
self.out.push(TY_BATCH_INSERT_RUN);
self.out.extend_from_slice(&tree_id.to_le_bytes());
self.out.extend_from_slice(&count_u32.to_le_bytes());
self.out.extend_from_slice(&key_len_u32.to_le_bytes());
self.out.extend_from_slice(&value_len_u32.to_le_bytes());
let mut actual = 0usize;
for (key, value) in items {
debug_assert_eq!(key.len(), key_len);
debug_assert_eq!(value.len(), value_len);
self.out.extend_from_slice(key);
self.out.extend_from_slice(value);
actual += 1;
}
assert_eq!(actual, count, "insert run item count mismatch");
self.inner_count += count_u32;
}
pub fn push_erase(&mut self, tree_id: u64, key: &[u8]) {
self.out.push(TY_ERASE);
self.out.extend_from_slice(&tree_id.to_le_bytes());
write_bytes(self.out, key);
self.inner_count += 1;
}
pub fn push_rename_object(&mut self, tree_id: u64, src: &[u8], dst: &[u8], force: bool) {
self.out.push(TY_RENAME_OBJECT);
self.out.extend_from_slice(&tree_id.to_le_bytes());
write_bytes(self.out, src);
write_bytes(self.out, dst);
self.out.push(u8::from(force));
self.inner_count += 1;
}
pub fn finish(mut self) -> u32 {
let body_end = self.out.len();
let body_len = u32::try_from(body_end - self.body_start).expect("batch body fits in u32");
self.out[self.count_pos..self.count_pos + 4]
.copy_from_slice(&self.inner_count.to_le_bytes());
self.out[self.len_pos..self.len_pos + 4].copy_from_slice(&body_len.to_le_bytes());
let crc = crc32(&self.out[self.start..body_end]);
self.out.extend_from_slice(&crc.to_le_bytes());
self.finished = true;
self.inner_count
}
}
impl Drop for BatchEncoder<'_> {
fn drop(&mut self) {
if !self.finished {
self.out.truncate(self.start);
}
}
}
#[cfg(test)]
fn variant_tag(op: &WalOp) -> u8 {
match op {
WalOp::Insert { .. } => TY_INSERT,
WalOp::Erase { .. } => TY_ERASE,
WalOp::RenameObject { .. } => TY_RENAME_OBJECT,
WalOp::Batch { .. } => TY_BATCH,
}
}
#[cfg(test)]
fn encode_body(op: &WalOp, out: &mut Vec<u8>) {
match op {
WalOp::Insert { key, value } => {
out.extend_from_slice(&0u64.to_le_bytes());
write_bytes(out, key);
write_bytes(out, value);
}
WalOp::Erase { key } => {
out.extend_from_slice(&0u64.to_le_bytes());
write_bytes(out, key);
}
WalOp::RenameObject {
src_key,
dst_key,
force,
} => {
out.extend_from_slice(&0u64.to_le_bytes());
write_bytes(out, src_key);
write_bytes(out, dst_key);
out.push(u8::from(*force));
}
WalOp::Batch { ops } => {
out.extend_from_slice(&0u64.to_le_bytes());
let count = u32::try_from(ops.len()).expect("batch ops fit in u32");
out.extend_from_slice(&count.to_le_bytes());
for inner in ops {
let inner_ty = variant_tag(inner);
assert!(
inner_ty != TY_BATCH,
"nested Batch is rejected — Tree::atomic must flatten",
);
out.push(inner_ty);
encode_body(inner, out);
}
}
}
}
fn write_bytes(out: &mut Vec<u8>, b: &[u8]) {
let len = u32::try_from(b.len()).expect("byte string fits in u32");
out.extend_from_slice(&len.to_le_bytes());
out.extend_from_slice(b);
}
#[derive(Debug)]
pub struct DecodedRecord {
pub op: WalOp,
pub seq: u64,
pub bytes_consumed: usize,
}
pub fn decode_record(buf: &[u8]) -> Result<DecodedRecord> {
if buf.len() < RECORD_HEADER_SIZE {
return Err(sanity("record header truncated"));
}
let magic = u32::from_le_bytes(buf[0..4].try_into().unwrap());
if magic != RECORD_MAGIC {
return Err(sanity("record magic mismatch"));
}
let body_len = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize;
let seq = u64::from_le_bytes(buf[8..16].try_into().unwrap());
let ty = buf[16];
let total = RECORD_HEADER_SIZE + body_len + RECORD_FOOTER_SIZE;
if buf.len() < total {
return Err(sanity("record body truncated"));
}
let body_end = RECORD_HEADER_SIZE + body_len;
let crc_expected = u32::from_le_bytes(buf[body_end..body_end + 4].try_into().unwrap());
let crc_computed = crc32(&buf[..body_end]);
if crc_computed != crc_expected {
return Err(sanity("record CRC mismatch"));
}
let body = &buf[RECORD_HEADER_SIZE..body_end];
let op = decode_body(ty, body)?;
Ok(DecodedRecord {
op,
seq,
bytes_consumed: total,
})
}
fn decode_body(ty: u8, body: &[u8]) -> Result<WalOp> {
let mut cursor = body;
let op = decode_body_into(ty, &mut cursor)?;
if !cursor.is_empty() {
return Err(sanity("trailing bytes after variant body"));
}
Ok(op)
}
fn decode_body_into(ty: u8, body: &mut &[u8]) -> Result<WalOp> {
let op = match ty {
TY_INSERT => {
let _tree_id = read_u64(body)?;
let key = read_bytes(body)?;
let value = read_bytes(body)?;
WalOp::Insert { key, value }
}
TY_ERASE => {
let _tree_id = read_u64(body)?;
let key = read_bytes(body)?;
WalOp::Erase { key }
}
TY_RENAME_OBJECT => {
let _tree_id = read_u64(body)?;
let src_key = read_bytes(body)?;
let dst_key = read_bytes(body)?;
let force = read_u8(body)? != 0;
WalOp::RenameObject {
src_key,
dst_key,
force,
}
}
TY_BATCH => {
let _tree_id = read_u64(body)?;
let count = read_u32(body)? as usize;
let mut ops = Vec::with_capacity(count);
while ops.len() < count {
let inner_ty = read_u8(body)?;
if inner_ty == TY_BATCH {
return Err(sanity("nested Batch is rejected"));
}
if inner_ty == TY_BATCH_INSERT_RUN {
decode_insert_run(body, count, &mut ops)?;
} else {
let inner = decode_body_into(inner_ty, body)?;
ops.push(inner);
}
}
WalOp::Batch { ops }
}
_ => return Err(sanity("unknown WalOp variant tag")),
};
Ok(op)
}
fn decode_insert_run(body: &mut &[u8], batch_count: usize, ops: &mut Vec<WalOp>) -> Result<()> {
let _tree_id = read_u64(body)?;
let count = read_u32(body)? as usize;
if count == 0 {
return Err(sanity("empty BatchInsertRun is rejected"));
}
if ops.len().saturating_add(count) > batch_count {
return Err(sanity("BatchInsertRun exceeds batch inner count"));
}
let key_len = read_u32(body)? as usize;
let value_len = read_u32(body)? as usize;
for _ in 0..count {
let (key, rest) = take(body, key_len)?;
*body = rest;
let (value, rest) = take(body, value_len)?;
*body = rest;
ops.push(WalOp::Insert {
key: key.to_vec(),
value: value.to_vec(),
});
}
Ok(())
}
fn read_u8(body: &mut &[u8]) -> Result<u8> {
let (front, rest) = take(body, 1)?;
*body = rest;
Ok(front[0])
}
fn read_u32(body: &mut &[u8]) -> Result<u32> {
let (front, rest) = take(body, 4)?;
*body = rest;
Ok(u32::from_le_bytes(front.try_into().unwrap()))
}
fn read_u64(body: &mut &[u8]) -> Result<u64> {
let (front, rest) = take(body, 8)?;
*body = rest;
Ok(u64::from_le_bytes(front.try_into().unwrap()))
}
fn read_bytes(body: &mut &[u8]) -> Result<Vec<u8>> {
let len = read_u32(body)? as usize;
let (front, rest) = take(body, len)?;
*body = rest;
Ok(front.to_vec())
}
fn take(buf: &[u8], n: usize) -> Result<(&[u8], &[u8])> {
if buf.len() < n {
return Err(sanity("body truncated"));
}
Ok(buf.split_at(n))
}
fn sanity(context: &'static str) -> Error {
Error::ReplaySanityFailed {
context,
record_offset: 0,
}
}
#[cfg(test)]
mod tests {
use super::*;
fn roundtrip(op: WalOp, seq: u64) {
let mut buf = Vec::new();
encode_record(&op, seq, &mut buf);
let r = decode_record(&buf).unwrap();
assert_eq!(r.seq, seq);
assert_eq!(r.bytes_consumed, buf.len());
assert_eq!(format!("{:?}", r.op), format!("{op:?}"));
}
#[test]
fn crc32_matches_known_vector() {
assert_eq!(crc32(b"123456789"), 0xCBF4_3926);
}
#[test]
fn roundtrip_insert_small() {
roundtrip(
WalOp::Insert {
key: b"img/01.jpg".to_vec(),
value: b"v-new".to_vec(),
},
42,
);
}
#[test]
fn roundtrip_insert_large_value() {
roundtrip(
WalOp::Insert {
key: b"new/key".to_vec(),
value: vec![0xAB; 200],
},
7,
);
}
#[test]
fn roundtrip_erase() {
roundtrip(
WalOp::Erase {
key: b"img/02.jpg".to_vec(),
},
99,
);
}
#[test]
fn roundtrip_rename_object() {
roundtrip(
WalOp::RenameObject {
src_key: b"a/b".to_vec(),
dst_key: b"a/c".to_vec(),
force: true,
},
10,
);
}
#[test]
fn removed_cross_tree_rename_tag_is_rejected() {
let mut buf = Vec::new();
write_record(&mut buf, 11, 6, |body| {
body.extend_from_slice(&1u64.to_le_bytes());
body.extend_from_slice(&2u64.to_le_bytes());
write_bytes(body, b"x");
write_bytes(body, b"y");
body.push(0);
});
assert!(matches!(
decode_record(&buf),
Err(Error::ReplaySanityFailed {
context: "unknown WalOp variant tag",
..
})
));
}
#[test]
fn removed_structural_tags_are_rejected() {
for ty in [2, 3, 4] {
let mut buf = Vec::new();
write_record(&mut buf, 500 + u64::from(ty), ty, |_| {});
assert!(
matches!(
decode_record(&buf),
Err(Error::ReplaySanityFailed {
context: "unknown WalOp variant tag",
..
})
),
"removed structural tag {ty} should not decode",
);
}
}
#[test]
fn record_length_breakdown_is_predictable() {
let op = WalOp::Insert {
key: b"k".to_vec(),
value: b"v".to_vec(),
};
let mut buf = Vec::new();
encode_record(&op, 0, &mut buf);
assert_eq!(buf.len(), 39);
}
#[test]
fn corrupt_crc_is_caught() {
let op = WalOp::Insert {
key: b"k".to_vec(),
value: b"v".to_vec(),
};
let mut buf = Vec::new();
encode_record(&op, 1, &mut buf);
let last = buf.len() - 1;
buf[last] ^= 0x01;
match decode_record(&buf) {
Err(Error::ReplaySanityFailed { context, .. }) => {
assert!(context.contains("CRC"));
}
other => panic!("expected CRC sanity failure, got {other:?}"),
}
}
#[test]
fn corrupt_magic_is_caught() {
let op = WalOp::Erase { key: b"k".to_vec() };
let mut buf = Vec::new();
encode_record(&op, 5, &mut buf);
buf[0] ^= 0xFF;
match decode_record(&buf) {
Err(Error::ReplaySanityFailed { context, .. }) => {
assert!(context.contains("magic"));
}
other => panic!("expected magic sanity failure, got {other:?}"),
}
}
#[test]
fn truncated_record_is_caught() {
let op = WalOp::Insert {
key: vec![0xAB; 100],
value: vec![0xCD; 100],
};
let mut buf = Vec::new();
encode_record(&op, 1, &mut buf);
let len = buf.len();
buf.truncate(len - 10);
match decode_record(&buf) {
Err(Error::ReplaySanityFailed { context, .. }) => {
assert!(context.contains("truncated"));
}
other => panic!("expected truncation sanity failure, got {other:?}"),
}
}
#[test]
fn unknown_variant_tag_is_caught() {
let op = WalOp::Erase { key: b"k".to_vec() };
let mut buf = Vec::new();
encode_record(&op, 1, &mut buf);
buf[16] = 0xFF;
let body_len = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize;
let body_end = RECORD_HEADER_SIZE + body_len;
let crc = crc32(&buf[..body_end]);
buf[body_end..body_end + 4].copy_from_slice(&crc.to_le_bytes());
match decode_record(&buf) {
Err(Error::ReplaySanityFailed { context, .. }) => {
assert!(context.contains("variant"));
}
other => panic!("expected unknown-variant sanity failure, got {other:?}"),
}
}
#[test]
fn back_to_back_records_concatenate_cleanly() {
let mut buf = Vec::new();
encode_record(
&WalOp::Insert {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
},
1,
&mut buf,
);
encode_record(
&WalOp::Erase {
key: b"k1".to_vec(),
},
2,
&mut buf,
);
let r1 = decode_record(&buf).unwrap();
assert_eq!(r1.seq, 1);
let r2 = decode_record(&buf[r1.bytes_consumed..]).unwrap();
assert_eq!(r2.seq, 2);
assert_eq!(r1.bytes_consumed + r2.bytes_consumed, buf.len());
}
#[test]
fn roundtrip_batch_three_inner_ops() {
let base = 100u64;
let batch = WalOp::Batch {
ops: vec![
WalOp::Insert {
key: b"a".to_vec(),
value: b"v-a".to_vec(),
},
WalOp::Erase { key: b"b".to_vec() },
WalOp::RenameObject {
src_key: b"c".to_vec(),
dst_key: b"d".to_vec(),
force: false,
},
],
};
let mut buf = Vec::new();
encode_record(&batch, base, &mut buf);
let r = decode_record(&buf).unwrap();
assert_eq!(r.seq, base);
assert_eq!(r.bytes_consumed, buf.len());
match r.op {
WalOp::Batch { ops } => {
assert_eq!(ops.len(), 3);
match &ops[0] {
WalOp::Insert { key, value: _ } => {
assert_eq!(key, b"a");
}
other => panic!("expected Insert, got {other:?}"),
}
match &ops[1] {
WalOp::Erase { key } => {
assert_eq!(key, b"b");
}
other => panic!("expected Erase, got {other:?}"),
}
match &ops[2] {
WalOp::RenameObject {
src_key,
dst_key,
force,
..
} => {
assert_eq!(src_key, b"c");
assert_eq!(dst_key, b"d");
assert!(!force);
}
other => panic!("expected RenameObject, got {other:?}"),
}
}
other => panic!("expected Batch, got {other:?}"),
}
}
#[test]
fn roundtrip_batch_empty() {
let batch = WalOp::Batch { ops: vec![] };
let mut buf = Vec::new();
encode_record(&batch, 7, &mut buf);
let r = decode_record(&buf).unwrap();
assert_eq!(r.seq, 7);
match r.op {
WalOp::Batch { ops } => assert!(ops.is_empty()),
other => panic!("expected Batch, got {other:?}"),
}
}
#[test]
fn batch_encoder_wire_matches_encode_record() {
let base = 200u64;
let mut buf_streaming = Vec::new();
{
let mut enc = BatchEncoder::begin(&mut buf_streaming, base, 0);
enc.push_insert(0, b"a", b"v-a");
enc.push_erase(0, b"b");
enc.push_rename_object(0, b"c", b"d", false);
let n = enc.finish();
assert_eq!(n, 3);
}
let mut buf_enum = Vec::new();
let batch = WalOp::Batch {
ops: vec![
WalOp::Insert {
key: b"a".to_vec(),
value: b"v-a".to_vec(),
},
WalOp::Erase { key: b"b".to_vec() },
WalOp::RenameObject {
src_key: b"c".to_vec(),
dst_key: b"d".to_vec(),
force: false,
},
],
};
encode_record(&batch, base, &mut buf_enum);
assert_eq!(buf_streaming, buf_enum);
let r = decode_record(&buf_streaming).unwrap();
assert_eq!(r.seq, base);
match r.op {
WalOp::Batch { ops } => assert_eq!(ops.len(), 3),
other => panic!("expected Batch, got {other:?}"),
}
}
#[test]
fn batch_insert_run_round_trips_and_saves_wire_bytes() {
let base = 300u64;
let mut compact = Vec::new();
{
let mut enc = BatchEncoder::begin(&mut compact, base, 0);
enc.push_insert_run(
0,
3,
4,
2,
[
(&b"k001"[..], &b"v1"[..]),
(&b"k002"[..], &b"v2"[..]),
(&b"k003"[..], &b"v3"[..]),
],
);
assert_eq!(enc.finish(), 3);
}
let mut individual = Vec::new();
{
let mut enc = BatchEncoder::begin(&mut individual, base, 0);
enc.push_insert(0, b"k001", b"v1");
enc.push_insert(0, b"k002", b"v2");
enc.push_insert(0, b"k003", b"v3");
assert_eq!(enc.finish(), 3);
}
assert!(
compact.len() < individual.len(),
"compact insert run should be smaller: compact={}, individual={}",
compact.len(),
individual.len(),
);
let r = decode_record(&compact).unwrap();
match r.op {
WalOp::Batch { ops } => {
assert_eq!(ops.len(), 3);
for (idx, op) in ops.iter().enumerate() {
let WalOp::Insert { key, value } = op else {
panic!("expected insert, got {op:?}");
};
assert_eq!(key, format!("k{:03}", idx + 1).as_bytes());
assert_eq!(value, format!("v{}", idx + 1).as_bytes());
}
}
other => panic!("expected Batch, got {other:?}"),
}
}
#[test]
fn batch_encoder_empty_round_trips() {
let mut buf = Vec::new();
{
let enc = BatchEncoder::begin(&mut buf, 9, 0);
assert_eq!(enc.finish(), 0);
}
let r = decode_record(&buf).unwrap();
assert_eq!(r.seq, 9);
match r.op {
WalOp::Batch { ops } => assert!(ops.is_empty()),
other => panic!("expected Batch, got {other:?}"),
}
}
#[test]
fn batch_encoder_drop_without_finish_rolls_back() {
let mut buf = Vec::new();
buf.extend_from_slice(b"pre-existing bytes");
let before = buf.len();
{
let mut enc = BatchEncoder::begin(&mut buf, 1, 0);
enc.push_insert(0, b"would-be-rolled-back", b"v");
}
assert_eq!(buf.len(), before, "Drop should truncate the partial record");
assert_eq!(&buf[..], b"pre-existing bytes");
}
#[test]
fn batch_encoder_finish_commits_record() {
let mut buf = Vec::new();
{
let mut enc = BatchEncoder::begin(&mut buf, 5, 0);
enc.push_insert(0, b"k", b"v");
let _ = enc.finish();
}
assert!(!buf.is_empty());
let r = decode_record(&buf).unwrap();
assert_eq!(r.seq, 5);
}
#[test]
#[should_panic(expected = "nested Batch is rejected")]
fn nested_batch_encode_panics() {
let inner = WalOp::Batch { ops: vec![] };
let outer = WalOp::Batch { ops: vec![inner] };
let mut buf = Vec::new();
encode_record(&outer, 0, &mut buf);
}
}