use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    env,
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use crate::{
    constants::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    },
    data::DictionaryDataBlock,
    encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
    format::{
        ProtobufUtils21,
        pb21::{self, CompressiveEncoding, PageLayout, compressive_encoding::Compression},
    },
};
use arrow_array::{Array, ArrayRef, PrimitiveArray, cast::AsArray, make_array, types::UInt64Type};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use bytes::Bytes;
use futures::{FutureExt, TryStreamExt, future::BoxFuture, stream::FuturesOrdered};
use itertools::Itertools;
use lance_arrow::DataTypeExt;
use lance_arrow::deepcopy::deep_copy_nulls;
use lance_core::{
    cache::{CacheKey, Context, DeepSizeOf},
    error::{Error, LanceOptionExt},
    utils::bit::pad_bytes,
};
use log::trace;

use crate::{
    compression::{
        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
    },
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encodings::logical::primitive::fullzip::PerValueDataBlock,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockCompressed,
    statistics::{ComputeStat, GetStat, Stat},
};
use crate::{
    repdef::{
        CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser, DefinitionInterpretation,
        RepDefSlicer, build_control_word_iterator,
    },
    utils::accumulation::AccumulationQueue,
};
use lance_core::{Result, datatypes::Field, utils::tokio::spawn_cpu};

use crate::constants::{
    COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_DIVISOR_META_KEY,
    DICT_SIZE_RATIO_META_KEY, DICT_VALUES_COMPRESSION_ENV_VAR,
    DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
    DICT_VALUES_COMPRESSION_META_KEY,
};
use crate::version::LanceFileVersion;
use crate::{
    EncodingsIo,
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
    },
    encoder::{
        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
    },
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
};

pub mod blob;
pub mod constant;
pub mod dict;
pub mod fullzip;
pub mod miniblock;

const FILL_BYTE: u8 = 0xFE;
const DEFAULT_DICT_DIVISOR: u64 = 2;
const DEFAULT_DICT_MAX_CARDINALITY: u64 = 100_000;
const DEFAULT_DICT_SIZE_RATIO: f64 = 0.8;
const DEFAULT_DICT_VALUES_COMPRESSION: &str = "lz4";
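
/// A unit of scheduled page work: a future that resolves to the page's decoder along with
/// the number of rows that decoder will produce.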
struct PageLoadTask {
    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
    num_rows: u64,
}
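
/// A scheduler for a single page of structurally encoded data.
///
/// Page-level metadata is read once in `initialize` (and may later be restored from the
/// cache via `load`); `schedule_ranges` then issues the I/O needed to decode each batch of
/// requested row ranges.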
trait StructuralPageScheduler: std::fmt::Debug + Send {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;

    fn load(&mut self, data: &Arc<dyn CachedPageData>);

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>>;
}
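
/// Metadata describing a single chunk in a mini-block page: how many values it holds, its
/// compressed size, and its byte offset within the page's value buffer.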
#[derive(Debug)]
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

#[derive(Debug, Clone)]
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}
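
/// A task that decodes previously scheduled (and loaded) mini-block chunks into a single
/// page of values plus repetition/definition levels.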
#[derive(Debug)]
struct DecodeMiniBlockTask {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    num_buffers: u64,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
    has_large_chunk: bool,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
        num_levels: u16,
    ) -> Result<ScalarBuffer<u16>> {
        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
        let rep = rep.as_fixed_width().unwrap();
        debug_assert_eq!(rep.num_values, num_levels as u64);
        debug_assert_eq!(rep.bits_per_value, 16);
        Ok(rep.data.borrow_to_typed_slice::<u16>())
    }

    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            let num_values = (range.end - range.start) as usize;
            levels.extend(iter::repeat_n(0, num_values));
        }
    }
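
    /// Maps a range of rows within a chunk to a range of items and a range of levels.
    ///
    /// The mapping accounts for a preamble (the continuation of a list started in an
    /// earlier chunk) and for invisible items (levels whose definition exceeds
    /// `max_visible_def`), which occupy level slots but have no corresponding item.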
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            let mut items_in_preamble = 0_u64;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx as u64);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        let first_row_start =
                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
                        first_row_start
                    };
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap();
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                debug_assert!(items_in_preamble <= total_items);
                return (0..items_in_preamble, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                        }
                        if *def > max_visible_def {
                            lead_invis_seen += 1;
                        }
                    }
                }

                rows_seen += 1;

                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                    }
                    if *def > max_visible_def {
                        tail_invis_seen += 1;
                    }
                }

                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                if preamble_action == PreambleAction::Skip {
                    new_start += items_in_preamble;
                    new_end += items_in_preamble;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += items_in_preamble;
                    new_levels_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_start..new_end)
            }
        } else {
            (range.clone(), range)
        }
    }

    fn read_buffer_sizes<const LARGE: bool>(
        buf: &[u8],
        offset: &mut usize,
        num_buffers: u64,
    ) -> Vec<u32> {
        let read_size = if LARGE { 4 } else { 2 };
        (0..num_buffers)
            .map(|_| {
                let bytes = &buf[*offset..*offset + read_size];
                let size = if LARGE {
                    u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
                } else {
                    u16::from_le_bytes([bytes[0], bytes[1]]) as u32
                };
                *offset += read_size;
                size
            })
            .collect()
    }

    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        let mut offset = 0;
        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
        offset += 2;

        let rep_size = if self.rep_decompressor.is_some() {
            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(rep_size)
        } else {
            None
        };
        let def_size = if self.def_decompressor.is_some() {
            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(def_size)
        } else {
            None
        };

        let buffer_sizes = if self.has_large_chunk {
            Self::read_buffer_sizes::<true>(buf, &mut offset, self.num_buffers)
        } else {
            Self::read_buffer_sizes::<false>(buf, &mut offset, self.num_buffers)
        };

        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);

        let rep = rep_size.map(|rep_size| {
            let rep = buf.slice_with_length(offset, rep_size as usize);
            offset += rep_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            rep
        });

        let def = def_size.map(|def_size| {
            let def = buf.slice_with_length(offset, def_size as usize);
            offset += def_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            def
        });

        let buffers = buffer_sizes
            .into_iter()
            .map(|buf_size| {
                let buf = buf.slice_with_length(offset, buf_size as usize);
                offset += buf_size as usize;
                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
                buf
            })
            .collect::<Vec<_>>();

        let values = self
            .value_decompressor
            .decompress(buffers, items_in_chunk)?;

        let rep = rep
            .map(|rep| {
                Self::decode_levels(
                    self.rep_decompressor.as_ref().unwrap().as_ref(),
                    rep,
                    num_levels,
                )
            })
            .transpose()?;
        let def = def
            .map(|def| {
                Self::decode_levels(
                    self.def_decompressor.as_ref().unwrap().as_ref(),
                    def,
                    num_levels,
                )
            })
            .transpose()?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        let mut level_offset = 0;

        let needs_caching: Vec<bool> = self
            .instructions
            .windows(2)
            .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
            .chain(std::iter::once(false))
            .collect();

        let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;

        for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
            let should_cache_this_chunk = needs_caching[idx];

            let decoded_chunk = match &chunk_cache {
                Some((cached_chunk_idx, cached_chunk)) if *cached_chunk_idx == chunk.chunk_idx => {
                    cached_chunk.clone()
                }
                _ => {
                    let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

                    if should_cache_this_chunk {
                        chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
                    }
                    decoded
                }
            };

            let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;

            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );
            if item_range.end - item_range.start > chunk.items_in_chunk {
                return Err(lance_core::Error::internal(format!(
                    "Item range {:?} is greater than chunk items in chunk {:?}",
                    item_range, chunk.items_in_chunk
                )));
            }

            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let mut data = data_builder.finish();

        let unraveler =
            RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());

        if let Some(dictionary) = &self.dictionary_data {
            let DataBlock::FixedWidth(indices) = data else {
                return Err(lance_core::Error::internal(format!(
                    "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
                    data
                )));
            };
            data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
                indices,
                dictionary.as_ref().clone(),
            ));
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}
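
/// A mini-block chunk scheduled for decode, together with its byte range within the page
/// and its chunk index. `data` is filled in once the chunk's I/O completes.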
#[derive(Debug)]
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}
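
/// A decoder for mini-block encoded pages.
///
/// Each call to `drain` walks the scheduled `ChunkInstructions`, pairs them with the
/// corresponding loaded chunks, and produces a `DecodeMiniBlockTask`.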
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    num_buffers: u64,
    dictionary: Option<Arc<DataBlock>>,
    has_large_chunk: bool,
}

impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        self.offset_in_current_chunk = skip_in_chunk;

        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            num_buffers: self.num_buffers,
            max_visible_level,
            has_large_chunk: self.has_large_chunk,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
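
/// A scheduler for all-null pages that still carry repetition and/or definition levels
/// (the nulls may be nested inside lists or structs), so the level buffers must be read
/// and decoded even though there are no values.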
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    repdef: Option<Arc<CachedComplexAllNullState>>,
    max_visible_level: u16,
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    num_rep_values: u64,
    num_def_values: u64,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
        rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
        def_decompressor: Option<Arc<dyn BlockDecompressor>>,
        num_rep_values: u64,
        num_def_values: u64,
    ) -> Self {
        let max_visible_level = def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            repdef: None,
            max_visible_level,
            rep_decompressor,
            def_decompressor,
            num_rep_values,
            num_def_values,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);
        let rep_decompressor = self.rep_decompressor.clone();
        let def_decompressor = self.def_decompressor.clone();
        let num_rep_values = self.num_rep_values;
        let num_def_values = self.num_def_values;

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let decompress_levels = |compressed_bytes: Bytes,
                                     decompressor: &Arc<dyn BlockDecompressor>,
                                     num_values: u64,
                                     level_type: &str|
             -> Result<ScalarBuffer<u16>> {
                let compressed_buffer = LanceBuffer::from_bytes(compressed_bytes, 1);
                let decompressed = decompressor.decompress(compressed_buffer, num_values)?;
                match decompressed {
                    DataBlock::FixedWidth(block) => {
                        if block.num_values != num_values {
                            return Err(Error::invalid_input_source(
                                format!(
                                    "Unexpected {} level count after decompression: expected {}, got {}",
                                    level_type, num_values, block.num_values
                                )
                                .into(),
                            ));
                        }
                        if block.bits_per_value != 16 {
                            return Err(Error::invalid_input_source(
                                format!(
                                    "Unexpected {} level bit width after decompression: expected 16, got {}",
                                    level_type, block.bits_per_value
                                )
                                .into(),
                            ));
                        }
                        Ok(block.data.borrow_to_typed_slice::<u16>())
                    }
                    _ => Err(Error::invalid_input_source(
                        format!("Expected fixed-width data block for {} levels", level_type).into(),
                    )),
                }
            };

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                if let Some(rep_decompressor) = rep_decompressor.as_ref() {
                    Some(decompress_levels(
                        rep,
                        rep_decompressor,
                        num_rep_values,
                        "repetition",
                    )?)
                } else {
                    let rep = LanceBuffer::from_bytes(rep, 2);
                    let rep = rep.borrow_to_typed_slice::<u16>();
                    Some(rep)
                }
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                if let Some(def_decompressor) = def_decompressor.as_ref() {
                    Some(decompress_levels(
                        def,
                        def_decompressor,
                        num_def_values,
                        "definition",
                    )?)
                } else {
                    let def = LanceBuffer::from_bytes(def, 2);
                    let def = def.borrow_to_typed_slice::<u16>();
                    Some(def)
                }
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let decoder = Box::new(ComplexAllNullPageDecoder {
            ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            num_rows,
            def_meaning: self.def_meaning.clone(),
            max_visible_level: self.max_visible_level,
        }) as Box<dyn StructuralPageDecoder>;
        let page_load_task = PageLoadTask {
            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
    max_visible_level: u16,
}

impl ComplexAllNullPageDecoder {
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let drained_ranges = self.drain_ranges(num_rows);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
            max_visible_level: self.max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    max_visible_level: u16,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);

        let num_values = if let Some(def) = &def {
            def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
        } else {
            num_values
        };

        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}
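
/// A scheduler for all-null pages with no repetition or definition information; nothing
/// needs to be read from disk and the decoder simply emits `num_rows` nulls.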
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let decoder =
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
        let page_load_task = PageLoadTask {
            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
            self.num_values,
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
    num_dictionary_items: u64,
}

#[derive(Debug)]
struct MiniBlockRepIndexBlock {
    first_row: u64,
    starts_including_trailer: u64,
    has_preamble: bool,
    has_trailer: bool,
}

impl DeepSizeOf for MiniBlockRepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}
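
/// A decoded repetition index for a mini-block page: one entry per chunk recording the
/// offset of the first row that starts in the chunk, how many rows start in it (counting
/// a trailer), and whether the chunk begins with a preamble or ends with a trailer.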
#[derive(Debug)]
struct MiniBlockRepIndex {
    blocks: Vec<MiniBlockRepIndexBlock>,
}

impl DeepSizeOf for MiniBlockRepIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl MiniBlockRepIndex {
    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
        let mut blocks = Vec::with_capacity(chunks.len());
        let mut offset: u64 = 0;

        for c in chunks {
            blocks.push(MiniBlockRepIndexBlock {
                first_row: offset,
                starts_including_trailer: c.num_values,
                has_preamble: false,
                has_trailer: false,
            });

            offset += c.num_values;
        }

        Self { blocks }
    }

    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
        let u64_slice = buffer.borrow_to_typed_slice::<u64>();
        let n = u64_slice.len() / stride;

        let mut blocks = Vec::with_capacity(n);
        let mut chunk_has_preamble = false;
        let mut offset: u64 = 0;

        for i in 0..n {
            let base_idx = i * stride;
            let ends = u64_slice[base_idx];
            let partial = u64_slice[base_idx + 1];

            let has_trailer = partial > 0;
            let starts_including_trailer =
                ends + (has_trailer as u64) - (chunk_has_preamble as u64);

            blocks.push(MiniBlockRepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer,
            });

            chunk_has_preamble = has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}

#[derive(Debug)]
struct MiniBlockCacheableState {
    chunk_meta: Vec<ChunkMeta>,
    rep_index: MiniBlockRepIndex,
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
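
/// A page scheduler for mini-block encoded pages.
///
/// Buffer 0 holds the chunk metadata words and buffer 1 holds the chunk data; a dictionary
/// buffer and a repetition index buffer may follow, depending on the layout.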
#[derive(Debug)]
pub struct MiniBlockScheduler {
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    repetition_index_depth: u16,
    num_buffers: u64,
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    page_meta: Option<Arc<MiniBlockCacheableState>>,
    has_large_chunk: bool,
}

impl MiniBlockScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        layout: &pb21::MiniBlockLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let rep_decompressor = layout
            .rep_compression
            .as_ref()
            .map(|rep_compression| {
                decompressors
                    .create_block_decompressor(rep_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_decompressor = layout
            .def_compression
            .as_ref()
            .map(|def_compression| {
                decompressors
                    .create_block_decompressor(def_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors.create_miniblock_decompressor(
            layout.value_compression.as_ref().unwrap(),
            decompressors,
        )?;

        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            let num_dictionary_items = layout.num_dictionary_items;
            let dictionary_decompressor = decompressors
                .create_block_decompressor(dictionary_encoding)?
                .into();
            let dictionary_data_alignment = match dictionary_encoding.compression.as_ref().unwrap()
            {
                Compression::Variable(_) => 4,
                Compression::Flat(_) => 16,
                Compression::General(_) => 1,
                Compression::InlineBitpacking(_) | Compression::OutOfLineBitpacking(_) => {
                    crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
                }
                _ => {
                    return Err(Error::invalid_input_source(
                        format!(
                            "Unsupported mini-block dictionary encoding: {:?}",
                            dictionary_encoding.compression.as_ref().unwrap()
                        )
                        .into(),
                    ));
                }
            };
            Some(MiniBlockSchedulerDictionary {
                dictionary_decompressor,
                dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                dictionary_data_alignment,
                num_dictionary_items,
            })
        } else {
            None
        };

        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor,
            def_decompressor,
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            num_buffers: layout.num_buffers,
            priority,
            items_in_page,
            dictionary,
            def_meaning: def_meaning.into(),
            page_meta: None,
            has_large_chunk: layout.has_large_chunk,
        })
    }

    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        let page_meta = self.page_meta.as_ref().unwrap();
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    Take,
    Skip,
    Absent,
}
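
/// Instructions for loading rows from a single mini-block chunk: which chunk to read, how
/// many rows to skip and take within it, and how to handle a preamble (a list continued
/// from the previous chunk) or a trailer (a list that continues into the next chunk).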
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    chunk_idx: usize,
    preamble: PreambleAction,
    rows_to_skip: u64,
    rows_to_take: u64,
    take_trailer: bool,
}
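
/// Instructions for draining a batch of rows from a previously scheduled
/// `ChunkInstructions`, produced by `drain_from_instruction`.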
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    chunk_instructions: ChunkInstructions,
    rows_to_skip: u64,
    rows_to_take: u64,
    preamble_action: PreambleAction,
}

impl ChunkInstructions {
    fn schedule_instructions(
        rep_index: &MiniBlockRepIndex,
        user_ranges: &[Range<u64>],
    ) -> Vec<Self> {
        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());

        for user_range in user_ranges {
            let mut rows_needed = user_range.end - user_range.start;
            let mut need_preamble = false;

            let mut block_index = match rep_index
                .blocks
                .binary_search_by_key(&user_range.start, |block| block.first_row)
            {
                Ok(idx) => {
                    let mut idx = idx;
                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
                        idx -= 1;
                    }
                    idx
                }
                Err(idx) => idx - 1,
            };

            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;

            while rows_needed > 0 || need_preamble {
                if block_index >= rep_index.blocks.len() {
                    log::warn!(
                        "schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early"
                    );
                    break;
                }

                let chunk = &rep_index.blocks[block_index];
                let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);

                if rows_avail == 0 && to_skip == 0 {
                    if chunk.has_preamble && need_preamble {
                        chunk_instructions.push(Self {
                            chunk_idx: block_index,
                            preamble: PreambleAction::Take,
                            rows_to_skip: 0,
                            rows_to_take: 0,
                            take_trailer: chunk.has_trailer,
                        });
                        if chunk.starts_including_trailer > 0
                            || block_index == rep_index.blocks.len() - 1
                        {
                            need_preamble = false;
                        }
                    }
                    block_index += 1;
                    continue;
                }

                if rows_avail == 0 && to_skip > 0 {
                    to_skip -= chunk.starts_including_trailer;
                    block_index += 1;
                    continue;
                }

                let rows_to_take = rows_avail.min(rows_needed);
                rows_needed -= rows_to_take;

                let mut take_trailer = false;
                let preamble = if chunk.has_preamble {
                    if need_preamble {
                        PreambleAction::Take
                    } else {
                        PreambleAction::Skip
                    }
                } else {
                    PreambleAction::Absent
                };

                if rows_to_take == rows_avail && chunk.has_trailer {
                    take_trailer = true;
                    need_preamble = true;
                } else {
                    need_preamble = false;
                };

                chunk_instructions.push(Self {
                    preamble,
                    chunk_idx: block_index,
                    rows_to_skip: to_skip,
                    rows_to_take,
                    take_trailer,
                });

                to_skip = 0;
                block_index += 1;
            }
        }

        if user_ranges.len() > 1 {
            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
            let mut instructions_iter = chunk_instructions.into_iter();
            merged_instructions.push(instructions_iter.next().unwrap());
            for instruction in instructions_iter {
                let last = merged_instructions.last_mut().unwrap();
                if last.chunk_idx == instruction.chunk_idx
                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
                {
                    last.rows_to_take += instruction.rows_to_take;
                    last.take_trailer |= instruction.take_trailer;
                } else {
                    merged_instructions.push(instruction);
                }
            }
            merged_instructions
        } else {
            chunk_instructions
        }
    }

    fn drain_from_instruction(
        &self,
        rows_desired: &mut u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> (ChunkDrainInstructions, bool) {
        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
        let rows_avail = self.rows_to_take - *skip_in_chunk;
        let has_preamble = self.preamble != PreambleAction::Absent;
        let preamble_action = match (*need_preamble, has_preamble) {
            (true, true) => PreambleAction::Take,
            (true, false) => panic!("Need preamble but there isn't one"),
            (false, true) => PreambleAction::Skip,
            (false, false) => PreambleAction::Absent,
        };

        let rows_taking = if *rows_desired >= rows_avail {
            *need_preamble = self.take_trailer;
            rows_avail
        } else {
            *need_preamble = false;
            *rows_desired
        };
        let rows_skipped = *skip_in_chunk;

        let consumed_chunk = if *rows_desired >= rows_avail {
            *rows_desired -= rows_avail;
            *skip_in_chunk = 0;
            true
        } else {
            *skip_in_chunk += *rows_desired;
            *rows_desired = 0;
            false
        };

        (
            ChunkDrainInstructions {
                chunk_instructions: self.clone(),
                rows_to_skip: rows_skipped,
                rows_to_take: rows_taking,
                preamble_action,
            },
            consumed_chunk,
        )
    }
}

enum Words {
    U16(ScalarBuffer<u16>),
    U32(ScalarBuffer<u32>),
}

struct WordsIter<'a> {
    iter: Box<dyn Iterator<Item = u32> + 'a>,
}

impl Words {
    pub fn len(&self) -> usize {
        match self {
            Self::U16(b) => b.len(),
            Self::U32(b) => b.len(),
        }
    }

    pub fn iter(&self) -> WordsIter<'_> {
        match self {
            Self::U16(buf) => WordsIter {
                iter: Box::new(buf.iter().map(|&x| x as u32)),
            },
            Self::U32(buf) => WordsIter {
                iter: Box::new(buf.iter().copied()),
            },
        }
    }

    pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result<Self> {
        let bytes_per_value = if has_large_chunk { 4 } else { 2 };
        assert_eq!(bytes.len() % bytes_per_value, 0);
        let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64);
        if has_large_chunk {
            Ok(Self::U32(buffer.borrow_to_typed_slice::<u32>()))
        } else {
            Ok(Self::U16(buffer.borrow_to_typed_slice::<u16>()))
        }
    }
}

impl<'a> Iterator for WordsIter<'a> {
    type Item = u32;

    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next()
    }
}

impl StructuralPageScheduler for MiniBlockScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        let io_req = io.submit_request(required_ranges, 0);

        async move {
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();

            let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?;
            let mut chunk_meta = Vec::with_capacity(words.len());

            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
            for (word_idx, word) in words.iter().enumerate() {
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                let num_values = if word_idx < words.len() - 1 {
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    debug_assert!(
                        log_num_values == 0
                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
                    );
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;

                chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                offset_bytes += num_bytes as u64;
            }

            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
                assert!(rep_index_data.len() % 8 == 0);
                let stride = self.repetition_index_depth as usize + 1;
                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
            } else {
                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
            };

            let mut page_meta = MiniBlockCacheableState {
                chunk_meta,
                rep_index,
                dictionary: None,
            };

            if let Some(ref mut dictionary) = self.dictionary {
                let dictionary_data = dictionary_bytes.unwrap();
                page_meta.dictionary =
                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                        dictionary.num_dictionary_items,
                    )?));
            };
            let page_meta = Arc::new(page_meta);
            self.page_meta = Some(page_meta.clone());
            Ok(page_meta as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.page_meta = Some(
            data.clone()
                .as_arc_any()
                .downcast::<MiniBlockCacheableState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let page_meta = self.page_meta.as_ref().unwrap();

        let chunk_instructions =
            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);

        debug_assert_eq!(
            num_rows,
            chunk_instructions
                .iter()
                .map(|ci| ci.rows_to_take)
                .sum::<u64>()
        );

        let chunks_needed = chunk_instructions
            .iter()
            .map(|ci| ci.chunk_idx)
            .unique()
            .collect::<Vec<_>>();

        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
        let chunk_ranges = loaded_chunks
            .iter()
            .map(|c| c.byte_range.clone())
            .collect::<Vec<_>>();
        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);

        let rep_decompressor = self.rep_decompressor.clone();
        let def_decompressor = self.def_decompressor.clone();
        let value_decompressor = self.value_decompressor.clone();
        let num_buffers = self.num_buffers;
        let has_large_chunk = self.has_large_chunk;
        let dictionary = page_meta.dictionary.clone();
        let def_meaning = self.def_meaning.clone();

        let res = async move {
            let loaded_chunk_data = loaded_chunk_data.await?;
            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
            }

            Ok(Box::new(MiniBlockDecoder {
                rep_decompressor,
                def_decompressor,
                value_decompressor,
                def_meaning,
                loaded_chunks: VecDeque::from_iter(loaded_chunks),
                instructions: VecDeque::from(chunk_instructions),
                offset_in_current_chunk: 0,
                dictionary,
                num_rows,
                num_buffers,
                has_large_chunk,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: res,
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug, Clone, Copy)]
struct FullZipRepIndexDetails {
    buf_position: u64,
    bytes_per_value: u64,
}

#[derive(Debug)]
enum PerValueDecompressor {
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    Variable(Arc<dyn VariablePerValueDecompressor>),
}

#[derive(Debug)]
struct FullZipDecodeDetails {
    value_decompressor: PerValueDecompressor,
    def_meaning: Arc<[DefinitionInterpretation]>,
    ctrl_word_parser: ControlWordParser,
    max_rep: u16,
    max_visible_def: u16,
}
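
/// The source of bytes for a full-zip page: either remote I/O or a page that has already
/// been prefetched into memory, in which case requested ranges are sliced out of the
/// in-memory buffer.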
#[derive(Debug, Clone)]
enum FullZipReadSource {
    Remote(Arc<dyn EncodingsIo>),
    PrefetchedPage { base_offset: u64, data: LanceBuffer },
}

impl FullZipReadSource {
    fn fetch(
        &self,
        ranges: &[Range<u64>],
        priority: u64,
    ) -> BoxFuture<'static, Result<VecDeque<LanceBuffer>>> {
        match self {
            Self::Remote(io) => {
                let io = io.clone();
                let ranges = ranges.to_vec();
                async move {
                    let data = io.submit_request(ranges, priority).await?;
                    Ok(data
                        .into_iter()
                        .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
                        .collect::<VecDeque<_>>())
                }
                .boxed()
            }
            Self::PrefetchedPage { base_offset, data } => {
                let base_offset = *base_offset;
                let data = data.clone();
                let page_end = base_offset + data.len() as u64;
                std::future::ready(
                    ranges
                        .iter()
                        .map(|range| {
                            if range.start > range.end
                                || range.start < base_offset
                                || range.end > page_end
                            {
                                return Err(Error::internal(format!(
                                    "Requested range {:?} is outside page range {}..{}",
                                    range, base_offset, page_end
                                )));
                            }
                            let start = (range.start - base_offset) as usize;
                            let len = (range.end - range.start) as usize;
                            Ok(data.slice_with_length(start, len))
                        })
                        .collect::<Result<VecDeque<_>>>(),
                )
                .boxed()
            }
        }
    }
}
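
/// A page scheduler for full-zip encoded pages, where each value is zipped together with
/// its control word (repetition/definition levels). An optional repetition index maps row
/// ranges to byte ranges within the page.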
#[derive(Debug)]
pub struct FullZipScheduler {
    data_buf_position: u64,
    data_buf_size: u64,
    rep_index: Option<FullZipRepIndexDetails>,
    priority: u64,
    rows_in_page: u64,
    bits_per_offset: u8,
    details: Arc<FullZipDecodeDetails>,
    cached_state: Option<Arc<FullZipCacheableState>>,
    enable_cache: bool,
}

impl FullZipScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        rows_in_page: u64,
        layout: &pb21::FullZipLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let (data_buf_position, data_buf_size) = buffer_offsets_and_sizes[0];
        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
            let num_reps = rows_in_page + 1;
            let bytes_per_rep = len / num_reps;
            debug_assert_eq!(len % num_reps, 0);
            debug_assert!(
                bytes_per_rep == 1
                    || bytes_per_rep == 2
                    || bytes_per_rep == 4
                    || bytes_per_rep == 8
            );
            FullZipRepIndexDetails {
                buf_position: *pos,
                bytes_per_value: bytes_per_rep,
            }
        });

        let value_decompressor = match layout.details {
            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
                let decompressor = decompressors.create_fixed_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Fixed(decompressor.into())
            }
            Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
                let decompressor = decompressors.create_variable_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Variable(decompressor.into())
            }
            None => {
                panic!("Full-zip layout must have a `details` field");
            }
        };
        let ctrl_word_parser = ControlWordParser::new(
            layout.bits_rep.try_into().unwrap(),
            layout.bits_def.try_into().unwrap(),
        );
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();

        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
        let max_visible_def = def_meaning
            .iter()
            .filter(|d| !d.is_list())
            .map(|d| d.num_def_levels())
            .sum();

        let bits_per_offset = match layout.details {
            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
            Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
                bits_per_offset as u8
            }
            None => panic!("Full-zip layout must have a `details` field"),
        };

        let details = Arc::new(FullZipDecodeDetails {
            value_decompressor,
            def_meaning: def_meaning.into(),
            ctrl_word_parser,
            max_rep,
            max_visible_def,
        });
        Ok(Self {
            data_buf_position,
            data_buf_size,
            rep_index,
            details,
            priority,
            rows_in_page,
            bits_per_offset,
            cached_state: None,
            enable_cache: false,
        })
    }

    fn covers_entire_page(ranges: &[Range<u64>], rows_in_page: u64) -> bool {
        if ranges.is_empty() {
            return false;
        }
        let mut expected_start = 0;
        for range in ranges {
            if range.start != expected_start || range.end > rows_in_page || range.end < range.start
            {
                return false;
            }
            expected_start = range.end;
        }
        expected_start == rows_in_page
    }

    fn create_page_load_task(
        io_future: BoxFuture<'static, Result<Vec<Bytes>>>,
        num_rows: u64,
        details: Arc<FullZipDecodeDetails>,
        bits_per_offset: u8,
    ) -> PageLoadTask {
        let load_task = async move {
            let buffers = io_future.await?;
            let data = buffers
                .into_iter()
                .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
                .collect::<VecDeque<_>>();
            Self::create_decoder(details, data, num_rows, bits_per_offset)
        }
        .boxed();
        PageLoadTask {
            decoder_fut: load_task,
            num_rows,
        }
    }

    fn create_decoder(
        details: Arc<FullZipDecodeDetails>,
        data: VecDeque<LanceBuffer>,
        num_rows: u64,
        bits_per_offset: u8,
    ) -> Result<Box<dyn StructuralPageDecoder>> {
        match &details.value_decompressor {
            PerValueDecompressor::Fixed(decompressor) => {
                let bits_per_value = decompressor.bits_per_value();
                if bits_per_value % 8 != 0 {
                    return Err(lance_core::Error::not_supported_source(
                        "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented"
                            .into(),
                    ));
                }
                let bytes_per_value = bits_per_value / 8;
                let total_bytes_per_value =
                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
                if total_bytes_per_value == 0 {
                    return Err(lance_core::Error::internal(
                        "Invalid encoding: per-row byte width must be greater than 0",
                    ));
                }
                Ok(Box::new(FixedFullZipDecoder {
                    details,
                    data,
                    num_rows,
                    offset_in_current: 0,
                    bytes_per_value: bytes_per_value as usize,
                    total_bytes_per_value,
                }) as Box<dyn StructuralPageDecoder>)
            }
            PerValueDecompressor::Variable(_decompressor) => {
                Ok(Box::new(VariableFullZipDecoder::new(
                    details,
                    data,
                    num_rows,
                    bits_per_offset,
                    bits_per_offset,
                )))
            }
        }
    }

    fn extract_byte_ranges_from_pairs(
        buffer: LanceBuffer,
        bytes_per_value: u64,
        data_buf_position: u64,
    ) -> Vec<Range<u64>> {
        ByteUnpacker::new(buffer, bytes_per_value as usize)
            .chunks(2)
            .into_iter()
            .map(|mut c| {
                let start = c.next().unwrap() + data_buf_position;
                let end = c.next().unwrap() + data_buf_position;
                start..end
            })
            .collect::<Vec<_>>()
    }

    fn extract_byte_ranges_from_cached(
        buffer: &LanceBuffer,
        ranges: &[Range<u64>],
        bytes_per_value: u64,
        data_buf_position: u64,
    ) -> Vec<Range<u64>> {
        ranges
            .iter()
            .map(|r| {
                let start_offset = (r.start * bytes_per_value) as usize;
                let end_offset = (r.end * bytes_per_value) as usize;

                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
                let start_val =
                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
                        .next()
                        .unwrap();

                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
                let end_val =
                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
                        .next()
                        .unwrap();

                (data_buf_position + start_val)..(data_buf_position + end_val)
            })
            .collect()
    }

    fn compute_rep_index_ranges(
        ranges: &[Range<u64>],
        rep_index: &FullZipRepIndexDetails,
    ) -> Vec<Range<u64>> {
        ranges
            .iter()
            .flat_map(|r| {
                let first_val_start =
                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
                let first_val_end = first_val_start + rep_index.bytes_per_value;
                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
                let last_val_end = last_val_start + rep_index.bytes_per_value;
                [first_val_start..first_val_end, last_val_start..last_val_end]
            })
            .collect()
    }
2297 fn schedule_ranges_rep(
2299 &self,
2300 ranges: &[Range<u64>],
2301 io: &Arc<dyn EncodingsIo>,
2302 rep_index: FullZipRepIndexDetails,
2303 ) -> Result<Vec<PageLoadTask>> {
2304 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2305 let data_buf_position = self.data_buf_position;
2306 let priority = self.priority;
2307 let details = self.details.clone();
2308 let bits_per_offset = self.bits_per_offset;
2309
2310 if Self::covers_entire_page(ranges, self.rows_in_page) {
2311 let full_range = self.data_buf_position..(self.data_buf_position + self.data_buf_size);
2312 let page_data = io.submit_single(full_range.clone(), priority);
2313 let load_task = async move {
2314 let page_data = page_data.await?;
2315 let source = FullZipReadSource::PrefetchedPage {
2316 base_offset: full_range.start,
2317 data: LanceBuffer::from_bytes(page_data, 1),
2318 };
2319 let read_ranges = vec![full_range];
2320 let data = source.fetch(&read_ranges, priority).await?;
2321 Self::create_decoder(details, data, num_rows, bits_per_offset)
2322 }
2323 .boxed();
2324 let page_load_task = PageLoadTask {
2325 decoder_fut: load_task,
2326 num_rows,
2327 };
2328 return Ok(vec![page_load_task]);
2329 }
2330
2331 if let Some(cached_state) = &self.cached_state {
2332 let byte_ranges = Self::extract_byte_ranges_from_cached(
2333 &cached_state.rep_index_buffer,
2334 ranges,
2335 rep_index.bytes_per_value,
2336 data_buf_position,
2337 );
2338 let io_future = io.submit_request(byte_ranges, priority);
2339 let page_load_task =
2340 Self::create_page_load_task(io_future, num_rows, details, bits_per_offset);
2341 return Ok(vec![page_load_task]);
2342 }
2343
2344 let rep_ranges = Self::compute_rep_index_ranges(ranges, &rep_index);
2345 let rep_data = io.submit_request(rep_ranges, priority);
2346 let io_clone = io.clone();
2347 let load_task = async move {
2348 let rep_data = rep_data.await?;
2349 let rep_buffer = LanceBuffer::concat(
2350 &rep_data
2351 .into_iter()
2352 .map(|d| LanceBuffer::from_bytes(d, 1))
2353 .collect::<Vec<_>>(),
2354 );
2355 let byte_ranges = Self::extract_byte_ranges_from_pairs(
2356 rep_buffer,
2357 rep_index.bytes_per_value,
2358 data_buf_position,
2359 );
2360 let source = FullZipReadSource::Remote(io_clone);
2361 let data = source.fetch(&byte_ranges, priority).await?;
2362 Self::create_decoder(details, data, num_rows, bits_per_offset)
2363 }
2364 .boxed();
2365 let page_load_task = PageLoadTask {
2366 decoder_fut: load_task,
2367 num_rows,
2368 };
2369 Ok(vec![page_load_task])
2370 }
2371
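/// Schedules reads for a page without a repetition index.  Every row is
/// exactly `total_bytes_per_value` bytes (control word + value), so the byte
/// ranges can be computed with simple arithmetic.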
2372 fn schedule_ranges_simple(
2376 &self,
2377 ranges: &[Range<u64>],
2378 io: &Arc<dyn EncodingsIo>,
2379 ) -> Result<Vec<PageLoadTask>> {
2380 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2382
2383 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2384 unreachable!()
2385 };
2386
2387 let bits_per_value = decompressor.bits_per_value();
2389 assert_eq!(bits_per_value % 8, 0);
2390 let bytes_per_value = bits_per_value / 8;
2391 let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2392 let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
2393 let byte_ranges = ranges
2394 .iter()
2395 .map(|r| {
2396 debug_assert!(r.end <= self.rows_in_page);
2397 let start = self.data_buf_position + r.start * total_bytes_per_value;
2398 let end = self.data_buf_position + r.end * total_bytes_per_value;
2399 start..end
2400 })
2401 .collect::<Vec<_>>();
2402
2403 let io_future = io.submit_request(byte_ranges, self.priority);
2404 let page_load_task = Self::create_page_load_task(
2405 io_future,
2406 num_rows,
2407 self.details.clone(),
2408 self.bits_per_offset,
2409 );
2410 Ok(vec![page_load_task])
2411 }
2412}
2413
2414#[derive(Debug)]
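/// Repetition index bytes cached by `initialize` so that later scheduling
/// passes can translate row ranges into byte ranges without extra I/O.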
2416struct FullZipCacheableState {
2417 rep_index_buffer: LanceBuffer,
2419}
2420
2421impl DeepSizeOf for FullZipCacheableState {
2422 fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2423 self.rep_index_buffer.len()
2424 }
2425}
2426
2427impl CachedPageData for FullZipCacheableState {
2428 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2429 self
2430 }
2431}
2432
2433impl StructuralPageScheduler for FullZipScheduler {
2434 fn initialize<'a>(
2435 &'a mut self,
2436 io: &Arc<dyn EncodingsIo>,
2437 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2438 if self.enable_cache
2439 && let Some(rep_index) = self.rep_index
2440 {
2441 let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2442 let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2443 let io_clone = io.clone();
2444 return async move {
2445 let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2446 let state = Arc::new(FullZipCacheableState {
2447 rep_index_buffer: LanceBuffer::from_bytes(rep_index_data[0].clone(), 1),
2448 });
2449 self.cached_state = Some(state.clone());
2450 Ok(state as Arc<dyn CachedPageData>)
2451 }
2452 .boxed();
2453 }
2454 std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2455 }
2456
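/// Restores the repetition index that a previous `initialize` call stored in
/// the cache (if the cached entry is of the expected type).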
2457 fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2461 if let Ok(cached_state) = cache
2463 .clone()
2464 .as_arc_any()
2465 .downcast::<FullZipCacheableState>()
2466 {
2467 self.cached_state = Some(cached_state);
2469 }
2470 }
2471
2472 fn schedule_ranges(
2473 &self,
2474 ranges: &[Range<u64>],
2475 io: &Arc<dyn EncodingsIo>,
2476 ) -> Result<Vec<PageLoadTask>> {
2477 if let Some(rep_index) = self.rep_index {
2478 self.schedule_ranges_rep(ranges, io, rep_index)
2479 } else {
2480 self.schedule_ranges_simple(ranges, io)
2481 }
2482 }
2483}
2484
2485#[derive(Debug)]
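/// Decoder for full-zip pages with fixed-width values.  Each stored item is a
/// control word (rep/def levels) immediately followed by the value bytes; the
/// decoder holds the raw zipped buffers and slices them into per-drain tasks.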
2493struct FixedFullZipDecoder {
2494 details: Arc<FullZipDecodeDetails>,
2495 data: VecDeque<LanceBuffer>,
2496 offset_in_current: usize,
2497 bytes_per_value: usize,
2498 total_bytes_per_value: usize,
2499 num_rows: u64,
2500}
2501
2502impl FixedFullZipDecoder {
2503 fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2504 debug_assert!(num_rows > 0);
2505 let cur_buf = self.data.front_mut().unwrap();
2506 let start = self.offset_in_current;
2507 if self.details.ctrl_word_parser.has_rep() {
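// With repetition a row may span a variable number of items (list
// elements), so walk control words until `num_rows` rows have been
// started.  Items that are not visible contribute only their control
// word (no value bytes).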
2508 let mut rows_started = 0;
2511 let mut num_items = 0;
2514 while self.offset_in_current < cur_buf.len() {
2515 let control = self.details.ctrl_word_parser.parse_desc(
2516 &cur_buf[self.offset_in_current..],
2517 self.details.max_rep,
2518 self.details.max_visible_def,
2519 );
2520 if control.is_new_row {
2521 if rows_started == num_rows {
2522 break;
2523 }
2524 rows_started += 1;
2525 }
2526 num_items += 1;
2527 if control.is_visible {
2528 self.offset_in_current += self.total_bytes_per_value;
2529 } else {
2530 self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2531 }
2532 }
2533
2534 let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2535 if self.offset_in_current == cur_buf.len() {
2536 self.data.pop_front();
2537 self.offset_in_current = 0;
2538 }
2539
2540 FullZipDecodeTaskItem {
2541 data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2542 data: task_slice,
2543 bits_per_value: self.bytes_per_value as u64 * 8,
2544 num_values: num_items,
2545 block_info: BlockInfo::new(),
2546 }),
2547 rows_in_buf: rows_started,
2548 }
2549 } else {
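// Without repetition every row is exactly `total_bytes_per_value` bytes,
// so the slice boundaries can be computed directly.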
2550 let cur_buf = self.data.front_mut().unwrap();
2553 let bytes_avail = cur_buf.len() - self.offset_in_current;
2554 let offset_in_cur = self.offset_in_current;
2555
2556 let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2557 let mut rows_taken = num_rows;
2558 let task_slice = if bytes_needed >= bytes_avail {
2559 self.offset_in_current = 0;
2560 rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2561 self.data
2562 .pop_front()
2563 .unwrap()
2564 .slice_with_length(offset_in_cur, bytes_avail)
2565 } else {
2566 self.offset_in_current += bytes_needed;
2567 cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2568 };
2569 FullZipDecodeTaskItem {
2570 data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2571 data: task_slice,
2572 bits_per_value: self.bytes_per_value as u64 * 8,
2573 num_values: rows_taken,
2574 block_info: BlockInfo::new(),
2575 }),
2576 rows_in_buf: rows_taken,
2577 }
2578 }
2579 }
2580}
2581
2582impl StructuralPageDecoder for FixedFullZipDecoder {
2583 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2584 let mut task_data = Vec::with_capacity(self.data.len());
2585 let mut remaining = num_rows;
2586 while remaining > 0 {
2587 let task_item = self.slice_next_task(remaining);
2588 remaining -= task_item.rows_in_buf;
2589 task_data.push(task_item);
2590 }
2591 Ok(Box::new(FixedFullZipDecodeTask {
2592 details: self.details.clone(),
2593 data: task_data,
2594 bytes_per_value: self.bytes_per_value,
2595 num_rows: num_rows as usize,
2596 }))
2597 }
2598
2599 fn num_rows(&self) -> u64 {
2600 self.num_rows
2601 }
2602}
2603
2604#[derive(Debug)]
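/// Decoder for full-zip pages with variable-length values.  The whole page is
/// unzipped up front into separate rep/def, offset, and data buffers, and
/// `drain` slices those prepared buffers by row.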
2609struct VariableFullZipDecoder {
2610 details: Arc<FullZipDecodeDetails>,
2611 decompressor: Arc<dyn VariablePerValueDecompressor>,
2612 data: LanceBuffer,
2613 offsets: LanceBuffer,
2614 rep: ScalarBuffer<u16>,
2615 def: ScalarBuffer<u16>,
2616 repdef_starts: Vec<usize>,
2617 data_starts: Vec<usize>,
2618 offset_starts: Vec<usize>,
2619 visible_item_counts: Vec<u64>,
2620 bits_per_offset: u8,
2621 current_idx: usize,
2622 num_rows: u64,
2623}
2624
2625impl VariableFullZipDecoder {
2626 fn new(
2627 details: Arc<FullZipDecodeDetails>,
2628 data: VecDeque<LanceBuffer>,
2629 num_rows: u64,
2630 in_bits_per_length: u8,
2631 out_bits_per_offset: u8,
2632 ) -> Self {
2633 let decompressor = match details.value_decompressor {
2634 PerValueDecompressor::Variable(ref d) => d.clone(),
2635 _ => unreachable!(),
2636 };
2637
2638 assert_eq!(in_bits_per_length % 8, 0);
2639 assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2640
2641 let mut decoder = Self {
2642 details,
2643 decompressor,
2644 data: LanceBuffer::empty(),
2645 offsets: LanceBuffer::empty(),
2646 rep: LanceBuffer::empty().borrow_to_typed_slice(),
2647 def: LanceBuffer::empty().borrow_to_typed_slice(),
2648 bits_per_offset: out_bits_per_offset,
2649 repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2650 data_starts: Vec::with_capacity(num_rows as usize + 1),
2651 offset_starts: Vec::with_capacity(num_rows as usize + 1),
2652 visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2653 current_idx: 0,
2654 num_rows,
2655 };
2656
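// Eagerly unzip the entire page; variable-length data has to be walked
// value-by-value anyway, and `drain` can then simply slice the prepared
// buffers.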
2657 decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2678
2679 decoder
2680 }
2681
2682 fn slice_batch_data_and_rebase_offsets_typed<T>(
2683 data: &LanceBuffer,
2684 offsets: &LanceBuffer,
2685 ) -> Result<(LanceBuffer, LanceBuffer)>
2686 where
2687 T: arrow_buffer::ArrowNativeType
2688 + Copy
2689 + PartialOrd
2690 + std::ops::Sub<Output = T>
2691 + std::fmt::Display
2692 + TryInto<usize>,
2693 {
2694 let offsets_slice = offsets.borrow_to_typed_slice::<T>();
2695 let offsets_slice = offsets_slice.as_ref();
2696 if offsets_slice.is_empty() {
2697 return Err(Error::internal(
2698 "Variable offsets cannot be empty".to_string(),
2699 ));
2700 }
2701
2702 let base = offsets_slice[0];
2703 let end = *offsets_slice.last().unwrap();
2704 if end < base {
2705 return Err(Error::internal(format!(
2706 "Invalid variable offsets: end ({end}) is less than base ({base})"
2707 )));
2708 }
2709
2710 let data_start = base.try_into().map_err(|_| {
2711 Error::internal(format!("Variable offset ({base}) does not fit into usize"))
2712 })?;
2713 let data_end = end.try_into().map_err(|_| {
2714 Error::internal(format!("Variable offset ({end}) does not fit into usize"))
2715 })?;
2716 if data_end > data.len() {
2717 return Err(Error::internal(format!(
2718 "Invalid variable offsets: end ({data_end}) exceeds data len ({})",
2719 data.len()
2720 )));
2721 }
2722
2723 let mut rebased_offsets = Vec::with_capacity(offsets_slice.len());
2724 for &offset in offsets_slice {
2725 if offset < base {
2726 return Err(Error::internal(format!(
2727 "Invalid variable offsets: offset ({offset}) is less than base ({base})"
2728 )));
2729 }
2730 rebased_offsets.push(offset - base);
2731 }
2732
2733 let sliced_data = data.slice_with_length(data_start, data_end - data_start);
2734 let sliced_data = LanceBuffer::copy_slice(&sliced_data);
2736 let rebased_offsets = LanceBuffer::reinterpret_vec(rebased_offsets);
2737 Ok((sliced_data, rebased_offsets))
2738 }
2739
2740 fn slice_batch_data_and_rebase_offsets(
2741 data: &LanceBuffer,
2742 offsets: &LanceBuffer,
2743 bits_per_offset: u8,
2744 ) -> Result<(LanceBuffer, LanceBuffer)> {
2745 match bits_per_offset {
2746 32 => Self::slice_batch_data_and_rebase_offsets_typed::<u32>(data, offsets),
2747 64 => Self::slice_batch_data_and_rebase_offsets_typed::<u64>(data, offsets),
2748 _ => Err(Error::internal(format!(
2749 "Unsupported bits_per_offset={bits_per_offset}"
2750 ))),
2751 }
2752 }
2753
2754 unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2755 match bits_per_offset {
2756 8 => *data.get_unchecked(0) as u64,
2757 16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2758 32 => u32::from_le_bytes([
2759 *data.get_unchecked(0),
2760 *data.get_unchecked(1),
2761 *data.get_unchecked(2),
2762 *data.get_unchecked(3),
2763 ]) as u64,
2764 64 => u64::from_le_bytes([
2765 *data.get_unchecked(0),
2766 *data.get_unchecked(1),
2767 *data.get_unchecked(2),
2768 *data.get_unchecked(3),
2769 *data.get_unchecked(4),
2770 *data.get_unchecked(5),
2771 *data.get_unchecked(6),
2772 *data.get_unchecked(7),
2773 ]),
2774 _ => unreachable!(),
2775 }
2776 }
2777
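/// Walks the zipped buffers item by item: parses each control word into
/// rep/def levels, reads the length prefix for visible valid items, copies
/// the value bytes, and builds an offsets array in the requested output
/// width.  Per-row start positions are recorded in `repdef_starts`,
/// `data_starts`, `offset_starts`, and `visible_item_counts` so that `drain`
/// can later slice by row.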
2778 fn unzip(
2779 &mut self,
2780 data: VecDeque<LanceBuffer>,
2781 in_bits_per_length: u8,
2782 out_bits_per_offset: u8,
2783 num_rows: u64,
2784 ) {
2785 let mut rep = Vec::with_capacity(num_rows as usize);
2787 let mut def = Vec::with_capacity(num_rows as usize);
2788 let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2789
2790 let bytes_per_offset = out_bits_per_offset as usize / 8;
2793 let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2794 let mut offsets_data = Vec::with_capacity(bytes_offsets);
2795
2796 let bytes_per_length = in_bits_per_length as usize / 8;
2797 let bytes_lengths = bytes_per_length * num_rows as usize;
2798
2799 let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2800 let mut unzipped_data =
2803 Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2804
2805 let mut current_offset = 0_u64;
2806 let mut visible_item_count = 0_u64;
2807 for databuf in data.into_iter() {
2808 let mut databuf = databuf.as_ref();
2809 while !databuf.is_empty() {
2810 let data_start = unzipped_data.len();
2811 let offset_start = offsets_data.len();
2812 let repdef_start = rep.len().max(def.len());
2815 let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2817 databuf,
2818 self.details.max_rep,
2819 self.details.max_visible_def,
2820 );
2821 self.details
2822 .ctrl_word_parser
2823 .parse(databuf, &mut rep, &mut def);
2824 databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2825
2826 if ctrl_desc.is_new_row {
2827 self.repdef_starts.push(repdef_start);
2828 self.data_starts.push(data_start);
2829 self.offset_starts.push(offset_start);
2830 self.visible_item_counts.push(visible_item_count);
2831 }
2832 if ctrl_desc.is_visible {
2833 visible_item_count += 1;
2834 if ctrl_desc.is_valid_item {
2835 debug_assert!(databuf.len() >= bytes_per_length);
2837 let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2838 match out_bits_per_offset {
2839 32 => offsets_data
2840 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2841 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2842 _ => unreachable!(),
2843 };
2844 databuf = &databuf[bytes_per_length..]; // advance past the length prefix
2845 unzipped_data.extend_from_slice(&databuf[..length as usize]);
2846 databuf = &databuf[length as usize..];
2847 current_offset += length;
2848 } else {
2849 match out_bits_per_offset {
2851 32 => offsets_data
2852 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2853 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2854 _ => unreachable!(),
2855 }
2856 }
2857 }
2858 }
2859 }
2860 self.repdef_starts.push(rep.len().max(def.len()));
2861 self.data_starts.push(unzipped_data.len());
2862 self.offset_starts.push(offsets_data.len());
2863 self.visible_item_counts.push(visible_item_count);
2864 match out_bits_per_offset {
2865 32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2866 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2867 _ => unreachable!(),
2868 };
2869 self.rep = ScalarBuffer::from(rep);
2870 self.def = ScalarBuffer::from(def);
2871 self.data = LanceBuffer::from(unzipped_data);
2872 self.offsets = LanceBuffer::from(offsets_data);
2873 }
2874}
2875
2876impl StructuralPageDecoder for VariableFullZipDecoder {
2877 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2878 let start = self.current_idx;
2879 let end = start + num_rows as usize;
2880
2881 let offset_start = self.offset_starts[start];
2882 let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2883 let offsets = self
2884 .offsets
2885 .slice_with_length(offset_start, offset_end - offset_start);
2886 let (data, offsets) =
2888 Self::slice_batch_data_and_rebase_offsets(&self.data, &offsets, self.bits_per_offset)?;
2889
2890 let repdef_start = self.repdef_starts[start];
2891 let repdef_end = self.repdef_starts[end];
2892 let rep = if self.rep.is_empty() {
2893 self.rep.clone()
2894 } else {
2895 self.rep.slice(repdef_start, repdef_end - repdef_start)
2896 };
2897 let def = if self.def.is_empty() {
2898 self.def.clone()
2899 } else {
2900 self.def.slice(repdef_start, repdef_end - repdef_start)
2901 };
2902
2903 let visible_item_counts_start = self.visible_item_counts[start];
2904 let visible_item_counts_end = self.visible_item_counts[end];
2905 let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2906
2907 self.current_idx += num_rows as usize;
2908
2909 Ok(Box::new(VariableFullZipDecodeTask {
2910 details: self.details.clone(),
2911 decompressor: self.decompressor.clone(),
2912 data,
2913 offsets,
2914 bits_per_offset: self.bits_per_offset,
2915 num_visible_items,
2916 rep,
2917 def,
2918 }))
2919 }
2920
2921 fn num_rows(&self) -> u64 {
2922 self.num_rows
2923 }
2924}
2925
2926#[derive(Debug)]
2927struct VariableFullZipDecodeTask {
2928 details: Arc<FullZipDecodeDetails>,
2929 decompressor: Arc<dyn VariablePerValueDecompressor>,
2930 data: LanceBuffer,
2931 offsets: LanceBuffer,
2932 bits_per_offset: u8,
2933 num_visible_items: u64,
2934 rep: ScalarBuffer<u16>,
2935 def: ScalarBuffer<u16>,
2936}
2937
2938impl DecodePageTask for VariableFullZipDecodeTask {
2939 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2940 let block = VariableWidthBlock {
2941 data: self.data,
2942 offsets: self.offsets,
2943 bits_per_offset: self.bits_per_offset,
2944 num_values: self.num_visible_items,
2945 block_info: BlockInfo::new(),
2946 };
2947 let decompressed = self.decompressor.decompress(block)?;
2948 let rep = if self.rep.is_empty() {
2949 None
2950 } else {
2951 Some(self.rep.to_vec())
2952 };
2953 let def = if self.def.is_empty() {
2954 None
2955 } else {
2956 Some(self.def.to_vec())
2957 };
2958 let unraveler = RepDefUnraveler::new(
2959 rep,
2960 def,
2961 self.details.def_meaning.clone(),
2962 self.num_visible_items,
2963 );
2964 Ok(DecodedPage {
2965 data: decompressed,
2966 repdef: unraveler,
2967 })
2968 }
2969}
2970
2971#[derive(Debug)]
2972struct FullZipDecodeTaskItem {
2973 data: PerValueDataBlock,
2974 rows_in_buf: u64,
2975}
2976
2977#[derive(Debug)]
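/// CPU task that strips control words from the zipped buffers (when present),
/// decompresses the fixed-width values, and rebuilds the rep/def levels.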
2980struct FixedFullZipDecodeTask {
2981 details: Arc<FullZipDecodeDetails>,
2982 data: Vec<FullZipDecodeTaskItem>,
2983 num_rows: usize,
2984 bytes_per_value: usize,
2985}
2986
2987impl DecodePageTask for FixedFullZipDecodeTask {
2988 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2989 let estimated_size_bytes = self
2991 .data
2992 .iter()
2993 .map(|task_item| task_item.data.data_size() as usize)
2994 .sum::<usize>()
2995 * 2;
2996 let mut data_builder =
2997 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2998
2999 if self.details.ctrl_word_parser.bytes_per_word() == 0 {
3000 for task_item in self.data.into_iter() {
3004 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
3005 unreachable!()
3006 };
3007 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
3008 else {
3009 unreachable!()
3010 };
3011 debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
3012 let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
3013 data_builder.append(&decompressed, 0..task_item.rows_in_buf);
3014 }
3015
3016 let unraveler = RepDefUnraveler::new(
3017 None,
3018 None,
3019 self.details.def_meaning.clone(),
3020 self.num_rows as u64,
3021 );
3022
3023 Ok(DecodedPage {
3024 data: data_builder.finish(),
3025 repdef: unraveler,
3026 })
3027 } else {
3028 let mut rep = Vec::with_capacity(self.num_rows);
3030 let mut def = Vec::with_capacity(self.num_rows);
3031
3032 for task_item in self.data.into_iter() {
3033 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
3034 unreachable!()
3035 };
3036 let mut buf_slice = fixed_data.data.as_ref();
3037 let num_values = fixed_data.num_values as usize;
3038 let mut values = Vec::with_capacity(
3041 fixed_data.data.len()
3042 - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
3043 );
3044 let mut visible_items = 0;
3045 for _ in 0..num_values {
3046 self.details
3048 .ctrl_word_parser
3049 .parse(buf_slice, &mut rep, &mut def);
3050 buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
3051
3052 let is_visible = def
3053 .last()
3054 .map(|d| *d <= self.details.max_visible_def)
3055 .unwrap_or(true);
3056 if is_visible {
3057 values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
3059 buf_slice = &buf_slice[self.bytes_per_value..];
3060 visible_items += 1;
3061 }
3062 }
3063
3064 let values_buf = LanceBuffer::from(values);
3066 let fixed_data = FixedWidthDataBlock {
3067 bits_per_value: self.bytes_per_value as u64 * 8,
3068 block_info: BlockInfo::new(),
3069 data: values_buf,
3070 num_values: visible_items,
3071 };
3072 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
3073 else {
3074 unreachable!()
3075 };
3076 let decompressed = decompressor.decompress(fixed_data, visible_items)?;
3077 data_builder.append(&decompressed, 0..visible_items);
3078 }
3079
3080 let repetition = if rep.is_empty() { None } else { Some(rep) };
3081 let definition = if def.is_empty() { None } else { Some(def) };
3082
3083 let unraveler = RepDefUnraveler::new(
3084 repetition,
3085 definition,
3086 self.details.def_meaning.clone(),
3087 self.num_rows as u64,
3088 );
3089 let data = data_builder.finish();
3090
3091 Ok(DecodedPage {
3092 data,
3093 repdef: unraveler,
3094 })
3095 }
3096 }
3097}
3098
3099#[derive(Debug)]
3100struct StructuralPrimitiveFieldSchedulingJob<'a> {
3101 scheduler: &'a StructuralPrimitiveFieldScheduler,
3102 ranges: Vec<Range<u64>>,
3103 page_idx: usize,
3104 range_idx: usize,
3105 global_row_offset: u64,
3106}
3107
3108impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
3109 pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
3110 Self {
3111 scheduler,
3112 ranges,
3113 page_idx: 0,
3114 range_idx: 0,
3115 global_row_offset: 0,
3116 }
3117 }
3118}
3119
3120impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
3121 fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
3122 if self.range_idx >= self.ranges.len() {
3123 return Ok(Vec::new());
3124 }
3125 let mut range = self.ranges[self.range_idx].clone();
3127 let priority = range.start;
3128
3129 let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
3130 trace!(
3131 "Current range is {:?} and current page has {} rows",
3132 range, cur_page.num_rows
3133 );
3134 while cur_page.num_rows + self.global_row_offset <= range.start {
3136 self.global_row_offset += cur_page.num_rows;
3137 self.page_idx += 1;
3138 trace!("Skipping entire page of {} rows", cur_page.num_rows);
3139 cur_page = &self.scheduler.page_schedulers[self.page_idx];
3140 }
3141
3142 let mut ranges_in_page = Vec::new();
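// Gather every requested range that overlaps the current page, translated
// into page-relative rows.  A single page may finish one range and begin
// the next.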
3146 while cur_page.num_rows + self.global_row_offset > range.start {
3147 range.start = range.start.max(self.global_row_offset);
3148 let start_in_page = range.start - self.global_row_offset;
3149 let end_in_page = start_in_page + (range.end - range.start);
3150 let end_in_page = end_in_page.min(cur_page.num_rows);
3151 let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
3152
3153 ranges_in_page.push(start_in_page..end_in_page);
3154 if last_in_range {
3155 self.range_idx += 1;
3156 if self.range_idx == self.ranges.len() {
3157 break;
3158 }
3159 range = self.ranges[self.range_idx].clone();
3160 } else {
3161 break;
3162 }
3163 }
3164
3165 trace!(
3166 "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
3167 ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
3168 ranges_in_page.len(),
3169 cur_page.num_rows,
3170 priority,
3171 self.scheduler.column_index,
3172 cur_page.page_index,
3173 );
3174
3175 self.global_row_offset += cur_page.num_rows;
3176 self.page_idx += 1;
3177
3178 let page_decoders = cur_page
3179 .scheduler
3180 .schedule_ranges(&ranges_in_page, context.io())?;
3181
3182 let cur_path = context.current_path();
3183 page_decoders
3184 .into_iter()
3185 .map(|page_load_task| {
3186 let cur_path = cur_path.clone();
3187 let page_decoder = page_load_task.decoder_fut;
3188 let unloaded_page = async move {
3189 let page_decoder = page_decoder.await?;
3190 Ok(LoadedPageShard {
3191 decoder: page_decoder,
3192 path: cur_path,
3193 })
3194 }
3195 .boxed();
3196 Ok(ScheduledScanLine {
3197 decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
3198 rows_scheduled: page_load_task.num_rows,
3199 })
3200 })
3201 .collect::<Result<Vec<_>>>()
3202 }
3203}
3204
3205#[derive(Debug)]
3206struct PageInfoAndScheduler {
3207 page_index: usize,
3208 num_rows: u64,
3209 scheduler: Box<dyn StructuralPageScheduler>,
3210}
3211
3212#[derive(Debug)]
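/// Scheduler for a primitive (leaf) field.  It owns one page scheduler per
/// page and walks them in order as row ranges are scheduled.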
3217pub struct StructuralPrimitiveFieldScheduler {
3218 page_schedulers: Vec<PageInfoAndScheduler>,
3219 column_index: u32,
3220}
3221
3222impl StructuralPrimitiveFieldScheduler {
3223 pub fn try_new(
3224 column_info: &ColumnInfo,
3225 decompressors: &dyn DecompressionStrategy,
3226 cache_repetition_index: bool,
3227 target_field: &Field,
3228 ) -> Result<Self> {
3229 let page_schedulers = column_info
3230 .page_infos
3231 .iter()
3232 .enumerate()
3233 .map(|(page_index, page_info)| {
3234 Self::page_info_to_scheduler(
3235 page_info,
3236 page_index,
3237 decompressors,
3238 cache_repetition_index,
3239 target_field,
3240 )
3241 })
3242 .collect::<Result<Vec<_>>>()?;
3243 Ok(Self {
3244 page_schedulers,
3245 column_index: column_info.index,
3246 })
3247 }
3248
3249 fn page_layout_to_scheduler(
3250 page_info: &PageInfo,
3251 page_layout: &PageLayout,
3252 decompressors: &dyn DecompressionStrategy,
3253 cache_repetition_index: bool,
3254 target_field: &Field,
3255 ) -> Result<Box<dyn StructuralPageScheduler>> {
3256 use pb21::page_layout::Layout;
3257 Ok(match page_layout.layout.as_ref().expect_ok()? {
3258 Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
3259 &page_info.buffer_offsets_and_sizes,
3260 page_info.priority,
3261 mini_block.num_items,
3262 mini_block,
3263 decompressors,
3264 )?),
3265 Layout::FullZipLayout(full_zip) => {
3266 let mut scheduler = FullZipScheduler::try_new(
3267 &page_info.buffer_offsets_and_sizes,
3268 page_info.priority,
3269 page_info.num_rows,
3270 full_zip,
3271 decompressors,
3272 )?;
3273 scheduler.enable_cache = cache_repetition_index;
3274 Box::new(scheduler)
3275 }
3276 Layout::ConstantLayout(constant_layout) => {
3277 let def_meaning = constant_layout
3278 .layers
3279 .iter()
3280 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3281 .collect::<Vec<_>>();
3282 let has_scalar_value = constant_layout.inline_value.is_some()
3283 || page_info.buffer_offsets_and_sizes.len() == 1
3284 || page_info.buffer_offsets_and_sizes.len() == 3;
3285 if has_scalar_value {
3286 Box::new(constant::ConstantPageScheduler::try_new(
3287 page_info.buffer_offsets_and_sizes.clone(),
3288 constant_layout.inline_value.clone(),
3289 target_field.data_type(),
3290 def_meaning.into(),
3291 )?) as Box<dyn StructuralPageScheduler>
3292 } else if def_meaning.len() == 1
3293 && def_meaning[0] == DefinitionInterpretation::NullableItem
3294 {
3295 Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
3296 } else {
3297 let rep_decompressor = constant_layout
3298 .rep_compression
3299 .as_ref()
3300 .map(|encoding| decompressors.create_block_decompressor(encoding))
3301 .transpose()?
3302 .map(Arc::from);
3303
3304 let def_decompressor = constant_layout
3305 .def_compression
3306 .as_ref()
3307 .map(|encoding| decompressors.create_block_decompressor(encoding))
3308 .transpose()?
3309 .map(Arc::from);
3310
3311 Box::new(ComplexAllNullScheduler::new(
3312 page_info.buffer_offsets_and_sizes.clone(),
3313 def_meaning.into(),
3314 rep_decompressor,
3315 def_decompressor,
3316 constant_layout.num_rep_values,
3317 constant_layout.num_def_values,
3318 )) as Box<dyn StructuralPageScheduler>
3319 }
3320 }
3321 Layout::BlobLayout(blob) => {
3322 let inner_scheduler = Self::page_layout_to_scheduler(
3323 page_info,
3324 blob.inner_layout.as_ref().expect_ok()?.as_ref(),
3325 decompressors,
3326 cache_repetition_index,
3327 target_field,
3328 )?;
3329 let def_meaning = blob
3330 .layers
3331 .iter()
3332 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3333 .collect::<Vec<_>>();
3334 if matches!(target_field.data_type(), DataType::Struct(_)) {
3335 Box::new(BlobDescriptionPageScheduler::new(
3337 inner_scheduler,
3338 def_meaning.into(),
3339 ))
3340 } else {
3341 Box::new(BlobPageScheduler::new(
3343 inner_scheduler,
3344 page_info.priority,
3345 page_info.num_rows,
3346 def_meaning.into(),
3347 ))
3348 }
3349 }
3350 })
3351 }
3352
3353 fn page_info_to_scheduler(
3354 page_info: &PageInfo,
3355 page_index: usize,
3356 decompressors: &dyn DecompressionStrategy,
3357 cache_repetition_index: bool,
3358 target_field: &Field,
3359 ) -> Result<PageInfoAndScheduler> {
3360 let page_layout = page_info.encoding.as_structural();
3361 let scheduler = Self::page_layout_to_scheduler(
3362 page_info,
3363 page_layout,
3364 decompressors,
3365 cache_repetition_index,
3366 target_field,
3367 )?;
3368 Ok(PageInfoAndScheduler {
3369 page_index,
3370 num_rows: page_info.num_rows,
3371 scheduler,
3372 })
3373 }
3374}
3375
3376pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
3377 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
3378}
3379
3380pub struct NoCachedPageData;
3381
3382impl DeepSizeOf for NoCachedPageData {
3383 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
3384 0
3385 }
3386}
3387impl CachedPageData for NoCachedPageData {
3388 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
3389 self
3390 }
3391}
3392
3393pub struct CachedFieldData {
3394 pages: Vec<Arc<dyn CachedPageData>>,
3395}
3396
3397impl DeepSizeOf for CachedFieldData {
3398 fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
3399 self.pages.deep_size_of_children(ctx)
3400 }
3401}
3402
3403#[derive(Debug, Clone)]
3405pub struct FieldDataCacheKey {
3406 pub column_index: u32,
3407}
3408
3409impl CacheKey for FieldDataCacheKey {
3410 type ValueType = CachedFieldData;
3411
3412 fn key(&self) -> std::borrow::Cow<'_, str> {
3413 self.column_index.to_string().into()
3414 }
3415
3416 fn type_name() -> &'static str {
3417 "FieldData"
3418 }
3419}
3420
3421impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3422 fn initialize<'a>(
3423 &'a mut self,
3424 _filter: &'a FilterExpression,
3425 context: &'a SchedulerContext,
3426 ) -> BoxFuture<'a, Result<()>> {
3427 let cache_key = FieldDataCacheKey {
3428 column_index: self.column_index,
3429 };
3430 let cache = context.cache().clone();
3431
3432 async move {
3433 if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3434 self.page_schedulers
3435 .iter_mut()
3436 .zip(cached_data.pages.iter())
3437 .for_each(|(page_scheduler, cached_data)| {
3438 page_scheduler.scheduler.load(cached_data);
3439 });
3440 return Ok(());
3441 }
3442
3443 let page_data = self
3444 .page_schedulers
3445 .iter_mut()
3446 .map(|s| s.scheduler.initialize(context.io()))
3447 .collect::<FuturesOrdered<_>>();
3448
3449 let page_data = page_data.try_collect::<Vec<_>>().await?;
3450 let cached_data = Arc::new(CachedFieldData { pages: page_data });
3451 cache.insert_with_key(&cache_key, cached_data).await;
3452 Ok(())
3453 }
3454 .boxed()
3455 }
3456
3457 fn schedule_ranges<'a>(
3458 &'a self,
3459 ranges: &[Range<u64>],
3460 _filter: &FilterExpression,
3461 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3462 let ranges = ranges.to_vec();
3463 Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3464 self, ranges,
3465 )))
3466 }
3467}
3468
3469#[derive(Debug)]
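/// Task that decodes the pages contributing to one batch of rows, concatenates
/// the resulting arrays, and restores validity from the unraveled repdef
/// levels.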
3472pub struct StructuralCompositeDecodeArrayTask {
3473 tasks: Vec<Box<dyn DecodePageTask>>,
3474 should_validate: bool,
3475 data_type: DataType,
3476}
3477
3478impl StructuralCompositeDecodeArrayTask {
3479 fn restore_validity(
3480 array: Arc<dyn Array>,
3481 unraveler: &mut CompositeRepDefUnraveler,
3482 ) -> Arc<dyn Array> {
3483 let validity = unraveler.unravel_validity(array.len());
3484 let Some(validity) = validity else {
3485 return array;
3486 };
3487 if array.data_type() == &DataType::Null {
3488 return array;
3490 }
3491 assert_eq!(validity.len(), array.len());
3492 make_array(unsafe {
3495 array
3496 .to_data()
3497 .into_builder()
3498 .nulls(Some(validity))
3499 .build_unchecked()
3500 })
3501 }
3502}
3503
3504impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3505 fn decode(self: Box<Self>) -> Result<DecodedArray> {
3506 let mut arrays = Vec::with_capacity(self.tasks.len());
3507 let mut unravelers = Vec::with_capacity(self.tasks.len());
3508 let mut data_size = 0u64;
3509 for task in self.tasks {
3510 let decoded = task.decode()?;
3511 data_size += decoded.data.data_size();
3512 unravelers.push(decoded.repdef);
3513
3514 let array = make_array(
3515 decoded
3516 .data
3517 .into_arrow(self.data_type.clone(), self.should_validate)?,
3518 );
3519
3520 arrays.push(array);
3521 }
3522 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3523 let array = arrow_select::concat::concat(&array_refs)?;
3524 let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3525
3526 let array = Self::restore_validity(array, &mut repdef);
3527
3528 Ok(DecodedArray {
3529 array,
3530 repdef,
3531 data_size,
3532 })
3533 }
3534}
3535
3536#[derive(Debug)]
3537pub struct StructuralPrimitiveFieldDecoder {
3538 field: Arc<ArrowField>,
3539 page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3540 should_validate: bool,
3541 rows_drained_in_current: u64,
3542}
3543
3544impl StructuralPrimitiveFieldDecoder {
3545 pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3546 Self {
3547 field: field.clone(),
3548 page_decoders: VecDeque::new(),
3549 should_validate,
3550 rows_drained_in_current: 0,
3551 }
3552 }
3553}
3554
3555impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3556 fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3557 assert!(child.path.is_empty());
3558 self.page_decoders.push_back(child.decoder);
3559 Ok(())
3560 }
3561
3562 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3563 let mut remaining = num_rows;
3564 let mut tasks = Vec::new();
3565 while remaining > 0 {
3566 let cur_page = self.page_decoders.front_mut().unwrap();
3567 let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3568 let to_take = num_in_page.min(remaining);
3569
3570 let task = cur_page.drain(to_take)?;
3571 tasks.push(task);
3572
3573 if to_take == num_in_page {
3574 self.page_decoders.pop_front();
3575 self.rows_drained_in_current = 0;
3576 } else {
3577 self.rows_drained_in_current += to_take;
3578 }
3579
3580 remaining -= to_take;
3581 }
3582 Ok(Box::new(StructuralCompositeDecodeArrayTask {
3583 tasks,
3584 should_validate: self.should_validate,
3585 data_type: self.field.data_type().clone(),
3586 }))
3587 }
3588
3589 fn data_type(&self) -> &DataType {
3590 self.field.data_type()
3591 }
3592}
3593
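/// The serialized buffers of a full-zip page: the zipped value bytes plus, for
/// list data, an optional repetition index.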
3594struct SerializedFullZip {
3596 values: LanceBuffer,
3598 repetition_index: Option<LanceBuffer>,
3600}
3601
3602const MINIBLOCK_ALIGNMENT: usize = 8;
3622
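/// Encoder for primitive (leaf) fields using the structural (2.1+) layouts.
/// Arrays and their repdef builders are accumulated until enough data is
/// buffered for a page; each page is then written as an all-null, constant,
/// mini-block, or full-zip layout, with rep/def levels compressed alongside
/// the values.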
3623pub struct PrimitiveStructuralEncoder {
3650 accumulation_queue: AccumulationQueue,
3652
3653 keep_original_array: bool,
3654 support_large_chunk: bool,
3655 accumulated_repdefs: Vec<RepDefBuilder>,
3656 compression_strategy: Arc<dyn CompressionStrategy>,
3658 column_index: u32,
3659 field: Field,
3660 encoding_metadata: Arc<HashMap<String, String>>,
3661 version: LanceFileVersion,
3662}
3663
3664struct CompressedLevelsChunk {
3665 data: LanceBuffer,
3666 num_levels: u16,
3667}
3668
3669struct CompressedLevels {
3670 data: Vec<CompressedLevelsChunk>,
3671 compression: CompressiveEncoding,
3672 rep_index: Option<LanceBuffer>,
3673}
3674
3675struct SerializedMiniBlockPage {
3676 num_buffers: u64,
3677 data: LanceBuffer,
3678 metadata: LanceBuffer,
3679}
3680
3681#[derive(Debug, Clone, Copy)]
3682struct DictEncodingBudget {
3683 max_dict_entries: u32,
3684 max_encoded_size: usize,
3685}
3686
3687impl PrimitiveStructuralEncoder {
3688 pub fn try_new(
3689 options: &EncodingOptions,
3690 compression_strategy: Arc<dyn CompressionStrategy>,
3691 column_index: u32,
3692 field: Field,
3693 encoding_metadata: Arc<HashMap<String, String>>,
3694 ) -> Result<Self> {
3695 Ok(Self {
3696 accumulation_queue: AccumulationQueue::new(
3697 options.cache_bytes_per_column,
3698 column_index,
3699 options.keep_original_array,
3700 ),
3701 support_large_chunk: options.support_large_chunk(),
3702 keep_original_array: options.keep_original_array,
3703 accumulated_repdefs: Vec::new(),
3704 column_index,
3705 compression_strategy,
3706 field,
3707 encoding_metadata,
3708 version: options.version,
3709 })
3710 }
3711
3712 fn is_narrow(data_block: &DataBlock) -> bool {
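// A block is "narrow" (a good mini-block candidate) when its largest value
// is smaller than MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE bytes.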
3720 const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3721
3722 if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3723 let max_len_array = max_len_array
3724 .as_any()
3725 .downcast_ref::<PrimitiveArray<UInt64Type>>()
3726 .unwrap();
3727 if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3728 return true;
3729 }
3730 }
3731 false
3732 }
3733
3734 fn prefers_miniblock(
3735 data_block: &DataBlock,
3736 encoding_metadata: &HashMap<String, String>,
3737 ) -> bool {
3738 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3740 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3741 }
3742 Self::is_narrow(data_block)
3744 }
3745
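/// Returns true when the rep/def levels are too numerous relative to the row
/// count for mini-block encoding: if the expected number of levels per chunk,
/// at the bit widths needed for the observed rep/def values, would exceed the
/// per-chunk level budget, mini-block is not a good fit.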
3746 fn repdef_too_sparse_for_miniblock(
3759 repdef: &crate::repdef::SerializedRepDefs,
3760 num_values: u64,
3761 ) -> bool {
3762 if num_values == 0 {
3763 return false;
3764 }
3765 let num_levels = repdef
3766 .repetition_levels
3767 .as_ref()
3768 .map(|r| r.len() as u64)
3769 .max(repdef.definition_levels.as_ref().map(|d| d.len() as u64))
3770 .unwrap_or(0);
3771 if num_levels == 0 {
3772 return false;
3773 }
3774
3775 let bits_per_rep = repdef
3777 .repetition_levels
3778 .as_ref()
3779 .and_then(|r| r.iter().max().copied())
3780 .map(|max_val| u16::BITS - max_val.leading_zeros())
3781 .unwrap_or(0) as u64;
3782 let bits_per_def = repdef
3783 .definition_levels
3784 .as_ref()
3785 .and_then(|d| d.iter().max().copied())
3786 .map(|max_val| u16::BITS - max_val.leading_zeros())
3787 .unwrap_or(0) as u64;
3788
3789 let bits_per_level = bits_per_rep + bits_per_def;
3790 if bits_per_level == 0 {
3791 return false;
3792 }
3793
3794 const REPDEF_BUDGET_BITS: u64 = 16 * 1024 * 8;
3796 let max_levels_per_chunk = REPDEF_BUDGET_BITS / bits_per_level;
3797
3798 let levels_per_chunk =
3801 (num_levels as f64 / num_values as f64) * *miniblock::MAX_MINIBLOCK_VALUES as f64;
3802
3803 levels_per_chunk > max_levels_per_chunk as f64
3804 }
3805
3806 fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3807 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3811 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3812 }
3813 true
3814 }
3815
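/// Serializes compressed mini-block chunks (and their optional compressed
/// rep/def levels) into a single data buffer plus a metadata buffer.  For each
/// chunk the data buffer holds: the number of levels (u16), the compressed rep
/// size (u16, if present), the compressed def size (u16, if present), and one
/// size per value buffer (u16, or u32 with large-chunk support), padded to
/// MINIBLOCK_ALIGNMENT; then the rep bytes, def bytes, and each value buffer,
/// each padded to MINIBLOCK_ALIGNMENT.  The metadata buffer stores one word
/// per chunk: ((chunk_bytes / MINIBLOCK_ALIGNMENT - 1) << 4) | log_num_values.
/// For example, a 256-byte chunk holding 2^10 values is encoded as
/// ((256 / 8 - 1) << 4) | 10 = 0x1FA.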
3816 fn serialize_miniblocks(
3863 miniblocks: MiniBlockCompressed,
3864 rep: Option<Vec<CompressedLevelsChunk>>,
3865 def: Option<Vec<CompressedLevelsChunk>>,
3866 support_large_chunk: bool,
3867 ) -> Result<SerializedMiniBlockPage> {
3868 let bytes_rep = rep
3869 .as_ref()
3870 .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3871 .unwrap_or(0);
3872 let bytes_def = def
3873 .as_ref()
3874 .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3875 .unwrap_or(0);
3876 let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3877 let mut num_buffers = miniblocks.data.len();
3878 if rep.is_some() {
3879 num_buffers += 1;
3880 }
3881 if def.is_some() {
3882 num_buffers += 1;
3883 }
3884 let max_extra = 9 * num_buffers;
3886 let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3887 let chunk_size_bytes = if support_large_chunk { 4 } else { 2 };
3888 let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes);
3889
3890 let mut rep_iter = rep.map(|r| r.into_iter());
3891 let mut def_iter = def.map(|d| d.into_iter());
3892
3893 let mut buffer_offsets = vec![0; miniblocks.data.len()];
3894 for chunk in miniblocks.chunks {
3895 let start_pos = data_buffer.len();
3896 debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3898
3899 let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3900 let def = def_iter.as_mut().map(|d| d.next().unwrap());
3901
3902 let num_levels = rep
3904 .as_ref()
3905 .map(|r| r.num_levels)
3906 .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3907 data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3908
3909 if let Some(rep) = rep.as_ref() {
3911 let bytes_rep = u16::try_from(rep.data.len()).map_err(|_| {
3912 Error::internal(format!(
3913 "Repetition buffer size ({} bytes) too large",
3914 rep.data.len()
3915 ))
3916 })?;
3917 data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3918 }
3919 if let Some(def) = def.as_ref() {
3920 let bytes_def = u16::try_from(def.data.len()).map_err(|_| {
3921 Error::internal(format!(
3922 "Definition buffer size ({} bytes) too large",
3923 def.data.len()
3924 ))
3925 })?;
3926 data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3927 }
3928
3929 if support_large_chunk {
3930 for &buffer_size in &chunk.buffer_sizes {
3931 data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
3932 }
3933 } else {
3934 for &buffer_size in &chunk.buffer_sizes {
3935 data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes());
3936 }
3937 }
3938
3939 let add_padding = |data_buffer: &mut Vec<u8>| {
3941 let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3942 data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3943 };
3944 add_padding(&mut data_buffer);
3945
3946 if let Some(rep) = rep.as_ref() {
3948 data_buffer.extend_from_slice(&rep.data);
3949 add_padding(&mut data_buffer);
3950 }
3951 if let Some(def) = def.as_ref() {
3952 data_buffer.extend_from_slice(&def.data);
3953 add_padding(&mut data_buffer);
3954 }
3955 for (buffer_size, (buffer, buffer_offset)) in chunk
3956 .buffer_sizes
3957 .iter()
3958 .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3959 {
3960 let start = *buffer_offset;
3961 let end = start + *buffer_size as usize;
3962 *buffer_offset += *buffer_size as usize;
3963 data_buffer.extend_from_slice(&buffer[start..end]);
3964 add_padding(&mut data_buffer);
3965 }
3966
3967 let chunk_bytes = data_buffer.len() - start_pos;
3968 let max_chunk_size = if support_large_chunk {
3969 4 * 1024 * 1024 * 1024 // 4 GiB when large chunks are enabled
} else {
3971 32 * 1024 // 32 KiB otherwise
};
3973 assert!(chunk_bytes <= max_chunk_size);
3974 assert!(chunk_bytes > 0);
3975 assert_eq!(chunk_bytes % 8, 0);
3976 assert!(chunk.log_num_values <= 12);
3978 let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3982 let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3983
3984 let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64;
3985 if support_large_chunk {
3986 meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes());
3987 } else {
3988 meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes());
3989 }
3990 }
3991
3992 let data_buffer = LanceBuffer::from(data_buffer);
3993 let metadata_buffer = LanceBuffer::from(meta_buffer);
3994
3995 Ok(SerializedMiniBlockPage {
3996 num_buffers: miniblocks.data.len() as u64,
3997 data: data_buffer,
3998 metadata: metadata_buffer,
3999 })
4000 }
4001
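/// Compresses repetition or definition levels chunk-by-chunk, mirroring the
/// chunking of the values, using a block compressor chosen for u16 data.
/// When `max_rep > 0` it also builds a repetition index that records, per
/// chunk, how many rows start in the chunk and how many trailing levels spill
/// into the next chunk.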
4002 fn compress_levels(
4007 mut levels: RepDefSlicer<'_>,
4008 num_elements: u64,
4009 compression_strategy: &dyn CompressionStrategy,
4010 chunks: &[MiniBlockChunk],
4011 max_rep: u16,
4013 ) -> Result<CompressedLevels> {
4014 let mut rep_index = if max_rep > 0 {
4015 Vec::with_capacity(chunks.len())
4016 } else {
4017 vec![]
4018 };
4019 let num_levels = levels.num_levels() as u64;
4021 let levels_buf = levels.all_levels().clone();
4022
4023 let mut fixed_width_block = FixedWidthDataBlock {
4024 data: levels_buf,
4025 bits_per_value: 16,
4026 num_values: num_levels,
4027 block_info: BlockInfo::new(),
4028 };
4029 fixed_width_block.compute_stat();
4031
4032 let levels_block = DataBlock::FixedWidth(fixed_width_block);
4033 let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4034 let (compressor, compressor_desc) =
4036 compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4037 let mut level_chunks = Vec::with_capacity(chunks.len());
4039 let mut values_counter = 0;
4040 for (chunk_idx, chunk) in chunks.iter().enumerate() {
4041 let chunk_num_values = chunk.num_values(values_counter, num_elements);
4042 debug_assert!(chunk_num_values > 0);
4043 values_counter += chunk_num_values;
4044 let chunk_levels = if chunk_idx < chunks.len() - 1 {
4045 levels.slice_next(chunk_num_values as usize)
4046 } else {
4047 levels.slice_rest()
4048 };
4049 let num_chunk_levels = (chunk_levels.len() / 2) as u64;
4050 if max_rep > 0 {
4051 let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
4061 let rep_values = rep_values.as_ref();
4062
4063 let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
4066 let num_leftovers = if chunk_idx < chunks.len() - 1 {
4067 rep_values
4068 .iter()
4069 .rev()
4070 .position(|v| *v == max_rep)
4071 .map(|pos| pos + 1)
4073 .unwrap_or(rep_values.len())
4074 } else {
4075 0
4077 };
4078
4079 if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
4080 let rep_len = rep_index.len();
4084 if rep_index[rep_len - 1] != 0 {
4085 rep_index[rep_len - 2] += 1;
4087 rep_index[rep_len - 1] = 0;
4088 }
4089 }
4090
4091 if chunk_idx == chunks.len() - 1 {
4092 num_rows += 1;
4094 }
4095 rep_index.push(num_rows as u64);
4096 rep_index.push(num_leftovers as u64);
4097 }
4098 let mut chunk_fixed_width = FixedWidthDataBlock {
4099 data: chunk_levels,
4100 bits_per_value: 16,
4101 num_values: num_chunk_levels,
4102 block_info: BlockInfo::new(),
4103 };
4104 chunk_fixed_width.compute_stat();
4105 let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
4106 let compressed_levels = compressor.compress(chunk_levels_block)?;
4107 level_chunks.push(CompressedLevelsChunk {
4108 data: compressed_levels,
4109 num_levels: num_chunk_levels as u16,
4110 });
4111 }
4112 debug_assert_eq!(levels.num_levels_remaining(), 0);
4113 let rep_index = if rep_index.is_empty() {
4114 None
4115 } else {
4116 Some(LanceBuffer::reinterpret_vec(rep_index))
4117 };
4118 Ok(CompressedLevels {
4119 data: level_chunks,
4120 compression: compressor_desc,
4121 rep_index,
4122 })
4123 }
4124
4125 fn encode_simple_all_null(
4126 column_idx: u32,
4127 num_rows: u64,
4128 row_number: u64,
4129 ) -> Result<EncodedPage> {
4130 let description =
4131 ProtobufUtils21::constant_layout(&[DefinitionInterpretation::NullableItem], None);
4132 Ok(EncodedPage {
4133 column_idx,
4134 data: vec![],
4135 description: PageEncoding::Structural(description),
4136 num_rows,
4137 row_number,
4138 })
4139 }
4140
4141 fn encode_complex_all_null_vals(
4142 data: &Arc<[u16]>,
4143 compression_strategy: &dyn CompressionStrategy,
4144 ) -> Result<(LanceBuffer, pb21::CompressiveEncoding)> {
4145 let buffer = LanceBuffer::reinterpret_slice(data.clone());
4146 let mut fixed_width_block = FixedWidthDataBlock {
4147 data: buffer,
4148 bits_per_value: 16,
4149 num_values: data.len() as u64,
4150 block_info: BlockInfo::new(),
4151 };
4152 fixed_width_block.compute_stat();
4153
4154 let levels_block = DataBlock::FixedWidth(fixed_width_block);
4155 let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4156 let (compressor, encoding) =
4157 compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4158 let compressed_buffer = compressor.compress(levels_block)?;
4159 Ok((compressed_buffer, encoding))
4160 }
4161
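/// Encodes a page whose values are all null but whose rep/def levels still
/// carry structure (e.g. nulls at different nesting levels).  Before 2.2 the
/// raw levels are written as-is; from 2.2 on they are block-compressed and
/// the compression is recorded in the layout.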
4162 fn encode_complex_all_null(
4166 column_idx: u32,
4167 repdef: crate::repdef::SerializedRepDefs,
4168 row_number: u64,
4169 num_rows: u64,
4170 version: LanceFileVersion,
4171 compression_strategy: &dyn CompressionStrategy,
4172 ) -> Result<EncodedPage> {
4173 if version.resolve() < LanceFileVersion::V2_2 {
4174 let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
4175 LanceBuffer::reinterpret_slice(rep.clone())
4176 } else {
4177 LanceBuffer::empty()
4178 };
4179
4180 let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
4181 LanceBuffer::reinterpret_slice(def.clone())
4182 } else {
4183 LanceBuffer::empty()
4184 };
4185
4186 let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None);
4187 return Ok(EncodedPage {
4188 column_idx,
4189 data: vec![rep_bytes, def_bytes],
4190 description: PageEncoding::Structural(description),
4191 num_rows,
4192 row_number,
4193 });
4194 }
4195
4196 let (rep_bytes, rep_encoding, num_rep_values) = if let Some(rep) =
4197 repdef.repetition_levels.as_ref()
4198 {
4199 let num_values = rep.len() as u64;
4200 let (buffer, encoding) = Self::encode_complex_all_null_vals(rep, compression_strategy)?;
4201 (buffer, Some(encoding), num_values)
4202 } else {
4203 (LanceBuffer::empty(), None, 0)
4204 };
4205
4206 let (def_bytes, def_encoding, num_def_values) = if let Some(def) =
4207 repdef.definition_levels.as_ref()
4208 {
4209 let num_values = def.len() as u64;
4210 let (buffer, encoding) = Self::encode_complex_all_null_vals(def, compression_strategy)?;
4211 (buffer, Some(encoding), num_values)
4212 } else {
4213 (LanceBuffer::empty(), None, 0)
4214 };
4215
4216 let description = ProtobufUtils21::compressed_all_null_constant_layout(
4217 &repdef.def_meaning,
4218 rep_encoding,
4219 def_encoding,
4220 num_rep_values,
4221 num_def_values,
4222 );
4223 Ok(EncodedPage {
4224 column_idx,
4225 data: vec![rep_bytes, def_bytes],
4226 description: PageEncoding::Structural(description),
4227 num_rows,
4228 row_number,
4229 })
4230 }
4231
4232 fn leaf_validity(
4233 repdef: &crate::repdef::SerializedRepDefs,
4234 num_values: usize,
4235 ) -> Result<Option<BooleanBuffer>> {
4236 let rep = repdef
4237 .repetition_levels
4238 .as_ref()
4239 .map(|rep| rep.as_ref().to_vec());
4240 let def = repdef
4241 .definition_levels
4242 .as_ref()
4243 .map(|def| def.as_ref().to_vec());
4244 let mut unraveler = RepDefUnraveler::new(
4245 rep,
4246 def,
4247 repdef.def_meaning.clone().into(),
4248 num_values as u64,
4249 );
4250 if unraveler.is_all_valid() {
4251 return Ok(None);
4252 }
4253 let mut validity = BooleanBufferBuilder::new(num_values);
4254 unraveler.unravel_validity(&mut validity);
4255 Ok(Some(validity.finish()))
4256 }
4257
4258 fn is_constant_values(
4259 arrays: &[ArrayRef],
4260 scalar: &ArrayRef,
4261 validity: Option<&BooleanBuffer>,
4262 ) -> Result<bool> {
4263 debug_assert_eq!(scalar.len(), 1);
4264 debug_assert_eq!(scalar.null_count(), 0);
4265
4266 match scalar.data_type() {
4267 DataType::Boolean => {
4268 let mut global_idx = 0usize;
4269 let scalar_val = scalar.as_boolean().value(0);
4270 for arr in arrays {
4271 let bool_arr = arr.as_boolean();
4272 for i in 0..arr.len() {
4273 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4274 global_idx += 1;
4275 if !is_valid {
4276 continue;
4277 }
4278 if bool_arr.value(i) != scalar_val {
4279 return Ok(false);
4280 }
4281 }
4282 }
4283 Ok(true)
4284 }
4285 DataType::Utf8 => Self::is_constant_utf8::<i32>(arrays, scalar, validity),
4286 DataType::LargeUtf8 => Self::is_constant_utf8::<i64>(arrays, scalar, validity),
4287 DataType::Binary => Self::is_constant_binary::<i32>(arrays, scalar, validity),
4288 DataType::LargeBinary => Self::is_constant_binary::<i64>(arrays, scalar, validity),
4289 data_type => {
4290 let mut global_idx = 0usize;
4291 let Some(byte_width) = data_type.byte_width_opt() else {
4292 return Ok(false);
4293 };
4294 let scalar_data = scalar.to_data();
4295 if scalar_data.buffers().len() != 1 || !scalar_data.child_data().is_empty() {
4296 return Ok(false);
4297 }
4298 let scalar_bytes = scalar_data.buffers()[0].as_slice();
4299 if scalar_bytes.len() != byte_width {
4300 return Ok(false);
4301 }
4302
4303 for arr in arrays {
4304 let data = arr.to_data();
4305 if data.buffers().is_empty() {
4306 return Ok(false);
4307 }
4308 let buf = data.buffers()[0].as_slice();
4309 let base = data.offset();
4310 for i in 0..arr.len() {
4311 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4312 global_idx += 1;
4313 if !is_valid {
4314 continue;
4315 }
4316 let start = (base + i) * byte_width;
4317 if buf[start..start + byte_width] != scalar_bytes[..] {
4318 return Ok(false);
4319 }
4320 }
4321 }
4322 Ok(true)
4323 }
4324 }
4325 }
4326
4327 fn is_constant_utf8<O: arrow_array::OffsetSizeTrait>(
4328 arrays: &[ArrayRef],
4329 scalar: &ArrayRef,
4330 validity: Option<&BooleanBuffer>,
4331 ) -> Result<bool> {
4332 debug_assert_eq!(scalar.len(), 1);
4333 let scalar_val = scalar.as_string::<O>().value(0).as_bytes();
4334 let mut global_idx = 0usize;
4335 for arr in arrays {
4336 let str_arr = arr.as_string::<O>();
4337 for i in 0..arr.len() {
4338 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4339 global_idx += 1;
4340 if !is_valid {
4341 continue;
4342 }
4343 if str_arr.value(i).as_bytes() != scalar_val {
4344 return Ok(false);
4345 }
4346 }
4347 }
4348 Ok(true)
4349 }
4350
4351 fn is_constant_binary<O: arrow_array::OffsetSizeTrait>(
4352 arrays: &[ArrayRef],
4353 scalar: &ArrayRef,
4354 validity: Option<&BooleanBuffer>,
4355 ) -> Result<bool> {
4356 debug_assert_eq!(scalar.len(), 1);
4357 let scalar_val = scalar.as_binary::<O>().value(0);
4358 let mut global_idx = 0usize;
4359 for arr in arrays {
4360 let bin_arr = arr.as_binary::<O>();
4361 for i in 0..arr.len() {
4362 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4363 global_idx += 1;
4364 if !is_valid {
4365 continue;
4366 }
4367 if bin_arr.value(i) != scalar_val {
4368 return Ok(false);
4369 }
4370 }
4371 }
4372 Ok(true)
4373 }
4374
4375 fn find_constant_scalar(
4376 arrays: &[ArrayRef],
4377 validity: Option<&BooleanBuffer>,
4378 ) -> Result<Option<ArrayRef>> {
4379 if arrays.is_empty() {
4380 return Ok(None);
4381 }
4382
4383 let global_scalar_idx = if let Some(validity) = validity {
4384 let Some(idx) = (0..validity.len()).find(|&i| validity.value(i)) else {
4385 return Ok(None);
4386 };
4387 idx
4388 } else {
4389 0
4390 };
4391
4392 let mut idx_remaining = global_scalar_idx;
4393 let mut scalar_arr_idx = 0usize;
4394 while scalar_arr_idx < arrays.len() {
4395 let len = arrays[scalar_arr_idx].len();
4396 if idx_remaining < len {
4397 break;
4398 }
4399 idx_remaining -= len;
4400 scalar_arr_idx += 1;
4401 }
4402
4403 if scalar_arr_idx >= arrays.len() {
4404 return Ok(None);
4405 }
4406
4407 let scalar =
4408 lance_arrow::scalar::extract_scalar_value(&arrays[scalar_arr_idx], idx_remaining)?;
4409 if scalar.null_count() != 0 {
4410 return Ok(None);
4411 }
4412 if !Self::is_constant_values(arrays, &scalar, validity)? {
4413 return Ok(None);
4414 }
4415 Ok(Some(scalar))
4416 }
4417
4418 fn resolve_dict_values_compression_metadata(
4419 field_metadata: &HashMap<String, String>,
4420 env_compression: Option<String>,
4421 env_compression_level: Option<String>,
4422 ) -> HashMap<String, String> {
4423 let mut metadata = HashMap::new();
4424
4425 let compression = field_metadata
4426 .get(DICT_VALUES_COMPRESSION_META_KEY)
4427 .cloned()
4428 .or(env_compression)
4429 .unwrap_or_else(|| DEFAULT_DICT_VALUES_COMPRESSION.to_string());
4430 metadata.insert(COMPRESSION_META_KEY.to_string(), compression);
4431
4432 if let Some(compression_level) = field_metadata
4433 .get(DICT_VALUES_COMPRESSION_LEVEL_META_KEY)
4434 .cloned()
4435 .or(env_compression_level)
4436 {
4437 metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), compression_level);
4438 }
4439
4440 metadata
4441 }
4442
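    /// Builds a synthetic field whose metadata carries the resolved dictionary-values
    /// compression settings; the compression strategy uses it to pick a block compressor
    /// for the dictionary buffer.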
4443 fn build_dict_values_compressor_field(field: &Field) -> Result<Field> {
4444 let mut dict_values_field = Field::new_arrow("", DataType::UInt16, false)?;
4449 dict_values_field.metadata = Self::resolve_dict_values_compression_metadata(
4450 &field.metadata,
4451 env::var(DICT_VALUES_COMPRESSION_ENV_VAR).ok(),
4452 env::var(DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR).ok(),
4453 );
4454 Ok(dict_values_field)
4455 }
4456
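    /// Encodes a page using the mini-block layout: values are compressed into mini-block
    /// chunks, rep/def levels are compressed alongside them, and an optional dictionary
    /// and repetition index are appended as extra page buffers.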
4457 #[allow(clippy::too_many_arguments)]
4458 fn encode_miniblock(
4459 column_idx: u32,
4460 field: &Field,
4461 compression_strategy: &dyn CompressionStrategy,
4462 data: DataBlock,
4463 repdef: crate::repdef::SerializedRepDefs,
4464 row_number: u64,
4465 dictionary_data: Option<DataBlock>,
4466 num_rows: u64,
4467 support_large_chunk: bool,
4468 ) -> Result<EncodedPage> {
4469        if let DataBlock::AllNull(_null_block) = data {
4470            unreachable!("all-null data blocks are handled by the all-null layouts before reaching the mini-block path")
4473        }
4474
4475 let num_items = data.num_values();
4476
4477 let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
4478 let (compressed_data, value_encoding) = compressor.compress(data)?;
4479
4480 let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
4481
4482 let mut compressed_rep = repdef
4483 .rep_slicer()
4484 .map(|rep_slicer| {
4485 Self::compress_levels(
4486 rep_slicer,
4487 num_items,
4488 compression_strategy,
4489 &compressed_data.chunks,
4490 max_rep,
4491 )
4492 })
4493 .transpose()?;
4494
4495 let (rep_index, rep_index_depth) =
4496 match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
4497 Some(rep_index) => (Some(rep_index.clone()), 1),
4498 None => (None, 0),
4499 };
4500
4501 let mut compressed_def = repdef
4502 .def_slicer()
4503 .map(|def_slicer| {
4504 Self::compress_levels(
4505 def_slicer,
4506 num_items,
4507 compression_strategy,
4508 &compressed_data.chunks,
4509 0,
4510 )
4511 })
4512 .transpose()?;
4513
4514 let rep_data = compressed_rep
4520 .as_mut()
4521 .map(|cr| std::mem::take(&mut cr.data));
4522 let def_data = compressed_def
4523 .as_mut()
4524 .map(|cd| std::mem::take(&mut cd.data));
4525
4526 let serialized =
4527 Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk)?;
4528
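        // Page buffers are laid out as: chunk metadata, chunk data, then the optional
        // dictionary buffer and repetition index.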
4529 let mut data = Vec::with_capacity(4);
4531 data.push(serialized.metadata);
4532 data.push(serialized.data);
4533
4534 if let Some(dictionary_data) = dictionary_data {
4535 let num_dictionary_items = dictionary_data.num_values();
4536 let dict_values_field = Self::build_dict_values_compressor_field(field)?;
4537
4538 let (compressor, dictionary_encoding) = compression_strategy
4539 .create_block_compressor(&dict_values_field, &dictionary_data)?;
4540 let dictionary_buffer = compressor.compress(dictionary_data)?;
4541
4542 data.push(dictionary_buffer);
4543 if let Some(rep_index) = rep_index {
4544 data.push(rep_index);
4545 }
4546
4547 let description = ProtobufUtils21::miniblock_layout(
4548 compressed_rep.map(|cr| cr.compression),
4549 compressed_def.map(|cd| cd.compression),
4550 value_encoding,
4551 rep_index_depth,
4552 serialized.num_buffers,
4553 Some((dictionary_encoding, num_dictionary_items)),
4554 &repdef.def_meaning,
4555 num_items,
4556 support_large_chunk,
4557 );
4558 Ok(EncodedPage {
4559 num_rows,
4560 column_idx,
4561 data,
4562 description: PageEncoding::Structural(description),
4563 row_number,
4564 })
4565 } else {
4566 let description = ProtobufUtils21::miniblock_layout(
4567 compressed_rep.map(|cr| cr.compression),
4568 compressed_def.map(|cd| cd.compression),
4569 value_encoding,
4570 rep_index_depth,
4571 serialized.num_buffers,
4572 None,
4573 &repdef.def_meaning,
4574 num_items,
4575 support_large_chunk,
4576 );
4577
4578 if let Some(rep_index) = rep_index {
4579 let view = rep_index.borrow_to_typed_slice::<u64>();
4580 let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
4581 debug_assert_eq!(total, num_rows);
4582
4583 data.push(rep_index);
4584 }
4585
4586 Ok(EncodedPage {
4587 num_rows,
4588 column_idx,
4589 data,
4590 description: PageEncoding::Structural(description),
4591 row_number,
4592 })
4593 }
4594 }
4595
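    /// Zips rep/def control words and fixed-width values into a single buffer. A
    /// byte-packed repetition index of per-row byte offsets (plus a final end offset)
    /// is built alongside; when it comes back empty (no repetition) it is omitted from
    /// the result.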
4596 fn serialize_full_zip_fixed(
4598 fixed: FixedWidthDataBlock,
4599 mut repdef: ControlWordIterator,
4600 num_values: u64,
4601 ) -> SerializedFullZip {
4602 let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
4603 let mut zipped_data = Vec::with_capacity(len);
4604
4605 let max_rep_index_val = if repdef.has_repetition() {
4606 len as u64
4607 } else {
4608 0
4610 };
4611 let mut rep_index_builder =
4612 BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
4613
4614 assert_eq!(
4617 fixed.bits_per_value % 8,
4618 0,
4619 "Non-byte aligned full-zip compression not yet supported"
4620 );
4621
4622 let bytes_per_value = fixed.bits_per_value as usize / 8;
4623 let mut offset = 0;
4624
4625 if bytes_per_value == 0 {
4626 while let Some(control) = repdef.append_next(&mut zipped_data) {
4628 if control.is_new_row {
4629 debug_assert!(offset <= len);
4631 unsafe { rep_index_builder.append(offset as u64) };
4633 }
4634 offset = zipped_data.len();
4635 }
4636 } else {
4637 let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
4639 while let Some(control) = repdef.append_next(&mut zipped_data) {
4640 if control.is_new_row {
4641 debug_assert!(offset <= len);
4643 unsafe { rep_index_builder.append(offset as u64) };
4645 }
4646 if control.is_visible {
4647 let value = data_iter.next().unwrap();
4648 zipped_data.extend_from_slice(value);
4649 }
4650 offset = zipped_data.len();
4651 }
4652 }
4653
4654 debug_assert_eq!(zipped_data.len(), len);
4655 unsafe {
4658 rep_index_builder.append(zipped_data.len() as u64);
4659 }
4660
4661 let zipped_data = LanceBuffer::from(zipped_data);
4662 let rep_index = rep_index_builder.into_data();
4663 let rep_index = if rep_index.is_empty() {
4664 None
4665 } else {
4666 Some(LanceBuffer::from(rep_index))
4667 };
4668 SerializedFullZip {
4669 values: zipped_data,
4670 repetition_index: rep_index,
4671 }
4672 }
4673
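    /// Zips rep/def control words and variable-width values into a single buffer. Each
    /// visible, valid value is written as a little-endian length (matching the offset
    /// width) followed by its bytes. A byte-packed repetition index of per-row byte
    /// offsets, plus a final end offset, is always produced.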
4674 fn serialize_full_zip_variable(
4678 variable: VariableWidthBlock,
4679 mut repdef: ControlWordIterator,
4680 num_items: u64,
4681 ) -> SerializedFullZip {
4682 let bytes_per_offset = variable.bits_per_offset as usize / 8;
4683 assert_eq!(
4684 variable.bits_per_offset % 8,
4685 0,
4686 "Only byte-aligned offsets supported"
4687 );
4688 let len = variable.data.len()
4689 + repdef.bytes_per_word() * num_items as usize
4690 + bytes_per_offset * variable.num_values as usize;
4691 let mut buf = Vec::with_capacity(len);
4692
4693 let max_rep_index_val = len as u64;
4694 let mut rep_index_builder =
4695 BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4696
4697 match bytes_per_offset {
4699 4 => {
4700 let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4701 let mut rep_offset = 0;
4702 let mut windows_iter = offs.as_ref().windows(2);
4703 while let Some(control) = repdef.append_next(&mut buf) {
4704 if control.is_new_row {
4705 debug_assert!(rep_offset <= len);
4707 unsafe { rep_index_builder.append(rep_offset as u64) };
4709 }
4710 if control.is_visible {
4711 let window = windows_iter.next().unwrap();
4712 if control.is_valid_item {
4713 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4714 buf.extend_from_slice(
4715 &variable.data[window[0] as usize..window[1] as usize],
4716 );
4717 }
4718 }
4719 rep_offset = buf.len();
4720 }
4721 }
4722 8 => {
4723 let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4724 let mut rep_offset = 0;
4725 let mut windows_iter = offs.as_ref().windows(2);
4726 while let Some(control) = repdef.append_next(&mut buf) {
4727 if control.is_new_row {
4728 debug_assert!(rep_offset <= len);
4730 unsafe { rep_index_builder.append(rep_offset as u64) };
4732 }
4733 if control.is_visible {
4734 let window = windows_iter.next().unwrap();
4735 if control.is_valid_item {
4736 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4737 buf.extend_from_slice(
4738 &variable.data[window[0] as usize..window[1] as usize],
4739 );
4740 }
4741 }
4742 rep_offset = buf.len();
4743 }
4744 }
4745            _ => panic!("Unsupported offset size: {} bytes per offset", bytes_per_offset),
4746 }
4747
4748 debug_assert!(buf.len() <= len);
4751 unsafe {
4754 rep_index_builder.append(buf.len() as u64);
4755 }
4756
4757 let zipped_data = LanceBuffer::from(buf);
4758 let rep_index = rep_index_builder.into_data();
4759 debug_assert!(!rep_index.is_empty());
4760 let rep_index = Some(LanceBuffer::from(rep_index));
4761 SerializedFullZip {
4762 values: zipped_data,
4763 repetition_index: rep_index,
4764 }
4765 }
4766
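    /// Dispatches to the fixed- or variable-width full-zip serializer based on the type
    /// of the compressed data block.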
4767 fn serialize_full_zip(
4770 compressed_data: PerValueDataBlock,
4771 repdef: ControlWordIterator,
4772 num_items: u64,
4773 ) -> SerializedFullZip {
4774 match compressed_data {
4775 PerValueDataBlock::Fixed(fixed) => {
4776 Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4777 }
4778 PerValueDataBlock::Variable(var) => {
4779 Self::serialize_full_zip_variable(var, repdef, num_items)
4780 }
4781 }
4782 }
4783
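    /// Encodes a page using the full-zip layout: values are compressed with a per-value
    /// compressor and zipped with rep/def control words, with an optional repetition
    /// index marking row boundaries.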
4784 fn encode_full_zip(
4785 column_idx: u32,
4786 field: &Field,
4787 compression_strategy: &dyn CompressionStrategy,
4788 data: DataBlock,
4789 repdef: crate::repdef::SerializedRepDefs,
4790 row_number: u64,
4791 num_lists: u64,
4792 ) -> Result<EncodedPage> {
4793 let max_rep = repdef
4794 .repetition_levels
4795 .as_ref()
4796 .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4797 let max_def = repdef
4798 .definition_levels
4799 .as_ref()
4800 .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4801
4802 let (num_items, num_visible_items) =
4806 if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4807 (rep_levels.len() as u64, data.num_values())
4810 } else {
4811 (data.num_values(), data.num_values())
4813 };
4814
4815 let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4816
4817 let repdef_iter = build_control_word_iterator(
4818 repdef.repetition_levels.as_deref(),
4819 max_rep,
4820 repdef.definition_levels.as_deref(),
4821 max_def,
4822 max_visible_def,
4823 num_items as usize,
4824 );
4825 let bits_rep = repdef_iter.bits_rep();
4826 let bits_def = repdef_iter.bits_def();
4827
4828 let compressor = compression_strategy.create_per_value(field, &data)?;
4829 let (compressed_data, value_encoding) = compressor.compress(data)?;
4830
4831 let description = match &compressed_data {
4832 PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4833 bits_rep,
4834 bits_def,
4835 fixed.bits_per_value as u32,
4836 value_encoding,
4837 &repdef.def_meaning,
4838 num_items as u32,
4839 num_visible_items as u32,
4840 ),
4841 PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4842 bits_rep,
4843 bits_def,
4844 variable.bits_per_offset as u32,
4845 value_encoding,
4846 &repdef.def_meaning,
4847 num_items as u32,
4848 num_visible_items as u32,
4849 ),
4850 };
4851
4852 let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4853
4854 let data = if let Some(repindex) = zipped.repetition_index {
4855 vec![zipped.values, repindex]
4856 } else {
4857 vec![zipped.values]
4858 };
4859
4860 Ok(EncodedPage {
4861 num_rows: num_lists,
4862 column_idx,
4863 data,
4864 description: PageEncoding::Structural(description),
4865 row_number,
4866 })
4867 }
4868
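    /// Heuristically decides whether to attempt dictionary encoding for this block.
    ///
    /// Returns a budget (maximum dictionary entries and maximum encoded size) when the
    /// block type is eligible, the block is large enough, and a sample of the values
    /// does not look nearly unique; otherwise returns `None`. The divisor and size ratio
    /// can be tuned via field metadata or environment variables; the cardinality cap and
    /// minimum block size via environment variables.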
4869 fn should_dictionary_encode(
4870 data_block: &DataBlock,
4871 field: &Field,
4872 version: LanceFileVersion,
4873 ) -> Option<DictEncodingBudget> {
4874 const DEFAULT_SAMPLE_SIZE: usize = 4096;
4875 const DEFAULT_SAMPLE_UNIQUE_RATIO: f64 = 0.98;
4876
4877 match data_block {
4880 DataBlock::FixedWidth(fixed) => {
4881 if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
4882 return None;
4883 }
4884 if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
4885 return None;
4886 }
4887 if fixed.bits_per_value % 8 != 0 {
4888 return None;
4889 }
4890 }
4891 DataBlock::VariableWidth(var) => {
4892 if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
4893 return None;
4894 }
4895 }
4896 _ => return None,
4897 }
4898
4899 let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4901 .ok()
4902 .and_then(|val| val.parse().ok())
4903 .unwrap_or(100);
4904 if data_block.num_values() < too_small {
4905 return None;
4906 }
4907
4908 let num_values = data_block.num_values();
4909
4910 let divisor: u64 = field
4913 .metadata
4914 .get(DICT_DIVISOR_META_KEY)
4915 .and_then(|val| val.parse().ok())
4916 .or_else(|| {
4917 env::var("LANCE_ENCODING_DICT_DIVISOR")
4918 .ok()
4919 .and_then(|val| val.parse().ok())
4920 })
4921 .unwrap_or(DEFAULT_DICT_DIVISOR);
4922
4923 let max_cardinality: u64 = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
4924 .ok()
4925 .and_then(|val| val.parse().ok())
4926 .unwrap_or(DEFAULT_DICT_MAX_CARDINALITY);
4927
4928 let threshold_cardinality = num_values
4929 .checked_div(divisor.max(1))
4930 .unwrap_or(0)
4931 .min(max_cardinality);
4932 if threshold_cardinality == 0 {
4933 return None;
4934 }
4935
4936 let threshold_ratio = field
4938 .metadata
4939 .get(DICT_SIZE_RATIO_META_KEY)
4940 .and_then(|val| val.parse::<f64>().ok())
4941 .or_else(|| {
4942 env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
4943 .ok()
4944 .and_then(|val| val.parse().ok())
4945 })
4946 .unwrap_or(DEFAULT_DICT_SIZE_RATIO);
4947
4948 if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
4949 panic!(
4950 "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
4951 threshold_ratio
4952 );
4953 }
4954
4955 let data_size = data_block.data_size();
4956 if data_size == 0 {
4957 return None;
4958 }
4959
4960 let max_encoded_size = (data_size as f64 * threshold_ratio) as u64;
4961 let max_encoded_size = usize::try_from(max_encoded_size).ok()?;
4962
4963 if Self::sample_is_near_unique(
4965 data_block,
4966 DEFAULT_SAMPLE_SIZE,
4967 DEFAULT_SAMPLE_UNIQUE_RATIO,
4968 )? {
4969 return None;
4970 }
4971
4972 let max_dict_entries = u32::try_from(threshold_cardinality.min(i32::MAX as u64)).ok()?;
4973 Some(DictEncodingBudget {
4974 max_dict_entries,
4975 max_encoded_size,
4976 })
4977 }
4978
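    /// Samples up to `max_samples` values at a regular stride and reports whether the
    /// sample looks nearly unique (distinct ratio at or above `unique_ratio_threshold`).
    /// Small samples (fewer than 1024 values) are never considered near-unique. Returns
    /// `None` if the block cannot be sampled safely.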
4979 fn sample_is_near_unique(
4985 data_block: &DataBlock,
4986 max_samples: usize,
4987 unique_ratio_threshold: f64,
4988 ) -> Option<bool> {
4989 use std::collections::HashSet;
4990
4991 if unique_ratio_threshold <= 0.0 || unique_ratio_threshold > 1.0 {
4992 return None;
4993 }
4994
4995 let num_values = usize::try_from(data_block.num_values()).ok()?;
4996 if num_values == 0 {
4997 return Some(false);
4998 }
4999
5000 let sample_count = num_values.min(max_samples).max(1);
5001 let step = (num_values / sample_count).max(1);
5003
5004 match data_block {
5005 DataBlock::FixedWidth(fixed) => match fixed.bits_per_value {
5006 64 => {
5007 let values = fixed.data.borrow_to_typed_slice::<u64>();
5008 let values = values.as_ref();
5009 let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
5010 for idx in (0..num_values).step_by(step).take(sample_count) {
5011 unique.insert(values.get(idx).copied()?);
5012 }
5013 let ratio = unique.len() as f64 / sample_count as f64;
5014 Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5016 }
5017 128 => {
5018 let values = fixed.data.borrow_to_typed_slice::<u128>();
5019 let values = values.as_ref();
5020 let mut unique: HashSet<u128> = HashSet::with_capacity(sample_count.min(1024));
5021 for idx in (0..num_values).step_by(step).take(sample_count) {
5022 unique.insert(values.get(idx).copied()?);
5023 }
5024 let ratio = unique.len() as f64 / sample_count as f64;
5025 Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5026 }
5027 _ => Some(false),
5028 },
5029 DataBlock::VariableWidth(var) => {
5030 use xxhash_rust::xxh3::xxh3_64;
5031
5032 let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
5034 match var.bits_per_offset {
5035 32 => {
5036 let offsets_ref = var.offsets.borrow_to_typed_slice::<u32>();
5037 let offsets: &[u32] = offsets_ref.as_ref();
5038 for i in (0..num_values).step_by(step).take(sample_count) {
5039 let start = usize::try_from(*offsets.get(i)?).ok()?;
5040 let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5041 if start > end || end > var.data.len() {
5042 return None;
5043 }
5044 unique.insert(xxh3_64(&var.data[start..end]));
5045 }
5046 }
5047 64 => {
5048 let offsets_ref = var.offsets.borrow_to_typed_slice::<u64>();
5049 let offsets: &[u64] = offsets_ref.as_ref();
5050 for i in (0..num_values).step_by(step).take(sample_count) {
5051 let start = usize::try_from(*offsets.get(i)?).ok()?;
5052 let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5053 if start > end || end > var.data.len() {
5054 return None;
5055 }
5056 unique.insert(xxh3_64(&var.data[start..end]));
5057 }
5058 }
5059 _ => return Some(false),
5060 }
5061 let ratio = unique.len() as f64 / sample_count as f64;
5062 Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5063 }
5064 _ => Some(false),
5065 }
5066 }
5067
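    /// Flushes the accumulated arrays and rep/def builders into encode tasks.
    ///
    /// A CPU task is spawned that serializes the rep/def information and then picks a
    /// page layout: all-null (simple or complex), constant, full-zip (packed structs or
    /// sparse rep/def), or mini-block (optionally dictionary encoded).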
5068 fn do_flush(
5070 &mut self,
5071 arrays: Vec<ArrayRef>,
5072 repdefs: Vec<RepDefBuilder>,
5073 row_number: u64,
5074 num_rows: u64,
5075 ) -> Result<Vec<EncodeTask>> {
5076 let column_idx = self.column_index;
5077 let compression_strategy = self.compression_strategy.clone();
5078 let field = self.field.clone();
5079 let encoding_metadata = self.encoding_metadata.clone();
5080 let support_large_chunk = self.support_large_chunk;
5081 let version = self.version;
5082 let task = spawn_cpu(move || {
5083 let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
5084 let is_simple_validity = repdefs.iter().all(|rd| rd.is_simple_validity());
5085 let has_repdef_info = repdefs.iter().any(|rd| !rd.is_empty());
5086 let repdef = RepDefBuilder::serialize(repdefs);
5087
5088 if num_values == 0 {
5089 log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
5093 return Self::encode_complex_all_null(
5094 column_idx,
5095 repdef,
5096 row_number,
5097 num_rows,
5098 version,
5099 compression_strategy.as_ref(),
5100 );
5101 }
5102
5103 let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?;
5104 let all_null = leaf_validity
5105 .as_ref()
5106 .map(|validity| validity.count_set_bits() == 0)
5107 .unwrap_or(false);
5108
5109 if all_null {
5110 return if is_simple_validity {
5111 log::debug!(
5112 "Encoding column {} with {} items ({} rows) using simple-null layout",
5113 column_idx,
5114 num_values,
5115 num_rows
5116 );
5117 Self::encode_simple_all_null(column_idx, num_values, row_number)
5118 } else {
5119 log::debug!(
5120 "Encoding column {} with {} items ({} rows) using complex-null layout",
5121 column_idx,
5122 num_values,
5123 num_rows
5124 );
5125 Self::encode_complex_all_null(
5126 column_idx,
5127 repdef,
5128 row_number,
5129 num_rows,
5130 version,
5131 compression_strategy.as_ref(),
5132 )
5133 };
5134 }
5135
5136 if let DataType::Struct(fields) = &field.data_type()
5137 && fields.is_empty()
5138 {
5139 if has_repdef_info {
5140 return Err(Error::invalid_input_source(format!("Empty structs with rep/def information are not yet supported. The field {} is an empty struct that either has nulls or is in a list.", field.name).into()));
5141 }
5142 return Self::encode_simple_all_null(column_idx, num_values, row_number);
5145 }
5146
5147 let data_block = DataBlock::from_arrays(&arrays, num_values);
5148
5149 if version.resolve() >= LanceFileVersion::V2_2
5150 && let Some(scalar) = Self::find_constant_scalar(&arrays, leaf_validity.as_ref())?
5151 {
5152 log::debug!(
5153 "Encoding column {} with {} items ({} rows) using constant layout",
5154 column_idx,
5155 num_values,
5156 num_rows
5157 );
5158 return constant::encode_constant_page(
5159 column_idx,
5160 scalar,
5161 repdef,
5162 row_number,
5163 num_rows,
5164 );
5165 }
5166
5167 let requires_full_zip_packed_struct =
5168 if let DataBlock::Struct(ref struct_data_block) = data_block {
5169 struct_data_block.has_variable_width_child()
5170 } else {
5171 false
5172 };
5173
5174 if requires_full_zip_packed_struct {
5175 log::debug!(
5176 "Encoding column {} with {} items using full-zip packed struct layout",
5177 column_idx,
5178 num_values
5179 );
5180 return Self::encode_full_zip(
5181 column_idx,
5182 &field,
5183 compression_strategy.as_ref(),
5184 data_block,
5185 repdef,
5186 row_number,
5187 num_rows,
5188 );
5189 }
5190
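            // If the rep/def structure is too sparse for the mini-block layout we fall
            // back to full-zip below.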
5191 let too_sparse = Self::repdef_too_sparse_for_miniblock(&repdef, num_values);
5195
5196 if !too_sparse {
5197 if let DataBlock::Dictionary(dict) = data_block {
5198 log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
5199 let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
5200 indices_data_block.compute_stat();
5205 return Self::encode_miniblock(
5206 column_idx,
5207 &field,
5208 compression_strategy.as_ref(),
5209 indices_data_block,
5210 repdef,
5211 row_number,
5212 Some(dictionary_data_block),
5213 num_rows,
5214 support_large_chunk,
5215 );
5216 }
5217 } else {
5218 log::debug!(
5219 "Encoding column {} with {} items using full-zip layout \
5220 (rep/def too sparse for mini-block)",
5221 column_idx,
5222 num_values
5223 );
5224 }
5225
5226 {
5227 let dict_result = if too_sparse {
5230 None
5231 } else {
5232 Self::should_dictionary_encode(&data_block, &field, version)
5233 .and_then(|budget| {
5234 log::debug!(
5235 "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
5236 column_idx,
5237 num_values
5238 );
5239 dict::dictionary_encode(
5240 &data_block,
5241 budget.max_dict_entries,
5242 budget.max_encoded_size,
5243 )
5244 })
5245 };
5246
5247 if let Some((indices_data_block, dictionary_data_block)) = dict_result {
5248 Self::encode_miniblock(
5249 column_idx,
5250 &field,
5251 compression_strategy.as_ref(),
5252 indices_data_block,
5253 repdef,
5254 row_number,
5255 Some(dictionary_data_block),
5256 num_rows,
5257 support_large_chunk,
5258 )
5259 } else if !too_sparse && Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
5260 log::debug!(
5261 "Encoding column {} with {} items using mini-block layout",
5262 column_idx,
5263 num_values
5264 );
5265 Self::encode_miniblock(
5266 column_idx,
5267 &field,
5268 compression_strategy.as_ref(),
5269 data_block,
5270 repdef,
5271 row_number,
5272 None,
5273 num_rows,
5274 support_large_chunk,
5275 )
5276 } else if too_sparse || Self::prefers_fullzip(encoding_metadata.as_ref()) {
5277 log::debug!(
5278 "Encoding column {} with {} items using full-zip layout",
5279 column_idx,
5280 num_values
5281 );
5282 Self::encode_full_zip(
5283 column_idx,
5284 &field,
5285 compression_strategy.as_ref(),
5286 data_block,
5287 repdef,
5288 row_number,
5289 num_rows,
5290 )
5291 } else {
5292 Err(Error::invalid_input_source(format!("Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into()))
5293 }
5294 }
5295 })
5296 .boxed();
5297 Ok(vec![task])
5298 }
5299
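    /// Moves an array's validity bitmap into the rep/def builder (cloning it, or deep
    /// copying it when the original array is not kept) and returns the array with its
    /// nulls stripped. Arrays without nulls are simply recorded as all-valid.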
5300 fn extract_validity_buf(
5301 array: Arc<dyn Array>,
5302 repdef: &mut RepDefBuilder,
5303 keep_original_array: bool,
5304 ) -> Result<Arc<dyn Array>> {
5305 if let Some(validity) = array.nulls() {
5306 if keep_original_array {
5307 repdef.add_validity_bitmap(validity.clone());
5308 } else {
5309 repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
5310 }
5311 let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
5312 Ok(make_array(data_no_nulls))
5313 } else {
5314 repdef.add_no_null(array.len());
5315 Ok(array)
5316 }
5317 }
5318
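    /// Extracts validity into the rep/def builder, special-casing `Null` arrays (recorded
    /// as entirely null) and dictionary arrays (whose nulls are normalized first) before
    /// delegating to `extract_validity_buf`.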
5319 fn extract_validity(
5320 mut array: Arc<dyn Array>,
5321 repdef: &mut RepDefBuilder,
5322 keep_original_array: bool,
5323 ) -> Result<Arc<dyn Array>> {
5324 match array.data_type() {
5325 DataType::Null => {
5326 repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
5327 Ok(array)
5328 }
5329 DataType::Dictionary(_, _) => {
5330 array = dict::normalize_dict_nulls(array)?;
5331 Self::extract_validity_buf(array, repdef, keep_original_array)
5332 }
5333 _ => Self::extract_validity_buf(array, repdef, keep_original_array),
5342 }
5343 }
5344}
5345
5346impl FieldEncoder for PrimitiveStructuralEncoder {
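    /// Strips validity into a rep/def builder, queues the array, and emits an encode
    /// task once the accumulation queue decides enough data has been gathered.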
5347 fn maybe_encode(
5349 &mut self,
5350 array: ArrayRef,
5351 _external_buffers: &mut OutOfLineBuffers,
5352 mut repdef: RepDefBuilder,
5353 row_number: u64,
5354 num_rows: u64,
5355 ) -> Result<Vec<EncodeTask>> {
5356 let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
5357 self.accumulated_repdefs.push(repdef);
5358
5359 if let Some((arrays, row_number, num_rows)) =
5360 self.accumulation_queue.insert(array, row_number, num_rows)
5361 {
5362 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5363 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5364 } else {
5365 Ok(vec![])
5366 }
5367 }
5368
5369 fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
5371 if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
5372 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5373 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5374 } else {
5375 Ok(vec![])
5376 }
5377 }
5378
5379 fn num_columns(&self) -> u32 {
5380 1
5381 }
5382
5383 fn finish(
5384 &mut self,
5385 _external_buffers: &mut OutOfLineBuffers,
5386 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
5387 std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
5388 }
5389}
5390
5391#[cfg(test)]
5392#[allow(clippy::single_range_in_vec_init)]
5393mod tests {
5394 use super::{
5395 ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
5396 FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipReadSource,
5397 FullZipRepIndexDetails, FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor,
5398 PreambleAction, StructuralPageScheduler, VariableFullZipDecoder,
5399 };
5400 use crate::buffer::LanceBuffer;
5401 use crate::compression::DefaultDecompressionStrategy;
5402 use crate::constants::{
5403 COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
5404 DICT_VALUES_COMPRESSION_META_KEY, STRUCTURAL_ENCODING_META_KEY,
5405 STRUCTURAL_ENCODING_MINIBLOCK,
5406 };
5407 use crate::data::BlockInfo;
5408 use crate::decoder::PageEncoding;
5409 use crate::encodings::logical::primitive::{
5410 ChunkDrainInstructions, PrimitiveStructuralEncoder,
5411 };
5412 use crate::format::ProtobufUtils21;
5413 use crate::format::pb21;
5414 use crate::format::pb21::compressive_encoding::Compression;
5415 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
5416 use crate::version::LanceFileVersion;
5417 use arrow_array::{ArrayRef, Int8Array, StringArray};
5418 use arrow_schema::DataType;
5419 use std::collections::HashMap;
5420 use std::{collections::VecDeque, sync::Arc};
5421
5422 #[test]
5423 fn test_is_narrow() {
5424 let int8_array = Int8Array::from(vec![1, 2, 3]);
5425 let array_ref: ArrayRef = Arc::new(int8_array);
5426 let block = DataBlock::from_array(array_ref);
5427
5428 assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5429
5430 let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
5431 let block = DataBlock::from_array(string_array);
5432 assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5433
5434 let string_array = StringArray::from(vec![
5435 Some("hello world".repeat(100)),
5436 Some("world".to_string()),
5437 ]);
5438 let block = DataBlock::from_array(string_array);
5439        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
5440 }
5441
5442 #[test]
5443 fn test_map_range() {
5444 let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
5447 let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
5448 let max_visible_def = 0;
5449 let total_items = 8;
5450 let max_rep = 1;
5451
5452 let check = |range, expected_item_range, expected_level_range| {
5453 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5454 range,
5455 rep.as_ref(),
5456 def.as_ref(),
5457 max_rep,
5458 max_visible_def,
5459 total_items,
5460 PreambleAction::Absent,
5461 );
5462 assert_eq!(item_range, expected_item_range);
5463 assert_eq!(level_range, expected_level_range);
5464 };
5465
5466 check(0..1, 0..3, 0..3);
5467 check(1..2, 3..5, 3..5);
5468 check(2..3, 5..5, 5..6);
5469 check(3..4, 5..8, 6..9);
5470 check(0..2, 0..5, 0..5);
5471 check(1..3, 3..5, 3..6);
5472 check(2..4, 5..8, 5..9);
5473 check(0..3, 0..5, 0..6);
5474 check(1..4, 3..8, 3..9);
5475 check(0..4, 0..8, 0..9);
5476
5477 let rep = Some(vec![1, 1, 0, 1]);
5480 let def = Some(vec![1, 0, 0, 0]);
5481 let max_visible_def = 0;
5482 let total_items = 3;
5483
5484 let check = |range, expected_item_range, expected_level_range| {
5485 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5486 range,
5487 rep.as_ref(),
5488 def.as_ref(),
5489 max_rep,
5490 max_visible_def,
5491 total_items,
5492 PreambleAction::Absent,
5493 );
5494 assert_eq!(item_range, expected_item_range);
5495 assert_eq!(level_range, expected_level_range);
5496 };
5497
5498 check(0..1, 0..0, 0..1);
5499 check(1..2, 0..2, 1..3);
5500 check(2..3, 2..3, 3..4);
5501 check(0..2, 0..2, 0..3);
5502 check(1..3, 0..3, 1..4);
5503 check(0..3, 0..3, 0..4);
5504
5505 let rep = Some(vec![1, 1, 0, 1]);
5508 let def = Some(vec![0, 0, 0, 1]);
5509 let max_visible_def = 0;
5510 let total_items = 3;
5511
5512 let check = |range, expected_item_range, expected_level_range| {
5513 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5514 range,
5515 rep.as_ref(),
5516 def.as_ref(),
5517 max_rep,
5518 max_visible_def,
5519 total_items,
5520 PreambleAction::Absent,
5521 );
5522 assert_eq!(item_range, expected_item_range);
5523 assert_eq!(level_range, expected_level_range);
5524 };
5525
5526 check(0..1, 0..1, 0..1);
5527 check(1..2, 1..3, 1..3);
5528 check(2..3, 3..3, 3..4);
5529 check(0..2, 0..3, 0..3);
5530 check(1..3, 1..3, 1..4);
5531 check(0..3, 0..3, 0..4);
5532
5533 let rep = Some(vec![1, 0, 1, 0, 1, 0]);
5536 let def: Option<&[u16]> = None;
5537 let max_visible_def = 0;
5538 let total_items = 6;
5539
5540 let check = |range, expected_item_range, expected_level_range| {
5541 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5542 range,
5543 rep.as_ref(),
5544 def.as_ref(),
5545 max_rep,
5546 max_visible_def,
5547 total_items,
5548 PreambleAction::Absent,
5549 );
5550 assert_eq!(item_range, expected_item_range);
5551 assert_eq!(level_range, expected_level_range);
5552 };
5553
5554 check(0..1, 0..2, 0..2);
5555 check(1..2, 2..4, 2..4);
5556 check(2..3, 4..6, 4..6);
5557 check(0..2, 0..4, 0..4);
5558 check(1..3, 2..6, 2..6);
5559 check(0..3, 0..6, 0..6);
5560
5561 let rep: Option<&[u16]> = None;
5564 let def = Some(vec![0, 0, 1, 0]);
5565 let max_visible_def = 1;
5566 let total_items = 4;
5567
5568 let check = |range, expected_item_range, expected_level_range| {
5569 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5570 range,
5571 rep.as_ref(),
5572 def.as_ref(),
5573 max_rep,
5574 max_visible_def,
5575 total_items,
5576 PreambleAction::Absent,
5577 );
5578 assert_eq!(item_range, expected_item_range);
5579 assert_eq!(level_range, expected_level_range);
5580 };
5581
5582 check(0..1, 0..1, 0..1);
5583 check(1..2, 1..2, 1..2);
5584 check(2..3, 2..3, 2..3);
5585 check(0..2, 0..2, 0..2);
5586 check(1..3, 1..3, 1..3);
5587 check(0..3, 0..3, 0..3);
5588
5589 let rep = Some(vec![0, 1, 0, 1]);
5594 let def = Some(vec![0, 0, 0, 1]);
5595 let max_visible_def = 0;
5596 let total_items = 3;
5597
5598 let check = |range, expected_item_range, expected_level_range| {
5599 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5600 range,
5601 rep.as_ref(),
5602 def.as_ref(),
5603 max_rep,
5604 max_visible_def,
5605 total_items,
5606 PreambleAction::Take,
5607 );
5608 assert_eq!(item_range, expected_item_range);
5609 assert_eq!(level_range, expected_level_range);
5610 };
5611
5612 check(0..1, 0..3, 0..3);
5614 check(0..2, 0..3, 0..4);
5615
5616 let check = |range, expected_item_range, expected_level_range| {
5617 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5618 range,
5619 rep.as_ref(),
5620 def.as_ref(),
5621 max_rep,
5622 max_visible_def,
5623 total_items,
5624 PreambleAction::Skip,
5625 );
5626 assert_eq!(item_range, expected_item_range);
5627 assert_eq!(level_range, expected_level_range);
5628 };
5629
5630 check(0..1, 1..3, 1..3);
5631 check(1..2, 3..3, 3..4);
5632 check(0..2, 1..3, 1..4);
5633
5634 let rep = Some(vec![0, 1, 1, 0]);
5639 let def = Some(vec![0, 1, 0, 0]);
5640 let max_visible_def = 0;
5641 let total_items = 4;
5642
5643 let check = |range, expected_item_range, expected_level_range| {
5644 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5645 range,
5646 rep.as_ref(),
5647 def.as_ref(),
5648 max_rep,
5649 max_visible_def,
5650 total_items,
5651 PreambleAction::Take,
5652 );
5653 assert_eq!(item_range, expected_item_range);
5654 assert_eq!(level_range, expected_level_range);
5655 };
5656
5657 check(0..1, 0..1, 0..2);
5659 check(0..2, 0..3, 0..4);
5660
5661 let check = |range, expected_item_range, expected_level_range| {
5662 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5663 range,
5664 rep.as_ref(),
5665 def.as_ref(),
5666 max_rep,
5667 max_visible_def,
5668 total_items,
5669 PreambleAction::Skip,
5670 );
5671 assert_eq!(item_range, expected_item_range);
5672 assert_eq!(level_range, expected_level_range);
5673 };
5674
5675 check(0..1, 1..1, 1..2);
5677 check(1..2, 1..3, 2..4);
5678 check(0..2, 1..3, 1..4);
5679
5680 let rep = Some(vec![0, 1, 0, 1]);
5683 let def: Option<Vec<u16>> = None;
5684 let max_visible_def = 0;
5685 let total_items = 4;
5686
5687 let check = |range, expected_item_range, expected_level_range| {
5688 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5689 range,
5690 rep.as_ref(),
5691 def.as_ref(),
5692 max_rep,
5693 max_visible_def,
5694 total_items,
5695 PreambleAction::Take,
5696 );
5697 assert_eq!(item_range, expected_item_range);
5698 assert_eq!(level_range, expected_level_range);
5699 };
5700
5701 check(0..1, 0..3, 0..3);
5703 check(0..2, 0..4, 0..4);
5704
5705 let check = |range, expected_item_range, expected_level_range| {
5706 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5707 range,
5708 rep.as_ref(),
5709 def.as_ref(),
5710 max_rep,
5711 max_visible_def,
5712 total_items,
5713 PreambleAction::Skip,
5714 );
5715 assert_eq!(item_range, expected_item_range);
5716 assert_eq!(level_range, expected_level_range);
5717 };
5718
5719 check(0..1, 1..3, 1..3);
5720 check(1..2, 3..4, 3..4);
5721 check(0..2, 1..4, 1..4);
5722
5723 let rep = Some(vec![2, 1, 2, 0, 1, 2]);
5727 let def = Some(vec![0, 1, 2, 0, 0, 0]);
5728 let max_rep = 2;
5729 let max_visible_def = 0;
5730 let total_items = 4;
5731
5732 let check = |range, expected_item_range, expected_level_range| {
5733 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5734 range,
5735 rep.as_ref(),
5736 def.as_ref(),
5737 max_rep,
5738 max_visible_def,
5739 total_items,
5740 PreambleAction::Absent,
5741 );
5742 assert_eq!(item_range, expected_item_range);
5743 assert_eq!(level_range, expected_level_range);
5744 };
5745
5746 check(0..3, 0..4, 0..6);
5747 check(0..1, 0..1, 0..2);
5748 check(1..2, 1..3, 2..5);
5749 check(2..3, 3..4, 5..6);
5750
5751 let rep = Some(vec![0, 0, 1, 0, 1, 1]);
5753 let def = Some(vec![0, 1, 0, 0, 0, 0]);
5754 let max_rep = 1;
5755 let max_visible_def = 0;
5756 let total_items = 5;
5757
5758 let check = |range, expected_item_range, expected_level_range| {
5759 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5760 range,
5761 rep.as_ref(),
5762 def.as_ref(),
5763 max_rep,
5764 max_visible_def,
5765 total_items,
5766 PreambleAction::Take,
5767 );
5768 assert_eq!(item_range, expected_item_range);
5769 assert_eq!(level_range, expected_level_range);
5770 };
5771
5772 check(0..0, 0..1, 0..2);
5773 check(0..1, 0..3, 0..4);
5774 check(0..2, 0..4, 0..5);
5775
5776 let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
5779 let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
5780 let max_rep = 1;
5781 let max_visible_def = 0;
5782 let total_items = 5;
5783
5784 let check = |range, expected_item_range, expected_level_range| {
5785 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5786 range,
5787 rep.as_ref(),
5788 def.as_ref(),
5789 max_rep,
5790 max_visible_def,
5791 total_items,
5792 PreambleAction::Skip,
5793 );
5794 assert_eq!(item_range, expected_item_range);
5795 assert_eq!(level_range, expected_level_range);
5796 };
5797
5798 check(2..3, 2..4, 5..7);
5799 }
5800
5801 #[test]
5802 fn test_slice_batch_data_and_rebase_offsets_u32() {
5803 let data = LanceBuffer::copy_slice(b"0123456789abcdefghij");
5804 let offsets = LanceBuffer::reinterpret_vec(vec![6_u32, 8_u32, 8_u32, 12_u32]);
5805
5806 let (sliced_data, normalized_offsets) =
5807 VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5808 .unwrap();
5809
5810 assert_eq!(sliced_data.as_ref(), b"6789ab");
5811 let normalized = normalized_offsets.borrow_to_typed_slice::<u32>();
5812 assert_eq!(normalized.as_ref(), &[0, 2, 2, 6]);
5813 }
5814
5815 #[test]
5816 fn test_slice_batch_data_and_rebase_offsets_u64() {
5817 let data = LanceBuffer::copy_slice(b"abcdefghijklmnopqrstuvwxyz");
5818 let offsets = LanceBuffer::reinterpret_vec(vec![10_u64, 12_u64, 16_u64, 20_u64]);
5819
5820 let (sliced_data, normalized_offsets) =
5821 VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 64)
5822 .unwrap();
5823
5824 assert_eq!(sliced_data.as_ref(), b"klmnopqrst");
5825 let normalized = normalized_offsets.borrow_to_typed_slice::<u64>();
5826 assert_eq!(normalized.as_ref(), &[0, 2, 6, 10]);
5827 }
5828
5829 #[test]
5830 fn test_slice_batch_data_and_rebase_offsets_rejects_invalid_offsets() {
5831 let data = LanceBuffer::copy_slice(b"abcd");
5832 let offsets = LanceBuffer::reinterpret_vec(vec![3_u32, 2_u32]);
5833
5834 let err = VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5835 .expect_err("offset end before start should error");
5836 assert!(err.to_string().contains("less than base"));
5837 }
5838
5839 #[test]
5840 fn test_schedule_instructions() {
5841 let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5843 let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5844 let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5845
5846 let check = |user_ranges, expected_instructions| {
5847 let instructions =
5848 ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
5849 assert_eq!(instructions, expected_instructions);
5850 };
5851
5852 let expected_take_all = vec![
5854 ChunkInstructions {
5855 chunk_idx: 0,
5856 preamble: PreambleAction::Absent,
5857 rows_to_skip: 0,
5858 rows_to_take: 6,
5859 take_trailer: true,
5860 },
5861 ChunkInstructions {
5862 chunk_idx: 1,
5863 preamble: PreambleAction::Take,
5864 rows_to_skip: 0,
5865 rows_to_take: 2,
5866 take_trailer: false,
5867 },
5868 ChunkInstructions {
5869 chunk_idx: 2,
5870 preamble: PreambleAction::Absent,
5871 rows_to_skip: 0,
5872 rows_to_take: 5,
5873 take_trailer: true,
5874 },
5875 ChunkInstructions {
5876 chunk_idx: 3,
5877 preamble: PreambleAction::Take,
5878 rows_to_skip: 0,
5879 rows_to_take: 1,
5880 take_trailer: false,
5881 },
5882 ];
5883
5884 check(&[0..14], expected_take_all.clone());
5886
5887 check(
5889 &[
5890 0..1,
5891 1..2,
5892 2..3,
5893 3..4,
5894 4..5,
5895 5..6,
5896 6..7,
5897 7..8,
5898 8..9,
5899 9..10,
5900 10..11,
5901 11..12,
5902 12..13,
5903 13..14,
5904 ],
5905 expected_take_all,
5906 );
5907
5908 check(
5912 &[0..1, 3..4],
5913 vec![
5914 ChunkInstructions {
5915 chunk_idx: 0,
5916 preamble: PreambleAction::Absent,
5917 rows_to_skip: 0,
5918 rows_to_take: 1,
5919 take_trailer: false,
5920 },
5921 ChunkInstructions {
5922 chunk_idx: 0,
5923 preamble: PreambleAction::Absent,
5924 rows_to_skip: 3,
5925 rows_to_take: 1,
5926 take_trailer: false,
5927 },
5928 ],
5929 );
5930
5931 check(
5933 &[5..6],
5934 vec![
5935 ChunkInstructions {
5936 chunk_idx: 0,
5937 preamble: PreambleAction::Absent,
5938 rows_to_skip: 5,
5939 rows_to_take: 1,
5940 take_trailer: true,
5941 },
5942 ChunkInstructions {
5943 chunk_idx: 1,
5944 preamble: PreambleAction::Take,
5945 rows_to_skip: 0,
5946 rows_to_take: 0,
5947 take_trailer: false,
5948 },
5949 ],
5950 );
5951
5952 check(
5954 &[7..10],
5955 vec![
5956 ChunkInstructions {
5957 chunk_idx: 1,
5958 preamble: PreambleAction::Skip,
5959 rows_to_skip: 1,
5960 rows_to_take: 1,
5961 take_trailer: false,
5962 },
5963 ChunkInstructions {
5964 chunk_idx: 2,
5965 preamble: PreambleAction::Absent,
5966 rows_to_skip: 0,
5967 rows_to_take: 2,
5968 take_trailer: false,
5969 },
5970 ],
5971 );
5972 }
5973
5974 #[test]
5975 fn test_drain_instructions() {
5976 fn drain_from_instructions(
5977 instructions: &mut VecDeque<ChunkInstructions>,
5978 mut rows_desired: u64,
5979 need_preamble: &mut bool,
5980 skip_in_chunk: &mut u64,
5981 ) -> Vec<ChunkDrainInstructions> {
5982 let mut drain_instructions = Vec::with_capacity(instructions.len());
5984 while rows_desired > 0 || *need_preamble {
5985 let (next_instructions, consumed_chunk) = instructions
5986 .front()
5987 .unwrap()
5988 .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
5989 if consumed_chunk {
5990 instructions.pop_front();
5991 }
5992 drain_instructions.push(next_instructions);
5993 }
5994 drain_instructions
5995 }
5996
5997 let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5999 let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
6000 let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
6001 let user_ranges = vec![1..7, 10..14];
6002
6003 let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
6005
6006 let mut to_drain = VecDeque::from(scheduled.clone());
6007
6008 let mut need_preamble = false;
6011 let mut skip_in_chunk = 0;
6012
6013 let next_batch =
6014 drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
6015
6016 assert!(!need_preamble);
6017 assert_eq!(skip_in_chunk, 4);
6018 assert_eq!(
6019 next_batch,
6020 vec![ChunkDrainInstructions {
6021 chunk_instructions: scheduled[0].clone(),
6022 rows_to_take: 4,
6023 rows_to_skip: 0,
6024 preamble_action: PreambleAction::Absent,
6025 }]
6026 );
6027
6028 let next_batch =
6029 drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
6030
6031 assert!(!need_preamble);
6032 assert_eq!(skip_in_chunk, 2);
6033
6034 assert_eq!(
6035 next_batch,
6036 vec![
6037 ChunkDrainInstructions {
6038 chunk_instructions: scheduled[0].clone(),
6039 rows_to_take: 1,
6040 rows_to_skip: 4,
6041 preamble_action: PreambleAction::Absent,
6042 },
6043 ChunkDrainInstructions {
6044 chunk_instructions: scheduled[1].clone(),
6045 rows_to_take: 1,
6046 rows_to_skip: 0,
6047 preamble_action: PreambleAction::Take,
6048 },
6049 ChunkDrainInstructions {
6050 chunk_instructions: scheduled[2].clone(),
6051 rows_to_take: 2,
6052 rows_to_skip: 0,
6053 preamble_action: PreambleAction::Absent,
6054 }
6055 ]
6056 );
6057
6058 let next_batch =
6059 drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
6060
6061 assert!(!need_preamble);
6062 assert_eq!(skip_in_chunk, 0);
6063
6064 assert_eq!(
6065 next_batch,
6066 vec![
6067 ChunkDrainInstructions {
6068 chunk_instructions: scheduled[2].clone(),
6069 rows_to_take: 1,
6070 rows_to_skip: 2,
6071 preamble_action: PreambleAction::Absent,
6072 },
6073 ChunkDrainInstructions {
6074 chunk_instructions: scheduled[3].clone(),
6075 rows_to_take: 1,
6076 rows_to_skip: 0,
6077 preamble_action: PreambleAction::Take,
6078 },
6079 ]
6080 );
6081
6082 let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
6084 let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
6085 let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
6086 let user_ranges = vec![0..28];
6087
6088 let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
6090
6091 let mut to_drain = VecDeque::from(scheduled.clone());
6092
6093 let mut need_preamble = false;
6096 let mut skip_in_chunk = 0;
6097
6098 let next_batch =
6099 drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
6100
6101 assert_eq!(
6102 next_batch,
6103 vec![
6104 ChunkDrainInstructions {
6105 chunk_instructions: scheduled[0].clone(),
6106 rows_to_take: 6,
6107 rows_to_skip: 0,
6108 preamble_action: PreambleAction::Absent,
6109 },
6110 ChunkDrainInstructions {
6111 chunk_instructions: scheduled[1].clone(),
6112 rows_to_take: 1,
6113 rows_to_skip: 0,
6114 preamble_action: PreambleAction::Take,
6115 },
6116 ]
6117 );
6118
6119 assert!(!need_preamble);
6120 assert_eq!(skip_in_chunk, 1);
6121
6122 let next_batch =
6125 drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
6126
6127 assert_eq!(
6128 next_batch,
6129 vec![
6130 ChunkDrainInstructions {
6131 chunk_instructions: scheduled[1].clone(),
6132 rows_to_take: 2,
6133 rows_to_skip: 1,
6134 preamble_action: PreambleAction::Skip,
6135 },
6136 ChunkDrainInstructions {
6137 chunk_instructions: scheduled[2].clone(),
6138 rows_to_take: 0,
6139 rows_to_skip: 0,
6140 preamble_action: PreambleAction::Take,
6141 },
6142 ]
6143 );
6144
6145 assert!(!need_preamble);
6146 assert_eq!(skip_in_chunk, 0);
6147 }
6148
6149 #[tokio::test]
6150 async fn test_fullzip_initialize_is_lazy() {
6151 use futures::{FutureExt, future::BoxFuture};
6152 use std::ops::Range;
6153 use std::sync::Mutex;
6154
6155 #[derive(Debug, Clone)]
6156 struct RecordingScheduler {
6157 data: bytes::Bytes,
6158 requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6159 }
6160
6161 impl RecordingScheduler {
6162 fn new(data: bytes::Bytes) -> Self {
6163 Self {
6164 data,
6165 requests: Arc::new(Mutex::new(Vec::new())),
6166 }
6167 }
6168
6169 fn requests(&self) -> Vec<Vec<Range<u64>>> {
6170 self.requests.lock().unwrap().clone()
6171 }
6172 }
6173
6174 impl crate::EncodingsIo for RecordingScheduler {
6175 fn submit_request(
6176 &self,
6177 ranges: Vec<Range<u64>>,
6178 _priority: u64,
6179 ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6180 self.requests.lock().unwrap().push(ranges.clone());
6181 let data = ranges
6182 .into_iter()
6183 .map(|range| self.data.slice(range.start as usize..range.end as usize))
6184 .collect::<Vec<_>>();
6185 std::future::ready(Ok(data)).boxed()
6186 }
6187 }
6188
6189 #[derive(Debug)]
6190 struct TestFixedDecompressor;
6191
6192 impl FixedPerValueDecompressor for TestFixedDecompressor {
6193 fn decompress(
6194 &self,
6195 _data: FixedWidthDataBlock,
6196 _num_rows: u64,
6197 ) -> crate::Result<DataBlock> {
6198 unimplemented!("Test decompressor")
6199 }
6200
6201 fn bits_per_value(&self) -> u64 {
6202 32
6203 }
6204 }
6205
6206 let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(vec![
6207 0;
6208 16 * 1024
6209 ])));
6210 let mut scheduler = FullZipScheduler {
6211 data_buf_position: 0,
6212 data_buf_size: 4096,
6213 rep_index: Some(FullZipRepIndexDetails {
6214 buf_position: 1000,
6215 bytes_per_value: 4,
6216 }),
6217 priority: 0,
6218 rows_in_page: 100,
6219 bits_per_offset: 32,
6220 details: Arc::new(FullZipDecodeDetails {
6221 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6222 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6223 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6224 max_rep: 0,
6225 max_visible_def: 0,
6226 }),
6227 cached_state: None,
6228 enable_cache: false,
6229 };
6230
6231 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6232 let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6233
6234 assert!(
6235 cached_data
6236 .as_arc_any()
6237 .downcast_ref::<super::NoCachedPageData>()
6238 .is_some(),
6239 "FullZip initialize should not eagerly load repetition index data"
6240 );
6241 assert!(scheduler.cached_state.is_none());
6242 assert!(
6243 io.requests().is_empty(),
6244 "FullZip initialize should not issue any I/O"
6245 );
6246 }
6247
6248 #[tokio::test]
6249 async fn test_fullzip_read_source_slices_prefetched_page() {
6250 let page_start = 200_u64;
6251 let page_data = LanceBuffer::copy_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
6252 let source = FullZipReadSource::PrefetchedPage {
6253 base_offset: page_start,
6254 data: page_data,
6255 };
6256 let ranges = vec![
6257 page_start..(page_start + 3),
6258 (page_start + 4)..(page_start + 8),
6259 ];
6260 let mut data = source.fetch(&ranges, 0).await.unwrap();
6261 assert_eq!(data.pop_front().unwrap().as_ref(), &[0, 1, 2]);
6262 assert_eq!(data.pop_front().unwrap().as_ref(), &[4, 5, 6, 7]);
6263 }
6264
6265 #[tokio::test]
6266 async fn test_fullzip_initialize_caches_rep_index_when_enabled() {
6267 use futures::{FutureExt, future::BoxFuture};
6268 use std::ops::Range;
6269 use std::sync::Mutex;
6270
6271 #[derive(Debug, Clone)]
6272 struct RecordingScheduler {
6273 data: bytes::Bytes,
6274 requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6275 }
6276
6277 impl RecordingScheduler {
6278 fn new(data: bytes::Bytes) -> Self {
6279 Self {
6280 data,
6281 requests: Arc::new(Mutex::new(Vec::new())),
6282 }
6283 }
6284
6285 fn requests(&self) -> Vec<Vec<Range<u64>>> {
6286 self.requests.lock().unwrap().clone()
6287 }
6288 }
6289
6290 impl crate::EncodingsIo for RecordingScheduler {
6291 fn submit_request(
6292 &self,
6293 ranges: Vec<Range<u64>>,
6294 _priority: u64,
6295 ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6296 self.requests.lock().unwrap().push(ranges.clone());
6297 let data = ranges
6298 .into_iter()
6299 .map(|range| self.data.slice(range.start as usize..range.end as usize))
6300 .collect::<Vec<_>>();
6301 std::future::ready(Ok(data)).boxed()
6302 }
6303 }
6304
6305 #[derive(Debug)]
6306 struct TestFixedDecompressor;
6307
6308 impl FixedPerValueDecompressor for TestFixedDecompressor {
6309 fn decompress(
6310 &self,
6311 _data: FixedWidthDataBlock,
6312 _num_rows: u64,
6313 ) -> crate::Result<DataBlock> {
6314 unimplemented!("Test decompressor")
6315 }
6316
6317 fn bits_per_value(&self) -> u64 {
6318 32
6319 }
6320 }
6321
6322 let rows_in_page = 100_u64;
6323 let bytes_per_value = 4_u64;
6324 let rep_start = 1000_u64;
6325 let rep_size = ((rows_in_page + 1) * bytes_per_value) as usize;
6326 let mut data = vec![0_u8; 16 * 1024];
6327 data[rep_start as usize..rep_start as usize + rep_size].fill(7);
6328 let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(data)));
6329
6330 let mut scheduler = FullZipScheduler {
6331 data_buf_position: 0,
6332 data_buf_size: 4096,
6333 rep_index: Some(FullZipRepIndexDetails {
6334 buf_position: rep_start,
6335 bytes_per_value,
6336 }),
6337 priority: 0,
6338 rows_in_page,
6339 bits_per_offset: 32,
6340 details: Arc::new(FullZipDecodeDetails {
6341 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6342 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6343 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6344 max_rep: 0,
6345 max_visible_def: 0,
6346 }),
6347 cached_state: None,
6348 enable_cache: true,
6349 };
6350
6351 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6352 let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6353 assert!(
6354 cached_data
6355 .as_arc_any()
6356 .downcast_ref::<FullZipCacheableState>()
6357 .is_some()
6358 );
6359 assert!(scheduler.cached_state.is_some());
6360 assert_eq!(
6361 io.requests(),
6362 vec![vec![
6363 rep_start..(rep_start + (rows_in_page + 1) * bytes_per_value)
6364 ]]
6365 );
6366 }
6367
6368 #[tokio::test]
6369 async fn test_fullzip_full_page_bypasses_rep_index_io() {
6370 use futures::{FutureExt, future::BoxFuture};
6371 use std::ops::Range;
6372 use std::sync::Mutex;
6373
6374 #[derive(Debug, Clone)]
6375 struct RecordingScheduler {
6376 data: bytes::Bytes,
6377 requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6378 }
6379
6380 impl RecordingScheduler {
6381 fn new(data: bytes::Bytes) -> Self {
6382 Self {
6383 data,
6384 requests: Arc::new(Mutex::new(Vec::new())),
6385 }
6386 }
6387
6388 fn requests(&self) -> Vec<Vec<Range<u64>>> {
6389 self.requests.lock().unwrap().clone()
6390 }
6391 }
6392
6393 impl crate::EncodingsIo for RecordingScheduler {
6394 fn submit_request(
6395 &self,
6396 ranges: Vec<Range<u64>>,
6397 _priority: u64,
6398 ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6399 self.requests.lock().unwrap().push(ranges.clone());
6400 let data = ranges
6401 .into_iter()
6402 .map(|range| self.data.slice(range.start as usize..range.end as usize))
6403 .collect::<Vec<_>>();
6404 std::future::ready(Ok(data)).boxed()
6405 }
6406 }
6407
6408 #[derive(Debug)]
6409 struct TestFixedDecompressor;
6410
6411 impl FixedPerValueDecompressor for TestFixedDecompressor {
6412 fn decompress(
6413 &self,
6414 _data: FixedWidthDataBlock,
6415 _num_rows: u64,
6416 ) -> crate::Result<DataBlock> {
6417 unimplemented!("Test decompressor")
6418 }
6419
6420 fn bits_per_value(&self) -> u64 {
6421 32
6422 }
6423 }
6424
6425 let rows_in_page = 100_u64;
6426 let data_start = 256_u64;
6427 let data_size = 500_u64;
6428 let rep_start = 4096_u64;
6429 let bytes_per_value = 4_u64;
6430
6431 let mut bytes = vec![0_u8; 16 * 1024];
6432 for i in 0..=rows_in_page {
6433 let offset = (i * 5) as u32;
6434 let pos = rep_start as usize + (i * bytes_per_value) as usize;
6435 bytes[pos..pos + 4].copy_from_slice(&offset.to_le_bytes());
6436 }
6437 let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(bytes)));
6438
6439 let scheduler = FullZipScheduler {
6440 data_buf_position: data_start,
6441 data_buf_size: data_size,
6442 rep_index: Some(FullZipRepIndexDetails {
6443 buf_position: rep_start,
6444 bytes_per_value,
6445 }),
6446 priority: 0,
6447 rows_in_page,
6448 bits_per_offset: 32,
6449 details: Arc::new(FullZipDecodeDetails {
6450 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6451 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6452 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6453 max_rep: 0,
6454 max_visible_def: 0,
6455 }),
6456 cached_state: None,
6457 enable_cache: false,
6458 };
6459
6460 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6461 let tasks = scheduler
6462 .schedule_ranges_rep(
6463 &[0..rows_in_page],
6464 &io_dyn,
6465 FullZipRepIndexDetails {
6466 buf_position: rep_start,
6467 bytes_per_value,
6468 },
6469 )
6470 .unwrap();
6471
6472 let requests = io.requests();
6473 assert_eq!(requests.len(), 1);
6474 assert_eq!(requests[0], vec![data_start..(data_start + data_size)]);
6475
6476 let _ = tasks.into_iter().next().unwrap().decoder_fut.await.unwrap();
6477 let requests_after_await = io.requests();
6478 assert_eq!(
6479 requests_after_await.len(),
6480 1,
6481 "full page path should not issue rep-index I/O"
6482 );
6483 }
6484
6485 #[tokio::test]
6487 async fn test_fuzz_issue_4492_empty_rep_values() {
6488 use lance_datagen::{RowCount, Seed, array, gen_batch};
6489
6490 let seed = 1823859942947654717u64;
6491 let num_rows = 2741usize;
6492
6493 let batch_gen = gen_batch().with_seed(Seed::from(seed));
6495 let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
6496 let list_generator = array::rand_list_any(base_generator, false);
6497
6498 let batch = batch_gen
6499 .anon_col(list_generator)
6500 .into_batch_rows(RowCount::from(num_rows as u64))
6501 .unwrap();
6502
6503 let list_array = batch.column(0).clone();
6504
6505 let mut metadata = HashMap::new();
6507 metadata.insert(
6508 STRUCTURAL_ENCODING_META_KEY.to_string(),
6509 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6510 );
6511
6512 let test_cases = TestCases::default()
6513 .with_min_file_version(LanceFileVersion::V2_1)
6514 .with_batch_size(100)
6515 .with_range(0..num_rows.min(500) as u64)
6516 .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);
6517
6518        check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await;
6519 }
6520
    async fn test_minichunk_size_helper(
        string_data: Vec<Option<String>>,
        minichunk_size: u64,
        file_version: LanceFileVersion,
    ) {
        use crate::constants::MINICHUNK_SIZE_META_KEY;
        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
        use arrow_array::{ArrayRef, StringArray};
        use std::sync::Arc;

        let string_array: ArrayRef = Arc::new(StringArray::from(string_data));

        let mut metadata = HashMap::new();
        metadata.insert(
            MINICHUNK_SIZE_META_KEY.to_string(),
            minichunk_size.to_string(),
        );
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );

        let test_cases = TestCases::default()
            .with_min_file_version(file_version)
            .with_batch_size(1000);

        check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await;
    }

    #[tokio::test]
    async fn test_minichunk_size_roundtrip() {
        let mut string_data = Vec::new();
        for i in 0..100 {
            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
        }
        test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await;
    }

    #[tokio::test]
    async fn test_minichunk_size_128kb_v2_2() {
        let mut string_data = Vec::new();
        for i in 0..10000 {
            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
        }
        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
    }

    #[tokio::test]
    async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
        let mut string_data = Vec::new();
        for i in 0..10000 {
            string_data.push(Some(format!("t_{}", i)));
        }
        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
    }

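    // Encodes 100k strings drawn from 100 large (~500 byte) unique values and checks, via the
    // encoding verification hook, that the dictionary values of the resulting mini-block page
    // are wrapped in general (LZ4 or Zstd) compression.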
    #[tokio::test]
    async fn test_large_dictionary_general_compression() {
        use arrow_array::{ArrayRef, StringArray};
        use std::collections::HashMap;
        use std::sync::Arc;

        let unique_values: Vec<String> = (0..100)
            .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
            .collect();

        let repeated_strings: Vec<_> = unique_values
            .iter()
            .cycle()
            .take(100_000)
            .map(|s| Some(s.as_str()))
            .collect();

        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
                assert_eq!(cols.len(), 1);
                let col = &cols[0];

                if let Some(PageEncoding::Structural(page_layout)) =
                    &col.final_pages.first().map(|p| &p.description)
                    && let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
                        &page_layout.layout
                    && let Some(dictionary_encoding) = &mini_block.dictionary
                {
                    match dictionary_encoding.compression.as_ref() {
                        Some(Compression::General(general)) => {
                            let compression = general.compression.as_ref().unwrap();
                            assert!(
                                compression.scheme()
                                    == pb21::CompressionScheme::CompressionAlgorithmLz4
                                    || compression.scheme()
                                        == pb21::CompressionScheme::CompressionAlgorithmZstd,
                                "Expected LZ4 or Zstd compression for large dictionary"
                            );
                        }
                        _ => panic!("Expected General compression for large dictionary"),
                    }
                }
            }));

        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

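    // Extracts the dictionary's CompressiveEncoding from an encoded page, panicking if the
    // page does not use a structural mini-block layout with a dictionary.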
    fn dictionary_encoding_from_page(
        page: &crate::encoder::EncodedPage,
    ) -> &crate::format::pb21::CompressiveEncoding {
        let PageEncoding::Structural(layout) = &page.description else {
            panic!("Expected structural page encoding");
        };
        let pb21::page_layout::Layout::MiniBlockLayout(layout) = layout.layout.as_ref().unwrap()
        else {
            panic!("Expected mini-block layout");
        };
        layout
            .dictionary
            .as_ref()
            .unwrap_or_else(|| panic!("Expected dictionary encoding"))
    }

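    // Builds a low-cardinality string dictionary array (128 long values, 20k keys) and encodes
    // its first page with the given field metadata, so tests can inspect how the dictionary
    // values were compressed.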
    async fn encode_variable_dict_page(
        metadata: HashMap<String, String>,
    ) -> crate::encoder::EncodedPage {
        use arrow_array::types::Int32Type;
        use arrow_array::{ArrayRef, DictionaryArray, Int32Array, StringArray};

        let values = Arc::new(StringArray::from(
            (0..128)
                .map(|i| format!("value_{i:04}_{}", "x".repeat(256)))
                .collect::<Vec<_>>(),
        )) as ArrayRef;
        let keys = Int32Array::from_iter_values((0..20_000).map(|i| i % 128));
        let dict_array =
            Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap()) as ArrayRef;

        let field = arrow_schema::Field::new(
            "dict_col",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        )
        .with_metadata(metadata);

        encode_first_page(field, dict_array, LanceFileVersion::V2_2).await
    }

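    // Encodes a low-cardinality Decimal128 column (three distinct values over 20k rows) with a
    // permissive dict-size-ratio override so the encoder chooses dictionary encoding on its own
    // for fixed-width data.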
    async fn encode_auto_fixed_dict_page(
        metadata: HashMap<String, String>,
    ) -> crate::encoder::EncodedPage {
        use arrow_array::{ArrayRef, Decimal128Array};

        let values = (0..20_000)
            .map(|i| match i % 3 {
                0 => 10_i128,
                1 => 20_i128,
                _ => 30_i128,
            })
            .collect::<Vec<_>>();
        let decimal = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(38, 0)
            .unwrap();
        let decimal = Arc::new(decimal) as ArrayRef;

        let mut field_metadata = metadata;
        field_metadata.insert(
            "lance-encoding:dict-size-ratio".to_string(),
            "0.99".to_string(),
        );
        let field = arrow_schema::Field::new("fixed_col", DataType::Decimal128(38, 0), false)
            .with_metadata(field_metadata);

        encode_first_page(field, decimal, LanceFileVersion::V2_2).await
    }

    #[tokio::test]
    async fn test_dict_values_general_compression_default_lz4_for_variable_dict_values() {
        let page = encode_variable_dict_page(HashMap::new()).await;
        let dictionary_encoding = dictionary_encoding_from_page(&page);
        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
            panic!("Expected General compression for dictionary values");
        };
        let compression = general.compression.as_ref().unwrap();
        assert_eq!(
            compression.scheme(),
            pb21::CompressionScheme::CompressionAlgorithmLz4
        );
    }

    #[tokio::test]
    async fn test_dict_values_general_compression_default_lz4_for_fixed_dict_values() {
        let page = encode_auto_fixed_dict_page(HashMap::new()).await;
        let dictionary_encoding = dictionary_encoding_from_page(&page);
        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
            panic!("Expected General compression for dictionary values");
        };
        let compression = general.compression.as_ref().unwrap();
        assert_eq!(
            compression.scheme(),
            pb21::CompressionScheme::CompressionAlgorithmLz4
        );
    }

    #[tokio::test]
    async fn test_dict_values_general_compression_zstd() {
        let mut metadata = HashMap::new();
        metadata.insert(
            DICT_VALUES_COMPRESSION_META_KEY.to_string(),
            "zstd".to_string(),
        );
        let page = encode_variable_dict_page(metadata).await;
        let dictionary_encoding = dictionary_encoding_from_page(&page);
        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
            panic!("Expected General compression for dictionary values");
        };
        let compression = general.compression.as_ref().unwrap();
        assert_eq!(
            compression.scheme(),
            pb21::CompressionScheme::CompressionAlgorithmZstd
        );
    }

    #[tokio::test]
    async fn test_dict_values_general_compression_none() {
        let mut metadata = HashMap::new();
        metadata.insert(
            DICT_VALUES_COMPRESSION_META_KEY.to_string(),
            "none".to_string(),
        );
        let page = encode_variable_dict_page(metadata).await;
        let dictionary_encoding = dictionary_encoding_from_page(&page);
        assert!(
            !matches!(
                dictionary_encoding.compression.as_ref(),
                Some(Compression::General(_))
            ),
            "Expected dictionary values to avoid General compression"
        );
    }

    #[test]
    fn test_resolve_dict_values_compression_metadata_defaults_to_lz4() {
        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
            &HashMap::new(),
            None,
            None,
        );
        assert_eq!(metadata.get(COMPRESSION_META_KEY), Some(&"lz4".to_string()));
        assert!(!metadata.contains_key(COMPRESSION_LEVEL_META_KEY));
    }

    #[test]
    fn test_resolve_dict_values_compression_metadata_metadata_overrides_env() {
        let field_metadata = HashMap::from([
            (
                DICT_VALUES_COMPRESSION_META_KEY.to_string(),
                "none".to_string(),
            ),
            (
                DICT_VALUES_COMPRESSION_LEVEL_META_KEY.to_string(),
                "7".to_string(),
            ),
        ]);
        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
            &field_metadata,
            Some("zstd".to_string()),
            Some("3".to_string()),
        );
        assert_eq!(
            metadata.get(COMPRESSION_META_KEY),
            Some(&"none".to_string()),
        );
        assert_eq!(
            metadata.get(COMPRESSION_LEVEL_META_KEY),
            Some(&"7".to_string()),
        );
    }

    #[test]
    fn test_resolve_dict_values_compression_metadata_env_fallback() {
        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
            &HashMap::new(),
            Some("zstd".to_string()),
            Some("9".to_string()),
        );
        assert_eq!(
            metadata.get(COMPRESSION_META_KEY),
            Some(&"zstd".to_string()),
        );
        assert_eq!(
            metadata.get(COMPRESSION_LEVEL_META_KEY),
            Some(&"9".to_string()),
        );
    }

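    // Low-cardinality Int64 data with a permissive dict-size-ratio should be written with the
    // dictionary encoding; the round trip also exercises ranged and indexed reads.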
    #[tokio::test]
    async fn test_dictionary_encode_int64() {
        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
        use crate::version::LanceFileVersion;
        use arrow_array::{ArrayRef, Int64Array};
        use std::collections::HashMap;
        use std::sync::Arc;

        let values = (0..1000)
            .map(|i| match i % 3 {
                0 => 10i64,
                1 => 20i64,
                _ => 30i64,
            })
            .collect::<Vec<_>>();
        let array = Arc::new(Int64Array::from(values)) as ArrayRef;

        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_batch_size(1000)
            .with_range(0..1000)
            .with_indices(vec![0, 1, 10, 999])
            .with_expected_encoding("dictionary");

        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
    }

    #[tokio::test]
    async fn test_dictionary_encode_float64() {
        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
        use crate::version::LanceFileVersion;
        use arrow_array::{ArrayRef, Float64Array};
        use std::collections::HashMap;
        use std::sync::Arc;

        let values = (0..1000)
            .map(|i| match i % 3 {
                0 => 0.1f64,
                1 => 0.2f64,
                _ => 0.3f64,
            })
            .collect::<Vec<_>>();
        let array = Arc::new(Float64Array::from(values)) as ArrayRef;

        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_batch_size(1000)
            .with_range(0..1000)
            .with_indices(vec![0, 1, 10, 999])
            .with_expected_encoding("dictionary");

        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
    }

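    // Constructs a MiniBlockLayout whose dictionary uses out-of-line bitpacking and checks that
    // the scheduler surfaces the dictionary item count and buffer alignment correctly.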
    #[test]
    fn test_miniblock_dictionary_out_of_line_bitpacking_decode() {
        let rows = 10_000;
        let unique_values = 2_000;

        let dictionary_encoding =
            ProtobufUtils21::out_of_line_bitpacking(64, ProtobufUtils21::flat(11, None));
        let layout = pb21::MiniBlockLayout {
            rep_compression: None,
            def_compression: None,
            value_compression: Some(ProtobufUtils21::flat(64, None)),
            dictionary: Some(dictionary_encoding),
            num_dictionary_items: unique_values,
            layers: vec![pb21::RepDefLayer::RepdefAllValidItem as i32],
            num_buffers: 1,
            repetition_index_depth: 0,
            num_items: rows,
            has_large_chunk: false,
        };

        let buffer_offsets_and_sizes = vec![(0, 0), (0, 0), (0, 0)];
        let scheduler = super::MiniBlockScheduler::try_new(
            &buffer_offsets_and_sizes,
            0,
            rows,
            &layout,
            &DefaultDecompressionStrategy::default(),
        )
        .unwrap();

        let dictionary = scheduler.dictionary.unwrap();
        assert_eq!(dictionary.num_dictionary_items, unique_values);
        assert_eq!(
            dictionary.dictionary_data_alignment,
            crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
        );
    }

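    // Builds a fixed-width DataBlock (32/64/128 bits per value) whose values cycle through
    // `cardinality` distinct integers, for exercising the dictionary-encoding heuristics.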
    fn create_test_fixed_data_block(
        num_values: u64,
        cardinality: u64,
        bits_per_value: u64,
    ) -> DataBlock {
        assert!(cardinality > 0);
        assert!(cardinality <= num_values);
        let block_info = BlockInfo::default();

        assert_eq!(bits_per_value % 8, 0);
        let data = match bits_per_value {
            32 => {
                let values = (0..num_values)
                    .map(|i| (i % cardinality) as u32)
                    .collect::<Vec<_>>();
                crate::buffer::LanceBuffer::reinterpret_vec(values)
            }
            64 => {
                let values = (0..num_values).map(|i| i % cardinality).collect::<Vec<_>>();
                crate::buffer::LanceBuffer::reinterpret_vec(values)
            }
            128 => {
                let values = (0..num_values)
                    .map(|i| (i % cardinality) as u128)
                    .collect::<Vec<_>>();
                crate::buffer::LanceBuffer::reinterpret_vec(values)
            }
            _ => unreachable!(),
        };
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value,
            data,
            num_values,
            block_info,
        })
    }

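    // Builds a variable-width (string) DataBlock whose values cycle through `cardinality`
    // distinct strings of equal length.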
    fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
        use arrow_array::StringArray;

        assert!(cardinality <= num_values && cardinality > 0);

        let mut values = Vec::with_capacity(num_values as usize);
        for i in 0..num_values {
            values.push(format!("value_{:016}", i % cardinality));
        }

        let array = StringArray::from(values);
        DataBlock::from_array(Arc::new(array) as ArrayRef)
    }

    #[test]
    fn test_should_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        let block = create_test_variable_width_block(1000, 10);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
            &block,
            &field,
            LanceFileVersion::V2_1,
        );

        assert!(
            result.is_some(),
            "Should use dictionary encode based on size"
        );
    }

    #[test]
    fn test_should_not_dictionary_encode_unsupported_bits() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        let block = create_test_fixed_data_block(1000, 1000, 32);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
            &block,
            &field,
            LanceFileVersion::V2_1,
        );

        assert!(
            result.is_none(),
            "Should not use dictionary encode for unsupported bit width"
        );
    }

    #[test]
    fn test_should_not_dictionary_encode_near_unique_sample() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        let num_values = 5000;
        let block = create_test_variable_width_block(num_values, num_values);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "1.0".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
            &block,
            &field,
            LanceFileVersion::V2_1,
        );

        assert!(
            result.is_none(),
            "Should not probe dictionary encoding for near-unique data"
        );
    }

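    // Encodes a single array through the default field-encoding pipeline for the given file
    // version and returns the first page produced, so tests can inspect its layout.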
    async fn encode_first_page(
        field: arrow_schema::Field,
        array: ArrayRef,
        version: LanceFileVersion,
    ) -> crate::encoder::EncodedPage {
        use crate::encoder::{
            ColumnIndexSequence, EncodingOptions, MIN_PAGE_BUFFER_ALIGNMENT, OutOfLineBuffers,
            default_encoding_strategy,
        };
        use crate::repdef::RepDefBuilder;

        let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap();
        let encoding_strategy = default_encoding_strategy(version);
        let mut column_index_seq = ColumnIndexSequence::default();
        let encoding_options = EncodingOptions {
            cache_bytes_per_column: 1,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
            version,
        };

        let mut encoder = encoding_strategy
            .create_field_encoder(
                encoding_strategy.as_ref(),
                &lance_field,
                &mut column_index_seq,
                &encoding_options,
            )
            .unwrap();

        let mut external_buffers = OutOfLineBuffers::new(0, MIN_PAGE_BUFFER_ALIGNMENT);
        let repdef = RepDefBuilder::default();
        let num_rows = array.len() as u64;
        let mut pages = Vec::new();
        for task in encoder
            .maybe_encode(array, &mut external_buffers, repdef, 0, num_rows)
            .unwrap()
        {
            pages.push(task.await.unwrap());
        }
        for task in encoder.flush(&mut external_buffers).unwrap() {
            pages.push(task.await.unwrap());
        }
        pages.into_iter().next().unwrap()
    }

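    // The tests below exercise the constant page layout: pages whose value repeats are written
    // either with an inline value or a single out-of-line buffer, and the layout is only
    // emitted for file version 2.2 and later.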
    #[tokio::test]
    async fn test_constant_layout_out_of_line_fixed_size_binary_v2_2() {
        use crate::format::pb21::page_layout::Layout;

        let val = vec![0xABu8; 33];
        let arr: ArrayRef = Arc::new(
            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
                std::iter::repeat_n(Some(val.as_slice()), 256),
                33,
            )
            .unwrap(),
        );
        let field = arrow_schema::Field::new("c", DataType::FixedSizeBinary(33), true);
        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;

        let PageEncoding::Structural(layout) = &page.description else {
            panic!("Expected structural encoding");
        };
        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
            panic!("Expected constant layout in slot 2");
        };
        assert!(layout.inline_value.is_none());
        assert_eq!(page.data.len(), 1);

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_max_file_version(LanceFileVersion::V2_2)
            .with_page_sizes(vec![4096]);
        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
    }

    #[tokio::test]
    async fn test_constant_layout_out_of_line_utf8_v2_2() {
        use crate::format::pb21::page_layout::Layout;

        let arr: ArrayRef = Arc::new(arrow_array::StringArray::from_iter_values(
            std::iter::repeat_n("hello", 512),
        ));
        let field = arrow_schema::Field::new("c", DataType::Utf8, true);
        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;

        let PageEncoding::Structural(layout) = &page.description else {
            panic!("Expected structural encoding");
        };
        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
            panic!("Expected constant layout in slot 2");
        };
        assert!(layout.inline_value.is_none());
        assert_eq!(page.data.len(), 1);

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_max_file_version(LanceFileVersion::V2_2)
            .with_page_sizes(vec![4096]);
        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
    }

    #[tokio::test]
    async fn test_constant_layout_nullable_item_v2_2() {
        use crate::format::pb21::page_layout::Layout;

        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![
            Some(7),
            None,
            Some(7),
            None,
            Some(7),
        ]));
        let field = arrow_schema::Field::new("c", DataType::Int32, true);
        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;

        let PageEncoding::Structural(layout) = &page.description else {
            panic!("Expected structural encoding");
        };
        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
            panic!("Expected constant layout in slot 2");
        };
        assert!(layout.inline_value.is_some());
        assert_eq!(page.data.len(), 2);

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_max_file_version(LanceFileVersion::V2_2)
            .with_page_sizes(vec![4096]);
        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
    }

    #[tokio::test]
    async fn test_constant_layout_list_repdef_v2_2() {
        use crate::format::pb21::page_layout::Layout;
        use arrow_array::builder::{Int32Builder, ListBuilder};

        let mut builder = ListBuilder::new(Int32Builder::new());
        builder.values().append_value(7);
        builder.values().append_null();
        builder.values().append_value(7);
        builder.append(true);

        builder.append(true);

        builder.values().append_value(7);
        builder.append(true);

        builder.append_null();

        let arr: ArrayRef = Arc::new(builder.finish());
        let field = arrow_schema::Field::new(
            "c",
            DataType::List(Arc::new(arrow_schema::Field::new(
                "item",
                DataType::Int32,
                true,
            ))),
            true,
        );
        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;

        let PageEncoding::Structural(layout) = &page.description else {
            panic!("Expected structural encoding");
        };
        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
            panic!("Expected constant layout in slot 2");
        };
        assert!(layout.inline_value.is_some());
        assert_eq!(page.data.len(), 2);

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_max_file_version(LanceFileVersion::V2_2)
            .with_page_sizes(vec![4096]);
        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
    }

    #[tokio::test]
    async fn test_constant_layout_fixed_size_list_not_used_v2_2() {
        use crate::format::pb21::page_layout::Layout;
        use arrow_array::builder::{FixedSizeListBuilder, Int32Builder};

        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
        for _ in 0..64 {
            builder.values().append_value(1);
            builder.values().append_null();
            builder.values().append_value(3);
            builder.append(true);
        }
        let arr: ArrayRef = Arc::new(builder.finish());
        let field = arrow_schema::Field::new(
            "c",
            DataType::FixedSizeList(
                Arc::new(arrow_schema::Field::new("item", DataType::Int32, true)),
                3,
            ),
            true,
        );
        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;

        if let PageEncoding::Structural(layout) = &page.description {
            assert!(
                !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
                "FixedSizeList should not use constant layout yet"
            );
        }

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_max_file_version(LanceFileVersion::V2_2)
            .with_page_sizes(vec![4096]);
        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
    }

    #[tokio::test]
    async fn test_constant_layout_not_written_before_v2_2() {
        use crate::format::pb21::page_layout::Layout;

        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![7; 1024]));
        let field = arrow_schema::Field::new("c", DataType::Int32, true);
        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_1).await;

        let PageEncoding::Structural(layout) = &page.description else {
            return;
        };
        assert!(
            !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
            "Should not emit constant layout before v2.2"
        );

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_max_file_version(LanceFileVersion::V2_1)
            .with_page_sizes(vec![4096]);
        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
    }

    #[tokio::test]
    async fn test_all_null_constant_layout_still_works_v2_2() {
        use crate::format::pb21::page_layout::Layout;

        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![None, None, None]));
        let field = arrow_schema::Field::new("c", DataType::Int32, true);
        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;

        let PageEncoding::Structural(layout) = &page.description else {
            panic!("Expected structural encoding");
        };
        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
            panic!("Expected constant layout in slot 2");
        };
        assert!(layout.inline_value.is_none());
        assert_eq!(page.data.len(), 0);

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_max_file_version(LanceFileVersion::V2_2)
            .with_page_sizes(vec![4096]);
        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
    }

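    // Encodes a u16 level buffer with encode_complex_all_null_vals and decodes it again with
    // the matching block decompressor, checking that the round trip is lossless.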
    #[test]
    fn test_encode_decode_complex_all_null_vals_roundtrip() {
        use crate::compression::{
            DecompressionStrategy, DefaultCompressionStrategy, DefaultDecompressionStrategy,
        };

        let values: Arc<[u16]> = Arc::from((0..2048).map(|i| (i % 5) as u16).collect::<Vec<u16>>());

        let compression_strategy = DefaultCompressionStrategy::default();
        let decompression_strategy = DefaultDecompressionStrategy::default();

        let (compressed_buf, encoding) = PrimitiveStructuralEncoder::encode_complex_all_null_vals(
            &values,
            &compression_strategy,
        )
        .unwrap();

        let decompressor = decompression_strategy
            .create_block_decompressor(&encoding)
            .unwrap();
        let decompressed = decompressor
            .decompress(compressed_buf, values.len() as u64)
            .unwrap();
        let decompressed_fixed_width = decompressed.as_fixed_width().unwrap();
        assert_eq!(decompressed_fixed_width.num_values, values.len() as u64);
        assert_eq!(decompressed_fixed_width.bits_per_value, 16);
        let rep_result = decompressed_fixed_width.data.borrow_to_typed_slice::<u16>();
        assert_eq!(rep_result.as_ref(), values.as_ref());
    }

    #[tokio::test]
    async fn test_complex_all_null_compression_gated_by_version() {
        use crate::format::pb21::page_layout::Layout;
        use arrow_array::ListArray;

        let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
            (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
        );
        let arr: ArrayRef = Arc::new(list_array);
        let field = arrow_schema::Field::new(
            "c",
            DataType::List(Arc::new(arrow_schema::Field::new(
                "item",
                DataType::Int32,
                true,
            ))),
            true,
        );

        let page_v21 = encode_first_page(field.clone(), arr.clone(), LanceFileVersion::V2_1).await;
        let PageEncoding::Structural(layout_v21) = &page_v21.description else {
            panic!("Expected structural encoding");
        };
        let Layout::ConstantLayout(layout_v21) = layout_v21.layout.as_ref().unwrap() else {
            panic!("Expected constant layout");
        };
        assert!(layout_v21.rep_compression.is_none());
        assert!(layout_v21.def_compression.is_none());
        assert_eq!(layout_v21.num_rep_values, 0);
        assert_eq!(layout_v21.num_def_values, 0);

        let page_v22 = encode_first_page(field, arr, LanceFileVersion::V2_2).await;
        let PageEncoding::Structural(layout_v22) = &page_v22.description else {
            panic!("Expected structural encoding");
        };
        let Layout::ConstantLayout(layout_v22) = layout_v22.layout.as_ref().unwrap() else {
            panic!("Expected constant layout");
        };
        assert!(layout_v22.def_compression.is_some());
        assert!(layout_v22.num_def_values > 0);
    }

    #[tokio::test]
    async fn test_complex_all_null_round_trip() {
        use arrow_array::ListArray;

        let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
            (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
        );

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_2);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
            .await;
    }
}