mod footer;
#[cfg(test)]
use crate::event::EventKind;
use crate::event::HashChain;
#[cfg(test)]
pub(crate) use crate::store::cold_start::raw_to_kind;
pub(crate) use crate::store::cold_start::{
kind_to_raw, raw_to_kind_counted, ReservedKindFallbackStats,
};
use crate::store::cold_start::{ColdStartIndexRow, ColdStartSource};
use crate::store::index::interner::InternId;
use crate::store::StoreError;
use std::collections::HashMap;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
/// Magic bytes written as the final 4 bytes of every SIDX footer.
pub(crate) const SIDX_MAGIC: &[u8; 4] = b"SDX2";
/// Encoded size, in bytes, of one [`SidxEntry`] record in the footer.
pub(crate) const ENTRY_SIZE: usize = 162;
// Compile-time guard: ENTRY_SIZE must equal the sum of the field widths that
// `SidxEntry::encode_into` / `SidxEntry::decode_from` write and read, listed
// here in the same order. Comparing against the widths (instead of repeating
// the literal 162, which could never fail) makes the build break whenever
// ENTRY_SIZE and the on-disk layout drift apart.
const _ASSERT_ENTRY_SIZE: () = {
    let encoded_width = std::mem::size_of::<u128>() // event_id
        + std::mem::size_of::<u32>() // entity_idx
        + std::mem::size_of::<u32>() // scope_idx
        + std::mem::size_of::<u16>() // kind
        + std::mem::size_of::<u64>() // wall_ms
        + std::mem::size_of::<u32>() // clock
        + std::mem::size_of::<u32>() // dag_lane
        + std::mem::size_of::<u32>() // dag_depth
        + std::mem::size_of::<[u8; 32]>() // prev_hash
        + std::mem::size_of::<[u8; 32]>() // event_hash
        + std::mem::size_of::<u64>() // frame_offset
        + std::mem::size_of::<u32>() // frame_length
        + std::mem::size_of::<u64>() // global_sequence
        + std::mem::size_of::<u128>() // correlation_id
        + std::mem::size_of::<u128>(); // causation_id
    assert!(
        ENTRY_SIZE == encoded_width,
        "ENTRY_SIZE must equal the sum of SidxEntry's encoded field widths — update when SidxEntry layout changes"
    );
};
/// One fixed-size record in a segment's SIDX footer.
///
/// `SidxEntry::encode_into` writes the fields in declaration order as exactly
/// `ENTRY_SIZE` bytes, with integers in little-endian byte order.
/// `SidxEntry::decode_from` reads them back in the same order.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct SidxEntry {
    /// Event id (16 bytes on disk).
    pub event_id: u128,
    /// Index of the entity string in the footer's string table (4 bytes).
    /// `SidxEntryCollector::record` assigns it and `read_footer` range-checks it.
    pub entity_idx: u32,
    /// Index of the scope string in the footer's string table (4 bytes).
    /// `SidxEntryCollector::record` assigns it and `read_footer` range-checks it.
    pub scope_idx: u32,
    /// Raw event kind, converted to an `EventKind` via `raw_to_kind*` (2 bytes).
    pub kind: u16,
    /// Wall-clock timestamp (8 bytes). The unit is presumably milliseconds, per
    /// the name; that is not established in this file.
    pub wall_ms: u64,
    /// Clock value, copied as-is into cold-start rows (4 bytes).
    pub clock: u32,
    /// DAG lane, copied as-is into cold-start rows (4 bytes).
    pub dag_lane: u32,
    /// DAG depth, copied as-is into cold-start rows (4 bytes).
    pub dag_depth: u32,
    /// `prev_hash` of the event's `HashChain` (32 bytes).
    pub prev_hash: [u8; 32],
    /// `event_hash` of the event's `HashChain` (32 bytes).
    pub event_hash: [u8; 32],
    /// Frame offset passed to `DiskPos::new` together with the segment id (8 bytes).
    pub frame_offset: u64,
    /// Frame length passed to `DiskPos::new` together with the segment id (4 bytes).
    pub frame_length: u32,
    /// Global sequence number, copied as-is into cold-start rows (8 bytes).
    pub global_sequence: u64,
    /// Correlation id (16 bytes).
    pub correlation_id: u128,
    /// Causation id (16 bytes). `0` is the "no causation" sentinel and becomes
    /// `None` in cold-start rows.
    pub causation_id: u128,
}
impl SidxEntry {
    /// Returns the on-disk position of this entry's frame inside segment `segment_id`.
    pub(crate) fn to_disk_pos(&self, segment_id: u64) -> crate::store::index::DiskPos {
        let (offset, length) = (self.frame_offset, self.frame_length);
        crate::store::index::DiskPos::new(segment_id, offset, length)
    }

    /// Converts this entry into a cold-start index row.
    ///
    /// Reserved-kind fallback statistics are collected into a scratch value
    /// and then discarded.
    pub(crate) fn to_cold_start_row(&self, segment_id: u64) -> ColdStartIndexRow {
        let mut discarded = ReservedKindFallbackStats::default();
        self.to_cold_start_row_counted(segment_id, &mut discarded)
    }

    /// Converts this entry into a cold-start index row.
    ///
    /// The raw kind is mapped via `raw_to_kind_counted`, which records into
    /// `counts`. A stored `causation_id` of `0` becomes `None`.
    pub(crate) fn to_cold_start_row_counted(
        &self,
        segment_id: u64,
        counts: &mut ReservedKindFallbackStats,
    ) -> ColdStartIndexRow {
        let causation_id = if self.causation_id == 0 {
            None
        } else {
            Some(self.causation_id)
        };
        let hash_chain = HashChain {
            prev_hash: self.prev_hash,
            event_hash: self.event_hash,
        };
        // Evaluated before the disk position, matching field order in the row.
        let kind = raw_to_kind_counted(self.kind, counts);

        ColdStartIndexRow {
            source: ColdStartSource::Sidx,
            event_id: self.event_id,
            correlation_id: self.correlation_id,
            causation_id,
            entity_id: InternId(self.entity_idx),
            scope_id: InternId(self.scope_idx),
            kind,
            wall_ms: self.wall_ms,
            clock: self.clock,
            dag_lane: self.dag_lane,
            dag_depth: self.dag_depth,
            hash_chain,
            disk_pos: self.to_disk_pos(segment_id),
            global_sequence: self.global_sequence,
        }
    }

    /// Serializes this entry into `buf` as `ENTRY_SIZE` bytes.
    ///
    /// Integers are written little-endian and hashes raw, in declaration order.
    ///
    /// # Panics
    ///
    /// Panics in debug builds if `buf.len() != ENTRY_SIZE`. Panics in all
    /// builds if `buf` is shorter than `ENTRY_SIZE`.
    pub(crate) fn encode_into(&self, buf: &mut [u8]) {
        debug_assert_eq!(
            buf.len(),
            ENTRY_SIZE,
            "encode_into: buf must be ENTRY_SIZE bytes"
        );
        // On-disk field sequence. The `to_le_bytes()` temporaries are kept
        // alive for the whole block by temporary lifetime extension.
        let fields: [&[u8]; 15] = [
            &self.event_id.to_le_bytes(),
            &self.entity_idx.to_le_bytes(),
            &self.scope_idx.to_le_bytes(),
            &self.kind.to_le_bytes(),
            &self.wall_ms.to_le_bytes(),
            &self.clock.to_le_bytes(),
            &self.dag_lane.to_le_bytes(),
            &self.dag_depth.to_le_bytes(),
            &self.prev_hash,
            &self.event_hash,
            &self.frame_offset.to_le_bytes(),
            &self.frame_length.to_le_bytes(),
            &self.global_sequence.to_le_bytes(),
            &self.correlation_id.to_le_bytes(),
            &self.causation_id.to_le_bytes(),
        ];
        let mut cursor = 0usize;
        for field in fields {
            let end = cursor + field.len();
            buf[cursor..end].copy_from_slice(field);
            cursor = end;
        }
        debug_assert_eq!(cursor, ENTRY_SIZE, "encode_into: wrote wrong byte count");
    }

    /// Parses one entry from exactly `ENTRY_SIZE` bytes produced by [`Self::encode_into`].
    ///
    /// # Errors
    ///
    /// Returns `StoreError::CorruptSegment` (tagged with `segment_id`) when
    /// `buf` is not exactly `ENTRY_SIZE` bytes long.
    pub(crate) fn decode_from(buf: &[u8], segment_id: u64) -> Result<Self, StoreError> {
        /// Copies the `N` bytes of `src` starting at `*cursor` and moves `*cursor` past them.
        fn take<const N: usize>(src: &[u8], cursor: &mut usize) -> [u8; N] {
            let start = *cursor;
            let mut out = [0u8; N];
            out.copy_from_slice(&src[start..start + N]);
            *cursor = start + N;
            out
        }

        if buf.len() != ENTRY_SIZE {
            let detail = format!(
                "SIDX entry buffer is {} bytes, expected {ENTRY_SIZE}",
                buf.len()
            );
            return Err(StoreError::CorruptSegment { segment_id, detail });
        }

        let mut cursor = 0usize;
        let event_id = u128::from_le_bytes(take(buf, &mut cursor));
        let entity_idx = u32::from_le_bytes(take(buf, &mut cursor));
        let scope_idx = u32::from_le_bytes(take(buf, &mut cursor));
        let kind = u16::from_le_bytes(take(buf, &mut cursor));
        let wall_ms = u64::from_le_bytes(take(buf, &mut cursor));
        let clock = u32::from_le_bytes(take(buf, &mut cursor));
        let dag_lane = u32::from_le_bytes(take(buf, &mut cursor));
        let dag_depth = u32::from_le_bytes(take(buf, &mut cursor));
        let prev_hash: [u8; 32] = take(buf, &mut cursor);
        let event_hash: [u8; 32] = take(buf, &mut cursor);
        let frame_offset = u64::from_le_bytes(take(buf, &mut cursor));
        let frame_length = u32::from_le_bytes(take(buf, &mut cursor));
        let global_sequence = u64::from_le_bytes(take(buf, &mut cursor));
        let correlation_id = u128::from_le_bytes(take(buf, &mut cursor));
        let causation_id = u128::from_le_bytes(take(buf, &mut cursor));
        debug_assert_eq!(cursor, ENTRY_SIZE, "decode_from: consumed wrong byte count");

        Ok(Self {
            event_id,
            entity_idx,
            scope_idx,
            kind,
            wall_ms,
            clock,
            dag_lane,
            dag_depth,
            prev_hash,
            event_hash,
            frame_offset,
            frame_length,
            global_sequence,
            correlation_id,
            causation_id,
        })
    }

    /// Test helper: maps the raw `kind` field to an `EventKind`.
    #[cfg(test)]
    pub(crate) fn event_kind(&self) -> EventKind {
        let raw = self.kind;
        raw_to_kind(raw)
    }
}
/// Accumulates [`SidxEntry`] records for one segment and serializes them as
/// the SIDX footer.
///
/// Each entry's entity and scope strings are interned into a shared string
/// table as the entry is recorded.
pub(crate) struct SidxEntryCollector {
    /// Recorded entries, in recording order.
    entries: Vec<SidxEntry>,
    /// Interned strings in first-seen order. Entries refer to them by index
    /// through `entity_idx` and `scope_idx`.
    strings: Vec<String>,
    /// Maps each interned string to its index in `strings`.
    string_map: HashMap<String, u32>,
}
impl SidxEntryCollector {
    /// Creates an empty collector.
    pub(crate) fn new() -> Self {
        Self {
            entries: Vec::new(),
            strings: Vec::new(),
            string_map: HashMap::new(),
        }
    }
    /// Records `entry` and stores it in recording order.
    ///
    /// Before storing, the entry's `entity_idx` and `scope_idx` are
    /// overwritten with the interned indices of `entity` and `scope`.
    pub(crate) fn record(&mut self, mut entry: SidxEntry, entity: &str, scope: &str) {
        entry.entity_idx = self.intern(entity);
        entry.scope_idx = self.intern(scope);
        self.entries.push(entry);
    }
    #[cfg(test)]
    pub(crate) fn entries(&self) -> &[SidxEntry] {
        &self.entries
    }
    #[cfg(test)]
    pub(crate) fn strings(&self) -> &[String] {
        &self.strings
    }
    /// Appends the SIDX footer at the writer's current stream position.
    ///
    /// The footer is assembled in memory and written with a single `write_all`.
    /// It consists of, in order:
    /// 1. the string table, encoded with `crate::encoding::to_bytes`;
    /// 2. every entry as `ENTRY_SIZE` bytes (see `SidxEntry::encode_into`);
    /// 3. the stream position where the string table starts (`u64`, little-endian);
    /// 4. the entry count (`u32`, little-endian);
    /// 5. `SIDX_MAGIC`.
    ///
    /// # Errors
    ///
    /// - `StoreError::Serialization` if the string table cannot be encoded.
    /// - `StoreError::Io` if querying the stream position or writing fails.
    /// - `StoreError::SegmentTooManyEntries` if the entry count does not fit in `u32`.
    pub(crate) fn write_footer<W: Write + Seek>(
        &self,
        writer: &mut W,
        segment_id: u64,
    ) -> Result<(), StoreError> {
        let string_table_bytes = crate::encoding::to_bytes(&self.strings)
            .map_err(|e| StoreError::Serialization(Box::new(e)))?;
        // The footer starts here, so this is where the string table will begin.
        let string_table_offset = writer.stream_position().map_err(StoreError::Io)?;
        let entry_count =
            u32::try_from(self.entries.len()).map_err(|_| StoreError::SegmentTooManyEntries {
                segment_id,
                // usize -> u64 is lossless on all targets Rust supports (usize <= 64 bits).
                count: self.entries.len() as u64,
            })?;
        let mut footer = Vec::with_capacity(
            string_table_bytes.len()
                + self.entries.len() * ENTRY_SIZE
                + footer::trailer_size_usize(),
        );
        footer.extend_from_slice(&string_table_bytes);
        let mut buf = [0u8; ENTRY_SIZE];
        for entry in &self.entries {
            entry.encode_into(&mut buf);
            footer.extend_from_slice(&buf);
        }
        footer.extend_from_slice(&string_table_offset.to_le_bytes());
        footer.extend_from_slice(&entry_count.to_le_bytes());
        footer.extend_from_slice(SIDX_MAGIC);
        writer.write_all(&footer).map_err(StoreError::Io)?;
        Ok(())
    }
    /// Returns the index of `s` in the string table, appending `s` the first time it is seen.
    ///
    /// # Panics
    ///
    /// Panics if the table would need more than `u32::MAX` entries.
    // Allowed: exceeding u32 string indices is treated as an invariant
    // violation (see the expect message), not a recoverable error.
    #[allow(clippy::expect_used)]
    fn intern(&mut self, s: &str) -> u32 {
        if let Some(&idx) = self.string_map.get(s) {
            return idx;
        }
        let idx = u32::try_from(self.strings.len())
            .expect("invariant: SIDX string table is bounded by segment size, well under u32::MAX");
        self.strings.push(s.to_owned());
        self.string_map.insert(s.to_owned(), idx);
        idx
    }
}
/// A decoded SIDX footer: the index entries plus the interned string table
/// that their `entity_idx` / `scope_idx` fields index into.
pub(crate) type SidxFooterData = (Vec<SidxEntry>, Vec<String>);
/// Reads and validates the SIDX footer of the segment file at `path`.
///
/// The segment id parsed from the file name is only used to label errors. If
/// the file name cannot be parsed, a warning is logged and `0` is used instead;
/// the footer is still read.
///
/// Returns `Ok(None)` when `footer::read_layout` reports no footer. Otherwise
/// returns `Ok(Some((entries, strings)))`, with every entry's `entity_idx` and
/// `scope_idx` checked to be valid indices into `strings`.
///
/// # Errors
///
/// - `StoreError::Io` if opening, seeking or reading the file fails. A
///   truncated entry is reported as `CorruptSegment` instead.
/// - `StoreError::Serialization` if the string table cannot be decoded.
/// - `StoreError::CorruptSegment` in any of these cases:
///   - the string table length does not fit in `usize`;
///   - an entry is truncated;
///   - an entry's string index is out of range.
/// - Any error returned by `footer::read_layout` or `SidxEntry::decode_from`.
pub(crate) fn read_footer(path: &Path) -> Result<Option<SidxFooterData>, StoreError> {
    let segment_id = match crate::store::segment::SegmentId::from_filename(path) {
        Ok(parsed) => parsed.as_u64(),
        Err(error) => {
            // The footer is still read; the fallback id only labels errors.
            tracing::warn!(
                path = %path.display(),
                %error,
                "malformed segment filename; reading SIDX footer with segment_id 0"
            );
            0
        }
    };
    let mut file = crate::store::platform::fs::open_file(path).map_err(StoreError::Io)?;
    let Some(layout) = footer::read_layout(&mut file, segment_id)? else {
        return Ok(None);
    };
    file.seek(SeekFrom::Start(layout.string_table_offset))
        .map_err(StoreError::Io)?;
    // The string table is followed directly by the entries. Entries are read
    // one ENTRY_SIZE record at a time, so buffer the handle instead of
    // issuing one read call per entry.
    let mut reader = std::io::BufReader::new(file);
    let table_len_usize =
        usize::try_from(layout.string_table_len).map_err(|_| StoreError::CorruptSegment {
            segment_id,
            detail: format!(
                "SIDX string table length {} exceeds usize::MAX",
                layout.string_table_len
            ),
        })?;
    let mut string_table_buf = vec![0u8; table_len_usize];
    reader
        .read_exact(&mut string_table_buf)
        .map_err(StoreError::Io)?;
    let strings: Vec<String> = crate::encoding::from_bytes(&string_table_buf)
        .map_err(|e| StoreError::Serialization(Box::new(e)))?;
    // Every string index stored in an entry must point into `strings`.
    let check_string_idx = |i: usize, field: &str, idx: u32| -> Result<(), StoreError> {
        if idx as usize >= strings.len() {
            return Err(StoreError::CorruptSegment {
                segment_id,
                detail: format!(
                    "SIDX entry {i}: {field} {idx} out of range (table has {} strings)",
                    strings.len()
                ),
            });
        }
        Ok(())
    };
    // NOTE(review): assumes `footer::read_layout` bounds `entry_count` by the
    // file size; otherwise a corrupt footer could request a huge preallocation here.
    let mut entries = Vec::with_capacity(layout.entry_count);
    let mut entry_buf = [0u8; ENTRY_SIZE];
    for i in 0..layout.entry_count {
        reader.read_exact(&mut entry_buf).map_err(|e| {
            if e.kind() == std::io::ErrorKind::UnexpectedEof {
                StoreError::CorruptSegment {
                    segment_id,
                    detail: format!("SIDX: entry {i} truncated at EOF"),
                }
            } else {
                StoreError::Io(e)
            }
        })?;
        let entry = SidxEntry::decode_from(&entry_buf, segment_id)?;
        check_string_idx(i, "entity_idx", entry.entity_idx)?;
        check_string_idx(i, "scope_idx", entry.scope_idx)?;
        entries.push(entry);
    }
    Ok(Some((entries, strings)))
}
#[cfg(test)]
mod tests;