Skip to main content

fsqlite_core/
native_index.rs

1//! Native index lookup, repair, and deterministic segment construction
2//! (§3.6.4-§3.6.7).
3//!
4//! This module implements the read-path lookup algorithm used by Native mode:
5//! cache -> presence filter -> index scan -> fetch+materialize.
6
7use std::collections::BTreeMap;
8
9use crate::commit_marker::{
10    CommitMarkerRecord, MARKER_SEGMENT_HEADER_BYTES, MarkerSegmentHeader, recover_valid_prefix,
11};
12use fsqlite_error::{FrankenError, Result};
13use fsqlite_types::ecs::{PageVersionIndexSegment, PatchKind, VersionPointer};
14use fsqlite_types::{ObjectId, PageNumber};
15use tracing::{debug, error, info, warn};
16
/// Bead id stamped on lookup/segment-build log events in this module.
const NATIVE_INDEX_BEAD_ID: &str = "bd-1hi.32";
/// Bead id stamped on repair/rebuild log events in this module.
const NATIVE_INDEX_REPAIR_BEAD_ID: &str = "bd-1hi.33";
/// Logging-standard tag attached to every structured event emitted here.
const NATIVE_INDEX_LOGGING_STANDARD: &str = "bd-1fpm";
/// Hard cap on patch materialization depth; exceeding it is treated as corruption.
const MAX_PATCH_DEPTH: usize = 8;
/// Default symbol-loss budget above which `BoldnessConstraint` blocks repair.
const DEFAULT_MAX_REPAIR_SYMBOL_LOSS_RATE: f64 = 0.25;
22
/// Provider for base-page bytes (step 2 fallback / step 4 materialization).
pub trait BasePageProvider {
    /// Load a base page image.
    ///
    /// Used both on the no-version fallback path and as the starting image
    /// for patch materialization when a pointer carries no base hint.
    ///
    /// # Errors
    ///
    /// Returns an I/O or corruption error when the page cannot be read.
    fn load_base_page(&self, page: PageNumber) -> Result<Vec<u8>>;
}
32
/// Provider for ECS patch object payload bytes.
pub trait PatchObjectStore {
    /// Fetch an ECS object payload by `ObjectId`.
    ///
    /// Used for both patch payloads and `base_hint` base images.
    ///
    /// # Errors
    ///
    /// Returns an error when the object is missing/corrupt/unreadable.
    fn fetch_patch_object(&self, object_id: ObjectId) -> Result<Vec<u8>>;
}
42
/// Hot-path cache keyed by `(page, snapshot_high)`.
///
/// Backed by a `BTreeMap` keyed on the raw page number and visibility bound.
/// No eviction policy is implemented here; callers own lifetime management.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct NativePageCache {
    // Key: (raw page number, snapshot_high); value: materialized page bytes.
    entries: BTreeMap<(u32, u64), Vec<u8>>,
}
48
49impl NativePageCache {
50    /// Insert a materialized page for a specific visibility bound.
51    pub fn insert(&mut self, page: PageNumber, snapshot_high: u64, bytes: Vec<u8>) {
52        self.entries.insert((page.get(), snapshot_high), bytes);
53    }
54
55    /// Get cached page bytes for `(page, snapshot_high)`.
56    #[must_use]
57    pub fn get(&self, page: PageNumber, snapshot_high: u64) -> Option<&[u8]> {
58        self.entries
59            .get(&(page.get(), snapshot_high))
60            .map(Vec::as_slice)
61    }
62}
63
/// Structured debug trace for a single lookup.
///
/// Reports which of the four read-path steps actually ran.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LookupTrace {
    /// True when step 1 returned from cache.
    pub cache_hit: bool,
    /// True when the presence filter indicated a possible version.
    pub filter_hit: bool,
    /// Number of index segments scanned during step 3.
    pub segment_scans: u64,
    /// Resolved commit sequence, if any.
    pub resolved_commit_seq: Option<u64>,
}
76
/// Result of native lookup.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LookupResult {
    /// Materialized page bytes visible under the snapshot.
    pub page_bytes: Vec<u8>,
    /// Resolved version pointer (if lookup found one); `None` on the cache-hit
    /// and base-fallback paths.
    pub resolved_pointer: Option<VersionPointer>,
    /// Structured path telemetry.
    pub trace: LookupTrace,
}
87
/// Deterministic index segment build output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BuiltIndexSegment {
    /// Built segment payload.
    pub segment: PageVersionIndexSegment,
    /// Content-addressed deterministic object id for the segment
    /// (see [`derive_segment_object_id`]).
    pub object_id: ObjectId,
}
96
/// Input policy for repair/rebuild aggressiveness (§3.6.7).
// NOTE: derives `PartialEq` without `Eq` because of the `f64` field.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct BoldnessConstraint {
    /// Permit emergency linear scan when the native index is unavailable.
    pub allow_emergency_linear_scan: bool,
    /// Maximum tolerated symbol-loss estimate before aggressive repair is blocked.
    pub max_repair_symbol_loss_rate: f64,
}
105
106impl BoldnessConstraint {
107    /// Strict policy used by default: no emergency scans and conservative repair.
108    #[must_use]
109    pub const fn strict() -> Self {
110        Self {
111            allow_emergency_linear_scan: false,
112            max_repair_symbol_loss_rate: DEFAULT_MAX_REPAIR_SYMBOL_LOSS_RATE,
113        }
114    }
115
116    /// Emergency policy enabling linear-scan fallback.
117    #[must_use]
118    pub const fn emergency() -> Self {
119        Self {
120            allow_emergency_linear_scan: true,
121            max_repair_symbol_loss_rate: DEFAULT_MAX_REPAIR_SYMBOL_LOSS_RATE,
122        }
123    }
124
125    #[must_use]
126    fn permits_repair(self, symbol_loss_rate_estimate: f64) -> bool {
127        symbol_loss_rate_estimate <= self.max_repair_symbol_loss_rate
128    }
129}
130
131impl Default for BoldnessConstraint {
132    fn default() -> Self {
133        Self::strict()
134    }
135}
136
/// Manifest entry for one native index segment object.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NativeIndexSegmentRef {
    /// Inclusive start commit sequence covered by the segment.
    pub start_seq: u64,
    /// Inclusive end commit sequence covered by the segment.
    pub end_seq: u64,
    /// Deterministic ECS object id of the segment payload.
    pub object_id: ObjectId,
}
147
/// Source of persisted index segments.
///
/// Implemented by both the local and remote symbol stores used during repair.
pub trait NativeIndexSegmentStore {
    /// Load one index segment object by id.
    ///
    /// # Errors
    ///
    /// Returns an error when the segment is unavailable or corrupted.
    fn fetch_index_segment(&self, object_id: ObjectId) -> Result<PageVersionIndexSegment>;
}
157
/// Source of per-commit page-version updates recoverable from commit capsules.
///
/// Used by the full-rebuild path to replay the marker stream.
pub trait CommitCapsuleIndexSource {
    /// Return all page updates encoded by a commit capsule.
    ///
    /// # Errors
    ///
    /// Returns an error when the capsule cannot be decoded or verified.
    fn updates_for_commit(
        &self,
        commit_seq: u64,
        capsule_object_id: ObjectId,
    ) -> Result<Vec<(PageNumber, VersionPointer)>>;
}
171
/// Summary of native index repair attempts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexRepairReport {
    /// Repaired segments ordered by `end_seq`.
    pub segments: Vec<PageVersionIndexSegment>,
    /// Segments recovered from local symbols.
    pub repaired_from_local: u64,
    /// Segments recovered from remote symbols.
    pub repaired_from_remote: u64,
}
182
/// Summary of deterministic full rebuild results.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexRebuildReport {
    /// Ordered commit markers used to rebuild the index.
    pub markers: Vec<CommitMarkerRecord>,
    /// Built index segments, in the order they were flushed.
    pub segments: Vec<BuiltIndexSegment>,
}
191
/// In-memory deterministic builder for `PageVersionIndexSegment`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentBuilder {
    // Auto-flush threshold: a segment is emitted once `pending` reaches this size.
    max_entries: usize,
    // Inclusive low end of the commit-sequence range covered so far.
    start_seq: Option<u64>,
    // Inclusive high end of the commit-sequence range covered so far.
    end_seq: Option<u64>,
    // Pending updates keyed by (raw page number, commit_seq); the BTreeMap
    // keeps entry order deterministic for content addressing.
    pending: BTreeMap<(u32, u64), VersionPointer>,
}
200
impl SegmentBuilder {
    /// Create a new builder.
    ///
    /// `max_entries` is the auto-flush threshold for [`Self::ingest_commit`].
    ///
    /// # Errors
    ///
    /// Returns [`FrankenError::OutOfRange`] when `max_entries == 0`.
    pub fn new(max_entries: usize) -> Result<Self> {
        if max_entries == 0 {
            return Err(FrankenError::OutOfRange {
                what: "segment_builder.max_entries".to_owned(),
                value: "0".to_owned(),
            });
        }
        Ok(Self {
            max_entries,
            start_seq: None,
            end_seq: None,
            pending: BTreeMap::new(),
        })
    }

    /// Ingest one commit worth of page-version updates.
    ///
    /// Returns `Some(segment)` when the builder auto-flushes due to `max_entries`.
    ///
    /// # Errors
    ///
    /// Returns [`FrankenError::TypeMismatch`] if a pointer commit sequence does
    /// not match `commit_seq`.
    pub fn ingest_commit(
        &mut self,
        commit_seq: u64,
        updates: impl IntoIterator<Item = (PageNumber, VersionPointer)>,
    ) -> Result<Option<BuiltIndexSegment>> {
        debug!(
            bead_id = NATIVE_INDEX_BEAD_ID,
            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
            commit_seq = commit_seq,
            "segment builder ingesting commit updates"
        );

        for (page, pointer) in updates {
            // Reject pointers that claim a different commit than the one
            // being ingested; the segment range math below depends on it.
            if pointer.commit_seq != commit_seq {
                return Err(FrankenError::TypeMismatch {
                    expected: format!("pointer.commit_seq == {commit_seq}"),
                    actual: pointer.commit_seq.to_string(),
                });
            }
            // Keyed by (page, commit_seq): re-inserting the same key is
            // idempotent, and BTreeMap order makes the flushed entry list
            // deterministic.
            self.pending
                .insert((page.get(), pointer.commit_seq), pointer);
        }

        // Widen the covered sequence range. This runs even when `updates` is
        // empty, so update-free commits still count toward the range.
        self.start_seq = Some(match self.start_seq {
            Some(start) => start.min(commit_seq),
            None => commit_seq,
        });
        self.end_seq = Some(match self.end_seq {
            Some(end) => end.max(commit_seq),
            None => commit_seq,
        });

        // Auto-flush once the pending set reaches the configured threshold.
        if self.pending.len() >= self.max_entries {
            self.flush()
        } else {
            Ok(None)
        }
    }

    /// Flush pending updates into a deterministic segment.
    ///
    /// Returns `None` when nothing is pending.
    ///
    /// # Errors
    ///
    /// Returns corruption errors when internal state is inconsistent.
    pub fn flush(&mut self) -> Result<Option<BuiltIndexSegment>> {
        // NOTE(review): this early return keeps any start_seq/end_seq set by
        // update-free commits; they fold into the NEXT flushed segment's
        // range — confirm that coverage semantics is intended.
        if self.pending.is_empty() {
            return Ok(None);
        }

        let start_seq = self
            .start_seq
            .ok_or_else(|| FrankenError::DatabaseCorrupt {
                detail: "segment builder missing start_seq".to_owned(),
            })?;
        let end_seq = self.end_seq.ok_or_else(|| FrankenError::DatabaseCorrupt {
            detail: "segment builder missing end_seq".to_owned(),
        })?;

        // Entries come out in ascending (page, commit_seq) order — this order
        // feeds derive_segment_object_id, so it must stay deterministic.
        let mut entries = Vec::with_capacity(self.pending.len());
        for ((page_raw, _commit_seq), pointer) in &self.pending {
            let page = PageNumber::new(*page_raw).ok_or_else(|| FrankenError::DatabaseCorrupt {
                detail: format!("segment builder produced invalid page number {page_raw}"),
            })?;
            entries.push((page, *pointer));
        }

        let segment = PageVersionIndexSegment::new(start_seq, end_seq, entries);
        let object_id = derive_segment_object_id(&segment);

        info!(
            bead_id = NATIVE_INDEX_BEAD_ID,
            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
            start_seq = start_seq,
            end_seq = end_seq,
            segments_built = 1_u8,
            entry_count = segment.entries.len(),
            object_id = %object_id,
            "segment builder flush complete"
        );

        // Reset builder state for the next segment.
        self.pending.clear();
        self.start_seq = None;
        self.end_seq = None;

        Ok(Some(BuiltIndexSegment { segment, object_id }))
    }
}
317
318/// Perform native index lookup for one page under `snapshot_high`.
319///
320/// Implements:
321/// 1) cache lookup
322/// 2) presence filter check (fast negative path)
323/// 3) backward segment scan
324/// 4) patch fetch + materialization.
325///
326/// # Errors
327///
328/// Returns corruption errors for malformed patches and provider errors for I/O.
329pub fn lookup_page_version(
330    page: PageNumber,
331    snapshot_high: u64,
332    segments: &[PageVersionIndexSegment],
333    cache: &mut NativePageCache,
334    base_provider: &impl BasePageProvider,
335    patch_store: &impl PatchObjectStore,
336    symbol_loss_rate_estimate: f64,
337) -> Result<LookupResult> {
338    debug!(
339        bead_id = NATIVE_INDEX_BEAD_ID,
340        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
341        page = page.get(),
342        snapshot_high = snapshot_high,
343        "native index lookup started"
344    );
345    let mut deps = LookupDeps {
346        cache,
347        base_provider,
348        patch_store,
349        symbol_loss_rate_estimate,
350    };
351
352    if let Some(cached) = deps.cache.get(page, snapshot_high) {
353        log_lookup_path(true, false, 0, None);
354        return Ok(LookupResult {
355            page_bytes: cached.to_vec(),
356            resolved_pointer: None,
357            trace: LookupTrace {
358                cache_hit: true,
359                filter_hit: false,
360                segment_scans: 0,
361                resolved_commit_seq: None,
362            },
363        });
364    }
365
366    let filter_hit = version_maybe_present(page, snapshot_high, segments);
367    if !filter_hit {
368        return base_fallback_result(
369            page,
370            snapshot_high,
371            deps.cache,
372            deps.base_provider,
373            false,
374            0,
375        );
376    }
377
378    let (resolved_pointer, segment_scans) =
379        lookup_pointer_in_segments(page, snapshot_high, segments);
380    let Some(pointer) = resolved_pointer else {
381        return base_fallback_result(
382            page,
383            snapshot_high,
384            deps.cache,
385            deps.base_provider,
386            true,
387            segment_scans,
388        );
389    };
390
391    materialized_result(page, snapshot_high, pointer, segment_scans, &mut deps)
392}
393
/// Borrowed lookup dependencies threaded through the materialization path.
struct LookupDeps<'a> {
    // Hot-path cache; updated after materialization or base fallback.
    cache: &'a mut NativePageCache,
    // Source of base page images.
    base_provider: &'a dyn BasePageProvider,
    // Source of ECS patch payloads.
    patch_store: &'a dyn PatchObjectStore,
    // Caller-supplied symbol-loss estimate; only forwarded into logging.
    symbol_loss_rate_estimate: f64,
}
400
401fn base_fallback_result(
402    page: PageNumber,
403    snapshot_high: u64,
404    cache: &mut NativePageCache,
405    base_provider: &(impl BasePageProvider + ?Sized),
406    filter_hit: bool,
407    segment_scans: u64,
408) -> Result<LookupResult> {
409    let base = base_provider.load_base_page(page)?;
410    cache.insert(page, snapshot_high, base.clone());
411    log_lookup_path(false, filter_hit, segment_scans, None);
412    Ok(LookupResult {
413        page_bytes: base,
414        resolved_pointer: None,
415        trace: LookupTrace {
416            cache_hit: false,
417            filter_hit,
418            segment_scans,
419            resolved_commit_seq: None,
420        },
421    })
422}
423
424fn materialized_result(
425    page: PageNumber,
426    snapshot_high: u64,
427    pointer: VersionPointer,
428    segment_scans: u64,
429    deps: &mut LookupDeps<'_>,
430) -> Result<LookupResult> {
431    let patch_bytes = deps.patch_store.fetch_patch_object(pointer.patch_object)?;
432    let base_bytes = deps.base_provider.load_base_page(page)?;
433    let page_bytes = materialize_patch(pointer, &patch_bytes, &base_bytes, deps.patch_store, 0)?;
434    if pointer.patch_kind != PatchKind::FullImage {
435        warn!(
436            bead_id = NATIVE_INDEX_BEAD_ID,
437            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
438            object_id = %pointer.patch_object,
439            symbol_loss_rate_estimate = deps.symbol_loss_rate_estimate,
440            "native index lookup used repair/materialization path"
441        );
442    }
443
444    deps.cache.insert(page, snapshot_high, page_bytes.clone());
445    log_lookup_path(false, true, segment_scans, Some(pointer.commit_seq));
446    info!(
447        bead_id = NATIVE_INDEX_BEAD_ID,
448        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
449        page = page.get(),
450        snapshot_high = snapshot_high,
451        resolved_commit_seq = pointer.commit_seq,
452        "native index lookup resolved page version"
453    );
454    Ok(LookupResult {
455        page_bytes,
456        resolved_pointer: Some(pointer),
457        trace: LookupTrace {
458            cache_hit: false,
459            filter_hit: true,
460            segment_scans,
461            resolved_commit_seq: Some(pointer.commit_seq),
462        },
463    })
464}
465
/// Emit the structured "lookup path chosen" debug event for one lookup.
fn log_lookup_path(
    cache_hit: bool,
    filter_hit: bool,
    segment_scans: u64,
    resolved_commit_seq: Option<u64>,
) {
    debug!(
        bead_id = NATIVE_INDEX_BEAD_ID,
        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
        cache_hit = cache_hit,
        filter_hit = filter_hit,
        segment_scans = segment_scans,
        resolved_commit_seq = ?resolved_commit_seq,
        "lookup path chosen"
    );
}
482
483#[must_use]
484fn version_maybe_present(
485    page: PageNumber,
486    snapshot_high: u64,
487    segments: &[PageVersionIndexSegment],
488) -> bool {
489    segments
490        .iter()
491        .any(|segment| segment.start_seq <= snapshot_high && segment.bloom.maybe_contains(page))
492}
493
494#[must_use]
495fn lookup_pointer_in_segments(
496    page: PageNumber,
497    snapshot_high: u64,
498    segments: &[PageVersionIndexSegment],
499) -> (Option<VersionPointer>, u64) {
500    let mut ordered: Vec<&PageVersionIndexSegment> = segments.iter().collect();
501    ordered.sort_by_key(|segment| segment.end_seq);
502    ordered.reverse();
503
504    let mut scans = 0_u64;
505    for segment in ordered {
506        if segment.start_seq > snapshot_high {
507            continue;
508        }
509        scans = scans.saturating_add(1);
510        if let Some(pointer) = segment.lookup(page, snapshot_high) {
511            return (Some(*pointer), scans);
512        }
513    }
514    (None, scans)
515}
516
517fn materialize_patch(
518    pointer: VersionPointer,
519    patch_bytes: &[u8],
520    base_page: &[u8],
521    patch_store: &(impl PatchObjectStore + ?Sized),
522    depth: usize,
523) -> Result<Vec<u8>> {
524    if depth > MAX_PATCH_DEPTH {
525        error!(
526            bead_id = NATIVE_INDEX_BEAD_ID,
527            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
528            reason_code = "materialize_depth_exceeded",
529            depth = depth,
530            "recursive patch materialization depth exceeded"
531        );
532        return Err(FrankenError::DatabaseCorrupt {
533            detail: "patch materialization depth exceeded".to_owned(),
534        });
535    }
536
537    match pointer.patch_kind {
538        PatchKind::FullImage => Ok(patch_bytes.to_vec()),
539        PatchKind::IntentLog => {
540            let mut out = resolve_base_bytes(pointer, base_page, patch_store)?;
541            apply_intent_log_patch(&mut out, patch_bytes)?;
542            Ok(out)
543        }
544        PatchKind::SparseXor => {
545            let mut out = resolve_base_bytes(pointer, base_page, patch_store)?;
546            apply_sparse_xor_patch(&mut out, patch_bytes)?;
547            Ok(out)
548        }
549    }
550}
551
552fn resolve_base_bytes(
553    pointer: VersionPointer,
554    base_page: &[u8],
555    patch_store: &(impl PatchObjectStore + ?Sized),
556) -> Result<Vec<u8>> {
557    if let Some(base_object) = pointer.base_hint {
558        patch_store.fetch_patch_object(base_object)
559    } else {
560        Ok(base_page.to_vec())
561    }
562}
563
564fn apply_intent_log_patch(out: &mut [u8], patch_bytes: &[u8]) -> Result<()> {
565    let mut cursor = 0_usize;
566    let op_count = read_u8(patch_bytes, &mut cursor, "intent.op_count")?;
567    for op_index in 0..op_count {
568        let offset = usize::from(read_u16_le(patch_bytes, &mut cursor, "intent.op.offset")?);
569        let len = usize::from(read_u16_le(patch_bytes, &mut cursor, "intent.op.len")?);
570        let data = read_slice(patch_bytes, &mut cursor, len, "intent.op.data")?;
571        let end = offset
572            .checked_add(len)
573            .ok_or_else(|| FrankenError::DatabaseCorrupt {
574                detail: "intent patch offset overflow".to_owned(),
575            })?;
576        if end > out.len() {
577            return Err(FrankenError::DatabaseCorrupt {
578                detail: format!(
579                    "intent patch op {op_index} out of bounds: end={end}, page_len={}",
580                    out.len()
581                ),
582            });
583        }
584        out[offset..end].copy_from_slice(data);
585    }
586    if cursor != patch_bytes.len() {
587        return Err(FrankenError::DatabaseCorrupt {
588            detail: format!(
589                "intent patch trailing bytes: parsed={cursor}, actual={}",
590                patch_bytes.len()
591            ),
592        });
593    }
594    Ok(())
595}
596
597fn apply_sparse_xor_patch(out: &mut [u8], patch_bytes: &[u8]) -> Result<()> {
598    let mut cursor = 0_usize;
599    let op_count = read_u8(patch_bytes, &mut cursor, "xor.op_count")?;
600    for op_index in 0..op_count {
601        let offset = usize::from(read_u16_le(patch_bytes, &mut cursor, "xor.op.offset")?);
602        let len = usize::from(read_u16_le(patch_bytes, &mut cursor, "xor.op.len")?);
603        let data = read_slice(patch_bytes, &mut cursor, len, "xor.op.data")?;
604        let end = offset
605            .checked_add(len)
606            .ok_or_else(|| FrankenError::DatabaseCorrupt {
607                detail: "sparse-xor patch offset overflow".to_owned(),
608            })?;
609        if end > out.len() {
610            return Err(FrankenError::DatabaseCorrupt {
611                detail: format!(
612                    "sparse-xor patch op {op_index} out of bounds: end={end}, page_len={}",
613                    out.len()
614                ),
615            });
616        }
617        for (dst, delta) in out[offset..end].iter_mut().zip(data.iter()) {
618            *dst ^= *delta;
619        }
620    }
621    if cursor != patch_bytes.len() {
622        return Err(FrankenError::DatabaseCorrupt {
623            detail: format!(
624                "sparse-xor patch trailing bytes: parsed={cursor}, actual={}",
625                patch_bytes.len()
626            ),
627        });
628    }
629    Ok(())
630}
631
632fn read_u8(bytes: &[u8], cursor: &mut usize, field: &str) -> Result<u8> {
633    let end = cursor
634        .checked_add(1)
635        .ok_or_else(|| FrankenError::DatabaseCorrupt {
636            detail: format!("{field} overflow"),
637        })?;
638    if end > bytes.len() {
639        return Err(FrankenError::DatabaseCorrupt {
640            detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
641        });
642    }
643    let value = bytes[*cursor];
644    *cursor = end;
645    Ok(value)
646}
647
648fn read_u16_le(bytes: &[u8], cursor: &mut usize, field: &str) -> Result<u16> {
649    let end = cursor
650        .checked_add(2)
651        .ok_or_else(|| FrankenError::DatabaseCorrupt {
652            detail: format!("{field} overflow"),
653        })?;
654    if end > bytes.len() {
655        return Err(FrankenError::DatabaseCorrupt {
656            detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
657        });
658    }
659    let raw = [bytes[*cursor], bytes[*cursor + 1]];
660    *cursor = end;
661    Ok(u16::from_le_bytes(raw))
662}
663
664fn read_slice<'a>(
665    bytes: &'a [u8],
666    cursor: &mut usize,
667    len: usize,
668    field: &str,
669) -> Result<&'a [u8]> {
670    let end = cursor
671        .checked_add(len)
672        .ok_or_else(|| FrankenError::DatabaseCorrupt {
673            detail: format!("{field} overflow"),
674        })?;
675    if end > bytes.len() {
676        return Err(FrankenError::DatabaseCorrupt {
677            detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
678        });
679    }
680    let slice = &bytes[*cursor..end];
681    *cursor = end;
682    Ok(slice)
683}
684
685/// Deterministically derive the ECS object id of an index segment.
686#[must_use]
687pub fn derive_segment_object_id(segment: &PageVersionIndexSegment) -> ObjectId {
688    let canonical = canonical_segment_bytes(segment);
689    ObjectId::derive_from_canonical_bytes(&canonical)
690}
691
692#[must_use]
693fn canonical_segment_bytes(segment: &PageVersionIndexSegment) -> Vec<u8> {
694    let mut out = Vec::new();
695    out.extend_from_slice(&segment.start_seq.to_le_bytes());
696    out.extend_from_slice(&segment.end_seq.to_le_bytes());
697    let count = u64::try_from(segment.entries.len()).unwrap_or(u64::MAX);
698    out.extend_from_slice(&count.to_le_bytes());
699    for (page, pointer) in &segment.entries {
700        out.extend_from_slice(&page.get().to_le_bytes());
701        let vp_bytes = pointer.to_bytes();
702        let vp_len = u64::try_from(vp_bytes.len()).unwrap_or(u64::MAX);
703        out.extend_from_slice(&vp_len.to_le_bytes());
704        out.extend_from_slice(&vp_bytes);
705    }
706    out
707}
708
709/// Critical preflight check before repair/rebuild attempts.
710///
711/// # Errors
712///
713/// Returns [`FrankenError::DatabaseCorrupt`] with
714/// `reason_code=index_unrebuildable_with_markers` when commit markers are
715/// present but neither repair nor rebuild capability is available.
716pub fn preflight_native_index_integrity(
717    marker_segment_blobs: &[Vec<u8>],
718    repair_available: bool,
719    rebuild_available: bool,
720) -> Result<()> {
721    let markers = scan_commit_markers(marker_segment_blobs)?;
722    if !markers.is_empty() && !repair_available && !rebuild_available {
723        error!(
724            bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
725            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
726            reason_code = "index_unrebuildable_with_markers",
727            marker_count = markers.len(),
728            repair_available = repair_available,
729            rebuild_available = rebuild_available,
730            "critical integrity failure detected before repair attempt"
731        );
732        return Err(FrankenError::DatabaseCorrupt {
733            detail: format!(
734                "reason_code=index_unrebuildable_with_markers marker_count={} repair_available={repair_available} rebuild_available={rebuild_available}",
735                markers.len()
736            ),
737        });
738    }
739    Ok(())
740}
741
742/// Repair index segments from surviving symbols without a full rebuild.
743///
744/// The loader first tries local symbols, then remote symbols.
745///
746/// # Errors
747///
748/// Returns:
749/// - `reason_code=boldness_violation_blocked_repair` when the boldness policy blocks repair.
750/// - `reason_code=index_repair_incomplete` when one or more segments are irrecoverable.
751pub fn repair_index_segments_from_ecs(
752    segment_refs: &[NativeIndexSegmentRef],
753    local_store: &impl NativeIndexSegmentStore,
754    remote_store: &impl NativeIndexSegmentStore,
755    symbol_loss_rate_estimate: f64,
756    boldness: BoldnessConstraint,
757) -> Result<IndexRepairReport> {
758    if !boldness.permits_repair(symbol_loss_rate_estimate) {
759        warn!(
760            bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
761            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
762            reason_code = "boldness_violation_blocked_repair",
763            symbol_loss_rate_estimate = symbol_loss_rate_estimate,
764            max_repair_symbol_loss_rate = boldness.max_repair_symbol_loss_rate,
765            "repair blocked by boldness constraint"
766        );
767        return Err(FrankenError::DatabaseCorrupt {
768            detail: format!(
769                "reason_code=boldness_violation_blocked_repair symbol_loss_rate_estimate={symbol_loss_rate_estimate:.6} max_repair_symbol_loss_rate={:.6}",
770                boldness.max_repair_symbol_loss_rate
771            ),
772        });
773    }
774
775    let mut ordered_refs = segment_refs.to_vec();
776    ordered_refs.sort_by_key(|entry| (entry.start_seq, entry.end_seq, *entry.object_id.as_bytes()));
777
778    debug!(
779        bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
780        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
781        segment_ref_count = ordered_refs.len(),
782        "starting native index repair from surviving symbols"
783    );
784
785    let mut segments = Vec::with_capacity(ordered_refs.len());
786    let mut repaired_from_local = 0_u64;
787    let mut repaired_from_remote = 0_u64;
788    let mut missing = Vec::new();
789
790    for entry in ordered_refs {
791        match try_fetch_valid_segment(entry, local_store) {
792            Ok(segment) => {
793                repaired_from_local = repaired_from_local.saturating_add(1);
794                segments.push(segment);
795                continue;
796            }
797            Err(local_error) => {
798                warn!(
799                    bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
800                    logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
801                    object_id = %entry.object_id,
802                    start_seq = entry.start_seq,
803                    end_seq = entry.end_seq,
804                    error = %local_error,
805                    "local segment fetch failed; trying remote recovery path"
806                );
807            }
808        }
809
810        match try_fetch_valid_segment(entry, remote_store) {
811            Ok(segment) => {
812                repaired_from_remote = repaired_from_remote.saturating_add(1);
813                segments.push(segment);
814            }
815            Err(remote_error) => {
816                error!(
817                    bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
818                    logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
819                    object_id = %entry.object_id,
820                    start_seq = entry.start_seq,
821                    end_seq = entry.end_seq,
822                    error = %remote_error,
823                    reason_code = "index_repair_segment_irrecoverable",
824                    "segment irrecoverable from both local and remote symbols"
825                );
826                missing.push(entry.object_id);
827            }
828        }
829    }
830
831    if !missing.is_empty() {
832        return Err(FrankenError::DatabaseCorrupt {
833            detail: format!(
834                "reason_code=index_repair_incomplete irrecoverable_segments={} first_irrecoverable_object={}",
835                missing.len(),
836                missing
837                    .first()
838                    .map_or_else(|| "none".to_owned(), ToString::to_string)
839            ),
840        });
841    }
842
843    segments.sort_by_key(|segment| segment.end_seq);
844
845    info!(
846        bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
847        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
848        repaired_from_local = repaired_from_local,
849        repaired_from_remote = repaired_from_remote,
850        segments_repaired = segments.len(),
851        "native index repair complete"
852    );
853
854    Ok(IndexRepairReport {
855        segments,
856        repaired_from_local,
857        repaired_from_remote,
858    })
859}
860
861/// Rebuild native index segments by replaying the commit marker stream.
862///
863/// # Errors
864///
865/// Returns [`FrankenError::DatabaseCorrupt`] with
866/// `reason_code=index_unrebuildable_with_markers` when marker replay cannot
867/// reconstruct commit updates.
868pub fn rebuild_index_from_marker_stream(
869    marker_segment_blobs: &[Vec<u8>],
870    capsule_source: &impl CommitCapsuleIndexSource,
871    max_entries: usize,
872) -> Result<IndexRebuildReport> {
873    let markers = scan_commit_markers(marker_segment_blobs)?;
874    if markers.is_empty() {
875        return Ok(IndexRebuildReport {
876            markers,
877            segments: Vec::new(),
878        });
879    }
880
881    let start_seq = markers.first().map_or(0_u64, |record| record.commit_seq);
882    let end_seq = markers.last().map_or(0_u64, |record| record.commit_seq);
883    info!(
884        bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
885        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
886        start_seq = start_seq,
887        end_seq = end_seq,
888        segments_built = 0_u64,
889        "native index rebuild start"
890    );
891
892    let mut builder = SegmentBuilder::new(max_entries)?;
893    let mut built_segments = Vec::new();
894
895    for marker in &markers {
896        let capsule_id = ObjectId::from_bytes(marker.capsule_object_id);
897        let updates =
898            capsule_source
899                .updates_for_commit(marker.commit_seq, capsule_id)
900                .map_err(|source_error| {
901                    error!(
902                        bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
903                        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
904                        reason_code = "index_unrebuildable_with_markers",
905                        commit_seq = marker.commit_seq,
906                        capsule_object_id = %capsule_id,
907                        error = %source_error,
908                        "marker stream exists but commit capsule updates are unrecoverable"
909                    );
910                    FrankenError::DatabaseCorrupt {
911                        detail: format!(
912                            "reason_code=index_unrebuildable_with_markers commit_seq={} capsule_object_id={} source_error={source_error}",
913                            marker.commit_seq, capsule_id
914                        ),
915                    }
916                })?;
917
918        if let Some(segment) = builder.ingest_commit(marker.commit_seq, updates)? {
919            built_segments.push(segment);
920        }
921    }
922
923    if let Some(segment) = builder.flush()? {
924        built_segments.push(segment);
925    }
926
927    info!(
928        bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
929        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
930        start_seq = start_seq,
931        end_seq = end_seq,
932        segments_built = built_segments.len(),
933        "native index rebuild complete"
934    );
935
936    Ok(IndexRebuildReport {
937        markers,
938        segments: built_segments,
939    })
940}
941
942/// Emergency linear scan over commit markers when index segments are unavailable.
943///
944/// # Errors
945///
946/// Returns `reason_code=boldness_violation_blocked_linear_scan` if emergency
947/// mode is not explicitly enabled.
948pub fn emergency_linear_scan_lookup(
949    page: PageNumber,
950    snapshot_high: u64,
951    marker_segment_blobs: &[Vec<u8>],
952    capsule_source: &impl CommitCapsuleIndexSource,
953    boldness: BoldnessConstraint,
954    evidence_state: &str,
955) -> Result<Option<VersionPointer>> {
956    if !boldness.allow_emergency_linear_scan {
957        warn!(
958            bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
959            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
960            reason_code = "boldness_violation_blocked_linear_scan",
961            attempted_page = page.get(),
962            attempted_snapshot_high = snapshot_high,
963            evidence_state = evidence_state,
964            "boldness violation attempt blocked"
965        );
966        return Err(FrankenError::DatabaseCorrupt {
967            detail: format!(
968                "reason_code=boldness_violation_blocked_linear_scan attempted_page={} attempted_snapshot_high={} evidence_state={evidence_state}",
969                page.get(),
970                snapshot_high
971            ),
972        });
973    }
974
975    let markers = scan_commit_markers(marker_segment_blobs)?;
976    for marker in markers.iter().rev() {
977        if marker.commit_seq > snapshot_high {
978            continue;
979        }
980        let capsule_id = ObjectId::from_bytes(marker.capsule_object_id);
981        let updates = capsule_source.updates_for_commit(marker.commit_seq, capsule_id)?;
982        if let Some((_, pointer)) = updates
983            .into_iter()
984            .find(|(candidate, pointer)| *candidate == page && pointer.commit_seq <= snapshot_high)
985        {
986            info!(
987                bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
988                logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
989                page = page.get(),
990                snapshot_high = snapshot_high,
991                resolved_commit_seq = pointer.commit_seq,
992                "native index emergency linear scan resolved version pointer"
993            );
994            return Ok(Some(pointer));
995        }
996    }
997    Ok(None)
998}
999
1000fn scan_commit_markers(marker_segment_blobs: &[Vec<u8>]) -> Result<Vec<CommitMarkerRecord>> {
1001    let mut ordered_segments: Vec<(u64, Vec<CommitMarkerRecord>)> =
1002        Vec::with_capacity(marker_segment_blobs.len());
1003    for bytes in marker_segment_blobs {
1004        if bytes.len() < MARKER_SEGMENT_HEADER_BYTES {
1005            return Err(FrankenError::DatabaseCorrupt {
1006                detail: format!(
1007                    "marker segment shorter than header: bytes={} header_bytes={MARKER_SEGMENT_HEADER_BYTES}",
1008                    bytes.len()
1009                ),
1010            });
1011        }
1012        let header =
1013            MarkerSegmentHeader::decode(&bytes[..MARKER_SEGMENT_HEADER_BYTES]).map_err(|err| {
1014                FrankenError::DatabaseCorrupt {
1015                    detail: format!("invalid marker segment header: {err}"),
1016                }
1017            })?;
1018        let records = recover_valid_prefix(bytes).map_err(|err| FrankenError::DatabaseCorrupt {
1019            detail: format!("marker segment prefix recovery failed: {err}"),
1020        })?;
1021        ordered_segments.push((header.start_commit_seq, records));
1022    }
1023
1024    ordered_segments.sort_by_key(|(start_seq, _)| *start_seq);
1025    let mut combined = Vec::new();
1026    let mut expected_next_seq: Option<u64> = None;
1027    for (_, records) in ordered_segments {
1028        for record in records {
1029            if let Some(expected) = expected_next_seq
1030                && record.commit_seq != expected
1031            {
1032                return Err(FrankenError::DatabaseCorrupt {
1033                    detail: format!(
1034                        "marker stream commit gap: expected {expected}, found {}",
1035                        record.commit_seq
1036                    ),
1037                });
1038            }
1039            expected_next_seq = Some(record.commit_seq.saturating_add(1));
1040            combined.push(record);
1041        }
1042    }
1043    Ok(combined)
1044}
1045
1046fn try_fetch_valid_segment(
1047    entry: NativeIndexSegmentRef,
1048    store: &impl NativeIndexSegmentStore,
1049) -> Result<PageVersionIndexSegment> {
1050    let segment = store.fetch_index_segment(entry.object_id)?;
1051    if segment.start_seq != entry.start_seq || segment.end_seq != entry.end_seq {
1052        return Err(FrankenError::DatabaseCorrupt {
1053            detail: format!(
1054                "segment bounds mismatch for object {}: expected [{}..={}], found [{}..={}]",
1055                entry.object_id, entry.start_seq, entry.end_seq, segment.start_seq, segment.end_seq
1056            ),
1057        });
1058    }
1059    let recomputed = derive_segment_object_id(&segment);
1060    if recomputed != entry.object_id {
1061        return Err(FrankenError::DatabaseCorrupt {
1062            detail: format!(
1063                "segment object id mismatch: expected {}, recomputed {}",
1064                entry.object_id, recomputed
1065            ),
1066        });
1067    }
1068    Ok(segment)
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073    use std::cell::Cell;
1074
1075    use super::*;
1076    use crate::commit_marker::{
1077        COMMIT_MARKER_RECORD_BYTES, CommitMarkerRecord, MARKER_SEGMENT_HEADER_BYTES,
1078        MarkerSegmentHeader,
1079    };
1080
    /// Key for a capsule lookup: `(commit_seq, raw capsule object-id bytes)`.
    type CapsuleKey = (u64, [u8; 16]);
    /// Page-version updates carried by a single commit capsule.
    type CapsuleUpdates = Vec<(PageNumber, VersionPointer)>;
1083
    /// In-memory base-page provider that counts `load_base_page` calls so
    /// tests can assert how often the read path falls back to base images.
    #[derive(Debug, Clone)]
    struct TestBasePages {
        // Page number -> base image bytes.
        pages: BTreeMap<u32, Vec<u8>>,
        // Load counter; `Cell` keeps it updatable through `&self`.
        loads: Cell<u64>,
    }

    impl TestBasePages {
        /// Build a provider from `(page, bytes)` pairs.
        fn new(entries: impl IntoIterator<Item = (PageNumber, Vec<u8>)>) -> Self {
            let mut pages = BTreeMap::new();
            for (page, bytes) in entries {
                pages.insert(page.get(), bytes);
            }
            Self {
                pages,
                loads: Cell::new(0),
            }
        }

        /// Number of base-page loads observed so far.
        fn loads(&self) -> u64 {
            self.loads.get()
        }
    }

    impl BasePageProvider for TestBasePages {
        fn load_base_page(&self, page: PageNumber) -> Result<Vec<u8>> {
            self.loads.set(self.loads.get().saturating_add(1));
            self.pages
                .get(&page.get())
                .cloned()
                .ok_or_else(|| FrankenError::DatabaseCorrupt {
                    detail: format!("missing base page {}", page.get()),
                })
        }
    }
1118
    /// In-memory ECS patch store that counts fetches so tests can assert how
    /// many object reads the lookup path performs.
    #[derive(Debug, Clone)]
    struct TestPatchStore {
        // Raw object-id bytes -> patch payload.
        objects: BTreeMap<[u8; 16], Vec<u8>>,
        // Fetch counter; `Cell` keeps it updatable through `&self`.
        fetches: Cell<u64>,
    }

    impl TestPatchStore {
        /// Build a store from `(object_id, payload)` pairs.
        fn new(entries: impl IntoIterator<Item = (ObjectId, Vec<u8>)>) -> Self {
            let mut objects = BTreeMap::new();
            for (oid, payload) in entries {
                objects.insert(*oid.as_bytes(), payload);
            }
            Self {
                objects,
                fetches: Cell::new(0),
            }
        }

        /// Number of patch-object fetches observed so far.
        fn fetches(&self) -> u64 {
            self.fetches.get()
        }
    }

    impl PatchObjectStore for TestPatchStore {
        fn fetch_patch_object(&self, object_id: ObjectId) -> Result<Vec<u8>> {
            self.fetches.set(self.fetches.get().saturating_add(1));
            self.objects
                .get(object_id.as_bytes())
                .cloned()
                .ok_or_else(|| FrankenError::DatabaseCorrupt {
                    detail: format!("missing patch object {object_id}"),
                })
        }
    }
1153
    /// In-memory index-segment store keyed by raw `ObjectId` bytes.
    #[derive(Debug, Clone, Default)]
    struct TestSegmentStore {
        segments: BTreeMap<[u8; 16], PageVersionIndexSegment>,
    }

    impl TestSegmentStore {
        /// Build a store from `(object_id, segment)` pairs.
        fn new(entries: impl IntoIterator<Item = (ObjectId, PageVersionIndexSegment)>) -> Self {
            let mut segments = BTreeMap::new();
            for (id, segment) in entries {
                segments.insert(*id.as_bytes(), segment);
            }
            Self { segments }
        }
    }

    impl NativeIndexSegmentStore for TestSegmentStore {
        fn fetch_index_segment(&self, object_id: ObjectId) -> Result<PageVersionIndexSegment> {
            self.segments
                .get(object_id.as_bytes())
                .cloned()
                .ok_or_else(|| FrankenError::DatabaseCorrupt {
                    detail: format!("missing index segment object {object_id}"),
                })
        }
    }
1179
    /// In-memory commit-capsule source keyed by `(commit_seq, capsule id bytes)`.
    #[derive(Debug, Clone, Default)]
    struct TestCapsuleSource {
        updates: BTreeMap<CapsuleKey, CapsuleUpdates>,
    }

    impl TestCapsuleSource {
        /// Register the page updates carried by one commit capsule
        /// (builder-style, consumes and returns `self`).
        fn with_update(
            mut self,
            commit_seq: u64,
            capsule_seed: u8,
            updates: Vec<(PageNumber, VersionPointer)>,
        ) -> Self {
            self.updates
                .insert((commit_seq, [capsule_seed; 16]), updates);
            self
        }
    }

    impl CommitCapsuleIndexSource for TestCapsuleSource {
        fn updates_for_commit(
            &self,
            commit_seq: u64,
            capsule_object_id: ObjectId,
        ) -> Result<Vec<(PageNumber, VersionPointer)>> {
            self.updates
                .get(&(commit_seq, *capsule_object_id.as_bytes()))
                .cloned()
                .ok_or_else(|| FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "missing capsule updates commit_seq={commit_seq} capsule_object_id={capsule_object_id}"
                    ),
                })
        }
    }
1214
    /// Construct a `PageNumber`, panicking on zero (test-only convenience).
    fn page(n: u32) -> PageNumber {
        PageNumber::new(n).expect("non-zero page number")
    }
1218
    /// Build a deterministic `ObjectId` whose 16 bytes all equal `seed`.
    fn oid(seed: u8) -> ObjectId {
        ObjectId::from_bytes([seed; 16])
    }
1222
    /// Assemble a `VersionPointer` from compact test seeds; a `Some(seed)`
    /// base hint points at the correspondingly seeded object id.
    fn pointer(
        commit_seq: u64,
        patch_object_seed: u8,
        patch_kind: PatchKind,
        base_hint: Option<u8>,
    ) -> VersionPointer {
        VersionPointer {
            commit_seq,
            patch_object: oid(patch_object_seed),
            patch_kind,
            base_hint: base_hint.map(oid),
        }
    }
1236
1237    fn encode_intent_patch(ops: &[(u16, &[u8])]) -> Vec<u8> {
1238        let mut out = Vec::new();
1239        out.push(u8::try_from(ops.len()).expect("op count fits u8"));
1240        for (offset, data) in ops {
1241            out.extend_from_slice(&offset.to_le_bytes());
1242            out.extend_from_slice(
1243                &u16::try_from(data.len())
1244                    .expect("op data len fits u16")
1245                    .to_le_bytes(),
1246            );
1247            out.extend_from_slice(data);
1248        }
1249        out
1250    }
1251
    /// Sparse-XOR patches share the intent-patch wire format; only the
    /// materialization semantics differ (XOR-apply rather than overwrite).
    fn encode_sparse_xor_patch(ops: &[(u16, &[u8])]) -> Vec<u8> {
        encode_intent_patch(ops)
    }
1255
    /// Build a `CommitMarkerRecord` with a deterministic timestamp derived
    /// from `commit_seq` and capsule/segment ids seeded from `capsule_seed`.
    fn marker_record(
        commit_seq: u64,
        capsule_seed: u8,
        prev_marker: [u8; 16],
    ) -> CommitMarkerRecord {
        CommitMarkerRecord::new(
            commit_seq,
            // Fixed epoch base keeps the record bytes reproducible per seed.
            1_700_000_000_000_000_000_u64.saturating_add(commit_seq),
            [capsule_seed; 16],
            [capsule_seed.wrapping_add(1); 16],
            prev_marker,
        )
    }
1269
    /// Serialize a marker segment blob: encoded header followed by the
    /// encoded records, with a layout sanity check on the total size.
    fn marker_segment_blob(start_seq: u64, records: &[CommitMarkerRecord]) -> Vec<u8> {
        let mut out = Vec::new();
        let segment_id = start_seq / 1_000_000;
        let header = MarkerSegmentHeader::new(segment_id, start_seq);
        out.extend_from_slice(&header.encode());
        for record in records {
            out.extend_from_slice(&record.encode());
        }
        // Records are fixed-size, so the blob length is fully determined.
        let expected_len = MARKER_SEGMENT_HEADER_BYTES + COMMIT_MARKER_RECORD_BYTES * records.len();
        assert_eq!(out.len(), expected_len);
        out
    }
1282
    /// Lookup must resolve the newest version visible at the snapshot bound:
    /// snapshot 25 sees commit 20's image, snapshot 15 sees commit 10's.
    #[test]
    fn test_lookup_latest_version() {
        let p = page(7);
        let vp10 = pointer(10, 0x10, PatchKind::FullImage, None);
        let vp20 = pointer(20, 0x20, PatchKind::FullImage, None);
        let seg10 = PageVersionIndexSegment::new(10, 10, vec![(p, vp10)]);
        let seg20 = PageVersionIndexSegment::new(20, 20, vec![(p, vp20)]);
        let base = TestBasePages::new([(p, b"base".to_vec())]);
        let store =
            TestPatchStore::new([(oid(0x10), b"v10".to_vec()), (oid(0x20), b"v20".to_vec())]);

        let mut cache = NativePageCache::default();
        let result_high = lookup_page_version(
            p,
            25,
            &[seg10.clone(), seg20.clone()],
            &mut cache,
            &base,
            &store,
            0.0,
        )
        .expect("lookup");
        assert_eq!(result_high.page_bytes, b"v20".to_vec());
        assert_eq!(result_high.resolved_pointer, Some(vp20));

        // A lower snapshot bound must hide commit 20 and surface commit 10.
        let mut cache2 = NativePageCache::default();
        let result_mid =
            lookup_page_version(p, 15, &[seg10, seg20], &mut cache2, &base, &store, 0.0)
                .expect("lookup");
        assert_eq!(result_mid.page_bytes, b"v10".to_vec());
        assert_eq!(result_mid.resolved_pointer, Some(vp10));
    }
1315
    /// The presence filter must never produce a false negative: a page absent
    /// from the segment falls back to its base image with zero patch fetches,
    /// while the indexed page still resolves through the filter.
    #[test]
    fn test_filter_negative_path_has_no_false_negatives() {
        let target = page(5);
        let other = page(9);
        let vp = pointer(12, 0x30, PatchKind::FullImage, None);
        let segment = PageVersionIndexSegment::new(12, 12, vec![(other, vp)]);
        let base = TestBasePages::new([(target, b"base5".to_vec()), (other, b"base9".to_vec())]);
        let store = TestPatchStore::new([(oid(0x30), b"other-version".to_vec())]);

        let mut cache = NativePageCache::default();
        let negative = lookup_page_version(
            target,
            20,
            std::slice::from_ref(&segment),
            &mut cache,
            &base,
            &store,
            0.0,
        )
        .expect("negative path");
        assert!(!negative.trace.filter_hit);
        assert_eq!(negative.page_bytes, b"base5".to_vec());
        // The negative path must not touch the patch store at all.
        assert_eq!(store.fetches(), 0);

        let mut cache2 = NativePageCache::default();
        let positive = lookup_page_version(other, 20, &[segment], &mut cache2, &base, &store, 0.0)
            .expect("positive path");
        assert!(positive.trace.filter_hit);
        assert_eq!(positive.page_bytes, b"other-version".to_vec());
    }
1346
    /// Intent-log patches overwrite byte ranges of the base image, while
    /// sparse-XOR patches XOR their bytes into it.
    #[test]
    fn test_materialization_intent_and_sparse_xor() {
        let p = page(3);
        let base = TestBasePages::new([(p, b"aaaa".to_vec())]);
        let intent_ptr = pointer(10, 0x40, PatchKind::IntentLog, None);
        let xor_ptr = pointer(20, 0x50, PatchKind::SparseXor, None);
        let seg_intent = PageVersionIndexSegment::new(10, 10, vec![(p, intent_ptr)]);
        let seg_xor = PageVersionIndexSegment::new(20, 20, vec![(p, xor_ptr)]);

        let intent_patch = encode_intent_patch(&[(1, b"BC")]);
        let xor_patch = encode_sparse_xor_patch(&[(0, &[0x01, 0x02, 0x03, 0x04])]);
        let store = TestPatchStore::new([(oid(0x40), intent_patch), (oid(0x50), xor_patch)]);

        let mut cache = NativePageCache::default();
        let intent = lookup_page_version(
            p,
            10,
            &[seg_intent, seg_xor.clone()],
            &mut cache,
            &base,
            &store,
            0.0,
        )
        .expect("intent materialization");
        // Overwrite bytes 1..=2 of "aaaa" with "BC".
        assert_eq!(intent.page_bytes, b"aBCa".to_vec());

        let mut cache2 = NativePageCache::default();
        let sparse = lookup_page_version(p, 20, &[seg_xor], &mut cache2, &base, &store, 0.0)
            .expect("xor materialization");
        // "aaaa" is [0x61; 4]; XOR with 0x01..0x04 gives [96, 99, 98, 101].
        assert_eq!(sparse.page_bytes, vec![96, 99, 98, 101]);
    }
1378
    /// Segment construction is deterministic: ingesting the same updates in a
    /// different order yields identical entries and the same derived object id.
    #[test]
    fn test_segment_construction_deterministic() {
        let mut builder_a = SegmentBuilder::new(16).expect("builder");
        let mut builder_b = SegmentBuilder::new(16).expect("builder");

        // Same two updates, presented in opposite orders.
        let updates_a = vec![
            (page(7), pointer(100, 0x60, PatchKind::FullImage, None)),
            (
                page(2),
                pointer(100, 0x61, PatchKind::IntentLog, Some(0x62)),
            ),
        ];
        let updates_b = vec![
            (
                page(2),
                pointer(100, 0x61, PatchKind::IntentLog, Some(0x62)),
            ),
            (page(7), pointer(100, 0x60, PatchKind::FullImage, None)),
        ];

        assert!(
            builder_a
                .ingest_commit(100, updates_a)
                .expect("ingest")
                .is_none()
        );
        assert!(
            builder_b
                .ingest_commit(100, updates_b)
                .expect("ingest")
                .is_none()
        );

        let built_a = builder_a.flush().expect("flush").expect("segment");
        let built_b = builder_b.flush().expect("flush").expect("segment");
        assert_eq!(built_a.segment.entries, built_b.segment.entries);
        assert_eq!(built_a.object_id, built_b.object_id);
    }
1417
    /// Shared end-to-end scenario: first lookup takes the full read path
    /// (cache miss -> filter hit -> segment scan -> fetch -> materialize);
    /// the repeated lookup is served from the page cache.
    fn run_e2e_path_case() {
        let p = page(11);
        let base_bytes = vec![0x10, 0x20, 0x30, 0x40];
        let base = TestBasePages::new([(p, base_bytes.clone())]);
        let pointer = pointer(30, 0x71, PatchKind::SparseXor, Some(0x70));
        let segment = PageVersionIndexSegment::new(30, 30, vec![(p, pointer)]);
        let xor_patch = encode_sparse_xor_patch(&[(2, &[0xFF, 0x0F])]);
        let store = TestPatchStore::new([(oid(0x70), base_bytes), (oid(0x71), xor_patch)]);

        let mut cache = NativePageCache::default();
        let first = lookup_page_version(
            p,
            30,
            std::slice::from_ref(&segment),
            &mut cache,
            &base,
            &store,
            0.02,
        )
        .expect("first lookup");
        assert!(!first.trace.cache_hit);
        assert!(first.trace.filter_hit);
        assert_eq!(first.trace.segment_scans, 1);
        assert_eq!(first.trace.resolved_commit_seq, Some(30));
        // XOR of 0xFF/0x0F into bytes 2..=3 of the base image.
        assert_eq!(first.page_bytes, vec![0x10, 0x20, 0xCF, 0x4F]);

        // Same (page, snapshot) with a warm cache: no extra fetches or loads.
        let second = lookup_page_version(p, 30, &[segment], &mut cache, &base, &store, 0.02)
            .expect("second lookup");
        assert!(second.trace.cache_hit);
        assert_eq!(second.page_bytes, first.page_bytes);
        assert_eq!(store.fetches(), 2);
        assert_eq!(base.loads(), 1);
    }
1451
    /// E2E read-path test (§3.6.4): see `run_e2e_path_case` for the scenario.
    #[test]
    fn test_e2e_cache_miss_filter_hit_index_scan_fetch_materialize() {
        run_e2e_path_case();
    }
1456
    /// Compliance gate for bd-1hi.32: the bead ids are pinned and the patch
    /// chain depth guard rejects recursion beyond `MAX_PATCH_DEPTH`.
    #[test]
    fn test_bd_1hi_32_unit_compliance_gate() {
        assert_eq!(NATIVE_INDEX_BEAD_ID, "bd-1hi.32");
        assert_eq!(NATIVE_INDEX_LOGGING_STANDARD, "bd-1fpm");
        let store = TestPatchStore::new(std::iter::empty::<(ObjectId, Vec<u8>)>());
        let err = materialize_patch(
            pointer(1, 0xAA, PatchKind::FullImage, None),
            b"",
            b"",
            &store,
            MAX_PATCH_DEPTH + 1,
        )
        .expect_err("depth guard");
        assert!(err.to_string().contains("depth exceeded"));
    }
1472
    /// Property-style sweep: for each seed, a flushed segment is non-empty and
    /// its object id recomputes from its contents (content addressing holds).
    #[test]
    fn prop_bd_1hi_32_structure_compliance() {
        for seed in 1_u8..=16 {
            let mut builder = SegmentBuilder::new(8).expect("builder");
            let updates = vec![
                (
                    page(u32::from(seed)),
                    pointer(500, seed, PatchKind::FullImage, None),
                ),
                (
                    page(u32::from(seed) + 100),
                    pointer(500, seed.wrapping_add(1), PatchKind::SparseXor, Some(seed)),
                ),
            ];
            assert!(
                builder
                    .ingest_commit(500, updates)
                    .expect("ingest")
                    .is_none()
            );
            let built = builder.flush().expect("flush").expect("segment");
            assert!(!built.segment.entries.is_empty());
            let recomputed = derive_segment_object_id(&built.segment);
            assert_eq!(built.object_id, recomputed);
        }
    }
1499
    /// bd-1hi.32 E2E compliance alias for the shared read-path scenario.
    #[test]
    fn test_e2e_bd_1hi_32_compliance() {
        run_e2e_path_case();
    }
1504
    /// Rebuilding from the marker stream is deterministic and yields segments
    /// through which a point lookup resolves the latest visible pointer.
    #[test]
    fn test_index_rebuild_from_markers() {
        // Two chained markers (101's prev pointer references 100's marker id).
        let marker_100 = marker_record(100, 0x10, [0_u8; 16]);
        let marker_101 = marker_record(101, 0x11, marker_100.marker_id);
        let marker_blob = marker_segment_blob(100, &[marker_100, marker_101]);

        let source = TestCapsuleSource::default()
            .with_update(
                100,
                0x10,
                vec![(page(3), pointer(100, 0x80, PatchKind::FullImage, None))],
            )
            .with_update(
                101,
                0x11,
                vec![
                    (
                        page(3),
                        pointer(101, 0x81, PatchKind::IntentLog, Some(0x82)),
                    ),
                    (page(9), pointer(101, 0x83, PatchKind::FullImage, None)),
                ],
            );

        // Two rebuilds over the same input must produce identical segments.
        let rebuilt_a =
            rebuild_index_from_marker_stream(std::slice::from_ref(&marker_blob), &source, 2)
                .expect("rebuild from markers");
        let rebuilt_b = rebuild_index_from_marker_stream(&[marker_blob], &source, 2)
            .expect("deterministic rebuild");
        assert_eq!(rebuilt_a.segments, rebuilt_b.segments);

        let segments: Vec<PageVersionIndexSegment> = rebuilt_a
            .segments
            .iter()
            .map(|segment| segment.segment.clone())
            .collect();
        // Page 3 at snapshot 101 must resolve to the commit-101 pointer.
        let (resolved, scans) = lookup_pointer_in_segments(page(3), 101, &segments);
        assert_eq!(
            resolved,
            Some(pointer(101, 0x81, PatchKind::IntentLog, Some(0x82)))
        );
        assert!(scans >= 1);
    }
1548
    /// Repair pulls each referenced segment from local symbols when possible
    /// and falls back to remote symbols; the repaired set must serve lookups.
    #[test]
    fn test_index_repair_from_ecs() {
        let p1 = page(4);
        let p2 = page(8);
        let seg_local = PageVersionIndexSegment::new(
            200,
            200,
            vec![(p1, pointer(200, 0x90, PatchKind::FullImage, None))],
        );
        let seg_remote = PageVersionIndexSegment::new(
            201,
            201,
            vec![(p2, pointer(201, 0x91, PatchKind::SparseXor, Some(0x92)))],
        );
        let id_local = derive_segment_object_id(&seg_local);
        let id_remote = derive_segment_object_id(&seg_remote);

        let refs = vec![
            NativeIndexSegmentRef {
                start_seq: 200,
                end_seq: 200,
                object_id: id_local,
            },
            NativeIndexSegmentRef {
                start_seq: 201,
                end_seq: 201,
                object_id: id_remote,
            },
        ];

        // Local store only holds the first segment; the second must be
        // recovered from the remote store.
        let local_store = TestSegmentStore::new([(id_local, seg_local.clone())]);
        let remote_store = TestSegmentStore::new([(id_local, seg_local), (id_remote, seg_remote)]);
        let report = repair_index_segments_from_ecs(
            &refs,
            &local_store,
            &remote_store,
            0.02,
            BoldnessConstraint::strict(),
        )
        .expect("repair from surviving symbols");

        assert_eq!(report.repaired_from_local, 1);
        assert_eq!(report.repaired_from_remote, 1);
        assert_eq!(report.segments.len(), 2);
        let (resolved, _) = lookup_pointer_in_segments(p2, 201, &report.segments);
        assert_eq!(
            resolved,
            Some(pointer(201, 0x91, PatchKind::SparseXor, Some(0x92)))
        );
    }
1599
    /// Strict boldness must block the emergency linear scan with the
    /// documented reason code; emergency boldness must allow it to resolve.
    #[test]
    fn test_boldness_constraint() {
        let marker = marker_record(300, 0x33, [0_u8; 16]);
        let marker_blob = marker_segment_blob(300, std::slice::from_ref(&marker));
        let source = TestCapsuleSource::default().with_update(
            300,
            0x33,
            vec![(page(11), pointer(300, 0xA0, PatchKind::FullImage, None))],
        );

        let blocked = emergency_linear_scan_lookup(
            page(11),
            300,
            std::slice::from_ref(&marker_blob),
            &source,
            BoldnessConstraint::strict(),
            "index_destroyed",
        )
        .expect_err("strict boldness must block emergency lookup");
        assert!(
            blocked
                .to_string()
                .contains("reason_code=boldness_violation_blocked_linear_scan")
        );

        let resolved = emergency_linear_scan_lookup(
            page(11),
            300,
            &[marker_blob],
            &source,
            BoldnessConstraint::emergency(),
            "index_destroyed",
        )
        .expect("emergency lookup")
        .expect("pointer found");
        assert_eq!(resolved, pointer(300, 0xA0, PatchKind::FullImage, None));
    }
1637
    /// Preflight must flag markers with no recovery path as critical before
    /// any repair is attempted, and pass when a repair path is available.
    #[test]
    fn test_critical_integrity_failure_detected_before_repair_attempt() {
        let marker = marker_record(400, 0x44, [0_u8; 16]);
        let blob = marker_segment_blob(400, std::slice::from_ref(&marker));
        let err = preflight_native_index_integrity(std::slice::from_ref(&blob), false, false)
            .expect_err("markers without recovery paths are critical");
        assert!(
            err.to_string()
                .contains("reason_code=index_unrebuildable_with_markers")
        );

        preflight_native_index_integrity(&[blob], true, false).expect("repair path available");
    }
1651
    /// Compliance gate for bd-1hi.33: bead ids pinned; preflight without any
    /// recovery path surfaces the unrebuildable reason code.
    #[test]
    fn test_bd_1hi_33_unit_compliance_gate() {
        assert_eq!(NATIVE_INDEX_REPAIR_BEAD_ID, "bd-1hi.33");
        assert_eq!(NATIVE_INDEX_LOGGING_STANDARD, "bd-1fpm");

        let marker = marker_record(500, 0x55, [0_u8; 16]);
        let marker_blob = marker_segment_blob(500, std::slice::from_ref(&marker));
        let err = preflight_native_index_integrity(&[marker_blob], false, false)
            .expect_err("critical preflight error");
        assert!(
            err.to_string()
                .contains("reason_code=index_unrebuildable_with_markers")
        );
    }
1666
    /// Property-style sweep over seeds: rebuild from markers, repair the
    /// rebuilt segments purely from remote symbols, and confirm the emergency
    /// linear scan resolves the same page under emergency boldness.
    #[test]
    fn prop_bd_1hi_33_structure_compliance() {
        for seed in 1_u8..=12 {
            let commit_seq = 10_000_u64 + u64::from(seed);
            let marker = marker_record(commit_seq, seed, [0_u8; 16]);
            let marker_blob = marker_segment_blob(commit_seq, std::slice::from_ref(&marker));
            let source = TestCapsuleSource::default().with_update(
                commit_seq,
                seed,
                vec![(
                    page(u32::from(seed) + 1),
                    pointer(commit_seq, seed.wrapping_add(1), PatchKind::FullImage, None),
                )],
            );

            let rebuilt =
                rebuild_index_from_marker_stream(std::slice::from_ref(&marker_blob), &source, 1)
                    .expect("rebuild");
            assert_eq!(rebuilt.markers.len(), 1);
            assert!(!rebuilt.segments.is_empty());

            let refs: Vec<NativeIndexSegmentRef> = rebuilt
                .segments
                .iter()
                .map(|segment| NativeIndexSegmentRef {
                    start_seq: segment.segment.start_seq,
                    end_seq: segment.segment.end_seq,
                    object_id: segment.object_id,
                })
                .collect();
            // Remote store holds everything; the local store is empty, so all
            // repairs must come from the remote path.
            let remote_store = TestSegmentStore::new(
                rebuilt
                    .segments
                    .iter()
                    .map(|segment| (segment.object_id, segment.segment.clone())),
            );
            let local_store = TestSegmentStore::default();
            let repaired = repair_index_segments_from_ecs(
                &refs,
                &local_store,
                &remote_store,
                0.05,
                BoldnessConstraint::strict(),
            )
            .expect("repair");
            assert_eq!(repaired.segments.len(), refs.len());

            let looked_up = emergency_linear_scan_lookup(
                page(u32::from(seed) + 1),
                commit_seq,
                &[marker_blob],
                &source,
                BoldnessConstraint::emergency(),
                "index_destroyed",
            )
            .expect("emergency lookup");
            assert!(looked_up.is_some());
        }
    }
1726
    /// Shared fixture for the bd-1hi.33 end-to-end test: two pages with base
    /// images, full-image patch payloads for commits 300/301, and a chained
    /// two-marker segment blob mirrored by a capsule source.
    struct Bd1Hi33Fixture {
        // Page 11 — updated at both commit 300 and commit 301.
        p11: PageNumber,
        // Page 12 — first updated at commit 301.
        p12: PageNumber,
        // Base page images used for materialization fallback.
        base: TestBasePages,
        // Patch payload bytes addressed by `ObjectId`.
        patch_store: TestPatchStore,
        // Serialized marker segment covering commits 300-301.
        marker_blob: Vec<u8>,
        // Capsule updates that mirror the marker stream.
        source: TestCapsuleSource,
    }
1735
1736    fn build_bd_1hi_33_fixture() -> Bd1Hi33Fixture {
1737        let p11 = page(11);
1738        let p12 = page(12);
1739        let base = TestBasePages::new([(p11, b"base11".to_vec()), (p12, b"base12".to_vec())]);
1740        let patch_store = TestPatchStore::new([
1741            (oid(0x90), b"v300-p11".to_vec()),
1742            (oid(0x91), b"v301-p11".to_vec()),
1743            (oid(0x92), b"v301-p12".to_vec()),
1744        ]);
1745
1746        let marker_300 = marker_record(300, 0x30, [0_u8; 16]);
1747        let marker_301 = marker_record(301, 0x31, marker_300.marker_id);
1748        let marker_blob = marker_segment_blob(300, &[marker_300, marker_301]);
1749
1750        let source = TestCapsuleSource::default()
1751            .with_update(
1752                300,
1753                0x30,
1754                vec![(p11, pointer(300, 0x90, PatchKind::FullImage, None))],
1755            )
1756            .with_update(
1757                301,
1758                0x31,
1759                vec![
1760                    (p11, pointer(301, 0x91, PatchKind::FullImage, None)),
1761                    (p12, pointer(301, 0x92, PatchKind::FullImage, None)),
1762                ],
1763            );
1764
1765        Bd1Hi33Fixture {
1766            p11,
1767            p12,
1768            base,
1769            patch_store,
1770            marker_blob,
1771            source,
1772        }
1773    }
1774
1775    fn assert_repaired_index_lookups(
1776        fixture: &Bd1Hi33Fixture,
1777        segments: &[PageVersionIndexSegment],
1778    ) {
1779        let mut cache = NativePageCache::default();
1780        let looked_up_p11 = lookup_page_version(
1781            fixture.p11,
1782            301,
1783            segments,
1784            &mut cache,
1785            &fixture.base,
1786            &fixture.patch_store,
1787            0.05,
1788        )
1789        .expect("indexed lookup p11");
1790        assert_eq!(looked_up_p11.page_bytes, b"v301-p11".to_vec());
1791
1792        let mut cache2 = NativePageCache::default();
1793        let looked_up_p12 = lookup_page_version(
1794            fixture.p12,
1795            301,
1796            segments,
1797            &mut cache2,
1798            &fixture.base,
1799            &fixture.patch_store,
1800            0.05,
1801        )
1802        .expect("indexed lookup p12");
1803        assert_eq!(looked_up_p12.page_bytes, b"v301-p12".to_vec());
1804    }
1805
1806    fn assert_emergency_scan_behavior(fixture: &Bd1Hi33Fixture) {
1807        let blocked = emergency_linear_scan_lookup(
1808            fixture.p11,
1809            301,
1810            std::slice::from_ref(&fixture.marker_blob),
1811            &fixture.source,
1812            BoldnessConstraint::strict(),
1813            "index_destroyed",
1814        )
1815        .expect_err("strict policy blocks emergency scan");
1816        assert!(
1817            blocked
1818                .to_string()
1819                .contains("reason_code=boldness_violation_blocked_linear_scan")
1820        );
1821
1822        let fallback_pointer = emergency_linear_scan_lookup(
1823            fixture.p11,
1824            301,
1825            std::slice::from_ref(&fixture.marker_blob),
1826            &fixture.source,
1827            BoldnessConstraint::emergency(),
1828            "index_destroyed",
1829        )
1830        .expect("emergency linear scan")
1831        .expect("pointer");
1832        let fallback_patch = fixture
1833            .patch_store
1834            .fetch_patch_object(fallback_pointer.patch_object)
1835            .expect("patch bytes");
1836        let fallback_base = fixture.base.load_base_page(fixture.p11).expect("base page");
1837        let fallback_bytes = materialize_patch(
1838            fallback_pointer,
1839            &fallback_patch,
1840            &fallback_base,
1841            &fixture.patch_store,
1842            0,
1843        )
1844        .expect("materialize from emergency pointer");
1845        assert_eq!(fallback_bytes, b"v301-p11".to_vec());
1846    }
1847
1848    #[test]
1849    fn test_e2e_bd_1hi_33_compliance() {
1850        let fixture = build_bd_1hi_33_fixture();
1851
1852        let rebuilt_a = rebuild_index_from_marker_stream(
1853            std::slice::from_ref(&fixture.marker_blob),
1854            &fixture.source,
1855            1,
1856        )
1857        .expect("rebuild");
1858        let rebuilt_b = rebuild_index_from_marker_stream(
1859            std::slice::from_ref(&fixture.marker_blob),
1860            &fixture.source,
1861            1,
1862        )
1863        .expect("rebuild deterministic");
1864        let ids_a: Vec<ObjectId> = rebuilt_a
1865            .segments
1866            .iter()
1867            .map(|segment| segment.object_id)
1868            .collect();
1869        let ids_b: Vec<ObjectId> = rebuilt_b
1870            .segments
1871            .iter()
1872            .map(|segment| segment.object_id)
1873            .collect();
1874        assert_eq!(ids_a, ids_b);
1875
1876        let refs: Vec<NativeIndexSegmentRef> = rebuilt_a
1877            .segments
1878            .iter()
1879            .map(|segment| NativeIndexSegmentRef {
1880                start_seq: segment.segment.start_seq,
1881                end_seq: segment.segment.end_seq,
1882                object_id: segment.object_id,
1883            })
1884            .collect();
1885        assert!(refs.len() >= 2);
1886
1887        let local_store = TestSegmentStore::new([(
1888            rebuilt_a.segments[0].object_id,
1889            rebuilt_a.segments[0].segment.clone(),
1890        )]);
1891        let remote_store = TestSegmentStore::new(
1892            rebuilt_a
1893                .segments
1894                .iter()
1895                .map(|segment| (segment.object_id, segment.segment.clone())),
1896        );
1897        let repaired = repair_index_segments_from_ecs(
1898            &refs,
1899            &local_store,
1900            &remote_store,
1901            0.05,
1902            BoldnessConstraint::strict(),
1903        )
1904        .expect("repair");
1905
1906        assert_repaired_index_lookups(&fixture, &repaired.segments);
1907        assert_emergency_scan_behavior(&fixture);
1908    }
1909}