1use std::collections::BTreeMap;
11use std::fs;
12use std::io::Write;
13use std::path::{Path, PathBuf};
14use std::sync::atomic::{AtomicU64, Ordering};
15
16use crate::commit_marker::{
17 CommitMarkerRecord, MARKER_SEGMENT_HEADER_BYTES, MarkerSegmentHeader, recover_valid_prefix,
18 segment_id_for_commit_seq,
19};
20use crate::symbol_log::scan_symbol_segment;
21use fsqlite_error::{FrankenError, Result};
22use fsqlite_types::{EpochId, ObjectId, SymbolRecord};
23use tracing::{debug, error, info, warn};
24
25const MASTER_KEY_DOMAIN: &[u8] = b"fsqlite:symbol-auth-master:v1";
31
32const EPOCH_KEY_DOMAIN: &[u8] = b"fsqlite:symbol-auth:epoch:v1";
36
37const ROOT_POINTER_AUTH_DOMAIN: &[u8] = b"fsqlite:ecs-root-auth:v1";
39
40const ROOT_BOOTSTRAP_BEAD_ID: &str = "bd-1hi.25";
42const ROOT_BOOTSTRAP_LOGGING_STANDARD: &str = "bd-1fpm";
44static ROOT_TMP_SUFFIX_COUNTER: AtomicU64 = AtomicU64::new(0);
46
47#[derive(Debug)]
54pub struct EpochClock {
55 current: AtomicU64,
56}
57
58impl EpochClock {
59 #[must_use]
61 pub fn new(initial: EpochId) -> Self {
62 Self {
63 current: AtomicU64::new(initial.get()),
64 }
65 }
66
67 #[must_use]
69 pub fn current(&self) -> EpochId {
70 EpochId::new(self.current.load(Ordering::Acquire))
71 }
72
73 pub fn increment(&self) -> Result<EpochId> {
82 loop {
83 let old = self.current.load(Ordering::Acquire);
84 let new = old.checked_add(1).ok_or_else(|| {
85 error!(
86 bead_id = "bd-3go.12",
87 old_epoch = old,
88 "epoch counter overflow — cannot increment past u64::MAX"
89 );
90 FrankenError::OutOfRange {
91 what: "ecs_epoch".to_owned(),
92 value: old.to_string(),
93 }
94 })?;
95 if self
96 .current
97 .compare_exchange_weak(old, new, Ordering::AcqRel, Ordering::Acquire)
98 .is_ok()
99 {
100 info!(
101 bead_id = "bd-3go.12",
102 old_epoch = old,
103 new_epoch = new,
104 "epoch incremented"
105 );
106 return Ok(EpochId::new(new));
107 }
108 }
109 }
110
111 pub fn store(&self, epoch: EpochId) {
116 self.current.store(epoch.get(), Ordering::Release);
117 }
118}
119
120#[derive(Debug, Clone, PartialEq, Eq)]
124pub struct EpochAuthKey([u8; 32]);
125
126impl EpochAuthKey {
127 #[must_use]
129 pub fn as_bytes(&self) -> &[u8; 32] {
130 &self.0
131 }
132}
133
134pub fn derive_master_key_from_dek(dek: &[u8; 32]) -> [u8; 32] {
140 let keyed_hasher = blake3::Hasher::new_keyed(dek);
141 let mut hasher = keyed_hasher;
142 hasher.update(MASTER_KEY_DOMAIN);
143 let hash = hasher.finalize();
144 debug!(
145 bead_id = "bd-3go.12",
146 domain = std::str::from_utf8(MASTER_KEY_DOMAIN).unwrap_or("<invalid>"),
147 "derived master key from DEK with domain separation"
148 );
149 *hash.as_bytes()
150}
151
152#[must_use]
158pub fn derive_epoch_auth_key(master_key: &[u8; 32], epoch: EpochId) -> EpochAuthKey {
159 let mut hasher = blake3::Hasher::new_keyed(master_key);
160 hasher.update(EPOCH_KEY_DOMAIN);
161 hasher.update(&epoch.get().to_le_bytes());
162 let hash = hasher.finalize();
163 debug!(
164 bead_id = "bd-3go.12",
165 epoch = epoch.get(),
166 domain = std::str::from_utf8(EPOCH_KEY_DOMAIN).unwrap_or("<invalid>"),
167 "derived epoch auth key (NOT logging key material)"
168 );
169 EpochAuthKey(*hash.as_bytes())
170}
171
172#[derive(Debug, Clone, Copy, PartialEq, Eq)]
176pub enum BarrierOutcome {
177 AllArrived {
179 new_epoch: EpochId,
181 },
182 Timeout {
184 arrived: usize,
186 expected: usize,
188 },
189 Cancelled,
191}
192
193#[derive(Debug)]
201pub struct EpochBarrier {
202 current_epoch: EpochId,
204 expected: usize,
206 arrived: AtomicU64,
208 cancelled: std::sync::atomic::AtomicBool,
210}
211
212impl EpochBarrier {
213 #[must_use]
218 pub fn new(current_epoch: EpochId, participants: usize) -> Self {
219 info!(
220 bead_id = "bd-3go.12",
221 epoch = current_epoch.get(),
222 participants,
223 "epoch barrier created"
224 );
225 Self {
226 current_epoch,
227 expected: participants,
228 arrived: AtomicU64::new(0),
229 cancelled: std::sync::atomic::AtomicBool::new(false),
230 }
231 }
232
233 #[must_use]
235 pub fn epoch(&self) -> EpochId {
236 self.current_epoch
237 }
238
239 #[must_use]
241 pub fn arrived_count(&self) -> usize {
242 let val = self.arrived.load(Ordering::Acquire);
243 usize::try_from(val).unwrap_or(usize::MAX)
244 }
245
246 #[must_use]
248 pub fn expected_count(&self) -> usize {
249 self.expected
250 }
251
252 pub fn arrive(&self, participant_name: &str) -> bool {
256 if self.cancelled.load(Ordering::Acquire) {
257 warn!(
258 bead_id = "bd-3go.12",
259 participant = participant_name,
260 "participant arrived at cancelled barrier — ignoring"
261 );
262 return false;
263 }
264 let prev = self.arrived.fetch_add(1, Ordering::AcqRel);
265 let new_count = usize::try_from(prev.saturating_add(1)).unwrap_or(usize::MAX);
266 debug!(
267 bead_id = "bd-3go.12",
268 participant = participant_name,
269 arrived = new_count,
270 expected = self.expected,
271 "barrier participant arrived"
272 );
273 new_count >= self.expected
274 }
275
276 #[must_use]
278 pub fn is_complete(&self) -> bool {
279 self.arrived_count() >= self.expected
280 }
281
282 pub fn cancel(&self) {
284 self.cancelled.store(true, Ordering::Release);
285 warn!(
286 bead_id = "bd-3go.12",
287 epoch = self.current_epoch.get(),
288 arrived = self.arrived_count(),
289 expected = self.expected,
290 "epoch barrier cancelled — epoch will NOT advance"
291 );
292 }
293
294 #[must_use]
296 pub fn is_cancelled(&self) -> bool {
297 self.cancelled.load(Ordering::Acquire)
298 }
299
300 pub fn resolve(&self, clock: &EpochClock) -> Result<BarrierOutcome> {
309 if self.is_cancelled() {
310 return Ok(BarrierOutcome::Cancelled);
311 }
312 if !self.is_complete() {
313 return Ok(BarrierOutcome::Timeout {
314 arrived: self.arrived_count(),
315 expected: self.expected,
316 });
317 }
318 let new_epoch = clock.increment()?;
319 info!(
320 bead_id = "bd-3go.12",
321 old_epoch = self.current_epoch.get(),
322 new_epoch = new_epoch.get(),
323 participants = self.expected,
324 "epoch transition completed — all participants arrived"
325 );
326 Ok(BarrierOutcome::AllArrived { new_epoch })
327 }
328}
329
330pub fn validate_symbol_epoch(
341 symbol_epoch: EpochId,
342 window: &fsqlite_types::SymbolValidityWindow,
343) -> Result<()> {
344 if window.contains(symbol_epoch) {
345 Ok(())
346 } else {
347 error!(
348 bead_id = "bd-3go.12",
349 symbol_epoch = symbol_epoch.get(),
350 window_from = window.from_epoch.get(),
351 window_to = window.to_epoch.get(),
352 "symbol epoch outside validity window — fail-closed rejection"
353 );
354 Err(FrankenError::DatabaseCorrupt {
355 detail: format!(
356 "symbol epoch {} outside validity window [{}, {}]",
357 symbol_epoch.get(),
358 window.from_epoch.get(),
359 window.to_epoch.get(),
360 ),
361 })
362 }
363}
364
365pub const ECS_ROOT_POINTER_MAGIC: [u8; 4] = *b"FSRT";
369pub const ECS_ROOT_POINTER_VERSION: u32 = 1;
371pub const ECS_ROOT_POINTER_BYTES: usize = 56;
373const ECS_ROOT_POINTER_CHECKSUM_INPUT_BYTES: usize = 32;
375const ECS_ROOT_POINTER_AUTH_INPUT_BYTES: usize = 40;
377
378pub const ROOT_MANIFEST_MAGIC: [u8; 8] = *b"FSQLROOT";
380pub const ROOT_MANIFEST_VERSION: u32 = 1;
382
383#[derive(Debug, Clone, Copy, PartialEq, Eq)]
388pub struct EcsRootPointer {
389 pub manifest_object_id: ObjectId,
391 pub ecs_epoch: EpochId,
393 pub root_auth_tag: [u8; 16],
395}
396
397impl EcsRootPointer {
398 #[must_use]
400 pub const fn unauthed(manifest_object_id: ObjectId, ecs_epoch: EpochId) -> Self {
401 Self {
402 manifest_object_id,
403 ecs_epoch,
404 root_auth_tag: [0_u8; 16],
405 }
406 }
407
408 #[must_use]
410 pub fn authed(manifest_object_id: ObjectId, ecs_epoch: EpochId, master_key: &[u8; 32]) -> Self {
411 let mut pointer = Self::unauthed(manifest_object_id, ecs_epoch);
412 let auth_input = pointer.auth_input_bytes();
413 pointer.root_auth_tag = compute_root_pointer_auth_tag(master_key, &auth_input);
414 pointer
415 }
416
417 #[must_use]
419 pub fn encode(&self) -> [u8; ECS_ROOT_POINTER_BYTES] {
420 let mut out = [0_u8; ECS_ROOT_POINTER_BYTES];
421 out[0..4].copy_from_slice(&ECS_ROOT_POINTER_MAGIC);
422 out[4..8].copy_from_slice(&ECS_ROOT_POINTER_VERSION.to_le_bytes());
423 out[8..24].copy_from_slice(self.manifest_object_id.as_bytes());
424 out[24..32].copy_from_slice(&self.ecs_epoch.get().to_le_bytes());
425 let checksum = xxhash_rust::xxh3::xxh3_64(&out[..ECS_ROOT_POINTER_CHECKSUM_INPUT_BYTES]);
426 out[32..40].copy_from_slice(&checksum.to_le_bytes());
427 out[40..56].copy_from_slice(&self.root_auth_tag);
428 out
429 }
430
431 pub fn decode(
437 bytes: &[u8],
438 symbol_auth_enabled: bool,
439 master_key: Option<&[u8; 32]>,
440 ) -> Result<Self> {
441 if bytes.len() != ECS_ROOT_POINTER_BYTES {
442 return Err(FrankenError::DatabaseCorrupt {
443 detail: format!(
444 "ecs/root size mismatch: expected {ECS_ROOT_POINTER_BYTES}, got {}",
445 bytes.len()
446 ),
447 });
448 }
449 if bytes[0..4] != ECS_ROOT_POINTER_MAGIC {
450 return Err(FrankenError::DatabaseCorrupt {
451 detail: format!(
452 "invalid ecs/root magic: {:02X?} (reason=bad_magic)",
453 &bytes[0..4]
454 ),
455 });
456 }
457 let version = read_u32_le_at(bytes, 4, "root.version")?;
458 if version != ECS_ROOT_POINTER_VERSION {
459 return Err(FrankenError::DatabaseCorrupt {
460 detail: format!(
461 "unsupported ecs/root version {version} (expected {ECS_ROOT_POINTER_VERSION})"
462 ),
463 });
464 }
465
466 let mut manifest_id = [0_u8; 16];
467 manifest_id.copy_from_slice(&bytes[8..24]);
468 let manifest_object_id = ObjectId::from_bytes(manifest_id);
469 let ecs_epoch_raw = read_u64_le_at(bytes, 24, "root.ecs_epoch")?;
470 let ecs_epoch = EpochId::new(ecs_epoch_raw);
471
472 let stored_checksum = read_u64_le_at(bytes, 32, "root.checksum")?;
473 let computed_checksum =
474 xxhash_rust::xxh3::xxh3_64(&bytes[..ECS_ROOT_POINTER_CHECKSUM_INPUT_BYTES]);
475 if stored_checksum != computed_checksum {
476 error!(
477 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
478 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
479 reason_code = "checksum_mismatch",
480 stored_checksum = stored_checksum,
481 computed_checksum = computed_checksum,
482 "ecs/root checksum verification failed"
483 );
484 return Err(FrankenError::DatabaseCorrupt {
485 detail: format!(
486 "ecs/root checksum mismatch (reason=checksum_mismatch): stored={stored_checksum:#018X}, computed={computed_checksum:#018X}"
487 ),
488 });
489 }
490
491 let mut root_auth_tag = [0_u8; 16];
492 root_auth_tag.copy_from_slice(&bytes[40..56]);
493
494 if symbol_auth_enabled {
495 let Some(master_key) = master_key else {
496 return Err(FrankenError::DatabaseCorrupt {
497 detail: "symbol_auth enabled but master key is missing (reason=auth_failed)"
498 .to_owned(),
499 });
500 };
501 let expected = compute_root_pointer_auth_tag(
502 master_key,
503 &bytes[..ECS_ROOT_POINTER_AUTH_INPUT_BYTES],
504 );
505 if root_auth_tag != expected {
506 error!(
507 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
508 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
509 reason_code = "auth_failed",
510 "ecs/root auth-tag verification failed"
511 );
512 return Err(FrankenError::DatabaseCorrupt {
513 detail: "ecs/root auth tag verification failed (reason=auth_failed)".to_owned(),
514 });
515 }
516 } else if root_auth_tag != [0_u8; 16] {
517 return Err(FrankenError::DatabaseCorrupt {
518 detail: "ecs/root auth tag must be all-zero when symbol_auth=off".to_owned(),
519 });
520 }
521
522 Ok(Self {
523 manifest_object_id,
524 ecs_epoch,
525 root_auth_tag,
526 })
527 }
528
529 #[must_use]
531 fn auth_input_bytes(&self) -> [u8; ECS_ROOT_POINTER_AUTH_INPUT_BYTES] {
532 let encoded = self.encode();
533 let mut out = [0_u8; ECS_ROOT_POINTER_AUTH_INPUT_BYTES];
534 out.copy_from_slice(&encoded[..ECS_ROOT_POINTER_AUTH_INPUT_BYTES]);
535 out
536 }
537}
538
539#[derive(Debug, Clone, PartialEq, Eq)]
541pub struct RootManifest {
542 pub database_name: String,
544 pub current_commit: ObjectId,
546 pub commit_seq: u64,
548 pub schema_snapshot: ObjectId,
550 pub schema_epoch: u64,
552 pub ecs_epoch: EpochId,
554 pub checkpoint_base: ObjectId,
556 pub gc_horizon: u64,
558 pub created_at: u64,
560 pub updated_at: u64,
562}
563
564impl RootManifest {
565 pub fn encode(&self) -> Result<Vec<u8>> {
571 let name_bytes = self.database_name.as_bytes();
572 let name_len = u32::try_from(name_bytes.len()).map_err(|_| FrankenError::OutOfRange {
573 what: "root_manifest.database_name_len".to_owned(),
574 value: name_bytes.len().to_string(),
575 })?;
576
577 let mut out = Vec::with_capacity(name_bytes.len().saturating_add(128));
578 out.extend_from_slice(&ROOT_MANIFEST_MAGIC);
579 out.extend_from_slice(&ROOT_MANIFEST_VERSION.to_le_bytes());
580 out.extend_from_slice(&name_len.to_le_bytes());
581 out.extend_from_slice(name_bytes);
582 out.extend_from_slice(self.current_commit.as_bytes());
583 out.extend_from_slice(&self.commit_seq.to_le_bytes());
584 out.extend_from_slice(self.schema_snapshot.as_bytes());
585 out.extend_from_slice(&self.schema_epoch.to_le_bytes());
586 out.extend_from_slice(&self.ecs_epoch.get().to_le_bytes());
587 out.extend_from_slice(self.checkpoint_base.as_bytes());
588 out.extend_from_slice(&self.gc_horizon.to_le_bytes());
589 out.extend_from_slice(&self.created_at.to_le_bytes());
590 out.extend_from_slice(&self.updated_at.to_le_bytes());
591 let checksum = xxhash_rust::xxh3::xxh3_64(&out);
592 out.extend_from_slice(&checksum.to_le_bytes());
593 Ok(out)
594 }
595
596 pub fn decode(bytes: &[u8]) -> Result<Self> {
602 if bytes.len() < 120 {
603 return Err(FrankenError::DatabaseCorrupt {
604 detail: format!(
605 "root manifest too short: expected >= 120 bytes, got {}",
606 bytes.len()
607 ),
608 });
609 }
610 if bytes[0..8] != ROOT_MANIFEST_MAGIC {
611 return Err(FrankenError::DatabaseCorrupt {
612 detail: format!("invalid root manifest magic: {:02X?}", &bytes[0..8]),
613 });
614 }
615 let version = read_u32_le_at(bytes, 8, "root_manifest.version")?;
616 if version != ROOT_MANIFEST_VERSION {
617 return Err(FrankenError::DatabaseCorrupt {
618 detail: format!(
619 "unsupported root manifest version {version} (expected {ROOT_MANIFEST_VERSION})"
620 ),
621 });
622 }
623
624 let name_len_u32 = read_u32_le_at(bytes, 12, "root_manifest.database_name_len")?;
625 let name_len = u32_to_usize(name_len_u32, "root_manifest.database_name_len")?;
626 let mut cursor = 16_usize;
627 let name_end = checked_add(cursor, name_len, "root_manifest.database_name_end")?;
628 if name_end > bytes.len() {
629 return Err(FrankenError::DatabaseCorrupt {
630 detail: format!(
631 "root manifest name out of bounds: end={name_end}, len={}",
632 bytes.len()
633 ),
634 });
635 }
636 let database_name = std::str::from_utf8(&bytes[cursor..name_end])
637 .map_err(|err| FrankenError::DatabaseCorrupt {
638 detail: format!("root manifest database_name is not UTF-8: {err}"),
639 })?
640 .to_owned();
641 cursor = name_end;
642
643 let current_commit = read_object_id_at(bytes, cursor, "root_manifest.current_commit")?;
644 cursor = checked_add(cursor, 16, "root_manifest.cursor.current_commit")?;
645 let commit_seq = read_u64_le_at(bytes, cursor, "root_manifest.commit_seq")?;
646 cursor = checked_add(cursor, 8, "root_manifest.cursor.commit_seq")?;
647 let schema_snapshot = read_object_id_at(bytes, cursor, "root_manifest.schema_snapshot")?;
648 cursor = checked_add(cursor, 16, "root_manifest.cursor.schema_snapshot")?;
649 let schema_epoch = read_u64_le_at(bytes, cursor, "root_manifest.schema_epoch")?;
650 cursor = checked_add(cursor, 8, "root_manifest.cursor.schema_epoch")?;
651 let ecs_epoch_raw = read_u64_le_at(bytes, cursor, "root_manifest.ecs_epoch")?;
652 let ecs_epoch = EpochId::new(ecs_epoch_raw);
653 cursor = checked_add(cursor, 8, "root_manifest.cursor.ecs_epoch")?;
654 let checkpoint_base = read_object_id_at(bytes, cursor, "root_manifest.checkpoint_base")?;
655 cursor = checked_add(cursor, 16, "root_manifest.cursor.checkpoint_base")?;
656 let gc_horizon = read_u64_le_at(bytes, cursor, "root_manifest.gc_horizon")?;
657 cursor = checked_add(cursor, 8, "root_manifest.cursor.gc_horizon")?;
658 let created_at = read_u64_le_at(bytes, cursor, "root_manifest.created_at")?;
659 cursor = checked_add(cursor, 8, "root_manifest.cursor.created_at")?;
660 let updated_at = read_u64_le_at(bytes, cursor, "root_manifest.updated_at")?;
661 cursor = checked_add(cursor, 8, "root_manifest.cursor.updated_at")?;
662
663 let checksum_end = checked_add(cursor, 8, "root_manifest.cursor.checksum_end")?;
664 if checksum_end != bytes.len() {
665 return Err(FrankenError::DatabaseCorrupt {
666 detail: format!(
667 "root manifest trailing bytes present: parsed_end={checksum_end}, actual_len={}",
668 bytes.len()
669 ),
670 });
671 }
672 let stored_checksum = read_u64_le_at(bytes, cursor, "root_manifest.checksum")?;
673 let computed_checksum = xxhash_rust::xxh3::xxh3_64(&bytes[..cursor]);
674 if stored_checksum != computed_checksum {
675 return Err(FrankenError::DatabaseCorrupt {
676 detail: format!(
677 "root manifest checksum mismatch: stored={stored_checksum:#018X}, computed={computed_checksum:#018X}"
678 ),
679 });
680 }
681
682 Ok(Self {
683 database_name,
684 current_commit,
685 commit_seq,
686 schema_snapshot,
687 schema_epoch,
688 ecs_epoch,
689 checkpoint_base,
690 gc_horizon,
691 created_at,
692 updated_at,
693 })
694 }
695}
696
697#[must_use]
699pub fn compute_root_pointer_auth_tag(master_key: &[u8; 32], magic_to_checksum: &[u8]) -> [u8; 16] {
700 let mut hasher = blake3::Hasher::new_keyed(master_key);
701 hasher.update(ROOT_POINTER_AUTH_DOMAIN);
702 hasher.update(magic_to_checksum);
703 let digest = hasher.finalize();
704 let mut out = [0_u8; 16];
705 out.copy_from_slice(&digest.as_bytes()[..16]);
706 out
707}
708
709#[derive(Debug, Clone, PartialEq, Eq)]
711pub struct NativeBootstrapLayout {
712 pub ecs_dir: PathBuf,
714}
715
716impl NativeBootstrapLayout {
717 #[must_use]
719 pub fn new(ecs_dir: impl Into<PathBuf>) -> Self {
720 Self {
721 ecs_dir: ecs_dir.into(),
722 }
723 }
724
725 #[must_use]
727 pub fn root_path(&self) -> PathBuf {
728 self.ecs_dir.join("root")
729 }
730
731 #[must_use]
733 pub fn symbols_dir(&self) -> PathBuf {
734 self.ecs_dir.join("symbols")
735 }
736
737 #[must_use]
739 pub fn markers_dir(&self) -> PathBuf {
740 self.ecs_dir.join("markers")
741 }
742}
743
744#[derive(Debug, Clone, PartialEq, Eq)]
746pub struct NativeBootstrapState {
747 pub root_pointer: EcsRootPointer,
749 pub manifest: RootManifest,
751 pub latest_marker: CommitMarkerRecord,
753 pub schema_snapshot_bytes: Vec<u8>,
755 pub checkpoint_base_bytes: Vec<u8>,
757}
758
759pub fn read_root_pointer(
765 root_path: &Path,
766 symbol_auth_enabled: bool,
767 master_key: Option<&[u8; 32]>,
768) -> Result<EcsRootPointer> {
769 let bytes = fs::read(root_path).map_err(|err| {
770 error!(
771 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
772 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
773 reason_code = "scan_failed",
774 path = %root_path.display(),
775 error = %err,
776 "failed reading ecs/root"
777 );
778 FrankenError::Io(err)
779 })?;
780 EcsRootPointer::decode(&bytes, symbol_auth_enabled, master_key)
781}
782
783pub fn write_root_pointer_atomic(root_path: &Path, pointer: EcsRootPointer) -> Result<()> {
789 let Some(parent) = root_path.parent() else {
790 return Err(FrankenError::DatabaseCorrupt {
791 detail: format!("ecs/root has no parent directory: {}", root_path.display()),
792 });
793 };
794 fs::create_dir_all(parent)?;
795
796 let pid = std::process::id();
797 let suffix = ROOT_TMP_SUFFIX_COUNTER.fetch_add(1, Ordering::SeqCst);
798 let tmp_name = format!(".root.tmp.{pid}.{suffix}");
799 let tmp_path = parent.join(tmp_name);
800
801 let bytes = pointer.encode();
802 let mut temp = fs::OpenOptions::new()
803 .write(true)
804 .create_new(true)
805 .open(&tmp_path)?;
806 temp.write_all(&bytes)?;
807 temp.sync_all()?;
808 fs::rename(&tmp_path, root_path)?;
809 let parent_dir = fs::File::open(parent)?;
810 parent_dir.sync_all()?;
811
812 info!(
813 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
814 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
815 path = %root_path.display(),
816 root_epoch = pointer.ecs_epoch.get(),
817 "wrote ecs/root atomically"
818 );
819
820 Ok(())
821}
822
823pub fn build_root_pointer(
829 manifest_object_id: ObjectId,
830 ecs_epoch: EpochId,
831 symbol_auth_enabled: bool,
832 master_key: Option<&[u8; 32]>,
833) -> Result<EcsRootPointer> {
834 if symbol_auth_enabled {
835 let Some(master_key) = master_key else {
836 return Err(FrankenError::DatabaseCorrupt {
837 detail: "symbol_auth enabled but master key is missing (reason=auth_failed)"
838 .to_owned(),
839 });
840 };
841 Ok(EcsRootPointer::authed(
842 manifest_object_id,
843 ecs_epoch,
844 master_key,
845 ))
846 } else {
847 Ok(EcsRootPointer::unauthed(manifest_object_id, ecs_epoch))
848 }
849}
850
851pub fn bootstrap_native_mode(
862 layout: &NativeBootstrapLayout,
863 symbol_auth_enabled: bool,
864 master_key: Option<&[u8; 32]>,
865) -> Result<NativeBootstrapState> {
866 debug!(
867 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
868 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
869 step = 1_u8,
870 root_path = %layout.root_path().display(),
871 symbol_auth_enabled = symbol_auth_enabled,
872 "bootstrap step 1: reading ecs/root"
873 );
874 let root_path = layout.root_path();
875 let root_pointer = read_root_pointer(&root_path, symbol_auth_enabled, master_key)?;
876 info!(
877 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
878 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
879 step = 1_u8,
880 duration_ms = 0_u64,
881 root_epoch = root_pointer.ecs_epoch.get(),
882 manifest_object_id = %root_pointer.manifest_object_id,
883 "bootstrap steps 1-3 complete"
884 );
885 bootstrap_from_root_pointer(layout, root_pointer)
886}
887
888pub fn bootstrap_native_mode_with_recovery(
894 layout: &NativeBootstrapLayout,
895 symbol_auth_enabled: bool,
896 master_key: Option<&[u8; 32]>,
897) -> Result<NativeBootstrapState> {
898 match bootstrap_native_mode(layout, symbol_auth_enabled, master_key) {
899 Ok(state) => Ok(state),
900 Err(initial_err) => {
901 debug!(
902 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
903 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
904 reason_code = "retry_scan_recovery",
905 error = %initial_err,
906 "bootstrap entering degraded scan-based recovery path"
907 );
908 warn!(
909 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
910 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
911 reason_code = "retry_scan_recovery",
912 error = %initial_err,
913 "bootstrap from ecs/root failed; attempting scan-based recovery"
914 );
915
916 let recovered_pointer =
917 recover_root_pointer_from_scan(layout, symbol_auth_enabled, master_key)?;
918 write_root_pointer_atomic(&layout.root_path(), recovered_pointer)?;
919 bootstrap_from_root_pointer(layout, recovered_pointer)
920 }
921 }
922}
923
924pub fn recover_root_pointer_from_scan(
930 layout: &NativeBootstrapLayout,
931 symbol_auth_enabled: bool,
932 master_key: Option<&[u8; 32]>,
933) -> Result<EcsRootPointer> {
934 let marker_tip = scan_latest_marker(layout.markers_dir().as_path())?;
935 let mut grouped: BTreeMap<ObjectId, Vec<SymbolRecord>> = BTreeMap::new();
936 let symbol_segments = sorted_segment_paths(layout.symbols_dir().as_path())?;
937 debug!(
938 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
939 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
940 segments = symbol_segments.len(),
941 marker_tip_commit_seq = marker_tip.as_ref().map_or(0_u64, |m| m.commit_seq),
942 "scan recovery started"
943 );
944
945 for (_, segment_path) in &symbol_segments {
946 debug!(
947 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
948 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
949 segment = %segment_path.display(),
950 "scan recovery inspecting symbol segment"
951 );
952 let scan = scan_symbol_segment(segment_path)?;
953 for row in scan.records {
954 grouped
955 .entry(row.record.object_id)
956 .or_default()
957 .push(row.record);
958 }
959 }
960
961 let mut best: Option<(ObjectId, RootManifest, bool)> = None;
962 for (object_id, records) in grouped {
963 let Ok(payload) = reconstruct_payload_from_source_symbols(records) else {
964 continue;
965 };
966 let Ok(manifest) = RootManifest::decode(&payload) else {
967 continue;
968 };
969
970 let marker_matches = marker_tip.as_ref().is_some_and(|tip| {
971 manifest.current_commit.as_bytes() == &tip.marker_id
972 && manifest.commit_seq == tip.commit_seq
973 });
974
975 match &best {
976 None => best = Some((object_id, manifest, marker_matches)),
977 Some((_, best_manifest, best_marker_matches)) => {
978 let better_marker_match = marker_matches && !best_marker_matches;
979 let better_commit = manifest.commit_seq > best_manifest.commit_seq;
980 let better_update = manifest.commit_seq == best_manifest.commit_seq
981 && manifest.updated_at > best_manifest.updated_at;
982 if better_marker_match || better_commit || better_update {
983 best = Some((object_id, manifest, marker_matches));
984 }
985 }
986 }
987 }
988
989 let Some((manifest_object_id, manifest, _)) = best else {
990 error!(
991 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
992 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
993 reason_code = "scan_failed",
994 segments_scanned = symbol_segments.len(),
995 "scan recovery could not find a valid RootManifest candidate"
996 );
997 return Err(FrankenError::DatabaseCorrupt {
998 detail: "scan recovery failed: no valid RootManifest candidate (reason=scan_failed)"
999 .to_owned(),
1000 });
1001 };
1002
1003 info!(
1004 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1005 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1006 segments_scanned = symbol_segments.len(),
1007 best_candidate_commit_seq = manifest.commit_seq,
1008 chosen_root_pointer = %manifest_object_id,
1009 "scan recovery selected root manifest candidate"
1010 );
1011
1012 build_root_pointer(
1013 manifest_object_id,
1014 manifest.ecs_epoch,
1015 symbol_auth_enabled,
1016 master_key,
1017 )
1018}
1019
1020#[allow(clippy::too_many_lines)]
1021fn bootstrap_from_root_pointer(
1022 layout: &NativeBootstrapLayout,
1023 root_pointer: EcsRootPointer,
1024) -> Result<NativeBootstrapState> {
1025 info!(
1026 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1027 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1028 step = 4_u8,
1029 root_epoch = root_pointer.ecs_epoch.get(),
1030 manifest_object_id = %root_pointer.manifest_object_id,
1031 "bootstrap step 4: loading root manifest object"
1032 );
1033 let manifest_bytes = fetch_object_payload(
1034 layout.symbols_dir().as_path(),
1035 root_pointer.manifest_object_id,
1036 root_pointer.ecs_epoch,
1037 )?;
1038 let manifest = RootManifest::decode(&manifest_bytes)?;
1039 info!(
1040 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1041 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1042 step = 4_u8,
1043 duration_ms = 0_u64,
1044 root_epoch = root_pointer.ecs_epoch.get(),
1045 object_id = %root_pointer.manifest_object_id,
1046 "bootstrap step 4 complete"
1047 );
1048
1049 if manifest.ecs_epoch != root_pointer.ecs_epoch {
1050 error!(
1051 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1052 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1053 reason_code = "epoch_mismatch",
1054 root_epoch = root_pointer.ecs_epoch.get(),
1055 manifest_epoch = manifest.ecs_epoch.get(),
1056 "bootstrap step 5 failed: root epoch != manifest epoch"
1057 );
1058 return Err(FrankenError::DatabaseCorrupt {
1059 detail: format!(
1060 "root/manifest epoch mismatch (reason=epoch_mismatch): root={}, manifest={}",
1061 root_pointer.ecs_epoch.get(),
1062 manifest.ecs_epoch.get()
1063 ),
1064 });
1065 }
1066 info!(
1067 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1068 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1069 step = 5_u8,
1070 duration_ms = 0_u64,
1071 root_epoch = root_pointer.ecs_epoch.get(),
1072 object_id = %root_pointer.manifest_object_id,
1073 "bootstrap step 5 complete"
1074 );
1075
1076 info!(
1077 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1078 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1079 step = 6_u8,
1080 commit_seq = manifest.commit_seq,
1081 "bootstrap step 6: verifying marker"
1082 );
1083 let latest_marker = fetch_marker_record(layout.markers_dir().as_path(), manifest.commit_seq)?;
1084 if latest_marker.marker_id != *manifest.current_commit.as_bytes() {
1085 error!(
1086 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1087 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1088 reason_code = "marker_mismatch",
1089 manifest_commit_seq = manifest.commit_seq,
1090 "bootstrap marker mismatch"
1091 );
1092 return Err(FrankenError::DatabaseCorrupt {
1093 detail:
1094 "root manifest current_commit does not match marker stream (reason=marker_mismatch)"
1095 .to_owned(),
1096 });
1097 }
1098 info!(
1099 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1100 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1101 step = 6_u8,
1102 duration_ms = 0_u64,
1103 root_epoch = root_pointer.ecs_epoch.get(),
1104 object_id = %manifest.current_commit,
1105 "bootstrap step 6 complete"
1106 );
1107
1108 let schema_snapshot_bytes = fetch_object_payload(
1109 layout.symbols_dir().as_path(),
1110 manifest.schema_snapshot,
1111 root_pointer.ecs_epoch,
1112 )?;
1113 info!(
1114 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1115 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1116 step = 7_u8,
1117 duration_ms = 0_u64,
1118 root_epoch = root_pointer.ecs_epoch.get(),
1119 object_id = %manifest.schema_snapshot,
1120 "bootstrap step 7 complete"
1121 );
1122 let checkpoint_base_bytes = fetch_object_payload(
1123 layout.symbols_dir().as_path(),
1124 manifest.checkpoint_base,
1125 root_pointer.ecs_epoch,
1126 )?;
1127 info!(
1128 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1129 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1130 step = 8_u8,
1131 duration_ms = 0_u64,
1132 root_epoch = root_pointer.ecs_epoch.get(),
1133 object_id = %manifest.checkpoint_base,
1134 "bootstrap step 8 complete"
1135 );
1136
1137 info!(
1138 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1139 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1140 step = 9_u8,
1141 duration_ms = 0_u64,
1142 root_epoch = root_pointer.ecs_epoch.get(),
1143 commit_seq = manifest.commit_seq,
1144 schema_epoch = manifest.schema_epoch,
1145 "bootstrap sequence completed"
1146 );
1147
1148 Ok(NativeBootstrapState {
1149 root_pointer,
1150 manifest,
1151 latest_marker,
1152 schema_snapshot_bytes,
1153 checkpoint_base_bytes,
1154 })
1155}
1156
1157fn fetch_object_payload(
1158 symbols_dir: &Path,
1159 object_id: ObjectId,
1160 root_epoch: EpochId,
1161) -> Result<Vec<u8>> {
1162 let mut records = Vec::new();
1163 let segments = sorted_segment_paths(symbols_dir)?;
1164 debug!(
1165 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1166 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1167 root_epoch = root_epoch.get(),
1168 object_id = %object_id,
1169 segment_count = segments.len(),
1170 "fetching bootstrap object payload from symbol log"
1171 );
1172
1173 for (_, segment_path) in segments {
1174 let scan = scan_symbol_segment(&segment_path)?;
1175 if scan.header.epoch_id > root_epoch.get() {
1176 error!(
1177 bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1178 logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1179 reason_code = "future_epoch",
1180 segment = %segment_path.display(),
1181 segment_epoch = scan.header.epoch_id,
1182 root_epoch = root_epoch.get(),
1183 "bootstrap rejected future-epoch segment"
1184 );
1185 return Err(FrankenError::DatabaseCorrupt {
1186 detail: format!(
1187 "future-epoch segment rejected (reason=future_epoch): segment_epoch={}, root_epoch={}",
1188 scan.header.epoch_id,
1189 root_epoch.get()
1190 ),
1191 });
1192 }
1193 for row in scan.records {
1194 if row.record.object_id == object_id {
1195 records.push(row.record);
1196 }
1197 }
1198 }
1199
1200 if records.is_empty() {
1201 return Err(FrankenError::DatabaseCorrupt {
1202 detail: format!("object {object_id} not found in symbol logs"),
1203 });
1204 }
1205 reconstruct_payload_from_source_symbols(records)
1206}
1207
1208fn reconstruct_payload_from_source_symbols(mut records: Vec<SymbolRecord>) -> Result<Vec<u8>> {
1209 records.sort_by_key(|record| record.esi);
1210 let Some(first) = records.first() else {
1211 return Err(FrankenError::DatabaseCorrupt {
1212 detail: "cannot reconstruct payload from empty symbol set".to_owned(),
1213 });
1214 };
1215 let first_oti = first.oti;
1217 let symbol_size_u64 = u64::from(first_oti.t);
1218 if symbol_size_u64 == 0 {
1219 return Err(FrankenError::DatabaseCorrupt {
1220 detail: "symbol_size=0 in OTI".to_owned(),
1221 });
1222 }
1223
1224 let transfer_len_usize = u64_to_usize(first_oti.f, "oti.f")?;
1225 let source_symbols = first_oti.f.div_ceil(symbol_size_u64);
1226 let source_symbols_usize = u64_to_usize(source_symbols, "source_symbols")?;
1227 let symbol_size_usize = u32_to_usize(first_oti.t, "oti.t")?;
1228 let total_bytes = source_symbols_usize
1229 .checked_mul(symbol_size_usize)
1230 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1231 detail: "reconstruction size overflow".to_owned(),
1232 })?;
1233 let mut out = vec![0_u8; total_bytes];
1234 let mut seen = vec![false; source_symbols_usize];
1235
1236 for record in records {
1237 if u64::from(record.esi) >= source_symbols {
1238 continue;
1239 }
1240 if record.oti != first_oti {
1241 return Err(FrankenError::DatabaseCorrupt {
1242 detail: "inconsistent OTI across object symbols".to_owned(),
1243 });
1244 }
1245 let idx = u32_to_usize(record.esi, "esi")?;
1246 let start =
1247 idx.checked_mul(symbol_size_usize)
1248 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1249 detail: "symbol offset overflow".to_owned(),
1250 })?;
1251 let end = checked_add(start, symbol_size_usize, "symbol_end")?;
1252 if end > out.len() {
1253 return Err(FrankenError::DatabaseCorrupt {
1254 detail: "symbol write out of bounds during reconstruction".to_owned(),
1255 });
1256 }
1257 if record.symbol_data.len() != symbol_size_usize {
1258 return Err(FrankenError::DatabaseCorrupt {
1259 detail: "symbol size does not match OTI.t".to_owned(),
1260 });
1261 }
1262 out[start..end].copy_from_slice(&record.symbol_data);
1263 seen[idx] = true;
1264 }
1265
1266 if !seen.iter().all(|bit| *bit) {
1267 return Err(FrankenError::DatabaseCorrupt {
1268 detail: "insufficient source symbols to reconstruct object payload".to_owned(),
1269 });
1270 }
1271 out.truncate(transfer_len_usize);
1272 Ok(out)
1273}
1274
1275fn fetch_marker_record(markers_dir: &Path, commit_seq: u64) -> Result<CommitMarkerRecord> {
1276 let segment_id = segment_id_for_commit_seq(commit_seq);
1277 let segment_path = markers_dir.join(format!("segment-{segment_id:06}.log"));
1278 let bytes = fs::read(&segment_path)?;
1279 if bytes.len() < MARKER_SEGMENT_HEADER_BYTES {
1280 return Err(FrankenError::DatabaseCorrupt {
1281 detail: format!(
1282 "marker segment {} shorter than header: {} bytes",
1283 segment_path.display(),
1284 bytes.len()
1285 ),
1286 });
1287 }
1288 let header =
1289 MarkerSegmentHeader::decode(&bytes[..MARKER_SEGMENT_HEADER_BYTES]).map_err(|err| {
1290 FrankenError::DatabaseCorrupt {
1291 detail: format!(
1292 "marker header decode failed for {}: {err}",
1293 segment_path.display()
1294 ),
1295 }
1296 })?;
1297 let records = recover_valid_prefix(&bytes).map_err(|err| FrankenError::DatabaseCorrupt {
1298 detail: format!(
1299 "marker segment recover failed for {}: {err}",
1300 segment_path.display()
1301 ),
1302 })?;
1303
1304 if commit_seq < header.start_commit_seq {
1305 return Err(FrankenError::DatabaseCorrupt {
1306 detail: format!(
1307 "commit_seq {commit_seq} precedes segment start {}",
1308 header.start_commit_seq
1309 ),
1310 });
1311 }
1312 let index_u64 = commit_seq - header.start_commit_seq;
1313 let index = u64_to_usize(index_u64, "marker_index")?;
1314 let Some(record) = records.get(index) else {
1315 return Err(FrankenError::DatabaseCorrupt {
1316 detail: format!(
1317 "marker for commit_seq {commit_seq} missing in segment {}",
1318 segment_path.display()
1319 ),
1320 });
1321 };
1322 if !record.verify_marker_id() {
1323 return Err(FrankenError::DatabaseCorrupt {
1324 detail: "marker_id verification failed (reason=marker_mismatch)".to_owned(),
1325 });
1326 }
1327 if index > 0 {
1328 for i in 1..=index {
1329 if records[i].prev_marker_id != records[i - 1].marker_id {
1330 return Err(FrankenError::DatabaseCorrupt {
1331 detail: format!("marker hash chain gap at index {i} (reason=marker_chain_gap)"),
1332 });
1333 }
1334 }
1335 }
1336 Ok(record.clone())
1337}
1338
1339fn scan_latest_marker(markers_dir: &Path) -> Result<Option<CommitMarkerRecord>> {
1340 let segments = sorted_segment_paths(markers_dir)?;
1341 let mut best: Option<CommitMarkerRecord> = None;
1342 for (_, segment_path) in segments {
1343 let bytes = fs::read(&segment_path)?;
1344 if bytes.len() < MARKER_SEGMENT_HEADER_BYTES {
1345 continue;
1346 }
1347 let Ok(records) = recover_valid_prefix(&bytes) else {
1348 continue;
1349 };
1350 if let Some(last) = records.last() {
1351 let replace = best
1352 .as_ref()
1353 .is_none_or(|existing| last.commit_seq > existing.commit_seq);
1354 if replace {
1355 best = Some(last.clone());
1356 }
1357 }
1358 }
1359 Ok(best)
1360}
1361
1362fn sorted_segment_paths(dir: &Path) -> Result<Vec<(u64, PathBuf)>> {
1363 if !dir.exists() {
1364 return Ok(Vec::new());
1365 }
1366 let mut out = Vec::new();
1367 for entry in fs::read_dir(dir)? {
1368 let entry = entry?;
1369 if !entry.file_type()?.is_file() {
1370 continue;
1371 }
1372 let name_os = entry.file_name();
1373 let Some(name) = name_os.to_str() else {
1374 continue;
1375 };
1376 let Some(segment_id) = parse_segment_id(name) else {
1377 continue;
1378 };
1379 out.push((segment_id, entry.path()));
1380 }
1381 out.sort_by_key(|(segment_id, _)| *segment_id);
1382 Ok(out)
1383}
1384
1385fn parse_segment_id(name: &str) -> Option<u64> {
1386 let body = name.strip_prefix("segment-")?.strip_suffix(".log")?;
1387 body.parse::<u64>().ok()
1388}
1389
1390fn read_object_id_at(bytes: &[u8], offset: usize, field: &str) -> Result<ObjectId> {
1391 let end = checked_add(offset, 16, field)?;
1392 if end > bytes.len() {
1393 return Err(FrankenError::DatabaseCorrupt {
1394 detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
1395 });
1396 }
1397 let mut raw = [0_u8; 16];
1398 raw.copy_from_slice(&bytes[offset..end]);
1399 Ok(ObjectId::from_bytes(raw))
1400}
1401
1402fn read_u32_le_at(bytes: &[u8], offset: usize, field: &str) -> Result<u32> {
1403 let end = checked_add(offset, 4, field)?;
1404 if end > bytes.len() {
1405 return Err(FrankenError::DatabaseCorrupt {
1406 detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
1407 });
1408 }
1409 Ok(u32::from_le_bytes(
1410 bytes[offset..end].try_into().expect("fixed 4-byte field"),
1411 ))
1412}
1413
1414fn read_u64_le_at(bytes: &[u8], offset: usize, field: &str) -> Result<u64> {
1415 let end = checked_add(offset, 8, field)?;
1416 if end > bytes.len() {
1417 return Err(FrankenError::DatabaseCorrupt {
1418 detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
1419 });
1420 }
1421 Ok(u64::from_le_bytes(
1422 bytes[offset..end].try_into().expect("fixed 8-byte field"),
1423 ))
1424}
1425
1426fn u32_to_usize(value: u32, field: &str) -> Result<usize> {
1427 usize::try_from(value).map_err(|_| FrankenError::OutOfRange {
1428 what: field.to_owned(),
1429 value: value.to_string(),
1430 })
1431}
1432
1433fn u64_to_usize(value: u64, field: &str) -> Result<usize> {
1434 usize::try_from(value).map_err(|_| FrankenError::OutOfRange {
1435 what: field.to_owned(),
1436 value: value.to_string(),
1437 })
1438}
1439
1440fn checked_add(lhs: usize, rhs: usize, field: &str) -> Result<usize> {
1441 lhs.checked_add(rhs)
1442 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1443 detail: format!("{field} overflow"),
1444 })
1445}
1446
1447#[cfg(test)]
1448mod tests {
1449 use std::fs;
1450 use std::path::Path;
1451
1452 use crate::commit_marker::MarkerSegmentHeader;
1453 use crate::symbol_log::{SymbolSegmentHeader, append_symbol_record, ensure_symbol_segment};
1454 use fsqlite_types::{ObjectId, Oti, SymbolRecord, SymbolRecordFlags, SymbolValidityWindow};
1455 use tempfile::TempDir;
1456
1457 use super::*;
1458
1459 const BEAD_ID: &str = "bd-3go.12";
1460
1461 #[test]
1464 fn test_epoch_id_monotone() {
1465 let clock = EpochClock::new(EpochId::ZERO);
1466 let mut prev = clock.current();
1467 for i in 0..100 {
1468 let next_result = clock.increment();
1469 assert!(
1470 next_result.is_ok(),
1471 "bead_id={BEAD_ID} case=epoch_monotone_increment_{i} err={next_result:?}"
1472 );
1473 let Ok(next) = next_result else {
1474 return;
1475 };
1476 assert!(
1477 next > prev,
1478 "bead_id={BEAD_ID} case=epoch_monotone prev={} next={}",
1479 prev.get(),
1480 next.get()
1481 );
1482 prev = next;
1483 }
1484 assert_eq!(
1485 clock.current().get(),
1486 100,
1487 "bead_id={BEAD_ID} case=epoch_monotone_final"
1488 );
1489 }
1490
1491 #[test]
1494 fn test_symbol_validity_window_rejects_future() {
1495 let current = EpochId::new(5);
1496 let window = SymbolValidityWindow::default_window(current);
1497 let future = EpochId::new(6);
1498 assert!(
1499 !window.contains(future),
1500 "bead_id={BEAD_ID} case=validity_window_rejects_future"
1501 );
1502 let result = validate_symbol_epoch(future, &window);
1503 assert!(
1504 result.is_err(),
1505 "bead_id={BEAD_ID} case=validity_window_future_epoch_error"
1506 );
1507 }
1508
1509 #[test]
1512 fn test_symbol_validity_window_accepts_past() {
1513 let current = EpochId::new(10);
1514 let window = SymbolValidityWindow::default_window(current);
1515 for past in [0, 1, 5, 9, 10] {
1516 let epoch = EpochId::new(past);
1517 assert!(
1518 window.contains(epoch),
1519 "bead_id={BEAD_ID} case=validity_window_accepts_past epoch={past}"
1520 );
1521 let result = validate_symbol_epoch(epoch, &window);
1522 assert!(
1523 result.is_ok(),
1524 "bead_id={BEAD_ID} case=validity_window_past_epoch_ok epoch={past}"
1525 );
1526 }
1527 }
1528
1529 #[test]
1532 fn test_epoch_scoped_key_derivation() {
1533 let master_key = [0xAB_u8; 32];
1534 let key_5 = derive_epoch_auth_key(&master_key, EpochId::new(5));
1535 let key_6 = derive_epoch_auth_key(&master_key, EpochId::new(6));
1536 assert_ne!(
1537 key_5, key_6,
1538 "bead_id={BEAD_ID} case=epoch_keys_differ_across_epochs"
1539 );
1540 let key_5_again = derive_epoch_auth_key(&master_key, EpochId::new(5));
1542 assert_eq!(
1543 key_5, key_5_again,
1544 "bead_id={BEAD_ID} case=epoch_key_deterministic"
1545 );
1546 }
1547
1548 #[test]
1551 fn test_epoch_key_derivation_domain_separation() {
1552 let dek = [0x42_u8; 32];
1553 let master_key = derive_master_key_from_dek(&dek);
1554 assert_ne!(
1556 master_key, dek,
1557 "bead_id={BEAD_ID} case=master_key_differs_from_dek"
1558 );
1559 let auth_key = derive_epoch_auth_key(&master_key, EpochId::ZERO);
1561 assert_ne!(
1562 auth_key.as_bytes(),
1563 &master_key,
1564 "bead_id={BEAD_ID} case=auth_key_differs_from_master"
1565 );
1566 }
1567
1568 #[test]
1571 fn test_epoch_transition_barrier_all_arrive() {
1572 let clock = EpochClock::new(EpochId::new(5));
1573 let barrier = EpochBarrier::new(EpochId::new(5), 4);
1574
1575 assert!(!barrier.arrive("WriteCoordinator"));
1576 assert!(!barrier.arrive("SymbolStore"));
1577 assert!(!barrier.arrive("Replicator"));
1578 assert!(barrier.arrive("CheckpointGc"));
1579
1580 assert!(
1581 barrier.is_complete(),
1582 "bead_id={BEAD_ID} case=barrier_complete"
1583 );
1584
1585 let outcome = barrier.resolve(&clock).expect("resolve must succeed");
1586 assert_eq!(
1587 outcome,
1588 BarrierOutcome::AllArrived {
1589 new_epoch: EpochId::new(6),
1590 },
1591 "bead_id={BEAD_ID} case=barrier_all_arrived_epoch_incremented"
1592 );
1593 assert_eq!(
1594 clock.current().get(),
1595 6,
1596 "bead_id={BEAD_ID} case=clock_advanced_after_barrier"
1597 );
1598 }
1599
1600 #[test]
1603 fn test_epoch_transition_barrier_timeout() {
1604 let clock = EpochClock::new(EpochId::new(5));
1605 let barrier = EpochBarrier::new(EpochId::new(5), 4);
1606
1607 barrier.arrive("WriteCoordinator");
1608 barrier.arrive("SymbolStore");
1609 barrier.arrive("Replicator");
1610 let outcome = barrier.resolve(&clock).expect("resolve must succeed");
1613 assert_eq!(
1614 outcome,
1615 BarrierOutcome::Timeout {
1616 arrived: 3,
1617 expected: 4,
1618 },
1619 "bead_id={BEAD_ID} case=barrier_timeout_epoch_unchanged"
1620 );
1621 assert_eq!(
1622 clock.current().get(),
1623 5,
1624 "bead_id={BEAD_ID} case=clock_unchanged_after_timeout"
1625 );
1626 }
1627
1628 #[test]
1631 fn test_epoch_bootstrap_from_ecs_root() {
1632 let root_epoch = EpochId::new(7);
1636 let window = SymbolValidityWindow::default_window(root_epoch);
1637
1638 assert!(
1640 !window.contains(EpochId::new(8)),
1641 "bead_id={BEAD_ID} case=bootstrap_rejects_future"
1642 );
1643 assert!(
1645 window.contains(EpochId::new(7)),
1646 "bead_id={BEAD_ID} case=bootstrap_accepts_current"
1647 );
1648 assert!(
1650 window.contains(EpochId::ZERO),
1651 "bead_id={BEAD_ID} case=bootstrap_accepts_zero"
1652 );
1653 }
1654
1655 #[test]
1658 fn test_barrier_cancelled() {
1659 let clock = EpochClock::new(EpochId::new(3));
1660 let barrier = EpochBarrier::new(EpochId::new(3), 2);
1661
1662 barrier.arrive("WriteCoordinator");
1663 barrier.cancel();
1664
1665 let outcome = barrier.resolve(&clock).expect("resolve must succeed");
1666 assert_eq!(
1667 outcome,
1668 BarrierOutcome::Cancelled,
1669 "bead_id={BEAD_ID} case=barrier_cancelled_epoch_unchanged"
1670 );
1671 assert_eq!(
1672 clock.current().get(),
1673 3,
1674 "bead_id={BEAD_ID} case=clock_unchanged_after_cancel"
1675 );
1676 }
1677
1678 #[test]
1681 fn test_epoch_clock_store_and_recover() {
1682 let clock = EpochClock::new(EpochId::ZERO);
1683 clock.store(EpochId::new(42));
1684 assert_eq!(
1685 clock.current().get(),
1686 42,
1687 "bead_id={BEAD_ID} case=clock_store_recovery"
1688 );
1689 let next = clock.increment().expect("increment after store");
1690 assert_eq!(
1691 next.get(),
1692 43,
1693 "bead_id={BEAD_ID} case=clock_increment_after_store"
1694 );
1695 }
1696
1697 #[test]
1700 fn test_validity_window_boundary() {
1701 let window = SymbolValidityWindow::new(EpochId::new(3), EpochId::new(7));
1702 assert!(
1703 !window.contains(EpochId::new(2)),
1704 "bead_id={BEAD_ID} case=window_below_lower_bound"
1705 );
1706 assert!(
1707 window.contains(EpochId::new(3)),
1708 "bead_id={BEAD_ID} case=window_at_lower_bound"
1709 );
1710 assert!(
1711 window.contains(EpochId::new(5)),
1712 "bead_id={BEAD_ID} case=window_within_bounds"
1713 );
1714 assert!(
1715 window.contains(EpochId::new(7)),
1716 "bead_id={BEAD_ID} case=window_at_upper_bound"
1717 );
1718 assert!(
1719 !window.contains(EpochId::new(8)),
1720 "bead_id={BEAD_ID} case=window_above_upper_bound"
1721 );
1722 }
1723
1724 const ROOT_BEAD_ID: &str = "bd-1hi.25";
1725
1726 fn make_object_id(seed: u8) -> ObjectId {
1727 ObjectId::from_bytes([seed; 16])
1728 }
1729
1730 fn test_master_key() -> [u8; 32] {
1731 [0xA5; 32]
1732 }
1733
1734 fn create_layout() -> (TempDir, NativeBootstrapLayout) {
1735 let temp_dir = TempDir::new().expect("tempdir");
1736 let layout = NativeBootstrapLayout::new(temp_dir.path().join("ecs"));
1737 std::fs::create_dir_all(layout.symbols_dir()).expect("create symbols dir");
1738 std::fs::create_dir_all(layout.markers_dir()).expect("create markers dir");
1739 (temp_dir, layout)
1740 }
1741
1742 fn write_single_symbol_object(
1743 symbols_dir: &Path,
1744 segment_id: u64,
1745 epoch_id: EpochId,
1746 object_id: ObjectId,
1747 payload: &[u8],
1748 ) {
1749 let header =
1750 SymbolSegmentHeader::new(segment_id, epoch_id.get(), 1_700_000_000 + segment_id);
1751 let segment_path = symbols_dir.join(format!("segment-{segment_id:06}.log"));
1752 ensure_symbol_segment(&segment_path, header).expect("ensure symbol segment");
1753 let symbol_size = u32::try_from(payload.len()).expect("payload fits u32");
1754 let oti = Oti {
1755 f: u64::from(symbol_size),
1756 al: 1,
1757 t: symbol_size,
1758 z: 1,
1759 n: 1,
1760 };
1761 let record = SymbolRecord::new(
1762 object_id,
1763 oti,
1764 0,
1765 payload.to_vec(),
1766 SymbolRecordFlags::SYSTEMATIC_RUN_START,
1767 );
1768 append_symbol_record(symbols_dir, header, &record).expect("append symbol");
1769 }
1770
1771 fn write_marker_segment(
1772 markers_dir: &Path,
1773 start_commit_seq: u64,
1774 records: &[CommitMarkerRecord],
1775 ) {
1776 let segment_id = segment_id_for_commit_seq(start_commit_seq);
1777 let header = MarkerSegmentHeader::new(segment_id, start_commit_seq);
1778 let mut bytes = Vec::from(header.encode());
1779 for record in records {
1780 bytes.extend_from_slice(&record.encode());
1781 }
1782 let segment_path = markers_dir.join(format!("segment-{segment_id:06}.log"));
1783 std::fs::write(segment_path, bytes).expect("write marker segment");
1784 }
1785
1786 fn make_marker(commit_seq: u64, prev: [u8; 16], salt: u8) -> CommitMarkerRecord {
1787 CommitMarkerRecord::new(
1788 commit_seq,
1789 1_800_000_000_000_000_000 + commit_seq,
1790 [salt; 16],
1791 [salt.wrapping_add(1); 16],
1792 prev,
1793 )
1794 }
1795
1796 fn make_manifest(
1797 database_name: &str,
1798 current_commit: ObjectId,
1799 commit_seq: u64,
1800 schema_snapshot: ObjectId,
1801 schema_epoch: u64,
1802 ecs_epoch: EpochId,
1803 checkpoint_base: ObjectId,
1804 ) -> RootManifest {
1805 RootManifest {
1806 database_name: database_name.to_owned(),
1807 current_commit,
1808 commit_seq,
1809 schema_snapshot,
1810 schema_epoch,
1811 ecs_epoch,
1812 checkpoint_base,
1813 gc_horizon: commit_seq,
1814 created_at: 1_800_000_000,
1815 updated_at: 1_800_000_123,
1816 }
1817 }
1818
1819 fn must_err_contains<T: std::fmt::Debug>(result: Result<T>, needle: &str, case: &str) {
1820 let err = result.expect_err(case);
1821 let detail = err.to_string();
1822 assert!(
1823 detail.contains(needle),
1824 "bead_id={ROOT_BEAD_ID} case={case} expected_substring={needle} actual={detail}"
1825 );
1826 }
1827
1828 fn write_bootstrap_objects(
1829 layout: &NativeBootstrapLayout,
1830 root_epoch: EpochId,
1831 manifest_id: ObjectId,
1832 manifest: &RootManifest,
1833 schema_payload: &[u8],
1834 checkpoint_payload: &[u8],
1835 markers: &[CommitMarkerRecord],
1836 ) {
1837 let manifest_bytes = manifest.encode().expect("encode manifest");
1838 write_single_symbol_object(
1839 layout.symbols_dir().as_path(),
1840 1,
1841 root_epoch,
1842 manifest_id,
1843 &manifest_bytes,
1844 );
1845 write_single_symbol_object(
1846 layout.symbols_dir().as_path(),
1847 1,
1848 root_epoch,
1849 manifest.schema_snapshot,
1850 schema_payload,
1851 );
1852 write_single_symbol_object(
1853 layout.symbols_dir().as_path(),
1854 1,
1855 root_epoch,
1856 manifest.checkpoint_base,
1857 checkpoint_payload,
1858 );
1859 if let Some(first) = markers.first() {
1860 write_marker_segment(layout.markers_dir().as_path(), first.commit_seq, markers);
1861 }
1862 }
1863
1864 fn write_valid_bootstrap_fixture(
1865 layout: &NativeBootstrapLayout,
1866 root_epoch: EpochId,
1867 ) -> (ObjectId, RootManifest, CommitMarkerRecord, Vec<u8>, Vec<u8>) {
1868 let manifest_id = make_object_id(0x70);
1869 let schema_payload = b"schema-cache-v1".to_vec();
1870 let checkpoint_payload = b"checkpoint-cache-v1".to_vec();
1871 let marker = make_marker(0, [0_u8; 16], 0x71);
1872 let manifest = make_manifest(
1873 "db-valid",
1874 ObjectId::from_bytes(marker.marker_id),
1875 marker.commit_seq,
1876 make_object_id(0x72),
1877 1,
1878 root_epoch,
1879 make_object_id(0x73),
1880 );
1881 write_bootstrap_objects(
1882 layout,
1883 root_epoch,
1884 manifest_id,
1885 &manifest,
1886 &schema_payload,
1887 &checkpoint_payload,
1888 std::slice::from_ref(&marker),
1889 );
1890 (
1891 manifest_id,
1892 manifest,
1893 marker,
1894 schema_payload,
1895 checkpoint_payload,
1896 )
1897 }
1898
1899 #[test]
1900 fn test_ecs_root_pointer_encode_decode() {
1901 let pointer = EcsRootPointer::unauthed(make_object_id(0x11), EpochId::new(7));
1902 let encoded = pointer.encode();
1903 assert_eq!(encoded.len(), ECS_ROOT_POINTER_BYTES);
1904 let decoded = EcsRootPointer::decode(&encoded, false, None).expect("decode root pointer");
1905 assert_eq!(
1906 decoded, pointer,
1907 "bead_id={ROOT_BEAD_ID} case=root_roundtrip"
1908 );
1909 }
1910
1911 #[test]
1912 fn test_ecs_root_pointer_magic() {
1913 let pointer = EcsRootPointer::unauthed(make_object_id(0x22), EpochId::new(3));
1914 let mut encoded = pointer.encode();
1915 encoded[0] = b'X';
1916 let result = EcsRootPointer::decode(&encoded, false, None);
1917 assert!(
1918 result.is_err(),
1919 "bead_id={ROOT_BEAD_ID} case=root_bad_magic"
1920 );
1921 }
1922
1923 #[test]
1924 fn test_ecs_root_pointer_checksum_tamper() {
1925 let pointer = EcsRootPointer::unauthed(make_object_id(0x33), EpochId::new(9));
1926 let mut encoded = pointer.encode();
1927 encoded[9] ^= 0xFF;
1928 let result = EcsRootPointer::decode(&encoded, false, None);
1929 assert!(
1930 result.is_err(),
1931 "bead_id={ROOT_BEAD_ID} case=root_checksum_tamper"
1932 );
1933 }
1934
1935 #[test]
1936 fn test_root_auth_tag_verification() {
1937 let key = test_master_key();
1938 let pointer = EcsRootPointer::authed(make_object_id(0x44), EpochId::new(12), &key);
1939 let encoded = pointer.encode();
1940 let decoded =
1941 EcsRootPointer::decode(&encoded, true, Some(&key)).expect("auth decode succeeds");
1942 assert_eq!(decoded, pointer);
1943
1944 let mut tampered = encoded;
1945 tampered[40] ^= 0x01;
1946 let result = EcsRootPointer::decode(&tampered, true, Some(&key));
1947 assert!(
1948 result.is_err(),
1949 "bead_id={ROOT_BEAD_ID} case=root_auth_tamper"
1950 );
1951 }
1952
1953 #[test]
1954 fn test_root_auth_tag_zero_when_off() {
1955 let pointer = build_root_pointer(make_object_id(0x55), EpochId::new(2), false, None)
1956 .expect("build unauthed root");
1957 assert_eq!(pointer.root_auth_tag, [0_u8; 16]);
1958 let decoded =
1959 EcsRootPointer::decode(&pointer.encode(), false, None).expect("decode off mode");
1960 assert_eq!(decoded.root_auth_tag, [0_u8; 16]);
1961 }
1962
1963 #[test]
1964 fn test_root_manifest_encode_decode() {
1965 let manifest = make_manifest(
1966 "db-main",
1967 make_object_id(0x10),
1968 5,
1969 make_object_id(0x20),
1970 3,
1971 EpochId::new(7),
1972 make_object_id(0x30),
1973 );
1974 let encoded = manifest.encode().expect("encode manifest");
1975 let decoded = RootManifest::decode(&encoded).expect("decode manifest");
1976 assert_eq!(decoded, manifest);
1977 }
1978
1979 #[test]
1980 fn test_root_manifest_magic() {
1981 let manifest = make_manifest(
1982 "db-main",
1983 make_object_id(0x10),
1984 5,
1985 make_object_id(0x20),
1986 3,
1987 EpochId::new(7),
1988 make_object_id(0x30),
1989 );
1990 let mut encoded = manifest.encode().expect("encode manifest");
1991 encoded[0] = b'X';
1992 assert!(
1993 RootManifest::decode(&encoded).is_err(),
1994 "bead_id={ROOT_BEAD_ID} case=manifest_bad_magic"
1995 );
1996 }
1997
1998 #[test]
1999 fn test_bootstrap_step_4_epoch_guard() {
2000 let (_tmp, layout) = create_layout();
2001 let manifest_id = make_object_id(0x66);
2002 let schema_id = make_object_id(0x67);
2003 let checkpoint_id = make_object_id(0x68);
2004 let marker = make_marker(0, [0_u8; 16], 0x60);
2005 let manifest = make_manifest(
2006 "future-segment",
2007 ObjectId::from_bytes(marker.marker_id),
2008 marker.commit_seq,
2009 schema_id,
2010 1,
2011 EpochId::new(3),
2012 checkpoint_id,
2013 );
2014 let manifest_bytes = manifest.encode().expect("encode manifest");
2015 write_single_symbol_object(
2016 layout.symbols_dir().as_path(),
2017 1,
2018 EpochId::new(4),
2019 manifest_id,
2020 &manifest_bytes,
2021 );
2022 let pointer =
2023 build_root_pointer(manifest_id, EpochId::new(3), false, None).expect("build root");
2024 write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2025
2026 let result = bootstrap_native_mode(&layout, false, None);
2027 assert!(
2028 result.is_err(),
2029 "bead_id={ROOT_BEAD_ID} case=future_epoch_guard"
2030 );
2031 }
2032
2033 #[test]
2034 fn test_bootstrap_step_5_epoch_invariant() {
2035 let (_tmp, layout) = create_layout();
2036 let root_epoch = EpochId::new(7);
2037 let manifest_id = make_object_id(0x74);
2038 let marker = make_marker(0, [0_u8; 16], 0x75);
2039 let manifest = make_manifest(
2040 "epoch-mismatch",
2041 ObjectId::from_bytes(marker.marker_id),
2042 marker.commit_seq,
2043 make_object_id(0x76),
2044 1,
2045 EpochId::new(8),
2046 make_object_id(0x77),
2047 );
2048 write_bootstrap_objects(
2049 &layout,
2050 root_epoch,
2051 manifest_id,
2052 &manifest,
2053 b"schema",
2054 b"checkpoint",
2055 &[marker],
2056 );
2057 let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2058 write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2059 must_err_contains(
2060 bootstrap_native_mode(&layout, false, None),
2061 "epoch_mismatch",
2062 "bootstrap_step_5_epoch_invariant",
2063 );
2064 }
2065
2066 #[test]
2067 fn test_bootstrap_step_6_marker_verification() {
2068 let (_tmp, layout) = create_layout();
2069 let root_epoch = EpochId::new(9);
2070 let manifest_id = make_object_id(0x78);
2071 let marker = make_marker(0, [0_u8; 16], 0x79);
2072 let manifest = make_manifest(
2073 "marker-mismatch",
2074 make_object_id(0x7A),
2075 marker.commit_seq,
2076 make_object_id(0x7B),
2077 1,
2078 root_epoch,
2079 make_object_id(0x7C),
2080 );
2081 write_bootstrap_objects(
2082 &layout,
2083 root_epoch,
2084 manifest_id,
2085 &manifest,
2086 b"schema",
2087 b"checkpoint",
2088 &[marker],
2089 );
2090 let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2091 write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2092 must_err_contains(
2093 bootstrap_native_mode(&layout, false, None),
2094 "marker_mismatch",
2095 "bootstrap_step_6_marker_verification",
2096 );
2097 }
2098
2099 #[test]
2100 fn test_bootstrap_full_sequence() {
2101 let (_tmp, layout) = create_layout();
2102 let root_epoch = EpochId::new(11);
2103 let (manifest_id, manifest, marker, schema_payload, checkpoint_payload) =
2104 write_valid_bootstrap_fixture(&layout, root_epoch);
2105 let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2106 write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2107
2108 let state = bootstrap_native_mode(&layout, false, None).expect("bootstrap ok");
2109 assert_eq!(state.root_pointer, pointer);
2110 assert_eq!(state.manifest, manifest);
2111 assert_eq!(state.latest_marker, marker);
2112 assert_eq!(state.schema_snapshot_bytes, schema_payload);
2113 assert_eq!(state.checkpoint_base_bytes, checkpoint_payload);
2114 }
2115
2116 #[test]
2117 fn test_bootstrap_corrupted_root_recovery() {
2118 let (_tmp, layout) = create_layout();
2119 let root_epoch = EpochId::new(12);
2120 let (_manifest_id, manifest, marker, schema_payload, checkpoint_payload) =
2121 write_valid_bootstrap_fixture(&layout, root_epoch);
2122 fs::write(layout.root_path(), [0xFF_u8; 7]).expect("write corrupt root");
2123
2124 let recovered = bootstrap_native_mode_with_recovery(&layout, false, None).expect("recover");
2125 assert_eq!(recovered.manifest, manifest);
2126 assert_eq!(recovered.latest_marker, marker);
2127 assert_eq!(recovered.schema_snapshot_bytes, schema_payload);
2128 assert_eq!(recovered.checkpoint_base_bytes, checkpoint_payload);
2129 let persisted =
2130 read_root_pointer(&layout.root_path(), false, None).expect("read recovered");
2131 assert_eq!(
2132 persisted.manifest_object_id,
2133 recovered.root_pointer.manifest_object_id
2134 );
2135 assert_eq!(persisted.ecs_epoch, recovered.root_pointer.ecs_epoch);
2136 }
2137
2138 #[test]
2139 fn test_crash_safe_root_update() {
2140 let (_tmp, layout) = create_layout();
2141 let pointer_a = EcsRootPointer::unauthed(make_object_id(0x80), EpochId::new(1));
2142 let pointer_b = EcsRootPointer::unauthed(make_object_id(0x81), EpochId::new(2));
2143 write_root_pointer_atomic(&layout.root_path(), pointer_a).expect("write A");
2144 write_root_pointer_atomic(&layout.root_path(), pointer_b).expect("write B");
2145 let decoded = read_root_pointer(&layout.root_path(), false, None).expect("decode");
2146 assert_eq!(
2147 decoded, pointer_b,
2148 "bead_id={ROOT_BEAD_ID} case=root_atomic_swap"
2149 );
2150 let entries = fs::read_dir(layout.ecs_dir.as_path()).expect("list ecs dir");
2151 for entry in entries {
2152 let entry = entry.expect("entry");
2153 let name = entry.file_name();
2154 let name = name.to_string_lossy();
2155 assert!(
2156 !name.starts_with(".root.tmp."),
2157 "bead_id={ROOT_BEAD_ID} case=temp_root_file_leaked file={name}"
2158 );
2159 }
2160 }
2161
2162 #[test]
2163 fn prop_root_pointer_roundtrip() {
2164 let key = test_master_key();
2165 for seed in [0_u8, 1, 17, 99, 255] {
2166 for epoch in [0_u64, 1, 2, 17, 255, 4_096, 1 << 20] {
2167 let id = make_object_id(seed);
2168 let plain = EcsRootPointer::unauthed(id, EpochId::new(epoch));
2169 let plain_roundtrip =
2170 EcsRootPointer::decode(&plain.encode(), false, None).expect("plain decode");
2171 assert_eq!(plain_roundtrip, plain);
2172 let authed = EcsRootPointer::authed(id, EpochId::new(epoch), &key);
2173 let authed_roundtrip = EcsRootPointer::decode(&authed.encode(), true, Some(&key))
2174 .expect("auth decode");
2175 assert_eq!(authed_roundtrip, authed);
2176 }
2177 }
2178 }
2179
2180 #[test]
2181 fn test_bootstrap_rejects_marker_chain_gap() {
2182 let (_tmp, layout) = create_layout();
2183 let root_epoch = EpochId::new(13);
2184 let manifest_id = make_object_id(0x82);
2185 let m0 = make_marker(0, [0_u8; 16], 0x83);
2186 let m1 = make_marker(1, m0.marker_id, 0x84);
2187 let m2 = make_marker(2, [0xEE_u8; 16], 0x85);
2188 let manifest = make_manifest(
2189 "marker-gap",
2190 ObjectId::from_bytes(m2.marker_id),
2191 m2.commit_seq,
2192 make_object_id(0x86),
2193 1,
2194 root_epoch,
2195 make_object_id(0x87),
2196 );
2197 write_bootstrap_objects(
2198 &layout,
2199 root_epoch,
2200 manifest_id,
2201 &manifest,
2202 b"schema-gap",
2203 b"checkpoint-gap",
2204 &[m0, m1, m2],
2205 );
2206 let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2207 write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2208 must_err_contains(
2209 bootstrap_native_mode(&layout, false, None),
2210 "marker_chain_gap",
2211 "bootstrap_rejects_marker_chain_gap",
2212 );
2213 }
2214
2215 #[test]
2216 fn test_bootstrap_schema_snapshot_loads() {
2217 let (_tmp, layout) = create_layout();
2218 let root_epoch = EpochId::new(14);
2219 let (manifest_id, _manifest, _marker, schema_payload, _checkpoint_payload) =
2220 write_valid_bootstrap_fixture(&layout, root_epoch);
2221 let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2222 write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2223 let state = bootstrap_native_mode(&layout, false, None).expect("bootstrap");
2224 assert_eq!(state.schema_snapshot_bytes, schema_payload);
2225 }
2226
2227 #[test]
2228 fn test_bootstrap_checkpoint_base_warms_cache() {
2229 let (_tmp, layout) = create_layout();
2230 let root_epoch = EpochId::new(15);
2231 let (manifest_id, _manifest, _marker, _schema_payload, checkpoint_payload) =
2232 write_valid_bootstrap_fixture(&layout, root_epoch);
2233 let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2234 write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2235 let state = bootstrap_native_mode(&layout, false, None).expect("bootstrap");
2236 assert_eq!(state.checkpoint_base_bytes, checkpoint_payload);
2237 }
2238
2239 #[test]
2240 fn test_bootstrap_happy_path_from_root() {
2241 test_bootstrap_full_sequence();
2242 }
2243
2244 #[test]
2245 fn test_bootstrap_corrupt_root_pointer_recovers_by_scan() {
2246 test_bootstrap_corrupted_root_recovery();
2247 }
2248
2249 #[test]
2250 fn test_bootstrap_root_auth_mismatch_fails() {
2251 let (_tmp, layout) = create_layout();
2252 let root_epoch = EpochId::new(16);
2253 let (manifest_id, _manifest, _marker, _schema_payload, _checkpoint_payload) =
2254 write_valid_bootstrap_fixture(&layout, root_epoch);
2255 let good_key = test_master_key();
2256 let bad_key = [0x5A_u8; 32];
2257 let pointer =
2258 build_root_pointer(manifest_id, root_epoch, true, Some(&good_key)).expect("root");
2259 write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2260 must_err_contains(
2261 bootstrap_native_mode(&layout, true, Some(&bad_key)),
2262 "auth_failed",
2263 "bootstrap_root_auth_mismatch_fails",
2264 );
2265 }
2266
2267 #[test]
2268 fn test_bootstrap_root_pointer_corrupt_checksum_fails_then_scan() {
2269 let (_tmp, layout) = create_layout();
2270 let root_epoch = EpochId::new(17);
2271 let (manifest_id, _manifest, _marker, _schema_payload, _checkpoint_payload) =
2272 write_valid_bootstrap_fixture(&layout, root_epoch);
2273 let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2274 let mut root_bytes = pointer.encode();
2275 root_bytes[10] ^= 0xAA;
2276 fs::write(layout.root_path(), root_bytes).expect("write corrupt root");
2277 let state = bootstrap_native_mode_with_recovery(&layout, false, None).expect("recovered");
2278 assert_eq!(state.root_pointer.manifest_object_id, manifest_id);
2279 assert_eq!(state.root_pointer.ecs_epoch, root_epoch);
2280 }
2281
2282 #[test]
2283 fn test_e2e_native_mode_open_close_reopen() {
2284 let (_tmp, layout) = create_layout();
2285 let root_epoch = EpochId::new(18);
2286 let (manifest_id, manifest, marker, _schema_payload, _checkpoint_payload) =
2287 write_valid_bootstrap_fixture(&layout, root_epoch);
2288 let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2289 write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2290
2291 let first_open = bootstrap_native_mode(&layout, false, None).expect("first open");
2292 let second_open = bootstrap_native_mode(&layout, false, None).expect("second open");
2293 assert_eq!(first_open.manifest, manifest);
2294 assert_eq!(first_open.latest_marker, marker);
2295 assert_eq!(
2296 second_open.manifest.current_commit,
2297 first_open.manifest.current_commit
2298 );
2299 assert_eq!(second_open.root_pointer, first_open.root_pointer);
2300
2301 fs::write(layout.root_path(), [0_u8; 9]).expect("corrupt root");
2302 let recovered = bootstrap_native_mode_with_recovery(&layout, false, None).expect("reopen");
2303 assert_eq!(
2304 recovered.manifest.current_commit,
2305 first_open.manifest.current_commit
2306 );
2307 assert_eq!(
2308 recovered.manifest.commit_seq,
2309 first_open.manifest.commit_seq
2310 );
2311 }
2312
2313 #[test]
2314 fn test_e2e_bootstrap_cold_start() {
2315 test_bootstrap_full_sequence();
2316 }
2317
2318 #[test]
2319 fn test_e2e_bootstrap_after_crash() {
2320 test_bootstrap_root_pointer_corrupt_checksum_fails_then_scan();
2321 }
2322
2323 #[test]
2324 fn test_e2e_bootstrap_schema_migration() {
2325 let (_tmp, layout) = create_layout();
2326 let root_epoch = EpochId::new(19);
2327 let manifest_id_v1 = make_object_id(0x88);
2328 let marker0 = make_marker(0, [0_u8; 16], 0x89);
2329 let marker1 = make_marker(1, marker0.marker_id, 0x8A);
2330 write_marker_segment(
2331 layout.markers_dir().as_path(),
2332 0,
2333 &[marker0, marker1.clone()],
2334 );
2335
2336 let schema_v1 = make_object_id(0x8B);
2337 let schema_v2 = make_object_id(0x8C);
2338 let checkpoint_id = make_object_id(0x8D);
2339 let manifest_v1 = make_manifest(
2340 "schema-v1",
2341 ObjectId::from_bytes(marker1.marker_id),
2342 1,
2343 schema_v1,
2344 1,
2345 root_epoch,
2346 checkpoint_id,
2347 );
2348 let manifest_v2 = make_manifest(
2349 "schema-v2",
2350 ObjectId::from_bytes(marker1.marker_id),
2351 1,
2352 schema_v2,
2353 2,
2354 root_epoch,
2355 checkpoint_id,
2356 );
2357 let manifest_id_v2 = make_object_id(0x8E);
2358 write_single_symbol_object(
2359 layout.symbols_dir().as_path(),
2360 1,
2361 root_epoch,
2362 manifest_id_v1,
2363 &manifest_v1.encode().expect("manifest v1"),
2364 );
2365 write_single_symbol_object(
2366 layout.symbols_dir().as_path(),
2367 1,
2368 root_epoch,
2369 manifest_id_v2,
2370 &manifest_v2.encode().expect("manifest v2"),
2371 );
2372 write_single_symbol_object(
2373 layout.symbols_dir().as_path(),
2374 1,
2375 root_epoch,
2376 schema_v1,
2377 b"schema-v1",
2378 );
2379 write_single_symbol_object(
2380 layout.symbols_dir().as_path(),
2381 1,
2382 root_epoch,
2383 schema_v2,
2384 b"schema-v2",
2385 );
2386 write_single_symbol_object(
2387 layout.symbols_dir().as_path(),
2388 1,
2389 root_epoch,
2390 checkpoint_id,
2391 b"checkpoint",
2392 );
2393
2394 let pointer_v2 = build_root_pointer(manifest_id_v2, root_epoch, false, None).expect("root");
2395 write_root_pointer_atomic(&layout.root_path(), pointer_v2).expect("write root");
2396 let state = bootstrap_native_mode(&layout, false, None).expect("bootstrap");
2397 assert_eq!(state.manifest.schema_epoch, 2);
2398 assert_eq!(state.schema_snapshot_bytes, b"schema-v2".to_vec());
2399 }
2400
2401 #[test]
2402 fn test_ecs_root_pointer_checksum_roundtrip() {
2403 test_ecs_root_pointer_encode_decode();
2404 }
2405
2406 #[test]
2407 fn test_ecs_root_pointer_auth_tag_verifies() {
2408 test_root_auth_tag_verification();
2409 }
2410
2411 #[test]
2412 fn test_bootstrap_future_epoch_guard() {
2413 test_bootstrap_step_4_epoch_guard();
2414 }
2415
2416 #[test]
2417 fn test_root_manifest_epoch_must_match_root_pointer() {
2418 test_bootstrap_step_5_epoch_invariant();
2419 }
2420
2421 #[test]
2422 fn test_bootstrap_commit_marker_matches_current_commit() {
2423 test_bootstrap_step_6_marker_verification();
2424 }
2425
2426 #[test]
2427 fn test_bd_1hi_25_unit_compliance_gate() {
2428 assert_eq!(ROOT_BOOTSTRAP_BEAD_ID, "bd-1hi.25");
2429 assert_eq!(ROOT_BOOTSTRAP_LOGGING_STANDARD, "bd-1fpm");
2430 assert_eq!(ECS_ROOT_POINTER_MAGIC, *b"FSRT");
2431 assert_eq!(ROOT_MANIFEST_MAGIC, *b"FSQLROOT");
2432 }
2433
2434 #[test]
2435 fn prop_bd_1hi_25_structure_compliance() {
2436 for name in ["segment-000001.log", "segment-999999.log"] {
2437 assert!(
2438 parse_segment_id(name).is_some(),
2439 "bead_id={ROOT_BEAD_ID} case=parse_segment_id_valid name={name}"
2440 );
2441 }
2442 for name in ["segment.log", "segment-aa.log", "other-000001.log"] {
2443 assert!(
2444 parse_segment_id(name).is_none(),
2445 "bead_id={ROOT_BEAD_ID} case=parse_segment_id_invalid name={name}"
2446 );
2447 }
2448 }
2449
2450 #[test]
2451 fn test_e2e_bd_1hi_25_compliance() {
2452 test_e2e_native_mode_open_close_reopen();
2453 }
2454}