Skip to main content

fsqlite_core/
wal_adapter.rs

1//! Adapters bridging the WAL and pager crates at runtime.
2//!
3//! These adapters break the circular dependency between `fsqlite-pager` and
4//! `fsqlite-wal`:
5//!
6//! - [`WalBackendAdapter`] wraps `WalFile` to satisfy the pager's
7//!   [`WalBackend`] trait (pager -> WAL direction).
8//! - [`CheckpointTargetAdapterRef`] wraps `CheckpointPageWriter` to satisfy the
9//!   WAL executor's [`CheckpointTarget`] trait (WAL -> pager direction).
10
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use fsqlite_error::{FrankenError, Result};
15use fsqlite_pager::traits::{
16    PreparedWalChecksumSeed, PreparedWalChecksumTransform, PreparedWalFinalizationState,
17    PreparedWalFrameBatch, PreparedWalFrameMeta, WalFrameRef,
18};
19use fsqlite_pager::{CheckpointMode, CheckpointPageWriter, CheckpointResult, WalBackend};
20use fsqlite_types::PageNumber;
21use fsqlite_types::cx::Cx;
22use fsqlite_types::flags::SyncFlags;
23use fsqlite_vfs::VfsFile;
24use fsqlite_wal::checksum::{SqliteWalChecksum, WAL_FRAME_HEADER_SIZE, WalChecksumTransform};
25use fsqlite_wal::wal::WalAppendFrameRef;
26use fsqlite_wal::{
27    CheckpointMode as WalCheckpointMode, CheckpointState, CheckpointTarget, WalFile,
28    WalGenerationIdentity, execute_checkpoint,
29};
30use tracing::debug;
31#[cfg(not(target_arch = "wasm32"))]
32use tracing::warn;
33
34#[cfg(not(target_arch = "wasm32"))]
35use crate::wal_fec_adapter::{FecCommitHook, FecCommitResult};
36
37// ---------------------------------------------------------------------------
38// WalBackendAdapter: WalFile -> WalBackend
39// ---------------------------------------------------------------------------
40
/// Default steady-state page-index cap.
///
/// Normal runtime operation keeps the published WAL page index authoritative
/// for the full visible generation. Tests can still lower this cap explicitly
/// to exercise the bounded fallback path.
const PAGE_INDEX_MAX_ENTRIES: usize = usize::MAX;
52
/// How a visible page lookup was resolved for the current WAL generation.
///
/// In steady state the per-generation index is complete, so lookups resolve to
/// the `Authoritative*` outcomes. The `PartialIndexFallback*` outcomes mark the
/// deliberate slow path taken only when a lowered cap left the in-memory index
/// incomplete.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalPageLookupResolution {
    AuthoritativeHit { frame_index: usize },
    AuthoritativeMiss,
    PartialIndexFallbackHit { frame_index: usize },
    PartialIndexFallbackMiss,
}

impl WalPageLookupResolution {
    /// Frame index carried by a hit; `None` for either kind of miss.
    #[must_use]
    const fn frame_index(self) -> Option<usize> {
        match self {
            Self::AuthoritativeHit { frame_index } => Some(frame_index),
            Self::PartialIndexFallbackHit { frame_index } => Some(frame_index),
            Self::AuthoritativeMiss | Self::PartialIndexFallbackMiss => None,
        }
    }

    /// Telemetry label for the lookup path that produced this resolution.
    #[must_use]
    const fn lookup_mode(self) -> &'static str {
        if matches!(
            self,
            Self::PartialIndexFallbackHit { .. } | Self::PartialIndexFallbackMiss
        ) {
            "partial_index_fallback"
        } else {
            "authoritative_index"
        }
    }

    /// Telemetry label explaining why (if at all) the fallback path ran.
    #[must_use]
    const fn fallback_reason(self) -> &'static str {
        if matches!(
            self,
            Self::PartialIndexFallbackHit { .. } | Self::PartialIndexFallbackMiss
        ) {
            "partial_index_cap"
        } else {
            "none"
        }
    }
}
97
/// Immutable visibility snapshot published for one WAL generation.
///
/// Readers pin one of these snapshots at transaction start so page lookups stay
/// bound to a stable committed horizon even if later commits advance the active
/// publication plane.
#[derive(Debug, Clone)]
struct WalPublishedSnapshot {
    /// Monotonic sequence number assigned when this snapshot was published.
    publication_seq: u64,
    /// WAL generation identity (checkpoint sequence + salts) the snapshot
    /// belongs to.
    generation: WalGenerationIdentity,
    /// Index of the newest committed frame visible in this snapshot, if any.
    last_commit_frame: Option<usize>,
    /// Number of commit frames within the visible prefix.
    commit_count: u64,
    /// Page number -> newest visible frame index. Shared via `Arc` so pinning
    /// a snapshot is a cheap clone.
    page_index: Arc<HashMap<u32, usize>>,
    /// True when the index hit its cap: a map miss cannot be trusted and must
    /// fall back to a linear WAL scan.
    index_is_partial: bool,
}

impl WalPublishedSnapshot {
    /// Snapshot with no visible frames (fresh or invalidated generation).
    #[must_use]
    fn empty(publication_seq: u64, generation: WalGenerationIdentity) -> Self {
        Self {
            publication_seq,
            generation,
            last_commit_frame: None,
            commit_count: 0,
            page_index: Arc::new(HashMap::new()),
            index_is_partial: false,
        }
    }
}
126
/// A frame appended after the last published commit horizon, buffered until
/// the commit path publishes the next snapshot.
#[derive(Debug, Clone, Copy)]
struct PendingPublicationFrame {
    /// Database page carried by the frame.
    page_number: u32,
    /// Absolute frame index in the WAL.
    frame_index: usize,
    /// True when the frame was appended with a nonzero `db_size_if_commit`,
    /// i.e. it is a commit frame.
    is_commit: bool,
}
133
/// Adapter wrapping [`WalFile`] to implement the pager's [`WalBackend`] trait.
///
/// The pager calls `dyn WalBackend` during WAL-mode commits and page reads.
/// This adapter delegates those calls to the concrete `WalFile<F>` from
/// `fsqlite-wal`.
pub struct WalBackendAdapter<F: VfsFile> {
    wal: WalFile<F>,
    /// Guard so commit-time append refresh runs only once per commit batch.
    refresh_before_append: bool,
    /// Active commit-published visibility plane for the current WAL generation.
    published_snapshot: WalPublishedSnapshot,
    /// Monotonic publication sequence assigned to the next published snapshot.
    next_publication_seq: u64,
    /// Transaction-bounded read snapshot pinned at `begin_transaction()`.
    read_snapshot: Option<WalPublishedSnapshot>,
    /// Frames appended after the last published commit horizon.
    pending_publication_frames: Vec<PendingPublicationFrame>,
    /// Optional FEC commit hook for encoding repair symbols on commit.
    #[cfg(not(target_arch = "wasm32"))]
    fec_hook: Option<FecCommitHook>,
    /// Accumulated FEC commit results (for later sidecar persistence).
    #[cfg(not(target_arch = "wasm32"))]
    fec_pending: Vec<FecCommitResult>,
    /// Maximum number of unique pages the index will track. Defaults to a
    /// full authoritative index in steady state. Tests can lower the cap to
    /// exercise the partial-index fallback path explicitly.
    page_index_cap: usize,
}
157
158impl<F: VfsFile> WalBackendAdapter<F> {
    /// Wrap an existing [`WalFile`] in the adapter (FEC disabled).
    #[must_use]
    pub fn new(wal: WalFile<F>) -> Self {
        // Capture the generation before `wal` moves into the struct.
        let generation = wal.generation_identity();
        Self {
            wal,
            // Force a refresh + snapshot sync before the first append batch.
            refresh_before_append: true,
            // Sequence 0 marks the initial empty snapshot; real publications
            // start at 1.
            published_snapshot: WalPublishedSnapshot::empty(0, generation),
            next_publication_seq: 1,
            read_snapshot: None,
            pending_publication_frames: Vec::new(),
            #[cfg(not(target_arch = "wasm32"))]
            fec_hook: None,
            #[cfg(not(target_arch = "wasm32"))]
            fec_pending: Vec::new(),
            page_index_cap: PAGE_INDEX_MAX_ENTRIES,
        }
    }
177
178    /// Wrap an existing [`WalFile`] with an FEC commit hook.
179    #[must_use]
180    #[cfg(not(target_arch = "wasm32"))]
181    pub fn with_fec_hook(wal: WalFile<F>, hook: FecCommitHook) -> Self {
182        let generation = wal.generation_identity();
183        Self {
184            wal,
185            refresh_before_append: true,
186            published_snapshot: WalPublishedSnapshot::empty(0, generation),
187            next_publication_seq: 1,
188            read_snapshot: None,
189            pending_publication_frames: Vec::new(),
190            fec_hook: Some(hook),
191            fec_pending: Vec::new(),
192            page_index_cap: PAGE_INDEX_MAX_ENTRIES,
193        }
194    }
195
    /// Consume the adapter and return the inner [`WalFile`].
    ///
    /// Any published or pinned snapshots are dropped with the adapter.
    #[must_use]
    pub fn into_inner(self) -> WalFile<F> {
        self.wal
    }
201
    /// Borrow the inner [`WalFile`] (read-only, snapshots stay valid).
    #[must_use]
    pub fn inner(&self) -> &WalFile<F> {
        &self.wal
    }
207
    /// Mutably borrow the inner [`WalFile`].
    ///
    /// Invalidates the publication plane since the caller may mutate WAL state.
    pub fn inner_mut(&mut self) -> &mut WalFile<F> {
        // Conservatively drop cached visibility: any mutation through the
        // returned reference could change what is committed or visible.
        self.invalidate_publication();
        &mut self.wal
    }
215
216    /// Discard published and pinned snapshots after external WAL mutation.
217    fn invalidate_publication(&mut self) {
218        self.read_snapshot = None;
219        self.pending_publication_frames.clear();
220        self.published_snapshot = WalPublishedSnapshot::empty(
221            self.published_snapshot.publication_seq,
222            self.published_snapshot.generation,
223        );
224    }
225
    /// Publish an immutable visibility snapshot for the current committed WAL prefix.
    ///
    /// The commit path advances this plane directly, and readers pin a clone of
    /// the published snapshot instead of mutating shared lookup state under an
    /// active transaction.
    ///
    /// Reuses the previous snapshot's page index incrementally when the
    /// generation is unchanged and the horizon only advanced; otherwise the
    /// index is rebuilt from frame 0. On error the stolen index is restored so
    /// a failed publish does not corrupt the existing plane.
    fn publish_visible_snapshot(
        &mut self,
        cx: &Cx,
        last_commit_frame: Option<usize>,
        scenario_id: &'static str,
    ) -> Result<()> {
        let generation = self.wal.generation_identity();
        // Fast path: same generation and same horizon means nothing to do.
        if self.published_snapshot.generation == generation
            && self.published_snapshot.last_commit_frame == last_commit_frame
        {
            return Ok(());
        }

        let previous_generation = self.published_snapshot.generation;
        let previous_last_commit = self.published_snapshot.last_commit_frame;
        // Previous commit count carries over only within the same generation.
        let previous_commit_count = if previous_generation == generation {
            self.published_snapshot.commit_count
        } else {
            0
        };
        // Steal (not clone) the previous index when it can be extended in
        // place; a generation change forces a fresh, empty index.
        let mut page_index = if previous_generation == generation {
            std::mem::replace(
                &mut self.published_snapshot.page_index,
                Arc::new(HashMap::new()),
            )
        } else {
            Arc::new(HashMap::new())
        };
        let mut index_is_partial = if previous_generation == generation {
            self.published_snapshot.index_is_partial
        } else {
            false
        };

        // Telemetry only: approximate number of frames covered by this update.
        let frame_delta_count = match (previous_last_commit, last_commit_frame) {
            (Some(prev), Some(curr)) if curr >= prev => curr.saturating_sub(prev),
            (Some(_) | None, Some(curr)) => curr.saturating_add(1),
            (Some(prev), None) => prev.saturating_add(1),
            (None, None) => 0,
        };

        let update_result = match last_commit_frame {
            // No visible commits: the index must be empty.
            None => {
                Arc::make_mut(&mut page_index).clear();
                index_is_partial = false;
                Ok(())
            }
            Some(current_last_commit) => {
                // Choose where to start scanning frame headers: extend from
                // the previous horizon when possible, else rebuild from 0.
                let start = match (previous_generation == generation, previous_last_commit) {
                    (true, Some(previous_last_commit))
                        if previous_last_commit < current_last_commit =>
                    {
                        previous_last_commit.saturating_add(1)
                    }
                    (true, Some(previous_last_commit))
                        if previous_last_commit == current_last_commit =>
                    {
                        // Horizon unchanged: empty range, nothing to scan.
                        current_last_commit.saturating_add(1)
                    }
                    _ => {
                        // Generation change or horizon regression: rebuild.
                        Arc::make_mut(&mut page_index).clear();
                        index_is_partial = false;
                        0
                    }
                };
                if start <= current_last_commit {
                    self.build_index_range(
                        cx,
                        Arc::make_mut(&mut page_index),
                        &mut index_is_partial,
                        start,
                        current_last_commit,
                    )
                } else {
                    Ok(())
                }
            }
        };
        // Recompute the commit count with the same extend-vs-rebuild logic.
        let commit_count_result = match last_commit_frame {
            None => Ok(0),
            Some(current_last_commit) => {
                match (previous_generation == generation, previous_last_commit) {
                    (true, Some(previous_last_commit))
                        if previous_last_commit < current_last_commit =>
                    {
                        self.count_commit_frames_in_range(
                            cx,
                            previous_last_commit.saturating_add(1),
                            current_last_commit,
                        )
                        .map(|delta| previous_commit_count.saturating_add(delta))
                    }
                    (true, Some(previous_last_commit))
                        if previous_last_commit == current_last_commit =>
                    {
                        Ok(previous_commit_count)
                    }
                    _ => self.count_commit_frames_in_range(cx, 0, current_last_commit),
                }
            }
        };
        // On failure, hand the stolen index back to the still-published
        // snapshot (only meaningful within the same generation).
        if let Err(error) = update_result {
            if previous_generation == generation {
                self.published_snapshot.page_index = page_index;
            }
            return Err(error);
        }
        let commit_count = match commit_count_result {
            Ok(commit_count) => commit_count,
            Err(error) => {
                if previous_generation == generation {
                    self.published_snapshot.page_index = page_index;
                }
                return Err(error);
            }
        };

        let publication_seq = self.next_publication_seq;
        self.next_publication_seq = self.next_publication_seq.saturating_add(1);
        let latest_frame_entries = page_index.len();
        self.published_snapshot = WalPublishedSnapshot {
            publication_seq,
            generation,
            last_commit_frame,
            commit_count,
            page_index,
            index_is_partial,
        };

        tracing::trace!(
            target: "fsqlite.wal_publication",
            trace_id = cx.trace_id(),
            run_id = "wal-publication",
            scenario_id,
            wal_generation = generation.checkpoint_seq,
            wal_salt1 = generation.salts.salt1,
            wal_salt2 = generation.salts.salt2,
            publication_seq,
            frame_delta_count,
            latest_frame_entries,
            snapshot_age = 0_u64,
            lookup_mode = "published_visibility_map",
            fallback_reason = if index_is_partial {
                "partial_index_cap"
            } else {
                "none"
            },
            "published WAL visibility snapshot"
        );

        Ok(())
    }
383
384    /// Resolve the most recent visible frame for `page_number`.
385    ///
386    /// The normal contract is `Authoritative*`: the published page index fully
387    /// covers the visible WAL generation, so a miss means the page is absent.
388    /// `PartialIndexFallback*` is a bounded slow-path used only when the capped
389    /// index is known to be incomplete.
390    fn resolve_visible_frame(
391        &self,
392        cx: &Cx,
393        snapshot: &WalPublishedSnapshot,
394        page_number: u32,
395    ) -> Result<WalPageLookupResolution> {
396        match snapshot.page_index.get(&page_number) {
397            Some(&frame_index) => Ok(WalPageLookupResolution::AuthoritativeHit { frame_index }),
398            None if !snapshot.index_is_partial => Ok(WalPageLookupResolution::AuthoritativeMiss),
399            None => match snapshot.last_commit_frame {
400                Some(last_commit_frame) => {
401                    match self.scan_backwards_for_page(cx, page_number, last_commit_frame)? {
402                        Some(frame_index) => {
403                            Ok(WalPageLookupResolution::PartialIndexFallbackHit { frame_index })
404                        }
405                        None => Ok(WalPageLookupResolution::PartialIndexFallbackMiss),
406                    }
407                }
408                None => Ok(WalPageLookupResolution::AuthoritativeMiss),
409            },
410        }
411    }
412
    /// Scan frame headers from `start..=end` (inclusive) and populate the page index.
    ///
    /// Since we scan forward, later frames naturally overwrite earlier entries
    /// for the same page number, ensuring "newest frame wins" semantics.
    ///
    /// Respects `page_index_cap`: once the map is full, frames for untracked
    /// pages are dropped and `index_is_partial` is set so readers know the
    /// index is incomplete.
    fn build_index_range(
        &self,
        cx: &Cx,
        page_index: &mut HashMap<u32, usize>,
        index_is_partial: &mut bool,
        start: usize,
        end: usize,
    ) -> Result<()> {
        for frame_index in start..=end {
            let header = self.wal.read_frame_header(cx, frame_index)?;
            // Only insert if we haven't hit the capacity cap, or if this page
            // is already tracked (update is free).
            if page_index.len() < self.page_index_cap
                || page_index.contains_key(&header.page_number)
            {
                page_index.insert(header.page_number, frame_index);
            } else {
                // A page was dropped because the index is full -- mark it as
                // partial so that `read_page` knows a HashMap miss cannot be
                // trusted and must fall back to a linear scan.
                *index_is_partial = true;
            }
        }
        Ok(())
    }
442
443    /// Count commit frames within the visible range `start..=end`.
444    fn count_commit_frames_in_range(&self, cx: &Cx, start: usize, end: usize) -> Result<u64> {
445        if start > end {
446            return Ok(0);
447        }
448
449        let mut commit_count = 0_u64;
450        for frame_index in start..=end {
451            if self.wal.read_frame_header(cx, frame_index)?.is_commit() {
452                commit_count = commit_count.saturating_add(1);
453            }
454        }
455        Ok(commit_count)
456    }
457
458    /// Backwards linear scan of committed frames to find a page that was not
459    /// captured by the capped page index.
460    ///
461    /// Scans from `last_commit_frame` down to frame 0 and returns the index
462    /// of the first (i.e., most recent) frame containing `page_number`, or
463    /// `None` if the page is not in the WAL at all.
464    fn scan_backwards_for_page(
465        &self,
466        cx: &Cx,
467        page_number: u32,
468        last_commit_frame: usize,
469    ) -> Result<Option<usize>> {
470        for frame_index in (0..=last_commit_frame).rev() {
471            let header = self.wal.read_frame_header(cx, frame_index)?;
472            if header.page_number == page_number {
473                return Ok(Some(frame_index));
474            }
475        }
476        Ok(None)
477    }
478
    /// Take any pending FEC commit results for sidecar persistence.
    ///
    /// Drains the internal buffer, leaving it empty for the next commit.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn take_fec_pending(&mut self) -> Vec<FecCommitResult> {
        std::mem::take(&mut self.fec_pending)
    }
484
    /// Whether FEC encoding is active.
    ///
    /// True only when a hook is installed and reports itself enabled.
    #[must_use]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn fec_enabled(&self) -> bool {
        self.fec_hook
            .as_ref()
            .is_some_and(FecCommitHook::is_enabled)
    }
493
    /// Discard buffered FEC pages (e.g. on transaction rollback).
    ///
    /// No-op when no hook is installed.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn fec_discard(&mut self) {
        if let Some(hook) = &mut self.fec_hook {
            hook.discard_buffered();
        }
    }
501
    /// Override the page index capacity (for testing only).
    ///
    /// Used by tests to force the partial-index fallback path.
    #[cfg(test)]
    fn set_page_index_cap(&mut self, cap: usize) {
        self.page_index_cap = cap;
        // Invalidate so the next read rebuilds with the new cap.
        self.invalidate_publication();
    }
509
    /// Snapshot of the WAL state a prepared frame batch is finalized against:
    /// generation identity (checkpoint seq + salts), the next free frame
    /// index, and the current running checksum seed.
    #[must_use]
    fn current_prepared_finalization_state(&self) -> PreparedWalFinalizationState {
        let generation = self.wal.generation_identity();
        let seed = self.wal.running_checksum();
        PreparedWalFinalizationState {
            checkpoint_seq: generation.checkpoint_seq,
            salt1: generation.salts.salt1,
            salt2: generation.salts.salt2,
            // New frames would be appended starting at the current frame count.
            start_frame_index: self.wal.frame_count(),
            seed: PreparedWalChecksumSeed {
                s1: seed.s1,
                s2: seed.s2,
            },
        }
    }
525
526    #[must_use]
527    fn prepared_batch_matches_current_state(&self, prepared: &PreparedWalFrameBatch) -> bool {
528        prepared
529            .finalized_for
530            .is_some_and(|state| state == self.current_prepared_finalization_state())
531    }
532
    /// Whether `prepared`'s recorded finalization state still matches the
    /// on-disk WAL (same generation and append window).
    fn prepared_batch_matches_disk_state(
        &self,
        cx: &Cx,
        prepared: &PreparedWalFrameBatch,
    ) -> Result<bool> {
        // A batch that was never finalized cannot match anything.
        let Some(state) = prepared.finalized_for else {
            return Ok(false);
        };
        // Reconstruct the generation identity the batch was finalized for.
        let generation = WalGenerationIdentity {
            checkpoint_seq: state.checkpoint_seq,
            salts: fsqlite_wal::checksum::WalSalts {
                salt1: state.salt1,
                salt2: state.salt2,
            },
        };
        self.wal
            .prepared_append_window_still_current(cx, generation, state.start_frame_index)
    }
551
552    fn checksum_transforms_for_prepared(
553        prepared: &PreparedWalFrameBatch,
554    ) -> Vec<WalChecksumTransform> {
555        prepared
556            .checksum_transforms
557            .iter()
558            .map(|transform| WalChecksumTransform {
559                a11: transform.a11,
560                a12: transform.a12,
561                a21: transform.a21,
562                a22: transform.a22,
563                c1: transform.c1,
564                c2: transform.c2,
565            })
566            .collect()
567    }
568
    /// Finalize a prepared batch against the current WAL state: patch the
    /// frame bytes' checksums in place and record the state and resulting
    /// running checksum so later appends can validate the batch is still
    /// current.
    fn finalize_prepared_batch_against_current_state(
        &self,
        prepared: &mut PreparedWalFrameBatch,
    ) -> Result<()> {
        let checksum_transforms = Self::checksum_transforms_for_prepared(prepared);
        let final_running_checksum = self
            .wal
            .finalize_prepared_frame_bytes(&mut prepared.frame_bytes, &checksum_transforms)?;
        // Record what we finalized against for the match checks above.
        prepared.finalized_for = Some(self.current_prepared_finalization_state());
        prepared.finalized_running_checksum = Some(PreparedWalChecksumSeed {
            s1: final_running_checksum.s1,
            s2: final_running_checksum.s2,
        });
        Ok(())
    }
584
585    fn finalized_running_checksum(prepared: &PreparedWalFrameBatch) -> Result<SqliteWalChecksum> {
586        let Some(checksum) = prepared.finalized_running_checksum else {
587            return Err(FrankenError::internal(
588                "prepared WAL batch missing finalized running checksum",
589            ));
590        };
591        Ok(SqliteWalChecksum {
592            s1: checksum.s1,
593            s2: checksum.s2,
594        })
595    }
596
    /// Publish a snapshot at the WAL's current last-commit frame.
    fn publish_latest_committed_snapshot(
        &mut self,
        cx: &Cx,
        scenario_id: &'static str,
    ) -> Result<()> {
        let last_commit_frame = self.wal.last_commit_frame(cx)?;
        self.publish_visible_snapshot(cx, last_commit_frame, scenario_id)
    }
605
    /// Refresh WAL state and republish the committed snapshot before the
    /// first append of a commit batch.
    fn synchronize_publication_before_append(
        &mut self,
        cx: &Cx,
        scenario_id: &'static str,
    ) -> Result<()> {
        self.wal.refresh(cx)?;
        // Frames recorded before the refresh may be stale -- drop them.
        self.pending_publication_frames.clear();
        self.publish_latest_committed_snapshot(cx, scenario_id)
    }
615
616    fn record_appended_frames<I>(&mut self, start_frame_index: usize, frames: I) -> Option<usize>
617    where
618        I: IntoIterator<Item = (u32, u32)>,
619    {
620        let mut last_commit_frame = None;
621        for (offset, (page_number, db_size_if_commit)) in frames.into_iter().enumerate() {
622            let frame_index = start_frame_index.saturating_add(offset);
623            self.pending_publication_frames
624                .push(PendingPublicationFrame {
625                    page_number,
626                    frame_index,
627                    is_commit: db_size_if_commit != 0,
628                });
629            if db_size_if_commit != 0 {
630                last_commit_frame = Some(frame_index);
631            }
632        }
633        last_commit_frame
634    }
635
    /// Publish a snapshot directly from the commit path using the frames
    /// buffered in `pending_publication_frames`, avoiding a WAL re-scan.
    ///
    /// Falls back to `publish_visible_snapshot` when the buffer contributed no
    /// usable deltas for the new horizon.
    fn publish_pending_commit_snapshot(
        &mut self,
        cx: &Cx,
        last_commit_frame: usize,
        scenario_id: &'static str,
    ) -> Result<()> {
        let generation = self.wal.generation_identity();
        let previous_last_commit = self.published_snapshot.last_commit_frame;
        // The previous index can be extended in place only within the same
        // generation and when the horizon strictly advances.
        let can_extend_previous = self.published_snapshot.generation == generation
            && self
                .published_snapshot
                .last_commit_frame
                .is_none_or(|previous_last_commit| previous_last_commit < last_commit_frame);
        // Steal (not clone) the previous index when extending; otherwise start
        // from an empty map.
        let mut page_index = if can_extend_previous {
            std::mem::replace(
                &mut self.published_snapshot.page_index,
                Arc::new(HashMap::new()),
            )
        } else {
            Arc::new(HashMap::new())
        };
        let mut index_is_partial = if can_extend_previous {
            self.published_snapshot.index_is_partial
        } else {
            false
        };
        let previous_last_commit = if can_extend_previous {
            previous_last_commit
        } else {
            None
        };
        let previous_commit_count = if can_extend_previous {
            self.published_snapshot.commit_count
        } else {
            0
        };

        let mut frame_delta_count = 0_usize;
        let mut commit_delta_count = 0_u64;
        for frame in &self.pending_publication_frames {
            // Skip frames already covered by the previous horizon or beyond
            // the commit frame being published.
            if previous_last_commit
                .is_some_and(|previous_last_commit| frame.frame_index <= previous_last_commit)
                || frame.frame_index > last_commit_frame
            {
                continue;
            }

            frame_delta_count = frame_delta_count.saturating_add(1);
            let page_index_map = Arc::make_mut(&mut page_index);
            // Same cap policy as `build_index_range`: updates for already
            // tracked pages are free; new pages only while under the cap.
            if page_index_map.len() < self.page_index_cap
                || page_index_map.contains_key(&frame.page_number)
            {
                page_index_map.insert(frame.page_number, frame.frame_index);
            } else {
                index_is_partial = true;
            }
            if frame.is_commit {
                commit_delta_count = commit_delta_count.saturating_add(1);
            }
        }

        // No usable deltas: fall back to the scan-based publication path.
        if frame_delta_count == 0 {
            self.pending_publication_frames.clear();
            return self.publish_visible_snapshot(cx, Some(last_commit_frame), scenario_id);
        }

        let publication_seq = self.next_publication_seq;
        self.next_publication_seq = self.next_publication_seq.saturating_add(1);
        let latest_frame_entries = page_index.len();
        self.published_snapshot = WalPublishedSnapshot {
            publication_seq,
            generation,
            last_commit_frame: Some(last_commit_frame),
            commit_count: previous_commit_count.saturating_add(commit_delta_count),
            page_index,
            index_is_partial,
        };
        self.pending_publication_frames.clear();

        tracing::trace!(
            target: "fsqlite.wal_publication",
            trace_id = cx.trace_id(),
            run_id = "wal-publication",
            scenario_id,
            wal_generation = generation.checkpoint_seq,
            wal_salt1 = generation.salts.salt1,
            wal_salt2 = generation.salts.salt2,
            publication_seq,
            frame_delta_count,
            latest_frame_entries,
            snapshot_age = 0_u64,
            lookup_mode = "published_visibility_map",
            fallback_reason = if index_is_partial {
                "partial_index_cap"
            } else {
                "none"
            },
            "published WAL visibility snapshot from commit path"
        );

        Ok(())
    }
738}
739
740/// Convert pager checkpoint mode to WAL checkpoint mode.
741fn to_wal_mode(mode: CheckpointMode) -> WalCheckpointMode {
742    match mode {
743        CheckpointMode::Passive => WalCheckpointMode::Passive,
744        CheckpointMode::Full => WalCheckpointMode::Full,
745        CheckpointMode::Restart => WalCheckpointMode::Restart,
746        CheckpointMode::Truncate => WalCheckpointMode::Truncate,
747    }
748}
749
750impl<F: VfsFile> WalBackend for WalBackendAdapter<F> {
    /// Start a read transaction: refresh the WAL once, publish the committed
    /// snapshot, and pin it for the transaction's lifetime.
    fn begin_transaction(&mut self, cx: &Cx) -> Result<()> {
        // Establish a transaction-bounded snapshot once, instead of doing an
        // expensive refresh for every page read.
        self.wal.refresh(cx)?;
        self.publish_latest_committed_snapshot(cx, "begin_transaction")?;
        // Pin the freshly published plane for this transaction's reads.
        self.read_snapshot = Some(self.published_snapshot.clone());
        // The next append batch must re-synchronize before writing.
        self.refresh_before_append = true;
        Ok(())
    }
760
    /// Append a single frame; a nonzero `db_size_if_commit` marks it as a
    /// commit frame and immediately publishes the new visibility snapshot.
    fn append_frame(
        &mut self,
        cx: &Cx,
        page_number: u32,
        page_data: &[u8],
        db_size_if_commit: u32,
    ) -> Result<()> {
        if self.refresh_before_append {
            // Refresh and synchronize the published base snapshot once before
            // the commit batch starts, then publish local frame deltas directly
            // from the append path.
            self.synchronize_publication_before_append(cx, "append_frame_pre_refresh")?;
        }
        // Capture where this frame will land before appending.
        let start_frame_index = self.wal.frame_count();
        self.wal
            .append_frame(cx, page_number, page_data, db_size_if_commit)?;
        self.refresh_before_append = false;
        let last_commit_frame =
            self.record_appended_frames(start_frame_index, [(page_number, db_size_if_commit)]);

        // Feed the frame to the FEC hook.  On commit, it encodes repair
        // symbols and stores them for later sidecar persistence.
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(hook) = &mut self.fec_hook {
            match hook.on_frame(cx, page_number, page_data, db_size_if_commit) {
                Ok(Some(result)) => {
                    debug!(
                        pages = result.page_numbers.len(),
                        k_source = result.k_source,
                        symbols = result.symbols.len(),
                        "FEC commit group encoded"
                    );
                    self.fec_pending.push(result);
                }
                Ok(None) => {}
                Err(e) => {
                    // FEC encoding failure is non-fatal -- log and continue.
                    warn!(error = %e, "FEC encoding failed; commit proceeds without repair symbols");
                }
            }
        }

        // Commit frame: publish the new horizon from the buffered deltas.
        if let Some(last_commit_frame) = last_commit_frame {
            self.publish_pending_commit_snapshot(cx, last_commit_frame, "append_frame_commit")?;
        }

        Ok(())
    }
809
810    fn append_frames(&mut self, cx: &Cx, frames: &[WalFrameRef<'_>]) -> Result<()> {
811        if frames.is_empty() {
812            return Ok(());
813        }
814
815        if self.refresh_before_append {
816            self.synchronize_publication_before_append(cx, "append_frames_pre_refresh")?;
817        }
818
819        let start_frame_index = self.wal.frame_count();
820        let mut wal_frames = Vec::with_capacity(frames.len());
821        for frame in frames {
822            wal_frames.push(WalAppendFrameRef {
823                page_number: frame.page_number,
824                page_data: frame.page_data,
825                db_size_if_commit: frame.db_size_if_commit,
826            });
827        }
828        self.wal.append_frames(cx, &wal_frames)?;
829        self.refresh_before_append = false;
830        let last_commit_frame = self.record_appended_frames(
831            start_frame_index,
832            frames
833                .iter()
834                .map(|frame| (frame.page_number, frame.db_size_if_commit)),
835        );
836
837        #[cfg(not(target_arch = "wasm32"))]
838        if let Some(hook) = &mut self.fec_hook {
839            for frame in frames {
840                match hook.on_frame(
841                    cx,
842                    frame.page_number,
843                    frame.page_data,
844                    frame.db_size_if_commit,
845                ) {
846                    Ok(Some(result)) => {
847                        debug!(
848                            pages = result.page_numbers.len(),
849                            k_source = result.k_source,
850                            symbols = result.symbols.len(),
851                            "FEC commit group encoded"
852                        );
853                        self.fec_pending.push(result);
854                    }
855                    Ok(None) => {}
856                    Err(e) => {
857                        warn!(
858                            error = %e,
859                            "FEC encoding failed; commit proceeds without repair symbols"
860                        );
861                    }
862                }
863            }
864        }
865
866        if let Some(last_commit_frame) = last_commit_frame {
867            self.publish_pending_commit_snapshot(cx, last_commit_frame, "append_frames_commit")?;
868        }
869
870        Ok(())
871    }
872
873    fn prepare_append_frames(
874        &mut self,
875        frames: &[WalFrameRef<'_>],
876    ) -> Result<Option<PreparedWalFrameBatch>> {
877        if frames.is_empty() {
878            return Ok(None);
879        }
880
881        let wal_frames: Vec<_> = frames
882            .iter()
883            .map(|frame| WalAppendFrameRef {
884                page_number: frame.page_number,
885                page_data: frame.page_data,
886                db_size_if_commit: frame.db_size_if_commit,
887            })
888            .collect();
889        let frame_bytes = self.wal.prepare_frame_bytes(&wal_frames)?;
890        let checksum_transforms = frame_bytes
891            .chunks_exact(self.wal.frame_size())
892            .map(|frame| {
893                WalChecksumTransform::for_wal_frame(
894                    frame,
895                    self.wal.page_size(),
896                    self.wal.big_endian_checksum(),
897                )
898            })
899            .map(|result| {
900                result.map(|transform| PreparedWalChecksumTransform {
901                    a11: transform.a11,
902                    a12: transform.a12,
903                    a21: transform.a21,
904                    a22: transform.a22,
905                    c1: transform.c1,
906                    c2: transform.c2,
907                })
908            })
909            .collect::<Result<Vec<_>>>()?;
910        let frame_metas = frames
911            .iter()
912            .map(|frame| PreparedWalFrameMeta {
913                page_number: frame.page_number,
914                db_size_if_commit: frame.db_size_if_commit,
915            })
916            .collect();
917        let last_commit_frame_offset = frames
918            .iter()
919            .enumerate()
920            .rev()
921            .find_map(|(offset, frame)| (frame.db_size_if_commit != 0).then_some(offset));
922
923        Ok(Some(PreparedWalFrameBatch {
924            frame_size: self.wal.frame_size(),
925            page_data_offset: WAL_FRAME_HEADER_SIZE,
926            frame_metas,
927            checksum_transforms,
928            frame_bytes,
929            last_commit_frame_offset,
930            finalized_for: None,
931            finalized_running_checksum: None,
932        }))
933    }
934
935    fn finalize_prepared_frames(
936        &mut self,
937        _cx: &Cx,
938        prepared: &mut PreparedWalFrameBatch,
939    ) -> Result<()> {
940        if prepared.frame_count() == 0 {
941            return Ok(());
942        }
943        // Optimistically finalize against the adapter's current WAL state.
944        // The append path still validates against both local and on-disk state
945        // and will refresh/reseed if another writer advanced the append window.
946        self.finalize_prepared_batch_against_current_state(prepared)
947    }
948
    fn append_prepared_frames(
        &mut self,
        cx: &Cx,
        prepared: &mut PreparedWalFrameBatch,
    ) -> Result<()> {
        if prepared.frame_count() == 0 {
            return Ok(());
        }

        // Fast path: a pre-lock `finalize_prepared_frames` may already have
        // sealed this batch.  Reuse that finalization only when the append
        // window is provably stable -- the batch still matches both this
        // handle's in-memory state and the on-disk WAL.
        let can_reuse_prelock_finalize = self.refresh_before_append
            && self.prepared_batch_matches_current_state(prepared)
            && self.prepared_batch_matches_disk_state(cx, prepared)?;
        if self.refresh_before_append && !can_reuse_prelock_finalize {
            self.synchronize_publication_before_append(cx, "append_prepared_pre_refresh")?;
        }

        // Reseed the batch if its finalization no longer matches the
        // (possibly just refreshed) current WAL state.
        if !self.prepared_batch_matches_current_state(prepared) {
            self.finalize_prepared_batch_against_current_state(prepared)?;
        }

        // Append the pre-serialized frame bytes with the finalized running
        // checksum, then record the new frames in the local page index.
        let start_frame_index = self.wal.frame_count();
        self.wal.append_finalized_prepared_frame_bytes(
            cx,
            &prepared.frame_bytes,
            prepared.frame_count(),
            Self::finalized_running_checksum(prepared)?,
            prepared.last_commit_frame_offset,
        )?;
        self.refresh_before_append = false;
        let last_commit_frame = self.record_appended_frames(
            start_frame_index,
            prepared
                .frame_metas
                .iter()
                .map(|frame| (frame.page_number, frame.db_size_if_commit)),
        );

        // Feed every appended frame to the FEC hook; commit boundaries yield
        // encoded repair symbols buffered for later sidecar persistence.
        // Encoding failures are non-fatal.
        #[cfg(not(target_arch = "wasm32"))]
        if let Some(hook) = &mut self.fec_hook {
            for (index, frame) in prepared.frame_metas.iter().enumerate() {
                match hook.on_frame(
                    cx,
                    frame.page_number,
                    prepared.page_data(index),
                    frame.db_size_if_commit,
                ) {
                    Ok(Some(result)) => {
                        debug!(
                            pages = result.page_numbers.len(),
                            k_source = result.k_source,
                            symbols = result.symbols.len(),
                            "FEC commit group encoded"
                        );
                        self.fec_pending.push(result);
                    }
                    Ok(None) => {}
                    Err(e) => {
                        warn!(
                            error = %e,
                            "FEC encoding failed; commit proceeds without repair symbols"
                        );
                    }
                }
            }
        }

        // A commit frame in this batch makes the transaction visible.
        if let Some(last_commit_frame) = last_commit_frame {
            self.publish_pending_commit_snapshot(
                cx,
                last_commit_frame,
                "append_prepared_frames_commit",
            )?;
        }

        Ok(())
    }
1025
1026    fn read_page(&mut self, cx: &Cx, page_number: u32) -> Result<Option<Vec<u8>>> {
1027        let snapshot = if let Some(snapshot) = self.read_snapshot.clone() {
1028            snapshot
1029        } else {
1030            self.publish_latest_committed_snapshot(cx, "read_page_unpinned")?;
1031            self.published_snapshot.clone()
1032        };
1033        if snapshot.last_commit_frame.is_none() {
1034            return Ok(None);
1035        }
1036        let snapshot_age = self
1037            .published_snapshot
1038            .publication_seq
1039            .saturating_sub(snapshot.publication_seq);
1040
1041        let resolution = self.resolve_visible_frame(cx, &snapshot, page_number)?;
1042        let Some(frame_index) = resolution.frame_index() else {
1043            debug!(
1044                page_number,
1045                wal_checkpoint_seq = snapshot.generation.checkpoint_seq,
1046                wal_salt1 = snapshot.generation.salts.salt1,
1047                wal_salt2 = snapshot.generation.salts.salt2,
1048                publication_seq = snapshot.publication_seq,
1049                snapshot_age,
1050                lookup_mode = resolution.lookup_mode(),
1051                fallback_reason = resolution.fallback_reason(),
1052                "WAL adapter: page absent from current generation"
1053            );
1054            return Ok(None);
1055        };
1056
1057        // Read the frame data at the resolved position.
1058        let mut frame_buf = vec![0u8; self.wal.frame_size()];
1059        let header = self.wal.read_frame_into(cx, frame_index, &mut frame_buf)?;
1060
1061        // Runtime integrity check: verify the frame actually contains our page.
1062        // This guards against index corruption or stale entries.
1063        if header.page_number != page_number {
1064            return Err(FrankenError::WalCorrupt {
1065                detail: format!(
1066                    "WAL page index integrity failure: expected page {page_number} \
1067                     at frame {frame_index}, found page {}",
1068                    header.page_number
1069                ),
1070            });
1071        }
1072
1073        let data = frame_buf[fsqlite_wal::checksum::WAL_FRAME_HEADER_SIZE..].to_vec();
1074        debug!(
1075            page_number,
1076            frame_index,
1077            wal_checkpoint_seq = snapshot.generation.checkpoint_seq,
1078            wal_salt1 = snapshot.generation.salts.salt1,
1079            wal_salt2 = snapshot.generation.salts.salt2,
1080            publication_seq = snapshot.publication_seq,
1081            snapshot_age,
1082            lookup_mode = resolution.lookup_mode(),
1083            fallback_reason = resolution.fallback_reason(),
1084            "WAL adapter: resolved page from current WAL generation"
1085        );
1086        Ok(Some(data))
1087    }
1088
    fn committed_txns_since_page(&mut self, cx: &Cx, page_number: u32) -> Result<u64> {
        // Pin to the transaction's read snapshot when one exists; otherwise
        // publish (and count against) the latest committed state.
        let snapshot = if let Some(snapshot) = self.read_snapshot.clone() {
            snapshot
        } else {
            self.publish_latest_committed_snapshot(cx, "committed_txns_since_page")?;
            self.published_snapshot.clone()
        };
        // No commit in the snapshot: trivially zero transactions since.
        let Some(last_commit_frame) = snapshot.last_commit_frame else {
            return Ok(0);
        };

        let resolution = self.resolve_visible_frame(cx, &snapshot, page_number)?;
        // If the page has no frame in the current generation, every commit
        // in the visible WAL counts as "since" it.
        let Some(last_page_frame) = resolution.frame_index() else {
            let mut total_commits = 0_u64;
            for frame_index in 0..=last_commit_frame {
                if self.wal.read_frame_header(cx, frame_index)?.is_commit() {
                    total_commits = total_commits.saturating_add(1);
                }
            }
            return Ok(total_commits);
        };

        // Find the commit frame that made the page's latest version durable:
        // the first commit at or after the page's frame.
        let mut page_commit_frame = None;
        for frame_index in last_page_frame..=last_commit_frame {
            if self.wal.read_frame_header(cx, frame_index)?.is_commit() {
                page_commit_frame = Some(frame_index);
                break;
            }
        }

        // No commit at or after the page's frame: nothing committed since.
        let Some(page_commit_frame) = page_commit_frame else {
            return Ok(0);
        };

        // Count commit frames strictly after the page's own commit frame.
        let mut committed_txns_after_page = 0_u64;
        for frame_index in page_commit_frame.saturating_add(1)..=last_commit_frame {
            if self.wal.read_frame_header(cx, frame_index)?.is_commit() {
                committed_txns_after_page = committed_txns_after_page.saturating_add(1);
            }
        }

        Ok(committed_txns_after_page)
    }
1132
1133    fn committed_txn_count(&mut self, cx: &Cx) -> Result<u64> {
1134        let snapshot = if let Some(snapshot) = self.read_snapshot.clone() {
1135            snapshot
1136        } else {
1137            self.publish_latest_committed_snapshot(cx, "committed_txn_count")?;
1138            self.published_snapshot.clone()
1139        };
1140        Ok(snapshot.commit_count)
1141    }
1142
1143    fn sync(&mut self, cx: &Cx) -> Result<()> {
1144        let result = self.wal.sync(cx, SyncFlags::NORMAL);
1145        self.refresh_before_append = true;
1146        result
1147    }
1148
    // Frame count as seen by this handle's in-memory WAL state.
    // NOTE(review): other handles' appends appear here only after a refresh
    // (see `checkpoint`/`sync`) -- confirm against `WalFile::frame_count`.
    fn frame_count(&self) -> usize {
        self.wal.frame_count()
    }
1152
1153    fn checkpoint(
1154        &mut self,
1155        cx: &Cx,
1156        mode: CheckpointMode,
1157        writer: &mut dyn CheckpointPageWriter,
1158        backfilled_frames: u32,
1159        oldest_reader_frame: Option<u32>,
1160    ) -> Result<CheckpointResult> {
1161        // Refresh so planner state reflects the latest on-disk WAL shape.
1162        self.wal.refresh(cx)?;
1163        self.refresh_before_append = true;
1164        let total_frames = u32::try_from(self.wal.frame_count()).unwrap_or(u32::MAX);
1165
1166        // Build checkpoint state for the planner.
1167        let state = CheckpointState {
1168            total_frames,
1169            backfilled_frames,
1170            oldest_reader_frame,
1171        };
1172
1173        // Wrap the CheckpointPageWriter in a CheckpointTargetAdapter.
1174        let mut target = CheckpointTargetAdapterRef { writer };
1175
1176        // Execute the checkpoint.
1177        let result = execute_checkpoint(cx, &mut self.wal, to_wal_mode(mode), state, &mut target)?;
1178
1179        // Checkpoint-aware FEC lifecycle: once frames are backfilled to the
1180        // database file, their FEC symbols are no longer needed.  Clear
1181        // pending FEC results for the checkpointed range.
1182        #[cfg(not(target_arch = "wasm32"))]
1183        if result.frames_backfilled > 0 {
1184            let drained = self.fec_pending.len();
1185            self.fec_pending.clear();
1186            if drained > 0 {
1187                debug!(
1188                    drained_groups = drained,
1189                    frames_backfilled = result.frames_backfilled,
1190                    "FEC symbols reclaimed after checkpoint"
1191                );
1192            }
1193        }
1194
1195        // If the WAL was fully reset, also discard any buffered FEC pages
1196        // and invalidate the page index (salts changed).
1197        #[cfg(not(target_arch = "wasm32"))]
1198        if result.wal_was_reset {
1199            self.fec_discard();
1200        }
1201        if result.wal_was_reset {
1202            self.invalidate_publication();
1203        }
1204
1205        self.publish_latest_committed_snapshot(cx, "checkpoint")?;
1206
1207        Ok(CheckpointResult {
1208            total_frames,
1209            frames_backfilled: result.frames_backfilled,
1210            completed: result.plan.completes_checkpoint(),
1211            wal_was_reset: result.wal_was_reset,
1212        })
1213    }
1214}
1215
/// Adapter wrapping a `&mut dyn CheckpointPageWriter` to implement `CheckpointTarget`.
///
/// This is used internally by `WalBackendAdapter::checkpoint` to bridge the
/// pager's writer to the WAL executor's target trait.
struct CheckpointTargetAdapterRef<'a> {
    /// Pager-side sink that receives backfilled pages during a checkpoint.
    writer: &'a mut dyn CheckpointPageWriter,
}
1223
// Pure delegation: each WAL-executor callback forwards to the pager writer.
// Note the trait method names (`truncate_db`/`sync_db`) differ from the
// writer's (`truncate`/`sync`).
impl CheckpointTarget for CheckpointTargetAdapterRef<'_> {
    // Backfill one page of WAL content into the database file.
    fn write_page(&mut self, cx: &Cx, page_no: PageNumber, data: &[u8]) -> Result<()> {
        self.writer.write_page(cx, page_no, data)
    }

    // Shrink the database file to `n_pages` pages.
    fn truncate_db(&mut self, cx: &Cx, n_pages: u32) -> Result<()> {
        self.writer.truncate(cx, n_pages)
    }

    // Durably sync the database file.
    fn sync_db(&mut self, cx: &Cx) -> Result<()> {
        self.writer.sync(cx)
    }
}
1237
1238// ---------------------------------------------------------------------------
1239// Tests
1240// ---------------------------------------------------------------------------
1241
1242#[cfg(test)]
1243mod tests {
1244    use std::sync::OnceLock;
1245
1246    use fsqlite_pager::MockCheckpointPageWriter;
1247    use fsqlite_pager::traits::WalFrameRef;
1248    use fsqlite_types::flags::VfsOpenFlags;
1249    use fsqlite_vfs::MemoryVfs;
1250    use fsqlite_vfs::traits::Vfs;
1251    use fsqlite_wal::checksum::WalSalts;
1252
1253    use super::*;
1254
1255    const PAGE_SIZE: u32 = 4096;
1256
1257    fn init_wal_publication_test_tracing() {
1258        static TRACING_INIT: OnceLock<()> = OnceLock::new();
1259        TRACING_INIT.get_or_init(|| {
1260            if tracing_subscriber::fmt()
1261                .with_ansi(false)
1262                .with_max_level(tracing::Level::TRACE)
1263                .with_test_writer()
1264                .try_init()
1265                .is_err()
1266            {
1267                // Another test already installed a global subscriber.
1268            }
1269        });
1270    }
1271
    // Fresh default context for driving adapter calls in tests.
    fn test_cx() -> Cx {
        Cx::default()
    }
1275
    // Fixed, recognizable salt pair so test WAL generations are deterministic.
    fn test_salts() -> WalSalts {
        WalSalts {
            salt1: 0xDEAD_BEEF,
            salt2: 0xCAFE_BABE,
        }
    }
1282
1283    fn sample_page(seed: u8) -> Vec<u8> {
1284        let page_size = usize::try_from(PAGE_SIZE).expect("page size fits usize");
1285        let mut page = vec![0u8; page_size];
1286        for (i, byte) in page.iter_mut().enumerate() {
1287            let reduced = u8::try_from(i % 251).expect("modulo fits u8");
1288            *byte = reduced ^ seed;
1289        }
1290        page
1291    }
1292
    // Open (creating if absent) the shared `test.db-wal` file on `vfs`.
    fn open_wal_file(vfs: &MemoryVfs, cx: &Cx) -> <MemoryVfs as Vfs>::File {
        let flags = VfsOpenFlags::READWRITE | VfsOpenFlags::CREATE | VfsOpenFlags::WAL;
        let (file, _) = vfs
            .open(cx, Some(std::path::Path::new("test.db-wal")), flags)
            .expect("open WAL file");
        file
    }
1300
    // Create a brand-new WAL on `vfs` and wrap it in the adapter under test.
    fn make_adapter(vfs: &MemoryVfs, cx: &Cx) -> WalBackendAdapter<<MemoryVfs as Vfs>::File> {
        let file = open_wal_file(vfs, cx);
        let wal = WalFile::create(cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
        WalBackendAdapter::new(wal)
    }
1306
1307    // -- WalBackendAdapter tests --
1308
1309    #[test]
1310    fn test_adapter_append_and_frame_count() {
1311        let cx = test_cx();
1312        let vfs = MemoryVfs::new();
1313        let mut adapter = make_adapter(&vfs, &cx);
1314
1315        assert_eq!(adapter.frame_count(), 0);
1316
1317        let page = sample_page(0x42);
1318        adapter
1319            .append_frame(&cx, 1, &page, 0)
1320            .expect("append frame");
1321        assert_eq!(adapter.frame_count(), 1);
1322
1323        adapter
1324            .append_frame(&cx, 2, &sample_page(0x43), 2)
1325            .expect("append commit frame");
1326        assert_eq!(adapter.frame_count(), 2);
1327    }
1328
1329    #[test]
1330    fn test_adapter_read_page_found() {
1331        let cx = test_cx();
1332        let vfs = MemoryVfs::new();
1333        let mut adapter = make_adapter(&vfs, &cx);
1334
1335        let page1 = sample_page(0x10);
1336        let page2 = sample_page(0x20);
1337        adapter.append_frame(&cx, 1, &page1, 0).expect("append");
1338        adapter
1339            .append_frame(&cx, 2, &page2, 2)
1340            .expect("append commit");
1341
1342        let result = adapter.read_page(&cx, 1).expect("read page 1");
1343        assert_eq!(result, Some(page1));
1344
1345        let result = adapter.read_page(&cx, 2).expect("read page 2");
1346        assert_eq!(result, Some(page2));
1347    }
1348
1349    #[test]
1350    fn test_adapter_read_page_not_found() {
1351        let cx = test_cx();
1352        let vfs = MemoryVfs::new();
1353        let mut adapter = make_adapter(&vfs, &cx);
1354
1355        adapter
1356            .append_frame(&cx, 1, &sample_page(0x10), 1)
1357            .expect("append");
1358
1359        let result = adapter.read_page(&cx, 99).expect("read missing page");
1360        assert_eq!(result, None);
1361    }
1362
1363    #[test]
1364    fn test_adapter_read_page_returns_latest_version() {
1365        let cx = test_cx();
1366        let vfs = MemoryVfs::new();
1367        let mut adapter = make_adapter(&vfs, &cx);
1368
1369        let old_data = sample_page(0xAA);
1370        let new_data = sample_page(0xBB);
1371
1372        // Write page 5 twice -- the adapter should return the latest.
1373        adapter
1374            .append_frame(&cx, 5, &old_data, 0)
1375            .expect("append old");
1376        adapter
1377            .append_frame(&cx, 5, &new_data, 1)
1378            .expect("append new (commit)");
1379
1380        let result = adapter.read_page(&cx, 5).expect("read page 5");
1381        assert_eq!(
1382            result,
1383            Some(new_data),
1384            "adapter should return the latest WAL version"
1385        );
1386    }
1387
    #[test]
    fn test_adapter_refreshes_cross_handle_visibility_and_append_position() {
        let cx = test_cx();
        let vfs = MemoryVfs::new();

        // Two adapters over the same on-disk WAL: the first creates it, the
        // second opens the existing file.
        let file1 = open_wal_file(&vfs, &cx);
        let wal1 = WalFile::create(&cx, file1, PAGE_SIZE, 0, test_salts()).expect("create WAL");
        let mut adapter1 = WalBackendAdapter::new(wal1);

        let file2 = open_wal_file(&vfs, &cx);
        let wal2 = WalFile::open(&cx, file2).expect("open WAL");
        let mut adapter2 = WalBackendAdapter::new(wal2);

        // adapter1 commits page 1; adapter2 must observe it after beginning
        // a transaction.
        let page1 = sample_page(0x11);
        adapter1
            .append_frame(&cx, 1, &page1, 1)
            .expect("adapter1 append commit");
        adapter1.sync(&cx).expect("adapter1 sync");
        adapter2
            .begin_transaction(&cx)
            .expect("adapter2 begin transaction");
        assert_eq!(
            adapter2.read_page(&cx, 1).expect("adapter2 read page1"),
            Some(page1.clone()),
            "adapter2 should observe adapter1 commit at transaction begin"
        );

        // Symmetric direction: adapter2 commits page 2; adapter1 must see it.
        let page2 = sample_page(0x22);
        adapter2
            .append_frame(&cx, 2, &page2, 2)
            .expect("adapter2 append commit");
        adapter2.sync(&cx).expect("adapter2 sync");
        adapter1
            .begin_transaction(&cx)
            .expect("adapter1 begin transaction");
        assert_eq!(
            adapter1.read_page(&cx, 2).expect("adapter1 read page2"),
            Some(page2.clone()),
            "adapter1 should observe adapter2 commit at transaction begin"
        );

        // Ensure the second writer appended to frame 1 (not frame 0 overwrite).
        assert_eq!(
            adapter1.frame_count(),
            2,
            "shared WAL should contain both commit frames"
        );
        assert_eq!(
            adapter2.frame_count(),
            2,
            "shared WAL should contain both commit frames"
        );
    }
1441
    #[test]
    fn test_adapter_batch_append_checksum_chain_matches_single_append() {
        let cx = test_cx();
        let vfs_single = MemoryVfs::new();
        let vfs_batch = MemoryVfs::new();

        let mut adapter_single = make_adapter(&vfs_single, &cx);
        let mut adapter_batch = make_adapter(&vfs_batch, &cx);

        // Identical workload fed two ways: three non-commit frames plus a
        // commit frame (nonzero db size on the last).
        let pages: Vec<Vec<u8>> = (0..4u8).map(sample_page).collect();
        let commit_sizes = [0_u32, 0, 0, 4];

        // Path A: one `append_frame` call per frame.
        for (index, page) in pages.iter().enumerate() {
            adapter_single
                .append_frame(
                    &cx,
                    u32::try_from(index + 1).expect("page number fits u32"),
                    page,
                    commit_sizes[index],
                )
                .expect("single append");
        }

        // Path B: a single `append_frames` batch call.
        let batch_frames: Vec<_> = pages
            .iter()
            .enumerate()
            .map(|(index, page)| WalFrameRef {
                page_number: u32::try_from(index + 1).expect("page number fits u32"),
                page_data: page,
                db_size_if_commit: commit_sizes[index],
            })
            .collect();
        adapter_batch
            .append_frames(&cx, &batch_frames)
            .expect("batch append");

        // Both paths must produce identical WAL state: frame count, running
        // checksum, and every frame's header and payload bytes.
        assert_eq!(
            adapter_single.frame_count(),
            adapter_batch.frame_count(),
            "batch adapter append must preserve frame count"
        );
        assert_eq!(
            adapter_single.wal.running_checksum(),
            adapter_batch.wal.running_checksum(),
            "batch adapter append must preserve checksum chain"
        );

        for frame_index in 0..pages.len() {
            let (single_header, single_data) = adapter_single
                .wal
                .read_frame(&cx, frame_index)
                .expect("read single frame");
            let (batch_header, batch_data) = adapter_batch
                .wal
                .read_frame(&cx, frame_index)
                .expect("read batch frame");
            assert_eq!(
                single_header, batch_header,
                "frame header {frame_index} must match"
            );
            assert_eq!(
                single_data, batch_data,
                "frame payload {frame_index} must match"
            );
        }
    }
1508
    #[test]
    fn test_adapter_prepared_batch_append_checksum_chain_matches_single_append() {
        let cx = test_cx();
        let vfs_single = MemoryVfs::new();
        let vfs_prepared = MemoryVfs::new();

        let mut adapter_single = make_adapter(&vfs_single, &cx);
        let mut adapter_prepared = make_adapter(&vfs_prepared, &cx);

        // Identical workload fed two ways: three non-commit frames plus a
        // commit frame.
        let pages: Vec<Vec<u8>> = (0..4u8).map(sample_page).collect();
        let commit_sizes = [0_u32, 0, 0, 4];

        // Path A: one `append_frame` call per frame.
        for (index, page) in pages.iter().enumerate() {
            adapter_single
                .append_frame(
                    &cx,
                    u32::try_from(index + 1).expect("page number fits u32"),
                    page,
                    commit_sizes[index],
                )
                .expect("single append");
        }

        // Path B: prepare a batch off-line, then append it via the prepared
        // path.
        let batch_frames: Vec<_> = pages
            .iter()
            .enumerate()
            .map(|(index, page)| WalFrameRef {
                page_number: u32::try_from(index + 1).expect("page number fits u32"),
                page_data: page,
                db_size_if_commit: commit_sizes[index],
            })
            .collect();
        let mut prepared = adapter_prepared
            .prepare_append_frames(&batch_frames)
            .expect("prepare append")
            .expect("prepared batch");
        adapter_prepared
            .append_prepared_frames(&cx, &mut prepared)
            .expect("append prepared");

        // Both paths must produce identical WAL state: frame count, running
        // checksum, and every frame's header and payload bytes.
        assert_eq!(
            adapter_single.frame_count(),
            adapter_prepared.frame_count(),
            "prepared adapter append must preserve frame count"
        );
        assert_eq!(
            adapter_single.wal.running_checksum(),
            adapter_prepared.wal.running_checksum(),
            "prepared adapter append must preserve checksum chain"
        );

        for frame_index in 0..pages.len() {
            let (single_header, single_data) = adapter_single
                .wal
                .read_frame(&cx, frame_index)
                .expect("read single frame");
            let (prepared_header, prepared_data) = adapter_prepared
                .wal
                .read_frame(&cx, frame_index)
                .expect("read prepared frame");
            assert_eq!(
                single_header, prepared_header,
                "frame header {frame_index} must match"
            );
            assert_eq!(
                single_data, prepared_data,
                "frame payload {frame_index} must match"
            );
        }
    }
1579
    #[test]
    fn test_adapter_pre_finalize_reused_when_append_window_is_stable() {
        let cx = test_cx();
        let vfs_single = MemoryVfs::new();
        let vfs_prepared = MemoryVfs::new();

        let mut adapter_single = make_adapter(&vfs_single, &cx);
        let mut adapter_prepared = make_adapter(&vfs_prepared, &cx);

        let pages: Vec<Vec<u8>> = (0..3u8).map(sample_page).collect();
        let commit_sizes = [0_u32, 0, 3];

        // Reference WAL built frame-by-frame for checksum comparison.
        for (index, page) in pages.iter().enumerate() {
            adapter_single
                .append_frame(
                    &cx,
                    u32::try_from(index + 1).expect("page number fits u32"),
                    page,
                    commit_sizes[index],
                )
                .expect("single append");
        }

        // Prepare and pre-finalize the batch, capturing the finalization
        // state produced before the append.
        let batch_frames: Vec<_> = pages
            .iter()
            .enumerate()
            .map(|(index, page)| WalFrameRef {
                page_number: u32::try_from(index + 1).expect("page number fits u32"),
                page_data: page,
                db_size_if_commit: commit_sizes[index],
            })
            .collect();
        let mut prepared = adapter_prepared
            .prepare_append_frames(&batch_frames)
            .expect("prepare append")
            .expect("prepared batch");
        adapter_prepared
            .finalize_prepared_frames(&cx, &mut prepared)
            .expect("pre-finalize prepared batch");
        let finalized_for = prepared.finalized_for.expect("finalization state");
        let finalized_running_checksum = prepared
            .finalized_running_checksum
            .expect("finalized checksum");

        adapter_prepared
            .append_prepared_frames(&cx, &mut prepared)
            .expect("append prepared");

        // With no intervening writer, the append must reuse the pre-lock
        // finalization unchanged, and the resulting checksum chain must
        // match the frame-by-frame reference.
        assert_eq!(
            prepared.finalized_for,
            Some(finalized_for),
            "stable append window should reuse the pre-lock finalization state"
        );
        assert_eq!(
            prepared.finalized_running_checksum,
            Some(finalized_running_checksum),
            "stable append window should reuse the pre-lock finalized checksum"
        );
        assert_eq!(
            adapter_single.wal.running_checksum(),
            adapter_prepared.wal.running_checksum(),
            "stable reuse path must preserve checksum chain"
        );
    }
1644
    // Reseeding path: if another writer appends to the shared WAL between
    // `finalize_prepared_frames` and `append_prepared_frames`, the stale
    // pre-lock finalization state must be discarded and recomputed, while the
    // final checksum chain and frame count still match a WAL that saw the
    // same frames appended sequentially in the same order.
    #[test]
    fn test_adapter_pre_finalize_reseeds_after_intervening_external_append() {
        let cx = test_cx();
        let baseline_vfs = MemoryVfs::new();
        let shared_vfs = MemoryVfs::new();

        let mut baseline = make_adapter(&baseline_vfs, &cx);
        let mut prepared_writer = make_adapter(&shared_vfs, &cx);
        // A second adapter over the same VFS plays the external writer.
        let intruder_file = open_wal_file(&shared_vfs, &cx);
        let intruder_wal = WalFile::open(&cx, intruder_file).expect("open shared WAL");
        let mut intruder = WalBackendAdapter::new(intruder_wal);

        let pages: Vec<Vec<u8>> = (0..3u8).map(sample_page).collect();
        let commit_sizes = [0_u32, 0, 3];
        let intruder_page = sample_page(0xEE);

        // The baseline WAL receives the intruder frame first, then the batch,
        // mirroring the order frames will land in the shared WAL below.
        baseline
            .append_frame(&cx, 99, &intruder_page, 1)
            .expect("baseline intruder append");
        for (index, page) in pages.iter().enumerate() {
            baseline
                .append_frame(
                    &cx,
                    u32::try_from(index + 1).expect("page number fits u32"),
                    page,
                    commit_sizes[index],
                )
                .expect("baseline append");
        }

        let batch_frames: Vec<_> = pages
            .iter()
            .enumerate()
            .map(|(index, page)| WalFrameRef {
                page_number: u32::try_from(index + 1).expect("page number fits u32"),
                page_data: page,
                db_size_if_commit: commit_sizes[index],
            })
            .collect();
        let mut prepared = prepared_writer
            .prepare_append_frames(&batch_frames)
            .expect("prepare append")
            .expect("prepared batch");
        prepared_writer
            .finalize_prepared_frames(&cx, &mut prepared)
            .expect("pre-finalize prepared batch");
        // Snapshot the now-soon-to-be-stale finalization state for comparison.
        let stale_finalization_state = prepared.finalized_for;

        // External growth lands between pre-finalize and append.
        intruder
            .append_frame(&cx, 99, &intruder_page, 1)
            .expect("intruder append");
        intruder.sync(&cx).expect("intruder sync");

        prepared_writer
            .append_prepared_frames(&cx, &mut prepared)
            .expect("append prepared after external growth");

        assert_ne!(
            prepared.finalized_for, stale_finalization_state,
            "intervening external growth should force prepared batch reseeding"
        );
        assert_eq!(
            baseline.wal.running_checksum(),
            prepared_writer.wal.running_checksum(),
            "reseeding path must preserve checksum chain"
        );
        assert_eq!(
            baseline.frame_count(),
            prepared_writer.frame_count(),
            "reseeding path must preserve frame count"
        );
    }
1717
1718    #[test]
1719    fn test_adapter_pins_read_snapshot_until_next_begin() {
1720        init_wal_publication_test_tracing();
1721        let cx = test_cx();
1722        let vfs = MemoryVfs::new();
1723
1724        let file_writer = open_wal_file(&vfs, &cx);
1725        let wal_writer =
1726            WalFile::create(&cx, file_writer, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1727        let mut writer = WalBackendAdapter::new(wal_writer);
1728
1729        let file_reader = open_wal_file(&vfs, &cx);
1730        let wal_reader = WalFile::open(&cx, file_reader).expect("open WAL");
1731        let mut reader = WalBackendAdapter::new(wal_reader);
1732
1733        let v1 = sample_page(0x41);
1734        writer.append_frame(&cx, 3, &v1, 3).expect("append v1");
1735        writer.sync(&cx).expect("sync v1");
1736
1737        reader
1738            .begin_transaction(&cx)
1739            .expect("begin reader snapshot 1");
1740        assert_eq!(
1741            reader.read_page(&cx, 3).expect("reader sees v1"),
1742            Some(v1.clone())
1743        );
1744
1745        let v2 = sample_page(0x42);
1746        writer.append_frame(&cx, 3, &v2, 3).expect("append v2");
1747        writer.sync(&cx).expect("sync v2");
1748
1749        // Same transaction snapshot must stay stable (no mid-transaction drift).
1750        assert_eq!(
1751            reader
1752                .read_page(&cx, 3)
1753                .expect("reader remains on pinned snapshot"),
1754            Some(v1.clone())
1755        );
1756
1757        // A new transaction snapshot should pick up the latest commit.
1758        reader
1759            .begin_transaction(&cx)
1760            .expect("begin reader snapshot 2");
1761        assert_eq!(reader.read_page(&cx, 3).expect("reader sees v2"), Some(v2));
1762    }
1763
1764    #[test]
1765    fn test_adapter_read_page_hides_uncommitted_frames() {
1766        let cx = test_cx();
1767        let vfs = MemoryVfs::new();
1768        let mut adapter = make_adapter(&vfs, &cx);
1769
1770        let committed = sample_page(0x31);
1771        let uncommitted = sample_page(0x32);
1772
1773        adapter
1774            .append_frame(&cx, 7, &committed, 7)
1775            .expect("append committed frame");
1776        adapter
1777            .append_frame(&cx, 7, &uncommitted, 0)
1778            .expect("append uncommitted frame");
1779
1780        let result = adapter.read_page(&cx, 7).expect("read committed page");
1781        assert_eq!(
1782            result,
1783            Some(committed),
1784            "reader must ignore uncommitted tail frames"
1785        );
1786    }
1787
1788    #[test]
1789    fn test_adapter_read_page_none_when_wal_has_no_commit_frame() {
1790        let cx = test_cx();
1791        let vfs = MemoryVfs::new();
1792        let mut adapter = make_adapter(&vfs, &cx);
1793
1794        adapter
1795            .append_frame(&cx, 3, &sample_page(0x44), 0)
1796            .expect("append uncommitted frame");
1797
1798        let result = adapter.read_page(&cx, 3).expect("read page");
1799        assert_eq!(result, None, "uncommitted WAL frames must stay invisible");
1800    }
1801
1802    #[test]
1803    fn test_adapter_read_page_empty_wal() {
1804        let cx = test_cx();
1805        let vfs = MemoryVfs::new();
1806        let mut adapter = make_adapter(&vfs, &cx);
1807
1808        let result = adapter.read_page(&cx, 1).expect("read from empty WAL");
1809        assert_eq!(result, None);
1810    }
1811
1812    #[test]
1813    fn test_adapter_sync() {
1814        let cx = test_cx();
1815        let vfs = MemoryVfs::new();
1816        let mut adapter = make_adapter(&vfs, &cx);
1817
1818        adapter
1819            .append_frame(&cx, 1, &sample_page(0), 1)
1820            .expect("append");
1821        adapter.sync(&cx).expect("sync should not fail");
1822    }
1823
1824    #[test]
1825    fn test_adapter_into_inner_round_trip() {
1826        let cx = test_cx();
1827        let vfs = MemoryVfs::new();
1828        let mut adapter = make_adapter(&vfs, &cx);
1829
1830        adapter
1831            .append_frame(&cx, 1, &sample_page(0), 1)
1832            .expect("append");
1833
1834        assert_eq!(adapter.inner().frame_count(), 1);
1835
1836        let wal = adapter.into_inner();
1837        assert_eq!(wal.frame_count(), 1);
1838    }
1839
1840    #[test]
1841    fn test_adapter_as_dyn_wal_backend() {
1842        let cx = test_cx();
1843        let vfs = MemoryVfs::new();
1844        let mut adapter = make_adapter(&vfs, &cx);
1845
1846        // Verify it can be used as a trait object.
1847        let backend: &mut dyn WalBackend = &mut adapter;
1848        backend
1849            .append_frame(&cx, 1, &sample_page(0x77), 1)
1850            .expect("append via dyn");
1851        assert_eq!(backend.frame_count(), 1);
1852
1853        let page = backend.read_page(&cx, 1).expect("read via dyn");
1854        assert_eq!(page, Some(sample_page(0x77)));
1855    }
1856
1857    // -- Page index O(1) lookup tests --
1858
1859    #[test]
1860    fn test_page_index_returns_correct_data() {
1861        // Write several pages, verify O(1) index returns the right data.
1862        let cx = test_cx();
1863        let vfs = MemoryVfs::new();
1864        let mut adapter = make_adapter(&vfs, &cx);
1865
1866        let page1 = sample_page(0x01);
1867        let page2 = sample_page(0x02);
1868        let page3 = sample_page(0x03);
1869
1870        adapter.append_frame(&cx, 1, &page1, 0).expect("append");
1871        adapter.append_frame(&cx, 2, &page2, 0).expect("append");
1872        adapter
1873            .append_frame(&cx, 3, &page3, 3)
1874            .expect("append commit");
1875
1876        // All three pages should be readable via the index.
1877        assert_eq!(adapter.read_page(&cx, 1).expect("read"), Some(page1));
1878        assert_eq!(adapter.read_page(&cx, 2).expect("read"), Some(page2));
1879        assert_eq!(adapter.read_page(&cx, 3).expect("read"), Some(page3));
1880
1881        // Non-existent page returns None.
1882        assert_eq!(adapter.read_page(&cx, 99).expect("read"), None);
1883    }
1884
1885    #[test]
1886    fn test_page_index_returns_latest_version() {
1887        // Write the same page twice; the index should point to the newer frame.
1888        let cx = test_cx();
1889        let vfs = MemoryVfs::new();
1890        let mut adapter = make_adapter(&vfs, &cx);
1891
1892        let old_data = sample_page(0xAA);
1893        let new_data = sample_page(0xBB);
1894
1895        adapter
1896            .append_frame(&cx, 5, &old_data, 0)
1897            .expect("append old");
1898        adapter
1899            .append_frame(&cx, 5, &new_data, 1)
1900            .expect("append new (commit)");
1901
1902        assert_eq!(
1903            adapter.read_page(&cx, 5).expect("read"),
1904            Some(new_data),
1905            "page index must return the latest frame for a page"
1906        );
1907    }
1908
1909    #[test]
1910    fn test_page_index_invalidated_on_wal_reset() {
1911        // Simulate a WAL reset with new salts. The index must be rebuilt so
1912        // stale entries from the old generation are not returned.
1913        let cx = test_cx();
1914        let vfs = MemoryVfs::new();
1915        let mut adapter = make_adapter(&vfs, &cx);
1916
1917        let old_data = sample_page(0x11);
1918        adapter
1919            .append_frame(&cx, 1, &old_data, 1)
1920            .expect("append commit");
1921
1922        // Read page 1 to populate the index.
1923        assert_eq!(adapter.read_page(&cx, 1).expect("read old"), Some(old_data));
1924
1925        // Reset WAL with new salts (simulates checkpoint reset).
1926        let new_salts = WalSalts {
1927            salt1: 0xAAAA_BBBB,
1928            salt2: 0xCCCC_DDDD,
1929        };
1930        adapter
1931            .inner_mut()
1932            .reset(&cx, 1, new_salts, false)
1933            .expect("WAL reset");
1934
1935        // Write new data for the same page number in the new generation.
1936        let new_data = sample_page(0x22);
1937        adapter
1938            .append_frame(&cx, 1, &new_data, 1)
1939            .expect("append new generation commit");
1940
1941        // The index must have been invalidated; we should get the new data.
1942        let result = adapter.read_page(&cx, 1).expect("read after reset");
1943        assert_eq!(
1944            result,
1945            Some(new_data),
1946            "after WAL reset, page index must return new-generation data, not stale cached data"
1947        );
1948
1949        // A page that existed only in the old generation should be gone.
1950        let old_only = sample_page(0x33);
1951        // (We never wrote page 99 in the new generation.)
1952        assert_eq!(
1953            adapter.read_page(&cx, 99).expect("read non-existent"),
1954            None,
1955            "pages from old WAL generation must not appear after reset"
1956        );
1957        // Suppress unused variable warning.
1958        drop(old_only);
1959    }
1960
1961    #[test]
1962    fn test_page_index_invalidated_on_same_salt_generation_change() {
1963        init_wal_publication_test_tracing();
1964        // Generation identity must include checkpoint_seq. Reusing salts across
1965        // reset must still invalidate the cached page index and avoid ABA bugs.
1966        let cx = test_cx();
1967        let vfs = MemoryVfs::new();
1968        let mut adapter = make_adapter(&vfs, &cx);
1969
1970        let reused_salts = adapter.inner().header().salts;
1971        let old_data = sample_page(0x11);
1972        adapter
1973            .append_frame(&cx, 1, &old_data, 1)
1974            .expect("append commit");
1975        assert_eq!(adapter.read_page(&cx, 1).expect("read old"), Some(old_data));
1976
1977        adapter
1978            .inner_mut()
1979            .reset(&cx, 1, reused_salts, false)
1980            .expect("reset with same salts");
1981        let new_data = sample_page(0x22);
1982        adapter
1983            .append_frame(&cx, 2, &new_data, 2)
1984            .expect("append new generation commit");
1985
1986        assert_eq!(
1987            adapter.read_page(&cx, 1).expect("old page should be gone"),
1988            None,
1989            "cached index entries from the previous generation must be invalidated"
1990        );
1991        assert_eq!(
1992            adapter.read_page(&cx, 2).expect("read new page"),
1993            Some(new_data),
1994            "adapter must resolve pages from the new generation even when salts are reused"
1995        );
1996    }
1997
1998    #[test]
1999    fn test_page_index_incremental_extend() {
2000        // Verify that the index extends incrementally when new frames are committed.
2001        let cx = test_cx();
2002        let vfs = MemoryVfs::new();
2003        let mut adapter = make_adapter(&vfs, &cx);
2004
2005        let page1 = sample_page(0x10);
2006        adapter
2007            .append_frame(&cx, 1, &page1, 1)
2008            .expect("append commit 1");
2009
2010        // First read builds the index.
2011        assert_eq!(
2012            adapter.read_page(&cx, 1).expect("read"),
2013            Some(page1.clone())
2014        );
2015
2016        // Append more committed frames.
2017        let page2 = sample_page(0x20);
2018        let page1_v2 = sample_page(0x30);
2019        adapter
2020            .append_frame(&cx, 2, &page2, 0)
2021            .expect("append page 2");
2022        adapter
2023            .append_frame(&cx, 1, &page1_v2, 3)
2024            .expect("append page 1 v2 (commit)");
2025
2026        // Reading should trigger incremental extend, not full rebuild.
2027        assert_eq!(
2028            adapter.read_page(&cx, 1).expect("read page 1 v2"),
2029            Some(page1_v2),
2030            "incremental index extend should pick up the updated page"
2031        );
2032        assert_eq!(adapter.read_page(&cx, 2).expect("read page 2"), Some(page2));
2033    }
2034
2035    #[test]
2036    fn test_commit_append_publishes_visibility_snapshot() {
2037        init_wal_publication_test_tracing();
2038        let cx = test_cx();
2039        let vfs = MemoryVfs::new();
2040        let mut adapter = make_adapter(&vfs, &cx);
2041
2042        let p1 = sample_page(0x41);
2043        let p2 = sample_page(0x42);
2044        adapter.append_frame(&cx, 1, &p1, 0).expect("append p1");
2045        adapter.append_frame(&cx, 2, &p2, 2).expect("append commit");
2046
2047        assert_eq!(
2048            adapter.published_snapshot.last_commit_frame,
2049            Some(1),
2050            "commit append should publish the visible commit horizon"
2051        );
2052        assert_eq!(
2053            adapter.published_snapshot.commit_count, 1,
2054            "commit append should track the visible WAL commit count"
2055        );
2056        assert_eq!(
2057            adapter.published_snapshot.page_index.len(),
2058            2,
2059            "published snapshot should track both committed pages"
2060        );
2061        assert_eq!(
2062            adapter.published_snapshot.page_index.get(&2),
2063            Some(&1),
2064            "published snapshot must map each page to its latest committed frame"
2065        );
2066    }
2067
2068    #[test]
2069    fn test_prepared_append_publishes_visibility_snapshot() {
2070        init_wal_publication_test_tracing();
2071        let cx = test_cx();
2072        let vfs = MemoryVfs::new();
2073        let mut adapter = make_adapter(&vfs, &cx);
2074
2075        let p1 = sample_page(0x51);
2076        let p2 = sample_page(0x52);
2077        let frames = [
2078            WalFrameRef {
2079                page_number: 1,
2080                page_data: &p1,
2081                db_size_if_commit: 0,
2082            },
2083            WalFrameRef {
2084                page_number: 2,
2085                page_data: &p2,
2086                db_size_if_commit: 2,
2087            },
2088        ];
2089        let mut prepared = adapter
2090            .prepare_append_frames(&frames)
2091            .expect("prepare append")
2092            .expect("prepared batch");
2093        adapter
2094            .append_prepared_frames(&cx, &mut prepared)
2095            .expect("append prepared");
2096
2097        assert_eq!(
2098            adapter.published_snapshot.last_commit_frame,
2099            Some(1),
2100            "prepared commit append should publish the visible commit horizon"
2101        );
2102        assert_eq!(
2103            adapter.published_snapshot.commit_count, 1,
2104            "prepared commit append should track the visible WAL commit count"
2105        );
2106        assert_eq!(
2107            adapter.published_snapshot.page_index.len(),
2108            2,
2109            "prepared commit append should publish all committed pages"
2110        );
2111        assert_eq!(
2112            adapter.published_snapshot.page_index.get(&2),
2113            Some(&1),
2114            "prepared commit append must map each page to its latest committed frame"
2115        );
2116    }
2117
2118    #[test]
2119    fn test_commit_publication_refreshes_external_prefix_before_local_commit() {
2120        let cx = test_cx();
2121        let vfs = MemoryVfs::new();
2122
2123        let file_writer = open_wal_file(&vfs, &cx);
2124        let wal_writer =
2125            WalFile::create(&cx, file_writer, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2126        let mut writer = WalBackendAdapter::new(wal_writer);
2127
2128        let file_follower = open_wal_file(&vfs, &cx);
2129        let wal_follower = WalFile::open(&cx, file_follower).expect("open WAL");
2130        let mut follower = WalBackendAdapter::new(wal_follower);
2131
2132        let p1 = sample_page(0x61);
2133        writer
2134            .append_frame(&cx, 1, &p1, 1)
2135            .expect("writer commit 1");
2136        writer.sync(&cx).expect("sync writer commit 1");
2137
2138        let p2 = sample_page(0x62);
2139        writer
2140            .append_frame(&cx, 2, &p2, 2)
2141            .expect("writer commit 2");
2142        writer.sync(&cx).expect("sync writer commit 2");
2143
2144        let p3 = sample_page(0x63);
2145        follower
2146            .append_frame(&cx, 3, &p3, 3)
2147            .expect("follower local commit");
2148
2149        assert_eq!(
2150            follower.published_snapshot.last_commit_frame,
2151            Some(2),
2152            "local commit should publish on top of refreshed external WAL state"
2153        );
2154        assert_eq!(
2155            follower.published_snapshot.commit_count, 3,
2156            "local commit publication should include refreshed external commits"
2157        );
2158        assert_eq!(
2159            follower.published_snapshot.page_index.get(&1),
2160            Some(&0),
2161            "refresh-before-append should preserve earlier committed pages"
2162        );
2163        assert_eq!(
2164            follower.published_snapshot.page_index.get(&2),
2165            Some(&1),
2166            "refresh-before-append should publish externally committed pages"
2167        );
2168        assert_eq!(
2169            follower.published_snapshot.page_index.get(&3),
2170            Some(&2),
2171            "local commit should extend the published WAL visibility map"
2172        );
2173        assert_eq!(follower.read_page(&cx, 1).expect("read p1"), Some(p1));
2174        assert_eq!(follower.read_page(&cx, 2).expect("read p2"), Some(p2));
2175        assert_eq!(follower.read_page(&cx, 3).expect("read p3"), Some(p3));
2176    }
2177
2178    // -- Partial index fallback tests --
2179
2180    #[test]
2181    fn test_partial_index_falls_back_to_linear_scan() {
2182        init_wal_publication_test_tracing();
2183        // Verify that when the page index cap is hit, pages that weren't
2184        // indexed are still found via the backwards linear scan fallback.
2185        let cx = test_cx();
2186        let vfs = MemoryVfs::new();
2187        let mut adapter = make_adapter(&vfs, &cx);
2188
2189        // Set a very small cap so we can trigger the partial-index path
2190        // with just a handful of frames.
2191        adapter.set_page_index_cap(2);
2192
2193        // Write 5 distinct pages.  With a cap of 2, only the first 2 unique
2194        // pages will be indexed; pages 3-5 will be dropped from the index.
2195        let p1 = sample_page(0x01);
2196        let p2 = sample_page(0x02);
2197        let p3 = sample_page(0x03);
2198        let p4 = sample_page(0x04);
2199        let p5 = sample_page(0x05);
2200
2201        adapter.append_frame(&cx, 1, &p1, 0).expect("append p1");
2202        adapter.append_frame(&cx, 2, &p2, 0).expect("append p2");
2203        adapter.append_frame(&cx, 3, &p3, 0).expect("append p3");
2204        adapter.append_frame(&cx, 4, &p4, 0).expect("append p4");
2205        adapter
2206            .append_frame(&cx, 5, &p5, 5)
2207            .expect("append p5 (commit)");
2208
2209        // Pages 1 and 2 should be in the index (fast path).
2210        assert_eq!(
2211            adapter.read_page(&cx, 1).expect("read p1"),
2212            Some(p1),
2213            "indexed page should be found via HashMap"
2214        );
2215        assert_eq!(
2216            adapter.read_page(&cx, 2).expect("read p2"),
2217            Some(p2),
2218            "indexed page should be found via HashMap"
2219        );
2220
2221        // Pages 3-5 were NOT indexed, but must still be found via the
2222        // backwards linear scan fallback.
2223        assert_eq!(
2224            adapter.read_page(&cx, 3).expect("read p3"),
2225            Some(p3),
2226            "non-indexed page must be found via linear scan fallback"
2227        );
2228        assert_eq!(
2229            adapter.read_page(&cx, 4).expect("read p4"),
2230            Some(p4),
2231            "non-indexed page must be found via linear scan fallback"
2232        );
2233        assert_eq!(
2234            adapter.read_page(&cx, 5).expect("read p5"),
2235            Some(p5),
2236            "non-indexed page must be found via linear scan fallback"
2237        );
2238
2239        // A page that was never written should still return None.
2240        assert_eq!(
2241            adapter.read_page(&cx, 99).expect("read non-existent"),
2242            None,
2243            "non-existent page must return None even with partial index"
2244        );
2245
2246        // Verify the index was indeed marked partial.
2247        assert!(
2248            adapter.published_snapshot.index_is_partial,
2249            "index_is_partial should be true when cap is exceeded"
2250        );
2251    }
2252
2253    #[test]
2254    fn test_partial_index_returns_latest_version_via_fallback() {
2255        // When the same page appears multiple times and overflows the index,
2256        // the backwards scan must return the LATEST (highest frame index)
2257        // version, not the first one it encounters in a forward scan.
2258        let cx = test_cx();
2259        let vfs = MemoryVfs::new();
2260        let mut adapter = make_adapter(&vfs, &cx);
2261
2262        // Cap at 1 so only page 1 fits in the index.
2263        adapter.set_page_index_cap(1);
2264
2265        let old_p2 = sample_page(0xAA);
2266        let new_p2 = sample_page(0xBB);
2267
2268        // Frame 0: page 1 (indexed)
2269        adapter
2270            .append_frame(&cx, 1, &sample_page(0x01), 0)
2271            .expect("append p1");
2272        // Frame 1: page 2 old version (NOT indexed -- cap exceeded)
2273        adapter
2274            .append_frame(&cx, 2, &old_p2, 0)
2275            .expect("append p2 old");
2276        // Frame 2: page 2 new version (NOT indexed -- cap exceeded, and
2277        // page 2 is not already in the index so it won't be updated)
2278        adapter
2279            .append_frame(&cx, 2, &new_p2, 3)
2280            .expect("append p2 new (commit)");
2281
2282        // The backwards scan from frame 2 should find the newest version first.
2283        assert_eq!(
2284            adapter.read_page(&cx, 2).expect("read p2"),
2285            Some(new_p2),
2286            "backwards scan must return the most recent frame for the page"
2287        );
2288    }
2289
2290    #[test]
2291    fn test_lookup_contract_distinguishes_authoritative_and_fallback_paths() {
2292        init_wal_publication_test_tracing();
2293        let cx = test_cx();
2294        let vfs = MemoryVfs::new();
2295        let mut adapter = make_adapter(&vfs, &cx);
2296        adapter.set_page_index_cap(1);
2297
2298        let p1 = sample_page(0x01);
2299        let p2 = sample_page(0x02);
2300        adapter.append_frame(&cx, 1, &p1, 0).expect("append p1");
2301        adapter
2302            .append_frame(&cx, 2, &p2, 2)
2303            .expect("append p2 commit");
2304
2305        let last_commit = adapter
2306            .inner_mut()
2307            .last_commit_frame(&cx)
2308            .expect("last commit")
2309            .expect("commit exists");
2310        adapter
2311            .publish_visible_snapshot(&cx, Some(last_commit), "lookup_contract_test")
2312            .expect("build published snapshot");
2313        let snapshot = adapter.published_snapshot.clone();
2314
2315        assert_eq!(
2316            adapter
2317                .resolve_visible_frame(&cx, &snapshot, 1)
2318                .expect("resolve indexed page"),
2319            WalPageLookupResolution::AuthoritativeHit { frame_index: 0 }
2320        );
2321        assert_eq!(
2322            adapter
2323                .resolve_visible_frame(&cx, &snapshot, 2)
2324                .expect("resolve fallback page"),
2325            WalPageLookupResolution::PartialIndexFallbackHit { frame_index: 1 }
2326        );
2327        assert_eq!(
2328            adapter
2329                .resolve_visible_frame(&cx, &snapshot, 99)
2330                .expect("resolve missing page"),
2331            WalPageLookupResolution::PartialIndexFallbackMiss
2332        );
2333    }
2334
2335    #[test]
2336    fn test_lookup_contract_is_authoritative_by_default() {
2337        let cx = test_cx();
2338        let vfs = MemoryVfs::new();
2339        let mut adapter = make_adapter(&vfs, &cx);
2340
2341        let p1 = sample_page(0x11);
2342        let p2 = sample_page(0x22);
2343        adapter.append_frame(&cx, 1, &p1, 0).expect("append p1");
2344        adapter
2345            .append_frame(&cx, 2, &p2, 2)
2346            .expect("append p2 commit");
2347
2348        let last_commit = adapter
2349            .inner_mut()
2350            .last_commit_frame(&cx)
2351            .expect("last commit")
2352            .expect("commit exists");
2353        adapter
2354            .publish_visible_snapshot(&cx, Some(last_commit), "lookup_contract_default")
2355            .expect("build published snapshot");
2356        let snapshot = adapter.published_snapshot.clone();
2357
2358        assert!(
2359            !snapshot.index_is_partial,
2360            "default index should be authoritative"
2361        );
2362        assert_eq!(
2363            adapter
2364                .resolve_visible_frame(&cx, &snapshot, 1)
2365                .expect("resolve page 1"),
2366            WalPageLookupResolution::AuthoritativeHit { frame_index: 0 }
2367        );
2368        assert_eq!(
2369            adapter
2370                .resolve_visible_frame(&cx, &snapshot, 2)
2371                .expect("resolve page 2"),
2372            WalPageLookupResolution::AuthoritativeHit { frame_index: 1 }
2373        );
2374        assert_eq!(
2375            adapter
2376                .resolve_visible_frame(&cx, &snapshot, 99)
2377                .expect("resolve missing page"),
2378            WalPageLookupResolution::AuthoritativeMiss
2379        );
2380    }
2381
    // Transaction-counting contract: `committed_txns_since_page` must count
    // commits strictly after the frame where the page was last written, using
    // the visible commit horizon.
    #[test]
    fn test_committed_txns_since_page_uses_visible_frame_horizon() {
        let cx = test_cx();
        let vfs = MemoryVfs::new();
        let mut adapter = make_adapter(&vfs, &cx);

        let p1 = sample_page(0x31);
        let p2 = sample_page(0x32);
        let p3 = sample_page(0x33);

        // Two committed transactions: tx1 = frames {p1, p2}, tx2 = {p3, p2}.
        adapter.append_frame(&cx, 1, &p1, 0).expect("append p1");
        adapter.append_frame(&cx, 2, &p2, 2).expect("commit tx1");
        adapter.append_frame(&cx, 3, &p3, 0).expect("append p3");
        adapter.append_frame(&cx, 2, &p2, 3).expect("commit tx2");

        // Page 1 was last written in tx1, so exactly one later txn exists.
        assert_eq!(
            adapter
                .committed_txns_since_page(&cx, 1)
                .expect("count txns since page 1"),
            1
        );
        // Page 2 was last written in the newest txn: nothing is newer.
        assert_eq!(
            adapter
                .committed_txns_since_page(&cx, 2)
                .expect("count txns since page 2"),
            0
        );
        // A page never written sees every committed txn.
        assert_eq!(
            adapter
                .committed_txns_since_page(&cx, 99)
                .expect("count txns since missing page"),
            2
        );
        assert_eq!(
            adapter
                .committed_txn_count(&cx)
                .expect("count visible transactions"),
            2
        );
    }
2422
2423    // -- CheckpointTargetAdapterRef tests --
2424
2425    #[test]
2426    fn test_checkpoint_adapter_write_page() {
2427        let cx = test_cx();
2428        let mut writer = MockCheckpointPageWriter;
2429        let mut adapter = CheckpointTargetAdapterRef {
2430            writer: &mut writer,
2431        };
2432
2433        let page_no = PageNumber::new(1).expect("valid page number");
2434        adapter
2435            .write_page(&cx, page_no, &[0u8; 4096])
2436            .expect("write_page");
2437    }
2438
2439    #[test]
2440    fn test_checkpoint_adapter_truncate_db() {
2441        let cx = test_cx();
2442        let mut writer = MockCheckpointPageWriter;
2443        let mut adapter = CheckpointTargetAdapterRef {
2444            writer: &mut writer,
2445        };
2446
2447        adapter.truncate_db(&cx, 10).expect("truncate_db");
2448    }
2449
2450    #[test]
2451    fn test_checkpoint_adapter_sync_db() {
2452        let cx = test_cx();
2453        let mut writer = MockCheckpointPageWriter;
2454        let mut adapter = CheckpointTargetAdapterRef {
2455            writer: &mut writer,
2456        };
2457
2458        adapter.sync_db(&cx).expect("sync_db");
2459    }
2460
2461    #[test]
2462    fn test_checkpoint_adapter_as_dyn_target() {
2463        let cx = test_cx();
2464        let mut writer = MockCheckpointPageWriter;
2465        let mut adapter = CheckpointTargetAdapterRef {
2466            writer: &mut writer,
2467        };
2468
2469        // Verify it can be used as a trait object.
2470        let target: &mut dyn CheckpointTarget = &mut adapter;
2471        let page_no = PageNumber::new(3).expect("valid page number");
2472        target
2473            .write_page(&cx, page_no, &[0u8; 4096])
2474            .expect("write via dyn");
2475        target.truncate_db(&cx, 5).expect("truncate via dyn");
2476        target.sync_db(&cx).expect("sync via dyn");
2477    }
2478}