1use std::collections::HashMap;
8use std::fs::{self, File, OpenOptions};
9use std::io::{Read, Seek, SeekFrom, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex, OnceLock};
12use std::time::{SystemTime, UNIX_EPOCH};
13
14use fs2::FileExt;
15
/// Result alias used by every fallible routine in this file; the error side is
/// always [`RdbFileError`].
pub type RdbFileResult<T> = Result<T, RdbFileError>;
17
/// Value stored in the `format_version` field of the superblocks written when a
/// new artifact is created.
pub const DEFAULT_FORMAT_VERSION: u32 = 1;
19
/// Error type returned by the embedded RDB file routines.
#[derive(Debug)]
pub enum RdbFileError {
    /// A validation or structural failure, described by a human-readable message.
    InvalidOperation(String),
    /// A failure reported by the underlying file I/O.
    Io(std::io::Error),
}
25
26impl std::fmt::Display for RdbFileError {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 match self {
29 Self::InvalidOperation(msg) => write!(f, "INVALID_OPERATION: {msg}"),
30 Self::Io(err) => write!(f, "io error: {err}"),
31 }
32 }
33}
34
35impl std::error::Error for RdbFileError {}
36
37impl From<std::io::Error> for RdbFileError {
38 fn from(err: std::io::Error) -> Self {
39 Self::Io(err)
40 }
41}
42
43fn crc32(data: &[u8]) -> u32 {
44 let mut hasher = crc32fast::Hasher::new();
45 hasher.update(data);
46 hasher.finalize()
47}
48
/// Size in bytes of one encoded superblock copy.
pub const EMBEDDED_RDB_SUPERBLOCK_SIZE: u64 = 4096;
/// File offset of superblock copy 0.
pub const EMBEDDED_RDB_SUPERBLOCK_0_OFFSET: u64 = 0;
/// File offset of superblock copy 1 (immediately after copy 0).
pub const EMBEDDED_RDB_SUPERBLOCK_1_OFFSET: u64 = EMBEDDED_RDB_SUPERBLOCK_SIZE;
/// File offset at which the manifest is written (after both superblock copies).
pub const EMBEDDED_RDB_MANIFEST_OFFSET: u64 = EMBEDDED_RDB_SUPERBLOCK_SIZE * 2;
53
/// Magic bytes written at the start of every encoded superblock.
const SUPERBLOCK_MAGIC: &[u8; 8] = b"RDBSBLK1";
/// Magic bytes written at the start of every encoded manifest.
const MANIFEST_MAGIC: &[u8; 8] = b"RDBMNFS1";
/// Superblock layout version; `decode_superblock` rejects any other value.
const SUPERBLOCK_VERSION: u32 = 1;
/// Manifest layout version; `decode_manifest` rejects any other value.
const MANIFEST_VERSION: u32 = 1;
/// Width in bytes of the little-endian CRC32 trailer on superblocks and manifests.
const CHECKSUM_LEN: usize = 4;
/// Largest manifest length `read_manifest` accepts; also the gap left between
/// the manifest offset and the WAL region when an artifact is created.
const MANIFEST_REGION_BYTES: u64 = 4096;
/// WAL region size of a freshly created artifact and the floor applied by
/// `grow_wal_region_bytes`.
const WAL_REGION_BYTES: u64 = 64 * 1024;
/// Alignment applied by `next_snapshot_offset` when it places a snapshot past
/// the existing file contents.
const SNAPSHOT_ALIGNMENT: u64 = 4096;
/// Leading bytes required on any snapshot image that is at least this long.
const SNAPSHOT_MAGIC: &[u8; 4] = b"RDST";
/// Magic bytes that open every WAL frame, in both the current and legacy layouts.
const WAL_FRAME_MAGIC: &[u8; 8] = b"RDBEWAL1";
/// Version value that identifies the current WAL frame layout.
const WAL_FRAME_VERSION: u16 = 2;
/// Current frame header size: magic(8) + version(2) + header_len(2) +
/// sequence(8) + payload_len(4) + payload_crc(4) + previous_frame_crc(4) +
/// header_crc(4).
const WAL_FRAME_HEADER_BYTES: usize = 8 + 2 + 2 + 8 + 4 + 4 + 4 + 4;
/// Legacy frame header size: magic(8) + payload_len(4) + payload_crc(4).
const LEGACY_WAL_FRAME_HEADER_BYTES: usize = 8 + 4 + 4;
/// Environment variable read by `crash_inject`; when its value equals the name
/// of an injection point the process exits at that point.
const CRASH_INJECT_ENV: &str = "REDDB_EMBEDDED_RDB_CRASH_AT";
68
/// Decoded manifest record describing where the WAL region and the snapshot
/// live inside the artifact file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedRdbManifest {
    /// Manifest layout version.
    pub version: u32,
    /// File offset where the WAL region starts.
    pub wal_region_offset: u64,
    /// Size of the WAL region in bytes.
    pub wal_region_bytes: u64,
    /// File offset up to which WAL bytes are scanned. The open routines
    /// overwrite the decoded value with the one from the selected superblock.
    pub wal_recovery_boundary: u64,
    /// File offset of the snapshot image.
    pub snapshot_offset: u64,
    /// Length of the snapshot image in bytes; zero means no snapshot.
    pub snapshot_bytes: u64,
    /// CRC32 of the snapshot image.
    pub snapshot_checksum: u32,
    /// Creation timestamp in milliseconds since the Unix epoch; zero when the
    /// manifest was reconstructed from a superblock.
    pub created_at_unix_ms: u128,
    /// Trailer CRC32 read while decoding. Encoding ignores this field and
    /// recomputes the trailer from the other fields.
    pub checksum: u32,
}
81
/// Decoded superblock. Two copies exist in the file; readers prefer the valid
/// copy with the highest `generation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedRdbSuperblock {
    /// Which of the two on-disk slots this copy occupies (0 or 1).
    pub copy_index: u8,
    /// Generation counter; writers store the previous selected generation plus one.
    pub generation: u64,
    /// Artifact format version recorded at creation.
    pub format_version: u32,
    /// File offset of the manifest this superblock refers to.
    pub manifest_offset: u64,
    /// Encoded length of that manifest in bytes.
    pub manifest_len: u64,
    /// Expected trailer checksum of that manifest.
    pub manifest_checksum: u32,
    /// File offset where the WAL region starts.
    pub wal_region_offset: u64,
    /// Size of the WAL region in bytes.
    pub wal_region_bytes: u64,
    /// File offset up to which WAL bytes are scanned.
    pub wal_recovery_boundary: u64,
    /// File offset of the snapshot image.
    pub snapshot_offset: u64,
    /// Length of the snapshot image in bytes.
    pub snapshot_bytes: u64,
    /// CRC32 of the snapshot image.
    pub snapshot_checksum: u32,
    /// Trailer CRC32 read while decoding. Encoding ignores this field and
    /// recomputes the trailer.
    pub checksum: u32,
}
98
/// Result of opening an artifact: the path plus the metadata that was selected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbeddedRdbOpen {
    /// Path the artifact was opened from.
    pub path: PathBuf,
    /// Superblock copy chosen by the open routine.
    pub selected_superblock: EmbeddedRdbSuperblock,
    /// Manifest that goes with the selected superblock.
    pub manifest: EmbeddedRdbManifest,
}
105
/// Accumulated state of a WAL scan.
#[derive(Debug, Default)]
struct WalScan {
    /// Payloads of the frames accepted so far, in file order.
    payloads: Vec<Vec<u8>>,
    /// Sequence number the next current-layout frame must carry (scans start at 1).
    next_sequence: u64,
    /// CRC32 of the most recently accepted frame (0 before any frame).
    previous_frame_crc: u32,
    /// Length in bytes of the accepted prefix of the scanned buffer.
    valid_bytes: u64,
}
113
/// Stateless namespace type: every artifact operation is an associated function
/// on it.
pub struct EmbeddedRdbArtifact;
115
116impl EmbeddedRdbArtifact {
117 pub fn create(path: impl AsRef<Path>) -> RdbFileResult<EmbeddedRdbOpen> {
118 Self::create_with_snapshot(path, &[])
119 }
120
121 pub fn create_with_snapshot(
122 path: impl AsRef<Path>,
123 snapshot: &[u8],
124 ) -> RdbFileResult<EmbeddedRdbOpen> {
125 let path = path.as_ref();
126 if let Some(parent) = path.parent() {
127 if !parent.as_os_str().is_empty() {
128 fs::create_dir_all(parent)?;
129 }
130 }
131
132 let created_at_unix_ms = now_unix_ms();
133 let wal_region_offset = EMBEDDED_RDB_MANIFEST_OFFSET + MANIFEST_REGION_BYTES;
134 let snapshot_offset = wal_region_offset + WAL_REGION_BYTES;
135 let manifest = EmbeddedRdbManifest {
136 version: MANIFEST_VERSION,
137 wal_region_offset,
138 wal_region_bytes: WAL_REGION_BYTES,
139 wal_recovery_boundary: wal_region_offset,
140 snapshot_offset,
141 snapshot_bytes: snapshot.len() as u64,
142 snapshot_checksum: crc32(snapshot),
143 created_at_unix_ms,
144 checksum: 0,
145 };
146 let manifest_bytes = encode_manifest(manifest);
147 let manifest_checksum = trailer_checksum(&manifest_bytes);
148
149 let mut file = OpenOptions::new()
150 .create(true)
151 .truncate(true)
152 .read(true)
153 .write(true)
154 .open(path)?;
155 file.set_len(snapshot_offset + snapshot.len() as u64)?;
156 write_at(&mut file, EMBEDDED_RDB_MANIFEST_OFFSET, &manifest_bytes)?;
157 if !snapshot.is_empty() {
158 write_at(&mut file, snapshot_offset, snapshot)?;
159 }
160
161 let base = EmbeddedRdbSuperblock {
162 copy_index: 0,
163 generation: 1,
164 format_version: DEFAULT_FORMAT_VERSION,
165 manifest_offset: EMBEDDED_RDB_MANIFEST_OFFSET,
166 manifest_len: manifest_bytes.len() as u64,
167 manifest_checksum,
168 wal_region_offset,
169 wal_region_bytes: WAL_REGION_BYTES,
170 wal_recovery_boundary: wal_region_offset,
171 snapshot_offset,
172 snapshot_bytes: snapshot.len() as u64,
173 snapshot_checksum: crc32(snapshot),
174 checksum: 0,
175 };
176 Self::write_superblock_copy(&mut file, &base)?;
177 Self::write_superblock_copy(
178 &mut file,
179 &EmbeddedRdbSuperblock {
180 copy_index: 1,
181 generation: 2,
182 ..base
183 },
184 )?;
185 file.sync_all()?;
186
187 Self::open(path)
188 }
189
190 pub fn open(path: impl AsRef<Path>) -> RdbFileResult<EmbeddedRdbOpen> {
191 Self::open_inner(path, true)
192 }
193
194 fn open_for_wal_append(path: impl AsRef<Path>) -> RdbFileResult<EmbeddedRdbOpen> {
195 Self::open_inner(path, false)
196 }
197
198 fn open_inner(
199 path: impl AsRef<Path>,
200 validate_snapshot_refs: bool,
201 ) -> RdbFileResult<EmbeddedRdbOpen> {
202 let path = path.as_ref();
203 let mut file = File::open(path)?;
204 let mut superblocks: Vec<EmbeddedRdbSuperblock> = [
205 read_superblock_copy(&mut file, 0),
206 read_superblock_copy(&mut file, 1),
207 ]
208 .into_iter()
209 .flatten()
210 .collect();
211 superblocks.sort_by_key(|superblock| std::cmp::Reverse(superblock.generation));
212
213 for selected_superblock in superblocks {
214 let manifest = match read_manifest(&mut file, selected_superblock) {
215 Ok(mut manifest) => {
216 manifest.wal_recovery_boundary = selected_superblock.wal_recovery_boundary;
217 manifest
218 }
219 Err(_) => manifest_from_superblock(selected_superblock),
220 };
221 if validate_snapshot_refs && !snapshot_reference_valid(&mut file, &manifest)? {
222 continue;
223 }
224 return Ok(EmbeddedRdbOpen {
225 path: path.to_path_buf(),
226 selected_superblock,
227 manifest,
228 });
229 }
230
231 Err(RdbFileError::InvalidOperation(
232 "no valid embedded superblock".into(),
233 ))
234 }
235
236 pub fn wal_payloads_encoded_len(payloads: &[Vec<u8>]) -> RdbFileResult<u64> {
237 let mut len = 0u64;
238 for payload in payloads {
239 let payload_len = u32::try_from(payload.len()).map_err(|_| {
240 RdbFileError::InvalidOperation("embedded wal payload too large".into())
241 })?;
242 let frame_len = WAL_FRAME_HEADER_BYTES as u64 + payload_len as u64;
243 len = len.checked_add(frame_len).ok_or_else(|| {
244 RdbFileError::InvalidOperation("embedded wal encoded length overflow".into())
245 })?;
246 }
247 Ok(len)
248 }
249
/// Installs `snapshot` as the active snapshot of an existing artifact, making
/// sure the WAL region is at least `min_wal_bytes`, and resets the WAL recovery
/// boundary to the start of the WAL region.
///
/// The statement order below is deliberate; the `crash_inject` calls name the
/// points between steps.
///
/// # Errors
/// Propagates I/O failures and any `InvalidOperation` raised by the helpers.
pub fn write_snapshot_with_wal_capacity(
    path: impl AsRef<Path>,
    snapshot: &[u8],
    min_wal_bytes: u64,
) -> RdbFileResult<EmbeddedRdbOpen> {
    let path = path.as_ref();
    // In-process serialization per path; a poisoned mutex is still reused.
    let path_lock = embedded_path_lock(path);
    let _path_guard = path_lock
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    // Exclusive file lock taken through a separate handle.
    // NOTE(review): on early `?` returns the explicit unlock below is skipped;
    // presumably the lock is released when `lock_file` is dropped — confirm
    // against the fs2 documentation.
    let lock_file = OpenOptions::new().read(true).write(true).open(path)?;
    lock_file.lock_exclusive()?;

    let open = Self::open(path)?;
    let wal_region_bytes =
        grow_wal_region_bytes(open.manifest.wal_region_bytes, min_wal_bytes)?;
    let snapshot_offset = next_snapshot_offset(path, &open, wal_region_bytes, snapshot)?;
    let snapshot_checksum = crc32(snapshot);
    let manifest = EmbeddedRdbManifest {
        wal_region_bytes,
        // Boundary == region start: the WAL is treated as empty after this write.
        wal_recovery_boundary: open.manifest.wal_region_offset,
        snapshot_offset,
        snapshot_bytes: snapshot.len() as u64,
        snapshot_checksum,
        checksum: 0,
        ..open.manifest
    };
    let manifest_bytes = encode_manifest(manifest);
    let manifest_checksum = trailer_checksum(&manifest_bytes);

    let mut file = OpenOptions::new().read(true).write(true).open(path)?;
    // Step 1: size the file and write the snapshot image, then sync the data.
    file.set_len(snapshot_offset + snapshot.len() as u64)?;
    if !snapshot.is_empty() {
        write_at(&mut file, snapshot_offset, snapshot)?;
    }
    crash_inject("snapshot_after_image_write");
    file.sync_data()?;
    crash_inject("snapshot_after_image_sync");
    // Step 2: overwrite the manifest in place.
    write_at(&mut file, EMBEDDED_RDB_MANIFEST_OFFSET, &manifest_bytes)?;
    crash_inject("snapshot_after_manifest_write");

    // Step 3: write the superblock slot that is NOT currently selected, with the
    // next generation, so the previously selected copy is left untouched.
    let next_copy_index = if open.selected_superblock.copy_index == 0 {
        1
    } else {
        0
    };
    let next_superblock = EmbeddedRdbSuperblock {
        copy_index: next_copy_index,
        generation: open.selected_superblock.generation.saturating_add(1),
        manifest_len: manifest_bytes.len() as u64,
        manifest_checksum,
        wal_region_bytes,
        wal_recovery_boundary: open.manifest.wal_region_offset,
        snapshot_offset,
        snapshot_bytes: snapshot.len() as u64,
        snapshot_checksum,
        checksum: 0,
        ..open.selected_superblock
    };
    Self::write_superblock_copy(&mut file, &next_superblock)?;
    crash_inject("snapshot_after_superblock_write");
    // Step 4: sync everything, release the file lock, and reopen.
    file.sync_all()?;
    lock_file.unlock()?;
    Self::open(path)
}
315
316 pub fn open_strict_manifest(path: impl AsRef<Path>) -> RdbFileResult<EmbeddedRdbOpen> {
317 let path = path.as_ref();
318 let mut file = File::open(path)?;
319 let selected_superblock = [
320 read_superblock_copy(&mut file, 0),
321 read_superblock_copy(&mut file, 1),
322 ]
323 .into_iter()
324 .flatten()
325 .max_by_key(|superblock| superblock.generation)
326 .ok_or_else(|| RdbFileError::InvalidOperation("no valid embedded superblock".into()))?;
327
328 let mut manifest = read_manifest(&mut file, selected_superblock)?;
329 manifest.wal_recovery_boundary = selected_superblock.wal_recovery_boundary;
330 Ok(EmbeddedRdbOpen {
331 path: path.to_path_buf(),
332 selected_superblock,
333 manifest,
334 })
335 }
336
337 pub fn read_snapshot(open: &EmbeddedRdbOpen) -> RdbFileResult<Option<Vec<u8>>> {
338 if open.manifest.snapshot_bytes == 0 {
339 return Ok(None);
340 }
341 let mut file = File::open(&open.path)?;
342 let mut bytes = vec![0u8; open.manifest.snapshot_bytes as usize];
343 file.seek(SeekFrom::Start(open.manifest.snapshot_offset))?;
344 file.read_exact(&mut bytes)?;
345 let checksum = crc32(&bytes);
346 if checksum != open.manifest.snapshot_checksum {
347 return Err(RdbFileError::InvalidOperation(format!(
348 "embedded snapshot checksum mismatch: stored {:#010x}, computed {:#010x}",
349 open.manifest.snapshot_checksum, checksum
350 )));
351 }
352 if bytes.len() >= SNAPSHOT_MAGIC.len() && &bytes[..SNAPSHOT_MAGIC.len()] != SNAPSHOT_MAGIC {
353 return Err(RdbFileError::InvalidOperation(
354 "invalid embedded snapshot magic".into(),
355 ));
356 }
357 Ok(Some(bytes))
358 }
359
360 pub fn write_snapshot(
361 path: impl AsRef<Path>,
362 snapshot: &[u8],
363 ) -> RdbFileResult<EmbeddedRdbOpen> {
364 Self::write_snapshot_with_wal_capacity(path, snapshot, 0)
365 }
366
367 pub fn read_wal_payloads(open: &EmbeddedRdbOpen) -> RdbFileResult<Vec<Vec<u8>>> {
368 Ok(scan_wal(open)?.payloads)
369 }
370
/// Appends `payloads` to the WAL region as current-layout frames and publishes
/// the new recovery boundary through the non-selected superblock slot.
///
/// With an empty `payloads` slice this only opens the artifact. The statement
/// order below is deliberate; the `crash_inject` calls name the points between
/// steps.
///
/// # Errors
/// Propagates I/O failures; returns `InvalidOperation` for an oversized
/// payload, boundary arithmetic overflow, or when the frames do not fit in the
/// WAL region ("embedded wal region full").
pub fn append_wal_payloads(
    path: impl AsRef<Path>,
    payloads: &[Vec<u8>],
) -> RdbFileResult<EmbeddedRdbOpen> {
    let path = path.as_ref();
    if payloads.is_empty() {
        return Self::open(path);
    }

    // In-process serialization per path; a poisoned mutex is still reused.
    let path_lock = embedded_path_lock(path);
    let _path_guard = path_lock
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    // Exclusive file lock taken through a separate handle.
    // NOTE(review): on early returns the explicit unlock below is skipped;
    // presumably dropping `lock_file` releases the lock — confirm against fs2.
    let lock_file = OpenOptions::new().read(true).write(true).open(path)?;
    lock_file.lock_exclusive()?;

    // Opened without snapshot validation; the existing WAL is scanned so the
    // new frames continue its sequence numbers and CRC chain.
    let open = Self::open_for_wal_append(path)?;
    let wal_scan = scan_wal(&open)?;
    let mut sequence = wal_scan.next_sequence;
    let mut previous_frame_crc = wal_scan.previous_frame_crc;
    let mut encoded = Vec::new();
    for payload in payloads {
        let (frame, frame_crc) = encode_wal_frame(sequence, previous_frame_crc, payload)?;
        encoded.extend_from_slice(&frame);
        previous_frame_crc = frame_crc;
        sequence = sequence.saturating_add(1);
    }

    // New frames start right after the bytes the scan accepted, which may be
    // before the recorded recovery boundary if the scan stopped early.
    let wal_start = open.manifest.wal_region_offset;
    let wal_end = wal_start.checked_add(wal_scan.valid_bytes).ok_or_else(|| {
        RdbFileError::InvalidOperation("embedded wal boundary overflow".into())
    })?;
    let max_end = open
        .manifest
        .wal_region_offset
        .saturating_add(open.manifest.wal_region_bytes);
    let next_boundary = wal_end.checked_add(encoded.len() as u64).ok_or_else(|| {
        RdbFileError::InvalidOperation("embedded wal boundary overflow".into())
    })?;
    if wal_end < wal_start || next_boundary > max_end {
        return Err(RdbFileError::InvalidOperation(
            "embedded wal region full".into(),
        ));
    }

    // Step 1: write the frames and sync the data before publishing the boundary.
    let mut file = OpenOptions::new().read(true).write(true).open(path)?;
    write_at(&mut file, wal_end, &encoded)?;
    crash_inject("wal_after_frame_write");
    file.sync_data()?;
    crash_inject("wal_after_frame_sync");

    // Step 2: write the superblock slot that is NOT currently selected, with the
    // next generation and the advanced recovery boundary.
    let next_copy_index = if open.selected_superblock.copy_index == 0 {
        1
    } else {
        0
    };
    let next_superblock = EmbeddedRdbSuperblock {
        copy_index: next_copy_index,
        generation: open.selected_superblock.generation.saturating_add(1),
        wal_recovery_boundary: next_boundary,
        checksum: 0,
        ..open.selected_superblock
    };
    Self::write_superblock_copy(&mut file, &next_superblock)?;
    crash_inject("wal_after_superblock_write");
    // Step 3: sync everything, release the file lock, and reopen.
    file.sync_all()?;
    lock_file.unlock()?;
    Self::open(path)
}
440
441 pub fn write_superblock_copy(
442 file: &mut File,
443 superblock: &EmbeddedRdbSuperblock,
444 ) -> RdbFileResult<()> {
445 let offset = superblock_offset(superblock.copy_index)?;
446 write_at(file, offset, &encode_superblock(*superblock)?)?;
447 Ok(())
448 }
449}
450
451fn read_superblock_copy(file: &mut File, copy_index: u8) -> Option<EmbeddedRdbSuperblock> {
452 let offset = superblock_offset(copy_index).ok()?;
453 let mut bytes = vec![0u8; EMBEDDED_RDB_SUPERBLOCK_SIZE as usize];
454 file.seek(SeekFrom::Start(offset)).ok()?;
455 file.read_exact(&mut bytes).ok()?;
456 decode_superblock(copy_index, &bytes).ok()
457}
458
459fn read_manifest(
460 file: &mut File,
461 superblock: EmbeddedRdbSuperblock,
462) -> RdbFileResult<EmbeddedRdbManifest> {
463 if superblock.manifest_len < CHECKSUM_LEN as u64
464 || superblock.manifest_len > MANIFEST_REGION_BYTES
465 {
466 return Err(RdbFileError::InvalidOperation(format!(
467 "invalid embedded manifest length {}",
468 superblock.manifest_len
469 )));
470 }
471
472 let mut bytes = vec![0u8; superblock.manifest_len as usize];
473 file.seek(SeekFrom::Start(superblock.manifest_offset))?;
474 file.read_exact(&mut bytes)?;
475 let checksum = trailer_checksum(&bytes);
476 if checksum != superblock.manifest_checksum {
477 return Err(RdbFileError::InvalidOperation(format!(
478 "embedded manifest checksum mismatch: stored {:#010x}, computed {:#010x}",
479 superblock.manifest_checksum, checksum
480 )));
481 }
482 decode_manifest(&bytes)
483}
484
485fn snapshot_reference_valid(
486 file: &mut File,
487 manifest: &EmbeddedRdbManifest,
488) -> RdbFileResult<bool> {
489 if manifest.snapshot_bytes == 0 {
490 return Ok(true);
491 }
492 let snapshot_end = manifest
493 .snapshot_offset
494 .checked_add(manifest.snapshot_bytes)
495 .ok_or_else(|| RdbFileError::InvalidOperation("embedded snapshot end overflow".into()))?;
496 if snapshot_end > file.metadata()?.len() {
497 return Ok(false);
498 }
499
500 let mut bytes = vec![0u8; manifest.snapshot_bytes as usize];
501 file.seek(SeekFrom::Start(manifest.snapshot_offset))?;
502 if file.read_exact(&mut bytes).is_err() {
503 return Ok(false);
504 }
505 if crc32(&bytes) != manifest.snapshot_checksum {
506 return Ok(false);
507 }
508 if bytes.len() >= SNAPSHOT_MAGIC.len() && &bytes[..SNAPSHOT_MAGIC.len()] != SNAPSHOT_MAGIC {
509 return Ok(false);
510 }
511 Ok(true)
512}
513
514fn manifest_from_superblock(superblock: EmbeddedRdbSuperblock) -> EmbeddedRdbManifest {
515 EmbeddedRdbManifest {
516 version: MANIFEST_VERSION,
517 wal_region_offset: superblock.wal_region_offset,
518 wal_region_bytes: superblock.wal_region_bytes,
519 wal_recovery_boundary: superblock.wal_recovery_boundary,
520 snapshot_offset: superblock.snapshot_offset,
521 snapshot_bytes: superblock.snapshot_bytes,
522 snapshot_checksum: superblock.snapshot_checksum,
523 created_at_unix_ms: 0,
524 checksum: 0,
525 }
526}
527
528fn grow_wal_region_bytes(current: u64, min_required: u64) -> RdbFileResult<u64> {
529 let mut next = current.max(WAL_REGION_BYTES);
530 while next < min_required {
531 next = next.checked_mul(2).ok_or_else(|| {
532 RdbFileError::InvalidOperation("embedded wal region size overflow".into())
533 })?;
534 }
535 Ok(next)
536}
537
/// Returns the process-wide mutex associated with `path`.
///
/// Mutexes live in a lazily initialized global map keyed by the canonicalized
/// path, falling back to the path as given when canonicalization fails. Repeated
/// calls for the same key return clones of the same `Arc`.
fn embedded_path_lock(path: &Path) -> Arc<Mutex<()>> {
    static LOCKS: OnceLock<Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>> = OnceLock::new();

    let key = match path.canonicalize() {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    };
    let registry = LOCKS.get_or_init(|| Mutex::new(HashMap::new()));
    // A poisoned registry is still usable: the map itself is always consistent.
    let mut table = match registry.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let slot = table
        .entry(key)
        .or_insert_with(|| Arc::new(Mutex::new(())));
    Arc::clone(slot)
}
550
551fn next_snapshot_offset(
552 path: &Path,
553 open: &EmbeddedRdbOpen,
554 wal_region_bytes: u64,
555 snapshot: &[u8],
556) -> RdbFileResult<u64> {
557 let base = open
558 .manifest
559 .wal_region_offset
560 .checked_add(wal_region_bytes)
561 .ok_or_else(|| {
562 RdbFileError::InvalidOperation("embedded snapshot offset overflow".into())
563 })?;
564 if open.manifest.snapshot_bytes == 0 && snapshot.is_empty() {
565 return Ok(base);
566 }
567
568 let file_len = std::fs::metadata(path).map(|metadata| metadata.len())?;
569 let active_snapshot_end = open
570 .manifest
571 .snapshot_offset
572 .checked_add(open.manifest.snapshot_bytes)
573 .ok_or_else(|| RdbFileError::InvalidOperation("embedded snapshot end overflow".into()))?;
574 align_up(
575 file_len.max(active_snapshot_end).max(base),
576 SNAPSHOT_ALIGNMENT,
577 )
578}
579
580fn align_up(value: u64, alignment: u64) -> RdbFileResult<u64> {
581 if alignment == 0 {
582 return Ok(value);
583 }
584 let remainder = value % alignment;
585 if remainder == 0 {
586 return Ok(value);
587 }
588 value
589 .checked_add(alignment - remainder)
590 .ok_or_else(|| RdbFileError::InvalidOperation("embedded alignment overflow".into()))
591}
592
593fn scan_wal(open: &EmbeddedRdbOpen) -> RdbFileResult<WalScan> {
594 let wal_start = open.manifest.wal_region_offset;
595 let wal_end = open.manifest.wal_recovery_boundary;
596 let max_end = open
597 .manifest
598 .wal_region_offset
599 .saturating_add(open.manifest.wal_region_bytes);
600 if wal_end < wal_start || wal_end > max_end {
601 return Err(RdbFileError::InvalidOperation(format!(
602 "invalid embedded wal boundary {wal_end}"
603 )));
604 }
605 if wal_end == wal_start {
606 return Ok(WalScan {
607 next_sequence: 1,
608 ..WalScan::default()
609 });
610 }
611
612 let mut file = File::open(&open.path)?;
613 let file_len = file.metadata()?.len();
614 if file_len <= wal_start {
615 return Ok(WalScan {
616 next_sequence: 1,
617 ..WalScan::default()
618 });
619 }
620 let read_end = wal_end.min(file_len);
621 file.seek(SeekFrom::Start(wal_start))?;
622 let mut bytes = vec![0u8; (read_end - wal_start) as usize];
623 file.read_exact(&mut bytes)?;
624 Ok(scan_wal_bytes(&bytes))
625}
626
627fn scan_wal_bytes(bytes: &[u8]) -> WalScan {
628 let mut scan = WalScan {
629 next_sequence: 1,
630 ..WalScan::default()
631 };
632 let mut cursor = 0usize;
633 while cursor < bytes.len() {
634 let Some(frame) = decode_next_wal_frame(bytes, cursor, &scan) else {
635 break;
636 };
637 scan.payloads.push(frame.payload);
638 scan.next_sequence = scan.next_sequence.saturating_add(1);
639 scan.previous_frame_crc = frame.frame_crc;
640 cursor = frame.end;
641 scan.valid_bytes = cursor as u64;
642 }
643 scan
644}
645
/// One successfully decoded WAL frame.
struct DecodedWalFrame {
    /// Copy of the frame's payload bytes.
    payload: Vec<u8>,
    /// CRC32 over the whole frame (header and payload).
    frame_crc: u32,
    /// Index in the scanned buffer just past the end of this frame.
    end: usize,
}
651
652fn decode_next_wal_frame(bytes: &[u8], start: usize, scan: &WalScan) -> Option<DecodedWalFrame> {
653 let remaining = bytes.len().checked_sub(start)?;
654 if remaining < WAL_FRAME_MAGIC.len() {
655 return None;
656 }
657 if &bytes[start..start + WAL_FRAME_MAGIC.len()] != WAL_FRAME_MAGIC {
658 return None;
659 }
660 if remaining < WAL_FRAME_MAGIC.len() + 2 {
661 return None;
662 }
663 let version_offset = start + WAL_FRAME_MAGIC.len();
664 let version = u16::from_le_bytes(bytes[version_offset..version_offset + 2].try_into().ok()?);
665 if version == WAL_FRAME_VERSION {
666 decode_v2_wal_frame(bytes, start, scan)
667 } else {
668 decode_legacy_wal_frame(bytes, start)
669 }
670}
671
672fn decode_v2_wal_frame(bytes: &[u8], start: usize, scan: &WalScan) -> Option<DecodedWalFrame> {
673 if bytes.len().checked_sub(start)? < WAL_FRAME_HEADER_BYTES {
674 return None;
675 }
676 let header_len_offset = start + 10;
677 let header_len = u16::from_le_bytes(
678 bytes[header_len_offset..header_len_offset + 2]
679 .try_into()
680 .ok()?,
681 ) as usize;
682 if header_len != WAL_FRAME_HEADER_BYTES {
683 return None;
684 }
685 let sequence_offset = start + 12;
686 let sequence = u64::from_le_bytes(
687 bytes[sequence_offset..sequence_offset + 8]
688 .try_into()
689 .ok()?,
690 );
691 if sequence != scan.next_sequence {
692 return None;
693 }
694 let payload_len_offset = start + 20;
695 let payload_len = u32::from_le_bytes(
696 bytes[payload_len_offset..payload_len_offset + 4]
697 .try_into()
698 .ok()?,
699 ) as usize;
700 let payload_crc_offset = start + 24;
701 let payload_crc = u32::from_le_bytes(
702 bytes[payload_crc_offset..payload_crc_offset + 4]
703 .try_into()
704 .ok()?,
705 );
706 let previous_frame_crc_offset = start + 28;
707 let previous_frame_crc = u32::from_le_bytes(
708 bytes[previous_frame_crc_offset..previous_frame_crc_offset + 4]
709 .try_into()
710 .ok()?,
711 );
712 if previous_frame_crc != scan.previous_frame_crc {
713 return None;
714 }
715 let header_crc_offset = start + 32;
716 let header_crc = u32::from_le_bytes(
717 bytes[header_crc_offset..header_crc_offset + 4]
718 .try_into()
719 .ok()?,
720 );
721 if header_crc != crc32(&bytes[start..header_crc_offset]) {
722 return None;
723 }
724 let payload_start = start.checked_add(header_len)?;
725 let end = payload_start.checked_add(payload_len)?;
726 if end > bytes.len() {
727 return None;
728 }
729 let payload = bytes[payload_start..end].to_vec();
730 if crc32(&payload) != payload_crc {
731 return None;
732 }
733 Some(DecodedWalFrame {
734 payload,
735 frame_crc: crc32(&bytes[start..end]),
736 end,
737 })
738}
739
740fn decode_legacy_wal_frame(bytes: &[u8], start: usize) -> Option<DecodedWalFrame> {
741 if bytes.len().checked_sub(start)? < LEGACY_WAL_FRAME_HEADER_BYTES {
742 return None;
743 }
744 let payload_len_offset = start + WAL_FRAME_MAGIC.len();
745 let payload_len = u32::from_le_bytes(
746 bytes[payload_len_offset..payload_len_offset + 4]
747 .try_into()
748 .ok()?,
749 ) as usize;
750 let payload_crc_offset = payload_len_offset + 4;
751 let payload_crc = u32::from_le_bytes(
752 bytes[payload_crc_offset..payload_crc_offset + 4]
753 .try_into()
754 .ok()?,
755 );
756 let payload_start = start.checked_add(LEGACY_WAL_FRAME_HEADER_BYTES)?;
757 let end = payload_start.checked_add(payload_len)?;
758 if end > bytes.len() {
759 return None;
760 }
761 let payload = bytes[payload_start..end].to_vec();
762 if crc32(&payload) != payload_crc {
763 return None;
764 }
765 Some(DecodedWalFrame {
766 payload,
767 frame_crc: crc32(&bytes[start..end]),
768 end,
769 })
770}
771
772fn encode_wal_frame(
773 sequence: u64,
774 previous_frame_crc: u32,
775 payload: &[u8],
776) -> RdbFileResult<(Vec<u8>, u32)> {
777 let payload_len = u32::try_from(payload.len())
778 .map_err(|_| RdbFileError::InvalidOperation("embedded wal payload too large".into()))?;
779 let mut frame = Vec::with_capacity(WAL_FRAME_HEADER_BYTES + payload.len());
780 frame.extend_from_slice(WAL_FRAME_MAGIC);
781 frame.extend_from_slice(&WAL_FRAME_VERSION.to_le_bytes());
782 frame.extend_from_slice(&(WAL_FRAME_HEADER_BYTES as u16).to_le_bytes());
783 frame.extend_from_slice(&sequence.to_le_bytes());
784 frame.extend_from_slice(&payload_len.to_le_bytes());
785 frame.extend_from_slice(&crc32(payload).to_le_bytes());
786 frame.extend_from_slice(&previous_frame_crc.to_le_bytes());
787 let header_crc = crc32(&frame);
788 frame.extend_from_slice(&header_crc.to_le_bytes());
789 frame.extend_from_slice(payload);
790 let frame_crc = crc32(&frame);
791 Ok((frame, frame_crc))
792}
793
794fn encode_superblock(superblock: EmbeddedRdbSuperblock) -> RdbFileResult<Vec<u8>> {
795 let mut bytes = vec![0u8; EMBEDDED_RDB_SUPERBLOCK_SIZE as usize];
796 let mut cursor = 0usize;
797 put_bytes(&mut bytes, &mut cursor, SUPERBLOCK_MAGIC);
798 put_u32(&mut bytes, &mut cursor, SUPERBLOCK_VERSION);
799 put_u8(&mut bytes, &mut cursor, superblock.copy_index);
800 put_u64(&mut bytes, &mut cursor, superblock.generation);
801 put_u32(&mut bytes, &mut cursor, superblock.format_version);
802 put_u64(&mut bytes, &mut cursor, superblock.manifest_offset);
803 put_u64(&mut bytes, &mut cursor, superblock.manifest_len);
804 put_u32(&mut bytes, &mut cursor, superblock.manifest_checksum);
805 put_u64(&mut bytes, &mut cursor, superblock.wal_region_offset);
806 put_u64(&mut bytes, &mut cursor, superblock.wal_region_bytes);
807 put_u64(&mut bytes, &mut cursor, superblock.wal_recovery_boundary);
808 put_u64(&mut bytes, &mut cursor, superblock.snapshot_offset);
809 put_u64(&mut bytes, &mut cursor, superblock.snapshot_bytes);
810 put_u32(&mut bytes, &mut cursor, superblock.snapshot_checksum);
811
812 let checksum_offset = bytes.len() - CHECKSUM_LEN;
813 let checksum = crc32(&bytes[..checksum_offset]);
814 bytes[checksum_offset..].copy_from_slice(&checksum.to_le_bytes());
815 Ok(bytes)
816}
817
818fn decode_superblock(copy_index: u8, bytes: &[u8]) -> RdbFileResult<EmbeddedRdbSuperblock> {
819 if bytes.len() != EMBEDDED_RDB_SUPERBLOCK_SIZE as usize {
820 return Err(RdbFileError::InvalidOperation(
821 "invalid embedded superblock size".into(),
822 ));
823 }
824 let checksum_offset = bytes.len() - CHECKSUM_LEN;
825 let stored_checksum = u32::from_le_bytes(bytes[checksum_offset..].try_into().unwrap());
826 let computed_checksum = crc32(&bytes[..checksum_offset]);
827 if stored_checksum != computed_checksum {
828 return Err(RdbFileError::InvalidOperation(
829 "embedded superblock checksum mismatch".into(),
830 ));
831 }
832
833 let mut cursor = 0usize;
834 if take_bytes(bytes, &mut cursor, SUPERBLOCK_MAGIC.len())? != SUPERBLOCK_MAGIC {
835 return Err(RdbFileError::InvalidOperation(
836 "invalid embedded superblock magic".into(),
837 ));
838 }
839 let version = take_u32(bytes, &mut cursor)?;
840 if version != SUPERBLOCK_VERSION {
841 return Err(RdbFileError::InvalidOperation(format!(
842 "unsupported embedded superblock version {version}"
843 )));
844 }
845 let stored_copy_index = take_u8(bytes, &mut cursor)?;
846 if stored_copy_index != copy_index {
847 return Err(RdbFileError::InvalidOperation(
848 "embedded superblock copy index mismatch".into(),
849 ));
850 }
851
852 Ok(EmbeddedRdbSuperblock {
853 copy_index: stored_copy_index,
854 generation: take_u64(bytes, &mut cursor)?,
855 format_version: take_u32(bytes, &mut cursor)?,
856 manifest_offset: take_u64(bytes, &mut cursor)?,
857 manifest_len: take_u64(bytes, &mut cursor)?,
858 manifest_checksum: take_u32(bytes, &mut cursor)?,
859 wal_region_offset: take_u64(bytes, &mut cursor)?,
860 wal_region_bytes: take_u64(bytes, &mut cursor)?,
861 wal_recovery_boundary: take_u64(bytes, &mut cursor)?,
862 snapshot_offset: take_u64(bytes, &mut cursor)?,
863 snapshot_bytes: take_u64(bytes, &mut cursor)?,
864 snapshot_checksum: take_u32(bytes, &mut cursor)?,
865 checksum: stored_checksum,
866 })
867}
868
869fn encode_manifest(manifest: EmbeddedRdbManifest) -> Vec<u8> {
870 let mut bytes = vec![0u8; 8 + 4 + 8 + 8 + 8 + 8 + 8 + 4 + 16 + CHECKSUM_LEN];
871 let mut cursor = 0usize;
872 put_bytes(&mut bytes, &mut cursor, MANIFEST_MAGIC);
873 put_u32(&mut bytes, &mut cursor, manifest.version);
874 put_u64(&mut bytes, &mut cursor, manifest.wal_region_offset);
875 put_u64(&mut bytes, &mut cursor, manifest.wal_region_bytes);
876 put_u64(&mut bytes, &mut cursor, manifest.wal_recovery_boundary);
877 put_u64(&mut bytes, &mut cursor, manifest.snapshot_offset);
878 put_u64(&mut bytes, &mut cursor, manifest.snapshot_bytes);
879 put_u32(&mut bytes, &mut cursor, manifest.snapshot_checksum);
880 put_u128(&mut bytes, &mut cursor, manifest.created_at_unix_ms);
881
882 let checksum_offset = bytes.len() - CHECKSUM_LEN;
883 let checksum = crc32(&bytes[..checksum_offset]);
884 bytes[checksum_offset..].copy_from_slice(&checksum.to_le_bytes());
885 bytes
886}
887
888fn decode_manifest(bytes: &[u8]) -> RdbFileResult<EmbeddedRdbManifest> {
889 let checksum_offset = bytes
890 .len()
891 .checked_sub(CHECKSUM_LEN)
892 .ok_or_else(|| RdbFileError::InvalidOperation("embedded manifest too short".into()))?;
893 let stored_checksum = u32::from_le_bytes(bytes[checksum_offset..].try_into().unwrap());
894 let computed_checksum = crc32(&bytes[..checksum_offset]);
895 if stored_checksum != computed_checksum {
896 return Err(RdbFileError::InvalidOperation(
897 "embedded manifest checksum mismatch".into(),
898 ));
899 }
900
901 let mut cursor = 0usize;
902 if take_bytes(bytes, &mut cursor, MANIFEST_MAGIC.len())? != MANIFEST_MAGIC {
903 return Err(RdbFileError::InvalidOperation(
904 "invalid embedded manifest magic".into(),
905 ));
906 }
907 let version = take_u32(bytes, &mut cursor)?;
908 if version != MANIFEST_VERSION {
909 return Err(RdbFileError::InvalidOperation(format!(
910 "unsupported embedded manifest version {version}"
911 )));
912 }
913 Ok(EmbeddedRdbManifest {
914 version,
915 wal_region_offset: take_u64(bytes, &mut cursor)?,
916 wal_region_bytes: take_u64(bytes, &mut cursor)?,
917 wal_recovery_boundary: take_u64(bytes, &mut cursor)?,
918 snapshot_offset: take_u64(bytes, &mut cursor)?,
919 snapshot_bytes: take_u64(bytes, &mut cursor)?,
920 snapshot_checksum: take_u32(bytes, &mut cursor)?,
921 created_at_unix_ms: take_u128(bytes, &mut cursor)?,
922 checksum: stored_checksum,
923 })
924}
925
926fn trailer_checksum(bytes: &[u8]) -> u32 {
927 let checksum_offset = bytes.len() - CHECKSUM_LEN;
928 u32::from_le_bytes(bytes[checksum_offset..].try_into().unwrap())
929}
930
931fn superblock_offset(copy_index: u8) -> RdbFileResult<u64> {
932 match copy_index {
933 0 => Ok(EMBEDDED_RDB_SUPERBLOCK_0_OFFSET),
934 1 => Ok(EMBEDDED_RDB_SUPERBLOCK_1_OFFSET),
935 _ => Err(RdbFileError::InvalidOperation(format!(
936 "invalid embedded superblock copy index {copy_index}"
937 ))),
938 }
939}
940
941fn write_at(file: &mut File, offset: u64, bytes: &[u8]) -> RdbFileResult<()> {
942 file.seek(SeekFrom::Start(offset))?;
943 file.write_all(bytes)?;
944 Ok(())
945}
946
947fn crash_inject(point: &str) {
948 if std::env::var(CRASH_INJECT_ENV).ok().as_deref() == Some(point) {
949 std::process::exit(173);
950 }
951}
952
/// Current wall-clock time in milliseconds since the Unix epoch, or 0 if the
/// system clock reports a time before the epoch.
fn now_unix_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
959
/// Copies `value` into `target` at `*cursor` and advances the cursor past it.
///
/// # Panics
/// Panics if the destination range does not fit inside `target`.
fn put_bytes(target: &mut [u8], cursor: &mut usize, value: &[u8]) {
    let start = *cursor;
    let end = start + value.len();
    target[start..end].copy_from_slice(value);
    *cursor = end;
}
964
/// Stores one byte into `target` at `*cursor` and advances the cursor by one.
///
/// # Panics
/// Panics if `*cursor` is out of bounds for `target`.
fn put_u8(target: &mut [u8], cursor: &mut usize, value: u8) {
    let at = *cursor;
    target[at] = value;
    *cursor = at + 1;
}
969
970fn put_u32(target: &mut [u8], cursor: &mut usize, value: u32) {
971 put_bytes(target, cursor, &value.to_le_bytes());
972}
973
974fn put_u64(target: &mut [u8], cursor: &mut usize, value: u64) {
975 put_bytes(target, cursor, &value.to_le_bytes());
976}
977
978fn put_u128(target: &mut [u8], cursor: &mut usize, value: u128) {
979 put_bytes(target, cursor, &value.to_le_bytes());
980}
981
982fn take_bytes<'a>(bytes: &'a [u8], cursor: &mut usize, len: usize) -> RdbFileResult<&'a [u8]> {
983 let end = cursor.checked_add(len).ok_or_else(|| {
984 RdbFileError::InvalidOperation("embedded artifact cursor overflow".into())
985 })?;
986 if end > bytes.len() {
987 return Err(RdbFileError::InvalidOperation(
988 "embedded artifact truncated".into(),
989 ));
990 }
991 let value = &bytes[*cursor..end];
992 *cursor = end;
993 Ok(value)
994}
995
996fn take_u8(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u8> {
997 Ok(take_bytes(bytes, cursor, 1)?[0])
998}
999
1000fn take_u32(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u32> {
1001 Ok(u32::from_le_bytes(
1002 take_bytes(bytes, cursor, 4)?.try_into().unwrap(),
1003 ))
1004}
1005
1006fn take_u64(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u64> {
1007 Ok(u64::from_le_bytes(
1008 take_bytes(bytes, cursor, 8)?.try_into().unwrap(),
1009 ))
1010}
1011
1012fn take_u128(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u128> {
1013 Ok(u128::from_le_bytes(
1014 take_bytes(bytes, cursor, 16)?.try_into().unwrap(),
1015 ))
1016}