use bytes::{Buf, BufMut, Bytes, BytesMut};
use thiserror::Error;
/// Magic bytes identifying a serialized sidecar index.
pub const INDEX_MAGIC: &[u8; 4] = b"S4IX";
/// Current on-disk index version; `encode_index` always writes this.
pub const INDEX_VERSION: u32 = 2;
/// Legacy index version (no source-binding fields); still accepted by `decode_index`.
pub const INDEX_VERSION_V1: u32 = 1;
/// NOTE(review): this sums to 40, which matches neither `HEADER_FIXED_V1` (32)
/// nor `HEADER_FIXED_V2` (44), and it is unused in this file — confirm which
/// layout it is meant to describe before relying on it.
pub const INDEX_HEADER_BYTES: usize = 4 + 4 + 8 + 8 + 4 + 4 + 8;
// v1 fixed header: magic(4) + version(4) + entry count(8) +
// total original size(8) + total padded size(8).
const HEADER_FIXED_V1: usize = 4 + 4 + 8 + 8 + 8;
// v2 appends: source compressed size(8) + etag length(4); etag bytes follow.
const HEADER_FIXED_V2: usize = HEADER_FIXED_V1 + 8 + 4;
/// Serialized size of one frame entry: four little-endian u64 fields.
pub const ENTRY_BYTES: usize = 8 + 8 + 8 + 8;
/// Location of one compressed frame inside a padded object body, together
/// with the span of original (uncompressed) bytes that frame decodes to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FrameIndexEntry {
    /// Offset of the frame's first byte in the original (uncompressed) stream.
    pub original_offset: u64,
    /// Number of original bytes this frame decodes to.
    pub original_size: u64,
    /// Offset of the frame (including its header) in the compressed body.
    pub compressed_offset: u64,
    /// Size of the frame in the compressed body; per `build_index_from_body`
    /// this includes the frame header bytes, not just the payload.
    pub compressed_size: u64,
}
impl FrameIndexEntry {
    /// Exclusive end of this frame's span in the original stream.
    ///
    /// NOTE(review): unchecked `+` — assumes `offset + size` cannot overflow
    /// u64 (would panic in debug / wrap in release on a corrupt index).
    pub fn original_end(&self) -> u64 {
        self.original_offset + self.original_size
    }
    /// Exclusive end of this frame's span in the compressed body.
    /// Same overflow assumption as [`Self::original_end`].
    pub fn compressed_end(&self) -> u64 {
        self.compressed_offset + self.compressed_size
    }
}
/// Index over all frames of one padded object body.
///
/// `entries` are expected to be sorted by `original_offset` and contiguous in
/// original-byte space (each entry starts where the previous one ends) —
/// `lookup_range`'s binary searches rely on this.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FrameIndex {
    /// Total size of the stored (compressed + padded) body in bytes.
    pub total_padded_size: u64,
    /// Per-frame entries, sorted by original offset.
    pub entries: Vec<FrameIndexEntry>,
    /// ETag of the source object this index was built against (v2 only).
    pub source_etag: Option<String>,
    /// Compressed size of the source object (v2 only; 0 encodes `None`).
    pub source_compressed_size: Option<u64>,
}
impl FrameIndex {
    /// Total number of original (uncompressed) bytes covered by the index.
    ///
    /// Because entries are sorted and contiguous, this is the exclusive end
    /// of the last entry; an empty index yields 0.
    pub fn total_original_size(&self) -> u64 {
        self.entries.last().map(|e| e.original_end()).unwrap_or(0)
    }

    /// Binary-search for the frame whose original-byte span
    /// `[original_offset, original_end)` contains `pos`.
    ///
    /// Returns `None` when `pos` falls in a gap between entries (only
    /// possible on a malformed, non-contiguous index) or past the last one.
    fn frame_containing(&self, pos: u64) -> Option<usize> {
        self.entries
            .binary_search_by(|e| {
                if e.original_end() <= pos {
                    std::cmp::Ordering::Less
                } else if e.original_offset > pos {
                    std::cmp::Ordering::Greater
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .ok()
    }

    /// Plan a read of original bytes `[start, end_exclusive)`.
    ///
    /// Returns the inclusive range of frames to fetch, the compressed byte
    /// range covering them, and the sub-slice of the concatenated
    /// decompressed output that corresponds to the request. `end_exclusive`
    /// is clamped to the total original size.
    ///
    /// Returns `None` for an empty index, an empty or inverted range, or a
    /// `start` at/past end-of-object.
    pub fn lookup_range(&self, start: u64, end_exclusive: u64) -> Option<RangePlan> {
        if self.entries.is_empty() || start >= end_exclusive {
            return None;
        }
        let total = self.total_original_size();
        if start >= total {
            return None;
        }
        let clamped_end = end_exclusive.min(total);
        let first_idx = self.frame_containing(start)?;
        // clamped_end > start >= 0 here, so the subtraction cannot underflow.
        let last_idx = self.frame_containing(clamped_end - 1)?;
        let byte_start = self.entries[first_idx].compressed_offset;
        let byte_end_exclusive = self.entries[last_idx].compressed_end();
        Some(RangePlan {
            first_frame_idx: first_idx,
            last_frame_idx_inclusive: last_idx,
            byte_start,
            byte_end_exclusive,
            // Offsets into the decompressed concatenation of the selected
            // frames, which begins at the first frame's original_offset.
            slice_start_in_combined: start - self.entries[first_idx].original_offset,
            slice_end_in_combined: clamped_end - self.entries[first_idx].original_offset,
        })
    }
}
/// Result of [`FrameIndex::lookup_range`]: which frames to fetch and how to
/// slice their combined decompressed output to satisfy the request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangePlan {
    /// Index (into `FrameIndex::entries`) of the first frame to fetch.
    pub first_frame_idx: usize,
    /// Index of the last frame to fetch, inclusive.
    pub last_frame_idx_inclusive: usize,
    /// Start of the compressed byte range to read (first frame's offset).
    pub byte_start: u64,
    /// Exclusive end of the compressed byte range (last frame's end).
    pub byte_end_exclusive: u64,
    /// Start of the requested span within the concatenated decompressed
    /// output of the fetched frames.
    pub slice_start_in_combined: u64,
    /// Exclusive end of the requested span within that output.
    pub slice_end_in_combined: u64,
}
/// Failure modes when decoding a serialized sidecar index.
#[derive(Debug, Error)]
pub enum IndexError {
    /// Buffer ended before a complete header (or v2 etag section) was read.
    #[error("index too short: {0} bytes")]
    TooShort(usize),
    /// First four bytes were not [`INDEX_MAGIC`].
    #[error("bad index magic: {got:?}")]
    BadMagic { got: [u8; 4] },
    /// Version field was neither v1 nor v2.
    #[error("unsupported index version {0} (this build supports {INDEX_VERSION})")]
    UnsupportedVersion(u32),
    /// Bytes remaining after the header did not equal `claimed * ENTRY_BYTES`.
    #[error("entry count {claimed} doesn't match buffer remaining {remaining}")]
    EntryCountMismatch { claimed: u64, remaining: usize },
}
/// Serialize `idx` into the current (v2) sidecar wire format.
///
/// Layout, all little-endian: magic, version, entry count, total original
/// size, total padded size, source compressed size (0 when unknown), etag
/// length, etag bytes, then one [`ENTRY_BYTES`]-sized record per frame.
pub fn encode_index(idx: &FrameIndex) -> Bytes {
    let etag = idx.source_etag.as_deref().unwrap_or("").as_bytes();
    let capacity = HEADER_FIXED_V2 + etag.len() + idx.entries.len() * ENTRY_BYTES;
    let mut out = BytesMut::with_capacity(capacity);
    out.put_slice(INDEX_MAGIC);
    out.put_u32_le(INDEX_VERSION);
    out.put_u64_le(idx.entries.len() as u64);
    out.put_u64_le(idx.total_original_size());
    out.put_u64_le(idx.total_padded_size);
    // `None` is encoded as 0; the decoder maps 0 back to `None`.
    out.put_u64_le(idx.source_compressed_size.unwrap_or(0));
    out.put_u32_le(etag.len() as u32);
    out.put_slice(etag);
    for entry in &idx.entries {
        out.put_u64_le(entry.original_offset);
        out.put_u64_le(entry.original_size);
        out.put_u64_le(entry.compressed_offset);
        out.put_u64_le(entry.compressed_size);
    }
    out.freeze()
}
/// Serialize `idx` in the legacy v1 wire format (no source-binding fields).
///
/// Test-only helper for exercising `decode_index`'s backward compatibility;
/// hidden from docs because production writers must emit v2.
#[doc(hidden)]
pub fn encode_index_v1_for_test(idx: &FrameIndex) -> Bytes {
    let capacity = HEADER_FIXED_V1 + idx.entries.len() * ENTRY_BYTES;
    let mut out = BytesMut::with_capacity(capacity);
    out.put_slice(INDEX_MAGIC);
    out.put_u32_le(INDEX_VERSION_V1);
    out.put_u64_le(idx.entries.len() as u64);
    out.put_u64_le(idx.total_original_size());
    out.put_u64_le(idx.total_padded_size);
    for entry in &idx.entries {
        out.put_u64_le(entry.original_offset);
        out.put_u64_le(entry.original_size);
        out.put_u64_le(entry.compressed_offset);
        out.put_u64_le(entry.compressed_size);
    }
    out.freeze()
}
/// Decode a serialized sidecar index, accepting the current v2 format and
/// the legacy v1 format (which lacks the source-binding fields).
///
/// # Errors
/// * [`IndexError::TooShort`] — buffer ends before the header / etag section.
/// * [`IndexError::BadMagic`] — first four bytes are not `S4IX`.
/// * [`IndexError::UnsupportedVersion`] — version is neither 1 nor 2.
/// * [`IndexError::EntryCountMismatch`] — remaining bytes don't match the
///   claimed entry count exactly.
pub fn decode_index(mut input: Bytes) -> Result<FrameIndex, IndexError> {
    // The v1 fixed header is the shorter of the two; anything smaller cannot
    // hold even the common (magic..total_padded) prefix consumed below.
    if input.len() < HEADER_FIXED_V1 {
        return Err(IndexError::TooShort(input.len()));
    }
    let mut magic = [0u8; 4];
    magic.copy_from_slice(&input[..4]);
    if &magic != INDEX_MAGIC {
        return Err(IndexError::BadMagic { got: magic });
    }
    input.advance(4);
    let version = input.get_u32_le();
    let n = input.get_u64_le();
    // Redundant with the entries themselves; read past but not validated.
    let _total_original = input.get_u64_le();
    let total_padded_size = input.get_u64_le();
    // Version-specific tail of the header.
    let (source_compressed_size, source_etag) = match version {
        v if v == INDEX_VERSION_V1 => (None, None),
        v if v == INDEX_VERSION => {
            // v2 appends: source compressed size (u64) + etag length (u32),
            // then `etag_len` raw etag bytes.
            if input.len() < 8 + 4 {
                return Err(IndexError::TooShort(input.len()));
            }
            let scs = input.get_u64_le();
            let etag_len = input.get_u32_le() as usize;
            if input.len() < etag_len {
                return Err(IndexError::TooShort(input.len()));
            }
            let etag_bytes = input.split_to(etag_len);
            let etag = if etag_len == 0 {
                None
            } else {
                // Invalid UTF-8 is treated as "no etag" rather than an error.
                std::str::from_utf8(&etag_bytes).ok().map(str::to_owned)
            };
            // 0 is the encoder's sentinel for an unknown compressed size.
            (if scs == 0 { None } else { Some(scs) }, etag)
        }
        other => return Err(IndexError::UnsupportedVersion(other)),
    };
    // Everything left must be exactly `n` fixed-size entry records.
    // saturating_mul keeps an absurd `n` from wrapping into a false match.
    let expected_remaining = (n as usize).saturating_mul(ENTRY_BYTES);
    if input.len() != expected_remaining {
        return Err(IndexError::EntryCountMismatch {
            claimed: n,
            remaining: input.len(),
        });
    }
    let mut entries = Vec::with_capacity(n as usize);
    for _ in 0..n {
        let original_offset = input.get_u64_le();
        let original_size = input.get_u64_le();
        let compressed_offset = input.get_u64_le();
        let compressed_size = input.get_u64_le();
        entries.push(FrameIndexEntry {
            original_offset,
            original_size,
            compressed_offset,
            compressed_size,
        });
    }
    Ok(FrameIndex {
        total_padded_size,
        entries,
        source_etag,
        source_compressed_size,
    })
}
/// Reconstruct a [`FrameIndex`] by scanning a padded, framed body.
///
/// Walks the body record by record: padding records are skipped, and each
/// frame yields one entry whose `compressed_size` includes the frame header.
/// Scanning stops quietly at a truncated trailing record; frame-level parse
/// failures propagate as [`crate::multipart::FrameError`].
///
/// The rebuilt index has no source binding (`source_etag` /
/// `source_compressed_size` are `None`).
pub fn build_index_from_body(body: &Bytes) -> Result<FrameIndex, crate::multipart::FrameError> {
    let mut entries = Vec::new();
    let mut original_off: u64 = 0;
    let mut cursor = 0usize;
    let mut iter_buf = body.clone();
    while cursor < body.len() {
        // Padding record: skip it without emitting an entry.
        if cursor + 4 <= body.len() && &body[cursor..cursor + 4] == crate::multipart::PADDING_MAGIC
        {
            if cursor + crate::multipart::PADDING_HEADER_BYTES > body.len() {
                break;
            }
            // NOTE(review): assumes PADDING_HEADER_BYTES >= 12 so the length
            // read below is covered by the check above — confirm in multipart.
            let pad_len = u64::from_le_bytes(body[cursor + 4..cursor + 12].try_into().unwrap());
            // `pad_len` comes straight from the body and may be garbage:
            // saturate so a huge value can't overflow the cursor (or truncate
            // on 32-bit targets), and bail out instead of letting
            // `body.slice(cursor..)` panic when it points past the end.
            let pad_total = usize::try_from(pad_len)
                .unwrap_or(usize::MAX)
                .saturating_add(crate::multipart::PADDING_HEADER_BYTES);
            cursor = cursor.saturating_add(pad_total);
            if cursor >= body.len() {
                break;
            }
            iter_buf = body.slice(cursor..);
            continue;
        }
        // Truncated trailing frame header: stop without error.
        if cursor + crate::multipart::FRAME_HEADER_BYTES > body.len() {
            break;
        }
        let (header, _payload, rest) = crate::multipart::read_frame(iter_buf.clone())?;
        // Entry spans the whole frame record, header included.
        let frame_total = crate::multipart::FRAME_HEADER_BYTES + header.compressed_size as usize;
        entries.push(FrameIndexEntry {
            original_offset: original_off,
            original_size: header.original_size,
            compressed_offset: cursor as u64,
            compressed_size: frame_total as u64,
        });
        original_off += header.original_size;
        cursor += frame_total;
        iter_buf = rest;
    }
    Ok(FrameIndex {
        total_padded_size: body.len() as u64,
        entries,
        source_etag: None,
        source_compressed_size: None,
    })
}
/// Object-store key of the sidecar index stored alongside `object_key`.
///
/// The sidecar always lives at the object's own key with a `.s4index`
/// suffix appended.
pub fn sidecar_key(object_key: &str) -> String {
    const SUFFIX: &str = ".s4index";
    let mut key = String::with_capacity(object_key.len() + SUFFIX.len());
    key.push_str(object_key);
    key.push_str(SUFFIX);
    key
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CodecKind;
    use crate::multipart::{FrameHeader, pad_to_minimum, write_frame};

    /// Three contiguous original spans (0..100, 100..180, 180..230) with a
    /// gap in compressed offsets between the first and second frames.
    fn sample_index() -> FrameIndex {
        let entries = vec![
            FrameIndexEntry {
                original_offset: 0,
                original_size: 100,
                compressed_offset: 0,
                compressed_size: 50,
            },
            FrameIndexEntry {
                original_offset: 100,
                original_size: 80,
                compressed_offset: 60,
                compressed_size: 40,
            },
            FrameIndexEntry {
                original_offset: 180,
                original_size: 50,
                compressed_offset: 100,
                compressed_size: 30,
            },
        ];
        FrameIndex {
            total_padded_size: 200,
            entries,
            source_etag: None,
            source_compressed_size: None,
        }
    }

    #[test]
    fn encode_decode_roundtrip() {
        let original = sample_index();
        let encoded = encode_index(&original);
        let decoded = decode_index(encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn encode_decode_roundtrip_v2_with_source_binding() {
        let mut original = sample_index();
        original.source_etag = Some("\"deadbeefcafe\"".into());
        original.source_compressed_size = Some(987_654);
        let encoded = encode_index(&original);
        // Sanity-check the raw header before decoding.
        assert_eq!(&encoded[..4], INDEX_MAGIC);
        let version = u32::from_le_bytes(encoded[4..8].try_into().unwrap());
        assert_eq!(version, INDEX_VERSION, "writer must always emit v2");
        let decoded = decode_index(encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn sidecar_header_back_compat_old_format_no_source_etag() {
        // Start from an index carrying v2-only fields; the v1 writer must
        // drop them and the decoder must report them as absent.
        let v2_idx = {
            let mut idx = sample_index();
            idx.source_etag = Some("\"unused\"".into());
            idx.source_compressed_size = Some(42);
            idx
        };
        let v1_bytes = encode_index_v1_for_test(&v2_idx);
        let version = u32::from_le_bytes(v1_bytes[4..8].try_into().unwrap());
        assert_eq!(version, INDEX_VERSION_V1);
        let decoded = decode_index(v1_bytes).expect("v1 sidecar must still decode");
        assert_eq!(decoded.entries, v2_idx.entries);
        assert_eq!(decoded.total_padded_size, v2_idx.total_padded_size);
        assert_eq!(decoded.source_etag, None);
        assert_eq!(decoded.source_compressed_size, None);
    }

    #[test]
    fn lookup_range_within_single_frame() {
        let idx = sample_index();
        let plan = idx.lookup_range(10, 50).unwrap();
        assert_eq!(plan.first_frame_idx, 0);
        assert_eq!(plan.last_frame_idx_inclusive, 0);
        assert_eq!(plan.byte_start, 0);
        assert_eq!(plan.byte_end_exclusive, 50);
        assert_eq!(plan.slice_start_in_combined, 10);
        assert_eq!(plan.slice_end_in_combined, 50);
    }

    #[test]
    fn lookup_range_spans_frames() {
        let idx = sample_index();
        let plan = idx.lookup_range(50, 150).unwrap();
        assert_eq!(plan.first_frame_idx, 0);
        assert_eq!(plan.last_frame_idx_inclusive, 1);
        assert_eq!(plan.byte_start, 0);
        assert_eq!(plan.byte_end_exclusive, 100);
        assert_eq!(plan.slice_start_in_combined, 50);
        assert_eq!(plan.slice_end_in_combined, 150);
    }

    #[test]
    fn lookup_range_at_end_clamps() {
        let idx = sample_index();
        // end_exclusive far past EOF must clamp to the last frame.
        let plan = idx.lookup_range(200, 1000).unwrap();
        assert_eq!(plan.first_frame_idx, 2);
        assert_eq!(plan.last_frame_idx_inclusive, 2);
        assert_eq!(plan.byte_start, 100);
        assert_eq!(plan.byte_end_exclusive, 130);
    }

    #[test]
    fn lookup_range_out_of_bounds_returns_none() {
        let idx = sample_index();
        assert!(idx.lookup_range(500, 600).is_none());
    }

    #[test]
    fn build_index_from_real_body_skips_padding() {
        let mut body = BytesMut::new();
        // Frame 1, then a padding record, then frame 2.
        let payload1 = Bytes::from_static(b"AAAA");
        write_frame(
            &mut body,
            FrameHeader {
                codec: CodecKind::Passthrough,
                original_size: 100,
                compressed_size: payload1.len() as u64,
                crc32c: 0,
            },
            &payload1,
        );
        let frame1_end = body.len();
        pad_to_minimum(&mut body, 5000);
        let pad_end = body.len();
        let payload2 = Bytes::from_static(b"BBBB");
        write_frame(
            &mut body,
            FrameHeader {
                codec: CodecKind::Passthrough,
                original_size: 80,
                compressed_size: payload2.len() as u64,
                crc32c: 0,
            },
            &payload2,
        );
        let idx = build_index_from_body(&body.freeze()).unwrap();
        assert_eq!(idx.entries.len(), 2);
        assert_eq!(idx.entries[0].original_offset, 0);
        assert_eq!(idx.entries[0].compressed_offset, 0);
        assert_eq!(idx.entries[0].original_size, 100);
        assert_eq!(idx.entries[0].compressed_size, frame1_end as u64);
        assert_eq!(idx.entries[1].original_offset, 100);
        assert_eq!(idx.entries[1].compressed_offset, pad_end as u64);
        assert_eq!(idx.entries[1].original_size, 80);
        assert_eq!(idx.total_original_size(), 180);
    }
}