use core::panic;
use std::sync::Arc;

use arrow_array::ArrayRef;
use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer, bit_util};
use futures::TryFutureExt;

use futures::{FutureExt, future::BoxFuture};

use crate::buffer::LanceBuffer;
use crate::data::{
    BlockInfo, DataBlock, FixedWidthDataBlock, NullableDataBlock, VariableWidthBlock,
};
use crate::encodings::physical::block::{
    BufferCompressor, CompressionConfig, GeneralBufferCompressor,
};
use crate::format::ProtobufUtils;
use crate::previous::decoder::LogicalPageDecoder;
use crate::previous::encoder::{ArrayEncoder, EncodedArray};
use crate::previous::encodings::logical::primitive::PrimitiveFieldDecoder;
use crate::{
    EncodingsIo,
    decoder::{PageScheduler, PrimitivePageDecoder},
};

use arrow_array::{PrimitiveArray, UInt64Array};
use arrow_schema::DataType;
use lance_core::Result;
use snafu::location;

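/// Rebuilds contiguous offsets and validity from the encoded "indices" of a binary page.
///
/// Each stored index is the cumulative byte length of the strings up to and including
/// that row; null rows additionally have `null_adjustment` added to them. `normalize`
/// strips that adjustment back off, and `extend` stitches per-range index slices into a
/// single monotonically increasing offsets vector plus a validity bitmap.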
struct IndicesNormalizer {
    indices: Vec<u64>,
    validity: BooleanBufferBuilder,
    null_adjustment: u64,
}

impl IndicesNormalizer {
    fn new(num_rows: u64, null_adjustment: u64) -> Self {
        let mut indices = Vec::with_capacity(num_rows as usize);
        indices.push(0);
        Self {
            indices,
            validity: BooleanBufferBuilder::new(num_rows as usize),
            null_adjustment,
        }
    }

    fn normalize(&self, val: u64) -> (bool, u64) {
        if val >= self.null_adjustment {
            (false, val - self.null_adjustment)
        } else {
            (true, val)
        }
    }

    fn extend(&mut self, new_indices: &PrimitiveArray<UInt64Type>, is_start: bool) -> Result<()> {
        let mut last = *self.indices.last().unwrap();
        if is_start {
            let (is_valid, val) = self.normalize(new_indices.value(0));
            self.indices.push(val);
            self.validity.append(is_valid);
            last += val;
        }
        let mut prev = self.normalize(*new_indices.values().first().unwrap()).1;
        for (i, w) in new_indices.values().windows(2).enumerate() {
            let (is_valid, val) = self.normalize(w[1]);
            // Offsets must be non-decreasing once the null adjustment is removed; a
            // decrease means the page is corrupt.
            let next = match val.checked_sub(prev) {
                Some(delta) => delta + last,
                None => {
                    return Err(lance_core::Error::invalid_input(
                        format!(
                            "corrupt binary page: normalized offset {} is less than previous offset {} \
                             at index {}, null_adjustment={}, raw values were [{}, {}]. \
                             This usually indicates the file data has been corrupted.",
                            val, prev, i, self.null_adjustment, w[0], w[1]
                        ),
                        location!(),
                    ));
                }
            };
            self.indices.push(next);
            self.validity.append(is_valid);
            prev = val;
            last = next;
        }
        Ok(())
    }

    fn into_parts(mut self) -> (Vec<u64>, BooleanBuffer) {
        (self.indices, self.validity.finish())
    }
}

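/// Page scheduler for variable-width (string/binary) data.
///
/// A binary page is stored as two sub-pages: an "indices" page holding the cumulative
/// byte offsets (with nulls folded in via `null_adjustment`) and a "bytes" page holding
/// the raw string bytes. Scheduling is therefore indirect: the indices must be read and
/// decoded first to learn which byte ranges are needed.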
#[derive(Debug)]
pub struct BinaryPageScheduler {
    indices_scheduler: Arc<dyn PageScheduler>,
    bytes_scheduler: Arc<dyn PageScheduler>,
    offsets_type: DataType,
    null_adjustment: u64,
}

impl BinaryPageScheduler {
    pub fn new(
        indices_scheduler: Arc<dyn PageScheduler>,
        bytes_scheduler: Arc<dyn PageScheduler>,
        offsets_type: DataType,
        null_adjustment: u64,
    ) -> Self {
        Self {
            indices_scheduler,
            bytes_scheduler,
            offsets_type,
            null_adjustment,
        }
    }

    fn decode_indices(decoder: Arc<dyn PrimitivePageDecoder>, num_rows: u64) -> Result<ArrayRef> {
        let mut primitive_wrapper =
            PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows, false);
        let drained_task = primitive_wrapper.drain(num_rows)?;
        let indices_decode_task = drained_task.task;
        indices_decode_task.decode().map(|(arr, _)| arr)
    }
}

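/// Intermediate result of the spawned indices task: the normalized offsets and their
/// validity, plus the not-yet-awaited future for the bytes page decoder.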
struct IndirectData {
    decoded_indices: UInt64Array,
    offsets_type: DataType,
    validity: BooleanBuffer,
    bytes_decoder_fut: BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>,
}

impl PageScheduler for BinaryPageScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
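        // To decode rows a..b we need indices (a - 1)..b: the index at position i is the
        // end offset of row i, and row i starts where row i - 1 ends. Row 0 implicitly
        // starts at offset 0, so a range starting at 0 needs no extra leading index.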
        let indices_ranges = ranges
            .iter()
            .map(|range| {
                if range.start != 0 {
                    (range.start - 1)..range.end
                } else {
                    0..range.end
                }
            })
            .collect::<Vec<std::ops::Range<u64>>>();

        // Kick off the read of the indices page; the bytes page is scheduled later,
        // once the decoded indices tell us which byte ranges are needed.
        let indices_page_decoder =
            self.indices_scheduler
                .schedule_ranges(&indices_ranges, scheduler, top_level_row);

        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let indices_num_rows = indices_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

        let ranges = ranges.to_vec();
        let copy_scheduler = scheduler.clone();
        let copy_bytes_scheduler = self.bytes_scheduler.clone();
        let null_adjustment = self.null_adjustment;
        let offsets_type = self.offsets_type.clone();

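        // The byte ranges cannot be known until the indices are decoded, so spawn a task
        // that awaits the indices decoder, normalizes the offsets and validity, computes
        // the byte ranges each requested row range covers, and schedules the bytes read.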
        tokio::spawn(async move {
            let indices_decoder = Arc::from(indices_page_decoder.await?);
            let indices = Self::decode_indices(indices_decoder, indices_num_rows)?;
            let decoded_indices = indices.as_primitive::<UInt64Type>();

            let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment);
            let mut bytes_ranges = Vec::new();
            let mut curr_offset_index = 0;

            for curr_row_range in ranges.iter() {
                let row_start = curr_row_range.start;
                let curr_range_len = (curr_row_range.end - row_start) as usize;

                let curr_indices;

                if row_start == 0 {
                    curr_indices = decoded_indices.slice(0, curr_range_len);
                    curr_offset_index = curr_range_len;
                } else {
                    curr_indices = decoded_indices.slice(curr_offset_index, curr_range_len + 1);
                    curr_offset_index += curr_range_len + 1;
                }

                // The bytes covered by this row range span from the start offset of its
                // first row to the end offset of its last row.
                let first = if row_start == 0 {
                    0
                } else {
                    indices_builder
                        .normalize(*curr_indices.values().first().unwrap())
                        .1
                };
                let last = indices_builder
                    .normalize(*curr_indices.values().last().unwrap())
                    .1;

                if first != last {
                    bytes_ranges.push(first..last);
                }

                indices_builder.extend(&curr_indices, row_start == 0)?;
            }

            let (indices, validity) = indices_builder.into_parts();
            let decoded_indices = UInt64Array::from(indices);

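            // Schedule the read of the byte ranges gathered above. The returned future is
            // not awaited inside this task; it is handed back so the caller can await it
            // once the spawned work has completed.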
            let bytes_decoder_fut =
                copy_bytes_scheduler.schedule_ranges(&bytes_ranges, &copy_scheduler, top_level_row);

            Ok(IndirectData {
                decoded_indices,
                validity,
                offsets_type,
                bytes_decoder_fut,
            })
        })
        // A join error means the spawned task panicked; propagate the panic.
        .map(|join_handle| join_handle.unwrap())
        .and_then(|indirect_data| {
            async move {
                // Wait for the bytes page decoder to become ready, then assemble the
                // final page decoder.
                let bytes_decoder = indirect_data.bytes_decoder_fut.await?;
                Ok(Box::new(BinaryPageDecoder {
                    decoded_indices: indirect_data.decoded_indices,
                    offsets_type: indirect_data.offsets_type,
                    validity: indirect_data.validity,
                    bytes_decoder,
                }) as Box<dyn PrimitivePageDecoder>)
            }
        })
        .boxed()
    }
}

struct BinaryPageDecoder {
    decoded_indices: UInt64Array,
    offsets_type: DataType,
    validity: BooleanBuffer,
    bytes_decoder: Box<dyn PrimitivePageDecoder>,
}

impl PrimitivePageDecoder for BinaryPageDecoder {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
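        // Three pieces are assembled: the validity bits for the requested rows, the
        // offsets rebased so the first requested row starts at byte zero (cast to the
        // declared offsets width), and the string bytes for exactly that byte range.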
        let target_validity = self
            .validity
            .slice(rows_to_skip as usize, num_rows as usize);
        let has_nulls = target_validity.count_set_bits() < target_validity.len();

        let validity_buffer = if has_nulls {
            let num_validity_bytes = bit_util::ceil(num_rows as usize, 8);
            let mut validity_buffer = Vec::with_capacity(num_validity_bytes);

            if rows_to_skip == 0 {
                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
            } else {
                // The slice may not start on a byte boundary, so rebuild a byte-aligned
                // copy of the bits before extending.
                let target_validity = BooleanBuffer::from_iter(target_validity.iter());
                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
            }
            Some(validity_buffer)
        } else {
            None
        };

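        // The offsets were decoded as u64; materialize them at the width the declared
        // offsets type expects (4 bytes for Int32 offsets, 8 bytes for Int64 offsets).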
        let bytes_per_offset = match self.offsets_type {
            DataType::Int32 => 4,
            DataType::Int64 => 8,
            _ => panic!("Unsupported offsets type"),
        };

        let target_offsets = self
            .decoded_indices
            .slice(rows_to_skip as usize, (num_rows + 1) as usize);

        // Rebase the offsets so the first requested row starts at zero.
        let target_vec = target_offsets.values();
        let start = target_vec[0];
        let offsets_buffer =
            match bytes_per_offset {
                4 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i32))
                    .into_inner(),
                8 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i64))
                    .into_inner(),
                _ => panic!("Unsupported offsets type"),
            };

        let bytes_to_skip = self.decoded_indices.value(rows_to_skip as usize);
        let num_bytes = self
            .decoded_indices
            .value((rows_to_skip + num_rows) as usize)
            - bytes_to_skip;

        let bytes = self.bytes_decoder.decode(bytes_to_skip, num_bytes)?;
        let bytes = bytes.as_fixed_width().unwrap();
        debug_assert_eq!(bytes.bits_per_value, 8);

        let string_data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: bytes_per_offset * 8,
            data: bytes.data,
            num_values: num_rows,
            offsets: LanceBuffer::from(offsets_buffer),
            block_info: BlockInfo::new(),
        });
        if let Some(validity) = validity_buffer {
            Ok(DataBlock::Nullable(NullableDataBlock {
                data: Box::new(string_data),
                nulls: LanceBuffer::from(validity),
                block_info: BlockInfo::new(),
            }))
        } else {
            Ok(string_data)
        }
    }
}

#[derive(Debug)]
pub struct BinaryEncoder {
    indices_encoder: Box<dyn ArrayEncoder>,
    compression_config: Option<CompressionConfig>,
    buffer_compressor: Option<Box<dyn BufferCompressor>>,
}

impl BinaryEncoder {
    pub fn try_new(
        indices_encoder: Box<dyn ArrayEncoder>,
        compression_config: Option<CompressionConfig>,
    ) -> Result<Self> {
        let buffer_compressor = compression_config
            .map(GeneralBufferCompressor::get_compressor)
            .transpose()?;
        Ok(Self {
            indices_encoder,
            compression_config,
            buffer_compressor,
        })
    }

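    /// Builds an empty variable-width block for an all-null array: no data bytes and
    /// `num_values + 1` zeroed offsets, 32-bit for Binary/Utf8 and 64-bit for the
    /// large variants.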
    fn all_null_variable_width(data_type: &DataType, num_values: u64) -> VariableWidthBlock {
        if matches!(data_type, DataType::Binary | DataType::Utf8) {
            VariableWidthBlock {
                bits_per_offset: 32,
                data: LanceBuffer::empty(),
                num_values,
                offsets: LanceBuffer::reinterpret_vec(vec![0_u32; num_values as usize + 1]),
                block_info: BlockInfo::new(),
            }
        } else {
            VariableWidthBlock {
                bits_per_offset: 64,
                data: LanceBuffer::empty(),
                num_values,
                offsets: LanceBuffer::reinterpret_vec(vec![0_u64; num_values as usize + 1]),
                block_info: BlockInfo::new(),
            }
        }
    }
}

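/// Converts an offsets buffer (plus optional validity) into the "indices" encoding used
/// by the binary page: each row stores the cumulative byte length of all strings so far,
/// and null rows additionally have `null_adjustment` (last offset + 1) added so they can
/// be distinguished after decoding.
///
/// For example, offsets [0, 0, 3, 6, 6] for [null, "foo", "foo", null] become indices
/// [0, 3, 6, 6]; with null_adjustment = 7 the null rows are stored as [7, 3, 6, 13].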
fn get_indices_from_string_arrays(
    mut offsets: LanceBuffer,
    bits_per_offset: u8,
    nulls: Option<LanceBuffer>,
    num_rows: usize,
) -> (DataBlock, u64) {
    let mut indices = Vec::with_capacity(num_rows);
    let mut last_offset = 0_u64;
    if bits_per_offset == 32 {
        let offsets = offsets.borrow_to_typed_slice::<i32>();
        indices.extend(offsets.as_ref().windows(2).map(|w| {
            let strlen = (w[1] - w[0]) as u64;
            last_offset += strlen;
            last_offset
        }));
    } else if bits_per_offset == 64 {
        let offsets = offsets.borrow_to_typed_slice::<i64>();
        indices.extend(offsets.as_ref().windows(2).map(|w| {
            let strlen = (w[1] - w[0]) as u64;
            last_offset += strlen;
            last_offset
        }));
    }

    if indices.is_empty() {
        return (
            DataBlock::FixedWidth(FixedWidthDataBlock {
                bits_per_value: 64,
                data: LanceBuffer::empty(),
                num_values: 0,
                block_info: BlockInfo::new(),
            }),
            0,
        );
    }

    let last_offset = *indices.last().expect("Indices array is empty");
    assert!(
        last_offset < u64::MAX / 2,
        "Indices array with strings up to 2^63 is too large for this encoding"
    );
    let null_adjustment: u64 = last_offset + 1;

    if let Some(nulls) = nulls {
        let nulls = NullBuffer::new(BooleanBuffer::new(nulls.into_buffer(), 0, num_rows));
        indices
            .iter_mut()
            .zip(nulls.iter())
            .for_each(|(index, is_valid)| {
                if !is_valid {
                    *index += null_adjustment;
                }
            });
    }
    let indices = DataBlock::FixedWidth(FixedWidthDataBlock {
        bits_per_value: 64,
        data: LanceBuffer::reinterpret_vec(indices),
        num_values: num_rows as u64,
        block_info: BlockInfo::new(),
    });
    (indices, null_adjustment)
}

impl ArrayEncoder for BinaryEncoder {
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
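        // Encoding has two parts: the offsets become cumulative "indices" (with nulls
        // folded in via null_adjustment) and are encoded by the inner indices encoder,
        // while the raw bytes are stored in their own buffer, optionally block-compressed.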
        let (mut data, nulls) = match data {
            DataBlock::Nullable(nullable) => {
                let data = nullable.data.as_variable_width().unwrap();
                (data, Some(nullable.nulls))
            }
            DataBlock::VariableWidth(variable) => (variable, None),
            DataBlock::AllNull(all_null) => {
                let data = Self::all_null_variable_width(data_type, all_null.num_values);
                let validity =
                    LanceBuffer::all_unset(bit_util::ceil(all_null.num_values as usize, 8));
                (data, Some(validity))
            }
            _ => panic!("Expected variable width data block but got {}", data.name()),
        };

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            data.offsets,
            data.bits_per_offset,
            nulls,
            data.num_values as usize,
        );
        let encoded_indices =
            self.indices_encoder
                .encode(indices, &DataType::UInt64, buffer_index)?;

        let encoded_indices_data = encoded_indices.data.as_fixed_width().unwrap();

        assert!(encoded_indices_data.bits_per_value <= 64);

        if let Some(buffer_compressor) = &self.buffer_compressor {
            let mut compressed_data = Vec::with_capacity(data.data.len());
            buffer_compressor.compress(&data.data, &mut compressed_data)?;
            data.data = LanceBuffer::from(compressed_data);
        }

        let data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: encoded_indices_data.bits_per_value as u8,
            offsets: encoded_indices_data.data,
            data: data.data,
            num_values: data.num_values,
            block_info: BlockInfo::new(),
        });

        let bytes_buffer_index = *buffer_index;
        *buffer_index += 1;

        let bytes_encoding = ProtobufUtils::flat_encoding(
            8,
            bytes_buffer_index,
            self.compression_config,
        );

        let encoding =
            ProtobufUtils::binary(encoded_indices.encoding, bytes_encoding, null_adjustment);

        Ok(EncodedArray { data, encoding })
    }
}

#[cfg(test)]
mod tests {
    use arrow_array::StringArray;

    use super::*;

    #[test]
    fn test_encode_indices_adjusts_nulls() {
        let string_array = Arc::new(StringArray::from(vec![
            None,
            Some("foo"),
            Some("foo"),
            None,
            None,
            None,
        ])) as ArrayRef;
        let string_data = DataBlock::from(string_array).as_nullable().unwrap();
        let nulls = string_data.nulls;
        let string_data = string_data.data.as_variable_width().unwrap();

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            string_data.offsets,
            string_data.bits_per_offset,
            Some(nulls),
            string_data.num_values as usize,
        );

        let indices = indices.as_fixed_width().unwrap();
        assert_eq!(indices.bits_per_value, 64);
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec(vec![7_u64, 3, 6, 13, 13, 13])
        );
        assert_eq!(null_adjustment, 7);
    }
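
    // A minimal additional sketch of the no-null path, fed raw i32 offsets directly:
    // with no validity buffer the indices are just the cumulative string lengths and
    // null_adjustment is last_offset + 1.
    #[test]
    fn test_encode_indices_without_nulls() {
        // Offsets for the strings "a", "bc", "def" (i32 offsets, as used by Utf8)
        let offsets = LanceBuffer::reinterpret_vec(vec![0_i32, 1, 3, 6]);

        let (indices, null_adjustment) = get_indices_from_string_arrays(offsets, 32, None, 3);

        let indices = indices.as_fixed_width().unwrap();
        assert_eq!(indices.bits_per_value, 64);
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec(vec![1_u64, 3, 6])
        );
        assert_eq!(null_adjustment, 7);
    }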
}