nodedb_wal/
mmap_reader.rs1use std::path::Path;
18
19use memmap2::Mmap;
20
21use crate::error::{Result, WalError};
22use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WAL_MAGIC, WalRecord};
23
24pub struct MmapWalReader {
30 mmap: Mmap,
31 offset: usize,
32}
33
34impl MmapWalReader {
35 pub fn open(path: &Path) -> Result<Self> {
37 let file = std::fs::File::open(path)?;
38 let mmap = unsafe { Mmap::map(&file)? };
42 Ok(Self { mmap, offset: 0 })
43 }
44
45 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
50 let data = &self.mmap[..];
51
52 if self.offset + HEADER_SIZE > data.len() {
54 return Ok(None);
55 }
56
57 let header_bytes: &[u8; HEADER_SIZE] = data[self.offset..self.offset + HEADER_SIZE]
59 .try_into()
60 .map_err(|_| {
61 WalError::Io(std::io::Error::new(
62 std::io::ErrorKind::InvalidData,
63 "header slice conversion failed",
64 ))
65 })?;
66 let header = RecordHeader::from_bytes(header_bytes);
67
68 if header.magic != WAL_MAGIC {
70 return Ok(None);
71 }
72
73 if header.validate(self.offset as u64).is_err() {
75 return Ok(None);
76 }
77
78 let payload_len = header.payload_len as usize;
79 let record_end = self.offset + HEADER_SIZE + payload_len;
80
81 if record_end > data.len() {
83 return Ok(None); }
85
86 let payload = data[self.offset + HEADER_SIZE..record_end].to_vec();
88 self.offset = record_end;
89
90 let record = WalRecord { header, payload };
91
92 if record.verify_checksum().is_err() {
94 return Ok(None); }
96
97 let logical_type = record.logical_record_type();
99 if RecordType::from_raw(logical_type).is_none() {
100 if RecordType::is_required(logical_type) {
101 return Err(WalError::UnknownRequiredRecordType {
102 record_type: header.record_type,
103 lsn: header.lsn,
104 });
105 }
106 return self.next_record();
108 }
109
110 Ok(Some(record))
111 }
112
113 pub fn records(self) -> MmapRecordIter {
115 MmapRecordIter { reader: self }
116 }
117
118 pub fn offset(&self) -> usize {
120 self.offset
121 }
122
123 pub fn len(&self) -> usize {
125 self.mmap.len()
126 }
127
128 pub fn is_empty(&self) -> bool {
130 self.mmap.is_empty()
131 }
132}
133
134pub struct MmapRecordIter {
136 reader: MmapWalReader,
137}
138
139impl Iterator for MmapRecordIter {
140 type Item = Result<WalRecord>;
141
142 fn next(&mut self) -> Option<Self::Item> {
143 match self.reader.next_record() {
144 Ok(Some(record)) => Some(Ok(record)),
145 Ok(None) => None,
146 Err(e) => Some(Err(e)),
147 }
148 }
149}
150
151const PARALLEL_SEGMENT_THRESHOLD: usize = 4;
153
154pub fn replay_segments_mmap(wal_dir: &Path, from_lsn: u64) -> Result<Vec<WalRecord>> {
164 let segments = crate::segment::discover_segments(wal_dir)?;
165
166 if segments.len() < PARALLEL_SEGMENT_THRESHOLD {
167 return replay_segments_sequential(&segments, from_lsn);
168 }
169
170 replay_segments_parallel(&segments, from_lsn)
171}
172
173fn replay_segments_sequential(
175 segments: &[crate::segment::SegmentMeta],
176 from_lsn: u64,
177) -> Result<Vec<WalRecord>> {
178 let mut records = Vec::new();
179 for seg in segments {
180 let reader = MmapWalReader::open(&seg.path)?;
181 for record_result in reader.records() {
182 let record = record_result?;
183 if record.header.lsn >= from_lsn {
184 records.push(record);
185 }
186 }
187 }
188 Ok(records)
189}
190
191fn replay_segments_parallel(
197 segments: &[crate::segment::SegmentMeta],
198 from_lsn: u64,
199) -> Result<Vec<WalRecord>> {
200 let mut per_segment: Vec<Result<Vec<WalRecord>>> = Vec::with_capacity(segments.len());
202
203 std::thread::scope(|scope| {
204 let handles: Vec<_> = segments
205 .iter()
206 .map(|seg| {
207 scope.spawn(move || -> Result<Vec<WalRecord>> {
208 let reader = MmapWalReader::open(&seg.path)?;
209 let mut seg_records = Vec::new();
210 for record_result in reader.records() {
211 let record = record_result?;
212 if record.header.lsn >= from_lsn {
213 seg_records.push(record);
214 }
215 }
216 Ok(seg_records)
217 })
218 })
219 .collect();
220
221 for handle in handles {
222 per_segment.push(handle.join().unwrap_or_else(|_| {
223 Err(WalError::Io(std::io::Error::other(
224 "segment replay thread panicked",
225 )))
226 }));
227 }
228 });
229
230 let total_estimate: usize = per_segment
232 .iter()
233 .map(|r| r.as_ref().map(|v| v.len()).unwrap_or(0))
234 .sum();
235 let mut records = Vec::with_capacity(total_estimate);
236 for seg_result in per_segment {
237 records.extend(seg_result?);
238 }
239
240 Ok(records)
241}
242
243pub fn replay_segments_mmap_limit(
252 wal_dir: &Path,
253 from_lsn: u64,
254 max_records: usize,
255) -> Result<(Vec<WalRecord>, bool)> {
256 let segments = crate::segment::discover_segments(wal_dir)?;
257 let mut records = Vec::with_capacity(max_records.min(4096));
258
259 for seg in &segments {
260 let reader = MmapWalReader::open(&seg.path)?;
265 for record_result in reader.records() {
266 let record = record_result?;
267 if record.header.lsn >= from_lsn {
268 records.push(record);
269 if records.len() >= max_records {
270 return Ok((records, true));
271 }
272 }
273 }
274 }
275
276 Ok((records, false))
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::{WalWriter, WalWriterConfig};

    /// Opens a writer at `path` with direct I/O disabled.
    fn test_writer(path: &Path) -> WalWriter {
        WalWriter::open(
            path,
            WalWriterConfig {
                use_direct_io: false,
                ..Default::default()
            },
        )
        .unwrap()
    }

    /// Reads every record from the segment file at `path`, panicking on error.
    fn read_all(path: &Path) -> Vec<WalRecord> {
        MmapWalReader::open(path)
            .unwrap()
            .records()
            .collect::<Result<Vec<_>>>()
            .unwrap()
    }

    #[test]
    fn mmap_reader_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        {
            // Scoped so the writer is dropped before the file is mapped.
            let mut writer = test_writer(&wal_path);
            for payload in [b"hello", b"world"] {
                writer
                    .append(RecordType::Put as u16, 1, 0, payload)
                    .unwrap();
            }
            writer.sync().unwrap();
        }

        let got = read_all(&wal_path);
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].payload, b"hello");
        assert_eq!(got[1].payload, b"world");
    }

    #[test]
    fn mmap_reader_empty_file() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("empty.wal");
        std::fs::write(&wal_path, []).unwrap();

        assert!(read_all(&wal_path).is_empty());
    }

    #[test]
    fn mmap_reader_truncated_header() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("truncated.wal");
        std::fs::write(&wal_path, [0u8; 10]).unwrap();

        assert!(read_all(&wal_path).is_empty());
    }

    #[test]
    fn replay_mmap_from_lsn() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_dir = tmp.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();

        let mut wal = crate::segmented::SegmentedWal::open(
            crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone()),
        )
        .unwrap();

        let [first, second, third] = [b"a", b"b", b"c"]
            .map(|payload| wal.append(RecordType::Put as u16, 1, 0, payload).unwrap());
        wal.sync().unwrap();

        let tail_lsns: Vec<_> = replay_segments_mmap(&wal_dir, second)
            .unwrap()
            .iter()
            .map(|r| r.header.lsn)
            .collect();
        assert_eq!(tail_lsns, [second, third]);

        assert_eq!(replay_segments_mmap(&wal_dir, first).unwrap().len(), 3);
    }
}