Skip to main content

fsqlite_wal/
checkpoint_executor.rs

1//! WAL checkpoint execution engine.
2//!
3//! Bridges the deterministic checkpoint planner ([`plan_checkpoint`]) with
4//! WAL file I/O ([`WalFile`]) to backfill frames into the database.
5//!
6//! The split is intentional:
7//! - `checkpoint.rs` is pure, deterministic planning (no I/O).
8//! - This module performs the actual reads from `WalFile` and writes
9//!   through [`CheckpointTarget`].
10//!
11//! [`CheckpointTarget`] mirrors `CheckpointPageWriter` from `fsqlite-pager`
12//! but is defined here to avoid a circular crate dependency.  Higher layers
13//! (`fsqlite-core`) provide an adapter bridging the two at runtime.
14
15use fsqlite_error::{FrankenError, Result};
16use fsqlite_types::PageNumber;
17use fsqlite_types::cx::Cx;
18use fsqlite_vfs::VfsFile;
19use tracing::{debug, info};
20
21use crate::checkpoint::{
22    CheckpointMode, CheckpointPlan, CheckpointPostAction, CheckpointProgress, CheckpointState,
23    plan_checkpoint,
24};
25use crate::checksum::{WAL_FRAME_HEADER_SIZE, WalSalts};
26use crate::recovery_fence::{CheckpointChecksumVerdict, ExpectedPageChecksum};
27use crate::wal::WalFile;
28
29// ---------------------------------------------------------------------------
30// CheckpointTarget trait
31// ---------------------------------------------------------------------------
32
/// Write-back interface for checkpoint page transfers.
///
/// Implementors push WAL frame content into the main database file.
/// This trait is intentionally **not** sealed so that `fsqlite-core` can
/// provide the concrete adapter at runtime.
///
/// The checkpoint executor in this module calls `write_page` once per
/// distinct page, then `sync_db`, and — when the checkpoint completes and a
/// commit frame reported a database size — `truncate_db` followed by another
/// `sync_db`.
pub trait CheckpointTarget {
    /// Write `data` for `page_no` directly to the database file.
    ///
    /// `data` is the page payload of a WAL frame (the frame with its header
    /// stripped).
    fn write_page(&mut self, cx: &Cx, page_no: PageNumber, data: &[u8]) -> Result<()>;

    /// Truncate the database file to exactly `n_pages` pages.
    fn truncate_db(&mut self, cx: &Cx, n_pages: u32) -> Result<()>;

    /// Sync the database file to stable storage.
    fn sync_db(&mut self, cx: &Cx) -> Result<()>;

    /// Read back a page's current on-disk content, if the target supports
    /// it.  Used by the post-checkpoint checksum verification path
    /// (bd-yfdb6) to confirm that the DB file matches the expected state
    /// before the WAL is truncated.
    ///
    /// Returns `Ok(Some(n))` with the number of bytes placed in `_buf`; the
    /// verification path treats `n` smaller than the page size as a
    /// mismatch.
    ///
    /// The default implementation returns `None`, which skips verification.
    /// Concrete `CheckpointTarget`s that back a real VFS file should
    /// override this with the equivalent of `vfs.read(db_fd, buf, offset)`.
    fn read_page_if_supported(
        &self,
        _cx: &Cx,
        _page_no: PageNumber,
        _buf: &mut [u8],
    ) -> Result<Option<usize>> {
        Ok(None)
    }
}
65
66// ---------------------------------------------------------------------------
67// Execution result
68// ---------------------------------------------------------------------------
69
/// Summary of a completed checkpoint execution.
///
/// Returned by `execute_checkpoint` on success only; a failed checkpoint
/// yields an error instead.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CheckpointExecutionResult {
    /// The plan that was executed.
    pub plan: CheckpointPlan,
    /// Number of frames actually backfilled to the database.
    ///
    /// This counts every frame scanned in the checkpoint range, including
    /// frames superseded by a later frame for the same page, so it can
    /// exceed the number of page writes issued to the target.
    pub frames_backfilled: u32,
    /// Database size in pages reported by the last commit frame, if any.
    pub db_size_pages: Option<u32>,
    /// Whether the WAL was reset after backfill.
    pub wal_was_reset: bool,
}
82
83// ---------------------------------------------------------------------------
84// Execution entry point
85// ---------------------------------------------------------------------------
86
87/// Execute a WAL checkpoint.
88///
89/// 1. Computes a [`CheckpointPlan`] from `mode` and `state`.
90/// 2. Reads `frames_to_backfill` frames from `wal` starting at
91///    `state.backfilled_frames`.
92/// 3. Writes each frame's page data through `target`.
93/// 4. Syncs the database.
94/// 5. Optionally resets / truncates the WAL per the plan's post-action.
95///
96/// # Errors
97///
98/// Propagates any I/O error from `WalFile`, `CheckpointTarget`, or VFS.
99#[allow(clippy::too_many_lines)]
100pub fn execute_checkpoint<F: VfsFile>(
101    cx: &Cx,
102    wal: &mut WalFile<F>,
103    mode: CheckpointMode,
104    state: CheckpointState,
105    target: &mut impl CheckpointTarget,
106) -> Result<CheckpointExecutionResult> {
107    let checkpoint_start = fsqlite_types::sync_primitives::Instant::now();
108    let plan = plan_checkpoint(mode, state);
109    let normalized = state.normalized();
110
111    info!(
112        mode = ?plan.mode,
113        frames_to_backfill = plan.frames_to_backfill,
114        progress = ?plan.progress,
115        blocked_by_readers = plan.blocked_by_readers,
116        post_action = ?plan.post_action,
117        "checkpoint plan computed"
118    );
119
120    let mut frames_backfilled: u32 = 0;
121    let mut last_db_size: Option<u32> = None;
122    // bd-yfdb6: accumulate expected post-checkpoint page checksums so the
123    // truncate path can verify the DB state before discarding WAL frames.
124    let mut expected_checksums: Vec<ExpectedPageChecksum> = Vec::new();
125    if plan.frames_to_backfill > 0 {
126        // Backfill frames [backfilled_frames .. backfilled_frames + frames_to_backfill).
127        let start = usize::try_from(normalized.backfilled_frames).unwrap_or(usize::MAX);
128        let count = usize::try_from(plan.frames_to_backfill).unwrap_or(usize::MAX);
129        let end = start.saturating_add(count).min(wal.frame_count());
130
131        let mut latest_frames: std::collections::HashMap<PageNumber, usize> =
132            std::collections::HashMap::new();
133
134        // Pass 1: Find the latest frame index for each page in the checkpoint range.
135        for frame_idx in start..end {
136            let header = wal.read_frame_header(cx, frame_idx)?;
137
138            let page_no =
139                PageNumber::new(header.page_number).ok_or_else(|| FrankenError::OutOfRange {
140                    what: "checkpoint frame page number".to_owned(),
141                    value: header.page_number.to_string(),
142                })?;
143
144            latest_frames.insert(page_no, frame_idx);
145            frames_backfilled += 1;
146
147            if header.is_commit() && header.db_size > 0 {
148                last_db_size = Some(header.db_size);
149            }
150        }
151
152        // Pass 2: Write deduplicated pages in sorted order to minimize disk seeks.
153        let mut sorted_pages: Vec<(PageNumber, usize)> = latest_frames.into_iter().collect();
154        sorted_pages.sort_unstable_by_key(|(p, _)| p.get());
155
156        let mut frame_buf = vec![0u8; wal.frame_size()];
157        for (fault_page_idx, (page_no, frame_idx)) in sorted_pages.iter().enumerate() {
158            #[cfg(not(any(test, feature = "fault-injection")))]
159            let _ = fault_page_idx;
160            #[cfg(any(test, feature = "fault-injection"))]
161            {
162                if fault_page_idx > 0 {
163                    crate::fault_hooks::maybe_inject_crash_at(
164                        crate::fault_hooks::CrashBoundary::MidCheckpoint,
165                        &format!("page_idx={fault_page_idx} page_no={}", page_no.get()),
166                    )?;
167                }
168            }
169
170            wal.read_frame_into(cx, *frame_idx, &mut frame_buf)?;
171            let page_data = &frame_buf[WAL_FRAME_HEADER_SIZE..];
172            target.write_page(cx, *page_no, page_data)?;
173
174            // bd-yfdb6: capture the expected post-checkpoint checksum so the
175            // truncate path can verify disk state before discarding WAL
176            // frames. Page checksums live in the trailer; if the trailer
177            // length is too small (tests with tiny pages), we skip the
178            // capture silently — this is additive insurance, not a hard
179            // invariant.
180            if page_data.len() >= crate::checksum::PAGE_CHECKSUM_RESERVED_BYTES {
181                if let Ok(checksum) = crate::checksum::read_page_checksum(page_data) {
182                    expected_checksums.push(ExpectedPageChecksum {
183                        page: *page_no,
184                        checksum,
185                    });
186                }
187            }
188
189            debug!(
190                frame_idx = *frame_idx,
191                page_number = page_no.get(),
192                "checkpoint: page backfilled"
193            );
194        }
195
196        // Sync database after all frame writes.
197        target.sync_db(cx)?;
198
199        // If the checkpoint completed fully, truncate the database to the last
200        // committed size.
201        if matches!(plan.progress, CheckpointProgress::Complete) {
202            if let Some(db_size) = last_db_size {
203                target.truncate_db(cx, db_size)?;
204                target.sync_db(cx)?;
205            }
206        }
207    }
208
209    // Post-action: reset or truncate WAL. Passes `target` so the
210    // truncate path can issue an explicit fsync(db, FULL) before the
211    // WAL is truncated, and the expected-checksums prefix so the same
212    // path can verify on-disk DB state matches the post-checkpoint
213    // state before truncating (both bd-yfdb6).
214    let wal_was_reset =
215        apply_checkpoint_post_action(cx, wal, plan.post_action, target, &expected_checksums)?;
216
217    let checkpoint_duration_us = crate::metrics::duration_us_saturating(checkpoint_start.elapsed());
218
219    info!(
220        frames_backfilled,
221        wal_was_reset,
222        db_size_pages = ?last_db_size,
223        checkpoint_duration_us,
224        "checkpoint execution complete"
225    );
226
227    crate::metrics::GLOBAL_WAL_METRICS
228        .record_checkpoint(u64::from(frames_backfilled), checkpoint_duration_us);
229
230    #[cfg(any(test, feature = "fault-injection"))]
231    crate::fault_hooks::maybe_inject_crash_at(
232        crate::fault_hooks::CrashBoundary::AfterCheckpoint,
233        &format!("frames_backfilled={frames_backfilled} wal_was_reset={wal_was_reset}"),
234    )?;
235
236    Ok(CheckpointExecutionResult {
237        plan,
238        frames_backfilled,
239        db_size_pages: last_db_size,
240        wal_was_reset,
241    })
242}
243
244fn apply_checkpoint_post_action<F: VfsFile>(
245    cx: &Cx,
246    wal: &mut WalFile<F>,
247    post_action: CheckpointPostAction,
248    target: &mut impl CheckpointTarget,
249    expected_checksums: &[ExpectedPageChecksum],
250) -> Result<bool> {
251    match post_action {
252        CheckpointPostAction::ResetWal | CheckpointPostAction::TruncateWal => {
253            let new_seq = wal.header().checkpoint_seq.wrapping_add(1);
254            let new_salts = WalSalts {
255                salt1: wal.header().salts.salt1.wrapping_add(1),
256                salt2: wal.header().salts.salt2.wrapping_add(1),
257            };
258            let truncate = matches!(post_action, CheckpointPostAction::TruncateWal);
259            if truncate {
260                // bd-yfdb6: enforce fsync(db, FULL) before any WAL
261                // truncate. The earlier backfill loop issues `sync_db`
262                // already, but we re-issue an explicit full sync here to
263                // make the ordering invariant visible at the truncate
264                // call-site and defensive against future changes to the
265                // backfill path. A failure here MUST prevent the truncate;
266                // `?` accomplishes that.
267                crate::recovery_fence::ensure_db_fsync_before_wal_truncate(cx, target)?;
268
269                // bd-yfdb6: if the target supports read-back, verify that
270                // every page we just checkpointed still matches its
271                // expected post-checkpoint checksum on disk. On mismatch,
272                // refuse the truncate — the WAL must stay intact so a retry
273                // can complete the backfill.
274                if !expected_checksums.is_empty() {
275                    let verdict = verify_checkpoint_checksums_via_target(
276                        cx,
277                        target,
278                        wal.page_size(),
279                        expected_checksums,
280                    )?;
281                    if let CheckpointChecksumVerdict::Mismatch { first_bad_page } = verdict {
282                        tracing::error!(
283                            target: "fsqlite.wal.recovery_fence",
284                            first_bad_page = first_bad_page.get(),
285                            "UNRECOVERABLE: post-checkpoint DB/WAL disagreed; refusing truncate"
286                        );
287                        return Err(FrankenError::DatabaseCorrupt {
288                            detail: format!(
289                                "post-checkpoint DB/WAL state disagreed at page {}; WAL truncate refused \
290                                 to preserve committed frames (bd-yfdb6)",
291                                first_bad_page.get()
292                            ),
293                        });
294                    }
295                }
296            }
297            wal.reset(cx, new_seq, new_salts, truncate)?;
298            info!(
299                new_checkpoint_seq = new_seq,
300                action = ?post_action,
301                truncate,
302                "WAL reset after checkpoint"
303            );
304            Ok(true)
305        }
306        CheckpointPostAction::None => Ok(false),
307    }
308}
309
310/// Walk each expected page through the target's optional read-back hook
311/// and compare on-disk trailers. When the target opts out
312/// (`read_page_if_supported` returns `None`), verification is silently
313/// skipped — which matches the audit-requested behaviour of "additive
314/// insurance, not a hard invariant" for targets without a read path.
315fn verify_checkpoint_checksums_via_target(
316    cx: &Cx,
317    target: &impl CheckpointTarget,
318    page_size: usize,
319    expected: &[ExpectedPageChecksum],
320) -> Result<CheckpointChecksumVerdict> {
321    let mut buf = vec![0u8; page_size];
322    for exp in expected {
323        let maybe_read = target.read_page_if_supported(cx, exp.page, &mut buf)?;
324        let Some(n) = maybe_read else {
325            // Target does not support read-back; abort further checks.
326            return Ok(CheckpointChecksumVerdict::Match);
327        };
328        if n < page_size {
329            return Ok(CheckpointChecksumVerdict::Mismatch {
330                first_bad_page: exp.page,
331            });
332        }
333        let observed = crate::checksum::read_page_checksum(&buf)?;
334        if observed != exp.checksum {
335            return Ok(CheckpointChecksumVerdict::Mismatch {
336                first_bad_page: exp.page,
337            });
338        }
339    }
340    Ok(CheckpointChecksumVerdict::Match)
341}
342
343// ===========================================================================
344// Tests
345// ===========================================================================
346
347#[cfg(test)]
348mod tests {
349    use fsqlite_types::flags::VfsOpenFlags;
350    use fsqlite_vfs::MemoryVfs;
351    use fsqlite_vfs::traits::Vfs;
352
353    use super::*;
354
355    const PAGE_SIZE: u32 = 4096;
356
357    fn test_cx() -> Cx {
358        Cx::default()
359    }
360
361    fn test_salts() -> WalSalts {
362        WalSalts {
363            salt1: 0xDEAD_BEEF,
364            salt2: 0xCAFE_BABE,
365        }
366    }
367
368    fn sample_page(seed: u8) -> Vec<u8> {
369        let page_size = usize::try_from(PAGE_SIZE).expect("page size fits usize");
370        let mut page = vec![0u8; page_size];
371        for (i, byte) in page.iter_mut().enumerate() {
372            let reduced = u8::try_from(i % 251).expect("modulo fits u8");
373            *byte = reduced ^ seed;
374        }
375        page
376    }
377
378    fn open_wal_file(vfs: &MemoryVfs, cx: &Cx) -> <MemoryVfs as Vfs>::File {
379        let flags = VfsOpenFlags::READWRITE | VfsOpenFlags::CREATE | VfsOpenFlags::WAL;
380        let (file, _) = vfs
381            .open(cx, Some(std::path::Path::new("test.db-wal")), flags)
382            .expect("open WAL file");
383        file
384    }
385
    /// Test target that records written pages.
    ///
    /// It does not override `read_page_if_supported`, so it uses the trait's
    /// default, which returns `None`.
    struct RecordingTarget {
        /// Every `write_page` call, in call order, as (page number, payload).
        pages: Vec<(PageNumber, Vec<u8>)>,
        /// Argument of the most recent `truncate_db` call, if any.
        truncate_to: Option<u32>,
        /// Number of `sync_db` calls observed.
        sync_count: u32,
    }
392
393    impl RecordingTarget {
394        fn new() -> Self {
395            Self {
396                pages: Vec::new(),
397                truncate_to: None,
398                sync_count: 0,
399            }
400        }
401    }
402
403    impl CheckpointTarget for RecordingTarget {
404        fn write_page(&mut self, _cx: &Cx, page_no: PageNumber, data: &[u8]) -> Result<()> {
405            self.pages.push((page_no, data.to_vec()));
406            Ok(())
407        }
408
409        fn truncate_db(&mut self, _cx: &Cx, n_pages: u32) -> Result<()> {
410            self.truncate_to = Some(n_pages);
411            Ok(())
412        }
413
414        fn sync_db(&mut self, _cx: &Cx) -> Result<()> {
415            self.sync_count += 1;
416            Ok(())
417        }
418    }
419
420    /// Populate a WAL with N frames, where the last frame is a commit frame.
421    fn populate_wal(wal: &mut WalFile<impl VfsFile>, cx: &Cx, n_frames: u32) {
422        for i in 0..n_frames {
423            let page = sample_page(u8::try_from(i & 0xFF).expect("masked to u8"));
424            let db_size = if i == n_frames - 1 { n_frames } else { 0 };
425            wal.append_frame(cx, i + 1, &page, db_size)
426                .expect("append frame");
427        }
428    }
429
430    // ── Passive mode tests ──
431
432    #[test]
433    fn test_passive_backfills_all_when_no_readers() {
434        let cx = test_cx();
435        let vfs = MemoryVfs::new();
436        let file = open_wal_file(&vfs, &cx);
437        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
438        populate_wal(&mut wal, &cx, 5);
439
440        let state = CheckpointState {
441            total_frames: 5,
442            backfilled_frames: 0,
443            oldest_reader_frame: None,
444        };
445        let mut target = RecordingTarget::new();
446        let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
447            .expect("checkpoint");
448
449        assert_eq!(result.frames_backfilled, 5);
450        assert!(result.plan.completes_checkpoint());
451        assert!(!result.wal_was_reset);
452        assert_eq!(target.pages.len(), 5);
453        assert!(target.sync_count >= 1);
454    }
455
456    #[test]
457    fn test_passive_stops_at_reader_limit() {
458        let cx = test_cx();
459        let vfs = MemoryVfs::new();
460        let file = open_wal_file(&vfs, &cx);
461        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
462        populate_wal(&mut wal, &cx, 10);
463
464        let state = CheckpointState {
465            total_frames: 10,
466            backfilled_frames: 0,
467            oldest_reader_frame: Some(6),
468        };
469        let mut target = RecordingTarget::new();
470        let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
471            .expect("checkpoint");
472
473        assert_eq!(result.frames_backfilled, 6);
474        assert!(!result.plan.completes_checkpoint());
475        assert!(!result.wal_was_reset);
476        assert_eq!(target.pages.len(), 6);
477    }
478
479    #[test]
480    fn test_passive_partial_backfill_resumes() {
481        let cx = test_cx();
482        let vfs = MemoryVfs::new();
483        let file = open_wal_file(&vfs, &cx);
484        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
485        populate_wal(&mut wal, &cx, 8);
486
487        // First pass: backfill 4 frames (reader at 4).
488        let state1 = CheckpointState {
489            total_frames: 8,
490            backfilled_frames: 0,
491            oldest_reader_frame: Some(4),
492        };
493        let mut target1 = RecordingTarget::new();
494        let r1 = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state1, &mut target1)
495            .expect("ckpt 1");
496        assert_eq!(r1.frames_backfilled, 4);
497
498        // Second pass: reader gone, resume from frame 4.
499        let state2 = CheckpointState {
500            total_frames: 8,
501            backfilled_frames: 4,
502            oldest_reader_frame: None,
503        };
504        let mut target2 = RecordingTarget::new();
505        let r2 = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state2, &mut target2)
506            .expect("ckpt 2");
507        assert_eq!(r2.frames_backfilled, 4);
508        assert!(r2.plan.completes_checkpoint());
509
510        // Verify pages from second pass are frames 4..8 (pages 5,6,7,8).
511        let page_numbers: Vec<u32> = target2.pages.iter().map(|(pn, _)| pn.get()).collect();
512        assert_eq!(page_numbers, vec![5, 6, 7, 8]);
513    }
514
515    // ── Full mode tests ──
516
517    #[test]
518    fn test_full_marks_blocked_when_reader_pins_tail() {
519        let cx = test_cx();
520        let vfs = MemoryVfs::new();
521        let file = open_wal_file(&vfs, &cx);
522        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
523        populate_wal(&mut wal, &cx, 10);
524
525        let state = CheckpointState {
526            total_frames: 10,
527            backfilled_frames: 0,
528            oldest_reader_frame: Some(7),
529        };
530        let mut target = RecordingTarget::new();
531        let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Full, state, &mut target)
532            .expect("checkpoint");
533
534        assert_eq!(result.frames_backfilled, 7);
535        assert!(!result.plan.completes_checkpoint());
536        assert!(result.plan.blocked_by_readers);
537    }
538
539    #[test]
540    fn test_full_completes_without_readers() {
541        let cx = test_cx();
542        let vfs = MemoryVfs::new();
543        let file = open_wal_file(&vfs, &cx);
544        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
545        populate_wal(&mut wal, &cx, 5);
546
547        let state = CheckpointState {
548            total_frames: 5,
549            backfilled_frames: 0,
550            oldest_reader_frame: None,
551        };
552        let mut target = RecordingTarget::new();
553        let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Full, state, &mut target)
554            .expect("checkpoint");
555
556        assert_eq!(result.frames_backfilled, 5);
557        assert!(result.plan.completes_checkpoint());
558        assert!(!result.plan.blocked_by_readers);
559    }
560
561    // ── Restart mode tests ──
562
563    #[test]
564    fn test_restart_resets_wal_when_complete() {
565        let cx = test_cx();
566        let vfs = MemoryVfs::new();
567        let file = open_wal_file(&vfs, &cx);
568        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
569        populate_wal(&mut wal, &cx, 4);
570
571        let state = CheckpointState {
572            total_frames: 4,
573            backfilled_frames: 0,
574            oldest_reader_frame: None,
575        };
576        let mut target = RecordingTarget::new();
577        let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Restart, state, &mut target)
578            .expect("checkpoint");
579
580        assert_eq!(result.frames_backfilled, 4);
581        assert!(result.wal_was_reset);
582        assert_eq!(wal.frame_count(), 0);
583        assert_eq!(wal.header().checkpoint_seq, 1);
584    }
585
586    #[test]
587    fn test_restart_skips_reset_when_reader_active() {
588        let cx = test_cx();
589        let vfs = MemoryVfs::new();
590        let file = open_wal_file(&vfs, &cx);
591        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
592        populate_wal(&mut wal, &cx, 4);
593
594        let state = CheckpointState {
595            total_frames: 4,
596            backfilled_frames: 0,
597            oldest_reader_frame: Some(4),
598        };
599        let mut target = RecordingTarget::new();
600        let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Restart, state, &mut target)
601            .expect("checkpoint");
602
603        // All 4 are backfilled (reader at end doesn't block backfill),
604        // but WAL reset is skipped because reader is present.
605        assert_eq!(result.frames_backfilled, 4);
606        assert!(!result.wal_was_reset);
607        assert_eq!(wal.frame_count(), 4);
608    }
609
610    #[test]
611    fn test_restart_resets_wal_when_already_backfilled() {
612        let cx = test_cx();
613        let vfs = MemoryVfs::new();
614        let file = open_wal_file(&vfs, &cx);
615        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
616        populate_wal(&mut wal, &cx, 4);
617
618        let state = CheckpointState {
619            total_frames: 4,
620            backfilled_frames: 4,
621            oldest_reader_frame: None,
622        };
623        let mut target = RecordingTarget::new();
624        let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Restart, state, &mut target)
625            .expect("checkpoint");
626
627        assert_eq!(result.frames_backfilled, 0);
628        assert!(result.wal_was_reset);
629        assert_eq!(wal.frame_count(), 0);
630        assert_eq!(wal.header().checkpoint_seq, 1);
631    }
632
633    // ── Truncate mode tests ──
634
635    #[test]
636    fn test_truncate_resets_wal_when_complete() {
637        let cx = test_cx();
638        let vfs = MemoryVfs::new();
639        let file = open_wal_file(&vfs, &cx);
640        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
641        populate_wal(&mut wal, &cx, 6);
642
643        let state = CheckpointState {
644            total_frames: 6,
645            backfilled_frames: 0,
646            oldest_reader_frame: None,
647        };
648        let mut target = RecordingTarget::new();
649        let result =
650            execute_checkpoint(&cx, &mut wal, CheckpointMode::Truncate, state, &mut target)
651                .expect("checkpoint");
652
653        assert_eq!(result.frames_backfilled, 6);
654        assert!(result.wal_was_reset);
655        assert_eq!(wal.frame_count(), 0);
656        assert_eq!(wal.header().checkpoint_seq, 1);
657    }
658
659    #[test]
660    fn test_truncate_skips_reset_when_reader_active() {
661        let cx = test_cx();
662        let vfs = MemoryVfs::new();
663        let file = open_wal_file(&vfs, &cx);
664        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
665        populate_wal(&mut wal, &cx, 6);
666
667        let state = CheckpointState {
668            total_frames: 6,
669            backfilled_frames: 0,
670            oldest_reader_frame: Some(6),
671        };
672        let mut target = RecordingTarget::new();
673        let result =
674            execute_checkpoint(&cx, &mut wal, CheckpointMode::Truncate, state, &mut target)
675                .expect("checkpoint");
676
677        assert_eq!(result.frames_backfilled, 6);
678        assert!(!result.wal_was_reset);
679    }
680
681    #[test]
682    fn test_truncate_resets_wal_when_already_backfilled() {
683        let cx = test_cx();
684        let vfs = MemoryVfs::new();
685        let file = open_wal_file(&vfs, &cx);
686        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
687        populate_wal(&mut wal, &cx, 6);
688
689        let state = CheckpointState {
690            total_frames: 6,
691            backfilled_frames: 6,
692            oldest_reader_frame: None,
693        };
694        let mut target = RecordingTarget::new();
695        let result =
696            execute_checkpoint(&cx, &mut wal, CheckpointMode::Truncate, state, &mut target)
697                .expect("checkpoint");
698
699        assert_eq!(result.frames_backfilled, 0);
700        assert!(result.wal_was_reset);
701        assert_eq!(wal.frame_count(), 0);
702        assert_eq!(wal.header().checkpoint_seq, 1);
703    }
704
705    // ── Empty / edge case tests ──
706
707    #[test]
708    fn test_checkpoint_empty_wal_is_noop() {
709        let cx = test_cx();
710        let vfs = MemoryVfs::new();
711        let file = open_wal_file(&vfs, &cx);
712        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
713
714        let state = CheckpointState {
715            total_frames: 0,
716            backfilled_frames: 0,
717            oldest_reader_frame: None,
718        };
719        let mut target = RecordingTarget::new();
720        let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
721            .expect("checkpoint");
722
723        assert_eq!(result.frames_backfilled, 0);
724        assert!(target.pages.is_empty());
725        assert_eq!(target.sync_count, 0);
726    }
727
728    #[test]
729    fn test_checkpoint_already_fully_backfilled() {
730        let cx = test_cx();
731        let vfs = MemoryVfs::new();
732        let file = open_wal_file(&vfs, &cx);
733        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
734        populate_wal(&mut wal, &cx, 3);
735
736        let state = CheckpointState {
737            total_frames: 3,
738            backfilled_frames: 3,
739            oldest_reader_frame: None,
740        };
741        let mut target = RecordingTarget::new();
742        let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
743            .expect("checkpoint");
744
745        assert_eq!(result.frames_backfilled, 0);
746        assert!(result.plan.completes_checkpoint());
747    }
748
749    #[test]
750    fn test_checkpoint_writes_correct_page_data() {
751        let cx = test_cx();
752        let vfs = MemoryVfs::new();
753        let file = open_wal_file(&vfs, &cx);
754        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
755
756        // Append 3 frames with distinct page data.
757        for i in 0..3u32 {
758            let page = sample_page(u8::try_from(i).expect("fits"));
759            let db_size = if i == 2 { 3 } else { 0 };
760            wal.append_frame(&cx, i + 1, &page, db_size)
761                .expect("append");
762        }
763
764        let state = CheckpointState {
765            total_frames: 3,
766            backfilled_frames: 0,
767            oldest_reader_frame: None,
768        };
769        let mut target = RecordingTarget::new();
770        execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
771            .expect("checkpoint");
772
773        // Verify each written page matches the original data.
774        for (i, (page_no, data)) in target.pages.iter().enumerate() {
775            let expected_page_number = u32::try_from(i + 1).expect("fits");
776            assert_eq!(page_no.get(), expected_page_number);
777            let expected_data = sample_page(u8::try_from(i).expect("fits"));
778            assert_eq!(*data, expected_data);
779        }
780    }
781
/// A `Full` checkpoint over the whole WAL should report the commit frame's
/// database size and record a truncation to that many pages.
#[test]
fn test_checkpoint_db_truncation_on_complete() {
    let cx = test_cx();
    let mem_vfs = MemoryVfs::new();
    let wal_handle = open_wal_file(&mem_vfs, &cx);
    let mut wal = WalFile::create(&cx, wal_handle, PAGE_SIZE, 0, test_salts()).expect("create");

    // Three frames; the commit frame reports db_size=3.
    populate_wal(&mut wal, &cx, 3);

    let mut recorder = RecordingTarget::new();
    let outcome = execute_checkpoint(
        &cx,
        &mut wal,
        CheckpointMode::Full,
        CheckpointState {
            total_frames: 3,
            backfilled_frames: 0,
            oldest_reader_frame: None,
        },
        &mut recorder,
    )
    .expect("checkpoint");

    // Both the reported size and the recorded truncation must be 3 pages.
    assert_eq!(outcome.db_size_pages, Some(3));
    assert_eq!(recorder.truncate_to, Some(3));
}
804
/// After a `Restart` checkpoint the WAL must report zero frames and a
/// `checkpoint_seq` of 1, and must accept newly appended frames.
#[test]
fn test_wal_can_accept_new_frames_after_restart() {
    let cx = test_cx();
    let mem_vfs = MemoryVfs::new();
    let wal_handle = open_wal_file(&mem_vfs, &cx);
    let mut wal = WalFile::create(&cx, wal_handle, PAGE_SIZE, 0, test_salts()).expect("create");
    populate_wal(&mut wal, &cx, 4);

    let all_pending = CheckpointState {
        total_frames: 4,
        backfilled_frames: 0,
        oldest_reader_frame: None,
    };
    let mut sink = RecordingTarget::new();
    execute_checkpoint(
        &cx,
        &mut wal,
        CheckpointMode::Restart,
        all_pending,
        &mut sink,
    )
    .expect("checkpoint");

    // The restart leaves an empty log with the sequence number at 1.
    assert_eq!(wal.frame_count(), 0);
    assert_eq!(wal.header().checkpoint_seq, 1);

    // The reset WAL takes a plain frame followed by a commit frame.
    let first_page = sample_page(0xAA);
    wal.append_frame(&cx, 1, &first_page, 0)
        .expect("append after restart");
    let commit_page = sample_page(0xBB);
    wal.append_frame(&cx, 2, &commit_page, 2)
        .expect("append commit after restart");
    assert_eq!(wal.frame_count(), 2);
}
832
/// Two WAL frames for the same page must collapse into a single target
/// write carrying the newer frame's bytes, while both frames still count
/// as backfilled.
#[test]
fn test_checkpoint_deduplicates_same_page_uses_latest_frame() {
    let cx = test_cx();
    let mem_vfs = MemoryVfs::new();
    let wal_handle = open_wal_file(&mem_vfs, &cx);
    let mut wal = WalFile::create(&cx, wal_handle, PAGE_SIZE, 0, test_salts()).expect("create");

    // Page 1 is written twice; the second frame is the commit (db_size = 1).
    let page1_v1 = sample_page(0x01);
    let page1_v2 = sample_page(0x02);
    wal.append_frame(&cx, 1, &page1_v1, 0).expect("append v1");
    wal.append_frame(&cx, 1, &page1_v2, 1).expect("append v2");

    let mut recorder = RecordingTarget::new();
    let outcome = execute_checkpoint(
        &cx,
        &mut wal,
        CheckpointMode::Passive,
        CheckpointState {
            total_frames: 2,
            backfilled_frames: 0,
            oldest_reader_frame: None,
        },
        &mut recorder,
    )
    .expect("checkpoint");

    assert_eq!(outcome.frames_backfilled, 2);
    assert_eq!(
        recorder.pages.len(),
        1,
        "same page written twice → one deduped write"
    );

    // Inspect the single recorded write.
    let (written_no, written_bytes) = &recorder.pages[0];
    assert_eq!(written_no.get(), 1);
    assert_eq!(*written_bytes, page1_v2, "must use latest frame's data");
}
863
/// `RecordingTarget::read_page_if_supported` should succeed and yield
/// `None` for page 1.
#[test]
fn test_recording_target_read_page_default_returns_none() {
    let recorder = RecordingTarget::new();
    let cx = test_cx();
    let first_page = PageNumber::new(1).expect("valid page");
    let mut scratch = vec![0u8; 4096];

    let outcome = recorder
        .read_page_if_supported(&cx, first_page, &mut scratch)
        .expect("no error");

    assert!(outcome.is_none());
}
875
/// Runs three populate-then-`Restart` rounds on one WAL and checks that
/// `checkpoint_seq` advances by one per round and that both salts end up
/// three (wrapping) above the initial test salts.
#[test]
fn test_consecutive_restarts_bump_salts_and_checkpoint_seq() {
    let cx = test_cx();
    let mem_vfs = MemoryVfs::new();
    let wal_handle = open_wal_file(&mem_vfs, &cx);
    let mut wal = WalFile::create(&cx, wal_handle, PAGE_SIZE, 0, test_salts()).expect("create");

    // Each round is expected to leave the sequence number at 1, 2, then 3.
    for expected_seq in 1..=3u32 {
        populate_wal(&mut wal, &cx, 2);
        let pending = CheckpointState {
            total_frames: 2,
            backfilled_frames: 0,
            oldest_reader_frame: None,
        };
        let mut sink = RecordingTarget::new();
        execute_checkpoint(&cx, &mut wal, CheckpointMode::Restart, pending, &mut sink)
            .expect("checkpoint");
        assert_eq!(wal.header().checkpoint_seq, expected_seq);
    }

    assert_eq!(wal.header().checkpoint_seq, 3);

    let current = wal.header().salts;
    let initial = test_salts();
    assert_eq!(current.salt1, initial.salt1.wrapping_add(3));
    assert_eq!(current.salt2, initial.salt2.wrapping_add(3));
}
900
/// Checks every field of the result returned by a complete `Full`
/// checkpoint over a three-frame WAL.
#[test]
fn test_checkpoint_execution_result_fields() {
    let cx = test_cx();
    let mem_vfs = MemoryVfs::new();
    let wal_handle = open_wal_file(&mem_vfs, &cx);
    let mut wal = WalFile::create(&cx, wal_handle, PAGE_SIZE, 0, test_salts()).expect("create");
    populate_wal(&mut wal, &cx, 3);

    let mut recorder = RecordingTarget::new();
    let outcome = execute_checkpoint(
        &cx,
        &mut wal,
        CheckpointMode::Full,
        CheckpointState {
            total_frames: 3,
            backfilled_frames: 0,
            oldest_reader_frame: None,
        },
        &mut recorder,
    )
    .expect("checkpoint");

    // Counters reported by the executor.
    assert_eq!(outcome.frames_backfilled, 3);
    assert_eq!(outcome.db_size_pages, Some(3));
    assert!(!outcome.wal_was_reset);

    // The embedded plan echoes the mode and reports completion.
    assert_eq!(outcome.plan.mode, CheckpointMode::Full);
    assert!(outcome.plan.completes_checkpoint());
}
924
/// Exercises `Clone`, `PartialEq` and `Debug` on a result produced by a real
/// passive checkpoint run.
#[test]
fn test_checkpoint_execution_result_clone_eq_debug() {
    let cx = test_cx();
    let mem_vfs = MemoryVfs::new();
    let wal_handle = open_wal_file(&mem_vfs, &cx);
    let mut wal = WalFile::create(&cx, wal_handle, PAGE_SIZE, 0, test_salts()).expect("create");
    populate_wal(&mut wal, &cx, 2);

    let mut recorder = RecordingTarget::new();
    let result = execute_checkpoint(
        &cx,
        &mut wal,
        CheckpointMode::Passive,
        CheckpointState {
            total_frames: 2,
            backfilled_frames: 0,
            oldest_reader_frame: None,
        },
        &mut recorder,
    )
    .expect("checkpoint");

    // A clone compares equal to its source.
    let duplicate = result.clone();
    assert_eq!(result, duplicate);

    // The Debug rendering names the type and at least one field.
    let rendered = format!("{result:?}");
    assert!(rendered.contains("CheckpointExecutionResult"));
    assert!(rendered.contains("frames_backfilled"));
}
948
/// With `oldest_reader_frame` set to 2 on a three-frame WAL, a passive
/// checkpoint must sync the target exactly once.
#[test]
fn test_passive_syncs_exactly_once() {
    let cx = test_cx();
    let mem_vfs = MemoryVfs::new();
    let wal_handle = open_wal_file(&mem_vfs, &cx);
    let mut wal = WalFile::create(&cx, wal_handle, PAGE_SIZE, 0, test_salts()).expect("create");
    populate_wal(&mut wal, &cx, 3);

    let reader_limited = CheckpointState {
        total_frames: 3,
        backfilled_frames: 0,
        oldest_reader_frame: Some(2),
    };
    let mut recorder = RecordingTarget::new();
    execute_checkpoint(
        &cx,
        &mut wal,
        CheckpointMode::Passive,
        reader_limited,
        &mut recorder,
    )
    .expect("checkpoint");

    assert_eq!(
        recorder.sync_count, 1,
        "partial passive should sync exactly once"
    );
}
971
/// A complete `Full` checkpoint over a three-frame WAL must sync the target
/// twice.
#[test]
fn test_full_complete_syncs_twice_for_truncate() {
    let cx = test_cx();
    let mem_vfs = MemoryVfs::new();
    let wal_handle = open_wal_file(&mem_vfs, &cx);
    let mut wal = WalFile::create(&cx, wal_handle, PAGE_SIZE, 0, test_salts()).expect("create");
    populate_wal(&mut wal, &cx, 3);

    let no_readers = CheckpointState {
        total_frames: 3,
        backfilled_frames: 0,
        oldest_reader_frame: None,
    };
    let mut recorder = RecordingTarget::new();
    execute_checkpoint(
        &cx,
        &mut wal,
        CheckpointMode::Full,
        no_readers,
        &mut recorder,
    )
    .expect("checkpoint");

    assert_eq!(
        recorder.sync_count, 2,
        "full complete with db_size truncate should sync twice"
    );
}
994
/// Frames appended for pages 5, 2, 8, 1 (in that order) must reach the
/// target sorted by ascending page number.
#[test]
fn test_pages_written_in_ascending_order() {
    let cx = test_cx();
    let mem_vfs = MemoryVfs::new();
    let wal_handle = open_wal_file(&mem_vfs, &cx);
    let mut wal = WalFile::create(&cx, wal_handle, PAGE_SIZE, 0, test_salts()).expect("create");

    // Three non-commit frames in deliberately shuffled page order; each page
    // is filled from its own page number as the seed.
    for seed in [5u8, 2, 8] {
        wal.append_frame(&cx, u32::from(seed), &sample_page(seed), 0)
            .expect("append");
    }
    // The commit frame for page 1 reports a 4-page database.
    wal.append_frame(&cx, 1, &sample_page(1), 4)
        .expect("append commit");

    let mut recorder = RecordingTarget::new();
    execute_checkpoint(
        &cx,
        &mut wal,
        CheckpointMode::Passive,
        CheckpointState {
            total_frames: 4,
            backfilled_frames: 0,
            oldest_reader_frame: None,
        },
        &mut recorder,
    )
    .expect("checkpoint");

    // Compare the observed write order against its own sorted copy.
    let observed: Vec<u32> = recorder
        .pages
        .iter()
        .map(|(page_no, _)| page_no.get())
        .collect();
    let ascending = {
        let mut copy = observed.clone();
        copy.sort_unstable();
        copy
    };
    assert_eq!(
        observed, ascending,
        "pages must be written in ascending order"
    );
}
1028
/// Exercises `Clone`, `PartialEq` and `Debug` on a hand-built
/// `CheckpointExecutionResult`, without running a checkpoint.
#[test]
fn checkpoint_execution_result_clone_eq_debug() {
    let four_pending = CheckpointState {
        total_frames: 4,
        backfilled_frames: 0,
        oldest_reader_frame: None,
    };
    let result = CheckpointExecutionResult {
        plan: plan_checkpoint(CheckpointMode::Passive, four_pending),
        frames_backfilled: 4,
        db_size_pages: Some(10),
        wal_was_reset: false,
    };

    // A clone compares equal to its source.
    let duplicate = result.clone();
    assert_eq!(duplicate, result);

    // The Debug rendering names the type.
    let rendered = format!("{result:?}");
    assert!(rendered.contains("CheckpointExecutionResult"));
}
1050
/// Two results that differ only in `frames_backfilled` must compare unequal.
#[test]
fn checkpoint_execution_result_ne_on_different_fields() {
    let one_pending = CheckpointState {
        total_frames: 1,
        backfilled_frames: 0,
        oldest_reader_frame: None,
    };
    let a = CheckpointExecutionResult {
        plan: plan_checkpoint(CheckpointMode::Passive, one_pending),
        frames_backfilled: 1,
        db_size_pages: Some(1),
        wal_was_reset: false,
    };
    // Same plan, size and reset flag as `a`; only the backfill count changes.
    let b = CheckpointExecutionResult {
        frames_backfilled: 2,
        ..a.clone()
    };

    assert_ne!(a, b);
}
1075
/// Calling `read_page_if_supported` on a `RecordingTarget` with a
/// stack-allocated 4096-byte buffer should succeed and yield `None`.
#[test]
fn checkpoint_target_default_read_page_returns_none() {
    let recorder = RecordingTarget::new();
    let cx = test_cx();
    let first_page = PageNumber::new(1).expect("valid");
    let mut scratch = [0u8; 4096];

    let outcome = recorder
        .read_page_if_supported(&cx, first_page, &mut scratch)
        .expect("ok");

    assert!(outcome.is_none());
}
1087
/// A passive checkpoint over a freshly created WAL with no frames must
/// backfill nothing and must not report a WAL reset.
#[test]
fn empty_wal_passive_yields_zero_backfilled() {
    let cx = test_cx();
    let mem_vfs = MemoryVfs::new();
    let wal_handle = open_wal_file(&mem_vfs, &cx);
    let mut wal = WalFile::create(&cx, wal_handle, PAGE_SIZE, 0, test_salts()).expect("create");

    let nothing_pending = CheckpointState {
        total_frames: 0,
        backfilled_frames: 0,
        oldest_reader_frame: None,
    };
    let mut recorder = RecordingTarget::new();
    let outcome = execute_checkpoint(
        &cx,
        &mut wal,
        CheckpointMode::Passive,
        nothing_pending,
        &mut recorder,
    )
    .expect("checkpoint");

    assert_eq!(outcome.frames_backfilled, 0);
    assert!(!outcome.wal_was_reset);
}
1105
/// Arms the `MidCheckpoint` crash boundary and checks that a passive
/// checkpoint over five frames fails with a fault-injection error after
/// exactly one page has reached the target.
#[test]
fn mid_checkpoint_crash_produces_partial_backfill() {
    use std::sync::Mutex;
    // Serializes executions of this test while the crash boundary is armed.
    static LOCK: Mutex<()> = Mutex::new(());
    let _guard = LOCK.lock().unwrap();

    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let file = open_wal_file(&vfs, &cx);
    let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
    populate_wal(&mut wal, &cx, 5);

    let state = CheckpointState {
        total_frames: 5,
        backfilled_frames: 0,
        oldest_reader_frame: None,
    };

    crate::fault_hooks::arm_crash_boundary(
        crate::fault_hooks::CrashBoundary::MidCheckpoint,
        crate::fault_hooks::FaultHookArm::new(
            "mid-ckpt-crash",
            "CHECKPOINT-MID-CRASH",
            "test_mid_checkpoint_crash",
        ),
    );

    let mut target = RecordingTarget::new();
    let outcome = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target);

    // Disarm before any assertion can panic: if the checkpoint unexpectedly
    // succeeds, the hook must not stay armed for whatever runs next
    // (presumably the hook state outlives this test — confirm in fault_hooks).
    crate::fault_hooks::clear_crash_boundary();

    let err = outcome.expect_err("should fail at MidCheckpoint boundary after first page");

    assert!(
        err.to_string().contains("fault_inject"),
        "error identifies the fault hook: {err}"
    );
    assert_eq!(
        target.pages.len(),
        1,
        "only the first page was written before the crash fired on page_idx=1"
    );
}
1149}