lance_encoding/previous/encodings/physical/binary.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use core::panic;
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::ArrayRef;
use arrow_buffer::{bit_util, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};

use futures::{future::BoxFuture, FutureExt, TryFutureExt};

use crate::buffer::LanceBuffer;
use crate::data::{
    BlockInfo, DataBlock, FixedWidthDataBlock, NullableDataBlock, VariableWidthBlock,
};
use crate::encodings::physical::block::{
    BufferCompressor, CompressionConfig, GeneralBufferCompressor,
};
use crate::format::ProtobufUtils;
use crate::previous::decoder::LogicalPageDecoder;
use crate::previous::encoder::{ArrayEncoder, EncodedArray};
use crate::previous::encodings::logical::primitive::PrimitiveFieldDecoder;
use crate::{
    decoder::{PageScheduler, PrimitivePageDecoder},
    EncodingsIo,
};

use arrow_array::{PrimitiveArray, UInt64Array};
use arrow_schema::DataType;
use lance_core::Result;

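/// Accumulates the offsets decoded from potentially disjoint ranges into one
/// contiguous, zero-based offsets vector plus a validity bitmap.
///
/// Offsets are stored with nulls folded in: any value >= `null_adjustment`
/// marks a null row, and the true offset is recovered by subtracting the
/// adjustment (see `normalize`).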
struct IndicesNormalizer {
    indices: Vec<u64>,
    validity: BooleanBufferBuilder,
    null_adjustment: u64,
}

impl IndicesNormalizer {
    fn new(num_rows: u64, null_adjustment: u64) -> Self {
        let mut indices = Vec::with_capacity(num_rows as usize);
        indices.push(0);
        Self {
            indices,
            validity: BooleanBufferBuilder::new(num_rows as usize),
            null_adjustment,
        }
    }

    fn normalize(&self, val: u64) -> (bool, u64) {
        if val >= self.null_adjustment {
            (false, val - self.null_adjustment)
        } else {
            (true, val)
        }
    }

    fn extend(&mut self, new_indices: &PrimitiveArray<UInt64Type>, is_start: bool) {
        let mut last = *self.indices.last().unwrap();
        if is_start {
            let (is_valid, val) = self.normalize(new_indices.value(0));
            self.indices.push(val);
            self.validity.append(is_valid);
            last += val;
        }
        let mut prev = self.normalize(*new_indices.values().first().unwrap()).1;
        for w in new_indices.values().windows(2) {
            let (is_valid, val) = self.normalize(w[1]);
            let next = val - prev + last;
            self.indices.push(next);
            self.validity.append(is_valid);
            prev = val;
            last = next;
        }
    }

    fn into_parts(mut self) -> (Vec<u64>, BooleanBuffer) {
        (self.indices, self.validity.finish())
    }
}

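/// Page scheduler for binary data.  A binary page is stored as two sub-pages:
/// a u64 "indices" (offsets) page and a raw bytes page.  The indices are
/// decoded first so we know which byte ranges actually need to be read.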
#[derive(Debug)]
pub struct BinaryPageScheduler {
    indices_scheduler: Arc<dyn PageScheduler>,
    bytes_scheduler: Arc<dyn PageScheduler>,
    offsets_type: DataType,
    null_adjustment: u64,
}

impl BinaryPageScheduler {
    pub fn new(
        indices_scheduler: Arc<dyn PageScheduler>,
        bytes_scheduler: Arc<dyn PageScheduler>,
        offsets_type: DataType,
        null_adjustment: u64,
    ) -> Self {
        Self {
            indices_scheduler,
            bytes_scheduler,
            offsets_type,
            null_adjustment,
        }
    }

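    /// Decodes the scheduled offsets into a single `UInt64` array by wrapping
    /// the page decoder in a `PrimitiveFieldDecoder` and draining all rows.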
    fn decode_indices(decoder: Arc<dyn PrimitivePageDecoder>, num_rows: u64) -> Result<ArrayRef> {
        let mut primitive_wrapper =
            PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows, false);
        let drained_task = primitive_wrapper.drain(num_rows)?;
        let indices_decode_task = drained_task.task;
        indices_decode_task.decode()
    }
}

struct IndirectData {
    decoded_indices: UInt64Array,
    offsets_type: DataType,
    validity: BooleanBuffer,
    bytes_decoder_fut: BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>,
}

impl PageScheduler for BinaryPageScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        // `ranges` corresponds to the row ranges that the user wants to fetch.
        // If the user wants row range a..b:
        // Case 1: if a != 0, we need indices (a-1)..b to decode
        // Case 2: if a == 0, we need indices 0..b to decode
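        // e.g. with strings "abcd", "hello" stored as offsets [4, 9] (no leading
        // zero), reading row 1 (range 1..2) needs indices 0..2: offsets[0] marks
        // where row 1 starts and offsets[1] marks where it ends.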
        let indices_ranges = ranges
            .iter()
            .map(|range| {
                if range.start != 0 {
                    (range.start - 1)..range.end
                } else {
                    0..range.end
                }
            })
            .collect::<Vec<std::ops::Range<u64>>>();

        // We schedule all the indices for decoding together.
        // This is more efficient than scheduling them one by one, which significantly hurts random access.
        let indices_page_decoder =
            self.indices_scheduler
                .schedule_ranges(&indices_ranges, scheduler, top_level_row);

        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let indices_num_rows = indices_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

        let ranges = ranges.to_vec();
        let copy_scheduler = scheduler.clone();
        let copy_bytes_scheduler = self.bytes_scheduler.clone();
        let null_adjustment = self.null_adjustment;
        let offsets_type = self.offsets_type.clone();

        tokio::spawn(async move {
            // For the following data:
            // "abcd", "hello", "abcd", "apple", "hello", "abcd"
            //   4,        9,     13,      18,      23,     27
            // e.g. want to scan rows 0, 2, 4
            // i.e. offsets are 4 | 9, 13 | 18, 23
            // Normalization is required for decoding later on
            // Normalize each part: 0, 4 | 0, 4 | 0, 5
            // Remove leading zeros except first one: 0, 4 | 4 | 5
            // Cumulative sum: 0, 4 | 8 | 13
            // These are the normalized offsets stored in decoded_indices
            // Rest of the workflow is continued later in BinaryPageDecoder
            let indices_decoder = Arc::from(indices_page_decoder.await?);
            let indices = Self::decode_indices(indices_decoder, indices_num_rows)?;
            let decoded_indices = indices.as_primitive::<UInt64Type>();

            let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment);
            let mut bytes_ranges = Vec::new();
            let mut curr_offset_index = 0;

            for curr_row_range in ranges.iter() {
                let row_start = curr_row_range.start;
                let curr_range_len = (curr_row_range.end - row_start) as usize;

                let curr_indices;

                if row_start == 0 {
                    curr_indices = decoded_indices.slice(0, curr_range_len);
                    curr_offset_index = curr_range_len;
                } else {
                    curr_indices = decoded_indices.slice(curr_offset_index, curr_range_len + 1);
                    curr_offset_index += curr_range_len + 1;
                }

                let first = if row_start == 0 {
                    0
                } else {
                    indices_builder
                        .normalize(*curr_indices.values().first().unwrap())
                        .1
                };
                let last = indices_builder
                    .normalize(*curr_indices.values().last().unwrap())
                    .1;
                if first != last {
                    bytes_ranges.push(first..last);
                }

                indices_builder.extend(&curr_indices, row_start == 0);
            }

            let (indices, validity) = indices_builder.into_parts();
            let decoded_indices = UInt64Array::from(indices);

            // In the indirect task we schedule the bytes, but we do not await them.  We don't want to
            // await the bytes until the decoder is ready for them so that we don't release the backpressure
            // too early
            let bytes_decoder_fut =
                copy_bytes_scheduler.schedule_ranges(&bytes_ranges, &copy_scheduler, top_level_row);

            Ok(IndirectData {
                decoded_indices,
                validity,
                offsets_type,
                bytes_decoder_fut,
            })
        })
        // Propagate join panic
        .map(|join_handle| join_handle.unwrap())
        .and_then(|indirect_data| {
            async move {
                // Later, this will be called once the decoder actually starts polling.  At that point
                // we await the bytes (releasing the backpressure)
                let bytes_decoder = indirect_data.bytes_decoder_fut.await?;
                Ok(Box::new(BinaryPageDecoder {
                    decoded_indices: indirect_data.decoded_indices,
                    offsets_type: indirect_data.offsets_type,
                    validity: indirect_data.validity,
                    bytes_decoder,
                }) as Box<dyn PrimitivePageDecoder>)
            }
        })
        .boxed()
    }
}

struct BinaryPageDecoder {
    decoded_indices: UInt64Array,
    offsets_type: DataType,
    validity: BooleanBuffer,
    bytes_decoder: Box<dyn PrimitivePageDecoder>,
}

impl PrimitivePageDecoder for BinaryPageDecoder {
    // Continuing the example from BinaryPageScheduler
    // Suppose batch_size = 2. Then first, rows_to_skip=0, num_rows=2
    // Need to scan 2 rows
    // First row will be 4-0=4 bytes, second also 8-4=4 bytes.
    // Allocate 8 bytes capacity.
    // Next rows_to_skip=2, num_rows=1
    // Skip 8 bytes. Allocate 5 bytes capacity.
    //
    // The normalized offsets are [0, 4, 8, 13]
    // We only need [8, 13] to decode in this case.
    // These need to be normalized in order to build the string later
    // So return [0, 5]
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
        // STEP 1: validity buffer
        let target_validity = self
            .validity
            .slice(rows_to_skip as usize, num_rows as usize);
        let has_nulls = target_validity.count_set_bits() < target_validity.len();

        let validity_buffer = if has_nulls {
            let num_validity_bits = arrow_buffer::bit_util::ceil(num_rows as usize, 8);
            let mut validity_buffer = Vec::with_capacity(num_validity_bits);

            if rows_to_skip == 0 {
                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
            } else {
                // Need to copy the buffer because there may be a bit offset in the first byte
                let target_validity = BooleanBuffer::from_iter(target_validity.iter());
                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
            }
            Some(validity_buffer)
        } else {
            None
        };

        // STEP 2: offsets buffer
        // Currently we always do a copy here: we need to cast to the appropriate type,
        // and we also normalize so the starting offset is 0 (though we could skip this)
        let bytes_per_offset = match self.offsets_type {
            DataType::Int32 => 4,
            DataType::Int64 => 8,
            _ => panic!("Unsupported offsets type"),
        };

        let target_offsets = self
            .decoded_indices
            .slice(rows_to_skip as usize, (num_rows + 1) as usize);

        // Normalize and cast (TODO: could fuse these into one pass for micro-optimization)
        let target_vec = target_offsets.values();
        let start = target_vec[0];
        let offsets_buffer =
            match bytes_per_offset {
                4 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i32))
                    .into_inner(),
                8 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i64))
                    .into_inner(),
                _ => panic!("Unsupported offsets type"),
            };

        let bytes_to_skip = self.decoded_indices.value(rows_to_skip as usize);
        let num_bytes = self
            .decoded_indices
            .value((rows_to_skip + num_rows) as usize)
            - bytes_to_skip;

        let bytes = self.bytes_decoder.decode(bytes_to_skip, num_bytes)?;
        let bytes = bytes.as_fixed_width().unwrap();
        debug_assert_eq!(bytes.bits_per_value, 8);

        let string_data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: bytes_per_offset * 8,
            data: bytes.data,
            num_values: num_rows,
            offsets: LanceBuffer::from(offsets_buffer),
            block_info: BlockInfo::new(),
        });
        if let Some(validity) = validity_buffer {
            Ok(DataBlock::Nullable(NullableDataBlock {
                data: Box::new(string_data),
                nulls: LanceBuffer::from(validity),
                block_info: BlockInfo::new(),
            }))
        } else {
            Ok(string_data)
        }
    }
}

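/// Encodes binary data as two buffers: a u64 "indices" buffer holding each
/// row's cumulative end offset (with `null_adjustment` added for null rows)
/// and a flat bytes buffer, which may optionally be block-compressed.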
#[derive(Debug)]
pub struct BinaryEncoder {
    indices_encoder: Box<dyn ArrayEncoder>,
    compression_config: Option<CompressionConfig>,
    buffer_compressor: Option<Box<dyn BufferCompressor>>,
}

impl BinaryEncoder {
    pub fn try_new(
        indices_encoder: Box<dyn ArrayEncoder>,
        compression_config: Option<CompressionConfig>,
    ) -> Result<Self> {
        let buffer_compressor = compression_config
            .map(GeneralBufferCompressor::get_compressor)
            .transpose()?;
        Ok(Self {
            indices_encoder,
            compression_config,
            buffer_compressor,
        })
    }

    // In 2.1 we will materialize nulls higher up (in the primitive encoder).  Unfortunately,
    // in 2.0 we actually need to write the offsets.
    fn all_null_variable_width(data_type: &DataType, num_values: u64) -> VariableWidthBlock {
        if matches!(data_type, DataType::Binary | DataType::Utf8) {
            VariableWidthBlock {
                bits_per_offset: 32,
                data: LanceBuffer::empty(),
                num_values,
                offsets: LanceBuffer::reinterpret_vec(vec![0_u32; num_values as usize + 1]),
                block_info: BlockInfo::new(),
            }
        } else {
            VariableWidthBlock {
                bits_per_offset: 64,
                data: LanceBuffer::empty(),
                num_values,
                offsets: LanceBuffer::reinterpret_vec(vec![0_u64; num_values as usize + 1]),
                block_info: BlockInfo::new(),
            }
        }
    }
}

// Creates a u64 indices array from a variable-width offsets buffer
// The leading zero offset is dropped; each index is the cumulative end offset of a row
// If a null buffer is given, null rows have null_adjustment added to their index
// Returns the indices block along with the null_adjustment used
fn get_indices_from_string_arrays(
    offsets: LanceBuffer,
    bits_per_offset: u8,
    nulls: Option<LanceBuffer>,
    num_rows: usize,
) -> (DataBlock, u64) {
    let mut indices = Vec::with_capacity(num_rows);
    let mut last_offset = 0_u64;
    if bits_per_offset == 32 {
        let offsets = offsets.borrow_to_typed_slice::<i32>();
        indices.extend(offsets.as_ref().windows(2).map(|w| {
            let strlen = (w[1] - w[0]) as u64;
            last_offset += strlen;
            last_offset
        }));
    } else if bits_per_offset == 64 {
        let offsets = offsets.borrow_to_typed_slice::<i64>();
        indices.extend(offsets.as_ref().windows(2).map(|w| {
            let strlen = (w[1] - w[0]) as u64;
            last_offset += strlen;
            last_offset
        }));
    }

    if indices.is_empty() {
        return (
            DataBlock::FixedWidth(FixedWidthDataBlock {
                bits_per_value: 64,
                data: LanceBuffer::empty(),
                num_values: 0,
                block_info: BlockInfo::new(),
            }),
            0,
        );
    }

    let last_offset = *indices.last().expect("Indices array is empty");
    // 8 exabytes in a single array seems unlikely but...just in case
    assert!(
        last_offset < u64::MAX / 2,
        "Indices array with strings up to 2^63 is too large for this encoding"
    );
    let null_adjustment: u64 = last_offset + 1;

    if let Some(nulls) = nulls {
        let nulls = NullBuffer::new(BooleanBuffer::new(nulls.into_buffer(), 0, num_rows));
        indices
            .iter_mut()
            .zip(nulls.iter())
            .for_each(|(index, is_valid)| {
                if !is_valid {
                    *index += null_adjustment;
                }
            });
    }
    let indices = DataBlock::FixedWidth(FixedWidthDataBlock {
        bits_per_value: 64,
        data: LanceBuffer::reinterpret_vec(indices),
        num_values: num_rows as u64,
        block_info: BlockInfo::new(),
    });
    (indices, null_adjustment)
}

impl ArrayEncoder for BinaryEncoder {
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        let (mut data, nulls) = match data {
            DataBlock::Nullable(nullable) => {
                let data = nullable.data.as_variable_width().unwrap();
                (data, Some(nullable.nulls))
            }
            DataBlock::VariableWidth(variable) => (variable, None),
            DataBlock::AllNull(all_null) => {
                let data = Self::all_null_variable_width(data_type, all_null.num_values);
                let validity =
                    LanceBuffer::all_unset(bit_util::ceil(all_null.num_values as usize, 8));
                (data, Some(validity))
            }
            _ => panic!("Expected variable width data block but got {}", data.name()),
        };

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            data.offsets,
            data.bits_per_offset,
            nulls,
            data.num_values as usize,
        );
        let encoded_indices =
            self.indices_encoder
                .encode(indices, &DataType::UInt64, buffer_index)?;

        let encoded_indices_data = encoded_indices.data.as_fixed_width().unwrap();

        assert!(encoded_indices_data.bits_per_value <= 64);

        if let Some(buffer_compressor) = &self.buffer_compressor {
            let mut compressed_data = Vec::with_capacity(data.data.len());
            buffer_compressor.compress(&data.data, &mut compressed_data)?;
            data.data = LanceBuffer::from(compressed_data);
        }

        let data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: encoded_indices_data.bits_per_value as u8,
            offsets: encoded_indices_data.data,
            data: data.data,
            num_values: data.num_values,
            block_info: BlockInfo::new(),
        });

        let bytes_buffer_index = *buffer_index;
        *buffer_index += 1;

        let bytes_encoding = ProtobufUtils::flat_encoding(
            /*bits_per_value=*/ 8,
            bytes_buffer_index,
            self.compression_config,
        );

        let encoding =
            ProtobufUtils::binary(encoded_indices.encoding, bytes_encoding, null_adjustment);

        Ok(EncodedArray { data, encoding })
    }
}

#[cfg(test)]
mod tests {
    use arrow_array::StringArray;

    use super::*;

    #[test]
    fn test_encode_indices_adjusts_nulls() {
        // Null entries in string arrays should be adjusted
        let string_array = Arc::new(StringArray::from(vec![
            None,
            Some("foo"),
            Some("foo"),
            None,
            None,
            None,
        ])) as ArrayRef;
        let string_data = DataBlock::from(string_array).as_nullable().unwrap();
        let nulls = string_data.nulls;
        let string_data = string_data.data.as_variable_width().unwrap();

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            string_data.offsets,
            string_data.bits_per_offset,
            Some(nulls),
            string_data.num_values as usize,
        );

        let indices = indices.as_fixed_width().unwrap();
        assert_eq!(indices.bits_per_value, 64);
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec(vec![7_u64, 3, 6, 13, 13, 13])
        );
        assert_eq!(null_adjustment, 7);
    }
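
    #[test]
    fn test_encode_indices_without_nulls() {
        // A minimal sketch of the non-null path (the input offsets here are
        // hand-built rather than taken from a real array): offsets [0, 3, 6]
        // describe two 3-byte strings, so the indices should be the cumulative
        // end offsets [3, 6] and null_adjustment should be last_offset + 1 = 7.
        let offsets = LanceBuffer::reinterpret_vec(vec![0_i32, 3, 6]);
        let (indices, null_adjustment) = get_indices_from_string_arrays(offsets, 32, None, 2);

        let indices = indices.as_fixed_width().unwrap();
        assert_eq!(indices.bits_per_value, 64);
        assert_eq!(indices.data, LanceBuffer::reinterpret_vec(vec![3_u64, 6]));
        assert_eq!(null_adjustment, 7);
    }

    #[test]
    fn test_indices_normalizer_splits_nulls() {
        // Sanity check of the null-folding scheme with an assumed
        // null_adjustment of 100: values below the adjustment are valid
        // offsets, and values at or above it decode to (value - adjustment)
        // with the validity flag cleared.
        let normalizer = IndicesNormalizer::new(4, 100);
        assert_eq!(normalizer.normalize(42), (true, 42));
        assert_eq!(normalizer.normalize(142), (false, 42));
    }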
}