lance_encoding/previous/encodings/physical/binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use core::panic;
5use std::sync::Arc;
6
7use arrow_array::ArrayRef;
8use arrow_array::cast::AsArray;
9use arrow_array::types::UInt64Type;
10use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer, bit_util};
11use futures::TryFutureExt;
12
13use futures::{FutureExt, future::BoxFuture};
14
15use crate::buffer::LanceBuffer;
16use crate::data::{
17    BlockInfo, DataBlock, FixedWidthDataBlock, NullableDataBlock, VariableWidthBlock,
18};
19use crate::encodings::physical::block::{
20    BufferCompressor, CompressionConfig, GeneralBufferCompressor,
21};
22use crate::format::ProtobufUtils;
23use crate::previous::decoder::LogicalPageDecoder;
24use crate::previous::encoder::{ArrayEncoder, EncodedArray};
25use crate::previous::encodings::logical::primitive::PrimitiveFieldDecoder;
26use crate::{
27    EncodingsIo,
28    decoder::{PageScheduler, PrimitivePageDecoder},
29};
30
31use arrow_array::{PrimitiveArray, UInt64Array};
32use arrow_schema::DataType;
33use lance_core::Result;
34
/// Incrementally rebuilds a contiguous, zero-based offsets array (plus a
/// per-row validity bitmap) from the per-range offset slices decoded off disk.
struct IndicesNormalizer {
    // Normalized cumulative offsets; seeded with a leading 0 sentinel.
    indices: Vec<u64>,
    // One validity bit per row (true = non-null).
    validity: BooleanBufferBuilder,
    // Stored offsets >= this value encode null rows; subtracting it recovers
    // the underlying byte offset.
    null_adjustment: u64,
}
40
41impl IndicesNormalizer {
42    fn new(num_rows: u64, null_adjustment: u64) -> Self {
43        let mut indices = Vec::with_capacity(num_rows as usize);
44        indices.push(0);
45        Self {
46            indices,
47            validity: BooleanBufferBuilder::new(num_rows as usize),
48            null_adjustment,
49        }
50    }
51
52    fn normalize(&self, val: u64) -> (bool, u64) {
53        if val >= self.null_adjustment {
54            (false, val - self.null_adjustment)
55        } else {
56            (true, val)
57        }
58    }
59
60    fn extend(&mut self, new_indices: &PrimitiveArray<UInt64Type>, is_start: bool) -> Result<()> {
61        let mut last = *self.indices.last().unwrap();
62        if is_start {
63            let (is_valid, val) = self.normalize(new_indices.value(0));
64            self.indices.push(val);
65            self.validity.append(is_valid);
66            last += val;
67        }
68        let mut prev = self.normalize(*new_indices.values().first().unwrap()).1;
69        for (i, w) in new_indices.values().windows(2).enumerate() {
70            let (is_valid, val) = self.normalize(w[1]);
71            let next = match val.checked_sub(prev) {
72                Some(delta) => delta + last,
73                None => {
74                    return Err(lance_core::Error::invalid_input(format!(
75                        "corrupt binary page: normalized offset {} is less than previous offset {} \
76                             at index {}, null_adjustment={}, raw values were [{}, {}]. \
77                             This usually indicates the file data has been corrupted.",
78                        val, prev, i, self.null_adjustment, w[0], w[1]
79                    )));
80                }
81            };
82            self.indices.push(next);
83            self.validity.append(is_valid);
84            prev = val;
85            last = next;
86        }
87        Ok(())
88    }
89
90    fn into_parts(mut self) -> (Vec<u64>, BooleanBuffer) {
91        (self.indices, self.validity.finish())
92    }
93}
94
/// Page scheduler for variable-width (binary/string) data.
///
/// Reads are split across two sub-schedulers: one for the offsets
/// ("indices") buffer and one for the raw bytes buffer.
#[derive(Debug)]
pub struct BinaryPageScheduler {
    // Schedules the u64 offsets page.
    indices_scheduler: Arc<dyn PageScheduler>,
    // Schedules the raw byte payload page.
    bytes_scheduler: Arc<dyn PageScheduler>,
    // Offset type of the decoded arrow array (Int32 or Int64 — see decode()).
    offsets_type: DataType,
    // Stored offsets >= this value mark null rows (see IndicesNormalizer).
    null_adjustment: u64,
}
102
103impl BinaryPageScheduler {
104    pub fn new(
105        indices_scheduler: Arc<dyn PageScheduler>,
106        bytes_scheduler: Arc<dyn PageScheduler>,
107        offsets_type: DataType,
108        null_adjustment: u64,
109    ) -> Self {
110        Self {
111            indices_scheduler,
112            bytes_scheduler,
113            offsets_type,
114            null_adjustment,
115        }
116    }
117
118    fn decode_indices(decoder: Arc<dyn PrimitivePageDecoder>, num_rows: u64) -> Result<ArrayRef> {
119        let mut primitive_wrapper =
120            PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows, false);
121        let drained_task = primitive_wrapper.drain(num_rows)?;
122        let indices_decode_task = drained_task.task;
123        indices_decode_task.decode().map(|(arr, _)| arr)
124    }
125}
126
/// Intermediate result of the indirect scheduling task: everything the
/// decoder needs except the (deliberately not-yet-awaited) bytes decoder.
struct IndirectData {
    // Normalized, zero-based cumulative offsets.
    decoded_indices: UInt64Array,
    // Target arrow offset type (Int32 or Int64).
    offsets_type: DataType,
    // One validity bit per requested row.
    validity: BooleanBuffer,
    // Future yielding the bytes page decoder; awaited lazily so backpressure
    // is not released before the decoder is polled.
    bytes_decoder_fut: BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>,
}
133
impl PageScheduler for BinaryPageScheduler {
    // Schedules the offsets page immediately, then (in a spawned indirect task)
    // decodes those offsets to determine which byte ranges are needed and
    // schedules the bytes page.  The bytes future is not awaited until the
    // returned decoder future is polled, preserving backpressure.
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        // ranges corresponds to row ranges that the user wants to fetch.
        // if user wants row range a..b
        // Case 1: if a != 0, we need indices a-1..b to decode
        // Case 2: if a = 0, we need indices 0..b to decode
        let indices_ranges = ranges
            .iter()
            .map(|range| {
                if range.start != 0 {
                    (range.start - 1)..range.end
                } else {
                    0..range.end
                }
            })
            .collect::<Vec<std::ops::Range<u64>>>();

        // We schedule all the indices for decoding together
        // This is more efficient compared to scheduling them one by one (reduces speed significantly for random access)
        let indices_page_decoder =
            self.indices_scheduler
                .schedule_ranges(&indices_ranges, scheduler, top_level_row);

        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let indices_num_rows = indices_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

        let ranges = ranges.to_vec();
        let copy_scheduler = scheduler.clone();
        let copy_bytes_scheduler = self.bytes_scheduler.clone();
        let null_adjustment = self.null_adjustment;
        let offsets_type = self.offsets_type.clone();

        tokio::spawn(async move {
            // For the following data:
            // "abcd", "hello", "abcd", "apple", "hello", "abcd"
            //   4,        9,     13,      18,      23,     27
            // e.g. want to scan rows 0, 2, 4
            // i.e. offsets are 4 | 9, 13 | 18, 23
            // Normalization is required for decoding later on
            // Normalize each part: 0, 4 | 0, 4 | 0, 5
            // Remove leading zeros except first one: 0, 4 | 4 | 5
            // Cumulative sum: 0, 4 | 8 | 13
            // These are the normalized offsets stored in decoded_indices
            // Rest of the workflow is continued later in BinaryPageDecoder
            let indices_decoder = Arc::from(indices_page_decoder.await?);
            let indices = Self::decode_indices(indices_decoder, indices_num_rows)?;
            let decoded_indices = indices.as_primitive::<UInt64Type>();

            let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment);
            let mut bytes_ranges = Vec::new();
            // Position in decoded_indices of the next unconsumed offset.
            let mut curr_offset_index = 0;

            for curr_row_range in ranges.iter() {
                let row_start = curr_row_range.start;
                let curr_range_len = (curr_row_range.end - row_start) as usize;

                let curr_indices;

                // A range starting at row 0 has no extra leading offset; all
                // other ranges fetched one additional index (see above).
                if row_start == 0 {
                    curr_indices = decoded_indices.slice(0, curr_range_len);
                    curr_offset_index = curr_range_len;
                } else {
                    curr_indices = decoded_indices.slice(curr_offset_index, curr_range_len + 1);
                    curr_offset_index += curr_range_len + 1;
                }

                // Byte range covered by this row range (normalize() strips the
                // null encoding from the raw stored offsets).
                let first = if row_start == 0 {
                    0
                } else {
                    indices_builder
                        .normalize(*curr_indices.values().first().unwrap())
                        .1
                };
                let last = indices_builder
                    .normalize(*curr_indices.values().last().unwrap())
                    .1;

                // Zero-length byte ranges (e.g. all-null or all-empty rows)
                // schedule no byte I/O.
                if first != last {
                    bytes_ranges.push(first..last);
                }

                indices_builder.extend(&curr_indices, row_start == 0)?;
            }

            let (indices, validity) = indices_builder.into_parts();
            let decoded_indices = UInt64Array::from(indices);

            // In the indirect task we schedule the bytes, but we do not await them.  We don't want to
            // await the bytes until the decoder is ready for them so that we don't release the backpressure
            // too early
            let bytes_decoder_fut =
                copy_bytes_scheduler.schedule_ranges(&bytes_ranges, &copy_scheduler, top_level_row);

            Ok(IndirectData {
                decoded_indices,
                validity,
                offsets_type,
                bytes_decoder_fut,
            })
        })
        // Propagate join panic
        .map(|join_handle| join_handle.unwrap())
        .and_then(|indirect_data| {
            async move {
                // Later, this will be called once the decoder actually starts polling.  At that point
                // we await the bytes (releasing the backpressure)
                let bytes_decoder = indirect_data.bytes_decoder_fut.await?;
                Ok(Box::new(BinaryPageDecoder {
                    decoded_indices: indirect_data.decoded_indices,
                    offsets_type: indirect_data.offsets_type,
                    validity: indirect_data.validity,
                    bytes_decoder,
                }) as Box<dyn PrimitivePageDecoder>)
            }
        })
        .boxed()
    }
}
257
/// Decoder for one variable-width page: pairs the normalized offsets and
/// validity with the (already awaited) bytes decoder.
struct BinaryPageDecoder {
    // Normalized, zero-based cumulative offsets (one extra leading entry).
    decoded_indices: UInt64Array,
    // Offset type to emit (Int32 or Int64).
    offsets_type: DataType,
    // One validity bit per row (true = non-null).
    validity: BooleanBuffer,
    // Decodes the raw byte payload on demand.
    bytes_decoder: Box<dyn PrimitivePageDecoder>,
}
264
impl PrimitivePageDecoder for BinaryPageDecoder {
    // Continuing the example from BinaryPageScheduler
    // Suppose batch_size = 2. Then first, rows_to_skip=0, num_rows=2
    // Need to scan 2 rows
    // First row will be 4-0=4 bytes, second also 8-4=4 bytes.
    // Allocate 8 bytes capacity.
    // Next rows_to_skip=2, num_rows=1
    // Skip 8 bytes. Allocate 5 bytes capacity.
    //
    // The normalized offsets are [0, 4, 8, 13]
    // We only need [8, 13] to decode in this case.
    // These need to be normalized in order to build the string later
    // So return [0, 5]
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
        // STEP 1: validity buffer
        let target_validity = self
            .validity
            .slice(rows_to_skip as usize, num_rows as usize);
        let has_nulls = target_validity.count_set_bits() < target_validity.len();

        // Only materialize a validity buffer when at least one row is null;
        // otherwise we return a plain (non-nullable) variable-width block.
        let validity_buffer = if has_nulls {
            // ceil(num_rows / 8): capacity in *bytes* for the validity bitmap.
            let num_validity_bits = arrow_buffer::bit_util::ceil(num_rows as usize, 8);
            let mut validity_buffer = Vec::with_capacity(num_validity_bits);

            if rows_to_skip == 0 {
                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
            } else {
                // Need to copy the buffer because there may be a bit offset in first byte
                let target_validity = BooleanBuffer::from_iter(target_validity.iter());
                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
            }
            Some(validity_buffer)
        } else {
            None
        };

        // STEP 2: offsets buffer
        // Currently we always do a copy here, we need to cast to the appropriate type
        // and we go ahead and normalize so the starting offset is 0 (though we could skip
        // this)
        let bytes_per_offset = match self.offsets_type {
            DataType::Int32 => 4,
            DataType::Int64 => 8,
            _ => panic!("Unsupported offsets type"),
        };

        // num_rows values are delimited by num_rows + 1 offsets.
        let target_offsets = self
            .decoded_indices
            .slice(rows_to_skip as usize, (num_rows + 1) as usize);

        // Normalize and cast (TODO: could fuse these into one pass for micro-optimization)
        let target_vec = target_offsets.values();
        let start = target_vec[0];
        let offsets_buffer =
            match bytes_per_offset {
                4 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i32))
                    .into_inner(),
                8 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i64))
                    .into_inner(),
                _ => panic!("Unsupported offsets type"),
            };

        // STEP 3: decode exactly the byte span covered by the requested rows.
        let bytes_to_skip = self.decoded_indices.value(rows_to_skip as usize);
        let num_bytes = self
            .decoded_indices
            .value((rows_to_skip + num_rows) as usize)
            - bytes_to_skip;

        let bytes = self.bytes_decoder.decode(bytes_to_skip, num_bytes)?;
        let bytes = bytes.as_fixed_width().unwrap();
        debug_assert_eq!(bytes.bits_per_value, 8);

        let string_data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: bytes_per_offset * 8,
            data: bytes.data,
            num_values: num_rows,
            offsets: LanceBuffer::from(offsets_buffer),
            block_info: BlockInfo::new(),
        });
        if let Some(validity) = validity_buffer {
            Ok(DataBlock::Nullable(NullableDataBlock {
                data: Box::new(string_data),
                nulls: LanceBuffer::from(validity),
                block_info: BlockInfo::new(),
            }))
        } else {
            Ok(string_data)
        }
    }
}
355
/// Encoder for variable-width (binary/string) data.
///
/// Delegates the offsets to `indices_encoder` and writes the raw bytes as a
/// flat 8-bit buffer, optionally compressed.
#[derive(Debug)]
pub struct BinaryEncoder {
    // Encodes the u64 offsets ("indices") buffer.
    indices_encoder: Box<dyn ArrayEncoder>,
    // Compression scheme recorded in the page metadata, if any.
    compression_config: Option<CompressionConfig>,
    // Compressor instantiated from `compression_config`, if any.
    buffer_compressor: Option<Box<dyn BufferCompressor>>,
}
362
363impl BinaryEncoder {
364    pub fn try_new(
365        indices_encoder: Box<dyn ArrayEncoder>,
366        compression_config: Option<CompressionConfig>,
367    ) -> Result<Self> {
368        let buffer_compressor = compression_config
369            .map(GeneralBufferCompressor::get_compressor)
370            .transpose()?;
371        Ok(Self {
372            indices_encoder,
373            compression_config,
374            buffer_compressor,
375        })
376    }
377
378    // In 2.1 we will materialize nulls higher up (in the primitive encoder).  Unfortunately,
379    // in 2.0 we actually need to write the offsets.
380    fn all_null_variable_width(data_type: &DataType, num_values: u64) -> VariableWidthBlock {
381        if matches!(data_type, DataType::Binary | DataType::Utf8) {
382            VariableWidthBlock {
383                bits_per_offset: 32,
384                data: LanceBuffer::empty(),
385                num_values,
386                offsets: LanceBuffer::reinterpret_vec(vec![0_u32; num_values as usize + 1]),
387                block_info: BlockInfo::new(),
388            }
389        } else {
390            VariableWidthBlock {
391                bits_per_offset: 64,
392                data: LanceBuffer::empty(),
393                num_values,
394                offsets: LanceBuffer::reinterpret_vec(vec![0_u64; num_values as usize + 1]),
395                block_info: BlockInfo::new(),
396            }
397        }
398    }
399}
400
401// Creates indices arrays from string arrays
402// Strings are a vector of arrays corresponding to each record batch
403// Zero offset is removed from the start of the offsets array
404// The indices array is computed across all arrays in the vector
405fn get_indices_from_string_arrays(
406    offsets: LanceBuffer,
407    bits_per_offset: u8,
408    nulls: Option<LanceBuffer>,
409    num_rows: usize,
410) -> (DataBlock, u64) {
411    let mut indices = Vec::with_capacity(num_rows);
412    let mut last_offset = 0_u64;
413    if bits_per_offset == 32 {
414        let offsets = offsets.borrow_to_typed_slice::<i32>();
415        indices.extend(offsets.as_ref().windows(2).map(|w| {
416            let strlen = (w[1] - w[0]) as u64;
417            last_offset += strlen;
418            last_offset
419        }));
420    } else if bits_per_offset == 64 {
421        let offsets = offsets.borrow_to_typed_slice::<i64>();
422        indices.extend(offsets.as_ref().windows(2).map(|w| {
423            let strlen = (w[1] - w[0]) as u64;
424            last_offset += strlen;
425            last_offset
426        }));
427    }
428
429    if indices.is_empty() {
430        return (
431            DataBlock::FixedWidth(FixedWidthDataBlock {
432                bits_per_value: 64,
433                data: LanceBuffer::empty(),
434                num_values: 0,
435                block_info: BlockInfo::new(),
436            }),
437            0,
438        );
439    }
440
441    let last_offset = *indices.last().expect("Indices array is empty");
442    // 8 exabytes in a single array seems unlikely but...just in case
443    assert!(
444        last_offset < u64::MAX / 2,
445        "Indices array with strings up to 2^63 is too large for this encoding"
446    );
447    let null_adjustment: u64 = *indices.last().expect("Indices array is empty") + 1;
448
449    if let Some(nulls) = nulls {
450        let nulls = NullBuffer::new(BooleanBuffer::new(nulls.into_buffer(), 0, num_rows));
451        indices
452            .iter_mut()
453            .zip(nulls.iter())
454            .for_each(|(index, is_valid)| {
455                if !is_valid {
456                    *index += null_adjustment;
457                }
458            });
459    }
460    let indices = DataBlock::FixedWidth(FixedWidthDataBlock {
461        bits_per_value: 64,
462        data: LanceBuffer::reinterpret_vec(indices),
463        num_values: num_rows as u64,
464        block_info: BlockInfo::new(),
465    });
466    (indices, null_adjustment)
467}
468
impl ArrayEncoder for BinaryEncoder {
    /// Encodes a variable-width block as an offsets ("indices") page,
    /// delegated to `indices_encoder`, plus a flat bytes page (optionally
    /// compressed).  Nulls are folded into the offsets via `null_adjustment`.
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        // Split the incoming block into variable-width data + optional validity.
        let (mut data, nulls) = match data {
            DataBlock::Nullable(nullable) => {
                let data = nullable.data.as_variable_width().unwrap();
                (data, Some(nullable.nulls))
            }
            DataBlock::VariableWidth(variable) => (variable, None),
            DataBlock::AllNull(all_null) => {
                // 2.0 still writes offsets for all-null pages; synthesize
                // zeroed offsets and an all-unset validity bitmap.
                let data = Self::all_null_variable_width(data_type, all_null.num_values);
                let validity =
                    LanceBuffer::all_unset(bit_util::ceil(all_null.num_values as usize, 8));
                (data, Some(validity))
            }
            _ => panic!("Expected variable width data block but got {}", data.name()),
        };

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            data.offsets,
            data.bits_per_offset,
            nulls,
            data.num_values as usize,
        );
        let encoded_indices =
            self.indices_encoder
                .encode(indices, &DataType::UInt64, buffer_index)?;

        let encoded_indices_data = encoded_indices.data.as_fixed_width().unwrap();

        assert!(encoded_indices_data.bits_per_value <= 64);

        // Optionally compress the raw bytes buffer in place.
        if let Some(buffer_compressor) = &self.buffer_compressor {
            let mut compressed_data = Vec::with_capacity(data.data.len());
            buffer_compressor.compress(&data.data, &mut compressed_data)?;
            data.data = LanceBuffer::from(compressed_data);
        }

        // Reassemble: encoded offsets + (possibly compressed) bytes.
        let data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: encoded_indices_data.bits_per_value as u8,
            offsets: encoded_indices_data.data,
            data: data.data,
            num_values: data.num_values,
            block_info: BlockInfo::new(),
        });

        let bytes_buffer_index = *buffer_index;
        *buffer_index += 1;

        let bytes_encoding = ProtobufUtils::flat_encoding(
            /*bits_per_value=*/ 8,
            bytes_buffer_index,
            self.compression_config,
        );

        let encoding =
            ProtobufUtils::binary(encoded_indices.encoding, bytes_encoding, null_adjustment);

        Ok(EncodedArray { data, encoding })
    }
}
534
#[cfg(test)]
mod tests {
    use arrow_array::StringArray;

    use super::*;

    #[test]
    fn test_encode_indices_adjusts_nulls() {
        // Null entries in string arrays should be adjusted
        // Cumulative indices for ["", "foo", "foo", "", "", ""] are
        // [0, 3, 6, 6, 6, 6]; null_adjustment = last + 1 = 7, and each null
        // row gets 7 added to its offset, yielding [7, 3, 6, 13, 13, 13].
        let string_array = Arc::new(StringArray::from(vec![
            None,
            Some("foo"),
            Some("foo"),
            None,
            None,
            None,
        ])) as ArrayRef;
        let string_data = DataBlock::from(string_array).as_nullable().unwrap();
        let nulls = string_data.nulls;
        let string_data = string_data.data.as_variable_width().unwrap();

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            string_data.offsets,
            string_data.bits_per_offset,
            Some(nulls),
            string_data.num_values as usize,
        );

        // Indices are stored as u64 regardless of the input offset width.
        let indices = indices.as_fixed_width().unwrap();
        assert_eq!(indices.bits_per_value, 64);
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec(vec![7_u64, 3, 6, 13, 13, 13])
        );
        assert_eq!(null_adjustment, 7);
    }
}
571}