tree_buf/internal/types/
integer.rs

1// TODO: Try Streaming V-Byte (which has a Rust port)
2// https://lemire.me/blog/2017/09/27/stream-vbyte-breaking-new-speed-records-for-integer-compression/
3use crate::internal::encodings::compress;
4use crate::internal::encodings::varint::*;
5use crate::prelude::*;
6use num_traits::{AsPrimitive, Bounded};
7use simple_16::Simple16;
8use std::any::TypeId;
9use std::convert::{TryFrom, TryInto};
10use std::mem::transmute;
11use std::vec::IntoIter;
12use zigzag::ZigZag;
13
14#[derive(Copy, Clone)]
15struct U0;
16
17impl Bounded for U0 {
18    fn min_value() -> Self {
19        U0
20    }
21    fn max_value() -> Self {
22        U0
23    }
24}
25
/// Encoder for the zero-width `U0` type, the "next lower" type of `u8`.
///
/// `encode_u8` delegates here only if `U0::max_value()` converts into `u8`, and the generated
/// `TryFrom<U0> for u8` always returns `Err`, so this is never actually called. It exists only
/// so the `impl_lowerable!` invocation for `u8` has a lower encoder to name. Like its only
/// caller, it is compiled only with the `encode` feature.
///
/// # Panics
/// Always panics if reached.
#[cfg(feature = "encode")]
fn encode_u0<T, O: EncodeOptions>(_data: &[T], _max: T, _stream: &mut EncoderStream<'_, O>) -> ArrayTypeId {
    unreachable!("integers are never lowered to U0")
}

/// Size-estimator counterpart of `encode_u0`; likewise never called.
///
/// Takes the options by reference, matching every other `fast_size_for_*` estimator.
///
/// # Panics
/// Always panics if reached.
#[cfg(feature = "encode")]
fn fast_size_for_u0<T, O: EncodeOptions>(_data: &[T], _max: T, _options: &O) -> usize {
    unreachable!("integers are never lowered to U0")
}
32
/// Generates all integer plumbing for one unsigned type `$Ty`:
///
/// * `TryFrom`/`AsPrimitive` impls between `$Ty` and the placeholder `U0`. The conversions
///   always fail (`as_` is `unreachable!`), so nothing is ever lowered to `U0`.
/// * `Encodable`, `EncoderArray` and `PrimitiveEncoderArray` impls (feature `encode`).
/// * `Decodable` and `InfallibleDecoderArray` impls (feature `decode`).
/// * Two free functions (feature `encode`): `$fn`, which encodes an array, and `$fn_fast`,
///   which estimates the encoded size of an array without writing it.
///
/// Arguments:
/// * `$Ty` – the integer type being implemented.
/// * `$fn`, `$fn_fast` – names given to the generated encoder and size estimator.
/// * `$Lty`, `$lfn`, `$lfn_fast` – the next-lower type and its encoder/estimator. `$fn` and
///   `$fn_fast` delegate to these when the array's maximum fits in `$Lty`.
/// * `($($lower),*)` – extra types the element type `T` must be `AsPrimitive` to (beyond
///   `$Ty`, `$Lty`, `u8` and `U0`). These are needed so the bounds of the lower functions
///   being delegated to are also satisfied.
/// * `($($compressions),+)` – compressor types tried on the `$Ty` data, both directly and
///   wrapped in `RLE`.
macro_rules! impl_lowerable {
    ($Ty:ty, $fn:ident, $fn_fast:ident, $Lty:ty, $lfn:ident, $lfn_fast:ident, ($($lower:ty),*), ($($compressions:ty),+)) => {
        // Conversions to/from U0 always fail, so the lowering chain never goes below u8.
        impl TryFrom<$Ty> for U0 {
            type Error=();
            fn try_from(_value: $Ty) -> Result<U0, Self::Error> {
                Err(())
            }
        }
        impl TryFrom<U0> for $Ty {
            type Error=();
            fn try_from(_value: U0) -> Result<$Ty, Self::Error> {
                Err(())
            }
        }
        // Required only to satisfy the `AsPrimitive<U0>` bounds on `$fn`/`$fn_fast`;
        // never actually invoked because nothing is lowered to U0.
        impl AsPrimitive<U0> for $Ty {
            fn as_(self) -> U0 {
                unreachable!()
            }
        }

        // Root (non-array) values are widened to u64 and written by `encode_root_uint`.
        #[cfg(feature = "encode")]
        impl Encodable for $Ty {
            type EncoderArray = Vec<$Ty>;
            fn encode_root<O: EncodeOptions>(&self, stream: &mut EncoderStream<'_, O>) -> RootTypeId {
                encode_root_uint(*self as u64, stream.bytes)
            }
        }

        // Arrays are buffered in a plain Vec and encoded all at once on flush.
        #[cfg(feature = "encode")]
        impl EncoderArray<$Ty> for Vec<$Ty> {
            fn buffer_one<'a, 'b: 'a>(&'a mut self, value: &'b $Ty) {
                self.push(*value);
            }
            fn buffer_many<'a, 'b: 'a>(&'a mut self, values: &'b [$Ty]) {
                profile_method!(buffer_many);
                self.extend_from_slice(values);
            }
            fn encode_all<O: EncodeOptions>(values: &[$Ty], stream: &mut EncoderStream<'_, O>) -> ArrayTypeId {
                profile_method!(encode_all);
                // TODO: (Performance) When getting ranges, use SIMD

                let max = values.iter().max();
                if let Some(max) = max {
                    // TODO: (Performance) Use second-stack
                    // Lower to bool if possible. This is especially nice for enums
                    // with 2 variants. (The decoder's catch-all arm maps the bools
                    // back to 0/1.)
                    if *max < 2 {
                        let bools = values.iter().map(|i| *i == 1).collect::<Vec<_>>();
                        bools.flush(stream)
                    } else {
                        $fn(values, *max, stream)
                    }
                } else {
                    // Empty array: nothing to write. Decoded back as an empty array by
                    // the `DynArrayBranch::Void` arm of `new_infallible`.
                    ArrayTypeId::Void
                }
            }
            fn flush<O: EncodeOptions>(self, stream: &mut EncoderStream<'_, O>) -> ArrayTypeId {
                Self::encode_all(&self[..], stream)
            }
        }

        // Size estimation that mirrors the decision tree of `encode_all` above.
        #[cfg(feature = "encode")]
        impl PrimitiveEncoderArray<$Ty> for Vec<$Ty> {
            fn fast_size_for_all<O: EncodeOptions>(values: &[$Ty], options: &O) -> usize {
                let max = values.iter().max();
                if let Some(max) = max {
                    // TODO: (Performance) Use second-stack
                    // Lower to bool if possible. This is especially nice for enums
                    // with 2 variants.
                    if *max < 2 {
                        let bools = values.iter().map(|i| *i == 1).collect::<Vec<_>>();
                        Vec::<bool>::fast_size_for_all(&bools[..], options)
                    } else {
                        $fn_fast(values, *max, options)
                    }
                } else {
                    0
                }
            }
        }

        // A root value must be an unsigned integer that fits in $Ty; anything else
        // (signed, other branch kinds, out of range) is a schema mismatch.
        #[cfg(feature = "decode")]
        impl Decodable for $Ty {
            type DecoderArray = IntoIter<$Ty>;
            fn decode(sticks: DynRootBranch<'_>, _options: &impl DecodeOptions) -> DecodeResult<Self> {
                profile_method!(decode);
                match sticks {
                    DynRootBranch::Integer(root_int) => {
                        match root_int {
                            RootInteger::U(v) => v.try_into().map_err(|_| DecodeError::SchemaMismatch),
                            _ => Err(DecodeError::SchemaMismatch),
                        }
                    }
                    _ => Err(DecodeError::SchemaMismatch),
                }
            }
        }

        // Arrays are fully decoded into a Vec up front and then handed out one value at a time.
        #[cfg(feature = "decode")]
        impl InfallibleDecoderArray for IntoIter<$Ty> {
            type Decode = $Ty;
            fn new_infallible(sticks: DynArrayBranch<'_>, options: &impl DecodeOptions) -> DecodeResult<Self> {
                profile_method!(new_infallible);

                match sticks {
                    DynArrayBranch::Integer(array_int) => {
                        let ArrayInteger { bytes, encoding } = array_int;
                        match encoding {
                            // Values that do not fit in $Ty are reported as a schema mismatch.
                            ArrayIntegerEncoding::PrefixVarInt => {
                                profile_section!(prefix_var_int);

                                let v: Vec<$Ty> = decode_all(
                                        &bytes,
                                        |bytes, offset| {
                                            let r: $Ty = decode_prefix_varint(bytes, offset)?.try_into().map_err(|_| DecodeError::SchemaMismatch)?;
                                            Ok(r)
                                        }
                                )?;
                                Ok(v.into_iter())
                            }
                            // A corrupt stream is InvalidFormat; values too wide for $Ty are SchemaMismatch.
                            ArrayIntegerEncoding::Simple16 => {
                                profile_section!(simple_16);

                                let mut v = Vec::new();
                                simple_16::decompress(&bytes, &mut v).map_err(|_| DecodeError::InvalidFormat)?;
                                let result: Result<Vec<_>, _> = v.into_iter().map(TryInto::<$Ty>::try_into).collect();
                                let v = result.map_err(|_| DecodeError::SchemaMismatch)?;
                                Ok(v.into_iter())
                            },
                            // Raw bytes: each byte is one value, widened to $Ty.
                            ArrayIntegerEncoding::U8 => {
                                profile_section!(fixed_u8);

                                let v: Vec<$Ty> = bytes.iter().map(|&b| b.into()).collect();
                                Ok(v.into_iter())
                            },
                            // Inverse of `get_delta_zigs`: each prefix varint is a zigzag-encoded
                            // i32 delta, added with wrapping u32 arithmetic to the previous value
                            // (starting from 0).
                            ArrayIntegerEncoding::DeltaZig => {
                                profile_section!(delta_zig);
                                let mut v = Vec::new();
                                let mut prev: u32 = 0;
                                let mut offset = 0;
                                while offset < bytes.len() {
                                    // TODO: Not hardcoded to u32
                                    // See also e394b0c7-d5af-40b8-b944-cb68bac33fe9
                                    let next: u32 = decode_prefix_varint(&bytes, &mut offset)?.try_into().map_err(|_| DecodeError::InvalidFormat)?;
                                    let next: i32 = ZigZag::decode(next);
                                    let next = prev.wrapping_add(next as u32);
                                    prev = next;
                                    v.push(next.try_into().map_err(|_| DecodeError::InvalidFormat)?);
                                }
                                Ok(v.into_iter())
                            }
                        }
                    },
                    // Run-length encoded: the `values` branch is decoded recursively with this
                    // same decoder, and the expanded sequence is collected eagerly.
                    DynArrayBranch::RLE { runs, values } => {
                        let rle = RleIterator::new(runs, values, options, |values| Self::new_infallible(values, options))?;
                        let all = rle.collect::<Vec<_>>();
                        Ok(all.into_iter())
                    },
                    // FIXME: This fixes a particular test.
                    // It is unclear if this is canon.
                    // See also: 84d15459-35e4-4f04-896f-0f4ea9ce52a9
                    // TODO: Also apply this to other types
                    DynArrayBranch::Void => {
                        Ok(Vec::new().into_iter())
                    }
                    // Anything else is decoded as bools and mapped to 0/1. This mirrors the
                    // encoder lowering arrays whose max is < 2 to bools.
                    other => {
                        let bools = <IntoIter<bool> as InfallibleDecoderArray>::new_infallible(other, options)?;
                        let mapped = bools.map(|i| if i {1} else {0}).collect::<Vec<_>>();
                        Ok(mapped.into_iter())
                    },
                }
            }
            // Once the decoded values are exhausted, yields the default value (0).
            fn decode_next_infallible(&mut self) -> Self::Decode {
                self.next().unwrap_or_default()
            }
        }

        // Estimates the encoded size of `data` (whose largest element is `max`) without writing
        // anything. Same structure as `$fn` below.
        #[cfg(feature = "encode")]
        fn $fn_fast<O: EncodeOptions, T: Copy + std::fmt::Debug + AsPrimitive<$Ty> + AsPrimitive<U0> + AsPrimitive<u8> + AsPrimitive<$Lty> $(+ AsPrimitive<$lower>),*>
            (data: &[T], max: T, options: &O) -> usize {

            // If every value fits in the next-lower type, delegate to its estimator.
            // Converting `$Lty::max_value()` into $Ty fails for U0, which stops the chain at u8.
            let lower_max: Result<$Ty, _> = <$Lty as Bounded>::max_value().try_into();

            if let Ok(lower_max) = lower_max {
                if lower_max >= max.as_() {
                    return $lfn_fast(data, max, options)
                }
            }

            // Candidate compressors: each listed compressor, plus RLE over the same set.
            // NOTE(review): presumably `fast_size_for` reports the smallest candidate — confirm.
            fn fast_inner<O: EncodeOptions>(data: &[$Ty], options: &O, max: $Ty) -> usize {
                let compressors = (
                    $(<$compressions>::new(max),)+
                    RLE::new(($(<$compressions>::new(max),)+))
                );
                fast_size_for(data, &compressors, options)
            }

            // Convert the data to $Ty. When T already is $Ty, reinterpret the slice instead of copying.
            if TypeId::of::<$Ty>() == TypeId::of::<T>() {
                // SAFETY: the TypeId check proves T and $Ty are the same type, so this only
                // changes the nominal type of the slice.
                let data = unsafe { transmute(data) };
                fast_inner(data, options, max.as_())
            } else {
                // TODO: (Performance) Use second-stack
                let v = {
                    profile_section!(copy_to_lowered);
                    data.iter().map(|i| i.as_()).collect::<Vec<_>>()
                };
                fast_inner(&v, options, max.as_())
            }
        }

        // Encodes `data` (whose largest element is `max`) as a $Ty array, or as a lower type
        // when every value fits in one.
        #[cfg(feature = "encode")]
        fn $fn<O: EncodeOptions, T: Copy + std::fmt::Debug + AsPrimitive<$Ty> + AsPrimitive<U0> + AsPrimitive<u8> + AsPrimitive<$Lty> $(+ AsPrimitive<$lower>),*>
            (data: &[T], max: T, stream: &mut EncoderStream<'_, O>) -> ArrayTypeId {

            // If every value fits in the next-lower type, delegate to its encoder.
            // Converting `$Lty::max_value()` into $Ty fails for U0, which stops the chain at u8.
            let lower_max: Result<$Ty, _> = <$Lty as Bounded>::max_value().try_into();

            if let Ok(lower_max) = lower_max {
                if lower_max >= max.as_() {
                    return $lfn(data, max, stream)
                }
            }

            // Candidate compressors: each listed compressor, plus RLE over the same set.
            // The choice among them is made by `compress`.
            fn encode_inner<O: EncodeOptions>(data: &[$Ty], stream: &mut EncoderStream<'_, O>, max: $Ty) -> ArrayTypeId {
                let compressors = (
                    $(<$compressions>::new(max),)+
                    RLE::new(($(<$compressions>::new(max),)+))
                );
                compress(data, stream, &compressors)
            }

            // Convert the data to $Ty. When T already is $Ty, reinterpret the slice instead of copying.
            if TypeId::of::<$Ty>() == TypeId::of::<T>() {
                // SAFETY: the TypeId check proves T and $Ty are the same type, so this only
                // changes the nominal type of the slice.
                let data = unsafe { transmute(data) };
                encode_inner(data, stream, max.as_())
            } else {
                // TODO: (Performance) Use second-stack
                let v = {
                    profile_section!(needless_lowered_copy);
                    data.iter().map(|i| i.as_()).collect::<Vec<_>>()
                };
                encode_inner(&v, stream, max.as_())
            }
        }
    };
}
284
285// TODO: This does all kinds of silly things. Eg: Perhaps we have u32 and simple16 is best.
286// This may downcast to u16 then back up to u32. I'm afraid the final result is just going to
287// be a bunch of hairy special code for each type with no generality.
288//
289// Broadly we only want to downcast if it allows for some other kind of compressor to be used.
290
// Arguments: type, array encoder, fast-size estimator, next lower type, its encoder, its
// fast-size estimator, (extra lower types the chain needs), (compressors to try)
292impl_lowerable!(u64, encode_u64, fast_size_for_u64, u32, encode_u32, fast_size_for_u32, (u16), (PrefixVarIntCompressor));
293impl_lowerable!(
294    u32,
295    encode_u32,
296    fast_size_for_u32,
297    u16,
298    encode_u16,
299    fast_size_for_u16,
300    (),
301    (Simple16Compressor<u32>, DeltaZigZagCompressor, PrefixVarIntCompressor)
302); // TODO: Consider adding Fixed.
303impl_lowerable!(
304    u16,
305    encode_u16,
306    fast_size_for_u16,
307    u8,
308    encode_u8,
309    fast_size_for_u8,
310    (),
311    (Simple16Compressor<u16>, PrefixVarIntCompressor)
312);
313impl_lowerable!(
314    u8,
315    encode_u8,
316    fast_size_for_u8,
317    U0,
318    encode_u0,
319    fast_size_for_u0,
320    (),
321    (Simple16Compressor<u8>, BytesCompressor)
322);
323
/// Writes a root-level unsigned integer into `bytes` using the narrowest representation.
///
/// `0` and `1` are carried entirely by the returned [`RootTypeId`] and write no payload.
/// Any other value is written as its little-endian bytes, truncated to the smallest width
/// (1 to 8 bytes) that can hold it. The returned [`RootTypeId`] records that width.
///
/// Each arm's upper bound is `2^(8 * width) - 1`, the largest value that fits in that many
/// bytes. (Previously the 7-byte arm ended at `2^56` inclusive, so `2^56` was written with
/// 7 bytes and its only non-zero byte was dropped.)
#[cfg(feature = "encode")]
fn encode_root_uint(value: u64, bytes: &mut Vec<u8>) -> RootTypeId {
    let le = value.to_le_bytes();
    match value {
        0 => RootTypeId::Zero,
        1 => RootTypeId::One,
        2..=0xFF => {
            bytes.push(le[0]);
            RootTypeId::IntU8
        }
        0x100..=0xFFFF => {
            bytes.extend_from_slice(&le[..2]);
            RootTypeId::IntU16
        }
        0x1_0000..=0xFF_FFFF => {
            bytes.extend_from_slice(&le[..3]);
            RootTypeId::IntU24
        }
        0x100_0000..=0xFFFF_FFFF => {
            bytes.extend_from_slice(&le[..4]);
            RootTypeId::IntU32
        }
        0x1_0000_0000..=0xFF_FFFF_FFFF => {
            bytes.extend_from_slice(&le[..5]);
            RootTypeId::IntU40
        }
        0x100_0000_0000..=0xFFFF_FFFF_FFFF => {
            bytes.extend_from_slice(&le[..6]);
            RootTypeId::IntU48
        }
        0x1_0000_0000_0000..=0xFF_FFFF_FFFF_FFFF => {
            bytes.extend_from_slice(&le[..7]);
            RootTypeId::IntU56
        }
        _ => {
            bytes.extend_from_slice(&le);
            RootTypeId::IntU64
        }
    }
}
364
// TODO: One-offing this isn't great.
// Get unsigned integers implemented
// TODO: Wrapping over smaller sizes
/// Compressor for `u32` arrays that stores the zigzag-encoded difference between consecutive
/// values, each written as a prefix varint.
struct DeltaZigZagCompressor;

impl DeltaZigZagCompressor {
    /// Creates the compressor. The maximum is ignored; it is accepted only so that
    /// `impl_lowerable!` can construct every compressor as `<C>::new(max)`.
    #[inline(always)]
    pub fn new<T>(_max: T) -> Self {
        DeltaZigZagCompressor
    }
}
375
376// TODO: Use second-stack
377fn get_delta_zigs(data: &[u32]) -> Result<Vec<u32>, ()> {
378    // TODO: Rename? This isn't really in rle
379    within_rle(|| {
380        if data.len() < 2 {
381            return Err(());
382        }
383        let mut result = Vec::new();
384        let mut current = 0;
385        for next in data.iter() {
386            // TODO: Not hard-coded to u32
387            // See also e394b0c7-d5af-40b8-b944-cb68bac33fe9
388            let diff = next.wrapping_sub(current) as i32;
389            let zig = ZigZag::encode(diff);
390            result.push(zig);
391            current = *next;
392        }
393        Ok(result)
394    })
395}
396
397impl Compressor<u32> for DeltaZigZagCompressor {
398    fn compress<O: EncodeOptions>(&self, data: &[u32], stream: &mut EncoderStream<'_, O>) -> Result<ArrayTypeId, ()> {
399        let deltas = get_delta_zigs(data)?;
400        let _ignore_id = PrefixVarIntCompressor.compress(&deltas, stream);
401        Ok(ArrayTypeId::DeltaZig)
402    }
403    fn fast_size_for<O: EncodeOptions>(&self, data: &[u32], options: &O) -> Result<usize, ()> {
404        let deltas = get_delta_zigs(data)?;
405        PrefixVarIntCompressor.fast_size_for(&deltas, options)
406    }
407}
408
/// Compressor that writes each integer as a prefix varint.
struct PrefixVarIntCompressor;

impl PrefixVarIntCompressor {
    /// Creates the compressor. The maximum is ignored; it is accepted only so that
    /// `impl_lowerable!` can construct every compressor as `<C>::new(max)`.
    #[inline(always)]
    pub fn new<T>(_max: T) -> Self {
        PrefixVarIntCompressor
    }
}
417
418impl<T: Into<u64> + Copy> Compressor<T> for PrefixVarIntCompressor {
419    fn fast_size_for<O: EncodeOptions>(&self, data: &[T], _options: &O) -> Result<usize, ()> {
420        profile_method!(fast_size_for);
421        let mut size = 0;
422        for item in data {
423            size += size_for_varint((*item).into());
424        }
425        Ok(size)
426    }
427    fn compress<O: EncodeOptions>(&self, data: &[T], stream: &mut EncoderStream<'_, O>) -> Result<ArrayTypeId, ()> {
428        profile_method!(compress);
429        stream.encode_with_len(|stream| {
430            for item in data {
431                encode_prefix_varint((*item).into(), &mut stream.bytes);
432            }
433        });
434        Ok(ArrayTypeId::IntPrefixVar)
435    }
436}
437
/// Simple16 compressor that remembers the largest value of the array it will be given.
struct Simple16Compressor<T>(T);

impl<T> Simple16Compressor<T> {
    /// Wraps `max`, the maximum value of the data this compressor is meant for.
    #[inline(always)]
    pub fn new(max: T) -> Self {
        Simple16Compressor(max)
    }
}
446
447impl<T: Simple16> Simple16Compressor<T> {
448    fn check_range(&self) -> Result<(), ()> {
449        T::check(&[self.0]).map_err(|_| ())
450    }
451}
452
/// Simple16 packing for integer arrays.
///
/// Both methods first call `check_range`, which validates only the stored maximum (the value
/// given to `Simple16Compressor::new`). They then call the `_unchecked` entry points of
/// `simple_16` on the whole `data` slice.
impl<T: Simple16 + PartialOrd> Compressor<T> for Simple16Compressor<T> {
    /// Writes `data` in Simple16 format inside `encode_with_len` and returns
    /// `ArrayTypeId::IntSimple16`. Returns `Err(())` if the stored maximum is out of range.
    fn compress<O: EncodeOptions>(&self, data: &[T], stream: &mut EncoderStream<'_, O>) -> Result<ArrayTypeId, ()> {
        profile_method!(compress);

        self.check_range()?;

        // SAFETY: presumably `compress_unchecked` requires every element to be representable
        // by Simple16. Checking only the stored maximum is sufficient only if it really is the
        // maximum of `data`.
        // NOTE(review): that is not verified here. Confirm it holds for every construction
        // site, including the copies wrapped in `RLE` by `impl_lowerable!`.
        stream.encode_with_len(|stream| unsafe { simple_16::compress_unchecked(&data, stream.bytes) });

        Ok(ArrayTypeId::IntSimple16)
    }

    /// Returns the Simple16-packed size of `data`, or `Err(())` if the stored maximum is out of
    /// range.
    fn fast_size_for<O: EncodeOptions>(&self, data: &[T], _options: &O) -> Result<usize, ()> {
        profile_method!(fast_size_for);

        self.check_range()?;

        // SAFETY: same precondition as in `compress` — the stored maximum must bound every
        // element of `data`. NOTE(review): not verified here.
        let size = unsafe { simple_16::calculate_size_unchecked(&data) };

        Ok(size)
    }
}
474
/// Compressor that stores `u8` arrays verbatim.
struct BytesCompressor;

impl BytesCompressor {
    /// Creates the compressor. The maximum is ignored; it is accepted only so that
    /// `impl_lowerable!` can construct every compressor as `<C>::new(max)`.
    #[inline(always)]
    pub fn new<T>(_max: T) -> Self {
        BytesCompressor
    }
}
482
483impl Compressor<u8> for BytesCompressor {
484    fn compress<O: EncodeOptions>(&self, data: &[u8], stream: &mut EncoderStream<'_, O>) -> Result<ArrayTypeId, ()> {
485        profile_method!(compress);
486        stream.encode_with_len(|stream| stream.bytes.extend_from_slice(data));
487        Ok(ArrayTypeId::U8)
488    }
489    fn fast_size_for<O: EncodeOptions>(&self, data: &[u8], _options: &O) -> Result<usize, ()> {
490        let len_size = size_for_varint(data.len() as u64);
491        Ok(data.len() + len_size)
492    }
493}
494
495// TODO: Bitpacking https://crates.io/crates/bitpacking
496// TODO: Mayda https://crates.io/crates/mayda
497// TODO: https://lemire.me/blog/2012/09/12/fast-integer-compression-decoding-billions-of-integers-per-second/