lance_encoding/previous/encodings/
physical.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::DataType;
5use lance_arrow::DataTypeExt;
6
7use crate::{
8    buffer::LanceBuffer,
9    decoder::{PageBuffers, PageScheduler},
10    encodings::physical::block::{CompressionConfig, CompressionScheme},
11    format::pb::{self, PackedStruct},
12    previous::encodings::physical::{
13        basic::BasicPageScheduler, binary::BinaryPageScheduler, bitmap::DenseBitmapScheduler,
14        dictionary::DictionaryPageScheduler, fixed_size_list::FixedListScheduler,
15        fsst::FsstPageScheduler, packed_struct::PackedStructPageScheduler,
16        value::ValuePageScheduler,
17    },
18};
19
20pub mod basic;
21pub mod binary;
22pub mod bitmap;
23pub mod bitpack;
24pub mod block;
25pub mod dictionary;
26pub mod fixed_size_binary;
27pub mod fixed_size_list;
28pub mod fsst;
29pub mod packed_struct;
30pub mod value;
31
32// Translate a protobuf buffer description into a position in the file.  This could be a page
33// buffer, a column buffer, or a file buffer.
34fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
35    let index = buffer_desc.buffer_index as usize;
36
37    match pb::buffer::BufferType::try_from(buffer_desc.buffer_type).unwrap() {
38        pb::buffer::BufferType::Page => buffers.positions_and_sizes[index],
39        pb::buffer::BufferType::Column => buffers.column_buffers.positions_and_sizes[index],
40        pb::buffer::BufferType::File => {
41            buffers.column_buffers.file_buffers.positions_and_sizes[index]
42        }
43    }
44}
45
46/// Convert a protobuf buffer encoding into a physical page scheduler
47fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
48    let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
49    let compression_config: CompressionConfig = if encoding.compression.is_none() {
50        CompressionConfig::new(CompressionScheme::None, None)
51    } else {
52        let compression = encoding.compression.as_ref().unwrap();
53        CompressionConfig::new(
54            compression.scheme.as_str().parse().unwrap(),
55            compression.level,
56        )
57    };
58    match encoding.bits_per_value {
59        1 => Box::new(DenseBitmapScheduler::new(buffer_offset)),
60        bits_per_value => {
61            if bits_per_value % 8 != 0 {
62                todo!(
63                    "bits_per_value ({}) that is not a multiple of 8",
64                    bits_per_value
65                );
66            }
67            Box::new(ValuePageScheduler::new(
68                bits_per_value / 8,
69                buffer_offset,
70                buffer_size,
71                compression_config,
72            ))
73        }
74    }
75}
76
77fn get_bitpacked_buffer_decoder(
78    encoding: &pb::Bitpacked,
79    buffers: &PageBuffers,
80) -> Box<dyn PageScheduler> {
81    let (buffer_offset, _buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
82
83    Box::new(bitpack::BitpackedScheduler::new(
84        encoding.compressed_bits_per_value,
85        encoding.uncompressed_bits_per_value,
86        buffer_offset,
87        encoding.signed,
88    ))
89}
90
91fn get_bitpacked_for_non_neg_buffer_decoder(
92    encoding: &pb::BitpackedForNonNeg,
93    buffers: &PageBuffers,
94) -> Box<dyn PageScheduler> {
95    let (buffer_offset, _buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
96
97    Box::new(bitpack::BitpackedForNonNegScheduler::new(
98        encoding.compressed_bits_per_value,
99        encoding.uncompressed_bits_per_value,
100        buffer_offset,
101    ))
102}
103
104fn decoder_from_packed_struct(
105    packed_struct: &PackedStruct,
106    buffers: &PageBuffers,
107    data_type: &DataType,
108) -> Box<dyn PageScheduler> {
109    let inner_encodings = &packed_struct.inner;
110    let fields = match data_type {
111        DataType::Struct(fields) => Some(fields),
112        _ => None,
113    }
114    .unwrap();
115
116    let inner_datatypes = fields
117        .iter()
118        .map(|field| field.data_type())
119        .collect::<Vec<_>>();
120
121    let mut inner_schedulers = Vec::with_capacity(fields.len());
122    for i in 0..fields.len() {
123        let inner_encoding = &inner_encodings[i];
124        let inner_datatype = inner_datatypes[i];
125        let inner_scheduler = decoder_from_array_encoding(inner_encoding, buffers, inner_datatype);
126        inner_schedulers.push(inner_scheduler);
127    }
128
129    let packed_buffer = packed_struct.buffer.as_ref().unwrap();
130    let (buffer_offset, _) = get_buffer(packed_buffer, buffers);
131
132    Box::new(PackedStructPageScheduler::new(
133        inner_schedulers,
134        data_type.clone(),
135        buffer_offset,
136    ))
137}
138
139/// Convert a protobuf array encoding into a physical page scheduler
140pub fn decoder_from_array_encoding(
141    encoding: &pb::ArrayEncoding,
142    buffers: &PageBuffers,
143    data_type: &DataType,
144) -> Box<dyn PageScheduler> {
145    match encoding.array_encoding.as_ref().unwrap() {
146        pb::array_encoding::ArrayEncoding::Nullable(basic) => {
147            match basic.nullability.as_ref().unwrap() {
148                pb::nullable::Nullability::NoNulls(no_nulls) => Box::new(
149                    BasicPageScheduler::new_non_nullable(decoder_from_array_encoding(
150                        no_nulls.values.as_ref().unwrap(),
151                        buffers,
152                        data_type,
153                    )),
154                ),
155                pb::nullable::Nullability::SomeNulls(some_nulls) => {
156                    Box::new(BasicPageScheduler::new_nullable(
157                        decoder_from_array_encoding(
158                            some_nulls.validity.as_ref().unwrap(),
159                            buffers,
160                            data_type,
161                        ),
162                        decoder_from_array_encoding(
163                            some_nulls.values.as_ref().unwrap(),
164                            buffers,
165                            data_type,
166                        ),
167                    ))
168                }
169                pb::nullable::Nullability::AllNulls(_) => {
170                    Box::new(BasicPageScheduler::new_all_null())
171                }
172            }
173        }
174        pb::array_encoding::ArrayEncoding::Bitpacked(bitpacked) => {
175            get_bitpacked_buffer_decoder(bitpacked, buffers)
176        }
177        pb::array_encoding::ArrayEncoding::Flat(flat) => get_buffer_decoder(flat, buffers),
178        pb::array_encoding::ArrayEncoding::FixedSizeList(fixed_size_list) => {
179            let item_encoding = fixed_size_list.items.as_ref().unwrap();
180            let item_scheduler = decoder_from_array_encoding(item_encoding, buffers, data_type);
181            Box::new(FixedListScheduler::new(
182                item_scheduler,
183                fixed_size_list.dimension,
184            ))
185        }
186        // This is a column containing the list offsets.  This wrapper is superfluous at the moment
187        // since we know it is a list based on the schema.  In the future there may be different ways
188        // of storing the list offsets.
189        pb::array_encoding::ArrayEncoding::List(list) => {
190            decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type)
191        }
192        pb::array_encoding::ArrayEncoding::Binary(binary) => {
193            let indices_encoding = binary.indices.as_ref().unwrap();
194            let bytes_encoding = binary.bytes.as_ref().unwrap();
195
196            let indices_scheduler =
197                decoder_from_array_encoding(indices_encoding, buffers, data_type);
198            let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
199
200            let offset_type = match data_type {
201                DataType::LargeBinary | DataType::LargeUtf8 => DataType::Int64,
202                _ => DataType::Int32,
203            };
204
205            Box::new(BinaryPageScheduler::new(
206                indices_scheduler.into(),
207                bytes_scheduler.into(),
208                offset_type,
209                binary.null_adjustment,
210            ))
211        }
212        pb::array_encoding::ArrayEncoding::Fsst(fsst) => {
213            let inner =
214                decoder_from_array_encoding(fsst.binary.as_ref().unwrap(), buffers, data_type);
215
216            Box::new(FsstPageScheduler::new(
217                inner,
218                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
219            ))
220        }
221        pb::array_encoding::ArrayEncoding::Dictionary(dictionary) => {
222            let indices_encoding = dictionary.indices.as_ref().unwrap();
223            let items_encoding = dictionary.items.as_ref().unwrap();
224            let num_dictionary_items = dictionary.num_dictionary_items;
225
226            // We can get here in 2 ways.  The data is dictionary encoded and the user wants a dictionary or
227            // the data is dictionary encoded, as an optimization, and the user wants the value type.  Figure
228            // out the value type.
229            let value_type = if let DataType::Dictionary(_, value_type) = data_type {
230                value_type
231            } else {
232                data_type
233            };
234
235            // Note: we don't actually know the indices type here, passing down `data_type` works ok because
236            // the dictionary indices are always integers and we don't need the data_type to figure out how
237            // to decode integers.
238            let indices_scheduler =
239                decoder_from_array_encoding(indices_encoding, buffers, data_type);
240
241            let items_scheduler = decoder_from_array_encoding(items_encoding, buffers, value_type);
242
243            let should_decode_dict = !data_type.is_dictionary();
244
245            Box::new(DictionaryPageScheduler::new(
246                indices_scheduler.into(),
247                items_scheduler.into(),
248                num_dictionary_items,
249                should_decode_dict,
250            ))
251        }
252        pb::array_encoding::ArrayEncoding::FixedSizeBinary(fixed_size_binary) => {
253            let bytes_encoding = fixed_size_binary.bytes.as_ref().unwrap();
254            let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
255            let bytes_per_offset = match data_type {
256                DataType::LargeBinary | DataType::LargeUtf8 => 8,
257                DataType::Binary | DataType::Utf8 => 4,
258                _ => panic!("FixedSizeBinary only supports binary and utf8 types"),
259            };
260
261            Box::new(fixed_size_binary::FixedSizeBinaryPageScheduler::new(
262                bytes_scheduler,
263                fixed_size_binary.byte_width,
264                bytes_per_offset,
265            ))
266        }
267        pb::array_encoding::ArrayEncoding::PackedStruct(packed_struct) => {
268            decoder_from_packed_struct(packed_struct, buffers, data_type)
269        }
270        pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(bitpacked) => {
271            get_bitpacked_for_non_neg_buffer_decoder(bitpacked, buffers)
272        }
273        // Currently there is no way to encode struct nullability and structs are encoded with a "header" column
274        // (that has no data).  We never actually decode that column and so this branch is never actually encountered.
275        //
276        // This will change in the future when we add support for struct nullability.
277        pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(),
278        // 2.1 only
279        _ => unreachable!("Unsupported array encoding: {:?}", encoding),
280    }
281}
282
#[cfg(test)]
mod tests {
    use crate::decoder::{ColumnBuffers, FileBuffers, PageBuffers};
    use crate::format::pb;
    use crate::previous::encodings::physical::get_buffer_decoder;

    /// A byte-wide flat encoding that declares zstd compression and points at a file
    /// buffer must produce a `ValuePageScheduler` carrying that compression config.
    #[test]
    fn test_get_buffer_decoder_for_compressed_buffer() {
        let encoding = pb::Flat {
            buffer: Some(pb::Buffer {
                buffer_index: 0,
                buffer_type: pb::buffer::BufferType::File as i32,
            }),
            bits_per_value: 8,
            compression: Some(pb::Compression {
                scheme: "zstd".to_string(),
                level: Some(0),
            }),
        };

        // Only the file-level table is populated; the encoding refers to its entry 0.
        let file_buffers = FileBuffers {
            positions_and_sizes: &[(0, 100)],
        };
        let column_buffers = ColumnBuffers {
            file_buffers,
            positions_and_sizes: &[],
        };
        let page_buffers = PageBuffers {
            column_buffers,
            positions_and_sizes: &[],
        };

        let page_scheduler = get_buffer_decoder(&encoding, &page_buffers);
        let rendered = format!("{:?}", page_scheduler);
        assert_eq!(rendered.as_str(), "ValuePageScheduler { bytes_per_value: 1, buffer_offset: 0, buffer_size: 100, compression_config: CompressionConfig { scheme: Zstd, level: Some(0) } }");
    }
}