use super::*;
use std::io::Cursor;
use tempfile::TempDir;
fn realfs() -> std::sync::Arc<dyn crate::store::platform::fs::StoreFs> {
std::sync::Arc::new(crate::store::platform::fs::RealFs)
}
fn frames_then_sdx3_footer(payloads: &[&str]) -> (Vec<u8>, u64) {
use crate::store::segment::sidx::{kind_to_raw, SidxEntry, SidxEntryCollector};
let mut bytes = Vec::new();
let mut collector = SidxEntryCollector::new();
for (idx, p) in payloads.iter().enumerate() {
let frame_offset = bytes.len() as u64;
let frame = frame_encode(&serde_json::json!({ "payload": p })).expect("encode frame");
let frame_length = u32::try_from(frame.len()).expect("frame length fits u32");
bytes.extend_from_slice(&frame);
let entry = SidxEntry {
event_id: idx as u128 + 1,
entity_idx: 0,
scope_idx: 0,
kind: kind_to_raw(crate::event::EventKind::custom(0x1, 1)),
wall_ms: 1,
clock: 1,
dag_lane: 0,
dag_depth: 0,
prev_hash: [0; 32],
event_hash: [1; 32],
frame_offset,
frame_length,
global_sequence: idx as u64 + 1,
correlation_id: 1,
causation_id: 0,
};
collector
.record(entry, "entity:test", "scope:test")
.expect("intern test strings");
}
let frames_end = bytes.len() as u64;
let mut cursor = Cursor::new(&mut bytes);
cursor.seek(SeekFrom::End(0)).expect("seek to footer start");
collector
.write_footer(&mut cursor, 7)
.expect("write footer");
(bytes, frames_end)
}
fn frame_total_len_at(bytes: &[u8], offset: u64) -> u64 {
let start = usize::try_from(offset).expect("offset fits usize");
let header: [u8; 4] = bytes[start..start + 4]
.try_into()
.expect("4-byte frame length prefix");
let payload_len = u64::from(u32::from_be_bytes(header));
8 + payload_len
}
#[test]
fn detect_sidx_boundary_trusts_a_crc_valid_sdx3_footer() {
let (bytes, frames_end) = frames_then_sdx3_footer(&["a", "b", "c"]);
let file_len = bytes.len() as u64;
let mut cursor = Cursor::new(bytes);
let boundary = detect_sidx_boundary(&mut cursor, file_len, 7)
.expect("must not error")
.expect("a CRC-valid SDX3 footer is a boundary");
assert_eq!(
boundary,
SidxBoundary {
frames_end,
trusted: true,
},
"PROPERTY: a CRC-valid SDX3 footer must mark the boundary trusted with the true frames_end"
);
}
#[test]
fn detect_sidx_boundary_distrusts_a_crc_failed_sdx3_footer() {
let (mut bytes, frames_end) = frames_then_sdx3_footer(&["a", "b", "c"]);
let corrupt_at = usize::try_from(frames_end).expect("frames_end fits usize");
bytes[corrupt_at] ^= 0xFF;
let file_len = bytes.len() as u64;
let mut cursor = Cursor::new(bytes);
let boundary = detect_sidx_boundary(&mut cursor, file_len, 7)
.expect("must not error")
.expect("a CRC-failed footer is still recognized as a boundary");
assert_eq!(
boundary,
SidxBoundary {
frames_end,
trusted: false,
},
"PROPERTY: a CRC-failed SDX3 footer must be recognized as a boundary but flagged untrusted"
);
}
#[test]
fn crc_valid_frames_end_recovers_all_frames_for_a_too_high_hint() {
let (bytes, real_frames_end) = frames_then_sdx3_footer(&["x", "y", "z"]);
let file_len = bytes.len() as u64;
let mut cursor = Cursor::new(bytes);
let recovered = crc_valid_frames_end(&mut cursor, 0, file_len, 7).expect("must not error");
assert_eq!(
recovered, real_frames_end,
"PROPERTY: the walker, bounded by file_len, stops at the true CRC-valid frame end and \
recovers all frames regardless of the (discarded) hint"
);
}
#[test]
fn crc_valid_frames_end_recovers_all_frames_for_a_too_low_hint() {
let (bytes, real_frames_end) = frames_then_sdx3_footer(&["a", "b", "c"]);
let file_len = bytes.len() as u64;
let too_low = frame_total_len_at(&bytes, 0);
assert!(
too_low > 0 && too_low < real_frames_end,
"too-low hint must land on an interior frame boundary"
);
let mut cursor = Cursor::new(bytes);
let recovered = crc_valid_frames_end(&mut cursor, 0, file_len, 7).expect("must not error");
assert_eq!(
recovered, real_frames_end,
"PROPERTY: a too-low untrusted hint must NOT bound recovery; all CRC-valid frames recover"
);
}
#[test]
fn crc_valid_frames_end_recovers_all_frames_for_a_mid_frame_hint() {
let (bytes, real_frames_end) = frames_then_sdx3_footer(&["alpha", "beta", "gamma"]);
let file_len = bytes.len() as u64;
let second_start = frame_total_len_at(&bytes, 0);
let third_start = second_start + frame_total_len_at(&bytes, second_start);
let mid_frame_hint = third_start + 3; assert!(
mid_frame_hint > third_start && mid_frame_hint < real_frames_end,
"hint must land strictly inside a later CRC-valid frame, not at a boundary"
);
let mut cursor = Cursor::new(bytes);
let recovered = crc_valid_frames_end(&mut cursor, 0, file_len, 7).expect("must not error");
assert_eq!(
recovered, real_frames_end,
"PROPERTY: a mid-frame untrusted hint must NOT drop the containing CRC-valid frame; all \
frames recover (this is the round-4 P1)"
);
}
#[test]
fn crc_valid_frames_end_stops_at_first_non_frame_byte() {
let (bytes, real_frames_end) = frames_then_sdx3_footer(&["p", "q"]);
let file_len = bytes.len() as u64;
let mut cursor = Cursor::new(bytes);
let recovered = crc_valid_frames_end(&mut cursor, 0, file_len, 7).expect("must not error");
assert_eq!(
recovered, real_frames_end,
"PROPERTY: the walk stops at the true frame end (footer bytes never decode as a frame)"
);
}
#[test]
fn crc_valid_frames_end_fails_closed_on_mid_stream_corruption() {
let (mut bytes, frames_end) = frames_then_sdx3_footer(&["a", "b", "c", "d", "e"]);
let f2_start = frame_total_len_at(&bytes, 0);
let f3_start = f2_start + frame_total_len_at(&bytes, f2_start);
let third_payload_byte = usize::try_from(f3_start + 8).expect("offset fits usize");
assert!(
(third_payload_byte as u64) < frames_end,
"corruption target must be inside the frame region"
);
bytes[third_payload_byte] ^= 0x01; let file_len = bytes.len() as u64;
let mut cursor = Cursor::new(bytes);
let result = crc_valid_frames_end(&mut cursor, 0, file_len, 7);
assert!(
matches!(
result,
Err(StoreError::CorruptSegment { segment_id: 7, .. })
),
"PROPERTY: mid-stream corruption (CRC-valid frames after the first bad frame) must \
FailClosed with CorruptSegment, not silently recover the prefix; got {result:?}"
);
}
#[test]
fn crc_valid_frames_end_recovers_prefix_for_torn_last_frame() {
let (bytes, _frames_end) = frames_then_sdx3_footer(&["a", "b", "c"]);
let f2_start = frame_total_len_at(&bytes, 0);
let f3_start = f2_start + frame_total_len_at(&bytes, f2_start);
let prefix_end = usize::try_from(f3_start + 8 + 1).expect("offset fits usize");
let torn = bytes[..prefix_end].to_vec();
let file_len = torn.len() as u64;
let mut cursor = Cursor::new(torn);
let recovered =
crc_valid_frames_end(&mut cursor, 0, file_len, 7).expect("torn tail must not error");
assert_eq!(
recovered, f3_start,
"PROPERTY: a torn last frame with nothing valid after it recovers the clean prefix \
(ends at the start of the torn frame), not FailClosed"
);
}
fn sidx_trailer_buf(total_len: usize, string_table_offset: u64) -> Vec<u8> {
assert!(total_len >= 16, "buffer must hold the 16-byte trailer");
let mut bytes = vec![0u8; total_len];
let trailer_start = total_len - 16;
bytes[trailer_start..trailer_start + 8].copy_from_slice(&string_table_offset.to_le_bytes());
bytes[trailer_start + 8..trailer_start + 12].copy_from_slice(&0u32.to_le_bytes());
bytes[trailer_start + 12..trailer_start + 16]
.copy_from_slice(crate::store::segment::sidx::SIDX_MAGIC);
bytes
}
#[test]
fn detect_sidx_boundary_untrusted_offset_past_trailer_does_not_error() {
let bytes = sidx_trailer_buf(64, 63); let file_len = bytes.len() as u64;
let mut cursor = Cursor::new(bytes);
let result = detect_sidx_boundary(&mut cursor, file_len, 7).expect(
"PROPERTY: an out-of-bounds UNTRUSTED offset must NOT error — it must downgrade to the \
CRC-valid-frame recovery scan",
);
assert_eq!(
result,
Some(SidxBoundary {
frames_end: 63,
trusted: false,
}),
"PROPERTY: an out-of-bounds offset on an unauthenticated footer is recognized as an \
untrusted boundary (offset discarded by callers), never a hard error; got {result:?}"
);
}
#[test]
fn detect_sidx_boundary_synthetic_footer_is_never_trusted() {
let file_len = 64u64;
let max_offset = file_len - 16;
let bytes = sidx_trailer_buf(
usize::try_from(file_len).expect("file_len fits usize"),
max_offset,
);
let mut cursor = Cursor::new(bytes);
let result = detect_sidx_boundary(&mut cursor, file_len, 7).expect("must not error");
assert_eq!(
result.map(|b| b.trusted),
Some(false),
"PROPERTY: a synthetic (un-CRC'd) footer is never trusted"
);
}
#[test]
fn detect_sidx_boundary_untrusted_out_of_bounds_offset_is_inert() {
let (mut bytes, _frames_end) = frames_then_sdx3_footer(&["a", "b", "c"]);
let n = bytes.len() as u64;
let off_pos = bytes.len() - 16;
bytes[off_pos..off_pos + 8].copy_from_slice(&n.to_le_bytes());
let mut cursor = Cursor::new(bytes);
let boundary = detect_sidx_boundary(&mut cursor, n, 7).expect(
"PROPERTY: an out-of-bounds offset on an UNTRUSTED footer must NOT hard-error — it must \
return an untrusted boundary so the caller falls back to crc_valid_frames_end",
);
assert_eq!(
boundary.map(|b| b.trusted),
Some(false),
"PROPERTY: an out-of-bounds unauthenticated footer is an untrusted boundary, never a hard \
error; got {boundary:?}"
);
}
#[test]
fn crc_valid_frames_end_recovers_all_frames_for_adversarial_untrusted_offsets() {
let (bytes, real_frames_end) = frames_then_sdx3_footer(&["alpha", "beta", "gamma", "delta"]);
let file_len = bytes.len() as u64;
let mut cursor = Cursor::new(bytes);
let recovered = crc_valid_frames_end(&mut cursor, 0, file_len, 7)
.expect("PROPERTY: the untrusted walker never errors over intact CRC-valid frames");
assert_eq!(
recovered, real_frames_end,
"PROPERTY: the untrusted-recovery walker recovers ALL CRC-valid frames bounded only by \
file_len, independent of any (discarded) offset hint"
);
}
#[test]
fn append_frames_from_segment_fails_closed_for_untrusted_too_low_offset_no_corroboration() {
use std::io::Write as _;
let dir = TempDir::new().expect("tmpdir");
let mut source: Segment<Active> =
Segment::create_with_created_ns_on(dir.path(), 1, 0, &realfs()).expect("create source");
let frame =
frame_encode(&serde_json::json!({"payload": "compaction-recover"})).expect("encode frame");
source.write_frame(&frame).expect("write frame");
let source_path = source.path.clone();
source
.sync_with_mode(&crate::store::SyncMode::default())
.expect("sync source");
drop(source);
let mut trailer = [0u8; 16];
trailer[0..8].copy_from_slice(&0u64.to_le_bytes());
trailer[8..12].copy_from_slice(&0u32.to_le_bytes());
trailer[12..16].copy_from_slice(crate::store::segment::sidx::SIDX_MAGIC);
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(&source_path)
.expect("open source for trailer append");
f.write_all(&trailer).expect("append forged trailer");
drop(f);
let mut dest: Segment<Active> =
Segment::create_with_created_ns_on(dir.path(), 2, 0, &realfs()).expect("create dest");
let dest_header_bytes = dest.written_bytes;
let result = dest.append_frames_from_segment(&source_path);
assert!(
matches!(
&result,
Err(StoreError::CorruptSegment { detail, .. })
if detail.contains("unprovable tail")
),
"PROPERTY: the strict compaction copy must FAIL CLOSED on an unprovable non-empty tail \
under an untrusted footer with no corroborating manifest; got {result:?}"
);
assert_eq!(
dest.written_bytes, dest_header_bytes,
"a refused compaction copy must not append any source bytes to the destination"
);
}
#[test]
fn append_frames_from_segment_fails_closed_for_untrusted_out_of_bounds_offset_no_corroboration() {
use std::io::Write as _;
let dir = TempDir::new().expect("tmpdir");
let mut source: Segment<Active> =
Segment::create_with_created_ns_on(dir.path(), 1, 0, &realfs()).expect("create source");
let frame = frame_encode(&serde_json::json!({"payload": "compaction-oob-recover"}))
.expect("encode frame");
source.write_frame(&frame).expect("write frame");
let source_path = source.path.clone();
source
.sync_with_mode(&crate::store::SyncMode::default())
.expect("sync source");
drop(source);
let mut trailer = [0u8; 16];
trailer[8..12].copy_from_slice(&0u32.to_le_bytes());
trailer[12..16].copy_from_slice(crate::store::segment::sidx::SIDX_MAGIC);
let file_len = {
use std::io::Seek as _;
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(&source_path)
.expect("open source for trailer append");
f.write_all(&trailer).expect("append forged trailer");
f.seek(SeekFrom::End(0)).expect("seek to end for file_len")
};
{
use std::io::Seek as _;
let mut f = std::fs::OpenOptions::new()
.write(true)
.open(&source_path)
.expect("open source to rewrite offset");
f.seek(SeekFrom::End(-16)).expect("seek to trailer offset");
f.write_all(&file_len.to_le_bytes())
.expect("write out-of-bounds offset");
}
let mut dest: Segment<Active> =
Segment::create_with_created_ns_on(dir.path(), 2, 0, &realfs()).expect("create dest");
let dest_header_bytes = dest.written_bytes;
let result = dest.append_frames_from_segment(&source_path);
assert!(
matches!(
&result,
Err(StoreError::CorruptSegment { detail, .. })
if detail.contains("unprovable tail")
),
"PROPERTY: the strict compaction copy must FAIL CLOSED on an unprovable non-empty tail \
under an out-of-bounds untrusted footer with no corroborating manifest; got {result:?}"
);
assert_eq!(
dest.written_bytes, dest_header_bytes,
"a refused compaction copy must not append any source bytes to the destination"
);
}
#[test]
fn append_frames_from_segment_accepts_trusted_empty_frame_region() {
use std::io::{Seek as _, Write as _};
let dir = TempDir::new().expect("tmpdir");
let source: Segment<Active> =
Segment::create_with_created_ns_on(dir.path(), 1, 0, &realfs()).expect("create source");
let source_path = source.path.clone();
let frames_start = source.written_bytes;
drop(source);
{
let mut f = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&source_path)
.expect("open empty source to append footer");
f.seek(SeekFrom::End(0)).expect("seek to footer start");
let position = f.stream_position().expect("footer position");
assert_eq!(
position, frames_start,
"fixture sanity: an empty segment's footer must begin exactly at frames_start"
);
crate::store::segment::sidx::SidxEntryCollector::new()
.write_footer(&mut f, 1)
.expect("write empty CRC-valid footer");
f.flush().expect("flush footer");
}
{
let mut probe = std::fs::OpenOptions::new()
.read(true)
.open(&source_path)
.expect("open source for boundary probe");
let file_len = probe.seek(SeekFrom::End(0)).expect("file_len");
let boundary = detect_sidx_boundary(&mut probe, file_len, 1)
.expect("boundary detect must not error")
.expect("CRC-valid SDX3 footer is a boundary");
assert_eq!(
boundary,
SidxBoundary {
frames_end: frames_start,
trusted: true,
},
"fixture sanity: empty trusted footer reports frames_end == frames_start"
);
}
let mut dest: Segment<Active> =
Segment::create_with_created_ns_on(dir.path(), 2, 0, &realfs()).expect("create dest");
dest.append_frames_from_segment(&source_path).expect(
"PROPERTY: a trusted EMPTY frame region (frames_end == frames_start) must copy cleanly — \
the lower-bound guard `frames_end < frames_start` must NOT reject the inclusive boundary",
);
}
#[test]
fn try_decode_frame_at_admits_a_frame_ending_exactly_at_file_len() {
let frame = frame_encode(&serde_json::json!({ "payload": "exact-tail" })).expect("encode");
let file_len = frame.len() as u64;
let mut source = Cursor::new(frame);
let admitted =
try_decode_frame_at(&mut source, 0, file_len).expect("in-memory probe cannot fail seek");
assert_eq!(
admitted,
Some(file_len),
"a CRC-valid frame whose tail lands exactly on file_len is a real frame"
);
let admitted_past_bound = try_decode_frame_at(&mut source, 0, file_len - 1)
.expect("in-memory probe cannot fail seek");
assert_eq!(
admitted_past_bound, None,
"a frame tail strictly past file_len must be rejected by the bound, not by a short read"
);
}
#[test]
fn frame_decode_admits_an_eight_byte_empty_payload_frame() {
let mut frame = Vec::with_capacity(8);
frame.extend_from_slice(&0u32.to_be_bytes()); frame.extend_from_slice(&0u32.to_be_bytes()); assert_eq!(
frame.len(),
8,
"fixture must be exactly the 8-byte frame header"
);
let (msgpack, consumed) = frame_decode(&frame)
.expect("an 8-byte [len=0][crc=0] buffer is the minimal valid empty-payload frame");
assert!(
msgpack.is_empty(),
"a zero-length frame decodes to an empty payload slice"
);
assert_eq!(consumed, 8, "the whole 8-byte frame is consumed");
}
#[test]
fn detect_sidx_boundary_recognizes_a_file_that_is_exactly_the_trailer() {
let bytes = sidx_trailer_buf(16, 0);
let file_len = bytes.len() as u64;
let mut cursor = Cursor::new(bytes);
let boundary = detect_sidx_boundary(&mut cursor, file_len, 7)
.expect("a 16-byte trailer file must not error");
assert_eq!(
boundary,
Some(SidxBoundary {
frames_end: 0,
trusted: false,
}),
"a file that is exactly the 16-byte trailer is an untrusted boundary, not 'no footer'"
);
}
#[test]
fn try_decode_frame_at_reads_the_header_when_exactly_eight_bytes_remain() {
let mut source = Cursor::new(vec![0u8; 8]);
source
.seek(SeekFrom::Start(3))
.expect("park the cursor before the call");
let decoded =
try_decode_frame_at(&mut source, 0, 8).expect("in-memory probe cannot fail its seek");
assert_eq!(
decoded, None,
"8 bytes hold only a header, no non-empty frame"
);
assert_eq!(
source.position(),
8,
"the header at `at` must be seeked-to and read (cursor at at+8), not short-circuited"
);
}
#[test]
fn try_decode_frame_at_short_circuits_below_a_header_without_consuming() {
let mut source = Cursor::new(vec![0u8; 4]);
source
.seek(SeekFrom::Start(2))
.expect("park the cursor before the call");
let decoded =
try_decode_frame_at(&mut source, 0, 4).expect("in-memory probe cannot fail its seek");
assert_eq!(
decoded, None,
"fewer than 8 bytes cannot hold a frame header"
);
assert_eq!(
source.position(),
2,
"the sub-header guard must not seek or read — the cursor stays parked"
);
}
struct SyncFailFs;
impl crate::store::platform::fs::StoreFs for SyncFailFs {
fn read_dir(&self, path: &std::path::Path) -> std::io::Result<std::fs::ReadDir> {
crate::store::platform::fs::RealFs.read_dir(path)
}
fn create_dir_all(&self, path: &std::path::Path) -> std::io::Result<()> {
crate::store::platform::fs::RealFs.create_dir_all(path)
}
fn create_new_file(
&self,
path: &std::path::Path,
) -> Result<std::fs::File, crate::store::StoreError> {
crate::store::platform::fs::RealFs.create_new_file(path)
}
fn sync_file_with_mode(
&self,
_file: &std::fs::File,
_path: &std::path::Path,
_mode: &crate::store::SyncMode,
) -> Result<(), crate::store::StoreError> {
Err(crate::store::StoreError::corrupt_segment_with_detail(
4242,
"injected: sync_file_with_mode failed",
))
}
fn sync_file_all(&self, file: &std::fs::File, path: &std::path::Path) -> std::io::Result<()> {
crate::store::platform::fs::RealFs.sync_file_all(file, path)
}
fn sync_parent_dir(&self, path: &std::path::Path) -> Result<(), crate::store::StoreError> {
crate::store::platform::fs::RealFs.sync_parent_dir(path)
}
fn reject_symlink_leaf(
&self,
path: &std::path::Path,
purpose: &str,
) -> Result<(), crate::store::StoreError> {
crate::store::platform::fs::RealFs.reject_symlink_leaf(path, purpose)
}
fn canonicalize(&self, path: &std::path::Path) -> std::io::Result<std::path::PathBuf> {
crate::store::platform::fs::RealFs.canonicalize(path)
}
fn symlink_metadata(&self, path: &std::path::Path) -> std::io::Result<std::fs::Metadata> {
crate::store::platform::fs::RealFs.symlink_metadata(path)
}
fn cow_copy_file(
&self,
from: &std::path::Path,
to: &std::path::Path,
preference: crate::store::CopyPreference,
) -> std::io::Result<crate::store::platform::fs::CowStrategyUsed> {
crate::store::platform::fs::RealFs.cow_copy_file(from, to, preference)
}
fn copy(&self, from: &std::path::Path, to: &std::path::Path) -> std::io::Result<u64> {
crate::store::platform::fs::RealFs.copy(from, to)
}
fn metadata(&self, path: &std::path::Path) -> std::io::Result<std::fs::Metadata> {
crate::store::platform::fs::RealFs.metadata(path)
}
fn rename(&self, from: &std::path::Path, to: &std::path::Path) -> std::io::Result<()> {
crate::store::platform::fs::RealFs.rename(from, to)
}
fn remove_file(&self, path: &std::path::Path) -> std::io::Result<()> {
crate::store::platform::fs::RealFs.remove_file(path)
}
fn named_temp_in(&self, dir: &std::path::Path) -> std::io::Result<tempfile::NamedTempFile> {
crate::store::platform::fs::RealFs.named_temp_in(dir)
}
fn persist_temp_with_parent_sync(
&self,
named_temp: tempfile::NamedTempFile,
final_path: &std::path::Path,
admission: crate::store::platform::sync::ParentDirSyncAdmission,
) -> std::io::Result<()> {
crate::store::platform::fs::RealFs
.persist_temp_with_parent_sync(named_temp, final_path, admission)
}
fn read_exact_at(
&self,
file: &mut std::fs::File,
offset: u64,
buf: &mut [u8],
) -> Result<(), crate::store::platform::fs::PositionedReadError> {
crate::store::platform::fs::RealFs.read_exact_at(file, offset, buf)
}
}
#[test]
fn sync_with_mode_propagates_the_backend_sync_failure() {
let dir = TempDir::new().expect("tmpdir");
let fs: std::sync::Arc<dyn crate::store::platform::fs::StoreFs> =
std::sync::Arc::new(SyncFailFs);
let mut segment =
Segment::create_with_created_ns_on(dir.path(), 9, 0, &fs).expect("create over SyncFailFs");
let err = segment
.sync_with_mode(&crate::store::SyncMode::default())
.expect_err("a failing backend sync must surface as an error, not be swallowed");
assert!(
matches!(
err,
StoreError::CorruptSegment { segment_id: 4242, ref detail }
if detail.contains("injected: sync_file_with_mode failed")
),
"sync_with_mode must propagate the exact backend sync failure; got {err:?}"
);
}