use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    env,
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use crate::{
    constants::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    },
    data::DictionaryDataBlock,
    encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
    format::{
        pb21::{self, compressive_encoding::Compression, CompressiveEncoding, PageLayout},
        ProtobufUtils21,
    },
};
use arrow_array::{cast::AsArray, make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_nulls;
use lance_core::{
    cache::{CacheKey, Context, DeepSizeOf},
    error::{Error, LanceOptionExt},
    utils::bit::pad_bytes,
};
use log::trace;
use snafu::location;

use crate::{
    compression::{
        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
    },
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encodings::logical::primitive::fullzip::PerValueDataBlock,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockCompressed,
    statistics::{ComputeStat, GetStat, Stat},
};
use crate::{
    repdef::{
        build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator,
        ControlWordParser, DefinitionInterpretation, RepDefSlicer,
    },
    utils::accumulation::AccumulationQueue,
};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::constants::DICT_SIZE_RATIO_META_KEY;
use crate::encodings::logical::primitive::dict::{
    DICT_FIXED_WIDTH_BITS_PER_VALUE, DICT_INDICES_BITS_PER_VALUE,
};
use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
    },
    encoder::{
        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
    },
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
    EncodingsIo,
};

pub mod blob;
pub mod dict;
pub mod fullzip;
pub mod miniblock;

const FILL_BYTE: u8 = 0xFE;

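/// A scheduled page load: a future that resolves to the page's decoder along
/// with the number of rows that decoder will produce.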
struct PageLoadTask {
    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
    num_rows: u64,
}

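/// A scheduler for a single page of data in the structural encodings.
///
/// Schedulers are initialized once per page (loading any page-wide metadata)
/// and can then schedule I/O for arbitrary row ranges within the page.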
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Fetches and decodes any page-level metadata (chunk layouts, dictionaries,
    /// repetition indexes, ...) needed before ranges can be scheduled.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Restores metadata previously produced by `initialize` from the cache.
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules I/O for the given row ranges within the page.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>>;
}

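/// Metadata for a single mini-block chunk: how many values it holds and where
/// its bytes live relative to the page's value buffer.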
#[derive(Debug)]
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

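/// The decompressed output of one mini-block chunk: optional repetition and
/// definition levels plus the decoded values.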
#[derive(Debug)]
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}

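/// A task to decode a set of mini-block chunks, applying drain instructions to
/// pull only the requested rows out of each chunk.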
#[derive(Debug)]
struct DecodeMiniBlockTask {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    num_buffers: u64,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
        num_levels: u16,
    ) -> Result<ScalarBuffer<u16>> {
        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
        let rep = rep.as_fixed_width().unwrap();
        debug_assert_eq!(rep.num_values, num_levels as u64);
        debug_assert_eq!(rep.bits_per_value, 16);
        Ok(rep.data.borrow_to_typed_slice::<u16>())
    }

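    /// Appends levels in `range` from `level_buf` onto `levels`, materializing
    /// an all-zero buffer first if one side has levels and the other does not.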
    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            let num_values = (range.end - range.start) as usize;
            levels.extend(iter::repeat_n(0, num_values));
        }
    }

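    /// Maps a range of rows to the corresponding ranges of items and levels
    /// within a chunk.
    ///
    /// Rows begin wherever the repetition level equals `max_rep`.  A chunk may
    /// open with a "preamble" (the continuation of a row from the previous
    /// chunk) which, per `preamble_action`, is taken or skipped.  Items whose
    /// definition level exceeds `max_visible_def` are invisible (e.g. elements
    /// of a null list); they are excluded from the item range but still occupy
    /// level positions.
    ///
    /// For example, with `rep = [1, 0, 0, 1, 0]` and `max_rep = 1` the chunk
    /// holds two rows (starting at indices 0 and 3) and has no preamble.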
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            let mut items_in_preamble = 0_u64;
            // Find where the first complete row begins, counting how many
            // visible items sit in the preamble along the way.
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx as u64);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        let first_row_start =
                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
                        first_row_start
                    };
                    // The chunk may be entirely preamble, in which case we
                    // must be taking it.
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap();
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            // An empty range means we are only taking the preamble.
            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                debug_assert!(items_in_preamble <= total_items);
                return (0..items_in_preamble, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                        }
                        if *def > max_visible_def {
                            lead_invis_seen += 1;
                        }
                    }
                }

                rows_seen += 1;

                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                    }
                    if *def > max_visible_def {
                        tail_invis_seen += 1;
                    }
                }

                // If we never found the end then we take the rest of the chunk.
                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                if preamble_action == PreambleAction::Skip {
                    new_start += items_in_preamble;
                    new_end += items_in_preamble;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += items_in_preamble;
                    new_levels_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                // No definition levels: every item is visible, so the item and
                // level ranges coincide.
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_start..new_end)
            }
        } else {
            // No repetition levels: rows and items are one-to-one.
            (range.clone(), range)
        }
    }

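    /// Decompresses a single mini-block chunk.
    ///
    /// Each chunk begins with a small header of little-endian u16 sizes (the
    /// level count, the rep/def buffer sizes when present, and one size per
    /// value buffer); each region that follows is padded to
    /// MINIBLOCK_ALIGNMENT.
    ///
    /// Chunks are decoded in their entirety here; unwanted rows are trimmed
    /// later in the decode task.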
    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        let mut offset = 0;
        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
        offset += 2;

        let rep_size = if self.rep_decompressor.is_some() {
            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(rep_size)
        } else {
            None
        };
        let def_size = if self.def_decompressor.is_some() {
            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(def_size)
        } else {
            None
        };
        let buffer_sizes = (0..self.num_buffers)
            .map(|_| {
                let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
                offset += 2;
                size
            })
            .collect::<Vec<_>>();

        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);

        let rep = rep_size.map(|rep_size| {
            let rep = buf.slice_with_length(offset, rep_size as usize);
            offset += rep_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            rep
        });

        let def = def_size.map(|def_size| {
            let def = buf.slice_with_length(offset, def_size as usize);
            offset += def_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            def
        });

        let buffers = buffer_sizes
            .into_iter()
            .map(|buf_size| {
                let buf = buf.slice_with_length(offset, buf_size as usize);
                offset += buf_size as usize;
                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
                buf
            })
            .collect::<Vec<_>>();

        let values = self
            .value_decompressor
            .decompress(buffers, items_in_chunk)?;

        let rep = rep
            .map(|rep| {
                Self::decode_levels(
                    self.rep_decompressor.as_ref().unwrap().as_ref(),
                    rep,
                    num_levels,
                )
            })
            .transpose()?;
        let def = def
            .map(|def| {
                Self::decode_levels(
                    self.def_decompressor.as_ref().unwrap().as_ref(),
                    def,
                    num_levels,
                )
            })
            .transpose()?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        // Rough estimate: decompressed data is usually larger than the
        // compressed chunks.
        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        let mut level_offset = 0;
        for (instructions, chunk) in self.instructions.iter() {
            let DecodedMiniBlockChunk { rep, def, values } =
                self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );
            if item_range.end - item_range.start > chunk.items_in_chunk {
                return Err(lance_core::Error::Internal {
                    message: format!(
                        "Item range {:?} is larger than the chunk's item count {:?}",
                        item_range, chunk.items_in_chunk
                    ),
                    location: location!(),
                });
            }

            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let mut data = data_builder.finish();

        let unraveler =
            RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());

        if let Some(dictionary) = &self.dictionary_data {
            let DataBlock::FixedWidth(indices) = data else {
                return Err(lance_core::Error::Internal {
                    message: format!(
                        "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
                        data
                    ),
                    location: location!(),
                });
            };
            data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
                indices,
                dictionary.as_ref().clone(),
            ));
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

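/// A mini-block chunk that has been (or will be) loaded from disk, together
/// with its location within the page.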
#[derive(Debug)]
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}

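/// A decoder for mini-block encoded pages.  Holds the loaded chunks plus the
/// scheduled chunk instructions and drains them on demand into decode tasks.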
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    num_buffers: u64,
    dictionary: Option<Arc<DataBlock>>,
}

impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            // Discard any chunks that precede the one we are draining from.
            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        self.offset_in_current_chunk = skip_in_chunk;

        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            num_buffers: self.num_buffers,
            max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

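/// A scheduler for all-null pages that still carry repetition and/or
/// definition information (e.g. nulls nested inside lists or structs).  The
/// level buffers are read during initialization and cached.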
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    // Set from the protobuf page description
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    repdef: Option<Arc<CachedComplexAllNullState>>,
    max_visible_level: u16,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        let max_visible_level = def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            repdef: None,
            max_visible_level,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                let rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                Some(rep)
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                let def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                Some(def)
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let decoder = Box::new(ComplexAllNullPageDecoder {
            ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            num_rows,
            def_meaning: self.def_meaning.clone(),
            max_visible_level: self.max_visible_level,
        }) as Box<dyn StructuralPageDecoder>;
        let page_load_task = PageLoadTask {
            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
    max_visible_level: u16,
}

impl ComplexAllNullPageDecoder {
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let drained_ranges = self.drain_ranges(num_rows);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
            max_visible_level: self.max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

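/// A task to decode all-null data that has repetition and/or definition
/// levels, slicing the cached level buffers down to the requested ranges.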
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    max_visible_level: u16,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);

        // Items hidden inside null containers are not visible; only levels at
        // or below the max visible definition level count as values.
        let num_values = if let Some(def) = &def {
            def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
        } else {
            num_values
        };

        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

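/// A scheduler for "simple" all-null data: a single layer of nullable values
/// with no repetition.  No I/O is required; the nulls are synthesized at
/// decode time.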
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let decoder =
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
        let page_load_task = PageLoadTask {
            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

/// A task to decode a page of all-null simple values (no repetition levels).
#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            // A definition level of 1 means "null" for a single nullable layer
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
            self.num_values,
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    // These come from the protobuf page description
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
    num_dictionary_items: u64,
}

#[derive(Debug)]
struct MiniBlockRepIndexBlock {
    // The row number of the first row that starts in this chunk (a chunk
    // beginning mid-row starts counting after its preamble)
    first_row: u64,
    // The number of rows that start in this chunk, counting any trailing
    // partial row
    starts_including_trailer: u64,
    // True if the chunk begins with the continuation of an earlier row
    has_preamble: bool,
    // True if the chunk ends with a partial row that spills into the next chunk
    has_trailer: bool,
}

impl DeepSizeOf for MiniBlockRepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}

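/// A per-chunk repetition index for a mini-block page, describing how rows
/// are distributed across chunks (including preambles and trailers).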
#[derive(Debug)]
struct MiniBlockRepIndex {
    blocks: Vec<MiniBlockRepIndexBlock>,
}

impl DeepSizeOf for MiniBlockRepIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl MiniBlockRepIndex {
    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
        // Without repetition every value is a row, so chunks have no preambles
        // or trailers.
        let mut blocks = Vec::with_capacity(chunks.len());
        let mut offset: u64 = 0;

        for c in chunks {
            blocks.push(MiniBlockRepIndexBlock {
                first_row: offset,
                starts_including_trailer: c.num_values,
                has_preamble: false,
                has_trailer: false,
            });

            offset += c.num_values;
        }

        Self { blocks }
    }

    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
        let u64_slice = buffer.borrow_to_typed_slice::<u64>();
        let n = u64_slice.len() / stride;

        let mut blocks = Vec::with_capacity(n);
        let mut chunk_has_preamble = false;
        let mut offset: u64 = 0;

        for i in 0..n {
            // Each entry is `stride` u64 words; the first two hold the number
            // of rows ending in the chunk and the size of any partial
            // (trailing) row.
            let base_idx = i * stride;
            let ends = u64_slice[base_idx];
            let partial = u64_slice[base_idx + 1];

            let has_trailer = partial > 0;
            let starts_including_trailer =
                ends + (has_trailer as u64) - (chunk_has_preamble as u64);

            blocks.push(MiniBlockRepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer,
            });

            // A trailer in this chunk becomes a preamble in the next.
            chunk_has_preamble = has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}

#[derive(Debug)]
struct MiniBlockCacheableState {
    // Metadata for each chunk in the page
    chunk_meta: Vec<ChunkMeta>,
    // The repetition index, read from disk or synthesized from the chunk metadata
    rep_index: MiniBlockRepIndex,
    // The decompressed dictionary, if any
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

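/// A scheduler for mini-block encoded pages.
///
/// The page is divided into small chunks.  During initialization the scheduler
/// reads the chunk metadata buffer (plus the dictionary and repetition index,
/// when present) so that `schedule_ranges` can later issue byte-exact reads of
/// only the chunks that overlap the requested rows.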
#[derive(Debug)]
pub struct MiniBlockScheduler {
    // These come from the protobuf page description
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    repetition_index_depth: u16,
    num_buffers: u64,
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    // Populated by initialize / load
    page_meta: Option<Arc<MiniBlockCacheableState>>,
}

impl MiniBlockScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        layout: &pb21::MiniBlockLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let rep_decompressor = layout
            .rep_compression
            .as_ref()
            .map(|rep_compression| {
                decompressors
                    .create_block_decompressor(rep_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_decompressor = layout
            .def_compression
            .as_ref()
            .map(|def_compression| {
                decompressors
                    .create_block_decompressor(def_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors.create_miniblock_decompressor(
            layout.value_compression.as_ref().unwrap(),
            decompressors,
        )?;

        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            let num_dictionary_items = layout.num_dictionary_items;
            match dictionary_encoding.compression.as_ref().unwrap() {
                Compression::Variable(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 4,
                    num_dictionary_items,
                }),
                Compression::Flat(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 16,
                    num_dictionary_items,
                }),
                Compression::General(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 1,
                    num_dictionary_items,
                }),
                _ => unreachable!(
                    "Mini-block dictionary encoding must use Variable, Flat, or General compression"
                ),
            }
        } else {
            None
        };

        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor,
            def_decompressor,
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            num_buffers: layout.num_buffers,
            priority,
            items_in_page,
            dictionary,
            def_meaning: def_meaning.into(),
            page_meta: None,
        })
    }

    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        let page_meta = self.page_meta.as_ref().unwrap();
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    Take,
    Skip,
    Absent,
}

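/// Instructions for scheduling a single chunk: which chunk to read, how to
/// treat its preamble, how many rows to skip and take, and whether to take a
/// trailing partial row.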
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    // The index of the chunk to read
    chunk_idx: usize,
    // Whether the chunk's preamble is taken, skipped, or absent
    preamble: PreambleAction,
    // The number of rows to skip after the preamble
    rows_to_skip: u64,
    // The number of rows to take after skipping (not counting preamble or trailer)
    rows_to_take: u64,
    // Whether to take the trailing partial row
    take_trailer: bool,
}

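/// Instructions for draining rows from an already-scheduled chunk; a
/// refinement of `ChunkInstructions` produced while decoding.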
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    chunk_instructions: ChunkInstructions,
    rows_to_skip: u64,
    rows_to_take: u64,
    preamble_action: PreambleAction,
}

impl ChunkInstructions {
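    /// Converts user-requested row ranges into per-chunk scheduling
    /// instructions using the repetition index, merging instructions that
    /// land in the same chunk so each chunk is read only once.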
    fn schedule_instructions(
        rep_index: &MiniBlockRepIndex,
        user_ranges: &[Range<u64>],
    ) -> Vec<Self> {
        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());

        for user_range in user_ranges {
            let mut rows_needed = user_range.end - user_range.start;
            let mut need_preamble = false;

            // Find the first chunk containing the start of the range.
            let mut block_index = match rep_index
                .blocks
                .binary_search_by_key(&user_range.start, |block| block.first_row)
            {
                Ok(idx) => {
                    // Empty chunks can share a first_row; rewind to the first
                    // of them.
                    let mut idx = idx;
                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
                        idx -= 1;
                    }
                    idx
                }
                Err(idx) => idx - 1,
            };

            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;

            while rows_needed > 0 || need_preamble {
                if block_index >= rep_index.blocks.len() {
                    log::warn!("schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early");
                    break;
                }

                let chunk = &rep_index.blocks[block_index];
                let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);

                // A chunk with no row starts can still carry a preamble (and
                // possibly a trailer) that we need to take.
                if rows_avail == 0 && to_skip == 0 {
                    if chunk.has_preamble && need_preamble {
                        chunk_instructions.push(Self {
                            chunk_idx: block_index,
                            preamble: PreambleAction::Take,
                            rows_to_skip: 0,
                            rows_to_take: 0,
                            take_trailer: chunk.has_trailer,
                        });
                        // If this chunk is pure continuation the row keeps
                        // going; stop looking for it at the last chunk.
                        if chunk.starts_including_trailer > 0
                            || block_index == rep_index.blocks.len() - 1
                        {
                            need_preamble = false;
                        }
                    }
                    block_index += 1;
                    continue;
                }

                // Skip over chunks that are entirely consumed by `to_skip`.
                if rows_avail == 0 && to_skip > 0 {
                    to_skip -= chunk.starts_including_trailer;
                    block_index += 1;
                    continue;
                }

                let rows_to_take = rows_avail.min(rows_needed);
                rows_needed -= rows_to_take;

                let mut take_trailer = false;
                let preamble = if chunk.has_preamble {
                    if need_preamble {
                        PreambleAction::Take
                    } else {
                        PreambleAction::Skip
                    }
                } else {
                    PreambleAction::Absent
                };

                if rows_to_take == rows_avail && chunk.has_trailer {
                    take_trailer = true;
                    need_preamble = true;
                } else {
                    need_preamble = false;
                };

                chunk_instructions.push(Self {
                    preamble,
                    chunk_idx: block_index,
                    rows_to_skip: to_skip,
                    rows_to_take,
                    take_trailer,
                });

                to_skip = 0;
                block_index += 1;
            }
        }

        // Adjacent user ranges may map into the same chunk; merge such
        // instructions so each chunk is visited once.
        if user_ranges.len() > 1 {
            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
            let mut instructions_iter = chunk_instructions.into_iter();
            merged_instructions.push(instructions_iter.next().unwrap());
            for instruction in instructions_iter {
                let last = merged_instructions.last_mut().unwrap();
                if last.chunk_idx == instruction.chunk_idx
                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
                {
                    last.rows_to_take += instruction.rows_to_take;
                    last.take_trailer |= instruction.take_trailer;
                } else {
                    merged_instructions.push(instruction);
                }
            }
            merged_instructions
        } else {
            chunk_instructions
        }
    }

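    /// Drains up to `rows_desired` rows from this chunk, updating the caller's
    /// progress counters and reporting whether the chunk is now fully consumed.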
    fn drain_from_instruction(
        &self,
        rows_desired: &mut u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> (ChunkDrainInstructions, bool) {
        // If we need the preamble we must be at the start of the chunk
        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
        let rows_avail = self.rows_to_take - *skip_in_chunk;
        let has_preamble = self.preamble != PreambleAction::Absent;
        let preamble_action = match (*need_preamble, has_preamble) {
            (true, true) => PreambleAction::Take,
            (true, false) => panic!("Need preamble but there isn't one"),
            (false, true) => PreambleAction::Skip,
            (false, false) => PreambleAction::Absent,
        };

        let rows_taking = if *rows_desired >= rows_avail {
            // We are taking the rest of the chunk; if it has a trailer the
            // next drain must pick up the row's continuation.
            *need_preamble = self.take_trailer;
            rows_avail
        } else {
            *need_preamble = false;
            *rows_desired
        };
        let rows_skipped = *skip_in_chunk;

        let consumed_chunk = if *rows_desired >= rows_avail {
            *rows_desired -= rows_avail;
            *skip_in_chunk = 0;
            true
        } else {
            *skip_in_chunk += *rows_desired;
            *rows_desired = 0;
            false
        };

        (
            ChunkDrainInstructions {
                chunk_instructions: self.clone(),
                rows_to_skip: rows_skipped,
                rows_to_take: rows_taking,
                preamble_action,
            },
            consumed_chunk,
        )
    }
}

impl StructuralPageScheduler for MiniBlockScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // The first buffer is the chunk metadata, the second the value data
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        let io_req = io.submit_request(required_ranges, 0);

        async move {
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();

            // The metadata buffer is a sequence of little-endian u16 words,
            // one per chunk.
            assert!(meta_bytes.len() % 2 == 0);
            let bytes = LanceBuffer::from_bytes(meta_bytes, 2);
            let words = bytes.borrow_to_typed_slice::<u16>();
            let words = words.as_ref();

            let mut chunk_meta = Vec::with_capacity(words.len());

            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
            for (word_idx, word) in words.iter().enumerate() {
                // The low nibble is log2 of the chunk's value count; the high
                // bits encode the chunk byte size in alignment units, minus one.
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                let num_values = if word_idx < words.len() - 1 {
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    // The last chunk holds the remainder of the page
                    debug_assert!(
                        log_num_values == 0
                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
                    );
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;

                chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                offset_bytes += num_bytes as u64;
            }

            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
                // The repetition index is an array of u64 words, `stride` per chunk
                assert!(rep_index_data.len() % 8 == 0);
                let stride = self.repetition_index_depth as usize + 1;
                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
            } else {
                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
            };

            let mut page_meta = MiniBlockCacheableState {
                chunk_meta,
                rep_index,
                dictionary: None,
            };

            // If there is a dictionary, decompress it now so it can be cached
            if let Some(ref mut dictionary) = self.dictionary {
                let dictionary_data = dictionary_bytes.unwrap();
                page_meta.dictionary =
                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                        dictionary.num_dictionary_items,
                    )?));
            };
            let page_meta = Arc::new(page_meta);
            self.page_meta = Some(page_meta.clone());
            Ok(page_meta as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.page_meta = Some(
            data.clone()
                .as_arc_any()
                .downcast::<MiniBlockCacheableState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let page_meta = self.page_meta.as_ref().unwrap();

        let chunk_instructions =
            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);

        debug_assert_eq!(
            num_rows,
            chunk_instructions
                .iter()
                .map(|ci| ci.rows_to_take)
                .sum::<u64>()
        );

        let chunks_needed = chunk_instructions
            .iter()
            .map(|ci| ci.chunk_idx)
            .unique()
            .collect::<Vec<_>>();

        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
        let chunk_ranges = loaded_chunks
            .iter()
            .map(|c| c.byte_range.clone())
            .collect::<Vec<_>>();
        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);

        let rep_decompressor = self.rep_decompressor.clone();
        let def_decompressor = self.def_decompressor.clone();
        let value_decompressor = self.value_decompressor.clone();
        let num_buffers = self.num_buffers;
        let dictionary = page_meta.dictionary.clone();
        let def_meaning = self.def_meaning.clone();

        let res = async move {
            let loaded_chunk_data = loaded_chunk_data.await?;
            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
            }

            Ok(Box::new(MiniBlockDecoder {
                rep_decompressor,
                def_decompressor,
                value_decompressor,
                def_meaning,
                loaded_chunks: VecDeque::from_iter(loaded_chunks),
                instructions: VecDeque::from(chunk_instructions),
                offset_in_current_chunk: 0,
                dictionary,
                num_rows,
                num_buffers,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: res,
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug, Clone, Copy)]
struct FullZipRepIndexDetails {
    buf_position: u64,
    bytes_per_value: u64, // Will be 1, 2, 4, or 8
}

#[derive(Debug)]
enum PerValueDecompressor {
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    Variable(Arc<dyn VariablePerValueDecompressor>),
}

#[derive(Debug)]
struct FullZipDecodeDetails {
    value_decompressor: PerValueDecompressor,
    def_meaning: Arc<[DefinitionInterpretation]>,
    ctrl_word_parser: ControlWordParser,
    max_rep: u16,
    max_visible_def: u16,
}

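/// A scheduler for full-zip encoded pages, where control words (repetition and
/// definition levels) are zipped in with the values.
///
/// With fixed-width values and no repetition the byte range of a row range can
/// be computed directly; otherwise a repetition index mapping row numbers to
/// byte offsets is consulted first.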
#[derive(Debug)]
pub struct FullZipScheduler {
    data_buf_position: u64,
    rep_index: Option<FullZipRepIndexDetails>,
    priority: u64,
    rows_in_page: u64,
    bits_per_offset: u8,
    details: Arc<FullZipDecodeDetails>,
    // The repetition index buffer, if it has been cached
    cached_state: Option<Arc<FullZipCacheableState>>,
    // Whether to cache the repetition index during initialize
    enable_cache: bool,
}

impl FullZipScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        rows_in_page: u64,
        layout: &pb21::FullZipLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
            let num_reps = rows_in_page + 1;
            let bytes_per_rep = len / num_reps;
            debug_assert_eq!(len % num_reps, 0);
            debug_assert!(
                bytes_per_rep == 1
                    || bytes_per_rep == 2
                    || bytes_per_rep == 4
                    || bytes_per_rep == 8
            );
            FullZipRepIndexDetails {
                buf_position: *pos,
                bytes_per_value: bytes_per_rep,
            }
        });

        let value_decompressor = match layout.details {
            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
                let decompressor = decompressors.create_fixed_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Fixed(decompressor.into())
            }
            Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
                let decompressor = decompressors.create_variable_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Variable(decompressor.into())
            }
            None => {
                panic!("Full-zip layout must have a `details` field");
            }
        };
        let ctrl_word_parser = ControlWordParser::new(
            layout.bits_rep.try_into().unwrap(),
            layout.bits_def.try_into().unwrap(),
        );
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();

        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
        let max_visible_def = def_meaning
            .iter()
            .filter(|d| !d.is_list())
            .map(|d| d.num_def_levels())
            .sum();

        let bits_per_offset = match layout.details {
            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
            Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
                bits_per_offset as u8
            }
            None => panic!("Full-zip layout must have a `details` field"),
        };

        let details = Arc::new(FullZipDecodeDetails {
            value_decompressor,
            def_meaning: def_meaning.into(),
            ctrl_word_parser,
            max_rep,
            max_visible_def,
        });
        Ok(Self {
            data_buf_position,
            rep_index,
            details,
            priority,
            rows_in_page,
            bits_per_offset,
            cached_state: None,
            enable_cache: false,
        })
    }

    fn create_decoder(
        details: Arc<FullZipDecodeDetails>,
        data: VecDeque<LanceBuffer>,
        num_rows: u64,
        bits_per_offset: u8,
    ) -> Result<Box<dyn StructuralPageDecoder>> {
        match &details.value_decompressor {
            PerValueDecompressor::Fixed(decompressor) => {
                let bits_per_value = decompressor.bits_per_value();
                if bits_per_value == 0 {
                    return Err(lance_core::Error::Internal {
                        message: "Invalid encoding: bits_per_value must be greater than 0".into(),
                        location: location!(),
                    });
                }
                if bits_per_value % 8 != 0 {
                    return Err(lance_core::Error::NotSupported {
                        source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
                        location: location!(),
                    });
                }
                let bytes_per_value = bits_per_value / 8;
                let total_bytes_per_value =
                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
                Ok(Box::new(FixedFullZipDecoder {
                    details,
                    data,
                    num_rows,
                    offset_in_current: 0,
                    bytes_per_value: bytes_per_value as usize,
                    total_bytes_per_value,
                }) as Box<dyn StructuralPageDecoder>)
            }
            PerValueDecompressor::Variable(_decompressor) => {
                Ok(Box::new(VariableFullZipDecoder::new(
                    details,
                    data,
                    num_rows,
                    bits_per_offset,
                    bits_per_offset,
                )))
            }
        }
    }

    fn extract_byte_ranges_from_pairs(
        buffer: LanceBuffer,
        bytes_per_value: u64,
        data_buf_position: u64,
    ) -> Vec<Range<u64>> {
        ByteUnpacker::new(buffer, bytes_per_value as usize)
            .chunks(2)
            .into_iter()
            .map(|mut c| {
                let start = c.next().unwrap() + data_buf_position;
                let end = c.next().unwrap() + data_buf_position;
                start..end
            })
            .collect::<Vec<_>>()
    }

    fn extract_byte_ranges_from_cached(
        buffer: &LanceBuffer,
        ranges: &[Range<u64>],
        bytes_per_value: u64,
        data_buf_position: u64,
    ) -> Vec<Range<u64>> {
        ranges
            .iter()
            .map(|r| {
                let start_offset = (r.start * bytes_per_value) as usize;
                let end_offset = (r.end * bytes_per_value) as usize;

                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
                let start_val =
                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
                        .next()
                        .unwrap();

                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
                let end_val =
                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
                        .next()
                        .unwrap();

                (data_buf_position + start_val)..(data_buf_position + end_val)
            })
            .collect()
    }

    fn compute_rep_index_ranges(
        ranges: &[Range<u64>],
        rep_index: &FullZipRepIndexDetails,
    ) -> Vec<Range<u64>> {
        ranges
            .iter()
            .flat_map(|r| {
                let first_val_start =
                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
                let first_val_end = first_val_start + rep_index.bytes_per_value;
                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
                let last_val_end = last_val_start + rep_index.bytes_per_value;
                [first_val_start..first_val_end, last_val_start..last_val_end]
            })
            .collect()
    }

    async fn resolve_byte_ranges(
        data_buf_position: u64,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: &FullZipRepIndexDetails,
        cached_state: Option<&Arc<FullZipCacheableState>>,
        priority: u64,
    ) -> Result<Vec<Range<u64>>> {
        if let Some(cached_state) = cached_state {
            Ok(Self::extract_byte_ranges_from_cached(
                &cached_state.rep_index_buffer,
                ranges,
                rep_index.bytes_per_value,
                data_buf_position,
            ))
        } else {
            let rep_ranges = Self::compute_rep_index_ranges(ranges, rep_index);
            let rep_data = io.submit_request(rep_ranges, priority).await?;
            let rep_buffer = LanceBuffer::concat(
                &rep_data
                    .into_iter()
                    .map(|d| LanceBuffer::from_bytes(d, 1))
                    .collect::<Vec<_>>(),
            );
            Ok(Self::extract_byte_ranges_from_pairs(
                rep_buffer,
                rep_index.bytes_per_value,
                data_buf_position,
            ))
        }
    }

    fn schedule_ranges_rep(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: FullZipRepIndexDetails,
    ) -> Result<Vec<PageLoadTask>> {
        let data_buf_position = self.data_buf_position;
        let cached_state = self.cached_state.clone();
        let priority = self.priority;
        let details = self.details.clone();
        let bits_per_offset = self.bits_per_offset;
        let ranges = ranges.to_vec();
        let io_clone = io.clone();
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let load_task = async move {
            let byte_ranges = Self::resolve_byte_ranges(
                data_buf_position,
                &ranges,
                &io_clone,
                &rep_index,
                cached_state.as_ref(),
                priority,
            )
            .await?;

            let data = io_clone.submit_request(byte_ranges, priority).await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect::<VecDeque<_>>();

            let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum();

            Self::create_decoder(details, data, num_rows, bits_per_offset)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: load_task,
            num_rows,
        };
        Ok(vec![page_load_task])
    }

    fn schedule_ranges_simple(
        &self,
        ranges: &[Range<u64>],
        io: &dyn EncodingsIo,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
            unreachable!()
        };

        let bits_per_value = decompressor.bits_per_value();
        assert_eq!(bits_per_value % 8, 0);
        let bytes_per_value = bits_per_value / 8;
        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
        let byte_ranges = ranges.iter().map(|r| {
            debug_assert!(r.end <= self.rows_in_page);
            let start = self.data_buf_position + r.start * total_bytes_per_value;
            let end = self.data_buf_position + r.end * total_bytes_per_value;
            start..end
        });

        let data = io.submit_request(byte_ranges.collect(), self.priority);

        let details = self.details.clone();

        let load_task = async move {
            let data = data.await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect();
            Ok(Box::new(FixedFullZipDecoder {
                details,
                data,
                num_rows,
                offset_in_current: 0,
                bytes_per_value: bytes_per_value as usize,
                total_bytes_per_value: total_bytes_per_value as usize,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: load_task,
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug)]
struct FullZipCacheableState {
    // The entire repetition index, buffered for fast lookups
    rep_index_buffer: LanceBuffer,
}

impl DeepSizeOf for FullZipCacheableState {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        self.rep_index_buffer.len()
    }
}

impl CachedPageData for FullZipCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

impl StructuralPageScheduler for FullZipScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // When caching is enabled, read the entire repetition index up front
        // so that later scheduling needs no extra I/O.
        if self.enable_cache && self.rep_index.is_some() {
            let rep_index = self.rep_index.as_ref().unwrap();
            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);

            let io_clone = io.clone();
            let future = async move {
                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
                let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);

                Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
            };

            future.boxed()
        } else {
            std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
        }
    }

    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
        // The cache entry may be NoCachedPageData when caching was disabled
        if let Ok(cached_state) = cache
            .clone()
            .as_arc_any()
            .downcast::<FullZipCacheableState>()
        {
            self.cached_state = Some(cached_state);
        }
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        if let Some(rep_index) = self.rep_index {
            self.schedule_ranges_rep(ranges, io, rep_index)
        } else {
            self.schedule_ranges_simple(ranges, io.as_ref())
        }
    }
}

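/// A decoder for full-zip pages with fixed-width values.  When repetition is
/// present it walks the buffer one control word at a time to find row
/// boundaries; otherwise rows are sliced arithmetically.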
#[derive(Debug)]
struct FixedFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    data: VecDeque<LanceBuffer>,
    offset_in_current: usize,
    bytes_per_value: usize,
    total_bytes_per_value: usize,
    num_rows: u64,
}

impl FixedFullZipDecoder {
    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
        debug_assert!(num_rows > 0);
        let cur_buf = self.data.front_mut().unwrap();
        let start = self.offset_in_current;
        if self.details.ctrl_word_parser.has_rep() {
            // With repetition we must walk the control words to find row
            // boundaries, counting the items we pass along the way.
            let mut rows_started = 0;
            let mut num_items = 0;
            while self.offset_in_current < cur_buf.len() {
                let control = self.details.ctrl_word_parser.parse_desc(
                    &cur_buf[self.offset_in_current..],
                    self.details.max_rep,
                    self.details.max_visible_def,
                );
                if control.is_new_row {
                    if rows_started == num_rows {
                        break;
                    }
                    rows_started += 1;
                }
                num_items += 1;
                if control.is_visible {
                    self.offset_in_current += self.total_bytes_per_value;
                } else {
                    // Invisible items carry a control word but no value bytes
                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
                }
            }

            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
            if self.offset_in_current == cur_buf.len() {
                self.data.pop_front();
                self.offset_in_current = 0;
            }

            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: num_items,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_started,
            }
        } else {
            // Without repetition, rows are fixed-size and can be sliced
            // arithmetically.
            let cur_buf = self.data.front_mut().unwrap();
            let bytes_avail = cur_buf.len() - self.offset_in_current;
            let offset_in_cur = self.offset_in_current;

            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
            let mut rows_taken = num_rows;
            let task_slice = if bytes_needed >= bytes_avail {
                self.offset_in_current = 0;
                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
                self.data
                    .pop_front()
                    .unwrap()
                    .slice_with_length(offset_in_cur, bytes_avail)
            } else {
                self.offset_in_current += bytes_needed;
                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
            };
            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: rows_taken,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_taken,
            }
        }
    }
}

2344impl StructuralPageDecoder for FixedFullZipDecoder {
2345 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2346 let mut task_data = Vec::with_capacity(self.data.len());
2347 let mut remaining = num_rows;
2348 while remaining > 0 {
2349 let task_item = self.slice_next_task(remaining);
2350 remaining -= task_item.rows_in_buf;
2351 task_data.push(task_item);
2352 }
2353 Ok(Box::new(FixedFullZipDecodeTask {
2354 details: self.details.clone(),
2355 data: task_data,
2356 bytes_per_value: self.bytes_per_value,
2357 num_rows: num_rows as usize,
2358 }))
2359 }
2360
2361 fn num_rows(&self) -> u64 {
2362 self.num_rows
2363 }
2364}
2365
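// Decoder for variable-width full-zip pages.  Unlike the fixed-width path,
// the entire page is unzipped eagerly in `new` because row boundaries cannot
// be found without scanning every control word and length prefix.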
2366#[derive(Debug)]
2371struct VariableFullZipDecoder {
2372 details: Arc<FullZipDecodeDetails>,
2373 decompressor: Arc<dyn VariablePerValueDecompressor>,
2374 data: LanceBuffer,
2375 offsets: LanceBuffer,
2376 rep: ScalarBuffer<u16>,
2377 def: ScalarBuffer<u16>,
2378 repdef_starts: Vec<usize>,
2379 data_starts: Vec<usize>,
2380 offset_starts: Vec<usize>,
2381 visible_item_counts: Vec<u64>,
2382 bits_per_offset: u8,
2383 current_idx: usize,
2384 num_rows: u64,
2385}
2386
2387impl VariableFullZipDecoder {
2388 fn new(
2389 details: Arc<FullZipDecodeDetails>,
2390 data: VecDeque<LanceBuffer>,
2391 num_rows: u64,
2392 in_bits_per_length: u8,
2393 out_bits_per_offset: u8,
2394 ) -> Self {
2395 let decompressor = match details.value_decompressor {
2396 PerValueDecompressor::Variable(ref d) => d.clone(),
2397 _ => unreachable!(),
2398 };
2399
2400 assert_eq!(in_bits_per_length % 8, 0);
2401 assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2402
2403 let mut decoder = Self {
2404 details,
2405 decompressor,
2406 data: LanceBuffer::empty(),
2407 offsets: LanceBuffer::empty(),
2408 rep: LanceBuffer::empty().borrow_to_typed_slice(),
2409 def: LanceBuffer::empty().borrow_to_typed_slice(),
2410 bits_per_offset: out_bits_per_offset,
2411 repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2412 data_starts: Vec::with_capacity(num_rows as usize + 1),
2413 offset_starts: Vec::with_capacity(num_rows as usize + 1),
2414 visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2415 current_idx: 0,
2416 num_rows,
2417 };
2418
2419 decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2440
2441 decoder
2442 }
2443
2444 unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2445 match bits_per_offset {
2446 8 => *data.get_unchecked(0) as u64,
2447 16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2448 32 => u32::from_le_bytes([
2449 *data.get_unchecked(0),
2450 *data.get_unchecked(1),
2451 *data.get_unchecked(2),
2452 *data.get_unchecked(3),
2453 ]) as u64,
2454 64 => u64::from_le_bytes([
2455 *data.get_unchecked(0),
2456 *data.get_unchecked(1),
2457 *data.get_unchecked(2),
2458 *data.get_unchecked(3),
2459 *data.get_unchecked(4),
2460 *data.get_unchecked(5),
2461 *data.get_unchecked(6),
2462 *data.get_unchecked(7),
2463 ]),
2464 _ => unreachable!(),
2465 }
2466 }
2467
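// Unzips the interleaved stream into four parallel structures: rep levels,
// def levels, value offsets (widened to `out_bits_per_offset`), and the raw
// value bytes.  Each visible, valid item is stored as
// [control word][length (in_bits_per_length wide)][value bytes]; invisible
// items carry only a control word, and visible-but-null items only add an
// offset entry so the offsets array stays dense.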
2468 fn unzip(
2469 &mut self,
2470 data: VecDeque<LanceBuffer>,
2471 in_bits_per_length: u8,
2472 out_bits_per_offset: u8,
2473 num_rows: u64,
2474 ) {
2475 let mut rep = Vec::with_capacity(num_rows as usize);
2477 let mut def = Vec::with_capacity(num_rows as usize);
2478 let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2479
2480 let bytes_per_offset = out_bits_per_offset as usize / 8;
2483 let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2484 let mut offsets_data = Vec::with_capacity(bytes_offsets);
2485
2486 let bytes_per_length = in_bits_per_length as usize / 8;
2487 let bytes_lengths = bytes_per_length * num_rows as usize;
2488
2489 let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2490 let mut unzipped_data =
2493 Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2494
2495 let mut current_offset = 0_u64;
2496 let mut visible_item_count = 0_u64;
2497 for databuf in data.into_iter() {
2498 let mut databuf = databuf.as_ref();
2499 while !databuf.is_empty() {
2500 let data_start = unzipped_data.len();
2501 let offset_start = offsets_data.len();
2502 let repdef_start = rep.len().max(def.len());
2505 let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2507 databuf,
2508 self.details.max_rep,
2509 self.details.max_visible_def,
2510 );
2511 self.details
2512 .ctrl_word_parser
2513 .parse(databuf, &mut rep, &mut def);
2514 databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2515
2516 if ctrl_desc.is_new_row {
2517 self.repdef_starts.push(repdef_start);
2518 self.data_starts.push(data_start);
2519 self.offset_starts.push(offset_start);
2520 self.visible_item_counts.push(visible_item_count);
2521 }
2522 if ctrl_desc.is_visible {
2523 visible_item_count += 1;
2524 if ctrl_desc.is_valid_item {
2525 debug_assert!(databuf.len() >= bytes_per_length);
2527 let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2528 match out_bits_per_offset {
2529 32 => offsets_data
2530 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2531 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2532 _ => unreachable!(),
2533 };
2534 databuf = &databuf[bytes_per_length..]; // skip the length prefix, which is in_bits_per_length wide
2535 unzipped_data.extend_from_slice(&databuf[..length as usize]);
2536 databuf = &databuf[length as usize..];
2537 current_offset += length;
2538 } else {
2539 match out_bits_per_offset {
2541 32 => offsets_data
2542 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2543 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2544 _ => unreachable!(),
2545 }
2546 }
2547 }
2548 }
2549 }
2550 self.repdef_starts.push(rep.len().max(def.len()));
2551 self.data_starts.push(unzipped_data.len());
2552 self.offset_starts.push(offsets_data.len());
2553 self.visible_item_counts.push(visible_item_count);
2554 match out_bits_per_offset {
2555 32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2556 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2557 _ => unreachable!(),
2558 };
2559 self.rep = ScalarBuffer::from(rep);
2560 self.def = ScalarBuffer::from(def);
2561 self.data = LanceBuffer::from(unzipped_data);
2562 self.offsets = LanceBuffer::from(offsets_data);
2563 }
2564}
2565
2566impl StructuralPageDecoder for VariableFullZipDecoder {
2567 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2568 let start = self.current_idx;
2569 let end = start + num_rows as usize;
2570
2571 let data = self.data.clone();
2579
2580 let offset_start = self.offset_starts[start];
2581 let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2582 let offsets = self
2583 .offsets
2584 .slice_with_length(offset_start, offset_end - offset_start);
2585
2586 let repdef_start = self.repdef_starts[start];
2587 let repdef_end = self.repdef_starts[end];
2588 let rep = if self.rep.is_empty() {
2589 self.rep.clone()
2590 } else {
2591 self.rep.slice(repdef_start, repdef_end - repdef_start)
2592 };
2593 let def = if self.def.is_empty() {
2594 self.def.clone()
2595 } else {
2596 self.def.slice(repdef_start, repdef_end - repdef_start)
2597 };
2598
2599 let visible_item_counts_start = self.visible_item_counts[start];
2600 let visible_item_counts_end = self.visible_item_counts[end];
2601 let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2602
2603 self.current_idx += num_rows as usize;
2604
2605 Ok(Box::new(VariableFullZipDecodeTask {
2606 details: self.details.clone(),
2607 decompressor: self.decompressor.clone(),
2608 data,
2609 offsets,
2610 bits_per_offset: self.bits_per_offset,
2611 num_visible_items,
2612 rep,
2613 def,
2614 }))
2615 }
2616
2617 fn num_rows(&self) -> u64 {
2618 self.num_rows
2619 }
2620}
2621
2622#[derive(Debug)]
2623struct VariableFullZipDecodeTask {
2624 details: Arc<FullZipDecodeDetails>,
2625 decompressor: Arc<dyn VariablePerValueDecompressor>,
2626 data: LanceBuffer,
2627 offsets: LanceBuffer,
2628 bits_per_offset: u8,
2629 num_visible_items: u64,
2630 rep: ScalarBuffer<u16>,
2631 def: ScalarBuffer<u16>,
2632}
2633
2634impl DecodePageTask for VariableFullZipDecodeTask {
2635 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2636 let block = VariableWidthBlock {
2637 data: self.data,
2638 offsets: self.offsets,
2639 bits_per_offset: self.bits_per_offset,
2640 num_values: self.num_visible_items,
2641 block_info: BlockInfo::new(),
2642 };
2643 let decompressed = self.decompressor.decompress(block)?;
2644 let rep = if self.rep.is_empty() {
2645 None
2646 } else {
2647 Some(self.rep.to_vec())
2648 };
2649 let def = if self.def.is_empty() {
2650 None
2651 } else {
2652 Some(self.def.to_vec())
2653 };
2654 let unraveler = RepDefUnraveler::new(
2655 rep,
2656 def,
2657 self.details.def_meaning.clone(),
2658 self.num_visible_items,
2659 );
2660 Ok(DecodedPage {
2661 data: decompressed,
2662 repdef: unraveler,
2663 })
2664 }
2665}
2666
2667#[derive(Debug)]
2668struct FullZipDecodeTaskItem {
2669 data: PerValueDataBlock,
2670 rows_in_buf: u64,
2671}
2672
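// Task that unzips fixed-width data on a decode thread.  When the page has
// no control words the buffers are decompressed directly; otherwise the
// control words are parsed out to rebuild rep/def levels and invisible
// values are skipped.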
2673#[derive(Debug)]
2676struct FixedFullZipDecodeTask {
2677 details: Arc<FullZipDecodeDetails>,
2678 data: Vec<FullZipDecodeTaskItem>,
2679 num_rows: usize,
2680 bytes_per_value: usize,
2681}
2682
2683impl DecodePageTask for FixedFullZipDecodeTask {
2684 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2685 let estimated_size_bytes = self
2687 .data
2688 .iter()
2689 .map(|task_item| task_item.data.data_size() as usize)
2690 .sum::<usize>()
2691 * 2;
2692 let mut data_builder =
2693 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2694
2695 if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2696 for task_item in self.data.into_iter() {
2700 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2701 unreachable!()
2702 };
2703 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2704 else {
2705 unreachable!()
2706 };
2707 debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
2708 let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
2709 data_builder.append(&decompressed, 0..task_item.rows_in_buf);
2710 }
2711
2712 let unraveler = RepDefUnraveler::new(
2713 None,
2714 None,
2715 self.details.def_meaning.clone(),
2716 self.num_rows as u64,
2717 );
2718
2719 Ok(DecodedPage {
2720 data: data_builder.finish(),
2721 repdef: unraveler,
2722 })
2723 } else {
2724 let mut rep = Vec::with_capacity(self.num_rows);
2726 let mut def = Vec::with_capacity(self.num_rows);
2727
2728 for task_item in self.data.into_iter() {
2729 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2730 unreachable!()
2731 };
2732 let mut buf_slice = fixed_data.data.as_ref();
2733 let num_values = fixed_data.num_values as usize;
2734 let mut values = Vec::with_capacity(
2737 fixed_data.data.len()
2738 - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
2739 );
2740 let mut visible_items = 0;
2741 for _ in 0..num_values {
2742 self.details
2744 .ctrl_word_parser
2745 .parse(buf_slice, &mut rep, &mut def);
2746 buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2747
2748 let is_visible = def
2749 .last()
2750 .map(|d| *d <= self.details.max_visible_def)
2751 .unwrap_or(true);
2752 if is_visible {
2753 values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
2755 buf_slice = &buf_slice[self.bytes_per_value..];
2756 visible_items += 1;
2757 }
2758 }
2759
2760 let values_buf = LanceBuffer::from(values);
2762 let fixed_data = FixedWidthDataBlock {
2763 bits_per_value: self.bytes_per_value as u64 * 8,
2764 block_info: BlockInfo::new(),
2765 data: values_buf,
2766 num_values: visible_items,
2767 };
2768 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2769 else {
2770 unreachable!()
2771 };
2772 let decompressed = decompressor.decompress(fixed_data, visible_items)?;
2773 data_builder.append(&decompressed, 0..visible_items);
2774 }
2775
2776 let repetition = if rep.is_empty() { None } else { Some(rep) };
2777 let definition = if def.is_empty() { None } else { Some(def) };
2778
2779 let unraveler = RepDefUnraveler::new(
2780 repetition,
2781 definition,
2782 self.details.def_meaning.clone(),
2783 self.num_rows as u64,
2784 );
2785 let data = data_builder.finish();
2786
2787 Ok(DecodedPage {
2788 data,
2789 repdef: unraveler,
2790 })
2791 }
2792 }
2793}
2794
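// Walks the user-requested row ranges page by page, translating each global
// range into page-relative ranges and emitting one scan line per page load.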
2795#[derive(Debug)]
2796struct StructuralPrimitiveFieldSchedulingJob<'a> {
2797 scheduler: &'a StructuralPrimitiveFieldScheduler,
2798 ranges: Vec<Range<u64>>,
2799 page_idx: usize,
2800 range_idx: usize,
2801 global_row_offset: u64,
2802}
2803
2804impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2805 pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2806 Self {
2807 scheduler,
2808 ranges,
2809 page_idx: 0,
2810 range_idx: 0,
2811 global_row_offset: 0,
2812 }
2813 }
2814}
2815
2816impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2817 fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
2818 if self.range_idx >= self.ranges.len() {
2819 return Ok(Vec::new());
2820 }
2821 let mut range = self.ranges[self.range_idx].clone();
2823 let priority = range.start;
2824
2825 let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2826 trace!(
2827 "Current range is {:?} and current page has {} rows",
2828 range,
2829 cur_page.num_rows
2830 );
2831 while cur_page.num_rows + self.global_row_offset <= range.start {
2833 self.global_row_offset += cur_page.num_rows;
2834 self.page_idx += 1;
2835 trace!("Skipping entire page of {} rows", cur_page.num_rows);
2836 cur_page = &self.scheduler.page_schedulers[self.page_idx];
2837 }
2838
2839 let mut ranges_in_page = Vec::new();
2843 while cur_page.num_rows + self.global_row_offset > range.start {
2844 range.start = range.start.max(self.global_row_offset);
2845 let start_in_page = range.start - self.global_row_offset;
2846 let end_in_page = start_in_page + (range.end - range.start);
2847 let end_in_page = end_in_page.min(cur_page.num_rows);
2848 let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2849
2850 ranges_in_page.push(start_in_page..end_in_page);
2851 if last_in_range {
2852 self.range_idx += 1;
2853 if self.range_idx == self.ranges.len() {
2854 break;
2855 }
2856 range = self.ranges[self.range_idx].clone();
2857 } else {
2858 break;
2859 }
2860 }
2861
2862 trace!(
2863 "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2864 ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
2865 ranges_in_page.len(),
2866 cur_page.num_rows,
2867 priority,
2868 self.scheduler.column_index,
2869 cur_page.page_index,
2870 );
2871
2872 self.global_row_offset += cur_page.num_rows;
2873 self.page_idx += 1;
2874
2875 let page_decoders = cur_page
2876 .scheduler
2877 .schedule_ranges(&ranges_in_page, context.io())?;
2878
2879 let cur_path = context.current_path();
2880 page_decoders
2881 .into_iter()
2882 .map(|page_load_task| {
2883 let cur_path = cur_path.clone();
2884 let page_decoder = page_load_task.decoder_fut;
2885 let unloaded_page = async move {
2886 let page_decoder = page_decoder.await?;
2887 Ok(LoadedPageShard {
2888 decoder: page_decoder,
2889 path: cur_path,
2890 })
2891 }
2892 .boxed();
2893 Ok(ScheduledScanLine {
2894 decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
2895 rows_scheduled: page_load_task.num_rows,
2896 })
2897 })
2898 .collect::<Result<Vec<_>>>()
2899 }
2900}
2901
2902#[derive(Debug)]
2903struct PageInfoAndScheduler {
2904 page_index: usize,
2905 num_rows: u64,
2906 scheduler: Box<dyn StructuralPageScheduler>,
2907}
2908
2909#[derive(Debug)]
2914pub struct StructuralPrimitiveFieldScheduler {
2915 page_schedulers: Vec<PageInfoAndScheduler>,
2916 column_index: u32,
2917}
2918
2919impl StructuralPrimitiveFieldScheduler {
2920 pub fn try_new(
2921 column_info: &ColumnInfo,
2922 decompressors: &dyn DecompressionStrategy,
2923 cache_repetition_index: bool,
2924 target_field: &Field,
2925 ) -> Result<Self> {
2926 let page_schedulers = column_info
2927 .page_infos
2928 .iter()
2929 .enumerate()
2930 .map(|(page_index, page_info)| {
2931 Self::page_info_to_scheduler(
2932 page_info,
2933 page_index,
2934 decompressors,
2935 cache_repetition_index,
2936 target_field,
2937 )
2938 })
2939 .collect::<Result<Vec<_>>>()?;
2940 Ok(Self {
2941 page_schedulers,
2942 column_index: column_info.index,
2943 })
2944 }
2945
2946 fn page_layout_to_scheduler(
2947 page_info: &PageInfo,
2948 page_layout: &PageLayout,
2949 decompressors: &dyn DecompressionStrategy,
2950 cache_repetition_index: bool,
2951 target_field: &Field,
2952 ) -> Result<Box<dyn StructuralPageScheduler>> {
2953 use pb21::page_layout::Layout;
2954 Ok(match page_layout.layout.as_ref().expect_ok()? {
2955 Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
2956 &page_info.buffer_offsets_and_sizes,
2957 page_info.priority,
2958 mini_block.num_items,
2959 mini_block,
2960 decompressors,
2961 )?),
2962 Layout::FullZipLayout(full_zip) => {
2963 let mut scheduler = FullZipScheduler::try_new(
2964 &page_info.buffer_offsets_and_sizes,
2965 page_info.priority,
2966 page_info.num_rows,
2967 full_zip,
2968 decompressors,
2969 )?;
2970 scheduler.enable_cache = cache_repetition_index;
2971 Box::new(scheduler)
2972 }
2973 Layout::AllNullLayout(all_null) => {
2974 let def_meaning = all_null
2975 .layers
2976 .iter()
2977 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
2978 .collect::<Vec<_>>();
2979 if def_meaning.len() == 1
2980 && def_meaning[0] == DefinitionInterpretation::NullableItem
2981 {
2982 Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
2983 } else {
2984 Box::new(ComplexAllNullScheduler::new(
2985 page_info.buffer_offsets_and_sizes.clone(),
2986 def_meaning.into(),
2987 )) as Box<dyn StructuralPageScheduler>
2988 }
2989 }
2990 Layout::BlobLayout(blob) => {
2991 let inner_scheduler = Self::page_layout_to_scheduler(
2992 page_info,
2993 blob.inner_layout.as_ref().expect_ok()?.as_ref(),
2994 decompressors,
2995 cache_repetition_index,
2996 target_field,
2997 )?;
2998 let def_meaning = blob
2999 .layers
3000 .iter()
3001 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3002 .collect::<Vec<_>>();
3003 if matches!(target_field.data_type(), DataType::Struct(_)) {
3004 Box::new(BlobDescriptionPageScheduler::new(
3006 inner_scheduler,
3007 def_meaning.into(),
3008 ))
3009 } else {
3010 Box::new(BlobPageScheduler::new(
3012 inner_scheduler,
3013 page_info.priority,
3014 page_info.num_rows,
3015 def_meaning.into(),
3016 ))
3017 }
3018 }
3019 })
3020 }
3021
3022 fn page_info_to_scheduler(
3023 page_info: &PageInfo,
3024 page_index: usize,
3025 decompressors: &dyn DecompressionStrategy,
3026 cache_repetition_index: bool,
3027 target_field: &Field,
3028 ) -> Result<PageInfoAndScheduler> {
3029 let page_layout = page_info.encoding.as_structural();
3030 let scheduler = Self::page_layout_to_scheduler(
3031 page_info,
3032 page_layout,
3033 decompressors,
3034 cache_repetition_index,
3035 target_field,
3036 )?;
3037 Ok(PageInfoAndScheduler {
3038 page_index,
3039 num_rows: page_info.num_rows,
3040 scheduler,
3041 })
3042 }
3043}
3044
3045pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
3046 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
3047}
3048
3049pub struct NoCachedPageData;
3050
3051impl DeepSizeOf for NoCachedPageData {
3052 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
3053 0
3054 }
3055}
3056impl CachedPageData for NoCachedPageData {
3057 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
3058 self
3059 }
3060}
3061
3062pub struct CachedFieldData {
3063 pages: Vec<Arc<dyn CachedPageData>>,
3064}
3065
3066impl DeepSizeOf for CachedFieldData {
3067 fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
3068 self.pages.deep_size_of_children(ctx)
3069 }
3070}
3071
3072#[derive(Debug, Clone)]
3074pub struct FieldDataCacheKey {
3075 pub column_index: u32,
3076}
3077
3078impl CacheKey for FieldDataCacheKey {
3079 type ValueType = CachedFieldData;
3080
3081 fn key(&self) -> std::borrow::Cow<'_, str> {
3082 self.column_index.to_string().into()
3083 }
3084}
3085
3086impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3087 fn initialize<'a>(
3088 &'a mut self,
3089 _filter: &'a FilterExpression,
3090 context: &'a SchedulerContext,
3091 ) -> BoxFuture<'a, Result<()>> {
3092 let cache_key = FieldDataCacheKey {
3093 column_index: self.column_index,
3094 };
3095 let cache = context.cache().clone();
3096
3097 async move {
3098 if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3099 self.page_schedulers
3100 .iter_mut()
3101 .zip(cached_data.pages.iter())
3102 .for_each(|(page_scheduler, cached_data)| {
3103 page_scheduler.scheduler.load(cached_data);
3104 });
3105 return Ok(());
3106 }
3107
3108 let page_data = self
3109 .page_schedulers
3110 .iter_mut()
3111 .map(|s| s.scheduler.initialize(context.io()))
3112 .collect::<FuturesOrdered<_>>();
3113
3114 let page_data = page_data.try_collect::<Vec<_>>().await?;
3115 let cached_data = Arc::new(CachedFieldData { pages: page_data });
3116 cache.insert_with_key(&cache_key, cached_data).await;
3117 Ok(())
3118 }
3119 .boxed()
3120 }
3121
3122 fn schedule_ranges<'a>(
3123 &'a self,
3124 ranges: &[Range<u64>],
3125 _filter: &FilterExpression,
3126 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3127 let ranges = ranges.to_vec();
3128 Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3129 self, ranges,
3130 )))
3131 }
3132}
3133
3134#[derive(Debug)]
3137pub struct StructuralCompositeDecodeArrayTask {
3138 tasks: Vec<Box<dyn DecodePageTask>>,
3139 should_validate: bool,
3140 data_type: DataType,
3141}
3142
3143impl StructuralCompositeDecodeArrayTask {
3144 fn restore_validity(
3145 array: Arc<dyn Array>,
3146 unraveler: &mut CompositeRepDefUnraveler,
3147 ) -> Arc<dyn Array> {
3148 let validity = unraveler.unravel_validity(array.len());
3149 let Some(validity) = validity else {
3150 return array;
3151 };
3152 if array.data_type() == &DataType::Null {
3153 return array;
3155 }
3156 assert_eq!(validity.len(), array.len());
3157 make_array(unsafe {
3160 array
3161 .to_data()
3162 .into_builder()
3163 .nulls(Some(validity))
3164 .build_unchecked()
3165 })
3166 }
3167}
3168
3169impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3170 fn decode(self: Box<Self>) -> Result<DecodedArray> {
3171 let mut arrays = Vec::with_capacity(self.tasks.len());
3172 let mut unravelers = Vec::with_capacity(self.tasks.len());
3173 for task in self.tasks {
3174 let decoded = task.decode()?;
3175 unravelers.push(decoded.repdef);
3176
3177 let array = make_array(
3178 decoded
3179 .data
3180 .into_arrow(self.data_type.clone(), self.should_validate)?,
3181 );
3182
3183 arrays.push(array);
3184 }
3185 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3186 let array = arrow_select::concat::concat(&array_refs)?;
3187 let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3188
3189 let array = Self::restore_validity(array, &mut repdef);
3190
3191 Ok(DecodedArray { array, repdef })
3192 }
3193}
3194
3195#[derive(Debug)]
3196pub struct StructuralPrimitiveFieldDecoder {
3197 field: Arc<ArrowField>,
3198 page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3199 should_validate: bool,
3200 rows_drained_in_current: u64,
3201}
3202
3203impl StructuralPrimitiveFieldDecoder {
3204 pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3205 Self {
3206 field: field.clone(),
3207 page_decoders: VecDeque::new(),
3208 should_validate,
3209 rows_drained_in_current: 0,
3210 }
3211 }
3212}
3213
3214impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3215 fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3216 assert!(child.path.is_empty());
3217 self.page_decoders.push_back(child.decoder);
3218 Ok(())
3219 }
3220
3221 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3222 let mut remaining = num_rows;
3223 let mut tasks = Vec::new();
3224 while remaining > 0 {
3225 let cur_page = self.page_decoders.front_mut().unwrap();
3226 let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3227 let to_take = num_in_page.min(remaining);
3228
3229 let task = cur_page.drain(to_take)?;
3230 tasks.push(task);
3231
3232 if to_take == num_in_page {
3233 self.page_decoders.pop_front();
3234 self.rows_drained_in_current = 0;
3235 } else {
3236 self.rows_drained_in_current += to_take;
3237 }
3238
3239 remaining -= to_take;
3240 }
3241 Ok(Box::new(StructuralCompositeDecodeArrayTask {
3242 tasks,
3243 should_validate: self.should_validate,
3244 data_type: self.field.data_type().clone(),
3245 }))
3246 }
3247
3248 fn data_type(&self) -> &DataType {
3249 self.field.data_type()
3250 }
3251}
3252
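// A full-zip page after serialization: the zipped value buffer plus, when
// the page has repetition, a byte-packed index of the buffer offset at which
// each row begins (with one trailing entry marking the end of the buffer).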
3253struct SerializedFullZip {
3255 values: LanceBuffer,
3257 repetition_index: Option<LanceBuffer>,
3259}
3260
3261const MINIBLOCK_ALIGNMENT: usize = 8;
3281
3282pub struct PrimitiveStructuralEncoder {
3309 accumulation_queue: AccumulationQueue,
3311
3312 keep_original_array: bool,
3313 accumulated_repdefs: Vec<RepDefBuilder>,
3314 compression_strategy: Arc<dyn CompressionStrategy>,
3316 column_index: u32,
3317 field: Field,
3318 encoding_metadata: Arc<HashMap<String, String>>,
3319}
3320
3321struct CompressedLevelsChunk {
3322 data: LanceBuffer,
3323 num_levels: u16,
3324}
3325
3326struct CompressedLevels {
3327 data: Vec<CompressedLevelsChunk>,
3328 compression: CompressiveEncoding,
3329 rep_index: Option<LanceBuffer>,
3330}
3331
3332struct SerializedMiniBlockPage {
3333 num_buffers: u64,
3334 data: LanceBuffer,
3335 metadata: LanceBuffer,
3336}
3337
3338impl PrimitiveStructuralEncoder {
3339 pub fn try_new(
3340 options: &EncodingOptions,
3341 compression_strategy: Arc<dyn CompressionStrategy>,
3342 column_index: u32,
3343 field: Field,
3344 encoding_metadata: Arc<HashMap<String, String>>,
3345 ) -> Result<Self> {
3346 Ok(Self {
3347 accumulation_queue: AccumulationQueue::new(
3348 options.cache_bytes_per_column,
3349 column_index,
3350 options.keep_original_array,
3351 ),
3352 keep_original_array: options.keep_original_array,
3353 accumulated_repdefs: Vec::new(),
3354 column_index,
3355 compression_strategy,
3356 field,
3357 encoding_metadata,
3358 })
3359 }
3360
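// A block is considered "narrow" when its largest value serializes to fewer
// than 256 bytes; narrow data is the heuristic trigger for preferring the
// mini-block layout over full-zip.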
3361 fn is_narrow(data_block: &DataBlock) -> bool {
3369 const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3370
3371 if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3372 let max_len_array = max_len_array
3373 .as_any()
3374 .downcast_ref::<PrimitiveArray<UInt64Type>>()
3375 .unwrap();
3376 if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3377 return true;
3378 }
3379 }
3380 false
3381 }
3382
3383 fn prefers_miniblock(
3384 data_block: &DataBlock,
3385 encoding_metadata: &HashMap<String, String>,
3386 ) -> bool {
3387 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3389 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3390 }
3391 Self::is_narrow(data_block)
3393 }
3394
3395 fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3396 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3400 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3401 }
3402 true
3403 }
3404
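// Serializes a page of mini-blocks into a single buffer.  Each chunk is laid
// out as follows (all fields little-endian, each chunk start 8-byte aligned):
//
//   [num_levels: u16]
//   [rep length: u16]           (only if rep levels are present)
//   [def length: u16]           (only if def levels are present)
//   [buffer size: u16] * N      (one per value buffer)
//   <pad to 8>
//   [rep bytes] <pad to 8>      (only if present)
//   [def bytes] <pad to 8>      (only if present)
//   [buffer bytes] <pad to 8>   (repeated for each of the N value buffers)
//
// The per-chunk metadata word packs the chunk size (in 8-byte words, minus
// one, upper 12 bits) with log2 of the chunk's value count (lower 4 bits),
// which is why a chunk may not exceed 32KiB.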
3405 fn serialize_miniblocks(
3452 miniblocks: MiniBlockCompressed,
3453 rep: Option<Vec<CompressedLevelsChunk>>,
3454 def: Option<Vec<CompressedLevelsChunk>>,
3455 ) -> SerializedMiniBlockPage {
3456 let bytes_rep = rep
3457 .as_ref()
3458 .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3459 .unwrap_or(0);
3460 let bytes_def = def
3461 .as_ref()
3462 .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3463 .unwrap_or(0);
3464 let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3465 let mut num_buffers = miniblocks.data.len();
3466 if rep.is_some() {
3467 num_buffers += 1;
3468 }
3469 if def.is_some() {
3470 num_buffers += 1;
3471 }
3472 let max_extra = 9 * num_buffers;
3474 let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3475 let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2);
3476
3477 let mut rep_iter = rep.map(|r| r.into_iter());
3478 let mut def_iter = def.map(|d| d.into_iter());
3479
3480 let mut buffer_offsets = vec![0; miniblocks.data.len()];
3481 for chunk in miniblocks.chunks {
3482 let start_pos = data_buffer.len();
3483 debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3485
3486 let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3487 let def = def_iter.as_mut().map(|d| d.next().unwrap());
3488
3489 let num_levels = rep
3491 .as_ref()
3492 .map(|r| r.num_levels)
3493 .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3494 data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3495
3496 if let Some(rep) = rep.as_ref() {
3498 let bytes_rep = u16::try_from(rep.data.len()).unwrap();
3499 data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3500 }
3501 if let Some(def) = def.as_ref() {
3502 let bytes_def = u16::try_from(def.data.len()).unwrap();
3503 data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3504 }
3505
3506 for buffer_size in &chunk.buffer_sizes {
3507 let bytes = *buffer_size;
3508 data_buffer.extend_from_slice(&bytes.to_le_bytes());
3509 }
3510
3511 let add_padding = |data_buffer: &mut Vec<u8>| {
3513 let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3514 data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3515 };
3516 add_padding(&mut data_buffer);
3517
3518 if let Some(rep) = rep.as_ref() {
3520 data_buffer.extend_from_slice(&rep.data);
3521 add_padding(&mut data_buffer);
3522 }
3523 if let Some(def) = def.as_ref() {
3524 data_buffer.extend_from_slice(&def.data);
3525 add_padding(&mut data_buffer);
3526 }
3527 for (buffer_size, (buffer, buffer_offset)) in chunk
3528 .buffer_sizes
3529 .iter()
3530 .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3531 {
3532 let start = *buffer_offset;
3533 let end = start + *buffer_size as usize;
3534 *buffer_offset += *buffer_size as usize;
3535 data_buffer.extend_from_slice(&buffer[start..end]);
3536 add_padding(&mut data_buffer);
3537 }
3538
3539 let chunk_bytes = data_buffer.len() - start_pos;
3540 assert!(chunk_bytes <= 32 * 1024);
3541 assert!(chunk_bytes > 0);
3542 assert_eq!(chunk_bytes % 8, 0);
3543 let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3547 let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3548
3549 let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
3550 meta_buffer.extend_from_slice(&metadata.to_le_bytes());
3551 }
3552
3553 let data_buffer = LanceBuffer::from(data_buffer);
3554 let metadata_buffer = LanceBuffer::from(meta_buffer);
3555
3556 SerializedMiniBlockPage {
3557 num_buffers: miniblocks.data.len() as u64,
3558 data: data_buffer,
3559 metadata: metadata_buffer,
3560 }
3561 }
3562
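// Compresses rep or def levels chunk-by-chunk using a single block
// compressor chosen from the whole page of levels.  When `max_rep > 0` this
// also builds a repetition index with two u64s per chunk: the number of rows
// that start in the chunk and the number of trailing levels belonging to a
// row that finishes in a later chunk.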
3563 fn compress_levels(
3568 mut levels: RepDefSlicer<'_>,
3569 num_elements: u64,
3570 compression_strategy: &dyn CompressionStrategy,
3571 chunks: &[MiniBlockChunk],
3572 max_rep: u16,
3574 ) -> Result<CompressedLevels> {
3575 let mut rep_index = if max_rep > 0 {
3576 Vec::with_capacity(chunks.len())
3577 } else {
3578 vec![]
3579 };
3580 let num_levels = levels.num_levels() as u64;
3582 let levels_buf = levels.all_levels().clone();
3583
3584 let mut fixed_width_block = FixedWidthDataBlock {
3585 data: levels_buf,
3586 bits_per_value: 16,
3587 num_values: num_levels,
3588 block_info: BlockInfo::new(),
3589 };
3590 fixed_width_block.compute_stat();
3592
3593 let levels_block = DataBlock::FixedWidth(fixed_width_block);
3594 let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3595 let (compressor, compressor_desc) =
3597 compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3598 let mut level_chunks = Vec::with_capacity(chunks.len());
3600 let mut values_counter = 0;
3601 for (chunk_idx, chunk) in chunks.iter().enumerate() {
3602 let chunk_num_values = chunk.num_values(values_counter, num_elements);
3603 debug_assert!(chunk_num_values > 0);
3604 values_counter += chunk_num_values;
3605 let chunk_levels = if chunk_idx < chunks.len() - 1 {
3606 levels.slice_next(chunk_num_values as usize)
3607 } else {
3608 levels.slice_rest()
3609 };
3610 let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3611 if max_rep > 0 {
3612 let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3622 let rep_values = rep_values.as_ref();
3623
3624 let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3627 let num_leftovers = if chunk_idx < chunks.len() - 1 {
3628 rep_values
3629 .iter()
3630 .rev()
3631 .position(|v| *v == max_rep)
3632 .map(|pos| pos + 1)
3634 .unwrap_or(rep_values.len())
3635 } else {
3636 0
3638 };
3639
3640 if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
3641 let rep_len = rep_index.len();
3645 if rep_index[rep_len - 1] != 0 {
3646 rep_index[rep_len - 2] += 1;
3648 rep_index[rep_len - 1] = 0;
3649 }
3650 }
3651
3652 if chunk_idx == chunks.len() - 1 {
3653 num_rows += 1;
3655 }
3656 rep_index.push(num_rows as u64);
3657 rep_index.push(num_leftovers as u64);
3658 }
3659 let mut chunk_fixed_width = FixedWidthDataBlock {
3660 data: chunk_levels,
3661 bits_per_value: 16,
3662 num_values: num_chunk_levels,
3663 block_info: BlockInfo::new(),
3664 };
3665 chunk_fixed_width.compute_stat();
3666 let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
3667 let compressed_levels = compressor.compress(chunk_levels_block)?;
3668 level_chunks.push(CompressedLevelsChunk {
3669 data: compressed_levels,
3670 num_levels: num_chunk_levels as u16,
3671 });
3672 }
3673 debug_assert_eq!(levels.num_levels_remaining(), 0);
3674 let rep_index = if rep_index.is_empty() {
3675 None
3676 } else {
3677 Some(LanceBuffer::reinterpret_vec(rep_index))
3678 };
3679 Ok(CompressedLevels {
3680 data: level_chunks,
3681 compression: compressor_desc,
3682 rep_index,
3683 })
3684 }
3685
3686 fn encode_simple_all_null(
3687 column_idx: u32,
3688 num_rows: u64,
3689 row_number: u64,
3690 ) -> Result<EncodedPage> {
3691 let description = ProtobufUtils21::simple_all_null_layout();
3692 Ok(EncodedPage {
3693 column_idx,
3694 data: vec![],
3695 description: PageEncoding::Structural(description),
3696 num_rows,
3697 row_number,
3698 })
3699 }
3700
3701 fn encode_complex_all_null(
3705 column_idx: u32,
3706 repdefs: Vec<RepDefBuilder>,
3707 row_number: u64,
3708 num_rows: u64,
3709 ) -> Result<EncodedPage> {
3710 let repdef = RepDefBuilder::serialize(repdefs);
3711
3712 let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3714 LanceBuffer::reinterpret_slice(rep.clone())
3715 } else {
3716 LanceBuffer::empty()
3717 };
3718
3719 let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3720 LanceBuffer::reinterpret_slice(def.clone())
3721 } else {
3722 LanceBuffer::empty()
3723 };
3724
3725 let description = ProtobufUtils21::all_null_layout(&repdef.def_meaning);
3726 Ok(EncodedPage {
3727 column_idx,
3728 data: vec![rep_bytes, def_bytes],
3729 description: PageEncoding::Structural(description),
3730 num_rows,
3731 row_number,
3732 })
3733 }
3734
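// Encodes a page using the mini-block layout: values are compressed into
// chunks, rep/def levels are compressed per chunk, and everything is
// serialized together with an optional dictionary and repetition index.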
3735 #[allow(clippy::too_many_arguments)]
3736 fn encode_miniblock(
3737 column_idx: u32,
3738 field: &Field,
3739 compression_strategy: &dyn CompressionStrategy,
3740 data: DataBlock,
3741 repdefs: Vec<RepDefBuilder>,
3742 row_number: u64,
3743 dictionary_data: Option<DataBlock>,
3744 num_rows: u64,
3745 ) -> Result<EncodedPage> {
3746 let repdef = RepDefBuilder::serialize(repdefs);
3747
3748 if let DataBlock::AllNull(_null_block) = data {
3749 unreachable!()
3752 }
3753
3754 let num_items = data.num_values();
3755
3756 let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3757 let (compressed_data, value_encoding) = compressor.compress(data)?;
3758
3759 let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3760
3761 let mut compressed_rep = repdef
3762 .rep_slicer()
3763 .map(|rep_slicer| {
3764 Self::compress_levels(
3765 rep_slicer,
3766 num_items,
3767 compression_strategy,
3768 &compressed_data.chunks,
3769 max_rep,
3770 )
3771 })
3772 .transpose()?;
3773
3774 let (rep_index, rep_index_depth) =
3775 match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
3776 Some(rep_index) => (Some(rep_index.clone()), 1),
3777 None => (None, 0),
3778 };
3779
3780 let mut compressed_def = repdef
3781 .def_slicer()
3782 .map(|def_slicer| {
3783 Self::compress_levels(
3784 def_slicer,
3785 num_items,
3786 compression_strategy,
3787 &compressed_data.chunks,
3788 0,
3789 )
3790 })
3791 .transpose()?;
3792
3793 let rep_data = compressed_rep
3799 .as_mut()
3800 .map(|cr| std::mem::take(&mut cr.data));
3801 let def_data = compressed_def
3802 .as_mut()
3803 .map(|cd| std::mem::take(&mut cd.data));
3804
3805 let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data);
3806
3807 let mut data = Vec::with_capacity(4);
3809 data.push(serialized.metadata);
3810 data.push(serialized.data);
3811
3812 if let Some(dictionary_data) = dictionary_data {
3813 let num_dictionary_items = dictionary_data.num_values();
3814 let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3816
3817 let (compressor, dictionary_encoding) = compression_strategy
3818 .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3819 let dictionary_buffer = compressor.compress(dictionary_data)?;
3820
3821 data.push(dictionary_buffer);
3822 if let Some(rep_index) = rep_index {
3823 data.push(rep_index);
3824 }
3825
3826 let description = ProtobufUtils21::miniblock_layout(
3827 compressed_rep.map(|cr| cr.compression),
3828 compressed_def.map(|cd| cd.compression),
3829 value_encoding,
3830 rep_index_depth,
3831 serialized.num_buffers,
3832 Some((dictionary_encoding, num_dictionary_items)),
3833 &repdef.def_meaning,
3834 num_items,
3835 );
3836 Ok(EncodedPage {
3837 num_rows,
3838 column_idx,
3839 data,
3840 description: PageEncoding::Structural(description),
3841 row_number,
3842 })
3843 } else {
3844 let description = ProtobufUtils21::miniblock_layout(
3845 compressed_rep.map(|cr| cr.compression),
3846 compressed_def.map(|cd| cd.compression),
3847 value_encoding,
3848 rep_index_depth,
3849 serialized.num_buffers,
3850 None,
3851 &repdef.def_meaning,
3852 num_items,
3853 );
3854
3855 if let Some(rep_index) = rep_index {
3856 let view = rep_index.borrow_to_typed_slice::<u64>();
3857 let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
3858 debug_assert_eq!(total, num_rows);
3859
3860 data.push(rep_index);
3861 }
3862
3863 Ok(EncodedPage {
3864 num_rows,
3865 column_idx,
3866 data,
3867 description: PageEncoding::Structural(description),
3868 row_number,
3869 })
3870 }
3871 }
3872
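// Zips control words together with fixed-width values.  When the page has
// repetition we also record, via a byte-packed integer encoder, the byte
// offset at which each new row starts plus a final end-of-buffer entry;
// without repetition the index is empty and dropped.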
3873 fn serialize_full_zip_fixed(
3875 fixed: FixedWidthDataBlock,
3876 mut repdef: ControlWordIterator,
3877 num_values: u64,
3878 ) -> SerializedFullZip {
3879 let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
3880 let mut zipped_data = Vec::with_capacity(len);
3881
3882 let max_rep_index_val = if repdef.has_repetition() {
3883 len as u64
3884 } else {
3885 0
3887 };
3888 let mut rep_index_builder =
3889 BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
3890
3891 assert_eq!(
3894 fixed.bits_per_value % 8,
3895 0,
3896 "Non-byte aligned full-zip compression not yet supported"
3897 );
3898
3899 let bytes_per_value = fixed.bits_per_value as usize / 8;
3900 let mut offset = 0;
3901
3902 if bytes_per_value == 0 {
3903 while let Some(control) = repdef.append_next(&mut zipped_data) {
3905 if control.is_new_row {
3906 debug_assert!(offset <= len);
3908 unsafe { rep_index_builder.append(offset as u64) };
3910 }
3911 offset = zipped_data.len();
3912 }
3913 } else {
3914 let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
3916 while let Some(control) = repdef.append_next(&mut zipped_data) {
3917 if control.is_new_row {
3918 debug_assert!(offset <= len);
3920 unsafe { rep_index_builder.append(offset as u64) };
3922 }
3923 if control.is_visible {
3924 let value = data_iter.next().unwrap();
3925 zipped_data.extend_from_slice(value);
3926 }
3927 offset = zipped_data.len();
3928 }
3929 }
3930
3931 debug_assert_eq!(zipped_data.len(), len);
3932 unsafe {
3935 rep_index_builder.append(zipped_data.len() as u64);
3936 }
3937
3938 let zipped_data = LanceBuffer::from(zipped_data);
3939 let rep_index = rep_index_builder.into_data();
3940 let rep_index = if rep_index.is_empty() {
3941 None
3942 } else {
3943 Some(LanceBuffer::from(rep_index))
3944 };
3945 SerializedFullZip {
3946 values: zipped_data,
3947 repetition_index: rep_index,
3948 }
3949 }
3950
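// Zips control words with variable-width values.  Each visible, valid item
// is written as [control word][length][value bytes], where the length is the
// same width as the block's offsets (4 or 8 bytes); invisible or null items
// contribute only their control word.  A repetition index of row-start byte
// offsets is always produced here, since variable-width rows cannot be
// located by stride arithmetic.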
3951 fn serialize_full_zip_variable(
3955 variable: VariableWidthBlock,
3956 mut repdef: ControlWordIterator,
3957 num_items: u64,
3958 ) -> SerializedFullZip {
3959 let bytes_per_offset = variable.bits_per_offset as usize / 8;
3960 assert_eq!(
3961 variable.bits_per_offset % 8,
3962 0,
3963 "Only byte-aligned offsets supported"
3964 );
3965 let len = variable.data.len()
3966 + repdef.bytes_per_word() * num_items as usize
3967 + bytes_per_offset * variable.num_values as usize;
3968 let mut buf = Vec::with_capacity(len);
3969
3970 let max_rep_index_val = len as u64;
3971 let mut rep_index_builder =
3972 BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
3973
3974 match bytes_per_offset {
3976 4 => {
3977 let offs = variable.offsets.borrow_to_typed_slice::<u32>();
3978 let mut rep_offset = 0;
3979 let mut windows_iter = offs.as_ref().windows(2);
3980 while let Some(control) = repdef.append_next(&mut buf) {
3981 if control.is_new_row {
3982 debug_assert!(rep_offset <= len);
3984 unsafe { rep_index_builder.append(rep_offset as u64) };
3986 }
3987 if control.is_visible {
3988 let window = windows_iter.next().unwrap();
3989 if control.is_valid_item {
3990 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
3991 buf.extend_from_slice(
3992 &variable.data[window[0] as usize..window[1] as usize],
3993 );
3994 }
3995 }
3996 rep_offset = buf.len();
3997 }
3998 }
3999 8 => {
4000 let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4001 let mut rep_offset = 0;
4002 let mut windows_iter = offs.as_ref().windows(2);
4003 while let Some(control) = repdef.append_next(&mut buf) {
4004 if control.is_new_row {
4005 debug_assert!(rep_offset <= len);
4007 unsafe { rep_index_builder.append(rep_offset as u64) };
4009 }
4010 if control.is_visible {
4011 let window = windows_iter.next().unwrap();
4012 if control.is_valid_item {
4013 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4014 buf.extend_from_slice(
4015 &variable.data[window[0] as usize..window[1] as usize],
4016 );
4017 }
4018 }
4019 rep_offset = buf.len();
4020 }
4021 }
4022 _ => panic!("Unsupported offset size"),
4023 }
4024
4025 debug_assert!(buf.len() <= len);
4028 unsafe {
4031 rep_index_builder.append(buf.len() as u64);
4032 }
4033
4034 let zipped_data = LanceBuffer::from(buf);
4035 let rep_index = rep_index_builder.into_data();
4036 debug_assert!(!rep_index.is_empty());
4037 let rep_index = Some(LanceBuffer::from(rep_index));
4038 SerializedFullZip {
4039 values: zipped_data,
4040 repetition_index: rep_index,
4041 }
4042 }
4043
4044 fn serialize_full_zip(
4047 compressed_data: PerValueDataBlock,
4048 repdef: ControlWordIterator,
4049 num_items: u64,
4050 ) -> SerializedFullZip {
4051 match compressed_data {
4052 PerValueDataBlock::Fixed(fixed) => {
4053 Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4054 }
4055 PerValueDataBlock::Variable(var) => {
4056 Self::serialize_full_zip_variable(var, repdef, num_items)
4057 }
4058 }
4059 }
4060
4061 fn encode_full_zip(
4062 column_idx: u32,
4063 field: &Field,
4064 compression_strategy: &dyn CompressionStrategy,
4065 data: DataBlock,
4066 repdefs: Vec<RepDefBuilder>,
4067 row_number: u64,
4068 num_lists: u64,
4069 ) -> Result<EncodedPage> {
4070 let repdef = RepDefBuilder::serialize(repdefs);
4071 let max_rep = repdef
4072 .repetition_levels
4073 .as_ref()
4074 .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4075 let max_def = repdef
4076 .definition_levels
4077 .as_ref()
4078 .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4079
4080 let (num_items, num_visible_items) =
4084 if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4085 (rep_levels.len() as u64, data.num_values())
4088 } else {
4089 (data.num_values(), data.num_values())
4091 };
4092
4093 let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4094
4095 let repdef_iter = build_control_word_iterator(
4096 repdef.repetition_levels.as_deref(),
4097 max_rep,
4098 repdef.definition_levels.as_deref(),
4099 max_def,
4100 max_visible_def,
4101 num_items as usize,
4102 );
4103 let bits_rep = repdef_iter.bits_rep();
4104 let bits_def = repdef_iter.bits_def();
4105
4106 let compressor = compression_strategy.create_per_value(field, &data)?;
4107 let (compressed_data, value_encoding) = compressor.compress(data)?;
4108
4109 let description = match &compressed_data {
4110 PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4111 bits_rep,
4112 bits_def,
4113 fixed.bits_per_value as u32,
4114 value_encoding,
4115 &repdef.def_meaning,
4116 num_items as u32,
4117 num_visible_items as u32,
4118 ),
4119 PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4120 bits_rep,
4121 bits_def,
4122 variable.bits_per_offset as u32,
4123 value_encoding,
4124 &repdef.def_meaning,
4125 num_items as u32,
4126 num_visible_items as u32,
4127 ),
4128 };
4129
4130 let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4131
4132 let data = if let Some(repindex) = zipped.repetition_index {
4133 vec![zipped.values, repindex]
4134 } else {
4135 vec![zipped.values]
4136 };
4137
4138 Ok(EncodedPage {
4139 num_rows: num_lists,
4140 column_idx,
4141 data,
4142 description: PageEncoding::Structural(description),
4143 row_number,
4144 })
4145 }
4146
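// Estimates the dictionary-encoded size of a block from its cardinality
// statistic.  Fixed-width blocks cost one dictionary entry per distinct
// value plus one index per row; variable-width blocks additionally charge
// the average value size and one offset per dictionary entry.  Returns None
// when no cardinality stat is available or the block type is not
// dictionary-encodable.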
4147 fn estimate_dict_size(data_block: &DataBlock) -> Option<u64> {
4162 let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
4163 cardinality_array.as_primitive::<UInt64Type>().value(0)
4164 } else {
4165 return None;
4166 };
4167
4168 let num_values = data_block.num_values();
4169
4170 match data_block {
4171 DataBlock::FixedWidth(_) => {
4172 let dict_size = cardinality * (DICT_FIXED_WIDTH_BITS_PER_VALUE / 8);
4174 let indices_size = num_values * (DICT_INDICES_BITS_PER_VALUE / 8);
4176 Some(dict_size + indices_size)
4177 }
4178 DataBlock::VariableWidth(var) => {
4179 if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
4181 return None;
4182 }
4183 let bits_per_offset = var.bits_per_offset as u64;
4184
4185 let data_size = data_block.data_size();
4186 let avg_value_size = data_size / num_values;
4187
4188 let dict_values_size = cardinality * avg_value_size;
4190 let dict_offsets_size = cardinality * (bits_per_offset / 8);
4192 let indices_size = num_values * (bits_per_offset / 8);
4194
4195 Some(dict_values_size + dict_offsets_size + indices_size)
4196 }
4197 _ => None,
4198 }
4199 }
4200
4201 fn should_dictionary_encode(data_block: &DataBlock, field: &Field) -> bool {
4202 if !matches!(
4205 data_block,
4206 DataBlock::FixedWidth(_) | DataBlock::VariableWidth(_)
4207 ) {
4208 return false;
4209 }
4210
4211 let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4213 .ok()
4214 .and_then(|val| val.parse().ok())
4215 .unwrap_or(100);
4216 if data_block.num_values() < too_small {
4217 return false;
4218 }
4219
4220 let threshold_ratio = field
4222 .metadata
4223 .get(DICT_SIZE_RATIO_META_KEY)
4224 .and_then(|val| val.parse::<f64>().ok())
4225 .or_else(|| {
4226 env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
4227 .ok()
4228 .and_then(|val| val.parse().ok())
4229 })
4230 .unwrap_or(0.8);
4231
4232 if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
4234 panic!(
4235 "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
4236 threshold_ratio
4237 );
4238 }
4239
4240 let data_size = data_block.data_size();
4242
4243 let Some(encoded_size) = Self::estimate_dict_size(data_block) else {
4245 return false;
4246 };
4247
4248 let size_ratio_actual = if data_size > 0 {
4249 encoded_size as f64 / data_size as f64
4250 } else {
4251 return false;
4252 };
4253 size_ratio_actual < threshold_ratio
4254 }
4255
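// Flushes the accumulated arrays into an encode task.  The work runs on a
// CPU worker and picks the page layout in order of preference: all-null
// layouts for empty or fully-null data, full-zip for packed structs with
// variable-width children, dictionary encoding when the estimated size
// ratio clears the threshold, then mini-block for narrow data, and finally
// full-zip.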
4256 fn do_flush(
4258 &mut self,
4259 arrays: Vec<ArrayRef>,
4260 repdefs: Vec<RepDefBuilder>,
4261 row_number: u64,
4262 num_rows: u64,
4263 ) -> Result<Vec<EncodeTask>> {
4264 let column_idx = self.column_index;
4265 let compression_strategy = self.compression_strategy.clone();
4266 let field = self.field.clone();
4267 let encoding_metadata = self.encoding_metadata.clone();
4268 let task = spawn_cpu(move || {
4269 let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
4270
4271 if num_values == 0 {
4272 log::debug!(
4273 "Encoding column {} with {} items ({} rows) using complex-null layout",
4274 column_idx, num_values, num_rows
4275 );
4276 return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
4277 }
4278 let num_nulls = arrays
4279 .iter()
4280 .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
4281 .sum::<u64>();
4282
4283 if num_values == num_nulls {
4284 return if repdefs.iter().all(|rd| rd.is_simple_validity()) {
4285 log::debug!(
4286 "Encoding column {} with {} items ({} rows) using simple-null layout",
4287 column_idx,
4288 num_values,
4289 num_rows
4290 );
4291 Self::encode_simple_all_null(column_idx, num_values, row_number)
4293 } else {
4294 log::debug!(
4295 "Encoding column {} with {} items ({} rows) using complex-null layout",
4296 column_idx,
4297 num_values,
4298 num_rows
4299 );
4300 Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
4302 };
4303 }
4304
4305 if let DataType::Struct(fields) = &field.data_type() {
4306 if fields.is_empty() {
4307 if repdefs.iter().any(|rd| !rd.is_empty()) {
4308 return Err(Error::InvalidInput { source: format!("Empty structs with rep/def information are not yet supported. The field {} is an empty struct that either has nulls or is in a list.", field.name).into(), location: location!() });
4309 }
4310 return Self::encode_simple_all_null(column_idx, num_values, row_number);
4313 }
4314 }
4315
4316 let data_block = DataBlock::from_arrays(&arrays, num_values);
4317
4318 let requires_full_zip_packed_struct =
4319 if let DataBlock::Struct(ref struct_data_block) = data_block {
4320 struct_data_block.has_variable_width_child()
4321 } else {
4322 false
4323 };
4324
4325 if requires_full_zip_packed_struct {
4326 log::debug!(
4327 "Encoding column {} with {} items using full-zip packed struct layout",
4328 column_idx,
4329 num_values
4330 );
4331 return Self::encode_full_zip(
4332 column_idx,
4333 &field,
4334 compression_strategy.as_ref(),
4335 data_block,
4336 repdefs,
4337 row_number,
4338 num_rows,
4339 );
4340 }
4341
4342 if let DataBlock::Dictionary(dict) = data_block {
4343 log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
4344 let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
4345 indices_data_block.compute_stat();
4350 Self::encode_miniblock(
4351 column_idx,
4352 &field,
4353 compression_strategy.as_ref(),
4354 indices_data_block,
4355 repdefs,
4356 row_number,
4357 Some(dictionary_data_block),
4358 num_rows,
4359 )
4360 } else if Self::should_dictionary_encode(&data_block, &field) {
4361 log::debug!(
4362 "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
4363 column_idx,
4364 num_values
4365 );
4366 let (indices_data_block, dictionary_data_block) =
4367 dict::dictionary_encode(data_block);
4368 Self::encode_miniblock(
4369 column_idx,
4370 &field,
4371 compression_strategy.as_ref(),
4372 indices_data_block,
4373 repdefs,
4374 row_number,
4375 Some(dictionary_data_block),
4376 num_rows,
4377 )
4378 } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
4379 log::debug!(
4380 "Encoding column {} with {} items using mini-block layout",
4381 column_idx,
4382 num_values
4383 );
4384 Self::encode_miniblock(
4385 column_idx,
4386 &field,
4387 compression_strategy.as_ref(),
4388 data_block,
4389 repdefs,
4390 row_number,
4391 None,
4392 num_rows,
4393 )
4394 } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
4395 log::debug!(
4396 "Encoding column {} with {} items using full-zip layout",
4397 column_idx,
4398 num_values
4399 );
4400 Self::encode_full_zip(
4401 column_idx,
4402 &field,
4403 compression_strategy.as_ref(),
4404 data_block,
4405 repdefs,
4406 row_number,
4407 num_rows,
4408 )
4409 } else {
4410 Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() })
4411 }
4412 })
4413 .boxed();
4414 Ok(vec![task])
4415 }
4416
4417 fn extract_validity_buf(
4418 array: Arc<dyn Array>,
4419 repdef: &mut RepDefBuilder,
4420 keep_original_array: bool,
4421 ) -> Result<Arc<dyn Array>> {
4422 if let Some(validity) = array.nulls() {
4423 if keep_original_array {
4424 repdef.add_validity_bitmap(validity.clone());
4425 } else {
4426 repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
4427 }
4428 let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
4429 Ok(make_array(data_no_nulls))
4430 } else {
4431 repdef.add_no_null(array.len());
4432 Ok(array)
4433 }
4434 }
4435
4436 fn extract_validity(
4437 mut array: Arc<dyn Array>,
4438 repdef: &mut RepDefBuilder,
4439 keep_original_array: bool,
4440 ) -> Result<Arc<dyn Array>> {
4441 match array.data_type() {
4442 DataType::Null => {
4443 repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
4444 Ok(array)
4445 }
4446 DataType::Dictionary(_, _) => {
4447 array = dict::normalize_dict_nulls(array)?;
4448 Self::extract_validity_buf(array, repdef, keep_original_array)
4449 }
4450 _ => Self::extract_validity_buf(array, repdef, keep_original_array),
4459 }
4460 }
4461}
4462
4463impl FieldEncoder for PrimitiveStructuralEncoder {
4464 fn maybe_encode(
4466 &mut self,
4467 array: ArrayRef,
4468 _external_buffers: &mut OutOfLineBuffers,
4469 mut repdef: RepDefBuilder,
4470 row_number: u64,
4471 num_rows: u64,
4472 ) -> Result<Vec<EncodeTask>> {
4473 let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
4474 self.accumulated_repdefs.push(repdef);
4475
4476 if let Some((arrays, row_number, num_rows)) =
4477 self.accumulation_queue.insert(array, row_number, num_rows)
4478 {
4479 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4480 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4481 } else {
4482 Ok(vec![])
4483 }
4484 }
4485
4486 fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
4488 if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
4489 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4490 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4491 } else {
4492 Ok(vec![])
4493 }
4494 }
4495
4496 fn num_columns(&self) -> u32 {
4497 1
4498 }
4499
4500 fn finish(
4501 &mut self,
4502 _external_buffers: &mut OutOfLineBuffers,
4503 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
4504 std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
4505 }
4506}

#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use super::{
        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
        FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction,
        StructuralPageScheduler,
    };
    use crate::constants::{STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK};
    use crate::data::BlockInfo;
    use crate::decoder::PageEncoding;
    use crate::encodings::logical::primitive::{
        ChunkDrainInstructions, PrimitiveStructuralEncoder,
    };
    use crate::format::pb21;
    use crate::format::pb21::compressive_encoding::Compression;
    use crate::testing::{check_round_trip_encoding_of_data, TestCases};
    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, Int8Array, StringArray, UInt64Array};
    use arrow_schema::DataType;
    use std::collections::HashMap;
    use std::{collections::VecDeque, sync::Arc};

    #[test]
    fn test_is_narrow() {
        let int8_array = Int8Array::from(vec![1, 2, 3]);
        let array_ref: ArrayRef = Arc::new(int8_array);
        let block = DataBlock::from_array(array_ref);

        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
        let block = DataBlock::from_array(string_array);
        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        let string_array = StringArray::from(vec![
            Some("hello world".repeat(100)),
            Some("world".to_string()),
        ]);
        let block = DataBlock::from_array(string_array);
        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
    }

    #[test]
    fn test_map_range() {
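        // Reading the rep/def levels below: a rep level > 0 starts a new row at
        // that position, and a def level greater than `max_visible_def` marks a
        // level with no backing item (a null or empty list).
        //
        // First case: rows start at positions 0, 3, 5, and 6. The lone def == 1
        // level (position 5) is an invisible row, so it maps to an empty item
        // range while still consuming one level.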
        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 8;
        let max_rep = 1;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(1..2, 3..5, 3..5);
        check(2..3, 5..5, 5..6);
        check(3..4, 5..8, 6..9);
        check(0..2, 0..5, 0..5);
        check(1..3, 3..5, 3..6);
        check(2..4, 5..8, 5..9);
        check(0..3, 0..5, 0..6);
        check(1..4, 3..8, 3..9);
        check(0..4, 0..8, 0..9);

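        // Null first row: row 0 is a single def == 1 level, so it spans zero
        // items but one level.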
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..0, 0..1);
        check(1..2, 0..2, 1..3);
        check(2..3, 2..3, 3..4);
        check(0..2, 0..2, 0..3);
        check(1..3, 0..3, 1..4);
        check(0..3, 0..3, 0..4);

        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..3, 1..3);
        check(2..3, 3..3, 3..4);
        check(0..2, 0..3, 0..3);
        check(1..3, 1..3, 1..4);
        check(0..3, 0..3, 0..4);

        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
        let def: Option<&[u16]> = None;
        let max_visible_def = 0;
        let total_items = 6;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..2, 0..2);
        check(1..2, 2..4, 2..4);
        check(2..3, 4..6, 4..6);
        check(0..2, 0..4, 0..4);
        check(1..3, 2..6, 2..6);
        check(0..3, 0..6, 0..6);

        let rep: Option<&[u16]> = None;
        let def = Some(vec![0, 0, 1, 0]);
        let max_visible_def = 1;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..2, 1..2);
        check(2..3, 2..3, 2..3);
        check(0..2, 0..2, 0..2);
        check(1..3, 1..3, 1..3);
        check(0..3, 0..3, 0..3);

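        // Preamble cases: when the rep levels start with 0 the chunk begins
        // with the tail of a row from the previous chunk. PreambleAction::Take
        // includes that tail in the first mapped range; PreambleAction::Skip
        // steps over it before applying the requested range.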
        let rep = Some(vec![0, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..3, 3..4);
        check(0..2, 1..3, 1..4);

        let rep = Some(vec![0, 1, 1, 0]);
        let def = Some(vec![0, 1, 0, 0]);
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..2);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..1, 1..2);
        check(1..2, 1..3, 2..4);
        check(0..2, 1..3, 1..4);

        let rep = Some(vec![0, 1, 0, 1]);
        let def: Option<Vec<u16>> = None;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..4, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..4, 3..4);
        check(0..2, 1..4, 1..4);

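        // Multi-level repetition (max_rep = 2): only levels equal to max_rep
        // start a new outermost row; lower non-zero rep levels start inner
        // lists within the current row.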
        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
        let def = Some(vec![0, 1, 2, 0, 0, 0]);
        let max_rep = 2;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..3, 0..4, 0..6);
        check(0..1, 0..1, 0..2);
        check(1..2, 1..3, 2..5);
        check(2..3, 3..4, 5..6);

        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
        let def = Some(vec![0, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..0, 0..1, 0..2);
        check(0..1, 0..3, 0..4);
        check(0..2, 0..4, 0..5);

        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(2..3, 2..4, 5..7);
    }

    #[test]
    fn test_schedule_instructions() {
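        // The repetition index is decoded from little-endian u64 words, two per
        // chunk here, so the eight values below describe four chunks. A chunk
        // whose second word is non-zero ends in a partial row (a trailer) that
        // continues into the next chunk's preamble.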
        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);

        let check = |user_ranges, expected_instructions| {
            let instructions =
                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
            assert_eq!(instructions, expected_instructions);
        };

        let expected_take_all = vec![
            ChunkInstructions {
                chunk_idx: 0,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 6,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 1,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 2,
                take_trailer: false,
            },
            ChunkInstructions {
                chunk_idx: 2,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 5,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 3,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 1,
                take_trailer: false,
            },
        ];

        check(&[0..14], expected_take_all.clone());

        check(
            &[
                0..1,
                1..2,
                2..3,
                3..4,
                4..5,
                5..6,
                6..7,
                7..8,
                8..9,
                9..10,
                10..11,
                11..12,
                12..13,
                13..14,
            ],
            expected_take_all,
        );

        check(
            &[0..1, 3..4],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 3,
                    rows_to_take: 1,
                    take_trailer: false,
                },
            ],
        );

        check(
            &[5..6],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 5,
                    rows_to_take: 1,
                    take_trailer: true,
                },
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Take,
                    rows_to_skip: 0,
                    rows_to_take: 0,
                    take_trailer: false,
                },
            ],
        );

        check(
            &[7..10],
            vec![
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Skip,
                    rows_to_skip: 1,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 2,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 2,
                    take_trailer: false,
                },
            ],
        );
    }

    #[test]
    fn test_drain_instructions() {
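        // Mirrors the decoder's drain loop: keep pulling ChunkDrainInstructions
        // off the front of the schedule until the requested number of rows
        // (plus any pending preamble) has been satisfied.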
        fn drain_from_instructions(
            instructions: &mut VecDeque<ChunkInstructions>,
            mut rows_desired: u64,
            need_preamble: &mut bool,
            skip_in_chunk: &mut u64,
        ) -> Vec<ChunkDrainInstructions> {
            let mut drain_instructions = Vec::with_capacity(instructions.len());
            while rows_desired > 0 || *need_preamble {
                let (next_instructions, consumed_chunk) = instructions
                    .front()
                    .unwrap()
                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
                if consumed_chunk {
                    instructions.pop_front();
                }
                drain_instructions.push(next_instructions);
            }
            drain_instructions
        }

        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
        let user_ranges = vec![1..7, 10..14];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 4);
        assert_eq!(
            next_batch,
            vec![ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 4,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            }]
        );

        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 2);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 4,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                }
            ]
        );

        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 2,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[3].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
        let user_ranges = vec![0..28];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        let next_batch =
            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 6,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 1);

        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 1,
                    preamble_action: PreambleAction::Skip,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 0,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);
    }

    #[tokio::test]
    async fn test_fullzip_repetition_index_caching() {
        use crate::testing::SimulatedScheduler;

        #[derive(Debug)]
        struct TestFixedDecompressor;

        impl FixedPerValueDecompressor for TestFixedDecompressor {
            fn decompress(
                &self,
                _data: FixedWidthDataBlock,
                _num_rows: u64,
            ) -> crate::Result<DataBlock> {
                unimplemented!("Test decompressor")
            }

            fn bits_per_value(&self) -> u64 {
                32
            }
        }

        let rows_in_page = 100u64;
        let bytes_per_value = 4u64;
        let _rep_index_size = (rows_in_page + 1) * bytes_per_value;

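        // Simulated file layout: 1000 bytes of value data at offset 0, then a
        // repetition index of rows_in_page + 1 little-endian u32 offsets, then
        // padding so every read the scheduler issues stays in bounds.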
        let mut rep_index_data = Vec::new();
        for i in 0..=rows_in_page {
            let offset = (i * 100) as u32;
            rep_index_data.extend_from_slice(&offset.to_le_bytes());
        }

        let mut full_data = vec![0u8; 1000];
        full_data.extend_from_slice(&rep_index_data);
        full_data.extend_from_slice(&vec![0u8; 10000]);

        let data = bytes::Bytes::from(full_data);
        let io = Arc::new(SimulatedScheduler::new(data));
        let _cache = Arc::new(lance_core::cache::LanceCache::with_capacity(1024 * 1024));

        let mut scheduler = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: true,
        };

        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();

        let is_cached = cached_data1
            .clone()
            .as_arc_any()
            .downcast::<FullZipCacheableState>()
            .is_ok();
        assert!(
            is_cached,
            "Expected FullZipCacheableState, got NoCachedPageData"
        );

        scheduler.load(&cached_data1);

        assert!(
            scheduler.cached_state.is_some(),
            "cached_state should be populated after load"
        );

        let cached_state = scheduler.cached_state.as_ref().unwrap();

        let ranges = vec![0..10, 20..30];
        let result = scheduler.schedule_ranges_rep(
            &ranges,
            &io_dyn,
            FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            },
        );

        assert!(
            result.is_ok(),
            "schedule_ranges_rep should succeed with cached data"
        );

        let mut scheduler2 = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: scheduler.details.clone(),
            cached_state: None,
            enable_cache: true,
        };

        scheduler2.load(&cached_data1);
        assert!(
            scheduler2.cached_state.is_some(),
            "Second scheduler should have cached_state after load"
        );

        let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
        assert!(
            Arc::ptr_eq(cached_state, cached_state2),
            "Both schedulers should share the same cached data"
        );
    }

    #[tokio::test]
    async fn test_fullzip_cache_config_controls_caching() {
        use crate::testing::SimulatedScheduler;

        #[derive(Debug)]
        struct TestFixedDecompressor;

        impl FixedPerValueDecompressor for TestFixedDecompressor {
            fn decompress(
                &self,
                _data: FixedWidthDataBlock,
                _num_rows: u64,
            ) -> crate::Result<DataBlock> {
                unimplemented!("Test decompressor")
            }

            fn bits_per_value(&self) -> u64 {
                32
            }
        }

        let rows_in_page = 1000_u64;
        let bytes_per_value = 4_u64;

        let rep_index_data = vec![0u8; ((rows_in_page + 1) * bytes_per_value) as usize];
        let value_data = vec![0u8; 4000];
        let mut full_data = vec![0u8; 1000];
        full_data.extend_from_slice(&rep_index_data);
        full_data.extend_from_slice(&value_data);

        let data = bytes::Bytes::from(full_data);
        let io = Arc::new(SimulatedScheduler::new(data));

        let mut scheduler_no_cache = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: false,
        };

        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        let cached_data = scheduler_no_cache.initialize(&io_dyn).await.unwrap();

        assert!(
            cached_data
                .as_arc_any()
                .downcast_ref::<super::NoCachedPageData>()
                .is_some(),
            "With enable_cache=false, should return NoCachedPageData"
        );

        let mut scheduler_with_cache = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: true,
        };

        let cached_data2 = scheduler_with_cache.initialize(&io_dyn).await.unwrap();

        assert!(
            cached_data2
                .as_arc_any()
                .downcast_ref::<super::FullZipCacheableState>()
                .is_some(),
            "With enable_cache=true, should return FullZipCacheableState"
        );
    }

    #[tokio::test]
    async fn test_fuzz_issue_4492_empty_rep_values() {
        use lance_datagen::{array, gen_batch, RowCount, Seed};

        let seed = 1823859942947654717u64;
        let num_rows = 2741usize;

        let batch_gen = gen_batch().with_seed(Seed::from(seed));
        let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
        let list_generator = array::rand_list_any(base_generator, false);

        let batch = batch_gen
            .anon_col(list_generator)
            .into_batch_rows(RowCount::from(num_rows as u64))
            .unwrap();

        let list_array = batch.column(0).clone();

        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_batch_size(100)
            .with_range(0..num_rows.min(500) as u64)
            .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);

        check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
    }

    #[tokio::test]
    async fn test_large_dictionary_general_compression() {
        use arrow_array::{ArrayRef, StringArray};
        use std::collections::HashMap;
        use std::sync::Arc;

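        // 100 unique values of roughly 500 bytes each, cycled over 100k rows:
        // the low cardinality makes dictionary encoding attractive, and the
        // large dictionary itself should then be wrapped in general (LZ4/Zstd)
        // compression.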
        let unique_values: Vec<String> = (0..100)
            .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
            .collect();

        let repeated_strings: Vec<_> = unique_values
            .iter()
            .cycle()
            .take(100_000)
            .map(|s| Some(s.as_str()))
            .collect();

        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
                assert_eq!(cols.len(), 1);
                let col = &cols[0];

                if let Some(PageEncoding::Structural(page_layout)) =
                    &col.final_pages.first().map(|p| &p.description)
                {
                    if let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
                        &page_layout.layout
                    {
                        if let Some(dictionary_encoding) = &mini_block.dictionary {
                            match dictionary_encoding.compression.as_ref() {
                                Some(Compression::General(general)) => {
                                    let compression = general.compression.as_ref().unwrap();
                                    assert!(
                                        compression.scheme()
                                            == pb21::CompressionScheme::CompressionAlgorithmLz4
                                            || compression.scheme()
                                                == pb21::CompressionScheme::CompressionAlgorithmZstd,
                                        "Expected LZ4 or Zstd compression for large dictionary"
                                    );
                                }
                                _ => panic!("Expected General compression for large dictionary"),
                            }
                        }
                    }
                }
            }));

        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

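    /// Builds a 32-bit fixed-width block of `num_values` zeroed values and
    /// stamps the given cardinality into its statistics, so the dictionary
    /// heuristics can be exercised without computing real stats.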
    fn create_test_fixed_data_block(num_values: u64, cardinality: u64) -> DataBlock {
        use crate::statistics::Stat;

        let block_info = BlockInfo::default();

        let cardinality_array = Arc::new(UInt64Array::from(vec![cardinality]));
        block_info
            .0
            .write()
            .unwrap()
            .insert(Stat::Cardinality, cardinality_array);

        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: crate::buffer::LanceBuffer::from(vec![0u8; (num_values * 4) as usize]),
            num_values,
            block_info,
        })
    }

    fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
        use crate::statistics::Stat;
        use arrow_array::StringArray;

        assert!(cardinality <= num_values && cardinality > 0);

        let mut values = Vec::with_capacity(num_values as usize);
        for i in 0..num_values {
            values.push(format!("value_{:016}", i % cardinality));
        }

        let array = StringArray::from(values);
        let block = DataBlock::from_array(Arc::new(array) as ArrayRef);

        if let DataBlock::VariableWidth(ref var_block) = block {
            let mut info = var_block.block_info.0.write().unwrap();
            info.insert(
                Stat::Cardinality,
                Arc::new(UInt64Array::from(vec![cardinality])),
            );
        }

        block
    }

    #[test]
    fn test_estimate_dict_size_fixed_width() {
        use crate::encodings::logical::primitive::dict::{
            DICT_FIXED_WIDTH_BITS_PER_VALUE, DICT_INDICES_BITS_PER_VALUE,
        };

        let block = create_test_fixed_data_block(1000, 400);
        let estimated_size = PrimitiveStructuralEncoder::estimate_dict_size(&block).unwrap();

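        // For fixed-width data the estimate is the dictionary itself (one entry
        // per distinct value at the fixed dictionary width) plus one index per
        // value.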
        let expected_dict_size = 400 * (DICT_FIXED_WIDTH_BITS_PER_VALUE / 8);
        let expected_indices_size = 1000 * (DICT_INDICES_BITS_PER_VALUE / 8);
        let expected_total = expected_dict_size + expected_indices_size;

        assert_eq!(estimated_size, expected_total);
    }

    #[test]
    fn test_estimate_dict_size_variable_width() {
        let block = create_test_variable_width_block(1000, 400);
        let estimated_size = PrimitiveStructuralEncoder::estimate_dict_size(&block).unwrap();

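        // For variable-width data the estimate is: dictionary values at the
        // average value size, plus a 4-byte offset per dictionary entry, plus a
        // 4-byte index per row.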
        let data_size = block.data_size();
        let avg_value_size = data_size / 1000;

        let expected = 400 * avg_value_size + 400 * 4 + 1000 * 4;

        assert_eq!(estimated_size, expected);
    }

    #[test]
    fn test_should_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

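        // With only 10 distinct values among 1000, the estimated dictionary
        // size should fall well below the configured ratio (0.8) of the plain
        // data size, so dictionary encoding is selected.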
        let block = create_test_variable_width_block(1000, 10);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(&block, &field);

        assert!(result, "Should use dictionary encode based on size");
    }

    #[test]
    fn test_should_not_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

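        // For narrow 32-bit values the per-value indices dominate the estimate,
        // pushing it past the 0.8 ratio, so dictionary encoding is rejected
        // despite the low cardinality.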
        let block = create_test_fixed_data_block(1000, 10);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(&block, &field);

        assert!(!result, "Should not use dictionary encode based on size");
    }
}