1use std::os::fd::AsRawFd;
20use std::path::Path;
21use std::sync::atomic::{AtomicU64, Ordering};
22
23use memmap2::Mmap;
24
25use crate::error::{Result, WalError};
26use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WAL_MAGIC, WalRecord};
27
pub mod observability {
    //! Process-wide counters tracking WAL mmap-reader activity.
    //! All counters use relaxed ordering: they are statistics, not
    //! synchronization points.
    use std::sync::atomic::{AtomicU64, Ordering};

    pub(super) static SEGMENTS_OPENED: AtomicU64 = AtomicU64::new(0);
    pub(super) static FADV_DONTNEED_COUNT: AtomicU64 = AtomicU64::new(0);
    pub(super) static MADV_SEQUENTIAL_COUNT: AtomicU64 = AtomicU64::new(0);

    /// Number of WAL segments opened by `MmapWalReader::open`.
    pub fn segments_opened() -> u64 {
        SEGMENTS_OPENED.load(Ordering::Relaxed)
    }

    /// Number of successful `posix_fadvise(DONTNEED)` calls.
    pub fn fadv_dontneed_count() -> u64 {
        FADV_DONTNEED_COUNT.load(Ordering::Relaxed)
    }

    /// Number of successful `madvise(MADV_SEQUENTIAL)` calls.
    pub fn madv_sequential_count() -> u64 {
        MADV_SEQUENTIAL_COUNT.load(Ordering::Relaxed)
    }
}
47
48fn fadv_dontneed(fd: &std::fs::File, len: usize, path: &Path) {
54 if len == 0 {
55 return;
56 }
57 let rc = unsafe {
58 libc::posix_fadvise(
59 fd.as_raw_fd(),
60 0,
61 len as libc::off_t,
62 libc::POSIX_FADV_DONTNEED,
63 )
64 };
65 if rc == 0 {
66 observability::FADV_DONTNEED_COUNT.fetch_add(1, Ordering::Relaxed);
67 } else {
68 tracing::warn!(
69 path = %path.display(),
70 errno = rc,
71 "posix_fadvise(DONTNEED) failed on exhausted WAL segment",
72 );
73 }
74}
75
/// Sequential reader over a single memory-mapped WAL segment file.
pub struct MmapWalReader {
    // Read-only mapping of the entire segment file.
    mmap: Mmap,
    // Byte offset of the next record header within `mmap`.
    offset: usize,
    // Kept open so `release_pages` can issue posix_fadvise on the same fd.
    file: std::fs::File,
    // Segment path, retained for diagnostics/log messages.
    path: std::path::PathBuf,
    // The madvise advice successfully applied at open time, if any.
    madvise_state: Option<libc::c_int>,
}
88
89impl MmapWalReader {
90 pub fn open(path: &Path) -> Result<Self> {
92 observability::SEGMENTS_OPENED.fetch_add(1, Ordering::Relaxed);
93 let file = std::fs::File::open(path)?;
94 let mmap = unsafe { Mmap::map(&file)? };
98
99 let mut madvise_state = None;
103 if !mmap.is_empty() {
104 let rc = unsafe {
105 libc::madvise(
106 mmap.as_ptr() as *mut libc::c_void,
107 mmap.len(),
108 libc::MADV_SEQUENTIAL,
109 )
110 };
111 if rc == 0 {
112 madvise_state = Some(libc::MADV_SEQUENTIAL);
113 observability::MADV_SEQUENTIAL_COUNT.fetch_add(1, Ordering::Relaxed);
114 } else {
115 tracing::warn!(
116 path = %path.display(),
117 errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0),
118 "madvise(MADV_SEQUENTIAL) failed on WAL segment; continuing",
119 );
120 }
121 }
122
123 Ok(Self {
124 mmap,
125 offset: 0,
126 file,
127 path: path.to_path_buf(),
128 madvise_state,
129 })
130 }
131
132 pub fn madvise_state(&self) -> Option<libc::c_int> {
134 self.madvise_state
135 }
136
137 pub fn release_pages(&self) {
140 fadv_dontneed(&self.file, self.mmap.len(), &self.path);
141 }
142
143 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
148 let data = &self.mmap[..];
149
150 loop {
151 if self.offset + HEADER_SIZE > data.len() {
153 return Ok(None);
154 }
155
156 let header_bytes: &[u8; HEADER_SIZE] = data[self.offset..self.offset + HEADER_SIZE]
158 .try_into()
159 .map_err(|_| {
160 WalError::Io(std::io::Error::new(
161 std::io::ErrorKind::InvalidData,
162 "header slice conversion failed",
163 ))
164 })?;
165 let header = RecordHeader::from_bytes(header_bytes);
166
167 if header.magic != WAL_MAGIC {
169 return Ok(None);
170 }
171
172 if header.validate(self.offset as u64).is_err() {
174 return Ok(None);
175 }
176
177 let payload_len = header.payload_len as usize;
178 let record_end = self.offset + HEADER_SIZE + payload_len;
179
180 if record_end > data.len() {
182 return Ok(None); }
184
185 let payload = data[self.offset + HEADER_SIZE..record_end].to_vec();
187 self.offset = record_end;
188
189 let record = WalRecord { header, payload };
190
191 if record.verify_checksum().is_err() {
193 return Ok(None); }
195
196 let logical_type = record.logical_record_type();
198 if RecordType::from_raw(logical_type).is_none() {
199 if RecordType::is_required(logical_type) {
200 return Err(WalError::UnknownRequiredRecordType {
201 record_type: header.record_type,
202 lsn: header.lsn,
203 });
204 }
205 continue;
207 }
208
209 return Ok(Some(record));
210 }
211 }
212
213 pub fn records(self) -> MmapRecordIter {
215 MmapRecordIter { reader: self }
216 }
217
218 pub fn offset(&self) -> usize {
220 self.offset
221 }
222
223 pub fn len(&self) -> usize {
225 self.mmap.len()
226 }
227
228 pub fn is_empty(&self) -> bool {
230 self.mmap.is_empty()
231 }
232}
233
/// Consuming iterator adapter over an `MmapWalReader`; yields records until
/// the reader reports the end of valid data or an error.
pub struct MmapRecordIter {
    // The reader being drained; owned so iteration can advance its offset.
    reader: MmapWalReader,
}
238
239impl Iterator for MmapRecordIter {
240 type Item = Result<WalRecord>;
241
242 fn next(&mut self) -> Option<Self::Item> {
243 match self.reader.next_record() {
244 Ok(Some(record)) => Some(Ok(record)),
245 Ok(None) => None,
246 Err(e) => Some(Err(e)),
247 }
248 }
249}
250
251const PARALLEL_SEGMENT_THRESHOLD: usize = 4;
253
254pub fn replay_segments_mmap(wal_dir: &Path, from_lsn: u64) -> Result<Vec<WalRecord>> {
264 let segments = crate::segment::discover_segments(wal_dir)?;
265 let live = filter_segments_by_lsn(&segments, from_lsn);
266
267 if live.len() < PARALLEL_SEGMENT_THRESHOLD {
268 return replay_segments_sequential(live, from_lsn);
269 }
270
271 replay_segments_parallel(live, from_lsn)
272}
273
274fn filter_segments_by_lsn(
280 segments: &[crate::segment::SegmentMeta],
281 from_lsn: u64,
282) -> &[crate::segment::SegmentMeta] {
283 let mut start = 0;
288 for i in 0..segments.len() {
289 let upper = segments.get(i + 1).map(|s| s.first_lsn).unwrap_or(u64::MAX);
291 if upper > from_lsn {
292 start = i;
293 break;
294 }
295 start = i + 1;
296 }
297 if start >= segments.len() {
298 return &[];
300 }
301 &segments[start..]
302}
303
304fn replay_segments_sequential(
306 segments: &[crate::segment::SegmentMeta],
307 from_lsn: u64,
308) -> Result<Vec<WalRecord>> {
309 let mut records = Vec::new();
310 for seg in segments {
311 let mut reader = MmapWalReader::open(&seg.path)?;
312 while let Some(record) = reader.next_record()? {
313 if record.header.lsn >= from_lsn {
314 records.push(record);
315 }
316 }
317 reader.release_pages();
318 }
319 Ok(records)
320}
321
322fn replay_segments_parallel(
328 segments: &[crate::segment::SegmentMeta],
329 from_lsn: u64,
330) -> Result<Vec<WalRecord>> {
331 let mut per_segment: Vec<Result<Vec<WalRecord>>> = Vec::with_capacity(segments.len());
333
334 std::thread::scope(|scope| {
335 let handles: Vec<_> = segments
336 .iter()
337 .map(|seg| {
338 scope.spawn(move || -> Result<Vec<WalRecord>> {
339 let mut reader = MmapWalReader::open(&seg.path)?;
340 let mut seg_records = Vec::new();
341 while let Some(record) = reader.next_record()? {
342 if record.header.lsn >= from_lsn {
343 seg_records.push(record);
344 }
345 }
346 reader.release_pages();
347 Ok(seg_records)
348 })
349 })
350 .collect();
351
352 for handle in handles {
353 per_segment.push(handle.join().unwrap_or_else(|_| {
354 Err(WalError::Io(std::io::Error::other(
355 "segment replay thread panicked",
356 )))
357 }));
358 }
359 });
360
361 let total_estimate: usize = per_segment
363 .iter()
364 .map(|r| r.as_ref().map(|v| v.len()).unwrap_or(0))
365 .sum();
366 let mut records = Vec::with_capacity(total_estimate);
367 for seg_result in per_segment {
368 records.extend(seg_result?);
369 }
370
371 Ok(records)
372}
373
374pub fn replay_segments_mmap_limit(
383 wal_dir: &Path,
384 from_lsn: u64,
385 max_records: usize,
386) -> Result<(Vec<WalRecord>, bool)> {
387 let segments = crate::segment::discover_segments(wal_dir)?;
388 let live = filter_segments_by_lsn(&segments, from_lsn);
389 let mut records = Vec::with_capacity(max_records.min(4096));
390
391 for seg in live {
392 let mut reader = MmapWalReader::open(&seg.path)?;
393 while let Some(record) = reader.next_record()? {
394 if record.header.lsn >= from_lsn {
395 records.push(record);
396 if records.len() >= max_records {
397 return Ok((records, true));
400 }
401 }
402 }
403 reader.release_pages();
404 }
405
406 Ok((records, false))
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::{WalWriter, WalWriterConfig};

    // Writer with direct I/O disabled so tests work on any filesystem.
    fn test_writer(path: &Path) -> WalWriter {
        let config = WalWriterConfig {
            use_direct_io: false, ..Default::default()
        };
        WalWriter::open(path, config).unwrap()
    }

    // Round-trip: two appended records come back in order via the iterator.
    #[test]
    fn mmap_reader_basic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.wal");

        // Scope the writer so it is dropped (and closed) before reading.
        {
            let mut writer = test_writer(&path);
            writer
                .append(RecordType::Put as u32, 1, 0, 0, b"hello")
                .unwrap();
            writer
                .append(RecordType::Put as u32, 1, 0, 0, b"world")
                .unwrap();
            writer.sync().unwrap();
        }

        let reader = MmapWalReader::open(&path).unwrap();
        let records: Vec<WalRecord> = reader.records().collect::<Result<Vec<_>>>().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].payload, b"hello");
        assert_eq!(records[1].payload, b"world");
    }

    // A zero-byte segment yields no records and no error.
    #[test]
    fn mmap_reader_empty_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.wal");
        std::fs::write(&path, []).unwrap();

        let reader = MmapWalReader::open(&path).unwrap();
        let records: Vec<WalRecord> = reader.records().collect::<Result<Vec<_>>>().unwrap();
        assert!(records.is_empty());
    }

    // A file shorter than one header is treated as a clean end of data.
    #[test]
    fn mmap_reader_truncated_header() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("truncated.wal");
        std::fs::write(&path, [0u8; 10]).unwrap();

        let reader = MmapWalReader::open(&path).unwrap();
        let records: Vec<WalRecord> = reader.records().collect::<Result<Vec<_>>>().unwrap();
        assert!(records.is_empty());
    }

    // Replay from a mid-stream LSN skips earlier records; replay from the
    // first LSN returns everything.
    #[test]
    fn replay_mmap_from_lsn() {
        let dir = tempfile::tempdir().unwrap();
        let wal_dir = dir.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();

        let config = crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone());
        let mut wal = crate::segmented::SegmentedWal::open(config).unwrap();

        let lsn1 = wal.append(RecordType::Put as u32, 1, 0, 0, b"a").unwrap();
        let lsn2 = wal.append(RecordType::Put as u32, 1, 0, 0, b"b").unwrap();
        let lsn3 = wal.append(RecordType::Put as u32, 1, 0, 0, b"c").unwrap();
        wal.sync().unwrap();

        let records = replay_segments_mmap(&wal_dir, lsn2).unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].header.lsn, lsn2);
        assert_eq!(records[1].header.lsn, lsn3);

        let all = replay_segments_mmap(&wal_dir, lsn1).unwrap();
        assert_eq!(all.len(), 3);
    }
}
496}