reddb_server/storage/engine/btree/
value_layout.rs1use crate::storage::engine::overflow::{OverflowChain, OverflowError};
29use crate::storage::engine::pager::Pager;
30use crate::storage::engine::vector_btree::value_codec;
31use reddb_file::BTreeValueCell;
32
33pub const OVERFLOW_THRESHOLD: usize = reddb_file::BTREE_VALUE_OVERFLOW_THRESHOLD;
37
38pub const MAX_VALUE_SIZE: usize = reddb_file::BTREE_VALUE_MAX_SIZE;
42
43pub const POINTER_CELL_LEN: usize = reddb_file::BTREE_VALUE_POINTER_CELL_LEN;
45
46#[derive(Debug)]
48pub enum ValueLayoutError {
49 ValueTooLarge(usize),
51 UnknownFlag(u8),
53 TruncatedPointer { got: usize },
55 Codec(value_codec::ValueCodecError),
57 Overflow(OverflowError),
59}
60
61impl std::fmt::Display for ValueLayoutError {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 match self {
64 Self::ValueTooLarge(n) => {
65 write!(f, "value too large: {} bytes (max {})", n, MAX_VALUE_SIZE)
66 }
67 Self::UnknownFlag(b) => write!(f, "unknown leaf-cell flag byte: 0b{:08b}", b),
68 Self::TruncatedPointer { got } => {
69 write!(
70 f,
71 "pointer cell truncated: need {} bytes after flag, got {}",
72 POINTER_CELL_LEN - 1,
73 got
74 )
75 }
76 Self::Codec(e) => write!(f, "value codec: {}", e),
77 Self::Overflow(e) => write!(f, "overflow chain: {}", e),
78 }
79 }
80}
81
82impl std::error::Error for ValueLayoutError {}
83
84impl From<value_codec::ValueCodecError> for ValueLayoutError {
85 fn from(e: value_codec::ValueCodecError) -> Self {
86 Self::Codec(e)
87 }
88}
89
90impl From<reddb_file::BTreeValueCellError> for ValueLayoutError {
91 fn from(e: reddb_file::BTreeValueCellError) -> Self {
92 match e {
93 reddb_file::BTreeValueCellError::UnknownFlag(flag) => Self::UnknownFlag(flag),
94 reddb_file::BTreeValueCellError::TruncatedPointer { got } => {
95 Self::TruncatedPointer { got }
96 }
97 }
98 }
99}
100
101impl From<OverflowError> for ValueLayoutError {
102 fn from(e: OverflowError) -> Self {
103 Self::Overflow(e)
104 }
105}
106
107pub fn encode(pager: &Pager, value: &[u8]) -> Result<Vec<u8>, ValueLayoutError> {
111 if value.len() > MAX_VALUE_SIZE {
112 return Err(ValueLayoutError::ValueTooLarge(value.len()));
113 }
114
115 if value.len() <= OVERFLOW_THRESHOLD {
119 return Ok(reddb_file::encode_btree_inline_raw(value));
120 }
121
122 let (codec_flag, codec_bytes) = value_codec::encode(value);
125
126 if codec_flag == value_codec::ValueFlag::Lz4 && codec_bytes.len() <= OVERFLOW_THRESHOLD {
131 return Ok(reddb_file::encode_btree_inline_compressed(&codec_bytes));
132 }
133
134 let is_compressed = codec_flag == value_codec::ValueFlag::Lz4;
139 let chain = OverflowChain::new(pager);
140 let (head, total_len) = chain.store(&codec_bytes)?;
141
142 Ok(reddb_file::encode_btree_pointer(
143 head,
144 total_len,
145 is_compressed,
146 ))
147}
148
149pub fn decode(pager: &Pager, stored: &[u8]) -> Result<Vec<u8>, ValueLayoutError> {
152 match reddb_file::decode_btree_value_cell(stored)? {
153 BTreeValueCell::Pointer {
154 is_compressed,
155 head_page_id,
156 total_len,
157 } => {
158 let chain = OverflowChain::new(pager);
159 let chain_bytes = chain.read(head_page_id, total_len)?;
160 if is_compressed {
161 Ok(value_codec::decode(
162 value_codec::ValueFlag::Lz4,
163 &chain_bytes,
164 )?)
165 } else {
166 Ok(chain_bytes)
167 }
168 }
169 BTreeValueCell::Inline {
170 is_compressed,
171 payload,
172 } => Ok(reddb_file::decode_btree_inline_payload(
173 is_compressed,
174 payload,
175 )?),
176 }
177}
178
179pub fn pointer_head(stored: &[u8]) -> Option<u32> {
185 reddb_file::btree_value_pointer_head(stored)
186}
187
188#[inline]
192#[allow(dead_code)]
193pub fn projected_cell_len(input: &[u8]) -> usize {
194 let codec_len = value_codec::would_encode_to(input);
195 reddb_file::btree_projected_cell_len(input.len(), codec_len)
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201 use crate::storage::engine::pager::Pager;
202 use std::path::PathBuf;
203 use std::sync::atomic::{AtomicU64, Ordering};
204
205 fn temp_db_path() -> PathBuf {
206 static COUNTER: AtomicU64 = AtomicU64::new(0);
207 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
208 let mut path = std::env::temp_dir();
209 path.push(format!(
210 "reddb_value_layout_test_{}_{}.db",
211 std::process::id(),
212 id
213 ));
214 path
215 }
216
217 fn cleanup(path: &std::path::Path) {
218 let _ = std::fs::remove_file(path);
219 for sidecar in reddb_file::layout::pager_shadow_sidecar_paths(path) {
220 let _ = std::fs::remove_file(sidecar);
221 }
222 }
223
224 fn fresh_pager() -> (Pager, PathBuf) {
225 let path = temp_db_path();
226 cleanup(&path);
227 let pager = Pager::open_default(&path).unwrap();
228 (pager, path)
229 }
230
231 #[test]
232 fn inline_raw_round_trip_below_threshold() {
233 let (pager, path) = fresh_pager();
234 let value = vec![0xABu8; OVERFLOW_THRESHOLD - 1];
235 let stored = encode(&pager, &value).unwrap();
236 assert_eq!(
237 reddb_file::decode_btree_value_cell(&stored).unwrap(),
238 reddb_file::BTreeValueCell::Inline {
239 is_compressed: false,
240 payload: value.as_slice(),
241 }
242 );
243 assert_eq!(stored.len(), 1 + value.len());
244 let decoded = decode(&pager, &stored).unwrap();
245 assert_eq!(decoded, value);
246 cleanup(&path);
247 }
248
249 #[test]
250 fn inline_raw_at_exact_threshold() {
251 let (pager, path) = fresh_pager();
252 let value = vec![0x7Eu8; OVERFLOW_THRESHOLD];
253 let stored = encode(&pager, &value).unwrap();
254 assert!(matches!(
255 reddb_file::decode_btree_value_cell(&stored).unwrap(),
256 reddb_file::BTreeValueCell::Inline {
257 is_compressed: false,
258 ..
259 }
260 ));
261 assert_eq!(decode(&pager, &stored).unwrap(), value);
262 cleanup(&path);
263 }
264
265 #[test]
266 fn compressible_above_threshold_inlines_compressed() {
267 let (pager, path) = fresh_pager();
268 let value = "the quick brown fox jumps over the lazy dog\n"
271 .repeat(1024)
272 .into_bytes();
273 assert!(value.len() > OVERFLOW_THRESHOLD);
274 let stored = encode(&pager, &value).unwrap();
275 assert!(matches!(
276 reddb_file::decode_btree_value_cell(&stored).unwrap(),
277 reddb_file::BTreeValueCell::Inline {
278 is_compressed: true,
279 ..
280 }
281 ));
282 assert!(
283 stored.len() <= 1 + OVERFLOW_THRESHOLD,
284 "compressed cell must fit inline budget"
285 );
286 let decoded = decode(&pager, &stored).unwrap();
287 assert_eq!(decoded, value);
288 cleanup(&path);
289 }
290
291 #[test]
292 fn incompressible_above_threshold_spills_raw() {
293 let (pager, path) = fresh_pager();
294 let mut state: u64 = 0x1234_5678_9ABC_DEF0;
296 let value: Vec<u8> = (0..5 * 1024 * 1024)
297 .map(|_| {
298 state ^= state << 13;
299 state ^= state >> 7;
300 state ^= state << 17;
301 state as u8
302 })
303 .collect();
304 let stored = encode(&pager, &value).unwrap();
305 assert!(matches!(
306 reddb_file::decode_btree_value_cell(&stored).unwrap(),
307 reddb_file::BTreeValueCell::Pointer {
308 is_compressed: false,
309 ..
310 }
311 ));
312 assert_eq!(stored.len(), POINTER_CELL_LEN);
313 let decoded = decode(&pager, &stored).unwrap();
314 assert_eq!(decoded.len(), value.len());
315 assert_eq!(decoded, value);
316 cleanup(&path);
317 }
318
319 #[test]
320 fn value_above_max_rejected_without_allocation() {
321 let (pager, path) = fresh_pager();
322 let before = pager.page_count().unwrap();
323 let value = vec![0u8; MAX_VALUE_SIZE + 1];
324 let err = encode(&pager, &value).unwrap_err();
325 assert!(matches!(err, ValueLayoutError::ValueTooLarge(_)));
326 let after = pager.page_count().unwrap();
327 assert_eq!(before, after, "rejected value must not allocate pages");
328 cleanup(&path);
329 }
330
331 #[test]
332 fn pointer_head_extracts_head_id_only_for_pointer_cells() {
333 let inline = reddb_file::encode_btree_inline_raw(&[1, 2, 3]);
334 assert_eq!(pointer_head(&inline), None);
335 let inline_compressed = reddb_file::encode_btree_inline_compressed(&[0, 0, 0, 5]);
336 assert_eq!(pointer_head(&inline_compressed), None);
337
338 let cell = reddb_file::encode_btree_pointer(0x0102_0304, 0, false);
339 assert_eq!(pointer_head(&cell), Some(0x0102_0304));
340 let compressed_cell = reddb_file::encode_btree_pointer(0x0102_0304, 0, true);
341 assert_eq!(pointer_head(&compressed_cell), Some(0x0102_0304));
342 }
343
344 #[test]
345 fn empty_value_round_trips() {
346 let (pager, path) = fresh_pager();
347 let stored = encode(&pager, &[]).unwrap();
348 assert_eq!(stored, vec![0u8]);
349 assert!(decode(&pager, &stored).unwrap().is_empty());
350 cleanup(&path);
351 }
352}