Skip to main content

fsqlite_core/
commit_marker.rs

1//! Commit Marker Stream Format (§3.5.4 + §3.5.4.1, bd-1hi.23).
2//!
3//! The marker stream under `ecs/markers/` is the total order of commits in
4//! Native mode.  It is the authoritative, tamper-evident, seekable commit log.
5//!
6//! On-disk encoding: all fixed-width integers are **little-endian** (§3.5.1).
7//! Sizes are byte-exact — never derived from `mem::size_of::<T>()`.
8
9// ---------------------------------------------------------------------------
10// Constants
11// ---------------------------------------------------------------------------
12
/// Magic bytes for a marker segment header: "FSMK".
pub const MARKER_SEGMENT_MAGIC: [u8; 4] = *b"FSMK";

/// Current format version.
pub const MARKER_FORMAT_VERSION: u32 = 1;

/// Byte size of [`MarkerSegmentHeader`] on disk.
///
/// magic(4) + version(4) + segment_id(8) + start_commit_seq(8) +
/// record_size(4) + header_xxh3(8) = 36.
pub const MARKER_SEGMENT_HEADER_BYTES: usize = 36;

/// Byte size of [`CommitMarkerRecord`] on disk.
///
/// hashed prefix(64) + marker_id(16) + record_xxh3(8) = 88.
pub const COMMIT_MARKER_RECORD_BYTES: usize = 88;

/// Default number of markers per segment (fixed rotation policy).
pub const MARKERS_PER_SEGMENT: u64 = 1_000_000;

/// Domain separation tag for marker_id computation.
const MARKER_ID_DOMAIN: &[u8] = b"fsqlite:marker:v1";

/// Size of a marker_id or object_id in bytes.
const ID_SIZE: usize = 16;

/// Byte length of the record prefix used for marker_id hashing.
/// commit_seq(8) + commit_time_unix_ns(8) + capsule_object_id(16) +
/// proof_object_id(16) + prev_marker_id(16) = 64.
const RECORD_PREFIX_BYTES: usize = 64;

// Compile-time layout checks.  The sizes above are deliberately spelled out as
// literals (the on-disk format is byte-exact), so these assertions exist only
// to make the build fail if one of them is edited without the others.
const _: () = assert!(MARKER_SEGMENT_HEADER_BYTES == 4 + 4 + 8 + 8 + 4 + 8);
const _: () = assert!(RECORD_PREFIX_BYTES == 8 + 8 + 3 * ID_SIZE);
const _: () = assert!(COMMIT_MARKER_RECORD_BYTES == RECORD_PREFIX_BYTES + ID_SIZE + 8);
// The header stores the record size in a u32 field.
const _: () = assert!(COMMIT_MARKER_RECORD_BYTES <= u32::MAX as usize);
38
39// ---------------------------------------------------------------------------
40// Errors
41// ---------------------------------------------------------------------------
42
/// Failure modes of marker stream decoding and integrity checking.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MarkerError {
    /// Fewer than [`MARKER_SEGMENT_HEADER_BYTES`] bytes were supplied for a header.
    HeaderTooShort,
    /// The segment header does not start with the expected magic bytes.
    BadMagic,
    /// Header xxh3 checksum mismatch.
    ///
    /// `expected` is the checksum recomputed from the header bytes, `actual`
    /// is the value stored in the header.
    HeaderChecksumMismatch { expected: u64, actual: u64 },
    /// Fewer than [`COMMIT_MARKER_RECORD_BYTES`] bytes were supplied for a record.
    RecordTooShort,
    /// Record xxh3 checksum mismatch.
    ///
    /// `expected` is the checksum recomputed from the record bytes, `actual`
    /// is the value stored in the record.
    RecordChecksumMismatch { expected: u64, actual: u64 },
    /// The format version found on disk is not one this code understands.
    UnsupportedVersion { version: u32 },
    /// The record size stored in the header differs from the expected size.
    RecordSizeMismatch { expected: u32, actual: u32 },
    /// A record's `commit_seq` does not match its slot position.
    CommitSeqMismatch { expected: u64, actual: u64 },
    /// The segment ends in an incomplete (torn) tail.
    TornTail {
        complete_records: u64,
        trailing_bytes: usize,
    },
}

impl std::fmt::Display for MarkerError {
    #[allow(clippy::too_many_lines)]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every payload field is `Copy`, so matching on `*self` binds by value.
        match *self {
            Self::HeaderTooShort => write!(f, "marker segment header too short"),
            Self::BadMagic => write!(f, "bad magic in marker segment header"),
            Self::RecordTooShort => write!(f, "commit marker record too short"),
            Self::HeaderChecksumMismatch { expected, actual } => write!(
                f,
                "marker header xxh3 mismatch: expected {expected:#018X}, got {actual:#018X}"
            ),
            Self::RecordChecksumMismatch { expected, actual } => write!(
                f,
                "marker record xxh3 mismatch: expected {expected:#018X}, got {actual:#018X}"
            ),
            Self::UnsupportedVersion { version } => {
                write!(f, "unsupported marker format version: {version}")
            }
            Self::RecordSizeMismatch { expected, actual } => write!(
                f,
                "marker record size mismatch: expected {expected}, got {actual}"
            ),
            Self::CommitSeqMismatch { expected, actual } => {
                write!(f, "commit_seq mismatch: expected {expected}, got {actual}")
            }
            Self::TornTail {
                complete_records,
                trailing_bytes,
            } => write!(
                f,
                "torn tail: {complete_records} complete records, {trailing_bytes} trailing bytes"
            ),
        }
    }
}

impl std::error::Error for MarkerError {}
114
115// ---------------------------------------------------------------------------
116// MarkerSegmentHeader (36 bytes)
117// ---------------------------------------------------------------------------
118
119/// On-disk segment header for the commit marker stream.
120///
121/// Layout (36 bytes, all LE):
122/// ```text
123///   magic           : [u8; 4]   — "FSMK"
124///   version         : u32       — 1
125///   segment_id      : u64       — monotonic identifier
126///   start_commit_seq: u64       — first commit_seq in this segment
127///   record_size     : u32       — bytes per record (88 in V1)
128///   header_xxh3     : u64       — xxhash3 of all preceding fields
129/// ```
130#[derive(Debug, Clone, Copy, PartialEq, Eq)]
131pub struct MarkerSegmentHeader {
132    /// Monotonic segment identifier (matches filename).
133    pub segment_id: u64,
134    /// First `commit_seq` stored in this segment.
135    pub start_commit_seq: u64,
136}
137
138impl MarkerSegmentHeader {
139    /// Create a new header for the given segment.
140    #[must_use]
141    pub const fn new(segment_id: u64, start_commit_seq: u64) -> Self {
142        Self {
143            segment_id,
144            start_commit_seq,
145        }
146    }
147
148    /// Encode to exactly [`MARKER_SEGMENT_HEADER_BYTES`] bytes.
149    #[must_use]
150    pub fn encode(&self) -> [u8; MARKER_SEGMENT_HEADER_BYTES] {
151        let mut buf = [0u8; MARKER_SEGMENT_HEADER_BYTES];
152        // magic (4)
153        buf[0..4].copy_from_slice(&MARKER_SEGMENT_MAGIC);
154        // version (4)
155        buf[4..8].copy_from_slice(&MARKER_FORMAT_VERSION.to_le_bytes());
156        // segment_id (8)
157        buf[8..16].copy_from_slice(&self.segment_id.to_le_bytes());
158        // start_commit_seq (8)
159        buf[16..24].copy_from_slice(&self.start_commit_seq.to_le_bytes());
160        // record_size (4)
161        #[allow(clippy::cast_possible_truncation)]
162        let record_size = COMMIT_MARKER_RECORD_BYTES as u32;
163        buf[24..28].copy_from_slice(&record_size.to_le_bytes());
164        // header_xxh3 (8) — hash of bytes [0..28]
165        let hash = xxhash_rust::xxh3::xxh3_64(&buf[..28]);
166        buf[28..36].copy_from_slice(&hash.to_le_bytes());
167        buf
168    }
169
170    /// Decode from a byte slice. Validates magic, version, record_size, and checksum.
171    pub fn decode(data: &[u8]) -> Result<Self, MarkerError> {
172        if data.len() < MARKER_SEGMENT_HEADER_BYTES {
173            return Err(MarkerError::HeaderTooShort);
174        }
175
176        // magic
177        if data[0..4] != MARKER_SEGMENT_MAGIC {
178            return Err(MarkerError::BadMagic);
179        }
180
181        // version
182        let version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
183        if version != MARKER_FORMAT_VERSION {
184            return Err(MarkerError::UnsupportedVersion { version });
185        }
186
187        // segment_id
188        let segment_id = u64::from_le_bytes(data[8..16].try_into().expect("8 bytes"));
189
190        // start_commit_seq
191        let start_commit_seq = u64::from_le_bytes(data[16..24].try_into().expect("8 bytes"));
192
193        // record_size
194        let record_size = u32::from_le_bytes([data[24], data[25], data[26], data[27]]);
195        #[allow(clippy::cast_possible_truncation)]
196        let expected_record_size = COMMIT_MARKER_RECORD_BYTES as u32;
197        if record_size != expected_record_size {
198            return Err(MarkerError::RecordSizeMismatch {
199                expected: expected_record_size,
200                actual: record_size,
201            });
202        }
203
204        // header_xxh3
205        let stored_hash = u64::from_le_bytes(data[28..36].try_into().expect("8 bytes"));
206        let computed_hash = xxhash_rust::xxh3::xxh3_64(&data[..28]);
207        if stored_hash != computed_hash {
208            return Err(MarkerError::HeaderChecksumMismatch {
209                expected: computed_hash,
210                actual: stored_hash,
211            });
212        }
213
214        Ok(Self {
215            segment_id,
216            start_commit_seq,
217        })
218    }
219}
220
221// ---------------------------------------------------------------------------
222// CommitMarkerRecord (88 bytes)
223// ---------------------------------------------------------------------------
224
225/// On-disk commit marker record.
226///
227/// Layout (88 bytes, all LE):
228/// ```text
229///   commit_seq          : u64      (8)
230///   commit_time_unix_ns : u64      (8)
231///   capsule_object_id   : [u8;16]  (16)
232///   proof_object_id     : [u8;16]  (16)
233///   prev_marker_id      : [u8;16]  (16) — 0 for genesis
234///   marker_id           : [u8;16]  (16)
235///   record_xxh3         : u64      (8)
236/// ```
237#[derive(Debug, Clone, PartialEq, Eq)]
238pub struct CommitMarkerRecord {
239    /// Monotonic commit sequence number (gap-free within a segment).
240    pub commit_seq: u64,
241    /// Commit timestamp in nanoseconds since Unix epoch.
242    pub commit_time_unix_ns: u64,
243    /// ObjectId of the commit capsule.
244    pub capsule_object_id: [u8; ID_SIZE],
245    /// ObjectId of the proof object.
246    pub proof_object_id: [u8; ID_SIZE],
247    /// marker_id of the previous commit (all zeros for genesis).
248    pub prev_marker_id: [u8; ID_SIZE],
249    /// This record's marker_id: `Trunc128(BLAKE3("fsqlite:marker:v1" || prefix_bytes))`.
250    pub marker_id: [u8; ID_SIZE],
251}
252
253impl CommitMarkerRecord {
254    /// Create a new record, computing `marker_id` from the other fields.
255    #[must_use]
256    pub fn new(
257        commit_seq: u64,
258        commit_time_unix_ns: u64,
259        capsule_object_id: [u8; ID_SIZE],
260        proof_object_id: [u8; ID_SIZE],
261        prev_marker_id: [u8; ID_SIZE],
262    ) -> Self {
263        let marker_id = compute_marker_id(
264            commit_seq,
265            commit_time_unix_ns,
266            &capsule_object_id,
267            &proof_object_id,
268            &prev_marker_id,
269        );
270        Self {
271            commit_seq,
272            commit_time_unix_ns,
273            capsule_object_id,
274            proof_object_id,
275            prev_marker_id,
276            marker_id,
277        }
278    }
279
280    /// Encode to exactly [`COMMIT_MARKER_RECORD_BYTES`] bytes.
281    #[must_use]
282    pub fn encode(&self) -> [u8; COMMIT_MARKER_RECORD_BYTES] {
283        let mut buf = [0u8; COMMIT_MARKER_RECORD_BYTES];
284        buf[0..8].copy_from_slice(&self.commit_seq.to_le_bytes());
285        buf[8..16].copy_from_slice(&self.commit_time_unix_ns.to_le_bytes());
286        buf[16..32].copy_from_slice(&self.capsule_object_id);
287        buf[32..48].copy_from_slice(&self.proof_object_id);
288        buf[48..64].copy_from_slice(&self.prev_marker_id);
289        buf[64..80].copy_from_slice(&self.marker_id);
290        // record_xxh3: hash of bytes [0..80]
291        let hash = xxhash_rust::xxh3::xxh3_64(&buf[..80]);
292        buf[80..88].copy_from_slice(&hash.to_le_bytes());
293        buf
294    }
295
296    /// Decode from a byte slice. Validates xxh3 checksum.
297    pub fn decode(data: &[u8]) -> Result<Self, MarkerError> {
298        if data.len() < COMMIT_MARKER_RECORD_BYTES {
299            return Err(MarkerError::RecordTooShort);
300        }
301
302        let commit_seq = u64::from_le_bytes(data[0..8].try_into().expect("8 bytes"));
303        let commit_time_unix_ns = u64::from_le_bytes(data[8..16].try_into().expect("8 bytes"));
304
305        let mut capsule_object_id = [0u8; ID_SIZE];
306        capsule_object_id.copy_from_slice(&data[16..32]);
307
308        let mut proof_object_id = [0u8; ID_SIZE];
309        proof_object_id.copy_from_slice(&data[32..48]);
310
311        let mut prev_marker_id = [0u8; ID_SIZE];
312        prev_marker_id.copy_from_slice(&data[48..64]);
313
314        let mut marker_id = [0u8; ID_SIZE];
315        marker_id.copy_from_slice(&data[64..80]);
316
317        let stored_hash = u64::from_le_bytes(data[80..88].try_into().expect("8 bytes"));
318        let computed_hash = xxhash_rust::xxh3::xxh3_64(&data[..80]);
319        if stored_hash != computed_hash {
320            return Err(MarkerError::RecordChecksumMismatch {
321                expected: computed_hash,
322                actual: stored_hash,
323            });
324        }
325
326        Ok(Self {
327            commit_seq,
328            commit_time_unix_ns,
329            capsule_object_id,
330            proof_object_id,
331            prev_marker_id,
332            marker_id,
333        })
334    }
335
336    /// Verify that `marker_id` matches the recomputed value.
337    #[must_use]
338    pub fn verify_marker_id(&self) -> bool {
339        let expected = compute_marker_id(
340            self.commit_seq,
341            self.commit_time_unix_ns,
342            &self.capsule_object_id,
343            &self.proof_object_id,
344            &self.prev_marker_id,
345        );
346        self.marker_id == expected
347    }
348}
349
350// ---------------------------------------------------------------------------
351// marker_id computation
352// ---------------------------------------------------------------------------
353
354/// Compute marker_id: `Trunc128(BLAKE3("fsqlite:marker:v1" || prefix_bytes))`.
355///
356/// `prefix_bytes` is the LE encoding of
357/// `(commit_seq, commit_time_unix_ns, capsule_object_id, proof_object_id, prev_marker_id)`.
358#[must_use]
359pub fn compute_marker_id(
360    commit_seq: u64,
361    commit_time_unix_ns: u64,
362    capsule_object_id: &[u8; ID_SIZE],
363    proof_object_id: &[u8; ID_SIZE],
364    prev_marker_id: &[u8; ID_SIZE],
365) -> [u8; ID_SIZE] {
366    let mut prefix = [0u8; RECORD_PREFIX_BYTES];
367    prefix[0..8].copy_from_slice(&commit_seq.to_le_bytes());
368    prefix[8..16].copy_from_slice(&commit_time_unix_ns.to_le_bytes());
369    prefix[16..32].copy_from_slice(capsule_object_id);
370    prefix[32..48].copy_from_slice(proof_object_id);
371    prefix[48..64].copy_from_slice(prev_marker_id);
372
373    let mut hasher = blake3::Hasher::new();
374    hasher.update(MARKER_ID_DOMAIN);
375    hasher.update(&prefix);
376    let hash = hasher.finalize();
377
378    let mut id = [0u8; ID_SIZE];
379    id.copy_from_slice(&hash.as_bytes()[..ID_SIZE]);
380    id
381}
382
383// ---------------------------------------------------------------------------
384// O(1) seek helpers
385// ---------------------------------------------------------------------------
386
387/// Compute which segment a `commit_seq` falls in (fixed rotation policy).
388#[must_use]
389pub const fn segment_id_for_commit_seq(commit_seq: u64) -> u64 {
390    commit_seq / MARKERS_PER_SEGMENT
391}
392
393/// Compute the `start_commit_seq` for a given segment_id.
394#[must_use]
395pub const fn start_commit_seq_for_segment(segment_id: u64) -> u64 {
396    segment_id * MARKERS_PER_SEGMENT
397}
398
399/// Compute the byte offset of a record within a segment file.
400///
401/// `offset = MARKER_SEGMENT_HEADER_BYTES + (commit_seq - start_commit_seq) * COMMIT_MARKER_RECORD_BYTES`
402#[must_use]
403pub const fn record_offset(commit_seq: u64, start_commit_seq: u64) -> u64 {
404    let slot = commit_seq - start_commit_seq;
405    MARKER_SEGMENT_HEADER_BYTES as u64 + slot * COMMIT_MARKER_RECORD_BYTES as u64
406}
407
408/// Compute the next `commit_seq` from segment file length (crash-safe allocation).
409///
410/// `next_commit_seq = start_commit_seq + floor((file_len - header) / record_size)`
411#[must_use]
412pub const fn next_commit_seq_from_file_len(start_commit_seq: u64, file_len: u64) -> u64 {
413    if file_len < MARKER_SEGMENT_HEADER_BYTES as u64 {
414        return start_commit_seq;
415    }
416    let data_bytes = file_len - MARKER_SEGMENT_HEADER_BYTES as u64;
417    let n_records = data_bytes / COMMIT_MARKER_RECORD_BYTES as u64;
418    start_commit_seq + n_records
419}
420
421// ---------------------------------------------------------------------------
422// Torn tail handling
423// ---------------------------------------------------------------------------
424
425/// Scan a segment's record region and return the count of valid (checksum-verified)
426/// records from the start.  Stops at the first record that fails xxh3 verification.
427///
428/// `data` must be the record region only (header already stripped).
429#[must_use]
430pub fn valid_record_prefix_count(data: &[u8]) -> u64 {
431    let mut count = 0u64;
432    let mut offset = 0;
433    while offset + COMMIT_MARKER_RECORD_BYTES <= data.len() {
434        let record_bytes = &data[offset..offset + COMMIT_MARKER_RECORD_BYTES];
435        if CommitMarkerRecord::decode(record_bytes).is_err() {
436            break;
437        }
438        count += 1;
439        offset += COMMIT_MARKER_RECORD_BYTES;
440    }
441    count
442}
443
444/// Analyze a full segment buffer for torn tail conditions.
445///
446/// Returns `Ok(n_records)` if all complete records are valid,
447/// or `Err(TornTail { .. })` if there are trailing partial bytes.
448pub fn check_segment_integrity(segment_data: &[u8]) -> Result<u64, MarkerError> {
449    if segment_data.len() < MARKER_SEGMENT_HEADER_BYTES {
450        return Err(MarkerError::HeaderTooShort);
451    }
452
453    // Header must be valid before we reason about record layout.
454    let _header = MarkerSegmentHeader::decode(&segment_data[..MARKER_SEGMENT_HEADER_BYTES])?;
455
456    let record_region = &segment_data[MARKER_SEGMENT_HEADER_BYTES..];
457    let complete_records = record_region.len() / COMMIT_MARKER_RECORD_BYTES;
458    let trailing = record_region.len() % COMMIT_MARKER_RECORD_BYTES;
459
460    // Verify all complete records up to the first decode failure while also
461    // enforcing density (`commit_seq = start_commit_seq + slot`).
462    let recovered = recover_valid_prefix(segment_data)?;
463    let valid = u64::try_from(recovered.len()).expect("record count always fits in u64");
464
465    #[allow(clippy::cast_possible_truncation)]
466    let complete_u64 = complete_records as u64;
467
468    if trailing > 0 || valid < complete_u64 {
469        let valid_usize = recovered.len();
470        return Err(MarkerError::TornTail {
471            complete_records: valid,
472            trailing_bytes: if valid < complete_u64 {
473                // Corruption mid-stream: remaining bytes from corrupt record onward.
474                record_region
475                    .len()
476                    .saturating_sub(valid_usize.saturating_mul(COMMIT_MARKER_RECORD_BYTES))
477            } else {
478                trailing
479            },
480        });
481    }
482
483    Ok(valid)
484}
485
486/// Recover the valid, density-checked prefix of commit markers from a segment.
487///
488/// This helper is intentionally tolerant of torn tails: it stops at the first
489/// undecodable record and returns the valid prefix. Density violations are
490/// fail-closed and returned as [`MarkerError::CommitSeqMismatch`].
491pub fn recover_valid_prefix(segment_data: &[u8]) -> Result<Vec<CommitMarkerRecord>, MarkerError> {
492    if segment_data.len() < MARKER_SEGMENT_HEADER_BYTES {
493        return Err(MarkerError::HeaderTooShort);
494    }
495
496    let header = MarkerSegmentHeader::decode(&segment_data[..MARKER_SEGMENT_HEADER_BYTES])?;
497    let record_region = &segment_data[MARKER_SEGMENT_HEADER_BYTES..];
498
499    let mut records = Vec::new();
500    let mut offset = 0usize;
501
502    while offset + COMMIT_MARKER_RECORD_BYTES <= record_region.len() {
503        let record_bytes = &record_region[offset..offset + COMMIT_MARKER_RECORD_BYTES];
504        let Ok(record) = CommitMarkerRecord::decode(record_bytes) else {
505            break;
506        };
507
508        let expected = header.start_commit_seq
509            + u64::try_from(records.len()).expect("record vector length always fits in u64");
510        if record.commit_seq != expected {
511            return Err(MarkerError::CommitSeqMismatch {
512                expected,
513                actual: record.commit_seq,
514            });
515        }
516
517        records.push(record);
518        offset += COMMIT_MARKER_RECORD_BYTES;
519    }
520
521    Ok(records)
522}
523
524// ---------------------------------------------------------------------------
525// Binary search by time
526// ---------------------------------------------------------------------------
527
528/// Binary search for the commit_seq whose `commit_time_unix_ns` is the greatest
529/// value <= `target_ns`.  Returns `None` if all records are after `target_ns`.
530///
531/// `read_record` is called with a commit_seq and must return the decoded record.
532pub fn binary_search_by_time<F>(
533    start_commit_seq: u64,
534    record_count: u64,
535    target_ns: u64,
536    mut read_record: F,
537) -> Option<u64>
538where
539    F: FnMut(u64) -> Option<CommitMarkerRecord>,
540{
541    if record_count == 0 {
542        return None;
543    }
544
545    let mut lo = 0u64;
546    let mut hi = record_count;
547    let mut best: Option<u64> = None;
548
549    while lo < hi {
550        let mid = lo + (hi - lo) / 2;
551        let seq = start_commit_seq + mid;
552        let Some(record) = read_record(seq) else {
553            break;
554        };
555
556        if record.commit_time_unix_ns <= target_ns {
557            best = Some(seq);
558            lo = mid + 1;
559        } else {
560            hi = mid;
561        }
562    }
563
564    best
565}
566
567// =========================================================================
568// Tests
569// =========================================================================
570
571#[cfg(test)]
572mod tests {
573    use super::*;
574    use std::collections::HashSet;
575
576    // ===================================================================
577    // bd-1hi.23 — Commit Marker Stream Format (§3.5.4)
578    // ===================================================================
579
580    fn make_test_record(seq: u64, prev: [u8; ID_SIZE]) -> CommitMarkerRecord {
581        let capsule = [(seq & 0xFF) as u8; ID_SIZE];
582        let proof = [((seq >> 8) & 0xFF) as u8; ID_SIZE];
583        let time_ns = 1_700_000_000_000_000_000u64 + seq * 1_000_000;
584        CommitMarkerRecord::new(seq, time_ns, capsule, proof, prev)
585    }
586
587    #[test]
588    fn test_marker_segment_header_encode_decode() {
589        let header = MarkerSegmentHeader::new(42, 42_000_000);
590        let encoded = header.encode();
591        assert_eq!(
592            encoded.len(),
593            MARKER_SEGMENT_HEADER_BYTES,
594            "header must be exactly {MARKER_SEGMENT_HEADER_BYTES} bytes"
595        );
596
597        let decoded = MarkerSegmentHeader::decode(&encoded).expect("decode must succeed");
598        assert_eq!(decoded, header);
599
600        // Verify magic bytes.
601        assert_eq!(&encoded[0..4], b"FSMK");
602    }
603
604    #[test]
605    fn test_commit_marker_record_encode_decode() {
606        let record = make_test_record(7, [0u8; ID_SIZE]);
607        let encoded = record.encode();
608        assert_eq!(
609            encoded.len(),
610            COMMIT_MARKER_RECORD_BYTES,
611            "record must be exactly {COMMIT_MARKER_RECORD_BYTES} bytes"
612        );
613
614        let decoded = CommitMarkerRecord::decode(&encoded).expect("decode must succeed");
615        assert_eq!(decoded, record);
616    }
617
618    #[test]
619    fn test_marker_id_computation() {
620        let seq = 100u64;
621        let time_ns = 1_700_000_000_000_000_000u64;
622        let capsule = [0xAAu8; ID_SIZE];
623        let proof = [0xBBu8; ID_SIZE];
624        let prev = [0u8; ID_SIZE];
625
626        let marker_id = compute_marker_id(seq, time_ns, &capsule, &proof, &prev);
627
628        // Manually compute the expected value.
629        let mut prefix = [0u8; RECORD_PREFIX_BYTES];
630        prefix[0..8].copy_from_slice(&seq.to_le_bytes());
631        prefix[8..16].copy_from_slice(&time_ns.to_le_bytes());
632        prefix[16..32].copy_from_slice(&capsule);
633        prefix[32..48].copy_from_slice(&proof);
634        prefix[48..64].copy_from_slice(&prev);
635
636        let mut hasher = blake3::Hasher::new();
637        hasher.update(MARKER_ID_DOMAIN);
638        hasher.update(&prefix);
639        let hash = hasher.finalize();
640        let mut expected = [0u8; ID_SIZE];
641        expected.copy_from_slice(&hash.as_bytes()[..ID_SIZE]);
642
643        assert_eq!(
644            marker_id, expected,
645            "marker_id must be Trunc128(BLAKE3(domain || prefix))"
646        );
647    }
648
649    #[test]
650    fn test_density_invariant() {
651        let start_seq = 1000u64;
652        let mut prev = [0u8; ID_SIZE];
653        let mut records = Vec::new();
654
655        for i in 0..5u64 {
656            let record = make_test_record(start_seq + i, prev);
657            prev = record.marker_id;
658            records.push(record);
659        }
660
661        for (i, record) in records.iter().enumerate() {
662            assert_eq!(
663                record.commit_seq,
664                start_seq + i as u64,
665                "record at slot {i} must have commit_seq = start + {i}"
666            );
667        }
668    }
669
670    #[test]
671    fn test_o1_seek_by_commit_seq() {
672        let start_seq = 0u64;
673        let header = MarkerSegmentHeader::new(0, start_seq);
674        let mut segment = Vec::from(header.encode());
675
676        let mut prev = [0u8; ID_SIZE];
677        let mut records = Vec::new();
678        for i in 0..1000u64 {
679            let record = make_test_record(start_seq + i, prev);
680            prev = record.marker_id;
681            segment.extend_from_slice(&record.encode());
682            records.push(record);
683        }
684
685        // Seek to commit_seq=500 via offset formula.
686        let target_seq = 500u64;
687        #[allow(clippy::cast_possible_truncation)]
688        let offset = record_offset(target_seq, start_seq) as usize;
689        let record_bytes = &segment[offset..offset + COMMIT_MARKER_RECORD_BYTES];
690        let record = CommitMarkerRecord::decode(record_bytes).expect("decode at offset");
691        assert_eq!(record.commit_seq, target_seq);
692        assert_eq!(record, records[500]);
693    }
694
695    #[test]
696    fn test_commit_seq_allocation_from_file_length() {
697        let start_seq = 5000u64;
698        // 10 records: file_len = 36 + 10 * 88 = 916
699        let file_len = MARKER_SEGMENT_HEADER_BYTES as u64 + 10 * COMMIT_MARKER_RECORD_BYTES as u64;
700        assert_eq!(file_len, 916);
701
702        let next = next_commit_seq_from_file_len(start_seq, file_len);
703        assert_eq!(next, start_seq + 10);
704    }
705
706    #[test]
707    fn test_torn_tail_handling() {
708        let start_seq = 0u64;
709        let header = MarkerSegmentHeader::new(0, start_seq);
710        let mut segment = Vec::from(header.encode());
711
712        let mut prev = [0u8; ID_SIZE];
713        for i in 0..5u64 {
714            let record = make_test_record(start_seq + i, prev);
715            prev = record.marker_id;
716            segment.extend_from_slice(&record.encode());
717        }
718
719        // Append 44 partial bytes (half a record).
720        segment.extend_from_slice(&[0xDE; 44]);
721
722        let result = check_segment_integrity(&segment);
723        match result {
724            Err(MarkerError::TornTail {
725                complete_records,
726                trailing_bytes,
727            }) => {
728                assert_eq!(complete_records, 5);
729                assert_eq!(trailing_bytes, 44);
730            }
731            other => unreachable!("expected TornTail, got {other:?}"),
732        }
733    }
734
735    #[test]
736    fn test_torn_tail_corrupt_last_record() {
737        let start_seq = 0u64;
738        let header = MarkerSegmentHeader::new(0, start_seq);
739        let mut segment = Vec::from(header.encode());
740
741        let mut prev = [0u8; ID_SIZE];
742        for i in 0..5u64 {
743            let record = make_test_record(start_seq + i, prev);
744            prev = record.marker_id;
745            segment.extend_from_slice(&record.encode());
746        }
747
748        // Corrupt record 4's xxh3 (last 8 bytes of record 4).
749        let record_4_end = MARKER_SEGMENT_HEADER_BYTES + 5 * COMMIT_MARKER_RECORD_BYTES;
750        let xxh3_start = record_4_end - 8;
751        segment[xxh3_start] ^= 0xFF;
752
753        let result = check_segment_integrity(&segment);
754        match result {
755            Err(MarkerError::TornTail {
756                complete_records, ..
757            }) => {
758                assert_eq!(complete_records, 4, "valid prefix is records 0-3");
759            }
760            other => unreachable!("expected TornTail, got {other:?}"),
761        }
762    }
763
764    #[test]
765    fn test_commit_seq_mismatch_detected() {
766        let start_seq = 100u64;
767        let header = MarkerSegmentHeader::new(0, start_seq);
768        let mut segment = Vec::from(header.encode());
769
770        let first = make_test_record(start_seq, [0u8; ID_SIZE]);
771        let second = make_test_record(start_seq + 2, first.marker_id);
772        segment.extend_from_slice(&first.encode());
773        segment.extend_from_slice(&second.encode());
774
775        let result = check_segment_integrity(&segment);
776        match result {
777            Err(MarkerError::CommitSeqMismatch { expected, actual }) => {
778                assert_eq!(expected, start_seq + 1);
779                assert_eq!(actual, start_seq + 2);
780            }
781            other => unreachable!("expected CommitSeqMismatch, got {other:?}"),
782        }
783    }
784
/// Exercises `binary_search_by_time` over 100 chained markers spaced
/// 1_000_000 ns apart: an exact timestamp hit, a target earlier than every
/// marker, and a target later than every marker.
#[test]
fn test_binary_search_by_time() {
    let start_seq = 0u64;
    let base_ns = 1_000_000_000_000_000_000u64;

    // Each record links to its predecessor's marker_id; the first record
    // uses the all-zero id as its `prev`.
    let records: Vec<CommitMarkerRecord> = (0..100u64)
        .scan([0u8; ID_SIZE], |prev, i| {
            let capsule = [(i & 0xFF) as u8; ID_SIZE];
            let proof = [((i >> 8) & 0xFF) as u8; ID_SIZE];
            let time_ns = base_ns + i * 1_000_000;
            let record = CommitMarkerRecord::new(i, time_ns, capsule, proof, *prev);
            *prev = record.marker_id;
            Some(record)
        })
        .collect();

    // Single shared lookup. The checked conversion replaces the lossy
    // `seq as usize` cast, so no clippy suppression is needed; a commit_seq
    // that does not fit in `usize` simply yields `None`.
    let lookup = |seq: u64| -> Option<CommitMarkerRecord> {
        let idx = usize::try_from(seq).ok()?;
        records.get(idx).cloned()
    };

    // Search for time at commit_seq=50.
    let target_ns = base_ns + 50 * 1_000_000;
    assert_eq!(
        binary_search_by_time(start_seq, 100, target_ns, lookup),
        Some(50)
    );

    // Search before all records.
    assert_eq!(
        binary_search_by_time(start_seq, 100, base_ns - 1, lookup),
        None
    );

    // Search after all records.
    assert_eq!(
        binary_search_by_time(start_seq, 100, u64::MAX, lookup),
        Some(99)
    );
}
823
/// Builds two marker chains that share their first ten commits and then
/// diverge, and checks that a binary search over marker_ids locates the
/// first divergent position.
#[test]
fn test_fork_detection() {
    let base_ns = 1_700_000_000_000_000_000u64;

    // Appends one commit per sequence number in `seqs` to `chain`, linking
    // each new record to the current tail (all-zero id for an empty chain).
    let extend_chain = |chain: &mut Vec<CommitMarkerRecord>,
                        seqs: std::ops::Range<u64>,
                        capsule_byte: u8,
                        proof_byte: u8| {
        let mut tail = chain.last().map_or([0u8; ID_SIZE], |r| r.marker_id);
        for seq in seqs {
            let record = CommitMarkerRecord::new(
                seq,
                base_ns + seq * 1_000_000,
                [capsule_byte; ID_SIZE],
                [proof_byte; ID_SIZE],
                tail,
            );
            tail = record.marker_id;
            chain.push(record);
        }
    };

    // Shared prefix of 10 commits.
    let mut shared = Vec::new();
    extend_chain(&mut shared, 0..10, 0xAA, 0xBB);

    // Fork A: five further commits on top of shared[9].
    let mut fork_a = shared.clone();
    extend_chain(&mut fork_a, 10..15, 0x11, 0x22);

    // Fork B: three commits with different content from commit 10 onwards.
    let mut fork_b = shared.clone();
    extend_chain(&mut fork_b, 10..13, 0x33, 0x44);

    // Binary search for the divergence point over the common length
    // (`zip` stops at the shorter fork).
    let paired: Vec<_> = fork_a.iter().zip(fork_b.iter()).collect();
    let divergence = paired.partition_point(|(a, b)| a.marker_id == b.marker_id);

    // The greatest common prefix ends at index `divergence - 1`.
    assert_eq!(divergence, 10, "divergence starts at commit_seq 10");
    assert_eq!(
        fork_a[9].marker_id, fork_b[9].marker_id,
        "last matching marker_id is at seq 9"
    );
    assert_ne!(
        fork_a[10].marker_id, fork_b[10].marker_id,
        "first divergence at seq 10"
    );
}
887
/// Builds a ten-record chain and checks that every record links to its
/// predecessor, that every marker_id verifies, and that altering a hashed
/// field makes verification fail.
#[test]
fn test_hash_chain_integrity() {
    let records: Vec<_> = (0..10u64)
        .scan([0u8; ID_SIZE], |tail, seq| {
            let record = make_test_record(seq, *tail);
            *tail = record.marker_id;
            Some(record)
        })
        .collect();

    // Each adjacent pair must be linked: successor.prev == predecessor.id.
    for (offset, pair) in records.windows(2).enumerate() {
        let i = offset + 1;
        assert_eq!(
            pair[1].prev_marker_id,
            pair[0].marker_id,
            "record {i} prev_marker_id must link to record {}'s marker_id",
            i - 1
        );
    }
    assert_eq!(records[0].prev_marker_id, [0u8; ID_SIZE], "genesis is zero");

    // Every untouched record must pass self-verification.
    for record in records.iter() {
        assert!(
            record.verify_marker_id(),
            "marker_id must be verifiable for commit_seq {}",
            record.commit_seq
        );
    }

    // Corrupt a copy of one record and make sure the change is noticed.
    let mut tampered = records[5].clone();
    tampered.capsule_object_id[0] ^= 0xFF;
    assert!(
        !tampered.verify_marker_id(),
        "tampered record must fail marker_id verification"
    );
}
927
/// Pins the on-disk sizes of the segment header and the commit record to
/// literal byte counts, both via the public constants and via the length of
/// the actual encoded output.
#[test]
fn test_marker_no_mem_size_of() {
    // The public size constants are fixed literals.
    assert_eq!(MARKER_SEGMENT_HEADER_BYTES, 36);
    assert_eq!(COMMIT_MARKER_RECORD_BYTES, 88);

    // The encoders must emit exactly that many bytes, whatever the
    // in-memory layout of the structs happens to be.
    let header_len = MarkerSegmentHeader::new(0, 0).encode().len();
    assert_eq!(header_len, 36, "header on-disk size is a constant");

    let record_len = make_test_record(0, [0u8; ID_SIZE]).encode().len();
    assert_eq!(record_len, 88, "record on-disk size is a constant");
}
952
/// A header whose first magic byte has been overwritten must be rejected
/// with `MarkerError::BadMagic`.
#[test]
fn test_header_bad_magic_rejected() {
    let mut bytes = MarkerSegmentHeader::new(1, 0).encode();
    // Corrupt the first byte of the encoded header.
    bytes[0] = b'X';

    assert_eq!(
        MarkerSegmentHeader::decode(&bytes).err(),
        Some(MarkerError::BadMagic)
    );
}
962
/// Flipping a bit inside an encoded header must surface as
/// `MarkerError::HeaderChecksumMismatch` on decode.
#[test]
fn test_header_checksum_tamper_detected() {
    let mut bytes = MarkerSegmentHeader::new(1, 0).encode();
    // Flip one bit at offset 8 (intended to land in segment_id; the header
    // layout is defined outside this test).
    bytes[8] ^= 0x01;

    let outcome = MarkerSegmentHeader::decode(&bytes);
    assert!(matches!(
        outcome,
        Err(MarkerError::HeaderChecksumMismatch { .. })
    ));
}
976
/// Flipping a bit inside an encoded commit record must surface as
/// `MarkerError::RecordChecksumMismatch` on decode.
#[test]
fn test_record_checksum_tamper_detected() {
    let mut bytes = make_test_record(42, [0u8; ID_SIZE]).encode();
    // Flip one bit at offset 10, targeting commit_time_unix_ns (the field
    // that follows the 8-byte commit_seq).
    bytes[10] ^= 0x01;

    let outcome = CommitMarkerRecord::decode(&bytes);
    assert!(matches!(
        outcome,
        Err(MarkerError::RecordChecksumMismatch { .. })
    ));
}
990
/// Table-driven check of the commit_seq -> segment_id mapping, including
/// both sides of the first segment boundary.
#[test]
fn test_segment_id_for_commit_seq() {
    // (commit_seq, expected segment_id)
    let cases = [(0, 0), (999_999, 0), (1_000_000, 1), (2_500_000, 2)];
    for (commit_seq, segment_id) in cases {
        assert_eq!(segment_id_for_commit_seq(commit_seq), segment_id);
    }
}
998
/// Table-driven check of the segment_id -> first commit_seq mapping.
#[test]
fn test_start_commit_seq_for_segment() {
    // (segment_id, expected first commit_seq)
    let cases = [(0, 0), (1, 1_000_000), (5, 5_000_000)];
    for (segment_id, first_seq) in cases {
        assert_eq!(start_commit_seq_for_segment(segment_id), first_seq);
    }
}
1005
/// Checks that `record_offset` equals the header size plus the record's
/// index within its segment times the record size.
#[test]
fn test_record_offset_formula() {
    let header_bytes = MARKER_SEGMENT_HEADER_BYTES as u64;
    let record_bytes = COMMIT_MARKER_RECORD_BYTES as u64;

    // Record 500 of a segment starting at commit_seq 0.
    assert_eq!(record_offset(500, 0), header_bytes + 500 * record_bytes);

    // Record 50 of a segment starting at commit_seq 1_000_000.
    assert_eq!(
        record_offset(1_000_050, 1_000_000),
        header_bytes + 50 * record_bytes
    );
}
1020
/// Checks the `Display` text of `BadMagic` exactly and that the `TornTail`
/// text mentions "torn tail" and contains the digit 5.
#[test]
fn test_error_display() {
    assert_eq!(
        MarkerError::BadMagic.to_string(),
        "bad magic in marker segment header"
    );

    let rendered = MarkerError::TornTail {
        complete_records: 5,
        trailing_bytes: 44,
    }
    .to_string();
    assert!(rendered.contains("torn tail"));
    assert!(rendered.contains('5'));
}
1033
/// `compute_marker_id` must return the same id for identical inputs and a
/// different id when only commit_seq changes.
#[test]
fn test_marker_id_deterministic() {
    let capsule = [0xAA; ID_SIZE];
    let proof = [0xBB; ID_SIZE];
    let prev = [0u8; ID_SIZE];

    // All inputs except commit_seq are held fixed.
    let id_for = |commit_seq: u64| compute_marker_id(commit_seq, 100, &capsule, &proof, &prev);

    let first = id_for(1);
    let repeat = id_for(1);
    assert_eq!(first, repeat, "marker_id must be deterministic");

    // Changing the commit_seq must change the id.
    let other_seq = id_for(2);
    assert_ne!(first, other_seq);
}
1048
/// Feeds 2048 deterministically generated inputs through
/// `compute_marker_id` and asserts that no two produce the same id.
#[test]
fn prop_marker_id_unique() {
    // Packs two u64 values (little-endian) into the low and high halves of
    // a 16-byte id.
    let pack = |low: u64, high: u64| -> [u8; ID_SIZE] {
        let mut id = [0u8; ID_SIZE];
        id[..8].copy_from_slice(&low.to_le_bytes());
        id[8..16].copy_from_slice(&high.to_le_bytes());
        id
    };

    let mut observed_ids = HashSet::new();
    let mut rng_state = 0xDEAD_BEEF_CAFE_BABE_u64;

    for i in 0..2048u64 {
        // Fixed-seed multiply-and-add step: pseudo-random but reproducible,
        // so the test cannot flake.
        rng_state = rng_state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);

        let capsule = pack(rng_state, i);
        let proof = pack(rng_state.rotate_left(13), i ^ rng_state);
        let prev = pack(rng_state.rotate_right(7), i.wrapping_mul(17));

        let marker_id =
            compute_marker_id(i, 1_700_000_000_000_000_000 + i, &capsule, &proof, &prev);
        assert!(
            observed_ids.insert(marker_id),
            "marker_id collision at sample {i}: {marker_id:02X?}"
        );
    }
}
1078
/// For every record count from 1 to 256, builds a gap-free segment and
/// checks that `check_segment_integrity` accepts it and reports exactly
/// that many records.
#[test]
fn prop_density_invariant_holds() {
    for count in 1..=256u64 {
        let start_seq = 10_000 + count * 31;
        let segment_id = segment_id_for_commit_seq(start_seq);
        let mut segment = Vec::from(MarkerSegmentHeader::new(segment_id, start_seq).encode());

        // Append `count` consecutive records, chaining each to the last.
        let mut tail = [0u8; ID_SIZE];
        for seq in start_seq..start_seq + count {
            let record = make_test_record(seq, tail);
            tail = record.marker_id;
            segment.extend_from_slice(&record.encode());
        }

        let verified = check_segment_integrity(&segment).expect("segment should be dense");
        assert_eq!(verified, count);
    }
}
1097
/// Writes 1000 chained records, cuts the segment halfway through the last
/// one, and checks that recovery returns the 999 intact records while the
/// integrity check reports the torn tail.
#[test]
fn test_e2e_marker_stream_recovery() {
    let start_seq = 20_000u64;

    // Build the chained records first, then serialize them after the header.
    let expected: Vec<_> = (0..1000u64)
        .scan([0u8; ID_SIZE], |tail, i| {
            let record = make_test_record(start_seq + i, *tail);
            *tail = record.marker_id;
            Some(record)
        })
        .collect();

    let header = MarkerSegmentHeader::new(segment_id_for_commit_seq(start_seq), start_seq);
    let mut segment = Vec::from(header.encode());
    for record in &expected {
        segment.extend_from_slice(&record.encode());
    }

    // Simulate a crash: drop the second half of the final record.
    let torn_len = COMMIT_MARKER_RECORD_BYTES / 2;
    segment.truncate(segment.len() - torn_len);

    // Recovery keeps everything except the torn record.
    let intact = expected.len() - 1;
    let recovered = recover_valid_prefix(&segment).expect("recovery should succeed");
    assert_eq!(recovered.len(), intact);
    assert_eq!(recovered, expected[..intact]);

    // The strict integrity check reports the torn tail instead.
    match check_segment_integrity(&segment) {
        Err(MarkerError::TornTail {
            complete_records,
            trailing_bytes,
        }) => {
            assert_eq!(complete_records, 999);
            assert_eq!(trailing_bytes, COMMIT_MARKER_RECORD_BYTES / 2);
        }
        other => unreachable!("expected torn-tail integrity result, got {other:?}"),
    }
}
1132
/// Runs `binary_search_by_time` against 256 markers starting at commit_seq
/// 5000 and spaced 2_000_000 ns apart, covering targets before the first
/// marker, exactly on a marker, between two markers, and after the last.
#[test]
fn test_e2e_time_travel_query() {
    let start_seq = 5_000u64;
    let count = 256u64;
    let base_ns = 1_900_000_000_000_000_000u64;
    let step_ns = 2_000_000u64;

    // Chain of `count` records; each links to the previous marker_id.
    let records: Vec<CommitMarkerRecord> = (0..count)
        .scan([0u8; ID_SIZE], |tail, i| {
            let capsule = [(i & 0xFF) as u8; ID_SIZE];
            let proof = [((i >> 8) & 0xFF) as u8; ID_SIZE];
            let record = CommitMarkerRecord::new(
                start_seq + i,
                base_ns + i * step_ns,
                capsule,
                proof,
                *tail,
            );
            *tail = record.marker_id;
            Some(record)
        })
        .collect();

    // Maps a commit_seq to its record; anything outside the stored range
    // (below start_seq or past the end) yields `None`.
    let lookup = |seq: u64| -> Option<CommitMarkerRecord> {
        let offset = seq.checked_sub(start_seq)?;
        let idx = usize::try_from(offset).ok()?;
        records.get(idx).cloned()
    };
    let search = |target_ns: u64| binary_search_by_time(start_seq, count, target_ns, lookup);

    // Before the first marker.
    assert_eq!(search(base_ns - 1), None);

    // Exact hit.
    let hit_ns = base_ns + 40 * step_ns;
    assert_eq!(search(hit_ns), Some(start_seq + 40));

    // Between two commits should select the lower commit_seq.
    assert_eq!(search(hit_ns + 1_000_000), Some(start_seq + 40));

    // After the final marker.
    assert_eq!(search(u64::MAX), Some(start_seq + count - 1));
}
1189}