use super::record::{WalRecord, WAL_MAGIC, WAL_VERSION};
use std::fs::File;
use std::io::{self, BufReader, Read};
use std::path::Path;
/// Handle to a write-ahead-log file whose header has already been validated.
///
/// Obtained from `WalReader::open`; `WalReader::iter` consumes it to walk the
/// records stored after the header.
///
/// `Debug` is derived so that `io::Result<WalReader>` works with
/// `unwrap_err` / `expect_err` and `{:?}` formatting.
#[derive(Debug)]
pub struct WalReader {
    /// Buffered handle on the WAL file.
    reader: BufReader<File>,
    /// Byte offset, from the start of the file, that `reader` has reached.
    position: u64,
}
impl WalReader {
    /// Opens the WAL file at `path` and validates its 8-byte header.
    ///
    /// Header layout as checked here: bytes `0..4` must equal `WAL_MAGIC` and
    /// byte `4` must equal `WAL_VERSION`. Bytes `5..8` are consumed but not
    /// inspected.
    ///
    /// # Errors
    ///
    /// * Any error from opening the file or reading the header, including
    ///   `UnexpectedEof` when the file is shorter than the header.
    /// * `InvalidData` when the magic bytes or the version byte do not match.
    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let mut reader = BufReader::new(File::open(path)?);

        let mut header = [0u8; 8];
        reader.read_exact(&mut header)?;

        if &header[0..4] != WAL_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid WAL magic bytes",
            ));
        }

        let version = header[4];
        if version != WAL_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported WAL version: {}", version),
            ));
        }

        Ok(Self {
            reader,
            // The first record starts right after the header; usize -> u64
            // is lossless here.
            position: header.len() as u64,
        })
    }

    /// Consumes the reader and returns an iterator over the records that
    /// follow the header.
    pub fn iter(self) -> WalIterator {
        WalIterator {
            reader: self.reader,
            position: self.position,
            failed: false,
        }
    }
}

/// Iterator over the records of a WAL file.
///
/// Each item is `Ok((offset, record))`, where `offset` is the byte offset in
/// the file at which `record` starts, or `Err` when decoding a record fails.
///
/// At most one `Err` is ever yielded: after a failed read the stream is left
/// at an unknown offset inside a record, so decoding cannot safely resume and
/// every later call to `next` returns `None`.
#[derive(Debug)]
pub struct WalIterator {
    /// Buffered handle on the WAL file.
    reader: BufReader<File>,
    /// Byte offset at which the next record starts.
    position: u64,
    /// Set once a read has failed; the iterator yields nothing afterwards.
    failed: bool,
}

impl Iterator for WalIterator {
    type Item = io::Result<(u64, WalRecord)>;

    /// Decodes the next record.
    ///
    /// Returns `None` when `WalRecord::read` reports `Ok(None)` (presumably a
    /// clean end of file; confirm against `WalRecord::read`) or after an
    /// earlier call has already yielded an error.
    fn next(&mut self) -> Option<Self::Item> {
        /// `Read` adapter that tallies how many bytes pass through it, so the
        /// encoded length of a record is known without an extra seek.
        struct CountingReader<'a, R> {
            inner: &'a mut R,
            count: u64,
        }

        impl<'a, R: Read> Read for CountingReader<'a, R> {
            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
                let n = self.inner.read(buf)?;
                self.count += n as u64;
                Ok(n)
            }
        }

        // Never resume decoding after a failure: the stream offset no longer
        // lines up with a record boundary, and a persistent I/O error would
        // otherwise make this iterator yield `Err` forever.
        if self.failed {
            return None;
        }

        let start_pos = self.position;
        let mut counter = CountingReader {
            inner: &mut self.reader,
            count: 0,
        };

        match WalRecord::read(&mut counter) {
            Ok(Some(record)) => {
                self.position += counter.count;
                Some(Ok((start_pos, record)))
            }
            Ok(None) => None,
            Err(e) => {
                self.failed = true;
                Some(Err(e))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::super::writer::WalWriter;
    use super::*;
    use std::io::{self, ErrorKind};
    use std::path::{Path, PathBuf};

    /// Removes the wrapped file on drop so a test cleans up its WAL even when
    /// an assertion panics.
    struct FileGuard {
        path: PathBuf,
    }

    impl Drop for FileGuard {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Returns a WAL path in the system temp directory that is unique per
    /// test name and process, plus a guard that deletes the file afterwards.
    /// Any stale file left at that path is removed first.
    fn temp_wal(name: &str) -> (FileGuard, PathBuf) {
        let path =
            std::env::temp_dir().join(format!("rb_wal_reader_{}_{}.wal", name, std::process::id()));
        let guard = FileGuard { path: path.clone() };
        let _ = std::fs::remove_file(&path);
        (guard, path)
    }

    /// Opens the WAL at `path` and collects everything its iterator yields.
    fn read_all(path: &Path) -> Vec<io::Result<(u64, WalRecord)>> {
        WalReader::open(path)
            .expect("WAL header should be valid")
            .iter()
            .collect()
    }

    #[test]
    fn test_read_empty_wal() {
        let (_guard, path) = temp_wal("empty");
        // Opening and immediately dropping the writer leaves a WAL with no records.
        drop(WalWriter::open(&path).unwrap());

        assert!(read_all(&path).is_empty());
    }

    #[test]
    fn test_read_single_record() {
        let (_guard, path) = temp_wal("single");
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer.append(&WalRecord::Begin { tx_id: 42 }).unwrap();
        }

        let records = read_all(&path);
        assert_eq!(records.len(), 1);

        let (lsn, record) = records[0].as_ref().unwrap();
        // The first record starts right after the 8-byte header.
        assert_eq!(*lsn, 8);
        assert_eq!(*record, WalRecord::Begin { tx_id: 42 });
    }

    #[test]
    fn test_read_multiple_records() {
        let (_guard, path) = temp_wal("multi");
        let expected = vec![
            WalRecord::Begin { tx_id: 1 },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 10,
                data: vec![1, 2, 3],
            },
            WalRecord::Commit { tx_id: 1 },
        ];
        {
            let mut writer = WalWriter::open(&path).unwrap();
            for record in &expected {
                writer.append(record).unwrap();
            }
        }

        let decoded: Vec<WalRecord> = read_all(&path)
            .into_iter()
            .map(|entry| entry.unwrap().1)
            .collect();
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_lsn_tracking() {
        let (_guard, path) = temp_wal("lsn");
        let (lsn1, lsn2, lsn3);
        {
            let mut writer = WalWriter::open(&path).unwrap();
            lsn1 = writer.append(&WalRecord::Begin { tx_id: 1 }).unwrap();
            lsn2 = writer.append(&WalRecord::Checkpoint { lsn: 100 }).unwrap();
            lsn3 = writer.append(&WalRecord::Rollback { tx_id: 1 }).unwrap();
        }

        // Offsets reported by the reader must match those handed out by the writer.
        let records = read_all(&path);
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].as_ref().unwrap().0, lsn1);
        assert_eq!(records[1].as_ref().unwrap().0, lsn2);
        assert_eq!(records[2].as_ref().unwrap().0, lsn3);
    }

    #[test]
    fn test_invalid_magic() {
        let (_guard, path) = temp_wal("badmagic");
        std::fs::write(&path, b"BAAD0000").unwrap();

        // `.err().expect(..)` rather than `expect_err`, which would require
        // `WalReader: Debug`.
        let err = WalReader::open(&path)
            .err()
            .expect("a bad magic must be rejected");
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }

    #[test]
    fn test_invalid_version() {
        let (_guard, path) = temp_wal("badver");
        let mut header = Vec::new();
        header.extend_from_slice(WAL_MAGIC);
        header.push(99);
        header.extend_from_slice(&[0u8; 3]);
        std::fs::write(&path, &header).unwrap();

        let err = WalReader::open(&path)
            .err()
            .expect("an unknown version must be rejected");
        assert_eq!(err.kind(), ErrorKind::InvalidData);
        // The message names the offending version, which shows the version
        // check (not the magic check) produced the error.
        assert!(err.to_string().contains("99"));
    }

    #[test]
    fn test_truncated_header() {
        let (_guard, path) = temp_wal("shorthdr");
        // Fewer bytes than the 8-byte header.
        std::fs::write(&path, [0u8; 3]).unwrap();

        let err = WalReader::open(&path)
            .err()
            .expect("a truncated header must be rejected");
        assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_read_large_page_write() {
        let (_guard, path) = temp_wal("large");
        // 4096 bytes cycling through 0..=255.
        let large_data: Vec<u8> = (0..=255u8).cycle().take(4096).collect();
        {
            let mut writer = WalWriter::open(&path).unwrap();
            writer
                .append(&WalRecord::PageWrite {
                    tx_id: 1,
                    page_id: 0,
                    data: large_data.clone(),
                })
                .unwrap();
        }

        let records = read_all(&path);
        assert_eq!(records.len(), 1);
        match &records[0].as_ref().unwrap().1 {
            WalRecord::PageWrite { data, .. } => assert_eq!(*data, large_data),
            _ => panic!("Expected PageWrite"),
        }
    }
}