1use fsqlite_error::{FrankenError, Result};
16use fsqlite_types::PageNumber;
17use fsqlite_types::cx::Cx;
18use fsqlite_vfs::VfsFile;
19use tracing::{debug, info};
20
21use crate::checkpoint::{
22 CheckpointMode, CheckpointPlan, CheckpointPostAction, CheckpointProgress, CheckpointState,
23 plan_checkpoint,
24};
25use crate::checksum::{WAL_FRAME_HEADER_SIZE, WalSalts};
26use crate::recovery_fence::{CheckpointChecksumVerdict, ExpectedPageChecksum};
27use crate::wal::WalFile;
28
29pub trait CheckpointTarget {
39 fn write_page(&mut self, cx: &Cx, page_no: PageNumber, data: &[u8]) -> Result<()>;
41
42 fn truncate_db(&mut self, cx: &Cx, n_pages: u32) -> Result<()>;
44
45 fn sync_db(&mut self, cx: &Cx) -> Result<()>;
47
48 fn read_page_if_supported(
57 &self,
58 _cx: &Cx,
59 _page_no: PageNumber,
60 _buf: &mut [u8],
61 ) -> Result<Option<usize>> {
62 Ok(None)
63 }
64}
65
66#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct CheckpointExecutionResult {
73 pub plan: CheckpointPlan,
75 pub frames_backfilled: u32,
77 pub db_size_pages: Option<u32>,
79 pub wal_was_reset: bool,
81}
82
83#[allow(clippy::too_many_lines)]
100pub fn execute_checkpoint<F: VfsFile>(
101 cx: &Cx,
102 wal: &mut WalFile<F>,
103 mode: CheckpointMode,
104 state: CheckpointState,
105 target: &mut impl CheckpointTarget,
106) -> Result<CheckpointExecutionResult> {
107 let checkpoint_start = fsqlite_types::sync_primitives::Instant::now();
108 let plan = plan_checkpoint(mode, state);
109 let normalized = state.normalized();
110
111 info!(
112 mode = ?plan.mode,
113 frames_to_backfill = plan.frames_to_backfill,
114 progress = ?plan.progress,
115 blocked_by_readers = plan.blocked_by_readers,
116 post_action = ?plan.post_action,
117 "checkpoint plan computed"
118 );
119
120 let mut frames_backfilled: u32 = 0;
121 let mut last_db_size: Option<u32> = None;
122 let mut expected_checksums: Vec<ExpectedPageChecksum> = Vec::new();
125 if plan.frames_to_backfill > 0 {
126 let start = usize::try_from(normalized.backfilled_frames).unwrap_or(usize::MAX);
128 let count = usize::try_from(plan.frames_to_backfill).unwrap_or(usize::MAX);
129 let end = start.saturating_add(count).min(wal.frame_count());
130
131 let mut latest_frames: std::collections::HashMap<PageNumber, usize> =
132 std::collections::HashMap::new();
133
134 for frame_idx in start..end {
136 let header = wal.read_frame_header(cx, frame_idx)?;
137
138 let page_no =
139 PageNumber::new(header.page_number).ok_or_else(|| FrankenError::OutOfRange {
140 what: "checkpoint frame page number".to_owned(),
141 value: header.page_number.to_string(),
142 })?;
143
144 latest_frames.insert(page_no, frame_idx);
145 frames_backfilled += 1;
146
147 if header.is_commit() && header.db_size > 0 {
148 last_db_size = Some(header.db_size);
149 }
150 }
151
152 let mut sorted_pages: Vec<(PageNumber, usize)> = latest_frames.into_iter().collect();
154 sorted_pages.sort_unstable_by_key(|(p, _)| p.get());
155
156 let mut frame_buf = vec![0u8; wal.frame_size()];
157 for (fault_page_idx, (page_no, frame_idx)) in sorted_pages.iter().enumerate() {
158 #[cfg(not(any(test, feature = "fault-injection")))]
159 let _ = fault_page_idx;
160 #[cfg(any(test, feature = "fault-injection"))]
161 {
162 if fault_page_idx > 0 {
163 crate::fault_hooks::maybe_inject_crash_at(
164 crate::fault_hooks::CrashBoundary::MidCheckpoint,
165 &format!("page_idx={fault_page_idx} page_no={}", page_no.get()),
166 )?;
167 }
168 }
169
170 wal.read_frame_into(cx, *frame_idx, &mut frame_buf)?;
171 let page_data = &frame_buf[WAL_FRAME_HEADER_SIZE..];
172 target.write_page(cx, *page_no, page_data)?;
173
174 if page_data.len() >= crate::checksum::PAGE_CHECKSUM_RESERVED_BYTES {
181 if let Ok(checksum) = crate::checksum::read_page_checksum(page_data) {
182 expected_checksums.push(ExpectedPageChecksum {
183 page: *page_no,
184 checksum,
185 });
186 }
187 }
188
189 debug!(
190 frame_idx = *frame_idx,
191 page_number = page_no.get(),
192 "checkpoint: page backfilled"
193 );
194 }
195
196 target.sync_db(cx)?;
198
199 if matches!(plan.progress, CheckpointProgress::Complete) {
202 if let Some(db_size) = last_db_size {
203 target.truncate_db(cx, db_size)?;
204 target.sync_db(cx)?;
205 }
206 }
207 }
208
209 let wal_was_reset =
215 apply_checkpoint_post_action(cx, wal, plan.post_action, target, &expected_checksums)?;
216
217 let checkpoint_duration_us = crate::metrics::duration_us_saturating(checkpoint_start.elapsed());
218
219 info!(
220 frames_backfilled,
221 wal_was_reset,
222 db_size_pages = ?last_db_size,
223 checkpoint_duration_us,
224 "checkpoint execution complete"
225 );
226
227 crate::metrics::GLOBAL_WAL_METRICS
228 .record_checkpoint(u64::from(frames_backfilled), checkpoint_duration_us);
229
230 #[cfg(any(test, feature = "fault-injection"))]
231 crate::fault_hooks::maybe_inject_crash_at(
232 crate::fault_hooks::CrashBoundary::AfterCheckpoint,
233 &format!("frames_backfilled={frames_backfilled} wal_was_reset={wal_was_reset}"),
234 )?;
235
236 Ok(CheckpointExecutionResult {
237 plan,
238 frames_backfilled,
239 db_size_pages: last_db_size,
240 wal_was_reset,
241 })
242}
243
244fn apply_checkpoint_post_action<F: VfsFile>(
245 cx: &Cx,
246 wal: &mut WalFile<F>,
247 post_action: CheckpointPostAction,
248 target: &mut impl CheckpointTarget,
249 expected_checksums: &[ExpectedPageChecksum],
250) -> Result<bool> {
251 match post_action {
252 CheckpointPostAction::ResetWal | CheckpointPostAction::TruncateWal => {
253 let new_seq = wal.header().checkpoint_seq.wrapping_add(1);
254 let new_salts = WalSalts {
255 salt1: wal.header().salts.salt1.wrapping_add(1),
256 salt2: wal.header().salts.salt2.wrapping_add(1),
257 };
258 let truncate = matches!(post_action, CheckpointPostAction::TruncateWal);
259 if truncate {
260 crate::recovery_fence::ensure_db_fsync_before_wal_truncate(cx, target)?;
268
269 if !expected_checksums.is_empty() {
275 let verdict = verify_checkpoint_checksums_via_target(
276 cx,
277 target,
278 wal.page_size(),
279 expected_checksums,
280 )?;
281 if let CheckpointChecksumVerdict::Mismatch { first_bad_page } = verdict {
282 tracing::error!(
283 target: "fsqlite.wal.recovery_fence",
284 first_bad_page = first_bad_page.get(),
285 "UNRECOVERABLE: post-checkpoint DB/WAL disagreed; refusing truncate"
286 );
287 return Err(FrankenError::DatabaseCorrupt {
288 detail: format!(
289 "post-checkpoint DB/WAL state disagreed at page {}; WAL truncate refused \
290 to preserve committed frames (bd-yfdb6)",
291 first_bad_page.get()
292 ),
293 });
294 }
295 }
296 }
297 wal.reset(cx, new_seq, new_salts, truncate)?;
298 info!(
299 new_checkpoint_seq = new_seq,
300 action = ?post_action,
301 truncate,
302 "WAL reset after checkpoint"
303 );
304 Ok(true)
305 }
306 CheckpointPostAction::None => Ok(false),
307 }
308}
309
310fn verify_checkpoint_checksums_via_target(
316 cx: &Cx,
317 target: &impl CheckpointTarget,
318 page_size: usize,
319 expected: &[ExpectedPageChecksum],
320) -> Result<CheckpointChecksumVerdict> {
321 let mut buf = vec![0u8; page_size];
322 for exp in expected {
323 let maybe_read = target.read_page_if_supported(cx, exp.page, &mut buf)?;
324 let Some(n) = maybe_read else {
325 return Ok(CheckpointChecksumVerdict::Match);
327 };
328 if n < page_size {
329 return Ok(CheckpointChecksumVerdict::Mismatch {
330 first_bad_page: exp.page,
331 });
332 }
333 let observed = crate::checksum::read_page_checksum(&buf)?;
334 if observed != exp.checksum {
335 return Ok(CheckpointChecksumVerdict::Mismatch {
336 first_bad_page: exp.page,
337 });
338 }
339 }
340 Ok(CheckpointChecksumVerdict::Match)
341}
342
343#[cfg(test)]
348mod tests {
349 use fsqlite_types::flags::VfsOpenFlags;
350 use fsqlite_vfs::MemoryVfs;
351 use fsqlite_vfs::traits::Vfs;
352
353 use super::*;
354
355 const PAGE_SIZE: u32 = 4096;
356
357 fn test_cx() -> Cx {
358 Cx::default()
359 }
360
361 fn test_salts() -> WalSalts {
362 WalSalts {
363 salt1: 0xDEAD_BEEF,
364 salt2: 0xCAFE_BABE,
365 }
366 }
367
368 fn sample_page(seed: u8) -> Vec<u8> {
369 let page_size = usize::try_from(PAGE_SIZE).expect("page size fits usize");
370 let mut page = vec![0u8; page_size];
371 for (i, byte) in page.iter_mut().enumerate() {
372 let reduced = u8::try_from(i % 251).expect("modulo fits u8");
373 *byte = reduced ^ seed;
374 }
375 page
376 }
377
378 fn open_wal_file(vfs: &MemoryVfs, cx: &Cx) -> <MemoryVfs as Vfs>::File {
379 let flags = VfsOpenFlags::READWRITE | VfsOpenFlags::CREATE | VfsOpenFlags::WAL;
380 let (file, _) = vfs
381 .open(cx, Some(std::path::Path::new("test.db-wal")), flags)
382 .expect("open WAL file");
383 file
384 }
385
386 struct RecordingTarget {
388 pages: Vec<(PageNumber, Vec<u8>)>,
389 truncate_to: Option<u32>,
390 sync_count: u32,
391 }
392
393 impl RecordingTarget {
394 fn new() -> Self {
395 Self {
396 pages: Vec::new(),
397 truncate_to: None,
398 sync_count: 0,
399 }
400 }
401 }
402
403 impl CheckpointTarget for RecordingTarget {
404 fn write_page(&mut self, _cx: &Cx, page_no: PageNumber, data: &[u8]) -> Result<()> {
405 self.pages.push((page_no, data.to_vec()));
406 Ok(())
407 }
408
409 fn truncate_db(&mut self, _cx: &Cx, n_pages: u32) -> Result<()> {
410 self.truncate_to = Some(n_pages);
411 Ok(())
412 }
413
414 fn sync_db(&mut self, _cx: &Cx) -> Result<()> {
415 self.sync_count += 1;
416 Ok(())
417 }
418 }
419
420 fn populate_wal(wal: &mut WalFile<impl VfsFile>, cx: &Cx, n_frames: u32) {
422 for i in 0..n_frames {
423 let page = sample_page(u8::try_from(i & 0xFF).expect("masked to u8"));
424 let db_size = if i == n_frames - 1 { n_frames } else { 0 };
425 wal.append_frame(cx, i + 1, &page, db_size)
426 .expect("append frame");
427 }
428 }
429
430 #[test]
433 fn test_passive_backfills_all_when_no_readers() {
434 let cx = test_cx();
435 let vfs = MemoryVfs::new();
436 let file = open_wal_file(&vfs, &cx);
437 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
438 populate_wal(&mut wal, &cx, 5);
439
440 let state = CheckpointState {
441 total_frames: 5,
442 backfilled_frames: 0,
443 oldest_reader_frame: None,
444 };
445 let mut target = RecordingTarget::new();
446 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
447 .expect("checkpoint");
448
449 assert_eq!(result.frames_backfilled, 5);
450 assert!(result.plan.completes_checkpoint());
451 assert!(!result.wal_was_reset);
452 assert_eq!(target.pages.len(), 5);
453 assert!(target.sync_count >= 1);
454 }
455
456 #[test]
457 fn test_passive_stops_at_reader_limit() {
458 let cx = test_cx();
459 let vfs = MemoryVfs::new();
460 let file = open_wal_file(&vfs, &cx);
461 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
462 populate_wal(&mut wal, &cx, 10);
463
464 let state = CheckpointState {
465 total_frames: 10,
466 backfilled_frames: 0,
467 oldest_reader_frame: Some(6),
468 };
469 let mut target = RecordingTarget::new();
470 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
471 .expect("checkpoint");
472
473 assert_eq!(result.frames_backfilled, 6);
474 assert!(!result.plan.completes_checkpoint());
475 assert!(!result.wal_was_reset);
476 assert_eq!(target.pages.len(), 6);
477 }
478
479 #[test]
480 fn test_passive_partial_backfill_resumes() {
481 let cx = test_cx();
482 let vfs = MemoryVfs::new();
483 let file = open_wal_file(&vfs, &cx);
484 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
485 populate_wal(&mut wal, &cx, 8);
486
487 let state1 = CheckpointState {
489 total_frames: 8,
490 backfilled_frames: 0,
491 oldest_reader_frame: Some(4),
492 };
493 let mut target1 = RecordingTarget::new();
494 let r1 = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state1, &mut target1)
495 .expect("ckpt 1");
496 assert_eq!(r1.frames_backfilled, 4);
497
498 let state2 = CheckpointState {
500 total_frames: 8,
501 backfilled_frames: 4,
502 oldest_reader_frame: None,
503 };
504 let mut target2 = RecordingTarget::new();
505 let r2 = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state2, &mut target2)
506 .expect("ckpt 2");
507 assert_eq!(r2.frames_backfilled, 4);
508 assert!(r2.plan.completes_checkpoint());
509
510 let page_numbers: Vec<u32> = target2.pages.iter().map(|(pn, _)| pn.get()).collect();
512 assert_eq!(page_numbers, vec![5, 6, 7, 8]);
513 }
514
515 #[test]
518 fn test_full_marks_blocked_when_reader_pins_tail() {
519 let cx = test_cx();
520 let vfs = MemoryVfs::new();
521 let file = open_wal_file(&vfs, &cx);
522 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
523 populate_wal(&mut wal, &cx, 10);
524
525 let state = CheckpointState {
526 total_frames: 10,
527 backfilled_frames: 0,
528 oldest_reader_frame: Some(7),
529 };
530 let mut target = RecordingTarget::new();
531 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Full, state, &mut target)
532 .expect("checkpoint");
533
534 assert_eq!(result.frames_backfilled, 7);
535 assert!(!result.plan.completes_checkpoint());
536 assert!(result.plan.blocked_by_readers);
537 }
538
539 #[test]
540 fn test_full_completes_without_readers() {
541 let cx = test_cx();
542 let vfs = MemoryVfs::new();
543 let file = open_wal_file(&vfs, &cx);
544 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
545 populate_wal(&mut wal, &cx, 5);
546
547 let state = CheckpointState {
548 total_frames: 5,
549 backfilled_frames: 0,
550 oldest_reader_frame: None,
551 };
552 let mut target = RecordingTarget::new();
553 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Full, state, &mut target)
554 .expect("checkpoint");
555
556 assert_eq!(result.frames_backfilled, 5);
557 assert!(result.plan.completes_checkpoint());
558 assert!(!result.plan.blocked_by_readers);
559 }
560
561 #[test]
564 fn test_restart_resets_wal_when_complete() {
565 let cx = test_cx();
566 let vfs = MemoryVfs::new();
567 let file = open_wal_file(&vfs, &cx);
568 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
569 populate_wal(&mut wal, &cx, 4);
570
571 let state = CheckpointState {
572 total_frames: 4,
573 backfilled_frames: 0,
574 oldest_reader_frame: None,
575 };
576 let mut target = RecordingTarget::new();
577 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Restart, state, &mut target)
578 .expect("checkpoint");
579
580 assert_eq!(result.frames_backfilled, 4);
581 assert!(result.wal_was_reset);
582 assert_eq!(wal.frame_count(), 0);
583 assert_eq!(wal.header().checkpoint_seq, 1);
584 }
585
586 #[test]
587 fn test_restart_skips_reset_when_reader_active() {
588 let cx = test_cx();
589 let vfs = MemoryVfs::new();
590 let file = open_wal_file(&vfs, &cx);
591 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
592 populate_wal(&mut wal, &cx, 4);
593
594 let state = CheckpointState {
595 total_frames: 4,
596 backfilled_frames: 0,
597 oldest_reader_frame: Some(4),
598 };
599 let mut target = RecordingTarget::new();
600 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Restart, state, &mut target)
601 .expect("checkpoint");
602
603 assert_eq!(result.frames_backfilled, 4);
606 assert!(!result.wal_was_reset);
607 assert_eq!(wal.frame_count(), 4);
608 }
609
610 #[test]
611 fn test_restart_resets_wal_when_already_backfilled() {
612 let cx = test_cx();
613 let vfs = MemoryVfs::new();
614 let file = open_wal_file(&vfs, &cx);
615 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
616 populate_wal(&mut wal, &cx, 4);
617
618 let state = CheckpointState {
619 total_frames: 4,
620 backfilled_frames: 4,
621 oldest_reader_frame: None,
622 };
623 let mut target = RecordingTarget::new();
624 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Restart, state, &mut target)
625 .expect("checkpoint");
626
627 assert_eq!(result.frames_backfilled, 0);
628 assert!(result.wal_was_reset);
629 assert_eq!(wal.frame_count(), 0);
630 assert_eq!(wal.header().checkpoint_seq, 1);
631 }
632
633 #[test]
636 fn test_truncate_resets_wal_when_complete() {
637 let cx = test_cx();
638 let vfs = MemoryVfs::new();
639 let file = open_wal_file(&vfs, &cx);
640 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
641 populate_wal(&mut wal, &cx, 6);
642
643 let state = CheckpointState {
644 total_frames: 6,
645 backfilled_frames: 0,
646 oldest_reader_frame: None,
647 };
648 let mut target = RecordingTarget::new();
649 let result =
650 execute_checkpoint(&cx, &mut wal, CheckpointMode::Truncate, state, &mut target)
651 .expect("checkpoint");
652
653 assert_eq!(result.frames_backfilled, 6);
654 assert!(result.wal_was_reset);
655 assert_eq!(wal.frame_count(), 0);
656 assert_eq!(wal.header().checkpoint_seq, 1);
657 }
658
659 #[test]
660 fn test_truncate_skips_reset_when_reader_active() {
661 let cx = test_cx();
662 let vfs = MemoryVfs::new();
663 let file = open_wal_file(&vfs, &cx);
664 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
665 populate_wal(&mut wal, &cx, 6);
666
667 let state = CheckpointState {
668 total_frames: 6,
669 backfilled_frames: 0,
670 oldest_reader_frame: Some(6),
671 };
672 let mut target = RecordingTarget::new();
673 let result =
674 execute_checkpoint(&cx, &mut wal, CheckpointMode::Truncate, state, &mut target)
675 .expect("checkpoint");
676
677 assert_eq!(result.frames_backfilled, 6);
678 assert!(!result.wal_was_reset);
679 }
680
681 #[test]
682 fn test_truncate_resets_wal_when_already_backfilled() {
683 let cx = test_cx();
684 let vfs = MemoryVfs::new();
685 let file = open_wal_file(&vfs, &cx);
686 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
687 populate_wal(&mut wal, &cx, 6);
688
689 let state = CheckpointState {
690 total_frames: 6,
691 backfilled_frames: 6,
692 oldest_reader_frame: None,
693 };
694 let mut target = RecordingTarget::new();
695 let result =
696 execute_checkpoint(&cx, &mut wal, CheckpointMode::Truncate, state, &mut target)
697 .expect("checkpoint");
698
699 assert_eq!(result.frames_backfilled, 0);
700 assert!(result.wal_was_reset);
701 assert_eq!(wal.frame_count(), 0);
702 assert_eq!(wal.header().checkpoint_seq, 1);
703 }
704
705 #[test]
708 fn test_checkpoint_empty_wal_is_noop() {
709 let cx = test_cx();
710 let vfs = MemoryVfs::new();
711 let file = open_wal_file(&vfs, &cx);
712 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
713
714 let state = CheckpointState {
715 total_frames: 0,
716 backfilled_frames: 0,
717 oldest_reader_frame: None,
718 };
719 let mut target = RecordingTarget::new();
720 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
721 .expect("checkpoint");
722
723 assert_eq!(result.frames_backfilled, 0);
724 assert!(target.pages.is_empty());
725 assert_eq!(target.sync_count, 0);
726 }
727
728 #[test]
729 fn test_checkpoint_already_fully_backfilled() {
730 let cx = test_cx();
731 let vfs = MemoryVfs::new();
732 let file = open_wal_file(&vfs, &cx);
733 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
734 populate_wal(&mut wal, &cx, 3);
735
736 let state = CheckpointState {
737 total_frames: 3,
738 backfilled_frames: 3,
739 oldest_reader_frame: None,
740 };
741 let mut target = RecordingTarget::new();
742 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
743 .expect("checkpoint");
744
745 assert_eq!(result.frames_backfilled, 0);
746 assert!(result.plan.completes_checkpoint());
747 }
748
749 #[test]
750 fn test_checkpoint_writes_correct_page_data() {
751 let cx = test_cx();
752 let vfs = MemoryVfs::new();
753 let file = open_wal_file(&vfs, &cx);
754 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
755
756 for i in 0..3u32 {
758 let page = sample_page(u8::try_from(i).expect("fits"));
759 let db_size = if i == 2 { 3 } else { 0 };
760 wal.append_frame(&cx, i + 1, &page, db_size)
761 .expect("append");
762 }
763
764 let state = CheckpointState {
765 total_frames: 3,
766 backfilled_frames: 0,
767 oldest_reader_frame: None,
768 };
769 let mut target = RecordingTarget::new();
770 execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
771 .expect("checkpoint");
772
773 for (i, (page_no, data)) in target.pages.iter().enumerate() {
775 let expected_page_number = u32::try_from(i + 1).expect("fits");
776 assert_eq!(page_no.get(), expected_page_number);
777 let expected_data = sample_page(u8::try_from(i).expect("fits"));
778 assert_eq!(*data, expected_data);
779 }
780 }
781
782 #[test]
783 fn test_checkpoint_db_truncation_on_complete() {
784 let cx = test_cx();
785 let vfs = MemoryVfs::new();
786 let file = open_wal_file(&vfs, &cx);
787 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
788
789 populate_wal(&mut wal, &cx, 3);
791
792 let state = CheckpointState {
793 total_frames: 3,
794 backfilled_frames: 0,
795 oldest_reader_frame: None,
796 };
797 let mut target = RecordingTarget::new();
798 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Full, state, &mut target)
799 .expect("checkpoint");
800
801 assert_eq!(result.db_size_pages, Some(3));
802 assert_eq!(target.truncate_to, Some(3));
803 }
804
805 #[test]
806 fn test_wal_can_accept_new_frames_after_restart() {
807 let cx = test_cx();
808 let vfs = MemoryVfs::new();
809 let file = open_wal_file(&vfs, &cx);
810 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
811 populate_wal(&mut wal, &cx, 4);
812
813 let state = CheckpointState {
814 total_frames: 4,
815 backfilled_frames: 0,
816 oldest_reader_frame: None,
817 };
818 let mut target = RecordingTarget::new();
819 execute_checkpoint(&cx, &mut wal, CheckpointMode::Restart, state, &mut target)
820 .expect("checkpoint");
821
822 assert_eq!(wal.frame_count(), 0);
823 assert_eq!(wal.header().checkpoint_seq, 1);
824
825 wal.append_frame(&cx, 1, &sample_page(0xAA), 0)
827 .expect("append after restart");
828 wal.append_frame(&cx, 2, &sample_page(0xBB), 2)
829 .expect("append commit after restart");
830 assert_eq!(wal.frame_count(), 2);
831 }
832
833 #[test]
834 fn test_checkpoint_deduplicates_same_page_uses_latest_frame() {
835 let cx = test_cx();
836 let vfs = MemoryVfs::new();
837 let file = open_wal_file(&vfs, &cx);
838 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
839
840 let page1_v1 = sample_page(0x01);
841 let page1_v2 = sample_page(0x02);
842 wal.append_frame(&cx, 1, &page1_v1, 0).expect("append v1");
843 wal.append_frame(&cx, 1, &page1_v2, 1).expect("append v2");
844
845 let state = CheckpointState {
846 total_frames: 2,
847 backfilled_frames: 0,
848 oldest_reader_frame: None,
849 };
850 let mut target = RecordingTarget::new();
851 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
852 .expect("checkpoint");
853
854 assert_eq!(result.frames_backfilled, 2);
855 assert_eq!(
856 target.pages.len(),
857 1,
858 "same page written twice → one deduped write"
859 );
860 assert_eq!(target.pages[0].0.get(), 1);
861 assert_eq!(target.pages[0].1, page1_v2, "must use latest frame's data");
862 }
863
864 #[test]
865 fn test_recording_target_read_page_default_returns_none() {
866 let cx = test_cx();
867 let target = RecordingTarget::new();
868 let mut buf = vec![0u8; 4096];
869 let page = PageNumber::new(1).expect("valid page");
870 let result = target
871 .read_page_if_supported(&cx, page, &mut buf)
872 .expect("no error");
873 assert!(result.is_none());
874 }
875
876 #[test]
877 fn test_consecutive_restarts_bump_salts_and_checkpoint_seq() {
878 let cx = test_cx();
879 let vfs = MemoryVfs::new();
880 let file = open_wal_file(&vfs, &cx);
881 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
882
883 for round in 0..3u32 {
884 populate_wal(&mut wal, &cx, 2);
885 let state = CheckpointState {
886 total_frames: 2,
887 backfilled_frames: 0,
888 oldest_reader_frame: None,
889 };
890 let mut target = RecordingTarget::new();
891 execute_checkpoint(&cx, &mut wal, CheckpointMode::Restart, state, &mut target)
892 .expect("checkpoint");
893 assert_eq!(wal.header().checkpoint_seq, round + 1);
894 }
895 assert_eq!(wal.header().checkpoint_seq, 3);
896 let salts = wal.header().salts;
897 assert_eq!(salts.salt1, test_salts().salt1.wrapping_add(3));
898 assert_eq!(salts.salt2, test_salts().salt2.wrapping_add(3));
899 }
900
901 #[test]
902 fn test_checkpoint_execution_result_fields() {
903 let cx = test_cx();
904 let vfs = MemoryVfs::new();
905 let file = open_wal_file(&vfs, &cx);
906 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
907 populate_wal(&mut wal, &cx, 3);
908
909 let state = CheckpointState {
910 total_frames: 3,
911 backfilled_frames: 0,
912 oldest_reader_frame: None,
913 };
914 let mut target = RecordingTarget::new();
915 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Full, state, &mut target)
916 .expect("checkpoint");
917
918 assert_eq!(result.frames_backfilled, 3);
919 assert_eq!(result.db_size_pages, Some(3));
920 assert!(!result.wal_was_reset);
921 assert_eq!(result.plan.mode, CheckpointMode::Full);
922 assert!(result.plan.completes_checkpoint());
923 }
924
925 #[test]
926 fn test_checkpoint_execution_result_clone_eq_debug() {
927 let cx = test_cx();
928 let vfs = MemoryVfs::new();
929 let file = open_wal_file(&vfs, &cx);
930 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
931 populate_wal(&mut wal, &cx, 2);
932
933 let state = CheckpointState {
934 total_frames: 2,
935 backfilled_frames: 0,
936 oldest_reader_frame: None,
937 };
938 let mut target = RecordingTarget::new();
939 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
940 .expect("checkpoint");
941
942 let cloned = result.clone();
943 assert_eq!(result, cloned);
944 let dbg = format!("{result:?}");
945 assert!(dbg.contains("CheckpointExecutionResult"));
946 assert!(dbg.contains("frames_backfilled"));
947 }
948
949 #[test]
950 fn test_passive_syncs_exactly_once() {
951 let cx = test_cx();
952 let vfs = MemoryVfs::new();
953 let file = open_wal_file(&vfs, &cx);
954 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
955 populate_wal(&mut wal, &cx, 3);
956
957 let state = CheckpointState {
958 total_frames: 3,
959 backfilled_frames: 0,
960 oldest_reader_frame: Some(2),
961 };
962 let mut target = RecordingTarget::new();
963 execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
964 .expect("checkpoint");
965
966 assert_eq!(
967 target.sync_count, 1,
968 "partial passive should sync exactly once"
969 );
970 }
971
972 #[test]
973 fn test_full_complete_syncs_twice_for_truncate() {
974 let cx = test_cx();
975 let vfs = MemoryVfs::new();
976 let file = open_wal_file(&vfs, &cx);
977 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
978 populate_wal(&mut wal, &cx, 3);
979
980 let state = CheckpointState {
981 total_frames: 3,
982 backfilled_frames: 0,
983 oldest_reader_frame: None,
984 };
985 let mut target = RecordingTarget::new();
986 execute_checkpoint(&cx, &mut wal, CheckpointMode::Full, state, &mut target)
987 .expect("checkpoint");
988
989 assert_eq!(
990 target.sync_count, 2,
991 "full complete with db_size truncate should sync twice"
992 );
993 }
994
995 #[test]
996 fn test_pages_written_in_ascending_order() {
997 let cx = test_cx();
998 let vfs = MemoryVfs::new();
999 let file = open_wal_file(&vfs, &cx);
1000 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
1001
1002 wal.append_frame(&cx, 5, &sample_page(5), 0)
1003 .expect("append");
1004 wal.append_frame(&cx, 2, &sample_page(2), 0)
1005 .expect("append");
1006 wal.append_frame(&cx, 8, &sample_page(8), 0)
1007 .expect("append");
1008 wal.append_frame(&cx, 1, &sample_page(1), 4)
1009 .expect("append commit");
1010
1011 let state = CheckpointState {
1012 total_frames: 4,
1013 backfilled_frames: 0,
1014 oldest_reader_frame: None,
1015 };
1016 let mut target = RecordingTarget::new();
1017 execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
1018 .expect("checkpoint");
1019
1020 let page_nums: Vec<u32> = target.pages.iter().map(|(pn, _)| pn.get()).collect();
1021 let mut sorted = page_nums.clone();
1022 sorted.sort_unstable();
1023 assert_eq!(
1024 page_nums, sorted,
1025 "pages must be written in ascending order"
1026 );
1027 }
1028
1029 #[test]
1030 fn checkpoint_execution_result_clone_eq_debug() {
1031 let plan = plan_checkpoint(
1032 CheckpointMode::Passive,
1033 CheckpointState {
1034 total_frames: 4,
1035 backfilled_frames: 0,
1036 oldest_reader_frame: None,
1037 },
1038 );
1039 let result = CheckpointExecutionResult {
1040 plan,
1041 frames_backfilled: 4,
1042 db_size_pages: Some(10),
1043 wal_was_reset: false,
1044 };
1045 let cloned = result.clone();
1046 assert_eq!(cloned, result);
1047 let dbg = format!("{result:?}");
1048 assert!(dbg.contains("CheckpointExecutionResult"));
1049 }
1050
1051 #[test]
1052 fn checkpoint_execution_result_ne_on_different_fields() {
1053 let plan = plan_checkpoint(
1054 CheckpointMode::Passive,
1055 CheckpointState {
1056 total_frames: 1,
1057 backfilled_frames: 0,
1058 oldest_reader_frame: None,
1059 },
1060 );
1061 let a = CheckpointExecutionResult {
1062 plan,
1063 frames_backfilled: 1,
1064 db_size_pages: Some(1),
1065 wal_was_reset: false,
1066 };
1067 let b = CheckpointExecutionResult {
1068 plan,
1069 frames_backfilled: 2,
1070 db_size_pages: Some(1),
1071 wal_was_reset: false,
1072 };
1073 assert_ne!(a, b);
1074 }
1075
1076 #[test]
1077 fn checkpoint_target_default_read_page_returns_none() {
1078 let target = RecordingTarget::new();
1079 let cx = test_cx();
1080 let page = PageNumber::new(1).expect("valid");
1081 let mut buf = [0u8; 4096];
1082 let result = target
1083 .read_page_if_supported(&cx, page, &mut buf)
1084 .expect("ok");
1085 assert!(result.is_none());
1086 }
1087
1088 #[test]
1089 fn empty_wal_passive_yields_zero_backfilled() {
1090 let cx = test_cx();
1091 let vfs = MemoryVfs::new();
1092 let file = open_wal_file(&vfs, &cx);
1093 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
1094 let state = CheckpointState {
1095 total_frames: 0,
1096 backfilled_frames: 0,
1097 oldest_reader_frame: None,
1098 };
1099 let mut target = RecordingTarget::new();
1100 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
1101 .expect("checkpoint");
1102 assert_eq!(result.frames_backfilled, 0);
1103 assert!(!result.wal_was_reset);
1104 }
1105
1106 #[test]
1107 fn mid_checkpoint_crash_produces_partial_backfill() {
1108 use std::sync::Mutex;
1109 static LOCK: Mutex<()> = Mutex::new(());
1110 let _guard = LOCK.lock().unwrap();
1111
1112 let cx = test_cx();
1113 let vfs = MemoryVfs::new();
1114 let file = open_wal_file(&vfs, &cx);
1115 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
1116 populate_wal(&mut wal, &cx, 5);
1117
1118 let state = CheckpointState {
1119 total_frames: 5,
1120 backfilled_frames: 0,
1121 oldest_reader_frame: None,
1122 };
1123
1124 crate::fault_hooks::arm_crash_boundary(
1125 crate::fault_hooks::CrashBoundary::MidCheckpoint,
1126 crate::fault_hooks::FaultHookArm::new(
1127 "mid-ckpt-crash",
1128 "CHECKPOINT-MID-CRASH",
1129 "test_mid_checkpoint_crash",
1130 ),
1131 );
1132
1133 let mut target = RecordingTarget::new();
1134 let err = execute_checkpoint(&cx, &mut wal, CheckpointMode::Passive, state, &mut target)
1135 .expect_err("should fail at MidCheckpoint boundary after first page");
1136
1137 crate::fault_hooks::clear_crash_boundary();
1138
1139 assert!(
1140 err.to_string().contains("fault_inject"),
1141 "error identifies the fault hook: {err}"
1142 );
1143 assert_eq!(
1144 target.pages.len(),
1145 1,
1146 "only the first page was written before the crash fired on page_idx=1"
1147 );
1148 }
1149}