Skip to main content

reddb_server/storage/engine/vector_btree/
value_codec.rs

1//! Large-value codec for the vector B-tree large-value path.
2//!
3//! Self-contained byte-slice codec: knows nothing about pages, MVCC,
4//! or overflow chains. `encode` runs LZ4 and only keeps the
5//! compressed bytes if they are strictly smaller than the input —
6//! otherwise it returns the input verbatim under a `Raw` flag, so
7//! callers never pay negative-savings storage.
8//!
9//! The flag is a single byte. The algorithm (LZ4) lives behind this
10//! interface so it can change later without touching callers.
11
12/// One-byte tag stored alongside the encoded payload. Distinguishes
13/// raw passthrough from compressed bytes so `decode` knows what to do
14/// without re-running compression.
15#[repr(u8)]
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum ValueFlag {
18    /// Payload bytes are the original input, byte-identical.
19    Raw = 0,
20    /// Payload bytes are an LZ4 block; the original length is
21    /// prepended as a little-endian `u32` so `decode` can size the
22    /// output buffer without a separate length sidecar.
23    Lz4 = 1,
24}
25
26impl ValueFlag {
27    /// Convert from the on-disk tag byte. Unknown tags are rejected.
28    pub fn from_byte(b: u8) -> Result<Self, ValueCodecError> {
29        match b {
30            0 => Ok(ValueFlag::Raw),
31            1 => Ok(ValueFlag::Lz4),
32            other => Err(ValueCodecError::UnknownFlag(other)),
33        }
34    }
35}
36
37/// Errors returned by [`decode`]. Encoding is infallible — the codec
38/// always has a Raw fallback when LZ4 would grow the input.
39#[derive(Debug, PartialEq, Eq)]
40pub enum ValueCodecError {
41    UnknownFlag(u8),
42    TruncatedHeader,
43    Lz4Decode(String),
44}
45
46impl std::fmt::Display for ValueCodecError {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        match self {
49            ValueCodecError::UnknownFlag(b) => write!(f, "unknown value codec flag: {}", b),
50            ValueCodecError::TruncatedHeader => write!(
51                f,
52                "compressed payload truncated: need at least 4 bytes for length header"
53            ),
54            ValueCodecError::Lz4Decode(msg) => write!(f, "lz4 decode failed: {}", msg),
55        }
56    }
57}
58
59impl std::error::Error for ValueCodecError {}
60
61/// Encode `input` for storage. Returns the flag indicating which
62/// representation is stored, plus the bytes themselves. When LZ4
63/// would not shrink the input (including the 4-byte length header),
64/// the codec returns `(Raw, input.to_vec())` — never a longer payload.
65pub fn encode(input: &[u8]) -> (ValueFlag, Vec<u8>) {
66    // Empty input round-trips trivially through Raw. lz4_flex would
67    // happily produce a 1-byte token but that is larger than zero, so
68    // the size guard below would also catch this — special-casing
69    // keeps the hot path obvious.
70    if input.is_empty() {
71        return (ValueFlag::Raw, Vec::new());
72    }
73
74    let compressed = lz4_flex::compress(input);
75    // Encoded form is 4-byte original length + compressed bytes. Only
76    // worth keeping if the *total* is strictly smaller than the raw
77    // input (equal size is not a win — extra decode work for nothing).
78    if compressed.len() + 4 < input.len() {
79        let mut out = Vec::with_capacity(compressed.len() + 4);
80        out.extend_from_slice(&(input.len() as u32).to_le_bytes());
81        out.extend_from_slice(&compressed);
82        (ValueFlag::Lz4, out)
83    } else {
84        (ValueFlag::Raw, input.to_vec())
85    }
86}
87
88/// Predicate companion to [`encode`]: would the encoded form fit in
89/// `limit` bytes? Returns the on-disk size the codec *would* use,
90/// without committing to a spill decision. Callers compare the
91/// returned size to their page budget and choose to spill or inline
92/// without re-running compression — encode is idempotent so re-running
93/// after a yes answer is cheap, but this keeps the budget check
94/// decoupled from the commit.
95pub fn would_encode_to(input: &[u8]) -> usize {
96    if input.is_empty() {
97        return 0;
98    }
99    let compressed_len = lz4_flex::compress(input).len();
100    let lz4_total = compressed_len + 4;
101    if lz4_total < input.len() {
102        lz4_total
103    } else {
104        input.len()
105    }
106}
107
108/// Decode a `(flag, bytes)` pair produced by [`encode`] back to the
109/// original input bytes.
110pub fn decode(flag: ValueFlag, bytes: &[u8]) -> Result<Vec<u8>, ValueCodecError> {
111    match flag {
112        ValueFlag::Raw => Ok(bytes.to_vec()),
113        ValueFlag::Lz4 => {
114            if bytes.len() < 4 {
115                return Err(ValueCodecError::TruncatedHeader);
116            }
117            let raw_len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
118            lz4_flex::decompress(&bytes[4..], raw_len)
119                .map_err(|e| ValueCodecError::Lz4Decode(e.to_string()))
120        }
121    }
122}
123
124#[cfg(test)]
125mod tests {
126    use super::*;
127
128    #[test]
129    fn round_trip_compressible_text() {
130        let input = "the quick brown fox jumps over the lazy dog "
131            .repeat(64)
132            .into_bytes();
133        let (flag, bytes) = encode(&input);
134        assert_eq!(flag, ValueFlag::Lz4, "highly repetitive text must compress");
135        assert!(
136            bytes.len() < input.len(),
137            "stored size {} must be less than input {}",
138            bytes.len(),
139            input.len()
140        );
141        let decoded = decode(flag, &bytes).expect("decode");
142        assert_eq!(decoded, input);
143    }
144
145    #[test]
146    fn round_trip_incompressible_random() {
147        // Pseudo-random bytes from a tiny xorshift — deterministic so
148        // the test never flakes, and incompressible enough that LZ4
149        // cannot beat the raw input + 4-byte header.
150        let mut state: u64 = 0x9E37_79B9_7F4A_7C15;
151        let input: Vec<u8> = (0..512)
152            .map(|_| {
153                state ^= state << 13;
154                state ^= state >> 7;
155                state ^= state << 17;
156                state as u8
157            })
158            .collect();
159        let (flag, bytes) = encode(&input);
160        assert_eq!(
161            flag,
162            ValueFlag::Raw,
163            "incompressible input must fall back to raw"
164        );
165        assert_eq!(bytes, input, "raw bytes must be byte-identical");
166        let decoded = decode(flag, &bytes).expect("decode");
167        assert_eq!(decoded, input);
168    }
169
170    #[test]
171    fn empty_input_round_trips_as_raw() {
172        let (flag, bytes) = encode(&[]);
173        assert_eq!(flag, ValueFlag::Raw);
174        assert!(bytes.is_empty());
175        let decoded = decode(flag, &bytes).expect("decode empty");
176        assert!(decoded.is_empty());
177    }
178
179    #[test]
180    fn exact_threshold_falls_back_to_raw() {
181        // A single byte cannot shrink under LZ4 + a 4-byte length
182        // header, so the codec must emit Raw rather than waste 4
183        // bytes for zero savings.
184        let input = vec![0x42u8];
185        let (flag, bytes) = encode(&input);
186        assert_eq!(flag, ValueFlag::Raw);
187        assert_eq!(bytes, input);
188    }
189
190    #[test]
191    fn flag_distinguishes_compressed_and_raw() {
192        let compressible = vec![b'a'; 256];
193        let (flag_c, _) = encode(&compressible);
194        let (flag_r, _) = encode(&[0xAB, 0xCD, 0xEF]);
195        assert_eq!(flag_c, ValueFlag::Lz4);
196        assert_eq!(flag_r, ValueFlag::Raw);
197        assert_ne!(flag_c, flag_r);
198    }
199
200    #[test]
201    fn flag_byte_round_trips() {
202        assert_eq!(ValueFlag::from_byte(0).unwrap(), ValueFlag::Raw);
203        assert_eq!(ValueFlag::from_byte(1).unwrap(), ValueFlag::Lz4);
204        assert_eq!(
205            ValueFlag::from_byte(255).unwrap_err(),
206            ValueCodecError::UnknownFlag(255)
207        );
208    }
209
210    #[test]
211    fn would_encode_to_matches_actual_encode() {
212        let compressible = vec![b'x'; 1024];
213        let (_, bytes) = encode(&compressible);
214        assert_eq!(would_encode_to(&compressible), bytes.len());
215
216        let mut state: u64 = 0xDEAD_BEEF_1234_5678;
217        let random: Vec<u8> = (0..256)
218            .map(|_| {
219                state ^= state << 13;
220                state ^= state >> 7;
221                state ^= state << 17;
222                state as u8
223            })
224            .collect();
225        let (_, bytes) = encode(&random);
226        assert_eq!(would_encode_to(&random), bytes.len());
227
228        assert_eq!(would_encode_to(&[]), 0);
229    }
230
231    #[test]
232    fn would_encode_to_decouples_from_spill_decision() {
233        // Caller asks "can this fit in 64 bytes?" without committing.
234        let blob = vec![b'z'; 4096];
235        let projected = would_encode_to(&blob);
236        let fits_in_64 = projected <= 64;
237
238        // Asking the question must not change the answer of a later
239        // encode — encode/decode round-trips remain byte-identical
240        // regardless of how many predicate calls came before.
241        let (flag, bytes) = encode(&blob);
242        assert_eq!(bytes.len(), projected);
243        assert_eq!(decode(flag, &bytes).unwrap(), blob);
244        // 4 KiB of one byte definitely compresses below 64 bytes
245        // under LZ4 — sanity-check the predicate's answer.
246        assert!(fits_in_64);
247    }
248
249    #[test]
250    fn decode_rejects_unknown_flag_byte() {
251        assert!(matches!(
252            ValueFlag::from_byte(7),
253            Err(ValueCodecError::UnknownFlag(7))
254        ));
255    }
256
257    #[test]
258    fn decode_rejects_truncated_lz4_header() {
259        let err = decode(ValueFlag::Lz4, &[0x01, 0x02]).unwrap_err();
260        assert_eq!(err, ValueCodecError::TruncatedHeader);
261    }
262}