reddb_server/storage/engine/btree/
value_layout.rs1use crate::storage::engine::overflow::{OverflowChain, OverflowError};
42use crate::storage::engine::pager::Pager;
43use crate::storage::engine::vector_btree::value_codec;
44
/// Largest byte length (1 KiB) that is stored inline in a leaf cell.
///
/// `encode` keeps raw values of at most this length inline, and keeps a
/// compressed value inline only when its codec output is at most this length.
pub const OVERFLOW_THRESHOLD: usize = 1 << 10;

/// Hard upper bound (256 MiB) on the length of a value accepted by `encode`.
pub const MAX_VALUE_SIZE: usize = 256 << 20;

/// Flag bit 0: the cell body is an overflow pointer rather than value bytes.
const FLAG_POINTER: u8 = 0x01;
/// Flag bit 1: the stored bytes (inline or spilled) are codec-compressed.
const FLAG_COMPRESSED: u8 = 0x02;
/// Every flag bit that has no assigned meaning; `decode` rejects a cell whose
/// flag byte has any of these set.
const FLAG_RESERVED_MASK: u8 = 0xFF ^ (FLAG_POINTER | FLAG_COMPRESSED);

/// Size of a pointer cell's payload: a 4-byte head id followed by an 8-byte
/// total length.
const POINTER_PAYLOAD_LEN: usize = std::mem::size_of::<u32>() + std::mem::size_of::<u64>();

/// Full size of a pointer cell: the payload plus the leading flag byte.
pub const POINTER_CELL_LEN: usize = POINTER_PAYLOAD_LEN + 1;
67
/// Errors returned by `encode` and `decode` in this module.
#[derive(Debug)]
pub enum ValueLayoutError {
    /// The value passed to `encode` is longer than `MAX_VALUE_SIZE`.
    /// Carries the rejected length in bytes.
    ValueTooLarge(usize),
    /// The cell's flag byte has a bit set outside the pointer/compressed bits.
    /// Carries the raw flag byte.
    UnknownFlag(u8),
    /// A pointer cell's payload is not exactly `POINTER_PAYLOAD_LEN` bytes.
    /// `got` is the number of bytes found after the flag byte; note that
    /// `decode` also reports this for a payload that is too long.
    TruncatedPointer { got: usize },
    /// The value codec reported an error.
    Codec(value_codec::ValueCodecError),
    /// Storing to or reading from the overflow chain failed.
    Overflow(OverflowError),
}
82
83impl std::fmt::Display for ValueLayoutError {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 match self {
86 Self::ValueTooLarge(n) => {
87 write!(f, "value too large: {} bytes (max {})", n, MAX_VALUE_SIZE)
88 }
89 Self::UnknownFlag(b) => write!(f, "unknown leaf-cell flag byte: 0b{:08b}", b),
90 Self::TruncatedPointer { got } => {
91 write!(
92 f,
93 "pointer cell truncated: need {} bytes after flag, got {}",
94 POINTER_PAYLOAD_LEN, got
95 )
96 }
97 Self::Codec(e) => write!(f, "value codec: {}", e),
98 Self::Overflow(e) => write!(f, "overflow chain: {}", e),
99 }
100 }
101}
102
// Marks `ValueLayoutError` as a standard error type. The impl body is empty,
// so every `Error` method keeps its default behavior.
// NOTE(review): `source()` could expose the wrapped codec/overflow error if
// those types implement `std::error::Error` — confirm before adding.
impl std::error::Error for ValueLayoutError {}
104
105impl From<value_codec::ValueCodecError> for ValueLayoutError {
106 fn from(e: value_codec::ValueCodecError) -> Self {
107 Self::Codec(e)
108 }
109}
110
111impl From<OverflowError> for ValueLayoutError {
112 fn from(e: OverflowError) -> Self {
113 Self::Overflow(e)
114 }
115}
116
117pub fn encode(pager: &Pager, value: &[u8]) -> Result<Vec<u8>, ValueLayoutError> {
121 if value.len() > MAX_VALUE_SIZE {
122 return Err(ValueLayoutError::ValueTooLarge(value.len()));
123 }
124
125 if value.len() <= OVERFLOW_THRESHOLD {
129 let mut out = Vec::with_capacity(1 + value.len());
130 out.push(0);
131 out.extend_from_slice(value);
132 return Ok(out);
133 }
134
135 let (codec_flag, codec_bytes) = value_codec::encode(value);
138
139 if codec_flag == value_codec::ValueFlag::Lz4 && codec_bytes.len() <= OVERFLOW_THRESHOLD {
144 let mut out = Vec::with_capacity(1 + codec_bytes.len());
145 out.push(FLAG_COMPRESSED);
146 out.extend_from_slice(&codec_bytes);
147 return Ok(out);
148 }
149
150 let is_compressed = codec_flag == value_codec::ValueFlag::Lz4;
155 let chain = OverflowChain::new(pager);
156 let (head, total_len) = chain.store(&codec_bytes)?;
157
158 let mut flag = FLAG_POINTER;
159 if is_compressed {
160 flag |= FLAG_COMPRESSED;
161 }
162 let mut out = Vec::with_capacity(POINTER_CELL_LEN);
163 out.push(flag);
164 out.extend_from_slice(&head.to_le_bytes());
165 out.extend_from_slice(&total_len.to_le_bytes());
166 Ok(out)
167}
168
169pub fn decode(pager: &Pager, stored: &[u8]) -> Result<Vec<u8>, ValueLayoutError> {
172 if stored.is_empty() {
173 return Ok(Vec::new());
177 }
178
179 let flag = stored[0];
180 if flag & FLAG_RESERVED_MASK != 0 {
181 return Err(ValueLayoutError::UnknownFlag(flag));
182 }
183 let is_pointer = flag & FLAG_POINTER != 0;
184 let is_compressed = flag & FLAG_COMPRESSED != 0;
185 let payload = &stored[1..];
186
187 if is_pointer {
188 if payload.len() != POINTER_PAYLOAD_LEN {
189 return Err(ValueLayoutError::TruncatedPointer { got: payload.len() });
190 }
191 let head = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
192 let total_len = u64::from_le_bytes([
193 payload[4],
194 payload[5],
195 payload[6],
196 payload[7],
197 payload[8],
198 payload[9],
199 payload[10],
200 payload[11],
201 ]);
202 let chain = OverflowChain::new(pager);
203 let chain_bytes = chain.read(head, total_len)?;
204 if is_compressed {
205 Ok(value_codec::decode(
206 value_codec::ValueFlag::Lz4,
207 &chain_bytes,
208 )?)
209 } else {
210 Ok(chain_bytes)
211 }
212 } else if is_compressed {
213 Ok(value_codec::decode(value_codec::ValueFlag::Lz4, payload)?)
214 } else {
215 Ok(payload.to_vec())
216 }
217}
218
219pub fn pointer_head(stored: &[u8]) -> Option<u32> {
225 if stored.is_empty() {
226 return None;
227 }
228 let flag = stored[0];
229 if flag & FLAG_RESERVED_MASK != 0 {
230 return None;
231 }
232 if flag & FLAG_POINTER == 0 {
233 return None;
234 }
235 if stored.len() < 1 + POINTER_PAYLOAD_LEN {
236 return None;
237 }
238 let payload = &stored[1..];
239 Some(u32::from_le_bytes([
240 payload[0], payload[1], payload[2], payload[3],
241 ]))
242}
243
244#[inline]
248#[allow(dead_code)]
249pub fn projected_cell_len(input: &[u8]) -> usize {
250 if input.len() <= OVERFLOW_THRESHOLD {
251 return 1 + input.len();
252 }
253 let codec_len = value_codec::would_encode_to(input);
254 if codec_len < input.len() && codec_len <= OVERFLOW_THRESHOLD {
259 1 + codec_len
260 } else {
261 POINTER_CELL_LEN
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::pager::Pager;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Returns a temp-dir path that is unique per process and per call, so
    /// tests running in parallel never share a database file.
    fn temp_db_path() -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut path = std::env::temp_dir();
        path.push(format!(
            "reddb_value_layout_test_{}_{}.db",
            std::process::id(),
            id
        ));
        path
    }

    /// Best-effort removal of the database file and its `-hdr`, `-meta` and
    /// `-dwb` siblings. Errors (including "file not found") are ignored.
    fn cleanup(path: &std::path::Path) {
        let _ = std::fs::remove_file(path);
        for suffix in ["-hdr", "-meta", "-dwb"] {
            let mut p = path.to_path_buf().into_os_string();
            p.push(suffix);
            let _ = std::fs::remove_file(&p);
        }
    }

    /// Removes a test database's files when dropped.
    ///
    /// Tests used to call `cleanup` as their last statement, which is skipped
    /// when an assertion or `unwrap` panics and leaves files behind in the
    /// temp directory. Dropping runs during unwinding too, so a failing test
    /// now cleans up as well.
    struct TempDb {
        path: PathBuf,
    }

    impl Drop for TempDb {
        fn drop(&mut self) {
            cleanup(&self.path);
        }
    }

    /// Opens a pager on a fresh path and returns it together with the guard
    /// that deletes its files.
    ///
    /// Bind the guard to a named variable such as `_db`; binding it to a bare
    /// `_` would drop it immediately.
    fn fresh_pager() -> (Pager, TempDb) {
        let path = temp_db_path();
        // Clear any leftovers at this path before opening.
        cleanup(&path);
        let pager = Pager::open_default(&path).unwrap();
        (pager, TempDb { path })
    }

    #[test]
    fn inline_raw_round_trip_below_threshold() {
        let (pager, _db) = fresh_pager();
        let value = vec![0xABu8; OVERFLOW_THRESHOLD - 1];
        let stored = encode(&pager, &value).unwrap();
        assert_eq!(stored[0], 0, "inline raw flag must be zero");
        assert_eq!(stored.len(), 1 + value.len());
        let decoded = decode(&pager, &stored).unwrap();
        assert_eq!(decoded, value);
    }

    #[test]
    fn inline_raw_at_exact_threshold() {
        let (pager, _db) = fresh_pager();
        let value = vec![0x7Eu8; OVERFLOW_THRESHOLD];
        let stored = encode(&pager, &value).unwrap();
        assert_eq!(stored[0], 0);
        assert_eq!(decode(&pager, &stored).unwrap(), value);
    }

    #[test]
    fn compressible_above_threshold_inlines_compressed() {
        let (pager, _db) = fresh_pager();
        let value = "the quick brown fox jumps over the lazy dog\n"
            .repeat(1024)
            .into_bytes();
        assert!(value.len() > OVERFLOW_THRESHOLD);
        let stored = encode(&pager, &value).unwrap();
        assert_eq!(
            stored[0], FLAG_COMPRESSED,
            "highly repetitive payload must inline compressed"
        );
        assert!(
            stored.len() <= 1 + OVERFLOW_THRESHOLD,
            "compressed cell must fit inline budget"
        );
        let decoded = decode(&pager, &stored).unwrap();
        assert_eq!(decoded, value);
    }

    #[test]
    fn incompressible_above_threshold_spills_raw() {
        let (pager, _db) = fresh_pager();
        // Xorshift-style generator: deterministic bytes the test expects to
        // be stored uncompressed.
        let mut state: u64 = 0x1234_5678_9ABC_DEF0;
        let value: Vec<u8> = (0..5 * 1024 * 1024)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                state as u8
            })
            .collect();
        let stored = encode(&pager, &value).unwrap();
        assert_eq!(
            stored[0] & FLAG_POINTER,
            FLAG_POINTER,
            "incompressible spill must set pointer flag"
        );
        assert_eq!(
            stored[0] & FLAG_COMPRESSED,
            0,
            "incompressible payload must not claim to be compressed"
        );
        assert_eq!(stored.len(), POINTER_CELL_LEN);
        let decoded = decode(&pager, &stored).unwrap();
        assert_eq!(decoded.len(), value.len());
        assert_eq!(decoded, value);
    }

    #[test]
    fn value_above_max_rejected_without_allocation() {
        let (pager, _db) = fresh_pager();
        let before = pager.page_count().unwrap();
        let value = vec![0u8; MAX_VALUE_SIZE + 1];
        let err = encode(&pager, &value).unwrap_err();
        assert!(matches!(err, ValueLayoutError::ValueTooLarge(_)));
        let after = pager.page_count().unwrap();
        assert_eq!(before, after, "rejected value must not allocate pages");
    }

    #[test]
    fn decode_rejects_reserved_flag_bits() {
        let (pager, _db) = fresh_pager();
        let stored = vec![0b0000_0100u8];
        let err = decode(&pager, &stored).unwrap_err();
        assert!(matches!(err, ValueLayoutError::UnknownFlag(_)));
    }

    #[test]
    fn pointer_head_extracts_head_id_only_for_pointer_cells() {
        // Inline raw cell.
        assert_eq!(pointer_head(&[0x00, 1, 2, 3]), None);
        // Inline compressed cell.
        assert_eq!(pointer_head(&[0x02, 0, 0, 0, 5, 0xff, 0xfe]), None);
        // Empty cell.
        assert_eq!(pointer_head(&[]), None);
        // Reserved flag bit set.
        assert_eq!(pointer_head(&[0b0000_0100]), None);

        // Well-formed pointer cell, with and without the compressed bit.
        let mut cell = vec![FLAG_POINTER];
        cell.extend_from_slice(&0x0102_0304u32.to_le_bytes());
        cell.extend_from_slice(&0u64.to_le_bytes());
        assert_eq!(pointer_head(&cell), Some(0x0102_0304));
        cell[0] = FLAG_POINTER | FLAG_COMPRESSED;
        assert_eq!(pointer_head(&cell), Some(0x0102_0304));

        // Pointer flag but a truncated payload.
        assert_eq!(pointer_head(&[FLAG_POINTER, 1, 2]), None);
    }

    #[test]
    fn empty_value_round_trips() {
        let (pager, _db) = fresh_pager();
        let stored = encode(&pager, &[]).unwrap();
        assert_eq!(stored, vec![0u8]);
        assert!(decode(&pager, &stored).unwrap().is_empty());
    }
}