Skip to main content

lance_encoding/previous/encodings/
physical.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::DataType;
5use lance_arrow::DataTypeExt;
6
7use crate::{
8    buffer::LanceBuffer,
9    decoder::{PageBuffers, PageScheduler},
10    encodings::physical::block::{CompressionConfig, CompressionScheme},
11    format::pb::{self, PackedStruct},
12    previous::encodings::physical::{
13        basic::BasicPageScheduler, binary::BinaryPageScheduler, bitmap::DenseBitmapScheduler,
14        dictionary::DictionaryPageScheduler, fixed_size_list::FixedListScheduler,
15        fsst::FsstPageScheduler, packed_struct::PackedStructPageScheduler,
16        value::ValuePageScheduler,
17    },
18};
19
20pub mod basic;
21pub mod binary;
22pub mod bitmap;
23#[cfg(feature = "bitpacking")]
24pub mod bitpack;
25pub mod block;
26pub mod dictionary;
27pub mod fixed_size_binary;
28pub mod fixed_size_list;
29pub mod fsst;
30pub mod packed_struct;
31pub mod value;
32
33// Translate a protobuf buffer description into a position in the file.  This could be a page
34// buffer, a column buffer, or a file buffer.
35fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
36    let index = buffer_desc.buffer_index as usize;
37
38    match pb::buffer::BufferType::try_from(buffer_desc.buffer_type).unwrap() {
39        pb::buffer::BufferType::Page => buffers.positions_and_sizes[index],
40        pb::buffer::BufferType::Column => buffers.column_buffers.positions_and_sizes[index],
41        pb::buffer::BufferType::File => {
42            buffers.column_buffers.file_buffers.positions_and_sizes[index]
43        }
44    }
45}
46
47/// Convert a protobuf buffer encoding into a physical page scheduler
48fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
49    let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
50    let compression_config: CompressionConfig = match encoding.compression.as_ref() {
51        None => CompressionConfig::new(CompressionScheme::None, None),
52        Some(compression) => CompressionConfig::new(
53            compression.scheme.as_str().parse().unwrap(),
54            compression.level,
55        ),
56    };
57    match encoding.bits_per_value {
58        1 => Box::new(DenseBitmapScheduler::new(buffer_offset)),
59        bits_per_value => {
60            if bits_per_value % 8 != 0 {
61                todo!(
62                    "bits_per_value ({}) that is not a multiple of 8",
63                    bits_per_value
64                );
65            }
66            Box::new(ValuePageScheduler::new(
67                bits_per_value / 8,
68                buffer_offset,
69                buffer_size,
70                compression_config,
71            ))
72        }
73    }
74}
75
#[cfg(feature = "bitpacking")]
fn get_bitpacked_buffer_decoder(
    encoding: &pb::Bitpacked,
    buffers: &PageBuffers,
) -> Box<dyn PageScheduler> {
    // Only the buffer offset is used; the size returned by `get_buffer` is ignored.
    let (offset, _) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);

    let scheduler = bitpack::BitpackedScheduler::new(
        encoding.compressed_bits_per_value,
        encoding.uncompressed_bits_per_value,
        offset,
        encoding.signed,
    );
    Box::new(scheduler)
}
90
#[cfg(feature = "bitpacking")]
fn get_bitpacked_for_non_neg_buffer_decoder(
    encoding: &pb::BitpackedForNonNeg,
    buffers: &PageBuffers,
) -> Box<dyn PageScheduler> {
    // Only the buffer offset is used; the size returned by `get_buffer` is ignored.
    let (offset, _) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);

    let scheduler = bitpack::BitpackedForNonNegScheduler::new(
        encoding.compressed_bits_per_value,
        encoding.uncompressed_bits_per_value,
        offset,
    );
    Box::new(scheduler)
}
104
105fn decoder_from_packed_struct(
106    packed_struct: &PackedStruct,
107    buffers: &PageBuffers,
108    data_type: &DataType,
109) -> Box<dyn PageScheduler> {
110    let inner_encodings = &packed_struct.inner;
111    let fields = match data_type {
112        DataType::Struct(fields) => Some(fields),
113        _ => None,
114    }
115    .unwrap();
116
117    let inner_datatypes = fields
118        .iter()
119        .map(|field| field.data_type())
120        .collect::<Vec<_>>();
121
122    let mut inner_schedulers = Vec::with_capacity(fields.len());
123    for i in 0..fields.len() {
124        let inner_encoding = &inner_encodings[i];
125        let inner_datatype = inner_datatypes[i];
126        let inner_scheduler = decoder_from_array_encoding(inner_encoding, buffers, inner_datatype);
127        inner_schedulers.push(inner_scheduler);
128    }
129
130    let packed_buffer = packed_struct.buffer.as_ref().unwrap();
131    let (buffer_offset, _) = get_buffer(packed_buffer, buffers);
132
133    Box::new(PackedStructPageScheduler::new(
134        inner_schedulers,
135        data_type.clone(),
136        buffer_offset,
137    ))
138}
139
/// Convert a protobuf array encoding into a physical page scheduler.
///
/// This is the main dispatch for the "previous" (2.0) decoder path: each
/// protobuf `ArrayEncoding` variant maps to one physical scheduler, recursing
/// into child encodings where the encoding is a wrapper (nullable, list,
/// dictionary, ...).  Unknown variants are 2.1-only and unreachable here.
pub fn decoder_from_array_encoding(
    encoding: &pb::ArrayEncoding,
    buffers: &PageBuffers,
    data_type: &DataType,
) -> Box<dyn PageScheduler> {
    match encoding.array_encoding.as_ref().unwrap() {
        pb::array_encoding::ArrayEncoding::Nullable(basic) => {
            // Nullability is encoded as one of three cases: no nulls (values
            // only), some nulls (separate validity + values encodings), or
            // all nulls (no data at all).
            match basic.nullability.as_ref().unwrap() {
                pb::nullable::Nullability::NoNulls(no_nulls) => Box::new(
                    BasicPageScheduler::new_non_nullable(decoder_from_array_encoding(
                        no_nulls.values.as_ref().unwrap(),
                        buffers,
                        data_type,
                    )),
                ),
                pb::nullable::Nullability::SomeNulls(some_nulls) => {
                    Box::new(BasicPageScheduler::new_nullable(
                        decoder_from_array_encoding(
                            some_nulls.validity.as_ref().unwrap(),
                            buffers,
                            data_type,
                        ),
                        decoder_from_array_encoding(
                            some_nulls.values.as_ref().unwrap(),
                            buffers,
                            data_type,
                        ),
                    ))
                }
                pb::nullable::Nullability::AllNulls(_) => {
                    Box::new(BasicPageScheduler::new_all_null())
                }
            }
        }
        #[cfg(feature = "bitpacking")]
        pb::array_encoding::ArrayEncoding::Bitpacked(bitpacked) => {
            get_bitpacked_buffer_decoder(bitpacked, buffers)
        }
        // Without the feature we can still encounter bitpacked data on disk;
        // fail loudly rather than misdecode it.
        #[cfg(not(feature = "bitpacking"))]
        pb::array_encoding::ArrayEncoding::Bitpacked(_) => {
            panic!("Runtime built without bitpacking support")
        }
        pb::array_encoding::ArrayEncoding::Flat(flat) => get_buffer_decoder(flat, buffers),
        pb::array_encoding::ArrayEncoding::FixedSizeList(fixed_size_list) => {
            let item_encoding = fixed_size_list.items.as_ref().unwrap();
            let item_scheduler = decoder_from_array_encoding(item_encoding, buffers, data_type);
            Box::new(FixedListScheduler::new(
                item_scheduler,
                fixed_size_list.dimension,
            ))
        }
        // This is a column containing the list offsets.  This wrapper is superfluous at the moment
        // since we know it is a list based on the schema.  In the future there may be different ways
        // of storing the list offsets.
        pb::array_encoding::ArrayEncoding::List(list) => {
            decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type)
        }
        pb::array_encoding::ArrayEncoding::Binary(binary) => {
            let indices_encoding = binary.indices.as_ref().unwrap();
            let bytes_encoding = binary.bytes.as_ref().unwrap();

            let indices_scheduler =
                decoder_from_array_encoding(indices_encoding, buffers, data_type);
            let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);

            // Large variants use 64-bit offsets; everything else gets 32-bit.
            let offset_type = match data_type {
                DataType::LargeBinary | DataType::LargeUtf8 => DataType::Int64,
                _ => DataType::Int32,
            };

            Box::new(BinaryPageScheduler::new(
                indices_scheduler.into(),
                bytes_scheduler.into(),
                offset_type,
                binary.null_adjustment,
            ))
        }
        pb::array_encoding::ArrayEncoding::Fsst(fsst) => {
            let inner =
                decoder_from_array_encoding(fsst.binary.as_ref().unwrap(), buffers, data_type);

            Box::new(FsstPageScheduler::new(
                inner,
                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
            ))
        }
        pb::array_encoding::ArrayEncoding::Dictionary(dictionary) => {
            let indices_encoding = dictionary.indices.as_ref().unwrap();
            let items_encoding = dictionary.items.as_ref().unwrap();
            let num_dictionary_items = dictionary.num_dictionary_items;

            // We can get here in 2 ways.  The data is dictionary encoded and the user wants a dictionary or
            // the data is dictionary encoded, as an optimization, and the user wants the value type.  Figure
            // out the value type.
            let value_type = if let DataType::Dictionary(_, value_type) = data_type {
                value_type
            } else {
                data_type
            };

            // Note: we don't actually know the indices type here, passing down `data_type` works ok because
            // the dictionary indices are always integers and we don't need the data_type to figure out how
            // to decode integers.
            let indices_scheduler =
                decoder_from_array_encoding(indices_encoding, buffers, data_type);

            let items_scheduler = decoder_from_array_encoding(items_encoding, buffers, value_type);

            // If the caller asked for the value type (not a dictionary), the
            // scheduler must materialize values from the dictionary.
            let should_decode_dict = !data_type.is_dictionary();

            Box::new(DictionaryPageScheduler::new(
                indices_scheduler.into(),
                items_scheduler.into(),
                num_dictionary_items,
                should_decode_dict,
            ))
        }
        pb::array_encoding::ArrayEncoding::FixedSizeBinary(fixed_size_binary) => {
            let bytes_encoding = fixed_size_binary.bytes.as_ref().unwrap();
            let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
            // Offset width mirrors the Binary arm: 8 bytes for large variants,
            // 4 for the regular ones; any other type is a metadata error.
            let bytes_per_offset = match data_type {
                DataType::LargeBinary | DataType::LargeUtf8 => 8,
                DataType::Binary | DataType::Utf8 => 4,
                _ => panic!("FixedSizeBinary only supports binary and utf8 types"),
            };

            Box::new(fixed_size_binary::FixedSizeBinaryPageScheduler::new(
                bytes_scheduler,
                fixed_size_binary.byte_width,
                bytes_per_offset,
            ))
        }
        pb::array_encoding::ArrayEncoding::PackedStruct(packed_struct) => {
            decoder_from_packed_struct(packed_struct, buffers, data_type)
        }
        #[cfg(feature = "bitpacking")]
        pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(bitpacked) => {
            get_bitpacked_for_non_neg_buffer_decoder(bitpacked, buffers)
        }
        #[cfg(not(feature = "bitpacking"))]
        pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(_) => {
            panic!("Runtime built without bitpacking support")
        }
        // Currently there is no way to encode struct nullability and structs are encoded with a "header" column
        // (that has no data).  We never actually decode that column and so this branch is never actually encountered.
        //
        // This will change in the future when we add support for struct nullability.
        pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(),
        // 2.1 only
        _ => unreachable!("Unsupported array encoding: {:?}", encoding),
    }
}
293
#[cfg(test)]
mod tests {
    use crate::decoder::{ColumnBuffers, FileBuffers, PageBuffers};
    use crate::format::pb;
    use crate::previous::encodings::physical::get_buffer_decoder;

    /// A zstd-compressed flat buffer should yield a `ValuePageScheduler`
    /// that carries the compression configuration through.
    #[test]
    fn test_get_buffer_decoder_for_compressed_buffer() {
        let flat_encoding = pb::Flat {
            buffer: Some(pb::Buffer {
                buffer_index: 0,
                buffer_type: pb::buffer::BufferType::File as i32,
            }),
            bits_per_value: 8,
            compression: Some(pb::Compression {
                scheme: "zstd".to_string(),
                level: Some(0),
            }),
        };

        // One file-level buffer at offset 0 with size 100; no page or column buffers.
        let page_buffers = PageBuffers {
            column_buffers: ColumnBuffers {
                file_buffers: FileBuffers {
                    positions_and_sizes: &[(0, 100)],
                },
                positions_and_sizes: &[],
            },
            positions_and_sizes: &[],
        };

        let page_scheduler = get_buffer_decoder(&flat_encoding, &page_buffers);
        assert_eq!(
            format!("{:?}", page_scheduler).as_str(),
            "ValuePageScheduler { bytes_per_value: 1, buffer_offset: 0, buffer_size: 100, compression_config: CompressionConfig { scheme: Zstd, level: Some(0) } }"
        );
    }
}
329}