use std::io::{Cursor, Read};
use chrono::{TimeZone, Utc};
use roaring::RoaringBitmap;
use uuid::Uuid;
use super::state_doc::SealRecord;
const MAGIC: &[u8; 8] = b"INFTOMB\0";
pub const SCHEMA_VERSION: u32 = 1;
const MAGIC_LEN: usize = 8;
const VERSION_LEN: usize = 4;
const SEAL_FLAG_LEN: usize = 1;
const SEAL_UUID_LEN: usize = 16;
const SEAL_TIMESTAMP_LEN: usize = 8;
const BITMAP_LEN_PREFIX_LEN: usize = 4;
const SEAL_FLAG_UNSEALED: u8 = 0;
const SEAL_FLAG_SEALED: u8 = 1;
const HEADER_LEN_UNSEALED: usize = MAGIC_LEN + VERSION_LEN + SEAL_FLAG_LEN;
const HEADER_LEN_SEALED: usize = HEADER_LEN_UNSEALED + SEAL_UUID_LEN + SEAL_TIMESTAMP_LEN;
#[derive(Debug, Clone)]
pub struct TombstonesSidecar {
pub seal: Option<SealRecord>,
pub bitmap: RoaringBitmap,
}
#[derive(Debug, thiserror::Error)]
pub enum SidecarCodecError {
#[error("sidecar truncated: need {needed} bytes, have {have}")]
Truncated { needed: usize, have: usize },
#[error("bad magic — expected {expected:?}, got {got:?}")]
BadMagic { expected: [u8; 8], got: [u8; 8] },
#[error("unsupported schema version {got}; this build supports up to {max}")]
UnsupportedVersion { got: u32, max: u32 },
#[error("invalid seal_flag byte {got}; must be 0 or 1")]
InvalidSealFlag { got: u8 },
#[error("invalid sealed_at_unix_millis {millis}: cannot represent as a UTC chrono::DateTime")]
InvalidSealTimestamp { millis: i64 },
#[error("bitmap length {declared} exceeds remaining bytes {remaining}")]
BitmapTooLong { declared: u32, remaining: usize },
#[error("trailing garbage after bitmap: {trailing} unexpected bytes")]
TrailingBytes { trailing: usize },
#[error("RoaringBitmap deserialize failed: {0}")]
BitmapDecode(#[source] std::io::Error),
#[error("RoaringBitmap serialize failed: {0}")]
BitmapEncode(#[source] std::io::Error),
}
pub fn encode_sidecar(sidecar: &TombstonesSidecar) -> Result<Vec<u8>, SidecarCodecError> {
let bitmap_size = sidecar.bitmap.serialized_size();
let header_len = if sidecar.seal.is_some() {
HEADER_LEN_SEALED
} else {
HEADER_LEN_UNSEALED
};
let total = header_len + BITMAP_LEN_PREFIX_LEN + bitmap_size;
let mut out: Vec<u8> = Vec::with_capacity(total);
out.extend_from_slice(MAGIC);
out.extend_from_slice(&SCHEMA_VERSION.to_le_bytes());
match &sidecar.seal {
None => {
out.push(SEAL_FLAG_UNSEALED);
}
Some(seal) => {
out.push(SEAL_FLAG_SEALED);
out.extend_from_slice(seal.compaction_id.as_bytes());
let millis = seal.sealed_at.timestamp_millis();
out.extend_from_slice(&millis.to_le_bytes());
}
}
let len_prefix_pos = out.len();
out.extend_from_slice(&[0u8; BITMAP_LEN_PREFIX_LEN]);
let pre_bitmap_len = out.len();
sidecar
.bitmap
.serialize_into(&mut out)
.map_err(SidecarCodecError::BitmapEncode)?;
let bitmap_actual = (out.len() - pre_bitmap_len) as u32;
out[len_prefix_pos..len_prefix_pos + BITMAP_LEN_PREFIX_LEN]
.copy_from_slice(&bitmap_actual.to_le_bytes());
Ok(out)
}
pub fn decode_sidecar(bytes: &[u8]) -> Result<TombstonesSidecar, SidecarCodecError> {
let mut cur = Cursor::new(bytes);
let mut magic_buf = [0u8; MAGIC_LEN];
read_exact(&mut cur, &mut magic_buf, MAGIC_LEN, bytes.len())?;
if &magic_buf != MAGIC {
return Err(SidecarCodecError::BadMagic {
expected: *MAGIC,
got: magic_buf,
});
}
let mut vbuf = [0u8; VERSION_LEN];
read_exact(&mut cur, &mut vbuf, VERSION_LEN, bytes.len())?;
let version = u32::from_le_bytes(vbuf);
if version > SCHEMA_VERSION {
return Err(SidecarCodecError::UnsupportedVersion {
got: version,
max: SCHEMA_VERSION,
});
}
let mut fbuf = [0u8; SEAL_FLAG_LEN];
read_exact(&mut cur, &mut fbuf, SEAL_FLAG_LEN, bytes.len())?;
let seal = match fbuf[0] {
SEAL_FLAG_UNSEALED => None,
SEAL_FLAG_SEALED => {
let mut uuid_buf = [0u8; SEAL_UUID_LEN];
read_exact(&mut cur, &mut uuid_buf, SEAL_UUID_LEN, bytes.len())?;
let compaction_id = Uuid::from_bytes(uuid_buf);
let mut tbuf = [0u8; SEAL_TIMESTAMP_LEN];
read_exact(&mut cur, &mut tbuf, SEAL_TIMESTAMP_LEN, bytes.len())?;
let millis = i64::from_le_bytes(tbuf);
let sealed_at = Utc
.timestamp_millis_opt(millis)
.single()
.ok_or(SidecarCodecError::InvalidSealTimestamp { millis })?;
Some(SealRecord {
compaction_id,
sealed_at,
})
}
other => return Err(SidecarCodecError::InvalidSealFlag { got: other }),
};
let mut lbuf = [0u8; BITMAP_LEN_PREFIX_LEN];
read_exact(&mut cur, &mut lbuf, BITMAP_LEN_PREFIX_LEN, bytes.len())?;
let bitmap_len = u32::from_le_bytes(lbuf);
let remaining = bytes.len() - (cur.position() as usize);
if (bitmap_len as usize) > remaining {
return Err(SidecarCodecError::BitmapTooLong {
declared: bitmap_len,
remaining,
});
}
let bitmap_start = cur.position() as usize;
let bitmap_end = bitmap_start + bitmap_len as usize;
let bitmap = RoaringBitmap::deserialize_from(&bytes[bitmap_start..bitmap_end])
.map_err(SidecarCodecError::BitmapDecode)?;
let trailing = bytes.len() - bitmap_end;
if trailing != 0 {
return Err(SidecarCodecError::TrailingBytes { trailing });
}
let _ = version;
Ok(TombstonesSidecar { seal, bitmap })
}
#[inline]
fn read_exact(
cur: &mut Cursor<&[u8]>,
dst: &mut [u8],
needed: usize,
total_len: usize,
) -> Result<(), SidecarCodecError> {
cur.read_exact(dst)
.map_err(|_| SidecarCodecError::Truncated {
needed,
have: total_len.saturating_sub(cur.position() as usize),
})
}
#[cfg(test)]
mod tests {
use chrono::DateTime;
use super::*;
fn sample_bitmap() -> RoaringBitmap {
let mut b = RoaringBitmap::new();
b.insert(1);
b.insert(42);
b.insert(1_000);
b.insert(100_000);
b
}
#[test]
fn unsealed_roundtrip() {
let sidecar = TombstonesSidecar {
seal: None,
bitmap: sample_bitmap(),
};
let bytes = encode_sidecar(&sidecar).expect("encode");
let decoded = decode_sidecar(&bytes).expect("decode");
assert!(decoded.seal.is_none());
let expected: Vec<u32> = sidecar.bitmap.iter().collect();
let got: Vec<u32> = decoded.bitmap.iter().collect();
assert_eq!(got, expected);
}
#[test]
fn sealed_roundtrip() {
let sealed_at: DateTime<Utc> = "2026-05-30T12:34:56.789Z".parse().expect("ts");
let sidecar = TombstonesSidecar {
seal: Some(SealRecord {
compaction_id: Uuid::from_u128(0x1234_5678_90AB_CDEF_0000_1111_2222_3333),
sealed_at,
}),
bitmap: sample_bitmap(),
};
let bytes = encode_sidecar(&sidecar).expect("encode");
let decoded = decode_sidecar(&bytes).expect("decode");
let s = decoded.seal.expect("seal preserved");
assert_eq!(
s.compaction_id,
sidecar.seal.as_ref().expect("seal set above").compaction_id
);
assert_eq!(s.sealed_at.timestamp_millis(), sealed_at.timestamp_millis());
}
#[test]
fn empty_bitmap_roundtrip() {
let sidecar = TombstonesSidecar {
seal: None,
bitmap: RoaringBitmap::new(),
};
let bytes = encode_sidecar(&sidecar).expect("encode");
let decoded = decode_sidecar(&bytes).expect("decode");
assert!(decoded.bitmap.is_empty());
}
#[test]
fn rejects_short_input() {
let err = decode_sidecar(&[0, 1, 2]).expect_err("short");
assert!(matches!(err, SidecarCodecError::Truncated { .. }));
}
#[test]
fn rejects_bad_magic() {
let mut bytes = encode_sidecar(&TombstonesSidecar {
seal: None,
bitmap: RoaringBitmap::new(),
})
.expect("encode");
bytes[0] = b'X';
let err = decode_sidecar(&bytes).expect_err("bad magic");
assert!(matches!(err, SidecarCodecError::BadMagic { .. }));
}
#[test]
fn rejects_unsupported_version() {
let mut bytes = encode_sidecar(&TombstonesSidecar {
seal: None,
bitmap: RoaringBitmap::new(),
})
.expect("encode");
let bumped = (SCHEMA_VERSION + 7).to_le_bytes();
bytes[8..12].copy_from_slice(&bumped);
let err = decode_sidecar(&bytes).expect_err("future version");
assert!(matches!(
err,
SidecarCodecError::UnsupportedVersion { got, .. } if got == SCHEMA_VERSION + 7
));
}
#[test]
fn rejects_invalid_seal_flag() {
let mut bytes = encode_sidecar(&TombstonesSidecar {
seal: None,
bitmap: RoaringBitmap::new(),
})
.expect("encode");
bytes[12] = 9;
let err = decode_sidecar(&bytes).expect_err("bad flag");
assert!(matches!(err, SidecarCodecError::InvalidSealFlag { got: 9 }));
}
#[test]
fn rejects_trailing_garbage() {
let mut bytes = encode_sidecar(&TombstonesSidecar {
seal: None,
bitmap: sample_bitmap(),
})
.expect("encode");
bytes.push(0xAA);
bytes.push(0xBB);
let err = decode_sidecar(&bytes).expect_err("trailing");
assert!(matches!(
err,
SidecarCodecError::TrailingBytes { trailing: 2 }
));
}
#[test]
fn rejects_bitmap_length_past_buffer() {
let mut bytes = encode_sidecar(&TombstonesSidecar {
seal: None,
bitmap: sample_bitmap(),
})
.expect("encode");
let huge: u32 = 0xFFFF_FFFF;
bytes[13..17].copy_from_slice(&huge.to_le_bytes());
let err = decode_sidecar(&bytes).expect_err("over-length");
assert!(matches!(err, SidecarCodecError::BitmapTooLong { .. }));
}
#[test]
fn magic_offset_is_stable() {
let bytes = encode_sidecar(&TombstonesSidecar {
seal: Some(SealRecord {
compaction_id: Uuid::nil(),
sealed_at: Utc.timestamp_millis_opt(0).single().expect("ts"),
}),
bitmap: sample_bitmap(),
})
.expect("encode");
assert_eq!(&bytes[0..8], MAGIC);
assert_eq!(&bytes[8..12], &SCHEMA_VERSION.to_le_bytes());
assert_eq!(bytes[12], 1);
}
}