lance_encoding/previous/encodings/
physical.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::DataType;
5use lance_arrow::DataTypeExt;
6
7use crate::{
8    buffer::LanceBuffer,
9    decoder::{PageBuffers, PageScheduler},
10    encodings::physical::block::{CompressionConfig, CompressionScheme},
11    format::pb::{self, PackedStruct},
12    previous::encodings::physical::{
13        basic::BasicPageScheduler, binary::BinaryPageScheduler, bitmap::DenseBitmapScheduler,
14        dictionary::DictionaryPageScheduler, fixed_size_list::FixedListScheduler,
15        fsst::FsstPageScheduler, packed_struct::PackedStructPageScheduler,
16        value::ValuePageScheduler,
17    },
18};
19
20pub mod basic;
21pub mod binary;
22pub mod bitmap;
23#[cfg(feature = "bitpacking")]
24pub mod bitpack;
25pub mod block;
26pub mod dictionary;
27pub mod fixed_size_binary;
28pub mod fixed_size_list;
29pub mod fsst;
30pub mod packed_struct;
31pub mod value;
32
33// Translate a protobuf buffer description into a position in the file.  This could be a page
34// buffer, a column buffer, or a file buffer.
35fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
36    let index = buffer_desc.buffer_index as usize;
37
38    match pb::buffer::BufferType::try_from(buffer_desc.buffer_type).unwrap() {
39        pb::buffer::BufferType::Page => buffers.positions_and_sizes[index],
40        pb::buffer::BufferType::Column => buffers.column_buffers.positions_and_sizes[index],
41        pb::buffer::BufferType::File => {
42            buffers.column_buffers.file_buffers.positions_and_sizes[index]
43        }
44    }
45}
46
47/// Convert a protobuf buffer encoding into a physical page scheduler
48fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
49    let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
50    let compression_config: CompressionConfig = if encoding.compression.is_none() {
51        CompressionConfig::new(CompressionScheme::None, None)
52    } else {
53        let compression = encoding.compression.as_ref().unwrap();
54        CompressionConfig::new(
55            compression.scheme.as_str().parse().unwrap(),
56            compression.level,
57        )
58    };
59    match encoding.bits_per_value {
60        1 => Box::new(DenseBitmapScheduler::new(buffer_offset)),
61        bits_per_value => {
62            if bits_per_value % 8 != 0 {
63                todo!(
64                    "bits_per_value ({}) that is not a multiple of 8",
65                    bits_per_value
66                );
67            }
68            Box::new(ValuePageScheduler::new(
69                bits_per_value / 8,
70                buffer_offset,
71                buffer_size,
72                compression_config,
73            ))
74        }
75    }
76}
77
#[cfg(feature = "bitpacking")]
fn get_bitpacked_buffer_decoder(
    encoding: &pb::Bitpacked,
    buffers: &PageBuffers,
) -> Box<dyn PageScheduler> {
    // Only the offset is used; the scheduler derives extents from the bit widths.
    let (buffer_offset, _buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);

    let scheduler = bitpack::BitpackedScheduler::new(
        encoding.compressed_bits_per_value,
        encoding.uncompressed_bits_per_value,
        buffer_offset,
        encoding.signed,
    );
    Box::new(scheduler)
}
92
#[cfg(feature = "bitpacking")]
fn get_bitpacked_for_non_neg_buffer_decoder(
    encoding: &pb::BitpackedForNonNeg,
    buffers: &PageBuffers,
) -> Box<dyn PageScheduler> {
    // Only the offset is used; the scheduler derives extents from the bit widths.
    let (buffer_offset, _buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);

    let scheduler = bitpack::BitpackedForNonNegScheduler::new(
        encoding.compressed_bits_per_value,
        encoding.uncompressed_bits_per_value,
        buffer_offset,
    );
    Box::new(scheduler)
}
106
107fn decoder_from_packed_struct(
108    packed_struct: &PackedStruct,
109    buffers: &PageBuffers,
110    data_type: &DataType,
111) -> Box<dyn PageScheduler> {
112    let inner_encodings = &packed_struct.inner;
113    let fields = match data_type {
114        DataType::Struct(fields) => Some(fields),
115        _ => None,
116    }
117    .unwrap();
118
119    let inner_datatypes = fields
120        .iter()
121        .map(|field| field.data_type())
122        .collect::<Vec<_>>();
123
124    let mut inner_schedulers = Vec::with_capacity(fields.len());
125    for i in 0..fields.len() {
126        let inner_encoding = &inner_encodings[i];
127        let inner_datatype = inner_datatypes[i];
128        let inner_scheduler = decoder_from_array_encoding(inner_encoding, buffers, inner_datatype);
129        inner_schedulers.push(inner_scheduler);
130    }
131
132    let packed_buffer = packed_struct.buffer.as_ref().unwrap();
133    let (buffer_offset, _) = get_buffer(packed_buffer, buffers);
134
135    Box::new(PackedStructPageScheduler::new(
136        inner_schedulers,
137        data_type.clone(),
138        buffer_offset,
139    ))
140}
141
/// Convert a protobuf array encoding into a physical page scheduler
///
/// This is the main dispatch point for the 2.0 ("previous") decoders: each
/// protobuf `ArrayEncoding` variant maps to one page scheduler, recursing into
/// child encodings where the encoding is a wrapper (nullability, lists,
/// dictionaries, etc.).  `data_type` is the type the *caller* wants back, which
/// may differ from the stored type (e.g. dictionary-encoded values).
pub fn decoder_from_array_encoding(
    encoding: &pb::ArrayEncoding,
    buffers: &PageBuffers,
    data_type: &DataType,
) -> Box<dyn PageScheduler> {
    match encoding.array_encoding.as_ref().unwrap() {
        // Nullability wrapper: pick a basic scheduler flavor based on whether the
        // page has no nulls, some nulls (separate validity + values streams), or
        // is entirely null (no data at all).
        pb::array_encoding::ArrayEncoding::Nullable(basic) => {
            match basic.nullability.as_ref().unwrap() {
                pb::nullable::Nullability::NoNulls(no_nulls) => Box::new(
                    BasicPageScheduler::new_non_nullable(decoder_from_array_encoding(
                        no_nulls.values.as_ref().unwrap(),
                        buffers,
                        data_type,
                    )),
                ),
                pb::nullable::Nullability::SomeNulls(some_nulls) => {
                    Box::new(BasicPageScheduler::new_nullable(
                        // validity scheduler
                        decoder_from_array_encoding(
                            some_nulls.validity.as_ref().unwrap(),
                            buffers,
                            data_type,
                        ),
                        // values scheduler
                        decoder_from_array_encoding(
                            some_nulls.values.as_ref().unwrap(),
                            buffers,
                            data_type,
                        ),
                    ))
                }
                pb::nullable::Nullability::AllNulls(_) => {
                    Box::new(BasicPageScheduler::new_all_null())
                }
            }
        }
        // Bitpacked encodings are only decodable when the crate was built with
        // the `bitpacking` feature; otherwise encountering one is fatal.
        #[cfg(feature = "bitpacking")]
        pb::array_encoding::ArrayEncoding::Bitpacked(bitpacked) => {
            get_bitpacked_buffer_decoder(bitpacked, buffers)
        }
        #[cfg(not(feature = "bitpacking"))]
        pb::array_encoding::ArrayEncoding::Bitpacked(_) => {
            panic!("Runtime built without bitpacking support")
        }
        pb::array_encoding::ArrayEncoding::Flat(flat) => get_buffer_decoder(flat, buffers),
        pb::array_encoding::ArrayEncoding::FixedSizeList(fixed_size_list) => {
            // NOTE(review): `data_type` is passed through unchanged to the item
            // decoder here — presumably the item decoders that can appear inside
            // a fixed-size list don't depend on it; confirm before relying on it.
            let item_encoding = fixed_size_list.items.as_ref().unwrap();
            let item_scheduler = decoder_from_array_encoding(item_encoding, buffers, data_type);
            Box::new(FixedListScheduler::new(
                item_scheduler,
                fixed_size_list.dimension,
            ))
        }
        // This is a column containing the list offsets.  This wrapper is superfluous at the moment
        // since we know it is a list based on the schema.  In the future there may be different ways
        // of storing the list offsets.
        pb::array_encoding::ArrayEncoding::List(list) => {
            decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type)
        }
        // Variable-width binary/utf8: one stream of offsets ("indices") plus one
        // stream of raw bytes.
        pb::array_encoding::ArrayEncoding::Binary(binary) => {
            let indices_encoding = binary.indices.as_ref().unwrap();
            let bytes_encoding = binary.bytes.as_ref().unwrap();

            let indices_scheduler =
                decoder_from_array_encoding(indices_encoding, buffers, data_type);
            let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);

            // Large variants use 64-bit offsets; everything else uses 32-bit.
            let offset_type = match data_type {
                DataType::LargeBinary | DataType::LargeUtf8 => DataType::Int64,
                _ => DataType::Int32,
            };

            Box::new(BinaryPageScheduler::new(
                indices_scheduler.into(),
                bytes_scheduler.into(),
                offset_type,
                binary.null_adjustment,
            ))
        }
        // FSST-compressed strings: an inner binary encoding plus the symbol
        // table needed to decompress it.
        pb::array_encoding::ArrayEncoding::Fsst(fsst) => {
            let inner =
                decoder_from_array_encoding(fsst.binary.as_ref().unwrap(), buffers, data_type);

            Box::new(FsstPageScheduler::new(
                inner,
                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
            ))
        }
        pb::array_encoding::ArrayEncoding::Dictionary(dictionary) => {
            let indices_encoding = dictionary.indices.as_ref().unwrap();
            let items_encoding = dictionary.items.as_ref().unwrap();
            let num_dictionary_items = dictionary.num_dictionary_items;

            // We can get here in 2 ways.  The data is dictionary encoded and the user wants a dictionary or
            // the data is dictionary encoded, as an optimization, and the user wants the value type.  Figure
            // out the value type.
            let value_type = if let DataType::Dictionary(_, value_type) = data_type {
                value_type
            } else {
                data_type
            };

            // Note: we don't actually know the indices type here, passing down `data_type` works ok because
            // the dictionary indices are always integers and we don't need the data_type to figure out how
            // to decode integers.
            let indices_scheduler =
                decoder_from_array_encoding(indices_encoding, buffers, data_type);

            let items_scheduler = decoder_from_array_encoding(items_encoding, buffers, value_type);

            // If the caller asked for a non-dictionary type we must materialize
            // the values; otherwise we can hand the dictionary straight back.
            let should_decode_dict = !data_type.is_dictionary();

            Box::new(DictionaryPageScheduler::new(
                indices_scheduler.into(),
                items_scheduler.into(),
                num_dictionary_items,
                should_decode_dict,
            ))
        }
        pb::array_encoding::ArrayEncoding::FixedSizeBinary(fixed_size_binary) => {
            let bytes_encoding = fixed_size_binary.bytes.as_ref().unwrap();
            let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
            // The decoder re-creates offsets for the caller's type: 8 bytes per
            // offset for large variants, 4 otherwise.
            let bytes_per_offset = match data_type {
                DataType::LargeBinary | DataType::LargeUtf8 => 8,
                DataType::Binary | DataType::Utf8 => 4,
                _ => panic!("FixedSizeBinary only supports binary and utf8 types"),
            };

            Box::new(fixed_size_binary::FixedSizeBinaryPageScheduler::new(
                bytes_scheduler,
                fixed_size_binary.byte_width,
                bytes_per_offset,
            ))
        }
        pb::array_encoding::ArrayEncoding::PackedStruct(packed_struct) => {
            decoder_from_packed_struct(packed_struct, buffers, data_type)
        }
        #[cfg(feature = "bitpacking")]
        pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(bitpacked) => {
            get_bitpacked_for_non_neg_buffer_decoder(bitpacked, buffers)
        }
        #[cfg(not(feature = "bitpacking"))]
        pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(_) => {
            panic!("Runtime built without bitpacking support")
        }
        // Currently there is no way to encode struct nullability and structs are encoded with a "header" column
        // (that has no data).  We never actually decode that column and so this branch is never actually encountered.
        //
        // This will change in the future when we add support for struct nullability.
        pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(),
        // 2.1 only
        _ => unreachable!("Unsupported array encoding: {:?}", encoding),
    }
}
295
#[cfg(test)]
mod tests {
    use crate::decoder::{ColumnBuffers, FileBuffers, PageBuffers};
    use crate::format::pb;
    use crate::previous::encodings::physical::get_buffer_decoder;

    #[test]
    fn test_get_buffer_decoder_for_compressed_buffer() {
        // A flat encoding of 8-bit values pointing at file buffer 0,
        // zstd-compressed at level 0.
        let encoding = pb::Flat {
            buffer: Some(pb::Buffer {
                buffer_index: 0,
                buffer_type: pb::buffer::BufferType::File as i32,
            }),
            bits_per_value: 8,
            compression: Some(pb::Compression {
                scheme: "zstd".to_string(),
                level: Some(0),
            }),
        };
        // The file-level buffer table holds a single 100-byte buffer at offset 0;
        // the page and column tables are empty.
        let buffers = PageBuffers {
            column_buffers: ColumnBuffers {
                file_buffers: FileBuffers {
                    positions_and_sizes: &[(0, 100)],
                },
                positions_and_sizes: &[],
            },
            positions_and_sizes: &[],
        };

        let page_scheduler = get_buffer_decoder(&encoding, &buffers);

        // The Debug representation pins the resolved offset/size and the parsed
        // compression configuration.
        assert_eq!(format!("{:?}", page_scheduler).as_str(), "ValuePageScheduler { bytes_per_value: 1, buffer_offset: 0, buffer_size: 100, compression_config: CompressionConfig { scheme: Zstd, level: Some(0) } }");
    }
}
328}