use bytes::{Bytes, BytesMut};
use kimberlite_crypto::{ChainHash, chain_hash};
use kimberlite_types::{CompressionKind, Offset, RecordKind};
use crate::StorageError;
/// Sentinel written as the first four bytes of every serialized record.
const RECORD_START: u32 = 0xBADC_0FFE;
/// Sentinel written as the last four bytes of every serialized record.
const RECORD_END: u32 = 0xC0FF_EE42;
/// Bytes that precede the payload: start sentinel (4) + offset (8) +
/// previous hash (32) + kind (1) + compression (1) + payload length (4).
const HEADER_SIZE: usize = 4 + 8 + 32 + 1 + 1 + 4;
/// Fixed bytes surrounding the payload: the header plus the trailing
/// CRC32 (4) and end sentinel (4).
const RECORD_OVERHEAD: usize = HEADER_SIZE + 4 + 4;
/// A single log record: a payload plus the metadata that is serialized in
/// front of it (offset, previous chain hash, kind and compression).
///
/// Fields are private; they are read through the accessor methods and set
/// through the constructors on the `impl` block.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Record {
/// Offset carried in the record header (serialized as a little-endian `u64`).
offset: Offset,
/// Hash this record chains onto; `None` is serialized as 32 zero bytes.
prev_hash: Option<ChainHash>,
/// Record kind, serialized as a single byte and included in the chain hash.
kind: RecordKind,
/// Compression tag, serialized as a single byte.
// NOTE(review): presumably describes how `payload` is encoded — nothing in
// this file compresses or decompresses the payload; confirm with callers.
compression: CompressionKind,
/// Payload bytes, stored and serialized verbatim.
payload: Bytes,
}
impl Record {
pub fn new(offset: Offset, prev_hash: Option<ChainHash>, payload: Bytes) -> Self {
Self {
offset,
prev_hash,
kind: RecordKind::Data,
compression: CompressionKind::None,
payload,
}
}
pub fn with_kind(
offset: Offset,
prev_hash: Option<ChainHash>,
kind: RecordKind,
payload: Bytes,
) -> Self {
Self {
offset,
prev_hash,
kind,
compression: CompressionKind::None,
payload,
}
}
pub fn with_compression(
offset: Offset,
prev_hash: Option<ChainHash>,
kind: RecordKind,
compression: CompressionKind,
payload: Bytes,
) -> Self {
Self {
offset,
prev_hash,
kind,
compression,
payload,
}
}
pub fn offset(&self) -> Offset {
self.offset
}
pub fn prev_hash(&self) -> Option<ChainHash> {
self.prev_hash
}
pub fn kind(&self) -> RecordKind {
self.kind
}
pub fn compression(&self) -> CompressionKind {
self.compression
}
pub fn payload(&self) -> &Bytes {
&self.payload
}
pub fn is_checkpoint(&self) -> bool {
self.kind == RecordKind::Checkpoint
}
pub fn compute_hash(&self) -> ChainHash {
let mut data = Vec::with_capacity(1 + self.payload.len());
data.push(self.kind.as_byte());
data.extend_from_slice(&self.payload);
chain_hash(self.prev_hash.as_ref(), &data)
}
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(RECORD_OVERHEAD + self.payload.len());
self.write_into(&mut buf);
buf
}
pub fn to_bytes_into(&self, buf: &mut BytesMut) {
buf.reserve(RECORD_OVERHEAD + self.payload.len());
let mut tmp = Vec::with_capacity(RECORD_OVERHEAD + self.payload.len());
self.write_into(&mut tmp);
buf.extend_from_slice(&tmp);
}
fn write_into(&self, buf: &mut Vec<u8>) {
buf.extend_from_slice(&RECORD_START.to_le_bytes());
buf.extend_from_slice(&self.offset.as_u64().to_le_bytes());
match &self.prev_hash {
Some(hash) => buf.extend_from_slice(hash.as_bytes()),
None => buf.extend_from_slice(&[0u8; 32]),
}
buf.push(self.kind.as_byte());
buf.push(self.compression.as_byte());
buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
buf.extend_from_slice(&self.payload);
let crc = kimberlite_crypto::crc32(buf);
buf.extend_from_slice(&crc.to_le_bytes());
kimberlite_properties::always!(
kimberlite_crypto::crc32(&buf[..buf.len() - 4]) == crc,
"storage.crc32_matches_after_write",
"CRC32 checksum matches stored data immediately after write"
);
buf.extend_from_slice(&RECORD_END.to_le_bytes());
}
pub fn from_bytes(data: &Bytes) -> Result<(Self, usize), StorageError> {
if data.len() < HEADER_SIZE {
return Err(StorageError::UnexpectedEof);
}
let start_sentinel = u32::from_le_bytes(
data[0..4]
.try_into()
.expect("slice is exactly 4 bytes after bounds check"),
);
if start_sentinel != RECORD_START {
return Err(StorageError::TornWrite {
reason: "missing or corrupted RECORD_START sentinel".to_string(),
});
}
let offset = Offset::new(u64::from_le_bytes(
data[4..12]
.try_into()
.expect("slice is exactly 8 bytes after bounds check"),
));
let prev_hash_bytes: [u8; 32] = data[12..44]
.try_into()
.expect("slice is exactly 32 bytes after bounds check");
let prev_hash = if prev_hash_bytes == [0u8; 32] {
None
} else {
Some(ChainHash::from_bytes(&prev_hash_bytes))
};
let kind = RecordKind::from_byte(data[44]).ok_or(StorageError::InvalidRecordKind {
byte: data[44],
offset,
})?;
let compression =
CompressionKind::from_byte(data[45]).ok_or(StorageError::InvalidCompressionKind {
byte: data[45],
offset,
})?;
let length = u32::from_le_bytes(
data[46..50]
.try_into()
.expect("slice is exactly 4 bytes after bounds check"),
) as usize;
let total_size = HEADER_SIZE + length + 4 + 4;
if data.len() < total_size {
return Err(StorageError::UnexpectedEof);
}
let payload = data.slice(HEADER_SIZE..HEADER_SIZE + length);
let crc_offset = HEADER_SIZE + length;
let stored_crc = u32::from_le_bytes(
data[crc_offset..crc_offset + 4]
.try_into()
.expect("slice is exactly 4 bytes after bounds check"),
);
let computed_crc = kimberlite_crypto::crc32(&data[0..crc_offset]);
if stored_crc != computed_crc {
return Err(StorageError::CorruptedRecord);
}
kimberlite_properties::always!(
stored_crc == computed_crc,
"storage.crc32_verified_on_read",
"CRC32 checksum matches stored data after successful deserialization"
);
let end_sentinel_offset = crc_offset + 4;
let end_sentinel = u32::from_le_bytes(
data[end_sentinel_offset..end_sentinel_offset + 4]
.try_into()
.expect("slice is exactly 4 bytes after bounds check"),
);
if end_sentinel != RECORD_END {
return Err(StorageError::TornWrite {
reason: format!(
"missing or corrupted RECORD_END sentinel at offset {}: expected {:#010x}, found {:#010x}",
offset.as_u64(),
RECORD_END,
end_sentinel
),
});
}
Ok((
Record {
offset,
prev_hash,
kind,
compression,
payload,
},
total_size,
))
}
}