1use std::collections::BTreeMap;
8
9use crate::commit_marker::{
10 CommitMarkerRecord, MARKER_SEGMENT_HEADER_BYTES, MarkerSegmentHeader, recover_valid_prefix,
11};
12use fsqlite_error::{FrankenError, Result};
13use fsqlite_types::ecs::{PageVersionIndexSegment, PatchKind, VersionPointer};
14use fsqlite_types::{ObjectId, PageNumber};
15use tracing::{debug, error, info, warn};
16
// Bead identifier attached to lookup-path log events (see logging standard below).
const NATIVE_INDEX_BEAD_ID: &str = "bd-1hi.32";
// Bead identifier attached to repair/rebuild log events.
const NATIVE_INDEX_REPAIR_BEAD_ID: &str = "bd-1hi.33";
// Identifier of the structured-logging standard these events conform to.
const NATIVE_INDEX_LOGGING_STANDARD: &str = "bd-1fpm";
// Hard cap on patch materialization depth before declaring corruption.
const MAX_PATCH_DEPTH: usize = 8;
// Default ceiling on the estimated symbol-loss rate a repair may proceed under.
const DEFAULT_MAX_REPAIR_SYMBOL_LOSS_RATE: f64 = 0.25;
22
/// Source of unversioned base page images, used when no indexed version
/// applies or as the substrate a patch is applied on top of.
pub trait BasePageProvider {
    /// Loads the base (pre-patch) bytes for `page`.
    ///
    /// # Errors
    /// Implementation-defined; callers treat failure as unrecoverable for
    /// the current lookup.
    fn load_base_page(&self, page: PageNumber) -> Result<Vec<u8>>;
}
32
/// Content-addressed store of patch payloads referenced by version pointers.
pub trait PatchObjectStore {
    /// Fetches the raw patch bytes stored under `object_id`.
    ///
    /// # Errors
    /// Implementation-defined; a missing object aborts materialization.
    fn fetch_patch_object(&self, object_id: ObjectId) -> Result<Vec<u8>>;
}
42
/// Cache of materialized page images, keyed by `(page number, snapshot_high)`
/// so the same page can be cached independently per snapshot horizon.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct NativePageCache {
    // Keyed by (raw page number, snapshot_high); value is the full page image.
    entries: BTreeMap<(u32, u64), Vec<u8>>,
}
48
49impl NativePageCache {
50 pub fn insert(&mut self, page: PageNumber, snapshot_high: u64, bytes: Vec<u8>) {
52 self.entries.insert((page.get(), snapshot_high), bytes);
53 }
54
55 #[must_use]
57 pub fn get(&self, page: PageNumber, snapshot_high: u64) -> Option<&[u8]> {
58 self.entries
59 .get(&(page.get(), snapshot_high))
60 .map(Vec::as_slice)
61 }
62}
63
/// Diagnostic record of which path a lookup took and how much work it did.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LookupTrace {
    // True when the result was served straight from the page cache.
    pub cache_hit: bool,
    // True when the segment bloom/range pre-filter admitted the page.
    pub filter_hit: bool,
    // Number of index segments actually scanned for a pointer.
    pub segment_scans: u64,
    // Commit sequence of the resolved version, when one was found.
    pub resolved_commit_seq: Option<u64>,
}
76
/// Outcome of a page-version lookup: the page image plus resolution metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LookupResult {
    // Fully materialized page image to hand to the caller.
    pub page_bytes: Vec<u8>,
    // Pointer that produced the image; `None` on cache hit or base fallback.
    pub resolved_pointer: Option<VersionPointer>,
    // Path/effort diagnostics for this lookup.
    pub trace: LookupTrace,
}
87
/// An index segment emitted by `SegmentBuilder`, paired with its
/// content-derived object identity.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BuiltIndexSegment {
    // The flushed segment payload.
    pub segment: PageVersionIndexSegment,
    // Content-addressed id (see `derive_segment_object_id`).
    pub object_id: ObjectId,
}
96
/// Policy limits on how aggressively recovery paths may behave.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct BoldnessConstraint {
    // Whether `emergency_linear_scan_lookup` is permitted at all.
    pub allow_emergency_linear_scan: bool,
    // Maximum estimated symbol-loss rate under which repair may proceed.
    pub max_repair_symbol_loss_rate: f64,
}
105
106impl BoldnessConstraint {
107 #[must_use]
109 pub const fn strict() -> Self {
110 Self {
111 allow_emergency_linear_scan: false,
112 max_repair_symbol_loss_rate: DEFAULT_MAX_REPAIR_SYMBOL_LOSS_RATE,
113 }
114 }
115
116 #[must_use]
118 pub const fn emergency() -> Self {
119 Self {
120 allow_emergency_linear_scan: true,
121 max_repair_symbol_loss_rate: DEFAULT_MAX_REPAIR_SYMBOL_LOSS_RATE,
122 }
123 }
124
125 #[must_use]
126 fn permits_repair(self, symbol_loss_rate_estimate: f64) -> bool {
127 symbol_loss_rate_estimate <= self.max_repair_symbol_loss_rate
128 }
129}
130
131impl Default for BoldnessConstraint {
132 fn default() -> Self {
133 Self::strict()
134 }
135}
136
/// Lightweight reference to a persisted index segment: its commit range and
/// the content-addressed object id used to fetch and validate it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NativeIndexSegmentRef {
    // First commit sequence covered by the segment (inclusive).
    pub start_seq: u64,
    // Last commit sequence covered by the segment (inclusive).
    pub end_seq: u64,
    // Expected content-derived identity of the segment.
    pub object_id: ObjectId,
}
147
/// Store capable of returning persisted index segments by object id
/// (local or remote; see `repair_index_segments_from_ecs`).
pub trait NativeIndexSegmentStore {
    /// Fetches the index segment stored under `object_id`.
    ///
    /// # Errors
    /// Implementation-defined; callers may fall back to another store.
    fn fetch_index_segment(&self, object_id: ObjectId) -> Result<PageVersionIndexSegment>;
}
157
/// Source of per-commit page-version updates, resolved from a commit's
/// capsule object. Used for index rebuild and emergency linear scans.
pub trait CommitCapsuleIndexSource {
    /// Returns the `(page, pointer)` updates recorded by the commit at
    /// `commit_seq` whose capsule is `capsule_object_id`.
    ///
    /// # Errors
    /// Implementation-defined; a failure marks the commit unrecoverable.
    fn updates_for_commit(
        &self,
        commit_seq: u64,
        capsule_object_id: ObjectId,
    ) -> Result<Vec<(PageNumber, VersionPointer)>>;
}
171
/// Result of a successful segment repair pass.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexRepairReport {
    // Recovered segments, sorted by ascending `end_seq`.
    pub segments: Vec<PageVersionIndexSegment>,
    // Count of segments recovered from the local store.
    pub repaired_from_local: u64,
    // Count of segments recovered from the remote store.
    pub repaired_from_remote: u64,
}
182
/// Result of rebuilding the index from the commit-marker stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexRebuildReport {
    // The recovered, gap-free marker stream the rebuild was driven by.
    pub markers: Vec<CommitMarkerRecord>,
    // Segments produced by the rebuild, in emission order.
    pub segments: Vec<BuiltIndexSegment>,
}
191
/// Accumulates per-commit version-pointer updates and flushes them into
/// `PageVersionIndexSegment`s once `max_entries` entries are pending.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentBuilder {
    // Flush threshold; guaranteed non-zero by `new`.
    max_entries: usize,
    // Lowest commit sequence ingested since the last flush.
    start_seq: Option<u64>,
    // Highest commit sequence ingested since the last flush.
    end_seq: Option<u64>,
    // Pending entries keyed by (raw page number, commit_seq).
    pending: BTreeMap<(u32, u64), VersionPointer>,
}
200
impl SegmentBuilder {
    /// Creates a builder that flushes once `max_entries` entries are pending.
    ///
    /// # Errors
    /// Returns `OutOfRange` when `max_entries` is zero.
    pub fn new(max_entries: usize) -> Result<Self> {
        if max_entries == 0 {
            return Err(FrankenError::OutOfRange {
                what: "segment_builder.max_entries".to_owned(),
                value: "0".to_owned(),
            });
        }
        Ok(Self {
            max_entries,
            start_seq: None,
            end_seq: None,
            pending: BTreeMap::new(),
        })
    }

    /// Buffers the version-pointer updates of one commit.
    ///
    /// Returns `Ok(Some(segment))` when this commit pushed the pending count
    /// to `max_entries` and triggered a flush, `Ok(None)` otherwise.
    ///
    /// # Errors
    /// Returns `TypeMismatch` when a pointer's `commit_seq` disagrees with
    /// the `commit_seq` argument; propagates flush failures.
    pub fn ingest_commit(
        &mut self,
        commit_seq: u64,
        updates: impl IntoIterator<Item = (PageNumber, VersionPointer)>,
    ) -> Result<Option<BuiltIndexSegment>> {
        debug!(
            bead_id = NATIVE_INDEX_BEAD_ID,
            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
            commit_seq = commit_seq,
            "segment builder ingesting commit updates"
        );

        for (page, pointer) in updates {
            // Every pointer must belong to the commit being ingested.
            if pointer.commit_seq != commit_seq {
                return Err(FrankenError::TypeMismatch {
                    expected: format!("pointer.commit_seq == {commit_seq}"),
                    actual: pointer.commit_seq.to_string(),
                });
            }
            // Keyed by (page, commit_seq): the same page may carry one entry
            // per distinct commit within a segment.
            self.pending
                .insert((page.get(), pointer.commit_seq), pointer);
        }

        // Widen the covered commit range. NOTE(review): the range is widened
        // even when `updates` yields no items — confirm that empty commits
        // are intended to extend segment bounds.
        self.start_seq = Some(match self.start_seq {
            Some(start) => start.min(commit_seq),
            None => commit_seq,
        });
        self.end_seq = Some(match self.end_seq {
            Some(end) => end.max(commit_seq),
            None => commit_seq,
        });

        if self.pending.len() >= self.max_entries {
            self.flush()
        } else {
            Ok(None)
        }
    }

    /// Emits all pending entries as one segment and resets the builder.
    ///
    /// Returns `Ok(None)` when nothing is pending (commit bounds, if any,
    /// are left untouched in that case).
    ///
    /// # Errors
    /// Returns `DatabaseCorrupt` when internal invariants are violated
    /// (missing bounds or an invalid stored page number).
    pub fn flush(&mut self) -> Result<Option<BuiltIndexSegment>> {
        if self.pending.is_empty() {
            return Ok(None);
        }

        // Both bounds must exist once any entry is pending.
        let start_seq = self
            .start_seq
            .ok_or_else(|| FrankenError::DatabaseCorrupt {
                detail: "segment builder missing start_seq".to_owned(),
            })?;
        let end_seq = self.end_seq.ok_or_else(|| FrankenError::DatabaseCorrupt {
            detail: "segment builder missing end_seq".to_owned(),
        })?;

        // Re-validate raw page numbers on the way out of the map.
        let mut entries = Vec::with_capacity(self.pending.len());
        for ((page_raw, _commit_seq), pointer) in &self.pending {
            let page = PageNumber::new(*page_raw).ok_or_else(|| FrankenError::DatabaseCorrupt {
                detail: format!("segment builder produced invalid page number {page_raw}"),
            })?;
            entries.push((page, *pointer));
        }

        let segment = PageVersionIndexSegment::new(start_seq, end_seq, entries);
        // Identity is derived from the canonical serialized content.
        let object_id = derive_segment_object_id(&segment);

        info!(
            bead_id = NATIVE_INDEX_BEAD_ID,
            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
            start_seq = start_seq,
            end_seq = end_seq,
            segments_built = 1_u8,
            entry_count = segment.entries.len(),
            object_id = %object_id,
            "segment builder flush complete"
        );

        // Reset for the next segment.
        self.pending.clear();
        self.start_seq = None;
        self.end_seq = None;

        Ok(Some(BuiltIndexSegment { segment, object_id }))
    }
}
317
/// Resolves the image of `page` visible at snapshot `snapshot_high`.
///
/// Resolution order: (1) page cache; (2) segment pre-filter — on miss,
/// fall back to the base page; (3) segment scan for a version pointer —
/// on miss, fall back to the base page; (4) materialize the patch chain
/// for the resolved pointer. Fallback and materialized results are
/// inserted into `cache` before returning.
///
/// # Errors
/// Propagates base-page load, patch fetch, and materialization failures.
pub fn lookup_page_version(
    page: PageNumber,
    snapshot_high: u64,
    segments: &[PageVersionIndexSegment],
    cache: &mut NativePageCache,
    base_provider: &impl BasePageProvider,
    patch_store: &impl PatchObjectStore,
    symbol_loss_rate_estimate: f64,
) -> Result<LookupResult> {
    debug!(
        bead_id = NATIVE_INDEX_BEAD_ID,
        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
        page = page.get(),
        snapshot_high = snapshot_high,
        "native index lookup started"
    );
    let mut deps = LookupDeps {
        cache,
        base_provider,
        patch_store,
        symbol_loss_rate_estimate,
    };

    // Fast path: previously materialized image for this exact snapshot.
    if let Some(cached) = deps.cache.get(page, snapshot_high) {
        log_lookup_path(true, false, 0, None);
        return Ok(LookupResult {
            page_bytes: cached.to_vec(),
            resolved_pointer: None,
            trace: LookupTrace {
                cache_hit: true,
                filter_hit: false,
                segment_scans: 0,
                resolved_commit_seq: None,
            },
        });
    }

    // Pre-filter: skip the segment scan when no segment can contain a match.
    let filter_hit = version_maybe_present(page, snapshot_high, segments);
    if !filter_hit {
        return base_fallback_result(
            page,
            snapshot_high,
            deps.cache,
            deps.base_provider,
            false,
            0,
        );
    }

    // Scan segments newest-first for a pointer; bloom hits can be false
    // positives, so a miss here still falls back to the base page.
    let (resolved_pointer, segment_scans) =
        lookup_pointer_in_segments(page, snapshot_high, segments);
    let Some(pointer) = resolved_pointer else {
        return base_fallback_result(
            page,
            snapshot_high,
            deps.cache,
            deps.base_provider,
            true,
            segment_scans,
        );
    };

    materialized_result(page, snapshot_high, pointer, segment_scans, &mut deps)
}
393
/// Bundles the collaborators a lookup threads through its helper calls.
struct LookupDeps<'a> {
    // Materialized-page cache, written on every non-cache-hit path.
    cache: &'a mut NativePageCache,
    // Source of base (unversioned) page images.
    base_provider: &'a dyn BasePageProvider,
    // Source of patch payload bytes.
    patch_store: &'a dyn PatchObjectStore,
    // Estimate logged when a non-full-image (repair) path is taken.
    symbol_loss_rate_estimate: f64,
}
400
401fn base_fallback_result(
402 page: PageNumber,
403 snapshot_high: u64,
404 cache: &mut NativePageCache,
405 base_provider: &(impl BasePageProvider + ?Sized),
406 filter_hit: bool,
407 segment_scans: u64,
408) -> Result<LookupResult> {
409 let base = base_provider.load_base_page(page)?;
410 cache.insert(page, snapshot_high, base.clone());
411 log_lookup_path(false, filter_hit, segment_scans, None);
412 Ok(LookupResult {
413 page_bytes: base,
414 resolved_pointer: None,
415 trace: LookupTrace {
416 cache_hit: false,
417 filter_hit,
418 segment_scans,
419 resolved_commit_seq: None,
420 },
421 })
422}
423
/// Materializes the page image for a resolved pointer, caches it, and
/// assembles the lookup result.
///
/// # Errors
/// Propagates patch fetch, base-page load, and materialization failures.
fn materialized_result(
    page: PageNumber,
    snapshot_high: u64,
    pointer: VersionPointer,
    segment_scans: u64,
    deps: &mut LookupDeps<'_>,
) -> Result<LookupResult> {
    let patch_bytes = deps.patch_store.fetch_patch_object(pointer.patch_object)?;
    let base_bytes = deps.base_provider.load_base_page(page)?;
    // Depth starts at 0; materialize_patch enforces MAX_PATCH_DEPTH.
    let page_bytes = materialize_patch(pointer, &patch_bytes, &base_bytes, deps.patch_store, 0)?;
    // Non-full-image kinds had to be reconstructed on top of a base; flag
    // the heavier path for operators.
    if pointer.patch_kind != PatchKind::FullImage {
        warn!(
            bead_id = NATIVE_INDEX_BEAD_ID,
            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
            object_id = %pointer.patch_object,
            symbol_loss_rate_estimate = deps.symbol_loss_rate_estimate,
            "native index lookup used repair/materialization path"
        );
    }

    deps.cache.insert(page, snapshot_high, page_bytes.clone());
    log_lookup_path(false, true, segment_scans, Some(pointer.commit_seq));
    info!(
        bead_id = NATIVE_INDEX_BEAD_ID,
        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
        page = page.get(),
        snapshot_high = snapshot_high,
        resolved_commit_seq = pointer.commit_seq,
        "native index lookup resolved page version"
    );
    Ok(LookupResult {
        page_bytes,
        resolved_pointer: Some(pointer),
        trace: LookupTrace {
            cache_hit: false,
            filter_hit: true,
            segment_scans,
            resolved_commit_seq: Some(pointer.commit_seq),
        },
    })
}
465
/// Emits one structured debug event describing the lookup path taken;
/// mirrors the fields of `LookupTrace`.
fn log_lookup_path(
    cache_hit: bool,
    filter_hit: bool,
    segment_scans: u64,
    resolved_commit_seq: Option<u64>,
) {
    debug!(
        bead_id = NATIVE_INDEX_BEAD_ID,
        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
        cache_hit = cache_hit,
        filter_hit = filter_hit,
        segment_scans = segment_scans,
        resolved_commit_seq = ?resolved_commit_seq,
        "lookup path chosen"
    );
}
482
483#[must_use]
484fn version_maybe_present(
485 page: PageNumber,
486 snapshot_high: u64,
487 segments: &[PageVersionIndexSegment],
488) -> bool {
489 segments
490 .iter()
491 .any(|segment| segment.start_seq <= snapshot_high && segment.bloom.maybe_contains(page))
492}
493
494#[must_use]
495fn lookup_pointer_in_segments(
496 page: PageNumber,
497 snapshot_high: u64,
498 segments: &[PageVersionIndexSegment],
499) -> (Option<VersionPointer>, u64) {
500 let mut ordered: Vec<&PageVersionIndexSegment> = segments.iter().collect();
501 ordered.sort_by_key(|segment| segment.end_seq);
502 ordered.reverse();
503
504 let mut scans = 0_u64;
505 for segment in ordered {
506 if segment.start_seq > snapshot_high {
507 continue;
508 }
509 scans = scans.saturating_add(1);
510 if let Some(pointer) = segment.lookup(page, snapshot_high) {
511 return (Some(*pointer), scans);
512 }
513 }
514 (None, scans)
515}
516
/// Produces a full page image from a patch according to its kind: full
/// images are returned as-is; intent-log and sparse-XOR patches are applied
/// on top of the resolved base bytes.
///
/// NOTE(review): `depth` is guarded against `MAX_PATCH_DEPTH` but no
/// recursive call is visible in this function — presumably the guard is for
/// callers that chain materializations; confirm.
///
/// # Errors
/// Returns `DatabaseCorrupt` when the depth cap is exceeded or a patch is
/// malformed; propagates base-object fetch failures.
fn materialize_patch(
    pointer: VersionPointer,
    patch_bytes: &[u8],
    base_page: &[u8],
    patch_store: &(impl PatchObjectStore + ?Sized),
    depth: usize,
) -> Result<Vec<u8>> {
    if depth > MAX_PATCH_DEPTH {
        error!(
            bead_id = NATIVE_INDEX_BEAD_ID,
            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
            reason_code = "materialize_depth_exceeded",
            depth = depth,
            "recursive patch materialization depth exceeded"
        );
        return Err(FrankenError::DatabaseCorrupt {
            detail: "patch materialization depth exceeded".to_owned(),
        });
    }

    match pointer.patch_kind {
        PatchKind::FullImage => Ok(patch_bytes.to_vec()),
        PatchKind::IntentLog => {
            let mut out = resolve_base_bytes(pointer, base_page, patch_store)?;
            apply_intent_log_patch(&mut out, patch_bytes)?;
            Ok(out)
        }
        PatchKind::SparseXor => {
            let mut out = resolve_base_bytes(pointer, base_page, patch_store)?;
            apply_sparse_xor_patch(&mut out, patch_bytes)?;
            Ok(out)
        }
    }
}
551
552fn resolve_base_bytes(
553 pointer: VersionPointer,
554 base_page: &[u8],
555 patch_store: &(impl PatchObjectStore + ?Sized),
556) -> Result<Vec<u8>> {
557 if let Some(base_object) = pointer.base_hint {
558 patch_store.fetch_patch_object(base_object)
559 } else {
560 Ok(base_page.to_vec())
561 }
562}
563
564fn apply_intent_log_patch(out: &mut [u8], patch_bytes: &[u8]) -> Result<()> {
565 let mut cursor = 0_usize;
566 let op_count = read_u8(patch_bytes, &mut cursor, "intent.op_count")?;
567 for op_index in 0..op_count {
568 let offset = usize::from(read_u16_le(patch_bytes, &mut cursor, "intent.op.offset")?);
569 let len = usize::from(read_u16_le(patch_bytes, &mut cursor, "intent.op.len")?);
570 let data = read_slice(patch_bytes, &mut cursor, len, "intent.op.data")?;
571 let end = offset
572 .checked_add(len)
573 .ok_or_else(|| FrankenError::DatabaseCorrupt {
574 detail: "intent patch offset overflow".to_owned(),
575 })?;
576 if end > out.len() {
577 return Err(FrankenError::DatabaseCorrupt {
578 detail: format!(
579 "intent patch op {op_index} out of bounds: end={end}, page_len={}",
580 out.len()
581 ),
582 });
583 }
584 out[offset..end].copy_from_slice(data);
585 }
586 if cursor != patch_bytes.len() {
587 return Err(FrankenError::DatabaseCorrupt {
588 detail: format!(
589 "intent patch trailing bytes: parsed={cursor}, actual={}",
590 patch_bytes.len()
591 ),
592 });
593 }
594 Ok(())
595}
596
597fn apply_sparse_xor_patch(out: &mut [u8], patch_bytes: &[u8]) -> Result<()> {
598 let mut cursor = 0_usize;
599 let op_count = read_u8(patch_bytes, &mut cursor, "xor.op_count")?;
600 for op_index in 0..op_count {
601 let offset = usize::from(read_u16_le(patch_bytes, &mut cursor, "xor.op.offset")?);
602 let len = usize::from(read_u16_le(patch_bytes, &mut cursor, "xor.op.len")?);
603 let data = read_slice(patch_bytes, &mut cursor, len, "xor.op.data")?;
604 let end = offset
605 .checked_add(len)
606 .ok_or_else(|| FrankenError::DatabaseCorrupt {
607 detail: "sparse-xor patch offset overflow".to_owned(),
608 })?;
609 if end > out.len() {
610 return Err(FrankenError::DatabaseCorrupt {
611 detail: format!(
612 "sparse-xor patch op {op_index} out of bounds: end={end}, page_len={}",
613 out.len()
614 ),
615 });
616 }
617 for (dst, delta) in out[offset..end].iter_mut().zip(data.iter()) {
618 *dst ^= *delta;
619 }
620 }
621 if cursor != patch_bytes.len() {
622 return Err(FrankenError::DatabaseCorrupt {
623 detail: format!(
624 "sparse-xor patch trailing bytes: parsed={cursor}, actual={}",
625 patch_bytes.len()
626 ),
627 });
628 }
629 Ok(())
630}
631
632fn read_u8(bytes: &[u8], cursor: &mut usize, field: &str) -> Result<u8> {
633 let end = cursor
634 .checked_add(1)
635 .ok_or_else(|| FrankenError::DatabaseCorrupt {
636 detail: format!("{field} overflow"),
637 })?;
638 if end > bytes.len() {
639 return Err(FrankenError::DatabaseCorrupt {
640 detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
641 });
642 }
643 let value = bytes[*cursor];
644 *cursor = end;
645 Ok(value)
646}
647
648fn read_u16_le(bytes: &[u8], cursor: &mut usize, field: &str) -> Result<u16> {
649 let end = cursor
650 .checked_add(2)
651 .ok_or_else(|| FrankenError::DatabaseCorrupt {
652 detail: format!("{field} overflow"),
653 })?;
654 if end > bytes.len() {
655 return Err(FrankenError::DatabaseCorrupt {
656 detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
657 });
658 }
659 let raw = [bytes[*cursor], bytes[*cursor + 1]];
660 *cursor = end;
661 Ok(u16::from_le_bytes(raw))
662}
663
664fn read_slice<'a>(
665 bytes: &'a [u8],
666 cursor: &mut usize,
667 len: usize,
668 field: &str,
669) -> Result<&'a [u8]> {
670 let end = cursor
671 .checked_add(len)
672 .ok_or_else(|| FrankenError::DatabaseCorrupt {
673 detail: format!("{field} overflow"),
674 })?;
675 if end > bytes.len() {
676 return Err(FrankenError::DatabaseCorrupt {
677 detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
678 });
679 }
680 let slice = &bytes[*cursor..end];
681 *cursor = end;
682 Ok(slice)
683}
684
685#[must_use]
687pub fn derive_segment_object_id(segment: &PageVersionIndexSegment) -> ObjectId {
688 let canonical = canonical_segment_bytes(segment);
689 ObjectId::derive_from_canonical_bytes(&canonical)
690}
691
692#[must_use]
693fn canonical_segment_bytes(segment: &PageVersionIndexSegment) -> Vec<u8> {
694 let mut out = Vec::new();
695 out.extend_from_slice(&segment.start_seq.to_le_bytes());
696 out.extend_from_slice(&segment.end_seq.to_le_bytes());
697 let count = u64::try_from(segment.entries.len()).unwrap_or(u64::MAX);
698 out.extend_from_slice(&count.to_le_bytes());
699 for (page, pointer) in &segment.entries {
700 out.extend_from_slice(&page.get().to_le_bytes());
701 let vp_bytes = pointer.to_bytes();
702 let vp_len = u64::try_from(vp_bytes.len()).unwrap_or(u64::MAX);
703 out.extend_from_slice(&vp_len.to_le_bytes());
704 out.extend_from_slice(&vp_bytes);
705 }
706 out
707}
708
/// Fails fast when commit markers exist but neither repair nor rebuild is
/// available — i.e. the index is provably unrecoverable before any repair
/// work is attempted. An empty marker stream always passes.
///
/// # Errors
/// Returns `DatabaseCorrupt` in the unrecoverable case above, and
/// propagates marker-stream scan failures.
pub fn preflight_native_index_integrity(
    marker_segment_blobs: &[Vec<u8>],
    repair_available: bool,
    rebuild_available: bool,
) -> Result<()> {
    let markers = scan_commit_markers(marker_segment_blobs)?;
    if !markers.is_empty() && !repair_available && !rebuild_available {
        error!(
            bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
            reason_code = "index_unrebuildable_with_markers",
            marker_count = markers.len(),
            repair_available = repair_available,
            rebuild_available = rebuild_available,
            "critical integrity failure detected before repair attempt"
        );
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "reason_code=index_unrebuildable_with_markers marker_count={} repair_available={repair_available} rebuild_available={rebuild_available}",
                markers.len()
            ),
        });
    }
    Ok(())
}
741
/// Recovers index segments from surviving symbols: each referenced segment
/// is fetched and validated from the local store first, then the remote
/// store. Succeeds only when every segment is recovered; the report's
/// segments are sorted by ascending `end_seq`.
///
/// # Errors
/// Fails up front when `boldness` rejects `symbol_loss_rate_estimate`, and
/// fails after the scan when any segment is irrecoverable from both stores.
pub fn repair_index_segments_from_ecs(
    segment_refs: &[NativeIndexSegmentRef],
    local_store: &impl NativeIndexSegmentStore,
    remote_store: &impl NativeIndexSegmentStore,
    symbol_loss_rate_estimate: f64,
    boldness: BoldnessConstraint,
) -> Result<IndexRepairReport> {
    if !boldness.permits_repair(symbol_loss_rate_estimate) {
        warn!(
            bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
            reason_code = "boldness_violation_blocked_repair",
            symbol_loss_rate_estimate = symbol_loss_rate_estimate,
            max_repair_symbol_loss_rate = boldness.max_repair_symbol_loss_rate,
            "repair blocked by boldness constraint"
        );
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "reason_code=boldness_violation_blocked_repair symbol_loss_rate_estimate={symbol_loss_rate_estimate:.6} max_repair_symbol_loss_rate={:.6}",
                boldness.max_repair_symbol_loss_rate
            ),
        });
    }

    // Deterministic processing order: by range, then object id.
    let mut ordered_refs = segment_refs.to_vec();
    ordered_refs.sort_by_key(|entry| (entry.start_seq, entry.end_seq, *entry.object_id.as_bytes()));

    debug!(
        bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
        segment_ref_count = ordered_refs.len(),
        "starting native index repair from surviving symbols"
    );

    let mut segments = Vec::with_capacity(ordered_refs.len());
    let mut repaired_from_local = 0_u64;
    let mut repaired_from_remote = 0_u64;
    // Object ids of segments neither store could produce.
    let mut missing = Vec::new();

    for entry in ordered_refs {
        // Local store first; a local failure is non-fatal at this point.
        match try_fetch_valid_segment(entry, local_store) {
            Ok(segment) => {
                repaired_from_local = repaired_from_local.saturating_add(1);
                segments.push(segment);
                continue;
            }
            Err(local_error) => {
                warn!(
                    bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
                    logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
                    object_id = %entry.object_id,
                    start_seq = entry.start_seq,
                    end_seq = entry.end_seq,
                    error = %local_error,
                    "local segment fetch failed; trying remote recovery path"
                );
            }
        }

        // Remote fallback; failure here marks the segment irrecoverable,
        // but the scan continues so all losses are reported at once.
        match try_fetch_valid_segment(entry, remote_store) {
            Ok(segment) => {
                repaired_from_remote = repaired_from_remote.saturating_add(1);
                segments.push(segment);
            }
            Err(remote_error) => {
                error!(
                    bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
                    logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
                    object_id = %entry.object_id,
                    start_seq = entry.start_seq,
                    end_seq = entry.end_seq,
                    error = %remote_error,
                    reason_code = "index_repair_segment_irrecoverable",
                    "segment irrecoverable from both local and remote symbols"
                );
                missing.push(entry.object_id);
            }
        }
    }

    if !missing.is_empty() {
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "reason_code=index_repair_incomplete irrecoverable_segments={} first_irrecoverable_object={}",
                missing.len(),
                missing
                    .first()
                    .map_or_else(|| "none".to_owned(), ToString::to_string)
            ),
        });
    }

    segments.sort_by_key(|segment| segment.end_seq);

    info!(
        bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
        repaired_from_local = repaired_from_local,
        repaired_from_remote = repaired_from_remote,
        segments_repaired = segments.len(),
        "native index repair complete"
    );

    Ok(IndexRepairReport {
        segments,
        repaired_from_local,
        repaired_from_remote,
    })
}
860
/// Rebuilds index segments from scratch by replaying the commit-marker
/// stream: for every recovered marker, the commit's capsule updates are
/// fetched and fed through a `SegmentBuilder` with the given flush
/// threshold. An empty marker stream yields an empty report.
///
/// # Errors
/// Propagates marker scan and builder failures; a capsule whose updates
/// cannot be fetched makes the index unrebuildable and is reported as
/// `DatabaseCorrupt`.
pub fn rebuild_index_from_marker_stream(
    marker_segment_blobs: &[Vec<u8>],
    capsule_source: &impl CommitCapsuleIndexSource,
    max_entries: usize,
) -> Result<IndexRebuildReport> {
    let markers = scan_commit_markers(marker_segment_blobs)?;
    if markers.is_empty() {
        return Ok(IndexRebuildReport {
            markers,
            segments: Vec::new(),
        });
    }

    // Markers are sequential, so first/last give the rebuild range.
    let start_seq = markers.first().map_or(0_u64, |record| record.commit_seq);
    let end_seq = markers.last().map_or(0_u64, |record| record.commit_seq);
    info!(
        bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
        start_seq = start_seq,
        end_seq = end_seq,
        segments_built = 0_u64,
        "native index rebuild start"
    );

    let mut builder = SegmentBuilder::new(max_entries)?;
    let mut built_segments = Vec::new();

    for marker in &markers {
        let capsule_id = ObjectId::from_bytes(marker.capsule_object_id);
        let updates =
            capsule_source
                .updates_for_commit(marker.commit_seq, capsule_id)
                .map_err(|source_error| {
                    error!(
                        bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
                        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
                        reason_code = "index_unrebuildable_with_markers",
                        commit_seq = marker.commit_seq,
                        capsule_object_id = %capsule_id,
                        error = %source_error,
                        "marker stream exists but commit capsule updates are unrecoverable"
                    );
                    FrankenError::DatabaseCorrupt {
                        detail: format!(
                            "reason_code=index_unrebuildable_with_markers commit_seq={} capsule_object_id={} source_error={source_error}",
                            marker.commit_seq, capsule_id
                        ),
                    }
                })?;

        // Builder emits a segment whenever the flush threshold is reached.
        if let Some(segment) = builder.ingest_commit(marker.commit_seq, updates)? {
            built_segments.push(segment);
        }
    }

    // Flush any remainder below the threshold.
    if let Some(segment) = builder.flush()? {
        built_segments.push(segment);
    }

    info!(
        bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
        logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
        start_seq = start_seq,
        end_seq = end_seq,
        segments_built = built_segments.len(),
        "native index rebuild complete"
    );

    Ok(IndexRebuildReport {
        markers,
        segments: built_segments,
    })
}
941
/// Last-resort pointer resolution without an index: walks recovered commit
/// markers newest-first and queries each commit's capsule for an update to
/// `page` visible at `snapshot_high`. Gated by the boldness policy.
///
/// Returns `Ok(None)` when no commit at or below the snapshot touched the
/// page.
///
/// # Errors
/// Returns `DatabaseCorrupt` when the policy forbids the scan; propagates
/// marker scan and capsule-source failures.
pub fn emergency_linear_scan_lookup(
    page: PageNumber,
    snapshot_high: u64,
    marker_segment_blobs: &[Vec<u8>],
    capsule_source: &impl CommitCapsuleIndexSource,
    boldness: BoldnessConstraint,
    evidence_state: &str,
) -> Result<Option<VersionPointer>> {
    if !boldness.allow_emergency_linear_scan {
        warn!(
            bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
            logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
            reason_code = "boldness_violation_blocked_linear_scan",
            attempted_page = page.get(),
            attempted_snapshot_high = snapshot_high,
            evidence_state = evidence_state,
            "boldness violation attempt blocked"
        );
        return Err(FrankenError::DatabaseCorrupt {
            detail: format!(
                "reason_code=boldness_violation_blocked_linear_scan attempted_page={} attempted_snapshot_high={} evidence_state={evidence_state}",
                page.get(),
                snapshot_high
            ),
        });
    }

    let markers = scan_commit_markers(marker_segment_blobs)?;
    // Markers come back in ascending commit order; iterate newest-first so
    // the most recent visible version wins.
    for marker in markers.iter().rev() {
        if marker.commit_seq > snapshot_high {
            continue;
        }
        let capsule_id = ObjectId::from_bytes(marker.capsule_object_id);
        let updates = capsule_source.updates_for_commit(marker.commit_seq, capsule_id)?;
        if let Some((_, pointer)) = updates
            .into_iter()
            .find(|(candidate, pointer)| *candidate == page && pointer.commit_seq <= snapshot_high)
        {
            info!(
                bead_id = NATIVE_INDEX_REPAIR_BEAD_ID,
                logging_standard = NATIVE_INDEX_LOGGING_STANDARD,
                page = page.get(),
                snapshot_high = snapshot_high,
                resolved_commit_seq = pointer.commit_seq,
                "native index emergency linear scan resolved version pointer"
            );
            return Ok(Some(pointer));
        }
    }
    Ok(None)
}
999
/// Decodes every marker-segment blob, recovers each blob's valid record
/// prefix, orders segments by starting commit sequence, and concatenates
/// the records while enforcing a strictly sequential (gap-free,
/// increment-by-one) commit stream.
///
/// # Errors
/// Returns `DatabaseCorrupt` for blobs shorter than the segment header,
/// undecodable headers, prefix-recovery failures, or commit-sequence gaps.
fn scan_commit_markers(marker_segment_blobs: &[Vec<u8>]) -> Result<Vec<CommitMarkerRecord>> {
    let mut ordered_segments: Vec<(u64, Vec<CommitMarkerRecord>)> =
        Vec::with_capacity(marker_segment_blobs.len());
    for bytes in marker_segment_blobs {
        if bytes.len() < MARKER_SEGMENT_HEADER_BYTES {
            return Err(FrankenError::DatabaseCorrupt {
                detail: format!(
                    "marker segment shorter than header: bytes={} header_bytes={MARKER_SEGMENT_HEADER_BYTES}",
                    bytes.len()
                ),
            });
        }
        let header =
            MarkerSegmentHeader::decode(&bytes[..MARKER_SEGMENT_HEADER_BYTES]).map_err(|err| {
                FrankenError::DatabaseCorrupt {
                    detail: format!("invalid marker segment header: {err}"),
                }
            })?;
        // Torn tails are tolerated: only the valid prefix of each segment
        // is recovered.
        let records = recover_valid_prefix(bytes).map_err(|err| FrankenError::DatabaseCorrupt {
            detail: format!("marker segment prefix recovery failed: {err}"),
        })?;
        ordered_segments.push((header.start_commit_seq, records));
    }

    ordered_segments.sort_by_key(|(start_seq, _)| *start_seq);
    let mut combined = Vec::new();
    // Expected commit_seq of the next record; None until the first record.
    let mut expected_next_seq: Option<u64> = None;
    for (_, records) in ordered_segments {
        for record in records {
            if let Some(expected) = expected_next_seq
                && record.commit_seq != expected
            {
                return Err(FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "marker stream commit gap: expected {expected}, found {}",
                        record.commit_seq
                    ),
                });
            }
            expected_next_seq = Some(record.commit_seq.saturating_add(1));
            combined.push(record);
        }
    }
    Ok(combined)
}
1045
1046fn try_fetch_valid_segment(
1047 entry: NativeIndexSegmentRef,
1048 store: &impl NativeIndexSegmentStore,
1049) -> Result<PageVersionIndexSegment> {
1050 let segment = store.fetch_index_segment(entry.object_id)?;
1051 if segment.start_seq != entry.start_seq || segment.end_seq != entry.end_seq {
1052 return Err(FrankenError::DatabaseCorrupt {
1053 detail: format!(
1054 "segment bounds mismatch for object {}: expected [{}..={}], found [{}..={}]",
1055 entry.object_id, entry.start_seq, entry.end_seq, segment.start_seq, segment.end_seq
1056 ),
1057 });
1058 }
1059 let recomputed = derive_segment_object_id(&segment);
1060 if recomputed != entry.object_id {
1061 return Err(FrankenError::DatabaseCorrupt {
1062 detail: format!(
1063 "segment object id mismatch: expected {}, recomputed {}",
1064 entry.object_id, recomputed
1065 ),
1066 });
1067 }
1068 Ok(segment)
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073 use std::cell::Cell;
1074
1075 use super::*;
1076 use crate::commit_marker::{
1077 COMMIT_MARKER_RECORD_BYTES, CommitMarkerRecord, MARKER_SEGMENT_HEADER_BYTES,
1078 MarkerSegmentHeader,
1079 };
1080
    // Capsule lookup key: (commit_seq, raw capsule object-id bytes).
    type CapsuleKey = (u64, [u8; 16]);
    // Page-version updates produced by one commit.
    type CapsuleUpdates = Vec<(PageNumber, VersionPointer)>;
1083
    /// In-memory `BasePageProvider` test double that counts loads.
    #[derive(Debug, Clone)]
    struct TestBasePages {
        // Base page payloads keyed by raw page number.
        pages: BTreeMap<u32, Vec<u8>>,
        // Load counter; Cell because the trait takes &self.
        loads: Cell<u64>,
    }

    impl TestBasePages {
        fn new(entries: impl IntoIterator<Item = (PageNumber, Vec<u8>)>) -> Self {
            let mut pages = BTreeMap::new();
            for (page, bytes) in entries {
                pages.insert(page.get(), bytes);
            }
            Self {
                pages,
                loads: Cell::new(0),
            }
        }

        // Number of `load_base_page` calls observed so far.
        fn loads(&self) -> u64 {
            self.loads.get()
        }
    }

    impl BasePageProvider for TestBasePages {
        fn load_base_page(&self, page: PageNumber) -> Result<Vec<u8>> {
            self.loads.set(self.loads.get().saturating_add(1));
            self.pages
                .get(&page.get())
                .cloned()
                .ok_or_else(|| FrankenError::DatabaseCorrupt {
                    detail: format!("missing base page {}", page.get()),
                })
        }
    }
1118
    /// In-memory `PatchObjectStore` test double that counts fetches.
    #[derive(Debug, Clone)]
    struct TestPatchStore {
        // Patch payloads keyed by raw object-id bytes.
        objects: BTreeMap<[u8; 16], Vec<u8>>,
        // Fetch counter; Cell because the trait takes &self.
        fetches: Cell<u64>,
    }

    impl TestPatchStore {
        fn new(entries: impl IntoIterator<Item = (ObjectId, Vec<u8>)>) -> Self {
            let mut objects = BTreeMap::new();
            for (oid, payload) in entries {
                objects.insert(*oid.as_bytes(), payload);
            }
            Self {
                objects,
                fetches: Cell::new(0),
            }
        }

        // Number of `fetch_patch_object` calls observed so far.
        fn fetches(&self) -> u64 {
            self.fetches.get()
        }
    }

    impl PatchObjectStore for TestPatchStore {
        fn fetch_patch_object(&self, object_id: ObjectId) -> Result<Vec<u8>> {
            self.fetches.set(self.fetches.get().saturating_add(1));
            self.objects
                .get(object_id.as_bytes())
                .cloned()
                .ok_or_else(|| FrankenError::DatabaseCorrupt {
                    detail: format!("missing patch object {object_id}"),
                })
        }
    }
1153
    /// In-memory `NativeIndexSegmentStore` test double.
    #[derive(Debug, Clone, Default)]
    struct TestSegmentStore {
        // Segments keyed by raw object-id bytes.
        segments: BTreeMap<[u8; 16], PageVersionIndexSegment>,
    }

    impl TestSegmentStore {
        fn new(entries: impl IntoIterator<Item = (ObjectId, PageVersionIndexSegment)>) -> Self {
            let mut segments = BTreeMap::new();
            for (id, segment) in entries {
                segments.insert(*id.as_bytes(), segment);
            }
            Self { segments }
        }
    }

    impl NativeIndexSegmentStore for TestSegmentStore {
        fn fetch_index_segment(&self, object_id: ObjectId) -> Result<PageVersionIndexSegment> {
            self.segments
                .get(object_id.as_bytes())
                .cloned()
                .ok_or_else(|| FrankenError::DatabaseCorrupt {
                    detail: format!("missing index segment object {object_id}"),
                })
        }
    }
1179
    /// In-memory `CommitCapsuleIndexSource` test double keyed by
    /// (commit_seq, capsule id).
    #[derive(Debug, Clone, Default)]
    struct TestCapsuleSource {
        updates: BTreeMap<CapsuleKey, CapsuleUpdates>,
    }

    impl TestCapsuleSource {
        // Builder-style registration of one commit's updates; the capsule
        // id is the seed byte repeated.
        fn with_update(
            mut self,
            commit_seq: u64,
            capsule_seed: u8,
            updates: Vec<(PageNumber, VersionPointer)>,
        ) -> Self {
            self.updates
                .insert((commit_seq, [capsule_seed; 16]), updates);
            self
        }
    }

    impl CommitCapsuleIndexSource for TestCapsuleSource {
        fn updates_for_commit(
            &self,
            commit_seq: u64,
            capsule_object_id: ObjectId,
        ) -> Result<Vec<(PageNumber, VersionPointer)>> {
            self.updates
                .get(&(commit_seq, *capsule_object_id.as_bytes()))
                .cloned()
                .ok_or_else(|| FrankenError::DatabaseCorrupt {
                    detail: format!(
                        "missing capsule updates commit_seq={commit_seq} capsule_object_id={capsule_object_id}"
                    ),
                })
        }
    }
1214
1215 fn page(n: u32) -> PageNumber {
1216 PageNumber::new(n).expect("non-zero page number")
1217 }
1218
1219 fn oid(seed: u8) -> ObjectId {
1220 ObjectId::from_bytes([seed; 16])
1221 }
1222
1223 fn pointer(
1224 commit_seq: u64,
1225 patch_object_seed: u8,
1226 patch_kind: PatchKind,
1227 base_hint: Option<u8>,
1228 ) -> VersionPointer {
1229 VersionPointer {
1230 commit_seq,
1231 patch_object: oid(patch_object_seed),
1232 patch_kind,
1233 base_hint: base_hint.map(oid),
1234 }
1235 }
1236
1237 fn encode_intent_patch(ops: &[(u16, &[u8])]) -> Vec<u8> {
1238 let mut out = Vec::new();
1239 out.push(u8::try_from(ops.len()).expect("op count fits u8"));
1240 for (offset, data) in ops {
1241 out.extend_from_slice(&offset.to_le_bytes());
1242 out.extend_from_slice(
1243 &u16::try_from(data.len())
1244 .expect("op data len fits u16")
1245 .to_le_bytes(),
1246 );
1247 out.extend_from_slice(data);
1248 }
1249 out
1250 }
1251
    /// Sparse-XOR patches use the same `(count, offset, len, bytes)` wire
    /// layout as intent-log patches, so reuse the intent encoder; only the
    /// interpretation (XOR vs overwrite) differs at materialization time.
    fn encode_sparse_xor_patch(ops: &[(u16, &[u8])]) -> Vec<u8> {
        encode_intent_patch(ops)
    }
1255
1256 fn marker_record(
1257 commit_seq: u64,
1258 capsule_seed: u8,
1259 prev_marker: [u8; 16],
1260 ) -> CommitMarkerRecord {
1261 CommitMarkerRecord::new(
1262 commit_seq,
1263 1_700_000_000_000_000_000_u64.saturating_add(commit_seq),
1264 [capsule_seed; 16],
1265 [capsule_seed.wrapping_add(1); 16],
1266 prev_marker,
1267 )
1268 }
1269
1270 fn marker_segment_blob(start_seq: u64, records: &[CommitMarkerRecord]) -> Vec<u8> {
1271 let mut out = Vec::new();
1272 let segment_id = start_seq / 1_000_000;
1273 let header = MarkerSegmentHeader::new(segment_id, start_seq);
1274 out.extend_from_slice(&header.encode());
1275 for record in records {
1276 out.extend_from_slice(&record.encode());
1277 }
1278 let expected_len = MARKER_SEGMENT_HEADER_BYTES + COMMIT_MARKER_RECORD_BYTES * records.len();
1279 assert_eq!(out.len(), expected_len);
1280 out
1281 }
1282
1283 #[test]
1284 fn test_lookup_latest_version() {
1285 let p = page(7);
1286 let vp10 = pointer(10, 0x10, PatchKind::FullImage, None);
1287 let vp20 = pointer(20, 0x20, PatchKind::FullImage, None);
1288 let seg10 = PageVersionIndexSegment::new(10, 10, vec![(p, vp10)]);
1289 let seg20 = PageVersionIndexSegment::new(20, 20, vec![(p, vp20)]);
1290 let base = TestBasePages::new([(p, b"base".to_vec())]);
1291 let store =
1292 TestPatchStore::new([(oid(0x10), b"v10".to_vec()), (oid(0x20), b"v20".to_vec())]);
1293
1294 let mut cache = NativePageCache::default();
1295 let result_high = lookup_page_version(
1296 p,
1297 25,
1298 &[seg10.clone(), seg20.clone()],
1299 &mut cache,
1300 &base,
1301 &store,
1302 0.0,
1303 )
1304 .expect("lookup");
1305 assert_eq!(result_high.page_bytes, b"v20".to_vec());
1306 assert_eq!(result_high.resolved_pointer, Some(vp20));
1307
1308 let mut cache2 = NativePageCache::default();
1309 let result_mid =
1310 lookup_page_version(p, 15, &[seg10, seg20], &mut cache2, &base, &store, 0.0)
1311 .expect("lookup");
1312 assert_eq!(result_mid.page_bytes, b"v10".to_vec());
1313 assert_eq!(result_mid.resolved_pointer, Some(vp10));
1314 }
1315
    /// The per-segment page filter must never produce a false negative: a page
    /// absent from the segment falls through to the base page with zero patch
    /// fetches, while a present page resolves through the index.
    #[test]
    fn test_filter_negative_path_has_no_false_negatives() {
        let target = page(5);
        let other = page(9);
        // The segment only indexes `other`, never `target`.
        let vp = pointer(12, 0x30, PatchKind::FullImage, None);
        let segment = PageVersionIndexSegment::new(12, 12, vec![(other, vp)]);
        let base = TestBasePages::new([(target, b"base5".to_vec()), (other, b"base9".to_vec())]);
        let store = TestPatchStore::new([(oid(0x30), b"other-version".to_vec())]);

        let mut cache = NativePageCache::default();
        let negative = lookup_page_version(
            target,
            20,
            std::slice::from_ref(&segment),
            &mut cache,
            &base,
            &store,
            0.0,
        )
        .expect("negative path");
        assert!(!negative.trace.filter_hit);
        assert_eq!(negative.page_bytes, b"base5".to_vec());
        // No patch object may be touched on the negative path.
        assert_eq!(store.fetches(), 0);

        let mut cache2 = NativePageCache::default();
        let positive = lookup_page_version(other, 20, &[segment], &mut cache2, &base, &store, 0.0)
            .expect("positive path");
        assert!(positive.trace.filter_hit);
        assert_eq!(positive.page_bytes, b"other-version".to_vec());
    }
1346
    /// Materialization paths: an intent-log patch overwrites a byte range in
    /// place, while a sparse-XOR patch XORs its bytes into the base page.
    #[test]
    fn test_materialization_intent_and_sparse_xor() {
        let p = page(3);
        let base = TestBasePages::new([(p, b"aaaa".to_vec())]);
        let intent_ptr = pointer(10, 0x40, PatchKind::IntentLog, None);
        let xor_ptr = pointer(20, 0x50, PatchKind::SparseXor, None);
        let seg_intent = PageVersionIndexSegment::new(10, 10, vec![(p, intent_ptr)]);
        let seg_xor = PageVersionIndexSegment::new(20, 20, vec![(p, xor_ptr)]);

        // Intent op writes "BC" at offset 1; XOR op covers the first 4 bytes.
        let intent_patch = encode_intent_patch(&[(1, b"BC")]);
        let xor_patch = encode_sparse_xor_patch(&[(0, &[0x01, 0x02, 0x03, 0x04])]);
        let store = TestPatchStore::new([(oid(0x40), intent_patch), (oid(0x50), xor_patch)]);

        let mut cache = NativePageCache::default();
        let intent = lookup_page_version(
            p,
            10,
            &[seg_intent, seg_xor.clone()],
            &mut cache,
            &base,
            &store,
            0.0,
        )
        .expect("intent materialization");
        // "aaaa" with "BC" spliced in at offset 1.
        assert_eq!(intent.page_bytes, b"aBCa".to_vec());

        let mut cache2 = NativePageCache::default();
        let sparse = lookup_page_version(p, 20, &[seg_xor], &mut cache2, &base, &store, 0.0)
            .expect("xor materialization");
        // 0x61 ('a') XOR 0x01/0x02/0x03/0x04 => 0x60, 0x63, 0x62, 0x65.
        assert_eq!(sparse.page_bytes, vec![96, 99, 98, 101]);
    }
1378
1379 #[test]
1380 fn test_segment_construction_deterministic() {
1381 let mut builder_a = SegmentBuilder::new(16).expect("builder");
1382 let mut builder_b = SegmentBuilder::new(16).expect("builder");
1383
1384 let updates_a = vec![
1385 (page(7), pointer(100, 0x60, PatchKind::FullImage, None)),
1386 (
1387 page(2),
1388 pointer(100, 0x61, PatchKind::IntentLog, Some(0x62)),
1389 ),
1390 ];
1391 let updates_b = vec![
1392 (
1393 page(2),
1394 pointer(100, 0x61, PatchKind::IntentLog, Some(0x62)),
1395 ),
1396 (page(7), pointer(100, 0x60, PatchKind::FullImage, None)),
1397 ];
1398
1399 assert!(
1400 builder_a
1401 .ingest_commit(100, updates_a)
1402 .expect("ingest")
1403 .is_none()
1404 );
1405 assert!(
1406 builder_b
1407 .ingest_commit(100, updates_b)
1408 .expect("ingest")
1409 .is_none()
1410 );
1411
1412 let built_a = builder_a.flush().expect("flush").expect("segment");
1413 let built_b = builder_b.flush().expect("flush").expect("segment");
1414 assert_eq!(built_a.segment.entries, built_b.segment.entries);
1415 assert_eq!(built_a.object_id, built_b.object_id);
1416 }
1417
    /// Shared end-to-end scenario: cache miss -> filter hit -> single segment
    /// scan -> patch fetch -> sparse-XOR materialization, then a cache hit on
    /// the repeat lookup with no additional store traffic.
    fn run_e2e_path_case() {
        let p = page(11);
        let base_bytes = vec![0x10, 0x20, 0x30, 0x40];
        let base = TestBasePages::new([(p, base_bytes.clone())]);
        // NOTE(review): this local `pointer` shadows the `pointer(..)` helper.
        let pointer = pointer(30, 0x71, PatchKind::SparseXor, Some(0x70));
        let segment = PageVersionIndexSegment::new(30, 30, vec![(p, pointer)]);
        let xor_patch = encode_sparse_xor_patch(&[(2, &[0xFF, 0x0F])]);
        let store = TestPatchStore::new([(oid(0x70), base_bytes), (oid(0x71), xor_patch)]);

        let mut cache = NativePageCache::default();
        let first = lookup_page_version(
            p,
            30,
            std::slice::from_ref(&segment),
            &mut cache,
            &base,
            &store,
            0.02,
        )
        .expect("first lookup");
        assert!(!first.trace.cache_hit);
        assert!(first.trace.filter_hit);
        assert_eq!(first.trace.segment_scans, 1);
        assert_eq!(first.trace.resolved_commit_seq, Some(30));
        // Bytes 2..4 of the base XORed with 0xFF / 0x0F.
        assert_eq!(first.page_bytes, vec![0x10, 0x20, 0xCF, 0x4F]);

        let second = lookup_page_version(p, 30, &[segment], &mut cache, &base, &store, 0.02)
            .expect("second lookup");
        assert!(second.trace.cache_hit);
        assert_eq!(second.page_bytes, first.page_bytes);
        // Two store fetches in total (presumably patch object plus base-hint
        // object on the first lookup — confirm against lookup_page_version);
        // the cached second lookup adds none, and the base page loads once.
        assert_eq!(store.fetches(), 2);
        assert_eq!(base.loads(), 1);
    }
1451
    /// End-to-end path coverage: miss -> filter hit -> scan -> fetch ->
    /// materialize, delegating to the shared scenario.
    #[test]
    fn test_e2e_cache_miss_filter_hit_index_scan_fetch_materialize() {
        run_e2e_path_case();
    }
1456
1457 #[test]
1458 fn test_bd_1hi_32_unit_compliance_gate() {
1459 assert_eq!(NATIVE_INDEX_BEAD_ID, "bd-1hi.32");
1460 assert_eq!(NATIVE_INDEX_LOGGING_STANDARD, "bd-1fpm");
1461 let store = TestPatchStore::new(std::iter::empty::<(ObjectId, Vec<u8>)>());
1462 let err = materialize_patch(
1463 pointer(1, 0xAA, PatchKind::FullImage, None),
1464 b"",
1465 b"",
1466 &store,
1467 MAX_PATCH_DEPTH + 1,
1468 )
1469 .expect_err("depth guard");
1470 assert!(err.to_string().contains("depth exceeded"));
1471 }
1472
1473 #[test]
1474 fn prop_bd_1hi_32_structure_compliance() {
1475 for seed in 1_u8..=16 {
1476 let mut builder = SegmentBuilder::new(8).expect("builder");
1477 let updates = vec![
1478 (
1479 page(u32::from(seed)),
1480 pointer(500, seed, PatchKind::FullImage, None),
1481 ),
1482 (
1483 page(u32::from(seed) + 100),
1484 pointer(500, seed.wrapping_add(1), PatchKind::SparseXor, Some(seed)),
1485 ),
1486 ];
1487 assert!(
1488 builder
1489 .ingest_commit(500, updates)
1490 .expect("ingest")
1491 .is_none()
1492 );
1493 let built = builder.flush().expect("flush").expect("segment");
1494 assert!(!built.segment.entries.is_empty());
1495 let recomputed = derive_segment_object_id(&built.segment);
1496 assert_eq!(built.object_id, recomputed);
1497 }
1498 }
1499
    /// bd-1hi.32 end-to-end compliance reuses the shared e2e scenario.
    #[test]
    fn test_e2e_bd_1hi_32_compliance() {
        run_e2e_path_case();
    }
1504
    /// Rebuilding the index from the marker stream is deterministic, and the
    /// rebuilt segments resolve the newest pointer for a page.
    #[test]
    fn test_index_rebuild_from_markers() {
        let marker_100 = marker_record(100, 0x10, [0_u8; 16]);
        // Chain the second marker to the first through its marker id.
        let marker_101 = marker_record(101, 0x11, marker_100.marker_id);
        let marker_blob = marker_segment_blob(100, &[marker_100, marker_101]);

        let source = TestCapsuleSource::default()
            .with_update(
                100,
                0x10,
                vec![(page(3), pointer(100, 0x80, PatchKind::FullImage, None))],
            )
            .with_update(
                101,
                0x11,
                vec![
                    (
                        page(3),
                        pointer(101, 0x81, PatchKind::IntentLog, Some(0x82)),
                    ),
                    (page(9), pointer(101, 0x83, PatchKind::FullImage, None)),
                ],
            );

        // Two independent rebuilds from the same stream must agree.
        let rebuilt_a =
            rebuild_index_from_marker_stream(std::slice::from_ref(&marker_blob), &source, 2)
                .expect("rebuild from markers");
        let rebuilt_b = rebuild_index_from_marker_stream(&[marker_blob], &source, 2)
            .expect("deterministic rebuild");
        assert_eq!(rebuilt_a.segments, rebuilt_b.segments);

        let segments: Vec<PageVersionIndexSegment> = rebuilt_a
            .segments
            .iter()
            .map(|segment| segment.segment.clone())
            .collect();
        // Page 3 was rewritten at commit 101; the lookup must prefer it.
        let (resolved, scans) = lookup_pointer_in_segments(page(3), 101, &segments);
        assert_eq!(
            resolved,
            Some(pointer(101, 0x81, PatchKind::IntentLog, Some(0x82)))
        );
        assert!(scans >= 1);
    }
1548
    /// Segment repair: one segment is recoverable from the local store, the
    /// other exists only remotely; both must end up queryable afterwards.
    #[test]
    fn test_index_repair_from_ecs() {
        let p1 = page(4);
        let p2 = page(8);
        let seg_local = PageVersionIndexSegment::new(
            200,
            200,
            vec![(p1, pointer(200, 0x90, PatchKind::FullImage, None))],
        );
        let seg_remote = PageVersionIndexSegment::new(
            201,
            201,
            vec![(p2, pointer(201, 0x91, PatchKind::SparseXor, Some(0x92)))],
        );
        let id_local = derive_segment_object_id(&seg_local);
        let id_remote = derive_segment_object_id(&seg_remote);

        let refs = vec![
            NativeIndexSegmentRef {
                start_seq: 200,
                end_seq: 200,
                object_id: id_local,
            },
            NativeIndexSegmentRef {
                start_seq: 201,
                end_seq: 201,
                object_id: id_remote,
            },
        ];

        // Local store deliberately lacks the remote-only segment.
        let local_store = TestSegmentStore::new([(id_local, seg_local.clone())]);
        let remote_store = TestSegmentStore::new([(id_local, seg_local), (id_remote, seg_remote)]);
        let report = repair_index_segments_from_ecs(
            &refs,
            &local_store,
            &remote_store,
            0.02,
            BoldnessConstraint::strict(),
        )
        .expect("repair from surviving symbols");

        // Exactly one segment from each path, and both are present.
        assert_eq!(report.repaired_from_local, 1);
        assert_eq!(report.repaired_from_remote, 1);
        assert_eq!(report.segments.len(), 2);
        let (resolved, _) = lookup_pointer_in_segments(p2, 201, &report.segments);
        assert_eq!(
            resolved,
            Some(pointer(201, 0x91, PatchKind::SparseXor, Some(0x92)))
        );
    }
1599
1600 #[test]
1601 fn test_boldness_constraint() {
1602 let marker = marker_record(300, 0x33, [0_u8; 16]);
1603 let marker_blob = marker_segment_blob(300, std::slice::from_ref(&marker));
1604 let source = TestCapsuleSource::default().with_update(
1605 300,
1606 0x33,
1607 vec![(page(11), pointer(300, 0xA0, PatchKind::FullImage, None))],
1608 );
1609
1610 let blocked = emergency_linear_scan_lookup(
1611 page(11),
1612 300,
1613 std::slice::from_ref(&marker_blob),
1614 &source,
1615 BoldnessConstraint::strict(),
1616 "index_destroyed",
1617 )
1618 .expect_err("strict boldness must block emergency lookup");
1619 assert!(
1620 blocked
1621 .to_string()
1622 .contains("reason_code=boldness_violation_blocked_linear_scan")
1623 );
1624
1625 let resolved = emergency_linear_scan_lookup(
1626 page(11),
1627 300,
1628 &[marker_blob],
1629 &source,
1630 BoldnessConstraint::emergency(),
1631 "index_destroyed",
1632 )
1633 .expect("emergency lookup")
1634 .expect("pointer found");
1635 assert_eq!(resolved, pointer(300, 0xA0, PatchKind::FullImage, None));
1636 }
1637
1638 #[test]
1639 fn test_critical_integrity_failure_detected_before_repair_attempt() {
1640 let marker = marker_record(400, 0x44, [0_u8; 16]);
1641 let blob = marker_segment_blob(400, std::slice::from_ref(&marker));
1642 let err = preflight_native_index_integrity(std::slice::from_ref(&blob), false, false)
1643 .expect_err("markers without recovery paths are critical");
1644 assert!(
1645 err.to_string()
1646 .contains("reason_code=index_unrebuildable_with_markers")
1647 );
1648
1649 preflight_native_index_integrity(&[blob], true, false).expect("repair path available");
1650 }
1651
1652 #[test]
1653 fn test_bd_1hi_33_unit_compliance_gate() {
1654 assert_eq!(NATIVE_INDEX_REPAIR_BEAD_ID, "bd-1hi.33");
1655 assert_eq!(NATIVE_INDEX_LOGGING_STANDARD, "bd-1fpm");
1656
1657 let marker = marker_record(500, 0x55, [0_u8; 16]);
1658 let marker_blob = marker_segment_blob(500, std::slice::from_ref(&marker));
1659 let err = preflight_native_index_integrity(&[marker_blob], false, false)
1660 .expect_err("critical preflight error");
1661 assert!(
1662 err.to_string()
1663 .contains("reason_code=index_unrebuildable_with_markers")
1664 );
1665 }
1666
    /// Property-style sweep for bd-1hi.33: for a range of seeds, rebuild from
    /// markers, repair everything through the remote path, and resolve via
    /// the emergency linear scan.
    #[test]
    fn prop_bd_1hi_33_structure_compliance() {
        for seed in 1_u8..=12 {
            let commit_seq = 10_000_u64 + u64::from(seed);
            let marker = marker_record(commit_seq, seed, [0_u8; 16]);
            let marker_blob = marker_segment_blob(commit_seq, std::slice::from_ref(&marker));
            let source = TestCapsuleSource::default().with_update(
                commit_seq,
                seed,
                vec![(
                    page(u32::from(seed) + 1),
                    pointer(commit_seq, seed.wrapping_add(1), PatchKind::FullImage, None),
                )],
            );

            let rebuilt =
                rebuild_index_from_marker_stream(std::slice::from_ref(&marker_blob), &source, 1)
                    .expect("rebuild");
            assert_eq!(rebuilt.markers.len(), 1);
            assert!(!rebuilt.segments.is_empty());

            let refs: Vec<NativeIndexSegmentRef> = rebuilt
                .segments
                .iter()
                .map(|segment| NativeIndexSegmentRef {
                    start_seq: segment.segment.start_seq,
                    end_seq: segment.segment.end_seq,
                    object_id: segment.object_id,
                })
                .collect();
            let remote_store = TestSegmentStore::new(
                rebuilt
                    .segments
                    .iter()
                    .map(|segment| (segment.object_id, segment.segment.clone())),
            );
            // Empty local store forces every segment through the remote path.
            let local_store = TestSegmentStore::default();
            let repaired = repair_index_segments_from_ecs(
                &refs,
                &local_store,
                &remote_store,
                0.05,
                BoldnessConstraint::strict(),
            )
            .expect("repair");
            assert_eq!(repaired.segments.len(), refs.len());

            let looked_up = emergency_linear_scan_lookup(
                page(u32::from(seed) + 1),
                commit_seq,
                &[marker_blob],
                &source,
                BoldnessConstraint::emergency(),
                "index_destroyed",
            )
            .expect("emergency lookup");
            assert!(looked_up.is_some());
        }
    }
1726
    /// Shared scenario for the bd-1hi.33 end-to-end tests: two pages, two
    /// commits (300 and 301), with matching marker, capsule, base-page, and
    /// patch-store fixtures.
    struct Bd1Hi33Fixture {
        p11: PageNumber,
        p12: PageNumber,
        base: TestBasePages,
        patch_store: TestPatchStore,
        // Encoded marker segment covering commits 300 and 301.
        marker_blob: Vec<u8>,
        source: TestCapsuleSource,
    }
1735
1736 fn build_bd_1hi_33_fixture() -> Bd1Hi33Fixture {
1737 let p11 = page(11);
1738 let p12 = page(12);
1739 let base = TestBasePages::new([(p11, b"base11".to_vec()), (p12, b"base12".to_vec())]);
1740 let patch_store = TestPatchStore::new([
1741 (oid(0x90), b"v300-p11".to_vec()),
1742 (oid(0x91), b"v301-p11".to_vec()),
1743 (oid(0x92), b"v301-p12".to_vec()),
1744 ]);
1745
1746 let marker_300 = marker_record(300, 0x30, [0_u8; 16]);
1747 let marker_301 = marker_record(301, 0x31, marker_300.marker_id);
1748 let marker_blob = marker_segment_blob(300, &[marker_300, marker_301]);
1749
1750 let source = TestCapsuleSource::default()
1751 .with_update(
1752 300,
1753 0x30,
1754 vec![(p11, pointer(300, 0x90, PatchKind::FullImage, None))],
1755 )
1756 .with_update(
1757 301,
1758 0x31,
1759 vec![
1760 (p11, pointer(301, 0x91, PatchKind::FullImage, None)),
1761 (p12, pointer(301, 0x92, PatchKind::FullImage, None)),
1762 ],
1763 );
1764
1765 Bd1Hi33Fixture {
1766 p11,
1767 p12,
1768 base,
1769 patch_store,
1770 marker_blob,
1771 source,
1772 }
1773 }
1774
1775 fn assert_repaired_index_lookups(
1776 fixture: &Bd1Hi33Fixture,
1777 segments: &[PageVersionIndexSegment],
1778 ) {
1779 let mut cache = NativePageCache::default();
1780 let looked_up_p11 = lookup_page_version(
1781 fixture.p11,
1782 301,
1783 segments,
1784 &mut cache,
1785 &fixture.base,
1786 &fixture.patch_store,
1787 0.05,
1788 )
1789 .expect("indexed lookup p11");
1790 assert_eq!(looked_up_p11.page_bytes, b"v301-p11".to_vec());
1791
1792 let mut cache2 = NativePageCache::default();
1793 let looked_up_p12 = lookup_page_version(
1794 fixture.p12,
1795 301,
1796 segments,
1797 &mut cache2,
1798 &fixture.base,
1799 &fixture.patch_store,
1800 0.05,
1801 )
1802 .expect("indexed lookup p12");
1803 assert_eq!(looked_up_p12.page_bytes, b"v301-p12".to_vec());
1804 }
1805
    /// Emergency linear scan is blocked under the strict policy and, under
    /// the emergency policy, yields a pointer that materializes the latest
    /// page content by hand.
    fn assert_emergency_scan_behavior(fixture: &Bd1Hi33Fixture) {
        let blocked = emergency_linear_scan_lookup(
            fixture.p11,
            301,
            std::slice::from_ref(&fixture.marker_blob),
            &fixture.source,
            BoldnessConstraint::strict(),
            "index_destroyed",
        )
        .expect_err("strict policy blocks emergency scan");
        assert!(
            blocked
                .to_string()
                .contains("reason_code=boldness_violation_blocked_linear_scan")
        );

        let fallback_pointer = emergency_linear_scan_lookup(
            fixture.p11,
            301,
            std::slice::from_ref(&fixture.marker_blob),
            &fixture.source,
            BoldnessConstraint::emergency(),
            "index_destroyed",
        )
        .expect("emergency linear scan")
        .expect("pointer");
        // Materialize manually from the recovered pointer: fetch the patch
        // bytes, load the base page, and apply at depth 0.
        let fallback_patch = fixture
            .patch_store
            .fetch_patch_object(fallback_pointer.patch_object)
            .expect("patch bytes");
        let fallback_base = fixture.base.load_base_page(fixture.p11).expect("base page");
        let fallback_bytes = materialize_patch(
            fallback_pointer,
            &fallback_patch,
            &fallback_base,
            &fixture.patch_store,
            0,
        )
        .expect("materialize from emergency pointer");
        assert_eq!(fallback_bytes, b"v301-p11".to_vec());
    }
1847
    /// Full bd-1hi.33 end-to-end flow: deterministic rebuild from markers,
    /// repair with a partially populated local store, indexed lookups through
    /// the repaired segments, and the emergency-scan policy checks.
    #[test]
    fn test_e2e_bd_1hi_33_compliance() {
        let fixture = build_bd_1hi_33_fixture();

        let rebuilt_a = rebuild_index_from_marker_stream(
            std::slice::from_ref(&fixture.marker_blob),
            &fixture.source,
            1,
        )
        .expect("rebuild");
        let rebuilt_b = rebuild_index_from_marker_stream(
            std::slice::from_ref(&fixture.marker_blob),
            &fixture.source,
            1,
        )
        .expect("rebuild deterministic");
        // Object ids are content-derived, so two rebuilds must agree.
        let ids_a: Vec<ObjectId> = rebuilt_a
            .segments
            .iter()
            .map(|segment| segment.object_id)
            .collect();
        let ids_b: Vec<ObjectId> = rebuilt_b
            .segments
            .iter()
            .map(|segment| segment.object_id)
            .collect();
        assert_eq!(ids_a, ids_b);

        let refs: Vec<NativeIndexSegmentRef> = rebuilt_a
            .segments
            .iter()
            .map(|segment| NativeIndexSegmentRef {
                start_seq: segment.segment.start_seq,
                end_seq: segment.segment.end_seq,
                object_id: segment.object_id,
            })
            .collect();
        assert!(refs.len() >= 2);

        // Local store holds only the first segment; the others must be
        // recovered from the remote store during repair.
        let local_store = TestSegmentStore::new([(
            rebuilt_a.segments[0].object_id,
            rebuilt_a.segments[0].segment.clone(),
        )]);
        let remote_store = TestSegmentStore::new(
            rebuilt_a
                .segments
                .iter()
                .map(|segment| (segment.object_id, segment.segment.clone())),
        );
        let repaired = repair_index_segments_from_ecs(
            &refs,
            &local_store,
            &remote_store,
            0.05,
            BoldnessConstraint::strict(),
        )
        .expect("repair");

        assert_repaired_index_lookups(&fixture, &repaired.segments);
        assert_emergency_scan_behavior(&fixture);
    }
1909}