Skip to main content

fsqlite_core/
epoch.rs

1//! Epoch clock, key derivation, and epoch transition barrier (§4.18, bd-3go.12).
2//!
3//! `ecs_epoch` is a monotone `u64` stored durably in [`RootManifest::ecs_epoch`]
4//! and mirrored in `SharedMemoryLayout.ecs_epoch`. Epochs MUST NOT be reused.
5//!
6//! The [`EpochClock`] provides the in-process monotone counter. The
7//! [`EpochBarrier`] coordinates all-or-nothing epoch transitions across
8//! participants (WriteCoordinator, SymbolStore, Replicator, CheckpointGc).
9
10use std::collections::BTreeMap;
11use std::fs;
12use std::io::Write;
13use std::path::{Path, PathBuf};
14use std::sync::atomic::{AtomicU64, Ordering};
15
16use crate::commit_marker::{
17    CommitMarkerRecord, MARKER_SEGMENT_HEADER_BYTES, MarkerSegmentHeader, recover_valid_prefix,
18    segment_id_for_commit_seq,
19};
20use crate::symbol_log::scan_symbol_segment;
21use fsqlite_error::{FrankenError, Result};
22use fsqlite_types::{EpochId, ObjectId, SymbolRecord};
23use tracing::{debug, error, info, warn};
24
25// ── Domain separation constants (§4.18.2) ──────────────────────────────
26
/// Domain separator for deriving the symbol auth master key from a DEK.
///
/// `master_key = BLAKE3_KEYED(DEK, "fsqlite:symbol-auth-master:v1")`
///
/// Consumed by [`derive_master_key_from_dek`]. The `:v1` suffix is part of the
/// hashed input, so changing this string changes every derived master key.
const MASTER_KEY_DOMAIN: &[u8] = b"fsqlite:symbol-auth-master:v1";

/// Domain separator for deriving per-epoch auth keys.
///
/// `K_epoch = BLAKE3_KEYED(master_key, "fsqlite:symbol-auth:epoch:v1" || le_u64(ecs_epoch))`
///
/// Consumed by [`derive_epoch_auth_key`], which appends the little-endian
/// epoch after this prefix.
const EPOCH_KEY_DOMAIN: &[u8] = b"fsqlite:symbol-auth:epoch:v1";

/// Domain separator for `ecs/root` authentication tags (§3.5.5).
///
/// Hashed ahead of the pointer bytes in [`compute_root_pointer_auth_tag`].
const ROOT_POINTER_AUTH_DOMAIN: &[u8] = b"fsqlite:ecs-root-auth:v1";

/// Logging bead id for RootManifest bootstrap work.
///
/// Emitted as the `bead_id` field of the bootstrap-related log events.
const ROOT_BOOTSTRAP_BEAD_ID: &str = "bd-1hi.25";
/// Structured logging standard bead reference.
///
/// Emitted as the `logging_standard` field of the bootstrap-related log events.
const ROOT_BOOTSTRAP_LOGGING_STANDARD: &str = "bd-1fpm";
/// Process-local suffix counter for `ecs/root` temp file names.
///
/// [`write_root_pointer_atomic`] combines the process id with the next value
/// of this counter to build a `.root.tmp.{pid}.{suffix}` file name, so two
/// writes from the same process never pick the same temp name.
static ROOT_TMP_SUFFIX_COUNTER: AtomicU64 = AtomicU64::new(0);
46
47// ── EpochClock ─────────────────────────────────────────────────────────
48
/// In-process monotone epoch counter (§4.18).
///
/// Wraps an `AtomicU64` for lock-free reads. Increments are serialized by
/// the coordinator (only one caller should call [`EpochClock::increment`] at a time).
///
/// NOTE(review): [`EpochClock::store`] overwrites the counter unconditionally,
/// so monotonicity across `store` calls is the caller's responsibility.
#[derive(Debug)]
pub struct EpochClock {
    /// Raw epoch value. Loaded with `Acquire`; written with `Release`
    /// (`store`) or an `AcqRel` compare-exchange (`increment`).
    current: AtomicU64,
}
57
58impl EpochClock {
59    /// Create a new clock initialised at the given epoch.
60    #[must_use]
61    pub fn new(initial: EpochId) -> Self {
62        Self {
63            current: AtomicU64::new(initial.get()),
64        }
65    }
66
67    /// Read the current epoch (Acquire ordering).
68    #[must_use]
69    pub fn current(&self) -> EpochId {
70        EpochId::new(self.current.load(Ordering::Acquire))
71    }
72
73    /// Atomically increment the epoch counter by one.
74    ///
75    /// Returns the *new* epoch on success, or an error if the counter
76    /// would overflow (saturated at `u64::MAX`).
77    ///
78    /// # Errors
79    ///
80    /// Returns [`FrankenError::OutOfRange`] if the epoch has reached `u64::MAX`.
81    pub fn increment(&self) -> Result<EpochId> {
82        loop {
83            let old = self.current.load(Ordering::Acquire);
84            let new = old.checked_add(1).ok_or_else(|| {
85                error!(
86                    bead_id = "bd-3go.12",
87                    old_epoch = old,
88                    "epoch counter overflow — cannot increment past u64::MAX"
89                );
90                FrankenError::OutOfRange {
91                    what: "ecs_epoch".to_owned(),
92                    value: old.to_string(),
93                }
94            })?;
95            if self
96                .current
97                .compare_exchange_weak(old, new, Ordering::AcqRel, Ordering::Acquire)
98                .is_ok()
99            {
100                info!(
101                    bead_id = "bd-3go.12",
102                    old_epoch = old,
103                    new_epoch = new,
104                    "epoch incremented"
105                );
106                return Ok(EpochId::new(new));
107            }
108        }
109    }
110
111    /// Store a specific epoch value (Release ordering).
112    ///
113    /// Used during bootstrap or recovery to set the epoch from a persisted
114    /// `RootManifest.ecs_epoch`.
115    pub fn store(&self, epoch: EpochId) {
116        self.current.store(epoch.get(), Ordering::Release);
117    }
118}
119
120// ── Epoch-scoped key derivation (§4.18.2) ──────────────────────────────
121
/// A 32-byte epoch-scoped symbol authentication key.
///
/// `Debug` is implemented by hand and never prints the key bytes, so the key
/// cannot leak through `{:?}` formatting or structured-log fields. Use
/// [`EpochAuthKey::as_bytes`] when the raw material is genuinely required.
#[derive(Clone, PartialEq, Eq)]
pub struct EpochAuthKey([u8; 32]);

impl std::fmt::Debug for EpochAuthKey {
    /// Render a fixed placeholder instead of the secret key bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("EpochAuthKey(<redacted>)")
    }
}

impl EpochAuthKey {
    /// View the raw key bytes.
    ///
    /// The returned slice is secret key material; do not log it.
    #[must_use]
    pub fn as_bytes(&self) -> &[u8; 32] {
        &self.0
    }
}
133
134/// Derive the symbol auth master key from a DEK with domain separation (§4.18.2).
135///
136/// `master_key = BLAKE3_KEYED(DEK, "fsqlite:symbol-auth-master:v1")`
137///
138/// The DEK must be exactly 32 bytes (XChaCha20-Poly1305 key size).
139pub fn derive_master_key_from_dek(dek: &[u8; 32]) -> [u8; 32] {
140    let keyed_hasher = blake3::Hasher::new_keyed(dek);
141    let mut hasher = keyed_hasher;
142    hasher.update(MASTER_KEY_DOMAIN);
143    let hash = hasher.finalize();
144    debug!(
145        bead_id = "bd-3go.12",
146        domain = std::str::from_utf8(MASTER_KEY_DOMAIN).unwrap_or("<invalid>"),
147        "derived master key from DEK with domain separation"
148    );
149    *hash.as_bytes()
150}
151
152/// Derive an epoch-scoped authentication key from a master key (§4.18.2).
153///
154/// `K_epoch = BLAKE3_KEYED(master_key, "fsqlite:symbol-auth:epoch:v1" || le_u64(ecs_epoch))`
155///
156/// Deterministic: same `(master_key, epoch)` always produces the same key.
157#[must_use]
158pub fn derive_epoch_auth_key(master_key: &[u8; 32], epoch: EpochId) -> EpochAuthKey {
159    let mut hasher = blake3::Hasher::new_keyed(master_key);
160    hasher.update(EPOCH_KEY_DOMAIN);
161    hasher.update(&epoch.get().to_le_bytes());
162    let hash = hasher.finalize();
163    debug!(
164        bead_id = "bd-3go.12",
165        epoch = epoch.get(),
166        domain = std::str::from_utf8(EPOCH_KEY_DOMAIN).unwrap_or("<invalid>"),
167        "derived epoch auth key (NOT logging key material)"
168    );
169    EpochAuthKey(*hash.as_bytes())
170}
171
172// ── EpochBarrier (§4.18.4) ──────────────────────────────────────────────
173
/// Outcome of an epoch barrier attempt.
///
/// Produced by [`EpochBarrier::resolve`]. Only [`BarrierOutcome::AllArrived`]
/// means the epoch clock was advanced.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BarrierOutcome {
    /// All participants arrived; epoch was incremented.
    AllArrived {
        /// The new epoch after increment.
        new_epoch: EpochId,
    },
    /// The barrier timed out before all participants arrived.
    ///
    /// `resolve` does not measure time itself: it reports this variant
    /// whenever it is called while fewer than `expected` participants have
    /// arrived.
    Timeout {
        /// How many participants arrived before timeout.
        arrived: usize,
        /// Total expected participants.
        expected: usize,
    },
    /// The barrier was explicitly cancelled.
    Cancelled,
}
192
/// Epoch transition barrier (§4.18.4).
///
/// Coordinates quiescence across all correctness-critical services before
/// incrementing the epoch. The barrier is all-or-nothing: either all
/// participants arrive, or the epoch does not advance.
///
/// Participants: WriteCoordinator, SymbolStore, Replicator, CheckpointGc.
///
/// The barrier only counts arrivals; it does not track participant identity.
#[derive(Debug)]
pub struct EpochBarrier {
    /// The epoch being transitioned *from*.
    current_epoch: EpochId,
    /// Total expected participants.
    expected: usize,
    /// Number of participants that have arrived.
    ///
    /// Incremented once per non-cancelled `arrive` call.
    arrived: AtomicU64,
    /// Whether the barrier has been cancelled.
    ///
    /// Set by `cancel` and never cleared.
    cancelled: std::sync::atomic::AtomicBool,
}
211
212impl EpochBarrier {
213    /// Create a new epoch barrier.
214    ///
215    /// `current_epoch` is the epoch being transitioned from.
216    /// `participants` is the number of services that must drain and arrive.
217    #[must_use]
218    pub fn new(current_epoch: EpochId, participants: usize) -> Self {
219        info!(
220            bead_id = "bd-3go.12",
221            epoch = current_epoch.get(),
222            participants,
223            "epoch barrier created"
224        );
225        Self {
226            current_epoch,
227            expected: participants,
228            arrived: AtomicU64::new(0),
229            cancelled: std::sync::atomic::AtomicBool::new(false),
230        }
231    }
232
233    /// The epoch being transitioned from.
234    #[must_use]
235    pub fn epoch(&self) -> EpochId {
236        self.current_epoch
237    }
238
239    /// Number of participants that have arrived so far.
240    #[must_use]
241    pub fn arrived_count(&self) -> usize {
242        let val = self.arrived.load(Ordering::Acquire);
243        usize::try_from(val).unwrap_or(usize::MAX)
244    }
245
246    /// Total expected participants.
247    #[must_use]
248    pub fn expected_count(&self) -> usize {
249        self.expected
250    }
251
252    /// Register that a participant has drained in-flight work and arrived.
253    ///
254    /// Returns `true` if this was the last participant (barrier is complete).
255    pub fn arrive(&self, participant_name: &str) -> bool {
256        if self.cancelled.load(Ordering::Acquire) {
257            warn!(
258                bead_id = "bd-3go.12",
259                participant = participant_name,
260                "participant arrived at cancelled barrier — ignoring"
261            );
262            return false;
263        }
264        let prev = self.arrived.fetch_add(1, Ordering::AcqRel);
265        let new_count = usize::try_from(prev.saturating_add(1)).unwrap_or(usize::MAX);
266        debug!(
267            bead_id = "bd-3go.12",
268            participant = participant_name,
269            arrived = new_count,
270            expected = self.expected,
271            "barrier participant arrived"
272        );
273        new_count >= self.expected
274    }
275
276    /// Whether all participants have arrived.
277    #[must_use]
278    pub fn is_complete(&self) -> bool {
279        self.arrived_count() >= self.expected
280    }
281
282    /// Cancel the barrier. The epoch will NOT advance.
283    pub fn cancel(&self) {
284        self.cancelled.store(true, Ordering::Release);
285        warn!(
286            bead_id = "bd-3go.12",
287            epoch = self.current_epoch.get(),
288            arrived = self.arrived_count(),
289            expected = self.expected,
290            "epoch barrier cancelled — epoch will NOT advance"
291        );
292    }
293
294    /// Whether the barrier has been cancelled.
295    #[must_use]
296    pub fn is_cancelled(&self) -> bool {
297        self.cancelled.load(Ordering::Acquire)
298    }
299
300    /// Resolve the barrier after a timeout or cancellation check.
301    ///
302    /// If all participants arrived and the barrier is not cancelled,
303    /// the epoch clock is incremented and the new epoch is returned.
304    ///
305    /// # Errors
306    ///
307    /// Returns [`FrankenError::OutOfRange`] if the epoch clock overflows.
308    pub fn resolve(&self, clock: &EpochClock) -> Result<BarrierOutcome> {
309        if self.is_cancelled() {
310            return Ok(BarrierOutcome::Cancelled);
311        }
312        if !self.is_complete() {
313            return Ok(BarrierOutcome::Timeout {
314                arrived: self.arrived_count(),
315                expected: self.expected,
316            });
317        }
318        let new_epoch = clock.increment()?;
319        info!(
320            bead_id = "bd-3go.12",
321            old_epoch = self.current_epoch.get(),
322            new_epoch = new_epoch.get(),
323            participants = self.expected,
324            "epoch transition completed — all participants arrived"
325        );
326        Ok(BarrierOutcome::AllArrived { new_epoch })
327    }
328}
329
330// ── Validation helpers ──────────────────────────────────────────────────
331
332/// Validate a symbol's epoch against the current validity window (§4.18.1).
333///
334/// Fail-closed: rejects symbols with `epoch_id > current_epoch`.
335///
336/// # Errors
337///
338/// Returns [`FrankenError::DatabaseCorrupt`] if the symbol epoch is outside
339/// the validity window.
340pub fn validate_symbol_epoch(
341    symbol_epoch: EpochId,
342    window: &fsqlite_types::SymbolValidityWindow,
343) -> Result<()> {
344    if window.contains(symbol_epoch) {
345        Ok(())
346    } else {
347        error!(
348            bead_id = "bd-3go.12",
349            symbol_epoch = symbol_epoch.get(),
350            window_from = window.from_epoch.get(),
351            window_to = window.to_epoch.get(),
352            "symbol epoch outside validity window — fail-closed rejection"
353        );
354        Err(FrankenError::DatabaseCorrupt {
355            detail: format!(
356                "symbol epoch {} outside validity window [{}, {}]",
357                symbol_epoch.get(),
358                window.from_epoch.get(),
359                window.to_epoch.get(),
360            ),
361        })
362    }
363}
364
365// ── RootManifest bootstrap (§3.5.5, bd-1hi.25) ──────────────────────────
366
/// Magic bytes for `ecs/root`: `"FSRT"`.
pub const ECS_ROOT_POINTER_MAGIC: [u8; 4] = *b"FSRT";
/// Supported `ecs/root` pointer version.
pub const ECS_ROOT_POINTER_VERSION: u32 = 1;
/// Exact wire size of `EcsRootPointer`.
///
/// Byte layout written by `EcsRootPointer::encode`: magic `0..4`, version
/// `4..8`, manifest object id `8..24`, `ecs_epoch` `24..32`, checksum
/// `32..40`, auth tag `40..56`.
pub const ECS_ROOT_POINTER_BYTES: usize = 56;
/// Bytes covered by `checksum` in `EcsRootPointer`.
///
/// Everything that precedes the checksum field (magic through `ecs_epoch`).
const ECS_ROOT_POINTER_CHECKSUM_INPUT_BYTES: usize = 32;
/// Bytes covered by `root_auth_tag` in `EcsRootPointer`.
///
/// Everything that precedes the tag field (magic through checksum).
const ECS_ROOT_POINTER_AUTH_INPUT_BYTES: usize = 40;

/// Magic bytes for `RootManifest`: `"FSQLROOT"`.
pub const ROOT_MANIFEST_MAGIC: [u8; 8] = *b"FSQLROOT";
/// Supported RootManifest version.
pub const ROOT_MANIFEST_VERSION: u32 = 1;
382
/// Mutable bootstrap pointer at `ecs/root`.
///
/// The pointer is tiny and atomically updated; it is the only file read before
/// object lookup starts.
///
/// The magic, version and checksum fields of the wire format are not stored
/// here; `encode` regenerates them and `decode` validates them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EcsRootPointer {
    /// ObjectId of the current RootManifest object.
    pub manifest_object_id: ObjectId,
    /// Bootstrap epoch guard (`root_epoch`).
    pub ecs_epoch: EpochId,
    /// Optional keyed authentication tag for `symbol_auth=on`.
    ///
    /// All-zero when the pointer is built with `unauthed`.
    pub root_auth_tag: [u8; 16],
}
396
397impl EcsRootPointer {
398    /// Construct an unauthenticated root pointer (`symbol_auth=off`).
399    #[must_use]
400    pub const fn unauthed(manifest_object_id: ObjectId, ecs_epoch: EpochId) -> Self {
401        Self {
402            manifest_object_id,
403            ecs_epoch,
404            root_auth_tag: [0_u8; 16],
405        }
406    }
407
408    /// Construct an authenticated root pointer (`symbol_auth=on`).
409    #[must_use]
410    pub fn authed(manifest_object_id: ObjectId, ecs_epoch: EpochId, master_key: &[u8; 32]) -> Self {
411        let mut pointer = Self::unauthed(manifest_object_id, ecs_epoch);
412        let auth_input = pointer.auth_input_bytes();
413        pointer.root_auth_tag = compute_root_pointer_auth_tag(master_key, &auth_input);
414        pointer
415    }
416
417    /// Encode to exact wire bytes.
418    #[must_use]
419    pub fn encode(&self) -> [u8; ECS_ROOT_POINTER_BYTES] {
420        let mut out = [0_u8; ECS_ROOT_POINTER_BYTES];
421        out[0..4].copy_from_slice(&ECS_ROOT_POINTER_MAGIC);
422        out[4..8].copy_from_slice(&ECS_ROOT_POINTER_VERSION.to_le_bytes());
423        out[8..24].copy_from_slice(self.manifest_object_id.as_bytes());
424        out[24..32].copy_from_slice(&self.ecs_epoch.get().to_le_bytes());
425        let checksum = xxhash_rust::xxh3::xxh3_64(&out[..ECS_ROOT_POINTER_CHECKSUM_INPUT_BYTES]);
426        out[32..40].copy_from_slice(&checksum.to_le_bytes());
427        out[40..56].copy_from_slice(&self.root_auth_tag);
428        out
429    }
430
431    /// Decode and validate `ecs/root`.
432    ///
433    /// # Errors
434    ///
435    /// Returns [`FrankenError::DatabaseCorrupt`] on magic/version/checksum/auth failures.
436    pub fn decode(
437        bytes: &[u8],
438        symbol_auth_enabled: bool,
439        master_key: Option<&[u8; 32]>,
440    ) -> Result<Self> {
441        if bytes.len() != ECS_ROOT_POINTER_BYTES {
442            return Err(FrankenError::DatabaseCorrupt {
443                detail: format!(
444                    "ecs/root size mismatch: expected {ECS_ROOT_POINTER_BYTES}, got {}",
445                    bytes.len()
446                ),
447            });
448        }
449        if bytes[0..4] != ECS_ROOT_POINTER_MAGIC {
450            return Err(FrankenError::DatabaseCorrupt {
451                detail: format!(
452                    "invalid ecs/root magic: {:02X?} (reason=bad_magic)",
453                    &bytes[0..4]
454                ),
455            });
456        }
457        let version = read_u32_le_at(bytes, 4, "root.version")?;
458        if version != ECS_ROOT_POINTER_VERSION {
459            return Err(FrankenError::DatabaseCorrupt {
460                detail: format!(
461                    "unsupported ecs/root version {version} (expected {ECS_ROOT_POINTER_VERSION})"
462                ),
463            });
464        }
465
466        let mut manifest_id = [0_u8; 16];
467        manifest_id.copy_from_slice(&bytes[8..24]);
468        let manifest_object_id = ObjectId::from_bytes(manifest_id);
469        let ecs_epoch_raw = read_u64_le_at(bytes, 24, "root.ecs_epoch")?;
470        let ecs_epoch = EpochId::new(ecs_epoch_raw);
471
472        let stored_checksum = read_u64_le_at(bytes, 32, "root.checksum")?;
473        let computed_checksum =
474            xxhash_rust::xxh3::xxh3_64(&bytes[..ECS_ROOT_POINTER_CHECKSUM_INPUT_BYTES]);
475        if stored_checksum != computed_checksum {
476            error!(
477                bead_id = ROOT_BOOTSTRAP_BEAD_ID,
478                logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
479                reason_code = "checksum_mismatch",
480                stored_checksum = stored_checksum,
481                computed_checksum = computed_checksum,
482                "ecs/root checksum verification failed"
483            );
484            return Err(FrankenError::DatabaseCorrupt {
485                detail: format!(
486                    "ecs/root checksum mismatch (reason=checksum_mismatch): stored={stored_checksum:#018X}, computed={computed_checksum:#018X}"
487                ),
488            });
489        }
490
491        let mut root_auth_tag = [0_u8; 16];
492        root_auth_tag.copy_from_slice(&bytes[40..56]);
493
494        if symbol_auth_enabled {
495            let Some(master_key) = master_key else {
496                return Err(FrankenError::DatabaseCorrupt {
497                    detail: "symbol_auth enabled but master key is missing (reason=auth_failed)"
498                        .to_owned(),
499                });
500            };
501            let expected = compute_root_pointer_auth_tag(
502                master_key,
503                &bytes[..ECS_ROOT_POINTER_AUTH_INPUT_BYTES],
504            );
505            if root_auth_tag != expected {
506                error!(
507                    bead_id = ROOT_BOOTSTRAP_BEAD_ID,
508                    logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
509                    reason_code = "auth_failed",
510                    "ecs/root auth-tag verification failed"
511                );
512                return Err(FrankenError::DatabaseCorrupt {
513                    detail: "ecs/root auth tag verification failed (reason=auth_failed)".to_owned(),
514                });
515            }
516        } else if root_auth_tag != [0_u8; 16] {
517            return Err(FrankenError::DatabaseCorrupt {
518                detail: "ecs/root auth tag must be all-zero when symbol_auth=off".to_owned(),
519            });
520        }
521
522        Ok(Self {
523            manifest_object_id,
524            ecs_epoch,
525            root_auth_tag,
526        })
527    }
528
529    /// Bytes used to compute `root_auth_tag`.
530    #[must_use]
531    fn auth_input_bytes(&self) -> [u8; ECS_ROOT_POINTER_AUTH_INPUT_BYTES] {
532        let encoded = self.encode();
533        let mut out = [0_u8; ECS_ROOT_POINTER_AUTH_INPUT_BYTES];
534        out.copy_from_slice(&encoded[..ECS_ROOT_POINTER_AUTH_INPUT_BYTES]);
535        out
536    }
537}
538
/// Root bootstrap state object.
///
/// Fields are serialized by `RootManifest::encode` in declaration order,
/// after the magic, version and length-prefixed database name, and followed
/// by a trailing XXH3-64 checksum.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RootManifest {
    /// Human-readable database name.
    ///
    /// Stored as UTF-8 with a `u32` length prefix.
    pub database_name: String,
    /// Latest commit marker object id.
    pub current_commit: ObjectId,
    /// Latest commit sequence.
    pub commit_seq: u64,
    /// Current schema snapshot object id.
    pub schema_snapshot: ObjectId,
    /// Monotone schema epoch.
    pub schema_epoch: u64,
    /// Monotone ECS coordination epoch.
    pub ecs_epoch: EpochId,
    /// Last full checkpoint base object.
    pub checkpoint_base: ObjectId,
    /// GC horizon commit sequence.
    pub gc_horizon: u64,
    /// Creation timestamp.
    ///
    /// NOTE(review): unit and epoch of the timestamp are not visible in this
    /// file — confirm against the writer.
    pub created_at: u64,
    /// Last update timestamp.
    ///
    /// NOTE(review): same unit as `created_at`, presumably — confirm.
    pub updated_at: u64,
}
563
564impl RootManifest {
565    /// Encode to deterministic bytes with trailing checksum.
566    ///
567    /// # Errors
568    ///
569    /// Returns [`FrankenError::OutOfRange`] if `database_name` is too large.
570    pub fn encode(&self) -> Result<Vec<u8>> {
571        let name_bytes = self.database_name.as_bytes();
572        let name_len = u32::try_from(name_bytes.len()).map_err(|_| FrankenError::OutOfRange {
573            what: "root_manifest.database_name_len".to_owned(),
574            value: name_bytes.len().to_string(),
575        })?;
576
577        let mut out = Vec::with_capacity(name_bytes.len().saturating_add(128));
578        out.extend_from_slice(&ROOT_MANIFEST_MAGIC);
579        out.extend_from_slice(&ROOT_MANIFEST_VERSION.to_le_bytes());
580        out.extend_from_slice(&name_len.to_le_bytes());
581        out.extend_from_slice(name_bytes);
582        out.extend_from_slice(self.current_commit.as_bytes());
583        out.extend_from_slice(&self.commit_seq.to_le_bytes());
584        out.extend_from_slice(self.schema_snapshot.as_bytes());
585        out.extend_from_slice(&self.schema_epoch.to_le_bytes());
586        out.extend_from_slice(&self.ecs_epoch.get().to_le_bytes());
587        out.extend_from_slice(self.checkpoint_base.as_bytes());
588        out.extend_from_slice(&self.gc_horizon.to_le_bytes());
589        out.extend_from_slice(&self.created_at.to_le_bytes());
590        out.extend_from_slice(&self.updated_at.to_le_bytes());
591        let checksum = xxhash_rust::xxh3::xxh3_64(&out);
592        out.extend_from_slice(&checksum.to_le_bytes());
593        Ok(out)
594    }
595
596    /// Decode and validate a `RootManifest`.
597    ///
598    /// # Errors
599    ///
600    /// Returns [`FrankenError::DatabaseCorrupt`] on wire-format violations.
601    pub fn decode(bytes: &[u8]) -> Result<Self> {
602        if bytes.len() < 120 {
603            return Err(FrankenError::DatabaseCorrupt {
604                detail: format!(
605                    "root manifest too short: expected >= 120 bytes, got {}",
606                    bytes.len()
607                ),
608            });
609        }
610        if bytes[0..8] != ROOT_MANIFEST_MAGIC {
611            return Err(FrankenError::DatabaseCorrupt {
612                detail: format!("invalid root manifest magic: {:02X?}", &bytes[0..8]),
613            });
614        }
615        let version = read_u32_le_at(bytes, 8, "root_manifest.version")?;
616        if version != ROOT_MANIFEST_VERSION {
617            return Err(FrankenError::DatabaseCorrupt {
618                detail: format!(
619                    "unsupported root manifest version {version} (expected {ROOT_MANIFEST_VERSION})"
620                ),
621            });
622        }
623
624        let name_len_u32 = read_u32_le_at(bytes, 12, "root_manifest.database_name_len")?;
625        let name_len = u32_to_usize(name_len_u32, "root_manifest.database_name_len")?;
626        let mut cursor = 16_usize;
627        let name_end = checked_add(cursor, name_len, "root_manifest.database_name_end")?;
628        if name_end > bytes.len() {
629            return Err(FrankenError::DatabaseCorrupt {
630                detail: format!(
631                    "root manifest name out of bounds: end={name_end}, len={}",
632                    bytes.len()
633                ),
634            });
635        }
636        let database_name = std::str::from_utf8(&bytes[cursor..name_end])
637            .map_err(|err| FrankenError::DatabaseCorrupt {
638                detail: format!("root manifest database_name is not UTF-8: {err}"),
639            })?
640            .to_owned();
641        cursor = name_end;
642
643        let current_commit = read_object_id_at(bytes, cursor, "root_manifest.current_commit")?;
644        cursor = checked_add(cursor, 16, "root_manifest.cursor.current_commit")?;
645        let commit_seq = read_u64_le_at(bytes, cursor, "root_manifest.commit_seq")?;
646        cursor = checked_add(cursor, 8, "root_manifest.cursor.commit_seq")?;
647        let schema_snapshot = read_object_id_at(bytes, cursor, "root_manifest.schema_snapshot")?;
648        cursor = checked_add(cursor, 16, "root_manifest.cursor.schema_snapshot")?;
649        let schema_epoch = read_u64_le_at(bytes, cursor, "root_manifest.schema_epoch")?;
650        cursor = checked_add(cursor, 8, "root_manifest.cursor.schema_epoch")?;
651        let ecs_epoch_raw = read_u64_le_at(bytes, cursor, "root_manifest.ecs_epoch")?;
652        let ecs_epoch = EpochId::new(ecs_epoch_raw);
653        cursor = checked_add(cursor, 8, "root_manifest.cursor.ecs_epoch")?;
654        let checkpoint_base = read_object_id_at(bytes, cursor, "root_manifest.checkpoint_base")?;
655        cursor = checked_add(cursor, 16, "root_manifest.cursor.checkpoint_base")?;
656        let gc_horizon = read_u64_le_at(bytes, cursor, "root_manifest.gc_horizon")?;
657        cursor = checked_add(cursor, 8, "root_manifest.cursor.gc_horizon")?;
658        let created_at = read_u64_le_at(bytes, cursor, "root_manifest.created_at")?;
659        cursor = checked_add(cursor, 8, "root_manifest.cursor.created_at")?;
660        let updated_at = read_u64_le_at(bytes, cursor, "root_manifest.updated_at")?;
661        cursor = checked_add(cursor, 8, "root_manifest.cursor.updated_at")?;
662
663        let checksum_end = checked_add(cursor, 8, "root_manifest.cursor.checksum_end")?;
664        if checksum_end != bytes.len() {
665            return Err(FrankenError::DatabaseCorrupt {
666                detail: format!(
667                    "root manifest trailing bytes present: parsed_end={checksum_end}, actual_len={}",
668                    bytes.len()
669                ),
670            });
671        }
672        let stored_checksum = read_u64_le_at(bytes, cursor, "root_manifest.checksum")?;
673        let computed_checksum = xxhash_rust::xxh3::xxh3_64(&bytes[..cursor]);
674        if stored_checksum != computed_checksum {
675            return Err(FrankenError::DatabaseCorrupt {
676                detail: format!(
677                    "root manifest checksum mismatch: stored={stored_checksum:#018X}, computed={computed_checksum:#018X}"
678                ),
679            });
680        }
681
682        Ok(Self {
683            database_name,
684            current_commit,
685            commit_seq,
686            schema_snapshot,
687            schema_epoch,
688            ecs_epoch,
689            checkpoint_base,
690            gc_horizon,
691            created_at,
692            updated_at,
693        })
694    }
695}
696
697/// Compute `root_auth_tag = Trunc128(BLAKE3_KEYED(master_key, domain || bytes(magic..checksum)))`.
698#[must_use]
699pub fn compute_root_pointer_auth_tag(master_key: &[u8; 32], magic_to_checksum: &[u8]) -> [u8; 16] {
700    let mut hasher = blake3::Hasher::new_keyed(master_key);
701    hasher.update(ROOT_POINTER_AUTH_DOMAIN);
702    hasher.update(magic_to_checksum);
703    let digest = hasher.finalize();
704    let mut out = [0_u8; 16];
705    out.copy_from_slice(&digest.as_bytes()[..16]);
706    out
707}
708
/// Locations on disk that native bootstrap reads from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeBootstrapLayout {
    /// Path to the `ecs/` directory.
    pub ecs_dir: PathBuf,
}

impl NativeBootstrapLayout {
    /// Build a layout whose `ecs/` directory is `ecs_dir`.
    #[must_use]
    pub fn new(ecs_dir: impl Into<PathBuf>) -> Self {
        let ecs_dir: PathBuf = ecs_dir.into();
        Self { ecs_dir }
    }

    /// Path of the `ecs/root` pointer file.
    #[must_use]
    pub fn root_path(&self) -> PathBuf {
        let mut path = self.ecs_dir.clone();
        path.push("root");
        path
    }

    /// Path of the `ecs/symbols` directory.
    #[must_use]
    pub fn symbols_dir(&self) -> PathBuf {
        let mut path = self.ecs_dir.clone();
        path.push("symbols");
        path
    }

    /// Path of the `ecs/markers` directory.
    #[must_use]
    pub fn markers_dir(&self) -> PathBuf {
        let mut path = self.ecs_dir.clone();
        path.push("markers");
        path
    }
}
743
/// Result of successful native bootstrap.
///
/// Returned by `bootstrap_native_mode`; bundles everything that was read and
/// verified while following `ecs/root` to the current manifest.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NativeBootstrapState {
    /// Decoded root pointer from `ecs/root`.
    pub root_pointer: EcsRootPointer,
    /// Decoded RootManifest object.
    pub manifest: RootManifest,
    /// Verified latest marker matching `manifest.current_commit`.
    pub latest_marker: CommitMarkerRecord,
    /// Bytes for `manifest.schema_snapshot`.
    pub schema_snapshot_bytes: Vec<u8>,
    /// Bytes for `manifest.checkpoint_base`.
    pub checkpoint_base_bytes: Vec<u8>,
}
758
759/// Read and decode `ecs/root`.
760///
761/// # Errors
762///
763/// Returns [`FrankenError::DatabaseCorrupt`] for malformed root data.
764pub fn read_root_pointer(
765    root_path: &Path,
766    symbol_auth_enabled: bool,
767    master_key: Option<&[u8; 32]>,
768) -> Result<EcsRootPointer> {
769    let bytes = fs::read(root_path).map_err(|err| {
770        error!(
771            bead_id = ROOT_BOOTSTRAP_BEAD_ID,
772            logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
773            reason_code = "scan_failed",
774            path = %root_path.display(),
775            error = %err,
776            "failed reading ecs/root"
777        );
778        FrankenError::Io(err)
779    })?;
780    EcsRootPointer::decode(&bytes, symbol_auth_enabled, master_key)
781}
782
783/// Crash-safe `ecs/root` update: temp write -> fsync temp -> rename -> fsync dir.
784///
785/// # Errors
786///
787/// Returns [`FrankenError::Io`] for filesystem failures.
788pub fn write_root_pointer_atomic(root_path: &Path, pointer: EcsRootPointer) -> Result<()> {
789    let Some(parent) = root_path.parent() else {
790        return Err(FrankenError::DatabaseCorrupt {
791            detail: format!("ecs/root has no parent directory: {}", root_path.display()),
792        });
793    };
794    fs::create_dir_all(parent)?;
795
796    let pid = std::process::id();
797    let suffix = ROOT_TMP_SUFFIX_COUNTER.fetch_add(1, Ordering::SeqCst);
798    let tmp_name = format!(".root.tmp.{pid}.{suffix}");
799    let tmp_path = parent.join(tmp_name);
800
801    let bytes = pointer.encode();
802    let mut temp = fs::OpenOptions::new()
803        .write(true)
804        .create_new(true)
805        .open(&tmp_path)?;
806    temp.write_all(&bytes)?;
807    temp.sync_all()?;
808    fs::rename(&tmp_path, root_path)?;
809    let parent_dir = fs::File::open(parent)?;
810    parent_dir.sync_all()?;
811
812    info!(
813        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
814        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
815        path = %root_path.display(),
816        root_epoch = pointer.ecs_epoch.get(),
817        "wrote ecs/root atomically"
818    );
819
820    Ok(())
821}
822
823/// Build `EcsRootPointer` according to `symbol_auth` mode.
824///
825/// # Errors
826///
827/// Returns [`FrankenError::DatabaseCorrupt`] when auth is required but key is missing.
828pub fn build_root_pointer(
829    manifest_object_id: ObjectId,
830    ecs_epoch: EpochId,
831    symbol_auth_enabled: bool,
832    master_key: Option<&[u8; 32]>,
833) -> Result<EcsRootPointer> {
834    if symbol_auth_enabled {
835        let Some(master_key) = master_key else {
836            return Err(FrankenError::DatabaseCorrupt {
837                detail: "symbol_auth enabled but master key is missing (reason=auth_failed)"
838                    .to_owned(),
839            });
840        };
841        Ok(EcsRootPointer::authed(
842            manifest_object_id,
843            ecs_epoch,
844            master_key,
845        ))
846    } else {
847        Ok(EcsRootPointer::unauthed(manifest_object_id, ecs_epoch))
848    }
849}
850
851/// Bootstrap native mode from on-disk `ecs/root` and ECS objects.
852///
853/// Implements the §3.5.5 sequence:
854/// 1) read root 2) verify auth (optional) 3) capture root epoch
855/// 4) fetch manifest with future-epoch guard 5) enforce epoch equality
856/// 6) verify marker id 7) load schema snapshot 8) load checkpoint base.
857///
858/// # Errors
859///
860/// Returns [`FrankenError::DatabaseCorrupt`] when bootstrap invariants fail.
861pub fn bootstrap_native_mode(
862    layout: &NativeBootstrapLayout,
863    symbol_auth_enabled: bool,
864    master_key: Option<&[u8; 32]>,
865) -> Result<NativeBootstrapState> {
866    debug!(
867        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
868        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
869        step = 1_u8,
870        root_path = %layout.root_path().display(),
871        symbol_auth_enabled = symbol_auth_enabled,
872        "bootstrap step 1: reading ecs/root"
873    );
874    let root_path = layout.root_path();
875    let root_pointer = read_root_pointer(&root_path, symbol_auth_enabled, master_key)?;
876    info!(
877        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
878        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
879        step = 1_u8,
880        duration_ms = 0_u64,
881        root_epoch = root_pointer.ecs_epoch.get(),
882        manifest_object_id = %root_pointer.manifest_object_id,
883        "bootstrap steps 1-3 complete"
884    );
885    bootstrap_from_root_pointer(layout, root_pointer)
886}
887
888/// Bootstrap native mode, falling back to scan-based root recovery.
889///
890/// # Errors
891///
892/// Returns [`FrankenError::DatabaseCorrupt`] when neither root nor scan recovery succeeds.
893pub fn bootstrap_native_mode_with_recovery(
894    layout: &NativeBootstrapLayout,
895    symbol_auth_enabled: bool,
896    master_key: Option<&[u8; 32]>,
897) -> Result<NativeBootstrapState> {
898    match bootstrap_native_mode(layout, symbol_auth_enabled, master_key) {
899        Ok(state) => Ok(state),
900        Err(initial_err) => {
901            debug!(
902                bead_id = ROOT_BOOTSTRAP_BEAD_ID,
903                logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
904                reason_code = "retry_scan_recovery",
905                error = %initial_err,
906                "bootstrap entering degraded scan-based recovery path"
907            );
908            warn!(
909                bead_id = ROOT_BOOTSTRAP_BEAD_ID,
910                logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
911                reason_code = "retry_scan_recovery",
912                error = %initial_err,
913                "bootstrap from ecs/root failed; attempting scan-based recovery"
914            );
915
916            let recovered_pointer =
917                recover_root_pointer_from_scan(layout, symbol_auth_enabled, master_key)?;
918            write_root_pointer_atomic(&layout.root_path(), recovered_pointer)?;
919            bootstrap_from_root_pointer(layout, recovered_pointer)
920        }
921    }
922}
923
924/// Recover `ecs/root` by scanning markers and symbols for the latest valid RootManifest.
925///
926/// # Errors
927///
928/// Returns [`FrankenError::DatabaseCorrupt`] when no manifest candidate is recoverable.
929pub fn recover_root_pointer_from_scan(
930    layout: &NativeBootstrapLayout,
931    symbol_auth_enabled: bool,
932    master_key: Option<&[u8; 32]>,
933) -> Result<EcsRootPointer> {
934    let marker_tip = scan_latest_marker(layout.markers_dir().as_path())?;
935    let mut grouped: BTreeMap<ObjectId, Vec<SymbolRecord>> = BTreeMap::new();
936    let symbol_segments = sorted_segment_paths(layout.symbols_dir().as_path())?;
937    debug!(
938        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
939        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
940        segments = symbol_segments.len(),
941        marker_tip_commit_seq = marker_tip.as_ref().map_or(0_u64, |m| m.commit_seq),
942        "scan recovery started"
943    );
944
945    for (_, segment_path) in &symbol_segments {
946        debug!(
947            bead_id = ROOT_BOOTSTRAP_BEAD_ID,
948            logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
949            segment = %segment_path.display(),
950            "scan recovery inspecting symbol segment"
951        );
952        let scan = scan_symbol_segment(segment_path)?;
953        for row in scan.records {
954            grouped
955                .entry(row.record.object_id)
956                .or_default()
957                .push(row.record);
958        }
959    }
960
961    let mut best: Option<(ObjectId, RootManifest, bool)> = None;
962    for (object_id, records) in grouped {
963        let Ok(payload) = reconstruct_payload_from_source_symbols(records) else {
964            continue;
965        };
966        let Ok(manifest) = RootManifest::decode(&payload) else {
967            continue;
968        };
969
970        let marker_matches = marker_tip.as_ref().is_some_and(|tip| {
971            manifest.current_commit.as_bytes() == &tip.marker_id
972                && manifest.commit_seq == tip.commit_seq
973        });
974
975        match &best {
976            None => best = Some((object_id, manifest, marker_matches)),
977            Some((_, best_manifest, best_marker_matches)) => {
978                let better_marker_match = marker_matches && !best_marker_matches;
979                let better_commit = manifest.commit_seq > best_manifest.commit_seq;
980                let better_update = manifest.commit_seq == best_manifest.commit_seq
981                    && manifest.updated_at > best_manifest.updated_at;
982                if better_marker_match || better_commit || better_update {
983                    best = Some((object_id, manifest, marker_matches));
984                }
985            }
986        }
987    }
988
989    let Some((manifest_object_id, manifest, _)) = best else {
990        error!(
991            bead_id = ROOT_BOOTSTRAP_BEAD_ID,
992            logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
993            reason_code = "scan_failed",
994            segments_scanned = symbol_segments.len(),
995            "scan recovery could not find a valid RootManifest candidate"
996        );
997        return Err(FrankenError::DatabaseCorrupt {
998            detail: "scan recovery failed: no valid RootManifest candidate (reason=scan_failed)"
999                .to_owned(),
1000        });
1001    };
1002
1003    info!(
1004        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1005        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1006        segments_scanned = symbol_segments.len(),
1007        best_candidate_commit_seq = manifest.commit_seq,
1008        chosen_root_pointer = %manifest_object_id,
1009        "scan recovery selected root manifest candidate"
1010    );
1011
1012    build_root_pointer(
1013        manifest_object_id,
1014        manifest.ecs_epoch,
1015        symbol_auth_enabled,
1016        master_key,
1017    )
1018}
1019
1020#[allow(clippy::too_many_lines)]
1021fn bootstrap_from_root_pointer(
1022    layout: &NativeBootstrapLayout,
1023    root_pointer: EcsRootPointer,
1024) -> Result<NativeBootstrapState> {
1025    info!(
1026        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1027        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1028        step = 4_u8,
1029        root_epoch = root_pointer.ecs_epoch.get(),
1030        manifest_object_id = %root_pointer.manifest_object_id,
1031        "bootstrap step 4: loading root manifest object"
1032    );
1033    let manifest_bytes = fetch_object_payload(
1034        layout.symbols_dir().as_path(),
1035        root_pointer.manifest_object_id,
1036        root_pointer.ecs_epoch,
1037    )?;
1038    let manifest = RootManifest::decode(&manifest_bytes)?;
1039    info!(
1040        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1041        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1042        step = 4_u8,
1043        duration_ms = 0_u64,
1044        root_epoch = root_pointer.ecs_epoch.get(),
1045        object_id = %root_pointer.manifest_object_id,
1046        "bootstrap step 4 complete"
1047    );
1048
1049    if manifest.ecs_epoch != root_pointer.ecs_epoch {
1050        error!(
1051            bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1052            logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1053            reason_code = "epoch_mismatch",
1054            root_epoch = root_pointer.ecs_epoch.get(),
1055            manifest_epoch = manifest.ecs_epoch.get(),
1056            "bootstrap step 5 failed: root epoch != manifest epoch"
1057        );
1058        return Err(FrankenError::DatabaseCorrupt {
1059            detail: format!(
1060                "root/manifest epoch mismatch (reason=epoch_mismatch): root={}, manifest={}",
1061                root_pointer.ecs_epoch.get(),
1062                manifest.ecs_epoch.get()
1063            ),
1064        });
1065    }
1066    info!(
1067        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1068        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1069        step = 5_u8,
1070        duration_ms = 0_u64,
1071        root_epoch = root_pointer.ecs_epoch.get(),
1072        object_id = %root_pointer.manifest_object_id,
1073        "bootstrap step 5 complete"
1074    );
1075
1076    info!(
1077        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1078        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1079        step = 6_u8,
1080        commit_seq = manifest.commit_seq,
1081        "bootstrap step 6: verifying marker"
1082    );
1083    let latest_marker = fetch_marker_record(layout.markers_dir().as_path(), manifest.commit_seq)?;
1084    if latest_marker.marker_id != *manifest.current_commit.as_bytes() {
1085        error!(
1086            bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1087            logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1088            reason_code = "marker_mismatch",
1089            manifest_commit_seq = manifest.commit_seq,
1090            "bootstrap marker mismatch"
1091        );
1092        return Err(FrankenError::DatabaseCorrupt {
1093            detail:
1094                "root manifest current_commit does not match marker stream (reason=marker_mismatch)"
1095                    .to_owned(),
1096        });
1097    }
1098    info!(
1099        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1100        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1101        step = 6_u8,
1102        duration_ms = 0_u64,
1103        root_epoch = root_pointer.ecs_epoch.get(),
1104        object_id = %manifest.current_commit,
1105        "bootstrap step 6 complete"
1106    );
1107
1108    let schema_snapshot_bytes = fetch_object_payload(
1109        layout.symbols_dir().as_path(),
1110        manifest.schema_snapshot,
1111        root_pointer.ecs_epoch,
1112    )?;
1113    info!(
1114        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1115        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1116        step = 7_u8,
1117        duration_ms = 0_u64,
1118        root_epoch = root_pointer.ecs_epoch.get(),
1119        object_id = %manifest.schema_snapshot,
1120        "bootstrap step 7 complete"
1121    );
1122    let checkpoint_base_bytes = fetch_object_payload(
1123        layout.symbols_dir().as_path(),
1124        manifest.checkpoint_base,
1125        root_pointer.ecs_epoch,
1126    )?;
1127    info!(
1128        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1129        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1130        step = 8_u8,
1131        duration_ms = 0_u64,
1132        root_epoch = root_pointer.ecs_epoch.get(),
1133        object_id = %manifest.checkpoint_base,
1134        "bootstrap step 8 complete"
1135    );
1136
1137    info!(
1138        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1139        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1140        step = 9_u8,
1141        duration_ms = 0_u64,
1142        root_epoch = root_pointer.ecs_epoch.get(),
1143        commit_seq = manifest.commit_seq,
1144        schema_epoch = manifest.schema_epoch,
1145        "bootstrap sequence completed"
1146    );
1147
1148    Ok(NativeBootstrapState {
1149        root_pointer,
1150        manifest,
1151        latest_marker,
1152        schema_snapshot_bytes,
1153        checkpoint_base_bytes,
1154    })
1155}
1156
1157fn fetch_object_payload(
1158    symbols_dir: &Path,
1159    object_id: ObjectId,
1160    root_epoch: EpochId,
1161) -> Result<Vec<u8>> {
1162    let mut records = Vec::new();
1163    let segments = sorted_segment_paths(symbols_dir)?;
1164    debug!(
1165        bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1166        logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1167        root_epoch = root_epoch.get(),
1168        object_id = %object_id,
1169        segment_count = segments.len(),
1170        "fetching bootstrap object payload from symbol log"
1171    );
1172
1173    for (_, segment_path) in segments {
1174        let scan = scan_symbol_segment(&segment_path)?;
1175        if scan.header.epoch_id > root_epoch.get() {
1176            error!(
1177                bead_id = ROOT_BOOTSTRAP_BEAD_ID,
1178                logging_standard = ROOT_BOOTSTRAP_LOGGING_STANDARD,
1179                reason_code = "future_epoch",
1180                segment = %segment_path.display(),
1181                segment_epoch = scan.header.epoch_id,
1182                root_epoch = root_epoch.get(),
1183                "bootstrap rejected future-epoch segment"
1184            );
1185            return Err(FrankenError::DatabaseCorrupt {
1186                detail: format!(
1187                    "future-epoch segment rejected (reason=future_epoch): segment_epoch={}, root_epoch={}",
1188                    scan.header.epoch_id,
1189                    root_epoch.get()
1190                ),
1191            });
1192        }
1193        for row in scan.records {
1194            if row.record.object_id == object_id {
1195                records.push(row.record);
1196            }
1197        }
1198    }
1199
1200    if records.is_empty() {
1201        return Err(FrankenError::DatabaseCorrupt {
1202            detail: format!("object {object_id} not found in symbol logs"),
1203        });
1204    }
1205    reconstruct_payload_from_source_symbols(records)
1206}
1207
1208fn reconstruct_payload_from_source_symbols(mut records: Vec<SymbolRecord>) -> Result<Vec<u8>> {
1209    records.sort_by_key(|record| record.esi);
1210    let Some(first) = records.first() else {
1211        return Err(FrankenError::DatabaseCorrupt {
1212            detail: "cannot reconstruct payload from empty symbol set".to_owned(),
1213        });
1214    };
1215    // Save OTI before consuming the Vec so the borrow on `first` is released.
1216    let first_oti = first.oti;
1217    let symbol_size_u64 = u64::from(first_oti.t);
1218    if symbol_size_u64 == 0 {
1219        return Err(FrankenError::DatabaseCorrupt {
1220            detail: "symbol_size=0 in OTI".to_owned(),
1221        });
1222    }
1223
1224    let transfer_len_usize = u64_to_usize(first_oti.f, "oti.f")?;
1225    let source_symbols = first_oti.f.div_ceil(symbol_size_u64);
1226    let source_symbols_usize = u64_to_usize(source_symbols, "source_symbols")?;
1227    let symbol_size_usize = u32_to_usize(first_oti.t, "oti.t")?;
1228    let total_bytes = source_symbols_usize
1229        .checked_mul(symbol_size_usize)
1230        .ok_or_else(|| FrankenError::DatabaseCorrupt {
1231            detail: "reconstruction size overflow".to_owned(),
1232        })?;
1233    let mut out = vec![0_u8; total_bytes];
1234    let mut seen = vec![false; source_symbols_usize];
1235
1236    for record in records {
1237        if u64::from(record.esi) >= source_symbols {
1238            continue;
1239        }
1240        if record.oti != first_oti {
1241            return Err(FrankenError::DatabaseCorrupt {
1242                detail: "inconsistent OTI across object symbols".to_owned(),
1243            });
1244        }
1245        let idx = u32_to_usize(record.esi, "esi")?;
1246        let start =
1247            idx.checked_mul(symbol_size_usize)
1248                .ok_or_else(|| FrankenError::DatabaseCorrupt {
1249                    detail: "symbol offset overflow".to_owned(),
1250                })?;
1251        let end = checked_add(start, symbol_size_usize, "symbol_end")?;
1252        if end > out.len() {
1253            return Err(FrankenError::DatabaseCorrupt {
1254                detail: "symbol write out of bounds during reconstruction".to_owned(),
1255            });
1256        }
1257        if record.symbol_data.len() != symbol_size_usize {
1258            return Err(FrankenError::DatabaseCorrupt {
1259                detail: "symbol size does not match OTI.t".to_owned(),
1260            });
1261        }
1262        out[start..end].copy_from_slice(&record.symbol_data);
1263        seen[idx] = true;
1264    }
1265
1266    if !seen.iter().all(|bit| *bit) {
1267        return Err(FrankenError::DatabaseCorrupt {
1268            detail: "insufficient source symbols to reconstruct object payload".to_owned(),
1269        });
1270    }
1271    out.truncate(transfer_len_usize);
1272    Ok(out)
1273}
1274
1275fn fetch_marker_record(markers_dir: &Path, commit_seq: u64) -> Result<CommitMarkerRecord> {
1276    let segment_id = segment_id_for_commit_seq(commit_seq);
1277    let segment_path = markers_dir.join(format!("segment-{segment_id:06}.log"));
1278    let bytes = fs::read(&segment_path)?;
1279    if bytes.len() < MARKER_SEGMENT_HEADER_BYTES {
1280        return Err(FrankenError::DatabaseCorrupt {
1281            detail: format!(
1282                "marker segment {} shorter than header: {} bytes",
1283                segment_path.display(),
1284                bytes.len()
1285            ),
1286        });
1287    }
1288    let header =
1289        MarkerSegmentHeader::decode(&bytes[..MARKER_SEGMENT_HEADER_BYTES]).map_err(|err| {
1290            FrankenError::DatabaseCorrupt {
1291                detail: format!(
1292                    "marker header decode failed for {}: {err}",
1293                    segment_path.display()
1294                ),
1295            }
1296        })?;
1297    let records = recover_valid_prefix(&bytes).map_err(|err| FrankenError::DatabaseCorrupt {
1298        detail: format!(
1299            "marker segment recover failed for {}: {err}",
1300            segment_path.display()
1301        ),
1302    })?;
1303
1304    if commit_seq < header.start_commit_seq {
1305        return Err(FrankenError::DatabaseCorrupt {
1306            detail: format!(
1307                "commit_seq {commit_seq} precedes segment start {}",
1308                header.start_commit_seq
1309            ),
1310        });
1311    }
1312    let index_u64 = commit_seq - header.start_commit_seq;
1313    let index = u64_to_usize(index_u64, "marker_index")?;
1314    let Some(record) = records.get(index) else {
1315        return Err(FrankenError::DatabaseCorrupt {
1316            detail: format!(
1317                "marker for commit_seq {commit_seq} missing in segment {}",
1318                segment_path.display()
1319            ),
1320        });
1321    };
1322    if !record.verify_marker_id() {
1323        return Err(FrankenError::DatabaseCorrupt {
1324            detail: "marker_id verification failed (reason=marker_mismatch)".to_owned(),
1325        });
1326    }
1327    if index > 0 {
1328        for i in 1..=index {
1329            if records[i].prev_marker_id != records[i - 1].marker_id {
1330                return Err(FrankenError::DatabaseCorrupt {
1331                    detail: format!("marker hash chain gap at index {i} (reason=marker_chain_gap)"),
1332                });
1333            }
1334        }
1335    }
1336    Ok(record.clone())
1337}
1338
1339fn scan_latest_marker(markers_dir: &Path) -> Result<Option<CommitMarkerRecord>> {
1340    let segments = sorted_segment_paths(markers_dir)?;
1341    let mut best: Option<CommitMarkerRecord> = None;
1342    for (_, segment_path) in segments {
1343        let bytes = fs::read(&segment_path)?;
1344        if bytes.len() < MARKER_SEGMENT_HEADER_BYTES {
1345            continue;
1346        }
1347        let Ok(records) = recover_valid_prefix(&bytes) else {
1348            continue;
1349        };
1350        if let Some(last) = records.last() {
1351            let replace = best
1352                .as_ref()
1353                .is_none_or(|existing| last.commit_seq > existing.commit_seq);
1354            if replace {
1355                best = Some(last.clone());
1356            }
1357        }
1358    }
1359    Ok(best)
1360}
1361
1362fn sorted_segment_paths(dir: &Path) -> Result<Vec<(u64, PathBuf)>> {
1363    if !dir.exists() {
1364        return Ok(Vec::new());
1365    }
1366    let mut out = Vec::new();
1367    for entry in fs::read_dir(dir)? {
1368        let entry = entry?;
1369        if !entry.file_type()?.is_file() {
1370            continue;
1371        }
1372        let name_os = entry.file_name();
1373        let Some(name) = name_os.to_str() else {
1374            continue;
1375        };
1376        let Some(segment_id) = parse_segment_id(name) else {
1377            continue;
1378        };
1379        out.push((segment_id, entry.path()));
1380    }
1381    out.sort_by_key(|(segment_id, _)| *segment_id);
1382    Ok(out)
1383}
1384
/// Extract the numeric id from a `segment-<digits>.log` file name.
///
/// Returns `None` when the prefix or suffix is missing, when the part between
/// them is empty or contains anything other than ASCII digits, or when the
/// number does not fit in a `u64`.
fn parse_segment_id(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("segment-")?.strip_suffix(".log")?;
    // `u64::from_str` tolerates a leading `+`; segment names never carry a
    // sign, so insist on plain digits before parsing.
    if digits.is_empty() || !digits.bytes().all(|byte| byte.is_ascii_digit()) {
        return None;
    }
    digits.parse().ok()
}
1389
1390fn read_object_id_at(bytes: &[u8], offset: usize, field: &str) -> Result<ObjectId> {
1391    let end = checked_add(offset, 16, field)?;
1392    if end > bytes.len() {
1393        return Err(FrankenError::DatabaseCorrupt {
1394            detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
1395        });
1396    }
1397    let mut raw = [0_u8; 16];
1398    raw.copy_from_slice(&bytes[offset..end]);
1399    Ok(ObjectId::from_bytes(raw))
1400}
1401
1402fn read_u32_le_at(bytes: &[u8], offset: usize, field: &str) -> Result<u32> {
1403    let end = checked_add(offset, 4, field)?;
1404    if end > bytes.len() {
1405        return Err(FrankenError::DatabaseCorrupt {
1406            detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
1407        });
1408    }
1409    Ok(u32::from_le_bytes(
1410        bytes[offset..end].try_into().expect("fixed 4-byte field"),
1411    ))
1412}
1413
1414fn read_u64_le_at(bytes: &[u8], offset: usize, field: &str) -> Result<u64> {
1415    let end = checked_add(offset, 8, field)?;
1416    if end > bytes.len() {
1417        return Err(FrankenError::DatabaseCorrupt {
1418            detail: format!("{field} out of bounds: end={end}, len={}", bytes.len()),
1419        });
1420    }
1421    Ok(u64::from_le_bytes(
1422        bytes[offset..end].try_into().expect("fixed 8-byte field"),
1423    ))
1424}
1425
1426fn u32_to_usize(value: u32, field: &str) -> Result<usize> {
1427    usize::try_from(value).map_err(|_| FrankenError::OutOfRange {
1428        what: field.to_owned(),
1429        value: value.to_string(),
1430    })
1431}
1432
1433fn u64_to_usize(value: u64, field: &str) -> Result<usize> {
1434    usize::try_from(value).map_err(|_| FrankenError::OutOfRange {
1435        what: field.to_owned(),
1436        value: value.to_string(),
1437    })
1438}
1439
1440fn checked_add(lhs: usize, rhs: usize, field: &str) -> Result<usize> {
1441    lhs.checked_add(rhs)
1442        .ok_or_else(|| FrankenError::DatabaseCorrupt {
1443            detail: format!("{field} overflow"),
1444        })
1445}
1446
1447#[cfg(test)]
1448mod tests {
1449    use std::fs;
1450    use std::path::Path;
1451
1452    use crate::commit_marker::MarkerSegmentHeader;
1453    use crate::symbol_log::{SymbolSegmentHeader, append_symbol_record, ensure_symbol_segment};
1454    use fsqlite_types::{ObjectId, Oti, SymbolRecord, SymbolRecordFlags, SymbolValidityWindow};
1455    use tempfile::TempDir;
1456
1457    use super::*;
1458
1459    const BEAD_ID: &str = "bd-3go.12";
1460
1461    // ── test_epoch_id_monotone ──────────────────────────────────────────
1462
1463    #[test]
1464    fn test_epoch_id_monotone() {
1465        let clock = EpochClock::new(EpochId::ZERO);
1466        let mut prev = clock.current();
1467        for i in 0..100 {
1468            let next_result = clock.increment();
1469            assert!(
1470                next_result.is_ok(),
1471                "bead_id={BEAD_ID} case=epoch_monotone_increment_{i} err={next_result:?}"
1472            );
1473            let Ok(next) = next_result else {
1474                return;
1475            };
1476            assert!(
1477                next > prev,
1478                "bead_id={BEAD_ID} case=epoch_monotone prev={} next={}",
1479                prev.get(),
1480                next.get()
1481            );
1482            prev = next;
1483        }
1484        assert_eq!(
1485            clock.current().get(),
1486            100,
1487            "bead_id={BEAD_ID} case=epoch_monotone_final"
1488        );
1489    }
1490
1491    // ── test_symbol_validity_window_rejects_future ──────────────────────
1492
1493    #[test]
1494    fn test_symbol_validity_window_rejects_future() {
1495        let current = EpochId::new(5);
1496        let window = SymbolValidityWindow::default_window(current);
1497        let future = EpochId::new(6);
1498        assert!(
1499            !window.contains(future),
1500            "bead_id={BEAD_ID} case=validity_window_rejects_future"
1501        );
1502        let result = validate_symbol_epoch(future, &window);
1503        assert!(
1504            result.is_err(),
1505            "bead_id={BEAD_ID} case=validity_window_future_epoch_error"
1506        );
1507    }
1508
1509    // ── test_symbol_validity_window_accepts_past ────────────────────────
1510
1511    #[test]
1512    fn test_symbol_validity_window_accepts_past() {
1513        let current = EpochId::new(10);
1514        let window = SymbolValidityWindow::default_window(current);
1515        for past in [0, 1, 5, 9, 10] {
1516            let epoch = EpochId::new(past);
1517            assert!(
1518                window.contains(epoch),
1519                "bead_id={BEAD_ID} case=validity_window_accepts_past epoch={past}"
1520            );
1521            let result = validate_symbol_epoch(epoch, &window);
1522            assert!(
1523                result.is_ok(),
1524                "bead_id={BEAD_ID} case=validity_window_past_epoch_ok epoch={past}"
1525            );
1526        }
1527    }
1528
1529    // ── test_epoch_scoped_key_derivation ────────────────────────────────
1530
1531    #[test]
1532    fn test_epoch_scoped_key_derivation() {
1533        let master_key = [0xAB_u8; 32];
1534        let key_5 = derive_epoch_auth_key(&master_key, EpochId::new(5));
1535        let key_6 = derive_epoch_auth_key(&master_key, EpochId::new(6));
1536        assert_ne!(
1537            key_5, key_6,
1538            "bead_id={BEAD_ID} case=epoch_keys_differ_across_epochs"
1539        );
1540        // Deterministic: same inputs produce the same key.
1541        let key_5_again = derive_epoch_auth_key(&master_key, EpochId::new(5));
1542        assert_eq!(
1543            key_5, key_5_again,
1544            "bead_id={BEAD_ID} case=epoch_key_deterministic"
1545        );
1546    }
1547
1548    // ── test_epoch_key_derivation_domain_separation ─────────────────────
1549
1550    #[test]
1551    fn test_epoch_key_derivation_domain_separation() {
1552        let dek = [0x42_u8; 32];
1553        let master_key = derive_master_key_from_dek(&dek);
1554        // Master key MUST differ from raw DEK (domain separation).
1555        assert_ne!(
1556            master_key, dek,
1557            "bead_id={BEAD_ID} case=master_key_differs_from_dek"
1558        );
1559        // Auth key for epoch 0 MUST differ from master key.
1560        let auth_key = derive_epoch_auth_key(&master_key, EpochId::ZERO);
1561        assert_ne!(
1562            auth_key.as_bytes(),
1563            &master_key,
1564            "bead_id={BEAD_ID} case=auth_key_differs_from_master"
1565        );
1566    }
1567
1568    // ── test_epoch_transition_barrier_all_arrive ────────────────────────
1569
1570    #[test]
1571    fn test_epoch_transition_barrier_all_arrive() {
1572        let clock = EpochClock::new(EpochId::new(5));
1573        let barrier = EpochBarrier::new(EpochId::new(5), 4);
1574
1575        assert!(!barrier.arrive("WriteCoordinator"));
1576        assert!(!barrier.arrive("SymbolStore"));
1577        assert!(!barrier.arrive("Replicator"));
1578        assert!(barrier.arrive("CheckpointGc"));
1579
1580        assert!(
1581            barrier.is_complete(),
1582            "bead_id={BEAD_ID} case=barrier_complete"
1583        );
1584
1585        let outcome = barrier.resolve(&clock).expect("resolve must succeed");
1586        assert_eq!(
1587            outcome,
1588            BarrierOutcome::AllArrived {
1589                new_epoch: EpochId::new(6),
1590            },
1591            "bead_id={BEAD_ID} case=barrier_all_arrived_epoch_incremented"
1592        );
1593        assert_eq!(
1594            clock.current().get(),
1595            6,
1596            "bead_id={BEAD_ID} case=clock_advanced_after_barrier"
1597        );
1598    }
1599
1600    // ── test_epoch_transition_barrier_timeout ───────────────────────────
1601
1602    #[test]
1603    fn test_epoch_transition_barrier_timeout() {
1604        let clock = EpochClock::new(EpochId::new(5));
1605        let barrier = EpochBarrier::new(EpochId::new(5), 4);
1606
1607        barrier.arrive("WriteCoordinator");
1608        barrier.arrive("SymbolStore");
1609        barrier.arrive("Replicator");
1610        // CheckpointGc does NOT arrive.
1611
1612        let outcome = barrier.resolve(&clock).expect("resolve must succeed");
1613        assert_eq!(
1614            outcome,
1615            BarrierOutcome::Timeout {
1616                arrived: 3,
1617                expected: 4,
1618            },
1619            "bead_id={BEAD_ID} case=barrier_timeout_epoch_unchanged"
1620        );
1621        assert_eq!(
1622            clock.current().get(),
1623            5,
1624            "bead_id={BEAD_ID} case=clock_unchanged_after_timeout"
1625        );
1626    }
1627
1628    // ── test_epoch_bootstrap_from_ecs_root ──────────────────────────────
1629
1630    #[test]
1631    fn test_epoch_bootstrap_from_ecs_root() {
1632        // Before RootManifest is decoded, use EcsRootPointer.ecs_epoch as
1633        // provisional upper bound. Symbols with epoch_id > root_epoch must
1634        // be rejected.
1635        let root_epoch = EpochId::new(7);
1636        let window = SymbolValidityWindow::default_window(root_epoch);
1637
1638        // Epoch 8 (future) → rejected.
1639        assert!(
1640            !window.contains(EpochId::new(8)),
1641            "bead_id={BEAD_ID} case=bootstrap_rejects_future"
1642        );
1643        // Epoch 7 (current) → accepted.
1644        assert!(
1645            window.contains(EpochId::new(7)),
1646            "bead_id={BEAD_ID} case=bootstrap_accepts_current"
1647        );
1648        // Epoch 0 (past) → accepted.
1649        assert!(
1650            window.contains(EpochId::ZERO),
1651            "bead_id={BEAD_ID} case=bootstrap_accepts_zero"
1652        );
1653    }
1654
1655    // ── test_barrier_cancelled ──────────────────────────────────────────
1656
1657    #[test]
1658    fn test_barrier_cancelled() {
1659        let clock = EpochClock::new(EpochId::new(3));
1660        let barrier = EpochBarrier::new(EpochId::new(3), 2);
1661
1662        barrier.arrive("WriteCoordinator");
1663        barrier.cancel();
1664
1665        let outcome = barrier.resolve(&clock).expect("resolve must succeed");
1666        assert_eq!(
1667            outcome,
1668            BarrierOutcome::Cancelled,
1669            "bead_id={BEAD_ID} case=barrier_cancelled_epoch_unchanged"
1670        );
1671        assert_eq!(
1672            clock.current().get(),
1673            3,
1674            "bead_id={BEAD_ID} case=clock_unchanged_after_cancel"
1675        );
1676    }
1677
1678    // ── test_epoch_clock_store_and_recover ──────────────────────────────
1679
1680    #[test]
1681    fn test_epoch_clock_store_and_recover() {
1682        let clock = EpochClock::new(EpochId::ZERO);
1683        clock.store(EpochId::new(42));
1684        assert_eq!(
1685            clock.current().get(),
1686            42,
1687            "bead_id={BEAD_ID} case=clock_store_recovery"
1688        );
1689        let next = clock.increment().expect("increment after store");
1690        assert_eq!(
1691            next.get(),
1692            43,
1693            "bead_id={BEAD_ID} case=clock_increment_after_store"
1694        );
1695    }
1696
1697    // ── test_validity_window_boundary ───────────────────────────────────
1698
1699    #[test]
1700    fn test_validity_window_boundary() {
1701        let window = SymbolValidityWindow::new(EpochId::new(3), EpochId::new(7));
1702        assert!(
1703            !window.contains(EpochId::new(2)),
1704            "bead_id={BEAD_ID} case=window_below_lower_bound"
1705        );
1706        assert!(
1707            window.contains(EpochId::new(3)),
1708            "bead_id={BEAD_ID} case=window_at_lower_bound"
1709        );
1710        assert!(
1711            window.contains(EpochId::new(5)),
1712            "bead_id={BEAD_ID} case=window_within_bounds"
1713        );
1714        assert!(
1715            window.contains(EpochId::new(7)),
1716            "bead_id={BEAD_ID} case=window_at_upper_bound"
1717        );
1718        assert!(
1719            !window.contains(EpochId::new(8)),
1720            "bead_id={BEAD_ID} case=window_above_upper_bound"
1721        );
1722    }
1723
1724    const ROOT_BEAD_ID: &str = "bd-1hi.25";
1725
1726    fn make_object_id(seed: u8) -> ObjectId {
1727        ObjectId::from_bytes([seed; 16])
1728    }
1729
1730    fn test_master_key() -> [u8; 32] {
1731        [0xA5; 32]
1732    }
1733
1734    fn create_layout() -> (TempDir, NativeBootstrapLayout) {
1735        let temp_dir = TempDir::new().expect("tempdir");
1736        let layout = NativeBootstrapLayout::new(temp_dir.path().join("ecs"));
1737        std::fs::create_dir_all(layout.symbols_dir()).expect("create symbols dir");
1738        std::fs::create_dir_all(layout.markers_dir()).expect("create markers dir");
1739        (temp_dir, layout)
1740    }
1741
1742    fn write_single_symbol_object(
1743        symbols_dir: &Path,
1744        segment_id: u64,
1745        epoch_id: EpochId,
1746        object_id: ObjectId,
1747        payload: &[u8],
1748    ) {
1749        let header =
1750            SymbolSegmentHeader::new(segment_id, epoch_id.get(), 1_700_000_000 + segment_id);
1751        let segment_path = symbols_dir.join(format!("segment-{segment_id:06}.log"));
1752        ensure_symbol_segment(&segment_path, header).expect("ensure symbol segment");
1753        let symbol_size = u32::try_from(payload.len()).expect("payload fits u32");
1754        let oti = Oti {
1755            f: u64::from(symbol_size),
1756            al: 1,
1757            t: symbol_size,
1758            z: 1,
1759            n: 1,
1760        };
1761        let record = SymbolRecord::new(
1762            object_id,
1763            oti,
1764            0,
1765            payload.to_vec(),
1766            SymbolRecordFlags::SYSTEMATIC_RUN_START,
1767        );
1768        append_symbol_record(symbols_dir, header, &record).expect("append symbol");
1769    }
1770
1771    fn write_marker_segment(
1772        markers_dir: &Path,
1773        start_commit_seq: u64,
1774        records: &[CommitMarkerRecord],
1775    ) {
1776        let segment_id = segment_id_for_commit_seq(start_commit_seq);
1777        let header = MarkerSegmentHeader::new(segment_id, start_commit_seq);
1778        let mut bytes = Vec::from(header.encode());
1779        for record in records {
1780            bytes.extend_from_slice(&record.encode());
1781        }
1782        let segment_path = markers_dir.join(format!("segment-{segment_id:06}.log"));
1783        std::fs::write(segment_path, bytes).expect("write marker segment");
1784    }
1785
1786    fn make_marker(commit_seq: u64, prev: [u8; 16], salt: u8) -> CommitMarkerRecord {
1787        CommitMarkerRecord::new(
1788            commit_seq,
1789            1_800_000_000_000_000_000 + commit_seq,
1790            [salt; 16],
1791            [salt.wrapping_add(1); 16],
1792            prev,
1793        )
1794    }
1795
1796    fn make_manifest(
1797        database_name: &str,
1798        current_commit: ObjectId,
1799        commit_seq: u64,
1800        schema_snapshot: ObjectId,
1801        schema_epoch: u64,
1802        ecs_epoch: EpochId,
1803        checkpoint_base: ObjectId,
1804    ) -> RootManifest {
1805        RootManifest {
1806            database_name: database_name.to_owned(),
1807            current_commit,
1808            commit_seq,
1809            schema_snapshot,
1810            schema_epoch,
1811            ecs_epoch,
1812            checkpoint_base,
1813            gc_horizon: commit_seq,
1814            created_at: 1_800_000_000,
1815            updated_at: 1_800_000_123,
1816        }
1817    }
1818
1819    fn must_err_contains<T: std::fmt::Debug>(result: Result<T>, needle: &str, case: &str) {
1820        let err = result.expect_err(case);
1821        let detail = err.to_string();
1822        assert!(
1823            detail.contains(needle),
1824            "bead_id={ROOT_BEAD_ID} case={case} expected_substring={needle} actual={detail}"
1825        );
1826    }
1827
1828    fn write_bootstrap_objects(
1829        layout: &NativeBootstrapLayout,
1830        root_epoch: EpochId,
1831        manifest_id: ObjectId,
1832        manifest: &RootManifest,
1833        schema_payload: &[u8],
1834        checkpoint_payload: &[u8],
1835        markers: &[CommitMarkerRecord],
1836    ) {
1837        let manifest_bytes = manifest.encode().expect("encode manifest");
1838        write_single_symbol_object(
1839            layout.symbols_dir().as_path(),
1840            1,
1841            root_epoch,
1842            manifest_id,
1843            &manifest_bytes,
1844        );
1845        write_single_symbol_object(
1846            layout.symbols_dir().as_path(),
1847            1,
1848            root_epoch,
1849            manifest.schema_snapshot,
1850            schema_payload,
1851        );
1852        write_single_symbol_object(
1853            layout.symbols_dir().as_path(),
1854            1,
1855            root_epoch,
1856            manifest.checkpoint_base,
1857            checkpoint_payload,
1858        );
1859        if let Some(first) = markers.first() {
1860            write_marker_segment(layout.markers_dir().as_path(), first.commit_seq, markers);
1861        }
1862    }
1863
1864    fn write_valid_bootstrap_fixture(
1865        layout: &NativeBootstrapLayout,
1866        root_epoch: EpochId,
1867    ) -> (ObjectId, RootManifest, CommitMarkerRecord, Vec<u8>, Vec<u8>) {
1868        let manifest_id = make_object_id(0x70);
1869        let schema_payload = b"schema-cache-v1".to_vec();
1870        let checkpoint_payload = b"checkpoint-cache-v1".to_vec();
1871        let marker = make_marker(0, [0_u8; 16], 0x71);
1872        let manifest = make_manifest(
1873            "db-valid",
1874            ObjectId::from_bytes(marker.marker_id),
1875            marker.commit_seq,
1876            make_object_id(0x72),
1877            1,
1878            root_epoch,
1879            make_object_id(0x73),
1880        );
1881        write_bootstrap_objects(
1882            layout,
1883            root_epoch,
1884            manifest_id,
1885            &manifest,
1886            &schema_payload,
1887            &checkpoint_payload,
1888            std::slice::from_ref(&marker),
1889        );
1890        (
1891            manifest_id,
1892            manifest,
1893            marker,
1894            schema_payload,
1895            checkpoint_payload,
1896        )
1897    }
1898
1899    #[test]
1900    fn test_ecs_root_pointer_encode_decode() {
1901        let pointer = EcsRootPointer::unauthed(make_object_id(0x11), EpochId::new(7));
1902        let encoded = pointer.encode();
1903        assert_eq!(encoded.len(), ECS_ROOT_POINTER_BYTES);
1904        let decoded = EcsRootPointer::decode(&encoded, false, None).expect("decode root pointer");
1905        assert_eq!(
1906            decoded, pointer,
1907            "bead_id={ROOT_BEAD_ID} case=root_roundtrip"
1908        );
1909    }
1910
1911    #[test]
1912    fn test_ecs_root_pointer_magic() {
1913        let pointer = EcsRootPointer::unauthed(make_object_id(0x22), EpochId::new(3));
1914        let mut encoded = pointer.encode();
1915        encoded[0] = b'X';
1916        let result = EcsRootPointer::decode(&encoded, false, None);
1917        assert!(
1918            result.is_err(),
1919            "bead_id={ROOT_BEAD_ID} case=root_bad_magic"
1920        );
1921    }
1922
1923    #[test]
1924    fn test_ecs_root_pointer_checksum_tamper() {
1925        let pointer = EcsRootPointer::unauthed(make_object_id(0x33), EpochId::new(9));
1926        let mut encoded = pointer.encode();
1927        encoded[9] ^= 0xFF;
1928        let result = EcsRootPointer::decode(&encoded, false, None);
1929        assert!(
1930            result.is_err(),
1931            "bead_id={ROOT_BEAD_ID} case=root_checksum_tamper"
1932        );
1933    }
1934
1935    #[test]
1936    fn test_root_auth_tag_verification() {
1937        let key = test_master_key();
1938        let pointer = EcsRootPointer::authed(make_object_id(0x44), EpochId::new(12), &key);
1939        let encoded = pointer.encode();
1940        let decoded =
1941            EcsRootPointer::decode(&encoded, true, Some(&key)).expect("auth decode succeeds");
1942        assert_eq!(decoded, pointer);
1943
1944        let mut tampered = encoded;
1945        tampered[40] ^= 0x01;
1946        let result = EcsRootPointer::decode(&tampered, true, Some(&key));
1947        assert!(
1948            result.is_err(),
1949            "bead_id={ROOT_BEAD_ID} case=root_auth_tamper"
1950        );
1951    }
1952
1953    #[test]
1954    fn test_root_auth_tag_zero_when_off() {
1955        let pointer = build_root_pointer(make_object_id(0x55), EpochId::new(2), false, None)
1956            .expect("build unauthed root");
1957        assert_eq!(pointer.root_auth_tag, [0_u8; 16]);
1958        let decoded =
1959            EcsRootPointer::decode(&pointer.encode(), false, None).expect("decode off mode");
1960        assert_eq!(decoded.root_auth_tag, [0_u8; 16]);
1961    }
1962
1963    #[test]
1964    fn test_root_manifest_encode_decode() {
1965        let manifest = make_manifest(
1966            "db-main",
1967            make_object_id(0x10),
1968            5,
1969            make_object_id(0x20),
1970            3,
1971            EpochId::new(7),
1972            make_object_id(0x30),
1973        );
1974        let encoded = manifest.encode().expect("encode manifest");
1975        let decoded = RootManifest::decode(&encoded).expect("decode manifest");
1976        assert_eq!(decoded, manifest);
1977    }
1978
1979    #[test]
1980    fn test_root_manifest_magic() {
1981        let manifest = make_manifest(
1982            "db-main",
1983            make_object_id(0x10),
1984            5,
1985            make_object_id(0x20),
1986            3,
1987            EpochId::new(7),
1988            make_object_id(0x30),
1989        );
1990        let mut encoded = manifest.encode().expect("encode manifest");
1991        encoded[0] = b'X';
1992        assert!(
1993            RootManifest::decode(&encoded).is_err(),
1994            "bead_id={ROOT_BEAD_ID} case=manifest_bad_magic"
1995        );
1996    }
1997
1998    #[test]
1999    fn test_bootstrap_step_4_epoch_guard() {
2000        let (_tmp, layout) = create_layout();
2001        let manifest_id = make_object_id(0x66);
2002        let schema_id = make_object_id(0x67);
2003        let checkpoint_id = make_object_id(0x68);
2004        let marker = make_marker(0, [0_u8; 16], 0x60);
2005        let manifest = make_manifest(
2006            "future-segment",
2007            ObjectId::from_bytes(marker.marker_id),
2008            marker.commit_seq,
2009            schema_id,
2010            1,
2011            EpochId::new(3),
2012            checkpoint_id,
2013        );
2014        let manifest_bytes = manifest.encode().expect("encode manifest");
2015        write_single_symbol_object(
2016            layout.symbols_dir().as_path(),
2017            1,
2018            EpochId::new(4),
2019            manifest_id,
2020            &manifest_bytes,
2021        );
2022        let pointer =
2023            build_root_pointer(manifest_id, EpochId::new(3), false, None).expect("build root");
2024        write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2025
2026        let result = bootstrap_native_mode(&layout, false, None);
2027        assert!(
2028            result.is_err(),
2029            "bead_id={ROOT_BEAD_ID} case=future_epoch_guard"
2030        );
2031    }
2032
2033    #[test]
2034    fn test_bootstrap_step_5_epoch_invariant() {
2035        let (_tmp, layout) = create_layout();
2036        let root_epoch = EpochId::new(7);
2037        let manifest_id = make_object_id(0x74);
2038        let marker = make_marker(0, [0_u8; 16], 0x75);
2039        let manifest = make_manifest(
2040            "epoch-mismatch",
2041            ObjectId::from_bytes(marker.marker_id),
2042            marker.commit_seq,
2043            make_object_id(0x76),
2044            1,
2045            EpochId::new(8),
2046            make_object_id(0x77),
2047        );
2048        write_bootstrap_objects(
2049            &layout,
2050            root_epoch,
2051            manifest_id,
2052            &manifest,
2053            b"schema",
2054            b"checkpoint",
2055            &[marker],
2056        );
2057        let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2058        write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2059        must_err_contains(
2060            bootstrap_native_mode(&layout, false, None),
2061            "epoch_mismatch",
2062            "bootstrap_step_5_epoch_invariant",
2063        );
2064    }
2065
2066    #[test]
2067    fn test_bootstrap_step_6_marker_verification() {
2068        let (_tmp, layout) = create_layout();
2069        let root_epoch = EpochId::new(9);
2070        let manifest_id = make_object_id(0x78);
2071        let marker = make_marker(0, [0_u8; 16], 0x79);
2072        let manifest = make_manifest(
2073            "marker-mismatch",
2074            make_object_id(0x7A),
2075            marker.commit_seq,
2076            make_object_id(0x7B),
2077            1,
2078            root_epoch,
2079            make_object_id(0x7C),
2080        );
2081        write_bootstrap_objects(
2082            &layout,
2083            root_epoch,
2084            manifest_id,
2085            &manifest,
2086            b"schema",
2087            b"checkpoint",
2088            &[marker],
2089        );
2090        let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2091        write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2092        must_err_contains(
2093            bootstrap_native_mode(&layout, false, None),
2094            "marker_mismatch",
2095            "bootstrap_step_6_marker_verification",
2096        );
2097    }
2098
2099    #[test]
2100    fn test_bootstrap_full_sequence() {
2101        let (_tmp, layout) = create_layout();
2102        let root_epoch = EpochId::new(11);
2103        let (manifest_id, manifest, marker, schema_payload, checkpoint_payload) =
2104            write_valid_bootstrap_fixture(&layout, root_epoch);
2105        let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2106        write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2107
2108        let state = bootstrap_native_mode(&layout, false, None).expect("bootstrap ok");
2109        assert_eq!(state.root_pointer, pointer);
2110        assert_eq!(state.manifest, manifest);
2111        assert_eq!(state.latest_marker, marker);
2112        assert_eq!(state.schema_snapshot_bytes, schema_payload);
2113        assert_eq!(state.checkpoint_base_bytes, checkpoint_payload);
2114    }
2115
2116    #[test]
2117    fn test_bootstrap_corrupted_root_recovery() {
2118        let (_tmp, layout) = create_layout();
2119        let root_epoch = EpochId::new(12);
2120        let (_manifest_id, manifest, marker, schema_payload, checkpoint_payload) =
2121            write_valid_bootstrap_fixture(&layout, root_epoch);
2122        fs::write(layout.root_path(), [0xFF_u8; 7]).expect("write corrupt root");
2123
2124        let recovered = bootstrap_native_mode_with_recovery(&layout, false, None).expect("recover");
2125        assert_eq!(recovered.manifest, manifest);
2126        assert_eq!(recovered.latest_marker, marker);
2127        assert_eq!(recovered.schema_snapshot_bytes, schema_payload);
2128        assert_eq!(recovered.checkpoint_base_bytes, checkpoint_payload);
2129        let persisted =
2130            read_root_pointer(&layout.root_path(), false, None).expect("read recovered");
2131        assert_eq!(
2132            persisted.manifest_object_id,
2133            recovered.root_pointer.manifest_object_id
2134        );
2135        assert_eq!(persisted.ecs_epoch, recovered.root_pointer.ecs_epoch);
2136    }
2137
2138    #[test]
2139    fn test_crash_safe_root_update() {
2140        let (_tmp, layout) = create_layout();
2141        let pointer_a = EcsRootPointer::unauthed(make_object_id(0x80), EpochId::new(1));
2142        let pointer_b = EcsRootPointer::unauthed(make_object_id(0x81), EpochId::new(2));
2143        write_root_pointer_atomic(&layout.root_path(), pointer_a).expect("write A");
2144        write_root_pointer_atomic(&layout.root_path(), pointer_b).expect("write B");
2145        let decoded = read_root_pointer(&layout.root_path(), false, None).expect("decode");
2146        assert_eq!(
2147            decoded, pointer_b,
2148            "bead_id={ROOT_BEAD_ID} case=root_atomic_swap"
2149        );
2150        let entries = fs::read_dir(layout.ecs_dir.as_path()).expect("list ecs dir");
2151        for entry in entries {
2152            let entry = entry.expect("entry");
2153            let name = entry.file_name();
2154            let name = name.to_string_lossy();
2155            assert!(
2156                !name.starts_with(".root.tmp."),
2157                "bead_id={ROOT_BEAD_ID} case=temp_root_file_leaked file={name}"
2158            );
2159        }
2160    }
2161
2162    #[test]
2163    fn prop_root_pointer_roundtrip() {
2164        let key = test_master_key();
2165        for seed in [0_u8, 1, 17, 99, 255] {
2166            for epoch in [0_u64, 1, 2, 17, 255, 4_096, 1 << 20] {
2167                let id = make_object_id(seed);
2168                let plain = EcsRootPointer::unauthed(id, EpochId::new(epoch));
2169                let plain_roundtrip =
2170                    EcsRootPointer::decode(&plain.encode(), false, None).expect("plain decode");
2171                assert_eq!(plain_roundtrip, plain);
2172                let authed = EcsRootPointer::authed(id, EpochId::new(epoch), &key);
2173                let authed_roundtrip = EcsRootPointer::decode(&authed.encode(), true, Some(&key))
2174                    .expect("auth decode");
2175                assert_eq!(authed_roundtrip, authed);
2176            }
2177        }
2178    }
2179
2180    #[test]
2181    fn test_bootstrap_rejects_marker_chain_gap() {
2182        let (_tmp, layout) = create_layout();
2183        let root_epoch = EpochId::new(13);
2184        let manifest_id = make_object_id(0x82);
2185        let m0 = make_marker(0, [0_u8; 16], 0x83);
2186        let m1 = make_marker(1, m0.marker_id, 0x84);
2187        let m2 = make_marker(2, [0xEE_u8; 16], 0x85);
2188        let manifest = make_manifest(
2189            "marker-gap",
2190            ObjectId::from_bytes(m2.marker_id),
2191            m2.commit_seq,
2192            make_object_id(0x86),
2193            1,
2194            root_epoch,
2195            make_object_id(0x87),
2196        );
2197        write_bootstrap_objects(
2198            &layout,
2199            root_epoch,
2200            manifest_id,
2201            &manifest,
2202            b"schema-gap",
2203            b"checkpoint-gap",
2204            &[m0, m1, m2],
2205        );
2206        let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2207        write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2208        must_err_contains(
2209            bootstrap_native_mode(&layout, false, None),
2210            "marker_chain_gap",
2211            "bootstrap_rejects_marker_chain_gap",
2212        );
2213    }
2214
2215    #[test]
2216    fn test_bootstrap_schema_snapshot_loads() {
2217        let (_tmp, layout) = create_layout();
2218        let root_epoch = EpochId::new(14);
2219        let (manifest_id, _manifest, _marker, schema_payload, _checkpoint_payload) =
2220            write_valid_bootstrap_fixture(&layout, root_epoch);
2221        let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2222        write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2223        let state = bootstrap_native_mode(&layout, false, None).expect("bootstrap");
2224        assert_eq!(state.schema_snapshot_bytes, schema_payload);
2225    }
2226
2227    #[test]
2228    fn test_bootstrap_checkpoint_base_warms_cache() {
2229        let (_tmp, layout) = create_layout();
2230        let root_epoch = EpochId::new(15);
2231        let (manifest_id, _manifest, _marker, _schema_payload, checkpoint_payload) =
2232            write_valid_bootstrap_fixture(&layout, root_epoch);
2233        let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2234        write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2235        let state = bootstrap_native_mode(&layout, false, None).expect("bootstrap");
2236        assert_eq!(state.checkpoint_base_bytes, checkpoint_payload);
2237    }
2238
2239    #[test]
2240    fn test_bootstrap_happy_path_from_root() {
2241        test_bootstrap_full_sequence();
2242    }
2243
2244    #[test]
2245    fn test_bootstrap_corrupt_root_pointer_recovers_by_scan() {
2246        test_bootstrap_corrupted_root_recovery();
2247    }
2248
2249    #[test]
2250    fn test_bootstrap_root_auth_mismatch_fails() {
2251        let (_tmp, layout) = create_layout();
2252        let root_epoch = EpochId::new(16);
2253        let (manifest_id, _manifest, _marker, _schema_payload, _checkpoint_payload) =
2254            write_valid_bootstrap_fixture(&layout, root_epoch);
2255        let good_key = test_master_key();
2256        let bad_key = [0x5A_u8; 32];
2257        let pointer =
2258            build_root_pointer(manifest_id, root_epoch, true, Some(&good_key)).expect("root");
2259        write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2260        must_err_contains(
2261            bootstrap_native_mode(&layout, true, Some(&bad_key)),
2262            "auth_failed",
2263            "bootstrap_root_auth_mismatch_fails",
2264        );
2265    }
2266
2267    #[test]
2268    fn test_bootstrap_root_pointer_corrupt_checksum_fails_then_scan() {
2269        let (_tmp, layout) = create_layout();
2270        let root_epoch = EpochId::new(17);
2271        let (manifest_id, _manifest, _marker, _schema_payload, _checkpoint_payload) =
2272            write_valid_bootstrap_fixture(&layout, root_epoch);
2273        let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2274        let mut root_bytes = pointer.encode();
2275        root_bytes[10] ^= 0xAA;
2276        fs::write(layout.root_path(), root_bytes).expect("write corrupt root");
2277        let state = bootstrap_native_mode_with_recovery(&layout, false, None).expect("recovered");
2278        assert_eq!(state.root_pointer.manifest_object_id, manifest_id);
2279        assert_eq!(state.root_pointer.ecs_epoch, root_epoch);
2280    }
2281
2282    #[test]
2283    fn test_e2e_native_mode_open_close_reopen() {
2284        let (_tmp, layout) = create_layout();
2285        let root_epoch = EpochId::new(18);
2286        let (manifest_id, manifest, marker, _schema_payload, _checkpoint_payload) =
2287            write_valid_bootstrap_fixture(&layout, root_epoch);
2288        let pointer = build_root_pointer(manifest_id, root_epoch, false, None).expect("root");
2289        write_root_pointer_atomic(&layout.root_path(), pointer).expect("write root");
2290
2291        let first_open = bootstrap_native_mode(&layout, false, None).expect("first open");
2292        let second_open = bootstrap_native_mode(&layout, false, None).expect("second open");
2293        assert_eq!(first_open.manifest, manifest);
2294        assert_eq!(first_open.latest_marker, marker);
2295        assert_eq!(
2296            second_open.manifest.current_commit,
2297            first_open.manifest.current_commit
2298        );
2299        assert_eq!(second_open.root_pointer, first_open.root_pointer);
2300
2301        fs::write(layout.root_path(), [0_u8; 9]).expect("corrupt root");
2302        let recovered = bootstrap_native_mode_with_recovery(&layout, false, None).expect("reopen");
2303        assert_eq!(
2304            recovered.manifest.current_commit,
2305            first_open.manifest.current_commit
2306        );
2307        assert_eq!(
2308            recovered.manifest.commit_seq,
2309            first_open.manifest.commit_seq
2310        );
2311    }
2312
2313    #[test]
2314    fn test_e2e_bootstrap_cold_start() {
2315        test_bootstrap_full_sequence();
2316    }
2317
2318    #[test]
2319    fn test_e2e_bootstrap_after_crash() {
2320        test_bootstrap_root_pointer_corrupt_checksum_fails_then_scan();
2321    }
2322
2323    #[test]
2324    fn test_e2e_bootstrap_schema_migration() {
2325        let (_tmp, layout) = create_layout();
2326        let root_epoch = EpochId::new(19);
2327        let manifest_id_v1 = make_object_id(0x88);
2328        let marker0 = make_marker(0, [0_u8; 16], 0x89);
2329        let marker1 = make_marker(1, marker0.marker_id, 0x8A);
2330        write_marker_segment(
2331            layout.markers_dir().as_path(),
2332            0,
2333            &[marker0, marker1.clone()],
2334        );
2335
2336        let schema_v1 = make_object_id(0x8B);
2337        let schema_v2 = make_object_id(0x8C);
2338        let checkpoint_id = make_object_id(0x8D);
2339        let manifest_v1 = make_manifest(
2340            "schema-v1",
2341            ObjectId::from_bytes(marker1.marker_id),
2342            1,
2343            schema_v1,
2344            1,
2345            root_epoch,
2346            checkpoint_id,
2347        );
2348        let manifest_v2 = make_manifest(
2349            "schema-v2",
2350            ObjectId::from_bytes(marker1.marker_id),
2351            1,
2352            schema_v2,
2353            2,
2354            root_epoch,
2355            checkpoint_id,
2356        );
2357        let manifest_id_v2 = make_object_id(0x8E);
2358        write_single_symbol_object(
2359            layout.symbols_dir().as_path(),
2360            1,
2361            root_epoch,
2362            manifest_id_v1,
2363            &manifest_v1.encode().expect("manifest v1"),
2364        );
2365        write_single_symbol_object(
2366            layout.symbols_dir().as_path(),
2367            1,
2368            root_epoch,
2369            manifest_id_v2,
2370            &manifest_v2.encode().expect("manifest v2"),
2371        );
2372        write_single_symbol_object(
2373            layout.symbols_dir().as_path(),
2374            1,
2375            root_epoch,
2376            schema_v1,
2377            b"schema-v1",
2378        );
2379        write_single_symbol_object(
2380            layout.symbols_dir().as_path(),
2381            1,
2382            root_epoch,
2383            schema_v2,
2384            b"schema-v2",
2385        );
2386        write_single_symbol_object(
2387            layout.symbols_dir().as_path(),
2388            1,
2389            root_epoch,
2390            checkpoint_id,
2391            b"checkpoint",
2392        );
2393
2394        let pointer_v2 = build_root_pointer(manifest_id_v2, root_epoch, false, None).expect("root");
2395        write_root_pointer_atomic(&layout.root_path(), pointer_v2).expect("write root");
2396        let state = bootstrap_native_mode(&layout, false, None).expect("bootstrap");
2397        assert_eq!(state.manifest.schema_epoch, 2);
2398        assert_eq!(state.schema_snapshot_bytes, b"schema-v2".to_vec());
2399    }
2400
2401    #[test]
2402    fn test_ecs_root_pointer_checksum_roundtrip() {
2403        test_ecs_root_pointer_encode_decode();
2404    }
2405
2406    #[test]
2407    fn test_ecs_root_pointer_auth_tag_verifies() {
2408        test_root_auth_tag_verification();
2409    }
2410
2411    #[test]
2412    fn test_bootstrap_future_epoch_guard() {
2413        test_bootstrap_step_4_epoch_guard();
2414    }
2415
2416    #[test]
2417    fn test_root_manifest_epoch_must_match_root_pointer() {
2418        test_bootstrap_step_5_epoch_invariant();
2419    }
2420
2421    #[test]
2422    fn test_bootstrap_commit_marker_matches_current_commit() {
2423        test_bootstrap_step_6_marker_verification();
2424    }
2425
2426    #[test]
2427    fn test_bd_1hi_25_unit_compliance_gate() {
2428        assert_eq!(ROOT_BOOTSTRAP_BEAD_ID, "bd-1hi.25");
2429        assert_eq!(ROOT_BOOTSTRAP_LOGGING_STANDARD, "bd-1fpm");
2430        assert_eq!(ECS_ROOT_POINTER_MAGIC, *b"FSRT");
2431        assert_eq!(ROOT_MANIFEST_MAGIC, *b"FSQLROOT");
2432    }
2433
2434    #[test]
2435    fn prop_bd_1hi_25_structure_compliance() {
2436        for name in ["segment-000001.log", "segment-999999.log"] {
2437            assert!(
2438                parse_segment_id(name).is_some(),
2439                "bead_id={ROOT_BEAD_ID} case=parse_segment_id_valid name={name}"
2440            );
2441        }
2442        for name in ["segment.log", "segment-aa.log", "other-000001.log"] {
2443            assert!(
2444                parse_segment_id(name).is_none(),
2445                "bead_id={ROOT_BEAD_ID} case=parse_segment_id_invalid name={name}"
2446            );
2447        }
2448    }
2449
2450    #[test]
2451    fn test_e2e_bd_1hi_25_compliance() {
2452        test_e2e_native_mode_open_close_reopen();
2453    }
2454}