pub(crate) mod cursor;
mod hash_stream;
use crate::error::InternalError;
use std::borrow::Cow;
pub(in crate::db) use hash_stream::{
finalize_hash_sha256, new_hash_sha256_prefixed, write_hash_str_u32, write_hash_tag_u8,
write_hash_u32, write_hash_u64,
};
/// Upper bound, in bytes, on a full row envelope (4 MiB).
///
/// `serialize_row_payload_with_version` refuses to build a larger envelope and
/// `decode_row_payload_bytes` refuses to read one.
pub(crate) const MAX_ROW_BYTES: u32 = 4 * 1024 * 1024;
/// Format version written by `serialize_row_payload`, and the only version
/// that `validate_row_format_version` accepts.
pub(in crate::db) const ROW_FORMAT_VERSION_CURRENT: u8 = 2;
/// Two-byte marker (`IR`) placed at the very start of every row envelope.
const ROW_ENVELOPE_MAGIC: [u8; 2] = *b"IR";
/// Size of the envelope header in bytes: 2 (magic) + 1 (format version) +
/// 4 (payload length).
const ROW_ENVELOPE_HEADER_LEN: usize = 2 + 1 + 4;
/// Wraps `payload` in a row envelope stamped with `ROW_FORMAT_VERSION_CURRENT`.
///
/// # Errors
///
/// Propagates any error returned by `serialize_row_payload_with_version`.
pub(in crate::db) fn serialize_row_payload(payload: Vec<u8>) -> Result<Vec<u8>, InternalError> {
    let version = ROW_FORMAT_VERSION_CURRENT;
    serialize_row_payload_with_version(payload, version)
}
/// Validates a persisted row envelope and returns the payload it wraps.
///
/// The envelope read here consists of the two-byte `ROW_ENVELOPE_MAGIC`, one
/// format-version byte, a big-endian `u32` payload length, and the payload.
/// The payload must end exactly where `bytes` ends.
///
/// The result is always a borrowed sub-slice of `bytes`; nothing is copied.
///
/// # Errors
///
/// Returns a `serialize_corruption` error when `bytes` is longer than
/// `MAX_ROW_BYTES`, is cut short anywhere inside the header, does not start
/// with the expected magic, or declares a payload length that disagrees with
/// the number of bytes following the header. Any error produced by
/// `validate_row_format_version` is propagated unchanged.
pub(in crate::db) fn decode_row_payload_bytes(
    bytes: &[u8],
) -> Result<Cow<'_, [u8]>, InternalError> {
    // Shorthand for the fixed-message corruption errors raised below.
    let corrupt = |reason: &'static str| InternalError::serialize_corruption(reason);

    let envelope_len = bytes.len();
    if envelope_len > MAX_ROW_BYTES as usize {
        return Err(InternalError::serialize_corruption(format!(
            "row decode: payload size {} exceeds limit {}",
            envelope_len, MAX_ROW_BYTES
        )));
    }

    // Magic prefix: must be present in full and match exactly.
    let magic_len = ROW_ENVELOPE_MAGIC.len();
    match bytes.get(..magic_len) {
        None => return Err(corrupt("row decode: truncated row envelope")),
        Some(found) if found != ROW_ENVELOPE_MAGIC => {
            return Err(corrupt("row decode: invalid row envelope magic"));
        }
        Some(_) => {}
    }

    // Format-version byte sits directly after the magic.
    match bytes.get(magic_len) {
        Some(&version) => validate_row_format_version(version)?,
        None => return Err(corrupt("row decode: missing row format version")),
    }

    // Declared payload length: four big-endian bytes after the version byte.
    let len_field_start = magic_len + 1;
    let mut len_field = [0u8; 4];
    match bytes.get(len_field_start..len_field_start + len_field.len()) {
        Some(raw) => len_field.copy_from_slice(raw),
        None => return Err(corrupt("row decode: truncated row payload length")),
    }
    let declared_len = match usize::try_from(u32::from_be_bytes(len_field)) {
        Ok(len) => len,
        Err(_) => return Err(corrupt("row decode: payload length out of range")),
    };

    // The declared length has to account for every byte after the header:
    // both short and over-long buffers are rejected.
    let expected_end = match ROW_ENVELOPE_HEADER_LEN.checked_add(declared_len) {
        Some(end) => end,
        None => return Err(corrupt("row decode: payload length overflow")),
    };
    if expected_end != envelope_len {
        return Err(corrupt(
            "row decode: payload length does not match row envelope",
        ));
    }

    Ok(Cow::Borrowed(&bytes[ROW_ENVELOPE_HEADER_LEN..expected_end]))
}
/// Checks that `format_version` is the version this build reads.
///
/// # Errors
///
/// Returns a `serialize_incompatible_persisted_format` error, naming both the
/// stored and the supported version, whenever `format_version` differs from
/// `ROW_FORMAT_VERSION_CURRENT`.
fn validate_row_format_version(format_version: u8) -> Result<(), InternalError> {
    if format_version != ROW_FORMAT_VERSION_CURRENT {
        let message = format!(
            "row format version {format_version} is unsupported by runtime version {ROW_FORMAT_VERSION_CURRENT}",
        );
        return Err(InternalError::serialize_incompatible_persisted_format(
            message,
        ));
    }

    Ok(())
}
/// Wraps `payload` in a row envelope stamped with `format_version`.
///
/// Output layout: `ROW_ENVELOPE_MAGIC`, the `format_version` byte, the payload
/// length as a big-endian `u32`, then the payload bytes.
///
/// `format_version` is written exactly as given; it is not compared against
/// `ROW_FORMAT_VERSION_CURRENT` here.
///
/// # Errors
///
/// Returns a `persisted_row_encode_failed` error when the header plus payload
/// length overflows `usize`, when the full envelope would exceed
/// `MAX_ROW_BYTES`, or when the payload length does not fit in a `u32`.
pub(in crate::db) fn serialize_row_payload_with_version(
    payload: Vec<u8>,
    format_version: u8,
) -> Result<Vec<u8>, InternalError> {
    let total_len = match ROW_ENVELOPE_HEADER_LEN.checked_add(payload.len()) {
        Some(len) => len,
        None => {
            return Err(InternalError::persisted_row_encode_failed(
                "row envelope length overflow",
            ));
        }
    };
    if total_len > MAX_ROW_BYTES as usize {
        return Err(InternalError::persisted_row_encode_failed(format!(
            "row envelope exceeds max size: {total_len} bytes (limit {MAX_ROW_BYTES})",
        )));
    }

    // Resolve the length prefix up front so every failure path is behind us
    // before the output buffer is built.
    let length_prefix = match u32::try_from(payload.len()) {
        Ok(len) => len.to_be_bytes(),
        Err(_) => {
            return Err(InternalError::persisted_row_encode_failed(
                "row payload exceeds u32 length",
            ));
        }
    };

    // Header (magic, version, length) followed by the payload.
    let mut envelope = Vec::with_capacity(total_len);
    envelope.extend_from_slice(&ROW_ENVELOPE_MAGIC);
    envelope.push(format_version);
    envelope.extend_from_slice(&length_prefix);
    envelope.extend(payload);
    Ok(envelope)
}