svbyte/
lib.rs

1/*!
2This library provides encoding/decoding primitives for Stream VByte encoding.
3
Stream VByte encoding is a SIMD-accelerated algorithm for varint decompression. It is used in search and database
systems as a way to efficiently store and stream large numbers of variable-length integers from disk or main memory.
6
The idea behind varint is to skip the leading zero bytes of each number, so a large amount of relatively small numbers
can be stored efficiently. Varint encoding is frequently combined with delta-encoding when numbers are stored in
ascending order. This way all the stored numbers are smaller in magnitude, hence better compression.
10
Stream VByte is a storage format and an algorithm that allows compression and decompression of numbers to be
vectorized on modern CPUs.
13
14Main types of this crate are [`DecodeCursor`] and [`EncodeCursor`].
15
16## Encoding
17
18```rust,no_run
19# use std::io::BufWriter;
20# use std::fs::File;
21# use svbyte::EncodeCursor;
22# use std::io::{self, Write};
23# fn main() -> io::Result<()> {
24let output = BufWriter::new(File::create("./encoded.bin")?);
25let mut encoder = EncodeCursor::new(output);
26encoder.encode(&[1, 2, 3, 4]);
27
28encoder.finish()?.flush()?;
29# Ok(())
30# }
31```
32
33## Decoding
34
35```rust,no_run
36# use std::fs::File;
37# use std::io::{self, BufReader};
38# use svbyte::{BufReadSegments, DecodeCursor, Decoder};
39# fn main() -> io::Result<()> {
40let segments = BufReadSegments::new(BufReader::new(File::open("./encoded.bin")?));
41let mut decoder = DecodeCursor::new(segments)?;
42
43let mut buffer = [0u32; 128];
44let mut sum = 0u64;
45loop {
46    let decoded = decoder.decode(&mut buffer)?;
47    if decoded == 0 {
48        break;
49    }
50    sum += buffer[..decoded].iter().sum::<u32>() as u64;
51}
52# Ok(())
53# }
54```
55
56## Links
57
58- [Stream VByte: Faster Byte-Oriented Integer Compression][pub] by Daniel Lemire, Nathan Kurz, and Christoph Rupp
59- [Stream VByte: breaking new speed records for integer compression][blog-post] by Daniel Lemire
60
61[pub]: https://arxiv.org/abs/1709.08990
62[blog-post]: https://lemire.me/blog/2017/09/27/stream-vbyte-breaking-new-speed-records-for-integer-compression/
63*/
64use std::{
65    arch::x86_64::{_mm_loadu_si128, _mm_shuffle_epi8, _mm_storeu_si128},
66    debug_assert,
67    io::{self, BufRead, Write},
68    mem,
69};
70
// The lowercase name deliberately mimics SIMD vector type naming, hence the lint override.
/// Four `u32` lanes (16 bytes): the shape of one shuffle mask and of one decoded quadruple
#[allow(non_camel_case_types)]
type u32x4 = [u32; 4];

/// Shuffle masks and corresponding length of encoded numbers
///
/// The table is indexed by a control byte; each entry holds the shuffle mask for that byte and the
/// total number of data stream bytes occupied by the 4 numbers the control byte describes.
///
/// For more information see documentation to [`u32_shuffle_masks`]
///
/// [`u32_shuffle_masks`]: u32_shuffle_masks
const MASKS: [(u32x4, u8); 256] = u32_shuffle_masks();

/// Marker bytes of a [`SegmentHeader`]
const SEGMENT_MAGIC: u16 = 0x0B0D;

/// Length of [`SegmentHeader`] in bytes
///
/// 2 bytes of magic followed by three 4-byte fields: element count, control stream length and
/// data stream length.
const SEGMENT_HEADER_LENGTH: usize = 14;
86
/// Provides a facility for reading segments
///
/// Each segment contains elements (integers) in encoded format. Each [`Segments::next`] method call
/// moves this object to the next segment.
///
/// ## Motivation
/// This trait exists to abstract [`DecodeCursor`] from the logic of reading segments. If all the segments
/// are in memory, the most efficient way of decoding is decoding `[u8]` slices in place. This maximizes
/// the decoding speed because no memory copy is needed. When segment data lives on the file system,
/// some logic for reading the next segment into a memory buffer is required. In this case it is more
/// appropriate to read segments one by one into a reusable memory buffer. The [`Segments`] trait
/// and its 2 base implementations, [`MemorySegments`] and [`BufReadSegments`], provide those facilities.
pub trait Segments {
    /// Moves to the next segment and returns the number of elements encoded in that segment
    ///
    /// [`DecodeCursor`] treats a return value of `0` as the end of the stream.
    fn next(&mut self) -> io::Result<usize>;

    /// Returns the current segment's data stream (the encoded bytes of the numbers)
    fn data_stream(&self) -> &[u8];

    /// Returns the current segment's control stream (one control byte per 4 numbers)
    fn control_stream(&self) -> &[u8];
}
109
/// [`Segments`] implementation which pulls segments one at a time from an underlying [`BufRead`]
///
/// The control and data streams of the current segment are copied into buffers owned by this
/// object; the buffers are reused from one segment to the next.
pub struct BufReadSegments<R> {
    source: R,
    control_stream: Vec<u8>,
    data_stream: Vec<u8>,
}

impl<R> BufReadSegments<R> {
    /// Wraps `source`. Nothing is read at construction time, so both stream buffers start empty.
    pub fn new(source: R) -> Self {
        let control_stream = Vec::new();
        let data_stream = Vec::new();
        Self {
            source,
            control_stream,
            data_stream,
        }
    }
}
126
127impl<R: BufRead> Segments for BufReadSegments<R> {
128    fn next(&mut self) -> io::Result<usize> {
129        let result = read_segment(
130            &mut self.source,
131            &mut self.control_stream,
132            &mut self.data_stream,
133        );
134        match result {
135            Ok(elements) => Ok(elements),
136            Err(e) => {
137                if e.kind() == io::ErrorKind::UnexpectedEof {
138                    Ok(0)
139                } else {
140                    Err(e)
141                }
142            }
143        }
144    }
145
146    fn data_stream(&self) -> &[u8] {
147        self.control_stream.as_ref()
148    }
149
150    fn control_stream(&self) -> &[u8] {
151        self.data_stream.as_ref()
152    }
153}
154
/// [`Segments`] implementation for data that is already fully loaded in memory
///
/// The control and data streams are exposed as sub-slices of the original buffer, so no bytes
/// are copied.
pub struct MemorySegments<'a> {
    /// Bytes which have not been consumed yet
    data: &'a [u8],
    control_stream: &'a [u8],
    data_stream: &'a [u8],
}

impl<'a> MemorySegments<'a> {
    /// Creates a reader positioned before the first segment of `data`
    ///
    /// Both streams are empty until [`Segments::next`] is called.
    pub fn new(data: &'a [u8]) -> Self {
        let nothing = &data[..0];
        Self {
            data,
            control_stream: nothing,
            data_stream: nothing,
        }
    }
}
171
172impl<'a> Segments for MemorySegments<'a> {
173    fn next(&mut self) -> io::Result<usize> {
174        if self.data.is_empty() {
175            return Ok(0);
176        }
177
178        let segment = SegmentHeader::parse(self.data);
179        self.control_stream =
180            &self.data[SEGMENT_HEADER_LENGTH..SEGMENT_HEADER_LENGTH + segment.cs_length];
181        self.data_stream = &self.data[SEGMENT_HEADER_LENGTH + segment.cs_length
182            ..SEGMENT_HEADER_LENGTH + segment.cs_length + segment.ds_length];
183        self.data = &self.data[SEGMENT_HEADER_LENGTH + segment.cs_length + segment.ds_length..];
184
185        Ok(segment.count)
186    }
187
188    fn data_stream(&self) -> &[u8] {
189        self.data_stream
190    }
191    fn control_stream(&self) -> &[u8] {
192        self.control_stream
193    }
194}
195
196/// Decodes integers
197///
198/// Cursor allows to decode stream of elements using one of the [`Segments`] implementations as a source
199/// of decoding data.
200pub struct DecodeCursor<S: Segments> {
201    elements_left: usize,
202    control_stream_offset: usize,
203    data_stream_offset: usize,
204    segments: S,
205}
206
207impl<S: Segments> DecodeCursor<S> {
208    pub fn new(segments: S) -> io::Result<Self> {
209        Ok(Self {
210            elements_left: 0,
211            control_stream_offset: 0,
212            data_stream_offset: 0,
213            segments,
214        })
215    }
216
217    #[inline(never)]
218    fn refill(&mut self) -> io::Result<usize> {
219        debug_assert!(
220            self.elements_left == 0,
221            "Should be 0, got: {}",
222            self.elements_left
223        );
224
225        let elements = self.segments.next()?;
226        if elements > 0 {
227            let cs = self.segments.control_stream();
228            let ds = self.segments.data_stream();
229            assert!(
230                cs.len() * 4 >= elements,
231                "Invalid control stream length. Expected: {}, got: {}",
232                (elements + 3) / 4,
233                cs.len()
234            );
235            assert!(
236                ds.len() >= elements,
237                "Invalid data stream length. Expected: >={}, got: {}",
238                elements,
239                ds.len()
240            );
241            self.data_stream_offset = 0;
242            self.control_stream_offset = 0;
243            self.elements_left = elements;
244        }
245        Ok(elements)
246    }
247}
248
249/// Segment Header
250///
251/// Each segment starts with a header described in the [`EncodeCursor`] documentation.
252#[derive(Debug, PartialEq)]
253struct SegmentHeader {
254    count: usize,
255    cs_length: usize,
256    ds_length: usize,
257}
258
259impl SegmentHeader {
260    fn new(count: usize, cs_size: usize, ds_size: usize) -> Self {
261        Self {
262            count,
263            cs_length: cs_size,
264            ds_length: ds_size,
265        }
266    }
267
268    fn parse(input: &[u8]) -> Self {
269        assert!(
270            input.len() >= SEGMENT_HEADER_LENGTH,
271            "Expected slice of len >={}, got: {}",
272            SEGMENT_HEADER_LENGTH,
273            input.len()
274        );
275        let input = &input[..SEGMENT_HEADER_LENGTH];
276
277        let magic = u16::from_be_bytes(input[0..2].try_into().unwrap());
278        let count = u32::from_be_bytes(input[2..6].try_into().unwrap()) as usize;
279        let cs_length = u32::from_be_bytes(input[6..10].try_into().unwrap()) as usize;
280        let ds_length = u32::from_be_bytes(input[10..14].try_into().unwrap()) as usize;
281
282        assert!(
283            magic == SEGMENT_MAGIC,
284            "Expected magic: {}, got: {}",
285            SEGMENT_MAGIC,
286            magic,
287        );
288
289        Self {
290            count,
291            cs_length,
292            ds_length,
293        }
294    }
295
296    fn write(&self, out: &mut dyn Write) -> io::Result<()> {
297        out.write_all(&SEGMENT_MAGIC.to_be_bytes())?;
298
299        debug_assert!(self.count <= u32::MAX as usize);
300        let number_of_elements = (self.count as u32).to_be_bytes();
301        out.write_all(&number_of_elements)?;
302
303        debug_assert!(self.cs_length <= u32::MAX as usize);
304        let cs_len = (self.cs_length as u32).to_be_bytes();
305        out.write_all(&cs_len)?;
306
307        debug_assert!(self.ds_length <= u32::MAX as usize);
308        let ds_len = (self.ds_length as u32).to_be_bytes();
309        out.write_all(&ds_len)?;
310
311        Ok(())
312    }
313}
314
315/// Reads the segment, checks segment header and copies streams into corresponding buffers
316///
317/// Returns the number of elements encoded in the segment
318fn read_segment(input: &mut impl BufRead, cs: &mut Vec<u8>, ds: &mut Vec<u8>) -> io::Result<usize> {
319    let mut buf = [0u8; SEGMENT_HEADER_LENGTH];
320    input.read_exact(&mut buf)?;
321    let header = SegmentHeader::parse(&buf);
322
323    cs.resize(header.cs_length, 0);
324    input.read_exact(&mut cs[..header.cs_length])?;
325
326    ds.resize(header.ds_length, 0);
327    input.read_exact(&mut ds[..header.ds_length])?;
328
329    Ok(header.count)
330}
331
impl<S: Segments> Decoder<u32> for DecodeCursor<S> {
    /// Decodes the next batch of numbers of the current segment into `buffer`
    ///
    /// Numbers are decoded in groups of 4 (one control byte per group). The number of groups is
    /// limited by `buffer.len() / 4` and by the number of control bytes left in the current
    /// segment, so a single call never crosses a segment boundary.
    ///
    /// Returns the number of valid elements written to the beginning of `buffer`, or `0` when no
    /// more segments are available. The last group of a segment may contain fewer than 4 real
    /// elements; all 4 slots are still written, but only the real ones are counted.
    ///
    /// Panics if `buffer` holds fewer than 4 elements.
    fn decode(&mut self, buffer: &mut [u32]) -> io::Result<usize> {
        // Number of elements decoded per iteration
        const DECODE_WIDTH: usize = 4;
        assert!(
            buffer.len() >= DECODE_WIDTH,
            "Buffer should be at least {} elements long",
            DECODE_WIDTH
        );
        // Current segment is exhausted: load the next one, or report the end of the stream
        if self.elements_left == 0 && self.refill()? == 0 {
            return Ok(0);
        }

        let mut data_stream_offset = self.data_stream_offset;
        let control_stream = &self.segments.control_stream()[self.control_stream_offset..];
        let data_stream = &self.segments.data_stream()[data_stream_offset..];
        let mut data_stream = data_stream.as_ptr();

        /*
        Safety considerations!

        This code relies heavily on pointers. To make all pointer arithmetic safe several rules must be obeyed.

        1. the number of iterations must be limited by both the output buffer length and the number of elements left
           in the data and control streams
        2. in each iteration the control stream and output buffer pointers are moved by 1. Therefore, all pointers
           should be of the type which is consumed/produced in each iteration.
        3. the only exception is the data stream, whose type is `*const u8` because the data stream is moved by a
           different amount of bytes in each iteration.

        NOTE(review): every iteration loads 16 bytes from the data stream. Their availability is only verified by a
        `debug_assert!`; release builds presumably rely on the padding appended by the encoder -- confirm for
        untrusted input.
        */
        let mut iterations = buffer.len() / 4;
        iterations = iterations.min(control_stream.len());

        self.control_stream_offset += iterations;
        // Upper bound of decoded elements; trimmed to the real element count before returning
        let decoded = iterations * DECODE_WIDTH;

        // From now on `buffer` is addressed in units of 4 elements (one unit per iteration)
        let mut buffer: *mut u32x4 = buffer.as_mut_ptr().cast();
        let mut control_words = control_stream.as_ptr();

        // Decode loop unrolling
        const UNROLL_FACTOR: usize = 8;
        while iterations >= UNROLL_FACTOR {
            for _ in 0..UNROLL_FACTOR {
                let encoded_len = unsafe {
                    debug_assert!(
                        self.segments.data_stream()[data_stream_offset..].len() >= 16,
                        "At least 16 bytes should be available in data stream"
                    );
                    // Reinterpret the raw pointers as the reference types the kernel expects
                    let data_stream = mem::transmute(data_stream);
                    let output = mem::transmute(buffer);
                    simd_decode(data_stream, *control_words, output)
                };

                control_words = control_words.wrapping_add(1);
                buffer = buffer.wrapping_add(1);

                // The kernel reports how many data stream bytes the 4 numbers occupied
                data_stream = data_stream.wrapping_add(encoded_len as usize);
                data_stream_offset += encoded_len as usize;
            }

            iterations -= UNROLL_FACTOR;
        }

        // Tail decode
        // Same body as above, executed for the remaining (less than `UNROLL_FACTOR`) iterations
        while iterations > 0 {
            let encoded_len = unsafe {
                debug_assert!(
                    self.segments.data_stream()[data_stream_offset..].len() >= 16,
                    "At least 16 bytes should be available in data stream"
                );
                let data_stream = mem::transmute(data_stream);
                let output = mem::transmute(buffer);
                simd_decode(data_stream, *control_words, output)
            };

            control_words = control_words.wrapping_add(1);
            buffer = buffer.wrapping_add(1);

            data_stream = data_stream.wrapping_add(encoded_len as usize);
            data_stream_offset += encoded_len as usize;

            iterations -= 1;
        }

        self.data_stream_offset = data_stream_offset;
        // The last quadruple of a segment can be incomplete: report only the real elements
        let decoded = decoded.min(self.elements_left);
        self.elements_left -= decoded;
        Ok(decoded)
    }
}
422
423/// Decoding SIMD kernel using SSE intrinsics
424///
425/// Types of this function tries to implement safety guardrails as much as possible. Namely:
426/// `output` - is a reference to the buffer of 4 u32 values;
427/// `input` - is a reference to u8 array of unspecified length (`control_word` speciefies how much will be decoded);
428//
429/// Technically the encoded length can be calculated from control word directly using horizontal 2-bit sum
430/// ```rust,ignore
431/// let result = *control_word;
432/// let result = ((result & 0b11001100) >> 2) + (result & 0b00110011);
433/// let result = (result >> 4) + (result & 0b1111) + 4;
434/// ```
435/// Unfortunatley, this approach is slower then memoized length. There is a mention of this approach can be faster
436/// when using `u32` control words, which implies decoding a batch of size 16[^1].
437///
438/// [^1]: [Bit hacking versus memoization: a Stream VByte example](https://lemire.me/blog/2017/11/28/bit-hacking-versus-memoization-a-stream-vbyte-example/)
439#[inline]
440fn simd_decode(input: &[u8; 16], control_word: u8, output: &mut u32x4) -> u8 {
441    let (ref mask, encoded_len) = MASKS[control_word as usize];
442    unsafe {
443        let mask = _mm_loadu_si128(mask.as_ptr().cast());
444        let input = _mm_loadu_si128(input.as_ptr().cast());
445        let answer = _mm_shuffle_epi8(input, mask);
446        _mm_storeu_si128(output.as_mut_ptr().cast(), answer);
447    }
448
449    encoded_len
450}
451
/**
Builds the part of a `pshufb` shuffle mask which decodes a single `u32`

`len` is the number of bytes (1-4) the encoded number occupies in the input register. `offset` is the position of its
first byte in the register, i.e. the sum of the lengths of all the numbers preceding it in the register.

Panics if `offset` is not less than 16 or `len` is outside of `1..=4`.
*/
const fn u32_shuffle_mask(len: usize, offset: usize) -> u32 {
    // A mask byte with the most significant bit set zeroes the corresponding output byte
    const ZERO_LANE: u8 = 0b10000000;
    assert!(offset < 16, "Offset should be <16");
    let first = offset as u8;

    // Written most significant byte first: encoded bytes fill the low end, the rest is zeroed
    let lanes = match len {
        1 => [ZERO_LANE, ZERO_LANE, ZERO_LANE, first],
        2 => [ZERO_LANE, ZERO_LANE, first, first + 1],
        3 => [ZERO_LANE, first, first + 1, first + 2],
        4 => [first, first + 1, first + 2, first + 3],
        _ => panic!("Length of u32 is 1..=4 bytes"),
    };
    u32::from_be_bytes(lanes)
}
474
475/**
476Preparing shuffling masks for `pshufb` SSE instructions
477
478`pshufb` (`_mm_shuffle_epi8()`) allows to shuffle bytes around in a `__mm128` register. Shuffle mask consist of 16
479bytes. Each byte describe byte index in input register which should be copied to corresponding byte in the output
480register. For addressing 16 bytes we need log(16) = 4 bits. So bits 0:3 of each byte are storing input register byte
481index. MSB of each byte indicating if corresponding byte in output register should be zeroed out. 4 least significant
482bits are non effective if MSB is set.
483
484`pshufb` SSE instruction visualization.
485
486```graph
487  Byte offsets:           0        1        2        3        4
488                  ┌────────┬────────┬────────┬────────┬────────┬───┐
489Input Register:   │   0x03 │   0x15 │   0x22 │   0x19 │   0x08 │...│
490                  └────▲───┴────────┴────▲───┴────▲───┴────▲───┴───┘
491                       │        ┌────────┘        │        │
492                       │        │        ┌─────────────────┘
493                       │        │        │        │
494                       └───────────────────────────────────┐
495                                │        │        │        │
496                  ┌────────┬────┴───┬────┴───┬────┴───┬────┴───┬───┐
497  Mask Register:  │   0x80 │   0x02 │   0x04 │   0x03 │   0x00 │...│
498                  ├────────┼────────┼────────┼────────┼────────┼───┤
499Output Register:  │   0x00 │   0x22 │   0x08 │   0x19 │   0x03 │...│
500                  └────────┴────────┴────────┴────────┴────────┴───┘
501```
502
503See [`_mm_shuffle_epi8()`][_mm_shuffle_epi8] documentation.
504
505[_mm_shuffle_epi8]: https://www.intel.com/content/www/us/en/docs/intrinsics-guide/index.html#text=shuffle_epi8&ig_expand=6097
506*/
507const fn u32_shuffle_masks() -> [(u32x4, u8); 256] {
508    let mut masks = [([0u32; 4], 0u8); 256];
509
510    let mut a = 1;
511    while a <= 4 {
512        let mut b = 1;
513        while b <= 4 {
514            let mut c = 1;
515            while c <= 4 {
516                let mut d = 1;
517                while d <= 4 {
518                    // Loading in reverse order because Intel is Little Endian Machine
519                    let mask = [
520                        u32_shuffle_mask(a, 0),
521                        u32_shuffle_mask(b, a),
522                        u32_shuffle_mask(c, a + b),
523                        u32_shuffle_mask(d, a + b + c),
524                    ];
525
526                    // counting in the index must be 0 based (eg. length of 1 is `00`, not `01`), hence `a - 1`
527                    let idx = (a - 1) << 6 | (b - 1) << 4 | (c - 1) << 2 | (d - 1);
528                    assert!(a + b + c + d <= 16);
529                    masks[idx] = (mask, (a + b + c + d) as u8);
530                    d += 1;
531                }
532                c += 1;
533            }
534            b += 1;
535        }
536        a += 1;
537    }
538    masks
539}
540
541/**
542Stream VByte Encoder
543
544Encodes a stream of numbers and saves them in a [`Write`] output stream.
545
546Data format follows this structure:
547
548```diagram
549┌───────┬───────┬─────────┬─────────┬────────┬────────┐
550│ MAGIC │ COUNT │ CS SIZE │ DS SIZE │ CS ... │ DS ... │
551└───────┴───────┴─────────┴─────────┴────────┴────────┘
552```
553
554- `MAGIC` is always `0x0B0D`;
555- `COUNT` the number of elements encoded in the segment (`u32`);
556- `CS SIZE` is the size of control stream in bytes (`u32`);
557- `DS SIZE` is the size of data stream in bytes (`u32`);
558- `CS` and `DS` and control and data streams.
559
560Segment header (`MAGIC`, `COUNT`, `CS SIZE`, `DS SIZE`) is enough to calculate the whole segment size.
561Segments follows each other until EOF of a stream reached.
562*/
563pub struct EncodeCursor<W> {
564    data_stream: Vec<u8>,
565    control_stream: Vec<u8>,
566    output: Box<W>,
567    written: usize,
568}
569
570impl<W: Write> EncodeCursor<W> {
571    pub fn new(output: W) -> Self {
572        Self {
573            data_stream: vec![],
574            control_stream: vec![],
575            output: Box::new(output),
576            written: 0,
577        }
578    }
579    /// Compresses input data using stream algorithm
580    pub fn encode(&mut self, input: &[u32]) -> io::Result<()> {
581        for n in input {
582            let bytes: [u8; 4] = n.to_be_bytes();
583            let length = 4 - n.leading_zeros() as u8 / 8;
584            let length = length.max(1);
585            debug_assert!((1..=4).contains(&length));
586
587            let control_word = self.get_control_word();
588            *control_word <<= 2;
589            *control_word |= length - 1;
590            self.written += 1;
591
592            self.data_stream.write_all(&bytes[4 - length as usize..])?;
593            self.write_segment_if_needed()?;
594        }
595        Ok(())
596    }
597
598    fn get_control_word(&mut self) -> &mut u8 {
599        if self.written % 4 == 0 {
600            self.control_stream.push(0);
601        }
602        self.control_stream.last_mut().unwrap()
603    }
604
605    fn write_segment_if_needed(&mut self) -> io::Result<()> {
606        const MAX_SEGMENT_SIZE: usize = 8 * 1024;
607        let segment_size = 2 // magic size
608            + 4 // stream size
609            + 4 // control stream size
610            + 4 // data stream size
611            + self.data_stream.len() + self.control_stream.len();
612        if segment_size >= MAX_SEGMENT_SIZE {
613            self.write_segment()?;
614
615            self.written = 0;
616            self.data_stream.clear();
617            self.control_stream.clear();
618        }
619        Ok(())
620    }
621
622    fn write_segment(&mut self) -> io::Result<()> {
623        let tail = self.written % 4;
624        // we need to shift last control byte left if number of elements
625        // not multiple of 4, otherwise it will be misaligned
626        if tail > 0 {
627            let control_word = self.control_stream.last_mut().unwrap();
628            *control_word <<= 2 * (4 - tail);
629        }
630
631        // Next we need to pad the data stream so that last quadruple will have 16 bytes at the end.
632        // Otherwise algorithm can cause loads from partially allocated memory when loading from
633        // the data stream to SIMD vector
634        let control_word = self.control_stream.last().unwrap();
635        let quadruple_length =
636            byte_to_4_length(*control_word).iter().sum::<u8>() as usize - (4 - tail);
637
638        for _ in quadruple_length..16 {
639            self.data_stream.write_all(&[0])?;
640        }
641
642        let header = SegmentHeader::new(
643            self.written,
644            self.control_stream.len(),
645            self.data_stream.len(),
646        );
647        header.write(&mut self.output)?;
648
649        self.output.write_all(&self.control_stream)?;
650        self.output.write_all(&self.data_stream)?;
651
652        Ok(())
653    }
654
655    /// Finish pending writes
656    ///
657    /// Write last segment to the output and return underlying [`Write`] to the client.
658    /// Writes are **not flushed**. It is a responsibility of a client to flush if needed.
659    pub fn finish(mut self) -> io::Result<W> {
660        self.write_segment()?;
661        Ok(*self.output)
662    }
663}
664
/// An object that can decode a stream of data into a buffer of fixed size.
///
/// The type parameter `T` specifies the type of the elements in the buffer.
pub trait Decoder<T: Copy + From<u8>> {
    /// Decodes next elements into the buffer
    ///
    /// Returns the number of decoded elements, or zero if the end of the stream is reached.
    /// Nothing is guaranteed about the buffer elements past the returned count: they might be
    /// left unchanged or overwritten by this method.
    fn decode(&mut self, buffer: &mut [T]) -> io::Result<usize>;

    /// Drains the stream and returns its whole content in a `Vec`
    fn to_vec(mut self) -> io::Result<Vec<T>>
    where
        Self: Sized,
    {
        let mut scratch = [T::from(0u8); 128];
        let mut collected = Vec::new();
        loop {
            let produced = self.decode(&mut scratch)?;
            if produced == 0 {
                break;
            }
            collected.extend_from_slice(&scratch[..produced]);
        }
        Ok(collected)
    }
}
689
/// Decodes a control byte into the 4 lengths it describes
///
/// The length of each integer is encoded as 2 bits: from 00 (length 1) to 11 (length 4). The first
/// integer of the quadruple occupies the 2 most significant bits.
fn byte_to_4_length(input: u8) -> [u8; 4] {
    let length_at = |shift: u32| ((input >> shift) & 0b11) + 1;
    [length_at(6), length_at(4), length_at(2), length_at(0)]
}
701
#[cfg(test)]
mod tests {
    use super::*;
    use rand::{rngs::ThreadRng, thread_rng, Rng, RngCore};
    use std::io::{Cursor, Seek, SeekFrom};

    /// Checks the exact bytes of the data stream and the lengths stored in the control stream
    #[test]
    fn check_encode() {
        let (control, data, _) = encode_values(&[0x01, 0x0100, 0x010000, 0x01000000, 0x010000]);

        assert_eq!(
            data,
            [
                0x01, //
                0x01, 0x00, //
                0x01, 0x00, 0x00, //
                0x01, 0x00, 0x00, 0x00, //
                0x01, 0x00, 0x00, //
                // 13 byte padding so last quadruple is 16 byte long
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            ]
        );

        let len = byte_to_4_length(control[0]);
        assert_eq!(len, [1, 2, 3, 4]);

        // The last quadruple holds a single element, unused slots read as length 1
        let len = byte_to_4_length(control[1]);
        assert_eq!(len, [3, 1, 1, 1]);
    }

    #[test]
    fn check_small_functional_encode_decode() {
        let mut rng = thread_rng();
        for _ in 0..1000 {
            let len = rng.gen_range(1..20);
            check_encode_decode_cycle(&mut rng, len);
        }
    }

    #[test]
    fn check_large_functional_encode_decode() {
        let mut rng = thread_rng();
        for _ in 0..10 {
            let len = rng.gen_range(10000..20000);
            check_encode_decode_cycle(&mut rng, len);
        }
    }

    /// Encodes `len` random numbers, decodes them back and compares the result with the input
    fn check_encode_decode_cycle(rng: &mut ThreadRng, len: usize) {
        let input: Vec<u32> = generate_random_data(rng, len);
        let (_, _, encoded) = encode_values(&input);
        let output = DecodeCursor::new(MemorySegments::new(&encoded.into_inner()))
            .unwrap()
            .to_vec()
            .unwrap();
        assert_eq!(input.len(), output.len());
        // Comparing chunk by chunk keeps the failure message short and points to the position
        let chunk_size = 4;
        for (i, (input, output)) in input
            .chunks(chunk_size)
            .zip(output.chunks(chunk_size))
            .enumerate()
        {
            assert_eq!(input, output, "Arrays differs position {}", i * chunk_size);
        }
    }

    #[test]
    fn check_decode() {
        let input = [1, 255, 1024, 2048, 0xFF000000];
        let (_, _, encoded) = encode_values(&input);
        let output = DecodeCursor::new(MemorySegments::new(&encoded.into_inner()))
            .unwrap()
            .to_vec()
            .unwrap();
        // Previously compared `output.len()` with itself, which could never fail
        assert_eq!(output.len(), input.len());
        assert_eq!(output, input);
    }

    #[allow(clippy::unusual_byte_groupings)]
    #[test]
    fn check_create_mask() {
        assert_eq!(u32_shuffle_mask(1, 0), 0x808080_00);
        assert_eq!(u32_shuffle_mask(2, 0), 0x8080_0001);

        assert_eq!(u32_shuffle_mask(1, 3), 0x808080_03);
        assert_eq!(u32_shuffle_mask(2, 3), 0x8080_0304);
    }

    #[allow(clippy::unusual_byte_groupings)]
    #[test]
    fn check_shuffle_masks() {
        let masks = u32_shuffle_masks();
        assert_eq!(
            // Lengths 1, 1, 1, 1
            masks[0b_00_00_00_00],
            ([0x808080_00, 0x808080_01, 0x808080_02, 0x808080_03], 4)
        );
        assert_eq!(
            // Lengths 4, 4, 4, 4
            masks[0b_11_11_11_11],
            ([0x00010203, 0x04050607, 0x08090a0b, 0x0c0d0e0f], 16)
        );
        assert_eq!(
            // Lengths 4, 1, 4, 1
            masks[0b_11_00_11_00],
            ([0x00010203, 0x808080_04, 0x05060708, 0x808080_09], 10)
        );
        assert_eq!(
            // Lengths 4, 3, 2, 1
            masks[0b_11_10_01_00],
            ([0x00010203, 0x80_040506, 0x8080_0708, 0x808080_09], 10)
        );
    }

    /// A header written by `SegmentHeader::write` must be parsed back to the same value
    #[test]
    fn check_header_format() {
        let expected = SegmentHeader::new(3, 1, 2);
        let mut out = vec![];

        expected.write(&mut out).unwrap();
        let header = SegmentHeader::parse(&out);
        assert_eq!(header, expected);
    }

    /// Encodes a given slice of numbers
    ///
    /// Returns the control and data streams of the first encoded segment along with the whole
    /// encoded output rewound to its beginning.
    pub fn encode_values(input: &[u32]) -> (Vec<u8>, Vec<u8>, Cursor<Vec<u8>>) {
        let mut encoder = EncodeCursor::new(Cursor::new(vec![]));
        encoder.encode(input).unwrap();
        let mut source = encoder.finish().unwrap();
        let mut cs = vec![];
        let mut ds = vec![];
        source.seek(SeekFrom::Start(0)).unwrap();
        read_segment(&mut source, &mut cs, &mut ds).unwrap();
        source.seek(SeekFrom::Start(0)).unwrap();
        (cs, ds, source)
    }

    /// Generates "weighted" dataset for testing purposes
    ///
    /// "Weighted" basically means that there is equal number of elements (in probabilistic terms)
    /// with different length in varint encoding.
    fn generate_random_data(rng: &mut ThreadRng, size: usize) -> Vec<u32> {
        let mut input = vec![];
        input.resize_with(size, || match rng.gen_range(1..=4) {
            1 => rng.next_u32() % (0xFF + 1),
            2 => rng.next_u32() % (0xFFFF + 1),
            3 => rng.next_u32() % (0xFFFFFF + 1),
            _ => rng.next_u32(),
        });
        input
    }
}