lance_encoding/previous/encodings/
physical.rs1use arrow_schema::DataType;
5use lance_arrow::DataTypeExt;
6
7use crate::{
8 buffer::LanceBuffer,
9 decoder::{PageBuffers, PageScheduler},
10 encodings::physical::block::{CompressionConfig, CompressionScheme},
11 format::pb::{self, PackedStruct},
12 previous::encodings::physical::{
13 basic::BasicPageScheduler, binary::BinaryPageScheduler, bitmap::DenseBitmapScheduler,
14 dictionary::DictionaryPageScheduler, fixed_size_list::FixedListScheduler,
15 fsst::FsstPageScheduler, packed_struct::PackedStructPageScheduler,
16 value::ValuePageScheduler,
17 },
18};
19
20pub mod basic;
21pub mod binary;
22pub mod bitmap;
23#[cfg(feature = "bitpacking")]
24pub mod bitpack;
25pub mod block;
26pub mod dictionary;
27pub mod fixed_size_binary;
28pub mod fixed_size_list;
29pub mod fsst;
30pub mod packed_struct;
31pub mod value;
32
33fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
36 let index = buffer_desc.buffer_index as usize;
37
38 match pb::buffer::BufferType::try_from(buffer_desc.buffer_type).unwrap() {
39 pb::buffer::BufferType::Page => buffers.positions_and_sizes[index],
40 pb::buffer::BufferType::Column => buffers.column_buffers.positions_and_sizes[index],
41 pb::buffer::BufferType::File => {
42 buffers.column_buffers.file_buffers.positions_and_sizes[index]
43 }
44 }
45}
46
47fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
49 let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
50 let compression_config: CompressionConfig = match encoding.compression.as_ref() {
51 None => CompressionConfig::new(CompressionScheme::None, None),
52 Some(compression) => CompressionConfig::new(
53 compression.scheme.as_str().parse().unwrap(),
54 compression.level,
55 ),
56 };
57 match encoding.bits_per_value {
58 1 => Box::new(DenseBitmapScheduler::new(buffer_offset)),
59 bits_per_value => {
60 if bits_per_value % 8 != 0 {
61 todo!(
62 "bits_per_value ({}) that is not a multiple of 8",
63 bits_per_value
64 );
65 }
66 Box::new(ValuePageScheduler::new(
67 bits_per_value / 8,
68 buffer_offset,
69 buffer_size,
70 compression_config,
71 ))
72 }
73 }
74}
75
/// Builds the scheduler for a [`pb::Bitpacked`] page.
///
/// The encoding's buffer descriptor is resolved through `get_buffer`. Only the
/// resulting offset is forwarded, together with the compressed and uncompressed bit
/// widths and the `signed` flag; the buffer size is discarded.
///
/// # Panics
///
/// Panics if the encoding has no buffer descriptor.
#[cfg(feature = "bitpacking")]
fn get_bitpacked_buffer_decoder(
    encoding: &pb::Bitpacked,
    buffers: &PageBuffers,
) -> Box<dyn PageScheduler> {
    let buffer_desc = encoding.buffer.as_ref().unwrap();
    let (offset, _size) = get_buffer(buffer_desc, buffers);

    let scheduler = bitpack::BitpackedScheduler::new(
        encoding.compressed_bits_per_value,
        encoding.uncompressed_bits_per_value,
        offset,
        encoding.signed,
    );
    Box::new(scheduler)
}
90
/// Builds the scheduler for a [`pb::BitpackedForNonNeg`] page.
///
/// The encoding's buffer descriptor is resolved through `get_buffer`. Only the
/// resulting offset is forwarded, together with the compressed and uncompressed bit
/// widths; the buffer size is discarded.
///
/// # Panics
///
/// Panics if the encoding has no buffer descriptor.
#[cfg(feature = "bitpacking")]
fn get_bitpacked_for_non_neg_buffer_decoder(
    encoding: &pb::BitpackedForNonNeg,
    buffers: &PageBuffers,
) -> Box<dyn PageScheduler> {
    let buffer_desc = encoding.buffer.as_ref().unwrap();
    let (offset, _size) = get_buffer(buffer_desc, buffers);

    let scheduler = bitpack::BitpackedForNonNegScheduler::new(
        encoding.compressed_bits_per_value,
        encoding.uncompressed_bits_per_value,
        offset,
    );
    Box::new(scheduler)
}
104
105fn decoder_from_packed_struct(
106 packed_struct: &PackedStruct,
107 buffers: &PageBuffers,
108 data_type: &DataType,
109) -> Box<dyn PageScheduler> {
110 let inner_encodings = &packed_struct.inner;
111 let fields = match data_type {
112 DataType::Struct(fields) => Some(fields),
113 _ => None,
114 }
115 .unwrap();
116
117 let inner_datatypes = fields
118 .iter()
119 .map(|field| field.data_type())
120 .collect::<Vec<_>>();
121
122 let mut inner_schedulers = Vec::with_capacity(fields.len());
123 for i in 0..fields.len() {
124 let inner_encoding = &inner_encodings[i];
125 let inner_datatype = inner_datatypes[i];
126 let inner_scheduler = decoder_from_array_encoding(inner_encoding, buffers, inner_datatype);
127 inner_schedulers.push(inner_scheduler);
128 }
129
130 let packed_buffer = packed_struct.buffer.as_ref().unwrap();
131 let (buffer_offset, _) = get_buffer(packed_buffer, buffers);
132
133 Box::new(PackedStructPageScheduler::new(
134 inner_schedulers,
135 data_type.clone(),
136 buffer_offset,
137 ))
138}
139
140pub fn decoder_from_array_encoding(
142 encoding: &pb::ArrayEncoding,
143 buffers: &PageBuffers,
144 data_type: &DataType,
145) -> Box<dyn PageScheduler> {
146 match encoding.array_encoding.as_ref().unwrap() {
147 pb::array_encoding::ArrayEncoding::Nullable(basic) => {
148 match basic.nullability.as_ref().unwrap() {
149 pb::nullable::Nullability::NoNulls(no_nulls) => Box::new(
150 BasicPageScheduler::new_non_nullable(decoder_from_array_encoding(
151 no_nulls.values.as_ref().unwrap(),
152 buffers,
153 data_type,
154 )),
155 ),
156 pb::nullable::Nullability::SomeNulls(some_nulls) => {
157 Box::new(BasicPageScheduler::new_nullable(
158 decoder_from_array_encoding(
159 some_nulls.validity.as_ref().unwrap(),
160 buffers,
161 data_type,
162 ),
163 decoder_from_array_encoding(
164 some_nulls.values.as_ref().unwrap(),
165 buffers,
166 data_type,
167 ),
168 ))
169 }
170 pb::nullable::Nullability::AllNulls(_) => {
171 Box::new(BasicPageScheduler::new_all_null())
172 }
173 }
174 }
175 #[cfg(feature = "bitpacking")]
176 pb::array_encoding::ArrayEncoding::Bitpacked(bitpacked) => {
177 get_bitpacked_buffer_decoder(bitpacked, buffers)
178 }
179 #[cfg(not(feature = "bitpacking"))]
180 pb::array_encoding::ArrayEncoding::Bitpacked(_) => {
181 panic!("Runtime built without bitpacking support")
182 }
183 pb::array_encoding::ArrayEncoding::Flat(flat) => get_buffer_decoder(flat, buffers),
184 pb::array_encoding::ArrayEncoding::FixedSizeList(fixed_size_list) => {
185 let item_encoding = fixed_size_list.items.as_ref().unwrap();
186 let item_scheduler = decoder_from_array_encoding(item_encoding, buffers, data_type);
187 Box::new(FixedListScheduler::new(
188 item_scheduler,
189 fixed_size_list.dimension,
190 ))
191 }
192 pb::array_encoding::ArrayEncoding::List(list) => {
196 decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type)
197 }
198 pb::array_encoding::ArrayEncoding::Binary(binary) => {
199 let indices_encoding = binary.indices.as_ref().unwrap();
200 let bytes_encoding = binary.bytes.as_ref().unwrap();
201
202 let indices_scheduler =
203 decoder_from_array_encoding(indices_encoding, buffers, data_type);
204 let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
205
206 let offset_type = match data_type {
207 DataType::LargeBinary | DataType::LargeUtf8 => DataType::Int64,
208 _ => DataType::Int32,
209 };
210
211 Box::new(BinaryPageScheduler::new(
212 indices_scheduler.into(),
213 bytes_scheduler.into(),
214 offset_type,
215 binary.null_adjustment,
216 ))
217 }
218 pb::array_encoding::ArrayEncoding::Fsst(fsst) => {
219 let inner =
220 decoder_from_array_encoding(fsst.binary.as_ref().unwrap(), buffers, data_type);
221
222 Box::new(FsstPageScheduler::new(
223 inner,
224 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
225 ))
226 }
227 pb::array_encoding::ArrayEncoding::Dictionary(dictionary) => {
228 let indices_encoding = dictionary.indices.as_ref().unwrap();
229 let items_encoding = dictionary.items.as_ref().unwrap();
230 let num_dictionary_items = dictionary.num_dictionary_items;
231
232 let value_type = if let DataType::Dictionary(_, value_type) = data_type {
236 value_type
237 } else {
238 data_type
239 };
240
241 let indices_scheduler =
245 decoder_from_array_encoding(indices_encoding, buffers, data_type);
246
247 let items_scheduler = decoder_from_array_encoding(items_encoding, buffers, value_type);
248
249 let should_decode_dict = !data_type.is_dictionary();
250
251 Box::new(DictionaryPageScheduler::new(
252 indices_scheduler.into(),
253 items_scheduler.into(),
254 num_dictionary_items,
255 should_decode_dict,
256 ))
257 }
258 pb::array_encoding::ArrayEncoding::FixedSizeBinary(fixed_size_binary) => {
259 let bytes_encoding = fixed_size_binary.bytes.as_ref().unwrap();
260 let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
261 let bytes_per_offset = match data_type {
262 DataType::LargeBinary | DataType::LargeUtf8 => 8,
263 DataType::Binary | DataType::Utf8 => 4,
264 _ => panic!("FixedSizeBinary only supports binary and utf8 types"),
265 };
266
267 Box::new(fixed_size_binary::FixedSizeBinaryPageScheduler::new(
268 bytes_scheduler,
269 fixed_size_binary.byte_width,
270 bytes_per_offset,
271 ))
272 }
273 pb::array_encoding::ArrayEncoding::PackedStruct(packed_struct) => {
274 decoder_from_packed_struct(packed_struct, buffers, data_type)
275 }
276 #[cfg(feature = "bitpacking")]
277 pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(bitpacked) => {
278 get_bitpacked_for_non_neg_buffer_decoder(bitpacked, buffers)
279 }
280 #[cfg(not(feature = "bitpacking"))]
281 pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(_) => {
282 panic!("Runtime built without bitpacking support")
283 }
284 pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(),
289 _ => unreachable!("Unsupported array encoding: {:?}", encoding),
291 }
292}
293
#[cfg(test)]
mod tests {
    use crate::decoder::{ColumnBuffers, FileBuffers, PageBuffers};
    use crate::format::pb;
    use crate::previous::encodings::physical::get_buffer_decoder;

    /// An 8-bit, zstd-compressed `Flat` encoding whose buffer lives in the file buffer
    /// table should yield a `ValuePageScheduler` carrying that buffer's location and the
    /// parsed compression settings.
    #[test]
    fn test_get_buffer_decoder_for_compressed_buffer() {
        let file_positions: [(u64, u64); 1] = [(0, 100)];
        let buffers = PageBuffers {
            column_buffers: ColumnBuffers {
                file_buffers: FileBuffers {
                    positions_and_sizes: &file_positions,
                },
                positions_and_sizes: &[],
            },
            positions_and_sizes: &[],
        };

        let flat = pb::Flat {
            buffer: Some(pb::Buffer {
                buffer_index: 0,
                buffer_type: pb::buffer::BufferType::File as i32,
            }),
            bits_per_value: 8,
            compression: Some(pb::Compression {
                scheme: "zstd".to_string(),
                level: Some(0),
            }),
        };

        let scheduler = get_buffer_decoder(&flat, &buffers);

        let expected = "ValuePageScheduler { bytes_per_value: 1, buffer_offset: 0, buffer_size: 100, compression_config: CompressionConfig { scheme: Zstd, level: Some(0) } }";
        let actual = format!("{:?}", scheduler);
        assert_eq!(actual.as_str(), expected);
    }
}