use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    env,
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use crate::{
    constants::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    },
    data::DictionaryDataBlock,
    encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
    format::{
        pb21::{self, compressive_encoding::Compression, CompressiveEncoding, PageLayout},
        ProtobufUtils21,
    },
};
use arrow_array::{cast::AsArray, make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_nulls;
use lance_core::{
    cache::{CacheKey, Context, DeepSizeOf},
    error::{Error, LanceOptionExt},
    utils::bit::pad_bytes,
};
use log::trace;
use snafu::location;

use crate::{
    compression::{
        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
    },
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encodings::logical::primitive::fullzip::PerValueDataBlock,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockCompressed,
    statistics::{ComputeStat, GetStat, Stat},
};
use crate::{
    repdef::{
        build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator,
        ControlWordParser, DefinitionInterpretation, RepDefSlicer,
    },
    utils::accumulation::AccumulationQueue,
};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::constants::DICT_SIZE_RATIO_META_KEY;
use crate::encodings::logical::primitive::dict::{
    DICT_FIXED_WIDTH_BITS_PER_VALUE, DICT_INDICES_BITS_PER_VALUE,
};
use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
    },
    encoder::{
        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
    },
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
    EncodingsIo,
};

pub mod blob;
pub mod dict;
pub mod fullzip;
pub mod miniblock;

const FILL_BYTE: u8 = 0xFE;

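/// A decoder for a page, plus the number of rows it will produce, returned by
/// [`StructuralPageScheduler::schedule_ranges`].  The decoder future resolves once the
/// I/O scheduled for the page has completed.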
struct PageLoadTask {
    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
    num_rows: u64,
}

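/// A scheduler for a page of structurally encoded data.
///
/// Lifecycle (as inferred from the call sites in this module): `initialize` is called
/// once to read page metadata and returns state that may be cached, `load` restores
/// that cached state onto a fresh scheduler, and `schedule_ranges` issues the I/O
/// needed to decode the requested row ranges.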
trait StructuralPageScheduler: std::fmt::Debug + Send {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>>;
}

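/// Metadata for a single mini-block chunk: how many values it holds, how large it is,
/// and where it starts (in bytes) within the page's value buffer.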
#[derive(Debug)]
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

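/// The decompressed output of a single mini-block chunk: optional repetition and
/// definition levels plus the decoded values.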
#[derive(Debug, Clone)]
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}

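/// A task to decode a batch of mini-block chunks.  Each entry in `instructions` pairs
/// the drain instructions for a chunk with the loaded chunk data they apply to.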
#[derive(Debug)]
struct DecodeMiniBlockTask {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    num_buffers: u64,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
        num_levels: u16,
    ) -> Result<ScalarBuffer<u16>> {
        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
        let rep = rep.as_fixed_width().unwrap();
        debug_assert_eq!(rep.num_values, num_levels as u64);
        debug_assert_eq!(rep.bits_per_value, 16);
        Ok(rep.data.borrow_to_typed_slice::<u16>())
    }

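    /// Appends the levels in `level_buf[range]` to `levels`, first materializing a
    /// buffer of zeros up to `dest_offset` if this is the first chunk that actually has
    /// levels.  Chunks without levels contribute runs of zeros instead, so offsets stay
    /// aligned across chunks.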
    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            let num_values = (range.end - range.start) as usize;
            levels.extend(iter::repeat_n(0, num_values));
        }
    }

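    /// Maps a range of rows within this chunk to a range of items and a range of levels.
    ///
    /// Rows, items, and levels are different units:
    /// - a row may span several items (e.g. a list row with many elements),
    /// - invisible items (e.g. items inside a null list) have levels but no storage, so
    ///   the item range excludes them while the level range includes them,
    /// - a chunk may begin with a preamble (the continuation of a row started in an
    ///   earlier chunk), which is taken, skipped, or absent per `preamble_action`.
    ///
    /// A new row begins wherever the repetition level equals `max_rep`, and an item is
    /// visible when its definition level is `<= max_visible_def`.  For example (an
    /// illustration, not from the source): with `max_rep = 1`, rep `[1, 0, 1, 0, 0]`
    /// splits into two rows covering items `0..2` and `2..5`.
    ///
    /// Returns `(item_range, level_range)`.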
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            let mut items_in_preamble = 0_u64;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx as u64);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        let first_row_start =
                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
                        first_row_start
                    };
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap();
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                debug_assert!(items_in_preamble <= total_items);
                return (0..items_in_preamble, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                        }
                        if *def > max_visible_def {
                            lead_invis_seen += 1;
                        }
                    }
                }

                rows_seen += 1;

                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                    }
                    if *def > max_visible_def {
                        tail_invis_seen += 1;
                    }
                }

                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                if preamble_action == PreambleAction::Skip {
                    new_start += items_in_preamble;
                    new_end += items_in_preamble;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += items_in_preamble;
                    new_levels_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_start..new_end)
            }
        } else {
            (range.clone(), range)
        }
    }

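    /// Decodes a single mini-block chunk.
    ///
    /// The chunk begins with little-endian u16 words (a summary of the parsing below,
    /// not an authoritative format spec): the number of levels, then, if present, the
    /// compressed size of the rep levels, the compressed size of the def levels, and one
    /// size per value buffer.  Each section that follows (rep, def, and the value
    /// buffers) is padded to `MINIBLOCK_ALIGNMENT`.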
    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        let mut offset = 0;
        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
        offset += 2;

        let rep_size = if self.rep_decompressor.is_some() {
            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(rep_size)
        } else {
            None
        };
        let def_size = if self.def_decompressor.is_some() {
            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(def_size)
        } else {
            None
        };
        let buffer_sizes = (0..self.num_buffers)
            .map(|_| {
                let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
                offset += 2;
                size
            })
            .collect::<Vec<_>>();

        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);

        let rep = rep_size.map(|rep_size| {
            let rep = buf.slice_with_length(offset, rep_size as usize);
            offset += rep_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            rep
        });

        let def = def_size.map(|def_size| {
            let def = buf.slice_with_length(offset, def_size as usize);
            offset += def_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            def
        });

        let buffers = buffer_sizes
            .into_iter()
            .map(|buf_size| {
                let buf = buf.slice_with_length(offset, buf_size as usize);
                offset += buf_size as usize;
                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
                buf
            })
            .collect::<Vec<_>>();

        let values = self
            .value_decompressor
            .decompress(buffers, items_in_chunk)?;

        let rep = rep
            .map(|rep| {
                Self::decode_levels(
                    self.rep_decompressor.as_ref().unwrap().as_ref(),
                    rep,
                    num_levels,
                )
            })
            .transpose()?;
        let def = def
            .map(|def| {
                Self::decode_levels(
                    self.def_decompressor.as_ref().unwrap().as_ref(),
                    def,
                    num_levels,
                )
            })
            .transpose()?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        let mut level_offset = 0;

        // If the next instruction targets the same chunk, cache the decoded chunk so it
        // is not decompressed twice.
        let needs_caching: Vec<bool> = self
            .instructions
            .windows(2)
            .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
            .chain(std::iter::once(false))
            .collect();

        let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;

        for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
            let should_cache_this_chunk = needs_caching[idx];

            let decoded_chunk = match &chunk_cache {
                Some((cached_chunk_idx, ref cached_chunk))
                    if *cached_chunk_idx == chunk.chunk_idx =>
                {
                    cached_chunk.clone()
                }
                _ => {
                    let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

                    if should_cache_this_chunk {
                        chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
                    }
                    decoded
                }
            };

            let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;

            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );
            if item_range.end - item_range.start > chunk.items_in_chunk {
                return Err(lance_core::Error::Internal {
                    message: format!(
                        "Item range {:?} is larger than the number of items in the chunk ({:?})",
                        item_range, chunk.items_in_chunk
                    ),
                    location: location!(),
                });
            }

            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let mut data = data_builder.finish();

        let unraveler =
            RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());

        if let Some(dictionary) = &self.dictionary_data {
            let DataBlock::FixedWidth(indices) = data else {
                return Err(lance_core::Error::Internal {
                    message: format!(
                        "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
                        data
                    ),
                    location: location!(),
                });
            };
            data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
                indices,
                dictionary.as_ref().clone(),
            ));
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

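/// A chunk that has been (or is about to be) loaded: its byte range within the page,
/// the number of items it holds, its index, and the loaded bytes (empty until the I/O
/// completes).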
#[derive(Debug)]
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}

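/// A decoder for pages encoded with the mini-block layout.  It consumes the
/// `instructions` produced by [`ChunkInstructions::schedule_instructions`] together
/// with the corresponding loaded chunks, handing out [`DecodeMiniBlockTask`]s as rows
/// are drained.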
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    num_buffers: u64,
    dictionary: Option<Arc<DataBlock>>,
}

impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        self.offset_in_current_chunk = skip_in_chunk;

        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            num_buffers: self.num_buffers,
            max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

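/// A scheduler for all-null pages that still carry repetition and/or definition
/// information (e.g. lists of nulls, null lists, or null structs).  The rep/def level
/// buffers are small, so they are read fully during `initialize` and cached; scheduling
/// a range then requires no further I/O.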
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    repdef: Option<Arc<CachedComplexAllNullState>>,
    max_visible_level: u16,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        let max_visible_level = def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            repdef: None,
            max_visible_level,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                let rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                Some(rep)
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                let def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                Some(def)
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let decoder = Box::new(ComplexAllNullPageDecoder {
            ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            num_rows,
            def_meaning: self.def_meaning.clone(),
            max_visible_level: self.max_visible_level,
        }) as Box<dyn StructuralPageDecoder>;
        let page_load_task = PageLoadTask {
            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
    max_visible_level: u16,
}

impl ComplexAllNullPageDecoder {
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let drained_ranges = self.drain_ranges(num_rows);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
            max_visible_level: self.max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

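/// A task to "decode" a slice of an all-null page that carries rep/def information.
/// There is no value data to decode; the task slices the cached level buffers and
/// counts how many of the referenced items are visible.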
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    max_visible_level: u16,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);

        let num_values = if let Some(def) = &def {
            def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
        } else {
            num_values
        };

        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

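/// A scheduler for simple all-null data.  "Simple" means there is no repetition and
/// only a single layer of definition (a flat array of nulls), so no page data needs to
/// be read at all.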
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let decoder =
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
        let page_load_task = PageLoadTask {
            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
            self.num_values,
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
    num_dictionary_items: u64,
}

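/// One entry of the decoded mini-block repetition index.
///
/// `first_row` is the index of the first row that starts in this chunk,
/// `starts_including_trailer` counts the rows that start in the chunk (a trailing
/// partial row counts, a preamble does not), and the two flags record whether the
/// chunk begins mid-row (preamble) or ends mid-row (trailer).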
#[derive(Debug)]
struct MiniBlockRepIndexBlock {
    first_row: u64,
    starts_including_trailer: u64,
    has_preamble: bool,
    has_trailer: bool,
}

impl DeepSizeOf for MiniBlockRepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}

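/// The decoded repetition index for a mini-block page, one block per chunk.  When a
/// page has no repetition the index is synthesized from the chunk metadata (every
/// chunk starts and ends on a row boundary).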
#[derive(Debug)]
struct MiniBlockRepIndex {
    blocks: Vec<MiniBlockRepIndexBlock>,
}

impl DeepSizeOf for MiniBlockRepIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl MiniBlockRepIndex {
    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
        let mut blocks = Vec::with_capacity(chunks.len());
        let mut offset: u64 = 0;

        for c in chunks {
            blocks.push(MiniBlockRepIndexBlock {
                first_row: offset,
                starts_including_trailer: c.num_values,
                has_preamble: false,
                has_trailer: false,
            });

            offset += c.num_values;
        }

        Self { blocks }
    }

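    /// Decodes the on-disk repetition index.  The buffer holds `stride` u64 words per
    /// chunk; this reads the first two: the number of rows that end in the chunk, and a
    /// `partial` count that is non-zero when the chunk ends with an unfinished row (a
    /// trailer, which becomes the next chunk's preamble).
    ///
    /// For example (illustrative values, not from the source): with `stride = 2` and
    /// words `[3, 0, 2, 5]`, chunk 0 holds rows 0..3 with no trailer, while chunk 1
    /// starts at row 3 and holds three row starts (two complete plus a trailer), so
    /// chunk 2 would begin with a preamble.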
    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
        let u64_slice = buffer.borrow_to_typed_slice::<u64>();
        let n = u64_slice.len() / stride;

        let mut blocks = Vec::with_capacity(n);
        let mut chunk_has_preamble = false;
        let mut offset: u64 = 0;

        for i in 0..n {
            let base_idx = i * stride;
            let ends = u64_slice[base_idx];
            let partial = u64_slice[base_idx + 1];

            let has_trailer = partial > 0;
            let starts_including_trailer =
                ends + (has_trailer as u64) - (chunk_has_preamble as u64);

            blocks.push(MiniBlockRepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer,
            });

            chunk_has_preamble = has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}

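/// Page state that is expensive enough to be worth caching between reads: the parsed
/// chunk metadata, the decoded repetition index, and the decompressed dictionary
/// (if any).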
#[derive(Debug)]
struct MiniBlockCacheableState {
    chunk_meta: Vec<ChunkMeta>,
    rep_index: MiniBlockRepIndex,
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

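/// A scheduler for pages encoded with the mini-block layout.
///
/// Buffer layout as read back by `initialize` (a summary of the decoding logic below,
/// not an authoritative format spec):
///
/// ```text
/// buffer 0: chunk metadata, one little-endian u16 word per chunk where
///           word & 0x0F     = log2 of the chunk's value count
///           (word >> 4) + 1 = chunk size in MINIBLOCK_ALIGNMENT units
/// buffer 1: the chunk data itself
/// buffer 2: the compressed dictionary (only if the page is dictionary encoded)
/// last:     the repetition index (only if repetition_index_depth > 0)
/// ```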
#[derive(Debug)]
pub struct MiniBlockScheduler {
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    repetition_index_depth: u16,
    num_buffers: u64,
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    page_meta: Option<Arc<MiniBlockCacheableState>>,
}

impl MiniBlockScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        layout: &pb21::MiniBlockLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let rep_decompressor = layout
            .rep_compression
            .as_ref()
            .map(|rep_compression| {
                decompressors
                    .create_block_decompressor(rep_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_decompressor = layout
            .def_compression
            .as_ref()
            .map(|def_compression| {
                decompressors
                    .create_block_decompressor(def_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors.create_miniblock_decompressor(
            layout.value_compression.as_ref().unwrap(),
            decompressors,
        )?;

        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            let num_dictionary_items = layout.num_dictionary_items;
            match dictionary_encoding.compression.as_ref().unwrap() {
                Compression::Variable(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 4,
                    num_dictionary_items,
                }),
                Compression::Flat(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 16,
                    num_dictionary_items,
                }),
                Compression::General(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 1,
                    num_dictionary_items,
                }),
                _ => unreachable!(
                    "Mini-block dictionary encoding must use Variable, Flat, or General compression"
                ),
            }
        } else {
            None
        };

        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor,
            def_decompressor,
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            num_buffers: layout.num_buffers,
            priority,
            items_in_page,
            dictionary,
            def_meaning: def_meaning.into(),
            page_meta: None,
        })
    }

    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        let page_meta = self.page_meta.as_ref().unwrap();
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}

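/// What to do with a chunk's preamble (the portion of a row carried over from the
/// previous chunk): take it, skip past it, or nothing (the chunk has no preamble).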
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    Take,
    Skip,
    Absent,
}

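/// Instructions for reading a contiguous span of rows from one chunk: optionally handle
/// the preamble, skip `rows_to_skip` rows, take `rows_to_take` rows, and optionally
/// take a trailing partial row (whose remainder is the next chunk's preamble).
///
/// For example (illustrative, not from the source): if each chunk holds 10 row starts
/// and the user asks for rows 5..15, scheduling yields two instructions: chunk 0 with
/// `rows_to_skip: 5, rows_to_take: 5` and chunk 1 with `rows_to_take: 5`, plus
/// preamble/trailer handling wherever a row crosses a chunk boundary.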
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    chunk_idx: usize,
    preamble: PreambleAction,
    rows_to_skip: u64,
    rows_to_take: u64,
    take_trailer: bool,
}

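/// Instructions for draining part of a scheduled chunk.  `rows_to_skip` and
/// `rows_to_take` here are relative to the rows selected by the wrapped
/// [`ChunkInstructions`], not to the chunk itself.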
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    chunk_instructions: ChunkInstructions,
    rows_to_skip: u64,
    rows_to_take: u64,
    preamble_action: PreambleAction,
}

impl ChunkInstructions {
    fn schedule_instructions(
        rep_index: &MiniBlockRepIndex,
        user_ranges: &[Range<u64>],
    ) -> Vec<Self> {
        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());

        for user_range in user_ranges {
            let mut rows_needed = user_range.end - user_range.start;
            let mut need_preamble = false;

            let mut block_index = match rep_index
                .blocks
                .binary_search_by_key(&user_range.start, |block| block.first_row)
            {
                Ok(idx) => {
                    let mut idx = idx;
                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
                        idx -= 1;
                    }
                    idx
                }
                Err(idx) => idx - 1,
            };

            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;

            while rows_needed > 0 || need_preamble {
                if block_index >= rep_index.blocks.len() {
                    log::warn!("schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early");
                    break;
                }

                let chunk = &rep_index.blocks[block_index];
                let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);

                if rows_avail == 0 && to_skip == 0 {
                    if chunk.has_preamble && need_preamble {
                        chunk_instructions.push(Self {
                            chunk_idx: block_index,
                            preamble: PreambleAction::Take,
                            rows_to_skip: 0,
                            rows_to_take: 0,
                            take_trailer: chunk.has_trailer,
                        });
                        if chunk.starts_including_trailer > 0
                            || block_index == rep_index.blocks.len() - 1
                        {
                            need_preamble = false;
                        }
                    }
                    block_index += 1;
                    continue;
                }

                if rows_avail == 0 && to_skip > 0 {
                    to_skip -= chunk.starts_including_trailer;
                    block_index += 1;
                    continue;
                }

                let rows_to_take = rows_avail.min(rows_needed);
                rows_needed -= rows_to_take;

                let mut take_trailer = false;
                let preamble = if chunk.has_preamble {
                    if need_preamble {
                        PreambleAction::Take
                    } else {
                        PreambleAction::Skip
                    }
                } else {
                    PreambleAction::Absent
                };

                if rows_to_take == rows_avail && chunk.has_trailer {
                    take_trailer = true;
                    need_preamble = true;
                } else {
                    need_preamble = false;
                };

                chunk_instructions.push(Self {
                    preamble,
                    chunk_idx: block_index,
                    rows_to_skip: to_skip,
                    rows_to_take,
                    take_trailer,
                });

                to_skip = 0;
                block_index += 1;
            }
        }

        if user_ranges.len() > 1 {
            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
            let mut instructions_iter = chunk_instructions.into_iter();
            merged_instructions.push(instructions_iter.next().unwrap());
            for instruction in instructions_iter {
                let last = merged_instructions.last_mut().unwrap();
                if last.chunk_idx == instruction.chunk_idx
                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
                {
                    last.rows_to_take += instruction.rows_to_take;
                    last.take_trailer |= instruction.take_trailer;
                } else {
                    merged_instructions.push(instruction);
                }
            }
            merged_instructions
        } else {
            chunk_instructions
        }
    }

    fn drain_from_instruction(
        &self,
        rows_desired: &mut u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> (ChunkDrainInstructions, bool) {
        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
        let rows_avail = self.rows_to_take - *skip_in_chunk;
        let has_preamble = self.preamble != PreambleAction::Absent;
        let preamble_action = match (*need_preamble, has_preamble) {
            (true, true) => PreambleAction::Take,
            (true, false) => panic!("Need preamble but there isn't one"),
            (false, true) => PreambleAction::Skip,
            (false, false) => PreambleAction::Absent,
        };

        let rows_taking = if *rows_desired >= rows_avail {
            *need_preamble = self.take_trailer;
            rows_avail
        } else {
            *need_preamble = false;
            *rows_desired
        };
        let rows_skipped = *skip_in_chunk;

        let consumed_chunk = if *rows_desired >= rows_avail {
            *rows_desired -= rows_avail;
            *skip_in_chunk = 0;
            true
        } else {
            *skip_in_chunk += *rows_desired;
            *rows_desired = 0;
            false
        };

        (
            ChunkDrainInstructions {
                chunk_instructions: self.clone(),
                rows_to_skip: rows_skipped,
                rows_to_take: rows_taking,
                preamble_action,
            },
            consumed_chunk,
        )
    }
}

impl StructuralPageScheduler for MiniBlockScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        let io_req = io.submit_request(required_ranges, 0);

        async move {
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();

            assert!(meta_bytes.len() % 2 == 0);
            let bytes = LanceBuffer::from_bytes(meta_bytes, 2);
            let words = bytes.borrow_to_typed_slice::<u16>();
            let words = words.as_ref();

            let mut chunk_meta = Vec::with_capacity(words.len());

            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
            for (word_idx, word) in words.iter().enumerate() {
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                let num_values = if word_idx < words.len() - 1 {
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    debug_assert!(
                        log_num_values == 0
                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
                    );
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;

                chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                offset_bytes += num_bytes as u64;
            }

            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
                assert!(rep_index_data.len() % 8 == 0);
                let stride = self.repetition_index_depth as usize + 1;
                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
            } else {
                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
            };

            let mut page_meta = MiniBlockCacheableState {
                chunk_meta,
                rep_index,
                dictionary: None,
            };

            if let Some(ref mut dictionary) = self.dictionary {
                let dictionary_data = dictionary_bytes.unwrap();
                page_meta.dictionary =
                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                        dictionary.num_dictionary_items,
                    )?));
            };
            let page_meta = Arc::new(page_meta);
            self.page_meta = Some(page_meta.clone());
            Ok(page_meta as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.page_meta = Some(
            data.clone()
                .as_arc_any()
                .downcast::<MiniBlockCacheableState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let page_meta = self.page_meta.as_ref().unwrap();

        let chunk_instructions =
            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);

        debug_assert_eq!(
            num_rows,
            chunk_instructions
                .iter()
                .map(|ci| ci.rows_to_take)
                .sum::<u64>()
        );

        let chunks_needed = chunk_instructions
            .iter()
            .map(|ci| ci.chunk_idx)
            .unique()
            .collect::<Vec<_>>();

        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
        let chunk_ranges = loaded_chunks
            .iter()
            .map(|c| c.byte_range.clone())
            .collect::<Vec<_>>();
        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);

        let rep_decompressor = self.rep_decompressor.clone();
        let def_decompressor = self.def_decompressor.clone();
        let value_decompressor = self.value_decompressor.clone();
        let num_buffers = self.num_buffers;
        let dictionary = page_meta.dictionary.clone();
        let def_meaning = self.def_meaning.clone();

        let res = async move {
            let loaded_chunk_data = loaded_chunk_data.await?;
            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
            }

            Ok(Box::new(MiniBlockDecoder {
                rep_decompressor,
                def_decompressor,
                value_decompressor,
                def_meaning,
                loaded_chunks: VecDeque::from_iter(loaded_chunks),
                instructions: VecDeque::from(chunk_instructions),
                offset_in_current_chunk: 0,
                dictionary,
                num_rows,
                num_buffers,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: res,
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug, Clone, Copy)]
struct FullZipRepIndexDetails {
    buf_position: u64,
    bytes_per_value: u64,
}

#[derive(Debug)]
enum PerValueDecompressor {
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    Variable(Arc<dyn VariablePerValueDecompressor>),
}

#[derive(Debug)]
struct FullZipDecodeDetails {
    value_decompressor: PerValueDecompressor,
    def_meaning: Arc<[DefinitionInterpretation]>,
    ctrl_word_parser: ControlWordParser,
    max_rep: u16,
    max_visible_def: u16,
}

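/// A scheduler for pages encoded with the full-zip layout, where control words
/// (rep/def) are zipped together with each value.  When there is no repetition index,
/// every row occupies a fixed `control word + value` stride and byte ranges are
/// computed directly; otherwise the repetition index is consulted (two entries per
/// requested range) to translate row ranges into byte ranges.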
#[derive(Debug)]
pub struct FullZipScheduler {
    data_buf_position: u64,
    rep_index: Option<FullZipRepIndexDetails>,
    priority: u64,
    rows_in_page: u64,
    bits_per_offset: u8,
    details: Arc<FullZipDecodeDetails>,
    cached_state: Option<Arc<FullZipCacheableState>>,
    enable_cache: bool,
}

impl FullZipScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        rows_in_page: u64,
        layout: &pb21::FullZipLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
            let num_reps = rows_in_page + 1;
            let bytes_per_rep = len / num_reps;
            debug_assert_eq!(len % num_reps, 0);
            debug_assert!(
                bytes_per_rep == 1
                    || bytes_per_rep == 2
                    || bytes_per_rep == 4
                    || bytes_per_rep == 8
            );
            FullZipRepIndexDetails {
                buf_position: *pos,
                bytes_per_value: bytes_per_rep,
            }
        });

        let value_decompressor = match layout.details {
            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
                let decompressor = decompressors.create_fixed_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Fixed(decompressor.into())
            }
            Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
                let decompressor = decompressors.create_variable_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Variable(decompressor.into())
            }
            None => {
                panic!("Full-zip layout must have a `details` field");
            }
        };
        let ctrl_word_parser = ControlWordParser::new(
            layout.bits_rep.try_into().unwrap(),
            layout.bits_def.try_into().unwrap(),
        );
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();

        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
        let max_visible_def = def_meaning
            .iter()
            .filter(|d| !d.is_list())
            .map(|d| d.num_def_levels())
            .sum();

        let bits_per_offset = match layout.details {
            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
            Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
                bits_per_offset as u8
            }
            None => panic!("Full-zip layout must have a `details` field"),
        };

        let details = Arc::new(FullZipDecodeDetails {
            value_decompressor,
            def_meaning: def_meaning.into(),
            ctrl_word_parser,
            max_rep,
            max_visible_def,
        });
        Ok(Self {
            data_buf_position,
            rep_index,
            details,
            priority,
            rows_in_page,
            bits_per_offset,
            cached_state: None,
            enable_cache: false,
        })
    }

    fn create_decoder(
        details: Arc<FullZipDecodeDetails>,
        data: VecDeque<LanceBuffer>,
        num_rows: u64,
        bits_per_offset: u8,
    ) -> Result<Box<dyn StructuralPageDecoder>> {
        match &details.value_decompressor {
            PerValueDecompressor::Fixed(decompressor) => {
                let bits_per_value = decompressor.bits_per_value();
                if bits_per_value == 0 {
                    return Err(lance_core::Error::Internal {
                        message: "Invalid encoding: bits_per_value must be greater than 0".into(),
                        location: location!(),
                    });
                }
                if bits_per_value % 8 != 0 {
                    return Err(lance_core::Error::NotSupported {
                        source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
                        location: location!(),
                    });
                }
                let bytes_per_value = bits_per_value / 8;
                let total_bytes_per_value =
                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
                Ok(Box::new(FixedFullZipDecoder {
                    details,
                    data,
                    num_rows,
                    offset_in_current: 0,
                    bytes_per_value: bytes_per_value as usize,
                    total_bytes_per_value,
                }) as Box<dyn StructuralPageDecoder>)
            }
            PerValueDecompressor::Variable(_decompressor) => {
                Ok(Box::new(VariableFullZipDecoder::new(
                    details,
                    data,
                    num_rows,
                    bits_per_offset,
                    bits_per_offset,
                )))
            }
        }
    }

    fn extract_byte_ranges_from_pairs(
        buffer: LanceBuffer,
        bytes_per_value: u64,
        data_buf_position: u64,
    ) -> Vec<Range<u64>> {
        ByteUnpacker::new(buffer, bytes_per_value as usize)
            .chunks(2)
            .into_iter()
            .map(|mut c| {
                let start = c.next().unwrap() + data_buf_position;
                let end = c.next().unwrap() + data_buf_position;
                start..end
            })
            .collect::<Vec<_>>()
    }

    fn extract_byte_ranges_from_cached(
        buffer: &LanceBuffer,
        ranges: &[Range<u64>],
        bytes_per_value: u64,
        data_buf_position: u64,
    ) -> Vec<Range<u64>> {
        ranges
            .iter()
            .map(|r| {
                let start_offset = (r.start * bytes_per_value) as usize;
                let end_offset = (r.end * bytes_per_value) as usize;

                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
                let start_val =
                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
                        .next()
                        .unwrap();

                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
                let end_val =
                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
                        .next()
                        .unwrap();

                (data_buf_position + start_val)..(data_buf_position + end_val)
            })
            .collect()
    }

    fn compute_rep_index_ranges(
        ranges: &[Range<u64>],
        rep_index: &FullZipRepIndexDetails,
    ) -> Vec<Range<u64>> {
        ranges
            .iter()
            .flat_map(|r| {
                let first_val_start =
                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
                let first_val_end = first_val_start + rep_index.bytes_per_value;
                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
                let last_val_end = last_val_start + rep_index.bytes_per_value;
                [first_val_start..first_val_end, last_val_start..last_val_end]
            })
            .collect()
    }

    async fn resolve_byte_ranges(
        data_buf_position: u64,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: &FullZipRepIndexDetails,
        cached_state: Option<&Arc<FullZipCacheableState>>,
        priority: u64,
    ) -> Result<Vec<Range<u64>>> {
        if let Some(cached_state) = cached_state {
            Ok(Self::extract_byte_ranges_from_cached(
                &cached_state.rep_index_buffer,
                ranges,
                rep_index.bytes_per_value,
                data_buf_position,
            ))
        } else {
            let rep_ranges = Self::compute_rep_index_ranges(ranges, rep_index);
            let rep_data = io.submit_request(rep_ranges, priority).await?;
            let rep_buffer = LanceBuffer::concat(
                &rep_data
                    .into_iter()
                    .map(|d| LanceBuffer::from_bytes(d, 1))
                    .collect::<Vec<_>>(),
            );
            Ok(Self::extract_byte_ranges_from_pairs(
                rep_buffer,
                rep_index.bytes_per_value,
                data_buf_position,
            ))
        }
    }

    fn schedule_ranges_rep(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: FullZipRepIndexDetails,
    ) -> Result<Vec<PageLoadTask>> {
        let data_buf_position = self.data_buf_position;
        let cached_state = self.cached_state.clone();
        let priority = self.priority;
        let details = self.details.clone();
        let bits_per_offset = self.bits_per_offset;
        let ranges = ranges.to_vec();
        let io_clone = io.clone();
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let load_task = async move {
            let byte_ranges = Self::resolve_byte_ranges(
                data_buf_position,
                &ranges,
                &io_clone,
                &rep_index,
                cached_state.as_ref(),
                priority,
            )
            .await?;

            let data = io_clone.submit_request(byte_ranges, priority).await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect::<VecDeque<_>>();

            let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum();

            Self::create_decoder(details, data, num_rows, bits_per_offset)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: load_task,
            num_rows,
        };
        Ok(vec![page_load_task])
    }

    fn schedule_ranges_simple(
        &self,
        ranges: &[Range<u64>],
        io: &dyn EncodingsIo,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
            unreachable!()
        };

        let bits_per_value = decompressor.bits_per_value();
        assert_eq!(bits_per_value % 8, 0);
        let bytes_per_value = bits_per_value / 8;
        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
        let byte_ranges = ranges.iter().map(|r| {
            debug_assert!(r.end <= self.rows_in_page);
            let start = self.data_buf_position + r.start * total_bytes_per_value;
            let end = self.data_buf_position + r.end * total_bytes_per_value;
            start..end
        });

        let data = io.submit_request(byte_ranges.collect(), self.priority);

        let details = self.details.clone();

        let load_task = async move {
            let data = data.await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect();
            Ok(Box::new(FixedFullZipDecoder {
                details,
                data,
                num_rows,
                offset_in_current: 0,
                bytes_per_value: bytes_per_value as usize,
                total_bytes_per_value: total_bytes_per_value as usize,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: load_task,
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug)]
struct FullZipCacheableState {
    rep_index_buffer: LanceBuffer,
}

impl DeepSizeOf for FullZipCacheableState {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        self.rep_index_buffer.len()
    }
}

impl CachedPageData for FullZipCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

impl StructuralPageScheduler for FullZipScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        if self.enable_cache && self.rep_index.is_some() {
            let rep_index = self.rep_index.as_ref().unwrap();
            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);

            let io_clone = io.clone();
            let future = async move {
                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
                let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);

                Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
            };

            future.boxed()
        } else {
            std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
        }
    }

    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
        if let Ok(cached_state) = cache
            .clone()
            .as_arc_any()
            .downcast::<FullZipCacheableState>()
        {
            self.cached_state = Some(cached_state);
        }
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        if let Some(rep_index) = self.rep_index {
            self.schedule_ranges_rep(ranges, io, rep_index)
        } else {
            self.schedule_ranges_simple(ranges, io.as_ref())
        }
    }
}

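/// A decoder for full-zip pages with fixed-width values.  With repetition it must walk
/// the zipped buffer control word by control word to find row boundaries; without
/// repetition it can slice whole rows by arithmetic alone.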
#[derive(Debug)]
struct FixedFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    data: VecDeque<LanceBuffer>,
    offset_in_current: usize,
    bytes_per_value: usize,
    total_bytes_per_value: usize,
    num_rows: u64,
}

impl FixedFullZipDecoder {
    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
        debug_assert!(num_rows > 0);
        let cur_buf = self.data.front_mut().unwrap();
        let start = self.offset_in_current;
        if self.details.ctrl_word_parser.has_rep() {
            let mut rows_started = 0;
            let mut num_items = 0;
            while self.offset_in_current < cur_buf.len() {
                let control = self.details.ctrl_word_parser.parse_desc(
                    &cur_buf[self.offset_in_current..],
                    self.details.max_rep,
                    self.details.max_visible_def,
                );
                if control.is_new_row {
                    if rows_started == num_rows {
                        break;
                    }
                    rows_started += 1;
                }
                num_items += 1;
                if control.is_visible {
                    self.offset_in_current += self.total_bytes_per_value;
                } else {
                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
                }
            }

            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
            if self.offset_in_current == cur_buf.len() {
                self.data.pop_front();
                self.offset_in_current = 0;
            }

            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: num_items,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_started,
            }
        } else {
            let cur_buf = self.data.front_mut().unwrap();
            let bytes_avail = cur_buf.len() - self.offset_in_current;
            let offset_in_cur = self.offset_in_current;

            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
            let mut rows_taken = num_rows;
            let task_slice = if bytes_needed >= bytes_avail {
                self.offset_in_current = 0;
                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
                self.data
                    .pop_front()
                    .unwrap()
                    .slice_with_length(offset_in_cur, bytes_avail)
            } else {
                self.offset_in_current += bytes_needed;
                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
            };
            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: rows_taken,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_taken,
            }
        }
    }
}

impl StructuralPageDecoder for FixedFullZipDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut task_data = Vec::with_capacity(self.data.len());
        let mut remaining = num_rows;
        while remaining > 0 {
            let task_item = self.slice_next_task(remaining);
            remaining -= task_item.rows_in_buf;
            task_data.push(task_item);
        }
        Ok(Box::new(FixedFullZipDecodeTask {
            details: self.details.clone(),
            data: task_data,
            bytes_per_value: self.bytes_per_value,
            num_rows: num_rows as usize,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

2395#[derive(Debug)]
2400struct VariableFullZipDecoder {
2401 details: Arc<FullZipDecodeDetails>,
2402 decompressor: Arc<dyn VariablePerValueDecompressor>,
2403 data: LanceBuffer,
2404 offsets: LanceBuffer,
2405 rep: ScalarBuffer<u16>,
2406 def: ScalarBuffer<u16>,
2407 repdef_starts: Vec<usize>,
2408 data_starts: Vec<usize>,
2409 offset_starts: Vec<usize>,
2410 visible_item_counts: Vec<u64>,
2411 bits_per_offset: u8,
2412 current_idx: usize,
2413 num_rows: u64,
2414}
2415
2416impl VariableFullZipDecoder {
2417 fn new(
2418 details: Arc<FullZipDecodeDetails>,
2419 data: VecDeque<LanceBuffer>,
2420 num_rows: u64,
2421 in_bits_per_length: u8,
2422 out_bits_per_offset: u8,
2423 ) -> Self {
2424 let decompressor = match details.value_decompressor {
2425 PerValueDecompressor::Variable(ref d) => d.clone(),
2426 _ => unreachable!(),
2427 };
2428
2429 assert_eq!(in_bits_per_length % 8, 0);
2430 assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2431
2432 let mut decoder = Self {
2433 details,
2434 decompressor,
2435 data: LanceBuffer::empty(),
2436 offsets: LanceBuffer::empty(),
2437 rep: LanceBuffer::empty().borrow_to_typed_slice(),
2438 def: LanceBuffer::empty().borrow_to_typed_slice(),
2439 bits_per_offset: out_bits_per_offset,
2440 repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2441 data_starts: Vec::with_capacity(num_rows as usize + 1),
2442 offset_starts: Vec::with_capacity(num_rows as usize + 1),
2443 visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2444 current_idx: 0,
2445 num_rows,
2446 };
2447
2448 decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2469
2470 decoder
2471 }
2472
2473 unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2474 match bits_per_offset {
2475 8 => *data.get_unchecked(0) as u64,
2476 16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2477 32 => u32::from_le_bytes([
2478 *data.get_unchecked(0),
2479 *data.get_unchecked(1),
2480 *data.get_unchecked(2),
2481 *data.get_unchecked(3),
2482 ]) as u64,
2483 64 => u64::from_le_bytes([
2484 *data.get_unchecked(0),
2485 *data.get_unchecked(1),
2486 *data.get_unchecked(2),
2487 *data.get_unchecked(3),
2488 *data.get_unchecked(4),
2489 *data.get_unchecked(5),
2490 *data.get_unchecked(6),
2491 *data.get_unchecked(7),
2492 ]),
2493 _ => unreachable!(),
2494 }
2495 }
2496
2497 fn unzip(
2498 &mut self,
2499 data: VecDeque<LanceBuffer>,
2500 in_bits_per_length: u8,
2501 out_bits_per_offset: u8,
2502 num_rows: u64,
2503 ) {
2504 let mut rep = Vec::with_capacity(num_rows as usize);
2506 let mut def = Vec::with_capacity(num_rows as usize);
2507 let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2508
2509 let bytes_per_offset = out_bits_per_offset as usize / 8;
2512 let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2513 let mut offsets_data = Vec::with_capacity(bytes_offsets);
2514
2515 let bytes_per_length = in_bits_per_length as usize / 8;
2516 let bytes_lengths = bytes_per_length * num_rows as usize;
2517
2518 let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2519 let mut unzipped_data =
2522 Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2523
2524 let mut current_offset = 0_u64;
2525 let mut visible_item_count = 0_u64;
2526 for databuf in data.into_iter() {
2527 let mut databuf = databuf.as_ref();
2528 while !databuf.is_empty() {
2529 let data_start = unzipped_data.len();
2530 let offset_start = offsets_data.len();
2531 let repdef_start = rep.len().max(def.len());
2534 let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2536 databuf,
2537 self.details.max_rep,
2538 self.details.max_visible_def,
2539 );
2540 self.details
2541 .ctrl_word_parser
2542 .parse(databuf, &mut rep, &mut def);
2543 databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2544
2545 if ctrl_desc.is_new_row {
2546 self.repdef_starts.push(repdef_start);
2547 self.data_starts.push(data_start);
2548 self.offset_starts.push(offset_start);
2549 self.visible_item_counts.push(visible_item_count);
2550 }
2551 if ctrl_desc.is_visible {
2552 visible_item_count += 1;
2553 if ctrl_desc.is_valid_item {
2554 debug_assert!(databuf.len() >= bytes_per_length);
2556 let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2557 match out_bits_per_offset {
2558 32 => offsets_data
2559 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2560 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2561 _ => unreachable!(),
2562 };
2563 // advance past the length prefix, which uses the input width
2563 databuf = &databuf[bytes_per_length..];
2564 unzipped_data.extend_from_slice(&databuf[..length as usize]);
2565 databuf = &databuf[length as usize..];
2566 current_offset += length;
2567 } else {
2568 match out_bits_per_offset {
2570 32 => offsets_data
2571 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2572 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2573 _ => unreachable!(),
2574 }
2575 }
2576 }
2577 }
2578 }
2579 self.repdef_starts.push(rep.len().max(def.len()));
2580 self.data_starts.push(unzipped_data.len());
2581 self.offset_starts.push(offsets_data.len());
2582 self.visible_item_counts.push(visible_item_count);
2583 match out_bits_per_offset {
2584 32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2585 64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2586 _ => unreachable!(),
2587 };
2588 self.rep = ScalarBuffer::from(rep);
2589 self.def = ScalarBuffer::from(def);
2590 self.data = LanceBuffer::from(unzipped_data);
2591 self.offsets = LanceBuffer::from(offsets_data);
2592 }
2593}
2594
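// Worked example of the `unzip` output consumed by `drain` (illustrative
// values): zipped input [cw][len=3]["abc"][cw][len=1]["d"] with 32-bit
// lengths becomes data = "abcd" and offsets = [0, 3, 4], with one
// rep/def level per control word. Because `unzip` records
// repdef/data/offset start positions at every new row, `drain` can
// slice out an arbitrary run of rows without re-parsing control words.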
2595impl StructuralPageDecoder for VariableFullZipDecoder {
2596 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2597 let start = self.current_idx;
2598 let end = start + num_rows as usize;
2599
2600 let data = self.data.clone();
2608
2609 let offset_start = self.offset_starts[start];
2610 let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2611 let offsets = self
2612 .offsets
2613 .slice_with_length(offset_start, offset_end - offset_start);
2614
2615 let repdef_start = self.repdef_starts[start];
2616 let repdef_end = self.repdef_starts[end];
2617 let rep = if self.rep.is_empty() {
2618 self.rep.clone()
2619 } else {
2620 self.rep.slice(repdef_start, repdef_end - repdef_start)
2621 };
2622 let def = if self.def.is_empty() {
2623 self.def.clone()
2624 } else {
2625 self.def.slice(repdef_start, repdef_end - repdef_start)
2626 };
2627
2628 let visible_item_counts_start = self.visible_item_counts[start];
2629 let visible_item_counts_end = self.visible_item_counts[end];
2630 let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2631
2632 self.current_idx += num_rows as usize;
2633
2634 Ok(Box::new(VariableFullZipDecodeTask {
2635 details: self.details.clone(),
2636 decompressor: self.decompressor.clone(),
2637 data,
2638 offsets,
2639 bits_per_offset: self.bits_per_offset,
2640 num_visible_items,
2641 rep,
2642 def,
2643 }))
2644 }
2645
2646 fn num_rows(&self) -> u64 {
2647 self.num_rows
2648 }
2649}
2650
2651#[derive(Debug)]
2652struct VariableFullZipDecodeTask {
2653 details: Arc<FullZipDecodeDetails>,
2654 decompressor: Arc<dyn VariablePerValueDecompressor>,
2655 data: LanceBuffer,
2656 offsets: LanceBuffer,
2657 bits_per_offset: u8,
2658 num_visible_items: u64,
2659 rep: ScalarBuffer<u16>,
2660 def: ScalarBuffer<u16>,
2661}
2662
2663impl DecodePageTask for VariableFullZipDecodeTask {
2664 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2665 let block = VariableWidthBlock {
2666 data: self.data,
2667 offsets: self.offsets,
2668 bits_per_offset: self.bits_per_offset,
2669 num_values: self.num_visible_items,
2670 block_info: BlockInfo::new(),
2671 };
2672 let decompressed = self.decompressor.decompress(block)?;
2673 let rep = if self.rep.is_empty() {
2674 None
2675 } else {
2676 Some(self.rep.to_vec())
2677 };
2678 let def = if self.def.is_empty() {
2679 None
2680 } else {
2681 Some(self.def.to_vec())
2682 };
2683 let unraveler = RepDefUnraveler::new(
2684 rep,
2685 def,
2686 self.details.def_meaning.clone(),
2687 self.num_visible_items,
2688 );
2689 Ok(DecodedPage {
2690 data: decompressed,
2691 repdef: unraveler,
2692 })
2693 }
2694}
2695
2696#[derive(Debug)]
2697struct FullZipDecodeTaskItem {
2698 data: PerValueDataBlock,
2699 rows_in_buf: u64,
2700}
2701
2702#[derive(Debug)]
2705struct FixedFullZipDecodeTask {
2706 details: Arc<FullZipDecodeDetails>,
2707 data: Vec<FullZipDecodeTaskItem>,
2708 num_rows: usize,
2709 bytes_per_value: usize,
2710}
2711
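// Two decode paths: with no control words the zipped buffers are
// already contiguous values and go straight to the decompressor;
// otherwise each item's control word is parsed into rep/def levels and
// only visible values are copied out before decompression.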
2712impl DecodePageTask for FixedFullZipDecodeTask {
2713 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2714 let estimated_size_bytes = self
2716 .data
2717 .iter()
2718 .map(|task_item| task_item.data.data_size() as usize)
2719 .sum::<usize>()
2720 * 2;
2721 let mut data_builder =
2722 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2723
2724 if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2725 for task_item in self.data.into_iter() {
2729 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2730 unreachable!()
2731 };
2732 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2733 else {
2734 unreachable!()
2735 };
2736 debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
2737 let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
2738 data_builder.append(&decompressed, 0..task_item.rows_in_buf);
2739 }
2740
2741 let unraveler = RepDefUnraveler::new(
2742 None,
2743 None,
2744 self.details.def_meaning.clone(),
2745 self.num_rows as u64,
2746 );
2747
2748 Ok(DecodedPage {
2749 data: data_builder.finish(),
2750 repdef: unraveler,
2751 })
2752 } else {
2753 let mut rep = Vec::with_capacity(self.num_rows);
2755 let mut def = Vec::with_capacity(self.num_rows);
2756
2757 for task_item in self.data.into_iter() {
2758 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2759 unreachable!()
2760 };
2761 let mut buf_slice = fixed_data.data.as_ref();
2762 let num_values = fixed_data.num_values as usize;
2763 let mut values = Vec::with_capacity(
2766 fixed_data.data.len()
2767 - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
2768 );
2769 let mut visible_items = 0;
2770 for _ in 0..num_values {
2771 self.details
2773 .ctrl_word_parser
2774 .parse(buf_slice, &mut rep, &mut def);
2775 buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2776
2777 let is_visible = def
2778 .last()
2779 .map(|d| *d <= self.details.max_visible_def)
2780 .unwrap_or(true);
2781 if is_visible {
2782 values.extend_from_slice(&buf_slice[..self.bytes_per_value]);
2784 buf_slice = &buf_slice[self.bytes_per_value..];
2785 visible_items += 1;
2786 }
2787 }
2788
2789 let values_buf = LanceBuffer::from(values);
2791 let fixed_data = FixedWidthDataBlock {
2792 bits_per_value: self.bytes_per_value as u64 * 8,
2793 block_info: BlockInfo::new(),
2794 data: values_buf,
2795 num_values: visible_items,
2796 };
2797 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2798 else {
2799 unreachable!()
2800 };
2801 let decompressed = decompressor.decompress(fixed_data, visible_items)?;
2802 data_builder.append(&decompressed, 0..visible_items);
2803 }
2804
2805 let repetition = if rep.is_empty() { None } else { Some(rep) };
2806 let definition = if def.is_empty() { None } else { Some(def) };
2807
2808 let unraveler = RepDefUnraveler::new(
2809 repetition,
2810 definition,
2811 self.details.def_meaning.clone(),
2812 self.num_rows as u64,
2813 );
2814 let data = data_builder.finish();
2815
2816 Ok(DecodedPage {
2817 data,
2818 repdef: unraveler,
2819 })
2820 }
2821 }
2822}
2823
2824#[derive(Debug)]
2825struct StructuralPrimitiveFieldSchedulingJob<'a> {
2826 scheduler: &'a StructuralPrimitiveFieldScheduler,
2827 ranges: Vec<Range<u64>>,
2828 page_idx: usize,
2829 range_idx: usize,
2830 global_row_offset: u64,
2831}
2832
2833impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2834 pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2835 Self {
2836 scheduler,
2837 ranges,
2838 page_idx: 0,
2839 range_idx: 0,
2840 global_row_offset: 0,
2841 }
2842 }
2843}
2844
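// Worked example (illustrative): with two pages of 100 rows each and a
// requested range 50..150, scheduling emits 50..100 against page 0 and
// 0..50 against page 1. `global_row_offset` counts the rows preceding
// the current page so file-relative ranges can be rebased into
// page-relative ranges.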
2845impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2846 fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
2847 if self.range_idx >= self.ranges.len() {
2848 return Ok(Vec::new());
2849 }
2850 let mut range = self.ranges[self.range_idx].clone();
2852 let priority = range.start;
2853
2854 let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2855 trace!(
2856 "Current range is {:?} and current page has {} rows",
2857 range,
2858 cur_page.num_rows
2859 );
2860 while cur_page.num_rows + self.global_row_offset <= range.start {
2862 self.global_row_offset += cur_page.num_rows;
2863 self.page_idx += 1;
2864 trace!("Skipping entire page of {} rows", cur_page.num_rows);
2865 cur_page = &self.scheduler.page_schedulers[self.page_idx];
2866 }
2867
2868 let mut ranges_in_page = Vec::new();
2872 while cur_page.num_rows + self.global_row_offset > range.start {
2873 range.start = range.start.max(self.global_row_offset);
2874 let start_in_page = range.start - self.global_row_offset;
2875 let end_in_page = start_in_page + (range.end - range.start);
2876 let end_in_page = end_in_page.min(cur_page.num_rows);
2877 let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2878
2879 ranges_in_page.push(start_in_page..end_in_page);
2880 if last_in_range {
2881 self.range_idx += 1;
2882 if self.range_idx == self.ranges.len() {
2883 break;
2884 }
2885 range = self.ranges[self.range_idx].clone();
2886 } else {
2887 break;
2888 }
2889 }
2890
2891 trace!(
2892 "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2893 ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
2894 ranges_in_page.len(),
2895 cur_page.num_rows,
2896 priority,
2897 self.scheduler.column_index,
2898 cur_page.page_index,
2899 );
2900
2901 self.global_row_offset += cur_page.num_rows;
2902 self.page_idx += 1;
2903
2904 let page_decoders = cur_page
2905 .scheduler
2906 .schedule_ranges(&ranges_in_page, context.io())?;
2907
2908 let cur_path = context.current_path();
2909 page_decoders
2910 .into_iter()
2911 .map(|page_load_task| {
2912 let cur_path = cur_path.clone();
2913 let page_decoder = page_load_task.decoder_fut;
2914 let unloaded_page = async move {
2915 let page_decoder = page_decoder.await?;
2916 Ok(LoadedPageShard {
2917 decoder: page_decoder,
2918 path: cur_path,
2919 })
2920 }
2921 .boxed();
2922 Ok(ScheduledScanLine {
2923 decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
2924 rows_scheduled: page_load_task.num_rows,
2925 })
2926 })
2927 .collect::<Result<Vec<_>>>()
2928 }
2929}
2930
2931#[derive(Debug)]
2932struct PageInfoAndScheduler {
2933 page_index: usize,
2934 num_rows: u64,
2935 scheduler: Box<dyn StructuralPageScheduler>,
2936}
2937
2938#[derive(Debug)]
2943pub struct StructuralPrimitiveFieldScheduler {
2944 page_schedulers: Vec<PageInfoAndScheduler>,
2945 column_index: u32,
2946}
2947
2948impl StructuralPrimitiveFieldScheduler {
2949 pub fn try_new(
2950 column_info: &ColumnInfo,
2951 decompressors: &dyn DecompressionStrategy,
2952 cache_repetition_index: bool,
2953 target_field: &Field,
2954 ) -> Result<Self> {
2955 let page_schedulers = column_info
2956 .page_infos
2957 .iter()
2958 .enumerate()
2959 .map(|(page_index, page_info)| {
2960 Self::page_info_to_scheduler(
2961 page_info,
2962 page_index,
2963 decompressors,
2964 cache_repetition_index,
2965 target_field,
2966 )
2967 })
2968 .collect::<Result<Vec<_>>>()?;
2969 Ok(Self {
2970 page_schedulers,
2971 column_index: column_info.index,
2972 })
2973 }
2974
2975 fn page_layout_to_scheduler(
2976 page_info: &PageInfo,
2977 page_layout: &PageLayout,
2978 decompressors: &dyn DecompressionStrategy,
2979 cache_repetition_index: bool,
2980 target_field: &Field,
2981 ) -> Result<Box<dyn StructuralPageScheduler>> {
2982 use pb21::page_layout::Layout;
2983 Ok(match page_layout.layout.as_ref().expect_ok()? {
2984 Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
2985 &page_info.buffer_offsets_and_sizes,
2986 page_info.priority,
2987 mini_block.num_items,
2988 mini_block,
2989 decompressors,
2990 )?),
2991 Layout::FullZipLayout(full_zip) => {
2992 let mut scheduler = FullZipScheduler::try_new(
2993 &page_info.buffer_offsets_and_sizes,
2994 page_info.priority,
2995 page_info.num_rows,
2996 full_zip,
2997 decompressors,
2998 )?;
2999 scheduler.enable_cache = cache_repetition_index;
3000 Box::new(scheduler)
3001 }
3002 Layout::AllNullLayout(all_null) => {
3003 let def_meaning = all_null
3004 .layers
3005 .iter()
3006 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3007 .collect::<Vec<_>>();
3008 if def_meaning.len() == 1
3009 && def_meaning[0] == DefinitionInterpretation::NullableItem
3010 {
3011 Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
3012 } else {
3013 Box::new(ComplexAllNullScheduler::new(
3014 page_info.buffer_offsets_and_sizes.clone(),
3015 def_meaning.into(),
3016 )) as Box<dyn StructuralPageScheduler>
3017 }
3018 }
3019 Layout::BlobLayout(blob) => {
3020 let inner_scheduler = Self::page_layout_to_scheduler(
3021 page_info,
3022 blob.inner_layout.as_ref().expect_ok()?.as_ref(),
3023 decompressors,
3024 cache_repetition_index,
3025 target_field,
3026 )?;
3027 let def_meaning = blob
3028 .layers
3029 .iter()
3030 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3031 .collect::<Vec<_>>();
3032 if matches!(target_field.data_type(), DataType::Struct(_)) {
3033 Box::new(BlobDescriptionPageScheduler::new(
3035 inner_scheduler,
3036 def_meaning.into(),
3037 ))
3038 } else {
3039 Box::new(BlobPageScheduler::new(
3041 inner_scheduler,
3042 page_info.priority,
3043 page_info.num_rows,
3044 def_meaning.into(),
3045 ))
3046 }
3047 }
3048 })
3049 }
3050
3051 fn page_info_to_scheduler(
3052 page_info: &PageInfo,
3053 page_index: usize,
3054 decompressors: &dyn DecompressionStrategy,
3055 cache_repetition_index: bool,
3056 target_field: &Field,
3057 ) -> Result<PageInfoAndScheduler> {
3058 let page_layout = page_info.encoding.as_structural();
3059 let scheduler = Self::page_layout_to_scheduler(
3060 page_info,
3061 page_layout,
3062 decompressors,
3063 cache_repetition_index,
3064 target_field,
3065 )?;
3066 Ok(PageInfoAndScheduler {
3067 page_index,
3068 num_rows: page_info.num_rows,
3069 scheduler,
3070 })
3071 }
3072}
3073
3074pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
3075 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
3076}
3077
3078pub struct NoCachedPageData;
3079
3080impl DeepSizeOf for NoCachedPageData {
3081 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
3082 0
3083 }
3084}
3085impl CachedPageData for NoCachedPageData {
3086 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
3087 self
3088 }
3089}
3090
3091pub struct CachedFieldData {
3092 pages: Vec<Arc<dyn CachedPageData>>,
3093}
3094
3095impl DeepSizeOf for CachedFieldData {
3096 fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
3097 self.pages.deep_size_of_children(ctx)
3098 }
3099}
3100
3101#[derive(Debug, Clone)]
3103pub struct FieldDataCacheKey {
3104 pub column_index: u32,
3105}
3106
3107impl CacheKey for FieldDataCacheKey {
3108 type ValueType = CachedFieldData;
3109
3110 fn key(&self) -> std::borrow::Cow<'_, str> {
3111 self.column_index.to_string().into()
3112 }
3113}
3114
3115impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3116 fn initialize<'a>(
3117 &'a mut self,
3118 _filter: &'a FilterExpression,
3119 context: &'a SchedulerContext,
3120 ) -> BoxFuture<'a, Result<()>> {
3121 let cache_key = FieldDataCacheKey {
3122 column_index: self.column_index,
3123 };
3124 let cache = context.cache().clone();
3125
3126 async move {
3127 if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3128 self.page_schedulers
3129 .iter_mut()
3130 .zip(cached_data.pages.iter())
3131 .for_each(|(page_scheduler, cached_data)| {
3132 page_scheduler.scheduler.load(cached_data);
3133 });
3134 return Ok(());
3135 }
3136
3137 let page_data = self
3138 .page_schedulers
3139 .iter_mut()
3140 .map(|s| s.scheduler.initialize(context.io()))
3141 .collect::<FuturesOrdered<_>>();
3142
3143 let page_data = page_data.try_collect::<Vec<_>>().await?;
3144 let cached_data = Arc::new(CachedFieldData { pages: page_data });
3145 cache.insert_with_key(&cache_key, cached_data).await;
3146 Ok(())
3147 }
3148 .boxed()
3149 }
3150
3151 fn schedule_ranges<'a>(
3152 &'a self,
3153 ranges: &[Range<u64>],
3154 _filter: &FilterExpression,
3155 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3156 let ranges = ranges.to_vec();
3157 Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3158 self, ranges,
3159 )))
3160 }
3161}
3162
3163#[derive(Debug)]
3166pub struct StructuralCompositeDecodeArrayTask {
3167 tasks: Vec<Box<dyn DecodePageTask>>,
3168 should_validate: bool,
3169 data_type: DataType,
3170}
3171
3172impl StructuralCompositeDecodeArrayTask {
3173 fn restore_validity(
3174 array: Arc<dyn Array>,
3175 unraveler: &mut CompositeRepDefUnraveler,
3176 ) -> Arc<dyn Array> {
3177 let validity = unraveler.unravel_validity(array.len());
3178 let Some(validity) = validity else {
3179 return array;
3180 };
3181 if array.data_type() == &DataType::Null {
3182 return array;
3184 }
3185 assert_eq!(validity.len(), array.len());
3186 make_array(unsafe {
3189 array
3190 .to_data()
3191 .into_builder()
3192 .nulls(Some(validity))
3193 .build_unchecked()
3194 })
3195 }
3196}
3197
3198impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3199 fn decode(self: Box<Self>) -> Result<DecodedArray> {
3200 let mut arrays = Vec::with_capacity(self.tasks.len());
3201 let mut unravelers = Vec::with_capacity(self.tasks.len());
3202 for task in self.tasks {
3203 let decoded = task.decode()?;
3204 unravelers.push(decoded.repdef);
3205
3206 let array = make_array(
3207 decoded
3208 .data
3209 .into_arrow(self.data_type.clone(), self.should_validate)?,
3210 );
3211
3212 arrays.push(array);
3213 }
3214 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3215 let array = arrow_select::concat::concat(&array_refs)?;
3216 let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3217
3218 let array = Self::restore_validity(array, &mut repdef);
3219
3220 Ok(DecodedArray { array, repdef })
3221 }
3222}
3223
3224#[derive(Debug)]
3225pub struct StructuralPrimitiveFieldDecoder {
3226 field: Arc<ArrowField>,
3227 page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3228 should_validate: bool,
3229 rows_drained_in_current: u64,
3230}
3231
3232impl StructuralPrimitiveFieldDecoder {
3233 pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3234 Self {
3235 field: field.clone(),
3236 page_decoders: VecDeque::new(),
3237 should_validate,
3238 rows_drained_in_current: 0,
3239 }
3240 }
3241}
3242
3243impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3244 fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3245 assert!(child.path.is_empty());
3246 self.page_decoders.push_back(child.decoder);
3247 Ok(())
3248 }
3249
3250 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3251 let mut remaining = num_rows;
3252 let mut tasks = Vec::new();
3253 while remaining > 0 {
3254 let cur_page = self.page_decoders.front_mut().unwrap();
3255 let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3256 let to_take = num_in_page.min(remaining);
3257
3258 let task = cur_page.drain(to_take)?;
3259 tasks.push(task);
3260
3261 if to_take == num_in_page {
3262 self.page_decoders.pop_front();
3263 self.rows_drained_in_current = 0;
3264 } else {
3265 self.rows_drained_in_current += to_take;
3266 }
3267
3268 remaining -= to_take;
3269 }
3270 Ok(Box::new(StructuralCompositeDecodeArrayTask {
3271 tasks,
3272 should_validate: self.should_validate,
3273 data_type: self.field.data_type().clone(),
3274 }))
3275 }
3276
3277 fn data_type(&self) -> &DataType {
3278 self.field.data_type()
3279 }
3280}
3281
3282struct SerializedFullZip {
3284 values: LanceBuffer,
3286 repetition_index: Option<LanceBuffer>,
3288}
3289
3290const MINIBLOCK_ALIGNMENT: usize = 8;
3310
3311pub struct PrimitiveStructuralEncoder {
3338 accumulation_queue: AccumulationQueue,
3340
3341 keep_original_array: bool,
3342 accumulated_repdefs: Vec<RepDefBuilder>,
3343 compression_strategy: Arc<dyn CompressionStrategy>,
3345 column_index: u32,
3346 field: Field,
3347 encoding_metadata: Arc<HashMap<String, String>>,
3348}
3349
3350struct CompressedLevelsChunk {
3351 data: LanceBuffer,
3352 num_levels: u16,
3353}
3354
3355struct CompressedLevels {
3356 data: Vec<CompressedLevelsChunk>,
3357 compression: CompressiveEncoding,
3358 rep_index: Option<LanceBuffer>,
3359}
3360
3361struct SerializedMiniBlockPage {
3362 num_buffers: u64,
3363 data: LanceBuffer,
3364 metadata: LanceBuffer,
3365}
3366
3367impl PrimitiveStructuralEncoder {
3368 pub fn try_new(
3369 options: &EncodingOptions,
3370 compression_strategy: Arc<dyn CompressionStrategy>,
3371 column_index: u32,
3372 field: Field,
3373 encoding_metadata: Arc<HashMap<String, String>>,
3374 ) -> Result<Self> {
3375 Ok(Self {
3376 accumulation_queue: AccumulationQueue::new(
3377 options.cache_bytes_per_column,
3378 column_index,
3379 options.keep_original_array,
3380 ),
3381 keep_original_array: options.keep_original_array,
3382 accumulated_repdefs: Vec::new(),
3383 column_index,
3384 compression_strategy,
3385 field,
3386 encoding_metadata,
3387 })
3388 }
3389
3390 fn is_narrow(data_block: &DataBlock) -> bool {
3398 const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3399
3400 if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3401 let max_len_array = max_len_array
3402 .as_any()
3403 .downcast_ref::<PrimitiveArray<UInt64Type>>()
3404 .unwrap();
3405 if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3406 return true;
3407 }
3408 }
3409 false
3410 }
3411
3412 fn prefers_miniblock(
3413 data_block: &DataBlock,
3414 encoding_metadata: &HashMap<String, String>,
3415 ) -> bool {
3416 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3418 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3419 }
3420 Self::is_narrow(data_block)
3422 }
3423
3424 fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3425 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3429 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3430 }
3431 true
3432 }
3433
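// Serialized chunk layout (illustrative sketch; sizes in bytes):
//   [num_levels: u16][rep len: u16, if rep][def len: u16, if def]
//   [buffer sizes: u16 x num_buffers][pad][rep][pad][def][pad]
//   [buffer 0][pad]...[buffer n][pad]
// Every section is padded to MINIBLOCK_ALIGNMENT (8) with FILL_BYTE so
// that the chunk size is always expressible in 8-byte words.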
3434 fn serialize_miniblocks(
3481 miniblocks: MiniBlockCompressed,
3482 rep: Option<Vec<CompressedLevelsChunk>>,
3483 def: Option<Vec<CompressedLevelsChunk>>,
3484 ) -> SerializedMiniBlockPage {
3485 let bytes_rep = rep
3486 .as_ref()
3487 .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3488 .unwrap_or(0);
3489 let bytes_def = def
3490 .as_ref()
3491 .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3492 .unwrap_or(0);
3493 let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3494 let mut num_buffers = miniblocks.data.len();
3495 if rep.is_some() {
3496 num_buffers += 1;
3497 }
3498 if def.is_some() {
3499 num_buffers += 1;
3500 }
3501 let max_extra = 9 * num_buffers;
3503 let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3504 let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2);
3505
3506 let mut rep_iter = rep.map(|r| r.into_iter());
3507 let mut def_iter = def.map(|d| d.into_iter());
3508
3509 let mut buffer_offsets = vec![0; miniblocks.data.len()];
3510 for chunk in miniblocks.chunks {
3511 let start_pos = data_buffer.len();
3512 debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3514
3515 let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3516 let def = def_iter.as_mut().map(|d| d.next().unwrap());
3517
3518 let num_levels = rep
3520 .as_ref()
3521 .map(|r| r.num_levels)
3522 .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3523 data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3524
3525 if let Some(rep) = rep.as_ref() {
3527 let bytes_rep = u16::try_from(rep.data.len()).unwrap();
3528 data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3529 }
3530 if let Some(def) = def.as_ref() {
3531 let bytes_def = u16::try_from(def.data.len()).unwrap();
3532 data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3533 }
3534
3535 for buffer_size in &chunk.buffer_sizes {
3536 let bytes = *buffer_size;
3537 data_buffer.extend_from_slice(&bytes.to_le_bytes());
3538 }
3539
3540 let add_padding = |data_buffer: &mut Vec<u8>| {
3542 let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3543 data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3544 };
3545 add_padding(&mut data_buffer);
3546
3547 if let Some(rep) = rep.as_ref() {
3549 data_buffer.extend_from_slice(&rep.data);
3550 add_padding(&mut data_buffer);
3551 }
3552 if let Some(def) = def.as_ref() {
3553 data_buffer.extend_from_slice(&def.data);
3554 add_padding(&mut data_buffer);
3555 }
3556 for (buffer_size, (buffer, buffer_offset)) in chunk
3557 .buffer_sizes
3558 .iter()
3559 .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3560 {
3561 let start = *buffer_offset;
3562 let end = start + *buffer_size as usize;
3563 *buffer_offset += *buffer_size as usize;
3564 data_buffer.extend_from_slice(&buffer[start..end]);
3565 add_padding(&mut data_buffer);
3566 }
3567
3568 let chunk_bytes = data_buffer.len() - start_pos;
3569 assert!(chunk_bytes <= 32 * 1024);
3570 assert!(chunk_bytes > 0);
3571 assert_eq!(chunk_bytes % 8, 0);
3572 let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3576 let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3577
3578 let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
3579 meta_buffer.extend_from_slice(&metadata.to_le_bytes());
3580 }
3581
3582 let data_buffer = LanceBuffer::from(data_buffer);
3583 let metadata_buffer = LanceBuffer::from(meta_buffer);
3584
3585 SerializedMiniBlockPage {
3586 num_buffers: miniblocks.data.len() as u64,
3587 data: data_buffer,
3588 metadata: metadata_buffer,
3589 }
3590 }
3591
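// When repetition is present, compress_levels also builds a repetition
// index with two u64s per chunk: a row count and the number of trailing
// levels spilling into the next chunk. E.g. (illustrative) rep levels
// [1,0,0,1,0 | 0,1,0] with max_rep=1 split over two chunks yield the
// pairs (1, 2) and (2, 0); the row counts sum to the page's 3 rows,
// which encode_miniblock debug-asserts.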
3592 fn compress_levels(
3597 mut levels: RepDefSlicer<'_>,
3598 num_elements: u64,
3599 compression_strategy: &dyn CompressionStrategy,
3600 chunks: &[MiniBlockChunk],
3601 max_rep: u16,
3603 ) -> Result<CompressedLevels> {
3604 let mut rep_index = if max_rep > 0 {
3605 Vec::with_capacity(chunks.len())
3606 } else {
3607 vec![]
3608 };
3609 let num_levels = levels.num_levels() as u64;
3611 let levels_buf = levels.all_levels().clone();
3612
3613 let mut fixed_width_block = FixedWidthDataBlock {
3614 data: levels_buf,
3615 bits_per_value: 16,
3616 num_values: num_levels,
3617 block_info: BlockInfo::new(),
3618 };
3619 fixed_width_block.compute_stat();
3621
3622 let levels_block = DataBlock::FixedWidth(fixed_width_block);
3623 let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3624 let (compressor, compressor_desc) =
3626 compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3627 let mut level_chunks = Vec::with_capacity(chunks.len());
3629 let mut values_counter = 0;
3630 for (chunk_idx, chunk) in chunks.iter().enumerate() {
3631 let chunk_num_values = chunk.num_values(values_counter, num_elements);
3632 debug_assert!(chunk_num_values > 0);
3633 values_counter += chunk_num_values;
3634 let chunk_levels = if chunk_idx < chunks.len() - 1 {
3635 levels.slice_next(chunk_num_values as usize)
3636 } else {
3637 levels.slice_rest()
3638 };
3639 let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3640 if max_rep > 0 {
3641 let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3651 let rep_values = rep_values.as_ref();
3652
3653 let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3656 let num_leftovers = if chunk_idx < chunks.len() - 1 {
3657 rep_values
3658 .iter()
3659 .rev()
3660 .position(|v| *v == max_rep)
3661 .map(|pos| pos + 1)
3663 .unwrap_or(rep_values.len())
3664 } else {
3665 0
3667 };
3668
3669 if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
3670 let rep_len = rep_index.len();
3674 if rep_index[rep_len - 1] != 0 {
3675 rep_index[rep_len - 2] += 1;
3677 rep_index[rep_len - 1] = 0;
3678 }
3679 }
3680
3681 if chunk_idx == chunks.len() - 1 {
3682 num_rows += 1;
3684 }
3685 rep_index.push(num_rows as u64);
3686 rep_index.push(num_leftovers as u64);
3687 }
3688 let mut chunk_fixed_width = FixedWidthDataBlock {
3689 data: chunk_levels,
3690 bits_per_value: 16,
3691 num_values: num_chunk_levels,
3692 block_info: BlockInfo::new(),
3693 };
3694 chunk_fixed_width.compute_stat();
3695 let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
3696 let compressed_levels = compressor.compress(chunk_levels_block)?;
3697 level_chunks.push(CompressedLevelsChunk {
3698 data: compressed_levels,
3699 num_levels: num_chunk_levels as u16,
3700 });
3701 }
3702 debug_assert_eq!(levels.num_levels_remaining(), 0);
3703 let rep_index = if rep_index.is_empty() {
3704 None
3705 } else {
3706 Some(LanceBuffer::reinterpret_vec(rep_index))
3707 };
3708 Ok(CompressedLevels {
3709 data: level_chunks,
3710 compression: compressor_desc,
3711 rep_index,
3712 })
3713 }
3714
3715 fn encode_simple_all_null(
3716 column_idx: u32,
3717 num_rows: u64,
3718 row_number: u64,
3719 ) -> Result<EncodedPage> {
3720 let description = ProtobufUtils21::simple_all_null_layout();
3721 Ok(EncodedPage {
3722 column_idx,
3723 data: vec![],
3724 description: PageEncoding::Structural(description),
3725 num_rows,
3726 row_number,
3727 })
3728 }
3729
3730 fn encode_complex_all_null(
3734 column_idx: u32,
3735 repdefs: Vec<RepDefBuilder>,
3736 row_number: u64,
3737 num_rows: u64,
3738 ) -> Result<EncodedPage> {
3739 let repdef = RepDefBuilder::serialize(repdefs);
3740
3741 let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3743 LanceBuffer::reinterpret_slice(rep.clone())
3744 } else {
3745 LanceBuffer::empty()
3746 };
3747
3748 let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3749 LanceBuffer::reinterpret_slice(def.clone())
3750 } else {
3751 LanceBuffer::empty()
3752 };
3753
3754 let description = ProtobufUtils21::all_null_layout(&repdef.def_meaning);
3755 Ok(EncodedPage {
3756 column_idx,
3757 data: vec![rep_bytes, def_bytes],
3758 description: PageEncoding::Structural(description),
3759 num_rows,
3760 row_number,
3761 })
3762 }
3763
3764 #[allow(clippy::too_many_arguments)]
3765 fn encode_miniblock(
3766 column_idx: u32,
3767 field: &Field,
3768 compression_strategy: &dyn CompressionStrategy,
3769 data: DataBlock,
3770 repdefs: Vec<RepDefBuilder>,
3771 row_number: u64,
3772 dictionary_data: Option<DataBlock>,
3773 num_rows: u64,
3774 ) -> Result<EncodedPage> {
3775 let repdef = RepDefBuilder::serialize(repdefs);
3776
3777 if let DataBlock::AllNull(_null_block) = data {
3778 unreachable!()
3781 }
3782
3783 let num_items = data.num_values();
3784
3785 let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3786 let (compressed_data, value_encoding) = compressor.compress(data)?;
3787
3788 let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3789
3790 let mut compressed_rep = repdef
3791 .rep_slicer()
3792 .map(|rep_slicer| {
3793 Self::compress_levels(
3794 rep_slicer,
3795 num_items,
3796 compression_strategy,
3797 &compressed_data.chunks,
3798 max_rep,
3799 )
3800 })
3801 .transpose()?;
3802
3803 let (rep_index, rep_index_depth) =
3804 match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
3805 Some(rep_index) => (Some(rep_index.clone()), 1),
3806 None => (None, 0),
3807 };
3808
3809 let mut compressed_def = repdef
3810 .def_slicer()
3811 .map(|def_slicer| {
3812 Self::compress_levels(
3813 def_slicer,
3814 num_items,
3815 compression_strategy,
3816 &compressed_data.chunks,
3817 0,
3818 )
3819 })
3820 .transpose()?;
3821
3822 let rep_data = compressed_rep
3828 .as_mut()
3829 .map(|cr| std::mem::take(&mut cr.data));
3830 let def_data = compressed_def
3831 .as_mut()
3832 .map(|cd| std::mem::take(&mut cd.data));
3833
3834 let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data);
3835
3836 let mut data = Vec::with_capacity(4);
3838 data.push(serialized.metadata);
3839 data.push(serialized.data);
3840
3841 if let Some(dictionary_data) = dictionary_data {
3842 let num_dictionary_items = dictionary_data.num_values();
3843 let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3845
3846 let (compressor, dictionary_encoding) = compression_strategy
3847 .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3848 let dictionary_buffer = compressor.compress(dictionary_data)?;
3849
3850 data.push(dictionary_buffer);
3851 if let Some(rep_index) = rep_index {
3852 data.push(rep_index);
3853 }
3854
3855 let description = ProtobufUtils21::miniblock_layout(
3856 compressed_rep.map(|cr| cr.compression),
3857 compressed_def.map(|cd| cd.compression),
3858 value_encoding,
3859 rep_index_depth,
3860 serialized.num_buffers,
3861 Some((dictionary_encoding, num_dictionary_items)),
3862 &repdef.def_meaning,
3863 num_items,
3864 );
3865 Ok(EncodedPage {
3866 num_rows,
3867 column_idx,
3868 data,
3869 description: PageEncoding::Structural(description),
3870 row_number,
3871 })
3872 } else {
3873 let description = ProtobufUtils21::miniblock_layout(
3874 compressed_rep.map(|cr| cr.compression),
3875 compressed_def.map(|cd| cd.compression),
3876 value_encoding,
3877 rep_index_depth,
3878 serialized.num_buffers,
3879 None,
3880 &repdef.def_meaning,
3881 num_items,
3882 );
3883
3884 if let Some(rep_index) = rep_index {
3885 let view = rep_index.borrow_to_typed_slice::<u64>();
3886 let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
3887 debug_assert_eq!(total, num_rows);
3888
3889 data.push(rep_index);
3890 }
3891
3892 Ok(EncodedPage {
3893 num_rows,
3894 column_idx,
3895 data,
3896 description: PageEncoding::Structural(description),
3897 row_number,
3898 })
3899 }
3900 }
3901
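// Illustrative zipped layout for three one-byte values where the middle
// item is invisible: [cw][v0][cw][cw][v2]. The bytepacked repetition
// index records the byte offset of each new row plus a final end
// offset, letting readers seek to row boundaries without scanning
// control words.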
3902 fn serialize_full_zip_fixed(
3904 fixed: FixedWidthDataBlock,
3905 mut repdef: ControlWordIterator,
3906 num_values: u64,
3907 ) -> SerializedFullZip {
3908 let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
3909 let mut zipped_data = Vec::with_capacity(len);
3910
3911 let max_rep_index_val = if repdef.has_repetition() {
3912 len as u64
3913 } else {
3914 0
3916 };
3917 let mut rep_index_builder =
3918 BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
3919
3920 assert_eq!(
3923 fixed.bits_per_value % 8,
3924 0,
3925 "Non-byte aligned full-zip compression not yet supported"
3926 );
3927
3928 let bytes_per_value = fixed.bits_per_value as usize / 8;
3929 let mut offset = 0;
3930
3931 if bytes_per_value == 0 {
3932 while let Some(control) = repdef.append_next(&mut zipped_data) {
3934 if control.is_new_row {
3935 debug_assert!(offset <= len);
3937 unsafe { rep_index_builder.append(offset as u64) };
3939 }
3940 offset = zipped_data.len();
3941 }
3942 } else {
3943 let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
3945 while let Some(control) = repdef.append_next(&mut zipped_data) {
3946 if control.is_new_row {
3947 debug_assert!(offset <= len);
3949 unsafe { rep_index_builder.append(offset as u64) };
3951 }
3952 if control.is_visible {
3953 let value = data_iter.next().unwrap();
3954 zipped_data.extend_from_slice(value);
3955 }
3956 offset = zipped_data.len();
3957 }
3958 }
3959
3960 debug_assert_eq!(zipped_data.len(), len);
3961 unsafe {
3964 rep_index_builder.append(zipped_data.len() as u64);
3965 }
3966
3967 let zipped_data = LanceBuffer::from(zipped_data);
3968 let rep_index = rep_index_builder.into_data();
3969 let rep_index = if rep_index.is_empty() {
3970 None
3971 } else {
3972 Some(LanceBuffer::from(rep_index))
3973 };
3974 SerializedFullZip {
3975 values: zipped_data,
3976 repetition_index: rep_index,
3977 }
3978 }
3979
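// Variable-width values zip the same way, except each visible, valid
// item is prefixed with its length (written at the same width as the
// source offsets): [cw][len][bytes]... Visible-but-null items keep only
// the control word; the decoder infers a zero-length value from the
// repeated offset.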
3980 fn serialize_full_zip_variable(
3984 variable: VariableWidthBlock,
3985 mut repdef: ControlWordIterator,
3986 num_items: u64,
3987 ) -> SerializedFullZip {
3988 let bytes_per_offset = variable.bits_per_offset as usize / 8;
3989 assert_eq!(
3990 variable.bits_per_offset % 8,
3991 0,
3992 "Only byte-aligned offsets supported"
3993 );
3994 let len = variable.data.len()
3995 + repdef.bytes_per_word() * num_items as usize
3996 + bytes_per_offset * variable.num_values as usize;
3997 let mut buf = Vec::with_capacity(len);
3998
3999 let max_rep_index_val = len as u64;
4000 let mut rep_index_builder =
4001 BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4002
4003 match bytes_per_offset {
4005 4 => {
4006 let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4007 let mut rep_offset = 0;
4008 let mut windows_iter = offs.as_ref().windows(2);
4009 while let Some(control) = repdef.append_next(&mut buf) {
4010 if control.is_new_row {
4011 debug_assert!(rep_offset <= len);
4013 unsafe { rep_index_builder.append(rep_offset as u64) };
4015 }
4016 if control.is_visible {
4017 let window = windows_iter.next().unwrap();
4018 if control.is_valid_item {
4019 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4020 buf.extend_from_slice(
4021 &variable.data[window[0] as usize..window[1] as usize],
4022 );
4023 }
4024 }
4025 rep_offset = buf.len();
4026 }
4027 }
4028 8 => {
4029 let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4030 let mut rep_offset = 0;
4031 let mut windows_iter = offs.as_ref().windows(2);
4032 while let Some(control) = repdef.append_next(&mut buf) {
4033 if control.is_new_row {
4034 debug_assert!(rep_offset <= len);
4036 unsafe { rep_index_builder.append(rep_offset as u64) };
4038 }
4039 if control.is_visible {
4040 let window = windows_iter.next().unwrap();
4041 if control.is_valid_item {
4042 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4043 buf.extend_from_slice(
4044 &variable.data[window[0] as usize..window[1] as usize],
4045 );
4046 }
4047 }
4048 rep_offset = buf.len();
4049 }
4050 }
4051 _ => panic!("Unsupported offset size"),
4052 }
4053
4054 debug_assert!(buf.len() <= len);
4057 unsafe {
4060 rep_index_builder.append(buf.len() as u64);
4061 }
4062
4063 let zipped_data = LanceBuffer::from(buf);
4064 let rep_index = rep_index_builder.into_data();
4065 debug_assert!(!rep_index.is_empty());
4066 let rep_index = Some(LanceBuffer::from(rep_index));
4067 SerializedFullZip {
4068 values: zipped_data,
4069 repetition_index: rep_index,
4070 }
4071 }
4072
4073 fn serialize_full_zip(
4076 compressed_data: PerValueDataBlock,
4077 repdef: ControlWordIterator,
4078 num_items: u64,
4079 ) -> SerializedFullZip {
4080 match compressed_data {
4081 PerValueDataBlock::Fixed(fixed) => {
4082 Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4083 }
4084 PerValueDataBlock::Variable(var) => {
4085 Self::serialize_full_zip_variable(var, repdef, num_items)
4086 }
4087 }
4088 }
4089
4090 fn encode_full_zip(
4091 column_idx: u32,
4092 field: &Field,
4093 compression_strategy: &dyn CompressionStrategy,
4094 data: DataBlock,
4095 repdefs: Vec<RepDefBuilder>,
4096 row_number: u64,
4097 num_lists: u64,
4098 ) -> Result<EncodedPage> {
4099 let repdef = RepDefBuilder::serialize(repdefs);
4100 let max_rep = repdef
4101 .repetition_levels
4102 .as_ref()
4103 .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4104 let max_def = repdef
4105 .definition_levels
4106 .as_ref()
4107 .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4108
4109 let (num_items, num_visible_items) =
4113 if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4114 (rep_levels.len() as u64, data.num_values())
4117 } else {
4118 (data.num_values(), data.num_values())
4120 };
4121
4122 let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4123
4124 let repdef_iter = build_control_word_iterator(
4125 repdef.repetition_levels.as_deref(),
4126 max_rep,
4127 repdef.definition_levels.as_deref(),
4128 max_def,
4129 max_visible_def,
4130 num_items as usize,
4131 );
4132 let bits_rep = repdef_iter.bits_rep();
4133 let bits_def = repdef_iter.bits_def();
4134
4135 let compressor = compression_strategy.create_per_value(field, &data)?;
4136 let (compressed_data, value_encoding) = compressor.compress(data)?;
4137
4138 let description = match &compressed_data {
4139 PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4140 bits_rep,
4141 bits_def,
4142 fixed.bits_per_value as u32,
4143 value_encoding,
4144 &repdef.def_meaning,
4145 num_items as u32,
4146 num_visible_items as u32,
4147 ),
4148 PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4149 bits_rep,
4150 bits_def,
4151 variable.bits_per_offset as u32,
4152 value_encoding,
4153 &repdef.def_meaning,
4154 num_items as u32,
4155 num_visible_items as u32,
4156 ),
4157 };
4158
4159 let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4160
4161 let data = if let Some(repindex) = zipped.repetition_index {
4162 vec![zipped.values, repindex]
4163 } else {
4164 vec![zipped.values]
4165 };
4166
4167 Ok(EncodedPage {
4168 num_rows: num_lists,
4169 column_idx,
4170 data,
4171 description: PageEncoding::Structural(description),
4172 row_number,
4173 })
4174 }
4175
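// Worked example (illustrative numbers): 10_000 fixed-width values with
// cardinality 100 estimate to 100 dictionary entries plus 10_000
// indices. If that estimate comes out to, say, 40% of the raw data
// size, the default 0.8 ratio in should_dictionary_encode favors the
// dictionary.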
4176 fn estimate_dict_size(data_block: &DataBlock) -> Option<u64> {
4191 let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
4192 cardinality_array.as_primitive::<UInt64Type>().value(0)
4193 } else {
4194 return None;
4195 };
4196
4197 let num_values = data_block.num_values();
4198
4199 match data_block {
4200 DataBlock::FixedWidth(_) => {
4201 let dict_size = cardinality * (DICT_FIXED_WIDTH_BITS_PER_VALUE / 8);
4203 let indices_size = num_values * (DICT_INDICES_BITS_PER_VALUE / 8);
4205 Some(dict_size + indices_size)
4206 }
4207 DataBlock::VariableWidth(var) => {
4208 if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
4210 return None;
4211 }
4212 let bits_per_offset = var.bits_per_offset as u64;
4213
4214 let data_size = data_block.data_size();
4215 let avg_value_size = data_size / num_values;
4216
4217 let dict_values_size = cardinality * avg_value_size;
4219 let dict_offsets_size = cardinality * (bits_per_offset / 8);
4221 let indices_size = num_values * (bits_per_offset / 8);
4223
4224 Some(dict_values_size + dict_offsets_size + indices_size)
4225 }
4226 _ => None,
4227 }
4228 }
4229
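// Tuning knobs: a per-field DICT_SIZE_RATIO_META_KEY entry overrides
// the LANCE_ENCODING_DICT_SIZE_RATIO environment variable (default
// 0.8), and LANCE_ENCODING_DICT_TOO_SMALL (default 100) sets the
// minimum number of values worth dictionary encoding.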
4230 fn should_dictionary_encode(data_block: &DataBlock, field: &Field) -> bool {
4231 if !matches!(
4234 data_block,
4235 DataBlock::FixedWidth(_) | DataBlock::VariableWidth(_)
4236 ) {
4237 return false;
4238 }
4239
4240 let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4242 .ok()
4243 .and_then(|val| val.parse().ok())
4244 .unwrap_or(100);
4245 if data_block.num_values() < too_small {
4246 return false;
4247 }
4248
4249 let threshold_ratio = field
4251 .metadata
4252 .get(DICT_SIZE_RATIO_META_KEY)
4253 .and_then(|val| val.parse::<f64>().ok())
4254 .or_else(|| {
4255 env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
4256 .ok()
4257 .and_then(|val| val.parse().ok())
4258 })
4259 .unwrap_or(0.8);
4260
4261 if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
4263 panic!(
4264 "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
4265 threshold_ratio
4266 );
4267 }
4268
4269 let data_size = data_block.data_size();
4271
4272 let Some(encoded_size) = Self::estimate_dict_size(data_block) else {
4274 return false;
4275 };
4276
4277 let size_ratio_actual = if data_size > 0 {
4278 encoded_size as f64 / data_size as f64
4279 } else {
4280 return false;
4281 };
4282 size_ratio_actual < threshold_ratio
4283 }
4284
4285 fn do_flush(
4287 &mut self,
4288 arrays: Vec<ArrayRef>,
4289 repdefs: Vec<RepDefBuilder>,
4290 row_number: u64,
4291 num_rows: u64,
4292 ) -> Result<Vec<EncodeTask>> {
4293 let column_idx = self.column_index;
4294 let compression_strategy = self.compression_strategy.clone();
4295 let field = self.field.clone();
4296 let encoding_metadata = self.encoding_metadata.clone();
4297 let task = spawn_cpu(move || {
4298 let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
4299
4300 if num_values == 0 {
4301 log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
4305 return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
4306 }
4307 let num_nulls = arrays
4308 .iter()
4309 .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
4310 .sum::<u64>();
4311
4312 if num_values == num_nulls {
4313 return if repdefs.iter().all(|rd| rd.is_simple_validity()) {
4314 log::debug!(
4315 "Encoding column {} with {} items ({} rows) using simple-null layout",
4316 column_idx,
4317 num_values,
4318 num_rows
4319 );
4320 Self::encode_simple_all_null(column_idx, num_values, row_number)
4322 } else {
4323 log::debug!(
4324 "Encoding column {} with {} items ({} rows) using complex-null layout",
4325 column_idx,
4326 num_values,
4327 num_rows
4328 );
4329 Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
4331 };
4332 }
4333
4334 if let DataType::Struct(fields) = &field.data_type() {
4335 if fields.is_empty() {
4336 if repdefs.iter().any(|rd| !rd.is_empty()) {
4337 return Err(Error::InvalidInput { source: format!("Empty structs with rep/def information are not yet supported. The field {} is an empty struct that either has nulls or is in a list.", field.name).into(), location: location!() });
4338 }
4339 return Self::encode_simple_all_null(column_idx, num_values, row_number);
4342 }
4343 }
4344
4345 let data_block = DataBlock::from_arrays(&arrays, num_values);
4346
4347 let requires_full_zip_packed_struct =
4348 if let DataBlock::Struct(ref struct_data_block) = data_block {
4349 struct_data_block.has_variable_width_child()
4350 } else {
4351 false
4352 };
4353
4354 if requires_full_zip_packed_struct {
4355 log::debug!(
4356 "Encoding column {} with {} items using full-zip packed struct layout",
4357 column_idx,
4358 num_values
4359 );
4360 return Self::encode_full_zip(
4361 column_idx,
4362 &field,
4363 compression_strategy.as_ref(),
4364 data_block,
4365 repdefs,
4366 row_number,
4367 num_rows,
4368 );
4369 }
4370
4371 if let DataBlock::Dictionary(dict) = data_block {
4372 log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
4373 let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
4374 indices_data_block.compute_stat();
4379 Self::encode_miniblock(
4380 column_idx,
4381 &field,
4382 compression_strategy.as_ref(),
4383 indices_data_block,
4384 repdefs,
4385 row_number,
4386 Some(dictionary_data_block),
4387 num_rows
4388 )
4389 } else if Self::should_dictionary_encode(&data_block, &field) {
4390 log::debug!(
4391 "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
4392 column_idx,
4393 num_values
4394 );
4395 let (indices_data_block, dictionary_data_block) =
4396 dict::dictionary_encode(data_block);
4397 Self::encode_miniblock(
4398 column_idx,
4399 &field,
4400 compression_strategy.as_ref(),
4401 indices_data_block,
4402 repdefs,
4403 row_number,
4404 Some(dictionary_data_block),
4405 num_rows,
4406 )
4407 } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
4408 log::debug!(
4409 "Encoding column {} with {} items using mini-block layout",
4410 column_idx,
4411 num_values
4412 );
4413 Self::encode_miniblock(
4414 column_idx,
4415 &field,
4416 compression_strategy.as_ref(),
4417 data_block,
4418 repdefs,
4419 row_number,
4420 None,
4421 num_rows,
4422 )
4423 } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
4424 log::debug!(
4425 "Encoding column {} with {} items using full-zip layout",
4426 column_idx,
4427 num_values
4428 );
4429 Self::encode_full_zip(
4430 column_idx,
4431 &field,
4432 compression_strategy.as_ref(),
4433 data_block,
4434 repdefs,
4435 row_number,
4436 num_rows,
4437 )
4438 } else {
4439 Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() })
4440 }
4441 })
4442 .boxed();
4443 Ok(vec![task])
4444 }
4445
4446 fn extract_validity_buf(
4447 array: Arc<dyn Array>,
4448 repdef: &mut RepDefBuilder,
4449 keep_original_array: bool,
4450 ) -> Result<Arc<dyn Array>> {
4451 if let Some(validity) = array.nulls() {
4452 if keep_original_array {
4453 repdef.add_validity_bitmap(validity.clone());
4454 } else {
4455 repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
4456 }
4457 let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
4458 Ok(make_array(data_no_nulls))
4459 } else {
4460 repdef.add_no_null(array.len());
4461 Ok(array)
4462 }
4463 }
4464
    fn extract_validity(
        mut array: Arc<dyn Array>,
        repdef: &mut RepDefBuilder,
        keep_original_array: bool,
    ) -> Result<Arc<dyn Array>> {
        match array.data_type() {
            DataType::Null => {
                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
                Ok(array)
            }
            DataType::Dictionary(_, _) => {
                array = dict::normalize_dict_nulls(array)?;
                Self::extract_validity_buf(array, repdef, keep_original_array)
            }
            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
        }
    }
}

impl FieldEncoder for PrimitiveStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        _external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
        self.accumulated_repdefs.push(repdef);

        if let Some((arrays, row_number, num_rows)) =
            self.accumulation_queue.insert(array, row_number, num_rows)
        {
            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
            self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)
        } else {
            Ok(vec![])
        }
    }

    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
            self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)
        } else {
            Ok(vec![])
        }
    }

    fn num_columns(&self) -> u32 {
        1
    }

    fn finish(
        &mut self,
        _external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
    }
}

#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use super::{
        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
        FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction,
        StructuralPageScheduler,
    };
    use crate::constants::{STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK};
    use crate::data::BlockInfo;
    use crate::decoder::PageEncoding;
    use crate::encodings::logical::primitive::{
        ChunkDrainInstructions, PrimitiveStructuralEncoder,
    };
    use crate::format::pb21;
    use crate::format::pb21::compressive_encoding::Compression;
    use crate::testing::{check_round_trip_encoding_of_data, TestCases};
    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, Int8Array, StringArray, UInt64Array};
    use arrow_schema::DataType;
    use std::collections::HashMap;
    use std::{collections::VecDeque, sync::Arc};

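    // Narrow blocks (roughly, those with a small average value size) are
    // steered toward the mini-block layout; blocks with large values should
    // not be considered narrow.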
    #[test]
    fn test_is_narrow() {
        let int8_array = Int8Array::from(vec![1, 2, 3]);
        let array_ref: ArrayRef = Arc::new(int8_array);
        let block = DataBlock::from_array(array_ref);

        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
        let block = DataBlock::from_array(string_array);
        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        // With values this large the block no longer counts as narrow.
        let string_array = StringArray::from(vec![
            Some("hello world".repeat(100)),
            Some("world".to_string()),
        ]);
        let block = DataBlock::from_array(string_array);
        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
    }

    #[test]
    fn test_map_range() {
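        // Four list rows (rep == 1 marks a row start); the third row is
        // invisible (def > max_visible_def) and so maps to an empty item
        // range while still consuming a repdef level.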
        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 8;
        let max_rep = 1;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(1..2, 3..5, 3..5);
        check(2..3, 5..5, 5..6);
        check(3..4, 5..8, 6..9);
        check(0..2, 0..5, 0..5);
        check(1..3, 3..5, 3..6);
        check(2..4, 5..8, 5..9);
        check(0..3, 0..5, 0..6);
        check(1..4, 3..8, 3..9);
        check(0..4, 0..8, 0..9);

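        // Same idea, but now the *first* row is the invisible one.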
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..0, 0..1);
        check(1..2, 0..2, 1..3);
        check(2..3, 2..3, 3..4);
        check(0..2, 0..2, 0..3);
        check(1..3, 0..3, 1..4);
        check(0..3, 0..3, 0..4);

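        // ...and here it is the *last* row that is invisible.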
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..3, 1..3);
        check(2..3, 3..3, 3..4);
        check(0..2, 0..3, 0..3);
        check(1..3, 1..3, 1..4);
        check(0..3, 0..3, 0..4);

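        // Repetition levels only (no definition levels): three rows of two
        // items each.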
        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
        let def: Option<&[u16]> = None;
        let max_visible_def = 0;
        let total_items = 6;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..2, 0..2);
        check(1..2, 2..4, 2..4);
        check(2..3, 4..6, 4..6);
        check(0..2, 0..4, 0..4);
        check(1..3, 2..6, 2..6);
        check(0..3, 0..6, 0..6);

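        // Definition levels only (no lists): rows, items, and levels line up
        // one-to-one.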
        let rep: Option<&[u16]> = None;
        let def = Some(vec![0, 0, 1, 0]);
        let max_visible_def = 1;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..2, 1..2);
        check(2..3, 2..3, 2..3);
        check(0..2, 0..2, 0..2);
        check(1..3, 1..3, 1..3);
        check(0..3, 0..3, 0..3);

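        // The chunk starts mid-row (rep == 0 preamble); exercise both taking
        // and skipping the preamble.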
        let rep = Some(vec![0, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..3, 3..4);
        check(0..2, 1..3, 1..4);

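        // A preamble followed by an invisible row.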
        let rep = Some(vec![0, 1, 1, 0]);
        let def = Some(vec![0, 1, 0, 0]);
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..2);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..1, 1..2);
        check(1..2, 1..3, 2..4);
        check(0..2, 1..3, 1..4);

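        // Preamble handling with repetition levels only (def is None).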
        let rep = Some(vec![0, 1, 0, 1]);
        let def: Option<Vec<u16>> = None;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..4, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..4, 3..4);
        check(0..2, 1..4, 1..4);

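        // Two levels of repetition (max_rep = 2) with invisible inner lists.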
        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
        let def = Some(vec![0, 1, 2, 0, 0, 0]);
        let max_rep = 2;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..3, 0..4, 0..6);
        check(0..1, 0..1, 0..2);
        check(1..2, 1..3, 2..5);
        check(2..3, 3..4, 5..6);

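        // A preamble spanning multiple values; even draining zero rows must
        // still surface the preamble's levels and items.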
        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
        let def = Some(vec![0, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..0, 0..1, 0..2);
        check(0..1, 0..3, 0..4);
        check(0..2, 0..4, 0..5);

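        // Skip the preamble and the first rows, then map a row from the
        // middle of the chunk.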
        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(2..3, 2..4, 5..7);
    }

    #[test]
    fn test_schedule_instructions() {
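        // The raw repetition index stores two u64 words per chunk (four
        // chunks here, fourteen rows total); rows can spill across chunk
        // boundaries, which shows up below as preamble/trailer handling.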
        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);

        let check = |user_ranges, expected_instructions| {
            let instructions =
                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
            assert_eq!(instructions, expected_instructions);
        };

        let expected_take_all = vec![
            ChunkInstructions {
                chunk_idx: 0,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 6,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 1,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 2,
                take_trailer: false,
            },
            ChunkInstructions {
                chunk_idx: 2,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 5,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 3,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 1,
                take_trailer: false,
            },
        ];

        // Taking all fourteen rows as one range...
        check(&[0..14], expected_take_all.clone());

        // ...schedules identically to taking each row individually.
        check(
            &[
                0..1,
                1..2,
                2..3,
                3..4,
                4..5,
                5..6,
                6..7,
                7..8,
                8..9,
                9..10,
                10..11,
                11..12,
                12..13,
                13..14,
            ],
            expected_take_all,
        );

        // Two disjoint ranges inside chunk 0 become two instructions against
        // the same chunk, each with its own skip.
        check(
            &[0..1, 3..4],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 3,
                    rows_to_take: 1,
                    take_trailer: false,
                },
            ],
        );

        // A row spanning the chunk 0 / chunk 1 boundary: take chunk 0's
        // trailer and the continuation (preamble) in chunk 1, with no
        // additional rows from chunk 1.
        check(
            &[5..6],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 5,
                    rows_to_take: 1,
                    take_trailer: true,
                },
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Take,
                    rows_to_skip: 0,
                    rows_to_take: 0,
                    take_trailer: false,
                },
            ],
        );

        // A mid-page range: skip chunk 1's preamble plus one row, then take
        // the rest from chunk 2.
        check(
            &[7..10],
            vec![
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Skip,
                    rows_to_skip: 1,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 2,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 2,
                    take_trailer: false,
                },
            ],
        );
    }

    #[test]
    fn test_drain_instructions() {
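        // Helper that drains `rows_desired` rows from the scheduled
        // instructions, popping chunks as they are fully consumed.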
        fn drain_from_instructions(
            instructions: &mut VecDeque<ChunkInstructions>,
            mut rows_desired: u64,
            need_preamble: &mut bool,
            skip_in_chunk: &mut u64,
        ) -> Vec<ChunkDrainInstructions> {
            let mut drain_instructions = Vec::with_capacity(instructions.len());
            while rows_desired > 0 || *need_preamble {
                let (next_instructions, consumed_chunk) = instructions
                    .front()
                    .unwrap()
                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
                if consumed_chunk {
                    instructions.pop_front();
                }
                drain_instructions.push(next_instructions);
            }
            drain_instructions
        }

        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
        let user_ranges = vec![1..7, 10..14];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        // Drain the first four rows; they all come from the first scheduled
        // chunk.
        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 4);
        assert_eq!(
            next_batch,
            vec![ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 4,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            }]
        );

        // The next four rows cross three scheduled chunks (the second one is
        // consumed entirely by its preamble plus one row).
        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 2);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 4,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                }
            ]
        );

        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 2,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[3].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        // A second scenario: three chunks totaling 28 rows, the last chunk
        // much larger than the others.
        let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
        let user_ranges = vec![0..28];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        let next_batch =
            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 6,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 1);

        // Draining up to a boundary where a row spills into the next chunk:
        // that chunk's preamble is taken immediately, with zero rows.
        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 1,
                    preamble_action: PreambleAction::Skip,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 0,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);
    }

    #[tokio::test]
    async fn test_fullzip_repetition_index_caching() {
        use crate::testing::SimulatedScheduler;

        // Stub decompressor: initialization only touches the repetition
        // index, so actual decompression is never exercised here.
        #[derive(Debug)]
        struct TestFixedDecompressor;

        impl FixedPerValueDecompressor for TestFixedDecompressor {
            fn decompress(
                &self,
                _data: FixedWidthDataBlock,
                _num_rows: u64,
            ) -> crate::Result<DataBlock> {
                unimplemented!("Test decompressor")
            }

            fn bits_per_value(&self) -> u64 {
                32
            }
        }

        let rows_in_page = 100u64;
        let bytes_per_value = 4u64;
        let _rep_index_size = (rows_in_page + 1) * bytes_per_value;

        // Build a fake repetition index: one little-endian u32 offset per row
        // plus a final end offset (each row is 100 bytes).
        let mut rep_index_data = Vec::new();
        for i in 0..=rows_in_page {
            let offset = (i * 100) as u32;
            rep_index_data.extend_from_slice(&offset.to_le_bytes());
        }

        // Simulated file layout: 1000 bytes of value data, then the
        // repetition index, then trailing padding.
        let mut full_data = vec![0u8; 1000];
        full_data.extend_from_slice(&rep_index_data);
        full_data.extend_from_slice(&vec![0u8; 10000]);
        let data = bytes::Bytes::from(full_data);
        let io = Arc::new(SimulatedScheduler::new(data));
        let _cache = Arc::new(lance_core::cache::LanceCache::with_capacity(1024 * 1024));

        let mut scheduler = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: true,
        };

        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();

        // Initialization should produce a cacheable repetition index.
        let is_cached = cached_data1
            .clone()
            .as_arc_any()
            .downcast::<FullZipCacheableState>()
            .is_ok();
        assert!(
            is_cached,
            "Expected FullZipCacheableState, got NoCachedPageData"
        );

        scheduler.load(&cached_data1);

        assert!(
            scheduler.cached_state.is_some(),
            "cached_state should be populated after load"
        );

        let cached_state = scheduler.cached_state.as_ref().unwrap();

        // Scheduling against the cached index should succeed.
        let ranges = vec![0..10, 20..30];
        let result = scheduler.schedule_ranges_rep(
            &ranges,
            &io_dyn,
            FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            },
        );

        assert!(
            result.is_ok(),
            "schedule_ranges_rep should succeed with cached data"
        );

        // A second scheduler loading the same cached data should share it.
        let mut scheduler2 = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: scheduler.details.clone(),
            cached_state: None,
            enable_cache: true,
        };

        scheduler2.load(&cached_data1);
        assert!(
            scheduler2.cached_state.is_some(),
            "Second scheduler should have cached_state after load"
        );

        let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
        assert!(
            Arc::ptr_eq(cached_state, cached_state2),
            "Both schedulers should share the same cached data"
        );
    }

    #[tokio::test]
    async fn test_fullzip_cache_config_controls_caching() {
        use crate::testing::SimulatedScheduler;

        #[derive(Debug)]
        struct TestFixedDecompressor;

        impl FixedPerValueDecompressor for TestFixedDecompressor {
            fn decompress(
                &self,
                _data: FixedWidthDataBlock,
                _num_rows: u64,
            ) -> crate::Result<DataBlock> {
                unimplemented!("Test decompressor")
            }

            fn bits_per_value(&self) -> u64 {
                32
            }
        }

        let rows_in_page = 1000_u64;
        let bytes_per_value = 4_u64;

        // Simulated file layout: 1000 bytes of value-data region, then the
        // repetition index, then the (zeroed) values.
        let rep_index_data = vec![0u8; ((rows_in_page + 1) * bytes_per_value) as usize];
        let value_data = vec![0u8; 4000];
        let mut full_data = vec![0u8; 1000];
        full_data.extend_from_slice(&rep_index_data);
        full_data.extend_from_slice(&value_data);

        let data = bytes::Bytes::from(full_data);
        let io = Arc::new(SimulatedScheduler::new(data));

        // With caching disabled, the scheduler must not retain the index.
        let mut scheduler_no_cache = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: false,
        };

        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        let cached_data = scheduler_no_cache.initialize(&io_dyn).await.unwrap();

        assert!(
            cached_data
                .as_arc_any()
                .downcast_ref::<super::NoCachedPageData>()
                .is_some(),
            "With enable_cache=false, should return NoCachedPageData"
        );

        let mut scheduler_with_cache = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: true,
        };

        let cached_data2 = scheduler_with_cache.initialize(&io_dyn).await.unwrap();

        assert!(
            cached_data2
                .as_arc_any()
                .downcast_ref::<super::FullZipCacheableState>()
                .is_some(),
            "With enable_cache=true, should return FullZipCacheableState"
        );
    }

    // Regression test for a fuzz-discovered failure (issue 4492) involving
    // empty repetition values in mini-block encoded lists.
    #[tokio::test]
    async fn test_fuzz_issue_4492_empty_rep_values() {
        use lance_datagen::{array, gen_batch, RowCount, Seed};

        // The exact seed and row count that reproduced the failure.
        let seed = 1823859942947654717u64;
        let num_rows = 2741usize;

        let batch_gen = gen_batch().with_seed(Seed::from(seed));
        let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
        let list_generator = array::rand_list_any(base_generator, false);

        let batch = batch_gen
            .anon_col(list_generator)
            .into_batch_rows(RowCount::from(num_rows as u64))
            .unwrap();

        let list_array = batch.column(0).clone();

        // Force the mini-block structural encoding via field metadata.
        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_batch_size(100)
            .with_range(0..num_rows.min(500) as u64)
            .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);

        check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
    }

    #[tokio::test]
    async fn test_large_dictionary_general_compression() {
        use arrow_array::{ArrayRef, StringArray};
        use std::collections::HashMap;
        use std::sync::Arc;

        // 100 unique ~500-byte values repeated to 100k rows: low cardinality
        // but a large dictionary, so the dictionary itself should get a
        // general-purpose compression scheme.
        let unique_values: Vec<String> = (0..100)
            .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
            .collect();

        let repeated_strings: Vec<_> = unique_values
            .iter()
            .cycle()
            .take(100_000)
            .map(|s| Some(s.as_str()))
            .collect();

        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
                assert_eq!(cols.len(), 1);
                let col = &cols[0];

                if let Some(PageEncoding::Structural(page_layout)) =
                    &col.final_pages.first().map(|p| &p.description)
                {
                    if let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
                        &page_layout.layout
                    {
                        if let Some(dictionary_encoding) = &mini_block.dictionary {
                            match dictionary_encoding.compression.as_ref() {
                                Some(Compression::General(general)) => {
                                    let compression = general.compression.as_ref().unwrap();
                                    assert!(
                                        compression.scheme()
                                            == pb21::CompressionScheme::CompressionAlgorithmLz4
                                            || compression.scheme()
                                                == pb21::CompressionScheme::CompressionAlgorithmZstd,
                                        "Expected LZ4 or Zstd compression for large dictionary"
                                    );
                                }
                                _ => panic!("Expected General compression for large dictionary"),
                            }
                        }
                    }
                }
            }));

        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    // Test helper: a fixed-width (32-bit) block with a pre-populated
    // cardinality statistic.
    fn create_test_fixed_data_block(num_values: u64, cardinality: u64) -> DataBlock {
        use crate::statistics::Stat;

        let block_info = BlockInfo::default();

        let cardinality_array = Arc::new(UInt64Array::from(vec![cardinality]));
        block_info
            .0
            .write()
            .unwrap()
            .insert(Stat::Cardinality, cardinality_array);

        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: crate::buffer::LanceBuffer::from(vec![0u8; (num_values * 4) as usize]),
            num_values,
            block_info,
        })
    }

    // Test helper: a variable-width (string) block whose values repeat with
    // the requested cardinality; the cardinality statistic is set explicitly.
    fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
        use crate::statistics::Stat;
        use arrow_array::StringArray;

        assert!(cardinality <= num_values && cardinality > 0);

        let mut values = Vec::with_capacity(num_values as usize);
        for i in 0..num_values {
            values.push(format!("value_{:016}", i % cardinality));
        }

        let array = StringArray::from(values);
        let block = DataBlock::from_array(Arc::new(array) as ArrayRef);

        if let DataBlock::VariableWidth(ref var_block) = block {
            let mut info = var_block.block_info.0.write().unwrap();
            info.insert(
                Stat::Cardinality,
                Arc::new(UInt64Array::from(vec![cardinality])),
            );
        }

        block
    }

    #[test]
    fn test_estimate_dict_size_fixed_width() {
        use crate::encodings::logical::primitive::dict::{
            DICT_FIXED_WIDTH_BITS_PER_VALUE, DICT_INDICES_BITS_PER_VALUE,
        };

        let block = create_test_fixed_data_block(1000, 400);
        let estimated_size = PrimitiveStructuralEncoder::estimate_dict_size(&block).unwrap();

        // Estimate = one dictionary entry per distinct value plus one index
        // per row.
        let expected_dict_size = 400 * (DICT_FIXED_WIDTH_BITS_PER_VALUE / 8);
        let expected_indices_size = 1000 * (DICT_INDICES_BITS_PER_VALUE / 8);
        let expected_total = expected_dict_size + expected_indices_size;

        assert_eq!(estimated_size, expected_total);
    }

    #[test]
    fn test_estimate_dict_size_variable_width() {
        let block = create_test_variable_width_block(1000, 400);
        let estimated_size = PrimitiveStructuralEncoder::estimate_dict_size(&block).unwrap();

        // Estimate = average value size for each dictionary entry, plus
        // 4 bytes per entry for offsets and 4 bytes per row for indices.
        let data_size = block.data_size();
        let avg_value_size = data_size / 1000;

        let expected = 400 * avg_value_size + 400 * 4 + 1000 * 4;

        assert_eq!(estimated_size, expected);
    }

    #[test]
    fn test_should_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        // Very low cardinality (10 distinct values out of 1000), so the
        // dictionary estimate comes in well under 0.8x the raw size.
        let block = create_test_variable_width_block(1000, 10);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(&block, &field);

        assert!(result, "Should use dictionary encoding based on size");
    }

    #[test]
    fn test_should_not_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        // For this fixed-width block the estimated dictionary size does not
        // beat the 0.8 ratio, so dictionary encoding is rejected.
        let block = create_test_fixed_data_block(1000, 10);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(&block, &field);

        assert!(!result, "Should not use dictionary encoding based on size");
    }
}