1use std::{
5 any::Any,
6 collections::{HashMap, VecDeque},
7 env,
8 fmt::Debug,
9 iter,
10 ops::Range,
11 sync::Arc,
12 vec,
13};
14
15use crate::{
16 constants::{
17 STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
18 },
19 data::DictionaryDataBlock,
20 encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
21 format::{
22 ProtobufUtils21,
23 pb21::{self, CompressiveEncoding, PageLayout, compressive_encoding::Compression},
24 },
25};
26use arrow_array::{Array, ArrayRef, PrimitiveArray, cast::AsArray, make_array, types::UInt64Type};
27use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
28use arrow_schema::{DataType, Field as ArrowField};
29use bytes::Bytes;
30use futures::{FutureExt, TryStreamExt, future::BoxFuture, stream::FuturesOrdered};
31use itertools::Itertools;
32use lance_arrow::DataTypeExt;
33use lance_arrow::deepcopy::deep_copy_nulls;
34use lance_core::{
35 cache::{CacheKey, Context, DeepSizeOf},
36 error::{Error, LanceOptionExt},
37 utils::bit::pad_bytes,
38};
39use log::trace;
40
41use crate::{
42 compression::{
43 BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
44 },
45 data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
46 utils::bytepack::BytepackedIntegerEncoder,
47};
48use crate::{
49 compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
50 encodings::logical::primitive::fullzip::PerValueDataBlock,
51};
52use crate::{
53 encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
54};
55use crate::{
56 encodings::logical::primitive::miniblock::MiniBlockCompressed,
57 statistics::{ComputeStat, GetStat, Stat},
58};
59use crate::{
60 repdef::{
61 CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser, DefinitionInterpretation,
62 RepDefSlicer, build_control_word_iterator,
63 },
64 utils::accumulation::AccumulationQueue,
65};
66use lance_core::{Result, datatypes::Field, utils::tokio::spawn_cpu};
67
68use crate::constants::{
69 COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_DIVISOR_META_KEY,
70 DICT_SIZE_RATIO_META_KEY, DICT_VALUES_COMPRESSION_ENV_VAR,
71 DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
72 DICT_VALUES_COMPRESSION_META_KEY,
73};
74use crate::version::LanceFileVersion;
75use crate::{
76 EncodingsIo,
77 buffer::LanceBuffer,
78 data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
79 decoder::{
80 ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
81 MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
82 StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
83 StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
84 },
85 encoder::{
86 EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
87 },
88 repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
89};
90
91pub mod blob;
92pub mod constant;
93pub mod dict;
94pub mod fullzip;
95pub mod miniblock;
96
/// Filler byte value. NOTE(review): its use sites are outside this view; presumably
/// used to pad or fill encoded buffers — confirm.
const FILL_BYTE: u8 = 0xFE;
/// Default dictionary divisor. NOTE(review): presumably the fallback when no
/// `DICT_DIVISOR_META_KEY` is provided — confirm against the encoder.
const DEFAULT_DICT_DIVISOR: u64 = 2;
/// Default upper bound on dictionary cardinality (usage not visible here — verify).
const DEFAULT_DICT_MAX_CARDINALITY: u64 = 100_000;
/// Default dictionary size ratio. NOTE(review): presumably the fallback for
/// `DICT_SIZE_RATIO_META_KEY` — confirm.
const DEFAULT_DICT_SIZE_RATIO: f64 = 0.8;
/// Default compression scheme name for dictionary values. NOTE(review): presumably
/// the fallback for `DICT_VALUES_COMPRESSION_META_KEY` / its env var — confirm.
const DEFAULT_DICT_VALUES_COMPRESSION: &str = "lz4";
102
/// A unit of scheduled work for one page: a future resolving to the page's
/// decoder, paired with the number of rows that decoder covers.
struct PageLoadTask {
    /// Resolves to the page decoder (possibly after I/O completes).
    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
    /// Number of rows covered by this task; the schedulers in this file set it to
    /// the summed length of the scheduled row ranges.
    num_rows: u64,
}
107
/// A per-page scheduler for structural (rep/def aware) encodings.
///
/// The implementations in this file follow a two-phase lifecycle: page-level state
/// is prepared by `initialize` (or restored with `load`), after which
/// `schedule_ranges` turns row ranges into [`PageLoadTask`]s.
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Prepares page-level state (possibly issuing I/O) and returns it as
    /// `CachedPageData` so it can later be restored with `load`.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Restores page-level state from data previously returned by `initialize`.
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules the given row ranges of this page, returning the load tasks that
    /// will produce decoders for them.
    ///
    /// NOTE(review): implementations here assume `initialize` or `load` ran first.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>>;
}
132
/// Location and size metadata for one mini-block chunk.
#[derive(Debug)]
struct ChunkMeta {
    /// Number of values stored in the chunk.
    num_values: u64,
    /// Size of the chunk's encoded bytes.
    chunk_size_bytes: u64,
    /// Byte offset of the chunk (presumably relative to the page's value buffer — confirm).
    offset_bytes: u64,
}
140
/// The decompressed contents of a single mini-block chunk.
#[derive(Debug, Clone)]
struct DecodedMiniBlockChunk {
    /// Repetition levels; present only when the page has a repetition decompressor.
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels; present only when the page has a definition decompressor.
    def: Option<ScalarBuffer<u16>>,
    /// The decompressed values of the chunk.
    values: DataBlock,
}
148
/// A deferred decode job covering one or more (possibly partial) mini-block chunks.
///
/// Built by `MiniBlockDecoder::drain`; decompression happens when the task's
/// `decode` runs.
#[derive(Debug)]
struct DecodeMiniBlockTask {
    /// Decompressor for repetition levels, if the page has them.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels, if the page has them.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for each chunk's value buffers.
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// When present, decoded values are treated as indices into this dictionary.
    dictionary_data: Option<Arc<DataBlock>>,
    /// Meaning of each definition layer (used to derive `max_rep` and to unravel).
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Number of value buffers stored in every chunk.
    num_buffers: u64,
    /// Highest definition level that still corresponds to a visible item.
    max_visible_level: u16,
    /// Drain instructions paired with the loaded chunk each one reads from.
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
    /// When true, chunk buffer sizes are stored as `u32` instead of `u16`.
    has_large_chunk: bool,
}
168
169impl DecodeMiniBlockTask {
170 fn decode_levels(
171 rep_decompressor: &dyn BlockDecompressor,
172 levels: LanceBuffer,
173 num_levels: u16,
174 ) -> Result<ScalarBuffer<u16>> {
175 let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
176 let rep = rep.as_fixed_width().unwrap();
177 debug_assert_eq!(rep.num_values, num_levels as u64);
178 debug_assert_eq!(rep.bits_per_value, 16);
179 Ok(rep.data.borrow_to_typed_slice::<u16>())
180 }
181
182 fn extend_levels(
189 range: Range<u64>,
190 levels: &mut Option<LevelBuffer>,
191 level_buf: &Option<impl AsRef<[u16]>>,
192 dest_offset: usize,
193 ) {
194 if let Some(level_buf) = level_buf {
195 if levels.is_none() {
196 let mut new_levels_vec =
199 LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
200 new_levels_vec.extend(iter::repeat_n(0, dest_offset));
201 *levels = Some(new_levels_vec);
202 }
203 levels.as_mut().unwrap().extend(
204 level_buf.as_ref()[range.start as usize..range.end as usize]
205 .iter()
206 .copied(),
207 );
208 } else if let Some(levels) = levels {
209 let num_values = (range.end - range.start) as usize;
210 levels.extend(iter::repeat_n(0, num_values));
213 }
214 }
215
216 fn map_range(
253 range: Range<u64>,
254 rep: Option<&impl AsRef<[u16]>>,
255 def: Option<&impl AsRef<[u16]>>,
256 max_rep: u16,
257 max_visible_def: u16,
258 total_items: u64,
261 preamble_action: PreambleAction,
262 ) -> (Range<u64>, Range<u64>) {
263 if let Some(rep) = rep {
264 let mut rep = rep.as_ref();
265 let mut items_in_preamble = 0_u64;
268 let first_row_start = match preamble_action {
269 PreambleAction::Skip | PreambleAction::Take => {
270 let first_row_start = if let Some(def) = def.as_ref() {
271 let mut first_row_start = None;
272 for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
273 if *rep == max_rep {
274 first_row_start = Some(idx as u64);
275 break;
276 }
277 if *def <= max_visible_def {
278 items_in_preamble += 1;
279 }
280 }
281 first_row_start
282 } else {
283 let first_row_start =
284 rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
285 items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
286 first_row_start
287 };
288 if first_row_start.is_none() {
291 assert!(preamble_action == PreambleAction::Take);
292 return (0..total_items, 0..rep.len() as u64);
293 }
294 let first_row_start = first_row_start.unwrap();
295 rep = &rep[first_row_start as usize..];
296 first_row_start
297 }
298 PreambleAction::Absent => {
299 debug_assert!(rep[0] == max_rep);
300 0
301 }
302 };
303
304 if range.start == range.end {
306 debug_assert!(preamble_action == PreambleAction::Take);
307 debug_assert!(items_in_preamble <= total_items);
308 return (0..items_in_preamble, 0..first_row_start);
309 }
310 assert!(range.start < range.end);
311
312 let mut rows_seen = 0;
313 let mut new_start = 0;
314 let mut new_levels_start = 0;
315
316 if let Some(def) = def {
317 let def = &def.as_ref()[first_row_start as usize..];
318
319 let mut lead_invis_seen = 0;
321
322 if range.start > 0 {
323 if def[0] > max_visible_def {
324 lead_invis_seen += 1;
325 }
326 for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
327 if *rep == max_rep {
328 rows_seen += 1;
329 if rows_seen == range.start {
330 new_start = idx as u64 + 1 - lead_invis_seen;
331 new_levels_start = idx as u64 + 1;
332 break;
333 }
334 }
335 if *def > max_visible_def {
336 lead_invis_seen += 1;
337 }
338 }
339 }
340
341 rows_seen += 1;
342
343 let mut new_end = u64::MAX;
344 let mut new_levels_end = rep.len() as u64;
345 let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
346 let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
347 for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
348 .iter()
349 .zip(&def[(new_levels_start + 1) as usize..])
350 .enumerate()
351 {
352 if *rep == max_rep {
353 rows_seen += 1;
354 if rows_seen == range.end + 1 {
355 new_end = idx as u64 + new_start + 1 - tail_invis_seen;
356 new_levels_end = idx as u64 + new_levels_start + 1;
357 break;
358 }
359 }
360 if *def > max_visible_def {
361 tail_invis_seen += 1;
362 }
363 }
364
365 if new_end == u64::MAX {
366 new_levels_end = rep.len() as u64;
367 let total_invis_seen = lead_invis_seen + tail_invis_seen;
368 new_end = rep.len() as u64 - total_invis_seen;
369 }
370
371 assert_ne!(new_end, u64::MAX);
372
373 if preamble_action == PreambleAction::Skip {
375 new_start += items_in_preamble;
376 new_end += items_in_preamble;
377 new_levels_start += first_row_start;
378 new_levels_end += first_row_start;
379 } else if preamble_action == PreambleAction::Take {
380 debug_assert_eq!(new_start, 0);
381 debug_assert_eq!(new_levels_start, 0);
382 new_end += items_in_preamble;
383 new_levels_end += first_row_start;
384 }
385
386 debug_assert!(new_end <= total_items);
387 (new_start..new_end, new_levels_start..new_levels_end)
388 } else {
389 if range.start > 0 {
395 for (idx, rep) in rep.iter().skip(1).enumerate() {
396 if *rep == max_rep {
397 rows_seen += 1;
398 if rows_seen == range.start {
399 new_start = idx as u64 + 1;
400 break;
401 }
402 }
403 }
404 }
405 let mut new_end = rep.len() as u64;
406 if range.end < total_items {
408 for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
409 if *rep == max_rep {
410 rows_seen += 1;
411 if rows_seen == range.end {
412 new_end = idx as u64 + new_start + 1;
413 break;
414 }
415 }
416 }
417 }
418
419 if preamble_action == PreambleAction::Skip {
421 new_start += first_row_start;
422 new_end += first_row_start;
423 } else if preamble_action == PreambleAction::Take {
424 debug_assert_eq!(new_start, 0);
425 new_end += first_row_start;
426 }
427
428 debug_assert!(new_end <= total_items);
429 (new_start..new_end, new_start..new_end)
430 }
431 } else {
432 (range.clone(), range)
435 }
436 }
437
438 fn read_buffer_sizes<const LARGE: bool>(
440 buf: &[u8],
441 offset: &mut usize,
442 num_buffers: u64,
443 ) -> Vec<u32> {
444 let read_size = if LARGE { 4 } else { 2 };
445 (0..num_buffers)
446 .map(|_| {
447 let bytes = &buf[*offset..*offset + read_size];
448 let size = if LARGE {
449 u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
450 } else {
451 u16::from_le_bytes([bytes[0], bytes[1]]) as u32
453 };
454 *offset += read_size;
455 size
456 })
457 .collect()
458 }
459
460 fn decode_miniblock_chunk(
462 &self,
463 buf: &LanceBuffer,
464 items_in_chunk: u64,
465 ) -> Result<DecodedMiniBlockChunk> {
466 let mut offset = 0;
467 let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
468 offset += 2;
469
470 let rep_size = if self.rep_decompressor.is_some() {
471 let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
472 offset += 2;
473 Some(rep_size)
474 } else {
475 None
476 };
477 let def_size = if self.def_decompressor.is_some() {
478 let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
479 offset += 2;
480 Some(def_size)
481 } else {
482 None
483 };
484
485 let buffer_sizes = if self.has_large_chunk {
486 Self::read_buffer_sizes::<true>(buf, &mut offset, self.num_buffers)
487 } else {
488 Self::read_buffer_sizes::<false>(buf, &mut offset, self.num_buffers)
489 };
490
491 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
492
493 let rep = rep_size.map(|rep_size| {
494 let rep = buf.slice_with_length(offset, rep_size as usize);
495 offset += rep_size as usize;
496 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
497 rep
498 });
499
500 let def = def_size.map(|def_size| {
501 let def = buf.slice_with_length(offset, def_size as usize);
502 offset += def_size as usize;
503 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
504 def
505 });
506
507 let buffers = buffer_sizes
508 .into_iter()
509 .map(|buf_size| {
510 let buf = buf.slice_with_length(offset, buf_size as usize);
511 offset += buf_size as usize;
512 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
513 buf
514 })
515 .collect::<Vec<_>>();
516
517 let values = self
518 .value_decompressor
519 .decompress(buffers, items_in_chunk)?;
520
521 let rep = rep
522 .map(|rep| {
523 Self::decode_levels(
524 self.rep_decompressor.as_ref().unwrap().as_ref(),
525 rep,
526 num_levels,
527 )
528 })
529 .transpose()?;
530 let def = def
531 .map(|def| {
532 Self::decode_levels(
533 self.def_decompressor.as_ref().unwrap().as_ref(),
534 def,
535 num_levels,
536 )
537 })
538 .transpose()?;
539
540 Ok(DecodedMiniBlockChunk { rep, def, values })
541 }
542}
543
544impl DecodePageTask for DecodeMiniBlockTask {
545 fn decode(self: Box<Self>) -> Result<DecodedPage> {
546 let mut repbuf: Option<LevelBuffer> = None;
548 let mut defbuf: Option<LevelBuffer> = None;
549
550 let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
551
552 let estimated_size_bytes = self
554 .instructions
555 .iter()
556 .map(|(_, chunk)| chunk.data.len())
557 .sum::<usize>()
558 * 2;
559 let mut data_builder =
560 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
561
562 let mut level_offset = 0;
564
565 let needs_caching: Vec<bool> = self
567 .instructions
568 .windows(2)
569 .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
570 .chain(std::iter::once(false)) .collect();
572
573 let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;
575
576 for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
578 let should_cache_this_chunk = needs_caching[idx];
579
580 let decoded_chunk = match &chunk_cache {
581 Some((cached_chunk_idx, cached_chunk)) if *cached_chunk_idx == chunk.chunk_idx => {
582 cached_chunk.clone()
584 }
585 _ => {
586 let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
588
589 if should_cache_this_chunk {
591 chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
592 }
593 decoded
594 }
595 };
596
597 let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;
598
599 let row_range_start =
601 instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
602 let row_range_end = row_range_start + instructions.rows_to_take;
603
604 let (item_range, level_range) = Self::map_range(
606 row_range_start..row_range_end,
607 rep.as_ref(),
608 def.as_ref(),
609 max_rep,
610 self.max_visible_level,
611 chunk.items_in_chunk,
612 instructions.preamble_action,
613 );
614 if item_range.end - item_range.start > chunk.items_in_chunk {
615 return Err(lance_core::Error::internal(format!(
616 "Item range {:?} is greater than chunk items in chunk {:?}",
617 item_range, chunk.items_in_chunk
618 )));
619 }
620
621 Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
623 Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
624 level_offset += (level_range.end - level_range.start) as usize;
625 data_builder.append(&values, item_range);
626 }
627
628 let mut data = data_builder.finish();
629
630 let unraveler =
631 RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());
632
633 if let Some(dictionary) = &self.dictionary_data {
634 let DataBlock::FixedWidth(indices) = data else {
636 return Err(lance_core::Error::internal(format!(
637 "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
638 data
639 )));
640 };
641 data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
642 indices,
643 dictionary.as_ref().clone(),
644 ));
645 }
646
647 Ok(DecodedPage {
648 data,
649 repdef: unraveler,
650 })
651 }
652}
653
654#[derive(Debug)]
657struct LoadedChunk {
658 data: LanceBuffer,
659 items_in_chunk: u64,
660 byte_range: Range<u64>,
661 chunk_idx: usize,
662}
663
664impl Clone for LoadedChunk {
665 fn clone(&self) -> Self {
666 Self {
667 data: self.data.clone(),
669 items_in_chunk: self.items_in_chunk,
670 byte_range: self.byte_range.clone(),
671 chunk_idx: self.chunk_idx,
672 }
673 }
674}
675
/// Page decoder for mini-block encoded pages.
///
/// Holds the chunks that have been loaded and the instructions describing which
/// rows to take from them; `drain` packages slices of these into decode tasks.
#[derive(Debug)]
struct MiniBlockDecoder {
    /// Decompressor for repetition levels, if the page has them.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels, if the page has them.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for chunk value buffers.
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// Meaning of each definition layer.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Loaded chunks not yet passed over; `drain` pops those preceding the
    /// chunk an instruction targets.
    loaded_chunks: VecDeque<LoadedChunk>,
    /// Pending chunk instructions; the front one is popped once fully consumed.
    instructions: VecDeque<ChunkInstructions>,
    /// Progress into the front instruction, carried across `drain` calls.
    offset_in_current_chunk: u64,
    /// Total number of rows this decoder reports via `num_rows`.
    num_rows: u64,
    /// Number of value buffers per chunk.
    num_buffers: u64,
    /// Optional dictionary; when present, decoded values are dictionary indices.
    dictionary: Option<Arc<DataBlock>>,
    /// When true, chunk buffer sizes are stored as `u32` instead of `u16`.
    has_large_chunk: bool,
}
692
693impl StructuralPageDecoder for MiniBlockDecoder {
696 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
697 let mut items_desired = num_rows;
698 let mut need_preamble = false;
699 let mut skip_in_chunk = self.offset_in_current_chunk;
700 let mut drain_instructions = Vec::new();
701 while items_desired > 0 || need_preamble {
702 let (instructions, consumed) = self
703 .instructions
704 .front()
705 .unwrap()
706 .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
707
708 while self.loaded_chunks.front().unwrap().chunk_idx
709 != instructions.chunk_instructions.chunk_idx
710 {
711 self.loaded_chunks.pop_front();
712 }
713 drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
714 if consumed {
715 self.instructions.pop_front();
716 }
717 }
718 self.offset_in_current_chunk = skip_in_chunk;
721
722 let max_visible_level = self
723 .def_meaning
724 .iter()
725 .take_while(|l| !l.is_list())
726 .map(|l| l.num_def_levels())
727 .sum::<u16>();
728
729 Ok(Box::new(DecodeMiniBlockTask {
730 instructions: drain_instructions,
731 def_decompressor: self.def_decompressor.clone(),
732 rep_decompressor: self.rep_decompressor.clone(),
733 value_decompressor: self.value_decompressor.clone(),
734 dictionary_data: self.dictionary.clone(),
735 def_meaning: self.def_meaning.clone(),
736 num_buffers: self.num_buffers,
737 max_visible_level,
738 has_large_chunk: self.has_large_chunk,
739 }))
740 }
741
742 fn num_rows(&self) -> u64 {
743 self.num_rows
744 }
745}
746
747#[derive(Debug)]
748struct CachedComplexAllNullState {
749 rep: Option<ScalarBuffer<u16>>,
750 def: Option<ScalarBuffer<u16>>,
751}
752
753impl DeepSizeOf for CachedComplexAllNullState {
754 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
755 self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
756 + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
757 }
758}
759
760impl CachedPageData for CachedComplexAllNullState {
761 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
762 self
763 }
764}
765
766#[derive(Debug)]
775pub struct ComplexAllNullScheduler {
776 buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
778 def_meaning: Arc<[DefinitionInterpretation]>,
779 repdef: Option<Arc<CachedComplexAllNullState>>,
780 max_rep: u16,
781 max_visible_level: u16,
782 rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
783 def_decompressor: Option<Arc<dyn BlockDecompressor>>,
784 num_rep_values: u64,
785 num_def_values: u64,
786}
787
788impl ComplexAllNullScheduler {
789 pub fn new(
790 buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
791 def_meaning: Arc<[DefinitionInterpretation]>,
792 rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
793 def_decompressor: Option<Arc<dyn BlockDecompressor>>,
794 num_rep_values: u64,
795 num_def_values: u64,
796 ) -> Self {
797 let max_rep = def_meaning.iter().filter(|l| l.is_list()).count() as u16;
798 let max_visible_level = def_meaning
799 .iter()
800 .take_while(|l| !l.is_list())
801 .map(|l| l.num_def_levels())
802 .sum::<u16>();
803 Self {
804 buffer_offsets_and_sizes,
805 def_meaning,
806 repdef: None,
807 max_rep,
808 max_visible_level,
809 rep_decompressor,
810 def_decompressor,
811 num_rep_values,
812 num_def_values,
813 }
814 }
815}
816
817impl StructuralPageScheduler for ComplexAllNullScheduler {
818 fn initialize<'a>(
819 &'a mut self,
820 io: &Arc<dyn EncodingsIo>,
821 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
822 let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
824 let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
825 let has_rep = rep_size > 0;
826 let has_def = def_size > 0;
827
828 let mut reads = Vec::with_capacity(2);
829 if has_rep {
830 reads.push(rep_pos..rep_pos + rep_size);
831 }
832 if has_def {
833 reads.push(def_pos..def_pos + def_size);
834 }
835
836 let data = io.submit_request(reads, 0);
837 let rep_decompressor = self.rep_decompressor.clone();
838 let def_decompressor = self.def_decompressor.clone();
839 let num_rep_values = self.num_rep_values;
840 let num_def_values = self.num_def_values;
841
842 async move {
843 let data = data.await?;
844 let mut data_iter = data.into_iter();
845
846 let decompress_levels = |compressed_bytes: Bytes,
847 decompressor: &Arc<dyn BlockDecompressor>,
848 num_values: u64,
849 level_type: &str|
850 -> Result<ScalarBuffer<u16>> {
851 let compressed_buffer = LanceBuffer::from_bytes(compressed_bytes, 1);
852 let decompressed = decompressor.decompress(compressed_buffer, num_values)?;
853 match decompressed {
854 DataBlock::FixedWidth(block) => {
855 if block.num_values != num_values {
856 return Err(Error::invalid_input_source(format!(
857 "Unexpected {} level count after decompression: expected {}, got {}",
858 level_type, num_values, block.num_values
859 )
860 .into()));
861 }
862 if block.bits_per_value != 16 {
863 return Err(Error::invalid_input_source(format!(
864 "Unexpected {} level bit width after decompression: expected 16, got {}",
865 level_type, block.bits_per_value
866 )
867 .into()));
868 }
869 Ok(block.data.borrow_to_typed_slice::<u16>())
870 }
871 _ => Err(Error::invalid_input_source(format!(
872 "Expected fixed-width data block for {} levels",
873 level_type
874 )
875 .into())),
876 }
877 };
878
879 let rep = if has_rep {
880 let rep = data_iter.next().unwrap();
881 if let Some(rep_decompressor) = rep_decompressor.as_ref() {
882 Some(decompress_levels(
883 rep,
884 rep_decompressor,
885 num_rep_values,
886 "repetition",
887 )?)
888 } else {
889 let rep = LanceBuffer::from_bytes(rep, 2);
890 let rep = rep.borrow_to_typed_slice::<u16>();
891 Some(rep)
892 }
893 } else {
894 None
895 };
896
897 let def = if has_def {
898 let def = data_iter.next().unwrap();
899 if let Some(def_decompressor) = def_decompressor.as_ref() {
900 Some(decompress_levels(
901 def,
902 def_decompressor,
903 num_def_values,
904 "definition",
905 )?)
906 } else {
907 let def = LanceBuffer::from_bytes(def, 2);
908 let def = def.borrow_to_typed_slice::<u16>();
909 Some(def)
910 }
911 } else {
912 None
913 };
914
915 let repdef = Arc::new(CachedComplexAllNullState { rep, def });
916
917 self.repdef = Some(repdef.clone());
918
919 Ok(repdef as Arc<dyn CachedPageData>)
920 }
921 .boxed()
922 }
923
924 fn load(&mut self, data: &Arc<dyn CachedPageData>) {
925 self.repdef = Some(
926 data.clone()
927 .as_arc_any()
928 .downcast::<CachedComplexAllNullState>()
929 .unwrap(),
930 );
931 }
932
933 fn schedule_ranges(
934 &self,
935 ranges: &[Range<u64>],
936 _io: &Arc<dyn EncodingsIo>,
937 ) -> Result<Vec<PageLoadTask>> {
938 let ranges = VecDeque::from_iter(ranges.iter().cloned());
939 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
940 let decoder = Box::new(ComplexAllNullPageDecoder {
941 ranges,
942 rep: self.repdef.as_ref().unwrap().rep.clone(),
943 def: self.repdef.as_ref().unwrap().def.clone(),
944 num_rows,
945 def_meaning: self.def_meaning.clone(),
946 max_rep: self.max_rep,
947 max_visible_level: self.max_visible_level,
948 cursor_row: 0,
949 cursor_level: 0,
950 }) as Box<dyn StructuralPageDecoder>;
951 let page_load_task = PageLoadTask {
952 decoder_fut: std::future::ready(Ok(decoder)).boxed(),
953 num_rows,
954 };
955 Ok(vec![page_load_task])
956 }
957}
958
/// Page decoder for complex all-null pages.
///
/// Walks the cached repetition/definition levels row by row, using a cursor, to
/// find the level slices and visible-item counts for the requested rows.
#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    /// Row ranges still to be drained, in order.
    ranges: VecDeque<Range<u64>>,
    /// Repetition levels for the whole page, if present.
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels for the whole page, if present.
    def: Option<ScalarBuffer<u16>>,
    /// Total number of rows scheduled for this decoder.
    num_rows: u64,
    /// Meaning of each definition layer.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Repetition level that marks the start of a row.
    max_rep: u16,
    /// Highest definition level that still corresponds to a visible item.
    max_visible_level: u16,
    /// Index of the next page row the cursor will read.
    cursor_row: u64,
    /// Level index at which `cursor_row` starts.
    cursor_level: usize,
}
971
972impl ComplexAllNullPageDecoder {
973 fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
974 let mut rows_desired = num_rows;
975 let mut ranges = Vec::with_capacity(self.ranges.len());
976 while rows_desired > 0 {
977 let front = self.ranges.front_mut().unwrap();
978 let avail = front.end - front.start;
979 if avail > rows_desired {
980 ranges.push(front.start..front.start + rows_desired);
981 front.start += rows_desired;
982 rows_desired = 0;
983 } else {
984 ranges.push(self.ranges.pop_front().unwrap());
985 rows_desired -= avail;
986 }
987 }
988 ranges
989 }
990
991 fn take_row(&mut self) -> Result<(Range<usize>, u64)> {
992 let start = self.cursor_level;
993 let end = if let Some(rep) = &self.rep {
994 if start >= rep.len() {
995 return Err(Error::internal(
996 "Invalid complex all-null layout: repetition buffer too short",
997 ));
998 }
999 if rep[start] != self.max_rep {
1000 return Err(Error::internal(
1001 "Invalid complex all-null layout: row did not start at max repetition level",
1002 ));
1003 }
1004 let mut end = start + 1;
1005 while end < rep.len() && rep[end] != self.max_rep {
1006 end += 1;
1007 }
1008 end
1009 } else {
1010 start + 1
1011 };
1012
1013 let visible = if let Some(def) = &self.def {
1014 if end > def.len() {
1015 return Err(Error::internal(
1016 "Invalid complex all-null layout: definition buffer too short",
1017 ));
1018 }
1019 def[start..end]
1020 .iter()
1021 .filter(|d| **d <= self.max_visible_level)
1022 .count() as u64
1023 } else {
1024 (end - start) as u64
1025 };
1026
1027 self.cursor_level = end;
1028 self.cursor_row += 1;
1029 Ok((start..end, visible))
1030 }
1031
1032 fn skip_to_row(&mut self, target_row: u64) -> Result<()> {
1033 while self.cursor_row < target_row {
1034 self.take_row()?;
1035 }
1036 Ok(())
1037 }
1038}
1039
1040impl StructuralPageDecoder for ComplexAllNullPageDecoder {
1041 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1042 let drained_ranges = self.drain_ranges(num_rows);
1043 let mut level_slices: Vec<Range<usize>> = Vec::new();
1044 let mut visible_items_total = 0;
1045
1046 for range in drained_ranges {
1047 self.skip_to_row(range.start)?;
1048 for _ in range.start..range.end {
1049 let (level_range, visible) = self.take_row()?;
1050 visible_items_total += visible;
1051 if let Some(last) = level_slices.last_mut()
1052 && last.end == level_range.start
1053 {
1054 last.end = level_range.end;
1055 continue;
1056 }
1057 level_slices.push(level_range);
1058 }
1059 }
1060
1061 Ok(Box::new(DecodeComplexAllNullTask {
1062 level_slices,
1063 visible_items_total,
1064 rep: self.rep.clone(),
1065 def: self.def.clone(),
1066 def_meaning: self.def_meaning.clone(),
1067 max_visible_level: self.max_visible_level,
1068 }))
1069 }
1070
1071 fn num_rows(&self) -> u64 {
1072 self.num_rows
1073 }
1074}
1075
1076#[derive(Debug)]
1079pub struct DecodeComplexAllNullTask {
1080 level_slices: Vec<Range<usize>>,
1081 visible_items_total: u64,
1082 rep: Option<ScalarBuffer<u16>>,
1083 def: Option<ScalarBuffer<u16>>,
1084 def_meaning: Arc<[DefinitionInterpretation]>,
1085 max_visible_level: u16,
1086}
1087
1088impl DecodeComplexAllNullTask {
1089 fn decode_level(&self, levels: &Option<ScalarBuffer<u16>>) -> Option<Vec<u16>> {
1090 levels.as_ref().map(|levels| {
1091 let num_levels = self
1092 .level_slices
1093 .iter()
1094 .map(|range| range.end - range.start)
1095 .sum();
1096 let mut referenced_levels = Vec::with_capacity(num_levels);
1097 for range in &self.level_slices {
1098 referenced_levels.extend(levels[range.start..range.end].iter().copied());
1099 }
1100 referenced_levels
1101 })
1102 }
1103}
1104
1105impl DecodePageTask for DecodeComplexAllNullTask {
1106 fn decode(self: Box<Self>) -> Result<DecodedPage> {
1107 let rep = self.decode_level(&self.rep);
1108 let def = self.decode_level(&self.def);
1109
1110 let num_values = if let Some(def) = &def {
1114 def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
1115 } else {
1116 self.visible_items_total
1117 };
1118
1119 let data = DataBlock::AllNull(AllNullDataBlock { num_values });
1120 let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
1121 Ok(DecodedPage {
1122 data,
1123 repdef: unraveler,
1124 })
1125 }
1126}
1127
1128#[derive(Debug, Default)]
1133pub struct SimpleAllNullScheduler {}
1134
1135impl StructuralPageScheduler for SimpleAllNullScheduler {
1136 fn initialize<'a>(
1137 &'a mut self,
1138 _io: &Arc<dyn EncodingsIo>,
1139 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1140 std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
1141 }
1142
1143 fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
1144
1145 fn schedule_ranges(
1146 &self,
1147 ranges: &[Range<u64>],
1148 _io: &Arc<dyn EncodingsIo>,
1149 ) -> Result<Vec<PageLoadTask>> {
1150 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1151 let decoder =
1152 Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
1153 let page_load_task = PageLoadTask {
1154 decoder_fut: std::future::ready(Ok(decoder)).boxed(),
1155 num_rows,
1156 };
1157 Ok(vec![page_load_task])
1158 }
1159}
1160
1161#[derive(Debug)]
1164struct SimpleAllNullDecodePageTask {
1165 num_values: u64,
1166}
1167impl DecodePageTask for SimpleAllNullDecodePageTask {
1168 fn decode(self: Box<Self>) -> Result<DecodedPage> {
1169 let unraveler = RepDefUnraveler::new(
1170 None,
1171 Some(vec![1; self.num_values as usize]),
1172 Arc::new([DefinitionInterpretation::NullableItem]),
1173 self.num_values,
1174 );
1175 Ok(DecodedPage {
1176 data: DataBlock::AllNull(AllNullDataBlock {
1177 num_values: self.num_values,
1178 }),
1179 repdef: unraveler,
1180 })
1181 }
1182}
1183
1184#[derive(Debug)]
1185pub struct SimpleAllNullPageDecoder {
1186 num_rows: u64,
1187}
1188
1189impl StructuralPageDecoder for SimpleAllNullPageDecoder {
1190 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1191 Ok(Box::new(SimpleAllNullDecodePageTask {
1192 num_values: num_rows,
1193 }))
1194 }
1195
1196 fn num_rows(&self) -> u64 {
1197 self.num_rows
1198 }
1199}
1200
/// Dictionary information for a dictionary-encoded mini-block page.
#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    /// Decompressor for the dictionary values buffer.
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    /// `(position, size)` of the dictionary buffer (presumably bytes in the file — confirm).
    dictionary_buf_position_and_size: (u64, u64),
    /// Alignment of the dictionary data (usage not visible here — verify).
    dictionary_data_alignment: u64,
    /// Number of entries in the dictionary.
    num_dictionary_items: u64,
}
1209
/// Repetition-index entry describing the rows of one mini-block chunk.
#[derive(Debug)]
struct MiniBlockRepIndexBlock {
    /// Running total of `starts_including_trailer` over all earlier chunks
    /// (presumably the index of the first row starting in this chunk — confirm).
    first_row: u64,
    /// Number of rows counted for this chunk: row ends, plus one for a trailing
    /// partial row, minus one for a leading preamble (see `decode_from_bytes`).
    starts_including_trailer: u64,
    /// True when the previous chunk ended with a partial row that continues here.
    has_preamble: bool,
    /// True when this chunk ends with a partial row that continues in the next chunk.
    has_trailer: bool,
}

impl DeepSizeOf for MiniBlockRepIndexBlock {
    /// All fields are plain values with no heap allocation, so this is 0.
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}
1231
1232#[derive(Debug)]
1237struct MiniBlockRepIndex {
1238 blocks: Vec<MiniBlockRepIndexBlock>,
1239}
1240
1241impl DeepSizeOf for MiniBlockRepIndex {
1242 fn deep_size_of_children(&self, context: &mut Context) -> usize {
1243 self.blocks.deep_size_of_children(context)
1244 }
1245}
1246
1247impl MiniBlockRepIndex {
1248 pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
1253 let mut blocks = Vec::with_capacity(chunks.len());
1254 let mut offset: u64 = 0;
1255
1256 for c in chunks {
1257 blocks.push(MiniBlockRepIndexBlock {
1258 first_row: offset,
1259 starts_including_trailer: c.num_values,
1260 has_preamble: false,
1261 has_trailer: false,
1262 });
1263
1264 offset += c.num_values;
1265 }
1266
1267 Self { blocks }
1268 }
1269
1270 pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
1276 let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
1278 let u64_slice = buffer.borrow_to_typed_slice::<u64>();
1279 let n = u64_slice.len() / stride;
1280
1281 let mut blocks = Vec::with_capacity(n);
1282 let mut chunk_has_preamble = false;
1283 let mut offset: u64 = 0;
1284
1285 for i in 0..n {
1287 let base_idx = i * stride;
1288 let ends = u64_slice[base_idx];
1289 let partial = u64_slice[base_idx + 1];
1290
1291 let has_trailer = partial > 0;
1292 let starts_including_trailer =
1294 ends + (has_trailer as u64) - (chunk_has_preamble as u64);
1295
1296 blocks.push(MiniBlockRepIndexBlock {
1297 first_row: offset,
1298 starts_including_trailer,
1299 has_preamble: chunk_has_preamble,
1300 has_trailer,
1301 });
1302
1303 chunk_has_preamble = has_trailer;
1304 offset += starts_including_trailer;
1305 }
1306
1307 Self { blocks }
1308 }
1309}
1310
/// Page-level state for a mini-block page that can be cached and reused across scans.
///
/// Built by `MiniBlockScheduler::initialize` and restored with `MiniBlockScheduler::load`.
#[derive(Debug)]
struct MiniBlockCacheableState {
    /// Location and value count of every chunk, parsed from the page's metadata buffer.
    chunk_meta: Vec<ChunkMeta>,
    /// Row-to-chunk index (decoded from the page, or synthesized from `chunk_meta`).
    rep_index: MiniBlockRepIndex,
    /// Decompressed dictionary, present only for dictionary-encoded pages.
    dictionary: Option<Arc<DataBlock>>,
}
1321
1322impl DeepSizeOf for MiniBlockCacheableState {
1323 fn deep_size_of_children(&self, context: &mut Context) -> usize {
1324 self.rep_index.deep_size_of_children(context)
1325 + self
1326 .dictionary
1327 .as_ref()
1328 .map(|dict| dict.data_size() as usize)
1329 .unwrap_or(0)
1330 }
1331}
1332
1333impl CachedPageData for MiniBlockCacheableState {
1334 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
1335 self
1336 }
1337}
1338
/// Scheduler for mini-block pages.
///
/// Values are stored in chunks described by a metadata buffer. An optional repetition index
/// maps rows to chunks, and an optional dictionary is decoded during `initialize`.
#[derive(Debug)]
pub struct MiniBlockScheduler {
    /// `(position, size)` of each page buffer. Index 0 is the chunk metadata, index 1 the
    /// values, index 2 the dictionary (if any), and the last entry the repetition index (when
    /// `repetition_index_depth > 0`).
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    /// I/O priority used when reading chunk data.
    priority: u64,
    /// Total number of items in the page; used to size the last chunk.
    items_in_page: u64,
    /// Non-zero when the page has a repetition index; entries are `depth + 1` u64 words.
    repetition_index_depth: u16,
    /// Passed through to the decoder.
    num_buffers: u64,
    /// Decompressor for repetition levels, if the layout has any.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels, if the layout has any.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for chunk values.
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// Meaning of each definition layer, from the layout's `layers`.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Dictionary details for dictionary-encoded pages.
    dictionary: Option<MiniBlockSchedulerDictionary>,
    /// Parsed page metadata; set by `initialize` or `load`.
    page_meta: Option<Arc<MiniBlockCacheableState>>,
    /// When true, chunk metadata words are `u32` instead of `u16`.
    has_large_chunk: bool,
}
1382
1383impl MiniBlockScheduler {
1384 fn try_new(
1385 buffer_offsets_and_sizes: &[(u64, u64)],
1386 priority: u64,
1387 items_in_page: u64,
1388 layout: &pb21::MiniBlockLayout,
1389 decompressors: &dyn DecompressionStrategy,
1390 ) -> Result<Self> {
1391 let rep_decompressor = layout
1392 .rep_compression
1393 .as_ref()
1394 .map(|rep_compression| {
1395 decompressors
1396 .create_block_decompressor(rep_compression)
1397 .map(Arc::from)
1398 })
1399 .transpose()?;
1400 let def_decompressor = layout
1401 .def_compression
1402 .as_ref()
1403 .map(|def_compression| {
1404 decompressors
1405 .create_block_decompressor(def_compression)
1406 .map(Arc::from)
1407 })
1408 .transpose()?;
1409 let def_meaning = layout
1410 .layers
1411 .iter()
1412 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1413 .collect::<Vec<_>>();
1414 let value_decompressor = decompressors.create_miniblock_decompressor(
1415 layout.value_compression.as_ref().unwrap(),
1416 decompressors,
1417 )?;
1418
1419 let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1420 let num_dictionary_items = layout.num_dictionary_items;
1421 let dictionary_decompressor = decompressors
1422 .create_block_decompressor(dictionary_encoding)?
1423 .into();
1424 let dictionary_data_alignment = match dictionary_encoding.compression.as_ref().unwrap()
1425 {
1426 Compression::Variable(_) => 4,
1427 Compression::Flat(_) => 16,
1428 Compression::General(_) => 1,
1429 Compression::InlineBitpacking(_) | Compression::OutOfLineBitpacking(_) => {
1430 crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
1431 }
1432 _ => {
1433 return Err(Error::invalid_input_source(
1434 format!(
1435 "Unsupported mini-block dictionary encoding: {:?}",
1436 dictionary_encoding.compression.as_ref().unwrap()
1437 )
1438 .into(),
1439 ));
1440 }
1441 };
1442 Some(MiniBlockSchedulerDictionary {
1443 dictionary_decompressor,
1444 dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1445 dictionary_data_alignment,
1446 num_dictionary_items,
1447 })
1448 } else {
1449 None
1450 };
1451
1452 Ok(Self {
1453 buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1454 rep_decompressor,
1455 def_decompressor,
1456 value_decompressor: value_decompressor.into(),
1457 repetition_index_depth: layout.repetition_index_depth as u16,
1458 num_buffers: layout.num_buffers,
1459 priority,
1460 items_in_page,
1461 dictionary,
1462 def_meaning: def_meaning.into(),
1463 page_meta: None,
1464 has_large_chunk: layout.has_large_chunk,
1465 })
1466 }
1467
1468 fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1469 let page_meta = self.page_meta.as_ref().unwrap();
1470 chunk_indices
1471 .iter()
1472 .map(|&chunk_idx| {
1473 let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1474 let bytes_start = chunk_meta.offset_bytes;
1475 let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1476 LoadedChunk {
1477 byte_range: bytes_start..bytes_end,
1478 items_in_chunk: chunk_meta.num_values,
1479 chunk_idx,
1480 data: LanceBuffer::empty(),
1481 }
1482 })
1483 .collect()
1484 }
1485}
1486
/// What to do with a chunk's preamble, i.e. the continuation of a row that started in an
/// earlier chunk.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    /// The chunk has a preamble and it is needed.
    Take,
    /// The chunk has a preamble but it is not needed.
    Skip,
    /// The chunk has no preamble.
    Absent,
}
1493
/// A scheduling instruction describing what to read from one mini-block chunk.
///
/// Produced by `ChunkInstructions::schedule_instructions`. Row counts refer to rows that start
/// within the chunk.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    /// Index of the chunk (equal to its repetition-index block index).
    chunk_idx: usize,
    /// What to do with the chunk's preamble.
    preamble: PreambleAction,
    /// Rows starting in the chunk to skip before taking any.
    rows_to_skip: u64,
    /// Rows starting in the chunk to take after skipping.
    rows_to_take: u64,
    /// Whether the trailing partial row (continued in the next chunk) is taken.
    take_trailer: bool,
}
1553
/// The portion of a `ChunkInstructions` consumed by a single drain step.
///
/// Produced by `ChunkInstructions::drain_from_instruction`.
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    /// Copy of the scheduling instruction being drained.
    chunk_instructions: ChunkInstructions,
    /// Rows of that instruction already consumed by earlier drain steps (skipped now).
    rows_to_skip: u64,
    /// Rows taken in this drain step.
    rows_to_take: u64,
    /// Preamble handling for this step, based on whether the caller needed a preamble.
    preamble_action: PreambleAction,
}
1578
1579impl ChunkInstructions {
1580 fn schedule_instructions(
1586 rep_index: &MiniBlockRepIndex,
1587 user_ranges: &[Range<u64>],
1588 ) -> Vec<Self> {
1589 let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1593
1594 for user_range in user_ranges {
1595 let mut rows_needed = user_range.end - user_range.start;
1596 let mut need_preamble = false;
1597
1598 let mut block_index = match rep_index
1601 .blocks
1602 .binary_search_by_key(&user_range.start, |block| block.first_row)
1603 {
1604 Ok(idx) => {
1605 let mut idx = idx;
1608 while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1609 idx -= 1;
1610 }
1611 idx
1612 }
1613 Err(idx) => idx - 1,
1615 };
1616
1617 let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1618
1619 while rows_needed > 0 || need_preamble {
1620 if block_index >= rep_index.blocks.len() {
1622 log::warn!(
1623 "schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early"
1624 );
1625 break;
1626 }
1627
1628 let chunk = &rep_index.blocks[block_index];
1629 let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);
1630
1631 if rows_avail == 0 && to_skip == 0 {
1635 if chunk.has_preamble && need_preamble {
1637 chunk_instructions.push(Self {
1638 chunk_idx: block_index,
1639 preamble: PreambleAction::Take,
1640 rows_to_skip: 0,
1641 rows_to_take: 0,
1642 take_trailer: chunk.has_trailer,
1646 });
1647 if chunk.starts_including_trailer > 0
1651 || block_index == rep_index.blocks.len() - 1
1652 {
1653 need_preamble = false;
1654 }
1655 }
1656 block_index += 1;
1658 continue;
1659 }
1660
1661 if rows_avail == 0 && to_skip > 0 {
1665 to_skip -= chunk.starts_including_trailer;
1668 block_index += 1;
1669 continue;
1670 }
1671
1672 let rows_to_take = rows_avail.min(rows_needed);
1673 rows_needed -= rows_to_take;
1674
1675 let mut take_trailer = false;
1676 let preamble = if chunk.has_preamble {
1677 if need_preamble {
1678 PreambleAction::Take
1679 } else {
1680 PreambleAction::Skip
1681 }
1682 } else {
1683 PreambleAction::Absent
1684 };
1685
1686 if rows_to_take == rows_avail && chunk.has_trailer {
1688 take_trailer = true;
1689 need_preamble = true;
1690 } else {
1691 need_preamble = false;
1692 };
1693
1694 chunk_instructions.push(Self {
1695 preamble,
1696 chunk_idx: block_index,
1697 rows_to_skip: to_skip,
1698 rows_to_take,
1699 take_trailer,
1700 });
1701
1702 to_skip = 0;
1703 block_index += 1;
1704 }
1705 }
1706
1707 if user_ranges.len() > 1 {
1711 let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1713 let mut instructions_iter = chunk_instructions.into_iter();
1714 merged_instructions.push(instructions_iter.next().unwrap());
1715 for instruction in instructions_iter {
1716 let last = merged_instructions.last_mut().unwrap();
1717 if last.chunk_idx == instruction.chunk_idx
1718 && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1719 {
1720 last.rows_to_take += instruction.rows_to_take;
1721 last.take_trailer |= instruction.take_trailer;
1722 } else {
1723 merged_instructions.push(instruction);
1724 }
1725 }
1726 merged_instructions
1727 } else {
1728 chunk_instructions
1729 }
1730 }
1731
1732 fn drain_from_instruction(
1733 &self,
1734 rows_desired: &mut u64,
1735 need_preamble: &mut bool,
1736 skip_in_chunk: &mut u64,
1737 ) -> (ChunkDrainInstructions, bool) {
1738 debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1740 let rows_avail = self.rows_to_take - *skip_in_chunk;
1741 let has_preamble = self.preamble != PreambleAction::Absent;
1742 let preamble_action = match (*need_preamble, has_preamble) {
1743 (true, true) => PreambleAction::Take,
1744 (true, false) => panic!("Need preamble but there isn't one"),
1745 (false, true) => PreambleAction::Skip,
1746 (false, false) => PreambleAction::Absent,
1747 };
1748
1749 let rows_taking = if *rows_desired >= rows_avail {
1752 *need_preamble = self.take_trailer;
1760 rows_avail
1761 } else {
1762 *need_preamble = false;
1765 *rows_desired
1766 };
1767 let rows_skipped = *skip_in_chunk;
1768
1769 let consumed_chunk = if *rows_desired >= rows_avail {
1771 *rows_desired -= rows_avail;
1772 *skip_in_chunk = 0;
1773 true
1774 } else {
1775 *skip_in_chunk += *rows_desired;
1776 *rows_desired = 0;
1777 false
1778 };
1779
1780 (
1781 ChunkDrainInstructions {
1782 chunk_instructions: self.clone(),
1783 rows_to_skip: rows_skipped,
1784 rows_to_take: rows_taking,
1785 preamble_action,
1786 },
1787 consumed_chunk,
1788 )
1789 }
1790}
1791
/// The chunk metadata words of a mini-block page, one word per chunk.
///
/// Words are `u16` unless the page has a large chunk, in which case they are `u32` (see
/// `Words::from_bytes`).
enum Words {
    U16(ScalarBuffer<u16>),
    U32(ScalarBuffer<u32>),
}
1796
/// Iterator over `Words`, yielding every word widened to `u32`.
struct WordsIter<'a> {
    /// Boxed so both the `u16` and `u32` variants share one iterator type.
    iter: Box<dyn Iterator<Item = u32> + 'a>,
}
1800
1801impl Words {
1802 pub fn len(&self) -> usize {
1803 match self {
1804 Self::U16(b) => b.len(),
1805 Self::U32(b) => b.len(),
1806 }
1807 }
1808
1809 pub fn iter(&self) -> WordsIter<'_> {
1810 match self {
1811 Self::U16(buf) => WordsIter {
1812 iter: Box::new(buf.iter().map(|&x| x as u32)),
1813 },
1814 Self::U32(buf) => WordsIter {
1815 iter: Box::new(buf.iter().copied()),
1816 },
1817 }
1818 }
1819
1820 pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result<Self> {
1821 let bytes_per_value = if has_large_chunk { 4 } else { 2 };
1822 assert_eq!(bytes.len() % bytes_per_value, 0);
1823 let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64);
1824 if has_large_chunk {
1825 Ok(Self::U32(buffer.borrow_to_typed_slice::<u32>()))
1826 } else {
1827 Ok(Self::U16(buffer.borrow_to_typed_slice::<u16>()))
1828 }
1829 }
1830}
1831
1832impl<'a> Iterator for WordsIter<'a> {
1833 type Item = u32;
1834
1835 fn next(&mut self) -> Option<Self::Item> {
1836 self.iter.next()
1837 }
1838}
1839
1840impl StructuralPageScheduler for MiniBlockScheduler {
1841 fn initialize<'a>(
1842 &'a mut self,
1843 io: &Arc<dyn EncodingsIo>,
1844 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1845 let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1849 let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1850 let mut bufs_needed = 1;
1851 if self.dictionary.is_some() {
1852 bufs_needed += 1;
1853 }
1854 if self.repetition_index_depth > 0 {
1855 bufs_needed += 1;
1856 }
1857 let mut required_ranges = Vec::with_capacity(bufs_needed);
1858 required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1859 if let Some(ref dictionary) = self.dictionary {
1860 required_ranges.push(
1861 dictionary.dictionary_buf_position_and_size.0
1862 ..dictionary.dictionary_buf_position_and_size.0
1863 + dictionary.dictionary_buf_position_and_size.1,
1864 );
1865 }
1866 if self.repetition_index_depth > 0 {
1867 let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1868 required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1869 }
1870 let io_req = io.submit_request(required_ranges, 0);
1871
1872 async move {
1873 let mut buffers = io_req.await?.into_iter().fuse();
1874 let meta_bytes = buffers.next().unwrap();
1875 let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1876 let rep_index_bytes = buffers.next();
1877
1878 let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?;
1880 let mut chunk_meta = Vec::with_capacity(words.len());
1881
1882 let mut rows_counter = 0;
1883 let mut offset_bytes = value_buf_position;
1884 for (word_idx, word) in words.iter().enumerate() {
1885 let log_num_values = word & 0x0F;
1886 let divided_bytes = word >> 4;
1887 let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1888 debug_assert!(num_bytes > 0);
1889 let num_values = if word_idx < words.len() - 1 {
1890 debug_assert!(log_num_values > 0);
1891 1 << log_num_values
1892 } else {
1893 debug_assert!(
1894 log_num_values == 0
1895 || (1 << log_num_values) == (self.items_in_page - rows_counter)
1896 );
1897 self.items_in_page - rows_counter
1898 };
1899 rows_counter += num_values;
1900
1901 chunk_meta.push(ChunkMeta {
1902 num_values,
1903 chunk_size_bytes: num_bytes as u64,
1904 offset_bytes,
1905 });
1906 offset_bytes += num_bytes as u64;
1907 }
1908
1909 let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1911 assert!(rep_index_data.len() % 8 == 0);
1912 let stride = self.repetition_index_depth as usize + 1;
1913 MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
1914 } else {
1915 MiniBlockRepIndex::default_from_chunks(&chunk_meta)
1916 };
1917
1918 let mut page_meta = MiniBlockCacheableState {
1919 chunk_meta,
1920 rep_index,
1921 dictionary: None,
1922 };
1923
1924 if let Some(ref mut dictionary) = self.dictionary {
1926 let dictionary_data = dictionary_bytes.unwrap();
1927 page_meta.dictionary =
1928 Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1929 LanceBuffer::from_bytes(
1930 dictionary_data,
1931 dictionary.dictionary_data_alignment,
1932 ),
1933 dictionary.num_dictionary_items,
1934 )?));
1935 };
1936 let page_meta = Arc::new(page_meta);
1937 self.page_meta = Some(page_meta.clone());
1938 Ok(page_meta as Arc<dyn CachedPageData>)
1939 }
1940 .boxed()
1941 }
1942
1943 fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1944 self.page_meta = Some(
1945 data.clone()
1946 .as_arc_any()
1947 .downcast::<MiniBlockCacheableState>()
1948 .unwrap(),
1949 );
1950 }
1951
1952 fn schedule_ranges(
1953 &self,
1954 ranges: &[Range<u64>],
1955 io: &Arc<dyn EncodingsIo>,
1956 ) -> Result<Vec<PageLoadTask>> {
1957 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1958
1959 let page_meta = self.page_meta.as_ref().unwrap();
1960
1961 let chunk_instructions =
1962 ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1963
1964 debug_assert_eq!(
1965 num_rows,
1966 chunk_instructions
1967 .iter()
1968 .map(|ci| ci.rows_to_take)
1969 .sum::<u64>()
1970 );
1971
1972 let chunks_needed = chunk_instructions
1973 .iter()
1974 .map(|ci| ci.chunk_idx)
1975 .unique()
1976 .collect::<Vec<_>>();
1977
1978 let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1979 let chunk_ranges = loaded_chunks
1980 .iter()
1981 .map(|c| c.byte_range.clone())
1982 .collect::<Vec<_>>();
1983 let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1984
1985 let rep_decompressor = self.rep_decompressor.clone();
1986 let def_decompressor = self.def_decompressor.clone();
1987 let value_decompressor = self.value_decompressor.clone();
1988 let num_buffers = self.num_buffers;
1989 let has_large_chunk = self.has_large_chunk;
1990 let dictionary = page_meta
1991 .dictionary
1992 .as_ref()
1993 .map(|dictionary| dictionary.clone());
1994 let def_meaning = self.def_meaning.clone();
1995
1996 let res = async move {
1997 let loaded_chunk_data = loaded_chunk_data.await?;
1998 for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1999 loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
2000 }
2001
2002 Ok(Box::new(MiniBlockDecoder {
2003 rep_decompressor,
2004 def_decompressor,
2005 value_decompressor,
2006 def_meaning,
2007 loaded_chunks: VecDeque::from_iter(loaded_chunks),
2008 instructions: VecDeque::from(chunk_instructions),
2009 offset_in_current_chunk: 0,
2010 dictionary,
2011 num_rows,
2012 num_buffers,
2013 has_large_chunk,
2014 }) as Box<dyn StructuralPageDecoder>)
2015 }
2016 .boxed();
2017 let page_load_task = PageLoadTask {
2018 decoder_fut: res,
2019 num_rows,
2020 };
2021 Ok(vec![page_load_task])
2022 }
2023}
2024
/// Location and entry width of a full-zip page's repetition index.
#[derive(Debug, Clone, Copy)]
struct FullZipRepIndexDetails {
    /// File position of the repetition index buffer.
    buf_position: u64,
    /// Width in bytes of each index entry (1, 2, 4 or 8 per the checks in `try_new`).
    bytes_per_value: u64,
}
2030
/// Value decompressor for a full-zip page, chosen by the layout's `details` field.
#[derive(Debug)]
enum PerValueDecompressor {
    /// Fixed-width values (`details` is `BitsPerValue`).
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    /// Variable-width values (`details` is `BitsPerOffset`).
    Variable(Arc<dyn VariablePerValueDecompressor>),
}
2036
/// Decoding parameters shared by every full-zip decoder created for a page.
#[derive(Debug)]
struct FullZipDecodeDetails {
    /// Decompressor for the values.
    value_decompressor: PerValueDecompressor,
    /// Meaning of each definition layer, from the layout's `layers`.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Parses the control word stored with each value.
    ctrl_word_parser: ControlWordParser,
    /// Maximum repetition level: the number of list layers.
    max_rep: u16,
    /// Sum of definition levels over the non-list layers.
    max_visible_def: u16,
}
2045
/// Where a full-zip read gets its bytes from.
#[derive(Debug, Clone)]
enum FullZipReadSource {
    /// Issue I/O requests for each range.
    Remote(Arc<dyn EncodingsIo>),
    /// The page is already in memory; `base_offset` is the file position of the first byte of
    /// `data`.
    PrefetchedPage { base_offset: u64, data: LanceBuffer },
}
2064
2065impl FullZipReadSource {
2066 fn fetch(
2070 &self,
2071 ranges: &[Range<u64>],
2072 priority: u64,
2073 ) -> BoxFuture<'static, Result<VecDeque<LanceBuffer>>> {
2074 match self {
2075 Self::Remote(io) => {
2076 let io = io.clone();
2077 let ranges = ranges.to_vec();
2078 async move {
2079 let data = io.submit_request(ranges, priority).await?;
2080 Ok(data
2081 .into_iter()
2082 .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2083 .collect::<VecDeque<_>>())
2084 }
2085 .boxed()
2086 }
2087 Self::PrefetchedPage { base_offset, data } => {
2088 let base_offset = *base_offset;
2089 let data = data.clone();
2090 let page_end = base_offset + data.len() as u64;
2091 std::future::ready(
2092 ranges
2093 .iter()
2094 .map(|range| {
2095 if range.start > range.end
2096 || range.start < base_offset
2097 || range.end > page_end
2098 {
2099 return Err(Error::internal(format!(
2100 "Requested range {:?} is outside page range {}..{}",
2101 range, base_offset, page_end
2102 )));
2103 }
2104 let start = (range.start - base_offset) as usize;
2105 let len = (range.end - range.start) as usize;
2106 Ok(data.slice_with_length(start, len))
2107 })
2108 .collect::<Result<VecDeque<_>>>(),
2109 )
2110 .boxed()
2111 }
2112 }
2113 }
2114}
2115
/// Scheduler for full-zip pages.
///
/// In these pages each value is stored together with its control word (see
/// `total_bytes_per_value` in the decoders).
#[derive(Debug)]
pub struct FullZipScheduler {
    /// File position of the data buffer.
    data_buf_position: u64,
    /// Size in bytes of the data buffer.
    data_buf_size: u64,
    /// Repetition index location, present when the page has a second buffer.
    rep_index: Option<FullZipRepIndexDetails>,
    /// I/O priority for data reads.
    priority: u64,
    /// Number of rows in the page.
    rows_in_page: u64,
    /// 32 for fixed-width layouts, otherwise the layout's offset width; passed to the
    /// variable-width decoder.
    bits_per_offset: u8,
    /// Decode parameters shared with the decoders.
    details: Arc<FullZipDecodeDetails>,
    /// Full repetition index, set by `initialize` (when caching) or `load`.
    cached_state: Option<Arc<FullZipCacheableState>>,
    /// Whether `initialize` preloads the repetition index. `try_new` sets it to false; where
    /// it gets enabled is not visible here.
    enable_cache: bool,
}
2137
2138impl FullZipScheduler {
2139 fn try_new(
2140 buffer_offsets_and_sizes: &[(u64, u64)],
2141 priority: u64,
2142 rows_in_page: u64,
2143 layout: &pb21::FullZipLayout,
2144 decompressors: &dyn DecompressionStrategy,
2145 ) -> Result<Self> {
2146 let (data_buf_position, data_buf_size) = buffer_offsets_and_sizes[0];
2147 let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
2148 let num_reps = rows_in_page + 1;
2149 let bytes_per_rep = len / num_reps;
2150 debug_assert_eq!(len % num_reps, 0);
2151 debug_assert!(
2152 bytes_per_rep == 1
2153 || bytes_per_rep == 2
2154 || bytes_per_rep == 4
2155 || bytes_per_rep == 8
2156 );
2157 FullZipRepIndexDetails {
2158 buf_position: *pos,
2159 bytes_per_value: bytes_per_rep,
2160 }
2161 });
2162
2163 let value_decompressor = match layout.details {
2164 Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
2165 let decompressor = decompressors.create_fixed_per_value_decompressor(
2166 layout.value_compression.as_ref().unwrap(),
2167 )?;
2168 PerValueDecompressor::Fixed(decompressor.into())
2169 }
2170 Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
2171 let decompressor = decompressors.create_variable_per_value_decompressor(
2172 layout.value_compression.as_ref().unwrap(),
2173 )?;
2174 PerValueDecompressor::Variable(decompressor.into())
2175 }
2176 None => {
2177 panic!("Full-zip layout must have a `details` field");
2178 }
2179 };
2180 let ctrl_word_parser = ControlWordParser::new(
2181 layout.bits_rep.try_into().unwrap(),
2182 layout.bits_def.try_into().unwrap(),
2183 );
2184 let def_meaning = layout
2185 .layers
2186 .iter()
2187 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
2188 .collect::<Vec<_>>();
2189
2190 let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
2191 let max_visible_def = def_meaning
2192 .iter()
2193 .filter(|d| !d.is_list())
2194 .map(|d| d.num_def_levels())
2195 .sum();
2196
2197 let bits_per_offset = match layout.details {
2198 Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
2199 Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
2200 bits_per_offset as u8
2201 }
2202 None => panic!("Full-zip layout must have a `details` field"),
2203 };
2204
2205 let details = Arc::new(FullZipDecodeDetails {
2206 value_decompressor,
2207 def_meaning: def_meaning.into(),
2208 ctrl_word_parser,
2209 max_rep,
2210 max_visible_def,
2211 });
2212 Ok(Self {
2213 data_buf_position,
2214 data_buf_size,
2215 rep_index,
2216 details,
2217 priority,
2218 rows_in_page,
2219 bits_per_offset,
2220 cached_state: None,
2221 enable_cache: false,
2222 })
2223 }
2224
2225 fn covers_entire_page(ranges: &[Range<u64>], rows_in_page: u64) -> bool {
2226 if ranges.is_empty() {
2227 return false;
2228 }
2229 let mut expected_start = 0;
2230 for range in ranges {
2231 if range.start != expected_start || range.end > rows_in_page || range.end < range.start
2232 {
2233 return false;
2234 }
2235 expected_start = range.end;
2236 }
2237 expected_start == rows_in_page
2238 }
2239
2240 fn create_page_load_task(
2241 io_future: BoxFuture<'static, Result<Vec<Bytes>>>,
2242 num_rows: u64,
2243 details: Arc<FullZipDecodeDetails>,
2244 bits_per_offset: u8,
2245 ) -> PageLoadTask {
2246 let load_task = async move {
2247 let buffers = io_future.await?;
2248 let data = buffers
2249 .into_iter()
2250 .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2251 .collect::<VecDeque<_>>();
2252 Self::create_decoder(details, data, num_rows, bits_per_offset)
2253 }
2254 .boxed();
2255 PageLoadTask {
2256 decoder_fut: load_task,
2257 num_rows,
2258 }
2259 }
2260
2261 fn create_decoder(
2263 details: Arc<FullZipDecodeDetails>,
2264 data: VecDeque<LanceBuffer>,
2265 num_rows: u64,
2266 bits_per_offset: u8,
2267 ) -> Result<Box<dyn StructuralPageDecoder>> {
2268 match &details.value_decompressor {
2269 PerValueDecompressor::Fixed(decompressor) => {
2270 let bits_per_value = decompressor.bits_per_value();
2271 if bits_per_value % 8 != 0 {
2272 return Err(lance_core::Error::not_supported_source("Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into()));
2273 }
2274 let bytes_per_value = bits_per_value / 8;
2275 let total_bytes_per_value =
2276 bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
2277 if total_bytes_per_value == 0 {
2278 return Err(lance_core::Error::internal(
2279 "Invalid encoding: per-row byte width must be greater than 0",
2280 ));
2281 }
2282 Ok(Box::new(FixedFullZipDecoder {
2283 details,
2284 data,
2285 num_rows,
2286 offset_in_current: 0,
2287 bytes_per_value: bytes_per_value as usize,
2288 total_bytes_per_value,
2289 }) as Box<dyn StructuralPageDecoder>)
2290 }
2291 PerValueDecompressor::Variable(_decompressor) => {
2292 Ok(Box::new(VariableFullZipDecoder::new(
2293 details,
2294 data,
2295 num_rows,
2296 bits_per_offset,
2297 bits_per_offset,
2298 )))
2299 }
2300 }
2301 }
2302
2303 fn extract_byte_ranges_from_pairs(
2306 buffer: LanceBuffer,
2307 bytes_per_value: u64,
2308 data_buf_position: u64,
2309 ) -> Vec<Range<u64>> {
2310 ByteUnpacker::new(buffer, bytes_per_value as usize)
2311 .chunks(2)
2312 .into_iter()
2313 .map(|mut c| {
2314 let start = c.next().unwrap() + data_buf_position;
2315 let end = c.next().unwrap() + data_buf_position;
2316 start..end
2317 })
2318 .collect::<Vec<_>>()
2319 }
2320
2321 fn extract_byte_ranges_from_cached(
2324 buffer: &LanceBuffer,
2325 ranges: &[Range<u64>],
2326 bytes_per_value: u64,
2327 data_buf_position: u64,
2328 ) -> Vec<Range<u64>> {
2329 ranges
2330 .iter()
2331 .map(|r| {
2332 let start_offset = (r.start * bytes_per_value) as usize;
2333 let end_offset = (r.end * bytes_per_value) as usize;
2334
2335 let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
2336 let start_val =
2337 ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
2338 .next()
2339 .unwrap();
2340
2341 let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
2342 let end_val =
2343 ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
2344 .next()
2345 .unwrap();
2346
2347 (data_buf_position + start_val)..(data_buf_position + end_val)
2348 })
2349 .collect()
2350 }
2351
2352 fn compute_rep_index_ranges(
2354 ranges: &[Range<u64>],
2355 rep_index: &FullZipRepIndexDetails,
2356 ) -> Vec<Range<u64>> {
2357 ranges
2358 .iter()
2359 .flat_map(|r| {
2360 let first_val_start =
2361 rep_index.buf_position + (r.start * rep_index.bytes_per_value);
2362 let first_val_end = first_val_start + rep_index.bytes_per_value;
2363 let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
2364 let last_val_end = last_val_start + rep_index.bytes_per_value;
2365 [first_val_start..first_val_end, last_val_start..last_val_end]
2366 })
2367 .collect()
2368 }
2369
2370 fn schedule_ranges_rep(
2372 &self,
2373 ranges: &[Range<u64>],
2374 io: &Arc<dyn EncodingsIo>,
2375 rep_index: FullZipRepIndexDetails,
2376 ) -> Result<Vec<PageLoadTask>> {
2377 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2378 let data_buf_position = self.data_buf_position;
2379 let priority = self.priority;
2380 let details = self.details.clone();
2381 let bits_per_offset = self.bits_per_offset;
2382
2383 if Self::covers_entire_page(ranges, self.rows_in_page) {
2384 let full_range = self.data_buf_position..(self.data_buf_position + self.data_buf_size);
2385 let page_data = io.submit_single(full_range.clone(), priority);
2386 let load_task = async move {
2387 let page_data = page_data.await?;
2388 let source = FullZipReadSource::PrefetchedPage {
2389 base_offset: full_range.start,
2390 data: LanceBuffer::from_bytes(page_data, 1),
2391 };
2392 let read_ranges = vec![full_range];
2393 let data = source.fetch(&read_ranges, priority).await?;
2394 Self::create_decoder(details, data, num_rows, bits_per_offset)
2395 }
2396 .boxed();
2397 let page_load_task = PageLoadTask {
2398 decoder_fut: load_task,
2399 num_rows,
2400 };
2401 return Ok(vec![page_load_task]);
2402 }
2403
2404 if let Some(cached_state) = &self.cached_state {
2405 let byte_ranges = Self::extract_byte_ranges_from_cached(
2406 &cached_state.rep_index_buffer,
2407 ranges,
2408 rep_index.bytes_per_value,
2409 data_buf_position,
2410 );
2411 let io_future = io.submit_request(byte_ranges, priority);
2412 let page_load_task =
2413 Self::create_page_load_task(io_future, num_rows, details, bits_per_offset);
2414 return Ok(vec![page_load_task]);
2415 }
2416
2417 let rep_ranges = Self::compute_rep_index_ranges(ranges, &rep_index);
2418 let rep_data = io.submit_request(rep_ranges, priority);
2419 let io_clone = io.clone();
2420 let load_task = async move {
2421 let rep_data = rep_data.await?;
2422 let rep_buffer = LanceBuffer::concat(
2423 &rep_data
2424 .into_iter()
2425 .map(|d| LanceBuffer::from_bytes(d, 1))
2426 .collect::<Vec<_>>(),
2427 );
2428 let byte_ranges = Self::extract_byte_ranges_from_pairs(
2429 rep_buffer,
2430 rep_index.bytes_per_value,
2431 data_buf_position,
2432 );
2433 let source = FullZipReadSource::Remote(io_clone);
2434 let data = source.fetch(&byte_ranges, priority).await?;
2435 Self::create_decoder(details, data, num_rows, bits_per_offset)
2436 }
2437 .boxed();
2438 let page_load_task = PageLoadTask {
2439 decoder_fut: load_task,
2440 num_rows,
2441 };
2442 Ok(vec![page_load_task])
2443 }
2444
2445 fn schedule_ranges_simple(
2449 &self,
2450 ranges: &[Range<u64>],
2451 io: &Arc<dyn EncodingsIo>,
2452 ) -> Result<Vec<PageLoadTask>> {
2453 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2455
2456 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2457 unreachable!()
2458 };
2459
2460 let bits_per_value = decompressor.bits_per_value();
2462 assert_eq!(bits_per_value % 8, 0);
2463 let bytes_per_value = bits_per_value / 8;
2464 let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2465 let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
2466 let byte_ranges = ranges
2467 .iter()
2468 .map(|r| {
2469 debug_assert!(r.end <= self.rows_in_page);
2470 let start = self.data_buf_position + r.start * total_bytes_per_value;
2471 let end = self.data_buf_position + r.end * total_bytes_per_value;
2472 start..end
2473 })
2474 .collect::<Vec<_>>();
2475
2476 let io_future = io.submit_request(byte_ranges, self.priority);
2477 let page_load_task = Self::create_page_load_task(
2478 io_future,
2479 num_rows,
2480 self.details.clone(),
2481 self.bits_per_offset,
2482 );
2483 Ok(vec![page_load_task])
2484 }
2485}
2486
/// Cached state for a full-zip page: its complete repetition index.
#[derive(Debug)]
struct FullZipCacheableState {
    /// Raw bytes of the whole repetition index (`rows_in_page + 1` entries).
    rep_index_buffer: LanceBuffer,
}
2493
2494impl DeepSizeOf for FullZipCacheableState {
2495 fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2496 self.rep_index_buffer.len()
2497 }
2498}
2499
2500impl CachedPageData for FullZipCacheableState {
2501 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2502 self
2503 }
2504}
2505
impl StructuralPageScheduler for FullZipScheduler {
    /// Prepares cacheable page state before any ranges are scheduled.
    ///
    /// Caching requires both `enable_cache` and a repetition index on the page. In that
    /// case the entire repetition index is read: `rows_in_page + 1` entries of
    /// `bytes_per_value` bytes, starting at `buf_position`. The result is stored in
    /// `self.cached_state` and returned so the caller can cache it.
    ///
    /// Otherwise no I/O is issued and a `NoCachedPageData` placeholder is returned.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        if self.enable_cache
            && let Some(rep_index) = self.rep_index
        {
            // One entry per row plus one extra entry.
            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
            // `io` does not live for `'a`, so the returned future needs its own handle.
            let io_clone = io.clone();
            return async move {
                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
                let state = Arc::new(FullZipCacheableState {
                    rep_index_buffer: LanceBuffer::from_bytes(rep_index_data[0].clone(), 1),
                });
                // Keep a copy locally as well as returning it for the cache.
                self.cached_state = Some(state.clone());
                Ok(state as Arc<dyn CachedPageData>)
            }
            .boxed();
        }
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    /// Restores state previously produced by `initialize`.
    ///
    /// Entries of any other type (for example `NoCachedPageData`) are ignored, and
    /// `cached_state` is left untouched.
    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
        if let Ok(cached_state) = cache
            .clone()
            .as_arc_any()
            .downcast::<FullZipCacheableState>()
        {
            self.cached_state = Some(cached_state);
        }
    }

    /// Schedules page-relative row `ranges`.
    ///
    /// Dispatches to the repetition-index path when the page has a repetition index,
    /// and to the simple path otherwise.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        if let Some(rep_index) = self.rep_index {
            self.schedule_ranges_rep(ranges, io, rep_index)
        } else {
            self.schedule_ranges_simple(ranges, io)
        }
    }
}
2557
/// Decoder for a full-zip page whose values all have the same width.
///
/// Buffers are sliced lazily as rows are drained. Control words (if any) are left in
/// the slices and parsed later by `FixedFullZipDecodeTask`.
#[derive(Debug)]
struct FixedFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    /// Zipped buffers still to be decoded; the front buffer is the one being read.
    data: VecDeque<LanceBuffer>,
    /// Byte position in the front buffer where the next slice starts.
    offset_in_current: usize,
    /// Width in bytes of a single decoded value.
    bytes_per_value: usize,
    /// Bytes a visible item occupies in the zipped buffer (presumably control word plus
    /// value — TODO confirm).
    total_bytes_per_value: usize,
    /// Total rows in the page, reported by `num_rows()`.
    num_rows: u64,
}
2574
impl FixedFullZipDecoder {
    /// Carves the next slice of at most `num_rows` rows out of the front buffer.
    ///
    /// A slice never spans buffers. If the front buffer runs out first, fewer rows are
    /// returned (see `rows_in_buf`), and the buffer is popped so the next call starts on
    /// the following one.
    ///
    /// How the slice is found depends on repetition:
    /// - With repetition, control words are parsed item by item to find row boundaries.
    /// - Without it, each row is exactly `total_bytes_per_value` bytes, so the slice is
    ///   computed arithmetically.
    ///
    /// The returned block's data is the raw zipped bytes, with control words included if
    /// the page has any.
    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
        debug_assert!(num_rows > 0);
        // Panics if no buffers remain.
        let cur_buf = self.data.front_mut().unwrap();
        let start = self.offset_in_current;
        if self.details.ctrl_word_parser.has_rep() {
            // Rows started in this slice.
            let mut rows_started = 0;
            // Every item in the slice, visible or not.
            let mut num_items = 0;
            while self.offset_in_current < cur_buf.len() {
                let control = self.details.ctrl_word_parser.parse_desc(
                    &cur_buf[self.offset_in_current..],
                    self.details.max_rep,
                    self.details.max_visible_def,
                );
                if control.is_new_row {
                    // The first item of row `num_rows + 1` belongs to the next slice.
                    if rows_started == num_rows {
                        break;
                    }
                    rows_started += 1;
                }
                num_items += 1;
                // Visible items occupy `total_bytes_per_value` bytes; invisible ones
                // occupy only a control word.
                if control.is_visible {
                    self.offset_in_current += self.total_bytes_per_value;
                } else {
                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
                }
            }

            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
            // Drop the buffer once it is fully consumed.
            if self.offset_in_current == cur_buf.len() {
                self.data.pop_front();
                self.offset_in_current = 0;
            }

            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: num_items,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_started,
            }
        } else {
            let cur_buf = self.data.front_mut().unwrap();
            let bytes_avail = cur_buf.len() - self.offset_in_current;
            let offset_in_cur = self.offset_in_current;

            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
            let mut rows_taken = num_rows;
            let task_slice = if bytes_needed >= bytes_avail {
                // Take everything left in this buffer (possibly fewer rows than asked for)
                // and remove the buffer from the queue.
                self.offset_in_current = 0;
                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
                self.data
                    .pop_front()
                    .unwrap()
                    .slice_with_length(offset_in_cur, bytes_avail)
            } else {
                self.offset_in_current += bytes_needed;
                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
            };
            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: rows_taken,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_taken,
            }
        }
    }
}
2654
2655impl StructuralPageDecoder for FixedFullZipDecoder {
2656 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2657 let mut task_data = Vec::with_capacity(self.data.len());
2658 let mut remaining = num_rows;
2659 while remaining > 0 {
2660 let task_item = self.slice_next_task(remaining);
2661 remaining -= task_item.rows_in_buf;
2662 task_data.push(task_item);
2663 }
2664 Ok(Box::new(FixedFullZipDecodeTask {
2665 details: self.details.clone(),
2666 data: task_data,
2667 bytes_per_value: self.bytes_per_value,
2668 num_rows: num_rows as usize,
2669 }))
2670 }
2671
2672 fn num_rows(&self) -> u64 {
2673 self.num_rows
2674 }
2675}
2676
/// Decoder for a full-zip page with variable-width values.
///
/// The whole page is unzipped when the decoder is created; `drain` then only slices
/// the resulting buffers. Each per-row index vector has one entry per row plus a
/// trailing end entry, so row `i` spans `starts[i]..starts[i + 1]`.
#[derive(Debug)]
struct VariableFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    /// All value bytes of the page, back to back.
    data: LanceBuffer,
    /// One `bits_per_offset`-wide offset into `data` per visible item, plus an end offset.
    offsets: LanceBuffer,
    /// Repetition levels of the page; `drain` treats an empty buffer as "no levels".
    rep: ScalarBuffer<u16>,
    /// Definition levels of the page; `drain` treats an empty buffer as "no levels".
    def: ScalarBuffer<u16>,
    /// Per-row start index into `rep` / `def`.
    repdef_starts: Vec<usize>,
    /// Per-row start byte in `data`.
    data_starts: Vec<usize>,
    /// Per-row start byte in `offsets`.
    offset_starts: Vec<usize>,
    /// Number of visible items preceding each row.
    visible_item_counts: Vec<u64>,
    /// Width of each offset in `offsets`: 32 or 64.
    bits_per_offset: u8,
    /// Index of the next row to drain.
    current_idx: usize,
    num_rows: u64,
}
2697
2698impl VariableFullZipDecoder {
2699 fn new(
2700 details: Arc<FullZipDecodeDetails>,
2701 data: VecDeque<LanceBuffer>,
2702 num_rows: u64,
2703 in_bits_per_length: u8,
2704 out_bits_per_offset: u8,
2705 ) -> Self {
2706 let decompressor = match details.value_decompressor {
2707 PerValueDecompressor::Variable(ref d) => d.clone(),
2708 _ => unreachable!(),
2709 };
2710
2711 assert_eq!(in_bits_per_length % 8, 0);
2712 assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2713
2714 let mut decoder = Self {
2715 details,
2716 decompressor,
2717 data: LanceBuffer::empty(),
2718 offsets: LanceBuffer::empty(),
2719 rep: LanceBuffer::empty().borrow_to_typed_slice(),
2720 def: LanceBuffer::empty().borrow_to_typed_slice(),
2721 bits_per_offset: out_bits_per_offset,
2722 repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2723 data_starts: Vec::with_capacity(num_rows as usize + 1),
2724 offset_starts: Vec::with_capacity(num_rows as usize + 1),
2725 visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2726 current_idx: 0,
2727 num_rows,
2728 };
2729
2730 decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2751
2752 decoder
2753 }
2754
2755 fn slice_batch_data_and_rebase_offsets_typed<T>(
2756 data: &LanceBuffer,
2757 offsets: &LanceBuffer,
2758 ) -> Result<(LanceBuffer, LanceBuffer)>
2759 where
2760 T: arrow_buffer::ArrowNativeType
2761 + Copy
2762 + PartialOrd
2763 + std::ops::Sub<Output = T>
2764 + std::fmt::Display
2765 + TryInto<usize>,
2766 {
2767 let offsets_slice = offsets.borrow_to_typed_slice::<T>();
2768 let offsets_slice = offsets_slice.as_ref();
2769 if offsets_slice.is_empty() {
2770 return Err(Error::internal(
2771 "Variable offsets cannot be empty".to_string(),
2772 ));
2773 }
2774
2775 let base = offsets_slice[0];
2776 let end = *offsets_slice.last().unwrap();
2777 if end < base {
2778 return Err(Error::internal(format!(
2779 "Invalid variable offsets: end ({end}) is less than base ({base})"
2780 )));
2781 }
2782
2783 let data_start = base.try_into().map_err(|_| {
2784 Error::internal(format!("Variable offset ({base}) does not fit into usize"))
2785 })?;
2786 let data_end = end.try_into().map_err(|_| {
2787 Error::internal(format!("Variable offset ({end}) does not fit into usize"))
2788 })?;
2789 if data_end > data.len() {
2790 return Err(Error::internal(format!(
2791 "Invalid variable offsets: end ({data_end}) exceeds data len ({})",
2792 data.len()
2793 )));
2794 }
2795
2796 let mut rebased_offsets = Vec::with_capacity(offsets_slice.len());
2797 for &offset in offsets_slice {
2798 if offset < base {
2799 return Err(Error::internal(format!(
2800 "Invalid variable offsets: offset ({offset}) is less than base ({base})"
2801 )));
2802 }
2803 rebased_offsets.push(offset - base);
2804 }
2805
2806 let sliced_data = data.slice_with_length(data_start, data_end - data_start);
2807 let sliced_data = LanceBuffer::copy_slice(&sliced_data);
2809 let rebased_offsets = LanceBuffer::reinterpret_vec(rebased_offsets);
2810 Ok((sliced_data, rebased_offsets))
2811 }
2812
2813 fn slice_batch_data_and_rebase_offsets(
2814 data: &LanceBuffer,
2815 offsets: &LanceBuffer,
2816 bits_per_offset: u8,
2817 ) -> Result<(LanceBuffer, LanceBuffer)> {
2818 match bits_per_offset {
2819 32 => Self::slice_batch_data_and_rebase_offsets_typed::<u32>(data, offsets),
2820 64 => Self::slice_batch_data_and_rebase_offsets_typed::<u64>(data, offsets),
2821 _ => Err(Error::internal(format!(
2822 "Unsupported bits_per_offset={bits_per_offset}"
2823 ))),
2824 }
2825 }
2826
2827 unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2828 match bits_per_offset {
2829 8 => *data.get_unchecked(0) as u64,
2830 16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2831 32 => u32::from_le_bytes([
2832 *data.get_unchecked(0),
2833 *data.get_unchecked(1),
2834 *data.get_unchecked(2),
2835 *data.get_unchecked(3),
2836 ]) as u64,
2837 64 => u64::from_le_bytes([
2838 *data.get_unchecked(0),
2839 *data.get_unchecked(1),
2840 *data.get_unchecked(2),
2841 *data.get_unchecked(3),
2842 *data.get_unchecked(4),
2843 *data.get_unchecked(5),
2844 *data.get_unchecked(6),
2845 *data.get_unchecked(7),
2846 ]),
2847 _ => unreachable!(),
2848 }
2849 }
2850
2851 fn unzip(
2852 &mut self,
2853 data: VecDeque<LanceBuffer>,
2854 in_bits_per_length: u8,
2855 out_bits_per_offset: u8,
2856 num_rows: u64,
2857 ) {
2858 let mut rep = Vec::with_capacity(num_rows as usize);
2860 let mut def = Vec::with_capacity(num_rows as usize);
2861 let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2862
2863 let bytes_per_offset = out_bits_per_offset as usize / 8;
2866 let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2867 let mut offsets_data = Vec::with_capacity(bytes_offsets);
2868
2869 let bytes_per_length = in_bits_per_length as usize / 8;
2870 let bytes_lengths = bytes_per_length * num_rows as usize;
2871
2872 let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2873 let mut unzipped_data =
2876 Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2877
2878 let mut current_offset = 0_u64;
2879 let mut visible_item_count = 0_u64;
2880 for databuf in data.into_iter() {
2881 let mut databuf = databuf.as_ref();
2882 while !databuf.is_empty() {
2883 let data_start = unzipped_data.len();
2884 let offset_start = offsets_data.len();
2885 let repdef_start = rep.len().max(def.len());
2888 let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2890 databuf,
2891 self.details.max_rep,
2892 self.details.max_visible_def,
2893 );
2894 self.details
2895 .ctrl_word_parser
2896 .parse(databuf, &mut rep, &mut def);
2897 databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2898
2899 if ctrl_desc.is_new_row {
2900 self.repdef_starts.push(repdef_start);
2901 self.data_starts.push(data_start);
2902 self.offset_starts.push(offset_start);
2903 self.visible_item_counts.push(visible_item_count);
2904 }
2905 if ctrl_desc.is_visible {
2906 visible_item_count += 1;
2907 if ctrl_desc.is_valid_item {
2908 debug_assert!(databuf.len() >= bytes_per_length);
2910 let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2911 match out_bits_per_offset {
2912 32 => offsets_data
2913 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2914 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2915 _ => unreachable!(),
2916 };
2917 databuf = &databuf[bytes_per_offset..];
2918 unzipped_data.extend_from_slice(&databuf[..length as usize]);
2919 databuf = &databuf[length as usize..];
2920 current_offset += length;
2921 } else {
2922 match out_bits_per_offset {
2924 32 => offsets_data
2925 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2926 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2927 _ => unreachable!(),
2928 }
2929 }
2930 }
2931 }
2932 }
2933 self.repdef_starts.push(rep.len().max(def.len()));
2934 self.data_starts.push(unzipped_data.len());
2935 self.offset_starts.push(offsets_data.len());
2936 self.visible_item_counts.push(visible_item_count);
2937 match out_bits_per_offset {
2938 32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2939 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2940 _ => unreachable!(),
2941 };
2942 self.rep = ScalarBuffer::from(rep);
2943 self.def = ScalarBuffer::from(def);
2944 self.data = LanceBuffer::from(unzipped_data);
2945 self.offsets = LanceBuffer::from(offsets_data);
2946 }
2947}
2948
2949impl StructuralPageDecoder for VariableFullZipDecoder {
2950 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2951 let start = self.current_idx;
2952 let end = start + num_rows as usize;
2953
2954 let offset_start = self.offset_starts[start];
2955 let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2956 let offsets = self
2957 .offsets
2958 .slice_with_length(offset_start, offset_end - offset_start);
2959 let (data, offsets) =
2961 Self::slice_batch_data_and_rebase_offsets(&self.data, &offsets, self.bits_per_offset)?;
2962
2963 let repdef_start = self.repdef_starts[start];
2964 let repdef_end = self.repdef_starts[end];
2965 let rep = if self.rep.is_empty() {
2966 self.rep.clone()
2967 } else {
2968 self.rep.slice(repdef_start, repdef_end - repdef_start)
2969 };
2970 let def = if self.def.is_empty() {
2971 self.def.clone()
2972 } else {
2973 self.def.slice(repdef_start, repdef_end - repdef_start)
2974 };
2975
2976 let visible_item_counts_start = self.visible_item_counts[start];
2977 let visible_item_counts_end = self.visible_item_counts[end];
2978 let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2979
2980 self.current_idx += num_rows as usize;
2981
2982 Ok(Box::new(VariableFullZipDecodeTask {
2983 details: self.details.clone(),
2984 decompressor: self.decompressor.clone(),
2985 data,
2986 offsets,
2987 bits_per_offset: self.bits_per_offset,
2988 num_visible_items,
2989 rep,
2990 def,
2991 }))
2992 }
2993
2994 fn num_rows(&self) -> u64 {
2995 self.num_rows
2996 }
2997}
2998
/// A batch of rows from a variable-width full-zip page, ready to be decompressed.
#[derive(Debug)]
struct VariableFullZipDecodeTask {
    details: Arc<FullZipDecodeDetails>,
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    /// Value bytes for this batch only.
    data: LanceBuffer,
    /// Offsets into `data`, rebased so the first one is zero.
    offsets: LanceBuffer,
    /// Width of each entry in `offsets` (32 or 64).
    bits_per_offset: u8,
    /// Number of visible items in the batch.
    num_visible_items: u64,
    /// Repetition levels for the batch; empty means "no repetition levels".
    rep: ScalarBuffer<u16>,
    /// Definition levels for the batch; empty means "no definition levels".
    def: ScalarBuffer<u16>,
}
3010
3011impl DecodePageTask for VariableFullZipDecodeTask {
3012 fn decode(self: Box<Self>) -> Result<DecodedPage> {
3013 let block = VariableWidthBlock {
3014 data: self.data,
3015 offsets: self.offsets,
3016 bits_per_offset: self.bits_per_offset,
3017 num_values: self.num_visible_items,
3018 block_info: BlockInfo::new(),
3019 };
3020 let decomopressed = self.decompressor.decompress(block)?;
3021 let rep = if self.rep.is_empty() {
3022 None
3023 } else {
3024 Some(self.rep.to_vec())
3025 };
3026 let def = if self.def.is_empty() {
3027 None
3028 } else {
3029 Some(self.def.to_vec())
3030 };
3031 let unraveler = RepDefUnraveler::new(
3032 rep,
3033 def,
3034 self.details.def_meaning.clone(),
3035 self.num_visible_items,
3036 );
3037 Ok(DecodedPage {
3038 data: decomopressed,
3039 repdef: unraveler,
3040 })
3041 }
3042}
3043
/// One slice of zipped data, taken from a single buffer by `FixedFullZipDecoder`.
#[derive(Debug)]
struct FullZipDecodeTaskItem {
    /// The raw slice, with control words included if the page has any.
    data: PerValueDataBlock,
    /// Number of rows that start in this slice.
    rows_in_buf: u64,
}
3049
/// Work item that turns the slices gathered by `FixedFullZipDecoder::drain` into a
/// decoded page.
#[derive(Debug)]
struct FixedFullZipDecodeTask {
    details: Arc<FullZipDecodeDetails>,
    /// Slices to decode, in row order.
    data: Vec<FullZipDecodeTaskItem>,
    /// Rows requested from the decoder for this task.
    num_rows: usize,
    /// Width in bytes of a single decoded value.
    bytes_per_value: usize,
}
3059
impl DecodePageTask for FixedFullZipDecodeTask {
    /// Decodes the gathered slices into one fixed-width data block plus rep/def levels.
    ///
    /// - Without control words (`bytes_per_word() == 0`), each slice holds exactly
    ///   `rows_in_buf` values and is decompressed as-is; no levels are produced.
    /// - Otherwise, each item's control word is parsed into rep/def levels. The value
    ///   bytes of visible items are copied into one buffer, which is then decompressed.
    ///   Invisible items contribute a control word but no value bytes.
    ///
    /// # Errors
    ///
    /// Propagates decompression errors.
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // Capacity hint for the output builder: twice the bytes of all input slices.
        let estimated_size_bytes = self
            .data
            .iter()
            .map(|task_item| task_item.data.data_size() as usize)
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
            // No control words: the slices contain only values, one per row.
            for task_item in self.data.into_iter() {
                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
                    unreachable!()
                };
                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
                else {
                    unreachable!()
                };
                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
            }

            let unraveler = RepDefUnraveler::new(
                None,
                None,
                self.details.def_meaning.clone(),
                self.num_rows as u64,
            );

            Ok(DecodedPage {
                data: data_builder.finish(),
                repdef: unraveler,
            })
        } else {
            // Capacity hints only; there can be more items than rows.
            let mut rep = Vec::with_capacity(self.num_rows);
            let mut def = Vec::with_capacity(self.num_rows);

            for task_item in self.data.into_iter() {
                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
                    unreachable!()
                };
                let mut buf_slice = fixed_data.data.as_ref();
                // Every item in the slice, visible or not.
                let num_values = fixed_data.num_values as usize;
                // Upper bound on value bytes: the slice minus one control word per item.
                let mut values = Vec::with_capacity(
                    fixed_data.data.len()
                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
                );
                let mut visible_items = 0;
                for _ in 0..num_values {
                    // Parse this item's control word into `rep` / `def`.
                    self.details
                        .ctrl_word_parser
                        .parse(buf_slice, &mut rep, &mut def);
                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];

                    // The item is visible when the latest def level (if any) is at most
                    // `max_visible_def`. Only visible items are followed by value bytes.
                    let is_visible = def
                        .last()
                        .map(|d| *d <= self.details.max_visible_def)
                        .unwrap_or(true);
                    if is_visible {
                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
                        buf_slice = &buf_slice[self.bytes_per_value..];
                        visible_items += 1;
                    }
                }

                let values_buf = LanceBuffer::from(values);
                let fixed_data = FixedWidthDataBlock {
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    block_info: BlockInfo::new(),
                    data: values_buf,
                    num_values: visible_items,
                };
                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
                else {
                    unreachable!()
                };
                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
                data_builder.append(&decompressed, 0..visible_items);
            }

            // An empty level vector means the page has no levels of that kind.
            let repetition = if rep.is_empty() { None } else { Some(rep) };
            let definition = if def.is_empty() { None } else { Some(def) };

            let unraveler = RepDefUnraveler::new(
                repetition,
                definition,
                self.details.def_meaning.clone(),
                self.num_rows as u64,
            );
            let data = data_builder.finish();

            Ok(DecodedPage {
                data,
                repdef: unraveler,
            })
        }
    }
}
3171
/// Incremental state for scheduling a set of row ranges across a column's pages.
#[derive(Debug)]
struct StructuralPrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a StructuralPrimitiveFieldScheduler,
    /// Requested row ranges in column-wide row numbers (assumed sorted and
    /// non-overlapping — TODO confirm with callers).
    ranges: Vec<Range<u64>>,
    /// Index of the next page to consider.
    page_idx: usize,
    /// Index into `ranges` of the range currently being scheduled.
    range_idx: usize,
    /// Column-wide row number of the first row of page `page_idx`.
    global_row_offset: u64,
}
3180
3181impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
3182 pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
3183 Self {
3184 scheduler,
3185 ranges,
3186 page_idx: 0,
3187 range_idx: 0,
3188 global_row_offset: 0,
3189 }
3190 }
3191}
3192
impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
    /// Schedules the requested rows that fall in the next page touched by `self.ranges`.
    ///
    /// Each call handles exactly one page:
    /// 1. Pages lying entirely before the current range are skipped.
    /// 2. Every range (or range prefix) overlapping the page is converted to
    ///    page-relative rows and passed to that page's scheduler.
    ///
    /// A range that continues past the page stays current, so the next call resumes it
    /// on the following page. Returns an empty vector once every range has been
    /// scheduled.
    ///
    /// # Errors
    ///
    /// Propagates errors from the page scheduler's `schedule_ranges`.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
        if self.range_idx >= self.ranges.len() {
            return Ok(Vec::new());
        }
        let mut range = self.ranges[self.range_idx].clone();
        // Only used in trace output below.
        let priority = range.start;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range, cur_page.num_rows
        );
        // Skip pages that end at or before the start of the current range.
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Gather every range that overlaps this page, as page-relative row ranges.
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            // A range that began in an earlier page resumes at this page's first row.
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // This range is finished; the next one may also start in this page.
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // The range spills into the next page; the next call resumes it.
                break;
            }
        }

        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
            ranges_in_page.len(),
            cur_page.num_rows,
            priority,
            self.scheduler.column_index,
            cur_page.page_index,
        );

        // This page is done either way; the next call starts from the following page.
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let page_decoders = cur_page
            .scheduler
            .schedule_ranges(&ranges_in_page, context.io())?;

        // Each load task becomes one scan line. The line carries a future that resolves
        // to the page decoder, tagged with the current path.
        let cur_path = context.current_path();
        page_decoders
            .into_iter()
            .map(|page_load_task| {
                let cur_path = cur_path.clone();
                let page_decoder = page_load_task.decoder_fut;
                let unloaded_page = async move {
                    let page_decoder = page_decoder.await?;
                    Ok(LoadedPageShard {
                        decoder: page_decoder,
                        path: cur_path,
                    })
                }
                .boxed();
                Ok(ScheduledScanLine {
                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
                    rows_scheduled: page_load_task.num_rows,
                })
            })
            .collect::<Result<Vec<_>>>()
    }
}
3277
/// A page scheduler together with the page's position and size within the column.
#[derive(Debug)]
struct PageInfoAndScheduler {
    /// Index of the page within the column.
    page_index: usize,
    /// Number of rows in the page.
    num_rows: u64,
    scheduler: Box<dyn StructuralPageScheduler>,
}
3284
/// Schedules reads for a primitive column stored with structural encodings.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldScheduler {
    /// One scheduler per page, in page order.
    page_schedulers: Vec<PageInfoAndScheduler>,
    /// Column index; used in trace output and as part of the cache key.
    column_index: u32,
    /// `Debug` rendering of the target data type; the other part of the cache key.
    view_tag: String,
}
3300
impl StructuralPrimitiveFieldScheduler {
    /// Creates a scheduler with one page scheduler per page described by `column_info`.
    ///
    /// `cache_repetition_index` is forwarded to every page. Within this impl it is only
    /// applied to full-zip layouts, including ones nested inside blob layouts.
    ///
    /// `view_tag` is the `Debug` form of `target_field`'s data type and is part of the
    /// cache key. Reading the same column as different types therefore caches separately.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while building a page scheduler.
    pub fn try_new(
        column_info: &ColumnInfo,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<Self> {
        let page_schedulers = column_info
            .page_infos
            .iter()
            .enumerate()
            .map(|(page_index, page_info)| {
                Self::page_info_to_scheduler(
                    page_info,
                    page_index,
                    decompressors,
                    cache_repetition_index,
                    target_field,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            page_schedulers,
            column_index: column_info.index,
            view_tag: format!("{:?}", target_field.data_type()),
        })
    }

    /// Builds the page scheduler that matches `page_layout`.
    ///
    /// Blob layouts recurse into their inner layout and wrap the result.
    ///
    /// # Errors
    ///
    /// Returns an error if the layout (or a blob's inner layout) is unset, or if a
    /// scheduler or decompressor cannot be created.
    fn page_layout_to_scheduler(
        page_info: &PageInfo,
        page_layout: &PageLayout,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<Box<dyn StructuralPageScheduler>> {
        use pb21::page_layout::Layout;
        Ok(match page_layout.layout.as_ref().expect_ok()? {
            Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
                &page_info.buffer_offsets_and_sizes,
                page_info.priority,
                mini_block.num_items,
                mini_block,
                decompressors,
            )?),
            Layout::FullZipLayout(full_zip) => {
                let mut scheduler = FullZipScheduler::try_new(
                    &page_info.buffer_offsets_and_sizes,
                    page_info.priority,
                    page_info.num_rows,
                    full_zip,
                    decompressors,
                )?;
                // Repetition-index caching is opt-in per read.
                scheduler.enable_cache = cache_repetition_index;
                Box::new(scheduler)
            }
            Layout::ConstantLayout(constant_layout) => {
                let def_meaning = constant_layout
                    .layers
                    .iter()
                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                    .collect::<Vec<_>>();
                // A scalar value is available when it is inlined in the layout, or when
                // the page has 1 or 3 buffers.
                // NOTE(review): the meaning of the 1- and 3-buffer cases is assumed to
                // be "scalar stored in a page buffer" — confirm against the writer.
                let has_scalar_value = constant_layout.inline_value.is_some()
                    || page_info.buffer_offsets_and_sizes.len() == 1
                    || page_info.buffer_offsets_and_sizes.len() == 3;
                if has_scalar_value {
                    Box::new(constant::ConstantPageScheduler::try_new(
                        page_info.buffer_offsets_and_sizes.clone(),
                        constant_layout.inline_value.clone(),
                        target_field.data_type(),
                        def_meaning.into(),
                    )?) as Box<dyn StructuralPageScheduler>
                } else if def_meaning.len() == 1
                    && def_meaning[0] == DefinitionInterpretation::NullableItem
                {
                    // No scalar and a single nullable-item layer: the simple all-null case.
                    Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
                } else {
                    // No scalar with more complex layering. Rep/def levels may be stored
                    // in the page, optionally compressed.
                    let rep_decompressor = constant_layout
                        .rep_compression
                        .as_ref()
                        .map(|encoding| decompressors.create_block_decompressor(encoding))
                        .transpose()?
                        .map(Arc::from);

                    let def_decompressor = constant_layout
                        .def_compression
                        .as_ref()
                        .map(|encoding| decompressors.create_block_decompressor(encoding))
                        .transpose()?
                        .map(Arc::from);

                    Box::new(ComplexAllNullScheduler::new(
                        page_info.buffer_offsets_and_sizes.clone(),
                        def_meaning.into(),
                        rep_decompressor,
                        def_decompressor,
                        constant_layout.num_rep_values,
                        constant_layout.num_def_values,
                    )) as Box<dyn StructuralPageScheduler>
                }
            }
            Layout::BlobLayout(blob) => {
                let inner_scheduler = Self::page_layout_to_scheduler(
                    page_info,
                    blob.inner_layout.as_ref().expect_ok()?.as_ref(),
                    decompressors,
                    cache_repetition_index,
                    target_field,
                )?;
                let def_meaning = blob
                    .layers
                    .iter()
                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                    .collect::<Vec<_>>();
                // Struct-typed targets get the blob *description* scheduler; all other
                // targets get the blob scheduler.
                if matches!(target_field.data_type(), DataType::Struct(_)) {
                    Box::new(BlobDescriptionPageScheduler::new(
                        inner_scheduler,
                        def_meaning.into(),
                    ))
                } else {
                    Box::new(BlobPageScheduler::new(
                        inner_scheduler,
                        page_info.priority,
                        page_info.num_rows,
                        def_meaning.into(),
                    ))
                }
            }
        })
    }

    /// Builds the scheduler for one page and records the page's index and row count.
    fn page_info_to_scheduler(
        page_info: &PageInfo,
        page_index: usize,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<PageInfoAndScheduler> {
        let page_layout = page_info.encoding.as_structural();
        let scheduler = Self::page_layout_to_scheduler(
            page_info,
            page_layout,
            decompressors,
            cache_repetition_index,
            target_field,
        )?;
        Ok(PageInfoAndScheduler {
            page_index,
            num_rows: page_info.num_rows,
            scheduler,
        })
    }
}
3455
/// Per-page state that a page scheduler can hand to the cache and later get back.
pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
    /// Converts to `Arc<dyn Any>` so the state can be downcast back to its concrete type.
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
}
3459
3460pub struct NoCachedPageData;
3461
3462impl DeepSizeOf for NoCachedPageData {
3463 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
3464 0
3465 }
3466}
3467impl CachedPageData for NoCachedPageData {
3468 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
3469 self
3470 }
3471}
3472
/// Cached state for a whole column: one entry per page, in page order.
pub struct CachedFieldData {
    pages: Vec<Arc<dyn CachedPageData>>,
}
3476
3477impl DeepSizeOf for CachedFieldData {
3478 fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
3479 self.pages.deep_size_of_children(ctx)
3480 }
3481}
3482
/// Cache key for a column's `CachedFieldData`.
///
/// Includes the view tag, so the same column read as two different types is cached
/// under two keys.
#[derive(Debug, Clone)]
pub struct FieldDataCacheKey {
    pub column_index: u32,
    /// Tag distinguishing views of the same column. `StructuralPrimitiveFieldScheduler`
    /// uses the target data type's `Debug` form.
    pub view_tag: String,
}
3497
3498impl CacheKey for FieldDataCacheKey {
3499 type ValueType = CachedFieldData;
3500
3501 fn key(&self) -> std::borrow::Cow<'_, str> {
3502 format!("{}:{}", self.column_index, self.view_tag).into()
3503 }
3504
3505 fn type_name() -> &'static str {
3506 "FieldData"
3507 }
3508}
3509
3510impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3511 fn initialize<'a>(
3512 &'a mut self,
3513 _filter: &'a FilterExpression,
3514 context: &'a SchedulerContext,
3515 ) -> BoxFuture<'a, Result<()>> {
3516 let cache_key = FieldDataCacheKey {
3517 column_index: self.column_index,
3518 view_tag: self.view_tag.clone(),
3519 };
3520 let cache = context.cache().clone();
3521
3522 async move {
3523 if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3524 self.page_schedulers
3525 .iter_mut()
3526 .zip(cached_data.pages.iter())
3527 .for_each(|(page_scheduler, cached_data)| {
3528 page_scheduler.scheduler.load(cached_data);
3529 });
3530 return Ok(());
3531 }
3532
3533 let page_data = self
3534 .page_schedulers
3535 .iter_mut()
3536 .map(|s| s.scheduler.initialize(context.io()))
3537 .collect::<FuturesOrdered<_>>();
3538
3539 let page_data = page_data.try_collect::<Vec<_>>().await?;
3540 let cached_data = Arc::new(CachedFieldData { pages: page_data });
3541 cache.insert_with_key(&cache_key, cached_data).await;
3542 Ok(())
3543 }
3544 .boxed()
3545 }
3546
3547 fn schedule_ranges<'a>(
3548 &'a self,
3549 ranges: &[Range<u64>],
3550 _filter: &FilterExpression,
3551 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3552 let ranges = ranges.to_vec();
3553 Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3554 self, ranges,
3555 )))
3556 }
3557}
3558
/// Decodes several page tasks and stitches their output into a single Arrow array.
#[derive(Debug)]
pub struct StructuralCompositeDecodeArrayTask {
    /// Page-level decode tasks, in row order.
    tasks: Vec<Box<dyn DecodePageTask>>,
    /// Forwarded to `into_arrow`; presumably enables validation of the produced data.
    should_validate: bool,
    /// Arrow type of the output array.
    data_type: DataType,
}
3567
3568impl StructuralCompositeDecodeArrayTask {
3569 fn restore_validity(
3570 array: Arc<dyn Array>,
3571 unraveler: &mut CompositeRepDefUnraveler,
3572 ) -> Arc<dyn Array> {
3573 let validity = unraveler.unravel_validity(array.len());
3574 let Some(validity) = validity else {
3575 return array;
3576 };
3577 if array.data_type() == &DataType::Null {
3578 return array;
3580 }
3581 assert_eq!(validity.len(), array.len());
3582 make_array(unsafe {
3585 array
3586 .to_data()
3587 .into_builder()
3588 .nulls(Some(validity))
3589 .build_unchecked()
3590 })
3591 }
3592}
3593
3594impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3595 fn decode(self: Box<Self>) -> Result<DecodedArray> {
3596 let mut arrays = Vec::with_capacity(self.tasks.len());
3597 let mut unravelers = Vec::with_capacity(self.tasks.len());
3598 let mut data_size = 0u64;
3599 for task in self.tasks {
3600 let decoded = task.decode()?;
3601 data_size += decoded.data.data_size();
3602 unravelers.push(decoded.repdef);
3603
3604 let array = make_array(
3605 decoded
3606 .data
3607 .into_arrow(self.data_type.clone(), self.should_validate)?,
3608 );
3609
3610 arrays.push(array);
3611 }
3612 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3613 let array = arrow_select::concat::concat(&array_refs)?;
3614 let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3615
3616 let array = Self::restore_validity(array, &mut repdef);
3617
3618 Ok(DecodedArray {
3619 array,
3620 repdef,
3621 data_size,
3622 })
3623 }
3624}
3625
/// Field decoder for a primitive column: queues loaded pages and drains rows from them
/// in order.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldDecoder {
    field: Arc<ArrowField>,
    /// Loaded page decoders; the front one is currently being drained.
    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
    should_validate: bool,
    /// Rows already drained from the front page decoder.
    rows_drained_in_current: u64,
}
3633
3634impl StructuralPrimitiveFieldDecoder {
3635 pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3636 Self {
3637 field: field.clone(),
3638 page_decoders: VecDeque::new(),
3639 should_validate,
3640 rows_drained_in_current: 0,
3641 }
3642 }
3643}
3644
3645impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3646 fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3647 assert!(child.path.is_empty());
3648 self.page_decoders.push_back(child.decoder);
3649 Ok(())
3650 }
3651
3652 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3653 let mut remaining = num_rows;
3654 let mut tasks = Vec::new();
3655 while remaining > 0 {
3656 let cur_page = self.page_decoders.front_mut().unwrap();
3657 let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3658 let to_take = num_in_page.min(remaining);
3659
3660 let task = cur_page.drain(to_take)?;
3661 tasks.push(task);
3662
3663 if to_take == num_in_page {
3664 self.page_decoders.pop_front();
3665 self.rows_drained_in_current = 0;
3666 } else {
3667 self.rows_drained_in_current += to_take;
3668 }
3669
3670 remaining -= to_take;
3671 }
3672 Ok(Box::new(StructuralCompositeDecodeArrayTask {
3673 tasks,
3674 should_validate: self.should_validate,
3675 data_type: self.field.data_type().clone(),
3676 }))
3677 }
3678
3679 fn data_type(&self) -> &DataType {
3680 self.field.data_type()
3681 }
3682}
3683
/// Buffers of a page serialized with the full-zip layout (presumably produced by the
/// full-zip encoding path of `PrimitiveStructuralEncoder` — TODO confirm).
struct SerializedFullZip {
    /// The zipped values buffer.
    values: LanceBuffer,
    /// Repetition index buffer, if one was written.
    repetition_index: Option<LanceBuffer>,
}
3691
/// Alignment used for mini-block data.
/// NOTE(review): presumably a byte alignment — confirm at the use sites.
const MINIBLOCK_ALIGNMENT: usize = 8;
3712
/// Structural encoder for a single primitive column.
pub struct PrimitiveStructuralEncoder {
    /// Buffers incoming arrays before encoding. It is sized from
    /// `cache_bytes_per_column`; the flush policy lives in `AccumulationQueue`.
    accumulation_queue: AccumulationQueue,

    /// Copied from `EncodingOptions::keep_original_array`.
    keep_original_array: bool,
    /// Copied from `EncodingOptions::support_large_chunk()`.
    support_large_chunk: bool,
    accumulated_repdefs: Vec<RepDefBuilder>,
    compression_strategy: Arc<dyn CompressionStrategy>,
    column_index: u32,
    field: Field,
    /// Per-field encoding hints (for example `STRUCTURAL_ENCODING_META_KEY`, which
    /// `prefers_miniblock` consults).
    encoding_metadata: Arc<HashMap<String, String>>,
    version: LanceFileVersion,
}
3753
/// Compressed rep or def levels belonging to a single miniblock chunk.
struct CompressedLevelsChunk {
    /// Compressed level bytes for this chunk.
    data: LanceBuffer,
    /// Number of (uncompressed) levels in this chunk.
    num_levels: u16,
}
3758
/// Rep or def levels for a miniblock page, compressed chunk by chunk.
struct CompressedLevels {
    /// One entry per miniblock chunk, in chunk order.
    data: Vec<CompressedLevelsChunk>,
    /// Description of the compressor used for every chunk.
    compression: CompressiveEncoding,
    /// Repetition index as `u64` pairs `(rows completed in chunk, trailing levels of an
    /// unfinished row)`; only built when `max_rep > 0`, otherwise `None`.
    rep_index: Option<LanceBuffer>,
}
3764
/// Serialized miniblock page produced by `serialize_miniblocks`.
struct SerializedMiniBlockPage {
    /// Number of value buffers per chunk (rep/def level buffers are not counted).
    num_buffers: u64,
    /// All chunks concatenated; every chunk starts on a `MINIBLOCK_ALIGNMENT` boundary.
    data: LanceBuffer,
    /// One size/log-num-values word per chunk (16-bit, or 32-bit with large chunks).
    metadata: LanceBuffer,
}
3770
/// Dictionary-encoding limits returned by `should_dictionary_encode`.
#[derive(Debug, Clone, Copy)]
struct DictEncodingBudget {
    /// Presumably the maximum number of distinct dictionary entries allowed (usage not
    /// visible here — confirm).
    max_dict_entries: u32,
    /// Presumably the maximum encoded size in bytes before dictionary encoding is abandoned
    /// (usage not visible here — confirm).
    max_encoded_size: usize,
}
3776
3777impl PrimitiveStructuralEncoder {
3778 pub fn try_new(
3779 options: &EncodingOptions,
3780 compression_strategy: Arc<dyn CompressionStrategy>,
3781 column_index: u32,
3782 field: Field,
3783 encoding_metadata: Arc<HashMap<String, String>>,
3784 ) -> Result<Self> {
3785 Ok(Self {
3786 accumulation_queue: AccumulationQueue::new(
3787 options.cache_bytes_per_column,
3788 column_index,
3789 options.keep_original_array,
3790 ),
3791 support_large_chunk: options.support_large_chunk(),
3792 keep_original_array: options.keep_original_array,
3793 accumulated_repdefs: Vec::new(),
3794 column_index,
3795 compression_strategy,
3796 field,
3797 encoding_metadata,
3798 version: options.version,
3799 })
3800 }
3801
3802 fn is_narrow(data_block: &DataBlock) -> bool {
3810 const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3811
3812 if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3813 let max_len_array = max_len_array
3814 .as_any()
3815 .downcast_ref::<PrimitiveArray<UInt64Type>>()
3816 .unwrap();
3817 if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3818 return true;
3819 }
3820 }
3821 false
3822 }
3823
3824 fn prefers_miniblock(
3825 data_block: &DataBlock,
3826 encoding_metadata: &HashMap<String, String>,
3827 ) -> bool {
3828 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3830 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3831 }
3832 Self::is_narrow(data_block)
3834 }
3835
3836 fn repdef_too_sparse_for_miniblock(
3849 repdef: &crate::repdef::SerializedRepDefs,
3850 num_values: u64,
3851 ) -> bool {
3852 if num_values == 0 {
3853 return false;
3854 }
3855 let num_levels = repdef
3856 .repetition_levels
3857 .as_ref()
3858 .map(|r| r.len() as u64)
3859 .max(repdef.definition_levels.as_ref().map(|d| d.len() as u64))
3860 .unwrap_or(0);
3861 if num_levels == 0 {
3862 return false;
3863 }
3864
3865 let bits_per_rep = repdef
3867 .repetition_levels
3868 .as_ref()
3869 .and_then(|r| r.iter().max().copied())
3870 .map(|max_val| u16::BITS - max_val.leading_zeros())
3871 .unwrap_or(0) as u64;
3872 let bits_per_def = repdef
3873 .definition_levels
3874 .as_ref()
3875 .and_then(|d| d.iter().max().copied())
3876 .map(|max_val| u16::BITS - max_val.leading_zeros())
3877 .unwrap_or(0) as u64;
3878
3879 let bits_per_level = bits_per_rep + bits_per_def;
3880 if bits_per_level == 0 {
3881 return false;
3882 }
3883
3884 const REPDEF_BUDGET_BITS: u64 = 16 * 1024 * 8;
3886 let max_levels_per_chunk = REPDEF_BUDGET_BITS / bits_per_level;
3887
3888 let levels_per_chunk =
3891 (num_levels as f64 / num_values as f64) * *miniblock::MAX_MINIBLOCK_VALUES as f64;
3892
3893 levels_per_chunk > max_levels_per_chunk as f64
3894 }
3895
3896 fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3897 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3901 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3902 }
3903 true
3904 }
3905
3906 fn serialize_miniblocks(
3953 miniblocks: MiniBlockCompressed,
3954 rep: Option<Vec<CompressedLevelsChunk>>,
3955 def: Option<Vec<CompressedLevelsChunk>>,
3956 support_large_chunk: bool,
3957 ) -> Result<SerializedMiniBlockPage> {
3958 let bytes_rep = rep
3959 .as_ref()
3960 .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3961 .unwrap_or(0);
3962 let bytes_def = def
3963 .as_ref()
3964 .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3965 .unwrap_or(0);
3966 let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3967 let mut num_buffers = miniblocks.data.len();
3968 if rep.is_some() {
3969 num_buffers += 1;
3970 }
3971 if def.is_some() {
3972 num_buffers += 1;
3973 }
3974 let max_extra = 9 * num_buffers;
3976 let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3977 let chunk_size_bytes = if support_large_chunk { 4 } else { 2 };
3978 let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes);
3979
3980 let mut rep_iter = rep.map(|r| r.into_iter());
3981 let mut def_iter = def.map(|d| d.into_iter());
3982
3983 let mut buffer_offsets = vec![0; miniblocks.data.len()];
3984 for chunk in miniblocks.chunks {
3985 let start_pos = data_buffer.len();
3986 debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3988
3989 let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3990 let def = def_iter.as_mut().map(|d| d.next().unwrap());
3991
3992 let num_levels = rep
3994 .as_ref()
3995 .map(|r| r.num_levels)
3996 .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3997 data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3998
3999 if let Some(rep) = rep.as_ref() {
4001 let bytes_rep = u16::try_from(rep.data.len()).map_err(|_| {
4002 Error::internal(format!(
4003 "Repetition buffer size ({} bytes) too large",
4004 rep.data.len()
4005 ))
4006 })?;
4007 data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
4008 }
4009 if let Some(def) = def.as_ref() {
4010 let bytes_def = u16::try_from(def.data.len()).map_err(|_| {
4011 Error::internal(format!(
4012 "Definition buffer size ({} bytes) too large",
4013 def.data.len()
4014 ))
4015 })?;
4016 data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
4017 }
4018
4019 if support_large_chunk {
4020 for &buffer_size in &chunk.buffer_sizes {
4021 data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
4022 }
4023 } else {
4024 for &buffer_size in &chunk.buffer_sizes {
4025 data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes());
4026 }
4027 }
4028
4029 let add_padding = |data_buffer: &mut Vec<u8>| {
4031 let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
4032 data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
4033 };
4034 add_padding(&mut data_buffer);
4035
4036 if let Some(rep) = rep.as_ref() {
4038 data_buffer.extend_from_slice(&rep.data);
4039 add_padding(&mut data_buffer);
4040 }
4041 if let Some(def) = def.as_ref() {
4042 data_buffer.extend_from_slice(&def.data);
4043 add_padding(&mut data_buffer);
4044 }
4045 for (buffer_size, (buffer, buffer_offset)) in chunk
4046 .buffer_sizes
4047 .iter()
4048 .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
4049 {
4050 let start = *buffer_offset;
4051 let end = start + *buffer_size as usize;
4052 *buffer_offset += *buffer_size as usize;
4053 data_buffer.extend_from_slice(&buffer[start..end]);
4054 add_padding(&mut data_buffer);
4055 }
4056
4057 let chunk_bytes = data_buffer.len() - start_pos;
4058 let max_chunk_size = if support_large_chunk {
4059 4 * 1024 * 1024 * 1024 } else {
4061 32 * 1024 };
4063 assert!(chunk_bytes <= max_chunk_size);
4064 assert!(chunk_bytes > 0);
4065 assert_eq!(chunk_bytes % 8, 0);
4066 assert!(chunk.log_num_values <= 12);
4068 let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
4072 let divided_bytes_minus_one = (divided_bytes - 1) as u64;
4073
4074 let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64;
4075 if support_large_chunk {
4076 meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes());
4077 } else {
4078 meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes());
4079 }
4080 }
4081
4082 let data_buffer = LanceBuffer::from(data_buffer);
4083 let metadata_buffer = LanceBuffer::from(meta_buffer);
4084
4085 Ok(SerializedMiniBlockPage {
4086 num_buffers: miniblocks.data.len() as u64,
4087 data: data_buffer,
4088 metadata: metadata_buffer,
4089 })
4090 }
4091
    /// Compresses one stream of levels (repetition or definition) per miniblock chunk.
    ///
    /// A single block compressor is chosen from the statistics of *all* levels. It is then
    /// reused for each chunk's slice, so every entry of `chunks` gets its own compressed
    /// buffer.
    ///
    /// When `max_rep > 0` the levels are treated as repetition levels and a repetition index
    /// is built as well. For each chunk it stores the `u64` pair `(rows completed in this
    /// chunk, trailing levels of a row that continues into the next chunk)`. Definition
    /// levels are compressed with `max_rep = 0`, which skips the index.
    fn compress_levels(
        mut levels: RepDefSlicer<'_>,
        num_elements: u64,
        compression_strategy: &dyn CompressionStrategy,
        chunks: &[MiniBlockChunk],
        max_rep: u16,
    ) -> Result<CompressedLevels> {
        // Two entries (rows, leftovers) per chunk when building a repetition index.
        let mut rep_index = if max_rep > 0 {
            Vec::with_capacity(chunks.len())
        } else {
            vec![]
        };
        let num_levels = levels.num_levels() as u64;
        let levels_buf = levels.all_levels().clone();

        // Statistics over all levels drive the choice of compressor shared by every chunk.
        let mut fixed_width_block = FixedWidthDataBlock {
            data: levels_buf,
            bits_per_value: 16,
            num_values: num_levels,
            block_info: BlockInfo::new(),
        };
        fixed_width_block.compute_stat();

        let levels_block = DataBlock::FixedWidth(fixed_width_block);
        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
        let (compressor, compressor_desc) =
            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
        let mut level_chunks = Vec::with_capacity(chunks.len());
        let mut values_counter = 0;
        for (chunk_idx, chunk) in chunks.iter().enumerate() {
            let chunk_num_values = chunk.num_values(values_counter, num_elements);
            debug_assert!(chunk_num_values > 0);
            values_counter += chunk_num_values;
            // Levels covering this chunk's values; the last chunk takes whatever remains.
            let chunk_levels = if chunk_idx < chunks.len() - 1 {
                levels.slice_next(chunk_num_values as usize)
            } else {
                levels.slice_rest()
            };
            // Levels are `u16`, i.e. two bytes each.
            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
            if max_rep > 0 {
                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
                let rep_values = rep_values.as_ref();

                // A `max_rep` level is treated as the start of a row; each one after the first
                // level therefore closes the row before it.
                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
                // Levels after the last row start belong to a row that is still open at the end
                // of this chunk (all levels, if no row starts here). The last chunk has none.
                let num_leftovers = if chunk_idx < chunks.len() - 1 {
                    rep_values
                        .iter()
                        .rev()
                        .position(|v| *v == max_rep)
                        .map(|pos| pos + 1)
                        .unwrap_or(rep_values.len())
                } else {
                    0
                };

                // This chunk begins with a new row, so the row left open by the previous chunk
                // actually ended at the chunk boundary: count it as a completed row of the
                // previous chunk and clear that chunk's leftovers.
                if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
                    let rep_len = rep_index.len();
                    if rep_index[rep_len - 1] != 0 {
                        rep_index[rep_len - 2] += 1;
                        rep_index[rep_len - 1] = 0;
                    }
                }

                // The final row is never followed by another row start, so count it here.
                if chunk_idx == chunks.len() - 1 {
                    num_rows += 1;
                }
                rep_index.push(num_rows as u64);
                rep_index.push(num_leftovers as u64);
            }
            let mut chunk_fixed_width = FixedWidthDataBlock {
                data: chunk_levels,
                bits_per_value: 16,
                num_values: num_chunk_levels,
                block_info: BlockInfo::new(),
            };
            chunk_fixed_width.compute_stat();
            let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
            let compressed_levels = compressor.compress(chunk_levels_block)?;
            level_chunks.push(CompressedLevelsChunk {
                data: compressed_levels,
                num_levels: num_chunk_levels as u16,
            });
        }
        debug_assert_eq!(levels.num_levels_remaining(), 0);
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::reinterpret_vec(rep_index))
        };
        Ok(CompressedLevels {
            data: level_chunks,
            compression: compressor_desc,
            rep_index,
        })
    }
4214
4215 fn encode_simple_all_null(
4216 column_idx: u32,
4217 num_rows: u64,
4218 row_number: u64,
4219 ) -> Result<EncodedPage> {
4220 let description =
4221 ProtobufUtils21::constant_layout(&[DefinitionInterpretation::NullableItem], None);
4222 Ok(EncodedPage {
4223 column_idx,
4224 data: vec![],
4225 description: PageEncoding::Structural(description),
4226 num_rows,
4227 row_number,
4228 })
4229 }
4230
4231 fn encode_complex_all_null_vals(
4232 data: &Arc<[u16]>,
4233 compression_strategy: &dyn CompressionStrategy,
4234 ) -> Result<(LanceBuffer, pb21::CompressiveEncoding)> {
4235 let buffer = LanceBuffer::reinterpret_slice(data.clone());
4236 let mut fixed_width_block = FixedWidthDataBlock {
4237 data: buffer,
4238 bits_per_value: 16,
4239 num_values: data.len() as u64,
4240 block_info: BlockInfo::new(),
4241 };
4242 fixed_width_block.compute_stat();
4243
4244 let levels_block = DataBlock::FixedWidth(fixed_width_block);
4245 let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4246 let (compressor, encoding) =
4247 compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4248 let compressed_buffer = compressor.compress(levels_block)?;
4249 Ok((compressed_buffer, encoding))
4250 }
4251
4252 fn encode_complex_all_null(
4256 column_idx: u32,
4257 repdef: crate::repdef::SerializedRepDefs,
4258 row_number: u64,
4259 num_rows: u64,
4260 version: LanceFileVersion,
4261 compression_strategy: &dyn CompressionStrategy,
4262 ) -> Result<EncodedPage> {
4263 if version.resolve() < LanceFileVersion::V2_2 {
4264 let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
4265 LanceBuffer::reinterpret_slice(rep.clone())
4266 } else {
4267 LanceBuffer::empty()
4268 };
4269
4270 let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
4271 LanceBuffer::reinterpret_slice(def.clone())
4272 } else {
4273 LanceBuffer::empty()
4274 };
4275
4276 let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None);
4277 return Ok(EncodedPage {
4278 column_idx,
4279 data: vec![rep_bytes, def_bytes],
4280 description: PageEncoding::Structural(description),
4281 num_rows,
4282 row_number,
4283 });
4284 }
4285
4286 let (rep_bytes, rep_encoding, num_rep_values) = if let Some(rep) =
4287 repdef.repetition_levels.as_ref()
4288 {
4289 let num_values = rep.len() as u64;
4290 let (buffer, encoding) = Self::encode_complex_all_null_vals(rep, compression_strategy)?;
4291 (buffer, Some(encoding), num_values)
4292 } else {
4293 (LanceBuffer::empty(), None, 0)
4294 };
4295
4296 let (def_bytes, def_encoding, num_def_values) = if let Some(def) =
4297 repdef.definition_levels.as_ref()
4298 {
4299 let num_values = def.len() as u64;
4300 let (buffer, encoding) = Self::encode_complex_all_null_vals(def, compression_strategy)?;
4301 (buffer, Some(encoding), num_values)
4302 } else {
4303 (LanceBuffer::empty(), None, 0)
4304 };
4305
4306 let description = ProtobufUtils21::compressed_all_null_constant_layout(
4307 &repdef.def_meaning,
4308 rep_encoding,
4309 def_encoding,
4310 num_rep_values,
4311 num_def_values,
4312 );
4313 Ok(EncodedPage {
4314 column_idx,
4315 data: vec![rep_bytes, def_bytes],
4316 description: PageEncoding::Structural(description),
4317 num_rows,
4318 row_number,
4319 })
4320 }
4321
4322 fn leaf_validity(
4323 repdef: &crate::repdef::SerializedRepDefs,
4324 num_values: usize,
4325 ) -> Result<Option<BooleanBuffer>> {
4326 let rep = repdef
4327 .repetition_levels
4328 .as_ref()
4329 .map(|rep| rep.as_ref().to_vec());
4330 let def = repdef
4331 .definition_levels
4332 .as_ref()
4333 .map(|def| def.as_ref().to_vec());
4334 let mut unraveler = RepDefUnraveler::new(
4335 rep,
4336 def,
4337 repdef.def_meaning.clone().into(),
4338 num_values as u64,
4339 );
4340 if unraveler.is_all_valid() {
4341 return Ok(None);
4342 }
4343 let mut validity = BooleanBufferBuilder::new(num_values);
4344 unraveler.unravel_validity(&mut validity);
4345 Ok(Some(validity.finish()))
4346 }
4347
4348 fn is_constant_values(
4349 arrays: &[ArrayRef],
4350 scalar: &ArrayRef,
4351 validity: Option<&BooleanBuffer>,
4352 ) -> Result<bool> {
4353 debug_assert_eq!(scalar.len(), 1);
4354 debug_assert_eq!(scalar.null_count(), 0);
4355
4356 match scalar.data_type() {
4357 DataType::Boolean => {
4358 let mut global_idx = 0usize;
4359 let scalar_val = scalar.as_boolean().value(0);
4360 for arr in arrays {
4361 let bool_arr = arr.as_boolean();
4362 for i in 0..arr.len() {
4363 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4364 global_idx += 1;
4365 if !is_valid {
4366 continue;
4367 }
4368 if bool_arr.value(i) != scalar_val {
4369 return Ok(false);
4370 }
4371 }
4372 }
4373 Ok(true)
4374 }
4375 DataType::Utf8 => Self::is_constant_utf8::<i32>(arrays, scalar, validity),
4376 DataType::LargeUtf8 => Self::is_constant_utf8::<i64>(arrays, scalar, validity),
4377 DataType::Binary => Self::is_constant_binary::<i32>(arrays, scalar, validity),
4378 DataType::LargeBinary => Self::is_constant_binary::<i64>(arrays, scalar, validity),
4379 data_type => {
4380 let mut global_idx = 0usize;
4381 let Some(byte_width) = data_type.byte_width_opt() else {
4382 return Ok(false);
4383 };
4384 let scalar_data = scalar.to_data();
4385 if scalar_data.buffers().len() != 1 || !scalar_data.child_data().is_empty() {
4386 return Ok(false);
4387 }
4388 let scalar_bytes = scalar_data.buffers()[0].as_slice();
4389 if scalar_bytes.len() != byte_width {
4390 return Ok(false);
4391 }
4392
4393 for arr in arrays {
4394 let data = arr.to_data();
4395 if data.buffers().is_empty() {
4396 return Ok(false);
4397 }
4398 let buf = data.buffers()[0].as_slice();
4399 let base = data.offset();
4400 for i in 0..arr.len() {
4401 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4402 global_idx += 1;
4403 if !is_valid {
4404 continue;
4405 }
4406 let start = (base + i) * byte_width;
4407 if buf[start..start + byte_width] != scalar_bytes[..] {
4408 return Ok(false);
4409 }
4410 }
4411 }
4412 Ok(true)
4413 }
4414 }
4415 }
4416
4417 fn is_constant_utf8<O: arrow_array::OffsetSizeTrait>(
4418 arrays: &[ArrayRef],
4419 scalar: &ArrayRef,
4420 validity: Option<&BooleanBuffer>,
4421 ) -> Result<bool> {
4422 debug_assert_eq!(scalar.len(), 1);
4423 let scalar_val = scalar.as_string::<O>().value(0).as_bytes();
4424 let mut global_idx = 0usize;
4425 for arr in arrays {
4426 let str_arr = arr.as_string::<O>();
4427 for i in 0..arr.len() {
4428 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4429 global_idx += 1;
4430 if !is_valid {
4431 continue;
4432 }
4433 if str_arr.value(i).as_bytes() != scalar_val {
4434 return Ok(false);
4435 }
4436 }
4437 }
4438 Ok(true)
4439 }
4440
4441 fn is_constant_binary<O: arrow_array::OffsetSizeTrait>(
4442 arrays: &[ArrayRef],
4443 scalar: &ArrayRef,
4444 validity: Option<&BooleanBuffer>,
4445 ) -> Result<bool> {
4446 debug_assert_eq!(scalar.len(), 1);
4447 let scalar_val = scalar.as_binary::<O>().value(0);
4448 let mut global_idx = 0usize;
4449 for arr in arrays {
4450 let bin_arr = arr.as_binary::<O>();
4451 for i in 0..arr.len() {
4452 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4453 global_idx += 1;
4454 if !is_valid {
4455 continue;
4456 }
4457 if bin_arr.value(i) != scalar_val {
4458 return Ok(false);
4459 }
4460 }
4461 }
4462 Ok(true)
4463 }
4464
4465 fn find_constant_scalar(
4466 arrays: &[ArrayRef],
4467 validity: Option<&BooleanBuffer>,
4468 ) -> Result<Option<ArrayRef>> {
4469 if arrays.is_empty() {
4470 return Ok(None);
4471 }
4472
4473 let global_scalar_idx = if let Some(validity) = validity {
4474 let Some(idx) = (0..validity.len()).find(|&i| validity.value(i)) else {
4475 return Ok(None);
4476 };
4477 idx
4478 } else {
4479 0
4480 };
4481
4482 let mut idx_remaining = global_scalar_idx;
4483 let mut scalar_arr_idx = 0usize;
4484 while scalar_arr_idx < arrays.len() {
4485 let len = arrays[scalar_arr_idx].len();
4486 if idx_remaining < len {
4487 break;
4488 }
4489 idx_remaining -= len;
4490 scalar_arr_idx += 1;
4491 }
4492
4493 if scalar_arr_idx >= arrays.len() {
4494 return Ok(None);
4495 }
4496
4497 let scalar =
4498 lance_arrow::scalar::extract_scalar_value(&arrays[scalar_arr_idx], idx_remaining)?;
4499 if scalar.null_count() != 0 {
4500 return Ok(None);
4501 }
4502 if !Self::is_constant_values(arrays, &scalar, validity)? {
4503 return Ok(None);
4504 }
4505 Ok(Some(scalar))
4506 }
4507
4508 fn resolve_dict_values_compression_metadata(
4509 field_metadata: &HashMap<String, String>,
4510 env_compression: Option<String>,
4511 env_compression_level: Option<String>,
4512 ) -> HashMap<String, String> {
4513 let mut metadata = HashMap::new();
4514
4515 let compression = field_metadata
4516 .get(DICT_VALUES_COMPRESSION_META_KEY)
4517 .cloned()
4518 .or(env_compression)
4519 .unwrap_or_else(|| DEFAULT_DICT_VALUES_COMPRESSION.to_string());
4520 metadata.insert(COMPRESSION_META_KEY.to_string(), compression);
4521
4522 if let Some(compression_level) = field_metadata
4523 .get(DICT_VALUES_COMPRESSION_LEVEL_META_KEY)
4524 .cloned()
4525 .or(env_compression_level)
4526 {
4527 metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), compression_level);
4528 }
4529
4530 metadata
4531 }
4532
4533 fn build_dict_values_compressor_field(field: &Field) -> Result<Field> {
4534 let mut dict_values_field = Field::new_arrow("", DataType::UInt16, false)?;
4539 dict_values_field.metadata = Self::resolve_dict_values_compression_metadata(
4540 &field.metadata,
4541 env::var(DICT_VALUES_COMPRESSION_ENV_VAR).ok(),
4542 env::var(DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR).ok(),
4543 );
4544 Ok(dict_values_field)
4545 }
4546
4547 #[allow(clippy::too_many_arguments)]
4548 fn encode_miniblock(
4549 column_idx: u32,
4550 field: &Field,
4551 compression_strategy: &dyn CompressionStrategy,
4552 data: DataBlock,
4553 repdef: crate::repdef::SerializedRepDefs,
4554 row_number: u64,
4555 dictionary_data: Option<DataBlock>,
4556 num_rows: u64,
4557 support_large_chunk: bool,
4558 ) -> Result<EncodedPage> {
4559 if let DataBlock::AllNull(_null_block) = data {
4560 unreachable!()
4563 }
4564
4565 let num_items = data.num_values();
4566
4567 let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
4568 let (compressed_data, value_encoding) = compressor.compress(data)?;
4569
4570 let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
4571
4572 let mut compressed_rep = repdef
4573 .rep_slicer()
4574 .map(|rep_slicer| {
4575 Self::compress_levels(
4576 rep_slicer,
4577 num_items,
4578 compression_strategy,
4579 &compressed_data.chunks,
4580 max_rep,
4581 )
4582 })
4583 .transpose()?;
4584
4585 let (rep_index, rep_index_depth) =
4586 match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
4587 Some(rep_index) => (Some(rep_index.clone()), 1),
4588 None => (None, 0),
4589 };
4590
4591 let mut compressed_def = repdef
4592 .def_slicer()
4593 .map(|def_slicer| {
4594 Self::compress_levels(
4595 def_slicer,
4596 num_items,
4597 compression_strategy,
4598 &compressed_data.chunks,
4599 0,
4600 )
4601 })
4602 .transpose()?;
4603
4604 let rep_data = compressed_rep
4610 .as_mut()
4611 .map(|cr| std::mem::take(&mut cr.data));
4612 let def_data = compressed_def
4613 .as_mut()
4614 .map(|cd| std::mem::take(&mut cd.data));
4615
4616 let serialized =
4617 Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk)?;
4618
4619 let mut data = Vec::with_capacity(4);
4621 data.push(serialized.metadata);
4622 data.push(serialized.data);
4623
4624 if let Some(dictionary_data) = dictionary_data {
4625 let num_dictionary_items = dictionary_data.num_values();
4626 let dict_values_field = Self::build_dict_values_compressor_field(field)?;
4627
4628 let (compressor, dictionary_encoding) = compression_strategy
4629 .create_block_compressor(&dict_values_field, &dictionary_data)?;
4630 let dictionary_buffer = compressor.compress(dictionary_data)?;
4631
4632 data.push(dictionary_buffer);
4633 if let Some(rep_index) = rep_index {
4634 data.push(rep_index);
4635 }
4636
4637 let description = ProtobufUtils21::miniblock_layout(
4638 compressed_rep.map(|cr| cr.compression),
4639 compressed_def.map(|cd| cd.compression),
4640 value_encoding,
4641 rep_index_depth,
4642 serialized.num_buffers,
4643 Some((dictionary_encoding, num_dictionary_items)),
4644 &repdef.def_meaning,
4645 num_items,
4646 support_large_chunk,
4647 );
4648 Ok(EncodedPage {
4649 num_rows,
4650 column_idx,
4651 data,
4652 description: PageEncoding::Structural(description),
4653 row_number,
4654 })
4655 } else {
4656 let description = ProtobufUtils21::miniblock_layout(
4657 compressed_rep.map(|cr| cr.compression),
4658 compressed_def.map(|cd| cd.compression),
4659 value_encoding,
4660 rep_index_depth,
4661 serialized.num_buffers,
4662 None,
4663 &repdef.def_meaning,
4664 num_items,
4665 support_large_chunk,
4666 );
4667
4668 if let Some(rep_index) = rep_index {
4669 let view = rep_index.borrow_to_typed_slice::<u64>();
4670 let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
4671 debug_assert_eq!(total, num_rows);
4672
4673 data.push(rep_index);
4674 }
4675
4676 Ok(EncodedPage {
4677 num_rows,
4678 column_idx,
4679 data,
4680 description: PageEncoding::Structural(description),
4681 row_number,
4682 })
4683 }
4684 }
4685
    /// Zips fixed-width values with their control words into a single full-zip buffer.
    ///
    /// For each control word produced by `repdef`, the control word bytes are appended. When
    /// the item is visible and values are at least one byte wide, the next
    /// `bits_per_value / 8` bytes of `fixed.data` follow. Whenever a control word starts a new
    /// row, the byte offset of that control word is recorded in a bytepacked repetition
    /// index. The final buffer length is appended at the end.
    ///
    /// # Panics
    ///
    /// Panics if `fixed.bits_per_value` is not a multiple of 8, or if `repdef` reports more
    /// visible items than `fixed.data` holds.
    fn serialize_full_zip_fixed(
        fixed: FixedWidthDataBlock,
        mut repdef: ControlWordIterator,
        num_values: u64,
    ) -> SerializedFullZip {
        // Exact output size: every value plus one control word per item.
        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
        let mut zipped_data = Vec::with_capacity(len);

        // NOTE(review): without repetition the index is sized for a max value of 0; the
        // `rep_index.is_empty()` check below suggests the encoder then emits no bytes —
        // confirm `BytepackedIntegerEncoder` tolerates the non-zero offsets appended below.
        let max_rep_index_val = if repdef.has_repetition() {
            len as u64
        } else {
            0
        };
        let mut rep_index_builder =
            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);

        assert_eq!(
            fixed.bits_per_value % 8,
            0,
            "Non-byte aligned full-zip compression not yet supported"
        );

        let bytes_per_value = fixed.bits_per_value as usize / 8;
        // Length of `zipped_data` before the current control word, i.e. where it starts.
        let mut offset = 0;

        if bytes_per_value == 0 {
            // Zero-width values: only control words are written.
            while let Some(control) = repdef.append_next(&mut zipped_data) {
                if control.is_new_row {
                    debug_assert!(offset <= len);
                    // SAFETY: presumably requires the value to be <= `max_rep_index_val`;
                    // `offset` never exceeds `len` (checked above in debug builds).
                    unsafe { rep_index_builder.append(offset as u64) };
                }
                offset = zipped_data.len();
            }
        } else {
            let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
            while let Some(control) = repdef.append_next(&mut zipped_data) {
                if control.is_new_row {
                    debug_assert!(offset <= len);
                    // SAFETY: presumably requires the value to be <= `max_rep_index_val`;
                    // `offset` never exceeds `len` (checked above in debug builds).
                    unsafe { rep_index_builder.append(offset as u64) };
                }
                // Only visible items carry a value after their control word.
                if control.is_visible {
                    let value = data_iter.next().unwrap();
                    zipped_data.extend_from_slice(value);
                }
                offset = zipped_data.len();
            }
        }

        debug_assert_eq!(zipped_data.len(), len);
        // SAFETY: presumably requires the value to be <= `max_rep_index_val`; the final
        // length equals `len` (asserted above in debug builds).
        unsafe {
            rep_index_builder.append(zipped_data.len() as u64);
        }

        let zipped_data = LanceBuffer::from(zipped_data);
        let rep_index = rep_index_builder.into_data();
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::from(rep_index))
        };
        SerializedFullZip {
            values: zipped_data,
            repetition_index: rep_index,
        }
    }
4763
    /// Zips variable-width values with their control words into a single full-zip buffer.
    ///
    /// Each control word produced by `repdef` is appended to the output:
    /// - A visible, valid item is followed by its byte length, written little-endian with the
    ///   same width as the offsets (4 or 8 bytes), and then its bytes.
    /// - A visible but invalid item consumes an offset window but writes nothing after its
    ///   control word.
    /// - An invisible item consumes no window.
    ///
    /// Row-start offsets plus the final length are always recorded in a bytepacked
    /// repetition index.
    ///
    /// # Panics
    ///
    /// Panics if the offset width is not 32 or 64 bits, or if `repdef` reports more visible
    /// items than there are offset windows.
    fn serialize_full_zip_variable(
        variable: VariableWidthBlock,
        mut repdef: ControlWordIterator,
        num_items: u64,
    ) -> SerializedFullZip {
        let bytes_per_offset = variable.bits_per_offset as usize / 8;
        assert_eq!(
            variable.bits_per_offset % 8,
            0,
            "Only byte-aligned offsets supported"
        );
        // Upper bound on the output: data, one control word per item, one length prefix per
        // value (prefixes are skipped for invalid items).
        let len = variable.data.len()
            + repdef.bytes_per_word() * num_items as usize
            + bytes_per_offset * variable.num_values as usize;
        let mut buf = Vec::with_capacity(len);

        let max_rep_index_val = len as u64;
        let mut rep_index_builder =
            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);

        // The two arms differ only in the offset type (and thus the length-prefix width).
        match bytes_per_offset {
            4 => {
                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
                // Length of `buf` before the current control word, i.e. where it starts.
                let mut rep_offset = 0;
                let mut windows_iter = offs.as_ref().windows(2);
                while let Some(control) = repdef.append_next(&mut buf) {
                    if control.is_new_row {
                        debug_assert!(rep_offset <= len);
                        // SAFETY: presumably requires the value to be <= `max_rep_index_val`
                        // (= `len`); checked above in debug builds.
                        unsafe { rep_index_builder.append(rep_offset as u64) };
                    }
                    if control.is_visible {
                        let window = windows_iter.next().unwrap();
                        if control.is_valid_item {
                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                            buf.extend_from_slice(
                                &variable.data[window[0] as usize..window[1] as usize],
                            );
                        }
                    }
                    rep_offset = buf.len();
                }
            }
            8 => {
                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
                // Length of `buf` before the current control word, i.e. where it starts.
                let mut rep_offset = 0;
                let mut windows_iter = offs.as_ref().windows(2);
                while let Some(control) = repdef.append_next(&mut buf) {
                    if control.is_new_row {
                        debug_assert!(rep_offset <= len);
                        // SAFETY: presumably requires the value to be <= `max_rep_index_val`
                        // (= `len`); checked above in debug builds.
                        unsafe { rep_index_builder.append(rep_offset as u64) };
                    }
                    if control.is_visible {
                        let window = windows_iter.next().unwrap();
                        if control.is_valid_item {
                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                            buf.extend_from_slice(
                                &variable.data[window[0] as usize..window[1] as usize],
                            );
                        }
                    }
                    rep_offset = buf.len();
                }
            }
            _ => panic!("Unsupported offset size"),
        }

        // `<=` rather than `==`: invalid items write no length prefix.
        debug_assert!(buf.len() <= len);
        // SAFETY: presumably requires the value to be <= `max_rep_index_val` (= `len`);
        // checked above in debug builds.
        unsafe {
            rep_index_builder.append(buf.len() as u64);
        }

        let zipped_data = LanceBuffer::from(buf);
        let rep_index = rep_index_builder.into_data();
        debug_assert!(!rep_index.is_empty());
        let rep_index = Some(LanceBuffer::from(rep_index));
        SerializedFullZip {
            values: zipped_data,
            repetition_index: rep_index,
        }
    }
4856
4857 fn serialize_full_zip(
4860 compressed_data: PerValueDataBlock,
4861 repdef: ControlWordIterator,
4862 num_items: u64,
4863 ) -> SerializedFullZip {
4864 match compressed_data {
4865 PerValueDataBlock::Fixed(fixed) => {
4866 Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4867 }
4868 PerValueDataBlock::Variable(var) => {
4869 Self::serialize_full_zip_variable(var, repdef, num_items)
4870 }
4871 }
4872 }
4873
4874 fn expand_boolean_to_bytes(fixed: FixedWidthDataBlock) -> FixedWidthDataBlock {
4875 debug_assert_eq!(fixed.bits_per_value, 1);
4876 let num_values = fixed.num_values as usize;
4877 let bool_buf = BooleanBuffer::new(fixed.data.into_buffer(), 0, num_values);
4878 let expanded: Vec<u8> = (0..num_values).map(|i| bool_buf.value(i) as u8).collect();
4879 FixedWidthDataBlock {
4880 data: LanceBuffer::from(expanded),
4881 bits_per_value: 8,
4882 num_values: fixed.num_values,
4883 block_info: BlockInfo::new(),
4884 }
4885 }
4886
4887 fn encode_full_zip(
4888 column_idx: u32,
4889 field: &Field,
4890 compression_strategy: &dyn CompressionStrategy,
4891 data: DataBlock,
4892 repdef: crate::repdef::SerializedRepDefs,
4893 row_number: u64,
4894 num_lists: u64,
4895 ) -> Result<EncodedPage> {
4896 let max_rep = repdef
4897 .repetition_levels
4898 .as_ref()
4899 .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4900 let max_def = repdef
4901 .definition_levels
4902 .as_ref()
4903 .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4904
4905 let (num_items, num_visible_items) =
4909 if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4910 (rep_levels.len() as u64, data.num_values())
4913 } else {
4914 (data.num_values(), data.num_values())
4916 };
4917
4918 let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4919
4920 let repdef_iter = build_control_word_iterator(
4921 repdef.repetition_levels.as_deref(),
4922 max_rep,
4923 repdef.definition_levels.as_deref(),
4924 max_def,
4925 max_visible_def,
4926 num_items as usize,
4927 );
4928 let bits_rep = repdef_iter.bits_rep();
4929 let bits_def = repdef_iter.bits_def();
4930
4931 let data = match data {
4933 DataBlock::FixedWidth(fixed) if fixed.bits_per_value == 1 => {
4934 DataBlock::FixedWidth(Self::expand_boolean_to_bytes(fixed))
4935 }
4936 other => other,
4937 };
4938
4939 let compressor = compression_strategy.create_per_value(field, &data)?;
4940 let (compressed_data, value_encoding) = compressor.compress(data)?;
4941
4942 let description = match &compressed_data {
4943 PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4944 bits_rep,
4945 bits_def,
4946 fixed.bits_per_value as u32,
4947 value_encoding,
4948 &repdef.def_meaning,
4949 num_items as u32,
4950 num_visible_items as u32,
4951 ),
4952 PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4953 bits_rep,
4954 bits_def,
4955 variable.bits_per_offset as u32,
4956 value_encoding,
4957 &repdef.def_meaning,
4958 num_items as u32,
4959 num_visible_items as u32,
4960 ),
4961 };
4962
4963 let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4964
4965 let data = if let Some(repindex) = zipped.repetition_index {
4966 vec![zipped.values, repindex]
4967 } else {
4968 vec![zipped.values]
4969 };
4970
4971 Ok(EncodedPage {
4972 num_rows: num_lists,
4973 column_idx,
4974 data,
4975 description: PageEncoding::Structural(description),
4976 row_number,
4977 })
4978 }
4979
4980 fn should_dictionary_encode(
4981 data_block: &DataBlock,
4982 field: &Field,
4983 version: LanceFileVersion,
4984 ) -> Option<DictEncodingBudget> {
4985 const DEFAULT_SAMPLE_SIZE: usize = 4096;
4986 const DEFAULT_SAMPLE_UNIQUE_RATIO: f64 = 0.98;
4987
4988 match data_block {
4991 DataBlock::FixedWidth(fixed) => {
4992 if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
4993 return None;
4994 }
4995 if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
4996 return None;
4997 }
4998 if fixed.bits_per_value % 8 != 0 {
4999 return None;
5000 }
5001 }
5002 DataBlock::VariableWidth(var) => {
5003 if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
5004 return None;
5005 }
5006 }
5007 _ => return None,
5008 }
5009
5010 let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
5012 .ok()
5013 .and_then(|val| val.parse().ok())
5014 .unwrap_or(100);
5015 if data_block.num_values() < too_small {
5016 return None;
5017 }
5018
5019 let num_values = data_block.num_values();
5020
5021 let divisor: u64 = field
5024 .metadata
5025 .get(DICT_DIVISOR_META_KEY)
5026 .and_then(|val| val.parse().ok())
5027 .or_else(|| {
5028 env::var("LANCE_ENCODING_DICT_DIVISOR")
5029 .ok()
5030 .and_then(|val| val.parse().ok())
5031 })
5032 .unwrap_or(DEFAULT_DICT_DIVISOR);
5033
5034 let max_cardinality: u64 = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
5035 .ok()
5036 .and_then(|val| val.parse().ok())
5037 .unwrap_or(DEFAULT_DICT_MAX_CARDINALITY);
5038
5039 let threshold_cardinality = num_values
5040 .checked_div(divisor.max(1))
5041 .unwrap_or(0)
5042 .min(max_cardinality);
5043 if threshold_cardinality == 0 {
5044 return None;
5045 }
5046
5047 let threshold_ratio = field
5049 .metadata
5050 .get(DICT_SIZE_RATIO_META_KEY)
5051 .and_then(|val| val.parse::<f64>().ok())
5052 .or_else(|| {
5053 env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
5054 .ok()
5055 .and_then(|val| val.parse().ok())
5056 })
5057 .unwrap_or(DEFAULT_DICT_SIZE_RATIO);
5058
5059 if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
5060 panic!(
5061 "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
5062 threshold_ratio
5063 );
5064 }
5065
5066 let data_size = data_block.data_size();
5067 if data_size == 0 {
5068 return None;
5069 }
5070
5071 let max_encoded_size = (data_size as f64 * threshold_ratio) as u64;
5072 let max_encoded_size = usize::try_from(max_encoded_size).ok()?;
5073
5074 if Self::sample_is_near_unique(
5076 data_block,
5077 DEFAULT_SAMPLE_SIZE,
5078 DEFAULT_SAMPLE_UNIQUE_RATIO,
5079 )? {
5080 return None;
5081 }
5082
5083 let max_dict_entries = u32::try_from(threshold_cardinality.min(i32::MAX as u64)).ok()?;
5084 Some(DictEncodingBudget {
5085 max_dict_entries,
5086 max_encoded_size,
5087 })
5088 }
5089
5090 fn sample_is_near_unique(
5096 data_block: &DataBlock,
5097 max_samples: usize,
5098 unique_ratio_threshold: f64,
5099 ) -> Option<bool> {
5100 use std::collections::HashSet;
5101
5102 if unique_ratio_threshold <= 0.0 || unique_ratio_threshold > 1.0 {
5103 return None;
5104 }
5105
5106 let num_values = usize::try_from(data_block.num_values()).ok()?;
5107 if num_values == 0 {
5108 return Some(false);
5109 }
5110
5111 let sample_count = num_values.min(max_samples).max(1);
5112 let step = (num_values / sample_count).max(1);
5114
5115 match data_block {
5116 DataBlock::FixedWidth(fixed) => match fixed.bits_per_value {
5117 64 => {
5118 let values = fixed.data.borrow_to_typed_slice::<u64>();
5119 let values = values.as_ref();
5120 let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
5121 for idx in (0..num_values).step_by(step).take(sample_count) {
5122 unique.insert(values.get(idx).copied()?);
5123 }
5124 let ratio = unique.len() as f64 / sample_count as f64;
5125 Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5127 }
5128 128 => {
5129 let values = fixed.data.borrow_to_typed_slice::<u128>();
5130 let values = values.as_ref();
5131 let mut unique: HashSet<u128> = HashSet::with_capacity(sample_count.min(1024));
5132 for idx in (0..num_values).step_by(step).take(sample_count) {
5133 unique.insert(values.get(idx).copied()?);
5134 }
5135 let ratio = unique.len() as f64 / sample_count as f64;
5136 Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5137 }
5138 _ => Some(false),
5139 },
5140 DataBlock::VariableWidth(var) => {
5141 use xxhash_rust::xxh3::xxh3_64;
5142
5143 let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
5145 match var.bits_per_offset {
5146 32 => {
5147 let offsets_ref = var.offsets.borrow_to_typed_slice::<u32>();
5148 let offsets: &[u32] = offsets_ref.as_ref();
5149 for i in (0..num_values).step_by(step).take(sample_count) {
5150 let start = usize::try_from(*offsets.get(i)?).ok()?;
5151 let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5152 if start > end || end > var.data.len() {
5153 return None;
5154 }
5155 unique.insert(xxh3_64(&var.data[start..end]));
5156 }
5157 }
5158 64 => {
5159 let offsets_ref = var.offsets.borrow_to_typed_slice::<u64>();
5160 let offsets: &[u64] = offsets_ref.as_ref();
5161 for i in (0..num_values).step_by(step).take(sample_count) {
5162 let start = usize::try_from(*offsets.get(i)?).ok()?;
5163 let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5164 if start > end || end > var.data.len() {
5165 return None;
5166 }
5167 unique.insert(xxh3_64(&var.data[start..end]));
5168 }
5169 }
5170 _ => return Some(false),
5171 }
5172 let ratio = unique.len() as f64 / sample_count as f64;
5173 Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5174 }
5175 _ => Some(false),
5176 }
5177 }
5178
5179 fn do_flush(
5181 &mut self,
5182 arrays: Vec<ArrayRef>,
5183 repdefs: Vec<RepDefBuilder>,
5184 row_number: u64,
5185 num_rows: u64,
5186 ) -> Result<Vec<EncodeTask>> {
5187 let column_idx = self.column_index;
5188 let compression_strategy = self.compression_strategy.clone();
5189 let field = self.field.clone();
5190 let encoding_metadata = self.encoding_metadata.clone();
5191 let support_large_chunk = self.support_large_chunk;
5192 let version = self.version;
5193 let task = spawn_cpu(move || {
5194 let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
5195 let is_simple_validity = repdefs.iter().all(|rd| rd.is_simple_validity());
5196 let has_repdef_info = repdefs.iter().any(|rd| !rd.is_empty());
5197 let repdef = RepDefBuilder::serialize(repdefs);
5198
5199 if num_values == 0 {
5200 log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
5204 return Self::encode_complex_all_null(
5205 column_idx,
5206 repdef,
5207 row_number,
5208 num_rows,
5209 version,
5210 compression_strategy.as_ref(),
5211 );
5212 }
5213
5214 let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?;
5215 let all_null = leaf_validity
5216 .as_ref()
5217 .map(|validity| validity.count_set_bits() == 0)
5218 .unwrap_or(false);
5219
5220 if all_null {
5221 return if is_simple_validity {
5222 log::debug!(
5223 "Encoding column {} with {} items ({} rows) using simple-null layout",
5224 column_idx,
5225 num_values,
5226 num_rows
5227 );
5228 Self::encode_simple_all_null(column_idx, num_values, row_number)
5229 } else {
5230 log::debug!(
5231 "Encoding column {} with {} items ({} rows) using complex-null layout",
5232 column_idx,
5233 num_values,
5234 num_rows
5235 );
5236 Self::encode_complex_all_null(
5237 column_idx,
5238 repdef,
5239 row_number,
5240 num_rows,
5241 version,
5242 compression_strategy.as_ref(),
5243 )
5244 };
5245 }
5246
5247 if let DataType::Struct(fields) = &field.data_type()
5248 && fields.is_empty()
5249 {
5250 if has_repdef_info {
5251 return Err(Error::invalid_input_source(format!("Empty structs with rep/def information are not yet supported. The field {} is an empty struct that either has nulls or is in a list.", field.name).into()));
5252 }
5253 return Self::encode_simple_all_null(column_idx, num_values, row_number);
5256 }
5257
5258 let data_block = DataBlock::from_arrays(&arrays, num_values);
5259
5260 if version.resolve() >= LanceFileVersion::V2_2
5261 && let Some(scalar) = Self::find_constant_scalar(&arrays, leaf_validity.as_ref())?
5262 {
5263 log::debug!(
5264 "Encoding column {} with {} items ({} rows) using constant layout",
5265 column_idx,
5266 num_values,
5267 num_rows
5268 );
5269 return constant::encode_constant_page(
5270 column_idx,
5271 scalar,
5272 repdef,
5273 row_number,
5274 num_rows,
5275 );
5276 }
5277
5278 let requires_full_zip_packed_struct =
5279 if let DataBlock::Struct(ref struct_data_block) = data_block {
5280 struct_data_block.has_variable_width_child()
5281 } else {
5282 false
5283 };
5284
5285 if requires_full_zip_packed_struct {
5286 log::debug!(
5287 "Encoding column {} with {} items using full-zip packed struct layout",
5288 column_idx,
5289 num_values
5290 );
5291 return Self::encode_full_zip(
5292 column_idx,
5293 &field,
5294 compression_strategy.as_ref(),
5295 data_block,
5296 repdef,
5297 row_number,
5298 num_rows,
5299 );
5300 }
5301
5302 let too_sparse = Self::repdef_too_sparse_for_miniblock(&repdef, num_values);
5306
5307 if !too_sparse {
5308 if let DataBlock::Dictionary(dict) = data_block {
5309 log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
5310 let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
5311 indices_data_block.compute_stat();
5316 return Self::encode_miniblock(
5317 column_idx,
5318 &field,
5319 compression_strategy.as_ref(),
5320 indices_data_block,
5321 repdef,
5322 row_number,
5323 Some(dictionary_data_block),
5324 num_rows,
5325 support_large_chunk,
5326 );
5327 }
5328 } else {
5329 log::debug!(
5330 "Encoding column {} with {} items using full-zip layout \
5331 (rep/def too sparse for mini-block)",
5332 column_idx,
5333 num_values
5334 );
5335 }
5336
5337 {
5338 let dict_result = if too_sparse {
5341 None
5342 } else {
5343 Self::should_dictionary_encode(&data_block, &field, version)
5344 .and_then(|budget| {
5345 log::debug!(
5346 "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
5347 column_idx,
5348 num_values
5349 );
5350 dict::dictionary_encode(
5351 &data_block,
5352 budget.max_dict_entries,
5353 budget.max_encoded_size,
5354 )
5355 })
5356 };
5357
5358 if let Some((indices_data_block, dictionary_data_block)) = dict_result {
5359 Self::encode_miniblock(
5360 column_idx,
5361 &field,
5362 compression_strategy.as_ref(),
5363 indices_data_block,
5364 repdef,
5365 row_number,
5366 Some(dictionary_data_block),
5367 num_rows,
5368 support_large_chunk,
5369 )
5370 } else if !too_sparse && Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
5371 log::debug!(
5372 "Encoding column {} with {} items using mini-block layout",
5373 column_idx,
5374 num_values
5375 );
5376 Self::encode_miniblock(
5377 column_idx,
5378 &field,
5379 compression_strategy.as_ref(),
5380 data_block,
5381 repdef,
5382 row_number,
5383 None,
5384 num_rows,
5385 support_large_chunk,
5386 )
5387 } else if too_sparse || Self::prefers_fullzip(encoding_metadata.as_ref()) {
5388 log::debug!(
5389 "Encoding column {} with {} items using full-zip layout",
5390 column_idx,
5391 num_values
5392 );
5393 Self::encode_full_zip(
5394 column_idx,
5395 &field,
5396 compression_strategy.as_ref(),
5397 data_block,
5398 repdef,
5399 row_number,
5400 num_rows,
5401 )
5402 } else {
5403 Err(Error::invalid_input_source(format!("Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into()))
5404 }
5405 }
5406 })
5407 .boxed();
5408 Ok(vec![task])
5409 }
5410
5411 fn extract_validity_buf(
5412 array: Arc<dyn Array>,
5413 repdef: &mut RepDefBuilder,
5414 keep_original_array: bool,
5415 ) -> Result<Arc<dyn Array>> {
5416 if let Some(validity) = array.nulls() {
5417 if keep_original_array {
5418 repdef.add_validity_bitmap(validity.clone());
5419 } else {
5420 repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
5421 }
5422 let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
5423 Ok(make_array(data_no_nulls))
5424 } else {
5425 repdef.add_no_null(array.len());
5426 Ok(array)
5427 }
5428 }
5429
5430 fn extract_validity(
5431 mut array: Arc<dyn Array>,
5432 repdef: &mut RepDefBuilder,
5433 keep_original_array: bool,
5434 ) -> Result<Arc<dyn Array>> {
5435 match array.data_type() {
5436 DataType::Null => {
5437 repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
5438 Ok(array)
5439 }
5440 DataType::Dictionary(_, _) => {
5441 array = dict::normalize_dict_nulls(array)?;
5442 Self::extract_validity_buf(array, repdef, keep_original_array)
5443 }
5444 _ => Self::extract_validity_buf(array, repdef, keep_original_array),
5453 }
5454 }
5455}
5456
5457impl FieldEncoder for PrimitiveStructuralEncoder {
5458 fn maybe_encode(
5460 &mut self,
5461 array: ArrayRef,
5462 _external_buffers: &mut OutOfLineBuffers,
5463 mut repdef: RepDefBuilder,
5464 row_number: u64,
5465 num_rows: u64,
5466 ) -> Result<Vec<EncodeTask>> {
5467 let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
5468 self.accumulated_repdefs.push(repdef);
5469
5470 if let Some((arrays, row_number, num_rows)) =
5471 self.accumulation_queue.insert(array, row_number, num_rows)
5472 {
5473 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5474 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5475 } else {
5476 Ok(vec![])
5477 }
5478 }
5479
5480 fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
5482 if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
5483 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5484 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5485 } else {
5486 Ok(vec![])
5487 }
5488 }
5489
5490 fn num_columns(&self) -> u32 {
5491 1
5492 }
5493
5494 fn finish(
5495 &mut self,
5496 _external_buffers: &mut OutOfLineBuffers,
5497 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
5498 std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
5499 }
5500}
5501
5502#[cfg(test)]
5503#[allow(clippy::single_range_in_vec_init)]
5504mod tests {
5505 use super::{
5506 ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
5507 FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipReadSource,
5508 FullZipRepIndexDetails, FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor,
5509 PreambleAction, StructuralPageScheduler, VariableFullZipDecoder,
5510 };
5511 use crate::buffer::LanceBuffer;
5512 use crate::compression::DefaultDecompressionStrategy;
5513 use crate::constants::{
5514 COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
5515 DICT_VALUES_COMPRESSION_META_KEY, STRUCTURAL_ENCODING_META_KEY,
5516 STRUCTURAL_ENCODING_MINIBLOCK,
5517 };
5518 use crate::data::BlockInfo;
5519 use crate::decoder::PageEncoding;
5520 use crate::encodings::logical::primitive::{
5521 ChunkDrainInstructions, PrimitiveStructuralEncoder,
5522 };
5523 use crate::format::ProtobufUtils21;
5524 use crate::format::pb21;
5525 use crate::format::pb21::compressive_encoding::Compression;
5526 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
5527 use crate::version::LanceFileVersion;
5528 use arrow_array::{ArrayRef, Int8Array, StringArray};
5529 use arrow_schema::DataType;
5530 use std::collections::HashMap;
5531 use std::{collections::VecDeque, sync::Arc};
5532
5533 #[test]
5534 fn test_is_narrow() {
5535 let int8_array = Int8Array::from(vec![1, 2, 3]);
5536 let array_ref: ArrayRef = Arc::new(int8_array);
5537 let block = DataBlock::from_array(array_ref);
5538
5539 assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5540
5541 let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
5542 let block = DataBlock::from_array(string_array);
5543 assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5544
5545 let string_array = StringArray::from(vec![
5546 Some("hello world".repeat(100)),
5547 Some("world".to_string()),
5548 ]);
5549 let block = DataBlock::from_array(string_array);
5550 assert!((!PrimitiveStructuralEncoder::is_narrow(&block)));
5551 }
5552
    /// Exercises `DecodeMiniBlockTask::map_range`, which maps a range of rows in a chunk to the
    /// matching range of items and range of rep/def levels.
    ///
    /// Conventions that hold for every expectation below:
    /// * A repetition level equal to `max_rep` starts a new row.
    /// * A definition level greater than `max_visible_def` contributes a level but no item.
    /// * Leading levels before the first row start form a preamble. The preamble is included
    ///   with `PreambleAction::Take` and excluded with `PreambleAction::Skip`.
    #[test]
    fn test_map_range() {
        // Rows start at levels 0, 3, 5 and 6. Level 5 has def 1, so row 2 has a level but no
        // items, and there are 8 items in total.
        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 8;
        let max_rep = 1;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(1..2, 3..5, 3..5);
        check(2..3, 5..5, 5..6);
        check(3..4, 5..8, 6..9);
        check(0..2, 0..5, 0..5);
        check(1..3, 3..5, 3..6);
        check(2..4, 5..8, 5..9);
        check(0..3, 0..5, 0..6);
        check(1..4, 3..8, 3..9);
        check(0..4, 0..8, 0..9);

        // The first row (level 0, def 1) has no items.
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..0, 0..1);
        check(1..2, 0..2, 1..3);
        check(2..3, 2..3, 3..4);
        check(0..2, 0..2, 0..3);
        check(1..3, 0..3, 1..4);
        check(0..3, 0..3, 0..4);

        // The last row (level 3, def 1) has no items.
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..3, 1..3);
        check(2..3, 3..3, 3..4);
        check(0..2, 0..3, 0..3);
        check(1..3, 1..3, 1..4);
        check(0..3, 0..3, 0..4);

        // No definition levels: three rows of two items each.
        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
        let def: Option<&[u16]> = None;
        let max_visible_def = 0;
        let total_items = 6;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..2, 0..2);
        check(1..2, 2..4, 2..4);
        check(2..3, 4..6, 4..6);
        check(0..2, 0..4, 0..4);
        check(1..3, 2..6, 2..6);
        check(0..3, 0..6, 0..6);

        // No repetition levels: every level is its own row. With max_visible_def = 1 every
        // level is also an item.
        let rep: Option<&[u16]> = None;
        let def = Some(vec![0, 0, 1, 0]);
        let max_visible_def = 1;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..2, 1..2);
        check(2..3, 2..3, 2..3);
        check(0..2, 0..2, 0..2);
        check(1..3, 1..3, 1..3);
        check(0..3, 0..3, 0..3);

        // Level 0 is a preamble (continues a row from the previous chunk). The last row
        // (def 1) has no items.
        let rep = Some(vec![0, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..3, 0..4);

        // Same data, but the preamble is skipped.
        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..3, 3..4);
        check(0..2, 1..3, 1..4);

        // Preamble at level 0. Row 0 (level 1, def 1) has no items.
        // NOTE(review): only 3 levels produce items here but total_items is 4; the expectations
        // do not appear to depend on it — confirm this is intentional.
        let rep = Some(vec![0, 1, 1, 0]);
        let def = Some(vec![0, 1, 0, 0]);
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..2);
        check(0..2, 0..3, 0..4);

        // Same data, but the preamble is skipped.
        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..1, 1..2);
        check(1..2, 1..3, 2..4);
        check(0..2, 1..3, 1..4);

        // Preamble at level 0 with no definition levels, so every level is an item.
        let rep = Some(vec![0, 1, 0, 1]);
        let def: Option<Vec<u16>> = None;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..4, 0..4);

        // Same data, but the preamble is skipped.
        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..4, 3..4);
        check(0..2, 1..4, 1..4);

        // Nested lists (max_rep = 2).
        // - Rows start only where rep == 2 (levels 0, 2 and 5); rep 1 at level 4 stays inside row 1.
        // - Levels 1 and 2 have def > max_visible_def and produce no items.
        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
        let def = Some(vec![0, 1, 2, 0, 0, 0]);
        let max_rep = 2;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..3, 0..4, 0..6);
        check(0..1, 0..1, 0..2);
        check(1..2, 1..3, 2..5);
        check(2..3, 3..4, 5..6);

        // Two-level preamble (levels 0-1, level 1 has no item). An empty row range with
        // `Take` still yields the preamble.
        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
        let def = Some(vec![0, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..0, 0..1, 0..2);
        check(0..1, 0..3, 0..4);
        check(0..2, 0..4, 0..5);

        // Skipped preamble plus several levels without items (levels 0, 2 and 3). Row 2 covers
        // levels 5..7, which are items 2..4.
        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(2..3, 2..4, 5..7);
    }
5911
5912 #[test]
5913 fn test_slice_batch_data_and_rebase_offsets_u32() {
5914 let data = LanceBuffer::copy_slice(b"0123456789abcdefghij");
5915 let offsets = LanceBuffer::reinterpret_vec(vec![6_u32, 8_u32, 8_u32, 12_u32]);
5916
5917 let (sliced_data, normalized_offsets) =
5918 VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5919 .unwrap();
5920
5921 assert_eq!(sliced_data.as_ref(), b"6789ab");
5922 let normalized = normalized_offsets.borrow_to_typed_slice::<u32>();
5923 assert_eq!(normalized.as_ref(), &[0, 2, 2, 6]);
5924 }
5925
5926 #[test]
5927 fn test_slice_batch_data_and_rebase_offsets_u64() {
5928 let data = LanceBuffer::copy_slice(b"abcdefghijklmnopqrstuvwxyz");
5929 let offsets = LanceBuffer::reinterpret_vec(vec![10_u64, 12_u64, 16_u64, 20_u64]);
5930
5931 let (sliced_data, normalized_offsets) =
5932 VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 64)
5933 .unwrap();
5934
5935 assert_eq!(sliced_data.as_ref(), b"klmnopqrst");
5936 let normalized = normalized_offsets.borrow_to_typed_slice::<u64>();
5937 assert_eq!(normalized.as_ref(), &[0, 2, 6, 10]);
5938 }
5939
5940 #[test]
5941 fn test_slice_batch_data_and_rebase_offsets_rejects_invalid_offsets() {
5942 let data = LanceBuffer::copy_slice(b"abcd");
5943 let offsets = LanceBuffer::reinterpret_vec(vec![3_u32, 2_u32]);
5944
5945 let err = VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5946 .expect_err("offset end before start should error");
5947 assert!(err.to_string().contains("less than base"));
5948 }
5949
    /// Checks that `ChunkInstructions::schedule_instructions` turns user row ranges into
    /// per-chunk skip/take instructions. This covers rows that span a chunk boundary (a chunk's
    /// trailer continued by the next chunk's preamble).
    #[test]
    fn test_schedule_instructions() {
        // Repetition index for four chunks, decoded with 2 words per chunk.
        // NOTE(review): the meaning of each word pair is defined by `MiniBlockRepIndex`, not
        // visible here — confirm before relying on it.
        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);

        let check = |user_ranges, expected_instructions| {
            let instructions =
                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
            assert_eq!(instructions, expected_instructions);
        };

        // Reading all 14 rows visits every chunk.
        // - Chunks 0 and 2 end mid-row (take_trailer), so chunks 1 and 3 begin by taking
        //   their preamble.
        let expected_take_all = vec![
            ChunkInstructions {
                chunk_idx: 0,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 6,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 1,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 2,
                take_trailer: false,
            },
            ChunkInstructions {
                chunk_idx: 2,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 5,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 3,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 1,
                take_trailer: false,
            },
        ];

        check(&[0..14], expected_take_all.clone());

        // Adjacent single-row ranges must coalesce into the same instructions as one range.
        check(
            &[
                0..1,
                1..2,
                2..3,
                3..4,
                4..5,
                5..6,
                6..7,
                7..8,
                8..9,
                9..10,
                10..11,
                11..12,
                12..13,
                13..14,
            ],
            expected_take_all,
        );

        // Two disjoint ranges inside chunk 0 yield two instructions for the same chunk.
        check(
            &[0..1, 3..4],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 3,
                    rows_to_take: 1,
                    take_trailer: false,
                },
            ],
        );

        // Row 5 starts in chunk 0's trailer and finishes in chunk 1's preamble, so chunk 1 is
        // visited only for its preamble (rows_to_take: 0).
        check(
            &[5..6],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 5,
                    rows_to_take: 1,
                    take_trailer: true,
                },
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Take,
                    rows_to_skip: 0,
                    rows_to_take: 0,
                    take_trailer: false,
                },
            ],
        );

        // Rows 7..10 start mid chunk 1 (preamble skipped, one row skipped) and continue into
        // chunk 2.
        check(
            &[7..10],
            vec![
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Skip,
                    rows_to_skip: 1,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 2,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 2,
                    take_trailer: false,
                },
            ],
        );
    }
6084
6085 #[test]
    fn test_drain_instructions() {
        /// Drains `rows_desired` rows (plus any pending preamble) from the front of
        /// `instructions`, popping a chunk instruction whenever `drain_from_instruction`
        /// reports that the chunk was fully consumed.
        ///
        /// `need_preamble` and `skip_in_chunk` are carried across calls so consecutive
        /// batches resume where the previous one stopped.
        fn drain_from_instructions(
            instructions: &mut VecDeque<ChunkInstructions>,
            mut rows_desired: u64,
            need_preamble: &mut bool,
            skip_in_chunk: &mut u64,
        ) -> Vec<ChunkDrainInstructions> {
            let mut drain_instructions = Vec::with_capacity(instructions.len());
            // Loop until both the requested rows are satisfied and no preamble is pending;
            // `drain_from_instruction` decrements/updates all three counters in place.
            while rows_desired > 0 || *need_preamble {
                let (next_instructions, consumed_chunk) = instructions
                    .front()
                    .unwrap()
                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
                if consumed_chunk {
                    instructions.pop_front();
                }
                drain_instructions.push(next_instructions);
            }
            drain_instructions
        }

        // Repetition index with stride 2 (two u64 words per chunk).
        // NOTE(review): presumably each pair is (rows ending in chunk, trailing/partial info);
        // confirm against MiniBlockRepIndex::decode_from_bytes.
        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
        let user_ranges = vec![1..7, 10..14];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        // First batch of 4 rows is served entirely from the first scheduled chunk.
        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 4);
        assert_eq!(
            next_batch,
            vec![ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 4,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            }]
        );

        // Second batch of 4 rows finishes chunk 0 (skipping the 4 already taken), then spans
        // into chunks 1 and 2; chunk 1 starts with a preamble that must be taken.
        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 2);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 4,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                }
            ]
        );

        // Final batch of 2 rows drains the rest of chunk 2 and one row from chunk 3.
        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 2,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[3].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        // Second scenario: a single user range covering 0..28 over a three-chunk index.
        let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
        let user_ranges = vec![0..28];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        let next_batch =
            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 6,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 1);

        // Resuming in chunk 1: its preamble was already taken, so it is skipped this time.
        // The trailing zero-row entry for chunk 2 only takes that chunk's preamble.
        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 1,
                    preamble_action: PreambleAction::Skip,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 0,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);
    }
6259
    /// With `enable_cache: false`, `FullZipScheduler::initialize` must not touch storage:
    /// it returns `NoCachedPageData`, leaves `cached_state` unset and submits no requests.
    #[tokio::test]
    async fn test_fullzip_initialize_is_lazy() {
        use futures::{FutureExt, future::BoxFuture};
        use std::ops::Range;
        use std::sync::Mutex;

        /// In-memory `EncodingsIo` that serves slices of `data` and records every
        /// batch of requested ranges so the test can inspect issued I/O.
        #[derive(Debug, Clone)]
        struct RecordingScheduler {
            data: bytes::Bytes,
            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
        }

        impl RecordingScheduler {
            fn new(data: bytes::Bytes) -> Self {
                Self {
                    data,
                    requests: Arc::new(Mutex::new(Vec::new())),
                }
            }

            /// Snapshot of all request batches submitted so far.
            fn requests(&self) -> Vec<Vec<Range<u64>>> {
                self.requests.lock().unwrap().clone()
            }
        }

        impl crate::EncodingsIo for RecordingScheduler {
            fn submit_request(
                &self,
                ranges: Vec<Range<u64>>,
                _priority: u64,
            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
                self.requests.lock().unwrap().push(ranges.clone());
                let data = ranges
                    .into_iter()
                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
                    .collect::<Vec<_>>();
                std::future::ready(Ok(data)).boxed()
            }
        }

        /// Placeholder decompressor; this test never decodes values.
        #[derive(Debug)]
        struct TestFixedDecompressor;

        impl FixedPerValueDecompressor for TestFixedDecompressor {
            fn decompress(
                &self,
                _data: FixedWidthDataBlock,
                _num_rows: u64,
            ) -> crate::Result<DataBlock> {
                unimplemented!("Test decompressor")
            }

            fn bits_per_value(&self) -> u64 {
                32
            }
        }

        // 16 KiB of zeroed backing storage; contents are irrelevant because no I/O is expected.
        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(vec![
            0;
            16 * 1024
        ])));
        let mut scheduler = FullZipScheduler {
            data_buf_position: 0,
            data_buf_size: 4096,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value: 4,
            }),
            priority: 0,
            rows_in_page: 100,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            // Caching disabled: the rep index must not be eagerly loaded.
            enable_cache: false,
        };

        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        let cached_data = scheduler.initialize(&io_dyn).await.unwrap();

        assert!(
            cached_data
                .as_arc_any()
                .downcast_ref::<super::NoCachedPageData>()
                .is_some(),
            "FullZip initialize should not eagerly load repetition index data"
        );
        assert!(scheduler.cached_state.is_none());
        assert!(
            io.requests().is_empty(),
            "FullZip initialize should not issue any I/O"
        );
    }
6358
6359 #[tokio::test]
6360 async fn test_fullzip_read_source_slices_prefetched_page() {
6361 let page_start = 200_u64;
6362 let page_data = LanceBuffer::copy_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
6363 let source = FullZipReadSource::PrefetchedPage {
6364 base_offset: page_start,
6365 data: page_data,
6366 };
6367 let ranges = vec![
6368 page_start..(page_start + 3),
6369 (page_start + 4)..(page_start + 8),
6370 ];
6371 let mut data = source.fetch(&ranges, 0).await.unwrap();
6372 assert_eq!(data.pop_front().unwrap().as_ref(), &[0, 1, 2]);
6373 assert_eq!(data.pop_front().unwrap().as_ref(), &[4, 5, 6, 7]);
6374 }
6375
6376 #[tokio::test]
6377 async fn test_fullzip_initialize_caches_rep_index_when_enabled() {
6378 use futures::{FutureExt, future::BoxFuture};
6379 use std::ops::Range;
6380 use std::sync::Mutex;
6381
6382 #[derive(Debug, Clone)]
6383 struct RecordingScheduler {
6384 data: bytes::Bytes,
6385 requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6386 }
6387
6388 impl RecordingScheduler {
6389 fn new(data: bytes::Bytes) -> Self {
6390 Self {
6391 data,
6392 requests: Arc::new(Mutex::new(Vec::new())),
6393 }
6394 }
6395
6396 fn requests(&self) -> Vec<Vec<Range<u64>>> {
6397 self.requests.lock().unwrap().clone()
6398 }
6399 }
6400
6401 impl crate::EncodingsIo for RecordingScheduler {
6402 fn submit_request(
6403 &self,
6404 ranges: Vec<Range<u64>>,
6405 _priority: u64,
6406 ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6407 self.requests.lock().unwrap().push(ranges.clone());
6408 let data = ranges
6409 .into_iter()
6410 .map(|range| self.data.slice(range.start as usize..range.end as usize))
6411 .collect::<Vec<_>>();
6412 std::future::ready(Ok(data)).boxed()
6413 }
6414 }
6415
6416 #[derive(Debug)]
6417 struct TestFixedDecompressor;
6418
6419 impl FixedPerValueDecompressor for TestFixedDecompressor {
6420 fn decompress(
6421 &self,
6422 _data: FixedWidthDataBlock,
6423 _num_rows: u64,
6424 ) -> crate::Result<DataBlock> {
6425 unimplemented!("Test decompressor")
6426 }
6427
6428 fn bits_per_value(&self) -> u64 {
6429 32
6430 }
6431 }
6432
6433 let rows_in_page = 100_u64;
6434 let bytes_per_value = 4_u64;
6435 let rep_start = 1000_u64;
6436 let rep_size = ((rows_in_page + 1) * bytes_per_value) as usize;
6437 let mut data = vec![0_u8; 16 * 1024];
6438 data[rep_start as usize..rep_start as usize + rep_size].fill(7);
6439 let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(data)));
6440
6441 let mut scheduler = FullZipScheduler {
6442 data_buf_position: 0,
6443 data_buf_size: 4096,
6444 rep_index: Some(FullZipRepIndexDetails {
6445 buf_position: rep_start,
6446 bytes_per_value,
6447 }),
6448 priority: 0,
6449 rows_in_page,
6450 bits_per_offset: 32,
6451 details: Arc::new(FullZipDecodeDetails {
6452 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6453 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6454 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6455 max_rep: 0,
6456 max_visible_def: 0,
6457 }),
6458 cached_state: None,
6459 enable_cache: true,
6460 };
6461
6462 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6463 let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6464 assert!(
6465 cached_data
6466 .as_arc_any()
6467 .downcast_ref::<FullZipCacheableState>()
6468 .is_some()
6469 );
6470 assert!(scheduler.cached_state.is_some());
6471 assert_eq!(
6472 io.requests(),
6473 vec![vec![
6474 rep_start..(rep_start + (rows_in_page + 1) * bytes_per_value)
6475 ]]
6476 );
6477 }
6478
    /// Scheduling the full row range of a page must read the data buffer in one request and
    /// must not issue any repetition-index I/O, either while scheduling or while awaiting
    /// the resulting decoder future.
    #[tokio::test]
    async fn test_fullzip_full_page_bypasses_rep_index_io() {
        use futures::{FutureExt, future::BoxFuture};
        use std::ops::Range;
        use std::sync::Mutex;

        /// In-memory `EncodingsIo` that serves slices of `data` and records every
        /// batch of requested ranges.
        #[derive(Debug, Clone)]
        struct RecordingScheduler {
            data: bytes::Bytes,
            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
        }

        impl RecordingScheduler {
            fn new(data: bytes::Bytes) -> Self {
                Self {
                    data,
                    requests: Arc::new(Mutex::new(Vec::new())),
                }
            }

            /// Snapshot of all request batches submitted so far.
            fn requests(&self) -> Vec<Vec<Range<u64>>> {
                self.requests.lock().unwrap().clone()
            }
        }

        impl crate::EncodingsIo for RecordingScheduler {
            fn submit_request(
                &self,
                ranges: Vec<Range<u64>>,
                _priority: u64,
            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
                self.requests.lock().unwrap().push(ranges.clone());
                let data = ranges
                    .into_iter()
                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
                    .collect::<Vec<_>>();
                std::future::ready(Ok(data)).boxed()
            }
        }

        /// Placeholder decompressor; the awaited decoder future in this test does not
        /// call it (it would panic via `unimplemented!` if it did).
        #[derive(Debug)]
        struct TestFixedDecompressor;

        impl FixedPerValueDecompressor for TestFixedDecompressor {
            fn decompress(
                &self,
                _data: FixedWidthDataBlock,
                _num_rows: u64,
            ) -> crate::Result<DataBlock> {
                unimplemented!("Test decompressor")
            }

            fn bits_per_value(&self) -> u64 {
                32
            }
        }

        let rows_in_page = 100_u64;
        let data_start = 256_u64;
        let data_size = 500_u64;
        let rep_start = 4096_u64;
        let bytes_per_value = 4_u64;

        // Populate a plausible rep index (entry i = i * 5 as a 4-byte LE u32) so that, if the
        // scheduler did read it, it would see valid offsets; the test asserts it does not.
        let mut bytes = vec![0_u8; 16 * 1024];
        for i in 0..=rows_in_page {
            let offset = (i * 5) as u32;
            let pos = rep_start as usize + (i * bytes_per_value) as usize;
            bytes[pos..pos + 4].copy_from_slice(&offset.to_le_bytes());
        }
        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(bytes)));

        let scheduler = FullZipScheduler {
            data_buf_position: data_start,
            data_buf_size: data_size,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: rep_start,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: false,
        };

        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        // Request every row in the page.
        let tasks = scheduler
            .schedule_ranges_rep(
                &[0..rows_in_page],
                &io_dyn,
                FullZipRepIndexDetails {
                    buf_position: rep_start,
                    bytes_per_value,
                },
            )
            .unwrap();

        // Exactly one request, covering the entire data buffer.
        let requests = io.requests();
        assert_eq!(requests.len(), 1);
        assert_eq!(requests[0], vec![data_start..(data_start + data_size)]);

        // Awaiting the decoder must not trigger a deferred rep-index read.
        let _ = tasks.into_iter().next().unwrap().decoder_fut.await.unwrap();
        let requests_after_await = io.requests();
        assert_eq!(
            requests_after_await.len(),
            1,
            "full page path should not issue rep-index I/O"
        );
    }
6595
6596 #[tokio::test]
6598 async fn test_fuzz_issue_4492_empty_rep_values() {
6599 use lance_datagen::{RowCount, Seed, array, gen_batch};
6600
6601 let seed = 1823859942947654717u64;
6602 let num_rows = 2741usize;
6603
6604 let batch_gen = gen_batch().with_seed(Seed::from(seed));
6606 let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
6607 let list_generator = array::rand_list_any(base_generator, false);
6608
6609 let batch = batch_gen
6610 .anon_col(list_generator)
6611 .into_batch_rows(RowCount::from(num_rows as u64))
6612 .unwrap();
6613
6614 let list_array = batch.column(0).clone();
6615
6616 let mut metadata = HashMap::new();
6618 metadata.insert(
6619 STRUCTURAL_ENCODING_META_KEY.to_string(),
6620 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6621 );
6622
6623 let test_cases = TestCases::default()
6624 .with_min_file_version(LanceFileVersion::V2_1)
6625 .with_batch_size(100)
6626 .with_range(0..num_rows.min(500) as u64)
6627 .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);
6628
6629 check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
6630 }
6631
6632 async fn test_minichunk_size_helper(
6633 string_data: Vec<Option<String>>,
6634 minichunk_size: u64,
6635 file_version: LanceFileVersion,
6636 ) {
6637 use crate::constants::MINICHUNK_SIZE_META_KEY;
6638 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6639 use arrow_array::{ArrayRef, StringArray};
6640 use std::sync::Arc;
6641
6642 let string_array: ArrayRef = Arc::new(StringArray::from(string_data));
6643
6644 let mut metadata = HashMap::new();
6645 metadata.insert(
6646 MINICHUNK_SIZE_META_KEY.to_string(),
6647 minichunk_size.to_string(),
6648 );
6649 metadata.insert(
6650 STRUCTURAL_ENCODING_META_KEY.to_string(),
6651 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6652 );
6653
6654 let test_cases = TestCases::default()
6655 .with_min_file_version(file_version)
6656 .with_batch_size(1000);
6657
6658 check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await;
6659 }
6660
6661 #[tokio::test]
6662 async fn test_minichunk_size_roundtrip() {
6663 let mut string_data = Vec::new();
6665 for i in 0..100 {
6666 string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6667 }
6668 test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await;
6670 }
6671
6672 #[tokio::test]
6673 async fn test_minichunk_size_128kb_v2_2() {
6674 let mut string_data = Vec::new();
6676 for i in 0..10000 {
6678 string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6679 }
6680 test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6681 }
6682
6683 #[tokio::test]
6684 async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
6685 let mut string_data = Vec::new();
6686 for i in 0..10000 {
6688 string_data.push(Some(format!("t_{}", i)));
6689 }
6690 test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6691 }
6692
6693 #[tokio::test]
6694 async fn test_large_dictionary_general_compression() {
6695 use arrow_array::{ArrayRef, StringArray};
6696 use std::collections::HashMap;
6697 use std::sync::Arc;
6698
6699 let unique_values: Vec<String> = (0..100)
6702 .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
6703 .collect();
6704
6705 let repeated_strings: Vec<_> = unique_values
6707 .iter()
6708 .cycle()
6709 .take(100_000)
6710 .map(|s| Some(s.as_str()))
6711 .collect();
6712
6713 let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
6714
6715 let test_cases = TestCases::default()
6717 .with_min_file_version(LanceFileVersion::V2_2)
6718 .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
6719 assert_eq!(cols.len(), 1);
6720 let col = &cols[0];
6721
6722 if let Some(PageEncoding::Structural(page_layout)) =
6724 &col.final_pages.first().map(|p| &p.description)
6725 && let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
6726 &page_layout.layout
6727 && let Some(dictionary_encoding) = &mini_block.dictionary
6728 {
6729 match dictionary_encoding.compression.as_ref() {
6730 Some(Compression::General(general)) => {
6731 let compression = general.compression.as_ref().unwrap();
6733 assert!(
6734 compression.scheme()
6735 == pb21::CompressionScheme::CompressionAlgorithmLz4
6736 || compression.scheme()
6737 == pb21::CompressionScheme::CompressionAlgorithmZstd,
6738 "Expected LZ4 or Zstd compression for large dictionary"
6739 );
6740 }
6741 _ => panic!("Expected General compression for large dictionary"),
6742 }
6743 }
6744 }));
6745
6746 check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
6747 }
6748
6749 fn dictionary_encoding_from_page(
6750 page: &crate::encoder::EncodedPage,
6751 ) -> &crate::format::pb21::CompressiveEncoding {
6752 let PageEncoding::Structural(layout) = &page.description else {
6753 panic!("Expected structural page encoding");
6754 };
6755 let pb21::page_layout::Layout::MiniBlockLayout(layout) = layout.layout.as_ref().unwrap()
6756 else {
6757 panic!("Expected mini-block layout");
6758 };
6759 layout
6760 .dictionary
6761 .as_ref()
6762 .unwrap_or_else(|| panic!("Expected dictionary encoding"))
6763 }
6764
6765 async fn encode_variable_dict_page(
6766 metadata: HashMap<String, String>,
6767 ) -> crate::encoder::EncodedPage {
6768 use arrow_array::types::Int32Type;
6769 use arrow_array::{ArrayRef, DictionaryArray, Int32Array, StringArray};
6770
6771 let values = Arc::new(StringArray::from(
6772 (0..128)
6773 .map(|i| format!("value_{i:04}_{}", "x".repeat(256)))
6774 .collect::<Vec<_>>(),
6775 )) as ArrayRef;
6776 let keys = Int32Array::from_iter_values((0..20_000).map(|i| i % 128));
6777 let dict_array =
6778 Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap()) as ArrayRef;
6779
6780 let field = arrow_schema::Field::new(
6781 "dict_col",
6782 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
6783 false,
6784 )
6785 .with_metadata(metadata);
6786
6787 encode_first_page(field, dict_array, LanceFileVersion::V2_2).await
6788 }
6789
6790 async fn encode_auto_fixed_dict_page(
6791 metadata: HashMap<String, String>,
6792 ) -> crate::encoder::EncodedPage {
6793 use arrow_array::{ArrayRef, Decimal128Array};
6794
6795 let values = (0..20_000)
6797 .map(|i| match i % 3 {
6798 0 => 10_i128,
6799 1 => 20_i128,
6800 _ => 30_i128,
6801 })
6802 .collect::<Vec<_>>();
6803 let decimal = Decimal128Array::from_iter_values(values)
6804 .with_precision_and_scale(38, 0)
6805 .unwrap();
6806 let decimal = Arc::new(decimal) as ArrayRef;
6807
6808 let mut field_metadata = metadata;
6809 field_metadata.insert(
6811 "lance-encoding:dict-size-ratio".to_string(),
6812 "0.99".to_string(),
6813 );
6814 let field = arrow_schema::Field::new("fixed_col", DataType::Decimal128(38, 0), false)
6815 .with_metadata(field_metadata);
6816
6817 encode_first_page(field, decimal, LanceFileVersion::V2_2).await
6818 }
6819
6820 #[tokio::test]
6821 async fn test_dict_values_general_compression_default_lz4_for_variable_dict_values() {
6822 let page = encode_variable_dict_page(HashMap::new()).await;
6823 let dictionary_encoding = dictionary_encoding_from_page(&page);
6824 let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6825 panic!("Expected General compression for dictionary values");
6826 };
6827 let compression = general.compression.as_ref().unwrap();
6828 assert_eq!(
6829 compression.scheme(),
6830 pb21::CompressionScheme::CompressionAlgorithmLz4
6831 );
6832 }
6833
6834 #[tokio::test]
6835 async fn test_dict_values_general_compression_default_lz4_for_fixed_dict_values() {
6836 let page = encode_auto_fixed_dict_page(HashMap::new()).await;
6837 let dictionary_encoding = dictionary_encoding_from_page(&page);
6838 let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6839 panic!("Expected General compression for dictionary values");
6840 };
6841 let compression = general.compression.as_ref().unwrap();
6842 assert_eq!(
6843 compression.scheme(),
6844 pb21::CompressionScheme::CompressionAlgorithmLz4
6845 );
6846 }
6847
6848 #[tokio::test]
6849 async fn test_dict_values_general_compression_zstd() {
6850 let mut metadata = HashMap::new();
6851 metadata.insert(
6852 DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6853 "zstd".to_string(),
6854 );
6855 let page = encode_variable_dict_page(metadata).await;
6856 let dictionary_encoding = dictionary_encoding_from_page(&page);
6857 let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6858 panic!("Expected General compression for dictionary values");
6859 };
6860 let compression = general.compression.as_ref().unwrap();
6861 assert_eq!(
6862 compression.scheme(),
6863 pb21::CompressionScheme::CompressionAlgorithmZstd
6864 );
6865 }
6866
6867 #[tokio::test]
6868 async fn test_dict_values_general_compression_none() {
6869 let mut metadata = HashMap::new();
6870 metadata.insert(
6871 DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6872 "none".to_string(),
6873 );
6874 let page = encode_variable_dict_page(metadata).await;
6875 let dictionary_encoding = dictionary_encoding_from_page(&page);
6876 assert!(
6877 !matches!(
6878 dictionary_encoding.compression.as_ref(),
6879 Some(Compression::General(_))
6880 ),
6881 "Expected dictionary values to avoid General compression"
6882 );
6883 }
6884
6885 #[test]
6886 fn test_resolve_dict_values_compression_metadata_defaults_to_lz4() {
6887 let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6888 &HashMap::new(),
6889 None,
6890 None,
6891 );
6892 assert_eq!(metadata.get(COMPRESSION_META_KEY), Some(&"lz4".to_string()),);
6893 assert!(!metadata.contains_key(COMPRESSION_LEVEL_META_KEY));
6894 }
6895
6896 #[test]
6897 fn test_resolve_dict_values_compression_metadata_metadata_overrides_env() {
6898 let field_metadata = HashMap::from([
6899 (
6900 DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6901 "none".to_string(),
6902 ),
6903 (
6904 DICT_VALUES_COMPRESSION_LEVEL_META_KEY.to_string(),
6905 "7".to_string(),
6906 ),
6907 ]);
6908 let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6909 &field_metadata,
6910 Some("zstd".to_string()),
6911 Some("3".to_string()),
6912 );
6913 assert_eq!(
6914 metadata.get(COMPRESSION_META_KEY),
6915 Some(&"none".to_string()),
6916 );
6917 assert_eq!(
6918 metadata.get(COMPRESSION_LEVEL_META_KEY),
6919 Some(&"7".to_string()),
6920 );
6921 }
6922
6923 #[test]
6924 fn test_resolve_dict_values_compression_metadata_env_fallback() {
6925 let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6926 &HashMap::new(),
6927 Some("zstd".to_string()),
6928 Some("9".to_string()),
6929 );
6930 assert_eq!(
6931 metadata.get(COMPRESSION_META_KEY),
6932 Some(&"zstd".to_string()),
6933 );
6934 assert_eq!(
6935 metadata.get(COMPRESSION_LEVEL_META_KEY),
6936 Some(&"9".to_string()),
6937 );
6938 }
6939
6940 #[tokio::test]
6941 async fn test_dictionary_encode_int64() {
6942 use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
6943 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6944 use crate::version::LanceFileVersion;
6945 use arrow_array::{ArrayRef, Int64Array};
6946 use std::collections::HashMap;
6947 use std::sync::Arc;
6948
6949 let values = (0..1000)
6951 .map(|i| match i % 3 {
6952 0 => 10i64,
6953 1 => 20i64,
6954 _ => 30i64,
6955 })
6956 .collect::<Vec<_>>();
6957 let array = Arc::new(Int64Array::from(values)) as ArrayRef;
6958
6959 let mut metadata = HashMap::new();
6960 metadata.insert(
6961 STRUCTURAL_ENCODING_META_KEY.to_string(),
6962 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6963 );
6964 metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
6965
6966 let test_cases = TestCases::default()
6967 .with_min_file_version(LanceFileVersion::V2_2)
6968 .with_batch_size(1000)
6969 .with_range(0..1000)
6970 .with_indices(vec![0, 1, 10, 999])
6971 .with_expected_encoding("dictionary");
6972
6973 check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
6974 }
6975
6976 #[tokio::test]
6977 async fn test_dictionary_encode_float64() {
6978 use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
6979 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6980 use crate::version::LanceFileVersion;
6981 use arrow_array::{ArrayRef, Float64Array};
6982 use std::collections::HashMap;
6983 use std::sync::Arc;
6984
6985 let values = (0..1000)
6987 .map(|i| match i % 3 {
6988 0 => 0.1f64,
6989 1 => 0.2f64,
6990 _ => 0.3f64,
6991 })
6992 .collect::<Vec<_>>();
6993 let array = Arc::new(Float64Array::from(values)) as ArrayRef;
6994
6995 let mut metadata = HashMap::new();
6996 metadata.insert(
6997 STRUCTURAL_ENCODING_META_KEY.to_string(),
6998 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6999 );
7000 metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
7001
7002 let test_cases = TestCases::default()
7003 .with_min_file_version(LanceFileVersion::V2_2)
7004 .with_batch_size(1000)
7005 .with_range(0..1000)
7006 .with_indices(vec![0, 1, 10, 999])
7007 .with_expected_encoding("dictionary");
7008
7009 check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
7010 }
7011
7012 #[test]
7013 fn test_miniblock_dictionary_out_of_line_bitpacking_decode() {
7014 let rows = 10_000;
7015 let unique_values = 2_000;
7016
7017 let dictionary_encoding =
7018 ProtobufUtils21::out_of_line_bitpacking(64, ProtobufUtils21::flat(11, None));
7019 let layout = pb21::MiniBlockLayout {
7020 rep_compression: None,
7021 def_compression: None,
7022 value_compression: Some(ProtobufUtils21::flat(64, None)),
7023 dictionary: Some(dictionary_encoding),
7024 num_dictionary_items: unique_values,
7025 layers: vec![pb21::RepDefLayer::RepdefAllValidItem as i32],
7026 num_buffers: 1,
7027 repetition_index_depth: 0,
7028 num_items: rows,
7029 has_large_chunk: false,
7030 };
7031
7032 let buffer_offsets_and_sizes = vec![(0, 0), (0, 0), (0, 0)];
7033 let scheduler = super::MiniBlockScheduler::try_new(
7034 &buffer_offsets_and_sizes,
7035 0,
7036 rows,
7037 &layout,
7038 &DefaultDecompressionStrategy::default(),
7039 )
7040 .unwrap();
7041
7042 let dictionary = scheduler.dictionary.unwrap();
7043 assert_eq!(dictionary.num_dictionary_items, unique_values);
7044 assert_eq!(
7045 dictionary.dictionary_data_alignment,
7046 crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
7047 );
7048 }
7049
7050 fn create_test_fixed_data_block(
7052 num_values: u64,
7053 cardinality: u64,
7054 bits_per_value: u64,
7055 ) -> DataBlock {
7056 assert!(cardinality > 0);
7057 assert!(cardinality <= num_values);
7058 let block_info = BlockInfo::default();
7059
7060 assert_eq!(bits_per_value % 8, 0);
7061 let data = match bits_per_value {
7062 32 => {
7063 let values = (0..num_values)
7064 .map(|i| (i % cardinality) as u32)
7065 .collect::<Vec<_>>();
7066 crate::buffer::LanceBuffer::reinterpret_vec(values)
7067 }
7068 64 => {
7069 let values = (0..num_values).map(|i| i % cardinality).collect::<Vec<_>>();
7070 crate::buffer::LanceBuffer::reinterpret_vec(values)
7071 }
7072 128 => {
7073 let values = (0..num_values)
7074 .map(|i| (i % cardinality) as u128)
7075 .collect::<Vec<_>>();
7076 crate::buffer::LanceBuffer::reinterpret_vec(values)
7077 }
7078 _ => unreachable!(),
7079 };
7080 DataBlock::FixedWidth(FixedWidthDataBlock {
7081 bits_per_value,
7082 data,
7083 num_values,
7084 block_info,
7085 })
7086 }
7087
7088 fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
7090 use arrow_array::StringArray;
7091
7092 assert!(cardinality <= num_values && cardinality > 0);
7093
7094 let mut values = Vec::with_capacity(num_values as usize);
7095 for i in 0..num_values {
7096 values.push(format!("value_{:016}", i % cardinality));
7097 }
7098
7099 let array = StringArray::from(values);
7100 DataBlock::from_array(Arc::new(array) as ArrayRef)
7101 }
7102
7103 #[test]
7104 fn test_should_dictionary_encode() {
7105 use crate::constants::DICT_SIZE_RATIO_META_KEY;
7106 use lance_core::datatypes::Field as LanceField;
7107
7108 let block = create_test_variable_width_block(1000, 10);
7110
7111 let mut metadata = HashMap::new();
7112 metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
7113 let arrow_field =
7114 arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
7115 let field = LanceField::try_from(&arrow_field).unwrap();
7116
7117 let result = PrimitiveStructuralEncoder::should_dictionary_encode(
7118 &block,
7119 &field,
7120 LanceFileVersion::V2_1,
7121 );
7122
7123 assert!(
7124 result.is_some(),
7125 "Should use dictionary encode based on size"
7126 );
7127 }
7128
7129 #[test]
7130 fn test_should_not_dictionary_encode_unsupported_bits() {
7131 use crate::constants::DICT_SIZE_RATIO_META_KEY;
7132 use lance_core::datatypes::Field as LanceField;
7133
7134 let block = create_test_fixed_data_block(1000, 1000, 32);
7135
7136 let mut metadata = HashMap::new();
7137 metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
7138 let arrow_field =
7139 arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
7140 let field = LanceField::try_from(&arrow_field).unwrap();
7141
7142 let result = PrimitiveStructuralEncoder::should_dictionary_encode(
7143 &block,
7144 &field,
7145 LanceFileVersion::V2_1,
7146 );
7147
7148 assert!(
7149 result.is_none(),
7150 "Should not use dictionary encode for unsupported bit width"
7151 );
7152 }
7153
7154 #[test]
7155 fn test_should_not_dictionary_encode_near_unique_sample() {
7156 use crate::constants::DICT_SIZE_RATIO_META_KEY;
7157 use lance_core::datatypes::Field as LanceField;
7158
7159 let num_values = 5000;
7160 let block = create_test_variable_width_block(num_values, num_values);
7161
7162 let mut metadata = HashMap::new();
7163 metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "1.0".to_string());
7164 let arrow_field =
7165 arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
7166 let field = LanceField::try_from(&arrow_field).unwrap();
7167
7168 let result = PrimitiveStructuralEncoder::should_dictionary_encode(
7169 &block,
7170 &field,
7171 LanceFileVersion::V2_1,
7172 );
7173
7174 assert!(
7175 result.is_none(),
7176 "Should not probe dictionary encoding for near-unique data"
7177 );
7178 }
7179
7180 async fn encode_first_page(
7181 field: arrow_schema::Field,
7182 array: ArrayRef,
7183 version: LanceFileVersion,
7184 ) -> crate::encoder::EncodedPage {
7185 use crate::encoder::{
7186 ColumnIndexSequence, EncodingOptions, MIN_PAGE_BUFFER_ALIGNMENT, OutOfLineBuffers,
7187 default_encoding_strategy,
7188 };
7189 use crate::repdef::RepDefBuilder;
7190
7191 let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap();
7192 let encoding_strategy = default_encoding_strategy(version);
7193 let mut column_index_seq = ColumnIndexSequence::default();
7194 let encoding_options = EncodingOptions {
7195 cache_bytes_per_column: 1,
7196 max_page_bytes: 32 * 1024 * 1024,
7197 keep_original_array: true,
7198 buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
7199 version,
7200 };
7201
7202 let mut encoder = encoding_strategy
7203 .create_field_encoder(
7204 encoding_strategy.as_ref(),
7205 &lance_field,
7206 &mut column_index_seq,
7207 &encoding_options,
7208 )
7209 .unwrap();
7210
7211 let mut external_buffers = OutOfLineBuffers::new(0, MIN_PAGE_BUFFER_ALIGNMENT);
7212 let repdef = RepDefBuilder::default();
7213 let num_rows = array.len() as u64;
7214 let mut pages = Vec::new();
7215 for task in encoder
7216 .maybe_encode(array, &mut external_buffers, repdef, 0, num_rows)
7217 .unwrap()
7218 {
7219 pages.push(task.await.unwrap());
7220 }
7221 for task in encoder.flush(&mut external_buffers).unwrap() {
7222 pages.push(task.await.unwrap());
7223 }
7224 pages.into_iter().next().unwrap()
7225 }
7226
7227 #[tokio::test]
7228 async fn test_constant_layout_out_of_line_fixed_size_binary_v2_2() {
7229 use crate::format::pb21::page_layout::Layout;
7230
7231 let val = vec![0xABu8; 33];
7232 let arr: ArrayRef = Arc::new(
7233 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
7234 std::iter::repeat_n(Some(val.as_slice()), 256),
7235 33,
7236 )
7237 .unwrap(),
7238 );
7239 let field = arrow_schema::Field::new("c", DataType::FixedSizeBinary(33), true);
7240 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7241
7242 let PageEncoding::Structural(layout) = &page.description else {
7243 panic!("Expected structural encoding");
7244 };
7245 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7246 panic!("Expected constant layout in slot 2");
7247 };
7248 assert!(layout.inline_value.is_none());
7249 assert_eq!(page.data.len(), 1);
7250
7251 let test_cases = TestCases::default()
7252 .with_min_file_version(LanceFileVersion::V2_2)
7253 .with_max_file_version(LanceFileVersion::V2_2)
7254 .with_page_sizes(vec![4096]);
7255 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7256 }
7257
7258 #[tokio::test]
7259 async fn test_constant_layout_out_of_line_utf8_v2_2() {
7260 use crate::format::pb21::page_layout::Layout;
7261
7262 let arr: ArrayRef = Arc::new(arrow_array::StringArray::from_iter_values(
7263 std::iter::repeat_n("hello", 512),
7264 ));
7265 let field = arrow_schema::Field::new("c", DataType::Utf8, true);
7266 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7267
7268 let PageEncoding::Structural(layout) = &page.description else {
7269 panic!("Expected structural encoding");
7270 };
7271 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7272 panic!("Expected constant layout in slot 2");
7273 };
7274 assert!(layout.inline_value.is_none());
7275 assert_eq!(page.data.len(), 1);
7276
7277 let test_cases = TestCases::default()
7278 .with_min_file_version(LanceFileVersion::V2_2)
7279 .with_max_file_version(LanceFileVersion::V2_2)
7280 .with_page_sizes(vec![4096]);
7281 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7282 }
7283
7284 #[tokio::test]
7285 async fn test_constant_layout_nullable_item_v2_2() {
7286 use crate::format::pb21::page_layout::Layout;
7287
7288 let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![
7289 Some(7),
7290 None,
7291 Some(7),
7292 None,
7293 Some(7),
7294 ]));
7295 let field = arrow_schema::Field::new("c", DataType::Int32, true);
7296 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7297
7298 let PageEncoding::Structural(layout) = &page.description else {
7299 panic!("Expected structural encoding");
7300 };
7301 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7302 panic!("Expected constant layout in slot 2");
7303 };
7304 assert!(layout.inline_value.is_some());
7305 assert_eq!(page.data.len(), 2);
7306
7307 let test_cases = TestCases::default()
7308 .with_min_file_version(LanceFileVersion::V2_2)
7309 .with_max_file_version(LanceFileVersion::V2_2)
7310 .with_page_sizes(vec![4096]);
7311 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7312 }
7313
7314 #[tokio::test]
7315 async fn test_constant_layout_list_repdef_v2_2() {
7316 use crate::format::pb21::page_layout::Layout;
7317 use arrow_array::builder::{Int32Builder, ListBuilder};
7318
7319 let mut builder = ListBuilder::new(Int32Builder::new());
7320 builder.values().append_value(7);
7321 builder.values().append_null();
7322 builder.values().append_value(7);
7323 builder.append(true);
7324
7325 builder.append(true);
7326
7327 builder.values().append_value(7);
7328 builder.append(true);
7329
7330 builder.append_null();
7331
7332 let arr: ArrayRef = Arc::new(builder.finish());
7333 let field = arrow_schema::Field::new(
7334 "c",
7335 DataType::List(Arc::new(arrow_schema::Field::new(
7336 "item",
7337 DataType::Int32,
7338 true,
7339 ))),
7340 true,
7341 );
7342 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7343
7344 let PageEncoding::Structural(layout) = &page.description else {
7345 panic!("Expected structural encoding");
7346 };
7347 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7348 panic!("Expected constant layout in slot 2");
7349 };
7350 assert!(layout.inline_value.is_some());
7351 assert_eq!(page.data.len(), 2);
7352
7353 let test_cases = TestCases::default()
7354 .with_min_file_version(LanceFileVersion::V2_2)
7355 .with_max_file_version(LanceFileVersion::V2_2)
7356 .with_page_sizes(vec![4096]);
7357 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7358 }
7359
7360 #[tokio::test]
7361 async fn test_constant_layout_fixed_size_list_not_used_v2_2() {
7362 use crate::format::pb21::page_layout::Layout;
7363 use arrow_array::builder::{FixedSizeListBuilder, Int32Builder};
7364
7365 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
7366 for _ in 0..64 {
7367 builder.values().append_value(1);
7368 builder.values().append_null();
7369 builder.values().append_value(3);
7370 builder.append(true);
7371 }
7372 let arr: ArrayRef = Arc::new(builder.finish());
7373 let field = arrow_schema::Field::new(
7374 "c",
7375 DataType::FixedSizeList(
7376 Arc::new(arrow_schema::Field::new("item", DataType::Int32, true)),
7377 3,
7378 ),
7379 true,
7380 );
7381 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7382
7383 if let PageEncoding::Structural(layout) = &page.description {
7384 assert!(
7385 !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7386 "FixedSizeList should not use constant layout yet"
7387 );
7388 }
7389
7390 let test_cases = TestCases::default()
7391 .with_min_file_version(LanceFileVersion::V2_2)
7392 .with_max_file_version(LanceFileVersion::V2_2)
7393 .with_page_sizes(vec![4096]);
7394 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7395 }
7396
7397 #[tokio::test]
7398 async fn test_constant_layout_not_written_before_v2_2() {
7399 use crate::format::pb21::page_layout::Layout;
7400
7401 let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![7; 1024]));
7402 let field = arrow_schema::Field::new("c", DataType::Int32, true);
7403 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_1).await;
7404
7405 let PageEncoding::Structural(layout) = &page.description else {
7406 return;
7407 };
7408 assert!(
7409 !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7410 "Should not emit constant layout before v2.2"
7411 );
7412
7413 let test_cases = TestCases::default()
7414 .with_min_file_version(LanceFileVersion::V2_1)
7415 .with_max_file_version(LanceFileVersion::V2_1)
7416 .with_page_sizes(vec![4096]);
7417 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7418 }
7419
7420 #[tokio::test]
7421 async fn test_all_null_constant_layout_still_works_v2_2() {
7422 use crate::format::pb21::page_layout::Layout;
7423
7424 let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![None, None, None]));
7425 let field = arrow_schema::Field::new("c", DataType::Int32, true);
7426 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7427
7428 let PageEncoding::Structural(layout) = &page.description else {
7429 panic!("Expected structural encoding");
7430 };
7431 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7432 panic!("Expected layout in slot 2");
7433 };
7434 assert!(layout.inline_value.is_none());
7435 assert_eq!(page.data.len(), 0);
7436
7437 let test_cases = TestCases::default()
7438 .with_min_file_version(LanceFileVersion::V2_2)
7439 .with_max_file_version(LanceFileVersion::V2_2)
7440 .with_page_sizes(vec![4096]);
7441 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7442 }
7443
7444 #[test]
7445 fn test_encode_decode_complex_all_null_vals_roundtrip() {
7446 use crate::compression::{
7447 DecompressionStrategy, DefaultCompressionStrategy, DefaultDecompressionStrategy,
7448 };
7449
7450 let values: Arc<[u16]> = Arc::from((0..2048).map(|i| (i % 5) as u16).collect::<Vec<u16>>());
7451
7452 let compression_strategy = DefaultCompressionStrategy::default();
7453 let decompression_strategy = DefaultDecompressionStrategy::default();
7454
7455 let (compressed_buf, encoding) = PrimitiveStructuralEncoder::encode_complex_all_null_vals(
7456 &values,
7457 &compression_strategy,
7458 )
7459 .unwrap();
7460
7461 let decompressor = decompression_strategy
7462 .create_block_decompressor(&encoding)
7463 .unwrap();
7464 let decompressed = decompressor
7465 .decompress(compressed_buf, values.len() as u64)
7466 .unwrap();
7467 let decompressed_fixed_width = decompressed.as_fixed_width().unwrap();
7468 assert_eq!(decompressed_fixed_width.num_values, values.len() as u64);
7469 assert_eq!(decompressed_fixed_width.bits_per_value, 16);
7470 let rep_result = decompressed_fixed_width.data.borrow_to_typed_slice::<u16>();
7471 assert_eq!(rep_result.as_ref(), values.as_ref());
7472 }
7473
7474 #[tokio::test]
7475 async fn test_complex_all_null_compression_gated_by_version() {
7476 use crate::format::pb21::page_layout::Layout;
7477 use arrow_array::ListArray;
7478
7479 let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7480 (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7481 );
7482 let arr: ArrayRef = Arc::new(list_array);
7483 let field = arrow_schema::Field::new(
7484 "c",
7485 DataType::List(Arc::new(arrow_schema::Field::new(
7486 "item",
7487 DataType::Int32,
7488 true,
7489 ))),
7490 true,
7491 );
7492
7493 let page_v21 = encode_first_page(field.clone(), arr.clone(), LanceFileVersion::V2_1).await;
7494 let PageEncoding::Structural(layout_v21) = &page_v21.description else {
7495 panic!("Expected structural encoding");
7496 };
7497 let Layout::ConstantLayout(layout_v21) = layout_v21.layout.as_ref().unwrap() else {
7498 panic!("Expected constant layout");
7499 };
7500 assert!(layout_v21.rep_compression.is_none());
7501 assert!(layout_v21.def_compression.is_none());
7502 assert_eq!(layout_v21.num_rep_values, 0);
7503 assert_eq!(layout_v21.num_def_values, 0);
7504
7505 let page_v22 = encode_first_page(field, arr, LanceFileVersion::V2_2).await;
7506 let PageEncoding::Structural(layout_v22) = &page_v22.description else {
7507 panic!("Expected structural encoding");
7508 };
7509 let Layout::ConstantLayout(layout_v22) = layout_v22.layout.as_ref().unwrap() else {
7510 panic!("Expected constant layout");
7511 };
7512 assert!(layout_v22.def_compression.is_some());
7513 assert!(layout_v22.num_def_values > 0);
7514 }
7515
7516 #[tokio::test]
7517 async fn test_complex_all_null_round_trip() {
7518 use arrow_array::ListArray;
7519
7520 let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7521 (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7522 );
7523
7524 let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_2);
7525 check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
7526 .await;
7527 }
7528
7529 #[tokio::test]
7531 async fn test_sparse_boolean_list_roundtrip() {
7532 use arrow_array::builder::{BooleanBuilder, ListBuilder};
7533
7534 let mut list_builder = ListBuilder::new(BooleanBuilder::new());
7535 for i in 0..1000i32 {
7536 if i % 64 == 0 {
7537 list_builder.values().append_value(i % 128 == 0);
7539 list_builder.append(true);
7540 } else {
7541 list_builder.append(false);
7542 }
7543 }
7544 let list_array = Arc::new(list_builder.finish());
7545
7546 let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
7547 check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
7548 }
7549}