use crate::event::EventKind;
use crate::event::HashChain;
use crate::store::cold_start::{ColdStartIndexRow, ColdStartSource};
use crate::store::index::interner::InternId;
use crate::store::StoreError;
use std::collections::{BTreeMap, HashMap};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use tracing::warn;
/// Magic bytes identifying a SIDX footer. `write_footer` emits them as the last four bytes of
/// the file, and `read_footer` treats any other trailing value as "no footer present".
pub(crate) const SIDX_MAGIC: &[u8; 4] = b"SDX2";
/// Size in bytes of the fixed trailer at the very end of the file:
/// string table offset (u64 LE, 8 bytes) + entry count (u32 LE, 4 bytes) + magic (4 bytes).
const TRAILER_SIZE: u64 = 16;
/// Encoded size in bytes of one `SidxEntry` record, i.e. the sum of the field widths written by
/// `SidxEntry::encode_into` (16+4+4+2+8+4+4+4+32+32+8+4+8+16+16).
pub(crate) const ENTRY_SIZE: usize = 162;
/// Tallies of raw kind values in the reserved system (`0x0`) and effect (`0xD`) categories that
/// did not match a known constant during decoding and were therefore replaced by a fallback kind.
#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct ReservedKindFallbackStats {
/// Total number of unrecognized system-category values recorded.
pub(crate) system: usize,
/// Total number of unrecognized effect-category values recorded.
pub(crate) effect: usize,
/// Occurrence count per unrecognized system-category raw value.
/// Deserializes to an empty map when the field is absent.
#[serde(default)]
pub(crate) system_histogram: BTreeMap<u16, usize>,
/// Occurrence count per unrecognized effect-category raw value.
/// Deserializes to an empty map when the field is absent.
#[serde(default)]
pub(crate) effect_histogram: BTreeMap<u16, usize>,
}
impl ReservedKindFallbackStats {
pub(crate) fn record_system(&mut self, raw: u16) {
self.system += 1;
*self.system_histogram.entry(raw).or_insert(0) += 1;
}
pub(crate) fn record_effect(&mut self, raw: u16) {
self.effect += 1;
*self.effect_histogram.entry(raw).or_insert(0) += 1;
}
pub(crate) fn merge_from(&mut self, other: &Self) {
self.system += other.system;
self.effect += other.effect;
for (&raw, &count) in &other.system_histogram {
*self.system_histogram.entry(raw).or_insert(0) += count;
}
for (&raw, &count) in &other.effect_histogram {
*self.effect_histogram.entry(raw).or_insert(0) += count;
}
}
pub(crate) fn add(mut self, other: &Self) -> Self {
self.merge_from(other);
self
}
}
/// Compile-time guard: the build fails if `ENTRY_SIZE` is changed away from 162 without this
/// assertion (and the on-disk record layout it stands for) being revisited.
const _ASSERT_ENTRY_SIZE: () = assert!(
    ENTRY_SIZE == 162,
    "ENTRY_SIZE must equal 162 — update when SidxEntry layout changes"
);
/// Packs an [`EventKind`] into its 16-bit footer representation: the category is shifted into
/// the top four bits and the type id is OR-ed into the remaining bits.
#[inline]
pub(crate) fn kind_to_raw(kind: EventKind) -> u16 {
    let category_bits = u16::from(kind.category()) << 12;
    let type_bits = kind.type_id();
    category_bits | type_bits
}
/// Decodes a raw 16-bit kind read from a SIDX footer into an [`EventKind`].
///
/// The top four bits of `raw` select the category. Categories `0x0` (system) and `0xD` (effect)
/// are reserved: only the values listed below map to constants. Any other value in those two
/// categories is tallied in `counts` (when one is supplied), logged via `warn!`, and replaced by
/// `EventKind::DATA` or `EventKind::EFFECT_ERROR` respectively. Every other category is handed
/// to `EventKind::custom` together with the low twelve bits as the type id.
fn raw_to_kind_impl(raw: u16, counts: Option<&mut ReservedKindFallbackStats>) -> EventKind {
    match raw {
        // Known values in the reserved system category.
        0x0000 => EventKind::DATA,
        0x0001 => EventKind::SYSTEM_INIT,
        0x0002 => EventKind::SYSTEM_SHUTDOWN,
        0x0003 => EventKind::SYSTEM_HEARTBEAT,
        0x0004 => EventKind::SYSTEM_CONFIG_CHANGE,
        0x0005 => EventKind::SYSTEM_CHECKPOINT,
        0x0006 => EventKind::SYSTEM_BATCH_BEGIN,
        0x0007 => EventKind::SYSTEM_BATCH_COMMIT,
        0x0008 => EventKind::SYSTEM_OPEN_COMPLETED,
        0x0009 => EventKind::SYSTEM_CLOSE_COMPLETED,
        0x000F => EventKind::SYSTEM_DENIAL,
        0x0FFE => EventKind::TOMBSTONE,
        // Everything else whose top nibble is 0x0.
        0x0000..=0x0FFF => {
            if let Some(stats) = counts {
                stats.record_system(raw);
            }
            warn!(
                raw,
                "unrecognized reserved system kind in SIDX footer; falling back to DATA"
            );
            EventKind::DATA
        }
        // Known values in the reserved effect category.
        0xD001 => EventKind::EFFECT_ERROR,
        0xD002 => EventKind::EFFECT_RETRY,
        0xD004 => EventKind::EFFECT_ACK,
        0xD005 => EventKind::EFFECT_BACKPRESSURE,
        0xD006 => EventKind::EFFECT_CANCEL,
        0xD007 => EventKind::EFFECT_CONFLICT,
        // Everything else whose top nibble is 0xD.
        0xD000..=0xDFFF => {
            if let Some(stats) = counts {
                stats.record_effect(raw);
            }
            warn!(
                raw,
                "unrecognized reserved effect kind in SIDX footer; falling back to EFFECT_ERROR"
            );
            EventKind::EFFECT_ERROR
        }
        // Non-reserved categories pass through unchanged.
        _ => EventKind::custom((raw >> 12) as u8, raw & 0x0FFF),
    }
}
/// Test-only decoder: applies the same mapping as `raw_to_kind_impl` but passes no statistics
/// sink, so reserved-kind fallbacks are logged without being counted.
#[cfg(test)]
pub(crate) fn raw_to_kind(raw: u16) -> EventKind {
raw_to_kind_impl(raw, None)
}
/// Decodes `raw` into an `EventKind` via `raw_to_kind_impl`, recording any reserved-category
/// fallback in `counts`.
pub(crate) fn raw_to_kind_counted(raw: u16, counts: &mut ReservedKindFallbackStats) -> EventKind {
raw_to_kind_impl(raw, Some(counts))
}
/// One fixed-width record of the SIDX footer. Fields are listed in the same order in which
/// `encode_into` writes them and `decode_from` reads them (integers little-endian).
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct SidxEntry {
/// Event identifier (16 bytes on disk).
pub event_id: u128,
/// Index of the entity string in the footer string table; overwritten by
/// `SidxEntryCollector::record` and bounds-checked by `read_footer`.
pub entity_idx: u32,
/// Index of the scope string in the footer string table; overwritten by
/// `SidxEntryCollector::record` and bounds-checked by `read_footer`.
pub scope_idx: u32,
/// Raw 16-bit kind in the packing produced by `kind_to_raw` (category in the top four bits).
pub kind: u16,
/// Copied verbatim into the cold-start row's `wall_ms`.
/// NOTE(review): presumably a wall-clock timestamp in milliseconds — confirm against the event type.
pub wall_ms: u64,
/// Copied verbatim into the cold-start row's `clock`.
pub clock: u32,
/// Copied verbatim into the cold-start row's `dag_lane`.
pub dag_lane: u32,
/// Copied verbatim into the cold-start row's `dag_depth`.
pub dag_depth: u32,
/// `prev_hash` half of the row's `HashChain` (32 raw bytes on disk).
pub prev_hash: [u8; 32],
/// `event_hash` half of the row's `HashChain` (32 raw bytes on disk).
pub event_hash: [u8; 32],
/// Frame offset passed to `DiskPos::new` by `to_disk_pos`.
pub frame_offset: u64,
/// Frame length passed to `DiskPos::new` by `to_disk_pos`.
pub frame_length: u32,
/// Copied verbatim into the cold-start row's `global_sequence`.
pub global_sequence: u64,
/// Correlation identifier (16 bytes on disk).
pub correlation_id: u128,
/// Causation identifier; a stored `0` is surfaced as `None` by `to_cold_start_row_counted`.
pub causation_id: u128,
}
impl SidxEntry {
pub(crate) fn to_disk_pos(&self, segment_id: u64) -> crate::store::DiskPos {
crate::store::DiskPos::new(segment_id, self.frame_offset, self.frame_length)
}
pub(crate) fn to_cold_start_row(&self, segment_id: u64) -> ColdStartIndexRow {
self.to_cold_start_row_counted(segment_id, &mut ReservedKindFallbackStats::default())
}
pub(crate) fn to_cold_start_row_counted(
&self,
segment_id: u64,
counts: &mut ReservedKindFallbackStats,
) -> ColdStartIndexRow {
ColdStartIndexRow {
source: ColdStartSource::Sidx,
event_id: self.event_id,
correlation_id: self.correlation_id,
causation_id: (self.causation_id != 0).then_some(self.causation_id),
entity_id: InternId(self.entity_idx),
scope_id: InternId(self.scope_idx),
kind: raw_to_kind_counted(self.kind, counts),
wall_ms: self.wall_ms,
clock: self.clock,
dag_lane: self.dag_lane,
dag_depth: self.dag_depth,
hash_chain: HashChain {
prev_hash: self.prev_hash,
event_hash: self.event_hash,
},
disk_pos: self.to_disk_pos(segment_id),
global_sequence: self.global_sequence,
}
}
pub(crate) fn encode_into(&self, buf: &mut [u8]) {
debug_assert_eq!(
buf.len(),
ENTRY_SIZE,
"encode_into: buf must be ENTRY_SIZE bytes"
);
let mut pos = 0usize;
macro_rules! put_le {
($val:expr, $n:expr) => {{
buf[pos..pos + $n].copy_from_slice(&($val).to_le_bytes());
pos += $n;
}};
}
macro_rules! put_bytes {
($arr:expr) => {{
let slice: &[u8] = &$arr;
buf[pos..pos + slice.len()].copy_from_slice(slice);
pos += slice.len();
}};
}
put_le!(self.event_id, 16);
put_le!(self.entity_idx, 4);
put_le!(self.scope_idx, 4);
put_le!(self.kind, 2);
put_le!(self.wall_ms, 8);
put_le!(self.clock, 4);
put_le!(self.dag_lane, 4);
put_le!(self.dag_depth, 4);
put_bytes!(self.prev_hash);
put_bytes!(self.event_hash);
put_le!(self.frame_offset, 8);
put_le!(self.frame_length, 4);
put_le!(self.global_sequence, 8);
put_le!(self.correlation_id, 16);
put_le!(self.causation_id, 16);
debug_assert_eq!(pos, ENTRY_SIZE, "encode_into: wrote wrong byte count");
}
pub(crate) fn decode_from(buf: &[u8], segment_id: u64) -> Result<Self, StoreError> {
if buf.len() != ENTRY_SIZE {
return Err(StoreError::CorruptSegment {
segment_id,
detail: format!(
"SIDX entry buffer is {} bytes, expected {ENTRY_SIZE}",
buf.len()
),
});
}
let mut pos = 0usize;
macro_rules! get_le {
($t:ty, $n:expr) => {{
let arr: [u8; $n] = buf[pos..pos + $n]
.try_into()
.expect("slice length matches const");
pos += $n;
<$t>::from_le_bytes(arr)
}};
}
macro_rules! get_hash {
() => {{
let mut h = [0u8; 32];
h.copy_from_slice(&buf[pos..pos + 32]);
pos += 32;
h
}};
}
let event_id = get_le!(u128, 16);
let entity_idx = get_le!(u32, 4);
let scope_idx = get_le!(u32, 4);
let kind = get_le!(u16, 2);
let wall_ms = get_le!(u64, 8);
let clock = get_le!(u32, 4);
let dag_lane = get_le!(u32, 4);
let dag_depth = get_le!(u32, 4);
let prev_hash = get_hash!();
let event_hash = get_hash!();
let frame_offset = get_le!(u64, 8);
let frame_length = get_le!(u32, 4);
let global_sequence = get_le!(u64, 8);
let correlation_id = get_le!(u128, 16);
let causation_id = get_le!(u128, 16);
debug_assert_eq!(pos, ENTRY_SIZE, "decode_from: consumed wrong byte count");
Ok(Self {
event_id,
entity_idx,
scope_idx,
kind,
wall_ms,
clock,
dag_lane,
dag_depth,
prev_hash,
event_hash,
frame_offset,
frame_length,
global_sequence,
correlation_id,
causation_id,
})
}
#[cfg(test)]
pub(crate) fn event_kind(&self) -> EventKind {
raw_to_kind(self.kind)
}
}
/// Accumulates `SidxEntry` records together with the deduplicated entity/scope strings they
/// reference, and serializes both as a SIDX footer via `write_footer`.
pub(crate) struct SidxEntryCollector {
/// Entries in the order they were recorded.
entries: Vec<SidxEntry>,
/// String table; each entry's `entity_idx` / `scope_idx` is an index into this vector.
strings: Vec<String>,
/// Reverse lookup from string to its index in `strings`, used by `intern` to deduplicate.
string_map: HashMap<String, u32>,
}
impl SidxEntryCollector {
pub(crate) fn new() -> Self {
Self {
entries: Vec::new(),
strings: Vec::new(),
string_map: HashMap::new(),
}
}
pub(crate) fn record(&mut self, mut entry: SidxEntry, entity: &str, scope: &str) {
entry.entity_idx = self.intern(entity);
entry.scope_idx = self.intern(scope);
self.entries.push(entry);
}
#[cfg(test)]
pub(crate) fn entries(&self) -> &[SidxEntry] {
&self.entries
}
#[cfg(test)]
pub(crate) fn strings(&self) -> &[String] {
&self.strings
}
#[allow(clippy::expect_used)]
pub(crate) fn write_footer<W: Write + Seek>(
&self,
writer: &mut W,
segment_id: u64,
) -> Result<(), StoreError> {
let string_table_bytes = rmp_serde::to_vec_named(&self.strings)
.map_err(|e| StoreError::Serialization(Box::new(e)))?;
let string_table_offset = writer.stream_position().map_err(StoreError::Io)?;
let entry_count =
u32::try_from(self.entries.len()).map_err(|_| StoreError::SegmentTooManyEntries {
segment_id,
count: self.entries.len() as u64,
})?;
let trailer_size = usize::try_from(TRAILER_SIZE)
.expect("invariant: SIDX trailer size fits usize on every supported target");
let mut footer = Vec::with_capacity(
string_table_bytes.len() + self.entries.len() * ENTRY_SIZE + trailer_size,
);
footer.extend_from_slice(&string_table_bytes);
let mut buf = [0u8; ENTRY_SIZE];
for entry in &self.entries {
entry.encode_into(&mut buf);
footer.extend_from_slice(&buf);
}
footer.extend_from_slice(&string_table_offset.to_le_bytes());
footer.extend_from_slice(&entry_count.to_le_bytes());
footer.extend_from_slice(SIDX_MAGIC);
writer.write_all(&footer).map_err(StoreError::Io)?;
Ok(())
}
#[allow(clippy::expect_used)]
fn intern(&mut self, s: &str) -> u32 {
if let Some(&idx) = self.string_map.get(s) {
return idx;
}
let idx = u32::try_from(self.strings.len())
.expect("invariant: SIDX string table is bounded by segment size, well under u32::MAX");
self.strings.push(s.to_owned());
self.string_map.insert(s.to_owned(), idx);
idx
}
}
/// Decoded footer contents returned by `read_footer`: the entries, and the string table that
/// their `entity_idx` / `scope_idx` fields index into.
pub(crate) type SidxFooterData = (Vec<SidxEntry>, Vec<String>);
pub(crate) fn read_footer(path: &Path) -> Result<Option<SidxFooterData>, StoreError> {
let segment_id = path
.file_stem()
.and_then(|s| s.to_str())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let mut file = std::fs::File::open(path).map_err(StoreError::Io)?;
let file_len = file.seek(SeekFrom::End(0)).map_err(StoreError::Io)?;
if file_len < TRAILER_SIZE {
return Ok(None);
}
file.seek(SeekFrom::End(-(TRAILER_SIZE as i64)))
.map_err(StoreError::Io)?;
let mut trailer = [0u8; 16];
file.read_exact(&mut trailer).map_err(StoreError::Io)?;
if &trailer[12..16] != SIDX_MAGIC {
return Ok(None);
}
let offset_bytes: [u8; 8] = trailer[0..8]
.try_into()
.map_err(|_| StoreError::CorruptFrame {
segment_id,
offset: 0,
reason: "trailer truncated: string_table_offset bytes not readable".into(),
})?;
let string_table_offset = u64::from_le_bytes(offset_bytes);
let count_bytes: [u8; 4] = trailer[8..12]
.try_into()
.map_err(|_| StoreError::CorruptFrame {
segment_id,
offset: 0,
reason: "trailer truncated: entry_count bytes not readable".into(),
})?;
let entry_count = u32::from_le_bytes(count_bytes) as usize;
let entries_block_len = (entry_count as u64)
.checked_mul(ENTRY_SIZE as u64)
.ok_or_else(|| StoreError::CorruptSegment {
segment_id,
detail: "SIDX entry_count × ENTRY_SIZE overflows u64".into(),
})?;
let entries_start = file_len
.checked_sub(TRAILER_SIZE)
.and_then(|n| n.checked_sub(entries_block_len))
.ok_or_else(|| StoreError::CorruptSegment {
segment_id,
detail: "SIDX entry block extends before the beginning of the file".into(),
})?;
if string_table_offset > entries_start {
return Err(StoreError::CorruptSegment {
segment_id,
detail: format!(
"SIDX string_table_offset {string_table_offset} is past entries_start {entries_start}"
),
});
}
let string_table_len = entries_start
.checked_sub(string_table_offset)
.ok_or_else(|| StoreError::CorruptSegment {
segment_id,
detail: "SIDX string table length underflows".into(),
})?;
file.seek(SeekFrom::Start(string_table_offset))
.map_err(StoreError::Io)?;
let table_len_usize =
usize::try_from(string_table_len).map_err(|_| StoreError::CorruptSegment {
segment_id,
detail: format!("SIDX string table length {string_table_len} exceeds usize::MAX"),
})?;
let mut string_table_buf = vec![0u8; table_len_usize];
file.read_exact(&mut string_table_buf)
.map_err(StoreError::Io)?;
let strings: Vec<String> = rmp_serde::from_slice(&string_table_buf)
.map_err(|e| StoreError::Serialization(Box::new(e)))?;
let mut entries = Vec::with_capacity(entry_count);
let mut entry_buf = [0u8; ENTRY_SIZE];
for i in 0..entry_count {
file.read_exact(&mut entry_buf).map_err(|e| {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
StoreError::CorruptSegment {
segment_id,
detail: format!("SIDX: entry {i} truncated at EOF"),
}
} else {
StoreError::Io(e)
}
})?;
let entry = SidxEntry::decode_from(&entry_buf, segment_id)?;
if entry.entity_idx as usize >= strings.len() {
return Err(StoreError::CorruptSegment {
segment_id,
detail: format!(
"SIDX entry {i}: entity_idx {} out of range (table has {} strings)",
entry.entity_idx,
strings.len()
),
});
}
if entry.scope_idx as usize >= strings.len() {
return Err(StoreError::CorruptSegment {
segment_id,
detail: format!(
"SIDX entry {i}: scope_idx {} out of range (table has {} strings)",
entry.scope_idx,
strings.len()
),
});
}
entries.push(entry);
}
Ok(Some((entries, strings)))
}
// Unit tests for the SIDX entry codec, kind mapping, string interning and footer read/write.
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use tempfile::NamedTempFile;
// Builds a deterministic entry whose numeric fields are derived from `n`; string indices are
// left at 0 and causation is left unset (0).
fn sample_entry(n: u8) -> SidxEntry {
SidxEntry {
event_id: u128::from(n),
entity_idx: 0,
scope_idx: 0,
kind: kind_to_raw(EventKind::custom(0x1, u16::from(n))),
wall_ms: 1_000_000 + u64::from(n),
clock: u32::from(n),
dag_lane: u32::from(n % 3),
dag_depth: u32::from(n % 5),
prev_hash: [n; 32],
event_hash: [n.wrapping_add(1); 32],
frame_offset: u64::from(n) * 512,
frame_length: 128,
global_sequence: u64::from(n),
correlation_id: u128::from(n),
causation_id: 0,
}
}
// encode_into followed by decode_from must reproduce every field exactly.
#[test]
fn encode_decode_round_trip() {
let original = SidxEntry {
event_id: 0xDEAD_BEEF_CAFE_1234_5678_9ABC_DEF0_1234_u128,
entity_idx: 7,
scope_idx: 3,
kind: 0xF042,
wall_ms: 1_700_000_000_000,
clock: 99,
dag_lane: 4,
dag_depth: 2,
prev_hash: [0xAB; 32],
event_hash: [0xCD; 32],
frame_offset: 0x0000_1234_5678_9ABC,
frame_length: 4096,
global_sequence: 0xFFFF_FFFF_0000_0001,
correlation_id: 0x1111_1111_2222_2222_3333_3333_4444_4444_u128,
causation_id: 0,
};
let mut buf = [0u8; ENTRY_SIZE];
original.encode_into(&mut buf);
let decoded = SidxEntry::decode_from(&buf, 1).expect("decode must succeed");
assert_eq!(original, decoded, "round-trip must be lossless");
}
// merge_from must add totals and histogram buckets, keeping system and effect independent.
#[test]
fn reserved_kind_fallback_stats_merge_accumulates_effect_histogram() {
let mut left = ReservedKindFallbackStats::default();
left.record_effect(0xD0AA);
let mut right = ReservedKindFallbackStats::default();
right.record_effect(0xD0AA);
right.record_effect(0xD0AA);
right.record_system(0x00AA);
left.merge_from(&right);
assert_eq!(
left.effect, 3,
"PROPERTY: effect fallback totals must accumulate across merged SIDX scan shards"
);
assert_eq!(
left.effect_histogram.get(&0xD0AA),
Some(&3),
"PROPERTY: effect fallback histograms must add counts rather than subtracting or replacing them"
);
assert_eq!(
left.system, 1,
"SANITY: merge still carries independent system fallback counts"
);
assert_eq!(
left.system_histogram.get(&0x00AA),
Some(&1),
"SANITY: merge still carries independent system fallback histograms"
);
}
// A cold-start row built from an entry must carry every field through to both the rebuilt
// index entry and the event header.
#[test]
fn sidx_entry_to_cold_start_row_preserves_index_and_header_fields() {
let entry = SidxEntry {
event_id: 0xDE,
entity_idx: 1,
scope_idx: 2,
kind: kind_to_raw(EventKind::custom(0x6, 0x77)),
wall_ms: 9_999,
clock: 12,
dag_lane: 4,
dag_depth: 8,
prev_hash: [0xAB; 32],
event_hash: [0xCD; 32],
frame_offset: 512,
frame_length: 144,
global_sequence: 123,
correlation_id: 0xEE,
causation_id: 0xFA,
};
let strings = vec![
String::new(),
"entity:sidx".to_owned(),
"scope:test".to_owned(),
];
let row = entry.to_cold_start_row(7);
let rebuilt = row
.to_index_entry(&strings)
.expect("SIDX row to index entry");
let header = row.to_event_header();
assert_eq!(rebuilt.event_id, entry.event_id);
assert_eq!(rebuilt.correlation_id, entry.correlation_id);
assert_eq!(rebuilt.causation_id, Some(entry.causation_id));
assert_eq!(rebuilt.coord.entity(), "entity:sidx");
assert_eq!(rebuilt.coord.scope(), "scope:test");
assert_eq!(rebuilt.kind, raw_to_kind(entry.kind));
assert_eq!(rebuilt.wall_ms, entry.wall_ms);
assert_eq!(rebuilt.clock, entry.clock);
assert_eq!(rebuilt.dag_lane, entry.dag_lane);
assert_eq!(rebuilt.dag_depth, entry.dag_depth);
assert_eq!(rebuilt.hash_chain.prev_hash, entry.prev_hash);
assert_eq!(rebuilt.hash_chain.event_hash, entry.event_hash);
assert_eq!(rebuilt.disk_pos, entry.to_disk_pos(7));
assert_eq!(rebuilt.global_sequence, entry.global_sequence);
assert_eq!(header.event_id, entry.event_id);
assert_eq!(header.correlation_id, entry.correlation_id);
assert_eq!(header.causation_id, Some(entry.causation_id));
assert_eq!(header.position.wall_ms, entry.wall_ms);
assert_eq!(header.position.sequence, entry.clock);
assert_eq!(header.position.lane, entry.dag_lane);
assert_eq!(header.position.depth, entry.dag_depth);
assert_eq!(header.event_kind, raw_to_kind(entry.kind));
}
// A stored causation id of 0 must surface as None in the cold-start row.
#[test]
fn sidx_entry_normalizes_zero_causation_to_none() {
let entry = SidxEntry {
causation_id: 0,
..sample_entry(7)
};
let row = entry.to_cold_start_row(11);
assert_eq!(row.causation_id, None);
assert_eq!(
row.disk_pos,
crate::store::DiskPos::new(11, entry.frame_offset, entry.frame_length)
);
}
// A custom (non-reserved) kind must survive kind_to_raw -> raw_to_kind unchanged.
#[test]
fn kind_round_trip_product_kind() {
let kind = EventKind::custom(0x5, 0x042);
let raw = kind_to_raw(kind);
let recovered = raw_to_kind(raw);
assert_eq!(recovered.category(), kind.category());
assert_eq!(recovered.type_id(), kind.type_id());
}
// Each listed system constant must map back to the same raw value.
#[test]
fn kind_round_trip_system_constants() {
for &kind in &[
EventKind::SYSTEM_INIT,
EventKind::SYSTEM_SHUTDOWN,
EventKind::SYSTEM_HEARTBEAT,
EventKind::SYSTEM_CONFIG_CHANGE,
EventKind::SYSTEM_CHECKPOINT,
EventKind::SYSTEM_BATCH_BEGIN,
EventKind::SYSTEM_BATCH_COMMIT,
EventKind::SYSTEM_OPEN_COMPLETED,
EventKind::SYSTEM_CLOSE_COMPLETED,
EventKind::TOMBSTONE,
EventKind::DATA,
] {
let recovered = raw_to_kind(kind_to_raw(kind));
assert_eq!(
kind_to_raw(recovered),
kind_to_raw(kind),
"system kind round-trip failed for raw value {:#06x}",
kind_to_raw(kind)
);
}
}
// Each listed effect constant must map back to the same raw value.
#[test]
fn kind_round_trip_effect_constants() {
for &kind in &[
EventKind::EFFECT_ERROR,
EventKind::EFFECT_RETRY,
EventKind::EFFECT_ACK,
EventKind::EFFECT_BACKPRESSURE,
EventKind::EFFECT_CANCEL,
EventKind::EFFECT_CONFLICT,
] {
let recovered = raw_to_kind(kind_to_raw(kind));
assert_eq!(
kind_to_raw(recovered),
kind_to_raw(kind),
"effect kind round-trip failed for raw value {:#06x}",
kind_to_raw(kind)
);
}
}
// SidxEntry::event_kind must agree with calling raw_to_kind on the stored raw kind.
#[test]
fn event_kind_helper_matches_raw_to_kind() {
let entry = SidxEntry {
kind: kind_to_raw(EventKind::custom(0x3, 0x7)),
..sample_entry(0)
};
let via_helper = entry.event_kind();
let via_fn = raw_to_kind(entry.kind);
assert_eq!(kind_to_raw(via_helper), kind_to_raw(via_fn));
}
// Unrecognized reserved values must fall back to DATA / EFFECT_ERROR and be counted once each.
#[test]
fn raw_to_kind_counted_tracks_reserved_fallbacks() {
let mut counts = ReservedKindFallbackStats::default();
assert_eq!(raw_to_kind_counted(0x000A, &mut counts), EventKind::DATA);
assert_eq!(
raw_to_kind_counted(0xD0FF, &mut counts),
EventKind::EFFECT_ERROR
);
assert_eq!(counts.system, 1);
assert_eq!(counts.effect, 1);
assert_eq!(counts.system_histogram.get(&0x000A), Some(&1));
assert_eq!(counts.effect_histogram.get(&0xD0FF), Some(&1));
}
// Interning the same string twice must return the same index and store it only once.
#[test]
fn intern_deduplicates_strings() {
let mut collector = SidxEntryCollector::new();
let i0 = collector.intern("entity:1");
let i1 = collector.intern("scope:default");
let i2 = collector.intern("entity:1");
assert_eq!(i0, i2, "same string must return the same index");
assert_ne!(i0, i1, "different strings must get different indices");
assert_eq!(
collector.strings().len(),
2,
"only 2 unique strings expected"
);
}
// A footer written after 64 bytes of unrelated data must be read back with matching entries
// and string-table indices.
#[test]
fn footer_round_trip() {
let mut buf: Vec<u8> = Vec::new();
buf.extend_from_slice(b"FBAT"); buf.extend_from_slice(&[0u8; 60]);
let mut cursor = Cursor::new(&mut buf);
cursor.seek(SeekFrom::End(0)).expect("seek to end");
let mut collector = SidxEntryCollector::new();
collector.record(sample_entry(1), "user:1", "profile");
collector.record(sample_entry(2), "user:2", "profile");
collector
.write_footer(&mut cursor, 0)
.expect("write_footer must succeed");
let mut tmp = NamedTempFile::new().expect("create temp file");
tmp.write_all(&buf).expect("write buf to temp file");
tmp.flush().expect("flush temp file");
let (entries, strings) = read_footer(tmp.path())
.expect("read_footer must not error")
.expect("SIDX footer must be found");
assert_eq!(entries.len(), 2, "expected 2 entries");
assert!(strings.contains(&"user:1".to_owned()));
assert!(strings.contains(&"user:2".to_owned()));
assert!(strings.contains(&"profile".to_owned()));
let e0_entity = &strings[entries[0].entity_idx as usize];
let e1_entity = &strings[entries[1].entity_idx as usize];
assert_eq!(e0_entity, "user:1");
assert_eq!(e1_entity, "user:2");
assert_eq!(
entries[0].scope_idx, entries[1].scope_idx,
"shared scope must use the same string table index"
);
}
// A file that does not end in the SIDX magic is reported as "no footer", not as an error.
#[test]
fn read_footer_returns_none_without_magic() {
let mut tmp = NamedTempFile::new().expect("create temp file");
tmp.write_all(b"FBAT\x00\x00\x00\x00some bytes that are not a sidx footer at all")
.expect("write");
tmp.flush().expect("flush");
let result = read_footer(tmp.path()).expect("must not IO-error");
assert!(result.is_none(), "non-SIDX file must return None");
}
// A trailer ending in the bytes "SIDX" (not the current magic) is also treated as "no footer".
#[test]
fn read_footer_returns_none_for_old_sidx_magic() {
let mut tmp = NamedTempFile::new().expect("create temp file");
tmp.write_all(&[0u8; 12]).expect("write prefix");
tmp.write_all(b"SIDX").expect("write old magic");
tmp.flush().expect("flush");
let result = read_footer(tmp.path()).expect("must not IO-error");
assert!(result.is_none(), "old SIDX magic must fall back cleanly");
}
// A file shorter than the trailer yields None.
#[test]
fn read_footer_returns_none_for_tiny_file() {
let mut tmp = NamedTempFile::new().expect("create temp file");
tmp.write_all(b"AB").expect("write");
tmp.flush().expect("flush");
let result = read_footer(tmp.path()).expect("must not IO-error");
assert!(result.is_none(), "tiny file must return None");
}
// An empty file yields None.
#[test]
fn read_footer_returns_none_for_empty_file() {
let tmp = NamedTempFile::new().expect("create temp file");
let result = read_footer(tmp.path()).expect("must not IO-error");
assert!(result.is_none(), "empty file must return None");
}
// With zero entries and string_table_offset == entries_start the string table range is empty;
// the failure must come from the decoder (Serialization), not from the offset range check.
#[test]
fn read_footer_allows_empty_string_table_range_to_reach_decoder() {
let mut bytes = vec![0xA5; 32];
bytes.extend_from_slice(&32u64.to_le_bytes());
bytes.extend_from_slice(&0u32.to_le_bytes());
bytes.extend_from_slice(SIDX_MAGIC);
let mut tmp = NamedTempFile::new().expect("create temp file");
tmp.write_all(&bytes).expect("write malformed footer");
tmp.flush().expect("flush malformed footer");
let err = read_footer(tmp.path()).expect_err("empty string table bytes are malformed");
assert!(
matches!(err, StoreError::Serialization(_)),
"PROPERTY: string_table_offset == entries_start is a valid range boundary; malformed empty bytes must reach the MessagePack decoder instead of being rejected as an offset-overlap corruption"
);
}
// Entries that share entity and scope must share string-table slots.
#[test]
fn shared_string_table_is_compact() {
let mut collector = SidxEntryCollector::new();
for n in 0u8..3 {
collector.record(sample_entry(n), "order:999", "payments");
}
assert_eq!(
collector.strings().len(),
2,
"only 'order:999' and 'payments' should appear in the table"
);
let unique_pairs: std::collections::HashSet<(u32, u32)> = collector
.entries()
.iter()
.map(|e| (e.entity_idx, e.scope_idx))
.collect();
assert_eq!(
unique_pairs.len(),
1,
"all entries sharing entity+scope must have identical index pairs"
);
}
// decode_from must reject buffers that are one byte too short or one byte too long.
#[test]
fn decode_from_rejects_wrong_size() {
let short = vec![0u8; ENTRY_SIZE - 1];
assert!(
SidxEntry::decode_from(&short, 42).is_err(),
"decode_from must error when buffer is too short"
);
let long = vec![0u8; ENTRY_SIZE + 1];
assert!(
SidxEntry::decode_from(&long, 42).is_err(),
"decode_from must error when buffer is too long"
);
}
// A footer written by an empty collector must read back as zero entries and zero strings.
#[test]
fn footer_round_trip_zero_entries() {
let mut buf: Vec<u8> = Vec::new();
buf.extend_from_slice(&[0u8; 32]);
let mut cursor = Cursor::new(&mut buf);
cursor.seek(SeekFrom::End(0)).expect("seek to end");
let collector = SidxEntryCollector::new();
collector
.write_footer(&mut cursor, 0)
.expect("write_footer must succeed");
let mut tmp = NamedTempFile::new().expect("create temp file");
tmp.write_all(&buf).expect("write");
tmp.flush().expect("flush");
let (entries, strings) = read_footer(tmp.path())
.expect("read_footer must not error")
.expect("footer must be found");
assert!(entries.is_empty(), "zero entries expected");
assert!(
strings.is_empty(),
"zero strings expected for empty collector"
);
}
}