1use std::os::fd::AsRawFd;
18use std::path::Path;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21use memmap2::Mmap;
22
23use crate::error::{Result, WalError};
24use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WAL_MAGIC, WalRecord};
25
/// Process-wide counters that let tests observe what the mmap reader did.
///
/// The counters themselves are only writable from the parent module; outside
/// code reads them through the accessor functions below.
pub mod test_hooks {
    use super::{AtomicU64, Ordering};

    /// Bumped once per `MmapWalReader::open` call, before the file is opened.
    pub(super) static SEGMENTS_OPENED: AtomicU64 = AtomicU64::new(0);
    /// Bumped each time `posix_fadvise(DONTNEED)` reports success.
    pub(super) static FADV_DONTNEED_COUNT: AtomicU64 = AtomicU64::new(0);
    /// Bumped each time `madvise(MADV_SEQUENTIAL)` reports success.
    pub(super) static MADV_SEQUENTIAL_COUNT: AtomicU64 = AtomicU64::new(0);

    /// Reads a counter with relaxed ordering.
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Number of segment open attempts recorded so far.
    pub fn segments_opened() -> u64 {
        snapshot(&SEGMENTS_OPENED)
    }

    /// Number of successful `POSIX_FADV_DONTNEED` hints recorded so far.
    pub fn fadv_dontneed_count() -> u64 {
        snapshot(&FADV_DONTNEED_COUNT)
    }

    /// Number of successful `MADV_SEQUENTIAL` hints recorded so far.
    pub fn madvise_sequential_count_impl_marker() -> u64 {
        snapshot(&MADV_SEQUENTIAL_COUNT)
    }

    /// Number of successful `MADV_SEQUENTIAL` hints recorded so far.
    pub fn madv_sequential_count() -> u64 {
        snapshot(&MADV_SEQUENTIAL_COUNT)
    }
}
43
44fn fadv_dontneed(fd: &std::fs::File, len: usize, path: &Path) {
50 if len == 0 {
51 return;
52 }
53 let rc = unsafe {
54 libc::posix_fadvise(
55 fd.as_raw_fd(),
56 0,
57 len as libc::off_t,
58 libc::POSIX_FADV_DONTNEED,
59 )
60 };
61 if rc == 0 {
62 test_hooks::FADV_DONTNEED_COUNT.fetch_add(1, Ordering::Relaxed);
63 } else {
64 tracing::warn!(
65 path = %path.display(),
66 errno = rc,
67 "posix_fadvise(DONTNEED) failed on exhausted WAL segment",
68 );
69 }
70}
71
72pub struct MmapWalReader {
78 mmap: Mmap,
79 offset: usize,
80 file: std::fs::File,
81 path: std::path::PathBuf,
82 madvise_state: Option<libc::c_int>,
83}
84
85impl MmapWalReader {
86 pub fn open(path: &Path) -> Result<Self> {
88 test_hooks::SEGMENTS_OPENED.fetch_add(1, Ordering::Relaxed);
89 let file = std::fs::File::open(path)?;
90 let mmap = unsafe { Mmap::map(&file)? };
94
95 let mut madvise_state = None;
99 if !mmap.is_empty() {
100 let rc = unsafe {
101 libc::madvise(
102 mmap.as_ptr() as *mut libc::c_void,
103 mmap.len(),
104 libc::MADV_SEQUENTIAL,
105 )
106 };
107 if rc == 0 {
108 madvise_state = Some(libc::MADV_SEQUENTIAL);
109 test_hooks::MADV_SEQUENTIAL_COUNT.fetch_add(1, Ordering::Relaxed);
110 } else {
111 tracing::warn!(
112 path = %path.display(),
113 errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0),
114 "madvise(MADV_SEQUENTIAL) failed on WAL segment; continuing",
115 );
116 }
117 }
118
119 Ok(Self {
120 mmap,
121 offset: 0,
122 file,
123 path: path.to_path_buf(),
124 madvise_state,
125 })
126 }
127
128 pub fn madvise_state(&self) -> Option<libc::c_int> {
130 self.madvise_state
131 }
132
133 pub fn release_pages(&self) {
136 fadv_dontneed(&self.file, self.mmap.len(), &self.path);
137 }
138
139 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
144 let data = &self.mmap[..];
145
146 loop {
147 if self.offset + HEADER_SIZE > data.len() {
149 return Ok(None);
150 }
151
152 let header_bytes: &[u8; HEADER_SIZE] = data[self.offset..self.offset + HEADER_SIZE]
154 .try_into()
155 .map_err(|_| {
156 WalError::Io(std::io::Error::new(
157 std::io::ErrorKind::InvalidData,
158 "header slice conversion failed",
159 ))
160 })?;
161 let header = RecordHeader::from_bytes(header_bytes);
162
163 if header.magic != WAL_MAGIC {
165 return Ok(None);
166 }
167
168 if header.validate(self.offset as u64).is_err() {
170 return Ok(None);
171 }
172
173 let payload_len = header.payload_len as usize;
174 let record_end = self.offset + HEADER_SIZE + payload_len;
175
176 if record_end > data.len() {
178 return Ok(None); }
180
181 let payload = data[self.offset + HEADER_SIZE..record_end].to_vec();
183 self.offset = record_end;
184
185 let record = WalRecord { header, payload };
186
187 if record.verify_checksum().is_err() {
189 return Ok(None); }
191
192 let logical_type = record.logical_record_type();
194 if RecordType::from_raw(logical_type).is_none() {
195 if RecordType::is_required(logical_type) {
196 return Err(WalError::UnknownRequiredRecordType {
197 record_type: header.record_type,
198 lsn: header.lsn,
199 });
200 }
201 continue;
203 }
204
205 return Ok(Some(record));
206 }
207 }
208
209 pub fn records(self) -> MmapRecordIter {
211 MmapRecordIter { reader: self }
212 }
213
214 pub fn offset(&self) -> usize {
216 self.offset
217 }
218
219 pub fn len(&self) -> usize {
221 self.mmap.len()
222 }
223
224 pub fn is_empty(&self) -> bool {
226 self.mmap.is_empty()
227 }
228}
229
230pub struct MmapRecordIter {
232 reader: MmapWalReader,
233}
234
235impl Iterator for MmapRecordIter {
236 type Item = Result<WalRecord>;
237
238 fn next(&mut self) -> Option<Self::Item> {
239 match self.reader.next_record() {
240 Ok(Some(record)) => Some(Ok(record)),
241 Ok(None) => None,
242 Err(e) => Some(Err(e)),
243 }
244 }
245}
246
247const PARALLEL_SEGMENT_THRESHOLD: usize = 4;
249
250pub fn replay_segments_mmap(wal_dir: &Path, from_lsn: u64) -> Result<Vec<WalRecord>> {
260 let segments = crate::segment::discover_segments(wal_dir)?;
261 let live = filter_segments_by_lsn(&segments, from_lsn);
262
263 if live.len() < PARALLEL_SEGMENT_THRESHOLD {
264 return replay_segments_sequential(live, from_lsn);
265 }
266
267 replay_segments_parallel(live, from_lsn)
268}
269
270fn filter_segments_by_lsn(
276 segments: &[crate::segment::SegmentMeta],
277 from_lsn: u64,
278) -> &[crate::segment::SegmentMeta] {
279 let mut start = 0;
284 for i in 0..segments.len() {
285 let upper = segments.get(i + 1).map(|s| s.first_lsn).unwrap_or(u64::MAX);
287 if upper > from_lsn {
288 start = i;
289 break;
290 }
291 start = i + 1;
292 }
293 if start >= segments.len() {
294 return &[];
296 }
297 &segments[start..]
298}
299
300fn replay_segments_sequential(
302 segments: &[crate::segment::SegmentMeta],
303 from_lsn: u64,
304) -> Result<Vec<WalRecord>> {
305 let mut records = Vec::new();
306 for seg in segments {
307 let mut reader = MmapWalReader::open(&seg.path)?;
308 while let Some(record) = reader.next_record()? {
309 if record.header.lsn >= from_lsn {
310 records.push(record);
311 }
312 }
313 reader.release_pages();
314 }
315 Ok(records)
316}
317
318fn replay_segments_parallel(
324 segments: &[crate::segment::SegmentMeta],
325 from_lsn: u64,
326) -> Result<Vec<WalRecord>> {
327 let mut per_segment: Vec<Result<Vec<WalRecord>>> = Vec::with_capacity(segments.len());
329
330 std::thread::scope(|scope| {
331 let handles: Vec<_> = segments
332 .iter()
333 .map(|seg| {
334 scope.spawn(move || -> Result<Vec<WalRecord>> {
335 let mut reader = MmapWalReader::open(&seg.path)?;
336 let mut seg_records = Vec::new();
337 while let Some(record) = reader.next_record()? {
338 if record.header.lsn >= from_lsn {
339 seg_records.push(record);
340 }
341 }
342 reader.release_pages();
343 Ok(seg_records)
344 })
345 })
346 .collect();
347
348 for handle in handles {
349 per_segment.push(handle.join().unwrap_or_else(|_| {
350 Err(WalError::Io(std::io::Error::other(
351 "segment replay thread panicked",
352 )))
353 }));
354 }
355 });
356
357 let total_estimate: usize = per_segment
359 .iter()
360 .map(|r| r.as_ref().map(|v| v.len()).unwrap_or(0))
361 .sum();
362 let mut records = Vec::with_capacity(total_estimate);
363 for seg_result in per_segment {
364 records.extend(seg_result?);
365 }
366
367 Ok(records)
368}
369
370pub fn replay_segments_mmap_limit(
379 wal_dir: &Path,
380 from_lsn: u64,
381 max_records: usize,
382) -> Result<(Vec<WalRecord>, bool)> {
383 let segments = crate::segment::discover_segments(wal_dir)?;
384 let live = filter_segments_by_lsn(&segments, from_lsn);
385 let mut records = Vec::with_capacity(max_records.min(4096));
386
387 for seg in live {
388 let mut reader = MmapWalReader::open(&seg.path)?;
389 while let Some(record) = reader.next_record()? {
390 if record.header.lsn >= from_lsn {
391 records.push(record);
392 if records.len() >= max_records {
393 return Ok((records, true));
396 }
397 }
398 }
399 reader.release_pages();
400 }
401
402 Ok((records, false))
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::{WalWriter, WalWriterConfig};

    /// Opens a writer on `path` with direct I/O switched off.
    fn test_writer(path: &Path) -> WalWriter {
        let config = WalWriterConfig {
            use_direct_io: false,
            ..Default::default()
        };
        WalWriter::open(path, config).unwrap()
    }

    /// Opens `path` with the mmap reader and drains it through the iterator.
    fn read_all(path: &Path) -> Vec<WalRecord> {
        MmapWalReader::open(path)
            .unwrap()
            .records()
            .collect::<Result<Vec<_>>>()
            .unwrap()
    }

    #[test]
    fn mmap_reader_basic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // Scope the writer so it is dropped before the file is read back.
        {
            let mut writer = test_writer(&path);
            writer
                .append(RecordType::Put as u16, 1, 0, b"hello")
                .unwrap();
            writer
                .append(RecordType::Put as u16, 1, 0, b"world")
                .unwrap();
            writer.sync().unwrap();
        }

        let records = read_all(&path);
        let payloads: Vec<&[u8]> = records.iter().map(|r| r.payload.as_slice()).collect();

        assert_eq!(records.len(), 2);
        assert_eq!(payloads, vec![&b"hello"[..], &b"world"[..]]);
    }

    #[test]
    fn mmap_reader_empty_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");
        std::fs::write(&path, []).unwrap();

        assert!(read_all(&path).is_empty());
    }

    #[test]
    fn mmap_reader_truncated_header() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");
        // Ten zero bytes are written as the entire file content.
        std::fs::write(&path, [0u8; 10]).unwrap();

        assert!(read_all(&path).is_empty());
    }

    #[test]
    fn replay_mmap_from_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();

        let config = crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone());
        let mut wal = crate::segmented::SegmentedWal::open(config).unwrap();

        let lsn1 = wal.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
        let lsn2 = wal.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
        let lsn3 = wal.append(RecordType::Put as u16, 1, 0, b"c").unwrap();
        wal.sync().unwrap();

        // Replaying from the second LSN must drop only the first record.
        let tail = replay_segments_mmap(&wal_dir, lsn2).unwrap();
        let tail_lsns: Vec<u64> = tail.iter().map(|r| r.header.lsn).collect();
        assert_eq!(tail.len(), 2);
        assert_eq!(tail_lsns, vec![lsn2, lsn3]);

        // Replaying from the first LSN returns everything.
        let everything = replay_segments_mmap(&wal_dir, lsn1).unwrap();
        assert_eq!(everything.len(), 3);
    }
}