Skip to main content

fsqlite_core/
commit_marker.rs

1//! Commit Marker Stream Format (§3.5.4 + §3.5.4.1, bd-1hi.23).
2//!
3//! The marker stream under `ecs/markers/` is the total order of commits in
4//! Native mode.  It is the authoritative, tamper-evident, seekable commit log.
5//!
6//! On-disk encoding: all fixed-width integers are **little-endian** (§3.5.1).
7//! Sizes are byte-exact — never derived from `mem::size_of::<T>()`.
8
9// ---------------------------------------------------------------------------
10// Constants
11// ---------------------------------------------------------------------------
12
/// Magic bytes for a marker segment header: "FSMK".
///
/// Written at offset 0 of every encoded header and checked on decode.
pub const MARKER_SEGMENT_MAGIC: [u8; 4] = *b"FSMK";

/// Current format version.
///
/// Stored in the segment header; decoding rejects any other value.
pub const MARKER_FORMAT_VERSION: u32 = 1;

/// Byte size of [`MarkerSegmentHeader`] on disk.
///
/// magic(4) + version(4) + segment_id(8) + start_commit_seq(8) +
/// record_size(4) + header_xxh3(8) = 36.
pub const MARKER_SEGMENT_HEADER_BYTES: usize = 36;

/// Byte size of [`CommitMarkerRecord`] on disk.
///
/// record prefix(64) + marker_id(16) + record_xxh3(8) = 88.
pub const COMMIT_MARKER_RECORD_BYTES: usize = 88;

/// Default number of markers per segment (fixed rotation policy).
pub const MARKERS_PER_SEGMENT: u64 = 1_000_000;

/// Domain separation tag for marker_id computation.
///
/// Fed to the hasher ahead of the record prefix bytes.
const MARKER_ID_DOMAIN: &[u8] = b"fsqlite:marker:v1";

/// Size of a marker_id or object_id in bytes.
const ID_SIZE: usize = 16;

/// Byte length of the record prefix used for marker_id hashing.
/// commit_seq(8) + commit_time_unix_ns(8) + capsule_object_id(16) +
/// proof_object_id(16) + prev_marker_id(16) = 64.
const RECORD_PREFIX_BYTES: usize = 64;
38
39// ---------------------------------------------------------------------------
40// Errors
41// ---------------------------------------------------------------------------
42
/// Errors from marker stream operations.
///
/// For the checksum variants, `expected` is the hash recomputed from the
/// bytes that were read and `actual` is the hash stored in those bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MarkerError {
    /// Header buffer too short (need [`MARKER_SEGMENT_HEADER_BYTES`]).
    HeaderTooShort,
    /// Bad magic bytes in segment header.
    BadMagic,
    /// Header xxh3 checksum mismatch.
    HeaderChecksumMismatch { expected: u64, actual: u64 },
    /// Record buffer too short (need [`COMMIT_MARKER_RECORD_BYTES`]).
    RecordTooShort,
    /// Record xxh3 checksum mismatch.
    RecordChecksumMismatch { expected: u64, actual: u64 },
    /// The segment header carries a format version other than
    /// [`MARKER_FORMAT_VERSION`]; `version` is the value found on disk.
    UnsupportedVersion { version: u32 },
    /// Record size in header doesn't match expected.
    ///
    /// `expected` is [`COMMIT_MARKER_RECORD_BYTES`]; `actual` is the value
    /// stored in the header.
    RecordSizeMismatch { expected: u32, actual: u32 },
    /// commit_seq doesn't match expected slot position.
    ///
    /// `expected` is `start_commit_seq + slot`; `actual` is the value stored
    /// in the record.
    CommitSeqMismatch { expected: u64, actual: u64 },
    /// Segment data has incomplete (torn) tail.
    ///
    /// `complete_records` counts the valid records at the start of the record
    /// region; `trailing_bytes` counts every byte that follows them.
    TornTail {
        complete_records: u64,
        trailing_bytes: usize,
    },
    /// marker_id does not match recomputed value (forged or corrupt record).
    MarkerIdMismatch { commit_seq: u64 },
}
70
71impl std::fmt::Display for MarkerError {
72    #[allow(clippy::too_many_lines)]
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        match self {
75            Self::HeaderTooShort => f.write_str("marker segment header too short"),
76            Self::BadMagic => f.write_str("bad magic in marker segment header"),
77            Self::HeaderChecksumMismatch { expected, actual } => {
78                write!(
79                    f,
80                    "marker header xxh3 mismatch: expected {expected:#018X}, got {actual:#018X}"
81                )
82            }
83            Self::RecordTooShort => f.write_str("commit marker record too short"),
84            Self::RecordChecksumMismatch { expected, actual } => {
85                write!(
86                    f,
87                    "marker record xxh3 mismatch: expected {expected:#018X}, got {actual:#018X}"
88                )
89            }
90            Self::UnsupportedVersion { version } => {
91                write!(f, "unsupported marker format version: {version}")
92            }
93            Self::RecordSizeMismatch { expected, actual } => {
94                write!(
95                    f,
96                    "marker record size mismatch: expected {expected}, got {actual}"
97                )
98            }
99            Self::CommitSeqMismatch { expected, actual } => {
100                write!(f, "commit_seq mismatch: expected {expected}, got {actual}")
101            }
102            Self::TornTail {
103                complete_records,
104                trailing_bytes,
105            } => {
106                write!(
107                    f,
108                    "torn tail: {complete_records} complete records, {trailing_bytes} trailing bytes"
109                )
110            }
111            Self::MarkerIdMismatch { commit_seq } => {
112                write!(
113                    f,
114                    "marker_id verification failed for commit_seq {commit_seq} (forged or corrupt)"
115                )
116            }
117        }
118    }
119}
120
121impl std::error::Error for MarkerError {}
122
123// ---------------------------------------------------------------------------
124// MarkerSegmentHeader (36 bytes)
125// ---------------------------------------------------------------------------
126
127/// On-disk segment header for the commit marker stream.
128///
129/// Layout (36 bytes, all LE):
130/// ```text
131///   magic           : [u8; 4]   — "FSMK"
132///   version         : u32       — 1
133///   segment_id      : u64       — monotonic identifier
134///   start_commit_seq: u64       — first commit_seq in this segment
135///   record_size     : u32       — bytes per record (88 in V1)
136///   header_xxh3     : u64       — xxhash3 of all preceding fields
137/// ```
138#[derive(Debug, Clone, Copy, PartialEq, Eq)]
139pub struct MarkerSegmentHeader {
140    /// Monotonic segment identifier (matches filename).
141    pub segment_id: u64,
142    /// First `commit_seq` stored in this segment.
143    pub start_commit_seq: u64,
144}
145
146impl MarkerSegmentHeader {
147    /// Create a new header for the given segment.
148    #[must_use]
149    pub const fn new(segment_id: u64, start_commit_seq: u64) -> Self {
150        Self {
151            segment_id,
152            start_commit_seq,
153        }
154    }
155
156    /// Encode to exactly [`MARKER_SEGMENT_HEADER_BYTES`] bytes.
157    #[must_use]
158    pub fn encode(&self) -> [u8; MARKER_SEGMENT_HEADER_BYTES] {
159        let mut buf = [0u8; MARKER_SEGMENT_HEADER_BYTES];
160        // magic (4)
161        buf[0..4].copy_from_slice(&MARKER_SEGMENT_MAGIC);
162        // version (4)
163        buf[4..8].copy_from_slice(&MARKER_FORMAT_VERSION.to_le_bytes());
164        // segment_id (8)
165        buf[8..16].copy_from_slice(&self.segment_id.to_le_bytes());
166        // start_commit_seq (8)
167        buf[16..24].copy_from_slice(&self.start_commit_seq.to_le_bytes());
168        // record_size (4)
169        #[allow(clippy::cast_possible_truncation)]
170        let record_size = COMMIT_MARKER_RECORD_BYTES as u32;
171        buf[24..28].copy_from_slice(&record_size.to_le_bytes());
172        // header_xxh3 (8) — hash of bytes [0..28]
173        let hash = xxhash_rust::xxh3::xxh3_64(&buf[..28]);
174        buf[28..36].copy_from_slice(&hash.to_le_bytes());
175        buf
176    }
177
178    /// Decode from a byte slice. Validates magic, version, record_size, and checksum.
179    pub fn decode(data: &[u8]) -> Result<Self, MarkerError> {
180        if data.len() < MARKER_SEGMENT_HEADER_BYTES {
181            return Err(MarkerError::HeaderTooShort);
182        }
183
184        // magic
185        if data[0..4] != MARKER_SEGMENT_MAGIC {
186            return Err(MarkerError::BadMagic);
187        }
188
189        // version
190        let version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
191        if version != MARKER_FORMAT_VERSION {
192            return Err(MarkerError::UnsupportedVersion { version });
193        }
194
195        // segment_id
196        let segment_id = u64::from_le_bytes(data[8..16].try_into().expect("8 bytes"));
197
198        // start_commit_seq
199        let start_commit_seq = u64::from_le_bytes(data[16..24].try_into().expect("8 bytes"));
200
201        // record_size
202        let record_size = u32::from_le_bytes([data[24], data[25], data[26], data[27]]);
203        #[allow(clippy::cast_possible_truncation)]
204        let expected_record_size = COMMIT_MARKER_RECORD_BYTES as u32;
205        if record_size != expected_record_size {
206            return Err(MarkerError::RecordSizeMismatch {
207                expected: expected_record_size,
208                actual: record_size,
209            });
210        }
211
212        // header_xxh3
213        let stored_hash = u64::from_le_bytes(data[28..36].try_into().expect("8 bytes"));
214        let computed_hash = xxhash_rust::xxh3::xxh3_64(&data[..28]);
215        if stored_hash != computed_hash {
216            return Err(MarkerError::HeaderChecksumMismatch {
217                expected: computed_hash,
218                actual: stored_hash,
219            });
220        }
221
222        Ok(Self {
223            segment_id,
224            start_commit_seq,
225        })
226    }
227}
228
229// ---------------------------------------------------------------------------
230// CommitMarkerRecord (88 bytes)
231// ---------------------------------------------------------------------------
232
233/// On-disk commit marker record.
234///
235/// Layout (88 bytes, all LE):
236/// ```text
237///   commit_seq          : u64      (8)
238///   commit_time_unix_ns : u64      (8)
239///   capsule_object_id   : [u8;16]  (16)
240///   proof_object_id     : [u8;16]  (16)
241///   prev_marker_id      : [u8;16]  (16) — 0 for genesis
242///   marker_id           : [u8;16]  (16)
243///   record_xxh3         : u64      (8)
244/// ```
245#[derive(Debug, Clone, PartialEq, Eq)]
246pub struct CommitMarkerRecord {
247    /// Monotonic commit sequence number (gap-free within a segment).
248    pub commit_seq: u64,
249    /// Commit timestamp in nanoseconds since Unix epoch.
250    pub commit_time_unix_ns: u64,
251    /// ObjectId of the commit capsule.
252    pub capsule_object_id: [u8; ID_SIZE],
253    /// ObjectId of the proof object.
254    pub proof_object_id: [u8; ID_SIZE],
255    /// marker_id of the previous commit (all zeros for genesis).
256    pub prev_marker_id: [u8; ID_SIZE],
257    /// This record's marker_id: `Trunc128(BLAKE3("fsqlite:marker:v1" || prefix_bytes))`.
258    pub marker_id: [u8; ID_SIZE],
259}
260
261impl CommitMarkerRecord {
262    /// Create a new record, computing `marker_id` from the other fields.
263    #[must_use]
264    pub fn new(
265        commit_seq: u64,
266        commit_time_unix_ns: u64,
267        capsule_object_id: [u8; ID_SIZE],
268        proof_object_id: [u8; ID_SIZE],
269        prev_marker_id: [u8; ID_SIZE],
270    ) -> Self {
271        let marker_id = compute_marker_id(
272            commit_seq,
273            commit_time_unix_ns,
274            &capsule_object_id,
275            &proof_object_id,
276            &prev_marker_id,
277        );
278        Self {
279            commit_seq,
280            commit_time_unix_ns,
281            capsule_object_id,
282            proof_object_id,
283            prev_marker_id,
284            marker_id,
285        }
286    }
287
288    /// Encode to exactly [`COMMIT_MARKER_RECORD_BYTES`] bytes.
289    #[must_use]
290    pub fn encode(&self) -> [u8; COMMIT_MARKER_RECORD_BYTES] {
291        let mut buf = [0u8; COMMIT_MARKER_RECORD_BYTES];
292        buf[0..8].copy_from_slice(&self.commit_seq.to_le_bytes());
293        buf[8..16].copy_from_slice(&self.commit_time_unix_ns.to_le_bytes());
294        buf[16..32].copy_from_slice(&self.capsule_object_id);
295        buf[32..48].copy_from_slice(&self.proof_object_id);
296        buf[48..64].copy_from_slice(&self.prev_marker_id);
297        buf[64..80].copy_from_slice(&self.marker_id);
298        // record_xxh3: hash of bytes [0..80]
299        let hash = xxhash_rust::xxh3::xxh3_64(&buf[..80]);
300        buf[80..88].copy_from_slice(&hash.to_le_bytes());
301        buf
302    }
303
304    /// Decode from a byte slice. Validates xxh3 checksum.
305    pub fn decode(data: &[u8]) -> Result<Self, MarkerError> {
306        if data.len() < COMMIT_MARKER_RECORD_BYTES {
307            return Err(MarkerError::RecordTooShort);
308        }
309
310        let commit_seq = u64::from_le_bytes(data[0..8].try_into().expect("8 bytes"));
311        let commit_time_unix_ns = u64::from_le_bytes(data[8..16].try_into().expect("8 bytes"));
312
313        let mut capsule_object_id = [0u8; ID_SIZE];
314        capsule_object_id.copy_from_slice(&data[16..32]);
315
316        let mut proof_object_id = [0u8; ID_SIZE];
317        proof_object_id.copy_from_slice(&data[32..48]);
318
319        let mut prev_marker_id = [0u8; ID_SIZE];
320        prev_marker_id.copy_from_slice(&data[48..64]);
321
322        let mut marker_id = [0u8; ID_SIZE];
323        marker_id.copy_from_slice(&data[64..80]);
324
325        let stored_hash = u64::from_le_bytes(data[80..88].try_into().expect("8 bytes"));
326        let computed_hash = xxhash_rust::xxh3::xxh3_64(&data[..80]);
327        if stored_hash != computed_hash {
328            return Err(MarkerError::RecordChecksumMismatch {
329                expected: computed_hash,
330                actual: stored_hash,
331            });
332        }
333
334        Ok(Self {
335            commit_seq,
336            commit_time_unix_ns,
337            capsule_object_id,
338            proof_object_id,
339            prev_marker_id,
340            marker_id,
341        })
342    }
343
344    /// Verify that `marker_id` matches the recomputed value.
345    #[must_use]
346    pub fn verify_marker_id(&self) -> bool {
347        let expected = compute_marker_id(
348            self.commit_seq,
349            self.commit_time_unix_ns,
350            &self.capsule_object_id,
351            &self.proof_object_id,
352            &self.prev_marker_id,
353        );
354        self.marker_id == expected
355    }
356}
357
358// ---------------------------------------------------------------------------
359// marker_id computation
360// ---------------------------------------------------------------------------
361
362/// Compute marker_id: `Trunc128(BLAKE3("fsqlite:marker:v1" || prefix_bytes))`.
363///
364/// `prefix_bytes` is the LE encoding of
365/// `(commit_seq, commit_time_unix_ns, capsule_object_id, proof_object_id, prev_marker_id)`.
366#[must_use]
367pub fn compute_marker_id(
368    commit_seq: u64,
369    commit_time_unix_ns: u64,
370    capsule_object_id: &[u8; ID_SIZE],
371    proof_object_id: &[u8; ID_SIZE],
372    prev_marker_id: &[u8; ID_SIZE],
373) -> [u8; ID_SIZE] {
374    let mut prefix = [0u8; RECORD_PREFIX_BYTES];
375    prefix[0..8].copy_from_slice(&commit_seq.to_le_bytes());
376    prefix[8..16].copy_from_slice(&commit_time_unix_ns.to_le_bytes());
377    prefix[16..32].copy_from_slice(capsule_object_id);
378    prefix[32..48].copy_from_slice(proof_object_id);
379    prefix[48..64].copy_from_slice(prev_marker_id);
380
381    let mut hasher = blake3::Hasher::new();
382    hasher.update(MARKER_ID_DOMAIN);
383    hasher.update(&prefix);
384    let hash = hasher.finalize();
385
386    let mut id = [0u8; ID_SIZE];
387    id.copy_from_slice(&hash.as_bytes()[..ID_SIZE]);
388    id
389}
390
391// ---------------------------------------------------------------------------
392// O(1) seek helpers
393// ---------------------------------------------------------------------------
394
/// Compute which segment a `commit_seq` falls in (fixed rotation policy).
///
/// The result is `commit_seq / MARKERS_PER_SEGMENT` rounded down, so each run
/// of [`MARKERS_PER_SEGMENT`] consecutive sequence numbers maps to one
/// segment id, starting with segment 0 for `commit_seq` 0.
#[must_use]
pub const fn segment_id_for_commit_seq(commit_seq: u64) -> u64 {
    commit_seq / MARKERS_PER_SEGMENT
}
400
/// Compute the `start_commit_seq` for a given segment_id.
///
/// This is `segment_id * MARKERS_PER_SEGMENT`, i.e. the smallest `commit_seq`
/// that [`segment_id_for_commit_seq`] maps to `segment_id`.
///
/// NOTE(review): the multiplication is unchecked, so a `segment_id` above
/// `u64::MAX / MARKERS_PER_SEGMENT` overflows — confirm callers never pass
/// an unvalidated id.
#[must_use]
pub const fn start_commit_seq_for_segment(segment_id: u64) -> u64 {
    segment_id * MARKERS_PER_SEGMENT
}
406
407/// Compute the byte offset of a record within a segment file.
408///
409/// `offset = MARKER_SEGMENT_HEADER_BYTES + (commit_seq - start_commit_seq) * COMMIT_MARKER_RECORD_BYTES`
410#[must_use]
411pub const fn record_offset(commit_seq: u64, start_commit_seq: u64) -> u64 {
412    let slot = commit_seq - start_commit_seq;
413    MARKER_SEGMENT_HEADER_BYTES as u64 + slot * COMMIT_MARKER_RECORD_BYTES as u64
414}
415
416/// Compute the next `commit_seq` from segment file length (crash-safe allocation).
417///
418/// `next_commit_seq = start_commit_seq + floor((file_len - header) / record_size)`
419#[must_use]
420pub const fn next_commit_seq_from_file_len(start_commit_seq: u64, file_len: u64) -> u64 {
421    if file_len < MARKER_SEGMENT_HEADER_BYTES as u64 {
422        return start_commit_seq;
423    }
424    let data_bytes = file_len - MARKER_SEGMENT_HEADER_BYTES as u64;
425    let n_records = data_bytes / COMMIT_MARKER_RECORD_BYTES as u64;
426    start_commit_seq + n_records
427}
428
429// ---------------------------------------------------------------------------
430// Torn tail handling
431// ---------------------------------------------------------------------------
432
433/// Scan a segment's record region and return the count of valid records
434/// from the start.  Stops at the first record that fails xxh3 verification
435/// or marker_id verification (tamper detection).
436///
437/// `data` must be the record region only (header already stripped).
438#[must_use]
439pub fn valid_record_prefix_count(data: &[u8]) -> u64 {
440    let mut count = 0u64;
441    let mut offset = 0;
442    while offset + COMMIT_MARKER_RECORD_BYTES <= data.len() {
443        let record_bytes = &data[offset..offset + COMMIT_MARKER_RECORD_BYTES];
444        let Ok(record) = CommitMarkerRecord::decode(record_bytes) else {
445            break;
446        };
447        // Security: also verify marker_id to reject forged records.
448        if !record.verify_marker_id() {
449            break;
450        }
451        count += 1;
452        offset += COMMIT_MARKER_RECORD_BYTES;
453    }
454    count
455}
456
457/// Analyze a full segment buffer for torn tail conditions.
458///
459/// Returns `Ok(n_records)` if all complete records are valid,
460/// or `Err(TornTail { .. })` if there are trailing partial bytes.
461pub fn check_segment_integrity(segment_data: &[u8]) -> Result<u64, MarkerError> {
462    if segment_data.len() < MARKER_SEGMENT_HEADER_BYTES {
463        return Err(MarkerError::HeaderTooShort);
464    }
465
466    // Header must be valid before we reason about record layout.
467    let _header = MarkerSegmentHeader::decode(&segment_data[..MARKER_SEGMENT_HEADER_BYTES])?;
468
469    let record_region = &segment_data[MARKER_SEGMENT_HEADER_BYTES..];
470    let complete_records = record_region.len() / COMMIT_MARKER_RECORD_BYTES;
471    let trailing = record_region.len() % COMMIT_MARKER_RECORD_BYTES;
472
473    // Verify all complete records up to the first decode failure while also
474    // enforcing density (`commit_seq = start_commit_seq + slot`).
475    let recovered = recover_valid_prefix(segment_data)?;
476    let valid = u64::try_from(recovered.len()).expect("record count always fits in u64");
477
478    #[allow(clippy::cast_possible_truncation)]
479    let complete_u64 = complete_records as u64;
480
481    if trailing > 0 || valid < complete_u64 {
482        let valid_usize = recovered.len();
483        return Err(MarkerError::TornTail {
484            complete_records: valid,
485            trailing_bytes: if valid < complete_u64 {
486                // Corruption mid-stream: remaining bytes from corrupt record onward.
487                record_region
488                    .len()
489                    .saturating_sub(valid_usize.saturating_mul(COMMIT_MARKER_RECORD_BYTES))
490            } else {
491                trailing
492            },
493        });
494    }
495
496    Ok(valid)
497}
498
499/// Recover the valid, density-checked prefix of commit markers from a segment.
500///
501/// This helper is intentionally tolerant of torn tails: it stops at the first
502/// undecodable record and returns the valid prefix. Density violations are
503/// fail-closed and returned as [`MarkerError::CommitSeqMismatch`].
504pub fn recover_valid_prefix(segment_data: &[u8]) -> Result<Vec<CommitMarkerRecord>, MarkerError> {
505    if segment_data.len() < MARKER_SEGMENT_HEADER_BYTES {
506        return Err(MarkerError::HeaderTooShort);
507    }
508
509    let header = MarkerSegmentHeader::decode(&segment_data[..MARKER_SEGMENT_HEADER_BYTES])?;
510    let record_region = &segment_data[MARKER_SEGMENT_HEADER_BYTES..];
511
512    let mut records = Vec::new();
513    let mut offset = 0usize;
514
515    while offset + COMMIT_MARKER_RECORD_BYTES <= record_region.len() {
516        let record_bytes = &record_region[offset..offset + COMMIT_MARKER_RECORD_BYTES];
517        let Ok(record) = CommitMarkerRecord::decode(record_bytes) else {
518            break;
519        };
520
521        let expected = header.start_commit_seq
522            + u64::try_from(records.len()).expect("record vector length always fits in u64");
523        if record.commit_seq != expected {
524            return Err(MarkerError::CommitSeqMismatch {
525                expected,
526                actual: record.commit_seq,
527            });
528        }
529
530        // Security: reject records with forged marker_id even if xxh3 is consistent.
531        // An attacker can recompute record_xxh3 to match altered fields, but cannot
532        // forge the BLAKE3-based marker_id without knowing the correct inputs.
533        if !record.verify_marker_id() {
534            return Err(MarkerError::MarkerIdMismatch {
535                commit_seq: record.commit_seq,
536            });
537        }
538
539        records.push(record);
540        offset += COMMIT_MARKER_RECORD_BYTES;
541    }
542
543    Ok(records)
544}
545
546// ---------------------------------------------------------------------------
547// Binary search by time
548// ---------------------------------------------------------------------------
549
550/// Binary search for the commit_seq whose `commit_time_unix_ns` is the greatest
551/// value <= `target_ns`.  Returns `None` if all records are after `target_ns`.
552///
553/// `read_record` is called with a commit_seq and must return the decoded record.
554pub fn binary_search_by_time<F>(
555    start_commit_seq: u64,
556    record_count: u64,
557    target_ns: u64,
558    mut read_record: F,
559) -> Option<u64>
560where
561    F: FnMut(u64) -> Option<CommitMarkerRecord>,
562{
563    if record_count == 0 {
564        return None;
565    }
566
567    let mut lo = 0u64;
568    let mut hi = record_count;
569    let mut best: Option<u64> = None;
570
571    while lo < hi {
572        let mid = lo + (hi - lo) / 2;
573        let seq = start_commit_seq + mid;
574        let Some(record) = read_record(seq) else {
575            break;
576        };
577
578        if record.commit_time_unix_ns <= target_ns {
579            best = Some(seq);
580            lo = mid + 1;
581        } else {
582            hi = mid;
583        }
584    }
585
586    best
587}
588
589// =========================================================================
590// Tests
591// =========================================================================
592
593#[cfg(test)]
594mod tests {
595    use super::*;
596    use std::collections::HashSet;
597
598    // ===================================================================
599    // bd-1hi.23 — Commit Marker Stream Format (§3.5.4)
600    // ===================================================================
601
602    fn make_test_record(seq: u64, prev: [u8; ID_SIZE]) -> CommitMarkerRecord {
603        let capsule = [(seq & 0xFF) as u8; ID_SIZE];
604        let proof = [((seq >> 8) & 0xFF) as u8; ID_SIZE];
605        let time_ns = 1_700_000_000_000_000_000u64 + seq * 1_000_000;
606        CommitMarkerRecord::new(seq, time_ns, capsule, proof, prev)
607    }
608
609    #[test]
610    fn test_marker_segment_header_encode_decode() {
611        let header = MarkerSegmentHeader::new(42, 42_000_000);
612        let encoded = header.encode();
613        assert_eq!(
614            encoded.len(),
615            MARKER_SEGMENT_HEADER_BYTES,
616            "header must be exactly {MARKER_SEGMENT_HEADER_BYTES} bytes"
617        );
618
619        let decoded = MarkerSegmentHeader::decode(&encoded).expect("decode must succeed");
620        assert_eq!(decoded, header);
621
622        // Verify magic bytes.
623        assert_eq!(&encoded[0..4], b"FSMK");
624    }
625
626    #[test]
627    fn test_commit_marker_record_encode_decode() {
628        let record = make_test_record(7, [0u8; ID_SIZE]);
629        let encoded = record.encode();
630        assert_eq!(
631            encoded.len(),
632            COMMIT_MARKER_RECORD_BYTES,
633            "record must be exactly {COMMIT_MARKER_RECORD_BYTES} bytes"
634        );
635
636        let decoded = CommitMarkerRecord::decode(&encoded).expect("decode must succeed");
637        assert_eq!(decoded, record);
638    }
639
640    #[test]
641    fn test_marker_id_computation() {
642        let seq = 100u64;
643        let time_ns = 1_700_000_000_000_000_000u64;
644        let capsule = [0xAAu8; ID_SIZE];
645        let proof = [0xBBu8; ID_SIZE];
646        let prev = [0u8; ID_SIZE];
647
648        let marker_id = compute_marker_id(seq, time_ns, &capsule, &proof, &prev);
649
650        // Manually compute the expected value.
651        let mut prefix = [0u8; RECORD_PREFIX_BYTES];
652        prefix[0..8].copy_from_slice(&seq.to_le_bytes());
653        prefix[8..16].copy_from_slice(&time_ns.to_le_bytes());
654        prefix[16..32].copy_from_slice(&capsule);
655        prefix[32..48].copy_from_slice(&proof);
656        prefix[48..64].copy_from_slice(&prev);
657
658        let mut hasher = blake3::Hasher::new();
659        hasher.update(MARKER_ID_DOMAIN);
660        hasher.update(&prefix);
661        let hash = hasher.finalize();
662        let mut expected = [0u8; ID_SIZE];
663        expected.copy_from_slice(&hash.as_bytes()[..ID_SIZE]);
664
665        assert_eq!(
666            marker_id, expected,
667            "marker_id must be Trunc128(BLAKE3(domain || prefix))"
668        );
669    }
670
671    #[test]
672    fn test_density_invariant() {
673        let start_seq = 1000u64;
674        let mut prev = [0u8; ID_SIZE];
675        let mut records = Vec::new();
676
677        for i in 0..5u64 {
678            let record = make_test_record(start_seq + i, prev);
679            prev = record.marker_id;
680            records.push(record);
681        }
682
683        for (i, record) in records.iter().enumerate() {
684            assert_eq!(
685                record.commit_seq,
686                start_seq + i as u64,
687                "record at slot {i} must have commit_seq = start + {i}"
688            );
689        }
690    }
691
692    #[test]
693    fn test_o1_seek_by_commit_seq() {
694        let start_seq = 0u64;
695        let header = MarkerSegmentHeader::new(0, start_seq);
696        let mut segment = Vec::from(header.encode());
697
698        let mut prev = [0u8; ID_SIZE];
699        let mut records = Vec::new();
700        for i in 0..1000u64 {
701            let record = make_test_record(start_seq + i, prev);
702            prev = record.marker_id;
703            segment.extend_from_slice(&record.encode());
704            records.push(record);
705        }
706
707        // Seek to commit_seq=500 via offset formula.
708        let target_seq = 500u64;
709        #[allow(clippy::cast_possible_truncation)]
710        let offset = record_offset(target_seq, start_seq) as usize;
711        let record_bytes = &segment[offset..offset + COMMIT_MARKER_RECORD_BYTES];
712        let record = CommitMarkerRecord::decode(record_bytes).expect("decode at offset");
713        assert_eq!(record.commit_seq, target_seq);
714        assert_eq!(record, records[500]);
715    }
716
717    #[test]
718    fn test_commit_seq_allocation_from_file_length() {
719        let start_seq = 5000u64;
720        // 10 records: file_len = 36 + 10 * 88 = 916
721        let file_len = MARKER_SEGMENT_HEADER_BYTES as u64 + 10 * COMMIT_MARKER_RECORD_BYTES as u64;
722        assert_eq!(file_len, 916);
723
724        let next = next_commit_seq_from_file_len(start_seq, file_len);
725        assert_eq!(next, start_seq + 10);
726    }
727
728    #[test]
729    fn test_torn_tail_handling() {
730        let start_seq = 0u64;
731        let header = MarkerSegmentHeader::new(0, start_seq);
732        let mut segment = Vec::from(header.encode());
733
734        let mut prev = [0u8; ID_SIZE];
735        for i in 0..5u64 {
736            let record = make_test_record(start_seq + i, prev);
737            prev = record.marker_id;
738            segment.extend_from_slice(&record.encode());
739        }
740
741        // Append 44 partial bytes (half a record).
742        segment.extend_from_slice(&[0xDE; 44]);
743
744        let result = check_segment_integrity(&segment);
745        match result {
746            Err(MarkerError::TornTail {
747                complete_records,
748                trailing_bytes,
749            }) => {
750                assert_eq!(complete_records, 5);
751                assert_eq!(trailing_bytes, 44);
752            }
753            other => unreachable!("expected TornTail, got {other:?}"),
754        }
755    }
756
757    #[test]
758    fn test_torn_tail_corrupt_last_record() {
759        let start_seq = 0u64;
760        let header = MarkerSegmentHeader::new(0, start_seq);
761        let mut segment = Vec::from(header.encode());
762
763        let mut prev = [0u8; ID_SIZE];
764        for i in 0..5u64 {
765            let record = make_test_record(start_seq + i, prev);
766            prev = record.marker_id;
767            segment.extend_from_slice(&record.encode());
768        }
769
770        // Corrupt record 4's xxh3 (last 8 bytes of record 4).
771        let record_4_end = MARKER_SEGMENT_HEADER_BYTES + 5 * COMMIT_MARKER_RECORD_BYTES;
772        let xxh3_start = record_4_end - 8;
773        segment[xxh3_start] ^= 0xFF;
774
775        let result = check_segment_integrity(&segment);
776        match result {
777            Err(MarkerError::TornTail {
778                complete_records, ..
779            }) => {
780                assert_eq!(complete_records, 4, "valid prefix is records 0-3");
781            }
782            other => unreachable!("expected TornTail, got {other:?}"),
783        }
784    }
785
786    #[test]
787    fn test_commit_seq_mismatch_detected() {
788        let start_seq = 100u64;
789        let header = MarkerSegmentHeader::new(0, start_seq);
790        let mut segment = Vec::from(header.encode());
791
792        let first = make_test_record(start_seq, [0u8; ID_SIZE]);
793        let second = make_test_record(start_seq + 2, first.marker_id);
794        segment.extend_from_slice(&first.encode());
795        segment.extend_from_slice(&second.encode());
796
797        let result = check_segment_integrity(&segment);
798        match result {
799            Err(MarkerError::CommitSeqMismatch { expected, actual }) => {
800                assert_eq!(expected, start_seq + 1);
801                assert_eq!(actual, start_seq + 2);
802            }
803            other => unreachable!("expected CommitSeqMismatch, got {other:?}"),
804        }
805    }
806
807    #[test]
808    fn test_binary_search_by_time() {
809        let start_seq = 0u64;
810        let base_ns = 1_000_000_000_000_000_000u64;
811
812        let records: Vec<CommitMarkerRecord> = (0..100u64)
813            .scan([0u8; ID_SIZE], |prev, i| {
814                let capsule = [(i & 0xFF) as u8; ID_SIZE];
815                let proof = [((i >> 8) & 0xFF) as u8; ID_SIZE];
816                let time_ns = base_ns + i * 1_000_000;
817                let record = CommitMarkerRecord::new(i, time_ns, capsule, proof, *prev);
818                *prev = record.marker_id;
819                Some(record)
820            })
821            .collect();
822
823        // Search for time at commit_seq=50.
824        let target_ns = base_ns + 50 * 1_000_000;
825        #[allow(clippy::cast_possible_truncation)]
826        let result = binary_search_by_time(start_seq, 100, target_ns, |seq| {
827            records.get(seq as usize).cloned()
828        });
829        assert_eq!(result, Some(50));
830
831        // Search before all records.
832        #[allow(clippy::cast_possible_truncation)]
833        let result = binary_search_by_time(start_seq, 100, base_ns - 1, |seq| {
834            records.get(seq as usize).cloned()
835        });
836        assert_eq!(result, None);
837
838        // Search after all records.
839        #[allow(clippy::cast_possible_truncation)]
840        let result = binary_search_by_time(start_seq, 100, u64::MAX, |seq| {
841            records.get(seq as usize).cloned()
842        });
843        assert_eq!(result, Some(99));
844    }
845
846    #[test]
847    fn test_fork_detection() {
848        let base_ns = 1_700_000_000_000_000_000u64;
849        let mut prev = [0u8; ID_SIZE];
850
851        // Build a shared prefix of 10 commits.
852        let mut shared = Vec::new();
853        for i in 0..10u64 {
854            let capsule = [0xAAu8; ID_SIZE];
855            let proof = [0xBBu8; ID_SIZE];
856            let record = CommitMarkerRecord::new(i, base_ns + i * 1_000_000, capsule, proof, prev);
857            prev = record.marker_id;
858            shared.push(record);
859        }
860
861        // Fork A: continues from shared[9].
862        let mut fork_a = shared.clone();
863        let mut prev_a = shared[9].marker_id;
864        for i in 10..15u64 {
865            let capsule = [0x11u8; ID_SIZE];
866            let proof = [0x22u8; ID_SIZE];
867            let record =
868                CommitMarkerRecord::new(i, base_ns + i * 1_000_000, capsule, proof, prev_a);
869            prev_a = record.marker_id;
870            fork_a.push(record);
871        }
872
873        // Fork B: different content from commit 10.
874        let mut fork_b = shared.clone();
875        let mut prev_b = shared[9].marker_id;
876        for i in 10..13u64 {
877            let capsule = [0x33u8; ID_SIZE];
878            let proof = [0x44u8; ID_SIZE];
879            let record =
880                CommitMarkerRecord::new(i, base_ns + i * 1_000_000, capsule, proof, prev_b);
881            prev_b = record.marker_id;
882            fork_b.push(record);
883        }
884
885        // Binary search for divergence point.
886        let min_len = fork_a.len().min(fork_b.len());
887        let mut lo = 0usize;
888        let mut hi = min_len;
889        while lo < hi {
890            let mid = lo + (hi - lo) / 2;
891            if fork_a[mid].marker_id == fork_b[mid].marker_id {
892                lo = mid + 1;
893            } else {
894                hi = mid;
895            }
896        }
897
898        // Greatest common prefix is at index lo - 1.
899        assert_eq!(lo, 10, "divergence starts at commit_seq 10");
900        assert_eq!(
901            fork_a[9].marker_id, fork_b[9].marker_id,
902            "last matching marker_id is at seq 9"
903        );
904        assert_ne!(
905            fork_a[10].marker_id, fork_b[10].marker_id,
906            "first divergence at seq 10"
907        );
908    }
909
910    #[test]
911    fn test_hash_chain_integrity() {
912        let mut prev = [0u8; ID_SIZE];
913        let mut records = Vec::new();
914
915        for i in 0..10u64 {
916            let record = make_test_record(i, prev);
917            prev = record.marker_id;
918            records.push(record);
919        }
920
921        // Verify chain links.
922        for i in 1..records.len() {
923            assert_eq!(
924                records[i].prev_marker_id,
925                records[i - 1].marker_id,
926                "record {i} prev_marker_id must link to record {}'s marker_id",
927                i - 1
928            );
929        }
930        assert_eq!(records[0].prev_marker_id, [0u8; ID_SIZE], "genesis is zero");
931
932        // Verify all marker_ids.
933        for record in &records {
934            assert!(
935                record.verify_marker_id(),
936                "marker_id must be verifiable for commit_seq {}",
937                record.commit_seq
938            );
939        }
940
941        // Tamper with one record and verify detection.
942        let mut tampered = records[5].clone();
943        tampered.capsule_object_id[0] ^= 0xFF;
944        assert!(
945            !tampered.verify_marker_id(),
946            "tampered record must fail marker_id verification"
947        );
948    }
949
950    #[test]
951    fn test_marker_no_mem_size_of() {
952        // Verify on-disk sizes are constants, not derived from mem::size_of.
953        assert_eq!(MARKER_SEGMENT_HEADER_BYTES, 36);
954        assert_eq!(COMMIT_MARKER_RECORD_BYTES, 88);
955
956        // Verify the actual struct sizes may differ from on-disk sizes
957        // (padding), confirming we don't use mem::size_of for offsets.
958        let header = MarkerSegmentHeader::new(0, 0);
959        let encoded_header = header.encode();
960        assert_eq!(
961            encoded_header.len(),
962            36,
963            "header on-disk size is a constant"
964        );
965
966        let record = make_test_record(0, [0u8; ID_SIZE]);
967        let encoded_record = record.encode();
968        assert_eq!(
969            encoded_record.len(),
970            88,
971            "record on-disk size is a constant"
972        );
973    }
974
975    #[test]
976    fn test_header_bad_magic_rejected() {
977        let header = MarkerSegmentHeader::new(1, 0);
978        let mut encoded = header.encode();
979        encoded[0] = b'X';
980
981        let result = MarkerSegmentHeader::decode(&encoded);
982        assert_eq!(result.unwrap_err(), MarkerError::BadMagic);
983    }
984
985    #[test]
986    fn test_header_checksum_tamper_detected() {
987        let header = MarkerSegmentHeader::new(1, 0);
988        let mut encoded = header.encode();
989        // Tamper with segment_id.
990        encoded[8] ^= 0x01;
991
992        let result = MarkerSegmentHeader::decode(&encoded);
993        assert!(matches!(
994            result.unwrap_err(),
995            MarkerError::HeaderChecksumMismatch { .. }
996        ));
997    }
998
999    #[test]
1000    fn test_record_checksum_tamper_detected() {
1001        let record = make_test_record(42, [0u8; ID_SIZE]);
1002        let mut encoded = record.encode();
1003        // Tamper with commit_time_unix_ns.
1004        encoded[10] ^= 0x01;
1005
1006        let result = CommitMarkerRecord::decode(&encoded);
1007        assert!(matches!(
1008            result.unwrap_err(),
1009            MarkerError::RecordChecksumMismatch { .. }
1010        ));
1011    }
1012
1013    #[test]
1014    fn test_segment_id_for_commit_seq() {
1015        assert_eq!(segment_id_for_commit_seq(0), 0);
1016        assert_eq!(segment_id_for_commit_seq(999_999), 0);
1017        assert_eq!(segment_id_for_commit_seq(1_000_000), 1);
1018        assert_eq!(segment_id_for_commit_seq(2_500_000), 2);
1019    }
1020
1021    #[test]
1022    fn test_start_commit_seq_for_segment() {
1023        assert_eq!(start_commit_seq_for_segment(0), 0);
1024        assert_eq!(start_commit_seq_for_segment(1), 1_000_000);
1025        assert_eq!(start_commit_seq_for_segment(5), 5_000_000);
1026    }
1027
1028    #[test]
1029    fn test_record_offset_formula() {
1030        let offset = record_offset(500, 0);
1031        assert_eq!(
1032            offset,
1033            MARKER_SEGMENT_HEADER_BYTES as u64 + 500 * COMMIT_MARKER_RECORD_BYTES as u64
1034        );
1035
1036        let offset2 = record_offset(1_000_050, 1_000_000);
1037        assert_eq!(
1038            offset2,
1039            MARKER_SEGMENT_HEADER_BYTES as u64 + 50 * COMMIT_MARKER_RECORD_BYTES as u64
1040        );
1041    }
1042
1043    #[test]
1044    fn test_error_display() {
1045        let err = MarkerError::BadMagic;
1046        assert_eq!(err.to_string(), "bad magic in marker segment header");
1047
1048        let err = MarkerError::TornTail {
1049            complete_records: 5,
1050            trailing_bytes: 44,
1051        };
1052        assert!(err.to_string().contains("torn tail"));
1053        assert!(err.to_string().contains('5'));
1054    }
1055
1056    #[test]
1057    fn test_marker_id_deterministic() {
1058        let capsule = [0xAA; ID_SIZE];
1059        let proof = [0xBB; ID_SIZE];
1060        let prev = [0u8; ID_SIZE];
1061
1062        let id1 = compute_marker_id(1, 100, &capsule, &proof, &prev);
1063        let id2 = compute_marker_id(1, 100, &capsule, &proof, &prev);
1064        assert_eq!(id1, id2, "marker_id must be deterministic");
1065
1066        // Different input → different output.
1067        let id3 = compute_marker_id(2, 100, &capsule, &proof, &prev);
1068        assert_ne!(id1, id3);
1069    }
1070
1071    #[test]
1072    fn prop_marker_id_unique() {
1073        let mut rng_state = 0xDEAD_BEEF_CAFE_BABE_u64;
1074        let mut observed_ids = HashSet::new();
1075
1076        for i in 0..2048u64 {
1077            // Deterministic pseudo-random generation to avoid flaky tests.
1078            rng_state = rng_state
1079                .wrapping_mul(6_364_136_223_846_793_005)
1080                .wrapping_add(1);
1081            let mut capsule = [0u8; ID_SIZE];
1082            let mut proof = [0u8; ID_SIZE];
1083            let mut prev = [0u8; ID_SIZE];
1084
1085            capsule[..8].copy_from_slice(&rng_state.to_le_bytes());
1086            proof[..8].copy_from_slice(&rng_state.rotate_left(13).to_le_bytes());
1087            prev[..8].copy_from_slice(&rng_state.rotate_right(7).to_le_bytes());
1088            capsule[8..16].copy_from_slice(&i.to_le_bytes());
1089            proof[8..16].copy_from_slice(&(i ^ rng_state).to_le_bytes());
1090            prev[8..16].copy_from_slice(&(i.wrapping_mul(17)).to_le_bytes());
1091
1092            let marker_id =
1093                compute_marker_id(i, 1_700_000_000_000_000_000 + i, &capsule, &proof, &prev);
1094            assert!(
1095                observed_ids.insert(marker_id),
1096                "marker_id collision at sample {i}: {marker_id:02X?}"
1097            );
1098        }
1099    }
1100
1101    #[test]
1102    fn prop_density_invariant_holds() {
1103        for count in 1..=256u64 {
1104            let start_seq = 10_000 + count * 31;
1105            let header = MarkerSegmentHeader::new(segment_id_for_commit_seq(start_seq), start_seq);
1106            let mut segment = Vec::from(header.encode());
1107            let mut prev = [0u8; ID_SIZE];
1108
1109            for i in 0..count {
1110                let record = make_test_record(start_seq + i, prev);
1111                prev = record.marker_id;
1112                segment.extend_from_slice(&record.encode());
1113            }
1114
1115            let integrity = check_segment_integrity(&segment).expect("segment should be dense");
1116            assert_eq!(integrity, count);
1117        }
1118    }
1119
1120    #[test]
1121    fn test_e2e_marker_stream_recovery() {
1122        let start_seq = 20_000u64;
1123        let header = MarkerSegmentHeader::new(segment_id_for_commit_seq(start_seq), start_seq);
1124        let mut segment = Vec::from(header.encode());
1125        let mut expected = Vec::new();
1126        let mut prev = [0u8; ID_SIZE];
1127
1128        for i in 0..1000u64 {
1129            let record = make_test_record(start_seq + i, prev);
1130            prev = record.marker_id;
1131            segment.extend_from_slice(&record.encode());
1132            expected.push(record);
1133        }
1134
1135        // Simulate crash: truncate in the middle of the final record.
1136        segment.truncate(segment.len() - (COMMIT_MARKER_RECORD_BYTES / 2));
1137
1138        let recovered = recover_valid_prefix(&segment).expect("recovery should succeed");
1139        assert_eq!(recovered.len(), expected.len() - 1);
1140        assert_eq!(recovered, expected[..expected.len() - 1]);
1141
1142        let integrity = check_segment_integrity(&segment);
1143        match integrity {
1144            Err(MarkerError::TornTail {
1145                complete_records,
1146                trailing_bytes,
1147            }) => {
1148                assert_eq!(complete_records, 999);
1149                assert_eq!(trailing_bytes, COMMIT_MARKER_RECORD_BYTES / 2);
1150            }
1151            other => unreachable!("expected torn-tail integrity result, got {other:?}"),
1152        }
1153    }
1154
1155    #[test]
1156    fn test_e2e_time_travel_query() {
1157        let start_seq = 5_000u64;
1158        let count = 256u64;
1159        let base_ns = 1_900_000_000_000_000_000u64;
1160        let mut prev = [0u8; ID_SIZE];
1161        let mut records = Vec::new();
1162
1163        for i in 0..count {
1164            let capsule = [(i & 0xFF) as u8; ID_SIZE];
1165            let proof = [((i >> 8) & 0xFF) as u8; ID_SIZE];
1166            let record = CommitMarkerRecord::new(
1167                start_seq + i,
1168                base_ns + i * 2_000_000,
1169                capsule,
1170                proof,
1171                prev,
1172            );
1173            prev = record.marker_id;
1174            records.push(record);
1175        }
1176
1177        let lookup = |seq: u64| -> Option<CommitMarkerRecord> {
1178            if seq < start_seq {
1179                return None;
1180            }
1181            let idx = usize::try_from(seq - start_seq).ok()?;
1182            records.get(idx).cloned()
1183        };
1184
1185        // Before the first marker.
1186        assert_eq!(
1187            binary_search_by_time(start_seq, count, base_ns - 1, lookup),
1188            None
1189        );
1190        // Exact hit.
1191        assert_eq!(
1192            binary_search_by_time(start_seq, count, base_ns + 40 * 2_000_000, lookup),
1193            Some(start_seq + 40)
1194        );
1195        // Between two commits should select the lower commit_seq.
1196        assert_eq!(
1197            binary_search_by_time(
1198                start_seq,
1199                count,
1200                base_ns + 40 * 2_000_000 + 1_000_000,
1201                lookup
1202            ),
1203            Some(start_seq + 40)
1204        );
1205        // After the final marker.
1206        assert_eq!(
1207            binary_search_by_time(start_seq, count, u64::MAX, lookup),
1208            Some(start_seq + count - 1)
1209        );
1210    }
1211}