Skip to main content

nodedb_codec/fastlanes/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! FastLanes-inspired FOR + bit-packing codec for integer columns.
4//!
5//! Frame-of-Reference (FOR): subtract the minimum value from all values,
6//! reducing them to small unsigned residuals. Then bit-pack the residuals
7//! using the minimum number of bits.
8//!
9//! The bit-packing loop is written as simple scalar operations on contiguous
10//! arrays, which LLVM auto-vectorizes to AVX2/AVX-512/NEON/WASM-SIMD without
11//! explicit intrinsics. This is the FastLanes insight: structured scalar code
12//! that the compiler vectorizes, portable across all targets.
13//!
14//! Wire format:
15//! ```text
16//! [4 bytes] total value count (LE u32)
17//! [2 bytes] block count (LE u16)
18//! For each block:
19//!   [2 bytes] values in this block (LE u16, max 1024)
20//!   [1 byte]  bit width (0-64)
21//!   [8 bytes] min value / reference (LE i64)
22//!   [N bytes] bit-packed residuals
23//! ```
24//!
25//! Block size: 1024 values. Last block may be smaller.
26
27mod bits;
28mod block;
29
30pub use block::bit_width_for_range;
31
32use crate::error::CodecError;
33use block::{decode_block, encode_block, skip_block};
34
/// Number of values per FastLanes block.
///
/// 1024 is a multiple of the 64-bit lane count of every common SIMD register
/// width (8 lanes in a 512-bit AVX-512 register, 4 in a 256-bit AVX2 register,
/// 2 in a 128-bit NEON / WASM v128 register), so a full block divides evenly
/// into vector operations on any of those targets.
const BLOCK_SIZE: usize = 1 << 10;

/// Size in bytes of the stream-level header: a `u32` total value count
/// followed by a `u16` block count.
const GLOBAL_HEADER_SIZE: usize = std::mem::size_of::<u32>() + std::mem::size_of::<u16>();
42
43// ---------------------------------------------------------------------------
44// Public encode / decode API
45// ---------------------------------------------------------------------------
46
47/// Encode a slice of i64 values using FOR + bit-packing.
48pub fn encode(values: &[i64]) -> Vec<u8> {
49    let total_count = values.len() as u32;
50    let block_count = if values.is_empty() {
51        0u16
52    } else {
53        values.len().div_ceil(BLOCK_SIZE) as u16
54    };
55
56    let mut out = Vec::with_capacity(GLOBAL_HEADER_SIZE + values.len() * 5);
57
58    // Global header.
59    out.extend_from_slice(&total_count.to_le_bytes());
60    out.extend_from_slice(&block_count.to_le_bytes());
61
62    // Encode each block.
63    for chunk in values.chunks(BLOCK_SIZE) {
64        encode_block(chunk, &mut out);
65    }
66
67    out
68}
69
70/// Decode FOR + bit-packed bytes back to i64 values.
71pub fn decode(data: &[u8]) -> Result<Vec<i64>, CodecError> {
72    if data.len() < GLOBAL_HEADER_SIZE {
73        return Err(CodecError::Truncated {
74            expected: GLOBAL_HEADER_SIZE,
75            actual: data.len(),
76        });
77    }
78
79    let total_count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
80    let block_count = u16::from_le_bytes([data[4], data[5]]) as usize;
81
82    if total_count == 0 {
83        return Ok(Vec::new());
84    }
85
86    let mut values = Vec::with_capacity(total_count);
87    let mut offset = GLOBAL_HEADER_SIZE;
88
89    for block_idx in 0..block_count {
90        offset = decode_block(data, offset, &mut values, block_idx)?;
91    }
92
93    if values.len() != total_count {
94        return Err(CodecError::Corrupt {
95            detail: format!(
96                "value count mismatch: header says {total_count}, decoded {}",
97                values.len()
98            ),
99        });
100    }
101
102    Ok(values)
103}
104
105/// Compute byte offsets for each block in an encoded stream.
106///
107/// Returns a Vec of byte offsets — `offsets[i]` is the start position of
108/// block `i` within `data`. O(num_blocks) header scan, no decompression.
109pub fn block_byte_offsets(data: &[u8]) -> Result<Vec<usize>, CodecError> {
110    if data.len() < GLOBAL_HEADER_SIZE {
111        return Err(CodecError::Truncated {
112            expected: GLOBAL_HEADER_SIZE,
113            actual: data.len(),
114        });
115    }
116    let num_blocks = u16::from_le_bytes([data[4], data[5]]) as usize;
117    let mut offsets = Vec::with_capacity(num_blocks);
118    let mut pos = GLOBAL_HEADER_SIZE;
119    for i in 0..num_blocks {
120        offsets.push(pos);
121        pos = skip_block(data, pos, i)?;
122    }
123    Ok(offsets)
124}
125
126/// Decode a range of blocks [start_block..end_block) from encoded data.
127///
128/// More efficient than calling `decode_single_block` repeatedly — scans
129/// headers once to find start_block, then decodes contiguously.
130pub fn decode_block_range(
131    data: &[u8],
132    start_block: usize,
133    end_block: usize,
134) -> Result<Vec<i64>, CodecError> {
135    if data.len() < GLOBAL_HEADER_SIZE {
136        return Err(CodecError::Truncated {
137            expected: GLOBAL_HEADER_SIZE,
138            actual: data.len(),
139        });
140    }
141    let num_blocks = u16::from_le_bytes([data[4], data[5]]) as usize;
142    if start_block >= num_blocks || end_block > num_blocks || start_block >= end_block {
143        return Ok(Vec::new());
144    }
145
146    // Skip to start_block.
147    let mut offset = GLOBAL_HEADER_SIZE;
148    for i in 0..start_block {
149        offset = skip_block(data, offset, i)?;
150    }
151
152    // Decode [start_block..end_block).
153    let mut values = Vec::new();
154    for i in start_block..end_block {
155        offset = decode_block(data, offset, &mut values, i)?;
156    }
157    Ok(values)
158}
159
160/// Number of blocks in an encoded FastLanes stream.
161pub fn block_count(data: &[u8]) -> Result<usize, CodecError> {
162    if data.len() < GLOBAL_HEADER_SIZE {
163        return Err(CodecError::Truncated {
164            expected: GLOBAL_HEADER_SIZE,
165            actual: data.len(),
166        });
167    }
168    Ok(u16::from_le_bytes([data[4], data[5]]) as usize)
169}
170
171/// Decode a single block by index without decoding the entire stream.
172///
173/// Iterates block headers to reach `block_idx`, then decodes only that
174/// block. For sequential block-at-a-time processing, prefer
175/// [`BlockIterator`] which tracks byte offsets without re-scanning.
176pub fn decode_single_block(data: &[u8], block_idx: usize) -> Result<Vec<i64>, CodecError> {
177    if data.len() < GLOBAL_HEADER_SIZE {
178        return Err(CodecError::Truncated {
179            expected: GLOBAL_HEADER_SIZE,
180            actual: data.len(),
181        });
182    }
183    let num_blocks = u16::from_le_bytes([data[4], data[5]]) as usize;
184    if block_idx >= num_blocks {
185        return Err(CodecError::Corrupt {
186            detail: format!("block_idx {block_idx} >= block_count {num_blocks}"),
187        });
188    }
189
190    // Skip to the target block by iterating headers.
191    let mut offset = GLOBAL_HEADER_SIZE;
192    for i in 0..block_idx {
193        offset = skip_block(data, offset, i)?;
194    }
195
196    let mut values = Vec::new();
197    decode_block(data, offset, &mut values, block_idx)?;
198    Ok(values)
199}
200
201/// Iterator that decodes one 1024-row block at a time, tracking byte
202/// offsets internally. Avoids re-scanning headers for sequential access.
203pub struct BlockIterator<'a> {
204    data: &'a [u8],
205    offset: usize,
206    blocks_remaining: usize,
207    current_block: usize,
208}
209
210impl<'a> BlockIterator<'a> {
211    /// Create a block iterator over encoded FastLanes data.
212    pub fn new(data: &'a [u8]) -> Result<Self, CodecError> {
213        if data.len() < GLOBAL_HEADER_SIZE {
214            return Err(CodecError::Truncated {
215                expected: GLOBAL_HEADER_SIZE,
216                actual: data.len(),
217            });
218        }
219        let num_blocks = u16::from_le_bytes([data[4], data[5]]) as usize;
220        Ok(Self {
221            data,
222            offset: GLOBAL_HEADER_SIZE,
223            blocks_remaining: num_blocks,
224            current_block: 0,
225        })
226    }
227
228    /// Skip the next block without decoding it.
229    pub fn skip_block(&mut self) -> Result<(), CodecError> {
230        if self.blocks_remaining == 0 {
231            return Ok(());
232        }
233        self.offset = skip_block(self.data, self.offset, self.current_block)?;
234        self.current_block += 1;
235        self.blocks_remaining -= 1;
236        Ok(())
237    }
238}
239
240impl Iterator for BlockIterator<'_> {
241    type Item = Result<Vec<i64>, CodecError>;
242
243    fn next(&mut self) -> Option<Self::Item> {
244        if self.blocks_remaining == 0 {
245            return None;
246        }
247        let mut values = Vec::new();
248        match decode_block(self.data, self.offset, &mut values, self.current_block) {
249            Ok(new_offset) => {
250                self.offset = new_offset;
251                self.current_block += 1;
252                self.blocks_remaining -= 1;
253                Some(Ok(values))
254            }
255            Err(e) => Some(Err(e)),
256        }
257    }
258
259    fn size_hint(&self) -> (usize, Option<usize>) {
260        (self.blocks_remaining, Some(self.blocks_remaining))
261    }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode `input`, decode the result, and check the values come back
    /// unchanged. Returns the encoded bytes so callers can inspect their size.
    fn assert_roundtrip(input: &[i64]) -> Vec<u8> {
        let bytes = encode(input);
        let restored = decode(&bytes).unwrap();
        assert_eq!(restored, input);
        bytes
    }

    /// One step of the 64-bit LCG used to build deterministic pseudo-random
    /// fixtures.
    fn lcg_step(state: u64) -> u64 {
        state.wrapping_mul(6364136223846793005).wrapping_add(1)
    }

    /// 10 000 timestamps starting at 1_700_000_000_000 with a fixed step of
    /// 10 000 between consecutive samples.
    fn constant_rate_samples() -> Vec<i64> {
        (1_700_000_000_000i64..)
            .step_by(10_000)
            .take(10_000)
            .collect()
    }

    #[test]
    fn empty_roundtrip() {
        let bytes = encode(&[]);
        assert!(decode(&bytes).unwrap().is_empty());
    }

    #[test]
    fn single_value() {
        let bytes = encode(&[42i64]);
        assert_eq!(decode(&bytes).unwrap(), vec![42i64]);
    }

    #[test]
    fn identical_values_zero_bits() {
        let values = vec![999i64; 1024];
        let bytes = assert_roundtrip(&values);

        // All identical → bit_width=0 → only headers, no packed data.
        // Global header(6) + block header(11) = 17 bytes for 1024 values.
        assert_eq!(bytes.len(), 17);
    }

    #[test]
    fn small_range_values() {
        // Values in range [100, 107] → 3 bits per value.
        let values: Vec<i64> = (0..1024i64).map(|i| 100 + i % 8).collect();
        let bytes = assert_roundtrip(&values);

        // 1024 values × 3 bits = 384 bytes packed, plus both headers.
        let packed_len = (1024usize * 3).div_ceil(8);
        assert_eq!(
            bytes.len(),
            GLOBAL_HEADER_SIZE + block::BLOCK_HEADER_SIZE + packed_len
        );
    }

    #[test]
    fn constant_rate_timestamps() {
        let values = constant_rate_samples();
        let bytes = assert_roundtrip(&values);

        let bytes_per_sample = bytes.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 4.0,
            "timestamps should pack to <4 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn pre_delta_timestamps() {
        let deltas = vec![10_000i64; 10_000];
        let bytes = assert_roundtrip(&deltas);

        let bytes_per_sample = bytes.len() as f64 / deltas.len() as f64;
        assert!(
            bytes_per_sample < 0.2,
            "constant deltas should pack to near-zero, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn pre_delta_timestamps_with_jitter() {
        // Deltas of 10 000 with a pseudo-random jitter in [-50, 50].
        let mut rng: u64 = 42;
        let deltas: Vec<i64> = (0..10_000)
            .map(|_| {
                rng = lcg_step(rng);
                let jitter = ((rng >> 33) as i64 % 101) - 50;
                10_000 + jitter
            })
            .collect();
        let bytes = assert_roundtrip(&deltas);

        let bytes_per_sample = bytes.len() as f64 / deltas.len() as f64;
        assert!(
            bytes_per_sample < 1.5,
            "jittered deltas should pack to <1.5 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn negative_values() {
        let values: Vec<i64> = (-500..500).collect();
        assert_roundtrip(&values);
    }

    #[test]
    fn boundary_values() {
        assert_roundtrip(&[i64::MIN, 0, i64::MAX]);
    }

    #[test]
    fn multiple_blocks() {
        let values: Vec<i64> = (0..3000).map(|i| i * 7 + 100).collect();
        assert_roundtrip(&values);
    }

    #[test]
    fn partial_last_block() {
        // One value more than a full block forces a second, 1-value block.
        let values: Vec<i64> = (0..1025).collect();
        assert_roundtrip(&values);
    }

    #[test]
    fn compression_vs_raw() {
        let values = constant_rate_samples();
        let encoded_len = encode(&values).len();
        let raw_len = values.len() * 8;

        let ratio = raw_len as f64 / encoded_len as f64;
        assert!(ratio > 2.0, "expected >2x compression, got {ratio:.1}x");
    }

    #[test]
    fn bit_width_calculation() {
        // Zero-width ranges.
        assert_eq!(bit_width_for_range(0, 0), 0);
        assert_eq!(bit_width_for_range(100, 100), 0);
        // Widths on either side of a power-of-two boundary.
        assert_eq!(bit_width_for_range(0, 1), 1);
        assert_eq!(bit_width_for_range(0, 7), 3);
        assert_eq!(bit_width_for_range(0, 8), 4);
        assert_eq!(bit_width_for_range(0, 255), 8);
        assert_eq!(bit_width_for_range(0, 256), 9);
        // Full i64 span.
        assert_eq!(bit_width_for_range(i64::MIN, i64::MAX), 64);
    }

    #[test]
    fn pack_unpack_roundtrip() {
        for bw in 1..=64u8 {
            // Largest value representable in `bw` bits; doubles as the mask.
            let max_val = u64::MAX >> (64 - u32::from(bw));
            let mask = max_val;

            for val in [0u64, 1, max_val / 2, max_val] {
                let mut packed = vec![0u8; 16];
                bits::pack_bits(&mut packed, 0, val, bw);
                let unpacked = bits::unpack_bits(&packed, 0, bw);
                assert_eq!(
                    unpacked & mask,
                    val & mask,
                    "pack/unpack failed for bw={bw}, val={val}"
                );
            }
        }
    }

    #[test]
    fn pack_unpack_at_offsets() {
        let mut packed = vec![0u8; 32];

        // Three adjacent 3-bit fields at bit offsets 0, 3 and 6.
        for (bit_pos, pattern) in [(0, 0b101), (3, 0b110), (6, 0b011)] {
            bits::pack_bits(&mut packed, bit_pos, pattern, 3);
        }

        assert_eq!(bits::unpack_bits(&packed, 0, 3), 0b101);
        assert_eq!(bits::unpack_bits(&packed, 3, 3), 0b110);
        assert_eq!(bits::unpack_bits(&packed, 6, 3), 0b011);
    }

    #[test]
    fn truncated_input_errors() {
        // Too short to hold even the global header.
        assert!(decode(&[]).is_err());
        // count=1, blocks=1, but no block data follows.
        assert!(decode(&[1, 0, 0, 0, 1, 0]).is_err());
    }

    #[test]
    fn large_dataset_roundtrip() {
        let mut rng: u64 = 12345;
        let values: Vec<i64> = (0..100_000)
            .map(|_| {
                rng = lcg_step(rng);
                (rng >> 1) as i64
            })
            .collect();
        assert_roundtrip(&values);
    }

    #[test]
    fn decode_single_block_correctness() {
        let values: Vec<i64> = (0..3000).collect();
        let encoded = encode(&values);
        assert_eq!(block_count(&encoded).unwrap(), 3);

        // 3000 values split into two full blocks and a 952-value tail.
        let expected_lens = [1024usize, 1024, 952];
        for (idx, (expected, len)) in values.chunks(1024).zip(expected_lens).enumerate() {
            let block = decode_single_block(&encoded, idx).unwrap();
            assert_eq!(block.len(), len);
            assert_eq!(block, expected);
        }
    }

    #[test]
    fn block_iterator_matches_full_decode() {
        let values: Vec<i64> = (0..5000).map(|i| i * 7 - 2000).collect();
        let encoded = encode(&values);

        let all: Vec<i64> = BlockIterator::new(&encoded)
            .unwrap()
            .flat_map(|blk| blk.unwrap())
            .collect();
        assert_eq!(all, values);
    }

    #[test]
    fn block_iterator_skip() {
        let values: Vec<i64> = (0..3000).collect();
        let encoded = encode(&values);

        let mut blocks = BlockIterator::new(&encoded).unwrap();
        blocks.skip_block().unwrap(); // skip block 0
        let second = blocks.next().unwrap().unwrap();
        assert_eq!(second, &values[1024..2048]);
    }
}