tree_buf/internal/types/
float.rs

// Promising Compressors:
// Gorilla - https://crates.io/crates/tsz   http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
// FPC
// Akumuli - https://akumuli.org/akumuli/2017/02/05/compression_part2/
// ? http://blog.omega-prime.co.uk/2016/01/25/compression-of-floating-point-timeseries/
// https://www.cs.unc.edu/~isenburg/lcpfpv/
// dfcm - https://userweb.cs.txstate.edu/~mb92/papers/dcc06.pdf
8
// TODO: Lowerings
// Interesting reading: https://internals.rust-lang.org/t/tryfrom-for-f64/9793/35
// A useful starting point is that every down-cast followed by an up-cast must
// preserve the original value bit-for-bit. That alone is not quite enough though, since
// saturating rounding makes the round trip succeed for some values that one wouldn't want to downcast.
// https://floating-point-gui.de/formats/fp/
// f64 -> u64
// f64 -> f32
// f32 -> u32
18
19// TODO: More compressors
20
/// Generates the encode/decode plumbing for one primitive float type.
///
/// * `$T`  - the float type to implement for (`f32` or `f64`).
/// * `$id` - the variant name used on both `RootTypeId` and `ArrayTypeId` to tag
///   raw, fixed-width values of this type.
///
/// The expansion defines module-level items (`encode_item`, `decode_item`, `Fixed`,
/// `Gorilla`), so every invocation has to live in its own module.
macro_rules! impl_float {
    ($T:ident, $id:ident) => {
        //use crate::encodings::zfp;
        use crate::prelude::*;
        use num_traits::AsPrimitive as _;
        use std::convert::TryInto;
        use std::mem::size_of;
        use std::vec::IntoIter;

        // TODO: Check for lowering (e.g. f64 -> f32)
        /// Appends the little-endian byte representation of `item` to `bytes`.
        #[cfg(feature = "encode")]
        fn encode_item(item: $T, bytes: &mut Vec<u8>) {
            bytes.extend_from_slice(&item.to_le_bytes());
        }

        /// Decodes one little-endian value from `bytes`, using `decode_bytes` to fetch
        /// exactly `size_of` the float type bytes at `offset`.
        #[cfg(feature = "decode")]
        pub(super) fn decode_item(bytes: &[u8], offset: &mut usize) -> DecodeResult<$T> {
            let bytes = decode_bytes(size_of::<$T>(), bytes, offset)?;
            // This unwrap is ok, because we just read exactly size_of::<T> bytes on the line above.
            Ok(<$T>::from_le_bytes(bytes.try_into().unwrap()))
        }

        #[cfg(feature = "encode")]
        impl Encodable for $T {
            type EncoderArray = Vec<$T>;

            /// Encodes a single root value.
            ///
            /// `+0.0`, `1.0`, `-1.0` and the canonical NaN are expressed by their type id
            /// alone; every other value is written as raw little-endian bytes.
            fn encode_root<O: EncodeOptions>(&self, stream: &mut EncoderStream<'_, O>) -> RootTypeId {
                let value = *self;

                // Check for positive sign so that -0.0 goes through
                // the unhappy path but round-trips bit-for-bit
                if value == 0.0 && value.is_sign_positive() {
                    RootTypeId::Zero
                } else if value == 1.0 {
                    RootTypeId::One
                } else if value == -1.0 {
                    RootTypeId::NegOne
                } else if value.to_bits() == std::$T::NAN.to_bits() {
                    // Only the canonical NaN takes the shortcut. NaNs with any other
                    // sign/payload fall through to the raw encoding below so that they
                    // round-trip bit-for-bit.
                    RootTypeId::NaN
                } else {
                    encode_item(value, stream.bytes);
                    RootTypeId::$id
                }
            }
        }

        #[cfg(feature = "decode")]
        impl Decodable for $T {
            type DecoderArray = IntoIter<$T>;

            /// Decodes a single root value from an integer or float branch.
            ///
            /// Returns `DecodeError::SchemaMismatch` for any other branch, and for
            /// integers that are outside the range a float can represent exactly.
            fn decode(sticks: DynRootBranch<'_>, _options: &impl DecodeOptions) -> DecodeResult<Self> {
                profile_method!(decode);
                match sticks {
                    DynRootBranch::Integer(root_integer) => {
                        // Every integer with magnitude <= 2^MANTISSA_DIGITS converts to the
                        // float without rounding, so that range is accepted (inclusive).
                        // A float can also express some (but not all) larger integers;
                        // those are rejected rather than risking a silently rounded value.
                        match root_integer {
                            RootInteger::U(u) => {
                                if u <= (1 << std::$T::MANTISSA_DIGITS) {
                                    Ok(u as $T)
                                } else {
                                    Err(DecodeError::SchemaMismatch)
                                }
                            }
                            RootInteger::S(s) => {
                                if s >= -(1 << std::$T::MANTISSA_DIGITS) && s <= (1 << std::$T::MANTISSA_DIGITS) {
                                    Ok(s as $T)
                                } else {
                                    Err(DecodeError::SchemaMismatch)
                                }
                            }
                        }
                    }
                    DynRootBranch::Float(root_float) => {
                        match root_float {
                            // FIXME: Macro here - should be schema mismatch for f64 -> f32
                            RootFloat::F64(v) => Ok(v as $T),
                            RootFloat::NaN => Ok(std::$T::NAN),
                            // This should be safe to cast without loss of information.
                            // Double-check that the meaning of various NaN values
                            // is preserved though (signaling, non-signaling, etc)
                            // https://stackoverflow.com/a/59795029/11837266
                            RootFloat::F32(v) => Ok(v as $T),
                        }
                    }
                    _ => Err(DecodeError::SchemaMismatch),
                }
            }
        }

        #[cfg(feature = "decode")]
        impl InfallibleDecoderArray for IntoIter<$T> {
            type Decode = $T;

            /// Eagerly decodes a whole float array into a `Vec` and returns its iterator.
            ///
            /// Non-float branches yield `DecodeError::SchemaMismatch`. The zfp encodings
            /// are not implemented and currently panic.
            fn new_infallible(sticks: DynArrayBranch<'_>, _options: &impl DecodeOptions) -> DecodeResult<Self> {
                profile_method!(new_infallible);

                match sticks {
                    DynArrayBranch::Float(float) => {
                        match float {
                            ArrayFloat::F64(bytes) => {
                                profile_section!(array_f64);

                                // FIXME: Should do schema mismatch for f64 -> f32
                                let values = decode_all(&bytes, |bytes, offset| Ok(super::_f64::decode_item(bytes, offset)?.as_()))?;
                                Ok(values.into_iter())
                            }
                            ArrayFloat::F32(bytes) => {
                                profile_section!(array_f32);

                                let values = decode_all(&bytes, |bytes, offset| Ok(super::_f32::decode_item(bytes, offset)?.as_()))?;
                                Ok(values.into_iter())
                            }
                            ArrayFloat::DoubleGorilla(bytes) => gorilla::decompress::<$T>(&bytes).map(|f| f.into_iter()),
                            /*
                            ArrayFloat::Zfp32(bytes) => {
                                // FIXME: This is likely a bug switching between 32 and 64 might just get garbage data out
                                let values = zfp::decompress::<f32>(&bytes)?;
                                // TODO: (Performance) unnecessary copy in some cases
                                let values: Vec<_> = values.iter().map(|v| v.as_()).collect();
                                Ok(values.into_iter())
                            }
                            ArrayFloat::Zfp64(bytes) => {
                                let values = zfp::decompress::<f64>(&bytes)?;
                                // TODO: (Performance) unnecessary copy in some cases
                                let values: Vec<_> = values.iter().map(|v| v.as_()).collect();
                                Ok(values.into_iter())
                            }
                            */
                            ArrayFloat::Zfp32(_bytes) => unimplemented!("zfp32"),
                            ArrayFloat::Zfp64(_bytes) => unimplemented!("zfp64"),
                        }
                    }
                    // TODO: There are some conversions that are infallible.
                    // Eg: Simple16.
                    _ => Err(DecodeError::SchemaMismatch),
                }
            }

            /// Returns the next decoded value, or the type's default once the iterator is exhausted.
            fn decode_next_infallible(&mut self) -> Self::Decode {
                self.next().unwrap_or_default()
            }
        }

        #[cfg(feature = "encode")]
        impl EncoderArray<$T> for Vec<$T> {
            fn buffer_one<'a, 'b: 'a>(&'a mut self, value: &'b $T) {
                self.push(*value);
            }
            fn buffer_many<'a, 'b: 'a>(&'a mut self, values: &'b [$T]) {
                profile_method!(buffer_many);
                self.extend_from_slice(values);
            }
            /// Writes `values` to `stream` using the candidate compressors listed below.
            fn encode_all<O: EncodeOptions>(values: &[$T], stream: &mut EncoderStream<'_, O>) -> ArrayTypeId {
                profile_method!(encode_all);

                // See also 558c24b8-dc75-4f08-8ea2-0f839af4da2e
                let compressors = (
                    Fixed, //Zfp,
                    Gorilla,
                );

                compress(values, stream, &compressors)
            }
            fn flush<O: EncodeOptions>(self, stream: &mut EncoderStream<'_, O>) -> ArrayTypeId {
                Self::encode_all(&self[..], stream)
            }
        }

        // Encode-only, like the compressors it relies on: `Fixed` needs `encode_item`,
        // which only exists when the "encode" feature is enabled.
        #[cfg(feature = "encode")]
        impl PrimitiveEncoderArray<$T> for Vec<$T> {
            /// Size estimate for `values` over the same compressor set that `encode_all` uses.
            fn fast_size_for_all<O: EncodeOptions>(values: &[$T], options: &O) -> usize {
                // See also 558c24b8-dc75-4f08-8ea2-0f839af4da2e
                let compressors = (
                    Fixed, //Zfp,
                    Gorilla,
                );
                fast_size_for(values, &compressors, options)
            }
        }

        /// Uncompressed encoding: each value is stored as its little-endian bytes.
        #[cfg(feature = "encode")]
        struct Fixed;

        #[cfg(feature = "encode")]
        impl Compressor<$T> for Fixed {
            /// Payload bytes plus the varint that holds the payload length.
            fn fast_size_for<O: EncodeOptions>(&self, data: &[$T], _options: &O) -> Result<usize, ()> {
                let arr_size = size_of::<$T>() * data.len();
                Ok(arr_size + size_for_varint(arr_size as u64))
            }
            fn compress<O: EncodeOptions>(&self, data: &[$T], stream: &mut EncoderStream<'_, O>) -> Result<ArrayTypeId, ()> {
                profile_method!(compress);
                stream.encode_with_len(|stream| {
                    // The exact payload size is known up front, so grow the buffer once.
                    stream.bytes.reserve(size_of::<$T>() * data.len());
                    for item in data {
                        encode_item(*item, &mut stream.bytes);
                    }
                });
                Ok(ArrayTypeId::$id)
            }
        }

        /// Gorilla compression. Values are widened to `f64` before being handed to the
        /// `gorilla` module.
        ///
        /// When the options specify a lossy float tolerance `t`, every value is first
        /// rounded down to a multiple of `2^t`.
        #[cfg(feature = "encode")]
        struct Gorilla;

        #[cfg(feature = "encode")]
        impl Compressor<$T> for Gorilla {
            fn fast_size_for<O: EncodeOptions>(&self, data: &[$T], options: &O) -> Result<usize, ()> {
                profile_method!(fast_size_for);

                if let Some(tolerance) = options.lossy_float_tolerance() {
                    // TODO: This is a hack (albeit a surprisingly effective one) to get lossy compression
                    // before a real lossy compressor (Eg: zfp) is used.
                    let multiplier = (2.0 as $T).powi(-tolerance);
                    let data = data.iter().map(|f| ((f * multiplier).floor() / multiplier) as f64);
                    gorilla::size_for(data)
                } else {
                    let data = data.iter().map(|f| *f as f64);
                    gorilla::size_for(data)
                }
            }

            fn compress<O: EncodeOptions>(&self, data: &[$T], stream: &mut EncoderStream<'_, O>) -> Result<ArrayTypeId, ()> {
                profile_method!(compress);

                stream.encode_with_len(|stream| {
                    if let Some(tolerance) = stream.options.lossy_float_tolerance() {
                        // TODO: This is a hack (albeit a surprisingly effective one) to get lossy compression
                        // before a real lossy compressor (Eg: zfp) is used.
                        // Must stay in sync with the quantization in `fast_size_for`.
                        let multiplier = (2.0 as $T).powi(-tolerance);
                        let data = data.iter().map(|f| ((f * multiplier).floor() / multiplier) as f64);
                        gorilla::compress(data, stream.bytes)
                    } else {
                        let data = data.iter().map(|f| *f as f64);
                        gorilla::compress(data, stream.bytes)
                    }
                })
            }
        }
    };
}
254
255/*
256struct Zfp64 {
257    tolerance: Option<i32>,
258}
259impl Compressor<f64> for Zfp64 {
260    fn compress<O: EncodeOptions>(&self, data: &[f64], stream: &mut EncoderStream<'_, O>) -> Result<ArrayTypeId, ()> {
261        profile_method!(compress);
262        stream.encode_with_len(|stream| zfp::compress(data, &mut stream.bytes, self.tolerance));
263    }
264}
265
266struct Zfp32 {
267    tolerance: Option<i32>,
268}
269impl Compressor<f32> for Zfp32 {
270    fn compress<O: EncodeOptions>(&self, data: &[f32], stream: &mut EncoderStream<'_, O>) -> Result<ArrayTypeId, ()> {
271        profile_method!(compress);
272        stream.encode_with_len(|stream| zfp::compress(data, bytes, self.tolerance));
273    }
274}
275*/
276
277mod _f64 {
278    impl_float!(f64, F64);
279}
280mod _f32 {
281    impl_float!(f32, F32);
282}