use super::record::{Record, RecordError};
use bytes::Bytes;
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom};
use std::path::Path;
use thiserror::Error;
/// Magic number identifying a WAL file: the ASCII bytes `WLOG` taken
/// most-significant byte first (`0x574C_4F47`).
const MAGIC: u32 = u32::from_be_bytes(*b"WLOG");

/// Format version 1: frames are `[len: u32 big-endian][payload]`.
const VERSION_V1: u32 = 1;

/// Format version 2: frames are `[crc32c: u32 little-endian][len: u32 big-endian][payload]`.
const VERSION_V2: u32 = 2;

/// Size in bytes of the file header: a `u32` magic followed by a `u32` version.
const HEADER_SIZE: u64 = (2 * std::mem::size_of::<u32>()) as u64;
/// Errors produced while opening or reading a WAL file.
#[derive(Debug, Error)]
pub enum ReaderError {
    /// An underlying I/O operation failed. This includes hitting end of file
    /// partway through the header or partway through a record frame.
    #[error("IO error: {0}")]
    Io(#[from] io::Error),

    /// Decoding a record payload into a `Record` failed.
    #[error("Record error: {0}")]
    Record(#[from] RecordError),

    /// The file header carries an unrecognised magic number or version.
    #[error("Invalid WAL format: bad magic or version")]
    InvalidFormat,

    /// The checksum stored in a version-2 frame did not match the checksum
    /// the reader computed over that frame.
    #[error("CRC mismatch: expected {expected:#010x}, got {actual:#010x}")]
    CrcMismatch {
        /// Checksum stored in the frame.
        expected: u32,
        /// Checksum computed by the reader.
        actual: u32,
    },
}

/// Result alias used by the reader, with [`ReaderError`] as the error type.
pub type Result<T> = std::result::Result<T, ReaderError>;
/// Sequential reader over a write-ahead-log file.
///
/// Built by `WALReader::open`, which validates the file header. Records are
/// then pulled out one frame at a time.
pub struct WALReader {
    /// Underlying file handle; its cursor is where the next read begins.
    file: File,
    /// Byte position tracked by the reader; starts just past the header.
    offset: u64,
    /// Format version read from the header; selects the frame layout.
    version: u32,
}
impl WALReader {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let mut file = File::open(path)?;
let mut header = [0u8; 8];
file.read_exact(&mut header)?;
let magic = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
if magic != MAGIC || (version != VERSION_V1 && version != VERSION_V2) {
return Err(ReaderError::InvalidFormat);
}
Ok(Self {
file,
offset: HEADER_SIZE,
version,
})
}
pub fn read_next(&mut self) -> Result<Option<Record>> {
if self.version == VERSION_V2 {
self.read_next_v2()
} else {
self.read_next_v1()
}
}
fn read_next_v1(&mut self) -> Result<Option<Record>> {
let mut len_buf = [0u8; 4];
match self.file.read_exact(&mut len_buf) {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Ok(None);
}
Err(e) => return Err(e.into()),
}
let total_len = u32::from_be_bytes(len_buf) as usize;
let mut data_buf = vec![0u8; total_len];
self.file.read_exact(&mut data_buf)?;
self.offset += (4 + total_len) as u64;
let record = Record::decode(Bytes::from(data_buf))?;
Ok(Some(record))
}
fn read_next_v2(&mut self) -> Result<Option<Record>> {
let mut crc_buf = [0u8; 4];
match self.file.read_exact(&mut crc_buf) {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Ok(None);
}
Err(e) => return Err(e.into()),
}
let stored_crc = u32::from_le_bytes(crc_buf);
let mut len_buf = [0u8; 4];
self.file.read_exact(&mut len_buf)?;
let total_len = u32::from_be_bytes(len_buf) as usize;
let mut data_buf = vec![0u8; total_len];
self.file.read_exact(&mut data_buf)?;
let computed_crc = crc32c::crc32c_append(crc32c::crc32c(&len_buf), &data_buf);
if computed_crc != stored_crc {
return Err(ReaderError::CrcMismatch {
expected: stored_crc,
actual: computed_crc,
});
}
self.offset += (4 + 4 + total_len) as u64;
let record = Record::decode(Bytes::from(data_buf))?;
Ok(Some(record))
}
pub fn read_all(&mut self) -> Result<Vec<Record>> {
let mut records = Vec::new();
while let Some(record) = self.read_next()? {
records.push(record);
}
Ok(records)
}
pub fn seek(&mut self, offset: u64) -> Result<()> {
self.file.seek(SeekFrom::Start(offset))?;
self.offset = offset;
Ok(())
}
#[must_use]
pub const fn offset(&self) -> u64 {
self.offset
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::wal::{Record, SyncPolicy, WAL};
    use bytes::Bytes;
    use tempfile::tempdir;

    /// Writes a hand-built WAL file: a valid header for `version`, then `body` verbatim.
    fn write_raw_wal(path: &std::path::Path, version: u32, body: &[u8]) {
        let mut bytes = Vec::with_capacity(HEADER_SIZE as usize + body.len());
        bytes.extend_from_slice(&MAGIC.to_le_bytes());
        bytes.extend_from_slice(&version.to_le_bytes());
        bytes.extend_from_slice(body);
        std::fs::write(path, bytes).unwrap();
    }

    #[test]
    fn test_reader_read_all() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        {
            let mut wal = WAL::create(&wal_path, SyncPolicy::SyncAll).unwrap();
            let records = vec![
                Record::Put {
                    key: Bytes::from("key1"),
                    value: Bytes::from("value1"),
                    seq: 1,
                },
                Record::Put {
                    key: Bytes::from("key2"),
                    value: Bytes::from("value2"),
                    seq: 2,
                },
                Record::Delete {
                    key: Bytes::from("key1"),
                    seq: 3,
                },
            ];
            wal.write_batch(&records).unwrap();
        }
        let mut reader = WALReader::open(&wal_path).unwrap();
        let records = reader.read_all().unwrap();
        assert_eq!(records.len(), 3);
        assert!(matches!(
            &records[0],
            Record::Put { key, value, seq: 1 } if key == "key1" && value == "value1"
        ));
        assert!(matches!(
            &records[1],
            Record::Put { key, value, seq: 2 } if key == "key2" && value == "value2"
        ));
        assert!(matches!(
            &records[2],
            Record::Delete { key, seq: 3 } if key == "key1"
        ));
    }

    #[test]
    fn test_reader_read_next() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        {
            let mut wal = WAL::create(&wal_path, SyncPolicy::SyncAll).unwrap();
            wal.write(&Record::Put {
                key: Bytes::from("key1"),
                value: Bytes::from("value1"),
                seq: 1,
            })
            .unwrap();
        }
        let mut reader = WALReader::open(&wal_path).unwrap();
        let record1 = reader.read_next().unwrap();
        assert!(matches!(
            &record1,
            Some(Record::Put { key, value, seq: 1 }) if key == "key1" && value == "value1"
        ));
        let offset_after_first = reader.offset();
        assert!(offset_after_first > HEADER_SIZE);
        let record2 = reader.read_next().unwrap();
        assert!(record2.is_none());
        // Reaching the end of the log must not move the tracked offset.
        assert_eq!(reader.offset(), offset_after_first);
    }

    #[test]
    fn test_reader_rejects_bad_magic_and_version() {
        let dir = tempdir().unwrap();
        let bad_magic = dir.path().join("bad_magic.wal");
        std::fs::write(&bad_magic, [0u8; 8]).unwrap();
        assert!(matches!(
            WALReader::open(&bad_magic),
            Err(ReaderError::InvalidFormat)
        ));

        let bad_version = dir.path().join("bad_version.wal");
        write_raw_wal(&bad_version, 3, &[]);
        assert!(matches!(
            WALReader::open(&bad_version),
            Err(ReaderError::InvalidFormat)
        ));
    }

    #[test]
    fn test_reader_short_header_is_io_error() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("short.wal");
        std::fs::write(&path, [0u8; 3]).unwrap();
        match WALReader::open(&path) {
            Err(ReaderError::Io(e)) => {
                assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
            }
            _ => panic!("expected an UnexpectedEof I/O error for a short header"),
        }
    }

    #[test]
    fn test_reader_empty_log() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("empty.wal");
        write_raw_wal(&path, VERSION_V1, &[]);
        let mut reader = WALReader::open(&path).unwrap();
        assert!(reader.read_next().unwrap().is_none());
        assert_eq!(reader.offset(), HEADER_SIZE);
    }

    #[test]
    fn test_reader_truncated_payload_is_io_error() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("truncated.wal");
        // The length prefix claims 100 payload bytes but only 3 follow.
        let mut body = 100u32.to_be_bytes().to_vec();
        body.extend_from_slice(b"abc");
        write_raw_wal(&path, VERSION_V1, &body);
        let mut reader = WALReader::open(&path).unwrap();
        match reader.read_next() {
            Err(ReaderError::Io(e)) => {
                assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
            }
            _ => panic!("expected an UnexpectedEof I/O error for a truncated payload"),
        }
    }

    #[test]
    fn test_reader_crc_mismatch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("crc.wal");
        // The stored CRC is deliberately wrong for a zero-length frame.
        let mut body = 0xDEAD_BEEFu32.to_le_bytes().to_vec();
        body.extend_from_slice(&0u32.to_be_bytes());
        write_raw_wal(&path, VERSION_V2, &body);
        let mut reader = WALReader::open(&path).unwrap();
        assert!(matches!(
            reader.read_next(),
            Err(ReaderError::CrcMismatch {
                expected: 0xDEAD_BEEF,
                ..
            })
        ));
    }
}