use crate::types::{MeruError, Result};
use crc32fast::Hasher as Crc32;
use crate::wal::format::{RecordType, BLOCK_SIZE, HEADER_SIZE, RECYCLABLE_HEADER_SIZE};
/// Random-access byte source that a `WalReader` decodes the log from.
pub trait WalSource: Send {
    /// Reads up to `buf.len()` bytes starting at byte `offset` into `buf` and
    /// returns how many bytes were copied.
    ///
    /// `WalReader` treats a count smaller than the requested length as a
    /// truncated header/payload, and a count of `0` at a header position as
    /// the end of the log. Implementations should therefore only return a
    /// short count when the read reaches the end of the source.
    fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<usize>;
    /// Total length of the source in bytes.
    fn size(&self) -> Result<u64>;
}
pub struct VecSource {
data: Vec<u8>,
}
impl VecSource {
pub fn new(data: Vec<u8>) -> Self {
Self { data }
}
}
impl WalSource for VecSource {
fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<usize> {
let start = offset as usize;
if start >= self.data.len() {
return Ok(0);
}
let end = (start + buf.len()).min(self.data.len());
let n = end - start;
buf[..n].copy_from_slice(&self.data[start..end]);
Ok(n)
}
fn size(&self) -> Result<u64> {
Ok(self.data.len() as u64)
}
}
/// [`WalSource`] that reads a WAL from a file on disk.
pub struct FileSource {
    file: std::fs::File,
    /// File length captured by [`FileSource::open`]; it is not refreshed if
    /// the file grows afterwards.
    size: u64,
}

impl FileSource {
    /// Opens the file at `path` for reading and records its current length.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or its metadata cannot
    /// be read.
    pub fn open(path: &std::path::Path) -> Result<Self> {
        let file = std::fs::File::open(path)?;
        let size = file.metadata()?.len();
        Ok(Self { file, size })
    }
}

impl WalSource for FileSource {
    /// Reads from `offset` until `buf` is full or end of file is reached and
    /// returns the number of bytes read.
    ///
    /// A single `Read::read` call may return fewer bytes than requested even
    /// when more data is available. `WalReader` would misreport such a short
    /// read as a truncated record, so this keeps reading until the buffer is
    /// full or the file ends. `Interrupted` errors are retried.
    fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<usize> {
        use std::io::{ErrorKind, Read, Seek, SeekFrom};
        self.file.seek(SeekFrom::Start(offset))?;
        let mut filled = 0;
        while filled < buf.len() {
            match self.file.read(&mut buf[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(e) if e.kind() == ErrorKind::Interrupted => {}
                Err(e) => return Err(e.into()),
            }
        }
        Ok(filled)
    }

    /// File length as observed when the source was opened.
    fn size(&self) -> Result<u64> {
        Ok(self.size)
    }
}
/// Sequential reader that decodes logical records from a block-structured WAL.
pub struct WalReader {
    /// Byte source the log is read from.
    source: Box<dyn WalSource>,
    /// Absolute byte offset of the next read.
    file_offset: u64,
    /// Offset within the current `BLOCK_SIZE` block; advanced in step with
    /// `file_offset` and reset to 0 when the rest of a block is skipped.
    block_offset: usize,
    /// Whether records use the recyclable header, which embeds a log number.
    recyclable: bool,
    /// Log number every recyclable record must carry; only consulted when
    /// `recyclable` is true.
    expected_log_number: u32,
    /// Set once reading has stopped; afterwards no more records are produced.
    eof: bool,
}
impl WalReader {
    /// Creates a reader for a non-recyclable log.
    ///
    /// Records are expected to use the `HEADER_SIZE` header. A recyclable
    /// record type in the stream is reported as corruption.
    pub fn new<S: WalSource + 'static>(source: S) -> Self {
        Self {
            source: Box::new(source),
            file_offset: 0,
            block_offset: 0,
            recyclable: false,
            expected_log_number: 0,
            eof: false,
        }
    }

    /// Creates a reader for a recyclable log.
    ///
    /// Records are expected to use the `RECYCLABLE_HEADER_SIZE` header, which
    /// embeds a log number. The first record whose log number differs from
    /// `expected_log_number` ends the log. Such a record is presumably stale
    /// data left over from a previous use of the recycled file.
    pub fn new_recyclable<S: WalSource + 'static>(source: S, expected_log_number: u32) -> Self {
        Self {
            recyclable: true,
            expected_log_number,
            ..Self::new(source)
        }
    }

    /// Returns an iterator that yields the remaining logical records, each as
    /// a `Result<Vec<u8>>`.
    pub fn records(&mut self) -> RecordIter<'_> {
        RecordIter { reader: self }
    }

    /// Reads the next logical record, reassembling `First`/`Middle`/`Last`
    /// fragments into one payload.
    ///
    /// Returns:
    /// * `Some(Ok(payload))` for a complete record.
    /// * `Some(Err(MeruError::Corruption(..)))` when fragment types arrive in
    ///   an invalid order. Any partially assembled data is discarded, and the
    ///   next call continues after the offending record.
    /// * `Some(Err(e))` for any non-corruption error from the source (e.g.
    ///   I/O). The reader is exhausted afterwards.
    /// * `None` once the log ends. A physical-record corruption (truncated
    ///   header/payload, bad record type, CRC mismatch), a log-number
    ///   mismatch, and an unfinished fragment sequence at end of data all end
    ///   the log.
    fn next_record(&mut self) -> Option<Result<Vec<u8>>> {
        if self.eof {
            return None;
        }
        let mut assembled: Vec<u8> = Vec::new();
        let mut in_fragment = false;
        loop {
            let (rtype, payload) = match self.read_physical_record() {
                Ok(Some(record)) => record,
                // End of data, a stale recyclable record, or a damaged physical
                // record. Stop here and drop any partially assembled fragment;
                // this tolerates a write torn at the tail of the log.
                // NOTE(review): this also silently truncates at mid-log corruption.
                Ok(None) | Err(MeruError::Corruption(_)) => {
                    self.eof = true;
                    return None;
                }
                // Any other failure (e.g. I/O) says nothing about where the log
                // ends. Swallowing it would make recovery silently return a
                // truncated log, so it is surfaced to the caller.
                Err(e) => {
                    self.eof = true;
                    return Some(Err(e));
                }
            };
            match rtype {
                RecordType::Full | RecordType::RecyclableFull => {
                    if in_fragment {
                        return Some(Err(MeruError::Corruption(
                            "Full record while assembling fragment".into(),
                        )));
                    }
                    return Some(Ok(payload));
                }
                RecordType::First | RecordType::RecyclableFirst => {
                    if in_fragment {
                        return Some(Err(MeruError::Corruption(
                            "First record while already in fragment".into(),
                        )));
                    }
                    assembled = payload;
                    in_fragment = true;
                }
                RecordType::Middle | RecordType::RecyclableMiddle => {
                    if !in_fragment {
                        return Some(Err(MeruError::Corruption(
                            "Middle record without First".into(),
                        )));
                    }
                    assembled.extend_from_slice(&payload);
                }
                RecordType::Last | RecordType::RecyclableLast => {
                    if !in_fragment {
                        return Some(Err(MeruError::Corruption(
                            "Last record without First".into(),
                        )));
                    }
                    assembled.extend_from_slice(&payload);
                    return Some(Ok(assembled));
                }
            }
        }
    }

    /// Advances past whatever remains of the current block, so the next read
    /// starts at the beginning of the following block.
    fn skip_rest_of_block(&mut self) {
        let skip = BLOCK_SIZE - self.block_offset;
        self.file_offset += skip as u64;
        self.block_offset = 0;
    }

    /// Reads one physical record at the current position and advances past it.
    ///
    /// Block handling has two cases. If fewer than `header_size` bytes remain
    /// in the current block, they are skipped as trailer padding. An all-zero
    /// header (zero CRC, length and type) causes the rest of the block to be
    /// skipped.
    ///
    /// Returns `Ok(None)` in two cases: when the source has no more bytes at a
    /// header position, and, for recyclable logs, when the record's embedded
    /// log number differs from `expected_log_number`.
    ///
    /// # Errors
    /// Returns `MeruError::Corruption` in these cases:
    /// * a truncated header or payload;
    /// * an unknown record type, or one that does not match the log's
    ///   recyclable mode;
    /// * a CRC mismatch.
    ///
    /// Errors from the source itself are propagated unchanged.
    fn read_physical_record(&mut self) -> Result<Option<(RecordType, Vec<u8>)>> {
        let header_size = if self.recyclable {
            RECYCLABLE_HEADER_SIZE
        } else {
            HEADER_SIZE
        };
        loop {
            // Not enough room left for a header: the remainder is trailer padding.
            if BLOCK_SIZE - self.block_offset < header_size {
                self.skip_rest_of_block();
            }

            let mut header = [0u8; RECYCLABLE_HEADER_SIZE];
            let n = self
                .source
                .read_at(self.file_offset, &mut header[..header_size])?;
            if n == 0 {
                return Ok(None);
            }
            if n < header_size {
                return Err(MeruError::Corruption("truncated WAL header".into()));
            }

            // Header layout: CRC (u32 LE), payload length (u16 LE), type byte,
            // then (recyclable logs only) the log number (u32 LE).
            let stored_crc = u32::from_le_bytes(header[..4].try_into().unwrap());
            let length = u16::from_le_bytes(header[4..6].try_into().unwrap()) as usize;
            let type_byte = header[6];

            // An all-zero header is treated as padding: resume at the next block.
            if stored_crc == 0 && length == 0 && type_byte == 0 {
                self.skip_rest_of_block();
                continue;
            }

            let rtype = RecordType::from_byte(type_byte).ok_or_else(|| {
                MeruError::Corruption(format!("unknown record type {type_byte:#x}"))
            })?;
            if self.recyclable {
                if !rtype.is_recyclable() {
                    return Err(MeruError::Corruption(format!(
                        "non-recyclable record type {type_byte:#x} in recyclable log"
                    )));
                }
                let embedded_log =
                    u32::from_le_bytes(header[7..RECYCLABLE_HEADER_SIZE].try_into().unwrap());
                // A record stamped with a different log number ends this log.
                if embedded_log != self.expected_log_number {
                    return Ok(None);
                }
            } else if rtype.is_recyclable() {
                return Err(MeruError::Corruption(format!(
                    "recyclable record type {type_byte:#x} in non-recyclable log"
                )));
            }

            let payload_offset = self.file_offset + header_size as u64;
            let mut payload = vec![0u8; length];
            if length > 0 {
                let n = self.source.read_at(payload_offset, &mut payload)?;
                if n < length {
                    return Err(MeruError::Corruption(format!(
                        "truncated WAL payload: need {length}, got {n}"
                    )));
                }
            }

            // The checksum covers the type byte followed by the payload.
            let mut hasher = Crc32::new();
            hasher.update(&[type_byte]);
            hasher.update(&payload);
            let computed_crc = hasher.finalize();
            if computed_crc != stored_crc {
                return Err(MeruError::Corruption(format!(
                    "WAL CRC mismatch: stored {stored_crc:#x}, computed {computed_crc:#x}"
                )));
            }

            let total = header_size + length;
            self.file_offset += total as u64;
            self.block_offset = (self.block_offset + total) % BLOCK_SIZE;
            return Ok(Some((rtype, payload)));
        }
    }
}
pub struct RecordIter<'a> {
reader: &'a mut WalReader,
}
impl<'a> Iterator for RecordIter<'a> {
type Item = Result<Vec<u8>>;
fn next(&mut self) -> Option<Self::Item> {
self.reader.next_record()
}
}