batpak 0.9.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
//! Round-7 "cake-and-eat-it" SIDX-manifest untrusted-footer recovery tests.
//!
//! Extracted from `boundary_tests.rs` to keep each inline test file within the
//! structural file-size budget. These pin the untrusted-footer
//! recover-vs-fail-closed decision via the SIDX entry table as a
//! self-authenticating manifest: the trust anchor is the content-addressed
//! `event_hash` (blake3 of the event payload), which the writer records into each
//! SIDX entry AND embeds in each frame's `hash_chain` — so a recovered (CRC-valid)
//! frame corroborates its matching entry, and a corrupt footer cannot forge a
//! corroboration.
//!
//! Decision under test (untrusted boundary only):
//! - (a) a corroborated manifest attesting to a committed frame missing from the
//!   recovered stream FAILS CLOSED regardless of tail policy (round-7 gap);
//! - (b) a corroborated manifest over intact frames RECOVERS them all;
//! - (c) an unparseable / uncorroborated manifest falls back to prefix recovery
//!   (no false fail-closed). Mid-stream corruption still fails closed first.

use super::*;
use crate::event::{EventHeader, EventKind, HashChain};
use crate::store::segment::sidx::{kind_to_raw, read_entries_unauthenticated, SidxEntryCollector};
use std::io::Cursor;

/// Build an in-memory buffer of `[real CRC-valid frames][CRC-valid SDX3 footer]`.
/// Mirror of the helper in `boundary_tests.rs`, used by the mid-stream-corruption
/// regression below. Returns `(bytes, frames_end)`.
fn frames_then_sdx3_footer(payloads: &[&str]) -> (Vec<u8>, u64) {
    use crate::store::segment::sidx::SidxEntry;

    let mut bytes = Vec::new();
    let mut collector = SidxEntryCollector::new();
    for (idx, p) in payloads.iter().enumerate() {
        let frame_offset = bytes.len() as u64;
        let frame = frame_encode(&serde_json::json!({ "payload": p })).expect("encode frame");
        let frame_length = u32::try_from(frame.len()).expect("frame length fits u32");
        bytes.extend_from_slice(&frame);
        let entry = SidxEntry {
            event_id: idx as u128 + 1,
            entity_idx: 0,
            scope_idx: 0,
            kind: kind_to_raw(EventKind::custom(0x1, 1)),
            wall_ms: 1,
            clock: 1,
            dag_lane: 0,
            dag_depth: 0,
            prev_hash: [0; 32],
            event_hash: [1; 32],
            frame_offset,
            frame_length,
            global_sequence: idx as u64 + 1,
            correlation_id: 1,
            causation_id: 0,
        };
        collector
            .record(entry, "entity:test", "scope:test")
            .expect("intern test strings");
    }
    let frames_end = bytes.len() as u64;
    let mut cursor = Cursor::new(&mut bytes);
    cursor.seek(SeekFrom::End(0)).expect("seek to footer start");
    collector
        .write_footer(&mut cursor, 7)
        .expect("write footer");
    (bytes, frames_end)
}

/// Total on-disk size of the frame beginning at `offset` in `bytes`.
fn frame_total_len_at(bytes: &[u8], offset: u64) -> u64 {
    let start = usize::try_from(offset).expect("offset fits usize");
    let header: [u8; 4] = bytes[start..start + 4]
        .try_into()
        .expect("4-byte frame length prefix");
    let payload_len = u64::from(u32::from_be_bytes(header));
    8 + payload_len
}

/// Build one real CRC-valid frame whose `FramePayload` carries a `hash_chain`
/// with `event_hash = blake3(payload_bytes)` (the exact writer invariant), plus
/// the matching [`crate::store::segment::sidx::SidxEntry`] (same offset, length,
/// and content event_hash). Returns `(frame_bytes, entry)`.
fn corroboratable_frame(
    frame_offset: u64,
    seq: u64,
    payload: &serde_json::Value,
) -> (Vec<u8>, crate::store::segment::sidx::SidxEntry) {
    let payload_bytes = crate::encoding::to_bytes(payload).expect("encode payload");
    let event_hash = crate::event::hash::compute_hash(&payload_bytes);
    let header = EventHeader::new(
        seq as u128,
        seq as u128,
        None,
        0,
        crate::coordinate::DagPosition::root(),
        u32::try_from(payload_bytes.len()).expect("payload size fits u32"),
        EventKind::custom(0x1, 1),
    );
    let event = crate::event::Event {
        header,
        payload: payload_bytes,
        hash_chain: Some(HashChain {
            prev_hash: [0; 32],
            event_hash,
        }),
    };
    let frame_payload = FramePayload {
        event,
        entity: "entity:test".to_owned(),
        scope: "scope:test".to_owned(),
        receipt_extensions: std::collections::BTreeMap::new(),
    };
    let frame = frame_encode(&frame_payload).expect("encode frame");
    let frame_length = u32::try_from(frame.len()).expect("frame length fits u32");
    let entry = crate::store::segment::sidx::SidxEntry {
        event_id: seq as u128,
        entity_idx: 0,
        scope_idx: 0,
        kind: kind_to_raw(EventKind::custom(0x1, 1)),
        wall_ms: 1,
        clock: 1,
        dag_lane: 0,
        dag_depth: 0,
        prev_hash: [0; 32],
        event_hash,
        frame_offset,
        frame_length,
        global_sequence: seq,
        correlation_id: seq as u128,
        causation_id: 0,
    };
    (frame, entry)
}

/// Append an SDX3 footer recording `entries` to `bytes`, then (optionally) flip a
/// byte inside the footer's covered region so the footer CRC fails → UNTRUSTED.
/// The 16-byte trailer geometry (offset/count/magic) is left intact, so the entry
/// table stays parseable by `read_entries_unauthenticated` while the footer CRC
/// no longer authenticates the boundary. Returns the footer-body byte offset that
/// was corrupted (= the footer start, the entries' recorded frames_end).
fn append_untrusted_footer(
    bytes: &mut Vec<u8>,
    entries: &[crate::store::segment::sidx::SidxEntry],
) -> u64 {
    let footer_start = bytes.len() as u64;
    let mut collector = SidxEntryCollector::new();
    for entry in entries.iter().cloned() {
        collector
            .record(entry, "entity:test", "scope:test")
            .expect("intern test strings");
    }
    let mut cursor = Cursor::new(&mut *bytes);
    cursor.seek(SeekFrom::End(0)).expect("seek to footer start");
    collector
        .write_footer(&mut cursor, 7)
        .expect("write footer");
    // Corrupt one byte of the footer string-table/entries region (the byte right
    // at footer_start) to break the footer CRC without touching the trailer
    // geometry. This is exactly how `detect_sidx_boundary_distrusts_a_crc_failed_sdx3_footer`
    // produces an untrusted boundary.
    let corrupt_at = usize::try_from(footer_start).expect("footer_start fits usize");
    bytes[corrupt_at] ^= 0xFF;
    footer_start
}

#[test]
fn untrusted_entries_parse_is_crc_independent() {
    // The untrusted entry-table read decodes entries even when the footer CRC has
    // been broken — proving SidxEntry::decode_from is CRC-independent and the
    // manifest is available for corroboration on a corrupt footer.
    let (mut bytes, e0) = {
        let (f0, e0) = corroboratable_frame(0, 1, &serde_json::json!({"v": "a"}));
        (f0, e0)
    };
    let (f1, mut e1) = corroboratable_frame(bytes.len() as u64, 2, &serde_json::json!({"v": "b"}));
    e1.frame_offset = bytes.len() as u64;
    bytes.extend_from_slice(&f1);
    append_untrusted_footer(&mut bytes, &[e0.clone(), e1.clone()]);

    let mut cursor = Cursor::new(bytes);
    let parsed = read_entries_unauthenticated(&mut cursor, 7).expect("must not error");
    assert_eq!(
        parsed.len(),
        2,
        "PROPERTY: entries parse from a CRC-failed footer (decode_from is CRC-independent)"
    );
    assert_eq!(
        parsed[0].event_hash, e0.event_hash,
        "PROPERTY: parsed untrusted entry preserves the content event_hash for corroboration"
    );
    assert_eq!(parsed[1].frame_offset, e1.frame_offset);
}

#[test]
fn untrusted_garbage_entry_table_parses_to_zero_entries() {
    // A trailer with absurd entry_count must yield ZERO entries (geometry guard),
    // forcing the fall-back path — never a partial/forged manifest.
    let mut bytes = vec![0xA5u8; 200];
    let trailer_start = bytes.len() - 16;
    bytes[trailer_start..trailer_start + 8].copy_from_slice(&0u64.to_le_bytes());
    // entry_count = u32::MAX → entries_block_len underflows entries_start → zero.
    bytes[trailer_start + 8..trailer_start + 12].copy_from_slice(&u32::MAX.to_le_bytes());
    bytes[trailer_start + 12..trailer_start + 16]
        .copy_from_slice(crate::store::segment::sidx::SIDX_MAGIC);

    let mut cursor = Cursor::new(bytes);
    let parsed = read_entries_unauthenticated(&mut cursor, 7).expect("must not error");
    assert!(
        parsed.is_empty(),
        "PROPERTY: an absurd entry_count yields zero entries (fall back), never a forged manifest"
    );
}

#[test]
fn resolve_untrusted_fails_closed_on_torn_last_committed_frame() {
    // ROUND-7 CASE (a): an UNTRUSTED footer whose SIDX manifest records THREE
    // committed frames, but the LAST committed frame is torn (only its header + 1
    // byte survive). Frames 0 and 1 are CRC-valid and corroborate their entries
    // (matching offset + length + content event_hash), anchoring the manifest to
    // THIS segment. The manifest then attests to a committed frame at offset == P
    // (the recovered prefix end) that is missing from the recovered stream → real
    // data loss → FAIL CLOSED, regardless of tail policy.
    let (f0, mut e0) = corroboratable_frame(0, 1, &serde_json::json!({"v": "first"}));
    e0.frame_offset = 0;
    let mut bytes = f0;

    let f1_off = bytes.len() as u64;
    let (f1, mut e1) = corroboratable_frame(f1_off, 2, &serde_json::json!({"v": "second"}));
    e1.frame_offset = f1_off;
    bytes.extend_from_slice(&f1);

    // The (committed) third frame: build it fully to compute its real length +
    // event_hash for the entry, but write only its header + a single payload byte
    // so it can NEVER decode (torn). Its recorded offset is the end of frame 1,
    // which is exactly the recovery stop P.
    let f2_off = bytes.len() as u64;
    let (f2_full, mut e2) = corroboratable_frame(f2_off, 3, &serde_json::json!({"v": "third"}));
    e2.frame_offset = f2_off;
    bytes.extend_from_slice(&f2_full[..9]); // 8-byte header + 1 payload byte = torn

    append_untrusted_footer(&mut bytes, &[e0, e1, e2]);
    let file_len = bytes.len() as u64;

    // STASH-VERIFY (the round-7 gap, made explicit): the OLD primitive
    // `crc_valid_frames_end` — the manifest-blind recover-the-prefix walk the three
    // sites used before this fix — returns Ok(P) for these exact bytes, SILENTLY
    // DROPPING the torn committed frame 2 (P == start of frame 2). This is the
    // round-7 data-loss bug. The NEW manifest path FAILS CLOSED on the same bytes.
    {
        let mut old_cursor = Cursor::new(bytes.clone());
        let old = crc_valid_frames_end(&mut old_cursor, 0, file_len, 7).expect(
            "OLD primitive recovers the prefix (the bug): torn tail has nothing valid after",
        );
        assert_eq!(
            old, f2_off,
            "STASH-VERIFY: the manifest-blind walk recovers the prefix and silently drops the \
             torn committed frame (recovery stop == start of the torn frame 2)"
        );
    }

    let mut cursor = Cursor::new(bytes);

    // FailClosed policy (the non-tail sealed-segment posture cold_start passes).
    let result = resolve_untrusted_frames_end(&mut cursor, 0, file_len, 7, None, true);
    assert!(
        matches!(
            result,
            Err(StoreError::CorruptSegment { segment_id: 7, .. })
        ),
        "PROPERTY: a corroborated SIDX manifest attesting to a torn/missing last committed frame \
         must FAIL CLOSED, not silently recover the prefix; got {result:?}"
    );

    // Honored REGARDLESS of tail policy: even the recover-torn-tail posture must
    // fail closed when a corroborated manifest proves a committed frame is missing.
    let mut cursor2 = {
        let (f0, mut e0) = corroboratable_frame(0, 1, &serde_json::json!({"v": "first"}));
        e0.frame_offset = 0;
        let mut bytes = f0;
        let f1_off = bytes.len() as u64;
        let (f1, mut e1) = corroboratable_frame(f1_off, 2, &serde_json::json!({"v": "second"}));
        e1.frame_offset = f1_off;
        bytes.extend_from_slice(&f1);
        let f2_off = bytes.len() as u64;
        let (f2_full, mut e2) = corroboratable_frame(f2_off, 3, &serde_json::json!({"v": "third"}));
        e2.frame_offset = f2_off;
        bytes.extend_from_slice(&f2_full[..9]);
        append_untrusted_footer(&mut bytes, &[e0, e1, e2]);
        Cursor::new(bytes)
    };
    let len2 = cursor2.get_ref().len() as u64;
    let result_recover = resolve_untrusted_frames_end(&mut cursor2, 0, len2, 7, None, false);
    assert!(
        matches!(result_recover, Err(StoreError::CorruptSegment { .. })),
        "PROPERTY: a corroborated missing committed frame fails closed even under \
         RecoverTornTail policy (data loss is policy-independent); got {result_recover:?}"
    );
}

#[test]
fn resolve_untrusted_recovers_all_when_frames_intact_and_footer_corrupt() {
    // ROUND-7 CASE (b): an UNTRUSTED (corrupt) footer but ALL committed frames are
    // intact and CRC-valid. Every SIDX entry corroborates a recovered frame and
    // none reference a frame at/past P → the manifest agrees the segment is
    // complete → RECOVER all frames (return the true frames_end).
    let (f0, mut e0) = corroboratable_frame(0, 1, &serde_json::json!({"v": "a"}));
    e0.frame_offset = 0;
    let mut bytes = f0;
    let f1_off = bytes.len() as u64;
    let (f1, mut e1) = corroboratable_frame(f1_off, 2, &serde_json::json!({"v": "b"}));
    e1.frame_offset = f1_off;
    bytes.extend_from_slice(&f1);
    let f2_off = bytes.len() as u64;
    let (f2, mut e2) = corroboratable_frame(f2_off, 3, &serde_json::json!({"v": "c"}));
    e2.frame_offset = f2_off;
    bytes.extend_from_slice(&f2);
    let frames_end = bytes.len() as u64;

    append_untrusted_footer(&mut bytes, &[e0, e1, e2]);
    let file_len = bytes.len() as u64;
    let mut cursor = Cursor::new(bytes);
    let recovered = resolve_untrusted_frames_end(&mut cursor, 0, file_len, 7, None, true)
        .expect("intact frames under a corrupt footer must recover, not fail closed")
        .frames_end;
    assert_eq!(
        recovered, frames_end,
        "PROPERTY: a corrupt footer over intact, corroborated frames recovers ALL committed frames"
    );
}

#[test]
fn resolve_untrusted_falls_back_to_prefix_when_no_entry_corroborates() {
    // ROUND-7 CASE (c), PERMISSIVE leg: an UNTRUSTED footer whose entry table
    // parses but NONE of its entries corroborate a recovered frame (here: zero
    // entries — the unparseable/no-signal posture). With no anchored manifest
    // there is no PROOF a committed frame is missing, so recovery honors the tail
    // posture. Under the default permissive (`RecoverTornTail` → false) posture it
    // recovers the CRC-valid prefix — never a false fail-closed on a benign
    // corrupt-footer store. (The STRICT leg of this exact scenario now fails closed
    // as an unprovable tail — see
    // `strict_untrusted_footer_nonempty_prefix_no_corroboration_fails_closed`.)
    let (f0, _e0) = corroboratable_frame(0, 1, &serde_json::json!({"v": "a"}));
    let mut bytes = f0;
    let f1_off = bytes.len() as u64;
    let (f1, _e1) = corroboratable_frame(f1_off, 2, &serde_json::json!({"v": "b"}));
    bytes.extend_from_slice(&f1);
    let frames_end = bytes.len() as u64;

    // Footer records ZERO entries (empty manifest) but is corrupted to UNTRUSTED.
    append_untrusted_footer(&mut bytes, &[]);
    let file_len = bytes.len() as u64;
    let mut cursor = Cursor::new(bytes);
    let recovered = resolve_untrusted_frames_end(&mut cursor, 0, file_len, 7, None, false)
        .expect("an empty/unparseable manifest must fall back to prefix recovery under permissive")
        .frames_end;
    assert_eq!(
        recovered, frames_end,
        "PROPERTY: with no corroborating entry, the default permissive posture falls back to the \
         CRC-valid prefix (no false fail-closed)"
    );
}

#[test]
fn resolve_untrusted_recovers_for_garbage_entry_table() {
    // ROUND-7 CASE (c), garbage variant under the PERMISSIVE posture: an UNTRUSTED
    // footer whose entry table is unparseable (absurd entry_count).
    // read_entries_unauthenticated returns zero entries → no corroboration. Under
    // the default permissive (`RecoverTornTail` → false) posture this falls back to
    // prefix recovery — never a false fail-closed. (Under STRICT the same garbage
    // manifest fails closed as an unprovable tail; that leg is covered by
    // `strict_untrusted_footer_nonempty_prefix_no_corroboration_fails_closed`.)
    let (f0, _e0) = corroboratable_frame(0, 1, &serde_json::json!({"v": "x"}));
    let mut bytes = f0;
    let f1_off = bytes.len() as u64;
    let (f1, _e1) = corroboratable_frame(f1_off, 2, &serde_json::json!({"v": "y"}));
    bytes.extend_from_slice(&f1);
    let frames_end = bytes.len() as u64;

    // Append a 16-byte trailer with valid magic but an absurd entry_count so the
    // geometry guard yields zero entries. (No CRC region behind it → also breaks
    // any authentication; the boundary is untrusted and the manifest is empty.)
    let mut trailer = [0u8; 16];
    trailer[0..8].copy_from_slice(&frames_end.to_le_bytes());
    trailer[8..12].copy_from_slice(&u32::MAX.to_le_bytes());
    trailer[12..16].copy_from_slice(crate::store::segment::sidx::SIDX_MAGIC);
    bytes.extend_from_slice(&trailer);

    let file_len = bytes.len() as u64;
    let mut cursor = Cursor::new(bytes);
    let recovered = resolve_untrusted_frames_end(&mut cursor, 0, file_len, 7, None, false)
        .expect("a garbage entry table must fall back to prefix recovery under permissive")
        .frames_end;
    assert_eq!(
        recovered, frames_end,
        "PROPERTY: a garbage/unparseable entry table degrades to prefix recovery under the default \
         permissive posture (no false fail-closed)"
    );
}

#[test]
fn resolve_untrusted_legacy_sdx2_intact_frames_recovers() {
    // ROUND-7: a legacy SDX2 footer (never CRC-authenticated → always UNTRUSTED)
    // over intact frames. The SDX2 entry table is parseable by
    // read_entries_unauthenticated (it accepts both magics) and its entries
    // corroborate the recovered frames → RECOVER all.
    let (f0, mut e0) = corroboratable_frame(0, 1, &serde_json::json!({"v": "a"}));
    e0.frame_offset = 0;
    let mut bytes = f0;
    let f1_off = bytes.len() as u64;
    let (f1, mut e1) = corroboratable_frame(f1_off, 2, &serde_json::json!({"v": "b"}));
    e1.frame_offset = f1_off;
    bytes.extend_from_slice(&f1);
    let frames_end = bytes.len() as u64;

    // Write a real SDX3 footer, then rewrite the trailing magic to legacy SDX2 so
    // the boundary reads as un-CRC'd/untrusted while the entry geometry is intact.
    append_untrusted_footer(&mut bytes, &[e0, e1]);
    let n = bytes.len();
    bytes[n - 4..n].copy_from_slice(crate::store::segment::sidx::SIDX_MAGIC_LEGACY_SDX2);

    let file_len = bytes.len() as u64;
    let mut cursor = Cursor::new(bytes);
    let recovered = resolve_untrusted_frames_end(&mut cursor, 0, file_len, 7, None, true)
        .expect("legacy SDX2 over intact frames must recover")
        .frames_end;
    assert_eq!(
        recovered, frames_end,
        "PROPERTY: a legacy SDX2 manifest corroborates intact frames and recovers them all"
    );
}

#[test]
fn resolve_untrusted_still_fails_closed_on_mid_stream_corruption() {
    // ROUND-5 invariant preserved THROUGH the manifest path: mid-stream corruption
    // (a CRC-valid frame after the first bad frame) must still FAIL CLOSED before
    // the manifest is even consulted. The walk's look-ahead resync fires inside
    // crc_valid_frames_end_with_map exactly as in crc_valid_frames_end.
    let (bytes, frames_end) = frames_then_sdx3_footer(&["a", "b", "c", "d", "e"]);
    let mut bytes = bytes;
    let f2_start = frame_total_len_at(&bytes, 0);
    let f3_start = f2_start + frame_total_len_at(&bytes, f2_start);
    let third_payload_byte = usize::try_from(f3_start + 8).expect("offset fits usize");
    assert!((third_payload_byte as u64) < frames_end);
    bytes[third_payload_byte] ^= 0x01; // break the third frame's CRC (interior)
    let file_len = bytes.len() as u64;
    let mut cursor = Cursor::new(bytes);
    let result = resolve_untrusted_frames_end(&mut cursor, 0, file_len, 7, None, true);
    assert!(
        matches!(
            result,
            Err(StoreError::CorruptSegment { segment_id: 7, .. })
        ),
        "PROPERTY: mid-stream corruption still fails closed via the manifest path; got {result:?}"
    );
}

#[test]
fn corroborate_property_missing_trailing_frame_fails_intact_recovers() {
    // PROPERTY (adversarial, unit-level): for a manifest with >= 1 corroborated
    // entry, ANY entry naming a committed frame at/after P that is absent from R
    // forces FailClosed; an intact set (every entry present in R) recovers. We
    // drive corroborate_untrusted_entries directly over synthetic R + entries.
    let recovered: RecoveredFrameMap = [
        (
            0u64,
            RecoveredFrame {
                frame_length: 64,
                event_hash: Some([7u8; 32]),
            },
        ),
        (
            64u64,
            RecoveredFrame {
                frame_length: 64,
                event_hash: Some([8u8; 32]),
            },
        ),
    ]
    .into_iter()
    .collect();
    let p = 128u64; // recovered prefix end (two 64-byte frames)

    let entry = |frame_offset: u64, frame_length: u32, event_hash: [u8; 32]| {
        crate::store::segment::sidx::SidxEntry {
            event_id: 1,
            entity_idx: 0,
            scope_idx: 0,
            kind: kind_to_raw(EventKind::custom(0x1, 1)),
            wall_ms: 1,
            clock: 1,
            dag_lane: 0,
            dag_depth: 0,
            prev_hash: [0; 32],
            event_hash,
            frame_offset,
            frame_length,
            global_sequence: 1,
            correlation_id: 1,
            causation_id: 0,
        }
    };

    // Intact: both entries corroborate, none past P → recover prefix.
    let intact = vec![entry(0, 64, [7; 32]), entry(64, 64, [8; 32])];
    assert_eq!(
        corroborate_untrusted_entries(&intact, &recovered, p, None, true),
        UntrustedRecovery::RecoverPrefix(p),
        "PROPERTY: a fully-corroborated manifest with nothing past P recovers the prefix"
    );

    // Missing trailing committed frame: one corroborated anchor + an entry naming a
    // committed frame at offset == P that is NOT in R → fail closed as proven loss.
    let missing = vec![entry(0, 64, [7; 32]), entry(128, 64, [9; 32])];
    assert_eq!(
        corroborate_untrusted_entries(&missing, &recovered, p, None, true),
        UntrustedRecovery::FailClosedCorroboratedLoss,
        "PROPERTY: an anchored manifest attesting to a committed frame at/after P missing from R \
         always fails closed (proven loss)"
    );

    // No corroboration (forged hashes): zero anchored entries → HONOR the tail
    // posture. This is the completed-feature wiring: the same un-anchored manifest
    // over a non-empty recovered prefix recovers under the default permissive
    // posture but is REFUSED as an unprovable tail under the strict posture.
    let forged = vec![entry(0, 64, [0xAA; 32]), entry(128, 64, [0xBB; 32])];
    assert_eq!(
        corroborate_untrusted_entries(&forged, &recovered, p, None, false),
        UntrustedRecovery::RecoverPrefix(p),
        "PROPERTY: an un-anchored (forged) manifest is inert under the permissive posture — fall \
         back to prefix, no false fail-closed"
    );
    assert_eq!(
        corroborate_untrusted_entries(&forged, &recovered, p, None, true),
        UntrustedRecovery::FailClosedUnprovableTail,
        "PROPERTY: under the strict FailClosed posture the same un-anchored manifest over a \
         non-empty recovered prefix refuses the unprovable tail (truncation cannot be ruled out)"
    );
}

#[test]
fn corroborate_runs_the_present_check_when_a_frame_sits_at_or_past_the_stop() {
    // The fail-closed loop's second corroboration (`present`: frame_length AND
    // content event_hash both match) is defense-in-depth: production R never holds
    // an offset >= P, so the check is documented as "robust to future map changes"
    // and its inner closure is normally never entered. We exercise it DIRECTLY by
    // seeding R with a frame AT the recovery stop, forcing the closure to run so the
    // three mutations inside it (frame_length `==`→`!=`, the `&&`→`||`, and the
    // event_hash `==`→`!=`) become observable.
    let entry = |frame_offset: u64, frame_length: u32, event_hash: [u8; 32]| {
        crate::store::segment::sidx::SidxEntry {
            event_id: 1,
            entity_idx: 0,
            scope_idx: 0,
            kind: kind_to_raw(EventKind::custom(0x1, 1)),
            wall_ms: 1,
            clock: 1,
            dag_lane: 0,
            dag_depth: 0,
            prev_hash: [0; 32],
            event_hash,
            frame_offset,
            frame_length,
            global_sequence: 1,
            correlation_id: 1,
            causation_id: 0,
        }
    };

    let p = 200u64;
    // R holds the anchor at 0 AND a frame AT the stop (offset == P == 200) so the
    // present-closure runs for the entry that names offset 200.
    let recovered: RecoveredFrameMap = [
        (
            0u64,
            RecoveredFrame {
                frame_length: 64,
                event_hash: Some([7u8; 32]),
            },
        ),
        (
            200u64,
            RecoveredFrame {
                frame_length: 70,
                event_hash: Some([9u8; 32]),
            },
        ),
    ]
    .into_iter()
    .collect();

    // Sub-case A — the entry at P matches the recovered frame EXACTLY (length AND
    // hash), so it is "present" and recovery proceeds. Flipping the length `==`→`!=`
    // or the hash `==`→`!=` inside the closure makes `present` false and FAILS
    // CLOSED instead — so this RecoverPrefix assertion convicts both `==` mutants.
    let exact = vec![entry(0, 64, [7; 32]), entry(200, 70, [9; 32])];
    assert_eq!(
        corroborate_untrusted_entries(&exact, &recovered, p, None, true),
        UntrustedRecovery::RecoverPrefix(p),
        "an anchored manifest whose entry at P matches the recovered frame's length AND content \
         hash recovers the prefix; a flipped length/hash equality would falsely fail closed"
    );

    // Sub-case B — the entry at P matches on LENGTH but MISMATCHES on content hash,
    // so it is NOT present → FAIL CLOSED. The `&&`→`||` mutant would treat the length
    // match alone as "present" and recover; the hash `==`→`!=` mutant would read the
    // mismatch as a match and recover. Asserting FailClosed convicts both.
    let hash_mismatch = vec![entry(0, 64, [7; 32]), entry(200, 70, [0xAA; 32])];
    assert_eq!(
        corroborate_untrusted_entries(&hash_mismatch, &recovered, p, None, true),
        UntrustedRecovery::FailClosedCorroboratedLoss,
        "an anchored manifest whose entry at P matches length but NOT content hash must fail \
         closed; a `&&`→`||` or a hash `==`→`!=` mutation would falsely recover"
    );
}

/// Build two intact CRC-valid frames followed by an UNTRUSTED footer that records
/// ZERO SIDX entries (an empty, un-anchored manifest). This is the round-7
/// completed-feature scenario: a non-empty recovered prefix under an untrusted
/// footer with NO corroboration. Returns `(bytes, frames_end)`.
fn two_intact_frames_then_untrusted_empty_footer() -> (Vec<u8>, u64) {
    let (f0, _e0) = corroboratable_frame(0, 1, &serde_json::json!({"v": "a"}));
    let mut bytes = f0;
    let f1_off = bytes.len() as u64;
    let (f1, _e1) = corroboratable_frame(f1_off, 2, &serde_json::json!({"v": "b"}));
    bytes.extend_from_slice(&f1);
    let frames_end = bytes.len() as u64;
    // Empty manifest (zero entries), then flip a footer byte → UNTRUSTED.
    append_untrusted_footer(&mut bytes, &[]);
    (bytes, frames_end)
}

#[test]
fn strict_untrusted_footer_nonempty_prefix_no_corroboration_fails_closed() {
    // LEG 1 (the completed feature): STRICT (`FailClosed` → true) posture + an
    // UNTRUSTED footer + a NON-EMPTY recovered CRC-valid prefix (two intact frames)
    // + NO corroborating manifest entry → REFUSE. The untrusted footer is exactly
    // the thing that would bound the frame region, so a torn/truncated further
    // committed frame past the prefix cannot be ruled out. This is the leg that was
    // silently DROPPED before the feature was completed (the flag was ignored, so
    // this recovered instead of refusing).
    let (bytes, frames_end) = two_intact_frames_then_untrusted_empty_footer();
    let file_len = bytes.len() as u64;
    let mut cursor = Cursor::new(bytes);
    let result = resolve_untrusted_frames_end(&mut cursor, 0, file_len, 7, None, true);
    assert!(
        matches!(
            &result,
            Err(StoreError::CorruptSegment { segment_id: 7, .. })
        ),
        "PROPERTY: strict posture must REFUSE an unprovable non-empty tail with \
         CorruptSegment{{segment_id:7}}, not recover ({frames_end}); got {result:?}"
    );
    let detail = match result {
        Err(StoreError::CorruptSegment { detail, .. }) => detail,
        _ => String::new(),
    };
    assert!(
        detail.contains("unprovable tail") && detail.contains("NO corroborating"),
        "the strict refusal must carry the distinct unprovable-tail detail (not the \
         corroborated-loss message); got detail: {detail}"
    );
}

#[test]
fn permissive_untrusted_footer_nonempty_prefix_no_corroboration_recovers() {
    // LEG 2: the DEFAULT permissive (`RecoverTornTail` → false) posture over the
    // EXACT SAME bytes as leg 1 must RECOVER the full CRC-valid prefix — a benign
    // corrupt-footer store whose frames are intact must still open. resolve returns
    // the recovered frame-region end, which must equal the true frames_end (both
    // committed frames recovered, nothing dropped).
    let (bytes, frames_end) = two_intact_frames_then_untrusted_empty_footer();
    let file_len = bytes.len() as u64;
    let mut cursor = Cursor::new(bytes);
    let recovered = resolve_untrusted_frames_end(&mut cursor, 0, file_len, 7, None, false)
        .expect("LEG 2: the default permissive posture must recover a benign corrupt-footer store")
        .frames_end;
    assert_eq!(
        recovered, frames_end,
        "PROPERTY: the default posture recovers the entire CRC-valid prefix (both intact frames) — \
         the DEFAULT path is unchanged by the feature"
    );
}

#[test]
fn corroborated_proven_loss_fails_closed_under_both_policies() {
    // LEG 3 (unchanged by the feature): a CORROBORATED manifest that attests to a
    // committed frame missing from the recovered stream is PROVEN data loss and
    // fails closed REGARDLESS of tail policy. We drive corroborate directly: an
    // anchor at offset 0 corroborates (matching length + content hash), and an
    // entry names a committed frame at offset == P (128) that is NOT in R.
    let recovered: RecoveredFrameMap = [(
        0u64,
        RecoveredFrame {
            frame_length: 64,
            event_hash: Some([7u8; 32]),
        },
    )]
    .into_iter()
    .collect();
    let p = 64u64; // one recovered 64-byte frame → prefix end at 64
    let entry = |frame_offset: u64, frame_length: u32, event_hash: [u8; 32]| {
        crate::store::segment::sidx::SidxEntry {
            event_id: 1,
            entity_idx: 0,
            scope_idx: 0,
            kind: kind_to_raw(EventKind::custom(0x1, 1)),
            wall_ms: 1,
            clock: 1,
            dag_lane: 0,
            dag_depth: 0,
            prev_hash: [0; 32],
            event_hash,
            frame_offset,
            frame_length,
            global_sequence: 1,
            correlation_id: 1,
            causation_id: 0,
        }
    };
    // Anchor at 0 corroborates; entry at P (64) is missing from R → proven loss.
    let manifest = vec![entry(0, 64, [7; 32]), entry(64, 64, [9; 32])];
    for fallback_fail_closed in [false, true] {
        assert_eq!(
            corroborate_untrusted_entries(&manifest, &recovered, p, None, fallback_fail_closed),
            UntrustedRecovery::FailClosedCorroboratedLoss,
            "PROPERTY: a corroborated missing committed frame is proven loss and fails closed \
             regardless of tail policy (fallback_fail_closed = {fallback_fail_closed})"
        );
    }
}

/// Non-corroborating fixture shared by the footer-claimed-end truncation-evidence
/// tests: a NON-EMPTY recovered prefix of two 64-byte frames ending at P = 128, plus
/// a `forged` entry set whose content hashes match NO recovered frame (matching
/// offsets + lengths but wrong `event_hash`), so `any_corroborated` is false and the
/// decision falls to case (c) where the footer-claimed-end cross-check lives.
fn non_corroborating_prefix_fixture() -> (
    RecoveredFrameMap,
    Vec<crate::store::segment::sidx::SidxEntry>,
    u64,
) {
    let recovered: RecoveredFrameMap = [
        (
            0u64,
            RecoveredFrame {
                frame_length: 64,
                event_hash: Some([7u8; 32]),
            },
        ),
        (
            64u64,
            RecoveredFrame {
                frame_length: 64,
                event_hash: Some([8u8; 32]),
            },
        ),
    ]
    .into_iter()
    .collect();
    let entry = |frame_offset: u64, frame_length: u32, event_hash: [u8; 32]| {
        crate::store::segment::sidx::SidxEntry {
            event_id: 1,
            entity_idx: 0,
            scope_idx: 0,
            kind: kind_to_raw(EventKind::custom(0x1, 1)),
            wall_ms: 1,
            clock: 1,
            dag_lane: 0,
            dag_depth: 0,
            prev_hash: [0; 32],
            event_hash,
            frame_offset,
            frame_length,
            global_sequence: 1,
            correlation_id: 1,
            causation_id: 0,
        }
    };
    // Forged: matching offsets + lengths but WRONG content hashes → zero corroboration.
    let forged = vec![entry(0, 64, [0xAA; 32]), entry(64, 64, [0xBB; 32])];
    (recovered, forged, 128)
}

#[test]
fn corroborate_footer_claimed_past_prefix_yields_truncation_evidence() {
    // NEW leg: NO corroborating manifest, but the untrusted footer's OWN claimed frame
    // end lies strictly PAST the recovered prefix end P → a torn/corrupt region sits
    // between the recovered frames and the footer = POSITIVE (if untrusted) truncation
    // evidence. The strict and permissive postures diverge on what to do with it.
    let (recovered, forged, p) = non_corroborating_prefix_fixture();

    // STRICT: refuse with the DISTINCT evidence-of-truncation variant (NOT the
    // absence-only FailClosedUnprovableTail). Convicts a swap of the `recovery_stop <
    // claimed` gap guard and a wrong strict-branch variant selection.
    assert_eq!(
        corroborate_untrusted_entries(&forged, &recovered, p, Some(p + 64), true),
        UntrustedRecovery::FailClosedEvidenceOfTruncation {
            footer_claimed_end: p + 64,
        },
        "strict posture with a footer claiming frames past P must fail closed with POSITIVE \
         truncation evidence, not the mere-absence unprovable-tail variant"
    );

    // PERMISSIVE (default): recover the CRC-valid prefix WHILE recording the evidence.
    assert_eq!(
        corroborate_untrusted_entries(&forged, &recovered, p, Some(p + 64), false),
        UntrustedRecovery::RecoverPrefixWithTruncationEvidence {
            end: p,
            footer_claimed_end: p + 64,
        },
        "permissive posture recovers the prefix but records the footer-cross-checked truncation \
         evidence (end = P, footer_claimed_end past P) for the caller"
    );
}

#[test]
fn corroborate_footer_claimed_at_or_absent_is_not_truncation_evidence() {
    // The gap guard is STRICT (`recovery_stop < claimed`, NOT `<=`): a footer whose
    // claimed end equals P (frames end exactly at the footer = a CLEAN segment) is NOT
    // evidence, and an absent footer hint is not either. Both fall through to the
    // pre-existing absence-only decisions — the behavior-preserving `None` path.
    let (recovered, forged, p) = non_corroborating_prefix_fixture();

    // footer_claimed_end == P (no gap): strict → the mere-absence unprovable tail, NOT
    // evidence-of-truncation. Convicts a `<`→`<=` mutant on the gap guard.
    assert_eq!(
        corroborate_untrusted_entries(&forged, &recovered, p, Some(p), true),
        UntrustedRecovery::FailClosedUnprovableTail,
        "footer claiming frames end exactly AT P is a clean boundary, not truncation evidence \
         (the gap guard is `<`, not `<=`)"
    );

    // No footer hint at all: strict → unchanged unprovable-tail (the migrated-`None`
    // path that reproduces the pre-feature behavior exactly).
    assert_eq!(
        corroborate_untrusted_entries(&forged, &recovered, p, None, true),
        UntrustedRecovery::FailClosedUnprovableTail,
        "with no footer-claimed-end hint the strict posture keeps its pre-existing \
         unprovable-tail refusal"
    );

    // footer_claimed_end == P under PERMISSIVE: plain prefix recovery, no evidence.
    assert_eq!(
        corroborate_untrusted_entries(&forged, &recovered, p, Some(p), false),
        UntrustedRecovery::RecoverPrefix(p),
        "a clean at-P boundary under the permissive posture recovers the plain prefix with no \
         truncation evidence"
    );
}

#[test]
fn resolve_out_of_bounds_footer_claim_degrades_to_unprovable_tail_not_evidence() {
    // resolve-level: two intact CRC-valid frames + an UNTRUSTED empty footer (zero
    // entries → no corroboration). The caller passes a footer-claimed-end that is
    // OUT OF BOUNDS — well past `file_len - 16`, leaving no room for even the 16-byte
    // trailer after it. That is a forged offset, NOT a torn frame region, so `resolve`
    // must BOUND it out and fall back to the absence-only unprovable-tail refusal,
    // never a FALSE evidence-of-truncation. (Pins the `claimed <= file_len - TRAILER_LEN`
    // guard — the exact fix for `append_frames_from_segment` on an out-of-bounds footer.)
    let (bytes, frames_end) = two_intact_frames_then_untrusted_empty_footer();
    let file_len = bytes.len() as u64;
    let mut cursor = Cursor::new(bytes);
    let result =
        resolve_untrusted_frames_end(&mut cursor, 0, file_len, 7, Some(frames_end + 64), true);
    assert!(
        matches!(
            &result,
            Err(StoreError::CorruptSegment { segment_id: 7, .. })
        ),
        "PROPERTY: strict posture must REFUSE an out-of-bounds untrusted footer with \
         CorruptSegment{{segment_id:7}}; got {result:?}"
    );
    let detail = match result {
        Err(StoreError::CorruptSegment { detail, .. }) => detail,
        _ => String::new(),
    };
    assert!(
        detail.contains("unprovable tail"),
        "an OUT-OF-BOUNDS footer claim is forgery, not truncation evidence: the strict refusal must \
         carry the absence-only unprovable-tail detail, NOT the POSITIVE-truncation-evidence one; \
         got detail: {detail}"
    );
}