use crate::journal::format::{decode_frame, FrameDecode, FRAME_OVERHEAD};
use crate::journal::Lsn;
use crate::{Error, Result};
use std::fs::{File, OpenOptions};
use std::io::Read;
use std::path::Path;
/// Initial size, in bytes, of the buffer each `JournalIter` allocates (64 KiB).
/// `JournalIter::refill` may grow the buffer beyond this for larger frames.
const READ_BUF_SIZE: usize = 64 * 1024;
/// Alignment, in bytes, the iterator rounds the cursor up to when it skips
/// over a frame header whose first four bytes are all zero.
const PAD_SKIP_GRANULARITY: u64 = 512;
/// Maximum number of consecutive zero-header skips; once reached, a zeroed
/// header is reported as `JournalTailState::BadMagic` instead of being skipped.
const MAX_PAD_SKIP_SECTORS: u32 = 16;
/// Why the most recent iteration over the journal stopped.
///
/// Stored on the reader by `JournalIter::next` and exposed through
/// `JournalReader::tail_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum JournalTailState {
/// No bytes were left at the cursor. Also the initial state after `open`
/// and after `seek_to`.
CleanEnd,
/// Some bytes remained at the cursor, but fewer than the eight the
/// iterator needs before it attempts to decode a frame.
TruncatedHeader,
/// The decoder reported a truncated frame with at least eight bytes
/// buffered and nothing further to read from the file.
TruncatedPayload,
/// The decoder reported `FrameDecode::ChecksumMismatch`.
ChecksumMismatch,
/// The decoder reported `FrameDecode::BadMagic` and the bytes were not
/// skipped as zero padding.
BadMagic,
/// The decoder reported `FrameDecode::LengthOverflow`.
LengthOverflow,
}
/// One decoded journal frame: where it started and an owned copy of its payload.
#[derive(Debug, Clone)]
pub struct JournalRecord {
/// Position of the frame: the reader cursor at the moment the frame was
/// decoded, or the LSN passed to `JournalReader::read_at_lsn`.
pub lsn: Lsn,
/// Payload bytes copied out of the frame.
pub payload: Vec<u8>,
}
/// Read-only handle on a journal file with a cursor for sequential iteration.
pub struct JournalReader {
// Underlying file, opened with read access only.
file: File,
// File length captured by `open` or the latest `refresh_size`; iteration
// never reads at or beyond this offset.
file_size: u64,
// Byte offset at which the next frame is decoded by the iterator.
cursor: u64,
// Reason the most recent iteration stopped (reset by `seek_to`).
last_state: JournalTailState,
}
impl JournalReader {
pub fn open(path: &Path) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
.open(path)
.map_err(Error::Io)?;
let file_size = file.metadata().map_err(Error::Io)?.len();
Ok(Self {
file,
file_size,
cursor: 0,
last_state: JournalTailState::CleanEnd,
})
}
pub fn seek_to(&mut self, lsn: Lsn) {
self.cursor = lsn.0;
self.last_state = JournalTailState::CleanEnd;
}
#[must_use]
pub fn position(&self) -> Lsn {
Lsn(self.cursor)
}
#[must_use]
pub fn file_size(&self) -> u64 {
self.file_size
}
pub fn refresh_size(&mut self) -> Result<()> {
self.file_size = self.file.metadata().map_err(Error::Io)?.len();
Ok(())
}
#[must_use]
pub fn tail_state(&self) -> JournalTailState {
self.last_state
}
pub fn advise(&self, offset: u64, len: u64, advice: crate::Advice) -> Result<()> {
crate::platform::advise(&self.file, offset, len, advice)
}
pub fn advise_sequential(&self) -> Result<()> {
self.advise(0, 0, crate::Advice::Sequential)
}
pub fn iter(&mut self) -> JournalIter<'_> {
JournalIter::new(self)
}
pub fn read_at_lsn(&mut self, lsn: Lsn) -> Result<JournalRecord> {
let mut header = [0u8; FRAME_OVERHEAD];
self.read_exact_at(lsn.0, &mut header[..8])?;
let length = u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as usize;
let mut frame = vec![0u8; FRAME_OVERHEAD + length];
self.read_exact_at(lsn.0, &mut frame)?;
match decode_frame(&frame) {
FrameDecode::Ok {
payload_start,
payload_end,
..
} => Ok(JournalRecord {
lsn,
payload: frame[payload_start..payload_end].to_vec(),
}),
FrameDecode::BadMagic => Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("bad magic at LSN {lsn}"),
))),
FrameDecode::ChecksumMismatch => Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("CRC-32C mismatch at LSN {lsn}"),
))),
FrameDecode::Truncated => Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("frame at LSN {lsn} is truncated"),
))),
FrameDecode::LengthOverflow => Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("frame length at LSN {lsn} exceeds FRAME_MAX_PAYLOAD"),
))),
}
}
fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<()> {
use std::io::Seek;
let _ = self
.file
.seek(std::io::SeekFrom::Start(offset))
.map_err(Error::Io)?;
let mut total = 0;
while total < buf.len() {
let n = self.file.read(&mut buf[total..]).map_err(Error::Io)?;
if n == 0 {
return Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"unexpected EOF in journal read_exact_at",
)));
}
total += n;
}
Ok(())
}
}
/// Buffered, forward-only iterator over the frames of a `JournalReader`.
///
/// Bytes `buf[valid_start..valid_end]` are file contents starting at the
/// reader's cursor that have been read but not yet consumed.
pub struct JournalIter<'a> {
// Reader whose cursor and tail state this iterator updates.
reader: &'a mut JournalReader,
// Read buffer; starts at `READ_BUF_SIZE` bytes and may be grown by `refill`.
buf: Vec<u8>,
// Index of the first unconsumed byte in `buf`.
valid_start: usize,
// Index one past the last byte in `buf` that was read from the file.
valid_end: usize,
// Set once iteration has ended; every later `next` call returns `None`.
finished: bool,
// Consecutive zero-header skips so far; reset when a frame decodes.
pad_skips_in_a_row: u32,
}
impl<'a> JournalIter<'a> {
fn new(reader: &'a mut JournalReader) -> Self {
Self {
reader,
buf: vec![0u8; READ_BUF_SIZE],
valid_start: 0,
valid_end: 0,
finished: false,
pad_skips_in_a_row: 0,
}
}
fn refill(&mut self) -> Result<()> {
if self.valid_start > 0 {
let remaining = self.valid_end - self.valid_start;
self.buf.copy_within(self.valid_start..self.valid_end, 0);
self.valid_start = 0;
self.valid_end = remaining;
}
let space = self.buf.len() - self.valid_end;
if space == 0 {
if self.valid_end - self.valid_start < 8 {
return Ok(()); }
let len_bytes = &self.buf[self.valid_start + 4..self.valid_start + 8];
let payload_len =
u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]])
as usize;
let needed = FRAME_OVERHEAD + payload_len;
if self.buf.len() < needed {
self.buf.resize(needed, 0);
}
}
let file_offset = self.reader.cursor + self.valid_end as u64;
if file_offset >= self.reader.file_size {
return Ok(()); }
let to_read = std::cmp::min(
(self.reader.file_size - file_offset) as usize,
self.buf.len() - self.valid_end,
);
if to_read == 0 {
return Ok(());
}
let buf_slice = &mut self.buf[self.valid_end..self.valid_end + to_read];
use std::io::Seek;
let _ = self
.reader
.file
.seek(std::io::SeekFrom::Start(file_offset))
.map_err(Error::Io)?;
let mut total = 0;
while total < buf_slice.len() {
let n = self
.reader
.file
.read(&mut buf_slice[total..])
.map_err(Error::Io)?;
if n == 0 {
break;
}
total += n;
}
self.valid_end += total;
Ok(())
}
}
impl<'a> Iterator for JournalIter<'a> {
    type Item = Result<JournalRecord>;

    /// Decodes and returns the next frame at the reader's cursor.
    ///
    /// Returns `Some(Ok(record))` for each valid frame and advances the
    /// cursor past it. Returns `Some(Err(_))` once if refilling the buffer
    /// fails. Returns `None` when iteration ends; the reason is stored on the
    /// reader and is available through `JournalReader::tail_state`. After
    /// ending, every further call returns `None`.
    ///
    /// Frame headers whose first four bytes are zero are treated as padding:
    /// the cursor is rounded up to the next `PAD_SKIP_GRANULARITY` boundary,
    /// at most `MAX_PAD_SKIP_SECTORS` times in a row.
    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }
        loop {
            let buffered = self.valid_end - self.valid_start;
            let next_unread_file_offset = self.reader.cursor + buffered as u64;

            // Nothing buffered and nothing left in the file: clean end.
            if buffered == 0 && next_unread_file_offset >= self.reader.file_size {
                self.reader.last_state = JournalTailState::CleanEnd;
                self.finished = true;
                return None;
            }

            // At least eight bytes are needed before a decode is attempted.
            if buffered < 8 {
                if let Err(e) = self.refill() {
                    self.finished = true;
                    return Some(Err(e));
                }
                let buffered = self.valid_end - self.valid_start;
                if buffered < 8 {
                    self.reader.last_state = if buffered == 0 {
                        JournalTailState::CleanEnd
                    } else {
                        JournalTailState::TruncatedHeader
                    };
                    self.finished = true;
                    return None;
                }
            }

            let view = &self.buf[self.valid_start..self.valid_end];
            match decode_frame(view) {
                FrameDecode::Ok {
                    consumed,
                    payload_start,
                    payload_end,
                } => {
                    let lsn = Lsn(self.reader.cursor);
                    let payload = view[payload_start..payload_end].to_vec();
                    self.reader.cursor += consumed as u64;
                    self.valid_start += consumed;
                    self.pad_skips_in_a_row = 0;
                    return Some(Ok(JournalRecord { lsn, payload }));
                }
                FrameDecode::Truncated => {
                    let before = self.valid_end - self.valid_start;
                    let unread_in_file = self
                        .reader
                        .file_size
                        .saturating_sub(self.reader.cursor + before as u64);
                    let mut stuck = unread_in_file == 0;
                    if !stuck {
                        if let Err(e) = self.refill() {
                            self.finished = true;
                            return Some(Err(e));
                        }
                        // A refill that adds no bytes (the file shrank below
                        // the recorded size, or the buffer could not grow)
                        // will never add any; retrying would spin forever.
                        stuck = self.valid_end - self.valid_start == before;
                    }
                    if stuck {
                        self.reader.last_state = if before < 8 {
                            JournalTailState::TruncatedHeader
                        } else {
                            JournalTailState::TruncatedPayload
                        };
                        self.finished = true;
                        return None;
                    }
                }
                FrameDecode::BadMagic => {
                    let header_zero = view.starts_with(&[0, 0, 0, 0]);
                    if header_zero && self.pad_skips_in_a_row < MAX_PAD_SKIP_SECTORS {
                        // Treat as padding: jump to the next boundary that is
                        // strictly after the current cursor.
                        let cur = self.reader.cursor;
                        let next = (cur / PAD_SKIP_GRANULARITY + 1) * PAD_SKIP_GRANULARITY;
                        let advance = (next - cur) as usize;
                        let buffered = self.valid_end - self.valid_start;
                        // Drop what is buffered up to the boundary; if the
                        // boundary lies beyond the buffer, the next refill
                        // resumes reading at the new cursor.
                        self.valid_start += advance.min(buffered);
                        self.reader.cursor = next;
                        self.pad_skips_in_a_row += 1;
                        continue;
                    }
                    self.reader.last_state = JournalTailState::BadMagic;
                    self.finished = true;
                    return None;
                }
                FrameDecode::LengthOverflow => {
                    self.reader.last_state = JournalTailState::LengthOverflow;
                    self.finished = true;
                    return None;
                }
                FrameDecode::ChecksumMismatch => {
                    self.reader.last_state = JournalTailState::ChecksumMismatch;
                    self.finished = true;
                    return None;
                }
            }
        }
    }
}
// Tests for `JournalReader` and `JournalIter`. Journals are produced with
// `JournalHandle` and, where a damaged tail is needed, by appending or
// rewriting raw bytes.
#[cfg(test)]
mod tests {
use super::*;
use crate::journal::JournalHandle;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
// Per-process counter that keeps temp file names distinct across tests.
static C: AtomicU64 = AtomicU64::new(0);
// Builds a temp-dir path from the process id, a fresh counter value and `tag`.
fn tmp_path(tag: &str) -> PathBuf {
let n = C.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!(
"fsys_reader_test_{}_{}_{tag}",
std::process::id(),
n
))
}
// Guard that removes the wrapped file on drop; removal errors are ignored.
struct Cleanup(PathBuf);
impl Drop for Cleanup {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
// Three appended records come back in append order with increasing LSNs.
#[test]
fn iter_yields_appended_records_in_order() {
let path = tmp_path("ordered");
let _g = Cleanup(path.clone());
let writer = JournalHandle::open(&path).unwrap();
let _ = writer.append(b"alpha").unwrap();
let _ = writer.append(b"beta").unwrap();
let _ = writer.append(b"gamma").unwrap();
writer.close().unwrap();
let mut reader = JournalReader::open(&path).unwrap();
let recs: Vec<JournalRecord> = reader.iter().map(|r| r.unwrap()).collect();
assert_eq!(recs.len(), 3);
assert_eq!(recs[0].payload, b"alpha");
assert_eq!(recs[1].payload, b"beta");
assert_eq!(recs[2].payload, b"gamma");
assert!(recs[0].lsn < recs[1].lsn);
assert!(recs[1].lsn < recs[2].lsn);
assert_eq!(reader.tail_state(), JournalTailState::CleanEnd);
}
// A zero-length file yields nothing and reports a clean end.
#[test]
fn iter_handles_empty_file_cleanly() {
let path = tmp_path("empty_file");
let _g = Cleanup(path.clone());
std::fs::write(&path, b"").unwrap();
let mut reader = JournalReader::open(&path).unwrap();
assert!(reader.iter().next().is_none());
assert_eq!(reader.tail_state(), JournalTailState::CleanEnd);
}
// Records with empty payloads are still yielded as records.
#[test]
fn iter_handles_journal_with_only_empty_records() {
let path = tmp_path("empty_records");
let _g = Cleanup(path.clone());
let writer = JournalHandle::open(&path).unwrap();
let _ = writer.append(b"").unwrap();
let _ = writer.append(b"").unwrap();
let _ = writer.append(b"").unwrap();
writer.close().unwrap();
let mut reader = JournalReader::open(&path).unwrap();
let recs: Vec<JournalRecord> = reader.iter().map(|r| r.unwrap()).collect();
assert_eq!(recs.len(), 3);
for r in &recs {
assert_eq!(r.payload, b"");
}
assert_eq!(reader.tail_state(), JournalTailState::CleanEnd);
}
// `read_at_lsn` returns the record starting at the given LSN. The LSNs are
// captured from the writer's `next_lsn()` just before each append.
#[test]
fn read_at_lsn_returns_record_at_position() {
let path = tmp_path("read_at");
let _g = Cleanup(path.clone());
let writer = JournalHandle::open(&path).unwrap();
let _ = writer.append(b"first").unwrap(); let lsn_before_second = writer.next_lsn();
let _ = writer.append(b"second").unwrap();
let lsn_before_third = writer.next_lsn();
let _ = writer.append(b"third").unwrap();
writer.close().unwrap();
let mut reader = JournalReader::open(&path).unwrap();
let r0 = reader.read_at_lsn(Lsn(0)).unwrap();
assert_eq!(r0.payload, b"first");
let r1 = reader.read_at_lsn(lsn_before_second).unwrap();
assert_eq!(r1.payload, b"second");
let r2 = reader.read_at_lsn(lsn_before_third).unwrap();
assert_eq!(r2.payload, b"third");
}
// Five stray bytes after a good record are fewer than the eight the iterator
// requires, so the tail is reported as a truncated header.
#[test]
fn truncated_header_detected_as_tail_state() {
let path = tmp_path("trunc_header");
let _g = Cleanup(path.clone());
let writer = JournalHandle::open(&path).unwrap();
let _ = writer.append(b"good").unwrap();
writer.close().unwrap();
use std::io::Write;
let mut f = OpenOptions::new().append(true).open(&path).unwrap();
f.write_all(b"\x46\x53\x59\x01\xAB").unwrap();
drop(f);
let mut reader = JournalReader::open(&path).unwrap();
// Each `iter()` call builds a fresh iterator that resumes at the reader's cursor.
let first = reader.iter().next();
assert!(first.is_some());
let next = reader.iter().next();
assert!(next.is_none());
assert_eq!(reader.tail_state(), JournalTailState::TruncatedHeader);
}
// A hand-built frame declares a 100-byte payload but only 20 bytes follow,
// so the tail is reported as a truncated payload.
#[test]
fn truncated_payload_detected_as_tail_state() {
let path = tmp_path("trunc_payload");
let _g = Cleanup(path.clone());
let writer = JournalHandle::open(&path).unwrap();
let _ = writer.append(b"good").unwrap();
writer.close().unwrap();
let mut frame = Vec::new();
// Magic written big-endian, then the payload length written little-endian.
frame.extend_from_slice(&0x46535901u32.to_be_bytes());
frame.extend_from_slice(&100u32.to_le_bytes());
frame.extend(std::iter::repeat_n(0xAA, 20));
use std::io::Write;
let mut f = OpenOptions::new().append(true).open(&path).unwrap();
f.write_all(&frame).unwrap();
drop(f);
let mut reader = JournalReader::open(&path).unwrap();
let _good = reader.iter().next().unwrap().unwrap();
let next = reader.iter().next();
assert!(next.is_none());
assert_eq!(reader.tail_state(), JournalTailState::TruncatedPayload);
}
// Flipping one byte of the second record makes iteration stop there with a
// checksum mismatch, after the first record has been yielded intact.
#[test]
fn checksum_mismatch_detected_as_tail_state() {
let path = tmp_path("checksum");
let _g = Cleanup(path.clone());
let writer = JournalHandle::open(&path).unwrap();
let _ = writer.append(b"good").unwrap();
let lsn_corrupt = writer.next_lsn();
let _ = writer.append(b"corrupt-this").unwrap();
writer.close().unwrap();
let mut bytes = std::fs::read(&path).unwrap();
// Byte 8 of the frame, i.e. just past the 4-byte magic and 4-byte length.
let payload_offset = lsn_corrupt.0 as usize + 8;
bytes[payload_offset] ^= 0xFF;
std::fs::write(&path, &bytes).unwrap();
let mut reader = JournalReader::open(&path).unwrap();
let _good = reader.iter().next().unwrap().unwrap();
let next = reader.iter().next();
assert!(next.is_none());
assert_eq!(reader.tail_state(), JournalTailState::ChecksumMismatch);
}
// A file starting with non-zero garbage where the magic should be is reported
// as bad magic (the non-zero bytes rule out the zero-padding skip).
#[test]
fn bad_magic_detected_as_tail_state() {
let path = tmp_path("bad_magic");
let _g = Cleanup(path.clone());
std::fs::write(&path, b"\xDE\xAD\xBE\xEF\x00\x00\x00\x00garbage").unwrap();
let mut reader = JournalReader::open(&path).unwrap();
let next = reader.iter().next();
assert!(next.is_none());
assert_eq!(reader.tail_state(), JournalTailState::BadMagic);
}
// A 200 KiB payload exceeds the 64 KiB initial buffer, exercising buffer growth.
#[test]
fn iter_handles_large_records_above_default_buf_size() {
let path = tmp_path("large_records");
let _g = Cleanup(path.clone());
let big = vec![0xCDu8; 200 * 1024];
let writer = JournalHandle::open(&path).unwrap();
let _ = writer.append(&big).unwrap();
writer.close().unwrap();
let mut reader = JournalReader::open(&path).unwrap();
let r = reader.iter().next().unwrap().unwrap();
assert_eq!(r.payload, big);
}
// After `seek_to`, iteration starts at the requested LSN; seeking again
// after a full pass works the same way.
#[test]
fn seek_to_repositions_iterator() {
let path = tmp_path("seek_to");
let _g = Cleanup(path.clone());
let writer = JournalHandle::open(&path).unwrap();
let _ = writer.append(b"first").unwrap();
let lsn_at_second = writer.next_lsn();
let _ = writer.append(b"second").unwrap();
let lsn_at_third = writer.next_lsn();
let _ = writer.append(b"third").unwrap();
writer.close().unwrap();
let mut reader = JournalReader::open(&path).unwrap();
reader.seek_to(lsn_at_second);
let recs: Vec<JournalRecord> = reader.iter().map(|r| r.unwrap()).collect();
assert_eq!(recs.len(), 2);
assert_eq!(recs[0].payload, b"second");
assert_eq!(recs[1].payload, b"third");
reader.seek_to(lsn_at_third);
let recs: Vec<JournalRecord> = reader.iter().map(|r| r.unwrap()).collect();
assert_eq!(recs.len(), 1);
assert_eq!(recs[0].payload, b"third");
}
}