use crate::event::EventKind;
use crate::store::StoreError;
use std::collections::HashMap;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
/// Magic bytes that terminate an SIDX footer; they are the last four bytes
/// written by the footer writer and the first thing the reader checks.
pub(crate) const SIDX_MAGIC: &[u8; 4] = b"SIDX";

/// Size in bytes of the fixed trailer at the very end of the file:
/// `string_table_offset` (u64 LE) + `entry_count` (u32 LE) + [`SIDX_MAGIC`].
const TRAILER_SIZE: u64 = 16;

/// Size in bytes of one encoded `SidxEntry` on disk.
pub(crate) const ENTRY_SIZE: usize = 154;

// Compile-time tripwire: the wire size is re-derived from the individual field
// widths (in encoding order) so that changing either the constant or the list
// below without the other fails the build.
const _ASSERT_ENTRY_SIZE: () = {
    use std::mem::size_of;
    let wire_len = size_of::<u128>() // event_id
        + size_of::<u32>() // entity_idx
        + size_of::<u32>() // scope_idx
        + size_of::<u16>() // kind
        + size_of::<u64>() // wall_ms
        + size_of::<u32>() // clock
        + size_of::<[u8; 32]>() // prev_hash
        + size_of::<[u8; 32]>() // event_hash
        + size_of::<u64>() // frame_offset
        + size_of::<u32>() // frame_length
        + size_of::<u64>() // global_sequence
        + size_of::<u128>() // correlation_id
        + size_of::<u128>(); // causation_id
    assert!(
        ENTRY_SIZE == wire_len,
        "ENTRY_SIZE must equal the summed SidxEntry field widths — update both when the layout changes"
    );
};

// Compile-time tripwire: the trailer is exactly offset + count + magic.
const _ASSERT_TRAILER_SIZE: () = {
    use std::mem::size_of;
    assert!(
        TRAILER_SIZE as usize == size_of::<u64>() + size_of::<u32>() + SIDX_MAGIC.len(),
        "TRAILER_SIZE must equal u64 offset + u32 count + magic length"
    );
};
/// Packs an [`EventKind`] into its 16-bit on-disk form.
///
/// The category is shifted into the top four bits and the type id is OR-ed
/// into the result. NOTE(review): this assumes `type_id()` fits in the low
/// 12 bits; nothing here masks it — confirm against `EventKind`.
#[inline]
pub(crate) fn kind_to_raw(kind: EventKind) -> u16 {
    let category_bits = u16::from(kind.category()) << 12;
    category_bits | kind.type_id()
}
/// Maps a raw 16-bit kind value back to an [`EventKind`].
///
/// The top four bits select the category:
/// * category `0x0` — the listed system values map to their constants; every
///   other value in `0x0000..=0x0FFF` becomes [`EventKind::DATA`];
/// * category `0xD` — the listed effect values map to their constants; every
///   other value in `0xD000..=0xDFFF` (including `0xD003`, which has no arm of
///   its own) becomes [`EventKind::EFFECT_ERROR`];
/// * anything else is rebuilt with [`EventKind::custom`] from the category
///   nibble and the low 12 bits.
pub(crate) fn raw_to_kind(raw: u16) -> EventKind {
    match raw {
        // Category 0x0: system kinds, with DATA as the catch-all.
        0x0001 => EventKind::SYSTEM_INIT,
        0x0002 => EventKind::SYSTEM_SHUTDOWN,
        0x0003 => EventKind::SYSTEM_HEARTBEAT,
        0x0004 => EventKind::SYSTEM_CONFIG_CHANGE,
        0x0005 => EventKind::SYSTEM_CHECKPOINT,
        0x0FFE => EventKind::TOMBSTONE,
        0x0000..=0x0FFF => EventKind::DATA,

        // Category 0xD: effect kinds, with EFFECT_ERROR as the catch-all.
        0xD001 => EventKind::EFFECT_ERROR,
        0xD002 => EventKind::EFFECT_RETRY,
        0xD004 => EventKind::EFFECT_ACK,
        0xD005 => EventKind::EFFECT_BACKPRESSURE,
        0xD006 => EventKind::EFFECT_CANCEL,
        0xD007 => EventKind::EFFECT_CONFLICT,
        0xD000..=0xDFFF => EventKind::EFFECT_ERROR,

        // Every other category round-trips through the custom constructor.
        _ => EventKind::custom((raw >> 12) as u8, raw & 0x0FFF),
    }
}
/// One fixed-width record of the SIDX footer.
///
/// Fields are listed in the same order in which `encode_into` writes them and
/// `decode_from` reads them; all integers are little-endian on disk and the
/// encoded record is `ENTRY_SIZE` bytes long.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct SidxEntry {
/// 128-bit event identifier.
pub event_id: u128,
/// Index into the footer string table for the entity name. Overwritten by
/// `SidxEntryCollector::record`; range-checked by `read_footer`.
pub entity_idx: u32,
/// Index into the footer string table for the scope name. Overwritten by
/// `SidxEntryCollector::record`; range-checked by `read_footer`.
pub scope_idx: u32,
/// Raw 16-bit event kind as produced by `kind_to_raw`; convert back with
/// `raw_to_kind`.
pub kind: u16,
/// NOTE(review): presumably a wall-clock timestamp in milliseconds — the
/// unit and epoch are not established in this file.
pub wall_ms: u64,
/// NOTE(review): presumably a logical clock value — confirm with the producer.
pub clock: u32,
/// 32-byte hash; by name, the hash of the preceding event (not verified here).
pub prev_hash: [u8; 32],
/// 32-byte hash; by name, the hash of this event (not verified here).
pub event_hash: [u8; 32],
/// NOTE(review): presumably the byte offset of the event frame within the
/// segment file — not interpreted in this file.
pub frame_offset: u64,
/// NOTE(review): presumably the byte length of the event frame — not
/// interpreted in this file.
pub frame_length: u32,
/// NOTE(review): presumably a store-wide sequence number — confirm with the producer.
pub global_sequence: u64,
/// 128-bit correlation identifier (semantics defined by the caller).
pub correlation_id: u128,
/// 128-bit causation identifier (semantics defined by the caller).
pub causation_id: u128,
}
impl SidxEntry {
    /// Serializes this entry into `buf` using the fixed SIDX wire layout.
    ///
    /// Fields are written back to back in declaration order; integers are
    /// little-endian and the two hashes are copied verbatim.
    ///
    /// # Panics
    ///
    /// Debug builds panic when `buf.len() != ENTRY_SIZE`. In any build the
    /// slice indexing panics if `buf` is too short to hold the next field.
    pub(crate) fn encode_into(&self, buf: &mut [u8]) {
        debug_assert_eq!(
            buf.len(),
            ENTRY_SIZE,
            "encode_into: buf must be ENTRY_SIZE bytes"
        );

        // `cursor` is the offset of the next unwritten byte; `emit` copies one
        // field's bytes there and advances it.
        let mut cursor = 0usize;
        let mut emit = |bytes: &[u8]| {
            let end = cursor + bytes.len();
            buf[cursor..end].copy_from_slice(bytes);
            cursor = end;
        };

        emit(&self.event_id.to_le_bytes());
        emit(&self.entity_idx.to_le_bytes());
        emit(&self.scope_idx.to_le_bytes());
        emit(&self.kind.to_le_bytes());
        emit(&self.wall_ms.to_le_bytes());
        emit(&self.clock.to_le_bytes());
        emit(&self.prev_hash);
        emit(&self.event_hash);
        emit(&self.frame_offset.to_le_bytes());
        emit(&self.frame_length.to_le_bytes());
        emit(&self.global_sequence.to_le_bytes());
        emit(&self.correlation_id.to_le_bytes());
        emit(&self.causation_id.to_le_bytes());

        debug_assert_eq!(cursor, ENTRY_SIZE, "encode_into: wrote wrong byte count");
    }

    /// Parses one entry from `buf`, the exact inverse of [`Self::encode_into`].
    ///
    /// `segment_id` is only used to label the error.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::CorruptSegment`] when `buf` is not exactly
    /// `ENTRY_SIZE` bytes long.
    pub(crate) fn decode_from(buf: &[u8], segment_id: u64) -> Result<Self, StoreError> {
        // Copies the next `N` bytes out of `src` and moves `at` past them.
        fn take<const N: usize>(src: &[u8], at: &mut usize) -> [u8; N] {
            let end = *at + N;
            let bytes: [u8; N] = src[*at..end]
                .try_into()
                .expect("slice length matches const");
            *at = end;
            bytes
        }

        if buf.len() != ENTRY_SIZE {
            return Err(StoreError::CorruptSegment {
                segment_id,
                detail: format!(
                    "SIDX entry buffer is {} bytes, expected {ENTRY_SIZE}",
                    buf.len()
                ),
            });
        }

        // Struct-literal fields are evaluated in the order written, so this
        // list must stay in wire order.
        let mut at = 0usize;
        let entry = Self {
            event_id: u128::from_le_bytes(take(buf, &mut at)),
            entity_idx: u32::from_le_bytes(take(buf, &mut at)),
            scope_idx: u32::from_le_bytes(take(buf, &mut at)),
            kind: u16::from_le_bytes(take(buf, &mut at)),
            wall_ms: u64::from_le_bytes(take(buf, &mut at)),
            clock: u32::from_le_bytes(take(buf, &mut at)),
            prev_hash: take(buf, &mut at),
            event_hash: take(buf, &mut at),
            frame_offset: u64::from_le_bytes(take(buf, &mut at)),
            frame_length: u32::from_le_bytes(take(buf, &mut at)),
            global_sequence: u64::from_le_bytes(take(buf, &mut at)),
            correlation_id: u128::from_le_bytes(take(buf, &mut at)),
            causation_id: u128::from_le_bytes(take(buf, &mut at)),
        };
        debug_assert_eq!(at, ENTRY_SIZE, "decode_from: consumed wrong byte count");

        Ok(entry)
    }

    /// Test-only convenience: the stored raw kind converted via `raw_to_kind`.
    #[cfg(test)]
    pub(crate) fn event_kind(&self) -> EventKind {
        let raw = self.kind;
        raw_to_kind(raw)
    }
}
/// Accumulates index entries and their interned entity/scope strings so that
/// `write_footer` can emit them as one SIDX footer.
pub(crate) struct SidxEntryCollector {
/// Entries in the order they were passed to `record`.
entries: Vec<SidxEntry>,
/// String table; an entry's `entity_idx` / `scope_idx` index into this vector.
strings: Vec<String>,
/// Reverse lookup from string to its position in `strings`, used by `intern`
/// to deduplicate.
string_map: HashMap<String, u32>,
}
impl SidxEntryCollector {
pub(crate) fn new() -> Self {
Self {
entries: Vec::new(),
strings: Vec::new(),
string_map: HashMap::new(),
}
}
pub(crate) fn record(&mut self, mut entry: SidxEntry, entity: &str, scope: &str) {
entry.entity_idx = self.intern(entity);
entry.scope_idx = self.intern(scope);
self.entries.push(entry);
}
#[cfg(test)]
pub(crate) fn entries(&self) -> &[SidxEntry] {
&self.entries
}
#[cfg(test)]
pub(crate) fn strings(&self) -> &[String] {
&self.strings
}
pub(crate) fn write_footer<W: Write + Seek>(&self, writer: &mut W) -> Result<(), StoreError> {
let string_table_bytes = rmp_serde::to_vec_named(&self.strings)
.map_err(|e| StoreError::Serialization(Box::new(e)))?;
let string_table_offset = writer.stream_position().map_err(StoreError::Io)?;
writer
.write_all(&string_table_bytes)
.map_err(StoreError::Io)?;
let mut buf = [0u8; ENTRY_SIZE];
for entry in &self.entries {
entry.encode_into(&mut buf);
writer.write_all(&buf).map_err(StoreError::Io)?;
}
writer
.write_all(&string_table_offset.to_le_bytes())
.map_err(StoreError::Io)?;
let entry_count = u32::try_from(self.entries.len()).unwrap_or(u32::MAX);
writer
.write_all(&entry_count.to_le_bytes())
.map_err(StoreError::Io)?;
writer.write_all(SIDX_MAGIC).map_err(StoreError::Io)?;
Ok(())
}
fn intern(&mut self, s: &str) -> u32 {
if let Some(&idx) = self.string_map.get(s) {
return idx;
}
#[allow(clippy::cast_possible_truncation)]
let idx = self.strings.len() as u32;
self.strings.push(s.to_owned());
self.string_map.insert(s.to_owned(), idx);
idx
}
}
/// Decoded SIDX footer: the index entries followed by the string table that
/// their `entity_idx` / `scope_idx` fields point into.
pub(crate) type SidxFooterData = (Vec<SidxEntry>, Vec<String>);

/// Reads and validates the SIDX footer at the end of the file at `path`.
///
/// The footer is located from the 16-byte trailer at the end of the file
/// (string-table offset, entry count, magic). The string table is decoded with
/// `rmp_serde::from_slice`, then `entry_count` fixed-width records are decoded
/// and their string indices range-checked.
///
/// # Returns
///
/// * `Ok(None)` when the file is shorter than the trailer or does not end with
///   [`SIDX_MAGIC`], i.e. there is no footer.
/// * `Ok(Some((entries, strings)))` on success.
///
/// # Errors
///
/// * [`StoreError::Io`] when the file cannot be opened, seeked or read.
/// * [`StoreError::CorruptSegment`] when the trailer describes a layout that
///   does not fit the file, an entry is truncated, or an entry references a
///   string index outside the table.
/// * [`StoreError::Serialization`] when the string table cannot be decoded.
pub(crate) fn read_footer(path: &Path) -> Result<Option<SidxFooterData>, StoreError> {
    // The numeric file stem is used as the segment id in error reports; it
    // falls back to 0 when the stem is missing or not a number.
    let segment_id = path
        .file_stem()
        .and_then(|s| s.to_str())
        .and_then(|s| s.parse::<u64>().ok())
        .unwrap_or(0);

    let mut file = std::fs::File::open(path).map_err(StoreError::Io)?;
    let file_len = file.seek(SeekFrom::End(0)).map_err(StoreError::Io)?;
    if file_len < TRAILER_SIZE {
        return Ok(None);
    }

    file.seek(SeekFrom::End(-(TRAILER_SIZE as i64)))
        .map_err(StoreError::Io)?;
    let mut trailer = [0u8; 16];
    file.read_exact(&mut trailer).map_err(StoreError::Io)?;
    if &trailer[12..16] != SIDX_MAGIC {
        return Ok(None);
    }

    let string_table_offset =
        u64::from_le_bytes(trailer[0..8].try_into().expect("slice is 8 bytes"));
    let entry_count =
        u32::from_le_bytes(trailer[8..12].try_into().expect("slice is 4 bytes")) as usize;

    // Work out where the entry block must start by walking back from the end
    // of the file; every step is checked because the trailer is untrusted.
    let entries_block_len = (entry_count as u64)
        .checked_mul(ENTRY_SIZE as u64)
        .ok_or_else(|| StoreError::CorruptSegment {
            segment_id,
            detail: "SIDX entry_count × ENTRY_SIZE overflows u64".into(),
        })?;
    let entries_start = file_len
        .checked_sub(TRAILER_SIZE)
        .and_then(|n| n.checked_sub(entries_block_len))
        .ok_or_else(|| StoreError::CorruptSegment {
            segment_id,
            detail: "SIDX entry block extends before the beginning of the file".into(),
        })?;

    // The string table occupies everything between its offset and the entry
    // block, so an offset past `entries_start` is a corrupt trailer.
    let string_table_len = entries_start
        .checked_sub(string_table_offset)
        .ok_or_else(|| StoreError::CorruptSegment {
            segment_id,
            detail: format!(
                "SIDX string_table_offset {string_table_offset} is past entries_start {entries_start}"
            ),
        })?;

    file.seek(SeekFrom::Start(string_table_offset))
        .map_err(StoreError::Io)?;
    let table_len_usize =
        usize::try_from(string_table_len).map_err(|_| StoreError::CorruptSegment {
            segment_id,
            detail: format!("SIDX string table length {string_table_len} exceeds usize::MAX"),
        })?;
    let mut string_table_buf = vec![0u8; table_len_usize];
    file.read_exact(&mut string_table_buf)
        .map_err(StoreError::Io)?;
    let strings: Vec<String> = rmp_serde::from_slice(&string_table_buf)
        .map_err(|e| StoreError::Serialization(Box::new(e)))?;

    // The file cursor now sits exactly at `entries_start`. Buffer the many
    // small fixed-width reads instead of issuing one read call per entry.
    let mut reader = std::io::BufReader::new(file);
    let mut entries = Vec::with_capacity(entry_count);
    let mut entry_buf = [0u8; ENTRY_SIZE];
    for i in 0..entry_count {
        reader.read_exact(&mut entry_buf).map_err(|e| {
            if e.kind() == std::io::ErrorKind::UnexpectedEof {
                StoreError::CorruptSegment {
                    segment_id,
                    detail: format!("SIDX: entry {i} truncated at EOF"),
                }
            } else {
                StoreError::Io(e)
            }
        })?;
        let entry = SidxEntry::decode_from(&entry_buf, segment_id)?;

        // Both string indices must resolve inside the table just decoded.
        for (field, idx) in [
            ("entity_idx", entry.entity_idx),
            ("scope_idx", entry.scope_idx),
        ] {
            if idx as usize >= strings.len() {
                return Err(StoreError::CorruptSegment {
                    segment_id,
                    detail: format!(
                        "SIDX entry {i}: {field} {idx} out of range (table has {} strings)",
                        strings.len()
                    ),
                });
            }
        }
        entries.push(entry);
    }

    Ok(Some((entries, strings)))
}
// Unit tests for the SIDX entry codec, the kind <-> raw mapping, string
// interning, and footer write/read round-trips through a temporary file.
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use tempfile::NamedTempFile;
/// Builds a deterministic entry whose fields are all derived from `n`;
/// `entity_idx` / `scope_idx` are left at 0 for the collector to fill in.
fn sample_entry(n: u8) -> SidxEntry {
SidxEntry {
event_id: u128::from(n),
entity_idx: 0,
scope_idx: 0,
kind: kind_to_raw(EventKind::custom(0x1, u16::from(n))),
wall_ms: 1_000_000 + u64::from(n),
clock: u32::from(n),
prev_hash: [n; 32],
event_hash: [n.wrapping_add(1); 32],
frame_offset: u64::from(n) * 512,
frame_length: 128,
global_sequence: u64::from(n),
correlation_id: u128::from(n),
causation_id: 0,
}
}
/// Encoding an entry and decoding the bytes must reproduce every field.
#[test]
fn encode_decode_round_trip() {
let original = SidxEntry {
event_id: 0xDEAD_BEEF_CAFE_1234_5678_9ABC_DEF0_1234_u128,
entity_idx: 7,
scope_idx: 3,
kind: 0xF042,
wall_ms: 1_700_000_000_000,
clock: 99,
prev_hash: [0xAB; 32],
event_hash: [0xCD; 32],
frame_offset: 0x0000_1234_5678_9ABC,
frame_length: 4096,
global_sequence: 0xFFFF_FFFF_0000_0001,
correlation_id: 0x1111_1111_2222_2222_3333_3333_4444_4444_u128,
causation_id: 0,
};
let mut buf = [0u8; ENTRY_SIZE];
original.encode_into(&mut buf);
let decoded = SidxEntry::decode_from(&buf, 1).expect("decode must succeed");
assert_eq!(original, decoded, "round-trip must be lossless");
}
/// A custom kind (category 0x5) keeps its category and type id through
/// `kind_to_raw` followed by `raw_to_kind`.
#[test]
fn kind_round_trip_product_kind() {
let kind = EventKind::custom(0x5, 0x042);
let raw = kind_to_raw(kind);
let recovered = raw_to_kind(raw);
assert_eq!(recovered.category(), kind.category());
assert_eq!(recovered.type_id(), kind.type_id());
}
/// Each listed system constant maps back to the same raw value.
#[test]
fn kind_round_trip_system_constants() {
for &kind in &[
EventKind::SYSTEM_INIT,
EventKind::SYSTEM_SHUTDOWN,
EventKind::SYSTEM_HEARTBEAT,
EventKind::SYSTEM_CONFIG_CHANGE,
EventKind::SYSTEM_CHECKPOINT,
EventKind::TOMBSTONE,
EventKind::DATA,
] {
let recovered = raw_to_kind(kind_to_raw(kind));
assert_eq!(
kind_to_raw(recovered),
kind_to_raw(kind),
"system kind round-trip failed for raw value {:#06x}",
kind_to_raw(kind)
);
}
}
/// Each listed effect constant maps back to the same raw value.
#[test]
fn kind_round_trip_effect_constants() {
for &kind in &[
EventKind::EFFECT_ERROR,
EventKind::EFFECT_RETRY,
EventKind::EFFECT_ACK,
EventKind::EFFECT_BACKPRESSURE,
EventKind::EFFECT_CANCEL,
EventKind::EFFECT_CONFLICT,
] {
let recovered = raw_to_kind(kind_to_raw(kind));
assert_eq!(
kind_to_raw(recovered),
kind_to_raw(kind),
"effect kind round-trip failed for raw value {:#06x}",
kind_to_raw(kind)
);
}
}
/// `SidxEntry::event_kind` must agree with calling `raw_to_kind` directly.
#[test]
fn event_kind_helper_matches_raw_to_kind() {
let entry = SidxEntry {
kind: kind_to_raw(EventKind::custom(0x3, 0x7)),
..sample_entry(0)
};
let via_helper = entry.event_kind();
let via_fn = raw_to_kind(entry.kind);
assert_eq!(kind_to_raw(via_helper), kind_to_raw(via_fn));
}
/// Interning the same string twice yields one table slot and one index.
#[test]
fn intern_deduplicates_strings() {
let mut collector = SidxEntryCollector::new();
let i0 = collector.intern("entity:1");
let i1 = collector.intern("scope:default");
let i2 = collector.intern("entity:1");
assert_eq!(i0, i2, "same string must return the same index");
assert_ne!(i0, i1, "different strings must get different indices");
assert_eq!(
collector.strings().len(),
2,
"only 2 unique strings expected"
);
}
/// Writes a footer with two entries after a 64-byte prefix, copies the bytes
/// to a temp file, and checks that `read_footer` recovers entries and strings.
#[test]
fn footer_round_trip() {
let mut buf: Vec<u8> = Vec::new();
// 4 + 60 bytes of stand-in content preceding the footer.
buf.extend_from_slice(b"FBAT"); buf.extend_from_slice(&[0u8; 60]);
let mut cursor = Cursor::new(&mut buf);
cursor.seek(SeekFrom::End(0)).expect("seek to end");
let mut collector = SidxEntryCollector::new();
collector.record(sample_entry(1), "user:1", "profile");
collector.record(sample_entry(2), "user:2", "profile");
collector
.write_footer(&mut cursor)
.expect("write_footer must succeed");
let mut tmp = NamedTempFile::new().expect("create temp file");
tmp.write_all(&buf).expect("write buf to temp file");
tmp.flush().expect("flush temp file");
let (entries, strings) = read_footer(tmp.path())
.expect("read_footer must not error")
.expect("SIDX footer must be found");
assert_eq!(entries.len(), 2, "expected 2 entries");
assert!(strings.contains(&"user:1".to_owned()));
assert!(strings.contains(&"user:2".to_owned()));
assert!(strings.contains(&"profile".to_owned()));
let e0_entity = &strings[entries[0].entity_idx as usize];
let e1_entity = &strings[entries[1].entity_idx as usize];
assert_eq!(e0_entity, "user:1");
assert_eq!(e1_entity, "user:2");
assert_eq!(
entries[0].scope_idx, entries[1].scope_idx,
"shared scope must use the same string table index"
);
}
/// A file that is long enough but does not end in the magic has no footer.
#[test]
fn read_footer_returns_none_without_magic() {
let mut tmp = NamedTempFile::new().expect("create temp file");
tmp.write_all(b"FBAT\x00\x00\x00\x00some bytes that are not a sidx footer at all")
.expect("write");
tmp.flush().expect("flush");
let result = read_footer(tmp.path()).expect("must not IO-error");
assert!(result.is_none(), "non-SIDX file must return None");
}
/// A 2-byte file is shorter than the trailer, so no footer is reported.
#[test]
fn read_footer_returns_none_for_tiny_file() {
let mut tmp = NamedTempFile::new().expect("create temp file");
tmp.write_all(b"AB").expect("write");
tmp.flush().expect("flush");
let result = read_footer(tmp.path()).expect("must not IO-error");
assert!(result.is_none(), "tiny file must return None");
}
/// An empty file is treated the same way: no footer, no error.
#[test]
fn read_footer_returns_none_for_empty_file() {
let tmp = NamedTempFile::new().expect("create temp file");
let result = read_footer(tmp.path()).expect("must not IO-error");
assert!(result.is_none(), "empty file must return None");
}
/// Three entries sharing one entity and one scope produce a two-string table
/// and identical index pairs.
#[test]
fn shared_string_table_is_compact() {
let mut collector = SidxEntryCollector::new();
for n in 0u8..3 {
collector.record(sample_entry(n), "order:999", "payments");
}
assert_eq!(
collector.strings().len(),
2,
"only 'order:999' and 'payments' should appear in the table"
);
let unique_pairs: std::collections::HashSet<(u32, u32)> = collector
.entries()
.iter()
.map(|e| (e.entity_idx, e.scope_idx))
.collect();
assert_eq!(
unique_pairs.len(),
1,
"all entries sharing entity+scope must have identical index pairs"
);
}
/// `decode_from` rejects buffers one byte shorter or longer than `ENTRY_SIZE`.
#[test]
fn decode_from_rejects_wrong_size() {
let short = vec![0u8; ENTRY_SIZE - 1];
assert!(
SidxEntry::decode_from(&short, 42).is_err(),
"decode_from must error when buffer is too short"
);
let long = vec![0u8; ENTRY_SIZE + 1];
assert!(
SidxEntry::decode_from(&long, 42).is_err(),
"decode_from must error when buffer is too long"
);
}
/// A footer written by an empty collector reads back as zero entries and an
/// empty string table.
#[test]
fn footer_round_trip_zero_entries() {
let mut buf: Vec<u8> = Vec::new();
// 32 zero bytes stand in for content preceding the footer.
buf.extend_from_slice(&[0u8; 32]);
let mut cursor = Cursor::new(&mut buf);
cursor.seek(SeekFrom::End(0)).expect("seek to end");
let collector = SidxEntryCollector::new();
collector
.write_footer(&mut cursor)
.expect("write_footer must succeed");
let mut tmp = NamedTempFile::new().expect("create temp file");
tmp.write_all(&buf).expect("write");
tmp.flush().expect("flush");
let (entries, strings) = read_footer(tmp.path())
.expect("read_footer must not error")
.expect("footer must be found");
assert!(entries.is_empty(), "zero entries expected");
assert!(
strings.is_empty(),
"zero strings expected for empty collector"
);
}
}