use crate::storage::engine::overflow::{OverflowChain, OverflowError};
use crate::storage::engine::pager::Pager;
use crate::storage::engine::vector_btree::value_codec;
use reddb_file::BTreeValueCell;
/// Largest raw value length, in bytes, that `encode` stores inline without
/// consulting the codec. Also the limit a compressed payload must meet to stay
/// inline. Mirrors `reddb_file::BTREE_VALUE_OVERFLOW_THRESHOLD`.
pub const OVERFLOW_THRESHOLD: usize = reddb_file::BTREE_VALUE_OVERFLOW_THRESHOLD;
/// Largest value length, in bytes, that `encode` accepts; anything longer is
/// rejected with `ValueLayoutError::ValueTooLarge`. Mirrors
/// `reddb_file::BTREE_VALUE_MAX_SIZE`.
pub const MAX_VALUE_SIZE: usize = reddb_file::BTREE_VALUE_MAX_SIZE;
/// Total length, in bytes, of an encoded pointer cell including its leading
/// flag byte (the `Display` impl reports `POINTER_CELL_LEN - 1` bytes as
/// required after the flag). Mirrors `reddb_file::BTREE_VALUE_POINTER_CELL_LEN`.
pub const POINTER_CELL_LEN: usize = reddb_file::BTREE_VALUE_POINTER_CELL_LEN;
/// Errors produced while encoding a value into a leaf cell or decoding a
/// stored cell back into a value.
#[derive(Debug)]
pub enum ValueLayoutError {
/// The value passed to `encode` was longer than `MAX_VALUE_SIZE`; carries
/// the offending length in bytes.
ValueTooLarge(usize),
/// The stored cell's flag byte was not recognised by the cell decoder;
/// carries the raw flag byte.
UnknownFlag(u8),
/// A pointer cell was shorter than required; `got` is the number of bytes
/// that were available after the flag byte.
TruncatedPointer { got: usize },
/// The value codec reported a failure.
Codec(value_codec::ValueCodecError),
/// Reading from or writing to the overflow chain failed.
Overflow(OverflowError),
}
impl std::fmt::Display for ValueLayoutError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ValueTooLarge(n) => {
write!(f, "value too large: {} bytes (max {})", n, MAX_VALUE_SIZE)
}
Self::UnknownFlag(b) => write!(f, "unknown leaf-cell flag byte: 0b{:08b}", b),
Self::TruncatedPointer { got } => {
write!(
f,
"pointer cell truncated: need {} bytes after flag, got {}",
POINTER_CELL_LEN - 1,
got
)
}
Self::Codec(e) => write!(f, "value codec: {}", e),
Self::Overflow(e) => write!(f, "overflow chain: {}", e),
}
}
}
// Marker impl so `ValueLayoutError` can be used wherever a
// `std::error::Error` is expected. The default `source()` is kept: wrapped
// codec / overflow errors are surfaced through the `Display` text instead.
impl std::error::Error for ValueLayoutError {}
impl From<value_codec::ValueCodecError> for ValueLayoutError {
fn from(e: value_codec::ValueCodecError) -> Self {
Self::Codec(e)
}
}
impl From<reddb_file::BTreeValueCellError> for ValueLayoutError {
fn from(e: reddb_file::BTreeValueCellError) -> Self {
match e {
reddb_file::BTreeValueCellError::UnknownFlag(flag) => Self::UnknownFlag(flag),
reddb_file::BTreeValueCellError::TruncatedPointer { got } => {
Self::TruncatedPointer { got }
}
}
}
}
impl From<OverflowError> for ValueLayoutError {
fn from(e: OverflowError) -> Self {
Self::Overflow(e)
}
}
/// Encodes `value` into the byte form stored in a B-tree leaf cell.
///
/// The layout is chosen in this order:
/// 1. Values of at most [`OVERFLOW_THRESHOLD`] bytes become an inline raw
///    cell; the codec is never consulted.
/// 2. Larger values are run through `value_codec::encode`. If the codec
///    reports LZ4 and the output fits in [`OVERFLOW_THRESHOLD`] bytes, the
///    result is an inline compressed cell.
/// 3. Otherwise the codec output is written to an overflow chain through
///    `pager`, and a pointer cell recording the chain head, the stored length
///    and whether the chain is compressed is returned.
///
/// # Errors
///
/// * [`ValueLayoutError::ValueTooLarge`] when `value` exceeds
///   [`MAX_VALUE_SIZE`]; this is checked before any other work is done.
/// * [`ValueLayoutError::Overflow`] when storing the overflow chain fails.
pub fn encode(pager: &Pager, value: &[u8]) -> Result<Vec<u8>, ValueLayoutError> {
    let raw_len = value.len();
    if raw_len > MAX_VALUE_SIZE {
        return Err(ValueLayoutError::ValueTooLarge(raw_len));
    }
    if raw_len <= OVERFLOW_THRESHOLD {
        return Ok(reddb_file::encode_btree_inline_raw(value));
    }

    let (flag, payload) = value_codec::encode(value);
    let compressed = flag == value_codec::ValueFlag::Lz4;
    if compressed && payload.len() <= OVERFLOW_THRESHOLD {
        return Ok(reddb_file::encode_btree_inline_compressed(&payload));
    }

    // Too big for the leaf even after the codec: spill to overflow pages.
    let overflow = OverflowChain::new(pager);
    let (head_page, stored_len) = overflow.store(&payload)?;
    let cell = reddb_file::encode_btree_pointer(head_page, stored_len, compressed);
    Ok(cell)
}
/// Decodes a stored leaf cell back into the original value bytes.
///
/// Inline cells are handed to `reddb_file::decode_btree_inline_payload`.
/// Pointer cells are resolved by reading `total_len` bytes from the overflow
/// chain starting at `head_page_id` via `pager`; the bytes are then returned
/// as-is, or LZ4-decoded first when the cell is flagged as compressed.
///
/// # Errors
///
/// Returns a [`ValueLayoutError`] when the cell cannot be parsed, when the
/// overflow chain cannot be read, or when decoding the payload fails.
pub fn decode(pager: &Pager, stored: &[u8]) -> Result<Vec<u8>, ValueLayoutError> {
    let cell = reddb_file::decode_btree_value_cell(stored)?;
    match cell {
        BTreeValueCell::Inline {
            is_compressed,
            payload,
        } => {
            let value = reddb_file::decode_btree_inline_payload(is_compressed, payload)?;
            Ok(value)
        }
        BTreeValueCell::Pointer {
            is_compressed,
            head_page_id,
            total_len,
        } => {
            let overflow = OverflowChain::new(pager);
            let spilled = overflow.read(head_page_id, total_len)?;
            if !is_compressed {
                return Ok(spilled);
            }
            let value = value_codec::decode(value_codec::ValueFlag::Lz4, &spilled)?;
            Ok(value)
        }
    }
}
/// Returns the overflow-chain head page id recorded in `stored`, if any.
///
/// Thin wrapper over `reddb_file::btree_value_pointer_head`. This file's
/// tests exercise it as returning `None` for inline cells (raw or compressed)
/// and `Some(head)` for pointer cells regardless of the compression flag.
pub fn pointer_head(stored: &[u8]) -> Option<u32> {
reddb_file::btree_value_pointer_head(stored)
}
/// Projects the length of the cell that `input` would occupy.
///
/// Asks `value_codec::would_encode_to` for the codec output length, then
/// passes the raw length and that codec length to
/// `reddb_file::btree_projected_cell_len`. The function takes no `Pager`.
#[inline]
// NOTE(review): no caller is visible in this file; presumably the lint is
// silenced for builds that do not use the projection. Confirm it is still needed.
#[allow(dead_code)]
pub fn projected_cell_len(input: &[u8]) -> usize {
    let encoded_len = value_codec::would_encode_to(input);
    let raw_len = input.len();
    reddb_file::btree_projected_cell_len(raw_len, encoded_len)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::pager::Pager;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Builds a temp-dir path unique to this process and call, so tests
    /// running in parallel never share a database file.
    fn temp_db_path() -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut path = std::env::temp_dir();
        path.push(format!(
            "reddb_value_layout_test_{}_{}.db",
            std::process::id(),
            id
        ));
        path
    }

    /// Best-effort removal of the database file and its pager sidecars.
    /// Errors are ignored on purpose: the files may simply not exist.
    fn cleanup(path: &std::path::Path) {
        let _ = std::fs::remove_file(path);
        for sidecar in reddb_file::layout::pager_shadow_sidecar_paths(path) {
            let _ = std::fs::remove_file(sidecar);
        }
    }

    /// Removes the test database files when dropped.
    ///
    /// Cleaning up from `Drop` rather than from a trailing `cleanup` call
    /// means a failing assertion, which unwinds past the end of the test,
    /// no longer leaves files behind in the temp directory.
    struct TempDb(PathBuf);

    impl Drop for TempDb {
        fn drop(&mut self) {
            cleanup(&self.0);
        }
    }

    /// Opens a pager on a fresh temp database.
    ///
    /// Bind the result as `let (_db, pager) = fresh_pager();`. Locals are
    /// dropped in reverse declaration order, so the pager is closed first and
    /// the guard deletes the files afterwards.
    fn fresh_pager() -> (TempDb, Pager) {
        let path = temp_db_path();
        cleanup(&path);
        // Create the guard before opening so a failed open still cleans up.
        let guard = TempDb(path);
        let pager = Pager::open_default(&guard.0).unwrap();
        (guard, pager)
    }

    /// A value just under the threshold is stored as flag byte + raw bytes.
    #[test]
    fn inline_raw_round_trip_below_threshold() {
        let (_db, pager) = fresh_pager();
        let value = vec![0xABu8; OVERFLOW_THRESHOLD - 1];
        let stored = encode(&pager, &value).unwrap();
        assert_eq!(
            reddb_file::decode_btree_value_cell(&stored).unwrap(),
            reddb_file::BTreeValueCell::Inline {
                is_compressed: false,
                payload: value.as_slice(),
            }
        );
        assert_eq!(stored.len(), 1 + value.len());
        let decoded = decode(&pager, &stored).unwrap();
        assert_eq!(decoded, value);
    }

    /// The threshold itself is inclusive: exactly that many bytes stay inline.
    #[test]
    fn inline_raw_at_exact_threshold() {
        let (_db, pager) = fresh_pager();
        let value = vec![0x7Eu8; OVERFLOW_THRESHOLD];
        let stored = encode(&pager, &value).unwrap();
        assert!(matches!(
            reddb_file::decode_btree_value_cell(&stored).unwrap(),
            reddb_file::BTreeValueCell::Inline {
                is_compressed: false,
                ..
            }
        ));
        assert_eq!(decode(&pager, &stored).unwrap(), value);
    }

    /// Highly repetitive data above the threshold compresses enough to stay
    /// inline.
    #[test]
    fn compressible_above_threshold_inlines_compressed() {
        let (_db, pager) = fresh_pager();
        let value = "the quick brown fox jumps over the lazy dog\n"
            .repeat(1024)
            .into_bytes();
        assert!(value.len() > OVERFLOW_THRESHOLD);
        let stored = encode(&pager, &value).unwrap();
        assert!(matches!(
            reddb_file::decode_btree_value_cell(&stored).unwrap(),
            reddb_file::BTreeValueCell::Inline {
                is_compressed: true,
                ..
            }
        ));
        assert!(
            stored.len() <= 1 + OVERFLOW_THRESHOLD,
            "compressed cell must fit inline budget"
        );
        let decoded = decode(&pager, &stored).unwrap();
        assert_eq!(decoded, value);
    }

    /// Pseudo-random (xorshift) bytes do not compress, so the value spills to
    /// an uncompressed overflow chain behind a fixed-size pointer cell.
    #[test]
    fn incompressible_above_threshold_spills_raw() {
        let (_db, pager) = fresh_pager();
        let mut state: u64 = 0x1234_5678_9ABC_DEF0;
        let value: Vec<u8> = (0..5 * 1024 * 1024)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                state as u8
            })
            .collect();
        let stored = encode(&pager, &value).unwrap();
        assert!(matches!(
            reddb_file::decode_btree_value_cell(&stored).unwrap(),
            reddb_file::BTreeValueCell::Pointer {
                is_compressed: false,
                ..
            }
        ));
        assert_eq!(stored.len(), POINTER_CELL_LEN);
        let decoded = decode(&pager, &stored).unwrap();
        assert_eq!(decoded.len(), value.len());
        assert_eq!(decoded, value);
    }

    /// Oversized values are rejected before any page is allocated.
    #[test]
    fn value_above_max_rejected_without_allocation() {
        let (_db, pager) = fresh_pager();
        let before = pager.page_count().unwrap();
        let value = vec![0u8; MAX_VALUE_SIZE + 1];
        let err = encode(&pager, &value).unwrap_err();
        assert!(matches!(err, ValueLayoutError::ValueTooLarge(_)));
        let after = pager.page_count().unwrap();
        assert_eq!(before, after, "rejected value must not allocate pages");
    }

    /// `pointer_head` yields a page id only for pointer cells, compressed or
    /// not.
    #[test]
    fn pointer_head_extracts_head_id_only_for_pointer_cells() {
        let inline = reddb_file::encode_btree_inline_raw(&[1, 2, 3]);
        assert_eq!(pointer_head(&inline), None);
        let inline_compressed = reddb_file::encode_btree_inline_compressed(&[0, 0, 0, 5]);
        assert_eq!(pointer_head(&inline_compressed), None);
        let cell = reddb_file::encode_btree_pointer(0x0102_0304, 0, false);
        assert_eq!(pointer_head(&cell), Some(0x0102_0304));
        let compressed_cell = reddb_file::encode_btree_pointer(0x0102_0304, 0, true);
        assert_eq!(pointer_head(&compressed_cell), Some(0x0102_0304));
    }

    /// The empty value encodes to a lone zero flag byte and decodes to empty.
    #[test]
    fn empty_value_round_trips() {
        let (_db, pager) = fresh_pager();
        let stored = encode(&pager, &[]).unwrap();
        assert_eq!(stored, vec![0u8]);
        assert!(decode(&pager, &stored).unwrap().is_empty());
    }
}