nodedb_wal/
mmap_reader.rs1use std::path::Path;
18
19use memmap2::Mmap;
20
21use crate::error::{Result, WalError};
22use crate::record::{HEADER_SIZE, RecordHeader, RecordType, WAL_MAGIC, WalRecord};
23
24pub struct MmapWalReader {
30 mmap: Mmap,
31 offset: usize,
32}
33
34impl MmapWalReader {
35 pub fn open(path: &Path) -> Result<Self> {
37 let file = std::fs::File::open(path)?;
38 let mmap = unsafe { Mmap::map(&file)? };
42 Ok(Self { mmap, offset: 0 })
43 }
44
45 pub fn next_record(&mut self) -> Result<Option<WalRecord>> {
50 let data = &self.mmap[..];
51
52 loop {
53 if self.offset + HEADER_SIZE > data.len() {
55 return Ok(None);
56 }
57
58 let header_bytes: &[u8; HEADER_SIZE] = data[self.offset..self.offset + HEADER_SIZE]
60 .try_into()
61 .map_err(|_| {
62 WalError::Io(std::io::Error::new(
63 std::io::ErrorKind::InvalidData,
64 "header slice conversion failed",
65 ))
66 })?;
67 let header = RecordHeader::from_bytes(header_bytes);
68
69 if header.magic != WAL_MAGIC {
71 return Ok(None);
72 }
73
74 if header.validate(self.offset as u64).is_err() {
76 return Ok(None);
77 }
78
79 let payload_len = header.payload_len as usize;
80 let record_end = self.offset + HEADER_SIZE + payload_len;
81
82 if record_end > data.len() {
84 return Ok(None); }
86
87 let payload = data[self.offset + HEADER_SIZE..record_end].to_vec();
89 self.offset = record_end;
90
91 let record = WalRecord { header, payload };
92
93 if record.verify_checksum().is_err() {
95 return Ok(None); }
97
98 let logical_type = record.logical_record_type();
100 if RecordType::from_raw(logical_type).is_none() {
101 if RecordType::is_required(logical_type) {
102 return Err(WalError::UnknownRequiredRecordType {
103 record_type: header.record_type,
104 lsn: header.lsn,
105 });
106 }
107 continue;
109 }
110
111 return Ok(Some(record));
112 }
113 }
114
115 pub fn records(self) -> MmapRecordIter {
117 MmapRecordIter { reader: self }
118 }
119
120 pub fn offset(&self) -> usize {
122 self.offset
123 }
124
125 pub fn len(&self) -> usize {
127 self.mmap.len()
128 }
129
130 pub fn is_empty(&self) -> bool {
132 self.mmap.is_empty()
133 }
134}
135
136pub struct MmapRecordIter {
138 reader: MmapWalReader,
139}
140
141impl Iterator for MmapRecordIter {
142 type Item = Result<WalRecord>;
143
144 fn next(&mut self) -> Option<Self::Item> {
145 match self.reader.next_record() {
146 Ok(Some(record)) => Some(Ok(record)),
147 Ok(None) => None,
148 Err(e) => Some(Err(e)),
149 }
150 }
151}
152
153const PARALLEL_SEGMENT_THRESHOLD: usize = 4;
155
156pub fn replay_segments_mmap(wal_dir: &Path, from_lsn: u64) -> Result<Vec<WalRecord>> {
166 let segments = crate::segment::discover_segments(wal_dir)?;
167
168 if segments.len() < PARALLEL_SEGMENT_THRESHOLD {
169 return replay_segments_sequential(&segments, from_lsn);
170 }
171
172 replay_segments_parallel(&segments, from_lsn)
173}
174
175fn replay_segments_sequential(
177 segments: &[crate::segment::SegmentMeta],
178 from_lsn: u64,
179) -> Result<Vec<WalRecord>> {
180 let mut records = Vec::new();
181 for seg in segments {
182 let reader = MmapWalReader::open(&seg.path)?;
183 for record_result in reader.records() {
184 let record = record_result?;
185 if record.header.lsn >= from_lsn {
186 records.push(record);
187 }
188 }
189 }
190 Ok(records)
191}
192
193fn replay_segments_parallel(
199 segments: &[crate::segment::SegmentMeta],
200 from_lsn: u64,
201) -> Result<Vec<WalRecord>> {
202 let mut per_segment: Vec<Result<Vec<WalRecord>>> = Vec::with_capacity(segments.len());
204
205 std::thread::scope(|scope| {
206 let handles: Vec<_> = segments
207 .iter()
208 .map(|seg| {
209 scope.spawn(move || -> Result<Vec<WalRecord>> {
210 let reader = MmapWalReader::open(&seg.path)?;
211 let mut seg_records = Vec::new();
212 for record_result in reader.records() {
213 let record = record_result?;
214 if record.header.lsn >= from_lsn {
215 seg_records.push(record);
216 }
217 }
218 Ok(seg_records)
219 })
220 })
221 .collect();
222
223 for handle in handles {
224 per_segment.push(handle.join().unwrap_or_else(|_| {
225 Err(WalError::Io(std::io::Error::other(
226 "segment replay thread panicked",
227 )))
228 }));
229 }
230 });
231
232 let total_estimate: usize = per_segment
234 .iter()
235 .map(|r| r.as_ref().map(|v| v.len()).unwrap_or(0))
236 .sum();
237 let mut records = Vec::with_capacity(total_estimate);
238 for seg_result in per_segment {
239 records.extend(seg_result?);
240 }
241
242 Ok(records)
243}
244
245pub fn replay_segments_mmap_limit(
254 wal_dir: &Path,
255 from_lsn: u64,
256 max_records: usize,
257) -> Result<(Vec<WalRecord>, bool)> {
258 let segments = crate::segment::discover_segments(wal_dir)?;
259 let mut records = Vec::with_capacity(max_records.min(4096));
260
261 for seg in &segments {
262 let reader = MmapWalReader::open(&seg.path)?;
267 for record_result in reader.records() {
268 let record = record_result?;
269 if record.header.lsn >= from_lsn {
270 records.push(record);
271 if records.len() >= max_records {
272 return Ok((records, true));
273 }
274 }
275 }
276 }
277
278 Ok((records, false))
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::RecordType;
    use crate::writer::{WalWriter, WalWriterConfig};

    /// Opens a `WalWriter` at `path` with direct I/O turned off.
    fn test_writer(path: &Path) -> WalWriter {
        let config = WalWriterConfig {
            use_direct_io: false,
            ..Default::default()
        };
        WalWriter::open(path, config).unwrap()
    }

    /// Maps `path` and collects every record, panicking on any read error.
    fn read_all(path: &Path) -> Vec<WalRecord> {
        let reader = MmapWalReader::open(path).unwrap();
        reader.records().collect::<Result<Vec<_>>>().unwrap()
    }

    #[test]
    fn mmap_reader_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("test.wal");

        {
            // The writer is dropped at the end of this scope, before the file is mapped.
            let mut writer = test_writer(&wal_path);
            writer.append(RecordType::Put as u16, 1, 0, b"hello").unwrap();
            writer.append(RecordType::Put as u16, 1, 0, b"world").unwrap();
            writer.sync().unwrap();
        }

        let got = read_all(&wal_path);
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].payload, b"hello");
        assert_eq!(got[1].payload, b"world");
    }

    #[test]
    fn mmap_reader_empty_file() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("empty.wal");
        std::fs::write(&wal_path, []).unwrap();

        assert!(read_all(&wal_path).is_empty());
    }

    #[test]
    fn mmap_reader_truncated_header() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_path = tmp.path().join("truncated.wal");
        std::fs::write(&wal_path, [0u8; 10]).unwrap();

        assert!(read_all(&wal_path).is_empty());
    }

    #[test]
    fn replay_mmap_from_lsn() {
        let tmp = tempfile::tempdir().unwrap();
        let wal_dir = tmp.path().join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();

        let config = crate::segmented::SegmentedWalConfig::for_testing(wal_dir.clone());
        let mut wal = crate::segmented::SegmentedWal::open(config).unwrap();

        let first = wal.append(RecordType::Put as u16, 1, 0, b"a").unwrap();
        let second = wal.append(RecordType::Put as u16, 1, 0, b"b").unwrap();
        let third = wal.append(RecordType::Put as u16, 1, 0, b"c").unwrap();
        wal.sync().unwrap();

        let tail = replay_segments_mmap(&wal_dir, second).unwrap();
        assert_eq!(tail.len(), 2);
        assert_eq!(tail[0].header.lsn, second);
        assert_eq!(tail[1].header.lsn, third);

        let everything = replay_segments_mmap(&wal_dir, first).unwrap();
        assert_eq!(everything.len(), 3);
    }
}