use crate::error::{Result, WalError};
pub const WAL_MAGIC: u32 = 0x5359_4E57;
pub const WAL_FORMAT_VERSION: u16 = 1;
pub const MAX_WAL_PAYLOAD_SIZE: usize = 64 * 1024 * 1024;
pub const HEADER_SIZE: usize = 54;
pub const ENCRYPTED_FLAG: u32 = 0x0000_4000;
pub const REQUIRED_FLAG: u32 = 0x0000_8000;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecordHeader {
pub magic: u32,
pub format_version: u16,
pub record_type: u32,
pub lsn: u64,
pub tenant_id: u64,
pub vshard_id: u32,
pub payload_len: u32,
pub database_id: u64,
pub reserved: [u8; 8],
pub crc32c: u32,
}
impl RecordHeader {
pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
let mut buf = [0u8; HEADER_SIZE];
buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
buf[4..6].copy_from_slice(&self.format_version.to_le_bytes());
buf[6..10].copy_from_slice(&self.record_type.to_le_bytes());
buf[10..18].copy_from_slice(&self.lsn.to_le_bytes());
buf[18..26].copy_from_slice(&self.tenant_id.to_le_bytes());
buf[26..30].copy_from_slice(&self.vshard_id.to_le_bytes());
buf[30..34].copy_from_slice(&self.payload_len.to_le_bytes());
buf[34..42].copy_from_slice(&self.database_id.to_le_bytes());
buf[42..50].copy_from_slice(&self.reserved);
buf[50..54].copy_from_slice(&self.crc32c.to_le_bytes());
buf
}
pub fn from_bytes(buf: &[u8; HEADER_SIZE]) -> Self {
let mut reserved = [0u8; 8];
reserved.copy_from_slice(&buf[42..50]);
Self {
magic: u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]),
format_version: u16::from_le_bytes([buf[4], buf[5]]),
record_type: u32::from_le_bytes([buf[6], buf[7], buf[8], buf[9]]),
lsn: u64::from_le_bytes([
buf[10], buf[11], buf[12], buf[13], buf[14], buf[15], buf[16], buf[17],
]),
tenant_id: u64::from_le_bytes([
buf[18], buf[19], buf[20], buf[21], buf[22], buf[23], buf[24], buf[25],
]),
vshard_id: u32::from_le_bytes([buf[26], buf[27], buf[28], buf[29]]),
payload_len: u32::from_le_bytes([buf[30], buf[31], buf[32], buf[33]]),
database_id: u64::from_le_bytes([
buf[34], buf[35], buf[36], buf[37], buf[38], buf[39], buf[40], buf[41],
]),
reserved,
crc32c: u32::from_le_bytes([buf[50], buf[51], buf[52], buf[53]]),
}
}
pub fn compute_checksum(&self, payload: &[u8]) -> u32 {
let header_bytes = self.to_bytes();
let mut digest = crc32c::crc32c(&header_bytes[..HEADER_SIZE - 4]);
digest = crc32c::crc32c_append(digest, payload);
digest
}
pub fn logical_record_type(&self) -> u32 {
self.record_type & !ENCRYPTED_FLAG
}
pub fn validate(&self, offset: u64) -> Result<()> {
if self.magic != WAL_MAGIC {
return Err(WalError::InvalidMagic {
offset,
expected: WAL_MAGIC,
actual: self.magic,
});
}
if self.format_version != WAL_FORMAT_VERSION {
return Err(WalError::UnsupportedVersion {
version: self.format_version,
supported: WAL_FORMAT_VERSION,
});
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_header(record_type: u32, vshard_id: u32) -> RecordHeader {
RecordHeader {
magic: WAL_MAGIC,
format_version: WAL_FORMAT_VERSION,
record_type,
lsn: 42,
tenant_id: 7,
vshard_id,
payload_len: 100,
database_id: 0,
reserved: [0u8; 8],
crc32c: 0xDEAD_BEEF,
}
}
#[test]
fn header_roundtrip() {
let header = make_header(1 | REQUIRED_FLAG, 3);
let bytes = header.to_bytes();
assert_eq!(header, RecordHeader::from_bytes(&bytes));
}
#[test]
fn header_golden_54_bytes_exact_offsets() {
let header = RecordHeader {
magic: WAL_MAGIC,
format_version: WAL_FORMAT_VERSION,
record_type: 1,
lsn: 0x0102_0304_0506_0708,
tenant_id: 0xDEAD_BEEF_CAFE_1234,
vshard_id: 0xCAFE_BABE,
payload_len: 256,
database_id: 0xABCD_0000_1234_5678,
reserved: [0u8; 8],
crc32c: 0x1234_5678,
};
let b = header.to_bytes();
assert_eq!(b.len(), 54);
assert_eq!(&b[0..4], &WAL_MAGIC.to_le_bytes());
assert_eq!(&b[4..6], &WAL_FORMAT_VERSION.to_le_bytes());
assert_eq!(&b[6..10], &1u32.to_le_bytes());
assert_eq!(&b[10..18], &0x0102_0304_0506_0708u64.to_le_bytes());
assert_eq!(&b[18..26], &0xDEAD_BEEF_CAFE_1234u64.to_le_bytes());
assert_eq!(&b[26..30], &0xCAFE_BABEu32.to_le_bytes());
assert_eq!(&b[30..34], &256u32.to_le_bytes());
assert_eq!(&b[34..42], &0xABCD_0000_1234_5678u64.to_le_bytes());
assert_eq!(&b[42..50], &[0u8; 8]);
assert_eq!(&b[50..54], &0x1234_5678u32.to_le_bytes());
}
#[test]
fn database_id_roundtrip() {
let header = RecordHeader {
magic: WAL_MAGIC,
format_version: WAL_FORMAT_VERSION,
record_type: 1,
lsn: 1,
tenant_id: 42,
vshard_id: 0,
payload_len: 0,
database_id: 7,
reserved: [0u8; 8],
crc32c: 0,
};
let bytes = header.to_bytes();
let decoded = RecordHeader::from_bytes(&bytes);
assert_eq!(decoded.database_id, 7);
}
#[test]
fn pre_tier2_zero_database_id_compat() {
let mut raw = [0u8; HEADER_SIZE];
raw[0..4].copy_from_slice(&WAL_MAGIC.to_le_bytes());
raw[4..6].copy_from_slice(&WAL_FORMAT_VERSION.to_le_bytes());
raw[6..10].copy_from_slice(&1u32.to_le_bytes()); let decoded = RecordHeader::from_bytes(&raw);
assert_eq!(decoded.database_id, 0);
}
#[test]
fn tenant_id_above_u32_max_roundtrip() {
let tid = u32::MAX as u64 + 1;
let header = RecordHeader {
magic: WAL_MAGIC,
format_version: WAL_FORMAT_VERSION,
record_type: 1,
lsn: 1,
tenant_id: tid,
vshard_id: 0,
payload_len: 0,
database_id: 0,
reserved: [0u8; 8],
crc32c: 0,
};
let bytes = header.to_bytes();
let decoded = RecordHeader::from_bytes(&bytes);
assert_eq!(decoded.tenant_id, tid);
}
#[test]
fn invalid_magic_detected() {
let mut header = make_header(0, 0);
header.magic = 0xBAD0_F00D;
assert!(matches!(
header.validate(0),
Err(WalError::InvalidMagic { .. })
));
}
#[test]
fn unsupported_version_detected() {
let mut header = make_header(0, 0);
header.format_version = WAL_FORMAT_VERSION + 1;
assert!(matches!(
header.validate(0),
Err(WalError::UnsupportedVersion { .. })
));
}
#[test]
fn version_4_rejected() {
let mut header = make_header(0, 0);
header.format_version = 4;
assert!(matches!(
header.validate(0),
Err(WalError::UnsupportedVersion { version: 4, .. })
));
}
#[test]
fn large_vshard_id_roundtrip() {
let header = make_header(1, 0x1234_5678);
let bytes = header.to_bytes();
let decoded = RecordHeader::from_bytes(&bytes);
assert_eq!(decoded.vshard_id, 0x1234_5678u32);
}
#[test]
fn encrypted_flag_is_u32() {
let header = make_header(1 | ENCRYPTED_FLAG, 0);
assert_eq!(header.logical_record_type(), 1);
assert!(header.record_type & ENCRYPTED_FLAG != 0);
}
#[test]
fn large_record_type_roundtrip() {
let header = make_header(0x0001_0001, 0);
let bytes = header.to_bytes();
let decoded = RecordHeader::from_bytes(&bytes);
assert_eq!(decoded.record_type, 0x0001_0001u32);
let with_flags = make_header(0x0001_0001 | ENCRYPTED_FLAG | REQUIRED_FLAG, 0);
let bytes2 = with_flags.to_bytes();
let decoded2 = RecordHeader::from_bytes(&bytes2);
assert_eq!(
decoded2.record_type,
0x0001_0001 | ENCRYPTED_FLAG | REQUIRED_FLAG
);
assert_eq!(decoded2.logical_record_type(), 0x0001_0001 | REQUIRED_FLAG);
}
}