1use std::collections::HashMap;
12use std::sync::Arc;
13
14use fsqlite_error::{FrankenError, Result};
15use fsqlite_pager::traits::{
16 PreparedWalChecksumSeed, PreparedWalChecksumTransform, PreparedWalFinalizationState,
17 PreparedWalFrameBatch, PreparedWalFrameMeta, WalFrameRef,
18};
19use fsqlite_pager::{CheckpointMode, CheckpointPageWriter, CheckpointResult, WalBackend};
20use fsqlite_types::PageNumber;
21use fsqlite_types::cx::Cx;
22use fsqlite_types::flags::SyncFlags;
23use fsqlite_vfs::VfsFile;
24use fsqlite_wal::checksum::{SqliteWalChecksum, WAL_FRAME_HEADER_SIZE, WalChecksumTransform};
25use fsqlite_wal::wal::WalAppendFrameRef;
26use fsqlite_wal::{
27 CheckpointMode as WalCheckpointMode, CheckpointState, CheckpointTarget, WalFile,
28 WalGenerationIdentity, execute_checkpoint,
29};
30use tracing::debug;
31#[cfg(not(target_arch = "wasm32"))]
32use tracing::warn;
33
34#[cfg(not(target_arch = "wasm32"))]
35use crate::wal_fec_adapter::{FecCommitHook, FecCommitResult};
36
37const PAGE_INDEX_MAX_ENTRIES: usize = usize::MAX;
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60enum WalPageLookupResolution {
61 AuthoritativeHit { frame_index: usize },
62 AuthoritativeMiss,
63 PartialIndexFallbackHit { frame_index: usize },
64 PartialIndexFallbackMiss,
65}
66
67impl WalPageLookupResolution {
68 #[must_use]
69 const fn frame_index(self) -> Option<usize> {
70 match self {
71 Self::AuthoritativeHit { frame_index }
72 | Self::PartialIndexFallbackHit { frame_index } => Some(frame_index),
73 Self::AuthoritativeMiss | Self::PartialIndexFallbackMiss => None,
74 }
75 }
76
77 #[must_use]
78 const fn lookup_mode(self) -> &'static str {
79 match self {
80 Self::AuthoritativeHit { .. } | Self::AuthoritativeMiss => "authoritative_index",
81 Self::PartialIndexFallbackHit { .. } | Self::PartialIndexFallbackMiss => {
82 "partial_index_fallback"
83 }
84 }
85 }
86
87 #[must_use]
88 const fn fallback_reason(self) -> &'static str {
89 match self {
90 Self::AuthoritativeHit { .. } | Self::AuthoritativeMiss => "none",
91 Self::PartialIndexFallbackHit { .. } | Self::PartialIndexFallbackMiss => {
92 "partial_index_cap"
93 }
94 }
95 }
96}
97
98#[derive(Debug, Clone)]
104struct WalPublishedSnapshot {
105 publication_seq: u64,
106 generation: WalGenerationIdentity,
107 last_commit_frame: Option<usize>,
108 commit_count: u64,
109 page_index: Arc<HashMap<u32, usize>>,
110 index_is_partial: bool,
111}
112
113impl WalPublishedSnapshot {
114 #[must_use]
115 fn empty(publication_seq: u64, generation: WalGenerationIdentity) -> Self {
116 Self {
117 publication_seq,
118 generation,
119 last_commit_frame: None,
120 commit_count: 0,
121 page_index: Arc::new(HashMap::new()),
122 index_is_partial: false,
123 }
124 }
125}
126
127#[derive(Debug, Clone, Copy)]
128struct PendingPublicationFrame {
129 page_number: u32,
130 frame_index: usize,
131 is_commit: bool,
132}
133
134pub struct WalBackendAdapter<F: VfsFile> {
135 wal: WalFile<F>,
136 refresh_before_append: bool,
138 published_snapshot: WalPublishedSnapshot,
140 next_publication_seq: u64,
142 read_snapshot: Option<WalPublishedSnapshot>,
144 pending_publication_frames: Vec<PendingPublicationFrame>,
146 #[cfg(not(target_arch = "wasm32"))]
148 fec_hook: Option<FecCommitHook>,
149 #[cfg(not(target_arch = "wasm32"))]
151 fec_pending: Vec<FecCommitResult>,
152 page_index_cap: usize,
156}
157
158impl<F: VfsFile> WalBackendAdapter<F> {
159 #[must_use]
161 pub fn new(wal: WalFile<F>) -> Self {
162 let generation = wal.generation_identity();
163 Self {
164 wal,
165 refresh_before_append: true,
166 published_snapshot: WalPublishedSnapshot::empty(0, generation),
167 next_publication_seq: 1,
168 read_snapshot: None,
169 pending_publication_frames: Vec::new(),
170 #[cfg(not(target_arch = "wasm32"))]
171 fec_hook: None,
172 #[cfg(not(target_arch = "wasm32"))]
173 fec_pending: Vec::new(),
174 page_index_cap: PAGE_INDEX_MAX_ENTRIES,
175 }
176 }
177
178 #[must_use]
180 #[cfg(not(target_arch = "wasm32"))]
181 pub fn with_fec_hook(wal: WalFile<F>, hook: FecCommitHook) -> Self {
182 let generation = wal.generation_identity();
183 Self {
184 wal,
185 refresh_before_append: true,
186 published_snapshot: WalPublishedSnapshot::empty(0, generation),
187 next_publication_seq: 1,
188 read_snapshot: None,
189 pending_publication_frames: Vec::new(),
190 fec_hook: Some(hook),
191 fec_pending: Vec::new(),
192 page_index_cap: PAGE_INDEX_MAX_ENTRIES,
193 }
194 }
195
196 #[must_use]
198 pub fn into_inner(self) -> WalFile<F> {
199 self.wal
200 }
201
202 #[must_use]
204 pub fn inner(&self) -> &WalFile<F> {
205 &self.wal
206 }
207
208 pub fn inner_mut(&mut self) -> &mut WalFile<F> {
212 self.invalidate_publication();
213 &mut self.wal
214 }
215
216 fn invalidate_publication(&mut self) {
218 self.read_snapshot = None;
219 self.pending_publication_frames.clear();
220 self.published_snapshot = WalPublishedSnapshot::empty(
221 self.published_snapshot.publication_seq,
222 self.published_snapshot.generation,
223 );
224 }
225
226 fn publish_visible_snapshot(
232 &mut self,
233 cx: &Cx,
234 last_commit_frame: Option<usize>,
235 scenario_id: &'static str,
236 ) -> Result<()> {
237 let generation = self.wal.generation_identity();
238 if self.published_snapshot.generation == generation
239 && self.published_snapshot.last_commit_frame == last_commit_frame
240 {
241 return Ok(());
242 }
243
244 let previous_generation = self.published_snapshot.generation;
245 let previous_last_commit = self.published_snapshot.last_commit_frame;
246 let previous_commit_count = if previous_generation == generation {
247 self.published_snapshot.commit_count
248 } else {
249 0
250 };
251 let mut page_index = if previous_generation == generation {
252 std::mem::replace(
253 &mut self.published_snapshot.page_index,
254 Arc::new(HashMap::new()),
255 )
256 } else {
257 Arc::new(HashMap::new())
258 };
259 let mut index_is_partial = if previous_generation == generation {
260 self.published_snapshot.index_is_partial
261 } else {
262 false
263 };
264
265 let frame_delta_count = match (previous_last_commit, last_commit_frame) {
266 (Some(prev), Some(curr)) if curr >= prev => curr.saturating_sub(prev),
267 (Some(_) | None, Some(curr)) => curr.saturating_add(1),
268 (Some(prev), None) => prev.saturating_add(1),
269 (None, None) => 0,
270 };
271
272 let update_result = match last_commit_frame {
273 None => {
274 Arc::make_mut(&mut page_index).clear();
275 index_is_partial = false;
276 Ok(())
277 }
278 Some(current_last_commit) => {
279 let start = match (previous_generation == generation, previous_last_commit) {
280 (true, Some(previous_last_commit))
281 if previous_last_commit < current_last_commit =>
282 {
283 previous_last_commit.saturating_add(1)
284 }
285 (true, Some(previous_last_commit))
286 if previous_last_commit == current_last_commit =>
287 {
288 current_last_commit.saturating_add(1)
289 }
290 _ => {
291 Arc::make_mut(&mut page_index).clear();
292 index_is_partial = false;
293 0
294 }
295 };
296 if start <= current_last_commit {
297 self.build_index_range(
298 cx,
299 Arc::make_mut(&mut page_index),
300 &mut index_is_partial,
301 start,
302 current_last_commit,
303 )
304 } else {
305 Ok(())
306 }
307 }
308 };
309 let commit_count_result = match last_commit_frame {
310 None => Ok(0),
311 Some(current_last_commit) => {
312 match (previous_generation == generation, previous_last_commit) {
313 (true, Some(previous_last_commit))
314 if previous_last_commit < current_last_commit =>
315 {
316 self.count_commit_frames_in_range(
317 cx,
318 previous_last_commit.saturating_add(1),
319 current_last_commit,
320 )
321 .map(|delta| previous_commit_count.saturating_add(delta))
322 }
323 (true, Some(previous_last_commit))
324 if previous_last_commit == current_last_commit =>
325 {
326 Ok(previous_commit_count)
327 }
328 _ => self.count_commit_frames_in_range(cx, 0, current_last_commit),
329 }
330 }
331 };
332 if let Err(error) = update_result {
333 if previous_generation == generation {
334 self.published_snapshot.page_index = page_index;
335 }
336 return Err(error);
337 }
338 let commit_count = match commit_count_result {
339 Ok(commit_count) => commit_count,
340 Err(error) => {
341 if previous_generation == generation {
342 self.published_snapshot.page_index = page_index;
343 }
344 return Err(error);
345 }
346 };
347
348 let publication_seq = self.next_publication_seq;
349 self.next_publication_seq = self.next_publication_seq.saturating_add(1);
350 let latest_frame_entries = page_index.len();
351 self.published_snapshot = WalPublishedSnapshot {
352 publication_seq,
353 generation,
354 last_commit_frame,
355 commit_count,
356 page_index,
357 index_is_partial,
358 };
359
360 tracing::trace!(
361 target: "fsqlite.wal_publication",
362 trace_id = cx.trace_id(),
363 run_id = "wal-publication",
364 scenario_id,
365 wal_generation = generation.checkpoint_seq,
366 wal_salt1 = generation.salts.salt1,
367 wal_salt2 = generation.salts.salt2,
368 publication_seq,
369 frame_delta_count,
370 latest_frame_entries,
371 snapshot_age = 0_u64,
372 lookup_mode = "published_visibility_map",
373 fallback_reason = if index_is_partial {
374 "partial_index_cap"
375 } else {
376 "none"
377 },
378 "published WAL visibility snapshot"
379 );
380
381 Ok(())
382 }
383
384 fn resolve_visible_frame(
391 &self,
392 cx: &Cx,
393 snapshot: &WalPublishedSnapshot,
394 page_number: u32,
395 ) -> Result<WalPageLookupResolution> {
396 match snapshot.page_index.get(&page_number) {
397 Some(&frame_index) => Ok(WalPageLookupResolution::AuthoritativeHit { frame_index }),
398 None if !snapshot.index_is_partial => Ok(WalPageLookupResolution::AuthoritativeMiss),
399 None => match snapshot.last_commit_frame {
400 Some(last_commit_frame) => {
401 match self.scan_backwards_for_page(cx, page_number, last_commit_frame)? {
402 Some(frame_index) => {
403 Ok(WalPageLookupResolution::PartialIndexFallbackHit { frame_index })
404 }
405 None => Ok(WalPageLookupResolution::PartialIndexFallbackMiss),
406 }
407 }
408 None => Ok(WalPageLookupResolution::AuthoritativeMiss),
409 },
410 }
411 }
412
413 fn build_index_range(
418 &self,
419 cx: &Cx,
420 page_index: &mut HashMap<u32, usize>,
421 index_is_partial: &mut bool,
422 start: usize,
423 end: usize,
424 ) -> Result<()> {
425 for frame_index in start..=end {
426 let header = self.wal.read_frame_header(cx, frame_index)?;
427 if page_index.len() < self.page_index_cap
430 || page_index.contains_key(&header.page_number)
431 {
432 page_index.insert(header.page_number, frame_index);
433 } else {
434 *index_is_partial = true;
438 }
439 }
440 Ok(())
441 }
442
443 fn count_commit_frames_in_range(&self, cx: &Cx, start: usize, end: usize) -> Result<u64> {
445 if start > end {
446 return Ok(0);
447 }
448
449 let mut commit_count = 0_u64;
450 for frame_index in start..=end {
451 if self.wal.read_frame_header(cx, frame_index)?.is_commit() {
452 commit_count = commit_count.saturating_add(1);
453 }
454 }
455 Ok(commit_count)
456 }
457
458 fn scan_backwards_for_page(
465 &self,
466 cx: &Cx,
467 page_number: u32,
468 last_commit_frame: usize,
469 ) -> Result<Option<usize>> {
470 for frame_index in (0..=last_commit_frame).rev() {
471 let header = self.wal.read_frame_header(cx, frame_index)?;
472 if header.page_number == page_number {
473 return Ok(Some(frame_index));
474 }
475 }
476 Ok(None)
477 }
478
479 #[cfg(not(target_arch = "wasm32"))]
481 pub fn take_fec_pending(&mut self) -> Vec<FecCommitResult> {
482 std::mem::take(&mut self.fec_pending)
483 }
484
485 #[must_use]
487 #[cfg(not(target_arch = "wasm32"))]
488 pub fn fec_enabled(&self) -> bool {
489 self.fec_hook
490 .as_ref()
491 .is_some_and(FecCommitHook::is_enabled)
492 }
493
494 #[cfg(not(target_arch = "wasm32"))]
496 pub fn fec_discard(&mut self) {
497 if let Some(hook) = &mut self.fec_hook {
498 hook.discard_buffered();
499 }
500 }
501
502 #[cfg(test)]
504 fn set_page_index_cap(&mut self, cap: usize) {
505 self.page_index_cap = cap;
506 self.invalidate_publication();
508 }
509
510 #[must_use]
511 fn current_prepared_finalization_state(&self) -> PreparedWalFinalizationState {
512 let generation = self.wal.generation_identity();
513 let seed = self.wal.running_checksum();
514 PreparedWalFinalizationState {
515 checkpoint_seq: generation.checkpoint_seq,
516 salt1: generation.salts.salt1,
517 salt2: generation.salts.salt2,
518 start_frame_index: self.wal.frame_count(),
519 seed: PreparedWalChecksumSeed {
520 s1: seed.s1,
521 s2: seed.s2,
522 },
523 }
524 }
525
526 #[must_use]
527 fn prepared_batch_matches_current_state(&self, prepared: &PreparedWalFrameBatch) -> bool {
528 prepared
529 .finalized_for
530 .is_some_and(|state| state == self.current_prepared_finalization_state())
531 }
532
533 fn prepared_batch_matches_disk_state(
534 &self,
535 cx: &Cx,
536 prepared: &PreparedWalFrameBatch,
537 ) -> Result<bool> {
538 let Some(state) = prepared.finalized_for else {
539 return Ok(false);
540 };
541 let generation = WalGenerationIdentity {
542 checkpoint_seq: state.checkpoint_seq,
543 salts: fsqlite_wal::checksum::WalSalts {
544 salt1: state.salt1,
545 salt2: state.salt2,
546 },
547 };
548 self.wal
549 .prepared_append_window_still_current(cx, generation, state.start_frame_index)
550 }
551
552 fn checksum_transforms_for_prepared(
553 prepared: &PreparedWalFrameBatch,
554 ) -> Vec<WalChecksumTransform> {
555 prepared
556 .checksum_transforms
557 .iter()
558 .map(|transform| WalChecksumTransform {
559 a11: transform.a11,
560 a12: transform.a12,
561 a21: transform.a21,
562 a22: transform.a22,
563 c1: transform.c1,
564 c2: transform.c2,
565 })
566 .collect()
567 }
568
569 fn finalize_prepared_batch_against_current_state(
570 &self,
571 prepared: &mut PreparedWalFrameBatch,
572 ) -> Result<()> {
573 let checksum_transforms = Self::checksum_transforms_for_prepared(prepared);
574 let final_running_checksum = self
575 .wal
576 .finalize_prepared_frame_bytes(&mut prepared.frame_bytes, &checksum_transforms)?;
577 prepared.finalized_for = Some(self.current_prepared_finalization_state());
578 prepared.finalized_running_checksum = Some(PreparedWalChecksumSeed {
579 s1: final_running_checksum.s1,
580 s2: final_running_checksum.s2,
581 });
582 Ok(())
583 }
584
585 fn finalized_running_checksum(prepared: &PreparedWalFrameBatch) -> Result<SqliteWalChecksum> {
586 let Some(checksum) = prepared.finalized_running_checksum else {
587 return Err(FrankenError::internal(
588 "prepared WAL batch missing finalized running checksum",
589 ));
590 };
591 Ok(SqliteWalChecksum {
592 s1: checksum.s1,
593 s2: checksum.s2,
594 })
595 }
596
597 fn publish_latest_committed_snapshot(
598 &mut self,
599 cx: &Cx,
600 scenario_id: &'static str,
601 ) -> Result<()> {
602 let last_commit_frame = self.wal.last_commit_frame(cx)?;
603 self.publish_visible_snapshot(cx, last_commit_frame, scenario_id)
604 }
605
606 fn synchronize_publication_before_append(
607 &mut self,
608 cx: &Cx,
609 scenario_id: &'static str,
610 ) -> Result<()> {
611 self.wal.refresh(cx)?;
612 self.pending_publication_frames.clear();
613 self.publish_latest_committed_snapshot(cx, scenario_id)
614 }
615
616 fn record_appended_frames<I>(&mut self, start_frame_index: usize, frames: I) -> Option<usize>
617 where
618 I: IntoIterator<Item = (u32, u32)>,
619 {
620 let mut last_commit_frame = None;
621 for (offset, (page_number, db_size_if_commit)) in frames.into_iter().enumerate() {
622 let frame_index = start_frame_index.saturating_add(offset);
623 self.pending_publication_frames
624 .push(PendingPublicationFrame {
625 page_number,
626 frame_index,
627 is_commit: db_size_if_commit != 0,
628 });
629 if db_size_if_commit != 0 {
630 last_commit_frame = Some(frame_index);
631 }
632 }
633 last_commit_frame
634 }
635
636 fn publish_pending_commit_snapshot(
637 &mut self,
638 cx: &Cx,
639 last_commit_frame: usize,
640 scenario_id: &'static str,
641 ) -> Result<()> {
642 let generation = self.wal.generation_identity();
643 let previous_last_commit = self.published_snapshot.last_commit_frame;
644 let can_extend_previous = self.published_snapshot.generation == generation
645 && self
646 .published_snapshot
647 .last_commit_frame
648 .is_none_or(|previous_last_commit| previous_last_commit < last_commit_frame);
649 let mut page_index = if can_extend_previous {
650 std::mem::replace(
651 &mut self.published_snapshot.page_index,
652 Arc::new(HashMap::new()),
653 )
654 } else {
655 Arc::new(HashMap::new())
656 };
657 let mut index_is_partial = if can_extend_previous {
658 self.published_snapshot.index_is_partial
659 } else {
660 false
661 };
662 let previous_last_commit = if can_extend_previous {
663 previous_last_commit
664 } else {
665 None
666 };
667 let previous_commit_count = if can_extend_previous {
668 self.published_snapshot.commit_count
669 } else {
670 0
671 };
672
673 let mut frame_delta_count = 0_usize;
674 let mut commit_delta_count = 0_u64;
675 for frame in &self.pending_publication_frames {
676 if previous_last_commit
677 .is_some_and(|previous_last_commit| frame.frame_index <= previous_last_commit)
678 || frame.frame_index > last_commit_frame
679 {
680 continue;
681 }
682
683 frame_delta_count = frame_delta_count.saturating_add(1);
684 let page_index_map = Arc::make_mut(&mut page_index);
685 if page_index_map.len() < self.page_index_cap
686 || page_index_map.contains_key(&frame.page_number)
687 {
688 page_index_map.insert(frame.page_number, frame.frame_index);
689 } else {
690 index_is_partial = true;
691 }
692 if frame.is_commit {
693 commit_delta_count = commit_delta_count.saturating_add(1);
694 }
695 }
696
697 if frame_delta_count == 0 {
698 self.pending_publication_frames.clear();
699 return self.publish_visible_snapshot(cx, Some(last_commit_frame), scenario_id);
700 }
701
702 let publication_seq = self.next_publication_seq;
703 self.next_publication_seq = self.next_publication_seq.saturating_add(1);
704 let latest_frame_entries = page_index.len();
705 self.published_snapshot = WalPublishedSnapshot {
706 publication_seq,
707 generation,
708 last_commit_frame: Some(last_commit_frame),
709 commit_count: previous_commit_count.saturating_add(commit_delta_count),
710 page_index,
711 index_is_partial,
712 };
713 self.pending_publication_frames.clear();
714
715 tracing::trace!(
716 target: "fsqlite.wal_publication",
717 trace_id = cx.trace_id(),
718 run_id = "wal-publication",
719 scenario_id,
720 wal_generation = generation.checkpoint_seq,
721 wal_salt1 = generation.salts.salt1,
722 wal_salt2 = generation.salts.salt2,
723 publication_seq,
724 frame_delta_count,
725 latest_frame_entries,
726 snapshot_age = 0_u64,
727 lookup_mode = "published_visibility_map",
728 fallback_reason = if index_is_partial {
729 "partial_index_cap"
730 } else {
731 "none"
732 },
733 "published WAL visibility snapshot from commit path"
734 );
735
736 Ok(())
737 }
738}
739
740fn to_wal_mode(mode: CheckpointMode) -> WalCheckpointMode {
742 match mode {
743 CheckpointMode::Passive => WalCheckpointMode::Passive,
744 CheckpointMode::Full => WalCheckpointMode::Full,
745 CheckpointMode::Restart => WalCheckpointMode::Restart,
746 CheckpointMode::Truncate => WalCheckpointMode::Truncate,
747 }
748}
749
750impl<F: VfsFile> WalBackend for WalBackendAdapter<F> {
751 fn begin_transaction(&mut self, cx: &Cx) -> Result<()> {
752 self.wal.refresh(cx)?;
755 self.publish_latest_committed_snapshot(cx, "begin_transaction")?;
756 self.read_snapshot = Some(self.published_snapshot.clone());
757 self.refresh_before_append = true;
758 Ok(())
759 }
760
761 fn append_frame(
762 &mut self,
763 cx: &Cx,
764 page_number: u32,
765 page_data: &[u8],
766 db_size_if_commit: u32,
767 ) -> Result<()> {
768 if self.refresh_before_append {
769 self.synchronize_publication_before_append(cx, "append_frame_pre_refresh")?;
773 }
774 let start_frame_index = self.wal.frame_count();
775 self.wal
776 .append_frame(cx, page_number, page_data, db_size_if_commit)?;
777 self.refresh_before_append = false;
778 let last_commit_frame =
779 self.record_appended_frames(start_frame_index, [(page_number, db_size_if_commit)]);
780
781 #[cfg(not(target_arch = "wasm32"))]
784 if let Some(hook) = &mut self.fec_hook {
785 match hook.on_frame(cx, page_number, page_data, db_size_if_commit) {
786 Ok(Some(result)) => {
787 debug!(
788 pages = result.page_numbers.len(),
789 k_source = result.k_source,
790 symbols = result.symbols.len(),
791 "FEC commit group encoded"
792 );
793 self.fec_pending.push(result);
794 }
795 Ok(None) => {}
796 Err(e) => {
797 warn!(error = %e, "FEC encoding failed; commit proceeds without repair symbols");
799 }
800 }
801 }
802
803 if let Some(last_commit_frame) = last_commit_frame {
804 self.publish_pending_commit_snapshot(cx, last_commit_frame, "append_frame_commit")?;
805 }
806
807 Ok(())
808 }
809
810 fn append_frames(&mut self, cx: &Cx, frames: &[WalFrameRef<'_>]) -> Result<()> {
811 if frames.is_empty() {
812 return Ok(());
813 }
814
815 if self.refresh_before_append {
816 self.synchronize_publication_before_append(cx, "append_frames_pre_refresh")?;
817 }
818
819 let start_frame_index = self.wal.frame_count();
820 let mut wal_frames = Vec::with_capacity(frames.len());
821 for frame in frames {
822 wal_frames.push(WalAppendFrameRef {
823 page_number: frame.page_number,
824 page_data: frame.page_data,
825 db_size_if_commit: frame.db_size_if_commit,
826 });
827 }
828 self.wal.append_frames(cx, &wal_frames)?;
829 self.refresh_before_append = false;
830 let last_commit_frame = self.record_appended_frames(
831 start_frame_index,
832 frames
833 .iter()
834 .map(|frame| (frame.page_number, frame.db_size_if_commit)),
835 );
836
837 #[cfg(not(target_arch = "wasm32"))]
838 if let Some(hook) = &mut self.fec_hook {
839 for frame in frames {
840 match hook.on_frame(
841 cx,
842 frame.page_number,
843 frame.page_data,
844 frame.db_size_if_commit,
845 ) {
846 Ok(Some(result)) => {
847 debug!(
848 pages = result.page_numbers.len(),
849 k_source = result.k_source,
850 symbols = result.symbols.len(),
851 "FEC commit group encoded"
852 );
853 self.fec_pending.push(result);
854 }
855 Ok(None) => {}
856 Err(e) => {
857 warn!(
858 error = %e,
859 "FEC encoding failed; commit proceeds without repair symbols"
860 );
861 }
862 }
863 }
864 }
865
866 if let Some(last_commit_frame) = last_commit_frame {
867 self.publish_pending_commit_snapshot(cx, last_commit_frame, "append_frames_commit")?;
868 }
869
870 Ok(())
871 }
872
873 fn prepare_append_frames(
874 &mut self,
875 frames: &[WalFrameRef<'_>],
876 ) -> Result<Option<PreparedWalFrameBatch>> {
877 if frames.is_empty() {
878 return Ok(None);
879 }
880
881 let wal_frames: Vec<_> = frames
882 .iter()
883 .map(|frame| WalAppendFrameRef {
884 page_number: frame.page_number,
885 page_data: frame.page_data,
886 db_size_if_commit: frame.db_size_if_commit,
887 })
888 .collect();
889 let frame_bytes = self.wal.prepare_frame_bytes(&wal_frames)?;
890 let checksum_transforms = frame_bytes
891 .chunks_exact(self.wal.frame_size())
892 .map(|frame| {
893 WalChecksumTransform::for_wal_frame(
894 frame,
895 self.wal.page_size(),
896 self.wal.big_endian_checksum(),
897 )
898 })
899 .map(|result| {
900 result.map(|transform| PreparedWalChecksumTransform {
901 a11: transform.a11,
902 a12: transform.a12,
903 a21: transform.a21,
904 a22: transform.a22,
905 c1: transform.c1,
906 c2: transform.c2,
907 })
908 })
909 .collect::<Result<Vec<_>>>()?;
910 let frame_metas = frames
911 .iter()
912 .map(|frame| PreparedWalFrameMeta {
913 page_number: frame.page_number,
914 db_size_if_commit: frame.db_size_if_commit,
915 })
916 .collect();
917 let last_commit_frame_offset = frames
918 .iter()
919 .enumerate()
920 .rev()
921 .find_map(|(offset, frame)| (frame.db_size_if_commit != 0).then_some(offset));
922
923 Ok(Some(PreparedWalFrameBatch {
924 frame_size: self.wal.frame_size(),
925 page_data_offset: WAL_FRAME_HEADER_SIZE,
926 frame_metas,
927 checksum_transforms,
928 frame_bytes,
929 last_commit_frame_offset,
930 finalized_for: None,
931 finalized_running_checksum: None,
932 }))
933 }
934
935 fn finalize_prepared_frames(
936 &mut self,
937 _cx: &Cx,
938 prepared: &mut PreparedWalFrameBatch,
939 ) -> Result<()> {
940 if prepared.frame_count() == 0 {
941 return Ok(());
942 }
943 self.finalize_prepared_batch_against_current_state(prepared)
947 }
948
949 fn append_prepared_frames(
950 &mut self,
951 cx: &Cx,
952 prepared: &mut PreparedWalFrameBatch,
953 ) -> Result<()> {
954 if prepared.frame_count() == 0 {
955 return Ok(());
956 }
957
958 let can_reuse_prelock_finalize = self.refresh_before_append
959 && self.prepared_batch_matches_current_state(prepared)
960 && self.prepared_batch_matches_disk_state(cx, prepared)?;
961 if self.refresh_before_append && !can_reuse_prelock_finalize {
962 self.synchronize_publication_before_append(cx, "append_prepared_pre_refresh")?;
963 }
964
965 if !self.prepared_batch_matches_current_state(prepared) {
966 self.finalize_prepared_batch_against_current_state(prepared)?;
967 }
968
969 let start_frame_index = self.wal.frame_count();
970 self.wal.append_finalized_prepared_frame_bytes(
971 cx,
972 &prepared.frame_bytes,
973 prepared.frame_count(),
974 Self::finalized_running_checksum(prepared)?,
975 prepared.last_commit_frame_offset,
976 )?;
977 self.refresh_before_append = false;
978 let last_commit_frame = self.record_appended_frames(
979 start_frame_index,
980 prepared
981 .frame_metas
982 .iter()
983 .map(|frame| (frame.page_number, frame.db_size_if_commit)),
984 );
985
986 #[cfg(not(target_arch = "wasm32"))]
987 if let Some(hook) = &mut self.fec_hook {
988 for (index, frame) in prepared.frame_metas.iter().enumerate() {
989 match hook.on_frame(
990 cx,
991 frame.page_number,
992 prepared.page_data(index),
993 frame.db_size_if_commit,
994 ) {
995 Ok(Some(result)) => {
996 debug!(
997 pages = result.page_numbers.len(),
998 k_source = result.k_source,
999 symbols = result.symbols.len(),
1000 "FEC commit group encoded"
1001 );
1002 self.fec_pending.push(result);
1003 }
1004 Ok(None) => {}
1005 Err(e) => {
1006 warn!(
1007 error = %e,
1008 "FEC encoding failed; commit proceeds without repair symbols"
1009 );
1010 }
1011 }
1012 }
1013 }
1014
1015 if let Some(last_commit_frame) = last_commit_frame {
1016 self.publish_pending_commit_snapshot(
1017 cx,
1018 last_commit_frame,
1019 "append_prepared_frames_commit",
1020 )?;
1021 }
1022
1023 Ok(())
1024 }
1025
1026 fn read_page(&mut self, cx: &Cx, page_number: u32) -> Result<Option<Vec<u8>>> {
1027 let snapshot = if let Some(snapshot) = self.read_snapshot.clone() {
1028 snapshot
1029 } else {
1030 self.publish_latest_committed_snapshot(cx, "read_page_unpinned")?;
1031 self.published_snapshot.clone()
1032 };
1033 if snapshot.last_commit_frame.is_none() {
1034 return Ok(None);
1035 }
1036 let snapshot_age = self
1037 .published_snapshot
1038 .publication_seq
1039 .saturating_sub(snapshot.publication_seq);
1040
1041 let resolution = self.resolve_visible_frame(cx, &snapshot, page_number)?;
1042 let Some(frame_index) = resolution.frame_index() else {
1043 debug!(
1044 page_number,
1045 wal_checkpoint_seq = snapshot.generation.checkpoint_seq,
1046 wal_salt1 = snapshot.generation.salts.salt1,
1047 wal_salt2 = snapshot.generation.salts.salt2,
1048 publication_seq = snapshot.publication_seq,
1049 snapshot_age,
1050 lookup_mode = resolution.lookup_mode(),
1051 fallback_reason = resolution.fallback_reason(),
1052 "WAL adapter: page absent from current generation"
1053 );
1054 return Ok(None);
1055 };
1056
1057 let mut frame_buf = vec![0u8; self.wal.frame_size()];
1059 let header = self.wal.read_frame_into(cx, frame_index, &mut frame_buf)?;
1060
1061 if header.page_number != page_number {
1064 return Err(FrankenError::WalCorrupt {
1065 detail: format!(
1066 "WAL page index integrity failure: expected page {page_number} \
1067 at frame {frame_index}, found page {}",
1068 header.page_number
1069 ),
1070 });
1071 }
1072
1073 let data = frame_buf[fsqlite_wal::checksum::WAL_FRAME_HEADER_SIZE..].to_vec();
1074 debug!(
1075 page_number,
1076 frame_index,
1077 wal_checkpoint_seq = snapshot.generation.checkpoint_seq,
1078 wal_salt1 = snapshot.generation.salts.salt1,
1079 wal_salt2 = snapshot.generation.salts.salt2,
1080 publication_seq = snapshot.publication_seq,
1081 snapshot_age,
1082 lookup_mode = resolution.lookup_mode(),
1083 fallback_reason = resolution.fallback_reason(),
1084 "WAL adapter: resolved page from current WAL generation"
1085 );
1086 Ok(Some(data))
1087 }
1088
1089 fn committed_txns_since_page(&mut self, cx: &Cx, page_number: u32) -> Result<u64> {
1090 let snapshot = if let Some(snapshot) = self.read_snapshot.clone() {
1091 snapshot
1092 } else {
1093 self.publish_latest_committed_snapshot(cx, "committed_txns_since_page")?;
1094 self.published_snapshot.clone()
1095 };
1096 let Some(last_commit_frame) = snapshot.last_commit_frame else {
1097 return Ok(0);
1098 };
1099
1100 let resolution = self.resolve_visible_frame(cx, &snapshot, page_number)?;
1101 let Some(last_page_frame) = resolution.frame_index() else {
1102 let mut total_commits = 0_u64;
1103 for frame_index in 0..=last_commit_frame {
1104 if self.wal.read_frame_header(cx, frame_index)?.is_commit() {
1105 total_commits = total_commits.saturating_add(1);
1106 }
1107 }
1108 return Ok(total_commits);
1109 };
1110
1111 let mut page_commit_frame = None;
1112 for frame_index in last_page_frame..=last_commit_frame {
1113 if self.wal.read_frame_header(cx, frame_index)?.is_commit() {
1114 page_commit_frame = Some(frame_index);
1115 break;
1116 }
1117 }
1118
1119 let Some(page_commit_frame) = page_commit_frame else {
1120 return Ok(0);
1121 };
1122
1123 let mut committed_txns_after_page = 0_u64;
1124 for frame_index in page_commit_frame.saturating_add(1)..=last_commit_frame {
1125 if self.wal.read_frame_header(cx, frame_index)?.is_commit() {
1126 committed_txns_after_page = committed_txns_after_page.saturating_add(1);
1127 }
1128 }
1129
1130 Ok(committed_txns_after_page)
1131 }
1132
1133 fn committed_txn_count(&mut self, cx: &Cx) -> Result<u64> {
1134 let snapshot = if let Some(snapshot) = self.read_snapshot.clone() {
1135 snapshot
1136 } else {
1137 self.publish_latest_committed_snapshot(cx, "committed_txn_count")?;
1138 self.published_snapshot.clone()
1139 };
1140 Ok(snapshot.commit_count)
1141 }
1142
1143 fn sync(&mut self, cx: &Cx) -> Result<()> {
1144 let result = self.wal.sync(cx, SyncFlags::NORMAL);
1145 self.refresh_before_append = true;
1146 result
1147 }
1148
1149 fn frame_count(&self) -> usize {
1150 self.wal.frame_count()
1151 }
1152
1153 fn checkpoint(
1154 &mut self,
1155 cx: &Cx,
1156 mode: CheckpointMode,
1157 writer: &mut dyn CheckpointPageWriter,
1158 backfilled_frames: u32,
1159 oldest_reader_frame: Option<u32>,
1160 ) -> Result<CheckpointResult> {
1161 self.wal.refresh(cx)?;
1163 self.refresh_before_append = true;
1164 let total_frames = u32::try_from(self.wal.frame_count()).unwrap_or(u32::MAX);
1165
1166 let state = CheckpointState {
1168 total_frames,
1169 backfilled_frames,
1170 oldest_reader_frame,
1171 };
1172
1173 let mut target = CheckpointTargetAdapterRef { writer };
1175
1176 let result = execute_checkpoint(cx, &mut self.wal, to_wal_mode(mode), state, &mut target)?;
1178
1179 #[cfg(not(target_arch = "wasm32"))]
1183 if result.frames_backfilled > 0 {
1184 let drained = self.fec_pending.len();
1185 self.fec_pending.clear();
1186 if drained > 0 {
1187 debug!(
1188 drained_groups = drained,
1189 frames_backfilled = result.frames_backfilled,
1190 "FEC symbols reclaimed after checkpoint"
1191 );
1192 }
1193 }
1194
1195 #[cfg(not(target_arch = "wasm32"))]
1198 if result.wal_was_reset {
1199 self.fec_discard();
1200 }
1201 if result.wal_was_reset {
1202 self.invalidate_publication();
1203 }
1204
1205 self.publish_latest_committed_snapshot(cx, "checkpoint")?;
1206
1207 Ok(CheckpointResult {
1208 total_frames,
1209 frames_backfilled: result.frames_backfilled,
1210 completed: result.plan.completes_checkpoint(),
1211 wal_was_reset: result.wal_was_reset,
1212 })
1213 }
1214}
1215
1216struct CheckpointTargetAdapterRef<'a> {
1221 writer: &'a mut dyn CheckpointPageWriter,
1222}
1223
1224impl CheckpointTarget for CheckpointTargetAdapterRef<'_> {
1225 fn write_page(&mut self, cx: &Cx, page_no: PageNumber, data: &[u8]) -> Result<()> {
1226 self.writer.write_page(cx, page_no, data)
1227 }
1228
1229 fn truncate_db(&mut self, cx: &Cx, n_pages: u32) -> Result<()> {
1230 self.writer.truncate(cx, n_pages)
1231 }
1232
1233 fn sync_db(&mut self, cx: &Cx) -> Result<()> {
1234 self.writer.sync(cx)
1235 }
1236}
1237
1238#[cfg(test)]
1243mod tests {
1244 use std::sync::OnceLock;
1245
1246 use fsqlite_pager::MockCheckpointPageWriter;
1247 use fsqlite_pager::traits::WalFrameRef;
1248 use fsqlite_types::flags::VfsOpenFlags;
1249 use fsqlite_vfs::MemoryVfs;
1250 use fsqlite_vfs::traits::Vfs;
1251 use fsqlite_wal::checksum::WalSalts;
1252
1253 use super::*;
1254
1255 const PAGE_SIZE: u32 = 4096;
1256
1257 fn init_wal_publication_test_tracing() {
1258 static TRACING_INIT: OnceLock<()> = OnceLock::new();
1259 TRACING_INIT.get_or_init(|| {
1260 if tracing_subscriber::fmt()
1261 .with_ansi(false)
1262 .with_max_level(tracing::Level::TRACE)
1263 .with_test_writer()
1264 .try_init()
1265 .is_err()
1266 {
1267 }
1269 });
1270 }
1271
1272 fn test_cx() -> Cx {
1273 Cx::default()
1274 }
1275
1276 fn test_salts() -> WalSalts {
1277 WalSalts {
1278 salt1: 0xDEAD_BEEF,
1279 salt2: 0xCAFE_BABE,
1280 }
1281 }
1282
1283 fn sample_page(seed: u8) -> Vec<u8> {
1284 let page_size = usize::try_from(PAGE_SIZE).expect("page size fits usize");
1285 let mut page = vec![0u8; page_size];
1286 for (i, byte) in page.iter_mut().enumerate() {
1287 let reduced = u8::try_from(i % 251).expect("modulo fits u8");
1288 *byte = reduced ^ seed;
1289 }
1290 page
1291 }
1292
1293 fn open_wal_file(vfs: &MemoryVfs, cx: &Cx) -> <MemoryVfs as Vfs>::File {
1294 let flags = VfsOpenFlags::READWRITE | VfsOpenFlags::CREATE | VfsOpenFlags::WAL;
1295 let (file, _) = vfs
1296 .open(cx, Some(std::path::Path::new("test.db-wal")), flags)
1297 .expect("open WAL file");
1298 file
1299 }
1300
1301 fn make_adapter(vfs: &MemoryVfs, cx: &Cx) -> WalBackendAdapter<<MemoryVfs as Vfs>::File> {
1302 let file = open_wal_file(vfs, cx);
1303 let wal = WalFile::create(cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1304 WalBackendAdapter::new(wal)
1305 }
1306
1307 #[test]
1310 fn test_adapter_append_and_frame_count() {
1311 let cx = test_cx();
1312 let vfs = MemoryVfs::new();
1313 let mut adapter = make_adapter(&vfs, &cx);
1314
1315 assert_eq!(adapter.frame_count(), 0);
1316
1317 let page = sample_page(0x42);
1318 adapter
1319 .append_frame(&cx, 1, &page, 0)
1320 .expect("append frame");
1321 assert_eq!(adapter.frame_count(), 1);
1322
1323 adapter
1324 .append_frame(&cx, 2, &sample_page(0x43), 2)
1325 .expect("append commit frame");
1326 assert_eq!(adapter.frame_count(), 2);
1327 }
1328
1329 #[test]
1330 fn test_adapter_read_page_found() {
1331 let cx = test_cx();
1332 let vfs = MemoryVfs::new();
1333 let mut adapter = make_adapter(&vfs, &cx);
1334
1335 let page1 = sample_page(0x10);
1336 let page2 = sample_page(0x20);
1337 adapter.append_frame(&cx, 1, &page1, 0).expect("append");
1338 adapter
1339 .append_frame(&cx, 2, &page2, 2)
1340 .expect("append commit");
1341
1342 let result = adapter.read_page(&cx, 1).expect("read page 1");
1343 assert_eq!(result, Some(page1));
1344
1345 let result = adapter.read_page(&cx, 2).expect("read page 2");
1346 assert_eq!(result, Some(page2));
1347 }
1348
1349 #[test]
1350 fn test_adapter_read_page_not_found() {
1351 let cx = test_cx();
1352 let vfs = MemoryVfs::new();
1353 let mut adapter = make_adapter(&vfs, &cx);
1354
1355 adapter
1356 .append_frame(&cx, 1, &sample_page(0x10), 1)
1357 .expect("append");
1358
1359 let result = adapter.read_page(&cx, 99).expect("read missing page");
1360 assert_eq!(result, None);
1361 }
1362
1363 #[test]
1364 fn test_adapter_read_page_returns_latest_version() {
1365 let cx = test_cx();
1366 let vfs = MemoryVfs::new();
1367 let mut adapter = make_adapter(&vfs, &cx);
1368
1369 let old_data = sample_page(0xAA);
1370 let new_data = sample_page(0xBB);
1371
1372 adapter
1374 .append_frame(&cx, 5, &old_data, 0)
1375 .expect("append old");
1376 adapter
1377 .append_frame(&cx, 5, &new_data, 1)
1378 .expect("append new (commit)");
1379
1380 let result = adapter.read_page(&cx, 5).expect("read page 5");
1381 assert_eq!(
1382 result,
1383 Some(new_data),
1384 "adapter should return the latest WAL version"
1385 );
1386 }
1387
1388 #[test]
1389 fn test_adapter_refreshes_cross_handle_visibility_and_append_position() {
1390 let cx = test_cx();
1391 let vfs = MemoryVfs::new();
1392
1393 let file1 = open_wal_file(&vfs, &cx);
1394 let wal1 = WalFile::create(&cx, file1, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1395 let mut adapter1 = WalBackendAdapter::new(wal1);
1396
1397 let file2 = open_wal_file(&vfs, &cx);
1398 let wal2 = WalFile::open(&cx, file2).expect("open WAL");
1399 let mut adapter2 = WalBackendAdapter::new(wal2);
1400
1401 let page1 = sample_page(0x11);
1402 adapter1
1403 .append_frame(&cx, 1, &page1, 1)
1404 .expect("adapter1 append commit");
1405 adapter1.sync(&cx).expect("adapter1 sync");
1406 adapter2
1407 .begin_transaction(&cx)
1408 .expect("adapter2 begin transaction");
1409 assert_eq!(
1410 adapter2.read_page(&cx, 1).expect("adapter2 read page1"),
1411 Some(page1.clone()),
1412 "adapter2 should observe adapter1 commit at transaction begin"
1413 );
1414
1415 let page2 = sample_page(0x22);
1416 adapter2
1417 .append_frame(&cx, 2, &page2, 2)
1418 .expect("adapter2 append commit");
1419 adapter2.sync(&cx).expect("adapter2 sync");
1420 adapter1
1421 .begin_transaction(&cx)
1422 .expect("adapter1 begin transaction");
1423 assert_eq!(
1424 adapter1.read_page(&cx, 2).expect("adapter1 read page2"),
1425 Some(page2.clone()),
1426 "adapter1 should observe adapter2 commit at transaction begin"
1427 );
1428
1429 assert_eq!(
1431 adapter1.frame_count(),
1432 2,
1433 "shared WAL should contain both commit frames"
1434 );
1435 assert_eq!(
1436 adapter2.frame_count(),
1437 2,
1438 "shared WAL should contain both commit frames"
1439 );
1440 }
1441
1442 #[test]
1443 fn test_adapter_batch_append_checksum_chain_matches_single_append() {
1444 let cx = test_cx();
1445 let vfs_single = MemoryVfs::new();
1446 let vfs_batch = MemoryVfs::new();
1447
1448 let mut adapter_single = make_adapter(&vfs_single, &cx);
1449 let mut adapter_batch = make_adapter(&vfs_batch, &cx);
1450
1451 let pages: Vec<Vec<u8>> = (0..4u8).map(sample_page).collect();
1452 let commit_sizes = [0_u32, 0, 0, 4];
1453
1454 for (index, page) in pages.iter().enumerate() {
1455 adapter_single
1456 .append_frame(
1457 &cx,
1458 u32::try_from(index + 1).expect("page number fits u32"),
1459 page,
1460 commit_sizes[index],
1461 )
1462 .expect("single append");
1463 }
1464
1465 let batch_frames: Vec<_> = pages
1466 .iter()
1467 .enumerate()
1468 .map(|(index, page)| WalFrameRef {
1469 page_number: u32::try_from(index + 1).expect("page number fits u32"),
1470 page_data: page,
1471 db_size_if_commit: commit_sizes[index],
1472 })
1473 .collect();
1474 adapter_batch
1475 .append_frames(&cx, &batch_frames)
1476 .expect("batch append");
1477
1478 assert_eq!(
1479 adapter_single.frame_count(),
1480 adapter_batch.frame_count(),
1481 "batch adapter append must preserve frame count"
1482 );
1483 assert_eq!(
1484 adapter_single.wal.running_checksum(),
1485 adapter_batch.wal.running_checksum(),
1486 "batch adapter append must preserve checksum chain"
1487 );
1488
1489 for frame_index in 0..pages.len() {
1490 let (single_header, single_data) = adapter_single
1491 .wal
1492 .read_frame(&cx, frame_index)
1493 .expect("read single frame");
1494 let (batch_header, batch_data) = adapter_batch
1495 .wal
1496 .read_frame(&cx, frame_index)
1497 .expect("read batch frame");
1498 assert_eq!(
1499 single_header, batch_header,
1500 "frame header {frame_index} must match"
1501 );
1502 assert_eq!(
1503 single_data, batch_data,
1504 "frame payload {frame_index} must match"
1505 );
1506 }
1507 }
1508
1509 #[test]
1510 fn test_adapter_prepared_batch_append_checksum_chain_matches_single_append() {
1511 let cx = test_cx();
1512 let vfs_single = MemoryVfs::new();
1513 let vfs_prepared = MemoryVfs::new();
1514
1515 let mut adapter_single = make_adapter(&vfs_single, &cx);
1516 let mut adapter_prepared = make_adapter(&vfs_prepared, &cx);
1517
1518 let pages: Vec<Vec<u8>> = (0..4u8).map(sample_page).collect();
1519 let commit_sizes = [0_u32, 0, 0, 4];
1520
1521 for (index, page) in pages.iter().enumerate() {
1522 adapter_single
1523 .append_frame(
1524 &cx,
1525 u32::try_from(index + 1).expect("page number fits u32"),
1526 page,
1527 commit_sizes[index],
1528 )
1529 .expect("single append");
1530 }
1531
1532 let batch_frames: Vec<_> = pages
1533 .iter()
1534 .enumerate()
1535 .map(|(index, page)| WalFrameRef {
1536 page_number: u32::try_from(index + 1).expect("page number fits u32"),
1537 page_data: page,
1538 db_size_if_commit: commit_sizes[index],
1539 })
1540 .collect();
1541 let mut prepared = adapter_prepared
1542 .prepare_append_frames(&batch_frames)
1543 .expect("prepare append")
1544 .expect("prepared batch");
1545 adapter_prepared
1546 .append_prepared_frames(&cx, &mut prepared)
1547 .expect("append prepared");
1548
1549 assert_eq!(
1550 adapter_single.frame_count(),
1551 adapter_prepared.frame_count(),
1552 "prepared adapter append must preserve frame count"
1553 );
1554 assert_eq!(
1555 adapter_single.wal.running_checksum(),
1556 adapter_prepared.wal.running_checksum(),
1557 "prepared adapter append must preserve checksum chain"
1558 );
1559
1560 for frame_index in 0..pages.len() {
1561 let (single_header, single_data) = adapter_single
1562 .wal
1563 .read_frame(&cx, frame_index)
1564 .expect("read single frame");
1565 let (prepared_header, prepared_data) = adapter_prepared
1566 .wal
1567 .read_frame(&cx, frame_index)
1568 .expect("read prepared frame");
1569 assert_eq!(
1570 single_header, prepared_header,
1571 "frame header {frame_index} must match"
1572 );
1573 assert_eq!(
1574 single_data, prepared_data,
1575 "frame payload {frame_index} must match"
1576 );
1577 }
1578 }
1579
1580 #[test]
1581 fn test_adapter_pre_finalize_reused_when_append_window_is_stable() {
1582 let cx = test_cx();
1583 let vfs_single = MemoryVfs::new();
1584 let vfs_prepared = MemoryVfs::new();
1585
1586 let mut adapter_single = make_adapter(&vfs_single, &cx);
1587 let mut adapter_prepared = make_adapter(&vfs_prepared, &cx);
1588
1589 let pages: Vec<Vec<u8>> = (0..3u8).map(sample_page).collect();
1590 let commit_sizes = [0_u32, 0, 3];
1591
1592 for (index, page) in pages.iter().enumerate() {
1593 adapter_single
1594 .append_frame(
1595 &cx,
1596 u32::try_from(index + 1).expect("page number fits u32"),
1597 page,
1598 commit_sizes[index],
1599 )
1600 .expect("single append");
1601 }
1602
1603 let batch_frames: Vec<_> = pages
1604 .iter()
1605 .enumerate()
1606 .map(|(index, page)| WalFrameRef {
1607 page_number: u32::try_from(index + 1).expect("page number fits u32"),
1608 page_data: page,
1609 db_size_if_commit: commit_sizes[index],
1610 })
1611 .collect();
1612 let mut prepared = adapter_prepared
1613 .prepare_append_frames(&batch_frames)
1614 .expect("prepare append")
1615 .expect("prepared batch");
1616 adapter_prepared
1617 .finalize_prepared_frames(&cx, &mut prepared)
1618 .expect("pre-finalize prepared batch");
1619 let finalized_for = prepared.finalized_for.expect("finalization state");
1620 let finalized_running_checksum = prepared
1621 .finalized_running_checksum
1622 .expect("finalized checksum");
1623
1624 adapter_prepared
1625 .append_prepared_frames(&cx, &mut prepared)
1626 .expect("append prepared");
1627
1628 assert_eq!(
1629 prepared.finalized_for,
1630 Some(finalized_for),
1631 "stable append window should reuse the pre-lock finalization state"
1632 );
1633 assert_eq!(
1634 prepared.finalized_running_checksum,
1635 Some(finalized_running_checksum),
1636 "stable append window should reuse the pre-lock finalized checksum"
1637 );
1638 assert_eq!(
1639 adapter_single.wal.running_checksum(),
1640 adapter_prepared.wal.running_checksum(),
1641 "stable reuse path must preserve checksum chain"
1642 );
1643 }
1644
1645 #[test]
1646 fn test_adapter_pre_finalize_reseeds_after_intervening_external_append() {
1647 let cx = test_cx();
1648 let baseline_vfs = MemoryVfs::new();
1649 let shared_vfs = MemoryVfs::new();
1650
1651 let mut baseline = make_adapter(&baseline_vfs, &cx);
1652 let mut prepared_writer = make_adapter(&shared_vfs, &cx);
1653 let intruder_file = open_wal_file(&shared_vfs, &cx);
1654 let intruder_wal = WalFile::open(&cx, intruder_file).expect("open shared WAL");
1655 let mut intruder = WalBackendAdapter::new(intruder_wal);
1656
1657 let pages: Vec<Vec<u8>> = (0..3u8).map(sample_page).collect();
1658 let commit_sizes = [0_u32, 0, 3];
1659 let intruder_page = sample_page(0xEE);
1660
1661 baseline
1662 .append_frame(&cx, 99, &intruder_page, 1)
1663 .expect("baseline intruder append");
1664 for (index, page) in pages.iter().enumerate() {
1665 baseline
1666 .append_frame(
1667 &cx,
1668 u32::try_from(index + 1).expect("page number fits u32"),
1669 page,
1670 commit_sizes[index],
1671 )
1672 .expect("baseline append");
1673 }
1674
1675 let batch_frames: Vec<_> = pages
1676 .iter()
1677 .enumerate()
1678 .map(|(index, page)| WalFrameRef {
1679 page_number: u32::try_from(index + 1).expect("page number fits u32"),
1680 page_data: page,
1681 db_size_if_commit: commit_sizes[index],
1682 })
1683 .collect();
1684 let mut prepared = prepared_writer
1685 .prepare_append_frames(&batch_frames)
1686 .expect("prepare append")
1687 .expect("prepared batch");
1688 prepared_writer
1689 .finalize_prepared_frames(&cx, &mut prepared)
1690 .expect("pre-finalize prepared batch");
1691 let stale_finalization_state = prepared.finalized_for;
1692
1693 intruder
1694 .append_frame(&cx, 99, &intruder_page, 1)
1695 .expect("intruder append");
1696 intruder.sync(&cx).expect("intruder sync");
1697
1698 prepared_writer
1699 .append_prepared_frames(&cx, &mut prepared)
1700 .expect("append prepared after external growth");
1701
1702 assert_ne!(
1703 prepared.finalized_for, stale_finalization_state,
1704 "intervening external growth should force prepared batch reseeding"
1705 );
1706 assert_eq!(
1707 baseline.wal.running_checksum(),
1708 prepared_writer.wal.running_checksum(),
1709 "reseeding path must preserve checksum chain"
1710 );
1711 assert_eq!(
1712 baseline.frame_count(),
1713 prepared_writer.frame_count(),
1714 "reseeding path must preserve frame count"
1715 );
1716 }
1717
1718 #[test]
1719 fn test_adapter_pins_read_snapshot_until_next_begin() {
1720 init_wal_publication_test_tracing();
1721 let cx = test_cx();
1722 let vfs = MemoryVfs::new();
1723
1724 let file_writer = open_wal_file(&vfs, &cx);
1725 let wal_writer =
1726 WalFile::create(&cx, file_writer, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1727 let mut writer = WalBackendAdapter::new(wal_writer);
1728
1729 let file_reader = open_wal_file(&vfs, &cx);
1730 let wal_reader = WalFile::open(&cx, file_reader).expect("open WAL");
1731 let mut reader = WalBackendAdapter::new(wal_reader);
1732
1733 let v1 = sample_page(0x41);
1734 writer.append_frame(&cx, 3, &v1, 3).expect("append v1");
1735 writer.sync(&cx).expect("sync v1");
1736
1737 reader
1738 .begin_transaction(&cx)
1739 .expect("begin reader snapshot 1");
1740 assert_eq!(
1741 reader.read_page(&cx, 3).expect("reader sees v1"),
1742 Some(v1.clone())
1743 );
1744
1745 let v2 = sample_page(0x42);
1746 writer.append_frame(&cx, 3, &v2, 3).expect("append v2");
1747 writer.sync(&cx).expect("sync v2");
1748
1749 assert_eq!(
1751 reader
1752 .read_page(&cx, 3)
1753 .expect("reader remains on pinned snapshot"),
1754 Some(v1.clone())
1755 );
1756
1757 reader
1759 .begin_transaction(&cx)
1760 .expect("begin reader snapshot 2");
1761 assert_eq!(reader.read_page(&cx, 3).expect("reader sees v2"), Some(v2));
1762 }
1763
1764 #[test]
1765 fn test_adapter_read_page_hides_uncommitted_frames() {
1766 let cx = test_cx();
1767 let vfs = MemoryVfs::new();
1768 let mut adapter = make_adapter(&vfs, &cx);
1769
1770 let committed = sample_page(0x31);
1771 let uncommitted = sample_page(0x32);
1772
1773 adapter
1774 .append_frame(&cx, 7, &committed, 7)
1775 .expect("append committed frame");
1776 adapter
1777 .append_frame(&cx, 7, &uncommitted, 0)
1778 .expect("append uncommitted frame");
1779
1780 let result = adapter.read_page(&cx, 7).expect("read committed page");
1781 assert_eq!(
1782 result,
1783 Some(committed),
1784 "reader must ignore uncommitted tail frames"
1785 );
1786 }
1787
1788 #[test]
1789 fn test_adapter_read_page_none_when_wal_has_no_commit_frame() {
1790 let cx = test_cx();
1791 let vfs = MemoryVfs::new();
1792 let mut adapter = make_adapter(&vfs, &cx);
1793
1794 adapter
1795 .append_frame(&cx, 3, &sample_page(0x44), 0)
1796 .expect("append uncommitted frame");
1797
1798 let result = adapter.read_page(&cx, 3).expect("read page");
1799 assert_eq!(result, None, "uncommitted WAL frames must stay invisible");
1800 }
1801
1802 #[test]
1803 fn test_adapter_read_page_empty_wal() {
1804 let cx = test_cx();
1805 let vfs = MemoryVfs::new();
1806 let mut adapter = make_adapter(&vfs, &cx);
1807
1808 let result = adapter.read_page(&cx, 1).expect("read from empty WAL");
1809 assert_eq!(result, None);
1810 }
1811
1812 #[test]
1813 fn test_adapter_sync() {
1814 let cx = test_cx();
1815 let vfs = MemoryVfs::new();
1816 let mut adapter = make_adapter(&vfs, &cx);
1817
1818 adapter
1819 .append_frame(&cx, 1, &sample_page(0), 1)
1820 .expect("append");
1821 adapter.sync(&cx).expect("sync should not fail");
1822 }
1823
1824 #[test]
1825 fn test_adapter_into_inner_round_trip() {
1826 let cx = test_cx();
1827 let vfs = MemoryVfs::new();
1828 let mut adapter = make_adapter(&vfs, &cx);
1829
1830 adapter
1831 .append_frame(&cx, 1, &sample_page(0), 1)
1832 .expect("append");
1833
1834 assert_eq!(adapter.inner().frame_count(), 1);
1835
1836 let wal = adapter.into_inner();
1837 assert_eq!(wal.frame_count(), 1);
1838 }
1839
1840 #[test]
1841 fn test_adapter_as_dyn_wal_backend() {
1842 let cx = test_cx();
1843 let vfs = MemoryVfs::new();
1844 let mut adapter = make_adapter(&vfs, &cx);
1845
1846 let backend: &mut dyn WalBackend = &mut adapter;
1848 backend
1849 .append_frame(&cx, 1, &sample_page(0x77), 1)
1850 .expect("append via dyn");
1851 assert_eq!(backend.frame_count(), 1);
1852
1853 let page = backend.read_page(&cx, 1).expect("read via dyn");
1854 assert_eq!(page, Some(sample_page(0x77)));
1855 }
1856
1857 #[test]
1860 fn test_page_index_returns_correct_data() {
1861 let cx = test_cx();
1863 let vfs = MemoryVfs::new();
1864 let mut adapter = make_adapter(&vfs, &cx);
1865
1866 let page1 = sample_page(0x01);
1867 let page2 = sample_page(0x02);
1868 let page3 = sample_page(0x03);
1869
1870 adapter.append_frame(&cx, 1, &page1, 0).expect("append");
1871 adapter.append_frame(&cx, 2, &page2, 0).expect("append");
1872 adapter
1873 .append_frame(&cx, 3, &page3, 3)
1874 .expect("append commit");
1875
1876 assert_eq!(adapter.read_page(&cx, 1).expect("read"), Some(page1));
1878 assert_eq!(adapter.read_page(&cx, 2).expect("read"), Some(page2));
1879 assert_eq!(adapter.read_page(&cx, 3).expect("read"), Some(page3));
1880
1881 assert_eq!(adapter.read_page(&cx, 99).expect("read"), None);
1883 }
1884
1885 #[test]
1886 fn test_page_index_returns_latest_version() {
1887 let cx = test_cx();
1889 let vfs = MemoryVfs::new();
1890 let mut adapter = make_adapter(&vfs, &cx);
1891
1892 let old_data = sample_page(0xAA);
1893 let new_data = sample_page(0xBB);
1894
1895 adapter
1896 .append_frame(&cx, 5, &old_data, 0)
1897 .expect("append old");
1898 adapter
1899 .append_frame(&cx, 5, &new_data, 1)
1900 .expect("append new (commit)");
1901
1902 assert_eq!(
1903 adapter.read_page(&cx, 5).expect("read"),
1904 Some(new_data),
1905 "page index must return the latest frame for a page"
1906 );
1907 }
1908
1909 #[test]
1910 fn test_page_index_invalidated_on_wal_reset() {
1911 let cx = test_cx();
1914 let vfs = MemoryVfs::new();
1915 let mut adapter = make_adapter(&vfs, &cx);
1916
1917 let old_data = sample_page(0x11);
1918 adapter
1919 .append_frame(&cx, 1, &old_data, 1)
1920 .expect("append commit");
1921
1922 assert_eq!(adapter.read_page(&cx, 1).expect("read old"), Some(old_data));
1924
1925 let new_salts = WalSalts {
1927 salt1: 0xAAAA_BBBB,
1928 salt2: 0xCCCC_DDDD,
1929 };
1930 adapter
1931 .inner_mut()
1932 .reset(&cx, 1, new_salts, false)
1933 .expect("WAL reset");
1934
1935 let new_data = sample_page(0x22);
1937 adapter
1938 .append_frame(&cx, 1, &new_data, 1)
1939 .expect("append new generation commit");
1940
1941 let result = adapter.read_page(&cx, 1).expect("read after reset");
1943 assert_eq!(
1944 result,
1945 Some(new_data),
1946 "after WAL reset, page index must return new-generation data, not stale cached data"
1947 );
1948
1949 let old_only = sample_page(0x33);
1951 assert_eq!(
1953 adapter.read_page(&cx, 99).expect("read non-existent"),
1954 None,
1955 "pages from old WAL generation must not appear after reset"
1956 );
1957 drop(old_only);
1959 }
1960
1961 #[test]
1962 fn test_page_index_invalidated_on_same_salt_generation_change() {
1963 init_wal_publication_test_tracing();
1964 let cx = test_cx();
1967 let vfs = MemoryVfs::new();
1968 let mut adapter = make_adapter(&vfs, &cx);
1969
1970 let reused_salts = adapter.inner().header().salts;
1971 let old_data = sample_page(0x11);
1972 adapter
1973 .append_frame(&cx, 1, &old_data, 1)
1974 .expect("append commit");
1975 assert_eq!(adapter.read_page(&cx, 1).expect("read old"), Some(old_data));
1976
1977 adapter
1978 .inner_mut()
1979 .reset(&cx, 1, reused_salts, false)
1980 .expect("reset with same salts");
1981 let new_data = sample_page(0x22);
1982 adapter
1983 .append_frame(&cx, 2, &new_data, 2)
1984 .expect("append new generation commit");
1985
1986 assert_eq!(
1987 adapter.read_page(&cx, 1).expect("old page should be gone"),
1988 None,
1989 "cached index entries from the previous generation must be invalidated"
1990 );
1991 assert_eq!(
1992 adapter.read_page(&cx, 2).expect("read new page"),
1993 Some(new_data),
1994 "adapter must resolve pages from the new generation even when salts are reused"
1995 );
1996 }
1997
1998 #[test]
1999 fn test_page_index_incremental_extend() {
2000 let cx = test_cx();
2002 let vfs = MemoryVfs::new();
2003 let mut adapter = make_adapter(&vfs, &cx);
2004
2005 let page1 = sample_page(0x10);
2006 adapter
2007 .append_frame(&cx, 1, &page1, 1)
2008 .expect("append commit 1");
2009
2010 assert_eq!(
2012 adapter.read_page(&cx, 1).expect("read"),
2013 Some(page1.clone())
2014 );
2015
2016 let page2 = sample_page(0x20);
2018 let page1_v2 = sample_page(0x30);
2019 adapter
2020 .append_frame(&cx, 2, &page2, 0)
2021 .expect("append page 2");
2022 adapter
2023 .append_frame(&cx, 1, &page1_v2, 3)
2024 .expect("append page 1 v2 (commit)");
2025
2026 assert_eq!(
2028 adapter.read_page(&cx, 1).expect("read page 1 v2"),
2029 Some(page1_v2),
2030 "incremental index extend should pick up the updated page"
2031 );
2032 assert_eq!(adapter.read_page(&cx, 2).expect("read page 2"), Some(page2));
2033 }
2034
2035 #[test]
2036 fn test_commit_append_publishes_visibility_snapshot() {
2037 init_wal_publication_test_tracing();
2038 let cx = test_cx();
2039 let vfs = MemoryVfs::new();
2040 let mut adapter = make_adapter(&vfs, &cx);
2041
2042 let p1 = sample_page(0x41);
2043 let p2 = sample_page(0x42);
2044 adapter.append_frame(&cx, 1, &p1, 0).expect("append p1");
2045 adapter.append_frame(&cx, 2, &p2, 2).expect("append commit");
2046
2047 assert_eq!(
2048 adapter.published_snapshot.last_commit_frame,
2049 Some(1),
2050 "commit append should publish the visible commit horizon"
2051 );
2052 assert_eq!(
2053 adapter.published_snapshot.commit_count, 1,
2054 "commit append should track the visible WAL commit count"
2055 );
2056 assert_eq!(
2057 adapter.published_snapshot.page_index.len(),
2058 2,
2059 "published snapshot should track both committed pages"
2060 );
2061 assert_eq!(
2062 adapter.published_snapshot.page_index.get(&2),
2063 Some(&1),
2064 "published snapshot must map each page to its latest committed frame"
2065 );
2066 }
2067
2068 #[test]
2069 fn test_prepared_append_publishes_visibility_snapshot() {
2070 init_wal_publication_test_tracing();
2071 let cx = test_cx();
2072 let vfs = MemoryVfs::new();
2073 let mut adapter = make_adapter(&vfs, &cx);
2074
2075 let p1 = sample_page(0x51);
2076 let p2 = sample_page(0x52);
2077 let frames = [
2078 WalFrameRef {
2079 page_number: 1,
2080 page_data: &p1,
2081 db_size_if_commit: 0,
2082 },
2083 WalFrameRef {
2084 page_number: 2,
2085 page_data: &p2,
2086 db_size_if_commit: 2,
2087 },
2088 ];
2089 let mut prepared = adapter
2090 .prepare_append_frames(&frames)
2091 .expect("prepare append")
2092 .expect("prepared batch");
2093 adapter
2094 .append_prepared_frames(&cx, &mut prepared)
2095 .expect("append prepared");
2096
2097 assert_eq!(
2098 adapter.published_snapshot.last_commit_frame,
2099 Some(1),
2100 "prepared commit append should publish the visible commit horizon"
2101 );
2102 assert_eq!(
2103 adapter.published_snapshot.commit_count, 1,
2104 "prepared commit append should track the visible WAL commit count"
2105 );
2106 assert_eq!(
2107 adapter.published_snapshot.page_index.len(),
2108 2,
2109 "prepared commit append should publish all committed pages"
2110 );
2111 assert_eq!(
2112 adapter.published_snapshot.page_index.get(&2),
2113 Some(&1),
2114 "prepared commit append must map each page to its latest committed frame"
2115 );
2116 }
2117
2118 #[test]
2119 fn test_commit_publication_refreshes_external_prefix_before_local_commit() {
2120 let cx = test_cx();
2121 let vfs = MemoryVfs::new();
2122
2123 let file_writer = open_wal_file(&vfs, &cx);
2124 let wal_writer =
2125 WalFile::create(&cx, file_writer, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2126 let mut writer = WalBackendAdapter::new(wal_writer);
2127
2128 let file_follower = open_wal_file(&vfs, &cx);
2129 let wal_follower = WalFile::open(&cx, file_follower).expect("open WAL");
2130 let mut follower = WalBackendAdapter::new(wal_follower);
2131
2132 let p1 = sample_page(0x61);
2133 writer
2134 .append_frame(&cx, 1, &p1, 1)
2135 .expect("writer commit 1");
2136 writer.sync(&cx).expect("sync writer commit 1");
2137
2138 let p2 = sample_page(0x62);
2139 writer
2140 .append_frame(&cx, 2, &p2, 2)
2141 .expect("writer commit 2");
2142 writer.sync(&cx).expect("sync writer commit 2");
2143
2144 let p3 = sample_page(0x63);
2145 follower
2146 .append_frame(&cx, 3, &p3, 3)
2147 .expect("follower local commit");
2148
2149 assert_eq!(
2150 follower.published_snapshot.last_commit_frame,
2151 Some(2),
2152 "local commit should publish on top of refreshed external WAL state"
2153 );
2154 assert_eq!(
2155 follower.published_snapshot.commit_count, 3,
2156 "local commit publication should include refreshed external commits"
2157 );
2158 assert_eq!(
2159 follower.published_snapshot.page_index.get(&1),
2160 Some(&0),
2161 "refresh-before-append should preserve earlier committed pages"
2162 );
2163 assert_eq!(
2164 follower.published_snapshot.page_index.get(&2),
2165 Some(&1),
2166 "refresh-before-append should publish externally committed pages"
2167 );
2168 assert_eq!(
2169 follower.published_snapshot.page_index.get(&3),
2170 Some(&2),
2171 "local commit should extend the published WAL visibility map"
2172 );
2173 assert_eq!(follower.read_page(&cx, 1).expect("read p1"), Some(p1));
2174 assert_eq!(follower.read_page(&cx, 2).expect("read p2"), Some(p2));
2175 assert_eq!(follower.read_page(&cx, 3).expect("read p3"), Some(p3));
2176 }
2177
2178 #[test]
2181 fn test_partial_index_falls_back_to_linear_scan() {
2182 init_wal_publication_test_tracing();
2183 let cx = test_cx();
2186 let vfs = MemoryVfs::new();
2187 let mut adapter = make_adapter(&vfs, &cx);
2188
2189 adapter.set_page_index_cap(2);
2192
2193 let p1 = sample_page(0x01);
2196 let p2 = sample_page(0x02);
2197 let p3 = sample_page(0x03);
2198 let p4 = sample_page(0x04);
2199 let p5 = sample_page(0x05);
2200
2201 adapter.append_frame(&cx, 1, &p1, 0).expect("append p1");
2202 adapter.append_frame(&cx, 2, &p2, 0).expect("append p2");
2203 adapter.append_frame(&cx, 3, &p3, 0).expect("append p3");
2204 adapter.append_frame(&cx, 4, &p4, 0).expect("append p4");
2205 adapter
2206 .append_frame(&cx, 5, &p5, 5)
2207 .expect("append p5 (commit)");
2208
2209 assert_eq!(
2211 adapter.read_page(&cx, 1).expect("read p1"),
2212 Some(p1),
2213 "indexed page should be found via HashMap"
2214 );
2215 assert_eq!(
2216 adapter.read_page(&cx, 2).expect("read p2"),
2217 Some(p2),
2218 "indexed page should be found via HashMap"
2219 );
2220
2221 assert_eq!(
2224 adapter.read_page(&cx, 3).expect("read p3"),
2225 Some(p3),
2226 "non-indexed page must be found via linear scan fallback"
2227 );
2228 assert_eq!(
2229 adapter.read_page(&cx, 4).expect("read p4"),
2230 Some(p4),
2231 "non-indexed page must be found via linear scan fallback"
2232 );
2233 assert_eq!(
2234 adapter.read_page(&cx, 5).expect("read p5"),
2235 Some(p5),
2236 "non-indexed page must be found via linear scan fallback"
2237 );
2238
2239 assert_eq!(
2241 adapter.read_page(&cx, 99).expect("read non-existent"),
2242 None,
2243 "non-existent page must return None even with partial index"
2244 );
2245
2246 assert!(
2248 adapter.published_snapshot.index_is_partial,
2249 "index_is_partial should be true when cap is exceeded"
2250 );
2251 }
2252
2253 #[test]
2254 fn test_partial_index_returns_latest_version_via_fallback() {
2255 let cx = test_cx();
2259 let vfs = MemoryVfs::new();
2260 let mut adapter = make_adapter(&vfs, &cx);
2261
2262 adapter.set_page_index_cap(1);
2264
2265 let old_p2 = sample_page(0xAA);
2266 let new_p2 = sample_page(0xBB);
2267
2268 adapter
2270 .append_frame(&cx, 1, &sample_page(0x01), 0)
2271 .expect("append p1");
2272 adapter
2274 .append_frame(&cx, 2, &old_p2, 0)
2275 .expect("append p2 old");
2276 adapter
2279 .append_frame(&cx, 2, &new_p2, 3)
2280 .expect("append p2 new (commit)");
2281
2282 assert_eq!(
2284 adapter.read_page(&cx, 2).expect("read p2"),
2285 Some(new_p2),
2286 "backwards scan must return the most recent frame for the page"
2287 );
2288 }
2289
2290 #[test]
2291 fn test_lookup_contract_distinguishes_authoritative_and_fallback_paths() {
2292 init_wal_publication_test_tracing();
2293 let cx = test_cx();
2294 let vfs = MemoryVfs::new();
2295 let mut adapter = make_adapter(&vfs, &cx);
2296 adapter.set_page_index_cap(1);
2297
2298 let p1 = sample_page(0x01);
2299 let p2 = sample_page(0x02);
2300 adapter.append_frame(&cx, 1, &p1, 0).expect("append p1");
2301 adapter
2302 .append_frame(&cx, 2, &p2, 2)
2303 .expect("append p2 commit");
2304
2305 let last_commit = adapter
2306 .inner_mut()
2307 .last_commit_frame(&cx)
2308 .expect("last commit")
2309 .expect("commit exists");
2310 adapter
2311 .publish_visible_snapshot(&cx, Some(last_commit), "lookup_contract_test")
2312 .expect("build published snapshot");
2313 let snapshot = adapter.published_snapshot.clone();
2314
2315 assert_eq!(
2316 adapter
2317 .resolve_visible_frame(&cx, &snapshot, 1)
2318 .expect("resolve indexed page"),
2319 WalPageLookupResolution::AuthoritativeHit { frame_index: 0 }
2320 );
2321 assert_eq!(
2322 adapter
2323 .resolve_visible_frame(&cx, &snapshot, 2)
2324 .expect("resolve fallback page"),
2325 WalPageLookupResolution::PartialIndexFallbackHit { frame_index: 1 }
2326 );
2327 assert_eq!(
2328 adapter
2329 .resolve_visible_frame(&cx, &snapshot, 99)
2330 .expect("resolve missing page"),
2331 WalPageLookupResolution::PartialIndexFallbackMiss
2332 );
2333 }
2334
2335 #[test]
2336 fn test_lookup_contract_is_authoritative_by_default() {
2337 let cx = test_cx();
2338 let vfs = MemoryVfs::new();
2339 let mut adapter = make_adapter(&vfs, &cx);
2340
2341 let p1 = sample_page(0x11);
2342 let p2 = sample_page(0x22);
2343 adapter.append_frame(&cx, 1, &p1, 0).expect("append p1");
2344 adapter
2345 .append_frame(&cx, 2, &p2, 2)
2346 .expect("append p2 commit");
2347
2348 let last_commit = adapter
2349 .inner_mut()
2350 .last_commit_frame(&cx)
2351 .expect("last commit")
2352 .expect("commit exists");
2353 adapter
2354 .publish_visible_snapshot(&cx, Some(last_commit), "lookup_contract_default")
2355 .expect("build published snapshot");
2356 let snapshot = adapter.published_snapshot.clone();
2357
2358 assert!(
2359 !snapshot.index_is_partial,
2360 "default index should be authoritative"
2361 );
2362 assert_eq!(
2363 adapter
2364 .resolve_visible_frame(&cx, &snapshot, 1)
2365 .expect("resolve page 1"),
2366 WalPageLookupResolution::AuthoritativeHit { frame_index: 0 }
2367 );
2368 assert_eq!(
2369 adapter
2370 .resolve_visible_frame(&cx, &snapshot, 2)
2371 .expect("resolve page 2"),
2372 WalPageLookupResolution::AuthoritativeHit { frame_index: 1 }
2373 );
2374 assert_eq!(
2375 adapter
2376 .resolve_visible_frame(&cx, &snapshot, 99)
2377 .expect("resolve missing page"),
2378 WalPageLookupResolution::AuthoritativeMiss
2379 );
2380 }
2381
2382 #[test]
2383 fn test_committed_txns_since_page_uses_visible_frame_horizon() {
2384 let cx = test_cx();
2385 let vfs = MemoryVfs::new();
2386 let mut adapter = make_adapter(&vfs, &cx);
2387
2388 let p1 = sample_page(0x31);
2389 let p2 = sample_page(0x32);
2390 let p3 = sample_page(0x33);
2391
2392 adapter.append_frame(&cx, 1, &p1, 0).expect("append p1");
2393 adapter.append_frame(&cx, 2, &p2, 2).expect("commit tx1");
2394 adapter.append_frame(&cx, 3, &p3, 0).expect("append p3");
2395 adapter.append_frame(&cx, 2, &p2, 3).expect("commit tx2");
2396
2397 assert_eq!(
2398 adapter
2399 .committed_txns_since_page(&cx, 1)
2400 .expect("count txns since page 1"),
2401 1
2402 );
2403 assert_eq!(
2404 adapter
2405 .committed_txns_since_page(&cx, 2)
2406 .expect("count txns since page 2"),
2407 0
2408 );
2409 assert_eq!(
2410 adapter
2411 .committed_txns_since_page(&cx, 99)
2412 .expect("count txns since missing page"),
2413 2
2414 );
2415 assert_eq!(
2416 adapter
2417 .committed_txn_count(&cx)
2418 .expect("count visible transactions"),
2419 2
2420 );
2421 }
2422
2423 #[test]
2426 fn test_checkpoint_adapter_write_page() {
2427 let cx = test_cx();
2428 let mut writer = MockCheckpointPageWriter;
2429 let mut adapter = CheckpointTargetAdapterRef {
2430 writer: &mut writer,
2431 };
2432
2433 let page_no = PageNumber::new(1).expect("valid page number");
2434 adapter
2435 .write_page(&cx, page_no, &[0u8; 4096])
2436 .expect("write_page");
2437 }
2438
2439 #[test]
2440 fn test_checkpoint_adapter_truncate_db() {
2441 let cx = test_cx();
2442 let mut writer = MockCheckpointPageWriter;
2443 let mut adapter = CheckpointTargetAdapterRef {
2444 writer: &mut writer,
2445 };
2446
2447 adapter.truncate_db(&cx, 10).expect("truncate_db");
2448 }
2449
2450 #[test]
2451 fn test_checkpoint_adapter_sync_db() {
2452 let cx = test_cx();
2453 let mut writer = MockCheckpointPageWriter;
2454 let mut adapter = CheckpointTargetAdapterRef {
2455 writer: &mut writer,
2456 };
2457
2458 adapter.sync_db(&cx).expect("sync_db");
2459 }
2460
2461 #[test]
2462 fn test_checkpoint_adapter_as_dyn_target() {
2463 let cx = test_cx();
2464 let mut writer = MockCheckpointPageWriter;
2465 let mut adapter = CheckpointTargetAdapterRef {
2466 writer: &mut writer,
2467 };
2468
2469 let target: &mut dyn CheckpointTarget = &mut adapter;
2471 let page_no = PageNumber::new(3).expect("valid page number");
2472 target
2473 .write_page(&cx, page_no, &[0u8; 4096])
2474 .expect("write via dyn");
2475 target.truncate_db(&cx, 5).expect("truncate via dyn");
2476 target.sync_db(&cx).expect("sync via dyn");
2477 }
2478}