Skip to main content

reddb_file/
embedded.rs

1//! Embedded single-file `.rdb` artifact.
2//!
3//! This module models the promoted path where one durable database artifact
4//! carries its superblock pair, internal manifest, WAL reservation, and current
5//! store snapshot inside the `.rdb` file itself.
6
7use std::collections::HashMap;
8use std::fs::{self, File, OpenOptions};
9use std::io::{Read, Seek, SeekFrom, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex, OnceLock};
12use std::time::{SystemTime, UNIX_EPOCH};
13
14use fs2::FileExt;
15
/// Result alias used by the fallible operations in this module.
pub type RdbFileResult<T> = Result<T, RdbFileError>;

/// Value stored in the `format_version` field of the superblocks written by
/// `EmbeddedRdbArtifact::create_with_snapshot`.
pub const DEFAULT_FORMAT_VERSION: u32 = 1;
19
/// Errors produced while reading or writing an embedded `.rdb` artifact.
#[derive(Debug)]
pub enum RdbFileError {
    /// The artifact is malformed or the request cannot be satisfied; the
    /// message describes the violated expectation.
    InvalidOperation(String),
    /// An underlying I/O operation failed.
    Io(std::io::Error),
}

impl std::fmt::Display for RdbFileError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidOperation(msg) => write!(f, "INVALID_OPERATION: {msg}"),
            Self::Io(err) => write!(f, "io error: {err}"),
        }
    }
}

impl std::error::Error for RdbFileError {
    /// Exposes the wrapped `std::io::Error` so callers walking the error chain
    /// (or downcasting the cause) can reach it; `InvalidOperation` has no cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidOperation(_) => None,
            Self::Io(err) => Some(err),
        }
    }
}

impl From<std::io::Error> for RdbFileError {
    /// Wraps an I/O error so `?` can be used on `std::io` results.
    fn from(err: std::io::Error) -> Self {
        Self::Io(err)
    }
}
42
43fn crc32(data: &[u8]) -> u32 {
44    let mut hasher = crc32fast::Hasher::new();
45    hasher.update(data);
46    hasher.finalize()
47}
48
/// Size in bytes of one encoded superblock copy (`encode_superblock` always
/// produces exactly this many bytes).
pub const EMBEDDED_RDB_SUPERBLOCK_SIZE: u64 = 4096;
/// Offset constant for superblock copy 0 (start of the file).
/// NOTE(review): the copy-index -> offset mapping lives in `superblock_offset`,
/// which is not shown here — confirm it uses these constants.
pub const EMBEDDED_RDB_SUPERBLOCK_0_OFFSET: u64 = 0;
/// Offset constant for superblock copy 1: one superblock size past copy 0.
pub const EMBEDDED_RDB_SUPERBLOCK_1_OFFSET: u64 = EMBEDDED_RDB_SUPERBLOCK_SIZE;
/// Offset at which the encoded manifest is written (two superblock sizes in).
pub const EMBEDDED_RDB_MANIFEST_OFFSET: u64 = EMBEDDED_RDB_SUPERBLOCK_SIZE * 2;

/// Magic bytes written at the start of every encoded superblock.
const SUPERBLOCK_MAGIC: &[u8; 8] = b"RDBSBLK1";
/// Magic bytes written at the start of every encoded manifest.
const MANIFEST_MAGIC: &[u8; 8] = b"RDBMNFS1";
/// Only superblock encoding version accepted by `decode_superblock`.
const SUPERBLOCK_VERSION: u32 = 1;
/// Manifest version stamped by `create_with_snapshot` and `manifest_from_superblock`.
const MANIFEST_VERSION: u32 = 1;
/// Length of the CRC-32 trailer that closes encoded superblocks and manifests.
const CHECKSUM_LEN: usize = 4;
/// Space reserved for the manifest; `read_manifest` rejects longer manifests,
/// and a freshly created file places its WAL region this far past the manifest offset.
const MANIFEST_REGION_BYTES: u64 = 4096;
/// WAL region size of a freshly created file, and the floor used by
/// `grow_wal_region_bytes`.
const WAL_REGION_BYTES: u64 = 64 * 1024;
/// Alignment applied by `next_snapshot_offset` when placing a new snapshot image.
const SNAPSHOT_ALIGNMENT: u64 = 4096;
/// Leading bytes required of any stored snapshot that is at least this long.
const SNAPSHOT_MAGIC: &[u8; 4] = b"RDST";
/// Magic bytes that open every WAL frame (both v2 and legacy layouts).
const WAL_FRAME_MAGIC: &[u8; 8] = b"RDBEWAL1";
/// Version number written into frames produced by `encode_wal_frame`.
const WAL_FRAME_VERSION: u16 = 2;
/// v2 frame header size: magic (8) + version (2) + header length (2) +
/// sequence (8) + payload length (4) + payload CRC (4) + previous frame CRC (4) +
/// header CRC (4).
const WAL_FRAME_HEADER_BYTES: usize = 8 + 2 + 2 + 8 + 4 + 4 + 4 + 4;
/// Legacy frame header size: magic (8) + payload length (4) + payload CRC (4).
const LEGACY_WAL_FRAME_HEADER_BYTES: usize = 8 + 4 + 4;
/// Name of an environment variable; presumably read by `crash_inject` (defined
/// elsewhere in this file) to pick a crash point — confirm against that helper.
const CRASH_INJECT_ENV: &str = "REDDB_EMBEDDED_RDB_CRASH_AT";
68
/// In-file manifest describing where the WAL region and the current snapshot
/// image live inside the `.rdb` file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedRdbManifest {
    /// Manifest encoding version.
    pub version: u32,
    /// File offset at which the WAL region starts.
    pub wal_region_offset: u64,
    /// Size of the WAL region in bytes.
    pub wal_region_bytes: u64,
    /// File offset up to which WAL bytes are scanned; the open paths overwrite
    /// this with the value from the selected superblock.
    pub wal_recovery_boundary: u64,
    /// File offset of the current snapshot image.
    pub snapshot_offset: u64,
    /// Length of the snapshot image; 0 means no snapshot is stored.
    pub snapshot_bytes: u64,
    /// CRC-32 of the snapshot image bytes.
    pub snapshot_checksum: u32,
    /// Creation timestamp in milliseconds since the Unix epoch; 0 when the
    /// manifest was rebuilt from a superblock.
    pub created_at_unix_ms: u128,
    /// Set to 0 by the writers in this file before encoding; `encode_manifest`
    /// computes the stored trailer itself and does not read this field.
    pub checksum: u32,
}
81
/// One of the two superblock copies stored at the head of the file. Readers
/// prefer the valid copy with the highest `generation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedRdbSuperblock {
    /// Which slot (0 or 1) this copy occupies; checked against the slot it was read from.
    pub copy_index: u8,
    /// Counter incremented on every superblock rewrite.
    pub generation: u64,
    /// Format version of the artifact.
    pub format_version: u32,
    /// File offset of the encoded manifest.
    pub manifest_offset: u64,
    /// Length of the encoded manifest in bytes.
    pub manifest_len: u64,
    /// Checksum the manifest bytes must match (compared against `trailer_checksum`).
    pub manifest_checksum: u32,
    /// File offset at which the WAL region starts.
    pub wal_region_offset: u64,
    /// Size of the WAL region in bytes.
    pub wal_region_bytes: u64,
    /// File offset up to which WAL frames have been committed.
    pub wal_recovery_boundary: u64,
    /// File offset of the current snapshot image.
    pub snapshot_offset: u64,
    /// Length of the snapshot image; 0 means no snapshot is stored.
    pub snapshot_bytes: u64,
    /// CRC-32 of the snapshot image bytes.
    pub snapshot_checksum: u32,
    /// Trailer CRC as read from disk by `decode_superblock`; writers set it to 0
    /// because `encode_superblock` computes the trailer itself.
    pub checksum: u32,
}
98
/// Result of opening an artifact: the path plus the superblock copy and
/// manifest that were selected for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbeddedRdbOpen {
    /// Path the artifact was opened from.
    pub path: PathBuf,
    /// Superblock copy chosen by the open routine.
    pub selected_superblock: EmbeddedRdbSuperblock,
    /// Manifest in effect, with `wal_recovery_boundary` taken from the superblock.
    pub manifest: EmbeddedRdbManifest,
}
105
/// Accumulated state of a WAL scan.
#[derive(Debug, Default)]
struct WalScan {
    /// Payloads of the frames decoded so far, in file order.
    payloads: Vec<Vec<u8>>,
    /// Sequence number the next v2 frame must carry (scans start at 1).
    next_sequence: u64,
    /// CRC of the most recently decoded frame; the next v2 frame must reference it.
    previous_frame_crc: u32,
    /// Number of bytes, from the start of the scanned buffer, covered by decoded frames.
    valid_bytes: u64,
}
113
/// Stateless namespace for the embedded artifact operations; every operation
/// is an associated function that takes a path or an `EmbeddedRdbOpen`.
pub struct EmbeddedRdbArtifact;
115
116impl EmbeddedRdbArtifact {
    /// Creates a new artifact at `path` with no snapshot image.
    ///
    /// Equivalent to `create_with_snapshot(path, &[])`.
    pub fn create(path: impl AsRef<Path>) -> RdbFileResult<EmbeddedRdbOpen> {
        Self::create_with_snapshot(path, &[])
    }
120
121    pub fn create_with_snapshot(
122        path: impl AsRef<Path>,
123        snapshot: &[u8],
124    ) -> RdbFileResult<EmbeddedRdbOpen> {
125        let path = path.as_ref();
126        if let Some(parent) = path.parent() {
127            if !parent.as_os_str().is_empty() {
128                fs::create_dir_all(parent)?;
129            }
130        }
131
132        let created_at_unix_ms = now_unix_ms();
133        let wal_region_offset = EMBEDDED_RDB_MANIFEST_OFFSET + MANIFEST_REGION_BYTES;
134        let snapshot_offset = wal_region_offset + WAL_REGION_BYTES;
135        let manifest = EmbeddedRdbManifest {
136            version: MANIFEST_VERSION,
137            wal_region_offset,
138            wal_region_bytes: WAL_REGION_BYTES,
139            wal_recovery_boundary: wal_region_offset,
140            snapshot_offset,
141            snapshot_bytes: snapshot.len() as u64,
142            snapshot_checksum: crc32(snapshot),
143            created_at_unix_ms,
144            checksum: 0,
145        };
146        let manifest_bytes = encode_manifest(manifest);
147        let manifest_checksum = trailer_checksum(&manifest_bytes);
148
149        let mut file = OpenOptions::new()
150            .create(true)
151            .truncate(true)
152            .read(true)
153            .write(true)
154            .open(path)?;
155        file.set_len(snapshot_offset + snapshot.len() as u64)?;
156        write_at(&mut file, EMBEDDED_RDB_MANIFEST_OFFSET, &manifest_bytes)?;
157        if !snapshot.is_empty() {
158            write_at(&mut file, snapshot_offset, snapshot)?;
159        }
160
161        let base = EmbeddedRdbSuperblock {
162            copy_index: 0,
163            generation: 1,
164            format_version: DEFAULT_FORMAT_VERSION,
165            manifest_offset: EMBEDDED_RDB_MANIFEST_OFFSET,
166            manifest_len: manifest_bytes.len() as u64,
167            manifest_checksum,
168            wal_region_offset,
169            wal_region_bytes: WAL_REGION_BYTES,
170            wal_recovery_boundary: wal_region_offset,
171            snapshot_offset,
172            snapshot_bytes: snapshot.len() as u64,
173            snapshot_checksum: crc32(snapshot),
174            checksum: 0,
175        };
176        Self::write_superblock_copy(&mut file, &base)?;
177        Self::write_superblock_copy(
178            &mut file,
179            &EmbeddedRdbSuperblock {
180                copy_index: 1,
181                generation: 2,
182                ..base
183            },
184        )?;
185        file.sync_all()?;
186
187        Self::open(path)
188    }
189
    /// Opens the artifact at `path`, skipping any superblock copy whose
    /// snapshot reference fails validation.
    pub fn open(path: impl AsRef<Path>) -> RdbFileResult<EmbeddedRdbOpen> {
        Self::open_inner(path, true)
    }
193
    /// Opens the artifact at `path` without validating the snapshot reference;
    /// used by the WAL append path.
    fn open_for_wal_append(path: impl AsRef<Path>) -> RdbFileResult<EmbeddedRdbOpen> {
        Self::open_inner(path, false)
    }
197
198    fn open_inner(
199        path: impl AsRef<Path>,
200        validate_snapshot_refs: bool,
201    ) -> RdbFileResult<EmbeddedRdbOpen> {
202        let path = path.as_ref();
203        let mut file = File::open(path)?;
204        let mut superblocks: Vec<EmbeddedRdbSuperblock> = [
205            read_superblock_copy(&mut file, 0),
206            read_superblock_copy(&mut file, 1),
207        ]
208        .into_iter()
209        .flatten()
210        .collect();
211        superblocks.sort_by_key(|superblock| std::cmp::Reverse(superblock.generation));
212
213        for selected_superblock in superblocks {
214            let manifest = match read_manifest(&mut file, selected_superblock) {
215                Ok(mut manifest) => {
216                    manifest.wal_recovery_boundary = selected_superblock.wal_recovery_boundary;
217                    manifest
218                }
219                Err(_) => manifest_from_superblock(selected_superblock),
220            };
221            if validate_snapshot_refs && !snapshot_reference_valid(&mut file, &manifest)? {
222                continue;
223            }
224            return Ok(EmbeddedRdbOpen {
225                path: path.to_path_buf(),
226                selected_superblock,
227                manifest,
228            });
229        }
230
231        Err(RdbFileError::InvalidOperation(
232            "no valid embedded superblock".into(),
233        ))
234    }
235
236    pub fn wal_payloads_encoded_len(payloads: &[Vec<u8>]) -> RdbFileResult<u64> {
237        let mut len = 0u64;
238        for payload in payloads {
239            let payload_len = u32::try_from(payload.len()).map_err(|_| {
240                RdbFileError::InvalidOperation("embedded wal payload too large".into())
241            })?;
242            let frame_len = WAL_FRAME_HEADER_BYTES as u64 + payload_len as u64;
243            len = len.checked_add(frame_len).ok_or_else(|| {
244                RdbFileError::InvalidOperation("embedded wal encoded length overflow".into())
245            })?;
246        }
247        Ok(len)
248    }
249
    /// Stores `snapshot` as the new current image, resets the WAL, and makes
    /// sure the WAL region is at least `min_wal_bytes` large.
    ///
    /// The image is written and synced before the manifest and the alternate
    /// superblock copy are written, so the previously selected superblock copy
    /// is never overwritten by this call. The `crash_inject` calls mark the
    /// points between those steps.
    ///
    /// Returns the result of re-opening the artifact after the final sync.
    ///
    /// # Errors
    ///
    /// Propagates lock, open, sizing, and I/O failures.
    pub fn write_snapshot_with_wal_capacity(
        path: impl AsRef<Path>,
        snapshot: &[u8],
        min_wal_bytes: u64,
    ) -> RdbFileResult<EmbeddedRdbOpen> {
        let path = path.as_ref();
        // In-process serialization per path; a poisoned mutex is still used.
        let path_lock = embedded_path_lock(path);
        let _path_guard = path_lock
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // Exclusive file lock (fs2) held on a separate handle for the duration
        // of the update.
        let lock_file = OpenOptions::new().read(true).write(true).open(path)?;
        lock_file.lock_exclusive()?;

        let open = Self::open(path)?;
        let wal_region_bytes =
            grow_wal_region_bytes(open.manifest.wal_region_bytes, min_wal_bytes)?;
        let snapshot_offset = next_snapshot_offset(path, &open, wal_region_bytes, snapshot)?;
        let snapshot_checksum = crc32(snapshot);
        let manifest = EmbeddedRdbManifest {
            wal_region_bytes,
            // Writing a snapshot empties the WAL: boundary goes back to the region start.
            wal_recovery_boundary: open.manifest.wal_region_offset,
            snapshot_offset,
            snapshot_bytes: snapshot.len() as u64,
            snapshot_checksum,
            checksum: 0,
            ..open.manifest
        };
        let manifest_bytes = encode_manifest(manifest);
        let manifest_checksum = trailer_checksum(&manifest_bytes);

        let mut file = OpenOptions::new().read(true).write(true).open(path)?;
        // Size the file to end exactly at the end of the new image.
        file.set_len(snapshot_offset + snapshot.len() as u64)?;
        if !snapshot.is_empty() {
            write_at(&mut file, snapshot_offset, snapshot)?;
        }
        crash_inject("snapshot_after_image_write");
        // The image is synced before anything that references it is written.
        file.sync_data()?;
        crash_inject("snapshot_after_image_sync");
        write_at(&mut file, EMBEDDED_RDB_MANIFEST_OFFSET, &manifest_bytes)?;
        crash_inject("snapshot_after_manifest_write");

        // Write into the slot that is NOT currently selected.
        let next_copy_index = if open.selected_superblock.copy_index == 0 {
            1
        } else {
            0
        };
        let next_superblock = EmbeddedRdbSuperblock {
            copy_index: next_copy_index,
            generation: open.selected_superblock.generation.saturating_add(1),
            manifest_len: manifest_bytes.len() as u64,
            manifest_checksum,
            wal_region_bytes,
            wal_recovery_boundary: open.manifest.wal_region_offset,
            snapshot_offset,
            snapshot_bytes: snapshot.len() as u64,
            snapshot_checksum,
            checksum: 0,
            ..open.selected_superblock
        };
        Self::write_superblock_copy(&mut file, &next_superblock)?;
        crash_inject("snapshot_after_superblock_write");
        file.sync_all()?;
        lock_file.unlock()?;
        Self::open(path)
    }
315
316    pub fn open_strict_manifest(path: impl AsRef<Path>) -> RdbFileResult<EmbeddedRdbOpen> {
317        let path = path.as_ref();
318        let mut file = File::open(path)?;
319        let selected_superblock = [
320            read_superblock_copy(&mut file, 0),
321            read_superblock_copy(&mut file, 1),
322        ]
323        .into_iter()
324        .flatten()
325        .max_by_key(|superblock| superblock.generation)
326        .ok_or_else(|| RdbFileError::InvalidOperation("no valid embedded superblock".into()))?;
327
328        let mut manifest = read_manifest(&mut file, selected_superblock)?;
329        manifest.wal_recovery_boundary = selected_superblock.wal_recovery_boundary;
330        Ok(EmbeddedRdbOpen {
331            path: path.to_path_buf(),
332            selected_superblock,
333            manifest,
334        })
335    }
336
337    pub fn read_snapshot(open: &EmbeddedRdbOpen) -> RdbFileResult<Option<Vec<u8>>> {
338        if open.manifest.snapshot_bytes == 0 {
339            return Ok(None);
340        }
341        let mut file = File::open(&open.path)?;
342        let mut bytes = vec![0u8; open.manifest.snapshot_bytes as usize];
343        file.seek(SeekFrom::Start(open.manifest.snapshot_offset))?;
344        file.read_exact(&mut bytes)?;
345        let checksum = crc32(&bytes);
346        if checksum != open.manifest.snapshot_checksum {
347            return Err(RdbFileError::InvalidOperation(format!(
348                "embedded snapshot checksum mismatch: stored {:#010x}, computed {:#010x}",
349                open.manifest.snapshot_checksum, checksum
350            )));
351        }
352        if bytes.len() >= SNAPSHOT_MAGIC.len() && &bytes[..SNAPSHOT_MAGIC.len()] != SNAPSHOT_MAGIC {
353            return Err(RdbFileError::InvalidOperation(
354                "invalid embedded snapshot magic".into(),
355            ));
356        }
357        Ok(Some(bytes))
358    }
359
    /// Stores `snapshot` as the new current image without requesting extra WAL
    /// capacity (`min_wal_bytes` of 0).
    pub fn write_snapshot(
        path: impl AsRef<Path>,
        snapshot: &[u8],
    ) -> RdbFileResult<EmbeddedRdbOpen> {
        Self::write_snapshot_with_wal_capacity(path, snapshot, 0)
    }
366
    /// Returns the payloads of the WAL frames that `scan_wal` decodes for `open`,
    /// in file order.
    pub fn read_wal_payloads(open: &EmbeddedRdbOpen) -> RdbFileResult<Vec<Vec<u8>>> {
        Ok(scan_wal(open)?.payloads)
    }
370
    /// Appends `payloads` to the WAL as v2 frames and publishes the new
    /// recovery boundary through the alternate superblock copy.
    ///
    /// With an empty `payloads` slice nothing is written and the artifact is
    /// simply opened. Otherwise frames are written after the last decodable
    /// frame, synced, and only then is the superblock written — so the
    /// currently selected superblock copy is never overwritten by this call.
    ///
    /// Returns the result of re-opening the artifact after the final sync.
    ///
    /// # Errors
    ///
    /// Fails with `"embedded wal region full"` when the frames do not fit in
    /// the WAL region, and propagates lock, scan, encoding, and I/O failures.
    pub fn append_wal_payloads(
        path: impl AsRef<Path>,
        payloads: &[Vec<u8>],
    ) -> RdbFileResult<EmbeddedRdbOpen> {
        let path = path.as_ref();
        if payloads.is_empty() {
            return Self::open(path);
        }

        // In-process serialization per path; a poisoned mutex is still used.
        let path_lock = embedded_path_lock(path);
        let _path_guard = path_lock
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // Exclusive file lock (fs2) held on a separate handle for the duration
        // of the append.
        let lock_file = OpenOptions::new().read(true).write(true).open(path)?;
        lock_file.lock_exclusive()?;

        let open = Self::open_for_wal_append(path)?;
        let wal_scan = scan_wal(&open)?;
        // Continue the sequence / CRC chain from the last decodable frame.
        let mut sequence = wal_scan.next_sequence;
        let mut previous_frame_crc = wal_scan.previous_frame_crc;
        let mut encoded = Vec::new();
        for payload in payloads {
            let (frame, frame_crc) = encode_wal_frame(sequence, previous_frame_crc, payload)?;
            encoded.extend_from_slice(&frame);
            previous_frame_crc = frame_crc;
            sequence = sequence.saturating_add(1);
        }

        let wal_start = open.manifest.wal_region_offset;
        // Append position: right after the bytes covered by decodable frames.
        let wal_end = wal_start.checked_add(wal_scan.valid_bytes).ok_or_else(|| {
            RdbFileError::InvalidOperation("embedded wal boundary overflow".into())
        })?;
        let max_end = open
            .manifest
            .wal_region_offset
            .saturating_add(open.manifest.wal_region_bytes);
        let next_boundary = wal_end.checked_add(encoded.len() as u64).ok_or_else(|| {
            RdbFileError::InvalidOperation("embedded wal boundary overflow".into())
        })?;
        // `wal_end < wal_start` cannot hold after the checked addition above;
        // the effective condition is the capacity check.
        if wal_end < wal_start || next_boundary > max_end {
            return Err(RdbFileError::InvalidOperation(
                "embedded wal region full".into(),
            ));
        }

        let mut file = OpenOptions::new().read(true).write(true).open(path)?;
        write_at(&mut file, wal_end, &encoded)?;
        crash_inject("wal_after_frame_write");
        // Frames are synced before the superblock that makes them visible is written.
        file.sync_data()?;
        crash_inject("wal_after_frame_sync");

        // Write into the slot that is NOT currently selected.
        let next_copy_index = if open.selected_superblock.copy_index == 0 {
            1
        } else {
            0
        };
        let next_superblock = EmbeddedRdbSuperblock {
            copy_index: next_copy_index,
            generation: open.selected_superblock.generation.saturating_add(1),
            wal_recovery_boundary: next_boundary,
            checksum: 0,
            ..open.selected_superblock
        };
        Self::write_superblock_copy(&mut file, &next_superblock)?;
        crash_inject("wal_after_superblock_write");
        file.sync_all()?;
        lock_file.unlock()?;
        Self::open(path)
    }
440
441    pub fn write_superblock_copy(
442        file: &mut File,
443        superblock: &EmbeddedRdbSuperblock,
444    ) -> RdbFileResult<()> {
445        let offset = superblock_offset(superblock.copy_index)?;
446        write_at(file, offset, &encode_superblock(*superblock)?)?;
447        Ok(())
448    }
449}
450
451fn read_superblock_copy(file: &mut File, copy_index: u8) -> Option<EmbeddedRdbSuperblock> {
452    let offset = superblock_offset(copy_index).ok()?;
453    let mut bytes = vec![0u8; EMBEDDED_RDB_SUPERBLOCK_SIZE as usize];
454    file.seek(SeekFrom::Start(offset)).ok()?;
455    file.read_exact(&mut bytes).ok()?;
456    decode_superblock(copy_index, &bytes).ok()
457}
458
459fn read_manifest(
460    file: &mut File,
461    superblock: EmbeddedRdbSuperblock,
462) -> RdbFileResult<EmbeddedRdbManifest> {
463    if superblock.manifest_len < CHECKSUM_LEN as u64
464        || superblock.manifest_len > MANIFEST_REGION_BYTES
465    {
466        return Err(RdbFileError::InvalidOperation(format!(
467            "invalid embedded manifest length {}",
468            superblock.manifest_len
469        )));
470    }
471
472    let mut bytes = vec![0u8; superblock.manifest_len as usize];
473    file.seek(SeekFrom::Start(superblock.manifest_offset))?;
474    file.read_exact(&mut bytes)?;
475    let checksum = trailer_checksum(&bytes);
476    if checksum != superblock.manifest_checksum {
477        return Err(RdbFileError::InvalidOperation(format!(
478            "embedded manifest checksum mismatch: stored {:#010x}, computed {:#010x}",
479            superblock.manifest_checksum, checksum
480        )));
481    }
482    decode_manifest(&bytes)
483}
484
485fn snapshot_reference_valid(
486    file: &mut File,
487    manifest: &EmbeddedRdbManifest,
488) -> RdbFileResult<bool> {
489    if manifest.snapshot_bytes == 0 {
490        return Ok(true);
491    }
492    let snapshot_end = manifest
493        .snapshot_offset
494        .checked_add(manifest.snapshot_bytes)
495        .ok_or_else(|| RdbFileError::InvalidOperation("embedded snapshot end overflow".into()))?;
496    if snapshot_end > file.metadata()?.len() {
497        return Ok(false);
498    }
499
500    let mut bytes = vec![0u8; manifest.snapshot_bytes as usize];
501    file.seek(SeekFrom::Start(manifest.snapshot_offset))?;
502    if file.read_exact(&mut bytes).is_err() {
503        return Ok(false);
504    }
505    if crc32(&bytes) != manifest.snapshot_checksum {
506        return Ok(false);
507    }
508    if bytes.len() >= SNAPSHOT_MAGIC.len() && &bytes[..SNAPSHOT_MAGIC.len()] != SNAPSHOT_MAGIC {
509        return Ok(false);
510    }
511    Ok(true)
512}
513
514fn manifest_from_superblock(superblock: EmbeddedRdbSuperblock) -> EmbeddedRdbManifest {
515    EmbeddedRdbManifest {
516        version: MANIFEST_VERSION,
517        wal_region_offset: superblock.wal_region_offset,
518        wal_region_bytes: superblock.wal_region_bytes,
519        wal_recovery_boundary: superblock.wal_recovery_boundary,
520        snapshot_offset: superblock.snapshot_offset,
521        snapshot_bytes: superblock.snapshot_bytes,
522        snapshot_checksum: superblock.snapshot_checksum,
523        created_at_unix_ms: 0,
524        checksum: 0,
525    }
526}
527
528fn grow_wal_region_bytes(current: u64, min_required: u64) -> RdbFileResult<u64> {
529    let mut next = current.max(WAL_REGION_BYTES);
530    while next < min_required {
531        next = next.checked_mul(2).ok_or_else(|| {
532            RdbFileError::InvalidOperation("embedded wal region size overflow".into())
533        })?;
534    }
535    Ok(next)
536}
537
/// Returns the process-wide mutex associated with `path`.
///
/// Locks are keyed by the canonicalized path, falling back to the path as
/// given when canonicalization fails. Entries are created on first use and
/// kept for the life of the process; a poisoned registry mutex is still used.
fn embedded_path_lock(path: &Path) -> Arc<Mutex<()>> {
    static LOCKS: OnceLock<Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>> = OnceLock::new();

    let key = match path.canonicalize() {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    };
    let registry = LOCKS.get_or_init(Mutex::default);
    let mut entries = match registry.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    let slot = entries.entry(key).or_default();
    Arc::clone(&*slot)
}
550
551fn next_snapshot_offset(
552    path: &Path,
553    open: &EmbeddedRdbOpen,
554    wal_region_bytes: u64,
555    snapshot: &[u8],
556) -> RdbFileResult<u64> {
557    let base = open
558        .manifest
559        .wal_region_offset
560        .checked_add(wal_region_bytes)
561        .ok_or_else(|| {
562            RdbFileError::InvalidOperation("embedded snapshot offset overflow".into())
563        })?;
564    if open.manifest.snapshot_bytes == 0 && snapshot.is_empty() {
565        return Ok(base);
566    }
567
568    let file_len = std::fs::metadata(path).map(|metadata| metadata.len())?;
569    let active_snapshot_end = open
570        .manifest
571        .snapshot_offset
572        .checked_add(open.manifest.snapshot_bytes)
573        .ok_or_else(|| RdbFileError::InvalidOperation("embedded snapshot end overflow".into()))?;
574    align_up(
575        file_len.max(active_snapshot_end).max(base),
576        SNAPSHOT_ALIGNMENT,
577    )
578}
579
580fn align_up(value: u64, alignment: u64) -> RdbFileResult<u64> {
581    if alignment == 0 {
582        return Ok(value);
583    }
584    let remainder = value % alignment;
585    if remainder == 0 {
586        return Ok(value);
587    }
588    value
589        .checked_add(alignment - remainder)
590        .ok_or_else(|| RdbFileError::InvalidOperation("embedded alignment overflow".into()))
591}
592
593fn scan_wal(open: &EmbeddedRdbOpen) -> RdbFileResult<WalScan> {
594    let wal_start = open.manifest.wal_region_offset;
595    let wal_end = open.manifest.wal_recovery_boundary;
596    let max_end = open
597        .manifest
598        .wal_region_offset
599        .saturating_add(open.manifest.wal_region_bytes);
600    if wal_end < wal_start || wal_end > max_end {
601        return Err(RdbFileError::InvalidOperation(format!(
602            "invalid embedded wal boundary {wal_end}"
603        )));
604    }
605    if wal_end == wal_start {
606        return Ok(WalScan {
607            next_sequence: 1,
608            ..WalScan::default()
609        });
610    }
611
612    let mut file = File::open(&open.path)?;
613    let file_len = file.metadata()?.len();
614    if file_len <= wal_start {
615        return Ok(WalScan {
616            next_sequence: 1,
617            ..WalScan::default()
618        });
619    }
620    let read_end = wal_end.min(file_len);
621    file.seek(SeekFrom::Start(wal_start))?;
622    let mut bytes = vec![0u8; (read_end - wal_start) as usize];
623    file.read_exact(&mut bytes)?;
624    Ok(scan_wal_bytes(&bytes))
625}
626
627fn scan_wal_bytes(bytes: &[u8]) -> WalScan {
628    let mut scan = WalScan {
629        next_sequence: 1,
630        ..WalScan::default()
631    };
632    let mut cursor = 0usize;
633    while cursor < bytes.len() {
634        let Some(frame) = decode_next_wal_frame(bytes, cursor, &scan) else {
635            break;
636        };
637        scan.payloads.push(frame.payload);
638        scan.next_sequence = scan.next_sequence.saturating_add(1);
639        scan.previous_frame_crc = frame.frame_crc;
640        cursor = frame.end;
641        scan.valid_bytes = cursor as u64;
642    }
643    scan
644}
645
/// One successfully decoded WAL frame.
struct DecodedWalFrame {
    /// Copy of the frame's payload bytes.
    payload: Vec<u8>,
    /// CRC-32 over the whole frame (header and payload).
    frame_crc: u32,
    /// Index, within the scanned buffer, of the first byte after the frame.
    end: usize,
}
651
652fn decode_next_wal_frame(bytes: &[u8], start: usize, scan: &WalScan) -> Option<DecodedWalFrame> {
653    let remaining = bytes.len().checked_sub(start)?;
654    if remaining < WAL_FRAME_MAGIC.len() {
655        return None;
656    }
657    if &bytes[start..start + WAL_FRAME_MAGIC.len()] != WAL_FRAME_MAGIC {
658        return None;
659    }
660    if remaining < WAL_FRAME_MAGIC.len() + 2 {
661        return None;
662    }
663    let version_offset = start + WAL_FRAME_MAGIC.len();
664    let version = u16::from_le_bytes(bytes[version_offset..version_offset + 2].try_into().ok()?);
665    if version == WAL_FRAME_VERSION {
666        decode_v2_wal_frame(bytes, start, scan)
667    } else {
668        decode_legacy_wal_frame(bytes, start)
669    }
670}
671
672fn decode_v2_wal_frame(bytes: &[u8], start: usize, scan: &WalScan) -> Option<DecodedWalFrame> {
673    if bytes.len().checked_sub(start)? < WAL_FRAME_HEADER_BYTES {
674        return None;
675    }
676    let header_len_offset = start + 10;
677    let header_len = u16::from_le_bytes(
678        bytes[header_len_offset..header_len_offset + 2]
679            .try_into()
680            .ok()?,
681    ) as usize;
682    if header_len != WAL_FRAME_HEADER_BYTES {
683        return None;
684    }
685    let sequence_offset = start + 12;
686    let sequence = u64::from_le_bytes(
687        bytes[sequence_offset..sequence_offset + 8]
688            .try_into()
689            .ok()?,
690    );
691    if sequence != scan.next_sequence {
692        return None;
693    }
694    let payload_len_offset = start + 20;
695    let payload_len = u32::from_le_bytes(
696        bytes[payload_len_offset..payload_len_offset + 4]
697            .try_into()
698            .ok()?,
699    ) as usize;
700    let payload_crc_offset = start + 24;
701    let payload_crc = u32::from_le_bytes(
702        bytes[payload_crc_offset..payload_crc_offset + 4]
703            .try_into()
704            .ok()?,
705    );
706    let previous_frame_crc_offset = start + 28;
707    let previous_frame_crc = u32::from_le_bytes(
708        bytes[previous_frame_crc_offset..previous_frame_crc_offset + 4]
709            .try_into()
710            .ok()?,
711    );
712    if previous_frame_crc != scan.previous_frame_crc {
713        return None;
714    }
715    let header_crc_offset = start + 32;
716    let header_crc = u32::from_le_bytes(
717        bytes[header_crc_offset..header_crc_offset + 4]
718            .try_into()
719            .ok()?,
720    );
721    if header_crc != crc32(&bytes[start..header_crc_offset]) {
722        return None;
723    }
724    let payload_start = start.checked_add(header_len)?;
725    let end = payload_start.checked_add(payload_len)?;
726    if end > bytes.len() {
727        return None;
728    }
729    let payload = bytes[payload_start..end].to_vec();
730    if crc32(&payload) != payload_crc {
731        return None;
732    }
733    Some(DecodedWalFrame {
734        payload,
735        frame_crc: crc32(&bytes[start..end]),
736        end,
737    })
738}
739
740fn decode_legacy_wal_frame(bytes: &[u8], start: usize) -> Option<DecodedWalFrame> {
741    if bytes.len().checked_sub(start)? < LEGACY_WAL_FRAME_HEADER_BYTES {
742        return None;
743    }
744    let payload_len_offset = start + WAL_FRAME_MAGIC.len();
745    let payload_len = u32::from_le_bytes(
746        bytes[payload_len_offset..payload_len_offset + 4]
747            .try_into()
748            .ok()?,
749    ) as usize;
750    let payload_crc_offset = payload_len_offset + 4;
751    let payload_crc = u32::from_le_bytes(
752        bytes[payload_crc_offset..payload_crc_offset + 4]
753            .try_into()
754            .ok()?,
755    );
756    let payload_start = start.checked_add(LEGACY_WAL_FRAME_HEADER_BYTES)?;
757    let end = payload_start.checked_add(payload_len)?;
758    if end > bytes.len() {
759        return None;
760    }
761    let payload = bytes[payload_start..end].to_vec();
762    if crc32(&payload) != payload_crc {
763        return None;
764    }
765    Some(DecodedWalFrame {
766        payload,
767        frame_crc: crc32(&bytes[start..end]),
768        end,
769    })
770}
771
772fn encode_wal_frame(
773    sequence: u64,
774    previous_frame_crc: u32,
775    payload: &[u8],
776) -> RdbFileResult<(Vec<u8>, u32)> {
777    let payload_len = u32::try_from(payload.len())
778        .map_err(|_| RdbFileError::InvalidOperation("embedded wal payload too large".into()))?;
779    let mut frame = Vec::with_capacity(WAL_FRAME_HEADER_BYTES + payload.len());
780    frame.extend_from_slice(WAL_FRAME_MAGIC);
781    frame.extend_from_slice(&WAL_FRAME_VERSION.to_le_bytes());
782    frame.extend_from_slice(&(WAL_FRAME_HEADER_BYTES as u16).to_le_bytes());
783    frame.extend_from_slice(&sequence.to_le_bytes());
784    frame.extend_from_slice(&payload_len.to_le_bytes());
785    frame.extend_from_slice(&crc32(payload).to_le_bytes());
786    frame.extend_from_slice(&previous_frame_crc.to_le_bytes());
787    let header_crc = crc32(&frame);
788    frame.extend_from_slice(&header_crc.to_le_bytes());
789    frame.extend_from_slice(payload);
790    let frame_crc = crc32(&frame);
791    Ok((frame, frame_crc))
792}
793
/// Encodes `superblock` into a buffer of exactly `EMBEDDED_RDB_SUPERBLOCK_SIZE`
/// bytes.
///
/// Fields are written from the start of the buffer in the order below (which
/// `decode_superblock` reads back in the same order), the remainder stays
/// zero, and the final `CHECKSUM_LEN` bytes hold the little-endian CRC-32 of
/// everything before them. The `checksum` field of the input is not written.
///
/// No path in this function returns `Err`; the `Result` return type is what
/// callers use with `?`.
///
/// NOTE(review): the `put_*` helpers are defined elsewhere in this file;
/// presumably each writes its value at `cursor` and advances it — confirm there.
fn encode_superblock(superblock: EmbeddedRdbSuperblock) -> RdbFileResult<Vec<u8>> {
    let mut bytes = vec![0u8; EMBEDDED_RDB_SUPERBLOCK_SIZE as usize];
    let mut cursor = 0usize;
    put_bytes(&mut bytes, &mut cursor, SUPERBLOCK_MAGIC);
    put_u32(&mut bytes, &mut cursor, SUPERBLOCK_VERSION);
    put_u8(&mut bytes, &mut cursor, superblock.copy_index);
    put_u64(&mut bytes, &mut cursor, superblock.generation);
    put_u32(&mut bytes, &mut cursor, superblock.format_version);
    put_u64(&mut bytes, &mut cursor, superblock.manifest_offset);
    put_u64(&mut bytes, &mut cursor, superblock.manifest_len);
    put_u32(&mut bytes, &mut cursor, superblock.manifest_checksum);
    put_u64(&mut bytes, &mut cursor, superblock.wal_region_offset);
    put_u64(&mut bytes, &mut cursor, superblock.wal_region_bytes);
    put_u64(&mut bytes, &mut cursor, superblock.wal_recovery_boundary);
    put_u64(&mut bytes, &mut cursor, superblock.snapshot_offset);
    put_u64(&mut bytes, &mut cursor, superblock.snapshot_bytes);
    put_u32(&mut bytes, &mut cursor, superblock.snapshot_checksum);

    // Trailer: CRC-32 over the whole block except the trailer itself.
    let checksum_offset = bytes.len() - CHECKSUM_LEN;
    let checksum = crc32(&bytes[..checksum_offset]);
    bytes[checksum_offset..].copy_from_slice(&checksum.to_le_bytes());
    Ok(bytes)
}
817
/// Decodes one superblock copy read from slot `copy_index`.
///
/// Validation happens in this order: buffer size, trailer CRC-32, magic,
/// version, and finally that the stored copy index equals `copy_index`.
///
/// # Errors
///
/// Returns `RdbFileError::InvalidOperation` naming the first check that failed,
/// or whatever error the `take_*` helpers report.
fn decode_superblock(copy_index: u8, bytes: &[u8]) -> RdbFileResult<EmbeddedRdbSuperblock> {
    if bytes.len() != EMBEDDED_RDB_SUPERBLOCK_SIZE as usize {
        return Err(RdbFileError::InvalidOperation(
            "invalid embedded superblock size".into(),
        ));
    }
    // The trailer is verified before any field is interpreted.
    let checksum_offset = bytes.len() - CHECKSUM_LEN;
    // The slice is exactly CHECKSUM_LEN (4) bytes long, so the conversion cannot fail.
    let stored_checksum = u32::from_le_bytes(bytes[checksum_offset..].try_into().unwrap());
    let computed_checksum = crc32(&bytes[..checksum_offset]);
    if stored_checksum != computed_checksum {
        return Err(RdbFileError::InvalidOperation(
            "embedded superblock checksum mismatch".into(),
        ));
    }

    let mut cursor = 0usize;
    if take_bytes(bytes, &mut cursor, SUPERBLOCK_MAGIC.len())? != SUPERBLOCK_MAGIC {
        return Err(RdbFileError::InvalidOperation(
            "invalid embedded superblock magic".into(),
        ));
    }
    let version = take_u32(bytes, &mut cursor)?;
    if version != SUPERBLOCK_VERSION {
        return Err(RdbFileError::InvalidOperation(format!(
            "unsupported embedded superblock version {version}"
        )));
    }
    let stored_copy_index = take_u8(bytes, &mut cursor)?;
    if stored_copy_index != copy_index {
        return Err(RdbFileError::InvalidOperation(
            "embedded superblock copy index mismatch".into(),
        ));
    }

    // Field initializers run top to bottom, so this order must stay in step
    // with the write order in `encode_superblock`.
    Ok(EmbeddedRdbSuperblock {
        copy_index: stored_copy_index,
        generation: take_u64(bytes, &mut cursor)?,
        format_version: take_u32(bytes, &mut cursor)?,
        manifest_offset: take_u64(bytes, &mut cursor)?,
        manifest_len: take_u64(bytes, &mut cursor)?,
        manifest_checksum: take_u32(bytes, &mut cursor)?,
        wal_region_offset: take_u64(bytes, &mut cursor)?,
        wal_region_bytes: take_u64(bytes, &mut cursor)?,
        wal_recovery_boundary: take_u64(bytes, &mut cursor)?,
        snapshot_offset: take_u64(bytes, &mut cursor)?,
        snapshot_bytes: take_u64(bytes, &mut cursor)?,
        snapshot_checksum: take_u32(bytes, &mut cursor)?,
        checksum: stored_checksum,
    })
}
868
869fn encode_manifest(manifest: EmbeddedRdbManifest) -> Vec<u8> {
870    let mut bytes = vec![0u8; 8 + 4 + 8 + 8 + 8 + 8 + 8 + 4 + 16 + CHECKSUM_LEN];
871    let mut cursor = 0usize;
872    put_bytes(&mut bytes, &mut cursor, MANIFEST_MAGIC);
873    put_u32(&mut bytes, &mut cursor, manifest.version);
874    put_u64(&mut bytes, &mut cursor, manifest.wal_region_offset);
875    put_u64(&mut bytes, &mut cursor, manifest.wal_region_bytes);
876    put_u64(&mut bytes, &mut cursor, manifest.wal_recovery_boundary);
877    put_u64(&mut bytes, &mut cursor, manifest.snapshot_offset);
878    put_u64(&mut bytes, &mut cursor, manifest.snapshot_bytes);
879    put_u32(&mut bytes, &mut cursor, manifest.snapshot_checksum);
880    put_u128(&mut bytes, &mut cursor, manifest.created_at_unix_ms);
881
882    let checksum_offset = bytes.len() - CHECKSUM_LEN;
883    let checksum = crc32(&bytes[..checksum_offset]);
884    bytes[checksum_offset..].copy_from_slice(&checksum.to_le_bytes());
885    bytes
886}
887
888fn decode_manifest(bytes: &[u8]) -> RdbFileResult<EmbeddedRdbManifest> {
889    let checksum_offset = bytes
890        .len()
891        .checked_sub(CHECKSUM_LEN)
892        .ok_or_else(|| RdbFileError::InvalidOperation("embedded manifest too short".into()))?;
893    let stored_checksum = u32::from_le_bytes(bytes[checksum_offset..].try_into().unwrap());
894    let computed_checksum = crc32(&bytes[..checksum_offset]);
895    if stored_checksum != computed_checksum {
896        return Err(RdbFileError::InvalidOperation(
897            "embedded manifest checksum mismatch".into(),
898        ));
899    }
900
901    let mut cursor = 0usize;
902    if take_bytes(bytes, &mut cursor, MANIFEST_MAGIC.len())? != MANIFEST_MAGIC {
903        return Err(RdbFileError::InvalidOperation(
904            "invalid embedded manifest magic".into(),
905        ));
906    }
907    let version = take_u32(bytes, &mut cursor)?;
908    if version != MANIFEST_VERSION {
909        return Err(RdbFileError::InvalidOperation(format!(
910            "unsupported embedded manifest version {version}"
911        )));
912    }
913    Ok(EmbeddedRdbManifest {
914        version,
915        wal_region_offset: take_u64(bytes, &mut cursor)?,
916        wal_region_bytes: take_u64(bytes, &mut cursor)?,
917        wal_recovery_boundary: take_u64(bytes, &mut cursor)?,
918        snapshot_offset: take_u64(bytes, &mut cursor)?,
919        snapshot_bytes: take_u64(bytes, &mut cursor)?,
920        snapshot_checksum: take_u32(bytes, &mut cursor)?,
921        created_at_unix_ms: take_u128(bytes, &mut cursor)?,
922        checksum: stored_checksum,
923    })
924}
925
926fn trailer_checksum(bytes: &[u8]) -> u32 {
927    let checksum_offset = bytes.len() - CHECKSUM_LEN;
928    u32::from_le_bytes(bytes[checksum_offset..].try_into().unwrap())
929}
930
931fn superblock_offset(copy_index: u8) -> RdbFileResult<u64> {
932    match copy_index {
933        0 => Ok(EMBEDDED_RDB_SUPERBLOCK_0_OFFSET),
934        1 => Ok(EMBEDDED_RDB_SUPERBLOCK_1_OFFSET),
935        _ => Err(RdbFileError::InvalidOperation(format!(
936            "invalid embedded superblock copy index {copy_index}"
937        ))),
938    }
939}
940
941fn write_at(file: &mut File, offset: u64, bytes: &[u8]) -> RdbFileResult<()> {
942    file.seek(SeekFrom::Start(offset))?;
943    file.write_all(bytes)?;
944    Ok(())
945}
946
947fn crash_inject(point: &str) {
948    if std::env::var(CRASH_INJECT_ENV).ok().as_deref() == Some(point) {
949        std::process::exit(173);
950    }
951}
952
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn now_unix_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
959
/// Copies `value` into `target` starting at `*cursor`, then advances the
/// cursor past the bytes written.
///
/// # Panics
///
/// Panics when `target` has fewer than `value.len()` bytes left at `*cursor`.
fn put_bytes(target: &mut [u8], cursor: &mut usize, value: &[u8]) {
    let start = *cursor;
    let end = start + value.len();
    target[start..end].copy_from_slice(value);
    *cursor = end;
}
964
/// Stores a single byte at `*cursor` in `target` and advances the cursor by
/// one.
///
/// # Panics
///
/// Panics when `*cursor` is outside `target`.
fn put_u8(target: &mut [u8], cursor: &mut usize, value: u8) {
    let position = *cursor;
    target[position] = value;
    *cursor = position + 1;
}
969
970fn put_u32(target: &mut [u8], cursor: &mut usize, value: u32) {
971    put_bytes(target, cursor, &value.to_le_bytes());
972}
973
974fn put_u64(target: &mut [u8], cursor: &mut usize, value: u64) {
975    put_bytes(target, cursor, &value.to_le_bytes());
976}
977
978fn put_u128(target: &mut [u8], cursor: &mut usize, value: u128) {
979    put_bytes(target, cursor, &value.to_le_bytes());
980}
981
982fn take_bytes<'a>(bytes: &'a [u8], cursor: &mut usize, len: usize) -> RdbFileResult<&'a [u8]> {
983    let end = cursor.checked_add(len).ok_or_else(|| {
984        RdbFileError::InvalidOperation("embedded artifact cursor overflow".into())
985    })?;
986    if end > bytes.len() {
987        return Err(RdbFileError::InvalidOperation(
988            "embedded artifact truncated".into(),
989        ));
990    }
991    let value = &bytes[*cursor..end];
992    *cursor = end;
993    Ok(value)
994}
995
996fn take_u8(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u8> {
997    Ok(take_bytes(bytes, cursor, 1)?[0])
998}
999
1000fn take_u32(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u32> {
1001    Ok(u32::from_le_bytes(
1002        take_bytes(bytes, cursor, 4)?.try_into().unwrap(),
1003    ))
1004}
1005
1006fn take_u64(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u64> {
1007    Ok(u64::from_le_bytes(
1008        take_bytes(bytes, cursor, 8)?.try_into().unwrap(),
1009    ))
1010}
1011
1012fn take_u128(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u128> {
1013    Ok(u128::from_le_bytes(
1014        take_bytes(bytes, cursor, 16)?.try_into().unwrap(),
1015    ))
1016}