use std::fs::File;
use std::io::Read;
use std::path::Path;
use crate::error::{Result, WalError};
use crate::preamble::{PREAMBLE_SIZE, SegmentPreamble, WAL_PREAMBLE_MAGIC};
use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WalRecord};
/// Sequential reader over a single WAL segment file.
///
/// Records are read front-to-back; see `WalReader::next_record` for how the
/// end of the readable log is detected.
pub struct WalReader {
    /// Underlying segment file handle; every read advances its cursor.
    file: File,
    /// Byte position in the file. It starts just past the preamble (or at 0 when
    /// there is none) and is advanced by each successful `read_exact`.
    offset: u64,
    /// Preamble parsed by `open`. `None` when the file is shorter than a
    /// preamble or does not begin with `WAL_PREAMBLE_MAGIC`.
    segment_preamble: Option<SegmentPreamble>,
    /// Double-write buffer opened from the sibling `.dwb` file, if present and
    /// openable. It is consulted to recover records whose checksum fails.
    double_write: Option<crate::double_write::DoubleWriteBuffer>,
}
impl WalReader {
    /// Opens the WAL segment at `path` for sequential reading.
    ///
    /// If a sibling file with the `dwb` extension exists, it is opened as a
    /// double-write buffer used to recover records whose checksum fails. Failing
    /// to open that buffer is not fatal: a warning is logged and recovery is
    /// simply unavailable for this reader.
    ///
    /// If the segment starts with a preamble carrying `WAL_PREAMBLE_MAGIC`, it
    /// is parsed and reading begins right after it; otherwise reading begins at
    /// byte 0.
    ///
    /// # Errors
    ///
    /// Returns an error if the segment cannot be opened, if reading the preamble
    /// fails with anything other than an early EOF, or if a preamble carrying
    /// the magic fails to parse.
    pub fn open(path: &Path) -> Result<Self> {
        let mut file = File::open(path)?;
        let dwb_path = path.with_extension("dwb");
        let double_write = if dwb_path.exists() {
            match crate::double_write::DoubleWriteBuffer::open(
                &dwb_path,
                crate::double_write::DwbMode::Buffered,
            ) {
                Ok(dwb) => Some(dwb),
                Err(e) => {
                    // Best-effort: a broken DWB must not block replay, but the
                    // loss of torn-write recovery should be visible to operators.
                    tracing::warn!(
                        path = %dwb_path.display(),
                        error = %e,
                        "failed to open double-write buffer; torn-write recovery disabled"
                    );
                    None
                }
            }
        } else {
            None
        };
        let (segment_preamble, start_offset) = try_read_preamble(&mut file)?;
        Ok(Self {
            file,
            offset: start_offset,
            segment_preamble,
            double_write,
        })
    }

    /// Returns the segment preamble, or `None` if the file had none.
    pub fn segment_preamble(&self) -> Option<&SegmentPreamble> {
        self.segment_preamble.as_ref()
    }

    /// Reads the next record from the segment.
    ///
    /// Returns `Ok(None)` at the end of the readable log, which is any of:
    /// - an EOF (clean or partial) while reading a header or payload;
    /// - a header that fails validation;
    /// - a checksum mismatch that the double-write buffer cannot repair.
    ///
    /// Records whose logical type is unknown but not required are skipped.
    ///
    /// # Errors
    ///
    /// Returns I/O errors other than `UnexpectedEof`, and
    /// `WalError::UnknownRequiredRecordType` for an unknown record type that
    /// is flagged as required.
    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
        // A loop (not recursion) keeps stack use constant across long runs of
        // skipped optional records.
        loop {
            let mut header_buf = [0u8; HEADER_SIZE];
            match self.read_exact(&mut header_buf) {
                Ok(()) => {}
                Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    return Ok(None);
                }
                Err(e) => return Err(e),
            }
            let header = RecordHeader::from_bytes(&header_buf);
            // `offset` now sits just past the header, so this is the record start.
            if header.validate(self.offset - HEADER_SIZE as u64).is_err() {
                return Ok(None);
            }
            let mut payload = vec![0u8; header.payload_len as usize];
            if !payload.is_empty() {
                match self.read_exact(&mut payload) {
                    Ok(()) => {}
                    Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                        return Ok(None);
                    }
                    Err(e) => return Err(e),
                }
            }
            let record = WalRecord { header, payload };
            if record.verify_checksum().is_err() {
                // The header and payload bytes have already been consumed, so
                // the file cursor and `self.offset` both point past the damaged
                // record. Do not advance `offset` again on recovery; doing so
                // would desynchronise it from the file position.
                if let Some(dwb) = &mut self.double_write
                    && let Ok(Some(recovered)) = dwb.recover_record(header.lsn)
                {
                    tracing::info!(
                        lsn = header.lsn,
                        "recovered torn write from double-write buffer"
                    );
                    return Ok(Some(recovered));
                }
                return Ok(None);
            }
            let logical_type = record.logical_record_type();
            if RecordType::from_raw(logical_type).is_none() {
                if RecordType::is_required(logical_type) {
                    return Err(WalError::UnknownRequiredRecordType {
                        record_type: header.record_type,
                        lsn: header.lsn,
                    });
                }
                // Unknown but optional: skip it and read the next record.
                continue;
            }
            return Ok(Some(record));
        }
    }

    /// Consumes the reader and returns an iterator over its remaining records.
    pub fn records(self) -> WalRecordIter {
        WalRecordIter { reader: self }
    }

    /// Byte offset of the next unread byte, measured from the start of the
    /// file (including the preamble when present).
    pub fn offset(&self) -> u64 {
        self.offset
    }

    /// Fills `buf` completely from the file and advances `offset` by its length.
    ///
    /// On error `offset` is left unchanged.
    fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
        self.file.read_exact(buf)?;
        self.offset += buf.len() as u64;
        Ok(())
    }
}
/// Probes the start of `file` for a segment preamble.
///
/// Returns the parsed preamble together with the offset where records begin
/// (`PREAMBLE_SIZE`). If the file is too short to hold a preamble, or does not
/// start with `WAL_PREAMBLE_MAGIC`, the cursor is rewound to byte 0 and
/// `(None, 0)` is returned.
///
/// # Errors
///
/// Propagates non-EOF read errors, seek errors, and preamble parse errors.
fn try_read_preamble(file: &mut File) -> Result<(Option<SegmentPreamble>, u64)> {
    use std::io::{Seek, SeekFrom};

    let mut raw = [0u8; PREAMBLE_SIZE];
    let filled = match file.read_exact(&mut raw) {
        Ok(()) => true,
        // Too short for a preamble; `raw` contents are unspecified here.
        Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => false,
        Err(err) => return Err(WalError::Io(err)),
    };

    if filled && raw[0..4] == WAL_PREAMBLE_MAGIC {
        let preamble = SegmentPreamble::from_bytes(&raw, &WAL_PREAMBLE_MAGIC)?;
        return Ok((Some(preamble), PREAMBLE_SIZE as u64));
    }

    // No preamble: records start at the very beginning of the file.
    file.seek(SeekFrom::Start(0))?;
    Ok((None, 0))
}
/// Iterator over the records of a [`WalReader`], created by [`WalReader::records`].
///
/// Yields `Ok(record)` for each record and `Err(_)` for reader errors. Iteration
/// ends when the reader reports the end of the log. An error does not end
/// iteration by itself: calling `next` again asks the reader for the following
/// record.
pub struct WalRecordIter {
    /// Reader that supplies the records.
    reader: WalReader,
}
impl Iterator for WalRecordIter {
    type Item = Result<WalRecord>;

    /// Pulls the next record from the underlying reader.
    fn next(&mut self) -> Option<Self::Item> {
        // `Result<Option<T>>` -> `Option<Result<T>>`: `Ok(None)` ends iteration,
        // records become `Some(Ok(_))`, and errors become `Some(Err(_))`.
        self.reader.next_record().transpose()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::WalWriter;

    /// Creates a WAL at `path`, lets `fill` append records, then syncs and closes it.
    fn build_wal(path: &Path, fill: impl FnOnce(&mut WalWriter)) {
        let mut writer = WalWriter::open_without_direct_io(path).unwrap();
        fill(&mut writer);
        writer.sync().unwrap();
    }

    /// Reads every record from the WAL at `path`, panicking on any error.
    fn read_all(path: &Path) -> Vec<WalRecord> {
        WalReader::open(path)
            .unwrap()
            .records()
            .collect::<Result<Vec<_>>>()
            .unwrap()
    }

    #[test]
    fn write_then_read_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");
        build_wal(&path, |w| {
            w.append(RecordType::Put as u32, 1, 0, 0, b"first").unwrap();
            w.append(RecordType::Put as u32, 2, 1, 0, b"second").unwrap();
            w.append(RecordType::Delete as u32, 1, 0, 0, b"third").unwrap();
        });

        let records = read_all(&path);
        assert_eq!(records.len(), 3);
        let (first, second, third) = (&records[0], &records[1], &records[2]);

        assert_eq!(first.header.lsn, 1);
        assert_eq!(first.header.tenant_id, 1);
        assert_eq!(first.payload, b"first");

        assert_eq!(second.header.lsn, 2);
        assert_eq!(second.header.tenant_id, 2);
        assert_eq!(second.header.vshard_id, 1);
        assert_eq!(second.payload, b"second");

        assert_eq!(third.header.lsn, 3);
        assert_eq!(third.header.record_type, RecordType::Delete as u32);
        assert_eq!(third.payload, b"third");
    }

    #[test]
    fn empty_wal_yields_no_records() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");
        build_wal(&path, |_| {});

        assert!(read_all(&path).is_empty());
    }

    #[test]
    fn truncated_file_stops_at_committed_prefix() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");
        build_wal(&path, |w| {
            w.append(RecordType::Put as u32, 1, 0, 0, b"good-record").unwrap();
        });

        // Simulate a torn tail: bytes that cannot form a valid record.
        {
            use std::io::Write;
            let mut tail = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            tail.write_all(b"GARBAGE_PARTIAL_RECORD").unwrap();
        }

        let records = read_all(&path);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"good-record");
    }

    #[test]
    fn skip_many_unknown_optional_records_is_iterative() {
        // Type 99 has no `RecordType` variant; this test relies on it not being
        // flagged as required, so the reader skips it instead of erroring.
        const UNKNOWN_OPTIONAL: u32 = 99;
        const SKIP_COUNT: usize = 50_000;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("many_unknown.wal");
        build_wal(&path, |w| {
            for _ in 0..SKIP_COUNT {
                w.append(UNKNOWN_OPTIONAL, 1, 0, 0, b"skip-me").unwrap();
            }
            w.append(RecordType::Put as u32, 1, 0, 0, b"keep-me").unwrap();
        });

        let records = read_all(&path);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].payload, b"keep-me");
    }
}