1pub const MARKER_SEGMENT_MAGIC: [u8; 4] = *b"FSMK";
15
16pub const MARKER_FORMAT_VERSION: u32 = 1;
18
19pub const MARKER_SEGMENT_HEADER_BYTES: usize = 36;
21
22pub const COMMIT_MARKER_RECORD_BYTES: usize = 88;
24
25pub const MARKERS_PER_SEGMENT: u64 = 1_000_000;
27
28const MARKER_ID_DOMAIN: &[u8] = b"fsqlite:marker:v1";
30
31const ID_SIZE: usize = 16;
33
34const RECORD_PREFIX_BYTES: usize = 64;
38
39#[derive(Debug, Clone, PartialEq, Eq)]
45pub enum MarkerError {
46 HeaderTooShort,
48 BadMagic,
50 HeaderChecksumMismatch { expected: u64, actual: u64 },
52 RecordTooShort,
54 RecordChecksumMismatch { expected: u64, actual: u64 },
56 UnsupportedVersion { version: u32 },
58 RecordSizeMismatch { expected: u32, actual: u32 },
60 CommitSeqMismatch { expected: u64, actual: u64 },
62 TornTail {
64 complete_records: u64,
65 trailing_bytes: usize,
66 },
67 MarkerIdMismatch { commit_seq: u64 },
69}
70
71impl std::fmt::Display for MarkerError {
72 #[allow(clippy::too_many_lines)]
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 match self {
75 Self::HeaderTooShort => f.write_str("marker segment header too short"),
76 Self::BadMagic => f.write_str("bad magic in marker segment header"),
77 Self::HeaderChecksumMismatch { expected, actual } => {
78 write!(
79 f,
80 "marker header xxh3 mismatch: expected {expected:#018X}, got {actual:#018X}"
81 )
82 }
83 Self::RecordTooShort => f.write_str("commit marker record too short"),
84 Self::RecordChecksumMismatch { expected, actual } => {
85 write!(
86 f,
87 "marker record xxh3 mismatch: expected {expected:#018X}, got {actual:#018X}"
88 )
89 }
90 Self::UnsupportedVersion { version } => {
91 write!(f, "unsupported marker format version: {version}")
92 }
93 Self::RecordSizeMismatch { expected, actual } => {
94 write!(
95 f,
96 "marker record size mismatch: expected {expected}, got {actual}"
97 )
98 }
99 Self::CommitSeqMismatch { expected, actual } => {
100 write!(f, "commit_seq mismatch: expected {expected}, got {actual}")
101 }
102 Self::TornTail {
103 complete_records,
104 trailing_bytes,
105 } => {
106 write!(
107 f,
108 "torn tail: {complete_records} complete records, {trailing_bytes} trailing bytes"
109 )
110 }
111 Self::MarkerIdMismatch { commit_seq } => {
112 write!(
113 f,
114 "marker_id verification failed for commit_seq {commit_seq} (forged or corrupt)"
115 )
116 }
117 }
118 }
119}
120
121impl std::error::Error for MarkerError {}
122
123#[derive(Debug, Clone, Copy, PartialEq, Eq)]
139pub struct MarkerSegmentHeader {
140 pub segment_id: u64,
142 pub start_commit_seq: u64,
144}
145
146impl MarkerSegmentHeader {
147 #[must_use]
149 pub const fn new(segment_id: u64, start_commit_seq: u64) -> Self {
150 Self {
151 segment_id,
152 start_commit_seq,
153 }
154 }
155
156 #[must_use]
158 pub fn encode(&self) -> [u8; MARKER_SEGMENT_HEADER_BYTES] {
159 let mut buf = [0u8; MARKER_SEGMENT_HEADER_BYTES];
160 buf[0..4].copy_from_slice(&MARKER_SEGMENT_MAGIC);
162 buf[4..8].copy_from_slice(&MARKER_FORMAT_VERSION.to_le_bytes());
164 buf[8..16].copy_from_slice(&self.segment_id.to_le_bytes());
166 buf[16..24].copy_from_slice(&self.start_commit_seq.to_le_bytes());
168 #[allow(clippy::cast_possible_truncation)]
170 let record_size = COMMIT_MARKER_RECORD_BYTES as u32;
171 buf[24..28].copy_from_slice(&record_size.to_le_bytes());
172 let hash = xxhash_rust::xxh3::xxh3_64(&buf[..28]);
174 buf[28..36].copy_from_slice(&hash.to_le_bytes());
175 buf
176 }
177
178 pub fn decode(data: &[u8]) -> Result<Self, MarkerError> {
180 if data.len() < MARKER_SEGMENT_HEADER_BYTES {
181 return Err(MarkerError::HeaderTooShort);
182 }
183
184 if data[0..4] != MARKER_SEGMENT_MAGIC {
186 return Err(MarkerError::BadMagic);
187 }
188
189 let version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
191 if version != MARKER_FORMAT_VERSION {
192 return Err(MarkerError::UnsupportedVersion { version });
193 }
194
195 let segment_id = u64::from_le_bytes(data[8..16].try_into().expect("8 bytes"));
197
198 let start_commit_seq = u64::from_le_bytes(data[16..24].try_into().expect("8 bytes"));
200
201 let record_size = u32::from_le_bytes([data[24], data[25], data[26], data[27]]);
203 #[allow(clippy::cast_possible_truncation)]
204 let expected_record_size = COMMIT_MARKER_RECORD_BYTES as u32;
205 if record_size != expected_record_size {
206 return Err(MarkerError::RecordSizeMismatch {
207 expected: expected_record_size,
208 actual: record_size,
209 });
210 }
211
212 let stored_hash = u64::from_le_bytes(data[28..36].try_into().expect("8 bytes"));
214 let computed_hash = xxhash_rust::xxh3::xxh3_64(&data[..28]);
215 if stored_hash != computed_hash {
216 return Err(MarkerError::HeaderChecksumMismatch {
217 expected: computed_hash,
218 actual: stored_hash,
219 });
220 }
221
222 Ok(Self {
223 segment_id,
224 start_commit_seq,
225 })
226 }
227}
228
229#[derive(Debug, Clone, PartialEq, Eq)]
246pub struct CommitMarkerRecord {
247 pub commit_seq: u64,
249 pub commit_time_unix_ns: u64,
251 pub capsule_object_id: [u8; ID_SIZE],
253 pub proof_object_id: [u8; ID_SIZE],
255 pub prev_marker_id: [u8; ID_SIZE],
257 pub marker_id: [u8; ID_SIZE],
259}
260
261impl CommitMarkerRecord {
262 #[must_use]
264 pub fn new(
265 commit_seq: u64,
266 commit_time_unix_ns: u64,
267 capsule_object_id: [u8; ID_SIZE],
268 proof_object_id: [u8; ID_SIZE],
269 prev_marker_id: [u8; ID_SIZE],
270 ) -> Self {
271 let marker_id = compute_marker_id(
272 commit_seq,
273 commit_time_unix_ns,
274 &capsule_object_id,
275 &proof_object_id,
276 &prev_marker_id,
277 );
278 Self {
279 commit_seq,
280 commit_time_unix_ns,
281 capsule_object_id,
282 proof_object_id,
283 prev_marker_id,
284 marker_id,
285 }
286 }
287
288 #[must_use]
290 pub fn encode(&self) -> [u8; COMMIT_MARKER_RECORD_BYTES] {
291 let mut buf = [0u8; COMMIT_MARKER_RECORD_BYTES];
292 buf[0..8].copy_from_slice(&self.commit_seq.to_le_bytes());
293 buf[8..16].copy_from_slice(&self.commit_time_unix_ns.to_le_bytes());
294 buf[16..32].copy_from_slice(&self.capsule_object_id);
295 buf[32..48].copy_from_slice(&self.proof_object_id);
296 buf[48..64].copy_from_slice(&self.prev_marker_id);
297 buf[64..80].copy_from_slice(&self.marker_id);
298 let hash = xxhash_rust::xxh3::xxh3_64(&buf[..80]);
300 buf[80..88].copy_from_slice(&hash.to_le_bytes());
301 buf
302 }
303
304 pub fn decode(data: &[u8]) -> Result<Self, MarkerError> {
306 if data.len() < COMMIT_MARKER_RECORD_BYTES {
307 return Err(MarkerError::RecordTooShort);
308 }
309
310 let commit_seq = u64::from_le_bytes(data[0..8].try_into().expect("8 bytes"));
311 let commit_time_unix_ns = u64::from_le_bytes(data[8..16].try_into().expect("8 bytes"));
312
313 let mut capsule_object_id = [0u8; ID_SIZE];
314 capsule_object_id.copy_from_slice(&data[16..32]);
315
316 let mut proof_object_id = [0u8; ID_SIZE];
317 proof_object_id.copy_from_slice(&data[32..48]);
318
319 let mut prev_marker_id = [0u8; ID_SIZE];
320 prev_marker_id.copy_from_slice(&data[48..64]);
321
322 let mut marker_id = [0u8; ID_SIZE];
323 marker_id.copy_from_slice(&data[64..80]);
324
325 let stored_hash = u64::from_le_bytes(data[80..88].try_into().expect("8 bytes"));
326 let computed_hash = xxhash_rust::xxh3::xxh3_64(&data[..80]);
327 if stored_hash != computed_hash {
328 return Err(MarkerError::RecordChecksumMismatch {
329 expected: computed_hash,
330 actual: stored_hash,
331 });
332 }
333
334 Ok(Self {
335 commit_seq,
336 commit_time_unix_ns,
337 capsule_object_id,
338 proof_object_id,
339 prev_marker_id,
340 marker_id,
341 })
342 }
343
344 #[must_use]
346 pub fn verify_marker_id(&self) -> bool {
347 let expected = compute_marker_id(
348 self.commit_seq,
349 self.commit_time_unix_ns,
350 &self.capsule_object_id,
351 &self.proof_object_id,
352 &self.prev_marker_id,
353 );
354 self.marker_id == expected
355 }
356}
357
358#[must_use]
367pub fn compute_marker_id(
368 commit_seq: u64,
369 commit_time_unix_ns: u64,
370 capsule_object_id: &[u8; ID_SIZE],
371 proof_object_id: &[u8; ID_SIZE],
372 prev_marker_id: &[u8; ID_SIZE],
373) -> [u8; ID_SIZE] {
374 let mut prefix = [0u8; RECORD_PREFIX_BYTES];
375 prefix[0..8].copy_from_slice(&commit_seq.to_le_bytes());
376 prefix[8..16].copy_from_slice(&commit_time_unix_ns.to_le_bytes());
377 prefix[16..32].copy_from_slice(capsule_object_id);
378 prefix[32..48].copy_from_slice(proof_object_id);
379 prefix[48..64].copy_from_slice(prev_marker_id);
380
381 let mut hasher = blake3::Hasher::new();
382 hasher.update(MARKER_ID_DOMAIN);
383 hasher.update(&prefix);
384 let hash = hasher.finalize();
385
386 let mut id = [0u8; ID_SIZE];
387 id.copy_from_slice(&hash.as_bytes()[..ID_SIZE]);
388 id
389}
390
391#[must_use]
397pub const fn segment_id_for_commit_seq(commit_seq: u64) -> u64 {
398 commit_seq / MARKERS_PER_SEGMENT
399}
400
401#[must_use]
403pub const fn start_commit_seq_for_segment(segment_id: u64) -> u64 {
404 segment_id * MARKERS_PER_SEGMENT
405}
406
407#[must_use]
411pub const fn record_offset(commit_seq: u64, start_commit_seq: u64) -> u64 {
412 let slot = commit_seq - start_commit_seq;
413 MARKER_SEGMENT_HEADER_BYTES as u64 + slot * COMMIT_MARKER_RECORD_BYTES as u64
414}
415
416#[must_use]
420pub const fn next_commit_seq_from_file_len(start_commit_seq: u64, file_len: u64) -> u64 {
421 if file_len < MARKER_SEGMENT_HEADER_BYTES as u64 {
422 return start_commit_seq;
423 }
424 let data_bytes = file_len - MARKER_SEGMENT_HEADER_BYTES as u64;
425 let n_records = data_bytes / COMMIT_MARKER_RECORD_BYTES as u64;
426 start_commit_seq + n_records
427}
428
429#[must_use]
439pub fn valid_record_prefix_count(data: &[u8]) -> u64 {
440 let mut count = 0u64;
441 let mut offset = 0;
442 while offset + COMMIT_MARKER_RECORD_BYTES <= data.len() {
443 let record_bytes = &data[offset..offset + COMMIT_MARKER_RECORD_BYTES];
444 let Ok(record) = CommitMarkerRecord::decode(record_bytes) else {
445 break;
446 };
447 if !record.verify_marker_id() {
449 break;
450 }
451 count += 1;
452 offset += COMMIT_MARKER_RECORD_BYTES;
453 }
454 count
455}
456
457pub fn check_segment_integrity(segment_data: &[u8]) -> Result<u64, MarkerError> {
462 if segment_data.len() < MARKER_SEGMENT_HEADER_BYTES {
463 return Err(MarkerError::HeaderTooShort);
464 }
465
466 let _header = MarkerSegmentHeader::decode(&segment_data[..MARKER_SEGMENT_HEADER_BYTES])?;
468
469 let record_region = &segment_data[MARKER_SEGMENT_HEADER_BYTES..];
470 let complete_records = record_region.len() / COMMIT_MARKER_RECORD_BYTES;
471 let trailing = record_region.len() % COMMIT_MARKER_RECORD_BYTES;
472
473 let recovered = recover_valid_prefix(segment_data)?;
476 let valid = u64::try_from(recovered.len()).expect("record count always fits in u64");
477
478 #[allow(clippy::cast_possible_truncation)]
479 let complete_u64 = complete_records as u64;
480
481 if trailing > 0 || valid < complete_u64 {
482 let valid_usize = recovered.len();
483 return Err(MarkerError::TornTail {
484 complete_records: valid,
485 trailing_bytes: if valid < complete_u64 {
486 record_region
488 .len()
489 .saturating_sub(valid_usize.saturating_mul(COMMIT_MARKER_RECORD_BYTES))
490 } else {
491 trailing
492 },
493 });
494 }
495
496 Ok(valid)
497}
498
499pub fn recover_valid_prefix(segment_data: &[u8]) -> Result<Vec<CommitMarkerRecord>, MarkerError> {
505 if segment_data.len() < MARKER_SEGMENT_HEADER_BYTES {
506 return Err(MarkerError::HeaderTooShort);
507 }
508
509 let header = MarkerSegmentHeader::decode(&segment_data[..MARKER_SEGMENT_HEADER_BYTES])?;
510 let record_region = &segment_data[MARKER_SEGMENT_HEADER_BYTES..];
511
512 let mut records = Vec::new();
513 let mut offset = 0usize;
514
515 while offset + COMMIT_MARKER_RECORD_BYTES <= record_region.len() {
516 let record_bytes = &record_region[offset..offset + COMMIT_MARKER_RECORD_BYTES];
517 let Ok(record) = CommitMarkerRecord::decode(record_bytes) else {
518 break;
519 };
520
521 let expected = header.start_commit_seq
522 + u64::try_from(records.len()).expect("record vector length always fits in u64");
523 if record.commit_seq != expected {
524 return Err(MarkerError::CommitSeqMismatch {
525 expected,
526 actual: record.commit_seq,
527 });
528 }
529
530 if !record.verify_marker_id() {
534 return Err(MarkerError::MarkerIdMismatch {
535 commit_seq: record.commit_seq,
536 });
537 }
538
539 records.push(record);
540 offset += COMMIT_MARKER_RECORD_BYTES;
541 }
542
543 Ok(records)
544}
545
546pub fn binary_search_by_time<F>(
555 start_commit_seq: u64,
556 record_count: u64,
557 target_ns: u64,
558 mut read_record: F,
559) -> Option<u64>
560where
561 F: FnMut(u64) -> Option<CommitMarkerRecord>,
562{
563 if record_count == 0 {
564 return None;
565 }
566
567 let mut lo = 0u64;
568 let mut hi = record_count;
569 let mut best: Option<u64> = None;
570
571 while lo < hi {
572 let mid = lo + (hi - lo) / 2;
573 let seq = start_commit_seq + mid;
574 let Some(record) = read_record(seq) else {
575 break;
576 };
577
578 if record.commit_time_unix_ns <= target_ns {
579 best = Some(seq);
580 lo = mid + 1;
581 } else {
582 hi = mid;
583 }
584 }
585
586 best
587}
588
589#[cfg(test)]
594mod tests {
595 use super::*;
596 use std::collections::HashSet;
597
598 fn make_test_record(seq: u64, prev: [u8; ID_SIZE]) -> CommitMarkerRecord {
603 let capsule = [(seq & 0xFF) as u8; ID_SIZE];
604 let proof = [((seq >> 8) & 0xFF) as u8; ID_SIZE];
605 let time_ns = 1_700_000_000_000_000_000u64 + seq * 1_000_000;
606 CommitMarkerRecord::new(seq, time_ns, capsule, proof, prev)
607 }
608
609 #[test]
610 fn test_marker_segment_header_encode_decode() {
611 let header = MarkerSegmentHeader::new(42, 42_000_000);
612 let encoded = header.encode();
613 assert_eq!(
614 encoded.len(),
615 MARKER_SEGMENT_HEADER_BYTES,
616 "header must be exactly {MARKER_SEGMENT_HEADER_BYTES} bytes"
617 );
618
619 let decoded = MarkerSegmentHeader::decode(&encoded).expect("decode must succeed");
620 assert_eq!(decoded, header);
621
622 assert_eq!(&encoded[0..4], b"FSMK");
624 }
625
626 #[test]
627 fn test_commit_marker_record_encode_decode() {
628 let record = make_test_record(7, [0u8; ID_SIZE]);
629 let encoded = record.encode();
630 assert_eq!(
631 encoded.len(),
632 COMMIT_MARKER_RECORD_BYTES,
633 "record must be exactly {COMMIT_MARKER_RECORD_BYTES} bytes"
634 );
635
636 let decoded = CommitMarkerRecord::decode(&encoded).expect("decode must succeed");
637 assert_eq!(decoded, record);
638 }
639
640 #[test]
641 fn test_marker_id_computation() {
642 let seq = 100u64;
643 let time_ns = 1_700_000_000_000_000_000u64;
644 let capsule = [0xAAu8; ID_SIZE];
645 let proof = [0xBBu8; ID_SIZE];
646 let prev = [0u8; ID_SIZE];
647
648 let marker_id = compute_marker_id(seq, time_ns, &capsule, &proof, &prev);
649
650 let mut prefix = [0u8; RECORD_PREFIX_BYTES];
652 prefix[0..8].copy_from_slice(&seq.to_le_bytes());
653 prefix[8..16].copy_from_slice(&time_ns.to_le_bytes());
654 prefix[16..32].copy_from_slice(&capsule);
655 prefix[32..48].copy_from_slice(&proof);
656 prefix[48..64].copy_from_slice(&prev);
657
658 let mut hasher = blake3::Hasher::new();
659 hasher.update(MARKER_ID_DOMAIN);
660 hasher.update(&prefix);
661 let hash = hasher.finalize();
662 let mut expected = [0u8; ID_SIZE];
663 expected.copy_from_slice(&hash.as_bytes()[..ID_SIZE]);
664
665 assert_eq!(
666 marker_id, expected,
667 "marker_id must be Trunc128(BLAKE3(domain || prefix))"
668 );
669 }
670
671 #[test]
672 fn test_density_invariant() {
673 let start_seq = 1000u64;
674 let mut prev = [0u8; ID_SIZE];
675 let mut records = Vec::new();
676
677 for i in 0..5u64 {
678 let record = make_test_record(start_seq + i, prev);
679 prev = record.marker_id;
680 records.push(record);
681 }
682
683 for (i, record) in records.iter().enumerate() {
684 assert_eq!(
685 record.commit_seq,
686 start_seq + i as u64,
687 "record at slot {i} must have commit_seq = start + {i}"
688 );
689 }
690 }
691
692 #[test]
693 fn test_o1_seek_by_commit_seq() {
694 let start_seq = 0u64;
695 let header = MarkerSegmentHeader::new(0, start_seq);
696 let mut segment = Vec::from(header.encode());
697
698 let mut prev = [0u8; ID_SIZE];
699 let mut records = Vec::new();
700 for i in 0..1000u64 {
701 let record = make_test_record(start_seq + i, prev);
702 prev = record.marker_id;
703 segment.extend_from_slice(&record.encode());
704 records.push(record);
705 }
706
707 let target_seq = 500u64;
709 #[allow(clippy::cast_possible_truncation)]
710 let offset = record_offset(target_seq, start_seq) as usize;
711 let record_bytes = &segment[offset..offset + COMMIT_MARKER_RECORD_BYTES];
712 let record = CommitMarkerRecord::decode(record_bytes).expect("decode at offset");
713 assert_eq!(record.commit_seq, target_seq);
714 assert_eq!(record, records[500]);
715 }
716
717 #[test]
718 fn test_commit_seq_allocation_from_file_length() {
719 let start_seq = 5000u64;
720 let file_len = MARKER_SEGMENT_HEADER_BYTES as u64 + 10 * COMMIT_MARKER_RECORD_BYTES as u64;
722 assert_eq!(file_len, 916);
723
724 let next = next_commit_seq_from_file_len(start_seq, file_len);
725 assert_eq!(next, start_seq + 10);
726 }
727
728 #[test]
729 fn test_torn_tail_handling() {
730 let start_seq = 0u64;
731 let header = MarkerSegmentHeader::new(0, start_seq);
732 let mut segment = Vec::from(header.encode());
733
734 let mut prev = [0u8; ID_SIZE];
735 for i in 0..5u64 {
736 let record = make_test_record(start_seq + i, prev);
737 prev = record.marker_id;
738 segment.extend_from_slice(&record.encode());
739 }
740
741 segment.extend_from_slice(&[0xDE; 44]);
743
744 let result = check_segment_integrity(&segment);
745 match result {
746 Err(MarkerError::TornTail {
747 complete_records,
748 trailing_bytes,
749 }) => {
750 assert_eq!(complete_records, 5);
751 assert_eq!(trailing_bytes, 44);
752 }
753 other => unreachable!("expected TornTail, got {other:?}"),
754 }
755 }
756
757 #[test]
758 fn test_torn_tail_corrupt_last_record() {
759 let start_seq = 0u64;
760 let header = MarkerSegmentHeader::new(0, start_seq);
761 let mut segment = Vec::from(header.encode());
762
763 let mut prev = [0u8; ID_SIZE];
764 for i in 0..5u64 {
765 let record = make_test_record(start_seq + i, prev);
766 prev = record.marker_id;
767 segment.extend_from_slice(&record.encode());
768 }
769
770 let record_4_end = MARKER_SEGMENT_HEADER_BYTES + 5 * COMMIT_MARKER_RECORD_BYTES;
772 let xxh3_start = record_4_end - 8;
773 segment[xxh3_start] ^= 0xFF;
774
775 let result = check_segment_integrity(&segment);
776 match result {
777 Err(MarkerError::TornTail {
778 complete_records, ..
779 }) => {
780 assert_eq!(complete_records, 4, "valid prefix is records 0-3");
781 }
782 other => unreachable!("expected TornTail, got {other:?}"),
783 }
784 }
785
786 #[test]
787 fn test_commit_seq_mismatch_detected() {
788 let start_seq = 100u64;
789 let header = MarkerSegmentHeader::new(0, start_seq);
790 let mut segment = Vec::from(header.encode());
791
792 let first = make_test_record(start_seq, [0u8; ID_SIZE]);
793 let second = make_test_record(start_seq + 2, first.marker_id);
794 segment.extend_from_slice(&first.encode());
795 segment.extend_from_slice(&second.encode());
796
797 let result = check_segment_integrity(&segment);
798 match result {
799 Err(MarkerError::CommitSeqMismatch { expected, actual }) => {
800 assert_eq!(expected, start_seq + 1);
801 assert_eq!(actual, start_seq + 2);
802 }
803 other => unreachable!("expected CommitSeqMismatch, got {other:?}"),
804 }
805 }
806
807 #[test]
808 fn test_binary_search_by_time() {
809 let start_seq = 0u64;
810 let base_ns = 1_000_000_000_000_000_000u64;
811
812 let records: Vec<CommitMarkerRecord> = (0..100u64)
813 .scan([0u8; ID_SIZE], |prev, i| {
814 let capsule = [(i & 0xFF) as u8; ID_SIZE];
815 let proof = [((i >> 8) & 0xFF) as u8; ID_SIZE];
816 let time_ns = base_ns + i * 1_000_000;
817 let record = CommitMarkerRecord::new(i, time_ns, capsule, proof, *prev);
818 *prev = record.marker_id;
819 Some(record)
820 })
821 .collect();
822
823 let target_ns = base_ns + 50 * 1_000_000;
825 #[allow(clippy::cast_possible_truncation)]
826 let result = binary_search_by_time(start_seq, 100, target_ns, |seq| {
827 records.get(seq as usize).cloned()
828 });
829 assert_eq!(result, Some(50));
830
831 #[allow(clippy::cast_possible_truncation)]
833 let result = binary_search_by_time(start_seq, 100, base_ns - 1, |seq| {
834 records.get(seq as usize).cloned()
835 });
836 assert_eq!(result, None);
837
838 #[allow(clippy::cast_possible_truncation)]
840 let result = binary_search_by_time(start_seq, 100, u64::MAX, |seq| {
841 records.get(seq as usize).cloned()
842 });
843 assert_eq!(result, Some(99));
844 }
845
846 #[test]
847 fn test_fork_detection() {
848 let base_ns = 1_700_000_000_000_000_000u64;
849 let mut prev = [0u8; ID_SIZE];
850
851 let mut shared = Vec::new();
853 for i in 0..10u64 {
854 let capsule = [0xAAu8; ID_SIZE];
855 let proof = [0xBBu8; ID_SIZE];
856 let record = CommitMarkerRecord::new(i, base_ns + i * 1_000_000, capsule, proof, prev);
857 prev = record.marker_id;
858 shared.push(record);
859 }
860
861 let mut fork_a = shared.clone();
863 let mut prev_a = shared[9].marker_id;
864 for i in 10..15u64 {
865 let capsule = [0x11u8; ID_SIZE];
866 let proof = [0x22u8; ID_SIZE];
867 let record =
868 CommitMarkerRecord::new(i, base_ns + i * 1_000_000, capsule, proof, prev_a);
869 prev_a = record.marker_id;
870 fork_a.push(record);
871 }
872
873 let mut fork_b = shared.clone();
875 let mut prev_b = shared[9].marker_id;
876 for i in 10..13u64 {
877 let capsule = [0x33u8; ID_SIZE];
878 let proof = [0x44u8; ID_SIZE];
879 let record =
880 CommitMarkerRecord::new(i, base_ns + i * 1_000_000, capsule, proof, prev_b);
881 prev_b = record.marker_id;
882 fork_b.push(record);
883 }
884
885 let min_len = fork_a.len().min(fork_b.len());
887 let mut lo = 0usize;
888 let mut hi = min_len;
889 while lo < hi {
890 let mid = lo + (hi - lo) / 2;
891 if fork_a[mid].marker_id == fork_b[mid].marker_id {
892 lo = mid + 1;
893 } else {
894 hi = mid;
895 }
896 }
897
898 assert_eq!(lo, 10, "divergence starts at commit_seq 10");
900 assert_eq!(
901 fork_a[9].marker_id, fork_b[9].marker_id,
902 "last matching marker_id is at seq 9"
903 );
904 assert_ne!(
905 fork_a[10].marker_id, fork_b[10].marker_id,
906 "first divergence at seq 10"
907 );
908 }
909
910 #[test]
911 fn test_hash_chain_integrity() {
912 let mut prev = [0u8; ID_SIZE];
913 let mut records = Vec::new();
914
915 for i in 0..10u64 {
916 let record = make_test_record(i, prev);
917 prev = record.marker_id;
918 records.push(record);
919 }
920
921 for i in 1..records.len() {
923 assert_eq!(
924 records[i].prev_marker_id,
925 records[i - 1].marker_id,
926 "record {i} prev_marker_id must link to record {}'s marker_id",
927 i - 1
928 );
929 }
930 assert_eq!(records[0].prev_marker_id, [0u8; ID_SIZE], "genesis is zero");
931
932 for record in &records {
934 assert!(
935 record.verify_marker_id(),
936 "marker_id must be verifiable for commit_seq {}",
937 record.commit_seq
938 );
939 }
940
941 let mut tampered = records[5].clone();
943 tampered.capsule_object_id[0] ^= 0xFF;
944 assert!(
945 !tampered.verify_marker_id(),
946 "tampered record must fail marker_id verification"
947 );
948 }
949
950 #[test]
951 fn test_marker_no_mem_size_of() {
952 assert_eq!(MARKER_SEGMENT_HEADER_BYTES, 36);
954 assert_eq!(COMMIT_MARKER_RECORD_BYTES, 88);
955
956 let header = MarkerSegmentHeader::new(0, 0);
959 let encoded_header = header.encode();
960 assert_eq!(
961 encoded_header.len(),
962 36,
963 "header on-disk size is a constant"
964 );
965
966 let record = make_test_record(0, [0u8; ID_SIZE]);
967 let encoded_record = record.encode();
968 assert_eq!(
969 encoded_record.len(),
970 88,
971 "record on-disk size is a constant"
972 );
973 }
974
975 #[test]
976 fn test_header_bad_magic_rejected() {
977 let header = MarkerSegmentHeader::new(1, 0);
978 let mut encoded = header.encode();
979 encoded[0] = b'X';
980
981 let result = MarkerSegmentHeader::decode(&encoded);
982 assert_eq!(result.unwrap_err(), MarkerError::BadMagic);
983 }
984
985 #[test]
986 fn test_header_checksum_tamper_detected() {
987 let header = MarkerSegmentHeader::new(1, 0);
988 let mut encoded = header.encode();
989 encoded[8] ^= 0x01;
991
992 let result = MarkerSegmentHeader::decode(&encoded);
993 assert!(matches!(
994 result.unwrap_err(),
995 MarkerError::HeaderChecksumMismatch { .. }
996 ));
997 }
998
999 #[test]
1000 fn test_record_checksum_tamper_detected() {
1001 let record = make_test_record(42, [0u8; ID_SIZE]);
1002 let mut encoded = record.encode();
1003 encoded[10] ^= 0x01;
1005
1006 let result = CommitMarkerRecord::decode(&encoded);
1007 assert!(matches!(
1008 result.unwrap_err(),
1009 MarkerError::RecordChecksumMismatch { .. }
1010 ));
1011 }
1012
1013 #[test]
1014 fn test_segment_id_for_commit_seq() {
1015 assert_eq!(segment_id_for_commit_seq(0), 0);
1016 assert_eq!(segment_id_for_commit_seq(999_999), 0);
1017 assert_eq!(segment_id_for_commit_seq(1_000_000), 1);
1018 assert_eq!(segment_id_for_commit_seq(2_500_000), 2);
1019 }
1020
1021 #[test]
1022 fn test_start_commit_seq_for_segment() {
1023 assert_eq!(start_commit_seq_for_segment(0), 0);
1024 assert_eq!(start_commit_seq_for_segment(1), 1_000_000);
1025 assert_eq!(start_commit_seq_for_segment(5), 5_000_000);
1026 }
1027
1028 #[test]
1029 fn test_record_offset_formula() {
1030 let offset = record_offset(500, 0);
1031 assert_eq!(
1032 offset,
1033 MARKER_SEGMENT_HEADER_BYTES as u64 + 500 * COMMIT_MARKER_RECORD_BYTES as u64
1034 );
1035
1036 let offset2 = record_offset(1_000_050, 1_000_000);
1037 assert_eq!(
1038 offset2,
1039 MARKER_SEGMENT_HEADER_BYTES as u64 + 50 * COMMIT_MARKER_RECORD_BYTES as u64
1040 );
1041 }
1042
1043 #[test]
1044 fn test_error_display() {
1045 let err = MarkerError::BadMagic;
1046 assert_eq!(err.to_string(), "bad magic in marker segment header");
1047
1048 let err = MarkerError::TornTail {
1049 complete_records: 5,
1050 trailing_bytes: 44,
1051 };
1052 assert!(err.to_string().contains("torn tail"));
1053 assert!(err.to_string().contains('5'));
1054 }
1055
1056 #[test]
1057 fn test_marker_id_deterministic() {
1058 let capsule = [0xAA; ID_SIZE];
1059 let proof = [0xBB; ID_SIZE];
1060 let prev = [0u8; ID_SIZE];
1061
1062 let id1 = compute_marker_id(1, 100, &capsule, &proof, &prev);
1063 let id2 = compute_marker_id(1, 100, &capsule, &proof, &prev);
1064 assert_eq!(id1, id2, "marker_id must be deterministic");
1065
1066 let id3 = compute_marker_id(2, 100, &capsule, &proof, &prev);
1068 assert_ne!(id1, id3);
1069 }
1070
1071 #[test]
1072 fn prop_marker_id_unique() {
1073 let mut rng_state = 0xDEAD_BEEF_CAFE_BABE_u64;
1074 let mut observed_ids = HashSet::new();
1075
1076 for i in 0..2048u64 {
1077 rng_state = rng_state
1079 .wrapping_mul(6_364_136_223_846_793_005)
1080 .wrapping_add(1);
1081 let mut capsule = [0u8; ID_SIZE];
1082 let mut proof = [0u8; ID_SIZE];
1083 let mut prev = [0u8; ID_SIZE];
1084
1085 capsule[..8].copy_from_slice(&rng_state.to_le_bytes());
1086 proof[..8].copy_from_slice(&rng_state.rotate_left(13).to_le_bytes());
1087 prev[..8].copy_from_slice(&rng_state.rotate_right(7).to_le_bytes());
1088 capsule[8..16].copy_from_slice(&i.to_le_bytes());
1089 proof[8..16].copy_from_slice(&(i ^ rng_state).to_le_bytes());
1090 prev[8..16].copy_from_slice(&(i.wrapping_mul(17)).to_le_bytes());
1091
1092 let marker_id =
1093 compute_marker_id(i, 1_700_000_000_000_000_000 + i, &capsule, &proof, &prev);
1094 assert!(
1095 observed_ids.insert(marker_id),
1096 "marker_id collision at sample {i}: {marker_id:02X?}"
1097 );
1098 }
1099 }
1100
1101 #[test]
1102 fn prop_density_invariant_holds() {
1103 for count in 1..=256u64 {
1104 let start_seq = 10_000 + count * 31;
1105 let header = MarkerSegmentHeader::new(segment_id_for_commit_seq(start_seq), start_seq);
1106 let mut segment = Vec::from(header.encode());
1107 let mut prev = [0u8; ID_SIZE];
1108
1109 for i in 0..count {
1110 let record = make_test_record(start_seq + i, prev);
1111 prev = record.marker_id;
1112 segment.extend_from_slice(&record.encode());
1113 }
1114
1115 let integrity = check_segment_integrity(&segment).expect("segment should be dense");
1116 assert_eq!(integrity, count);
1117 }
1118 }
1119
1120 #[test]
1121 fn test_e2e_marker_stream_recovery() {
1122 let start_seq = 20_000u64;
1123 let header = MarkerSegmentHeader::new(segment_id_for_commit_seq(start_seq), start_seq);
1124 let mut segment = Vec::from(header.encode());
1125 let mut expected = Vec::new();
1126 let mut prev = [0u8; ID_SIZE];
1127
1128 for i in 0..1000u64 {
1129 let record = make_test_record(start_seq + i, prev);
1130 prev = record.marker_id;
1131 segment.extend_from_slice(&record.encode());
1132 expected.push(record);
1133 }
1134
1135 segment.truncate(segment.len() - (COMMIT_MARKER_RECORD_BYTES / 2));
1137
1138 let recovered = recover_valid_prefix(&segment).expect("recovery should succeed");
1139 assert_eq!(recovered.len(), expected.len() - 1);
1140 assert_eq!(recovered, expected[..expected.len() - 1]);
1141
1142 let integrity = check_segment_integrity(&segment);
1143 match integrity {
1144 Err(MarkerError::TornTail {
1145 complete_records,
1146 trailing_bytes,
1147 }) => {
1148 assert_eq!(complete_records, 999);
1149 assert_eq!(trailing_bytes, COMMIT_MARKER_RECORD_BYTES / 2);
1150 }
1151 other => unreachable!("expected torn-tail integrity result, got {other:?}"),
1152 }
1153 }
1154
1155 #[test]
1156 fn test_e2e_time_travel_query() {
1157 let start_seq = 5_000u64;
1158 let count = 256u64;
1159 let base_ns = 1_900_000_000_000_000_000u64;
1160 let mut prev = [0u8; ID_SIZE];
1161 let mut records = Vec::new();
1162
1163 for i in 0..count {
1164 let capsule = [(i & 0xFF) as u8; ID_SIZE];
1165 let proof = [((i >> 8) & 0xFF) as u8; ID_SIZE];
1166 let record = CommitMarkerRecord::new(
1167 start_seq + i,
1168 base_ns + i * 2_000_000,
1169 capsule,
1170 proof,
1171 prev,
1172 );
1173 prev = record.marker_id;
1174 records.push(record);
1175 }
1176
1177 let lookup = |seq: u64| -> Option<CommitMarkerRecord> {
1178 if seq < start_seq {
1179 return None;
1180 }
1181 let idx = usize::try_from(seq - start_seq).ok()?;
1182 records.get(idx).cloned()
1183 };
1184
1185 assert_eq!(
1187 binary_search_by_time(start_seq, count, base_ns - 1, lookup),
1188 None
1189 );
1190 assert_eq!(
1192 binary_search_by_time(start_seq, count, base_ns + 40 * 2_000_000, lookup),
1193 Some(start_seq + 40)
1194 );
1195 assert_eq!(
1197 binary_search_by_time(
1198 start_seq,
1199 count,
1200 base_ns + 40 * 2_000_000 + 1_000_000,
1201 lookup
1202 ),
1203 Some(start_seq + 40)
1204 );
1205 assert_eq!(
1207 binary_search_by_time(start_seq, count, u64::MAX, lookup),
1208 Some(start_seq + count - 1)
1209 );
1210 }
1211}