lance_encoding/previous/encodings/
physical.rs1use arrow_schema::DataType;
5use lance_arrow::DataTypeExt;
6
7use crate::{
8 buffer::LanceBuffer,
9 decoder::{PageBuffers, PageScheduler},
10 encodings::physical::block::{CompressionConfig, CompressionScheme},
11 format::pb::{self, PackedStruct},
12 previous::encodings::physical::{
13 basic::BasicPageScheduler, binary::BinaryPageScheduler, bitmap::DenseBitmapScheduler,
14 dictionary::DictionaryPageScheduler, fixed_size_list::FixedListScheduler,
15 fsst::FsstPageScheduler, packed_struct::PackedStructPageScheduler,
16 value::ValuePageScheduler,
17 },
18};
19
20pub mod basic;
21pub mod binary;
22pub mod bitmap;
23pub mod bitpack;
24pub mod block;
25pub mod dictionary;
26pub mod fixed_size_binary;
27pub mod fixed_size_list;
28pub mod fsst;
29pub mod packed_struct;
30pub mod value;
31
32fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
35 let index = buffer_desc.buffer_index as usize;
36
37 match pb::buffer::BufferType::try_from(buffer_desc.buffer_type).unwrap() {
38 pb::buffer::BufferType::Page => buffers.positions_and_sizes[index],
39 pb::buffer::BufferType::Column => buffers.column_buffers.positions_and_sizes[index],
40 pb::buffer::BufferType::File => {
41 buffers.column_buffers.file_buffers.positions_and_sizes[index]
42 }
43 }
44}
45
46fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
48 let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
49 let compression_config: CompressionConfig = if encoding.compression.is_none() {
50 CompressionConfig::new(CompressionScheme::None, None)
51 } else {
52 let compression = encoding.compression.as_ref().unwrap();
53 CompressionConfig::new(
54 compression.scheme.as_str().parse().unwrap(),
55 compression.level,
56 )
57 };
58 match encoding.bits_per_value {
59 1 => Box::new(DenseBitmapScheduler::new(buffer_offset)),
60 bits_per_value => {
61 if bits_per_value % 8 != 0 {
62 todo!(
63 "bits_per_value ({}) that is not a multiple of 8",
64 bits_per_value
65 );
66 }
67 Box::new(ValuePageScheduler::new(
68 bits_per_value / 8,
69 buffer_offset,
70 buffer_size,
71 compression_config,
72 ))
73 }
74 }
75}
76
77fn get_bitpacked_buffer_decoder(
78 encoding: &pb::Bitpacked,
79 buffers: &PageBuffers,
80) -> Box<dyn PageScheduler> {
81 let (buffer_offset, _buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
82
83 Box::new(bitpack::BitpackedScheduler::new(
84 encoding.compressed_bits_per_value,
85 encoding.uncompressed_bits_per_value,
86 buffer_offset,
87 encoding.signed,
88 ))
89}
90
91fn get_bitpacked_for_non_neg_buffer_decoder(
92 encoding: &pb::BitpackedForNonNeg,
93 buffers: &PageBuffers,
94) -> Box<dyn PageScheduler> {
95 let (buffer_offset, _buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
96
97 Box::new(bitpack::BitpackedForNonNegScheduler::new(
98 encoding.compressed_bits_per_value,
99 encoding.uncompressed_bits_per_value,
100 buffer_offset,
101 ))
102}
103
104fn decoder_from_packed_struct(
105 packed_struct: &PackedStruct,
106 buffers: &PageBuffers,
107 data_type: &DataType,
108) -> Box<dyn PageScheduler> {
109 let inner_encodings = &packed_struct.inner;
110 let fields = match data_type {
111 DataType::Struct(fields) => Some(fields),
112 _ => None,
113 }
114 .unwrap();
115
116 let inner_datatypes = fields
117 .iter()
118 .map(|field| field.data_type())
119 .collect::<Vec<_>>();
120
121 let mut inner_schedulers = Vec::with_capacity(fields.len());
122 for i in 0..fields.len() {
123 let inner_encoding = &inner_encodings[i];
124 let inner_datatype = inner_datatypes[i];
125 let inner_scheduler = decoder_from_array_encoding(inner_encoding, buffers, inner_datatype);
126 inner_schedulers.push(inner_scheduler);
127 }
128
129 let packed_buffer = packed_struct.buffer.as_ref().unwrap();
130 let (buffer_offset, _) = get_buffer(packed_buffer, buffers);
131
132 Box::new(PackedStructPageScheduler::new(
133 inner_schedulers,
134 data_type.clone(),
135 buffer_offset,
136 ))
137}
138
139pub fn decoder_from_array_encoding(
141 encoding: &pb::ArrayEncoding,
142 buffers: &PageBuffers,
143 data_type: &DataType,
144) -> Box<dyn PageScheduler> {
145 match encoding.array_encoding.as_ref().unwrap() {
146 pb::array_encoding::ArrayEncoding::Nullable(basic) => {
147 match basic.nullability.as_ref().unwrap() {
148 pb::nullable::Nullability::NoNulls(no_nulls) => Box::new(
149 BasicPageScheduler::new_non_nullable(decoder_from_array_encoding(
150 no_nulls.values.as_ref().unwrap(),
151 buffers,
152 data_type,
153 )),
154 ),
155 pb::nullable::Nullability::SomeNulls(some_nulls) => {
156 Box::new(BasicPageScheduler::new_nullable(
157 decoder_from_array_encoding(
158 some_nulls.validity.as_ref().unwrap(),
159 buffers,
160 data_type,
161 ),
162 decoder_from_array_encoding(
163 some_nulls.values.as_ref().unwrap(),
164 buffers,
165 data_type,
166 ),
167 ))
168 }
169 pb::nullable::Nullability::AllNulls(_) => {
170 Box::new(BasicPageScheduler::new_all_null())
171 }
172 }
173 }
174 pb::array_encoding::ArrayEncoding::Bitpacked(bitpacked) => {
175 get_bitpacked_buffer_decoder(bitpacked, buffers)
176 }
177 pb::array_encoding::ArrayEncoding::Flat(flat) => get_buffer_decoder(flat, buffers),
178 pb::array_encoding::ArrayEncoding::FixedSizeList(fixed_size_list) => {
179 let item_encoding = fixed_size_list.items.as_ref().unwrap();
180 let item_scheduler = decoder_from_array_encoding(item_encoding, buffers, data_type);
181 Box::new(FixedListScheduler::new(
182 item_scheduler,
183 fixed_size_list.dimension,
184 ))
185 }
186 pb::array_encoding::ArrayEncoding::List(list) => {
190 decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type)
191 }
192 pb::array_encoding::ArrayEncoding::Binary(binary) => {
193 let indices_encoding = binary.indices.as_ref().unwrap();
194 let bytes_encoding = binary.bytes.as_ref().unwrap();
195
196 let indices_scheduler =
197 decoder_from_array_encoding(indices_encoding, buffers, data_type);
198 let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
199
200 let offset_type = match data_type {
201 DataType::LargeBinary | DataType::LargeUtf8 => DataType::Int64,
202 _ => DataType::Int32,
203 };
204
205 Box::new(BinaryPageScheduler::new(
206 indices_scheduler.into(),
207 bytes_scheduler.into(),
208 offset_type,
209 binary.null_adjustment,
210 ))
211 }
212 pb::array_encoding::ArrayEncoding::Fsst(fsst) => {
213 let inner =
214 decoder_from_array_encoding(fsst.binary.as_ref().unwrap(), buffers, data_type);
215
216 Box::new(FsstPageScheduler::new(
217 inner,
218 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
219 ))
220 }
221 pb::array_encoding::ArrayEncoding::Dictionary(dictionary) => {
222 let indices_encoding = dictionary.indices.as_ref().unwrap();
223 let items_encoding = dictionary.items.as_ref().unwrap();
224 let num_dictionary_items = dictionary.num_dictionary_items;
225
226 let value_type = if let DataType::Dictionary(_, value_type) = data_type {
230 value_type
231 } else {
232 data_type
233 };
234
235 let indices_scheduler =
239 decoder_from_array_encoding(indices_encoding, buffers, data_type);
240
241 let items_scheduler = decoder_from_array_encoding(items_encoding, buffers, value_type);
242
243 let should_decode_dict = !data_type.is_dictionary();
244
245 Box::new(DictionaryPageScheduler::new(
246 indices_scheduler.into(),
247 items_scheduler.into(),
248 num_dictionary_items,
249 should_decode_dict,
250 ))
251 }
252 pb::array_encoding::ArrayEncoding::FixedSizeBinary(fixed_size_binary) => {
253 let bytes_encoding = fixed_size_binary.bytes.as_ref().unwrap();
254 let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
255 let bytes_per_offset = match data_type {
256 DataType::LargeBinary | DataType::LargeUtf8 => 8,
257 DataType::Binary | DataType::Utf8 => 4,
258 _ => panic!("FixedSizeBinary only supports binary and utf8 types"),
259 };
260
261 Box::new(fixed_size_binary::FixedSizeBinaryPageScheduler::new(
262 bytes_scheduler,
263 fixed_size_binary.byte_width,
264 bytes_per_offset,
265 ))
266 }
267 pb::array_encoding::ArrayEncoding::PackedStruct(packed_struct) => {
268 decoder_from_packed_struct(packed_struct, buffers, data_type)
269 }
270 pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(bitpacked) => {
271 get_bitpacked_for_non_neg_buffer_decoder(bitpacked, buffers)
272 }
273 pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(),
278 _ => unreachable!("Unsupported array encoding: {:?}", encoding),
280 }
281}
282
#[cfg(test)]
mod tests {
    use crate::decoder::{ColumnBuffers, FileBuffers, PageBuffers};
    use crate::format::pb;
    use crate::previous::encodings::physical::get_buffer_decoder;

    /// A zstd-compressed, 8-bit flat encoding that lives in a file-level buffer should yield
    /// a `ValuePageScheduler` carrying the resolved offset/size and the compression settings.
    #[test]
    fn test_get_buffer_decoder_for_compressed_buffer() {
        let flat = pb::Flat {
            buffer: Some(pb::Buffer {
                buffer_index: 0,
                buffer_type: pb::buffer::BufferType::File as i32,
            }),
            bits_per_value: 8,
            compression: Some(pb::Compression {
                scheme: "zstd".to_string(),
                level: Some(0),
            }),
        };
        let file_buffers = FileBuffers {
            positions_and_sizes: &[(0, 100)],
        };
        let page_buffers = PageBuffers {
            column_buffers: ColumnBuffers {
                file_buffers,
                positions_and_sizes: &[],
            },
            positions_and_sizes: &[],
        };

        let page_scheduler = get_buffer_decoder(&flat, &page_buffers);

        let expected = "ValuePageScheduler { bytes_per_value: 1, buffer_offset: 0, buffer_size: 100, compression_config: CompressionConfig { scheme: Zstd, level: Some(0) } }";
        assert_eq!(format!("{:?}", page_scheduler), expected);
    }
}