use core::panic;
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::ArrayRef;
use arrow_buffer::{bit_util, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};

use crate::buffer::LanceBuffer;
use crate::data::{
    BlockInfo, DataBlock, FixedWidthDataBlock, NullableDataBlock, VariableWidthBlock,
};
use crate::encodings::physical::block::{
    BufferCompressor, CompressionConfig, GeneralBufferCompressor,
};
use crate::format::ProtobufUtils;
use crate::previous::decoder::LogicalPageDecoder;
use crate::previous::encoder::{ArrayEncoder, EncodedArray};
use crate::previous::encodings::logical::primitive::PrimitiveFieldDecoder;
use crate::{
    decoder::{PageScheduler, PrimitivePageDecoder},
    EncodingsIo,
};

use arrow_array::{PrimitiveArray, UInt64Array};
use arrow_schema::DataType;
use lance_core::Result;

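/// Accumulates the decoded index pages into a single contiguous offsets
/// vector plus a validity bitmap.
///
/// Raw indices encode nulls as `offset + null_adjustment`, so any value at
/// or above `null_adjustment` marks a null row.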
struct IndicesNormalizer {
    indices: Vec<u64>,
    validity: BooleanBufferBuilder,
    null_adjustment: u64,
}

impl IndicesNormalizer {
    fn new(num_rows: u64, null_adjustment: u64) -> Self {
        let mut indices = Vec::with_capacity(num_rows as usize);
        indices.push(0);
        Self {
            indices,
            validity: BooleanBufferBuilder::new(num_rows as usize),
            null_adjustment,
        }
    }

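    /// Splits a raw index into `(is_valid, offset)`, stripping the null
    /// adjustment from values that encode nulls.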
    fn normalize(&self, val: u64) -> (bool, u64) {
        if val >= self.null_adjustment {
            (false, val - self.null_adjustment)
        } else {
            (true, val)
        }
    }

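    /// Appends one range's worth of indices, rebasing them so the combined
    /// offsets remain cumulative across ranges. When `is_start` is true the
    /// first value is the end offset of row 0 and is pushed as-is; otherwise
    /// the first value is the offset just before the range and only serves
    /// as the base for the deltas.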
    fn extend(&mut self, new_indices: &PrimitiveArray<UInt64Type>, is_start: bool) {
        let mut last = *self.indices.last().unwrap();
        if is_start {
            let (is_valid, val) = self.normalize(new_indices.value(0));
            self.indices.push(val);
            self.validity.append(is_valid);
            last += val;
        }
        let mut prev = self.normalize(*new_indices.values().first().unwrap()).1;
        for w in new_indices.values().windows(2) {
            let (is_valid, val) = self.normalize(w[1]);
            let next = val - prev + last;
            self.indices.push(next);
            self.validity.append(is_valid);
            prev = val;
            last = next;
        }
    }

    fn into_parts(mut self) -> (Vec<u64>, BooleanBuffer) {
        (self.indices, self.validity.finish())
    }
}

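/// Page scheduler for variable-width (binary and string) data.
///
/// A binary page is stored as two sub-pages: an indices (offsets) page and a
/// bytes page. Scheduling is therefore indirect: the offsets are read and
/// decoded first, and only then do we know which byte ranges to request.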
#[derive(Debug)]
pub struct BinaryPageScheduler {
    indices_scheduler: Arc<dyn PageScheduler>,
    bytes_scheduler: Arc<dyn PageScheduler>,
    offsets_type: DataType,
    null_adjustment: u64,
}

impl BinaryPageScheduler {
    pub fn new(
        indices_scheduler: Arc<dyn PageScheduler>,
        bytes_scheduler: Arc<dyn PageScheduler>,
        offsets_type: DataType,
        null_adjustment: u64,
    ) -> Self {
        Self {
            indices_scheduler,
            bytes_scheduler,
            offsets_type,
            null_adjustment,
        }
    }

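    /// Decodes `num_rows` u64 indices from an already-scheduled page by
    /// wrapping the page decoder in a primitive field decoder and draining
    /// it in a single task.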
    fn decode_indices(decoder: Arc<dyn PrimitivePageDecoder>, num_rows: u64) -> Result<ArrayRef> {
        let mut primitive_wrapper =
            PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows, false);
        let drained_task = primitive_wrapper.drain(num_rows)?;
        let indices_decode_task = drained_task.task;
        indices_decode_task.decode()
    }
}

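/// Intermediate result of the indirect scheduling: the offsets have been
/// decoded and normalized, while the bytes decoder is still pending.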
struct IndirectData {
    decoded_indices: UInt64Array,
    offsets_type: DataType,
    validity: BooleanBuffer,
    bytes_decoder_fut: BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>,
}

impl PageScheduler for BinaryPageScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
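        // To decode rows i..j we need offsets (i-1)..j because the offset
        // just before the range tells us where the first row's bytes begin.
        // A range starting at row 0 is the exception; its first offset is
        // implicitly zero.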
        let indices_ranges = ranges
            .iter()
            .map(|range| {
                if range.start != 0 {
                    (range.start - 1)..range.end
                } else {
                    0..range.end
                }
            })
            .collect::<Vec<std::ops::Range<u64>>>();

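        // Kick off the read of the expanded offsets ranges. The rest of the
        // work can only happen once those offsets have been decoded.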
        let indices_page_decoder =
            self.indices_scheduler
                .schedule_ranges(&indices_ranges, scheduler, top_level_row);

        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let indices_num_rows = indices_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

        let ranges = ranges.to_vec();
        let copy_scheduler = scheduler.clone();
        let copy_bytes_scheduler = self.bytes_scheduler.clone();
        let null_adjustment = self.null_adjustment;
        let offsets_type = self.offsets_type.clone();

        tokio::spawn(async move {
            let indices_decoder = Arc::from(indices_page_decoder.await?);
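            // Decode the raw indices, then walk the requested ranges: slice
            // out each range's offsets, strip the null adjustment to compute
            // the byte range that must be read, and feed the offsets to the
            // normalizer, which rebuilds a contiguous offsets vector and a
            // validity bitmap.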
            let indices = Self::decode_indices(indices_decoder, indices_num_rows)?;
            let decoded_indices = indices.as_primitive::<UInt64Type>();

            let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment);
            let mut bytes_ranges = Vec::new();
            let mut curr_offset_index = 0;

            for curr_row_range in ranges.iter() {
                let row_start = curr_row_range.start;
                let curr_range_len = (curr_row_range.end - row_start) as usize;

                let curr_indices = if row_start == 0 {
                    let indices = decoded_indices.slice(0, curr_range_len);
                    curr_offset_index = curr_range_len;
                    indices
                } else {
                    let indices = decoded_indices.slice(curr_offset_index, curr_range_len + 1);
                    curr_offset_index += curr_range_len + 1;
                    indices
                };

                let first = if row_start == 0 {
                    0
                } else {
                    indices_builder
                        .normalize(*curr_indices.values().first().unwrap())
                        .1
                };
                let last = indices_builder
                    .normalize(*curr_indices.values().last().unwrap())
                    .1;
                if first != last {
                    bytes_ranges.push(first..last);
                }

                indices_builder.extend(&curr_indices, row_start == 0);
            }

            let (indices, validity) = indices_builder.into_parts();
            let decoded_indices = UInt64Array::from(indices);

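            // Schedule the byte ranges computed above against the bytes page.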
            let bytes_decoder_fut =
                copy_bytes_scheduler.schedule_ranges(&bytes_ranges, &copy_scheduler, top_level_row);

            Ok(IndirectData {
                decoded_indices,
                validity,
                offsets_type,
                bytes_decoder_fut,
            })
        })
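        // Propagate any panic from the spawned task.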
        .map(|join_handle| join_handle.unwrap())
        .and_then(|indirect_data| {
            async move {
                let bytes_decoder = indirect_data.bytes_decoder_fut.await?;
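                // The offsets are already decoded; pairing them with the
                // bytes decoder yields the finished page decoder.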
                Ok(Box::new(BinaryPageDecoder {
                    decoded_indices: indirect_data.decoded_indices,
                    offsets_type: indirect_data.offsets_type,
                    validity: indirect_data.validity,
                    bytes_decoder,
                }) as Box<dyn PrimitivePageDecoder>)
            }
        })
        .boxed()
    }
}

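/// Decodes a slice of a binary page from the normalized offsets, the
/// validity bitmap, and a decoder for the raw bytes.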
struct BinaryPageDecoder {
    decoded_indices: UInt64Array,
    offsets_type: DataType,
    validity: BooleanBuffer,
    bytes_decoder: Box<dyn PrimitivePageDecoder>,
}

impl PrimitivePageDecoder for BinaryPageDecoder {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
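        // Decoding proceeds in three steps:
        //   1. Slice out the validity bits for the requested rows
        //   2. Rebase the offsets to start at zero and narrow them to the
        //      declared offset width (i32 or i64)
        //   3. Decode the bytes those offsets cover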
        let target_validity = self
            .validity
            .slice(rows_to_skip as usize, num_rows as usize);
        let has_nulls = target_validity.count_set_bits() < target_validity.len();

        let validity_buffer = if has_nulls {
            let num_validity_bytes = bit_util::ceil(num_rows as usize, 8);
            let mut validity_buffer = Vec::with_capacity(num_validity_bytes);

            if rows_to_skip == 0 {
                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
            } else {
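                // The slice may not start on a byte boundary, so rebuild the
                // buffer to shift the bits down to bit offset zero.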
                let target_validity = BooleanBuffer::from_iter(target_validity.iter());
                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
            }
            Some(validity_buffer)
        } else {
            None
        };

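        // Step 2: convert the offsets to the declared width.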
        let bytes_per_offset = match self.offsets_type {
            DataType::Int32 => 4,
            DataType::Int64 => 8,
            _ => panic!("Unsupported offsets type {:?}", self.offsets_type),
        };

        let target_offsets = self
            .decoded_indices
            .slice(rows_to_skip as usize, (num_rows + 1) as usize);

        let target_vec = target_offsets.values();
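        // Rebase the offsets so the first one in the slice becomes zero.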
        let start = target_vec[0];
        let offsets_buffer = match bytes_per_offset {
            4 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i32))
                .into_inner(),
            8 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i64))
                .into_inner(),
            _ => unreachable!("bytes_per_offset is always 4 or 8"),
        };

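        // Step 3: decode the bytes covered by the requested rows.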
        let bytes_to_skip = self.decoded_indices.value(rows_to_skip as usize);
        let num_bytes = self
            .decoded_indices
            .value((rows_to_skip + num_rows) as usize)
            - bytes_to_skip;

        let bytes = self.bytes_decoder.decode(bytes_to_skip, num_bytes)?;
        let bytes = bytes.as_fixed_width().unwrap();
        debug_assert_eq!(bytes.bits_per_value, 8);

        let string_data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: bytes_per_offset * 8,
            data: bytes.data,
            num_values: num_rows,
            offsets: LanceBuffer::from(offsets_buffer),
            block_info: BlockInfo::new(),
        });
        if let Some(validity) = validity_buffer {
            Ok(DataBlock::Nullable(NullableDataBlock {
                data: Box::new(string_data),
                nulls: LanceBuffer::from(validity),
                block_info: BlockInfo::new(),
            }))
        } else {
            Ok(string_data)
        }
    }
}

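/// Encoder for variable-width (binary and string) data.
///
/// The value bytes are written as one buffer while the offsets are encoded
/// separately, as u64 indices, by the wrapped indices encoder. Nulls are
/// folded into the offsets via `null_adjustment` instead of being stored as
/// a separate bitmap.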
#[derive(Debug)]
pub struct BinaryEncoder {
    indices_encoder: Box<dyn ArrayEncoder>,
    compression_config: Option<CompressionConfig>,
    buffer_compressor: Option<Box<dyn BufferCompressor>>,
}

impl BinaryEncoder {
    pub fn try_new(
        indices_encoder: Box<dyn ArrayEncoder>,
        compression_config: Option<CompressionConfig>,
    ) -> Result<Self> {
        let buffer_compressor = compression_config
            .map(GeneralBufferCompressor::get_compressor)
            .transpose()?;
        Ok(Self {
            indices_encoder,
            compression_config,
            buffer_compressor,
        })
    }

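    // An all-null page still needs a plausible offsets buffer, so emit one
    // filled with zeros (32-bit offsets for Binary/Utf8, 64-bit otherwise)
    // and no bytes.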
    fn all_null_variable_width(data_type: &DataType, num_values: u64) -> VariableWidthBlock {
        if matches!(data_type, DataType::Binary | DataType::Utf8) {
            VariableWidthBlock {
                bits_per_offset: 32,
                data: LanceBuffer::empty(),
                num_values,
                offsets: LanceBuffer::reinterpret_vec(vec![0_u32; num_values as usize + 1]),
                block_info: BlockInfo::new(),
            }
        } else {
            VariableWidthBlock {
                bits_per_offset: 64,
                data: LanceBuffer::empty(),
                num_values,
                offsets: LanceBuffer::reinterpret_vec(vec![0_u64; num_values as usize + 1]),
                block_info: BlockInfo::new(),
            }
        }
    }
}

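/// Converts Arrow-style offsets (plus an optional validity buffer) into the
/// cumulative end-offset representation used by this encoding. Null rows
/// have `null_adjustment` added to their offset so validity can be recovered
/// at decode time. Returns the indices block and the chosen
/// `null_adjustment`, which is one past the largest real offset.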
fn get_indices_from_string_arrays(
    offsets: LanceBuffer,
    bits_per_offset: u8,
    nulls: Option<LanceBuffer>,
    num_rows: usize,
) -> (DataBlock, u64) {
    let mut indices = Vec::with_capacity(num_rows);
    let mut last_offset = 0_u64;
    if bits_per_offset == 32 {
        let offsets = offsets.borrow_to_typed_slice::<i32>();
        indices.extend(offsets.as_ref().windows(2).map(|w| {
            let strlen = (w[1] - w[0]) as u64;
            last_offset += strlen;
            last_offset
        }));
    } else if bits_per_offset == 64 {
        let offsets = offsets.borrow_to_typed_slice::<i64>();
        indices.extend(offsets.as_ref().windows(2).map(|w| {
            let strlen = (w[1] - w[0]) as u64;
            last_offset += strlen;
            last_offset
        }));
    }

    if indices.is_empty() {
        return (
            DataBlock::FixedWidth(FixedWidthDataBlock {
                bits_per_value: 64,
                data: LanceBuffer::empty(),
                num_values: 0,
                block_info: BlockInfo::new(),
            }),
            0,
        );
    }

    let last_offset = *indices.last().expect("Indices array is empty");
    assert!(
        last_offset < u64::MAX / 2,
        "Total byte length must be less than 2^63 so the null adjustment cannot overflow"
    );
    let null_adjustment: u64 = last_offset + 1;

    if let Some(nulls) = nulls {
        let nulls = NullBuffer::new(BooleanBuffer::new(nulls.into_buffer(), 0, num_rows));
        indices
            .iter_mut()
            .zip(nulls.iter())
            .for_each(|(index, is_valid)| {
                if !is_valid {
                    *index += null_adjustment;
                }
            });
    }
    let indices = DataBlock::FixedWidth(FixedWidthDataBlock {
        bits_per_value: 64,
        data: LanceBuffer::reinterpret_vec(indices),
        num_values: num_rows as u64,
        block_info: BlockInfo::new(),
    });
    (indices, null_adjustment)
}

impl ArrayEncoder for BinaryEncoder {
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        let (mut data, nulls) = match data {
            DataBlock::Nullable(nullable) => {
                let data = nullable.data.as_variable_width().unwrap();
                (data, Some(nullable.nulls))
            }
            DataBlock::VariableWidth(variable) => (variable, None),
            DataBlock::AllNull(all_null) => {
                let data = Self::all_null_variable_width(data_type, all_null.num_values);
                let validity =
                    LanceBuffer::all_unset(bit_util::ceil(all_null.num_values as usize, 8));
                (data, Some(validity))
            }
            _ => panic!("Expected variable width data block but got {}", data.name()),
        };

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            data.offsets,
            data.bits_per_offset,
            nulls,
            data.num_values as usize,
        );
        let encoded_indices =
            self.indices_encoder
                .encode(indices, &DataType::UInt64, buffer_index)?;

        let encoded_indices_data = encoded_indices.data.as_fixed_width().unwrap();

        assert!(encoded_indices_data.bits_per_value <= 64);

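        // Compress the value bytes in place if a compressor was configured.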
        if let Some(buffer_compressor) = &self.buffer_compressor {
            let mut compressed_data = Vec::with_capacity(data.data.len());
            buffer_compressor.compress(&data.data, &mut compressed_data)?;
            data.data = LanceBuffer::from(compressed_data);
        }

        let data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: encoded_indices_data.bits_per_value as u8,
            offsets: encoded_indices_data.data,
            data: data.data,
            num_values: data.num_values,
            block_info: BlockInfo::new(),
        });

        let bytes_buffer_index = *buffer_index;
        *buffer_index += 1;

        let bytes_encoding =
            ProtobufUtils::flat_encoding(8, bytes_buffer_index, self.compression_config);

        let encoding =
            ProtobufUtils::binary(encoded_indices.encoding, bytes_encoding, null_adjustment);

        Ok(EncodedArray { data, encoding })
    }
}

#[cfg(test)]
mod tests {
    use arrow_array::StringArray;

    use super::*;

    #[test]
    fn test_encode_indices_adjusts_nulls() {
        let string_array = Arc::new(StringArray::from(vec![
            None,
            Some("foo"),
            Some("foo"),
            None,
            None,
            None,
        ])) as ArrayRef;
        let string_data = DataBlock::from(string_array).as_nullable().unwrap();
        let nulls = string_data.nulls;
        let string_data = string_data.data.as_variable_width().unwrap();

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            string_data.offsets,
            string_data.bits_per_offset,
            Some(nulls),
            string_data.num_values as usize,
        );

        let indices = indices.as_fixed_width().unwrap();
        assert_eq!(indices.bits_per_value, 64);
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec(vec![7_u64, 3, 6, 13, 13, 13])
        );
        assert_eq!(null_adjustment, 7);
    }
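
    // Sanity check for the no-nulls path. This is a sketch that assumes a
    // fully valid string array converts to a plain `VariableWidth` block
    // (mirroring how the nullable array above converts to a `Nullable`
    // block). Without nulls the indices are simply the cumulative end
    // offsets and the null_adjustment is the total byte length plus one.
    #[test]
    fn test_encode_indices_without_nulls() {
        let string_array = Arc::new(StringArray::from(vec!["abc", "de"])) as ArrayRef;
        let string_data = DataBlock::from(string_array).as_variable_width().unwrap();

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            string_data.offsets,
            string_data.bits_per_offset,
            None,
            string_data.num_values as usize,
        );

        let indices = indices.as_fixed_width().unwrap();
        assert_eq!(indices.bits_per_value, 64);
        assert_eq!(indices.data, LanceBuffer::reinterpret_vec(vec![3_u64, 5]));
        assert_eq!(null_adjustment, 6);
    }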
}