1use std::{
5 any::Any,
6 collections::{HashMap, VecDeque},
7 env,
8 fmt::Debug,
9 iter,
10 ops::Range,
11 sync::Arc,
12 vec,
13};
14
15use crate::{
16 constants::{
17 STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
18 },
19 data::DictionaryDataBlock,
20 encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
21 format::{
22 ProtobufUtils21,
23 pb21::{self, CompressiveEncoding, PageLayout, compressive_encoding::Compression},
24 },
25};
26use arrow_array::{Array, ArrayRef, PrimitiveArray, cast::AsArray, make_array, types::UInt64Type};
27use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
28use arrow_schema::{DataType, Field as ArrowField};
29use bytes::Bytes;
30use futures::{FutureExt, TryStreamExt, future::BoxFuture, stream::FuturesOrdered};
31use itertools::Itertools;
32use lance_arrow::DataTypeExt;
33use lance_arrow::deepcopy::deep_copy_nulls;
34use lance_core::{
35 cache::{CacheKey, Context, DeepSizeOf},
36 error::{Error, LanceOptionExt},
37 utils::bit::pad_bytes,
38};
39use log::trace;
40
41use crate::{
42 compression::{
43 BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
44 },
45 data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
46 utils::bytepack::BytepackedIntegerEncoder,
47};
48use crate::{
49 compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
50 encodings::logical::primitive::fullzip::PerValueDataBlock,
51};
52use crate::{
53 encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
54};
55use crate::{
56 encodings::logical::primitive::miniblock::MiniBlockCompressed,
57 statistics::{ComputeStat, GetStat, Stat},
58};
59use crate::{
60 repdef::{
61 CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser, DefinitionInterpretation,
62 RepDefSlicer, build_control_word_iterator,
63 },
64 utils::accumulation::AccumulationQueue,
65};
66use lance_core::{Result, datatypes::Field, utils::tokio::spawn_cpu};
67
68use crate::constants::{
69 COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_DIVISOR_META_KEY,
70 DICT_SIZE_RATIO_META_KEY, DICT_VALUES_COMPRESSION_ENV_VAR,
71 DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
72 DICT_VALUES_COMPRESSION_META_KEY,
73};
74use crate::version::LanceFileVersion;
75use crate::{
76 EncodingsIo,
77 buffer::LanceBuffer,
78 data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
79 decoder::{
80 ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
81 MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
82 StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
83 StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
84 },
85 encoder::{
86 EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
87 },
88 repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
89};
90
// Submodules of the primitive structural encoding.
// `blob` supplies the `BlobDescriptionPageScheduler` / `BlobPageScheduler` imported above.
pub mod blob;
pub mod constant;
pub mod dict;
// `fullzip` supplies the `PerValueDataBlock` imported above.
pub mod fullzip;
// `miniblock` supplies the `MiniBlockChunk` / `MiniBlockCompressed` imported above.
pub mod miniblock;
96
// NOTE(review): none of these constants is referenced in the part of the file
// reviewed here; the descriptions below are inferred from the names only and
// should be confirmed at the use sites.

// Presumably the byte value written into padding / unused space.
const FILL_BYTE: u8 = 0xFE;
// Presumably the default divisor used when deciding whether to dictionary-encode.
const DEFAULT_DICT_DIVISOR: u64 = 2;
// Presumably the default upper bound on dictionary cardinality.
const DEFAULT_DICT_MAX_CARDINALITY: u64 = 100_000;
// Presumably the default size-ratio threshold for dictionary encoding.
const DEFAULT_DICT_SIZE_RATIO: f64 = 0.8;
// Presumably the default compression scheme applied to dictionary values.
const DEFAULT_DICT_VALUES_COMPRESSION: &str = "lz4";
102
/// A unit of scheduled work returned by `StructuralPageScheduler::schedule_ranges`.
struct PageLoadTask {
    /// Future that resolves to the decoder for the scheduled rows.
    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
    /// Number of rows the decoder will yield (the schedulers in this file set it
    /// to the total length of the requested ranges).
    num_rows: u64,
}
107
/// Scheduler interface implemented by the page-level schedulers in this module
/// (for example `ComplexAllNullScheduler` and `SimpleAllNullScheduler`).
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Performs one-time setup for the page and returns the resulting state as
    /// cacheable data.
    ///
    /// `ComplexAllNullScheduler` uses this to read and decode its rep/def buffers
    /// through `io`; `SimpleAllNullScheduler` has nothing to load.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Installs state produced earlier by `initialize`.
    ///
    /// NOTE(review): presumably called instead of `initialize` on a cache hit —
    /// confirm against the caller.
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules loading of the given row `ranges` and returns one or more tasks
    /// whose futures resolve to decoders for those rows.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>>;
}
132
/// Metadata describing a single mini-block chunk.
#[derive(Debug)]
struct ChunkMeta {
    /// Number of values in the chunk (used by `MiniBlockRepIndex::default_from_chunks`
    /// as the number of rows when there is no repetition).
    num_values: u64,
    /// Size of the chunk in bytes (inferred from the name; not read in the code shown here).
    chunk_size_bytes: u64,
    /// Byte offset of the chunk (inferred from the name; not read in the code shown here).
    offset_bytes: u64,
}
140
/// The result of decoding one mini-block chunk (see
/// `DecodeMiniBlockTask::decode_miniblock_chunk`).
#[derive(Debug, Clone)]
struct DecodedMiniBlockChunk {
    /// Decoded repetition levels, present only when the task has a rep decompressor.
    rep: Option<ScalarBuffer<u16>>,
    /// Decoded definition levels, present only when the task has a def decompressor.
    def: Option<ScalarBuffer<u16>>,
    /// Decompressed values of the chunk.
    values: DataBlock,
}
148
/// A decode task that turns a batch of loaded mini-block chunks into a `DecodedPage`.
///
/// Built by `MiniBlockDecoder::drain`.
#[derive(Debug)]
struct DecodeMiniBlockTask {
    /// Decompressor for repetition levels; `None` means chunks carry no rep buffer.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels; `None` means chunks carry no def buffer.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for the value buffers of each chunk.
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// When set, decoded values are treated as indices into this dictionary.
    dictionary_data: Option<Arc<DataBlock>>,
    /// Interpretation of each definition layer; list layers determine `max_rep`.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Number of value buffers stored in each chunk.
    num_buffers: u64,
    /// Definition levels greater than this are not counted as items.
    max_visible_level: u16,
    /// What to drain from which chunk, in order.
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
    /// When true, value-buffer sizes in the chunk header are 4 bytes wide instead of 2.
    has_large_chunk: bool,
}
168
169impl DecodeMiniBlockTask {
170 fn decode_levels(
171 rep_decompressor: &dyn BlockDecompressor,
172 levels: LanceBuffer,
173 num_levels: u16,
174 ) -> Result<ScalarBuffer<u16>> {
175 let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
176 let rep = rep.as_fixed_width().unwrap();
177 debug_assert_eq!(rep.num_values, num_levels as u64);
178 debug_assert_eq!(rep.bits_per_value, 16);
179 Ok(rep.data.borrow_to_typed_slice::<u16>())
180 }
181
182 fn extend_levels(
189 range: Range<u64>,
190 levels: &mut Option<LevelBuffer>,
191 level_buf: &Option<impl AsRef<[u16]>>,
192 dest_offset: usize,
193 ) {
194 if let Some(level_buf) = level_buf {
195 if levels.is_none() {
196 let mut new_levels_vec =
199 LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
200 new_levels_vec.extend(iter::repeat_n(0, dest_offset));
201 *levels = Some(new_levels_vec);
202 }
203 levels.as_mut().unwrap().extend(
204 level_buf.as_ref()[range.start as usize..range.end as usize]
205 .iter()
206 .copied(),
207 );
208 } else if let Some(levels) = levels {
209 let num_values = (range.end - range.start) as usize;
210 levels.extend(iter::repeat_n(0, num_values));
213 }
214 }
215
/// Maps a range of rows within a chunk to the matching range of items and the
/// matching range of levels.
///
/// Returns `(item_range, level_range)`.
///
/// * `range` - rows wanted, counted from the first full row of the chunk
///   (i.e. after any preamble).
/// * `rep` / `def` - decoded repetition / definition levels of the chunk, if any.
/// * `max_rep` - a level whose repetition value equals this starts a new row.
/// * `max_visible_def` - a level only counts as an item when its definition
///   value is `<=` this; larger values are "invisible" and occupy a level but
///   no item.
/// * `total_items` - number of items in the chunk.
/// * `preamble_action` - what to do with the levels that precede the first row
///   start in this chunk.
///
/// When there is no repetition the two returned ranges are both `range`.
///
/// # Panics
///
/// Panics if the chunk contains no row start and `preamble_action` is not
/// `Take`, or if `range.start > range.end`.
fn map_range(
    range: Range<u64>,
    rep: Option<&impl AsRef<[u16]>>,
    def: Option<&impl AsRef<[u16]>>,
    max_rep: u16,
    max_visible_def: u16,
    total_items: u64,
    preamble_action: PreambleAction,
) -> (Range<u64>, Range<u64>) {
    if let Some(rep) = rep {
        let mut rep = rep.as_ref();
        // Number of visible items that belong to the preamble.
        let mut items_in_preamble = 0_u64;
        // Level index at which the first full row of this chunk begins.
        let first_row_start = match preamble_action {
            PreambleAction::Skip | PreambleAction::Take => {
                let first_row_start = if let Some(def) = def.as_ref() {
                    let mut first_row_start = None;
                    for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                        if *rep == max_rep {
                            first_row_start = Some(idx as u64);
                            break;
                        }
                        // Only visible levels contribute items to the preamble.
                        if *def <= max_visible_def {
                            items_in_preamble += 1;
                        }
                    }
                    first_row_start
                } else {
                    // Without definition levels every level is an item.
                    let first_row_start =
                        rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
                    items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
                    first_row_start
                };
                if first_row_start.is_none() {
                    // No row starts in this chunk: the whole chunk is preamble.
                    assert!(preamble_action == PreambleAction::Take);
                    return (0..total_items, 0..rep.len() as u64);
                }
                let first_row_start = first_row_start.unwrap();
                // From here on `rep` is relative to the first row start.
                rep = &rep[first_row_start as usize..];
                first_row_start
            }
            PreambleAction::Absent => {
                debug_assert!(rep[0] == max_rep);
                0
            }
        };

        if range.start == range.end {
            // Empty row range: only the preamble is requested.
            debug_assert!(preamble_action == PreambleAction::Take);
            debug_assert!(items_in_preamble <= total_items);
            return (0..items_in_preamble, 0..first_row_start);
        }
        assert!(range.start < range.end);

        let mut rows_seen = 0;
        let mut new_start = 0;
        let mut new_levels_start = 0;

        if let Some(def) = def {
            // Keep `def` aligned with the re-sliced `rep`.
            let def = &def.as_ref()[first_row_start as usize..];

            // Invisible levels encountered before the start of the requested rows.
            let mut lead_invis_seen = 0;

            if range.start > 0 {
                if def[0] > max_visible_def {
                    lead_invis_seen += 1;
                }
                // `skip(1)` precedes `enumerate`, so `idx` is one less than the level index.
                for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.start {
                            new_start = idx as u64 + 1 - lead_invis_seen;
                            new_levels_start = idx as u64 + 1;
                            break;
                        }
                    }
                    if *def > max_visible_def {
                        lead_invis_seen += 1;
                    }
                }
            }

            // Account for the row that begins at `new_levels_start`.
            rows_seen += 1;

            // `u64::MAX` is a sentinel meaning "end not found yet".
            let mut new_end = u64::MAX;
            let mut new_levels_end = rep.len() as u64;
            let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
            let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
            for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                .iter()
                .zip(&def[(new_levels_start + 1) as usize..])
                .enumerate()
            {
                if *rep == max_rep {
                    rows_seen += 1;
                    if rows_seen == range.end + 1 {
                        new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                        new_levels_end = idx as u64 + new_levels_start + 1;
                        break;
                    }
                }
                if *def > max_visible_def {
                    tail_invis_seen += 1;
                }
            }

            if new_end == u64::MAX {
                // The requested rows run to the end of the chunk.
                new_levels_end = rep.len() as u64;
                let total_invis_seen = lead_invis_seen + tail_invis_seen;
                new_end = rep.len() as u64 - total_invis_seen;
            }

            assert_ne!(new_end, u64::MAX);

            // Translate back from "relative to first row start" to chunk coordinates.
            if preamble_action == PreambleAction::Skip {
                new_start += items_in_preamble;
                new_end += items_in_preamble;
                new_levels_start += first_row_start;
                new_levels_end += first_row_start;
            } else if preamble_action == PreambleAction::Take {
                debug_assert_eq!(new_start, 0);
                debug_assert_eq!(new_levels_start, 0);
                new_end += items_in_preamble;
                new_levels_end += first_row_start;
            }

            debug_assert!(new_end <= total_items);
            (new_start..new_end, new_levels_start..new_levels_end)
        } else {
            // No definition levels: items and levels coincide, so one range serves both.
            if range.start > 0 {
                for (idx, rep) in rep.iter().skip(1).enumerate() {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.start {
                            new_start = idx as u64 + 1;
                            break;
                        }
                    }
                }
            }
            let mut new_end = rep.len() as u64;
            if range.end < total_items {
                for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end {
                            new_end = idx as u64 + new_start + 1;
                            break;
                        }
                    }
                }
            }

            if preamble_action == PreambleAction::Skip {
                new_start += first_row_start;
                new_end += first_row_start;
            } else if preamble_action == PreambleAction::Take {
                debug_assert_eq!(new_start, 0);
                new_end += first_row_start;
            }

            debug_assert!(new_end <= total_items);
            (new_start..new_end, new_start..new_end)
        }
    } else {
        // No repetition: rows, items and levels are in one-to-one correspondence.
        (range.clone(), range)
    }
}
437
/// Reads `num_buffers` little-endian buffer sizes from `buf`, starting at
/// `*offset`, and advances `*offset` past them.
///
/// Each size is 4 bytes wide when `LARGE` is true and 2 bytes wide otherwise;
/// every size is returned widened to `u32`.
///
/// # Panics
///
/// Panics if `buf` is too short to hold the requested sizes.
fn read_buffer_sizes<const LARGE: bool>(
    buf: &[u8],
    offset: &mut usize,
    num_buffers: u64,
) -> Vec<u32> {
    let width: usize = if LARGE { 4 } else { 2 };
    let mut sizes = Vec::with_capacity(num_buffers as usize);
    for _ in 0..num_buffers {
        let end = *offset + width;
        let field = &buf[*offset..end];
        let size = if LARGE {
            u32::from_le_bytes([field[0], field[1], field[2], field[3]])
        } else {
            u32::from(u16::from_le_bytes([field[0], field[1]]))
        };
        *offset = end;
        sizes.push(size);
    }
    sizes
}
459
460 fn decode_miniblock_chunk(
462 &self,
463 buf: &LanceBuffer,
464 items_in_chunk: u64,
465 ) -> Result<DecodedMiniBlockChunk> {
466 let mut offset = 0;
467 let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
468 offset += 2;
469
470 let rep_size = if self.rep_decompressor.is_some() {
471 let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
472 offset += 2;
473 Some(rep_size)
474 } else {
475 None
476 };
477 let def_size = if self.def_decompressor.is_some() {
478 let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
479 offset += 2;
480 Some(def_size)
481 } else {
482 None
483 };
484
485 let buffer_sizes = if self.has_large_chunk {
486 Self::read_buffer_sizes::<true>(buf, &mut offset, self.num_buffers)
487 } else {
488 Self::read_buffer_sizes::<false>(buf, &mut offset, self.num_buffers)
489 };
490
491 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
492
493 let rep = rep_size.map(|rep_size| {
494 let rep = buf.slice_with_length(offset, rep_size as usize);
495 offset += rep_size as usize;
496 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
497 rep
498 });
499
500 let def = def_size.map(|def_size| {
501 let def = buf.slice_with_length(offset, def_size as usize);
502 offset += def_size as usize;
503 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
504 def
505 });
506
507 let buffers = buffer_sizes
508 .into_iter()
509 .map(|buf_size| {
510 let buf = buf.slice_with_length(offset, buf_size as usize);
511 offset += buf_size as usize;
512 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
513 buf
514 })
515 .collect::<Vec<_>>();
516
517 let values = self
518 .value_decompressor
519 .decompress(buffers, items_in_chunk)?;
520
521 let rep = rep
522 .map(|rep| {
523 Self::decode_levels(
524 self.rep_decompressor.as_ref().unwrap().as_ref(),
525 rep,
526 num_levels,
527 )
528 })
529 .transpose()?;
530 let def = def
531 .map(|def| {
532 Self::decode_levels(
533 self.def_decompressor.as_ref().unwrap().as_ref(),
534 def,
535 num_levels,
536 )
537 })
538 .transpose()?;
539
540 Ok(DecodedMiniBlockChunk { rep, def, values })
541 }
542}
543
544impl DecodePageTask for DecodeMiniBlockTask {
545 fn decode(self: Box<Self>) -> Result<DecodedPage> {
546 let mut repbuf: Option<LevelBuffer> = None;
548 let mut defbuf: Option<LevelBuffer> = None;
549
550 let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
551
552 let estimated_size_bytes = self
554 .instructions
555 .iter()
556 .map(|(_, chunk)| chunk.data.len())
557 .sum::<usize>()
558 * 2;
559 let mut data_builder =
560 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
561
562 let mut level_offset = 0;
564
565 let needs_caching: Vec<bool> = self
567 .instructions
568 .windows(2)
569 .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
570 .chain(std::iter::once(false)) .collect();
572
573 let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;
575
576 for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
578 let should_cache_this_chunk = needs_caching[idx];
579
580 let decoded_chunk = match &chunk_cache {
581 Some((cached_chunk_idx, cached_chunk)) if *cached_chunk_idx == chunk.chunk_idx => {
582 cached_chunk.clone()
584 }
585 _ => {
586 let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
588
589 if should_cache_this_chunk {
591 chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
592 }
593 decoded
594 }
595 };
596
597 let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;
598
599 let row_range_start =
601 instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
602 let row_range_end = row_range_start + instructions.rows_to_take;
603
604 let (item_range, level_range) = Self::map_range(
606 row_range_start..row_range_end,
607 rep.as_ref(),
608 def.as_ref(),
609 max_rep,
610 self.max_visible_level,
611 chunk.items_in_chunk,
612 instructions.preamble_action,
613 );
614 if item_range.end - item_range.start > chunk.items_in_chunk {
615 return Err(lance_core::Error::internal(format!(
616 "Item range {:?} is greater than chunk items in chunk {:?}",
617 item_range, chunk.items_in_chunk
618 )));
619 }
620
621 Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
623 Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
624 level_offset += (level_range.end - level_range.start) as usize;
625 data_builder.append(&values, item_range);
626 }
627
628 let mut data = data_builder.finish();
629
630 let unraveler =
631 RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());
632
633 if let Some(dictionary) = &self.dictionary_data {
634 let DataBlock::FixedWidth(indices) = data else {
636 return Err(lance_core::Error::internal(format!(
637 "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
638 data
639 )));
640 };
641 data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
642 indices,
643 dictionary.as_ref().clone(),
644 ));
645 }
646
647 Ok(DecodedPage {
648 data,
649 repdef: unraveler,
650 })
651 }
652}
653
/// A mini-block chunk whose bytes have been loaded and are ready to decode.
#[derive(Debug)]
struct LoadedChunk {
    /// Raw (still encoded) bytes of the chunk.
    data: LanceBuffer,
    /// Number of items stored in the chunk.
    items_in_chunk: u64,
    /// Byte range of the chunk (not read in the code shown here; presumably its
    /// location within the page — confirm at the construction site).
    byte_range: Range<u64>,
    /// Index of the chunk; used to match chunks to drain instructions and to
    /// detect consecutive instructions that target the same chunk.
    chunk_idx: usize,
}
663
664impl Clone for LoadedChunk {
665 fn clone(&self) -> Self {
666 Self {
667 data: self.data.clone(),
669 items_in_chunk: self.items_in_chunk,
670 byte_range: self.byte_range.clone(),
671 chunk_idx: self.chunk_idx,
672 }
673 }
674}
675
/// Page decoder for mini-block encoded data.
///
/// Holds the loaded chunks plus the instructions describing which rows to take
/// from them, and hands out `DecodeMiniBlockTask`s from `drain`.
#[derive(Debug)]
struct MiniBlockDecoder {
    /// Decompressor for repetition levels, if the page has them.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels, if the page has them.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for chunk values.
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// Interpretation of each definition layer.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Chunks not yet fully drained, in order; the front is dropped once the
    /// current instruction refers to a later chunk.
    loaded_chunks: VecDeque<LoadedChunk>,
    /// Remaining per-chunk instructions, in order.
    instructions: VecDeque<ChunkInstructions>,
    /// Progress within the instruction at the front of `instructions`, carried
    /// across `drain` calls.
    offset_in_current_chunk: u64,
    /// Total number of rows this decoder reports via `num_rows`.
    num_rows: u64,
    /// Number of value buffers per chunk.
    num_buffers: u64,
    /// Dictionary to apply to decoded values, if any.
    dictionary: Option<Arc<DataBlock>>,
    /// Whether chunk headers use 4-byte buffer sizes.
    has_large_chunk: bool,
}
692
693impl StructuralPageDecoder for MiniBlockDecoder {
696 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
697 let mut items_desired = num_rows;
698 let mut need_preamble = false;
699 let mut skip_in_chunk = self.offset_in_current_chunk;
700 let mut drain_instructions = Vec::new();
701 while items_desired > 0 || need_preamble {
702 let (instructions, consumed) = self
703 .instructions
704 .front()
705 .unwrap()
706 .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
707
708 while self.loaded_chunks.front().unwrap().chunk_idx
709 != instructions.chunk_instructions.chunk_idx
710 {
711 self.loaded_chunks.pop_front();
712 }
713 drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
714 if consumed {
715 self.instructions.pop_front();
716 }
717 }
718 self.offset_in_current_chunk = skip_in_chunk;
721
722 let max_visible_level = self
723 .def_meaning
724 .iter()
725 .take_while(|l| !l.is_list())
726 .map(|l| l.num_def_levels())
727 .sum::<u16>();
728
729 Ok(Box::new(DecodeMiniBlockTask {
730 instructions: drain_instructions,
731 def_decompressor: self.def_decompressor.clone(),
732 rep_decompressor: self.rep_decompressor.clone(),
733 value_decompressor: self.value_decompressor.clone(),
734 dictionary_data: self.dictionary.clone(),
735 def_meaning: self.def_meaning.clone(),
736 num_buffers: self.num_buffers,
737 max_visible_level,
738 has_large_chunk: self.has_large_chunk,
739 }))
740 }
741
742 fn num_rows(&self) -> u64 {
743 self.num_rows
744 }
745}
746
/// Cacheable state of a `ComplexAllNullScheduler`: the decoded levels of the page.
#[derive(Debug)]
struct CachedComplexAllNullState {
    /// Decoded repetition levels, or `None` when the page's rep buffer is empty.
    rep: Option<ScalarBuffer<u16>>,
    /// Decoded definition levels, or `None` when the page's def buffer is empty.
    def: Option<ScalarBuffer<u16>>,
}
752
753impl DeepSizeOf for CachedComplexAllNullState {
754 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
755 self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
756 + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
757 }
758}
759
impl CachedPageData for CachedComplexAllNullState {
    // Upcasts to `Arc<dyn Any>` so that a holder of the type-erased cache entry
    // can downcast back to this concrete type (see `ComplexAllNullScheduler::load`).
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
765
/// Scheduler for pages that decode to a `DataBlock::AllNull` accompanied by
/// repetition and/or definition levels.
///
/// The levels are read once (in `initialize`) and then served from memory.
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    /// `(position, size)` of the page buffers: index 0 is the repetition buffer,
    /// index 1 the definition buffer. A size of 0 means the buffer is absent.
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// Interpretation of each definition layer.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Decoded levels; `None` until `initialize` or `load` has run.
    repdef: Option<Arc<CachedComplexAllNullState>>,
    /// Number of list layers in `def_meaning`; a level with this repetition value starts a row.
    max_rep: u16,
    /// Sum of the definition levels of the layers preceding the first list layer.
    max_visible_level: u16,
    /// Decompressor for the repetition buffer; when `None` the bytes are read as raw `u16`s.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for the definition buffer; when `None` the bytes are read as raw `u16`s.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Expected number of repetition levels after decompression.
    num_rep_values: u64,
    /// Expected number of definition levels after decompression.
    num_def_values: u64,
}
787
788impl ComplexAllNullScheduler {
789 pub fn new(
790 buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
791 def_meaning: Arc<[DefinitionInterpretation]>,
792 rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
793 def_decompressor: Option<Arc<dyn BlockDecompressor>>,
794 num_rep_values: u64,
795 num_def_values: u64,
796 ) -> Self {
797 let max_rep = def_meaning.iter().filter(|l| l.is_list()).count() as u16;
798 let max_visible_level = def_meaning
799 .iter()
800 .take_while(|l| !l.is_list())
801 .map(|l| l.num_def_levels())
802 .sum::<u16>();
803 Self {
804 buffer_offsets_and_sizes,
805 def_meaning,
806 repdef: None,
807 max_rep,
808 max_visible_level,
809 rep_decompressor,
810 def_decompressor,
811 num_rep_values,
812 num_def_values,
813 }
814 }
815}
816
817impl StructuralPageScheduler for ComplexAllNullScheduler {
818 fn initialize<'a>(
819 &'a mut self,
820 io: &Arc<dyn EncodingsIo>,
821 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
822 let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
824 let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
825 let has_rep = rep_size > 0;
826 let has_def = def_size > 0;
827
828 let mut reads = Vec::with_capacity(2);
829 if has_rep {
830 reads.push(rep_pos..rep_pos + rep_size);
831 }
832 if has_def {
833 reads.push(def_pos..def_pos + def_size);
834 }
835
836 let data = io.submit_request(reads, 0);
837 let rep_decompressor = self.rep_decompressor.clone();
838 let def_decompressor = self.def_decompressor.clone();
839 let num_rep_values = self.num_rep_values;
840 let num_def_values = self.num_def_values;
841
842 async move {
843 let data = data.await?;
844 let mut data_iter = data.into_iter();
845
846 let decompress_levels = |compressed_bytes: Bytes,
847 decompressor: &Arc<dyn BlockDecompressor>,
848 num_values: u64,
849 level_type: &str|
850 -> Result<ScalarBuffer<u16>> {
851 let compressed_buffer = LanceBuffer::from_bytes(compressed_bytes, 1);
852 let decompressed = decompressor.decompress(compressed_buffer, num_values)?;
853 match decompressed {
854 DataBlock::FixedWidth(block) => {
855 if block.num_values != num_values {
856 return Err(Error::invalid_input_source(format!(
857 "Unexpected {} level count after decompression: expected {}, got {}",
858 level_type, num_values, block.num_values
859 )
860 .into()));
861 }
862 if block.bits_per_value != 16 {
863 return Err(Error::invalid_input_source(format!(
864 "Unexpected {} level bit width after decompression: expected 16, got {}",
865 level_type, block.bits_per_value
866 )
867 .into()));
868 }
869 Ok(block.data.borrow_to_typed_slice::<u16>())
870 }
871 _ => Err(Error::invalid_input_source(format!(
872 "Expected fixed-width data block for {} levels",
873 level_type
874 )
875 .into())),
876 }
877 };
878
879 let rep = if has_rep {
880 let rep = data_iter.next().unwrap();
881 if let Some(rep_decompressor) = rep_decompressor.as_ref() {
882 Some(decompress_levels(
883 rep,
884 rep_decompressor,
885 num_rep_values,
886 "repetition",
887 )?)
888 } else {
889 let rep = LanceBuffer::from_bytes(rep, 2);
890 let rep = rep.borrow_to_typed_slice::<u16>();
891 Some(rep)
892 }
893 } else {
894 None
895 };
896
897 let def = if has_def {
898 let def = data_iter.next().unwrap();
899 if let Some(def_decompressor) = def_decompressor.as_ref() {
900 Some(decompress_levels(
901 def,
902 def_decompressor,
903 num_def_values,
904 "definition",
905 )?)
906 } else {
907 let def = LanceBuffer::from_bytes(def, 2);
908 let def = def.borrow_to_typed_slice::<u16>();
909 Some(def)
910 }
911 } else {
912 None
913 };
914
915 let repdef = Arc::new(CachedComplexAllNullState { rep, def });
916
917 self.repdef = Some(repdef.clone());
918
919 Ok(repdef as Arc<dyn CachedPageData>)
920 }
921 .boxed()
922 }
923
924 fn load(&mut self, data: &Arc<dyn CachedPageData>) {
925 self.repdef = Some(
926 data.clone()
927 .as_arc_any()
928 .downcast::<CachedComplexAllNullState>()
929 .unwrap(),
930 );
931 }
932
933 fn schedule_ranges(
934 &self,
935 ranges: &[Range<u64>],
936 _io: &Arc<dyn EncodingsIo>,
937 ) -> Result<Vec<PageLoadTask>> {
938 let ranges = VecDeque::from_iter(ranges.iter().cloned());
939 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
940 let decoder = Box::new(ComplexAllNullPageDecoder {
941 ranges,
942 rep: self.repdef.as_ref().unwrap().rep.clone(),
943 def: self.repdef.as_ref().unwrap().def.clone(),
944 num_rows,
945 def_meaning: self.def_meaning.clone(),
946 max_rep: self.max_rep,
947 max_visible_level: self.max_visible_level,
948 cursor_row: 0,
949 cursor_level: 0,
950 }) as Box<dyn StructuralPageDecoder>;
951 let page_load_task = PageLoadTask {
952 decoder_fut: std::future::ready(Ok(decoder)).boxed(),
953 num_rows,
954 };
955 Ok(vec![page_load_task])
956 }
957}
958
/// Decoder for a complex all-null page.
///
/// Walks the in-memory levels row by row with a forward-only cursor.
#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    /// Row ranges still to be drained, in order.
    ranges: VecDeque<Range<u64>>,
    /// Repetition levels of the whole page, if any.
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels of the whole page, if any.
    def: Option<ScalarBuffer<u16>>,
    /// Total number of rows across the scheduled ranges.
    num_rows: u64,
    /// Interpretation of each definition layer.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Repetition value that marks the start of a row.
    max_rep: u16,
    /// Definition levels above this are not counted as visible items.
    max_visible_level: u16,
    /// Index of the next row the cursor will take.
    cursor_row: u64,
    /// Index of the first level belonging to `cursor_row`.
    cursor_level: usize,
}
971
972impl ComplexAllNullPageDecoder {
973 fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
974 let mut rows_desired = num_rows;
975 let mut ranges = Vec::with_capacity(self.ranges.len());
976 while rows_desired > 0 {
977 let front = self.ranges.front_mut().unwrap();
978 let avail = front.end - front.start;
979 if avail > rows_desired {
980 ranges.push(front.start..front.start + rows_desired);
981 front.start += rows_desired;
982 rows_desired = 0;
983 } else {
984 ranges.push(self.ranges.pop_front().unwrap());
985 rows_desired -= avail;
986 }
987 }
988 ranges
989 }
990
991 fn take_row(&mut self) -> Result<(Range<usize>, u64)> {
992 let start = self.cursor_level;
993 let end = if let Some(rep) = &self.rep {
994 if start >= rep.len() {
995 return Err(Error::internal(
996 "Invalid complex all-null layout: repetition buffer too short",
997 ));
998 }
999 if rep[start] != self.max_rep {
1000 return Err(Error::internal(
1001 "Invalid complex all-null layout: row did not start at max repetition level",
1002 ));
1003 }
1004 let mut end = start + 1;
1005 while end < rep.len() && rep[end] != self.max_rep {
1006 end += 1;
1007 }
1008 end
1009 } else {
1010 start + 1
1011 };
1012
1013 let visible = if let Some(def) = &self.def {
1014 if end > def.len() {
1015 return Err(Error::internal(
1016 "Invalid complex all-null layout: definition buffer too short",
1017 ));
1018 }
1019 def[start..end]
1020 .iter()
1021 .filter(|d| **d <= self.max_visible_level)
1022 .count() as u64
1023 } else {
1024 (end - start) as u64
1025 };
1026
1027 self.cursor_level = end;
1028 self.cursor_row += 1;
1029 Ok((start..end, visible))
1030 }
1031
1032 fn skip_to_row(&mut self, target_row: u64) -> Result<()> {
1033 while self.cursor_row < target_row {
1034 self.take_row()?;
1035 }
1036 Ok(())
1037 }
1038}
1039
1040impl StructuralPageDecoder for ComplexAllNullPageDecoder {
1041 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1042 let drained_ranges = self.drain_ranges(num_rows);
1043 let mut level_slices: Vec<Range<usize>> = Vec::new();
1044 let mut visible_items_total = 0;
1045
1046 for range in drained_ranges {
1047 self.skip_to_row(range.start)?;
1048 for _ in range.start..range.end {
1049 let (level_range, visible) = self.take_row()?;
1050 visible_items_total += visible;
1051 if let Some(last) = level_slices.last_mut()
1052 && last.end == level_range.start
1053 {
1054 last.end = level_range.end;
1055 continue;
1056 }
1057 level_slices.push(level_range);
1058 }
1059 }
1060
1061 Ok(Box::new(DecodeComplexAllNullTask {
1062 level_slices,
1063 visible_items_total,
1064 rep: self.rep.clone(),
1065 def: self.def.clone(),
1066 def_meaning: self.def_meaning.clone(),
1067 max_visible_level: self.max_visible_level,
1068 }))
1069 }
1070
1071 fn num_rows(&self) -> u64 {
1072 self.num_rows
1073 }
1074}
1075
/// Decode task for a slice of a complex all-null page.
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    /// Level ranges (indices into `rep` / `def`) selected for this task, in order.
    level_slices: Vec<Range<usize>>,
    /// Number of visible items across the selected rows; used as the value
    /// count when there are no definition levels.
    visible_items_total: u64,
    /// Repetition levels of the whole page, if any.
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels of the whole page, if any.
    def: Option<ScalarBuffer<u16>>,
    /// Interpretation of each definition layer.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Definition levels above this are not counted as values.
    max_visible_level: u16,
}
1087
1088impl DecodeComplexAllNullTask {
1089 fn decode_level(&self, levels: &Option<ScalarBuffer<u16>>) -> Option<Vec<u16>> {
1090 levels.as_ref().map(|levels| {
1091 let num_levels = self
1092 .level_slices
1093 .iter()
1094 .map(|range| range.end - range.start)
1095 .sum();
1096 let mut referenced_levels = Vec::with_capacity(num_levels);
1097 for range in &self.level_slices {
1098 referenced_levels.extend(levels[range.start..range.end].iter().copied());
1099 }
1100 referenced_levels
1101 })
1102 }
1103}
1104
1105impl DecodePageTask for DecodeComplexAllNullTask {
1106 fn decode(self: Box<Self>) -> Result<DecodedPage> {
1107 let rep = self.decode_level(&self.rep);
1108 let def = self.decode_level(&self.def);
1109
1110 let num_values = if let Some(def) = &def {
1114 def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
1115 } else {
1116 self.visible_items_total
1117 };
1118
1119 let data = DataBlock::AllNull(AllNullDataBlock { num_values });
1120 let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
1121 Ok(DecodedPage {
1122 data,
1123 repdef: unraveler,
1124 })
1125 }
1126}
1127
/// Scheduler for all-null pages that need no stored data: it performs no I/O
/// and caches nothing, and its decoder synthesizes the levels.
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}
1134
1135impl StructuralPageScheduler for SimpleAllNullScheduler {
1136 fn initialize<'a>(
1137 &'a mut self,
1138 _io: &Arc<dyn EncodingsIo>,
1139 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1140 std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
1141 }
1142
1143 fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
1144
1145 fn schedule_ranges(
1146 &self,
1147 ranges: &[Range<u64>],
1148 _io: &Arc<dyn EncodingsIo>,
1149 ) -> Result<Vec<PageLoadTask>> {
1150 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1151 let decoder =
1152 Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
1153 let page_load_task = PageLoadTask {
1154 decoder_fut: std::future::ready(Ok(decoder)).boxed(),
1155 num_rows,
1156 };
1157 Ok(vec![page_load_task])
1158 }
1159}
1160
/// Decode task that produces `num_values` null values.
#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    /// Number of null values to produce.
    num_values: u64,
}
1167impl DecodePageTask for SimpleAllNullDecodePageTask {
1168 fn decode(self: Box<Self>) -> Result<DecodedPage> {
1169 let unraveler = RepDefUnraveler::new(
1170 None,
1171 Some(vec![1; self.num_values as usize]),
1172 Arc::new([DefinitionInterpretation::NullableItem]),
1173 self.num_values,
1174 );
1175 Ok(DecodedPage {
1176 data: DataBlock::AllNull(AllNullDataBlock {
1177 num_values: self.num_values,
1178 }),
1179 repdef: unraveler,
1180 })
1181 }
1182}
1183
/// Decoder for simple all-null pages; it keeps no cursor because every drained
/// row is an identical null.
#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    /// Total number of rows scheduled for this decoder.
    num_rows: u64,
}
1188
1189impl StructuralPageDecoder for SimpleAllNullPageDecoder {
1190 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1191 Ok(Box::new(SimpleAllNullDecodePageTask {
1192 num_values: num_rows,
1193 }))
1194 }
1195
1196 fn num_rows(&self) -> u64 {
1197 self.num_rows
1198 }
1199}
1200
/// Describes the dictionary attached to a mini-block page.
///
/// NOTE(review): the fields are not read in the part of the file reviewed here;
/// the descriptions follow the field names and types — confirm at the use sites.
#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    /// Decompressor for the dictionary buffer.
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    /// `(position, size)` of the dictionary buffer.
    dictionary_buf_position_and_size: (u64, u64),
    /// Alignment of the dictionary data.
    dictionary_data_alignment: u64,
    /// Number of items in the dictionary.
    num_dictionary_items: u64,
}
1209
/// Repetition-index entry for one mini-block chunk.
#[derive(Debug)]
struct MiniBlockRepIndexBlock {
    /// Running total of `starts_including_trailer` over all preceding chunks.
    first_row: u64,
    /// Number of rows that start in this chunk, counting a trailing partial
    /// row; computed in `decode_from_bytes` as
    /// `ends + has_trailer - has_preamble`.
    starts_including_trailer: u64,
    /// True when the previous chunk ended with a trailer, i.e. this chunk
    /// begins with the continuation of a row.
    has_preamble: bool,
    /// True when the chunk's partial count is non-zero, i.e. its last row
    /// continues into the next chunk.
    has_trailer: bool,
}
1225
impl DeepSizeOf for MiniBlockRepIndexBlock {
    // The struct consists solely of inline `u64` / `bool` fields, so it owns no
    // additional storage to report.
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}
1231
/// Repetition index of a mini-block page: one entry per chunk, in chunk order.
#[derive(Debug)]
struct MiniBlockRepIndex {
    /// Per-chunk entries; `first_row` values are cumulative across the vector.
    blocks: Vec<MiniBlockRepIndexBlock>,
}
1240
impl DeepSizeOf for MiniBlockRepIndex {
    // The only owned storage is the `blocks` vector, so delegate to it.
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}
1246
1247impl MiniBlockRepIndex {
1248 pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
1253 let mut blocks = Vec::with_capacity(chunks.len());
1254 let mut offset: u64 = 0;
1255
1256 for c in chunks {
1257 blocks.push(MiniBlockRepIndexBlock {
1258 first_row: offset,
1259 starts_including_trailer: c.num_values,
1260 has_preamble: false,
1261 has_trailer: false,
1262 });
1263
1264 offset += c.num_values;
1265 }
1266
1267 Self { blocks }
1268 }
1269
1270 pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
1276 let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
1278 let u64_slice = buffer.borrow_to_typed_slice::<u64>();
1279 let n = u64_slice.len() / stride;
1280
1281 let mut blocks = Vec::with_capacity(n);
1282 let mut chunk_has_preamble = false;
1283 let mut offset: u64 = 0;
1284
1285 for i in 0..n {
1287 let base_idx = i * stride;
1288 let ends = u64_slice[base_idx];
1289 let partial = u64_slice[base_idx + 1];
1290
1291 let has_trailer = partial > 0;
1292 let starts_including_trailer =
1294 ends + (has_trailer as u64) - (chunk_has_preamble as u64);
1295
1296 blocks.push(MiniBlockRepIndexBlock {
1297 first_row: offset,
1298 starts_including_trailer,
1299 has_preamble: chunk_has_preamble,
1300 has_trailer,
1301 });
1302
1303 chunk_has_preamble = has_trailer;
1304 offset += starts_including_trailer;
1305 }
1306
1307 Self { blocks }
1308 }
1309}
1310
/// Page-level state of a mini-block page that is built once by
/// `MiniBlockScheduler::initialize` and can be shared through the page cache.
#[derive(Debug)]
struct MiniBlockCacheableState {
    /// Size, value count and byte offset of every chunk, in chunk order.
    chunk_meta: Vec<ChunkMeta>,
    /// Repetition index used to map row ranges onto chunks.
    rep_index: MiniBlockRepIndex,
    /// Decompressed dictionary, present only when the page has one.
    dictionary: Option<Arc<DataBlock>>,
}
1321
1322impl DeepSizeOf for MiniBlockCacheableState {
1323 fn deep_size_of_children(&self, context: &mut Context) -> usize {
1324 self.rep_index.deep_size_of_children(context)
1325 + self
1326 .dictionary
1327 .as_ref()
1328 .map(|dict| dict.data_size() as usize)
1329 .unwrap_or(0)
1330 }
1331}
1332
1333impl CachedPageData for MiniBlockCacheableState {
1334 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
1335 self
1336 }
1337}
1338
/// Scheduler for a page stored with the mini-block structural layout.
///
/// `initialize` or `load` must populate `page_meta` before `schedule_ranges`
/// is called; scheduling unwraps it.
#[derive(Debug)]
pub struct MiniBlockScheduler {
    /// `(position, size)` of each page buffer. Index 0 is the chunk metadata,
    /// index 1 the chunk data, index 2 the dictionary (when present) and the
    /// last entry the repetition index (when `repetition_index_depth > 0`).
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    /// Priority passed to the I/O layer when chunk data is requested.
    priority: u64,
    /// Total number of items in the page; used to size the final chunk.
    items_in_page: u64,
    /// Taken from the layout. A value above zero means a repetition index
    /// buffer is read, with `repetition_index_depth + 1` values per chunk.
    repetition_index_depth: u16,
    /// Taken from the layout and forwarded to the decoder.
    num_buffers: u64,
    /// Decompressor for repetition levels, when the layout declares one.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels, when the layout declares one.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for the chunk values.
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// Interpretation of each definition layer, converted from the layout's `layers`.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Dictionary description, present only when the layout has a dictionary.
    dictionary: Option<MiniBlockSchedulerDictionary>,
    /// Cached page state; `None` until `initialize` or `load` has run.
    page_meta: Option<Arc<MiniBlockCacheableState>>,
    /// When true the chunk metadata words are 32 bits wide instead of 16.
    has_large_chunk: bool,
}
1382
1383impl MiniBlockScheduler {
1384 fn try_new(
1385 buffer_offsets_and_sizes: &[(u64, u64)],
1386 priority: u64,
1387 items_in_page: u64,
1388 layout: &pb21::MiniBlockLayout,
1389 decompressors: &dyn DecompressionStrategy,
1390 ) -> Result<Self> {
1391 let rep_decompressor = layout
1392 .rep_compression
1393 .as_ref()
1394 .map(|rep_compression| {
1395 decompressors
1396 .create_block_decompressor(rep_compression)
1397 .map(Arc::from)
1398 })
1399 .transpose()?;
1400 let def_decompressor = layout
1401 .def_compression
1402 .as_ref()
1403 .map(|def_compression| {
1404 decompressors
1405 .create_block_decompressor(def_compression)
1406 .map(Arc::from)
1407 })
1408 .transpose()?;
1409 let def_meaning = layout
1410 .layers
1411 .iter()
1412 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1413 .collect::<Vec<_>>();
1414 let value_decompressor = decompressors.create_miniblock_decompressor(
1415 layout.value_compression.as_ref().unwrap(),
1416 decompressors,
1417 )?;
1418
1419 let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1420 let num_dictionary_items = layout.num_dictionary_items;
1421 let dictionary_decompressor = decompressors
1422 .create_block_decompressor(dictionary_encoding)?
1423 .into();
1424 let dictionary_data_alignment = match dictionary_encoding.compression.as_ref().unwrap()
1425 {
1426 Compression::Variable(_) => 4,
1427 Compression::Flat(_) => 16,
1428 Compression::General(_) => 1,
1429 Compression::InlineBitpacking(_) | Compression::OutOfLineBitpacking(_) => {
1430 crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
1431 }
1432 _ => {
1433 return Err(Error::invalid_input_source(
1434 format!(
1435 "Unsupported mini-block dictionary encoding: {:?}",
1436 dictionary_encoding.compression.as_ref().unwrap()
1437 )
1438 .into(),
1439 ));
1440 }
1441 };
1442 Some(MiniBlockSchedulerDictionary {
1443 dictionary_decompressor,
1444 dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1445 dictionary_data_alignment,
1446 num_dictionary_items,
1447 })
1448 } else {
1449 None
1450 };
1451
1452 Ok(Self {
1453 buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1454 rep_decompressor,
1455 def_decompressor,
1456 value_decompressor: value_decompressor.into(),
1457 repetition_index_depth: layout.repetition_index_depth as u16,
1458 num_buffers: layout.num_buffers,
1459 priority,
1460 items_in_page,
1461 dictionary,
1462 def_meaning: def_meaning.into(),
1463 page_meta: None,
1464 has_large_chunk: layout.has_large_chunk,
1465 })
1466 }
1467
1468 fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1469 let page_meta = self.page_meta.as_ref().unwrap();
1470 chunk_indices
1471 .iter()
1472 .map(|&chunk_idx| {
1473 let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1474 let bytes_start = chunk_meta.offset_bytes;
1475 let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1476 LoadedChunk {
1477 byte_range: bytes_start..bytes_end,
1478 items_in_chunk: chunk_meta.num_values,
1479 chunk_idx,
1480 data: LanceBuffer::empty(),
1481 }
1482 })
1483 .collect()
1484 }
1485}
1486
/// What to do with the preamble of a chunk, i.e. the leading items that belong
/// to a row started in the previous chunk.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    /// The chunk has a preamble and it is wanted.
    Take,
    /// The chunk has a preamble but it is not wanted.
    Skip,
    /// The chunk has no preamble.
    Absent,
}
1493
/// Scheduling-time instructions describing which rows to read from one chunk.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    /// Index of the chunk within the page (also its index in the repetition index).
    chunk_idx: usize,
    /// How the chunk's preamble should be treated.
    preamble: PreambleAction,
    /// Rows to skip at the start of the chunk before taking any.
    rows_to_skip: u64,
    /// Rows to take after the skipped rows; may be zero when only the preamble is wanted.
    rows_to_take: u64,
    /// Set when the take reaches the end of the chunk and the chunk has a trailer,
    /// meaning the row continues into the next chunk.
    take_trailer: bool,
}
1553
/// Decode-time instructions for one drain step over a scheduled chunk.
///
/// Produced by `ChunkInstructions::drain_from_instruction`; one scheduled
/// instruction may be drained in several steps.
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    /// Copy of the scheduling instruction being drained.
    chunk_instructions: ChunkInstructions,
    /// Rows of the instruction already consumed by earlier drain steps.
    rows_to_skip: u64,
    /// Rows consumed by this drain step.
    rows_to_take: u64,
    /// How the chunk's preamble is treated in this drain step.
    preamble_action: PreambleAction,
}
1578
1579impl ChunkInstructions {
1580 fn schedule_instructions(
1586 rep_index: &MiniBlockRepIndex,
1587 user_ranges: &[Range<u64>],
1588 ) -> Vec<Self> {
1589 let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1593
1594 for user_range in user_ranges {
1595 let mut rows_needed = user_range.end - user_range.start;
1596 let mut need_preamble = false;
1597
1598 let mut block_index = match rep_index
1601 .blocks
1602 .binary_search_by_key(&user_range.start, |block| block.first_row)
1603 {
1604 Ok(idx) => {
1605 let mut idx = idx;
1608 while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1609 idx -= 1;
1610 }
1611 idx
1612 }
1613 Err(idx) => idx - 1,
1615 };
1616
1617 let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1618
1619 while rows_needed > 0 || need_preamble {
1620 if block_index >= rep_index.blocks.len() {
1622 log::warn!(
1623 "schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early"
1624 );
1625 break;
1626 }
1627
1628 let chunk = &rep_index.blocks[block_index];
1629 let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);
1630
1631 if rows_avail == 0 && to_skip == 0 {
1635 if chunk.has_preamble && need_preamble {
1637 chunk_instructions.push(Self {
1638 chunk_idx: block_index,
1639 preamble: PreambleAction::Take,
1640 rows_to_skip: 0,
1641 rows_to_take: 0,
1642 take_trailer: chunk.has_trailer,
1646 });
1647 if chunk.starts_including_trailer > 0
1651 || block_index == rep_index.blocks.len() - 1
1652 {
1653 need_preamble = false;
1654 }
1655 }
1656 block_index += 1;
1658 continue;
1659 }
1660
1661 if rows_avail == 0 && to_skip > 0 {
1665 to_skip -= chunk.starts_including_trailer;
1668 block_index += 1;
1669 continue;
1670 }
1671
1672 let rows_to_take = rows_avail.min(rows_needed);
1673 rows_needed -= rows_to_take;
1674
1675 let mut take_trailer = false;
1676 let preamble = if chunk.has_preamble {
1677 if need_preamble {
1678 PreambleAction::Take
1679 } else {
1680 PreambleAction::Skip
1681 }
1682 } else {
1683 PreambleAction::Absent
1684 };
1685
1686 if rows_to_take == rows_avail && chunk.has_trailer {
1688 take_trailer = true;
1689 need_preamble = true;
1690 } else {
1691 need_preamble = false;
1692 };
1693
1694 chunk_instructions.push(Self {
1695 preamble,
1696 chunk_idx: block_index,
1697 rows_to_skip: to_skip,
1698 rows_to_take,
1699 take_trailer,
1700 });
1701
1702 to_skip = 0;
1703 block_index += 1;
1704 }
1705 }
1706
1707 if user_ranges.len() > 1 {
1711 let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1713 let mut instructions_iter = chunk_instructions.into_iter();
1714 merged_instructions.push(instructions_iter.next().unwrap());
1715 for instruction in instructions_iter {
1716 let last = merged_instructions.last_mut().unwrap();
1717 if last.chunk_idx == instruction.chunk_idx
1718 && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1719 {
1720 last.rows_to_take += instruction.rows_to_take;
1721 last.take_trailer |= instruction.take_trailer;
1722 } else {
1723 merged_instructions.push(instruction);
1724 }
1725 }
1726 merged_instructions
1727 } else {
1728 chunk_instructions
1729 }
1730 }
1731
1732 fn drain_from_instruction(
1733 &self,
1734 rows_desired: &mut u64,
1735 need_preamble: &mut bool,
1736 skip_in_chunk: &mut u64,
1737 ) -> (ChunkDrainInstructions, bool) {
1738 debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1740 let rows_avail = self.rows_to_take - *skip_in_chunk;
1741 let has_preamble = self.preamble != PreambleAction::Absent;
1742 let preamble_action = match (*need_preamble, has_preamble) {
1743 (true, true) => PreambleAction::Take,
1744 (true, false) => panic!("Need preamble but there isn't one"),
1745 (false, true) => PreambleAction::Skip,
1746 (false, false) => PreambleAction::Absent,
1747 };
1748
1749 let rows_taking = if *rows_desired >= rows_avail {
1752 *need_preamble = self.take_trailer;
1760 rows_avail
1761 } else {
1762 *need_preamble = false;
1765 *rows_desired
1766 };
1767 let rows_skipped = *skip_in_chunk;
1768
1769 let consumed_chunk = if *rows_desired >= rows_avail {
1771 *rows_desired -= rows_avail;
1772 *skip_in_chunk = 0;
1773 true
1774 } else {
1775 *skip_in_chunk += *rows_desired;
1776 *rows_desired = 0;
1777 false
1778 };
1779
1780 (
1781 ChunkDrainInstructions {
1782 chunk_instructions: self.clone(),
1783 rows_to_skip: rows_skipped,
1784 rows_to_take: rows_taking,
1785 preamble_action,
1786 },
1787 consumed_chunk,
1788 )
1789 }
1790}
1791
/// Chunk metadata words of a mini-block page, stored at one of two widths.
enum Words {
    /// 16-bit words.
    U16(ScalarBuffer<u16>),
    /// 32-bit words, used when the page is flagged as having large chunks.
    U32(ScalarBuffer<u32>),
}
1796
/// Iterator over `Words` that yields every word widened to `u32`.
struct WordsIter<'a> {
    /// Boxed so that both word widths can share one iterator type.
    iter: Box<dyn Iterator<Item = u32> + 'a>,
}
1800
1801impl Words {
1802 pub fn len(&self) -> usize {
1803 match self {
1804 Self::U16(b) => b.len(),
1805 Self::U32(b) => b.len(),
1806 }
1807 }
1808
1809 pub fn iter(&self) -> WordsIter<'_> {
1810 match self {
1811 Self::U16(buf) => WordsIter {
1812 iter: Box::new(buf.iter().map(|&x| x as u32)),
1813 },
1814 Self::U32(buf) => WordsIter {
1815 iter: Box::new(buf.iter().copied()),
1816 },
1817 }
1818 }
1819
1820 pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result<Self> {
1821 let bytes_per_value = if has_large_chunk { 4 } else { 2 };
1822 assert_eq!(bytes.len() % bytes_per_value, 0);
1823 let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64);
1824 if has_large_chunk {
1825 Ok(Self::U32(buffer.borrow_to_typed_slice::<u32>()))
1826 } else {
1827 Ok(Self::U16(buffer.borrow_to_typed_slice::<u16>()))
1828 }
1829 }
1830}
1831
1832impl<'a> Iterator for WordsIter<'a> {
1833 type Item = u32;
1834
1835 fn next(&mut self) -> Option<Self::Item> {
1836 self.iter.next()
1837 }
1838}
1839
impl StructuralPageScheduler for MiniBlockScheduler {
    /// Loads the page-level metadata: the chunk metadata words, the dictionary
    /// (if the page has one) and the repetition index (if
    /// `repetition_index_depth > 0`).
    ///
    /// The decoded state is stored in `self.page_meta` and also returned so it
    /// can be cached by the caller.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // Buffer 0 holds the chunk metadata, buffer 1 the chunk data itself.
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        // The request order (metadata, dictionary, repetition index) must match
        // the order in which the results are consumed below.
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            // The repetition index is the last buffer of the page.
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        let io_req = io.submit_request(required_ranges, 0);

        async move {
            // Fused so that `next()` keeps returning `None` once the results run out.
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();

            let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?;
            let mut chunk_meta = Vec::with_capacity(words.len());

            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
            for (word_idx, word) in words.iter().enumerate() {
                // Low 4 bits: log2 of the number of values in the chunk.
                // Remaining bits: chunk size in units of MINIBLOCK_ALIGNMENT, minus one.
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                let num_values = if word_idx < words.len() - 1 {
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    // The last chunk holds whatever remains of the page.
                    debug_assert!(
                        log_num_values == 0
                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
                    );
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;

                chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                offset_bytes += num_bytes as u64;
            }

            // Without a stored repetition index every chunk is treated as
            // holding exactly `num_values` whole rows.
            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
                assert!(rep_index_data.len() % 8 == 0);
                let stride = self.repetition_index_depth as usize + 1;
                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
            } else {
                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
            };

            let mut page_meta = MiniBlockCacheableState {
                chunk_meta,
                rep_index,
                dictionary: None,
            };

            if let Some(ref mut dictionary) = self.dictionary {
                // The dictionary is decompressed once here and kept with the page state.
                let dictionary_data = dictionary_bytes.unwrap();
                page_meta.dictionary =
                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                        dictionary.num_dictionary_items,
                    )?));
            };
            let page_meta = Arc::new(page_meta);
            self.page_meta = Some(page_meta.clone());
            Ok(page_meta as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    /// Restores page metadata from previously cached state.
    ///
    /// Panics if `data` is not a `MiniBlockCacheableState`.
    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.page_meta = Some(
            data.clone()
                .as_arc_any()
                .downcast::<MiniBlockCacheableState>()
                .unwrap(),
        );
    }

    /// Schedules the I/O needed to read `ranges` and returns a single load task
    /// whose future resolves to a `MiniBlockDecoder`.
    ///
    /// Panics if the page metadata has not been initialized or loaded.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let page_meta = self.page_meta.as_ref().unwrap();

        let chunk_instructions =
            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);

        debug_assert_eq!(
            num_rows,
            chunk_instructions
                .iter()
                .map(|ci| ci.rows_to_take)
                .sum::<u64>()
        );

        // Several instructions may refer to the same chunk; load each chunk once.
        let chunks_needed = chunk_instructions
            .iter()
            .map(|ci| ci.chunk_idx)
            .unique()
            .collect::<Vec<_>>();

        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
        let chunk_ranges = loaded_chunks
            .iter()
            .map(|c| c.byte_range.clone())
            .collect::<Vec<_>>();
        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);

        // Clone everything the future needs so it does not borrow `self`.
        let rep_decompressor = self.rep_decompressor.clone();
        let def_decompressor = self.def_decompressor.clone();
        let value_decompressor = self.value_decompressor.clone();
        let num_buffers = self.num_buffers;
        let has_large_chunk = self.has_large_chunk;
        let dictionary = page_meta
            .dictionary
            .as_ref()
            .map(|dictionary| dictionary.clone());
        let def_meaning = self.def_meaning.clone();

        let res = async move {
            let loaded_chunk_data = loaded_chunk_data.await?;
            // Results come back in request order, matching `loaded_chunks`.
            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
            }

            Ok(Box::new(MiniBlockDecoder {
                rep_decompressor,
                def_decompressor,
                value_decompressor,
                def_meaning,
                loaded_chunks: VecDeque::from_iter(loaded_chunks),
                instructions: VecDeque::from(chunk_instructions),
                offset_in_current_chunk: 0,
                dictionary,
                num_rows,
                num_buffers,
                has_large_chunk,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: res,
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}
2024
/// Location and entry width of a full-zip page's repetition index buffer.
#[derive(Debug, Clone, Copy)]
struct FullZipRepIndexDetails {
    /// Position of the first byte of the repetition index buffer.
    buf_position: u64,
    /// Width of one index entry in bytes, computed as the buffer size divided
    /// by `rows_in_page + 1`.
    bytes_per_value: u64,
}
2030
/// Value decompressor of a full-zip page, chosen from the layout's `details` field.
#[derive(Debug)]
enum PerValueDecompressor {
    /// Used when the layout specifies bits per value (fixed-width values).
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    /// Used when the layout specifies bits per offset (variable-width values).
    Variable(Arc<dyn VariablePerValueDecompressor>),
}
2036
/// Decode parameters shared (through an `Arc`) by the full-zip scheduler and
/// the decoders it creates.
#[derive(Debug)]
struct FullZipDecodeDetails {
    /// Decompressor for the values of the page.
    value_decompressor: PerValueDecompressor,
    /// Interpretation of each definition layer, converted from the layout's `layers`.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Parser for control words, built from the layout's `bits_rep` and `bits_def`.
    ctrl_word_parser: ControlWordParser,
    /// Number of list layers in `def_meaning`.
    max_rep: u16,
    /// Sum of `num_def_levels()` over the non-list layers of `def_meaning`.
    max_visible_def: u16,
}
2045
/// Where the bytes of a full-zip page are read from.
#[derive(Debug, Clone)]
enum FullZipReadSource {
    /// Ranges are requested from the I/O layer.
    Remote(Arc<dyn EncodingsIo>),
    /// The page is already in memory. `data` begins at absolute position
    /// `base_offset`, and requested ranges are sliced out of it.
    PrefetchedPage { base_offset: u64, data: LanceBuffer },
}
2064
2065impl FullZipReadSource {
2066 fn fetch(
2070 &self,
2071 ranges: &[Range<u64>],
2072 priority: u64,
2073 ) -> BoxFuture<'static, Result<VecDeque<LanceBuffer>>> {
2074 match self {
2075 Self::Remote(io) => {
2076 let io = io.clone();
2077 let ranges = ranges.to_vec();
2078 async move {
2079 let data = io.submit_request(ranges, priority).await?;
2080 Ok(data
2081 .into_iter()
2082 .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2083 .collect::<VecDeque<_>>())
2084 }
2085 .boxed()
2086 }
2087 Self::PrefetchedPage { base_offset, data } => {
2088 let base_offset = *base_offset;
2089 let data = data.clone();
2090 let page_end = base_offset + data.len() as u64;
2091 std::future::ready(
2092 ranges
2093 .iter()
2094 .map(|range| {
2095 if range.start > range.end
2096 || range.start < base_offset
2097 || range.end > page_end
2098 {
2099 return Err(Error::internal(format!(
2100 "Requested range {:?} is outside page range {}..{}",
2101 range, base_offset, page_end
2102 )));
2103 }
2104 let start = (range.start - base_offset) as usize;
2105 let len = (range.end - range.start) as usize;
2106 Ok(data.slice_with_length(start, len))
2107 })
2108 .collect::<Result<VecDeque<_>>>(),
2109 )
2110 .boxed()
2111 }
2112 }
2113 }
2114}
2115
/// Scheduler for a page stored with the full-zip structural layout.
#[derive(Debug)]
pub struct FullZipScheduler {
    /// Position of the first byte of the data buffer (buffer 0 of the page).
    data_buf_position: u64,
    /// Size of the data buffer in bytes.
    data_buf_size: u64,
    /// Repetition index details; present only when the page has a second buffer.
    rep_index: Option<FullZipRepIndexDetails>,
    /// Priority passed to the I/O layer for data reads.
    priority: u64,
    /// Number of rows in the page.
    rows_in_page: u64,
    /// Offset width in bits; 32 when the layout specifies bits per value.
    bits_per_offset: u8,
    /// Decode parameters shared with the decoders.
    details: Arc<FullZipDecodeDetails>,
    /// Cached repetition index, set by `initialize` (when caching is enabled) or `load`.
    cached_state: Option<Arc<FullZipCacheableState>>,
    /// When true, `initialize` reads the whole repetition index so it can be
    /// cached. `try_new` sets this to false.
    enable_cache: bool,
}
2137
2138impl FullZipScheduler {
2139 fn try_new(
2140 buffer_offsets_and_sizes: &[(u64, u64)],
2141 priority: u64,
2142 rows_in_page: u64,
2143 layout: &pb21::FullZipLayout,
2144 decompressors: &dyn DecompressionStrategy,
2145 ) -> Result<Self> {
2146 let (data_buf_position, data_buf_size) = buffer_offsets_and_sizes[0];
2147 let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
2148 let num_reps = rows_in_page + 1;
2149 let bytes_per_rep = len / num_reps;
2150 debug_assert_eq!(len % num_reps, 0);
2151 debug_assert!(
2152 bytes_per_rep == 1
2153 || bytes_per_rep == 2
2154 || bytes_per_rep == 4
2155 || bytes_per_rep == 8
2156 );
2157 FullZipRepIndexDetails {
2158 buf_position: *pos,
2159 bytes_per_value: bytes_per_rep,
2160 }
2161 });
2162
2163 let value_decompressor = match layout.details {
2164 Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
2165 let decompressor = decompressors.create_fixed_per_value_decompressor(
2166 layout.value_compression.as_ref().unwrap(),
2167 )?;
2168 PerValueDecompressor::Fixed(decompressor.into())
2169 }
2170 Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
2171 let decompressor = decompressors.create_variable_per_value_decompressor(
2172 layout.value_compression.as_ref().unwrap(),
2173 )?;
2174 PerValueDecompressor::Variable(decompressor.into())
2175 }
2176 None => {
2177 panic!("Full-zip layout must have a `details` field");
2178 }
2179 };
2180 let ctrl_word_parser = ControlWordParser::new(
2181 layout.bits_rep.try_into().unwrap(),
2182 layout.bits_def.try_into().unwrap(),
2183 );
2184 let def_meaning = layout
2185 .layers
2186 .iter()
2187 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
2188 .collect::<Vec<_>>();
2189
2190 let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
2191 let max_visible_def = def_meaning
2192 .iter()
2193 .filter(|d| !d.is_list())
2194 .map(|d| d.num_def_levels())
2195 .sum();
2196
2197 let bits_per_offset = match layout.details {
2198 Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
2199 Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
2200 bits_per_offset as u8
2201 }
2202 None => panic!("Full-zip layout must have a `details` field"),
2203 };
2204
2205 let details = Arc::new(FullZipDecodeDetails {
2206 value_decompressor,
2207 def_meaning: def_meaning.into(),
2208 ctrl_word_parser,
2209 max_rep,
2210 max_visible_def,
2211 });
2212 Ok(Self {
2213 data_buf_position,
2214 data_buf_size,
2215 rep_index,
2216 details,
2217 priority,
2218 rows_in_page,
2219 bits_per_offset,
2220 cached_state: None,
2221 enable_cache: false,
2222 })
2223 }
2224
2225 fn covers_entire_page(ranges: &[Range<u64>], rows_in_page: u64) -> bool {
2226 if ranges.is_empty() {
2227 return false;
2228 }
2229 let mut expected_start = 0;
2230 for range in ranges {
2231 if range.start != expected_start || range.end > rows_in_page || range.end < range.start
2232 {
2233 return false;
2234 }
2235 expected_start = range.end;
2236 }
2237 expected_start == rows_in_page
2238 }
2239
2240 fn create_page_load_task(
2241 io_future: BoxFuture<'static, Result<Vec<Bytes>>>,
2242 num_rows: u64,
2243 details: Arc<FullZipDecodeDetails>,
2244 bits_per_offset: u8,
2245 ) -> PageLoadTask {
2246 let load_task = async move {
2247 let buffers = io_future.await?;
2248 let data = buffers
2249 .into_iter()
2250 .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2251 .collect::<VecDeque<_>>();
2252 Self::create_decoder(details, data, num_rows, bits_per_offset)
2253 }
2254 .boxed();
2255 PageLoadTask {
2256 decoder_fut: load_task,
2257 num_rows,
2258 }
2259 }
2260
2261 fn create_decoder(
2263 details: Arc<FullZipDecodeDetails>,
2264 data: VecDeque<LanceBuffer>,
2265 num_rows: u64,
2266 bits_per_offset: u8,
2267 ) -> Result<Box<dyn StructuralPageDecoder>> {
2268 match &details.value_decompressor {
2269 PerValueDecompressor::Fixed(decompressor) => {
2270 let bits_per_value = decompressor.bits_per_value();
2271 if bits_per_value % 8 != 0 {
2272 return Err(lance_core::Error::not_supported_source("Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into()));
2273 }
2274 let bytes_per_value = bits_per_value / 8;
2275 let total_bytes_per_value =
2276 bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
2277 if total_bytes_per_value == 0 {
2278 return Err(lance_core::Error::internal(
2279 "Invalid encoding: per-row byte width must be greater than 0",
2280 ));
2281 }
2282 Ok(Box::new(FixedFullZipDecoder {
2283 details,
2284 data,
2285 num_rows,
2286 offset_in_current: 0,
2287 bytes_per_value: bytes_per_value as usize,
2288 total_bytes_per_value,
2289 }) as Box<dyn StructuralPageDecoder>)
2290 }
2291 PerValueDecompressor::Variable(_decompressor) => {
2292 Ok(Box::new(VariableFullZipDecoder::new(
2293 details,
2294 data,
2295 num_rows,
2296 bits_per_offset,
2297 bits_per_offset,
2298 )))
2299 }
2300 }
2301 }
2302
2303 fn extract_byte_ranges_from_pairs(
2306 buffer: LanceBuffer,
2307 bytes_per_value: u64,
2308 data_buf_position: u64,
2309 ) -> Vec<Range<u64>> {
2310 ByteUnpacker::new(buffer, bytes_per_value as usize)
2311 .chunks(2)
2312 .into_iter()
2313 .map(|mut c| {
2314 let start = c.next().unwrap() + data_buf_position;
2315 let end = c.next().unwrap() + data_buf_position;
2316 start..end
2317 })
2318 .collect::<Vec<_>>()
2319 }
2320
2321 fn extract_byte_ranges_from_cached(
2324 buffer: &LanceBuffer,
2325 ranges: &[Range<u64>],
2326 bytes_per_value: u64,
2327 data_buf_position: u64,
2328 ) -> Vec<Range<u64>> {
2329 ranges
2330 .iter()
2331 .map(|r| {
2332 let start_offset = (r.start * bytes_per_value) as usize;
2333 let end_offset = (r.end * bytes_per_value) as usize;
2334
2335 let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
2336 let start_val =
2337 ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
2338 .next()
2339 .unwrap();
2340
2341 let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
2342 let end_val =
2343 ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
2344 .next()
2345 .unwrap();
2346
2347 (data_buf_position + start_val)..(data_buf_position + end_val)
2348 })
2349 .collect()
2350 }
2351
2352 fn compute_rep_index_ranges(
2354 ranges: &[Range<u64>],
2355 rep_index: &FullZipRepIndexDetails,
2356 ) -> Vec<Range<u64>> {
2357 ranges
2358 .iter()
2359 .flat_map(|r| {
2360 let first_val_start =
2361 rep_index.buf_position + (r.start * rep_index.bytes_per_value);
2362 let first_val_end = first_val_start + rep_index.bytes_per_value;
2363 let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
2364 let last_val_end = last_val_start + rep_index.bytes_per_value;
2365 [first_val_start..first_val_end, last_val_start..last_val_end]
2366 })
2367 .collect()
2368 }
2369
2370 fn schedule_ranges_rep(
2372 &self,
2373 ranges: &[Range<u64>],
2374 io: &Arc<dyn EncodingsIo>,
2375 rep_index: FullZipRepIndexDetails,
2376 ) -> Result<Vec<PageLoadTask>> {
2377 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2378 let data_buf_position = self.data_buf_position;
2379 let priority = self.priority;
2380 let details = self.details.clone();
2381 let bits_per_offset = self.bits_per_offset;
2382
2383 if Self::covers_entire_page(ranges, self.rows_in_page) {
2384 let full_range = self.data_buf_position..(self.data_buf_position + self.data_buf_size);
2385 let page_data = io.submit_single(full_range.clone(), priority);
2386 let load_task = async move {
2387 let page_data = page_data.await?;
2388 let source = FullZipReadSource::PrefetchedPage {
2389 base_offset: full_range.start,
2390 data: LanceBuffer::from_bytes(page_data, 1),
2391 };
2392 let read_ranges = vec![full_range];
2393 let data = source.fetch(&read_ranges, priority).await?;
2394 Self::create_decoder(details, data, num_rows, bits_per_offset)
2395 }
2396 .boxed();
2397 let page_load_task = PageLoadTask {
2398 decoder_fut: load_task,
2399 num_rows,
2400 };
2401 return Ok(vec![page_load_task]);
2402 }
2403
2404 if let Some(cached_state) = &self.cached_state {
2405 let byte_ranges = Self::extract_byte_ranges_from_cached(
2406 &cached_state.rep_index_buffer,
2407 ranges,
2408 rep_index.bytes_per_value,
2409 data_buf_position,
2410 );
2411 let io_future = io.submit_request(byte_ranges, priority);
2412 let page_load_task =
2413 Self::create_page_load_task(io_future, num_rows, details, bits_per_offset);
2414 return Ok(vec![page_load_task]);
2415 }
2416
2417 let rep_ranges = Self::compute_rep_index_ranges(ranges, &rep_index);
2418 let rep_data = io.submit_request(rep_ranges, priority);
2419 let io_clone = io.clone();
2420 let load_task = async move {
2421 let rep_data = rep_data.await?;
2422 let rep_buffer = LanceBuffer::concat(
2423 &rep_data
2424 .into_iter()
2425 .map(|d| LanceBuffer::from_bytes(d, 1))
2426 .collect::<Vec<_>>(),
2427 );
2428 let byte_ranges = Self::extract_byte_ranges_from_pairs(
2429 rep_buffer,
2430 rep_index.bytes_per_value,
2431 data_buf_position,
2432 );
2433 let source = FullZipReadSource::Remote(io_clone);
2434 let data = source.fetch(&byte_ranges, priority).await?;
2435 Self::create_decoder(details, data, num_rows, bits_per_offset)
2436 }
2437 .boxed();
2438 let page_load_task = PageLoadTask {
2439 decoder_fut: load_task,
2440 num_rows,
2441 };
2442 Ok(vec![page_load_task])
2443 }
2444
2445 fn schedule_ranges_simple(
2449 &self,
2450 ranges: &[Range<u64>],
2451 io: &Arc<dyn EncodingsIo>,
2452 ) -> Result<Vec<PageLoadTask>> {
2453 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2455
2456 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2457 unreachable!()
2458 };
2459
2460 let bits_per_value = decompressor.bits_per_value();
2462 assert_eq!(bits_per_value % 8, 0);
2463 let bytes_per_value = bits_per_value / 8;
2464 let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2465 let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
2466 let byte_ranges = ranges
2467 .iter()
2468 .map(|r| {
2469 debug_assert!(r.end <= self.rows_in_page);
2470 let start = self.data_buf_position + r.start * total_bytes_per_value;
2471 let end = self.data_buf_position + r.end * total_bytes_per_value;
2472 start..end
2473 })
2474 .collect::<Vec<_>>();
2475
2476 let io_future = io.submit_request(byte_ranges, self.priority);
2477 let page_load_task = Self::create_page_load_task(
2478 io_future,
2479 num_rows,
2480 self.details.clone(),
2481 self.bits_per_offset,
2482 );
2483 Ok(vec![page_load_task])
2484 }
2485}
2486
/// Cacheable state of a full-zip page: the repetition index as read by
/// `FullZipScheduler::initialize`.
#[derive(Debug)]
struct FullZipCacheableState {
    /// Raw bytes of the repetition index buffer.
    rep_index_buffer: LanceBuffer,
}
2493
2494impl DeepSizeOf for FullZipCacheableState {
2495 fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2496 self.rep_index_buffer.len()
2497 }
2498}
2499
2500impl CachedPageData for FullZipCacheableState {
2501 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2502 self
2503 }
2504}
2505
2506impl StructuralPageScheduler for FullZipScheduler {
2507 fn initialize<'a>(
2508 &'a mut self,
2509 io: &Arc<dyn EncodingsIo>,
2510 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2511 if self.enable_cache
2512 && let Some(rep_index) = self.rep_index
2513 {
2514 let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2515 let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2516 let io_clone = io.clone();
2517 return async move {
2518 let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2519 let state = Arc::new(FullZipCacheableState {
2520 rep_index_buffer: LanceBuffer::from_bytes(rep_index_data[0].clone(), 1),
2521 });
2522 self.cached_state = Some(state.clone());
2523 Ok(state as Arc<dyn CachedPageData>)
2524 }
2525 .boxed();
2526 }
2527 std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2528 }
2529
2530 fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2534 if let Ok(cached_state) = cache
2536 .clone()
2537 .as_arc_any()
2538 .downcast::<FullZipCacheableState>()
2539 {
2540 self.cached_state = Some(cached_state);
2542 }
2543 }
2544
2545 fn schedule_ranges(
2546 &self,
2547 ranges: &[Range<u64>],
2548 io: &Arc<dyn EncodingsIo>,
2549 ) -> Result<Vec<PageLoadTask>> {
2550 if let Some(rep_index) = self.rep_index {
2551 self.schedule_ranges_rep(ranges, io, rep_index)
2552 } else {
2553 self.schedule_ranges_simple(ranges, io)
2554 }
2555 }
2556}
2557
/// Page decoder for full-zip data whose values all occupy the same number of
/// bytes.
///
/// Buffers are consumed from the front of `data`; `slice_next_task` carves off
/// the bytes for the next batch of rows.
#[derive(Debug)]
struct FixedFullZipDecoder {
    /// Shared decoding parameters (control word parser, level limits, value
    /// decompressor, definition meaning).
    details: Arc<FullZipDecodeDetails>,
    /// Buffers that still hold undecoded bytes; the front buffer is the one
    /// currently being consumed.
    data: VecDeque<LanceBuffer>,
    /// Byte position inside the front buffer where the next task starts.
    offset_in_current: usize,
    /// Width of one value in bytes; reported as `bits_per_value` on emitted
    /// blocks.
    bytes_per_value: usize,
    /// Number of bytes advanced for each visible item while slicing
    /// (presumably control word plus value — confirm against the constructor).
    total_bytes_per_value: usize,
    /// Value returned from `num_rows()`.
    num_rows: u64,
}
2574
2575impl FixedFullZipDecoder {
2576 fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2577 debug_assert!(num_rows > 0);
2578 let cur_buf = self.data.front_mut().unwrap();
2579 let start = self.offset_in_current;
2580 if self.details.ctrl_word_parser.has_rep() {
2581 let mut rows_started = 0;
2584 let mut num_items = 0;
2587 while self.offset_in_current < cur_buf.len() {
2588 let control = self.details.ctrl_word_parser.parse_desc(
2589 &cur_buf[self.offset_in_current..],
2590 self.details.max_rep,
2591 self.details.max_visible_def,
2592 );
2593 if control.is_new_row {
2594 if rows_started == num_rows {
2595 break;
2596 }
2597 rows_started += 1;
2598 }
2599 num_items += 1;
2600 if control.is_visible {
2601 self.offset_in_current += self.total_bytes_per_value;
2602 } else {
2603 self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2604 }
2605 }
2606
2607 let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2608 if self.offset_in_current == cur_buf.len() {
2609 self.data.pop_front();
2610 self.offset_in_current = 0;
2611 }
2612
2613 FullZipDecodeTaskItem {
2614 data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2615 data: task_slice,
2616 bits_per_value: self.bytes_per_value as u64 * 8,
2617 num_values: num_items,
2618 block_info: BlockInfo::new(),
2619 }),
2620 rows_in_buf: rows_started,
2621 }
2622 } else {
2623 let cur_buf = self.data.front_mut().unwrap();
2626 let bytes_avail = cur_buf.len() - self.offset_in_current;
2627 let offset_in_cur = self.offset_in_current;
2628
2629 let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2630 let mut rows_taken = num_rows;
2631 let task_slice = if bytes_needed >= bytes_avail {
2632 self.offset_in_current = 0;
2633 rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2634 self.data
2635 .pop_front()
2636 .unwrap()
2637 .slice_with_length(offset_in_cur, bytes_avail)
2638 } else {
2639 self.offset_in_current += bytes_needed;
2640 cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2641 };
2642 FullZipDecodeTaskItem {
2643 data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2644 data: task_slice,
2645 bits_per_value: self.bytes_per_value as u64 * 8,
2646 num_values: rows_taken,
2647 block_info: BlockInfo::new(),
2648 }),
2649 rows_in_buf: rows_taken,
2650 }
2651 }
2652 }
2653}
2654
2655impl StructuralPageDecoder for FixedFullZipDecoder {
2656 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2657 let mut task_data = Vec::with_capacity(self.data.len());
2658 let mut remaining = num_rows;
2659 while remaining > 0 {
2660 let task_item = self.slice_next_task(remaining);
2661 remaining -= task_item.rows_in_buf;
2662 task_data.push(task_item);
2663 }
2664 Ok(Box::new(FixedFullZipDecodeTask {
2665 details: self.details.clone(),
2666 data: task_data,
2667 bytes_per_value: self.bytes_per_value,
2668 num_rows: num_rows as usize,
2669 }))
2670 }
2671
2672 fn num_rows(&self) -> u64 {
2673 self.num_rows
2674 }
2675}
2676
/// Page decoder for full-zip data whose values have varying lengths.
///
/// The zipped input is split apart once, in `new` (via `unzip`), into levels,
/// offsets and value bytes. `drain` then slices those buffers per batch using
/// the per-row start tables below.
#[derive(Debug)]
struct VariableFullZipDecoder {
    /// Shared decoding parameters.
    details: Arc<FullZipDecodeDetails>,
    /// Variable-width decompressor cloned out of `details.value_decompressor`.
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    /// Unzipped value bytes.
    data: LanceBuffer,
    /// Little-endian offsets into `data`, each `bits_per_offset` bits wide.
    offsets: LanceBuffer,
    /// Repetition levels appended by the control word parser (may be empty).
    rep: ScalarBuffer<u16>,
    /// Definition levels appended by the control word parser (may be empty).
    def: ScalarBuffer<u16>,
    /// For each row start, the index into the level buffers; ends with one
    /// extra entry marking the end of the page.
    repdef_starts: Vec<usize>,
    /// For each row start, the byte length of `data` at that point; ends with
    /// one extra entry marking the end of the page.
    data_starts: Vec<usize>,
    /// For each row start, the byte length of `offsets` at that point; ends
    /// with one extra entry marking the end of the page.
    offset_starts: Vec<usize>,
    /// For each row start, the number of visible items seen before it; ends
    /// with the page total.
    visible_item_counts: Vec<u64>,
    /// Width of each entry in `offsets` (32 or 64, asserted in `new`).
    bits_per_offset: u8,
    /// Index of the next row that `drain` will hand out.
    current_idx: usize,
    /// Value returned from `num_rows()`.
    num_rows: u64,
}
2697
2698impl VariableFullZipDecoder {
2699 fn new(
2700 details: Arc<FullZipDecodeDetails>,
2701 data: VecDeque<LanceBuffer>,
2702 num_rows: u64,
2703 in_bits_per_length: u8,
2704 out_bits_per_offset: u8,
2705 ) -> Self {
2706 let decompressor = match details.value_decompressor {
2707 PerValueDecompressor::Variable(ref d) => d.clone(),
2708 _ => unreachable!(),
2709 };
2710
2711 assert_eq!(in_bits_per_length % 8, 0);
2712 assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2713
2714 let mut decoder = Self {
2715 details,
2716 decompressor,
2717 data: LanceBuffer::empty(),
2718 offsets: LanceBuffer::empty(),
2719 rep: LanceBuffer::empty().borrow_to_typed_slice(),
2720 def: LanceBuffer::empty().borrow_to_typed_slice(),
2721 bits_per_offset: out_bits_per_offset,
2722 repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2723 data_starts: Vec::with_capacity(num_rows as usize + 1),
2724 offset_starts: Vec::with_capacity(num_rows as usize + 1),
2725 visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2726 current_idx: 0,
2727 num_rows,
2728 };
2729
2730 decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2751
2752 decoder
2753 }
2754
2755 fn slice_batch_data_and_rebase_offsets_typed<T>(
2756 data: &LanceBuffer,
2757 offsets: &LanceBuffer,
2758 ) -> Result<(LanceBuffer, LanceBuffer)>
2759 where
2760 T: arrow_buffer::ArrowNativeType
2761 + Copy
2762 + PartialOrd
2763 + std::ops::Sub<Output = T>
2764 + std::fmt::Display
2765 + TryInto<usize>,
2766 {
2767 let offsets_slice = offsets.borrow_to_typed_slice::<T>();
2768 let offsets_slice = offsets_slice.as_ref();
2769 if offsets_slice.is_empty() {
2770 return Err(Error::internal(
2771 "Variable offsets cannot be empty".to_string(),
2772 ));
2773 }
2774
2775 let base = offsets_slice[0];
2776 let end = *offsets_slice.last().unwrap();
2777 if end < base {
2778 return Err(Error::internal(format!(
2779 "Invalid variable offsets: end ({end}) is less than base ({base})"
2780 )));
2781 }
2782
2783 let data_start = base.try_into().map_err(|_| {
2784 Error::internal(format!("Variable offset ({base}) does not fit into usize"))
2785 })?;
2786 let data_end = end.try_into().map_err(|_| {
2787 Error::internal(format!("Variable offset ({end}) does not fit into usize"))
2788 })?;
2789 if data_end > data.len() {
2790 return Err(Error::internal(format!(
2791 "Invalid variable offsets: end ({data_end}) exceeds data len ({})",
2792 data.len()
2793 )));
2794 }
2795
2796 let mut rebased_offsets = Vec::with_capacity(offsets_slice.len());
2797 for &offset in offsets_slice {
2798 if offset < base {
2799 return Err(Error::internal(format!(
2800 "Invalid variable offsets: offset ({offset}) is less than base ({base})"
2801 )));
2802 }
2803 rebased_offsets.push(offset - base);
2804 }
2805
2806 let sliced_data = data.slice_with_length(data_start, data_end - data_start);
2807 let sliced_data = LanceBuffer::copy_slice(&sliced_data);
2809 let rebased_offsets = LanceBuffer::reinterpret_vec(rebased_offsets);
2810 Ok((sliced_data, rebased_offsets))
2811 }
2812
2813 fn slice_batch_data_and_rebase_offsets(
2814 data: &LanceBuffer,
2815 offsets: &LanceBuffer,
2816 bits_per_offset: u8,
2817 ) -> Result<(LanceBuffer, LanceBuffer)> {
2818 match bits_per_offset {
2819 32 => Self::slice_batch_data_and_rebase_offsets_typed::<u32>(data, offsets),
2820 64 => Self::slice_batch_data_and_rebase_offsets_typed::<u64>(data, offsets),
2821 _ => Err(Error::internal(format!(
2822 "Unsupported bits_per_offset={bits_per_offset}"
2823 ))),
2824 }
2825 }
2826
2827 unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2828 match bits_per_offset {
2829 8 => *data.get_unchecked(0) as u64,
2830 16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2831 32 => u32::from_le_bytes([
2832 *data.get_unchecked(0),
2833 *data.get_unchecked(1),
2834 *data.get_unchecked(2),
2835 *data.get_unchecked(3),
2836 ]) as u64,
2837 64 => u64::from_le_bytes([
2838 *data.get_unchecked(0),
2839 *data.get_unchecked(1),
2840 *data.get_unchecked(2),
2841 *data.get_unchecked(3),
2842 *data.get_unchecked(4),
2843 *data.get_unchecked(5),
2844 *data.get_unchecked(6),
2845 *data.get_unchecked(7),
2846 ]),
2847 _ => unreachable!(),
2848 }
2849 }
2850
2851 fn unzip(
2852 &mut self,
2853 data: VecDeque<LanceBuffer>,
2854 in_bits_per_length: u8,
2855 out_bits_per_offset: u8,
2856 num_rows: u64,
2857 ) {
2858 let mut rep = Vec::with_capacity(num_rows as usize);
2860 let mut def = Vec::with_capacity(num_rows as usize);
2861 let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2862
2863 let bytes_per_offset = out_bits_per_offset as usize / 8;
2866 let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2867 let mut offsets_data = Vec::with_capacity(bytes_offsets);
2868
2869 let bytes_per_length = in_bits_per_length as usize / 8;
2870 let bytes_lengths = bytes_per_length * num_rows as usize;
2871
2872 let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2873 let mut unzipped_data =
2876 Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2877
2878 let mut current_offset = 0_u64;
2879 let mut visible_item_count = 0_u64;
2880 for databuf in data.into_iter() {
2881 let mut databuf = databuf.as_ref();
2882 while !databuf.is_empty() {
2883 let data_start = unzipped_data.len();
2884 let offset_start = offsets_data.len();
2885 let repdef_start = rep.len().max(def.len());
2888 let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2890 databuf,
2891 self.details.max_rep,
2892 self.details.max_visible_def,
2893 );
2894 self.details
2895 .ctrl_word_parser
2896 .parse(databuf, &mut rep, &mut def);
2897 databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2898
2899 if ctrl_desc.is_new_row {
2900 self.repdef_starts.push(repdef_start);
2901 self.data_starts.push(data_start);
2902 self.offset_starts.push(offset_start);
2903 self.visible_item_counts.push(visible_item_count);
2904 }
2905 if ctrl_desc.is_visible {
2906 visible_item_count += 1;
2907 if ctrl_desc.is_valid_item {
2908 debug_assert!(databuf.len() >= bytes_per_length);
2910 let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2911 match out_bits_per_offset {
2912 32 => offsets_data
2913 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2914 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2915 _ => unreachable!(),
2916 };
2917 databuf = &databuf[bytes_per_offset..];
2918 unzipped_data.extend_from_slice(&databuf[..length as usize]);
2919 databuf = &databuf[length as usize..];
2920 current_offset += length;
2921 } else {
2922 match out_bits_per_offset {
2924 32 => offsets_data
2925 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2926 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2927 _ => unreachable!(),
2928 }
2929 }
2930 }
2931 }
2932 }
2933 self.repdef_starts.push(rep.len().max(def.len()));
2934 self.data_starts.push(unzipped_data.len());
2935 self.offset_starts.push(offsets_data.len());
2936 self.visible_item_counts.push(visible_item_count);
2937 match out_bits_per_offset {
2938 32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2939 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2940 _ => unreachable!(),
2941 };
2942 self.rep = ScalarBuffer::from(rep);
2943 self.def = ScalarBuffer::from(def);
2944 self.data = LanceBuffer::from(unzipped_data);
2945 self.offsets = LanceBuffer::from(offsets_data);
2946 }
2947}
2948
2949impl StructuralPageDecoder for VariableFullZipDecoder {
2950 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2951 let start = self.current_idx;
2952 let end = start + num_rows as usize;
2953
2954 let offset_start = self.offset_starts[start];
2955 let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2956 let offsets = self
2957 .offsets
2958 .slice_with_length(offset_start, offset_end - offset_start);
2959 let (data, offsets) =
2961 Self::slice_batch_data_and_rebase_offsets(&self.data, &offsets, self.bits_per_offset)?;
2962
2963 let repdef_start = self.repdef_starts[start];
2964 let repdef_end = self.repdef_starts[end];
2965 let rep = if self.rep.is_empty() {
2966 self.rep.clone()
2967 } else {
2968 self.rep.slice(repdef_start, repdef_end - repdef_start)
2969 };
2970 let def = if self.def.is_empty() {
2971 self.def.clone()
2972 } else {
2973 self.def.slice(repdef_start, repdef_end - repdef_start)
2974 };
2975
2976 let visible_item_counts_start = self.visible_item_counts[start];
2977 let visible_item_counts_end = self.visible_item_counts[end];
2978 let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2979
2980 self.current_idx += num_rows as usize;
2981
2982 Ok(Box::new(VariableFullZipDecodeTask {
2983 details: self.details.clone(),
2984 decompressor: self.decompressor.clone(),
2985 data,
2986 offsets,
2987 bits_per_offset: self.bits_per_offset,
2988 num_visible_items,
2989 rep,
2990 def,
2991 }))
2992 }
2993
2994 fn num_rows(&self) -> u64 {
2995 self.num_rows
2996 }
2997}
2998
/// Decode task for one batch drained from a `VariableFullZipDecoder`.
#[derive(Debug)]
struct VariableFullZipDecodeTask {
    /// Shared decoding parameters; `def_meaning` is handed to the unraveler.
    details: Arc<FullZipDecodeDetails>,
    /// Decompressor applied to the variable-width block built from this batch.
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    /// Value bytes for this batch.
    data: LanceBuffer,
    /// Offsets into `data`.
    offsets: LanceBuffer,
    /// Width of each entry in `offsets`.
    bits_per_offset: u8,
    /// Number of visible items in this batch; used as the block's value count.
    num_visible_items: u64,
    /// Repetition levels for this batch; empty means none.
    rep: ScalarBuffer<u16>,
    /// Definition levels for this batch; empty means none.
    def: ScalarBuffer<u16>,
}
3010
3011impl DecodePageTask for VariableFullZipDecodeTask {
3012 fn decode(self: Box<Self>) -> Result<DecodedPage> {
3013 let block = VariableWidthBlock {
3014 data: self.data,
3015 offsets: self.offsets,
3016 bits_per_offset: self.bits_per_offset,
3017 num_values: self.num_visible_items,
3018 block_info: BlockInfo::new(),
3019 };
3020 let decomopressed = self.decompressor.decompress(block)?;
3021 let rep = if self.rep.is_empty() {
3022 None
3023 } else {
3024 Some(self.rep.to_vec())
3025 };
3026 let def = if self.def.is_empty() {
3027 None
3028 } else {
3029 Some(self.def.to_vec())
3030 };
3031 let unraveler = RepDefUnraveler::new(
3032 rep,
3033 def,
3034 self.details.def_meaning.clone(),
3035 self.num_visible_items,
3036 );
3037 Ok(DecodedPage {
3038 data: decomopressed,
3039 repdef: unraveler,
3040 })
3041 }
3042}
3043
/// One contiguous slice of zipped bytes handed to a fixed-width decode task.
#[derive(Debug)]
struct FullZipDecodeTaskItem {
    /// The sliced bytes, wrapped as a per-value data block.
    data: PerValueDataBlock,
    /// Number of rows covered by `data`, as counted when the slice was cut.
    rows_in_buf: u64,
}
3049
/// Decode task for one batch drained from a `FixedFullZipDecoder`.
#[derive(Debug)]
struct FixedFullZipDecodeTask {
    /// Shared decoding parameters.
    details: Arc<FullZipDecodeDetails>,
    /// Slices that together cover the batch, in row order.
    data: Vec<FullZipDecodeTaskItem>,
    /// Number of rows requested for the batch; passed to the unraveler and
    /// used to size the level vectors.
    num_rows: usize,
    /// Width of one value in bytes.
    bytes_per_value: usize,
}
3059
3060impl DecodePageTask for FixedFullZipDecodeTask {
3061 fn decode(self: Box<Self>) -> Result<DecodedPage> {
3062 let estimated_size_bytes = self
3064 .data
3065 .iter()
3066 .map(|task_item| task_item.data.data_size() as usize)
3067 .sum::<usize>()
3068 * 2;
3069 let mut data_builder =
3070 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
3071
3072 if self.details.ctrl_word_parser.bytes_per_word() == 0 {
3073 for task_item in self.data.into_iter() {
3077 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
3078 unreachable!()
3079 };
3080 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
3081 else {
3082 unreachable!()
3083 };
3084 debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
3085 let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
3086 data_builder.append(&decompressed, 0..task_item.rows_in_buf);
3087 }
3088
3089 let unraveler = RepDefUnraveler::new(
3090 None,
3091 None,
3092 self.details.def_meaning.clone(),
3093 self.num_rows as u64,
3094 );
3095
3096 Ok(DecodedPage {
3097 data: data_builder.finish(),
3098 repdef: unraveler,
3099 })
3100 } else {
3101 let mut rep = Vec::with_capacity(self.num_rows);
3103 let mut def = Vec::with_capacity(self.num_rows);
3104
3105 for task_item in self.data.into_iter() {
3106 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
3107 unreachable!()
3108 };
3109 let mut buf_slice = fixed_data.data.as_ref();
3110 let num_values = fixed_data.num_values as usize;
3111 let mut values = Vec::with_capacity(
3114 fixed_data.data.len()
3115 - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
3116 );
3117 let mut visible_items = 0;
3118 for _ in 0..num_values {
3119 self.details
3121 .ctrl_word_parser
3122 .parse(buf_slice, &mut rep, &mut def);
3123 buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
3124
3125 let is_visible = def
3126 .last()
3127 .map(|d| *d <= self.details.max_visible_def)
3128 .unwrap_or(true);
3129 if is_visible {
3130 values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
3132 buf_slice = &buf_slice[self.bytes_per_value..];
3133 visible_items += 1;
3134 }
3135 }
3136
3137 let values_buf = LanceBuffer::from(values);
3139 let fixed_data = FixedWidthDataBlock {
3140 bits_per_value: self.bytes_per_value as u64 * 8,
3141 block_info: BlockInfo::new(),
3142 data: values_buf,
3143 num_values: visible_items,
3144 };
3145 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
3146 else {
3147 unreachable!()
3148 };
3149 let decompressed = decompressor.decompress(fixed_data, visible_items)?;
3150 data_builder.append(&decompressed, 0..visible_items);
3151 }
3152
3153 let repetition = if rep.is_empty() { None } else { Some(rep) };
3154 let definition = if def.is_empty() { None } else { Some(def) };
3155
3156 let unraveler = RepDefUnraveler::new(
3157 repetition,
3158 definition,
3159 self.details.def_meaning.clone(),
3160 self.num_rows as u64,
3161 );
3162 let data = data_builder.finish();
3163
3164 Ok(DecodedPage {
3165 data,
3166 repdef: unraveler,
3167 })
3168 }
3169 }
3170}
3171
/// Cursor that walks a list of row ranges across the pages of a primitive
/// field, scheduling one page per `schedule_next` call.
#[derive(Debug)]
struct StructuralPrimitiveFieldSchedulingJob<'a> {
    /// Field scheduler that owns the page schedulers.
    scheduler: &'a StructuralPrimitiveFieldScheduler,
    /// Row ranges to schedule, expressed in field-wide row numbers.
    ranges: Vec<Range<u64>>,
    /// Index of the page the cursor is currently on.
    page_idx: usize,
    /// Index of the first range that has not been fully scheduled.
    range_idx: usize,
    /// Field-wide row number of the first row of the current page.
    global_row_offset: u64,
}
3180
3181impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
3182 pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
3183 Self {
3184 scheduler,
3185 ranges,
3186 page_idx: 0,
3187 range_idx: 0,
3188 global_row_offset: 0,
3189 }
3190 }
3191}
3192
impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
    /// Schedules the portion of the remaining ranges that falls in the next
    /// relevant page.
    ///
    /// Returns an empty vector once every range has been scheduled. Otherwise
    /// pages that end before the current range starts are skipped, all ranges
    /// (or range prefixes) that intersect the resulting page are translated to
    /// page-relative rows and handed to that page's scheduler, and the cursor
    /// moves on to the following page.
    ///
    /// Panics if the ranges extend past the last page.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
        if self.range_idx >= self.ranges.len() {
            return Ok(Vec::new());
        }
        let mut range = self.ranges[self.range_idx].clone();
        // Only used in the trace message below.
        let priority = range.start;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range, cur_page.num_rows
        );
        // Skip pages that end at or before the start of the current range.
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Gather every range that starts inside the current page.
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            // A range continued from the previous page starts at this page's
            // first row.
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // The range spills into the next page; `range_idx` is left
                // unchanged so the remainder is picked up by the next call.
                break;
            }
        }

        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
            ranges_in_page.len(),
            cur_page.num_rows,
            priority,
            self.scheduler.column_index,
            cur_page.page_index,
        );

        // This page is done; the next call starts from the following page.
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let page_decoders = cur_page
            .scheduler
            .schedule_ranges(&ranges_in_page, context.io())?;

        // Wrap each load task so that the decoder, once loaded, is tagged with
        // the path of this field.
        let cur_path = context.current_path();
        page_decoders
            .into_iter()
            .map(|page_load_task| {
                let cur_path = cur_path.clone();
                let page_decoder = page_load_task.decoder_fut;
                let unloaded_page = async move {
                    let page_decoder = page_decoder.await?;
                    Ok(LoadedPageShard {
                        decoder: page_decoder,
                        path: cur_path,
                    })
                }
                .boxed();
                Ok(ScheduledScanLine {
                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
                    rows_scheduled: page_load_task.num_rows,
                })
            })
            .collect::<Result<Vec<_>>>()
    }
}
3277
/// A page scheduler together with the page facts the field scheduler needs.
#[derive(Debug)]
struct PageInfoAndScheduler {
    /// Position of the page within its column.
    page_index: usize,
    /// Number of rows in the page.
    num_rows: u64,
    /// Scheduler built for the page's layout.
    scheduler: Box<dyn StructuralPageScheduler>,
}
3284
/// Field scheduler for a primitive column: holds one page scheduler per page
/// and creates scheduling jobs that walk them.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldScheduler {
    /// One entry per page, in page order.
    page_schedulers: Vec<PageInfoAndScheduler>,
    /// Index of the column; also used as the cache key for page state.
    column_index: u32,
}
3294
3295impl StructuralPrimitiveFieldScheduler {
3296 pub fn try_new(
3297 column_info: &ColumnInfo,
3298 decompressors: &dyn DecompressionStrategy,
3299 cache_repetition_index: bool,
3300 target_field: &Field,
3301 ) -> Result<Self> {
3302 let page_schedulers = column_info
3303 .page_infos
3304 .iter()
3305 .enumerate()
3306 .map(|(page_index, page_info)| {
3307 Self::page_info_to_scheduler(
3308 page_info,
3309 page_index,
3310 decompressors,
3311 cache_repetition_index,
3312 target_field,
3313 )
3314 })
3315 .collect::<Result<Vec<_>>>()?;
3316 Ok(Self {
3317 page_schedulers,
3318 column_index: column_info.index,
3319 })
3320 }
3321
    /// Builds the page scheduler that matches a page's structural layout.
    ///
    /// * Mini-block layout: a `MiniBlockScheduler`.
    /// * Full-zip layout: a `FullZipScheduler` whose `enable_cache` flag is set
    ///   from `cache_repetition_index`.
    /// * Constant layout: a `ConstantPageScheduler` when a scalar value is
    ///   present, otherwise one of the two all-null schedulers.
    /// * Blob layout: the scheduler for the blob's inner layout (built
    ///   recursively), wrapped in a blob scheduler.
    ///
    /// # Errors
    /// Propagates the error from `expect_ok` when the layout (or a blob's inner
    /// layout) is absent, and any error from constructing a scheduler or a
    /// decompressor.
    fn page_layout_to_scheduler(
        page_info: &PageInfo,
        page_layout: &PageLayout,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<Box<dyn StructuralPageScheduler>> {
        use pb21::page_layout::Layout;
        Ok(match page_layout.layout.as_ref().expect_ok()? {
            Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
                &page_info.buffer_offsets_and_sizes,
                page_info.priority,
                mini_block.num_items,
                mini_block,
                decompressors,
            )?),
            Layout::FullZipLayout(full_zip) => {
                let mut scheduler = FullZipScheduler::try_new(
                    &page_info.buffer_offsets_and_sizes,
                    page_info.priority,
                    page_info.num_rows,
                    full_zip,
                    decompressors,
                )?;
                scheduler.enable_cache = cache_repetition_index;
                Box::new(scheduler)
            }
            Layout::ConstantLayout(constant_layout) => {
                let def_meaning = constant_layout
                    .layers
                    .iter()
                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                    .collect::<Vec<_>>();
                // A scalar value is present when it is stored inline, or when
                // the page has exactly one or three buffers.
                // NOTE(review): the meaning of the 1- and 3-buffer cases is not
                // visible here — confirm against the writer.
                let has_scalar_value = constant_layout.inline_value.is_some()
                    || page_info.buffer_offsets_and_sizes.len() == 1
                    || page_info.buffer_offsets_and_sizes.len() == 3;
                if has_scalar_value {
                    Box::new(constant::ConstantPageScheduler::try_new(
                        page_info.buffer_offsets_and_sizes.clone(),
                        constant_layout.inline_value.clone(),
                        target_field.data_type(),
                        def_meaning.into(),
                    )?) as Box<dyn StructuralPageScheduler>
                } else if def_meaning.len() == 1
                    && def_meaning[0] == DefinitionInterpretation::NullableItem
                {
                    // A single nullable-item layer needs no buffers at all.
                    Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
                } else {
                    // Any other layering requires reading the stored levels,
                    // each of which may carry its own compression.
                    let rep_decompressor = constant_layout
                        .rep_compression
                        .as_ref()
                        .map(|encoding| decompressors.create_block_decompressor(encoding))
                        .transpose()?
                        .map(Arc::from);

                    let def_decompressor = constant_layout
                        .def_compression
                        .as_ref()
                        .map(|encoding| decompressors.create_block_decompressor(encoding))
                        .transpose()?
                        .map(Arc::from);

                    Box::new(ComplexAllNullScheduler::new(
                        page_info.buffer_offsets_and_sizes.clone(),
                        def_meaning.into(),
                        rep_decompressor,
                        def_decompressor,
                        constant_layout.num_rep_values,
                        constant_layout.num_def_values,
                    )) as Box<dyn StructuralPageScheduler>
                }
            }
            Layout::BlobLayout(blob) => {
                let inner_scheduler = Self::page_layout_to_scheduler(
                    page_info,
                    blob.inner_layout.as_ref().expect_ok()?.as_ref(),
                    decompressors,
                    cache_repetition_index,
                    target_field,
                )?;
                let def_meaning = blob
                    .layers
                    .iter()
                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                    .collect::<Vec<_>>();
                // A struct target field selects the description scheduler; any
                // other type selects the blob scheduler.
                if matches!(target_field.data_type(), DataType::Struct(_)) {
                    Box::new(BlobDescriptionPageScheduler::new(
                        inner_scheduler,
                        def_meaning.into(),
                    ))
                } else {
                    Box::new(BlobPageScheduler::new(
                        inner_scheduler,
                        page_info.priority,
                        page_info.num_rows,
                        def_meaning.into(),
                    ))
                }
            }
        })
    }
3425
3426 fn page_info_to_scheduler(
3427 page_info: &PageInfo,
3428 page_index: usize,
3429 decompressors: &dyn DecompressionStrategy,
3430 cache_repetition_index: bool,
3431 target_field: &Field,
3432 ) -> Result<PageInfoAndScheduler> {
3433 let page_layout = page_info.encoding.as_structural();
3434 let scheduler = Self::page_layout_to_scheduler(
3435 page_info,
3436 page_layout,
3437 decompressors,
3438 cache_repetition_index,
3439 target_field,
3440 )?;
3441 Ok(PageInfoAndScheduler {
3442 page_index,
3443 num_rows: page_info.num_rows,
3444 scheduler,
3445 })
3446 }
3447}
3448
/// Per-page state that can be stored in the cache and shared between
/// schedulers.
pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
    /// Converts the shared pointer into a type-erased one so that a page
    /// scheduler can downcast it back to its own concrete state type.
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
}
3452
/// Placeholder `CachedPageData` for pages that have nothing to cache.
pub struct NoCachedPageData;
3454
impl DeepSizeOf for NoCachedPageData {
    /// The placeholder owns no data, so its children occupy zero bytes.
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        0
    }
}
3460impl CachedPageData for NoCachedPageData {
3461 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
3462 self
3463 }
3464}
3465
/// Cached state for every page of one field, stored under a
/// `FieldDataCacheKey`.
pub struct CachedFieldData {
    /// One entry per page, in the same order as the field's page schedulers.
    pages: Vec<Arc<dyn CachedPageData>>,
}
3469
3470impl DeepSizeOf for CachedFieldData {
3471 fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
3472 self.pages.deep_size_of_children(ctx)
3473 }
3474}
3475
/// Cache key under which a field's `CachedFieldData` is stored.
#[derive(Debug, Clone)]
pub struct FieldDataCacheKey {
    /// Index of the column the cached data belongs to.
    pub column_index: u32,
}
3481
3482impl CacheKey for FieldDataCacheKey {
3483 type ValueType = CachedFieldData;
3484
3485 fn key(&self) -> std::borrow::Cow<'_, str> {
3486 self.column_index.to_string().into()
3487 }
3488
3489 fn type_name() -> &'static str {
3490 "FieldData"
3491 }
3492}
3493
3494impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3495 fn initialize<'a>(
3496 &'a mut self,
3497 _filter: &'a FilterExpression,
3498 context: &'a SchedulerContext,
3499 ) -> BoxFuture<'a, Result<()>> {
3500 let cache_key = FieldDataCacheKey {
3501 column_index: self.column_index,
3502 };
3503 let cache = context.cache().clone();
3504
3505 async move {
3506 if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3507 self.page_schedulers
3508 .iter_mut()
3509 .zip(cached_data.pages.iter())
3510 .for_each(|(page_scheduler, cached_data)| {
3511 page_scheduler.scheduler.load(cached_data);
3512 });
3513 return Ok(());
3514 }
3515
3516 let page_data = self
3517 .page_schedulers
3518 .iter_mut()
3519 .map(|s| s.scheduler.initialize(context.io()))
3520 .collect::<FuturesOrdered<_>>();
3521
3522 let page_data = page_data.try_collect::<Vec<_>>().await?;
3523 let cached_data = Arc::new(CachedFieldData { pages: page_data });
3524 cache.insert_with_key(&cache_key, cached_data).await;
3525 Ok(())
3526 }
3527 .boxed()
3528 }
3529
3530 fn schedule_ranges<'a>(
3531 &'a self,
3532 ranges: &[Range<u64>],
3533 _filter: &FilterExpression,
3534 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3535 let ranges = ranges.to_vec();
3536 Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3537 self, ranges,
3538 )))
3539 }
3540}
3541
/// Decode task that runs several page tasks and concatenates their output
/// into a single array.
#[derive(Debug)]
pub struct StructuralCompositeDecodeArrayTask {
    /// Page tasks to run, in output order.
    tasks: Vec<Box<dyn DecodePageTask>>,
    /// Forwarded to `into_arrow` when converting decoded blocks.
    should_validate: bool,
    /// Arrow type of the produced array.
    data_type: DataType,
}
3550
3551impl StructuralCompositeDecodeArrayTask {
3552 fn restore_validity(
3553 array: Arc<dyn Array>,
3554 unraveler: &mut CompositeRepDefUnraveler,
3555 ) -> Arc<dyn Array> {
3556 let validity = unraveler.unravel_validity(array.len());
3557 let Some(validity) = validity else {
3558 return array;
3559 };
3560 if array.data_type() == &DataType::Null {
3561 return array;
3563 }
3564 assert_eq!(validity.len(), array.len());
3565 make_array(unsafe {
3568 array
3569 .to_data()
3570 .into_builder()
3571 .nulls(Some(validity))
3572 .build_unchecked()
3573 })
3574 }
3575}
3576
3577impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3578 fn decode(self: Box<Self>) -> Result<DecodedArray> {
3579 let mut arrays = Vec::with_capacity(self.tasks.len());
3580 let mut unravelers = Vec::with_capacity(self.tasks.len());
3581 let mut data_size = 0u64;
3582 for task in self.tasks {
3583 let decoded = task.decode()?;
3584 data_size += decoded.data.data_size();
3585 unravelers.push(decoded.repdef);
3586
3587 let array = make_array(
3588 decoded
3589 .data
3590 .into_arrow(self.data_type.clone(), self.should_validate)?,
3591 );
3592
3593 arrays.push(array);
3594 }
3595 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3596 let array = arrow_select::concat::concat(&array_refs)?;
3597 let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3598
3599 let array = Self::restore_validity(array, &mut repdef);
3600
3601 Ok(DecodedArray {
3602 array,
3603 repdef,
3604 data_size,
3605 })
3606 }
3607}
3608
/// Field decoder for a primitive column: queues loaded page decoders and
/// drains rows from them in order.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldDecoder {
    /// Arrow field being decoded; supplies the output data type.
    field: Arc<ArrowField>,
    /// Loaded page decoders, oldest first.
    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
    /// Forwarded to the decode tasks created by `drain`.
    should_validate: bool,
    /// Rows already drained from the page at the front of `page_decoders`.
    rows_drained_in_current: u64,
}
3616
3617impl StructuralPrimitiveFieldDecoder {
3618 pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3619 Self {
3620 field: field.clone(),
3621 page_decoders: VecDeque::new(),
3622 should_validate,
3623 rows_drained_in_current: 0,
3624 }
3625 }
3626}
3627
3628impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3629 fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3630 assert!(child.path.is_empty());
3631 self.page_decoders.push_back(child.decoder);
3632 Ok(())
3633 }
3634
3635 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3636 let mut remaining = num_rows;
3637 let mut tasks = Vec::new();
3638 while remaining > 0 {
3639 let cur_page = self.page_decoders.front_mut().unwrap();
3640 let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3641 let to_take = num_in_page.min(remaining);
3642
3643 let task = cur_page.drain(to_take)?;
3644 tasks.push(task);
3645
3646 if to_take == num_in_page {
3647 self.page_decoders.pop_front();
3648 self.rows_drained_in_current = 0;
3649 } else {
3650 self.rows_drained_in_current += to_take;
3651 }
3652
3653 remaining -= to_take;
3654 }
3655 Ok(Box::new(StructuralCompositeDecodeArrayTask {
3656 tasks,
3657 should_validate: self.should_validate,
3658 data_type: self.field.data_type().clone(),
3659 }))
3660 }
3661
3662 fn data_type(&self) -> &DataType {
3663 self.field.data_type()
3664 }
3665}
3666
/// Output of serializing a page in the full-zip layout.
/// NOTE(review): the producer is outside this view — confirm the field
/// descriptions against it.
struct SerializedFullZip {
    /// Buffer holding the serialized values (presumably zipped together with
    /// their control words — confirm).
    values: LanceBuffer,
    /// Optional repetition index buffer; `None` when no index is produced.
    repetition_index: Option<LanceBuffer>,
}
3674
// Alignment value of 8 used by the mini-block code.
// NOTE(review): presumably a byte alignment for mini-block buffers; the uses
// are outside this view — confirm.
const MINIBLOCK_ALIGNMENT: usize = 8;
3695
/// Structural encoder for a primitive column.
///
/// Instances are created with `try_new`, which copies most settings from
/// `EncodingOptions`.
pub struct PrimitiveStructuralEncoder {
    /// Queue that accumulates incoming data; created from
    /// `cache_bytes_per_column`, the column index and `keep_original_array`.
    accumulation_queue: AccumulationQueue,

    /// Copied from `EncodingOptions::keep_original_array`.
    keep_original_array: bool,
    /// Result of `EncodingOptions::support_large_chunk()`.
    support_large_chunk: bool,
    /// Repetition/definition builders collected so far; starts empty.
    accumulated_repdefs: Vec<RepDefBuilder>,
    /// Strategy used to choose compression.
    compression_strategy: Arc<dyn CompressionStrategy>,
    /// Index of the column being encoded.
    column_index: u32,
    /// Field being encoded.
    field: Field,
    /// Encoding metadata; consulted for a user-requested structural encoding
    /// in `prefers_miniblock`.
    encoding_metadata: Arc<HashMap<String, String>>,
    /// File format version, copied from `EncodingOptions::version`.
    version: LanceFileVersion,
}
3736
/// One chunk of compressed levels.
/// NOTE(review): the producer is outside this view — presumably repetition or
/// definition levels; confirm.
struct CompressedLevelsChunk {
    /// Compressed bytes of the chunk.
    data: LanceBuffer,
    /// Number of levels stored in the chunk.
    num_levels: u16,
}
3741
3742struct CompressedLevels {
3743 data: Vec<CompressedLevelsChunk>,
3744 compression: CompressiveEncoding,
3745 rep_index: Option<LanceBuffer>,
3746}
3747
3748struct SerializedMiniBlockPage {
3749 num_buffers: u64,
3750 data: LanceBuffer,
3751 metadata: LanceBuffer,
3752}
3753
/// Limits attached to a decision to dictionary-encode a block.
// NOTE(review): the producer and consumers of these limits are not fully visible here;
// the field descriptions below are inferred from the names — confirm.
#[derive(Debug, Clone, Copy)]
struct DictEncodingBudget {
    /// Presumably the largest number of dictionary entries allowed.
    max_dict_entries: u32,
    /// Presumably the largest encoded size allowed, in bytes.
    max_encoded_size: usize,
}
3759
3760impl PrimitiveStructuralEncoder {
3761 pub fn try_new(
3762 options: &EncodingOptions,
3763 compression_strategy: Arc<dyn CompressionStrategy>,
3764 column_index: u32,
3765 field: Field,
3766 encoding_metadata: Arc<HashMap<String, String>>,
3767 ) -> Result<Self> {
3768 Ok(Self {
3769 accumulation_queue: AccumulationQueue::new(
3770 options.cache_bytes_per_column,
3771 column_index,
3772 options.keep_original_array,
3773 ),
3774 support_large_chunk: options.support_large_chunk(),
3775 keep_original_array: options.keep_original_array,
3776 accumulated_repdefs: Vec::new(),
3777 column_index,
3778 compression_strategy,
3779 field,
3780 encoding_metadata,
3781 version: options.version,
3782 })
3783 }
3784
3785 fn is_narrow(data_block: &DataBlock) -> bool {
3793 const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3794
3795 if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3796 let max_len_array = max_len_array
3797 .as_any()
3798 .downcast_ref::<PrimitiveArray<UInt64Type>>()
3799 .unwrap();
3800 if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3801 return true;
3802 }
3803 }
3804 false
3805 }
3806
3807 fn prefers_miniblock(
3808 data_block: &DataBlock,
3809 encoding_metadata: &HashMap<String, String>,
3810 ) -> bool {
3811 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3813 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3814 }
3815 Self::is_narrow(data_block)
3817 }
3818
3819 fn repdef_too_sparse_for_miniblock(
3832 repdef: &crate::repdef::SerializedRepDefs,
3833 num_values: u64,
3834 ) -> bool {
3835 if num_values == 0 {
3836 return false;
3837 }
3838 let num_levels = repdef
3839 .repetition_levels
3840 .as_ref()
3841 .map(|r| r.len() as u64)
3842 .max(repdef.definition_levels.as_ref().map(|d| d.len() as u64))
3843 .unwrap_or(0);
3844 if num_levels == 0 {
3845 return false;
3846 }
3847
3848 let bits_per_rep = repdef
3850 .repetition_levels
3851 .as_ref()
3852 .and_then(|r| r.iter().max().copied())
3853 .map(|max_val| u16::BITS - max_val.leading_zeros())
3854 .unwrap_or(0) as u64;
3855 let bits_per_def = repdef
3856 .definition_levels
3857 .as_ref()
3858 .and_then(|d| d.iter().max().copied())
3859 .map(|max_val| u16::BITS - max_val.leading_zeros())
3860 .unwrap_or(0) as u64;
3861
3862 let bits_per_level = bits_per_rep + bits_per_def;
3863 if bits_per_level == 0 {
3864 return false;
3865 }
3866
3867 const REPDEF_BUDGET_BITS: u64 = 16 * 1024 * 8;
3869 let max_levels_per_chunk = REPDEF_BUDGET_BITS / bits_per_level;
3870
3871 let levels_per_chunk =
3874 (num_levels as f64 / num_values as f64) * *miniblock::MAX_MINIBLOCK_VALUES as f64;
3875
3876 levels_per_chunk > max_levels_per_chunk as f64
3877 }
3878
3879 fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3880 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3884 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3885 }
3886 true
3887 }
3888
3889 fn serialize_miniblocks(
3936 miniblocks: MiniBlockCompressed,
3937 rep: Option<Vec<CompressedLevelsChunk>>,
3938 def: Option<Vec<CompressedLevelsChunk>>,
3939 support_large_chunk: bool,
3940 ) -> Result<SerializedMiniBlockPage> {
3941 let bytes_rep = rep
3942 .as_ref()
3943 .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3944 .unwrap_or(0);
3945 let bytes_def = def
3946 .as_ref()
3947 .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3948 .unwrap_or(0);
3949 let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3950 let mut num_buffers = miniblocks.data.len();
3951 if rep.is_some() {
3952 num_buffers += 1;
3953 }
3954 if def.is_some() {
3955 num_buffers += 1;
3956 }
3957 let max_extra = 9 * num_buffers;
3959 let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3960 let chunk_size_bytes = if support_large_chunk { 4 } else { 2 };
3961 let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes);
3962
3963 let mut rep_iter = rep.map(|r| r.into_iter());
3964 let mut def_iter = def.map(|d| d.into_iter());
3965
3966 let mut buffer_offsets = vec![0; miniblocks.data.len()];
3967 for chunk in miniblocks.chunks {
3968 let start_pos = data_buffer.len();
3969 debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3971
3972 let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3973 let def = def_iter.as_mut().map(|d| d.next().unwrap());
3974
3975 let num_levels = rep
3977 .as_ref()
3978 .map(|r| r.num_levels)
3979 .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3980 data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3981
3982 if let Some(rep) = rep.as_ref() {
3984 let bytes_rep = u16::try_from(rep.data.len()).map_err(|_| {
3985 Error::internal(format!(
3986 "Repetition buffer size ({} bytes) too large",
3987 rep.data.len()
3988 ))
3989 })?;
3990 data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3991 }
3992 if let Some(def) = def.as_ref() {
3993 let bytes_def = u16::try_from(def.data.len()).map_err(|_| {
3994 Error::internal(format!(
3995 "Definition buffer size ({} bytes) too large",
3996 def.data.len()
3997 ))
3998 })?;
3999 data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
4000 }
4001
4002 if support_large_chunk {
4003 for &buffer_size in &chunk.buffer_sizes {
4004 data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
4005 }
4006 } else {
4007 for &buffer_size in &chunk.buffer_sizes {
4008 data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes());
4009 }
4010 }
4011
4012 let add_padding = |data_buffer: &mut Vec<u8>| {
4014 let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
4015 data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
4016 };
4017 add_padding(&mut data_buffer);
4018
4019 if let Some(rep) = rep.as_ref() {
4021 data_buffer.extend_from_slice(&rep.data);
4022 add_padding(&mut data_buffer);
4023 }
4024 if let Some(def) = def.as_ref() {
4025 data_buffer.extend_from_slice(&def.data);
4026 add_padding(&mut data_buffer);
4027 }
4028 for (buffer_size, (buffer, buffer_offset)) in chunk
4029 .buffer_sizes
4030 .iter()
4031 .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
4032 {
4033 let start = *buffer_offset;
4034 let end = start + *buffer_size as usize;
4035 *buffer_offset += *buffer_size as usize;
4036 data_buffer.extend_from_slice(&buffer[start..end]);
4037 add_padding(&mut data_buffer);
4038 }
4039
4040 let chunk_bytes = data_buffer.len() - start_pos;
4041 let max_chunk_size = if support_large_chunk {
4042 4 * 1024 * 1024 * 1024 } else {
4044 32 * 1024 };
4046 assert!(chunk_bytes <= max_chunk_size);
4047 assert!(chunk_bytes > 0);
4048 assert_eq!(chunk_bytes % 8, 0);
4049 assert!(chunk.log_num_values <= 12);
4051 let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
4055 let divided_bytes_minus_one = (divided_bytes - 1) as u64;
4056
4057 let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64;
4058 if support_large_chunk {
4059 meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes());
4060 } else {
4061 meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes());
4062 }
4063 }
4064
4065 let data_buffer = LanceBuffer::from(data_buffer);
4066 let metadata_buffer = LanceBuffer::from(meta_buffer);
4067
4068 Ok(SerializedMiniBlockPage {
4069 num_buffers: miniblocks.data.len() as u64,
4070 data: data_buffer,
4071 metadata: metadata_buffer,
4072 })
4073 }
4074
4075 fn compress_levels(
4080 mut levels: RepDefSlicer<'_>,
4081 num_elements: u64,
4082 compression_strategy: &dyn CompressionStrategy,
4083 chunks: &[MiniBlockChunk],
4084 max_rep: u16,
4086 ) -> Result<CompressedLevels> {
4087 let mut rep_index = if max_rep > 0 {
4088 Vec::with_capacity(chunks.len())
4089 } else {
4090 vec![]
4091 };
4092 let num_levels = levels.num_levels() as u64;
4094 let levels_buf = levels.all_levels().clone();
4095
4096 let mut fixed_width_block = FixedWidthDataBlock {
4097 data: levels_buf,
4098 bits_per_value: 16,
4099 num_values: num_levels,
4100 block_info: BlockInfo::new(),
4101 };
4102 fixed_width_block.compute_stat();
4104
4105 let levels_block = DataBlock::FixedWidth(fixed_width_block);
4106 let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4107 let (compressor, compressor_desc) =
4109 compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4110 let mut level_chunks = Vec::with_capacity(chunks.len());
4112 let mut values_counter = 0;
4113 for (chunk_idx, chunk) in chunks.iter().enumerate() {
4114 let chunk_num_values = chunk.num_values(values_counter, num_elements);
4115 debug_assert!(chunk_num_values > 0);
4116 values_counter += chunk_num_values;
4117 let chunk_levels = if chunk_idx < chunks.len() - 1 {
4118 levels.slice_next(chunk_num_values as usize)
4119 } else {
4120 levels.slice_rest()
4121 };
4122 let num_chunk_levels = (chunk_levels.len() / 2) as u64;
4123 if max_rep > 0 {
4124 let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
4134 let rep_values = rep_values.as_ref();
4135
4136 let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
4139 let num_leftovers = if chunk_idx < chunks.len() - 1 {
4140 rep_values
4141 .iter()
4142 .rev()
4143 .position(|v| *v == max_rep)
4144 .map(|pos| pos + 1)
4146 .unwrap_or(rep_values.len())
4147 } else {
4148 0
4150 };
4151
4152 if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
4153 let rep_len = rep_index.len();
4157 if rep_index[rep_len - 1] != 0 {
4158 rep_index[rep_len - 2] += 1;
4160 rep_index[rep_len - 1] = 0;
4161 }
4162 }
4163
4164 if chunk_idx == chunks.len() - 1 {
4165 num_rows += 1;
4167 }
4168 rep_index.push(num_rows as u64);
4169 rep_index.push(num_leftovers as u64);
4170 }
4171 let mut chunk_fixed_width = FixedWidthDataBlock {
4172 data: chunk_levels,
4173 bits_per_value: 16,
4174 num_values: num_chunk_levels,
4175 block_info: BlockInfo::new(),
4176 };
4177 chunk_fixed_width.compute_stat();
4178 let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
4179 let compressed_levels = compressor.compress(chunk_levels_block)?;
4180 level_chunks.push(CompressedLevelsChunk {
4181 data: compressed_levels,
4182 num_levels: num_chunk_levels as u16,
4183 });
4184 }
4185 debug_assert_eq!(levels.num_levels_remaining(), 0);
4186 let rep_index = if rep_index.is_empty() {
4187 None
4188 } else {
4189 Some(LanceBuffer::reinterpret_vec(rep_index))
4190 };
4191 Ok(CompressedLevels {
4192 data: level_chunks,
4193 compression: compressor_desc,
4194 rep_index,
4195 })
4196 }
4197
4198 fn encode_simple_all_null(
4199 column_idx: u32,
4200 num_rows: u64,
4201 row_number: u64,
4202 ) -> Result<EncodedPage> {
4203 let description =
4204 ProtobufUtils21::constant_layout(&[DefinitionInterpretation::NullableItem], None);
4205 Ok(EncodedPage {
4206 column_idx,
4207 data: vec![],
4208 description: PageEncoding::Structural(description),
4209 num_rows,
4210 row_number,
4211 })
4212 }
4213
4214 fn encode_complex_all_null_vals(
4215 data: &Arc<[u16]>,
4216 compression_strategy: &dyn CompressionStrategy,
4217 ) -> Result<(LanceBuffer, pb21::CompressiveEncoding)> {
4218 let buffer = LanceBuffer::reinterpret_slice(data.clone());
4219 let mut fixed_width_block = FixedWidthDataBlock {
4220 data: buffer,
4221 bits_per_value: 16,
4222 num_values: data.len() as u64,
4223 block_info: BlockInfo::new(),
4224 };
4225 fixed_width_block.compute_stat();
4226
4227 let levels_block = DataBlock::FixedWidth(fixed_width_block);
4228 let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4229 let (compressor, encoding) =
4230 compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4231 let compressed_buffer = compressor.compress(levels_block)?;
4232 Ok((compressed_buffer, encoding))
4233 }
4234
4235 fn encode_complex_all_null(
4239 column_idx: u32,
4240 repdef: crate::repdef::SerializedRepDefs,
4241 row_number: u64,
4242 num_rows: u64,
4243 version: LanceFileVersion,
4244 compression_strategy: &dyn CompressionStrategy,
4245 ) -> Result<EncodedPage> {
4246 if version.resolve() < LanceFileVersion::V2_2 {
4247 let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
4248 LanceBuffer::reinterpret_slice(rep.clone())
4249 } else {
4250 LanceBuffer::empty()
4251 };
4252
4253 let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
4254 LanceBuffer::reinterpret_slice(def.clone())
4255 } else {
4256 LanceBuffer::empty()
4257 };
4258
4259 let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None);
4260 return Ok(EncodedPage {
4261 column_idx,
4262 data: vec![rep_bytes, def_bytes],
4263 description: PageEncoding::Structural(description),
4264 num_rows,
4265 row_number,
4266 });
4267 }
4268
4269 let (rep_bytes, rep_encoding, num_rep_values) = if let Some(rep) =
4270 repdef.repetition_levels.as_ref()
4271 {
4272 let num_values = rep.len() as u64;
4273 let (buffer, encoding) = Self::encode_complex_all_null_vals(rep, compression_strategy)?;
4274 (buffer, Some(encoding), num_values)
4275 } else {
4276 (LanceBuffer::empty(), None, 0)
4277 };
4278
4279 let (def_bytes, def_encoding, num_def_values) = if let Some(def) =
4280 repdef.definition_levels.as_ref()
4281 {
4282 let num_values = def.len() as u64;
4283 let (buffer, encoding) = Self::encode_complex_all_null_vals(def, compression_strategy)?;
4284 (buffer, Some(encoding), num_values)
4285 } else {
4286 (LanceBuffer::empty(), None, 0)
4287 };
4288
4289 let description = ProtobufUtils21::compressed_all_null_constant_layout(
4290 &repdef.def_meaning,
4291 rep_encoding,
4292 def_encoding,
4293 num_rep_values,
4294 num_def_values,
4295 );
4296 Ok(EncodedPage {
4297 column_idx,
4298 data: vec![rep_bytes, def_bytes],
4299 description: PageEncoding::Structural(description),
4300 num_rows,
4301 row_number,
4302 })
4303 }
4304
4305 fn leaf_validity(
4306 repdef: &crate::repdef::SerializedRepDefs,
4307 num_values: usize,
4308 ) -> Result<Option<BooleanBuffer>> {
4309 let rep = repdef
4310 .repetition_levels
4311 .as_ref()
4312 .map(|rep| rep.as_ref().to_vec());
4313 let def = repdef
4314 .definition_levels
4315 .as_ref()
4316 .map(|def| def.as_ref().to_vec());
4317 let mut unraveler = RepDefUnraveler::new(
4318 rep,
4319 def,
4320 repdef.def_meaning.clone().into(),
4321 num_values as u64,
4322 );
4323 if unraveler.is_all_valid() {
4324 return Ok(None);
4325 }
4326 let mut validity = BooleanBufferBuilder::new(num_values);
4327 unraveler.unravel_validity(&mut validity);
4328 Ok(Some(validity.finish()))
4329 }
4330
4331 fn is_constant_values(
4332 arrays: &[ArrayRef],
4333 scalar: &ArrayRef,
4334 validity: Option<&BooleanBuffer>,
4335 ) -> Result<bool> {
4336 debug_assert_eq!(scalar.len(), 1);
4337 debug_assert_eq!(scalar.null_count(), 0);
4338
4339 match scalar.data_type() {
4340 DataType::Boolean => {
4341 let mut global_idx = 0usize;
4342 let scalar_val = scalar.as_boolean().value(0);
4343 for arr in arrays {
4344 let bool_arr = arr.as_boolean();
4345 for i in 0..arr.len() {
4346 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4347 global_idx += 1;
4348 if !is_valid {
4349 continue;
4350 }
4351 if bool_arr.value(i) != scalar_val {
4352 return Ok(false);
4353 }
4354 }
4355 }
4356 Ok(true)
4357 }
4358 DataType::Utf8 => Self::is_constant_utf8::<i32>(arrays, scalar, validity),
4359 DataType::LargeUtf8 => Self::is_constant_utf8::<i64>(arrays, scalar, validity),
4360 DataType::Binary => Self::is_constant_binary::<i32>(arrays, scalar, validity),
4361 DataType::LargeBinary => Self::is_constant_binary::<i64>(arrays, scalar, validity),
4362 data_type => {
4363 let mut global_idx = 0usize;
4364 let Some(byte_width) = data_type.byte_width_opt() else {
4365 return Ok(false);
4366 };
4367 let scalar_data = scalar.to_data();
4368 if scalar_data.buffers().len() != 1 || !scalar_data.child_data().is_empty() {
4369 return Ok(false);
4370 }
4371 let scalar_bytes = scalar_data.buffers()[0].as_slice();
4372 if scalar_bytes.len() != byte_width {
4373 return Ok(false);
4374 }
4375
4376 for arr in arrays {
4377 let data = arr.to_data();
4378 if data.buffers().is_empty() {
4379 return Ok(false);
4380 }
4381 let buf = data.buffers()[0].as_slice();
4382 let base = data.offset();
4383 for i in 0..arr.len() {
4384 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4385 global_idx += 1;
4386 if !is_valid {
4387 continue;
4388 }
4389 let start = (base + i) * byte_width;
4390 if buf[start..start + byte_width] != scalar_bytes[..] {
4391 return Ok(false);
4392 }
4393 }
4394 }
4395 Ok(true)
4396 }
4397 }
4398 }
4399
4400 fn is_constant_utf8<O: arrow_array::OffsetSizeTrait>(
4401 arrays: &[ArrayRef],
4402 scalar: &ArrayRef,
4403 validity: Option<&BooleanBuffer>,
4404 ) -> Result<bool> {
4405 debug_assert_eq!(scalar.len(), 1);
4406 let scalar_val = scalar.as_string::<O>().value(0).as_bytes();
4407 let mut global_idx = 0usize;
4408 for arr in arrays {
4409 let str_arr = arr.as_string::<O>();
4410 for i in 0..arr.len() {
4411 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4412 global_idx += 1;
4413 if !is_valid {
4414 continue;
4415 }
4416 if str_arr.value(i).as_bytes() != scalar_val {
4417 return Ok(false);
4418 }
4419 }
4420 }
4421 Ok(true)
4422 }
4423
4424 fn is_constant_binary<O: arrow_array::OffsetSizeTrait>(
4425 arrays: &[ArrayRef],
4426 scalar: &ArrayRef,
4427 validity: Option<&BooleanBuffer>,
4428 ) -> Result<bool> {
4429 debug_assert_eq!(scalar.len(), 1);
4430 let scalar_val = scalar.as_binary::<O>().value(0);
4431 let mut global_idx = 0usize;
4432 for arr in arrays {
4433 let bin_arr = arr.as_binary::<O>();
4434 for i in 0..arr.len() {
4435 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4436 global_idx += 1;
4437 if !is_valid {
4438 continue;
4439 }
4440 if bin_arr.value(i) != scalar_val {
4441 return Ok(false);
4442 }
4443 }
4444 }
4445 Ok(true)
4446 }
4447
4448 fn find_constant_scalar(
4449 arrays: &[ArrayRef],
4450 validity: Option<&BooleanBuffer>,
4451 ) -> Result<Option<ArrayRef>> {
4452 if arrays.is_empty() {
4453 return Ok(None);
4454 }
4455
4456 let global_scalar_idx = if let Some(validity) = validity {
4457 let Some(idx) = (0..validity.len()).find(|&i| validity.value(i)) else {
4458 return Ok(None);
4459 };
4460 idx
4461 } else {
4462 0
4463 };
4464
4465 let mut idx_remaining = global_scalar_idx;
4466 let mut scalar_arr_idx = 0usize;
4467 while scalar_arr_idx < arrays.len() {
4468 let len = arrays[scalar_arr_idx].len();
4469 if idx_remaining < len {
4470 break;
4471 }
4472 idx_remaining -= len;
4473 scalar_arr_idx += 1;
4474 }
4475
4476 if scalar_arr_idx >= arrays.len() {
4477 return Ok(None);
4478 }
4479
4480 let scalar =
4481 lance_arrow::scalar::extract_scalar_value(&arrays[scalar_arr_idx], idx_remaining)?;
4482 if scalar.null_count() != 0 {
4483 return Ok(None);
4484 }
4485 if !Self::is_constant_values(arrays, &scalar, validity)? {
4486 return Ok(None);
4487 }
4488 Ok(Some(scalar))
4489 }
4490
4491 fn resolve_dict_values_compression_metadata(
4492 field_metadata: &HashMap<String, String>,
4493 env_compression: Option<String>,
4494 env_compression_level: Option<String>,
4495 ) -> HashMap<String, String> {
4496 let mut metadata = HashMap::new();
4497
4498 let compression = field_metadata
4499 .get(DICT_VALUES_COMPRESSION_META_KEY)
4500 .cloned()
4501 .or(env_compression)
4502 .unwrap_or_else(|| DEFAULT_DICT_VALUES_COMPRESSION.to_string());
4503 metadata.insert(COMPRESSION_META_KEY.to_string(), compression);
4504
4505 if let Some(compression_level) = field_metadata
4506 .get(DICT_VALUES_COMPRESSION_LEVEL_META_KEY)
4507 .cloned()
4508 .or(env_compression_level)
4509 {
4510 metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), compression_level);
4511 }
4512
4513 metadata
4514 }
4515
4516 fn build_dict_values_compressor_field(field: &Field) -> Result<Field> {
4517 let mut dict_values_field = Field::new_arrow("", DataType::UInt16, false)?;
4522 dict_values_field.metadata = Self::resolve_dict_values_compression_metadata(
4523 &field.metadata,
4524 env::var(DICT_VALUES_COMPRESSION_ENV_VAR).ok(),
4525 env::var(DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR).ok(),
4526 );
4527 Ok(dict_values_field)
4528 }
4529
4530 #[allow(clippy::too_many_arguments)]
4531 fn encode_miniblock(
4532 column_idx: u32,
4533 field: &Field,
4534 compression_strategy: &dyn CompressionStrategy,
4535 data: DataBlock,
4536 repdef: crate::repdef::SerializedRepDefs,
4537 row_number: u64,
4538 dictionary_data: Option<DataBlock>,
4539 num_rows: u64,
4540 support_large_chunk: bool,
4541 ) -> Result<EncodedPage> {
4542 if let DataBlock::AllNull(_null_block) = data {
4543 unreachable!()
4546 }
4547
4548 let num_items = data.num_values();
4549
4550 let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
4551 let (compressed_data, value_encoding) = compressor.compress(data)?;
4552
4553 let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
4554
4555 let mut compressed_rep = repdef
4556 .rep_slicer()
4557 .map(|rep_slicer| {
4558 Self::compress_levels(
4559 rep_slicer,
4560 num_items,
4561 compression_strategy,
4562 &compressed_data.chunks,
4563 max_rep,
4564 )
4565 })
4566 .transpose()?;
4567
4568 let (rep_index, rep_index_depth) =
4569 match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
4570 Some(rep_index) => (Some(rep_index.clone()), 1),
4571 None => (None, 0),
4572 };
4573
4574 let mut compressed_def = repdef
4575 .def_slicer()
4576 .map(|def_slicer| {
4577 Self::compress_levels(
4578 def_slicer,
4579 num_items,
4580 compression_strategy,
4581 &compressed_data.chunks,
4582 0,
4583 )
4584 })
4585 .transpose()?;
4586
4587 let rep_data = compressed_rep
4593 .as_mut()
4594 .map(|cr| std::mem::take(&mut cr.data));
4595 let def_data = compressed_def
4596 .as_mut()
4597 .map(|cd| std::mem::take(&mut cd.data));
4598
4599 let serialized =
4600 Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk)?;
4601
4602 let mut data = Vec::with_capacity(4);
4604 data.push(serialized.metadata);
4605 data.push(serialized.data);
4606
4607 if let Some(dictionary_data) = dictionary_data {
4608 let num_dictionary_items = dictionary_data.num_values();
4609 let dict_values_field = Self::build_dict_values_compressor_field(field)?;
4610
4611 let (compressor, dictionary_encoding) = compression_strategy
4612 .create_block_compressor(&dict_values_field, &dictionary_data)?;
4613 let dictionary_buffer = compressor.compress(dictionary_data)?;
4614
4615 data.push(dictionary_buffer);
4616 if let Some(rep_index) = rep_index {
4617 data.push(rep_index);
4618 }
4619
4620 let description = ProtobufUtils21::miniblock_layout(
4621 compressed_rep.map(|cr| cr.compression),
4622 compressed_def.map(|cd| cd.compression),
4623 value_encoding,
4624 rep_index_depth,
4625 serialized.num_buffers,
4626 Some((dictionary_encoding, num_dictionary_items)),
4627 &repdef.def_meaning,
4628 num_items,
4629 support_large_chunk,
4630 );
4631 Ok(EncodedPage {
4632 num_rows,
4633 column_idx,
4634 data,
4635 description: PageEncoding::Structural(description),
4636 row_number,
4637 })
4638 } else {
4639 let description = ProtobufUtils21::miniblock_layout(
4640 compressed_rep.map(|cr| cr.compression),
4641 compressed_def.map(|cd| cd.compression),
4642 value_encoding,
4643 rep_index_depth,
4644 serialized.num_buffers,
4645 None,
4646 &repdef.def_meaning,
4647 num_items,
4648 support_large_chunk,
4649 );
4650
4651 if let Some(rep_index) = rep_index {
4652 let view = rep_index.borrow_to_typed_slice::<u64>();
4653 let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
4654 debug_assert_eq!(total, num_rows);
4655
4656 data.push(rep_index);
4657 }
4658
4659 Ok(EncodedPage {
4660 num_rows,
4661 column_idx,
4662 data,
4663 description: PageEncoding::Structural(description),
4664 row_number,
4665 })
4666 }
4667 }
4668
4669 fn serialize_full_zip_fixed(
4671 fixed: FixedWidthDataBlock,
4672 mut repdef: ControlWordIterator,
4673 num_values: u64,
4674 ) -> SerializedFullZip {
4675 let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
4676 let mut zipped_data = Vec::with_capacity(len);
4677
4678 let max_rep_index_val = if repdef.has_repetition() {
4679 len as u64
4680 } else {
4681 0
4683 };
4684 let mut rep_index_builder =
4685 BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
4686
4687 assert_eq!(
4690 fixed.bits_per_value % 8,
4691 0,
4692 "Non-byte aligned full-zip compression not yet supported"
4693 );
4694
4695 let bytes_per_value = fixed.bits_per_value as usize / 8;
4696 let mut offset = 0;
4697
4698 if bytes_per_value == 0 {
4699 while let Some(control) = repdef.append_next(&mut zipped_data) {
4701 if control.is_new_row {
4702 debug_assert!(offset <= len);
4704 unsafe { rep_index_builder.append(offset as u64) };
4706 }
4707 offset = zipped_data.len();
4708 }
4709 } else {
4710 let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
4712 while let Some(control) = repdef.append_next(&mut zipped_data) {
4713 if control.is_new_row {
4714 debug_assert!(offset <= len);
4716 unsafe { rep_index_builder.append(offset as u64) };
4718 }
4719 if control.is_visible {
4720 let value = data_iter.next().unwrap();
4721 zipped_data.extend_from_slice(value);
4722 }
4723 offset = zipped_data.len();
4724 }
4725 }
4726
4727 debug_assert_eq!(zipped_data.len(), len);
4728 unsafe {
4731 rep_index_builder.append(zipped_data.len() as u64);
4732 }
4733
4734 let zipped_data = LanceBuffer::from(zipped_data);
4735 let rep_index = rep_index_builder.into_data();
4736 let rep_index = if rep_index.is_empty() {
4737 None
4738 } else {
4739 Some(LanceBuffer::from(rep_index))
4740 };
4741 SerializedFullZip {
4742 values: zipped_data,
4743 repetition_index: rep_index,
4744 }
4745 }
4746
4747 fn serialize_full_zip_variable(
4751 variable: VariableWidthBlock,
4752 mut repdef: ControlWordIterator,
4753 num_items: u64,
4754 ) -> SerializedFullZip {
4755 let bytes_per_offset = variable.bits_per_offset as usize / 8;
4756 assert_eq!(
4757 variable.bits_per_offset % 8,
4758 0,
4759 "Only byte-aligned offsets supported"
4760 );
4761 let len = variable.data.len()
4762 + repdef.bytes_per_word() * num_items as usize
4763 + bytes_per_offset * variable.num_values as usize;
4764 let mut buf = Vec::with_capacity(len);
4765
4766 let max_rep_index_val = len as u64;
4767 let mut rep_index_builder =
4768 BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4769
4770 match bytes_per_offset {
4772 4 => {
4773 let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4774 let mut rep_offset = 0;
4775 let mut windows_iter = offs.as_ref().windows(2);
4776 while let Some(control) = repdef.append_next(&mut buf) {
4777 if control.is_new_row {
4778 debug_assert!(rep_offset <= len);
4780 unsafe { rep_index_builder.append(rep_offset as u64) };
4782 }
4783 if control.is_visible {
4784 let window = windows_iter.next().unwrap();
4785 if control.is_valid_item {
4786 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4787 buf.extend_from_slice(
4788 &variable.data[window[0] as usize..window[1] as usize],
4789 );
4790 }
4791 }
4792 rep_offset = buf.len();
4793 }
4794 }
4795 8 => {
4796 let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4797 let mut rep_offset = 0;
4798 let mut windows_iter = offs.as_ref().windows(2);
4799 while let Some(control) = repdef.append_next(&mut buf) {
4800 if control.is_new_row {
4801 debug_assert!(rep_offset <= len);
4803 unsafe { rep_index_builder.append(rep_offset as u64) };
4805 }
4806 if control.is_visible {
4807 let window = windows_iter.next().unwrap();
4808 if control.is_valid_item {
4809 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4810 buf.extend_from_slice(
4811 &variable.data[window[0] as usize..window[1] as usize],
4812 );
4813 }
4814 }
4815 rep_offset = buf.len();
4816 }
4817 }
4818 _ => panic!("Unsupported offset size"),
4819 }
4820
4821 debug_assert!(buf.len() <= len);
4824 unsafe {
4827 rep_index_builder.append(buf.len() as u64);
4828 }
4829
4830 let zipped_data = LanceBuffer::from(buf);
4831 let rep_index = rep_index_builder.into_data();
4832 debug_assert!(!rep_index.is_empty());
4833 let rep_index = Some(LanceBuffer::from(rep_index));
4834 SerializedFullZip {
4835 values: zipped_data,
4836 repetition_index: rep_index,
4837 }
4838 }
4839
4840 fn serialize_full_zip(
4843 compressed_data: PerValueDataBlock,
4844 repdef: ControlWordIterator,
4845 num_items: u64,
4846 ) -> SerializedFullZip {
4847 match compressed_data {
4848 PerValueDataBlock::Fixed(fixed) => {
4849 Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4850 }
4851 PerValueDataBlock::Variable(var) => {
4852 Self::serialize_full_zip_variable(var, repdef, num_items)
4853 }
4854 }
4855 }
4856
4857 fn expand_boolean_to_bytes(fixed: FixedWidthDataBlock) -> FixedWidthDataBlock {
4858 debug_assert_eq!(fixed.bits_per_value, 1);
4859 let num_values = fixed.num_values as usize;
4860 let bool_buf = BooleanBuffer::new(fixed.data.into_buffer(), 0, num_values);
4861 let expanded: Vec<u8> = (0..num_values).map(|i| bool_buf.value(i) as u8).collect();
4862 FixedWidthDataBlock {
4863 data: LanceBuffer::from(expanded),
4864 bits_per_value: 8,
4865 num_values: fixed.num_values,
4866 block_info: BlockInfo::new(),
4867 }
4868 }
4869
4870 fn encode_full_zip(
4871 column_idx: u32,
4872 field: &Field,
4873 compression_strategy: &dyn CompressionStrategy,
4874 data: DataBlock,
4875 repdef: crate::repdef::SerializedRepDefs,
4876 row_number: u64,
4877 num_lists: u64,
4878 ) -> Result<EncodedPage> {
4879 let max_rep = repdef
4880 .repetition_levels
4881 .as_ref()
4882 .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4883 let max_def = repdef
4884 .definition_levels
4885 .as_ref()
4886 .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4887
4888 let (num_items, num_visible_items) =
4892 if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4893 (rep_levels.len() as u64, data.num_values())
4896 } else {
4897 (data.num_values(), data.num_values())
4899 };
4900
4901 let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4902
4903 let repdef_iter = build_control_word_iterator(
4904 repdef.repetition_levels.as_deref(),
4905 max_rep,
4906 repdef.definition_levels.as_deref(),
4907 max_def,
4908 max_visible_def,
4909 num_items as usize,
4910 );
4911 let bits_rep = repdef_iter.bits_rep();
4912 let bits_def = repdef_iter.bits_def();
4913
4914 let data = match data {
4916 DataBlock::FixedWidth(fixed) if fixed.bits_per_value == 1 => {
4917 DataBlock::FixedWidth(Self::expand_boolean_to_bytes(fixed))
4918 }
4919 other => other,
4920 };
4921
4922 let compressor = compression_strategy.create_per_value(field, &data)?;
4923 let (compressed_data, value_encoding) = compressor.compress(data)?;
4924
4925 let description = match &compressed_data {
4926 PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4927 bits_rep,
4928 bits_def,
4929 fixed.bits_per_value as u32,
4930 value_encoding,
4931 &repdef.def_meaning,
4932 num_items as u32,
4933 num_visible_items as u32,
4934 ),
4935 PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4936 bits_rep,
4937 bits_def,
4938 variable.bits_per_offset as u32,
4939 value_encoding,
4940 &repdef.def_meaning,
4941 num_items as u32,
4942 num_visible_items as u32,
4943 ),
4944 };
4945
4946 let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4947
4948 let data = if let Some(repindex) = zipped.repetition_index {
4949 vec![zipped.values, repindex]
4950 } else {
4951 vec![zipped.values]
4952 };
4953
4954 Ok(EncodedPage {
4955 num_rows: num_lists,
4956 column_idx,
4957 data,
4958 description: PageEncoding::Structural(description),
4959 row_number,
4960 })
4961 }
4962
4963 fn should_dictionary_encode(
4964 data_block: &DataBlock,
4965 field: &Field,
4966 version: LanceFileVersion,
4967 ) -> Option<DictEncodingBudget> {
4968 const DEFAULT_SAMPLE_SIZE: usize = 4096;
4969 const DEFAULT_SAMPLE_UNIQUE_RATIO: f64 = 0.98;
4970
4971 match data_block {
4974 DataBlock::FixedWidth(fixed) => {
4975 if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
4976 return None;
4977 }
4978 if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
4979 return None;
4980 }
4981 if fixed.bits_per_value % 8 != 0 {
4982 return None;
4983 }
4984 }
4985 DataBlock::VariableWidth(var) => {
4986 if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
4987 return None;
4988 }
4989 }
4990 _ => return None,
4991 }
4992
4993 let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4995 .ok()
4996 .and_then(|val| val.parse().ok())
4997 .unwrap_or(100);
4998 if data_block.num_values() < too_small {
4999 return None;
5000 }
5001
5002 let num_values = data_block.num_values();
5003
5004 let divisor: u64 = field
5007 .metadata
5008 .get(DICT_DIVISOR_META_KEY)
5009 .and_then(|val| val.parse().ok())
5010 .or_else(|| {
5011 env::var("LANCE_ENCODING_DICT_DIVISOR")
5012 .ok()
5013 .and_then(|val| val.parse().ok())
5014 })
5015 .unwrap_or(DEFAULT_DICT_DIVISOR);
5016
5017 let max_cardinality: u64 = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
5018 .ok()
5019 .and_then(|val| val.parse().ok())
5020 .unwrap_or(DEFAULT_DICT_MAX_CARDINALITY);
5021
5022 let threshold_cardinality = num_values
5023 .checked_div(divisor.max(1))
5024 .unwrap_or(0)
5025 .min(max_cardinality);
5026 if threshold_cardinality == 0 {
5027 return None;
5028 }
5029
5030 let threshold_ratio = field
5032 .metadata
5033 .get(DICT_SIZE_RATIO_META_KEY)
5034 .and_then(|val| val.parse::<f64>().ok())
5035 .or_else(|| {
5036 env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
5037 .ok()
5038 .and_then(|val| val.parse().ok())
5039 })
5040 .unwrap_or(DEFAULT_DICT_SIZE_RATIO);
5041
5042 if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
5043 panic!(
5044 "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
5045 threshold_ratio
5046 );
5047 }
5048
5049 let data_size = data_block.data_size();
5050 if data_size == 0 {
5051 return None;
5052 }
5053
5054 let max_encoded_size = (data_size as f64 * threshold_ratio) as u64;
5055 let max_encoded_size = usize::try_from(max_encoded_size).ok()?;
5056
5057 if Self::sample_is_near_unique(
5059 data_block,
5060 DEFAULT_SAMPLE_SIZE,
5061 DEFAULT_SAMPLE_UNIQUE_RATIO,
5062 )? {
5063 return None;
5064 }
5065
5066 let max_dict_entries = u32::try_from(threshold_cardinality.min(i32::MAX as u64)).ok()?;
5067 Some(DictEncodingBudget {
5068 max_dict_entries,
5069 max_encoded_size,
5070 })
5071 }
5072
5073 fn sample_is_near_unique(
5079 data_block: &DataBlock,
5080 max_samples: usize,
5081 unique_ratio_threshold: f64,
5082 ) -> Option<bool> {
5083 use std::collections::HashSet;
5084
5085 if unique_ratio_threshold <= 0.0 || unique_ratio_threshold > 1.0 {
5086 return None;
5087 }
5088
5089 let num_values = usize::try_from(data_block.num_values()).ok()?;
5090 if num_values == 0 {
5091 return Some(false);
5092 }
5093
5094 let sample_count = num_values.min(max_samples).max(1);
5095 let step = (num_values / sample_count).max(1);
5097
5098 match data_block {
5099 DataBlock::FixedWidth(fixed) => match fixed.bits_per_value {
5100 64 => {
5101 let values = fixed.data.borrow_to_typed_slice::<u64>();
5102 let values = values.as_ref();
5103 let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
5104 for idx in (0..num_values).step_by(step).take(sample_count) {
5105 unique.insert(values.get(idx).copied()?);
5106 }
5107 let ratio = unique.len() as f64 / sample_count as f64;
5108 Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5110 }
5111 128 => {
5112 let values = fixed.data.borrow_to_typed_slice::<u128>();
5113 let values = values.as_ref();
5114 let mut unique: HashSet<u128> = HashSet::with_capacity(sample_count.min(1024));
5115 for idx in (0..num_values).step_by(step).take(sample_count) {
5116 unique.insert(values.get(idx).copied()?);
5117 }
5118 let ratio = unique.len() as f64 / sample_count as f64;
5119 Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5120 }
5121 _ => Some(false),
5122 },
5123 DataBlock::VariableWidth(var) => {
5124 use xxhash_rust::xxh3::xxh3_64;
5125
5126 let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
5128 match var.bits_per_offset {
5129 32 => {
5130 let offsets_ref = var.offsets.borrow_to_typed_slice::<u32>();
5131 let offsets: &[u32] = offsets_ref.as_ref();
5132 for i in (0..num_values).step_by(step).take(sample_count) {
5133 let start = usize::try_from(*offsets.get(i)?).ok()?;
5134 let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5135 if start > end || end > var.data.len() {
5136 return None;
5137 }
5138 unique.insert(xxh3_64(&var.data[start..end]));
5139 }
5140 }
5141 64 => {
5142 let offsets_ref = var.offsets.borrow_to_typed_slice::<u64>();
5143 let offsets: &[u64] = offsets_ref.as_ref();
5144 for i in (0..num_values).step_by(step).take(sample_count) {
5145 let start = usize::try_from(*offsets.get(i)?).ok()?;
5146 let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5147 if start > end || end > var.data.len() {
5148 return None;
5149 }
5150 unique.insert(xxh3_64(&var.data[start..end]));
5151 }
5152 }
5153 _ => return Some(false),
5154 }
5155 let ratio = unique.len() as f64 / sample_count as f64;
5156 Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5157 }
5158 _ => Some(false),
5159 }
5160 }
5161
    /// Builds the encode task for a batch of accumulated arrays and their rep/def builders.
    ///
    /// The encoder settings the task needs are copied or cloned out of `self` first so the
    /// closure handed to `spawn_cpu` can own them. The closure serializes the rep/def
    /// builders and then picks a page layout; the checks run in this order:
    ///
    /// 1. No values at all: `encode_complex_all_null`.
    /// 2. Leaf validity is present and has no set bits: `encode_simple_all_null` when every
    ///    builder reports simple validity, otherwise `encode_complex_all_null`.
    /// 3. The field is a struct with no children: an error if any builder is non-empty,
    ///    otherwise `encode_simple_all_null`.
    /// 4. Resolved file version >= 2.2 and `find_constant_scalar` yields a scalar:
    ///    `constant::encode_constant_page`.
    /// 5. A struct data block with a variable-width child: `encode_full_zip`.
    /// 6. Rep/def not too sparse and the block is already a dictionary block:
    ///    `encode_miniblock` over the indices with the dictionary attached.
    /// 7. Rep/def not too sparse and dictionary encoding is both approved and succeeds:
    ///    `encode_miniblock` over the indices with the dictionary attached.
    /// 8. Rep/def not too sparse and `prefers_miniblock`: `encode_miniblock`.
    /// 9. Rep/def too sparse, or `prefers_fullzip`: `encode_full_zip`.
    /// 10. Otherwise an invalid-input error naming the structural-encoding metadata key.
    ///
    /// Returns a vector holding the single boxed task.
    fn do_flush(
        &mut self,
        arrays: Vec<ArrayRef>,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Copy/clone what the task needs so the `move` closure does not borrow `self`.
        let column_idx = self.column_index;
        let compression_strategy = self.compression_strategy.clone();
        let field = self.field.clone();
        let encoding_metadata = self.encoding_metadata.clone();
        let support_large_chunk = self.support_large_chunk;
        let version = self.version;
        let task = spawn_cpu(move || {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            // These two flags must be computed before `serialize` consumes the builders.
            let is_simple_validity = repdefs.iter().all(|rd| rd.is_simple_validity());
            let has_repdef_info = repdefs.iter().any(|rd| !rd.is_empty());
            let repdef = RepDefBuilder::serialize(repdefs);

            // Nothing to store as values; only the rep/def information is encoded.
            if num_values == 0 {
                log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
                return Self::encode_complex_all_null(
                    column_idx,
                    repdef,
                    row_number,
                    num_rows,
                    version,
                    compression_strategy.as_ref(),
                );
            }

            // `all_null` is only true when a leaf validity bitmap exists and has no set bits.
            let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?;
            let all_null = leaf_validity
                .as_ref()
                .map(|validity| validity.count_set_bits() == 0)
                .unwrap_or(false);

            if all_null {
                return if is_simple_validity {
                    log::debug!(
                        "Encoding column {} with {} items ({} rows) using simple-null layout",
                        column_idx,
                        num_values,
                        num_rows
                    );
                    Self::encode_simple_all_null(column_idx, num_values, row_number)
                } else {
                    log::debug!(
                        "Encoding column {} with {} items ({} rows) using complex-null layout",
                        column_idx,
                        num_values,
                        num_rows
                    );
                    Self::encode_complex_all_null(
                        column_idx,
                        repdef,
                        row_number,
                        num_rows,
                        version,
                        compression_strategy.as_ref(),
                    )
                };
            }

            // Empty structs carry no data; they reuse the simple-null page unless rep/def
            // information is present, which is rejected.
            if let DataType::Struct(fields) = &field.data_type()
                && fields.is_empty()
            {
                if has_repdef_info {
                    return Err(Error::invalid_input_source(format!("Empty structs with rep/def information are not yet supported.  The field {} is an empty struct that either has nulls or is in a list.", field.name).into()));
                }
                return Self::encode_simple_all_null(column_idx, num_values, row_number);
            }

            let data_block = DataBlock::from_arrays(&arrays, num_values);

            // Constant pages are only attempted for resolved file version 2.2 and newer.
            if version.resolve() >= LanceFileVersion::V2_2
                && let Some(scalar) = Self::find_constant_scalar(&arrays, leaf_validity.as_ref())?
            {
                log::debug!(
                    "Encoding column {} with {} items ({} rows) using constant layout",
                    column_idx,
                    num_values,
                    num_rows
                );
                return constant::encode_constant_page(
                    column_idx,
                    scalar,
                    repdef,
                    row_number,
                    num_rows,
                );
            }

            // Struct blocks with a variable-width child always go to the full-zip layout.
            let requires_full_zip_packed_struct =
                if let DataBlock::Struct(ref struct_data_block) = data_block {
                    struct_data_block.has_variable_width_child()
                } else {
                    false
                };

            if requires_full_zip_packed_struct {
                log::debug!(
                    "Encoding column {} with {} items using full-zip packed struct layout",
                    column_idx,
                    num_values
                );
                return Self::encode_full_zip(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    data_block,
                    repdef,
                    row_number,
                    num_rows,
                );
            }

            let too_sparse = Self::repdef_too_sparse_for_miniblock(&repdef, num_values);

            if !too_sparse {
                // Input that is already dictionary encoded keeps its dictionary; only the
                // indices go through the mini-block path.
                if let DataBlock::Dictionary(dict) = data_block {
                    log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
                    let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
                    indices_data_block.compute_stat();
                    return Self::encode_miniblock(
                        column_idx,
                        &field,
                        compression_strategy.as_ref(),
                        indices_data_block,
                        repdef,
                        row_number,
                        Some(dictionary_data_block),
                        num_rows,
                        support_large_chunk,
                    );
                }
            } else {
                log::debug!(
                    "Encoding column {} with {} items using full-zip layout \
                     (rep/def too sparse for mini-block)",
                    column_idx,
                    num_values
                );
            }

            {
                // Dictionary encoding is skipped entirely when rep/def is too sparse.
                let dict_result = if too_sparse {
                    None
                } else {
                    Self::should_dictionary_encode(&data_block, &field, version)
                        .and_then(|budget| {
                            // NOTE(review): this is logged before `dictionary_encode` runs,
                            // so it also appears when that call returns `None`.
                            log::debug!(
                                "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
                                column_idx,
                                num_values
                            );
                            dict::dictionary_encode(
                                &data_block,
                                budget.max_dict_entries,
                                budget.max_encoded_size,
                            )
                        })
                };

                if let Some((indices_data_block, dictionary_data_block)) = dict_result {
                    Self::encode_miniblock(
                        column_idx,
                        &field,
                        compression_strategy.as_ref(),
                        indices_data_block,
                        repdef,
                        row_number,
                        Some(dictionary_data_block),
                        num_rows,
                        support_large_chunk,
                    )
                } else if !too_sparse && Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
                    log::debug!(
                        "Encoding column {} with {} items using mini-block layout",
                        column_idx,
                        num_values
                    );
                    Self::encode_miniblock(
                        column_idx,
                        &field,
                        compression_strategy.as_ref(),
                        data_block,
                        repdef,
                        row_number,
                        None,
                        num_rows,
                        support_large_chunk,
                    )
                } else if too_sparse || Self::prefers_fullzip(encoding_metadata.as_ref()) {
                    log::debug!(
                        "Encoding column {} with {} items using full-zip layout",
                        column_idx,
                        num_values
                    );
                    Self::encode_full_zip(
                        column_idx,
                        &field,
                        compression_strategy.as_ref(),
                        data_block,
                        repdef,
                        row_number,
                        num_rows,
                    )
                } else {
                    // Neither layout was selected by the preference helpers.
                    Err(Error::invalid_input_source(format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into()))
                }
            }
        })
        .boxed();
        Ok(vec![task])
    }
5393
5394 fn extract_validity_buf(
5395 array: Arc<dyn Array>,
5396 repdef: &mut RepDefBuilder,
5397 keep_original_array: bool,
5398 ) -> Result<Arc<dyn Array>> {
5399 if let Some(validity) = array.nulls() {
5400 if keep_original_array {
5401 repdef.add_validity_bitmap(validity.clone());
5402 } else {
5403 repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
5404 }
5405 let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
5406 Ok(make_array(data_no_nulls))
5407 } else {
5408 repdef.add_no_null(array.len());
5409 Ok(array)
5410 }
5411 }
5412
5413 fn extract_validity(
5414 mut array: Arc<dyn Array>,
5415 repdef: &mut RepDefBuilder,
5416 keep_original_array: bool,
5417 ) -> Result<Arc<dyn Array>> {
5418 match array.data_type() {
5419 DataType::Null => {
5420 repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
5421 Ok(array)
5422 }
5423 DataType::Dictionary(_, _) => {
5424 array = dict::normalize_dict_nulls(array)?;
5425 Self::extract_validity_buf(array, repdef, keep_original_array)
5426 }
5427 _ => Self::extract_validity_buf(array, repdef, keep_original_array),
5436 }
5437 }
5438}
5439
5440impl FieldEncoder for PrimitiveStructuralEncoder {
5441 fn maybe_encode(
5443 &mut self,
5444 array: ArrayRef,
5445 _external_buffers: &mut OutOfLineBuffers,
5446 mut repdef: RepDefBuilder,
5447 row_number: u64,
5448 num_rows: u64,
5449 ) -> Result<Vec<EncodeTask>> {
5450 let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
5451 self.accumulated_repdefs.push(repdef);
5452
5453 if let Some((arrays, row_number, num_rows)) =
5454 self.accumulation_queue.insert(array, row_number, num_rows)
5455 {
5456 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5457 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5458 } else {
5459 Ok(vec![])
5460 }
5461 }
5462
5463 fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
5465 if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
5466 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5467 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5468 } else {
5469 Ok(vec![])
5470 }
5471 }
5472
5473 fn num_columns(&self) -> u32 {
5474 1
5475 }
5476
5477 fn finish(
5478 &mut self,
5479 _external_buffers: &mut OutOfLineBuffers,
5480 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
5481 std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
5482 }
5483}
5484
5485#[cfg(test)]
5486#[allow(clippy::single_range_in_vec_init)]
5487mod tests {
5488 use super::{
5489 ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
5490 FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipReadSource,
5491 FullZipRepIndexDetails, FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor,
5492 PreambleAction, StructuralPageScheduler, VariableFullZipDecoder,
5493 };
5494 use crate::buffer::LanceBuffer;
5495 use crate::compression::DefaultDecompressionStrategy;
5496 use crate::constants::{
5497 COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
5498 DICT_VALUES_COMPRESSION_META_KEY, STRUCTURAL_ENCODING_META_KEY,
5499 STRUCTURAL_ENCODING_MINIBLOCK,
5500 };
5501 use crate::data::BlockInfo;
5502 use crate::decoder::PageEncoding;
5503 use crate::encodings::logical::primitive::{
5504 ChunkDrainInstructions, PrimitiveStructuralEncoder,
5505 };
5506 use crate::format::ProtobufUtils21;
5507 use crate::format::pb21;
5508 use crate::format::pb21::compressive_encoding::Compression;
5509 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
5510 use crate::version::LanceFileVersion;
5511 use arrow_array::{ArrayRef, Int8Array, StringArray};
5512 use arrow_schema::DataType;
5513 use std::collections::HashMap;
5514 use std::{collections::VecDeque, sync::Arc};
5515
5516 #[test]
5517 fn test_is_narrow() {
5518 let int8_array = Int8Array::from(vec![1, 2, 3]);
5519 let array_ref: ArrayRef = Arc::new(int8_array);
5520 let block = DataBlock::from_array(array_ref);
5521
5522 assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5523
5524 let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
5525 let block = DataBlock::from_array(string_array);
5526 assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5527
5528 let string_array = StringArray::from(vec![
5529 Some("hello world".repeat(100)),
5530 Some("world".to_string()),
5531 ]);
5532 let block = DataBlock::from_array(string_array);
5533 assert!((!PrimitiveStructuralEncoder::is_narrow(&block)));
5534 }
5535
    /// Exercises `DecodeMiniBlockTask::map_range` over a series of hand-written rep/def
    /// level vectors.
    ///
    /// Every `check(range, expected_item_range, expected_level_range)` call asserts that
    /// mapping `range` yields exactly that pair of ranges. Each scenario rebinds `rep`,
    /// `def`, `max_visible_def`, `total_items` (and sometimes `max_rep`) by shadowing, and
    /// then rebuilds the `check` closure so it captures the new bindings.
    ///
    /// NOTE(review): the scenario comments below describe only the literal inputs; the
    /// interpretation of individual level values lives in `map_range` itself — confirm
    /// there before relying on them.
    #[test]
    fn test_map_range() {
        // Scenario 1: rep and def both present (9 levels, 8 items), no preamble.
        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 8;
        let max_rep = 1;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(1..2, 3..5, 3..5);
        // This range maps to an empty item range but a non-empty level range.
        check(2..3, 5..5, 5..6);
        check(3..4, 5..8, 6..9);
        check(0..2, 0..5, 0..5);
        check(1..3, 3..5, 3..6);
        check(2..4, 5..8, 5..9);
        check(0..3, 0..5, 0..6);
        check(1..4, 3..8, 3..9);
        check(0..4, 0..8, 0..9);

        // Scenario 2: the first level has def above `max_visible_def` (4 levels, 3 items).
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..0, 0..1);
        check(1..2, 0..2, 1..3);
        check(2..3, 2..3, 3..4);
        check(0..2, 0..2, 0..3);
        check(1..3, 0..3, 1..4);
        check(0..3, 0..3, 0..4);

        // Scenario 3: the last level has def above `max_visible_def` (4 levels, 3 items).
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..3, 1..3);
        check(2..3, 3..3, 3..4);
        check(0..2, 0..3, 0..3);
        check(1..3, 1..3, 1..4);
        check(0..3, 0..3, 0..4);

        // Scenario 4: rep only, no def levels (6 levels, 6 items).
        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
        let def: Option<&[u16]> = None;
        let max_visible_def = 0;
        let total_items = 6;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..2, 0..2);
        check(1..2, 2..4, 2..4);
        check(2..3, 4..6, 4..6);
        check(0..2, 0..4, 0..4);
        check(1..3, 2..6, 2..6);
        check(0..3, 0..6, 0..6);

        // Scenario 5: def only, no rep levels; `max_visible_def` is 1 here, so every def
        // value in the vector is within the visible bound.
        let rep: Option<&[u16]> = None;
        let def = Some(vec![0, 0, 1, 0]);
        let max_visible_def = 1;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..2, 1..2);
        check(2..3, 2..3, 2..3);
        check(0..2, 0..2, 0..2);
        check(1..3, 1..3, 1..3);
        check(0..3, 0..3, 0..3);

        // Scenario 6: the rep vector starts with 0; checked first with the preamble taken
        // and then with the preamble skipped.
        let rep = Some(vec![0, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..3, 3..4);
        check(0..2, 1..3, 1..4);

        // Scenario 7: leading rep 0 again, with the def value above `max_visible_def` in
        // the second position; preamble taken, then skipped.
        let rep = Some(vec![0, 1, 1, 0]);
        let def = Some(vec![0, 1, 0, 0]);
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..2);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..1, 1..2);
        check(1..2, 1..3, 2..4);
        check(0..2, 1..3, 1..4);

        // Scenario 8: leading rep 0 with no def levels; preamble taken, then skipped.
        let rep = Some(vec![0, 1, 0, 1]);
        let def: Option<Vec<u16>> = None;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..4, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..4, 3..4);
        check(0..2, 1..4, 1..4);

        // Scenario 9: `max_rep` raised to 2 (6 levels, 4 items), no preamble.
        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
        let def = Some(vec![0, 1, 2, 0, 0, 0]);
        let max_rep = 2;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..3, 0..4, 0..6);
        check(0..1, 0..1, 0..2);
        check(1..2, 1..3, 2..5);
        check(2..3, 3..4, 5..6);

        // Scenario 10: `max_rep` back to 1; two leading rep 0 entries, preamble taken.
        // The first check passes an empty range (`0..0`).
        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
        let def = Some(vec![0, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..0, 0..1, 0..2);
        check(0..1, 0..3, 0..4);
        check(0..2, 0..4, 0..5);

        // Scenario 11: preamble skipped, with several def values above `max_visible_def`
        // ahead of the requested range.
        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(2..3, 2..4, 5..7);
    }
5894
5895 #[test]
5896 fn test_slice_batch_data_and_rebase_offsets_u32() {
5897 let data = LanceBuffer::copy_slice(b"0123456789abcdefghij");
5898 let offsets = LanceBuffer::reinterpret_vec(vec![6_u32, 8_u32, 8_u32, 12_u32]);
5899
5900 let (sliced_data, normalized_offsets) =
5901 VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5902 .unwrap();
5903
5904 assert_eq!(sliced_data.as_ref(), b"6789ab");
5905 let normalized = normalized_offsets.borrow_to_typed_slice::<u32>();
5906 assert_eq!(normalized.as_ref(), &[0, 2, 2, 6]);
5907 }
5908
5909 #[test]
5910 fn test_slice_batch_data_and_rebase_offsets_u64() {
5911 let data = LanceBuffer::copy_slice(b"abcdefghijklmnopqrstuvwxyz");
5912 let offsets = LanceBuffer::reinterpret_vec(vec![10_u64, 12_u64, 16_u64, 20_u64]);
5913
5914 let (sliced_data, normalized_offsets) =
5915 VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 64)
5916 .unwrap();
5917
5918 assert_eq!(sliced_data.as_ref(), b"klmnopqrst");
5919 let normalized = normalized_offsets.borrow_to_typed_slice::<u64>();
5920 assert_eq!(normalized.as_ref(), &[0, 2, 6, 10]);
5921 }
5922
5923 #[test]
5924 fn test_slice_batch_data_and_rebase_offsets_rejects_invalid_offsets() {
5925 let data = LanceBuffer::copy_slice(b"abcd");
5926 let offsets = LanceBuffer::reinterpret_vec(vec![3_u32, 2_u32]);
5927
5928 let err = VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5929 .expect_err("offset end before start should error");
5930 assert!(err.to_string().contains("less than base"));
5931 }
5932
5933 #[test]
5934 fn test_schedule_instructions() {
5935 let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5937 let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5938 let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5939
5940 let check = |user_ranges, expected_instructions| {
5941 let instructions =
5942 ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
5943 assert_eq!(instructions, expected_instructions);
5944 };
5945
5946 let expected_take_all = vec![
5948 ChunkInstructions {
5949 chunk_idx: 0,
5950 preamble: PreambleAction::Absent,
5951 rows_to_skip: 0,
5952 rows_to_take: 6,
5953 take_trailer: true,
5954 },
5955 ChunkInstructions {
5956 chunk_idx: 1,
5957 preamble: PreambleAction::Take,
5958 rows_to_skip: 0,
5959 rows_to_take: 2,
5960 take_trailer: false,
5961 },
5962 ChunkInstructions {
5963 chunk_idx: 2,
5964 preamble: PreambleAction::Absent,
5965 rows_to_skip: 0,
5966 rows_to_take: 5,
5967 take_trailer: true,
5968 },
5969 ChunkInstructions {
5970 chunk_idx: 3,
5971 preamble: PreambleAction::Take,
5972 rows_to_skip: 0,
5973 rows_to_take: 1,
5974 take_trailer: false,
5975 },
5976 ];
5977
5978 check(&[0..14], expected_take_all.clone());
5980
5981 check(
5983 &[
5984 0..1,
5985 1..2,
5986 2..3,
5987 3..4,
5988 4..5,
5989 5..6,
5990 6..7,
5991 7..8,
5992 8..9,
5993 9..10,
5994 10..11,
5995 11..12,
5996 12..13,
5997 13..14,
5998 ],
5999 expected_take_all,
6000 );
6001
6002 check(
6006 &[0..1, 3..4],
6007 vec![
6008 ChunkInstructions {
6009 chunk_idx: 0,
6010 preamble: PreambleAction::Absent,
6011 rows_to_skip: 0,
6012 rows_to_take: 1,
6013 take_trailer: false,
6014 },
6015 ChunkInstructions {
6016 chunk_idx: 0,
6017 preamble: PreambleAction::Absent,
6018 rows_to_skip: 3,
6019 rows_to_take: 1,
6020 take_trailer: false,
6021 },
6022 ],
6023 );
6024
6025 check(
6027 &[5..6],
6028 vec![
6029 ChunkInstructions {
6030 chunk_idx: 0,
6031 preamble: PreambleAction::Absent,
6032 rows_to_skip: 5,
6033 rows_to_take: 1,
6034 take_trailer: true,
6035 },
6036 ChunkInstructions {
6037 chunk_idx: 1,
6038 preamble: PreambleAction::Take,
6039 rows_to_skip: 0,
6040 rows_to_take: 0,
6041 take_trailer: false,
6042 },
6043 ],
6044 );
6045
6046 check(
6048 &[7..10],
6049 vec![
6050 ChunkInstructions {
6051 chunk_idx: 1,
6052 preamble: PreambleAction::Skip,
6053 rows_to_skip: 1,
6054 rows_to_take: 1,
6055 take_trailer: false,
6056 },
6057 ChunkInstructions {
6058 chunk_idx: 2,
6059 preamble: PreambleAction::Absent,
6060 rows_to_skip: 0,
6061 rows_to_take: 2,
6062 take_trailer: false,
6063 },
6064 ],
6065 );
6066 }
6067
6068 #[test]
6069 fn test_drain_instructions() {
6070 fn drain_from_instructions(
6071 instructions: &mut VecDeque<ChunkInstructions>,
6072 mut rows_desired: u64,
6073 need_preamble: &mut bool,
6074 skip_in_chunk: &mut u64,
6075 ) -> Vec<ChunkDrainInstructions> {
6076 let mut drain_instructions = Vec::with_capacity(instructions.len());
6078 while rows_desired > 0 || *need_preamble {
6079 let (next_instructions, consumed_chunk) = instructions
6080 .front()
6081 .unwrap()
6082 .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
6083 if consumed_chunk {
6084 instructions.pop_front();
6085 }
6086 drain_instructions.push(next_instructions);
6087 }
6088 drain_instructions
6089 }
6090
6091 let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
6093 let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
6094 let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
6095 let user_ranges = vec![1..7, 10..14];
6096
6097 let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
6099
6100 let mut to_drain = VecDeque::from(scheduled.clone());
6101
6102 let mut need_preamble = false;
6105 let mut skip_in_chunk = 0;
6106
6107 let next_batch =
6108 drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
6109
6110 assert!(!need_preamble);
6111 assert_eq!(skip_in_chunk, 4);
6112 assert_eq!(
6113 next_batch,
6114 vec![ChunkDrainInstructions {
6115 chunk_instructions: scheduled[0].clone(),
6116 rows_to_take: 4,
6117 rows_to_skip: 0,
6118 preamble_action: PreambleAction::Absent,
6119 }]
6120 );
6121
6122 let next_batch =
6123 drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
6124
6125 assert!(!need_preamble);
6126 assert_eq!(skip_in_chunk, 2);
6127
6128 assert_eq!(
6129 next_batch,
6130 vec![
6131 ChunkDrainInstructions {
6132 chunk_instructions: scheduled[0].clone(),
6133 rows_to_take: 1,
6134 rows_to_skip: 4,
6135 preamble_action: PreambleAction::Absent,
6136 },
6137 ChunkDrainInstructions {
6138 chunk_instructions: scheduled[1].clone(),
6139 rows_to_take: 1,
6140 rows_to_skip: 0,
6141 preamble_action: PreambleAction::Take,
6142 },
6143 ChunkDrainInstructions {
6144 chunk_instructions: scheduled[2].clone(),
6145 rows_to_take: 2,
6146 rows_to_skip: 0,
6147 preamble_action: PreambleAction::Absent,
6148 }
6149 ]
6150 );
6151
6152 let next_batch =
6153 drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
6154
6155 assert!(!need_preamble);
6156 assert_eq!(skip_in_chunk, 0);
6157
6158 assert_eq!(
6159 next_batch,
6160 vec![
6161 ChunkDrainInstructions {
6162 chunk_instructions: scheduled[2].clone(),
6163 rows_to_take: 1,
6164 rows_to_skip: 2,
6165 preamble_action: PreambleAction::Absent,
6166 },
6167 ChunkDrainInstructions {
6168 chunk_instructions: scheduled[3].clone(),
6169 rows_to_take: 1,
6170 rows_to_skip: 0,
6171 preamble_action: PreambleAction::Take,
6172 },
6173 ]
6174 );
6175
6176 let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
6178 let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
6179 let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
6180 let user_ranges = vec![0..28];
6181
6182 let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
6184
6185 let mut to_drain = VecDeque::from(scheduled.clone());
6186
6187 let mut need_preamble = false;
6190 let mut skip_in_chunk = 0;
6191
6192 let next_batch =
6193 drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
6194
6195 assert_eq!(
6196 next_batch,
6197 vec![
6198 ChunkDrainInstructions {
6199 chunk_instructions: scheduled[0].clone(),
6200 rows_to_take: 6,
6201 rows_to_skip: 0,
6202 preamble_action: PreambleAction::Absent,
6203 },
6204 ChunkDrainInstructions {
6205 chunk_instructions: scheduled[1].clone(),
6206 rows_to_take: 1,
6207 rows_to_skip: 0,
6208 preamble_action: PreambleAction::Take,
6209 },
6210 ]
6211 );
6212
6213 assert!(!need_preamble);
6214 assert_eq!(skip_in_chunk, 1);
6215
6216 let next_batch =
6219 drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
6220
6221 assert_eq!(
6222 next_batch,
6223 vec![
6224 ChunkDrainInstructions {
6225 chunk_instructions: scheduled[1].clone(),
6226 rows_to_take: 2,
6227 rows_to_skip: 1,
6228 preamble_action: PreambleAction::Skip,
6229 },
6230 ChunkDrainInstructions {
6231 chunk_instructions: scheduled[2].clone(),
6232 rows_to_take: 0,
6233 rows_to_skip: 0,
6234 preamble_action: PreambleAction::Take,
6235 },
6236 ]
6237 );
6238
6239 assert!(!need_preamble);
6240 assert_eq!(skip_in_chunk, 0);
6241 }
6242
6243 #[tokio::test]
6244 async fn test_fullzip_initialize_is_lazy() {
6245 use futures::{FutureExt, future::BoxFuture};
6246 use std::ops::Range;
6247 use std::sync::Mutex;
6248
6249 #[derive(Debug, Clone)]
6250 struct RecordingScheduler {
6251 data: bytes::Bytes,
6252 requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6253 }
6254
6255 impl RecordingScheduler {
6256 fn new(data: bytes::Bytes) -> Self {
6257 Self {
6258 data,
6259 requests: Arc::new(Mutex::new(Vec::new())),
6260 }
6261 }
6262
6263 fn requests(&self) -> Vec<Vec<Range<u64>>> {
6264 self.requests.lock().unwrap().clone()
6265 }
6266 }
6267
6268 impl crate::EncodingsIo for RecordingScheduler {
6269 fn submit_request(
6270 &self,
6271 ranges: Vec<Range<u64>>,
6272 _priority: u64,
6273 ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6274 self.requests.lock().unwrap().push(ranges.clone());
6275 let data = ranges
6276 .into_iter()
6277 .map(|range| self.data.slice(range.start as usize..range.end as usize))
6278 .collect::<Vec<_>>();
6279 std::future::ready(Ok(data)).boxed()
6280 }
6281 }
6282
6283 #[derive(Debug)]
6284 struct TestFixedDecompressor;
6285
6286 impl FixedPerValueDecompressor for TestFixedDecompressor {
6287 fn decompress(
6288 &self,
6289 _data: FixedWidthDataBlock,
6290 _num_rows: u64,
6291 ) -> crate::Result<DataBlock> {
6292 unimplemented!("Test decompressor")
6293 }
6294
6295 fn bits_per_value(&self) -> u64 {
6296 32
6297 }
6298 }
6299
6300 let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(vec![
6301 0;
6302 16 * 1024
6303 ])));
6304 let mut scheduler = FullZipScheduler {
6305 data_buf_position: 0,
6306 data_buf_size: 4096,
6307 rep_index: Some(FullZipRepIndexDetails {
6308 buf_position: 1000,
6309 bytes_per_value: 4,
6310 }),
6311 priority: 0,
6312 rows_in_page: 100,
6313 bits_per_offset: 32,
6314 details: Arc::new(FullZipDecodeDetails {
6315 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6316 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6317 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6318 max_rep: 0,
6319 max_visible_def: 0,
6320 }),
6321 cached_state: None,
6322 enable_cache: false,
6323 };
6324
6325 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6326 let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6327
6328 assert!(
6329 cached_data
6330 .as_arc_any()
6331 .downcast_ref::<super::NoCachedPageData>()
6332 .is_some(),
6333 "FullZip initialize should not eagerly load repetition index data"
6334 );
6335 assert!(scheduler.cached_state.is_none());
6336 assert!(
6337 io.requests().is_empty(),
6338 "FullZip initialize should not issue any I/O"
6339 );
6340 }
6341
6342 #[tokio::test]
6343 async fn test_fullzip_read_source_slices_prefetched_page() {
6344 let page_start = 200_u64;
6345 let page_data = LanceBuffer::copy_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
6346 let source = FullZipReadSource::PrefetchedPage {
6347 base_offset: page_start,
6348 data: page_data,
6349 };
6350 let ranges = vec![
6351 page_start..(page_start + 3),
6352 (page_start + 4)..(page_start + 8),
6353 ];
6354 let mut data = source.fetch(&ranges, 0).await.unwrap();
6355 assert_eq!(data.pop_front().unwrap().as_ref(), &[0, 1, 2]);
6356 assert_eq!(data.pop_front().unwrap().as_ref(), &[4, 5, 6, 7]);
6357 }
6358
6359 #[tokio::test]
6360 async fn test_fullzip_initialize_caches_rep_index_when_enabled() {
6361 use futures::{FutureExt, future::BoxFuture};
6362 use std::ops::Range;
6363 use std::sync::Mutex;
6364
6365 #[derive(Debug, Clone)]
6366 struct RecordingScheduler {
6367 data: bytes::Bytes,
6368 requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6369 }
6370
6371 impl RecordingScheduler {
6372 fn new(data: bytes::Bytes) -> Self {
6373 Self {
6374 data,
6375 requests: Arc::new(Mutex::new(Vec::new())),
6376 }
6377 }
6378
6379 fn requests(&self) -> Vec<Vec<Range<u64>>> {
6380 self.requests.lock().unwrap().clone()
6381 }
6382 }
6383
6384 impl crate::EncodingsIo for RecordingScheduler {
6385 fn submit_request(
6386 &self,
6387 ranges: Vec<Range<u64>>,
6388 _priority: u64,
6389 ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6390 self.requests.lock().unwrap().push(ranges.clone());
6391 let data = ranges
6392 .into_iter()
6393 .map(|range| self.data.slice(range.start as usize..range.end as usize))
6394 .collect::<Vec<_>>();
6395 std::future::ready(Ok(data)).boxed()
6396 }
6397 }
6398
6399 #[derive(Debug)]
6400 struct TestFixedDecompressor;
6401
6402 impl FixedPerValueDecompressor for TestFixedDecompressor {
6403 fn decompress(
6404 &self,
6405 _data: FixedWidthDataBlock,
6406 _num_rows: u64,
6407 ) -> crate::Result<DataBlock> {
6408 unimplemented!("Test decompressor")
6409 }
6410
6411 fn bits_per_value(&self) -> u64 {
6412 32
6413 }
6414 }
6415
6416 let rows_in_page = 100_u64;
6417 let bytes_per_value = 4_u64;
6418 let rep_start = 1000_u64;
6419 let rep_size = ((rows_in_page + 1) * bytes_per_value) as usize;
6420 let mut data = vec![0_u8; 16 * 1024];
6421 data[rep_start as usize..rep_start as usize + rep_size].fill(7);
6422 let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(data)));
6423
6424 let mut scheduler = FullZipScheduler {
6425 data_buf_position: 0,
6426 data_buf_size: 4096,
6427 rep_index: Some(FullZipRepIndexDetails {
6428 buf_position: rep_start,
6429 bytes_per_value,
6430 }),
6431 priority: 0,
6432 rows_in_page,
6433 bits_per_offset: 32,
6434 details: Arc::new(FullZipDecodeDetails {
6435 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6436 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6437 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6438 max_rep: 0,
6439 max_visible_def: 0,
6440 }),
6441 cached_state: None,
6442 enable_cache: true,
6443 };
6444
6445 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6446 let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6447 assert!(
6448 cached_data
6449 .as_arc_any()
6450 .downcast_ref::<FullZipCacheableState>()
6451 .is_some()
6452 );
6453 assert!(scheduler.cached_state.is_some());
6454 assert_eq!(
6455 io.requests(),
6456 vec![vec![
6457 rep_start..(rep_start + (rows_in_page + 1) * bytes_per_value)
6458 ]]
6459 );
6460 }
6461
6462 #[tokio::test]
6463 async fn test_fullzip_full_page_bypasses_rep_index_io() {
6464 use futures::{FutureExt, future::BoxFuture};
6465 use std::ops::Range;
6466 use std::sync::Mutex;
6467
6468 #[derive(Debug, Clone)]
6469 struct RecordingScheduler {
6470 data: bytes::Bytes,
6471 requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6472 }
6473
6474 impl RecordingScheduler {
6475 fn new(data: bytes::Bytes) -> Self {
6476 Self {
6477 data,
6478 requests: Arc::new(Mutex::new(Vec::new())),
6479 }
6480 }
6481
6482 fn requests(&self) -> Vec<Vec<Range<u64>>> {
6483 self.requests.lock().unwrap().clone()
6484 }
6485 }
6486
6487 impl crate::EncodingsIo for RecordingScheduler {
6488 fn submit_request(
6489 &self,
6490 ranges: Vec<Range<u64>>,
6491 _priority: u64,
6492 ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6493 self.requests.lock().unwrap().push(ranges.clone());
6494 let data = ranges
6495 .into_iter()
6496 .map(|range| self.data.slice(range.start as usize..range.end as usize))
6497 .collect::<Vec<_>>();
6498 std::future::ready(Ok(data)).boxed()
6499 }
6500 }
6501
6502 #[derive(Debug)]
6503 struct TestFixedDecompressor;
6504
6505 impl FixedPerValueDecompressor for TestFixedDecompressor {
6506 fn decompress(
6507 &self,
6508 _data: FixedWidthDataBlock,
6509 _num_rows: u64,
6510 ) -> crate::Result<DataBlock> {
6511 unimplemented!("Test decompressor")
6512 }
6513
6514 fn bits_per_value(&self) -> u64 {
6515 32
6516 }
6517 }
6518
6519 let rows_in_page = 100_u64;
6520 let data_start = 256_u64;
6521 let data_size = 500_u64;
6522 let rep_start = 4096_u64;
6523 let bytes_per_value = 4_u64;
6524
6525 let mut bytes = vec![0_u8; 16 * 1024];
6526 for i in 0..=rows_in_page {
6527 let offset = (i * 5) as u32;
6528 let pos = rep_start as usize + (i * bytes_per_value) as usize;
6529 bytes[pos..pos + 4].copy_from_slice(&offset.to_le_bytes());
6530 }
6531 let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(bytes)));
6532
6533 let scheduler = FullZipScheduler {
6534 data_buf_position: data_start,
6535 data_buf_size: data_size,
6536 rep_index: Some(FullZipRepIndexDetails {
6537 buf_position: rep_start,
6538 bytes_per_value,
6539 }),
6540 priority: 0,
6541 rows_in_page,
6542 bits_per_offset: 32,
6543 details: Arc::new(FullZipDecodeDetails {
6544 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6545 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6546 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6547 max_rep: 0,
6548 max_visible_def: 0,
6549 }),
6550 cached_state: None,
6551 enable_cache: false,
6552 };
6553
6554 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6555 let tasks = scheduler
6556 .schedule_ranges_rep(
6557 &[0..rows_in_page],
6558 &io_dyn,
6559 FullZipRepIndexDetails {
6560 buf_position: rep_start,
6561 bytes_per_value,
6562 },
6563 )
6564 .unwrap();
6565
6566 let requests = io.requests();
6567 assert_eq!(requests.len(), 1);
6568 assert_eq!(requests[0], vec![data_start..(data_start + data_size)]);
6569
6570 let _ = tasks.into_iter().next().unwrap().decoder_fut.await.unwrap();
6571 let requests_after_await = io.requests();
6572 assert_eq!(
6573 requests_after_await.len(),
6574 1,
6575 "full page path should not issue rep-index I/O"
6576 );
6577 }
6578
6579 #[tokio::test]
6581 async fn test_fuzz_issue_4492_empty_rep_values() {
6582 use lance_datagen::{RowCount, Seed, array, gen_batch};
6583
6584 let seed = 1823859942947654717u64;
6585 let num_rows = 2741usize;
6586
6587 let batch_gen = gen_batch().with_seed(Seed::from(seed));
6589 let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
6590 let list_generator = array::rand_list_any(base_generator, false);
6591
6592 let batch = batch_gen
6593 .anon_col(list_generator)
6594 .into_batch_rows(RowCount::from(num_rows as u64))
6595 .unwrap();
6596
6597 let list_array = batch.column(0).clone();
6598
6599 let mut metadata = HashMap::new();
6601 metadata.insert(
6602 STRUCTURAL_ENCODING_META_KEY.to_string(),
6603 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6604 );
6605
6606 let test_cases = TestCases::default()
6607 .with_min_file_version(LanceFileVersion::V2_1)
6608 .with_batch_size(100)
6609 .with_range(0..num_rows.min(500) as u64)
6610 .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);
6611
6612 check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
6613 }
6614
6615 async fn test_minichunk_size_helper(
6616 string_data: Vec<Option<String>>,
6617 minichunk_size: u64,
6618 file_version: LanceFileVersion,
6619 ) {
6620 use crate::constants::MINICHUNK_SIZE_META_KEY;
6621 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6622 use arrow_array::{ArrayRef, StringArray};
6623 use std::sync::Arc;
6624
6625 let string_array: ArrayRef = Arc::new(StringArray::from(string_data));
6626
6627 let mut metadata = HashMap::new();
6628 metadata.insert(
6629 MINICHUNK_SIZE_META_KEY.to_string(),
6630 minichunk_size.to_string(),
6631 );
6632 metadata.insert(
6633 STRUCTURAL_ENCODING_META_KEY.to_string(),
6634 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6635 );
6636
6637 let test_cases = TestCases::default()
6638 .with_min_file_version(file_version)
6639 .with_batch_size(1000);
6640
6641 check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await;
6642 }
6643
6644 #[tokio::test]
6645 async fn test_minichunk_size_roundtrip() {
6646 let mut string_data = Vec::new();
6648 for i in 0..100 {
6649 string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6650 }
6651 test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await;
6653 }
6654
6655 #[tokio::test]
6656 async fn test_minichunk_size_128kb_v2_2() {
6657 let mut string_data = Vec::new();
6659 for i in 0..10000 {
6661 string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6662 }
6663 test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6664 }
6665
6666 #[tokio::test]
6667 async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
6668 let mut string_data = Vec::new();
6669 for i in 0..10000 {
6671 string_data.push(Some(format!("t_{}", i)));
6672 }
6673 test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6674 }
6675
6676 #[tokio::test]
6677 async fn test_large_dictionary_general_compression() {
6678 use arrow_array::{ArrayRef, StringArray};
6679 use std::collections::HashMap;
6680 use std::sync::Arc;
6681
6682 let unique_values: Vec<String> = (0..100)
6685 .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
6686 .collect();
6687
6688 let repeated_strings: Vec<_> = unique_values
6690 .iter()
6691 .cycle()
6692 .take(100_000)
6693 .map(|s| Some(s.as_str()))
6694 .collect();
6695
6696 let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
6697
6698 let test_cases = TestCases::default()
6700 .with_min_file_version(LanceFileVersion::V2_2)
6701 .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
6702 assert_eq!(cols.len(), 1);
6703 let col = &cols[0];
6704
6705 if let Some(PageEncoding::Structural(page_layout)) =
6707 &col.final_pages.first().map(|p| &p.description)
6708 && let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
6709 &page_layout.layout
6710 && let Some(dictionary_encoding) = &mini_block.dictionary
6711 {
6712 match dictionary_encoding.compression.as_ref() {
6713 Some(Compression::General(general)) => {
6714 let compression = general.compression.as_ref().unwrap();
6716 assert!(
6717 compression.scheme()
6718 == pb21::CompressionScheme::CompressionAlgorithmLz4
6719 || compression.scheme()
6720 == pb21::CompressionScheme::CompressionAlgorithmZstd,
6721 "Expected LZ4 or Zstd compression for large dictionary"
6722 );
6723 }
6724 _ => panic!("Expected General compression for large dictionary"),
6725 }
6726 }
6727 }));
6728
6729 check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
6730 }
6731
6732 fn dictionary_encoding_from_page(
6733 page: &crate::encoder::EncodedPage,
6734 ) -> &crate::format::pb21::CompressiveEncoding {
6735 let PageEncoding::Structural(layout) = &page.description else {
6736 panic!("Expected structural page encoding");
6737 };
6738 let pb21::page_layout::Layout::MiniBlockLayout(layout) = layout.layout.as_ref().unwrap()
6739 else {
6740 panic!("Expected mini-block layout");
6741 };
6742 layout
6743 .dictionary
6744 .as_ref()
6745 .unwrap_or_else(|| panic!("Expected dictionary encoding"))
6746 }
6747
6748 async fn encode_variable_dict_page(
6749 metadata: HashMap<String, String>,
6750 ) -> crate::encoder::EncodedPage {
6751 use arrow_array::types::Int32Type;
6752 use arrow_array::{ArrayRef, DictionaryArray, Int32Array, StringArray};
6753
6754 let values = Arc::new(StringArray::from(
6755 (0..128)
6756 .map(|i| format!("value_{i:04}_{}", "x".repeat(256)))
6757 .collect::<Vec<_>>(),
6758 )) as ArrayRef;
6759 let keys = Int32Array::from_iter_values((0..20_000).map(|i| i % 128));
6760 let dict_array =
6761 Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap()) as ArrayRef;
6762
6763 let field = arrow_schema::Field::new(
6764 "dict_col",
6765 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
6766 false,
6767 )
6768 .with_metadata(metadata);
6769
6770 encode_first_page(field, dict_array, LanceFileVersion::V2_2).await
6771 }
6772
6773 async fn encode_auto_fixed_dict_page(
6774 metadata: HashMap<String, String>,
6775 ) -> crate::encoder::EncodedPage {
6776 use arrow_array::{ArrayRef, Decimal128Array};
6777
6778 let values = (0..20_000)
6780 .map(|i| match i % 3 {
6781 0 => 10_i128,
6782 1 => 20_i128,
6783 _ => 30_i128,
6784 })
6785 .collect::<Vec<_>>();
6786 let decimal = Decimal128Array::from_iter_values(values)
6787 .with_precision_and_scale(38, 0)
6788 .unwrap();
6789 let decimal = Arc::new(decimal) as ArrayRef;
6790
6791 let mut field_metadata = metadata;
6792 field_metadata.insert(
6794 "lance-encoding:dict-size-ratio".to_string(),
6795 "0.99".to_string(),
6796 );
6797 let field = arrow_schema::Field::new("fixed_col", DataType::Decimal128(38, 0), false)
6798 .with_metadata(field_metadata);
6799
6800 encode_first_page(field, decimal, LanceFileVersion::V2_2).await
6801 }
6802
6803 #[tokio::test]
6804 async fn test_dict_values_general_compression_default_lz4_for_variable_dict_values() {
6805 let page = encode_variable_dict_page(HashMap::new()).await;
6806 let dictionary_encoding = dictionary_encoding_from_page(&page);
6807 let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6808 panic!("Expected General compression for dictionary values");
6809 };
6810 let compression = general.compression.as_ref().unwrap();
6811 assert_eq!(
6812 compression.scheme(),
6813 pb21::CompressionScheme::CompressionAlgorithmLz4
6814 );
6815 }
6816
6817 #[tokio::test]
6818 async fn test_dict_values_general_compression_default_lz4_for_fixed_dict_values() {
6819 let page = encode_auto_fixed_dict_page(HashMap::new()).await;
6820 let dictionary_encoding = dictionary_encoding_from_page(&page);
6821 let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6822 panic!("Expected General compression for dictionary values");
6823 };
6824 let compression = general.compression.as_ref().unwrap();
6825 assert_eq!(
6826 compression.scheme(),
6827 pb21::CompressionScheme::CompressionAlgorithmLz4
6828 );
6829 }
6830
6831 #[tokio::test]
6832 async fn test_dict_values_general_compression_zstd() {
6833 let mut metadata = HashMap::new();
6834 metadata.insert(
6835 DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6836 "zstd".to_string(),
6837 );
6838 let page = encode_variable_dict_page(metadata).await;
6839 let dictionary_encoding = dictionary_encoding_from_page(&page);
6840 let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6841 panic!("Expected General compression for dictionary values");
6842 };
6843 let compression = general.compression.as_ref().unwrap();
6844 assert_eq!(
6845 compression.scheme(),
6846 pb21::CompressionScheme::CompressionAlgorithmZstd
6847 );
6848 }
6849
6850 #[tokio::test]
6851 async fn test_dict_values_general_compression_none() {
6852 let mut metadata = HashMap::new();
6853 metadata.insert(
6854 DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6855 "none".to_string(),
6856 );
6857 let page = encode_variable_dict_page(metadata).await;
6858 let dictionary_encoding = dictionary_encoding_from_page(&page);
6859 assert!(
6860 !matches!(
6861 dictionary_encoding.compression.as_ref(),
6862 Some(Compression::General(_))
6863 ),
6864 "Expected dictionary values to avoid General compression"
6865 );
6866 }
6867
#[test]
fn test_resolve_dict_values_compression_metadata_defaults_to_lz4() {
    // Empty field metadata and `None` for both optional overrides.
    let field_metadata = HashMap::new();
    let resolved = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
        &field_metadata,
        None,
        None,
    );
    let expected_scheme = "lz4".to_string();
    assert_eq!(resolved.get(COMPRESSION_META_KEY), Some(&expected_scheme));
    assert!(!resolved.contains_key(COMPRESSION_LEVEL_META_KEY));
}
6878
#[test]
fn test_resolve_dict_values_compression_metadata_metadata_overrides_env() {
    // Field metadata says "none" / level 7; the optional overrides passed
    // alongside say "zstd" / level 3. The field metadata must win.
    let mut field_metadata = HashMap::new();
    field_metadata.insert(
        DICT_VALUES_COMPRESSION_META_KEY.to_string(),
        "none".to_string(),
    );
    field_metadata.insert(
        DICT_VALUES_COMPRESSION_LEVEL_META_KEY.to_string(),
        "7".to_string(),
    );

    let resolved = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
        &field_metadata,
        Some("zstd".to_string()),
        Some("3".to_string()),
    );

    let expected_scheme = "none".to_string();
    let expected_level = "7".to_string();
    assert_eq!(resolved.get(COMPRESSION_META_KEY), Some(&expected_scheme));
    assert_eq!(
        resolved.get(COMPRESSION_LEVEL_META_KEY),
        Some(&expected_level)
    );
}
6905
#[test]
fn test_resolve_dict_values_compression_metadata_env_fallback() {
    // With empty field metadata the optional overrides are used as-is.
    let field_metadata = HashMap::new();
    let resolved = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
        &field_metadata,
        Some("zstd".to_string()),
        Some("9".to_string()),
    );

    let expected_scheme = "zstd".to_string();
    let expected_level = "9".to_string();
    assert_eq!(resolved.get(COMPRESSION_META_KEY), Some(&expected_scheme));
    assert_eq!(
        resolved.get(COMPRESSION_LEVEL_META_KEY),
        Some(&expected_level)
    );
}
6922
6923 #[tokio::test]
6924 async fn test_dictionary_encode_int64() {
6925 use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
6926 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6927 use crate::version::LanceFileVersion;
6928 use arrow_array::{ArrayRef, Int64Array};
6929 use std::collections::HashMap;
6930 use std::sync::Arc;
6931
6932 let values = (0..1000)
6934 .map(|i| match i % 3 {
6935 0 => 10i64,
6936 1 => 20i64,
6937 _ => 30i64,
6938 })
6939 .collect::<Vec<_>>();
6940 let array = Arc::new(Int64Array::from(values)) as ArrayRef;
6941
6942 let mut metadata = HashMap::new();
6943 metadata.insert(
6944 STRUCTURAL_ENCODING_META_KEY.to_string(),
6945 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6946 );
6947 metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
6948
6949 let test_cases = TestCases::default()
6950 .with_min_file_version(LanceFileVersion::V2_2)
6951 .with_batch_size(1000)
6952 .with_range(0..1000)
6953 .with_indices(vec![0, 1, 10, 999])
6954 .with_expected_encoding("dictionary");
6955
6956 check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
6957 }
6958
6959 #[tokio::test]
6960 async fn test_dictionary_encode_float64() {
6961 use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
6962 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6963 use crate::version::LanceFileVersion;
6964 use arrow_array::{ArrayRef, Float64Array};
6965 use std::collections::HashMap;
6966 use std::sync::Arc;
6967
6968 let values = (0..1000)
6970 .map(|i| match i % 3 {
6971 0 => 0.1f64,
6972 1 => 0.2f64,
6973 _ => 0.3f64,
6974 })
6975 .collect::<Vec<_>>();
6976 let array = Arc::new(Float64Array::from(values)) as ArrayRef;
6977
6978 let mut metadata = HashMap::new();
6979 metadata.insert(
6980 STRUCTURAL_ENCODING_META_KEY.to_string(),
6981 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6982 );
6983 metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
6984
6985 let test_cases = TestCases::default()
6986 .with_min_file_version(LanceFileVersion::V2_2)
6987 .with_batch_size(1000)
6988 .with_range(0..1000)
6989 .with_indices(vec![0, 1, 10, 999])
6990 .with_expected_encoding("dictionary");
6991
6992 check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
6993 }
6994
#[test]
fn test_miniblock_dictionary_out_of_line_bitpacking_decode() {
    let rows = 10_000;
    let unique_values = 2_000;

    // Mini-block layout whose dictionary uses out-of-line bitpacking wrapped
    // around a flat encoding.
    let layout = pb21::MiniBlockLayout {
        rep_compression: None,
        def_compression: None,
        value_compression: Some(ProtobufUtils21::flat(64, None)),
        dictionary: Some(ProtobufUtils21::out_of_line_bitpacking(
            64,
            ProtobufUtils21::flat(11, None),
        )),
        num_dictionary_items: unique_values,
        layers: vec![pb21::RepDefLayer::RepdefAllValidItem as i32],
        num_buffers: 1,
        repetition_index_depth: 0,
        num_items: rows,
        has_large_chunk: false,
    };

    // Three empty (offset, size) buffer descriptors are enough to build the
    // scheduler here.
    let buffer_offsets_and_sizes = vec![(0, 0); 3];
    let decompression = DefaultDecompressionStrategy::default();
    let scheduler = super::MiniBlockScheduler::try_new(
        &buffer_offsets_and_sizes,
        0,
        rows,
        &layout,
        &decompression,
    )
    .unwrap();

    let dictionary = scheduler.dictionary.unwrap();
    assert_eq!(dictionary.num_dictionary_items, unique_values);
    assert_eq!(
        dictionary.dictionary_data_alignment,
        crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
    );
}
7032
7033 fn create_test_fixed_data_block(
7035 num_values: u64,
7036 cardinality: u64,
7037 bits_per_value: u64,
7038 ) -> DataBlock {
7039 assert!(cardinality > 0);
7040 assert!(cardinality <= num_values);
7041 let block_info = BlockInfo::default();
7042
7043 assert_eq!(bits_per_value % 8, 0);
7044 let data = match bits_per_value {
7045 32 => {
7046 let values = (0..num_values)
7047 .map(|i| (i % cardinality) as u32)
7048 .collect::<Vec<_>>();
7049 crate::buffer::LanceBuffer::reinterpret_vec(values)
7050 }
7051 64 => {
7052 let values = (0..num_values).map(|i| i % cardinality).collect::<Vec<_>>();
7053 crate::buffer::LanceBuffer::reinterpret_vec(values)
7054 }
7055 128 => {
7056 let values = (0..num_values)
7057 .map(|i| (i % cardinality) as u128)
7058 .collect::<Vec<_>>();
7059 crate::buffer::LanceBuffer::reinterpret_vec(values)
7060 }
7061 _ => unreachable!(),
7062 };
7063 DataBlock::FixedWidth(FixedWidthDataBlock {
7064 bits_per_value,
7065 data,
7066 num_values,
7067 block_info,
7068 })
7069 }
7070
7071 fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
7073 use arrow_array::StringArray;
7074
7075 assert!(cardinality <= num_values && cardinality > 0);
7076
7077 let mut values = Vec::with_capacity(num_values as usize);
7078 for i in 0..num_values {
7079 values.push(format!("value_{:016}", i % cardinality));
7080 }
7081
7082 let array = StringArray::from(values);
7083 DataBlock::from_array(Arc::new(array) as ArrayRef)
7084 }
7085
#[test]
fn test_should_dictionary_encode() {
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;

    // 1000 strings drawn from only 10 distinct values.
    let block = create_test_variable_width_block(1000, 10);

    let metadata = HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
    let field = LanceField::try_from(&arrow_field).unwrap();

    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &block,
        &field,
        LanceFileVersion::V2_1,
    );

    assert!(
        decision.is_some(),
        "Should use dictionary encode based on size"
    );
}
7111
#[test]
fn test_should_not_dictionary_encode_unsupported_bits() {
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;

    // 1000 distinct values stored at 32 bits each.
    let block = create_test_fixed_data_block(1000, 1000, 32);

    let metadata = HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
    let field = LanceField::try_from(&arrow_field).unwrap();

    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &block,
        &field,
        LanceFileVersion::V2_1,
    );

    assert!(
        decision.is_none(),
        "Should not use dictionary encode for unsupported bit width"
    );
}
7136
#[test]
fn test_should_not_dictionary_encode_near_unique_sample() {
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;

    // Every one of the 5000 strings is distinct.
    let num_values = 5000;
    let block = create_test_variable_width_block(num_values, num_values);

    let metadata = HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "1.0".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
    let field = LanceField::try_from(&arrow_field).unwrap();

    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &block,
        &field,
        LanceFileVersion::V2_1,
    );

    assert!(
        decision.is_none(),
        "Should not probe dictionary encoding for near-unique data"
    );
}
7162
7163 async fn encode_first_page(
7164 field: arrow_schema::Field,
7165 array: ArrayRef,
7166 version: LanceFileVersion,
7167 ) -> crate::encoder::EncodedPage {
7168 use crate::encoder::{
7169 ColumnIndexSequence, EncodingOptions, MIN_PAGE_BUFFER_ALIGNMENT, OutOfLineBuffers,
7170 default_encoding_strategy,
7171 };
7172 use crate::repdef::RepDefBuilder;
7173
7174 let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap();
7175 let encoding_strategy = default_encoding_strategy(version);
7176 let mut column_index_seq = ColumnIndexSequence::default();
7177 let encoding_options = EncodingOptions {
7178 cache_bytes_per_column: 1,
7179 max_page_bytes: 32 * 1024 * 1024,
7180 keep_original_array: true,
7181 buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
7182 version,
7183 };
7184
7185 let mut encoder = encoding_strategy
7186 .create_field_encoder(
7187 encoding_strategy.as_ref(),
7188 &lance_field,
7189 &mut column_index_seq,
7190 &encoding_options,
7191 )
7192 .unwrap();
7193
7194 let mut external_buffers = OutOfLineBuffers::new(0, MIN_PAGE_BUFFER_ALIGNMENT);
7195 let repdef = RepDefBuilder::default();
7196 let num_rows = array.len() as u64;
7197 let mut pages = Vec::new();
7198 for task in encoder
7199 .maybe_encode(array, &mut external_buffers, repdef, 0, num_rows)
7200 .unwrap()
7201 {
7202 pages.push(task.await.unwrap());
7203 }
7204 for task in encoder.flush(&mut external_buffers).unwrap() {
7205 pages.push(task.await.unwrap());
7206 }
7207 pages.into_iter().next().unwrap()
7208 }
7209
7210 #[tokio::test]
7211 async fn test_constant_layout_out_of_line_fixed_size_binary_v2_2() {
7212 use crate::format::pb21::page_layout::Layout;
7213
7214 let val = vec![0xABu8; 33];
7215 let arr: ArrayRef = Arc::new(
7216 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
7217 std::iter::repeat_n(Some(val.as_slice()), 256),
7218 33,
7219 )
7220 .unwrap(),
7221 );
7222 let field = arrow_schema::Field::new("c", DataType::FixedSizeBinary(33), true);
7223 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7224
7225 let PageEncoding::Structural(layout) = &page.description else {
7226 panic!("Expected structural encoding");
7227 };
7228 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7229 panic!("Expected constant layout in slot 2");
7230 };
7231 assert!(layout.inline_value.is_none());
7232 assert_eq!(page.data.len(), 1);
7233
7234 let test_cases = TestCases::default()
7235 .with_min_file_version(LanceFileVersion::V2_2)
7236 .with_max_file_version(LanceFileVersion::V2_2)
7237 .with_page_sizes(vec![4096]);
7238 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7239 }
7240
7241 #[tokio::test]
7242 async fn test_constant_layout_out_of_line_utf8_v2_2() {
7243 use crate::format::pb21::page_layout::Layout;
7244
7245 let arr: ArrayRef = Arc::new(arrow_array::StringArray::from_iter_values(
7246 std::iter::repeat_n("hello", 512),
7247 ));
7248 let field = arrow_schema::Field::new("c", DataType::Utf8, true);
7249 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7250
7251 let PageEncoding::Structural(layout) = &page.description else {
7252 panic!("Expected structural encoding");
7253 };
7254 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7255 panic!("Expected constant layout in slot 2");
7256 };
7257 assert!(layout.inline_value.is_none());
7258 assert_eq!(page.data.len(), 1);
7259
7260 let test_cases = TestCases::default()
7261 .with_min_file_version(LanceFileVersion::V2_2)
7262 .with_max_file_version(LanceFileVersion::V2_2)
7263 .with_page_sizes(vec![4096]);
7264 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7265 }
7266
7267 #[tokio::test]
7268 async fn test_constant_layout_nullable_item_v2_2() {
7269 use crate::format::pb21::page_layout::Layout;
7270
7271 let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![
7272 Some(7),
7273 None,
7274 Some(7),
7275 None,
7276 Some(7),
7277 ]));
7278 let field = arrow_schema::Field::new("c", DataType::Int32, true);
7279 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7280
7281 let PageEncoding::Structural(layout) = &page.description else {
7282 panic!("Expected structural encoding");
7283 };
7284 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7285 panic!("Expected constant layout in slot 2");
7286 };
7287 assert!(layout.inline_value.is_some());
7288 assert_eq!(page.data.len(), 2);
7289
7290 let test_cases = TestCases::default()
7291 .with_min_file_version(LanceFileVersion::V2_2)
7292 .with_max_file_version(LanceFileVersion::V2_2)
7293 .with_page_sizes(vec![4096]);
7294 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7295 }
7296
7297 #[tokio::test]
7298 async fn test_constant_layout_list_repdef_v2_2() {
7299 use crate::format::pb21::page_layout::Layout;
7300 use arrow_array::builder::{Int32Builder, ListBuilder};
7301
7302 let mut builder = ListBuilder::new(Int32Builder::new());
7303 builder.values().append_value(7);
7304 builder.values().append_null();
7305 builder.values().append_value(7);
7306 builder.append(true);
7307
7308 builder.append(true);
7309
7310 builder.values().append_value(7);
7311 builder.append(true);
7312
7313 builder.append_null();
7314
7315 let arr: ArrayRef = Arc::new(builder.finish());
7316 let field = arrow_schema::Field::new(
7317 "c",
7318 DataType::List(Arc::new(arrow_schema::Field::new(
7319 "item",
7320 DataType::Int32,
7321 true,
7322 ))),
7323 true,
7324 );
7325 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7326
7327 let PageEncoding::Structural(layout) = &page.description else {
7328 panic!("Expected structural encoding");
7329 };
7330 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7331 panic!("Expected constant layout in slot 2");
7332 };
7333 assert!(layout.inline_value.is_some());
7334 assert_eq!(page.data.len(), 2);
7335
7336 let test_cases = TestCases::default()
7337 .with_min_file_version(LanceFileVersion::V2_2)
7338 .with_max_file_version(LanceFileVersion::V2_2)
7339 .with_page_sizes(vec![4096]);
7340 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7341 }
7342
7343 #[tokio::test]
7344 async fn test_constant_layout_fixed_size_list_not_used_v2_2() {
7345 use crate::format::pb21::page_layout::Layout;
7346 use arrow_array::builder::{FixedSizeListBuilder, Int32Builder};
7347
7348 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
7349 for _ in 0..64 {
7350 builder.values().append_value(1);
7351 builder.values().append_null();
7352 builder.values().append_value(3);
7353 builder.append(true);
7354 }
7355 let arr: ArrayRef = Arc::new(builder.finish());
7356 let field = arrow_schema::Field::new(
7357 "c",
7358 DataType::FixedSizeList(
7359 Arc::new(arrow_schema::Field::new("item", DataType::Int32, true)),
7360 3,
7361 ),
7362 true,
7363 );
7364 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7365
7366 if let PageEncoding::Structural(layout) = &page.description {
7367 assert!(
7368 !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7369 "FixedSizeList should not use constant layout yet"
7370 );
7371 }
7372
7373 let test_cases = TestCases::default()
7374 .with_min_file_version(LanceFileVersion::V2_2)
7375 .with_max_file_version(LanceFileVersion::V2_2)
7376 .with_page_sizes(vec![4096]);
7377 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7378 }
7379
7380 #[tokio::test]
7381 async fn test_constant_layout_not_written_before_v2_2() {
7382 use crate::format::pb21::page_layout::Layout;
7383
7384 let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![7; 1024]));
7385 let field = arrow_schema::Field::new("c", DataType::Int32, true);
7386 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_1).await;
7387
7388 let PageEncoding::Structural(layout) = &page.description else {
7389 return;
7390 };
7391 assert!(
7392 !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7393 "Should not emit constant layout before v2.2"
7394 );
7395
7396 let test_cases = TestCases::default()
7397 .with_min_file_version(LanceFileVersion::V2_1)
7398 .with_max_file_version(LanceFileVersion::V2_1)
7399 .with_page_sizes(vec![4096]);
7400 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7401 }
7402
7403 #[tokio::test]
7404 async fn test_all_null_constant_layout_still_works_v2_2() {
7405 use crate::format::pb21::page_layout::Layout;
7406
7407 let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![None, None, None]));
7408 let field = arrow_schema::Field::new("c", DataType::Int32, true);
7409 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7410
7411 let PageEncoding::Structural(layout) = &page.description else {
7412 panic!("Expected structural encoding");
7413 };
7414 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7415 panic!("Expected layout in slot 2");
7416 };
7417 assert!(layout.inline_value.is_none());
7418 assert_eq!(page.data.len(), 0);
7419
7420 let test_cases = TestCases::default()
7421 .with_min_file_version(LanceFileVersion::V2_2)
7422 .with_max_file_version(LanceFileVersion::V2_2)
7423 .with_page_sizes(vec![4096]);
7424 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7425 }
7426
7427 #[test]
7428 fn test_encode_decode_complex_all_null_vals_roundtrip() {
7429 use crate::compression::{
7430 DecompressionStrategy, DefaultCompressionStrategy, DefaultDecompressionStrategy,
7431 };
7432
7433 let values: Arc<[u16]> = Arc::from((0..2048).map(|i| (i % 5) as u16).collect::<Vec<u16>>());
7434
7435 let compression_strategy = DefaultCompressionStrategy::default();
7436 let decompression_strategy = DefaultDecompressionStrategy::default();
7437
7438 let (compressed_buf, encoding) = PrimitiveStructuralEncoder::encode_complex_all_null_vals(
7439 &values,
7440 &compression_strategy,
7441 )
7442 .unwrap();
7443
7444 let decompressor = decompression_strategy
7445 .create_block_decompressor(&encoding)
7446 .unwrap();
7447 let decompressed = decompressor
7448 .decompress(compressed_buf, values.len() as u64)
7449 .unwrap();
7450 let decompressed_fixed_width = decompressed.as_fixed_width().unwrap();
7451 assert_eq!(decompressed_fixed_width.num_values, values.len() as u64);
7452 assert_eq!(decompressed_fixed_width.bits_per_value, 16);
7453 let rep_result = decompressed_fixed_width.data.borrow_to_typed_slice::<u16>();
7454 assert_eq!(rep_result.as_ref(), values.as_ref());
7455 }
7456
7457 #[tokio::test]
7458 async fn test_complex_all_null_compression_gated_by_version() {
7459 use crate::format::pb21::page_layout::Layout;
7460 use arrow_array::ListArray;
7461
7462 let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7463 (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7464 );
7465 let arr: ArrayRef = Arc::new(list_array);
7466 let field = arrow_schema::Field::new(
7467 "c",
7468 DataType::List(Arc::new(arrow_schema::Field::new(
7469 "item",
7470 DataType::Int32,
7471 true,
7472 ))),
7473 true,
7474 );
7475
7476 let page_v21 = encode_first_page(field.clone(), arr.clone(), LanceFileVersion::V2_1).await;
7477 let PageEncoding::Structural(layout_v21) = &page_v21.description else {
7478 panic!("Expected structural encoding");
7479 };
7480 let Layout::ConstantLayout(layout_v21) = layout_v21.layout.as_ref().unwrap() else {
7481 panic!("Expected constant layout");
7482 };
7483 assert!(layout_v21.rep_compression.is_none());
7484 assert!(layout_v21.def_compression.is_none());
7485 assert_eq!(layout_v21.num_rep_values, 0);
7486 assert_eq!(layout_v21.num_def_values, 0);
7487
7488 let page_v22 = encode_first_page(field, arr, LanceFileVersion::V2_2).await;
7489 let PageEncoding::Structural(layout_v22) = &page_v22.description else {
7490 panic!("Expected structural encoding");
7491 };
7492 let Layout::ConstantLayout(layout_v22) = layout_v22.layout.as_ref().unwrap() else {
7493 panic!("Expected constant layout");
7494 };
7495 assert!(layout_v22.def_compression.is_some());
7496 assert!(layout_v22.num_def_values > 0);
7497 }
7498
7499 #[tokio::test]
7500 async fn test_complex_all_null_round_trip() {
7501 use arrow_array::ListArray;
7502
7503 let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7504 (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7505 );
7506
7507 let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_2);
7508 check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
7509 .await;
7510 }
7511
7512 #[tokio::test]
7514 async fn test_sparse_boolean_list_roundtrip() {
7515 use arrow_array::builder::{BooleanBuilder, ListBuilder};
7516
7517 let mut list_builder = ListBuilder::new(BooleanBuilder::new());
7518 for i in 0..1000i32 {
7519 if i % 64 == 0 {
7520 list_builder.values().append_value(i % 128 == 0);
7522 list_builder.append(true);
7523 } else {
7524 list_builder.append(false);
7525 }
7526 }
7527 let list_array = Arc::new(list_builder.finish());
7528
7529 let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
7530 check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
7531 }
7532}