use crate::storage::engine::crc32::{crc32, crc32_update};
use std::io::{self, Read};
/// Four-byte magic value `RDBW` identifying the WAL format.
/// NOTE(review): presumably written in a file header; it is not read or written in this section.
pub const WAL_MAGIC: &[u8; 4] = b"RDBW";
/// Current WAL format version. Records in this format carry an 8-byte little-endian term
/// directly after the record-type byte.
pub const WAL_VERSION: u8 = 3;
/// Previous WAL format version. Records in this format have no term field; the reader
/// reports `WAL_DEFAULT_TERM` for them.
pub const WAL_VERSION_V2: u8 = 2;
/// Term used by the term-less encode helpers and reported for version-2 records.
pub const WAL_DEFAULT_TERM: u64 = crate::replication::DEFAULT_REPLICATION_TERM;
/// Minimum `PageWrite` payload length, in bytes, at which the encoder attempts zstd compression.
const COMPRESS_THRESHOLD: usize = 256;
/// Compression algorithm tag stored as a single byte inside compressed page-write records.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Compression {
    /// Payload is stored as-is.
    None = 0,
    /// Payload is a zstd frame.
    Zstd = 1,
}

impl Compression {
    /// Maps an on-disk tag byte back to its algorithm, or `None` for an unknown tag.
    fn from_u8(v: u8) -> Option<Self> {
        const KNOWN: [Compression; 2] = [Compression::None, Compression::Zstd];
        KNOWN.iter().copied().find(|algo| *algo as u8 == v)
    }
}
/// Record-type tag stored in the first byte of every encoded WAL record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RecordType {
    Begin = 1,
    Commit = 2,
    Rollback = 3,
    PageWrite = 4,
    Checkpoint = 5,
    PageWriteCompressed = 6,
    TxCommitBatch = 7,
    FullPageImage = 8,
    VectorInsert = 9,
}

impl RecordType {
    /// Maps an on-disk tag byte back to its record type, or `None` for an unknown tag.
    pub fn from_u8(v: u8) -> Option<Self> {
        // Matching on the discriminant itself keeps this lookup in step with the
        // numeric values declared on the enum.
        const KNOWN: [RecordType; 9] = [
            RecordType::Begin,
            RecordType::Commit,
            RecordType::Rollback,
            RecordType::PageWrite,
            RecordType::Checkpoint,
            RecordType::PageWriteCompressed,
            RecordType::TxCommitBatch,
            RecordType::FullPageImage,
            RecordType::VectorInsert,
        ];
        KNOWN.iter().copied().find(|kind| *kind as u8 == v)
    }
}
/// A decoded WAL record.
///
/// There is no variant for `RecordType::PageWriteCompressed`: the encoder may store a
/// `PageWrite` payload zstd-compressed, and the reader returns it decompressed as `PageWrite`.
#[derive(Debug, Clone, PartialEq)]
pub enum WalRecord {
/// Begin marker carrying a transaction id.
Begin { tx_id: u64 },
/// Commit marker carrying a transaction id.
Commit { tx_id: u64 },
/// Rollback marker carrying a transaction id.
Rollback { tx_id: u64 },
/// Page payload for `page_id` under `tx_id`; payloads of at least `COMPRESS_THRESHOLD`
/// bytes are stored compressed when that makes them smaller.
PageWrite {
tx_id: u64,
page_id: u32,
data: Vec<u8>,
},
/// A transaction id with a list of byte strings, each stored with its own length prefix.
/// NOTE(review): the contents of each action are opaque to this file.
TxCommitBatch { tx_id: u64, actions: Vec<Vec<u8>> },
/// Page payload that additionally carries `ckpt_epoch`; the encoder never compresses it.
FullPageImage {
tx_id: u64,
page_id: u32,
ckpt_epoch: u64,
data: Vec<u8>,
},
/// Vector payload for `entity_id` in `collection`; carries no transaction id.
/// The collection name must be valid UTF-8 to be read back.
VectorInsert {
collection: String,
entity_id: u64,
vector: Vec<f32>,
},
/// Checkpoint marker carrying an LSN.
Checkpoint { lsn: u64 },
}
impl WalRecord {
pub fn encode(&self) -> Vec<u8> {
self.encode_with_term(WAL_DEFAULT_TERM)
}
pub fn encode_with_term(&self, term: u64) -> Vec<u8> {
let mut buf = Vec::new();
self.encode_with_term_into(&mut buf, term);
buf
}
pub fn encode_into(&self, out: &mut Vec<u8>) {
self.encode_with_term_into(out, WAL_DEFAULT_TERM)
}
pub fn encode_with_term_into(&self, out: &mut Vec<u8>, term: u64) {
let start = out.len();
let buf = out;
match self {
WalRecord::Begin { tx_id } => {
buf.push(RecordType::Begin as u8);
buf.extend_from_slice(&term.to_le_bytes());
buf.extend_from_slice(&tx_id.to_le_bytes());
}
WalRecord::Commit { tx_id } => {
buf.push(RecordType::Commit as u8);
buf.extend_from_slice(&term.to_le_bytes());
buf.extend_from_slice(&tx_id.to_le_bytes());
}
WalRecord::Rollback { tx_id } => {
buf.push(RecordType::Rollback as u8);
buf.extend_from_slice(&term.to_le_bytes());
buf.extend_from_slice(&tx_id.to_le_bytes());
}
WalRecord::PageWrite {
tx_id,
page_id,
data,
} => {
if data.len() >= COMPRESS_THRESHOLD {
if let Ok(compressed) =
zstd::bulk::compress(data.as_slice(), 3)
{
if compressed.len() < data.len() {
buf.push(RecordType::PageWriteCompressed as u8);
buf.extend_from_slice(&term.to_le_bytes());
buf.extend_from_slice(&tx_id.to_le_bytes());
buf.extend_from_slice(&page_id.to_le_bytes());
buf.push(Compression::Zstd as u8);
buf.extend_from_slice(&(data.len() as u32).to_le_bytes()); buf.extend_from_slice(&(compressed.len() as u32).to_le_bytes());
buf.extend_from_slice(&compressed);
let checksum = crc32(&buf[start..]);
buf.extend_from_slice(&checksum.to_le_bytes());
return;
}
}
}
buf.push(RecordType::PageWrite as u8);
buf.extend_from_slice(&term.to_le_bytes());
buf.extend_from_slice(&tx_id.to_le_bytes());
buf.extend_from_slice(&page_id.to_le_bytes());
buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
buf.extend_from_slice(data);
}
WalRecord::TxCommitBatch { tx_id, actions } => {
buf.push(RecordType::TxCommitBatch as u8);
buf.extend_from_slice(&term.to_le_bytes());
buf.extend_from_slice(&tx_id.to_le_bytes());
buf.extend_from_slice(&(actions.len() as u32).to_le_bytes());
for action in actions {
buf.extend_from_slice(&(action.len() as u32).to_le_bytes());
buf.extend_from_slice(action);
}
}
WalRecord::FullPageImage {
tx_id,
page_id,
ckpt_epoch,
data,
} => {
buf.push(RecordType::FullPageImage as u8);
buf.extend_from_slice(&term.to_le_bytes());
buf.extend_from_slice(&tx_id.to_le_bytes());
buf.extend_from_slice(&page_id.to_le_bytes());
buf.extend_from_slice(&ckpt_epoch.to_le_bytes());
buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
buf.extend_from_slice(data);
}
WalRecord::VectorInsert {
collection,
entity_id,
vector,
} => {
buf.push(RecordType::VectorInsert as u8);
buf.extend_from_slice(&term.to_le_bytes());
buf.extend_from_slice(&(collection.len() as u32).to_le_bytes());
buf.extend_from_slice(collection.as_bytes());
buf.extend_from_slice(&entity_id.to_le_bytes());
buf.extend_from_slice(&(vector.len() as u32).to_le_bytes());
for value in vector {
buf.extend_from_slice(&value.to_le_bytes());
}
}
WalRecord::Checkpoint { lsn } => {
buf.push(RecordType::Checkpoint as u8);
buf.extend_from_slice(&term.to_le_bytes());
buf.extend_from_slice(&lsn.to_le_bytes());
}
}
let checksum = crc32(&buf[start..]);
buf.extend_from_slice(&checksum.to_le_bytes());
}
pub fn read<R: Read>(reader: &mut R) -> io::Result<Option<WalRecord>> {
Ok(Self::read_with_term(reader)?.map(|(_, record)| record))
}
pub fn read_with_term<R: Read>(reader: &mut R) -> io::Result<Option<(u64, WalRecord)>> {
Self::read_with_format_version(reader, WAL_VERSION)
}
pub(crate) fn read_with_format_version<R: Read>(
reader: &mut R,
format_version: u8,
) -> io::Result<Option<(u64, WalRecord)>> {
let mut type_buf = [0u8; 1];
match reader.read_exact(&mut type_buf) {
Ok(_) => (),
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(e),
};
let record_type = RecordType::from_u8(type_buf[0])
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid record type"))?;
let mut running_crc = crc32_update(0, &type_buf);
let term = match format_version {
WAL_VERSION => {
let mut term_buf = [0u8; 8];
reader.read_exact(&mut term_buf)?;
running_crc = crc32_update(running_crc, &term_buf);
u64::from_le_bytes(term_buf)
}
WAL_VERSION_V2 => WAL_DEFAULT_TERM,
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Unsupported WAL version: {format_version}"),
));
}
};
let record = match record_type {
RecordType::Begin | RecordType::Commit | RecordType::Rollback => {
let mut buf = [0u8; 8];
reader.read_exact(&mut buf)?;
running_crc = crc32_update(running_crc, &buf);
let tx_id = u64::from_le_bytes(buf);
match record_type {
RecordType::Begin => WalRecord::Begin { tx_id },
RecordType::Commit => WalRecord::Commit { tx_id },
RecordType::Rollback => WalRecord::Rollback { tx_id },
_ => unreachable!(),
}
}
RecordType::PageWrite => {
let mut tx_buf = [0u8; 8];
reader.read_exact(&mut tx_buf)?;
running_crc = crc32_update(running_crc, &tx_buf);
let tx_id = u64::from_le_bytes(tx_buf);
let mut page_buf = [0u8; 4];
reader.read_exact(&mut page_buf)?;
running_crc = crc32_update(running_crc, &page_buf);
let page_id = u32::from_le_bytes(page_buf);
let mut len_buf = [0u8; 4];
reader.read_exact(&mut len_buf)?;
running_crc = crc32_update(running_crc, &len_buf);
let len = u32::from_le_bytes(len_buf) as usize;
let mut data = vec![0u8; len];
reader.read_exact(&mut data)?;
running_crc = crc32_update(running_crc, &data);
WalRecord::PageWrite {
tx_id,
page_id,
data,
}
}
RecordType::PageWriteCompressed => {
let mut tx_buf = [0u8; 8];
reader.read_exact(&mut tx_buf)?;
running_crc = crc32_update(running_crc, &tx_buf);
let tx_id = u64::from_le_bytes(tx_buf);
let mut page_buf = [0u8; 4];
reader.read_exact(&mut page_buf)?;
running_crc = crc32_update(running_crc, &page_buf);
let page_id = u32::from_le_bytes(page_buf);
let mut comp_buf = [0u8; 1];
reader.read_exact(&mut comp_buf)?;
running_crc = crc32_update(running_crc, &comp_buf);
let compression = Compression::from_u8(comp_buf[0]).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Unknown WAL compression algorithm: {}", comp_buf[0]),
)
})?;
let mut orig_len_buf = [0u8; 4];
reader.read_exact(&mut orig_len_buf)?;
running_crc = crc32_update(running_crc, &orig_len_buf);
let orig_len = u32::from_le_bytes(orig_len_buf) as usize;
let mut len_buf = [0u8; 4];
reader.read_exact(&mut len_buf)?;
running_crc = crc32_update(running_crc, &len_buf);
let len = u32::from_le_bytes(len_buf) as usize;
let mut compressed = vec![0u8; len];
reader.read_exact(&mut compressed)?;
running_crc = crc32_update(running_crc, &compressed);
let data = match compression {
Compression::Zstd => {
let mut out = vec![0u8; orig_len];
zstd::bulk::decompress_to_buffer(&compressed, &mut out).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("WAL zstd decompress failed: {e}"),
)
})?;
out
}
Compression::None => compressed,
};
WalRecord::PageWrite {
tx_id,
page_id,
data,
}
}
RecordType::TxCommitBatch => {
let mut tx_buf = [0u8; 8];
reader.read_exact(&mut tx_buf)?;
running_crc = crc32_update(running_crc, &tx_buf);
let tx_id = u64::from_le_bytes(tx_buf);
let mut count_buf = [0u8; 4];
reader.read_exact(&mut count_buf)?;
running_crc = crc32_update(running_crc, &count_buf);
let count = u32::from_le_bytes(count_buf) as usize;
let mut actions = Vec::with_capacity(count);
for _ in 0..count {
let mut len_buf = [0u8; 4];
reader.read_exact(&mut len_buf)?;
running_crc = crc32_update(running_crc, &len_buf);
let len = u32::from_le_bytes(len_buf) as usize;
let mut action = vec![0u8; len];
reader.read_exact(&mut action)?;
running_crc = crc32_update(running_crc, &action);
actions.push(action);
}
WalRecord::TxCommitBatch { tx_id, actions }
}
RecordType::VectorInsert => {
let mut len_buf = [0u8; 4];
reader.read_exact(&mut len_buf)?;
running_crc = crc32_update(running_crc, &len_buf);
let collection_len = u32::from_le_bytes(len_buf) as usize;
let mut collection_buf = vec![0u8; collection_len];
reader.read_exact(&mut collection_buf)?;
running_crc = crc32_update(running_crc, &collection_buf);
let collection = String::from_utf8(collection_buf).map_err(|err| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("invalid collection utf8: {err}"),
)
})?;
let mut entity_buf = [0u8; 8];
reader.read_exact(&mut entity_buf)?;
running_crc = crc32_update(running_crc, &entity_buf);
let entity_id = u64::from_le_bytes(entity_buf);
let mut count_buf = [0u8; 4];
reader.read_exact(&mut count_buf)?;
running_crc = crc32_update(running_crc, &count_buf);
let count = u32::from_le_bytes(count_buf) as usize;
let mut vector = Vec::with_capacity(count);
for _ in 0..count {
let mut value_buf = [0u8; 4];
reader.read_exact(&mut value_buf)?;
running_crc = crc32_update(running_crc, &value_buf);
vector.push(f32::from_le_bytes(value_buf));
}
WalRecord::VectorInsert {
collection,
entity_id,
vector,
}
}
RecordType::FullPageImage => {
let mut tx_buf = [0u8; 8];
reader.read_exact(&mut tx_buf)?;
running_crc = crc32_update(running_crc, &tx_buf);
let tx_id = u64::from_le_bytes(tx_buf);
let mut page_buf = [0u8; 4];
reader.read_exact(&mut page_buf)?;
running_crc = crc32_update(running_crc, &page_buf);
let page_id = u32::from_le_bytes(page_buf);
let mut epoch_buf = [0u8; 8];
reader.read_exact(&mut epoch_buf)?;
running_crc = crc32_update(running_crc, &epoch_buf);
let ckpt_epoch = u64::from_le_bytes(epoch_buf);
let mut len_buf = [0u8; 4];
reader.read_exact(&mut len_buf)?;
running_crc = crc32_update(running_crc, &len_buf);
let len = u32::from_le_bytes(len_buf) as usize;
let mut data = vec![0u8; len];
reader.read_exact(&mut data)?;
running_crc = crc32_update(running_crc, &data);
WalRecord::FullPageImage {
tx_id,
page_id,
ckpt_epoch,
data,
}
}
RecordType::Checkpoint => {
let mut buf = [0u8; 8];
reader.read_exact(&mut buf)?;
running_crc = crc32_update(running_crc, &buf);
let lsn = u64::from_le_bytes(buf);
WalRecord::Checkpoint { lsn }
}
};
let mut crc_buf = [0u8; 4];
reader.read_exact(&mut crc_buf)?;
let stored_crc = u32::from_le_bytes(crc_buf);
if running_crc != stored_crc {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"WAL record checksum mismatch",
));
}
Ok(Some((term, record)))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Decodes the first record found in `bytes`, panicking on error or empty input.
    fn decode_first(bytes: Vec<u8>) -> WalRecord {
        let mut cursor = Cursor::new(bytes);
        WalRecord::read(&mut cursor).unwrap().unwrap()
    }

    /// Encodes `original` and checks that decoding gives back an equal record.
    fn assert_roundtrip(original: WalRecord) {
        let decoded = decode_first(original.encode());
        assert_eq!(decoded, original);
    }

    /// Checks the encoded size and type byte of a record whose body is a single `u64`.
    fn assert_fixed_size_encoding(record: WalRecord, expected_type: RecordType) {
        let encoded = record.encode();
        assert_eq!(encoded.len(), 21);
        assert_eq!(encoded[0], expected_type as u8);
    }

    /// Checks that reading from `bytes` fails.
    fn assert_read_fails(bytes: Vec<u8>) {
        let mut cursor = Cursor::new(bytes);
        let result = WalRecord::read(&mut cursor);
        assert!(result.is_err());
    }

    /// Checks that `bytes` decodes to exactly `expected`, in order, followed by end of input.
    fn assert_stream_decodes(bytes: Vec<u8>, expected: &[WalRecord]) {
        let mut cursor = Cursor::new(bytes);
        for want in expected {
            let got = WalRecord::read(&mut cursor).unwrap().unwrap();
            assert_eq!(&got, want);
        }
        assert!(WalRecord::read(&mut cursor).unwrap().is_none());
    }

    #[test]
    fn test_record_type_from_u8() {
        let table = [
            (1u8, RecordType::Begin),
            (2, RecordType::Commit),
            (3, RecordType::Rollback),
            (4, RecordType::PageWrite),
            (5, RecordType::Checkpoint),
            (6, RecordType::PageWriteCompressed),
            (7, RecordType::TxCommitBatch),
            (8, RecordType::FullPageImage),
            (9, RecordType::VectorInsert),
        ];
        for (tag, kind) in table.iter().copied() {
            assert_eq!(RecordType::from_u8(tag), Some(kind));
        }
    }

    #[test]
    fn test_record_type_invalid() {
        for tag in [0u8, 10, 255].iter().copied() {
            assert_eq!(RecordType::from_u8(tag), None);
        }
    }

    #[test]
    fn test_encode_begin() {
        assert_fixed_size_encoding(WalRecord::Begin { tx_id: 12345 }, RecordType::Begin);
    }

    #[test]
    fn test_encode_commit() {
        assert_fixed_size_encoding(WalRecord::Commit { tx_id: 99999 }, RecordType::Commit);
    }

    #[test]
    fn test_encode_rollback() {
        assert_fixed_size_encoding(WalRecord::Rollback { tx_id: 54321 }, RecordType::Rollback);
    }

    #[test]
    fn test_encode_checkpoint() {
        assert_fixed_size_encoding(
            WalRecord::Checkpoint { lsn: 1000000 },
            RecordType::Checkpoint,
        );
    }

    #[test]
    fn test_encode_page_write_small() {
        let encoded = WalRecord::PageWrite {
            tx_id: 100,
            page_id: 42,
            data: vec![1, 2, 3, 4, 5],
        }
        .encode();
        assert_eq!(encoded.len(), 34);
        assert_eq!(encoded[0], RecordType::PageWrite as u8);
    }

    #[test]
    fn test_encode_page_write_empty_data() {
        let encoded = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data: Vec::new(),
        }
        .encode();
        assert_eq!(encoded.len(), 29);
    }

    #[test]
    fn test_encode_tx_commit_batch() {
        let encoded = WalRecord::TxCommitBatch {
            tx_id: 7,
            actions: vec![b"insert".to_vec(), b"update".to_vec()],
        }
        .encode();
        assert_eq!(encoded[0], RecordType::TxCommitBatch as u8);
    }

    #[test]
    fn test_read_begin_roundtrip() {
        assert_roundtrip(WalRecord::Begin { tx_id: 42 });
    }

    #[test]
    fn test_read_begin_roundtrip_preserves_term() {
        let original = WalRecord::Begin { tx_id: 42 };
        let mut cursor = Cursor::new(original.encode_with_term(9));
        let (term, decoded) = WalRecord::read_with_term(&mut cursor).unwrap().unwrap();
        assert_eq!(term, 9);
        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_v2_begin_defaults_term() {
        // Hand-built version-2 record: type byte, tx id, checksum, and no term field.
        let tx_id = 42u64;
        let mut encoded = vec![RecordType::Begin as u8];
        encoded.extend_from_slice(&tx_id.to_le_bytes());
        let checksum = crc32(&encoded);
        encoded.extend_from_slice(&checksum.to_le_bytes());

        let mut cursor = Cursor::new(encoded);
        let (term, decoded) = WalRecord::read_with_format_version(&mut cursor, WAL_VERSION_V2)
            .unwrap()
            .unwrap();
        assert_eq!(term, WAL_DEFAULT_TERM);
        assert_eq!(decoded, WalRecord::Begin { tx_id });
    }

    #[test]
    fn test_read_commit_roundtrip() {
        assert_roundtrip(WalRecord::Commit { tx_id: 999 });
    }

    #[test]
    fn test_read_rollback_roundtrip() {
        assert_roundtrip(WalRecord::Rollback { tx_id: 777 });
    }

    #[test]
    fn test_read_checkpoint_roundtrip() {
        assert_roundtrip(WalRecord::Checkpoint { lsn: 123456789 });
    }

    #[test]
    fn test_read_page_write_roundtrip() {
        assert_roundtrip(WalRecord::PageWrite {
            tx_id: 50,
            page_id: 100,
            data: vec![10, 20, 30, 40, 50, 60, 70, 80],
        });
    }

    #[test]
    fn test_read_tx_commit_batch_roundtrip() {
        assert_roundtrip(WalRecord::TxCommitBatch {
            tx_id: 42,
            actions: vec![b"old-version".to_vec(), b"new-version".to_vec()],
        });
    }

    #[test]
    fn test_vector_insert_roundtrip() {
        assert_roundtrip(WalRecord::VectorInsert {
            collection: "turbo".to_string(),
            entity_id: 42,
            vector: vec![1.0, -0.5, 0.25],
        });
    }

    #[test]
    fn test_read_page_write_large_data() {
        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
        assert_roundtrip(WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data,
        });
    }

    #[test]
    fn page_write_compressed_roundtrip() {
        let data = vec![0xABu8; 1024];
        let encoded = WalRecord::PageWrite {
            tx_id: 7,
            page_id: 3,
            data: data.clone(),
        }
        .encode();
        assert_eq!(encoded[0], RecordType::PageWriteCompressed as u8);

        let expected = WalRecord::PageWrite {
            tx_id: 7,
            page_id: 3,
            data,
        };
        assert_eq!(decode_first(encoded), expected);
    }

    #[test]
    fn full_page_image_roundtrip() {
        let original = WalRecord::FullPageImage {
            tx_id: 11,
            page_id: 9,
            ckpt_epoch: 42,
            data: (0..4096).map(|i| (i % 251) as u8).collect(),
        };
        let encoded = original.encode();
        assert_eq!(encoded[0], RecordType::FullPageImage as u8);
        assert_eq!(decode_first(encoded), original);
    }

    #[test]
    fn full_page_image_checksum_mismatch_detected() {
        let mut encoded = WalRecord::FullPageImage {
            tx_id: 1,
            page_id: 2,
            ckpt_epoch: 3,
            data: vec![0xAA; 32],
        }
        .encode();
        let mid = encoded.len() / 2;
        encoded[mid] ^= 0xFF;
        assert_read_fails(encoded);
    }

    #[test]
    fn test_read_eof() {
        let mut cursor = Cursor::new(Vec::<u8>::new());
        let result = WalRecord::read(&mut cursor).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_read_invalid_record_type() {
        // Unknown type byte 99 followed by twelve zero bytes.
        let mut bytes = vec![0u8; 13];
        bytes[0] = 99;
        assert_read_fails(bytes);
    }

    #[test]
    fn test_read_checksum_mismatch() {
        let mut encoded = WalRecord::Begin { tx_id: 42 }.encode();
        // Flip every bit of the final checksum byte.
        let last = encoded.len() - 1;
        encoded[last] ^= 0xFF;
        assert_read_fails(encoded);
    }

    #[test]
    fn test_read_data_corruption() {
        let mut encoded = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 2,
            data: vec![1, 2, 3, 4],
        }
        .encode();
        encoded[15] ^= 0xFF;
        assert_read_fails(encoded);
    }

    #[test]
    fn test_multiple_records_sequential() {
        let records = vec![
            WalRecord::Begin { tx_id: 1 },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 10,
                data: vec![1, 2, 3],
            },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 20,
                data: vec![4, 5, 6],
            },
            WalRecord::Commit { tx_id: 1 },
            WalRecord::Checkpoint { lsn: 100 },
        ];
        let stream: Vec<u8> = records.iter().flat_map(|r| r.encode()).collect();
        assert_stream_decodes(stream, &records);
    }

    #[test]
    fn test_encode_into_matches_encode_for_all_variants() {
        let records = vec![
            WalRecord::Begin { tx_id: 12345 },
            WalRecord::Commit { tx_id: 99999 },
            WalRecord::Rollback { tx_id: 54321 },
            WalRecord::Checkpoint { lsn: 1_000_000 },
            WalRecord::PageWrite {
                tx_id: 100,
                page_id: 42,
                data: vec![1, 2, 3, 4, 5],
            },
            WalRecord::PageWrite {
                tx_id: 7,
                page_id: 3,
                data: vec![0xABu8; 1024],
            },
            WalRecord::TxCommitBatch {
                tx_id: 7,
                actions: vec![b"insert".to_vec(), b"update".to_vec()],
            },
            WalRecord::FullPageImage {
                tx_id: 11,
                page_id: 9,
                ckpt_epoch: 42,
                data: (0..4096).map(|i| (i % 251) as u8).collect(),
            },
            WalRecord::VectorInsert {
                collection: "turbo".to_string(),
                entity_id: 42,
                vector: vec![1.0, -0.5, 0.25],
            },
        ];
        for record in &records {
            let baseline = record.encode();
            let mut scratch = Vec::new();
            record.encode_into(&mut scratch);
            assert_eq!(scratch, baseline, "encode_into mismatch for {record:?}");
        }
    }

    #[test]
    fn test_encode_into_reuses_scratch_across_records() {
        let records = vec![
            WalRecord::Begin { tx_id: 1 },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 10,
                data: vec![1, 2, 3],
            },
            WalRecord::Commit { tx_id: 1 },
        ];
        let expected: Vec<u8> = records.iter().flat_map(|r| r.encode()).collect();

        let mut scratch = Vec::new();
        for record in &records {
            record.encode_into(&mut scratch);
        }
        assert_eq!(scratch, expected);
        assert_stream_decodes(scratch, &records);
    }

    #[test]
    fn test_encode_with_term_into_matches_and_preserves_prefix() {
        let prefix = b"PREFIX-BYTES".to_vec();
        let record = WalRecord::Begin { tx_id: 42 };

        let mut scratch = prefix.clone();
        record.encode_with_term_into(&mut scratch, 9);

        let (head, tail) = scratch.split_at(prefix.len());
        assert_eq!(head, &prefix[..]);
        assert_eq!(tail, &record.encode_with_term(9)[..]);
    }

    #[test]
    fn test_wal_magic() {
        assert_eq!(WAL_MAGIC, b"RDBW");
    }

    #[test]
    fn test_wal_version() {
        assert_eq!(WAL_VERSION, 3);
    }
}