use std::os::fd::AsRawFd;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use memmap2::Mmap;
use crate::error::{Result, WalError};
use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WAL_MAGIC, WalRecord};
/// Process-wide counters describing how the mmap WAL reader interacts with the kernel.
///
/// Every access uses `Relaxed` ordering: the values are plain statistics and carry no
/// synchronization for other memory.
pub mod observability {
    use super::{AtomicU64, Ordering};

    /// Bumped by `MmapWalReader::open`.
    pub(super) static SEGMENTS_OPENED: AtomicU64 = AtomicU64::new(0);
    /// Successful `posix_fadvise(POSIX_FADV_DONTNEED)` calls on exhausted segments.
    pub(super) static FADV_DONTNEED_COUNT: AtomicU64 = AtomicU64::new(0);
    /// Successful `madvise(MADV_SEQUENTIAL)` calls on freshly mapped segments.
    pub(super) static MADV_SEQUENTIAL_COUNT: AtomicU64 = AtomicU64::new(0);

    /// Reads the current value of one counter.
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the segments-opened counter.
    pub fn segments_opened() -> u64 {
        snapshot(&SEGMENTS_OPENED)
    }

    /// Number of successful `POSIX_FADV_DONTNEED` hints issued so far.
    pub fn fadv_dontneed_count() -> u64 {
        snapshot(&FADV_DONTNEED_COUNT)
    }

    /// Number of successful `MADV_SEQUENTIAL` hints issued so far.
    pub fn madv_sequential_count() -> u64 {
        snapshot(&MADV_SEQUENTIAL_COUNT)
    }
}
/// Best-effort `posix_fadvise(POSIX_FADV_DONTNEED)` over the first `len` bytes of `fd`.
///
/// This tells the kernel that the cached pages of an exhausted segment are no longer needed.
/// On success it bumps the `fadv_dontneed_count` counter; on failure it logs at `warn` and
/// otherwise ignores the error.
///
/// A zero `len` is a no-op. For `posix_fadvise` itself, a length of 0 would mean "everything
/// from the offset to the end of the file".
fn fadv_dontneed(fd: &std::fs::File, len: usize, path: &Path) {
    if len > 0 {
        // SAFETY: the descriptor comes from a `File` borrowed for the whole call, so it stays
        // open; posix_fadvise only issues an advisory hint and touches no Rust-owned memory.
        let status = unsafe {
            libc::posix_fadvise(
                fd.as_raw_fd(),
                0,
                len as libc::off_t,
                libc::POSIX_FADV_DONTNEED,
            )
        };
        // posix_fadvise reports failure through its return value rather than through errno.
        match status {
            0 => {
                observability::FADV_DONTNEED_COUNT.fetch_add(1, Ordering::Relaxed);
            }
            errno => {
                tracing::warn!(
                    path = %path.display(),
                    errno = errno,
                    "posix_fadvise(DONTNEED) failed on exhausted WAL segment",
                );
            }
        }
    }
}
/// Sequential reader over a single WAL segment file, backed by a read-only memory map.
///
/// Records are decoded straight from the mapping. Each returned [`WalRecord`] owns a copy of
/// its payload, so records stay valid after the reader is dropped.
pub struct MmapWalReader {
    /// Read-only mapping of the segment file.
    mmap: Mmap,
    /// Byte offset of the next record header to decode. Once `next_record` has returned
    /// `Ok(None)`, this is the end of the last valid record.
    offset: usize,
    /// Underlying file handle, kept open so `release_pages` can call `posix_fadvise` on it.
    file: std::fs::File,
    /// Path of the segment, used only in log messages.
    path: std::path::PathBuf,
    /// `Some(MADV_SEQUENTIAL)` if that `madvise` hint was applied; `None` if the mapping is
    /// empty or the call failed.
    madvise_state: Option<libc::c_int>,
}

impl MmapWalReader {
    /// Opens and memory-maps the segment at `path`, hinting the kernel that the mapping will be
    /// read sequentially.
    ///
    /// A failed `madvise` is logged and otherwise ignored; the reader still works without it.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or mapped.
    pub fn open(path: &Path) -> Result<Self> {
        let file = std::fs::File::open(path)?;
        // SAFETY: the mapping is read-only. memmap2 still requires that the file is not
        // truncated or modified by anyone else while the map is alive.
        // NOTE(review): confirm segments being replayed are never written or truncated
        // concurrently.
        let mmap = unsafe { Mmap::map(&file)? };
        // Count only segments that were actually opened and mapped, not failed attempts.
        observability::SEGMENTS_OPENED.fetch_add(1, Ordering::Relaxed);

        let madvise_state = if mmap.is_empty() {
            // madvise over a zero-length range is pointless; nothing to hint.
            None
        } else {
            // SAFETY: pointer and length describe exactly the live mapping owned by `mmap`.
            // MADV_SEQUENTIAL is advisory and does not alter the mapped contents.
            let rc = unsafe {
                libc::madvise(
                    mmap.as_ptr() as *mut libc::c_void,
                    mmap.len(),
                    libc::MADV_SEQUENTIAL,
                )
            };
            if rc == 0 {
                observability::MADV_SEQUENTIAL_COUNT.fetch_add(1, Ordering::Relaxed);
                Some(libc::MADV_SEQUENTIAL)
            } else {
                // Capture errno right away, before any other call (including logging) can
                // overwrite it.
                let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0);
                tracing::warn!(
                    path = %path.display(),
                    errno = errno,
                    "madvise(MADV_SEQUENTIAL) failed on WAL segment; continuing",
                );
                None
            }
        };

        Ok(Self {
            mmap,
            offset: 0,
            file,
            path: path.to_path_buf(),
            madvise_state,
        })
    }

    /// The `madvise` advice successfully applied to the mapping, if any.
    pub fn madvise_state(&self) -> Option<libc::c_int> {
        self.madvise_state
    }

    /// Hints the kernel that this segment's cached pages are no longer needed.
    ///
    /// Uses `posix_fadvise(DONTNEED)` over the whole mapped length. This is best-effort:
    /// failures are only logged.
    pub fn release_pages(&self) {
        fadv_dontneed(&self.file, self.mmap.len(), &self.path);
    }

    /// Decodes the next record, silently skipping records of unknown but optional type.
    ///
    /// Returns `Ok(None)` at the end of the valid log, meaning any of:
    /// - fewer bytes than a header remain;
    /// - the magic or header validation does not match;
    /// - the payload would run past the end of the mapping;
    /// - the checksum does not match.
    ///
    /// In every one of those cases the offset is left at the start of the offending bytes. As a
    /// result, [`Self::offset`] reports the end of the last valid record, and further calls keep
    /// returning `Ok(None)` instead of resuming after the damaged region.
    ///
    /// # Errors
    /// Returns `WalError::UnknownRequiredRecordType` for a checksum-valid record whose logical
    /// type is unknown but which `RecordType::is_required` marks as required.
    pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
        let data = &self.mmap[..];
        loop {
            let header_start = self.offset;
            let header_end = match header_start.checked_add(HEADER_SIZE) {
                Some(end) if end <= data.len() => end,
                _ => return Ok(None),
            };
            let header_bytes: &[u8; HEADER_SIZE] = data[header_start..header_end]
                .try_into()
                .map_err(|_| {
                    WalError::Io(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        "header slice conversion failed",
                    ))
                })?;
            let header = RecordHeader::from_bytes(header_bytes);
            if header.magic != WAL_MAGIC || header.validate(header_start as u64).is_err() {
                return Ok(None);
            }

            let record_end = match header_end.checked_add(header.payload_len as usize) {
                Some(end) if end <= data.len() => end,
                // Payload extends past the mapped data, e.g. a partially written tail.
                _ => return Ok(None),
            };
            let record = WalRecord {
                header,
                payload: data[header_end..record_end].to_vec(),
            };
            if record.verify_checksum().is_err() {
                // Do not consume a corrupt record. `offset` must keep pointing at the end of the
                // last valid record, so neither later calls nor `offset()` see past the corruption.
                return Ok(None);
            }
            self.offset = record_end;

            let logical_type = record.logical_record_type();
            if RecordType::from_raw(logical_type).is_none() {
                if RecordType::is_required(logical_type) {
                    return Err(WalError::UnknownRequiredRecordType {
                        record_type: header.record_type,
                        lsn: header.lsn,
                    });
                }
                // Unknown but optional: skip it and keep scanning.
                continue;
            }
            return Ok(Some(record));
        }
    }

    /// Consumes the reader and returns an iterator over its remaining records.
    pub fn records(self) -> MmapRecordIter {
        MmapRecordIter { reader: self }
    }

    /// Byte offset of the next record header to be decoded.
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Length of the mapped segment in bytes.
    pub fn len(&self) -> usize {
        self.mmap.len()
    }

    /// Whether the mapped segment is zero bytes long.
    pub fn is_empty(&self) -> bool {
        self.mmap.is_empty()
    }
}
/// Owning iterator over the records of one segment, created by [`MmapWalReader::records`].
///
/// Yields `Ok(record)` for each decoded record and `Err(_)` whenever the reader reports an
/// error. Iteration ends when [`MmapWalReader::next_record`] returns `Ok(None)`.
pub struct MmapRecordIter {
    reader: MmapWalReader,
}

impl Iterator for MmapRecordIter {
    type Item = Result<WalRecord>;

    fn next(&mut self) -> Option<Self::Item> {
        // Flip `Result<Option<T>>` into `Option<Result<T>>`: `Ok(None)` ends iteration,
        // `Ok(Some(r))` becomes `Some(Ok(r))`, and `Err(e)` becomes `Some(Err(e))`.
        self.reader.next_record().transpose()
    }
}
/// Live-segment count at which replay switches from the calling thread to worker threads.
/// Below this value, segments are replayed sequentially.
const PARALLEL_SEGMENT_THRESHOLD: usize = 4;

/// Replays every record with `lsn >= from_lsn` from the WAL segments in `wal_dir`, in
/// segment/file order.
///
/// Segments that cannot hold such records are skipped via `filter_segments_by_lsn`. The rest are
/// replayed sequentially or in parallel, depending on [`PARALLEL_SEGMENT_THRESHOLD`].
///
/// # Errors
/// Propagates segment discovery, open/map, and record-decoding errors.
pub fn replay_segments_mmap(wal_dir: &Path, from_lsn: u64) -> Result<Vec<WalRecord>> {
    let all_segments = crate::segment::discover_segments(wal_dir)?;
    let live = filter_segments_by_lsn(&all_segments, from_lsn);
    if live.len() >= PARALLEL_SEGMENT_THRESHOLD {
        replay_segments_parallel(live, from_lsn)
    } else {
        replay_segments_sequential(live, from_lsn)
    }
}
/// Returns the suffix of `segments` that may contain records with `lsn >= from_lsn`.
///
/// Segment `i` is treated as covering LSNs from its own `first_lsn` up to, but not including,
/// the next segment's `first_lsn`. The result therefore starts at the first segment whose
/// successor begins after `from_lsn`. The last segment has no upper bound and is always kept,
/// including when `from_lsn == u64::MAX`.
///
/// The covering-range interpretation assumes `segments` is ordered by `first_lsn`.
/// NOTE(review): confirm `discover_segments` guarantees this ordering.
fn filter_segments_by_lsn(
    segments: &[crate::segment::SegmentMeta],
    from_lsn: u64,
) -> &[crate::segment::SegmentMeta] {
    // If no successor starts after `from_lsn`, only the final segment can still hold matches.
    // An empty input yields start 0, which produces an empty suffix.
    let start = segments
        .windows(2)
        .position(|pair| pair[1].first_lsn > from_lsn)
        .unwrap_or_else(|| segments.len().saturating_sub(1));
    &segments[start..]
}
/// Replays `segments` one after another on the calling thread, keeping records with
/// `lsn >= from_lsn` in segment/file order.
///
/// Each segment's cached pages are released once it has been read to the end. The first open
/// or decode error aborts the whole replay.
fn replay_segments_sequential(
    segments: &[crate::segment::SegmentMeta],
    from_lsn: u64,
) -> Result<Vec<WalRecord>> {
    let mut out = Vec::new();
    for segment in segments.iter() {
        let mut reader = MmapWalReader::open(&segment.path)?;
        loop {
            match reader.next_record()? {
                Some(rec) if rec.header.lsn < from_lsn => {}
                Some(rec) => out.push(rec),
                None => break,
            }
        }
        reader.release_pages();
    }
    Ok(out)
}
fn replay_segments_parallel(
segments: &[crate::segment::SegmentMeta],
from_lsn: u64,
) -> Result<Vec<WalRecord>> {
let mut per_segment: Vec<Result<Vec<WalRecord>>> = Vec::with_capacity(segments.len());
std::thread::scope(|scope| {
let handles: Vec<_> = segments
.iter()
.map(|seg| {
scope.spawn(move || -> Result<Vec<WalRecord>> {
let mut reader = MmapWalReader::open(&seg.path)?;
let mut seg_records = Vec::new();
while let Some(record) = reader.next_record()? {
if record.header.lsn >= from_lsn {
seg_records.push(record);
}
}
reader.release_pages();
Ok(seg_records)
})
})
.collect();
for handle in handles {
per_segment.push(handle.join().unwrap_or_else(|_| {
Err(WalError::Io(std::io::Error::other(
"segment replay thread panicked",
)))
}));
}
});
let total_estimate: usize = per_segment
.iter()
.map(|r| r.as_ref().map(|v| v.len()).unwrap_or(0))
.sum();
let mut records = Vec::with_capacity(total_estimate);
for seg_result in per_segment {
records.extend(seg_result?);
}
Ok(records)
}
/// Replays records with `lsn >= from_lsn` from the segments in `wal_dir`, stopping once
/// `max_records` records have been collected.
///
/// Returns the records in segment/file order together with a flag:
/// - `true` when replay stopped because the limit was reached (more eligible records may
///   remain);
/// - `false` when every live segment was read to its end.
///
/// When `max_records == 0`, no records are returned, and the flag is `true` if at least one
/// eligible record exists.
///
/// Pages of fully read segments are released with `posix_fadvise(DONTNEED)`. The segment in
/// which the limit is hit is not released.
///
/// # Errors
/// Propagates segment discovery, open/map, and record-decoding errors.
pub fn replay_segments_mmap_limit(
    wal_dir: &Path,
    from_lsn: u64,
    max_records: usize,
) -> Result<(Vec<WalRecord>, bool)> {
    let segments = crate::segment::discover_segments(wal_dir)?;
    let live = filter_segments_by_lsn(&segments, from_lsn);
    // Cap the up-front reservation so a huge `max_records` does not allocate eagerly.
    let mut records = Vec::with_capacity(max_records.min(4096));
    for seg in live {
        let mut reader = MmapWalReader::open(&seg.path)?;
        while let Some(record) = reader.next_record()? {
            if record.header.lsn < from_lsn {
                continue;
            }
            // Never exceed the cap. This check only fires when `max_records == 0`, because for
            // larger limits the post-push check below returns first. In that case it reports that
            // an eligible record exists instead of returning one.
            if records.len() >= max_records {
                return Ok((records, true));
            }
            records.push(record);
            if records.len() >= max_records {
                return Ok((records, true));
            }
        }
        reader.release_pages();
    }
    Ok((records, false))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::{WalWriter, WalWriterConfig};

    /// Opens a `WalWriter` at `path` with direct I/O turned off and every other setting left at
    /// its default.
    fn test_writer(path: &Path) -> WalWriter {
        WalWriter::open(
            path,
            WalWriterConfig {
                use_direct_io: false,
                ..Default::default()
            },
        )
        .unwrap()
    }

    /// Maps the file at `path` and drains all of its records, failing the test on any error.
    fn read_all(path: &Path) -> Vec<WalRecord> {
        let reader = MmapWalReader::open(path).unwrap();
        reader.records().collect::<Result<Vec<_>>>().unwrap()
    }

    #[test]
    fn mmap_reader_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let seg_path = tmp.path().join("test.wal");
        // Scope the writer so it is dropped before the file is read back.
        {
            let mut writer = test_writer(&seg_path);
            writer
                .append(RecordType::Put as u32, 1, 0, 0, b"hello")
                .unwrap();
            writer
                .append(RecordType::Put as u32, 1, 0, 0, b"world")
                .unwrap();
            writer.sync().unwrap();
        }
        let got = read_all(&seg_path);
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].payload, b"hello");
        assert_eq!(got[1].payload, b"world");
    }

    #[test]
    fn mmap_reader_empty_file() {
        let tmp = tempfile::tempdir().unwrap();
        let seg_path = tmp.path().join("empty.wal");
        std::fs::write(&seg_path, b"").unwrap();
        assert!(read_all(&seg_path).is_empty());
    }

    #[test]
    fn mmap_reader_truncated_header() {
        let tmp = tempfile::tempdir().unwrap();
        let seg_path = tmp.path().join("truncated.wal");
        // Ten zero bytes do not form a valid record, so nothing should decode.
        std::fs::write(&seg_path, [0u8; 10]).unwrap();
        assert!(read_all(&seg_path).is_empty());
    }

    #[test]
    fn replay_mmap_from_lsn() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_dir = tmp.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();
        let mut wal = crate::segmented::SegmentedWal::open(
            crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone()),
        )
        .unwrap();
        let first = wal.append(RecordType::Put as u32, 1, 0, 0, b"a").unwrap();
        let second = wal.append(RecordType::Put as u32, 1, 0, 0, b"b").unwrap();
        let third = wal.append(RecordType::Put as u32, 1, 0, 0, b"c").unwrap();
        wal.sync().unwrap();

        let tail = replay_segments_mmap(&wal_dir, second).unwrap();
        let tail_lsns: Vec<u64> = tail.iter().map(|r| r.header.lsn).collect();
        assert_eq!(tail_lsns, vec![second, third]);

        let everything = replay_segments_mmap(&wal_dir, first).unwrap();
        assert_eq!(everything.len(), 3);
    }
}