use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::path::Path;
use rkyv::rancor::Error as RkyvError;
use super::entry::PerCoreWalEntry;
use super::error::PerCoreWalError;
/// Bytes preceding every record payload: a 4-byte little-endian payload
/// length followed by a 4-byte little-endian CRC32C of the payload.
const RECORD_HEADER_SIZE: u64 = 4 + 4;
/// Outcome of attempting to read one record from a per-core WAL segment.
///
/// Framing problems are ordinary values rather than errors so that recovery
/// code can decide where the intact prefix of the log ends.
#[derive(Debug)]
pub enum WalReadResult {
    /// A complete, checksum-valid, successfully decoded record.
    Entry(PerCoreWalEntry),
    /// No bytes remain at the current position.
    Eof,
    /// The record header or payload extends past the end of the file.
    TornWrite {
        /// Byte offset at which the incomplete record starts.
        position: u64,
        /// Human-readable description of what was missing.
        reason: String,
    },
    /// The payload's CRC32C does not match the value stored in the header.
    ChecksumMismatch {
        /// Byte offset at which the record starts.
        position: u64,
        /// CRC stored in the record header.
        expected: u32,
        /// CRC computed over the payload bytes actually read.
        actual: u32,
    },
    /// The record header is structurally invalid (e.g. an implausible length).
    Corrupted {
        /// Byte offset at which the record starts.
        position: u64,
        /// Human-readable description of the problem.
        reason: String,
    },
}
/// Sequential reader over a single core's WAL segment file.
///
/// Tracks its own byte offset so callers can resume from, or cut the log at,
/// a record boundary.
pub struct PerCoreWalReader {
    /// Buffered handle to the segment file.
    reader: BufReader<File>,
    /// Identifier of the core that owns this segment; attached to iterator errors.
    core_id: usize,
    /// Byte offset consumed so far (the start of the next record while reads succeed).
    position: u64,
    /// Segment length captured at open time; bytes appended later are not seen.
    file_len: u64,
}
impl PerCoreWalReader {
    /// Upper bound on a single record's payload length. A larger length prefix
    /// is reported as corruption instead of being allocated.
    const MAX_ENTRY_LEN: u64 = 256 * 1024 * 1024;

    /// Opens the WAL segment at `path` for core `core_id`, positioned at the
    /// start of the file.
    ///
    /// The file length is captured once here; bytes appended afterwards are
    /// not visible to this reader.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or its metadata read.
    pub fn open(core_id: usize, path: &Path) -> Result<Self, PerCoreWalError> {
        Self::open_from(core_id, path, 0)
    }

    /// Opens the WAL segment at `path` for core `core_id`, positioned at byte
    /// offset `position`.
    ///
    /// `position` should be a record boundary, such as a value previously
    /// obtained from [`Self::position`] or [`Self::find_valid_end`]. Starting
    /// mid-record is likely to surface as a torn write, checksum mismatch, or
    /// corruption. A `position` at or past the end of the file reads as
    /// [`WalReadResult::Eof`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened, its metadata read, or
    /// the seek fails.
    pub fn open_from(core_id: usize, path: &Path, position: u64) -> Result<Self, PerCoreWalError> {
        let file = File::open(path)?;
        let file_len = file.metadata()?.len();
        let mut reader = BufReader::new(file);
        reader.seek(SeekFrom::Start(position))?;
        Ok(Self {
            core_id,
            reader,
            position,
            file_len,
        })
    }

    /// Identifier of the core whose segment this reader is reading.
    #[must_use]
    pub fn core_id(&self) -> usize {
        self.core_id
    }

    /// Byte offset the reader has consumed up to.
    ///
    /// After a successful [`Self::read_next`] this is the start of the next
    /// record.
    #[must_use]
    pub fn position(&self) -> u64 {
        self.position
    }

    /// Length of the segment file as observed when the reader was opened.
    #[must_use]
    pub fn file_len(&self) -> u64 {
        self.file_len
    }

    /// Reads the next record and advances past it.
    ///
    /// On-disk record layout:
    /// - a 4-byte little-endian payload length,
    /// - a 4-byte little-endian CRC32C of the payload,
    /// - the payload itself (an rkyv-serialized [`PerCoreWalEntry`]).
    ///
    /// Framing problems are reported as `Ok` variants so callers can locate
    /// the end of the intact log:
    /// - [`WalReadResult::Eof`]: no bytes remain.
    /// - [`WalReadResult::TornWrite`]: the header or payload is truncated.
    /// - [`WalReadResult::Corrupted`]: the length prefix is implausible.
    /// - [`WalReadResult::ChecksumMismatch`]: the payload fails its CRC.
    ///
    /// In every non-`Eof` case the reported `position` is the start of the
    /// offending record.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if reading fails. Returns
    /// [`PerCoreWalError::Deserialization`] if a CRC-valid payload cannot be
    /// decoded.
    pub fn read_next(&mut self) -> Result<WalReadResult, PerCoreWalError> {
        let remaining = self.file_len.saturating_sub(self.position);
        if remaining == 0 {
            return Ok(WalReadResult::Eof);
        }
        if remaining < RECORD_HEADER_SIZE {
            return Ok(WalReadResult::TornWrite {
                position: self.position,
                reason: format!(
                    "incomplete header: only {remaining} bytes remaining, need {RECORD_HEADER_SIZE}"
                ),
            });
        }

        let record_start = self.position;

        let mut len_bytes = [0u8; 4];
        self.reader.read_exact(&mut len_bytes)?;
        let len = u64::from(u32::from_le_bytes(len_bytes));
        self.position += 4;

        let mut crc_bytes = [0u8; 4];
        self.reader.read_exact(&mut crc_bytes)?;
        let expected_crc = u32::from_le_bytes(crc_bytes);
        self.position += 4;

        // Reject absurd lengths before allocating. Otherwise a corrupted
        // length prefix could request up to 4 GiB.
        if len > Self::MAX_ENTRY_LEN {
            return Ok(WalReadResult::Corrupted {
                position: record_start,
                reason: format!("entry length {len} exceeds 256 MiB — likely corrupted"),
            });
        }

        let data_remaining = self.file_len.saturating_sub(self.position);
        if data_remaining < len {
            return Ok(WalReadResult::TornWrite {
                position: record_start,
                reason: format!(
                    "incomplete data: only {data_remaining} bytes remaining, need {len}"
                ),
            });
        }

        // `len` is bounded by MAX_ENTRY_LEN, so this conversion can only fail
        // on targets whose address space is smaller than 256 MiB.
        let Ok(data_len) = usize::try_from(len) else {
            return Ok(WalReadResult::Corrupted {
                position: record_start,
                reason: format!("entry length {len} does not fit in memory on this platform"),
            });
        };
        let mut data = vec![0u8; data_len];
        self.reader.read_exact(&mut data)?;
        self.position += len;

        let actual_crc = crc32c::crc32c(&data);
        if actual_crc != expected_crc {
            return Ok(WalReadResult::ChecksumMismatch {
                position: record_start,
                expected: expected_crc,
                actual: actual_crc,
            });
        }

        rkyv::from_bytes::<PerCoreWalEntry, RkyvError>(&data)
            .map(WalReadResult::Entry)
            .map_err(|e| PerCoreWalError::Deserialization(e.to_string()))
    }

    /// Reads entries from the current position until the first non-entry
    /// result.
    ///
    /// Reading stops silently at end of file. It also stops silently at the
    /// first torn write, checksum mismatch, or corrupted record; that record
    /// and everything after it are not returned. Use [`Self::read_next`] to
    /// find out why reading stopped.
    ///
    /// # Errors
    ///
    /// Propagates I/O and deserialization errors from [`Self::read_next`].
    pub fn read_all(&mut self) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
        let mut entries = Vec::new();
        while let WalReadResult::Entry(entry) = self.read_next()? {
            entries.push(entry);
        }
        Ok(entries)
    }

    /// Reads entries whose `epoch` is at most `max_epoch`, stopping at the
    /// first entry beyond it.
    ///
    /// The first over-epoch entry is not consumed. The reader is rewound to
    /// that entry's start, so the next read returns it instead of silently
    /// skipping it. Entries after it are not examined, which assumes epochs
    /// do not decrease within a segment. Like [`Self::read_all`], this also
    /// stops silently at end of file or at the first torn or corrupt record.
    ///
    /// # Errors
    ///
    /// Propagates I/O and deserialization errors from [`Self::read_next`],
    /// and I/O errors from rewinding the file.
    pub fn read_up_to_epoch(
        &mut self,
        max_epoch: u64,
    ) -> Result<Vec<PerCoreWalEntry>, PerCoreWalError> {
        let mut entries = Vec::new();
        loop {
            let entry_start = self.position;
            let WalReadResult::Entry(entry) = self.read_next()? else {
                break;
            };
            if entry.epoch > max_epoch {
                // Un-read this entry so it remains available to later reads.
                self.reader.seek(SeekFrom::Start(entry_start))?;
                self.position = entry_start;
                break;
            }
            entries.push(entry);
        }
        Ok(entries)
    }

    /// Scans forward from the current position and returns the byte offset
    /// just past the last intact record.
    ///
    /// Scanning stops at end of file or at the first torn, checksum-mismatched,
    /// or corrupted record. The returned offset could serve, for example, as a
    /// truncation point during recovery. The reader's own position is left
    /// wherever scanning stopped, which is not necessarily the returned offset.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors. A CRC-valid record that fails to deserialize is
    /// also returned as an error rather than treated as the end of the valid
    /// region.
    pub fn find_valid_end(&mut self) -> Result<u64, PerCoreWalError> {
        // Only advanced after a fully validated entry, so on any framing
        // problem it still points at the start of the offending record.
        let mut valid_end = self.position;
        while let WalReadResult::Entry(_) = self.read_next()? {
            valid_end = self.position;
        }
        Ok(valid_end)
    }
}
/// Iterates over entries, converting framing problems reported by
/// [`PerCoreWalReader::read_next`] into [`PerCoreWalError`] values tagged with
/// this reader's core id.
///
/// Some failures leave the byte stream no longer known to be aligned on a
/// record boundary: a torn write, a corrupted length prefix, or an I/O error.
/// After yielding one of these, the reader is exhausted so the next call
/// returns `None`. Without this, a torn tail would be reported again forever,
/// and adapters such as `filter_map(Result::ok)` would never terminate.
/// Checksum mismatches and deserialization failures consume the whole
/// declared record, so iteration continues past them.
impl Iterator for PerCoreWalReader {
    type Item = Result<PerCoreWalEntry, PerCoreWalError>;

    fn next(&mut self) -> Option<Self::Item> {
        let core_id = self.core_id;
        let (item, framing_lost) = match self.read_next() {
            Ok(WalReadResult::Eof) => return None,
            Ok(WalReadResult::Entry(entry)) => (Ok(entry), false),
            Ok(WalReadResult::ChecksumMismatch {
                position,
                expected,
                actual,
            }) => (
                Err(PerCoreWalError::ChecksumMismatch {
                    core_id,
                    position,
                    expected,
                    actual,
                }),
                false,
            ),
            Ok(WalReadResult::TornWrite { position, reason }) => (
                Err(PerCoreWalError::TornWrite {
                    core_id,
                    position,
                    reason,
                }),
                true,
            ),
            Ok(WalReadResult::Corrupted { position, reason }) => (
                Err(PerCoreWalError::Corrupted {
                    core_id,
                    position,
                    reason,
                }),
                true,
            ),
            Err(e) => {
                // `read_next` only fails with I/O errors, where the amount read
                // is unspecified, or with deserialization errors, which occur
                // after the full record was consumed.
                let framing_lost = !matches!(e, PerCoreWalError::Deserialization(_));
                (Err(e), framing_lost)
            }
        };
        if framing_lost {
            // Make every subsequent `read_next` report Eof.
            self.position = self.file_len;
        }
        Some(item)
    }
}
/// Shows the reader's identity and progress. The file handle is omitted,
/// which is signalled by the trailing `..`.
impl std::fmt::Debug for PerCoreWalReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PerCoreWalReader");
        out.field("core_id", &self.core_id);
        out.field("position", &self.position);
        out.field("file_len", &self.file_len);
        out.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
// Unit tests for `PerCoreWalReader`. Segments are produced with the real
// `CoreWalWriter`, so the reader is exercised against the writer's actual
// on-disk output.
use super::*;
use crate::per_core_wal::writer::CoreWalWriter;
use tempfile::TempDir;
// Writes a segment with three puts: key1 and key2 at epoch 1, key3 at epoch 2.
// The writer is dropped at the end of the inner block, before any reading.
// Callers bind the returned `TempDir` (as `_temp_dir`, not `_`) to keep it
// alive for the test; presumably it removes the directory on drop.
fn setup_test_segment(core_id: usize) -> (TempDir, std::path::PathBuf) {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join(format!("wal-{core_id}.log"));
{
let mut writer = CoreWalWriter::new(core_id, &path).unwrap();
writer.set_epoch(1);
writer.append_put(b"key1", b"value1").unwrap();
writer.append_put(b"key2", b"value2").unwrap();
writer.set_epoch(2);
writer.append_put(b"key3", b"value3").unwrap();
writer.sync().unwrap();
}
(temp_dir, path)
}
// A freshly opened reader starts at offset 0 and sees a non-empty file.
#[test]
fn test_reader_open() {
let (_temp_dir, path) = setup_test_segment(0);
let reader = PerCoreWalReader::open(0, &path).unwrap();
assert_eq!(reader.core_id(), 0);
assert_eq!(reader.position(), 0);
assert!(reader.file_len() > 0);
}
// All three records come back in write order with their epochs intact.
#[test]
fn test_read_all() {
let (_temp_dir, path) = setup_test_segment(0);
let mut reader = PerCoreWalReader::open(0, &path).unwrap();
let entries = reader.read_all().unwrap();
assert_eq!(entries.len(), 3);
assert!(entries[0].is_put());
assert_eq!(entries[0].key(), Some(b"key1".as_slice()));
assert_eq!(entries[0].epoch, 1);
assert_eq!(entries[2].epoch, 2);
}
// With max_epoch = 1 only the two epoch-1 records (key1, key2) are returned.
#[test]
fn test_read_up_to_epoch() {
let (_temp_dir, path) = setup_test_segment(0);
let mut reader = PerCoreWalReader::open(0, &path).unwrap();
let entries = reader.read_up_to_epoch(1).unwrap();
assert_eq!(entries.len(), 2); }
// The Iterator impl yields all three entries with no error.
#[test]
fn test_iterator() {
let (_temp_dir, path) = setup_test_segment(0);
let reader = PerCoreWalReader::open(0, &path).unwrap();
let entries: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(entries.len(), 3);
}
// Resuming a second reader at the offset just past the first record yields
// the remaining two records.
#[test]
fn test_open_from_position() {
let (_temp_dir, path) = setup_test_segment(0);
let mut reader1 = PerCoreWalReader::open(0, &path).unwrap();
let _ = reader1.read_next().unwrap();
let pos_after_first = reader1.position();
let mut reader2 = PerCoreWalReader::open_from(0, &path, pos_after_first).unwrap();
let entries = reader2.read_all().unwrap();
assert_eq!(entries.len(), 2); }
// A segment with no appended records reads as Eof on the first call. This
// expects the freshly created file to be zero-length, because `read_next`
// only returns Eof when no bytes remain.
#[test]
fn test_empty_segment() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("wal-0.log");
{
let _writer = CoreWalWriter::new(0, &path).unwrap();
}
let mut reader = PerCoreWalReader::open(0, &path).unwrap();
match reader.read_next().unwrap() {
WalReadResult::Eof => {}
other => panic!("Expected Eof, got {other:?}"),
}
}
// With no damage, the valid end of the segment is its full length.
#[test]
fn test_find_valid_end() {
let (_temp_dir, path) = setup_test_segment(0);
let mut reader = PerCoreWalReader::open(0, &path).unwrap();
let valid_end = reader.find_valid_end().unwrap();
assert_eq!(valid_end, reader.file_len()); }
// Appends 3 stray bytes after one good record. Three bytes is fewer than
// RECORD_HEADER_SIZE (8), so the first read succeeds and the second reports
// an incomplete header as a TornWrite.
#[test]
fn test_torn_write_detection() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("wal-0.log");
{
let mut writer = CoreWalWriter::new(0, &path).unwrap();
writer.append_put(b"key1", b"value1").unwrap();
writer.sync().unwrap();
}
{
use std::io::Write;
let mut file = std::fs::OpenOptions::new()
.append(true)
.open(&path)
.unwrap();
file.write_all(&[0xFF, 0xFF, 0xFF]).unwrap();
file.sync_all().unwrap();
}
let mut reader = PerCoreWalReader::open(0, &path).unwrap();
match reader.read_next().unwrap() {
WalReadResult::Entry(entry) => {
assert_eq!(entry.key(), Some(b"key1".as_slice()));
}
other => panic!("Expected Entry, got {other:?}"),
}
match reader.read_next().unwrap() {
WalReadResult::TornWrite { .. } => {}
other => panic!("Expected TornWrite, got {other:?}"),
}
}
// Flips one byte at offset 10. That is past the 8-byte header (length + CRC),
// so it presumably lands inside the first record's payload. The CRC check
// should then fail for the record starting at offset 0.
#[test]
fn test_checksum_mismatch() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("wal-0.log");
{
let mut writer = CoreWalWriter::new(0, &path).unwrap();
writer.append_put(b"key1", b"value1").unwrap();
writer.sync().unwrap();
}
{
use std::io::Write;
let mut file = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
file.seek(SeekFrom::Start(10)).unwrap();
file.write_all(&[0xFF]).unwrap();
file.sync_all().unwrap();
}
let mut reader = PerCoreWalReader::open(0, &path).unwrap();
match reader.read_next().unwrap() {
WalReadResult::ChecksumMismatch {
position,
expected,
actual,
} => {
assert_eq!(position, 0);
assert_ne!(expected, actual);
}
other => panic!("Expected ChecksumMismatch, got {other:?}"),
}
}
// The manual Debug impl names the type and includes the core_id field.
#[test]
fn test_debug_format() {
let (_temp_dir, path) = setup_test_segment(0);
let reader = PerCoreWalReader::open(0, &path).unwrap();
let debug_str = format!("{reader:?}");
assert!(debug_str.contains("PerCoreWalReader"));
assert!(debug_str.contains("core_id"));
}
}