lance_encoding/previous/encodings/
physical.rs1use arrow_schema::DataType;
5use lance_arrow::DataTypeExt;
6
7use crate::{
8 buffer::LanceBuffer,
9 decoder::{PageBuffers, PageScheduler},
10 encodings::physical::block::{CompressionConfig, CompressionScheme},
11 format::pb::{self, PackedStruct},
12 previous::encodings::physical::{
13 basic::BasicPageScheduler, binary::BinaryPageScheduler, bitmap::DenseBitmapScheduler,
14 dictionary::DictionaryPageScheduler, fixed_size_list::FixedListScheduler,
15 fsst::FsstPageScheduler, packed_struct::PackedStructPageScheduler,
16 value::ValuePageScheduler,
17 },
18};
19
20pub mod basic;
21pub mod binary;
22pub mod bitmap;
23#[cfg(feature = "bitpacking")]
24pub mod bitpack;
25pub mod block;
26pub mod dictionary;
27pub mod fixed_size_binary;
28pub mod fixed_size_list;
29pub mod fsst;
30pub mod packed_struct;
31pub mod value;
32
33fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
36 let index = buffer_desc.buffer_index as usize;
37
38 match pb::buffer::BufferType::try_from(buffer_desc.buffer_type).unwrap() {
39 pb::buffer::BufferType::Page => buffers.positions_and_sizes[index],
40 pb::buffer::BufferType::Column => buffers.column_buffers.positions_and_sizes[index],
41 pb::buffer::BufferType::File => {
42 buffers.column_buffers.file_buffers.positions_and_sizes[index]
43 }
44 }
45}
46
47fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
49 let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
50 let compression_config: CompressionConfig = if encoding.compression.is_none() {
51 CompressionConfig::new(CompressionScheme::None, None)
52 } else {
53 let compression = encoding.compression.as_ref().unwrap();
54 CompressionConfig::new(
55 compression.scheme.as_str().parse().unwrap(),
56 compression.level,
57 )
58 };
59 match encoding.bits_per_value {
60 1 => Box::new(DenseBitmapScheduler::new(buffer_offset)),
61 bits_per_value => {
62 if bits_per_value % 8 != 0 {
63 todo!(
64 "bits_per_value ({}) that is not a multiple of 8",
65 bits_per_value
66 );
67 }
68 Box::new(ValuePageScheduler::new(
69 bits_per_value / 8,
70 buffer_offset,
71 buffer_size,
72 compression_config,
73 ))
74 }
75 }
76}
77
/// Builds a scheduler for a `Bitpacked` encoding stored in a single buffer.
///
/// Only the buffer's position is used. The compressed/uncompressed bit widths and the
/// signedness flag are taken directly from the encoding message.
///
/// # Panics
///
/// Panics if the encoding has no buffer descriptor.
#[cfg(feature = "bitpacking")]
fn get_bitpacked_buffer_decoder(
    encoding: &pb::Bitpacked,
    buffers: &PageBuffers,
) -> Box<dyn PageScheduler> {
    let buffer_desc = encoding.buffer.as_ref().unwrap();
    let (position, _size) = get_buffer(buffer_desc, buffers);
    let scheduler = bitpack::BitpackedScheduler::new(
        encoding.compressed_bits_per_value,
        encoding.uncompressed_bits_per_value,
        position,
        encoding.signed,
    );
    Box::new(scheduler)
}
92
/// Builds a scheduler for a `BitpackedForNonNeg` encoding stored in a single buffer.
///
/// Only the buffer's position is used. The compressed/uncompressed bit widths come directly
/// from the encoding message.
///
/// # Panics
///
/// Panics if the encoding has no buffer descriptor.
#[cfg(feature = "bitpacking")]
fn get_bitpacked_for_non_neg_buffer_decoder(
    encoding: &pb::BitpackedForNonNeg,
    buffers: &PageBuffers,
) -> Box<dyn PageScheduler> {
    let buffer_desc = encoding.buffer.as_ref().unwrap();
    let (position, _size) = get_buffer(buffer_desc, buffers);
    let scheduler = bitpack::BitpackedForNonNegScheduler::new(
        encoding.compressed_bits_per_value,
        encoding.uncompressed_bits_per_value,
        position,
    );
    Box::new(scheduler)
}
106
107fn decoder_from_packed_struct(
108 packed_struct: &PackedStruct,
109 buffers: &PageBuffers,
110 data_type: &DataType,
111) -> Box<dyn PageScheduler> {
112 let inner_encodings = &packed_struct.inner;
113 let fields = match data_type {
114 DataType::Struct(fields) => Some(fields),
115 _ => None,
116 }
117 .unwrap();
118
119 let inner_datatypes = fields
120 .iter()
121 .map(|field| field.data_type())
122 .collect::<Vec<_>>();
123
124 let mut inner_schedulers = Vec::with_capacity(fields.len());
125 for i in 0..fields.len() {
126 let inner_encoding = &inner_encodings[i];
127 let inner_datatype = inner_datatypes[i];
128 let inner_scheduler = decoder_from_array_encoding(inner_encoding, buffers, inner_datatype);
129 inner_schedulers.push(inner_scheduler);
130 }
131
132 let packed_buffer = packed_struct.buffer.as_ref().unwrap();
133 let (buffer_offset, _) = get_buffer(packed_buffer, buffers);
134
135 Box::new(PackedStructPageScheduler::new(
136 inner_schedulers,
137 data_type.clone(),
138 buffer_offset,
139 ))
140}
141
142pub fn decoder_from_array_encoding(
144 encoding: &pb::ArrayEncoding,
145 buffers: &PageBuffers,
146 data_type: &DataType,
147) -> Box<dyn PageScheduler> {
148 match encoding.array_encoding.as_ref().unwrap() {
149 pb::array_encoding::ArrayEncoding::Nullable(basic) => {
150 match basic.nullability.as_ref().unwrap() {
151 pb::nullable::Nullability::NoNulls(no_nulls) => Box::new(
152 BasicPageScheduler::new_non_nullable(decoder_from_array_encoding(
153 no_nulls.values.as_ref().unwrap(),
154 buffers,
155 data_type,
156 )),
157 ),
158 pb::nullable::Nullability::SomeNulls(some_nulls) => {
159 Box::new(BasicPageScheduler::new_nullable(
160 decoder_from_array_encoding(
161 some_nulls.validity.as_ref().unwrap(),
162 buffers,
163 data_type,
164 ),
165 decoder_from_array_encoding(
166 some_nulls.values.as_ref().unwrap(),
167 buffers,
168 data_type,
169 ),
170 ))
171 }
172 pb::nullable::Nullability::AllNulls(_) => {
173 Box::new(BasicPageScheduler::new_all_null())
174 }
175 }
176 }
177 #[cfg(feature = "bitpacking")]
178 pb::array_encoding::ArrayEncoding::Bitpacked(bitpacked) => {
179 get_bitpacked_buffer_decoder(bitpacked, buffers)
180 }
181 #[cfg(not(feature = "bitpacking"))]
182 pb::array_encoding::ArrayEncoding::Bitpacked(_) => {
183 panic!("Runtime built without bitpacking support")
184 }
185 pb::array_encoding::ArrayEncoding::Flat(flat) => get_buffer_decoder(flat, buffers),
186 pb::array_encoding::ArrayEncoding::FixedSizeList(fixed_size_list) => {
187 let item_encoding = fixed_size_list.items.as_ref().unwrap();
188 let item_scheduler = decoder_from_array_encoding(item_encoding, buffers, data_type);
189 Box::new(FixedListScheduler::new(
190 item_scheduler,
191 fixed_size_list.dimension,
192 ))
193 }
194 pb::array_encoding::ArrayEncoding::List(list) => {
198 decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type)
199 }
200 pb::array_encoding::ArrayEncoding::Binary(binary) => {
201 let indices_encoding = binary.indices.as_ref().unwrap();
202 let bytes_encoding = binary.bytes.as_ref().unwrap();
203
204 let indices_scheduler =
205 decoder_from_array_encoding(indices_encoding, buffers, data_type);
206 let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
207
208 let offset_type = match data_type {
209 DataType::LargeBinary | DataType::LargeUtf8 => DataType::Int64,
210 _ => DataType::Int32,
211 };
212
213 Box::new(BinaryPageScheduler::new(
214 indices_scheduler.into(),
215 bytes_scheduler.into(),
216 offset_type,
217 binary.null_adjustment,
218 ))
219 }
220 pb::array_encoding::ArrayEncoding::Fsst(fsst) => {
221 let inner =
222 decoder_from_array_encoding(fsst.binary.as_ref().unwrap(), buffers, data_type);
223
224 Box::new(FsstPageScheduler::new(
225 inner,
226 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
227 ))
228 }
229 pb::array_encoding::ArrayEncoding::Dictionary(dictionary) => {
230 let indices_encoding = dictionary.indices.as_ref().unwrap();
231 let items_encoding = dictionary.items.as_ref().unwrap();
232 let num_dictionary_items = dictionary.num_dictionary_items;
233
234 let value_type = if let DataType::Dictionary(_, value_type) = data_type {
238 value_type
239 } else {
240 data_type
241 };
242
243 let indices_scheduler =
247 decoder_from_array_encoding(indices_encoding, buffers, data_type);
248
249 let items_scheduler = decoder_from_array_encoding(items_encoding, buffers, value_type);
250
251 let should_decode_dict = !data_type.is_dictionary();
252
253 Box::new(DictionaryPageScheduler::new(
254 indices_scheduler.into(),
255 items_scheduler.into(),
256 num_dictionary_items,
257 should_decode_dict,
258 ))
259 }
260 pb::array_encoding::ArrayEncoding::FixedSizeBinary(fixed_size_binary) => {
261 let bytes_encoding = fixed_size_binary.bytes.as_ref().unwrap();
262 let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
263 let bytes_per_offset = match data_type {
264 DataType::LargeBinary | DataType::LargeUtf8 => 8,
265 DataType::Binary | DataType::Utf8 => 4,
266 _ => panic!("FixedSizeBinary only supports binary and utf8 types"),
267 };
268
269 Box::new(fixed_size_binary::FixedSizeBinaryPageScheduler::new(
270 bytes_scheduler,
271 fixed_size_binary.byte_width,
272 bytes_per_offset,
273 ))
274 }
275 pb::array_encoding::ArrayEncoding::PackedStruct(packed_struct) => {
276 decoder_from_packed_struct(packed_struct, buffers, data_type)
277 }
278 #[cfg(feature = "bitpacking")]
279 pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(bitpacked) => {
280 get_bitpacked_for_non_neg_buffer_decoder(bitpacked, buffers)
281 }
282 #[cfg(not(feature = "bitpacking"))]
283 pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(_) => {
284 panic!("Runtime built without bitpacking support")
285 }
286 pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(),
291 _ => unreachable!("Unsupported array encoding: {:?}", encoding),
293 }
294}
295
#[cfg(test)]
mod tests {
    use crate::decoder::{ColumnBuffers, FileBuffers, PageBuffers};
    use crate::format::pb;
    use crate::previous::encodings::physical::get_buffer_decoder;

    /// A zstd-compressed `Flat` encoding with 8 bits per value, stored in file buffer 0,
    /// should produce a `ValuePageScheduler`. That scheduler should carry the buffer's
    /// position, its size, and the compression settings.
    #[test]
    fn test_get_buffer_decoder_for_compressed_buffer() {
        let flat = pb::Flat {
            buffer: Some(pb::Buffer {
                buffer_index: 0,
                buffer_type: pb::buffer::BufferType::File as i32,
            }),
            bits_per_value: 8,
            compression: Some(pb::Compression {
                scheme: "zstd".to_string(),
                level: Some(0),
            }),
        };
        // Only one file-level buffer exists: it starts at position 0 and is 100 bytes long.
        let buffers = PageBuffers {
            column_buffers: ColumnBuffers {
                file_buffers: FileBuffers {
                    positions_and_sizes: &[(0, 100)],
                },
                positions_and_sizes: &[],
            },
            positions_and_sizes: &[],
        };

        let scheduler = get_buffer_decoder(&flat, &buffers);

        let expected = "ValuePageScheduler { bytes_per_value: 1, buffer_offset: 0, buffer_size: 100, compression_config: CompressionConfig { scheme: Zstd, level: Some(0) } }";
        assert_eq!(format!("{:?}", scheduler), expected);
    }
}